RocketMQBroker启动流程(RocketMQ系列五)
properties在哪打开
Broker启动流程
前⾔
今天我们讲Broker的启动流程,讲完这篇以后,⼤家对Broker启动做的事情有⼀个清晰的认识,这个源码是我⾃⼰⼀点⼀点啃得,有点硬,后⾯再整理代码注释的时候,参考了⼀些
博客,我尽可能的说的清楚⼀些,我表达和总结能⼒⽐较弱,有些地⽅说的可能不清楚,⼤家可以给我留⾔,我看到了都会回复的。
源码Idea启动Broker
到Broker的启动类,BrokerStartup,然后编辑main⽅法,修改启动参数,如下图所⽰
环境⽬录会在下图所⽰的包中
具体配置:
ROCKETMQ_HOME=E:\self\rocketmq\distribution
-
C E:\self\rocketmq\distribution/f,
启动之前先修改f中nameserver的地址,idea跑nameserver⽅法⽐跑Broker还要简单,只需要添加RocketMQ的环境地址。
然后启动main⽅法,如果打印下⾯这⾏⽇志,说明启动成功了
Connected to the target VM, address:'127.0.0.1:53208', transport:'socket'
源码讲解
Broker的启动类BrokerStartup的main⽅法就是源码的⼊⼝,看起来⾮常简单,第⼀步先创建BrokerContoller,第⼆步启动这个BrokerContoller,所以我们⼀步⼀步来讲。
public static void main(String[] args){
start(createBrokerController(args));
}
创建BrokerController
BrokerController创建过程主要分为了解析命令⾏参数、初始化Broker配置、初始化BrokerController,注册JVM退出的钩⼦,我们⼀个个来看
1.解析启动命令⾏填充参数
Options options = ServerUtil.buildCommandlineOptions(new Options());
//解析命令⾏参数 -n localhost:9876  -c  -p  -h  -m
commandLine = ServerUtil.parseCmdLine("mqbroker", args,buildCommandlineOptions(options),
new PosixParser());
-n 指定nameserver地址,集以;分割 -c 指定broker的配置⽂件 -p 打印出所有的配置项 -h 打印出帮助命令 -m 打印出重要的配置项 其实在f中指定了nameserver的地址 就不需要使⽤-n命令来指定
这句代码会把我们上⾯指定的f地址给解析出来,然后去加载这个⽂件,解析就是为了下⾯这句代码做准备
//实例化broker配置
final BrokerConfig brokerConfig =new BrokerConfig();
//实例化消息持久化配置
final MessageStoreConfig messageStoreConfig =new MessageStoreConfig();
//实例化服务端配置
final NettyServerConfig nettyServerConfig =new NettyServerConfig();
//实例化客户端配置
final NettyClientConfig nettyClientConfig =new NettyClientConfig();
//..... 中间省略了很多系解析配置的代码
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
MixAll.properties2Object(ServerUtilmandLine2Properties(commandLine), brokerConfig);
BrokerConfig⼀些重要的配置信息如下:
namesrvAddr、brokerIP1、brokerName、brokerClusterName、brokerId、transactionCheckMax(事务消息最⼤回查次数)、transactionCheckInterval(事务消息多久没收到确认开始回调)
nettyServerConfig和nettyClientConfig配置信息:
messageStoreConfig⼀些重要的配置信息:
storePathCommitLog(commitLogd地址)、mappedFileSizeCommitLog(mapperFile⼤⼩)、deleteWhen(过期消息删除时间默认凌晨四点)、fileReservedTime(消息保存时间默认48h)、brokerRole、flushDiskType
、messageDelayLevel(延迟消息延迟等级对应时间)、syncFlushTimeout(同步刷盘超时)
2.实例化BrokerController
设置BrokerController的nettyServer
设置BrokerController的nettyClient
设置BrokerController的messageStroreConfig
设置consumer的offset管理器
设置topic配置管理器TopicConfigManager(会⾃动创建系统的那些topic)
设置消息拉取请求的处理器PullMessageProcessor
设置consumer管理器consumerManager
设置producer管理器ProducerManager
设置请求的BrokerOuterAPI对外请求的api接⼝
创建⼀系列的队列、为了创建线程池准备的 eg:⼼跳、客户端管理、消息拉取等
3.BrokerController初始化
加载持久化配置⽂件
通过加载store/config/topic.json⽂件加载当前broker所有的topic(系统和⽤户创建),然后包装成
了TopicConfigSerializeWrapper,最后存到TopicConfigManager的topicConfigTable中
加载store/config/consumerOffset.json⽂件加载所有consumer的offset,最终解析成了ConsumerOffsetManager
加载store/config/subscriptionGroup.json,加载当前broker所有的订阅者,最终解析成了SubscriptionGroupManager
加载store/config/consumerFilter.json,加载consumer的过滤条件,最终解析成了ConsumerFilterManager
实例化MessageStore
创建消息到达 --> NotifyMessageArrivingListener
创建Commitlog(Commitlog/DLedgerCommitLog)
系统Commitlog如果是同步:创建GroupCommitService 如果异步:创建FlushRealTimeService
设置消息追加的回调 --> DefaultAppendMessageCallback
选择添加消息时锁的⽅式:4.0之前默认使⽤ReentrantLock,现在默认都是使⽤⾃旋锁
创建刷consumerqueue服务(1s⼀次)
创建定时删除过期的mappedFile服务(默认凌晨4点删除72⼩时未修改过的⽂件)
创建删除consumerQueue和index的mappedFile (⼩于commitlog最⼩的offset的⽂件)
创建存储层内部统计服务
创建commitlog主从同步的服务
创建延迟消息监控类,到期⾃动执⾏
从MessageStore中获取昨⽇和今⽇消息拉去的数量和发送数量
加载commitlog
延迟消息加载延迟消息的offset以及延迟等级对应的延迟时间到内存
commitlog的mappedFile⽂件到内存中 --> MappedFileQueue.mappedFiles<CopyOnWriteArrayList>
加载consumerQueue并放到内存中 --> sumeQueueTable<ConcurrentMap<String/* topic /, ConcurrentMap<Integer/ queueId */, ConsumeQueue>>>
加载index⽂件到内存
根据brokerConfig配置实例化⼀些线程池
创建⼀些定时器
⼀天执⾏⼀次记录broker⼀天的消息拉取量
5s执⾏⼀次更新consumer的offset值
1s打印⼀些关于Queue Size的⽇志 包含 Pull、 Queue、 Transaction
nameServerAddress不为空修改内存中的nameServerAddress,如果为空,2分钟向服务器拉取nameServerAddress 如果没开Dleger(主从⾃动切换功能) 如果Broker是master打印出master和slave之间的offset差
初始化事务消息的处理Service 和消息状态回查的Service
初始化访问控制列表(acl权限控制)
初始化RpcHooks
public boolean initialize()throws CloneNotSupportedException {
//加载topic  topic.json
boolean result =picConfigManager.load();
//加载consumer的offset
//加载consumer的offset
result = result &&sumerOffsetManager.load();
//加载当前broker所有的订阅者
result = result &&this.subscriptionGroupManager.load();
//加载consumer的过滤
result = result &&sumerFilterManager.load();
if(result){
try{
new ssageStoreConfig,this.ssageArrivingListener,
this.brokerConfig);
//是否打开了主从⾃动切换(delger)
if(messageStoreConfig.isEnableDLegerCommitLog()){
DLedgerRoleChangeHandler roleChangeHandler =new DLedgerRoleChangeHandler(this,(DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChang eHandler(roleChangeHandler);
}
//加载昨天和今天消息获取和发送的总数
this.brokerStats =new BrokerStats((ssageStore);
//load plugin 加载MessageStore上下⽂
MessageStorePluginContext context =new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, bro kerConfig);
}catch(IOException e){
result =false;
<("Failed to initialize", e);
}
}
//加载commitlog内容  ConsumeQueue 刷盘时间点
/**
* lod commitlog  --> MappedFileQueue CopyOnWriteArrayList<MappedFile> mappedFiles;
*                                          topic                  queueId
* load consumer queue  -->  ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> consumeQueueTable;
* storeCheckpoint  记录commitlog  consumer queue  index 最近的输盘时间点,其中只⽤该⽂件的前 24个字节,结构图: segmentfault/img/b VbpQoU?w=486&h=55
* load index --> IndexService.ArrayList<IndexFile> indexFileList
*/
result = result &&ssageStore.load();
/**
* 实例化⼀系列线程池
*/
if(result){
NettyServerConfig fastConfig =(NettyServerConfig)thistyServerConfig.clone();
fastConfig.ListenPort()-2);
this.fastRemotingServer =new NettyRemotingServer(fastConfig,this.clientHousekeepingService);
this.sendMessageExecutor =new BrokerFixedThreadPoolExecutor(
SendMessageThreadPoolNums(),
SendMessageThreadPoolNums(),
1000*60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
this.pullMessageExecutor =new BrokerFixedThreadPoolExecutor(
PullMessageThreadPoolNums(),
PullMessageThreadPoolNums(),
1000*60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
ProcessReplyMessageThreadPoolNums(),

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。