SpringbootRabbitMq源码解析之RabbitListener注解
RabbitListener是Springboot RabbitMq中经常⽤到的⼀个注解,将被RabbitListener注解的类和⽅法封装成MessageListener注⼊MessageListenerContainer。
(1)当RabbitListener注解在⽅法上时,对应的⽅式就是Rabbit消息的。
(2)当RabbitListener注解在类上时,和RabbitHandle注解配合使⽤,可以实现不同类型的消息的分发,类中被RabbitHandle注解的⽅法就是Rabbit消息的。
⼀、EnableRabbit和RabbitBootstrapConfiguration
在已经提到,在springboot项⽬中已经通过⾃动配置类RabbitAutoConfiguration将EnableRabbit引⼊,⽽EnableRabbit⼜通过import 注解引⼊了配置类RabbitBootstrapConfiguration。
@Configuration
public class RabbitBootstrapConfiguration {
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
return new RabbitListenerAnnotationBeanPostProcessor();
}
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
}
可以看到,其向IOC容器注⼊了bean后置处理器RabbitListenerAnnotationBeanPostProcessor,同时也注⼊了⼀个默认的RabbitListenerEndpointRegistry。
⼆、RabbitListenerAnnotationBeanPostProcessor
RabbitListenerAnnotationBeanPostProcessor类实现了BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, SmartInitializingSingleton接⼝,Ordered表⽰处理顺序,BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware主要⽤于获取对应的BeanFactory,BeanClassLoader, Environment属性,我们主要关注从SmartInitializingSingleton和BeanPostProcessor继承的⽅法。
1. RabbitListenerAnnotationBeanProcessor#afterSingletonsInstantiated
public void afterSingletonsInstantiated() {
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, RabbitListenerConfigurer> instances =
((ListableBeanFactory) this.beanFactory).getBeansOfType(RabbitListenerConfigurer.class);
for (RabbitListenerConfigurer configurer : instances.values()) {
}
}
if (EndpointRegistry() == null) {
if (dpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
RabbitListenerEndpointRegistry.class);
}
}
if (ainerFactoryBeanName != null) {
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = MessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
}
// Actually register all listeners
// clear the cache - prototype beans will be re-cached.
}
初始化⼯作,主要是基于⾃定义配置RabbitListenerConfigurer进⾏RabbitListenerAnnotationBeanPostProcessor(尤其是registrar 元素)的初始化。
2. RabbitListenerAnnotationBeanPostProcessor#postProcessBeforeInitializationspring ioc注解
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
没有做任何处理。
3. RabbitListenerAnnotationBeanPostProcessor#postProcessAfterInitilization
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = TargetClass(bean);
final TypeMetadata metadata = peCacheputeIfAbsent(targetClass, this::buildMetadata);
for (ListenerMethod lm : metadata.listenerMethods) {
for (RabbitListener rabbitListener : lm.annotations) {
processAmqpListener(rabbitListener, lm.method, bean, beanName);
}
}
if (metadata.handlerMethods.length > 0) {
processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
}
return bean;
}
RabbitListenerAnnotationBeanPostProcessor#postProcessAfterInitilization是整个类中最核⼼的⽅法,对RabbitListener注解查和解析。具体分析见下⽂。
三、RabbitListenerAnnotationBeanPostProcessor对RabbitListener注解的解析
RabbitListenerAnnotationBeanPostProcessor#postProcessAfterInitilization⽅法在上⽂已经提到,此处针对具体⽅法逻辑进⾏解析。
1 RabbitListenerAnnotationBeanPostProcessor#buildMetadata
private TypeMetadata buildMetadata(Class<?> targetClass) {
Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<ListenerMethod> methods = new ArrayList<>();
final List<Method> multiMethods = new ArrayList<>();
ReflectionUtils.doWithMethods(targetClass, method -> {
Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
if (listenerAnnotations.size() > 0) {
methods.add(new ListenerMethod(method,
}
if (hasClassLevelListeners) {
RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
if (rabbitHandler != null) {
multiMethods.add(method);
}
}
}, ReflectionUtils.USER_DECLARED_METHODS);
if (methods.isEmpty() && multiMethods.isEmpty()) {
return TypeMetadata.EMPTY;
}
return new TypeMetadata(
}
前⾯我们已经提到,RabbitListener有两种⽤法:
当RabbitListener注解在⽅法上时,对应的⽅式就是Rabbit消息的。
当RabbitListener注解在类上时,和RabbitHandle注解配合使⽤,可以实现不同类型的消息的分发,类中被RabbitHandle注解的⽅法就是Rabbit消息的。
RabbitListenerAnnotationBeanPostProcessor就是针对每⼀个bean类进⾏解析,针对类上的RabbitListener注解、⽅法上的RabbitHandle注解和⽅法上的RabbitListener注解解析后封装到TypeMetadata类中。
通过RabbitListenerAnotationBeanPostProcessor#buildMetadata查并封装成TypeMetadata分别交给processAmqpListener和processMultiMethodListeners进⾏解析。
2 RabbitListenerAnnotationBeanPostProcessor的processAmqpListener和
processMultiMethodListeners⽅法
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
endpoint.setBeanFactory(this.beanFactory);
endpoint.setReturnExceptions(urnExceptions()));
String errorHandlerBeanName = Handler(), "errorHandler");
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(Bean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
}
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
Object bean, String beanName) {
List<Method> checkedMethods = new ArrayList<Method>();
for (Method method : multiMethods) {
checkedMethods.add(checkProxy(method, bean));
}
for (RabbitListener classLevelListener : classLevelListeners) {
MultiMethodRabbitListenerEndpoint endpoint = new MultiMethodRabbitListenerEndpoint(checkedMethods, bean);
endpoint.setBeanFactory(this.beanFactory);
processListener(endpoint, classLevelListener, bean, Class(), beanName);
}
}
RabbitListenerAnnotationBeanPostProcessor#processAmqpListener针对被RabbitListener注解的⽅法进⾏解析,RabbitListenerAnnotationBeanPostProcessot#processMultiMethodListeners针对RabbitListener注解的类中被RabbitHandle注解的⽅法进⾏解析。
这两个⽅法解析⽬标虽然不同,但逻辑并没有什么实质差别。
⾸先通过checkProxy⽅法对⽅法进⾏校验,然后新建MultiMethodRabbitListenerEndpoint对象,针对
两种⽅式的差异进⾏部分属性的初始化后交给RabbitListenerAnnotationBeanPostProcessor进⾏后续处理。
checkProxy针对jdk动态代理的情况进⾏检查,检查代理的⽬标接⼝是否含有对应⽅法。
3 RabbitListenerAnnotationBeanPostProcessor#processListener
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object adminTarget, String beanName) {
endpoint.setBean(bean);
endpoint.ssageHandlerMethodFactory);
endpoint.setId(getEndpointId(rabbitListener));
endpoint.setQueueNames(resolveQueues(rabbitListener));
endpoint.setConcurrency(urrency(), "concurr
ency"));
String group = up();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
String autoStartup = rabbitListener.autoStartup();
if (StringUtils.hasText(autoStartup)) {
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
}
endpoint.lusive());
String priority = resolve(rabbitListener.priority());
if (StringUtils.hasText(priority)) {
try {
endpoint.setPriority(Integer.valueOf(priority));
}
catch (NumberFormatException ex) {
throw new BeanInitializationException("Invalid priority value for " +
rabbitListener + " (must be an integer)", ex);
}
}
String rabbitAdmin = resolve(rabbitListener.admin());
if (StringUtils.hasText(rabbitAdmin)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
try {
endpoint.setAdmin(Bean(rabbitAdmin, RabbitAdmin.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "], no " + SimpleName() + " with id '" +
rabbitAdmin + "' was found in the application context", ex);
}
}
RabbitListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = ainerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = Bean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "] for bean " + beanName + ", no " + SimpleName() + " with id '" +    containerFactoryBeanName + "' was found in the application context", ex);
}
}
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。