SparkSql性能调优问题
⾸先我的业务场景是对⼤量的数据(百万级)进⾏cpu密集型的计算,⼀次全量计算需要8个⼩时左右。计算结果分别简单处理后写⼊hive 和Es。在使⽤spark sql处理时遇到了两个性能问题:
1. 由于单次计算⾮常耗时,因此使⽤dataframe.cache()后再分别写⼊hive和ES,但实际运算了两遍,缓存没有按预想的⽣效。
2. 全量计算⾮常耗时,因此基于业务特点只对增量数据运算。使⽤了case when, 和spark.sql.function中的when otherwise逻辑上做了增量计算,但实际还是全量计算。
# cpu密集型计算逻辑
def cpu_bound_compute(content):
return compute(content)
isterFunction("cpu_bound_compute", cpu_bound_compute)
sql = """
select uid, content, cpu_bound_compute(content) computed, time
from source_table
where date={date}
""".format(date=current_date)
# step1.运算并缓存结果
data_frame = hive_context.sql(sql).cache()
# step2.创建视图并写⼊hive
ateTempView("view_table")
insert_sql = """
insert overwrite table sink_table partition(date='{date}')
select *
from view_table
""".format(date=current_date)
# step3.写⼊es
data_frame.write("es")
上⾯是产⽣性能问题的简化样例代码,通过运⾏时的DAG图以及Stages表,可以很清晰的看出在step2和step3中均进⾏了step1的计算,最终耗时16个⼩时!!
问题1排查:
# step1.运算并缓存结果
data_frame = hive_context.sql(sql).cache()
# step2.触发计算逻辑
unt()
# step3.测试缓存是否失效
ateTempView("view_table")
insert_sql = """
insert overwrite table sink_table partition(date='{date}')
select *
from view_table
""".format(date=current_date)
# step4.测试缓存是否失效
unt()
为了排查问题,将程序改造如上图并运⾏,这时通过DAG图看到,step3时缓存⽣效,跳过了step1的计算逻辑;但到step4⼜开始重复step1的计算,说明缓存在此失效。基于这些可以推测,createTempView()函数的运⾏会导致缓存的失效。因此对dataframe的操作置于视图操作之前,才能避免缓存失效的问题。
总结:所有cache后的dataframe操作需要放在视图操作之前来避免缓存失效
问题2排查:
解决1的问题后,计算耗时缩短到8个⼩时,但依旧时间太久了。我们业务数据的特点是每天都是全量数据,当天数据⽐之前数据新增在⼗万左右,也就是90%的运算是重复的,只要把运算改成增量,理论上可以缩短90%的时间。因此,将运算逻辑改造如下:
sql = """
select a.uuid, content,
case when coumputed is null then cpu_bound_compute(content)
else computed
end as computed,
time
from (
select uuid, content,time
from source_table
where date={date}
) a left join (
select uuid, computed
from sink_table
where date={one_date_ago}
) on a.uuid = b.uuid
""".format(date=current_date, one_date_ago=one_date_ago)
sql优化的几种方式## 当然这⾥也可以不在sql中进⾏增量计算,⽽是使⽤spark.sql.function中的when otherwise函数,
## 其逻辑和最终效果和case when⼀致,这⾥不再赘述
上图计算逻辑运⾏后,并没有按预期那样缩短时间,,其DAG图和问题⼀优化后的DAG图⼏乎⼀样,最后的运⾏时长也相差⽆⼏。原因细节未知,但可以猜测这种条件式增量逻辑依然会全量处理。因此采⽤了将两类数据分开处理的⽅式,优化效果显著,运⾏时长缩短到半个⼩时。 优化代码如下:
sql = """
with base as(
select a.uuid, content,  computed, time
from (
select uuid, content,time
from source_table
where date={date}
) a left join (
select uuid, computed
from sink_table
where date={one_date_ago}
) on a.uuid = b.uuid
)
select uuid, content, computed, time
from base
where computed is not null
union all
select uuid, content, cpu_bound_compute(content) computed, time
from base
where computed is not null
""".format(date=current_date, one_date_ago=one_date_ago)
综上,本⽂主要处理了两个spark sql隐藏的性能上的坑:
1. spark sql中对df的缓存会在在执⾏createTempView("view") 视图操作后失效
2.case when或when otherwise等条件函数⽆法分离数据运算,达到性能优化的效果

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。