SpringBoot整合Kafka消息组件
1、Kafka是新一代的消息系统,也是目前性能最好的消息组件,在数据采集业务中被广泛应用。这里Kafka将基于Kerberos认证实现消息组件处理。修改pom.xml配置文件,追加依赖库配置,如下所示:
1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0"
3    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
5    https://maven.apache.org/xsd/maven-4.0.0.xsd">
6    <modelVersion>4.0.0</modelVersion>
7    <parent>
8        <groupId>org.springframework.boot</groupId>
9        <artifactId>spring-boot-starter-parent</artifactId>
10        <version>2.3.5.RELEASE</version>
11        <relativePath /> <!-- lookup parent from repository -->
12    </parent>
13    <groupId>com.example</groupId>
14    <artifactId>demo</artifactId>
15    <version>0.0.1-SNAPSHOT</version>
16    <name>demo</name>
17    <description>Demo project for Spring Boot</description>
18
19    <properties>
20        <java.version>1.8</java.version>
21        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
22    </properties>
23
24    <dependencies>
25        <dependency>
26            <groupId>org.springframework.boot</groupId>
27            <artifactId>spring-boot-starter-web</artifactId>
28        </dependency>
29
30        <dependency>
31            <groupId>org.springframework.boot</groupId>
32            <artifactId>spring-boot-starter-test</artifactId>
33            <scope>test</scope>
34            <exclusions>
35                <exclusion>
36                    <groupId>org.junit.vintage</groupId>
37                    <artifactId>junit-vintage-engine</artifactId>
38                </exclusion>
39            </exclusions>
40        </dependency>
41
42        <!-- mysql驱动包 -->
43        <dependency>
44            <groupId>mysql</groupId>
45            <artifactId>mysql-connector-java</artifactId>
46        </dependency>
47
48        <!-- druid连接池 -->
49        <dependency>
50            <groupId>com.alibaba</groupId>
51            <artifactId>druid</artifactId>
52            <version>1.1.10</version>
53        </dependency>
54
55        <dependency>
56            <groupId>org.springframework.boot</groupId>
57            <artifactId>spring-boot-starter-data-jpa</artifactId>
58        </dependency>
59        <dependency>
60            <groupId>org.springframework.boot</groupId>
61            <artifactId>spring-boot-starter-cache</artifactId>
62        </dependency>
63        <dependency>
64            <groupId>org.hibernate</groupId>
65            <artifactId>hibernate-ehcache</artifactId>
66        </dependency>
67
68        <!-- activeMQ -->
69        <dependency>
70            <groupId>org.springframework.boot</groupId>
71            <artifactId>spring-boot-starter-activemq</artifactId>
72        </dependency>
73
74        <!-- rabbitMQ -->
75        <dependency>
76            <groupId>org.springframework.boot</groupId>
77            <artifactId>spring-boot-starter-amqp</artifactId>
78        </dependency>
79
80        <!-- kafka -->
81        <dependency>
82            <groupId>org.springframework.kafka</groupId>
83            <artifactId>spring-kafka</artifactId>
84        </dependency>
85    </dependencies>
86
87    <build>
88        <plugins>
89            <plugin>
90                <groupId>org.springframework.boot</groupId>
91                <artifactId>spring-boot-maven-plugin</artifactId>
92            </plugin>
93        </plugins>
94        <resources>
95            <resource>
96                <directory>src/main/resources</directory>
97                <includes>
98                    <include>**/*.properties</include>
99                    <include>**/*.yml</include>
100                    <include>**/*.xml</include>
101                    <include>**/*.p12</include>
102                    <include>**/*.html</include>
103                    <include>**/*.jpg</include>
104                    <include>**/*.png</include>
105                </includes>
106            </resource>
107        </resources>
108    </build>
109
110</project>
修改application.properties配置文件,追加Kafka配置,如下所示:
1 # 定义主机列表
2 spring.kafka.bootstrap-servers=192.168.110.142:9092
3 # 定义主题名称
4 spring.kafka.template.default-topic=test
5 # 定义⽣产者配置
6 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
7 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
8 # 定义消费者配置
9 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
10 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
11 # 数据分组
12 spring.kafka.consumer.group-id=group-1
使⽤Kafka消息机制实现消息发送接⼝,如下所⽰:
1 package com.demo.producer;
2
3 import org.springframework.beans.factory.annotation.Autowired;
4 import org.springframework.kafka.core.KafkaTemplate;
5 import org.springframework.stereotype.Service;
6
7 @Service
8public class KafkaMessageProducer {
9
10// kafka消息模板
11    @Autowired
12private KafkaTemplate<String, String> kafkaTemplate;
13
14public void send(String text) {
15// 发送消息
16this.kafkaTemplate.sendDefault("message-key", text);
17    }
18
19 }
建⽴⼀个Kafka消息的消费程序类,如下所⽰:
1 package com.demo.consumer;
2
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.springframework.kafka.annotation.KafkaListener;
5 import org.springframework.stereotype.Service;
6
7 @Service
8public class KafkaMessageConsumer {
9
10/**
11    * 进⾏消息接收处理
12    *
13    * @param record
14*/
15    @KafkaListener(topics = { "test" })
16public void receiveMessage(ConsumerRecord<String, String> record) {
17        System.out.println("【*** 接收消息 ***】 key = " + record.key() + " , value = " + record.value());
18    }
19
20 }
通过测试程序调用KafkaMessageProducer进行消息发送,由于Kafka已经配置了自动创建主题,所以即使现在主题不存在,也不影响程序执行。
1 package com.demo.controller;
2
3 import org.springframework.beans.factory.annotation.Autowired;
4 import org.springframework.stereotype.Controller;
5 import org.springframework.web.bind.annotation.RequestMapping;
6 import org.springframework.web.bind.annotation.ResponseBody;
7
8 import com.demo.producer.KafkaMessageProducer;
9
10 @Controller
11public class KafkaMessageController {
12
13    @Autowired
14private KafkaMessageProducer kafkaMessageProducer;
15
16    @RequestMapping(value = "/messageProducer")
17    @ResponseBody
18public void findAll() {
19for (int i = 0; i < 20000; i++) {
20if (i % 20 == 0) {
21try {
22                    Thread.sleep(1000);
23                } catch (InterruptedException e) {
24                    e.printStackTrace();
25                }
26            }
27            kafkaMessageProducer.send("Kafka producer message : " + i);
28        }
29    }
30 }
如果启动项⽬报下⾯的错误,如下所⽰:
1 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient  : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient  : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
3 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient  : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
4 WARN 17821 --- [gateway-logback] org.apache.kafka.clients.NetworkClient  : [Producer clientId=framework-gateway-logback] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
修改server.properties的两行默认配置,即可通过外网连接服务器Kafka,问题解决:
1 # 允许外部端⼝连接
2 listeners=PLAINTEXT://0.0.0.0:9092
3 # 外部代理地址
4 advertised.listeners=PLAINTEXT://192.168.110.142:9092

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。