Executor and Thread Pool

Definition

Threads are heavy objects and should be avoided from being created and destroyed frequently.

class XXXPool{
  // Getting pooled resources
  XXX acquire() {
  }
  // Release pooled resources
  void release(XXX x){
  }
}  

Thread pool is a producer-consumer mode

//A design method for pooling resources in a general sense
class ThreadPool{
  // Get idle threads
  Thread acquire() {
  }
  // Release Threads
  void release(Thread t){
  }
} 
//Expected use
ThreadPool pool;
Thread T1=pool.acquire();
//Incoming Runnable Object
T1.execute(()->{
  //Specific business logic
  ......
});


//Simplified thread pool, used only to illustrate how it works
class MyThreadPool{
  //Implement producer-consumer mode using blocking queues
  BlockingQueue<Runnable> workQueue;
  //Save internal worker threads
  List<WorkerThread> threads 
    = new ArrayList<>();
  // Construction method
  MyThreadPool(int poolSize, 
    BlockingQueue<Runnable> workQueue){
    this.workQueue = workQueue;
    // Create worker thread
    for(int idx=0; idx<poolSize; idx++){
      WorkerThread work = new WorkerThread();
      work.start();
      threads.add(work);
    }
  }
  // Submit Tasks
  void execute(Runnable command){
    workQueue.put(command);
  }
  // Work threads are responsible for consuming tasks and performing them
  class WorkerThread extends Thread{
    public void run() {
      //Loop Tasks and Execute
      while(true){ ①
        Runnable task = workQueue.take();
        task.run();
      } 
    }
  }  
}

/** Here's an example of use**/
// Create bounded blocking queue
BlockingQueue<Runnable> workQueue = 
  new LinkedBlockingQueue<>(2);
// Create Thread Pool  
MyThreadPool pool = new MyThreadPool(
  10, workQueue);
// Submit Tasks  
pool.execute(()->{
    System.out.println("hello");
});

Inside MyThreadPool, we maintain a blocked queue workQueue and a set of worker threads whose number is specified by the poolSize in the constructor. The user submits a Runnable task by calling the execute() method, and the internal implementation of the execute() method simply adds the task to the workQueue. The worker thread maintained internally by MyThreadPool consumes and executes tasks in the workQueue, and the relevant code is the while loop at Code 1

ThreadPoolExecutor

ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler) 

CorePoolSize: Indicates the minimum number of threads that the thread pool holds. Some projects are idle, but you can't take people away, at least keep corePoolSize on its own. MaximumPoolSize: Indicates the maximum number of threads created by the thread pool. When a project is busy, you need to add people, but you can't add people without restrictions, at most to maximumPoolSize individuals. When the project is idle, it will be withdrawn, up to corePoolSize individuals. KeepAliveTime & unit: The project mentioned above increases or decreases people based on their busy and idle time. How do you define busy and idle in the programming world? Simply, if a thread does not execute a task for a period of time, it means it is idle and keepAliveTime and unit are the parameters used to define this "period of time". That is, if a thread has been idle keepAliveTime & unit for so long and the number of threads in the thread pool is greater than corePoolSize, the idle thread will be recycled.
workQueue: The work queue, which is synonymous with the work queue of the sample code above. threadFactory: This parameter allows you to customize how threads are created, for example, by giving them a meaningful name. Handler: With this parameter you can customize the rejection policy of the task. If all threads in the thread pool are busy and the work queue is full (provided the work queue is bounded), the thread pool will reject the submission at this time. As for the rejection policy, you can specify it with the handler parameter. ThreadPoolExecutor already provides the following four strategies.
CallerRunsPolicy: The thread submitting the task executes the task itself.
**AbortPolicy:** Default rejection policy, throws RejectedExecutionException.
**DiscardPolicy:** Discards the task directly without any exceptions thrown. DiscardOldestPolicy: To discard the oldest task is to discard the task that first entered the work queue and then add the new task to the work queue.
The most important reason not to recommend using Executors is that many of the methods provided by Executors default to an unbound LinkedBlockingQueue, which can easily lead to OOM in high load scenarios where OOM causes all requests to go unprocessed, which is a fatal problem. So bounded queues are strongly recommended. With bounded queues, a thread pool triggers a rejection policy when there are too many tasks, and the default rejection policy for the thread pool throws RejectedExecutionException This is a runtime exception that is not enforced by the runtime exception compiler, so it is easy for developers to ignore. Therefore, the default rejection strategy should be used with caution. If the task handled by the thread pool is important, it is recommended that you customize your own denial policy; In practice, custom rejection strategies are often used in conjunction with demotion strategies.

try {
  //Business logic
} catch (RuntimeException x) {
  //Processing on demand
} catch (Throwable x) {
  //Processing on demand
} 

How to get task execution results

// Submit Runnable Task
Future<?> 
  submit(Runnable task);
// Submit Callable Task
<T> Future<T> 
  submit(Callable<T> task);
// Submit Runnable Tasks and Result References  
<T> Future<T> 
  submit(Runnable task, T result);

The Future interface has five methods, listed below. They are cancel(), isCancelled(), isDone(), and two get () and get(timeout, unit) that get the results of the task execution. The last get(timeout, unit) supports the timeout mechanism. Through these five methods of the Future interface, you will find that the tasks we submit not only get the results of task execution, but also cancel them. However, it should be noted that both get() methods are blocking. If the task has not finished executing when it is called, the thread calling get() method will block until the task is finished executing.

// Cancel Task
boolean cancel(
  boolean mayInterruptIfRunning);
// Determine if the task has been cancelled  
boolean isCancelled();
// Determine if the task has ended
boolean isDone();
// Get Task Execution Results
get();
// Get task execution results, support timeouts
get(long timeout, TimeUnit unit);

Kettle

// Create FutureTask for Task T2
FutureTask<String> ft2
  = new FutureTask<>(new T2Task());
// Create FutureTask for Task T1
FutureTask<String> ft1
  = new FutureTask<>(new T1Task(ft2));
// Thread T1 executes task ft1
Thread T1 = new Thread(ft1);
T1.start();
// Thread T2 executes task ft2
Thread T2 = new Thread(ft2);
T2.start();
// Waiting for the result of thread T1 execution
System.out.println(ft1.get());

// Task tasks:
// Wash kettle, boil water, make tea
class T1Task implements Callable<String>{
  FutureTask<String> ft2;
  // T1 task requires FutureTask for T2 task
  T1Task(FutureTask<String> ft2){
    this.ft2 = ft2;
  }
  @Override
  String call() throws Exception {
    System.out.println("T1:Washing kettle...");
    TimeUnit.SECONDS.sleep(1);
    
    System.out.println("T1:Boiling water...");
    TimeUnit.SECONDS.sleep(15);
    // Get tea from T2 thread  
    String tf = ft2.get();
    System.out.println("T1:Get the tea:"+tf);

    System.out.println("T1:Making Tea...");
    return "Serve tea:" + tf;
  }
}
// The tasks that T2Task needs to perform:
// Tea jug, cup, tea
class T2Task implements Callable<String> {
  @Override
  String call() throws Exception {
    System.out.println("T2:Tea-washer...");
    TimeUnit.SECONDS.sleep(1);

    System.out.println("T2:Tea cup...");
    TimeUnit.SECONDS.sleep(2);

    System.out.println("T2:Get tea...");
    TimeUnit.SECONDS.sleep(1);
    return "Longjing";
  }
}
// One execution result:
T1:Washing kettle...
T2:Tea-washer...
T1:Boiling water...
T2:Tea cup...
T2:Get tea...
T1:Get the tea:Longjing
T1:Making Tea...
Serve tea:Longjing
//Task 1: Wash the kettle - > Boil the water
CompletableFuture<Void> f1 = 
  CompletableFuture.runAsync(()->{
  System.out.println("T1:Washing kettle...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T1:Boiling water...");
  sleep(15, TimeUnit.SECONDS);
});
//Task 2: Tea-washer->Tea-cup->Take tea
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2:Tea-washer...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T2:Tea cup...");
  sleep(2, TimeUnit.SECONDS);

  System.out.println("T2:Get tea...");
  sleep(1, TimeUnit.SECONDS);
  return "Longjing";
});
//Task 3: Execute after Task 1 and Task 2 are completed: Make Tea
CompletableFuture<String> f3 = 
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1:Get the tea:" + tf);
    System.out.println("T1:Making Tea...");
    return "Serve tea:" + tf;
  });
//Waiting for Task 3 to execute
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  }catch(InterruptedException e){}
}
// One execution result:
T1:Washing kettle...
T2:Tea-washer...
T1:Boiling water...
T2:Tea cup...
T2:Get tea...
T1:Get the tea:Longjing
T1:Making Tea...
Serve tea:Longjing

Creating a CompletableFuture object depends primarily on the four static methods shown in the code below, let's look at the first two. In the case of boiling tea, we have used runAsync(Runnable runnable) and supplyAsync(Supplier supplier), the difference being that the run() method of the Runnable interface does not return a value, and the get() method of the Supplier interface does. The difference between the first two methods is that the last two methods can specify thread pool parameters. CompletableFuture uses the common ForkJoinPool thread pool by default. The default number of threads created by this thread pool is the number of cores in the CPU (You can also set the number of threads in the ForkJoinPool thread pool through JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism). If all CompletableFutures share a thread pool, then once all tasks perform some slow I/O operations, all threads in the thread pool will be blocked on the I/O operations, causing thread hunger and hence shadow Sounds the performance of the whole system. Therefore, it is strongly recommended that you create different thread pools for different business types to avoid interference with each other.

//Use default thread pool
static CompletableFuture<Void> 
  runAsync(Runnable runnable)
static <U> CompletableFuture<U> 
  supplyAsync(Supplier<U> supplier)
//Thread pool can be specified  
static CompletableFuture<Void> 
  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> 
  supplyAsync(Supplier<U> supplier, Executor executor)  

CompletionService is recommended when batch submission of asynchronous tasks is required. CompletionService combines the functionality of the thread pool Executor with the blocking queue BlockingQueue to make the management of batch asynchronous tasks easier. In addition, CompletionService allows the execution of asynchronous tasks to be ordered, with first execution entering the blocked queue. This feature allows you to easily achieve the ordering of subsequent processing, avoid unnecessary waiting, and quickly implement requirements such as Forking Cluster. ExecutorCompletionService, an implementation class of CompletionService, requires you to create a thread pool yourself. Although it may seem tedious, the benefit is that you can isolate thread pools from multiple ExecutorCompletionServices, which avoids the risk that several particularly time-consuming tasks will crash the entire application.

Ask for minimum quotation

// Create Thread Pool
ExecutorService executor = 
  Executors.newFixedThreadPool(3);
// Create CompletionService
CompletionService<Integer> cs = new 
  ExecutorCompletionService<>(executor);
// Asynchronous inquiry to E-commerce S1
cs.submit(()->getPriceByS1());
// Asynchronous inquiry to E-commerce S2
cs.submit(()->getPriceByS2());
// Asynchronous Inquiry to E-commerce S3
cs.submit(()->getPriceByS3());
// Save the results of the request for quote asynchronously to the database
// And calculate the lowest quotation
AtomicReference<Integer> m =
  new AtomicReference<>(Integer.MAX_VALUE);
for (int i=0; i<3; i++) {
  executor.execute(()->{
    Integer r = null;
    try {
      r = cs.take().get();
    } catch (Exception e) {}
    save(r);
    m.set(Integer.min(m.get(), r));
  });
}
return m;

Keywords: Java Concurrent Programming thread pool

Added by cyberplasma on Sun, 02 Jan 2022 21:39:14 +0200