SparkSQL (13): Execution Order of Window Functions and GROUP BY
1. Purpose
When a SQL statement contains both a GROUP BY and a window function, in what order are the two evaluated? This post investigates the question so the answer is at hand the next time it comes up.
2. Worked Example
2.1 Sample data
/opt/datas/score.json holds one record per student, with name, lesson, and score:
{"name":"A","lesson":"Math","score":100}
{"name":"B","lesson":"Math","score":100}
{"name":"C","lesson":"Math","score":99}
{"name":"D","lesson":"Math","score":98}
{"name":"A","lesson":"E","score":100}
{"name":"B","lesson":"E","score":99}
{"name":"C","lesson":"E","score":99}
{"name":"D","lesson":"E","score":98}
Read the data with Spark and register a temporary view:
scala> val dfJson = spark.read.format("json").load("file:///opt/datas/score.json")
dfJson: org.apache.spark.sql.DataFrame = [lesson: string, name: string ... 1 more field]
scala> dfJson.show
+------+----+-----+
|lesson|name|score|
+------+----+-----+
|  Math|   A|  100|
|  Math|   B|  100|
|  Math|   C|   99|
|  Math|   D|   98|
|     E|   A|  100|
|     E|   B|   99|
|     E|   C|   99|
|     E|   D|   98|
+------+----+-----+
scala> dfJson.createOrReplaceTempView("score")
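As a quick sanity check (my addition, not from the original post), you can print the inferred schema; the JSON reader should infer score as a numeric (long) column:
scala> dfJson.printSchema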
2.2 Analysis examples
(1) Plain GROUP BY: count rows per group
scala>  spark.sql("select  lesson,count(*) from score  group by lesson").show
+------+--------+
|lesson|count(1)|
+------+--------+
|     E|       4|
|  Math|       4|
+------+--------+
Print the execution plan:
scala>  spark.sql("select  lesson,count(*) from score  group by lesson").explain
== Physical Plan ==
*(2) HashAggregate(keys=[lesson#315], functions=[count(1)])
+- Exchange hashpartitioning(lesson#315, 200)
   +- *(1) HashAggregate(keys=[lesson#315], functions=[partial_count(1)])
      +- *(1) FileScan json [lesson#315] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/home/mip/chongliang/tmp/score.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<lesson:string>
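Reading the plan bottom-up: a partial_count runs before the shuffle (Exchange) and the final count runs after it, a standard two-phase aggregation. For comparison, here is a minimal sketch (my addition, assuming the dfJson DataFrame from above) of the same aggregation through the DataFrame API; it should compile to an equivalent HashAggregate / Exchange / HashAggregate plan:
scala> dfJson.groupBy("lesson").count().explain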
(2) Plain window function: count over a window partitioned by lesson, ordered by name
scala> spark.sql("select  lesson,count(*) over (partition by lesson order by name) as count from score ").show
+------+-----+
|lesson|count|
+------+-----+
|     E|    1|
|     E|    2|
|     E|    3|
|     E|    4|
|  Math|    1|
|  Math|    2|
|  Math|    3|
|  Math|    4|
+------+-----+
Note the running count 1, 2, 3, 4 within each lesson: because the window specification has an ORDER BY, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which is visible in the plan below. Print the execution plan:
scala> spark.sql("select  lesson,count(*) over (partition by lesson order by name) as count from score ").explain
== Physical Plan ==
*(3) Project [lesson#315, count#2426L]
+- Window [count(1) windowspecdefinition(lesson#315, name#316 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS count#2426L], [lesson#315], [name#316 ASC NULLS FIRST]
   +- *(2) Sort [lesson#315 ASC NULLS FIRST, name#316 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(lesson#315, 200)
         +- *(1) Project [lesson#315, name#316]
            +- *(1) FileScan json [lesson#315,name#316] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/home/mip/chongliang/tmp/score.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<lesson:string,name:string>
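For reference, the same windowed count can be written with the DataFrame API. A minimal sketch (my addition, assuming the dfJson DataFrame from above; spark-shell already provides the $ column syntax via spark.implicits):
scala> import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions.{count, lit}
scala> val w = Window.partitionBy("lesson").orderBy("name")
scala> dfJson.select($"lesson", count(lit(1)).over(w).as("count")).show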
(3) GROUP BY combined with a window function
scala> spark.sql(" select lesson,count(*) over (partition by lesson) as count from score  group by lesson").show
+------+-----+
|lesson|count|
+------+-----+
|     E|    1|
|  Math|    1|
+------+-----+
Print the execution plan:
scala> spark.sql(" select lesson,count(*) over (partition by lesson) as count from score  group by lesson").explain
== Physical Plan ==
Window [count(1) windowspecdefinition(lesson#315, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count#2662L], [lesson#315]
+- *(2) Sort [lesson#315 ASC NULLS FIRST], false, 0
   +- *(2) HashAggregate(keys=[lesson#315], functions=[])
      +- Exchange hashpartitioning(lesson#315, 200)
         +- *(1) HashAggregate(keys=[lesson#315], functions=[])
            +- *(1) FileScan json [lesson#315] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/home/mip/chongliang/tmp/score.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<lesson:string>
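Note that the Window operator now sits above the two HashAggregates: the window is computed on the output of the GROUP BY. One way to confirm the semantics (my addition, a hedged sketch): making the GROUP BY an explicit subquery should return the same two rows, because the window only ever sees one row per lesson:
scala> spark.sql("select lesson, count(*) over (partition by lesson) as count from (select lesson from score group by lesson) t").show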
3. Summary
The plan in example (3) makes the order explicit: the HashAggregate pair (the GROUP BY) runs below the Window operator, so SparkSQL executes the GROUP BY first and then opens the window over the aggregated rows. That is also why the windowed count in example (3) is 1 rather than 4: after aggregation each lesson contributes only a single row.
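A practical consequence (my addition, a hedged sketch): since the window is evaluated after the aggregation, its specification may reference aggregate expressions, for example ranking lessons by their row count. Both lessons have four rows here, so both should receive rank 1:
scala> spark.sql("select lesson, count(*) as cnt, rank() over (order by count(*) desc) as rnk from score group by lesson").show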
