RabbitMQ消息最终⼀致性解决⽅案
RabbitMQ消息最终⼀致性解决⽅案
随着分布式服务架构的流⾏与普及,原来在单体应⽤中执⾏的多个逻辑操作,现在被拆分成了多个服务之间的远程调⽤。虽然服务化为我们的系统带来了⽔平伸缩的能⼒,然⽽随之⽽来挑战就是分布式事务问题,多个服务之间使⽤⾃⼰单独维护的数据库,它们彼此之间不在同⼀个事务中,假如A执⾏成功了,B执⾏却失败了,⽽A的事务此时已经提交,⽆法回滚,那么最终就会导致两边数据不⼀致性的问题;尽管很早之前就有基于两阶段提交的XA分布式事务,但是这类⽅案因为需要资源的全局锁定,导致性能极差;因此后⾯就逐渐衍⽣出了消息最终⼀致性、TCC等柔性事务的分布式事务⽅案,本⽂主要分析的是基于消息的最终⼀致性⽅案。
01 普通消息的处理流程
MQ消息最终⼀致性解决⽅案
消息⽣成者发送消息
MQ收到消息,将消息进⾏持久化,在存储中新增⼀条记录
返回ACK给⽣产者
MQ push 消息给对应的消费者,然后等待消费者返回ACK
如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执⾏第
6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执⾏4、5、6步骤
MQ删除消息
1.1 普通消息处理存在的⼀致性问题
我们以订单创建为例,订单系统先创建订单(本地事务),再发送消息给下游处理;如果订单创建成功,然⽽消息没有发送出去,那么下游所有系统都⽆法感知到这个事件,会出现脏数据;
public void processOrder() {
// 订单处理(业务操作)
activemq默认账号密码orderService.process();
// 发送订单处理成功消息(发送消息)
sendBizMsg ();
}
如果先发送订单消息,再创建订单;那么就有可能消息发送成功,但是在订单创建的时候却失败了,此时下游系统却认为这个订单已经创建,也会出现脏数据。
public void processOrder() {
// 发送订单处理成功消息(发送消息)
sendBizMsg ();
// 订单处理(业务操作)
orderService.process();
}
1.2 ⼀个错误的想法
此时可能有同学会想,我们可否将消息发送和业务处理放在同⼀个本地事务中来进⾏处理,如果业务消息发送失败,那么本地事务就回滚,这样是不是就能解决消息发送的⼀致性问题呢?
@Transactionnal
public void processOrder() {
try{
// 订单处理(业务操作)
orderService.process();
// 发送订单处理成功消息(发送消息)
sendBizMsg ();
}catch(Exception e){
事务回滚;
}
}
1.3 消息发送的异常情况分析
可能的情况 ⼀致性 订单处理成功,然后突然宕机,事务未提交,消息没有发送出去 ⼀致 订单处理成功,由于⽹络原因或者MQ宕机,消息没有发送出去,事务回滚 ⼀致 订单处理成功,消息发送成功,但是MQ由于其他原因,导致消息存储失败,事务回滚 ⼀致 订单处理成功,消息存储成功,但是MQ处理超时,从⽽ACK确认失败,导致发送⽅本地事务回滚 不⼀致 从上⾯的情况分析,我们可以看到,使⽤普通的处理⽅式,⽆论如何,都⽆法保证业务处理与消息发送两边的⼀致性,其根本的原因就在于:远程调⽤,结果最终可能为成功、失败、超时;⽽对于超时的情况,处理⽅最终的结果可能是成功,也可能是失败,调⽤⽅是⽆法知晓的。 笔者就曾经在项⽬中出现类似的情况,调⽤⽅先在本地写数据,然后发起RPC服务调⽤,但是处理⽅由于DB数据量⽐较⼤,导致处理超时,调⽤⽅在出现超时异常后,直接回滚本地事务,从⽽导致调⽤⽅这边没数据,⽽处理⽅那边数据却已经写⼊了,最终导致两边业务数据的不⼀致。为了保证两边数据的⼀致性,我们只能从其他地⽅寻新的突破⼝。
02 事务消息
由于传统的处理⽅式⽆法解决消息⽣成者本地事务处理成功与消息发送成功两者的⼀致性问题,因此事务消息就诞⽣了,它实现了消息⽣成者本地事务与消息发送的原⼦性,保证了消息⽣成者本地事务处理成功与消息发送成功的最终⼀致性问题。
03 事务消息处理的流程
MQ消息最终⼀致性解决⽅案
事务消息与普通消息的区别就在于消息⽣产环节,⽣产者⾸先预发送⼀条消息到MQ(这也被称为发送half消息)
MQ接受到消息后,先进⾏持久化,则存储中会新增⼀条状态为待发送的消息
然后返回ACK给消息⽣产者,此时MQ不会触发消息推送事件
⽣产者预发送消息成功后,执⾏本地事务
执⾏本地事务,执⾏完成后,发送执⾏结果给MQ
MQ会根据结果删除或者更新消息状态为可发送
如果消息状态更新为可发送,则MQ会push消息给消费者,后⾯消息的消费和普通消息是⼀样的
注意点:由于MQ通常都会保证消息能够投递成功,因此,如果业务没有及时返回ACK结果,那么就有可能造成MQ的重复消息投递问题。因此,对于消息最终⼀致性的⽅案,消息的消费者必须要对消息的消费⽀持幂等,不能造成同⼀条消息的重复消费的情况。
3.1 事务消息异常情况分析
异常情况 ⼀致性 处理异常⽅法 消息未存储,业务操作未执⾏ ⼀致 ⽆ 存储待发送消息成功,但是ACK失败,导致业务未执⾏(可能是MQ处理超时、⽹络抖动等原因) 不⼀致 MQ确认业务操作结果,处理消息(删除消息) 存储待发送消息成功,ACK成功,业务执⾏(可能成功也可能失败),但是MQ没有收到⽣产者业务处理的最终结果 不⼀致 MQ确认业务操作结果,处理消息(根据就业务处理结果,更新消息状态,如果业务执⾏成功,则投递消息,失败则删除消息) 业务处理成功,并且发送结果给MQ,但是MQ更新消息失败,导致消息状态依旧为待发送不⼀致 同上
04 ⽀持事务消息的MQ
现在⽬前较为主流的MQ,⽐如ActiveMQ、RabbitMQ、Kafka、RocketMQ等,只有RocketMQ⽀持事务消息。据笔者了解,早年阿⾥对MQ增加事务消息也是因为⽀付宝那边因为业务上的需求⽽产⽣的。因此,如果我们希望强依赖⼀个MQ的事务消息来做到消息最终⼀致性的话,在⽬前的情况下,技术选型上只能去选择RocketMQ来解决。上⾯我们也分析了事务消息所存在的异常情况,即MQ存储了待发送的消息,但是MQ⽆法感知到上游处理的最终结果。对于RocketMQ⽽⾔,它的解决⽅案⾮常的简单,就是其内部实现会有⼀个定时任务,去轮训状态为待发送的消息,然后给producer发送check请求,⽽producer必须实现⼀个check,的内容通常就是去检查与之对应的本地事务是否成功(⼀般就是查询DB),如果成功了,则MQ会将消息设置为可发送,否则就删除消息。
05 常见的问题
(1)问:如果预发送消息失败,是不是业务就不执⾏了?
答:是的,对于基于消息最终⼀致性的⽅案,⼀般都会强依赖这步,如果这个步骤⽆法得到保证,那么最终也 就不可能做到最终⼀致性了。
(2)问:为什么要增加⼀个消息预发送机制,增加两次发布出去消息的重试机制,为什么不在业务成功之后,发送失败的话使⽤⼀次重试机制?
答:如果业务执⾏成功,再去发消息,此时如果还没来得及发消息,业务系统就已经宕机了,系统重启后,根本没有记录之前是否发送过消息,这样就会导致业务执⾏成功,消息最终没发出去的情况。
(3)问:如果consumer消费失败,是否需要producer做回滚呢?
答:这⾥的事务消息,producer不会因为consumer消费失败⽽做回滚,采⽤事务消息的应⽤,其所追求的是⾼可⽤和最终⼀致性,消息消费失败的话,MQ⾃⼰会负责重推消息,直到消费成功。因此,事务消息是针对⽣产端⽽⾔的,⽽消费端,消费端的⼀致性是通过MQ的重试机制来完成的。
(4)问:如果consumer端因为业务异常⽽导致回滚,那么岂不是两边最终⽆法保证⼀致性?
答:基于消息的最终⼀致性⽅案必须保证消费端在业务上的操作没障碍,它只允许系统异常的失败,不允许业务上的失败,⽐如在你业务上抛出个NPE之类的问题,导致你消费端执⾏事务失败,那就很难做到⼀致了。
由于并⾮所有的MQ都⽀持事务消息,假如我们不选择RocketMQ来作为系统的MQ,是否能够做到消息的最终⼀致性呢?答案是可以的。
06 基于本地消息的最终⼀致性
MQ消息最终⼀致性解决⽅案
基于本地消息的最终⼀致性⽅案的最核⼼做法就是在执⾏业务操作的时候,记录⼀条消息数据到DB,并
且消息数据的记录与业务数据的记录必须在同⼀个事务内完成,这是该⽅案的前提核⼼保障。在记录完成后消息数据后,后⾯我们就可以通过⼀个定时任务到DB中去轮训状态为待发送的消息,然后将消息投递给MQ。这个过程中可能存在消息投递失败的可能,此时就依靠重试机制来保证,直到成功收到MQ的ACK确认之后,再将消息状态更新或者消息清除;⽽后⾯消息的消费失败的话,则依赖MQ本⾝的重试来完成,其最后做到两边系统数据的最终⼀致性。基于本地消息服务的⽅案虽然可以做到消息的最终⼀致性,但是它有⼀个⽐较严重的弊端,每个业务系统在使⽤该⽅案时,都需要在对应的业务库创建⼀张消息表来存储消息。针对这个问题,我们可以将该功能单独提取出来,做成⼀个消息服务来统⼀处理,因⽽就衍⽣出了我们下⾯将要讨论的⽅案。
07 独⽴消息服务的最终⼀致性
MQ消息最终⼀致性解决⽅案
独⽴消息服务最终⼀致性与本地消息服务最终⼀致性最⼤的差异就在于将消息的存储单独地做成了⼀个
RPC的服务,这个过程其实就是模拟了事务消息的消息预发送过程,如果预发送消息失败,那么⽣产者业务就不会去执⾏,因此对于⽣产者的业务⽽⾔,它是强依赖于该消息服务的。不过好在独⽴消息服务⽀持⽔平扩容,因此只要部署多台,做成HA的集模式,就能够保证其可靠性。在消息服务中,还有⼀个单独地定时任务,它会定期轮训长时间处于待发送状态的消息,通过⼀个check补偿机制来确认该消息对应的业务是否成功,如果对应的业务处理成功,则将消息修改为可发送,然后将其投递给MQ;如果业务处理失败,则将对应的消息更新或者删除即可。因此在使⽤该⽅案时,消息⽣产者必须同时实现⼀个check服务,来供消息服务做消息的确认。对于消息的消费,该⽅案与上⾯的处理是⼀样,都是通过MQ⾃⾝的重发机制来保证消息被消费。
总结:上游事务提交之后,在基于MQ的场景下就不考虑回滚了。失败的可能是由于⽹络、服务宕机所导致,⽂章中提到说业务上执⾏是⽆障碍的。如果下游服务长时间没有恢复,那么就应该设置告警,在这⾥有⼏种机制来解决⼀些⽜⽪癣类型的问题,假如上游消息始终发送失败(这种可能性基本不存在除⾮代码是假的)这种情况我们可以设置报警机制⽐如发⽣异常时可以打印⽇志,,发送邮件,将异常订单保存到数据库,这些措施可以同时⽤于下游⼀些异常订单,同时也可以在发⽣异常的时候新建⼀个异常Topic的消息提⽰,让⼈⼯来介⼊数据订正。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。