[elasticsearch source code recovery process] es 7.7

Replica fragmentation request process, function entry: industriesclusterstateservice createOrUpdateShards


    private void createOrUpdateShards(final ClusterState state) {
        RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
        if (localRoutingNode == null) {

        DiscoveryNodes nodes = state.nodes();
        RoutingTable routingTable = state.routingTable();

        for (final ShardRouting shardRouting : localRoutingNode) { 
        //Judge whether the local node is in routingNodes. If it is, it indicates that the local node needs to be created or updated in pieces. Otherwise, skip
            ShardId shardId = shardRouting.shardId();
            if (failedShardsCache.containsKey(shardId) == false) { //
                AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());
                assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices";//Probe indexService is not empty
                Shard shard = indexService.getShardOrNull(shardId.id());
                if (shard == null) {//shard does not exist. Create??? ----- >
                    assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards"; //Detect whether the shard status is INITIALIZING
                    createShard(nodes, routingTable, shardRouting, state);//Replica recovery portal
                } else {//shard has an update
                    updateShard(nodes, shardRouting, shard, routingTable, state);

IndicesClusterStateService. The createshard function judges the type of shardRouting. If the recovery type is PEER, it looks for the source node and calls indicesservice createShard.

The shardRouting recovery type can be selected as:

EMPTY_STORE: recovery from an empty store From empty store recovery
EXISTING_STORE: recovery from an existing store From existing store recovery
PEER: recovery from a primary on another node Recover from primary on other nodes---This process is followed for replica fragmentation
SNAPSHOT: recovery from a snapshot Restore from snapshot
LOCAL_SHARDS: recovery from other shards of another index on the same node From another of this node index Other shard recovery

The main partition mainly recovers itself from the Translog. The partition from flush to disk has not been executed, and can be rebuilt from the tanslog

The copy goes in pieces, peer,

Why peer?

1) If the primary and secondary partitions start to recover at the same time, choose the primary partition? So just let the copy wait in pieces.. Wait for the main partition to recover and compare with the main partition, so go peer?

2) Only peer can compare with primary?

How to judge the master and vice? Local records?


    private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
        assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;

        DiscoveryNode sourceNode = null;
        if (shardRouting.recoverySource().getType() == Type.PEER)  { //Judgment type
            sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting); //Find source node
            if (sourceNode == null) {
                logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());

        try {
            final long primaryTerm = state.metaData().index(shardRouting.index()).primaryTerm(shardRouting.id());
            logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
            RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
            indicesService.createShard(   //Function entry
                    new RecoveryListener(shardRouting, primaryTerm),  
                           //Callback (finishes or failures)
                    repositoriesService, //service responsible for snapshot/restore
                    failedShardHandler,  //callback of shard failures
                    globalCheckpointSyncer,  //shard synchronizes the callback of the global checkpoint
                    retentionLeaseSyncer);  //callback for shard syce lease
        } catch (Exception e) {
            failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);

Find source node:

The source node is determined in two ways. If the current shard itself is not the primary shard, the source node is the node of the primary shard. Otherwise, if the current shard is being relocated (from other nodes to this node), the source node is the source node of the data relocation. After getting the source node, call IndicesService.. CreateShard, calling the method IndexShard. in this method. Startrecovery starts the recovery.


    private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes,
                                                               ShardRouting shardRouting) {
        DiscoveryNode sourceNode = null;
        if (!shardRouting.primary()) { //If the shard itself is not a primary shard??? ---- >
            ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard();
            // You can only restore from the primary node in the started state. Otherwise, continue polling,
            if (primary.active()) { //Judge the status of the primary partition - the primary partition is restored first, and the replica partition is waiting
                sourceNode = nodes.get(primary.currentNodeId()); // Find the node where the primary shard is located
                if (sourceNode == null) {
                    logger.trace("can't find replica source node because primary shard {} is assigned to an unknown node.", primary);
            } else {
                logger.trace("can't find replica source node because primary shard {} is not active.", primary);
        } else if (shardRouting.relocatingNodeId() != null) { //If you are moving
            sourceNode = nodes.get(shardRouting.relocatingNodeId());
            if (sourceNode == null) {
                logger.trace("can't find relocation source node for shard {} because it is assigned to an unknown node [{}].",
                    shardRouting.shardId(), shardRouting.relocatingNodeId()); // Locate the source node for relocation
        } else {
            throw new IllegalStateException("trying to find source node for peer recovery when routing state means no peer recovery: " +
        return sourceNode;

For tasks whose recovery type is PEER, the real executor of the recovery action is PeerRecoveryTargetService

 final AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService;
 T createShard(
                ShardRouting shardRouting,
                RecoveryState recoveryState,
                PeerRecoveryTargetService recoveryTargetService,     //Actual execution function-------------------------
                PeerRecoveryTargetService.RecoveryListener recoveryListener, //listener
                RepositoriesService repositoriesService,
                Consumer<IndexShard.ShardFailure> onShardFailure,
                Consumer<ShardId> globalCheckpointSyncer,
                RetentionLeaseSyncer retentionLeaseSyncer) throws IOException;

PeerRecoveryTargetService.doRecovery: StartRecoveryRequest is sent to the source node via RPC:


public void doRun() {

private void doRecovery(final long recoveryId) {
        final StartRecoveryRequest request;
        final RecoveryState.Timer timer;
        CancellableThreads cancellableThreads;
        try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
            if (recoveryRef == null) {
                logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
            final RecoveryTarget recoveryTarget = recoveryRef.target();
            timer = recoveryTarget.state().getTimer();
            cancellableThreads = recoveryTarget.cancellableThreads();
            try {
                assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
                logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
                final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
                assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :
                    "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
                request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);   // Wrap information such as metadata snapshot into request------------------------
            } catch (final Exception e) {
                // this will be logged as warning later on...
                logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
                    new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
       //..................... Several lines are omitted here

        try {
            logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode());
            cancellableThreads.executeIO(() -> //Send a request to the source node for recovery----------------------------
            //Execution will continue after cancelableThreads to ensure that any blocking call is interrupted in case of transmission delay caused by the network or other conditions. It is not clean after moving asynchronous execution, but missed requests are more difficult to troubleshoot and unacceptable.
                transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
                    new TransportResponseHandler<RecoveryResponse>() {
                        public void handleResponse(RecoveryResponse recoveryResponse) {
                            final TimeValue recoveryTime = new TimeValue(timer.time());
                            // do this through ongoing recoveries to remove it from the collection
                            //.......... Several lines are omitted here

                        public void handleException(TransportException e) {

                        public String executor() {
                            // we do some heavy work like refreshes in the response so fork off to the generic threadpool
                            return ThreadPool.Names.GENERIC;

                        public RecoveryResponse read(StreamInput in) throws IOException {
                            return new RecoveryResponse(in);
        } catch (CancellableThreads.ExecutionCancelledException e) {
            logger.trace("recovery cancelled", e);
        } catch (Exception e) {

Get the metadata snapshot of the shard, which contains the segment information of the shard, such as syncid, checksum, doc number, etc., and then encapsulate it as StartRecoveryRequest


    public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, DiscoveryNode localNode,
                                                               RecoveryTarget recoveryTarget, long startingSeqNo) {
        final StartRecoveryRequest request;
        logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

        Store.MetadataSnapshot metadataSnapshot;
        try {
            metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
            //Get the metadata snapshot of the shard. The structure contains the segment information of the shard, such as syncid, checksum, doc number, etc. to ensure that the current translog is consistent with Lucene, otherwise Lucene index has to be discarded---------------------
            try {
                final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); //Get TranslogUUID
                final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);  //Get the global checkpoint through the translog location and the desired translog UUID
                assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; //Judge whether the global checkpoint + 1 is greater than startingSeqNo
            } catch (IOException | TranslogCorruptedException e) {
                logger.warn(new ParameterizedMessage("error while reading global checkpoint from translog, " +
                    "resetting the starting sequence number from {} to unassigned and recovering as if there are none", startingSeqNo), e);
                metadataSnapshot = Store.MetadataSnapshot.EMPTY;
                startingSeqNo = UNASSIGNED_SEQ_NO;
        } catch (final org.apache.lucene.index.IndexNotFoundException e) {
            // happens on an empty folder. no need to log
            assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo;
            logger.trace("{} shard folder empty, recovering all files", recoveryTarget);
            metadataSnapshot = Store.MetadataSnapshot.EMPTY;
        } catch (final IOException e) {
            if (startingSeqNo != UNASSIGNED_SEQ_NO) {
                logger.warn(new ParameterizedMessage("error while listing local files, resetting the starting sequence number from {} " +
                    "to unassigned and recovering as if there are none", startingSeqNo), e);
                startingSeqNo = UNASSIGNED_SEQ_NO;
            } else {
                logger.warn("error while listing local files, recovering as if there are none", e);
            metadataSnapshot = Store.MetadataSnapshot.EMPTY;
        logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
        request = new StartRecoveryRequest( //Create recovery quest-------------------
        return request;

[the external chain image transfer fails. The source station may have an anti-theft chain mechanism. It is recommended to save the image and upload it directly (img-Ax0Deoqi-1642672800534)(/Users/changjing/Desktop / screenshot 2022-01-20, 1.07.15 p.m. PNG)]

The entry for the source node to receive the recovery request: the source node controls the recovery and transfers the control right


    class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
        public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
            recover(request, new ChannelActionListener<>(channel, Actions.START_RECOVERY, request)); //After receiving the request, the source node will call the recovery entry function recover

The recover method gets the shard from request and constructs the RecoverySourceHandler object, then calls handler.. Recovertotarget enters the recovered execution body:


    private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
        final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); //Get indexService
        final IndexShard shard = indexService.getShard(request.shardId().id()); //Get shardid s

        final ShardRouting routingEntry = shard.routingEntry(); //Start routingEntry

       //Judge whether the routingEntry is primary, whether the routingEntry is active, and whether an error is thrown
        if (routingEntry.primary() == false || routingEntry.active() == false) {
            throw new DelayRecoveryException("source shard [" + routingEntry + "] is not an active primary");
        if (request.isPrimaryRelocation() && (routingEntry.relocating() == false ||
            routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) {
            logger.debug("delaying recovery of {} as source shard is not marked yet as relocating to {}",
                request.shardId(), request.targetNode());
            throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]");
       //Get the shard according to the request and construct the RecoverySourceHandler object
        RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
        logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(),
        handler.recoverToTarget(ActionListener.runAfter(listener, () -> ongoingRecoveries.remove(shard, handler)));  //Recovery entry

Recovery process:


 public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
        final Closeable releaseResources = () -> IOUtils.close(resources);
        final ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
        try {
            cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
                final RuntimeException e;
                if (shard.state() == IndexShardState.CLOSED) { //Check whether the shard is close d
                    e = new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]");
                } else {
                    e = new CancellableThreads.ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
                if (beforeCancelEx != null) {
                IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
                throw e;
            final Consumer<Exception> onFailure = e -> {
                assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[onFailure]");
                IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));

            final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled();
            final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();
            runUnderPrimaryPermit(() -> {  
                final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
                ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
                if (targetShardRouting == null) {
                    logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(),
                    throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
                assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
            }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
                shard, cancellableThreads, logger);
            final Engine.HistorySource historySource;
            if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) {
                historySource = Engine.HistorySource.INDEX;
            } else {
                historySource = Engine.HistorySource.TRANSLOG;
            final Closeable retentionLock = shard.acquireHistoryRetentionLock(historySource);
            resources.add(retentionLock);  //Add retention lock
            final long startingSeqNo;
            final boolean isSequenceNumberBasedRecovery
                = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
                && isTargetSameHistory()
                && shard.hasCompleteHistoryOperations("peer-recovery", historySource, request.startingSeqNo())
                && (historySource == Engine.HistorySource.TRANSLOG ||
                   (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
             //Even if there is a complete lease, the calculation of isSequenceNumberBasedRecovery will check whether there is a complete historical operation record      
Because from 7.4 Or rolling upgrade of earlier versions to create some initially unsatisfactory leases. In some cases, the leases are not held as we hope. Soft deletion is very convenient, but if there is no complete history, serial number based recovery will be a disaster

            if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) {
                // The required history already exists in the lease, so there is no need to add a separate retention lock
                logger.trace("history is retained by {}", retentionLeaseRef.get());
            } else {
               //When calling shard All history before hascompletehistoryoperations() needs to be locked
                // Before using safe commit, this ensures that all operations can be confirmed in saft commit
                //During the recovery period, the local checkpoint will be saved
                logger.trace("history is retained by retention lock");

            final StepListener<SendFileResult> sendFileStep = new StepListener<>();
            final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
            final StepListener<Void> finalizeStep = new StepListener<>();

            if (isSequenceNumberBasedRecovery) {
                logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
                startingSeqNo = request.startingSeqNo();
                if (retentionLeaseRef.get() == null) {
                    createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY));
                } else {
            } else {
                final Engine.IndexCommitRef safeCommitRef;
                try {
                    safeCommitRef = shard.acquireSafeIndexCommit();
                } catch (final Exception e) {
                    throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
                // Try and copy enough operations to the recovering peer so that if it is promoted to primary then it has a chance of being
                // able to recover other replicas using operations-based recoveries. If we are not using retention leases then we
                // conservatively copy all available operations. If we are using retention leases then "enough operations" is just the
                // operations from the local checkpoint of the safe commit onwards, because when using soft deletes the safe commit retains
                // at least as much history as anything else. The safe commit will often contain all the history retained by the current set
                // of retention leases, but this is not guaranteed: an earlier peer recovery from a different primary might have created a
                // retention lease for some history that this primary already discarded, since we discard history when the global checkpoint
                // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can
                // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled
                // down.
                startingSeqNo = softDeletesEnabled
                    ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L
                    : 0;
                logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);

                try {
                    final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo);
                    final Releasable releaseStore = acquireStore(shard.store());
                    sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
                        try {
                            IOUtils.close(safeCommitRef, releaseStore);
                        } catch (final IOException ex) {
                            logger.warn("releasing snapshot caused exception", ex);

                    final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
                    runUnderPrimaryPermit(() -> {
                            try {
          // If the target previously had a copy of this shard then a file-based recovery might move its global
          // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
          // new one later on in the recovery.
                                    new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,
                                        deleteRetentionLeaseStep, false));
                            } catch (RetentionLeaseNotFoundException e) {
                                logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
                        }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]",
                        shard, cancellableThreads, logger);

                    deleteRetentionLeaseStep.whenComplete(ignored -> {
                        assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
                        phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep); //In the first stage, compare syncid and segment, and then get the different part. Take the initiative to push the data to the requester, and delete the lease after completion-------------------
                    }, onFailure);

                } catch (final Exception e) {
                    throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
            assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;

            sendFileStep.whenComplete(r -> {
                assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
                // For a sequence based recovery, the target can keep its local translog
                    shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo), prepareEngineStep);
            }, onFailure);

            prepareEngineStep.whenComplete(prepareEngineTime -> {
                assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]");
                 * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
                 * This means that any document indexed into the primary after this will be replicated to this replica as well
                 * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
                 * all documents up to maxSeqNo in phase2.
                runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
                    shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);

                final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
                logger.trace("snapshot translog for recovery; current size is [{}]",
                    shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo));
                final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo);

                // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
                // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
                final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
                final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
                final RetentionLeases retentionLeases = shard.getRetentionLeases();
                final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetaData().getMappingVersion();
                //The second stage is to send translog-------------------
                phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
                    retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);
                    r -> IOUtils.close(phase2Snapshot),
                    e -> {
                        onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e));

            }, onFailure);

            // Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2
            final long trimAboveSeqNo = startingSeqNo - 1;
            sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);

            finalizeStep.whenComplete(r -> {
                final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
                final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
                final SendFileResult sendFileResult = sendFileStep.result();
                final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                    sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                    sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
                    prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
                try {
                } finally {
            }, onFailure);
        } catch (Exception e) {
            IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));

phase1: recover segment file

As can be seen from the above code, the specific logic of phase1 is to first obtain the metadata snapshot of the shard to be recovered to obtain the recoverySourceSyncId, obtain the recoveryTargetSyncId according to the request, and compare the syncids on both sides. If the number of documents on the source and target is the same, if the result is the same, it means that the corresponding segments of the shard of the source and target are the same before the current submission point, Therefore, there is no need to restore the segment file. If the syncids on both sides are different, it means that the segment file is different. You need to find all the different files for recovery. By comparing the differences between recoverySourceMetadata and recoverytargetssnapshot, you can find all the different segment files. The logic is as follows:

 void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
        final Store store = shard.store();
        try {
            StopWatch stopWatch = new StopWatch().start();
            final Store.MetadataSnapshot recoverySourceMetadata;
            try {
                recoverySourceMetadata = store.getMetadata(snapshot);
            } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
                shard.failShard("recovery", ex);
                throw ex;
            for (String name : snapshot.getFileNames()) {  
                final StoreFileMetaData md = recoverySourceMetadata.get(name);
                if (md == null) {
                    logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap());
                    throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " +
                            recoverySourceMetadata.asMap().size() + " files", name);
            if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {//You can skip phase1
                final List<String> phase1FileNames = new ArrayList<>();
                final List<Long> phase1FileSizes = new ArrayList<>();
                final List<String> phase1ExistingFileNames = new ArrayList<>();
                final List<Long> phase1ExistingFileSizes = new ArrayList<>();

                // Total size of segment files that are recovered
                long totalSizeInBytes = 0;
                // Total size of segment files that were able to be re-used
                long existingTotalSizeInBytes = 0;

                // Generate a "diff" of all the identical, different, and missing
                // segment files on the target node, using the existing files on
                // the source node
                final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
                for (StoreFileMetaData md : diff.identical) {
                    existingTotalSizeInBytes += md.length();
                    if (logger.isTraceEnabled()) {
                        logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +
                                        " size [{}]", md.name(), md.checksum(), md.length());
                    totalSizeInBytes += md.length();
                List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
                for (StoreFileMetaData md : phase1Files) {
                    if (request.metadataSnapshot().asMap().containsKey(md.name())) {
                        logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
                            md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
                    } else {
                        logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name());
                    totalSizeInBytes += md.length();

                logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
                    phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),
                    phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
                final StepListener<Void> sendFileInfoStep = new StepListener<>();
                final StepListener<Void> sendFilesStep = new StepListener<>();
                final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
                final StepListener<Void> cleanFilesStep = new StepListener<>();
                recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
                        phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep);

                sendFileInfoStep.whenComplete(r ->
                    sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure);

                sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);

                createRetentionLeaseStep.whenComplete(retentionLease ->
                        final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint();
                        assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint
                            : retentionLease + " vs " + lastKnownGlobalCheckpoint;
                        // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want
                        // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica
                        // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on
                        // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint.
                        cleanFiles(store, recoverySourceMetadata, translogOps, lastKnownGlobalCheckpoint, cleanFilesStep);

                final long totalSize = totalSizeInBytes;
                final long existingTotalSize = existingTotalSizeInBytes;
                cleanFilesStep.whenComplete(r -> {
                    final TimeValue took = stopWatch.totalTime();
                    logger.trace("recovery [phase1]: took [{}]", took);
                    listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
                        phase1ExistingFileSizes, existingTotalSize, took));
                }, listener::onFailure);
            } else {
                logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId());

                // but we must still create a retention lease
                final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
                createRetentionLease(startingSeqNo, createRetentionLeaseStep);
                createRetentionLeaseStep.whenComplete(retentionLease -> {
                    final TimeValue took = stopWatch.totalTime();
                    logger.trace("recovery [phase1]: took [{}]", took);
                    listener.onResponse(new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, Collections.emptyList(),
                        Collections.emptyList(), 0L, took));
                }, listener::onFailure);

        } catch (Exception e) {
            throw new RecoverFilesRecoveryException(request.shardId(), 0, new ByteSizeValue(0L), e);

Phase 1 can be skipped, and the syncids of the source partition and the target partition are consistent

   boolean canSkipPhase1(Store.MetadataSnapshot source, Store.MetadataSnapshot target) {
        if (source.getSyncId() == null || source.getSyncId().equals(target.getSyncId()) == false) {
            return false;   //Phase 1 can be skipped, and the syncids of the source partition and the target partition are consistent
        if (source.getNumDocs() != target.getNumDocs()) {
            throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number " +
                "of docs differ: " + source.getNumDocs() + " (" + request.sourceNode().getName() + ", primary) vs " + target.getNumDocs()
                + "(" + request.targetNode().getName() + ")");
        SequenceNumbers.CommitInfo sourceSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(source.getCommitUserData().entrySet());
        SequenceNumbers.CommitInfo targetSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(target.getCommitUserData().entrySet());
        if (sourceSeqNos.localCheckpoint != targetSeqNos.localCheckpoint || targetSeqNos.maxSeqNo != sourceSeqNos.maxSeqNo) {
            final String message = "try to recover " + request.shardId() + " with sync id but " +
                "seq_no stats are mismatched: [" + source.getCommitUserData() + "] vs [" + target.getCommitUserData() + "]";
            assert false : message;
            throw new IllegalStateException(message);
        return true;
    Here will be all segment file It is divided into three categories: identical(Same) different((different) missing(target Missing). Then different and missing of segment files Send to as the files to be recovered in the first stage target node. Finished sending segment files After, the source node will also send a message to the target node to notify the target node to clean up the temporary files, and then send a message to inform the target node to open the engine for receiving translog,It should be noted that these two network communications will call PlainTransportFuture.txGet() Method blocks waiting for the peer to reply. So far, the recovery logic of the first stage is completed.

public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
    final List<StoreFileMetaData> identical = new ArrayList<>();  // Same file 
    final List<StoreFileMetaData> different = new ArrayList<>();  // Different file s
    final List<StoreFileMetaData> missing = new ArrayList<>();   // Missing file
    final Map<String, List<StoreFileMetaData>> perSegment = new HashMap<>();
    final List<StoreFileMetaData> perCommitStoreFiles = new ArrayList<>();
    ... ...
    for (List<StoreFileMetaData> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
        boolean consistent = true;
        for (StoreFileMetaData meta : segmentFiles) {
            StoreFileMetaData storeFileMetaData = recoveryTargetSnapshot.get(meta.name());
            if (storeFileMetaData == null) {
                consistent = false;
                missing.add(meta); // If the segment does not exist in the target node, it is added to missing
            } else if (storeFileMetaData.isSame(meta) == false) {
                consistent = false;
                different.add(meta); // If they exist but are different, they are added to different
            } else {
                identicalFiles.add(meta);  // Exist and the same
        if (consistent) {
        } else {
            // make sure all files are added - this can happen if only the deletes are different
    RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical), Collections.unmodifiableList(different), Collections.unmodifiableList(missing));
    return recoveryDiff;

phase2: send translog

The logic of the second stage is relatively simple. You only need to send all the translogs between the translog view and the current time to the source node.

void phase2(
            final long startingSeqNo,
            final long endingSeqNo,
            final Translog.Snapshot snapshot,
            final long maxSeenAutoIdTimestamp,
            final long maxSeqNoOfUpdatesOrDeletes,
            final RetentionLeases retentionLeases,
            final long mappingVersion,
            final ActionListener<SendSnapshotResult> listener) throws IOException {
        if (shard.state() == IndexShardState.CLOSED) {
            throw new IndexShardClosedException(request.shardId());
        logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");

        final AtomicInteger skippedOps = new AtomicInteger();
        final AtomicInteger totalSentOps = new AtomicInteger();
        final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch.
        final CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
            // We need to synchronized Snapshot#next() because it's called by different threads through sendBatch.
            // Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible.
            synchronized (snapshot) {
                final List<Translog.Operation> ops = lastBatchCount.get() > 0 ? new ArrayList<>(lastBatchCount.get()) : new ArrayList<>();
                long batchSizeInBytes = 0L;
                Translog.Operation operation;
                while ((operation = snapshot.next()) != null) {
                    if (shard.state() == IndexShardState.CLOSED) {
                        throw new IndexShardClosedException(request.shardId());
                    final long seqNo = operation.seqNo();
                    if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
                    batchSizeInBytes += operation.estimateSize();

                    // check if this request is past bytes threshold, and if so, send it off
                    if (batchSizeInBytes >= chunkSizeInBytes) {
                return ops;

        final StopWatch stopWatch = new StopWatch().start();
        final ActionListener<Long> batchedListener = ActionListener.map(listener,
            targetLocalCheckpoint -> {
                assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
                    : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
                    snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
                final TimeValue tookTime = stopWatch.totalTime();
                logger.trace("recovery [phase2]: took [{}]", tookTime);
                return new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime);


The target node begins to recover

  • Receive segment

    Corresponding to the first stage of source node recovery in the previous section, the source node sends all different segments to the target node, and the target node will drop the segment file to the disk after receiving it. The write function of segment files is recoverytarget writeFileChunk:

        public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
                final RecoveryTarget recoveryTarget = recoveryRef.target();
                final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
                if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {

                RateLimiter rateLimiter = recoverySettings.rateLimiter();
                if (rateLimiter != null) {
                    long bytes = bytesSinceLastPause.addAndGet(request.content().length());
                    if (bytes > rateLimiter.getMinPauseCheckBytes()) {
                        // Time to pause
                        long throttleTimeInNanos = rateLimiter.pause(bytes);
                final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FILE_CHUNK, request);
                recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(),     //Call writeFileChunk function when receiving information
                    request.totalTranslogOps(), ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));

public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
                               boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener) {
        try {
            multiFileWriter.writeFileChunk(fileMetaData, position, content, lastChunk);
        } catch (Exception e) {

public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk)
        throws IOException {
        assert Transports.assertNotTransportThread("multi_file_writer");
        final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter());
        writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk));

  • Turn on the engine

    After the above process, the target node completes the first step of data tracking. After receiving the segment, the target node opens the engine corresponding to the shard to receive the translog. Note that after the engine is opened, the shard being recovered can be written and deleted (including the request for synchronization of the primary shard and the operation command in the translog). The logic of opening the engine is as follows:

private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, long maxUnsafeAutoIdTimestamp) throws IOException {
    ... ...
    final EngineConfig.OpenMode openMode;
    if (indexExists == false) {
        openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
    } else if (skipTranslogRecovery) {
        openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
    } else {
        openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
    final EngineConfig config = newEngineConfig(openMode, maxUnsafeAutoIdTimestamp);
    // we disable deletes since we allow for operations to be executed against the shard while recovering
    // but we need to make sure we don't loose deletes until we are done recovering
    config.setEnableGcDeletes(false); // translog is not deleted during recovery
    Engine newEngine = createNewEngine(config); // Create engine
    ... ...
  • Receive and replay translog

    After opening the engine, you can perform corresponding playback actions according to the commands in the translog. The logic of playback is similar to that of normal writing and deletion. Here, you need to restore the operation type and operation data according to the translog, build corresponding data objects according to the operation data, and then call the engine opened in the previous step to perform corresponding operations. The logic is as follows:

 public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, ActionListener<Void> listener) {
        ActionListener.completeWith(listener, () -> {
            indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
            // Persist the global checkpoint.
            if (trimAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                // We should erase all translog operations above trimAboveSeqNo as we have received either the same or a newer copy
                // from the recovery source in phase2. Rolling a new translog generation is not strictly required here for we won't
                // trim the current generation. It's merely to satisfy the assumption that the current generation does not have any
                // operation that would be trimmed (see TranslogWriter#assertNoSeqAbove). This assumption does not hold for peer
                // recovery because we could have received operations above startingSeqNo from the previous primary terms.
                // the flush or translog generation threshold can be reached after we roll a new translog
            if (hasUncommittedOperations()) {
                indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
            return null;

Here, the two phases of replica shard recovery are completed. Since the shard is still in the INITIALIZING state, you need to notify the master node to start the recovered shard:

private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
    public void onRecoveryDone(RecoveryState state) {
        if (state.getRecoverySource().getType() == Type.SNAPSHOT) {
            SnapshotRecoverySource snapshotRecoverySource = (SnapshotRecoverySource) state.getRecoverySource();
            restoreService.indexShardRestoreCompleted(snapshotRecoverySource.snapshot(), shardRouting.shardId());
        shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);

All processes of shard recovery have been completed.

After the phase 1 phase is completed, the write operation can be processed normally from the partition. At this time, the write from the partition and the trilog playback in the phase 2 phase are executed in parallel. If the trilog playback is slower than the normal write operation, the old data may be written later, resulting in data inconsistency. In order to ensure data consistency, ES compares the currently written version with the lucene document version number when writing. If the current version is smaller, it indicates that it is old data, and the document will not be written to lucene. Relevant codes are as follows:

final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
    plan = IndexingStrategy.skipAsStale(false, index.version());



Keywords: Big Data ElasticSearch search engine

Added by millercj on Fri, 21 Jan 2022 05:54:02 +0200