spring整合kafka监听消费
前⾔
最近项⽬⾥有个需求,要消费kafka⾥的数据。之前也⼿动写过代码去消费kafka数据。但是转念⼀想。既然spring提供了消费kafka的⽅法。就没必要再去重复造轮⼦。于是尝试使⽤spring的API。
项⽬技术背景,使⽤springMVC,XML配置和注解相互使⽤。kafka的配置都是使⽤XML⽅式。
整合过程
1. 引⼊spring-kafka的依赖包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
2. 在spring的xml⽂件⾥增加配置项,也可以单独创建⼀个l⽂件。
<!-- consumer configuration 该配置项可以根据⾃⼰业务的实际需求做增加或删除-->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/>
<entry key="group.id" value="group"/>
<entry key="enable.automit" value="true"/>
<entry key="automit.interval.ms" value="3000"/>
<entry key="session.timeout.ms" value="10000"/>
<entry key="key.deserializer"
value="org.apache.kafkamon.serialization.StringDeserializer"/>
<entry key="value.deserializer"
value="org.apache.kafkamon.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<!-- create factory 该类是spring jar包⾥提供,就这么配置-->
<bean id="consumerFactory" class="org.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!-- ⾃定义的消费类,需要实现spring的接⼝ -->
<bean id="payPalConsumer"
class="com.sumer.PayPalConsumer"/>
<!-- 该类也是jar包⾥提供的,注⼊的监听类是⾃⼰定义的,topic名称是配置⽂件引⼊的-->
<bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="${pic.name}"/>
<property name="messageListener" ref="payPalConsumer"/>
</bean>
<!-- 改类也是jar⾥提供的,把这个containerProperties和consumerfactory 注⼊ -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean>
2. ⾃定义消费者类,消费者类依然可以使⽤注解。
/**
* get msg from kafka
*/
@Component
public class PayPalConsumer implements MessageListener<String, String> {
private static Logger logger = Logger(PayPalConsumer.class);
@Autowired
private XXService XXService;
@Override
public void onMessage(ConsumerRecord<String, String> authorizeRecord) {
String value = authorizeRecord.value();
if (StringUtils.isEmpty(value)){
logger.warn("receive message from kafka is null");
return;
}
logger.info("receive message from kafka is {}",value);
}
}
使⽤这个步骤配置,⼀次性过。⾮常顺利。spring framework是什么软件
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论