springcloudstream3.1.2源码搭配rocketmq学习(三)(⼆)中介绍了函数的注册, 这篇介绍⼀下函数的初始化
这⽂章涉及到了⼤量响应式编程的⽅式, reactor 需要补⼀下
前⾔
这个 functionInitializer 其实是 channel 和 function bean的绑定
响应式的doOn
同步钩⼦⽅法,在subscriber触发⼀系列事件的时候触发
先来熟悉⼀下doOn系列的⽅法. 这个⽅法在subscriber的时候如果没触发对应的钩⼦, 是不会执⾏的.
热⾝
@Bean
public Function<Flux<Message<String>>, Mono<Void>> demo() {
return flux -> flux.map(message -> {
System.out.println("接收到了: " + message);
return message;
}).then();
}
@Component
static class DemoRunner implements CommandLineRunner {
@Autowired
Wrapper wrapper;
@Override
public void args) throws Exception {
InputChannel inputChannel = new InputChannel();
Flux<Message<String>> input = Flux.defer(() -> {
Sinks.Many<Message<String>> sink = Sinks.many().unicast().onBackpressureError();
System.out.println("初始化了inputChannel");
MessageHandler messageHandler = message -> {
System.out.println("处理信息");
};
inputChannel.subscribe(messageHandler);
return sink.asFlux().doOnCancel(() -> {
// ...
});
});
Mono<Void> result = wrapper.apply(input);
// 上⾯这⼀段操作等同于操作 flux 合并成了⼀个⼤的响应式
//          Mono<Void> result = Flux.defer(() -> {
//                Sinks.Many<Message<String>> sink = Sinks.many().unicast().onBackpressureError();
//                System.out.println("初始化了inputChannel");
//                MessageHandler messageHandler = message -> {
//                    System.out.println("处理信息");
//                    EmitNext((Message<String>) message);
//                };
//                inputChannel.subscribe(messageHandler);
/
/                return sink.asFlux().doOnCancel(() -> {
//                    // ...
//                });
//            }).map(message -> {
//                System.out.println("接收到了: " + message);
//                return message;
//            }).then()
//            .doOnSubscribe(message -> {
//                System.out.println("在Wrapper.apply我加⼊了");
//            });
result.subscribe();
inputChannel.handle(MessageBuilder.withPayload("aaaa").build());
}
}
static class InputChannel {
final List<MessageHandler> messageHandlers = new ArrayList<>();
public void subscribe(MessageHandler messageHandler) {
messageHandlers.add(messageHandler);
}
public void handle(Message<String> message) {
<(0).handleMessage(message);
}
}
@Component
static class Wrapper {
@Autowired
Function<Flux<Message<String>>, Mono<Void>> demo;
public Mono<Void> apply(Flux<Message<String>> input) {
System.out.println("---------");
return demo.apply(input).doOnSubscribe(message -> {
System.out.println("在Wrapper.apply我加⼊了");
});
}
}
这⼀段简单的响应式, 是functionInitializer核⼼的部分.
先组装flux然后调⽤我们注册的Bean把初始化的东西传⼊并⽣成⼀个总的响应式, 类似于合体⼀样. 上⾯注释部分的result就是最终⽣成的响应式.
functionInitializer就是把注册的Function Bean的调⽤某些注册⽅法加⼊到channel中和增加⼀些响应式的钩⼦达到统⼀处理某些信息的注册.下⾯我们⼀起来看看源码
functionInitializer
初始化了⼀个这样的Bean--new FunctionConfiguration.FunctionToDestinationBinder
public void afterPropertiesSet() throws Exception {
Map<String, BindableProxyFactory> beansOfType = BeansOfType(BindableProxyFactory.class);
}
⾸先把BindableProxyFactory.class的Bean都取出来了.
看到BindableProxyFactory是不是很熟悉, 点进去发现, 他是BindableFunctionProxyFactory的⽗类.
BindableFunctionProxyFactory是不是(⼆)中⽤definition注册的Bean.
接着我们看到下⾯的这个bindFunctionToDestinations函数
只有这个函数不是提供者的时候才能绑定函数到⽬的地
if (function != null && !function.isSupplier()) {
springcloud难学吗this.bindFunctionToDestinations(bindableProxyFactory, functionDefinition);
}
从下述代码发现inputs/outputs 就是(⼆)中注册的Input/Output
Set<String> inputBindingNames = Inputs();
Set<String> outputBindingNames = Outputs();
public Set<String> getInputs() {
return this.inputHolders.keySet();
}
我们看到其中有⼀段关键的代码
SubscribableChannel, 是不是在(⼆)中注册的DirectWithAttributesChannel的Bean.
把对应inputBindingName的取了出来并做了对应的封装.
组合成⼀个Publisher
SubscribableChannel inputChannel = (SubscribableChannel)Bean(inputBindingName, SubscribableChannel.class); ssageChannelToFlux(inputChannel);
进⼊messageChannelToFlux⽅法我们发现会调⽤adaptSubscribableChannelToPublisher
private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel
inputChannel) {
return Flux.defer(() -> {
Many<Message<T>> sink = Sinks.many().unicast().onBackpressureError();
MessageHandler messageHandler = (message) -> {
while(true) {
EmitNext(message)) {
case FAIL_NON_SERIALIZED:
case FAIL_OVERFLOW:
LockSupport.parkNanos(1000L);
break;
case FAIL_ZERO_SUBSCRIBER:
throw new IllegalStateException("The [" + sink + "] doesn't have subscribers to accept messages");
case FAIL_TERMINATED:
case FAIL_CANCELLED:
throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink for message channel: " + inputChannel);
default:
return;
}
}
};
inputChannel.subscribe(messageHandler);
return sink.asFlux().doOnCancel(() -> {
inputChannel.unsubscribe(messageHandler);
});
});
}
会发现有⼀⾏
inputChannel.subscribe(messageHandler);
把处理message的处理器注册进了inputChannel中
因为这个inputChannel就是DirectWithAttributesChannel, 所以我们直接关注到DirectWithAttributesChannel的subscibe⽅法.
MessageDispatcher dispatcher = RequiredDispatcher();
boolean added = dispatcher.addHandler(handler);
把这个handler加进了dispatcher中, 那这个dispatcher是⼀个什么呢?
我们查阅继承关系发现DirectChannel这个类初始化的时候初始化了⼀个dispathcher
public DirectChannel(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
this.dispatcher = new UnicastingDispatcher();
...
}
这样 messageHandler 就注册进了DirectWithAttributesChannel的dispatcher中.
我们回到bindFunctionToDestinations中, 然后我们关注到这⼀⾏代码
Object resultPublishers = ((Function)functionToInvoke).apply(inputPublishers.length == 1 ? inputPublishers[0] : Tuples.fromArray(inputPublishers)); functionToInvoke 就是FunctionWrapper, 所以我们看看FunctionInvocationWrapper的apply⽅法
点进去看看
public Object apply(Object input) {
// ...
Object result = this.doApply(input);
// ...
return result;
}
看到doApply中, 因为我们注册的Bean是Function类型的, 所以我们直接看到 invokeFunction
发现有关键的⼀⾏ invokeFunctionAndEnrichResultIfNecessary
result = this.invokeFunctionAndEnrichResultIfNecessary(convertedInput);
private Object invokeFunctionAndEnrichResultIfNecessary(Object value) {
//...
// target就是注册的Function Bean的函数.
/
/ 在此处我们对他进⾏调⽤并把输⼊传⼊.
// intputValue是对inputChannel内的信息进⾏了处理并封装成了Message
// 想知道怎么处理的朋友可以看看源码, 就在这个函数⾥
Object result = ((Function)this.target).apply(inputValue);
//...
}
那这个target是什么呢, 这个是时候我们可以打个断点看看, 发现他就是我们注册的Function.
然后他调⽤了apply, 证明调⽤了这个⽅法, 并且传⼊了inputValue
然后我们发现functionToInvoke.apply这个函数将上述封装的inputChannel响应式进⾏传⼊, 并调⽤对应的function Bean, 得到完整的响应式函数. 合并了两段响应式函数.
这⾥的resultPublishers实际上就是我们配置的Function调⽤后的返回的值.
接着对resultPublishers进⾏判断, 是否有输出需要处理, 有的话做个doOnNext的钩⼦, 并封装对应的发送和错误处理逻辑.
没有则进⾏subscribe, 让之前的inputChannel的调⽤进⾏消费注册.
((Iterable)resultPublishers).forEach((publisher) -> {
Flux flux = Flux.from((Publisher)publisher);
if (!CollectionUtils.isEmpty(outputBindingNames)) {
//  ...发送逻辑
}
// 如果不是消费者则消费.
// 这会subscribe上⾯配置的Flux, 进⾏对应的初始化.
// 但是doOn的⽅法是钩⼦, 这边只是简单的subscribe所以不会被触发
if (!function.isConsumer()) {
flux.subscribe();
}
});
⾄此, 我们才完整的注册了⼀个Function Bean.
总结
1. 到(⼆)中注册的Bean
2. 到(⼆)中注册的对应的Input/Output的Bean
3. 将channel和这个Function bean绑定到⼀起, 并加⼊统⼀的处理⽅法
ps 响应式其实不是直接调⽤, 是配置了⼀堆东西, 等同于配置⽂件. 等到⼀个命令来的时候例如类被new的时候, 再进⾏统⼀的执⾏.
好, 这能这篇⽂章⽐较⼲, 也可能⽐较乱, 如果有不好的地⽅, 欢迎讨论改进, 谢谢!
channel的注册 --- (⼆)
function Bean 和 channel 的绑定 (三)
那是不是还缺⼀个channel 和外部消息中间件的绑定呢, 我们下⼀篇⽂章继续!
Wish.
Do.

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。