SpringKafka和SpringBoot整合实现消息发送与消费简单案例
本⽂主要分享下Spring Boot和Spring Kafka如何配置整合,实现发送和接收来⾃Spring Kafka的消息。
先前我已经分享了Kafka的基本介绍与集环境搭建⽅法。关于Kafka的介绍请阅读,关于Kafka安装请阅读,关于Kafka集环境搭建请阅读。这⾥关于服务器环境搭建不在赘述。
Spring Kafka整合Spring Boot创建⽣产者客户端案例
创建⼀个kafka-producer-master的maven⼯程。整个项⽬结构如下:
Maven的依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="/POM/4.0.0" xmlns:xsi="/2001/XMLSchema-instance"
xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId&h.kafka.producer</groupId>
<artifactId>producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-producer-master</name>
<description>demo project for kafka producer</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
&porting.outputEncoding>UTF-8</porting.outputEncoding>
<spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- mvnrepository/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- mvnrepository/artifact/org.springframework.kafka/spring-kafka-test -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
</dependency>
<!-- mvnrepository/artifact/io.springfox/springfox-swagger2 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.8.0</version>
</dependency>
<!-- mvnrepository/artifact/io.springfox/springfox-swagger-ui -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
使⽤application.properties配置应⽤程序
当然,根据个⼈喜好,你也可以使⽤l属性⽂件重写配置。Spring Boot会尝试根据l⽂件中指定的依赖关系⾃动配置应⽤程序,并设置合理的默认值。server.port=8000
spring.application.name=kafka-producer
#kafka configuration
spring.kafka.producer.bootstrap-servers=192.168.1.130:9092,192.168.1.101:9093,192.168.1.101:9094
spring.kafka.producer.key-serializer=org.apache.kafkamon.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafkamon.serialization.StringSerializer
#topic
pic.foo=test20180430
在上⾯的配置中,我给⽣产者分配的端⼝号是8000,服务器有3台,采⽤先前中配置的服务器。想了解关于kafka⽣产者相关的更多配置的话,可以阅读关于的配置信息。
使⽤Spring Boot发送Spring Kafka消息
SpringKafka提供了使⽤Producer的KafkaTemplate类发送消息,并提供将数据发送到Kafka主题的⾼级操作。提供异步和同步⽅法,异步⽅法返回Future。Spring Boot根据application.properties属性⽂件中配置的属性⾃动配置并初始化KafkaTemplate。为了⽅便测试发送消息,使⽤了Spring的定时任务,在类上使⽤@EnableScheduling 注解开启定时任务,通过@Scheduled注解指定发送消息规则。
h.kafka.producerponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.urrent.ListenableFuture;
@Component
@EnableScheduling
public class KafkaMessageProducer {
private static final Logger LOG = Logger(KafkaMessageProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${pic.foo}")
private String topic;
@Scheduled(cron = "00/5 * * * * ?")
public void send() {
String message = "Hello World---" + System.currentTimeMillis();
LOG.info("topic="+topic+",message="+message);
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(success -> LOG.info("KafkaMessageProducer 发送消息成功!"),
fail -> ("KafkaMessageProducer 发送消息失败!"));
}
}
创建消息⽣产者启动类
h.kafka.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.t.properties.EnableConfigurationProperties;
@SpringBootApplication
@EnableConfigurationProperties
public class KafkaProducerApplication{
public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
}
}
⾄此,Spring Boot整合Spring Kafka消息⽣产者应⽤已经整合完毕。启动zookeeper、kafka各个服务器。启动⽣产者应⽤,查看消息⽣产者应⽤控制台⽇志,如下图说明整合OK。
当然在创建消息⽣产者类时,我们可以更加灵活,可以不使⽤定时任务,通过界⾯请求的⽅式,发送我们想要发送的内容。简单案例如下:消息发送者类
h.kafka.producer.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.urrent.ListenableFuture;
@Service
public class KafkaMessageSendService {
private static final Logger LOG = Logger(KafkaMessageSendService.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${pic.foo}")
private String topic;
public void send(String message){
LOG.info("topic="+topic+",message="+message);
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(success -> LOG.info("KafkaMessageProducer 发送消息成功!"),
fail -> ("KafkaMessageProducer 发送消息失败!"));
}
}
界⾯请求处理controller类
h.ller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
h.kafka.producer.service.KafkaMessageSendService;
@RestController
@RequestMapping(value="send",produces=MediaType.APPLICATION_JSON_UTF8_VALUE)
public class KafkaMessageSendController {
@Autowired
private KafkaMessageSendService kafkaMessageSendService;
@RequestMapping(value="/sendMessage",method=RequestMethod.POST)
public String send(@RequestParam(required=true) String message){
try {
kafkaMessageSendService.send(message);
} catch (Exception e) {
return "send failed.";
}
return message;
}
}
通过Swagger访问测试Controller服务请求
Spring Kafka整合Spring Boot创建消费者客户端案例:创建⼀个kafka-consumer-master的maven⼯程。整个项⽬结构如下:
Maven的依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="/POM/4.0.0" xmlns:xsi="/2001/XMLSchema-instance" xsi:schemaLocation="maven.apach
<modelVersion>4.0.0</modelVersion>spring boot选择题
<groupId&sumer</groupId>
<artifactId>consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-consumer-master</name>
<description>demo project for kafka consumer</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
&porting.outputEncoding>UTF-8</porting.outputEncoding>
<spring-kafka.version>1.3.4.RELEASE</spring-kafka.version>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- mvnrepository/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- mvnrepository/artifact/org.springframework.kafka/spring-kafka-test -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
注意,这是使⽤Spring-Kafka时⼀定要注意版本问题,否则会报各种奇葩错误。Spring官⽅⽹站上给出了(它的版本号要和kafka服务器的版本保持⼀致)的对应关系:
使⽤application.properties配置应⽤程序
Spring Boot会尝试根据l⽂件中指定的依赖关系⾃动配置应⽤程序,并设置合理的默认值。
server.port=8001
spring.application.name=kafka-consumer
#kafka configuration
#指定消息被消费之后⾃动提交偏移量,以便下次继续消费
able-auto-commit=true
#指定消息组
up-id=guan
#指定kafka服务器地址
sumer.bootstrap-servers=192.168.1.130:9092,192.168.1.101:9093,192.168.1.101:9094
#指定从最近地⽅开始消费(earliest)
sumer.auto-offset-reset=latest
sumer.key-deserializer=org.apache.kafkamon.serialization.StringDeserializer
sumer.value-deserializer=org.apache.kafkamon.serialization.StringDeserializer
#topic
pic.foo=test20180430
在上⾯的配置中,我给⽣产者分配的端⼝号是8000,服务器有3台,采⽤先前中配置的服务器。想了解关于kafka⽣产者相关的更多配置的话,可以阅读关于的配置信息。
使⽤Spring Boot消费Spring Kafka消息
通过使⽤@KafkaListener来注解⼀个⽅法Spring Kafka会⾃动创建⼀个消息容器。使⽤该注解,并指定要消费的topic(也可以指定消费组以及分区号,⽀持正则表达式匹配),这样,消费者⼀旦启动,就会监听kafka服务器上的topic,实时进⾏消费消息。
sumer.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import ssaging.MessageHeaders;
import ssaging.handler.annotation.Headers;
import ssaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class KafkaMessageConsumer {
private static final Logger LOG = Logger(KafkaMessageConsumer.class);
@KafkaListener(topics={"${pic.foo}"})
public void receive(@Payload String message, @Headers MessageHeaders headers){
LOG.info("KafkaMessageConsumer 接收到消息:"+message);
headers.keySet().forEach(key->LOG.info("{}: {}",(key)));
}
}
创建消息消费者启动类
sumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.t.properties.EnableConfigurationProperties;
@SpringBootApplication
@EnableConfigurationProperties
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论