Thread pool for concurrent programming

Thread usage problems

  • Frequently creating and destroying threads is expensive.
  • Too many threads consume excessive system resources.
  • Context switching between threads also consumes CPU time.

Thread reuse

Connection pools, object pools, memory pools, and thread pools all apply the same pooling idea.

  • The core of pooling technology: reuse

Thread pool

  • Create a set of threads in advance and keep them in the thread pool.
  • When a task needs to be executed, take a thread from the pool to run it.
  • When the task is finished, the thread is returned to the pool instead of being destroyed.

Factory methods for creating thread pools in the Executors class (these are not recommended; it is better to construct a ThreadPoolExecutor directly so that queue and thread limits are explicit and resources are not wasted)

  • newFixedThreadPool: a fixed number of threads; the task queue is an unbounded LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • newSingleThreadExecutor: a pool with only one thread; the task queue is an unbounded LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • newCachedThreadPool: a cached thread pool. In theory it creates as many threads as the number of requests demands,
    and threads that stay idle for 60 seconds are reclaimed, so fewer threads remain when the load drops.
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • newScheduledThreadPool: a thread pool that runs tasks after a delay or periodically, similar in purpose to Timer
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
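
As a contrast to the factory methods above, here is a minimal sketch of building a pool with ThreadPoolExecutor directly. The class name BoundedPoolExample, the helper newBoundedPool, and the sizes (2 core threads, 4 maximum, a queue of 100) are assumptions chosen for illustration, not recommended values.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolExample {

    // Hypothetical helper: every limit is explicit instead of hidden in a factory method.
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2,                              // corePoolSize (illustrative value)
                4,                              // maximumPoolSize (illustrative value)
                30L, TimeUnit.SECONDS,          // idle time before a non-core thread ends
                new ArrayBlockingQueue<>(100),  // bounded queue: at most 100 waiting tasks
                new ThreadPoolExecutor.CallerRunsPolicy()); // when full, the caller runs the task
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newBoundedPool();
        for (int i = 0; i < 10; i++) {
            int id = i;
            pool.execute(() ->
                    System.out.println(Thread.currentThread().getName() + " runs task " + id));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With a bounded queue and an explicit rejection policy, overload shows up as back-pressure on the caller rather than as an ever-growing queue or thread count.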

A small demo

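import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            // Hand a task that implements the Runnable interface to the thread pool
            executorService.execute(new Task());
        }
        // Stop accepting new tasks; tasks already submitted still run
        executorService.shutdown();
    }

    static class Task implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "-Start task");
            try {
                // Simulate work that takes up to one second
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the pool can see the interruption
                Thread.currentThread().interrupt();
            }
        }
    }
}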

Two ways to perform tasks

Both execute() and submit() hand a task to the thread pool. The main difference is that submit() returns a Future through which the caller can obtain the task's result, whereas execute() returns nothing.
submit() is typically used together with Callable and Future.
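
To make the difference concrete, the following sketch submits a Callable and reads its result through the returned Future. The class name SubmitExample and the helper sumViaPool are made up for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitExample {

    // Hypothetical helper: computes 1 + 2 + ... + n on a pool thread.
    static int sumViaPool(int n) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = 1; i <= n; i++) {
                    sum += i;
                }
                return sum;
            };
            Future<Integer> future = pool.submit(task); // returns immediately
            return future.get();                        // blocks until the task has finished
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumViaPool(100)); // prints 5050
    }
}
```

If the task throws, future.get() rethrows the failure wrapped in an ExecutionException, which is another thing execute() cannot report back to the caller.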

ThreadPoolExecutor

Inferring the design

  • Deducing how a thread pool must be implemented
    • Requirement: reuse threads.
    • Decomposition:
      How can a thread be reused?
      The only way to reuse a thread is to keep it from terminating.
    • How, then, does a live thread pick up new tasks?
      Threads exchange data through [shared memory]: put the tasks in a shared collection and let the thread fetch them itself.
    • Is it reasonable for the thread to run all the time?
      • When a task arrives, execute it.
      • When there is no task, block.

Conclusion: thread reuse in a thread pool is implemented with a blocking queue.

With these conjectures in hand, we can write a simplified version of a thread pool's worker thread.

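import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadExample implements Runnable {

    // Shared task queue: producers add tasks, the worker thread takes them
    private static final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Runnable task = tasks.take(); // Blocks while the queue is empty
                task.run(); // Call run() directly: the worker thread itself executes the task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Restore the flag so the loop exits
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(new ThreadExample(), "worker");
        worker.start();
        tasks.add(new Task());
        Thread.sleep(100);  // Give the worker a moment to run the task
        worker.interrupt(); // Stop the worker so the JVM can exit
    }

    static class Task implements Runnable {

        @Override
        public void run() {
            System.out.println("work");
        }
    }
}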

Conjecture summary diagram

Source code verification

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;//Number of core threads
        this.maximumPoolSize = maximumPoolSize;//Maximum number of threads; idle threads beyond the core size are allowed to end
        this.workQueue = workQueue;//Blocking queue (task queue)
        this.keepAliveTime = unit.toNanos(keepAliveTime);//How long an idle non-core thread waits for work before it ends
        this.threadFactory = threadFactory;//Thread factory used to create worker threads; other constructor overloads supply a default
        //To customize thread names, pass your own factory here
        this.handler = handler;//Rejection policy; other constructor overloads supply a default
    }
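
The threadFactory parameter is easiest to understand with a small example. The sketch below names worker threads with a prefix; the class NamedThreadFactoryExample, the helper named, the prefix "order-worker", and the pool sizes are all assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactoryExample {

    // Hypothetical factory: names threads "<prefix>-1", "<prefix>-2", ...
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + counter.incrementAndGet());
    }

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                named("order-worker"),                 // custom thread names
                new ThreadPoolExecutor.AbortPolicy()); // throw when the pool is saturated
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Meaningful thread names make thread dumps and log lines much easier to attribute to a particular pool.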

execute method

  • If fewer than corePoolSize worker threads exist, start a new core thread for the task (core threads are created lazily).
  • Otherwise, try to put the task into the blocking queue. (offer() -> true/false)
    • true: the task was queued. The current request volume is not large, and the existing threads can handle it.
    • false: the queue is full, so try to add a worker thread (a non-core thread).
      • If adding fails, the number of worker threads has reached the maximum, and the rejection policy is invoked directly.
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
      
        int c = ctl.get();
        //Check whether the current number of worker threads is below the core size (core threads are created lazily)
        //Note: ctl is an AtomicInteger that packs the pool state and the worker count
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))//Add a core worker thread with command as its first task
                return;
            c = ctl.get();
        }
        //The pool is running: try to enqueue the task with workQueue.offer()
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //If the blocking queue is full, try to add a non-core worker thread
        else if (!addWorker(command, false))
            reject(command);//Rejection policy
    }
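
The three branches of execute() can be observed with a deliberately tiny pool. This is a sketch under assumed settings (1 core thread, 2 maximum, a queue of capacity 1, the default AbortPolicy); the class name ExecuteFlowDemo and the method run are invented for the example. Every task blocks on a latch, so no thread becomes free while the four tasks are submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteFlowDemo {

    static List<String> run() throws InterruptedException {
        List<String> log = new ArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        // Each task waits on the latch, keeping its worker thread busy.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                log.add("task " + i + ": threads=" + pool.getPoolSize()
                        + ", queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("task " + i + ": rejected");
            }
        }
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
        // task 1: threads=1, queued=0   (core thread created)
        // task 2: threads=1, queued=1   (queued)
        // task 3: threads=2, queued=1   (queue full, non-core thread added)
        // task 4: rejected              (queue full and maximum reached)
    }
}
```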

How to close a thread pool

Closing the thread pool
There are two methods for shutting down a thread pool: shutdown() and shutdownNow(). Both work by traversing the worker threads in the pool and calling interrupt() on them one by one, so a task that does not respond to interruption may never terminate.

Differences

  • shutdownNow() first sets the state of the thread pool to STOP, then attempts to interrupt all worker threads, whether they are executing a task or idle, and returns the list of tasks still waiting to be executed.

  • shutdown() only sets the state of the thread pool to SHUTDOWN and interrupts the threads that are not executing tasks. Tasks that were already submitted still run.

  • After either method is called, isShutdown() returns true. Once all tasks have finished, the thread pool has been closed successfully, and isTerminated() returns true.

  • Usually shutdown() is called to close the thread pool. If the tasks do not have to be completed, shutdownNow() can be called instead.
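
The two methods are often combined: ask politely with shutdown(), wait a while, and fall back to shutdownNow(). The sketch below shows one way to do that; the class GracefulShutdownExample, the helper shutdownGracefully, and the 5-second timeout are assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownExample {

    // Hypothetical helper: returns true if the pool reached the terminated state.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // no new tasks; already submitted tasks still run
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // interrupt running tasks and drop the waiting ones
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));
        boolean clean = shutdownGracefully(pool, 5, TimeUnit.SECONDS);
        System.out.println("isShutdown=" + pool.isShutdown()
                + ", isTerminated=" + pool.isTerminated() + ", clean=" + clean);
    }
}
```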

Number of threads (the eternal question)

IO-intensive: 2 × CPU cores + 1 (CPU utilization is low because threads spend much of their time waiting)
CPU-intensive: CPU cores + 1 (CPU utilization is already high, and more threads only add context switching)
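
These rules of thumb translate directly into code. The following sketch derives both sizes from the number of available processors; the class PoolSizing and its method names are hypothetical, and the results are starting points for measurement rather than fixed answers.

```java
public class PoolSizing {

    // CPU-intensive work: cores + 1
    static int cpuBound(int cores) {
        return cores + 1;
    }

    // IO-intensive work: 2 * cores + 1
    static int ioBound(int cores) {
        return 2 * cores + 1;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores
                + ", cpuBound=" + cpuBound(cores)
                + ", ioBound=" + ioBound(cores));
    }
}
```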

Keywords: Java Multithreading queue Concurrent Programming

Added by toasty2 on Fri, 14 Jan 2022 20:52:12 +0200