RocketMQ learning IX - Broker analysis

1, Data cached by Broker

The Broker mainly caches routing information: the producer table, the consumer table, the consumerGroup table and the topic table. These are managed by ProducerManager, ConsumerManager, SubscriptionGroupManager and TopicConfigManager respectively.

//producer table
HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable
  • groupChannelTable: the live Producer connections in each Producer group, together with the time of the last heartbeat received from each connected Producer
//consumer table
ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable
  • consumerTable: which Consumer connections exist in each Consumer group, which topics the group subscribes to, and which filter conditions (tags) apply to each subscribed Topic
//ConsumerGroup table
ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable
  • subscriptionGroupTable: the consumption behavior of each ConsumerGroup, for example the maximum number of retries after a consumption failure, the number of retry queues, and which Slave Broker to switch to when consumption from the Master Broker is slow
//topic table
ConcurrentMap<String, TopicConfig> topicConfigTable
  • topicConfigTable: the configuration of each Topic's share on the current Broker, such as the number of read / write queues and whether the Topic is readable / writable
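
To make the producer table more concrete, here is a minimal sketch of a two-level map that registers a connection on each heartbeat and evicts connections whose heartbeat has timed out. It is not the actual ProducerManager code: the class name, the ChannelInfo type, the method signatures and the use of a String in place of Netty's Channel are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class ProducerTableSketch {
    /** Hypothetical stand-in for ClientChannelInfo: one connection plus its last heartbeat time. */
    static final class ChannelInfo {
        final String clientId;
        long lastUpdateTimestamp;

        ChannelInfo(String clientId, long now) {
            this.clientId = clientId;
            this.lastUpdateTimestamp = now;
        }
    }

    // group name -> (channel id -> channel info); a String replaces Netty's Channel here.
    private final Map<String, Map<String, ChannelInfo>> groupChannelTable = new HashMap<>();

    /** Called on every heartbeat: register the connection or refresh its timestamp. */
    synchronized void registerProducer(String group, String channelId, String clientId, long now) {
        Map<String, ChannelInfo> channels =
            groupChannelTable.computeIfAbsent(group, g -> new HashMap<>());
        ChannelInfo info = channels.get(channelId);
        if (info == null) {
            channels.put(channelId, new ChannelInfo(clientId, now));
        } else {
            info.lastUpdateTimestamp = now;
        }
    }

    /** Remove connections whose last heartbeat is older than timeoutMillis; returns how many were removed. */
    synchronized int scanNotActiveChannel(long now, long timeoutMillis) {
        int removed = 0;
        for (Map<String, ChannelInfo> channels : groupChannelTable.values()) {
            Iterator<Map.Entry<String, ChannelInfo>> it = channels.entrySet().iterator();
            while (it.hasNext()) {
                if (now - it.next().getValue().lastUpdateTimestamp > timeoutMillis) {
                    it.remove();
                    removed++;
                }
            }
        }
        return removed;
    }

    /** Number of connections currently tracked for a group. */
    synchronized int liveChannels(String group) {
        Map<String, ChannelInfo> channels = groupChannelTable.get(group);
        return channels == null ? 0 : channels.size();
    }
}
```

Timestamps are passed in as parameters so the eviction rule can be exercised without waiting for real time to pass.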

2, Broker startup design

  1. Create BrokerController
    The BrokerController is created in the BrokerStartup#createBrokerController method. The method first parses the command-line arguments and configuration, then creates the BrokerController and calls its initialize method. The logic in initialize mainly includes:
    1) Load the topic configuration, consumer offsets (consumption progress), subscription relationships and consumer filter configuration, then load the message store files
    2) Create the Netty servers, including the one that listens on the VIP port 10909 (the main listen port, 10911 by default, minus 2)
    3) Initialize a series of thread pools and associate them with the processors in the registerProcessor method, so that different kinds of requests run on different thread pools, that is, thread isolation
    4) Start some scheduled tasks, such as recording Broker statistics and persisting consumption progress
    5) Finally, initialize permission verification (ACL) and the RPC hooks. These services are loaded through Java's SPI mechanism.
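
The thread isolation in step 3) can be sketched as a table that maps each request code to a processor and the thread pool that runs it. This is only an illustration under assumptions: the class name, the Function-based processor type and the dispatch method are hypothetical and simplified, not the signatures used by RocketMQ's remoting module.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

class ProcessorTableSketch {
    /** A processor paired with the thread pool it must run on. */
    private static final class Pair {
        final Function<String, String> processor;
        final ExecutorService executor;

        Pair(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    // request code -> (processor, executor)
    private final Map<Integer, Pair> processorTable = new HashMap<>();

    /** Bind a request code to a processor and to the pool that will execute it. */
    void registerProcessor(int requestCode, Function<String, String> processor, ExecutorService executor) {
        processorTable.put(requestCode, new Pair(processor, executor));
    }

    /** Look up the pair for the request code and run the processor on its own pool. */
    Future<String> dispatch(int requestCode, String body) {
        Pair pair = processorTable.get(requestCode);
        if (pair == null) {
            throw new IllegalArgumentException("no processor for request code " + requestCode);
        }
        Callable<String> task = () -> pair.processor.apply(body);
        return pair.executor.submit(task);
    }
}
```

Because every request code carries its own executor, a backlog of slow requests of one kind fills only that pool's queue, and requests of other kinds keep being served.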
  2. Broker startup

     public void start() throws Exception {
         //Start the message store and its related tasks
         if (this.messageStore != null) {
             this.messageStore.start();
         }
         //Start the main Netty server of the Broker
         if (this.remotingServer != null) {
             this.remotingServer.start();
         }
         //Start the Netty server on the VIP port, used by message senders
         if (this.fastRemotingServer != null) {
             this.fastRemotingServer.start();
         }
         //Start the service that watches the SSL/TLS certificate files
         if (this.fileWatchService != null) {
             this.fileWatchService.start();
         }
         //Start the client used for the Broker's outbound calls
         if (this.brokerOuterAPI != null) {
             this.brokerOuterAPI.start();
         }
         //Start the service that holds pull requests (long polling)
         if (this.pullRequestHoldService != null) {
             this.pullRequestHoldService.start();
         }
         //Start the heartbeat detection service
         if (this.clientHousekeepingService != null) {
             this.clientHousekeepingService.start();
         }
         //Start the message filtering service
         if (this.filterServerManager != null) {
             this.filterServerManager.start();
         }
         //If DLedger CommitLog is not enabled, register the Broker with the NameServer right away
         if (!messageStoreConfig.isEnableDLegerCommitLog()) {
             /*Register with namesrv*/
             this.registerBrokerAll(true, false, true);
         }
         //Re-register periodically; the period is clamped to between 10 and 60 seconds
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
                 try {
                     BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                 } catch (Throwable e) {
                     log.error("registerBrokerAll Exception", e);
                 }
             }
         }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
         if (this.brokerStatsManager != null) {
             this.brokerStatsManager.start();
         }
         if (this.brokerFastFailure != null) {
             this.brokerFastFailure.start();
         }
     }
  • messageStore service: handles the files related to message storage, such as CommitLog, ConsumeQueue, etc
  • remotingServer service: handles the requests of the clients (producers & consumers)
  • fastRemotingServer service: the default port serves many kinds of requests and can become congested, so a separate VIP port is opened for message processing. However, it has been turned off by default since version 4.5 and is kept for compatibility with earlier versions.
  • fileWatchService: watches the SSL certificate files used for service connections
  • brokerOuterAPI service: the client the Broker uses for its own outbound calls, such as registering with the NameServer
  • pullRequestHoldService: holds pull requests that cannot be answered yet (long polling), which is what push mode consumption is built on
  • clientHousekeepingService: periodically scans client connections and removes those whose heartbeat has timed out
  • filterServerManager service: the message filtering service
  • transactionalMessageCheckService service: periodically checks and processes transaction messages
  • slaveSynchronize service: synchronizes routing information from master to slave
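
As a worked example of the periodic registration in start(), the period expression clamps the configured value to the range of 10 to 60 seconds. The sketch below isolates that expression. The class and method names are hypothetical, and the Runnable stands in for the call to registerBrokerAll.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class RegisterPeriodSketch {
    /** Same expression as in start(): clamp the configured period into [10000, 60000] ms. */
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10000, Math.min(configuredMillis, 60000));
    }

    /** Schedule the registration task with a 10 second initial delay and the clamped period. */
    static ScheduledFuture<?> scheduleRegistration(ScheduledExecutorService scheduler,
                                                   Runnable registerTask,
                                                   long configuredMillis) {
        return scheduler.scheduleAtFixedRate(
            registerTask, 1000 * 10, effectivePeriodMillis(configuredMillis), TimeUnit.MILLISECONDS);
    }
}
```

With this clamp, a configured period of 1000 ms becomes 10000 ms, 30000 ms stays unchanged, and 120000 ms becomes 60000 ms, so a misconfiguration can neither flood the NameServer nor leave it without heartbeats for long.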
  3. Startup of netty server
    For this part, refer to "Creation of server" (major point 3, minor point 4) in the previous article.

Reference article:
Broker startup (2) of broker part

Keywords: Java message queue RocketMQ

Added by sandrob57 on Thu, 03 Feb 2022 16:31:58 +0200