Linux培训
达内IT学院
400-996-5531
编写Spark应用与通过交互式控制台使用Spark类似。API是相同的。首先,你需要访问
使用Spark编写Spark应用的一个基本模板如下:
## Spark Application - execute with spark-submit ## Imports from pyspark import SparkConf, SparkContext ## Module Constants APP_NAME = "My Spark Application" ## Closure Functions ## Main functionality def main(sc): pass if __name__ == "__main__": # Configure Spark conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster("local[*]") sc = SparkContext(conf=conf) # Execute Main functionality main(sc)
这个模板列出了一个Spark应用所需的东西:导入Python库,模块常量,用于调试和Spark UI的可识别的应用名称,还有作为驱动程序运行的一些主要分析方法学。在ifmain中,我们创建了SparkContext,使用了配置好的context执行main。我们可以简单地导入驱动代码到pyspark而不用执行。注意这里Spark配置通过setMaster方法被硬编码到SparkConf,一般你应该允许这个值通过命令行来设置,所以你能看到这行做了占位符注释。
使用<sc.stop()或<sys.exit(0)来关闭或退出程序。
## Spark Application - execute with spark-submit ## Imports import csv import matplotlib.pyplot as plt from StringIO import StringIO from datetime import datetime from collections import namedtuple from operator import add, itemgetter from pyspark import SparkConf, SparkContext ## Module Constants APP_NAME = "Flight Delay Analysis" DATE_FMT = "%Y-%m-%d" TIME_FMT = "%H%M" fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep', 'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance') Flight = namedtuple('Flight', fields) ## Closure Functions def parse(row): """ Parses a row and returns a named tuple. """ row[0] = datetime.strptime(row[0], DATE_FMT).date() row[5] = datetime.strptime(row[5], TIME_FMT).time() row[6] = float(row[6]) row[7] = datetime.strptime(row[7], TIME_FMT).time() row[8] = float(row[8]) row[9] = float(row[9]) row[10] = float(row[10]) return Flight(*row[:11]) def split(line): """ Operator function for splitting a line with csv module """ reader = csv.reader(StringIO(line)) return reader.next() def plot(delays): """ Show a bar chart of the total delay per airline """ airlines = [d[0] for d in delays] minutes = [d[1] for d in delays] index = list(xrange(len(airlines))) fig, axe = plt.subplots() bars = axe.barh(index, minutes) # Add the total minutes to the right for idx, air, min in zip(index, airlines, minutes): if min > 0: bars[idx].set_color('#d9230f') axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center') else: bars[idx].set_color('#469408') axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center') # Set the ticks ticks = plt.yticks([idx+ 0.5 for idx in index], airlines) xt = plt.xticks()[0] plt.xticks(xt, [' '] * len(xt)) # minimize chart junk plt.grid(axis = 'x', color ='white', linestyle='-') plt.title('Total Minutes Delayed per Airline') plt.show() ## Main functionality def main(sc): # Load the airlines lookup dictionary airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect()) # Broadcast the lookup dictionary to the cluster airline_lookup = sc.broadcast(airlines) # Read the CSV Data into an RDD flights = sc.textFile("ontime/flights.csv").map(split).map(parse) # Map the total delay to the airline (joined using the broadcast value) delays = flights.map(lambda f: (airline_lookup.value[f.airline], add(f.dep_delay, f.arv_delay))) # Reduce the total delay for the month to the airline delays = delays.reduceByKey(add).collect() delays = sorted(delays, key=itemgetter(1)) # Provide output from the driver for d in delays: print "%0.0f minutes delayed\t%s" % (d[1], d[0]) # Show a bar chart of the delays plot(delays) if __name__ == "__main__": # Configure Spark conf = SparkConf().setMaster("local[*]") conf = conf.setAppName(APP_NAME) sc = SparkContext(conf=conf) # Execute Main functionality main(sc)
使用<spark-submit命令来运行这段代码(假设你已有ontime目录,目录中有两个CSV文件): ~$ spark-submit app.py
这个Spark作业使用本机作为master,并搜索app.py同目录下的ontime目录下的2个CSV文件。最终结果显示,4月的总延误时间(单位分钟),既有早点的(如果你从美国大陆飞往夏威夷或者阿拉斯加),但对大部分大型航空公司都是延误的。注意,我们在app.py中使用matplotlib直接将结果可视化出来了:
这段代码做了什么呢?我们特别注意下与Spark最直接相关的main函数。首先,我们加载CSV文件到RDD,然后把split函数映射给它。split函数使用csv模块解析文本的每一行,并返回代表每行的元组。最后,我们将collect动作传给RDD,这个动作把数据以Python列表的形式从RDD传回驱动程序。本例中,airlines.csv是个小型的跳转表(jump table),可以将航空公司代码与全名对应起来。我们将转移表存储为Python字典,然后使用sc.broadcast广播给集群上的每个节点。
接着,main函数加载了数据量更大的flights.csv([译者注]作者笔误写成fights.csv,此处更正)。拆分CSV行完成之后,我们将parse函数映射给CSV行,此函数会把日期和时间转成Python的日期和时间,并对浮点数进行合适的类型转换。每行作为一个NamedTuple保存,名为Flight,以便高效简便地使用。
有了Flight对象的RDD,我们映射一个匿名函数,这个函数将RDD转换为一些列的键值对,其中键是航空公司的名字,值是到达和出发的延误时间总和。使用reduceByKey动作和add操作符可以得到每个航空公司的延误时间总和,然后RDD被传递给驱动程序(数据中航空公司的数目相对较少)。最终延误时间按照升序排列,输出打印到了控制台,并且使用matplotlib进行了可视化。
这个例子稍长,但是希望能演示出集群和驱动程序之间的相互作用(发送数据进行分析,结果取回给驱动程序),以及Python代码在Spark应用中的角色。
填写下面表单即可预约申请免费试听! 怕学不会?助教全程陪读,随时解惑!担心就业?一地学习,可全国推荐就业!
Copyright © 京ICP备08000853号-56 京公网安备 11010802029508号 达内时代科技集团有限公司 版权所有
Tedu.cn All Rights Reserved