JUC learning - thread pool 2

Next blog https://blog.csdn.net/qq_43605444/article/details/121568416?spm=1001.2014.3001.5501

8. Four common saturation policies

When the queue in the thread pool is full and the pool has already reached its maximum number of threads, the thread pool hands any newly submitted task to the saturation policy. These policies implement the RejectedExecutionHandler interface, which declares a single method:

void rejectedExecution(Runnable r, ThreadPoolExecutor executor)

Parameters:

  • r: the task that was submitted for execution
  • executor: the thread pool that is rejecting the task

The JDK provides four built-in saturation policies:

  • AbortPolicy: throws a RejectedExecutionException directly [the default rejection policy]
  • CallerRunsPolicy: runs the task in the thread that called execute, so whoever submits the task has to run it
  • DiscardOldestPolicy: discards the oldest task in the queue, that is, the task at the head of the queue, and then tries to submit the incoming task again
  • DiscardPolicy: silently discards the task without any processing. The method body is empty
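
To make the difference between the four policies concrete, here is a minimal sketch. The class name SaturationPolicyDemo and the helper run are made up for illustration. It assumes a pool with one thread and a queue of capacity one, and uses a CountDownLatch to keep the worker busy so that the third task is always rejected.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationPolicyDemo {

    // Submits three tasks to a pool with one thread and a queue of capacity one,
    // and returns the events in the order they were recorded.
    static List<String> run(RejectedExecutionHandler handler) {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        try {
            // task-0 occupies the only worker thread until the gate opens
            pool.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                log.add("task-0");
            });
            pool.execute(() -> log.add("task-1")); // fills the queue
            try {
                pool.execute(() -> log.add("task-2")); // the pool is saturated here
            } catch (RejectedExecutionException e) {
                log.add("rejected task-2"); // only AbortPolicy ends up here
            }
            gate.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(new ThreadPoolExecutor.AbortPolicy()));         // [rejected task-2, task-0, task-1]
        System.out.println(run(new ThreadPoolExecutor.CallerRunsPolicy()));    // [task-2, task-0, task-1]
        System.out.println(run(new ThreadPoolExecutor.DiscardOldestPolicy())); // [task-0, task-2]
        System.out.println(run(new ThreadPoolExecutor.DiscardPolicy()));       // [task-0, task-1]
    }
}
```

Note that with DiscardOldestPolicy it is the queued task-1 that is lost, not the incoming task-2.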

9. Custom saturation policy

To define your own policy, implement the RejectedExecutionHandler interface. For example, when a task cannot be processed we may want to log it, which requires a custom saturation policy. Example code:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomizeRejectedExecution {

    static class Task implements Runnable {
        String name;
        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " Processing " + this.name);
            try {
                // Simulate a slow task
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Task{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }

    public static void main(String[] args) {
        // One thread and a queue of capacity one: at most two tasks can be accepted at a time
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
                                1,
                                60L,
                                TimeUnit.SECONDS,
                                new ArrayBlockingQueue<Runnable>(1),
                                Executors.defaultThreadFactory(),
                                (r, pool) -> {
                                    // Custom saturation policy:
                                    // log the tasks that cannot be handled
                                    System.out.println("Tasks that cannot be processed: " + r.toString());
                                });

        for (int i = 0; i < 5; i++) {
            executor.execute(new Task("task-" + i));
        }

        executor.shutdown();
    }
}

Running the above code produces the following output:

Tasks that cannot be processed: Task{name='task-2'}
Tasks that cannot be processed: Task{name='task-3'}
Tasks that cannot be processed: Task{name='task-4'}
pool-1-thread-1 Processing task-0
pool-1-thread-1 Processing task-1

The output shows that three tasks were handed to the saturation policy and logged: the pool has a single thread and a queue of capacity one, so task-0 runs, task-1 waits in the queue, and the remaining three are rejected. Tasks that cannot be handled should be recorded so that developers know about them. A task reaching the saturation policy indicates that the thread pool may be configured unreasonably or that the machine's performance is limited, so some tuning is needed.

Now change the code to use CallerRunsPolicy:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
                1,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());

Output:

main Processing task-2
pool-1-thread-1 Processing task-0
pool-1-thread-1 Processing task-1
main Processing task-4
pool-1-thread-1 Processing task-3

You can try the other saturation policies yourself.

10. Two ways to shut down a thread pool

The thread pool provides two shutdown methods: shutdown and shutdownNow. Both work by traversing the pool's worker threads and calling interrupt on them: shutdown interrupts only the idle workers, while shutdownNow interrupts every worker, including those currently running a task. An interrupt is only a signal, so a task that never responds to it may never terminate. If a task contains an infinite loop, it is best to check the thread's interrupt status inside the loop and exit cleanly.

After either of these two methods is called, the isShutdown method of the thread pool returns true. Once all tasks have finished and all worker threads have exited, the thread pool has been closed successfully, and the isTerminated method returns true.

After shutdown is called, the thread pool no longer accepts new tasks, but every task that was already submitted, including those waiting in the queue, is still executed. When they are done, the worker threads exit automatically.

After shutdownNow is called, the thread pool removes the unprocessed tasks (the tasks still waiting in the queue) and returns them to the caller as a list. Each worker thread exits once the task it is currently running finishes.

Which method to call depends on the nature of the tasks submitted to the thread pool. In most cases shutdown is used to close the thread pool. If the tasks do not have to be completed, shutdownNow can be called instead.
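
The two methods are often combined: try an orderly shutdown first and fall back to shutdownNow after a timeout. The sketch below illustrates this under stated assumptions. The class ShutdownDemo and the helper shutdownGracefully are hypothetical names, and the 500 ms timeout is arbitrary. The looping task also shows how to react to the interrupt sent by shutdownNow.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {

    // Hypothetical helper: try an orderly shutdown first, then force it.
    // Returns the queued tasks that never started.
    static List<Runnable> shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // no new tasks; already-submitted tasks keep running
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return Collections.emptyList();
            }
            List<Runnable> pending = pool.shutdownNow(); // interrupt workers, drain the queue
            pool.awaitTermination(timeout, unit);
            return pending;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return pool.shutdownNow();
        }
    }

    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        // A long-running loop that checks the interrupt status on every pass
        pool.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // sleep() clears the flag when it throws, so set it again
                    Thread.currentThread().interrupt();
                }
            }
        });
        pool.execute(() -> System.out.println("queued task ran")); // waits in the queue
        List<Runnable> pending = shutdownGracefully(pool, 500, TimeUnit.MILLISECONDS);
        return "isShutdown=" + pool.isShutdown()
                + ", isTerminated=" + pool.isTerminated()
                + ", pending=" + pending.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // isShutdown=true, isTerminated=true, pending=1
    }
}
```

Here shutdown alone never ends the looping task, because a worker that is busy running a task is not interrupted by it. Only the later shutdownNow delivers the interrupt, and the queued task is returned instead of being run.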

11. Extending the thread pool

Although the JDK provides a high-performance thread pool, ThreadPoolExecutor, what should we do if we want to extend it, for example to monitor the start and end time of each task, or to add some other custom behavior?

The JDK has anticipated this: ThreadPoolExecutor provides several hook methods, such as beforeExecute, afterExecute and terminated, which developers can override. Take a look at this fragment of the thread pool's source code (from ThreadPoolExecutor.runWorker, as it appears in JDK 8):

try {
    beforeExecute(wt, task); // hook called before the task runs
    Throwable thrown = null;
    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x;
        throw x;
    } catch (Error x) {
        thrown = x;
        throw x;
    } catch (Throwable x) {
        thrown = x;
        throw new Error(x);
    } finally {
        afterExecute(task, thrown); // hook called after the task runs
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}
  • beforeExecute: called before a task is executed. It has two parameters: the first is the thread that will execute the task, and the second is the task itself.
protected void beforeExecute(Thread t, Runnable r) { }
  • afterExecute: called after a task has completed. It has two parameters: the first is the task, and the second is the exception thrown while the task was running. If there was no exception, the second parameter is null.
protected void afterExecute(Runnable r, Throwable t) { }
  • terminated: called once the thread pool has finally terminated, that is, after all tasks have finished and all worker threads have exited.
protected void terminated() { }

Example code:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExpandThreadPoolTest {

    static class Task implements Runnable {
        String name;
        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " Processing " + this.name);
            try {
                // Simulate a task that takes about 2 seconds
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Task{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }

    public static void main(String[] args) throws InterruptedException {

        // Anonymous subclass of ThreadPoolExecutor that overrides the three hook methods
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
                10,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                (r, pool) -> {
                    // Custom saturation policy:
                    // log the tasks that cannot be handled
                    System.out.println("Tasks that cannot be processed: " + r.toString());
                }) {

            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println(System.currentTimeMillis() + "," + t.getName() + ",Start task:" + r.toString());
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",task:" + r.toString() + ",completion of enforcement!");
            }

            @Override
            protected void terminated() {
                System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",Close thread pool!");
            }

        };

        for (int i = 0; i < 10; i++) {
            executor.execute(new Task("task-" + i));
        }

        TimeUnit.SECONDS.sleep(1);
        executor.shutdown();
    }
}

Running the above code produces the following output:

1637895916578,pool-1-thread-1,Start task:Task{name='task-0'}
pool-1-thread-1 Processing task-0
1637895916578,pool-1-thread-2,Start task:Task{name='task-1'}
pool-1-thread-2 Processing task-1
1637895916578,pool-1-thread-3,Start task:Task{name='task-2'}
1637895916578,pool-1-thread-4,Start task:Task{name='task-3'}
pool-1-thread-4 Processing task-3
pool-1-thread-3 Processing task-2
1637895916579,pool-1-thread-5,Start task:Task{name='task-4'}
pool-1-thread-5 Processing task-4
1637895916579,pool-1-thread-6,Start task:Task{name='task-5'}
pool-1-thread-6 Processing task-5
1637895916579,pool-1-thread-7,Start task:Task{name='task-6'}
pool-1-thread-7 Processing task-6
1637895916579,pool-1-thread-8,Start task:Task{name='task-7'}
pool-1-thread-8 Processing task-7
1637895916580,pool-1-thread-9,Start task:Task{name='task-8'}
1637895916580,pool-1-thread-10,Start task:Task{name='task-9'}
pool-1-thread-9 Processing task-8
pool-1-thread-10 Processing task-9
1637895918600,pool-1-thread-4,task:Task{name='task-3'},completion of enforcement!
1637895918601,pool-1-thread-3,task:Task{name='task-2'},completion of enforcement!
1637895918602,pool-1-thread-5,task:Task{name='task-4'},completion of enforcement!
1637895918602,pool-1-thread-6,task:Task{name='task-5'},completion of enforcement!
1637895918603,pool-1-thread-7,task:Task{name='task-6'},completion of enforcement!
1637895918604,pool-1-thread-1,task:Task{name='task-0'},completion of enforcement!
1637895918604,pool-1-thread-2,task:Task{name='task-1'},completion of enforcement!
1637895918604,pool-1-thread-8,task:Task{name='task-7'},completion of enforcement!
1637895918604,pool-1-thread-10,task:Task{name='task-9'},completion of enforcement!
1637895918604,pool-1-thread-9,task:Task{name='task-8'},completion of enforcement!
1637895918604,pool-1-thread-9,Close thread pool!

The output shows that three log lines are printed for each task. The first is printed by the thread pool's beforeExecute before the task runs, the second by the task's own run method, and the third by the thread pool's afterExecute after the task has finished. Comparing the first and last log line of each task shows that each task takes about 2 seconds. Once the thread pool has finally shut down, the terminated method is invoked.

12. Configuring the thread pool sensibly
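
Instead of comparing timestamps by eye, the same hooks can measure each task's duration. The following is only a sketch under assumptions: TimingThreadPool, timedTaskCount, averageTaskMillis and runDemo are hypothetical names. It relies on the fact that beforeExecute and afterExecute run on the worker thread that executes the task, so a ThreadLocal can carry the start time from one hook to the other.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong timedTasks = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();

    TimingThreadPool(int threads, int queueCapacity) {
        super(threads, threads, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // runs on the worker thread itself
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            timedTasks.incrementAndGet();
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }

    long timedTaskCount() {
        return timedTasks.get();
    }

    long averageTaskMillis() {
        long n = timedTasks.get();
        return n == 0 ? 0 : TimeUnit.NANOSECONDS.toMillis(totalNanos.get() / n);
    }

    // Runs the given number of sleeping tasks on two threads and waits for them to finish.
    static TimingThreadPool runDemo(int tasks, long sleepMillis) {
        TimingThreadPool pool = new TimingThreadPool(2, tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool;
    }

    public static void main(String[] args) {
        TimingThreadPool pool = runDemo(4, 50);
        System.out.println(pool.timedTaskCount() + " tasks, average " + pool.averageTaskMillis() + " ms");
    }
}
```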

To reasonably configure the thread pool, you need to analyze the characteristics of the task. You can analyze it from the following perspectives:

  • Nature of tasks: CPU intensive tasks, IO intensive tasks and hybrid tasks
  • Task priority: high, medium and low
  • Task execution time: long, medium and short
  • Task dependency: whether to rely on other system resources, such as database connection.

Tasks with different properties can be processed separately with thread pools of different sizes.

  • CPU-intensive tasks should be given as few threads as possible, for example a thread pool of Ncpu + 1 threads.
  • The threads of IO-intensive tasks spend much of their time waiting rather than computing. To keep the CPU from sitting idle, configure more threads, for example 2 × Ncpu.
  • If a hybrid task can be split into a CPU-intensive task and an IO-intensive task, and the execution times of the two do not differ too much, the throughput after splitting will be higher than that of serial execution.
  • The number of CPUs can be obtained with the Runtime.getRuntime().availableProcessors() method.
  • Tasks with different priorities can be handled by giving the thread pool a priority queue such as PriorityBlockingQueue, so that higher-priority tasks are executed first.
  • When choosing a queue, a bounded queue is recommended. Bounded queues make the system more stable. With an unbounded queue, too many tasks may exhaust memory (OOM) and bring the system down.
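
As an illustration of the priority-queue point, here is a small sketch. PriorityPoolDemo and PrioritizedTask are hypothetical names, and "a larger number runs first" is just the convention chosen here. Tasks must be submitted with execute so that the queue sees the Comparable task itself. Keep in mind that PriorityBlockingQueue is unbounded, so this choice trades away the protection of a bounded queue.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityPoolDemo {

    // Hypothetical task type: a larger priority value means "run earlier".
    static class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority;
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            return Integer.compare(other.priority, priority); // higher priority sorts first
        }
    }

    // Returns the order in which the queued tasks were executed.
    static List<String> run() {
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new PriorityBlockingQueue<>());
        try {
            // Keep the only worker busy so the following tasks pile up in the queue
            pool.execute(new PrioritizedTask(0, () -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
            pool.execute(new PrioritizedTask(1, () -> order.add("low")));
            pool.execute(new PrioritizedTask(3, () -> order.add("high")));
            pool.execute(new PrioritizedTask(2, () -> order.add("medium")));
            gate.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [high, medium, low]
    }
}
```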

13. Configuring the number of threads in the thread pool

The number of threads in the pool affects system performance, and our goal is to get the best performance out of the system. With too many or too few threads, the machine's capacity cannot be used effectively. The book Java Concurrency in Practice gives a formula for estimating the size of the thread pool:

Ncpu = number of CPUs
Ucpu = target CPU utilization, 0 <= Ucpu <= 1
W/C = ratio of wait time to compute time
To keep the processors at the desired utilization, the optimal thread pool size is:
Nthreads = Ncpu × Ucpu × (1 + W/C)
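
As a worked example of the formula: with 8 CPUs, a target utilization of 0.5 and tasks that wait 90 ms for every 10 ms of computation, Nthreads = 8 × 0.5 × (1 + 90/10) = 40. A minimal sketch of the calculation follows. The class PoolSizeEstimate and the method estimate are hypothetical names, and rounding to the nearest integer with a minimum of one thread is an assumption, since the formula itself yields a real number.

```java
class PoolSizeEstimate {

    // Nthreads = Ncpu * Ucpu * (1 + W/C), rounded to the nearest integer (at least 1)
    static int estimate(int nCpu, double targetUtilization, double waitTime, double computeTime) {
        if (nCpu < 1 || targetUtilization < 0 || targetUtilization > 1
                || waitTime < 0 || computeTime <= 0) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        double threads = nCpu * targetUtilization * (1 + waitTime / computeTime);
        return Math.max(1, (int) Math.round(threads));
    }

    public static void main(String[] args) {
        System.out.println(estimate(8, 1.0, 0, 1));   // pure computation: 8
        System.out.println(estimate(8, 0.5, 90, 10)); // mostly waiting: 40
        // On the current machine, assuming equal wait and compute time
        int nCpu = Runtime.getRuntime().availableProcessors();
        System.out.println(estimate(nCpu, 1.0, 50, 50));
    }
}
```

In practice W/C has to be measured, for example by profiling a representative workload, so the result is a starting point for tuning rather than a final answer.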

14. Some usage suggestions

The Alibaba Java Development Manual states that thread resources must be obtained from a thread pool and that applications must not create threads explicitly. On the one hand, this makes thread creation more standardized and keeps the number of threads under control. On the other hand, the detailed management of threads is handed over to the thread pool, which reduces resource overhead.

The manual also says that thread pools should not be created with Executors but with ThreadPoolExecutor directly. Although the Executors class in the JDK provides factory methods for creating thread pools, such as newFixedThreadPool(), newSingleThreadExecutor() and newCachedThreadPool(), they all have limitations and are not flexible enough: the first two use an unbounded queue, and the third allows an almost unlimited number of threads. These methods are also implemented internally with ThreadPoolExecutor. Using ThreadPoolExecutor directly helps you understand the running rules of the thread pool, create a pool that fits your own business scenario, and avoid the risk of resource exhaustion.

15. Other ThreadPoolExecutor knowledge points

  1. Will all threads in the thread pool be destroyed after the idle time has passed?

    If allowCoreThreadTimeOut is true, every thread that has been idle longer than the keep-alive time is reclaimed. This value is false by default, in which case the pool keeps the core threads and reclaims only the others.

  2. How are idle threads destroyed?

    Every running worker thread tries to take a task from the queue to execute. If a worker gets no task within a certain time (keepAliveTime), it exits on its own.

  3. Are the core threads started when the thread pool is created?

    By default, no core thread is started at creation. At first, each task passed to the thread pool causes a new thread to be created, until the number of core threads is reached. However, after creating the thread pool you can call its prestartAllCoreThreads method to start the core threads ahead of time.
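
The three answers above can be checked with a short experiment. This is a sketch only: CoreThreadDemo and poolSizes are hypothetical names, and the pool parameters (3 core threads, 100 ms keep-alive) are arbitrary. It records the pool size at creation, after prestartAllCoreThreads, and after the core threads have been allowed to time out.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreThreadDemo {

    // Returns {size at creation, threads started by prestart, size after prestart, size after idling}
    static int[] poolSizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 5, 100L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
        try {
            int atCreation = pool.getPoolSize();         // no threads exist yet
            int started = pool.prestartAllCoreThreads(); // start all core threads now
            int afterPrestart = pool.getPoolSize();
            pool.allowCoreThreadTimeOut(true);           // core threads may now time out as well
            // Wait (up to 5 seconds) for the idle threads to exit after keepAliveTime
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return new int[] {atCreation, started, afterPrestart, pool.getPoolSize()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[0];
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(poolSizes())); // [0, 3, 3, 0]
    }
}
```

Note that allowCoreThreadTimeOut(true) requires a keep-alive time greater than zero, otherwise it throws an IllegalArgumentException.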

Article reference: http://www.itsoku.com/. The blogger thinks the content there is very good. If you are interested, take a look.

Keywords: Java Back-end Multithreading JUC

Added by QuietWhistler on Sat, 27 Nov 2021 03:57:23 +0200