springboot整合kafka案例demo
(1)打开zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
(2)打开kafka:
bin/kafka-server-start.sh config/server.properties
(3)创建topic:
bin/kafka-create-topic.sh --replica 2 --partition 8 --topic test --zookeeper 127.0.0.1:2181
(4)创建⽣产者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(5)创建消费者:
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning
4.关闭kafka
bin/kafka-server-stop.sh
5.关闭zookeeper
bin/zookeeper-server-stop.sh
6.列出topic
bin/kafka-list-topic.sh --zookeeper localhost:2181
7.登录zookeeper
bin/zookeeper-shell.sh host_name:2181
需要安装包的可以在⽹上,不到的也可以给我邮件:roczheng@126
**windows下的操作
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Test bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic Test
bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic Test --from-beginning
例⼦都是在⽤kafkaRest来实现的,Rest的API学习地址:
afka是需要zk来⽀持.所以先下载zk
选择不带source的
下载下来解压2次
进⼊到 D:\zookeeper\apache-zookeeper-3.6.1-bin\conf ⽬录下,把zoo_sample.cfg重命名成zoo.cfg
修改zoo.cfg
会在这⾥⽣成数据⽂件
添加ZK环境变量
ZK到此配置完成.可以启动ZK了
或者打开cmd 输⼊zkserver
提⽰这个启动成功!
2.下载安装kafka
下载地址
下载最新版本
下载下来之后解压,得解压2次
进⼊到:D:\kafka\kafka_2.12-2.5.0\config
修改 server.properties
进⼊到 D:\kafka\kafka_2.12-2.5.0⽬录执⾏.\bin\windows\kafka-server-start.bat .\config\server.properties
kafka启动成功!
测试使⽤
zk和刚启动的kafka不要关闭
创建topic
进⼊到kafka的windows⽬录中执⾏以下命令,2181是zookeeper端⼝
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
创建topic成功!
再启动两个窗⼝,⼀个消费者,⼀个⽣产者
还进进⼊到kafka的windows⽬录下执⾏以下命令.9092是kafka的端⼝
⽣产者:kafka-console-producer.bat --broker-list localhost:9092 --topic demo
消费者:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning
启动的4个服务:
⼀、简介
Kafka 是⼀个实现了分布式的、具有分区、以及复制的⽇志的⼀个服务。它通过⼀套独特的设计提供了消息系统中间件的功能。它是⼀种发布订阅功能的消息系统。
1、名词介绍
Message
消息,就是要发送的内容,⼀般包装成⼀个消息对象。
Topic
通俗来讲的话,就是放置“消息”的地⽅,也就是说消息投递的⼀个容器。假如把消息看作是信封的话,那么 Topic 就是⼀个邮箱
Partition && Log
Partition 分区,可以理解为⼀个逻辑上的分区,像是我们电脑的磁盘 C:, D:, E: 盘⼀样,
Kafka 为每个分区维护着⼀份⽇志Log⽂件。
Producers(⽣产者)
和其他消息队列⼀样,⽣产者通常都是消息的产⽣⽅。
在 Kafka 中它决定消息发送到指定Topic的哪个分区上。
Consumers(消费者)
消费者就是消息的使⽤者,在消费者端也有⼏个名词需要区分⼀下。
⼀般消息队列有两种模式的消费⽅式,分别是 队列模式 和 订阅模式。
队列模式:⼀对⼀,就是⼀个消息只能被⼀个消费者消费,不能重复消费。⼀般情况队列⽀持存在多个消费者,但是对于⼀个消息,只会有⼀个消费者可以消费它。
订阅模式:⼀对多,⼀个消息可能被多次消费,消息⽣产者将消息发布到Topic中,只要是订阅改Topic的消费者都可以消费。
⼆、安装zookeeper
1、简介
Kafka使⽤zookeeper作为其分布式协调框架,很好的将消息⽣产、消息存储、消息消费的过程结合在⼀起。同时借助zookeeper,kafka 能够⽣产者、消费者和broker在内的所以组件在⽆状态的情况下,建⽴起⽣产者和消费者的订阅关系,并实现⽣产者与消费者的负载均衡。
2、下载zookeeper
可以到zookeeper官⽹下载
3、配置zookeeper
(1)下载解压完成后,来到conf⽂件夹下,有⼀个 zoo_sample.cfg 官⽅默认的配置⽂件。复制⼀份,重命名为 zoo.cfg
(2)配置,打开zoo.cfg 修改配置信息
复制代码
#存储内存中数据库快照的位置,如果不设置参数,更新事务⽇志将被存储到默认位置。
dataDir=…/zkData
#⽇志⽂件的位置
dataLogDir=…/zkLog
#监听端⼝
clientPort=2181
复制代码
(3)集配置
server.1=127.0.0.1:12888:1388
server.2=127.0.0.1:12889:1389
server.3=127.0.0.1:12887:1387
格式: server.A = B:C:D
A:是⼀个数字,表⽰第⼏号服务器
B:服务器IP地址
C:是⼀个端⼝号,⽤来集成员的信息交换,表⽰这个服务器与集中的leader服务器交换信息的端⼝D:是在leader挂掉时专门⽤来进⾏选举leader所⽤的端⼝
完整的配置⽂件如下
复制两份zookeeper解压好配置后的⽂件夹,命名为
在对应的⽂件下下⾯修改zoo.cfg的监听端⼝地址
⽐如:
第⼀个zookeeper-3.4.6程序 修改zoo.cfg 配置⽂件
clientPort=2181
第⼆个zookeeper-3.4.6-2程序 修改zoo.cfg 配置⽂件
clientPort=2182
第三个zookeeper-3.4.6-2程序 修改zoo.cfg 配置⽂件
clientPort=2183
创建ServerID
在配置的dataDir⽬录下⾯新建⼀个 myid ⽂件,⽂件内容就是对应的id号,
⽐如:
zookeeper-3.4.6程序 myid ⽂件的内容 为 1
zookeeper-3.4.6-2程序 myid ⽂件的内容 为 2
zookeeper-3.4.6-3程序 myid ⽂件的内容 为 3
我这边配置的⽬录是
启动zookeeper
在对应的bin⽬录下启动
这个版本
(2)配置
解压后到config⽂件夹下 打开server.properties配置⽂件进⾏配置
(3)配置内容
validation框架修改或新增以下配置信息
#唯⼀标识
broker.id=0
#监听端⼝
port=9092
host.name=127.0.0.1
#消息最⼤⼤⼩
message.max.bytes=50485760
#配置副本数量
#获取的最⼤⼤⼩
replica.fetch.max.bytes=50485760
#队列中消息持久化存放的位置,可以多个⽬录,⽤逗号分开
log.dirs=/tmp/kafka-logs
#默认的分区数
num.partitions=2
#对应着刚刚配置的zookeeper的三个ip与端⼝地址
(4)集配置
复制两份解压后的⽂件,命名如下
修改部分配置信息
在对应的server.properties中修改
#唯⼀标识
broker.id=0
broker.id=1
broker.id=2
#监听端⼝
port=9092
port=9093
port=9094
启动对应的kafka
进⼊到bin/windows⽬录下 启动kafka并指定配置⽂件
kafka-server-start.bat …/…/config/server.properties
启动过程中如果遇到Kafka中错误:
Unrecognized VM option ‘UseCompressedOops’ Error: Clould not create the Java Vritual Machine. Error: A fatal exception has occurres . Program will exit.
解决⽅案:
到bin/windows/kafka-run-class.bat ⽂件,
到112⾏左右

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。