[flink] Flink 1.12.2 source code analysis: Task data input

1. General

Reprint: Flink 1.12.2 source code analysis: Task data input


In a Task, the input is encapsulated as an InputGate, and each InputGate corresponds one-to-one to a JobEdge in the JobGraph.
In other words, an InputGate corresponds to the upstream operator that the Task depends on (including all of its parallel subtasks), and each InputGate consumes one or more ResultPartitions.
An InputGate is composed of InputChannels, which correspond one-to-one to the ExecutionEdges in the ExecutionGraph; in other words, InputChannels and ResultSubpartitions are connected one-to-one, and one InputChannel receives the output of one ResultSubpartition. Depending on where the consumed ResultSubpartition is located, InputChannel has two different implementations: LocalInputChannel and RemoteInputChannel.

II InputGate

The input of a Task is abstracted as an InputGate, which is composed of InputChannels; each InputChannel corresponds one-to-one to a ResultSubpartition that the Task needs to consume.

An InputGate consumes one or more partitions of a single produced intermediate result.
Each intermediate result is partitioned over its producing parallel subtasks;
each of these partitions is further divided into one or more subpartitions.

For example, consider a map-reduce program in which the map operator produces data and the reduce operator consumes the produced data.

/**
 * An InputGate consumes one or more partitions of a single produced intermediate result.
 *
 * Each intermediate result is partitioned over its producing parallel subtasks;
 *
 * each of these partitions is further divided into one or more subpartitions.
 *
 * For example, consider a map-reduce program in which the map operator produces data and the
 * reduce operator consumes the produced data.
 *
 * <pre>{@code
 * +-----+              +---------------------+              +--------+
 * | Map | = produce => | Intermediate Result | <= consume = | Reduce |
 * +-----+              +---------------------+              +--------+
 * }</pre>
 */

When such a program is deployed in parallel, the intermediate result is partitioned over its producing parallel subtasks;
each of these partitions is further divided into one or more subpartitions.

 * <pre>{@code
 *                            Intermediate result
 *               +-----------------------------------------+
 *               |                      +----------------+ |              +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 1 | | <=======+=== | Input Gate | Reduce 1 |
 * | Map 1 | ==> | | Partition 1 | =|   +----------------+ |         |    +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+    |
 *               |                      +----------------+ |    |    | Subpartition request
 *               |                                         |    |    |
 *               |                      +----------------+ |    |    |
 * +-------+     | +-------------+  +=> | Subpartition 1 | | <==+====+
 * | Map 2 | ==> | | Partition 2 | =|   +----------------+ |    |         +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+======== | Input Gate | Reduce 2 |
 *               |                      +----------------+ |              +-----------------------+
 *               +-----------------------------------------+
 * }</pre>
 *
 */

In the example above, two map subtasks produce the intermediate result in parallel, yielding two partitions (Partition 1 and Partition 2).
Each of these partitions is further divided into two subpartitions, one for each parallel reduce subtask.
As shown in the figure, each reduce task has an InputGate attached to it.
This gate provides its input, which consists of one subpartition of each partition of the intermediate result.
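
To make the mapping concrete, here is a small, self-contained Java sketch (not Flink code) that prints which partition and subpartition each reduce task's InputGate reads in the two-mapper, two-reducer example above:

// A minimal standalone sketch of the mapping implied by the figure: producer subtask i writes
// partition i, and within each partition the data destined for consumer subtask j goes to
// subpartition j, which consumer j's InputGate reads from every partition.
public class PartitionMappingSketch {
    public static void main(String[] args) {
        int producers = 2; // Map 1 and Map 2
        int consumers = 2; // Reduce 1 and Reduce 2
        for (int consumer = 0; consumer < consumers; consumer++) {
            System.out.printf("Reduce %d's InputGate consumes:%n", consumer + 1);
            for (int producer = 0; producer < producers; producer++) {
                // one InputChannel per consumed partition, always reading the consumer's subpartition
                System.out.printf("  Partition %d / Subpartition %d%n", producer + 1, consumer + 1);
            }
        }
    }
}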

2.1. Interfaces implemented by InputGate

InputGate is an abstract class that implements three interfaces: PullingAsyncDataInput, AutoCloseable and ChannelStateHolder.
Let's look at each of them.

2.1.1. PullingAsyncDataInput

The interface defines two basic methods for asynchronous, non-blocking data polling.

For the most efficient use, the caller should call {@link #pollNext()} until no more elements are returned.

When that happens, it should check the input via {@link #isFinished()}.

If the input is not finished, it should wait for the {@link CompletableFuture} returned by {@link #getAvailableFuture()} to complete. For example:

 /**
 * <pre>{@code
 * AsyncDataInput<T> input = ...;
 * while (!input.isFinished()) {
 * 	Optional<T> next;
 *
 * 	while (true) {
 * 		next = input.pollNext();
 * 		if (!next.isPresent()) {
 * 			break;
 * 		}
 * 		// do something with next
 * 	}
 *
 * 	input.getAvailableFuture().get();
 * }
 * }</pre>
 */

Methods

PullingAsyncDataInput declares two methods:

Method | Description
Optional<T> pollNext() throws Exception | Returns the next element; the method must be non-blocking.
boolean isFinished() | Returns true once the input has finished.

2.1.2. ChannelStateHolder

Implemented by entities that hold any kind of channel state and need a reference to the {@link ChannelStateWriter}.

It declares a single method, which must only be called once:

/** Injects the {@link ChannelStateWriter}. Must only be called once. */
    void setChannelStateWriter(ChannelStateWriter channelStateWriter);

2.1.3. AutoCloseable

It declares a single method: void close() throws Exception.

2.2. Method list

Method | Description
setChannelStateWriter(ChannelStateWriter channelStateWriter) | Injects the ChannelStateWriter.
abstract int getNumberOfInputChannels() | Returns the number of input channels.
abstract Optional<BufferOrEvent> getNext() | Blocking call that waits for the next {@link BufferOrEvent}. (The previously returned buffer should be recycled before requesting the next one.)
abstract Optional<BufferOrEvent> pollNext() | Non-blocking poll for the next {@link BufferOrEvent}.
abstract void sendTaskEvent(TaskEvent event) | Sends a task event to the producers.
abstract void resumeConsumption(InputChannelInfo channelInfo) | Resumes consumption from the given channel.
abstract InputChannel getChannel(int channelIndex) | Returns the channel of this gate at the given index.
List<InputChannelInfo> getChannelInfos() | Returns the channel infos of this gate.
CompletableFuture<?> getPriorityEventAvailableFuture() | Completed when priority events have been enqueued. If this future is queried from the task thread, it is guaranteed that a priority event is available and can be retrieved via {@link #getNext()}.
abstract void setup() | Sets up the gate; potentially a heavyweight, blocking operation, in contrast to mere construction.
abstract void requestPartitions() | Requests the consumed ResultPartitions.

III InputGate implementation

A Task obtains its input by calling InputGate.getNextBufferOrEvent() in a loop and hands the data it gets to the operator it wraps; this constitutes the basic running logic of a Task.

InputGate has two concrete implementations: SingleInputGate and UnionInputGate. A UnionInputGate is composed of multiple SingleInputGates.
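
As a rough illustration of the run loop just described, here is a simplified sketch; it is not Flink's actual Task/StreamTask code, it only assumes the Flink 1.12 runtime classes already referenced in this article, and the process callback stands in for the operator logic:

// A hedged sketch of the basic consumption loop: pull BufferOrEvent from the gate until it is
// finished, hand buffers to the operator stand-in, and recycle them afterwards.
import java.util.Optional;
import java.util.function.Consumer;

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;

public class TaskInputLoopSketch {

    /** Pulls BufferOrEvent from the gate until it is finished and hands buffers to 'process'. */
    static void runInputLoop(InputGate gate, Consumer<Buffer> process) throws Exception {
        while (!gate.isFinished()) {
            // getNext() blocks until the next BufferOrEvent is available (or the gate finishes)
            Optional<BufferOrEvent> next = gate.getNext();
            if (!next.isPresent()) {
                continue; // the gate finished between the isFinished() check and getNext()
            }
            BufferOrEvent boe = next.get();
            if (boe.isBuffer()) {
                Buffer buffer = boe.getBuffer();
                try {
                    process.accept(buffer); // deserialize records and feed the operator
                } finally {
                    buffer.recycleBuffer(); // in this sketch the consumer recycles the buffer
                }
            } else {
                // events (e.g. EndOfPartitionEvent, CheckpointBarrier) would be handled separately
            }
        }
    }
}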

3.1. SingleInputGate

3.1.1. IndexedInputGate

The parent class of SingleInputGate is IndexedInputGate, which defines the checkpoint-related methods:

Method | Description
abstract int getGateIndex() | Returns the index of this input gate.
void checkpointStarted(CheckpointBarrier barrier) | Called when a checkpoint has started.
void checkpointStopped(long cancelledCheckpointId) | Called when a checkpoint has been cancelled.
getInputGateIndex() | Returns the index of this input gate.
void blockConsumption(InputChannelInfo channelInfo) | Not used; the network stack blocks consumption automatically by revoking credits.

3.1.2. Properties

/** Lock object to guard partition requests and runtime channel updates. */
private final Object requestLock = new Object();

/**
 * The name of the owning task, for logging purposes.
 * e.g. owningTaskName = "Flat Map (2/4)#0 (0ef8b3d70af60be8633af8af4e1c0698)"
 */
private final String owningTaskName;

private final int gateIndex;

/**
 * The ID of the consumed intermediate result. Each input gate consumes partitions of the
 * intermediate result specified by this ID. This ID also identifies the input gate at the
 * consuming task.
 *
 * i.e. the ID of the result produced by the upstream operator that this gate consumes, e.g.
 * {IntermediateDataSetID@8214} "5eba1007ad48ad2243891e1eff29c32b"
 */
private final IntermediateDataSetID consumedResultId;

/**
 * Type of result partition:{ ResultPartitionType@7380} "PIPELINED_BOUNDED"
 * The type of the partition the input gate is consuming. */
private final ResultPartitionType consumedPartitionType;

/**
 * i.e. which subpartition of each consumed partition this gate reads
 * The index of the consumed subpartition of each consumed partition. This index depends on the
 * {@link DistributionPattern} and the subtask indices of the producing and consuming task.
 */
private final int consumedSubpartitionIndex;

/**
 * i.e. the number of InputChannels
 * The number of input channels (equivalent to the number of consumed partitions). */
private final int numberOfInputChannels;

/**
 * All InputChannels in this InputGate, keyed by the consumed result partition
 * (ResultPartition -> InputChannel).
 * Each consumed intermediate result partition has one input channel.
 * We store them in a map for runtime updates of single channels, e.g.
 * inputChannels = {HashMap@8215}  size = 1
 *         {IntermediateResultPartitionID@8237} "5eba1007ad48ad2243891e1eff29c32b#0" -> {LocalRecoveredInputChannel@8238}
 *
 *
 *
 * Input channels. There is a one input channel for each consumed intermediate result partition.
 * We store this in a map for runtime updates of single channels.
 */
private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;

/**
 * All InputChannels of this InputGate as an array, indexed by channel index, e.g.
 *        channels = {InputChannel[1]@8216}
 *              0 = {LocalRecoveredInputChannel@8238}
 */
@GuardedBy("requestLock")
private final InputChannel[] channels;

/**
 * The InputChannels that currently have data available for consumption, e.g.
 * inputChannelsWithData = {PrioritizedDeque@8217} "[]"
 * Channels, which notified this input gate about available data.
 * */
private final PrioritizedDeque<InputChannel> inputChannelsWithData = new PrioritizedDeque<>();

/**
 * A field that guarantees the uniqueness of the inputChannelsWithData queue.
 *
 * These two fields should be unified into one field.
 *
 * enqueuedInputChannelsWithData = {BitSet@8218} "{}"
 *
 * Field guaranteeing uniqueness for inputChannelsWithData queue.
 * Both of those fields should be unified onto one.
 */
@GuardedBy("inputChannelsWithData")
private final BitSet enqueuedInputChannelsWithData;

// Channels that have already received an EndOfPartitionEvent
private final BitSet channelsWithEndOfPartitionEvents;

// The last priority event sequence number seen per channel
@GuardedBy("inputChannelsWithData")
private int[] lastPrioritySequenceNumber;

/** The partition producer state listener. */
private final PartitionProducerStateProvider partitionProducerStateProvider;

/**
 * Memory manager: LocalBufferPool
 * {LocalBufferPool@8221} "[size: 8, required: 1, requested: 1, available: 1, max: 8, listeners: 0,subpartitions: 0, maxBuffersPerChannel: 2147483647, destroyed: false]"
 *
 * Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers
 * from this pool.
 */
private BufferPool bufferPool;

private boolean hasReceivedAllEndOfPartitionEvents;

/**
 * Flag indicating whether a partition has been requested
 * Flag indicating whether partitions have been requested. */
private boolean requestedPartitionsFlag;

/**
 * Task events pending for channels that have not been initialized yet
 */
private final List<TaskEvent> pendingEvents = new ArrayList<>();

//Number of uninitialized channels
private int numberOfUninitializedChannels;

/**
 * The timer that triggers the local partition request again. Initialize only when needed.
 * A timer to retrigger local partition requests. Only initialized if actually needed. */
private Timer retriggerLocalRequestTimer;

// Factory for the BufferPool
// {SingleInputGateFactory$lambda@8223}
private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;

private final CompletableFuture<Void> closeFuture;

@Nullable private final BufferDecompressor bufferDecompressor;

// {NetworkBufferPool@7512}
private final MemorySegmentProvider memorySegmentProvider;

/**
 *  {HybridMemorySegment@8225}
 *  
 * The segment to read data from file region of bounded blocking partition by local input
 * channel.
 */
private final MemorySegment unpooledSegment;

3.1.3. setup

In the setup phase of the InputGate, dedicated memory is allocated for all InputChannels; look at the setup method of SingleInputGate.

It essentially allocates the LocalBufferPool@8221; all InputChannels in the same InputGate share this LocalBufferPool@8221.

@Override
    public void setup() throws IOException {
        checkState(
                this.bufferPool == null,
                "Bug in input gate setup logic: Already registered buffer pool.");

        // Allocate dedicated buffers for all inputchannels and the rest as floating buffers
        setupChannels();

        // Create the BufferPool that provides the floating buffers
        BufferPool bufferPool = bufferPoolFactory.get();

        // Register the buffer pool with this gate
        setBufferPool(bufferPool);
    }

	    /** Assign the exclusive buffers to all remote input channels directly for credit-based mode. */
    @VisibleForTesting
    public void setupChannels() throws IOException {
        synchronized (requestLock) {
            for (InputChannel inputChannel : inputChannels.values()) {
                // Call the setup method of each InputChannel in SingleInputGate separately.
                inputChannel.setup();
            }
        }
    }

3.1.4. requestPartitions

   //Request partition
    @Override
    public void requestPartitions() {
        synchronized (requestLock) {

            // partition can only be requested once. After the method is called for the first time, this flag will be set to true
            if (!requestedPartitionsFlag) {
                if (closeFuture.isDone()) {
                    throw new IllegalStateException("Already released.");
                }

                // Sanity checks
                if (numberOfInputChannels != inputChannels.size()) {
                    throw new IllegalStateException(
                            String.format(
                                    "Bug in input gate setup logic: mismatch between "
                                            + "number of total input channels [%s] and the currently set number of input "
                                            + "channels [%s].",
                                    inputChannels.size(), numberOfInputChannels));
                }

                convertRecoveredInputChannels();

                // Request partition data
                internalRequestPartitions();
            }

            // After the method is called, set the flag to true to prevent repeated calls
            requestedPartitionsFlag = true;
        }
    }

  // Let every channel request the subpartition it consumes
    private void internalRequestPartitions() {
        for (InputChannel inputChannel : inputChannels.values()) {
            try {
                //Each channel requests the corresponding sub partition
                inputChannel.requestSubpartition(consumedSubpartitionIndex);
            } catch (Throwable t) {
                inputChannel.setError(t);
                return;
            }
        }
    }

3.1.5. getChannel(int channelIndex)

Get the specified InputChannel according to channelIndex

@Override
public InputChannel getChannel(int channelIndex) {
    return channels[channelIndex];
}

3.1.6. updateInputChannel

Converts an UnknownInputChannel into a LocalInputChannel or a RemoteInputChannel, depending on whether the producing task runs locally.

public void updateInputChannel(
            ResourceID localLocation, NettyShuffleDescriptor shuffleDescriptor)
            throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (closeFuture.isDone()) {
                // There was a race with a task failure/cancel
                return;
            }

            IntermediateResultPartitionID partitionId =
                    shuffleDescriptor.getResultPartitionID().getPartitionId();

            InputChannel current = inputChannels.get(partitionId);

            // The InputChannel has not been specified yet
            if (current instanceof UnknownInputChannel) {
                UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
                boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
                InputChannel newChannel;
                if (isLocal) {
                    // LocalInputChannel
                    newChannel = unknownChannel.toLocalInputChannel();
                } else {
                    // RemoteInputChannel
                    RemoteInputChannel remoteInputChannel =
                            unknownChannel.toRemoteInputChannel(
                                    shuffleDescriptor.getConnectionId());
                    remoteInputChannel.setup();
                    newChannel = remoteInputChannel;
                }
                LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);

                inputChannels.put(partitionId, newChannel);
                channels[current.getChannelIndex()] = newChannel;

                if (requestedPartitionsFlag) {
                    newChannel.requestSubpartition(consumedSubpartitionIndex);
                }

                for (TaskEvent event : pendingEvents) {
                    newChannel.sendTaskEvent(event);
                }

                if (--numberOfUninitializedChannels == 0) {
                    pendingEvents.clear();
                }
            }
        }
    }

3.1.7. retriggerPartitionRequest

Retriggers the partition request; in effect this calls retriggerSubpartitionRequest on the corresponding InputChannel.

/** Retriggers a partition request. */
public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId)
        throws IOException {
    synchronized (requestLock) {
        if (!closeFuture.isDone()) {
            final InputChannel ch = inputChannels.get(partitionId);

            checkNotNull(ch, "Unknown input channel with ID " + partitionId);

            LOG.debug(
                    "{}: Retriggering partition request {}:{}.",
                    owningTaskName,
                    ch.partitionId,
                    consumedSubpartitionIndex);

            if (ch.getClass() == RemoteInputChannel.class) {

                // RemoteInputChannel
                final RemoteInputChannel rch = (RemoteInputChannel) ch;
                rch.retriggerSubpartitionRequest(consumedSubpartitionIndex);
            } else if (ch.getClass() == LocalInputChannel.class) {

                // LocalInputChannel
                final LocalInputChannel ich = (LocalInputChannel) ch;

                if (retriggerLocalRequestTimer == null) {
                    retriggerLocalRequestTimer = new Timer(true);
                }

                ich.retriggerSubpartitionRequest(
                        retriggerLocalRequestTimer, consumedSubpartitionIndex);
            } else {
                throw new IllegalStateException(
                        "Unexpected type of channel to retrigger partition: " + ch.getClass());
            }
        }
    }
}

3.1.8. close

The close operation releases the resources of every InputChannel, lazily destroys the LocalBufferPool and, if resources were released, notifies all threads waiting on inputChannelsWithData.

@Override
public void close() throws IOException {
    boolean released = false;
    synchronized (requestLock) {
        if (!closeFuture.isDone()) {
            try {
                LOG.debug("{}: Releasing {}.", owningTaskName, this);

                if (retriggerLocalRequestTimer != null) {
                    retriggerLocalRequestTimer.cancel();
                }

                for (InputChannel inputChannel : inputChannels.values()) {
                    try {
                        // Release resources
                        inputChannel.releaseAllResources();
                    } catch (IOException e) {
                        LOG.warn(
                                "{}: Error during release of channel resources: {}.",
                                owningTaskName,
                                e.getMessage(),
                                e);
                    }
                }

                // The buffer pool can actually be destroyed immediately after the
                // reader received all of the data from the input channels.
                if (bufferPool != null) {
                   // Release bufferPool
                    bufferPool.lazyDestroy();
                }
            } finally {
                released = true;
                closeFuture.complete(null);
            }
        }
    }

    if (released) {
        synchronized (inputChannelsWithData) {
            // Notify all
            inputChannelsWithData.notifyAll();
        }
    }
}

3.1.9. getNextBufferOrEvent

A Task obtains its input data by calling InputGate.getNextBufferOrEvent() in a loop.
getNextBufferOrEvent() calls the waitAndGetNextData() method
and hands the obtained data to the operator the Task wraps;
this constitutes the basic running logic of a Task.

   
/**
 * The Task obtains input data by calling this method (via getNext/pollNext) in a loop
 * and hands the data to the operator it wraps;
 * this constitutes the basic running logic of a Task.
 *
 * @param blocking
 * @return
 * @throws IOException
 * @throws InterruptedException
 */
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking)
        throws IOException, InterruptedException {
    // Return Optional.empty() once all EndOfPartitionEvents have been received
    if (hasReceivedAllEndOfPartitionEvents) {
        return Optional.empty();
    }

    // If the input gate is turned off
    if (closeFuture.isDone()) {
        throw new CancelTaskException("Input gate is already closed.");
    }

    // Read data in {blocking: blocking / non blocking}
    Optional<InputWithData<InputChannel, BufferAndAvailability>> next =
            waitAndGetNextData(blocking);
    if (!next.isPresent()) {
        return Optional.empty();
    }
	// Get data
    InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
	// Judge whether the data is event or data according to the Buffer
    return Optional.of(
            transformToBufferOrEvent(
                    inputWithData.data.buffer(),
                    inputWithData.moreAvailable,
                    inputWithData.input,
                    inputWithData.morePriorityEvents));
}

There are two ways to obtain data.
Blocking retrieval of the next Buffer:

@Override
public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
    return getNextBufferOrEvent(true);
}

Non-blocking retrieval of the next Buffer:

@Override
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
    return getNextBufferOrEvent(false);
}

3.1.10. waitAndGetNextData

Waits for and returns the next piece of data (the boolean blocking parameter decides whether the call blocks).

private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(
        boolean blocking) throws IOException, InterruptedException {
    while (true) {
        synchronized (inputChannelsWithData) {


            Optional<InputChannel> inputChannelOpt = getChannel(blocking);
            if (!inputChannelOpt.isPresent()) {
                return Optional.empty();
            }

            // Get the channel and determine whether it is blocking mode according to the blocking parameter
            final InputChannel inputChannel = inputChannelOpt.get();
            Optional<BufferAndAvailability> bufferAndAvailabilityOpt =
                    inputChannel.getNextBuffer();

            if (!bufferAndAvailabilityOpt.isPresent()) {
                checkUnavailability();
                continue;
            }

            final BufferAndAvailability bufferAndAvailability = bufferAndAvailabilityOpt.get();
            if (bufferAndAvailability.moreAvailable()) {
                // Put input channels at the end to avoid starvation
                // enqueue the inputChannel at the end to avoid starvation
                queueChannelUnsafe(inputChannel, bufferAndAvailability.morePriorityEvents());
            }

            final boolean morePriorityEvents =
                    inputChannelsWithData.getNumPriorityElements() > 0;
            if (bufferAndAvailability.hasPriority()) {
                lastPrioritySequenceNumber[inputChannel.getChannelIndex()] =
                        bufferAndAvailability.getSequenceNumber();
                if (!morePriorityEvents) {
                    priorityAvailabilityHelper.resetUnavailable();
                }
            }

            // If inputChannelsWithData is empty, it is set to unavailable status
            checkUnavailability();

            // Return packaged results
            return Optional.of(
                    new InputWithData<>(
                            inputChannel,
                            bufferAndAvailability,
                            !inputChannelsWithData.isEmpty(),
                            morePriorityEvents));
        }
    }
}

3.1.11. transformToBufferOrEvent

Decides, based on the Buffer, whether it carries an event or data, and returns the corresponding BufferOrEvent instance.

	private BufferOrEvent transformToBufferOrEvent(
        Buffer buffer,
        boolean moreAvailable,
        InputChannel currentChannel,
        boolean morePriorityEvents)
        throws IOException, InterruptedException {
    // Judge whether the data is event or data according to the Buffer
    if (buffer.isBuffer()) {
        return transformBuffer(buffer, moreAvailable, currentChannel, morePriorityEvents);
    } else {
        return transformEvent(buffer, moreAvailable, currentChannel, morePriorityEvents);
    }
}
  private BufferOrEvent transformBuffer(
            Buffer buffer,
            boolean moreAvailable,
            InputChannel currentChannel,
            boolean morePriorityEvents) {
        return new BufferOrEvent(
                decompressBufferIfNeeded(buffer),
                currentChannel.getChannelInfo(),
                moreAvailable,
                morePriorityEvents);
    }

    private BufferOrEvent transformEvent(
            Buffer buffer,
            boolean moreAvailable,
            InputChannel currentChannel,
            boolean morePriorityEvents)
            throws IOException, InterruptedException {
        final AbstractEvent event;
        try {
            event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
        } finally {
            buffer.recycleBuffer();
        }

        // If this is an EndOfPartitionEvent and all InputChannels have received it,
        // mark hasReceivedAllEndOfPartitionEvents as true; no more data can be obtained afterwards
        if (event.getClass() == EndOfPartitionEvent.class) {
            channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());

            if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {

                // Because of race condition between:
                // 1. releasing inputChannelsWithData lock in this method and reaching this place
                // 2. empty data notification that re-enqueues a channel
                // we can end up with moreAvailable flag set to true, while we expect no more data.
                checkState(!moreAvailable || !pollNext().isPresent());
                moreAvailable = false;
                hasReceivedAllEndOfPartitionEvents = true;
                markAvailable();
            }

            currentChannel.releaseAllResources();
        }

        return new BufferOrEvent(
                event,
                buffer.getDataType().hasPriority(),
                currentChannel.getChannelInfo(),
                moreAvailable,
                buffer.getSize(),
                morePriorityEvents);
    }

3.1.12. sendTaskEvent

@Override
public void sendTaskEvent(TaskEvent event) throws IOException {
    synchronized (requestLock) {
        // Loop all inputchannels to call their sendTaskEvent
        for (InputChannel inputChannel : inputChannels.values()) {
            inputChannel.sendTaskEvent(event);
        }

        // If there are channels that have not been initialized yet, queue the event for them
        if (numberOfUninitializedChannels > 0) {
            pendingEvents.add(event);
        }
    }
}

3.1.13. resumeConsumption

@Override
public void resumeConsumption(InputChannelInfo channelInfo) throws IOException {
    checkState(!isFinished(), "InputGate already finished.");
    // BEWARE: consumption resumption only happens for streaming jobs in which all slots
    // are allocated together so there should be no UnknownInputChannel. As a result, it
    // is safe to not synchronize the requestLock here. We will refactor the code to not
    // rely on this assumption in the future.
    channels[channelInfo.getInputChannelIdx()].resumeConsumption();
}

3.1.14. Channel notification

Method | Description
void notifyChannelNonEmpty(InputChannel channel) | Callback invoked when an InputChannel has data available.
void notifyPriorityEvent(InputChannel inputChannel, int prioritySequenceNumber) | Callback invoked when a priority event is enqueued on a channel.
void triggerPartitionStateCheck(ResultPartitionID partitionId) | Triggers a state check of the producing partition.
void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber) | Enqueues a channel into the queue of channels with data.
boolean queueChannelUnsafe(InputChannel channel, boolean priority) | Enqueues the channel if it is not already enqueued, possibly raising its priority (see the sketch below).
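
A minimal, standalone sketch of the dedup-enqueue idea behind queueChannelUnsafe and the enqueuedInputChannelsWithData BitSet (toy code using channel indices instead of InputChannel objects, not Flink's SingleInputGate):

import java.util.ArrayDeque;
import java.util.BitSet;

// The BitSet guarantees that a channel appears at most once in the channels-with-data queue.
public class QueueChannelSketch {
    private final ArrayDeque<Integer> channelsWithData = new ArrayDeque<>();
    private final BitSet enqueued = new BitSet();

    /** Returns true if the channel was newly enqueued, false if it was already queued. */
    synchronized boolean queueChannelUnsafe(int channelIndex) {
        if (enqueued.get(channelIndex)) {
            return false; // already queued, nothing to do
        }
        enqueued.set(channelIndex);
        channelsWithData.add(channelIndex);
        return true;
    }

    public static void main(String[] args) {
        QueueChannelSketch gate = new QueueChannelSketch();
        System.out.println(gate.queueChannelUnsafe(0)); // true  -> enqueued
        System.out.println(gate.queueChannelUnsafe(0)); // false -> deduplicated
    }
}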

3.2. InputGateWithMetrics

InputGateWithMetrics is a subclass of InputGate that wraps another gate and adds a Counter used for metrics (it is used by DataSinkTask, for example).

It has two properties:

private final IndexedInputGate inputGate;

private final Counter numBytesIn;

The IndexedInputGate held in this property is in practice a concrete implementation such as SingleInputGate or UnionInputGate; the wrapper forwards calls to it and updates the counter along the way, as sketched below.
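
A minimal standalone sketch of this decorator idea (toy types invented here, not Flink's InputGateWithMetrics or Counter):

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Delegate every call to the wrapped gate and count the bytes of everything that passes through.
public class CountingGateSketch {

    interface Chunk { int size(); }                                  // stands in for BufferOrEvent
    interface Gate { Optional<Chunk> pollNext() throws Exception; }  // stands in for IndexedInputGate

    static class CountingGate implements Gate {
        private final Gate delegate;
        private final AtomicLong numBytesIn = new AtomicLong();      // stands in for the Counter

        CountingGate(Gate delegate) { this.delegate = delegate; }

        @Override
        public Optional<Chunk> pollNext() throws Exception {
            Optional<Chunk> next = delegate.pollNext();
            next.ifPresent(c -> numBytesIn.addAndGet(c.size()));     // count bytes as they flow in
            return next;
        }

        long bytesIn() { return numBytesIn.get(); }
    }
}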

3.3. UnionInputGate

A UnionInputGate is composed of multiple SingleInputGates and keeps an internal inputGatesWithData queue.
This input gate wrapper combines the input of multiple input gates.
Each input gate is connected to input channels from which it can read data.
Within each input gate, the input channels have unique IDs from 0 (inclusive) to the number of input channels (exclusive).

 *
 * <pre>
 * +---+---+      +---+---+---+
 * | 0 | 1 |      | 0 | 1 | 2 |
 * +--------------+--------------+
 * | Input gate 0 | Input gate 1 |
 * +--------------+--------------+
 * </pre>

The union input gate maps these IDs from 0 to the total number of input channels of all unioned input gates;
for example, the channels of input gate 0 keep their original indices,
while the channels of input gate 1 are re-indexed from 2 to 4 (a small sketch of this mapping follows the code below).

 * 
 * <pre>
 * +---+---++---+---+---+
 * | 0 | 1 || 2 | 3 | 4 |
 * +--------------------+
 * | Union input gate   |
 * +--------------------+
 * </pre>
 *
 /**
     * Gates, which notified this input gate about available data. We are using it as a FIFO queue
     * of {@link InputGate}s to avoid starvation and provide some basic fairness.
     */
    private final PrioritizedDeque<IndexedInputGate> inputGatesWithData = new PrioritizedDeque<>();
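
A small standalone sketch of the channel index mapping described above (plain Java, not Flink's UnionInputGate):

// Each gate's local channel indices are shifted by the total number of channels of the gates
// that precede it in the union.
public class UnionIndexSketch {
    public static void main(String[] args) {
        int[] channelsPerGate = {2, 3}; // input gate 0 has 2 channels, input gate 1 has 3
        int offset = 0;
        for (int gate = 0; gate < channelsPerGate.length; gate++) {
            for (int local = 0; local < channelsPerGate[gate]; local++) {
                System.out.printf("gate %d, local channel %d -> union channel %d%n",
                        gate, local, offset + local);
            }
            offset += channelsPerGate[gate];
        }
    }
}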

IV InputChannel

An InputGate contains multiple InputChannels.

The basic logic of an InputChannel is also relatively simple:
its life cycle revolves around requestSubpartition(int subpartitionIndex),
getNextBuffer() and releaseAllResources() (a lifecycle sketch follows below).

Depending on where the ResultPartition consumed by the InputChannel is located,
InputChannel has two different implementations: LocalInputChannel and RemoteInputChannel,
corresponding to local and remote data exchange respectively.

Another implementation of InputChannel is UnknownInputChannel,
which acts as a placeholder while the location of the ResultPartition has not yet been determined;
it is eventually updated to a LocalInputChannel or a RemoteInputChannel.
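
A compact lifecycle sketch, using a toy Channel interface rather than Flink's InputChannel (in this toy model an empty Optional simply means the channel is drained):

import java.util.Optional;

// 1. request the consumed subpartition, 2. poll buffers, 3. always release resources at the end.
public class ChannelLifecycleSketch {

    interface Channel {                                   // toy stand-in for InputChannel
        void requestSubpartition(int subpartitionIndex) throws Exception;
        Optional<byte[]> getNextBuffer() throws Exception;
        void releaseAllResources() throws Exception;
    }

    static void consume(Channel channel, int subpartitionIndex) throws Exception {
        channel.requestSubpartition(subpartitionIndex);   // 1. bind to the consumed subpartition
        try {
            Optional<byte[]> buffer;
            while ((buffer = channel.getNextBuffer()).isPresent()) {
                // 2. hand the buffer to the consuming operator
            }
        } finally {
            channel.releaseAllResources();                // 3. release resources at the end
        }
    }
}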

4.1. Properties

//Target ResultPartitionID for consumption
protected final ResultPartitionID partitionId;

// Belongs to a specific InputGate
protected final SingleInputGate inputGate;

// - Asynchronous error notification --------------------------------------

private final AtomicReference<Throwable> cause = new AtomicReference<Throwable>();

// - Partition request backoff --------------------------------------------

/**
 * Initialize backoff(ms)
 * The initial backoff (in ms).
 * */
protected final int initialBackoff;

/**
 * Maximum backoff(ms)
 * The maximum backoff (in ms). */
protected final int maxBackoff;

// Data size quantity counter
protected final Counter numBytesIn;

// Buffer count
protected final Counter numBuffersIn;

/**
 * Current Backoff (ms)
 * The current backoff (in ms). */
private int currentBackoff;

4.2. Constructor

The constructor mostly just assigns the fields; a sketch of the backoff behaviour implied by initialBackoff and maxBackoff follows the code.

protected InputChannel(
        SingleInputGate inputGate,
        int channelIndex,
        ResultPartitionID partitionId,
        int initialBackoff,
        int maxBackoff,
        Counter numBytesIn,
        Counter numBuffersIn) {

    checkArgument(channelIndex >= 0);

    int initial = initialBackoff;
    int max = maxBackoff;

    checkArgument(initial >= 0 && initial <= max);

    this.inputGate = checkNotNull(inputGate);
    this.channelInfo = new InputChannelInfo(inputGate.getGateIndex(), channelIndex);
    this.partitionId = checkNotNull(partitionId);

    this.initialBackoff = initial;
    this.maxBackoff = max;
    this.currentBackoff = initial == 0 ? -1 : 0;

    this.numBytesIn = numBytesIn;
    this.numBuffersIn = numBuffersIn;
}
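
A standalone sketch of the partition-request backoff suggested by these fields. The doubling behaviour is an assumption made here for illustration; only the initialization of currentBackoff mirrors the constructor above:

// Backoff of -1 means backoff is disabled; otherwise it grows from initialBackoff towards maxBackoff.
public class BackoffSketch {
    private final int initialBackoff;
    private final int maxBackoff;
    private int currentBackoff;

    BackoffSketch(int initialBackoff, int maxBackoff) {
        this.initialBackoff = initialBackoff;
        this.maxBackoff = maxBackoff;
        this.currentBackoff = initialBackoff == 0 ? -1 : 0; // same initialization as above
    }

    /** Returns true if the backoff was increased and another retry should be scheduled. */
    boolean increaseBackoff() {
        if (currentBackoff < 0) {
            return false;                                   // backoff disabled
        }
        if (currentBackoff == 0) {
            currentBackoff = initialBackoff;                // first retry uses the initial backoff
            return true;
        }
        if (currentBackoff < maxBackoff) {
            currentBackoff = Math.min(currentBackoff * 2, maxBackoff); // assumed: double, capped at max
            return true;
        }
        return false;                                       // max backoff reached, give up
    }

    public static void main(String[] args) {
        BackoffSketch b = new BackoffSketch(100, 1000);
        while (b.increaseBackoff()) {
            System.out.println("retry after " + b.currentBackoff + " ms"); // 100, 200, 400, 800, 1000
        }
    }
}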

4.3. Methods

Method | Description
void setup() | Initialization of the channel.
abstract void resumeConsumption() | Resumes data consumption.
void notifyChannelNonEmpty() | Callback that tells the InputGate that this channel has data.
void notifyPriorityEvent(int priorityBufferNumber) | Callback that tells the InputGate that this channel has a priority event.
void notifyBufferAvailable(int numAvailableBuffers) | Notifies the number of available buffers (related to back pressure).
abstract void requestSubpartition(int subpartitionIndex) | Requests the consumed ResultSubpartition.
void checkpointStarted(CheckpointBarrier barrier) | Called when a checkpoint starts.
void checkpointStopped(long checkpointId) | Called when a checkpoint is stopped.
abstract void sendTaskEvent(TaskEvent event) | Sends a TaskEvent.
abstract boolean isReleased() | Returns whether the channel has been released.
abstract void releaseAllResources() | Releases all resources.

V LocalInputChannel

LocalInputChannel is a subclass of InputChannel used to exchange data between different threads of the same process.

If an InputChannel and the task that produces the upstream ResultPartition it consumes run in the same TaskManager,
the data exchange between them happens between different threads of the same JVM process, without any network transfer.

LocalInputChannel implements both the InputChannel interface and the BufferAvailabilityListener interface.

The LocalInputChannel asks the ResultPartitionManager to create a ResultSubpartitionView associated with the specified ResultSubpartition,
registering itself as the callback of that ResultSubpartitionView.
This way, as soon as the ResultSubpartition has output available, the ResultSubpartitionView is notified
and the callback of the LocalInputChannel is invoked,
so the consumer learns about newly produced data in time and can consume it promptly.

5.1. Properties

// ------------------------------------------------------------------------

private final Object requestLock = new Object();

/**
 *
 * Partition manager, which stores all partition information
 *    partitionManager = {ResultPartitionManager@7381}
 *            registeredPartitions = {HashMap@7405}  size = 8
 *                    {ResultPartitionID@7416} "6b3e5e999219f9532114514c4bdbb773#0@51ad11521e991efaad6349cdf2accda7" -> {PipelinedResultPartition@7417} "PipelinedResultPartition 6b3e5e999219f9532114514c4bdbb773#0@51ad11521e991efaad6349cdf2accda7 [PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions]"
 *                    {ResultPartitionID@7418} "6b3e5e999219f9532114514c4bdbb773#2@aecbd0682c0973976efe563eca747cc0" -> {PipelinedResultPartition@7419} "PipelinedResultPartition 6b3e5e999219f9532114514c4bdbb773#2@aecbd0682c0973976efe563eca747cc0 [PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions]"
 *                    {ResultPartitionID@7420} "e07667949eeb5fe115288459d1d137f1#1@0ef8b3d70af60be8633af8af4e1c0698" -> {PipelinedResultPartition@7421} "PipelinedResultPartition e07667949eeb5fe115288459d1d137f1#1@0ef8b3d70af60be8633af8af4e1c0698 [PIPELINED_BOUNDED, 4 subpartitions, 4 pending consumptions]"
 *                    {ResultPartitionID@7422} "e07667949eeb5fe115288459d1d137f1#2@5e3aaeed65818bcfeb1485d0fd22d1ac" -> {PipelinedResultPartition@7423} "PipelinedResultPartition e07667949eeb5fe115288459d1d137f1#2@5e3aaeed65818bcfeb1485d0fd22d1ac [PIPELINED_BOUNDED, 4 subpartitions, 4 pending consumptions]"
 *                    {ResultPartitionID@7424} "e07667949eeb5fe115288459d1d137f1#3@30e457019371f01a403bd06cf3041eeb" -> {PipelinedResultPartition@7425} "PipelinedResultPartition e07667949eeb5fe115288459d1d137f1#3@30e457019371f01a403bd06cf3041eeb [PIPELINED_BOUNDED, 4 subpartitions, 4 pending consumptions]"
 *                    {ResultPartitionID@7426} "e07667949eeb5fe115288459d1d137f1#0@bfbc34d8314d506a39528d9c86f16859" -> {PipelinedResultPartition@7427} "PipelinedResultPartition e07667949eeb5fe115288459d1d137f1#0@bfbc34d8314d506a39528d9c86f16859 [PIPELINED_BOUNDED, 4 subpartitions, 4 pending consumptions]"
 *                    {ResultPartitionID@7428} "6b3e5e999219f9532114514c4bdbb773#1@bcf3be98463b672ea899cee1290423a2" -> {PipelinedResultPartition@7429} "PipelinedResultPartition 6b3e5e999219f9532114514c4bdbb773#1@bcf3be98463b672ea899cee1290423a2 [PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions]"
 *                    {ResultPartitionID@7379} "5eba1007ad48ad2243891e1eff29c32b#0@db0c587a67c31a83cff5fd8be9496e5d" -> {PipelinedResultPartition@7371} "PipelinedResultPartition 5eba1007ad48ad2243891e1eff29c32b#0@db0c587a67c31a83cff5fd8be9496e5d [PIPELINED_BOUNDED, 4 subpartitions, 4 pending consumptions]"
 * The local partition manager. */
private final ResultPartitionManager partitionManager;

/**
 *
 * taskEventPublisher = {TaskEventDispatcher@6289}
 *
 * Task event dispatcher for backwards events.
 *
 * */
private final TaskEventPublisher taskEventPublisher;

/** The consumed subpartition. */
@Nullable private volatile ResultSubpartitionView subpartitionView;

private volatile boolean isReleased;

private final ChannelStatePersister channelStatePersister;

5.2. BufferAvailabilityListener

The LocalInputChannel implements the BufferAvailabilityListener interface

This interface has two methods

Method | Description
void notifyDataAvailable() | Called when data arrives in the ResultSubpartition, to notify the InputChannel that data is available for consumption.
void notifyPriorityEvent(int prioritySequenceNumber) | Called when the first priority event is added to the head of the buffer queue.

The most important one is notifyDataAvailable. Because Flink's stream processing values timeliness, it uses a push model: when data arrives in the ResultSubpartition, notifyDataAvailable is called to notify the InputChannel that data has arrived and can be consumed (a minimal sketch of this push model follows).
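
A minimal standalone sketch of this push model (toy types, not Flink's ResultSubpartition or BufferAvailabilityListener):

import java.util.ArrayDeque;
import java.util.Queue;

// The producing subpartition appends a buffer and immediately notifies the registered listener,
// which is how the consuming channel learns that data can be consumed.
public class PushNotificationSketch {

    interface BufferAvailabilityListener { void notifyDataAvailable(); }

    static class ToySubpartition {
        private final Queue<byte[]> buffers = new ArrayDeque<>();
        private final BufferAvailabilityListener listener;

        ToySubpartition(BufferAvailabilityListener listener) { this.listener = listener; }

        void add(byte[] buffer) {
            buffers.add(buffer);
            listener.notifyDataAvailable(); // push: tell the consumer right away
        }
    }

    public static void main(String[] args) {
        ToySubpartition sub = new ToySubpartition(
                () -> System.out.println("channel notified: data available"));
        sub.add(new byte[16]);
    }
}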

5.3. ChannelStatePersister

The persistence of channel state for checkpoints is handled by ChannelStatePersister. The main methods to look at are checkpointStarted(CheckpointBarrier barrier) and checkpointStopped(long checkpointId).

5.3.1. CheckpointBarrier

Checkpoint barriers are used to align checkpoints throughout the streaming topology.
The sources emit barriers when instructed to do so by the JobManager.
When an operator receives a checkpoint barrier on one of its inputs, it knows that this is the boundary between pre-checkpoint and post-checkpoint data.
Once the operator has received the checkpoint barrier from all of its input channels, it knows that the checkpoint is complete for its inputs.
It can then trigger its operator-specific checkpoint behaviour and broadcast the barrier to the downstream operators.

Depending on the semantic guarantee, data arriving after the barrier may be delayed until the checkpoint completes (exactly once).

Checkpoint barrier IDs are strictly monotonically increasing.

The checkpoint barrier has only three properties:

private final long id;
private final long timestamp;
private final CheckpointOptions checkpointOptions;

Note that the CheckpointBarrier itself does not provide read/write methods here; its behaviour is controlled entirely by the checkpointOptions parameter.

5.3.2. CheckpointOptions property

CheckpointOptions defines properties such as isExactlyOnceMode (exactly-once mode), the checkpoint type and other checkpoint-related settings:

/** Type of the checkpoint. */
private final CheckpointType checkpointType;

// Checkpoint persistence related
/** Target location for the checkpoint. */
private final CheckpointStorageLocationReference targetLocation;

// Whether exactly-once mode is used
private final boolean isExactlyOnceMode;

// Whether this is an unaligned checkpoint
private final boolean isUnalignedCheckpoint;

// Alignment timeout
private final long alignmentTimeout;

5.3.3. CheckpointType

CheckpointType is an enum describing the type of a checkpoint.

There are four checkpoint types:

Value | Description
CHECKPOINT(false, PostCheckpointAction.NONE, "Checkpoint") | A regular checkpoint.
SAVEPOINT(true, PostCheckpointAction.NONE, "Savepoint") | A savepoint.
SAVEPOINT_SUSPEND(true, PostCheckpointAction.SUSPEND, "Suspend Savepoint") | A savepoint that suspends the job.
SAVEPOINT_TERMINATE(true, PostCheckpointAction.TERMINATE, "Terminate Savepoint") | A savepoint that terminates the job.

Each CheckpointType is described by three properties:

// Whether this is a savepoint
private final boolean isSavepoint;
// Action to take after the checkpoint completes
private final PostCheckpointAction postCheckpointAction;
// Name of the checkpoint type
private final String name;

5.3.4. CheckpointStatus

CheckpointStatus has three values:

private enum CheckpointStatus {
    // Completed
    COMPLETED,
    // Barrier announced but not yet received
    BARRIER_PENDING,
    // Barrier received
    BARRIER_RECEIVED
}

5.3.5. checkpointStarted

Starting a checkpoint persists the channel data (checkpointStarted delegates to startPersisting):

protected void startPersisting(long barrierId, List<Buffer> knownBuffers)
        throws CheckpointException {
    logEvent("startPersisting", barrierId);

    // If a barrier for a newer checkpoint has already been received
    // (status BARRIER_RECEIVED and lastSeenBarrier > barrierId), the requested older checkpoint is subsumed
        throw new CheckpointException(
                String.format(
                        "Barrier for newer checkpoint %d has already been received compared to the requested checkpoint %d",
                        lastSeenBarrier, barrierId),
                CheckpointFailureReason
                        .CHECKPOINT_SUBSUMED); // currently, at most one active unaligned
    }

    if (lastSeenBarrier < barrierId) {
        // Regardless of the current checkpointStatus, if we are notified about a more recent
        // checkpoint then we have seen so far, always mark that this more recent barrier is
        // pending.
        // BARRIER_RECEIVED status can happen if we have seen an older barrier, that probably
        // has not yet been processed by the task, but task is now notifying us that checkpoint
        // has started for even newer checkpoint. We should spill the knownBuffers and mark that
        // we are waiting for that newer barrier to arrive
        checkpointStatus = CheckpointStatus.BARRIER_PENDING;
        lastSeenBarrier = barrierId;
    }
    if (knownBuffers.size() > 0) {
        channelStateWriter.addInputData(
                barrierId,
                channelInfo,
                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                CloseableIterator.fromList(knownBuffers, Buffer::recycleBuffer));
    }
}

5.3.6. checkpointStopped

Stopping a checkpoint marks it as completed (checkpointStopped delegates to stopPersisting):

   protected void stopPersisting(long id) {
        logEvent("stopPersisting", id);
        if (id >= lastSeenBarrier) {
            checkpointStatus = CheckpointStatus.COMPLETED;
            lastSeenBarrier = id;
        }
    }

5.4. requestSubpartition

Requests the subpartition that this channel consumes.

//Request the sub partition corresponding to consumption
@Override
protected void requestSubpartition(int subpartitionIndex) throws IOException {

    boolean retriggerRequest = false;
    boolean notifyDataAvailable = false;

    // The lock is required to request only once in the presence of retriggered requests.
    synchronized (requestLock) {
        checkState(!isReleased, "LocalInputChannel has been released already");

        if (subpartitionView == null) {
            LOG.debug(
                    "{}: Requesting LOCAL subpartition {} of partition {}. {}",
                    this,
                    subpartitionIndex,
                    partitionId,
                    channelStatePersister);

            try {
                // Local, without network communication, create a ResultSubpartitionView through ResultPartitionManager
                // The LocalInputChannel implements the BufferAvailabilityListener
                // You will be notified when there is data, and notifyDataAvailable will be called,
                // Then add the current channel to the available channel queue of InputGate
                ResultSubpartitionView subpartitionView =
                        partitionManager.createSubpartitionView(
                                partitionId, subpartitionIndex, this);

                if (subpartitionView == null) {
                    throw new IOException("Error requesting subpartition.");
                }

                // make the subpartition view visible
                this.subpartitionView = subpartitionView;

                // check if the channel was released in the meantime
                if (isReleased) {
                    subpartitionView.releaseAllResources();
                    this.subpartitionView = null;
                } else {
                    notifyDataAvailable = true;
                }
            } catch (PartitionNotFoundException notFound) {
                if (increaseBackoff()) {
                    retriggerRequest = true;
                } else {
                    throw notFound;
                }
            }
        }
    }

    if (notifyDataAvailable) {
        notifyDataAvailable();
    }

    // Do this outside of the lock scope as this might lead to a
    // deadlock with a concurrent release of the channel via the
    // input gate.
    if (retriggerRequest) {
        inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
    }
}

5.5. ResultSubpartitionView

Inside the requestSubpartition method, the following code builds a ResultSubpartitionView, which is used to read the data of the ResultSubpartition.

5.5.1. Construction

// Local, without network communication, create a ResultSubpartitionView through ResultPartitionManager
// The LocalInputChannel implements the BufferAvailabilityListener
// You will be notified when there is data, and notifyDataAvailable will be called,
// Then add the current channel to the available channel queue of InputGate
ResultSubpartitionView subpartitionView =
partitionManager.createSubpartitionView(
        partitionId, subpartitionIndex, this);

The creation call chain:

ResultPartitionManager#createSubpartitionView
-->    ResultPartition#createSubpartitionView
    -->    BufferWritingResultPartition#createSubpartitionView
        -->    PipelinedSubpartition#createReadView
            -->    readView = new PipelinedSubpartitionView(this, availabilityListener)

5.5.2. Methods

ResultSubpartitionView is an interface that defines the following methods:

Method | Description
BufferAndBacklog getNextBuffer() | Gets the next {@link Buffer} instance from the queue.
void notifyDataAvailable() | Notifies the view that data in the ResultSubpartition is available for consumption.
default void notifyPriorityEvent(int priorityBufferNumber) | Notifies the view that a priority event has been added to the ResultSubpartition.
void releaseAllResources() throws IOException | Releases all resources.
boolean isReleased() | Returns whether the view has been released.
void resumeConsumption() | Resumes consumption.
Throwable getFailureCause() | Returns the failure cause, if any.
boolean isAvailable(int numCreditsAvailable) | Returns whether data is available, given the number of available credits.
int unsynchronizedGetNumberOfQueuedBuffers() | Returns the number of queued buffers without synchronization.

5.6. PipelinedSubpartitionView

PipelinedSubpartitionView is the implementation class of ResultSubpartitionView

5.6.1. Properties

/**
 * Identify which PipelinedSubpartition this view belongs to
 * The subpartition this view belongs to. */
private final PipelinedSubpartition parent;

/**
 * Listener that is notified when data is available; in practice this is either the
 * LocalInputChannel itself or a CreditBasedSequenceNumberingViewReader (for a RemoteInputChannel),
 * so the consumer can start consuming as soon as data arrives.
 */
private final BufferAvailabilityListener availabilityListener;

/**
 * Is this view released
 * Flag indicating whether this view has been released. */
final AtomicBoolean isReleased;

5.6.2. Methods

The methods are those of the ResultSubpartitionView interface listed in 5.5.2; they essentially delegate to the parent PipelinedSubpartition to process the data:

 @Nullable
    @Override
    public BufferAndBacklog getNextBuffer() {
        return parent.pollBuffer();
    }

    @Override
    public void notifyDataAvailable() {
        //Call back the listener to notify the InputChannel that data has arrived
        availabilityListener.notifyDataAvailable();
    }

    @Override
    public void notifyPriorityEvent(int priorityBufferNumber) {
        //Call back the listener to notify the InputChannel that a priority event has arrived
        availabilityListener.notifyPriorityEvent(priorityBufferNumber);
    }

    // Free all resources
    @Override
    public void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            // The view doesn't hold any resources and the parent cannot be restarted. Therefore,
            // it's OK to notify about consumption as well.
            parent.onConsumedSubpartition();
        }
    }

    @Override
    public boolean isReleased() {
        return isReleased.get() || parent.isReleased();
    }

    @Override
    public void resumeConsumption() {
        parent.resumeConsumption();
    }

    @Override
    public boolean isAvailable(int numCreditsAvailable) {
        return parent.isAvailable(numCreditsAvailable);
    }

    @Override
    public Throwable getFailureCause() {
        return parent.getFailureCause();
    }

    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return parent.unsynchronizedGetNumberOfQueuedBuffers();
    }

5.7. retriggerSubpartitionRequest

  /** Retriggers a subpartition request. */
    void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) {
        synchronized (requestLock) {
            checkState(subpartitionView == null, "already requested partition");

            timer.schedule(
                    new TimerTask() {
                        @Override
                        public void run() {
                            try {
                                requestSubpartition(subpartitionIndex);
                            } catch (Throwable t) {
                                setError(t);
                            }
                        }
                    },
                    getCurrentBackoff());
        }
    }

5.8. notifyDataAvailable

Callback invoked (through the BufferAvailabilityListener interface) when the ResultSubpartition has data available for consumption.

//Callback: the ResultSubpartition has data available for consumption; notify the InputGate
@Override
public void notifyDataAvailable() {
    //LocalInputChannel notifies InputGate
    notifyChannelNonEmpty();
}

5.9. resumeConsumption

@Override
public void resumeConsumption() {
    checkState(!isReleased, "Channel released.");

    subpartitionView.resumeConsumption();

    if (subpartitionView.isAvailable(Integer.MAX_VALUE)) {
        notifyChannelNonEmpty();
    }
}

5.10. sendTaskEvent

@Override
void sendTaskEvent(TaskEvent event) throws IOException {
    checkError();
    checkState(
            subpartitionView != null,
            "Tried to send task event to producer before requesting the subpartition.");

    //Event distribution
    if (!taskEventPublisher.publish(partitionId, event)) {
        throw new IOException(
                "Error while publishing event "
                        + event
                        + " to producer. The producer could not be found.");
    }
}

VI RemoteInputChannel

On the receiving side, the RemoteInputChannel is responsible for receiving and processing data read from other nodes.
(By contrast, the LocalInputChannel reads a local partition and does not need a receiver-side cache.)

The RemoteInputChannel requests the remote ResultSubpartition by creating a PartitionRequestClient and sending a PartitionRequest through Netty;
the request carries the ID of the current InputChannel and the initial credit:

CreditBasedPartitionRequestClientHandler reads data from the network and hands it to the RemoteInputChannel. The RemoteInputChannel appends the received buffers to its queue and requests floating buffers according to the backlog reported by the producer.

6.1. Properties

private static final int NONE = -1;

/** ID to distinguish this channel from other channels sharing the same TCP connection. */
private final InputChannelID id = new InputChannelID();

/** The connection to use to request the remote partition. */
private final ConnectionID connectionId;

/** The connection manager to use connect to the remote partition provider. */
private final ConnectionManager connectionManager;

/**
 * The received buffers. Received buffers are enqueued by the network I/O thread and the queue
 * is consumed by the receiving task thread.
 */
private final PrioritizedDeque<SequenceBuffer> receivedBuffers = new PrioritizedDeque<>();

/**
 * Flag indicating whether this channel has been released. Either called by the receiving task
 * thread or the task manager actor.
 */
private final AtomicBoolean isReleased = new AtomicBoolean();

/**
 * RemoteInputChannel Request the ResultSubpartition of the remote end,
 * A PartitionRequestClient will be created,
 * And send a PartitionRequest through Netty,
 * The id of the current InputChannel and the initial credit information will be brought
 *
 * Client to establish a (possibly shared) TCP connection and request the partition.
 * */
private volatile PartitionRequestClient partitionRequestClient;

/** The next expected sequence number for the next buffer. */
private int expectedSequenceNumber = 0;

/**
 * Initialize credit value
 * The initial number of exclusive buffers assigned to this channel. */
private final int initialCredit;

/** The number of available buffers that have not been announced to the producer yet. */
private final AtomicInteger unannouncedCredit = new AtomicInteger(0);

private final BufferManager bufferManager;

@GuardedBy("receivedBuffers")
private int lastBarrierSequenceNumber = NONE;

@GuardedBy("receivedBuffers")
private long lastBarrierId = NONE;

private final ChannelStatePersister channelStatePersister;

6.2. SequenceBuffer

Among the attributes above is a queue, receivedBuffers, which records the received SequenceBuffers.

/**
 * The received buffers. Received buffers are enqueued by the network I/O thread and the queue
 * is consumed by the receiving task thread.
 */
private final PrioritizedDeque<SequenceBuffer> receivedBuffers = new PrioritizedDeque<>();

SequenceBuffer is just a wrapper class holding a Buffer and its sequence number (sequenceNumber).

private static final class SequenceBuffer {
    final Buffer buffer;
    final int sequenceNumber;

    private SequenceBuffer(Buffer buffer, int sequenceNumber) {
        this.buffer = buffer;
        this.sequenceNumber = sequenceNumber;
    }
}
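
The receivedBuffers queue is a PrioritizedDeque, i.e. priority elements (such as certain priority events) stay ahead of regular buffers while FIFO order is kept inside each group. A much simplified, hypothetical stand-in illustrating only that ordering (the real class tracks a count of priority elements instead of two deques):

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, much simplified stand-in for PrioritizedDeque.
public class PrioritizedDequeSketch<T> {
    private final Deque<T> priority = new ArrayDeque<>();
    private final Deque<T> regular = new ArrayDeque<>();

    /** Add a regular element at the end. */
    public void add(T element) {
        regular.add(element);
    }

    /** Add a priority element behind the other priority elements but ahead of regular ones. */
    public void addPriorityElement(T element) {
        priority.add(element);
    }

    /** Poll priority elements first, then regular ones. */
    public T poll() {
        return !priority.isEmpty() ? priority.poll() : regular.poll();
    }

    public static void main(String[] args) {
        PrioritizedDequeSketch<String> deque = new PrioritizedDequeSketch<String>();
        deque.add("buffer-0");
        deque.add("buffer-1");
        deque.addPriorityElement("priority-event");
        System.out.println(deque.poll()); // priority-event
        System.out.println(deque.poll()); // buffer-0
    }
}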

6.3. PartitionRequestClient

When the RemoteInputChannel requests a ResultSubpartition on a remote node,
it creates a PartitionRequestClient and sends a PartitionRequest over Netty; the request carries the id of the current InputChannel and the initial credit.
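
As a hypothetical, simplified sketch of what such a request bundles together (the real NettyMessage.PartitionRequest additionally handles Netty (de)serialization and uses the proper ResultPartitionID / InputChannelID types):

// Hypothetical, simplified sketch of the information a partition request carries.
public class PartitionRequestSketch {
    final String partitionId;    // which ResultPartition to read (a ResultPartitionID in Flink)
    final int subpartitionIndex; // which subpartition of that partition
    final String receiverId;     // id of the requesting RemoteInputChannel (an InputChannelID)
    final int credit;            // the initial credit, i.e. the number of exclusive buffers

    PartitionRequestSketch(String partitionId, int subpartitionIndex, String receiverId, int credit) {
        this.partitionId = partitionId;
        this.subpartitionIndex = subpartitionIndex;
        this.receiverId = receiverId;
        this.credit = credit;
    }

    public static void main(String[] args) {
        PartitionRequestSketch request =
                new PartitionRequestSketch("result-partition-0", 1, "input-channel-0", 2);
        System.out.println("request credit=" + request.credit
                + " for subpartition " + request.subpartitionIndex);
    }
}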

6.4. Constructor

The constructor only performs simple assignments.
Note the construction of the BufferManager: each RemoteInputChannel has exactly one BufferManager for its memory management,
but the underlying memory provider (the MemorySegmentProvider, i.e. the globalPool) still lives at the NetworkBufferPool level, as sketched below.
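
A rough sketch of that layering, with hypothetical stand-in types (not Flink's API): the global pool owns the memory segments, while the per-channel manager only borrows a number of them as exclusive buffers.

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified sketch: a global pool owns the segments,
// the per-channel manager only borrows them.
public class BufferLayeringSketch {

    /** Stand-in for NetworkBufferPool / MemorySegmentProvider: the global owner of memory. */
    static class GlobalPool {
        private final Deque<byte[]> segments = new ArrayDeque<byte[]>();

        GlobalPool(int numSegments, int segmentSize) {
            for (int i = 0; i < numSegments; i++) {
                segments.add(new byte[segmentSize]);
            }
        }

        byte[] requestSegment() {
            return segments.poll(); // null if exhausted
        }

        void recycle(byte[] segment) {
            segments.add(segment);
        }
    }

    /** Stand-in for the per-channel BufferManager: requests exclusive segments from the global pool. */
    static class ChannelBufferManager {
        private final GlobalPool globalPool;
        private final Deque<byte[]> exclusiveSegments = new ArrayDeque<byte[]>();

        ChannelBufferManager(GlobalPool globalPool) {
            this.globalPool = globalPool;
        }

        /** Corresponds conceptually to requestExclusiveBuffers(initialCredit). */
        void requestExclusiveBuffers(int initialCredit) {
            for (int i = 0; i < initialCredit; i++) {
                byte[] segment = globalPool.requestSegment();
                if (segment != null) {
                    exclusiveSegments.add(segment);
                }
            }
        }

        int availableExclusiveBuffers() {
            return exclusiveSegments.size();
        }
    }

    public static void main(String[] args) {
        GlobalPool pool = new GlobalPool(8, 32 * 1024);
        ChannelBufferManager manager = new ChannelBufferManager(pool);
        manager.requestExclusiveBuffers(2); // initial credit of 2
        System.out.println("Exclusive buffers held: " + manager.availableExclusiveBuffers());
    }
}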

   public RemoteInputChannel(
            SingleInputGate inputGate,
            int channelIndex,
            ResultPartitionID partitionId,
            ConnectionID connectionId,
            ConnectionManager connectionManager,
            int initialBackOff,
            int maxBackoff,
            int networkBuffersPerChannel,
            Counter numBytesIn,
            Counter numBuffersIn,
            ChannelStateWriter stateWriter) {

        super(
                inputGate,
                channelIndex,
                partitionId,
                initialBackOff,
                maxBackoff,
                numBytesIn,
                numBuffersIn);

        this.initialCredit = networkBuffersPerChannel;
        this.connectionId = checkNotNull(connectionId);
        this.connectionManager = checkNotNull(connectionManager);

        // Build a BufferManager. Each RemoteInputChannel has exactly one BufferManager for memory management,
        // but the underlying memory provider (MemorySegmentProvider, the globalPool) is still at the NetworkBufferPool level.
        this.bufferManager = new BufferManager(inputGate.getMemorySegmentProvider(), this, 0);
        this.channelStatePersister = new ChannelStatePersister(stateWriter, getChannelInfo());
    }

6.5. setup

/**
 * Setup includes assigning exclusive buffers to this input channel, and this method should be
 * called only once after this input channel is created.
 */
@Override
void setup() throws IOException {
    checkState(
            bufferManager.unsynchronizedGetAvailableExclusiveBuffers() == 0,
            "Bug in input channel setup logic: exclusive buffers have already been set for this input channel.");

    // Apply for MemorySegment according to the initial credit value
    bufferManager.requestExclusiveBuffers(initialCredit);
}

6.6. requestSubpartition

Requests the data of a remote subpartition.

/** Requests a remote subpartition. */
@VisibleForTesting
@Override
public void requestSubpartition(int subpartitionIndex)
        throws IOException, InterruptedException {

    if (partitionRequestClient == null) {
        LOG.debug(
                "{}: Requesting REMOTE subpartition {} of partition {}. {}",
                this,
                subpartitionIndex,
                partitionId,
                channelStatePersister);
        // Create a client and request the partition
        try {
            // REMOTE mode requires network communication and uses Netty.
            // Establish the connection through the ConnectionManager: create a
            // PartitionRequestClient and initiate the request through it.
            partitionRequestClient =
                    connectionManager.createPartitionRequestClient(connectionId);
        } catch (IOException e) {
            // IOExceptions indicate that we could not open a connection to the remote
            // TaskExecutor
            throw new PartitionConnectionException(partitionId, e);
        }

        //Request partition and initiate request through netty
        partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
    }
}

6.7. getNextBuffer

Gets the next buffer from the receivedBuffers queue.

    @Override
    Optional<BufferAndAvailability> getNextBuffer() throws IOException {
        checkPartitionRequestQueueInitialized();

        final SequenceBuffer next;
        final DataType nextDataType;

        // Get buffer from receivedBuffers queue
        synchronized (receivedBuffers) {
            next = receivedBuffers.poll();
            nextDataType =
                    receivedBuffers.peek() != null
                            ? receivedBuffers.peek().buffer.getDataType()
                            : DataType.NONE;
        }

        if (next == null) {
            if (isReleased.get()) {
                throw new CancelTaskException(
                        "Queried for a buffer after channel has been released.");
            }
            return Optional.empty();
        }

        numBytesIn.inc(next.buffer.getSize());
        numBuffersIn.inc();
        return Optional.of(
                new BufferAndAvailability(next.buffer, nextDataType, 0, next.sequenceNumber));
    }
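
As a usage sketch (hypothetical consumer with stand-in types, not Flink's StreamTaskNetworkInput), the task-side reader keeps polling the channel until it gets an empty Optional, and can use the "more available" flag to decide whether it needs to wait for another availability notification:

import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical consumer-side sketch that only mirrors the polling pattern of getNextBuffer().
public class PollingSketch {

    /** Stand-in for BufferAndAvailability: the payload plus whether more data is already waiting. */
    static class BufferAndMore {
        final byte[] buffer;
        final boolean moreAvailable;

        BufferAndMore(byte[] buffer, boolean moreAvailable) {
            this.buffer = buffer;
            this.moreAvailable = moreAvailable;
        }
    }

    /** Stand-in for a channel whose getNextBuffer() drains an internal queue. */
    static class Channel {
        private final Queue<byte[]> received = new ArrayDeque<byte[]>();

        void add(byte[] buffer) {
            received.add(buffer);
        }

        Optional<BufferAndMore> getNextBuffer() {
            byte[] next = received.poll();
            if (next == null) {
                return Optional.empty();
            }
            // Peek at the queue to report whether another buffer is already waiting.
            return Optional.of(new BufferAndMore(next, !received.isEmpty()));
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        channel.add(new byte[16]);
        channel.add(new byte[32]);

        // Poll until the channel reports no further data; a real task would then
        // block until the gate signals the channel as non-empty again.
        Optional<BufferAndMore> next;
        while ((next = channel.getNextBuffer()).isPresent()) {
            System.out.printf("read %d bytes, more=%b%n",
                    next.get().buffer.length, next.get().moreAvailable);
        }
    }
}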

6.8. sendTaskEvent

@Override
void sendTaskEvent(TaskEvent event) throws IOException {
    checkState(
            !isReleased.get(),
            "Tried to send task event to producer after channel has been released.");
    checkPartitionRequestQueueInitialized();

    partitionRequestClient.sendTaskEvent(partitionId, event, this);
}

6.9. notifyCreditAvailable

Notifies the producer that this channel has new (unannounced) credit available.

/**
 * Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
 */
private void notifyCreditAvailable() throws IOException {
    checkPartitionRequestQueueInitialized();
    //Notify the producer of the new credit available on this channel
    partitionRequestClient.notifyCreditAvailable(this);
}

6.10. requestBuffer

/**
 * This method gets a buffer from the bufferQueue and returns it.
 *
 * Requests buffer from input channel directly for receiving network data. It should always
 * return an available buffer in credit-based mode unless the channel has been released.
 *
 * @return The available buffer.
 */
@Nullable
public Buffer requestBuffer() {
    return bufferManager.requestBuffer();
}

6.11. onSenderBacklog

The backlog is the number of buffers that have accumulated at the sending end. If the number of buffers in the bufferQueue is insufficient, floating buffers are requested from the LocalBufferPool;
after new buffers have been obtained, the producer is notified that credit is available.
Memory is allocated in advance according to the backlog: if the backlog plus the initial credit is greater than the number of available buffers, floating buffers need to be allocated.
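
A small worked example of the arithmetic (illustrative numbers only): with initialCredit = 2, a reported backlog of 5 and 3 buffers currently available, the channel wants backlog + initialCredit = 7 buffers, so up to 4 floating buffers are requested and each one obtained becomes unannounced credit.

// Illustrative arithmetic only; BufferManager#requestFloatingBuffers does the real work
// against the LocalBufferPool and may obtain fewer buffers than requested.
public class FloatingBufferMath {
    public static void main(String[] args) {
        int initialCredit = 2;    // exclusive buffers assigned at setup()
        int backlog = 5;          // unsent buffers reported by the producer
        int availableBuffers = 3; // buffers the channel currently holds

        int required = backlog + initialCredit;                            // 7
        int floatingToRequest = Math.max(0, required - availableBuffers);  // 4

        System.out.println("floating buffers to request: " + floatingToRequest);
        // Each successfully requested floating buffer becomes unannounced credit,
        // which is then announced to the producer via notifyCreditAvailable().
    }
}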

/**
 *
 * The backlog is the number of buffers accumulated at the sending end.
 * If the number of buffers in the bufferQueue is insufficient, floating buffers must be requested from the LocalBufferPool.
 * After requesting new buffers, notify the producer that credit is available.
 *
 * Allocate memory in advance according to the backlog,
 * If the backlog plus the initial credit is greater than the number of available buffers, a floating buffer needs to be allocated.
 *
 * Receives the backlog from the producer's buffer response. If the number of available buffers
 * is less than backlog + initialCredit, it will request floating buffers from the buffer
 * manager, and then notify unannounced credits to the producer.
 *
 * @param backlog The number of unsent buffers in the producer's sub partition.
 */
void onSenderBacklog(int backlog) throws IOException {

    // Request floating memory
    int numRequestedBuffers = bufferManager.requestFloatingBuffers(backlog + initialCredit);

    if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {

        notifyCreditAvailable();
    }
}

6.12. onBuffer

Receives a Buffer sent by the remote ResultSubpartition.
Called by CreditBasedPartitionRequestClientHandler#decodeBufferOrEvent.
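
To illustrate just the sequence-number bookkeeping (a hypothetical stand-in, not the real onBuffer, which also enqueues the buffer, handles priority events and the backlog): each received buffer must carry exactly the expected sequence number, otherwise the channel treats it as reordering.

// Hypothetical sketch of only the sequence-number check in onBuffer.
public class SequenceCheckSketch {
    private int expectedSequenceNumber = 0;

    void onBuffer(byte[] buffer, int sequenceNumber) {
        if (expectedSequenceNumber != sequenceNumber) {
            // In Flink this surfaces as a BufferReorderingException passed to onError(...)
            throw new IllegalStateException(
                    "expected sequence number " + expectedSequenceNumber + ", got " + sequenceNumber);
        }
        // ... enqueue the buffer into receivedBuffers here ...
        ++expectedSequenceNumber;
    }

    public static void main(String[] args) {
        SequenceCheckSketch channel = new SequenceCheckSketch();
        channel.onBuffer(new byte[8], 0);
        channel.onBuffer(new byte[8], 1);
        // channel.onBuffer(new byte[8], 3); // would fail: out of order
    }
}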

/**
 * Received Buffer sent by remote ResultSubpartition
 * @param buffer
 * @param sequenceNumber
 * @param backlog
 * @throws IOException
 */
public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {

    // Whether this buffer still needs to be recycled here (i.e. it was not handed over)
    boolean recycleBuffer = true;

    try {

        // Check the sequence number: it must match the expected one
        if (expectedSequenceNumber != sequenceNumber) {
            onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
            return;
        }

        final boolean wasEmpty;
        boolean firstPriorityEvent = false;

        synchronized (receivedBuffers) {

            NetworkActionsLogger.traceInput(
                    "RemoteInputChannel#onBuffer",
                    buffer,
                    inputGate.getOwningTaskName(),
                    channelInfo,
                    channelStatePersister,
                    sequenceNumber);
            // Similar to notifyBufferAvailable(), make sure that we never add a buffer
            // after releaseAllResources() released all buffers from receivedBuffers
            // (see above for details).
            if (isReleased.get()) {
                return;
            }

            // Record whether the queue was empty before adding the buffer
            wasEmpty = receivedBuffers.isEmpty();

            SequenceBuffer sequenceBuffer = new SequenceBuffer(buffer, sequenceNumber);
            DataType dataType = buffer.getDataType();
            if (dataType.hasPriority()) {
                firstPriorityEvent = addPriorityBuffer(sequenceBuffer);
            } else {
                //  Put the memory block filled with data into the receivedBuffers queue.
                //  Downstream operators can later read the data cached in the InputChannel
                //  through StreamTaskNetworkInput (driven e.g. by a StreamOneInputProcessor).
                receivedBuffers.add(sequenceBuffer);
                channelStatePersister.maybePersist(buffer);
                if (dataType.requiresAnnouncement()) {
                    firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                }
            }

            // Advance the expected sequence number
            ++expectedSequenceNumber;
        }
        // The buffer has been handed over, so it must not be recycled here
        recycleBuffer = false;


        if (firstPriorityEvent) {

            notifyPriorityEvent(sequenceNumber);
        }
        // If the queue was empty before this buffer was added, notify the corresponding InputGate that it now has data
        if (wasEmpty) {
            //Notify InputGate that there is new data in the current channel
            notifyChannelNonEmpty();
        }

        if (backlog >= 0) {
            //Request floating buffers according to the producer's backlog
            onSenderBacklog(backlog);
        }
    } finally {
        // Reclaim buffer
        if (recycleBuffer) {
            buffer.recycleBuffer();
        }
    }
}
