Java Thread Pool Details
Note: based on JDK 1.7.0_79
Why use thread pools instead of creating threads explicitly
- Thread pools reduce the time spent creating and destroying threads and the associated system resource overhead, which relieves resource shortages;
- Without a thread pool, the system may create a large number of threads of the same kind, exhausting memory or causing excessive context switching.
How to better use thread pools
- Do not create thread pools with Executors. Although its factory methods are simple to use, they have the following problems:
- FixedThreadPool and SingleThreadPool: the request queue is allowed to grow to Integer.MAX_VALUE entries, so a large number of requests can pile up and cause an OOM (OutOfMemoryError), as the following JDK source shows:
FixedThreadPool:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

SingleThreadPool:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

LinkedBlockingQueue:

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
Note: the code above shows that the blocking queue is created with new LinkedBlockingQueue<Runnable>(), whose default capacity is Integer.MAX_VALUE.
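The unbounded queue can also be observed at run time. The following is a minimal sketch, not part of the JDK source quoted above: the class name FixedPoolQueueCheck and its helper method are illustrative. It relies on newFixedThreadPool returning a plain ThreadPoolExecutor, as the quoted source shows; the single-thread executor is wrapped in a delegating class and cannot be cast this way.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Illustrative helper class; the name is not from the article.
class FixedPoolQueueCheck {

    /** Returns how many more tasks the pool's work queue would accept. */
    static int remainingQueueCapacity(ExecutorService pool) {
        // newFixedThreadPool returns a plain ThreadPoolExecutor, so this cast works for it.
        return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // An empty default LinkedBlockingQueue reports Integer.MAX_VALUE free slots.
            System.out.println("unbounded queue: "
                    + (remainingQueueCapacity(pool) == Integer.MAX_VALUE));
        } finally {
            pool.shutdown();
        }
    }
}
```

In practice the heap runs out long before that many tasks are queued, which is where the OOM risk comes from.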
- CachedThreadPool and ScheduledThreadPool: the number of threads allowed is Integer.MAX_VALUE, so a large number of threads may be created and cause an OOM, as the following JDK source shows:
CachedThreadPool:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

ScheduledThreadPool:

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }
Note: the code above shows that maximumPoolSize is set to Integer.MAX_VALUE when the ThreadPoolExecutor is initialized.
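The same values can be read back from live pools. This is a small sketch under the assumption, consistent with the quoted source, that newCachedThreadPool returns a plain ThreadPoolExecutor; the class and method names are illustrative.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;

// Illustrative helper class; the name is not from the article.
class MaxPoolSizeCheck {

    /** Maximum pool size configured by Executors.newCachedThreadPool(). */
    static int cachedPoolMaximum() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return pool.getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    /** Maximum pool size configured by the ScheduledThreadPoolExecutor constructor. */
    static int scheduledPoolMaximum() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
        try {
            return pool.getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("cached pool unbounded: "
                + (cachedPoolMaximum() == Integer.MAX_VALUE));
        System.out.println("scheduled pool unbounded: "
                + (scheduledPoolMaximum() == Integer.MAX_VALUE));
    }
}
```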
- Creating the pool through ThreadPoolExecutor directly makes the pool's operating rules explicit to the developer and avoids the risk of resource exhaustion.
Create a thread pool based on ThreadPoolExecutor
ThreadPoolExecutor constructor

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
Parameter Description
- corePoolSize: the number of core threads.
  - When the pool is first created it contains no threads. As tasks are submitted, a new thread is created for each one until corePoolSize threads exist, even if some existing threads are idle. Once created, core threads are not reclaimed by default.
  - If allowCoreThreadTimeOut is set to true, an idle core thread is terminated once it has been idle longer than keepAliveTime. The parameter defaults to false, meaning core threads are kept alive even when idle.
- maximumPoolSize: the maximum number of threads allowed in the pool. When the work queue is full and fewer than maximumPoolSize threads exist, additional (non-core) threads are created to run new tasks; they are reclaimed after being idle for keepAliveTime.
- keepAliveTime: how long an idle thread beyond corePoolSize waits for work before it is reclaimed. The constructor has no default value; 60 seconds is the value used by Executors.newCachedThreadPool().
- unit: the time unit of keepAliveTime.
- workQueue: a thread-safe blocking queue that holds tasks submitted while all core threads are busy. When a thread in the pool becomes idle, it takes the next task from this queue and runs it.
- threadFactory: creates new threads for the pool. Use it to set thread names; a meaningful name is recommended because it makes troubleshooting easier.
- handler: the rejection policy applied when the pool has reached maximumPoolSize and the work queue is full. The default is AbortPolicy; the others are DiscardPolicy, CallerRunsPolicy, and DiscardOldestPolicy.
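To tie the seven parameters together, here is a minimal sketch of a fully specified pool. The concrete numbers (2 core threads, 4 maximum, 30 seconds, capacity 100) and the class name PoolParametersSketch are arbitrary choices for illustration, not recommendations from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class; the name and the sizes are assumptions made for this example.
class PoolParametersSketch {

    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L, TimeUnit.SECONDS,                  // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(100),  // workQueue with an explicit bound
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler (the default policy)
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool();
        try {
            // No thread exists until the first task arrives.
            System.out.println("threads at creation: " + pool.getPoolSize());
            // Core threads are kept alive unless this flag is switched on.
            System.out.println("core timeout enabled: " + pool.allowsCoreThreadTimeOut());
            pool.allowCoreThreadTimeOut(true); // requires keepAliveTime > 0
            System.out.println("core timeout enabled: " + pool.allowsCoreThreadTimeOut());
        } finally {
            pool.shutdown();
        }
    }
}
```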
How the number of threads is decided
Definition: poolSize is the number of threads currently in the pool
- When poolSize < corePoolSize, each new request is handled by a newly created thread, even if existing threads are idle. These core threads are not reclaimed unless allowCoreThreadTimeOut=true is set, in which case they are reclaimed after being idle for keepAliveTime;
- When poolSize >= corePoolSize, a new request is added to the BlockingQueue;
- When the BlockingQueue is full and poolSize < maximumPoolSize, a new request causes a new thread to be created to run it (do not make the blocking queue too large, or maximumPoolSize becomes meaningless; this is why the Executors utility class is discouraged);
- When the BlockingQueue is full and poolSize has reached maximumPoolSize, the RejectedExecutionHandler rejection policy is executed.
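The four steps above can be traced with a deliberately tiny pool. This is a sketch with illustrative names and sizes (1 core thread, 2 maximum, queue capacity 1); every task blocks on a latch so that no thread becomes free while the tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class; the name and the sizes are assumptions made for this example.
class SubmissionFlowDemo {

    /** Submits four blocking tasks and records what the pool did with each one. */
    static String run() {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            @Override
            public void run() {
                try {
                    release.await(); // keep the thread busy until the demo is over
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        StringBuilder trace = new StringBuilder();
        try {
            pool.execute(blocker); // poolSize < corePoolSize: a core thread is created
            trace.append("threads=").append(pool.getPoolSize());
            pool.execute(blocker); // core threads busy: the task is queued
            trace.append(" queued=").append(pool.getQueue().size());
            pool.execute(blocker); // queue full, poolSize < maximumPoolSize: extra thread
            trace.append(" threads=").append(pool.getPoolSize());
            try {
                pool.execute(blocker); // queue full, pool at maximum: rejected
                trace.append(" accepted");
            } catch (RejectedExecutionException e) {
                trace.append(" rejected");
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "threads=1 queued=1 threads=2 rejected"
    }
}
```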
Blocking queue details
BlockingQueue
- Adding data
  - offer: adds an element to the BlockingQueue and returns true if it was added, false otherwise, without blocking the calling thread. An overload waits up to a given timeout for space and returns false if the element still cannot be added.
  - put: adds an element to the BlockingQueue. If the queue has no free space, the calling thread blocks until space becomes available.
- Retrieving data
  - poll: removes and returns the head of the BlockingQueue, or returns null immediately if the queue is empty. An overload waits up to a given timeout for an element and returns null if none arrives in time.
  - take: removes and returns the head of the BlockingQueue. If the queue is empty, the calling thread blocks until an element becomes available.
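The difference between the non-blocking, timed, and blocking variants is easiest to see on a queue of capacity 1. The sketch below uses an ArrayBlockingQueue and a 50 ms timeout; both are arbitrary choices for illustration, and the class name is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative class; the name is not from the article.
class BlockingQueueMethodsDemo {

    /** Exercises offer, put, poll and take, returning the observed results. */
    static String run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        StringBuilder out = new StringBuilder();
        try {
            out.append(queue.offer("a"));                                         // true: there was room
            out.append(',').append(queue.offer("b"));                             // false: full, returns at once
            out.append(',').append(queue.offer("b", 50, TimeUnit.MILLISECONDS));  // false: still full after the wait
            out.append(',').append(queue.take());                                 // "a": an element was available
            out.append(',').append(queue.poll());                                 // null: empty, returns at once
            out.append(',').append(queue.poll(50, TimeUnit.MILLISECONDS));        // null: still empty after the wait
            queue.put("c");                                                       // does not block: there is room
            out.append(',').append(queue.take());                                 // "c"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting on the queue", e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "true,false,false,a,null,null,c"
    }
}
```

Calling put on the full queue, or take on the empty one, would block the calling thread instead of returning, which is why the sketch only calls them when they can complete.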
Queue implementations
- ArrayBlockingQueue: a bounded blocking queue backed by an array; elements are ordered FIFO.
- LinkedBlockingQueue: an optionally bounded blocking queue backed by a singly linked list; elements are ordered FIFO. Throughput is usually higher than ArrayBlockingQueue because puts and takes use separate locks, so producers and consumers do not block each other. The no-argument constructor creates a queue of capacity Integer.MAX_VALUE, so it is not recommended.
- SynchronousQueue: a blocking queue with no capacity. Each put must wait for a take, and vice versa. Throughput is usually higher than LinkedBlockingQueue.
- PriorityBlockingQueue: an unbounded blocking queue that orders elements by priority. Although the queue is logically unbounded, an attempt to add an element can still fail with an OOM when resources are exhausted.
- DelayQueue: an unbounded blocking queue from which an element can be taken only after its delay has expired.
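A few of these properties can be checked directly. The following sketch uses illustrative class and method names and covers only three of the queues listed above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Illustrative class; the name is not from the article.
class QueueImplementationsDemo {

    /** Free capacity of a LinkedBlockingQueue created with an explicit bound. */
    static int boundedLinkedCapacity(int bound) {
        return new LinkedBlockingQueue<Runnable>(bound).remainingCapacity();
    }

    /** A SynchronousQueue stores nothing: offer fails unless a consumer is already waiting. */
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("task");
    }

    /** A PriorityBlockingQueue hands out the smallest element first, not the oldest. */
    static int firstByPriority() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>();
        queue.add(3);
        queue.add(1);
        queue.add(2);
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(boundedLinkedCapacity(10)); // prints 10
        System.out.println(offerWithoutConsumer());    // prints false
        System.out.println(firstByPriority());         // prints 1
    }
}
```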
ThreadFactory Details
- This parameter sets the factory that creates the pool's threads. A factory lets you give each thread a meaningful name, which helps locate and troubleshoot problems.
Executors.defaultThreadFactory() creates the default thread factory. As the code below shows, it wraps each Runnable in a new Thread whose name is namePrefix followed by an incrementing number. Business code usually needs a more meaningful name, so the default factory is not recommended.
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
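A factory that produces business-oriented names can be very small. The sketch below is one possible shape and is an assumption of this example, not a JDK class and not necessarily the NamedThreadFactory used in Case 2; the naming scheme (prefix, then "-thread-", then a counter) is chosen only for illustration.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory written for this example; it is not part of the JDK.
class NamedThreadFactory implements ThreadFactory {

    private final String namePrefix;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    NamedThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name each thread "<prefix>-thread-<n>" so that stack dumps show its purpose.
        Thread t = new Thread(r, namePrefix + "-thread-" + threadNumber.getAndIncrement());
        t.setDaemon(false); // mirror the default factory: pool threads are not daemons
        return t;
    }

    public static void main(String[] args) {
        Runnable noop = new Runnable() {
            @Override
            public void run() { }
        };
        ThreadFactory factory = new NamedThreadFactory("BizProcessor-order");
        System.out.println(factory.newThread(noop).getName()); // prints "BizProcessor-order-thread-1"
    }
}
```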
Rejection Policy Details
RejectedExecutionHandler: the handler is invoked when both the queue and the thread pool are full, that is, the pool is saturated and cannot accept new tasks, so a policy is needed for newly submitted tasks. The default policy is AbortPolicy, which rejects the new task by throwing an exception.
The JDK has provided four built-in policies since 1.5, and you can also write your own:
- AbortPolicy: throws a RejectedExecutionException;
- CallerRunsPolicy: runs the task in the thread that called execute;
- DiscardOldestPolicy: discards the oldest task waiting in the queue, then retries the current task;
- DiscardPolicy: silently discards the task;
- Custom policy: implement the rejectedExecution method of the RejectedExecutionHandler interface.
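As an example of the last item, a custom policy only has to implement one method. The sketch below is a hypothetical handler that counts rejections and then drops the task; the class name and the choice to drop rather than throw are assumptions of this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical handler written for this example; it is not part of the JDK.
class CountingRejectionHandler implements RejectedExecutionHandler {

    private final AtomicLong rejected = new AtomicLong();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Record the rejection, then drop the task (the same outcome as DiscardPolicy).
        rejected.incrementAndGet();
    }

    long rejectedCount() {
        return rejected.get();
    }

    /** Saturates a one-thread pool and returns how many tasks were rejected. */
    static long demo() {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            @Override
            public void run() {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), handler);
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // no free thread and no queue capacity: rejected
            return handler.rejectedCount();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1
    }
}
```

A counter like this could feed a metric or an alert; whether dropping the task is acceptable depends on the business requirements.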
Cases
Case 1: incorrect usage. No queue capacity is specified, so the default capacity is used
new ThreadPoolExecutor(5,10,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())
Note: the pool above is meant to allow up to 10 concurrent threads, but that limit is almost never reached. The mistake is that new LinkedBlockingQueue<Runnable>() without a capacity defaults to Integer.MAX_VALUE, so the queue practically never fills up, threads beyond corePoolSize are never created, and queued tasks may eventually exhaust memory. Specify the queue capacity explicitly whenever possible.
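The effect is easy to reproduce. In this sketch (illustrative names; 2 core threads and 20 tasks are arbitrary), every task blocks so the pool stays saturated, yet the pool never grows past corePoolSize because the unbounded queue absorbs everything.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class; the name and the sizes are assumptions made for this example.
class UnboundedQueueDemo {

    /** Submits 20 blocking tasks to a 2..10 thread pool backed by an unbounded queue. */
    static String run() {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            @Override
            public void run() {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            for (int i = 0; i < 20; i++) {
                pool.execute(blocker);
            }
            // Only the two core threads exist; the other 18 tasks wait in the queue.
            return "poolSize=" + pool.getPoolSize() + " queued=" + pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "poolSize=2 queued=18"
    }
}
```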
Case 2: specify the capacity of the blocking queue and name each thread, for easier control and troubleshooting
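    // NamedThreadFactory is not a JDK class; it is assumed to be a project-provided
    // ThreadFactory that names its threads with the given prefix.
    ThreadFactory threadFactory = new NamedThreadFactory("BizProcessor-" + bizName);
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(10), threadFactory);

    // Or use a SynchronousQueue (an alternative to the declaration above, not an addition to it)
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 0, TimeUnit.MILLISECONDS,
            new SynchronousQueue<Runnable>(), threadFactory);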
Description: give the LinkedBlockingQueue an explicit capacity, or use a SynchronousQueue, so that the queue size stays under control. Give the threads names that carry business meaning to make troubleshooting easier.
Case 3: a custom rejection policy, adapted from CallerRunsPolicy, that writes the JVM thread stacks to a file when the thread pool is full, for post-mortem analysis
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;

    private final RejectedExecutionHandler handler = new JvmCallerRunsPolicy();

    /**
     * A handler for rejected tasks that dumps the stack of every live thread
     * to a log file and then rejects the task by throwing
     * {@code RejectedExecutionException}. Despite its name, it does not run
     * the task in the calling thread.
     */
    public static class JvmCallerRunsPolicy implements RejectedExecutionHandler {

        /** Creates a {@code JvmCallerRunsPolicy}. */
        public JvmCallerRunsPolicy() { }

        /**
         * Dumps all thread stacks, then rejects task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            jstack();
            throw new RejectedExecutionException();
        }

        /**
         * Writes the stack of every live thread to app_jstack.log.
         * Errors are logged and never propagated to the caller.
         */
        public void jstack() {
            String logPath = "/home/admin/app/logs";
            File file = new File(logPath);
            if (!file.exists()) {
                file.mkdirs();
            }
            FileOutputStream jstackStream = null;
            try {
                jstackStream = new FileOutputStream(new File(logPath, "app_jstack.log"));
                // Get the stack information of all threads
                Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
                for (Map.Entry<Thread, StackTraceElement[]> entry : map.entrySet()) {
                    StackTraceElement[] elements = entry.getValue();
                    if (elements != null && elements.length > 0) {
                        String threadName = entry.getKey().getName();
                        // Record the thread name
                        jstackStream.write(("Thread Name :[" + threadName + "]\n").getBytes());
                        for (StackTraceElement el : elements) {
                            String stack = el.toString() + "\n";
                            jstackStream.write(stack.getBytes());
                        }
                        jstackStream.write("\n".getBytes());
                    }
                }
            } catch (Exception e) {
                // Only log the error; do not throw.
                // log is the application's logger, assumed to be defined elsewhere.
                log.error("JVM Collect ERROR:" + e);
            } finally {
                if (jstackStream != null) {
                    try {
                        jstackStream.close();
                    } catch (IOException e) {
                        // Nothing useful can be done if closing fails
                    }
                }
            }
        }
    }
Usage
ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(5,10,0,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),threadFactory, handler);