This article examines the HAClient of RocketMQ.

```java
class HAClient extends ServiceThread {
    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
    private final AtomicReference<String> masterAddress = new AtomicReference<>();
    private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
    private SocketChannel socketChannel;
    private Selector selector;
    private long lastWriteTimestamp = System.currentTimeMillis();

    private long currentReportedOffset = 0;
    private int dispatchPosition = 0;
    private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
    private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

    public HAClient() throws IOException {
        this.selector = RemotingUtil.openSelector();
    }

    //......

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                if (this.connectMaster()) {

                    // Report the slave offset when the heartbeat interval has elapsed.
                    if (this.isTimeToReportOffset()) {
                        boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                        if (!result) {
                            this.closeMaster();
                        }
                    }

                    // Wait up to one second for the channel to become readable.
                    this.selector.select(1000);

                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        this.closeMaster();
                    }

                    if (!reportSlaveMaxOffsetPlus()) {
                        continue;
                    }

                    // Housekeeping: close the connection if it has been idle for too long.
                    long interval =
                        HAService.this.getDefaultMessageStore().getSystemClock().now()
                            - this.lastWriteTimestamp;
                    if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                        .getHaHousekeepingInterval()) {
                        log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                            + "] expired, " + interval);
                        this.closeMaster();
                        log.warn("HAClient, master not response some time, so close connection");
                    }
                } else {
                    this.waitForRunning(1000 * 5);
                }
            } catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
                this.waitForRunning(1000 * 5);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return HAClient.class.getSimpleName();
    }

    //......

    private boolean processReadEvent() {
        int readSizeZeroTimes = 0;
        while (this.byteBufferRead.hasRemaining()) {
            try {
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    readSizeZeroTimes = 0;
                    boolean result = this.dispatchReadRequest();
                    if (!result) {
                        log.error("HAClient, dispatchReadRequest error");
                        return false;
                    }
                } else if (readSize == 0) {
                    // Stop polling after three consecutive empty reads.
                    if (++readSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    log.info("HAClient, processReadEvent read socket < 0");
                    return false;
                }
            } catch (IOException e) {
                log.info("HAClient, processReadEvent read socket exception", e);
                return false;
            }
        }

        return true;
    }

    //......
}
```

- HAClient extends ServiceThread. Its run method loops while isStopped() returns false and first calls connectMaster() to check whether a connection to the master is established; if not, it calls waitForRunning(1000 * 5) and tries again. Once connected, it calls isTimeToReportOffset(), which compares the time elapsed since lastWriteTimestamp with defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval() and returns true when the elapsed time exceeds that interval; in that case the slave reports its offset through reportSlaveMaxOffset. The loop then waits on the selector for up to one second and calls processReadEvent, which reads from the socket while byteBufferRead.hasRemaining() and calls dispatchReadRequest whenever a read returns data.
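
The timing check described above can be illustrated in isolation. The following is a minimal sketch, not the RocketMQ source: the class HeartbeatTimer, its constructor, and markWritten are hypothetical names introduced here, and the clock is passed in as a parameter so the behaviour is easy to follow.

```java
// Hypothetical stand-in for the heartbeat timing check; not part of RocketMQ.
class HeartbeatTimer {
    // Plays the role of haSendHeartbeatInterval from the message store config (assumption).
    private final long heartbeatIntervalMillis;
    // Time of the last write to the master, in milliseconds.
    private long lastWriteTimestamp;

    HeartbeatTimer(long heartbeatIntervalMillis, long nowMillis) {
        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
        this.lastWriteTimestamp = nowMillis;
    }

    // True only when strictly more than one interval has passed since the last write.
    boolean isTimeToReportOffset(long nowMillis) {
        long elapsed = nowMillis - lastWriteTimestamp;
        return elapsed > heartbeatIntervalMillis;
    }

    // Called after an offset report has been written, which resets the timer.
    void markWritten(long nowMillis) {
        this.lastWriteTimestamp = nowMillis;
    }
}
```

With an interval of 5000 ms and a last write at time 0, a check at 5000 returns false and a check at 5001 returns true, because the comparison is strict.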

```java
class HAClient extends ServiceThread {

    //......

    private boolean dispatchReadRequest() {
        final int msgHeaderSize = 8 + 4; // phyoffset + size
        int readSocketPos = this.byteBufferRead.position();

        while (true) {
            // Bytes received but not yet dispatched.
            int diff = this.byteBufferRead.position() - this.dispatchPosition;
            if (diff >= msgHeaderSize) {
                long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

                long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                // A non-empty slave must be exactly at the offset the master is pushing.
                if (slavePhyOffset != 0) {
                    if (slavePhyOffset != masterPhyOffset) {
                        log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                            + slavePhyOffset + " MASTER: " + masterPhyOffset);
                        return false;
                    }
                }

                // The header and the whole body have arrived: append to the commit log.
                if (diff >= (msgHeaderSize + bodySize)) {
                    byte[] bodyData = new byte[bodySize];
                    this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                    this.byteBufferRead.get(bodyData);

                    HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                    // Restore the write position for the next socket read.
                    this.byteBufferRead.position(readSocketPos);
                    this.dispatchPosition += msgHeaderSize + bodySize;

                    if (!reportSlaveMaxOffsetPlus()) {
                        return false;
                    }

                    continue;
                }
            }

            if (!this.byteBufferRead.hasRemaining()) {
                this.reallocateByteBuffer();
            }

            break;
        }

        return true;
    }

    //......
}
```

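- dispatchReadRequest checks whether diff >= (msgHeaderSize + bodySize), that is, whether the bytes buffered past dispatchPosition cover a complete message: a 12-byte header (8-byte physical offset plus 4-byte body size) followed by the body. If so, it calls defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData).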
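
The framing logic can be tried out on a plain ByteBuffer. The sketch below is an illustration under stated assumptions, not the RocketMQ implementation: FrameDispatchSketch, Frame, and dispatch are hypothetical names, complete frames are collected into a list instead of being appended to a commit log, and the offset comparison with the slave is left out.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the 8-byte offset + 4-byte size + body framing.
class FrameDispatchSketch {
    static final int HEADER_SIZE = 8 + 4; // physical offset + body size

    // One fully received message.
    static final class Frame {
        final long offset;
        final byte[] body;

        Frame(long offset, byte[] body) {
            this.offset = offset;
            this.body = body;
        }
    }

    // Index of the first byte that has not been dispatched yet.
    private int dispatchPosition = 0;

    // buf is in "write mode": buf.position() marks the end of the received bytes.
    // Returns every complete frame that has not been dispatched yet and leaves
    // buf.position() unchanged so that later reads keep appending.
    List<Frame> dispatch(ByteBuffer buf) {
        List<Frame> frames = new ArrayList<>();
        int writePos = buf.position();

        while (true) {
            int diff = writePos - dispatchPosition;
            if (diff < HEADER_SIZE) {
                break; // header not complete yet
            }
            long offset = buf.getLong(dispatchPosition);   // absolute read
            int bodySize = buf.getInt(dispatchPosition + 8); // absolute read
            if (bodySize < 0) {
                throw new IllegalArgumentException("negative body size: " + bodySize);
            }
            if (diff < HEADER_SIZE + bodySize) {
                break; // body not complete yet
            }

            byte[] body = new byte[bodySize];
            buf.position(dispatchPosition + HEADER_SIZE);
            buf.get(body);
            buf.position(writePos); // restore the write position

            frames.add(new Frame(offset, body));
            dispatchPosition += HEADER_SIZE + bodySize;
        }
        return frames;
    }
}
```

Keeping a separate dispatchPosition lets a partially received frame stay in the buffer across calls; only when the rest of its bytes arrive does the next call return it.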