消息幂等(去重)通⽤解决⽅案,真顶!
消息中间件是分布式系统常⽤的组件,⽆论是异步化、解耦、削峰等都有⼴泛的应⽤价值。我们通常会认为,消息中间件是⼀个可靠的组件——这⾥所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会⾄少保证消息能被消费者成功消费⼀次,这是消息中间件最基本的特性之⼀,也就是我们常说的“AT LEAST ONCE”,即消息⾄少会被“成功消费⼀遍”。
举个例⼦,⼀个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进⾏消费,但在消费到⼀半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停⽌投递。
然⽽这种可靠的特性导致,消息可能被多次地投递。举个例⼦,还是刚刚这个例⼦,程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应⽤程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。
这在RockectMQ的场景来看,就是同⼀个messageId的消息重复投递下来了。
基于消息的投递可靠(消息不丢)是优先级更⾼的,所以消息不重的任务就会转移到应⽤程序⾃我实现,
这也是为什么RocketMQ的⽂档⾥强调的,消费逻辑需要⾃我实现幂等。背后的逻辑其实就是:不丢和不重是⽭盾的(在分布式场景下),但消息重复是有解决⽅案的,⽽消息丢失是很⿇烦的。
简单的消息去重解决⽅案
例如:假设我们业务的消息消费逻辑是:插⼊某张订单表的数据,然后更新库存:
insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';
要实现消息的幂等,我们可能会采取这样的⽅案:
select * from t_order where order_no = 'order123'
if(order  != null) {
return ;//消息重复,直接返回
}
这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。
并发重复消息
假设这个消费的所有代码加起来需要1秒,有重复的消息在这1秒内(假设100毫秒)内到达(例如⽣产者快速重发,Broker重启等),那么很可能,上⾯去重代码⾥⾯会发现,数据依然是空的(因为上⼀条消息还没消费完,还没成功更新订单状态),
那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进⼊到⾮幂等安全的业务代码中,从⽽引发重复消费的问题(如主键冲突抛出异常、库存被重复扣减⽽没释放等)
并发去重的解决⽅案之⼀
要解决上⾯并发场景下的消息幂等问题,⼀个可取的⽅案是开启事务把select 改成 select for update语句,把记录进⾏锁定。
select * from t_order where order_no = 'THIS_ORDER_NO' for update  //开启事务
if(order.status != null) {
return ;//消息重复,直接返回
}
但这样消费的逻辑会因为引⼊了事务包裹⽽导致整个消息消费可能变长,并发度下降。
当然还有其他更⾼级的解决⽅案,例如更新订单状态采取乐观锁,更新失败则消息重新消费之类的。但这需要针对具体业务场景做更复杂和细致的代码开发、库表设计,不在本⽂讨论的范围。
但⽆论是select for update, 还是乐观锁这种解决⽅案,实际上都是基于业务表本⾝做去重,这⽆疑增加了业务开发的复杂度, ⼀个业务系统⾥⾯很⼤部分的请求处理都是依赖MQ的,如果每个消费逻辑本⾝都需要基于业务本⾝⽽做去重/幂等的开发的话,这是繁琐的⼯作量。本⽂希望探索出⼀个通⽤的消息幂等处理的⽅法,从⽽抽象出⼀定的⼯具类⽤以适⽤各个业务场景。
Exactly Once
在消息中间件⾥,有⼀个投递语义的概念,⽽这个语义⾥有⼀个叫”Exactly Once”,即消息肯定会被成功消费,并且只会被消费⼀次。以下是阿⾥云⾥对Exactly Once的解释:
Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理⼀次,即使⽣产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费⼀次。
在我们业务消息幂等处理的领域内,可以认为业务消息的代码肯定会被执⾏,并且只被执⾏⼀次,那么我们可以认为是Exactly Once。
但这在分布式的场景下想⼀个通⽤的⽅案⼏乎是不可能的。不过如果是针对基于数据库事务的消费逻辑,实际上是可⾏的。
基于关系数据库事务插⼊消息表
假设我们业务的消息消费逻辑是:更新MySQL数据库的某张订单表的状态:
update t_order set status = 'SUCCESS' where order_no= 'order123';
要实现Exaclty Once即这个消息只被消费⼀次(并且肯定要保证能消费⼀次),我们可以这样做:在这个数据库中增加⼀个消息消费记录表,把消息插⼊到这个表,并且把原来的订单更新和这个插⼊的动作放到同⼀个事务中⼀起提交,就能保证消息只会被消费⼀遍了。
1. 开启事务
2. 插⼊消息表(处理好主键冲突的问题)
3. 更新订单表(原消费逻辑)
4. 提交事务
说明:
1. 这时候如果消息消费成功并且事务提交了,那么消息表就插⼊成功了,这时候就算RocketMQ还没有收到消费位点的更新再次投递,
也会插⼊消息失败⽽视为已经消费过,后续就直接更新消费位点了。这保证我们消费代码只会执⾏⼀次。
2. 如果事务提交之前服务挂了(例如重启),对于本地事务并没有执⾏所以订单没有更新,消息表也没插⼊成功;⽽对于RocketMQ服
务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插⼊消息表也是成功的,所以可以继续消费。这保证了消息不丢失。
基于这种⽅式,的确这是有能⼒拓展到不同的应⽤场景,因为他的实现⽅案与具体业务本⾝⽆关——⽽是依赖⼀个消息表。
但是这⾥有它的局限性
1. 消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如Redis这种不⽀持事务特性的数
据源,则这些数据是不可回滚的。
2. 数据库的数据必须是在⼀个库,跨库⽆法解决
注:业务上,消息表的设计不应该以消息ID作为标识,⽽应该以业务的业务主键作为标识更为合理,以应对⽣产者的重发。阿⾥云上的消息去重只是RocketMQ的messageId,在⽣产者因为某些原因⼿动重发(例如上游针对⼀个交易重复请求了)的场景下起不到去重/幂等的效果(因消息id不同)。
更复杂的业务场景
如上所述,这种⽅式Exactly Once语义的实现,实际上有很多局限性,这种局限性使得这个⽅案基本不具备⼴泛应⽤的价值。并且由于基于事务,可能导致锁表时间过长等性能问题。
例如我们以⼀个⽐较常见的⼀个订单申请的消息来举例,可能有以下⼏步(以下统称为步骤X):
1. 检查库存(RPC)
2. 锁库存(RPC)
3. 开启事务,插⼊订单表(MySQL)
4. 调⽤某些其他下游服务(RPC)
5. 更新订单状态
6. commit 事务(MySQL)
这种情况下,我们如果采取消息表+本地事务的实现⽅式,消息消费过程中很多⼦过程是不⽀持回滚的,也就是说就算我们加了事务,实际上这背后的操作并不是原⼦性的。怎么说呢,就是说有可能第⼀条⼩在经历了第⼆步锁库存的时候,服务重启了,这时候实际上库存是已经在另外的服务⾥被锁定了,这并不能被回滚。当然消息还会再次投递下来,要保证消息能⾄少消费⼀遍,换句话说,锁库存的这个RPC接⼝本⾝依旧要⽀持“幂等”。
再者,如果在这个⽐较耗时的长链条场景下加⼊事务的包裹,将⼤⼤的降低系统的并发。所以通常情况下,我们处理这种场景的消息去重的⽅法还是会使⽤⼀开始说的业务⾃⼰实现去重逻辑的⽅式,如前⾯加select for update,或者使⽤乐观锁。
那我们有没有⽅法抽取出⼀个公共的解决⽅案,能兼顾去重、通⽤、⾼性能呢?
拆解消息执⾏过程
其中⼀个思路是把上⾯的⼏步,拆解成⼏个不同的⼦消息,例如:
1. 库存系统消费A:检查库存并做锁库存,发送消息B给订单服务
2. 订单系统消费消息B:插⼊订单表(MySQL),发送消息C给⾃⼰(下游系统)消费
3. 下游系统消费消息C:处理部分逻辑,发送消息D给订单系统
4. 订单系统消费消息D:更新订单状态
注:上述步骤需要保证本地事务和消息是⼀个事务的(⾄少是最终⼀致性的),这其中涉及到分布式事务消息相关的话题,不在本⽂论述。
可以看到这样的处理⽅法会使得每⼀步的操作都⽐较原⼦,⽽原⼦则意味着是⼩事务,⼩事务则意味着使⽤消息表+事务的⽅案显得可⾏。
然⽽,这太复杂了!这把⼀个本来连续的代码逻辑割裂成多个系统多次消息交互!那还不如业务代码层⾯上加锁实现呢。
更通⽤的解决⽅案
上⾯消息表+本地事务的⽅案之所以有其局限性和并发的短板,究其根本是因为它依赖于关系型数据库的事务,且必须要把事务包裹于整个消息消费的环节。
如果我们能不依赖事务⽽实现消息的去重,那么⽅案就能推⼴到更复杂的场景例如:RPC、跨库等。
例如,我们依旧使⽤消息表,但是不依赖事务,⽽是针对消息表增加消费状态,是否可以解决问题呢?
基于消息幂等表的⾮事务⽅案
以上是去事务化后的消息幂等⽅案的流程,可以看到,此⽅案是⽆事务的,⽽是针对消息表本⾝做了状态的区分:消费中、消费完成。只有消费完成的消息才会被幂等处理掉。⽽对于已有消费中的消息,后⾯重复的消息会触发延迟消费(在RocketMQ的场景下即发送到RETRY TOPIC),之所以触发延迟消费是为了控制并发场景下,第⼆条消息在第⼀条消息没完成的过程中,去控制消息不丢(如果直接幂等,那么会丢失消息(同⼀个消息id的话),因为上⼀条消息如果没有消费完成的时候,第⼆条消息你已经告诉broker成功了,那么第⼀条消息这时候失败broker也不会重新投递了)
上⾯的流程不再细说,后⽂有github源码的地址,读者可以参考源码的实现,这⾥我们回头看看我们⼀开始想解决的问题是否解决了:
1. 消息已经消费成功了,第⼆条消息将被直接幂等处理掉(消费成功)。
2. 并发场景下的消息,依旧能满⾜不会出现消息重复,即穿透幂等挡板的问题。
3. ⽀持上游业务⽣产者重发的业务重复的消息幂等问题。
关于第⼀个问题已经很明显已经解决了,在此就不讨论了。
关于第⼆个问题是如何解决的?主要是依靠插⼊消息表的这个动作做控制的,假设我们⽤MySQL作为消息表的存储媒介(设置消息的唯⼀ID为主键),那么插⼊的动作只有⼀条消息会成功,后⾯的消息插⼊会由于主键冲突⽽失败,⾛向延迟消费的分⽀,然后后⾯延迟消费的时候就会变成上⾯第⼀个场景的问题。
关于第三个问题,只要我们设计去重的消息键让其⽀持业务的主键(例如订单号、请求流⽔号等),⽽不仅仅是messageId即可。所以也不是问题。
此⽅案是否有消息丢失的风险?
如果细⼼的读者可能会发现这⾥实际上是有逻辑漏洞的,问题出在上⾯聊到的个三问题中的第2个问题(并发场景),在并发场景下我们依赖于消息状态是做并发控制使得第2条消息重复的消息会不断延迟消费(重试)。但如果这时候第1条消息也由于⼀些异常原因(例如机器重启了、外部异常导致消费失败)没有成功消费成功呢?也就是说这时候延迟消费实际上每次下来看到的都是消费中的状态,最后消费就会被视为消费失败⽽被投递到死信Topic中(RocketMQ默认可以重复消费16次)。
有这种顾虑是正确的!对于此,我们解决的⽅法是,插⼊的消息表必须要带⼀个最长消费过期时间,例如10分钟,意思是如果⼀个消息处于消费中超过10分钟,就需要从消息表中删除(需要程序⾃⾏实现)。所以最后这个消息的流程会是这样的:
mysql删除重复的数据保留一条更灵活的消息表存储媒介
我们这个⽅案实际上没有事务的,只需要⼀个存储的中⼼媒介,那么⾃然我们可以选择更灵活的存储媒介,例如Redis。使⽤Redis有两个好处:
1. 性能上损耗更低
2. 上⾯我们讲到的超时时间可以直接利⽤Redis本⾝的ttl实现
当然Redis存储的数据可靠性、⼀致性等⽅⾯是不如MySQL的,需要⽤户⾃⼰取舍。
源码:RocketMQDedupListener
以下仅贴⼀个Readme中利⽤Redis去重的使⽤样例,⽤以意业务中如果使⽤此⼯具加⼊消息去重幂等的是多么简单:
//利⽤Redis做幂等表
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
consumer.subscribe("TEST-TOPIC", "*");
String appName = ConsumerGroup();// ⼤部分情况下可直接使⽤consumer group名
StringRedisTemplate stringRedisTemplate = null;// 这⾥省略获取StringRedisTemplate的过程
DedupConfig dedupConfig = ableDedupConsumeConfig(appName, stringRedisTemplate);
DedupConcurrentListener messageListener = new SampleListener(dedupConfig);
consumer.start();
以上代码⼤部分是原始RocketMQ的必须代码,唯⼀需要修改的仅仅是创建⼀个DedupConcurrentListener⽰例,在这个⽰例中指明你的消费逻辑和去重的业务键(默认是messageId)。
更多使⽤详情请参考Github上的说明。
这种实现是否⼀劳永逸?
实现到这⾥,似乎⽅案挺完美的,所有的消息都能快速的接⼊去重,且与具体业务实现也完全解耦。那么这样是否就完美的完成去重的所有任务呢?
很可惜,其实不是的。原因很简单:因为要保证消息⾄少被成功消费⼀遍,那么消息就有机会消费到⼀半的时候失败触发消息重试的可能。还是以上⾯的订单流程X:
1. 检查库存(RPC)
2. 锁库存(RPC)
3. 开启事务,插⼊订单表(MySQL)
4. 调⽤某些其他下游服务(RPC)
5. 更新订单状态
6. commit 事务(MySQL)
当消息消费到步骤3的时候,我们假设MySQL异常导致失败了,触发消息重试。因为在重试前我们会删除幂等表的记录,所以消息重试的时候就会重新进⼊消费代码,那么步骤1和步骤2就会重新再执⾏⼀遍。如果步骤2本⾝不是幂等的,那么这个业务消息消费依旧没有做好完整的幂等处理。
本实现⽅式的价值?
那么既然这个并不能完整的完成消息幂等,还有什么价值呢?价值可就⼤了!虽然这不是解决消息幂等的银弹(事实上,软件⼯程领域⾥基本没有银弹),但是他能以便捷的⼿段解决:
1.各种由于Broker、负载均衡等原因导致的消息重投递的重复问题
2.各种上游⽣产者导致的业务级别消息重复问题
3.重复消息并发消费的控制窗⼝问题,就算重复,重复也不可能同⼀时间进⼊消费逻辑
⼀些其他的消息去重的建议
也就是说,使⽤这个⽅法能保证正常的消费逻辑场景下(⽆异常,⽆异常退出),消息的幂等⼯作全部都能解决,⽆论是业务重复,还是rocketmq特性带来的重复。
事实上,这已经能解决99%的消息重复问题了,毕竟异常的场景肯定是少数的。那么如果希望异常场景下也能处理好幂等的问题,可以做以下⼯作降低问题率:
1. 消息消费失败做好回滚处理。如果消息消费失败本⾝是带回滚机制的,那么消息重试⾃然就没有副作⽤了。
2. 消费者做好优雅退出处理。这是为了尽可能避免消息消费到⼀半程序退出导致的消息重试。
3. ⼀些⽆法做到幂等的操作,⾄少要做到终⽌消费并告警。例如锁库存的操作,如果统⼀的业务流⽔锁成功了⼀次库存,再触发锁库
存,如果做不到幂等的处理,⾄少要做到消息消费触发异常(例如主键冲突导致消费异常等)
4. 在#3做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,⼿动做好#1的回滚,使得下次重试消费成功。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。