How to create thread pools in Java 8

Java 8 provides several built-in ways to create thread pools.

Advantages of a thread pool: threads are reused, so fewer resources are wasted on creating and destroying them.

The top-level factory class for creating thread pools is Executors.

Opening the class shows the factory methods it provides.

In summary, the main factory methods are:

        // Single-thread pool
        ExecutorService executor1 = Executors.newSingleThreadExecutor();
        // Single-thread pool that can run tasks after a delay or periodically
        ExecutorService executor5 = Executors.newSingleThreadScheduledExecutor();
        // Fixed-size thread pool
        ExecutorService executor2 = Executors.newFixedThreadPool(5);
        // Cached thread pool
        ExecutorService executor3 = Executors.newCachedThreadPool();
        // Fixed-size thread pool that can run tasks after a delay or periodically
        ExecutorService executor4 = Executors.newScheduledThreadPool(5);
        // Work-stealing thread pool
        ExecutorService executor6 = Executors.newWorkStealingPool();
1. Single-thread pool: newSingleThreadExecutor()

The single-thread pool is created internally as shown below: both the core pool size and the maximum pool size are 1, and the work queue is unbounded. If the single thread terminates because of a failure during task execution before shutdown, a new thread takes its place if needed to execute subsequent tasks. Tasks are guaranteed to execute sequentially, and no more than one task is active at any given time. Unlike the otherwise equivalent newFixedThreadPool(1), the returned executor is guaranteed not to be reconfigurable to use additional threads.

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
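
The difference from newFixedThreadPool(1) can be checked with a short sketch. It assumes, as in the source shown above, that newSingleThreadExecutor() returns a wrapper rather than a ThreadPoolExecutor. The class name SingleVsFixedDemo and the helper isReconfigurable are illustrative names, not part of the JDK.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class SingleVsFixedDemo {

    // Hypothetical helper: an executor counts as reconfigurable here if it
    // can be cast to ThreadPoolExecutor, which exposes the pool-size setters.
    static boolean isReconfigurable(ExecutorService executor) {
        return executor instanceof ThreadPoolExecutor;
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            if (isReconfigurable(fixed)) {
                // The fixed pool can be grown after creation.
                ThreadPoolExecutor pool = (ThreadPoolExecutor) fixed;
                pool.setMaximumPoolSize(4);
                pool.setCorePoolSize(4);
                System.out.println("fixed pool core size is now " + pool.getCorePoolSize());
            }
            // The single-thread executor hides these setters behind a wrapper.
            System.out.println("single-thread executor reconfigurable: " + isReconfigurable(single));
        } finally {
            fixed.shutdown();
            single.shutdown();
        }
    }
}
```

So if the guarantee of exactly one worker thread matters, newSingleThreadExecutor() is the safer choice, because calling code cannot resize it by casting.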

Test with the following test code:

public static void main(String[] args) throws InterruptedException {
        int threads = 10;
        // Single-thread pool. If its thread dies because of a failure during execution before shutdown, a new thread replaces it to run the remaining tasks
        ExecutorService executor1 = Executors.newSingleThreadExecutor();
        for (int i = 0; i < threads; i++) {
            int index = i;
            Thread.sleep(1000);
            executor1.execute(() ->{
                System.out.println(Thread.currentThread().getName() + "  " + index);
            });
        }
        // Shut down the pool so the program can exit once all tasks have run
        executor1.shutdown();
    }

In the output, every line is printed by the same thread, and the indexes appear in submission order from 0 to 9.

2. Single-thread scheduled pool: newSingleThreadScheduledExecutor()

A single-threaded executor that can schedule commands to run after a given delay or to execute periodically.

First, consider a task that is executed periodically, similar to a scheduled job. What makes it periodic is the method used to submit it:

ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);

public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(new Runnable() {
            int i = 0;
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " executed " + i++);
            }
        }, 3, 2, TimeUnit.SECONDS);
    }

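After an initial delay of 3 seconds, the task runs every 2 seconds on the same thread, and the counter increases by one on each run.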

 

A one-shot delayed task is submitted with the schedule method instead:

ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

public static void main(String[] args){
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        long start = System.currentTimeMillis();
        executorService.schedule(() -> {
            System.out.println(Thread.currentThread().getName() + " executed after " + (System.currentTimeMillis() - start) + " ms");
        }, 500, TimeUnit.MILLISECONDS);
        // By default an already scheduled delayed task still runs after shutdown()
        executorService.shutdown();
    }

The task runs once, roughly 500 ms after it was submitted.

Therefore, whether an executor created by newSingleThreadScheduledExecutor runs a delayed task or a periodic task depends on the method called when the task is submitted.
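
A minimal sketch of both submission methods on the same executor is given below. The class name ScheduledDemo, the method runBoth, and the short delays are illustrative choices. Unlike the periodic example above, this version cancels the periodic task after three runs so that the program can exit.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledDemo {

    // Submits one delayed and one periodic task and records which of them ran.
    static List<String> runBoth() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch delayedRun = new CountDownLatch(1);
        CountDownLatch periodicRuns = new CountDownLatch(3);

        // schedule(...): runs once, 50 ms after submission
        scheduler.schedule(() -> {
            events.add("delayed");
            delayedRun.countDown();
        }, 50, TimeUnit.MILLISECONDS);

        // scheduleAtFixedRate(...): first run after 100 ms, then every 100 ms
        ScheduledFuture<?> periodic = scheduler.scheduleAtFixedRate(() -> {
            events.add("periodic");
            periodicRuns.countDown();
        }, 100, 100, TimeUnit.MILLISECONDS);

        try {
            delayedRun.await();
            periodicRuns.await(); // wait for at least three periodic runs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            periodic.cancel(false); // stop further periodic runs
            scheduler.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runBoth());
    }
}
```

The returned list contains "delayed" exactly once and "periodic" at least three times, which matches the point above: the executor is the same, and only the submission method decides how often the task runs.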

3. Fixed-size thread pool: newFixedThreadPool

A thread pool with a fixed number of threads. Its core initialization code is shown below:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

As the code shows, the argument passed to newFixedThreadPool is used as both the core pool size and the maximum pool size of the underlying ThreadPoolExecutor, and the work queue is an unbounded LinkedBlockingQueue. The following test submits ten tasks to a pool of three threads:

public static void main(String[] args) throws InterruptedException {
        // Number of tasks to submit
        int threads = 10;
        // Latch used to wait until all tasks have finished
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        System.out.println("---- start ----");
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0 ; i < threads; i++) {
            executorService.submit(() ->{
                System.out.println(Thread.currentThread().getName() + " executed");
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("---- end ----");
        executorService.shutdown();
    }

In the output, no more than three different thread names appear between the start and end markers: the pool never creates more than three threads, however many tasks are submitted.
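
To see where the remaining tasks go, the following sketch blocks every task on a latch and then inspects the pool. It relies on newFixedThreadPool returning a ThreadPoolExecutor, as in the source shown above. The class name FixedPoolQueueDemo and the helper snapshot are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FixedPoolQueueDemo {

    // Returns {pool size, queued tasks} observed while all workers are blocked.
    static int[] snapshot(int poolSize, int tasks) {
        // Assumption: the cast is valid because the factory method shown
        // above returns a plain ThreadPoolExecutor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = snapshot(3, 10);
        // prints "threads: 3, queued: 7"
        System.out.println("threads: " + result[0] + ", queued: " + result[1]);
    }
}
```

The first three tasks each start a worker thread, and the other seven wait in the LinkedBlockingQueue. Because that queue is unbounded, a producer that submits faster than the pool can process will keep growing it.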

4. Cached thread pool: newCachedThreadPool

A thread pool that caches idle threads for reuse. Its core initialization code is:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

As the code shows, the pool is created by passing specific parameters to ThreadPoolExecutor: the core pool size is 0, the maximum pool size is Integer.MAX_VALUE, and idle threads expire after 60 seconds. Tasks are handed off through a SynchronousQueue, which holds no tasks itself.

In other words, when a new task arrives, an idle thread is reused if one is available; otherwise a new thread is created to run it. A thread that stays idle for 60 seconds is terminated and removed from the pool. Usage is basically the same as for newFixedThreadPool; the test code is as follows:

public static void main(String[] args) throws InterruptedException {
        // Number of tasks to submit
        int threads = 10;
        // Latch used to wait until all tasks have finished
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        System.out.println("---- start ----");
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0 ; i < threads; i++) {
            executorService.submit(() ->{
                System.out.println(Thread.currentThread().getName() + " executed");
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("---- end ----");
        executorService.shutdown();
    }

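The output shows the tasks running on several threads. How many threads are created varies between runs, because it depends on how quickly earlier tasks finish and free their threads for reuse.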
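
To make the growth of a cached pool visible regardless of timing, the sketch below keeps every task busy until all of them have started, and then counts the distinct thread names. The class name CachedPoolGrowthDemo and the method threadsUsedWhenAllBusy are illustrative.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedPoolGrowthDemo {

    // Counts how many distinct threads run the tasks when none of them
    // can finish before all have started.
    static int threadsUsedWhenAllBusy(int tasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch started = new CountDownLatch(tasks);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    names.add(Thread.currentThread().getName());
                    started.countDown();
                    try {
                        release.await(); // stay busy so the thread cannot be reused
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // every task is now running on some thread
            return names.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return names.size();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "threads used: 10"
        System.out.println("threads used: " + threadsUsedWhenAllBusy(10));
    }
}
```

With no idle thread to reuse, each of the ten tasks gets its own thread. Since the maximum pool size is Integer.MAX_VALUE, a burst of long-running tasks can create a very large number of threads.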

 

5. Scheduled thread pool: newScheduledThreadPool

A fixed-size thread pool that can run tasks after a delay or periodically. Usage is similar to newSingleThreadScheduledExecutor.

The initialization core code is as follows:

  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

ScheduledThreadPoolExecutor is a subclass of ThreadPoolExecutor and implements the ScheduledExecutorService interface, a sub-interface of ExecutorService. The methods of that interface are what provide delayed and periodic execution.

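The ScheduledExecutorService interface declares four methods: two overloads of schedule (one taking a Runnable, one taking a Callable), scheduleAtFixedRate, and scheduleWithFixedDelay.

The class relationships are as follows: ScheduledThreadPoolExecutor extends ThreadPoolExecutor, which extends AbstractExecutorService, which implements ExecutorService. ScheduledExecutorService extends ExecutorService, which in turn extends Executor.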

Usage is the same as for newSingleThreadScheduledExecutor, the single-thread pool for delayed or periodic tasks, except that several threads are available to run the scheduled tasks.
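
The practical difference between the two scheduled pools shows up when two tasks are due at the same time. The sketch below schedules two blocking tasks and reports whether both were running at once. The class name ScheduledPoolConcurrencyDemo, the method runsTwoTasksAtOnce, and the 500 ms wait are illustrative choices.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledPoolConcurrencyDemo {

    // Schedules two tasks that block, and reports whether both started
    // within 500 ms, which is only possible with at least two threads.
    static boolean runsTwoTasksAtOnce(ScheduledExecutorService scheduler) {
        CountDownLatch started = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        Runnable task = () -> {
            started.countDown();
            try {
                release.await(); // hold the thread until the check is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            scheduler.schedule(task, 0, TimeUnit.MILLISECONDS);
            scheduler.schedule(task, 0, TimeUnit.MILLISECONDS);
            return started.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool of 5: " + runsTwoTasksAtOnce(Executors.newScheduledThreadPool(5)));
        System.out.println("single thread: " + runsTwoTasksAtOnce(Executors.newSingleThreadScheduledExecutor()));
    }
}
```

With the pool of five, both tasks start together. With the single-thread scheduler, the second task has to wait until the first one finishes, so a slow task delays every task scheduled after it.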

The code snippets executed periodically are as follows:

public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(new Runnable() {
            int i = 0;
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " executed " + i++);
            }
        }, 3, 2, TimeUnit.SECONDS);
    }

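After the initial delay of 3 seconds, the task runs every 2 seconds. Successive runs may be executed by different threads of the pool.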

6. Work-stealing thread pool: newWorkStealingPool

A work-stealing thread pool. Its core initialization code is as follows:

/**
     * Creates a work-stealing thread pool using all
     * {@link Runtime#availableProcessors available processors}
     * as its target parallelism level.
     * @return the newly created thread pool
     * @see #newWorkStealingPool(int)
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Note that this factory method is available only since JDK 1.8. It creates a ForkJoinPool whose parallelism level is Runtime.getRuntime().availableProcessors(), the number of processors available to the JVM. A ForkJoinPool takes advantage of multi-core CPUs: a task can be forked into several smaller tasks that run on different threads, and their results are then joined into the final result. Hence the name fork/join.

Work stealing means that an idle thread takes over tasks that were not originally assigned to it. Each worker thread has its own queue of pending tasks. When a thread has finished the tasks in its own queue, it can take tasks from the queues of other threads that are still busy.
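
The fork and join steps can be sketched with a RecursiveTask that sums a range of integers. The class ForkJoinSumDemo, the task SumTask, and the threshold of 100 are illustrative. The cast to ForkJoinPool assumes that newWorkStealingPool() returns a ForkJoinPool, as in the source shown above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumDemo {

    // Hypothetical task: sums the integers in [from, to] by splitting the range.
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 100;
        private final int from;
        private final int to;

        SumTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from < THRESHOLD) {
                // Small enough: sum directly
                long sum = 0;
                for (int i = from; i <= to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            SumTask left = new SumTask(from, mid);
            SumTask right = new SumTask(mid + 1, to);
            left.fork();                           // queue the left half; an idle worker may steal it
            return right.compute() + left.join();  // compute the right half here, then join
        }
    }

    static long sum(int from, int to) {
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            // Assumption: the factory method returns a ForkJoinPool
            ForkJoinPool pool = (ForkJoinPool) executor;
            return pool.invoke(new SumTask(from, to));
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 1000)); // prints 500500
    }
}
```

Forked subtasks land in the queue of the thread that forked them, and other workers that run out of work can steal them from there.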

The example code is as follows:

public static void main(String[] args) throws InterruptedException {
        // Number of tasks to submit
        int threads = 10;
        // Latch used to wait until all tasks have finished
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        System.out.println("---- start ----");
        ExecutorService executorService = Executors.newWorkStealingPool();
        for (int i = 0; i < threads; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName());
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("---- end ----");
    }

The output shows worker thread names of the form ForkJoinPool-1-worker-N, and the order of the lines can vary between runs.

 

Summary: apart from the work-stealing pool newly added in Java 8, the common thread pools are all built on ThreadPoolExecutor (or its subclass ScheduledThreadPoolExecutor) and differ only in the parameter values passed to it. The next article focuses on the meaning of the seven ThreadPoolExecutor constructor parameters and on how threads are created when a task is submitted.
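
As a small illustration of this summary, the sketch below builds two pools directly with the same constructor arguments that the factory methods shown earlier use, and prints their configuration. The class ManualPoolDemo and the helpers fixedLike, cachedLike, and describe are illustrative names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ManualPoolDemo {

    // Same arguments as the newFixedThreadPool source shown earlier.
    static ThreadPoolExecutor fixedLike(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // Same arguments as the newCachedThreadPool source shown earlier.
    static ThreadPoolExecutor cachedLike() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    // Summarizes the configuration of a pool in one line.
    static String describe(ThreadPoolExecutor pool) {
        return "core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                + ", queue=" + pool.getQueue().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor fixed = fixedLike(3);
        ThreadPoolExecutor cached = cachedLike();
        System.out.println(describe(fixed));  // prints "core=3, max=3, keepAlive=0s, queue=LinkedBlockingQueue"
        System.out.println(describe(cached)); // prints "core=0, max=2147483647, keepAlive=60s, queue=SynchronousQueue"
        fixed.shutdown();
        cached.shutdown();
    }
}
```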

 

Keywords: Java Back-end Multithreading

Added by softsolvers on Thu, 06 Jan 2022 15:36:56 +0200