Monday, January 26, 2015

Use Flume-ng With SparkStreaming to HDFS

I fiddled with this for ages, but it finally works!
The goal was to combine Flume's support for many protocols with Spark Streaming,
so I wired together two pieces that had each been running fine on their own.


1. Install Flume (if yum can't find it, you're on your own)
sudo yum install --assumeyes flume-ng

2. Create the config file at /etc/hadoop/conf/spark-flumeng.conf
[training@elephant ~]$ sudo vi /etc/hadoop/conf/spark-flumeng.conf
-------------contents------------------------------------------------
#Agent5
#List the sources, sinks and channels for the agent
agent5.sources =  source1
agent5.sinks =  hdfs01
agent5.channels = channel1

#set channel for sources and sinks
agent5.sources.source1.channels = channel1
agent5.sinks.hdfs01.channel = channel1

#properties of someone source
agent5.sources.source1.type = spooldir
agent5.sources.source1.spoolDir = /home/training/Spooling/
agent5.sources.source1.ignorePattern = .*(\\.index|\\.tmp|\\.xml)$
agent5.sources.source1.fileSuffix = .1
agent5.sources.source1.fileHeader = true
agent5.sources.source1.fileHeaderKey = filename

# set interceptors
agent5.sources.source1.interceptors = i1 i2
agent5.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent5.sources.source1.interceptors.i1.preserveExisting = false
agent5.sources.source1.interceptors.i1.hostHeader = elephant
agent5.sources.source1.interceptors.i1.useIP = false
agent5.sources.source1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#properties of mem-channel-1
agent5.channels.channel1.type = memory
agent5.channels.channel1.capacity = 100000
agent5.channels.channel1.transactionCapacity = 100000
agent5.channels.channel1.keep-alive = 30

#properties of sink
agent5.sinks.hdfs01.type = avro
agent5.sinks.hdfs01.hostname = elephant
agent5.sinks.hdfs01.port = 11000
Here Flume uses a spooling directory source,
with spoolDir set to /home/training/Spooling/.
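The ignorePattern above is an ordinary Java regular expression matched against file names. A quick plain-Java check (illustrative only, not part of the pipeline) shows which files the spooling source will skip:

```java
import java.util.regex.Pattern;

public class IgnorePatternCheck {
    public static void main(String[] args) {
        // Same pattern as agent5.sources.source1.ignorePattern
        Pattern ignore = Pattern.compile(".*(\\.index|\\.tmp|\\.xml)$");

        // Files whose names match the pattern are skipped by the source
        System.out.println(ignore.matcher("events.log").matches());  // false -> ingested
        System.out.println(ignore.matcher("stale.tmp").matches());   // true  -> skipped
        System.out.println(ignore.matcher("config.xml").matches());  // true  -> skipped
    }
}
```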


3. Next, prepare the Java test code for Spark Streaming and package it as JavaFlumeEventCount.jar
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public final class JavaFlumeEventCount {
    private JavaFlumeEventCount() {
    }

    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("Usage: JavaFlumeEventCount <host> <port> <batchIntervalMs>");
            System.exit(1);
        }
     
        String host = args[0];
        int port = Integer.parseInt(args[1]);

        Duration batchInterval = new Duration(Integer.parseInt(args[2]));
       //Duration batchInterval = new Duration(10000);
        SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
                batchInterval);
        //JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "elephant", 11000);
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc,host , port);
        flumeStream.count();

        flumeStream.count().map(new Function<Long, String>() {
            private static final long serialVersionUID = -572435064083746235L;

            public String call(Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}


4. Start the Spark master and a Spark worker. Once they are up, the master UI is at http://elephant:8080 and the worker UI at port 8081.
[training@elephant spark-1.2.0-bin-hadoop2.3]$ sbin/start-master.sh 
starting org.apache.spark.deploy.master.Master, logging to /home/training/software/spark-1.2.0-bin-hadoop2.3/sbin/../logs/spark-training-org.apache.spark.deploy.master.Master-1-elephant.out
[training@elephant spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://elephant:7077

5. Submit the Spark job. Note that --master, which would normally be spark://elephant:7077, is set to local[10] here: the Flume receiver permanently occupies one thread, so local mode needs enough extra threads to actually process the batches. See the Master URLs section of the official documentation.
[training@elephant spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-submit --class JavaFlumeEventTest --jars lib/spark-assembly-1.2.0-hadoop2.3.0.jar,lib/spark-examples-1.2.0-hadoop2.3.0.jar,lib/guava-18.0.jar --master local[10] /home/training/software/JavaFlumeEventTest.jar elephant 11000 10000

6. Start the Flume-ng agent
[training@elephant spark-1.2.0-bin-hadoop2.3]$ flume-ng agent --conf /etc/hadoop/conf --conf-file /etc/hadoop/conf/spark-flumeng.conf --name agent5


7. Since spoolDir is /home/training/Spooling/, just drop a file into that directory;
Flume automatically reads it, sends the events on to Spark, and renames the file with the .1 suffix.



8. Next, modify the Java code so that it writes the results into HDFS (updated 1/27). The key addition is:
saveAsNewAPIHadoopFiles("hdfs://elephant:8020/user/training/input/flumeresult/", "txt", Text.class, Integer.class, (Class<? extends OutputFormat<?, ?>>) TextOutputFormat.class);



import java.nio.charset.Charset;
import java.util.regex.Pattern;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

import com.google.common.collect.Lists;

import scala.Tuple2;

public final class JavaFlumeEventTest {
  private static final Pattern SPACE = Pattern.compile(" ");

  private JavaFlumeEventTest() {
  }

  public static void main(String[] args) {
    String host = args[0];
    int port = Integer.parseInt(args[1]);

    Duration batchInterval = new Duration(Integer.parseInt(args[2]));
    SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlumeEventTest")
        .set("spark.executor.memory", "256m");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

    // Split each Flume event body on spaces
    JavaDStream<String> words = flumeStream.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
      @Override
      public Iterable<String> call(SparkFlumeEvent event) throws Exception {
        String body = new String(event.event().getBody().array(), Charset.forName("UTF-8"));
        return Lists.newArrayList(SPACE.split(body));
      }
    });

    // Classic word count: (word, 1) pairs reduced by key
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });

    wordCounts.print();

    wordCounts.saveAsNewAPIHadoopFiles("hdfs://elephant:8020/user/training/input/flumeresult/", "txt",
        Text.class, Integer.class, (Class<? extends OutputFormat<?, ?>>) TextOutputFormat.class);
    // This variant does not work; an OutputFormat must be specified:
    // wordCounts.saveAsNewAPIHadoopFiles("hdfs://elephant:8020/user/training/input/flumeresult/", "txt");

    flumeStream.count().map(new Function<Long, String>() {
      @Override
      public String call(Long in) {
        return "Received " + in + " flume events.";
      }
    }).print();

    ssc.start();
    ssc.awaitTermination();
  }
}
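For reference, the per-batch computation above (flatMap on spaces, then mapToPair, then reduceByKey) is equivalent to this plain-Java word count over one event body; the sample input is made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class WordCountSketch {
    private static final Pattern SPACE = Pattern.compile(" ");

    // Mirrors flatMap + mapToPair + reduceByKey for a single batch
    public static Map<String, Integer> countWords(String body) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String word : SPACE.split(body)) {          // flatMap: split on spaces
            Integer current = counts.get(word);          // reduceByKey: accumulate per word
            counts.put(word, current == null ? 1 : current + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("spark flume spark"));
    }
}
```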
9. Because the Java code has no condition to skip writing when a batch is empty (honestly, I don't know how to do that yet), extra blocks with no readable content get written out as well.
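One possible way to avoid those empty output directories (a sketch I have not tested here) is to drop down to foreachRDD and save only batches that contain data. The count() call triggers an extra job per batch, and the per-batch path built from System.currentTimeMillis() is my own choice, not something the save API requires:

```java
// Sketch only: would replace the saveAsNewAPIHadoopFiles call in step 8.
// Assumes the same wordCounts JavaPairDStream<String, Integer> as above.
wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd) throws Exception {
    if (rdd.count() > 0) {  // skip empty batches entirely
      rdd.saveAsNewAPIHadoopFile(
          "hdfs://elephant:8020/user/training/input/flumeresult/" + System.currentTimeMillis(),
          Text.class, Integer.class, TextOutputFormat.class);
    }
    return null;
  }
});
```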



10. The Spark terminal output shows which output directories actually contain data.







Whew, finally done!
References:
1. Flume-ng principles and usage
2. Master URLs
3. Spark Streaming + Flume Integration Guide
4. Spark Streaming with FlumeNG usage example