This article builds a quasi real-time data warehouse with Flink SQL and Hudi. After Flink ingests the data from Kafka, everything is stored in Hudi, including all intermediate processing results and the final data. The article "Real-time data warehouse | Exploration and practice of building a real-time data warehouse with SQL on Flink 1.11" (qq.com) describes a real-time data warehouse built on Flink SQL and Kafka, and this article builds on it.
You may find it useful to keep that article open for reference while working through this one.
Final result:





Background introduction
Using an e-commerce business as an example, this article walks through the data processing flow of a quasi real-time data warehouse.
Components and configuration
Flink 1.13.3
Flink CDC 2.0.2
Hudi 0.10.0 (the latest release at the time of writing, released 2021-12-08: https://github.com/apache/hudi/releases/tag/release-0.10.0)
Hadoop 3.2.0
Zeppelin 0.10.0
MySQL 5.7 (with binlog enabled)
Kafka 2.5.0
Because Zeppelin is convenient, this article submits all jobs through Zeppelin. If you are not familiar with Zeppelin, see https://lrting-top.blog.csdn.net/article/details/120681666. If you prefer not to use Zeppelin, you can submit the same statements with the Flink SQL Client instead.
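In this experiment, Flink checkpointing is enabled with a 60 s interval. This setting matters because the Hudi Flink writer commits data when a checkpoint completes, so newly written data becomes visible to readers only after a checkpoint.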
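Because commits follow checkpoints, the checkpoint interval roughly bounds how stale the Hudi tables can be. The Python sketch below models that bound under simplifying assumptions:

- checkpoints complete instantly at exact multiples of the interval;
- a record written exactly on a boundary waits for the following commit.

Real checkpoints take time and can be delayed, so treat this only as an illustration of the "quasi real-time" latency.

```python
CHECKPOINT_INTERVAL_S = 60  # checkpoint interval used in this experiment (seconds)


def visible_at(write_time_s: float, interval_s: int = CHECKPOINT_INTERVAL_S) -> int:
    """Time (s) at which a record written at write_time_s becomes readable.

    Assumes a commit happens exactly at every multiple of interval_s and that a
    record written exactly on a boundary is picked up by the following commit.
    """
    period = int(write_time_s // interval_s)  # checkpoint period the write falls into
    return (period + 1) * interval_s          # the commit that closes that period


if __name__ == "__main__":
    for t in (5, 59.5, 61):
        print(f"written at {t}s -> visible at {visible_at(t)}s")
```

Under these assumptions the delay for any record is greater than 0 s and at most one interval (60 s).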
Before starting the tasks below, make sure that you have:
- deployed Flink 1.13.3, correctly built the Hudi jar for Flink and placed it in Flink's lib directory, and placed the Flink CDC jar in the same lib directory;
- deployed and started Zeppelin 0.10.0, and set FLINK_HOME and HADOOP_CLASSPATH in Zeppelin's Flink interpreter settings;
- started Hadoop, MySQL, and Kafka.
Processing flow

MySQL table creation and raw data loading
First, download the MySQL table creation statements and the simulated data from the following address:
After downloading the script, log in to MySQL, create a new database named realtime_dw_demo_1, switch to it, and run the script to initialize the database:
mysql -u root -p
create database realtime_dw_demo_1;
use realtime_dw_demo_1;
source realtime_table.sql
Synchronize MySQL table data to Kafka
Use Flink CDC to synchronize the MySQL data to Kafka. The related SQL statements are as follows.
Read the MySQL source tables
%flink.ssql
drop table if exists base_category1;
drop table if exists base_category2;
drop table if exists base_category3;
drop table if exists base_province;
drop table if exists base_region;
drop table if exists base_trademark;
drop table if exists date_info;
drop table if exists holiday_info;
drop table if exists holiday_year;
drop table if exists order_detail;
drop table if exists order_info;
drop table if exists order_status_log;
drop table if exists payment_info;
drop table if exists sku_info;
drop table if exists user_info;

-- MySQL CDC source tables
CREATE TABLE `base_category1` (
  `id` bigint NOT NULL,
  `name` string NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category1'
);

CREATE TABLE `base_category2` (
  `id` bigint NOT NULL COMMENT 'id',
  `name` string NOT NULL COMMENT 'Level-2 category name',
  `category1_id` bigint NULL COMMENT 'Level-1 category id',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category2'
);

CREATE TABLE `base_category3` (
  `id` bigint NOT NULL COMMENT 'id',
  `name` string NOT NULL COMMENT 'Level-3 category name',
  `category2_id` bigint NULL COMMENT 'Level-2 category id',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category3'
);

CREATE TABLE `base_province` (
  `id` int NULL COMMENT 'id',
  `name` string NULL COMMENT 'Province name',
  `region_id` int NULL COMMENT 'Region id',
  `area_code` string NULL COMMENT 'Administrative area code',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_province'
);

CREATE TABLE `base_region` (
  `id` int NOT NULL COMMENT 'Region id',
  `region_name` string NULL COMMENT 'Region name',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_region'
);

CREATE TABLE `base_trademark` (
  `tm_id` string NULL COMMENT 'Brand id',
  `tm_name` string NULL COMMENT 'Brand name',
  PRIMARY KEY (`tm_id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_trademark'
);

CREATE TABLE `date_info` (
  `date_id` int NOT NULL,
  `week_id` int NULL,
  `week_day` int NULL,
  `day` int NULL,
  `month` int NULL,
  `quarter` int NULL,
  `year` int NULL,
  `is_workday` int NULL,
  `holiday_id` int NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'date_info'
);

CREATE TABLE `holiday_info` (
  `holiday_id` int NOT NULL,
  `holiday_name` string NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_info'
);

CREATE TABLE `holiday_year` (
  `year_id` int NULL,
  `holiday_id` int NULL,
  `start_date_id` int NULL,
  `end_date_id` int NULL,
  PRIMARY KEY (`end_date_id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_year'
);

CREATE TABLE `order_detail` (
  `id` bigint NOT NULL COMMENT 'id',
  `order_id` bigint NULL COMMENT 'Order id',
  `sku_id` bigint NULL COMMENT 'SKU id',
  `sku_name` string NULL COMMENT 'SKU name (redundant)',
  `img_url` string NULL COMMENT 'Image name (redundant)',
  `order_price` decimal(10,2) NULL COMMENT 'Purchase price (SKU price when the order was placed)',
  `sku_num` string NULL COMMENT 'Quantity purchased',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_detail'
);

CREATE TABLE `order_info` (
  `id` bigint NOT NULL COMMENT 'id',
  `consignee` string NULL COMMENT 'Consignee',
  `consignee_tel` string NULL COMMENT 'Consignee phone',
  `total_amount` decimal(10,2) NULL COMMENT 'Total amount',
  `order_status` string NULL COMMENT 'Order status',
  `user_id` bigint NULL COMMENT 'User id',
  `payment_way` string NULL COMMENT 'Payment method',
  `delivery_address` string NULL COMMENT 'Shipping address',
  `order_comment` string NULL COMMENT 'Order remarks',
  `out_trade_no` string NULL COMMENT 'Order transaction number (for third-party payment)',
  `trade_body` string NULL COMMENT 'Order description (for third-party payment)',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  `operate_time` timestamp(3) NULL COMMENT 'Operation time',
  `expire_time` timestamp(3) NULL COMMENT 'Expiration time',
  `tracking_no` string NULL COMMENT 'Tracking number',
  `parent_order_id` bigint NULL COMMENT 'Parent order id',
  `img_url` string NULL COMMENT 'Image path',
  `province_id` int NULL COMMENT 'Region',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_info'
);

CREATE TABLE `order_status_log` (
  `id` int NOT NULL,
  `order_id` int NULL,
  `order_status` int NULL,
  `operate_time` timestamp(3) NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_status_log'
);

CREATE TABLE `payment_info` (
  `id` bigint NOT NULL COMMENT 'id',
  `out_trade_no` string NULL COMMENT 'External trade number',
  `order_id` string NULL COMMENT 'Order id',
  `user_id` string NULL COMMENT 'User id',
  `alipay_trade_no` string NULL COMMENT 'Alipay transaction number',
  `total_amount` decimal(16,2) NULL COMMENT 'Payment amount',
  `subject` string NULL COMMENT 'Transaction content',
  `payment_type` string NULL COMMENT 'Payment method',
  `payment_time` string NULL COMMENT 'Payment time',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'payment_info'
);

CREATE TABLE `sku_info` (
  `id` bigint NOT NULL COMMENT 'SKU id (item id)',
  `spu_id` bigint NULL COMMENT 'SPU id',
  `price` decimal(10,0) NULL COMMENT 'Price',
  `sku_name` string NULL COMMENT 'SKU name',
  `sku_desc` string NULL COMMENT 'Product specification description',
  `weight` decimal(10,2) NULL COMMENT 'Weight',
  `tm_id` bigint NULL COMMENT 'Brand (redundant)',
  `category3_id` bigint NULL COMMENT 'Level-3 category id (redundant)',
  `sku_default_img` string NULL COMMENT 'Default display image (redundant)',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'sku_info'
);

CREATE TABLE `user_info` (
  `id` bigint NOT NULL COMMENT 'id',
  `login_name` string NULL COMMENT 'Login name',
  `nick_name` string NULL COMMENT 'User nickname',
  `passwd` string NULL COMMENT 'User password',
  `name` string NULL COMMENT 'User name',
  `phone_num` string NULL COMMENT 'Mobile phone number',
  `email` string NULL COMMENT 'Email',
  `head_img` string NULL COMMENT 'Avatar',
  `user_level` string NULL COMMENT 'User level',
  `birthday` date NULL COMMENT 'User birthday',
  `gender` string NULL COMMENT 'Gender: M male, F female',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'user_info'
);
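The mysql-cdc source first reads a snapshot of each table and then follows the binlog. It turns each Debezium change event into Flink changelog rows:

- snapshot reads and inserts become +I;
- an update becomes -U followed by +U;
- a delete becomes -D.

The Python sketch below models that mapping. It illustrates the semantics only; it is not connector code, and the sample base_region rows are made up.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]
Change = Tuple[str, Row]  # (row kind, row), e.g. ("+I", {"id": 1})


def to_changelog(event: Dict[str, Any]) -> List[Change]:
    """Map one Debezium-style change event to Flink changelog rows."""
    op = event["op"]
    before: Optional[Row] = event.get("before")
    after: Optional[Row] = event.get("after")
    if op in ("r", "c"):  # snapshot read or insert
        return [("+I", after)]
    if op == "u":  # update: retract the old row, then emit the new one
        return [("-U", before), ("+U", after)]
    if op == "d":  # delete: emit the old row as a deletion
        return [("-D", before)]
    raise ValueError(f"unknown Debezium op: {op!r}")


if __name__ == "__main__":
    events = [
        {"op": "r", "before": None, "after": {"id": 1, "region_name": "East"}},
        {"op": "u", "before": {"id": 1, "region_name": "East"},
         "after": {"id": 1, "region_name": "East China"}},
        {"op": "d", "before": {"id": 1, "region_name": "East China"}, "after": None},
    ]
    for e in events:
        print(to_changelog(e))
```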
Create the Kafka sink tables
%flink.ssql
drop table if exists base_category1_topic;
drop table if exists base_category2_topic;
drop table if exists base_category3_topic;
drop table if exists base_province_topic;
drop table if exists base_region_topic;
drop table if exists base_trademark_topic;
drop table if exists date_info_topic;
drop table if exists holiday_info_topic;
drop table if exists holiday_year_topic;
drop table if exists order_detail_topic;
drop table if exists order_info_topic;
drop table if exists order_status_log_topic;
drop table if exists payment_info_topic;
drop table if exists sku_info_topic;
drop table if exists user_info_topic;

CREATE TABLE `base_category1_topic` (
  `id` bigint NOT NULL COMMENT 'id',
  `name` string NOT NULL COMMENT 'Category name',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_category1',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `base_category2_topic` (
  `id` bigint NOT NULL COMMENT 'id',
  `name` string NOT NULL COMMENT 'Level-2 category name',
  `category1_id` bigint NULL COMMENT 'Level-1 category id',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_category2',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `base_category3_topic` (
  `id` bigint NOT NULL COMMENT 'id',
  `name` string NOT NULL COMMENT 'Level-3 category name',
  `category2_id` bigint NULL COMMENT 'Level-2 category id',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_category3',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `base_province_topic` (
  `id` int NULL COMMENT 'id',
  `name` string NULL COMMENT 'Province name',
  `region_id` int NULL COMMENT 'Region id',
  `area_code` string NULL COMMENT 'Administrative area code'
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_province',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `base_region_topic` (
  `id` int NOT NULL COMMENT 'Region id',
  `region_name` string NULL COMMENT 'Region name',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_region',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `base_trademark_topic` (
  `tm_id` string NULL COMMENT 'Brand id',
  `tm_name` string NULL COMMENT 'Brand name',
  PRIMARY KEY (`tm_id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_trademark',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `date_info_topic` (
  `date_id` int NOT NULL,
  `week_id` int NULL,
  `week_day` int NULL,
  `day` int NULL,
  `month` int NULL,
  `quarter` int NULL,
  `year` int NULL,
  `is_workday` int NULL,
  `holiday_id` int NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.date_info',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `holiday_info_topic` (
  `holiday_id` int NOT NULL,
  `holiday_name` string NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.holiday_info',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `holiday_year_topic` (
  `year_id` int NULL,
  `holiday_id` int NULL,
  `start_date_id` int NULL,
  `end_date_id` int NULL
) with (
  'connector' = 'kafka',
  'topic' = 'my5.holiday_year',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `order_detail_topic` (
  `id` bigint NOT NULL COMMENT 'id',
  `order_id` bigint NULL COMMENT 'Order id',
  `sku_id` bigint NULL COMMENT 'SKU id',
  `sku_name` string NULL COMMENT 'SKU name (redundant)',
  `img_url` string NULL COMMENT 'Image name (redundant)',
  `order_price` decimal(10,2) NULL COMMENT 'Purchase price (SKU price when the order was placed)',
  `sku_num` string NULL COMMENT 'Quantity purchased',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.order_detail',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `order_info_topic` (
  `id` bigint NOT NULL COMMENT 'id',
  `consignee` string NULL COMMENT 'Consignee',
  `consignee_tel` string NULL COMMENT 'Consignee phone',
  `total_amount` decimal(10,2) NULL COMMENT 'Total amount',
  `order_status` string NULL COMMENT 'Order status',
  `user_id` bigint NULL COMMENT 'User id',
  `payment_way` string NULL COMMENT 'Payment method',
  `delivery_address` string NULL COMMENT 'Shipping address',
  `order_comment` string NULL COMMENT 'Order remarks',
  `out_trade_no` string NULL COMMENT 'Order transaction number (for third-party payment)',
  `trade_body` string NULL COMMENT 'Order description (for third-party payment)',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  `operate_time` timestamp(3) NULL COMMENT 'Operation time',
  `expire_time` timestamp(3) NULL COMMENT 'Expiration time',
  `tracking_no` string NULL COMMENT 'Tracking number',
  `parent_order_id` bigint NULL COMMENT 'Parent order id',
  `img_url` string NULL COMMENT 'Image path',
  `province_id` int NULL COMMENT 'Region',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.order_info',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `order_status_log_topic` (
  `id` int NOT NULL,
  `order_id` int NULL,
  `order_status` int NULL,
  `operate_time` timestamp(3) NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.order_status_log',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `payment_info_topic` (
  `id` bigint NOT NULL COMMENT 'id',
  `out_trade_no` string NULL COMMENT 'External trade number',
  `order_id` string NULL COMMENT 'Order id',
  `user_id` string NULL COMMENT 'User id',
  `alipay_trade_no` string NULL COMMENT 'Alipay transaction number',
  `total_amount` decimal(16,2) NULL COMMENT 'Payment amount',
  `subject` string NULL COMMENT 'Transaction content',
  `payment_type` string NULL COMMENT 'Payment method',
  `payment_time` string NULL COMMENT 'Payment time',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.payment_info',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `sku_info_topic` (
  `id` bigint NOT NULL COMMENT 'SKU id (item id)',
  `spu_id` bigint NULL COMMENT 'SPU id',
  `price` decimal(10,0) NULL COMMENT 'Price',
  `sku_name` string NULL COMMENT 'SKU name',
  `sku_desc` string NULL COMMENT 'Product specification description',
  `weight` decimal(10,2) NULL COMMENT 'Weight',
  `tm_id` bigint NULL COMMENT 'Brand (redundant)',
  `category3_id` bigint NULL COMMENT 'Level-3 category id (redundant)',
  `sku_default_img` string NULL COMMENT 'Default display image (redundant)',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.sku_info',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);

CREATE TABLE `user_info_topic` (
  `id` bigint NOT NULL COMMENT 'id',
  `login_name` string NULL COMMENT 'Login name',
  `nick_name` string NULL COMMENT 'User nickname',
  `passwd` string NULL COMMENT 'User password',
  `name` string NULL COMMENT 'User name',
  `phone_num` string NULL COMMENT 'Mobile phone number',
  `email` string NULL COMMENT 'Email',
  `head_img` string NULL COMMENT 'Avatar',
  `user_level` string NULL COMMENT 'User level',
  `birthday` date NULL COMMENT 'User birthday',
  `gender` varchar(1) NULL COMMENT 'Gender: M male, F female',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.user_info',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json'
);
Insert statements that write the MySQL binlog data into the corresponding Kafka topics
%flink.ssql(runAsOne=true)
insert into base_category1_topic select * from base_category1;
insert into base_category2_topic select * from base_category2;
insert into base_category3_topic select * from base_category3;
insert into base_province_topic select * from base_province;
insert into base_region_topic select * from base_region;
insert into base_trademark_topic select * from base_trademark;
insert into date_info_topic select * from date_info;
insert into holiday_info_topic select * from holiday_info;
insert into holiday_year_topic select * from holiday_year;
insert into order_detail_topic select * from order_detail;
insert into order_info_topic select * from order_info;
insert into order_status_log_topic select * from order_status_log;
insert into payment_info_topic select * from payment_info;
insert into sku_info_topic select * from sku_info;
insert into user_info_topic select * from user_info;
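The sink tables above use 'format' = 'debezium-json'. As we understand it, Flink's debezium-json writer emits only "c" and "d" messages:

- inserts and update-afters are written as "c";
- update-befores and deletes are written as "d".

A MySQL update therefore reaches readers of these topics as a delete followed by an insert. This is harmless for the primary-keyed Hudi tables further down. The Python sketch below models that round trip. It is not Flink code, and the base_region row is hypothetical.

```python
import json
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]
Change = Tuple[str, Row]  # (row kind, row)


def encode(change: Change) -> str:
    """Write one changelog row as a debezium-json message (modeled on Flink's writer)."""
    kind, row = change
    if kind in ("+I", "+U"):
        msg = {"before": None, "after": row, "op": "c"}
    elif kind in ("-U", "-D"):
        msg = {"before": row, "after": None, "op": "d"}
    else:
        raise ValueError(f"unknown row kind: {kind!r}")
    return json.dumps(msg)


def decode(message: str) -> List[Change]:
    """Read a debezium-json message back into changelog rows."""
    msg = json.loads(message)
    op = msg["op"]
    if op in ("c", "r"):
        return [("+I", msg["after"])]
    if op == "u":
        return [("-U", msg["before"]), ("+U", msg["after"])]
    if op == "d":
        return [("-D", msg["before"])]
    raise ValueError(f"unknown op: {op!r}")


def round_trip(changes: List[Change]) -> List[Change]:
    """Encode each change into a Kafka message and decode it as a downstream reader would."""
    out: List[Change] = []
    for change in changes:
        out.extend(decode(encode(change)))
    return out


if __name__ == "__main__":
    update = [("-U", {"id": 1, "region_name": "East"}),
              ("+U", {"id": 1, "region_name": "East China"})]
    print(round_trip(update))
```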
Import dimension table data into Hudi
Write the two region dimension tables, my5.base_province and my5.base_region, into Hudi COW (copy-on-write) tables:
%flink.ssql
drop table if exists base_province_topic_source;
drop table if exists base_province_hudi;

CREATE TABLE `base_province_topic_source` (
  `id` int NULL COMMENT 'id',
  `name` string NULL COMMENT 'Province name',
  `region_id` int NULL COMMENT 'Region id',
  `area_code` string NULL COMMENT 'Administrative area code'
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_province',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_province_hudi` (
  `id` int NULL COMMENT 'id',
  `name` string NULL COMMENT 'Province name',
  `region_id` int NULL COMMENT 'Region id',
  `area_code` string NULL COMMENT 'Administrative area code',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_province_hudi select * from base_province_topic_source;
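base_province_hudi declares a primary key, so the Hudi table keeps one row per id, and later changes overwrite or delete earlier ones. The Python sketch below models that upsert behavior for a stream of changelog rows. It ignores Hudi's file layout entirely and rests on one assumption: the later version of a key always wins. With 'write.precombine.field' = 'id', both versions of a row carry the same precombine value, so we assume arrival order decides. The province rows are made up.

```python
from typing import Any, Dict, Iterable, Tuple

Row = Dict[str, Any]
Change = Tuple[str, Row]  # (row kind, row)


def apply_changes(table: Dict[Any, Row], changes: Iterable[Change], key: str = "id") -> Dict[Any, Row]:
    """Apply changelog rows to a key -> row snapshot of a primary-keyed table.

    Assumption: the later version of a key always wins (precombine ties are
    resolved by arrival order). The input snapshot is not modified.
    """
    result = dict(table)
    for kind, row in changes:
        k = row[key]
        if kind in ("+I", "+U"):
            result[k] = row          # upsert: replace any existing version of the key
        elif kind in ("-U", "-D"):
            result.pop(k, None)      # retraction or delete: drop the key if present
        else:
            raise ValueError(f"unknown row kind: {kind!r}")
    return result


if __name__ == "__main__":
    changes = [
        ("+I", {"id": 1, "name": "Beijing", "region_id": 1}),
        ("+I", {"id": 2, "name": "Tianjin", "region_id": 1}),
        ("-D", {"id": 1, "name": "Beijing", "region_id": 1}),
        ("+I", {"id": 1, "name": "Beijing", "region_id": 2}),
    ]
    print(apply_changes({}, changes))
```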
%flink.ssql
drop table if exists base_region_topic_source;
drop table if exists base_region_hudi;

CREATE TABLE `base_region_topic_source` (
  `id` int NOT NULL COMMENT 'Region id',
  `region_name` string NULL COMMENT 'Region name',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_region',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_region_hudi` (
  `id` int NOT NULL COMMENT 'Region id',
  `region_name` string NULL COMMENT 'Region name',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_region_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_region_hudi select * from base_region_topic_source;
Join the two dimension tables above to create the dim_province_hudi table:
%flink.ssql
DROP TABLE IF EXISTS dim_province_hudi;

create table dim_province_hudi (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING,
  PRIMARY KEY (province_id) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/dim_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'province_id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into dim_province_hudi
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  bp.area_code AS area_code,
  br.id AS region_id,
  br.region_name AS region_name
FROM base_region_hudi br
JOIN base_province_hudi bp ON br.id = bp.region_id;
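The insert above is a regular inner join on base_province.region_id = base_region.id. The Python sketch below computes the same result for one static snapshot of both tables. The streaming job also re-emits rows whenever either side changes, which the sketch does not model. The sample values are hypothetical.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def build_dim_province(provinces: List[Row], regions: List[Row]) -> List[Row]:
    """Inner-join provinces with regions on base_province.region_id = base_region.id."""
    regions_by_id = {r["id"]: r for r in regions}
    out: List[Row] = []
    for bp in provinces:
        br = regions_by_id.get(bp["region_id"])
        if br is None:
            continue  # inner join: a province without a matching region is dropped
        out.append({
            "province_id": bp["id"],
            "province_name": bp["name"],
            "area_code": bp["area_code"],
            "region_id": br["id"],
            "region_name": br["region_name"],
        })
    return out


if __name__ == "__main__":
    provinces = [{"id": 1, "name": "Beijing", "region_id": 1, "area_code": "110000"}]
    regions = [{"id": 1, "region_name": "North China"}]
    print(build_dim_province(provinces, regions))
```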
Next, write the three item category dimension tables, my5.base_category1, my5.base_category2, and my5.base_category3, into Hudi COW tables:
%flink.ssql
drop table if exists base_category1_topic_source;
drop table if exists base_category1_hudi;

CREATE TABLE `base_category1_topic_source` (
  `id` bigint NOT NULL COMMENT 'id',
  `name` string NOT NULL COMMENT 'Category name',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_category1',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category1_hudi` (
  `id` bigint NOT NULL COMMENT 'id',
  `name` string NOT NULL COMMENT 'Category name',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category1_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category1_hudi select * from base_category1_topic_source;
%flink.ssql
drop table if exists base_category2_topic_source;
drop table if exists base_category2_hudi;

CREATE TABLE `base_category2_topic_source` (
  `id` bigint NOT NULL COMMENT 'id',
  `name` string NOT NULL COMMENT 'Category name',
  `category1_id` bigint NULL COMMENT 'Level-1 category id',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_category2',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category2_hudi` (
  `id` bigint NOT NULL COMMENT 'id',
  `name` string NOT NULL COMMENT 'Category name',
  `category1_id` bigint NULL COMMENT 'Level-1 category id',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category2_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category2_hudi select * from base_category2_topic_source;
%flink.ssql
drop table if exists base_category3_topic_source;
drop table if exists base_category3_hudi;

CREATE TABLE `base_category3_topic_source` (
  `id` bigint NOT NULL COMMENT 'id',
  `name` string NOT NULL COMMENT 'Category name',
  `category2_id` bigint NULL COMMENT 'Level-2 category id',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.base_category3',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category3_hudi` (
  `id` bigint NOT NULL COMMENT 'id',
  `name` string NOT NULL COMMENT 'Category name',
  `category2_id` bigint NULL COMMENT 'Level-2 category id',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category3_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category3_hudi select * from base_category3_topic_source;
Import the item (SKU) table into Hudi
%flink.ssql
drop table if exists sku_info_topic_source;
drop table if exists sku_info_topic_hudi;

CREATE TABLE `sku_info_topic_source` (
  `id` bigint NOT NULL COMMENT 'SKU id (item id)',
  `spu_id` bigint NULL COMMENT 'SPU id',
  `price` decimal(10,0) NULL COMMENT 'Price',
  `sku_name` string NULL COMMENT 'SKU name',
  `sku_desc` string NULL COMMENT 'Product specification description',
  `weight` decimal(10,2) NULL COMMENT 'Weight',
  `tm_id` bigint NULL COMMENT 'Brand (redundant)',
  `category3_id` bigint NULL COMMENT 'Level-3 category id (redundant)',
  `sku_default_img` string NULL COMMENT 'Default display image (redundant)',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.sku_info',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `sku_info_topic_hudi` (
  `id` bigint NOT NULL COMMENT 'SKU id (item id)',
  `spu_id` bigint NULL COMMENT 'SPU id',
  `price` decimal(10,0) NULL COMMENT 'Price',
  `sku_name` string NULL COMMENT 'SKU name',
  `sku_desc` string NULL COMMENT 'Product specification description',
  `weight` decimal(10,2) NULL COMMENT 'Weight',
  `tm_id` bigint NULL COMMENT 'Brand (redundant)',
  `category3_id` bigint NULL COMMENT 'Level-3 category id (redundant)',
  `sku_default_img` string NULL COMMENT 'Default display image (redundant)',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/sku_info_topic_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into sku_info_topic_hudi select * from sku_info_topic_source;
The steps above synchronize the base data of the item dimension tables into Hudi. Next, join these tables to create the dim_sku_info view:
%flink.ssql
drop view if exists dim_sku_info;

CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c1.id AS c1_id,
  c1.name AS c1_name
FROM sku_info_topic_hudi si
JOIN base_category3_hudi c3 ON si.category3_id = c3.id
JOIN base_category2_hudi c2 ON c3.category2_id = c2.id
JOIN base_category1_hudi c1 ON c2.category1_id = c1.id;
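The view walks the category hierarchy from each SKU to its level-3, level-2, and level-1 category. The Python sketch below performs the same chain of inner joins over static lists and keeps only a subset of the view's columns. It takes c1_id and c1_name from the level-1 category, which is what those column names imply. All sample rows are made up.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def build_dim_sku_info(skus: List[Row], c1: List[Row], c2: List[Row], c3: List[Row]) -> List[Row]:
    """Join each SKU to its level-3, level-2 and level-1 categories (inner joins)."""
    c1_by_id = {r["id"]: r for r in c1}
    c2_by_id = {r["id"]: r for r in c2}
    c3_by_id = {r["id"]: r for r in c3}
    out: List[Row] = []
    for si in skus:
        cat3 = c3_by_id.get(si["category3_id"])
        cat2 = c2_by_id.get(cat3["category2_id"]) if cat3 else None
        cat1 = c1_by_id.get(cat2["category1_id"]) if cat2 else None
        if cat1 is None:
            continue  # inner joins: drop SKUs whose category chain is incomplete
        out.append({
            "id": si["id"], "sku_name": si["sku_name"],
            "c3_id": cat3["id"], "c3_name": cat3["name"],
            "c2_id": cat2["id"], "c2_name": cat2["name"],
            "c1_id": cat1["id"], "c1_name": cat1["name"],  # level-1 values come from c1
        })
    return out


if __name__ == "__main__":
    print(build_dim_sku_info(
        skus=[{"id": 100, "sku_name": "Phone", "category3_id": 30}],
        c1=[{"id": 10, "name": "Electronics"}],
        c2=[{"id": 20, "name": "Mobile", "category1_id": 10}],
        c3=[{"id": 30, "name": "Smartphones", "category2_id": 20}],
    ))
```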
DWD layer data processing
With the dimension tables ready, the next step processes the raw ODS data into a DWD-layer detail table.
%flink.ssql
drop table if exists ods_order_detail_topic;
drop table if exists ods_order_info_topic;
drop table if exists dwd_paid_order_detail_hudi;

CREATE TABLE `ods_order_detail_topic` (
  `id` bigint NOT NULL COMMENT 'ID',
  `order_id` bigint NULL COMMENT 'Order ID',
  `sku_id` bigint NULL COMMENT 'sku_id',
  `sku_name` string NULL COMMENT 'SKU name (redundant)',
  `img_url` string NULL COMMENT 'Image name (redundant)',
  `order_price` decimal(10,2) NULL COMMENT 'Purchase price (SKU price when the order was placed)',
  `sku_num` int NULL COMMENT 'Quantity purchased',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.order_detail',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `ods_order_info_topic` (
  `id` bigint NOT NULL COMMENT 'ID',
  `consignee` string NULL COMMENT 'Consignee',
  `consignee_tel` string NULL COMMENT 'Consignee phone',
  `total_amount` decimal(10,2) NULL COMMENT 'Total amount',
  `order_status` string NULL COMMENT 'Order status',
  `user_id` bigint NULL COMMENT 'User ID',
  `payment_way` string NULL COMMENT 'Payment method',
  `delivery_address` string NULL COMMENT 'Shipping address',
  `order_comment` string NULL COMMENT 'Order remarks',
  `out_trade_no` string NULL COMMENT 'Order transaction number (for third-party payment)',
  `trade_body` string NULL COMMENT 'Order description (for third-party payment)',
  `create_time` timestamp(3) NULL COMMENT 'Creation time',
  `operate_time` timestamp(3) NULL COMMENT 'Operation time',
  `expire_time` timestamp(3) NULL COMMENT 'Expiration time',
  `tracking_no` string NULL COMMENT 'Logistics tracking number',
  `parent_order_id` bigint NULL COMMENT 'Parent order ID',
  `img_url` string NULL COMMENT 'Image path',
  `province_id` int NULL COMMENT 'Province (region) ID',
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = 'my5.order_info',
  'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'hudiGroup'
);

CREATE TABLE dwd_paid_order_detail_hudi (
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time TIMESTAMP(3),
  pay_time TIMESTAMP(3),
  PRIMARY KEY (detail_id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/dwd_paid_order_detail_hudi',
  'scan.startup.mode' = 'earliest-offset',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'false',
  'read.streaming.enabled' = 'true'
);
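Before relying on the `order_status = '2'` filter used for the DWD layer, it can help to check which status codes actually arrive from the Debezium changelog. The query below is a minimal sketch that counts the current orders per status in `ods_order_info_topic`; it assumes only the table defined above, and `order_cnt` is an illustrative alias.

```sql
-- Sanity check (illustrative): how many orders currently hold each status.
-- The source is a debezium-json changelog, so when an order's status changes,
-- the old status count is retracted and the new one is incremented.
SELECT order_status, COUNT(*) AS order_cnt
FROM ods_order_info_topic
GROUP BY order_status;
```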
%flink.ssql
insert into dwd_paid_order_detail_hudi
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM (
  SELECT * FROM ods_order_info_topic
  WHERE order_status = '2' -- Paid
) oi
JOIN (
  SELECT * FROM ods_order_detail_topic
) od
ON oi.id = od.order_id;
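To confirm that the join wrote the order lines of each paid order into Hudi, the detail table can be read back and rolled up per order. This is a hedged sketch; `detail_rows` is an illustrative alias. Counting `DISTINCT detail_id` rather than `COUNT(*)` keeps the check stable if the same order line is written more than once. Because the table sets `'read.streaming.enabled' = 'true'`, the query runs continuously and its result changes as new Hudi commits arrive.

```sql
-- Number of distinct order lines per paid order in the DWD table
SELECT
  order_id,
  COUNT(DISTINCT detail_id) AS detail_rows,
  MAX(pay_time)             AS pay_time
FROM dwd_paid_order_detail_hudi
GROUP BY order_id;
```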
ADS layer data
After the above steps, we have created the DWD detail table dwd_paid_order_detail_hudi and stored it in Hudi. Next, we join this wide detail table with the dimension tables to produce the ADS (application) layer data.
ads_province_index_hudi
%flink.ssql
drop table if exists ads_province_index_hudi;
drop table if exists tmp_province_index_hudi;

-- ---------------------------------
-- Create the ADS-layer table in Hudi with DDL
-- Metrics: 1. Number of orders per province per day
--          2. Order amount per province per day
-- ---------------------------------
CREATE TABLE ads_province_index_hudi (
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/ads_province_index_hudi',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'false',
  'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_province_index
-- Temporary table for the order summary
-- (keyed by province and day, matching its GROUP BY)
-- ---------------------------------
CREATE TABLE tmp_province_index_hudi (
  province_id INT,
  order_count BIGINT,          -- Number of orders
  order_amount DECIMAL(10,2),  -- Order amount
  pay_date DATE,
  PRIMARY KEY (province_id, pay_date) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_province_index_hudi',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'false',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- ---------------------------------
-- tmp_province_index
-- Load the order summary temporary table
-- ---------------------------------
INSERT INTO tmp_province_index_hudi
SELECT
  province_id,
  count(distinct order_id) order_count,      -- Number of orders
  sum(order_price * sku_num) order_amount,  -- Order amount
  TO_DATE(DATE_FORMAT(pay_time, 'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY province_id, TO_DATE(DATE_FORMAT(pay_time, 'yyyy-MM-dd'));
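The aggregation uses `count(distinct order_id)` rather than `count(*)` because the DWD table holds one row per order line, so an order with several SKUs appears several times. The self-contained query below uses made-up inline rows (not data from this demo) to show the arithmetic: order 1001 has two lines and order 1002 has one, so the final row for province 1 is `order_count` = 2, `line_count` = 3 and `order_amount` = 10.00 × 2 + 5.50 × 1 + 3.00 × 4 = 37.50. Run in a streaming `%flink.ssql` paragraph, the result may show intermediate updates before settling on that row.

```sql
-- Illustrative rows only: (province_id, order_id, order_price, sku_num)
SELECT
  province_id,
  COUNT(DISTINCT order_id)   AS order_count,   -- order 1001 is counted once
  COUNT(*)                   AS line_count,    -- one per order line
  SUM(order_price * sku_num) AS order_amount   -- 20.00 + 5.50 + 12.00
FROM (
  VALUES
    (1, 1001, CAST(10.00 AS DECIMAL(10,2)), 2),
    (1, 1001, CAST(5.50 AS DECIMAL(10,2)), 1),
    (1, 1002, CAST(3.00 AS DECIMAL(10,2)), 4)
) AS t(province_id, order_id, order_price, sku_num)
GROUP BY province_id;
```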
%flink.ssql
INSERT INTO ads_province_index_hudi
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM tmp_province_index_hudi pc
JOIN dim_province_hudi as dp
ON dp.province_id = pc.province_id;
View the data in the ADS-layer table ads_province_index_hudi:
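A query of the same form as the one used for ads_sku_index_hudi returns these rows; the column list below is just a selection from the table's schema and can be replaced with `*`. Since the table is read as a stream (`'read.streaming.enabled' = 'true'`), the result keeps updating while the upstream jobs run.

```sql
-- Run in a %flink.ssql paragraph
SELECT province_id, province_name, region_name, dt, order_count, order_amount
FROM ads_province_index_hudi;
```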

ads_sku_index_hudi
%flink.ssql
-- ---------------------------------
-- Create the ADS-layer table in Hudi with DDL
-- Metrics: 1. Number of orders per SKU per day
--          2. Order amount per SKU per day
--          3. Quantity sold per SKU per day
-- ---------------------------------
drop table if exists ads_sku_index_hudi;
CREATE TABLE ads_sku_index_hudi (
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt VARCHAR,
  PRIMARY KEY (sku_id, dt) NOT ENFORCED
) with (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/ads_sku_index_hudi',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'false',
  'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_sku_index
-- SKU metric statistics
-- (keyed by SKU and day, matching its GROUP BY)
-- ---------------------------------
drop table if exists tmp_sku_index_hudi;
CREATE TABLE tmp_sku_index_hudi (
  sku_id BIGINT,
  order_count BIGINT,          -- Number of orders
  order_amount DECIMAL(10,2),  -- Order amount
  order_sku_num BIGINT,
  pay_date DATE,
  PRIMARY KEY (sku_id, pay_date) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_sku_index_hudi',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'false',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
-- ---------------------------------
-- tmp_sku_index
-- Data loading
-- ---------------------------------
INSERT INTO tmp_sku_index_hudi
SELECT
  sku_id,
  count(distinct order_id) order_count,      -- Number of orders
  sum(order_price * sku_num) order_amount,  -- Order amount
  sum(sku_num) order_sku_num,
  TO_DATE(DATE_FORMAT(pay_time, 'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY sku_id, TO_DATE(DATE_FORMAT(pay_time, 'yyyy-MM-dd'));
%flink.ssql
INSERT INTO ads_sku_index_hudi
SELECT
  sku_id,
  sku_name,
  weight,
  tm_id,
  price,
  spu_id,
  c3_id,
  c3_name,
  c2_id,
  c2_name,
  c1_id,
  c1_name,
  sc.order_amount,
  sc.order_count,
  sc.order_sku_num,
  cast(sc.pay_date as VARCHAR)
FROM tmp_sku_index_hudi sc
JOIN dim_sku_info ds
ON ds.id = sc.sku_id;
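Because ads_sku_index_hudi is filled through an inner JOIN, any `sku_id` that has no row in `dim_sku_info` is silently left out of the ADS output. A quick anti-join like the sketch below lists such SKUs; it reuses only the table and column names from this step. With a streaming regular LEFT JOIN, a row listed here is retracted again once the matching dimension row arrives.

```sql
-- Aggregated SKUs that currently have no match in the SKU dimension table
SELECT sc.sku_id, sc.pay_date, sc.order_count
FROM tmp_sku_index_hudi sc
LEFT JOIN dim_sku_info ds ON ds.id = sc.sku_id
WHERE ds.id IS NULL;
```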
%flink.ssql
select * from ads_sku_index_hudi;

This is an original article by "xiaozhch5", the blogger of "From Big Data to Artificial Intelligence", published under the CC 4.0 BY-SA license. Please include the original source link and this statement when reposting.
Original link: https://lrting.top/uncategorized/2744/