Kafka:概述、体系架构、Kafka集部署、命令⾏操作、⼯作
流程
⽬录
1、Kafka(0.10.0.1)概述
1.1、消息队列JMS
1)点对点模式(⼀对⼀,消费者主动拉取数据,消息收到后消息清除)
点对点模型通常是⼀个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,⽽不是将消息推送到客户端。这个模型的特点是发送到队列的消息被⼀个且只有⼀个接收者接收处理,即使有多个消息监听者也是如此。
2)发布/订阅模式(⼀对多,数据⽣产后,推送给所有订阅者)
发布订阅模型则是⼀个基于推送的消息传送模型。发布订阅模型可以有多种不同的 订阅者,临时订阅者只在主动监听主题时才接收消息,⽽持久订阅者则监听主题的所有消息,即使当前订阅者不可⽤,处于离线状态。
1.2、消息队列的优点
1)异步处理
很多时候,⽤户不想也不需要⽴即处理消息。消息队列提供了异步处理机制,允许⽤户把⼀个消息放⼊队列,但并不⽴即处理它。想向队列中放⼊多少消息就放多少,然后在需要的时候再去处理它们。
2)应⽤解耦/扩展性/可恢复性
允许独⽴地扩展或修改两边的处理过程,只要确保它们遵守同样的接⼝约束;
因为消息队列解耦了处理过程,所以增⼤消息⼊队和处理的频率是很容易的,只要另外增加处理过程即可;
系统的⼀部分组件失效时,不会影响到整个系统,加⼊队列中的消息仍然可以在系统恢复后被处理。
3)流量消峰/缓冲
在访问量剧增的情况下,应⽤仍然需要继续发挥作⽤,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投⼊资源随时待命⽆疑是巨⼤的浪费。使⽤消息队列能够使关键组件顶住突发的访问压⼒,⽽不会因为突发的超负荷的请求⽽完全崩溃;
有助于控制和优化数据流经过系统的速度,解决⽣产消息和消费消息的处理速度不⼀致的情况。
4)顺序保证
hadoop分布式集搭建在⼤多使⽤场景下,数据处理的顺序都很重要。⼤部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。磁盘顺序读写⽐内存随机访问效率更⾼。
5)冗余
消息队列把数据进⾏持久化直到它们已经被完全处理,通过这⼀⽅式规避了数据丢失风险。许多消息队列所采⽤的"插⼊-获取-删除"范式中,在把⼀个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从⽽确保你的数据被安全的保存直到你使⽤完毕。
1.3、Kafka是什么
Kafka是⼀个分布式、⽀持分区的、多复本的、基于zookeeper协调的分布式消息系统。
1)最⼤的特性就是可以实时的处理⼤量数据以满⾜各种需求场景:⽐如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx⽇志、访问⽇志,消息服务等等,⽤scala语⾔编写。在流式计算中,Kafka ⼀般⽤来缓存数
据,SparkStreaming通过消费Kafka的数据进⾏计算。
2)Kafka是⼀个分布式消息队列,具有⽣产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。相当于结合了JMS的两个模式:Producer以订阅的⽅式向brocker推送topic;Consumer以抓取的⽅式从brocker中抓取topic
3)Kafka对消息保存时根据Topic进⾏归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集有多个kafka实例组成,每个实例(server)成为broker。
4)⽆论是kafka集,还是producer和consumer都依赖于zookeeper集保存⼀些meta信息,来保证系统可⽤性
Kafka与消息队列对⽐
Kafka是⼀个流平台,在这个平台上可以发布和订阅数据流,并把它们保存起来、进⾏处理,这是构建Kafka的初衷。
Kafka有些类似消息系统,但Kafka以集的⽅式运⾏,可以⾃由伸缩;
作为数据连接层,Kafka提供了数据传递保证,即可复制、持久化,保存多久都可以;
消息系统只会传递消息,⽽Kafka的流式处理能⼒能够很动态地处理派⽣流和数据集;
Kafka与flume对⽐
flume
适合多个⽣产者;(⼀个⽣产者对应⼀个 Agent 任务)
适合下游数据消费者不多的情况;(多 channel 多 sink 会耗费很多内存)
适合数据安全性要求不⾼的操作;(实际中更多使⽤ Memory Channel)
适合与 Hadoop ⽣态圈对接的操作。(Cloudera 公司的特长)
kafka
适合数据下游消费者众多的情况;(开启更多的消费者任务即可,与 Kafka 集⽆关)
适合数据安全性要求较⾼的操作,⽀持replication。(数据放在磁盘⾥)
因此我们常⽤的⼀种模型是:
线上数据 --> flume(适合采集tomcat⽇志) --> kafka(离线/实时) --> flume(根据情景增删该流程) --> HDFS
Kafka与Hadoop对⽐
Kafka与可以看成是实时版的Hadoop,Hadoop可以存储和定期处理⼤量的数据⽂件,⽽Kafka可以存储和持续处理⼤型的数据流,它们之间的最⼤不同体现在持续的低延迟处理和批处理之间的差异上。Hadoop和⼤数据应⽤主要⽤于数据分析,⽽Kafka因其低延迟的特点更适合⽤在核⼼的业务应⽤上
Kafka与ETL⼯具对⽐
Kafka并⾮只是把数据从⼀个系统拆解出来再塞到另⼀个系统,它其实是⼀个⾯向实时数据流的平台。也就是说,它不仅可以将现有的应⽤程序和数据系统连接起来,还能够⽤于加强这些触发相同数据流的应⽤
2、Kafka体系架构
1)消息 / 键 / 批次
Kafka中的数据单元被称为消息。消息由字节数组组成,可理解为数据库中的⼀条"记录";
消息可以由⼀个可选的元数据,也就是键,键也是⼀个字节数组。键与消息对于Kafka来说没什么特殊的含义;
当消息以⼀种可控的⽅式写⼊不同的分区时会⽤到键,⽐如为键⽣成⼀个⼀致性散列值,然后使⽤散列值对topic分区数进⾏取模,为消息选取分区;
为了提⾼效率,消息被分批次写⼊Kafka。批次就是⼀组消息,这些消息属于同⼀topic和分区。批次⼤⼩的选择需要在时间延迟和吞吐量之间做出权衡
2)Topic
主题可理解为⼀个队列。Kafka的消息通过topic进⾏分类,topic就像是数据库中的"表"。topic可以被分成若⼲个分区(类似HBase中建表时预分区),⼀个分区对应⼀个提交⽇志。消息以追加的⽅式写⼊分区,然后以FIFO进⾏顺序读取,每个分区内的顺序是确定的
3)Producer
消息⽣产者,向 broker 推送消息的客户端。⼀个消息会被发布到⼀个特定的topic上,producer默认会将消息均衡地分布到主题的所有分区上(Hash分区器)
4)Consumer
消息消费者,向 broker 抓取消息的可独断。Consumer可以抓取同⼀个topic中的不同分区,并按照消息⽣成的顺序读取
Consumer通过检查消息的offset来区分已经读取过的消息。
offset是⼀种元数据,是⼀个不断递增的整数值,在创建消息时被添加到消息中。在⼀个分区内,每个消息的offset是唯⼀的。consumer 将每个分区最后读取的消息offset保存在zookeeper或kafka中,若consumer关闭或重启它的读取状态不会丢失。
5)Consumer Group
消费者是消费组的⼀部分,会有⼀个或多个consumer共同读取⼀个主题。消费组保证每个分区只能被⼀个consumer使⽤。主要⽤来提⾼consumer的可靠性,⼀个consumer失效可由同个CG中的consumer顶替。
Consumer Group还是 kafka ⽤来实现⼀个 topic 消息的⼴播(发给所有的 consumer)和单播(发给任意⼀个 consumer)的⼿段。⼀个 topic 可以有多个 CG。topic 的消息会被抓取到所有的 CG,但每个 partion 只会把消息发给该 CG 中的⼀个 consumer。如果需要实现⼴播,只要每个consumer 有⼀个独⽴的 CG 就可以了;实现单播只要所有的 consumer 在同⼀个 CG。
6)Partition
为了实现扩展性,⼀个⾮常⼤的 topic 可以分布到多个 broker(即服务器)上,⼀个 topic 可以分为多个 partition,每个 partition 是⼀个有序的队列。partition 中的每条消息都会被分配⼀个有序的 id(offset)。将消息发给consumer,kafka 只保证按⼀个 partition 中的消息的顺序,不保证⼀个 topic 的整体(多个 partition 间)的顺序。
leader/follower
每个分区对应⼀个leader,根据(复本数-1)决定了follower的数量,leader的选举由zookeeper决定。在producer写消息时只负责向leader中写,leader负责向follower节点发送数据进⾏复本冗余
分区的好处
实现负载均衡。分区对于消费者来说,可以提⾼并发度,提⾼效率。若针对于某⼀个 topic有n个分区,
我们就对应的建⼀个有m个消费者的消费者组。即:n⼤于或者等于m,最好是n=m。当n>m时,就意味着某⼀个消费者会消费多个分区的数据。不仅如此,⼀个消费者还可以消费多个 Topic 数据
7)brocker
⼀台 kafka 服务器就是⼀个 broker。⼀个集由多个 broker 组成。⼀个 broker 可以容纳多个 topic。
brocker接受来⾃producer的消息,为消息设置offset,并提交消息到磁盘保存;并对consumer读取分区的请求作出响应,返回已经提交到磁盘上的信息
注意,复本数不能⼤于brocker数(这与hdfs不同)
3、Kafka集部署
单机版与伪分布式略,以下为完全分布式的搭建
1)集规划
master1slave1slave2
zk zk zk
kafka(brocker)*2kafka(brocker)*2-
3)配置环境变量$KAFKA_HOME、$PATH
4)修改$KAFKA_HOME/config下配置⽂件server.properties
由于我规划中⼀台机器中有两个brocker(模拟伪分布,通常⼀个节点对应⼀个brocker),则对应两个配置⽂件,server-1.properties、server-2.properties。以下为⼀个节点中的配置,另⼀个节点类推
#broker的全局唯⼀编号,不能重复
broker.id=0  #第⼆个配置⽂件broker.id=1
#kafka运⾏⽇志存放的路径,若在同⼀个机器上多个brocker,⽇志⽬录不可相同
log.dirs=/home/jinge/apps/kafka/tmp/logs/master1-0  #第⼆个配置⽂件/home/jinge/apps/kafka/tmp/logs/master1-1
#kafka中⾃带了zookeeper,若使⽤独⽴zk需配置连接Zookeeper集地址
#⽣产者连接该brocker端⼝号
listeners=PLAINTEXT://master1:9092  #第⼆个配置⽂件PLAINTEXT://master1:9093
#删除topic功能使能,默认为false删不了topic
#处理⽹络请求的线程数量
numwork.threads=3
#⽤来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区⼤⼩
socket.send.buffer.bytes=102400
#接收套接字的缓冲区⼤⼩
#请求套接字的缓冲区⼤⼩
#topic在当前broker上的分区个数
num.partitions=1
#⽤来恢复和清理data下数据的线程数量
#segment⽂件保留的最长时间,超时将被删除(单位⼩时)
5)配置$KAFKA_HOME/config/zookeeper.properties
#与其他使⽤到zk的组件⽬录保持⼀致
dataDir=/home/jinge/apps/zookeeper/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
6)发送kafka⾄其他节点,注意broker.id的设置不得重复
7)分别在不同机器上开启kafka-server(broker)
#启动Kafka是⼀个阻塞进程,会打印我们操作kafka的⽇志,我们可以把窗⼝放到后台,在命令后⾯加⼀个与&符号,将该阻塞进程放到后台。@master1 kafka]$ kafka-server-start.sh config/server-1.properties &
@master1 kafka]$ kafka-server-start.sh config/server-2.properties &
@slave1 kafka]$ kafka-server-start.sh config/server-1.properties &
@slave1 kafka]$ kafka-server-start.sh config/server-2.properties &
8)关闭kafka-server

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。