Big Data Flink E-commerce Real-time Data Warehouse Hands-on Project (Final Chapter): A Simple Application of Flink SQL

Flink SQL Introduction

In actual development I personally focus on the DataStream API and have not written much Flink SQL; the code here mostly follows the original project, and I am not familiar with every detail, so I suggest studying Flink SQL on its own. To put it simply, the idea is: use a SQL query to pull the required fields from the source table (mainly the DWM-layer order wide table) into a dynamic table, convert the dynamic table into a stream, and then save the stream to ClickHouse.
Points to note:
1. Flink SQL syntax differs slightly from MySQL's;
2. When converting a table to a stream, pay attention to which mode you choose. Flink defines three encodings (append-only, retract, and upsert); the two used here are:
Convert Table to DataStream
Append Mode
– used when the table is only ever modified by Insert operations (append-only scenarios)

DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class); 

Retract Mode
– works for any scenario, including tables with updates. An update is encoded as a Delete of the old row followed by an Insert of the new row, so the stream carries only Insert and Delete messages.
– a Boolean flag (the first field of each record) is added to the output, indicating whether the record is new data (true, Insert) or retracted data (false, Delete)

DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable, Row.class);
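To make the retract encoding concrete, here is a tiny plain-Java simulation of how an updated aggregate is emitted as a Delete of the old result plus an Insert of the new one. This is not the Flink API; the class and method names are invented for the sketch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractDemo {

    // Running count per key, standing in for an aggregating dynamic table.
    static final Map<String, Integer> counts = new HashMap<>();

    // Each incoming record changes the aggregate; the change is emitted as
    // retract messages: (false, oldRow) deletes the stale result,
    // (true, newRow) inserts the updated one.
    public static List<String> onRecord(String key) {
        List<String> out = new ArrayList<>();
        Integer old = counts.get(key);
        if (old != null) {
            out.add("(false, " + key + "," + old + ")"); // Delete previous result
        }
        int updated = (old == null ? 0 : old) + 1;
        counts.put(key, updated);
        out.add("(true, " + key + "," + updated + ")");  // Insert new result
        return out;
    }

    public static void main(String[] args) {
        System.out.println(onRecord("book")); // first record: Insert only
        System.out.println(onRecord("book")); // update: Delete old + Insert new
    }
}
```

This is exactly the shape of the `Tuple2<Boolean, Row>` records produced by `toRetractStream`.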

Building the Regional Topic Wide Table

I won't go into this table separately, because its design is fairly simple and the reasoning is spelled out clearly in the project code: aggregate statistics by province and city. In the final dashboard, this data drives the heat-map distribution over the map of China, and the total order amount feeds the grand-total figure on the big screen.
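As a rough sketch of what the provincial aggregation might look like in Flink SQL (the table and field names such as `order_wide`, `province_name`, and `split_total_amount` are assumptions based on the DWM layer described earlier, not the exact project code):

```sql
-- Hypothetical: per-province order count and amount over 10-second tumbling windows
SELECT
  DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') AS stt,
  DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss')   AS edt,
  province_id,
  province_name,
  COUNT(DISTINCT order_id) AS order_count,
  SUM(split_total_amount)  AS order_amount
FROM order_wide
GROUP BY
  TUMBLE(rowtime, INTERVAL '10' SECOND),
  province_id,
  province_name
```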

Building the Keyword Wide Table

The keyword display is also the result of dimensional aggregation: the displayed size of each keyword is determined by its aggregated count.
The most important source of keywords is what users type into the search bar; the other source is the product-topic statistics.
    // The user's search keywords in the search bar come from dwd_page_log: "page_id":"good_list","item":"book";

Therefore, we need to segment long text into individual words. This word-segmentation technique is also used in search engines; for Chinese, today's search engines basically rely on third-party segmenters. We can use a segmenter consistent with theirs: IK.

// How do we perform word segmentation from a SQL statement? With a user-defined function;

    // Example (adapted from the Flink documentation):
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {

  public void eval(int a, int b) {
    collect(Row.of("Sum", a + b));
  }

  // overloading of arguments is still possible
  public void eval() {
    collect(Row.of("Empty args", -1));
  }
}

Here is the annotation used for the UDTF function definition:
// @FunctionHint(output = @DataTypeHint("ROW<word STRING>")) specifies the output type;
// FunctionHint decouples type inference from the evaluation methods: type inference is driven entirely by the annotation, e.g.
// @FunctionHint(
//   input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
//   output = @DataTypeHint("INT")
// )

// Processing flow:
// 1. Create the environment and register the user-defined function; (note that a UDTF is used here: as the aggregation walkthrough below shows, one input row produces multiple output rows)
// 2. Create a dynamic table;
// 3. Query data from the dynamic table; (WHERE page['page_id']='good_list' AND page['item'] IS NOT NULL filters for search terms on the goods-list page)
// 4. Aggregate;
// 5. Convert to a stream;
// 6. Write to ClickHouse;
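Under the assumption of a Kafka-backed source table, steps 2–4 might look roughly like the following Flink SQL. All identifiers here (`page_view`, the `dwd_page_log` topic, the registered UDTF `ik_analyze`, the broker address) are illustrative, not the exact project names:

```sql
-- Step 2: dynamic table over the Kafka topic (illustrative DDL)
CREATE TABLE page_view (
  common  MAP<STRING, STRING>,
  page    MAP<STRING, STRING>,
  ts      BIGINT,
  rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000)),
  WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_page_log',
  'properties.bootstrap.servers' = 'hadoop102:9092',
  'format' = 'json'
);

-- Steps 3 + 4: filter search terms, split with the UDTF, aggregate per window
SELECT
  keyword,
  COUNT(*) AS ct,
  TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS stt,
  TUMBLE_END(rowtime, INTERVAL '10' SECOND)   AS edt
FROM page_view,
  LATERAL TABLE(ik_analyze(page['item'])) AS t(keyword)
WHERE page['page_id'] = 'good_list' AND page['item'] IS NOT NULL
GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), keyword;
```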


// ClickHouse DDL for the keyword topic wide table:
create table keyword_stats_0709 (
  stt DateTime,
  edt DateTime,
  keyword String,
  source String,
  ct UInt64,
  ts UInt64
) engine = ReplacingMergeTree(ts)
  partition by toYYYYMMDD(stt)
  order by (stt, edt, keyword, source);


//        Regular join syntax in a SELECT:
SELECT *
    FROM Orders
    INNER JOIN Product
    ON Orders.product_id = Product.id

    Joining a table function in a SELECT:
    SELECT order_id, res
    FROM Orders,                                 -- Orders is a table
      LATERAL TABLE(table_func(order_id)) t(res)
    -- LATERAL TABLE expresses the association;


Sample dwd_page_log data; what we need is the "page_id":"good_list","item":"books" portion:
 page:2> {"common":{"ar":"530000","uid":"36","os":"Android 11.0","ch":"wandoujia","is_new":"0",
       "md":"Xiaomi ten Pro ","mid":"mid_9","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"good_list","item":"books",
       "during_time":7183,"item_type":"keyword","last_page_id":"search"},"displays":[{"display_type":"recommend",
       "item":"1","item_type":"sku_id","pos_id":5,"order":1},{"display_type":"recommend","item":"5",
       "item_type":"sku_id","pos_id":2,"order":2},{"display_type":"query","item":"2","item_type":"sku_id","pos_id":2,
       "order":3},{"display_type":"promotion","item":"5","item_type":"sku_id","pos_id":3,"order":4},
       {"display_type":"promotion","item":"9","item_type":"sku_id","pos_id":4,"order":5},{"display_type":"query",
       "item":"9","item_type":"sku_id","pos_id":5,"order":6}],"ts":1626684732000}

// Overall test
➢ Start ZK, Kafka, logger.sh, ClickHouse
➢ Run BaseLogApp
➢ Run KeywordStatsApp
➢ Run the jar package under the rt_applog directory
➢ Check the console output
➢ Check the keyword_stats_0709 table data in ClickHouse
    // The result type looks like:
        // >>>>: 4> KeywordStats(keyword=box, ct=2, source=SEARCH, stt=2021-07-19 12:58:30, edt=2021-07-19 12:58:40, ts=1626757129000)
    // The data in ClickHouse:
┌─────────────────stt─┬─────────────────edt─┬─keyword─┬─source─┬─ct─┬────────────ts─┐
│ 2021-07-19 13:00:50 │ 2021-07-19 13:01:00 │ TV      │ SEARCH │  1 │ 1626757293000 │
│ 2021-07-19 13:01:00 │ 2021-07-19 13:01:10 │ books   │ SEARCH │  2 │ 1626757293000 │
│ 2021-07-19 13:01:00 │ 2021-07-19 13:01:10 │ TV      │ SEARCH │  1 │ 1626757293000 │
│ 2021-07-19 13:01:00 │ 2021-07-19 13:01:10 │ box     │ SEARCH │  1 │ 1626757293000 │
│ 2021-07-19 13:01:10 │ 2021-07-19 13:01:20 │ TV      │ SEARCH │  1 │ 1626757293000 │

    
// Getting keywords from the product-topic statistics.
// From the product topic, obtain a statistics table of product keywords together with their click, order, and add-to-cart counts, and save it to the ClickHouse table keyword_stats_0709;

    // Aggregation logic walkthrough:
After the product-topic statistics are complete, the UDTF design idea is illustrated as follows:

product_stats_0709
	Product SPU			Clicks		Orders		Add-to-carts
	Xiaomi phone		  50		  80		  100


After word segmentation, the result is:
	Keyword				Clicks		Orders		Add-to-carts
	Xiaomi				  50		  80		  100
	phone				  50		  80		  100

The goal is the statistic: keyword heat ---> Keyword_Stats_0820
	Keyword				Count		Source
	Xiaomi				  50		Click
	Xiaomi				  80		Order
	Xiaomi				  100		Cart
	phone				  50		Click
	phone				  80		Order
	phone				  100		Cart

Deriving hot words, with weights: search 10, click 9, order 8, cart 7
	Keyword				Count		Source		Weight
	Xiaomi				  90		Search		10
	Xiaomi				  50		Click		9
	Xiaomi				  80		Order		8
	Xiaomi				  100		Cart		7
	phone				  50		Click
	phone				  80		Order
	phone				  100		Cart

	Weighted sum per keyword:
	Xiaomi:	 90 * 10
			 50 * 9
			 80 * 8
			100 * 7
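The weighted sum above can be sketched in plain Java. The weights follow the search 10 / click 9 / order 8 / cart 7 scheme from the table; the class and method names are illustrative, not from the project:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HotWordScoreDemo {

    // Operation weights: search > click > order > cart
    static final Map<String, Integer> WEIGHTS = new LinkedHashMap<>();
    static {
        WEIGHTS.put("SEARCH", 10);
        WEIGHTS.put("CLICK", 9);
        WEIGHTS.put("ORDER", 8);
        WEIGHTS.put("CART", 7);
    }

    // Heat score of one keyword = sum of (count * weight) over all sources
    public static long score(Map<String, Long> countsBySource) {
        long total = 0;
        for (Map.Entry<String, Long> e : countsBySource.entrySet()) {
            total += e.getValue() * WEIGHTS.getOrDefault(e.getKey(), 0);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> xiaomi = new LinkedHashMap<>();
        xiaomi.put("SEARCH", 90L);
        xiaomi.put("CLICK", 50L);
        xiaomi.put("ORDER", 80L);
        xiaomi.put("CART", 100L);
        // 90*10 + 50*9 + 80*8 + 100*7 = 2690
        System.out.println(score(xiaomi));
    }
}
```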
    
Overall test
➢ Start ZK, Kafka, logger.sh, ClickHouse, Redis, HDFS, HBase, Maxwell
➢ Run BaseLogApp
➢ Run BaseDBApp
➢ Run OrderWideApp
➢ Run PaymentWideApp
➢ Run ProductsStatsApp
➢ Run KeywordStats4ProductApp
➢ Run the jar package under the rt_applog directory
➢ Run the jar package under the rt_dblog directory
➢ Check the console output
➢ Check the products_stats_0709 table data in ClickHouse

The keywordStatsProductDataStream data type looks like:
// one record each for click, order, and cart;
1> KeywordStats(keyword=10x, ct=2, source=CLICK, stt=2021-07-20 13:29:10, edt=2021-07-20 13:29:20, ts=1626758982000)
1> KeywordStats(keyword=redmi, ct=12, source=ORDER, stt=2021-07-20 13:28:50, edt=2021-07-20 13:29:00, ts=1626758978000)
1> KeywordStats(keyword=clear, ct=178, source=CART, stt=2021-07-19 21:58:10, edt=2021-07-19 21:58:20, ts=1626758976000)

The data in ClickHouse is:
┌─────────────────stt─┬─────────────────edt─┬─keyword───┬─source─┬─ct─┬────────────ts─┐
│ 2021-07-20 13:27:40 │ 2021-07-20 13:27:50 │ 10        │ CLICK  │  1 │ 1626758976000 │
│ 2021-07-20 13:27:40 │ 2021-07-20 13:27:50 │ 10x       │ CLICK  │  1 │ 1626758977000 │
│ 2021-07-20 13:27:40 │ 2021-07-20 13:27:50 │ redmi     │ CLICK  │  1 │ 1626758977000 │
│ 2021-07-20 13:27:40 │ 2021-07-20 13:27:50 │ Xiaomi    │ CLICK  │  1 │ 1626758976000 │
│ 2021-07-20 13:27:50 │ 2021-07-20 13:28:00 │ 10        │ CLICK  │  2 │ 1626758977000 │
│ 2021-07-20 13:27:50 │ 2021-07-20 13:28:00 │ 10        │ ORDER  │  9 │ 1626758977000 │
// Order and cart records are sparse, so they are harder to spot here, but all three kinds of data are present;

Summary

The whole project basically ends here. I won't explain the subsequent visualization step or the data-interface code, because they only involve a simple use of Spring Boot: plain data-statistics queries whose results are converted to JSON in the format that Sugar requires. If you are interested, you can read that code yourself; the technology is not difficult, just careful, meticulous work.
If you are interested in this project or in Flink, you can keep following me. Going forward I will gradually post more about the Flink framework and about learning methods and roadmaps for real-time and stream processing. I hope we can make progress together~~

Keywords: flink

Added by dumdumsareyum on Sat, 25 Dec 2021 11:55:52 +0200