RocketMQ在⾯试中那些常见问题及答案+汇总1、说说你们公司线上⽣产环境⽤的是什么消息中间件?
【多个mq如何选型?】
2、多个mq如何选型?
MQ描述
RabbitMQ erlang开发,对消息堆积的⽀持并不好,当⼤量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。每秒钟可以处理⼏万到⼗⼏万条消息。
RocketMQ java开发,⾯向互联⽹集化功能丰富,对在线业务的响应时延做了很多的优化,⼤多数情况下可以做到毫秒级的响应,每秒钟⼤概能处理⼏⼗万条消息。
Kafka Scala开发,⾯向⽇志功能丰富,性能最⾼。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反⽽会⽐较⾼。所以,Kafka 不太适合在线业务场景。
ActiveMQ java开发,简单,稳定,性能不如前⾯三个。⼩型系统⽤也ok,但是不推荐。推荐⽤互联⽹主流的。
3、为什么要使⽤MQ?
因为项⽬⽐较⼤,做了分布式系统,所有远程服务调⽤请求都是同步执⾏经常出问题,所以引⼊了mq
作⽤描述
解耦系统耦合度降低,没有强依赖关系
异步不需要同步执⾏的远程调⽤可以有效提⾼响应时间
削峰请求达到峰值后,后端service还可以保持固定消费速率消费,不会被压垮
4、RocketMQ由哪些⾓⾊组成,每个⾓⾊作⽤和特点是什么?
⾓⾊作⽤
Nameserver⽆状态,动态列表;这也是和zookeeper的重要区别之⼀。zookeeper是有状态的。
Producer消息⽣产者,负责发消息到Broker。
Broker就是MQ本⾝,负责收发消息、持久化消息等。
Consumer消息消费者,负责从Broker上拉取消息进⾏消费,消费完进⾏ack。
5、RocketMQ中的Topic和JMS的queue有什么区别?
queue就是来源于数据结构的FIFO队列。⽽Topic是个抽象的概念,每个Topic底层对应N个queue,⽽数据也真实存在queue上的。6、RocketMQ Broker中的消息被消费后会⽴即删除吗?
不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。
追问:那么消息会堆积吗?什么时候清理过期消息?
4.6版本默认48⼩时后会删除不再使⽤的CommitLog⽂件
检查这个⽂件最后访问时间
判断是否⼤于过期时间
指定时间删除,默认凌晨4点
源码如下:
/**
* {@link ketmq.store.DefaultMessageStore.CleanCommitLogService#isTimeToDelete()}
*/
private boolean isTimeToDelete() {
// when = "04";
String when = MessageStoreConfig().getDeleteWhen();
// 是04点,就返回true
if (UtilAll.isItTimeToDo(when)) {
return true;
}
// 不是04点,返回false
return false;
}activemq和rocketmq的区别
/**
* {@link ketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles()}
*/
private void deleteExpiredFiles() {
// isTimeToDelete()这个⽅法是判断是不是凌晨四点,是的话就执⾏删除逻辑。
if (isTimeToDelete()) {
// 默认是72,但是broker配置⽂件默认改成了48,所以新版本都是48。
long fileReservedTime = 48 * 60 * 60 * 1000;
deleteCount = DefaultMessageStore.thismitLog.deleteExpiredFile(72 * 60 * 60 * 1000, xx, xx, xx);
}
}
/**
* {@link ketmq.store.CommitLog#deleteExpiredFile()}
*/
public int deleteExpiredFile(xxx) {
// 这个⽅法的主逻辑就是遍历查最后更改时间+过期时间,⼩于当前系统时间的话就删了(也就是⼩于48⼩时)。
return this.mappedFileQueue.deleteExpiredFileByTime(72 * 60 * 60 * 1000, xx, xx, xx);
}
7、RocketMQ消费模式有⼏种?
消费模型由Consumer决定,消费维度为Topic。
集消费
1.⼀条消息只会被同Group中的⼀个Consumer消费
2.多个Group同时消费⼀个Topic时,每个Group都会有⼀个Consumer消费到数据
⼴播消费
消息将对⼀个Consumer Group 下的各个 Consumer 实例都消费⼀遍。即即使这些 Consumer 属于同⼀个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费⼀次。
8、消费消息是push还是pull?
RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采⽤的是长轮询机制,即拉取⽅式broker端属性 longPollingEnable 标记是否开启长轮询。默认开启
源码如下:
// {@link ketmq.sumer.DefaultMQPushConsumerImpl#pullMessage()}
// 看到没,这是⼀只披着⽺⽪的狼,名字叫PushConsumerImpl,实际⼲的确是pull的活。
// 拉取消息,结果放到pullCallback⾥
this.pullAPIWrapper.pullKernelImpl(pullCallback);
追问:为什么要主动拉取消息⽽不使⽤事件监听⽅式?
事件驱动⽅式是建⽴好长连接,由事件(发送数据)的⽅式来实时推送。
如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时⼜不能被其他consumer消费的情况。⽽pull的⽅式可以根据当前⾃⾝情况来pull,不会造成过多的压⼒⽽造成瓶颈。所以采取了pull的⽅式。
9、broker如何处理拉取请求的?
Consumer⾸次请求Broker
Broker中是否有符合条件的消息
有 ->
响应Consumer
等待下次Consumer的请求
没有
DefaultMessageStore#ReputMessageService#run⽅法
PullRequestHoldService 来Hold连接,每个5s执⾏⼀次检查pullRequestTable有没有消息,有的话⽴即推送
每隔1ms检查commitLog中是否有新消息,有的话写⼊到pullRequestTable
当有新消息的时候返回请求
挂起consumer的请求,即不断开连接,也不返回数据
使⽤consumer的offset,
10、RocketMQ如何做负载均衡?
通过Topic在多Broker中分布式存储实现。
producer端
发送端指定message queue发送消息到相应的broker,来达到写⼊时的负载均衡
提升写⼊吞吐量,当多个producer同时向⼀个broker写⼊数据的时候,性能会下降
消息分布在多broker中,为负载消费做准备
默认策略是随机选择:
producer维护⼀个index
每次取节点会⾃增
index向所有broker个数取余
⾃带容错策略
其他实现:
SelectMessageQueueByHash
hash的是传⼊的args
SelectMessageQueueByRandom
SelectMessageQueueByMachineRoom 没有实现
也可以⾃定义实现MessageQueueSelector接⼝中的select⽅法
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
consumer端
采⽤的是平均分配算法来进⾏负载均衡。
其他负载均衡算法
平均分配策略(默认)(AllocateMessageQueueAveragely) 环形分配策略(AllocateMessageQueueAveragelyByCircle) ⼿动配置分配策略(AllocateMessageQueueByConfig) 机房分配策略(AllocateMessageQueueByMachineRoom) ⼀致性哈希分配策略(AllocateMessageQueueConsistentHash) 靠近机房策略(AllocateMachineRoomNearby)
追问:当消费负载均衡consumer和queue不对等的时候会发⽣什么?
Consumer和queue会优先平均分配,如果Consumer少于queue的个数,则会存在部分Consumer消费多
个queue的情况,如果Consumer等于queue的个数,那就是⼀个Consumer消费⼀个queue,如果Consumer个数⼤于queue的个数,那么会有部分Consumer空余出来,⽩⽩的浪费了。
11、消息重复消费
影响消息正常发送和消费的重要原因是⽹络的不确定性。
引起重复消费的原因
ACK
正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息已正常消费,从queue中剔除
当ack因为⽹络原因⽆法发送到broker,broker会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer 消费模式
在CLUSTERING模式下,消息在broker中会保证相同group的consumer消费⼀次,但是针对不同group的consumer会推送多次
解决⽅案
数据库表
处理消息前,使⽤消息主键在表中带有约束的字段中insert
Map
单机时可以使⽤map ConcurrentHashMap -> putIfAbsent guava cache
Redis
分布式锁搞起来。
12、如何让RocketMQ保证消息的顺序消费
你们线上业务⽤消息中间件的时候,是否需要保证消息的顺序性?
如果不需要保证消息顺序,为什么不需要?假如我有⼀个场景要保证消息的顺序,你们应该如何保证?
⾸先多个queue只能保证单个queue⾥的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是⽆法绝对保证消息的有序性的。所以总结如下:
同⼀topic,同⼀个QUEUE,发消息的时候⼀个线程去发送消息,消费的时候⼀个线程去消费⼀个queue⾥的消息。
追问:怎么保证消息发到同⼀个queue?
Rocket MQ给我们提供了MessageQueueSelector接⼝,可以⾃⼰重写⾥⾯的接⼝,实现⾃⼰的算法,举个最简单的例⼦:判断i % 2 == 0,那就都放到queue1⾥,否则放到queue2⾥。
for (int i = 0; i < 5; i++) {
Message message = new Message("orderTopic", ("hello!" + i).getBytes());
producer.send(
// 要发的那条消息
message,
// queue 选择器,向 topic中的哪个queue去写消息
new MessageQueueSelector() {
// ⼿动选择⼀个queue
@Override
public MessageQueue select(
// 当前topic ⾥⾯包含的所有queue
List<MessageQueue> mqs,
// 具体要发的那条消息
Message msg,
// 对应到 send()⾥的 args,也就是2000前⾯的那个0
Object arg) {
// 向固定的⼀个queue⾥写消息,⽐如这⾥就是向第⼀个queue⾥写消息
if (Integer.String()) % 2 == 0) {
(0);
} else {
(1);
}
}
},
// ⾃定义参数:0
// 2000代表2000毫秒超时时间
i, 2000);
}
13、RocketMQ如何保证消息不丢失
⾸先在如下三个部分都可能会出现丢失消息的情况:
Producer端
Broker端
Consumer端
13.1、Producer端如何保证消息不丢失
采取send()同步发消息,发送结果是同步感知的。
发送失败后可以重试,设置重试次数。默认3次。
producer.setRetryTimesWhenSendFailed(10);
集部署,⽐如发送失败了的原因可能是当前Broker宕机了,重试的时候会发送到其他Broker上。
13.2、Broker端如何保证消息不丢失
修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
flushDiskType = SYNC_FLUSH
集部署,主从模式,⾼可⽤。
13.3、Consumer端如何保证消息不丢失
完全消费正常后在进⾏⼿动ack确认。
14、rocketMQ的消息堆积如何处理
下游消费系统如果宕机了,导致⼏百万条消息在消息中间件⾥积压,此时怎么处理?
你们线上是否遇到过消息积压的⽣产故障?如果没遇到过,你考虑⼀下如何应对?
⾸先要到是什么原因导致的消息堆积,是Producer太多了,Consumer太少了导致的还是说其他情况,总之先定位问题。
然后看下消息消费速度是否正常,正常的话,可以通过上线更多consumer临时解决消息堆积问题
追问:如果Consumer和Queue不对等,上线了多台也在短时间内⽆法消费完堆积的消息怎么办?
准备⼀个临时的topic
queue的数量是堆积的⼏倍
queue分布到多Broker中
上线⼀台Consumer做消息的搬运⼯,把原来Topic中的消息挪到新的Topic⾥,不做业务逻辑处理,只是挪过去
上线N台Consumer同时消费临时Topic中的数据
改bug
恢复原来的Consumer,继续消费之前的Topic
追问:堆积时间过长消息超时了?
RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。
追问:堆积的消息会不会进死信队列?
不会,消息在消费失败后会进⼊重试队列(%RETRY%+ConsumerGroup),18次(默认18次,⽹上所
有⽂章都说是16次,⽆⼀例外。但是我没搞懂为啥是16次,这不是18个时间吗?)才会进⼊死信队列(%DLQ%+ConsumerGroup)。
源码如下:
public class MessageStoreConfig {
// 每隔如下时间会进⾏重试,到最后⼀次时间重试失败的话就进⼊死信队列了。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}
15、RocketMQ在分布式事务⽀持这块机制的底层原理?
你们⽤的是RocketMQ?RocketMQ很⼤的⼀个特点是对分布式事务的⽀持,你说说他在分布式事务⽀持这块机制的底层原理?
分布式系统中的事务可以使⽤TCC(Try、Confirm、Cancel)、2pc来解决分布式系统中的消息原⼦性
RocketMQ 4.3+提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终⼀致
RocketMQ实现⽅式:
**Half Message:**预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
**检查事务状态:**Broker会开启⼀个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执⾏任务会向消息发送者确认事务执⾏状态(提交、回滚、未知),如果是未知,Broker会定时去回调在重新检查。
**超时:**如果超过回查次数,默认回滚消息。
也就是他并未真正进⼊Topic的queue,⽽是⽤了临时queue来放所谓的half message,等提交事务后才会真正的将half message转移到topic 下的queue。
16、如果让你来动⼿实现⼀个分布式消息中间件,整体架构你会如何设计实现?
我个⼈觉得从以下⼏个点回答吧:
需要考虑能快速扩容、天然⽀持集
持久化的姿势
⾼可⽤性
数据0丢失的考虑
服务端部署简单、client端使⽤简单
17、看过RocketMQ 的源码没有。如果看过,说说你对RocketMQ 源码的理解?
要真让我说,我会吐槽蛮烂的,⾸先没任何注释,可能是之前阿⾥巴巴写了中⽂注释,捐赠给apache后,apache觉得中⽂注释不能留,⾃⼰⼜懒得写英⽂注释,就都给删了。⾥⾯⽐较典型的设计模式有单例、⼯⼚、策略、门⾯模式。单例⼯⼚⽆处不在,策略印象深刻⽐如发消
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论