你好,游客 登录
背景:
阅读新闻

Spark面试经典系列之数据倾斜

[日期:2017-09-13] 来源:tom_8899_li的博客   作者: [字体: ]

Spark面试经典系列之数据倾斜

Spark面试经典系列之数据倾斜:数据倾斜之痛

1、Spark性能真正的杀手
2、数据倾斜之痛

数据倾斜两大直接致命性的后果:
1、OOM,一般OOM都是由于数据倾斜所致
2、速度变慢

数据倾斜基本形态特征:个别Task处理大量数据

数据倾斜的定位:
1、Web UI,可以清晰看见哪些Task运行的数据量大小
2、Log,Log的一个好处是可以清晰的告诉是哪一行出现问题OOM,同时可以清晰的看到在具体哪个Stage出现了数据倾斜(数据倾斜一般会在 Shuffle过程中产生的),从而定位具体Shuffle的代码。也有可能发现绝大多数Task非常快,但是个别Task非常慢。
3、代码走读,重点看join、groupByKey、reduceByKey等关键代码
4、对数据特征分布进行分析;

解决原理和方法总论

1、Spark数据倾斜解决的原理总论
2、Spark数据倾斜解决方法总论

使数据膨胀,tachyon,复用RDD

Map端Reduce及问题思考

1、Spark数据倾斜解决之Map端Reduce
2、Map端Reduce的问题思考

图137-1 数据倾斜解决案例

给小的一段broadcast,然后在大的一端使用mapPartition。
如果数据量太大,有可能引起OOM

采样分而治之解决方案

1、采样算法解决数据倾斜的思想
2、采样算法在Spark数据倾斜中的具体操作

某个或某几个Key的Value非常大,从而导致数据倾斜
RDD1和RDD2进行join操作,其中我们采用采样的方式发现RDD1中有严重的数据倾斜的Key
第一步:采用Spark RDD中提供的采样接口,我们可以很方便的对全体(例如100亿条数据)进行采样,然后基于采样的数据,我们可以计算出哪个(哪些)Key的Value个数最多;
第二步:把全体数据分成两部分,即把原来RDD1变成RDD11和RDD12,其中RDD11代表导致数据倾斜的Key,RDD12中包含的是不会产生数据倾斜的Key;
第三步:把RDD11和RDD2进行join操作,且把RDD12和RDD2进行join操作,然后把分别join操作后的结果进行Union操作,从而得出和RDD1与RDD2直接进行join操作相同的结果

Spark自己的机制保证的不会产生数据倾斜。

图138-1 数据倾斜采样分而治之解决方案

上述流程中:
第一种情况:如果RDD11中的数据量不是很多,可以采用map端的join操作,避免了shuffle和数据倾斜。
第二种情况:如果RDD11中的数据量特别多,此时之所以能够缓解数据倾斜是因为采用了Spark Core天然的并行机制对RDD11中的同样一个Key的数据进行了拆分。从而达到让原本倾斜的Key分散到不同的Task的目的,就缓解了数据倾斜。

思考:在上述过程中如果把倾斜的Key加上随机数,会怎么样?
增加随机数,并行Task数量可能增加,具体是如何操作的?
RDD11中倾斜的Key加上1000以内的随机数,然后和RDD2进行join操作?不行!此时一定需要把RDD11中的Key在RDD2中的相同的 Key进行1000以内的随机数,然后再进行join操作,这样做的好处:让倾斜的Key更加不倾斜,在实际生产环境下,会极大的解决在两个进行join 的RDD数量都很大且其中一个RDD有一个或者两三个明显倾斜的Key的情况下的数据倾斜问题。

对于两个RDD数据量都很大且倾斜的Key特别多如何解决?

1、数据量都很大且倾斜的Key多的情况
2、此种情况下具体操作步骤

两个RDD数据都特别多且倾斜的Key成千上万个,该如何解决数据倾斜的问题?
初步的想法:在倾斜的Key上面加上随机数
该想法的原因:shuffle的时候把key的数据分到不同的task里去

但是现在的倾斜的key非常多,成千上万,所以如果说采样找出倾斜的key的话并不是一个非常好的想法

扩容?
首先,什么是扩容?就是把该RDD中的每一条数据变成5条、10条、20条等,例如RDD中原来是10亿条数据,扩容后可能变成1000亿条数据;
其次,如何做到扩容?flatMap中对要进行扩容的每一条数据都通过0~N-1个不同的前缀变成N条数据(例如变成)
问题:N的值可以随便取吗?需要考虑当前程序能够使用的Core的数量
答案:N的数值一般不能取的太大,通常小于50,否则会对磁盘、内存、网络都会形成极大负担,例如会造成OOM

N这个数值取成10和1000除了OOM等不同以外,是否还有其他影响呢?其实N的数值的大小还会对数据倾斜的解决程度构成直接的影响!N越大,越不容易倾斜,但是也会占用更多的内存、磁盘、网络以及(不必要的)消耗更多的CPU时间

模拟代码:
RDD1 join RDD2

rdd22 = RDD2.flatMap{
for(0 to 9) {
0_item
}
}

rdd11 = RDD1.map{
Random(10)
random_item
}

result = rdd11.join(rdd22)

result.map{
item_1.split 去掉前缀
}

并行度的深度使用

1、并行度的初级使用
2、并行度的高级使用

用并行度解决数据倾斜的基本应用:例如reduceByKey
改变并行度之所以能够改善数据倾斜的原因在于,如果某个Task有100个Key且数据量特别大,就极有可能导致OOM或者任务运行特别缓慢,此时如果把 并行度变大,则可以分解该Task的数据量,例如把原本该Task的100个Key分解给10个Task,这个就可以减少每个Task的数据量,从而有可 能解决OOM和任务慢的问题。
对于reduceByKey而言,你可以传入并行度的参数,也可以自定义Partitioner
增加Executor:改变计算资源,从仅仅数据倾斜的角度来看并不能够直接去解决数据倾斜的问题,但是也有好处,好处是可以同时并发运行更多的Task,结果是可能加快了运行速度。

用并行度解决数据倾斜的高级使用:例如reduceByKey
假设说有倾斜的Key,我们给所有的Key加上一个随机数,然后进行reduceByKey操作;此时同一个Key会有不同的随机数前缀,在进行 reduceByKey操作的时候原来的一个非常大的倾斜的Key就分而治之变成若干个更小的Key,不过此时结果和原来不一样,怎么破?进行map操 作,目的是把随机数前缀去掉,然后再次进行reduceByKey操作。(当然,如果你很无聊,可以再次做随机数前缀),这样我们就可以把原本倾斜的 Key通过分而治之方案分散开来,最后又进行了全局聚合,在这里的本质还是通过改变并行度去解决数据倾斜的问题。

解决方案的“银弹”是什么?

1、数据倾斜解决方案总结
2、方案之外的方案
3、数据倾斜解决方案的“银弹”?

逃离Spark技术本身之外如何解决数据倾斜的问题?
之所以会有这样的想法,是因为从结果上来看,数据倾斜的产生来自于数据和数据的处理技术,前面几节课和大家分享都是数据的处理技术层面如何解决数据倾斜,因此,我们现在需要回到数据的层面去解决数据倾斜的问题。
数据本身就是Key-Value的存在方式,所谓的数据倾斜就是说某(几)个Key的Values特别多,所以如果要解决数据倾斜,实质上是解决单一的Key的Values的个数特别多的情况。新的数据倾斜解决方案由此诞生了。
1、把一个大的Key-Values数据分解成为Key-subKey-Values的方式
2、预先和其他的表进行join,将数据倾斜提前到上游的Hive ETL
3、可以把大的Key-Values中的Values组拼成为一个字符串,从而形成只有一个元素的Key-Value。
4、加一个中间适配层,当数据进来的时候进行Key的统计和动态排名,基于该排名动态调整Key分布

假如10万个Key都发生了数据倾斜,如何解决呢?此时一般就是加内存和Core

收藏 推荐 打印 | 阅读:
相关新闻       spark面试题  数据倾斜