Manually implementing a thread pool in Java

1. Why use a thread pool?

In the previous article, we created a new thread each time we needed one. That is easy to implement, but it has a problem: when there are many concurrent tasks and each one runs only briefly, creating a thread per task greatly reduces the efficiency of the system, because creating and destroying threads takes time. Is there a way to make threads reusable, so that after finishing one task a thread can go on to execute others instead of being destroyed? In Java, a thread pool achieves this.
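
To make the idea of reuse concrete, here is a minimal sketch that uses the JDK's built-in ExecutorService rather than the hand-written pool developed in this article. The class name ThreadReuseDemo and the method distinctThreadsUsed are hypothetical names chosen for illustration. It submits many short tasks to a fixed pool and counts how many distinct threads did the work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's code.
class ThreadReuseDemo {
    // Runs taskCount short tasks on a fixed pool of poolSize threads and
    // returns how many distinct threads actually executed them.
    static int distinctThreadsUsed(int poolSize, int taskCount) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // Each task only records the name of the thread that runs it.
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                             // accept no new tasks, let queued ones finish
        pool.awaitTermination(10, TimeUnit.SECONDS); // wait for the workers to drain the queue
        return names.size();
    }

    public static void main(String[] args) throws InterruptedException {
        int used = distinctThreadsUsed(3, 100);
        System.out.println("100 tasks ran on " + used + " thread(s)");
    }
}
```

However many tasks are submitted, the number of threads never exceeds the pool size, so the cost of creating a thread is paid a few times instead of once per task.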

2. How?

  • Create a blocking queue to hold the tasks. If there are fewer tasks than the maximum number of threads allowed by the pool, every task is picked up right away. If there are more, the extra tasks wait in the queue, and each thread takes a new task from the queue as soon as it finishes its current one.
  • Create a class that represents the threads in the thread pool
  • Create a collection to store threads in the current thread pool
  • Implement the method of adding tasks to the thread pool
  • Implement the method of ending all threads in the thread pool
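
The whole design rests on the blocking behaviour of the queue, so it may help to see it in isolation first. The following is a small sketch, not the pool itself. BlockingQueueDemo and handOff are hypothetical names, and the 100 ms sleep is only there to make it likely that the consumer is already waiting when the item arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the article's code.
class BlockingQueueDemo {
    // Hands one message from the calling thread to a consumer thread
    // through a blocking queue and returns what the consumer received.
    static String handOff(String message) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String[] received = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and give up
            }
        });
        consumer.start();

        Thread.sleep(100);   // illustrative pause: the consumer is most likely blocked in take() by now
        queue.put(message);  // wakes the consumer
        consumer.join();     // after join, the consumer's write to received[0] is visible here
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOff("task-1"));
    }
}
```

A worker thread in the pool does the same thing as this consumer, except that it takes Runnable tasks in a loop instead of a single string.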

3. Difficulty analysis

3.1 execute method

  • Check whether the number of threads currently in the thread pool has reached the threshold (the maximum number of threads the pool may run)
    • If it is below the threshold, a new thread can be created: create it, start it, and add it to the thread collection. The task is then added to the task queue, where an idle thread picks it up right away
    • If it has reached the threshold, no new thread can be created. The task is added to the task queue and waits there until a thread finishes its current task and takes it out of the queue for execution

3.2 shutdown method

  • First, traverse the thread collection and call the interrupt method of each thread (a thread does not end the instant it is interrupted; it may take a moment to finish)
  • Then call join on each interrupted thread from the main thread, which ensures that the main thread continues only after all threads in the thread pool have ended
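
The two steps can be tried on a single thread before applying them to a whole pool. This is a minimal sketch under the assumption that the thread is idle, that is, waiting in take() on an empty queue. InterruptJoinDemo and stopBlockedWorker are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the article's code.
class InterruptJoinDemo {
    // Starts a thread that waits on an empty queue, interrupts it, joins it,
    // and reports whether it is still alive afterwards.
    static boolean stopBlockedWorker() throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.take().run(); // the queue stays empty, so the thread waits here
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting in take(): fall through and let run() return.
            }
        });
        worker.start();

        worker.interrupt(); // step 1: ask the thread to stop
        worker.join();      // step 2: wait until it really has stopped
        return worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("alive after join: " + stopBlockedWorker()); // prints "alive after join: false"
    }
}
```

Without the join, the caller could carry on while the interrupted thread was still finishing.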

3.3 run method of the thread class in the thread pool

Check whether the current thread has been interrupted:

  • If it has not been interrupted, the thread loops, taking the next task from the blocking queue and executing it. If the queue is empty, the thread blocks until a task arrives
  • If it has been interrupted, the thread ends
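
The loop described above can be sketched with a single worker and three queued tasks. WorkerLoopDemo and runThreeTasks are hypothetical names, and the CountDownLatch is an addition used only so that the caller knows when all three tasks have run before it interrupts the worker.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the article's code.
class WorkerLoopDemo {
    // Queues three tasks, lets one worker thread run them, then stops the worker.
    // Returns the task ids in the order in which they were executed.
    static List<Integer> runThreeTasks() throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        List<Integer> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);

        for (int i = 1; i <= 3; i++) {
            int id = i;
            queue.put(() -> { results.add(id); done.countDown(); });
        }

        Thread worker = new Thread(() -> {
            try {
                // Not interrupted: keep taking the next task and running it.
                while (!Thread.currentThread().isInterrupted()) {
                    queue.take().run();
                }
            } catch (InterruptedException e) {
                // Interrupted while the queue was empty: the thread ends.
            }
        });
        worker.start();

        done.await();       // all three tasks have run; the worker is now idle or about to be
        worker.interrupt(); // ends the loop
        worker.join();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runThreeTasks()); // prints [1, 2, 3]
    }
}
```

Because there is only one worker and the queue is first-in, first-out, the tasks run in the order in which they were queued.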

4. Overall code implementation

Each worker thread in the thread pool acts as a "consumer": it consumes the tasks in the blocking queue

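//Imports needed by the two classes below, which are static nested classes of an enclosing class (not shown)
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * A worker thread in the thread pool.
 * It is the consumer: it takes tasks from the queue and runs them.
 */
static class Worker extends Thread
{
    //Each worker takes its tasks from the task queue, so it needs a reference to the queue
    private final BlockingQueue<Runnable> queue;

    //Receives the blocking queue that stores the tasks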
    public Worker(BlockingQueue<Runnable> queue)
    {
        this.queue = queue;
    }

    @Override
    public void run()
    {
        try
        {
            //Until the thread is interrupted, keep taking the next task from the blocking queue and running it. If the queue is empty, take() blocks
            while (!Thread.currentThread().isInterrupted())
            {
                Runnable command = queue.take();
                command.run();
            }
        }
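        catch (InterruptedException e)
        {
            //take() throws InterruptedException if the thread is interrupted while waiting, which ends the worker
            System.out.println(Thread.currentThread().getName() + " thread end");
        }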
    }
}
static class MyThreadPool
{
    //The maximum number of worker threads in the thread pool
    private static final int MAX_POOL_COUNT = 3;//Adjust as needed
    //Blocking queue that holds the submitted tasks. Tasks wait here until a worker is free; each worker takes the next task from the queue as soon as it finishes its current one
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    //The worker threads that currently belong to the thread pool
    private final List<Worker> pool = new ArrayList<>();

    //Implement the execute method
    public void execute(Runnable command) throws InterruptedException
    {
        //Workers are created lazily
        //While the pool holds fewer threads than the threshold, each call creates and starts one more worker. Once the threshold is reached, the task simply waits in the queue until a worker finishes its current task
        //Note: pool is a plain ArrayList, so this method assumes that a single thread submits the tasks
        if (pool.size() < MAX_POOL_COUNT)
        {
            //Create a new worker that shares the task queue
            Worker worker = new Worker(queue);
            //Start it
            worker.start();
            //Add it to the thread collection
            pool.add(worker);
        }
        //put blocks while the queue is full. This LinkedBlockingQueue is created without a capacity limit, so in practice put returns immediately here
        queue.put(command);
    }
    //Shut down the thread pool
    public void shutDown() throws InterruptedException
    {
        //Interrupt every worker thread
        for (Worker worker : pool)
        {
            worker.interrupt();
        }
        //A thread does not end the instant it is interrupted, so wait for each one to finish
        for (Worker worker : pool)
        {
            worker.join();
        }
        //Once this loop completes, every worker thread has ended
    }
}
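
The article stops at the pool itself, so here is one possible way to exercise it. This is a sketch under stated assumptions: PoolUsageDemo and runTasks are hypothetical names, the two pool classes are condensed copies of the ones above, and the CountDownLatch is an addition. The latch is there because shutDown interrupts the workers straight away, so tasks still waiting in the queue at that moment would normally never run; waiting for the latch first makes sure every task has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical enclosing class for the article's two nested classes.
class PoolUsageDemo {
    // Condensed copy of the article's Worker.
    static class Worker extends Thread {
        private final BlockingQueue<Runnable> queue;
        Worker(BlockingQueue<Runnable> queue) { this.queue = queue; }
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.take().run();
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting for a task: the worker ends.
            }
        }
    }

    // Condensed copy of the article's MyThreadPool.
    static class MyThreadPool {
        private static final int MAX_POOL_COUNT = 3;
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        private final List<Worker> pool = new ArrayList<>();

        public void execute(Runnable command) throws InterruptedException {
            if (pool.size() < MAX_POOL_COUNT) {
                Worker worker = new Worker(queue);
                worker.start();
                pool.add(worker);
            }
            queue.put(command);
        }

        public void shutDown() throws InterruptedException {
            for (Worker worker : pool) worker.interrupt();
            for (Worker worker : pool) worker.join();
        }
    }

    // Submits taskCount tasks, waits for all of them, shuts the pool down,
    // and returns how many tasks completed.
    static int runTasks(int taskCount) throws InterruptedException {
        MyThreadPool pool = new MyThreadPool();
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> { completed.incrementAndGet(); done.countDown(); });
        }
        done.await();    // wait until every task has run
        pool.shutDown(); // only then interrupt and join the workers
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed: " + runTasks(10)); // prints "completed: 10"
    }
}
```

All tasks are submitted from a single thread here, which matches the unsynchronized ArrayList used for the worker collection.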


Added by guidance on Wed, 15 Dec 2021 00:35:38 +0200