spring-kafka之KafkaListener注解深⼊解读
简介
Kafka⽬前主要作为⼀个分布式的发布订阅式的消息系统使⽤,也是⽬前最流⾏的消息队列系统之⼀。因此,也越来越多的框架对kafka做了集成,⽐如本⽂将要说到的spring-kafka。
Kafka既然作为⼀个消息发布订阅系统,就包括消息⽣成者和消息消费者。本⽂主要讲述的spring-kafka框架的kafkaListener注解的深⼊解读和使⽤案例。
解读
源码解读
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
/**
* 消费者的id,当GroupId没有被配置的时候,默认id为GroupId
*/
String id() default "";
/**
* 监听容器⼯⼚,当监听时需要区分单数据还是多数据消费需要配置containerFactory 属性
*/
String containerFactory() default "";
/
**
* 需要监听的Topic,可监听多个,和 topicPattern 属性互斥
*/
String[] topics() default {};
/**
* 需要监听的Topic的正则表达。和 topics,topicPartitions属性互斥
*/
String topicPattern() default "";
/**
* 可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥
*/
TopicPartition[] topicPartitions() default {};
/**
*侦听器容器组
*/
String containerGroup() default "";
/**
* 监听异常处理器,配置BeanName
*/
String errorHandler() default "";
/**
* 消费组ID
*/
String groupId() default "";
/**
* id是否为GroupId
*/
boolean idIsGroup() default true;
/**
* 消费者Id前缀
*/
String clientIdPrefix() default "";
/**
* 真实监听容器的BeanName,需要在 BeanName前加 "__"
*/
String beanRef() default "__listener";
}
View Code
使⽤案例
ConsumerRecord类消费
使⽤ConsumerRecord类接收有⼀定的好处,ConsumerRecord类⾥⾯包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使⽤ConsumerRecord会是个不错的选择。如果使⽤具体的类型接收消息体则更加⽅便,⽐如说⽤String类型去接收消息体。
这⾥我们编写⼀个Listener⽅法,监听"topic1"Topic,并把ConsumerRecord⾥⾯所包含的内容打印到控制台中:
@Component
public class Listener {
private static final Logger log = Logger(Listener.class);
@KafkaListener(id = "consumer", topics = "topic1")
public void consumerListener(ConsumerRecord<Integer, String> record) {
log.info("sumer receive : " + String());
}
}
View Code
批量消费
批量消费在现实业务场景中是很有实⽤性的。因为批量消费可以增⼤kafka消费吞吐量,提⾼性能。
批量消费实现步骤:
1、重新创建⼀份新的消费者配置,配置为⼀次拉取10条消息
2、创建⼀个监听容器⼯⼚,命名为:batchContainerFactory,设置其为批量消费并设置并发量为5,这个并发量根据分区数决定,必须⼩于等于分区数,否则会有线程⼀直处于空闲状态。
3、创建⼀个分区数为8的Topic。
4、创建监听⽅法,设置消费id为“batchConsumer”,clientID前缀为“batch”,监听“batch”,使⽤“batchContainerFactory”⼯⼚创建该监听容器。
@Component
public class BatchListener {
private static final Logger log= Logger(BatchListener.class);
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
//⼀次拉取消息数量
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
NumberDeserializers.IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return props;
}
@Bean("batchContainerFactory")
public ConcurrentKafkaListenerContainerFactory listenerContainer() {
ConcurrentKafkaListenerContainerFactory container
= new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
//设置并发量,⼩于或等于Topic的分区数
container.setConcurrency(5);
//必须设置为批量监听
container.setBatchListener(true);
return container;
}
@Bean
public NewTopic batchTopic() {
return new NewTopic("topic.batch", 8, (short) 1);
}
@KafkaListener(id = "batchConsumer",clientIdPrefix = "batch"
,topics = {"topic.batch"},containerFactory = "batchContainerFactory")
public void batchListener(List<String> data) {
log.info("topic.batch receive : ");
for (String s : data) {
log.info( s);
}
}
}
View Code
监听Topic中指定的分区
使⽤@KafkaListener注解的topicPartitions属性监听不同的partition分区。
@TopicPartition:topic--需要监听的Topic的名称,partitions --需要监听Topic的分区id。
partitionOffsets --可以设置从某个偏移量开始监听,@PartitionOffset:partition --分区Id,⾮数组,initialOffset --初始偏移量。
@Bean
public NewTopic batchWithPartitionTopic() {
return new NewTopic("topic.batch.partition", 8, (short) 1);
}
@KafkaListener(id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory",
topicPartitions = {
@TopicPartition(topic = "topic.batch.partition",partitions = {"1","3"}),
@TopicPartition(topic = "topic.batch.partition",partitions = {"0","4"},
partitionOffsets = @PartitionOffset(partition = "2",initialOffset = "100"))
}
)
public void batchListenerWithPartition(List<String> data) {
log.info("topic.batch.partition receive : ");
for (String s : data) {
log.info(s);
}
}
View Code
注解⽅式获取消息头及消息体
当你接收的消息包含请求头,以及你监听⽅法需要获取该消息⾮常多的字段时可以通过这种⽅式。。这⾥使⽤的是默认的监听容器⼯⼚创建的,如果你想使⽤批量消费,把对应的类型改为List即可,⽐如List<String> data , List<Integer> key。
@Payload:获取的是消息的消息体,也就是发送内容
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):获取发送消息的key
@Header(KafkaHeaders.RECEIVED_PARTITION_ID):获取当前消息是从哪个分区中监听到的
@Header(KafkaHeaders.RECEIVED_TOPIC):获取监听的TopicName
@Header(KafkaHeaders.RECEIVED_TIMESTAMP):获取时间戳
@KafkaListener(id = "params", topics = "topic.params")
public void otherListener(@Payload String data,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
log.info("topic.params receive : \n"+
"data : "+data+"\n"+
"key : "+key+"\n"+
"partitionId : "+partition+"\n"+
"topic : "+topic+"\n"+
"timestamp : "+ts+"\n"
);
}
View Code
使⽤Ack机制确认消费
Kafka是通过最新保存偏移量进⾏消息消费的,⽽且确认消费的消息并不会⽴刻删除,所以我们可以重复的消费未被删除的数据,当第⼀条消息未被确认,⽽第⼆条消息被确认的时候,Kafka会保存第⼆条消息的偏移量,也就是说第⼀条消息再也不会被所获取,除⾮是根据第⼀条消息的偏移量⼿动获
取。Kafka的ack 机制可以有效的确保消费不被丢失。因为⾃动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。
使⽤Kafka的Ack机制⽐较简单,只需简单的三步即可:
kafka最新版本1. 设置ENABLE_AUTO_COMMIT_CONFIG=false,禁⽌⾃动提交
2. 设置AckMode=MANUAL_IMMEDIATE
3. 监听⽅法加⼊Acknowledgment ack 参数
4.使⽤Consumer.seek⽅法,可以指定到某个偏移量的位置
@Component
public class AckListener {
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论