1, Data cached by Broker
The Broker mainly caches routing information: the producer table, consumer table, consumer group table and topic table. These are managed by ProducerManager, ConsumerManager, SubscriptionGroupManager and TopicConfigManager respectively.
ProducerManager //producer list HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable
- groupChannelTable: the live Producer connections in each Producer group, and the time of the last heartbeat received from each connected Producer
ConsumerManager //consumer list ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable
- consumerTable: which Consumer connections exist in each Consumer group, which topics are subscribed to, and what filtering criteria (tags) are used for each Topic subscribed to.
SubscriptionGroupManager //ConsumerGroup table ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable
- subscriptionGroupTable: the consumption behavior of each ConsumerGroup, for example: the maximum number of retries after a consumption failure; the number of retry queues; which slave Broker to switch to when consumption from the master Broker is slow
TopicConfigManager //topic list ConcurrentMap<String, TopicConfig> topicConfigTable
- topicConfigTable: the configuration of each Topic shard located on the current Broker, such as the number of read / write queues it contains and whether it has read / write permission
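To make the shape of these tables more concrete, here is a minimal sketch of a producer table that records the last heartbeat per connection and drops connections that have gone silent. It is not the RocketMQ implementation: the class name ProducerTableSketch, the use of a plain String channel id in place of Netty's Channel, and the 120-second expiry used in main are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for a producer table; not the real ProducerManager. */
class ProducerTableSketch {
    // group name -> (channel id -> timestamp of the last heartbeat, in ms)
    private final Map<String, Map<String, Long>> groupChannelTable = new ConcurrentHashMap<>();
    private final long expireMillis;

    ProducerTableSketch(long expireMillis) {
        this.expireMillis = expireMillis;
    }

    /** Registers the channel if it is new, otherwise refreshes its heartbeat timestamp. */
    void onHeartbeat(String group, String channelId, long nowMillis) {
        groupChannelTable
            .computeIfAbsent(group, g -> new ConcurrentHashMap<>())
            .put(channelId, nowMillis);
    }

    /** Removes channels whose last heartbeat is older than expireMillis; returns the count removed. */
    int scanNotActiveChannels(long nowMillis) {
        int removed = 0;
        for (Map<String, Long> channels : groupChannelTable.values()) {
            for (Map.Entry<String, Long> e : channels.entrySet()) {
                boolean expired = nowMillis - e.getValue() > expireMillis;
                if (expired && channels.remove(e.getKey(), e.getValue())) {
                    removed++;
                }
            }
        }
        return removed;
    }

    /** Number of connections currently tracked for the group. */
    int liveChannels(String group) {
        Map<String, Long> channels = groupChannelTable.get(group);
        return channels == null ? 0 : channels.size();
    }

    public static void main(String[] args) {
        ProducerTableSketch table = new ProducerTableSketch(120_000); // assumed expiry
        table.onHeartbeat("groupA", "ch-1", 0);
        table.onHeartbeat("groupA", "ch-2", 100_000);
        int removed = table.scanNotActiveChannels(130_000);
        System.out.println(removed + " removed, " + table.liveChannels("groupA") + " live");
    }
}
```

The other three tables follow the same pattern: a concurrent map keyed by group or topic name, whose values hold the per-group or per-topic state.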
2, Broker startup design
- Create BrokerController
The BrokerController instance is created in the BrokerStartup#createBrokerController method. The method first parses the startup parameters, then creates the BrokerController, and then calls its initialize method. The logic in initialize mainly includes:
1) Load the topic configuration, consumer consumption progress, subscription relationships and consumer filter configuration, and load the message log files
2) Create the netty service, including the one that listens on the VIP port 10909
3) Initialize a series of thread pools, then associate these thread pools with the processors in the registerProcessor method, so that different kinds of requests are handled by different thread pools, that is, thread isolation
4) Start some scheduled tasks, such as recording Broker status, consumption progress persistence, etc
5) Finally, initialize permission verification and the RPC call hook services. These services are loaded through Java's SPI mechanism.
- Broker startup
    public void start() throws Exception {
        // Start the tasks related to message storage
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        // Start the broker server
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
        // Start the netty service on the VIP port
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }
        // Start the service that monitors the SSL connection files
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
        // Start the client used for outgoing calls from the Broker
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }
        // Start the service that holds pending pull requests
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }
        // Start the heartbeat detection service
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }
        // Start the message filtering service
        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }
        // If DLegerCommitLog is not enabled, start the role-dependent services
        // (transaction message check, slave synchronization)
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        }
        // Register with the NameServer once immediately
        this.registerBrokerAll(true, false, true);
        // Then re-register periodically, starting after 10 seconds
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }
        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }
    }
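The period passed to scheduleAtFixedRate in start() is easy to misread, so here is a small worked example of just that expression. The class and method names (RegisterPeriodSketch, effectivePeriodMillis) are made up for illustration; only the Math.max / Math.min expression is taken from the code above. It clamps the configured registration period to the range of 10 to 60 seconds.

```java
/** Hypothetical helper that isolates the period expression used in start(). */
class RegisterPeriodSketch {
    /** Clamps the configured period (ms) into [10000, 60000], as the expression in start() does. */
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10000, Math.min(configuredMillis, 60000));
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriodMillis(5_000));   // below the floor   -> 10000
        System.out.println(effectivePeriodMillis(30_000));  // within the range  -> 30000
        System.out.println(effectivePeriodMillis(120_000)); // above the ceiling -> 60000
    }
}
```

So whatever value getRegisterNameServerPeriod returns, the Broker re-registers with the NameServer no more often than every 10 seconds and no less often than every 60 seconds.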
- messageStore service: handles the files related to message storage, such as CommitLog, ConsumeQueue, etc
- remotingServer service: handles the requests from producer and consumer clients
- fastRemotingServer service: the default port serves many purposes, which may cause congestion, so a separate VIP port is opened for message processing. However, it has been turned off by default since version 4.5 and is kept for compatibility with earlier versions.
- fileWatchService: monitors the SSL files used for connections
- brokerOuterAPI service: the client the Broker uses for outgoing calls, for example registering with the NameServer
- pullRequestHoldService: a service that holds pull requests when no message is available yet (long polling), which is what push mode consumption is built on
- clientHousekeepingService: a service that checks client heartbeats and cleans up connections that are no longer active
- filterServerManager service: the message filtering service
- transactionalMessageCheckService service: periodically checks and processes transaction messages
- slaveSynchronize service: master-slave routing information synchronization service
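The thread isolation mentioned for the initialize step (each processor registered together with its own thread pool) can be sketched as follows. This is an illustration under stated assumptions, not RocketMQ's registerProcessor: the class ProcessorRegistrySketch, the use of Function<String, String> as a processor type, and the request codes 10 and 11 are placeholders chosen for the example.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical registry that binds each request code to a processor and its own thread pool. */
class ProcessorRegistrySketch {
    /** A processor paired with the executor that runs it. */
    private static final class Entry {
        final Function<String, String> processor;
        final ExecutorService executor;

        Entry(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new ConcurrentHashMap<>();

    /** Associates a request code with a processor and the pool that isolates its work. */
    void registerProcessor(int requestCode, Function<String, String> processor, ExecutorService executor) {
        processorTable.put(requestCode, new Entry(processor, executor));
    }

    /** Runs the request on the pool registered for its code, so a slow request type cannot starve the others. */
    Future<String> dispatch(int requestCode, String request) {
        Entry entry = processorTable.get(requestCode);
        if (entry == null) {
            throw new IllegalArgumentException("no processor for code " + requestCode);
        }
        Callable<String> task = () -> entry.processor.apply(request);
        return entry.executor.submit(task);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService sendPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "send-pool"));
        ExecutorService pullPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "pull-pool"));
        ProcessorRegistrySketch registry = new ProcessorRegistrySketch();
        // 10 and 11 are illustrative request codes for "send" and "pull"
        registry.registerProcessor(10, req -> req + " handled on " + Thread.currentThread().getName(), sendPool);
        registry.registerProcessor(11, req -> req + " handled on " + Thread.currentThread().getName(), pullPool);
        System.out.println(registry.dispatch(10, "send").get());
        System.out.println(registry.dispatch(11, "pull").get());
        sendPool.shutdown();
        pullPool.shutdown();
    }
}
```

The point of the design is that a burst of one request type only fills its own pool's queue, while the other request types keep being served by their own threads.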
- Startup of netty server
For this part, refer to the earlier article on the creation of the server (major point three, minor point four).
Reference article:
Broker startup (2) of broker part