How to use thread pool correctly

 

 

1. jdk creates thread pool

Executors Tool Class itself provides us with several ways to create threads

 

          2.Executors.newCachedThreadPool();

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

The number of core threads is 0,

The maximum number of threads is MAX_VALUE = 0x7fffffff.

60L: Represents the longest time a thread can survive when the current number of threads in the thread pool is larger than the number of core threads, when no new tasks come in from the queue

TimeUnit.SECONDS: Time in seconds, or 60 seconds

New SynchronousQueue <Runnable>(): Task bounded queue, suitable for scenarios with fewer elements, suitable for data exchange

 

 

         3.Executors.newFixedThreadPool(nThreads);

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

The number of threads in the custom thread pool, the number of core threads and the maximum number of threads are the same.

The number of core threads is created when the thread pool is initialized, so there is no need to create new threads because there are too many tasks, nor to destroy redundant threads, so the waiting time is set to 0L.

New LinkedBlockingQueue <Runnable>(): Bounded blocking queues, underlying linked list implementation, adding and deleting elements using not the same lock.

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

But in the jdk source code, when we do not pass in capacity, the default queue size is MAX_VALUE = 0x7fffffff, which is equivalent to an unbounded queue.

 

         4.Executors.newScheduledThreadPool(corePoolSize);

 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

Called is the method of the parent class, continue to look at the concrete implementation of the parent class.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

Continue to catch up with this method.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

In fact, this method is to make a judgment on the parameters, and then assign them to the member variables. What we want to see is the previous block of code, what is the value passed in?

The maximum number of threads is MAX_VALUE = 0x7fffffff.

The idle waiting time of redundant threads is 0.

new Delayed Queue () Delayed queue, unbounded queue,

 

     5.Executors.newSingleThreadExecutor(threadFactory);

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

The number of core threads and the maximum number of threads in this thread pool are 1:

 

 

2. A summary of jdk's own thread pool creation

 

Thread pool Number of core threads Maximum number of threads Task queue
newCachedThreadPool() 0 Intger.MAX_VALUE SynchronousQueue < Runnable >() (bounded)
newFixedThreadPool(nThreads) User-defined User-defined Linked BlockingQueue < Runnable >() (unbounded)
newScheduledThreadPool(corePoolSize) User-defined Intger.MAX_VALUE DelayedQueue() (bounded)
newSingleThreadExecutor(threadFactory) 1 1 Linked BlockingQueue < Runnable >() (unbounded)

 

 

 

 

 

 

1. Defects

What are the drawbacks of the thread pools created above?

First of all, the maximum number of threads in the thread pool must not be MAX_VALUE, because our cpu is limited, so when the number of threads reaches the upper limit, continue to create threads, which will inevitably affect the server.

Then, there are several uses of unbounded queues, unbounded queues, which means that when we have enough tasks, we will always store them in this queue. When the storage limit exceeds the memory capacity, it will inevitably cause memory overflow.

These risks cannot be ignored in production, so we need to define thread pools ourselves.

 

 

3. Custom thread pool

Post code first

public static void main(String[] args) {
		ThreadPoolExecutor threadPool=new ThreadPoolExecutor(
				//Number of core threads
				10, 
				//Maximum number of threads
				Runtime.getRuntime().availableProcessors()*2, 
				//The longest waiting time for idle threads beyond the number of core threads
				60L, 
				//Unit seconds
				TimeUnit.SECONDS, 
				//Bounded blocking queue, read and write with the same lock
				new ArrayBlockingQueue<Runnable>(1000), 
				//Customize the factory for creating threads
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						Thread thread=new Thread(r);
						thread.setName("students");
						if(thread.isDaemon()) {
							thread.setDaemon(false);
						}
						if(Thread.NORM_PRIORITY!=thread.getPriority()) {
							thread.setPriority(Thread.NORM_PRIORITY);
						}
						return thread;
					}
				}, 
				//Custom Denial Policy
				new  RejectedExecutionHandler() {
					@Override
					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
						System.out.println("Mission lost, print logs");
					}
				});
	}

 

1. Number of core threads

Here, the number of core threads is set to 10, which is generally determined according to the needs of their own business. This value should not be too large.

2. Maximum number of threads

     

Task type Maximum number of threads
Computer (cpu) intensive Runtime.getRuntime().availableProcessors()*2 or Runtime.getRuntime().availableProcessors()+1
io-intensive Runtime.getRuntime().availableProcessors()/(1-blocking coefficient) (blocking coefficient (0.8-0.9))

 

Runtime.getRuntime().availableProcessors(): Number of CPUs

Note: In a project, sometimes more than one thread pool is used. So the number of other thread pools should also be considered when setting this value.  

3. Task queue

Array BlockingQueue < Runnable > (1000) bounded queue to prevent memory overflow.

4. Thread Factory

Custom thread factories can set some properties of threads, such as name setting. When viewing thread information, they can distinguish which threads are used for what purpose.

Threads can be set to non-daemon threads to prevent threads from hanging in the background.

Restore priority.

5. Refusal strategy

When there are no idle threads in the thread pool, the maximum number of threads is the number of threads. Tasks are added to the queue continuously, so we will refuse to perform some tasks. For those tasks that refuse to perform, you can print out their logs or do some other processing.

 

IV. SUMMARY

1. In production, we must use custom thread pool.

2. Pay attention to the setting of maximum number of threads and task queue.

3. Have a good refusal strategy

Keywords: JDK

Added by beeman000 on Sat, 17 Aug 2019 15:35:35 +0300