编程实现Spark的WordCount的单词统计
需求⼀:使⽤PyCharm编程实现SparkCore的WordCount单词统计,并保存在HDFS中from pyspark import SparkConf,SparkContext
import os
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# 当存在多个版本时,不指定很可能会导致出错
if __name__ == '__main__':
# 1.创建上下⽂,指定应⽤的名字和⽤谁的资源来跑
conf=SparkConf().setAppName("first_wordcount").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 2.加载⽂件形成⼀个RDD,RDD的每个元素是⽂本的每⼀⾏
File('file://file:///export/servers/')
# 过滤空⾏
rdd1 = rdd1.filter(lambda line: len(line.strip()) > 0)
# 3.进⼀步将⽂本内容打散成单词
rdd2=rdd1.flatMap(lambda line:line.split(" "))
# 4.为每个单词标记上1,形成⼀个元组,具有键值对数据结构,⽅便做bykey的操作
rdd3=rdd2.map(lambda word:(word,1))
# 5.进⼀步做reduceByKey,得到wordcount结果
duceByKey(lambda x,y:x+y)
# 6.结果打印到控制台
llect()
print('wordcount结果是:',arr)
# 7.结果输出到本地⽂件
rdd4.saveAsTextFile("hdfs://node1:8020/output/file1")
需求⼆:使⽤PyCharm编程实现SparkSQL的DSL和SQL⽅式WordCount单词统计
import pyspark.sql.functions as F
import os
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# 当存在多个版本时,不指定很可能会导致出错
if __name__ == '__main__':
# 1.创建上下⽂对象
spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
file_ad.text('file:///export/pyworkspace/pyspark_sz26/pyspark-sparksql-3.1.2/')
file_df.printSchema()
file_df.show(truncate=False)
# 3.注册成临时表
ateOrReplaceTempView('words_t')
# 4.做wordcount
print('SQL风格做wordcount')
spark.sql('''
select t.word,
count(*) as cnt
from
(select explode(split(value,' ')) as word from words_t) t
group by t.word
order by cnt desc ''').show()
print('DSL做wordcount')
file_df.plode(F.split('value',' ')).alias('word')) \
.groupBy('word') \
.
count() \
.orderBy('count',ascending=False) \
.show()
spark.stop()
需求三:使⽤PySpark读取json数据格式,多种⽅式查询字段并进⾏统计分析
import os
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# 当存在多个版本时,不指定很可能会导致出错
if __name__ == '__main__':
# 1-创建上下⽂对象
spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
sc = spark.sparkContext
df = ad.json('file:///export/pyworkspace/pyspark_sz26/pyspark-sparksql-3.1.2/data/employee.json') # (1)查询所有数据;
df.show()
# (2)查询所有数据,并去除重复的数据;
df.distinct().show()
# (3)查询所有数据,打印时去除id字段;
df.drop('id').show()
# (4)筛选出age>30的记录;
df.where('age>30').show()
# (5)将数据按age分组;
# (6)将数据按name升序排列;
# (7)取出前3⾏数据;
lambda编程print(df.take(3))
# (8)查询所有记录的name列,并为其取别名为username;
df.select(df.name.alias('username')).show()
# (9)查询年龄age的平均值;
from pyspark.sql.functions import avg
df.agg(avg('age')).show()
# (10)查询年龄age的最⼩值。
from pyspark.sql.functions import min
df.agg(min('age')).show()
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论