To take advantage of Flume's support for many protocols together with Spark Streaming,
I tried to wire together pieces that each already ran fine on their own.
1. Install Flume (if yum doesn't have it, figure something out yourself = = )
sudo yum install --assumeyes flume-ng
2. Set up the config; file name and location: /etc/hadoop/conf/spark-flumeng.conf
[training@elephant ~]$ sudo vi /etc/hadoop/conf/spark-flumeng.conf
-------------contents----------------------------------------------------
#Agent5
#List the sources, sinks and channels for the agent
agent5.sources = source1
agent5.sinks = hdfs01
agent5.channels = channel1

#set channel for sources and sinks
agent5.sources.source1.channels = channel1
agent5.sinks.hdfs01.channel = channel1

#properties of the source
agent5.sources.source1.type = spooldir
agent5.sources.source1.spoolDir = /home/training/Spooling/
agent5.sources.source1.ignorePattern = .*(\\.index|\\.tmp|\\.xml)$
agent5.sources.source1.fileSuffix = .1
agent5.sources.source1.fileHeader = true
agent5.sources.source1.fileHeaderKey = filename

# set interceptors
agent5.sources.source1.interceptors = i1 i2
agent5.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent5.sources.source1.interceptors.i1.preserveExisting = false
agent5.sources.source1.interceptors.i1.hostHeader = elephant
agent5.sources.source1.interceptors.i1.useIP = false
agent5.sources.source1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#properties of mem-channel-1
agent5.channels.channel1.type = memory
agent5.channels.channel1.capacity = 100000
agent5.channels.channel1.transactionCapacity = 100000
agent5.channels.channel1.keep-alive = 30

#properties of sink
agent5.sinks.hdfs01.type = avro
agent5.sinks.hdfs01.hostname = elephant
agent5.sinks.hdfs01.port = 11000
Here Flume uses a spooling-directory source, with spoolDir set to /home/training/Spooling/.
3. Next, prepare the Java code that Spark Streaming will use for the test, and build it into a JavaFlumeEventCount.jar file.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public final class JavaFlumeEventCount {
  private JavaFlumeEventCount() {
  }

  public static void main(String[] args) {
    if (args.length < 3) {
      System.err.println("Usage: JavaFlumeEventCount <host> <port> <batchInterval>");
      System.exit(1);
    }

    String host = args[0];
    int port = Integer.parseInt(args[1]);
    Duration batchInterval = new Duration(Integer.parseInt(args[2]));
    //Duration batchInterval = new Duration(10000);

    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
    //JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "elephant", 11000);
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

    flumeStream.count();

    flumeStream.count().map(new Function<Long, String>() {
      private static final long serialVersionUID = -572435064083746235L;

      public String call(Long in) {
        return "Received " + in + " flume events.";
      }
    }).print();

    ssc.start();
    ssc.awaitTermination();
  }
}
4. Start the Spark master and the Spark worker. Once they are up, as before, the master UI is at http://elephant:8080 and the worker UI at 8081.
[training@elephant spark-1.2.0-bin-hadoop2.3]$ sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /home/training/software/spark-1.2.0-bin-hadoop2.3/sbin/../logs/spark-training-org.apache.spark.deploy.master.Master-1-elephant.out
[training@elephant spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://elephant:7077
5. Run spark-submit. Note that --master, which previously pointed straight at spark://elephant:7077, is switched to local[10] here; when running a Spark Streaming job locally it needs more threads than it has receivers, so plain local or local[1] won't do. See the official documentation, and the sketch after the command below.
[training@elephant spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-submit --class JavaFlumeEventTest --jars lib/spark-assembly-1.2.0-hadoop2.3.0.jar,lib/spark-examples-1.2.0-hadoop2.3.0.jar,lib/guava-18.0.jar --master local[10] /home/training/software/JavaFlumeEventTest.jar elephant 11000 10000
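For completeness, the same choice can also be made in the driver code with SparkConf.setMaster instead of the --master flag. A minimal sketch only; the class name is made up, local[10] is just carried over from the command above, and the body is abbreviated:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class LocalMasterSketch {
    public static void main(String[] args) {
        // local[10] leaves a thread for the Flume receiver and the rest for batch processing
        SparkConf conf = new SparkConf()
                .setAppName("JavaFlumeEventCount")
                .setMaster("local[10]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));
        // ... build the Flume stream and transformations as in step 3, then ssc.start() ...
    }
}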
6. Run the Flume-ng agent.
[training@elephant spark-1.2.0-bin-hadoop2.3]$ flume-ng agent --conf /etc/hadoop/conf --conf-file /etc/hadoop/conf/spark-flumeng.conf --name agent5
7. Since spoolDir is set to /home/training/Spooling/, just drop a file into that folder:
it will automatically be read and sent on to Spark, and the file's extension will be changed to .1.
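One caveat with the spooling-directory source is that a file must not change once Flume starts reading it, so writing somewhere else first and then moving it into place is the safer pattern. A small sketch for dropping a test file in; the file name and contents are made up, and it assumes /home/training and the spooling directory sit on the same filesystem so the move is atomic:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public final class SpoolFileWriter {
    public static void main(String[] args) throws IOException {
        Path spoolDir = Paths.get("/home/training/Spooling");
        // Write the file outside the spooling directory first...
        Path tmp = Files.createTempFile(Paths.get("/home/training"), "events-", ".log");
        Files.write(tmp, "hello flume hello spark\n".getBytes(StandardCharsets.UTF_8));
        // ...then move it in atomically so Flume never sees a half-written file.
        Path target = spoolDir.resolve(tmp.getFileName());
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        System.out.println("Dropped " + target + " into the spooling directory");
    }
}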
8. Next, rewrite the Java code so that it writes the results into HDFS (updated 1/27).
Add saveAsNewAPIHadoopFiles("hdfs://elephant:8020/user/training/input/flumeresult/", "txt", Text.class, Integer.class, (Class<? extends OutputFormat<?, ?>>) TextOutputFormat.class);
import java.nio.charset.Charset;
import java.util.regex.Pattern;

import com.google.common.collect.Lists;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

import scala.Tuple2;

public final class JavaFlumeEventTest {
  private static final Pattern SPACE = Pattern.compile(" ");

  private JavaFlumeEventTest() {
  }

  public static void main(String[] args) {
    String host = args[0];
    int port = Integer.parseInt(args[1]);
    Duration batchInterval = new Duration(Integer.parseInt(args[2]));

    SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlumeEventTest")
        .set("spark.executor.memory", "256m");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

    JavaDStream<String> words = flumeStream.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
      @Override
      public Iterable<String> call(SparkFlumeEvent arg0) throws Exception {
        String body = new String(arg0.event().getBody().array(), Charset.forName("UTF-8"));
        return Lists.newArrayList(SPACE.split(body));
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });

    wordCounts.print();
    wordCounts.saveAsNewAPIHadoopFiles("hdfs://elephant:8020/user/training/input/flumeresult/", "txt",
        Text.class, Integer.class, (Class<? extends OutputFormat<?, ?>>) TextOutputFormat.class);
    //This does not work; the output format must be specified
    //wordCounts.saveAsNewAPIHadoopFiles("hdfs://elephant:8020/user/training/input/flumeresult/", "txt");

    flumeStream.count().map(new Function<Long, String>() {
      @Override
      public String call(Long in) {
        return "Received " + in + " flume events.";
      }
    }).print();

    ssc.start();
    ssc.awaitTermination();
  }
}

9. Since we did not add a condition in the Java code to skip writing when a received batch is empty (honestly I don't know how to do that yet = = ), extra blocks with unreadable content get produced.
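On that point, one possible way to avoid the empty, unreadable output folders would be to check each batch before saving, using foreachRDD instead of saveAsNewAPIHadoopFiles. I have not run this against the setup above, so treat it as a sketch; the per-batch output path built from the batch timestamp is my own choice:

// Extra imports needed in JavaFlumeEventTest for this variant:
// import org.apache.spark.api.java.JavaPairRDD;
// import org.apache.spark.streaming.Time;

// Replace the saveAsNewAPIHadoopFiles(...) call with a per-batch check,
// so that empty batches are never written out:
wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
    if (rdd.count() > 0) {
      // one folder per non-empty batch, named by the batch timestamp
      rdd.saveAsNewAPIHadoopFile(
          "hdfs://elephant:8020/user/training/input/flumeresult/" + time.milliseconds(),
          Text.class, Integer.class, TextOutputFormat.class);
    }
    return null;
  }
});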
10. The Spark terminal output shows which output folders are the correct ones (the batches that actually received data).
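As an alternative to scanning the terminal output, the folders that actually contain data can be listed directly from HDFS. A small sketch using the Hadoop FileSystem API; the class name is made up and the path assumes the same output prefix as above:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class FindNonEmptyResults {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://elephant:8020"), new Configuration());
        // saveAsNewAPIHadoopFiles creates one sub-folder per batch under this prefix
        for (FileStatus batchDir : fs.listStatus(new Path("/user/training/input/flumeresult/"))) {
            long bytes = fs.getContentSummary(batchDir.getPath()).getLength();
            if (bytes > 0) {  // only batches that actually received events contain data
                System.out.println(batchDir.getPath() + " (" + bytes + " bytes)");
            }
        }
    }
}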
Woo-hoo, done!
References:
1. Flume-ng的原理和使用 (Flume-ng principles and usage)
2. Master URLs
3. Spark Streaming + Flume Integration Guide
4. Spark Streaming 结合FlumeNG使用实例 (Spark Streaming with Flume-NG usage example)