I. General
Reprint: Flink 1.12.2 source code analysis: Task data input
Inside a Task, the input is encapsulated as an InputGate, and each InputGate corresponds one-to-one with a JobEdge in the JobGraph.
In other words, an InputGate corresponds to an upstream operator that the Task depends on (including all of its parallel subtasks), and each InputGate consumes one or more ResultPartitions.
An InputGate is composed of InputChannels, which correspond one-to-one with the ExecutionEdges in the ExecutionGraph. Equivalently, InputChannels and ResultSubpartitions are connected one-to-one: one InputChannel receives the output of one ResultSubpartition. Depending on where the consumed ResultSubpartition is located, InputChannel has two different implementations: LocalInputChannel and RemoteInputChannel.
II. InputGate
The input of a Task is abstracted as an InputGate, which is composed of InputChannels; each InputChannel corresponds to one ResultSubpartition that the Task needs to consume.
An InputGate consumes one or more partitions of a single intermediate result.
Each intermediate result is partitioned over the parallel subtasks that produce it,
and each of these partitions is further divided into one or more subpartitions.
For example, consider a map-reduce program in which the map operator produces data and the reduce operator consumes it.
```java
/**
 * An InputGate consumes one or more partitions of a single produced intermediate result.
 *
 * Each intermediate result is partitioned over its producing parallel subtasks;
 * each of these partitions is further divided into one or more subpartitions.
 *
 * As an example, consider a map-reduce program, where the map operator produces data and the
 * reduce operator consumes the produced data.
 *
 * <pre>{@code
 * +-----+              +---------------------+              +--------+
 * | Map | = produce => | Intermediate Result | <= consume = | Reduce |
 * +-----+              +---------------------+              +--------+
 * }</pre>
 */
```
When such a program is deployed in parallel, the intermediate result is partitioned over its producing parallel subtasks;
each of these partitions is further divided into one or more subpartitions.
```java
 * <pre>{@code
 *                            Intermediate result
 *               +-----------------------------------------+
 *               |                      +----------------+ |              +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 1 | | <=======+=== | Input Gate | Reduce 1 |
 * | Map 1 | ==> | | Partition 1 | =|   +----------------+ |         |    +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+    |
 *               |                      +----------------+ |    |    | Subpartition request
 *               |                                         |    |    |
 *               |                      +----------------+ |    |    |
 * +-------+     | +-------------+  +=> | Subpartition 1 | | <==+====+
 * | Map 2 | ==> | | Partition 2 | =|   +----------------+ |    |         +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+======== | Input Gate | Reduce 2 |
 *               |                      +----------------+ |              +-----------------------+
 *               +-----------------------------------------+
 * }</pre>
 */
```
In the above example, two map subtasks produce the intermediate result in parallel, resulting in two partitions (Partition 1 and Partition 2).
Each partition is further divided into two subpartitions, one for each parallel reduce subtask.
As shown in the figure, each reduce task has an InputGate attached to it.
This InputGate provides the task's input, which consists of one subpartition from each partition of the intermediate result.
2.1. Interfaces implemented by InputGate
InputGate is an abstract class that implements three interfaces: PullingAsyncDataInput, AutoCloseable and ChannelStateHolder.
Let's look at each of them.
2.1.1. PullingAsyncDataInput
This interface defines the two basic methods for asynchronous, non-blocking data polling.
For the most efficient use, the caller should invoke {@link #pollNext()} until it returns no more elements.
When that happens, the caller should check {@link #isFinished()}.
If the input is not finished, it should wait for the {@link CompletableFuture} returned by {@link #getAvailableFuture()} to complete before polling again. For example:
```java
/**
 * <pre>{@code
 * AsyncDataInput<T> input = ...;
 * while (!input.isFinished()) {
 *     Optional<T> next;
 *
 *     while (true) {
 *         next = input.pollNext();
 *         if (!next.isPresent()) {
 *             break;
 *         }
 *         // do something with next
 *     }
 *
 *     input.getAvailableFuture().get();
 * }
 * }</pre>
 */
```
Methods
PullingAsyncDataInput declares two methods:
name | description |
---|---|
Optional<T> pollNext() throws Exception; | Returns the next element; the method must be non-blocking |
boolean isFinished(); | Returns true when the input is finished |
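To make the contract concrete, here is a simplified, self-contained stand-in for the polling interface described above. It is not the actual Flink declaration; in Flink, getAvailableFuture() is inherited from the AvailabilityProvider interface.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for the polling contract described above (not Flink's declaration).
interface PollingInputSketch<T> {
    // Non-blocking: returns an empty Optional when nothing is currently available.
    Optional<T> pollNext() throws Exception;

    // Returns true once no further elements will ever become available.
    boolean isFinished();

    // Completes when new data becomes available (in Flink this comes from AvailabilityProvider).
    CompletableFuture<?> getAvailableFuture();
}
```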
2.1.2. ChannelStateHolder
Implemented by entities that hold channel state of any kind and need a reference to the {@link ChannelStateWriter}.
It has a single method, which must only be called once:
```java
/** Injects the {@link ChannelStateWriter}. Must only be called once. */
void setChannelStateWriter(ChannelStateWriter channelStateWriter);
```
2.1.3. AutoCloseable
It declares only a single method: void close() throws Exception;
2.2. Method list
name | description |
---|---|
setChannelStateWriter(ChannelStateWriter channelStateWriter) | Injects the ChannelStateWriter |
abstract int getNumberOfInputChannels() | Returns the number of input channels |
abstract Optional<BufferOrEvent> getNext() | Blocking call that waits for the next BufferOrEvent; the previously returned buffer should be recycled before the next one is requested |
abstract Optional<BufferOrEvent> pollNext() | Non-blocking poll for the next BufferOrEvent |
abstract void sendTaskEvent(TaskEvent event) | Sends a task event to the producer |
abstract void resumeConsumption(InputChannelInfo channelInfo) | Resumes consumption from the given channel |
abstract InputChannel getChannel(int channelIndex) | Returns the channel of this gate at the given index |
List<InputChannelInfo> getChannelInfos() | Returns the channel infos of this gate |
CompletableFuture<?> getPriorityEventAvailableFuture() | Completed when a priority event has been enqueued; when queried from the task thread, it is guaranteed that the priority event is available and can be retrieved via getNext() |
abstract void setup() | Sets up the gate; potentially a heavyweight, blocking operation compared to mere creation |
abstract void requestPartitions() | Requests the consumed ResultPartitions |
III. InputGate implementations
A Task obtains input data by calling InputGate.getNextBufferOrEvent() in a loop and hands the obtained data to the operator it wraps for processing; this constitutes the basic running logic of a Task.
InputGate has two concrete implementations: SingleInputGate and UnionInputGate. A UnionInputGate is composed of multiple SingleInputGates. A minimal consumption loop is sketched below.
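As a rough illustration of that loop, the sketch below drives an InputGate using the methods listed in section 2.2. It is a simplified stand-in for the real Task/StreamTask code path, and the import paths assume Flink 1.12's org.apache.flink.runtime.io.network.partition.consumer package.

```java
import java.util.Optional;

import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;

public class GateConsumptionSketch {
    // Poll the gate until it is temporarily empty, then wait on the availability future.
    static void consume(InputGate gate) throws Exception {
        gate.setup();               // potentially heavyweight setup (buffer pools etc.)
        gate.requestPartitions();   // ask every channel to request its subpartition
        while (!gate.isFinished()) {
            Optional<BufferOrEvent> next = gate.pollNext();  // non-blocking variant of getNext()
            if (next.isPresent()) {
                BufferOrEvent bufferOrEvent = next.get();
                // hand the buffer or event to the wrapped operator (omitted in this sketch)
            } else {
                gate.getAvailableFuture().get();             // block until more data arrives
            }
        }
    }
}
```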
3.1. SingleInputGate
3.1.1. IndexedInputGate
The parent class of SingleInputGate is IndexedInputGate, which defines the checkpoint-related methods:
name | description |
---|---|
abstract int getGateIndex() | Returns the index of this InputGate |
void checkpointStarted(CheckpointBarrier barrier) | Called when a checkpoint is started |
void checkpointStopped(long cancelledCheckpointId) | Called when a checkpoint is stopped/cancelled |
getInputGateIndex() | Returns the index of this InputGate |
void blockConsumption(InputChannelInfo channelInfo) | Not used; the network stack blocks consumption automatically by revoking credits |
3.1.2. Properties
/** * Lock object to guard partition requests and runtime channel updates. Lock objects to protect partition requests and runtime channel updates. * Lock object to guard partition requests and runtime channel updates. */ private final Object requestLock = new Object(); /** * The name of the task, which is used for logging. * owningTaskName = "Flat Map (2/4)#0 (0ef8b3d70af60be8633af8af4e1c0698)" * The name of the owning task, for logging purposes. */ private final String owningTaskName; private final int gateIndex; /** * The ID of the consumed intermediate result. Each input gate consumes partitions of the * intermediate result specified by this ID. This ID also identifies the input gate at the * consuming task. * * Consume the ID of the output fruit partition of the previous operator * {IntermediateDataSetID@8214} "5eba1007ad48ad2243891e1eff29c32b" * */ private final IntermediateDataSetID consumedResultId; /** * Type of result partition:{ ResultPartitionType@7380} "PIPELINED_BOUNDED" * The type of the partition the input gate is consuming. */ private final ResultPartitionType consumedPartitionType; /** * index of consumption sub partition * The index of the consumed subpartition of each consumed partition. This index depends on the * {@link DistributionPattern} and the subtask indices of the producing and consuming task. */ private final int consumedSubpartitionIndex; /** * inputchannel Number of * The number of input channels (equivalent to the number of consumed partitions). */ private final int numberOfInputChannels; /** * InputGate All Input channels in * Result partition -- > input channels * Each consumed intermediate result partition has an input channel. * We store it in a map for runtime updates of a single channel * inputChannels = {HashMap@8215} size = 1 * {IntermediateResultPartitionID@8237} "5eba1007ad48ad2243891e1eff29c32b#0" -> {LocalRecoveredInputChannel@8238} * * * * Input channels. There is a one input channel for each consumed intermediate result partition. * We store this in a map for runtime updates of single channels. */ private final Map<IntermediateResultPartitionID, InputChannel> inputChannels; /** * InputGate All Input channels in * channels = {InputChannel[1]@8216} * 0 = {LocalRecoveredInputChannel@8238} */ @GuardedBy("requestLock") private final InputChannel[] channels; /** * InputChannel There are data available for consumption in these inputchannels * inputChannelsWithData = {PrioritizedDeque@8217} "[]" * Channels, which notified this input gate about available data. * */ private final PrioritizedDeque<InputChannel> inputChannelsWithData = new PrioritizedDeque<>(); /** * A field that guarantees the uniqueness of the inputChannelsWithData queue. * * These two fields should be unified into one field. * * enqueuedInputChannelsWithData = {BitSet@8218} "{}" * * Field guaranteeing uniqueness for inputChannelsWithData queue. * Both of those fields should be unified onto one. */ @GuardedBy("inputChannelsWithData") private final BitSet enqueuedInputChannelsWithData; // Channel without partition event?? private final BitSet channelsWithEndOfPartitionEvents; // Last priority serial number @GuardedBy("inputChannelsWithData") private int[] lastPrioritySequenceNumber; /** The partition producer state listener. 
*/ private final PartitionProducerStateProvider partitionProducerStateProvider; /** * Memory manager: LocalBufferPool * {LocalBufferPool@8221} "[size: 8, required: 1, requested: 1, available: 1, max: 8, listeners: 0,subpartitions: 0, maxBuffersPerChannel: 2147483647, destroyed: false]" * * Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers * from this pool. */ private BufferPool bufferPool; private boolean hasReceivedAllEndOfPartitionEvents; /** * Flag indicating whether a partition has been requested * Flag indicating whether partitions have been requested. */ private boolean requestedPartitionsFlag; /** * Blocked Evnet */ private final List<TaskEvent> pendingEvents = new ArrayList<>(); //Number of uninitialized channels private int numberOfUninitializedChannels; /** * The timer that triggers the local partition request again. Initialize only when needed. * A timer to retrigger local partition requests. Only initialized if actually needed. */ private Timer retriggerLocalRequestTimer; // Factory class of bufferpoolFactory // {SingleInputGateFactory$lambda@8223} private final SupplierWithException<BufferPool, IOException> bufferPoolFactory; private final CompletableFuture<Void> closeFuture; @Nullable private final BufferDecompressor bufferDecompressor; // {NetworkBufferPool@7512} private final MemorySegmentProvider memorySegmentProvider; /** * {HybridMemorySegment@8225} * * The segment to read data from file region of bounded blocking partition by local input * channel. */ private final MemorySegment unpooledSegment;
3.1.3. setup
In the setup phase, the InputGate allocates dedicated memory for all of its InputChannels; see the setup method of SingleInputGate.
What is actually allocated is a LocalBufferPool (LocalBufferPool@8221 in the debugger snapshot above); all InputChannels of the same InputGate share this one LocalBufferPool.
@Override public void setup() throws IOException { checkState( this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool."); // Allocate dedicated buffers for all inputchannels and the rest as floating buffers setupChannels(); // Set bufferPool to allocate floating buffers BufferPool bufferPool = bufferPoolFactory.get(); // Request the subpartition s to be read by each input channel setBufferPool(bufferPool); } /** Assign the exclusive buffers to all remote input channels directly for credit-based mode. */ @VisibleForTesting public void setupChannels() throws IOException { synchronized (requestLock) { for (InputChannel inputChannel : inputChannels.values()) { // Call the setup method of each InputChannel in SingleInputGate separately. inputChannel.setup(); } } }
3.1.4. requestPartitions
//Request partition @Override public void requestPartitions() { synchronized (requestLock) { // partition can only be requested once. After the method is called for the first time, this flag will be set to true if (!requestedPartitionsFlag) { if (closeFuture.isDone()) { throw new IllegalStateException("Already released."); } // Sanity checks if (numberOfInputChannels != inputChannels.size()) { throw new IllegalStateException( String.format( "Bug in input gate setup logic: mismatch between " + "number of total input channels [%s] and the currently set number of input " + "channels [%s].", inputChannels.size(), numberOfInputChannels)); } convertRecoveredInputChannels(); // Request partition data internalRequestPartitions(); } // After the method is called, set the flag to true to prevent repeated calls requestedPartitionsFlag = true; } } // Request data??? private void internalRequestPartitions() { for (InputChannel inputChannel : inputChannels.values()) { try { //Each channel requests the corresponding sub partition inputChannel.requestSubpartition(consumedSubpartitionIndex); } catch (Throwable t) { inputChannel.setError(t); return; } } }
3.1.5. getChannel(int channelIndex)
Get the specified InputChannel according to channelIndex
```java
@Override
public InputChannel getChannel(int channelIndex) {
    return channels[channelIndex];
}
```
3.1.6. updateInputChannel
Converts an UnknownInputChannel into a LocalInputChannel or a RemoteInputChannel, depending on whether the producer is local.
public void updateInputChannel( ResourceID localLocation, NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException { synchronized (requestLock) { if (closeFuture.isDone()) { // There was a race with a task failure/cancel return; } IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId(); InputChannel current = inputChannels.get(partitionId); // The InputChannel has not been specified yet if (current instanceof UnknownInputChannel) { UnknownInputChannel unknownChannel = (UnknownInputChannel) current; boolean isLocal = shuffleDescriptor.isLocalTo(localLocation); InputChannel newChannel; if (isLocal) { // LocalInputChannel newChannel = unknownChannel.toLocalInputChannel(); } else { // RemoteInputChannel RemoteInputChannel remoteInputChannel = unknownChannel.toRemoteInputChannel( shuffleDescriptor.getConnectionId()); remoteInputChannel.setup(); newChannel = remoteInputChannel; } LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel); inputChannels.put(partitionId, newChannel); channels[current.getChannelIndex()] = newChannel; if (requestedPartitionsFlag) { newChannel.requestSubpartition(consumedSubpartitionIndex); } for (TaskEvent event : pendingEvents) { newChannel.sendTaskEvent(event); } if (--numberOfUninitializedChannels == 0) { pendingEvents.clear(); } } } }
3.1.7. retriggerPartitionRequest
Retriggers the partition request; in fact it triggers retriggerSubpartitionRequest on the corresponding InputChannel.
/** Retriggers a partition request. */ public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId) throws IOException { synchronized (requestLock) { if (!closeFuture.isDone()) { final InputChannel ch = inputChannels.get(partitionId); checkNotNull(ch, "Unknown input channel with ID " + partitionId); LOG.debug( "{}: Retriggering partition request {}:{}.", owningTaskName, ch.partitionId, consumedSubpartitionIndex); if (ch.getClass() == RemoteInputChannel.class) { // RemoteInputChannel final RemoteInputChannel rch = (RemoteInputChannel) ch; rch.retriggerSubpartitionRequest(consumedSubpartitionIndex); } else if (ch.getClass() == LocalInputChannel.class) { // RemoteInputChannel final LocalInputChannel ich = (LocalInputChannel) ch; if (retriggerLocalRequestTimer == null) { retriggerLocalRequestTimer = new Timer(true); } ich.retriggerSubpartitionRequest( retriggerLocalRequestTimer, consumedSubpartitionIndex); } else { throw new IllegalStateException( "Unexpected type of channel to retrigger partition: " + ch.getClass()); } } } }
3.1.8. close
The close operation releases the resources of every InputChannel, lazily destroys the LocalBufferPool, and, if the release succeeded, calls notifyAll() on inputChannelsWithData.
@Override public void close() throws IOException { boolean released = false; synchronized (requestLock) { if (!closeFuture.isDone()) { try { LOG.debug("{}: Releasing {}.", owningTaskName, this); if (retriggerLocalRequestTimer != null) { retriggerLocalRequestTimer.cancel(); } for (InputChannel inputChannel : inputChannels.values()) { try { // Release resources inputChannel.releaseAllResources(); } catch (IOException e) { LOG.warn( "{}: Error during release of channel resources: {}.", owningTaskName, e.getMessage(), e); } } // The buffer pool can actually be destroyed immediately after the // reader received all of the data from the input channels. if (bufferPool != null) { // Release bufferPool bufferPool.lazyDestroy(); } } finally { released = true; closeFuture.complete(null); } } } if (released) { synchronized (inputChannelsWithData) { // Notify all inputChannelsWithData.notifyAll(); } } }
3.1.9. getNextBufferOrEvent
A Task obtains input data by calling getNextBufferOrEvent() in a loop.
getNextBufferOrEvent() in turn calls the waitAndGetNextData() method,
and the obtained data is handed to the operator wrapped by the Task for processing;
this constitutes the basic running logic of a Task.
/** * Task Call inputgate. Through a loop Getnextbufferoevent method obtains the input data, * And give the obtained data to the operator it encapsulates for processing, * This constitutes the basic running logic of a Task. * * @param blocking * @return * @throws IOException * @throws InterruptedException */ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException { // Returns NULL if all partition termination events are received if (hasReceivedAllEndOfPartitionEvents) { return Optional.empty(); } // If the input gate is turned off if (closeFuture.isDone()) { throw new CancelTaskException("Input gate is already closed."); } // Read data in {blocking: blocking / non blocking} Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking); if (!next.isPresent()) { return Optional.empty(); } // Get data InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get(); // Judge whether the data is event or data according to the Buffer return Optional.of( transformToBufferOrEvent( inputWithData.data.buffer(), inputWithData.moreAvailable, inputWithData.input, inputWithData.morePriorityEvents)); }
There are two ways to obtain data.
Blocking retrieval of the next buffer:
```java
@Override
public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
    return getNextBufferOrEvent(true);
}
```
Non-blocking retrieval of the next buffer:
```java
@Override
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
    return getNextBufferOrEvent(false);
}
```
3.1.10. waitAndGetNextData
Waits for the next piece of data; the boolean blocking parameter decides whether the call blocks.
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData( boolean blocking) throws IOException, InterruptedException { while (true) { synchronized (inputChannelsWithData) { Optional<InputChannel> inputChannelOpt = getChannel(blocking); if (!inputChannelOpt.isPresent()) { return Optional.empty(); } // Get the channel and determine whether it is blocking mode according to the blocking parameter final InputChannel inputChannel = inputChannelOpt.get(); Optional<BufferAndAvailability> bufferAndAvailabilityOpt = inputChannel.getNextBuffer(); if (!bufferAndAvailabilityOpt.isPresent()) { checkUnavailability(); continue; } final BufferAndAvailability bufferAndAvailability = bufferAndAvailabilityOpt.get(); if (bufferAndAvailability.moreAvailable()) { // Put input channels at the end to avoid starvation // enqueue the inputChannel at the end to avoid starvation queueChannelUnsafe(inputChannel, bufferAndAvailability.morePriorityEvents()); } final boolean morePriorityEvents = inputChannelsWithData.getNumPriorityElements() > 0; if (bufferAndAvailability.hasPriority()) { lastPrioritySequenceNumber[inputChannel.getChannelIndex()] = bufferAndAvailability.getSequenceNumber(); if (!morePriorityEvents) { priorityAvailabilityHelper.resetUnavailable(); } } // If inputChannelsWithData is empty, it is set to unavailable status checkUnavailability(); // Return packaged results return Optional.of( new InputWithData<>( inputChannel, bufferAndAvailability, !inputChannelsWithData.isEmpty(), morePriorityEvents)); }
3.1.11. transformToBufferOrEvent
Determines from the Buffer whether it carries data or an event and returns the corresponding BufferOrEvent instance.
private BufferOrEvent transformToBufferOrEvent( Buffer buffer, boolean moreAvailable, InputChannel currentChannel, boolean morePriorityEvents) throws IOException, InterruptedException { // Judge whether the data is event or data according to the Buffer if (buffer.isBuffer()) { return transformBuffer(buffer, moreAvailable, currentChannel, morePriorityEvents); } else { return transformEvent(buffer, moreAvailable, currentChannel, morePriorityEvents); } }
private BufferOrEvent transformBuffer( Buffer buffer, boolean moreAvailable, InputChannel currentChannel, boolean morePriorityEvents) { return new BufferOrEvent( decompressBufferIfNeeded(buffer), currentChannel.getChannelInfo(), moreAvailable, morePriorityEvents); } private BufferOrEvent transformEvent( Buffer buffer, boolean moreAvailable, InputChannel currentChannel, boolean morePriorityEvents) throws IOException, InterruptedException { final AbstractEvent event; try { event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); } finally { buffer.recycleBuffer(); } //If it is the EndOfPartitionEvent event event, then if all inputchannels have received this event //Mark hasReceivedAllEndOfPartitionEvents as true and no more data can be obtained thereafter if (event.getClass() == EndOfPartitionEvent.class) { channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex()); if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) { //Due to the competitive conditions of the following parties: // 1. In this method, release the inputChannelsWithData lock and reach this position // 2. Empty data notification, re queue the channel, we can set the moreAvailable flag to true without requiring more data. // Because of race condition between: // 1. releasing inputChannelsWithData lock in this method and reaching this place // 2. empty data notification that re-enqueues a channel // we can end up with moreAvailable flag set to true, while we expect no more data. checkState(!moreAvailable || !pollNext().isPresent()); moreAvailable = false; hasReceivedAllEndOfPartitionEvents = true; markAvailable(); } currentChannel.releaseAllResources(); } return new BufferOrEvent( event, buffer.getDataType().hasPriority(), currentChannel.getChannelInfo(), moreAvailable, buffer.getSize(), morePriorityEvents); }
3.1.12. sendTaskEvent
@Override public void sendTaskEvent(TaskEvent event) throws IOException { synchronized (requestLock) { // Loop all inputchannels to call their sendTaskEvent for (InputChannel inputChannel : inputChannels.values()) { inputChannel.sendTaskEvent(event); } // If there is a queue that has not been initialized, add the Event to the queue if (numberOfUninitializedChannels > 0) { pendingEvents.add(event); } } }
3.1.13. resumeConsumption
@Override public void resumeConsumption(InputChannelInfo channelInfo) throws IOException { checkState(!isFinished(), "InputGate already finished."); // BEWARE: consumption resumption only happens for streaming jobs in which all slots // are allocated together so there should be no UnknownInputChannel. As a result, it // is safe to not synchronize the requestLock here. We will refactor the code to not // rely on this assumption in the future. channels[channelInfo.getInputChannelIdx()].resumeConsumption(); }
3.1.14. Channel notification
name | description |
---|---|
void notifyChannelNonEmpty(InputChannel channel) | Callback invoked when an InputChannel has data available |
void notifyPriorityEvent(InputChannel inputChannel, int prioritySequenceNumber) | Callback invoked when a priority event arrives on a channel |
void triggerPartitionStateCheck(ResultPartitionID partitionId) | Triggers a state check of the producing partition |
void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber) | Adds a channel, on which data has become available, to the queue |
boolean queueChannelUnsafe(InputChannel channel, boolean priority) | Queues the channel if it is not queued yet, possibly raising its priority |
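The interplay of notifyChannelNonEmpty/queueChannel with the inputChannelsWithData deque and the enqueuedInputChannelsWithData BitSet (see the properties in 3.1.2) can be illustrated with the following simplified sketch. It is not the Flink implementation, only the underlying idea: a channel with data is queued exactly once, and consumers poll channels in FIFO order for fairness.

```java
import java.util.ArrayDeque;
import java.util.BitSet;

// Illustrative model of "enqueue a channel at most once, poll in FIFO order".
class ChannelQueueSketch {
    private final ArrayDeque<Integer> channelsWithData = new ArrayDeque<>(); // channel indices with data
    private final BitSet enqueued = new BitSet();                            // guards uniqueness

    // roughly corresponds to notifyChannelNonEmpty(channel) -> queueChannel(...)
    synchronized void queueChannel(int channelIndex) {
        if (enqueued.get(channelIndex)) {
            return;                          // already queued, nothing to do
        }
        enqueued.set(channelIndex);
        channelsWithData.add(channelIndex);
        if (channelsWithData.size() == 1) {
            notifyAll();                     // wake up a caller blocked waiting for data
        }
    }

    // roughly corresponds to the dequeue step inside waitAndGetNextData()
    synchronized Integer pollChannel() {
        Integer channelIndex = channelsWithData.poll();
        if (channelIndex != null) {
            enqueued.clear(channelIndex);
        }
        return channelIndex;
    }
}
```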
3.2. InputGateWithMetrics
InputGateWithMetrics is a subclass of InputGate; it simply adds a Counter used for metrics.
It is used by DataSinkTask, among others.
It has two properties:
```java
private final IndexedInputGate inputGate;
private final Counter numBytesIn;
```
The IndexedInputGate field holds a concrete implementation such as SingleInputGate or UnionInputGate. A simplified sketch of this wrapper follows.
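The sketch below shows the delegation pattern in a hedged form: the real InputGateWithMetrics wraps every InputGate method, while this only shows pollNext; the import paths assume Flink 1.12.

```java
import java.io.IOException;
import java.util.Optional;

import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;

// Wrapper that forwards calls to the underlying gate and counts the bytes flowing through it.
class CountingInputGateSketch {
    private final IndexedInputGate inputGate;  // in practice a SingleInputGate or UnionInputGate
    private final Counter numBytesIn;

    CountingInputGateSketch(IndexedInputGate inputGate, Counter numBytesIn) {
        this.inputGate = inputGate;
        this.numBytesIn = numBytesIn;
    }

    Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        Optional<BufferOrEvent> next = inputGate.pollNext();
        next.ifPresent(bufferOrEvent -> numBytesIn.inc(bufferOrEvent.getSize()));  // count incoming bytes
        return next;
    }
}
```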
3.3. UnionInputGate
A UnionInputGate is composed of multiple SingleInputGates and maintains an internal inputGatesWithData queue.
This input gate wrapper combines the input from multiple input gates.
Each input gate is connected to input channels from which data can be read.
Within each input gate, the input channels are numbered with unique IDs from 0 (inclusive) to the number of input channels (exclusive).
```java
 * <pre>
 * +---+---+      +---+---+---+
 * | 0 | 1 |      | 0 | 1 | 2 |
 * +--------------+--------------+
 * | Input gate 0 | Input gate 1 |
 * +--------------+--------------+
 * </pre>
```
The union input gate remaps these IDs onto the range from 0 to the total number of input channels across all unioned input gates.
For example, the channels of input gate 0 keep their original indices 0 and 1,
while the channels of input gate 1 are remapped to the indices 2 to 4 (see the sketch after the field below).
```java
 * <pre>
 * +---+---++---+---+---+
 * | 0 | 1 || 2 | 3 | 4 |
 * +--------------------+
 * |  Union input gate  |
 * +--------------------+
 * </pre>
```
```java
/**
 * Gates, which notified this input gate about available data. We are using it as a FIFO queue
 * of {@link InputGate}s to avoid starvation and provide some basic fairness.
 */
private final PrioritizedDeque<IndexedInputGate> inputGatesWithData = new PrioritizedDeque<>();
```
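The index remapping described above can be illustrated with a small, self-contained helper (not Flink code); with channel counts {2, 3} it yields the offsets 0 and 2, giving global indices 0 to 4 as in the diagram.

```java
// Compute, for each unioned gate, the offset added to its local channel indices:
// each gate's channels keep their relative order, shifted by the total number of
// channels of all preceding gates.
public class UnionIndexSketch {
    static int[] channelIndexOffsets(int[] channelsPerGate) {
        int[] offsets = new int[channelsPerGate.length];
        int total = 0;
        for (int gate = 0; gate < channelsPerGate.length; gate++) {
            offsets[gate] = total;            // first global index used by this gate
            total += channelsPerGate[gate];
        }
        return offsets;                        // e.g. {2, 3} -> {0, 2}
    }
}
```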
IV. InputChannel
An InputGate contains multiple InputChannels.
The basic logic of an InputChannel is relatively simple:
its life cycle revolves around requestSubpartition(int subpartitionIndex),
getNextBuffer() and releaseAllResources().
Depending on where the consumed ResultPartition is located,
InputChannel has two different implementations, LocalInputChannel and RemoteInputChannel,
corresponding to local and remote data exchange respectively.
A further implementation, UnknownInputChannel,
acts as a placeholder while the location of the ResultPartition has not yet been determined;
it is eventually updated to a LocalInputChannel or a RemoteInputChannel. A conceptual sketch of this life cycle follows.
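The sketch below mirrors the three life-cycle methods named above through a stand-in interface; the real InputChannel methods are package-private inside Flink, so this is conceptual rather than a drop-in usage of the class. In the real code an empty Optional from getNextBuffer only means "no data right now", and the gate waits for a data-availability notification instead of terminating.

```java
import java.util.Optional;

// Stand-in mirroring the three life-cycle methods of InputChannel.
interface ChannelLike<B> {
    void requestSubpartition(int subpartitionIndex) throws Exception;
    Optional<B> getNextBuffer() throws Exception;
    void releaseAllResources() throws Exception;
}

class ChannelLifecycleSketch {
    static <B> void consume(ChannelLike<B> channel, int subpartitionIndex) throws Exception {
        channel.requestSubpartition(subpartitionIndex);        // 1. connect to the ResultSubpartition
        try {
            Optional<B> next;
            while ((next = channel.getNextBuffer()).isPresent()) {
                // 2. hand the buffer (or event) to the owning task -- omitted here
            }
        } finally {
            channel.releaseAllResources();                     // 3. always release the resources
        }
    }
}
```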
4.1. Properties
//Target ResultPartitionID for consumption protected final ResultPartitionID partitionId; // Belongs to a specific InputGate protected final SingleInputGate inputGate; // - Asynchronous error notification -------------------------------------- private final AtomicReference<Throwable> cause = new AtomicReference<Throwable>(); // - Partition request backoff -------------------------------------------- /** * Initialize backoff(ms) * The initial backoff (in ms). * */ protected final int initialBackoff; /** * Maximum backoff(ms) * The maximum backoff (in ms). */ protected final int maxBackoff; // Data size quantity counter protected final Counter numBytesIn; // Buffer count protected final Counter numBuffersIn; /** * Current Backoff (ms) * The current backoff (in ms). */ private int currentBackoff;
4.2. Constructor
It simply assigns the fields:
protected InputChannel( SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, int initialBackoff, int maxBackoff, Counter numBytesIn, Counter numBuffersIn) { checkArgument(channelIndex >= 0); int initial = initialBackoff; int max = maxBackoff; checkArgument(initial >= 0 && initial <= max); this.inputGate = checkNotNull(inputGate); this.channelInfo = new InputChannelInfo(inputGate.getGateIndex(), channelIndex); this.partitionId = checkNotNull(partitionId); this.initialBackoff = initial; this.maxBackoff = max; this.currentBackoff = initial == 0 ? -1 : 0; this.numBytesIn = numBytesIn; this.numBuffersIn = numBuffersIn; }
4.3. Methods
name | description |
---|---|
void setup() | Initialization |
abstract void resumeConsumption() | Resumes data consumption |
void notifyChannelNonEmpty() | Callback that tells the InputGate that the current channel has data |
void notifyPriorityEvent(int priorityBufferNumber) | Callback that tells the InputGate that the current channel has a priority event |
void notifyBufferAvailable(int numAvailableBuffers) | Notifies the number of available buffers; related to back pressure |
abstract void requestSubpartition(int subpartitionIndex) | Requests the ResultSubpartition |
void checkpointStarted(CheckpointBarrier barrier) | Checkpoint started |
void checkpointStopped(long checkpointId) | Checkpoint stopped/cancelled |
abstract void sendTaskEvent(TaskEvent event) | Sends a TaskEvent to the producer |
abstract boolean isReleased() | Whether the channel has been released |
abstract void releaseAllResources() | Releases all resources |
V. LocalInputChannel
LocalInputChannel is a subclass of InputChannel used to exchange data between different threads within the same process.
If an InputChannel and the Task producing the upstream ResultPartition it consumes run in the same TaskManager,
the data exchange between them happens between different threads of the same JVM process, without any network exchange.
LocalInputChannel implements both the InputChannel contract and the BufferAvailabilityListener interface.
The LocalInputChannel asks the ResultPartitionManager to create a ResultSubpartitionView associated with the specified ResultSubpartition,
registering itself as the callback of that ResultSubpartitionView.
This way, as soon as the ResultSubpartition produces data, the ResultSubpartitionView is notified
and the callback of the LocalInputChannel is invoked,
so the consumer learns about newly produced data in time and can consume it promptly.
5.1. Properties
// ------------------------------------------------------------------------ private final Object requestLock = new Object(); /** * * Partition manager, which stores all partition information * partitionManager = {ResultPartitionManager@7381} * registeredPartitions = {HashMap@7405} size = 8 * {ResultPartitionID@7416} "6b3e5e999219f9532114514c4bdbb773#0@51ad11521e991efaad6349cdf2accda7" -> {PipelinedResultPartition@7417} "PipelinedResultPartition 6b3e5e999219f9532114514c4bdbb773#0@51ad11521e991efaad6349cdf2accda7 [PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions]" * {ResultPartitionID@7418} "6b3e5e999219f9532114514c4bdbb773#2@aecbd0682c0973976efe563eca747cc0" -> {PipelinedResultPartition@7419} "PipelinedResultPartition 6b3e5e999219f9532114514c4bdbb773#2@aecbd0682c0973976efe563eca747cc0 [PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions]" * {ResultPartitionID@7420} "e07667949eeb5fe115288459d1d137f1#1@0ef8b3d70af60be8633af8af4e1c0698" -> {PipelinedResultPartition@7421} "PipelinedResultPartition e07667949eeb5fe115288459d1d137f1#1@0ef8b3d70af60be8633af8af4e1c0698 [PIPELINED_BOUNDED, 4 subpartitions, 4 pending consumptions]" * {ResultPartitionID@7422} "e07667949eeb5fe115288459d1d137f1#2@5e3aaeed65818bcfeb1485d0fd22d1ac" -> {PipelinedResultPartition@7423} "PipelinedResultPartition e07667949eeb5fe115288459d1d137f1#2@5e3aaeed65818bcfeb1485d0fd22d1ac [PIPELINED_BOUNDED, 4 subpartitions, 4 pending consumptions]" * {ResultPartitionID@7424} "e07667949eeb5fe115288459d1d137f1#3@30e457019371f01a403bd06cf3041eeb" -> {PipelinedResultPartition@7425} "PipelinedResultPartition e07667949eeb5fe115288459d1d137f1#3@30e457019371f01a403bd06cf3041eeb [PIPELINED_BOUNDED, 4 subpartitions, 4 pending consumptions]" * {ResultPartitionID@7426} "e07667949eeb5fe115288459d1d137f1#0@bfbc34d8314d506a39528d9c86f16859" -> {PipelinedResultPartition@7427} "PipelinedResultPartition e07667949eeb5fe115288459d1d137f1#0@bfbc34d8314d506a39528d9c86f16859 [PIPELINED_BOUNDED, 4 subpartitions, 4 pending consumptions]" * {ResultPartitionID@7428} "6b3e5e999219f9532114514c4bdbb773#1@bcf3be98463b672ea899cee1290423a2" -> {PipelinedResultPartition@7429} "PipelinedResultPartition 6b3e5e999219f9532114514c4bdbb773#1@bcf3be98463b672ea899cee1290423a2 [PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions]" * {ResultPartitionID@7379} "5eba1007ad48ad2243891e1eff29c32b#0@db0c587a67c31a83cff5fd8be9496e5d" -> {PipelinedResultPartition@7371} "PipelinedResultPartition 5eba1007ad48ad2243891e1eff29c32b#0@db0c587a67c31a83cff5fd8be9496e5d [PIPELINED_BOUNDED, 4 subpartitions, 4 pending consumptions]" * The local partition manager. */ private final ResultPartitionManager partitionManager; /** * * taskEventPublisher = {TaskEventDispatcher@6289} * * Task event dispatcher for backwards events. * * */ private final TaskEventPublisher taskEventPublisher; /** The consumed subpartition. */ @Nullable private volatile ResultSubpartitionView subpartitionView; private volatile boolean isReleased; private final ChannelStatePersister channelStatePersister;
5.2. BufferAvailabilityListener
LocalInputChannel implements the BufferAvailabilityListener interface, which has two methods:
name | description |
---|---|
void notifyDataAvailable(); | Called when data arrives in the ResultSubpartition, notifying the InputChannel that data is available for consumption |
void notifyPriorityEvent(int prioritySequenceNumber) | Called when the first priority event is added to the head of the buffer queue |
The most important one is notifyDataAvailable. Because Flink's stream processing cares about latency, it adopts a push model: when data arrives in the ResultSubpartition, notifyDataAvailable is called to notify the InputChannel that data has arrived and can be consumed.
5.3. ChannelStatePersister
The checkpoint persistence of channel state is handled by ChannelStatePersister; the methods of interest are public void checkpointStarted(CheckpointBarrier barrier) and public void checkpointStopped(long checkpointId).
5.3.1. CheckpointBarrier
Checkpoint barriers are used to align checkpoints throughout the streaming topology.
Sources emit barriers when instructed to do so by the JobManager.
When an operator receives a checkpoint barrier on one of its inputs, it knows that this is the boundary between pre-checkpoint and post-checkpoint data.
Once the operator has received the checkpoint barrier from all of its input channels, it knows that the checkpoint is complete for its inputs.
It can then trigger its operator-specific checkpoint behavior and broadcast the barrier to downstream operators.
Depending on the semantic guarantee, data arriving after the barrier can be delayed until the checkpoint completes (exactly once).
Checkpoint barrier IDs are strictly monotonically increasing.
A CheckpointBarrier has only three properties:
```java
private final long id;
private final long timestamp;
private final CheckpointOptions checkpointOptions;
```
The CheckpointBarrier itself does not carry any read/write behavior; how the checkpoint is performed is controlled entirely by its CheckpointOptions.
5.3.2. CheckpointOptions properties
CheckpointOptions defines properties such as isExactlyOnceMode (exactly-once mode), the checkpoint type, and other checkpoint-related settings.
```java
/** Type of the checkpoint. */
private final CheckpointType checkpointType;

/** Target storage location for the checkpoint. */
private final CheckpointStorageLocationReference targetLocation;

/** Whether exactly-once mode is used. */
private final boolean isExactlyOnceMode;

/** Whether this is an unaligned checkpoint. */
private final boolean isUnalignedCheckpoint;

/** Alignment timeout. */
private final long alignmentTimeout;
```
5.3.3. CheckpointType
CheckpointType is an enumeration of checkpoint kinds.
It has four values:
name | description |
---|---|
CHECKPOINT(false, PostCheckpointAction.NONE, "Checkpoint") | Regular checkpoint |
SAVEPOINT(true, PostCheckpointAction.NONE, "Savepoint"), | Savepoint |
SAVEPOINT_SUSPEND(true, PostCheckpointAction.SUSPEND, "Suspend Savepoint"), | Savepoint that suspends the job |
SAVEPOINT_TERMINATE(true, PostCheckpointAction.TERMINATE, "Terminate Savepoint"); | Savepoint that terminates the job |
Each CheckpointType value carries three properties:
```java
// Whether this is a savepoint
private final boolean isSavepoint;
// Action to take after the checkpoint completes
private final PostCheckpointAction postCheckpointAction;
// Name of the checkpoint type
private final String name;
```
5.3.4. CheckpointStatus
CheckpointStatus has three values:
```java
private enum CheckpointStatus {
    // checkpoint completed
    COMPLETED,
    // barrier expected but not yet received
    BARRIER_PENDING,
    // barrier received
    BARRIER_RECEIVED
}
```
5.3.5. checkpointStarted
Start the checkpoint operation
protected void startPersisting(long barrierId, List<Buffer> knownBuffers) throws CheckpointException { logEvent("startPersisting", barrierId); // Judge that the status of the checkpoint must be received, and the last lastSeenBarrier must be greater than the currently entered barrierId if (checkpointStatus == CheckpointStatus.BARRIER_RECEIVED && lastSeenBarrier > barrierId) { throw new CheckpointException( String.format( "Barrier for newer checkpoint %d has already been received compared to the requested checkpoint %d", lastSeenBarrier, barrierId), CheckpointFailureReason .CHECKPOINT_SUBSUMED); // currently, at most one active unaligned } if (lastSeenBarrier < barrierId) { // Regardless of the current checkpoint status, if we receive notification about the nearest checkpoint, we have seen so far that the nearest barrier is always marked as suspended. // //BARRIER_RECEIVED status can occur. If we see an old barrier, it may not have been processed by the task, but the task now informs us that the checkpoint has been started for the new checkpoint. // //We should say what we know and show that we are waiting for new obstacles // Regardless of the current checkpointStatus, if we are notified about a more recent // checkpoint then we have seen so far, always mark that this more recent barrier is // pending. // BARRIER_RECEIVED status can happen if we have seen an older barrier, that probably // has not yet been processed by the task, but task is now notifying us that checkpoint // has started for even newer checkpoint. We should spill the knownBuffers and mark that // we are waiting for that newer barrier to arrive checkpointStatus = CheckpointStatus.BARRIER_PENDING; lastSeenBarrier = barrierId; } if (knownBuffers.size() > 0) { channelStateWriter.addInputData( barrierId, channelInfo, ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, CloseableIterator.fromList(knownBuffers, Buffer::recycleBuffer)); } }
5.3.6. checkpointStopped
Stop the checkpoint operation
```java
protected void stopPersisting(long id) {
    logEvent("stopPersisting", id);
    if (id >= lastSeenBarrier) {
        checkpointStatus = CheckpointStatus.COMPLETED;
        lastSeenBarrier = id;
    }
}
```
5.4. requestSubpartition
Requests the subpartition that this channel consumes:
//Request the sub partition corresponding to consumption @Override protected void requestSubpartition(int subpartitionIndex) throws IOException { boolean retriggerRequest = false; boolean notifyDataAvailable = false; // The lock is required to request only once in the presence of retriggered requests. synchronized (requestLock) { checkState(!isReleased, "LocalInputChannel has been released already"); if (subpartitionView == null) { LOG.debug( "{}: Requesting LOCAL subpartition {} of partition {}. {}", this, subpartitionIndex, partitionId, channelStatePersister); try { // Local, without network communication, create a ResultSubpartitionView through ResultPartitionManager // The LocalInputChannel implements the BufferAvailabilityListener // You will be notified when there is data, and notifyDataAvailable will be called, // Then add the current channel to the available channel queue of InputGate ResultSubpartitionView subpartitionView = partitionManager.createSubpartitionView( partitionId, subpartitionIndex, this); if (subpartitionView == null) { throw new IOException("Error requesting subpartition."); } // make the subpartition view visible this.subpartitionView = subpartitionView; // check if the channel was released in the meantime if (isReleased) { subpartitionView.releaseAllResources(); this.subpartitionView = null; } else { notifyDataAvailable = true; } } catch (PartitionNotFoundException notFound) { if (increaseBackoff()) { retriggerRequest = true; } else { throw notFound; } } } } if (notifyDataAvailable) { notifyDataAvailable(); } // Do this outside of the lock scope as this might lead to a // deadlock with a concurrent release of the channel via the // input gate. if (retriggerRequest) { inputGate.retriggerPartitionRequest(partitionId.getPartitionId()); } }
5.5. ResultSubpartitionView
The requestSubpartition method contains the following code, which builds a ResultSubpartitionView used to read the data of the ResultSubpartition.
5.5.1. Construction
// Local, without network communication, create a ResultSubpartitionView through ResultPartitionManager // The LocalInputChannel implements the BufferAvailabilityListener // You will be notified when there is data, and notifyDataAvailable will be called, // Then add the current channel to the available channel queue of InputGate ResultSubpartitionView subpartitionView = partitionManager.createSubpartitionView( partitionId, subpartitionIndex, this);
The creation call chain is:
ResultPartitionManager#createSubpartitionView --> ResultPartition#createSubpartitionView --> BufferWritingResultPartition#createSubpartitionView --> PipelinedSubpartition#createReadView --> readView = new PipelinedSubpartitionView(this, availabilityListener)
5.5.2. Methods
ResultSubpartitionView is an interface that defines a series of methods:
name | description |
---|---|
BufferAndBacklog getNextBuffer() | Returns the next Buffer instance from the queue |
void notifyDataAvailable(); | Called to notify that the ResultSubpartition has data available for consumption |
default void notifyPriorityEvent(int priorityBufferNumber) {} | Called to notify that a priority event was added to the ResultSubpartition |
void releaseAllResources() throws IOException; | Releases all resources |
boolean isReleased(); | Whether the view has been released |
void resumeConsumption(); | Resumes consumption |
Throwable getFailureCause(); | Returns the failure cause, if any |
boolean isAvailable(int numCreditsAvailable); | Whether data is available given the number of credits |
int unsynchronizedGetNumberOfQueuedBuffers(); | Number of queued buffers (read without synchronization) |
5.6. PipelinedSubpartitionView
PipelinedSubpartitionView is the implementation class of ResultSubpartitionView
5.6.1. Properties
```java
/** The subpartition this view belongs to. */
private final PipelinedSubpartition parent;

/**
 * Notified when data is available, via an implementation of BufferAvailabilityListener:
 * either a LocalInputChannel,
 * or a CreditBasedSequenceNumberingViewReader (for a RemoteInputChannel),
 * so that the data can be consumed as soon as it arrives.
 */
private final BufferAvailabilityListener availabilityListener;

/** Flag indicating whether this view has been released. */
final AtomicBoolean isReleased;
```
5.6.2. Methods
The methods are the same as those of ResultSubpartitionView, listed in section 5.5.2 above.
The implementations essentially delegate to the parent PipelinedSubpartition to process the data:
@Nullable @Override public BufferAndBacklog getNextBuffer() { return parent.pollBuffer(); } @Override public void notifyDataAvailable() { //Call back the interface to notify inputchanel of the arrival of data availabilityListener.notifyDataAvailable(); } @Override public void notifyPriorityEvent(int priorityBufferNumber) { //Call back the interface to notify inputchnel of the arrival of an event availabilityListener.notifyPriorityEvent(priorityBufferNumber); } // Free all resources @Override public void releaseAllResources() { if (isReleased.compareAndSet(false, true)) { // The view doesn't hold any resources and the parent cannot be restarted. Therefore, // it's OK to notify about consumption as well. parent.onConsumedSubpartition(); } } @Override public boolean isReleased() { return isReleased.get() || parent.isReleased(); } @Override public void resumeConsumption() { parent.resumeConsumption(); } @Override public boolean isAvailable(int numCreditsAvailable) { return parent.isAvailable(numCreditsAvailable); } @Override public Throwable getFailureCause() { return parent.getFailureCause(); } @Override public int unsynchronizedGetNumberOfQueuedBuffers() { return parent.unsynchronizedGetNumberOfQueuedBuffers(); }
5.7. retriggerSubpartitionRequest
/** Retriggers a subpartition request. */ void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) { synchronized (requestLock) { checkState(subpartitionView == null, "already requested partition"); timer.schedule( new TimerTask() { @Override public void run() { try { requestSubpartition(subpartitionIndex); } catch (Throwable t) { setError(t); } } }, getCurrentBackoff()); } }
5.8. notifyDataAvailable
Callback invoked (via the ResultSubpartitionView) when the ResultSubpartition has data available for consumption; the LocalInputChannel then notifies its InputGate.
```java
// Callback notifying this channel that the ResultSubpartition has data available for consumption
@Override
public void notifyDataAvailable() {
    // The LocalInputChannel notifies its InputGate
    notifyChannelNonEmpty();
}
```
5.9. resumeConsumption
@Override public void resumeConsumption() { checkState(!isReleased, "Channel released."); subpartitionView.resumeConsumption(); if (subpartitionView.isAvailable(Integer.MAX_VALUE)) { notifyChannelNonEmpty(); } }
5.10. sendTaskEvent
@Override void sendTaskEvent(TaskEvent event) throws IOException { checkError(); checkState( subpartitionView != null, "Tried to send task event to producer before requesting the subpartition."); //Event distribution if (!taskEventPublisher.publish(partitionId, event)) { throw new IOException( "Error while publishing event " + event + " to producer. The producer could not be found."); } }
VI. RemoteInputChannel
On the data-receiving side, RemoteInputChannel is used to receive and process data read from other nodes.
It is the counterpart of LocalInputChannel, which reads local partitions and does not need a receiver-side cache.
To request a remote ResultSubpartition, the RemoteInputChannel creates a PartitionRequestClient and sends a PartitionRequest over Netty,
carrying the ID of the current InputChannel and its initial credit.
CreditBasedPartitionRequestClientHandler reads data from the network and hands it to the RemoteInputChannel, which adds the received buffers to its queue and requests floating buffers according to the backlog accumulated on the producer side.
6.1. Properties
private static final int NONE = -1; /** ID to distinguish this channel from other channels sharing the same TCP connection. */ private final InputChannelID id = new InputChannelID(); /** The connection to use to request the remote partition. */ private final ConnectionID connectionId; /** The connection manager to use connect to the remote partition provider. */ private final ConnectionManager connectionManager; /** * The received buffers. Received buffers are enqueued by the network I/O thread and the queue * is consumed by the receiving task thread. */ private final PrioritizedDeque<SequenceBuffer> receivedBuffers = new PrioritizedDeque<>(); /** * Flag indicating whether this channel has been released. Either called by the receiving task * thread or the task manager actor. */ private final AtomicBoolean isReleased = new AtomicBoolean(); /** * RemoteInputChannel Request the ResultSubpartition of the remote end, * A PartitionRequestClient will be created, * And send a PartitionRequest through Netty, * The id of the current InputChannel and the initial credit information will be brought * * Client to establish a (possibly shared) TCP connection and request the partition. * */ private volatile PartitionRequestClient partitionRequestClient; /** The next expected sequence number for the next buffer. */ private int expectedSequenceNumber = 0; /** * Initialize credit value * The initial number of exclusive buffers assigned to this channel. */ private final int initialCredit; /** The number of available buffers that have not been announced to the producer yet. */ private final AtomicInteger unannouncedCredit = new AtomicInteger(0); private final BufferManager bufferManager; @GuardedBy("receivedBuffers") private int lastBarrierSequenceNumber = NONE; @GuardedBy("receivedBuffers") private long lastBarrierId = NONE; private final ChannelStatePersister channelStatePersister;
6.2. SequenceBuffer
Among the properties there is a queue, receivedBuffers, which records the received SequenceBuffers.
```java
/**
 * The received buffers. Received buffers are enqueued by the network I/O thread and the queue
 * is consumed by the receiving task thread.
 */
private final PrioritizedDeque<SequenceBuffer> receivedBuffers = new PrioritizedDeque<>();
```
SequenceBuffer is just a wrapper class with a Buffer attribute and a sequence number sequenceNumber
```java
private static final class SequenceBuffer {
    final Buffer buffer;
    final int sequenceNumber;

    private SequenceBuffer(Buffer buffer, int sequenceNumber) {
        this.buffer = buffer;
        this.sequenceNumber = sequenceNumber;
    }
}
```
6.3. PartitionRequestClient
When the RemoteInputChannel requests a remote ResultSubpartition,
a PartitionRequestClient is created and a PartitionRequest is sent through Netty, carrying the ID of the current InputChannel and the initial credit.
6.4. Construction method
The constructor only does simple field assignments.
Note the construction of the BufferManager: each RemoteInputChannel has its own BufferManager for memory management,
but the ultimate memory provider (the MemorySegmentProvider globalPool) is still the NetworkBufferPool.
public RemoteInputChannel( SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, ConnectionID connectionId, ConnectionManager connectionManager, int initialBackOff, int maxBackoff, int networkBuffersPerChannel, Counter numBytesIn, Counter numBuffersIn, ChannelStateWriter stateWriter) { super( inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, numBytesIn, numBuffersIn); this.initialCredit = networkBuffersPerChannel; this.connectionId = checkNotNull(connectionId); this.connectionManager = checkNotNull(connectionManager); // Build a BufferManager. Each RemoteinputChannel has only one BufferManager for memory management // But in fact, the final memory manager, MemorySegmentProvider globalPool, is still at the NetworkBufferPool level this.bufferManager = new BufferManager(inputGate.getMemorySegmentProvider(), this, 0); this.channelStatePersister = new ChannelStatePersister(stateWriter, getChannelInfo()); }
6.5. setup
```java
/**
 * Setup includes assigning exclusive buffers to this input channel, and this method should be
 * called only once after this input channel is created.
 */
@Override
void setup() throws IOException {
    checkState(
            bufferManager.unsynchronizedGetAvailableExclusiveBuffers() == 0,
            "Bug in input channel setup logic: exclusive buffers have already been set for this input channel.");
    // Request exclusive MemorySegments according to the initial credit
    bufferManager.requestExclusiveBuffers(initialCredit);
}
```
6.6. requestSubpartition
Requests the subpartition's data:
/** Requests a remote subpartition. */ @VisibleForTesting @Override public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { if (partitionRequestClient == null) { LOG.debug( "{}: Requesting REMOTE subpartition {} of partition {}. {}", this, subpartitionIndex, partitionId, channelStatePersister); // Create a client and request the partition try { // Build a client and request partition //REMOTE requires network communication and uses Netty to establish a network //Establish a connection through the connection manager: create a PartitionRequestClient and initiate a request through the PartitionRequestClient partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId); } catch (IOException e) { // IOExceptions indicate that we could not open a connection to the remote // TaskExecutor throw new PartitionConnectionException(partitionId, e); } //Request partition and initiate request through netty partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0); } }
6.7. getNextBuffer
Get buffer from receivedBuffers queue
@Override Optional<BufferAndAvailability> getNextBuffer() throws IOException { checkPartitionRequestQueueInitialized(); final SequenceBuffer next; final DataType nextDataType; // Get buffer from receivedBuffers queue synchronized (receivedBuffers) { next = receivedBuffers.poll(); nextDataType = receivedBuffers.peek() != null ? receivedBuffers.peek().buffer.getDataType() : DataType.NONE; } if (next == null) { if (isReleased.get()) { throw new CancelTaskException( "Queried for a buffer after channel has been released."); } return Optional.empty(); } numBytesIn.inc(next.buffer.getSize()); numBuffersIn.inc(); return Optional.of( new BufferAndAvailability(next.buffer, nextDataType, 0, next.sequenceNumber)); }
6.8. sendTaskEvent
@Override void sendTaskEvent(TaskEvent event) throws IOException { checkState( !isReleased.get(), "Tried to send task event to producer after channel has been released."); checkPartitionRequestQueueInitialized(); partitionRequestClient.sendTaskEvent(partitionId, event, this); }
6.9. notifyCreditAvailable
Notifies the producer that this channel has new (unannounced) credit:
```java
/**
 * Enqueues this input channel in the pipeline for notifying the producer of unannounced credit.
 */
private void notifyCreditAvailable() throws IOException {
    checkPartitionRequestQueueInitialized();
    // Notify the producer that this channel has new credit
    partitionRequestClient.notifyCreditAvailable(this);
}
```
6.10. requestBuffer
```java
/**
 * Gets a buffer from the bufferQueue and returns it.
 *
 * Requests a buffer from the input channel directly for receiving network data. It should always
 * return an available buffer in credit-based mode unless the channel has been released.
 *
 * @return The available buffer.
 */
@Nullable
public Buffer requestBuffer() {
    return bufferManager.requestBuffer();
}
```
6.11. onSenderBacklog
The backlog is the number of buffers accumulated on the sender side. If the buffers held by the channel are insufficient, floating buffers must be requested from the LocalBufferPool.
After new buffers have been requested, the producer is notified that credit is available.
Memory is thus allocated in advance according to the backlog: if backlog + initialCredit is greater than the number of currently available buffers, floating buffers need to be allocated (a small worked example follows the code below).
/** * * backlog Is the number of accumulated buffer s at the sending end, * If the number of buffers in the bufferQueue is insufficient, the floating buffer must be requested from the LocalBufferPool * After requesting a new buffer, notify the producer that credit is available * * Allocate memory in advance according to the backlog, * If the backlog plus the initial credit is greater than the number of available buffers, a floating buffer needs to be allocated. * * Receives the backlog from the producer's buffer response. If the number of available buffers * is less than backlog + initialCredit, it will request floating buffers from the buffer * manager, and then notify unannounced credits to the producer. * * @param backlog The number of unsent buffers in the producer's sub partition. */ void onSenderBacklog(int backlog) throws IOException { // Request floating memory int numRequestedBuffers = bufferManager.requestFloatingBuffers(backlog + initialCredit); if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) { notifyCreditAvailable(); } }
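As a rough worked example (the numbers are invented for illustration): with initialCredit = 2, a reported backlog = 5 and 3 buffers currently held by the BufferManager, requestFloatingBuffers(5 + 2) tries to top the channel up to 7 buffers, so about 4 floating buffers are requested from the pool. Because the unannounced credit was 0 before this call, notifyCreditAvailable() is then triggered so the producer learns about the new credit.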
6.12. onBuffer
Receives a Buffer sent by the remote ResultSubpartition.
Called by CreditBasedPartitionRequestClientHandler#decodeBufferOrEvent
/** * Received Buffer sent by remote ResultSubpartition * @param buffer * @param sequenceNumber * @param backlog * @throws IOException */ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException { // Do you want to recycle this buffer boolean recycleBuffer = true; try { // Check sequenceNumber // Serial number needs to match if (expectedSequenceNumber != sequenceNumber) { onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber)); return; } final boolean wasEmpty; boolean firstPriorityEvent = false; synchronized (receivedBuffers) { NetworkActionsLogger.traceInput( "RemoteInputChannel#onBuffer", buffer, inputGate.getOwningTaskName(), channelInfo, channelStatePersister, sequenceNumber); // Similar to notifyBufferAvailable(), make sure that we never add a buffer // after releaseAllResources() released all buffers from receivedBuffers // (see above for details). if (isReleased.get()) { return; } // Judge whether the queue before adding buffer is empty wasEmpty = receivedBuffers.isEmpty(); SequenceBuffer sequenceBuffer = new SequenceBuffer(buffer, sequenceNumber); DataType dataType = buffer.getDataType(); if (dataType.hasPriority()) { firstPriorityEvent = addPriorityBuffer(sequenceBuffer); } else { // Put the memory block filled with data into the receivedBuffers queue // Subsequent operator s can read the data cached in InputChannel through the implementation class of StreamTaskNetworkInput. // [such as StreamOneInputProcessor] receivedBuffers.add(sequenceBuffer); channelStatePersister.maybePersist(buffer); if (dataType.requiresAnnouncement()) { firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer)); } } // Add SequenceNumber ++expectedSequenceNumber; } // Data has been received and the cache does not need to be recycled recycleBuffer = false; if (firstPriorityEvent) { notifyPriorityEvent(sequenceNumber); } // If the queue before adding buffer is empty, you need to notify the corresponding inputGate. Now there is data (not empty) if (wasEmpty) { //Notify InputGate that there is new data in the current channel notifyChannelNonEmpty(); } if (backlog >= 0) { //Apply for float buffer according to the backlog of clients onSenderBacklog(backlog); } } finally { // Reclaim buffer if (recycleBuffer) { buffer.recycleBuffer(); } } }