Log appending process for source analysis RocketMQ DLedger

In the last article we analyzed in detail Leader Selector for Multiple Copies of Source Analysis RocketMQ DLedger This article will analyze the implementation of log replication in detail.

According to raft protocol, when the entire cluster finishes Leader selection, the primary node in the cluster can accept requests from the client, while the secondary node in the cluster is only responsible for synchronizing data from the primary node, and will not process read and write requests, which is very different from the read and write separation of M-S structure.

On the basis of the previous article, this article will start directly with the Leader processing client request entry, which starts with the handleAppend method of DLedgerServer.

1. Basic process of log replication

Before formally analyzing RocketMQ DLedger's multiple copy replication, let's first look at the Request Protocol fields for client sending logs, whose class diagrams are as follows:

Let's start by explaining what each field means:

  • String group
    The name of the group to which the cluster belongs.
  • String remoteId
    Request destination node ID.
  • String localId
    Node ID.
  • int code
    Request response field, indicating the return response code.
  • String leaderId = null
    Leader Id in cluster.
  • long term
    Current election rounds for the cluster.
  • byte[] body
    Data to be sent.

The handleAppend method with a DLedgerServer entry for request processing of the log.


PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
reConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);

Step1: First verify the validity of the request:

  • If the requested node ID is not the current processing node, an exception is thrown.
  • If the requested cluster is not the cluster of the current node, an exception is thrown.
  • If the current node is not the primary node, an exception is thrown.


long currTerm = memberState.currTerm();
if (dLedgerEntryPusher.isPendingFull(currTerm)) {  // @1
    AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
    return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
} else {   // @2
    DLedgerEntry dLedgerEntry = new DLedgerEntry();
    DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
    return dLedgerEntryPusher.waitAck(resEntry);

Step2: If the preprocessing queue is full, reject the client request and return the LEADER_PENDING_FULL error code; if not, encapsulate the request as a DledgerEntry, call the dLedgerStore method to append the log and synchronize waiting for replication response from the replica node by using the waitAck method of dLedgerEntryPusher And eventually returns the result to the calling method.

  • Code @1: If the push queue for dLedgerEntryPusher is full, an append is returned with the error code LEADER_PENDING_FULL.
  • Code @2: Appends a message to the Leader server and broadcasts it to a slave node, which is considered a failure if no confirmation from the slave node is received within a specified time.

Next, follow these three main points:

  • Determine if the Push queue is full
  • Leader node stores messages
  • Primary node waits for ACK replication from secondary node

1.1 How to determine if the Push queue is full


public boolean isPendingFull(long currTerm) {
    checkTermForPendingMap(currTerm, "isPendingFull");     // @1
    return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum(); // @2

There are two main steps:
Code @1: Check to see if the current polling rounds are in PendingMap, and if not, initialize with a structure of Map < Long/Rounds/, ConcurrentMap>.

Code @2: Check to see if the number of currently waiting results from a node exceeds its maximum number of requests, via maxPendingRequests
Num configuration, which defaults to: 10000.

The above logic is simple, but where does the data in ConcurrentMap come from?We might as well look down.

1.2 Leader node stores data

The data store of the Leader node is mainly implemented by the appendAsLeader method of the DLedgerStore.DLedger implements memory-based and file-based storage implementations respectively. This paper focuses on file-based storage implementations, which are implemented as DLedgerMmapFileStore.

The following focuses on the analysis of the data storage process, whose entry is the appendAsLeader method of the DLedgerMmapFileStore.


PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);

Step1: First of all, we can decide whether the data can be appended or not, based on the following two main points:

  • Whether the state of the current node is a Leader or not, an exception is thrown.
  • Whether the current disk is full or not depends on the fact that the root directory or data file directory of the DLedger is being used more than the maximum allowed, with a default value of 85%.
ByteBuffer dataBuffer = localEntryBuffer.get();
ByteBuffer indexBuffer = localIndexBuffer.get();

Step2: Get a data and index buffer from the local thread variable.The ByteBuffer used to store data has a fixed capacity of 4M, and the ByteBuffer of the index is the length of two index entries, fixed to 64 bytes.

DLedgerEntryCoder.encode(entry, dataBuffer);
public static void encode(DLedgerEntry entry, ByteBuffer byteBuffer) {
    int size = entry.computSizeInBytes();
    //always put magic on the first position

Step3: Write DLedgerEntry to ByteBuffer, where you can see that each write calls ByteBuffer's clear method and empties the data. From this, you can see that only 4M of data can be stored for each data append.


synchronized (memberState) {
    PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
    // ...omit code

Step4: Lock the state machine and check again if the state of the node is a Leader node.


long nextIndex = ledgerEndIndex + 1;
DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);

Step5: Set the number of entryIndex and entryTerm (voting rounds) for the current log entry.The magic number, entryIndex, entryTerm and so on are written to bytebuffer.


long prePos = dataFileList.preAppend(dataBuffer.remaining());
PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
DLedgerEntryCoder.setPos(dataBuffer, prePos);

Step6: Calculate the starting offset of a new message, follow up on the preAppend implementation of the dataFileList, and write the offset to the bytebuffer in the log.


for (AppendHook writeHook : appendHooks) {
    writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);

Step7: Execute the hook function.


long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);

Step8: Append data to pagecache.This method will be described in more detail later.


DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);
long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);

Step9: Build an entry index and append the index data to the pagecache.


ledgerEndTerm = memberState.currTerm();
if (ledgerBeginIndex == -1) {
    ledgerBeginIndex = ledgerEndIndex;

Step10:ledgerEndeIndex plus one (next entry) serial number.The ledgerEndIndex and ledgerEndTerm of the state machine of the leader node are set.

Leader node data appending is described here and will focus on the implementation details of storage-related methods later.

1.3 Primary node waiting for ACK replication from secondary node

Its implementation entry is the waitAck method of dLedgerEntryPusher.


public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry) {
    updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex());    // @1
    if (memberState.getPeerMap().size() == 1) {                                                                  // @2
        AppendEntryResponse response = new AppendEntryResponse();
        return AppendFuture.newCompletedFuture(entry.getPos(), response);
    } else {
        checkTermForPendingMap(entry.getTerm(), "waitAck");                                            
        AppendFuture<AppendEntryResponse> future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs()); // @3
        CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);     // @4
        if (old != null) {
            logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());
        wakeUpDispatchers();                                       // @5
        return future;

Code @1: Update the push level of the current node.
Code @2: If the number of nodes in the cluster is 1, no forwarding is required and a successful result is returned directly.
Code @3: Build an append to respond to Future and set the timeout, defaulting to 2500 ms, which can be changed by the maxWaitAckTimeMs configuration.
Code @4: Put the built Future in the waiting result set.
Code @5: Wake up the Entry forwarding thread, which push es data from the primary node to the slave nodes.

Next, the key points are explained separately.

1.3.1 updatePeerWaterMark method


private void updatePeerWaterMark(long term, String peerId, long index) {    // Code@1
    synchronized (peerWaterMarksByTerm) { 
       checkTermForWaterMark(term, "updatePeerWaterMark");                     // Code@2
        if (peerWaterMarksByTerm.get(term).get(peerId) < index) {                   // Code@3
            peerWaterMarksByTerm.get(term).put(peerId, index);

Code @1: Start with a brief description of the two parameters of this method:

  • long term
    Current voting rounds.
  • String peerId
    ID of the current node.
  • long index
    The sequence number of the currently appended data.

Code @2: Initialize the peerWaterMarksByTerm data structure, resulting in < Long / term */, Map < String / peerId */, Long /** entry index */>.

Code @3: Update if the index stored by peerWaterMarksByTerm is less than the index of the current data.

1.3.2 wakeUpDispatchers Details


public void wakeUpDispatchers() {
    for (EntryDispatcher dispatcher : dispatcherMap.values()) {

This method mainly traverses the transponder and wakes it up.The key to this approach is EntryDispatcher, so let's take a look at the initialization of this collection before going into detail.

DLedgerEntryPusher construction method

for (String peer : memberState.getPeerMap().keySet()) {
    if (!peer.equals(memberState.getSelfId())) {
        dispatcherMap.put(peer, new EntryDispatcher(peer, logger));

An EntryDispatcher object was originally created for each slave node when the DLedgerEntryPusher was built.

Obviously, log replication is done by DLedgerEntryPusher.For space reasons, this section will continue in the next article.

The storage-related implementations were not analyzed in detail when Leader appended the logs above. For the sake of completing the knowledge system, let's analyze the core implementations.

2. Log Storage Implementation Details

This section focuses on the preAppend and append methods of the MmapFileList.

The design of the storage section can be found in my blog: Source Code Analysis RocketMQ DLedger Multi-copy Storage Implementation , MmapFileList pairs MapdFileQueue labeled RocketMQ.

Detailed preAppend for 2.1 MmapFileList

This method eventually calls the preAppend method with two parameters, so let's look directly at the preAppend method with two parameters.


public long preAppend(int len, boolean useBlank) {                // @1
    MmapFile mappedFile = getLastMappedFile();                   // @2 start
    if (null == mappedFile || mappedFile.isFull()) {
        mappedFile = getLastMappedFile(0);
    if (null == mappedFile) {
        logger.error("Create mapped file for {}", storePath);
        return -1;
    }                                                                                            // @2 end
    int blank = useBlank ? MIN_BLANK_LEN : 0;
    if (len + blank > mappedFile.getFileSize() - mappedFile.getWrotePosition()) {   // @3
        if (blank < MIN_BLANK_LEN) {
            logger.error("Blank {} should ge {}", blank, MIN_BLANK_LEN);
            return -1;
        } else {
            ByteBuffer byteBuffer = ByteBuffer.allocate(mappedFile.getFileSize() - mappedFile.getWrotePosition());     // @4
            byteBuffer.putInt(BLANK_MAGIC_CODE);                                                                                                      // @5
            byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());                                               // @6
            if (mappedFile.appendMessage(byteBuffer.array())) {                                                                                     // @7
                //need to set the wrote position
            } else {
                logger.error("Append blank error for {}", storePath);
                return -1;
            mappedFile = getLastMappedFile(0);
            if (null == mappedFile) {
                logger.error("Create mapped file for {}", storePath);
                return -1;
    return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();// @8

Code @1: First introduce the meaning of its parameters:

  • int len requires the length of the request.
  • Whether boolean useBlank needs to be populated or not, defaults to true.

Code @2: Get the last file, which is the file you are currently writing.

Code @3: Logic to process if the requested resource exceeds the writable section of the current file.Code @4-@7 is all its processing logic.

Code @4: Request a bytebuffer of the size of the bytes remaining in the current file.

Code @5: Write magic number first.

Code @6: Write byte length equal to the total remaining size of the current file.

Code @7: Write empty bytes, code @4-@7 is meant to write an empty Entry filled with magic numbers and size for easy parsing.

Code @8: If the current file is large enough to hold the log to be written, return its physical offset directly.

From the above code interpretation, it is easy to see that this method returns the starting physical offset to be written to the log.

Appnd Details for 2.2 MmapFileList

The append method for the four parameters is invoked with the following code:

public long append(byte[] data, int pos, int len, boolean useBlank) {  // @1
    if (preAppend(len, useBlank) == -1) {
        return -1;
    MmapFile mappedFile = getLastMappedFile();                               // @2
    long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();   // @3
    if (!mappedFile.appendMessage(data, pos, len)) {            // @4
        logger.error("Append error for {}", storePath);
        return -1;
    return currPosition;

Code @1: Let's start with the parameters:

  • byte[] data
    The data to be written, which is the log to be appended.
  • int pos
    From where in the data byte array to start reading.
  • int len
    The number of bytes to be written.
  • boolean useBlank
    Whether to use padding or not, defaults to true.

Code @2: Get the last file, which is the current writable file.

Code @3: Get the current write pointer.

Code @4: Append message.

Finally, let's look at appendMessage, the specific message append implementation logic.


public boolean appendMessage(final byte[] data, final int offset, final int length) {
    int currentPos = this.wrotePosition.get();

    if ((currentPos + length) <= this.fileSize) {
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); // @1
        byteBuffer.put(data, offset, length);
        return true;
    return false;

This method I would like to highlight that the way to write is mappedByteBuffer, which is created by the map method of FileChannel, which we often refer to as PageCache, where message appending is first written to the pageCache.

This article details the first two steps the Leader node takes to process client message append requests, that is, to determine if the Push queue is full and the Leader node stores messages.Considering the length of the page, data synchronization for each node will be detailed in the next article.

Before moving on to the next article, let's think about the following questions:

  1. If the primary node is appended successfully (written to PageCache), but the synchronization process from the primary node fails or the primary node is down, how does the data in the cluster ensure consistency?

Recommended reading: Source analysis in the RocketMQ DLedger series series.
1,RocketMQ Multi-Copy Preamble: A Preliminary Exploration of raft Protocol
2,Leader Selector for Multiple Copies of Source Analysis RocketMQ DLedger
3,Source Code Analysis RocketMQ DLedger Multi-copy Storage Implementation

The original was published from 2019-09-15
Author: Dingwei, author of Inside RocketMQ Technology.
This article is from Middleware Circle of Interest , learn more about it and pay attention to it Middleware Circle of Interest.

Keywords: Java less

Added by fat creative on Tue, 17 Sep 2019 05:35:03 +0300