更多课程 选择中心

Linux培训
美国上市教育机构

400-111-8989

Linux培训

Spark在Yarn上运行Wordcount程序

  • 发布:linux培训
  • 来源:网络
  • 时间:2015-07-03 21:04

我们在接触Hadoop的时候,第一个列子一般是运行Wordcount程序,在Spark我们可以用Java代码写一个Wordcount程序并部署在Yarn上运行。我们知道,在Spark源码中就存在一个用Java编写好的JavaWordCount程序,源码如下:

查看源代码打印帮助01package org.apache.spark.examples;

02

03import org.apache.spark.api.java.JavaPairRDD;

04import org.apache.spark.api.java.JavaRDD;

05import org.apache.spark.api.java.JavaSparkContext;

06import org.apache.spark.api.java.function.FlatMapFunction;

07import org.apache.spark.api.java.function.Function2;

08import org.apache.spark.api.java.function.PairFunction;

09import scala.Tuple2;

10

11import java.util.Arrays;

12import java.util.List;

13import java.util.regex.Pattern;

14

15public final class JavaWordCount {

16 private static final Pattern SPACE = Pattern.compile(" ");

17

18 public static void main(String[] args) throws Exception {

19 if (args.length < 2) {

20 System.err.println("Usage: JavaWordCount ");

21 System.exit(1);

22 }

23

24 JavaSparkContext ctx = new JavaSparkContext(args[0],

25 "JavaWordCount",

26 System.getenv("SPARK_HOME"),

27 JavaSparkContext.jarOfClass(JavaWordCount.class));

28 JavaRDD lines = ctx.textFile(args[1], 1);

29

30 JavaRDD words = lines.flatMap(

31 new FlatMapFunction() {

32 @Override

33 public Iterable call(String s) {

34 return Arrays.asList(SPACE.split(s));

35 }

36 });

37

38 JavaPairRDD ones = words.map(

39 new PairFunction() {

40 @Override

41 public Tuple2 call(String s) {

42 return new Tuple2(s, 1);

43 }

44 });

45

46 JavaPairRDD counts = ones.reduceByKey(

47 new Function2() {

48 @Override

49 public Integer call(Integer i1, Integer i2) {

50 return i1 + i2;

51 }

52 });

53

54 List> output = counts.collect();

55 for (Tuple2 tuple : output) {

56 System.out.println(tuple._1() + ": " + tuple._2());

57 }

58 System.exit(0);

59 }

60}

这里有必要介绍一下这里用到的几个函数。首先是map函数,它根据现有的数据集返回一个新的分布式数据集,由每个原元素经过func函数转换后组成,这个过程一般叫做转换(transformation);flatMap函数类似于map函数,但是每一个输入元素,会被映射为0到多个输出元素,因此,func函数的返回值是一个Seq,而不是单一元素,可以从上面的代码中看出;reduceByKey函数在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。

运行上面的代码之前你得先编译好(话说我好几次用Maven编译老是不成功啊,不过大家可以用./sbt/sbt assembly进行编译)。编译好之后可以用下面的命令进行运行:

1./bin/spark-class \

2org.apache.spark.deploy.yarn.Client \

3--jar ./jars/spark-examples-assembly-0.9.1.jar \

4--class org.apache.spark.examples.JavaWordCount \

5--args yarn-standalone \

6--args /home/wyp/cite75_99.txt \

org.apache.spark.examples.JavaWordCount类接收两个参数,第一个参数指定你程序运行的master;第二个参数指定你需要计算Wordcount文件的绝对路径,这个文件需要在HDFS上。程序运行的过程中我们可以在Hadoop的WEB UI上进行查看,程序运行完的时候,可以在logs里面看到运行的结果,类似下面:

01submitting: 1

02find: 1

03versions: 4

04Regression: 1

05via: 2

06tests: 2

07open: 2

08./bin/spark-shell: 1

09When: 1

10All: 1

11download: 1

12requires: 2

13SPARK_YARN=true: 3

14Testing: 1

15take: 1

16project: 4

17no: 1

18systems.: 1

19file: 1

20Or,: 1

21About: 1

22project's: 3

23programs: 2

24given.: 1

25obtained: 1

26sbt/sbt: 5

27artifact: 1

28SBT: 1

29local[2]: 1

30not: 1

31runs.: 1

32you: 5

33building: 1

当然,程序默认的输出直接输到logs里面去了,我们可以将结果输出到文本里面,修改如下:

1counts.saveAsTextFile("/home/wyp/result");

2

3或者:

4

5counts.saveAsHadoopFile("/home/wyp/result",

6 Text.class,

7 IntWritable.class,

8 TextOutputFormat.class);

上面的两行代码都可以将计算的结果存储到HDFS上的/home/wyp/result文件夹里面,但是两者输出来的结果内容格式是有区别的,第一种输出内容格式如下:

01(5,5)

02(1,1)

03(15,1)

04(7,6)

05(11,5)

06(14,2)

07(3,3)

08(8,6)

09(6,6)

10(12,4)

11(4,4)

12(10,6)

13(13,3)

14(2,2)

15(9,6)

格式是(key, value)的;第二种输出内容格式如下:

015 5

021 1

0315 1

047 6

0511 5

0614 2

073 3

088 6

096 6

1012 4

114 4

1210 6

1313 3

142 2

159 6

格式是key value。我们可以根据自己的需要定义一个自己的输出格式,而且我们在输出的时候如果文件比较大,还可以指定输出文件的压缩方式。

预约申请免费试听课

填写下面表单即可预约申请免费试听!怕钱不够?可就业挣钱后再付学费! 怕学不会?助教全程陪读,随时解惑!担心就业?一地学习,可全国推荐就业!

上一篇:MapReduce:详细介绍Shuffle的执行过程
下一篇:Hadoop元数据合并异常及解决方法

Spark在Yarn上运行Wordcount程序

Spark和Hadoop作业之间的区别

选择城市和中心
黑龙江省

吉林省

河北省

湖南省

贵州省

云南省

广西省

海南省