SparkSQL⾼级篇(⼀)聚合操作
SparkSQL ⾼级篇(⼀) 聚合操作
聚合操作
聚合操作是⼤数据分析过程中⼀个常⽤的操作,⽤来分析汇总数据,或者⽣成汇总报告。
聚合通常需要对整个数据集或⼀个或多个列进⾏某种形式的分组,然后对每个组应⽤聚合函数,如求和、计数或求平均值。Spark提供了许多常⽤的聚合函数。这⾥介绍下基本的聚合函数和分组聚合操作函数。
⾸先创建⼀个⽤于demo的DataFrame
val flight_summary = ad.format("csv").option("header","true")
.option("inferSchema","true").load("flight-summary.csv")
unt()
Long = 4693
这⾥的count()是DataFrame的⼀个Action。
⽽我们这次介绍的聚合函数中的count()是⼀个function,所有的聚合函数都是延迟计算的函数
基本聚合函数
count(col)
countDistinct(col)
approx_count_distinct(col)
min(col)
max(col)
sum(col)
sumDistinct(col)
avg(col)
skewness(col)
kurtosis(col)
variance(col)
stddev(col)
collect_list(col)
collect_set(col)
⾸先创建⼀个DataFrame
1. count(col)函数
count(col)函数⽤于统计⼀个分组中的项⽬的数量。⽐如统计数据中某些列的数量
例⼦:
flight_summary.select(count("origin_airport"), count("dest_airport").as("dest_count")).show
输出为:
+----------------------+-----------+
| count(origin_airport)| dest_count|
+----------------------+-----------+
|                  4693|      4693|
+----------------------+-----------+
为了易读性这⾥⽤.as对结果列重命名,并使⽤show查看结果
默认情况下count(col)统计⼀个列中的条⽬数的时候是不包含值为null的那些⾏的。
如果要统计包含值为null的那些⾏,需要使⽤count(*)。
具体看下⾯这个例⼦
⾸先数据如下:
badMoviesDF.show
±---------±-------------±-------------+
|actor_name| movie_title| produced_year|
±---------±-------------±-------------+
| null| null| 2018|
| John Doe| Awesome Movie| 2018|
| null| Awesome Movie| 2018|
| Mary Jane| Awesome Movie| 2018|
±---------±-------------±-------------+
然后执⾏查询
badMoviesDF.select(count("actor_name"), count("movie_title"), count("produced_year"), count("*")).show
结果如下:
±-----------------±------------------±--------------------±--------+
| count(actor_name)| count(movie_title)| count(produced_year)| count(1)|
±-----------------±------------------±--------------------±--------+
| 2| 3| 4| 4|
±-----------------±------------------±--------------------±--------+
可以看到count(col) 不会包含列值为null的那些⾏。
2. countDistinct(col)
countDistinct(col)和count(col)类似,从名字也可以看出countDistinct(col)只包含值不重复的那些⾏。例⼦
flight_summary.select(countDistinct("origin_airport"), countDistinct("dest_airport"), count("*")).show
±------------------------------±----------------------------±--------+
| count(DISTINCT origin_airport)| count(DISTINCT dest_airport)| count(1)|
±------------------------------±----------------------------±--------+
| 322| 322| 4693|
±------------------------------±----------------------------±--------+
从结果中可以看出其数量远⼩于count(col)得到的数量。
3. approx_count_distinct (col, max_estimated_error=0.05)
当⼀个数据集很⼤时,精确的统计⼀个数据集中不重复⾏的数量是⼀个⼗分耗时的操作。
在某些情况下我们只需要得到⼀个对不重复⾏数进⾏估计的总数。
既然是估计就有误差,⽤户可以通过max_estimated_error指定可以接受的误差。
例⼦:
flight_summary.select(count("count"),countDistinct("count"), approx_count_distinct("count", 0.05)).show
±-------------±---------------------±----------------------------+
| count(count) | count(DISTINCT count)| approx_count_distinct(count)|±-------------±---------------------±----------------------------+
| 4693| 2033| 2252|
±-------------±---------------------±----------------------------+
可以看到在4693⾏中实际不重复数是2033,估计得到的值是2252。
当允许的误差越⼤时,执⾏的速度就越快。
4. min(col), max(col)
这两个聚合函数很容易理解,就是统计列中的最⼩值和最⼤值。
例⼦:
flight_summary.select(min("count"), max("count")).show
结果:
±----------±----------+
| min(count)| max(count)|
±----------±----------+
| 1| 13744|
±----------±----------+
5. sum(col)
对列中所有值进⾏求和操作。
flight_summary.select(sum("count")).show
结果:
±----------+
| sum(count)|
±----------+
| 5332914|
±----------+
6. sumDistinct(col)
就像名字⼀样,对列中不重复值进⾏求和操作。
flight_summary.select(sumDistinct("count")).show
结果:
±-------------------+
| sum(DISTINCT count)|
±-------------------+
| 3612257|
±-------------------+
7. avg(col)
求平均函数。其值等价于sum(col)/count(col)
例⼦:
flight_summary.select(avg("count"), (sum("count") / count("count"))).show
±------------------±---------------------------+
| avg(count)| (sum(count) / count(count))|
±------------------±---------------------------+
| 1136.3549968037503| 1136.3549968037503|
±------------------±---------------------------+
8. variance(col), stddev(col)
计算⽅差和标准差函数。⽅差(variance)和标准差(stddev)
例⼦:
flight_summary.select(variance("count"), var_pop("count"), stddev("count"), stddev_pop("count")).show
结果:
±----------------±-----------------±-----------------±----------------+
| var_samp(count)| var_pop(count)|stddev_samp(count)|stddev_pop(count)|
±----------------±-----------------±-----------------±----------------+
|1879037.7571558713|1878637.3655604832| 1370.779981308405| 1370.633928355957|
±----------------±-----------------±-----------------±----------------+
分组聚合函数
分组执⾏聚合是⼀个分两步的过程。第⼀步是使⽤groupBy(col1,col2,.)执⾏分组,转换,可以在其中指定按哪些列对⾏进⾏分组。与返回DataFrame的其他转换不同,这⾥的groupBy转换返回RelationalGroupedDataset类的实例,然后可以对其应⽤⼀个或多个聚合函数。
1. ⼀个简单的在⼀个列上应⽤⼀个聚合的例⼦
upBy("origin_airport").count().show(5, false)
groupby是什么函数
结果:
±-------------------------------------------------±-----+
| origin_airport | count|
±-------------------------------------------------±-----+
|Melbourne International Airport | 1|
|San Diego International Airport (Lindbergh Field) | 46|
|Eppley Airfield | 21|
|Kahului Airport | 18|
|Austin-Bergstrom International Airport | 41|
±-------------------------------------------------±-----+
2. 通过两个列进⾏分组,并应⽤count() 统计个数
upBy('origin_state, 'origin_city).count
.where('origin_state === "CA").orderBy('count.desc).show(5)
±------------±----------------±------+
| origin_state| origin_city| count|
±------------±----------------±------+
| CA| San Francisco| 80|
| CA| Los Angeles| 80|
| CA| San Diego| 47|
| CA| Oakland| 35|
| CA| Sacramento| 27|
±------------±----------------±------+
类RelationalGroupedDataset提供了⼀组可应⽤于每个⼦组的标准聚合函数。
它们是avg(cols),count(),mean(cols),min(cols),max(cols)和sum(cols)。
除count()函数外,其余所有函数都对数值列进⾏操作。
以上两个例⼦都只应⽤了⼀个聚合函数,下⾯举⼏个多个聚合函数的例⼦。
⽐如我们想同时统计⼀个列的⾏数,最⼤值,最⼩值等。
核⼼词是agg函数
upBy("origin_airport")
.
agg(
count("count").as("count"),
min("count"), max("count"),
sum("count")
).show(5)
结果为:
±-------------------±-----±----------±----------±----------+
| origin_airport| count| min(count)| max(count)| sum(count)|
±-------------------±-----±----------±----------±----------+
|Melbourne Interna…| 1| 1332| 1332| 1332|
|San Diego Interna…| 46| 4| 6942| 70207|
| Eppley Airfield| 21| 1| 2083| 16753|
| Kahului Airport| 18| 67| 8313| 20627|
|Austin-Bergstrom …| 41| 8| 4674| 42067|
±-------------------±-----±----------±----------±----------+
通过agg函数我们同时对⼀个分组进⾏了求和,求最⼤值,求最⼩值,计数的操作。
此外agg函数提供了另⼀种通过基于字符串的键值映射来表⽰列表达式的⽅法。键是列名,值是聚合函数。⽐如:
upBy(“origin_airport”)
.agg(
“count” -> “count”,
“count” -> “min”,
“count” -> “max”,
“count” -> “sum”)
.show(5)
3. collect_list(col)和collect_set(col)
collect_list(col)和collect_set(col)⽤来在分组后收集特定组的所有值。
唯⼀的区别是collect_list(col)返回可能包含重复值的集合。
collect_set(col)返回仅包含唯⼀值的集合就像list和set的区别⼀样。
例⼦:
val highCountDestCities = flight_summary.where('count > 5500)
.groupBy("origin_state")
.agg(collect_list("dest_city").as("dest_cities"))
highCountDestCities.withColumn("dest_city_count", size('dest_cities)).show(5, false) # withColumn 增加⼀列
结果:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。