springboot整合spring-kafka实现发送接收消息实例代码
前言
由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其次搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用.
没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档,实在是搞的有点头大,功能一直没实现.文档写的也不是很漂亮,也可能是刚起步,有很多的问题.我这里只能放弃了,使用了spring-kafka.
实现方法
xmlns:xsi="/2001/XMLSchema-instance"
xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd">
4.0.0
org.linuxsogood.sync
linuxsogood-sync
1.0.0-SNAPSHOT
org.springframework.boot
spring-boot-starter-parent
1.4.0.RELEASE
1.8
3.3.1
1.2.4
3.3.6
4.1.1
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-jdbc
org.springframework.boot
spring-boot-starter-aop
org.springframework.boot
spring-boot-starter-freemarker
org.springframework.kafka
spring-kafka
1.1.0.RELEASE
junit
junit
4.12
test
org.assertj
assertj-core
3.5.2
org.hamcrest
hamcrest-all
1.3
test
mockito-all
1.9.5
test
org.springframework spring-test
4.2.3.RELEASE
test
org.springframework.boot spring-boot-starter-test test
mysql
mysql-connector-java com.microsoft.sqlserver sqljdbc4
4.0.0
com.alibaba
druid
1.0.11
mybatis
${mybatis.version}
mybatis-spring
${mybatis.spring.version}
mybatis-generator-core
1.3.2
compile
true
com.github.pagehelper
pagehelper
${pagehelper.version}
mapper
${mapper.version}
com.alibaba
fastjson
1.2.17
repo.spring.io.milestone
Spring Framework Maven Milestone Repository
repo.spring.io/libs-milestone
mybatis_generator
mybatis-generator-maven-plugin
1.3.2
true
true
org.springframework.boot
spring-boot-maven-plugin
org.linuxsogood.sync.Starter
orm层使用了MyBatis,又使用了通用Mapper和分页插件.
kafka消费端配置
import org.linuxsogood.sync.listener.Listener;
import org.apache.sumer.ConsumerConfig; import org.apache.kafkamon.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import t.annotation.Bean;
import t.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.fig.ConcurrentKafkaListenerContainerFactory;
import org.fig.KafkaListenerContainerFactory;
import org.ConsumerFactory;
import org.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.broker.address}")
private String brokerAddress;
@Bean
KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
return factory;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map consumerConfigs() {
Map propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propsMap;
}
@Bean
public Listener listener() {
return new Listener();
}
}
生产者的配置.
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafkamon.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import t.annotation.Bean;
import t.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.DefaultKafkaProducerFactory;
import org.KafkaTemplate;
import org.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.broker.address}")
private String brokerAddress;
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。