spark调⽤python算法_⽤Python语⾔写Spark
001
PySpark 基础
Spark 是⽬前⼤数据处理的事实标准。PySpark能让你使⽤Python语⾔来写Spark程序。
我们先做⼀个最简单的字符数统计程序。这样我们就知道⼀个PySpark程序是什么样⼦,以及如何运转起来。
我们准备⼀个⽂件a.csv。⾥⾯的内容如下:
a b c,1.0a b,2.0c,3.0d,4.0
然后我们打开编辑器,⽐如我这⾥是Intellij IDEA。 新建⼀个myfirstpyspark.py⽂件。
PySpark需要你有⼀个SparkSession对象,这是⼀切的开始,在这⾥你可以做⼀些配置。
from pyspark.sql import SparkSession
session = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
其中"local[*]"表⽰我们执⾏的是local模式,也就是单机模式,*表⽰使⽤所有的CPU核⼼,如果你写成local[2]那么表⽰单机模式,并且使⽤两个核。
在很多情况下,单机模式⾜够我们使⽤,因为他已经是多线程并⾏运⾏了,这⽐我们⾃⼰完成⼀个多线程的程序来的简单,⽽且,Spark可以让你的代码看起来就像单机,他⾃动完成分布式⼯作。
如果愿意,还可以运⾏在Standalone,Yarn,Mesos等模式下,它们都是真正的分布式模式。Spark是⼀个典型的Master-Slave结构。Master负责解释你写的代码,Slave则负责执⾏你的代码。
python怎么读取json文件构建好了session之后,现在可以去读⽂件了。
from pes import *import pyspark.sql.functions as f
df = ad.csv( "a.csv",
encoding="utf-8",
header=False,
schema=StructType(
[StructField("text", StringType()),
StructField("index", StringType())]))
这⾥我使⽤了⼀个较为复杂的⽅式构建Dataframe, Dataframe你可以简单理解为SQL的编程表达形式。不使⽤常规教程提及的RDD API 的原因是因为Spark花了⼤⼒⽓将⼤部分东西都迁移到Dataframe上,我们也就不要倒⾏逆施了。实际场景中,RDD⾮常灵活,但是往往导致代码难以管理和维护。Dataframe API ⽐较受限,但是更”SQL”化,更”结构化”,⼤家写出来的会是基本⼀致的,并且性能更好。
除了读取CSV⽂件,你还可以读取parquet, json, elasticsearch,mysql 等其他存储器。
让我们回转过来,读取⼀个csv⽂件的⽅式如同上⾯的⽰例代码,具体参数我会简单说明下。
header 指的是是否有头部。CSV⽂件通常第⼀⾏是列名。我们这⾥设置为False,因为构建的⽂件并没有这个需求。
schema 可以指定CSV每列的名字,类型。⼤家只要记住是这么写就OK了。这种⽅式⾮常有⽤。对于⾮结构化数据,你可以简单认为他是只有⼀列的结构化数据。
现在,我们已经有了⼀张名字叫’df’的表,这张表有两个字段,text 和 index。 现在想统计text⾥每个词汇出现的次数。这在pyspark ⾥只需要⼀⾏代码就可以搞定。
df.plode(f.split("text", " ")).alias("word")).\
groupBy("word").\
count().\
show()
最后运⾏的结果如下:
通过导⼊import pyspark.sql.functions as f 我们可以使⽤Spark内置的UDF函数并且使⽤f来进⾏引⽤,额外的收获是你可以获得更友好的代码提⽰。
select 你可以理解为SQL中select语法的⼀种编程形态。 先对text字段按空格切分,然后通过expode展开为多⾏,并且宠幸把⾏命名为word。对应的SQL为:
select explode(split("text"," ")) as word from ....
接着按word字段进⾏groupBy,然后count。count其实是⼀个简写形式,我们还可以这么写:
df.plode(f.split("text", " ")).alias("word")).\
groupBy("word").\
unt("word").alias("wordCount")).\
show()
agg ⾥⾯会填写⼀个聚合函数,配合groupBy使⽤,这和我们使⽤SQL的⽅式是⼀致的。现在增加个额外需求,我想排个序,应该怎么弄呢?也不是难事:
df.plode(f.split("text", " ")).alias("word")).\
groupBy("word").\
unt("word").alias("wordCount")).\
orderBy(f.desc("wordCount")).\
show()
如果觉得官⽅提供的UDF函数满⾜不了需求,我们也是可以⾃定义函数的。 定义⼀个普通的python函数:
def my_split(col):
return col.split(" ")
把my_split封装为⼀个UDF函数:
udf_my_split = f.udf(my_split, ArrayType(StringType()))
ArrayType(StringType()) 指的是返回类型。接着就是使⽤它:
df.plode(udf_my_split("text")).alias("word")). \
groupBy("word"). \
unt("word").alias("wordCount")). \
orderBy(f.desc("wordCount")). \
show()
现在我们看看如何运⾏这个程序。截⽌到现在,你的代码看起来应该是这个样⼦的。
# -*- coding: UTF-8 -*-import osfrom pyspark.sql import SparkSessionfrom pes import *import pyspark.sql.functions as f
spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
df = ad.csv( "YOUR-PATH/a.csv",
encoding="utf-8",
header=False,
schema=StructType(
[StructField("text", StringType()),
StructField("index", StringType())]))def my_split(col):
return col.split(" ")
udf_my_split = f.udf(my_split, ArrayType(StringType()))
df.plode(udf_my_split("text")).alias("word")). \
groupBy("word"). \
unt("word").alias("wordCount")). \
orderBy(f.desc("wordCount")). \
show()
我们现在需要运⾏这段程序,具体⽅式如下:
export PYTHONIOENCODING=utf8;./bin/spark-submit \--driver-memory 2g \--master "local[*]" [YOUR-PATH]/my_first_pyspark.py
截⽌⽬前为⽌,我们已经能够⽤PySpark开发应⽤了,并且了解了⾥⾯的⼀些⾼级⽤法,⽐如⾃定义UDF函数的使⽤。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。