Flink⼊坑指南第四章:SQL中的经典操作GroupBy+Agg Flink⼊坑指南系列⽂章,从实际例⼦⼊⼿,⼀步步引导⽤户零基础⼊门实时计算/Flink,并成长为使⽤Flink的⾼阶⽤户。
简介
Group By + Agg这个最经典的SQL使⽤⽅式。Group By是SQL中最基础的分组操作,agg的全称是aggregation(聚合操作),是⼀类SQL算⼦的统称,Flink中最常⽤的Agg操作有COUNT/SUM/AVG等,详情参见。在实际使⽤中,Group By+Agg绝⼤部分场景下都会⼀起出现。作为最常⽤的SQL模式,学习好这种模式的最优写法,也就⾮常重要了。本章从两个需求开始,进⼀步了解⼀下Group By + Agg 模式的最优写法,及实时计算产品/Alibaba Flink版本中的部分优化.
需求
上⼀章中,⼩明已经把第⼀个需求完成了,同时也了解了,等流计算中的基础概念。熟悉了Flink/的基础⽤法之后,⼩明开始着⼿开发其他的需求:
1. 从0点开始,每个类⽬的成交额
2. 从0点开始,每个店铺的uv/pv
3. 从0点开始,每个⽤户点击了多少商品,多少店铺
Group By + Agg
先看需求1:从0点开始,每个类⽬的成交额。进⼊Flink的原始数据结构如下:
ctime category_id shop_id item_id price
2018-12-04 15:44:54cat_01shop_01item_0110
2018-12-04 15:45:46cat_02shop_02item_0211.1
2018-12-04 15:46:11cat_01shop_03item_0312.4
FlinkSQL代码如下,看上去与传统数据库/批处理的SQL相同:
SELECT
date_format(ctime, '%Y%m%d') as cdate, -- 将数据从时间戳格式(2018-12-04 15:44:54),转换为date格式(20181204)
category_id,
sum(price) as category_gmv
FROM src
GROUP BY date_format(ctime, '%Y%m%d'), category_id; --按照天做聚合
以这个例⼦⼊⼿,着重说明Group By+Agg通⽤模式的两个问题:
1. 计算特点
2. 常见问题及解法
Group by+Agg模式在底层的有⼀些特点:
1. Group by分组操作,会产⽣数据shuffle
2. 按Key的agg操作,最终都需要落到同⼀个物理进程上才能保证计算的正确性
以这个最简单SQL为例,其数据流程图如下,不同颜⾊代表不同的category_id:
数据源进来的数据先经过group by进⾏分组,同⼀个key的数据被分到同⼀个worker上之后再进⾏聚合操作。特点2就决定了,Group By + Agg 模式中,SQL作业性能与数据分布⾮常相关,如果数据中存在__数据倾斜__,也就是某个key的数据异常的多,那么某个聚合节点就会成为瓶颈,作业就会有明
显的反压及延时现象。
为了解决这个问题,就需要将堵住的聚合节点进⾏拆分,优化后的SQL如下:
SELECT cdate,category_id,sum(category_gmv_p) as category_gmv
FROM(
SELECT
date_format(ctime, '%Y%m%d') as cdate, -- 将数据从时间戳格式(2018-12-04 15:44:54),转换为date格式(20181204)
category_id,
sum(price) as category_gmv_p
FROM src
GROUP BY category_id, mod(hash_code(FLOOR(RAND(1)*1000), 256),date_format(ctime, '%Y%m%d'); --按照天做聚合
)
GROUP BY cdate,category_id
SQL中做了将⼀个Group By+Agg拆称了两个,⼦查询⾥按照category_id和mod(hash_code(FLOOR(RAND(1)*1000), 256)分组,将同⼀个category_id上的数据打散成了256份,先做⼀层聚合。外层Group By+Agg,将⼦查询聚合后的结果再次做聚合。这样通过两层聚合的⽅式,即可⼤⼤缓解某聚合节点拥堵的现象。其数据流程图如下:
如果⽤户⽤的是开源Flink1.7版本,如果作业出现数据倾斜情况,就需要按以上⽅法对SQL进⾏改造,以提⾼作业吞吐,降低由于数据倾斜造成的业务延时。
相关函数⽤法,, ,
在__使⽤ Flink版本,针对这种情况做了特殊优化,使⽤Local-Global Agg的⽅式完美解决了Group By+Agg模式中的数据倾斜问题,⽤户使⽤第⼀种(最简单)的SQL即可。__关于Local-Global Agg原理⽅⾯的介绍,后续会有专门⽂章,敬请期待。
GroupBy+单Distinct Agg
第⼆个需求:计算从0点开始,每个店铺的uv/pv
原始数据:
ctime category_id shop_id item_id uid action
2018-12-04 15:44:54cat_01shop_01item_0110001
2018-12-04 15:45:46cat_02shop_02item_0210001
2018-12-04 15:46:11cat_01shop_03item_0310002
其中action有三种:
0: 浏览
1: 点击
2: 加购
3: 购买
经过这段时间的学习,⼩明三两下就写出SQL:
SELECT
date_format(ctime, '%Y%m%d') as cdate, -- 将数据从时间戳格式(2018-12-04 15:44:54),转换为date格式(20181204)
shop_id,
count(distinct uid) as shop_uv, -- shop uv
count(uid) as shop_pv -- show pv
FROM src
GROUP BY date_format(ctime, '%Y%m%d'), shop_id; --按照天做聚合
同样,按照上节所述,如果这个作业出现了数据倾斜的现象,就需要将SQL优化为:
select
cdate,
shop_id,
sum(shop_uv_partial) as shop_uv,
sum(shop_pv_partial) as shop_pv
from (
select
date_format(ctime, '%Y%m%d') as cdate, -- 将数据从时间戳格式(2018-12-04 15:44:54),转换为date格式(20181204)
shop_id,
count(distinct uid) as shop_uv_partial,
count(uid) as shop_pv_partial
from src
group by shop_id, mod(hash_code(uid), 256),date_format(ctime, '%Y%m%d')
)
group by cdate,shop_id
本例⼦中,将原始SQL中的⼀层查询,拆成了两层查询。内层⼦查询,按照shop_id和mod(hash_code(uid),256)做聚合,将同⼀个shop_id的数据打散到多个节点中。外层查询,将⼦查询聚合后的结果,再按shop_id聚合。通过两层聚合即可⼤⼤缓解数据倾斜情况下聚合节点的压⼒。
Group By+Agg场景与Group By+Distinct Agg场景的主要区别,在于state中存储的数据。上⼀章中提到过,Flink是增量计算,state中会保存增量数据,⽐如上次SUM的值等等,但是在DISTINCT计算过程中,就需要保留所有的distinct的key,在本例⼦中,就是uid。且在每⼀次计算过程中,都要查询当前state中是否有同⼀个uid,并计数。因此在⼤数据量情况下distinct节点往往成为Flink作业的瓶颈。需要通过扩并发等⽅式解决。
同样,在使⽤ Flink版本,针对这种情况做了特殊优化,使⽤Partial-Final Agg的⽅式完美解决了Group By+Distinct Agg模式中的数据倾斜问题,⽤户使⽤第⼀种(最简单)的SQL即可。关于Partial-Final Agg原理⽅⾯的介绍,后续会有专门⽂章,敬请期待。
Group By+多Distinct Agg
第三个需求:从0点开始,每个⽤户点击了多少商品,多少店铺,以及该⽤户总点击item次数。原始数据如下:
ctime category_id shop_id item_id uid action
2018-12-04 15:44:54cat_01shop_01item_0110001
2018-12-04 15:45:46cat_02shop_02item_0210001
2018-12-04 15:46:11cat_01shop_03item_0310002
经过⼀番思索,⼩明写出了如下SQL:
SELECT UDTF
date_format(ctime, '%Y%m%d') as cdate, -- 将数据从时间戳格式(2018-12-04 15:44:54),转换为date格式(20181204)
uid,
count(distinct shop_id) as shop_cnt,
count(distinct item_id) as item_cnt,
count(item_id) as click_cnt
FROM src
GROUP BY date_format(ctime, '%Y%m%d'), uid;
需求2相⽐,SQL中distinct个数变成了多个,这种情况下要优化SQL就更复杂了。有⼀种⽐较原始的做法:
1. 先使⽤UDTF,将原始数据⼀⾏拆成多⾏,每⾏添加n+1列,n为distinct的个数。n列分别对distinct的值做hash。具体例⼦如下:ctime category_id shop_id item_id uid action hash_shop hash_item flag
2018-12-04 15:44:54cat_01shop_01item_0110001hash(shop_01)null flag0 2018-12-04 15:44:54cat_01shop_01item_0110001null hash(item_01)flag1 2018-12-04 15:44:54cat_01shop_01item_0110001null null flag2 2018-12-04 15:45:46cat_02shop_02item_0210001hash(shop_02)null flag0 2018-12-04 15:45:46cat_02shop_02item_0210001null hash(item_02)flag1 2018-12-04 15:45:46cat_02shop_02item_0210001null null flag2 2018-12-04 15:46:11cat_01shop_03item_0310002hash(shop_03)null flag0 2018-12-04 15:46:11cat_01shop_03item_0310002null hash(item_03)flag1 2018-12-04 15:46:11cat_01shop_03it
em_0310002null null flag2
1. 在SQL中,先在⼦查询中分别计算各指标的count值,在外层再做⼀层sum即可,SQL⽰例如下:
select
cdate,
uid,
sum(shop_cnt_p) as shop_cnt,
sum(item_id_p) as item_id_cnt,
sum(item_cnt_p) as item_cnt
from (
select
date_format(ctime, '%Y%m%d') as cdate,
每天学点sql经典句子
uid,
count(distinct shop_id) filter (where flag = flag0) as shop_cnt_p,
count(distinct item_id) filter (where flag = flag1) as item_id_p ,
sum(item_id) filter (where flag = flag2) as item_cnt_p
from Expand_T
group by uid, hash_user, hash_shop, date_format(ctime, '%Y%m%d')
)
group by uid
这种问题可以解决多个distinct中的数据倾斜问题,但是会增加sql复杂度,并且计算过程中数量会膨胀,并且占⽤更多资源。
同样,在使⽤ Flink版本,针对这种情况做了特殊优化,使⽤Partial-Final Agg+Incremental Agg的⽅式完美解决了Group By+多个Distinct Agg模式中的数据倾斜问题,⽤户不需要在SQL上做拆分。关于Par
tial-Final Agg+Incremental Agg原理⽅⾯的介绍,后续会有专门⽂章,敬请期待。
数据倾斜相关配置
在使⽤实时计算产品时,如果遇到数据倾斜问题,可以增加以下配置,即可解决,不需要⼿动进⾏SQL优化。
# 开启5秒的microbatch
blink.microBatch.allowLatencyMs=5000
blink.miniBatch.allowLatencyMs=5000
blink.miniBatch.size=20000
# Local 优化,默认已经开启
# abled=true
# 开启 Partial 优化,解决count distinct热点
abled=true
# Incremental 优化,默认已经开启
# abled=true
本⽂为云栖社区原创内容,未经允许不得转载。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。