To take advantage of Flume's support for many protocols together with Spark Streaming, I tried wiring together components that each already work fine on their own.
1. Install Flume (if yum can't find the package, you'll have to get it some other way).
sudo yum install --assumeyes flume-ng
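As a quick sanity check (not part of the original steps), flume-ng version should print the installed Flume version once the package is in place:
[training@elephant ~]$ flume-ng version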
2. Set up the Flume config file at /etc/hadoop/conf/spark-flumeng.conf
[training@elephant ~]$ sudo vi /etc/hadoop/conf/spark-flumeng.conf
-------------Contents----------------------------------------------------
#Agent5
#List the sources, sinks and channels for the agent
agent5.sources = source1
agent5.sinks = hdfs01
agent5.channels = channel1

#Set the channel for the source and the sink
agent5.sources.source1.channels = channel1
agent5.sinks.hdfs01.channel = channel1

#Properties of the spooling-directory source
agent5.sources.source1.type = spooldir
agent5.sources.source1.spoolDir = /home/training/Spooling/
agent5.sources.source1.ignorePattern = .*(\\.index|\\.tmp|\\.xml)$
agent5.sources.source1.fileSuffix = .1
agent5.sources.source1.fileHeader = true
agent5.sources.source1.fileHeaderKey = filename

#Set interceptors
agent5.sources.source1.interceptors = i1 i2
agent5.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent5.sources.source1.interceptors.i1.preserveExisting = false
agent5.sources.source1.interceptors.i1.hostHeader = elephant
agent5.sources.source1.interceptors.i1.useIP = false
agent5.sources.source1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#Properties of the memory channel
agent5.channels.channel1.type = memory
agent5.channels.channel1.capacity = 100000
agent5.channels.channel1.transactionCapacity = 100000
agent5.channels.channel1.keep-alive = 30

#Properties of the Avro sink (this is the host/port the Spark Streaming receiver will listen on)
agent5.sinks.hdfs01.type = avro
agent5.sinks.hdfs01.hostname = elephant
agent5.sinks.hdfs01.port = 11000
--------------------------------------------------------------------------
Flume uses a spooling-directory source here, with spoolDir set to /home/training/Spooling/.
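Note that the spooling directory has to exist before the agent starts, otherwise the spooldir source fails on startup. If it is not there yet, creating it is enough (the path is simply the spoolDir value from the config above):
[training@elephant ~]$ mkdir -p /home/training/Spooling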
3. Next, prepare the Java code that Spark Streaming will run for the test, and package it as JavaFlumeEventCount.jar.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public final class JavaFlumeEventCount {
  private JavaFlumeEventCount() {
  }

  public static void main(String[] args) {
    if (args.length < 3) {
      System.err.println("Usage: JavaFlumeEventCount <host> <port> <batchIntervalMs>");
      System.exit(1);
    }
    String host = args[0];
    int port = Integer.parseInt(args[1]);
    Duration batchInterval = new Duration(Integer.parseInt(args[2]));
    //Duration batchInterval = new Duration(10000);

    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

    // Receiver that listens on host:port for Avro events pushed by the Flume sink.
    //JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "elephant", 11000);
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

    // Count the events received in each batch and print the result.
    flumeStream.count().map(new Function<Long, String>() {
      private static final long serialVersionUID = -572435064083746235L;
      public String call(Long in) {
        return "Received " + in + " flume events.";
      }
    }).print();

    ssc.start();
    ssc.awaitTermination();
  }
}
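Note that FlumeUtils comes from the separate spark-streaming-flume module, so that artifact has to be on the compile classpath when building the jar. For Spark 1.2.0 with Scala 2.10 the Maven coordinates should be org.apache.spark:spark-streaming-flume_2.10:1.2.0 (see the Spark Streaming + Flume Integration Guide in the references below; double-check the version against your own build).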
4. Start the Spark master and a Spark worker. Once both are up, the master UI is at http://elephant:8080 and the worker UI at http://elephant:8081.
[training@elephant spark-1.2.0-bin-hadoop2.3]$ sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /home/training/software/spark-1.2.0-bin-hadoop2.3/sbin/../logs/spark-training-org.apache.spark.deploy.master.Master-1-elephant.out
[training@elephant spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://elephant:7077
5. Submit the Spark application. Note that --master is local[10] here rather than spark://elephant:7077: the Flume receiver permanently ties up one core, so the application must be given more cores than it has receivers or no batches will ever be processed. See the official documentation (references 2 and 3 below).
[training@elephant spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-submit --class JavaFlumeEventCount --jars lib/spark-assembly-1.2.0-hadoop2.3.0.jar,lib/spark-examples-1.2.0-hadoop2.3.0.jar,lib/guava-18.0.jar --master local[10] /home/training/software/JavaFlumeEventCount.jar elephant 11000 10000
6. Start the flume-ng agent.
[training@elephant spark-1.2.0-bin-hadoop2.3]$ flume-ng agent --conf /etc/hadoop/conf --conf-file /etc/hadoop/conf/spark-flumeng.conf --name agent5
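To watch the agent's activity directly in the terminal, -Dflume.root.logger=INFO,console can be appended to the command above so Flume logs to the console instead of its log file.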
7. Because spoolDir is /home/training/Spooling/, any file dropped into that directory is picked up automatically: Flume reads it, sends the contents on to Spark, and renames the file by appending the .1 suffix.
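For example, copying any plain-text file into the directory is enough to trigger a batch (the file used here is arbitrary):
[training@elephant ~]$ cp /etc/hosts /home/training/Spooling/
Once Flume has ingested it, the file shows up as hosts.1 in the same directory.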
8. Next, rewrite the Java code so the results are written into HDFS (updated 1/27), by adding:
saveAsNewAPIHadoopFiles("hdfs://elephant:8020/user/training/input/flumeresult/", "txt", Text.class, Integer.class, (Class<? extends OutputFormat<?, ?>>) TextOutputFormat.class);
import java.nio.charset.Charset;
import java.util.regex.Pattern;
import com.google.common.collect.Lists;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import scala.Tuple2;

public final class JavaFlumeEventTest {
  private static final Pattern SPACE = Pattern.compile(" ");

  private JavaFlumeEventTest() {
  }

  public static void main(String[] args) {
    String host = args[0];
    int port = Integer.parseInt(args[1]);
    Duration batchInterval = new Duration(Integer.parseInt(args[2]));

    SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlumeEventTest")
        .set("spark.executor.memory", "256m");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

    // Receiver that listens for Avro events pushed by the Flume sink.
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

    // Split each event body into words.
    JavaDStream<String> words = flumeStream.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
      @Override
      public Iterable<String> call(SparkFlumeEvent event) throws Exception {
        String body = new String(event.event().getBody().array(), Charset.forName("UTF-8"));
        return Lists.newArrayList(SPACE.split(body));
      }
    });

    // Word count over each batch.
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });

    wordCounts.print();
    // Write the counts of every batch to HDFS.
    wordCounts.saveAsNewAPIHadoopFiles("hdfs://elephant:8020/user/training/input/flumeresult/", "txt",
        Text.class, Integer.class, (Class<? extends OutputFormat<?, ?>>) TextOutputFormat.class);
    // This doesn't work -- an OutputFormat must be specified:
    //wordCounts.saveAsNewAPIHadoopFiles("hdfs://elephant:8020/user/training/input/flumeresult/", "txt");

    flumeStream.count().map(new Function<Long, String>() {
      @Override
      public String call(Long in) {
        return "Received " + in + " flume events.";
      }
    }).print();

    ssc.start();
    ssc.awaitTermination();
  }
}
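To see what actually landed in HDFS, list the output prefix; each batch produces its own directory whose name contains the batch timestamp, so the exact names will differ from run to run:
[training@elephant ~]$ hdfs dfs -ls /user/training/input/flumeresult/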
9. Since the Java code has no condition to skip writing when a batch is empty (honestly, I don't know how to do that yet), extra blocks with no readable content end up in HDFS; one possible workaround is sketched below.
10. The Spark terminal output shows which of the output directories actually contain data.
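A possible way to skip the empty batches (just a sketch, not verified in this setup): replace the saveAsNewAPIHadoopFiles call with foreachRDD and only save a batch when its RDD is non-empty. This assumes the same wordCounts stream as above and additionally needs org.apache.spark.api.java.JavaPairRDD and org.apache.spark.streaming.Time on the import list.

// Sketch only: write each non-empty batch to its own timestamped directory under the same prefix.
wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
    if (rdd.count() > 0) {  // skip empty batches so no unreadable output is written
      rdd.saveAsNewAPIHadoopFile(
          "hdfs://elephant:8020/user/training/input/flumeresult/" + time.milliseconds(),
          Text.class, Integer.class, TextOutputFormat.class);
    }
    return null;
  }
});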
Phew, all done!
References:
1. Flume-NG principles and usage (in Chinese)
2. Master URLs
3. Spark Streaming + Flume Integration Guide
4. Spark Streaming with Flume-NG usage example (in Chinese)