Friday, January 16, 2015

Use Spark to Count Words and Write the Output to HDFS

In the last post I installed Apache Maven 3.2.5, originally planning to build Spark myself.
It turns out that isn't necessary,
so I just downloaded the pre-built version instead. Hahaha!

Environment:
4 CentOS VMs with Hadoop High Availability
spark-1.2.0-bin-hadoop2.3



1. Download spark-1.2.0-bin-hadoop2.3 and extract it



2. In Spark's conf directory, copy spark-env.sh.template to spark-env.sh
[training@elephant conf]$ cp spark-env.sh.template spark-env.sh 

3. Edit spark-env.sh to limit the worker memory. You can add other settings here too, but I haven't learned them yet.
[training@elephant conf]$ vi spark-env.sh 
export SPARK_WORKER_MEMORY=512M 
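Besides SPARK_WORKER_MEMORY, a few other common spark-env.sh settings for standalone mode look like this. The values are just a sketch for a small 4-VM setup like this one, not settings from the original post:

```shell
# spark-env.sh -- standalone-mode settings (example values, adjust to your cluster)
export SPARK_WORKER_MEMORY=512M   # total memory a worker can hand out to executors
export SPARK_WORKER_CORES=1       # number of cores each worker offers
export SPARK_MASTER_IP=elephant   # host the master binds to (Spark 1.x setting name)
export SPARK_MASTER_PORT=7077     # port used in spark:// master URLs
```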

4. Start the Spark master
[training@elephant Spark-1.2.0-bin-hadoop2.3]$ sbin/start-master.sh 
The Spark master web UI is now visible at http://elephant:8080/



5. Start a worker node: ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
[training@elephant Spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://elephant:7077 
The worker web UI is now visible at http://elephant:8081/
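Starting each worker by hand works, but with 4 VMs it can be easier to use Spark's bundled standalone scripts: list the worker hostnames in conf/slaves, then start them all from the master. This is just a sketch, not from the original setup, and the hostnames are assumed examples:

```shell
# conf/slaves -- one worker hostname per line, for example:
#   elephant
#   tiger
# Then, from the Spark directory on the master node:
sbin/start-slaves.sh   # ssh to each host in conf/slaves and start a Worker there
sbin/stop-slaves.sh    # stop all of those Workers again
```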



6. Upload a text file to the input directory on HDFS and confirm it is there
[training@elephant Spark-1.2.0-bin-hadoop2.3]$ hadoop fs -put testdoc.txt input 
[training@elephant Spark-1.2.0-bin-hadoop2.3]$ hadoop fs -ls input 
Found 1 item
-rw-r--r--   3 training supergroup         23 2015-01-15 22:36 input/testdoc.txt

7. Build the Java WordCount.jar (you need Eclipse, plus spark-assembly-1.2.0-hadoop2.3.0.jar from the lib directory of spark-1.2.0-bin-hadoop2.3 on the build path)
Java code:

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;


public class WordCount {
 
 private static final Pattern SPACE = Pattern.compile(" ");

 @SuppressWarnings("serial")
 public static void main(String[] args) throws Exception {
  if (args.length < 1) {
   System.err.println("Usage: JavaWordCount <file>");
   System.exit(1);
  }

  SparkConf sparkConf = new SparkConf()
  .setAppName("JavaWordCount")
  .set("spark.executor.memory", "256m");
  JavaSparkContext ctx = new JavaSparkContext(sparkConf);
  JavaRDD<String> lines = ctx.textFile(args[0], 1);

  JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
     @Override
     public Iterable<String> call(String s) {
      return Arrays.asList(SPACE.split(s));
     }
    });

  JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
     @Override
     public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
     }
    });

  JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
     @Override
     public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
     }
    });
  
  List<Tuple2<String, Integer>> output = counts.collect();
  
  for (Tuple2<?, ?> tuple : output) {
   System.out.println(tuple._1() + " : " + tuple._2());
  }
  ctx.stop();
 }
}
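The heart of the job is the split on SPACE in flatMap and the pairwise addition in reduceByKey. The same logic can be sketched in plain Java on one machine, with no Spark at all, to check what the job computes; the input string here is just a stand-in for the HDFS file contents:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class LocalWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    // Same tokenize-then-merge logic as the Spark job, but local:
    // flatMap -> split each line on spaces; reduceByKey -> sum per word.
    public static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : text.split("\n")) {
            for (String word : SPACE.split(line)) {
                if (word.isEmpty()) continue;         // skip empty tokens
                counts.merge(word, 1, Integer::sum);  // the reduceByKey step
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> e : count("I have a lot of love !").entrySet()) {
            System.out.println(e.getKey() + " : " + e.getValue());
        }
    }
}
```

Spark does exactly this, except each partition is counted on a worker and reduceByKey merges the partial maps across the cluster.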

8. Run the job from another terminal. The input can be a file on HDFS or a local file.
[training@elephant spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-submit --class WordCount --master spark://elephant:7077 /home/training/software/WordCount.jar hdfs://elephant:8020/user/training/input/testdoc.txt

...or....

[training@elephant spark-1.2.0-bin-hadoop2.3]$ ./bin/spark-submit --class WordCount --master spark://elephant:7077 /home/training/software/WordCount.jar ~/training_materials/admin/data/testdoc.txt

...
15/01/16 03:15:53 INFO DAGScheduler: Job 0 finished: collect at WordCount.java:55, took 5.987810 s
have : 1
love : 1
a : 1
I : 1
lot : 1
! : 1
of : 1



9. Modify the Java code: right before ctx.stop();, add counts.saveAsTextFile("hdfs://elephant:8020/user/training/input/result");
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;


public class WordCount {
 
 private static final Pattern SPACE = Pattern.compile(" ");

 @SuppressWarnings("serial")
 public static void main(String[] args) throws Exception {
  if (args.length < 1) {
   System.err.println("Usage: JavaWordCount <file>");
   System.exit(1);
  }

  SparkConf sparkConf = new SparkConf()
  .setAppName("JavaWordCount")
  .set("spark.executor.memory", "256m");
  JavaSparkContext ctx = new JavaSparkContext(sparkConf);
  JavaRDD<String> lines = ctx.textFile(args[0], 1);

  JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
     @Override
     public Iterable<String> call(String s) {
      return Arrays.asList(SPACE.split(s));
     }
    });

  JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
     @Override
     public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
     }
    });

  JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
     @Override
     public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
     }
    });
  
  List<Tuple2<String, Integer>> output = counts.collect();
  
  for (Tuple2<?, ?> tuple : output) {
   System.out.println(tuple._1() + " : " + tuple._2());
  }
  counts.saveAsTextFile("hdfs://elephant:8020/user/training/input/result");
  ctx.stop();
 }
}
10. Export the jar again and re-run the job. The result can now be found on HDFS!
[training@elephant spark-1.2.0-bin-hadoop2.3]$ hadoop fs -ls input/result
Found 2 items
-rw-r--r--   3 training supergroup          0 2015-01-16 03:15 input/result/_SUCCESS
-rw-r--r--   3 training supergroup         51 2015-01-16 03:15 input/result/part-00000
[training@elephant spark-1.2.0-bin-hadoop2.3]$ hadoop fs -cat input/result/part-00000
(have,1)
(love,1)
(a,1)
(I,1)
(lot,1)
(!,1)
(of,1)
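Each line in part-00000 is just the toString() of a Tuple2<String, Integer>, which renders as (first,second). A quick plain-Java sketch of that formatting, using a couple of the words from the output above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ResultFormat {
    // Mimics how saveAsTextFile renders a Tuple2<String,Integer>:
    // Tuple2.toString() produces "(first,second)".
    public static String asLine(String word, int count) {
        return "(" + word + "," + count + ")";
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("have", 1);
        counts.put("love", 1);
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(asLine(e.getKey(), e.getValue()));
        }
    }
}
```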
Success!