使⽤SpringCloudStream对RabbitMq进⾏消息发送使⽤Spring Cloud Stream对RabbitMq进⾏消息发送
⽬录
⼀、Spring Cloud Stream简介
Spring Cloud Stream是Spring Cloud的组件之⼀。 它是为微服务构建消息驱动能⼒的框架。其架构图如图所⽰:
应⽤程序通过inputs和outputs与Stream中的Binder进⾏交互。Binder于中间件交互。
对中间件进⼀步封装,这样代码层⾯对中间件⽆感知。也可以动态切换中间件。
⼆、Stream 的使⽤
2.1 简单使⽤案例
1.引⼊依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-start-stream-rabbitmq</artifactId>
</dependency>
此处是使⽤RabbitMq作为中间件
2.在yaml配置中增添如下配置
spring:
rabbitmq:
password: guest
username: guest
addresses: 127.0.0.1
port: 5672 # 注意,这⾥是5672,不是访问界⾯⽤的15672,这个是默认值
这⾥取得都是默认值,如果没有改过,可以不配置此项。
3.定义Input/Output接⼝,⽤于注册Bean
public interface StreamClient { //消息接受发送接⼝
@Input("testMessage")
SubscribableChannel input(); //⽤于接受消息
@Output("testMessage")
MessageChannel output(); //⽤于发送消息
}
在⽼版本中,如果名称⼀致,会报错,如果出现报错,可以升级成新版本。
4.定义消息接受类
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver { //消息接受类
@StreamListener("testMessage2") //监听testMessage这个消息队列, StreamClient类中必须定义相应的Input。 public void receiver(Object message){
System.out.println("接收到消息:"+message);
}
}
5.定义消息发送Controller,测试消息发送
@RestController
public class StreamController {
@Autowired
private StreamClient streamClient;
@GetMapping("/sendMessage")
public void send(){
//ssaging.support.MessageBuilder;
streamClient.output().send(MessageBuilder.withPayload("it is test message.").build()); //构建消息并且发送 }
}
6.访问sendMessage接⼝。
访问之后,可以看到消息队列中多出了⼀条消息。
testMessage.anonymous.qVYvqBtFQwyWwlt-ejib1g
同时,该程序也收到了这条消息,控制台打印了接受消息
接收到消息:it is test message.
此时会发现,如果我们多起⼏个实例,如果队列中有消息,那么监听了该队列的实例都会执⾏监听⽅法。
但是我们只想让⼀个实例执⾏即可。此时需要对实例进⾏分组
7.对队列进⾏分组,解决多个实例都接受到消息。
增加配置:
spring:
stream:
# 增加该配置,对队列进⾏分组。保证⼀个服务只有⼀台实例接受到消息。
bindings:
# 监听的消息队列的名称。
testMessage:
# 服务的名称
group: order
此时就可以发现,当向消息队列中发送消息时,只有⼀个order服务实例会接收到消息。
2.2 发送对象消息
1.将上⾯的String改为⼀个可序列化的对象即可。
2.增加content-type配置,如果不增加该配置,则在rabbitmq上,看到的是被base64编码后的不可读的东西,不利于调试。
spring:
stream:
# 增加该配置,对队列进⾏分组。保证⼀个服务只有⼀台实例接受到消息。
bindings:
# 监听的消息队列的名称。
testMessage:
# 服务的名称
group: order
sendmessage返回值# 将发送的对象消息转化为json,⽅便调试
content-type: application/json
2.3 消息处理完成之后发送响应消息
使⽤@SendTo("响应消息的消息队列")来进⾏响应消息的发送。 响应内容为该⽅法的返回值。
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver { //消息接受类
@StreamListener("testMessage") //监听testMessage这个消息队列, StreamClient类中必须定义相应的Input。
@SendTo("responseMessage") //该注解会在消息处理完成后,向responseMessage这个队列发送消息。消息内容就是该⽅法的返回值。 public String receiver(Object message){
System.out.println("接收到消息:"+message);
return "处理消息完成"; //当消息处理完成之后,会将该返回值发送到@SendTo指定的responseMesssage消息队列中。
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论