SpringCloudAlibaba-SpringCloudStream整合RocketMQ
Spring Cloud Stream
简介
在微服务的开发过程中,可能会经常⽤到消息中间件,通过消息中间件在服务与服务之间传递消息,不管你使⽤的是哪款消息中间件,⽐如RabbitMQ、Kafka和RocketMQ,那么消息中间件和服务之间都有⼀点耦合性,这个耦合性就是指如果我原来使⽤的RabbitMQ,现在要替换为RocketMQ,那么我们的微服务都需要修改,变动会⽐较⼤,因为这两款消息中间件有⼀些区别,如果我们使⽤Spring Cloud Stream来整合我们的消息中间件,那么这样就可以降低微服务和消息中间件的耦合性,做到轻松在不同消息中间件间切换,当然Spring Cloud Stream 官⽅只⽀持rabbitmq 和 kafka,spring cloud alibaba新写了⼀个starter可以⽀持RocketMQ;
按照官⽅的定义,Spring Cloud Stream 是⼀个构建消息驱动微服务的框架;
Spring Cloud Stream解决了开发⼈员⽆感知的使⽤消息中间件的问题,因为Spring Cloud Stream对消息中间件的进⼀步封装,可以做到代码层⾯对消息中间件的⽆感知,甚⾄于动态的切换中间件(rabbitmq切换为rocketmq或者kafka),使得微服务开发的⾼度解耦,服务可以关注更多⾃⼰的业务流程;
核⼼概念
Spring Cloud Stream 内部有⼏个概念:Binder 、Binding、input、output;
1、Binder: 跟外部消息中间件集成的组件,⽤来创建Binding,各消息中间件都有⾃⼰的 Binder 实现;
⽐如 Kafka 的实现 KafkaMessageChannelBinder,RabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现RocketMQMessageChannelBinder;
2、Binding: 包括 Input Binding 和 Output Binding;
Binding 在消息中间件与应⽤程序提供的 Provider 和 Consumer 之间提供了⼀个桥梁,实现了开发者只需使⽤应⽤程序的 Provider 或Consumer ⽣产或消费数据即可,屏蔽了开发者与底层消息中间件的接触;
3、input
应⽤程序通过input(相当于消费者consumer)与Spring Cloud Stream中Binder交互,⽽Binder负责与消息中间件交互,因此,我们只需关注如何与Binder交互即可,⽽⽆需关注与具体消息中间件的交互。
4、Output
output(相当于⽣产者producer)与Spring Cloud Stream中Binder交互;
组成说明
Binder Binder是应⽤与消息中间件之间的封装,⽬前实现了Kafka和RabbitMQ的Binder,通过Binder可以很⽅便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置⽂件来实现;
@Input该注解标识输⼊通道,通过该输⼊通道接收消息进⼊应⽤程序@Output该注解标识输出通道,发布的消息将通过该通道离开应⽤程序@StreamListener监听队列,⽤于消费者的队列的消息接收
@EnableBinding将信道channel和exchange、topic绑定在⼀起
Spring Cloud Stream 应⽤
消息⽣产者
1、创建SpringBoot应⽤31-rocket-spring-cloud-stream;
2、添加依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
配置⽂件
# 应⽤名称
spring.application.name=stream
>> RocketMQ 通⽤配置
# 客户端接⼊点,必填 -rocketmq 连接地址
spring.ketmq.binder.name-server=localhost:9876
# ⽇志级别
logging.level.alibaba.cloud.ketmq=INFO
>> Consumer Config 消费者
# input 的配置:
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.t-type=text/plain
spring.cloud.stream.up=test-group
>> Produce Config ⽣产者
# output 的配置如下: bingdings具体⽣产,消费的桥梁
spring.cloud.stream.bindings.output.destination=test-topic //⽬的地保持⼀致
spring.cloud.stream.t-type=text/plain
spring.cloud.stream.up=test-group
兼容性问题:
注意版本需要使⽤springboot2.2.5
<spring-boot.version>2.2.5.RELEASE</spring-boot.version>
<spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>
消息发送:
@EnableBinding(Source.class)
@Service
public class SenderService {
@Autowired
private Source source;
public void send(String msg) throws Exception {
boolean flag = source.output().send(MessageBuilder.*withPayload*(msg).build());
System.*out*.println("消息发送:" + flag);
}
}
消息接收:
@EnableBinding(Sink.class)
public class ReceiveService {
@StreamListener("input")
public void receiveInput1(String receiveMsg) {
System.*out*.println("input 接收到的消息: " + receiveMsg);
}
}
可以通过调⽤SenderService中的⽅法进⾏发送信息,也可以通过在启动类中的Main⽅法中进⾏调⽤SenderService的⽅法进⾏发送信息:@SuppressWarnings("all")
@EnableBinding(value = {Source.class, Sink.class}) //使得Source⽣效
@SpringBootApplication
public class StreamApplication implements CommandLineRunner {
@Autowired
private SenderService senderService;
@Autowired
private ReceiveService receiveService;
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class, args);
}
//在main中调⽤发送信息⽅法
@Override
public void args) throws Exception {
senderService.send("hello rocketmq");
}
}
Spring Cloud Stream⾃定义信道
在前⾯的案例中,我们已经实现了⼀个基础的 Spring Cloud Stream 消息传递处理操作,但在操作之中使⽤的是系统提供的 Source (output)、Sink(input),接下来我们来看⼀下⾃定义信道名称;
public interface MySource {
String OUTPUT1 = "output1";
@Output(MySource.OUTPUT1)
MessageChannel output1();
String OUTPUT2 = "output2";
@Output(MySource.OUTPUT2)
MessageChannel output2();
}
public interface MySink {
String INPUT1 = "input1";
@Input(MySink.INPUT1)
SubscribableChannel input1();
String INPUT2 = "input1";
@Input(MySink.INPUT2)
SubscribableChannel input2();
}
server.port=8090
# 应⽤名称
spring.application.name=stream
>> RocketMQ 通⽤配置
# 客户端接⼊点,必填 -rocketmq 连接地址
spring.ketmq.binder.name-server=localhost:9876
# ⽇志级别
logging.level.alibaba.cloud.ketmq=INFO
>> Consumer Config 消费者
# input 的配置:
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.t-type=text/plain
spring.cloud.stream.up=test-group
>> Produce Config ⽣产者
# output 的配置如下: bingdings具体⽣产,消费的桥梁
spring.cloud.stream.bindings.output.destination=test-topic //⽬的地保持⼀致
springcloud和springboot
spring.cloud.stream.t-type=text/plain
spring.cloud.stream.up=test-group
>> ⾃定义
# input 的配置:
spring.cloud.stream.bindings.input1.destination=test-topic1
spring.cloud.stream.t-type=text/plain
spring.cloud.stream.up=test-group1
# output 的配置:
spring.cloud.stream.bindings.output1.destination=test-topic1
spring.cloud.stream.t-type=text/plain
spring.cloud.stream.up=test-group1
SpringCloudStream RocketMQ事务消息
Apache RocketMQ在4.3.0版中已经⽀持分布式事务消息,这⾥RocketMQ采⽤了2PC的思想来实现了提交事务消息,同时增加⼀个补偿逻辑来处理⼆阶段超时或者失败的消息,如下图所⽰:
上图说明了事务消息的⼤致⽅案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程;
1.事务消息发送及提交:
(1) 发送消息(half消息);
(2) 服务端响应消息写⼊结果;
(3) 根据发送结果执⾏本地事务(如果写⼊失败,此时half消息对业务不可见,本地逻辑不执⾏);
(4) 根据本地事务状态执⾏Commit或者Rollback(Commit操作⽣成消息索引,消息对消费者可见)
2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起⼀次“回查”;
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态;
(3) 根据本地事务状态,重新Commit或者Rollback;
其中,补偿阶段⽤于解决消息Commit或者Rollback发⽣超时或者失败的情况;
事务消息⼀共有三种状态:提交状态、回滚状态、中间状态;
TransactionStatus.CommitTransaction: 提交事务,代表消费者可以消费此消息;TransactionStatus.RollbackTransaction: 回滚事务,代表消息将被删除,不能被消费;TransactionStatus.Unknown: 中间状态,代表需要检查消息队列来确定状态;
MQ内部逻辑:
package com.springcloud.stream.stream.Transaction;
import ketmq.spring.annotation.RocketMQTransactionListener;
import RocketMQLocalTransactionListener;
import RocketMQLocalTransactionState;
import ssaging.Message;
//MQ接收,并根据结果运⾏内部逻辑
@SuppressWarnings("all")
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10) public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
/**
* 执⾏本地事务:也就是执⾏本地业务逻辑
*
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object num = Headers().get("test");
if ("1".equals(num)) {
System.out.println("executer: " + new String((byte[]) Payload()) + " unknown");
return RocketMQLocalTransactionState.UNKNOWN;
}
else if ("2".equals(num)) {
System.out.println("executer: " + new String((byte[]) Payload()) + " rollback");
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.println("executer: " + new String((byte[]) Payload()) + " commit");
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 回调检查
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("check: " + new String((byte[]) Payload()));
return RocketMQLocalTransactionState.COMMIT;
}
}
消息发送:
@Component
public class Sender {
@Autowired
private MySource mySource;
public <T> void sendTransactionalMsg(T msg ,int num) throws Exception{
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.setHeader("test",String.valueOf(num));
//.setHeader(RocketMQHeaders.TAGS,"binder");
Message message = builder.build();
mySource.outputTX().send(message);
}
}
⾃定义信道-重写Source
public interface MySource {
String OUTPUTTX = "outputTX";
@Output(MySource.OUTPUTTX)
MessageChannel outputTX();
}
⾃定义信道-重写Sink
public interface MySink {
String INPUTTX = "inputTX";
@Input(MySink.INPUTTX)
SubscribableChannel inputTX();
}
消费者接收消息:
@EnableBinding({MySink.class})
public class ReceiveService {
//spring cloud stream ⾥⾯发消息通过sink发送
@Autowired
private MySink mySink;
//消费者端接收到的消息
@StreamListener("inputTX")
public void receiveTransactionMessage(String receiveMsg) {
System.out.println("Transaction_input 接收到的消息: " + receiveMsg); }
}
Spring Cloud Stream RocketMQ 配置选项RocketMQ Binder Properties
*spring.ketmq.binder.name-server*
RocketMQ NameServer 地址(⽼版本使⽤ namesrv-addr 配置项);Default: 127.0.0.1:9876.
*spring.ketmq.binder.access-key*
阿⾥云账号 AccessKey。
Default: null.
*spring.ketmq.binder.secret-key*
阿⾥云账号 SecretKey。
Default: null.
*spring.able-msg-trace*
是否为 Producer 和 Consumer 开启消息轨迹功能
Default: true.
*spring.ketmq.binder.customized-trace-topic*
消息轨迹开启后存储的 topic 名称。
Default: RMQ_SYS_TRACE_TOPIC.
RocketMQ Consumer
Properties
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
推荐文章
热门文章
-
m函数数字提取
2025-01-07 -
jest断言方法大全
2025-01-07 -
中兴ZXSEC US 管理员手册
2025-01-07 -
keras系列(一):参数设置
2025-01-07 -
Qt从QString中提取出数字
2025-01-07 -
element input 金额千分位格式化
2025-01-07 -
freemaker 参数解析正则
2025-01-07 -
C#正则验证数字
2025-01-07 -
form表单验证正则
2025-01-07 -
scanf正则表达式用法
2025-01-07 -
grafana value的正则表达式
2025-01-07 -
Android平台浮点数运算应用
2025-01-07 -
js-(JS正则表达式验证数字)
2025-01-07 -
判断Python输入是否是整数,字符,或浮点数
2025-01-07 -
c语言 sscanf 正则规则
2025-01-07 -
从文本中提取数值技巧
2025-01-07 -
js将整数转换成两位浮点数的方法
2025-01-07 -
vue正则限制浮点数
2025-01-07 -
8到20的结尾的正则
2025-01-07 -
shell 正则表达式 最后一行
2025-01-07
最新文章
-
应用程序的安全检测方法、装置、电子设备和存储介质
2025-01-07 -
VBA之正则表达式(1)--基础篇
2025-01-07 -
代码编辑的辅助方法、装置及电子设备
2025-01-07 -
SHELL查字符串中包含字符的命令
2025-01-07 -
String方法中replace和replaceAll的区别详解(源码分析)
2025-01-07 -
双字节符号正则
2025-01-07
发表评论