springcloudstreambinding源码与使⽤学习笔记
前⾔
刚看到Stream的功能是对接mq产品,以为就是包装⼀些mq产品接⼝,实现⾃动装配后统⼀使⽤。但看了⼀个简单的demo,是使⽤rabbitMq产品的binder,还有输⼊输出接⼝⽅法通过配置,来对应不同的mq产品。所以作者实现的功能是在⾃⼰的channel与mq产品之间做了⼀个binder,这样⽅便的改变配置就使⽤多个mq,也可以⽅便的换不同的mq。
但是这些stream的channel如何被实现的,实现类是什么,binder⼜是如何加载进来的,⼜是如何通过binding操作把两者绑在⼀起的?什么时候绑的?都值得了解⼀下。于是简单浏览了源码,并基于⼀个rabitmq的demo,补充写⼀个简单的binder进⾏了测试。
1. rabbit为例如何使⽤stream
1.1 基本的使⽤步骤
这个⽅便到⼀些相关的帖⼦
//1. pom中引⼊
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
//2. 定义各个stream⾃⼰的通道。Output的发送,Input的接收。这个接⼝类会被⼀个配置类,上⾯通过加@EnableBinding({MessageSource.class})来触发实现类。和@Enable***都差不多。
//这⾥有三个chnanel,⼀个接收,两个发送。
public interface MessageSource {
String NAME ="pao";//管道名称:"pao"
@Output(NAME)
MessageChannel pao();
@Output("liujunTopic")
MessageChannel liujunTopic();
@Input("liujunRevc")
MessageChannel liujunRevc();
}
//3. 配置⽂件application.properties中相关的内容。
//spring.cloud.stream.bindings.${channel-name}.destination
//bindings后⾯的就是上⾯的通道名字,表⽰这个通道将和哪个mq的binder绑定。destination表⽰toipc吧。
//相关的binder:rabbit,上⾯的pox中引⼊包的meta-inf中的spring.binders⽂件中有,⽐如:
//rabbit:org.springframework.cloud.stream.fig.RabbitServiceAutoConfiguration。
//前缀【rabbit】可以看到下⾯的default中有。指明的【RabbitServiceAutoConfiguration】这个⾃动配置⽂件中,会产⽣RabbitMessageChannelBinder.class这个类对象到容器中。
//liujunmq是我⾃⼰弄的⼀个binder,就是测试⽤的。
spring.cloud.stream.bindings.output.destination = ${pic}
spring.cloud.stream.bindings.pao.destination = test
spring.cloud.stream.bindings.input.destination = ${pic}
spring.cloud.stream.defaultBinder=rabbit
spring.cloud.stream.bindings.liujunTopic.binder=liujunmq
#spring.cloud.stream.bindings.liujunTopic.destination=liujunTest
spring.cloud.stream.bindings.liujunRevc.binder=liujunmq
//4. 使⽤2中的接⼝,在controller中,可以装配channel,按名字liujunTopic。后⾯就可以直接⽤它来发送了。
@Autowired
@Qualifier("liujunTopic")// Bean 名称
private MessageChannel liujunMessageChannel;
@GetMapping("/message/sendLiujun")
public Boolean sendLiujun(@RequestParam String message){
System.out.println("1. msg received through ");
liujunMessageChannel.send(MessageBuilder.withPayload(message).build());
return true;
}
1.2 测试项⽬与结果展⽰
四个红⾊的⽂件:有⾃⼰加的spring.binders⽂件,有channel接⼝,有controller类产⽣消息,也有消息到了mockmq后,消费消息的类。最下⾯的红框是⽇志结果。mockmq只是⼀个MsgHolder,可以写⼊消息,可以配置监听,把消息发给。
1.3 业务过程
⽤户使⽤channel发消息,由于channel在binding时给加了⼀个,收到消息后才发给mq的⽣产者。(客户⽤channel发
送,channel的收到,再转发给mqProducer)
使⽤channel接收消息时,由于binding会产⽣监听,作为mq的消费者,它得到mq消息后⽤这个channel发送。⽽⽤户设置了channel的监听,就收到了消息。(mqConsumer监听消息,⽤channel发送,客户的channel监听收到)
下⾯都看看channel是如何产⽣的,binder是如何进⼊容器的。binder什么时候给channel加上需要的,或者给mqConsumer加上的。
2. 源码分析
2.1 从@enableBinding开始
我们知道很多功能都是从@enable***开始的。这个注解可以加上⼏个channel的接⼝。
具体是导⼊这些类:@Import({ BindingServiceConfiguration.class, BindingBeansRegistrar.class,
BinderFactoryConfiguration.class,SpelExpressionConverterConfiguration.class })
2.2 导⼊的BindingServiceConfiguration.class
看名字是【绑定服务】的配置,这是个@Configuration的类。主要看下⾯三个类:
new BindingService(bindingServiceProperties, binderFactory);
new OutputBindingLifecycle();–>bindable.bindOutputs(bindingService);
new InputBindingLifecycle();–>bindable.bindInputs(bindingService);
后⾯两个都实现了smartlifecycle接⼝,在容器启动时,也会执⾏start(),这时会通过bindingService来进⾏所有的绑定。这就是绑定时机,另外在stop()时,还会⽤bindingService进⾏unbinding操作。
初步看看bindingService的主要操作:
bindProducer:getBinder得到binder,再⽤它binder.bindProducer(bindingTarget, output,
producerProperties);—参数主要是stream的channel与属性值。
bindConsumer:getBinder得到binder,再⽤它binder.bindConsumer(target,
consumerProperties);—参数主要是stream的channel与属性值。
绑定服务有了,要绑定的两个对象还没有看到。⼀个channel将与⼀个mq产品的⽣产者或者消费者进⾏绑定。
2.3 导⼊的BindingBeansRegistrar.class
这个是⽤于注册bean定义的类,⽤于处理@EnableBinding中的值,也就是channel接⼝的类,应该就是被绑定的对象了。
//⼀般对于接⼝,肯定是动态代理产⽣⼀个类。这个类⼀般通过⼀个factoryBean的getObject()⽅法得到。⽐如duboo中,对接⼝的实现就是把请求代理成⼀个远程的消息发送。
//
if(type.isInterface()){
RootBeanDefinition rootBeanDefinition =new RootBeanDefinition(BindableProxyFactory.class);
rootBeanDefinition.addQualifier(new AutowireCandidateQualifier(Bindings.class, parent));
}
重点看BindableProxyFactory这个类,是个⼯⼚bean。
*{@link FactoryBean}for instantiating the interfaces specified via
*{@link EnableBinding}
//所有EnableBinding指明的接⼝的实例化类,实现了⼯⼚bean
//⾃⼰⼜是⼀个Interceptor,产⽣代理类
public class BindableProxyFactory implements MethodInterceptor, FactoryBean<Object>, Bindable, InitializingBean
------------------------------------------------------------------
//getObject果然返回代理对象,MethodInterceptor还是this。
@Override
public synchronized Object getObject()throws Exception {
if(this.proxy == null){
ProxyFactory factory =new pe,this);
this.proxy = Proxy();
}
return this.proxy;
}
------------------------------------------------------------------
//看看channel接⼝的代理对象的⽅法,执⾏是如何的?是直接从inputHolders拿到,按名字缓存起来。
@Override
public synchronized Object invoke(MethodInvocation invocation)throws Throwable {
Method method = Method();
...//⽅法上的Input注解的名字,作为channel的名字。
Input input = AnnotationUtils.findAnnotation(method, Input.class);
if(input != null){
String name = BindingTargetName(input, method);
boundTarget =(name).getBoundTarget();
targetCache.put(method, boundTarget);
return boundTarget;
}
else{
...//output略
}
return null;
}
------------------------------------------------------------------
//从invoker⽅法中,看到代理类是根据Input.class注解的名字,从inputHolders这样⼀个map中拿到的对象。说明这个对象应该已经存在了。
//在产⽣代理类产⽣之前,即调⽤getObject()之前,早就先加载了相应的boundTarget放map中了。
//果然有afterPropertiesSet⽅法,它对input与output分别进⾏了处理,把产⽣的channel对象放到了inputHolders中。上⾯的invoke才能拿到。这句是input注解的⽅法的处理。
BindableProxyFactory.this.inputHolders.put(name,
new BoundTargetHolder(getBindingTargetFactory(returnType).createInput(name),true));
//按类型得到⼯⼚,再根据名字产⽣绑定对象。
//关于⼯⼚,第⼀个导⼊类中有这个bean,就是BindingTargetFactory,可以⽣成BindingTarget。
@Bean
public SubscribableChannelBindingTargetFactory channelFactory(
CompositeMessageChannelConfigurer compositeMessageChannelConfigurer){
return new SubscribableChannelBindingTargetFactory(compositeMessageChannelConfigurer);
}
//⼯⼚是这么产⽣绑定对象的。
SubscribableChannelBindingTargetFactory-->createInput/createOutput-->return SubscribableChannel subscribableChannel =new DirectChannel();
/
/这个steam的channel对象就是下⾯的样⼦,是可以被订阅的。最开始的例⼦说明发送/接收都可以被订阅(监听)。input与output都是这个,因为都需要⼀端发,另⼀端订阅接收。
public class DirectChannel extends AbstractSubscribableChannel
上⾯有了绑定服务,也有了绑定对象了,还缺少绑定者binder。
2.4 导⼊的BinderFactoryConfiguration.class
看名字,这个是binder的⼯⼚,有了⼯⼚,binder就肯定有了。重点看两个bean.
//这个是binder的⼯⼚
@Bean
@ConditionalOnMissingBean(BinderFactory.class)
public DefaultBinderFactory binderFactory(){
DefaultBinderFactory binderFactory =new DefaultBinderFactory(getBinderConfigurations());
binderFactory.DefaultBinder());
binderFactory.setListeners(binderFactoryListeners);
return binderFactory;
}
//------------------------------------------------------------------
//这个是binder的类型注册,从META-INF/spring.binders⽂件中来,本例中有两个,⼀个⾃⼰写的,⼀个是rabbit的。
@Bean
@ConditionalOnMissingBean(BinderTypeRegistry.class)
public BinderTypeRegistry binderTypeRegistry(ConfigurableApplicationContext configurableApplicationContext){
Map<String, BinderType> binderTypes =new HashMap<>();
...
try{
Enumeration<URL> resources = Resources("META-INF/spring.binders");
...
while(resources.hasMoreElements()){
URL url = Element();
UrlResource resource =new UrlResource(url);
for(BinderType binderType :parseBinderConfigurations(classLoader, resource)){
binderTypes.DefaultName(), binderType);
}
}
}
.
..
return new DefaultBinderTypeRegistry(binderTypes);
}
看⼀下这个binder⼯⼚:DefaultBinderFactory,以及其中最主要的getBinder⽅法。
public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware
//得到binder的⽅法。其中根据⽂件中的binder配置类,还产⽣了⼀个⼦容器。再从中取出Binder.class类型的bean。说明每个mq都是⼀个⼦容器当中?//⼦容器当然可以使⽤⽗容器中的对象,⽗容器也可以通过这个⼯⼚类,得到⼦容器中的binder。
private<T> Binder<T,?,?>getBinderInstance(String configurationName){
...
Properties binderProperties = Properties();
ArrayList<String> args =new ArrayList<>();
for(Map.Entry<Object, Object> property : Set()){
args.add(String.format("--%s=%s", Key(), Value()));
}
...
args.add("--spring.main.applicationContextClass="+ Name());
List<Class<?>> configurationClasses =new ArrayList<Class<?>>(
Arrays.BinderType().getConfigurationClasses()));
SpringApplicationBuilder springApplicationBuilder =new SpringApplicationBuilder()
.Array(new Class<?>[]{})).bannerMode(Mode.OFF).web(false);
if(useApplicationContextAsParent){
springApplicationBuilder.t);
}
...
ConfigurableApplicationContext binderProducingContext = springApplicationBuilder
.Array(new String[args.size()]));
@SuppressWarnings("unchecked")
Binder<T,?,?> binder = Bean(Binder.class);springcloud难学吗
...
this.binderInstanceCache.put(configurationName,new BinderInstanceHolder(binder, binderProducingContext));
}
return(Binder<T,?,?>)(configurationName).getBinderInstance();
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。