[JAVA] Application of delay queue

Recently, while developing a CRM system, I ran into the following requirement. Sales staff can "claim" a potential-customer clue from the [clue open sea] module into their own [clue private sea] module, where it becomes their private clue to follow up and develop. They can also give a clue up voluntarily and "release" it back to the [clue open sea]. If development succeeds, the clue enters the [customer private sea] module and becomes that salesperson's potential customer. If the salesperson no longer wants to develop the customer, the customer goes to the [customer open sea], where any salesperson can claim it. It then enters the [customer private sea] of whoever claims it.

On top of this, we want the following behavior:
If a user does not develop a claimed clue into a potential customer within 24 hours, the clue is automatically released back to the open sea and frozen for 48 hours (it cannot be claimed during that time). Similarly, if a potential customer is not developed into a formal customer within 60 days, the customer is automatically released to the open sea and cannot be claimed again for 48 hours.

In this scenario, I thought of DelayQueue

Introduction to DelayQueue

In short, DelayQueue is a queue ordered by the expiration time of its elements rather than an ordinary first-in, first-out queue. The element that expires soonest is at the head, and elements that expire later come after it.
Elements must implement the Delayed interface. When a producer thread adds an element, the queue calls compareTo (Delayed extends Comparable<Delayed>) to place it in order. When a consumer thread takes an element, the queue calls getDelay on the head element to check whether it is due. getDelay returns the time remaining until expiration. If the value is less than or equal to 0, the element has expired and the consumer thread takes it out for consumption. If the value is greater than 0, the consumer thread blocks, waits for the remaining time, and then takes the element from the head of the queue.
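
To make the ordering concrete, here is a minimal sketch. The Item class, its field names, and the delays are made up for illustration and are not part of the project. Three elements are added out of order, and take() hands them back in order of expiration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayOrderDemo {

    /** Hypothetical element that expires delayMillis after it is created. */
    static class Item implements Delayed {
        final String name;
        final long expireAtMillis;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expireAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until expiration; <= 0 means the element is due
            return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Earlier expiration sorts first (this queue only ever holds Item)
            return Long.compare(expireAtMillis, ((Item) other).expireAtMillis);
        }
    }

    /** Adds three items out of order and returns the names in the order take() yields them. */
    static List<String> drainOrder() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item("late", 300));
        queue.put(new Item("early", 100));
        queue.put(new Item("middle", 200));

        List<String> order = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                order.add(queue.take().name); // blocks until the head has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // prints [early, middle, late]
    }
}
```

The insertion order does not matter. Only the expiration time decides which element comes out first.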

Data structure

Read the source code of DelayQueue

You can see that it contains:
A PriorityQueue - a priority queue with no blocking behavior of its own. DelayQueue uses it as the underlying store for its elements

A ReentrantLock - the lock that guards access to the PriorityQueue

A leader thread - DelayQueue uses a variant of the leader-follower pattern. At most one consumer thread, the leader, does a timed wait for exactly the remaining delay of the head element, which is guaranteed to be the element that expires first. The other consumer threads wait without a timeout until they are signaled. This avoids needless wake-ups, so consumer threads spend as much of their time as possible processing tasks, and thread utilization improves

A Condition (named available) - provides the blocking when dequeuing

Characteristics

DelayQueue is an unbounded queue, so enqueuing never blocks, just as with PriorityQueue
What makes DelayQueue distinctive is dequeuing
When dequeuing:
1. If the queue is empty, the consumer thread blocks
2. If it is not empty, check whether the element at the top of the heap has expired. If its remaining time is less than or equal to 0, it is dequeued. If it is greater than 0, check whether another consumer thread is already waiting as the leader. If the leader is not null, the current thread blocks without a timeout. If the leader is null, the current thread becomes the leader and blocks for the remaining time of the earliest-expiring element
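
The dequeue steps above can be sketched as a small hand-written queue. This is a simplified illustration modeled on the description, not the JDK source. SimpleDelayQueue, Ticket, and demo are hypothetical names, and the sketch leaves out everything except put and take:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class LeaderFollowerSketch {

    /** Simplified delay queue for illustration only; not the JDK implementation. */
    static class SimpleDelayQueue<E extends Delayed> {
        private final PriorityQueue<E> heap = new PriorityQueue<>();
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition available = lock.newCondition();
        private Thread leader; // the one thread doing a timed wait for the head

        void put(E e) {
            lock.lock();
            try {
                heap.offer(e);
                if (heap.peek() == e) { // new head: any current timed wait is stale
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
        }

        E take() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                while (true) {
                    E head = heap.peek();
                    if (head == null) {
                        available.await();               // step 1: empty queue, block
                        continue;
                    }
                    long delay = head.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        return heap.poll();              // step 2: head has expired
                    }
                    if (leader != null) {
                        available.await();               // follower: wait without timeout
                    } else {
                        Thread self = Thread.currentThread();
                        leader = self;
                        try {
                            available.awaitNanos(delay); // leader: wait for the head only
                        } finally {
                            if (leader == self) leader = null;
                        }
                    }
                }
            } finally {
                if (leader == null && heap.peek() != null) {
                    available.signal(); // let a follower become the next leader
                }
                lock.unlock();
            }
        }
    }

    /** Hypothetical element that expires delayMillis after creation. */
    static class Ticket implements Delayed {
        final String name;
        final long expireAtMillis;
        Ticket(String name, long delayMillis) {
            this.name = name;
            this.expireAtMillis = System.currentTimeMillis() + delayMillis;
        }
        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override public int compareTo(Delayed o) {
            return Long.compare(expireAtMillis, ((Ticket) o).expireAtMillis);
        }
    }

    /** Two consumers share the queue; returns the names in the order they were taken. */
    static List<String> demo() {
        SimpleDelayQueue<Ticket> queue = new SimpleDelayQueue<>();
        queue.put(new Ticket("second", 300));
        queue.put(new Ticket("first", 100));
        List<String> taken = Collections.synchronizedList(new ArrayList<>());
        Runnable consumer = () -> {
            try {
                taken.add(queue.take().name);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread t1 = new Thread(consumer);
        Thread t2 = new Thread(consumer);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return taken;
    }
}
```

Calling LeaderFollowerSketch.demo() returns [first, second]. Whichever thread arrives first becomes the leader and does the timed wait. The other waits untimed and takes over as leader once the first element has been handed out.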

Sketch Map:

After 2s, element 5 expires. Consumer thread 1 wakes up and takes element 5 for consumption
At the same time, consumer thread 2 becomes the leader. Element 4 is now at the top of the heap and expires in 2s, so consumer thread 2 blocks for 2s

After another 2s, element 4 expires. Consumer thread 2 wakes up and takes element 4 for consumption
Consumer thread 1 continues processing element 5

After another 2s, element 3 expires. If consumer thread 1 or consumer thread 2 has finished its task by then, it takes element 3 for consumption right away
If neither thread has finished, element 3 expires but no consumer is free to take it. Meanwhile new elements keep entering the queue, so tasks are delayed, the queue keeps growing, and the processing delay of each element gets longer and longer

Suppose that after another 2s, no consumer thread is idle:

Therefore, if tasks take a long time to process, arrive quickly, and have expiration times clustered together, you need to speed up task processing or add consumer threads. Otherwise the delay keeps growing. Conversely, do not add consumer threads blindly, since idle threads waste resources
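
As a rough illustration of this trade-off, the sketch below queues several tasks that all expire at the same moment and processes them with a configurable number of consumer threads. Job, processAll, the 50 ms delay, and the simulated 100 ms of work are assumptions chosen for the demo:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ConsumerPoolSketch {

    /** Hypothetical task that expires delayMillis after creation. */
    static class Job implements Delayed {
        final long expireAtMillis;
        Job(long delayMillis) {
            this.expireAtMillis = System.currentTimeMillis() + delayMillis;
        }
        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override public int compareTo(Delayed o) {
            return Long.compare(expireAtMillis, ((Job) o).expireAtMillis);
        }
    }

    /** Queues jobs that expire together, processes them all, and returns the elapsed ms. */
    static long processAll(int consumers, int jobs, long workMillis) {
        DelayQueue<Job> queue = new DelayQueue<>();
        for (int i = 0; i < jobs; i++) {
            queue.put(new Job(50));
        }
        CountDownLatch done = new CountDownLatch(jobs);
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        long start = System.nanoTime();
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        queue.take();             // blocks until a task is due
                        Thread.sleep(workMillis); // simulated slow processing
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() ends the loop
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("1 consumer:  " + processAll(1, 4, 100) + " ms");
        System.out.println("4 consumers: " + processAll(4, 4, 100) + " ms");
    }
}
```

With one consumer the four tasks are processed one after another, so the last one finishes at least 400 ms after the run starts. With four consumers they are processed in parallel and the whole batch finishes much sooner. Exact timings depend on the machine.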

Example

Following the project requirements, DelayQueue is used to implement the timeout behavior for clues and customers
(1) Create the task class DelayTask.java. It implements the Delayed interface so that it can be an element of the delay queue. The clue class and the customer class then simply extend it

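import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import lombok.Data;

@Data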
public class DelayTask implements Delayed {

    /**
     * If the start timing time is not set, it defaults to the current system time
     */
    private transient Date taskStartTime = new Date();
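    /**
     * Absolute expiration timestamp in ms (getDelay compares it with the current time).
     * If it is not set, it defaults to 1 minute after the start time
     */
    private transient long taskExpiredTime = taskStartTime.getTime() + 60 * 1000;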

    /**
     * Sets the start time and computes the absolute expiration timestamp
     * @param taskStartTime start time as a string [yyyy-MM-dd HH:mm:ss]; if empty, the existing start time is kept
     * @param taskExpiredTime time to live in ms, counted from the start time
     */
    public void initTaskTime(String taskStartTime, long taskExpiredTime) {
        if(taskStartTime != null && !taskStartTime.isEmpty()) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            try {
                this.taskStartTime = sdf.parse(taskStartTime);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        this.taskExpiredTime = taskExpiredTime;
        this.taskExpiredTime += this.taskStartTime.getTime();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(taskExpiredTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

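    @Override
    public int compareTo(Delayed o) {
        // Long.compare avoids the int overflow of casting a long difference,
        // which can flip the sign when two timestamps are more than about 24.8 days apart
        return Long.compare(this.taskExpiredTime, ((DelayTask) o).taskExpiredTime);
    }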

}
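
A note on comparing expiration timestamps: subtracting two long values and casting the result to int can produce the wrong sign once the difference no longer fits in an int, which matters here because customer tasks live for 60 days. The small sketch below shows the effect. castCompare and safeCompare are illustrative names only:

```java
class CompareOverflowDemo {

    /** Subtract-and-cast comparison; overflows for large differences. */
    static int castCompare(long a, long b) {
        return (int) (a - b);
    }

    /** Overflow-safe comparison. */
    static int safeCompare(long a, long b) {
        return Long.compare(a, b);
    }

    public static void main(String[] args) {
        long earlier = 0L;
        long later = 30L * 24 * 60 * 60 * 1000; // 30 days later, in ms

        // later > earlier, so a correct comparator must return a positive value
        System.out.println(castCompare(later, earlier)); // prints -1702967296
        System.out.println(safeCompare(later, earlier)); // prints 1
    }
}
```

With the cast, the task that expires 30 days later is treated as the earlier one, so the queue would order the two tasks incorrectly.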

(2) Create a singleton delay queue utility class: DelayQueueHelper
It holds the delay queue and provides a single global entry point to it (enqueue and remove operations)

import java.util.Iterator;
import java.util.concurrent.DelayQueue;

public class DelayQueueHelper {

    private volatile static DelayQueueHelper delayQueueHelper = null;

    //Expiration time of private sea clues: 24h
    public static final long CLUE_EXPIRED_TIME = 24 * 60 * 60 * 1000;

    //Expiration time of private customers: 60 days
    public static final long CUS_EXPIRED_TIME = 60L * 24 * 60 * 60 * 1000;

    //Freeze time for clues and customers after release: 48h
    public static final long BLOCK_TIME = 48L * 60 * 60 * 1000;

    private DelayQueue<DelayTask> queue = new DelayQueue<>();

    private DelayQueueHelper() {
    }

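    public static DelayQueueHelper getInstance() {
        if(delayQueueHelper == null) {
            synchronized(DelayQueueHelper.class) {
                //Second check: another thread may have created the instance while this one waited for the lock
                if(delayQueueHelper == null) {
                    delayQueueHelper = new DelayQueueHelper();
                }
            }
        }
        return delayQueueHelper;
    }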

    public void addTask(DelayTask task) {
        queue.put(task);
    }

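    public void removeTask(DelayTask task) {
        //Only clues are matched by id here; null and other task types are ignored
        if(!(task instanceof Clue)) {
            return;
        }
        Clue clue = (Clue) task;
        for(Iterator<DelayTask> iterator = queue.iterator(); iterator.hasNext();) {
            //Always advance the iterator, and skip queued elements that are not clues
            DelayTask queueObj = iterator.next();
            if(queueObj instanceof Clue && clue.getId().equals(((Clue) queueObj).getId())) {
                iterator.remove();
            }
        }
    }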

    public DelayQueue<DelayTask> getQueue() {
        return queue;
    }

}

(3) Create an initialization class, DelayQueueRunner, that implements the ApplicationRunner interface
When the system starts, it first enqueues all pending tasks and then starts a consumer thread that loops, taking expired clues and customers from the delay queue and consuming them (changing the status of clues and customers to released or unfrozen)

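import java.util.concurrent.DelayQueue;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Slf4j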
@Component
public class DelayQueueRunner implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        DelayQueueHelper queueHelper = DelayQueueHelper.getInstance();
        //1. Queue all unexpired clues and customers
        //......
        
        //2. Start a consumer thread
        run(queueHelper.getQueue());
    }

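    public void run(DelayQueue<DelayTask> queue) {
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    //Blocks until the head of the queue has expired
                    DelayTask task = queue.take();
                    executeTask(task);
                }
            } catch (InterruptedException e) {
                //Restore the interrupt flag and let the consumer thread exit
                Thread.currentThread().interrupt();
                log.error("Delay queue consumer interrupted", e);
            }
        }, "delay-queue-consumer");
        consumer.start();
    }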

    private void executeTask(DelayTask task) {
        if(task instanceof Clue) {
            Clue clue = (Clue) task;
            //Modify the state
            clue.update();
        }
    }

}
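
The release-then-freeze rule from the requirement can be handled by putting the same task back into the queue with a new expiration time. The sketch below is one possible way to do it, not the project's actual code. The Status enum, LeadTask, rearm, and handle are hypothetical names, and the delays are shortened to milliseconds so the demo finishes quickly:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class ReleaseThenFreezeSketch {

    /** Hypothetical states: held privately, released but frozen, claimable again. */
    enum Status { PRIVATE, FROZEN, OPEN }

    static class LeadTask implements Delayed {
        final String id;
        Status status = Status.PRIVATE;
        private long expireAtMillis;

        LeadTask(String id, long delayMillis) {
            this.id = id;
            rearm(delayMillis);
        }

        /** Sets a new expiration time; only call this while the task is outside the queue. */
        void rearm(long delayMillis) {
            this.expireAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(expireAtMillis, ((LeadTask) o).expireAtMillis);
        }
    }

    /** Handles one expired task: release and freeze it first, unfreeze it the second time. */
    static void handle(LeadTask task, DelayQueue<LeadTask> queue, long freezeMillis) {
        if (task.status == Status.PRIVATE) {
            task.status = Status.FROZEN;  // released to the open sea, not yet claimable
            task.rearm(freezeMillis);
            queue.put(task);              // comes back when the freeze period ends
        } else if (task.status == Status.FROZEN) {
            task.status = Status.OPEN;    // claimable again
        }
    }

    /** Runs one task through both transitions and records the status after each step. */
    static List<Status> demo() {
        DelayQueue<LeadTask> queue = new DelayQueue<>();
        queue.put(new LeadTask("clue-1", 50));
        List<Status> seen = new ArrayList<>();
        try {
            for (int i = 0; i < 2; i++) {
                LeadTask task = queue.take();
                handle(task, queue, 100);
                seen.add(task.status);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [FROZEN, OPEN]
    }
}
```

In the real system the two delays would be the 24-hour or 60-day holding period and the 48-hour freeze period, and the status changes would be written to the database rather than kept in memory.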

Keywords: Java Back-end Concurrent Programming message queue

Added by L on Mon, 14 Feb 2022 06:31:43 +0200