springboot2.x+kafka使⽤和源码分析五(消费者配置使⽤)上⼀章描述springboot对于kafka事务的⽀持,本章主要叙说springboot对于consumer⽀持。
这⾥通过两种⽅式
第⼀种:由springboot框架来初始化基础bean,我们只需要在yml配置⽂件中编写配置即可。如下图所⽰(常规配置具体所有配置可参考的consumer):
springboot初始化bena源码 :
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
/**
*初始化consumer⼯⼚类创建Consumer
*/
public class KafkaAutoConfiguration {
。。。。
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}
。。。。
}
/**
*注⼊并⾏监听容器
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
。。。。
/**
*创建MessageListenerContainer ⼯⼚类
*/
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
return factory;
}
@Configuration(proxyBeanMethods = false)
@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
static class EnableKafkaConfiguration {
}
}
对于kafkaListenerContainerFactory  的⽤于创建MessageListenerContainer 消息监听容器类
MessageListenerContainer有两个实现类KafkaMessageListenerContainer,ConcurrentMessageListenerContainer
KafkaMessageListenerContainer:以单线程的⽅式消费topic中所有partition数据
ConcurrentMessageListenerContainer:以并⾏的⽅式消费topic中所有partition数据(开启多线程),每
⼀个线程会对应⼀个partition所有建议对于ConcurrentMessageListenerContainer的并发数与topic的partition数保持⼀致
MessageListenerContainer的作⽤在于对于MessageListener的管理
MessageListener这个接⼝的作⽤⼜是什么呢?⽤于消费topic中的数据,并对offset进⾏管理(2.3以后默认是⼿动提交)
springboot 对于MessageListener⼜默认8中实现:
1:public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
2:public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
3:public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
4:public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
5:public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
6:public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
7:public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
8:public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
注:
1. offset提交⽅式为⾃动提交,使⽤此接⼝可以处理从Kafka consumer poll操作接收的单个ConsumerRecord实例。
2. offset提交⽅式为⼿动提交,使⽤此接⼝可以处理从Kafka consumer poll操作接收的单个ConsumerRecord实例。
3. offset提交⽅式为⾃动提交,使⽤此接⼝可以处理从Kafka consumer poll操作接收的单个ConsumerRecord实例,并提供对提供对
Consumer对象的访问。
4. offset提交⽅式为⼿动提交,使⽤此接⼝可以处理从Kafka consumer poll操作接收的单个ConsumerRecord实例,并提供对提供对
Consumer对象的访问。
5. offset提交⽅式为⾃动提交,使⽤此接⼝可以处理从Kafka consumer poll操作接收的批量ConsumerRecord实例。
6. offset提交⽅式为⼿动提交,使⽤此接⼝可以处理从Kafka consumer poll操作接收的批量ConsumerRecord实例。
7. offset提交⽅式为⾃动提交,使⽤此接⼝可以处理从Kafka consumer poll操作接收的批量ConsumerRecord实例,并提供对提供对
Consumer对象的访问。
8. offset提交⽅式为⼿动提交,使⽤此接⼝可以处理从Kafka consumer poll操作接收的批量ConsumerRecord实例,并提供对提供对
Consumer对象的访问。
KafkaMessageListenerContainer使⽤demo:
public KafkaMessageListenerContainer messageListenerContainer1(@Qualifier("consumerFactory") ConsumerFactory<Integer,String> consumerFactory)
//创建topic分区
TopicPartition topicPartition1 = new TopicPartition("springboot_test_topic",0);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo1 = new TopicPartitionOffset(topicPartition1,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
//创建topic分区
TopicPartition topicPartition2 = new TopicPartition("springboot_test_topic",1);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo2 = new TopicPartitionOffset(topicPartition2,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
//创建topic分区
TopicPartition topicPartition3 = new TopicPartition("springboot_test_topic",2);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo3 = new TopicPartitionOffset(topicPartition3,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
/
/设置topic的主题和分区可以指定从哪个分区哪个offset开始消费
//容器配置
ContainerProperties containerProperties = new ContainerProperties(tpo1,tpo2,tpo3);
/**
* 指定
*/
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
containerProperties.setClientId("springboot_test_topic_id");
containerProperties.setGroupId("springboot_test_topic_group");
containerProperties.setAckTime(6000);
containerProperties.setPollTimeout(3000);
containerProperties.setAckOnError(false);
//绑定message 消息监听⼿动提交单条消费
containerProperties.setMessageListener((AcknowledgingMessageListener<Integer, String>) (consumerRecord, acknowledgment) -> {
System.out.println("partition============>"+consumerRecord.partition());
System.out.println("offset============>"+consumerRecord.offset());
System.out.println("key============>"+consumerRecord.key());
System.out.println("value============>"+consumerRecord.value());
//确定消费完成 commit offset
acknowledgment.acknowledge();
});
//构建kafka消费者监听容器
KafkaMessageListenerContainer<Integer,String>  kafkaMessageListenerContainer =
new KafkaMessageListenerContainer<Integer,String>(consumerFactory,containerProperties);
//启动消费监听
kafkaMessageListenerContainer.start();
return kafkaMessageListenerContainer;
}
ConcurrentMessageListenerContainer使⽤demo:
public ConcurrentMessageListenerContainer messageListenerContainer(@Qualifier("consumerFactory")  ConsumerFactory<Integer,String> consumerFac        String topicName = "springboot_test_topic";
String topicName = "springboot_test_topic";
//创建topic分区
TopicPartition topicPartition1 = new TopicPartition(topicName,0);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo1 = new TopicPartitionOffset(topicPartition1,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
//创建topic分区
TopicPartition topicPartition2 = new TopicPartition(topicName,1);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo2 = new TopicPartitionOffset(topicPartition2,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
//创建topic分区
TopicPartition topicPartition3 = new TopicPartition(topicName,2);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo3 = new TopicPartitionOffset(topicPartition3,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
//ConsumerFactory设置topic的主题和分区
//容器配置
ContainerProperties containerProperties = new ContainerProperties(tpo1,tpo2,tpo3);
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
containerProperties.setClientId("springboot_test_topic_concurrent_id");
containerProperties.setGroupId("springboot_test_topic_concurrent_group");
containerProperties.setAckTime(6000);
containerProperties.setPollTimeout(3000);
containerProperties.setAckOnError(false);
//绑定message 消息监听⾃定提交单条消费存在丢失数据以及重复消费的问题
containerProperties.setMessageListener((AcknowledgingMessageListener<Integer, String>) (consumerRecord, acknowledgment) -> {
System.out.println("partition============>"+consumerRecord.partition());
System.out.println("offset============>"+consumerRecord.offset());
springboot框架的作用
System.out.println("key============>"+consumerRecord.key());
System.out.println("value============>"+consumerRecord.value());
//确定消费完成 commit offset
acknowledgment.acknowledge();
});
ConcurrentMessageListenerContainer<Integer,String> cmlc = new ConcurrentMessageListenerContainer(consumerFactory,containerProperties);
//是否设置虽容器⾃动启动
cmlc.setAutoStartup(true);
cmlc.setBeanName("concurrentMessageListenerContainer");
//设置并发数
cmlc.setConcurrency(3);
//启动消费监听
cmlc.start();
return cmlc;
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。