可靠消息最终⼀致性【本地消息表、RocketMQ事务消息⽅案】⼀、可靠消息最终⼀致性事务概述
事务发起⽅(消息⽣产⽅)将消息发给消息中间件,事务参与⽅从消息中间件接收消息,事务参与⽅(消息消费⽅)和消息中间件之间都是通过⽹络通信,由于⽹络通信的不确定性会导致分布式事务问题。因此可靠消息最终⼀致性⽅案要解决以下⼏个问题:
【1】本地事务与消息发送的原⼦性问题:事务发起⽅在本地事务执⾏成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原⼦性,要么都成功,要么都失败。本地事务与消息发送的原⼦性问题是实现可靠消息最终⼀致性⽅案的关键问题。先来尝试下这种操作,先发送消息,再操作数据库:这种情况下⽆法保证数据库操作与发送消息的⼀致性,因为可能发送消息成功,据库操作失败。
1 begin transaction;
2//1.发送MQ
3//2.数据库操作
4 commit transation;
第⼆种⽅案,先进⾏数据库操作,再发送消息:这种情况下貌似没有问题,如果发送 MQ消息失败,就会
抛出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但 MQ其实已经正常发送了,同样会导致不⼀致。
1 begin transaction;
2//1.数据库操作
3//2.发送MQ
4 commit transation;
【2】事务参与⽅接收消息的可靠性:事务参与⽅必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。
【3】消息重复消费的问题:由于步骤2的存在,若某⼀个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费。要解决消息重复消费的问题就要实现事务参与⽅的⽅法幂等性。
⼆、解决⽅案【本地消息表⽅案】
1 begin transaction;
2//1.新增⽤户
spring framework网络系统参数3//2.存储积分消息⽇志
4 commit transation;
【2】定时任务扫描⽇志:如何保证将消息发送给消息队列呢?经过第⼀步消息已经写到消息⽇志表中,可以启动独⽴的线程,定时对消息⽇志表中的消息进⾏扫描并发送⾄消息中间件,在消息中间件反馈发送成功后删除该消息⽇志,否则等待定时任务下⼀周期重试。
【3】消费消息:如何保证消费者⼀定能消费到消息呢?这⾥可以使⽤ MQ的ack(即消息确认)机制,消费者监听MQ,如果消费者接收到消息并且业务处理完成后向MQ 发送ack(即消息确认),此时说明消费者正常消费消息完成,MQ将不再向消费者推送消息,否则消费者会不断重试向消费者来发送消息。积分服务接收到”增加积分“消息,开始增加积分,积分增加成功后向消息中间件回应ack,否则消息中间件将重复投递此消息。由于消息会重复投递,积分服务的”增加积分“功能需要实现幂等性。
三、解决⽅案【RocketMQ事务消息⽅案】
RocketMQ 是⼀个来⾃阿⾥巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项⽬。据了解,包括阿⾥云上的消息产品以及收购的⼦公司在内,阿⾥集团的消息产品全线都运⾏
在 RocketMQ 之上,并且最近⼏年的双⼗⼀⼤促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式⽀持事务消息,为分布式事务实现提供了便利性⽀持。RocketMQ 事务消息设计主要为解决Producer 端的消息发送与本地事务执⾏的原⼦性问题,RocketMQ 的设计中broker 与 producer 端的双向通信能⼒,使得broker 天⽣可以作为⼀个事务协调者存在;⽽ RocketMQ本⾝提供的存储机制为事务消息提供了持久化能⼒;RocketMQ 的⾼可⽤机制以及可靠消息设计则为事务消息在系统发⽣异常时依然能够保证达成事务的最终⼀致性。在 RocketMQ 4.3后实现了完整的事务消息,实际上是对本地消息表的⼀个封装,将本地消息表移动到了 MQ内部,解决Producer 端的消息发送与本地事务执⾏的原⼦性问题。
【执⾏流程如下】:为⽅便理解我们还以注册送积分的例⼦来描述整个流程。Producer 即MQ发送⽅,本例中是⽤户服务,负责新增⽤户。MQ订阅⽅即消息消费⽅,本例中是积分服务,负责新增积分。
【1】Producer 发送事务消息:Producer (MQ发送⽅)发送事务消息⾄MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅⽅)是⽆法消费到的。本例中,Producer 发送 ”增加积分消息“ 到MQ Server。
【2】MQ Server回应消息发送成功:MQ Server接收到 Producer 发送给的消息则回应发送成功。表⽰ MQ已接收到消息。
【3】Producer 执⾏本地事务:Producer 端执⾏业务代码逻辑,通过本地数据库事务控制。本例中,Producer 执⾏添加⽤户操作。
【4】消息投递:若Producer 本地事务执⾏成功则⾃动向 MQServer发送 commit消息,MQ Server接收到 Commit消息后将“增加积分消息”状态标记为可消费,此时MQ订阅⽅(积分服务)即正常消费消息。若Producer 本地事务执⾏失败则⾃动向 MQServer发送 Rollback消
息,MQ Server接收到 Rollback消息后将删除“增加积分消息”。MQ订阅⽅(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接
收消息。这⾥ ack默认⾃动回应,即程序执⾏正常则⾃动回应ack。
【5】事务回查:如果执⾏ Producer端本地事务过程中,执⾏端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执⾏状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。以上主⼲流程已由RocketMQ实现,对⽤户侧来说,⽤户需要分别实现本地事务执⾏以及本地事务回查⽅法,因此只需关注本地事务的执⾏状态(维护本地事务状态表)即可。RoacketMQ提供RocketMQLocalTransactionListener接⼝:
1public interface RocketMQLocalTransactionListener {
2/**发送prepare消息成功此⽅法被回调,该⽅法⽤于执⾏本地事务
3    * @param msg 回传的消息,利⽤transactionId即可获取到该消息的唯⼀Id
4    * @param arg 调⽤send⽅法时传递的参数,当send时候若有额外的参数可以传递到send⽅法中,这⾥能获取到
5    * @return返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
6*/
7    RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
8
9/**@param msg 通过获取transactionId来判断这条消息的本地事务执⾏状态
10      * @return返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
11*/
12    RocketMQLocalTransactionState checkLocalTransaction(Message msg);
13  }
【6】发送事务消息:以下是 RocketMQ提供⽤于发送事务消息的API:
1 TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
2 producer.setNamesrvAddr("127.0.0.1:9876");
3 producer.start();
4//设置TransactionListener实现
5 producer.setTransactionListener(transactionListener);
6//发送事务消息
7 SendResult sendResult = producer.sendMessageInTransaction(msg, null);
四、RocketMQ实现可靠消息最终⼀致性事务
【业务说明】通过 RocketMQ中间件实现可靠消息最终⼀致性分布式事务,模拟两个账户的转账交易过
程。两个账户在分别在不同的银⾏(张三在 bank1、李四在 bank2),bank1、bank2是两个微服务。交易过程是,张三给李四转账指定⾦额。上述交易步骤,张三扣减⾦额与给bank2发转账消息,两个操作必须是⼀个整体性的事务。
1DROP TABLE IF EXISTS `de_duplication`;
2CREATE TABLE `de_duplication` (
3    `tx_no` varchar(64) COLLATE utf8_bin NOT NULL,
4    `create_time` datetime(0) NULL DEFAULT NULL,
5PRIMARY KEY (`tx_no`) USING BTREE
6 ) ENGINE = InnoDB CHARACTER SET= utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
【版本依赖】:在⽗⼯程中指定了rocketmq-spring-boot-starter的版本
1<dependency>
2<groupId>ketmq</groupId>
3<artifactId>rocketmq-spring-boot-starter</artifactId>
4<version>2.0.2</version>
5</dependency>
【配置rocketMQ】:在application-local.propertis 中配置 rocketMQ nameServer地址及⽣产组。
1 up = producer_bank2
2 rocketmq.name-server = 127.0.0.1:9876
【张三服务层代码】:
1import com.alibaba.fastjson.JSONObject;
2import RocketMQTemplate;
3import org.springframework.beans.factory.annotation.Autowired;
4import ssaging.Message;
5import ssaging.support.MessageBuilder;
6import org.springframework.stereotype.Service;
7import ansaction.annotation.Transactional;
8
9/**
10 * @author Administrator
11 * @version 1.0
12 **/
13 @Service
14 @Slf4j
15public class AccountInfoServiceImpl implements AccountInfoService {
16
17    @Autowired
18    AccountInfoDao accountInfoDao;
19
20    @Autowired
21    RocketMQTemplate rocketMQTemplate;
22
23
24//向mq发送转账消息
25    @Override
26public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
27
28//将accountChangeEvent转成json
29        JSONObject jsonObject =new JSONObject();
30        jsonObject.put("accountChange",accountChangeEvent);
31        String jsonString = JSONString();
32//⽣成message类型
33        Message<String> message = MessageBuilder.withPayload(jsonString).build();
34//发送⼀条事务消息
35/**
36        * String txProducerGroup ⽣产组
37        * String destination topic,
38        * Message<?> message, 消息内容
39        * Object arg 参数
40*/
41        rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1","topic_txmsg",message,null);
42
43    }
44
45//更新账户,扣减⾦额
46    @Override
47    @Transactional
48public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
49//幂等判断,txNo是在Ctroller中⽣成的 UUID,全局唯⼀
50if(accountInfoDao.TxNo())>0){
51return ;
52        }
53//扣减⾦额
54        accountInfoDao.AccountNo(),Amount() * -1);
55//添加事务⽇志
56        accountInfoDao.TxNo());
Amount() == 3){
58throw new RuntimeException("⼈为制造异常");
59        }
60    }
61 }
【张三RocketMQLocalTransactionListener】:编写 RocketMQLocalTransactionListener接⼝实现类,实现执⾏本地事务和事务回查两个⽅法。
1import com.alibaba.fastjson.JSONObject;
2import ketmq.spring.annotation.RocketMQTransactionListener;
3import RocketMQLocalTransactionListener;
4import RocketMQLocalTransactionState;
5import ssaging.Message;
6import ansaction.annotation.Transactional;
7
8/**
9 * @author Administrator
10 * @version 1.0
11 **/
12 @Component
13 @Slf4j
14//⽣产者组与发送消息时定义组相同
15 @RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1")
16public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
17
18    @Autowired
19    AccountInfoService accountInfoService;
20
21    @Autowired
22    AccountInfoDao accountInfoDao;
23
24//事务消息发送后的回调⽅法,当消息发送给mq成功,此⽅法被回调
25    @Override
26    @Transactional
27public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
28
29try {
30//解析message,转成AccountChangeEvent
31            String messageString = new String((byte[]) Payload());
32            JSONObject jsonObject = JSONObject.parseObject(messageString);
33            String accountChangeString = String("accountChange");
34//将accountChange(json)转成AccountChangeEvent
35            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
36//执⾏本地事务,扣减⾦额
37            accountInfoService.doUpdateAccountBalance(accountChangeEvent);
38//当返回RocketMQLocalTransactionState.COMMIT,⾃动向mq发送commit消息,mq将消息的状态改为可消费
39return RocketMQLocalTransactionState.COMMIT;
40        } catch (Exception e) {
41            e.printStackTrace();
42return RocketMQLocalTransactionState.ROLLBACK;
43        }
46    }
47
48//事务状态回查,查询是否扣减⾦额
49    @Override
50public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
51//解析message,转成AccountChangeEvent
52        String messageString = new String((byte[]) Payload());
53        JSONObject jsonObject = JSONObject.parseObject(messageString);
54        String accountChangeString = String("accountChange");
55//将accountChange(json)转成AccountChangeEvent
56        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class); 57//事务id
58        String txNo = TxNo();
59int existTx = accountInfoDao.isExistTx(txNo);
60if(existTx>0){
61return RocketMQLocalTransactionState.COMMIT;
62        }else{
63return RocketMQLocalTransactionState.UNKNOWN;
64        }
65    }
66 }
【李四服务层代码】:
1import org.springframework.stereotype.Service;
2import ansaction.annotation.Transactional;
3
4/**
5 * @author Administrator
6 * @version 1.0
7 **/
8 @Service
9 @Slf4j
10public class AccountInfoServiceImpl implements AccountInfoService {
11
12    @Autowired
13    AccountInfoDao accountInfoDao;
14
15//更新账户,增加⾦额
16    @Override
17    @Transactional
18public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
19        log.info("bank2更新本地账号,账号:{},⾦额:{}",AccountNo(),accou
21return ;
22        }
23//增加⾦额
24        accountInfoDao.AccountNo(),Amount());
25//添加事务记录,⽤于幂等
26        accountInfoDao.TxNo());
Amount() == 4){
28throw new RuntimeException("⼈为制造异常");
29        }
30    }
31 }
【MQ监听类】:通过实现 RocketMQListener接⼝监听⽬标 Topic
1import com.alibaba.fastjson.JSONObject;
2import ketmq.spring.annotation.RocketMQMessageListener;
3import RocketMQListener;
4
5/**
6 * @author Administrator
7 * @version 1.0
8 **/
9 @Component
10 @Slf4j
11 @RocketMQMessageListener(consumerGroup = "consumer_group_txmsg_bank2",topic = "topic_txmsg")
12public class TxmsgConsumer implements RocketMQListener<String> {
13
14    @Autowired
15    AccountInfoService accountInfoService;
16
17//接收消息
18    @Override
19public void onMessage(String message) {
20        log.info("开始消费消息:{}",message);
21//解析消息
22        JSONObject jsonObject = JSONObject.parseObject(message);
23        String accountChangeString = String("accountChange");
24//转成AccountChangeEvent
25        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
26//设置账号为李四的
27        accountChangeEvent.setAccountNo("2");
28//更新本地账户,增加⾦额
29        accountInfoService.addAccountInfoBalance(accountChangeEvent);
31    }
32 }
五、总结
可靠消息最终⼀致性就是保证消息从⽣产⽅经过消息中间件传递到消费⽅的⼀致性,本案例使⽤了 RocketMQ作为消息中间件,RocketMQ 主要解决了两个功能:
【1】本地事务与消息发送的原⼦性问题;
【2】事务参与⽅接收消息的可靠性;
可靠消息最终⼀致性事务适合执⾏周期长且实时性要求不⾼的场景。引⼊消息机制后,同步的事务操作变为基于消息执⾏的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。