rocketmq消息指定_RocketMQ在⾯试中那些常见问题及答案
+汇总
本⽂同步Java知⾳社区,专注于Java
0、汇总
RocketMQ⼊门到⼊⼟(⼀)新⼿也能看懂的原理和实战!
RocketMQ⼊门到⼊⼟(⼆)事务消息&顺序消息
从⼊门到⼊⼟(三)RocketMQ 怎么保证的消息不丢失?
RocketMQ⼊门到⼊⼟(四)producer⽣产消息源码剖析
RocketMQ⼊门到⼊⼟(五)消息持久化存储源码解析
RocketMQ⼊门到⼊⼟(六)发消息的时候选择queue的算法有哪些?
RocketMQ⼊门到⼊⼟(七 )为什么同⼀个消费组设置不同tag会出现奇怪现象
从⼊门到⼊⼟(⼋)RocketMQ的Consumer是如何做的负载均衡的
从⼊门到⼊⼟(九)⼿摸⼿教你搭建RocketMQ双主双从同步集,不信学不会!
从⼊门到⼊⼟(⼗)RocketMQ集流程以及核⼼概念
1、说说你们公司线上⽣产环境⽤的是什么消息中间件?
见【2、多个mq如何选型?】
2、多个mq如何选型?
MQ描述RabbitMQerlang开发,对消息堆积的⽀持并不好,当⼤量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。每秒钟可以处理⼏万到⼗⼏万条消息。RocketMQjava开发,⾯向互联⽹集化功能丰富,对在线业务的响应时延做了很多的优化,⼤多数情况下可以做到毫秒级的响应,每秒钟⼤概能处理⼏⼗万条消息。KafkaScala开发,⾯向⽇志功能丰富,性能最⾼。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反⽽会⽐较⾼。所以,Kafka 不太适合在线业务场景。ActiveMQjava开发,简单,稳定,性能不如前⾯三个。⼩型系统⽤也ok,但是不推荐。推荐⽤互联⽹主流的。
3、为什么要使⽤MQ?
因为项⽬⽐较⼤,做了分布式系统,所有远程服务调⽤请求都是同步执⾏
同步执⾏经常出问题,所以引⼊了mq
作⽤描述解耦系统耦合度降低,没有强依赖关系异步不需要同步执⾏的远程调⽤可以有效提⾼响应时间削峰请求达到峰值后,后端service 还可以保持固定消费速率消费,不会被压垮
4、RocketMQ由哪些⾓⾊组成,每个⾓⾊作⽤和特点是什么?
⾓⾊作⽤Nameserver⽆状态,动态列表;这也是和zookeeper的重要区别之⼀。zookeeper是有状态的。Producer消息⽣产者,负责发消息到Broker。Broker就是MQ本⾝,负责收发消息、持久化消息等。Consumer消息消费者,负责从Broker上拉取消息进⾏消费,消费完进⾏ack。
5、RocketMQ中的Topic和JMS的queue有什么区别?
queue就是来源于数据结构的FIFO队列。⽽Topic是个抽象的概念,每个Topic底层对应N个queue,⽽数据也真实存在queue上的。
6、RocketMQ Broker中的消息被消费后会⽴即删除吗?
不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。
追问:那么消息会堆积吗?什么时候清理过期消息?
4.6版本默认48⼩时后会删除不再使⽤的CommitLog⽂件
检查这个⽂件最后访问时间
判断是否⼤于过期时间
指定时间删除,默认凌晨4点
源码如下:
/**
* {@link ketmq.store.DefaultMessageStore.CleanCommitLogService#isTimeToDelete()}
*/
private boolean isTimeToDelete() {
// when = "04";
String when = MessageStoreConfig().getDeleteWhen();
// 是04点,就返回true
if (UtilAll.isItTimeToDo(when)) {
return true;
}
// 不是04点,返回false
return false;
}
/**
* {@link ketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles()}
*/
private void deleteExpiredFiles() {
// isTimeToDelete()这个⽅法是判断是不是凌晨四点,是的话就执⾏删除逻辑。
if (isTimeToDelete()) {
// 默认是72,但是broker配置⽂件默认改成了48,所以新版本都是48。
long fileReservedTime = 48 * 60 * 60 * 1000;
deleteCount = DefaultMessageStore.thismitLog.deleteExpiredFile(72 * 60 * 60 * 1000, xx, xx, xx);
}
}
/**
* {@link ketmq.store.CommitLog#deleteExpiredFile()}
*/
public int deleteExpiredFile(xxx) {
// 这个⽅法的主逻辑就是遍历查最后更改时间+过期时间,⼩于当前系统时间的话就删了(也就是⼩于48⼩时)。
return this.mappedFileQueue.deleteExpiredFileByTime(72 * 60 * 60 * 1000, xx, xx, xx);
}
7、RocketMQ消费模式有⼏种?
消费模型由Consumer决定,消费维度为Topic。
集消费
1.⼀条消息只会被同Group中的⼀个Consumer消费
2.多个Group同时消费⼀个Topic时,每个Group都会有⼀个Consumer消费到数据
⼴播消费
消息将对⼀ 个Consumer Group 下的各个 Consumer 实例都消费⼀遍。即即使这些 Consumer 属于同⼀个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费⼀次。
8、消费消息是push还是pull?
RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采⽤的是长轮询机制
长轮询机制,即拉取⽅式 broker端属性 longPollingEnable 标记是否开启长轮询。默认开启
源码如下:
// {@link ketmq.sumer.DefaultMQPushConsumerImpl#pullMessage()}
// 看到没,这是⼀只披着⽺⽪的狼,名字叫PushConsumerImpl,实际⼲的确是pull的活。
// 拉取消息,结果放到pullCallback⾥
this.pullAPIWrapper.pullKernelImpl(pullCallback);
追问:为什么要主动拉取消息⽽不使⽤事件监听⽅式?
事件驱动⽅式是建⽴好长连接,由事件(发送数据)的⽅式来实时推送。
如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时⼜不能被其他consumer消费的情况。⽽pull的⽅式可以根据当前⾃⾝情况来pull,不会造成过多的压⼒⽽造成瓶颈。所以采取了pull的⽅式。
9、broker如何处理拉取请求的?
Consumer⾸次请求Broker
Broker中是否有符合条件的消息
有 ->
响应Consumer
等待下次Consumer的请求
没有
DefaultMessageStore#ReputMessageService#run⽅法
PullRequestHoldService 来Hold连接,每个5s执⾏⼀次检查pullRequestTable有没有消息,有的话⽴即
推送
每隔1ms检查commitLog中是否有新消息,有的话写⼊到pullRequestTable
当有新消息的时候返回请求
挂起consumer的请求,即不断开连接,也不返回数据
使⽤consumer的offset,
10、RocketMQ如何做负载均衡?
通过Topic在多Broker中分布式存储实现。
producer端
发送端指定message queue发送消息到相应的broker,来达到写⼊时的负载均衡
提升写⼊吞吐量,当多个producer同时向⼀个broker写⼊数据的时候,性能会下降
消息分布在多broker中,为负载消费做准备
默认策略是随机选择:
producer维护⼀个index
每次取节点会⾃增
index向所有broker个数取余
⾃带容错策略
其他实现:
SelectMessageQueueByHash
hash的是传⼊的args
SelectMessageQueueByRandom
SelectMessageQueueByMachineRoom 没有实现
MessageQueueSelector接⼝中的select⽅法
也可以⾃定义实现MessageQueueSelector
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
consumer端
采⽤的是平均分配算法来进⾏负载均衡。
其他负载均衡算法
平均分配策略(默认)(AllocateMessageQueueAveragely) 环形分配策略(AllocateMessageQueueAveragelyByCircle) ⼿动配置分配策略(AllocateMessageQueueByConfig) 机房分配策略(AllocateMessageQueueByMachineRoom) ⼀致性哈希分配策略(AllocateMessageQueueConsistentHash) 靠近机房策略(AllocateMachineRoomNearby)
追问:当消费负载均衡consumer和queue不对等的时候会发⽣什么?
Consumer和queue会优先平均分配,如果Consumer少于queue的个数,则会存在部分Consumer消费多个queue的情况,如果Consumer等于queue的个数,那就是⼀个Consumer消费⼀个queue,如果Consumer个数⼤于queue的个数,那么会有部分Consumer空余出来,⽩⽩的浪费了。
11、消息重复消费
重要原因是⽹络的不确定性。
影响消息正常发送和消费的重要原因是⽹络的不确定性。
引起重复消费的原因
ACK
正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息已正常消费,从queue中剔除
当ack因为⽹络原因⽆法发送到broker,broker会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer 消费模式
在CLUSTERING模式下,消息在broker中会保证相同group的consumer消费⼀次,但是针对不同group的consumer会推送多次
解决⽅案
数据库表
处理消息前,使⽤消息主键在表中带有约束的字段中insert
Map
单机时可以使⽤map ConcurrentHashMap -> putIfAbsent guava cache
Redis
分布式锁搞起来。
12、如何让RocketMQ保证消息的顺序消费
你们线上业务⽤消息中间件的时候,是否需要保证消息的顺序性?
如果不需要保证消息顺序,为什么不需要?假如我有⼀个场景要保证消息的顺序,你们应该如何保证?
⾸先多个queue只能保证单个queue⾥的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是⽆法绝对保证消息的有序性的。所以总结如下:
同⼀topic,同⼀个QUEUE,发消息的时候⼀个线程去发送消息,消费的时候 ⼀个线程去消费⼀个queue⾥的消息。
追问:怎么保证消息发到同⼀个queue?
Rocket MQ给我们提供了MessageQueueSelector接⼝,可以⾃⼰重写⾥⾯的接⼝,实现⾃⼰的算法,举个最简单的例⼦:判断i % 2 == 0,那就都放到queue1⾥,否则放到queue2⾥。
for (int i = 0; i < 5; i++) {
Message message = new Message("orderTopic", ("hello!" + i).getBytes());
producer.send(
// 要发的那条消息
message,
// queue 选择器,向 topic中的哪个queue去写消息
new MessageQueueSelector() {
// ⼿动选择⼀个queue
@Override
public MessageQueue select(
// 当前topic ⾥⾯包含的所有queue
List<MessageQueue> mqs,
// 具体要发的那条消息
Message msg,
// 对应到 send()⾥的 args,也就是2000前⾯的那个0
Object arg) {
// 向固定的⼀个queue⾥写消息,⽐如这⾥就是向第⼀个queue⾥写消息
if (Integer.String()) % 2 == 0) {
(0);
} else {
(1);
}
}
},
// ⾃定义参数:0
// 2000代表2000毫秒超时时间
i, 2000);
}
13、RocketMQ如何保证消息不丢失
⾸先在如下三个部分都可能会出现丢失消息的情况:
Producer端
Broker端
Consumer端
13.1、Producer端如何保证消息不丢失
采取send()同步发消息,发送结果是同步感知的。
发送失败后可以重试,设置重试次数。默认3次。
producer.setRetryTimesWhenSendFailed(10);
activemq和rocketmq的区别集部署,⽐如发送失败了的原因可能是当前Broker宕机了,重试的时候会发送到其他Broker上。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论