Building a SQL Client streaming application from a locally compiled Flink 1.11

preface

I wanted to try running Flink SQL myself. Last year I saw Jark Wu share this demo, but I never actually worked through it; recently I got interested again.
Jark Wu's original shared demo: Building a streaming application based on Flink SQL.
Jark Wu's GitHub repo: flink-sql-demo
The demo was originally built on Flink 1.10, but that release package is no longer available for download, so let's compile and run the local source code instead. This time it is based on Flink 1.11, because the demo provides a docker-compose setup for version 1.11 😃, which saves a lot of work.

prepare

  • A Linux or macOS machine with Docker and JDK 8 installed (Windows should also work)
  • Download the Flink source code (if cloning from GitHub is slow, adjust your hosts file; cloning over the git protocol also works)
git clone git://github.com/apache/flink.git
  • All the components used in this walkthrough run in containers, so everything can be started with one command via docker-compose. The docker-compose.yml file can be fetched automatically with wget or downloaded manually; see the note after the command below. (This paragraph is copied from the demo; thanks again for the docker-compose.yml file.)
mkdir  flink-demo; cd flink-demo
wget https://github.com/wuchong/flink-sql-demo/blob/v1.11-EN/docker-compose.yml
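Note that the URL above is the GitHub page for the file; if wget saves an HTML page instead of the YAML, the raw URL can be used instead (a sketch, assuming the standard raw.githubusercontent.com layout for the v1.11-EN branch):

wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml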

Start the containers with docker-compose

The Docker images used in the demo are all prepared and ready to use, so there is nothing to worry about downloading.
In the directory containing docker-compose.yml, run docker-compose up -d to start the containers; when you need to tear everything down, run docker-compose down.
If startup succeeds, you can check the running containers with the docker ps command or with a Docker desktop client; the main point is that all containers started successfully.
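For reference, the commands look roughly like this (a minimal sketch; run them in the directory that contains docker-compose.yml):

cd flink-demo                # directory containing docker-compose.yml
docker-compose up -d         # start all demo containers in the background
docker-compose ps            # every service should be in the "Up" state
docker-compose down          # stop and remove the containers when finished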

Flink source code compilation and operation

IDEA preparation

  • Install the Scala plug-in in IDEA
  • Download the Scala package (SDK)
  • Switch to the 1.11 branch
  • Download any jar packages that are missing during source compilation (there are countless pitfalls, and most of the time was spent here; the aliyun repository could not pull some artifacts, so they had to be downloaded manually), for example kafka-schema-registry-client-4.1.0.jar. A way to install such a jar by hand is sketched after this list.
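If a jar has to be installed manually, something like the following should work; this is only a sketch, assuming the Confluent coordinates io.confluent:kafka-schema-registry-client:4.1.0 and a jar file downloaded into the current directory:

# install the manually downloaded jar into the local Maven repository (coordinates assumed)
mvn install:install-file \
  -DgroupId=io.confluent \
  -DartifactId=kafka-schema-registry-client \
  -Dversion=4.1.0 \
  -Dpackaging=jar \
  -Dfile=./kafka-schema-registry-client-4.1.0.jar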

Compile source code

If the Maven environment variables are configured, switch directly to the Terminal and run the command:

mvn clean install package -DskipTests

Wait for the compilation to finish; it takes roughly 10+ minutes. Leave a comment if you run into problems.
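Depending on the Flink version, the build may also offer a fast profile that skips QA plugins such as checkstyle; a sketch:

mvn clean install -DskipTests -Dfast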

After successful compilation

Copy the build output: copy the files under build-target (generated in the project root by the build) into the flink-bin directory of the flink-dist module (flink-dist/src/main/flink-bin). Easy!
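A minimal sketch of that copy, assuming the layout above (the build-target symlink in the project root, and the flink-bin lib directory that the -cp in the startup parameters below points at):

cd /path/to/flink                                  # root of the Flink source checkout
mkdir -p flink-dist/src/main/flink-bin/lib         # create lib/ if it does not exist yet
cp -r build-target/lib/* flink-dist/src/main/flink-bin/lib/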

Non-source startup

Run the startup script directly in the flink-bin directory. This mode cannot be debugged directly.
Start Flink

./bin/start-cluster.sh

Open localhost:8081 to check whether the startup succeeded.
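As a quick alternative to the web UI, the cluster can also be checked through the Flink REST API; a sketch:

# a JSON response with taskmanager and slot counts means the cluster is up
curl http://localhost:8081/overview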

Start the Flink SQL Client

./bin/sql-client.sh embedded

The console shows that the SQL Client started successfully.

Source startup

There are three main startup classes. After a successful start you can debug locally.
Start Flink
StandaloneSessionClusterEntrypoint is the entry point for starting the JobManager
TaskManagerRunner is the entry point for starting the TaskManager

org.apache.flink.runtime.taskexecutor.TaskManagerRunner
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint

Start SQL client

org.apache.flink.table.client.SqlClient

Start parameter configuration

After analyzing the startup scripts, I arrived at the following startup parameters.
Flink startup
StandaloneSessionClusterEntrypoint

vm options: -XX:MaxPermSize=256m -Dlog.file="/Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/log/flink-wigen-jm.log" -Dlog4j.configuration=file:"/Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/conf/log4j.properties" -Dlog4j.configurationFile=file:"/Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/conf/log4j.properties" -Dlogback.configurationFile=file:"/Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/conf/logback.xml" -cp /Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/lib/*
program argument: --configDir /Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/conf/ > /Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/log/job-log.out 2>&1

TaskManagerRunner

vm options:-XX:MaxPermSize=256m -Dlog.file="/Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/log/flink-wigen-tm.log" -Dlog4j.configuration=file:"/Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/conf/log4j.properties" -Dlog4j.configurationFile=file:"/Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/conf/log4j.properties" -Dlogback.configurationFile=file:"/Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/conf/logback.xml" -cp /Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/lib/*
program arguments: -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.network.min=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.cpu.cores=10.0 -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.memory.task.off-heap.size=0b --configDir /Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/conf/ > /Users/wigen/workspace/github/flink/flink-dist/src/main/flink-bin/log/job-log.out 2>&1

The flink directory paths in the startup parameters above need to be replaced with your own.
After startup, open localhost:8081 to check whether it succeeded; if not, check the startup logs under the log directory of flink-bin.

SQL Client startup
You can still start it the same way as before with ./bin/sql-client.sh embedded, or read sql-client.sh to work out the startup parameters for running SqlClient from the IDE.
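For reference, a possible IDE run configuration for SqlClient; this is only a sketch, and the exact flags should be taken from sql-client.sh (the 1.11 client accepts options such as -d/--defaults and -l/--library; the paths below are placeholders):

main class: org.apache.flink.table.client.SqlClient
program arguments: embedded -d <conf dir>/sql-client-defaults.yaml -l <directory containing the connector jars>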

test data

The DDL has been adjusted slightly: the ts field in the messages that Kafka sends is in the format 'yyyy-MM-dd HH:mm:ss', and the original DDL could not parse ts directly, so a computed column ds AS TO_TIMESTAMP(ts) is used instead.

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts STRING,
    ds AS TO_TIMESTAMP(ts),
    proctime AS PROCTIME(),   -- processing-time column defined via a computed column
    WATERMARK FOR ds AS ds - INTERVAL '5' SECOND  -- define a watermark on ds, making ds the event-time column
) WITH (
    'connector.type' = 'kafka',  -- use the kafka connector
    'connector.version' = 'universal',  -- kafka version; 'universal' supports 0.11 and above
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format.type' = 'json'  -- the data format is json
);
CREATE TABLE buy_cnt_per_hour ( 
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', -- use the elasticsearch connector
    'connector.version' = '6',  -- elasticsearch version; '6' supports both es 6+ and 7+
    'connector.hosts' = 'http://localhost:9200',  -- elasticsearch address
    'connector.index' = 'buy_cnt_per_hour',  -- elasticsearch index name, similar to a database table name
    'connector.document-type' = 'user_behavior', -- elasticsearch type, similar to a database name
    'connector.bulk-flush.max-actions' = '1',  -- flush after every record
    'format.type' = 'json',  -- output data format is json
    'update-mode' = 'append'
);

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ds, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ds, INTERVAL '1' HOUR);


Use the show tables; command to check that the tables were created, then submit the SQL INSERT job. At this point you can see the submitted job on the Flink web UI; if it fails, check the logs.
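The submitted job can also be checked from the command line through the Flink REST API; a quick sketch:

# list the jobs known to the cluster together with their status (RUNNING, FAILED, ...)
curl http://localhost:8081/jobs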

Pushing data into Kafka
The datagen container pushes user behavior data into Kafka. To check that messages have arrived in the topic, run the following in the docker-compose.yml directory:

docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'

Alternatively, start the datagen container from the Docker dashboard if it is not already running.
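It can also be started via docker-compose; a sketch, assuming the service is named datagen in the demo's docker-compose.yml:

docker-compose up -d datagen     # start only the data generator service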

View the data in Kibana at localhost:5601.
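The sink index can also be inspected directly in Elasticsearch; a sketch, assuming the default port exposed by the docker-compose setup:

# show a few documents from the result index written by the INSERT job
curl 'http://localhost:9200/buy_cnt_per_hour/_search?pretty'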

end

So far, this gives a general understanding of the whole Flink SQL workflow. If you want to understand the source code further, you can start debugging from there.
