Using Blink SQL+UDAF to realize difference aggregation calculation

According to the real business requirements of a power grid company, this case realizes the difference aggregation calculation on the real-time flow through Blink SQL+UDAF. Through this case, the readers are familiar with UDAF writing and understand the method invocation relationship and order in UDAF.
Thank you @ commander for your guidance in the implementation process. The level of the author is limited. If there is any mistake, please point out.

1, Customer needs

The grid company collects the meter data of each user every day (format is as follows), in which data "date" is the report time of the meter data, cons "Id" is the meter ID, r1 is the meter power, and other fields are irrelevant to the calculation logic and can be ignored. For the convenience of follow-up demonstration, only data with cons Ou id = 100000002 is input.

no(string) data_date(string) cons_id(string) org_no(string) r1(double)
101 20190716 100000002 35401 13.76
101 20190717 100000002 35401 14.12
101 20190718 100000002 35401 16.59
101 20190719 100000002 35401 18.89

Table 1: input data
The grid company hopes to get the difference data of the last two days (the same day and the previous day) of each meter every day after processing the data of the meter by Blink. The results are similar to the following table:

cons_id(string) data_date(string) subDegreeR1(double)
100000002 20190717 0.36
100000002 20190718 2.47
100000002 20190719 2.3

Table 2: expected output data

2, Demand analysis

According to the needs of customers, it is easy to get two solutions: 1. Through over window (2) windows over window for difference aggregation; 2. Through hop window (sliding=1 day, size=2 days) perform difference aggregation.
over window and hop window are both standard Windows supported by blink, which are very simple to use. The biggest difficulty of this requirement is the difference aggregation. Blink supports SUM, MAX, MIN, AVG and other built-in aggregation functions, but it does not meet the difference aggregation function of business requirements. Therefore, it needs to be implemented through UDAF.

3, UDAF development

For the development and construction environment of user-defined function for real-time calculation, please refer to UDX overview , no more details here. In this case, Blink2.2.7 is used, and the writing of key code is briefly described below.
Complete code (in order to facilitate upload, txt format is used): SubtractionUdaf.txt
1. In the package com.alibaba.blink.sql.udx.subtractonudaf, create a subtractonudaf class that inherits the AggregateFunction class.

public class SubtractionUdaf extends AggregateFunction<Double, SubtractionUdaf.Accum> 

Where Double is the type of UDAF output, in this case, it is the difference degree of two adjacent days. SubtractionUdaf.Accum is an internal custom accumulator data structure.
2. Define the accumulator data structure, and the user saves the status of UDAF.

    public static class Accum {
        private long currentTime;//Reporting time of the latest degree
        private double oldDegree;//Previous degrees
        private double newDegree;//Current latest degree
        private long num;   //The number of record s that have been calculated in the accumulator, mainly used for merge
        private List<Tuple2<Double, Long>> listInput;//Cache all input, mainly used for retract
    }

3. Implement the createAccumulator method to initialize the accumulator of UDAF

    //Initializing the accumulator of udaf
    public SubtractionUdaf.Accum createAccumulator() {
        SubtractionUdaf.Accum acc = new SubtractionUdaf.Accum();
        acc.currentTime = 0;
        acc.oldDegree = 0.0;
        acc.newDegree = 0.0;
        acc.num = 0;
        acc.listInput = new ArrayList<Tuple2<Double, Long>>();
        return acc;
    }

4. The getValue method is implemented to calculate the result of UDAF through the accumulator of stored state. The requirement of this case is to calculate the difference between the old and new data.

    public Double getValue(SubtractionUdaf.Accum accumulator) {
        return accumulator.newDegree - accumulator.oldDegree;
    }

5. The accumulate method is implemented to update the accumulator of the storage state of UDAF according to the input data. Considering that the data may be out of order and possibly retract, the data includes the corresponding degree iValue, as well as the time of reporting degree (constructed event time ts).

    public void accumulate(SubtractionUdaf.Accum accumulator, double iValue, long ts) {
        System.out.println("method : accumulate" );
        accumulator.listInput.add(Tuple2.of(Double.valueOf(iValue),Long.valueOf(ts)));
        Collections.sort(accumulator.listInput,this.comparator);//Sort by time
        accumulator.num ++;
        if(accumulator.listInput.size() == 1){
            accumulator.newDegree = iValue;
            accumulator.oldDegree = 0.0;
            accumulator.currentTime = ts;
        }else {//Dealing with possible data disorder
            accumulator.newDegree = accumulator.listInput.get(0).f0;
            accumulator.currentTime = accumulator.listInput.get(0).f1;
            accumulator.oldDegree = accumulator.listInput.get(1).f0;
        }
    }

Where accumulator is the state of UDAF, iValue and ts are the actual input data.
Note that you need to deal with possible input data disorder.
6. Implement the retract method, which is used to process the retract data in some optimization scenarios (such as using the over window).

    public void retract(SubtractionUdaf.Accum accumulator, double iValue, long ts) throws Exception{
        if(accumulator.listInput.contains(Tuple2.of(iValue, ts))){
            if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 0){//retract is the latest value
                accumulator.listInput.remove(0);
                accumulator.num--;
                if(accumulator.listInput.isEmpty()){
                    accumulator.currentTime = 0;
                    accumulator.oldDegree = 0.0;
                    accumulator.newDegree = 0.0;
                }else if(accumulator.listInput.size() == 1) {
                    accumulator.currentTime = accumulator.listInput.get(0).f1;
                    accumulator.newDegree = accumulator.listInput.get(0).f0;
                    accumulator.oldDegree = 0.0;
                }else{
                    accumulator.currentTime = accumulator.listInput.get(0).f1;
                    accumulator.newDegree = accumulator.listInput.get(0).f0;
                    accumulator.oldDegree = accumulator.listInput.get(1).f0;
                }
            } else if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 1){//retract is a new value
                accumulator.listInput.remove(1);
                accumulator.num--;
                if(accumulator.listInput.size() == 1){
                    accumulator.oldDegree = 0.0;
                }else {
                    accumulator.oldDegree = accumulator.listInput.get(1).f0;
                }
            }else {//retract has other values
                accumulator.listInput.remove(Tuple2.of(iValue, ts));
                accumulator.num--;
            }
        }else {
            throw new Exception("Cannot retract a unexist record : iValue = "+ iValue + "timestamp = "+ ts);
        }
    }

We need to consider whether retract is the latest data or the next new data, which requires different logical processing.
7. Implement the merge method for some optimization scenarios (such as using the hop window).

    public void merge(SubtractionUdaf.Accum accumulator, Iterable<SubtractionUdaf.Accum> its) {
        int i = 0;
        System.out.println("method : merge" );
        System.out.println("accumulator : "+ accumulator.newDegree);
        System.out.println("accumulator : "+ accumulator.currentTime);

        for (SubtractionUdaf.Accum entry : its) {
            if(accumulator.currentTime < entry.currentTime){
                if(entry.num > 1){
                    accumulator.currentTime = entry.currentTime;
                    accumulator.oldDegree = entry.oldDegree;
                    accumulator.newDegree = entry.newDegree;
                    accumulator.num += entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }else if(entry.num == 1){
                    accumulator.currentTime = entry.currentTime;
                    accumulator.oldDegree = accumulator.newDegree;
                    accumulator.newDegree = entry.newDegree;
                    accumulator.num ++;
                    accumulator.listInput.addAll(entry.listInput);
                }
            }else{
                if(accumulator.num > 1){
                    accumulator.num += entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }else if(accumulator.num == 1){
                    accumulator.oldDegree = entry.newDegree;
                    accumulator.num += entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }else if(accumulator.num == 0){
                    accumulator.currentTime = entry.currentTime;
                    accumulator.oldDegree = entry.oldDegree;
                    accumulator.newDegree = entry.newDegree;
                    accumulator.num = entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }
            }
            Collections.sort(accumulator.listInput,this.comparator);
            System.out.println("merge : "+i);
            System.out.println("newDegree : "+entry.newDegree);
            System.out.println("oldDegree = "+entry.oldDegree);
            System.out.println("currentTime : "+entry.currentTime);
        }
    }

We need to consider whether the merge data is newer than the current data, and need different processing logic.
8. In other aspects, considering the need to sort the input degrees according to the event time, the user-defined Comparator class is instantiated in the open method, and the inputList in the accumulator data structure is sorted according to the descending order of the event time.

    public void open(FunctionContext context) throws Exception {
        //Define the order of records, which is used to sort listInput. The record with the newer time is in the front of the list
        this.comparator = new Comparator<Tuple2<Double, Long>>() {
            public int compare( Tuple2<Double, Long> o1, Tuple2<Double, Long> o2) {
                if (Long.valueOf(o1.f1) < Long.valueOf(o2.f1)) {
                    return 1;
                } else if (Long.valueOf(o1.f1) > Long.valueOf(o2.f1)) {
                    return -1;
                }else {
                    return 0;
                }
            }
        };
    }

Please refer to [using IntelliJ IDEA to develop custom functions] () to compile and package UDAF, and refer to UDX overview Complete the upload and reference of resources.

 

4, SQL development and test results

 

(1) over window

The SQL code is as follows: syntax check, go online, start the job (select the current start point). And upload the table 1 data to the data hub.

CREATE FUNCTION OverWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';

CREATE TABLE input_dh_e_mp_read_curve (
    `no`                  VARCHAR,
    data_date             VARCHAR,
    cons_id               VARCHAR,
    org_no                VARCHAR,
    r1                    DOUBLE,
    ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss')
    ,WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (
    type = 'datahub',
    endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',
    roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',
    project = 'jszc_datahub',
    topic = 'input_dh_e_mp_read_curve'
);
CREATE TABLE data_out(
    cons_id varchar
    ,data_date varchar
    ,subDegreeR1 DOUBLE
)with(
    type = 'print'
);

INSERT into data_out    
SELECT
    cons_id
    ,last_value(data_date) OVER (
        PARTITION BY cons_id 
        ORDER BY ts 
        ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date
    ,OverWindowSubtractionUdaf(r1,unix_timestamp(ts)) OVER (
        PARTITION BY cons_id 
        ORDER BY ts 
        ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date
FROM input_dh_e_mp_read_curve

Due to the use of print connector, the output can be seen from the taskmanager.out log of the corresponding sink as follows (other debug logs have been ignored):

task-1> (+)100000002,20190716,13.76
task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

Compared with the expected output (Table 2), the data of the two windows 20190717 and 20190718 are correct, indicating that the business logic is correct, but the output is slightly different from the expected output:
(1) 20190716 output is 13.76, which is caused by only one piece of data in the first over window, which can be filtered out in the business layer;
(2) The data of 20190719 is not output, because we set watermark, and there is no data coming in after 20190719 in the test environment to trigger the end of the window corresponding to 20190719.

(2) hop window

The SQL code is as follows: syntax check, online, start job (select the current start point). And upload the table 1 data to the data hub.

CREATE FUNCTION HopWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';

CREATE TABLE input_dh_e_mp_read_curve (
    `no`                  VARCHAR,
    data_date             VARCHAR,
    cons_id               VARCHAR,
    org_no                VARCHAR,
    r1                    DOUBLE,
    ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss')
    ,WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (
    type = 'datahub',
    endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',
    roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',
    project = 'jszc_datahub',
    topic = 'input_dh_e_mp_read_curve'
);
CREATE TABLE data_out(
    cons_id varchar
    ,data_date varchar
    ,subDegreeR1 DOUBLE
)with(
    type = 'print'
);
INSERT into data_out    
SELECT
    cons_id
    ,DATE_FORMAT(HOP_end(ts, INTERVAL '1' day,INTERVAL '2' day), 'yyyyMMdd')
    ,HopWindowSubtractionUdaf(r1,unix_timestamp(ts))
FROM input_dh_e_mp_read_curve
group by hop(ts, INTERVAL '1' day,INTERVAL '2' day),cons_id;

Since print connector is used, the output can be seen from the taskmanager.out log of the corresponding sink as follows (other debug logs have been ignored):

task-1> (+)100000002,20190716,13.76
task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

Compared with the expected output (Table 2), the data of the two windows 20190717 and 20190718 are correct, indicating that the business logic is correct, but the output is slightly different from the expected output:
(1) 20190716 output is 13.76, which is caused by only one piece of data in the first hop window, which can be filtered out in the business layer;
(2) The data of 20190719 is not output, because we set watermark, and there is no data coming in after 20190719 in the test environment to trigger the end of the window corresponding to 20190719.

5, Some thoughts

 

1. On the calling relationship and order of UDAF internal methods

UDAF mainly includes createAccumulator, getValue, accumulator, retract and merge methods. The calling relationship and order are not completely determined, but related to the underlying optimization of Blink, Blink version, window type (such as hop or over window), etc.
The comparison determines that a normal (no failover) job is called. The createAccumulator method is called only once when the job is started, the accumulate method is called once when each data input is triggered, and the getValue is called once when the data output is triggered (which does not mean that only one call is made).
In this case, the over window calls the retract method instead of the merge method, and the hop window calls the merge method instead of the retract method.
You can add logs and observe the calling order of these methods, which is quite interesting.

2. How to know which methods to implement UDAF

The createAccumulator, getValue and accumulate methods must be implemented in UDAF. You can choose to implement the retract and merge methods.
In general, three methods, createAccumulator, getValue and accumulator, can be implemented first, and then syntax check can be performed after SQL is written. The SQL compiler will prompt whether the retract or merge method is needed.
For example, if the retract method is not implemented, when using the over window, the syntax check will report an error similar to the following:

org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'retract' which is public, not abstract and (in case of table functions) not static.

For example, if the merge method is not implemented, when using the over window, the syntax check will report an error similar to the following:

org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.

 

3. There is room for optimization in this case

(1) This case does not consider the problem of missing data, such as the lack of 20190717 data for some reason (network problems, data collection problems, etc.). What would be the outcome in this case? You can test it yourself;
(2) In this case, we use a List, and then sort it through the Collections.sort method, which is not a very good method. If we use priority queue, the performance should be better;

Keywords: Mobile SQL Windows Apache IntelliJ IDEA

Added by stalione on Fri, 08 May 2020 12:15:27 +0300