Grain Mall 10: Asynchronous orchestration

Asynchronous orchestration

Introduction to CompletableFuture

Future is an interface added in Java 5 that represents the result of an asynchronous computation. You can call isDone to check whether the computation has completed, or call get to block the calling thread until the computation completes and returns its result. You can also call cancel to attempt to cancel the task.
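As a rough illustration of these three methods, here is a minimal sketch using a plain ExecutorService. The class name FutureBasics and the task bodies are made up for this example and are not part of the original project.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original project.
class FutureBasics {

    // Submits a task and blocks on get() until the result is available.
    static int blockingGet() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> 6 * 7);
            int result = future.get(); // blocks the calling thread
            // Once get() has returned, isDone() is true.
            System.out.println("done=" + future.isDone() + ", result=" + result);
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Submits a long-running task and cancels it before it can finish.
    static boolean cancelLongTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit(() -> {
                Thread.sleep(60_000);
                return null;
            });
            boolean cancelled = future.cancel(true); // interrupt if already running
            return cancelled && future.isCancelled();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingGet());
        System.out.println(cancelLongTask());
    }
}
```

Note that the only way to obtain the value here is the blocking get() call, which is exactly the limitation discussed next.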

Although Future and its related methods make it possible to run tasks asynchronously, obtaining the result is inconvenient: it can only be done by blocking or by polling. Blocking defeats the purpose of asynchronous programming, while polling wastes CPU cycles and still does not deliver the result promptly. Why not use the observer pattern and notify a listener as soon as the result is ready?

Many languages and platforms, such as Node.js, use callbacks to implement asynchronous programming. Some Java frameworks, such as Netty, extend Java's Future interface with additional methods such as addListener; Google Guava also provides a general-purpose extension of Future; and Scala offers an easy-to-use and powerful Future/Promise asynchronous programming model.

As the standard Java class library, shouldn't the JDK do something to strengthen its own capabilities?

Java 8 added CompletableFuture, a powerful extension of Future that reduces the complexity of asynchronous programming. It supports a functional style in which results are processed through callbacks, and it provides methods for transforming and combining CompletableFuture instances.

CompletableFuture

1. Create asynchronous object

CompletableFuture provides four static methods to create an asynchronous operation.

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

Methods that do not take an Executor run the asynchronous code on ForkJoinPool.commonPool(). If an Executor is specified, the code runs on that thread pool. The same applies to all the methods described below.

  • The runAsync method does not support return values.
  • supplyAsync can support return values.
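The following minimal sketch contrasts the two factory methods on an explicitly supplied pool. The class name CreateAsyncDemo, the pool size, and the value "sku-1024" are assumptions made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original project.
class CreateAsyncDemo {

    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // runAsync takes a Runnable, so the future carries no value.
            CompletableFuture<Void> noResult = CompletableFuture.runAsync(
                    () -> System.out.println("runAsync on " + Thread.currentThread().getName()), pool);

            // supplyAsync takes a Supplier, so the future carries its return value.
            CompletableFuture<String> withResult =
                    CompletableFuture.supplyAsync(() -> "sku-1024", pool);

            Void nothing = noResult.join(); // a CompletableFuture<Void> always yields null
            return withResult.join() + "/" + nothing;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // sku-1024/null
    }
}
```

Dropping the second argument from either call would move the work to ForkJoinPool.commonPool() instead.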

2. Callback method when calculation is completed

When a CompletableFuture completes, either with a result or with an exception, a specific action can be executed. The main methods are as follows:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);

whenComplete receives both the normal result and the exception, so it can observe either outcome, but it cannot change the result. exceptionally runs only when an exception occurred and supplies a fallback value. The business logic goes in the BiConsumer<? super T, ? super Throwable> passed to whenComplete.

The difference between whenComplete and whenCompleteAsync:

  • whenComplete: the callback runs on the thread that completed the current task.
  • whenCompleteAsync: the callback is submitted to a thread pool for execution.

In general, a method whose name does not end in Async runs its action on the thread that completed the previous stage (or on the calling thread, if that stage had already completed). An Async variant submits the action to a thread pool, so it may run on a different thread; if the same pool is used, the same thread may still happen to pick it up.
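To make the thread difference concrete, here is a minimal sketch that gives each pool a recognizable thread name. The class name CallbackThreadDemo and the thread names "task-pool" and "callback-pool" are invented for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original project.
class CallbackThreadDemo {

    // Single-thread executor whose only thread carries the given name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Returns the name of the thread that ran the whenCompleteAsync callback.
    static String callbackThread() {
        ExecutorService taskPool = named("task-pool");
        ExecutorService callbackPool = named("callback-pool");
        try {
            AtomicReference<String> seen = new AtomicReference<>();
            CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> 1024, taskPool);

            // Non-async: runs on the thread that completes the task,
            // or on the calling thread if the task has already finished.
            task.whenComplete((value, error) ->
                    System.out.println("whenComplete on " + Thread.currentThread().getName()));

            // Async with an executor: always handed to that executor.
            task.whenCompleteAsync((value, error) ->
                    seen.set(Thread.currentThread().getName()), callbackPool).join();

            return seen.get();
        } finally {
            taskPool.shutdown();
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("whenCompleteAsync on " + callbackThread());
    }
}
```

The first line printed can name either task-pool or main depending on timing, while the async callback always reports callback-pool.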

Code example:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Object> future = CompletableFuture.supplyAsync(new Supplier<Object>() {
            @Override
            public Object get() {
                System.out.println(Thread.currentThread().getName() + "\t completableFuture");
                int i = 10 / 0; // Deliberately throws ArithmeticException
                return 1024;
            }
        }).whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(Object o, Throwable throwable) {
                // o is null when the task failed, so print it without calling toString()
                System.out.println("-------o=" + o); // Asynchronous result
                System.out.println("-------throwable=" + throwable); // Exception, if any
            }
        }).exceptionally(new Function<Throwable, Object>() {
            @Override
            public Object apply(Throwable throwable) {
                System.out.println("throwable=" + throwable);
                return 6666; // If an exception occurs, the default result is returned
            }
        });
        // Get the return result
        System.out.println(future.get());
    }
}

3. handle method

handle processes the outcome when the task finishes. It runs after the task has completed, whether normally or with an exception, so it can also deal with failed tasks. Unlike whenComplete, the value it returns becomes the result of the new stage.

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
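A minimal sketch of handle covering both outcomes in one callback. The class name HandleDemo and the division task are made up for this example; the unwrapping step is there because a failure in an upstream task may reach the callback wrapped in a CompletionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class, not part of the original project.
class HandleDemo {

    // Divides 100 by the divisor asynchronously and turns either outcome into a String.
    static String describe(int divisor) {
        return CompletableFuture.supplyAsync(() -> 100 / divisor)
                .handle((value, error) -> {
                    if (error == null) {
                        return "ok:" + value;
                    }
                    // Unwrap the CompletionException, if present, to reach the original cause.
                    Throwable root = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause()
                            : error;
                    return "failed:" + root.getClass().getSimpleName();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(4)); // ok:25
        System.out.println(describe(0)); // failed:ArithmeticException
    }
}
```

Because handle returns a value in both branches, the resulting future completes normally even when the original task failed.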

4. Thread serialization method

thenApply method: used when one task depends on another. It receives the result of the previous task and returns the result of the current task.

thenAccept method: consumes the result. It receives the result of the previous task and processes it, but returns nothing.

thenRun method: runs an action once the previous task has completed, without receiving that task's result and without returning anything.

Each method also has an Async variant, which submits the action to a thread pool (the default pool unless an Executor is passed) instead of running it on the thread that completed the previous task.

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

Function<? super T,? extends U>
T: The type of result returned by the previous task
U: Return value type of current task

Code demonstration:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println(Thread.currentThread().getName() + "\t completableFuture");
            //int i = 10 / 0;
            return 1024;
        }
    }).thenApply(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer o) {
            System.out.println("thenApply Method, last returned result:" + o);
            return  o * 2;
        }
    }).whenComplete(new BiConsumer<Integer, Throwable>() {
        @Override
        public void accept(Integer o, Throwable throwable) {
            System.out.println("-------o=" + o);
            System.out.println("-------throwable=" + throwable);
        }
    }).exceptionally(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable throwable) {
            System.out.println("throwable=" + throwable);
            return 6666;
        }
    }).handle(new BiFunction<Integer, Throwable, Integer>() {
        @Override
        public Integer apply(Integer integer, Throwable throwable) {
            System.out.println("handle o=" + integer);
            System.out.println("handle throwable=" + throwable);
            return 8888;
        }
    });
    System.out.println(future.get());
}
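The demonstration above exercises thenApply. For completeness, here is a minimal sketch that also chains thenAccept and thenRun; the class name ThenChainDemo and the log messages are assumptions made for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of the original project.
class ThenChainDemo {

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();

        // thenApply: receives the previous result and returns a new one.
        CompletableFuture<Integer> doubled =
                CompletableFuture.supplyAsync(() -> 1024).thenApply(v -> v * 2);

        CompletableFuture<Void> chain = doubled
                // thenAccept: receives the previous result, returns nothing.
                .thenAccept(v -> log.add("accepted " + v))
                // thenRun: receives nothing, returns nothing.
                .thenRun(() -> log.add("finished"));

        chain.join(); // wait for the whole chain
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [accepted 2048, finished]
    }
}
```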

1. Combination of two tasks - both to be completed

The follow-up action is triggered only after both tasks have completed.

thenCombine: combines two futures, receives both results, and returns the result of the current task.

thenAcceptBoth: combines two futures, receives both results, and processes them without returning a value.

runAfterBoth: combines two futures, does not receive their results, and simply runs an action after both have completed.

public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor);
    

public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action, Executor executor);


public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor);

Test case:

public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> {
        return "hello";
    }).thenApplyAsync(t -> {
        return t + " world!";
    }).thenCombineAsync(CompletableFuture.completedFuture(" CompletableFuture"), (t, u) -> {
        return t + u;
    }).whenComplete((t, u) -> {
        System.out.println(t);
    }).join(); // Wait for the chain; otherwise main may return before the daemon pool threads print
}

Output: hello world! CompletableFuture
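The test case covers thenCombine. As a complement, this minimal sketch shows thenAcceptBoth and runAfterBoth; the class name BothDemo and the price and stock values are invented for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of the original project.
class BothDemo {

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> stock = CompletableFuture.supplyAsync(() -> 7);

        // thenAcceptBoth: receives both results, returns nothing.
        price.thenAcceptBoth(stock, (p, s) -> log.add("price=" + p + ", stock=" + s)).join();

        // runAfterBoth: receives neither result, just runs once both are done.
        price.runAfterBoth(stock, () -> log.add("both finished")).join();

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [price=100, stock=7, both finished]
    }
}
```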

2. Combination of two tasks - one completed

The follow-up action is triggered as soon as either of the two tasks has completed.

applyToEither: when one of the two tasks completes, receives its result, processes it, and returns a new value.

acceptEither: when one of the two tasks completes, receives its result and processes it without returning a value.

runAfterEither: when one of the two tasks completes, runs an action without receiving the result and without returning a value.

public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn,
    Executor executor);

public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action,
    Executor executor);

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                              Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action,
                                                   Executor executor);
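A minimal sketch of the "either" family. To keep the outcome predictable, one future is already complete and the other is never completed; the class name EitherDemo and the "cache" value are assumptions made for this example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original project.
class EitherDemo {

    static String run() {
        // Never completed in this demo, so the other future always wins.
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("cache");

        // acceptEither: consume whichever result arrives first, return nothing.
        neverCompletes.acceptEither(fast, src -> System.out.println("first answer from " + src));

        // runAfterEither: run an action once either is done, without seeing the result.
        neverCompletes.runAfterEither(fast, () -> System.out.println("one of them finished"));

        // applyToEither: transform whichever result arrives first into a new value.
        return neverCompletes.applyToEither(fast, src -> "served from " + src).join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // served from cache
    }
}
```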

3. Multi task combination

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

allOf: the returned future completes when all of the given tasks have completed.

anyOf: the returned future completes as soon as any one of the given tasks has completed.

public static void main(String[] args) {
    // Typed list instead of the raw CompletableFuture type
    List<CompletableFuture<String>> futures = Arrays.asList(CompletableFuture.completedFuture("hello"),
                                                            CompletableFuture.completedFuture(" world!"),
                                                            CompletableFuture.completedFuture(" hello"),
                                                            CompletableFuture.completedFuture("java!"));
    final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
    allCompleted.thenRun(() -> {
        futures.stream().forEach(future -> {
            try {
                System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    });
}

Test results:

get future at:1568892339473, result:hello
get future at:1568892339473, result: world!
get future at:1568892339473, result: hello
get future at:1568892339473, result:java!

All four results are printed at almost the same moment.
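The example above only exercises allOf. The following minimal sketch adds anyOf and shows one common way to collect typed results after allOf; the class name AnyOfDemo and the sample values are invented for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the original project.
class AnyOfDemo {

    // anyOf completes with the result of whichever future finishes first.
    static Object firstResult() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> ready = CompletableFuture.completedFuture("java!");
        return CompletableFuture.anyOf(pending, ready).join();
    }

    // allOf yields Void, so the values are read back from the original futures.
    static List<String> collectAll() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.completedFuture("hello"),
                CompletableFuture.completedFuture("world"));
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // does not block: all are complete
                        .collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // java!
        System.out.println(collectAll());  // [hello, world]
    }
}
```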

Optimize product details page

1. Custom thread pool
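The original gives no code for this step. As a rough sketch only, a pool like the one injected in the next step could be built with plain java.util.concurrent as follows. The class name ItemThreadPool, the pool sizes, the queue capacity, the 30-second keep-alive and the rejection policy are all assumptions; in a Spring application the create method would typically be exposed as a @Bean, which is left out here to keep the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original project.
class ItemThreadPool {

    // Builds a bounded pool; every number passed in is an assumption of the caller.
    static ThreadPoolExecutor create(int coreSize, int maxSize, int queueCapacity) {
        return new ThreadPoolExecutor(
                coreSize,
                maxSize,
                30L, TimeUnit.SECONDS,                      // idle time before extra threads are released
                new LinkedBlockingQueue<>(queueCapacity),   // bounded queue to avoid unbounded memory growth
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());      // reject when both queue and pool are full
    }

    // Runs one asynchronous task on a freshly built pool.
    static int runOnPool() {
        ThreadPoolExecutor pool = create(2, 4, 100);
        try {
            return CompletableFuture.supplyAsync(() -> 1024, pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnPool()); // 1024
    }
}
```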

2. Inject the thread pool and write the business logic

@Service
public class ItemServiceImpl implements ItemService {

    @Autowired
    private GmallPmsFeign pmsFeign;

    @Autowired
    private GmallSmsFeign smsFeign;

    @Autowired
    private GmallWmsFeign wmsFeign;

    @Autowired
    private ThreadPoolExecutor threadPoolExecutor; // Custom thread pool


    @Override
    public ItemVO loadData(Long skuId) throws ExecutionException, InterruptedException {

        ItemVO itemVO = new ItemVO();

        // 1. Get the basic information of the sku.
        // skuInfoEntity is needed later to fetch the sku's promotion information, the spu's sales attributes and the spu details (all of which need the spuId stored in the sku).
        // supplyAsync returns a value; runAsync does not.
        // So supplyAsync is needed here.
        CompletableFuture<SkuInfoEntity> skuFuture = CompletableFuture.supplyAsync(() -> {
            Resp<SkuInfoEntity> skuInfoEntityResp = this.pmsFeign.querySkuById(skuId);
            SkuInfoEntity skuInfoEntity = skuInfoEntityResp.getData();
            if (skuInfoEntity != null) {
                BeanUtils.copyProperties(skuInfoEntity, itemVO);
            }
            return skuInfoEntity;
        }, threadPoolExecutor);


        // 2. Get the picture information of sku
        CompletableFuture<Void> skuImageFuture = CompletableFuture.runAsync(() -> {
            Resp<List<SkuImagesEntity>> listResp = this.pmsFeign.queryImagesBySkuId(skuId);
            List<SkuImagesEntity> images = listResp.getData();
            if (!CollectionUtils.isEmpty(images)) {
                List<String> imageUrls = images.stream().map(image -> image.getImgUrl()).collect(Collectors.toList());
                itemVO.setPics(imageUrls);
            }
        }, threadPoolExecutor);


        // 3. Get sku's promotion information TODO

        // 4. Get all sales attributes of spu
        // thenAcceptAsync: takes the previous result, returns nothing
        // thenApplyAsync: takes the previous result and returns a value
        // The spu details fetched later also need the spuId in skuInfoEntity, so thenApplyAsync is used here
        CompletableFuture<SkuInfoEntity> spuFuture = skuFuture.thenApplyAsync(skuInfoEntity -> {
            Resp<List<SkuSaleAttrValueEntity>> skuSaleAttrValueResp = this.pmsFeign.querySkuSaleAttrValueBySpuId(skuInfoEntity.getSpuId());
            List<SkuSaleAttrValueEntity> skuSaleAttrValueEntities = skuSaleAttrValueResp.getData();
            itemVO.setSaleAttrs(skuSaleAttrValueEntities);
            return skuInfoEntity;
        }, threadPoolExecutor);

        // 5. Get the specification parameter groups and the parameters in each group TODO

        // 6. spu details TODO

        CompletableFuture<Void> future = CompletableFuture.allOf(skuFuture, skuImageFuture, spuFuture);
        // Block the calling thread until all asynchronous tasks have finished
        future.get();

        return itemVO;
    }
}
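The shape of this orchestration can be tried without Spring or Feign. The sketch below replaces the remote calls with hard-coded stubs: the Item class, its fields and every returned value are hypothetical stand-ins for ItemVO and the Feign responses, not the project's real types.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not part of the original project.
class ItemOrchestrationSketch {

    // Stand-in for ItemVO; each field is written by exactly one task.
    static class Item {
        volatile String title;
        volatile List<String> pics;
        volatile List<String> saleAttrs;
    }

    static Item load(long skuId, Executor pool) {
        Item item = new Item();

        // Task 1: sku info. Returns a pretend spuId that a later task depends on.
        CompletableFuture<Long> skuFuture = CompletableFuture.supplyAsync(() -> {
            item.title = "sku-" + skuId;
            return skuId * 10;
        }, pool);

        // Task 2: images. Independent of task 1, so it runs in parallel.
        CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(
                () -> item.pics = Arrays.asList("a.jpg", "b.jpg"), pool);

        // Task 3: sales attributes. Starts only after task 1 has produced the spuId.
        CompletableFuture<Void> attrFuture = skuFuture.thenAcceptAsync(
                spuId -> item.saleAttrs = Collections.singletonList("spu-" + spuId + ":color"), pool);

        // Wait for all three before handing the assembled item back.
        CompletableFuture.allOf(skuFuture, imageFuture, attrFuture).join();
        return item;
    }

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            Item item = load(3L, pool);
            return item.title + "|" + item.pics + "|" + item.saleAttrs;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // sku-3|[a.jpg, b.jpg]|[spu-30:color]
    }
}
```

Independent queries start immediately, dependent ones are chained from the future they need, and a single allOf acts as the final barrier. This is the same structure loadData uses.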

Spring Boot configuration file

1. Configuration properties

/**
 * @author ljy
 * @version 1.0.0
 * @Description TODO
 * @createTime 2022-01-12 12:23:00
 */
@ConfigurationProperties(prefix = "liu.jian")
@Component
@Data
public class ThreadPoolConfigProperties {
    private int id;
}

2. Configuration file entry

liu.jian.id = 20

3. Use

@EnableConfigurationProperties(ThreadPoolConfigProperties.class) // Add this if the properties class is not annotated with @Component
threadPoolConfigProperties.getId(); // Call the getter on the injected ThreadPoolConfigProperties instance

Keywords: Spring Boot Back-end thread pool

Added by Funk001 on Wed, 23 Feb 2022 02:06:17 +0200