Basic steps of Flink programming and loading different types of data sources

Basic steps of Flink programming:

1. Create the stream execution environment: StreamExecutionEnvironment.getExecutionEnvironment() obtains the streaming environment

2. Load the data Source

3. Apply Transformations

4. Output to a Sink: land the data in another data warehouse, or print it directly (see the sketch below)
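
A minimal sketch of the four steps, assuming a socket server on node1:9999 (the same host and port used later in this section):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BasicSteps {
    public static void main(String[] args) throws Exception {
        //1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Load the data Source
        DataStream<String> source = env.socketTextStream("node1", 9999);
        //3. Transformation - e.g. upper-case every line
        DataStream<String> result = source.map(String::toUpperCase);
        //4. Output Sink - print directly
        result.print();
        //Execute the streaming job
        env.execute();
    }
}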

Basic operation of Flink data -- four categories

  1. Operations on a single record: map, filter

  2. Operations on multiple records: window

  3. Merging multiple streams into one stream: connect, union, join

  4. Splitting one stream into multiple streams: split (now deprecated) and side outputs (OutputTag), as shown in the sketch below
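
Since split is deprecated, splitting is usually done with side outputs. A minimal sketch, assuming a stream of integers; the tag name "odd" and the even/odd rule are only illustrative:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Tag that identifies the side output stream
        final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};
        DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5);
        SingleOutputStreamOperator<Integer> evens = input.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                if (value % 2 == 0) {
                    out.collect(value);        //even numbers go to the main stream
                } else {
                    ctx.output(oddTag, value); //odd numbers go to the side output
                }
            }
        });
        //Retrieve the side output as its own stream
        DataStream<Integer> odds = evens.getSideOutput(oddTag);
        evens.print("even");
        odds.print("odd");
        env.execute();
    }
}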

Flink input data source

Built-in predefined Sources

  • Source based on a local collection

    • Application scenario: development and testing; after the program is written, verify whether the current functionality works.

    • Classification (see the sketch after this list)

      1. From elements: fromElements

      2. From a collection: fromCollection

      3. From a number sequence: generateSequence

      4. From a start and end value: fromSequence
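
A short sketch of the four predefined collection sources, each producing a bounded stream:

import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionSources {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1. From elements
        DataStreamSource<String> s1 = env.fromElements("flink", "spark", "hadoop");
        //2. From a collection
        DataStreamSource<Integer> s2 = env.fromCollection(Arrays.asList(1, 2, 3));
        //3. From a number sequence (deprecated in newer Flink versions in favor of fromSequence)
        DataStreamSource<Long> s3 = env.generateSequence(1, 10);
        //4. From a start and end value
        DataStreamSource<Long> s4 = env.fromSequence(1, 10);
        s1.print();
        s2.print();
        s3.print();
        s4.print();
        env.execute();
    }
}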

  • Parallelism setting

    • Ways to set the parallelism

      1. Configuration file: set parallelism.default: 1 in flink-conf.yaml

      2. Client-side job submission: flink run -p 1

      3. Global parallelism in the program: env.setParallelism(1)

      4. Operator-level parallelism: operator.setParallelism(1)

      Priority (highest to lowest):
      operator-level parallelism > global parallelism in the program > parallelism of the client-submitted job > parallelism in the configuration file (a sketch follows the snippets below)
    • Global parallelism setting

      env.setParallelism(1);
    • Operator parallelism setting

      source.print().setParallelism(2);
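
A short sketch of how the two program-level settings interact; the operator-level call overrides the global value for that operator only:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismPriority {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Global parallelism in the program: 4
        env.setParallelism(4);
        env.fromSequence(1, 100)
                .map(new MapFunction<Long, String>() {
                    @Override
                    public String map(Long value) {
                        return "n=" + value;
                    }
                }).setParallelism(2) //operator-level: the map runs with 2 subtasks
                .print();            //print inherits the global parallelism of 4
        env.execute();
    }
}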
  • File based Source

    • Read a file in batch mode, only once -- readTextFile

    • Read a file as a stream, monitoring it at a specified interval -- readFile (see the sketch after this list)

      There are two monitoring modes, watchType:

      1. FileProcessingMode.PROCESS_CONTINUOUSLY: mainly for scenarios where files are modified or deleted; re-reads the whole file at every interval (this breaks exactly-once semantics)

      2. FileProcessingMode.PROCESS_ONCE: reads the file only once and exits after reading
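
A minimal sketch of both read modes; the path and the 1000 ms monitoring interval are placeholders:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class FileSources {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String path = "data/input.txt"; //placeholder path
        //Batch mode: read the file only once
        env.readTextFile(path).print("once");
        //Stream mode: re-read the file every 1000 ms
        env.readFile(new TextInputFormat(new Path(path)), path,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L)
                .print("continuous");
        env.execute();
    }
}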

  • Socket based Source

    env.socketTextStream("node1", 9999);

Custom Source

  • Mainly used to define data sources with a specific format and to generate bounded or unbounded data streams

    1. Bounded data stream - e.g. reading MySQL or reading files

    2. Unbounded data stream - e.g. a loop that keeps generating data

  • Requirement - generate a data source in a user-defined way.

  • Demo case

    • Implementing the SourceFunction interface only requires overriding the run method and the cancel method

package cn.itcast.flink.api;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/11/29 14:56
 * Desc - Randomly generate one order record (order id, user id, amount, timestamp) every second
 * public class Order
 * String oid;
 * int uid;
 * double money;
 * long timestamp;
 * String datetime;
 * Generate one record per second
 * Print out each record
 * Execute the streaming environment
 */
public class OrderSource {
    public static void main(String[] args) throws Exception {
        //1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Set parallelism
        env.setParallelism(1);
        //3. Add the custom data source (implemented by OrderEmitSource below)
        DataStreamSource<Order> source = env.addSource(new OrderEmitSource());
        //4. Print the output
        source.printToErr();
        //5. Execute the streaming job
        env.execute();
    }

    public static class OrderEmitSource implements SourceFunction<Order> {
        //Flag that indicates whether the source should keep generating data
        private volatile boolean isRunning = true;

        /**
         * Generates the data and emits each record through ctx
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            //Define random number
            Random rm = new Random();
            //Time conversion format tool
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //Loop until cancelled, continuously generating data
            while (isRunning) {
                //Random order id (UUID)
                String oid = UUID.randomUUID().toString();
                //User id, random value between 0 and 5
                int uid = rm.nextInt(6);
                //Money between 0 and 100
                double money = rm.nextDouble()*100;
                //time stamp
                long timestamp = System.currentTimeMillis();
                //current time 
                String datetime = sdf.format(timestamp);
                Order order = new Order(
                        oid,
                        uid,
                        money,
                        timestamp,
                        datetime
                );
                //Emit the record
                ctx.collect(order);
                //Sleep for one second before generating the next record
                TimeUnit.SECONDS.sleep(1);
            }
        }

        /**
         * Called when the job is cancelled; stops the data generation
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String oid;
        private int uid;
        private double money;
        private long timestamp;
        private String datetime;
    }
}

Case of implementing the ParallelSourceFunction interface

Data is generated in parallel; the parallelism is set on the operator with setParallelism(n)

package cn.itcast.flink.api;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/11/29 14:56
 * Desc - Randomly generate one order record (order id, user id, amount, timestamp) every 5 seconds, in parallel
 * public class Order
 * String oid;
 * int uid;
 * double money;
 * long timestamp;
 * String datetime;
 * Generate one record every 5 seconds per subtask
 * Print out each record
 * Execute the streaming environment
 */
public class OrderParallelismSource {
    public static void main(String[] args) throws Exception {
        //1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Set parallelism
        env.setParallelism(1);
        //3. Add the custom parallel data source, with six source subtasks
        DataStreamSource<Order> source = env.addSource(new OrderEmitSource()).setParallelism(6);
        //4. Print the output
        source.printToErr();
        //5. Execute the streaming job
        env.execute();
    }

    public static class OrderEmitSource implements ParallelSourceFunction<Order> {
        //Flag that indicates whether the source should keep generating data
        private volatile boolean isRunning = true;

        /**
         * Generates the data and emits each record through ctx
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            //Define random number
            Random rm = new Random();
            //Time conversion format tool
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //Loop until cancelled, continuously generating data
            while (isRunning) {
                //Random order id (UUID)
                String oid = UUID.randomUUID().toString();
                //User id, random value between 0 and 5
                int uid = rm.nextInt(6);
                //Money between 0 and 100
                double money = rm.nextDouble()*100;
                //time stamp
                long timestamp = System.currentTimeMillis();
                //current time 
                String datetime = sdf.format(timestamp);
                Order order = new Order(
                        oid,
                        uid,
                        money,
                        timestamp,
                        datetime
                );
                //Emit the record
                ctx.collect(order);
                //Sleep for five seconds before generating the next record
                TimeUnit.SECONDS.sleep(5);
            }
        }

        /**
         * Called when the job is cancelled; stops the data generation
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String oid;
        private int uid;
        private double money;
        private long timestamp;
        private String datetime;
    }
}

Case of implementing RichParallelSourceFunction

  • Rich means "rich function": it inherits AbstractRichFunction and implements

  • the open and close methods of the lifecycle

    1. The open method performs the initialization work before data generation starts

    2. The close method performs the cleanup work after data generation ends

    3. The getRuntimeContext method obtains the context of the current task (parameters, environment variables, state, accumulators, etc.); see the sketch below
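
A minimal sketch of this lifecycle in a RichParallelSourceFunction; the subtask index obtained from getRuntimeContext is used only to label the output:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class CounterSource extends RichParallelSourceFunction<String> {
    private volatile boolean isRunning = true;
    private int subtask;

    @Override
    public void open(Configuration parameters) throws Exception {
        //Initialization: getRuntimeContext exposes the task context
        subtask = getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long i = 0;
        while (isRunning) {
            ctx.collect("subtask-" + subtask + " -> " + i++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        //Cleanup when the source ends (nothing to release in this sketch)
    }
}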

  • Case - reading data from a database (the example below implements RichSourceFunction; RichParallelSourceFunction adds the same lifecycle to a parallel source)

  • 1: Initialization - create the database and data table

# Create the database
create database flink;
# Use the database
use flink;
# Create the table and import data
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (
  `id` int(11) NOT NULL,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', 'Big and strong');
INSERT INTO `user` VALUES (11, 'erya', '123456', 'Erya');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', 'Three fat');

SET FOREIGN_KEY_CHECKS = 1;

2: Flink reads the MySQL data source

package cn.itcast.flink.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2022/1/11 16:17
 * Desc Read the mysql data table and print it out
 * Development steps:
 * 1. Create and prepare the flink database and the data table
 * 2. Get the stream execution environment
 * 3. Set the parallelism
 * 4. Add a custom data source that reads data from mysql by implementing RichSourceFunction (a rich function: open, close, getRuntimeContext)
 * 4.1. open: initialization actions - create the connection and the Statement
 * 4.2. run: read the data in the data table and encapsulate it into objects
 * 4.3. close: close the Statement and the connection
 * 5. Print the result output
 * 6. Execute the streaming environment
 */
public class UserSource {
    public static void main(String[] args) throws Exception {
        //2. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //3. Set parallelism
        env.setParallelism(1);
        //4. Add a custom data source: read data from mysql by implementing RichSourceFunction (rich function: open, close, getRuntimeContext)
        DataStreamSource<User> source = env.addSource(new RichSourceFunction<User>() {
            Connection conn = null;
            Statement statement = null;
            //Flag that indicates whether the source should keep reading
            boolean isRunning = true;

            /**
             * Called once before the source starts running; performs the initialization
             * @param parameters
             * @throws Exception
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                //1. Register the JDBC driver
                Class.forName("com.mysql.jdbc.Driver");
                //2. Get the connection: url, user name and password
                conn = DriverManager.getConnection(
                        "jdbc:mysql://node1:3306/flink?useSSL=false",
                        "root",
                        "123456"
                );
                //3. Create the statement from the connection
                statement = conn.createStatement();
            }

            /**
             * Emits all elements; runs for the lifetime of the source
             * @param ctx
             * @throws Exception
             */
            @Override
            public void run(SourceContext<User> ctx) throws Exception {
                String sql = "select id,username,password,name from user";
                while (isRunning) {
                    //1. Run the query: statement.executeQuery returns a ResultSet
                    ResultSet rs = statement.executeQuery(sql);
                    //2. Iterate over the ResultSet while next() returns true
                    while (rs.next()) {
                        User user = new User();
                        //3. Assign each column to the User object
                        int id = rs.getInt("id");
                        String username = rs.getString("username");
                        String password = rs.getString("password");
                        String name = rs.getString("name");

                        user.setId(id);
                        user.setUsername(username);
                        user.setPassword(password);
                        user.setName(name);
                        //4. Collect the User: ctx.collect(user)
                        ctx.collect(user);
                    }
                    TimeUnit.MINUTES.sleep(5);
                }
            }

            @Override
            public void cancel() {
                //Set the flag to false to stop the run loop
                isRunning = false;
            }

            /**
             * Called after the source finishes; performs the closing work
             * @throws Exception
             */
            @Override
            public void close() throws Exception {
                //Close statement
                if (!statement.isClosed()) {
                    statement.close();
                }
                //Close connection
                if (!conn.isClosed()) {
                    conn.close();
                }
            }
        });
        //4.1. open: initialization actions - create the connection and the Statement
        //4.2. run: read the data in the data table and encapsulate it into objects
        //4.3. close: close the Statement and the connection
        //5. Print result output
        source.printToErr();
        //6. Execution flow environment
        env.execute();
    }

    public static class User {
        // id
        private int id;
        // username
        private String username;
        // password
        private String password;
        // name
        private String name;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getUsername() {
            return username;
        }

        public void setUsername(String username) {
            this.username = username;
        }

        public String getPassword() {
            return password;
        }

        public void setPassword(String password) {
            this.password = password;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    ", password='" + password + '\'' +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}
