Pulsar整合SpringCloud让Pulsar的配置可以热更新的⽅法完整代码git地址 gitee/zhaoyuxuan66/pulsar-springcloud_boot-demo/tree/master/
代码,包括Pulsar的参数类, Pulsar Client, Producer和Consumer
================Pulsar参数类=====================
@Data
@Component
@ConfigurationProperties(prefix = "tdmq.pulsar")
public class PulsarProperties {
/**
* 接⼊地址
*/
private String serviceurl;
/**
* 命名空间tdc
*/
private String tdcNamespace;
/**
* ⾓⾊tdc的token
*/
private String tdcToken;
/**
* 集name
*/
private String cluster;
/**
* topicMap
*/
private Map<String, String> topicMap;
/**
* 订阅
*/
private Map<String, String> subMap;
/**
* 开关 on:Consumer可⽤ ||||| off:Consumer断路
*/
private String onOff;
}
==================PulsarClient=======================
@Slf4j
@Configuration
@EnableConfigurationProperties(PulsarProperties.class)
public class PulsarConfig {
@Autowired
PulsarProperties pulsarProperties;
@RefreshScope
@Bean
public PulsarClient getPulsarClient() {
try {
return PulsarClient.builder()
.TdcToken()))
.Serviceurl())
.build();
} catch (PulsarClientException e) {
<("初始化Pulsar Client失败", e);
}
throw new RuntimeException("初始化Pulsar Client失败");
}
}
===========Producer&Consumer&发送消息的⼯具类=================
@Slf4j
@Component
public class PulsarUtils {
@Autowired
PulsarProperties pulsarProperties;
@Autowired
PulsarClient client;
@Autowired
AuditCommentResultListener<String> auditCommentResultListener;
@Autowired
AuditReplyResultListener<String> auditReplyResultListener;
@Autowired
AuditResourceResultListener<String> auditResourceResultListener;
/**
* 创建⼀个⽣产者
*
* @param topic topic name
* @return Producer⽣产者
*/
public Producer<byte[]> createProducer(String topic) {
try {
wProducer()
.Cluster() + "/" + TdcNamespace() + "/" + topic)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();
} catch (PulsarClientException e) {
<("初始化Pulsar Producer失败", e);
}
throw new RuntimeException("初始化Pulsar Producer失败");
}
/**
* 创建⼀个消费者
*
* @param topic          topic name
* @param subscription    sub name
* @param messageListener MessageListener的⾃定义实现类
* @return Consumer消费者
*/
public Consumer createConsumer(String topic, String subscription,
MessageListener messageListener) {
try {
wConsumer()
.Cluster() + "/" + TdcNamespace() + "/" + topic)
.subscriptionName(subscription)
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.messageListener(messageListener)
.subscribe()
;
} catch (PulsarClientException e) {
<("初始化Pulsar Consumer失败", e);
}
throw new RuntimeException("初始化Pulsar Consumer失败");
}
/**
* 异步send⼀条msg
*
* @param message 消息体
*/
public void sendMessage(String message, Producer<byte[]> producer) {
producer.Bytes()).thenAccept(msgId -> {
log.info("消息发送成功, MessageID为{}", msgId);
});
}
/**
* 同步发送⼀条msg
*
* @param message  消息体
* @param producer ⽣产者实例
*/
public void sendOnce(String message, Producer<byte[]> producer) throws PulsarClientException {
MessageId send = producer.Bytes());
log.info("消息成功发送, MessageId {},message {}", send, message);
}
//-----------consumer-----------
@RefreshScope
@Bean(name = "audit-resource-result-topic")
public Consumer getAuditResourceResultTopicConsumer() {
TopicMap().get("audit-resource-result-topic"),
auditResourceResultListener);
}
//-----------producer-----------
@RefreshScope
@Bean(name = "resource-publish-topic")
public Producer<byte[]> getResourcePublishTopicProducer() {
TopicMap().get("resource-publish-topic"));
}
}
=====================AbstractListener===============================
@Slf4j
@Component
public abstract class AbstractListener<String> implements MessageListener<String> {
@Autowired
PulsarProperties pulsarProperties;
@Override
public void received(Consumer<String> consumer, Message<String> message) {
}
/**
* 判断开关
*
* @return is equals off
*/
public boolean judgeIsOff() {
OnOff().equals("off");
}
}
=================Listener⾃定义实现类====================springcloud和springboot
@Slf4j
@Component
public class AuditCommentResultListener<String> extends AbstractListener<String> {
@Autowired
CommentService commentService;
@Override
public void received(Consumer consumer, Message msg) {
try {
java.lang.String data = new java.lang.Data());
log.info("接受到消息, MessageId {} data {}", MessageId(), data);
// 添加开关
if (super.judgeIsOff()) {
<("当前开关为off 拒绝消费消息, MessageId {} data {}", MessageId(), data);
}
// 处理业务逻辑
consumer.acknowledge(msg);
} catch (Exception e) {
<("拒绝消费消息, MessageId {} data {}", MessageId(), new java.lang.Data()), e);
}
}
}
=========================================================================
后来发现如上代码会导致BUG-> 在更新Nacos配置之后 Consumer会挂掉
经排查发现结果是由于@RefreshScope注解导致, 此注解将摧毁Bean, PulsarConsumer和Producer都将被摧毁,只是说Producer将在下⼀次调⽤中完成重启,Consumer则不能重启,因为没有调⽤. 那么怎么解决呢?我通过⽇志打印的信息
发现这⾏⽇志打印在Nacos 更新完配置之后跟进这个类
==============NacosContextRefresher 81⾏====================
private void registerNacosListener(final String groupKey, final String dataKey) {
String key = MapKey(dataKey, groupKey);
Listener listener = (Listener)this.listenerMapputeIfAbsent(key, (lst) -> {
return new AbstractSharedListener() {
public void innerReceive(String dataId, String group, String configInfo) {
NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));
if (NacosContextRefresher.log.isDebugEnabled()) {
NacosContextRefresher.log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo));
}
}
};
});
============================
关键就在这⾥我发现Nacos更新在更新了历史记录表之后⾛了这个⽅法 publishEvent(),我曾经尝试去监听RefreshEvent 但是这个事件仍然执⾏在@RefreshScope注解刷新容器事件之后, 我需要以⼀个延时任务的形式, 在监听到RefreshEvent之后, 延这样的做法不太优雅, 那么继续寻解决⽅案跟⼊这个publishEvent⽅法
============================
===========不⽤多说相信看过源码的朋友都知道该跟进哪⼀个================
来到了如图的这个地⽅
这就是Spring发布事件的⽅法, 打断点更新Nacos配置后,将发布什么事件根据⽇志的信息
到这个类
跟进refresh⽅法

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。