Then the last one RocketMQv4.9.1 source code analysis - HA Master-slave Master reading and writing processing After answering the questions related to Master, this article continues to look at the code around some questions related to Slave.
For slave, we have the following doubts:
- How can slave obtain the routing information of the master
- How does slave report offset to master
- How does slave handle data synchronized by the master
In the overall class diagram layout, the code related to slave is in HAClient class
HAClient
HAClient is the core of Slave processing, which includes three parts:
- The slave establishes a connection with the master
- slave reports the synchronization progress to the master
- slave receives and processes the master's synchronization data
HAClient startup
As mentioned earlier, the trigger time of HAClient startup is in the method of HAService startup, which is in store / SRC / main / Java / org / Apache / rocketmq / store / HA / HAService Start().
// HAService startup public void start() throws Exception { this.acceptSocketService.beginAccept(); this.acceptSocketService.start(); this.groupTransferService.start(); this.haClient.start(); } Copy code
The startup code of HAClient is in store / SRC / main / Java / org / Apache / rocketmq / store / HA / haservice $HAClient Run() path.
Three of the core steps are marked in the code comments
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // Step 1: connect to the master if (this.connectMaster()) { // Step 2: if the time interval from the last report reaches the maximum waiting time, execute a report immediately if (this.isTimeToReportOffset()) { // Report slave offset boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); if (!result) { this.closeMaster(); } } // Check for read events this.selector.select(1000); //Step 3: process the messages returned by the master boolean ok = this.processReadEvent(); if (!ok) { this.closeMaster(); } // After processing the read event, if the slave offset is updated, you need to send a new slave offset again if (!reportSlaveMaxOffsetPlus()) { continue; } long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) { log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress + "] expired, " + interval); this.closeMaster(); log.warn("HAClient, master not response some time, so close connection"); } } else { this.waitForRunning(1000 * 5); } } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); this.waitForRunning(1000 * 5); } } log.info(this.getServiceName() + " service end"); } Copy code
Slave connection Master
The purpose of the connectMaster() method is to connect with the master.
// Master address (the address of the master is configured in the configuration file) private final AtomicReference<String> masterAddress = new AtomicReference<>(); private boolean connectMaster() throws ClosedChannelException { if (null == socketChannel) { String addr = this.masterAddress.get(); if (addr != null) { SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr); if (socketAddress != null) { this.socketChannel = RemotingUtil.connect(socketAddress); if (this.socketChannel != null) { // Read event, which is used to listen for the return message of the master this.socketChannel.register(this.selector, SelectionKey.OP_READ); } } } // Set as the offset of the current commitlog this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); this.lastWriteTimestamp = System.currentTimeMillis(); } return this.socketChannel != null; } Copy code
One thing to pay attention to is the currentReportedOffset field. The purpose of this field is to represent the synchronized progress of the current slave. The value of this field is also used in subsequent reporting to the master. During initialization, it is directly set to the maximum offset of the commitlog file. If there is no commitlog file, it is 0.
Slave report offset
// Step 2: if the time interval from the last report reaches the maximum waiting time, execute a report immediately if (this.isTimeToReportOffset()) { // Report slave offset boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); if (!result) { this.closeMaster(); } } Copy code
isTimeToReportOffset() is used to judge whether the time difference between the last report and the current time is greater than the maximum waiting interval (5s by default), which means that even if no message is received from the master in 5s, slave will send a report request to the master, which is relative to a heartbeat packet.
Take a look at the reportSlaveMaxOffset() method:
private boolean reportSlaveMaxOffset(final long maxOffset) { // The write position is set to 0 this.reportOffset.position(0); // The writable length is 8 bytes this.reportOffset.limit(8); // The data content is the current offset of the slave this.reportOffset.putLong(maxOffset); // Switch from write mode to write mode // Set the read position to 0 this.reportOffset.position(0); // The readable length is 8 bytes this.reportOffset.limit(8); for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) { try { // Write data to channel this.socketChannel.write(this.reportOffset); } catch (IOException e) { log.error(this.getServiceName() + "reportSlaveMaxOffset this.socketChannel.write exception", e); return false; } } lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now(); return !this.reportOffset.hasRemaining(); } Copy code
You can know that the data packet content of the slave report request is very simple, that is, an offset data of 8 bytes.
There is a detail in the code here. rocketmq does not use the flip() method when switching from write mode to read mode, but manually sets the position and limit. This is because NIO is a non blocking IO, and the write method does not necessarily write all the data of ByteBuffer at one time.
Slave process synchronization data
In step 3, calling processReadEvent() to process the data returned by master, before we look at the code, we already know that the data returned by master is not synchronized. Then what is slave going to do? Of course, this unsynchronized data is saved to the local commitlog file.
private boolean processReadEvent() { // Number of consecutive reads with data size 0 int readSizeZeroTimes = 0; // Keep reading the data in the buffer until there is nothing left while (this.byteBufferRead.hasRemaining()) { try { int readSize = this.socketChannel.read(this.byteBufferRead); if (readSize > 0) { readSizeZeroTimes = 0; boolean result = this.dispatchReadRequest(); if (!result) { log.error("HAClient, dispatchReadRequest error"); return false; } } else if (readSize == 0) { // If it is empty for three consecutive times, the method will jump out. What is the function here? if (++readSizeZeroTimes >= 3) { break; } } else { log.info("HAClient, processReadEvent read socket < 0"); return false; } } catch (IOException e) { log.info("HAClient, processReadEvent read socket exception", e); return false; } } return true; } Copy code
The dispatchReadRequest() method is called in the code to process the request. See this method below:
private boolean dispatchReadRequest() { final int msgHeaderSize = 8 + 4; // phyoffset + size while (true) { int diff = this.byteBufferRead.position() - this.dispatchPosition; if (diff >= msgHeaderSize) { // master commitlog offset long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition); // Message size int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8); // Local commitlog offset long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); if (slavePhyOffset != 0) { // If the slave offset is different from the master offset, there is a problem in the data synchronization process and the synchronization will not continue. if (slavePhyOffset != masterPhyOffset) { log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset); return false; } } // If there is enough space left, append the message to the commitlog if (diff >= (msgHeaderSize + bodySize)) { // Message array byte[] bodyData = byteBufferRead.array(); // Informative int dataStart = this.dispatchPosition + msgHeaderSize; // Add data to the local commitlog HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData, dataStart, bodySize); this.dispatchPosition += msgHeaderSize + bodySize; if (!reportSlaveMaxOffsetPlus()) { return false; } continue; } } if (!this.byteBufferRead.hasRemaining()) { this.reallocateByteBuffer(); } break; } return true; } Copy code
The overall logic can be divided into two parts. The first part is to parse the request package and obtain the message data. The second part is to write the message data into the commitlog file.
These two parts of the code have been written very clearly and are relatively easy to understand, so I won't repeat them.