Implementation of concurrent thread pool and blocking thread pool

Creating thread pools with Executors

Java offers many kinds of thread pools. The most convenient way to create one is through the Executors factory methods, which let you specify the pool size directly, for example Executors.newFixedThreadPool(int nThreads). But this convenience hides complexity and also buries potential dangers: the fixed and single-thread pools use an unbounded task queue, which can lead to OOM, and the cached pool can create an unbounded number of threads, which can lead to thread exhaustion.

Method name                        Function
newFixedThreadPool(int nThreads)   Creates a thread pool with a fixed number of threads
newSingleThreadExecutor()          Creates a thread pool with only one thread
newCachedThreadPool()              Creates a thread pool with no limit on the number of threads; any submitted task is executed immediately
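
The hidden dangers mentioned above come from the defaults these factory methods pick. As a rough sketch, the following inspects the pools that Executors returns. It assumes the standard OpenJDK implementation, where newFixedThreadPool and newCachedThreadPool return a plain ThreadPoolExecutor that can be cast; the class and method names (ExecutorsDefaultsDemo and its helpers) are made up for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class ExecutorsDefaultsDemo {

    // Remaining capacity of the work queue behind newFixedThreadPool.
    static int fixedPoolQueueCapacity() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Assumption: the returned object is a ThreadPoolExecutor (true on OpenJDK).
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    // Upper limit on the number of threads behind newCachedThreadPool.
    static int cachedPoolMaxThreads() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return ((ThreadPoolExecutor) pool).getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Both limits are Integer.MAX_VALUE, so neither pool pushes back under load.
        System.out.println(fixedPoolQueueCapacity() == Integer.MAX_VALUE);
        System.out.println(cachedPoolMaxThreads() == Integer.MAX_VALUE);
    }
}
```

A queue or thread limit of Integer.MAX_VALUE means memory or thread creation runs out long before the pool starts rejecting work, which is why an explicitly bounded ThreadPoolExecutor is preferred for servers.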

 

Using these shortcut methods is fine for small programs. Long-running server-side programs should instead create the thread pool by calling the ThreadPoolExecutor constructor directly. In fact, the pools returned by the Executors methods above are themselves built on ThreadPoolExecutor.

The ThreadPoolExecutor constructor

/**
 * Thread pool initialization
 * <p>
 * corePoolSize             Core pool size: 10
 * maximumPoolSize          Maximum pool size: 30
 * keepAliveTime            Maximum time that idle threads beyond corePoolSize are kept alive: 30 (in the unit below)
 * unit                     Time unit of keepAliveTime: TimeUnit.MINUTES
 * workQueue                Blocking queue: {@code new ArrayBlockingQueue<Runnable>(10)}, a bounded queue with capacity 10
 * threadFactory            Thread factory: new CustomThreadFactory(), a customized thread factory
 * rejectedExecutionHandler Called when the number of unfinished tasks exceeds maximumPoolSize + workQueue capacity,
 *                          that is, when the 41st task is submitted while the previous 40 are still running or queued
 *                          (the test method uses sleep(100) to keep them busy).
 *                          That task is handed to the RejectedExecutionHandler for processing.
 */
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 30, 30,
                TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(10), new CustomThreadFactory(), new CustomRejectedExecutionHandler());
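
The "41st task" arithmetic in the comment above can be checked with a small experiment. The sketch below is not the article's class: it uses the default thread factory and a rejection handler that only counts, and the names SaturationDemo and countRejected are hypothetical. Every task waits on a latch, so no thread becomes free while tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SaturationDemo {

    // Submits `tasks` long-running tasks to a pool with the given limits and
    // returns how many of them were handed to the rejection handler.
    static int countRejected(int core, int max, int queueCapacity, int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                (r, executor) -> rejected.incrementAndGet());
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown(); // let all accepted tasks finish
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        // Same limits as above: 30 threads + 10 queue slots = 40 tasks accepted.
        System.out.println(countRejected(10, 30, 10, 40)); // prints 0
        System.out.println(countRejected(10, 30, 10, 41)); // prints 1
    }
}
```

The pool fills in a fixed order: core threads first, then the queue, then extra threads up to maximumPoolSize, and only after that the rejection handler.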
    

 

Two types of tasks can be submitted to a thread pool: Runnable and Callable. They differ as follows:

  1. Method signature: void Runnable.run() versus V Callable.call() throws Exception
  2. Return value: only Callable can return one
  3. Exceptions: only Callable can throw checked exceptions; Runnable.run() can throw only unchecked ones

Callable is an interface added in JDK 1.5. It complements Runnable by allowing a return value and checked exceptions.
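
To make the differences concrete, here is a minimal sketch that submits one Callable that returns a value and one that throws. The class and method names (RunnableVsCallableDemo, callableResult, callableFailure) are invented for the example, and a single-thread executor is used only to keep it short.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class RunnableVsCallableDemo {

    // Submits a Callable and reads its return value through the Future.
    static int callableResult() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> 6 * 7;
            Future<Integer> future = pool.submit(task);
            return future.get(); // blocks until the task has finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Submits a Callable that throws a checked exception and returns the
    // message of the cause that Future.get() reports.
    static String callableFailure() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                throw new Exception("boom");
            };
            pool.submit(task).get();
            return "no exception";
        } catch (ExecutionException e) {
            // The exception thrown by call() arrives wrapped in ExecutionException.
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callableResult());  // prints 42
        System.out.println(callableFailure()); // prints boom
    }
}
```

A Runnable passed to submit() also yields a Future, but its get() only signals completion and returns null.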

 

Once the thread pool has been created, tasks can be submitted to it.

 

Here we build our own non-blocking thread pool:

package demo3;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A custom non-blocking thread pool: when the pool and its queue are full,
 * new tasks are rejected instead of making the submitter wait
 */
public class CustomThreadPoolExecutor {
    private ThreadPoolExecutor pool = null;

    /**
     * Thread pool initialization
     * <p>
     * corePoolSize             Core pool size: 10
     * maximumPoolSize          Maximum pool size: 30
     * keepAliveTime            Maximum time that idle threads beyond corePoolSize are kept alive: 30 (in the unit below)
     * unit                     Time unit of keepAliveTime: TimeUnit.MINUTES
     * workQueue                Blocking queue: {@code new ArrayBlockingQueue<Runnable>(10)}, a bounded queue with capacity 10
     * threadFactory            Thread factory: new CustomThreadFactory(), a customized thread factory
     * rejectedExecutionHandler Called when the number of unfinished tasks exceeds maximumPoolSize + workQueue capacity,
     *                          that is, when the 41st task is submitted while the previous 40 are still running or queued
     *                          (the test method uses sleep(100) to keep them busy).
     *                          That task is handed to the RejectedExecutionHandler for processing.
     */

    public void init() {

        pool = new ThreadPoolExecutor(10, 30, 30,
                TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(10), new CustomThreadFactory(), new CustomRejectedExecutionHandler());
    }

    public void destroy() {
        if (pool != null) {
            pool.shutdownNow();
        }
    }

    public ExecutorService getCustomThreadPoolExecutor() {
        return this.pool;
    }


    private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // Only log the rejection; the rejected task is dropped and never runs
            System.out.println("error...................");
        }
    }

    private class CustomThreadFactory implements ThreadFactory {

        private AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);

            System.out.println(threadName);
            t.setName(threadName);
            return t;
        }
    }

    public static void main(String[] args) {
        CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();

        //1. initialization
        exec.init();

        ExecutorService pool = exec.getCustomThreadPoolExecutor();

        for (int i = 1; i < 100; i++) {
            System.out.println("Submitting task " + i);
            int finalI = i;
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(">>>task is running========" + finalI);
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
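        }

        // 2. Stop accepting new tasks; tasks already accepted still run, then the JVM can exit
        pool.shutdown();
    }
}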

 

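This pool runs tasks in parallel and never makes the submitter wait: once all threads are busy and the queue is full, extra tasks go to the rejection handler, which here only logs them, so they are lost. If you want tasks to be admitted one at a time as capacity frees up, with the submitter waiting in the meantime, you need to build your own blocking thread pool.

The core of the blocking thread pool is its work queue. An ordinary thread pool has a queue too, but it adds tasks with the non-blocking offer(), which fails immediately when the queue is full. If the rejection handler instead adds the task with the blocking put(), the submitting thread waits until a slot is free, so no task is dropped. Tasks still run on up to maximumPoolSize threads; only the submission is throttled.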
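
The difference between offer() and put() can be seen on a bare ArrayBlockingQueue, without any thread pool. The following is a small sketch with invented names (OfferVsPutDemo and its two methods); it waits until the producer thread is parked inside put() before freeing a slot.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class OfferVsPutDemo {

    // offer() on a full queue gives up immediately and returns false.
    static boolean offerOnFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");
        return queue.offer("second");
    }

    // put() on a full queue waits. Returns true if the producer was seen
    // waiting inside put() and its element arrived once a slot was freed.
    static boolean putWaitsUntilSlotIsFree() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");
        Thread producer = new Thread(() -> {
            try {
                queue.put("second"); // blocks: the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            // Poll (for at most 5 seconds) until the producer is parked in put().
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (producer.getState() != Thread.State.WAITING
                    && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            boolean wasBlocked = producer.getState() == Thread.State.WAITING;
            queue.take();        // free the slot so put() can complete
            producer.join(5000);
            return wasBlocked && "second".equals(queue.peek());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerOnFullQueue());        // prints false
        System.out.println(putWaitsUntilSlotIsFree()); // prints true
    }
}
```

The class that follows applies the same put() call inside its rejection handler.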

package demo3;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author:Zach
 * @Description: A custom blocking thread pool: when the pool and its queue are full, the submitting thread waits instead of the task being dropped
 * @Date:Created in 15:26 2018/8/14
 * @Modified By:
 */
public class CustomUnblockThreadPoolExecutor {
    private ThreadPoolExecutor pool = null;

    /**
     * Thread pool initialization
     * <p>
     * corePoolSize             Core pool size: 1
     * maximumPoolSize          Maximum pool size: 3
     * keepAliveTime            Maximum time that idle threads beyond corePoolSize are kept alive: 30 (in the unit below)
     * unit                     Time unit of keepAliveTime: TimeUnit.MINUTES
     * workQueue                Blocking queue: {@code new ArrayBlockingQueue<Runnable>(5)}, a bounded queue with capacity 5
     * threadFactory            Thread factory: new CustomThreadFactory(), a customized thread factory
     * rejectedExecutionHandler Called when the number of unfinished tasks exceeds maximumPoolSize + workQueue capacity,
     *                          that is, when the 9th task is submitted while the previous 8 are still running or queued
     *                          (the test method uses a 10-second sleep to keep them busy).
     *                          That task is handed to the RejectedExecutionHandler, which here waits for a free queue slot.
     */

    public void init() {

        pool = new ThreadPoolExecutor(1, 3, 30,
                TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(5), new CustomThreadFactory(), new CustomRejectedExecutionHandler());
    }

    public void destroy() {
        if (pool != null) {
            pool.shutdownNow();
        }
    }

    public ExecutorService getCustomThreadPoolExecutor() {
        return this.pool;
    }


    private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
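            // The core change: the pool enqueues with BlockingQueue.offer(), which fails when the queue is full;
            // here the task is re-added with the blocking put(), so the submitter waits for a free slot
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the submitting thread can react to it
                Thread.currentThread().interrupt();
            }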
        }
    }

    private class CustomThreadFactory implements ThreadFactory {

        private AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = CustomUnblockThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);

            System.out.println(threadName);
            t.setName(threadName);
            return t;
        }
    }

    public static void main(String[] args) {
        CustomUnblockThreadPoolExecutor exec = new CustomUnblockThreadPoolExecutor();

        //1. initialization
        exec.init();

        ExecutorService pool = exec.getCustomThreadPoolExecutor();

        for (int i = 1; i < 100; i++) {
            System.out.println("Submitting task " + i);
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(">>>task is running========");
                        TimeUnit.SECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
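        }

        // 2. Stop accepting new tasks; tasks already accepted still run, then the JVM can exit
        pool.shutdown();
    }
}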
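
As a quick check that the blocking handler really loses no task, the sketch below submits more tasks than the pool and queue can hold and counts how many actually ran. It is a simplified stand-in for the class above, not the same code: it uses the default thread factory, much shorter tasks, and hypothetical names (BlockingSubmitDemo, runAll).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingSubmitDemo {

    // Submits `tasks` short tasks to a 1/3/5 pool whose rejection handler
    // blocks on put(), then returns how many tasks actually ran.
    static int runAll(int tasks) {
        AtomicInteger completed = new AtomicInteger();
        RejectedExecutionHandler blockWhenFull = (r, executor) -> {
            try {
                executor.getQueue().put(r); // wait for a free queue slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(5), blockWhenFull);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown(); // queued tasks still run after shutdown()
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(50)); // prints 50
    }
}
```

One caveat of this design: put() writes to the queue behind the pool's back, so a task enqueued while the pool is shutting down may never run. A more defensive handler could check executor.isShutdown() before calling put().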

 

 


Keywords: Java JDK

Added by gravedig2 on Mon, 20 Jan 2020 08:41:30 +0200