Blink SQL time attribute

Time attribute

Flink supports three time concepts related to stream data processing: Processing Time, Event Time and Ingestion Time.

Blink SQL supports only two of them: Event Time and Processing Time.

  • Event Time: the time at which an event occurred, usually the time the data was originally created. Event Time must be a field carried in the data itself, as provided by the data store.
  • Processing Time: the local system time, in milliseconds, at which the system processes an event.

Event Time

Event Time is also called Row Time. The Event Time attribute must be declared in the DDL of the source table, where you designate one of its fields as the Event Time field. Currently, only fields of type TIMESTAMP can be declared as the Row Time field (support for LONG is planned). If the column you want to use as Event Time is not of type TIMESTAMP, use a computed column to derive a TIMESTAMP column from the existing one.
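In Blink SQL this conversion is done declaratively with a computed column in the DDL. As a rough illustration of what such a column computes, the Python sketch below turns a LONG value holding epoch milliseconds into a timestamp. It rests on assumptions: the function name millis_to_timestamp is hypothetical, the LONG column is assumed to hold milliseconds since the Unix epoch, and the result is interpreted in UTC. Blink's own conversion function and time-zone handling are not modeled here.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_timestamp(epoch_millis: int) -> datetime:
    """Hypothetical helper: map epoch milliseconds (a LONG) to a UTC timestamp.

    Integer timedelta arithmetic keeps millisecond precision exact,
    unlike dividing by 1000 and going through a float.
    """
    return EPOCH + timedelta(milliseconds=epoch_millis)


if __name__ == "__main__":
    print(millis_to_timestamp(1641085460123))  # 2022-01-02 01:04:20.123000+00:00
```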

Because of disorder in the data itself, network jitter (variation in transmission delay caused by network congestion), or other reasons, the order in which data arrives and the order in which it is processed can differ; that is, data can arrive out of order. You therefore must explicitly define how the Watermark is calculated before you can define a Row Time field.
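To see why an explicit Watermark is needed, the Python sketch below simulates an offset-based watermark of the kind declared with withOffset(ts, 1000) in the following example: the watermark trails the largest event time seen so far by a fixed offset, and an event whose time is not newer than the current watermark counts as late. It is a simplified model under stated assumptions: the function name is hypothetical, times are plain epoch milliseconds, and the watermark is updated after every event, whereas the engine emits watermarks periodically.

```python
from typing import Iterable, Iterator, Optional, Tuple


def with_offset_watermarks(
    event_times_ms: Iterable[int], offset_ms: int
) -> Iterator[Tuple[int, int, bool]]:
    """Yield (event_time, watermark_after_event, is_late) for each arriving event.

    The watermark trails the largest event time seen so far by offset_ms.
    An event counts as late when its time is not greater than the watermark
    that was in effect before it arrived.
    """
    max_seen: Optional[int] = None
    watermark: Optional[int] = None
    for ts in event_times_ms:
        is_late = watermark is not None and ts <= watermark
        max_seen = ts if max_seen is None else max(max_seen, ts)
        # max_seen never decreases, so the watermark never moves backwards.
        watermark = max_seen - offset_ms
        yield ts, watermark, is_late


if __name__ == "__main__":
    # Arrival order differs from event-time order; offset mirrors withOffset(ts, 1000).
    for row in with_offset_watermarks([1000, 3000, 2500, 1500, 4000], offset_ms=1000):
        print(row)
```

In this run, 2500 arrives after 3000 but is still accepted because the watermark is only at 2000, while 1500 arrives after the watermark has already passed it and is flagged late. A larger offset tolerates more disorder, at the cost of waiting longer before windows can be closed.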

The following example performs a window aggregation based on Event Time:

CREATE TABLE tt_stream (
  a VARCHAR,
  b VARCHAR,
  ts TIMESTAMP,
  WATERMARK wk1 FOR ts AS withOffset(ts, 1000) -- Watermark calculation: the largest ts seen so far minus 1000 ms.
) WITH (
  type = 'sls',
  topic = '<yourTopicName>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);
CREATE TABLE rds_output (
  id VARCHAR,
  win_start TIMESTAMP,
  win_end TIMESTAMP,
  cnt BIGINT
) WITH (
  type = 'rds',
  url = 'jdbc:mysql://****3306/test',
  tableName = '<yourTableName>',
  userName = '<yourUserName>',
  password = '<yourPassword>'
);
INSERT
  INTO rds_output
SELECT
  a AS id,
  SESSION_START (ts, INTERVAL '1' SECOND) AS win_start,
  SESSION_END (ts, INTERVAL '1' SECOND) AS win_end,
  COUNT (a) AS cnt
FROM
  tt_stream
GROUP
  BY SESSION (ts, INTERVAL '1' SECOND),
  a
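The SESSION(ts, INTERVAL '1' SECOND) grouping above keeps rows with the same a in one window as long as consecutive events are less than one second apart. The Python sketch below mimics that grouping on a finite, already-collected set of (a, ts) rows, with times in epoch milliseconds, and returns tuples shaped like the rds_output rows (id, win_start, win_end, cnt). It is a simplification under assumptions: the name session_windows is hypothetical, it sorts by event time instead of relying on watermarks, it ignores late data, and it treats a gap of exactly gap_ms as closing the session, which may differ from the engine's boundary handling.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def session_windows(
    rows: List[Tuple[str, int]], gap_ms: int
) -> List[Tuple[str, int, int, int]]:
    """Group (key, event_time_ms) rows into per-key session windows.

    win_start is the first event time of a session; win_end is the last
    event time plus the gap, i.e. the moment the session would expire.
    """
    by_key: Dict[str, List[int]] = defaultdict(list)
    for key, ts in rows:
        by_key[key].append(ts)

    result: List[Tuple[str, int, int, int]] = []
    for key in sorted(by_key):
        times = sorted(by_key[key])
        start = last = times[0]
        count = 1
        for ts in times[1:]:
            if ts - last < gap_ms:  # within the inactivity gap: same session
                last = ts
                count += 1
            else:  # gap reached: close the current session, open a new one
                result.append((key, start, last + gap_ms, count))
                start = last = ts
                count = 1
        result.append((key, start, last + gap_ms, count))
    return result


if __name__ == "__main__":
    data = [("a", 1000), ("a", 1500), ("b", 1200), ("a", 3000), ("a", 3400)]
    for window in session_windows(data, gap_ms=1000):
        print(window)
```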

Processing Time

Processing Time is generated by the system and does not exist in your original data. You must explicitly declare a Processing Time column in the DDL of the source table:

fieldName AS PROCTIME()
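Because the value of a PROCTIME() column comes from the clock of the machine processing the row rather than from the data, results based on it depend on when rows happen to be processed and are not reproducible across runs. The Python sketch below models this by stamping each row with the local wall-clock time, in epoch milliseconds, at the moment it is processed. The name attach_proctime is hypothetical, and representing the value as an integer of milliseconds is a simplification for illustration.

```python
import time
from typing import Dict, Iterable, Iterator

Row = Dict[str, object]


def attach_proctime(rows: Iterable[Row], field_name: str = "ts") -> Iterator[Row]:
    """Yield a copy of each row with a processing-time field added.

    The value is the local wall-clock time, in epoch milliseconds, at the
    moment the row is processed; nothing in the source row is consulted.
    """
    for row in rows:
        enriched = dict(row)  # copy so the caller's row is not mutated
        enriched[field_name] = time.time_ns() // 1_000_000
        yield enriched


if __name__ == "__main__":
    for r in attach_proctime([{"a": "x", "c": 1}, {"a": "y", "c": 2}]):
        print(r)
```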

The following example performs a window aggregation based on Processing Time:

CREATE TABLE mq_stream (
    a VARCHAR,
    b VARCHAR,
    c BIGINT,
    ts AS PROCTIME () -- Declare a Processing Time column in the source table DDL.
  ) WITH (
    type = 'mq',
    topic = '<yourTopic>',
    accessId = '<yourAccessId>',
    accessKey = '<yourAccessSecret>'
  );
CREATE TABLE rds_output (
  id VARCHAR,
  win_start TIMESTAMP,
  win_end TIMESTAMP,
  cnt BIGINT
) with (
  type = 'rds',
  url = '<yourDatabaseURL>',
  tableName = '<yourDatabaseTableName>',
  userName = '<yourUserName>',
  password = '<yourPassword>'
);
INSERT
  INTO rds_output
SELECT
  a AS id,
  SESSION_START (ts, INTERVAL '1' SECOND) AS win_start,
  SESSION_END (ts, INTERVAL '1' SECOND) AS win_end,
  COUNT (a) AS cnt
FROM
  mq_stream
GROUP
  BY SESSION (ts, INTERVAL '1' SECOND),
  a       

Time attribute field propagation

A time attribute field loses its time attribute after any of the following operations:

  • GROUP BY on fields other than the time attribute field (GROUP BY over a tumbling, sliding, or session window is the exception).
  • A dual-stream JOIN.
  • The MATCH_RECOGNIZE operation in complex event processing (CEP) statements.
  • PARTITION BY in an OVER window.
  • UNION. A UNION is equivalent to RETRACT + UNION ALL.

If you use the time attribute field in a window function after any of these operations, you get an error similar to org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
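One way to picture this rule is to treat the time attribute as a flag carried by a column: each operation in the list above clears the flag, and defining a window checks it. The Python sketch below is a toy model of that check, not Blink's implementation. The operation labels, the Column type, and the ValidationError class are all hypothetical; only the error text is taken from the message quoted above.

```python
from dataclasses import dataclass, replace

# Hypothetical labels for the operations listed above that drop the time attribute.
ATTRIBUTE_DROPPING_OPS = frozenset({
    "group_by_non_window_fields",
    "dual_stream_join",
    "match_recognize",
    "over_window_partition_by",
    "union",
})


@dataclass(frozen=True)
class Column:
    name: str
    is_time_attribute: bool


class ValidationError(Exception):
    """Stand-in for the engine's validation error in this toy model."""


def apply_op(col: Column, op: str) -> Column:
    """Return the column as it looks after `op`; listed ops clear the flag."""
    if op in ATTRIBUTE_DROPPING_OPS:
        return replace(col, is_time_attribute=False)
    return col  # in this toy model every other op passes the flag through


def define_window(col: Column) -> str:
    """Mimic the validation performed when a window is defined over `col`."""
    if not col.is_time_attribute:
        raise ValidationError("Window can only be defined over a time attribute column.")
    return f"window over {col.name}"
```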

Keywords: Database SQL Alibaba Cloud flink

Added by ju8ular1 on Sun, 02 Jan 2022 02:04:20 +0200