Big data project: Flink real-time data warehouse (DWM layer)

Design ideas

Previously we split the data into separate Kafka topics through stream splitting and similar processing. The next step is to decide which metrics the real-time computation actually needs. Timeliness is what a real-time data warehouse pursues, so in some scenarios we do not need a large, complete middle layer like an offline warehouse; the middle layer only needs to hold a handful of computed metrics that make the next round of computation easier.
We therefore collect the real-time metric requirements and output these metrics as topic-based wide tables: that is the DWS layer.

The metrics listed there mainly serve visualization and large-screen dashboards. So why hasn't the DWM layer appeared yet? The DWM layer mainly serves the DWS layer: some requirements that go straight from the DWD layer to the DWS layer involve complex calculations, and the results of those calculations are likely to be reused by several DWS topics. A DWM layer is therefore built on top of the DWD layer, mainly covering the following business computations:

  • Visitor UV calculation
  • Bounce (jump-out) detail calculation
  • Order wide table
  • Payment wide table

Visitor UV calculation

UV (Unique Visitor), i.e. the number of distinct visitors; counted per day it corresponds to DAU (Daily Active Users).
From the user-behavior log we identify the day's visitors in two steps:

  1. Identify the first page a visitor opens (last_page_id is empty), which marks the visitor entering the application.
  2. Since a visitor may enter the application many times a day, deduplicate the visitor within the day.

Data flow: Web/app -> Nginx -> SpringBoot -> Kafka (ODS) -> FlinkApp -> Kafka (DWD) -> FlinkApp -> Kafka (DWM)
Program: MockLog -> Nginx -> logger.sh -> Kafka (ZK) -> BaseLogApp -> Kafka -> UniqueVisitApp -> Kafka
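The jobs in this section all rely on a MyKafkaUtil helper that is not shown here. A minimal sketch of what such a helper could look like, assuming the FlinkKafkaConsumer/FlinkKafkaProducer connector and a placeholder broker address (both are assumptions, not the project's actual configuration):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class MyKafkaUtil {

    //Placeholder broker list; replace with the real Kafka cluster address
    private static final String BROKER_LIST = "localhost:9092";

    //Build a string consumer for the given topic and consumer group
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BROKER_LIST);
        props.setProperty("group.id", groupId);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }

    //Build a string producer writing to the given topic
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<>(BROKER_LIST, topic, new SimpleStringSchema());
    }
}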

        //TODO 1. Get execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 2. Read the Kafka dwd_page_log topic
        String groupId = "unique_visit_app_210325";
        String sourceTopic = "dwd_page_log";
        String sinkTopic = "dwm_unique_visit";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
        
        //TODO 3. Convert each row of data into a JSON object
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        //TODO 4. Filter with keyed state: keep only the first visit of each mid per day
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
        SingleOutputStreamOperator<JSONObject> uvDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {

            private ValueState<String> dateState;
            private SimpleDateFormat simpleDateFormat;
            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("date-state", String.class);
                //Configure a 24-hour TTL on the state so that yesterday's marks expire; refresh the TTL on create and write
                StateTtlConfig stateTtlConfig = new StateTtlConfig.Builder(Time.hours(24))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();
                valueStateDescriptor.enableTimeToLive(stateTtlConfig);
                dateState = getRuntimeContext().getState(valueStateDescriptor);

                simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
            }
            @Override
            public boolean filter(JSONObject value) throws Exception {
                //Take out the previous page id
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                //Only an entry page (no previous page) can be a visitor's first page of the day
                if (lastPageId == null || lastPageId.length() <= 0) {
                    //Read the last visit date from state
                    String lastDate = dateState.value();
                    //Format the event timestamp as today's date
                    String curDate = simpleDateFormat.format(value.getLong("ts"));
                    //Keep the record only if this mid has not been seen today
                    if (!curDate.equals(lastDate)) {
                        dateState.update(curDate);
                        return true;
                    }
                }
                return false;
            }
        });
        //TODO 5. Write data to Kafka
        uvDS.print();
        uvDS.map(JSONAware::toJSONString)
                .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        //TODO 6. Start task
        env.execute("UniqueVisitApp");

Bounce (jump-out) detail calculation

A bounce (jump-out) is a visit in which the user opens an entry page (last_page_id is empty) and then leaves without viewing any other page. With Flink CEP this is detected in two ways: an entry page followed by another entry page within 10 seconds (the previous visit was abandoned), or an entry page whose pattern times out because no further event arrives. Both cases are emitted as bounce records and unioned.

Data flow: Web/app -> Nginx -> SpringBoot -> Kafka (ODS) -> FlinkApp -> Kafka (DWD) -> FlinkApp -> Kafka (DWM)
Program: MockLog -> Nginx -> logger.sh -> Kafka (ZK) -> BaseLogApp -> Kafka -> UserJumpDetailApp -> Kafka

        //TODO 1. Get execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); //In production, keep parallelism consistent with the number of Kafka partitions

        //TODO 2. Read the data of Kafka topic and create a stream
        String sourceTopic = "dwd_page_log";
        String groupId = "userJumpDetailApp";
        String sinkTopic = "dwm_user_jump_detail";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

        //TODO 3. Convert each row of data into a JSON object and extract the timestamp to generate a Watermark
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                return element.getLong("ts");
                            }
                        }));

        //TODO 4. Define pattern sequence
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() <= 0;
            }
        }).next("next").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() <= 0;
            }
        }).within(Time.seconds(10));

        //An equivalent pattern sequence using a looping pattern (shown for comparison; the result is not used below)
        Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() <= 0;
            }
        })
                .times(2)
                .consecutive() //Strict nearest neighbor
                .within(Time.seconds(10));

        //TODO 5. Apply a pattern sequence to a stream
        PatternStream<JSONObject> patternStream = CEP
                .pattern(jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
                        , pattern);

        //TODO 6. Extract the matched events and the timed-out events (timeouts go to a side output)
        OutputTag<JSONObject> timeOutTag = new OutputTag<JSONObject>("timeOut") {
        };
        SingleOutputStreamOperator<JSONObject> selectDS = patternStream.select(timeOutTag,
                new PatternTimeoutFunction<JSONObject, JSONObject>() {
                    @Override
                    public JSONObject timeout(Map<String, List<JSONObject>> map, long ts) throws Exception {
                        return map.get("start").get(0);
                    }
                }, new PatternSelectFunction<JSONObject, JSONObject>() {
                    @Override
                    public JSONObject select(Map<String, List<JSONObject>> map) throws Exception {
                        return map.get("start").get(0);
                    }
                });
        DataStream<JSONObject> timeOutDS = selectDS.getSideOutput(timeOutTag);

        //TODO 7. Union the two streams: both matched and timed-out entry events are bounce records
        DataStream<JSONObject> unionDS = selectDS.union(timeOutDS);

        //TODO 8. Write data to Kafka
        unionDS.print();
        unionDS.map(JSONAware::toJSONString)
                .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        //TODO 9. Start task
        env.execute("UserJumpDetailApp");

    }

Order wide table design

The order wide table joins the dwd_order_info and dwd_order_detail facts with an interval join, then enriches the joined record with dimension data stored in HBase (queried through Phoenix) using cached, asynchronous lookups.
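The OrderWide JavaBean used below merges an OrderInfo and an OrderDetail record in its two-argument constructor; the bean itself is not shown in this section. A trimmed, hypothetical sketch (only a few fields are shown, and any getter names beyond those visible in the job code are assumptions):

import lombok.Data;
import java.math.BigDecimal;

//Trimmed sketch of the OrderWide bean; the dimension fields (user_gender, user_age,
//province_name, spu_id, tm_id, category3_id, ...) are filled in later by the
//dimension-association steps.
@Data
public class OrderWide {
    private Long order_id;
    private Long order_detail_id;
    private Long user_id;
    private Long province_id;
    private Long sku_id;
    private String sku_name;
    private BigDecimal split_total_amount;
    private String create_time;

    public OrderWide() {
    }

    //Merge the two fact records produced by the interval join
    public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail) {
        mergeOrderInfo(orderInfo);
        mergeOrderDetail(orderDetail);
    }

    public void mergeOrderInfo(OrderInfo orderInfo) {
        if (orderInfo != null) {
            this.order_id = orderInfo.getId();
            this.user_id = orderInfo.getUser_id();
            this.province_id = orderInfo.getProvince_id();
            this.create_time = orderInfo.getCreate_time();
        }
    }

    public void mergeOrderDetail(OrderDetail orderDetail) {
        if (orderDetail != null) {
            this.order_detail_id = orderDetail.getId();
            this.sku_id = orderDetail.getSku_id();
            this.sku_name = orderDetail.getSku_name();
            this.split_total_amount = orderDetail.getSplit_total_amount();
        }
    }
}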

                //TODO 1. Get execution environment
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);

                //TODO 2. Read the Kafka topics, convert each record into a JavaBean, extract the timestamp and generate the watermark
                String orderInfoSourceTopic = "dwd_order_info";
                String orderDetailSourceTopic = "dwd_order_detail";
                String orderWideSinkTopic = "dwm_order_wide";
                String groupId = "order_wide_group_0325";
                SingleOutputStreamOperator<OrderInfo> orderInfoDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderInfoSourceTopic, groupId))
                        .map(line -> {
                                OrderInfo orderInfo = JSON.parseObject(line, OrderInfo.class);

                                String create_time = orderInfo.getCreate_time();
                                String[] dateTimeArr = create_time.split(" ");
                                orderInfo.setCreate_date(dateTimeArr[0]);
                                orderInfo.setCreate_hour(dateTimeArr[1].split(":")[0]);

                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                orderInfo.setCreate_ts(sdf.parse(create_time).getTime());

                                return orderInfo;
                        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                                        @Override
                                        public long extractTimestamp(OrderInfo element, long recordTimestamp) {
                                                return element.getCreate_ts();
                                        }
                                }));
                SingleOutputStreamOperator<OrderDetail> orderDetailDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderDetailSourceTopic, groupId))
                        .map(line -> {
                                OrderDetail orderDetail = JSON.parseObject(line, OrderDetail.class);
                                String create_time = orderDetail.getCreate_time();

                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                orderDetail.setCreate_ts(sdf.parse(create_time).getTime());

                                return orderDetail;
                        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderDetail>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
                                        @Override
                                        public long extractTimestamp(OrderDetail element, long recordTimestamp) {
                                                return element.getCreate_ts();
                                        }
                                }));

                //TODO 3. Double-stream JOIN (interval join)
                SingleOutputStreamOperator<OrderWide> orderWideWithNoDimDS = orderInfoDS.keyBy(OrderInfo::getId)
                        .intervalJoin(orderDetailDS.keyBy(OrderDetail::getOrder_id))
                        .between(Time.seconds(-5), Time.seconds(5)) //In production, set to the maximum expected delay between the two streams
                        .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                                @Override
                                public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<OrderWide> out) throws Exception {
                                        out.collect(new OrderWide(orderInfo, orderDetail));
                                }
                        });

                //Print test
                orderWideWithNoDimDS.print("orderWideWithNoDimDS>>>>>>>>>");

                //TODO 4. Associate dimension information from HBase (Phoenix)
                //A naive synchronous version would look like this (replaced below by async I/O):
//        orderWideWithNoDimDS.map(orderWide -> {
//            //Associate the user dimension
//            Long user_id = orderWide.getUser_id();
//            //Query Phoenix for the user info by user_id
//            //Add the user information to orderWide
//            //Area
//            //SKU
//            //SPU
//            //. . . 
//            //Return results
//            return orderWide;
//        });

                //4.1 Associate the user dimension
                SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(
                        orderWideWithNoDimDS,
                        new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return orderWide.getUser_id().toString();
                                }

                                @Override
                                public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
                                        orderWide.setUser_gender(dimInfo.getString("GENDER"));

                                        String birthday = dimInfo.getString("BIRTHDAY");
                                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

                                        long currentTs = System.currentTimeMillis();
                                        long ts = sdf.parse(birthday).getTime();

                                        long age = (currentTs - ts) / (1000 * 60 * 60 * 24 * 365L);

                                        orderWide.setUser_age((int) age);
                                }
                        },
                        60,
                        TimeUnit.SECONDS);

                //Print test
//        orderWideWithUserDS.print("orderWideWithUserDS");

                //4.2 Associate the province (region) dimension
                SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDS = AsyncDataStream.unorderedWait(orderWideWithUserDS,
                        new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return orderWide.getProvince_id().toString();
                                }

                                @Override
                                public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
                                        orderWide.setProvince_name(dimInfo.getString("NAME"));
                                        orderWide.setProvince_area_code(dimInfo.getString("AREA_CODE"));
                                        orderWide.setProvince_iso_code(dimInfo.getString("ISO_CODE"));
                                        orderWide.setProvince_3166_2_code(dimInfo.getString("ISO_3166_2"));
                                }
                        }, 60, TimeUnit.SECONDS);

                //4.3 Associate the SKU dimension
                SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
                        orderWideWithProvinceDS, new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
                                @Override
                                public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                                        orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
                                        orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
                                        orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
                                        orderWide.setTm_id(jsonObject.getLong("TM_ID"));
                                }

                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return String.valueOf(orderWide.getSku_id());
                                }
                        }, 60, TimeUnit.SECONDS);

                //4.4 Associate the SPU dimension
                SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
                        orderWideWithSkuDS, new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
                                @Override
                                public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                                        orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
                                }

                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return String.valueOf(orderWide.getSpu_id());
                                }
                        }, 60, TimeUnit.SECONDS);

                //4.5 Associate the trademark (TM) dimension
                SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
                        orderWideWithSpuDS, new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
                                @Override
                                public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                                        orderWide.setTm_name(jsonObject.getString("TM_NAME"));
                                }

                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return String.valueOf(orderWide.getTm_id());
                                }
                        }, 60, TimeUnit.SECONDS);

                //4.6 Associate the category3 dimension
                SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
                        orderWideWithTmDS, new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
                                @Override
                                public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                                        orderWide.setCategory3_name(jsonObject.getString("NAME"));
                                }

                                @Override
                                public String getKey(OrderWide orderWide) {
                                        return String.valueOf(orderWide.getCategory3_id());
                                }
                        }, 60, TimeUnit.SECONDS);

                orderWideWithCategory3DS.print("orderWideWithCategory3DS>>>>>>>>>>>");

                //TODO 5. Write data to Kafka
                orderWideWithCategory3DS
                        .map(JSONObject::toJSONString)
                        .addSink(MyKafkaUtil.getKafkaProducer(orderWideSinkTopic));

                //TODO 6. Start task
                env.execute("OrderWideApp");
        }
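The dimension-association steps above rely on a DimAsyncFunction helper that is not shown in this section. A minimal sketch of what such a helper could look like, built on Flink's async I/O; the DimUtil.getDimInfo lookup helper and the fixed thread pool are assumptions for illustration, not the project's actual implementation:

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.text.ParseException;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

//Sketch: an abstract async function that looks up one dimension row per record on a
//side thread pool and merges it into the record before emitting it downstream.
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {

    private final String tableName;
    private transient ExecutorService executor;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    //Key used to query the dimension table for this record
    public abstract String getKey(T input);

    //Merge the queried dimension columns into the record
    public abstract void join(T input, JSONObject dimInfo) throws ParseException;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(8);
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) {
        executor.submit(() -> {
            try {
                //Hypothetical helper: query (and cache) the dimension row by table name and key
                JSONObject dimInfo = DimUtil.getDimInfo(tableName, getKey(input));
                if (dimInfo != null) {
                    join(input, dimInfo);
                }
                resultFuture.complete(Collections.singletonList(input));
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }

    @Override
    public void timeout(T input, ResultFuture<T> resultFuture) {
        //If the lookup exceeds the 60 s wait configured in the job, emit the record unchanged
        resultFuture.complete(Collections.singletonList(input));
    }
}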

Payment wide table design

The payment wide table joins dwd_payment_info with the dwm_order_wide topic produced above; the interval join allows a payment to arrive up to 15 minutes after its order is created.

        //TODO 1. Get execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 2. Read the Kafka topics, convert each record into a JavaBean, extract the timestamp and generate the watermark
        String groupId = "payment_wide_group";
        String paymentInfoSourceTopic = "dwd_payment_info";
        String orderWideSourceTopic = "dwm_order_wide";
        String paymentWideSinkTopic = "dwm_payment_wide";
        SingleOutputStreamOperator<OrderWide> orderWideDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderWideSourceTopic, groupId))
                .map(line -> JSON.parseObject(line, OrderWide.class))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<OrderWide>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<OrderWide>() {
                            @Override
                            public long extractTimestamp(OrderWide element, long recordTimestamp) {
                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                try {
                                    return sdf.parse(element.getCreate_time()).getTime();
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                    return recordTimestamp;
                                }
                            }
                        }));
        SingleOutputStreamOperator<PaymentInfo> paymentInfoDS = env.addSource(MyKafkaUtil.getKafkaConsumer(paymentInfoSourceTopic, groupId))
                .map(line -> JSON.parseObject(line, PaymentInfo.class))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<PaymentInfo>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<PaymentInfo>() {
                            @Override
                            public long extractTimestamp(PaymentInfo element, long recordTimestamp) {
                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                try {
                                    return sdf.parse(element.getCreate_time()).getTime();
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                    return recordTimestamp;
                                }
                            }
                        }));
                        
        //TODO 3. Double-stream JOIN (interval join; a payment may arrive up to 15 minutes after its order)
        SingleOutputStreamOperator<PaymentWide> paymentWideDS = paymentInfoDS.keyBy(PaymentInfo::getOrder_id)
                .intervalJoin(orderWideDS.keyBy(OrderWide::getOrder_id))
                .between(Time.minutes(-15), Time.seconds(5))
                .process(new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
                    @Override
                    public void processElement(PaymentInfo paymentInfo, OrderWide orderWide, Context ctx, Collector<PaymentWide> out) throws Exception {
                        out.collect(new PaymentWide(paymentInfo, orderWide));
                    }
                });

        //TODO 4. Write data to Kafka
        paymentWideDS.print(">>>>>>>>>");
        paymentWideDS
                .map(JSONObject::toJSONString)
                .addSink(MyKafkaUtil.getKafkaProducer(paymentWideSinkTopic));

        //TODO 5. Start task
        env.execute("PaymentWideApp");
    }

DWM layer summary

The DWM layer mainly converts one kind of detail data into another through computation, to support the statistics and calculations that follow:

  • Deduplication using keyed state (visitor UV)
  • Screening and judging sequences of events with CEP (bounce detail)
  • Joining two streams with intervalJoin (order and payment wide tables)
  • Dimension association, with performance optimized through caching and asynchronous queries (see the sketch below)
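On the last point, repeated Phoenix queries are expensive, so dimension rows are typically cached in front of HBase. A minimal sketch of such a cached lookup, assuming a Redis side cache; RedisUtil, PhoenixUtil, the key layout and the GMALL_REALTIME schema name are all assumptions for illustration:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;

public class DimUtil {

    private static final int TTL_SECONDS = 24 * 60 * 60;

    public static JSONObject getDimInfo(String tableName, String key) {
        //1. Try the Redis side cache first
        Jedis jedis = RedisUtil.getJedis(); //hypothetical pooled-connection helper
        String redisKey = "DIM:" + tableName + ":" + key;
        try {
            String cached = jedis.get(redisKey);
            if (cached != null) {
                jedis.expire(redisKey, TTL_SECONDS); //refresh the TTL on a hit
                return JSON.parseObject(cached);
            }
            //2. On a miss, query Phoenix and write the row back into the cache
            String sql = "select * from GMALL_REALTIME." + tableName + " where id='" + key + "'";
            JSONObject dimInfo = PhoenixUtil.queryOne(sql); //hypothetical query helper
            if (dimInfo != null) {
                jedis.setex(redisKey, TTL_SECONDS, dimInfo.toJSONString());
            }
            return dimInfo;
        } finally {
            jedis.close();
        }
    }
}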
