It turns out you don't actually have to go through all that~
So just download the pre-built version and you're done~~ hahaha~
Environment:
4 CentOS VMs with Hadoop High Availability
Spark-1.2.0-bin-hadoop2.3
1. Download Spark-1.2.0-bin-hadoop2.3 and extract it
2. In Spark's conf directory, copy spark-env.sh.template to spark-env.sh
[training@elephant conf]$ cp spark-env.sh.template spark-env.sh
3. Edit spark-env.sh to cap the worker memory. Other settings can go in here too, but I haven't learned them yet (a few common ones are sketched below).
[training@elephant conf]$ vi spark-env.sh
export SPARK_WORKER_MEMORY=512M
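For reference, spark-env.sh accepts quite a few other knobs in Spark 1.x standalone mode. A small sketch of commonly used ones; the values below are illustrative only, not what this walkthrough uses:

export SPARK_MASTER_IP=elephant        # host/IP the standalone master binds to
export SPARK_WORKER_CORES=1            # CPU cores each worker offers to applications
export SPARK_WORKER_INSTANCES=1        # number of worker processes per machine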
4. Start the Spark master
[training@elephant Spark-1.2.0-bin-hadoop2.3]$ sbin/start-master.sh
The Spark master web UI can then be seen at http://elephant:8080/~
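If you need to shut the master down again later, the matching script that ships with Spark 1.x should do it (not used in this walkthrough):

sbin/stop-master.sh        # stops the standalone master started by start-master.sh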
5. Start a worker node: ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
[training@elephant Spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://elephant:7077
The worker web UI can then be seen at http://elephant:8081/~
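Starting each worker by hand like this gets tedious with four VMs. Spark 1.x also ships cluster scripts that read a host list; a minimal sketch, assuming passwordless SSH from the master to every worker host (the hostnames below are placeholders, not the actual VM names):

# conf/slaves -- one worker hostname per line (placeholder names)
worker1
worker2
worker3

# then, from the Spark directory on the master:
sbin/start-slaves.sh       # launches a Worker on every host listed in conf/slaves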
6. Upload a text file to the input folder on HDFS and confirm it succeeded
[training@elephant Spark-1.2.0-bin-hadoop2.3]$ hadoop fs -put testdoc.txt input
[training@elephant Spark-1.2.0-bin-hadoop2.3]$ hadoop fs -ls input
Found 1 item
-rw-r--r--   3 training supergroup         23 2015-01-15 22:36 input/testdoc.txt
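To double-check the file contents rather than just the listing, it can also be printed straight from HDFS:

[training@elephant Spark-1.2.0-bin-hadoop2.3]$ hadoop fs -cat input/testdoc.txt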
7. Build the Java version of WordCount.jar (you need Eclipse plus spark-assembly-1.2.0-hadoop2.3.0.jar from the lib directory of Spark-1.2.0-bin-hadoop2.3; a command-line build alternative is sketched after the code)
Java code
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCount {

    private static final Pattern SPACE = Pattern.compile(" ");

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: JavaWordCount <file>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaWordCount")
                .set("spark.executor.memory", "256m");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        // Read the input file (HDFS or local path) as an RDD of lines
        JavaRDD<String> lines = ctx.textFile(args[0], 1);

        // Split each line on spaces into individual words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(SPACE.split(s));
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts for each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Collect the results back to the driver and print them
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + " : " + tuple._2());
        }
        ctx.stop();
    }
}
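If you would rather skip Eclipse, the jar can also be built from the command line. A rough sketch, assuming WordCount.java sits in the current directory and you are inside the unpacked Spark-1.2.0-bin-hadoop2.3 directory (paths may differ on your machine):

javac -cp lib/spark-assembly-1.2.0-hadoop2.3.0.jar WordCount.java
jar cf WordCount.jar WordCount*.class    # WordCount*.class also picks up the anonymous inner classes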
8. Run it from another terminal~ the input source can be either HDFS or a local file
[training@elephant spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-submit --class WordCount --master spark://elephant:7077 /home/training/software/WordCount.jar hdfs://elephant:8020/user/training/input/testdoc.txt

...or...

[training@elephant spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-submit --class WordCount --master spark://elephant:7077 /home/training/software/WordCount.jar ~/training_materials/admin/data/testdoc.txt

...
15/01/16 03:15:53 INFO DAGScheduler: Job 0 finished: collect at WordCount.java:55, took 5.987810 s
have : 1
love : 1
a : 1
I : 1
lot : 1
! : 1
of : 1
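Instead of hard-coding spark.executor.memory in the SparkConf, the same setting can also be passed at launch time through a standard spark-submit option; a sketch of the equivalent launch:

[training@elephant spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-submit --class WordCount --master spark://elephant:7077 --executor-memory 256m /home/training/software/WordCount.jar hdfs://elephant:8020/user/training/input/testdoc.txt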
9. Modify the Java code: before ctx.stop();, add counts.saveAsTextFile("hdfs://elephant:8020/user/training/input/result");
The only change is right before ctx.stop(); the tail of main() now looks like this (everything above it is identical to the code in step 7):

        // Collect the results back to the driver and print them
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + " : " + tuple._2());
        }

        // New line: also write the counts back to HDFS as text
        counts.saveAsTextFile("hdfs://elephant:8020/user/training/input/result");

        ctx.stop();

10. Re-export the jar and run it again~ then you can find the result on HDFS!
[training@elephant spark-1.2.0-bin-hadoop2.3]$ hadoop fs -ls input/result
Found 2 items
-rw-r--r--   3 training supergroup          0 2015-01-16 03:15 input/result/_SUCCESS
-rw-r--r--   3 training supergroup         51 2015-01-16 03:15 input/result/part-00000
[training@elephant spark-1.2.0-bin-hadoop2.3]$ hadoop fs -cat input/result/part-00000
(have,1)
(love,1)
(a,1)
(I,1)
(lot,1)
(!,1)
(of,1)

Success~~~
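One thing to watch out for: saveAsTextFile fails if the output path already exists, so before re-running the job, delete the old result directory with the standard HDFS shell command:

[training@elephant spark-1.2.0-bin-hadoop2.3]$ hadoop fs -rm -r input/result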