Detailed use of Flink

1. Installation and deployment

Install

  • Step 1: upload flink-1.10.1-bin-scala_2.12.tgz to the server and decompress it

  • Step 2: modify the conf/flink-conf.yaml file

    # Set jobmanager.rpc.address to the JobManager host
    jobmanager.rpc.address: hadoop151
    
  • Step 3: modify the conf/slaves file

    # slave machines
    hadoop152
    hadoop153
    
  • Step 4: distribute the entire flink directory to other machines

2. Running tasks

Standalone mode

Start / stop

  • Command

    # Start the cluster
    bin/start-cluster.sh
    
    # Stop the cluster
    bin/stop-cluster.sh
    
  • Accessing web pages

    • http://hadoop151:8081

Run tasks

# ===================Start task===================
bin/flink run -c <fully qualified class name> -p <parallelism> <jar package>
# Example
bin/flink run -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar


# ===================Stop task===================
bin/flink cancel <JobId>
# Example
bin/flink cancel f69fbd0650ae4202b2a46b3ad2089606

Yarn mode

Session cluster mode

Start the yarn session

  • Command

    # ===================Start the yarn session===================
    # -n(--container): number of TaskManagers
    # -s(--slots): number of slots per TaskManager (default 1; usually one slot per available core)
    # -jm: JobManager memory in MB
    # -tm: memory per TaskManager in MB
    # -nm: YARN application name (shown in the YARN UI)
    # -d: run in detached (background) mode
    bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
    
    
    # ===================Stop yarn session===================
    yarn application -kill <Application-Id>
    # Example
    yarn application -kill application_1633171918776_0003
    
  • Accessing web pages

    • After starting the yarn session, the web address will appear, for example: http://hadoop153:42189

Run tasks

# ===================Start task===================
bin/flink run -c <fully qualified class name> -p <parallelism> <jar package>
# Example
bin/flink run -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar


# ===================Stop task===================
bin/flink cancel <JobId>
# Example
bin/flink cancel f69fbd0650ae4202b2a46b3ad2089606

Per-job cluster mode

# ===================Start task===================
bin/flink run -m yarn-cluster -c <fully qualified class name> -p <parallelism> <jar package>
# Example
bin/flink run -m yarn-cluster -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar


# ===================Stop task===================
bin/flink cancel <JobId>
# Example
bin/flink cancel f69fbd0650ae4202b2a46b3ad2089606

3. Execution environment

Environment

getExecutionEnvironment (common)

  • Creates an execution environment that represents the context of the current program. getExecutionEnvironment determines which environment to return based on how the program is being run (locally or submitted to a cluster); it is the most common way to create an execution environment

    // Batch execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // Streaming execution environment (most common)
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    

createLocalEnvironment

  • Returns a local execution environment; the default parallelism can be specified when calling

    LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
    

createRemoteEnvironment

  • Returns a cluster (remote) execution environment and submits the Jar to the remote server. The JobManager's host and port must be specified when calling, along with the Jar package(s) to run in the cluster

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//WordCount.jar");
    

Source, Sink

Transform (operators)

map

// map: convert each String element to its length
DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) throws Exception {
        return value.length();
    }
});

flatMap

// flatMap: split each line into words and emit each word separately
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) out.collect(word);
    }
});

Filter

// filter: keep only the elements that satisfy the predicate
DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return !value.isEmpty();
    }
});

KeyBy

  • DataStream → KeyedStream
  • Splits a stream into disjoint partitions; elements with the same key end up in the same partition (internally implemented with hashing), as sketched below
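A minimal sketch of keyBy, assuming the SensorsData POJO used in the later examples (keying by field name and by KeySelector are equivalent here):

// keyBy by field name: elements with the same "id" go to the same partition
KeyedStream<SensorsData, Tuple> keyedStream = dataStream.keyBy("id");

// keyBy with a KeySelector (type-safe key)
KeyedStream<SensorsData, String> keyedById = dataStream.keyBy(
        (KeySelector<SensorsData, String>) value -> value.getId());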

Rolling aggregation operator

  • sum()
  • max()
  • min()
  • maxBy()
  • minBy()
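A minimal sketch of the rolling aggregations, building on the keyedStream above (field names assumed from the SensorsData examples below); max() only updates the aggregated field, while maxBy() returns the whole record that holds the maximum:

// Rolling maximum of the "wendu" field per key
DataStream<SensorsData> maxWendu = keyedStream.max("wendu");

// Rolling maximum that keeps the complete record containing the maximum
DataStream<SensorsData> maxByWendu = keyedStream.maxBy("wendu");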

Reduce

  • KeyedStream → DataStream
  • The aggregation operation on a grouped data stream: it combines the current element with the result of the previous aggregation to produce a new value. The returned stream contains the result of every aggregation step, not only the final result, as sketched below
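A minimal sketch of reduce on the keyed stream, assuming the three-argument SensorsData constructor used in the examples below; for each input it emits the latest timestamp together with the maximum temperature seen so far:

DataStream<SensorsData> reduced = keyedStream.reduce(new ReduceFunction<SensorsData>() {
    @Override
    public SensorsData reduce(SensorsData last, SensorsData current) throws Exception {
        // Keep the newest timestamp and the largest temperature seen so far
        return new SensorsData(current.getId(), current.getTimestamp(),
                Math.max(last.getWendu(), current.getWendu()));
    }
});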

Split and Select

Split

  • DataStream → SplitStream
    • Split a DataStream into two or more datastreams according to some characteristics

Select

  • SplitStream→DataStream
    • Get one or more datastreams from a SplitStream
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String filePath = "E:\\~fzk\\java\\IDEA\\bigdata\\FlinkStudyDemo\\test\\test1";
    DataStream<String> inputDataStream = env.readTextFile(filePath);
    
    DataStream<SensorsData> map = inputDataStream.map(new MapFunction<String, SensorsData>() {
        public SensorsData map(String value) throws Exception {
            String[] splits = value.split(" ");
            return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
        }
    });
    
    KeyedStream<SensorsData, Tuple> keyedStream = map.keyBy("id");

    // Split: tag each element by temperature ("h" above 37, "d" otherwise)
    SplitStream<SensorsData> splitStream = keyedStream.split(new OutputSelector<SensorsData>() {
        public Iterable<String> select(SensorsData value) {
            return value.getWendu() > 37 ? Collections.singletonList("h") : Collections.singletonList("d");
        }
    });

    // Select: pick the tagged stream(s) to keep
    DataStream<SensorsData> resultDataStream = splitStream.select("d");

    env.execute();
}

Connect and CoMap

Connect

  • DataStream, DataStream → ConnectedStreams: connects two data streams while keeping their respective types. After connecting, the two streams are merely placed in one ConnectedStreams; their internal data and forms stay unchanged and the two streams remain independent of each other

CoMap

  • ConnectedStreams → DataStream: acts on a ConnectedStreams and works like map/flatMap, except that a separate map/flatMap function is applied to each of the two connected streams
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String filePath = "E:\\~fzk\\java\\IDEA\\bigdata\\FlinkStudyDemo\\test\\test1";
    DataStream<String> inputDataStream = env.readTextFile(filePath);

    DataStream<SensorsData> map = inputDataStream.map(new MapFunction<String, SensorsData>() {
        public SensorsData map(String value) throws Exception {
            String[] splits = value.split(" ");
            return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
        }
    });

    KeyedStream<SensorsData, Tuple> keyedStream = map.keyBy("id");

    SplitStream<SensorsData> splitStream = keyedStream.split(new OutputSelector<SensorsData>() {
        public Iterable<String> select(SensorsData value) {
            return value.getWendu() > 37 ? Collections.singletonList("high") : Collections.singletonList("low");
        }
    });

    DataStream<SensorsData> highDataStream = splitStream.select("high");
    DataStream<SensorsData> lowDataStream = splitStream.select("low");

    // Connect & CoMapFunction: merge the two streams
    ConnectedStreams<SensorsData, SensorsData> connectedStreams = highDataStream.connect(lowDataStream);
    /*
    	new CoMapFunction<SensorsData, SensorsData, Object>
        First type parameter: element type of the first connected stream
        Second type parameter: element type of the second connected stream
        Third type parameter: element type of the merged output
     */
    DataStream<Object> resultDataStream = connectedStreams.map(new CoMapFunction<SensorsData, SensorsData, Object>() {
        public Object map1(SensorsData value) throws Exception {
            return value;
        }

        public Object map2(SensorsData value) throws Exception {
            return value;
        }
    });

    env.execute();
}

Union

  • DataStream → DataStream: unions two or more DataStreams into a new DataStream containing all of their elements
  • All streams being unioned must have the same element type (a minimal sketch follows)
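A minimal sketch, reusing the high/low streams from the Connect example above (both are DataStream<SensorsData>, so union is allowed):

// Union: merge two (or more) streams of the same type into one DataStream
DataStream<SensorsData> unionStream = highDataStream.union(lowDataStream);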

4. Window

  • TimeWindow: windows are generated based on time

  • CountWindow: windows are generated based on a specified number of elements, independent of time

TimeWindow

// The window size is 10s, sliding for 5s each time
DataStream<Tuple2<String, Integer>> resultDataStream = stringDataSource
                .flatMap(new MyFlatMapFunction())
                .keyBy(0)
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .sum(1);

CountWindow

// The window size is 10 elements, sliding by 5 elements each time
DataStream<Tuple2<String, Integer>> resultDataStream = stringDataSource
                .flatMap(new MyFlatMapFunction())
                .keyBy(0)
                .countWindow(10, 5)
                .sum(1);

5. Temporal semantics and Watermark

Temporal semantics

  • Event Time: the time when the event was created
  • Ingestion Time: the time when the data enters Flink
  • Processing Time: the local system time of the operator executing a time-based operation

Setting the time semantics

// EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// IngestionTime
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

// ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

Watermark (used with Window)

  • Watermark is a mechanism to measure the progress of Event Time
  • Watermark is used to handle out-of-order events; correct handling of out-of-order events is usually achieved by combining the watermark mechanism with windows
  • Watermark can be understood as a delayed trigger mechanism. We set a watermark delay t; the system keeps track of the maximum event time maxEventTime of the data that has arrived and assumes that all data with eventTime smaller than maxEventTime - t have already arrived. When a window's end time is no later than maxEventTime - t, the window is triggered

Watermarks for out-of-order data

  • Interface: AssignerWithPeriodicWatermarks

  • Set time semantics before use

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        //Set time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    //Generate watermarks periodically; the default interval is 200 milliseconds
        env.getConfig().setAutoWatermarkInterval(5000);
    
        String filePath = "E:\\~fzk\\java\\IDEA\\bigdata\\FlinkStudyDemo\\test\\test1";
        DataStream<String> inputDataStream = env.readTextFile(filePath);
    
        DataStream<SensorsData> map = inputDataStream.map(new MapFunction<String, SensorsData>() {
            public SensorsData map(String value) throws Exception {
                String[] splits = value.split(" ");
                return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
            }
        });
    
        //watermark in the case of out of order time
        //Time.milliseconds(1000): delay time, 1000ms
        DataStream<SensorsData> eventTimeDataStream = map.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorsData>(Time.milliseconds(1000)) {
            @Override
            public long extractTimestamp(SensorsData element) {
                return element.getTimestamp();
            }
        });
    
        env.execute();
    }
    
    
    // POJO used in the example above (getters, setters and constructors omitted)
    public class SensorsData {
        private String id;
        private Long timestamp;
        private double wendu;
    }
    

Watermarks for in-order data

  • Interfaces: AssignerWithPeriodicWatermarks (used below via AscendingTimestampExtractor, which assumes monotonically increasing timestamps) and AssignerWithPunctuatedWatermarks (emits a watermark decision for each event)

  • Set time semantics before use

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        //Set time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
        String filePath = "E:\\~fzk\\java\\IDEA\\bigdata\\FlinkStudyDemo\\test\\test1";
        DataStream<String> inputDataStream = env.readTextFile(filePath);
    
        DataStream<SensorsData> map = inputDataStream.map(new MapFunction<String, SensorsData>() {
            public SensorsData map(String value) throws Exception {
                String[] splits = value.split(" ");
                return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
            }
        });
    
        //watermark in sequential time
        DataStream<SensorsData> eventTimeDataStream = map.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorsData>() {
            @Override
            public long extractAscendingTimestamp(SensorsData element) {
                return element.getTimestamp();
            }
        });
    
        env.execute();
    }
    
    
    // POJO used in the example above (getters, setters and constructors omitted)
    public class SensorsData {
        private String id;
        private Long timestamp;
        private double wendu;
    }
    

6. State management

Keyed State

  • Keyed State is maintained and accessed based on the key defined on the input data stream. Flink maintains one state instance per key value and partitions all data with the same key to the operator task that maintains the state for that key. When a task processes a record, it automatically scopes state access to the key of the current record, so all records with the same key access the same state. Keyed State is very similar to a distributed key-value map and can only be used on a KeyedStream (i.e. after keyBy)
  • Used to store state values per key

Keyed State supports data types

  • ValueState<T> saves a single value of type T
    • get: ValueState.value()
    • set: ValueState.update(T value)
  • ListState<T> saves a list whose elements are of type T
    • ListState.add(T value)
    • ListState.addAll(List<T> values)
    • ListState.get() returns Iterable<T>
    • ListState.update(List<T> values)
  • MapState<UK, UV> saves key-value pairs
    • MapState.get(UK key)
    • MapState.put(UK key, UV value)
    • MapState.contains(UK key)
    • MapState.remove(UK key)
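  • A minimal sketch of how these state handles are obtained inside a rich function's open() method (the descriptor names and field types here are illustrative):

    private ValueState<Double> lastWendu;
    private ListState<Double> recentWendus;
    private MapState<String, Long> countsByTag;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Each descriptor names the state and declares its type(s)
        lastWendu = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-wendu", Double.class));
        recentWendus = getRuntimeContext().getListState(
                new ListStateDescriptor<>("recent-wendus", Double.class));
        countsByTag = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("counts-by-tag", String.class, Long.class));
    }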

Example: ValueState

  • We can use Keyed State to implement the following requirement: monitor the sensor temperature and output an alarm whenever two consecutive readings differ by more than 10 degrees

    public class Test {
        public static void main(String[] args) throws Exception {
            //Create execution phase
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStream<String> inputDataStream = env.socketTextStream("localhost", 9999);
    
            SingleOutputStreamOperator<MyBean> myBeanDataStream = inputDataStream.map(new MapFunction<String, MyBean>() {
                @Override
                public MyBean map(String s) throws Exception {
                    String[] split = s.split(" ");
                    return new MyBean(split[0], Double.valueOf(split[1]));
                }
            });
    
            SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultDataStream = myBeanDataStream
                    .keyBy((KeySelector<MyBean, String>) data -> data.getId())
                    .flatMap(new MyRichFlatMapFunction(10.0));
    
            resultDataStream.print();
    
            env.execute();
        }
    }
    
    
    // Rich function that stores the previous temperature as keyed state
    public class MyRichFlatMapFunction extends RichFlatMapFunction<MyBean, Tuple3<String, Double, Double>> {
        private ValueState<Double> myValueState;
      
        private Double abs;
    
        public MyRichFlatMapFunction(Double abs) {
            this.abs = abs;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // Create status value
            myValueState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("my-flatmap", Double.class));
        }
    
        @Override
        public void flatMap(MyBean myBean, Collector<Tuple3<String, Double, Double>> collector) throws Exception {
            // Get status value
            Double lastWendu = myValueState.value();
            if(lastWendu != null){
                double absWendu = Math.abs(myBean.getWendu() - lastWendu);
                if (absWendu > abs){
                    collector.collect(new Tuple3<>(myBean.getId(), lastWendu, myBean.getWendu()));
                }
            }
            // Modify status value
            myValueState.update(myBean.getWendu());
        }
    
        @Override
        public void close() throws Exception {
            // Clear status value
            myValueState.clear();
        }
    }
    
    
    // Entity class (getters, setters and constructor omitted)
    public class MyBean {
        private String id;
        private Double wendu;
    }
    

7. ProcessFunction API

  • The ProcessFunction API provides a set of low-level operators on top of the DataStream API. They can access timestamps and watermarks, register timers, and also emit specific events such as timeout events
  • Flink provides eight process functions
    • ProcessFunction
    • KeyedProcessFunction
    • CoProcessFunction
    • ProcessJoinFunction
    • BroadcastProcessFunction
    • KeyedBroadcastProcessFunction
    • ProcessWindowFunction
    • ProcessAllWindowFunction

KeyedProcessFunction

  • KeyedProcessFunction is used to operate KeyedStream. KeyedProcessFunction will process each element of the stream and output 0, 1 or more elements. All process functions inherit from the RichFunction interface, so they have methods such as open(), close(), and getRuntimeContext().

  • KeyedProcessFunction<K, I, O> also provides two additional methods

    • processElement
      • Called for each element in the stream; results are emitted through the Collector. The Context gives access to the element's timestamp, its key, and the TimerService; it can also emit results to side outputs
    • onTimer
      • A callback invoked when a previously registered timer fires. The timestamp parameter is the trigger time that was registered, the Collector is used to emit results, and OnTimerContext (like the Context of processElement) provides contextual information, such as the time domain of the firing timer (event time or processing time)
    class MyKeyedProcessFunction extends KeyedProcessFunction<Tuple, MyBean, MyBean> {
        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialization, e.g. obtaining state handles
        }
    
        @Override
        public void processElement(MyBean myBean, KeyedProcessFunction<Tuple, MyBean, MyBean>.Context context, Collector<MyBean> collector) throws Exception {
            // Called once per element; here the element is simply forwarded
            collector.collect(myBean);
        }
    
        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Tuple, MyBean, MyBean>.OnTimerContext ctx, Collector<MyBean> out) throws Exception {
            // Called when a timer registered via the TimerService fires
        }
    
        @Override
        public void close() throws Exception {
            // Cleanup
        }
    }
    

TimerService and Timers

  • The TimerService objects held by Context and OnTimerContext have the following methods

    • long currentProcessingTime(): returns the current processing time
    • long currentWatermark(): returns the timestamp of the current watermark
    • void registerProcessingTimeTimer(long timestamp): registers a processing-time timer for the current key; the timer fires when the processing time reaches the given timestamp
    • void registerEventTimeTimer(long timestamp): registers an event-time timer for the current key; the timer fires and the callback runs when the watermark reaches or passes the registered time
    • void deleteProcessingTimeTimer(long timestamp): deletes a previously registered processing-time timer; if no timer with this timestamp exists, nothing happens
    • void deleteEventTimeTimer(long timestamp): deletes a previously registered event-time timer; if no timer with this timestamp exists, nothing happens
    class MyKeyedProcessFunction extends KeyedProcessFunction<Tuple, MyBean, MyBean> {
        @Override
        public void processElement(MyBean myBean, KeyedProcessFunction<Tuple, MyBean, MyBean>.Context context, Collector<MyBean> collector) throws Exception {
            // Query the current processing time and the current watermark
            long currentProcessingTime = context.timerService().currentProcessingTime();
            long currentWatermark = context.timerService().currentWatermark();
            // Register and delete timers (timestamps in milliseconds)
            context.timerService().registerProcessingTimeTimer(10000L);
            context.timerService().registerEventTimeTimer(10000L);
            context.timerService().deleteProcessingTimeTimer(10000L);
            context.timerService().deleteEventTimeTimer(10000L);
        }
    }
    

Side output stream (SideOutput)

  • The side output feature of process functions can produce multiple additional streams, whose data types may differ from the main stream. A side output is identified by an OutputTag<X> object, where X is the element type of the side output stream; a process function emits events to one or more side outputs through its Context object

  • Example: monitor the sensor temperature and send readings above 30 degrees to a side output, while the remaining readings are emitted on the main stream

    public class Test {
        public static void main(String[] args) throws Exception {
            //Create execution phase
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
    
            DataStream<String> inputDataStream = env.socketTextStream("localhost", 9999);
    
            SingleOutputStreamOperator<Tuple2<String, Double>> myBeanDataStream = inputDataStream.map(new MapFunction<String, Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> map(String s) throws Exception {
                    String[] split = s.split(" ");
                    return new Tuple2<String, Double>(split[0], Double.valueOf(split[1]));
                }
            });
    
            //Define side output stream
            OutputTag<Tuple2<String, Double>> outputTag = new OutputTag<Tuple2<String, Double>>("high-output") {};
    
            // Use custom operator: ProcessFunction
            SingleOutputStreamOperator<Tuple2<String, Double>> resultDataStream = myBeanDataStream
                    .process(new MyProcessFunction(30.0, outputTag));
    
            resultDataStream.print("low-wendu");
    
            // Get side output stream and output
            resultDataStream.getSideOutput(outputTag).print("high-wendu");
    
            env.execute();
        }
    
    
    
        private static class MyProcessFunction extends ProcessFunction<Tuple2<String, Double>, Tuple2<String, Double>> {
            private Double wenduLimit;
            private OutputTag<Tuple2<String, Double>> outputTag;
    
            // initialization
            public MyProcessFunction(Double wenduLimit, OutputTag<Tuple2<String, Double>> outputTag) {
                this.wenduLimit = wenduLimit;
                this.outputTag = outputTag;
            }
    
            @Override
            public void processElement(Tuple2<String, Double> myBean, ProcessFunction<Tuple2<String, Double>, Tuple2<String, Double>>.Context context, Collector<Tuple2<String, Double>> collector) throws Exception {
                // If the temperature is higher than the limit temperature, the data will be added to the side output stream, otherwise it will be output normally
                if(myBean.f1 > wenduLimit){
                    context.output(outputTag, myBean);
                }else {
                    collector.collect(myBean);
                }
            }
        }
    }
    

8. CheckPoint

  • Flink periodically saves consistent checkpoints of state during the execution of streaming applications
  • If a failure occurs, Flink will use the most recent checkpoint to consistently restore the state of the application and restart the process
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
  
    // Checkpoint data storage location
    // RocksDB
    env.setStateBackend(new RocksDBStateBackend(new URI("hdfs:///tmp/check-point-test")));
    // file
    //env.setStateBackend(new FsStateBackend("file:///tmp/check-point-test"));
    // Memory
    //env.setStateBackend(new MemoryStateBackend());

    // Checkpoint configuration
    // Set checkpoint cycle time
    env.enableCheckpointing(10000L);
    // Set exactly once mode
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Sets the maximum time that a checkpoint may take before it is discarded
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    // Sets the minimum pause between checkpoint attempts
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    // Set the maximum number of checkpoint attempts that may occur simultaneously
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // Enables checkpoints to persist externally when owned jobs fail or pause
    env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // Restart strategy
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 60000));

    // ... define sources, transformations and sinks here, then call env.execute()
}

9. State consistency

Classification

  • AT-MOST-ONCE (at most once)
    • When a task fails, the simplest approach is to do nothing: neither recover the lost state nor replay the lost data. Each event is processed at most once
  • AT-LEAST-ONCE (at least once)
    • In most real application scenarios we do not want to lose events. All events are processed, but some may be processed multiple times
  • EXACTLY-ONCE (exactly once)
    • Exactly-once semantics means not only that no events are lost, but also that the internal state is updated exactly once for each piece of data

End-to-end exactly-once

  • Internal guarantee: checkpoints
  • source: must be able to reset the reading position of the data
  • sink: when recovering from a failure, data must not be written to the external system twice
    • Idempotent write (a minimal sketch follows this list)
    • Transactional write
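A minimal sketch of an idempotent write, using the SensorsData POJO from the earlier examples and a hypothetical MySQL table keyed by sensor id (the JDBC URL, credentials and table are placeholders); replaying the same record after a failure overwrites the row instead of duplicating it:

public class IdempotentJdbcSink extends RichSinkFunction<SensorsData> {
    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Placeholder connection settings
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password");
        // Upsert keyed by id: writing the same record twice leaves the same result
        statement = connection.prepareStatement(
                "INSERT INTO sensors (id, ts, wendu) VALUES (?, ?, ?) " +
                "ON DUPLICATE KEY UPDATE ts = VALUES(ts), wendu = VALUES(wendu)");
    }

    @Override
    public void invoke(SensorsData value, Context context) throws Exception {
        statement.setString(1, value.getId());
        statement.setLong(2, value.getTimestamp());
        statement.setDouble(3, value.getWendu());
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}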

Transactional write

  • A transaction is built to correspond to a checkpoint; only when that checkpoint actually completes are all of its results written to the sink system
  • Implementation approaches
    • Write-ahead log
    • Two-phase commit

Write-ahead log (not commonly used)

  • First buffer the result data as state, then write it to the sink system in one batch when the notification of checkpoint completion arrives
  • Simple and easy to implement; since the data is buffered in the state backend beforehand, this batch approach works for any sink system
  • The DataStream API provides a template class, GenericWriteAheadSink, for implementing this kind of transactional sink

Two-phase commit

  • For each checkpoint, the sink task starts a transaction and adds all received data to it
  • The data is written to the external sink system but not committed; this is only a "pre-commit"
  • When the notification of checkpoint completion arrives, the transaction is formally committed and the results are actually written

Flink+Kafka end-to-end state consistency assurance

  • internal
    • The checkpoint mechanism persists the state and restores it after a failure, guaranteeing the consistency of the internal state
  • source
    • The Kafka consumer, as the source, saves offsets; if a subsequent task fails, the connector can reset the offsets and re-consume the data, guaranteeing consistency
  • sink
    • The Kafka producer, as the sink, writes with a two-phase commit, implemented as a TwoPhaseCommitSinkFunction (a minimal configuration sketch follows this list)
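A minimal configuration sketch of the exactly-once Kafka sink, assuming the universal flink-connector-kafka from the pom below; the topic, broker address, timeout value and resultDataStream are placeholders, and constructor overloads vary slightly between connector versions:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "hadoop151:9092");
// Must not exceed the broker's transaction.max.timeout.ms
props.setProperty("transaction.timeout.ms", "600000");

// FlinkKafkaProducer extends TwoPhaseCommitSinkFunction; EXACTLY_ONCE ties Kafka
// transactions to Flink checkpoints (two-phase commit)
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
        "result-topic",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

resultDataStream.addSink(kafkaSink);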

Maven (pom.xml)

<dependencies>
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-java</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
    </dependency>
    <!-- fix start-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- fix end -->

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>kafka-clients</artifactId>
                <groupId>org.apache.kafka</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<!--            <scope>provided</scope>-->
        <!--            local test-->
        <!--            <scope>compile</scope>-->
        <exclusions>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-collections</artifactId>
                <groupId>commons-collections</groupId>
            </exclusion>
        </exclusions>
        <version>${flink.version}</version>
    </dependency>

    <!-- Add the following two dependencies so that Flink log output appears -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.13</version>
    </dependency>

    <!--Toolkit dependency-->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>18.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.2</version>
        <exclusions>
            <exclusion>
                <artifactId>commons-logging</artifactId>
                <groupId>commons-logging</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.4</version>
    </dependency>

    <dependency>
        <groupId>com.jayway.jsonpath</groupId>
        <artifactId>json-path</artifactId>
        <version>2.4.0</version>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.9.9</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <!--cglib-->
    <dependency>
        <groupId>asm</groupId>
        <artifactId>asm</artifactId>
        <version>3.3.1</version>
    </dependency>
    <dependency>
        <groupId>asm</groupId>
        <artifactId>asm-commons</artifactId>
        <version>3.3.1</version>
    </dependency>
    <dependency>
        <groupId>asm</groupId>
        <artifactId>asm-util</artifactId>
        <version>3.3.1</version>
    </dependency>
    <dependency>
        <groupId>cglib</groupId>
        <artifactId>cglib-nodep</artifactId>
        <version>2.2.2</version>
    </dependency>

    <!--state backend-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>com.beust</groupId>
        <artifactId>jcommander</artifactId>
        <version>1.72</version>
    </dependency>

    <!-- Alibaba connection pool -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.21</version>
    </dependency>
    <dependency>
        <groupId>commons-dbutils</groupId>
        <artifactId>commons-dbutils</artifactId>
        <version>1.7</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.72</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.binary.version}</artifactId>
        <version>${kafka.version}</version>
        <scope>compile</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.github.oshi/oshi-core -->
    <dependency>
        <groupId>com.github.oshi</groupId>
        <artifactId>oshi-core</artifactId>
        <version>3.5.0</version>
    </dependency>
</dependencies>
