Linux培训
达内IT学院

400-996-5531

编写一个Spark应用


编写Spark应用与通过交互式控制台使用Spark类似。API是相同的。首先,你需要访问

使用Spark编写Spark应用的一个基本模板如下:

## Spark Application - execute with spark-submit

## Imports
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "My Spark Application"

## Closure Functions

## Main functionality

def main(sc):
pass

if __name__ == "__main__":

# Configure Spark
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster("local[*]")
sc   = SparkContext(conf=conf)


# Execute Main functionality
main(sc)

这个模板列出了一个Spark应用所需的东西:导入Python库,模块常量,用于调试和Spark UI的可识别的应用名称,还有作为驱动程序运行的一些主要分析方法学。在ifmain中,我们创建了SparkContext,使用了配置好的context执行main。我们可以简单地导入驱动代码到pyspark而不用执行。注意这里Spark配置通过setMaster方法被硬编码到SparkConf,一般你应该允许这个值通过命令行来设置,所以你能看到这行做了占位符注释。

使用<sc.stop()或<sys.exit(0)来关闭或退出程序。

## Spark Application - execute with spark-submit

## Imports
import csv
import matplotlib.pyplot as plt

from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight   = namedtuple('Flight', fields)

## Closure Functions
def parse(row):

"""

Parses a row and returns a named tuple.

"""

row[0]  = datetime.strptime(row[0], DATE_FMT).date()
row[5]  = datetime.strptime(row[5], TIME_FMT).time()
row[6]  = float(row[6])
row[7]  = datetime.strptime(row[7], TIME_FMT).time()
row[8]  = float(row[8])
row[9]  = float(row[9])
row[10] = float(row[10])
return Flight(*row[:11])

def split(line):

"""

Operator function for splitting a line with csv module

"""
reader = csv.reader(StringIO(line))
return reader.next()

def plot(delays):

"""

Show a bar chart of the total delay per airline

"""
airlines = [d[0] for d in delays]
minutes  = [d[1] for d in delays]
index    = list(xrange(len(airlines)))

fig, axe = plt.subplots()
bars = axe.barh(index, minutes)


# Add the total minutes to the right
for idx, air, min in zip(index, airlines, minutes):
if min > 0:
bars[idx].set_color('#d9230f')
axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
else:
bars[idx].set_color('#469408')
axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')


# Set the ticks
ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
xt = plt.xticks()[0]
plt.xticks(xt, [' '] * len(xt))


# minimize chart junk
plt.grid(axis = 'x', color ='white', linestyle='-')

plt.title('Total Minutes Delayed per Airline')
plt.show()

## Main functionality
def main(sc):


# Load the airlines lookup dictionary
airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())


# Broadcast the lookup dictionary to the cluster
airline_lookup = sc.broadcast(airlines)


# Read the CSV Data into an RDD
flights = sc.textFile("ontime/flights.csv").map(split).map(parse)


# Map the total delay to the airline (joined using the broadcast value)
delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                  add(f.dep_delay, f.arv_delay)))


# Reduce the total delay for the month to the airline
delays  = delays.reduceByKey(add).collect()
delays  = sorted(delays, key=itemgetter(1))


# Provide output from the driver
for d in delays:
print "%0.0f minutes delayed\t%s" % (d[1], d[0])


# Show a bar chart of the delays
plot(delays)

if __name__ == "__main__":

# Configure Spark
conf = SparkConf().setMaster("local[*]")
conf = conf.setAppName(APP_NAME)
sc   = SparkContext(conf=conf)


# Execute Main functionality
main(sc)

使用<spark-submit命令来运行这段代码(假设你已有ontime目录,目录中有两个CSV文件):

~$ spark-submit app.py

这个Spark作业使用本机作为master,并搜索app.py同目录下的ontime目录下的2个CSV文件。最终结果显示,4月的总延误时间(单位分钟),既有早点的(如果你从美国大陆飞往夏威夷或者阿拉斯加),但对大部分大型航空公司都是延误的。注意,我们在app.py中使用matplotlib直接将结果可视化出来了:

这段代码做了什么呢?我们特别注意下与Spark最直接相关的main函数。首先,我们加载CSV文件到RDD,然后把split函数映射给它。split函数使用csv模块解析文本的每一行,并返回代表每行的元组。最后,我们将collect动作传给RDD,这个动作把数据以Python列表的形式从RDD传回驱动程序。本例中,airlines.csv是个小型的跳转表(jump table),可以将航空公司代码与全名对应起来。我们将转移表存储为Python字典,然后使用sc.broadcast广播给集群上的每个节点。

接着,main函数加载了数据量更大的flights.csv([译者注]作者笔误写成fights.csv,此处更正)。拆分CSV行完成之后,我们将parse函数映射给CSV行,此函数会把日期和时间转成Python的日期和时间,并对浮点数进行合适的类型转换。每行作为一个NamedTuple保存,名为Flight,以便高效简便地使用。

有了Flight对象的RDD,我们映射一个匿名函数,这个函数将RDD转换为一些列的键值对,其中键是航空公司的名字,值是到达和出发的延误时间总和。使用reduceByKey动作和add操作符可以得到每个航空公司的延误时间总和,然后RDD被传递给驱动程序(数据中航空公司的数目相对较少)。最终延误时间按照升序排列,输出打印到了控制台,并且使用matplotlib进行了可视化。

这个例子稍长,但是希望能演示出集群和驱动程序之间的相互作用(发送数据进行分析,结果取回给驱动程序),以及Python代码在Spark应用中的角色。

预约申请免费试听课

填写下面表单即可预约申请免费试听! 怕学不会?助教全程陪读,随时解惑!担心就业?一地学习,可全国推荐就业!

上一篇:Spark的执行
下一篇:高薪Linux运维工程师的十个基本技能点

Copyright © 2023 Tedu.cn All Rights Reserved 京ICP备08000853号-56 京公网安备 11010802029508号 达内时代科技集团有限公司 版权所有

选择城市和中心
黑龙江省

吉林省

河北省

湖南省

贵州省

云南省

广西省

海南省