> 文章列表 > SparkSQL倾斜处理

SparkSQL倾斜处理

SparkSQL倾斜处理

  1. 过滤倾斜的key,比如业务中爬虫异常key,null
    可以使用df.sample(n)抽样找出倾斜异常key,再df.filter(func)过滤掉倾斜key如果key中的null倾斜需要保留,我们可以将null数据单独拿出来进行处理,比如:
-- 假设a.mobile存在大量的null倾斜
select a.id,a.mobile,b.fields
from table_a
left join table_b
on nvl(a.mobile,rand()) = b.mobile  -- 或者使用union all改写,单独摘出来再拼接上去
select a.id,a.mobile,b.fields_name
from table_a a 
left join table_b b
on a.mobile=b.mobile  
where a.mobile is not null 
union all 
select a.id,a.mobile,null as fields_name
from table_a a
where a.mobile is null -- 过滤异常key
select groupby_field,count(1) as cnt 
from table_a
where groupby_field <> '异常key'
group by groupby_field
  1. 增加shuffle并行度,如果各个key的数据量整体差异不大,task < executor_num(executor个数) * executor_cores(每个executor的核心数),我们可以考虑增加task数量,来充分利用计算资源;spark.sql.shuffle.partitions参数可以设置并行度(默认是200),一般设置每核跑1-3个task,磁盘io时可以充分利用计算资源。
    spark中有很多算子有指定并行度参数,比如:
    textFile(xx,minnumpartition)
    sc.parallelize(xx,num)
    sc.makeRDDD(xx,num)
    sc.parallelizePairs(List[Tuple2],num)
    redduceByKey(xx,num)groupByKey(xx,num),join,distinct
    repartition,coalesce
    spark.default.parallelism在sparksql中并行度由spark.sql.shuffle.partitions决定

  2. 双重聚合,类似于hive中的groupby倾斜参数set hive.groupby.skewindata=true,用两个mr计算作业,第一个mr中的key随机分发聚合,第二个mr做全局聚合;比如:

select groupby_field,sum(cnt) as cnt   -- 全局聚合
from 
(  -- key打散聚合select ceiling(rand() * 10) as rnd  -- 添加随机数打散,groupby_field   -- 分组字段,count(1) as cnt  from table_namegroup by ceiling(rand() * 10),groupby_field
) t 
group by groupby_field
  1. reduce的joion改写成mapjoin,如果存在小表情况下,可以使用mapjoin,将小表回收到driver端,再广播到各个执行的executor上应用map关联;此场景使用于大表join小表的情况;
    这里需要注意,在外连接时,比如left join或者right join,小表是主表,mapjoin不生效
select /*+mapjoin(b)*/ a.id,b.fields_name
from table_a a
join table_b b  -- b小表
on a.id=b.id
  1. join中倾斜key单独摘出来操作。在hive中会有join倾斜参数,hive.optimize.skewjoin=true;它会将join作业拆分成两个MR,对于倾斜健部分单独摘出来使用mapjoin处理,非倾斜键走正常reduce的join。在spark中,如果倾斜键数据符合大表+小表原则,也可以使用该策略。如果倾斜健两个表的数据都比较大,大表倾斜部分同一个key添加n种前缀,小表膨胀倾斜健部分膨胀n倍,倾斜部分join,再union 非倾斜部分join
select  a.id,a.field_a,b.field_b
from 
( -- 加入随机数select id,field_a,ceiling(rand()*10) as rnd_namefrom table_awhere  a.id in ('倾斜健')
) a 
join 
( -- 数据膨胀select id,subview.rnd_name,field_bfrom table_b blateral view explode(array(1,2,3,4,5,6,7,8,9,10))  subview as rnd_namewhere b.id in ('倾斜健')
) b 
on a.id=b.id  and  a.rnd_name=b.rnd_name
union all   -- 拼接非倾斜部分的join
select  a.id,a.field_a,b.field_b
from table_a a 
join table_b b 
on a.id=b.id 
where a.id not in ('倾斜健') and b.id  not in ('倾斜健')

对于rdd计算优化,在代码层面,
如果rdd多次使用使用cache(),persist()持久化
尽量避免shuffle类算子,尽量使用有map端聚合算子,比如reduceByKey,aggregateByKey(可以自定义map端聚合函数,自定义初始记录),combineByKey(类同aggregateByKey,初始记录为rdd数据行):可以减少shuffle write数据量,shuffle读数据量,redduce端聚合次数;
尽量使用高性能算子,比如用reduceByKey取代groupByKey;使用mapPartitions取代map;数据filter过滤后使用coalse减少分区
使用广播变量,比如mapjoin