Asynchronous and thread pool

1, Four ways to initialize threads

  1. Inherit Thread class
  2. Implement Runnable interface
  3. Implement Callable interface + FutureTask (you can get the returned results and handle exceptions)
  4. Thread pool
  • Inherit the Thread class and implement the Runnable interface: the main process cannot obtain the operation result of the Thread.
  • Implementation of Callable interface: the main process can obtain the operation results of the thread, but it is not conducive to controlling the thread resources of the server, which may lead to the depletion of server resources.
  • Thread pool: initialization mode
 //Initialize thread pool
 ExecutorService executorService = Executors.newFixedThreadPool(3);

ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
                  long keepAliveTime,
                  TimeUnit unit,
                  BlockingQueue<Runnable> workQueue,
                  ThreadFactory threadFactory,
                  RejectedExecutionHandler handler) 

2, Seven parameters of new ThreadPoolExecutor()

  1. corePoolSize: the number of threads consistently maintained in the thread pool.
  2. maximumPoolSize: the maximum number of threads allowed in a thread pool.
  3. keepAliveTime: when the number of threads is greater than the number of core threads, the maximum number of threads that exceed the number of core threads will terminate the release if they do not receive a new task. Finally, the thread pool will be maintained at the size of corePoolSize.
  4. unit: time.
  5. workQueue: a blocking queue used to store tasks waiting to be executed. If the current demand for threads exceeds the size of corePoolSize, it will be placed here for idle threads to execute.
  6. threadFactory: a factory that creates threads, such as specifying thread names.
  7. handler: reject policy. If the thread is full, the thread pool uses the reject policy.

3, Process of running tasks in thread pool

1. Process

  1. Create a thread pool, prepare the number of core threads, and prepare to accept tasks

  2. New tasks come in and are executed with idle threads prepared by the core.
    1) When the core is full, the next task is added to the blocking queue. The idle core will get the task execution from the blocking queue.
    2) When the blocking queue is full, a new thread is directly opened for execution. The maximum number can only be opened to the number specified by max.
    3) Max, it's all done. The max core number of idle threads will destroy themselves after the time specified by keepAliveTime. Finally, keep it to the size of the core
    4) If the number of threads starts up to max and a new task comes in, the reject policy specified by reject will be used for processing.

  3. All thread creation is created by the specified threadFactory.

2. Interview questions

One thread pool core=7; max=20; queue=50. How are the 100 concurrent requests allocated?
First, 7 requests can be executed directly, then 50 enter the blocking queue and continue to execute the request task with 13 (20-7) more threads. Now there are always 7 + 13 + 50 = 70 tasks assigned. The remaining 30 default execution reject policies.

4, Four common thread pools

  1. newCacheThreadPool: create a cacheable thread pool. If the length of the thread pool exceeds the processing needs, you can flexibly recycle idle threads. If there is no recyclable thread, you can create a new thread.
  2. newFixedThreadPool: create a fixed length thread pool, which can control the maximum concurrent number of threads. The exceeded threads will wait in the queue.
  3. newScheduledThreadPool: create a fixed length thread pool to support scheduled and periodic task execution.
  4. Newsinglethreadexecution: create a single thread thread pool. Only one working thread will execute tasks to ensure that all tasks are executed in the specified order.

5, Completable future asynchronous orchestration

In java8, a new class containing about 50 methods, completable Future, provides a very powerful Future extension function, which can help simplify the complexity of asynchronous programming and provide the ability of functional programming. The calculation results can be processed through callback, and the methods of transforming and combining completable Future are provided. The completabilefuture class implements the Future interface, so the results are still obtained by blocking or polling through the get() method.

1. Completable future creates asynchronous objects

Completable future has four static methods to create asynchronous operations.

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

public static CompletableFuture<Void> runAsync(Runnable runnable)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

runAsync methods have no return value, and supplyAsync can obtain the return result.
You can pass in a custom thread pool, otherwise the default thread pool will be used.

2. Callback method when calculation is completed (whenComplete, exceptionally)


public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) 

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) 

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) 

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

whenComplete can handle normal and abnormal calculation results, and exceptionally handle abnormal situations.
The difference between whenComplete and whenCompleteAsync:

  1. whenComplete: the thread executing the current task continues to execute the whenComplete task.
  2. whenCompleteAsync: continue to submit the task of executing whenCompleteAsync to the thread pool for execution. (a new thread may be started.)
public static void main(String[] args) throws ExecutionException, InterruptedException {
  CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
      @Override
      public Object get() {
          System.out.println(Thread.currentThread() + " ---- completableFuture");
          int i = 10 / 0;
          return 1024;
      }
  }).whenComplete(new BiConsumer<Object, Throwable>() {
      @Override
      public void accept(Object o, Throwable throwable) {
          System.out.println("-------o = " + o.toString());
          System.out.println("-------throwable = " + throwable);
      }
  }).exceptionally(new Function<Throwable, Object>() {
      @Override
      public Object apply(Throwable throwable) {
          System.out.println("throwable=" + throwable);
          return 666;
      }
  });
  System.out.println(future.get());
}


3.handle method

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) 

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)

public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) 

Like the complete method, the handle method can perform the final processing on the result, handle the abnormal result, and change the return value.

4. Thread serialization method

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) 

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) 

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 


public CompletableFuture<Void> thenAccept(Consumer<? super T> action) 

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor) 


public CompletableFuture<Void> thenRun(Runnable action)

public CompletableFuture<Void> thenRunAsync(Runnable action) 

public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) 

  1. thenApply method: when a thread depends on another thread, get the result returned by the previous task and return the return value of the current thread.
  2. thenAccept method: consume processing results. Receive the processing result of the task and consume it. No result is returned.
  3. thenRun method: execute thenRun as long as the above task is completed, but execute the subsequent operations of thenRun after processing the task.
    With Async, it is executed asynchronously by default.

5. Combination of two tasks (both to be completed)

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) 

public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)

public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)


public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) 

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)

public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) 


public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) 

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor) 

Both tasks must be completed to trigger the task.
thenCombine: combine two future tasks, obtain the return results of the two future tasks, and return the return value of the current task.
Then accept both: combine two future tasks, obtain the return results of the two future tasks, and then process the tasks without return value.
runAfterBoth: combine two futures. You don't need to obtain the results of the future. You only need to process the task after the two futures have processed the task.

public static void main(String[] args) throws ExecutionException, InterruptedException {
  CompletableFuture.supplyAsync(() -> {
      return "hello";
  }).thenApplyAsync(t -> {
      return t + " world!";
  }).thenCombineAsync(CompletableFuture.completedFuture(" CompletableFuture"), (t, u) -> {
      return t + u;
  }).whenComplete((t, u) -> {
      System.out.println(t);
  });
}

6. Combination of two tasks (one completed)

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) 

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) 

public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor) 


public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) 

public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)


public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) 

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action) 

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)

When either of the two future tasks is completed, the task is executed.
applyToEither: when one of the two tasks is completed, get its return value, process the task and have a new return value.
acceptEither: one of the two tasks is completed. Get its return value and process the task without return value.
runAfterEither: one of the two tasks is completed. There is no need to obtain the future results. The task is processed directly and there is no return value.

7. Multi task combination

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf: wait for all tasks to complete.
anyOf: as long as one task is completed.

6, Product details page, code optimization using asynchronous orchestration

@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
  SkuItemVo skuItemVo = new SkuItemVo();
  //sku basic information acquisition
  CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
      SkuInfoEntity skuInfoEntity = getById(skuId);
      skuItemVo.setInfo(skuInfoEntity);
      return skuInfoEntity;
  }, executor);

  //sku picture information
  CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
      List<SkuImagesEntity> skuImagesEntities = skuImagesService.list(new QueryWrapper<SkuImagesEntity>().eq("sku_id", skuId));
      skuItemVo.setImages(skuImagesEntities);
  },executor);


  //Get the sales attribute combination of spu
  CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync(res -> {
      List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.listSaleAttrs(res.getSpuId());
      skuItemVo.setSaleAttr(saleAttrVos);
  }, executor);

  //Get spu introduction
  CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync(res -> {
      SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
      skuItemVo.setDesc(spuInfoDescEntity);
  }, executor);

  //Get spu specification parameters
  CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync(res -> {
      List<SpuItemAttrGroup> spuItemAttrGroupVos=attrGroupService.getProductGroupAttrsBySpuId(
              res.getSpuId(), res.getCatalogId());
      skuItemVo.setGroupAttrs(spuItemAttrGroupVos);
  }, executor);

  //Query whether the current sku participates in the second kill
//        CompletableFuture<Void> secKillFuture = CompletableFuture.runAsync(() -> {
//            R skuSeckillInfo = seckillFeignService.getSkuSeckillInfo(skuId);
//            if (skuSeckillInfo.getCode() == 0) {
//                SeckillInfoVo seckillInfoVo = skuSeckillInfo.getData(new TypeReference<SeckillInfoVo>() {});
//                skuItemVo.setSeckillInfoVo(seckillInfoVo);
//            }
//        }, executor);


  //Wait until all tasks are completed before returning
//        CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,baseAttrFuture,secKillFuture).get();

  CompletableFuture.allOf(imageFuture,saleAttrFuture,descFuture,baseAttrFuture).get();
  return skuItemVo;
}

Keywords: Multithreading

Added by byronwells on Tue, 01 Feb 2022 21:34:34 +0200