problem
When learning RocketMQ, I have several questions.
If the subject does not exist, who will the client send the message to?
When a message is sent to a topic that does not exist, when is the topic created?
guess
When I execute the following code, the topic does not exist, so when did I create the topic "TopicTest202112151152"?
Message msg = new Message("TopicTest202112151152" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */); SendResult sendResult = producer.send(msg,1000000000);
In fact, what I guessed at that time was that when I found that the theme did not exist, I first sent a message to the server to create the theme, and then sent the message.
The result: create a topic when sending a message
Question 1: when the client sends a message, the subject does not exist. To whom?
Source tracking
Take the following code as an example. To send a message to the topic "TopicTest202112151154", the content sent is a time string, followed by producer Send method
// Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); // Launch the instance. producer.start(); // Create a message instance, specifying topic, tag and message body. Message msg = new Message( "TopicTest202112151154", "TagA", (LocalDateTime.now().toString()).getBytes(RemotingHelper.DEFAULT_CHARSET)); // Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg, 1000000000); System.out.printf("%s%n", sendResult); // Shut down once the producer instance is not longer in use. producer.shutdown();
Follow DefaultMQProducerImpl###sendDefaultImpl method
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { //... TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); //.... //... send message }
Follow DefaultMQProducerImpl###tryToFindTopicPublishInfo method
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { //First, get from the local cache. Because the topic does not exist, it returns null TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); //Then get from the NameServer. Because the topic does not exist, a TopicPublishInfo without Ok is returned this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } //Because TopicPublishInfo does not Ok if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { //Get the topic again. This method is the key point. Follow it this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
Following MQClientInstance###updateTopicRouteInfoFromNameServer method
In this method, get the routing information of the default topic "TBW102" in the NameServer, and copy the routing information of the new topic with reference to "TBW102". At this time, it is considered that the new topic has been created on the client, but the changed topic has not been created on the server.
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { //Get the default theme defaultmqproducer Getcreatetopickey(), that is, the routing information of TBW102 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); //Omit... } //Then, create the topicRouteData of the new topic according to the topicRouteData of TBW102. At this time, the client will have the routing information of the new topic (actually the routing information of TBW102) return false; }
At this time, the client has the routing information of the new topic, but there is no information of the topic on the broker corresponding to the routing information, but the client already knows which IP to send the message to.
Question answer
If the topic information obtained by the client does not exist, it will create a new topic according to the information of the "TBW102" topic, and then store the information of the new topic locally. At this time, the client knows which IP to send data to, and then the client will establish a connection with the Netty of that IP, and then send data and Ok.
Question 2: when will the broker create a message when it finds that the topic does not exist?
Where do you start
First of all, you should know Netty, so that according to common sense, you can know that the logic is in the simple channel inboundhandler.
So where to find SimpleChannelInboundHandler? You should find NettyServer first. NettyServer should be found in the startup source code of Broker.
The following code is in the BrokerController###start method
if (this.remotingServer != null) { this.remotingServer.start(); }
NettyRemotingServer is selected as the implementation class of remotingServer. The start method inside has the following code
ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, encoder, new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, serverHandler ); } });
serverHandler is the MQ custom method. You can find the channelRead0 method of NettyServerHandler
NettyRemotingAbstract###processMessageReceived method, set conditional multithread break in processRequestCommand, and the condition is CMD code == 310(RequestCode.SEND_MESSAGE_V2 = 310)
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } }
Start with the source code
When the client sends a message, the breakpoint of the broker will stop on the following line processRequestCommand
NettyRemotingAbstract###processMessageReceived method, set conditional multithread break in processRequestCommand, and the condition is CMD code == 310(RequestCode.SEND_MESSAGE_V2 = 310)
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } }
Nettyremotengabstract ###processrequestcommand method
RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd) submits the task to the run anonymous class in the following code
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); final RemotingResponseCallback callback = new RemotingResponseCallback() { @Override public void callback(RemotingCommand response) { doAfterRpcHooks( RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { System.out.println(response); ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } }; if (pair.getObject1() instanceof AsyncNettyRequestProcessor) { AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1(); processor.asyncProcessRequest(ctx, cmd, callback); } else { NettyRequestProcessor processor = pair.getObject1(); RemotingCommand response = processor.processRequest(ctx, cmd); callback.callback(response); } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand( RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { //Use thread pool to submit tasks final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { }
Then follow the sendmessageprocessor ####asyncprocessrequest method
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception { asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor()); }
Then follow the sendmessageprocessor ####asyncprocessrequest method
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.asyncConsumerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return CompletableFuture.completedFuture(null); } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); if (requestHeader.isBatch()) { return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { //Take this branch return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader); } } }
Then follow the sendmessageprocessor ####asyncsendmessage method
Method has a preSend method
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) { final RemotingCommand response = preSend(ctx, request, requestHeader); //ellipsis }
Then follow the sendmessageprocessor ####asyncsendmessage method
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request, SendMessageRequestHeader requestHeader) { //ellipsis //Check for problems with the subject super.msgCheck(ctx, requestHeader, response); //ellipsis }
Follow up AbstractSendMessageProcessor###msgCheck method
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final RemotingCommand response) { //ellipsis //Create a theme on the broker and follow it topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod( requestHeader.getTopic(), requestHeader.getDefaultTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getDefaultTopicQueueNums(), topicSysFlag); //ellipsis }
TopicConfigManager###createTopicInSendMessageMethod
This method will create a topic and persist it. At this time, the topic exists in the broker but the NameServer does not exist
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic, final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) { if (PermName.isInherited(defaultTopicConfig.getPerm())) { //Create topic information for a new topic topicConfig = new TopicConfig(topic); int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums()); if (queueNums < 0) { queueNums = 0; } topicConfig.setReadQueueNums(queueNums); topicConfig.setWriteQueueNums(queueNums); int perm = defaultTopicConfig.getPerm(); perm &= ~PermName.PERM_INHERIT; topicConfig.setPerm(perm); topicConfig.setTopicSysFlag(topicSysFlag); topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType()); } } if (topicConfig != null) { //Persistence this.persist(); } } return topicConfig; }
###ConfigManager###persist method
public synchronized void persist() { String jsonString = this.encode(true); if (jsonString != null) { //My value is C: \ users \ 25682 \ store \ config \ topics json String fileName = this.configFilePath(); try { MixAll.string2File(jsonString, fileName); } catch (IOException e) { log.error("persist file " + fileName + " exception", e); } } }
MixAll###string2File
//str is all the latest topic information public static void string2File(final String str, final String fileName) throws IOException { //First save str to topics json. In TMP String tmpFile = fileName + ".tmp"; string2FileNotSafe(str, tmpFile); //Put topics The data in JSON is stored in topics json. BK Li String bakFile = fileName + ".bak"; String prevContent = file2String(fileName); if (prevContent != null) { string2FileNotSafe(prevContent, bakFile); } //Delete topics json File file = new File(fileName); file.delete(); //Put topics json. TMP renamed topics json file = new File(tmpFile); file.renameTo(new File(fileName)); }
Role of TBW102 theme
When the Producer sends a message, by default, it does not need to create a Topic in advance. If the Topic does not exist, the Broker will automatically create a Topic. But what are the permissions of the newly created Topic? What is the number of read-write queues? TBW102 is needed at this time
After that, RocketMQ will create a new Topic based on the configuration of the Topic.
reference resources
Deeply analyze the creation mechanism of RocketMQ topics. Why does the production recommend turning off automatic Topic creation
https://blog.csdn.net/a1036645146/article/details/109581499
Role of TBW102 theme
https://www.modb.pro/db/130866