springboot+@KafkaListener⼿动提交及消费能⼒优化spring-boot 版本 1.5.12
依赖使⽤spring-kafka1.3.3(对应kafka-clients版本0.11.0.0,请使⽤于kafka版本对应版本的依赖)
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.12.RELEASE</version>
<relativePath/>
</parent>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.3.RELEASE</version>
</dependency>
1、⾃定义监听⼯⼚ (resources⽬录下⾯kafka.properties⽂件中定义对应参数)
##============== kafka =====================
#消费者并发启动个数(对应分区个数)每个listener⽅法
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${sumer.bootstrap.servers}")
private String servers;
@Value("${sumer.session.timout.ms}")
private String sessionTimeout;
@Value("${sumer.max.poll.interval.ms}")
private String pollInterval;
@Value("${sumer.ds}")
private String pollRecords;
@Value("${sumer.heartbeat.interval.ms}")
private String heartbeatInterval;
@Value("${up.id}")
private String groupId;
/**
* 消费者基础配置
*
* @return Map
*/
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>(9);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pollRecords);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* ⾃定义 ConcurrentKafkaListenerContainerFactory 初始化消费者
*
* @return ConcurrentKafkaListenerContainerFactory
*/
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
}
/**
* 将监听者注⼊到IOC中,也可以采⽤注解⽅式,此⽅式只是为了便于确定监听者的分布
*
* @return MqSinkReceiver
*/
@Bean
public MqSinkReceiver listener() {
return new MqSinkReceiver();spring ioc注解
}
}
2、
public class MqSinkReceiver {
@Autowired
private MqListener mqListener;
private final LoggerUtilI logger = Class().getName());
/**
* 归档统计
*
* @param consumerRecord 消息体
* @param ack Acknowledgment
*/
@KafkaListener(id = "clusterPersonfileConsumer", topics = {"personfile-new-clustering"}, containerFactory = "ackContainerFactory")
public void inputPersonfileNewCluster(ConsumerRecord consumerRecord, Acknowledgment ack) {
if (consumerRecord != null) {
JSONObject jsonParam = JSONObject.parseObject(consumerRecord.value().toString());
logger.info("接收到数据平台的归档kafka消息" + String());
try {
mqListener.clusterStatistic(jsonParam);
if (ack != null) {
ack.acknowledge();
}
} catch (BusinessException | ParseException e) {
<("归档统计异常:" + e);
}
}
}
}
3、spring-boot容器即可
#消费者并发启动个数(对应分区个数)每个listener⽅法
将启动器的并发提⾼到和分区数⼀致
kafka 消费能⼒的提⾼
1、⾃动提交的实现
2、autoCommitIntervalMs 设置每次隔多久⾃动提交offset
3、kafka.max.poll.interval.ms 和 sessionTimeout
max.poll.interval.ms ,它表⽰最⼤的poll数据间隔,如果超过这个间隔没有发起pool请求,但heartbeat仍旧在发,就认为该consumer 处于 livelock状态。就会将该consumer退出consumer group
之后就会触发导致reblance
·heartbeat.interval.ms
⼼跳间隔。⼼跳是在consumer与coordinator之间进⾏的。⼼跳是确定consumer存活,加⼊或者退出group的有效⼿段。
这个值必须设置的⼩于session.timeout.ms,因为:
当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同⼀group 内的其它的consumer上。
通常设置的值要低于session.timeout.ms的1/3。
默认值是:3000 (3s)
·session.timeout.ms
Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与group.max.session.timeout.ms之间。
其默认值是:10000 (10 s)
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论