SpringBoot下RocketMQListener如何发起onMessage⽅法(源码)⽰例
通过注解⽅式监听, 指定了消息的类型, 会⾃动转换, 当获取到消息后会⾃动调⽤onMessage()
@Component
@RocketMQMessageListener(topic ="topic-A", consumerGroup ="group1")
public class RocketMQListenerService implements RocketMQListener<String>{
public void onMessage(String message){
System.out.println(message);
}
}
源码
SpringBoot会默认开启⾃动配置模式, 扫描各jar包下的spring.factories⽂件, 并对⽂件中指定的类注⼊到容器中
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
ketmq.spring.autoconfigure.RocketMQAutoConfiguration
RocketMQAutoConfiguration 通过Import⽅式, 向容器中导⼊了ListenerContainerConfiguration类
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({ MQAdmin.class, ObjectMapper.class})
@ConditionalOnProperty(prefix ="rocketmq", value ="name-server")
@Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class})
@AutoConfigureAfter(JacksonAutoConfiguration.class)
public class RocketMQAutoConfiguration {
.
..
}
ketmq.spring.autoconfigure.ListenerContainerConfiguration
实现了SmartInitializingSingleton接⼝, 在容器实例化bean之后会调⽤此bean的afterSingletonsInstantiated()
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
...
@Override
public void afterPropertiesSet()throws Exception {
// 默认初始化了push模式的consumer
initRocketMQPushConsumer();
log.debug("RocketMQ messageType: {}", Name());
}
@Override
public void afterSingletonsInstantiated(){
// 根据注解从容器中获取bean
Map<String, Object> beans =BeansWithAnnotation(RocketMQMessageListener.class);
Null(beans)){
// 逐个处理
beans.forEach(this::registerContainer);
}
}
private void registerContainer(String beanName, Object bean){
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
// 判断类型是否为RocketMQListener
if(!RocketMQListener.class.Class())){
throw new IllegalStateException(clazz +" is not instance of "+ Name());
}
// 获取注解信息并校验
RocketMQMessageListener annotation = Annotation(RocketMQMessageListener.class);
validate(annotation);
// ⽣成beanName -> ketmq.spring.support.DefaultRocketMQListenerContainer_1
String containerBeanName = String.format("%s_%s", Name(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext =(GenericApplicationContext) applicationContext;
// 注册bean 类型为DefaultRocketMQListenerContainer 重点
// Spring在bean创建对象时会回调supplier, 即createRocketMQListenerContainer()创建对象
()->createRocketMQListenerContainer(bean, annotation));
// 从容器中获取对应的bean, 容器会对刚注⼊的bean创建对象, 会调⽤createRocketMQListenerContainer()
DefaultRocketMQListenerContainer container = Bean(containerBeanName,
DefaultRocketMQListenerContainer.class);
// 校验状态初始不是running状态
if(!container.isRunning()){
try{
// 执⾏start⽅法
container.start();
}catch(Exception e){
<("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation){ // 此处只是简单的对注解进⾏解析封装成⼀个ListenerContainer
DefaultRocketMQListenerContainer container =new DefaultRocketMQListenerContainer();
container.NameServer());
container.pic()));
container.sumerGroup()));
container.setRocketMQMessageListener(annotation);
container.setRocketMQListener((RocketMQListener) bean);
container.setObjectMapper(objectMapper);
return container;
}
}
接下来会执⾏start⽅法, 开启监听
sumer.DefaultMQPushConsumer#start
public void start()throws MQClientException {
// 此处重点开启监听
this.defaultMQPushConsumerImpl.start();
// 校验traceDispatcher是否为空不为空也开启start 默认不是空为AsyncTraceDispatcher, 但是会因为循环执⾏start 抛出异常打印log
if(null!= traceDispatcher){
try{
traceDispatcher.NamesrvAddr());
}catch(MQClientException e){
log.warn("trace dispatcher start failed ", e);
}
}
}
ketmq.sumer.DefaultMQPushConsumerImpl#start
public synchronized void start()throws MQClientException {
// 判断服务状态
switch(this.serviceState){
// 服务刚启动, 状态为CREATE_JUST
case CREATE_JUST:
// 先设置⼀个默认值
this.serviceState = ServiceState.START_FAILED;
// ⼀⼤堆校验
this.checkConfig();
// 将订阅者进⾏copy 保存到rebalance订阅者列表中
if(MessageModel()== MessageModel.CLUSTERING){
// 将进程id存⼊到InstanceName
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 利⽤缓存检查并创建⼀个客户端实例
this.mQClientFactory = Instance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook); // 将信息存放到rebalance中
// 封装pull对象
this.pullAPIWrapper =new PullAPIWrapper(
mQClientFactory,
ConsumerGroup(),isUnitMode());
// 设置消息过滤器
isterFilterMessageHook(filterMessageHookList);
// 获取偏移量
if(OffsetStore()!=null){
this.offsetStore =OffsetStore();
}else{
// 判断消息类型⼴播消息使⽤本地offset 集消息使⽤远程offset
switch(MessageModel()){
case BROADCASTING:
this.offsetStore =new LocalFileOffsetStore(this.mQClientFactory,ConsumerGroup());
break;
case CLUSTERING:
this.offsetStore =new RemoteBrokerOffsetStore(this.mQClientFactory,ConsumerGroup());
break;
default:
break;
}
// 更新
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 调⽤load() remote空实现 local则从本地读取根据类型创建对应的实现对象
this.offsetStore.load();
// 消息获取⽅式顺序 or 并发(默认)
MessageListenerInner()instanceof MessageListenerOrderly){
new ConsumeMessageOrderlyService(this,(MessageListenerInner());
}else MessageListenerInner()instanceof MessageListenerConcurrently){
new ConsumeMessageConcurrentlyService(this,(MessageListenerInner());
}
// 开启线程清理过期消息
// 注册消费者
boolean registerOK = isterConsumer(ConsumerGroup(),this);
if(!registerOK){
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group["+Consumer
Group()
+"] has been created before, specify another name please."+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}
// client启动此时由于已经是running状态, 所以直接返回
mQClientFactory.start();
// 将当前类属性状态标记成running
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 从⽅法名称可以看出当订阅者变化后更新订阅者信息, 从NameServer中获取topic路由信息队列数量信息等
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
balanceImmediately();
}
ketmq.client.impl.factory.MQClientInstance#start
public void start()throws MQClientException {
synchronized(this){
switch(this.serviceState){
case CREATE_JUST:
// 执⾏启动
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if(null==NamesrvAddr()){
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
// 开启netty客户端⽤于向nameServer和broker通讯
this.mQClientAPIImpl.start();
// Start various schedule tasks
// 开启⼀些定时任务: 获取nameServer地址, 获取topic路由信息, 向所有broker发送⼼跳, 持久化
this.startScheduledTask();
// Start pull service
// 开启获取消息线程核⼼
this.pullMessageService.start();
// Start rebalance service
// 开启rebalance线程默认每20s对每⼀个消费者组进⾏执⾏⼀次rebalance, 从远端获取consumerId, 默认使⽤平均分配策略balanceService.start();
// Start push service
DefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK",this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object["+ClientId()+"] has been created before, and failed.",null);
springboot aopdefault:
break;
}
}
}
ketmq.sumer.PullMessageService#run
public void run(){
while(!this.isStopped()){
try{
// 从阻塞队列中获取⼀个元素当LinkedBlockingQueue<PullRequest>有元素时程序才会继续向下执⾏
PullRequest pullRequest =this.pullRequestQueue.take();
// 开始获取
this.pullMessage(pullRequest);
}catch(InterruptedException ignored){
}catch(Exception e){
<("Pull Message Service Run Method exception", e);
}
}
}
ketmq.sumer.DefaultMQPushConsumerImpl#pullMessage
public void pullMessage(final PullRequest pullRequest){
// 获取当前处理的队列⽤于校验队列消费状态
final ProcessQueue processQueue = ProcessQueue();
/
/ ⼀系列校验 ...
// 获取已缓存信息个数
long cachedMessageCount = MsgCount().get();
// 获取已缓存信息⼤⼩
long cachedMessageSizeInMiB = MsgSize().get()/(1024*1024);
// 已缓存个数超过限制
if(cachedMessageCount >PullThresholdForQueue()){

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。