Using a thread factory and monitoring thread pools in Java

ThreadFactory

Where do the threads in a thread pool come from? They are created by a ThreadFactory.

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

ThreadFactory is an interface with a single method, newThread. The thread pool calls this method whenever it needs to create a thread. You can also supply your own thread factory:

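import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadfactoryText {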
    public static void main(String[] args) {
        Runnable runnable=new Runnable() {
            @Override
            public void run() {
                int num=new Random().nextInt(10);
                System.out.println(Thread.currentThread().getId()+"--"+System.currentTimeMillis()+"--sleep"+num);
                try {
                    TimeUnit.SECONDS.sleep(num);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //Create a thread pool, use a custom thread factory, and adopt the default rejection policy
        ExecutorService executorService=new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t=new Thread(r);
                t.setDaemon(true);//Make it a daemon thread: once the main thread finishes, the JVM exits and the pool's threads end with it
                System.out.println("Thread created: "+t);
                return t;
            }
        });
        //Submit five tasks
        for (int i = 0; i < 5; i++) {
            executorService.submit(runnable);
        }
    }
}

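With this configuration, at most five tasks can be in flight: the SynchronousQueue holds no tasks and the pool has at most five threads. If more tasks are submitted while all five threads are busy, the default rejection policy (AbortPolicy) throws a RejectedExecutionException.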
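The rejection behavior can be reproduced with a minimal sketch. The class RejectionDemo and the method countRejected are hypothetical names introduced for illustration; the tasks block on a latch so that all five threads stay busy while further tasks are submitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original example.
class RejectionDemo {
    // Submits `tasks` blocking tasks to a 5-thread pool backed by a
    // SynchronousQueue and returns how many submissions were rejected.
    static int countRejected(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker thread busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // thrown by the default AbortPolicy
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("Rejected: " + countRejected(7));
    }
}
```

With seven submissions, the first five each get a thread and the remaining two are rejected.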

Monitor thread pool

ThreadPoolExecutor provides a set of methods for monitoring a thread pool:

int getActiveCount() //Returns the approximate number of threads that are actively executing tasks
long getCompletedTaskCount() //Returns the approximate number of tasks the pool has completed
int getCorePoolSize() //Returns the core number of threads
int getLargestPoolSize() //Returns the largest number of threads that have ever been in the pool at the same time
int getMaximumPoolSize() //Returns the maximum allowed number of threads
int getPoolSize() //Returns the current number of threads in the pool
BlockingQueue<Runnable> getQueue() //Returns the task queue used by the pool
long getTaskCount() //Returns the approximate total number of tasks that have ever been scheduled for execution
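
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Text {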
    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getId() + "The thread starts executing--" + System.currentTimeMillis());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //Create a thread pool with the default thread factory, a bounded queue, and the DiscardPolicy rejection policy
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());
        //Submit 30 tasks, one every 500 ms
        for (int i = 0; i < 30; i++) {
            executorService.submit(runnable);
            System.out.println("Core pool size: "+executorService.getCorePoolSize()+", maximum pool size: "+executorService.getMaximumPoolSize()+", current pool size: "+executorService.getPoolSize()+", active threads: "+executorService.getActiveCount()+", tasks received: "+executorService.getTaskCount()+", tasks completed: "+executorService.getCompletedTaskCount()+", tasks waiting: "+executorService.getQueue().size());
            TimeUnit.MILLISECONDS.sleep(500);
        }
        System.out.println("-------------------");
        //Keep monitoring the pool until no task is running or waiting (the original condition >=0 was always true)
        while (executorService.getActiveCount()>0 || !executorService.getQueue().isEmpty())
        {
            System.out.println("Core pool size: "+executorService.getCorePoolSize()+", maximum pool size: "+executorService.getMaximumPoolSize()+", current pool size: "+executorService.getPoolSize()+", active threads: "+executorService.getActiveCount()+", tasks received: "+executorService.getTaskCount()+", tasks completed: "+executorService.getCompletedTaskCount()+", tasks waiting: "+executorService.getQueue().size());
            Thread.sleep(1000);//Poll once per second
        }
        executorService.shutdown();//Release the pool's threads so the JVM can exit
    }
}

When the pool size reaches the number of core threads, newly submitted tasks are placed in the waiting queue. When the waiting queue is full, the pool starts additional threads, up to the maximum pool size. When the pool has reached its maximum size and the waiting queue is also full, the DiscardPolicy rejection policy silently discards tasks that cannot be accepted. In this run, only 15 of the 30 submitted tasks are executed.
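The order "core threads, then queue, then extra threads, then rejection" can be checked without relying on timing. The following is a minimal sketch, assuming the same pool settings as above (2 core threads, 5 maximum, queue capacity 5, DiscardPolicy); FillOrderDemo and snapshotAfter are hypothetical names, and the tasks block on a latch so that nothing completes while the snapshot is taken.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original example.
class FillOrderDemo {
    // Submits `tasks` blocking tasks and returns {poolSize, queuedTasks}.
    static int[] snapshotAfter(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // block until the snapshot is taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (int tasks : new int[] {2, 7, 8, 10, 12}) {
            int[] s = snapshotAfter(tasks);
            System.out.println(tasks + " tasks: pool size " + s[0] + ", queued " + s[1]);
        }
    }
}
```

With 7 tasks the pool still has only its 2 core threads and 5 queued tasks; the 8th task starts a third thread; from the 11th task onward the pool stays at 5 threads and 5 queued tasks, and the extra submissions are discarded.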

Extended thread pool

Sometimes you need to extend the thread pool, for example to record the start and end time of each task or to add other custom behavior.

ThreadPoolExecutor provides two hook methods for this purpose:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

The thread pool will execute the beforeExecute() method before executing a task, and call the afterExecute() method after execution

Looking at the source code of ThreadPoolExecutor, the class defines an inner class named Worker. The pool's worker threads are instances of Worker, and each Worker calls beforeExecute and afterExecute around every task it runs.

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    // ... (code omitted)
    try {
        beforeExecute(wt, task);
        try {
            task.run();
            afterExecute(task, null);
        } catch (Throwable ex) {
            afterExecute(task, ex);
            throw ex;
        }
    } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
    }
    // ... (code omitted)
}

Part of the code has been omitted. The worker calls beforeExecute before running each task and afterExecute afterwards, whether the task completes normally or throws.

Extended thread pool example

package com;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Text07 {
    public static void main(String[] args) {

        //Define the extended thread pool: an anonymous subclass of ThreadPoolExecutor that overrides the hook methods
        ExecutorService threadPoolExecutor =
                new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()) {
                    //Called in the worker thread before each task runs
                    @Override
                    protected void beforeExecute(Thread t, Runnable r) {
                        System.out.println(t.getId() + " The thread is ready to execute the task " + ((Mytask) r).name);
                    }
                    //Called in the worker thread after each task finishes
                    @Override
                    protected void afterExecute(Runnable r, Throwable t) {
                        System.out.println(((Mytask) r).name + " Execution complete");
                    }
                    //Called once the thread pool has terminated
                    @Override
                    protected void terminated() {
                        System.out.println("Thread pool exit");
                    }
                };
        for (int i = 0; i < 5; i++) {
            Mytask mytask = new Mytask("Thread" + i);
            threadPoolExecutor.execute(mytask);
        }
        //Shut down after the submitted tasks finish; without this, terminated() is never called
        threadPoolExecutor.shutdown();
    }
    private  static  class  Mytask implements Runnable
    {
        private  String name;

        public  Mytask(String name)
        {
            this.name=name;
        }
        @Override
        public void run() {
            System.out.println(name+"Being executed"+Thread.currentThread().getId());
            try {
                Thread.sleep(1000);//Simulate the duration of the task
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
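The same two hooks can be used to measure how long each task runs, which is the monitoring use case mentioned at the start of this section. The following is a minimal sketch; TimingThreadPool, runSleepTasks, finishedTasks and averageMillis are hypothetical names. It relies on the fact that beforeExecute and afterExecute run in the worker thread that executes the task, so the start time can be kept in a ThreadLocal.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical class for illustration: records how long each task runs.
class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicInteger finished = new AtomicInteger();

    TimingThreadPool(int threads) {
        super(threads, threads, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // remember the start time for this worker
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        totalNanos.addAndGet(System.nanoTime() - startNanos.get());
        finished.incrementAndGet();
        super.afterExecute(r, t);
    }

    int finishedTasks() {
        return finished.get();
    }

    long averageMillis() {
        int n = finished.get();
        return n == 0 ? 0 : TimeUnit.NANOSECONDS.toMillis(totalNanos.get() / n);
    }

    // Runs `tasks` sleeping tasks on two threads and waits for the pool to terminate.
    static TimingThreadPool runSleepTasks(int tasks, long sleepMillis) {
        TimingThreadPool pool = new TimingThreadPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(sleepMillis); // simulate work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool;
    }

    public static void main(String[] args) {
        TimingThreadPool pool = runSleepTasks(4, 100);
        System.out.println(pool.finishedTasks() + " tasks, average "
                + pool.averageMillis() + " ms");
    }
}
```

The counters are atomic because several worker threads update them concurrently; only the start time is per-thread.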

Optimize thread pool size

The size of the thread pool affects system performance: a pool that is too large or too small keeps the system from performing at its best. The size does not need to be exact; it is enough to avoid the extremes. As a rule of thumb, the pool size should be based on the number of CPUs:

Thread pool size = number of CPUs * target CPU utilization * (1 + ratio of waiting time to calculation time)
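As a worked example, with 8 CPUs, a target utilization of 0.5 and tasks that wait three times as long as they compute, the formula gives 8 * 0.5 * (1 + 3) = 16 threads. A minimal sketch of the calculation follows; PoolSizing and poolSize are hypothetical names, and the utilization and ratio passed in main are made-up inputs that would have to be estimated or measured for a real workload.

```java
// Hypothetical helper for illustration; the inputs must be estimated per workload.
class PoolSizing {
    // cpus * targetUtilization * (1 + waitTime / computeTime), at least 1
    static int poolSize(int cpus, double targetUtilization, double waitToComputeRatio) {
        long size = Math.round(cpus * targetUtilization * (1 + waitToComputeRatio));
        return (int) Math.max(1, size);
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Assumed values: 80% target utilization, tasks wait 1.5x as long as they compute
        System.out.println("Suggested pool size: " + poolSize(cpus, 0.8, 1.5));
    }
}
```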

Thread pool deadlock

Suppose task A, while running in the thread pool, submits task B to the same pool and then waits for B's result. B is placed in the pool's waiting queue. If every worker thread is occupied by a task like A that is waiting for a queued task, no thread is left to take tasks from the queue: the queued tasks never run, the waiting tasks never finish, and the pool is deadlocked.

Thread pools are best suited to independent tasks rather than interdependent ones. If tasks do depend on each other, consider submitting them to different thread pools.
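The situation can be sketched with single-thread pools. PoolDeadlockDemo and its methods are hypothetical names, and the wait on B uses a 500 ms timeout so that the demo reports the blocked state instead of hanging forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original example.
class PoolDeadlockDemo {
    // Task A runs on `outer`, submits task B to `inner` and waits for B's result.
    // Returns B's result, or "blocked" if B did not get a thread within 500 ms.
    static String runNested(ExecutorService outer, ExecutorService inner) {
        Future<String> a = outer.submit(() -> {
            Future<String> b = inner.submit(() -> "B done");
            try {
                return b.get(500, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                b.cancel(false); // without the timeout, A would wait forever
                return "blocked";
            }
        });
        try {
            return a.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // A and B share one single-thread pool: B is queued behind A.
    static String samePool() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return runNested(pool, pool);
        } finally {
            pool.shutdownNow();
        }
    }

    // A and B use different pools: B always finds a free thread.
    static String separatePools() {
        ExecutorService outer = Executors.newSingleThreadExecutor();
        ExecutorService inner = Executors.newSingleThreadExecutor();
        try {
            return runNested(outer, inner);
        } finally {
            outer.shutdownNow();
            inner.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("same pool: " + samePool());
        System.out.println("separate pools: " + separatePools());
    }
}
```

With a shared pool, the only thread is held by A, so B never starts and the call reports "blocked"; with separate pools, B completes and A returns its result.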

Catching exceptions in the thread pool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Text09 {
    public static void main(String[] args) {
        //Create thread pool
        ExecutorService executorService=new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>());
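        //Submit five division tasks to the thread pool (10/0 through 10/4)
        for (int i = 0; i <5 ; i++) {
            executorService.submit(new Text(10,i));
        }
        executorService.shutdown();//Finish the submitted tasks, then let the JVM exit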
    }
    private  static class  Text implements  Runnable
    {
        private  int x;
        private  int y;
        public  Text(int x,int y)
        {
            this.x=x;
            this.y=y;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"thread  x/y The result is"+x+"/"+y+"="+(x/y));
        }
    }
}

The program prints only four results, although five tasks were submitted. When i==0, the division throws an ArithmeticException. Because the task was passed to submit, the exception is captured in the returned Future and never printed, so unless the caller checks the Future, the failure goes unnoticed.
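The exception is not lost: it is stored in the Future returned by submit and rethrown, wrapped in an ExecutionException, when get() is called. A minimal sketch follows; FutureExceptionDemo and divide are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original example.
class FutureExceptionDemo {
    // Computes x / y in a pool thread and reports the result or the failure.
    static String divide(int x, int y) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> x / y);
            return "result=" + future.get(); // get() rethrows the task's exception
        } catch (ExecutionException e) {
            return "failed: " + e.getCause(); // the original exception is the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(divide(10, 2));
        System.out.println(divide(10, 0));
    }
}
```

Calling get() on every Future is therefore one way to notice failed tasks, at the cost of blocking the caller until each task has finished.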

Solutions:

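1. Change submit to execute, so that the exception propagates out of the worker thread and its stack trace is printed

2. Extend the thread pool and wrap the tasks passed to submit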

package com;

import java.util.concurrent.*;

public class Text09 {
    public static void main(String[] args) {
        //Create the thread pool using the custom thread pool class
        ExecutorService executorService=new TranceThreadPoorExcuter(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>());
        //Submit five division tasks to the thread pool (10/0 through 10/4)
        for (int i = 0; i <5 ; i++) {
            executorService.submit(new Text(10,i));
        }

    }
    public  static class  Text implements  Runnable
    {
        public  int x;
        public  int y;
        public  Text(int x,int y)
        {
            this.x=x;
            this.y=y;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"thread  x/y The result is"+x+"/"+y+"="+(x/y));
        }
    }
    //Custom thread pool class that extends ThreadPoolExecutor
    private  static  class  TranceThreadPoorExcuter extends  ThreadPoolExecutor
    {

        public TranceThreadPoorExcuter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        //Wrap a task. The first parameter is the task to run; the second is an Exception
        //created at submission time, whose stack trace records where the task was submitted
        public  Runnable wrap(Runnable r,Exception clientTrace)
        {
            return new Runnable() {
                @Override
                public void run() {

                    try {
                        r.run();
                    }
                    catch (Exception e1)
                    {
                        e1.printStackTrace();          //What went wrong
                        clientTrace.printStackTrace(); //Where the task was submitted
                        throw e1;
                    }
                }
            };
        }
        //Override the submit method
        @Override
        public Future<?> submit(Runnable task) {
            return super.submit(wrap(task,new Exception("Client stack trace")));
        }
        //You can also override the execute method in the same way
    }
}

This approach uses a custom thread pool that overrides the submit method. Each submitted task is wrapped in a Runnable that catches any exception thrown by the task, prints the stack trace recorded when the task was submitted, and rethrows the exception, so failures no longer go unnoticed.
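A different technique, swapped in here as an alternative to wrapping submit, is to override afterExecute and pull the exception out of the Future there; this follows the pattern shown in the ThreadPoolExecutor Javadoc for afterExecute. The following is a minimal sketch; FailureRecordingPool, failures and runDivisions are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration: records every task failure.
class FailureRecordingPool extends ThreadPoolExecutor {
    private final Queue<Throwable> failures = new ConcurrentLinkedQueue<>();

    FailureRecordingPool(int threads) {
        super(threads, threads, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // With execute(), t carries the exception. With submit(), r is a Future
        // that holds the exception, so it has to be extracted with get().
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            failures.add(t);
            System.err.println("Task failed: " + t);
        }
    }

    Queue<Throwable> failures() {
        return failures;
    }

    // Submits the tasks 10/0, 10/1, ... and waits for the pool to terminate.
    static FailureRecordingPool runDivisions(int tasks) {
        FailureRecordingPool pool = new FailureRecordingPool(2);
        for (int i = 0; i < tasks; i++) {
            final int y = i;
            pool.submit(() -> System.out.println("10/" + y + "=" + (10 / y)));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool;
    }

    public static void main(String[] args) {
        System.out.println("Failures: " + runDivisions(5).failures());
    }
}
```

Unlike the wrapper approach, this does not record where the task was submitted, but it handles both submit and execute in one place.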

Keywords: Java Multithreading

Added by DamianTV on Sun, 20 Feb 2022 05:20:09 +0200