Spark AE
Spark sql 自适应执行功能 Adaptive execution
Spark AE 包含 3个功能:
动态调整执行计划: sortmergejoin 优化成broadcast join
动态处理数据倾斜: SkewJoin
自动设置shuffle partition 数
代码:
QueryExecution.prepareForExecution->PlanQueryStage
AE 总体思想是为每一个stage单独创建一个子job,子job执行完后收集该stage相关的统计信息(主要是数据量和记录数),并依据这些统计信息优化调整下游stage的执行计划
SortMergeJoin优化成BroadcastJoin
满足条件
1.SortMergeJoin
2.join类型及size (以下满足一个即可)
a.inner join 且sizeInBytes(left 或right)
<= spark.sql.adaptiveBroadcastJoinThreshold
b.left join 且sizeInBytes(right)
<= spark.sql.adaptiveBroadCastJoinThreshold
c.right join 且sizeInbytes(left)
<=spark.sql.adaptiveBroadcastJoinThreshold
full out join不能转成 broadcastjoin
动态处理join数据倾斜
执行完stage后收集每个map的shuffle数据大小和记录条数。如果某一个partition 的数据量或者记录条数超过中位数的spark.sql.adaptive.skewedPartitionFactor倍,并且大于某个预先配置的阈值,就认为这是一个数据倾斜的partition
需满足以下条件:
1.SortMergeJoin 且该Stage中只有一个Join
2.join 类型(以下满足一个即可)
a. inner join:left 和right
b.left join: left
c.right join:right
扩展:支持多个join和joinWithagg
自动设置shuffle partition数
上游stage按照 spark.sql.adaptive.maxNumPostShufflePartitions(default 500) 设置的数量生成shuffle数据
下游stage根据reduce task id 顺序遍历tasks,把数据量小的多个连续的task合并成一个task执行,多个task合并成一个task的标准是:
合并的tasks总数据量 spark.sql.adaptive.shuffle.targetPostShuffleInputSize
总记录数spark.sql.adaptive.shuffle.targetPostShuffleRowCount
假设设置的shufflePartition个数为5,在map stage结束之后,统计得到每一个partition的大小分别是70MB,30MB,20MB,10MB和50MB
设置spark.sql.adaptive.shuffle.targetPostShuffleInputSIze=64MB
Spark AE 优化效果
ShuffledHashJoin实现原理
spark中shuffled hash join 实现: shuffledHashJoinExec
目前spark 中在build hash map 时,根据join key的个数以及join key的数据类型,使用两种不同的方式来完成hash map的构建工作:
a. 只有一个join key并且是long 类型,使用LongHashRelation
b.其他情况:使用UnsafeHashedRelation
UnsafeHashedRelation 与longHashedRelation 在build hash map 时整体逻辑相同,只是在细节处理时稍有差异
LongHashedRelation
使用LongToUnsafeRowMap作为hash map用了存放build side 的所有数据
LongToUnsafeRow Map由两个Long数组组成:
Var page: Array[Long],page 用于存放所有的row 及与当前row具有相同key的next row在page中的地址:
| Row1 | pointer1 | row2 | pointer2 ,其中pointer=offset | size,也就是既包含了next row 在page数组中的offset,又包含了next row的size. |
| Var array : Array[Long],array 用于存放所有的key以及key 在page中的第一个row的地址 key1 | offset1 | size1 | key2 | offset | size2 |
| 先将row直接存入page数组的尾部,然后通过key的hash code找到key在array 数组中的位置,如果是第一次出现的key,则array 中直接保存该key以及对应的row在page中的pointer信息,否则还需要先将array中该key对应的pointer信息(该key上一个row的pointer信息) 更新到page中,然后将当前插入到page中的row的pointer(offset | size)更新到array 数组中该key对应的位置上 |
UnsafeHashedRelation
使用BytesTOBytesMap作为hash map用来存放build side的所有数据
BytesToBytesMap由两部分组成:
TaskMemoryManager:用于存放row信息
longArray:Long Array,用于存放所有的key的hashcode以及在taskMemoryManager中的address
Address1,hashcode1,address2,hashcode2
leftJoin operation
- left side端逐个遍历没一条记录,并通过key的hash code在array数组中找到该key对应的pointer并解析出offset和size
- 通过offset 可以找到next row的第一个byte的位置,通过size可以确定该row具有多少个bytes
- 读取完row数据后,接着解析紧跟在row后面的pointer,并重复前面的过程便可以找出该key对应的所有row数据,从而完成数据连接操作
- 当在hashmap中匹配不到key时,LEFT SIDE 端的记录与null row 进行join (即right side端的所有fields都用null填充)
基于AE的hashJoin
1.SortMergeJoin
2.join类型以及size
a. Inner join 且maxPartitionBytes(left 或right)
<=spark.sql.adaptiveHashJoinThreshold
b.left join 且maxPartitionBytes(right)
<=spark.sql.adaptiveHashJoinThreshold
c.right join 且maxPartitionBytes(left)
<=spark.sql.adaptiveHashJOinTHreshold
Full out join 不能转成ShuffledHashJoin
Leftjoin 中build left side
1.SortMergeJoin
2.join类型以及size
a. left join且maxPartitionBytes(left) <= sparl.sql.adaptiveHashJoinThreshold
且maxPartitionBytes(right) > spark.sql.adaptiveHashJoinThreshold
b. Right join且maxPartitionBytes(right) <=spark.sql.adaptiveHashJoinThreshold
且maxPartitionBytes(left)<=spark.sql.adaptiveHashJoinThreshold
Build hash map(left side):
使用 val nullkeyROws:ArrayBuffer[UnsafeRow]保存key=null的记录,遍历完所有记录后保存到relation中
Left join operation (A left join B)
初始化flagArray = new Array[Byte]array.length 用于记录hash map中的key是否join上
遍历right side 的所有记录,通过key的hash code在array数组中寻找该key相关的记录,如果能匹配到相应的记录,则进行join操作并修改flagArray中该key对应的标志位为1.
join操作输出的结果集 记为R1
在完成right side端所有记录的遍历后,遍历flagArray数组,找出所有标志位为0的key以及在page数组中对应的rows,将这些记录与null Row进行join生成结果集R2
将nullKeyRows中所有的记录与nullRow进行join 生成结果集R3
最后合并结果集R1,R2,R3 得到最终的结果
文档信息
- 本文作者:Jessica
- 本文链接:https://jessica0530.github.io/2021/03/17/SparkAE/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)