Springboot⾃定义kafkaTemplate的bean实例进⾏⽣产消息和发
送消息
本⽂为博主原创,未经允许不得转载:
⽬录:
1. ⾃定义⽣产消息 kafkaTemplate 实例
2. 封装 kafka 发送消息的service ⽅法
3. 测试 kafka 发送消息service 的⽅法
4. ⾃定义 kafka 消费消息的⼯⼚ bean
5. kafka 监听消费消息
1. ⾃定义 kafkaTemplate 实例
a : 使⽤ @ConditionalOnProperty 注解属性控制是否加载 kafka 相关初始化配置,因为在项⽬开发过程中,如kafka 或redis 等⼯具容易封装为
⼯具类,被各微服务引⽤并进⾏加载。使⽤ @ConditionalOnProperty 注解的 havingValue 属性可以控制服务中是否进⾏加载对应的配置。
该属性的值,可在 yaml 配置⽂件中指定: kafka.used = true 。如果为true 则加载,false则不加载
b. 使⽤⼯⼚实例⽣成指定的 kafkaTemplate 实例
fig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafkamon.serialization.StringSerializer;
import org.springframework.dition.ConditionalOnProperty;
import t.annotation.Bean;
import t.annotation.Configuration;
import org.DefaultKafkaProducerFactory;
import org.KafkaTemplate;
import org.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@ConditionalOnProperty(prefix="kafka",name = "isClose",havingValue = "true")
public class KafkaTemplateConfig {
/**
* Producer Template 配置
*/
@Bean(name="kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* Producer ⼯⼚配置
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* Producer 参数配置
*/
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// 指定多个kafka集多个地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.125.26.68:9092");
// 重试次数,0为不启⽤重试机制
props.put(ProducerConfig.RETRIES_CONFIG, 0);
//同步到副本, 默认为1
// acks=0 把消息发送到kafka就认为发送成功
// acks=1 把消息发送到kafka leader分区,并且写⼊磁盘就认为发送成功
/
/ acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进⾏了同步就任务发送成功
props.put(ProducerConfig.ACKS_CONFIG, 1);
// ⽣产者空间不⾜时,send()被阻塞的时间,默认60s
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
// 控制批处理⼤⼩,单位为字节
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
// 批量发送,延迟为1毫秒,启⽤该功能能有效减少⽣产者发送消息次数,从⽽提⾼并发量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// ⽣产者可以使⽤的总内存字节来缓冲等待发送到服务器的记录
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
// 消息的最⼤⼤⼩限制,也就是说send的消息⼤⼩不能超过这个限制, 默认1048576(1MB)
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
// 键的序列化⽅式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 值的序列化⽅式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 压缩消息,⽀持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
// 消费者默认⽀持解压,所以压缩设置在⽣产者,消费者⽆需设置。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
return props;
}
}
2. 封装 kafka 发送消息的service ⽅法:
ample.demo.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.urrent.ListenableFutureCallback;
import urrent.ExecutionException;
import urrent.TimeUnit;
import urrent.TimeoutException;
@Service
public class KafkaProduceService {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* producer 同步⽅式发送数据
*
* @param topic topic名称
* @param message producer发送的数据
*/
public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException { kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
}
/**
* producer 异步⽅式发送数据
*
* @param topic topic名称
* @param message producer发送的数据
*/
public void sendMessageAsync(String topic, String message) {
kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("success");
}
@Override
public void onSuccess(Object o) {
System.out.println("failure");
}
});
}
}
3. 测试 kafka 发送消息service 的⽅法:
spring怎么读取yamlample.demo;
ample.demo.service.KafkaProduceService;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.st.context.SpringBootTest;
import st.context.junit4.SpringJUnit4ClassRunner;
import urrent.ExecutionException;
import urrent.TimeoutException;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProduceServiceTest {
@Autowired
private KafkaProduceService producerService;
@Test
public void sendMessageSync() throws InterruptedException, ExecutionException, TimeoutException {
producerService.sendMessageSync("test","同步发送消息测试");
}
@Test
public void sendMessageAsync() {
producerService.sendMessageAsync("test","异步发送消息测试");
}
}
4. ⾃定义 kafka 消费消息的⼯⼚ bean :
fig;
import org.apache.sumer.ConsumerConfig;
import org.apache.kafkamon.serialization.StringDeserializer;
import t.annotation.Bean;
import org.fig.ConcurrentKafkaListenerContainerFactory;
import org.fig.KafkaListenerContainerFactory;
import org.ConsumerFactory;
import org.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
public class KafkaConsumerConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
// 设置消费者⼯⼚
factory.setConsumerFactory(consumerFactory());
// 消费者组中线程数量
factory.setConcurrency(3);
// 拉取超时时间
// 当使⽤批量时需要设置为true
factory.setBatchListener(true);
return factory;
}
// @Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
// @Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// Kafka地址
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.125.26.68:9092");
//配置默认分组,这⾥没有配置+在监听的地⽅没有设置groupId,多个服务会出现收到相同消息情况
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "defaultGroup");
// 是否⾃动提交offset偏移量(默认true)
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// ⾃动提交的频率(ms)
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
// Session超时设置
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
// 键的反序列化⽅式
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
/
/ 值的反序列化⽅式
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// offset偏移量规则设置:
// (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;⽆提交的offset时,从头开始消费
// (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;⽆提交的offset时,消费新产⽣的该分区下的数据 // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有⼀个分区不存在已提交的offset,则抛出异常 propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propsMap;
}
}
5. kafka 监听消费消息:
ample.demo.service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumerListener {
@KafkaListener(topics = {"test"},groupId = "group1",
containerFactory="kafkaListenerContainerFactory")
public void kafkaListener(String message){
System.out.println(message);
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论