解决kafka消息堆积及分区不均匀的问题
⽬录
kafka消息堆积及分区不均匀的解决
1、先在kafka消息中创建
2、添加配置⽂件application.properties
3、创建kafka⼯⼚
4、展⽰kafka消费者
kafka出现若⼲分区不消费的现象
定位过程kafka为什么那么快
验证
解决⽅法
kafka消息堆积及分区不均匀的解决
我在环境中发现代码⾥⾯的kafka有所延迟,查看kafka消息发现堆积严重,经过检查发现是kafka消息分区不均匀造成的,消费速度过慢。这⾥由⾃⼰在虚拟机上演⽰相关问题,给⼤家提供相应问题的参考思路。
这篇⽂章有点遗憾并没重现分区不均衡的样例和Warning: Consumer group ‘testGroup1' is rebalancing. 这⾥仅将正确的⽅式展⽰,等后续重现了在进⾏补充。
主要有两个要点:
1、⼀个消费者组只消费⼀个topic.
2、factory.setConcurrency(concurrency);这⾥设置监听并发数为部署单元节点*concurrency=分区数量
1、先在kafka消息中创建
对应分区数⽬的topic(testTopic2,testTopic3)testTopic1由代码创建
./kafka-topics.sh --create --zookeeper 192.168.25.128:2181 --replication-factor 1 --partitions 2 --topic testTopic2
2、添加配置⽂件application.properties
kafka.broker=192.168.25.128:9092
automit.interval.time=60000
#up=customer-test
kafka.offset=earliest
kafka.automit=false
session.timeout.time=10000
3、创建kafka⼯⼚
package com.fig;
import org.apache.sumer.ConsumerConfig;
import org.apache.kafkamon.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import t.annotation.Bean;
import t.annotation.Configuration;
import org.fig.ConcurrentKafkaListenerContainerFactory;
import org.fig.KafkaListenerContainerFactory;
import org.ConsumerFactory;
import org.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @author yin
* @Date 2019/11/24 15:54
* @Method
*/
@Configuration
@Component
public class KafkaConfig {
@Value("${kafka.broker}")
private String broker;
@Value("${kafka.automit}")
private String autoCommit;
// @Value("${up}")
/
/private String testGroup;
@Value("${session.timeout.time}")
private String sessionOutTime;
@Value("${automit.interval.time}")
private String autoCommitTime;
@Value("${kafka.offset}")
private String offset;
@Value("${urrency}")
private Integer concurrency;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//监听设置两个个分区
factory.setConcurrency(concurrency);
//打开批量拉取数据
factory.setBatchListener(true);
//这⾥设置的是⼼跳时间也是拉的时间,也就说每间隔max.poll.interval.ms我们就调⽤⼀次poll,kafka默认是300s,⼼跳只能在poll的时候发出,如果连续两次poll的时候超过 //max.poll.interval.ms 值就会导致rebalance
//⼼跳导致GroupCoordinator以为本地consumer节点挂掉了,引发了partition在consumerGroup⾥的rebalance。
/
/ 当rebalance后,之前该consumer拥有的分区和offset信息就失效了,同时导致不断的报auto offset commit failed。
return factory;
}
private ConsumerFactory<String,String> consumerFactory() {
return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
//kafka的地址
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
//是否⾃动提交 Offset
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
// enable.automit 设置成 false,那么 automit.interval.ms 也就不被再考虑
//默认5秒钟,⼀个 Consumer 将会提交它的 Offset 给 Kafka
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
//这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
//zookeeper.session.timeout.ms 默认值:6000
//ZooKeeper的session的超时时间,如果在这段时间内没有收到ZK的⼼跳,则会被认为该Kafka server挂掉了。
// 如果把这个值设置得过低可能被误认为挂掉,如果设置得过⾼,如果真的挂了,则需要很长时间才能被server得知。
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionOutTime);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//组与组间的消费者是没有关系的。
//topic中已有分组消费数据,新建其他分组ID的消费者时,之前分组提交的offset对新建的分组消费不起作⽤。
//propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup);
//当创建⼀个新分组的消费者时,set值为latest时,
// 表⽰消费新的数据(从consumer创建开始,后⽣产的数据),之前产⽣的数据不消费。
// blog.csdn/u012129558/article/details/80427016
//earliest 当分区下有已提交的offset时,从提交的offset开始消费;⽆提交的offset时,从头开始消费。
// latest 当分区下有已提交的offset时,从提交的offset开始消费;⽆提交的offset时,消费新产⽣的该分区下的数据。
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
//不是指每次都拉50条数据,⽽是⼀次最多拉50条数据()
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
return propsMap;
}
}
4、展⽰kafka消费者
@Component
public class KafkaConsumer {
private static final Logger logger = Logger(KafkaConsumer.class);
@KafkaListener(topics = "${pic1}",groupId = "${up1}",containerFactory = "kafkaListenerContainerFactory")
public void listenPartition1(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {
logger.info("testTopic1 recevice a message size :{}" , records.size());
try {
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
logger.info("received:{} " , record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = pic();
Thread.sleep(300);
logger.info("p1 topic is:{} received message={}",topic, message);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
ack.acknowledge();
}
}
@KafkaListener(topics = "${pic2}",groupId = "${up2}",containerFactory = "kafkaListenerContainerFactory")
public void listenPartition2(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {
logger.info("testTopic2 recevice a message size :{}" , records.size());
try {
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
logger.info("received:{} " , record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = pic();
Thread.sleep(300);
logger.info("p2 topic :{},received message={}",topic, message);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
ack.acknowledge();
}
}
@KafkaListener(topics = "${pic3}",groupId = "${up3}",containerFactory = "kafkaListenerContainerFactory")
public void listenPartition3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
logger.info("testTopic3 recevice a message size :{}" , records.size());
try {
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
logger.info("received:{} " , record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = pic();
logger.info("p3 topic :{},received message={}",topic, message);
Thread.sleep(300);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
ack.acknowledge();
}
}
}
查看分区消费情况:
kafka出现若⼲分区不消费的现象
近⽇,有⽤户反馈kafka有topic出现某个消费组消费的时候,有⼏个分区⼀直不消费消息,消息⼀直积压(图1)。除了⼀直积压外,还有⼀个现象就是消费组⼀直在重均衡,⼤约每5分钟就会重均衡⼀次。具体表现为消费分区的owner⼀直在改变(图2)。
(图1)
(图2)
定位过程
业务侧没有报错,同时kafka服务端⽇志也⼀切正常,同事先将消费组的机器滚动重启,仍然还是那⼏个分区没有消费,之后将这⼏个不消费的分区迁移⾄别的broker上,依然没有消费。
还有⼀个奇怪的地⽅,就是每次重均衡后,不消费的那⼏个分区的消费owner所在机器的⽹络都有流量变化。按理说不消费应该就是拉取不到分区不会有流量的。于是让运维去拉了下不消费的consumer的jstack⽇志。⼀看果然发现了问题所在。
从堆栈看,consumer已经拉取到消息,然后就⼀直卡在处理消息的业务逻辑上。这说明kafka是没有问题的,⽤户的业务逻辑有问题。
consumer在拉取完⼀批消息后,就⼀直在处理这批消息,但是这批消息中有若⼲条消息⽆法处理,⽽业务⼜没有超时操作或者异常处理导致进程⼀直处于消费中,⽆法去poll下⼀批数据。
⼜由于业务采⽤的是autocommit的offset提交⽅式,⽽根据源码可知,consumer只有在下⼀次poll中才会⾃动提交上次poll的offset,所以业务⼀直在拉取同⼀批消息⽽⽆法更新offset。反映的现象就是该consumer对应的分区的offset⼀直没有变,所以有积压的现象。
⾄于为什么会⼀直在重均衡消费组的原因也很明了了,就是因为有消费者⼀直卡在处理消息的业务逻辑上,超过了max.poll.interval.ms(默认5min),消费组就会将该消费者踢出消费组,从⽽发⽣重均衡。
验证
让业务⽅去查证业务⽇志,验证了积压的这⼏个分区,总是在循环的拉取同⼀批消息。
解决⽅法
临时解决⽅法就是跳过有问题的消息,将offset重置到有问题的消息之后。本质上还是要业务侧修改业务逻辑,增加超时或者异常处理机制,最好不要采⽤⾃动提交offset的⽅式,可以⼿动管理。
以上为个⼈经验,希望能给⼤家⼀个参考,也希望⼤家多多⽀持。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论