记⼀次Spark引擎执⾏Sql超时优化
前⼏天⽤spark引擎执⾏了⼀个较⼤的sql,涉及的表和数据量都不少,不同时间段执⾏了⼏次都超时,经过上⽹及分析,尝试解决了此问题,使⽤spark引擎测试⼏次⼤概都在半个⼩时左右完成,不再出现超时现象sql优化的几种方式
⼀、问题现象
摘抄部分现场⽇志如下:
2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: ShuffleMapStage 28 (run at ThreadPoolExecutor.java:1149) finished in 569.587 s
2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: looking for newly runnable stages
2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: running:
Set(ShuffleMapStage 13, ShuffleMapStage 5, ShuffleMapStage 22, ShuffleMapStage 14)
2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: failed: Set()
Caused by: org.apache.spark.SparkException: Could not execute broadcast in 600 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or spark.sql.broadcastMaxRetries or disable broadcast join by setting
spark.sql.autoBroadcastJoinThreshold to -1
2022-02-01 13:02:14 WARNING spark-sql运⾏异常!
2022-02-01 13:02:14 WARNING 将会使⽤hive重试!
⼆、分析问题
从上述⽇志中可以看出在ShuffleMapStage阶段,也就是ShuffleRead阶段,在Driver在向各个Executor⼴播输⼊数据时候,出现了超时现象,容错机制⾃动启动了hive引擎重新执⾏,hive引擎经过数较长⼀段时间完成了,但也出现了reduce阶段的数据倾斜。
既然出现了超时,要么增加超时时间,设置更长的超时时间,要么重试⼀下,也可能是当时⽹络原因导致的,要么在超时时间内增⼤吞吐处理能⼒
简单从以下为以下⼏个⽅⾯着⼿:
1、适当增加超时时间
2、适当增加重试次数
3、加快处理速度
4、禁⽤BroadcastJoin
看到⽇志中给出 disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -
1,spark.sql.autoBroadcastJoinThreshold参数默认值是10M,⼩于设置的此值⼩表将不能被⼴播,将,设置为-1是禁⽤了BroadcastJoin⽅式⼴播
此⽅式没有尝试过,⾄于禁⽤BroadcastJoin后,会使⽤什么⽅式join,这就涉及了Spark的⼏种join实现直接的关系,后续有时间再探讨这个
三、解决⽅案
根据以上简单分析,⼤致有以下三种解决⽅案
1、适当增加超时时间
spark.sql.broadcastTimeout=800
不过可能需要反复多调试⼏次,因为不知道多长时间合适,也就是常说的以时间换空间的⽅案
2、适当增加重试次数
spark.sql.broadcastMaxRetries=3
此种适合偶然现象,可能是因为集中当时太多任务在运⾏,⽹络传输较为繁忙,数据输⼊较慢
3、加快处理速度(推荐)
这⾥通过优化调整参数来提升处理速度,开启MapJoin参数设置,在Map阶段完成join
(1)开启Mapjoin
vert.join = true;
该参数开启了在map侧join,避免在reduce侧join,默认值true
(2)设置Mapjoin⼩表的阈值
hive.mapjoin.smalltable.filesize=50000000;
该参数决定在map侧join的具体标准,默认值25000000=25M,不超此值则开启map侧优化 ,具体多少合适需要根据⾃⼰表数据量的⼤⼩设置并反复调试
(3)开启worker节点缓存
ditionaltask = true;
该参数默认值为true ,⽤来来将⼀个⼩表变成hashtable然后作为分布式缓存⽂件分发到各个worker节点,进⽽实现在Map侧的连接优化机制
(4)设置worker节点缓存⼩表阈值
ditionaltask.size = 15000000;
该参数主要⽤来决定worker节点缓存的具体标准,默认值10000000=10M。
如果参与连接的n个表(或分区)中的n-1 的⼤⼩总和⼩于这个参数的值,即不超过此值,hive将启⽤⼩
表在worker缓存,实现Map侧的连接优化机制。
具体多少合适需要根据⾃⼰表数据量的⼤⼩设置并反复调试
如有不正之处,还请留⾔指出,感谢批评!

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。