Reading Flink source code and talking about design: clean architecture in FileSystemConnector

This article was first published in Floating finch language:

Version 1.0, 2022.3.8: article launch

This article analyzes the code of Flink 1.14.

0. Preface

A while ago I ran into a strange problem in production: a full-load job could not run to completion, and the log was full of errors like java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container xxxx (hostname: port) timed out.

The scenario: a full extract of an Oracle database is loaded into Hive, with the data passing through Kafka on the way. The data volume is at the terabyte level, and the Hive table is partitioned by day on a time field. The failing job is the one that reads from Kafka and writes to Hive, using the Table API.

1. Troubleshooting ideas

By the time the problem reached me, some colleagues had already made one pass at it. Search online and you will be told that YARN may be under too much pressure or the network may be temporarily unstable, and that raising heartbeat.timeout will ease the problem. We raised it, and the problem remained.

Another common explanation is frequent GC, with the advice to give the job more memory. That did have some effect: the failure took longer to appear. So the problem clearly had something to do with the code.

Because earlier versions had synchronized data without trouble, I started looking through recent code changes. After several passes I found nothing suspicious, and my scalp began to tingle. I asked my colleagues to roll back to the previous version and rerun the full load; the problem still occurred.

At this point I began to suspect something specific to the production environment, such as the shape of the data, but the colleagues on site told me there was nothing special about it. So I asked for a heap dump from the site, loaded it into an analysis tool, and found a very large number of org.apache.flink.streaming.api.functions.sink.filesystem.Bucket objects.

So let's look at the definition of Bucket:

/**
 * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
 * <p>For each incoming element in the {@code StreamingFileSink}, the user-specified {@link
 * BucketAssigner} is queried to see in which bucket this element should be written to.
 */
public class Bucket<IN, BucketID> {
    // ...
}

Good heavens: one object per directory. At that moment I began to doubt the "nothing special about the data" I had been told. To nail it down, though, I traced the code once more:

|-- HiveTableSink
   \-- createStreamSink
|-- StreamingFileSink
  \-- initializeState
|-- StreamingFileSinkHelper
  \-- constructor
|-- HadoopPathBasedBulkFormatBuilder
  \-- createBuckets
|-- Buckets
  \-- onElement
  \-- getOrCreateBucketForBucketId

After walking through the code, I had a good idea of what was going on. I asked whether the synchronized data covered an especially long time span, and the colleagues on site confirmed that it covered more than 3 years. With one partition per day, that means more than a thousand directories, and potentially as many Bucket objects held in memory at once. I suggested reducing the time span per run or using coarser partitions. In the end the problem was solved by splitting the full load into batches.
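To make the effect of the time span concrete, here is a minimal sketch rather than Flink code. The class `DailyBucketCount` and its methods are hypothetical; a plain `HashMap` stands in for the bucket map that `Buckets` maintains, and the sketch assumes one record per day and that no bucket is evicted while the load is running.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone model; none of these names are Flink classes.
class DailyBucketCount {

    /** Mimics a date-based assigner: one bucket ID (directory) per day. */
    static String bucketIdFor(LocalDate eventDate) {
        return "dt=" + eventDate; // LocalDate.toString() yields ISO-8601, e.g. 2019-01-01
    }

    /** Feeds one record per day in [start, end) and returns how many bucket objects exist. */
    static int countBuckets(LocalDate start, LocalDate end) {
        Map<String, Object> activeBuckets = new HashMap<>();
        for (LocalDate d = start; d.isBefore(end); d = d.plusDays(1)) {
            // Same idea as getOrCreateBucketForBucketId: create the bucket on first sight.
            activeBuckets.computeIfAbsent(bucketIdFor(d), id -> new Object());
        }
        return activeBuckets.size();
    }

    public static void main(String[] args) {
        // Three years of daily partitions.
        System.out.println(countBuckets(LocalDate.of(2019, 1, 1), LocalDate.of(2022, 1, 1)));
    }
}
```

A real Bucket is much heavier than the placeholder object used here, since it also holds its in-progress writer, so the count alone understates the memory pressure.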

2. Curiosity after solving the problem

If every directory produces a Bucket, then any long-running streaming job should hit the same problem sooner or later. The experts in the community must have thought of such an obvious problem long ago. Curiosity drove me to look for the answer, until I saw this code:

    public void commitUpToCheckpoint(final long checkpointId) throws IOException {
        final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
                activeBuckets.entrySet().iterator();

        LOG.info(
                "Subtask {} received completion notification for checkpoint with id={}.",
                subtaskIndex,
                checkpointId);

        while (activeBucketIt.hasNext()) {
            final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
            bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

            if (!bucket.isActive()) {
                // We've dealt with all the pending files and the writer for this bucket is not
                // currently open.
                // Therefore this bucket is currently inactive and we can remove it from our state.
                activeBucketIt.remove();
                notifyBucketInactive(bucket);
            }
        }
    }

When a completed checkpoint is committed, each Bucket is checked, and the in-memory data structure for it is removed if the Bucket is no longer active.
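The eviction pattern described here can be tried out in isolation. The following is a simplified sketch under stated assumptions: `EvictionSketch` and `ToyBucket` are hypothetical names, file names stand in for pending files, and a completed checkpoint is assumed to commit every pending file of a bucket.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the eviction loop; not the Flink classes themselves.
class EvictionSketch {

    static class ToyBucket {
        boolean writerOpen; // stands in for "a part file is currently open"
        final List<String> pendingFiles = new ArrayList<>(); // files awaiting commit

        void onCheckpointComplete() {
            pendingFiles.clear(); // assumption: every pending file is committed
        }

        boolean isActive() {
            return writerOpen || !pendingFiles.isEmpty();
        }
    }

    final Map<String, ToyBucket> activeBuckets = new HashMap<>();
    final List<String> inactiveNotifications = new ArrayList<>();

    void commitUpToCheckpoint() {
        Iterator<Map.Entry<String, ToyBucket>> it = activeBuckets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ToyBucket> entry = it.next();
            String bucketId = entry.getKey();
            ToyBucket bucket = entry.getValue();
            bucket.onCheckpointComplete();
            if (!bucket.isActive()) {
                it.remove(); // removing through the iterator is safe during iteration
                inactiveNotifications.add(bucketId);
            }
        }
    }
}
```

In this model a bucket with an open writer survives the checkpoint, while one that only had pending files is dropped from the map and reported as inactive.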

So what is active? The code is short:

    boolean isActive() {
        return inProgressPart != null
                || !pendingFileRecoverablesForCurrentCheckpoint.isEmpty()
                || !pendingFileRecoverablesPerCheckpoint.isEmpty();
    }

The next step is to clarify the trigger conditions of these three.

2.1 inProgressPart != null

The type of this field is InProgressFileWriter, and when it is set or cleared is closely tied to the rolling policy of the file system sink.

/**
 * The policy based on which a {@code Bucket} in the {@code Filesystem Sink} rolls its currently
 * open part file and opens a new one.
 */
public interface RollingPolicy<IN, BucketID> extends Serializable {

    /**
     * Determines if the in-progress part file for a bucket should roll on every checkpoint.
     * @param partFileState the state of the currently open part file of the bucket.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) throws IOException;

    /**
     * Determines if the in-progress part file for a bucket should roll based on its current state,
     * e.g. its size.
     * @param element the element being processed.
     * @param partFileState the state of the currently open part file of the bucket.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element)
            throws IOException;

    /**
     * Determines if the in-progress part file for a bucket should roll based on a time condition.
     * @param partFileState the state of the currently open part file of the bucket.
     * @param currentTime the current processing time.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnProcessingTime(
            final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException;
}

These three methods decide whether the currently open file should be closed in each situation:

  • shouldRollOnCheckpoint: checked before a checkpoint is taken.
  • shouldRollOnEvent: checked for each incoming element, based on the current state of the part file, for example whether its size exceeds the limit.
  • shouldRollOnProcessingTime: checked against the current processing time, for example whether the file has been open for too long.
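As an illustration of how the three checks might be combined, here is a stand-alone sketch. `RollingSketch`, `PartFile`, and `SizeAndTimePolicy` are hypothetical names, not Flink types, and the thresholds (maximum size, rollover interval, inactivity interval) are assumptions chosen for the example.

```java
// Hypothetical, simplified model of a rolling policy; these are not the Flink types.
class RollingSketch {

    /** Minimal stand-in for the part-file state that a policy inspects. */
    static class PartFile {
        final long sizeBytes;
        final long createdAtMs;
        final long lastWriteMs;

        PartFile(long sizeBytes, long createdAtMs, long lastWriteMs) {
            this.sizeBytes = sizeBytes;
            this.createdAtMs = createdAtMs;
            this.lastWriteMs = lastWriteMs;
        }
    }

    /** Rolls on size, on age, or on inactivity; all thresholds are illustrative. */
    static class SizeAndTimePolicy {
        final long maxSizeBytes;
        final long rolloverIntervalMs;
        final long inactivityIntervalMs;

        SizeAndTimePolicy(long maxSizeBytes, long rolloverIntervalMs, long inactivityIntervalMs) {
            this.maxSizeBytes = maxSizeBytes;
            this.rolloverIntervalMs = rolloverIntervalMs;
            this.inactivityIntervalMs = inactivityIntervalMs;
        }

        /** Before a checkpoint: roll only if the file is already large enough. */
        boolean shouldRollOnCheckpoint(PartFile file) {
            return file.sizeBytes >= maxSizeBytes;
        }

        /** On each element: roll once the size limit is reached. */
        boolean shouldRollOnEvent(PartFile file) {
            return file.sizeBytes >= maxSizeBytes;
        }

        /** On a timer: roll if the file is too old or has been idle for too long. */
        boolean shouldRollOnProcessingTime(PartFile file, long nowMs) {
            return nowMs - file.createdAtMs >= rolloverIntervalMs
                    || nowMs - file.lastWriteMs >= inactivityIntervalMs;
        }
    }
}
```

Whenever one of these checks returns true, the open part file is closed, which is what eventually lets the first condition in isActive() become false.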

2.2 pendingFileRecoverablesForCurrentCheckpoint isNotEmpty

Elements are added to this list when the RollingPolicy rolls a part file, so it needs little further explanation.

2.3 pendingFileRecoverablesPerCheckpoint isNotEmpty

This builds on pendingFileRecoverablesForCurrentCheckpoint: a map stores each checkpoint ID together with its List<InProgressFileWriter.PendingFileRecoverable>.
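The relationship between the two pending collections can be sketched as follows. This is a simplified model under stated assumptions, not the Flink implementation: `PendingFilesSketch` and its method names are hypothetical, and plain file names stand in for PendingFileRecoverable objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model; file names stand in for PendingFileRecoverable objects.
class PendingFilesSketch {
    List<String> pendingForCurrentCheckpoint = new ArrayList<>();
    final NavigableMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    final List<String> committed = new ArrayList<>();

    /** A part file was rolled (closed) and now waits for the next checkpoint. */
    void onPartFileRolled(String file) {
        pendingForCurrentCheckpoint.add(file);
    }

    /** A checkpoint is taken: park the current list under this checkpoint ID. */
    void onCheckpoint(long checkpointId) {
        if (!pendingForCurrentCheckpoint.isEmpty()) {
            pendingPerCheckpoint.put(checkpointId, pendingForCurrentCheckpoint);
            pendingForCurrentCheckpoint = new ArrayList<>();
        }
    }

    /** A checkpoint is confirmed: commit everything parked under IDs <= checkpointId. */
    void onCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : done.values()) {
            committed.addAll(files);
        }
        done.clear(); // headMap is a view, so this also removes the entries from the TreeMap
    }

    /** Mirrors the second and third conditions of isActive(). */
    boolean hasPending() {
        return !pendingForCurrentCheckpoint.isEmpty() || !pendingPerCheckpoint.isEmpty();
    }
}
```

A sorted map fits here because a completion notification for one checkpoint also covers every earlier checkpoint that is still parked.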

2.4 inactive Bucket

Putting these conditions together, an inactive Bucket is a directory whose part file has been closed and whose pending files have all been committed by completed checkpoints. The check generally happens at two points:

  1. When the task recovers: the previous state is read from the state backend and checked.
  2. After a checkpoint completes.

When a Bucket becomes inactive, a notification is sent to tell downstream that the partition's data has been committed and is readable. See the issue: Partition commit is delayed when records keep coming

3. Clean architecture in FileSystem

With the above in mind, I noticed the proposal FLIP-115: Filesystem connector in Table. Following the proposal, I read through the relevant source code and found that its implementation also embodies a clean architecture.

We have analyzed the source code above. Next, let's look at the abstractions, their responsibilities, and the layering:

|-- HiveTableSink  # Table-level API, external-facing; users can call it directly
|-- StreamingFileSink  # Streaming API, also usable externally; sits below the Table API
|-- StreamingFileSinkHelper  # Integrates the TimeService logic so that Buckets can be closed periodically, and dispatches data to Buckets. It is also used by AbstractStreamingWriter, and is meant to be reusable from a RichSinkFunction or StreamOperator
|-- BucketsBuilder  # The concrete class in this scenario is HadoopPathBasedBulkFormatBuilder, which ties the concrete Buckets implementation to a concrete BucketWriter
|-- Buckets  # Manages the Bucket life cycle. Its key members are:
  |-- BucketWriter  # Corresponds to the concrete file system implementation and the output format
  |-- RollingPolicy  # The rolling policy, covered earlier and not discussed further here
  |-- BucketAssigner  # Decides which Bucket each element goes to, for example by key or by date
  |-- BucketFactory  # Responsible for creating each Bucket

Because responsibilities are divided so finely, the data flow logic is decoupled from the concrete external implementations. A few examples:

  1. If we want to write to Hive from our own DSL, we only need to write a HiveDSLSink similar to HiveTableSink.
  2. If a data warehouse (data lake) keeps adding support for new underlying file systems, then once the first version of the code is in place, each addition only needs the corresponding BucketWriter and FileSystem implementation.
  3. If a data warehouse (data lake) keeps adding its own supported formats, then once the first version is in place, each addition only needs the corresponding BucketWriter.

With this design, the core logic rarely changes and the parts that change often are isolated, so the quality of the whole module is easier to guarantee.
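The decoupling can be illustrated with a small stand-alone sketch. The names below (`LayeringSketch`, `Assigner`, `Writer`, `Core`, `InMemoryWriter`) are hypothetical; they echo the roles of BucketAssigner, BucketWriter, and Buckets but are not Flink's API, and the real classes handle far more (state, checkpoints, rolling).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the layering; the roles echo Flink's classes but this is not its API.
class LayeringSketch {

    /** Decides which bucket (directory) an element belongs to. */
    interface Assigner<T> {
        String bucketIdFor(T element);
    }

    /** Hides the file system and the output format behind one method. */
    interface Writer<T> {
        void write(String bucketId, T element);
    }

    /** Core data flow: unchanged no matter which assigner or writer is plugged in. */
    static class Core<T> {
        private final Assigner<T> assigner;
        private final Writer<T> writer;

        Core(Assigner<T> assigner, Writer<T> writer) {
            this.assigner = assigner;
            this.writer = writer;
        }

        void onElement(T element) {
            writer.write(assigner.bucketIdFor(element), element);
        }
    }

    /** One possible writer: keeps rows in memory, which is handy for tests. */
    static class InMemoryWriter<T> implements Writer<T> {
        final Map<String, List<T>> rows = new HashMap<>();

        @Override
        public void write(String bucketId, T element) {
            rows.computeIfAbsent(bucketId, id -> new ArrayList<>()).add(element);
        }
    }
}
```

Supporting a new format or file system in this model means adding another `Writer` implementation, while `Core` stays untouched, which is the property the three examples rely on.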

Keywords: Java architecture flink

Added by nyfael on Wed, 09 Mar 2022 02:03:12 +0200