Storm分布式实时计算系统搭建
Storm实时计算系统
Apache Storm 是⼀个免费的开源分布式实时计算系统。Storm 可以⾮常容易地实时处理⽆限的流数据。所谓实时处理是指在每条数据的产⽣时刻不确定的情况下,⼀旦有数据产⽣,系统就会⽴刻对该条数据进⾏处理。
Storm常⽤于实时分析,在线机器学习,持续计算,分布式RPC和ETL等。Storm速度很快,它在每个节点每秒可以轻松处理上百万条消息。同时,Storm是可伸缩的,容错的,可以保证每条数据⾄少处理⼀次(没有遗漏),并且易于设置和操作。
在Storm中,⼀个实时应⽤的计算任务被称为拓扑(Topology)。对⽐Hadoop MapReduce可以很好地理解: MapReduce上运⾏的是任务(Job),⽽Storm上运⾏的则是Topology; MapReduce任务最终会结束,⽽Topology会⼀直运⾏,除⾮显⽰地将其杀掉。通常使⽤Java 代码编写⼀个Topology任务,然后打包成jar,最后发布到Storm集中运⾏。
⼀个Topology好⽐⼀个⾃来⽔系统,由多个⽔龙头和多个⽔管转接⼝组成。其中的⽔龙头称为Spout,每⼀个转接⼝称为Bolt,⽽⽔则为数据流(Stream)。Stream是由多个Tuple组成的,Tuple是数据传递的基本单元,
是数据在Storm中流动的基本单位。Spout⽤于源源不断地读取Stream,然后发送给Blot,Blot对数据进⾏处理后,会将处理结果发送给下⼀个Blot,依次类推。
⼀个简单的Topology可以由1个Spout和1个Bolt组成;稍微复杂的Topology可以由1个Spout和多个Bolt组成;复杂的Topology由多个Spout和多个Bolt组成。Topology的Stream是有⽅向的,但是不能形成⼀个环状。具体来说,⼀个Topology是由不同的Spout和Blot,通过Stream连接起来的有向⽆环图。
⼀个复杂的Topology的数据流向如图所⽰
Topology的主要构成组件的详细解析如下:
Spout: Topology中数据的源头,通常不断地调⽤nextTuple()⽅法从外部源(如kafka等)读取最新的数据并将数据(Tuple)发射到Blot中,此外,Spout根据可靠性可以分为可靠Spout和⾮可靠的Spout。可靠Spou
t会重新发送Storm处理失败的Tuple;⾮可靠Spout将Tuple发送出去后,不再关⼼处理结果。
Bolt:接收Spout的数据并对其进⾏处理。Topology中的所有处理⼯作都是在Bolt中完成的,例如过滤,函数,集合,合并,写数据库等操作。Bolt在接收到消息后会调⽤execute()⽅法,该⽅法接收⼀个Tuple对象作为输⼊,也可以直接将处理结果进⾏持久化存储(例如,存储到MYSQL 或者HBase中)。
Tuple:消息传递的基本单元。
Stream: 源源不断传递的Tuple组成了Stream。Storm是⼀个实时计算的流式处理框架,通过不断从外部数据源中获取新的Tuple数据,然后将新的数据传递给Bolt处理,这样不断的获取与传输就形成了⼀个数据流(Stream)。
Storm集架构
Storm集架构与Hadoop类似,都是利⽤了分布式集中经典的主从式(Master/Slave)架构,在Storm中,Master节点被称为Nimbus,在该节点上会启动⼀个名为Nimbus的主控进程,类似于Hadoop集的ResourceManager; Slave节点被称为Supervisor,在该节点上会启动⼀个名为Supervisor的⼯作进程,类似于Hadoop集的NodeManager。Storm集的Master/Slave架构如下图所⽰。
客户端提交Topology代码给Nimbus,Nimbus负责分发Topology给Supervisor节点(⼯作节点),并通过Zookeeper监控Supervisor节点的状态和确定任务分配策略。
Supervisor会定时与Zookeeper同步,以便获取Topology信息,任务分配信息及各类⼼跳信息。每个Supervisor节点运⾏⼀个Supervisor 进程,Supervisor节点在接收Topology时并不是由Supervisor进程直接执⾏Topology,⽽是会根据需要启动⼀个或多个Worker进程,由Worker进程执⾏具体的Topology。每个Worker进程只能执⾏⼀个Topology,但是同⼀个Topology可以有多个Worker共同执⾏。因此,⼀个运⾏中的Topology通常都是由集中的多个节点中的多个Worker共同来完成的。
此外,Supervisor还会根据新的任务分配情况来调整Worker的数量并进⾏负载均衡。所以实践上,Topology最终都是分配到了Worker上。Storm集运⾏架构如下图
Topology的详细执⾏流程如下:
1. 客户端将写好的Topology代码(以jar包的形式)提交到Nimbus。
2. Nimbus对Topology进⾏校验,校验的内容包括:是否已经有同名的Topology正在运⾏,Topology中是否有两个Spout或Blot使⽤了相
同的ID(Topology代码中需要给Spout和Bolt指定ID)等。
3. Nimbus建⽴⼀个本地⽬录,⽤于存放Topology jar包和⼀些临时⽂件。
4. Nimbus将Topology的状态信息(Topology代码在Nimbus的存储位置等)同步到Zookeeper
1. Nimbus将Topology的配置计算Task(Spout或Bolt实例)的数量,并根据Zookeeper中存储的Supervisor的资源空闲情况计算Task的
任务分配(每个Task与⼯作节点及端⼝的映射,Task与Worker的对应关系),并计算结果同步到Zookeeper。
2. Supervisor从Zookeeper中获取Topology jar包所在的Nimbus的位置信息和Task的任务分配信息,从Nimbus相应位置下载jar包到
本地(⽆论是否由⾃⼰执⾏)。
3. Supervisor根据Task任务分配信息,启动相应的 Worker进程执⾏Task。
Worker进程包含⼀个或多个称为Executor的执⾏线程,⽤于执⾏具体的Task。Task是Storm中最⼩的处理单元,⼀个Task可以看作是⼀个Spout或Bolt实例。⼀个Executor可以执⾏⼀个或者多个Task(默认是1个)。Worker的⼯作⽅式如下图:
Storm依赖于Zookeeper进⾏数据状态的交互,状态数据存储在Zookeeper中,可以说,Zookeeper是Nimbus和Supervisor进⾏交互的中介。Nimbus通过在Zookeeper中写⼊状态信息来分配任务,通俗地讲就是指定那些Supervisor执⾏那些Task; ⽽Supervisor会定期访问Zookeeper的相应⽬录,查看是否有新的任务,有则领取任务。此外,Supervisor和Worker会定期发送⼼跳给Zookeeper,是Zookeeper可以监控集的运⾏状态,以便及时向Nimbus进⾏汇报。Storm的数据交互图如下所⽰
Storm流分组
Storm流分组(Stream grouping) ⽤于在定义⼀个Topology时,为Bolt指定它应该接收那些Stream作为输⼊。⼀个Stream Grouping定义了如何在Bolt的多个Task之间划分该Stream,即对Stream中的Tuple进⾏分组,使不同的Tuple进⼊不同的Task。
Storm中有8个内置的流分组⽅式,也可以通过实现CustomStreamGrouping接⼝来实现⾃定义流分组。
1. Shuffle grouping: 随机分组,也是通常情况下最常⽤的分组。Stream中的Tuple被随机分布在Bolt的Task中,以保证同⼀级Bolt上的
每个Task都能得到相等数量的Tuple,如下图
2.Fields grouping: 字段分组。通过为Tuple指定⼀个字段,根据指定的字段对Stream进⾏分组,字段值相同的Tuple会被分到同⼀个Task。例如,如果Stream按照”user-id”字段分组,具有相同”user-id”的Tuple会被分到相同的Task中,但是具有不同的”user-id”的Tuple可能会被分到不同的Task中如下图
3.Partial Key grouping: 通过对Stream指定字段进⾏分组,与Fields grouping类似,不同的是,该分组会在下游Bolt之间进⾏负载均衡,当发⽣数据倾斜时提供了更好的资源利⽤。
4.All grouping: 所有的Tuple会被复制分发到所有的Task中,相当于⼴播模式。该分组需要谨慎使⽤,如下
5.Global grouping: 全局(单选)分组。整个Stream会被分发到同⼀个Task中。实践上会被分发到ID最⼩的Task,如下图
6.None grouping: 此分组表⽰⽤户不关⼼Stream是如何被分发的。⽬前,该分组等同于Shuffle grouping。
7.Direct grouping: 这是⼀种特殊的分组。以这种⽅式分组的Stream意味着产⽣Tuple的Spout或Bolt⾃⼰明确指定Tuple被分配到Bolt的那个Task。使⽤该分组的Stream必须被声明为Direct Stream。发送到Direct Stream的Tuple必须使⽤OutputCollector类中的emitDirect()⽅法进⾏发送。
8.Local or shuffle grouping: 如果⽬标Bolt有⼀个或多个Task与Stream源(Tuple的发送者)在同⼀个Worker进程中,则Tuple会被分发到该Worker进程中的Task。否则,该⽅式与Shuffle grouping相同。
Storm集环境搭建
Storm安装依赖Java与Zookeeper,本节打算在三个节点上搭建Storm集,具体搭建步骤如下。
tar -zxvf apache-storm-2.1. -C /usr/local/ 解压storm安装包到/usr/local/⽬录下
重命名
mv apache-storm-2.1.0/ storm-2.1.0
执⾏下⾯的命令,修改环境变量⽂件/etc/profile:
vi /etc/profile
export STORM_HOME=/usr/local/storm-2.1.0
hadoop分布式集搭建export PATH=$PATH:$STORM_HOME/bin
然后执⾏source /etc/profile 命令刷新环境变量⽂件。
修改$STORM_HOME/conf中的⽂件storm-env.sh,加⼊以下内容,指定JDK与Storm配置⽂件的⽬录:
export JAVA_HOME=/usr/local/jdk1.8.0_192
export STORM_CONF_DIR="/usr/local/storm-2.1.0/conf"
(zookeeper与storm不在同⼀对应机器上,node-1,node-2,node-3分别为zookeeper所在的主机的地址)
修改$STORM_HOME/conf中的⽂件storm.yaml,添加以下内容(注意: “-”后的空格不能省略)
- "node-1"
- "node-2"
-
"node-3"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
storm.local.dir: "/usr/local/storm-2.1.0/data"
nimbus.seeds: ["centosnode01"]
上述配置属性解析如下。
supervisor.slots.ports: 定义Worker⽤于通信的端⼝号,端⼝的数量为每个Supervisor中的Worker数量的最⼤值。对于每个Supervisor⼯作节点,需要配置该⼯作节点可以运⾏的最⼤Worker数量。每个Worker占⽤⼀个单独的端⼝,该配置属性即⽤于定义哪些端⼝是可被Worker使⽤的。默认情况下,每个节点上可运⾏4个Worker,分别运⾏在6700,6701,6702,6703端⼝
storm.local.dir: Nimbus和Supervisor守护进程需要本地硬盘上的⼀个⽬录存储少量的状态信息.
nimbus.seeds: Nimbus的候选节点,此处只配置⼀个
创建该⽬录来存储少量的状态信息
mkdir -p /usr/local/storm-2.1.0/data
将配置好的Storm安装⽂件复制到集其他节点(centosnode02和centosnode03),命令如下
scp -r storm-2.1.0/ centosnode02:/usr/local/
scp -r storm-2.1.0/ centosnode03:/usr/local/
注意配置zookeeper地址
执⾏下⾯的命令,修改环境变量⽂件/etc/profile:
vi /etc/profile
export STORM_HOME=/usr/local/storm-2.1.0
export PATH=$PATH:$STORM_HOME/bin
然后执⾏source /etc/profile 命令刷新环境变量⽂件。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论