admin 管理员组文章数量: 887021
RocketMQ
本文基于RocketMQ 4.7.1版本
本文将从整体上分析broker如何启动。
文章目录
- 一、broker启动
- 1、createBrokerController()
- 2、start()
一、broker启动
broker的启动类是BrokerStartup,其main方法如下:
public static void main(String[] args) {start(createBrokerController(args));}
createBrokerController()用于创建BrokerController控制器,start()用于启动BrokerController。
BrokerController是broker启动的核心类,它会创建一些处理器和管理器,并且持有四大配置对象。它创建的管理器有生产者管理器、消费者管理器、broker状态管理器、主题配置管理器等,这些管理器在以后使用到的时候在介绍。
下面分别来看这两个方法的逻辑。
1、createBrokerController()
下图是createBrokerController方法的处理流程:
下面是对上图内容的一些解析。
broker对外提供服务的端口号不是可配置的,是代码中写死的,必须是10911。
因为broker的配置非常多,所以rocketmq将这些配置做了一下归类,分散到四个不同的配置类中:BrokerConfig属于broker的基本配置,NettyServerConfig和NettyClientConfig用于设置网络相关的配置,比如对外发布服务的端口号就是在NettyServerConfig中设置,MessageStoreConfig用于设置存储相关的配置,比如消息存储路径、日志路径。
如果当前的broker是master,那么其broker id必须是0。
rocketmq在目录/store/config目录下维护了几个非常重要的json文件:
- 主题配置文件是在个人目录下的/store/config目录中,文件名为topics.json,该文件中保存主题名、读队列个数、写队列个数、是否只读。文件解析完后,将数据存储到主题配置管理器TopicConfigManager的topicConfigTable属性中。
- 消费者位移文件(consumerOffset.json)记录主题在每个消费组下每个读队列的消费位移,比如:
上图显示了消费组consumer-A消费了主题topicTest11的四个读队列的消息,value里面显示了四个读队列的位移。该文件由ConsumerOffsetManager处理,解析后的数据保存在该对象的offsetTable属性中。 - 订阅组文件(subscriptionGroup.json),也叫作消费组文件,里面保存了消费组的配置信息,比如消费组名、消费组是否可以消费消息、重试队列个数、最大重试次数等。该文件由SubscriptionGroupManager处理,解析后数据保存在该对象的subscriptionGroupTable属性中。
- 消费者过滤器配置文件(ConsumerFilterManager),记录了每个主题下消费组配置的消息过滤条件,解析后数据保存在ConsumerFilterManager的filterDataByTopic属性中。
消息存储器插件可以在配置文件里面由参数messageStorePlugIn设置,值使用全限定类名,多个插件之间使用“,”分隔,插件类需要继承抽闲类AbstractPluginMessageStore,该类实现了MessageStore接口,DefaultMessageStore也实现了MessageStore接口,当存储消息时,rocketmq将多个插件按照配置依次调用,最后调用DefaultMessageStore将消息存储到文件,因此插件提供了一种在消息存储前后修改数据的功能。
上图中的数据恢复并不是系统宕机后的恢复,这里的恢复是指读取文件来恢复内存数据,使内存数据尽可能恢复到停机前的现场,比如在停机前内存会记录消息文件的下一个写入位置,重启后这个数据就会丢失,那么需要重新读取消息文件,找到消息最后的写入位置,在读取期间,还会处理消息索引,处理读队列的数据。
2、start()
本方法主要是调用BrokerController.start()方法来启动控制器,BrokerController.start()又会启动一系列的组件,下面来看一下BrokerController.start()的代码:
public void start() throws Exception {if (this.messageStore != null) {//启动DefaultMessageStore,该类会对/store/lock文件加锁,//确保在broker运行期间只有一个broker实例操作/store目录this.messageStore.start();}//启动Netty监听10911端口,可以对外提供服务if (this.remotingServer != null) {this.remotingServer.start();}if (this.fastRemotingServer != null) {//监听10909端口,作用未知this.fastRemotingServer.start();}if (this.fileWatchService != null) {//fileWatchService与TLS有关,本文暂不对TLS解析this.fileWatchService.start();}if (this.brokerOuterAPI != null) {//启动客户端Netty,broke使用该对象对外发送数据,比如向nameserver注册主题信息this.brokerOuterAPI.start();}if (this.pullRequestHoldService != null) {//作用未知this.pullRequestHoldService.start();}if (this.clientHousekeepingService != null) {//作用未知this.clientHousekeepingService.start();}if (this.filterServerManager != null) {//作用未知this.filterServerManager.start();}if (!messageStoreConfig.isEnableDLegerCommitLog()) {//处理HAstartProcessorByHa(messageStoreConfig.getBrokerRole());//启动定时任务,定时与slave机器同步数据,同步的内容包括配置,消费位移等handleSlaveSynchronize(messageStoreConfig.getBrokerRole());//向所有的nameserver发送本机所有的主题数据,//包括主题名、读队列个数、写队列个数、队列权限、是否有序等this.registerBrokerAll(true, false, true);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {//定时任务,每过一段时间向nameserver注册一次主题信息@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();//空实现}//启动定时任务,用于对收到的请求做流控,//如果发现broker收到的请求在指定的时间内无法处理完成,默认是5s,//那么会向请求方返回错误信息,告知broker正忙,请稍后重试//涉及到的请求有:生产者发送的消息、消费者拉去消息的请求、心跳请求、事务结束请求if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}
BrokerController.start()方法主要是启动一些组件,其主要作用是:
- 监听10911端口,接收生产者和消费者请求;
- 向nameserver发送本机所有的主题数据,并启动定时任务。
本文标签: RocketMQ
版权声明:本文标题:RocketMQ 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1686930988h48533.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论