2. Thread pool of JUC


1. Why use a thread pool

Advantages of thread pool:

The main job of a thread pool is to control the number of running threads: submitted tasks are placed in a queue and started once threads are available. If the number of tasks exceeds the maximum number of threads, the excess tasks wait in the queue until another thread finishes executing, then a task is taken from the queue and executed.

Its main characteristics are thread reuse, control over the maximum number of concurrent threads, and thread management.

First: reduced resource consumption. Reusing already-created threads avoids the cost of repeatedly creating and destroying threads.

Second: improved response speed. When a task arrives, it can be executed immediately without waiting for a thread to be created.

Third: improved manageability of threads. Threads are a scarce resource; if they are created without limit, they not only consume system resources but also reduce system stability. A thread pool allows unified allocation, tuning and monitoring.

2. Use of thread pool

1. Executors.newFixedThreadPool(int)

The pool created by newFixedThreadPool sets corePoolSize and maximumPoolSize to the same value and uses a LinkedBlockingQueue. It creates a pool with a fixed number of N threads and is suited to executing long-running tasks with stable performance.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

2. Executors.newSingleThreadExecutor()

The pool created by newSingleThreadExecutor sets both corePoolSize and maximumPoolSize to 1 and uses a LinkedBlockingQueue: one pool, one thread, executing tasks one at a time.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

3. Executors.newCachedThreadPool()

The pool created by newCachedThreadPool sets corePoolSize to 0 and maximumPoolSize to Integer.MAX_VALUE, and it uses a SynchronousQueue: when a task arrives and no idle thread is available, a new thread is created to run it; a thread that stays idle for more than 60 seconds is destroyed.

It suits large numbers of short-lived asynchronous tasks: the pool creates new threads as needed but reuses previously created threads when they are available, expanding under heavy load and shrinking again when idle.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Thread pool
 * Arrays
 * Collections
 * Executors
 */
public class MyThreadPoolDemo {

    public static void main(String[] args) {
        //List list = new ArrayList();
        //List list = Arrays.asList("a","b");
        //A fixed number of thread pools, one pool with five threads

//       ExecutorService threadPool =  Executors.newFixedThreadPool(5); // One bank outlet and five business acceptance windows
//       ExecutorService threadPool =  Executors.newSingleThreadExecutor(); // One bank outlet and one business acceptance window
       ExecutorService threadPool =  Executors.newCachedThreadPool(); //A bank outlet can expand the window of accepting business

        //10 customer requests
        try {
            for (int i = 1; i <=10; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"\t Handle the business");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }

    }
}

3. Underlying principle of ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  1. corePoolSize: the number of resident core threads in the thread pool

  2. maximumPoolSize: the maximum number of threads that can run concurrently in the thread pool. This value must be greater than or equal to 1

  3. keepAliveTime: how long surplus idle threads are kept alive. When the number of threads in the pool exceeds corePoolSize and a thread has been idle for keepAliveTime, the surplus threads are destroyed until only corePoolSize threads remain

  4. unit: the time unit of keepAliveTime

  5. workQueue: the task queue, holding tasks that have been submitted but not yet executed

  6. threadFactory: the factory that creates the pool's worker threads; the default factory is usually sufficient

  7. handler: the rejection policy, which decides how to handle a submitted Runnable when the queue is full and the number of worker threads has reached maximumPoolSize

1. After the thread pool is created, it starts waiting for task requests.
2. When the execute() method is called, the thread pool makes the following judgment:
  2.1 If the number of running threads is less than corePoolSize, create a thread and run this task immediately;
  2.2 If the number of running threads is greater than or equal to corePoolSize, put this task into the queue;
  2.3 If the queue is full and the number of running threads is less than maximumPoolSize, create a non-core thread and run this task immediately;
  2.4 If the queue is full and the number of running threads is greater than or equal to maximumPoolSize, the thread pool applies its saturation (rejection) policy.
3. When a thread finishes a task, it takes the next task from the queue and executes it.
4. When a thread has been idle for longer than keepAliveTime, the thread pool checks:
    if the number of currently running threads is greater than corePoolSize, this thread is stopped.
    So after all tasks of the thread pool are finished, it eventually shrinks back to corePoolSize threads.
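
To make the flow concrete, here is a minimal, self-contained sketch of that decision logic. It is not the JDK implementation (which packs the worker count and pool state into an AtomicInteger named ctl and re-checks state at every step, and which also applies keepAliveTime); the class name MiniPoolSketch and its hard-coded sizes are made up for illustration.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class MiniPoolSketch {
    private final int corePoolSize = 2;
    private final int maximumPoolSize = 5;
    private final BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(3);
    private final AtomicInteger workerCount = new AtomicInteger();

    public void execute(Runnable task) {
        if (workerCount.get() < corePoolSize) {
            addWorker(task);                              // 2.1 below core size: start a core thread
        } else if (workQueue.offer(task)) {
            // 2.2 core threads busy: the task waits in the queue
        } else if (workerCount.get() < maximumPoolSize) {
            addWorker(task);                              // 2.3 queue full: start a non-core thread
        } else {
            throw new RejectedExecutionException("pool saturated"); // 2.4 rejection policy fires
        }
    }

    private void addWorker(Runnable firstTask) {
        workerCount.incrementAndGet();
        new Thread(() -> {
            Runnable task = firstTask;
            while (task != null) {
                task.run();
                task = workQueue.poll();                  // 3. finished: take the next queued task
            }
            workerCount.decrementAndGet();                // this sketch omits keepAliveTime handling
        }).start();
    }
}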

4. Rejection policies, and how to set reasonable parameters in production

1. Rejection policy of the thread pool

When the waiting queue is full and can accept no more tasks, and the pool has already reached its maximum number of threads and cannot serve new tasks either, a rejection policy mechanism is needed to handle the situation reasonably.

2. JDK built-in rejection policy

AbortPolicy (default): throws a RejectedExecutionException directly, interrupting normal processing.

CallerRunsPolicy: "caller runs" is a back-pressure mechanism that neither discards the task nor throws an exception; instead it hands the task back to the calling thread to execute, which slows the rate of new task submissions (a short demo follows this list).

DiscardOldestPolicy: discards the task that has been waiting longest in the queue, then tries to submit the current task again.

DiscardPolicy: silently discards the task that cannot be processed, without doing anything or throwing an exception. If tasks are allowed to be lost, this is the best policy.

All of the above built-in rejection policies implement the RejectedExecutionHandler interface.
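
As a small illustrative sketch (not part of the original demo; the class name CallerRunsDemo and its parameter choices are made up), the following shows CallerRunsPolicy handing tasks back to the submitting thread once a 1-thread pool with a 1-slot queue is saturated:

import java.util.concurrent.*;

public class CallerRunsDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            for (int i = 1; i <= 5; i++) {
                threadPool.execute(() -> {
                    // small sleep so the pool actually saturates before all tasks are submitted
                    try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException ignored) { }
                    System.out.println(Thread.currentThread().getName() + "\t Handle the business");
                });
            }
        } finally {
            threadPool.shutdown();
        }
        // Some lines print "main" instead of a pool-thread name: those are the tasks
        // that were handed back to the caller because the pool and queue were full.
    }
}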

5. Of the three ways of creating a thread pool shown above (single / fixed / cached), which one should we use in practice?

None of them. In real work we create the thread pool ourselves with ThreadPoolExecutor, because newFixedThreadPool and newSingleThreadExecutor use an effectively unbounded LinkedBlockingQueue and newCachedThreadPool allows up to Integer.MAX_VALUE threads, either of which can exhaust memory when requests pile up.

6. Custom thread pool

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
 * Thread pool
 * Arrays
 * Collections
 * Executors
 */
public class MyThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                //new ThreadPoolExecutor.AbortPolicy()
                //new ThreadPoolExecutor.CallerRunsPolicy()
                //new ThreadPoolExecutor.DiscardPolicy()
                new ThreadPoolExecutor.DiscardOldestPolicy()
        );
        //10 customer requests
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t Handle the business");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }

    }

    private static void threadPool() {
        //List list = new ArrayList();
        //List list = Arrays.asList("a","b");
        //A fixed number of thread pools, one pool with five threads

//        ExecutorService threadPool = Executors.newFixedThreadPool(5); // One bank outlet and five business acceptance windows
//        ExecutorService threadPool = Executors.newSingleThreadExecutor(); // One bank outlet and one business acceptance window
        ExecutorService threadPool = Executors.newCachedThreadPool(); //A bank outlet can expand the window of accepting business

        //10 customer requests
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t Handle the business");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

7. BlockingQueue blocking queue

1. Stack and queue

Stack: first in, last out (LIFO)

Queue: first in, first out (FIFO)
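
A quick check of the two orderings (an illustrative snippet; ArrayDeque is used here only because it can act as both a stack and a queue):

import java.util.ArrayDeque;
import java.util.Deque;

public class StackVsQueue {
    public static void main(String[] args) {
        Deque<Integer> stack = new ArrayDeque<>();
        stack.push(1); stack.push(2); stack.push(3);
        System.out.println(stack.pop() + " " + stack.pop() + " " + stack.pop()); // 3 2 1 (LIFO)

        Deque<Integer> queue = new ArrayDeque<>();
        queue.offer(1); queue.offer(2); queue.offer(3);
        System.out.println(queue.poll() + " " + queue.poll() + " " + queue.poll()); // 1 2 3 (FIFO)
    }
}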

2. Blocking queue

Blocking: in some situations an operation must block, that is, it has no choice but to wait.

Thread 1 adds elements to the blocking queue, and thread 2 removes elements from the blocking queue

When the queue is empty, the operation of getting elements from the queue will be blocked

When the queue is full, the operation of adding elements to the queue will be blocked

Threads trying to get elements from an empty queue will be blocked until other threads insert new elements into the empty queue

Threads trying to add new elements to a full queue are likewise blocked until other threads remove one or more elements or clear the queue entirely, making room so the new elements can be added
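
A minimal sketch of that behavior (the class name BlockingTakeDemo is made up for illustration): the consumer thread blocks in take() on an empty queue until the producer puts an element two seconds later.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingTakeDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);  // the consumer below is already blocked in take()
                queue.put("ticket");        // inserting an element wakes the blocked consumer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer").start();

        System.out.println("consumer waiting on an empty queue...");
        System.out.println("consumer got: " + queue.take());  // blocks until the producer puts an element
    }
}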

3. Types of blocking queues

ArrayBlockingQueue: a bounded blocking queue composed of an array structure.

LinkedBlockingQueue: a bounded (but with a default capacity of Integer.MAX_VALUE) blocking queue composed of a linked list structure.

PriorityBlockingQueue: an unbounded blocking queue that supports priority ordering.

DelayQueue: an unbounded blocking queue of delayed elements, implemented on top of a priority queue.

SynchronousQueue: a blocking queue that does not store elements, i.e. a queue where each insert operation must wait for a corresponding remove by another thread (a small demo of this hand-off follows this list).

LinkedTransferQueue: an unbounded blocking queue composed of linked lists.

LinkedBlockingDeque: a bidirectional blocking queue composed of linked lists.
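
Since SynchronousQueue is the queue behind newCachedThreadPool, here is a small sketch of its hand-off behavior (the class name SynchronousQueueDemo is made up for illustration): put() blocks until another thread calls take().

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "\t put a");
                queue.put("a");   // blocks here until the consumer calls take()
                System.out.println(Thread.currentThread().getName() + "\t put done");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer").start();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);  // let the producer block in put() first
                System.out.println(Thread.currentThread().getName() + "\t take " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer").start();
    }
}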

4. BlockingQueue core methods
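
For reference, the four groups exercised in the demo below correspond to the four families of BlockingQueue methods (as documented in the BlockingQueue javadoc):

Throws exception: add(e), remove(), element()
Returns special value: offer(e) returns false when full; poll() and peek() return null when empty
Blocks: put(e), take()
Times out: offer(e, time, unit), poll(time, unit)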


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Blocking queue
 */
public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {

//        List list = new ArrayList();

        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        //first group
//        System.out.println(blockingQueue.add("a"));
//        System.out.println(blockingQueue.add("b"));
//        System.out.println(blockingQueue.add("c"));
//        System.out.println(blockingQueue.element());

        //System.out.println(blockingQueue.add("x"));
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
//    Group 2
//        System.out.println(blockingQueue.offer("a"));
//        System.out.println(blockingQueue.offer("b"));
//        System.out.println(blockingQueue.offer("c"));
//        System.out.println(blockingQueue.offer("x"));
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//    Group 3        
//         blockingQueue.put("a");
//         blockingQueue.put("b");
//         blockingQueue.put("c");
//         //blockingQueue.put("x");
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());
        
//    Group 4        
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("a",3L, TimeUnit.SECONDS));

    }
}
