hive中的两表join
作为数据分析中经常进⾏的join 操作,传统DBMS 数据库已经将各种算法优化到了极致,⽽对于hadoop 使⽤的mapreduce 所进⾏的join 操作,去年开始也是有各种不同的算法论⽂出现,讨论各种算法的适⽤场景和取舍条件,本⽂讨论hive 中出现的⼏种join 优化,然后讨论其他算法实现,希望能给使⽤hadoop 做数据分析的开发⼈员提供⼀点帮助.
Facebook 今年在yahoo 的hadoop summit ⼤会上做了⼀个关于最近两个版本的hive 上所做的⼀些join 的优化,其中主要涉及到hive 的⼏个关键特性: 值分区 , hash 分区 , map join , index ,
Common Join
最为普通的join策略,不受数据量的⼤⼩影响,也可以叫做reduce side join ,最没效率的⼀种join ⽅式. 它由⼀个mapreduce job 完成.
⾸先将⼤表和⼩表分别进⾏map 操作, 在map shuffle 的阶段每⼀个map output key 变成了table_name_tag_prefix + join_column_value , 但是在进⾏partition 的时候它仍然只使⽤join_column_value 进⾏hash.
每⼀个reduce 接受所有的map 传过来的split , 在reducce 的shuffle 阶段,它将map output key 前⾯的ta
ble_name_tag_prefix 给舍弃掉进⾏⽐较. 因为reduce 的个数可以由⼩表的⼤⼩进⾏决定,所以对于每⼀个节点的reduce ⼀定可以将⼩表的split 放⼊内存变成hashtable. 然后将⼤表的每⼀条记录进⾏⼀条⼀条的⽐较.
Map Join
Map Join 的计算步骤分两步,将⼩表的数据变成hashtable⼴播到所有的map 端,将⼤表的数据进⾏合理的切分,然后在map 阶段的时候⽤⼤表的数据⼀⾏⼀⾏的去探测(probe) ⼩表的hashtable. 如果join key 相等,就写⼊HDFS.
map join 之所以叫做map join 是因为它所有的⼯作都在map 端进⾏计算.
hive 在map join 上做了⼏个优化:
hive 0.6 的时候默认认为写在select 后⾯的是⼤表,前⾯的是⼩表,或者使⽤ /*+mapjoin(map_table) */ 提⽰进⾏设定. hive 0.7 的时候这个计算是⾃动化的,它⾸先会⾃动判断哪个是⼩表,哪个是⼤表,这个参数由(vert.join=true)来控制. 然后控制⼩表的⼤⼩由(hive.smalltable.filesize=25000000L)参数控制(默认是25M),当⼩表超过这个⼤⼩,hive 会默认转化成common join. 你可以查看HIVE-1642.
⾸先⼩表的Map 阶段它会将⾃⼰转化成MapReduce Local Task ,然后从HDFS 取⼩表的所有数据,将⾃⼰转化成Hashtable file 并压缩打包放⼊DistributedCache ⾥⾯.
⽬前hive 的map join 有⼏个限制,⼀个是它打算⽤BloomFilter 来实现hashtable , BloomFilter ⼤概⽐hashtable 省8-10倍的内存, 但是BloomFilter 的⼤⼩⽐较难控制.
现在DistributedCache ⾥⾯hashtable默认的复制是3份,对于⼀个有1000个map 的⼤表来说,这个数字太⼩,⼤多数map 操作都等着DistributedCache 复制.
Bucket Map Join
hive 建表的时候⽀持hash 分区通过指定clustered by (col_name,xxx ) into number_buckets buckets 关键字.
当连接的两个表的join key 就是bucket column 的时候,就可以通过
hive.optimize.bucketmapjoin= true
来控制hive 执⾏bucket map join 了, 需要注意的是你的⼩表的number_buckets 必须是⼤表的倍数. ⽆论多少个表进⾏连接这个条件都必须满⾜. (其实如果都按照2的指数倍来分bucket, ⼤表也可以是⼩表
的倍数,不过这中间需要多计算⼀次,对int 有效,long 和string 不清楚) Bucket Map Join 执⾏计划分两步,第⼀步先将⼩表做map 操作变成hashtable 然后⼴播到所有⼤表的map端,⼤表的map端接受
了number_buckets 个⼩表的hashtable并不需要合成⼀个⼤的hashtable,直接可以进⾏map 操作,map 操作会产⽣number_buckets 个split,每个split 的标记跟⼩表的hashtable 标记是⼀样的, 在执⾏projection 操作的时候,只需要将⼩表的⼀个hashtable 放⼊内存即可,然后将⼤表的对应的split 拿出来进⾏判断,所以其内存限制为⼩表中最⼤的那个hashtable 的⼤⼩.
Bucket Map Join 同时也是Map Side Join 的⼀种实现,所有计算都在Map 端完成,没有Reduce 的都被叫做Map Side Join ,Bucket 只是hive 的⼀种hash partition 的实现,另外⼀种当然是值分区.
create table a (xxx) partition by (col_name)
不过⼀般hive 中两个表不⼀定会有同⼀个partition key, 即使有也不⼀定会是join key. 所以hive 没有这种基于值的map side join, hive 中的list partition 主要是⽤来过滤数据的⽽不是分区. 两个主要参数为(hive.optimize.cp = true 和 hive.optimize.pruner=true)
sql left join 多表连接hadoop 源代码中默认提供map side join 的实现, 你可以在hadoop 源码的src/contrib/data_join/src ⽬录下到相关的⼏个类. 其中TaggedMapOutput 即可以⽤来实现hash 也可以实现list , 看你⾃⼰决定
怎么分区. Hadoop Definitive Guide 第8章关于map side join 和side data distribution 章节也有⼀个例⼦⽰例怎样实现值分区的map side join.
Sort Merge Bucket Map Join
Bucket Map Join 并没有解决map join 在⼩表必须完全装载进内存的限制, 如果想要在⼀个reduce 节点的⼤表和⼩表都不⽤装载进内存,必须使两个表都在join key 上有序才⾏,你可以在建表的时候就指定sorted by join key 或者使⽤index 的⽅式.
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
Bucket columns == Join columns == sort columns
这样⼩表的数据可以每次只读取⼀部分,然后还是⽤⼤表⼀⾏⼀⾏的去匹配,这样的join 没有限制内存的⼤⼩. 并且也可以执⾏全外连接.
Skew Join
真实数据中数据倾斜是⼀定的, hadoop 中默认是使⽤
也就是每个节点的reduce 默认是处理1G⼤⼩的数据,如果你的join 操作也产⽣了数据倾斜,那么你可以在hive 中设定
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold (default = 100000)
hive 在运⾏的时候没有办法判断哪个key 会产⽣多⼤的倾斜,所以使⽤这个参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到的reduce, ⼀般可以设置成你
(处理的总记录数/reduce个数)的2-4倍都可以接受.
倾斜是经常会存在的,⼀般select 的层数超过2层,翻译成执⾏计划多于3个以上的mapreduce job 都很容易产⽣倾斜,建议每次运⾏⽐较复杂的sql 之前都可以设⼀下这个参数. 如果你不知道设置多少,可以就按官⽅默认的1个reduce 只处理1G 的算法,那么
skew_key_threshold = 1G/平均⾏长. 或者默认直接设成250000000 (差不多算平均⾏长4个字节)
Left Semi Join
hive 中没有in/exist 这样的⼦句,所以需要将这种类型的⼦句转成left semi join. left semi join 是只传递表的join key给map 阶段 , 如果key ⾜够⼩还是执⾏map join, 如果不是则还是common join.
join 策略中的难点
⼤多数只适合等值连接(equal join) ,
范围⽐较和全外连接没有合适的⽀持
提前分区,零时分区,排序,多种不同执⾏计划很难评价最优⽅案.
没有考虑IO ⽐如临时表,⽹络消耗和⽹络延迟时间,CPU时间,
最优的⽅案不代表系统资源消耗最少.
参考资料:
Join Strategy in Hive
Join Optimization
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论