1, Introduction to RocketMQ architecture
1.1 logical deployment diagram
(pictures from the Internet)
1.2 description of core components
As the figure above shows, RocketMQ has four core components: NameServer, Broker, Producer and Consumer. Each is described briefly below:
NameServer: NameServer acts as the provider of routing information. Producers and consumers use NameServer to look up the list of Broker IP addresses for each Topic. Multiple NameServer instances form a cluster, but they are independent of each other and exchange no information.
Broker: the message relay role, responsible for storing and forwarding messages. The Broker server receives and stores messages sent by producers and serves pull requests from consumers. It also stores message-related metadata, including consumer groups, consumption progress offsets, and topic and queue information.
Producer: responsible for producing messages, usually on behalf of a business system. A message producer sends the messages generated in the business application to the Broker server. RocketMQ provides several sending methods: synchronous, asynchronous, sequential and one-way. Synchronous and asynchronous sending both require the Broker to return an acknowledgement; one-way sending does not.
Consumer: it is responsible for consuming messages. Generally, the background system is responsible for asynchronous consumption. A message consumer pulls messages from the Broker server and provides them to the application. From the perspective of user application, it provides two consumption forms: pull consumption and push consumption.
In addition to the four core components mentioned above, there is also the concept of Topic, which will be mentioned many times below:
Topic: represents a collection of messages. Each topic contains a number of messages, and each message belongs to exactly one topic. It is the basic unit of message subscription in RocketMQ. A topic can be partitioned across multiple Broker clusters, and each topic partition contains multiple queues. Refer to the following figure for the specific structure:
1.3 design concept
RocketMQ follows a topic-based publish and subscribe model. Its core functions are message sending, message storage and message consumption. The overall design puts simplicity and performance first, which comes down to the following three points:
- NameServer replaces ZooKeeper (ZK) as the registration center. NameServer instances do not communicate with each other and tolerate minute-level inconsistency of routing information within the cluster, which makes NameServer more lightweight;
- A memory-mapping mechanism is used to achieve efficient IO for storage and high throughput;
- Design trade-offs are tolerated: acknowledgements (ACK) ensure that a message is consumed at least once, but if an ACK is lost the message may be consumed more than once. This is allowed by design, and handling duplicates is left to the user.
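Because a lost ACK can cause redelivery, consumer logic is usually written to be idempotent. The following is a minimal sketch of one way to do that, assuming each message carries a unique business key. `IdempotentHandler` and its methods are hypothetical names for illustration and are not part of RocketMQ; a real system would keep the processed keys in durable storage rather than in memory.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not a RocketMQ class): deduplicates by a unique message key.
final class IdempotentHandler {
    // Keys of messages already processed; in-memory only for illustration.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    // Counts how many times the "business logic" actually ran.
    private final AtomicInteger sideEffects = new AtomicInteger();

    /** Returns true if the message was processed now, false if it was a duplicate. */
    boolean handle(String messageKey) {
        // add() returns false when the key is already present, i.e. a redelivery.
        if (!processedKeys.add(messageKey)) {
            return false;
        }
        sideEffects.incrementAndGet(); // stands in for the real business logic
        return true;
    }

    int sideEffectCount() {
        return sideEffects.get();
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        handler.handle("order-1001");
        handler.handle("order-1001"); // redelivered because the ACK was lost
        System.out.println(handler.sideEffectCount()); // prints 1
    }
}
```

With this shape, a redelivered message is recognized by its key and skipped, so the business side effect happens only once.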
This article focuses on NameServer. Let's take a look at how NameServer is started and how to manage routes.
2, NameServer architecture design
The first chapter briefly explained that NameServer replaces ZooKeeper as a more lightweight registry and acts as the provider of routing information. So how is routing information managed? Let's look at the following figure first:
The above figure describes the core principles of route registration, route removal and route discovery in NameServer.
Route registration: when a Broker server starts, it sends a heartbeat to every NameServer in the cluster to register itself, and then sends a heartbeat every 30 seconds to tell each NameServer that it is alive. On receiving a heartbeat packet, NameServer records the Broker information and saves the time at which the packet was received.
Route removal: NameServer maintains a long-lived connection with each Broker and receives its heartbeat packets every 30 seconds. Every 10 seconds it scans brokerLiveTable and compares the time of the last heartbeat with the current time. If the gap exceeds 120 seconds, the Broker is considered unavailable and its information is removed from the routing table.
Route discovery: route discovery is not real-time. After route changes, NameServer does not actively push to the client and waits for the producer to regularly pull the latest route information. This design method reduces the complexity of NameServer implementation. When the route changes, the fault-tolerant mechanism at the message sending end is used to ensure the high availability of message sending (this content will be introduced in the subsequent introduction of producer message sending, which will not be explained in this paper).
High availability: NameServer ensures its high availability by deploying multiple NameServer servers. At the same time, there is no communication between multiple NameServer servers. In this way, when the routing information changes, the data between NameServer servers may not be exactly the same, but the high availability of message sending is guaranteed through the fault-tolerant mechanism of the sender. This is precisely why NameServer pursues simplicity and efficiency.
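Since the NameServer instances are independent, a client can in principle ask any of them and move on to the next when one is unreachable. The sketch below illustrates that idea only; it is an assumption for illustration, not RocketMQ's actual client code. `NameServerFailover`, `queryAny` and the `lookup` function (a placeholder for a real remote call) are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch: try independent NameServer instances one after another.
final class NameServerFailover {
    /**
     * Tries each NameServer address in order and returns the first non-null answer.
     * The lookup function stands in for a remote call and may throw on failure.
     */
    static <T> Optional<T> queryAny(List<String> nameServers, Function<String, T> lookup) {
        for (String addr : nameServers) {
            try {
                T result = lookup.apply(addr);
                if (result != null) {
                    return Optional.of(result);
                }
            } catch (RuntimeException e) {
                // This instance is unreachable; continue with the next one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("ns1:9876", "ns2:9876");
        Optional<String> route = queryAny(servers, addr -> {
            if (addr.startsWith("ns1")) {
                throw new IllegalStateException("ns1 is down");
            }
            return "route from " + addr;
        });
        System.out.println(route.orElse("no NameServer available"));
    }
}
```

Because no instance depends on another, losing one NameServer only costs the client a retry against a different address.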
3, Start process
With the architecture of NameServer in mind, let's first look at how NameServer is started.
Since this is a source code walkthrough, we start from the code entry: org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args), which actually calls the main0() method.
The code is as follows:
public static NamesrvController main0(String[] args) {
    try {
        // Create the NamesrvController
        NamesrvController controller = createNamesrvController(args);
        // Initialize and start the NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}
Starting NameServer through the main method is mainly divided into two steps: first create NamesrvController, and then initialize and start NamesrvController. We analyze them separately.
3.1 sequence diagram
Before reading the code, let's have an understanding of the overall process through a sequence diagram, as shown in the following figure:
3.2 creating NamesrvController
Let's look at the core code first, as follows:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    // Set the version number to the current version
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();

    // Build org.apache.commons.cli.Options with the -h and -n options:
    // -h prints help information, -n specifies namesrvAddr
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    // Initialize commandLine and add the -c and -p options:
    // -c specifies the NameServer configuration file path, -p prints the configuration
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    // NameServer configuration class (business parameters)
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // Netty server configuration class (network parameters)
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // Set the NameServer listen port
    nettyServerConfig.setListenPort(9876);
    // If -c is given, read the configuration file at that path and assign
    // its contents to NamesrvConfig and NettyServerConfig
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            // Assign the values by reflection
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            // Record the configuration file path
            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    // If -p is given, this is a print-parameters command: the properties of NamesrvConfig and
    // NettyServerConfig are printed. Before starting NameServer you can run
    // ./mqnamesrv -c configFile -p to print the configuration that would be loaded
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        // Printing parameters does not require starting the NameServer service
        System.exit(0);
    }

    // Parse the command-line parameters and load them into namesrvConfig
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    // ROCKETMQ_HOME must not be empty
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    // Initialize the logback logger factory; RocketMQ uses logback for log output by default
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    // Create the NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // Copy the global Properties into allConfigs of the NamesrvController's Configuration
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);

    return controller;
}
From the above comments on each line of code, we can see that the process of creating NamesrvController is mainly divided into two steps:
Step 1: read the configuration from the command line and assign it to the NamesrvConfig and NettyServerConfig classes.
Step 2: construct a NamesrvController instance according to the configuration classes NamesrvConfig and NettyServerConfig.
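Step 1 relies on assigning property values to a configuration object by reflection. The sketch below shows the general idea with plain JDK reflection. It is an illustration only and not the implementation of MixAll.properties2Object; `PropertiesBinder`, `bind` and `DemoConfig` are hypothetical names, and only a few parameter types are handled.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical sketch of binding Properties to an object through its setters.
final class PropertiesBinder {
    // Example configuration class used only for this illustration.
    public static final class DemoConfig {
        private int listenPort = 8888;
        private String homeDir = "";
        public int getListenPort() { return listenPort; }
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
        public String getHomeDir() { return homeDir; }
        public void setHomeDir(String homeDir) { this.homeDir = homeDir; }
    }

    /** Calls setXxx(value) on target for every property "xxx" that has a one-argument setter. */
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // no value configured, keep the default
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == int.class) {
                    m.invoke(target, Integer.parseInt(raw));
                } else if (type == long.class) {
                    m.invoke(target, Long.parseLong(raw));
                } else if (type == boolean.class) {
                    m.invoke(target, Boolean.parseBoolean(raw));
                } else if (type == String.class) {
                    m.invoke(target, raw);
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set property " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("listenPort", "9876");
        DemoConfig config = new DemoConfig();
        bind(props, config);
        System.out.println(config.getListenPort()); // prints 9876
    }
}
```

Properties without a matching setter are ignored, so the same Properties object can be bound to several configuration classes in turn, as the startup code does for NamesrvConfig and NettyServerConfig.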
NamesrvConfig and NettyServerConfig are therefore very important: they hold the business parameters and the network parameters of NameServer respectively. Let's look at the attributes of these two classes:
NamesrvConfig
NettyServerConfig
Note: Apache Commons CLI is an open source command line parsing tool. It can help developers quickly build startup commands and help you organize command parameters and output lists.
3.3 initialization and startup
After creating the NamesrvController instance, initialize and start the NameServer.
First, initialize. The code entry is NamesrvController#initialize.
public boolean initialize() {
    // Load the KV configuration from the kvConfig.json file under kvConfigPath
    // and put it into the KVConfigManager#configTable attribute
    this.kvConfigManager.load();
    // Initialize a Netty server according to nettyServerConfig.
    // brokerHousekeepingService is instantiated in the NamesrvController constructor. It handles
    // Broker connection events, implements ChannelEventListener, and is mainly used to manage
    // the brokerLiveTable of RouteInfoManager
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Initialize the thread pool that processes Netty network interaction data; 8 threads by default
    this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    // Register the request processor class that handles network requests
    this.registerProcessor();
    // Schedule the heartbeat check: start after a 5-second delay, then every 10 seconds
    // traverse RouteInfoManager#brokerLiveTable to find Brokers that are no longer alive
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // Schedule KV configuration printing: start after a 1-minute delay,
    // then print the kvConfig configuration every 10 minutes
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    // RocketMQ can improve the security of data transmission by enabling TLS.
    // If TLS is enabled, a listener must be registered to reload the SslContext
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
    return true;
}
The above code is the NameServer initialization process. As the comments show, it consists of five main steps:
- Step 1: load the KV configuration and write it into the configTable attribute of KVConfigManager;
- Step 2: initialize the Netty server;
- Step 3: initialize the thread pool that processes Netty network interaction data;
- Step 4: schedule the heartbeat check, which starts 5 seconds after startup and then checks Broker liveness every 10 seconds;
- Step 5: schedule KV configuration printing, which starts 1 minute after startup and then prints the KV configuration every 10 minutes.
RocketMQ's development team also uses a common programming technique here: a JVM shutdown hook that shuts NameServer down gracefully. The shutdown operation is performed before the JVM process exits.
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));
Finally, the start method is executed to start the NameServer. The code is relatively simple: it starts the Netty server created during initialization. The remotingServer.start() method is not described in detail here, since it requires familiarity with Netty and is not the focus of this article. Interested readers can download the source code and study it themselves.
public void start() throws Exception {
    // Start the Netty server
    this.remotingServer.start();
    // If TLS is enabled, start the file watch service
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}
4, Routing management
Chapter 2 explained that NameServer, as a lightweight registry, mainly provides Topic routing information to message producers and consumers, and manages this routing information and the Broker nodes. This covers route registration, route removal and route discovery.
This chapter analyzes how NameServer manages routing information from the perspective of the source code. The core code is in org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager.
4.1 routing meta information
Before understanding routing information management, we first need to know which routing meta information is stored in NameServer and what the data structure is.
Looking at the code, we can see that the routing meta information is maintained mainly through five attributes, as follows:
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
We will explain these five attributes in turn.
4.1.1 TopicQueueTable
Description: routing information for the message queues of each Topic. Load balancing is performed according to this routing table when messages are sent.
Data structure: HashMap structure. key is the name of Topic, and value is a queue collection of type QueueData. As mentioned in the first chapter, there are multiple queues in a Topic. The data structure of QueueData is as follows:
Data structure example:
topicQueueTable:{
    "topic1": [
        {
            "brokerName": "broker-a",
            "readQueueNums": 4,
            "writeQueueNums": 4,
            "perm": 6,
            "topicSynFlag": 0
        },
        {
            "brokerName": "broker-b",
            "readQueueNums": 4,
            "writeQueueNums": 4,
            "perm": 6,
            "topicSynFlag": 0
        }
    ]
}
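To make the load-balancing remark concrete, the sketch below expands entries like those in the example into individual write queues and picks them in turn. Round-robin selection is an assumption chosen for illustration; the article does not state which strategy the producer uses. `QueueSelector` and its nested `QueueData` are hypothetical and mirror only the two fields used here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: round-robin over the write queues described by a topic's route entries.
final class QueueSelector {
    // Mirrors only the brokerName and writeQueueNums fields of a route entry.
    static final class QueueData {
        final String brokerName;
        final int writeQueueNums;

        QueueData(String brokerName, int writeQueueNums) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
        }
    }

    private final List<String> queues = new ArrayList<>();
    private final AtomicInteger counter = new AtomicInteger();

    QueueSelector(List<QueueData> route) {
        // Each broker contributes writeQueueNums queues, identified as "brokerName#queueId".
        for (QueueData qd : route) {
            for (int i = 0; i < qd.writeQueueNums; i++) {
                queues.add(qd.brokerName + "#" + i);
            }
        }
    }

    /** Returns the next queue in round-robin order. */
    String next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(index);
    }

    int size() {
        return queues.size();
    }

    public static void main(String[] args) {
        QueueSelector selector = new QueueSelector(Arrays.asList(
            new QueueData("broker-a", 4),
            new QueueData("broker-b", 4)));
        System.out.println(selector.size()); // prints 8
        System.out.println(selector.next()); // prints broker-a#0
    }
}
```

With the two entries from the example, a producer would see eight candidate queues spread over broker-a and broker-b.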
4.1.2 BrokerAddrTable
Description: the basic information of the Broker, including the Broker name, the name of the cluster to which it belongs, and the address of the active and standby brokers.
Data structure: HashMap structure. key is BrokerName and value is an object of BrokerData type. The data structure of BrokerData is as follows (it can be understood in combination with the following logic diagram of Broker master-slave structure):
Broker master-slave structure logic diagram:
Data structure example:
brokerAddrTable:{
    "broker-a": {
        "cluster": "c1",
        "brokerName": "broker-a",
        "brokerAddrs": {
            0: "192.168.1.1:10000",
            1: "192.168.1.2:10000"
        }
    },
    "broker-b": {
        "cluster": "c1",
        "brokerName": "broker-b",
        "brokerAddrs": {
            0: "192.168.1.3:10000",
            1: "192.168.1.4:10000"
        }
    }
}
4.1.3 ClusterAddrTable
Description: Broker cluster information; stores the names of all Brokers in each cluster.
Data structure: HashMap structure. key is ClusterName and value is the Set structure that stores BrokerName.
Data structure example:
clusterAddrTable:{ "c1": ["broker-a","broker-b"] }
4.1.4 BrokerLiveTable
Description: Broker status information. NameServer replaces this information every time it receives a heartbeat packet.
Data structure: HashMap structure. key is the address of the Broker and value is the Broker information object of the BrokerLiveInfo structure. The data structure of BrokerLiveInfo is as follows:
Data structure example:
brokerLiveTable:{
    "192.168.1.1:10000": {
        "lastUpdateTimestamp": 1518270318980,
        "dataVersion": versionObj1,
        "channel": channelObj,
        "haServerAddr": ""
    },
    "192.168.1.2:10000": {
        "lastUpdateTimestamp": 1518270318980,
        "dataVersion": versionObj1,
        "channel": channelObj,
        "haServerAddr": "192.168.1.1:10000"
    },
    "192.168.1.3:10000": {
        "lastUpdateTimestamp": 1518270318980,
        "dataVersion": versionObj1,
        "channel": channelObj,
        "haServerAddr": ""
    },
    "192.168.1.4:10000": {
        "lastUpdateTimestamp": 1518270318980,
        "dataVersion": versionObj1,
        "channel": channelObj,
        "haServerAddr": "192.168.1.3:10000"
    }
}
4.1.5 filterServerTable
Description: the list of FilterServers (message filtering servers) on each Broker. They will be covered later together with the Consumer: a consumer pulls its data through the FilterServer, and the consumer registers with the Broker.
Data structure: HashMap structure, where key is the Broker address and value is the List set recording the address of filterServer.
4.2 route registration
Route registration is implemented through the heartbeat between Broker and NameServer. It consists of two steps:
Step 1: when the Broker starts, it sends a heartbeat packet to every NameServer in the cluster and then sends one again at a fixed interval (30 seconds by default; the interval is limited to between 10 and 60 seconds).
Step 2: on receiving a heartbeat packet, NameServer updates topicQueueTable, brokerAddrTable, brokerLiveTable, clusterAddrTable and filterServerTable.
We analyze these two steps separately.
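The 10-to-60-second bound on the heartbeat interval is a simple clamp, built from the same Math.max and Math.min combination that appears in the Broker scheduling code in 4.2.1. The standalone sketch below isolates that arithmetic; `HeartbeatPeriod` and `effectivePeriodMillis` are hypothetical names introduced for illustration.

```java
// Hypothetical sketch isolating the clamp applied to the configured heartbeat period.
final class HeartbeatPeriod {
    static final long MIN_MILLIS = 10_000L; // lower bound: 10 seconds
    static final long MAX_MILLIS = 60_000L; // upper bound: 60 seconds

    /** Limits the configured period to the range [10 s, 60 s]. */
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(MIN_MILLIS, Math.min(configuredMillis, MAX_MILLIS));
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriodMillis(30_000L));  // prints 30000
        System.out.println(effectivePeriodMillis(1_000L));   // prints 10000
        System.out.println(effectivePeriodMillis(120_000L)); // prints 60000
    }
}
```

A misconfigured period therefore can neither flood the NameServers with heartbeats nor starve them of updates.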
4.2.1 Broker sends heartbeat packets
The core logic for sending heartbeat packets is part of the Broker startup logic, and the code entry is org.apache.rocketmq.broker.BrokerController#start. Since this article focuses on NameServer, only the core heartbeat-sending code is listed here:
1) A scheduled task is created to register the Broker. It first runs 10 seconds after startup and then repeats at a fixed interval. The interval is taken from BrokerConfig.getRegisterNameServerPeriod(), which defaults to 30 seconds, and is limited to the range of 10 to 60 seconds.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
2) After the Topic configuration and version number have been packaged, the actual route registration is carried out (note: packaging the Topic configuration is not the focus of this article and will be explained when the Broker source code is introduced). The actual route registration is implemented in org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll. The core code is as follows:
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {

    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    // Get the NameServer address list
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        // Build the request header, which mainly carries Broker-related information
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
        // Build the request body, which carries the Topic and filterServerList information
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        // Request header and body are complete

        // Register with every NameServer in parallel
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Actual registration call
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            // Collect the information returned by the NameServer
                            registerBrokerResultList.add(result);
                        }

                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }

    return registerBrokerResultList;
}
The code above is fairly simple. It first builds the request header and the request body, and then uses multiple threads to register with every NameServer in parallel.
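The parallel registration pattern (one task per NameServer, a CountDownLatch to wait for all of them, and a thread-safe result list) can be tried in isolation. The sketch below is a simplified, self-contained illustration and not the BrokerOuterAPI code; `FanOutRegistrar`, `registerAll` and the `registerOne` function are hypothetical names, and the pool size of 4 is an arbitrary assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of fanning one request out to several servers and waiting with a timeout.
final class FanOutRegistrar {
    static List<String> registerAll(List<String> nameServers,
                                    Function<String, String> registerOne,
                                    long timeoutMillis) {
        // Thread-safe list, because several worker threads add results concurrently.
        List<String> results = new CopyOnWriteArrayList<>();
        if (nameServers.isEmpty()) {
            return results;
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(4, nameServers.size()));
        CountDownLatch latch = new CountDownLatch(nameServers.size());
        try {
            for (String addr : nameServers) {
                pool.execute(() -> {
                    try {
                        String result = registerOne.apply(addr);
                        if (result != null) {
                            results.add(result);
                        }
                    } catch (RuntimeException e) {
                        // A failure on one NameServer must not affect the others.
                    } finally {
                        latch.countDown(); // always count down, success or failure
                    }
                });
            }
            // Wait until every task has finished or the timeout expires.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> results = registerAll(
            Arrays.asList("ns1:9876", "ns2:9876"), addr -> "registered at " + addr, 1000L);
        System.out.println(results.size());
    }
}
```

Counting down in the finally block is what keeps one failing NameServer from blocking the caller until the timeout.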
The request header type is RegisterBrokerRequestHeader, which mainly includes the following fields:
The requestBody type is RegisterBrokerBody, which mainly includes the following fields:
3) The actual route registration is implemented by the registerBroker method. The core code is as follows:
private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException {
    // Create the request command. Note the request code RequestCode.REGISTER_BROKER: the network
    // processor on the NameServer side runs the corresponding business logic according to it
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);
    // Network transmission based on Netty
    if (oneway) {
        // A one-way call has no return value and does not wait for the NameServer result
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
    }
    // Synchronous call: register with the NameServer and obtain its response
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            // Get the returned response header
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
            // Wrap the returned result and set masterAddr and haServerAddr
            RegisterBrokerResult result = new RegisterBrokerResult();
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());
            if (response.getBody() != null) {
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
The Broker and NameServer communicate over Netty. When the Broker initiates registration with a NameServer, it sets the request code RequestCode.REGISTER_BROKER on the request. This is RocketMQ's convention for network requests: every request carries a requestCode, and the network processor on the server side runs the corresponding business logic according to that code.
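The idea of routing a request by its code can be illustrated with a small table-driven dispatcher. This is a sketch of the general pattern only: NameServer's own processor uses a switch statement on the request code, as shown in 4.2.2. `RequestDispatcher` is a hypothetical class, and the numeric code values are illustrative placeholders rather than the real constants from RocketMQ's RequestCode class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: look up a handler by request code and delegate to it.
final class RequestDispatcher {
    // Illustrative values only; the real constants are defined in RocketMQ's RequestCode class.
    static final int REGISTER_BROKER = 1;
    static final int GET_ROUTE_BY_TOPIC = 2;

    private final Map<Integer, Function<String, String>> handlers = new HashMap<>();

    void register(int requestCode, Function<String, String> handler) {
        handlers.put(requestCode, handler);
    }

    /** Runs the handler registered for the code, or reports that the code is unsupported. */
    String dispatch(int requestCode, String body) {
        Function<String, String> handler = handlers.get(requestCode);
        if (handler == null) {
            return "unsupported request code " + requestCode;
        }
        return handler.apply(body);
    }

    public static void main(String[] args) {
        RequestDispatcher dispatcher = new RequestDispatcher();
        dispatcher.register(REGISTER_BROKER, body -> "registered " + body);
        System.out.println(dispatcher.dispatch(REGISTER_BROKER, "broker-a")); // prints registered broker-a
        System.out.println(dispatcher.dispatch(99, "x")); // prints unsupported request code 99
    }
}
```

Either form, a switch or a lookup table, keeps the transport layer unaware of the business logic behind each code.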
4.2.2 NameServer processing heartbeat packets
After the Broker sends out the heartbeat packet of route registration, NameServer will process it according to the requestCode in the heartbeat packet. The default network processor of NameServer is DefaultRequestProcessor. The specific code is as follows:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {

    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
        ......
        // If the code is RequestCode.REGISTER_BROKER, register the Broker
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        ......
        default:
            break;
    }
    return null;
}
The processor checks the requestCode. If it is RequestCode.REGISTER_BROKER, the business logic is Broker registration, and the method is chosen according to the Broker version. Taking version V3_0_11 and later as an example, registration through the registerBrokerWithFilterServer method consists of three steps:
Step 1: verify the request body against the CRC32 checksum carried in the request header;
Step 2: parse the Topic information;
Step 3: call RouteInfoManager#registerBroker to register the Broker.
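The checksum comparison in Step 1 can be sketched with the JDK's java.util.zip.CRC32. The sender computes the checksum of the body and puts it in the header; the receiver recomputes it and compares. `BodyChecksum` is a hypothetical class, and masking the value to a non-negative int is an assumption of this sketch rather than something stated in the article.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical sketch of verifying a request body against a CRC32 value sent in the header.
final class BodyChecksum {
    /** Computes the CRC32 of the body, masked to a non-negative int (assumption of this sketch). */
    static int crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return (int) (crc.getValue() & 0x7FFFFFFF);
    }

    /** Returns true when the body still matches the checksum computed by the sender. */
    static boolean matches(byte[] body, int expectedCrc32) {
        return crc32(body) == expectedCrc32;
    }

    public static void main(String[] args) {
        byte[] body = "topic config payload".getBytes(StandardCharsets.UTF_8);
        int headerCrc = crc32(body); // what the sender would put in the header
        System.out.println(matches(body, headerCrc)); // prints true

        byte[] corrupted = "topic config payloae".getBytes(StandardCharsets.UTF_8);
        System.out.println(matches(corrupted, headerCrc));
    }
}
```

A mismatch tells the receiver that the body was damaged in transit, so the registration request can be rejected before any routing table is touched.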
The core registration logic is implemented by RouteInfoManager#registerBroker. The core code is as follows:
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // Acquire the write lock to prevent concurrent writes to the routing tables in RouteInfoManager
            this.lock.writeLock().lockInterruptibly();
            // Get the set of Broker names for clusterName from clusterAddrTable
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            // If there is none, the cluster of this Broker has not been recorded yet:
            // create the set, then add the Broker name to the cluster's Broker set
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;
            // Try to get the BrokerData for brokerName from brokerAddrTable
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                // If there is none, create a new BrokerData, put it into brokerAddrTable
                // and set registerFirst to true
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            // Update brokerAddrs in brokerData
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            // The master may go down and a slave may become the master, in which case its brokerId
            // becomes 0 and the old brokerAddr entry must be deleted
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }
            // Update brokerAddrs and use the returned oldAddr to decide whether this is a first registration
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);
            // If the Broker is a master and its Topic configuration has changed or it registers for the
            // first time, create or update the Topic routing metadata and fill in topicQueueTable
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            // Create or update the Topic routing metadata
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }
            // Update BrokerLiveInfo, which is the basis for route removal
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }
            // Register the address list of the Broker's FilterServers
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }
            // If this Broker is a slave, look up the master's node information
            // and set the corresponding masterAddr attribute in the result
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}
Through the above source code analysis, we can decompose the registration of a Broker into seven steps:
- Step 1: acquire the write lock to prevent concurrent writes to the routing tables in RouteInfoManager;
- Step 2: check whether the cluster that the Broker belongs to already exists. If not, create it, then add the Broker name to the cluster's Broker set;
- Step 3: maintain BrokerData;
- Step 4: if the Broker is a Master and its Topic configuration has changed, or it is registering for the first time, create or update the Topic routing metadata and fill in topicQueueTable;
- Step 5: update BrokerLiveInfo;
- Step 6: register the address list of the Broker's filterServer;
- Step 7: if this Broker is a slave, look up the Master of the same Broker, set the corresponding masterAddr attribute, and return it to the Broker side.
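To make steps 1 to 3 more concrete, here is a minimal sketch of the same idea on two plain maps. RegistrationSketch and its register method are hypothetical names made up for illustration, not RocketMQ classes, and topic configuration, BrokerLiveInfo and filter servers are deliberately left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-in for RouteInfoManager (assumption, not the real class).
class RegistrationSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // clusterName -> brokerNames
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    // brokerName -> (brokerId -> brokerAddr)
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    /** Returns true when nothing was registered under this brokerId before. */
    boolean register(String clusterName, String brokerName, long brokerId, String brokerAddr) {
        lock.writeLock().lock(); // step 1: only one writer at a time
        try {
            // step 2: create the cluster entry on demand and record the broker name
            clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>()).add(brokerName);
            // step 3: maintain the brokerId -> address map
            Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>());
            // A promoted slave re-registers with brokerId 0: drop its stale entry first,
            // so that one address is stored under a single brokerId only.
            addrs.entrySet().removeIf(e -> e.getValue().equals(brokerAddr) && e.getKey() != brokerId);
            String oldAddr = addrs.put(brokerId, brokerAddr);
            return oldAddr == null;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        RegistrationSketch table = new RegistrationSketch();
        table.register("c1", "broker-a", 0L, "10.0.0.1:10911");
        table.register("c1", "broker-a", 1L, "10.0.0.2:10911");
        // The slave at 10.0.0.2 is promoted and registers again as master (brokerId 0).
        table.register("c1", "broker-a", 0L, "10.0.0.2:10911");
        System.out.println(table.brokerAddrTable);
    }
}
```

After the third call the address 10.0.0.2:10911 is stored only under brokerId 0, which mirrors the "same IP:PORT must only have one record" rule in the source code above.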
4.3 route deletion
4.3.1 trigger conditions
There are two main trigger conditions for route deletion:
NameServer scans brokerLiveTable every 10s. If no heartbeat packet has been received from a Broker for 120s, NameServer removes that Broker and closes the socket connection;
Route deletion is also triggered when the Broker shuts down normally.
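The first trigger can be sketched as a periodic scan over the last heartbeat timestamps. This is only an illustration under assumptions: LivenessScanSketch, onHeartbeat and scanNotActive are hypothetical names, and the current time is passed in as a parameter so that the behaviour is easy to follow.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the liveness scan; not the NameServer implementation.
class LivenessScanSketch {
    // 120s without a heartbeat, as stated above
    static final long EXPIRE_MILLIS = 120_000L;

    // brokerAddr -> time of the last heartbeat in milliseconds
    final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    /** Removes every broker whose last heartbeat is older than EXPIRE_MILLIS and returns the count. */
    int scanNotActive(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > EXPIRE_MILLIS) {
                // A real server would also close the channel and clean the other routing tables here.
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        LivenessScanSketch scan = new LivenessScanSketch();
        scan.onHeartbeat("10.0.0.1:10911", 0L);
        scan.onHeartbeat("10.0.0.2:10911", 100_000L);
        System.out.println(scan.scanNotActive(130_000L)); // only the first broker has expired
        System.out.println(scan.lastHeartbeat.keySet());
    }
}
```

In a running server, scanNotActive would be called from a scheduled task every 10s with System.currentTimeMillis() as the argument.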
4.3.2 source code analysis
Both trigger conditions described above end in the same route deletion logic, which is unified in RouteInfoManager#onChannelDestroy.
The core code is as follows:
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                // Acquire the read lock
                this.lock.readLock().lockInterruptibly();
                // Find the Broker address that owns this channel in brokerLiveTable
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                // Release the read lock
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    // If the Broker has already been cleared from the live Broker table, use remoteAddr directly
    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
        try {
            try {
                // Acquire the write lock
                this.lock.writeLock().lockInterruptibly();
                // Remove the broker address from brokerLiveTable and filterServerTable
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                // Traverse brokerAddrTable
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        // Find the brokerData that holds this address and remove the address from it
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }

                    // If the brokerData has no addresses left after the removal, remove the whole brokerData
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }

                if (brokerNameFound != null && removeBrokerName) {
                    // Traverse clusterAddrTable
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        // Remove the brokerName obtained in step 3 from the cluster
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);

                            // If the set is empty after the removal, remove the whole cluster from clusterAddrTable
                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                it.remove();
                            }

                            break;
                        }
                    }
                }

                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    // Traverse topicQueueTable
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();

                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            // Remove the queue data of this broker name under the topic
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                    topic, queueData);
                            }
                        }

                        // If the removed broker was the only one under the topic, remove the topic as well
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                topic);
                        }
                    }
                }
            } finally {
                // Release the write lock
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
The overall logic of route deletion is mainly divided into six steps:
- Step 1: acquire the read lock, find the Broker address that owns the channel in brokerLiveTable, and release the read lock. If the Broker has already been cleared from the live Broker table, use remoteAddr directly.
- Step 2: acquire the write lock and remove the Broker address from brokerLiveTable and filterServerTable.
- Step 3: traverse brokerAddrTable, find the brokerData that holds the Broker address, and remove the address from it. If the brokerData has no addresses left after the removal, remove the whole brokerData.
- Step 4: traverse clusterAddrTable and remove the brokerName obtained in step 3. If the set is empty after the removal, remove the whole cluster from clusterAddrTable.
- Step 5: traverse topicQueueTable and remove the queue data of that Broker name under each Topic. If the removed Broker was the only one under the Topic, remove the Topic from the table as well.
- Step 6: release the write lock.
As can be seen from the above, the overall logic of route deletion is relatively simple: it only manipulates the data structures that hold the routing metadata. To understand this code better, it is recommended to read it alongside the routing metadata structures introduced in 4.1.
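The cascade in steps 3 to 5 can be illustrated on three plain maps. The sketch below is an assumption-laden simplification: RouteDeletionSketch, addBroker and removeBroker are hypothetical names, topicQueueTable stores broker names instead of QueueData objects, and locking is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, lock-free simplification of the route tables (not the RocketMQ classes).
class RouteDeletionSketch {
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (brokerId -> addr)
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // clusterName -> brokerNames
    final Map<String, List<String>> topicQueueTable = new HashMap<>();      // topic -> brokerNames

    void addBroker(String cluster, String brokerName, long brokerId, String addr, String topic) {
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        List<String> brokers = topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>());
        if (!brokers.contains(brokerName)) {
            brokers.add(brokerName);
        }
    }

    void removeBroker(String brokerAddr) {
        // Step 3: drop the address; remember the broker name only if no address is left.
        String emptiedBrokerName = null;
        Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Map<Long, String>> entry = it.next();
            if (entry.getValue().values().remove(brokerAddr)) {
                if (entry.getValue().isEmpty()) {
                    emptiedBrokerName = entry.getKey();
                    it.remove();
                }
                break;
            }
        }
        if (emptiedBrokerName == null) {
            return; // another instance of this broker name is still alive
        }
        final String name = emptiedBrokerName;
        // Step 4: remove the name from its cluster, then drop clusters that became empty.
        clusterAddrTable.values().forEach(names -> names.remove(name));
        clusterAddrTable.values().removeIf(Set::isEmpty);
        // Step 5: remove the name from every topic, then drop topics that became empty.
        topicQueueTable.values().forEach(brokers -> brokers.remove(name));
        topicQueueTable.values().removeIf(List::isEmpty);
    }

    public static void main(String[] args) {
        RouteDeletionSketch routes = new RouteDeletionSketch();
        routes.addBroker("c1", "broker-a", 0L, "10.0.0.1:10911", "TopicA");
        routes.addBroker("c1", "broker-a", 1L, "10.0.0.2:10911", "TopicA");
        routes.removeBroker("10.0.0.2:10911"); // the slave goes away, the topic stays routable
        System.out.println(routes.topicQueueTable);
        routes.removeBroker("10.0.0.1:10911"); // the last instance goes away, everything is cleaned
        System.out.println(routes.topicQueueTable);
    }
}
```

The point of the sketch is the ordering: cluster and topic entries are only touched once the last address of a broker name has disappeared.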
4.4 route discovery
When the routing information changes, the NameServer does not actively push it to clients. Instead, each client periodically pulls the latest routing information from the NameServer. This design reduces the complexity of the NameServer implementation.
4.4.1 active pull of producer
After the producer starts, it launches a series of scheduled tasks, one of which periodically pulls Topic routing information from the NameServer. The code entry is MQClientInstance#startScheduledTask(), and the core code is as follows:
private void startScheduledTask() {
    ......
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // Pull the latest topic routing information from the NameServer
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    ......
}

/**
 * Get topic routing information from the NameServer
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException,
        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    ......
    // Send a request to the NameServer; the request code is RequestCode.GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
    ......
}
The producer and the NameServer communicate over Netty, and the producer sets the request code RequestCode.GET_ROUTEINFO_BY_TOPIC on the request it sends to the NameServer.
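The pull model can be sketched with a ScheduledExecutorService and a local route cache. Everything here is illustrative: RoutePullSketch is a hypothetical class, the remote call to the NameServer is replaced by a plain Function, and a route is reduced to a String.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical client-side sketch of periodic route pulling (not MQClientInstance).
class RoutePullSketch {
    // topic -> last known route, simplified to a String
    final Map<String, String> localRoutes = new ConcurrentHashMap<>();
    // Stand-in for the remote GET_ROUTEINFO_BY_TOPIC call (assumption)
    private final Function<String, String> nameServer;

    RoutePullSketch(Function<String, String> nameServer) {
        this.nameServer = nameServer;
    }

    void subscribe(String topic) {
        localRoutes.putIfAbsent(topic, "");
    }

    /** Pulls the route of every known topic once; unknown topics keep their old value. */
    void refreshOnce() {
        for (String topic : localRoutes.keySet()) {
            String route = nameServer.apply(topic);
            if (route != null) {
                localRoutes.put(topic, route);
            }
        }
    }

    /** Starts the periodic pull, first run after 10 ms as in the code above. */
    ScheduledExecutorService start(long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                refreshOnce();
            } catch (Exception e) {
                // An uncaught exception would cancel all future runs, so it is only reported.
                e.printStackTrace();
            }
        }, 10, intervalMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> remote = new HashMap<>();
        remote.put("TopicA", "broker-a@10.0.0.1:10911");
        RoutePullSketch client = new RoutePullSketch(remote::get);
        client.subscribe("TopicA");
        ScheduledExecutorService scheduler = client.start(50);
        Thread.sleep(200);
        scheduler.shutdownNow();
        System.out.println(client.localRoutes);
    }
}
```

The try/catch inside the task matters for the same reason as in the original code: scheduleAtFixedRate suppresses subsequent executions once a run throws.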
4.4.2 routing information returned by nameserver
After receiving the request sent by producer, NameServer will process it according to the requestCode in the request. The processing of requestCode is also carried out in the default network processor DefaultRequestProcessor, which is finally realized through RouteInfoManager#pickupTopicRouteData.
TopicRouteData structure
Before formally parsing the source code, let's take a look at the data structure that NameServer returns to the producer. From the code, we can see that the returned object is a TopicRouteData, which carries four fields: orderTopicConf, queueDatas, brokerDatas and filterServerTable.
Among them, QueueData, BrokerData and filterServerTable were introduced together with the routing metadata in Section 4.1.
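As a rough picture of that structure, the sketch below rebuilds it from the setters and constructors that appear in the source code of this section. TopicRouteDataSketch and masterAddrOf are hypothetical names, and QueueData is reduced to the broker name, so treat it as an approximation rather than the real class definition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Approximate shape of the route data, reconstructed from the code in this section (assumption).
class TopicRouteDataSketch {
    static class QueueData {
        String brokerName; // further queue fields are omitted here, see 4.1
        QueueData(String brokerName) { this.brokerName = brokerName; }
    }

    static class BrokerData {
        String cluster;
        String brokerName;
        HashMap<Long, String> brokerAddrs; // brokerId -> address, brokerId 0 is the master
        BrokerData(String cluster, String brokerName, HashMap<Long, String> brokerAddrs) {
            this.cluster = cluster;
            this.brokerName = brokerName;
            this.brokerAddrs = brokerAddrs;
        }
    }

    String orderTopicConf;                                            // ordered-message configuration
    List<QueueData> queueDatas = new ArrayList<>();                    // queues of the topic per broker
    List<BrokerData> brokerDatas = new ArrayList<>();                  // brokers that serve the topic
    HashMap<String, List<String>> filterServerTable = new HashMap<>(); // brokerAddr -> filter servers

    /** Hypothetical helper: returns the master address of a broker name, or null if unknown. */
    String masterAddrOf(String brokerName) {
        for (BrokerData bd : brokerDatas) {
            if (bd.brokerName.equals(brokerName)) {
                return bd.brokerAddrs.get(0L);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        TopicRouteDataSketch route = new TopicRouteDataSketch();
        HashMap<Long, String> addrs = new HashMap<>();
        addrs.put(0L, "10.0.0.1:10911");
        addrs.put(1L, "10.0.0.2:10911");
        route.queueDatas.add(new QueueData("broker-a"));
        route.brokerDatas.add(new BrokerData("c1", "broker-a", addrs));
        System.out.println(route.masterAddrOf("broker-a"));
    }
}
```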
Source code analysis
After understanding the TopicRouteData structure returned to the producer, let's enter the RouteInfoManager#pickupTopicRouteData method to see how to implement it.
public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        try {
            // Acquire the read lock
            this.lock.readLock().lockInterruptibly();
            // Get the queue list of the topic from topicQueueTable
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                // Write the queue list into the queueDatas of topicRouteData
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;

                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }

                // Traverse the broker names extracted from the QueueData list
                for (String brokerName : brokerNameSet) {
                    // Get brokerData from brokerAddrTable by brokerName
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        // Clone the brokerData object and write it into the brokerDatas of topicRouteData
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(),
                            brokerData.getBrokerName(),
                            (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        // Traverse brokerAddrs
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            // Get the filterServerList by brokerAddr and write it into
                            // the filterServerTable of topicRouteData
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            // Release the read lock
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}
The code above fills in the queueDatas, brokerDatas and filterServerTable of TopicRouteData, but not the orderTopicConf field. To see where that field is filled in, let's look at the caller of RouteInfoManager#pickupTopicRouteData, DefaultRequestProcessor#getRouteInfoByTopic:
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    ......
    // This is the method analyzed above, which builds the topicRouteData object
    TopicRouteData topicRouteData =
        this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        // Check whether orderMessageEnable is turned on in the NameServer configuration
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            // If it is, read the ordered-message configuration of the topic from the kvConfig
            // configuration, using the namespace and the topic name as the key
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(
                    NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic());
            // Fill in orderTopicConf
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    // If no topic route is found, the response code is TOPIC_NOT_EXIST
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}
Combining these two methods, we can conclude that finding Topic routing is mainly divided into three steps:
Call RouteInfoManager#pickupTopicRouteData, which reads topicQueueTable, brokerAddrTable and filterServerTable and fills in queueDatas, brokerDatas and filterServerTable respectively.
If orderMessageEnable is turned on for the NameServer, read the ordered-message configuration of the Topic from KVConfig and fill it into orderTopicConf.
If no routing information is found, the response code is ResponseCode.TOPIC_NOT_EXIST.
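The three steps above can be condensed into a small lookup function. This is a sketch under assumptions: RouteLookupSketch, Code and Response are hypothetical types, each broker name maps to a single address, and the response body is a plain String instead of an encoded TopicRouteData.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical condensation of the lookup flow (not DefaultRequestProcessor).
class RouteLookupSketch {
    enum Code { SUCCESS, TOPIC_NOT_EXIST }

    static final class Response {
        final Code code;
        final String body;
        Response(Code code, String body) {
            this.code = code;
            this.body = body;
        }
    }

    final Map<String, List<String>> topicQueueTable = new HashMap<>(); // topic -> brokerNames
    final Map<String, String> brokerAddrTable = new HashMap<>();       // brokerName -> address
    final Map<String, String> orderTopicConfig = new HashMap<>();      // topic -> ordered-message config
    boolean orderMessageEnable = false;

    Response getRouteInfoByTopic(String topic) {
        // Step 1: collect the addresses of every broker that serves the topic.
        List<String> addrs = new ArrayList<>();
        List<String> brokerNames = topicQueueTable.get(topic);
        if (brokerNames != null) {
            for (String name : brokerNames) {
                String addr = brokerAddrTable.get(name);
                if (addr != null) {
                    addrs.add(addr);
                }
            }
        }
        // Step 3: queue data and broker data must both be found, otherwise the topic is unknown.
        if (addrs.isEmpty()) {
            return new Response(Code.TOPIC_NOT_EXIST, null);
        }
        String body = "brokers=" + addrs;
        // Step 2: the ordered-message configuration is attached only when the switch is on.
        if (orderMessageEnable) {
            body += ";orderTopicConf=" + orderTopicConfig.get(topic);
        }
        return new Response(Code.SUCCESS, body);
    }

    public static void main(String[] args) {
        RouteLookupSketch server = new RouteLookupSketch();
        List<String> brokers = new ArrayList<>();
        brokers.add("broker-a");
        server.topicQueueTable.put("TopicA", brokers);
        server.brokerAddrTable.put("broker-a", "10.0.0.1:10911");
        System.out.println(server.getRouteInfoByTopic("TopicA").code);
        System.out.println(server.getRouteInfoByTopic("Missing").code);
    }
}
```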
5, Summary
This article introduces the NameServer of RocketMQ from the perspective of its source code, covering the NameServer startup process, route registration, route deletion and route discovery. With the design of NameServer understood, we can also look back at some techniques in it that are worth learning. Here are two of them:
- Register a JVM hook during startup for graceful shutdown. This is a general programming technique: if an application uses thread pools or long-running background threads, it can register a JVM shutdown hook to release resources or finish pending work before the JVM exits.
- Guard routing table updates with a lock to prevent concurrent modification. A read-write lock, which is finer-grained than a plain mutex, lets multiple message senders read the routing table concurrently, which keeps message sending highly concurrent. Writes are exclusive: NameServer processes only one Broker heartbeat packet at a time, and multiple heartbeat packets are handled serially. This is a classic use case for a read-write lock.
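The first point can be sketched with the standard Runtime#addShutdownHook API. GracefulShutdownSketch and registerShutdownHook are hypothetical names, and the 5 second grace period is an arbitrary value chosen for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative graceful-shutdown sketch; not taken from the NameServer startup code.
class GracefulShutdownSketch {
    /** Registers a hook that drains the pool before the JVM exits and returns the hook thread. */
    static Thread registerShutdownHook(ExecutorService pool) {
        Thread hook = new Thread(() -> {
            pool.shutdown(); // stop accepting new tasks, let submitted ones finish
            try {
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    pool.shutdownNow(); // grace period is over, interrupt what is left
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }, "graceful-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        registerShutdownHook(pool);
        pool.submit(() -> System.out.println("working"));
        // Simulates a stop request: System.exit starts the shutdown sequence and runs the hook.
        System.exit(0);
    }
}
```

Without the hook, the non-daemon pool threads would either keep the JVM alive or be cut off in the middle of a task when the process is asked to stop.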