Integrating the Kafka Messaging Component with Spring Boot
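1. Kafka is a newer-generation messaging system and one of the best-performing message components currently available; it is widely used in data collection workloads. Here Kafka is described as handling messages on top of Kerberos authentication. Modify the pom.xml configuration file and add the dependency configuration, as shown below: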
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
    https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <!-- The groupId was garbled in the original listing; com.example is assumed -->
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!-- Druid connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-ehcache</artifactId>
        </dependency>

        <!-- ActiveMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <!-- RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.yml</include>
                    <include>**/*.xml</include>
                    <include>**/*.p12</include>
                    <include>**/*.html</include>
                    <include>**/*.jpg</include>
                    <include>**/*.png</include>
                </includes>
            </resource>
        </resources>
    </build>

</project>
Modify the application configuration file and add the Kafka settings, as shown below:
# Broker host list
spring.kafka.bootstrap-servers=192.168.110.142:9092
# Default topic name
spring.kafka.template.default-topic=test
# Producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Consumer configuration
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer group
spring.kafka.consumer.group-id=group-1
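As a rough illustration of what these entries mean, the sketch below builds the plain Kafka client settings that the Spring Boot properties correspond to, using only java.util.Properties. The class and method names are hypothetical. Note that spring.kafka.template.default-topic has no Kafka client equivalent: it only configures Spring's KafkaTemplate, so it does not appear here.

```java
import java.util.Properties;

// Sketch only: shows the Kafka client keys behind the spring.kafka.* entries.
// The class and method names are made up for illustration.
class KafkaClientSettingsSketch {

    /** Client settings corresponding to the spring.kafka.producer.* entries. */
    static Properties producerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.110.142:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /** Client settings corresponding to the spring.kafka.consumer.* entries. */
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.110.142:9092");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "group-1");
        return props;
    }

    public static void main(String[] args) {
        producerProperties().forEach((k, v) -> System.out.println("producer " + k + " = " + v));
        consumerProperties().forEach((k, v) -> System.out.println("consumer " + k + " = " + v));
    }
}
```

Spring Boot performs this translation itself when it auto-configures the producer and consumer factories, so none of this code is needed in the project.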
Implement the message-sending class on top of Kafka's messaging mechanism, as shown below:
package com.demo.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaMessageProducer {

    // Kafka message template, auto-configured from the spring.kafka.* properties
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String text) {
        // Send the message to the default topic (spring.kafka.template.default-topic)
        this.kafkaTemplate.sendDefault("message-key", text);
    }

}
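The text further on refers to an IMessageProducer interface that the listings do not show. As a minimal sketch of what such an abstraction might look like, the example below declares a hypothetical IMessageProducer with a single send method, plus a hypothetical in-memory implementation (RecordingMessageProducer) that makes it possible to exercise callers without a running broker. None of these types come from Spring or Kafka.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch only: all three types are hypothetical and not part of the article's project.
class MessageProducerSketch {
    public static void main(String[] args) {
        RecordingMessageProducer producer = new RecordingMessageProducer();
        producer.send("Kafka producer message : 0");
        System.out.println(producer.sentMessages());
    }
}

// Assumed shape of the interface the article mentions by name.
interface IMessageProducer {
    void send(String text);
}

// In-memory stand-in that records messages instead of sending them to Kafka.
class RecordingMessageProducer implements IMessageProducer {
    private final List<String> sent = new ArrayList<>();

    @Override
    public void send(String text) {
        sent.add(text);
    }

    // Read-only view of everything passed to send(), in call order.
    List<String> sentMessages() {
        return Collections.unmodifiableList(sent);
    }
}
```

With this shape, KafkaMessageProducer could implement the same interface, and a controller could depend on the interface rather than on the Kafka-specific class.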
Create a consumer class for Kafka messages, as shown below:
package com.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaMessageConsumer {

    /**
     * Receives and processes messages from the "test" topic.
     *
     * @param record the consumed record, carrying the message key and value
     */
    @KafkaListener(topics = { "test" })
    public void receiveMessage(ConsumerRecord<String, String> record) {
        System.out.println("[*** message received ***] key = " + record.key() + " , value = " + record.value());
    }

}
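A test controller then calls the message producer (the KafkaMessageProducer class above, standing in for the IMessageProducer interface) to send messages. Because the Kafka broker is configured to create topics automatically (auto.create.topics.enable), the program still runs even if the topic does not exist yet.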
package com.demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.demo.producer.KafkaMessageProducer;

@Controller
public class KafkaMessageController {

    @Autowired
    private KafkaMessageProducer kafkaMessageProducer;

    // Sends 20,000 test messages, pausing for one second before each batch of 20
    @RequestMapping(value = "/messageProducer")
    @ResponseBody
    public void findAll() {
        for (int i = 0; i < 20000; i++) {
            if (i % 20 == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop sending
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            kafkaMessageProducer.send("Kafka producer message : " + i);
        }
    }
}
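To make the pacing of this loop concrete, here is a small stand-alone sketch of the same loop shape with the sender and the pause replaced by function parameters, so the arithmetic can be checked without Kafka or real sleeping. The class name ThrottledSendSketch and the method sendAll are hypothetical.

```java
import java.util.function.Consumer;
import java.util.function.LongConsumer;

// Sketch only: reproduces the controller's loop with pluggable send and pause steps.
class ThrottledSendSketch {

    /**
     * Sends `total` messages, calling `pauser` before every batch of `batchSize`.
     * Returns the number of pauses taken.
     */
    static int sendAll(int total, int batchSize, long pauseMillis,
                       Consumer<String> sender, LongConsumer pauser) {
        int pauses = 0;
        for (int i = 0; i < total; i++) {
            if (i % batchSize == 0) {
                pauser.accept(pauseMillis);
                pauses++;
            }
            sender.accept("Kafka producer message : " + i);
        }
        return pauses;
    }

    public static void main(String[] args) {
        int[] sent = {0};
        long[] pausedMillis = {0};
        // Count messages and accumulate the pause time instead of really sleeping.
        int pauses = sendAll(20000, 20, 1000, msg -> sent[0]++, ms -> pausedMillis[0] += ms);
        System.out.println("messages sent: " + sent[0]);                 // 20000
        System.out.println("pauses: " + pauses);                         // 1000
        System.out.println("pause time (s): " + pausedMillis[0] / 1000); // 1000
    }
}
```

With the values used in the controller, the loop pauses 1,000 times, so a single request to /messageProducer spends at least about 1,000 seconds (roughly 17 minutes) sleeping before it returns.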
If the project repeatedly logs the following warning at startup:
WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
The client is trying localhost/127.0.0.1:9092 rather than the configured bootstrap address, most likely because the broker advertises that address to clients in its metadata. Changing the following two default settings in server.properties lets clients on other hosts connect to the Kafka broker and resolves the problem:
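# Listen on all network interfaces so that external clients can connect
listeners=PLAINTEXT://0.0.0.0:9092
# Address advertised to clients (must be reachable from the client machine)
advertised.listeners=PLAINTEXT://192.168.110.142:9092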
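One way to check from the client machine whether the advertised address is actually reachable is a plain TCP probe. The sketch below uses only the JDK: it splits a listener string of the form PROTOCOL://host:port and attempts a socket connection with a timeout. The class and method names are hypothetical, and a successful connection only shows that the port is open, not that the Kafka protocol handshake succeeds.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Sketch only: a TCP reachability probe for a Kafka listener address.
class ListenerProbeSketch {

    /** Extracts host and port from a listener such as PLAINTEXT://host:port. */
    static InetSocketAddress parseListener(String listener) {
        int schemeEnd = listener.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Expected PROTOCOL://host:port, got: " + listener);
        }
        String hostPort = listener.substring(schemeEnd + 3);
        int colon = hostPort.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Missing port in: " + listener);
        }
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        // Unresolved on purpose: DNS lookup is deferred until the connection attempt.
        return InetSocketAddress.createUnresolved(host, port);
    }

    /** Returns true if a TCP connection to the listener succeeds within the timeout. */
    static boolean canConnect(String listener, int timeoutMillis) {
        InetSocketAddress target = parseListener(listener);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()),
                    timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String advertised = "PLAINTEXT://192.168.110.142:9092";
        System.out.println(advertised + " reachable: " + canConnect(advertised, 2000));
    }
}
```

If the probe fails for the advertised address while the broker is running, the usual suspects are a firewall rule or an advertised.listeners value that the client machine cannot route to.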