SpringCloudStream学习及实践⽂章⽬录
基本介绍
为微服务应⽤构建消息驱动能⼒的框架。
通过Spring Integration来连接消息代理中间件以实现消息事件驱动。
整合了Spring Boot和Spring Integration
⽀持的中间件:
RabbitMQ
Kafka
Springboot中使⽤
引⼊依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
其中,spring-cloud-starter-stream-rabbit等价于spring-cloud-stream-binder-rabbit
核⼼注解
@EnableBinding
⽤于指定⼀个或多个定义了@Input和@Output注解的接⼝,以此实现对消息通道(Channel)的绑定。
@Input
public interface Sink{
String INPUT = "input";
@Input("INPUT")
SubscribableChannel input()
}
绑定了⼀个输⼊消息通道
@Output
public interface Source{
String OUTPUT = "output";
@Output("OUTPUT")
MessageChannel output();
}
绑定了⼀个输出通道
上述两个接⼝都是Spring Cloud默认实现的绑定消息通道的接⼝,还有⼀个Processor,其结合Sink和Source,包括输⼊绑定和输出绑定。
EnableBinding指定多个接⼝来绑定消息通道:
@EnableBinding(value = {Sink.class, Source.class})
@StreamListener
定义在⽅法上,能将被修饰的⽅法注册为消息中间件上数据流的,属性值对应了监听的通道名。
⚠ :输⼊通道、输出通道的概念都是针对应⽤程序来说的。输⼊通道即,将消息输⼊到应⽤程序的通道;输出通道,即应⽤程序输出消息到Binder的通道。
核⼼概念
Binder(绑定器)
应⽤程序与消息中间件之间通过Binder关联,将两者进⾏隔离,使得不同消息中间件的实现细节对于应⽤来说是透明的。
这样做的好处是:增加灵活性,切换消息中间件或升级中间件时,只需切换Binder即可。
不同消息中间件通过Binder向应⽤程序暴露统⼀的Channel通道
Spring Cloud Stream中对RabbitMQ和Kafka提供了默认的Binder实现,对于其它消息中间件,可以使⽤扩展API实现Binder。
Binder配置的对象是消息中间件中的内容,如queue…
Bindings配置的对象即为@Input、@Output注解的内容
发布-订阅模式
消息投递到消息中间件时,会通过Topic⼴播给所有订阅它的消费者,消费者收到消息出发⾃⾝业务逻辑。
RabbitMQ中Topic对应Exchange,Kafka中就是Topic。
若要对⼀类消息增加⼀种处理⽅式,只需增加⼀个应⽤程序并将输⼊通道绑定到既有的Topic中即可。
@Input("[BinderName]")
SubscribableChannel input();
消费组
保障每个消息只被组内⼀个实例消费
现实的微服务架构中,为实现⾼可⽤和负载均衡,⼀个微服务会部署多个实例。
消息⽣产者发送⼀条消息给⼀个微服务时,只想被消费⼀次;应这种使⽤场景,消费组概念应运⽽⽣。
spring.cloud.stream.bindings.[input].group配置可以为应⽤指定⼀个组名,此应⽤的多个实例在接受到消息时,只有⼀个成员会真正收到消息并处理。
没有为应⽤指定消费组时,Spring Cloud Stream会默认为其分配⼀个独⽴的匿名消费组;同⼀主题下
所有应⽤都未指定组时,消息发布时,所有应⽤都会消费它(它们属于独⽴消费组)。
消息分区
为⼀些具有相同特征的消息设置每次都被同⼀个消费实例进⾏消费。
Spring Cloud Stream为分区提供了通⽤的抽象实现,为不具备分区功能的消息中间件也增加了分区功能。
server:
port: 8089
spring:
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment: #配置rabbimq连接环境
spring:
rabbitmq:
host: x
username: xxx
password: xxx
virtual-host: /
bindings:
msgSender: #⽣产者绑定,这个是消息通道的名称
destination: exchange-msgSender #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的e xchange-msg交换器。
content-type: application/json
producer:
partition-count: 2 #指定参与消息分区的消费端节点数量为2个
partition-key-expression: headers['partitionKey'] #payload.id#这个是分区表达式, 例如当表达式的值为1, 那么在订阅者的instance-index中为1的接收⽅, 将会执⾏该消息.
msgSender2: #⽣产者绑定,这个是消息通道的名称
destination: exchange-msgSender #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的e xchange-msgSender交换器。
content-type: application/json
producer:
partition-count: 2 #指定参与消息分区的消费端节点数量为2个
partition-key-expression: headers['partitionKey'] #payload.id#这个是分区表达式, 例如当表达式
的值为1, 那么在订阅者的instance-index中为1的接收⽅, 将会执⾏该消息.
使⽤详解
开启绑定功能
@Target({ ElementType.TYPE, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Configuration
@Import({ BindingBeansRegistrar.class, BinderFactoryConfiguration.class})
@EnableIntegration
public @interface EnableBinding {
/
**
* A list of interfaces having methods annotated with {@link Input} and/or
* {@link Output} to indicate binding targets.
*/
Class<?>[] value() default {};
}
上述为@EnableBinding注解源码
包含@Configuration注解,所以被它注解的类也会成为Spring的基本配置类;
包含@Import注解,加载了Spring Cloud Stream运⾏需要的⼏个基础类:
+ ChannelBindingServicrConfiguration:加载消息通道绑定必要的⼀些实例,如:处理消息通道绑定的ChannelBindingService 实例、消息类型转换器MessageConverterConfigurer、消息通道⼯⼚BindableChannelFactory等;
BindingBeansRegistrar:是ImportBeanDefinitionrigistrar的实现,主要在Spring加载Bean时被调⽤,⽤来实现加载更多的bean;此处其在加载⽤于消息驱动的基础类后,会根据@EnableBinding的value属性加载更多的类;
BinderFactoryConfiguration:Binder⼯⼚的配置,主要⽤来加载与消息中间件相关的配置信息。从应⽤⼯程的META-
INFO/spring.binders中加载针对具体消息中间件相关的配置⽂件等;
绑定消息通道
在接⼝中⽤@Input和@Output定义消息通道,其value属性可以定义消息通道名称;
@EnableBinding(value = {xxxx.class}),上述接⼝可以在此处指定,在应⽤启动时,实现对消息通道的绑定。
输出通道返回MessageChannel接⼝对象,其中定义了消息通道发送消息的⽅法;
输⼊通道需要返回SubscribableChannel接⼝对象,其扩展了MessageChannel,新增了维护消息通道订阅者的⽅法。
public interface MyChannel {
String INPUT = "input";
String OUTPUT = "output";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
@Slf4j
@EnableBinding(value = {MyChannel.class})
public class MyReceiver {
@StreamListener(MyChannel.INPUT)
private void reveive(Object messageObject){
log.info("Reveived: {}", messageObject);
}
}
注⼊绑定接⼝
springcloud难学吗对上⾯代码中声明对MyChannel接⼝,我们可以如下调⽤
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {DemoApplication.class})
@WebAppConfiguration
public class StreamTest {
@Autowired
private MyChannel myChannel;
@Test
public void contextLoads(){
myChannel.output().send(MessageBuilder.withPayload("From MyChannel").build());
}
}
注⼊消息通道
除了注⼊绑定接⼝,我们还可以指定消息通道进⾏注⼊:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论