Kafka消息重新发送
Kafka消息重新发送
1、使⽤kafka消息队列做消息的发布、订阅,如果consumer端消费出问题,导致数据并没有消费,此时不需要担⼼,数据并不会⽴刻丢失,kafka会把数据在服务器的磁盘上默认存储7天,或者⾃⼰指定有两种⽅式:1)指定时间,ion.hours=168;2)指定⼤
⼩,log.segment.bytes=1073741824。此时就可以通过重置某个topic的offset来是消息重新发送,进⾏消费
2、查看topic的offset的范围
1)使⽤下⾯的命令可以查看topic为userlog,broker为spark:9092的offset的最⼩值:
#./kafka-run-class.ls.GetOffsetShell --broker-list spark:9092 -topic userlog --time -2
2)offset的最⼤值:
kafka命令#./kafka-run-class.ls.GetOffsetShell --broker-list spark:9092 -topic userlog --time -1
3、设置consumer group的offset
1)启动zookeeper,如果使⽤的是kafka内置的zookeeper,直接启动bin⽬录下的zookeeper-shell.sh,进⾏登录:
#./zookeeper-shell.sh  localhost:2181
通过如下命令,来重置offset值,⽐如topic:userlog,group:userlogs,partition:0 ,offset:2181
set /consumers/userlogs/offsets/userlog/0 1288
如果有多个partition都执⾏上输的命令,并将0换为相对应的分区号就可以了。
###如果创建分区的时候设置了zk的根⽬录,如localhost:2181/kafka
则重置的命令为:
set /kafka/consumers/userlogs/offsets/userlog/0 1288
2)如果使⽤单独安装的zookeeper,直接使⽤bin⽬录下的ZKCli.sh登录后,执⾏上述命令即可。
4、设置完成后需要重启相关的服务,就可以从设置offset的地⽅开始消费。
重启服务:consumer服务。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。