Python3:Python+spark编程实战总结不宜妄⾃菲薄,引喻失义。
0、前提
0.1 配置
有关spark
说明:
spark2.0.1的版本还不⽀持Python3.6
安装注意版本
可参考:
⼀、实例分析
1.1 数据
1.2 代码
#studentExample 例⼦练习
def map_func(x):
s = x.split()
return (s[0], [int(s[1]),int(s[2]),int(s[3])]) #返回为(key,vaklue)格式,其中key:x[0],value:x[1]且为有三个元素的列表    #return (s[0],[int(s[1],s[2],s[3])])  #注意此⽤法不合法
def has100(x):
for y in x:
if(y == 100):  #把x、y理解为 x轴、y轴
return True
return False
def allis0(x):
sql2000 win10下载
if(type(x)==list and sum(x) == 0): #类型为list且总分为0 者为true;其中type(x)==list :判断类型是否相同
return True
return False
def subMax(x,y):
m = [x[1][i] if(x[1][i] > y[1][i]) else y[1][i] for i in range(3)]
threading模块
return('Maximum subject score', m)
def sumSub(x,y):
n = [x[1][i]+y[1][i] for i in range(3)]
#或者 n = ([x[1][0]+y[1][0],x[1][1]+y[1][0],x[1][2]+y[1][2]])
return('Total subject score', n)
def sumPer(x):
return (x[0],sum(x[1]))
#停⽌之前的SparkContext,不然重新运⾏或者创建⼯作会失败;另外,只有 sc.stop()也可以,但是⾸次运⾏会有误try:
sc.stop()
sc.stop()
except:
pass
from pyspark import SparkContext  #导⼊模块
sc=SparkContext(appName='Student')  #命名
前端样式框架File("").map(lambda x:map_func(x)).cache() #导⼊数据且保持在内存中,其中cache():数据保持在内存中unt()  #对RDD中的数据个数进⾏计数;其中,RDD⼀⾏为⼀个数据集
#RDD'转换'运算(筛选关键字filter)
whohas100 = lines.filter(lambda x: has100(x[1])).collect() #注意:处理的是value列表,也就是x[1]
whois0 = lines.filter(lambda x: allis0(x[1])).collect()
sumScore = lines.map(lambda x: (x[0],sum(x[1]))).collect()
#‘动作’运算
maxScore = max(sumScore,key=lambda x: x[1]) #总分最⾼者
minScore = min(sumScore,key=lambda x: x[1]) #总分最低者
sumSubScore = duce(lambda x,y: sumSub(x,y))
avgScore = [x/count for x in sumSubScore[1]]#单科成绩平均值
#RDD key-value‘转换’运算
subM = duce(lambda x,y: subMax(x,y))
redByK = duceByKey(lambda x,y: [x[i]+y[i] for i in range(3)]).collect() #合并key相同的value值x[0]+y[0],x[1]+y[1],x[2]+y[2]
#RDD'转换'运算
sumPerSore = lines.map(lambda x: sumPer(x)).collect() #每个⼈的总分 #sumSore = lines.map(lambda x: (x[0],sum(x[1]))).collect() sorted = lines.sortBy(lambda x: sum(x[1])) #总成绩低到⾼的学⽣成绩排序
学习通c语言答案sortedWithRank = sorted.zipWithIndex().collect()#按总分排序
first3 = sorted.takeOrdered(3,key=lambda x:-sum(x[1])) #总分前三者
#限定以空格的形式输出到⽂件中
first3RDD = sc.parallelize(first3)\
.map(lambda x:str(x[0])+' '+str(x[1][0])+' '+str(x[1][1])+' '+str(x[1][2])).saveAsTextFile("result")
#llect())
print("数据集个数(⾏):",count)
print("单科满分者:",whohas100)
print("单科零分者:",whois0)
print("单科最⾼分者:",subM)
print("单科总分:",sumSubScore)
print("合并名字相同的分数:",redByK)
print("总分/(⼈)",sumPerSore)
print("最⾼总分者:",maxScore)
print("最低总分者:",minScore)
print("每科平均成绩:",avgScore)
print("总分倒序:",sortedWithRank)
print("总分前三者:",first3)
以下哪种矿物质是常量元素( )print(first3RDD)
sc.stop()
1.3 结果展⽰
数据集个数(⾏): 7
单科满分者: [('li', [100, 54, 0]), ('li', [100, 54, 0])]
单科零分者: [('yanf', [0, 0, 0])]
单科最⾼分者: ('Maximum subject score', [100, 90, 100])
单科总分: ('Total subject score', [485, 438, 280])
合并名字相同的分数: [('li', [200, 108, 0]), ('zhang', [180, 180, 200]), ('yang', [85, 90, 30]), ('wang', [20, 60, 50]), ('yanf', [0, 0, 0])]
总分/(⼈) [('yang', 205), ('wang', 130), ('zhang', 280), ('zhang', 280), ('li', 154), ('li', 154), ('yanf', 0)]
最⾼总分者: ('zhang', 280)
最低总分者: ('yanf', 0)
每科平均成绩: [69.28571428571429, 62.57142857142857, 40.0]
总分倒序: [(('yanf', [0, 0, 0]), 0), (('wang', [20, 60, 50]), 1), (('li', [100, 54, 0]), 2), (('li', [100, 54, 0]), 3), (('yang', [85, 90, 30]), 4), (('zhang', [90, 90, 100]), 5) , (('zhang', [90, 90, 100]), 6)]
总分前三者: [('zhang', [90, 90, 100]), ('zhang', [90, 90, 100]), ('yang', [85, 90, 30])]
None
⼆、代码解析
2.1函数解析
python在线编辑器python3
2.1.1 collect()
RDD的特性
在进⾏基本RDD“转换”运算时不会⽴即执⾏,结果不会显⽰在显⽰屏中,collect()是⼀个“动作”运算,会⽴刻执⾏,显⽰结果。2.1.2 reduce()
说明
reduce()函数会对参数序列中的元素进⾏累积。
语法
reduce(function, iterable[, initializer])
参数
function – 函数,有两个参数
iterable – 可迭代对象
initializer – 可选,初始参数
实例
说明:Python3的内建函数移除了reduce函数,reduce函数放在functools模块
In [24]:
#r = reduce(lambda x, y: x+y, [4,4,5,5])  # 使⽤ lambda 匿名函数from functools import reduce
def add(x, y) :            # 两数相加
return x + y
reduce(add, [1,2,3,4,5])
Out[24]:
15
In [25]:
reduce(lambda x, y: x+y, [1,2,3,4,5])  # 使⽤ lambda 匿名函数Out[25]:
15
2.1.3 type()
语法
class type(name, bases, dict)
参数
name – 类的名称。
bases – 基类的元组。
dict – 字典,类内定义的命名空间变量。
返回值
⼀个参数返回对象类型, 三个参数,返回新的类型对象。实例
# ⼀个参数实例
In [1]:
type(1)
Out[1]:
int
In [2]:
type([2])
Out[2]:
list
In [3]:
type({3:'three'})
Out[3]:
dict
In [5]:
x = 5
type(x) == list #判断x的类型是否为list
Out[5]:
False
# 三个参数实例
class y(object):
z = 5
x = type('y',(object,),dict(z=5))
print(x)
<class '__main__.y'>  #产⽣⼀个新的类型
三、问题分析
An error occurred while calling z:org.apache.spark.api.llectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 5, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
解析
1、检查拼写是否有误
2、检查缩进是否合规
3、检查()是否⼀⼀配对
四、实例⼩练
4.1 数据 user_small
1441900799.728000 1441900802.452000 8618245698655 0134730038729312 2 1 1 IPHONE_5 17999 20693 10.67.23.157 111.13.34.100 6 58986 80 G ET mmsns.qpic /mmsns/PdibpV1sFDHdaOTqNXb8VGSNicyYpOVa9R7icxSr4BkwbsSyzJbBTmE5Zz5aZichejbkKuia7twzraqk/150?tp=webp&length=113 6&width=640 weixin.qq/?version=369229843&uin=2925174340&nettype=0&scene=moment WeChat/6.2.0.19 CFNetwork/711.3.18 Darwin/14.0.0 200 59 image/webp 7504 706 8212 7 1827
1441900750.023000 1441900754.063000 8613836044032 0136210021269713 2 1 1 IPHONE_5 17752 25632 10.67.21.71 117.144.242.26 6 52941 80 P OST short.weixin.qq short.weixin.qq/cgi-bin/micromsg-bin/tenpay - MicroMessenger Client - - - - 715 0 7 1827
1441900755.480472 1441900756.762000 8618246899077 0131830068670612 2 1 1 IPHONE_4S 17875 61433 10.67.43.51 120.192.84.86 6 58684 312 71 img i.gtimg/qqshow/admindata/comdata/vip_emoji_aio_ios_new_config/xydata.json - QQ/5.7.0.469 CFNetwork/672.0.8 Darwin/14.0 .0 304 83 x-json - 0 0 18 1041
1441900754.860000 1441900755.480472 8618246899077 0131830068670612 2 1 1 IPHONE_4S 17875 61433 10.67.43.51 120.192.84.86 6 58684 312 71 img i.gtimg/club/item/avatar/zip/0/i0/all.zip - QQ/5.7.0.469 CFNetwork/672.0.8 Darwin/14.0.0 404 210 text/html 85 487 411 18 1041 1441900753.786000 1441900755.726000 8618246195634 9900026543899411 2 1 1 IPHONE_4S 17783 19302 10.67.29.55 111.40.194.207 6 49412 80 GET sb.symcd /MFYwVKADAgEAME0wSzBJMAkGBSsOAwIaBQAEFDmvGLQcAh85EJZW%2FcbTWO90hYuZBBROQ8gddu83U3pP8lhvlPM44tW93 wIQd9jUM82by0%2FVy957MNapGQ%3D%3D - securityd (unknown version) CFNetwork/672.0.2 Darwin/14.0.0 - - - - 522 0 18 1041
1441900761.308739 1441900761.408000 8615045213668 0127590050857822 2 1 1 IPHONE_4 17772 50621 10.67.63.219 183.232.95.61 6 49337 80 P OST szminorshort.weixin.qq szminorshort.weixin.qq/cgi-bin/micromsg-bin/rtkvreport - MicroMessenger Client - - - - 500 16 7 1827 1441900696.427624 1441900761.308739 8615045213668 0127590050857822 2 1 1 IPHONE_4 17772 50621 10.67.63.219 183.232.95.61 6 49337 80 P OST szminorshort.weixin.qq szminorshort.weixin.qq/cgi-bin/micromsg-bin/rtkvreport - MicroMessenger Client - - - - 500 16 7 1827 1441900693.219000 1441900696.427624 86150452136
68 0127590050857822 2 1 1 IPHONE_4 17772 50621 10.67.63.219 183.232.95.61 6 49337 80 P OST szminorshort.weixin.qq szminorshort.weixin.qq/cgi-bin/micromsg-bin/rtkvreport - MicroMessenger Client - - - - 502 16 7 1827 1441900750.845345 1441900753.537000 8618246195634 9900026543899411 2 1 1 IPHONE_4S 17783 19302 10.67.29.55 117.135.169.124 6 49411 8 0 GET b227.photo.store.qq /psb?/V12jlwSP30SPej/VE1V5LlXFMzHeg5gTzpyuCueaEVEGV*0X6BbSyJZRhs!/b/dCWGUIc.HQAA&ek=1&kp=1&pt=0&bo =yAD6AAAAAAABBxI!&t=5 v1_iph_sq_5.6.0_1_app_a-4-2 QQ/5.6.0.438 CFNetwork/672.0.2 Darwin/14.0.0 - - - - 792 0 18 1041
1441900748.094000 1441900750.845345 8618246195634 9900026543899411 2 1 1 IPHONE_4S 17783 19302 10.67.29.55 117.135.169.124 6 49411 8 0 GET b227.photo.store.qq /psb?/V12jlwSP30SPej/VE1V5LlXFMzHeg5gTzpyuCueaEVEGV*0X6BbSyJZRhs!/b/dCWGUIc.HQAA&ek=1&kp=1&pt=0&bo =yAD6AAAAAAABBxI!&t=5 v1_iph_sq_5.6.0_1_app_a-4-2 QQ/5.6.0.438 CFNetwork/672.0.2 Darwin/14.0.0 - - - - 792 0 18 1041
4.2 ⽤户上⽹记录统计(⼀⾏为⼀条记录).(⽤户:第3列)

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。