kafka监听问题的解决和剖析
⽬录
问题如下:
⼀. 解决问题⼀(kafka监听不到数据)
⼆. 解决问题⼆(kafka为什么会有重复数据发送)
三. 解决问题三(kafka数据重复如何解决)
四. 解决问题四(为什么kafka会出现俩个消费端都可以消费问题)
五. 粘⼀下我的监听配置⽂件
总结
问题如下:
1. kafka为什么监听不到数据
2. kafka为什么会有重复数据发送
3. kafka数据重复如何解决
4. 为什么kafka会出现俩个消费端都可以消费问题
5. kafka监听配置⽂件
⼀. 解决问题⼀(kafka监听不到数据)
  ⾸先kafka监听不得到数据,检查如下
检查配置⽂件是否正确(可能会出现改了监听地址,监听Topic,监听的地址的数量问题)
检查接收数据的正确性(⽐如原⽣的代码,可能是⽤byte序列化接收的数据,⽽你接收使⽤String。也是配置⽂件序列化问题,还有与发送者商量问题)
检查kafka版本问题(⼀般的版本其实是没什么问题的,只有个别版本会出现监听不到问题)
没有加
@Component    犯了最不应该出差错的问题
  如果出现监听不到数据的问题,那么就试试更改⽅法⼀⼆,如果不可以在去试试⽅法三,之前出现这个问题也是查过⼀般查到都会说  “低版本的服务器接收不到⾼版本的⽣产者发送的消息”,但是净由测试使⽤⽤1.0.5RELEASE 和 2.6.3反复测试,并没有任何的问题。
如果按照版本⼀致,那么根本就不现实,因为可能不同的项⽬,springboot版本不⼀致的话,可能有的springboot版本低,那么你还得要求⾃⼰维护项⽬版本升级?如果出现第四种情况就⽆话可说了。
⼆. 解决问题⼆(kafka为什么会有重复数据发送)
  重复数据的发送问题如下
1. 可能在发送者的那⾥的事务问题。mysql存储事务发⽣异常导致回滚操作,但是kafka消息却是已经发送到了服务器中。此
事肯定会出现重复问题
2. ⽣产者设置时间问题,⽣产发送设置的时间内,消息没完成发送,⽣产者以为消费者挂掉,便重新发送⼀个,导致重复
3. offset问题,当项⽬重启,offset⾛到某⼀个位置已扔到kafka服务器中,但是项⽬被重启.那么offset会是在原本重启的那⼀个点
的地⽅再次发送⼀次,这是kafka设计的问题,防⽌出现丢失数据问题
三. 解决问题三(kafka数据重复如何解决)
  ⽬前我是使⽤的Redis进⾏的排重法,⽤的是Redis中的set,保证⾥⾯不存在重复,保证Redis⾥⾯不会存⼊太多的脏数据。并定期清理
  粘贴⼀下我的排重(Redis排重法)
//kafka prefix
String cache = "kafka_cache";
//kafka suffix
Calendar c = Instance();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//0点,⽬前是为了设置为这⼀天的固定时间。这个完全可以去写个⼯具类⾃⼰弄,为了看的更清楚,⿇烦了⼀点的写⼊
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
String gtimeStart = sdf2.Time());
long time = sdf.parse(gtimeStart).getTime();
//此位置为了设置是否是新的⼀天,新的⼀天需要设置定时时间,保证redis中不会存储太多⽆⽤数据
Boolean flag = false;
//数据接收
Set<String> range = new HashSet<>();
//判断是否存在
if (redisTemplate.hasKey(cache + time)) {
//存在则取出这个set
range = redisTemplate.opsForSet().members(cache + time);
}else {
//不存在,则为下⾯过期时间的设置铺垫
flag = true;
}
//判断监听到的数据是否是重复
if (ains("测试需要")) {
//重复则排出,根据逻辑⾃⼰修改
continue;
} else {
//添加进去
redisTemplate.opsForSet().add(cache + time, i+"");
if (flag){
kafka最新版本
//设置为24⼩时,保证新⼀天使⽤,之前使⽤的存储会消失掉
//不会在进⼊这个⾥⾯,如果多次的存⼊过期时间,那么这个key的过期时间就永远是24⼩时,⼀直就不会过期
flag = false;
}
}
四. 解决问题四(为什么kafka会出现俩个消费端都可以消费问题)
  原因是因为在不同groupId之下,kafka接收到以后,会给监听他的每⼀个组发送⼀个他所收到的消息,但是两个消费端监听同⼀个租,那么就只有⼀个消费端可以消费到。
五. 粘⼀下我的监听配置⽂件
# 指定kafka 代理地址,可以多个,⽤逗号间隔
spring.kafka.bootstrap-servers= localhost:9092
# 指定默认消费者group id
up-id= test
# 是否⾃动提交
able-auto-commit= true
# 提交间隔的毫秒
sumer.auto-commit-interval.ms=60000
# 最⼤轮询的次数
sumer.max-poll-records=1
# 将偏移量重置为最新偏移量
sumer.auto-offset-reset=earliest
# 指定消息key和消息体的编解码⽅式
sumer.key-deserializer=org.apache.kafkamon.serialization.StringDeserializer
sumer.value-deserializer=org.apache.kafkamon.serialization.StringDeserializer
总结
到此这篇关于kafka监听问题的解决和剖析的⽂章就介绍到这了,更多相关kafka监听问题内容请搜索以前的⽂章或继续浏览下⾯的相关⽂章希望⼤家以后多多⽀持!

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。