RocketMQ 4.9.1 source code analysis (HA module): slave offset reporting and message processing

This article follows the previous one, "RocketMQ v4.9.1 source code analysis - HA master-slave: Master read and write processing". Having answered the questions related to the master, it continues through the code with some questions related to the slave.

For the slave, we have the following questions:

  1. How does the slave obtain the routing information of the master?
  2. How does the slave report its offset to the master?
  3. How does the slave handle the data synchronized by the master?

In the overall class diagram, the slave-related code lives in the HAClient class.

HAClient

HAClient is the core of slave processing. It covers three parts:

  1. The slave establishes a connection with the master
  2. The slave reports its synchronization progress to the master
  3. The slave receives and processes the master's synchronization data

HAClient startup

As mentioned earlier, HAClient is started from the HAService startup method, HAService#start(), in store/src/main/java/org/apache/rocketmq/store/ha/HAService.java.

// HAService startup
public void start() throws Exception {
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    this.groupTransferService.start();
    this.haClient.start();
}

The startup code of HAClient is the run() method of the inner class HAService$HAClient, in store/src/main/java/org/apache/rocketmq/store/ha/HAService.java.

The three core steps are marked in the code comments:

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Step 1: connect to the master
            if (this.connectMaster()) {
                // Step 2: if the time interval from the last report reaches the maximum waiting time, execute a report immediately
                if (this.isTimeToReportOffset()) {
                    // Report slave offset
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }
                // Check for read events
                this.selector.select(1000);
                //Step 3: process the messages returned by the master
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }
                // After processing the read event, if the slave offset is updated, you need to send a new slave offset again
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }

    log.info(this.getServiceName() + " service end");
}

Slave connects to the Master

The purpose of the connectMaster() method is to connect with the master.

// Master address (taken from haMasterAddress in the configuration file or, if that is not set, from the address returned when the broker registers with the NameServer)
private final AtomicReference<String> masterAddress = new AtomicReference<>();

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // Read event, which is used to listen for the return message of the master
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        // Set as the offset of the current commitlog
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

One thing to note is the currentReportedOffset field, which represents the slave's current synchronization progress. Its value is also what the slave later reports to the master. At initialization it is set directly to the maximum offset of the commitlog; if there is no commitlog file yet, it is 0.
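The connectMaster() method turns the address string into a SocketAddress before connecting. As a rough illustration of what that conversion involves, here is a minimal sketch. It is not the RemotingUtil implementation: the class name AddressSketch, the method parse, and the assumption that the address is a plain "host:port" string are all made up for this example.

```java
import java.net.InetSocketAddress;

// Hypothetical helper for illustration only; not part of RocketMQ.
final class AddressSketch {

    // Parses "host:port" into an unresolved socket address, or returns null if malformed.
    static InetSocketAddress parse(String addr) {
        if (addr == null) {
            return null;
        }
        // Split on the last ':' so the host part is everything before the port.
        int split = addr.lastIndexOf(':');
        if (split <= 0 || split == addr.length() - 1) {
            return null;
        }
        String host = addr.substring(0, split);
        try {
            int port = Integer.parseInt(addr.substring(split + 1));
            if (port < 0 || port > 65535) {
                return null;
            }
            // createUnresolved avoids a DNS lookup, which keeps the sketch side-effect free.
            return InetSocketAddress.createUnresolved(host, port);
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

For example, AddressSketch.parse("192.168.0.1:10912") yields an address with that host and port (the values are illustrative), while a string without a port yields null, which matches the null checks in connectMaster().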

Slave reports offset

// Step 2: if the time interval from the last report reaches the maximum waiting time, execute a report immediately
if (this.isTimeToReportOffset()) {
    // Report slave offset
    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
    if (!result) {
        this.closeMaster();
    }
}

isTimeToReportOffset() checks whether the time elapsed since the last report exceeds the maximum waiting interval (5s by default). This means that even if no message is received from the master within 5s, the slave still sends a report request to the master, which serves as a heartbeat packet.
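The body of isTimeToReportOffset() is not shown in this article, so the following is only a sketch of the timing check described above. The class ReportTimerSketch and its method markReported are hypothetical names, and the interval is passed in rather than read from the store configuration.

```java
// Hypothetical stand-alone sketch of the heartbeat timing check; not the RocketMQ class.
final class ReportTimerSketch {
    // Assumed to play the role of the 5s default interval mentioned in the text.
    private final long heartbeatIntervalMs;
    // Time of the last report written to the master.
    private long lastWriteTimestamp;

    ReportTimerSketch(long heartbeatIntervalMs, long now) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.lastWriteTimestamp = now;
    }

    // True once more than heartbeatIntervalMs has passed since the last report.
    boolean isTimeToReportOffset(long now) {
        return now - lastWriteTimestamp > heartbeatIntervalMs;
    }

    // Called after a report, in the same way reportSlaveMaxOffset() refreshes lastWriteTimestamp.
    void markReported(long now) {
        this.lastWriteTimestamp = now;
    }
}
```

Passing the current time in as a parameter keeps the sketch easy to test without waiting for real time to pass.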

Take a look at the reportSlaveMaxOffset() method:

private boolean reportSlaveMaxOffset(final long maxOffset) {
    // The write position is set to 0
    this.reportOffset.position(0);
    // The writable length is 8 bytes
    this.reportOffset.limit(8);
    // The data content is the current offset of the slave
    this.reportOffset.putLong(maxOffset);
    // Switch from write mode to read mode
    // Set the read position to 0
    this.reportOffset.position(0);
    // The readable length is 8 bytes
    this.reportOffset.limit(8);

    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            // Write data to channel
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName() + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    return !this.reportOffset.hasRemaining();
}

As you can see, the packet the slave sends in a report request is very simple: a single 8-byte offset.
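To make the packet layout concrete, here is a small sketch that encodes an offset into 8 bytes and reads it back. OffsetPacketSketch, encode and decode are hypothetical names for this example. The byte order is ByteBuffer's default (big-endian), which is what a plain putLong() produces.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the 8-byte report packet; not the RocketMQ code.
final class OffsetPacketSketch {

    // Writes the offset into an 8-byte buffer and prepares it for reading.
    static ByteBuffer encode(long maxOffset) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.position(0);
        buf.limit(8);
        buf.putLong(maxOffset);
        // Same manual reset as in reportSlaveMaxOffset(): read from 0, up to 8 bytes.
        buf.position(0);
        buf.limit(8);
        return buf;
    }

    // Reads the offset back without moving the buffer's position.
    static long decode(ByteBuffer buf) {
        return buf.getLong(buf.position());
    }
}
```

A receiver that gets these 8 bytes can recover the slave's progress with a single getLong() call.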

There is a detail in the code here. RocketMQ does not call flip() when switching from write mode to read mode, but sets the position and limit manually, which has the same effect in this case. The write itself is retried in a loop (up to three times) because NIO is non-blocking, and a single write() call does not necessarily write all of the data in the ByteBuffer.
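The partial-write behaviour can be reproduced without a real socket. The sketch below uses a made-up channel, ThrottledChannel, that accepts only a few bytes per call, and a retry loop shaped like the one in reportSlaveMaxOffset(). All names here are assumptions for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical sketch of retrying a non-blocking write; not the RocketMQ code.
final class PartialWriteSketch {

    // Fake channel that accepts at most maxPerWrite bytes per write() call.
    static final class ThrottledChannel implements WritableByteChannel {
        private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int maxPerWrite;

        ThrottledChannel(int maxPerWrite) {
            this.maxPerWrite = maxPerWrite;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(maxPerWrite, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get()); // advances src.position() by one
            }
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }

        byte[] received() {
            return sink.toByteArray();
        }
    }

    // Tries up to maxAttempts writes; returns true only if the buffer was fully drained.
    static boolean writeWithRetries(WritableByteChannel ch, ByteBuffer buf, int maxAttempts) {
        for (int i = 0; i < maxAttempts && buf.hasRemaining(); i++) {
            try {
                ch.write(buf);
            } catch (IOException e) {
                return false;
            }
        }
        return !buf.hasRemaining();
    }
}
```

With 3 bytes accepted per call, three attempts are enough for an 8-byte buffer. With 2 bytes per call, three attempts leave 2 bytes unwritten and the method returns false, which in HAClient leads to closeMaster().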

Slave processes synchronized data

Step 3 calls processReadEvent() to process the data returned by the master. Before looking at the code, we already know that what the master returns is the data the slave has not yet synchronized. So what does the slave do with it? It saves this unsynchronized data to its local commitlog file.

private boolean processReadEvent() {
    // Number of consecutive reads with data size 0
    int readSizeZeroTimes = 0;
    // Keep reading from the channel while the read buffer still has free space
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                // If three consecutive reads return 0 bytes, break out of the loop: the channel has no more data ready for now
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }

    return true;
}

The code calls the dispatchReadRequest() method to process the data that was read. Here is that method:

private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size

    while (true) {
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        if (diff >= msgHeaderSize) {
            // master commitlog offset
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            // Message size
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
            // Local commitlog offset
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            if (slavePhyOffset != 0) {
                // If the slave offset is different from the master offset, there is a problem in the data synchronization process and the synchronization will not continue.
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }

            // If the complete message (header + body) has arrived in the buffer, append it to the commitlog
            if (diff >= (msgHeaderSize + bodySize)) {
                // Backing array of the read buffer
                byte[] bodyData = byteBufferRead.array();
                // Start index of the message body within that array
                int dataStart = this.dispatchPosition + msgHeaderSize;
                // Add data to the local commitlog
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData, dataStart, bodySize);
                this.dispatchPosition += msgHeaderSize + bodySize;

                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }

                continue;
            }
        }

        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}

The overall logic can be divided into two parts. The first part parses the packet sent by the master and extracts the message data. The second part writes the message data into the local commitlog file.

Both parts are written clearly in the code and are relatively easy to follow, so I won't go through them line by line.
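The parsing half can be tried out in isolation. The sketch below applies the same framing as dispatchReadRequest(), an 8-byte physical offset and a 4-byte body size followed by the body, and returns the complete frames instead of appending them to a commitlog. FrameParserSketch and Frame are hypothetical names, and the offset check against the local commitlog is left out.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the header + body framing; not the RocketMQ code.
final class FrameParserSketch {
    static final int HEADER_SIZE = 8 + 4; // phyoffset + size

    static final class Frame {
        final long phyOffset;
        final byte[] body;

        Frame(long phyOffset, byte[] body) {
            this.phyOffset = phyOffset;
            this.body = body;
        }
    }

    // Index of the first byte in the read buffer that has not been dispatched yet.
    private int dispatchPosition = 0;

    // Returns every complete frame currently in the buffer; partial frames are left for the next call.
    List<Frame> dispatch(ByteBuffer readBuffer) {
        List<Frame> frames = new ArrayList<>();
        while (true) {
            int diff = readBuffer.position() - dispatchPosition;
            if (diff < HEADER_SIZE) {
                break; // header not complete yet
            }
            // Absolute reads: they do not move the buffer's write position.
            long phyOffset = readBuffer.getLong(dispatchPosition);
            int bodySize = readBuffer.getInt(dispatchPosition + 8);
            if (diff < HEADER_SIZE + bodySize) {
                break; // body not complete yet
            }
            byte[] body = new byte[bodySize];
            for (int i = 0; i < bodySize; i++) {
                body[i] = readBuffer.get(dispatchPosition + HEADER_SIZE + i);
            }
            frames.add(new Frame(phyOffset, body));
            dispatchPosition += HEADER_SIZE + bodySize;
        }
        return frames;
    }
}
```

Keeping a separate dispatchPosition lets the buffer keep accumulating bytes from the socket while parsing resumes exactly where the last complete frame ended.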

Keywords: Java Spring Back-end Programmer

Added by john_bboy7 on Wed, 26 Jan 2022 07:44:04 +0200