Linux培训
达内IT学院
400-996-5531
我们在接触Hadoop的时候,第一个列子一般是运行Wordcount程序,在Spark我们可以用Java代码写一个Wordcount程序并部署在Yarn上运行。我们知道,在Spark源码中就存在一个用Java编写好的JavaWordCount程序,源码如下:
查看源代码打印帮助01package org.apache.spark.examples;
02
03import org.apache.spark.api.java.JavaPairRDD;
04import org.apache.spark.api.java.JavaRDD;
05import org.apache.spark.api.java.JavaSparkContext;
06import org.apache.spark.api.java.function.FlatMapFunction;
07import org.apache.spark.api.java.function.Function2;
08import org.apache.spark.api.java.function.PairFunction;
09import scala.Tuple2;
10
11import java.util.Arrays;
12import java.util.List;
13import java.util.regex.Pattern;
14
15public final class JavaWordCount {
16 private static final Pattern SPACE = Pattern.compile(" ");
17
18 public static void main(String[] args) throws Exception {
19 if (args.length < 2) {
20 System.err.println("Usage: JavaWordCount ");
21 System.exit(1);
22 }
23
24 JavaSparkContext ctx = new JavaSparkContext(args[0],
25 "JavaWordCount",
26 System.getenv("SPARK_HOME"),
27 JavaSparkContext.jarOfClass(JavaWordCount.class));
28 JavaRDD lines = ctx.textFile(args[1], 1);
29
30 JavaRDD words = lines.flatMap(
31 new FlatMapFunction() {
32 @Override
33 public Iterable call(String s) {
34 return Arrays.asList(SPACE.split(s));
35 }
36 });
37
38 JavaPairRDD ones = words.map(
39 new PairFunction() {
40 @Override
41 public Tuple2 call(String s) {
42 return new Tuple2(s, 1);
43 }
44 });
45
46 JavaPairRDD counts = ones.reduceByKey(
47 new Function2() {
48 @Override
49 public Integer call(Integer i1, Integer i2) {
50 return i1 + i2;
51 }
52 });
53
54 List> output = counts.collect();
55 for (Tuple2 tuple : output) {
56 System.out.println(tuple._1() + ": " + tuple._2());
57 }
58 System.exit(0);
59 }
60}
这里有必要介绍一下这里用到的几个函数。首先是map函数,它根据现有的数据集返回一个新的分布式数据集,由每个原元素经过func函数转换后组成,这个过程一般叫做转换(transformation);flatMap函数类似于map函数,但是每一个输入元素,会被映射为0到多个输出元素,因此,func函数的返回值是一个Seq,而不是单一元素,可以从上面的代码中看出;reduceByKey函数在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。
运行上面的代码之前你得先编译好(话说我好几次用Maven编译老是不成功啊,不过大家可以用./sbt/sbt assembly进行编译)。编译好之后可以用下面的命令进行运行:
1./bin/spark-class \
2org.apache.spark.deploy.yarn.Client \
3--jar ./jars/spark-examples-assembly-0.9.1.jar \
4--class org.apache.spark.examples.JavaWordCount \
5--args yarn-standalone \
6--args /home/wyp/cite75_99.txt \
org.apache.spark.examples.JavaWordCount类接收两个参数,第一个参数指定你程序运行的master;第二个参数指定你需要计算Wordcount文件的绝对路径,这个文件需要在HDFS上。程序运行的过程中我们可以在Hadoop的WEB UI上进行查看,程序运行完的时候,可以在logs里面看到运行的结果,类似下面:
01submitting: 1
02find: 1
03versions: 4
04Regression: 1
05via: 2
06tests: 2
07open: 2
08./bin/spark-shell: 1
09When: 1
10All: 1
11download: 1
12requires: 2
13SPARK_YARN=true: 3
14Testing: 1
15take: 1
16project: 4
17no: 1
18systems.: 1
19file: 1
20Or,: 1
21About: 1
22project's: 3
23programs: 2
24given.: 1
25obtained: 1
26sbt/sbt: 5
27artifact: 1
28SBT: 1
29local[2]: 1
30not: 1
31runs.: 1
32you: 5
33building: 1
当然,程序默认的输出直接输到logs里面去了,我们可以将结果输出到文本里面,修改如下:
1counts.saveAsTextFile("/home/wyp/result");
2
3或者:
4
5counts.saveAsHadoopFile("/home/wyp/result",
6 Text.class,
7 IntWritable.class,
8 TextOutputFormat.class);
上面的两行代码都可以将计算的结果存储到HDFS上的/home/wyp/result文件夹里面,但是两者输出来的结果内容格式是有区别的,第一种输出内容格式如下:
01(5,5)
02(1,1)
03(15,1)
04(7,6)
05(11,5)
06(14,2)
07(3,3)
08(8,6)
09(6,6)
10(12,4)
11(4,4)
12(10,6)
13(13,3)
14(2,2)
15(9,6)
格式是(key, value)的;第二种输出内容格式如下:
015 5
021 1
0315 1
047 6
0511 5
0614 2
073 3
088 6
096 6
1012 4
114 4
1210 6
1313 3
142 2
159 6
格式是key value。我们可以根据自己的需要定义一个自己的输出格式,而且我们在输出的时候如果文件比较大,还可以指定输出文件的压缩方式。
填写下面表单即可预约申请免费试听! 怕学不会?助教全程陪读,随时解惑!担心就业?一地学习,可全国推荐就业!
Copyright © 京ICP备08000853号-56 京公网安备 11010802029508号 达内时代科技集团有限公司 版权所有
Tedu.cn All Rights Reserved