SPARK AE

2021/03/17 spark 共 3792 字,约 11 分钟

Spark AE

Spark sql 自适应执行功能 Adaptive execution

Spark AE 包含 3个功能:

动态调整执行计划: sortmergejoin 优化成broadcast join

动态处理数据倾斜: SkewJoin

自动设置shuffle partition 数

代码:

QueryExecution.prepareForExecution->PlanQueryStage

AE 总体思想是为每一个stage单独创建一个子job,子job执行完后收集该stage相关的统计信息(主要是数据量和记录数),并依据这些统计信息优化调整下游stage的执行计划

SortMergeJoin优化成BroadcastJoin

满足条件

1.SortMergeJoin

2.join类型及size (以下满足一个即可)

a.inner join 且sizeInBytes(left 或right)

<= spark.sql.adaptiveBroadcastJoinThreshold

b.left join 且sizeInBytes(right)

<= spark.sql.adaptiveBroadCastJoinThreshold

c.right join 且sizeInbytes(left)

<=spark.sql.adaptiveBroadcastJoinThreshold

full out join不能转成 broadcastjoin

动态处理join数据倾斜

执行完stage后收集每个map的shuffle数据大小和记录条数。如果某一个partition 的数据量或者记录条数超过中位数的spark.sql.adaptive.skewedPartitionFactor倍,并且大于某个预先配置的阈值,就认为这是一个数据倾斜的partition

需满足以下条件:

1.SortMergeJoin 且该Stage中只有一个Join

2.join 类型(以下满足一个即可)

a. inner join:left 和right

b.left join: left

c.right join:right

扩展:支持多个join和joinWithagg

自动设置shuffle partition数

上游stage按照 spark.sql.adaptive.maxNumPostShufflePartitions(default 500) 设置的数量生成shuffle数据

下游stage根据reduce task id 顺序遍历tasks,把数据量小的多个连续的task合并成一个task执行,多个task合并成一个task的标准是:

合并的tasks总数据量 spark.sql.adaptive.shuffle.targetPostShuffleInputSize

总记录数spark.sql.adaptive.shuffle.targetPostShuffleRowCount

假设设置的shufflePartition个数为5,在map stage结束之后,统计得到每一个partition的大小分别是70MB,30MB,20MB,10MB和50MB

设置spark.sql.adaptive.shuffle.targetPostShuffleInputSIze=64MB

Spark AE 优化效果

ShuffledHashJoin实现原理

spark中shuffled hash join 实现: shuffledHashJoinExec

目前spark 中在build hash map 时,根据join key的个数以及join key的数据类型,使用两种不同的方式来完成hash map的构建工作:

a. 只有一个join key并且是long 类型,使用LongHashRelation

b.其他情况:使用UnsafeHashedRelation

UnsafeHashedRelation 与longHashedRelation 在build hash map 时整体逻辑相同,只是在细节处理时稍有差异

LongHashedRelation

使用LongToUnsafeRowMap作为hash map用了存放build side 的所有数据

LongToUnsafeRow Map由两个Long数组组成:

Var page: Array[Long],page 用于存放所有的row 及与当前row具有相同key的next row在page中的地址:

Row1pointer1row2pointer2 ,其中pointer=offsetsize,也就是既包含了next row 在page数组中的offset,又包含了next row的size.
Var array : Array[Long],array 用于存放所有的key以及key 在page中的第一个row的地址 key1offset1size1key2offsetsize2
先将row直接存入page数组的尾部,然后通过key的hash code找到key在array 数组中的位置,如果是第一次出现的key,则array 中直接保存该key以及对应的row在page中的pointer信息,否则还需要先将array中该key对应的pointer信息(该key上一个row的pointer信息) 更新到page中,然后将当前插入到page中的row的pointer(offsetsize)更新到array 数组中该key对应的位置上

UnsafeHashedRelation

使用BytesTOBytesMap作为hash map用来存放build side的所有数据

BytesToBytesMap由两部分组成:

TaskMemoryManager:用于存放row信息

longArray:Long Array,用于存放所有的key的hashcode以及在taskMemoryManager中的address

Address1,hashcode1,address2,hashcode2

leftJoin operation

  1. left side端逐个遍历没一条记录,并通过key的hash code在array数组中找到该key对应的pointer并解析出offset和size
  2. 通过offset 可以找到next row的第一个byte的位置,通过size可以确定该row具有多少个bytes
  3. 读取完row数据后,接着解析紧跟在row后面的pointer,并重复前面的过程便可以找出该key对应的所有row数据,从而完成数据连接操作
  4. 当在hashmap中匹配不到key时,LEFT SIDE 端的记录与null row 进行join (即right side端的所有fields都用null填充)

基于AE的hashJoin

1.SortMergeJoin

2.join类型以及size

a. Inner join 且maxPartitionBytes(left 或right)

<=spark.sql.adaptiveHashJoinThreshold

b.left join 且maxPartitionBytes(right)

<=spark.sql.adaptiveHashJoinThreshold

c.right join 且maxPartitionBytes(left)

<=spark.sql.adaptiveHashJOinTHreshold

Full out join 不能转成ShuffledHashJoin

Leftjoin 中build left side

1.SortMergeJoin

2.join类型以及size

a. left join且maxPartitionBytes(left) <= sparl.sql.adaptiveHashJoinThreshold

且maxPartitionBytes(right) > spark.sql.adaptiveHashJoinThreshold

b. Right join且maxPartitionBytes(right) <=spark.sql.adaptiveHashJoinThreshold

且maxPartitionBytes(left)<=spark.sql.adaptiveHashJoinThreshold

Build hash map(left side):

使用 val nullkeyROws:ArrayBuffer[UnsafeRow]保存key=null的记录,遍历完所有记录后保存到relation中

Left join operation (A left join B)

初始化flagArray = new Array[Byte]array.length 用于记录hash map中的key是否join上

遍历right side 的所有记录,通过key的hash code在array数组中寻找该key相关的记录,如果能匹配到相应的记录,则进行join操作并修改flagArray中该key对应的标志位为1.

join操作输出的结果集 记为R1

在完成right side端所有记录的遍历后,遍历flagArray数组,找出所有标志位为0的key以及在page数组中对应的rows,将这些记录与null Row进行join生成结果集R2

将nullKeyRows中所有的记录与nullRow进行join 生成结果集R3

最后合并结果集R1,R2,R3 得到最终的结果

文档信息

Search

    Table of Contents