Basic steps of Flink programming:
1. Create the stream execution environment: StreamExecutionEnvironment.getExecutionEnvironment() returns the streaming environment.
2. Load the data: Source.
3. Transform the data: Transformation.
4. Output the result: Sink, either landing it in an external store or printing it directly (a minimal skeleton follows this list).
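A minimal sketch that chains the four steps together (the host and port are placeholders for any text source):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Skeleton {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Source: read lines from a socket
        DataStream<String> source = env.socketTextStream("node1", 9999);
        // 3. Transformation: upper-case every line
        DataStream<String> upper = source.map(s -> s.toUpperCase());
        // 4. Sink: print to stdout
        upper.print();
        // Trigger the execution
        env.execute("Skeleton");
    }
}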
Basic operations on Flink data -- four categories
- Operations on a single record: map, filter
- Operations over multiple records: window
- Merging multiple streams into one: connect, union, join
- Splitting one stream into multiple streams: split is deprecated; use side outputs via OutputTag (see the sketch after this list)
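A short sketch of three of the four categories, assuming nothing beyond the DataStream API (window operations need keyed streams and are covered separately):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class FourCategories {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> a = env.fromElements(1, 2, 3, 4);
        DataStream<Integer> b = env.fromElements(5, 6);

        // Single-record operations: map and filter
        DataStream<Integer> doubled = a.map(n -> n * 2).filter(n -> n > 2);

        // Merging two streams of the same type: union
        DataStream<Integer> merged = doubled.union(b);

        // Splitting one stream into two: main output + side output (OutputTag)
        OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};
        SingleOutputStreamOperator<Integer> evens = merged.process(
                new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                        if (value % 2 == 0) {
                            out.collect(value);        // even numbers stay in the main stream
                        } else {
                            ctx.output(oddTag, value); // odd numbers go to the side output
                        }
                    }
                });
        evens.print("even");
        evens.getSideOutput(oddTag).print("odd");
        env.execute("FourCategories");
    }
}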
Flink input data sources
Built-in predefined Sources
- Collection-based Source
  - Application scenario: once the program is written, used during development to test whether the current logic works.
  - Classification (a sketch follows this list):
    - fromElements: build a stream from individual elements
    - fromCollection: build a stream from a Java collection
    - generateSequence: build a stream from a numeric sequence (deprecated)
    - fromSequence: build a DataStream from a start and an end value
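A minimal sketch of the collection-based sources (assuming Flink 1.11+, where fromSequence replaces the deprecated generateSequence):

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class CollectionSources {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // fromElements: a stream from individual elements
        DataStreamSource<String> s1 = env.fromElements("flink", "spark", "hadoop");
        // fromCollection: a stream from a Java collection
        DataStreamSource<Integer> s2 = env.fromCollection(Arrays.asList(1, 2, 3));
        // fromSequence: a bounded stream of longs from start to end (inclusive)
        DataStreamSource<Long> s3 = env.fromSequence(1, 10);

        s1.print("elements");
        s2.print("collection");
        s3.print("sequence");
        env.execute("CollectionSources");
    }
}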
Parallelism settings
- Ways to set the parallelism:
  1. In the configuration file flink-conf.yaml: parallelism.default: 1
  2. When submitting the job on the client: flink run -p 1
  3. Globally in the program: env.setParallelism(1)
  4. At the operator level: operator.setParallelism(1)
  Priority: operator-level setting > global setting in the program > parallelism of the client-submitted job > parallelism in the configuration file
- Global parallelism setting:
  env.setParallelism(1);
- Operator-level parallelism setting (a combined sketch follows):
  source.print().setParallelism(2);
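A small sketch showing the operator-level setting taking precedence over the global one:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // global parallelism for the whole job

        env.fromSequence(1, 100)
                .map(n -> n * 2)
                .setParallelism(4)  // operator level: this map runs with 4 parallel tasks
                .print()
                .setParallelism(2); // operator level: the print sink runs with 2 parallel tasks

        env.execute("ParallelismDemo");
    }
}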
File-based Source
- readTextFile: reads a file in batch mode, exactly once
- readFile: reads a file as a stream and monitors it at a specified interval; there are two monitoring modes (watchType), sketched after this list:
  - FileProcessingMode.PROCESS_CONTINUOUSLY: for scenarios with frequent modifications and deletions; re-reads the whole file at every interval (this breaks the exactly-once semantics)
  - FileProcessingMode.PROCESS_ONCE: reads the file once and exits after reading
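A minimal sketch of both read modes (the path is a placeholder):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class FileSources {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String path = "/tmp/input.txt"; // placeholder path

        // readTextFile: batch mode, reads the file once
        DataStreamSource<String> once = env.readTextFile(path);

        // readFile: stream mode, re-scans the file every 3000 ms
        DataStreamSource<String> continuous = env.readFile(
                new TextInputFormat(new Path(path)),
                path,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                3000);

        once.print("once");
        continuous.print("continuous");
        env.execute("FileSources");
    }
}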
Socket-based Source
env.socketTextStream("node1", 9999);
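During testing, something must be listening on that host and port before the job starts, e.g. nc -lk 9999 run on node1; every line typed into netcat then arrives in the stream as one String record.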
Custom Source
- Mainly used to define a data source with a specific format and to generate a bounded or an unbounded data stream
  - Bounded data stream: e.g. reading MySQL or reading files
  - Unbounded data stream: e.g. a loop that keeps generating data
- Requirement: generate a data source in a user-defined way
- Case: implementing the SourceFunction interface; only the run method and the cancel method need to be overridden
package cn.itcast.flink.api;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/11/29 14:56
 * Desc Randomly generate one order (order id, user id, amount, timestamp, datetime)
 * every second, print each record, and execute the stream environment.
 */
public class OrderSource {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Set the parallelism
        env.setParallelism(1);
        // 3. Add the custom data source
        DataStreamSource<Order> source = env.addSource(new OrderEmitSource());
        // 4. Print the output
        source.printToErr();
        // 5. Execute the stream environment
        env.execute();
    }

    public static class OrderEmitSource implements SourceFunction<Order> {
        // Flag that marks whether the source should keep generating data
        private volatile boolean isRunning = true;

        /**
         * Generates the data and emits every record through ctx.
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random rm = new Random();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // Loop until cancel() flips the flag
            while (isRunning) {
                String oid = UUID.randomUUID().toString();   // random order id
                int uid = rm.nextInt(6);                     // user id between 0 and 5
                double money = rm.nextDouble() * 100;        // amount between 0 and 100
                long timestamp = System.currentTimeMillis(); // timestamp
                String datetime = sdf.format(timestamp);     // formatted current time
                Order order = new Order(oid, uid, money, timestamp, datetime);
                // Collect the record
                ctx.collect(order);
                // Sleep for one second before the next record
                TimeUnit.SECONDS.sleep(1);
            }
        }

        /**
         * Called when the job is cancelled: stop the generation.
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String oid;
        private int uid;
        private double money;
        private long timestamp;
        private String datetime;
    }
}
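Note: the Order POJO relies on Lombok (@Data, @AllArgsConstructor, @NoArgsConstructor) to generate its getters, setters and constructors, so the lombok dependency must be on the classpath for this example to compile.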
Case: implementing the ParallelSourceFunction interface
Generates data in parallel; the degree of parallelism is set on the operator with setParallelism(n)
package cn.itcast.flink.api;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/11/29 14:56
 * Desc Randomly generate orders (order id, user id, amount, timestamp, datetime)
 * in parallel, print each record, and execute the stream environment.
 */
public class OrderParallelismSource {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Set the global parallelism
        env.setParallelism(1);
        // 3. Add the custom data source with an operator-level parallelism of 6
        DataStreamSource<Order> source = env.addSource(new OrderEmitSource()).setParallelism(6);
        // 4. Print the output
        source.printToErr();
        // 5. Execute the stream environment
        env.execute();
    }

    public static class OrderEmitSource implements ParallelSourceFunction<Order> {
        // Flag that marks whether the source should keep generating data
        private volatile boolean isRunning = true;

        /**
         * Generates the data and emits every record through ctx.
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random rm = new Random();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // Loop until cancel() flips the flag
            while (isRunning) {
                String oid = UUID.randomUUID().toString();   // random order id
                int uid = rm.nextInt(6);                     // user id between 0 and 5
                double money = rm.nextDouble() * 100;        // amount between 0 and 100
                long timestamp = System.currentTimeMillis(); // timestamp
                String datetime = sdf.format(timestamp);     // formatted current time
                Order order = new Order(oid, uid, money, timestamp, datetime);
                // Collect the record
                ctx.collect(order);
                // Each parallel instance sleeps five seconds between records
                TimeUnit.SECONDS.sleep(5);
            }
        }

        /**
         * Called when the job is cancelled: stop the generation.
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String oid;
        private int uid;
        private double money;
        private long timestamp;
        private String datetime;
    }
}
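Because the source implements ParallelSourceFunction, setParallelism(6) starts six independent instances of it, so this example emits six orders every five seconds (one per instance). A plain SourceFunction source, by contrast, can only run with a parallelism of 1.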
Case: implementing RichParallelSourceFunction
- "Rich" means a rich function: it extends AbstractRichFunction and adds (a lifecycle skeleton follows this list):
  - The open and close lifecycle methods
    - open: performs the initialization before data generation starts
    - close: performs the cleanup once data generation ends
  - The getRuntimeContext method: obtains the runtime context of the current task (parameters, environment variables, state, accumulators, etc.)
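A minimal lifecycle skeleton with placeholder record contents; each parallel instance runs open() -> run() -> close():

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class MyRichSource extends RichParallelSourceFunction<String> {
    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialization: open connections, load configuration, ...
        // The runtime context is already available here:
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        System.out.println("opening subtask " + subtask);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            ctx.collect("record from subtask " + getRuntimeContext().getIndexOfThisSubtask());
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        // Cleanup: close connections, release resources, ...
    }
}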
Case: reading data from a database
1. Initialization: create the database and the data table
# Create the database
create database flink;
# Use the database
use flink;
# Create the table and import the data
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', 'Big and strong');
INSERT INTO `user` VALUES (11, 'erya', '123456', 'Erya');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', 'Three fat');

SET FOREIGN_KEY_CHECKS = 1;
2. Flink reads the MySQL data source
package cn.itcast.flink.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2022/1/11 16:17
 * Desc Read the mysql data table and print it out.
 * Development steps:
 * 1. Create and prepare the database and data table flink
 * 2. Get the stream execution environment
 * 3. Set the parallelism
 * 4. Add a custom data source that reads from mysql by implementing RichSourceFunction
 *    4.1. open: initialization; create the connection and the statement
 *    4.2. run: read the rows of the table and wrap each one in an object
 *    4.3. close: close the statement and the connection
 * 5. Print the result
 * 6. Execute the stream environment
 */
public class UserSource {
    public static void main(String[] args) throws Exception {
        // 2. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 3. Set the parallelism
        env.setParallelism(1);
        // 4. Add the custom data source
        DataStreamSource<User> source = env.addSource(new RichSourceFunction<User>() {
            Connection conn = null;
            Statement statement = null;
            // Flag that marks whether the source should keep running
            volatile boolean isRunning = true;

            /**
             * Runs first, before any data is read: initialization.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                // 1. Register the driver
                Class.forName("com.mysql.jdbc.Driver");
                // 2. Get the connection: url, user name and password
                conn = DriverManager.getConnection(
                        "jdbc:mysql://node1:3306/flink?useSSL=false",
                        "root",
                        "123456"
                );
                // 3. Create the statement
                statement = conn.createStatement();
            }

            /**
             * All elements are emitted here.
             */
            @Override
            public void run(SourceContext<User> ctx) throws Exception {
                String sql = "select id,username,password,name from user";
                while (isRunning) {
                    // 1. Execute the query and get the ResultSet
                    ResultSet rs = statement.executeQuery(sql);
                    // 2. Iterate while the ResultSet has rows
                    while (rs.next()) {
                        User user = new User();
                        // 3. Copy each column into the User object
                        user.setId(rs.getInt("id"));
                        user.setUsername(rs.getString("username"));
                        user.setPassword(rs.getString("password"));
                        user.setName(rs.getString("name"));
                        // 4. Emit the User
                        ctx.collect(user);
                    }
                    TimeUnit.MINUTES.sleep(5);
                }
            }

            @Override
            public void cancel() {
                // Set the flag to false
                isRunning = false;
            }

            /**
             * Runs once all elements are read: cleanup.
             */
            @Override
            public void close() throws Exception {
                // Close the statement
                if (!statement.isClosed()) {
                    statement.close();
                }
                // Close the connection
                if (!conn.isClosed()) {
                    conn.close();
                }
            }
        });
        // 5. Print the result
        source.printToErr();
        // 6. Execute the stream environment
        env.execute();
    }

    public static class User {
        private int id;
        private String username;
        private String password;
        private String name;

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getUsername() { return username; }
        public void setUsername(String username) { this.username = username; }
        public String getPassword() { return password; }
        public void setPassword(String password) { this.password = password; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    ", password='" + password + '\'' +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}
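Running this requires the MySQL JDBC driver on the classpath (e.g. the mysql-connector-java artifact). com.mysql.jdbc.Driver is the driver class of the 5.x connector; the 8.x connector renames it to com.mysql.cj.jdbc.Driver.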