Automatic creation mechanism of RocketMQ theme

problem

When learning RocketMQ, I have several questions.
If the subject does not exist, who will the client send the message to?
When a message is sent to a topic that does not exist, when is the topic created?

guess

When I execute the following code, the topic does not exist, so when did I create the topic "TopicTest202112151152"?

  Message msg = new Message("TopicTest202112151152" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
            SendResult sendResult = producer.send(msg,1000000000);

In fact, what I guessed at that time was that when I found that the theme did not exist, I first sent a message to the server to create the theme, and then sent the message.
The result: create a topic when sending a message

Question 1: when the client sends a message, the subject does not exist. To whom?

Source tracking

Take the following code as an example. To send a message to the topic "TopicTest202112151154", the content sent is a time string, followed by producer Send method

// Instantiate with a producer group name.
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses.
    producer.setNamesrvAddr("localhost:9876");
    // Launch the instance.
    producer.start();
    // Create a message instance, specifying topic, tag and message body.
    Message msg =
        new Message(
            "TopicTest202112151154",
            "TagA",
            (LocalDateTime.now().toString()).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // Call send message to deliver message to one of brokers.
    SendResult sendResult = producer.send(msg, 1000000000);
    System.out.printf("%s%n", sendResult);
    // Shut down once the producer instance is not longer in use.
    producer.shutdown();

Follow DefaultMQProducerImpl###sendDefaultImpl method

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    	//...
    	TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    	//....
    	//... send message
    }

Follow DefaultMQProducerImpl###tryToFindTopicPublishInfo method

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //First, get from the local cache. Because the topic does not exist, it returns null
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //Then get from the NameServer. Because the topic does not exist, a TopicPublishInfo without Ok is returned 
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        //Because TopicPublishInfo does not Ok
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            //Get the topic again. This method is the key point. Follow it
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

Following MQClientInstance###updateTopicRouteInfoFromNameServer method
In this method, get the routing information of the default topic "TBW102" in the NameServer, and copy the routing information of the new topic with reference to "TBW102". At this time, it is considered that the new topic has been created on the client, but the changed topic has not been created on the server.

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
   
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        //Get the default theme defaultmqproducer Getcreatetopickey(), that is, the routing information of TBW102
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                            //Omit...
                    }
                    //Then, create the topicRouteData of the new topic according to the topicRouteData of TBW102. At this time, the client will have the routing information of the new topic (actually the routing information of TBW102)   
        return false;
    }

At this time, the client has the routing information of the new topic, but there is no information of the topic on the broker corresponding to the routing information, but the client already knows which IP to send the message to.

Question answer

If the topic information obtained by the client does not exist, it will create a new topic according to the information of the "TBW102" topic, and then store the information of the new topic locally. At this time, the client knows which IP to send data to, and then the client will establish a connection with the Netty of that IP, and then send data and Ok.

Question 2: when will the broker create a message when it finds that the topic does not exist?

Where do you start

First of all, you should know Netty, so that according to common sense, you can know that the logic is in the simple channel inboundhandler.
So where to find SimpleChannelInboundHandler? You should find NettyServer first. NettyServer should be found in the startup source code of Broker.
The following code is in the BrokerController###start method

if (this.remotingServer != null) {
            this.remotingServer.start();
        }

NettyRemotingServer is selected as the implementation class of remotingServer. The start method inside has the following code

 ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

serverHandler is the MQ custom method. You can find the channelRead0 method of NettyServerHandler
NettyRemotingAbstract###processMessageReceived method, set conditional multithread break in processRequestCommand, and the condition is CMD code == 310(RequestCode.SEND_MESSAGE_V2 = 310)

   public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

Start with the source code

When the client sends a message, the breakpoint of the broker will stop on the following line processRequestCommand

NettyRemotingAbstract###processMessageReceived method, set conditional multithread break in processRequestCommand, and the condition is CMD code == 310(RequestCode.SEND_MESSAGE_V2 = 310)

   public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

Nettyremotengabstract ###processrequestcommand method
RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd) submits the task to the run anonymous class in the following code

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
      Runnable run =
          new Runnable() {
            @Override
            public void run() {
              try {
                doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                final RemotingResponseCallback callback =
                    new RemotingResponseCallback() {
                      @Override
                      public void callback(RemotingCommand response) {
                        doAfterRpcHooks(
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        if (!cmd.isOnewayRPC()) {
                          if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                              System.out.println(response);
                              ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                              log.error("process request over, but response failed", e);
                              log.error(cmd.toString());
                              log.error(response.toString());
                            }
                          } else {
                          }
                        }
                      }
                    };
                if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                  AsyncNettyRequestProcessor processor =
                      (AsyncNettyRequestProcessor) pair.getObject1();
                  processor.asyncProcessRequest(ctx, cmd, callback);
                } else {
                  NettyRequestProcessor processor = pair.getObject1();
                  RemotingCommand response = processor.processRequest(ctx, cmd);
                  callback.callback(response);
                }
              } catch (Throwable e) {
                log.error("process request exception", e);
                log.error(cmd.toString());

                if (!cmd.isOnewayRPC()) {
                  final RemotingCommand response =
                      RemotingCommand.createResponseCommand(
                          RemotingSysResponseCode.SYSTEM_ERROR,
                          RemotingHelper.exceptionSimpleDesc(e));
                  response.setOpaque(opaque);
                  ctx.writeAndFlush(response);
                }
              }
            }
          };

            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                 //Use thread pool to submit tasks
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
               
    }

Then follow the sendmessageprocessor ####asyncprocessrequest method

public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
        asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
    }

Then follow the sendmessageprocessor ####asyncprocessrequest method

public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return CompletableFuture.completedFuture(null);
                }
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                if (requestHeader.isBatch()) {
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    //Take this branch
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
    }

Then follow the sendmessageprocessor ####asyncsendmessage method
Method has a preSend method

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
                                                                
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        //ellipsis
    }

Then follow the sendmessageprocessor ####asyncsendmessage method

private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
                                    SendMessageRequestHeader requestHeader) {
       
       //ellipsis

        //Check for problems with the subject
        super.msgCheck(ctx, requestHeader, response);
        
        //ellipsis
    }

Follow up AbstractSendMessageProcessor###msgCheck method

 protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
        final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
            
            //ellipsis
            
            //Create a theme on the broker and follow it
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                requestHeader.getTopic(),
                requestHeader.getDefaultTopic(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);

           //ellipsis

    }

TopicConfigManager###createTopicInSendMessageMethod
This method will create a topic and persist it. At this time, the topic exists in the broker but the NameServer does not exist

public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
        final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
      
                    
                        if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                            //Create topic information for a new topic
                            topicConfig = new TopicConfig(topic);

                            int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());

                            if (queueNums < 0) {
                                queueNums = 0;
                            }

                            topicConfig.setReadQueueNums(queueNums);
                            topicConfig.setWriteQueueNums(queueNums);
                            int perm = defaultTopicConfig.getPerm();
                            perm &= ~PermName.PERM_INHERIT;
                            topicConfig.setPerm(perm);
                            topicConfig.setTopicSysFlag(topicSysFlag);
                            topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                        } 
                    } 
                    if (topicConfig != null) {
                       
                        //Persistence
                        this.persist();
                    }
                } 

        return topicConfig;
    }

###ConfigManager###persist method

public synchronized void persist() {
        String jsonString = this.encode(true);
        if (jsonString != null) {
            //My value is C: \ users \ 25682 \ store \ config \ topics json
            String fileName = this.configFilePath();
            try {
                MixAll.string2File(jsonString, fileName);
            } catch (IOException e) {
                log.error("persist file " + fileName + " exception", e);
            }
        }
    }

MixAll###string2File

//str is all the latest topic information
public static void string2File(final String str, final String fileName) throws IOException {
        //First save str to topics json. In TMP
        String tmpFile = fileName + ".tmp";
        string2FileNotSafe(str, tmpFile);
        //Put topics The data in JSON is stored in topics json. BK Li
        String bakFile = fileName + ".bak";
        String prevContent = file2String(fileName);
        if (prevContent != null) {
            string2FileNotSafe(prevContent, bakFile);
        }
        //Delete topics json
        File file = new File(fileName);
        file.delete();
        //Put topics json. TMP renamed topics json
        file = new File(tmpFile);
        file.renameTo(new File(fileName));
    }

Role of TBW102 theme

When the Producer sends a message, by default, it does not need to create a Topic in advance. If the Topic does not exist, the Broker will automatically create a Topic. But what are the permissions of the newly created Topic? What is the number of read-write queues? TBW102 is needed at this time
After that, RocketMQ will create a new Topic based on the configuration of the Topic.

reference resources

Deeply analyze the creation mechanism of RocketMQ topics. Why does the production recommend turning off automatic Topic creation

https://blog.csdn.net/a1036645146/article/details/109581499

Role of TBW102 theme

https://www.modb.pro/db/130866

Keywords: Java RabbitMQ Distribution

Added by AbsolutelyFreeW on Wed, 15 Dec 2021 15:41:23 +0200