Flink processing of business data from a database: joining the order master and detail tables


1. Requirements

1. Requirement Description

  • Order data tables exist in the business systems

  • The order data is split across two tables: an order master table and an order detail table

    Order master table:
        order id, order status, total order amount, order time, user id

    Order detail table:
        id of the parent order, commodity id, commodity category id, commodity unit price, commodity quantity


Requirement: count the transaction amount per commodity category (a worked numeric example follows the sample data below)

  • The order status must be "transacted" (in the master table)
  • The commodity category id is in the detail table
  • The amount for a commodity is unit price * quantity (in the detail table)

2. Data preparation

  • OrderMain: the canal change record imported into Kafka looks like this:
{"data":[{"oid":"29001","create_time":"2020-03-16 00:38:01","total_money":"2000.0","status":"1",
"update_time":"2020-03-16 00:38:01","uid":"9999","province":"Beijing"}],"database":"doit12",
"es":1584333481000,"id":4,"isDdl":false,"mysqlType":{"oid":"bigint(20)","create_time":"timestamp",
"total_money":"double","status":"int(11)","update_time":"timestamp","uid":"varchar(20)",
"province":"varchar(50)"},"old":null,"pkNames":["oid"],"sql":"","sqlType":{"oid":-5,"create_time":93,
"total_money":8,"status":4,"update_time":93,"uid":12,"province":12},"table":"ordermain",
"ts":1584333481540,"type":"INSERT"}
  • OrderDetail: the canal change record imported into Kafka looks like this:
{"data":[{"id":"10","order_id":"29001","category_id":"2","sku":"20001","money":"1000.0","amount":"1",
"create_time":"2020-03-16 00:38:01","update_time":"2020-03-16 00:38:01"}],"database":"doit12",
"es":1584333481000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint(20)","order_id":"bigint(20)",
"category_id":"int(11)","sku":"varchar(50)","money":"double","amount":"int(11)",
"create_time":"timestamp","update_time":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id
":-5,"order_id":-5,"category_id":4,"sku":12,"money":8,"amount":4,"create_time":93,"update_time":93}
,"table":"orderdetail","ts":1584333481600,"type":"INSERT"}

2. Analysis

To compute the statistic, the data from both tables has to be pulled into Flink and joined

  • Data in business systems is usually stored in relational databases such as MySQL
  • With canal, the business data can be imported from MySQL into Kafka (canal masquerades as a MySQL slave/replica)
  • Flink pulls the business data from Kafka via a Kafka source
  • The pulled data is parsed, and the master table is joined with the detail table
  • Finally, the transaction amount is aggregated

3. Technical Points

  • Usage and principle of canal
  • Kafka producers, consumers, and topics
  • Flink event-time tumbling windows
  • Flink two-stream left outer join (coGroup)
  • Flink side output for late data
  • MySQL connections and queries

4. Join Architecture

There are two scenarios in which records fail to join:

  • (1) The left-table data arrives late
    -> Capture it with a side output, then look up the order master table (the right table) in the database and attach the master-table data
  • (2) The right-table data arrives late
    -> Query the right-table record from the database (or an API) using the left-table record's key
  • Finally, union the two resulting streams together

5. Utility Class and Bean Classes

1. FlinkUtilsV2: utility class for creating a Kafka source

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;


public class FlinkUtilsV2 {
    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Overload 1: pass only the config; topics and group id are read from it
    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters,
                                                          Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        String topics = parameters.getRequired("kafka.topics");
        String groupId = parameters.getRequired("group.id");
        return createKafkaDataStream(parameters, topics, groupId, clazz);
    }


    // Overload 2: pass the config and the topics; group id is read from the config
    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, String topics,
                                                          Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        String groupId = parameters.getRequired("group.id");
        return createKafkaDataStream(parameters, topics, groupId, clazz);
    }


    // Overload 3: pass the config, the topics, and the group id explicitly
    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, String topics, String groupId,
                                                          Class<? extends DeserializationSchema<T>> clazz) throws Exception {

        // Register the config as global job parameters
        env.getConfig().setGlobalJobParameters(parameters);


        // A Kafka sink would additionally need this transaction timeout setting
        //parameters.getProperties().setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");

        // Enable checkpointing
        env.enableCheckpointing(parameters.getLong("checkpoint.interval", 10000L),
                CheckpointingMode.EXACTLY_ONCE);

        //Set restart policy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                parameters.getInt("restart.times", 10), Time.seconds(3)));

        //Set statebackend
        String path = parameters.get("state.backend.path");
        if (path != null) {
            // Ideally the state backend is configured globally in Flink's flink-conf.yaml
            env.setStateBackend(new FsStateBackend(path));
        }

        // Retain externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
                

        // Allow at most 4 checkpoints to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(4);

        // Support multiple comma-separated topics by putting them in a list
        List<String> topicList = Arrays.asList(topics.split(","));


        Properties properties = parameters.getProperties();

        properties.setProperty("group.id", groupId);

        //Create FlinkKafkaConsumer
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<T>(
                topicList,
                clazz.newInstance(),
                properties
        );

        // Do not commit offsets to Kafka's consumer-offsets topic on checkpoint (the default is true);
        // Flink's own checkpointed state is the source of truth for offsets
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        // Return kafkaDataStream (lines)
        return env.addSource(kafkaConsumer);
    }

    public static StreamExecutionEnvironment getEnv() {
        return env;
    }
}
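
A minimal usage sketch of the utility class (the class name and the topic/group values are illustrative; the main program in section 6 below uses it the same way):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;

public class FlinkUtilsV2Demo {
    public static void main(String[] args) throws Exception {
        // Load the properties file (passed as the first program argument)
        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);

        // Overload 3: explicit topic and group id; SimpleStringSchema yields the raw JSON strings
        DataStream<String> lines =
                FlinkUtilsV2.createKafkaDataStream(parameters, "ordermain", "g011", SimpleStringSchema.class);

        lines.print();
        FlinkUtilsV2.getEnv().execute("FlinkUtilsV2Demo");
    }
}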

2. OrderMainBean Order Master Table Class

import java.util.Date;

public class OrderMainBean {
    private Long oid;
    private Date create_time;
    private Double total_money;
    private int status;
    private Date update_time;
    private String province;
    private String uid;
    //Types of operations on databases: INSERT, UPDATE
    private String type;

    public OrderMainBean() {
    }

    public OrderMainBean(Long oid, Date create_time, Double total_money, int status, Date update_time,
                         String province, String uid, String type) {
        this.oid = oid;
        this.create_time = create_time;
        this.total_money = total_money;
        this.status = status;
        this.update_time = update_time;
        this.province = province;
        this.uid = uid;
        this.type = type;
    }

    @Override
    public String toString() {
        return "OrderMainBean{" +
                "oid=" + oid +
                ", create_time=" + create_time +
                ", total_money=" + total_money +
                ", status=" + status +
                ", update_time=" + update_time +
                ", province='" + province + '\'' +
                ", uid='" + uid + '\'' +
                ", type='" + type + '\'' +
                '}';
    }
    public Long getOid() {
        return oid;
    }
    public void setOid(Long oid) {
        this.oid = oid;
    }
    public Date getCreate_time() {
        return create_time;
    }
    public void setCreate_time(Date create_time) {
        this.create_time = create_time;
    }
    public Double getTotal_money() {
        return total_money;
    }
    public void setTotal_money(Double total_money) {
        this.total_money = total_money;
    }
    public int getStatus() {
        return status;
    }
    public void setStatus(int status) {
        this.status = status;
    }
    public Date getUpdate_time() {
        return update_time;
    }
    public void setUpdate_time(Date update_time) {
        this.update_time = update_time;
    }
    public String getProvince() {
        return province;
    }
    public void setProvince(String province) {
        this.province = province;
    }
    public String getUid() {
        return uid;
    }
    public void setUid(String uid) {
        this.uid = uid;
    }
    public String getType() {
        return type;
    }
    public void setType(String type) {
        this.type = type;
    }
}

3. OrderDetailBean Order Detail Table Class

import java.util.Date;

public class OrderDetailBean {

    private Long id;
    private Long order_id;
    private int category_id;
    private Long sku;
    private Double money;
    private int amount;
    private Date create_time;
    private Date update_time;

    //Types of operations on databases: INSERT, UPDATE
    private String type;

    public OrderDetailBean() {
    }

    public OrderDetailBean(Long id, Long order_id, int category_id, Long sku, Double money, int amount,
                           Date create_time, Date update_time, String type) {
        this.id = id;
        this.order_id = order_id;
        this.category_id = category_id;
        this.sku = sku;
        this.money = money;
        this.amount = amount;
        this.create_time = create_time;
        this.update_time = update_time;
        this.type = type;
    }

    @Override
    public String toString() {
        return "OrderDetailBean{" +
                "id=" + id +
                ", order_id=" + order_id +
                ", category_id=" + category_id +
                ", sku=" + sku +
                ", money=" + money +
                ", amount=" + amount +
                ", create_time=" + create_time +
                ", update_time=" + update_time +
                ", type='" + type + '\'' +
                '}';
    }

    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public Long getOrder_id() {
        return order_id;
    }
    public void setOrder_id(Long order_id) {
        this.order_id = order_id;
    }
    public int getCategory_id() {
        return category_id;
    }
    public void setCategory_id(int category_id) {
        this.category_id = category_id;
    }
    public Long getSku() {
        return sku;
    }
    public void setSku(Long sku) {
        this.sku = sku;
    }
    public Double getMoney() {
        return money;
    }
    public void setMoney(Double money) {
        this.money = money;
    }
    public int getAmount() {
        return amount;
    }
    public void setAmount(int amount) {
        this.amount = amount;
    }
    public Date getCreate_time() {
        return create_time;
    }
    public void setCreate_time(Date create_time) {
        this.create_time = create_time;
    }
    public Date getUpdate_time() {
        return update_time;
    }
    public void setUpdate_time(Date update_time) {
        this.update_time = update_time;
    }
    public String getType() {
        return type;
    }
    public void setType(String type) {
        this.type = type;
    }
}

6. Main Program Code

  • Configuration file (passed to the job as the first program argument):
checkpoint.interval=30000
restart.times=3
state.backend.path=file:///D:\\doit12logs

bootstrap.servers=linux01:9092,linux02:9092,linux03:9092
group.id=g22
auto.offset.reset=earliest
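
For orientation, a small hedged sketch (class name illustrative) of how FlinkUtilsV2 consumes these keys; bootstrap.servers and auto.offset.reset reach the FlinkKafkaConsumer verbatim via parameters.getProperties(), while group.id is overridden by the groupId argument:

import org.apache.flink.api.java.utils.ParameterTool;
import java.util.Properties;

public class ConfigKeysSketch {
    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);

        // Read by FlinkUtilsV2 for checkpointing, restart strategy and state backend
        long checkpointInterval = parameters.getLong("checkpoint.interval", 10000L); // 30000 above
        int restartTimes = parameters.getInt("restart.times", 10);                   // 3 above
        String stateBackendPath = parameters.get("state.backend.path");

        // All remaining entries are handed to FlinkKafkaConsumer as consumer Properties
        Properties kafkaProps = parameters.getProperties();

        System.out.println(checkpointInterval + ", " + restartTimes + ", "
                + stateBackendPath + ", " + kafkaProps.getProperty("bootstrap.servers"));
    }
}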

1. Loading data sources from kafka

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.sql.*;

/**
 * @date: 2020/3/16 21:22
 * @site: www.ianlou.cn
 * @author: lekko Six Water
 * @qq: 496208110
 * @description:
 */
public class OrderJoinAdv {
    public static void main(String[] args) throws Exception {
        //Get Configuration File
        ParameterTool propertiesFile = ParameterTool.fromPropertiesFile(args[0]);

        StreamExecutionEnvironment env = FlinkUtilsV2.getEnv();

        // Use EventTime as the time standard
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Loading data sources from kafka
        DataStream<String> orderMainLines = FlinkUtilsV2.createKafkaDataStream(propertiesFile, "ordermain",
                "g011", SimpleStringSchema.class);

        DataStream<String> orderDetailLines = FlinkUtilsV2.createKafkaDataStream(propertiesFile, "orderdetail",
                "g011", SimpleStringSchema.class);

2. Parse the json data pulled from kafka

        // 2. Parse the JSON data pulled from Kafka

        // Parse OrderMain records
        SingleOutputStreamOperator<OrderMainBean> orderMainBeanDS = orderMainLines.process(new ProcessFunction<String,
                OrderMainBean>() {
            @Override
            public void processElement(String input, Context ctx, Collector<OrderMainBean> out) throws Exception {

                try {
                    JSONObject jsonObject = JSON.parseObject(input);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {

                        // Extract each object from the data[] array
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderMainBean orderMain = jsonArray.getObject(i, OrderMainBean.class);
                            orderMain.setType(type); //Set Operation Type
                            out.collect(orderMain);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    //TODO Records Error Data
                }
            }
        });


        // Parse OrderDetail records
        SingleOutputStreamOperator<OrderDetailBean> orderDetailBeanDS =
                orderDetailLines.process(new ProcessFunction<String,
                        OrderDetailBean>() {
                    @Override
                    public void processElement(String input, Context ctx, Collector<OrderDetailBean> out) throws Exception {

                        try {
                            JSONObject jsonObject = JSON.parseObject(input);
                            String type = jsonObject.getString("type");
                            if (type.equals("INSERT") || type.equals("UPDATE")) {
                                JSONArray jsonArray = jsonObject.getJSONArray("data");
                                for (int i = 0; i < jsonArray.size(); i++) {
                                    OrderDetailBean orderDetail = jsonArray.getObject(i, OrderDetailBean.class);
                                    orderDetail.setType(type); //Set Operation Type
                                    out.collect(orderDetail);
                                }
                            }
                        } catch (Exception e) {
                            //e.printStackTrace();
                            //Record wrong data
                        }
                    }
                });

3. Extract the event time and generate watermarks

        int delaySeconds = 2;
        int windowSize = 5;

       SingleOutputStreamOperator<OrderMainBean> orderMainWithWaterMark =
                orderMainBeanDS.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor
                        <OrderMainBean>(Time.seconds(delaySeconds)) {
                    @Override
                    public long extractTimestamp(OrderMainBean element) {
                        return element.getCreate_time().getTime();
                    }
                });

        SingleOutputStreamOperator<OrderDetailBean> orderDetailWithWaterMark
                =
                orderDetailBeanDS.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor
                        <OrderDetailBean>(Time.seconds(delaySeconds)) {
                    @Override
                    public long extractTimestamp(OrderDetailBean element) {
                        return element.getCreate_time().getTime();
                    }
                });

4. Left outer join (via coGroup) with OrderDetail as the left table

      DataStream<Tuple2<OrderDetailBean, OrderMainBean>> joined =
                orderDetailWithWaterMark.coGroup(orderMainWithWaterMark)
                        .where(new KeySelector<OrderDetailBean, Long>() {
                            @Override
                            public Long getKey(OrderDetailBean value) throws Exception {
                                return value.getOrder_id();
                            }
                        })
                        .equalTo(new KeySelector<OrderMainBean, Long>() {
                            @Override
                            public Long getKey(OrderMainBean value) throws Exception {
                                return value.getOid();
                            }
                        })
                        .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                        .apply(new CoGroupFunction<OrderDetailBean, OrderMainBean, Tuple2<OrderDetailBean,
                                OrderMainBean>>() {

                            // Emit the joined output; a detail with no matching master record is emitted as (detail, null)
                            @Override
                            public void coGroup(Iterable<OrderDetailBean> first, Iterable<OrderMainBean> second,
                                                Collector<Tuple2<OrderDetailBean, OrderMainBean>> out) throws Exception {
                                for (OrderDetailBean orderDetailBean : first) {

                                    boolean isJoined = false;
                                    for (OrderMainBean orderMainBean : second) {
                                        out.collect(Tuple2.of(orderDetailBean, orderMainBean));

                                        isJoined = true;
                                    }

                                    if (!isJoined) {
                                        out.collect(Tuple2.of(orderDetailBean, null));
                                    }
                                }
                            }
                        });

5. Handle the special case that will occur: records that did not join

There are two scenarios in which records fail to join:

  • (1) The left-table data arrives late
    -> Capture it with a side output, then look up the order master table (the right table) in the database and attach the master-table data
  • (2) The right-table data arrives late
    -> Query the right-table record from the database (or an API) using the left-table record's key
  • Then union the two resulting streams together
5.1. Process late data from the left table (orderDetailWithWaterMark)
 OutputTag<OrderDetailBean> outputTag = new OutputTag<OrderDetailBean>("leftLate-date") {
        };

        // Use the same window size and window type as the join above
        SingleOutputStreamOperator<OrderDetailBean> orderDetailLateData =
                orderDetailWithWaterMark.windowAll(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                        .sideOutputLateData(outputTag)
                        .apply(new AllWindowFunction<OrderDetailBean, OrderDetailBean, TimeWindow>() {
                            @Override
                            public void apply(TimeWindow window, Iterable<OrderDetailBean> values,
                                              Collector<OrderDetailBean> out) throws Exception {
                                // Emit nothing here: the on-time data is already handled by the coGroup join above;
                                // this windowAll only exists so that late records can be routed to the side output
                            }
                        });

        // Get Late Data from Left Table
        DataStream<OrderDetailBean> leftLateDate = orderDetailLateData.getSideOutput(outputTag);


        // Look up the database and attach the right-table (OrderMain) data
        SingleOutputStreamOperator<Tuple2<OrderDetailBean, OrderMainBean>> lateOrderDetailAndOrderMain =
                leftLateDate.map(new RichMapFunction<OrderDetailBean, Tuple2<OrderDetailBean, OrderMainBean>>() {

            private transient Connection conn = null;

            // Create a JDBC connection
            @Override
            public void open(Configuration parameters) throws Exception {
                conn = DriverManager.getConnection(
                        "jdbc:mysql://linux04:3306/doit12?characterEncoding=utf8",
                        "root",
                        "123456"
                );
            }

            // Query the database
            @Override
            public Tuple2<OrderDetailBean, OrderMainBean> map(OrderDetailBean value) throws Exception {
                Long order_id = value.getOrder_id();
                String type = value.getType();
                OrderMainBean orderMainB = queryOrderMainFromMySQL(order_id, type, conn);

                return Tuple2.of(value, orderMainB);
            }

            // Close JDBC connection
            @Override
            public void close() throws Exception {
                conn.close();
            }
        });
5.2. Process late right-table data, i.e. joined records whose right side is null
       SingleOutputStreamOperator<Tuple2<OrderDetailBean, OrderMainBean>> lateOrderMainAndOrderMain =
                joined.map(new RichMapFunction<Tuple2<OrderDetailBean, OrderMainBean>, Tuple2<OrderDetailBean,
                        OrderMainBean>>() {

            private transient Connection conn = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                conn = DriverManager.getConnection(
                        "jdbc:mysql://linux04:3306/doit12?characterEncoding=utf8",
                        "root",
                        "123456"
                );
            }

            // Query the right-table record from the database using the left-table record's order id
            @Override
            public Tuple2<OrderDetailBean, OrderMainBean> map(Tuple2<OrderDetailBean, OrderMainBean> value) throws Exception {
                OrderMainBean orderMainB = value.f1;
                if (orderMainB == null) {
                    // The right side is missing: look it up in MySQL using the left-table order id
                    Long order_id = value.f0.getOrder_id();
                    String type = value.f0.getType();
                    orderMainB = queryOrderMainFromMySQL(order_id, type, conn);
                }
                return Tuple2.of(value.f0, orderMainB);
            }

            @Override
            public void close() throws Exception {
                conn.close();
            }
        });
5.3. Union the two streams together
  DataStream<Tuple2<OrderDetailBean, OrderMainBean>> allOrderStream =
                lateOrderMainAndOrderMain.union(lateOrderDetailAndOrderMain);


        allOrderStream.print();

        FlinkUtilsV2.getEnv().execute("OrderJoinAdv");
    }
 

6. Helper method for querying the MySQL database

    // Query the order master table (right table) in MySQL by order id
    private static OrderMainBean queryOrderMainFromMySQL(Long order_id, String type, Connection conn) throws Exception {

        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            String sql = "select oid, create_time, total_money, status, update_time, province, uid from ordermain" +
                    " where oid = ?";
            preparedStatement = conn.prepareStatement(sql);
            preparedStatement.setLong(1, order_id);
            resultSet = preparedStatement.executeQuery();

            // Read the fields before the ResultSet is closed; return null if the order is not found yet
            if (!resultSet.next()) {
                return null;
            }
            long oid = resultSet.getLong("oid");
            Date create_time = resultSet.getDate("create_time");
            double total_money = resultSet.getDouble("total_money");
            int status = resultSet.getInt("status");
            Date update_time = resultSet.getDate("update_time");
            String province = resultSet.getString("province");
            String uid = resultSet.getString("uid");

            return new OrderMainBean(oid, create_time, total_money, status, update_time,
                    province, uid, type);
        } finally {
            // Close only the statement and result set here; the shared JDBC connection
            // is closed in the RichMapFunction's close() method
            if (resultSet != null) {
                resultSet.close();
            }
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        }
    }
}
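
The job above ends with allOrderStream.print(). As a hedged sketch only (not from the original article), the per-category transaction amount could be computed from allOrderStream inside main(), before the execute() call, roughly as follows; the "transacted" status value is an assumption, and org.apache.flink.api.common.typeinfo.Types needs to be imported.

        // Hedged sketch: per-category transaction amount.
        // Assumption: status value 2 means "transacted"; adjust to the real status code.
        // Extra import needed: org.apache.flink.api.common.typeinfo.Types
        final int TRANSACTED_STATUS = 2;

        allOrderStream
                // keep only details whose associated order exists and is transacted
                .filter(t -> t.f1 != null && t.f1.getStatus() == TRANSACTED_STATUS)
                // (category_id, unit price * quantity)
                .map(t -> Tuple2.of(t.f0.getCategory_id(), t.f0.getMoney() * t.f0.getAmount()))
                .returns(Types.TUPLE(Types.INT, Types.DOUBLE))
                .keyBy(0)
                .sum(1)
                .print();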
