RocketMQ⾯试题
你们为什么使⽤mq?具体的使⽤场景是什么?
mq的作⽤很简单,削峰填⾕。以电商交易下单的场景来说,正向交易的过程可能涉及到创建订单、扣减库存、扣减活动预算、扣减积分等等。每个接⼝的耗时如果是100ms,那么理论上整个下单的链路就需要耗费400ms,这个时间显然是太长了。
如果这些操作全部同步处理的话,⾸先调⽤链路太长影响接⼝性能,其次分布式事务的问题很难处理,这时候像扣减预算和积分这种对实时⼀致性要求没有那么⾼的请求,完全就可以通过mq异步的⽅式去处理了。同时,考虑到异步带来的不⼀致的问题,我们可以通过job去重试保证接⼝调⽤成功,⽽且⼀般公司都会有核对的平台,⽐如下单成功但是未扣减积分的这种问题可以通过核对作为兜底的处理⽅案。
使⽤mq之后我们的链路变简单了,同时异步发送消息我们的整个系统的抗压能⼒也上升了。
那你们使⽤什么mq?基于什么做的选型?
我们主要调研了⼏个主流的mq,kafka、rabbitmq、rocketmq、activemq,选型我们主要基于以下⼏个点去考虑:activemq和rocketmq的区别
1. 由于我们系统的qps压⼒⽐较⼤,所以性能是⾸要考虑的要素。
2. 开发语⾔,由于我们的开发语⾔是java,主要是为了⽅便⼆次开发。
3. 对于⾼并发的业务场景是必须的,所以需要⽀持分布式架构的设计。
4. 功能全⾯,由于不同的业务场景,可能会⽤到顺序消息、事务消息等。
基于以上⼏个考虑,我们最终选择了RocketMQ。
Kafka RocketMQ RabbitMQ ActiveMQ
单机吞吐量10万级10万级万级万级
开发语⾔Scala Java Erlang Java
⾼可⽤分布式架构分布式架构主从架构主从架构
性能ms级ms级us级ms级
功能只⽀持主要的MQ功能顺序消息、事务消息等功能完善并发强、性能好、延时低成熟的社区产品、⽂档丰富
你上⾯提到异步发送,那消息可靠性怎么保证?
消息丢失可能发⽣在⽣产者发送消息、MQ本⾝丢失消息、消费者丢失消息3个⽅⾯。
⽣产者丢失
⽣产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中⽹络闪断MQ没收到,消息就丢失了。
由于同步发送的⼀般不会出现这样使⽤⽅式,所以我们就不考虑同步发送的问题,我们基于异步发送的场景来说。
异步发送分为两个⽅式:异步有回调和异步⽆回调,⽆回调的⽅式,⽣产者发送完后不管结果可能就会造成消息丢失,⽽通过异步发送+回调通知+本地消息表的形式我们就可以做出⼀个解决⽅案。以下单的场景举例。
1. 下单后先保存本地数据和MQ消息表,这时候消息的状态是发送中,如果本地事务失败,那么下单失败,事务回滚。
2. 下单成功,直接返回客户端成功,异步发送MQ消息
3. MQ回调通知消息发送结果,对应更新数据库MQ发送状态
4. JOB轮询超过⼀定时间(时间根据业务配置)还未发送成功的消息去重试
5. 在监控平台配置或者JOB程序处理超过⼀定次数⼀直发送不成功的消息,告警,⼈⼯介⼊。
⼀般⽽⾔,对于⼤部分场景来说异步回调的形式就可以了,只有那种需要完全保证不能丢失消息的场景我们做⼀套完整的解决⽅案。MQ丢失
如果⽣产者保证消息发送到MQ,⽽MQ收到消息后还在内存中,这时候宕机了⼜没来得及同步给从节点,就有可能导致消息丢失。
⽐如RocketMQ:
RocketMQ分为同步刷盘和异步刷盘两种⽅式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的⽅式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。
⽐如Kafka也可以通过配置做到:
acks=all 只有参与复制的所有节点全部收到消息,才返回⽣产者成功。这样的话除⾮所有的节点都挂了,消息才会丢失。
replication.factor=N,设置⼤于1的数,这会要求每个partion⾄少有2个副本
plicas=N,设置⼤于1的数,这会要求leader⾄少感知到⼀个follower还保持着连接
retries=N,设置⼀个⾮常⼤的值,让⽣产者发送失败⼀直重试
虽然我们可以通过配置的⽅式来达到MQ本⾝⾼可⽤的⽬的,但是都对性能有损耗,怎样配置需要根据业务做出权衡。
消费者丢失
消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。
RocketMQ默认是需要消费者回复ack确认,⽽kafka需要⼿动开启配置关闭⾃动offset。
消费⽅不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进⼊死信队列,需要⼿⼯来处理了。(Kafka没有这些)
你说到消费者消费失败的问题,那么如果⼀直消费失败导致消息积压怎么处理?
因为考虑到时消费者消费⼀直出错的问题,那么我们可以从以下⼏个⾓度来考虑:
1. 消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让consumer恢复正常消费
2. 如果时间来不及处理很⿇烦,做转发处理,写⼀个临时的consumer消费⽅案,先把消息消费,然后再转发到⼀个新的topic和MQ资
源,这个新的topic的机器资源单独申请,要能承载住当前积压的消息
3. 处理完积压数据后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状
那如果消息积压达到磁盘上限,消息被删除了怎么办?
这。。。他妈都删除了我有啥办法啊。。。冷静,再想想。。有了。
最初,我们发送的消息记录是落库保存了的,⽽转发发送的数据也保存了,那么我们就可以通过这部分数据来到丢失的那部分数据,再单独跑个脚本重发就可以了。如果转发的程序没有落库,那就和消费⽅的记录去做对⽐,只是过程会更艰难⼀点。
说了这么多,那你说说RocketMQ实现原理吧?
RocketMQ由NameServer注册中⼼集、Producer⽣产者集、Consumer消费者集和若⼲Broker(RocketMQ进程)组成,它的架构原理是这样的:
1. Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送⼀次⼼跳
2. Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择⼀台服务器来发送消息
3. Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费
为什么RocketMQ不使⽤Zookeeper作为注册中⼼呢?
我认为有以下⼏个点是不使⽤zookeeper的原因:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论