ThreadFactory
Where do the threads in the thread pool come from? It's threadvector
public interface ThreadFactory { Thread newThread(Runnable r); }
There is an interface in Threadfactory. This method will be called when a thread needs to be created in the thread pool. You can also customize the thread factory
public class ThreadfactoryText { public static void main(String[] args) { Runnable runnable=new Runnable() { @Override public void run() { int num=new Random().nextInt(10); System.out.println(Thread.currentThread().getId()+"--"+System.currentTimeMillis()+"--sleep"+num); try { TimeUnit.SECONDS.sleep(num); } catch (InterruptedException e) { e.printStackTrace(); } } }; //Create a thread pool, use a custom thread factory, and adopt the default rejection policy ExecutorService executorService=new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t=new Thread(r); t.setDaemon(true);//Set as the guard thread. When the main thread finishes running, the threads in the thread pool will also be released System.out.println("Thread created"+t); return t; } }); //Submit five tasks for (int i = 0; i < 5; i++) { executorService.submit(runnable); } } }
When a thread submits more than five tasks, the thread pool will throw an exception by default
Monitor thread pool
ThreadPoolExcutor provides a set of methods for monitoring thread pools
int getActiveCount()//Get thread pool only the current number of get threads long getCompletedTaskCount()//Returns the number of tasks completed by the thread pool int getCorePoolSize()//Number of core tasks in thread pool int getLargestPoolSize() //Returns the maximum number of threads that have ever been reached in the thread pool int getMaximumPoolSize()//Returns the maximum capacity of the thread pool int getPoolSize()//Return thread size BlockingQueue<Runnable> getQueue()//Return blocking queue long getTaskCount()//Returns the total number of tasks received by the thread pool
public class Text { public static void main(String[] args) throws InterruptedException { Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getId() + "The thread starts executing--" + System.currentTimeMillis()); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }; //The default thread factory is used to create a thread pool, and the bounded queue adopts the DiscardPolicy policy policy ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy()); //Submit five tasks for (int i = 0; i < 30; i++) { executorService.submit(runnable); System.out.println("Current number of core threads"+executorService.getCorePoolSize()+",Maximum threads:"+executorService.getMaximumPoolSize()+",Current thread pool size:"+executorService.getPoolSize()+"Number of active threads:"+executorService.getActiveCount()+",Received task:"+executorService.getTaskCount()+"Number of tasks completed:"+executorService.getCompletedTaskCount()+"Number of waiting tasks:"+executorService.getQueue().size()); TimeUnit.MILLISECONDS.sleep(500); } System.out.println("-------------------"); while (executorService.getActiveCount()>=0)//Continue to detect the thread pool { System.out.println("Current number of core threads"+executorService.getCorePoolSize()+",Maximum threads:"+executorService.getMaximumPoolSize()+",Current thread pool size:"+executorService.getPoolSize()+"Number of active threads:"+executorService.getActiveCount()+",Received task:"+executorService.getTaskCount()+"Number of tasks completed:"+executorService.getCompletedTaskCount()+"Number of waiting tasks:"+executorService.getQueue().size()); Thread.sleep(1000);//Test every 1 second } } }
When the thread pool size reaches the number of core threads, threads will be placed in the waiting queue. When the thread pool waiting queue is full, a new thread will be started. When the current thread size reaches the maximum number of threads and the waiting queue is full, the DiscardPolicy policy will be executed to directly discard the unprocessable task. There are only 15 tasks left in the last 30 tasks.
Schematic diagram:
Extended thread pool
Sometimes you need to extend the thread pool, such as monitoring the start and end times of each task, or customize other enhancements.
ThreadPoolExecutor thread pool provides two methods:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { }
The thread pool will execute the beforeExecute() method before executing a task, and call the afterExecute() method after execution
Check the source code of ThreadPoolExecutor. An internal class Worker is defined in this class. The working thread of ThreadPoolExecutor thread pool is the instance of Worker class. The Worker instance will call beforeExecute and afterExecute methods during execution.
public void run() { runWorker(this); } final void runWorker(Worker w) { try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } }
Part of the code has been omitted. The thread will call beforeExecute before execution and afterExecute after execution.
Extended thread pool example
package com; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Text07 { public static void main(String[] args) { //Define the extended thread pool. Define the thread pool class, inherit the ThreadPoolExecutor, and then override other methods ExecutorService threadPoolExecutor= new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new LinkedBlockingDeque<>()){ //Override the start method in the inner class @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println(t.getId()+"The thread is ready to execute the task"+((Mytask)r).name); } //Override the end method in the inner class @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println(((Mytask)r).name+"Execution complete"); } //Thread pool exit @Override protected void terminated() { System.out.println("Thread pool exit"); } }; for (int i = 0; i < 5; i++) { Mytask mytask=new Mytask("Thread"+i); threadPoolExecutor.execute(mytask); } } private static class Mytask implements Runnable { private String name; public Mytask(String name) { this.name=name; } @Override public void run() { System.out.println(name+"Being executed"+Thread.currentThread().getId()); try { Thread.sleep(1000);//Simulation task duration } catch (InterruptedException e) { e.printStackTrace(); } } } }
Optimize thread pool size
The size of the thread pool has a certain impact on the system performance. If it is too large or too small, it cannot give full play to the best performance of the system. It does not need to be very accurate. Just avoid the maximum or minimum. Generally speaking, the size of the thread pool should consider the number of CPU s
Thread pool size = number of CPUs * target CPU utilization * (1 + ratio of waiting time to calculation time)
Thread pool deadlock
If task A submits Task B during the execution of the thread pool, and Task B is added to the waiting queue in the thread pool, if the end of A requires the execution result of B, and thread B needs to wait for the execution of thread A, all other working threads may be in the waiting state until these tasks are executed in the blocking queue. If there is no thread in the thread pool that can process the blocking queue, it will wait all the time and become A deadlock.
It is suitable to submit independent tasks to the thread pool instead of interdependent tasks. For interdependent tasks, you can consider submitting them to different thread pools for processing.
Thread pool exception information capture
import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Text09 { public static void main(String[] args) { //Create thread pool ExecutorService executorService=new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>()); //Add two count tasks to the thread pool for (int i = 0; i <5 ; i++) { executorService.submit(new Text(10,i)); } } private static class Text implements Runnable { private int x; private int y; public Text(int x,int y) { this.x=x; this.y=y; } @Override public void run() { System.out.println(Thread.currentThread().getName()+"thread x/y The result is"+x+"/"+y+"="+(x/y)); } } }
You can see that there are only four results. Five tasks are actually submitted to the thread pool, but when i==0, an arithmetic exception occurs. The thread pool eats the exception, so we know nothing about the exception
terms of settlement:
1. Change submit to execute
2. Extend the thread pool and wrap the submit
package com; import java.util.concurrent.*; public class Text09 { public static void main(String[] args) { //Creating a thread pool uses a custom thread pool ExecutorService executorService=new TranceThreadPoorExcuter(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>()); //Add two count tasks to the thread pool for (int i = 0; i <5 ; i++) { executorService.submit(new Text(10,i)); } } public static class Text implements Runnable { public int x; public int y; public Text(int x,int y) { this.x=x; this.y=y; } @Override public void run() { System.out.println(Thread.currentThread().getName()+"thread x/y The result is"+x+"/"+y+"="+(x/y)); } } //The custom thread pool class extends trancethreadpoorexputer private static class TranceThreadPoorExcuter extends ThreadPoolExecutor { public TranceThreadPoorExcuter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } //Define a method to pass in two parameters. The first is the task to be accepted, and the second is Exception public Runnable warp(Runnable r,Exception e) { return new Runnable() { @Override public void run() { try { r.run(); } catch (Exception e1) { e.printStackTrace(); throw e1; } } }; } //Override the submit method @Override public Future<?> submit(Runnable task) { return super.submit(warp(task,new Exception("Customer tracking exception"))); } //You can also override the excute method } }
This method uses a custom thread pool and rewrites the submit method in the thread pool. In the submit method, the task parameters to be passed in can be used to capture thread pool exceptions with the function of capturing exception information.