Big Data Flink in 2021: Simulating Double 11 Real-Time Big-Screen Statistics with Flink

Contents

Flink simulation of a Double 11 real-time big screen

Requirements

Data

Coding steps:

1.env

2.source

3.transformation

4. Use the aggregated results to implement the business requirements:

5.execute

Code implementation

Result

Flink simulation of a Double 11 real-time big screen

Requirements

In real-time big data processing, the real-time big-screen display has become a very important showcase, the most famous being the live total-sales display on Double 11. There are other scenarios as well: for example, showing the current PV (page views) and UV (unique visitors) of a website in real time in its back-office system. The approach is essentially the same.

Today, let's build a small example that simulates an e-commerce statistics big screen.

The requirements are as follows:

1. Calculate, in real time, the total sales amount from midnight (00:00) of the current day up to the current time

2. Find the top 3 categories by sales amount

3. Update the statistics every second

Data

First, we simulate order generation with a custom source that emits Tuple2 records: the first element is the category, and the second is the amount of an order in that category. The amount is generated randomly.

/**
 * Custom data source that generates order data in real time as Tuple2<category, amount>
 */
public static class MySource implements SourceFunction<Tuple2<String, Double>>{
    private volatile boolean flag = true;//volatile: cancel() is called from a different thread than run()
    private String[] categorys = {"Women's wear", "Men's wear", "Books", "Home appliances", "Personal care", "Beauty", "Sports", "Games", "Outdoors", "Furniture", "Musical instruments", "Office"};
    private Random random = new Random();

    @Override
    public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
        while (flag){
            //Generate a random category and amount
            int index = random.nextInt(categorys.length);//index in [0, length), i.e. 0..length-1
            String category = categorys[index];//pick a random category
            double price = random.nextDouble() * 100;//nextDouble() returns a value in [0, 1), so * 100 gives [0, 100)
            ctx.collect(Tuple2.of(category,price));
            Thread.sleep(20);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
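MySource's loop runs on one thread while Flink calls cancel() from another, which is why the loop flag needs to be volatile for the loop to reliably see the update (the SourceFunction Javadoc example also uses a volatile flag). The following is a minimal plain-Java sketch of that run/cancel pattern with no Flink dependency. The class CancellableOrderGenerator, its shortened category list and the runFor helper are hypothetical names for illustration, and a BiConsumer stands in for ctx.collect(...).

```java
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical plain-Java stand-in for MySource's run()/cancel() contract; no Flink needed.
class CancellableOrderGenerator implements Runnable {
    private static final String[] CATEGORIES = {"Women's wear", "Men's wear", "Books", "Games"};
    private final Random random = new Random();
    private final BiConsumer<String, Double> sink; // plays the role of ctx.collect(...)
    private volatile boolean running = true;        // written by cancel(), read by run()

    CancellableOrderGenerator(BiConsumer<String, Double> sink) {
        this.sink = sink;
    }

    @Override
    public void run() {
        while (running) {
            String category = CATEGORIES[random.nextInt(CATEGORIES.length)]; // index in [0, length)
            double price = random.nextDouble() * 100;                          // amount in [0, 100)
            sink.accept(category, price);
            try {
                Thread.sleep(20); // same pacing as MySource
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }

    // Runs the generator on a worker thread, cancels it from the calling thread, and returns
    // the number of emitted orders, or -1 if the worker did not stop within one second.
    static long runFor(long millis, AtomicBoolean allInRange) {
        AtomicLong count = new AtomicLong();
        CancellableOrderGenerator generator = new CancellableOrderGenerator((category, price) -> {
            count.incrementAndGet();
            if (price < 0 || price >= 100) {
                allInRange.set(false); // would indicate a bug in the amount formula
            }
        });
        Thread worker = new Thread(generator, "order-generator");
        worker.start();
        try {
            Thread.sleep(millis);
            generator.cancel();
            worker.join(1000); // the loop exits at its next check of 'running'
        } catch (InterruptedException e) {
            generator.cancel();
            Thread.currentThread().interrupt();
        }
        return worker.isAlive() ? -1 : count.get();
    }
}
```

Running it for 200 ms should emit roughly ten orders and then stop cleanly; without volatile, the worker is not guaranteed to ever observe running == false.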

Coding steps:

1.env

2.source

3.transformation

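3.1 Define a one-day window. The second argument is an offset of -8 hours: windows are aligned to UTC by default, so without it a one-day window would start at 08:00 China time (UTC+08:00, which is 8 hours ahead of UTC); the offset moves the start to local midnight.

.keyBy(t -> t.f0)

.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))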
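To see why the -8 hour offset moves the window start to local midnight, here is a plain-Java sketch of the window-start arithmetic. The formula start = timestamp - floorMod(timestamp - offset, size) is our restatement of how tumbling windows are aligned (Flink does this in TimeWindow.getWindowStartWithOffset; check your version's source for the exact code), and WindowStartSketch and dayWindowStart are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

class WindowStartSketch {
    // Start of the tumbling window that contains 'timestamp' (all values in epoch milliseconds).
    // floorMod keeps the remainder non-negative even when timestamp - offset is negative.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // Start of the one-day window for 'orderTime', expressed in China Standard Time (UTC+08:00).
    static OffsetDateTime dayWindowStart(OffsetDateTime orderTime, Duration offset) {
        long start = windowStart(orderTime.toInstant().toEpochMilli(),
                offset.toMillis(), Duration.ofDays(1).toMillis());
        return Instant.ofEpochMilli(start).atOffset(ZoneOffset.ofHours(8));
    }

    public static void main(String[] args) {
        OffsetDateTime order = OffsetDateTime.of(2021, 11, 11, 7, 0, 0, 0, ZoneOffset.ofHours(8));
        System.out.println(dayWindowStart(order, Duration.ZERO));        // 2021-11-10T08:00+08:00
        System.out.println(dayWindowStart(order, Duration.ofHours(-8))); // 2021-11-11T00:00+08:00
    }
}
```

For an order at 07:00 on November 11 China time, the unshifted window started at 08:00 the previous day, while the shifted window starts at midnight on November 11.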

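3.2 Define a trigger that fires every 1s, so that the one-day window emits its current result every second instead of only at midnight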

.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
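The trigger changes when the one-day window emits, not what it contains: each firing reports the aggregate of everything seen since midnight. The following plain-Java simulation illustrates that behavior under two assumptions: fire times are aligned to multiples of the interval, and firing does not purge the window state (our reading of ContinuousProcessingTimeTrigger; verify against your Flink version). ContinuousFireSketch and runningTotalsAtFires are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class ContinuousFireSketch {
    // Simulates a trigger that fires every intervalMs inside one long window.
    // Assumptions: fire times are aligned to multiples of intervalMs, and firing does not purge
    // the window, so each firing reports the running total of all amounts seen so far.
    // times must be sorted ascending; times[i] is when amounts[i] arrived (epoch ms).
    static List<Double> runningTotalsAtFires(long[] times, double[] amounts, long intervalMs, long untilMs) {
        List<Double> reported = new ArrayList<>();
        if (times.length == 0) {
            return reported; // no element arrived, so no timer was ever registered
        }
        // first aligned boundary strictly after the first element
        long nextFire = times[0] - Math.floorMod(times[0], intervalMs) + intervalMs;
        double total = 0;
        int i = 0;
        while (nextFire <= untilMs) {
            while (i < times.length && times[i] < nextFire) {
                total += amounts[i++]; // incremental aggregation, as with PriceAggregate
            }
            reported.add(total);       // emit the current aggregate and keep the state
            nextFire += intervalMs;    // the next firing happens one interval later
        }
        return reported;
    }

    public static void main(String[] args) {
        long[] times = {100, 900, 1500, 3200};
        double[] amounts = {10, 20, 5, 1};
        System.out.println(runningTotalsAtFires(times, amounts, 1000, 4000)); // [30.0, 35.0, 35.0, 36.0]
    }
}
```

With orders of 10 and 20 in the first second, 5 in the second, none in the third and 1 in the fourth, the simulated firings report 30, 35, 35 and 36: the third firing repeats the previous total because nothing new arrived.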

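3.3 Aggregate the results with .aggregate(new PriceAggregate(), new WindowResult()); PriceAggregate incrementally sums the amounts of each category, and WindowResult wraps the sum into a CategoryPojo together with the current time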
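PriceAggregate follows the four-method shape of Flink's AggregateFunction<IN, ACC, OUT>: createAccumulator, add, getResult and merge. The benefit of aggregate(...) is that the window keeps one small accumulator per key instead of buffering every order for the whole day. The sketch below mimics that incremental style in plain Java; SimpleAggregate is a hypothetical interface that copies the method shape, not Flink's own type, and IncrementalSumSketch is likewise a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface copying the shape of Flink's AggregateFunction<IN, ACC, OUT>.
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

class IncrementalSumSketch {
    // Same rules as PriceAggregate: start at 0, add each amount, the result is the accumulator.
    static final SimpleAggregate<Double, Double, Double> SUM = new SimpleAggregate<Double, Double, Double>() {
        @Override public Double createAccumulator() { return 0D; }
        @Override public Double add(Double value, Double accumulator) { return accumulator + value; }
        @Override public Double getResult(Double accumulator) { return accumulator; }
        @Override public Double merge(Double a, Double b) { return a + b; }
    };

    // Keeps exactly one accumulator per category and updates it as each order arrives,
    // instead of buffering all orders until the window fires.
    static Map<String, Double> totalsByCategory(String[] categories, double[] amounts) {
        Map<String, Double> state = new HashMap<>();
        for (int i = 0; i < categories.length; i++) {
            Double acc = state.getOrDefault(categories[i], SUM.createAccumulator());
            state.put(categories[i], SUM.add(amounts[i], acc));
        }
        Map<String, Double> results = new HashMap<>();
        state.forEach((category, acc) -> results.put(category, SUM.getResult(acc)));
        return results;
    }
}
```

For orders (Books, 1.5), (Games, 2.0), (Books, 3.0), the state holds just two numbers, Books 4.5 and Games 2.0, no matter how many orders arrive.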

3.4 Print the aggregated results to inspect them, for example:

DoubleElevenBigScreem.CategoryPojo(category=Men's wear, totalPrice=17225.26, dateTime=2020-10-20 08:04:12)

4. Use the results of the above aggregation to implement the business requirements:

tempAggResult.keyBy(CategoryPojo::getDateTime)
        //update the statistics every second
        .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        //implement the complex business logic in a ProcessWindowFunction
        .process(new FinalResultWindowProcess());
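The second stage re-keys the per-category snapshots by their dateTime string, so all categories stamped with the same second end up in one window. Because each snapshot is already a running total since midnight, summing the snapshots of one second gives the day's total so far. Here is a plain-Java sketch of that regrouping with Collectors.groupingBy; Snapshot and totalPerSecond are hypothetical names standing in for CategoryPojo and the window logic, and the sketch assumes each category contributes one snapshot per second.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class RegroupBySecondSketch {
    // Hypothetical stand-in for CategoryPojo: a category's running total since midnight,
    // stamped with the second in which the first-stage window fired.
    static final class Snapshot {
        final String category;
        final double totalPrice;
        final String dateTime;

        Snapshot(String category, double totalPrice, String dateTime) {
            this.category = category;
            this.totalPrice = totalPrice;
            this.dateTime = dateTime;
        }
    }

    // Mirrors keyBy(CategoryPojo::getDateTime): snapshots with the same second form one group,
    // and, assuming one snapshot per category per second, their sum is the day's total so far.
    static Map<String, Double> totalPerSecond(List<Snapshot> snapshots) {
        return snapshots.stream().collect(Collectors.groupingBy(
                s -> s.dateTime, TreeMap::new, Collectors.summingDouble(s -> s.totalPrice)));
    }

    public static void main(String[] args) {
        List<Snapshot> snapshots = Arrays.asList(
                new Snapshot("Books", 100.0, "2021-11-11 10:31:40"),
                new Snapshot("Games", 50.0, "2021-11-11 10:31:40"),
                new Snapshot("Books", 120.0, "2021-11-11 10:31:41"),
                new Snapshot("Games", 50.0, "2021-11-11 10:31:41"));
        System.out.println(totalPerSecond(snapshots));
        // {2021-11-11 10:31:40=150.0, 2021-11-11 10:31:41=170.0}
    }
}
```

Note that Games reports 50.0 in both seconds: it had no new orders, but its running total is still part of the day's sum.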

4.1 Calculate in real time the total sales amount from midnight of the current day to the current time

4.2 Find the top 3 categories by sales amount

4.3 Update the statistics every second
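Requirements 4.1 and 4.2 can be met in a single pass over a second's snapshots: add every category's total to a running sum, and keep a min-heap of size 3 whose root is the smallest of the current top 3, so each new value only needs to be compared with the root. The following self-contained sketch shows that technique in plain Java; Top3Sketch, CategoryTotal, topK and total are hypothetical names, and the heap is built with Comparator.comparingDouble so that equal totals compare as 0, as the Comparator contract requires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class Top3Sketch {
    // Hypothetical stand-in for CategoryPojo with only the fields the ranking needs.
    static final class CategoryTotal {
        final String category;
        final double totalPrice;

        CategoryTotal(String category, double totalPrice) {
            this.category = category;
            this.totalPrice = totalPrice;
        }

        @Override
        public String toString() {
            return category + "=" + totalPrice;
        }
    }

    // Ascending by total, so the root of a PriorityQueue using it is the smallest element.
    private static final Comparator<CategoryTotal> BY_TOTAL = Comparator.comparingDouble(c -> c.totalPrice);

    // Returns the k largest totals in descending order (k must be at least 1).
    static List<CategoryTotal> topK(Iterable<CategoryTotal> elements, int k) {
        PriorityQueue<CategoryTotal> heap = new PriorityQueue<>(k, BY_TOTAL); // min-heap, size <= k
        for (CategoryTotal c : elements) {
            if (heap.size() < k) {
                heap.offer(c);
            } else if (c.totalPrice > heap.peek().totalPrice) {
                heap.poll();  // evict the smallest of the current top k
                heap.offer(c);
            }
        }
        List<CategoryTotal> result = new ArrayList<>(heap);
        result.sort(BY_TOTAL.reversed()); // heap iteration order is unspecified, so sort explicitly
        return result;
    }

    // Requirement 4.1: the day's total is the sum of every category's running total.
    static double total(Iterable<CategoryTotal> elements) {
        double sum = 0;
        for (CategoryTotal c : elements) {
            sum += c.totalPrice;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<CategoryTotal> totals = Arrays.asList(
                new CategoryTotal("Women's wear", 70), new CategoryTotal("Men's wear", 80),
                new CategoryTotal("Books", 90), new CategoryTotal("Games", 85),
                new CategoryTotal("Office", 60));
        System.out.println(topK(totals, 3)); // [Books=90.0, Games=85.0, Men's wear=80.0]
        System.out.println(total(totals));   // 385.0
    }
}
```

For totals 70, 80, 90, 85 and 60, the heap ends with 80, 85 and 90, which are returned as 90, 85, 80; the heap never holds more than three elements regardless of how many categories there are.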

5.execute

Code implementation

package cn.itcast.action;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.stream.Collectors;

/**
 * Author itcast
 * Desc
 * 1.Calculate the total sales amount in real time from 0:00:00 to 23:59:59 on November 11
 * 2.Find the top 3 categories by sales amount
 * 3.The statistical results are updated every second
 */
public class DoubleElevenBigScreem {
    public static void main(String[] args) throws Exception {
        //TODO 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);//parallelism 1 makes the output easier to follow
        //TODO 2.source
        DataStream<Tuple2<String, Double>> orderDS = env.addSource(new MySource());

        //TODO 3.transformation -- preliminary aggregation: every 1s, emit each category's total sales so far today
        DataStream<CategoryPojo> tempAggResult = orderDS
                //grouping
                .keyBy(t -> t.f0)
                //With the windows covered earlier, you might try:
                //a tumbling window, which computes only once per day:
                //.window(TumblingProcessingTimeWindows.of(Time.days(1)));
                //a sliding window, which computes the last 24 hours every 1s; but the run at 00:01:00 on November 11 would cover 00:01:00 on November 10 to 00:01:00 on November 11 --- not what we want!
                //.window(SlidingProcessingTimeWindows.of(Time.days(1),Time.seconds(1)));
                //The Flink Javadoc notes that in China (UTC+08:00), a one-day window
                //that starts at 00:00:00 local time is written {@code of(Time.days(1), Time.hours(-8))}
                //That window covers the current day from 00:00:00, but on its own it fires only when the day ends, so a trigger interval is still needed
                //3.1 define a one-day window; the -8h offset aligns it to 00:00 in UTC+08:00, which is 8 hours ahead of UTC
                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                //3.2 custom trigger: fire every 1s instead of only at the end of the window
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
                //.sum() // simple aggregation
                //3.3 custom aggregation and result collection
                //aggregate(AggregateFunction<T, ACC, V> aggFunction,WindowFunction<V, R, K, W> windowFunction)
                .aggregate(new PriceAggregate(), new WindowResult());//aggregate supports complex custom aggregations
        //3.4 take a look at the results of aggregation
        tempAggResult.print("Preliminary per-category totals");
        //Preliminary per-category totals> DoubleElevenBigScreem.CategoryPojo(category=Games, totalPrice=563.8662504982619, dateTime=2021-01-19 10:31:40)
        //Preliminary per-category totals> DoubleElevenBigScreem.CategoryPojo(category=Office, totalPrice=876.5216500403918, dateTime=2021-01-19 10:31:40)

        //TODO 4.sink - use the preliminary aggregates (each category's running total, emitted every 1s) to implement the business requirements:
        tempAggResult.keyBy(CategoryPojo::getDateTime)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))//final aggregation every 1s, then output the results
                //.sum() // simple aggregation
                //.apply()
                .process(new FinalResultWindowProcess());//Implement the complex business logic in ProcessWindowFunction

        //TODO 5.execute
        env.execute();
    }

    /**
     * Custom data source that generates order data in real time as Tuple2<category, amount>
     */
    public static class MySource implements SourceFunction<Tuple2<String, Double>> {
        private volatile boolean flag = true;//volatile: cancel() is called from a different thread than run()
        private String[] categorys = {"Women's wear", "Men's wear", "Books", "Home appliances", "Personal care", "Beauty", "Sports", "Games", "Outdoors", "Furniture", "Musical instruments", "Office"};
        private Random random = new Random();

        @Override
        public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
            while (flag) {
                //Generate a random category and amount
                int index = random.nextInt(categorys.length);//index in [0, length), i.e. 0..length-1
                String category = categorys[index];//pick a random category
                double price = random.nextDouble() * 100;//nextDouble() returns a value in [0, 1), so * 100 gives [0, 100)
                ctx.collect(Tuple2.of(category, price));
                Thread.sleep(20);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }

    /**
     * User-defined aggregate function that specifies the aggregation rules
     * AggregateFunction<IN, ACC, OUT>
     */
    private static class PriceAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
        //Initialize accumulator
        @Override
        public Double createAccumulator() {
            return 0D;//D marks a double literal (L would mark a long)
        }

        //Accumulate the data on the accumulator
        @Override
        public Double add(Tuple2<String, Double> value, Double accumulator) {
            return value.f1 + accumulator;
        }

        //Get cumulative results
        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }

        //Merge two accumulators (used, for example, when session windows are merged)
        @Override
        public Double merge(Double a, Double b) {
            return a + b;
        }
    }

    /**
     * Custom window function that wraps the aggregated amount into a CategoryPojo
     * WindowFunction<IN, OUT, KEY, W extends Window>
     */
    private static class WindowResult implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {
        private FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
        @Override
        //void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out)
        public void apply(String category, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
            long currentTimeMillis = System.currentTimeMillis();
            String dateTime = df.format(currentTimeMillis);
            Double totalPrice = input.iterator().next();
            out.collect(new CategoryPojo(category,totalPrice,dateTime));
        }
    }

    /**
     * Used to store the results of the aggregation
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;//Classification name
        private double totalPrice;//Total sales of this category
        private String dateTime;//time of this snapshot; strictly this should be event time, but for simplicity we use the current system time
    }

    /**
     * Custom window function that computes the total sales and the top 3 categories and outputs them
     * abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
     */
    private static class FinalResultWindowProcess extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
        //Note:
        //key/dateTime: the current second
        //elements: each category's running sales total as of this second
        @Override
        public void process(String dateTime, Context context, Iterable<CategoryPojo> elements, Collector<Object> out) throws Exception {
            //1. Total sales from 00:00 today (e.g. November 11) up to the current time
            double total = 0D;//running total of sales
            //2. Top 3 categories by sales, e.g. "Women's wear": 10000, "Men's wear": 9000, "Books": 8000
            //We only need the top 3, so sorting all incoming elements every time would be wasteful.
            //Instead we keep a min-heap (smallest element on top) of size 3, for example:
            //70
            //80
            //90
            //If a new element is smaller than the heap top, ignore it.
            //If it is larger, e.g. 85, remove the heap top and add 85; the heap reorders itself with the smallest on top:
            //80
            //85
            //90
            //Create the min-heap
            //https://blog.csdn.net/hefenglian/article/details/81807527
            Queue<CategoryPojo> queue = new PriorityQueue<>(3,//initial capacity
                    //ascending order by total (smaller first) makes this a min-heap; Double.compare returns 0 for equal totals, as the Comparator contract requires
                    (c1, c2) -> Double.compare(c1.getTotalPrice(), c2.getTotalPrice()));
            for (CategoryPojo element : elements) {
                double price = element.getTotalPrice();
                total += price;
                if(queue.size()< 3){
                    queue.add(element);//or offer()
                }else{
                    if(price >= queue.peek().getTotalPrice()){//peek() returns the heap top without removing it
                        //queue.remove(queue.peek());
                        queue.poll();//remove the heap top
                        queue.add(element);//or offer()
                    }
                }
            }
            //At this point the queue holds the top 3 categories in ascending order; sort them in descending order for output
            List<String> top3List = queue.stream()
                    .sorted((c1, c2) -> Double.compare(c2.getTotalPrice(), c1.getTotalPrice()))
                    .map(c -> "category: " + c.getCategory() + " amount: " + c.getTotalPrice())
                    .collect(Collectors.toList());

            //3. Update the statistics every second, i.e. output them directly
            double roundResult = BigDecimal.valueOf(total).setScale(2, RoundingMode.HALF_UP).doubleValue();//round to 2 decimal places
            System.out.println("time: " + dateTime + " total amount: " + roundResult);

            System.out.println("top3: \n" + StringUtils.join(top3List,"\n"));
        }
    }
}
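The final output rounds the total with BigDecimal and RoundingMode.HALF_UP. How the BigDecimal is built from the double matters: new BigDecimal(double) uses the exact binary value of the double, while BigDecimal.valueOf(double) uses its canonical string form from Double.toString. The short sketch below shows the difference on 2.675, which cannot be represented exactly as a double; RoundingSketch is a hypothetical name.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class RoundingSketch {
    // Rounds the exact binary value of the double (2.675 is stored as 2.67499999...).
    static double roundExact(double value) {
        return new BigDecimal(value).setScale(2, RoundingMode.HALF_UP).doubleValue();
    }

    // Rounds the decimal string produced by Double.toString(value), here "2.675".
    static double roundViaString(double value) {
        return BigDecimal.valueOf(value).setScale(2, RoundingMode.HALF_UP).doubleValue();
    }

    public static void main(String[] args) {
        System.out.println(roundExact(2.675));     // 2.67
        System.out.println(roundViaString(2.675)); // 2.68
    }
}
```

For a display figure such as the big-screen total, BigDecimal.valueOf usually matches what a reader expects from "round half up".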

Result

Added by Spitfire on Fri, 18 Feb 2022 13:57:10 +0200