基于Hadoop的⼤数据处理系统
基于Hadoop的⼤数据处理系统
基于Hadoop的⼤数据处理系统
By
2015/11/10
0. 前⾔
伴随Internet和Web技术的飞速发展,⽹络⽇志、互联⽹搜索索引、电⼦商务、社交⽹站等技术的⼴泛使⽤带来了数据量的急剧增长。计算机技术在各⾏各业的普遍使⽤也促使⼤量数据的产⽣,如物联⽹中的传感器所产⽣的海量数据。近⼏年数据以惊⼈的速度增长,这预⽰我们⼰经进⼊⼤数据时代。⼤数据时代给我们带来的不仅是数据量的爆炸式增长、数据结构的复杂多样,⽽且也使处理这些数据信息的⼿段变的复杂起来。海量数据的存储以及分布式计算是⼤数据分析与处理的⾸要问题。
⽬前⼤数据的处理平台以Hadoop为主,Hadoop是⼀个开源的可运⾏于⼤规模集上的分布式⽂件系统和和分布式计算的基础框架,提供了对于海量数据存储以及分布式计算的⽀持。Hadoop擅长于在廉价机器
搭建的集上进⾏海量数据(结构化与⾮结构化)的存储与离线处理,⽬前能够让数千台普通、廉价的服务器组成⼀个稳定的、强⼤的集,使其能够对PB级别的⼤数据进⾏存储、计算。此外,Hadoop已经具有了强⼤稳定的⽣态系统,有很多延伸产品,如Hive,HBase,Sqoop,ZooKeeper等等。Hadoop的这些优势,使其成为⼤数据处理的⾸选平台和开发标准。我们⽬前进⾏的⼤数据学习研究也是基于Hadoop平台展开。
本报告主要包括以下⼏⽅⾯主题:
1. 分布式计算架构及分布式计算原理概述
2. Hadoop架构及集⽅式介绍
3. 基于Hadoop完全分布式集进⾏演⽰
1. 架构介绍
⼤数据处理平台依赖于分布式存储和分布式计算。本节主要包括以下⼏个要点:
1.1 分布式系统架构
分布式数据处理系统主要处理以下两⽅⾯的问题:
1. 存储 分布式存储系统,解决海量数据的存储及管理。典型的分布式存储系统有NFS,AFS,GFS,HDFS等等。
2. 计算 分布式计算系统,主要处理计算资源的调度,任务监控,系统容错,节点间协调等问题。⽐较典型的是MapReduce架构。1.2. Hadoop系统架构
Hadoop DFS
Hadoop分布式⽂件系统,简称HDFS,是⼀个分布式⽂件系统。它是⾕歌GFS的开源实现。具有较⾼的容错性,⽽且提供了⾼吞吐量的数据访问,⾮常适合⼤规模数据集上的应⽤,是⼀个⾼度容错性和⾼吞吐量的海量数据存储解决⽅案。
Hadoop MapReduce
MapReduce的名字源于这个模型中的两项核⼼操作:Map和Reduce。这是函数式编程(Functional Programming)中的两个核⼼概念。
MapReduce是⼀种简化的分布式编程模式,让程序⾃动分布到⼀个由普通机器组成的超⼤集上并发执⾏。如同Java程序员可以不考虑内存泄露⼀样,MapReduce的runtime系统会解决输⼊数据的分布细节,跨越机器集的程序执⾏调度,处理机器的失效,并且管理机器之间的通讯请求。这样的模式允许
程序员可以不需要有什么并发处理或者分布式系统的经验,就可以处理超⼤的分布式系统资源。这样的优势使得Hadoop在众多分布式存储和计算技术中脱颖⽽出,成为⼤数据分析与处理的标准平台。
2. 集⽅式
Hadoop有三种集⽅式可以选择:
Local (Standalone) Mode
Pseudo-Distributed Mode
Fully-Distributed Mode
以下分别予以介绍。
hadoop分布式集搭建Local (Standalone) Mode
Local (Standalone) Mode即单机模式,是⼀种⽆集模式,⽐较简单。⼀般成功安装Hadoop并配置相关环境变量(主要是
JAVA_HOME和HADOOP_HOME)后即可进⼊该模式,⽽⽆需额外配置。该模式并没有充分发挥分布式
计算的优势,因为集中只有⼀台主机,但是该模式下可以测试Hadoop及相关环境变量是否配置正常。
Pseudo-Distributed Mode
Pseudo-Distributed Mode即伪分布模式,它是单机集模式。Hadoop可以在单节点上以伪分布式的⽅式运⾏,Hadoop进程以分离的Java进程来运⾏,节点既作为NameNode也作为DataNode。伪分布式模式配置也很简单,只需在单机模式基础上配置core-
Fully-Distributed Mode
Fully-Distributed Mode(完全分布模式)是⼀种多机集模式。它不是⽤Java进程来模拟分布式计算中的各种⾓⾊,⽽是⽤真实的主机来充当分布式计算中NameNode,DataNode,SecondaryNameNode,ResouceManager,NodeManager等⾓⾊。这种模式的集能够完全体现分布式计算系统的⼯作原理。也是本次演⽰所采⽤的集模式。
从完全分布式的概念可知,配置这种模式⾄少需要3台主机。因为从分布式计算的逻辑上看,master是调度者的⾓⾊,⽽slave是执⾏者的⾓⾊,所以slave⾄少为2才能体现分布式计算的概念。
关于分布式系统中NameNode,DataNode,SecondaryNameNode,ResouceManager,NodeManager等⾓⾊的描述及相互之间的通信在⽹上有很多精彩的博客,此处不再赘述。
3. 系统部署
系统部署在实验室服务器(Windows Server 2008 R2 Enterprise)上,利⽤VMware Workstation软件创建多台虚拟机,模拟真实物理机,搭建了⼀个完全分布式的Hadoop分布式计算环境。
3.1 硬件环境
集共包括6台主机,每台主机4G内存,4x4核,拥有20GB SCSI硬盘。集中主机名和IP地址配置及主机在集中的⾓⾊如下表所⽰:
Table 1. Cluster Host Configuration
Index Host IP Role
1SprakMaster192.168.174.20NameNode,ResourceManager
2SprakSlave1192.168.174.21SecondaryNameNode,DataNode,NodeManager
3SprakSlave2192.168.174.22DataNode,NodeManager
4SprakSlave3192.168.174.23DataNode,NodeManager
5SprakSlave4192.168.174.24DataNode,NodeManager
6SprakSlave5192.168.174.25DataNode,NodeManager
注:在Hadoop应⽤中,还存在⼀种⾓⾊:Client,即负责提交计算任务(Job)的⽤户。在本系统中,集中任何⼀个节点均能成为client 提交Job。
3.2 软件环境
操作系统版本: CentOS-6.0-x86_64
Hadoop: 2.7.1
Java: jdk 1.7.0_79
4. 演⽰实例
本节给出5个实例,⽤于演⽰基于Hadoop完全分布式集进⾏MapReduce计算原理。
4.1 QuasiMonteCarlo
在Hadoop软件⽂档hadoop-mapreduce-examples-2.7.1.jar中提供了许多利⽤Hadoop进⾏MapReduce开发的demo,例如wordcount,pi 等。我们选择其中最简单的pi来测试我们刚刚搭建起来的集。
此处不选择经典的wordcount进⾏测试是因为pi这个demo更加简单,⽤户不需要指定输⼊⽂件路径和输出⽂件路径,程序中会⽣成数据作为mapper的输⼊。
⾸先简单介绍⼀下pi的⼯作原理,它是利⽤Monte Carlo⽅法估计圆周率值,类似“布丰投针”实验,都是根据⼤数定律的思想⽤频率逼近概率。阅读pi的源代码可以知道,程序通过产⽣随机数来模拟从⼀个边长为1的正⽅形中随机取点的过程,可知该点落在其内接圆内的概率为,从⽽可以根据落在内接圆内的点的频率来估计概率,进⽽求出值。
在命令⾏输⼊:
cd $HADOOP_HOME
hadoop jar \
share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar \
pi 10020000
第⼀个参数100指定⽣成的mapper的个数,第⼆个参数指定在每个mapper中要产⽣20000样本点(根据⼤数定律,样本点的个数⾜够⼤时样本均值才能逼近总体均值)。
程序运⾏结果如下:
计算⽤时94.319s,得到的估计值为3.1415。通过增⼤第2个参数,可以得到更⾼的估计精度。
4.2 Streaming
第2个实例也选⾃Hadoop官⽅⽂档。
以下是关于Hadoop Streaming机制的⼀些介绍。
Hadoop是基于Java开发的,⽽Streaming是Hadoo提供的⼀个能够利⽤其他编程语⾔来进⾏MapReduce开发的API。Hadoop Streaming并不复杂,其只是利⽤了Unix的标准输⼊输出作为Hadoop
和其他编程语⾔的开发接⼝,因⽽在其他编程语⾔所写的map和reduce过程中,只必需将标准输⼊作为map和reduce过程的输⼊,将标准输出作为map和reduce过程的输出即可。在标准输⼊输出
中,key和value是以tab作为分隔符,并且在reduce的标准输⼊中,Hadoop框架保证了输⼊的数据是按key排序的。
利⽤Streaming机制,⽤户可以使⽤Shell命令⾏,C语⾔程序,Python脚本,Perl脚本等来编写map程序和reduce程序(官⽹给出了具体实例),这样极⼤增强了MapReduce开发的灵活性。
在命令⾏输⼊以下命令:
hadoop fs -mkdir myInputDirs
hadoop fs -mkdir myOutputDirs
hadoop fs -put xxxx myInputDirs/xxxx
hadoop jar hadoop-streaming-2.7.1.jar \
-input myInputDirs \
-
output myOutputDir \
-mapper /bin/cat \
-reducer /usr/bin/wc
这个例⼦是通过*nix系统下的wc对⽂本⽂件中的字符和单词进⾏统计。其中mapper采⽤cat程序,只是将输⼊内容原封不动的输出给reducer;reducer为wc程序,完成实际的字符和单词统计⼯作。
以下是官⽹给出的利⽤Python脚本进⾏Streaming的⼀个实例:
hadoop jar hadoop-streaming-2.7.1.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myPythonScript.py \
-reducer /usr/bin/wc \
-file myPythonScript.py
此外,Hadoop中还提供了Pipes机制。Hadoop Pipes是Hadoop MapReduce的C++接⼝。与利⽤标准输⼊输出的Hadoop Streaming 不同(当然Streaming也能够⽤于C++),Hadoop Pipes以Hadoop IPC通信时利⽤的socket作为管道,⽽不是标准输⼊输出。与Java 的接⼝不⼀样,Hadoop Pipes的key和value都是基于STL的string,因⽽在处理时开发⼈员必需⼿动地进⾏数据类型的转换。
4.3 ABCEntropy
这个实例是我在学习MapReduce编程时模仿WordCount编写的⼀个简单MapReduce程序,主要是将WordCount中统计单词改为了统计英⽂字符。最后根据统计结果计算英⽂⽂本的熵。采⽤的英⽂语料为⽹上下载的英⽂⽂学名著的txt⽂本,经过初步预处理(如剔除空⾏等等)后上传到HDFS,其⼤⼩达到227MB。
abcentropy.sh代码如下:
#! /bin/bash
INPUT_DIR=datasets/englishliterature
OUTPUT_DIR=abcentropy/output
TESTDATA=$INPUT_DIR/englishliterature.data
echo "preprocessing text material in englishliterature "
cat englishliterature/* | sed '/^\s*$/d'  > $INPUT_DIR/englishliterature.data
# echo "preprocessing completed successfully !"
# ls -hl $INPUT_DIR/englishliterature.data
hadoop fs -rm -r $INPUT_DIR
hadoop fs -mkdir -p $INPUT_DIR
echo "uploading data to $INPUT_DIR ..."
hadoop fs -put -f$TESTDATA$TESTDATA
echo "execute ABCEntropy on the cluster ..."
hadoop jar abc.jar bigben.demo.ABCEntropy $INPUT_DIR$OUTPUT_DIR -ow
echo "Finished!"
计算结果如下图所⽰:
由图可知:计算出英⽂字母的熵为4.17 bit。在⽹上查到的数据为4.03 bit。若假设英⽂26个字母完全等概,则英⽂⽂本的熵为 bit(实际英⽂⽂本的熵⼩于这个值)。从⽽说明计算得到的结果还是有⼀定可信度的。
4.4 Iris
iris为UCI(University of California Irvine)机器学习数据库中下载的鸢尾花数据集。
iris以鸢尾花的特征作为数据来源,常⽤在分类操作中,是进⾏分类算法性能分析的著名的benchmark。该数据集由3种不同类型的鸢尾花(Setosa(⼭鸢尾),Versicolour(杂⾊鸢尾)以及Virginica(维吉尼亚鸢尾))的150个样本数据构成。每个样本数据包含4个属性,分别是:
Sepal Length(花萼长度),单位是cm
Sepal Width(花萼宽度),单位是cm
Petal Length(花瓣长度),单位是cm
Petal Width(花瓣宽度),单位是cm
前段时间看到有些论⽂上⽤UCI数据集(iris,wine)来做聚类测试。故本例也尝试对iris数据集进⾏聚类,聚类结果在Matlab中⽤平⾏坐标法进⾏可视化。聚类采⽤两种⽅法,⼀种是利⽤Matlab⾃带的kmeans函数,⼀种是利⽤Mahout提供的k-means算法。最后对⽐⼆者的性能。
为了利⽤Mahout进⾏Kmeans聚类,需要将数据转换为Mahout能够处理的SequenceFile格式。⾸先将利⽤shell脚本将数据导出为空格分隔的⽂本⽂件,再利⽤Mahout中提供的org.apache.version.InputDriver将⽂本格式转换为SequenceFile,最后输⼊Mahout的k-means算法进⾏k-means聚类并分析结果。由于Mahout的k-means聚类输出结果不直观,为了便于在Matlab中画图,还需编写脚本对输出结果进⾏转换。
设置聚类算法⽣成3个簇,最⼤迭代次数maxIter为10,距离测度distanceMeasure采⽤默认的平⽅欧⽒距离。
以下是部分代码
iris.sh代码如下:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。