MapReduce实现两表的Join--原理及python和java代码实现⽤Hive⼀句话搞定的,但是有时必须要⽤mapreduce
⽅法介绍
1. 概述
在传统数据库(如:MYSQL)中,JOIN操作是⾮常常见且⾮常耗时的。⽽在HADOOP中进⾏JOIN操作,同样常见且耗时,由于Hadoop 的独特设计思想,当进⾏JOIN操作时,有⼀些特殊的技巧。
本⽂⾸先介绍了Hadoop上通常的JOIN实现⽅法,然后给出了⼏种针对不同输⼊数据集的优化⽅法。
2. 常见的join⽅法介绍
假设要进⾏join的数据分别来⾃File1和File2.
2.1 reduce side join
reduce side join是⼀种最简单的join⽅式,其主要思想如下:
在map阶段,map函数同时读取两个⽂件File1和File2,为了区分两种来源的key/value数据对,对每条数
据打⼀个标签(tag),⽐如:
tag=0表⽰来⾃⽂件File1,tag=2表⽰来⾃⽂件File2。即:map阶段的主要任务是对不同⽂件中的数据打标签。
在reduce阶段,reduce函数获取key相同的来⾃File1和File2⽂件的value list, 然后对于同⼀个key,对File1和File2中的数据进⾏
join(笛卡尔乘积)。即:reduce阶段进⾏实际的连接操作。
2.2 map side join
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同⼀个key对应的字段可能位于不同map中。Reduce side join是⾮常低效的,因为shuffle阶段要进⾏⼤量的数据传输。
Map side join是针对以下场景进⾏的优化:两个待连接表中,有⼀个表⾮常⼤,⽽另⼀个表⾮常⼩,以⾄于⼩表可以直接存放到内存中。这样,我们可以将⼩表复制多份,让每个map task内存中存在⼀份(⽐如存放到hash table中),然后只扫描⼤表:对于⼤表中的每⼀条记录key/value,在hash table中查是否有相同的key的记录,如果有,则连接后输出即可。
为了⽀持⽂件的复制,Hadoop提供了⼀个类DistributedCache,使⽤该类的⽅法如下:
(1)⽤户使⽤静态⽅法DistributedCache.addCacheFile()指定要复制的⽂件,它的参数是⽂件的URI(如果是HDFS上的⽂件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是⾃⼰配置的NameNode端⼝号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的⽂件拷贝到各个TaskTracker的本地磁盘上。(2)⽤户使⽤LocalCacheFiles()⽅法获取⽂件⽬录,并使⽤标准的⽂件读写API读取相应的⽂件。
2.3 SemiJoin
2.4 reduce side join + BloomFilter
3. ⼆次排序
在Hadoop中,默认情况下是按照key进⾏排序,如果要按照value进⾏排序怎么办?即:对于同⼀个key,reduce函数接收到的value list 是按照value排序的。这种应⽤需求在join操作中很常见,⽐如,希望相同的key中,⼩表对应的value排在前⾯。
有两种⽅法进⾏⼆次排序,分别为:buffer and in memory sort和 value-to-key conversion。
对于buffer and in memory sort,主要思想是:在reduce()函数中,将某个key对应的所有value保存下来,然后进⾏排序。 这种⽅法最⼤的缺点是:可能会造成out of memory。
对于value-to-key conversion,主要思想是:将key和部分value拼接成⼀个组合key(实现WritableComparable接⼝或者调⽤setSortComparatorClass函数),这样reduce获取的结果便是先按key排序,后按value排序的结果,需要注意的是,⽤户需要⾃⼰实现Paritioner,以便只按照key进⾏数据划分。Hadoop显式的⽀持⼆次排序,在Configuration类中有个setGroupingComparatorClass()⽅法,可⽤于设置排序group的key值,
reduce-side-join python代码
hadoop有个⼯具叫做steaming,能够⽀持python、shell、C++、PHP等其他任何⽀持标准输⼊stdin及标准输出stdout的语⾔,其运⾏原理可以通过和标准java的map-reduce程序对⽐来说明:
使⽤原⽣java语⾔实现Map-reduce程序
1. hadoop准备好数据后,将数据传送给java的map程序
2. java的map程序将数据处理后,输出O1
3. hadoop将O1打散、排序,然后传给不同的reduce机器
4. 每个reduce机器将传来的数据传给reduce程序
5. reduce程序将数据处理,输出最终数据O2
借助hadoop streaming使⽤python语⾔实现Map-reduce程序
1. hadoop准备好数据后,将数据传送给java的map程序
2. java的map程序将数据处理成“键/值”对,并传送给python的map程序
3. python的map程序将数据处理后,将结果传回给java的map程序
4. java的map程序将数据输出为O1
5. hadoop将O1打散、排序,然后传给不同的reduce机器
6. 每个reduce机器将传来的数据处理成“键/值”对,并传送给python的reduce程序
7. python的reduce程序将数据处理后,将结果返回给java的reduce程序
8. java的reduce程序将数据处理,输出最终数据O2
上⾯红⾊表⽰map的对⽐,蓝⾊表⽰reduce的对⽐,可以看出streaming程序多了⼀步中间处理,这样
说来steaming程序的效率和性能应该低于java版的程序,然⽽python的开发效率、运⾏性能有时候会⼤于java,这就是streaming的优势所在。
hadoop之实现集合join的需求
hadoop是⽤来做数据分析的,⼤都是对集合进⾏操作,因此该过程中将集合join起来使得⼀个集合能得到另⼀个集合对应的信息的需求⾮常常见。
⽐如以下这个需求,有两份数据:学⽣信息(学号,姓名)和学⽣成绩(学号、课程、成绩),特点是有个共同的主键“学号”,现在需要将两者结合起来得到数据(学号,姓名,课程,成绩),计算公式:
(学号,姓名) join (学号,课程,成绩)= (学号,姓名,课程,成绩)
数据事例1-学⽣信息:
学号sno姓名name
01name1
02name2
03name3
04name4
数据事例2:-学⽣成绩:
学号sno课程号courseno成绩grade
010180
010290
020182
020295
期待的最终输出:
学号sno姓名name课程courseno成绩grade
01name10180
01name10290
02name20182
02name20295
实现join的注意点和易踩坑总结
如果你想写⼀个完善健壮的map reduce程序,我建议你⾸先弄清楚输⼊数据的格式、输出数据的格式,然后⾃⼰⼿动构建输⼊数据并⼿动计算出输出数据,这个过程中你会发现⼀些写程序中需要特别处理的地⽅:
1. 实现join的key是哪个,是1个字段还是2个字段,本例中key是sno,1个字段
2. 每个集合中key是否可以重复,本例中数据1不可重复,数据2的key可以重复
3. 每个集合中key的对应值是否可以不存在,本例中有学⽣会没成绩,所以数据2的key可以为空
第1条会影响到hadoop启动脚本中key.fields和partition的配置,第2条会影响到map-reduce程序中具体的代码实现⽅式,第3条同样影响代码编写⽅式。
hadoop实现join操作的思路
具体思路是给每个数据源加上⼀个数字标记label,这样hadoop对其排序后同⼀个字段的数据排在⼀起并且按照label排好序了,于是直接将相邻相同key的数据合并在⼀起输出就得到了结果。
1、 map阶段:给表1和表2加标记,其实就是多输出⼀个字段,⽐如表⼀加标记为0,表2加标记为2;
2、 partion阶段:根据学号key为第⼀主键,标记label为第⼆主键进⾏排序和分区
3、 reduce阶段:由于已经按照第⼀主键、第⼆主键排好了序,将相邻相同key数据合并输出
hadoop使⽤python实现join的map和reduce代码
mapper.py的代码:
python转java代码
1 2# -*- coding: utf-8 -*-#Mapper.py
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30import os
import sys
#mapper脚本
def mapper():
#获取当前正在处理的⽂件的名字,这⾥我们有两个输⼊⽂件
#所以要加以区分
viron["map_input_file"]
filename=os.path.split(filepath)[-1]
for line in sys.stdin:
if line.strip()=="":
continue
fields=line[:-1].split("\t")
sno=fields[0]
#以下判断filename的⽬的是不同的⽂件有不同的字段,并且需加上不同的标记if filename=='data_info':
name=fields[1]
#下⾯的数字'0'就是为数据源1加上的统⼀标记
print '\t'.join((sno,'0',name))
elif filename=='data_grade':
courseno=fields[1]
grade=fields[2]
#下⾯的数字'1'就是为数据源1加上的统⼀标记
print '\t'.join((sno,'1',courseno,grade))
if __name__=='__main__':
mapper()
reducer的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33# -*- coding: utf-8 -*-
#reducer.py
import sys
def reducer():
#为了记录和上⼀个记录的区别,⽤lastsno记录上个sno
lastsno=""
for line in sys.stdin:
if line.strip()=="":
continue
fields=line[:-1].split("\t")
sno=fields[0]
'''
处理思路:
遇见当前key与上⼀条key不同并且label=0,就记录下来name值,
当前key与上⼀条key相同并且label==1,则将本条数据的courseno、grade联通上⼀条记录的name⼀起输出成最终结果
'''
if sno!=lastsno:
name=""
#这⾥没有判断label==1的情况,
#因为sno!=lastno,并且label=1表⽰该条key没有数据源1的数据
if fields[1]=="0":
name=fields[2]
elif sno==lastno:
#这⾥没有判断label==0的情况,
#因为sno==lastno并且label==0表⽰该条key没有数据源2的数据
if fields[2]=="1":
courseno=fields[2]
grade=fields[3]
if name:
print '\t'.join((lastsno,name,courseno,grade))
34 35 36 37 38lastsno=sno
if __name__=='__main__': reducer()
使⽤shell脚本启动hadoop程序的⽅法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23#先删除输出⽬录
~/hadoop-client/hadoop/bin/hadoop fs-rmr/hdfs/jointest/output #注意,下⾯配置中的环境值每个⼈机器不⼀样
~/hadoop-client/hadoop/bin/hadoop streaming \
-D mapred.map.tasks=10\
-duce.tasks=5\
-D mapred.job.map.capacity=10\
-D duce.capacity=5\
-D mapred.job.name="join--sno_name-sno_courseno_grade"\
-D num.key.fields.for .partition=1\
-D stream.num.map.output.key.fields=2\
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input "/hdfs/jointest/input/*"\
-output "/hdfs/jointest/output"\
-mapper "python26/bin/python26.sh mapper.py"\
-reducer "python26/bin/python26.sh reducer.py"\
-file "mapper.py"\
-file "reducer.py"\
-cacheArchive "/share/#python26"
#看看运⾏成功没,若输出0则表⽰成功了
echo $?
可以⾃⼰⼿⼯构造输⼊输出数据进⾏测试,本程序是验证过的。
更多需要注意的地⽅
hadoop的join操作可以分为很多类型,各种类型脚本的编写有所不同,其分类是按照key字段数⽬、value字段数⽬、key是否可重复来划分的,以下是⼀个个⼈总结的对照表,表⽰会影响的地⽅:
影响类
影响的范围
key字段数⽬1、启动脚本中num.key.fields.for.partition的配置2、启动脚本中stream.num.map.output.key.fields的配置
3、map和reduce脚本中key的获取
4、map和reduce脚本中每⼀条数据和上⼀条数据⽐较的⽅法key是否可重复如果数据源1可重复,标记为M;数据源2可重复标记为N,那么join可以分为:1*1、M*1、M*N类型
1*1类型:reduce中先记录第⼀个value,然后在下⼀条直接合并输出;
M*1类型:将类型1作为标记⼩的输出,然后每次遇见label=1就记录value,每遇见⼀次label=2就输出⼀次最终结果;
M*N类型:遇见类型1,就⽤数组记录value值,遇见label=2就将将记录的数组值全部连同该⾏value输出。value字段数⽬影响每次
label=1时记录的数据个数,需要将value都记录下来
reduce-side-join java代码

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。