PySpark学习笔记(⼆)DataFrame上简单的SQL查询
记得原来看到⼀个类似官⽅⽂档的quick start页⾯,我是照着它写的,但不到了,如果有⼈告诉我将⼗分感谢。
Why SQL
以下只代表本⼈的理解。
可以⽀持SQL的⼀系列数据库操作是Spark的⼀⼤特性,当数据量很⼤时,传统的单机数据库⽆法负载。Spark可以把⽂件的数据内容读到内存中进⾏操作,所以只要集资源⾜够,很多SQL的操作是很快的!
以⼀个实际任务作为例⼦
HDFS上BOSS把⼀些数据放到了我的⽂件夹下,这就是我要处理的数据了,这次只是做⼀个简单的统计任务。
hdfs dfs -ls看了⼀下,发现在⽬标⽂件夹下数据被分成了很多份,类似这样 :sult,其中是按数字顺序的。具体的我还不了解,估计是和分布式的⼀些特性有关吧。(我这⾥埋了个隐患)
python怎么读文件夹下的文件夹开始写python代码
pyspark程序中,有⼀个概念:SparkSession,以⽬前我粗浅的理解,这相当于⼀个上下⽂环境,有了它你才好做⼀系列事情。
先来 from pyspark.sql import SparkSession
然后获取将要⽤到的Session,这边给个例⼦吧:
spark = SparkSession.builder.appName("test1").getOrCreate()
读取DataFrame
然后之前提过的,已经配置过默认的⽂件路径是在HDFS上,以JSON⽂件为例,你可以这样读取:
logData = ad.json(logFile)
logFile是你要读取的⽂件路径。
但是⽂件那么多份怎么读呢,我⾛了⼀些弯路这⾥记录⼀下
⽅法⼀:Union(很⿇烦不推荐)
就是⽂件⼀个⼀个读成DataFrame,然后union多个DataFrame,这是我⼀开始想到的办法,结果运⾏起来耗时很多很多。
⼀个⼩tip:⽂件名是s,格式化输⼊:
for i in range(1,800):
file_path="active_token/sult" % (i)
data_append= ad.json(file_path)
data_append= data_append.select(data_append['app_name'],data_append['identifier'])
ogData=logData.union(data_append)
print("file %d processed," % (i) )
最后logData就是union了全部的,你想要的
⽅法⼆:通配符读取(推荐)
写完上⾯的代码,跑得贼慢,我觉得不对啊,这个读取⽂件作为DataFrame应该是⼀个常规操作,怎么会这么⿇烦呢?于是我问了⼀下同事......
原来可以⽤通配符代表多个⽂件⼀次读取。你把⽂件名写成这样:"filePath/*"⼀次读取就成了,啊啊啊啊啊。
SQL查询
DataFrame要⽀持原⽣的SQL直接查询是需要创建视图的。
然后你就可以
DF=spark.sql("SELECT DISTINCT name,id FROM total_data WHERE app_name!='' AND identifier!='' ")
类似这样的查询,注意spark前⾯声明过,是Session,语句返回的也是⼀个DataFrame
DF.show()可以看⼀看格式化输出的DF。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。