spring整合kafka监听消费的配置过程
前⾔
最近项⽬⾥有个需求,要消费kafka⾥的数据。之前也⼿动写过代码去消费kafka数据。但是转念⼀想。既然spring提供了消费kafka的⽅法。就没必要再去重复造轮⼦。于是尝试使⽤spring的API。
项⽬技术背景,使⽤springMVC,XML配置和注解相互使⽤。kafka的配置都是使⽤XML⽅式。
整合过程
1. 引⼊spring-kafka的依赖包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
2. 在spring的xml⽂件⾥增加配置项,也可以单独创建⼀个l⽂件。
<!-- consumer configuration 该配置项可以根据⾃⼰业务的实际需求做增加或删除-->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
<entry key="group.id" value="group" />
<entry key="enable.automit" value="true" />
<entry key="automit.interval.ms" value="3000" />
springframework jar包下载<entry key="session.timeout.ms" value="10000" />
<entry key="key.deserializer"
value="org.apache.kafkamon.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafkamon.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<!-- create factory 该类是spring jar包⾥提供,就这么配置-->
<bean id="consumerFactory" class="org.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties" />
</constructor-arg>
</bean>
<!-- ⾃定义的消费类,需要实现spring的接⼝ -->
<bean id="payPalConsumer"
class="com.sumer.PayPalConsumer" />
<!-- 该类也是jar包⾥提供的,注⼊的监听类是⾃⼰定义的,topic名称是配置⽂件引⼊的-->
<bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="${pic.name}"/>
<property name="messageListener" ref="payPalConsumer" />
</bean>
<!-- 改类也是jar⾥提供的,把这个containerProperties和consumerfactory 注⼊ -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory" />
<constructor-arg ref="containerProperties" />
</bean>
2. ⾃定义消费者类,消费者类依然可以使⽤注解。
/**
* get msg from kafka
*/
@Component
public class PayPalConsumer implements MessageListener<String, String> {
private static Logger logger = Logger(PayPalConsumer.class);
@Autowired
private XXService XXService;
@Override
public void onMessage(ConsumerRecord<String, String> authorizeRecord) {
String value = authorizeRecord.value();
if (StringUtils.isEmpty(value)){
logger.warn("receive message from kafka is null");
return;
}
logger.info("receive message from kafka is {}",value);
}
}
使⽤这个步骤配置,⼀次性过。⾮常顺利。
到此这篇关于spring 整合kafka监听消费的配置过程的⽂章就介绍到这了,更多相关spring 整合kafka内容请搜索以前的⽂章或继续浏览下⾯的相关⽂章希望⼤家以后多多⽀持!
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论