FlinkSQL内置优化参数功能以及适⽤场景介绍
前⾔
这⼏天在看 Flink SQL 内置优化参数的功能和原理,虽然⽹上会有⼀些⽂章介绍,这⾥还是⾃⼰做⼀个整体的总结和思考,⽅便⾃⼰以后的回顾。
Flink SQL 内置的优化参数是 Blink Planner ⾥⾯的功能,也就是 1.9 以后 Blink Planner ⾃带功能,从 Flink 1.11 开始,Blink Planner 已经成为 Flink 默认的 Planner,⽬前聚合优化参数是针对⽆界流⾮窗⼝类聚合,窗⼝类聚合优化参数未来会进⾏⽀持。下⾯开始讲解⼀下 Flink SQL 优化参数的功能以及其适⽤场景,官⽹相关参考:Streaming Aggregation。
结论
1. 针对⽆界流⾮窗⼝聚合,在数据量⾮常⼤的情况下,如果业务⽅允许⼀定的时延,那么可以配置 Mini Batch 参数,通过牺牲⼀点延
迟,降低对于状态的频繁操作,换取更⼤的吞吐量。同时对于可撤回流做⼆次聚合时,配置该参数,尽可能降低因数据记录撤回导致数据抖动的问题。
2. 对于 SUM,COUNT, MAX, MIN,AVG 等⾮ Distinct 聚合时,为了防⽌数据倾斜对实时作业的影响,可以配置 Local-Global
Aggregation 参数,⼀般公司内部的实时计算平台会⽀持该参数的配置。另外⼀种⽅式,可以在消息记录后⾯加⼀个随机数,然后聚合时,group by 后⾯的 Key 再加上这个随机数,先打散明细记录再聚合,然后再在该聚合流上进⾏⼀次聚合,key 还是之前的业务聚合 key,这种情况也需要配置 Mini Batch,防⽌数据抖动。
3. 针对 Distinct 类聚合,配置 table.optimizer.abled 参数,尽可能减低数据倾斜对于实时作业的影响。
4. 针对 Distinct 类聚合,同时多个指标都是相同的聚合类型和 Key 时,只是聚合条件不同,可以使⽤ FILTER 代替 CASE WHEN,能
够减⼩对于状态的访问以及状态的存储⼤⼩。
⼀、Mini Batch 优化参数
1.1 Mini Batch 介绍
默认情况下,在⽆界流聚合场景下,每来⼀条记录,会经历下⾯三个步骤:
1. 会先获取到这条记录的所对应的 Key,从状态后端获取其状态值
2. 通过聚合函数,结合之前状态,进⾏结果计算
3. 将新的结果值写⼊到状态后端中
当数据量⾮常⼤时,由于每条记录都需要经过上⾯三个步骤,同时还涉及到序列化和反序列化,所以此时这种场景下,实时作业的吞吐量以及 RocksDB StateBackend 负载都会受到很⼤的影响,所以在 Blink Planer 中就引⼊了 MiniBatch 功能。
MiniBatch 的本质还是在内存中缓存⼀批数据,通过周期性时间或者缓存的记录数到达预设值时,会触发计算。简单理解,会将记录存储在⼀个 HashMap 中,Key 就是业务聚合 Key,Value 是这个 Key
sql优化的几种方式的消息记录集合,之后会遍历内存的数据(通过 Key),先获取该 Key 之前的状态值,将内存中缓存的数据参与到状态计算,最终写⼊到状态后端中。通过对数据攒批处理后,降低对于状态后端的操作,从⽽提升实时作业的吞吐量。Mini Batch 功能是 Flink 在吞吐量以及延迟之间做的权衡。
开启 Mini Bathch 功能有三个参数:
// instantiate table environmentTableEnvironment tEnv = ...// access flink configurationConfiguration configuration = Config().getConfiguration();// s et low-level key-value optionsconfiguration.setString("abled", "true");configuration.setString("ini-batch.allow-latency", "
5 s"); configuration.setString("ini-batch.size", "5000");
1.2 Mini Batch 适⽤场景
个⼈认为 Mini Batch 参数开启的适⽤场景有两点:
1. 应⽤场景为⽆界流⾮窗⼝聚合时,⽽且实时任务的数据量⾮常⼤,业务⽅能够允许实时作业有⼀定延迟,这种情况下,你可以牺牲⼀
点点延迟,来换取更⼤的实时任务的吞吐量。
2. 对于可撤回流的⼆次聚合,引⼊该参数,尽可能降低聚合值突然变⼩⽽后⼜恢复正常值的抖动。⽐如下⾯统计最新 word 的次数:
select word,count(*) as cnt from (select name,last_value(word) as new_word from source group name) as t group word 由于内层逻辑是⼀个聚合场景,同时实时数据也可能⼀直在变,所以内层结果存在撤回情况。当外层聚合逻辑遇到撤回记录时,会减去撤回消息记录 key 的相关结果值,然后在根据新发送的记录进⾏统计,所以就可能导致结果抖动,尤其在⼤促期间,⼤屏实时统计类任务,这种会造成业务⽅的疑问和担⼼,为什么结果值变⼩了,数据会不会丢失等等。引⼊ Mini Batch 参数,可以对⼀批数据进⾏计算后,在进⾏结果更新,尽可能减少这种数据抖动的情形。
⼆、Local-Global Aggregation
Local-Global 聚合参数主要解决⾮ Distinct 聚合场景下,⽐如 SUM, COUNT, MAX, MIN, AVG,数据倾斜问题。Flink 在进⾏ keyBy 时,相同的 Key 肯定会到同⼀ TaskManager 中,所以如果某类 Key 数据量过多时,会造成某个 TaskManager 负载过⾼,极端情况可能会导致实时作业反压,Checkpoint 超时失败等问题。
Flink Local-Global 聚合类似 Hadoop MapReduce 任务的 Combine,先在上游将结果本地聚合好,在发送聚合后的数据到下游,⼤⼤降低了发送到下游的数据量(将明细数据转换成聚合后数据),从⽽解决数据倾斜问题。下⾯是 Flink Local-Global 聚合⽰意图:
使⽤ Local-Global 聚合优化的前提,需要开启 Mini Batch 功能,下⾯是代码使⽤ Local-Global 功能:
// instantiate table environmentTableEnvironment tEnv = ...// access flink configurationConfiguration configuration = Config().getConfiguration();// s et low-level key-value optionsconfiguration.setString("abled", "true"); // local-global aggregation depends on mini-batch is enabledc onfiguration.setString("ini-batch.allow-latency", "5 s");configuration.setString("ini-batch.size", "5000");configuration.setString("table .optimizer.agg-phase-strategy", "TWO_PHASE");
三、Split Distinct Aggregation
Local-Global 聚合类参数,能够解决⾮ Distinct 类的聚合场景数据倾斜问题,却⽆法解决 Distinct 类聚合场景,因为 Distinct 需要记住之前的原始数据,进⾏去重。下⾯是可能存在 Distinct 类数据倾斜聚合的 SQL 语句:
SELECT day, COUNT(DISTINCT user_id)FROM TGROUP BY day
由于 day ⼀般是当天的⽇期,所以这种情况,day 相同的数据都会到同⼀个 TaskManager 上⾯去,最终造成实时任务热点。Flink 内置的 Distinct 聚合优化参数table.optimizer.abled,通过将 Key 相同的记录,分到不同的 BUCKET(桶) 中去,BUCKET
默认数量为 1024,可以通过参数table.optimizer.distinct-agg.split.bucket-num 配置,配置 Split Distinct 聚合优化参数后,上⾯ SQL 会被转成:
SELECT day,SUM(cnt)FROM (    SELECT day,COUNT(DISTINCT user_id) as cnt    FROM T    GROUP BY day,MOD(HASH_CODE(user_id),1024))GR OUP BY day
在 day 相同的情况下,通过对 user_Id hash 取模,尽可能把消息打散到多个桶中,多个桶有分散在不同的 TaskManager,可以确定的是,user_id 相同的记录肯定会到同⼀ TaskManager 上⾯进⾏进⾏聚合。
下图是使⽤ Local Global 聚合参数和Split Distinct 聚合优化参数⽰意图:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。