Java multithreading: ThreadPoolExecutor's rejection policies

Summary

Original address http://blog.csdn.net/qq_25806863/article/details/71172823

Analysis

The ThreadPoolExecutor constructor takes a RejectedExecutionHandler parameter.

RejectedExecutionHandler is an interface:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

It has only one method. When the pool cannot accept a new task, typically because the work queue is full and the pool has already reached its maximum number of threads (or because the executor has been shut down), the task is rejected and this method is called.

You can implement this interface yourself to decide what happens to the tasks the pool cannot accept.
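As a minimal sketch of such an implementation, the handler below only counts the tasks handed to it. The class name CountingHandlerDemo, the helper countRejections and the pool sizes (one thread, one queue slot) are assumptions made for this illustration and are not part of the original article. The tasks wait on a latch so that the pool stays full while tasks are being submitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingHandlerDemo {
    // Hypothetical helper: submits `tasks` blocking tasks to a pool with one
    // thread and a one-slot queue, and returns how many reached the handler.
    static int countRejections(int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        // The interface has a single abstract method, so a lambda can implement it.
        RejectedExecutionHandler handler = (r, pool) -> rejected.incrementAndGet();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(1),
                handler);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                try {
                    release.await(); // keep the pool saturated while submitting
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();  // let the accepted tasks finish
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejections(5));
    }
}
```

With five submissions, one task runs, one waits in the queue and the remaining three go to the handler, so this should print "rejected: 3".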

ThreadPoolExecutor itself provides four rejection policies: CallerRunsPolicy, AbortPolicy, DiscardPolicy and DiscardOldestPolicy.

All four are very simple, as their implementations show.

AbortPolicy

The default rejection policy in ThreadPoolExecutor is AbortPolicy, which simply throws an exception.

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

Its implementation is as follows:

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

Simple and blunt: it throws a RejectedExecutionException and the task is not executed.

Test

First, define a custom Runnable that gives each task a name. The tests below all use this Runnable.

static class MyThread implements Runnable {
    String name;

    public MyThread(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            // Simulate two seconds of work so the pool stays busy.
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Report which thread ran which task.
        System.out.println("thread: " + Thread.currentThread().getName() + " executes: " + name + " run");
    }
}

Then we construct a thread pool with a core pool size of 1, a maximum pool size of 2 and a queue capacity of 2. The rejection policy is AbortPolicy.

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 0,
        TimeUnit.MICROSECONDS,
        new LinkedBlockingDeque<Runnable>(2),
        new ThreadPoolExecutor.AbortPolicy());
// Tasks are numbered from 1 so that the names match the walkthrough below.
for (int i = 1; i <= 6; i++) {
    System.out.println("Adding task " + i);
    executor.execute(new MyThread("task" + i));
    // Print the tasks currently waiting in the queue.
    Iterator<Runnable> iterator = executor.getQueue().iterator();
    while (iterator.hasNext()) {
        MyThread thread = (MyThread) iterator.next();
        System.out.println("Queue: " + thread.name);
    }
}

The output is:

Walking through the process:

  1. When the first task is added, it is executed directly and the task queue is empty.
  2. When the second task is added, the core thread is busy, so the task is placed in the LinkedBlockingDeque. The queue now holds task 2.
  3. The third task is also placed in the queue, which now holds tasks 2 and 3.
  4. When the fourth task is added, the core thread is still busy and the queue is full, so the pool creates a new thread to run the fourth task. Two threads are now running, which is the maximum, and the queue still holds tasks 2 and 3.
  5. When the fifth task is added, there is no room to queue it and no thread to run it, so the pool rejects it and calls the rejectedExecution method of AbortPolicy, which throws an exception.
  6. In the end only four tasks run. The fifth is rejected, and because the exception propagates out of the loop, the sixth is never submitted.
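The test above stops at the first exception. The following sketch, assuming the same pool configuration, catches RejectedExecutionException so that all six submissions are attempted and counted. The class name AbortPolicyDemo and the helper submitSix are made up for this illustration, and the tasks wait on a latch instead of sleeping so that the result does not depend on timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AbortPolicyDemo {
    // Hypothetical helper: returns {accepted, rejected} for six submissions.
    static int[] submitSix() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 0,
                TimeUnit.MICROSECONDS,
                new LinkedBlockingDeque<Runnable>(2),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1); // holds tasks so the pool stays full
        int accepted = 0;
        int rejected = 0;
        for (int i = 1; i <= 6; i++) {
            try {
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                // AbortPolicy throws in the thread that called execute().
                rejected++;
            }
        }
        release.countDown();  // let the accepted tasks finish
        executor.shutdown();
        return new int[] {accepted, rejected};
    }

    public static void main(String[] args) {
        int[] result = submitSix();
        System.out.println("accepted=" + result[0] + ", rejected=" + result[1]);
    }
}
```

Under these assumptions the pool accepts four tasks (two running, two queued) and rejects the other two, so this should print "accepted=4, rejected=2".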

CallerRunsPolicy

When a task is rejected, CallerRunsPolicy runs it in the thread that called execute, that is, the submitting thread rather than a thread in the pool.

Its implementation is as follows:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

This is also simple: unless the executor has been shut down, it calls the task's run method directly.

Test

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(2),
        new ThreadPoolExecutor.CallerRunsPolicy());

Submitting the tasks in the same way as above gives the following output:

Note that the fifth task, task 5, is again rejected by the thread pool, so the rejectedExecution method of CallerRunsPolicy is called, which directly invokes the task's run method. That is why task 5 is executed in the main thread.

It also shows that, because the fifth task runs in the main thread, the main thread is blocked until that task finishes. By the time the sixth task is submitted, the first tasks have finished and the pool has capacity again, so task 6 is accepted by the pool and executed there.

The disadvantage of this policy is that it may block the submitting thread, here the main thread.
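To check which thread runs a rejected task, the sketch below records the tasks that end up in the submitting thread. The class name CallerRunsDemo and the helper tasksRunByCaller are assumptions for this illustration. Unlike the timed test above, the pool tasks here wait on a latch and never free up capacity during submission, so both task 5 and task 6 fall back to the caller.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    // Hypothetical helper: returns the numbers of the tasks that ran in the
    // thread that called execute().
    static List<Integer> tasksRunByCaller() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(2),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> ranInCaller = new CopyOnWriteArrayList<>();
        for (int i = 1; i <= 6; i++) {
            final int id = i;
            executor.execute(() -> {
                if (Thread.currentThread() == caller) {
                    // Rejected task: execute() ran it synchronously in the caller.
                    ranInCaller.add(id);
                    return;
                }
                try {
                    release.await(); // pool threads wait so the pool stays saturated
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();  // let the pool tasks finish
        executor.shutdown();
        return ranInCaller;
    }

    public static void main(String[] args) {
        System.out.println("run by caller: " + tasksRunByCaller());
    }
}
```

Under these assumptions this should print "run by caller: [5, 6]". The call to execute does not return until the rejected task has finished, which is exactly the blocking behaviour described above.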

DiscardPolicy

This policy is even simpler, as the implementation shows:

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

The method body is empty: it does nothing at all.

Therefore, with this policy, tasks rejected by the thread pool are silently discarded: no exception is thrown and the task is never executed.

Test

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(2),
        new ThreadPoolExecutor.DiscardPolicy());

Output:

As you can see, tasks 5 and 6, which were added last, are never executed. They are discarded without any notification.
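The silent loss can be made visible by counting how many tasks actually run. This is a minimal sketch under the same pool configuration; the class name DiscardPolicyDemo and the helper executedOutOfSix are hypothetical, and a latch replaces the two-second sleep to keep the result independent of timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DiscardPolicyDemo {
    // Hypothetical helper: submits six tasks and returns how many were executed.
    static int executedOutOfSix() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(2),
                new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        for (int i = 1; i <= 6; i++) {
            // No try/catch needed: DiscardPolicy never throws on rejection.
            executor.execute(() -> {
                try {
                    release.await(); // keep the pool saturated while submitting
                    executed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();  // let the accepted tasks finish
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("executed: " + executedOutOfSix() + " of 6");
    }
}
```

This should print "executed: 4 of 6". Nothing in the submitting code reveals that two tasks were dropped, so this policy only suits work that is safe to lose.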

DiscardOldestPolicy

When a task is rejected, DiscardOldestPolicy discards the oldest task in the task queue, that is, the one that was enqueued first, and then tries to add the new task again.

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

In rejectedExecution, poll removes the task at the head of the queue, which is the one that was enqueued first. This frees a slot, and execute is then called again so that the new task can join the queue.

Test

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(2),
        new ThreadPoolExecutor.DiscardOldestPolicy());

The output is:

As you can see,

  1. When the fifth task is added, it is rejected by the thread pool. At this point the task queue holds tasks 2 and 3.
  2. The rejection policy then removes the first task in the queue, which is task 2.
  3. The rejected task 5 is then added to the task queue, which now holds task 3 and task 5.
  4. When the sixth task is added, the same process repeats: task 3 is discarded and task 6 is added. The queue now holds task 5 and task 6.
  5. Therefore, only tasks 1, 4, 5 and 6 are executed. Tasks 2 and 3 are discarded and never run.
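The steps above can be reproduced with the following sketch, which collects the numbers of the tasks that actually run. The class name DiscardOldestDemo and the helper executedTasks are assumptions for this illustration, and the tasks wait on a latch rather than sleeping so that the queue is not drained while tasks are still being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DiscardOldestDemo {
    // Hypothetical helper: submits tasks 1..6 and returns the sorted numbers
    // of the tasks that were executed.
    static List<Integer> executedTasks() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(2),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Set<Integer> executed = new ConcurrentSkipListSet<>(); // sorted, thread-safe
        for (int i = 1; i <= 6; i++) {
            final int id = i;
            executor.execute(() -> {
                try {
                    release.await(); // keep the pool saturated while submitting
                    executed.add(id);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();  // tasks 1 and 4 finish, then the queued tasks run
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(executed);
    }

    public static void main(String[] args) {
        System.out.println("executed: " + executedTasks());
    }
}
```

Under these assumptions this should print "executed: [1, 4, 5, 6]", matching the walkthrough: tasks 2 and 3 are removed from the queue to make room for tasks 5 and 6.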

Custom Rejection Policy

The four built-in rejection policies above show that implementing a rejection policy is very simple. Writing your own is just as easy.

For example, if you want the rejected task to be executed in a new thread, you can write as follows:

static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        new Thread(r,"New Threads"+new Random().nextInt(10)).start();
    }
}

Then use it normally:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(2),
        new MyRejectedExecutionHandler());

Output:

The rejected tasks 5 and 6 are executed in new threads.

Added by xluminex on Wed, 03 Jul 2019 22:43:22 +0300