RocketMq重置消费位点逻辑
系列
RocketMq broker 配置⽂件
RocketMq broker 启动流程
RocketMq broker CommitLog介绍
RocketMq broker consumeQueue介绍
RocketMq broker 重试和死信队列
RocketMq broker 延迟消息
RocketMq IndexService介绍
RocketMq 读写分离机制
RocketMq Client管理
RocketMq broker过期⽂件删除
开篇
这篇⽂章的主要⽬的是分析RocketMq根据时间戳重置某topic在某consumeGroup下的消费位点。
重置位点的执⾏顺序按照admin 到 broker 到 consumer的顺序依次触发,admin负责构建参数通知broker,broker负责查询
consumeQueue的具体位移,broker负责通知consumer进⾏位移重置。
根据时间戳查consumeQueue对应的位移,然后由broker通知consumer来持久化消费位移,最终会持久化到broker的消费位移。
重置位点操作本质上是在consumer端执⾏,consumer端负责持久化新的消费位移然后由定时任务通知broker更新消费位移。
consumer在整个位移重置过程中会设置ProcessQueue的状态为Dropped,从⽽阻断消息拉取任务ConsumeRequest的执⾏阻断消息拉取,其次会在consumer侧修改消费位移通过⼼跳通知broker修改consumer的消费位移,最后通过重新的rebalance过程开始重新消费消息。
重置命令
public class ResetOffsetByTimeCommand implements SubCommand {
@Override
public String commandName() {
return "resetOffsetByTime";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("g", "group", true, "set the consumer group");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("t", "topic", true, "set the topic");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("s", "timestamp", true, "set the timestamp[now|currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
final RemotingCommand response = ateResponseCommand(null);
// 获取TopicConfig相关信息
TopicConfig topicConfig = TopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
return response;
}
Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
// 遍历所有的写队列并获取每个MessageQueue的消费位移
for (int i = 0; i < WriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setBrokerName(BrokerConfig().getBrokerName());
mq.setTopic(topic);
mq.setQueueId(i);
// 查询每个messageQueue查询对应的consumeQueue的消费位移consumerOffset
long consumerOffset =
ConsumerOffsetManager().queryOffset(group, topic, i);
if (-1 == consumerOffset) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("THe consumer group <%s> not exist", group));
return response;
}
// 根据时间戳查询consumeQueue的位移
long timeStampOffset;
offset命令if (timeStamp == -1) {
// 没时间戳就获取consumeQueue的最⼤位移
timeStampOffset = MessageStore().getMaxOffsetInQueue(topic, i);
} else {
// 根据时间戳查consumeQueue的最⼤位移
timeStampOffset = MessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
}
if (timeStampOffset < 0) {
log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
timeStampOffset = 0;
}
if (isForce || timeStampOffset < consumerOffset) {
offsetTable.put(mq, timeStampOffset);
} else {
offsetTable.put(mq, consumerOffset);
}
}
// RequestCode 为 RESET_CONSUMER_CLIENT_OFFSET
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timeStamp);
RemotingCommand request =
// c++ language
ResetOffsetBodyForC body = new ResetOffsetBodyForC();
List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
body.setOffsetTable(offsetList);
request.de());
} else {
// other language
ResetOffsetBody body = new ResetOffsetBody();
body.setOffsetTable(offsetTable);
request.de());
}
// 获取consumeGroup的所有consumer信息并通知位移重置
ConsumerGroupInfo consumerGroupInfo =
ConsumerManager().getConsumerGroupInfo(group);
if (consumerGroupInfo != null && !AllChannel().isEmpty()) {
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
for (Map.Entry<Channel, ClientChannelInfo> entry : Set()) {
int version = Value().getVersion();
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论