在这个教程里面我们将学习如何创建Topologies, 并且把topologies部署到storm的集里面去。Java将是我们主要的示范语言, 个别例子会使用python以演示storm的多语言特性。
准备工作
这个教程使用storm-starter项目里面的例子。我推荐你们下载这个项目的代码并且跟着教程一起做。先读一下:配置storm开发环境和新建一个strom项目这两篇文章把你的机器设置好。
一个Storm集的基本组件
storm的集表面上看和hadoop的集非常像。但是在Hadoop上面你运行的是MapReduce的Job, 而在Storm上面你运行的是Topology。它们是非常不一样的 — 一个关键的区别是: 一个MapReduce Job最终会结束, 而一个Topology运永远运行(除非你显式的杀掉他)。
在Storm的集里面有两种节点: 控制节点(master node)和工作节点(worker node)。控制节点上面运行一个后台程序: Nimbus, 它的作用类似Hadoop里面的JobTracker。Nimbus负责在集里面分布代码,分配工作给机器, 并且监控状态。
每一个工作节点上面运行一个叫做Supervisor的节点。Supervisor会监听分配给它那台机器的工作,根据需要 启动/关闭工作进程。每一个工作进程执行一个Topology的一个子集;一个运行的Topology由运行在很多机器上的很多工作进程组成。
storm topology结构
Nimbus和Supervisor之间的所有协调工作都是通过一个Zookeeper集来完成。并且,nimbus进程和supervisor都是快速失败(fail-fast)和无状态的。所有的状态要么在Zooke
eper里面, 要么在本地磁盘上。这也就意味着你可以用kill -9来杀死nimbus和supervisor进程, 然后再重启它们,它们可以继续工作, 就好像什么都没有发生过似的。这个设计使得storm不可思议的稳定。
Topologies
为了在storm上面做实时计算, 你要去建立一些topologies。一个topology就是一个计算节点所组成的图。Topology里面的每个处理节点都包含处理逻辑, 而节点之间的连接则表示数据流动的方向。
运行一个Topology是很简单的。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令。
帮助
1 | strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2 |
这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。storm jar负责连接到nimbus并且上传jar文件。
因为topology的定义其实就是一个Thrift结构并且nimbus就是一个Thrift服务, 有可以用任何语言创建并且提交topology。上面的方面是用JVM
-based语言提交的最简单的方法, 看一下文章: 在生产集上运行topology去看看怎么启动以及停止topologies。
-based语言提交的最简单的方法, 看一下文章: 在生产集上运行topology去看看怎么启动以及停止topologies。
Stream
Stream是storm里面的关键抽象。一个stream是一个没有边界的tuple序列。storm提供一些原语来分布式地、可靠地把一个stream传输进一个新的stream。比如: 你可以把一个tweets流传输到热门话题的流。
storm提供的最基本的处理stream的原语是spout和bolt。你可以实现Spout和Bolt对应的接口以处理你的应用的逻辑。
spout的流的源头。比如一个spout可能从Kestrel队列里面读取消息并且把这些消息发射成
一个流。又比如一个spout可以调用twitter的一个api并且把返回的tweets发射成一个流。
bolt可以接收任意多个输入stream, 作一些处理, 有些bolt可能还会发射一些新的stream。一些复杂的流转换, 比如从一些tweet里面计算出热门话题, 需要多个步骤, 从而也就需要多个bolt。 Bolt可以做任何事情: 运行函数, 过滤tuple, 做一些聚合, 做一些合并以及访问数据库等等。
spout和bolt所组成一个网络会被打包成topology, topology是storm里面最高一级的抽象, 你可以把topology提交给storm的集来运行。topology的结构在Topology那一段已经说过了,这里就不再赘述了。
topology结构
topology里面的每一个节点都是并行运行的。 在你的topology里面, 你可以指定每个节点的并行度, storm则会在集里面分配那么多线程来同时计算。
一个topology会一直运行直到你显式停止它。storm自动重新分配一些运行失败的任务, 并且storm保证你不会有数据丢失, 即使在一些机器意外停机并且消息被丢掉的情况下。
数据模型(Data Model)
storm使用tuple来作为它的数据模型。每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型, 在我的理解里面一个tuple可以看作一个没有方法的java对象。总体来看,storm支持所有的基本类型、字符串以及字节数组作为tuple的值类 型。你也可以使用你自己定义的类型来作为值类型, 只要你实现对应的序列化器(serializer)。
topology里面的每个节点必须定义它要发射的tuple的每个字段。 比如下面这个bolt定义它所发射的tuple包含两个字段,类型分别是: double和triple。
帮助
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | public class DoubleAndTripleBolt implements IRichBolt { private OutputCollectorBase _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override public void execute(Tuple input) { int val = Integer(0); _it(input, new Values(val*2, val*3)); _collector.ack(input); } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); } } |
declareOutputFields方法定义要输出的字段 : ["double", "triple"]。这个bolt的其它部分我们接下来会解释。
一个简单的Topology
让我们来看一个简单的topology的例子, 我们看一下storm-starter里面的ExclamationTopology:
帮助
1 2 3 4 5 6 | TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(1, new TestWordSpout(), 10); builder.setBolt(2, new ExclamationBolt(), 3) .shuffleGrouping(1); builder.setBolt(3, new ExclamationBolt(), 2) .shuffleGrouping(2); |
这个Topology包含一个Spout和两个Bolt。Spout发射单词, 每个bolt在每个单词后面加个”!!!”。这三个节点被排成一条线: spout发射单词给第一个bolt, 第一个bolt然后把处理好的单词发射给第二个bolt。如果spout发射的单词是["bob"]和["john"], 那么第二个bolt会发射["bolt!!!!!!"]和["john!!!!!!"]出来。
我们使用setSpout和setBolt来定义Topology里面的节点。这些方法接收我们指定的一个id, 一个包含处理逻辑的对象(spout或者bolt), 以及你所需要的并行度。
这个包含处理的对象如果是spout那么要实现IRichSpout的接口, 如果是bolt,那么就要实现IRichBolt接口.
最后一个指定并行度的参数是可选的。它表示集里面需要多少个thread来一起执行这个节点。如果你忽略它那么storm会分配一个线程来执行这个节点。
最后一个指定并行度的参数是可选的。它表示集里面需要多少个thread来一起执行这个节点。如果你忽略它那么storm会分配一个线程来执行这个节点。
setBolt方法返回一个InputDeclarer对象, 这个对象是用来定义Bolt的输入。 这里第一个Bolt声明它要读取spout所发射的所有的tuple — 使用shuffle grouping。而第二个bolt声明它读取第一个bolt所发射的tuple。shuffle grouping表示所有的tuple会被随机的分发给bolt的所有task。给task分发tuple的策略有很多种,后面会介绍。
如果你想第二个bolt读取spout和第一个bolt所发射的所有的tuple, 那么你应该这样定义第二个bolt:
帮助
1 2 3 | builder.setBolt(3, new ExclamationBolt(), 5) .shuffleGrouping(1) .shuffleGrouping(2); |
让我们深入地看一下这个topology里面的spout和bolt是怎么实现的。Spout负责发射新的tuple到这个topology里面 来。TestWordSpout从["nathan", "mike", "jackson", "golda", "bertels"]里面随机选择一个单词发射出来。TestWordSpout里面的nextTuple()方法是这样定义的:
帮助
1 2 3 thrift4 5 6 7 8 | public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = Int(words.length)]; _it(new Values(word)); } |
可以看到,实现很简单。
ExclamationBolt把”!!!”拼接到输入tuple后面。我们来看下ExclamationBolt的完整实现。
ExclamationBolt把”!!!”拼接到输入tuple后面。我们来看下ExclamationBolt的完整实现。
帮助
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 | public static class ExclamationBolt implements IRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { _it(tuple, new String(0) + "!!!")); _collector.ack(tuple); } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } |
prepare方法提供给bolt一个Outputcollector用来发射tuple。Bolt可以在任何时候发射tuple — 在prepare, execute或者cleanup方法里面, 或者甚至在另一个线程里面异步发射。这里prepare方法只是简单地把OutputCollector作为一个类字段保存下来给后面execute方法 使用。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论