How does the RocketMQ Broker detect abnormal crashes and normal shutdowns of consumers? A teach-you-to-fish source code analysis

1, Foreword

While studying how a consumption timeout in a RocketMQ Consumer leads to repeated consumption, a question occurred to me: how does the Broker detect that a Consumer/Producer has crashed or gone offline, and how do the remaining consumers rebalance quickly afterwards? I had already reached some conclusions from reading the Broker and Consumer source code, but something still felt missing, so I ran the RocketMQ source code locally (see: How does the RocketMQ source code run locally) and added some console output and logging. This article walks through the source code step by step, in a teach-you-to-fish style, using that console and log output.

1) Prerequisites

If you want to study RocketMQ in depth (and fully follow this article), these articles cover the classes involved:
1, Source code analysis of load balancing on the Consumer side of RocketMQ (RebalanceImpl)
2, RocketMQ: in-depth analysis of the principle and source code of the Broker startup process
3, In-depth analysis of the RocketMQ Consumer startup process source code

Of course, many experts already know these classes well, so let's get straight to the point.

2, Thinking + source code analysis

There are two situations when a Consumer goes offline: normal offline and abnormal downtime.

1. Normal offline (shutdown() in the code)

At first this reminded me of the kill -15 [pid] command or the normal shutdown() of a thread pool, so I guessed RocketMQ might work the same way: when the program exits, a hook function calls the Consumer's shutdown or close method. So I went to the Consumer's implementation class, DefaultMQPushConsumerImpl, and found its shutdown() method.
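The hook idea above can be sketched in plain Java. This is only an illustration of the general JVM mechanism, not RocketMQ code: SketchConsumer and ShutdownHookSketch are hypothetical names, and whether a hook is registered for you depends on how your application starts the consumer.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a consumer; not the RocketMQ class.
class SketchConsumer {
    private final AtomicBoolean running = new AtomicBoolean(true);

    // Idempotent: only the first call performs the cleanup and returns true.
    boolean shutdown() {
        if (running.compareAndSet(true, false)) {
            System.out.println("consumer shut down; a real client would notify the broker here");
            return true;
        }
        return false;
    }

    boolean isRunning() {
        return running.get();
    }
}

class ShutdownHookSketch {
    // Registers a JVM shutdown hook that calls consumer.shutdown().
    // The hook runs on normal exit and on SIGTERM (kill -15), but not on SIGKILL (kill -9).
    static Thread registerHook(SketchConsumer consumer) {
        Thread hook = new Thread(consumer::shutdown, "consumer-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        registerHook(new SketchConsumer());
        System.out.println("exiting main; the hook runs during JVM shutdown");
    }
}
```

Because the JVM never gets a chance to run hooks on kill -9, this path only covers the normal offline case.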

Inside shutdown() there are operations that stop the message consumption service, stop the load balancing service, persist the consumption offsets, and so on. One call, mQClientFactory.unregisterConsumer(), looks like what I expected.

Let's step in and see what MQClientInstance#unregisterConsumer() does:

1. Because MQClientInstance is unique at the JVM level, the current consumer has to be removed from its consumerTable field.
2. The client is taken offline under a lock, via unregisterClientWithLock().

public void unregisterConsumer(final String group) {
    this.consumerTable.remove(group);
    // client offline
    this.unregisterClientWithLock(null, group);
}

Continue down:

// MQClientInstance#unregisterClientWithLock()
private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) {
    try {
        if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                // Take the client offline; step into this call
                this.unregisterClient(producerGroup, consumerGroup);
            } catch (Exception e) {
                log.error("unregisterClient exception", e);
            } finally {
                // always release the heartbeat lock
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed. [{}]", this.clientId);
        }
    } catch (InterruptedException e) {
        log.warn("unregisterClientWithLock exception", e);
    }
}

// MQClientInstance#unregisterClient()
private void unregisterClient(final String producerGroup, final String consumerGroup) {
    Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, HashMap<Long, String>> entry = it.next();
        String brokerName = entry.getKey();
        HashMap<Long, String> oneTable = entry.getValue();

        if (oneTable != null) {
            for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                String addr = entry1.getValue();
                if (addr != null) {
                    try {
                        // Entry point: send the unregister request to this broker address
                        this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
                        log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
                    } catch (RemotingException e) {
                        log.error("unregister client exception from broker: " + addr, e);
                    } catch (InterruptedException e) {
                        log.error("unregister client exception from broker: " + addr, e);
                    } catch (MQBrokerException e) {
                        log.error("unregister client exception from broker: " + addr, e);
                    }
                }
            }
        }
    }
}

Finally, execution reaches MQClientAPIImpl#unregisterClient(), which sends a request to the Broker over Netty to tell it that the current Consumer is going offline.

But if I kill -9 the process, or the machine hosting the service suddenly goes down, what does RocketMQ do? I had planned to take the ostrich approach and ignore the question, but I really wanted to know what happens. The next day I started adding log and console output, and searched the source code globally based on the log output.

2. Abnormal downtime (killing the process directly)

In the RebalanceImpl#rebalanceByTopic() method, where consumers balance queues per topic, we print the IDs of all consumers subscribed to the current topic to the console:

Start two consumers, Consumer and Consumer2, both in the study-consumer consumer group.

Then stop Consumer and look at the console output of Consumer2:

From the console output, Consumer2 rebalances again at 14:37:07. Next, let's look at the Broker's log to see what happened at that moment:

2021-12-26 14:37:07 INFO NettyEventExecutor - NETTY EVENT: remove channel[ClientChannelInfo [channel=[id: 0x78a85d90, L:/ ! R:/], clientId=, language=JAVA, version=373, lastUpdateTimestamp=1640500621935]][] from ProducerManager groupChannelTable, producer group: CLIENT_INNER_PRODUCER
2021-12-26 14:37:07 WARN NettyEventExecutor - NETTY EVENT: remove not active channel[ClientChannelInfo [channel=[id: 0x78a85d90, L:/ ! R:/], clientId=, language=JAVA, version=373, lastUpdateTimestamp=1640500621935]] from ConsumerGroupInfo groupChannelTable, consumer group: study-consumer
2021-12-26 14:37:07 WARN PullMessageThread_13 - The broker's subscription is not latest, group: study-consumer *

From the log, the Broker perceives it immediately when the Consumer goes down.

What if we are not familiar with the RocketMQ source code? We can run a global search in the source project for key text from the log output. For example, here we search the source code for "remove channel[":

Click to enter the ProducerManager#doChannelCloseEvent() method:

But this doesn't look quite right. It is the Consumer that went offline, so why did we land in ProducerManager? In brief, and it will become clear by the end: the doChannelCloseEvent() methods of ProducerManager and ConsumerManager, which handle Producers and Consumers going offline, are invoked from the same place. But what if we didn't know that?

There are two log lines containing "remove" in the Broker's output. Let's search for the other one, "NETTY EVENT: remove not active channel[". Note that the log also carries the offline Consumer's client information (clientId = 10.90.66 61@39280), which can be matched against the console output from the rebalancing above.

Let's come back and continue to look at the search results:

Click to enter the ConsumerGroupInfo#doChannelCloseEvent() method:

Go up the call chain to see where it is called. Sure enough, it is called in ConsumerManager, just like the ProducerManager found above.
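To make the role of such a close-event handler concrete, here is a toy model under stated assumptions. It is not the Broker's code: ToyConsumerRegistry and ChangeListener are hypothetical names, channels are reduced to plain client-id strings, and the "notify the remaining members" step is my simplified reading of how the surviving consumers learn that they should rebalance.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Toy model only: hypothetical names, connections represented by client-id strings.
class ToyConsumerRegistry {
    // Called whenever a group's membership changes, with the remaining client ids.
    interface ChangeListener {
        void onChange(String group, Set<String> remainingClientIds);
    }

    private final Map<String, Set<String>> groupToClients = new ConcurrentHashMap<>();
    private final ChangeListener listener;

    ToyConsumerRegistry(ChangeListener listener) {
        this.listener = listener;
    }

    void register(String group, String clientId) {
        groupToClients.computeIfAbsent(group, g -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    // Removes the client from every group it belongs to and reports each change,
    // so the remaining members can be told to rebalance.
    void onClientGone(String clientId) {
        for (Map.Entry<String, Set<String>> e : groupToClients.entrySet()) {
            Set<String> clients = e.getValue();
            if (clients.remove(clientId)) {
                if (clients.isEmpty()) {
                    groupToClients.remove(e.getKey());
                }
                listener.onChange(e.getKey(), new HashSet<>(clients));
            }
        }
    }
}
```

With two registered clients in study-consumer, calling onClientGone for one of them reports the group together with the single remaining client id.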

Now that we are on the right track, let's keep going up and see where ConsumerManager#doChannelCloseEvent() is called.

ConsumerManager#doChannelCloseEvent() is called in three places, which looks a little odd at first. Don't panic; click in and look. All three calls are in the ClientHousekeepingService class, at lines 74, 81 and 88, in the methods onChannelClose(), onChannelException(), and onChannelIdle() respectively. Judging by the names, the Consumer's doChannelCloseEvent() runs when the Channel is closed, hits an exception, or goes idle.
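The "three callbacks, one cleanup path" shape can be sketched as follows. This is a simplified illustration with hypothetical names (ToyChannelEventListener, ToyHousekeeping); the real listener works with Netty Channel objects rather than address strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified listener interface.
interface ToyChannelEventListener {
    void onChannelClose(String remoteAddr);
    void onChannelException(String remoteAddr);
    void onChannelIdle(String remoteAddr);
}

// Close, exception and idle all mean "this connection is no longer usable",
// so each callback delegates to the same cleanup method.
class ToyHousekeeping implements ToyChannelEventListener {
    final List<String> cleaned = new ArrayList<>();

    @Override
    public void onChannelClose(String remoteAddr) {
        doChannelCloseEvent(remoteAddr);
    }

    @Override
    public void onChannelException(String remoteAddr) {
        doChannelCloseEvent(remoteAddr);
    }

    @Override
    public void onChannelIdle(String remoteAddr) {
        doChannelCloseEvent(remoteAddr);
    }

    // Single cleanup path; here it only records which address was cleaned up.
    private void doChannelCloseEvent(String remoteAddr) {
        cleaned.add(remoteAddr);
    }
}
```

Funnelling all three into one method means a client is cleaned up the same way regardless of how its connection was lost.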

Let's take onChannelClose() as an example and see where it is called.

In the NettyRemotingAbstract abstract class there is an inner class, NettyEventExecutor, which implements Runnable (that is, it executes tasks).

1) NettyEventExecutor, the inner class of NettyRemotingAbstract

The run() method of NettyEventExecutor keeps taking NettyEvent objects out of its internal blocking queue, eventQueue, and dispatches each one to the matching handling logic according to its type. In our example the NettyEvent type is CLOSE.
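The queue-and-dispatch loop described above can be sketched like this. It is a minimal model, not the RocketMQ class: ToyEventType, ToyEvent and ToyEventExecutor are hypothetical names, the 100 ms poll timeout is an arbitrary choice, and the handlers only record a string where the real code would call a listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

enum ToyEventType { CONNECT, CLOSE, IDLE, EXCEPTION }

class ToyEvent {
    final ToyEventType type;
    final String remoteAddr;

    ToyEvent(ToyEventType type, String remoteAddr) {
        this.type = type;
        this.remoteAddr = remoteAddr;
    }
}

class ToyEventExecutor implements Runnable {
    private final BlockingQueue<ToyEvent> eventQueue = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>();
    private volatile boolean stopped = false;

    void putEvent(ToyEvent event) { eventQueue.offer(event); }

    void stop() { stopped = true; }

    synchronized List<String> handled() { return new ArrayList<>(handled); }

    // Waits up to timeoutMillis for one event and dispatches it by type.
    // Returns true if an event was handled, false on timeout or interruption.
    boolean processNext(long timeoutMillis) {
        ToyEvent event;
        try {
            event = eventQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (event == null) {
            return false;
        }
        synchronized (this) {
            switch (event.type) {
                case CONNECT:   handled.add("connect " + event.remoteAddr); break;
                case CLOSE:     handled.add("close " + event.remoteAddr); break;
                case IDLE:      handled.add("idle " + event.remoteAddr); break;
                case EXCEPTION: handled.add("exception " + event.remoteAddr); break;
            }
        }
        return true;
    }

    // The loop a dedicated thread would run until stop() is called or it is interrupted.
    @Override
    public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            processNext(100);
        }
    }
}
```

Decoupling the I/O callbacks from the handling logic through a queue keeps slow cleanup work off the network threads.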

Next, we need to find where a NettyEvent of type CLOSE is put into the eventQueue.

(1) First, where is the NettyEventExecutor started?
A global search for NettyEventExecutor shows two places that call its start() method: NettyRemotingClient and NettyRemotingServer. From the consumer's point of view the Broker acts as the server, so NettyRemotingServer is the one we need.

NettyRemotingServer#start() calls NettyEventExecutor#start(), and NettyRemotingServer#start() itself is called when the Broker starts. For the detailed flow, see: RocketMQ: in-depth analysis of the principle and source code of the Broker startup process.

(2) The NettyEventExecutor#putNettyEvent() method adds the event to the eventQueue.

That putNettyEvent() method is called from NettyRemotingAbstract#putNettyEvent(), a method of NettyEventExecutor's enclosing class.

Continue by searching globally for putNettyEvent: there are three places where the NettyEventType is NettyEventType.CLOSE. As before, we go into NettyRemotingServer.

This leads us to NettyConnectManageHandler#channelInactive().

channelInactive() looks familiar, like something from Netty, and it is an overriding method. Looking at what it overrides: indeed, it is the method Netty invokes when a Channel becomes inactive.
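The producing side of the event queue can be modelled in plain Java as below. This is a stand-in, not the real handler: ToyConnectManageHandler is a hypothetical class that takes an address string instead of a Netty ChannelHandlerContext, and the bounded, drop-when-full enqueue is an assumption of this sketch.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java stand-in for a connection-management handler.
class ToyConnectManageHandler {
    private final Queue<String> eventQueue = new LinkedBlockingQueue<>();
    private final int maxSize;

    ToyConnectManageHandler(int maxSize) {
        this.maxSize = maxSize;
    }

    // Models the callback that fires when a connection is no longer active,
    // for example because the peer process was killed and its socket closed.
    boolean channelInactive(String remoteAddr) {
        return putEvent("CLOSE " + remoteAddr);
    }

    // Bounded enqueue: drop the event instead of blocking the caller when the queue is full.
    boolean putEvent(String event) {
        if (eventQueue.size() >= maxSize) {
            System.out.println("event queue full, dropping: " + event);
            return false;
        }
        return eventQueue.offer(event);
    }

    // Returns the oldest queued event, or null if the queue is empty.
    String nextEvent() {
        return eventQueue.poll();
    }
}
```

The important point is that the callback does almost nothing itself: it only records that the connection went away and leaves the cleanup to whoever drains the queue.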

Finally, where is the NettyConnectManageHandler added to Netty's ServerBootstrap? As expected, in the NettyRemotingServer#start() method. The handler instance itself is created in prepareSharableHandlers():

private void prepareSharableHandlers() {
    handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
    encoder = new NettyEncoder();
    // the handler whose channelInactive() we just traced
    connectionManageHandler = new NettyConnectManageHandler();
    serverHandler = new NettyServerHandler();
}

3, Summary

RocketMQ Broker and Consumer/Producer communicate over Netty. Whether the Consumer notifies the Broker during a normal shutdown or its process is killed abnormally, the Broker detects it almost immediately through Netty's connection handling. Detection of an abnormal disconnect relies mainly on Netty's ChannelInboundHandlerAdapter#channelInactive().

If you haven't used Netty for a long time, it is easy to forget channelInactive(). I hope this article helps more people learn how to read source code and analyze problems.

Keywords: Java Back-end message queue RocketMQ MQ

Added by viperdk on Tue, 04 Jan 2022 03:36:25 +0200