Detailed use of Flink
1. Installation and deployment
install
-
Step 1: upload flink-1.10.1-bin-scala_2.12.tgz to the server and extract it
-
Step 2: modify the conf/flink-conf.yaml file
# Modify the jobmanager.rpc.address parameter to the JobManager machine
jobmanager.rpc.address: hadoop151
-
Step 3: modify the conf/slaves file
# slave machines
hadoop152
hadoop153
-
Step 4: distribute the entire flink directory to other machines
2. Running tasks
Standalone mode
Start / stop
-
command
# start the cluster
bin/start-cluster.sh
# stop the cluster
bin/stop-cluster.sh
-
Accessing the web UI
- http://hadoop151:8081
Running and cancelling jobs
# =================== Start task ===================
bin/flink run -c <fully qualified class name> -p <parallelism> <jar package>
# Example
bin/flink run -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar
# =================== Stop task ===================
bin/flink cancel <JobId>
# Example
bin/flink cancel f69fbd0650ae4202b2a46b3ad2089606
Yarn mode
Session cluster mode
Start the yarn session
-
command
# =================== Start the yarn session ===================
# -n (--container): number of TaskManagers
# -s (--slots): number of slots per TaskManager, default 1 (one slot generally corresponds to one core); sometimes extra TaskManagers are kept for redundancy
# -jm: JobManager memory in MB
# -tm: memory per TaskManager in MB
# -nm: the application name shown in YARN's UI
# -d: run in the background (detached)
bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
# =================== Stop the yarn session ===================
yarn application -kill <Application-Id>
# Example
yarn application -kill application_1633171918776_0003
-
Accessing the web UI
- After the yarn session starts, the web UI address is printed, for example: http://hadoop153:42189
Running and cancelling jobs
# =================== Start task ===================
bin/flink run -c <fully qualified class name> -p <parallelism> <jar package>
# Example
bin/flink run -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar
# =================== Stop task ===================
bin/flink cancel <JobId>
# Example
bin/flink cancel f69fbd0650ae4202b2a46b3ad2089606
Per-job cluster mode
# =================== Start task ===================
bin/flink run -m yarn-cluster -c <fully qualified class name> -p <parallelism> <jar package>
# Example
bin/flink run -m yarn-cluster -c com.itfzk.flink.wordcount.KafkaStreamWordCount -p 3 FlinkStudyDemo-1.0-SNAPSHOT-jar-with-dependencies.jar
# =================== Stop task ===================
bin/flink cancel <JobId>
# Example
bin/flink cancel f69fbd0650ae4202b2a46b3ad2089606
3. Execution environment
Environment
getExecutionEnvironment (common)
-
Creates an execution environment that represents the context in which the current program executes. getExecutionEnvironment decides, based on how the program is being run, whether to return a local or a cluster environment. It is the most common way to create an execution environment
// Batch (DataSet) execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Streaming execution environment (common)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
createLocalEnvironment
-
Returns a local execution environment. The default parallelism can be specified when calling (if omitted, it defaults to the number of CPU cores)
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
createRemoteEnvironment
-
Returns a cluster execution environment and submits the Jar to a remote server. When calling, you must specify the host and port of the JobManager and the Jar package(s) to run on the cluster
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//WordCount.jar");
Source, Sink
Transform (operator)
map
DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) throws Exception {
        // e.g. map each line to its length
        return value.length();
    }
});
flatMap
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // e.g. split each line into words and emit them one by one
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
Filter
DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        // e.g. keep only non-empty lines
        return !value.isEmpty();
    }
});
KeyBy
- DataStream → KeyedStream
- A stream is split into disjoint partitions; each partition contains the elements with the same key. Internally this is implemented with hash partitioning (see the sketch after the rolling aggregation operators below)
Rolling aggregation operator
- sum()
- max()
- min()
- maxBy()
- minBy()
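A minimal sketch of keyBy followed by a rolling aggregation (the word-count style Tuple2 data here is made up for illustration and is not part of the original examples): keyBy(0) hash-partitions the records by the first tuple field, and sum(1) maintains a rolling sum per key.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Hypothetical input: (key, count) pairs
    DataStream<Tuple2<String, Integer>> input = env.fromElements(
            Tuple2.of("sensor_1", 1), Tuple2.of("sensor_2", 1), Tuple2.of("sensor_1", 1));

    // keyBy: hash-partition by the first tuple field; sum: rolling aggregation per key
    DataStream<Tuple2<String, Integer>> counts = input
            .keyBy(0)
            .sum(1);

    counts.print();
    env.execute();
}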
Reduce
- KeyedStream → DataStream
- An aggregation over a keyed data stream that combines the current element with the result of the previous aggregation to produce a new value. The returned stream contains every intermediate aggregation result, not only the final one
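A minimal reduce sketch, reusing the hypothetical Tuple2 input stream from the keyBy sketch above: the reducer combines the previous aggregate with the current element, and every intermediate result is emitted downstream.

DataStream<Tuple2<String, Integer>> reduced = input
        .keyBy(0)
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> last,
                                                  Tuple2<String, Integer> current) throws Exception {
                // combine the result of the previous aggregation with the current element
                return Tuple2.of(current.f0, last.f1 + current.f1);
            }
        });
reduced.print();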
Split and Select
Split
- DataStream → SplitStream
- Splits a DataStream into two or more DataStreams according to some characteristic
Select
- SplitStream→DataStream
- Gets one or more DataStreams from a SplitStream
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    String filePath = "E:\\~fzk\\java\\IDEA\\bigdata\\FlinkStudyDemo\\test\\test1";
    DataStream<String> inputDataStream = env.readTextFile(filePath);

    DataStream<SensorsData> map = inputDataStream.map(new MapFunction<String, SensorsData>() {
        public SensorsData map(String value) throws Exception {
            String[] splits = value.split(" ");
            return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
        }
    });

    KeyedStream<SensorsData, Tuple> keyedStream = map.keyBy("id");

    // Split: split the stream
    SplitStream<SensorsData> splitStream = keyedStream.split(new OutputSelector<SensorsData>() {
        public Iterable<String> select(SensorsData value) {
            return value.getWendu() > 37 ? Collections.singletonList("h") : Collections.singletonList("d");
        }
    });

    // Select: select one or more DataStreams
    DataStream<SensorsData> resultDataStream = splitStream.select("d");

    env.execute();
}
Connect and CoMap
Connect
- DataStream, DataStream → ConnectedStreams: connects two data streams while keeping their types. After the connection the two streams are merely placed inside one ConnectedStreams; their internal data and types remain unchanged, and the two streams stay independent of each other
CoMap
- ConnectedStreams → DataStream: acts on a ConnectedStreams and works like map / flatMap, except that a separate map / flatMap is applied to each of the two streams inside the ConnectedStreams
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    String filePath = "E:\\~fzk\\java\\IDEA\\bigdata\\FlinkStudyDemo\\test\\test1";
    DataStream<String> inputDataStream = env.readTextFile(filePath);

    DataStream<SensorsData> map = inputDataStream.map(new MapFunction<String, SensorsData>() {
        public SensorsData map(String value) throws Exception {
            String[] splits = value.split(" ");
            return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
        }
    });

    KeyedStream<SensorsData, Tuple> keyedStream = map.keyBy("id");

    SplitStream<SensorsData> splitStream = keyedStream.split(new OutputSelector<SensorsData>() {
        public Iterable<String> select(SensorsData value) {
            return value.getWendu() > 37 ? Collections.singletonList("high") : Collections.singletonList("low");
        }
    });
    DataStream<SensorsData> highDataStream = splitStream.select("high");
    DataStream<SensorsData> lowDataStream = splitStream.select("low");

    // Connect & CoMapFunction: merge the two streams
    ConnectedStreams<SensorsData, SensorsData> connectedStreams = highDataStream.connect(lowDataStream);

    /*
      new CoMapFunction<SensorsData, SensorsData, Object>
      First parameter:  data type of the first input stream
      Second parameter: data type of the second input stream
      Third parameter:  data type of the merged output
     */
    DataStream<Object> resultDataStream = connectedStreams.map(new CoMapFunction<SensorsData, SensorsData, Object>() {
        public Object map1(SensorsData value) throws Exception {
            return value;
        }

        public Object map2(SensorsData value) throws Exception {
            return value;
        }
    });

    env.execute();
}
Union
- DataStream → DataStream: unions two or more DataStreams into a new DataStream that contains all elements of the input streams
- The streams being unioned must have the same element type
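A minimal union sketch, reusing the highDataStream and lowDataStream produced by the split in the Connect example above; unlike connect, union requires both streams to have the same type and simply merges them into one DataStream.

// union merges any number of streams of the same type into a single stream
DataStream<SensorsData> allDataStream = highDataStream.union(lowDataStream);
allDataStream.print();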
4. Window
-
TimeWindow: generates a Window by time
-
CountWindow: generates a Window according to the specified number of data pieces, regardless of time
TimeWindow
// The window size is 10s, sliding for 5s each time
DataStream<Tuple2<String, Integer>> resultDataStream = stringDataSource
        .flatMap(new MyFlatMapFunction())
        .keyBy(0)
        .timeWindow(Time.seconds(10), Time.seconds(5))
        .sum(1);
CountWindow
// The window size is 10 elements, sliding 5 elements at a time
DataStream<Tuple2<String, Integer>> resultDataStream = stringDataSource
        .flatMap(new MyFlatMapFunction())
        .keyBy(0)
        .countWindow(10, 5)
        .sum(1);
5. Temporal semantics and Watermark
Temporal semantics
- Event Time: the time when the event was created
- Ingestion Time: the time when the data enters Flink
- Processing Time: the local system time of the operator performing a time-based operation
Introduction of temporal semantics
// EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// IngestionTime
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Watermark (used with Window)
- A watermark is a mechanism for measuring the progress of event time
- Watermarks are used to handle out-of-order events; correct handling of out-of-order events is usually achieved by combining the watermark mechanism with windows
- A watermark can be understood as a delayed trigger mechanism: we set a watermark delay t, the system tracks the maximum event time maxEventTime of the data that has arrived, and assumes that all data with an eventTime earlier than maxEventTime - t has already arrived. When maxEventTime - t reaches (is greater than or equal to) the end time of a window, that window is triggered
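A small worked illustration of the trigger rule above (the numbers are made up): assume a watermark delay t = 1 s and an event-time window covering [0 s, 10 s).

event with timestamp 9 s arrives    ->  maxEventTime = 9 s,    watermark = 9 s - 1 s = 8 s   (window stays open)
event with timestamp 10.5 s arrives ->  maxEventTime = 10.5 s, watermark = 9.5 s             (still open; late data for [0 s, 10 s) is still accepted)
event with timestamp 11 s arrives   ->  maxEventTime = 11 s,   watermark = 10 s >= 10 s      (window [0 s, 10 s) is triggered)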
Out-of-order events (example)
-
Interface: AssignerWithPeriodicWatermarks
-
Set time semantics before use
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Set the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Generate watermarks periodically. The default period is 200 milliseconds
    env.getConfig().setAutoWatermarkInterval(5000);

    String filePath = "E:\\~fzk\\java\\IDEA\\bigdata\\FlinkStudyDemo\\test\\test1";
    DataStream<String> inputDataStream = env.readTextFile(filePath);

    DataStream<SensorsData> map = inputDataStream.map(new MapFunction<String, SensorsData>() {
        public SensorsData map(String value) throws Exception {
            String[] splits = value.split(" ");
            return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
        }
    });

    // Watermarks for out-of-order events
    // Time.milliseconds(1000): the allowed delay (out-of-orderness), 1000 ms
    DataStream<SensorsData> eventTimeDataStream = map.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<SensorsData>(Time.milliseconds(1000)) {
                @Override
                public long extractTimestamp(SensorsData element) {
                    return element.getTimestamp();
                }
            });

    env.execute();
}

// POJO used above
public class SensorsData {
    private String id;
    private Long timestamp;
    private double wendu;
}
In-order (ascending) events (example)
-
Class: AscendingTimestampExtractor (used below; it is an AssignerWithPeriodicWatermarks). For emitting a watermark on every event there is the separate AssignerWithPunctuatedWatermarks interface
-
Set time semantics before use
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Set the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    String filePath = "E:\\~fzk\\java\\IDEA\\bigdata\\FlinkStudyDemo\\test\\test1";
    DataStream<String> inputDataStream = env.readTextFile(filePath);

    DataStream<SensorsData> map = inputDataStream.map(new MapFunction<String, SensorsData>() {
        public SensorsData map(String value) throws Exception {
            String[] splits = value.split(" ");
            return new SensorsData(splits[0], new Long(splits[1]), new Double(splits[2]));
        }
    });

    // Watermarks for strictly ascending timestamps
    DataStream<SensorsData> eventTimeDataStream = map.assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<SensorsData>() {
                @Override
                public long extractAscendingTimestamp(SensorsData element) {
                    return element.getTimestamp();
                }
            });

    env.execute();
}

// POJO used above
public class SensorsData {
    private String id;
    private Long timestamp;
    private double wendu;
}
6. Status management
keyed state
- Keyed state is maintained and accessed based on the key defined on the input data stream. Flink keeps one state instance per key value and partitions all records with the same key to the same operator task, which maintains and processes the state for that key. When a task processes a record, it automatically scopes state access to the key of the current record, so all records with the same key access the same state. Keyed State is very similar to a distributed key-value map and can only be used on a KeyedStream (i.e. after keyBy)
- Stores a state value per key
Keyed State supports data types
- ValueState<T> saves a single value of type T
- get operation: ValueState.value()
- set operation: ValueState.update(T value)
- ListState<T> saves a list whose elements are of type T
- ListState.add(T value)
- ListState.addAll(List<T> values)
- ListState.get() returns Iterable<T>
- ListState.update(List<T> values)
- MapState<K, V> saves key-value pairs
- MapState.get(UK key)
- MapState.put(UK key, UV value)
- MapState.contains(UK key)
- MapState.remove(UK key)
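A minimal sketch of declaring and using ListState and MapState in a rich function (the descriptor names and the output format are made up for illustration); like the ValueState example below, the state handles are created in open() from the runtime context, and the function must be applied after keyBy.

public class MyListAndMapStateFunction extends RichFlatMapFunction<SensorsData, String> {

    private ListState<Double> wenduListState;
    private MapState<String, Long> countMapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // State handles are obtained from descriptors (name + type)
        wenduListState = getRuntimeContext().getListState(
                new ListStateDescriptor<Double>("wendu-list", Double.class));
        countMapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<String, Long>("count-map", String.class, Long.class));
    }

    @Override
    public void flatMap(SensorsData value, Collector<String> out) throws Exception {
        // ListState: append the current temperature for this key
        wenduListState.add(value.getWendu());

        // MapState: count how many records this key has seen
        Long old = countMapState.get(value.getId());
        countMapState.put(value.getId(), old == null ? 1L : old + 1);

        out.collect(value.getId() + " -> " + countMapState.get(value.getId()));
    }
}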
Example: ValueState
-
We can use Keyed State to implement the following requirement: monitor the sensor temperature values and emit an alarm whenever the difference between two consecutive temperature readings exceeds 10 degrees
public class Test {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> inputDataStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<MyBean> myBeanDataStream = inputDataStream.map(new MapFunction<String, MyBean>() {
            @Override
            public MyBean map(String s) throws Exception {
                String[] split = s.split(" ");
                return new MyBean(split[0], Double.valueOf(split[1]));
            }
        });

        SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultDataStream = myBeanDataStream
                .keyBy((KeySelector<MyBean, String>) data -> data.getId())
                .flatMap(new MyRichFlatMapFunction(10.0));

        resultDataStream.print();
        env.execute();
    }
}

// Rich function: stores the state value
public class MyRichFlatMapFunction extends RichFlatMapFunction<MyBean, Tuple3<String, Double, Double>> {

    private ValueState<Double> myValueState;
    private Double abs;

    public MyRichFlatMapFunction(Double abs) {
        this.abs = abs;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Create the state handle
        myValueState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("my-flatmap", Double.class));
    }

    @Override
    public void flatMap(MyBean myBean, Collector<Tuple3<String, Double, Double>> collector) throws Exception {
        // Read the state value (last temperature seen for this key)
        Double lastWendu = myValueState.value();
        if (lastWendu != null) {
            double absWendu = Math.abs(myBean.getWendu() - lastWendu);
            if (absWendu > abs) {
                collector.collect(new Tuple3<>(myBean.getId(), lastWendu, myBean.getWendu()));
            }
        }
        // Update the state value
        myValueState.update(myBean.getWendu());
    }

    @Override
    public void close() throws Exception {
        // Clear the state value
        myValueState.clear();
    }
}

// Entity class
public class MyBean {
    private String id;
    private Double wendu;
}
7. ProcessFunction API
- The DataStream API provides a series of low-level transformation operators with which you can access timestamps and watermarks and register timers (scheduled events). They can also emit specific events, such as timeout events
- Flink provides eight process functions
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
KeyedProcessFunction
-
KeyedProcessFunction is used to operate on a KeyedStream. It processes each element of the stream and outputs zero, one or more elements. All process functions inherit from the RichFunction interface, so they have methods such as open(), close() and getRuntimeContext().
-
KeyedProcessFunction<K, I, O> also provides two additional methods
- processElement
- This method is called for every element in the stream; results are emitted through the Collector. The Context gives access to the element's timestamp, the element's key and the TimerService, and can also emit results to other streams (side outputs)
- onTimer
- Callback function, called when a previously registered timer fires. The timestamp parameter is the trigger time that was set for the timer, the Collector collects output results, and the OnTimerContext (like the Context of processElement) provides context information, e.g. the time domain in which the timer fired (event time or processing time)
class MyKeyedProcessFunction extends KeyedProcessFunction<Tuple, MyBean, MyBean> {

    @Override
    public void open(Configuration parameters) throws Exception {
    }

    @Override
    public void processElement(MyBean myBean, KeyedProcessFunction<Tuple, MyBean, MyBean>.Context context,
                               Collector<MyBean> collector) throws Exception {
        collector.collect(myBean);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<Tuple, MyBean, MyBean>.OnTimerContext ctx,
                        Collector<MyBean> out) throws Exception {
    }

    @Override
    public void close() throws Exception {
    }
}
TimerService and Timers
-
The TimerService objects held by Context and OnTimerContext have the following methods
- long currentProcessingTime(): returns the current processing time
- long currentWatermark(): returns the timestamp of the current watermark
- void registerProcessingTimeTimer(long timestamp): registers a processing-time timer for the current key; the timer fires when the processing time reaches the given timestamp
- void registerEventTimeTimer(long timestamp): registers an event-time timer for the current key; the timer fires and the callback is executed when the watermark becomes greater than or equal to the registered timestamp
- void deleteProcessingTimeTimer(long timestamp): deletes a previously registered processing-time timer; if no timer with this timestamp exists, nothing happens
- void deleteEventTimeTimer(long timestamp): deletes a previously registered event-time timer; if no timer with this timestamp exists, nothing happens
class MyKeyedProcessFunction extends KeyedProcessFunction<Tuple, MyBean, MyBean> {

    @Override
    public void processElement(MyBean myBean, KeyedProcessFunction<Tuple, MyBean, MyBean>.Context context,
                               Collector<MyBean> collector) throws Exception {
        long currentProcessingTime = context.timerService().currentProcessingTime();
        long currentWatermark = context.timerService().currentWatermark();

        context.timerService().registerProcessingTimeTimer(10000L);
        context.timerService().registerEventTimeTimer(10000L);

        context.timerService().deleteProcessingTimeTimer(10000L);
        context.timerService().deleteEventTimeTimer(10000L);
    }
}
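A minimal sketch of a timer actually being used (the 10-second interval and the output message are arbitrary choices for illustration): every element registers a processing-time timer 10 seconds in the future, and onTimer runs once that time is reached.

class MyTimerProcessFunction extends KeyedProcessFunction<Tuple, MyBean, String> {

    @Override
    public void processElement(MyBean myBean, KeyedProcessFunction<Tuple, MyBean, String>.Context context,
                               Collector<String> collector) throws Exception {
        // register a processing-time timer 10 seconds after this element was processed
        long timerTs = context.timerService().currentProcessingTime() + 10000L;
        context.timerService().registerProcessingTimeTimer(timerTs);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<Tuple, MyBean, String>.OnTimerContext ctx,
                        Collector<String> out) throws Exception {
        // called once the registered processing time has been reached
        out.collect("timer fired at " + timestamp);
    }
}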
Side output stream (SideOutput)
-
The side-output feature of process functions can produce several additional streams, and those streams may have different data types. A side output is identified by an OutputTag<X> object, where X is the type of the side-output stream. A process function emits events to one or more side outputs through its Context object
-
Example: monitor the sensor temperature values; data with a temperature above 30 degrees is sent to a side output, while the remaining data stays in the main stream
public class Test {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> inputDataStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple2<String, Double>> myBeanDataStream = inputDataStream.map(new MapFunction<String, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(String s) throws Exception {
                String[] split = s.split(" ");
                return new Tuple2<String, Double>(split[0], Double.valueOf(split[1]));
            }
        });

        // Define the side output tag
        OutputTag<Tuple2<String, Double>> outputTag = new OutputTag<Tuple2<String, Double>>("high-output") {};

        // Use a custom operator: ProcessFunction
        SingleOutputStreamOperator<Tuple2<String, Double>> resultDataStream = myBeanDataStream
                .process(new MyProcessFunction(30.0, outputTag));

        resultDataStream.print("low-wendu");
        // Get the side output stream and print it
        resultDataStream.getSideOutput(outputTag).print("high-wendu");

        env.execute();
    }

    private static class MyProcessFunction extends ProcessFunction<Tuple2<String, Double>, Tuple2<String, Double>> {

        private Double wenduLimit;
        private OutputTag<Tuple2<String, Double>> outputTag;

        // initialization
        public MyProcessFunction(Double wenduLimit, OutputTag<Tuple2<String, Double>> outputTag) {
            this.wenduLimit = wenduLimit;
            this.outputTag = outputTag;
        }

        @Override
        public void processElement(Tuple2<String, Double> myBean,
                                   ProcessFunction<Tuple2<String, Double>, Tuple2<String, Double>>.Context context,
                                   Collector<Tuple2<String, Double>> collector) throws Exception {
            // If the temperature is above the limit, send the record to the side output; otherwise emit it normally
            if (myBean.f1 > wenduLimit) {
                context.output(outputTag, myBean);
            } else {
                collector.collect(myBean);
            }
        }
    }
}
8. CheckPoint
- Flink periodically saves consistent checkpoints of state during the execution of streaming applications
- If a failure occurs, Flink will use the most recent checkpoint to consistently restore the state of the application and restart the process
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Checkpoint data storage location (state backend)
    // RocksDB
    env.setStateBackend(new RocksDBStateBackend(new URI("hdfs:///tmp/check-point-test")));
    // File system
    //env.setStateBackend(new FsStateBackend("file:///tmp/check-point-test"));
    // Memory
    //env.setStateBackend(new MemoryStateBackend());

    // Checkpoint configuration
    // Set the checkpoint interval
    env.enableCheckpointing(10000L);
    // Set exactly-once mode
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Set the maximum time a checkpoint may take before it is discarded
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    // Set the minimum pause between checkpoint attempts
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    // Set the maximum number of checkpoint attempts that may be in progress at the same time
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // Keep externalized checkpoints when the job fails or is cancelled
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // Restart strategy
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 60000));
}
9. Status consistency
Classification
- AT-MOST-ONCE (at most once)
- When a task fails, the simplest approach is to do nothing: neither recover the lost state nor replay the lost data. At-most-once means each event is processed at most once
- AT-LEAST-ONCE
- In most real application scenarios we do not want to lose events. At-least-once means every event is processed, but some events may be processed multiple times
- Exactly once
- Exactly-once processing semantics means not only that no events are lost, but also that the internal state is updated exactly once for each piece of data
End to end exactly once
- Internal assurance: checkpoint
- source: resets the reading position of the data
- sink: when recovering from a failure, the data will not be repeatedly written to the external system
- Idempotent write (e.g. upserts keyed by a primary key, so that replayed writes simply overwrite the same record)
- Transaction write
Transaction write
- The constructed transaction corresponds to a checkpoint; when the checkpoint actually completes, all corresponding results are written to the sink system at once
- Implementation mode
- Write-ahead log (WAL)
- Two-phase commit (2PC)
Write-ahead log (not commonly used)
- First save the result data as state, then write it to the sink system in one batch when the notification that the checkpoint has completed is received
- It is simple and easy to implement. Since the data is cached in the state backend in advance, this approach works for any sink system by writing the batch out in one go
- The DataStream API provides a template class, GenericWriteAheadSink, to implement this kind of transactional sink
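The sketch below is not the real GenericWriteAheadSink; it is only an illustration of the write-ahead idea with a hand-written sink (all class and variable names are made up): records are buffered in operator state, and the buffered batch is written to the external system only when notifyCheckpointComplete reports that the checkpoint has finished.

public class WriteAheadSketchSink implements SinkFunction<String>, CheckpointedFunction, CheckpointListener {

    // in-memory buffer of records received since the last flush
    private final List<String> buffer = new ArrayList<>();
    // operator state that persists the buffer with each checkpoint
    private transient ListState<String> checkpointedBuffer;

    @Override
    public void invoke(String value, Context context) throws Exception {
        // write-ahead: only buffer the record, do not write it to the external system yet
        buffer.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // persist the pending records together with the checkpoint
        checkpointedBuffer.clear();
        checkpointedBuffer.addAll(buffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedBuffer = ctx.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("wal-buffer", String.class));
        // after a failure, restore the records that were not yet written out
        for (String v : checkpointedBuffer.get()) {
            buffer.add(v);
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // checkpoint confirmed: now write the buffered batch to the external system in one go
        // (the actual write call depends on the target system and is omitted here)
        buffer.clear();
    }
}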
Two-phase commit
- For each checkpoint, the sink task will start a transaction and add all the received data to the transaction
- The data is then written to the external sink system, but not committed; this is only a "pre-commit"
- When the notification that the checkpoint has completed is received, the transaction is formally committed and the results are actually written
Flink+Kafka end-to-end state consistency assurance
- Internal
- The checkpoint mechanism saves state to persistent storage, and the state can be recovered after a failure, guaranteeing the consistency of internal state
- source
- As the source, the Kafka consumer saves its offsets; if a downstream task fails, the connector can reset the offsets and re-consume the data to guarantee consistency
- sink
- As the sink, the Kafka producer commits in two phases; this requires implementing a TwoPhaseCommitSinkFunction (see the sketch below)
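A minimal, simplified sketch of the TwoPhaseCommitSinkFunction contract (the Txn class and all method bodies are placeholders made up for illustration; a real sink such as Flink's Kafka producer implements these methods against Kafka transactions):

public class TwoPhaseSketchSink extends TwoPhaseCommitSinkFunction<String, TwoPhaseSketchSink.Txn, Void> {

    // the "transaction" object: here simply a buffer of records written since the last checkpoint
    public static class Txn {
        public List<String> pendingRecords = new ArrayList<>();
    }

    public TwoPhaseSketchSink() {
        super(TypeInformation.of(Txn.class).createSerializer(new ExecutionConfig()),
                VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() throws Exception {
        // a fresh transaction is opened for every checkpoint interval
        return new Txn();
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) throws Exception {
        // write into the open transaction, not into the final target yet
        txn.pendingRecords.add(value);
    }

    @Override
    protected void preCommit(Txn txn) throws Exception {
        // flush the data to the external system without committing it ("pre-commit")
    }

    @Override
    protected void commit(Txn txn) {
        // checkpoint completed: make the pre-committed data visible in the external system
    }

    @Override
    protected void abort(Txn txn) {
        // failure before the checkpoint completed: discard the pre-committed data
        txn.pendingRecords.clear();
    }
}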
Maven (pom.xml)
<dependencies>
    <!-- <dependency>-->
    <!-- <groupId>org.apache.flink</groupId>-->
    <!-- <artifactId>flink-java</artifactId>-->
    <!-- <version>${flink.version}</version>-->
    <!-- </dependency>-->

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>

    <!-- fix start-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- fix end -->

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>kafka-clients</artifactId>
                <groupId>org.apache.kafka</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <!-- <scope>provided</scope>-->
        <!-- local test-->
        <!-- <scope>compile</scope>-->
        <exclusions>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-collections</artifactId>
                <groupId>commons-collections</groupId>
            </exclusion>
        </exclusions>
        <version>${flink.version}</version>
    </dependency>

    <!-- Add the following two dependencies so that Flink log output appears -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.13</version>
    </dependency>

    <!-- Toolkit dependencies -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>18.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.2</version>
        <exclusions>
            <exclusion>
                <artifactId>commons-logging</artifactId>
                <groupId>commons-logging</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.4</version>
    </dependency>
    <dependency>
        <groupId>com.jayway.jsonpath</groupId>
        <artifactId>json-path</artifactId>
        <version>2.4.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.9.9</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>

    <!-- cglib -->
    <dependency>
        <groupId>asm</groupId>
        <artifactId>asm</artifactId>
        <version>3.3.1</version>
    </dependency>
    <dependency>
        <groupId>asm</groupId>
        <artifactId>asm-commons</artifactId>
        <version>3.3.1</version>
    </dependency>
    <dependency>
        <groupId>asm</groupId>
        <artifactId>asm-util</artifactId>
        <version>3.3.1</version>
    </dependency>
    <dependency>
        <groupId>cglib</groupId>
        <artifactId>cglib-nodep</artifactId>
        <version>2.2.2</version>
    </dependency>

    <!-- state backend -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>com.beust</groupId>
        <artifactId>jcommander</artifactId>
        <version>1.72</version>
    </dependency>

    <!-- Alibaba connection pool -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.21</version>
    </dependency>
    <dependency>
        <groupId>commons-dbutils</groupId>
        <artifactId>commons-dbutils</artifactId>
        <version>1.7</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.72</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.binary.version}</artifactId>
        <version>${kafka.version}</version>
        <scope>compile</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.github.oshi/oshi-core -->
    <dependency>
        <groupId>com.github.oshi</groupId>
        <artifactId>oshi-core</artifactId>
        <version>3.5.0</version>
    </dependency>
</dependencies>