A Complete Guide to Using CompletableFuture in Java 8

1, Introduction

CompletableFuture implements both the CompletionStage interface and the Future interface. Compared with a plain Future, it adds asynchronous callbacks, stream-style chaining, and the ability to combine multiple futures, which makes coordinating multiple tasks in Java smoother and more convenient.

2, Create asynchronous task

1,ExecutorService.submit

ExecutorService is the usual thread pool interface. Its execute method returns void, so the execution status of the asynchronous task cannot be obtained. Its three overloaded submit methods return a Future, which can be used to obtain the execution status and result of the task. An example follows:

    @Test
    public void test3() throws Exception {
        // To create an asynchronous execution task:
        ExecutorService executorService= Executors.newSingleThreadExecutor();
        Future<Double> cf = executorService.submit(()->{
            System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(false){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        });
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //Wait for the subtask to complete. If it has already completed, the result is returned immediately
        //If the task threw an exception, get rethrows it wrapped in an ExecutionException
        System.out.println("run result->"+cf.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

The results are as follows:

The sub thread executes asynchronously while the main thread blocks in get, waiting for it to finish. Once the sub thread completes, the main thread wakes up, obtains the task result, and exits.

Many blogs say that calling get without a timeout will block the main thread for a long time if the sub thread fails with an exception. This is actually wrong. When the task throws, the exception is caught, the task state is changed to abnormally completed, and the waiting main thread is woken up. The get method sees that the task state has changed, stops waiting, and throws an ExecutionException whose cause is the original exception.
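The behavior described above can be checked with a small program. The following is a minimal sketch, not taken from the original test cases: the class name FutureGetFailureDemo and the helper describeFailure are hypothetical, and the task fails immediately instead of sleeping first.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
public class FutureGetFailureDemo {

    // Submits a task that always throws and reports what get() does.
    static String describeFailure() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Declared as a Callable so that submit(Callable) is chosen explicitly.
            Callable<Double> failingTask = () -> {
                throw new IllegalStateException("test");
            };
            Future<Double> future = executor.submit(failingTask);
            try {
                future.get(); // no timeout; returns as soon as the task has failed
                return "no exception";
            } catch (ExecutionException e) {
                // The exception thrown by the task is available as the cause.
                Throwable cause = e.getCause();
                return cause.getClass().getSimpleName() + ":" + cause.getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints IllegalStateException:test
    }
}
```

The call to get returns promptly even though no timeout is given, because the failure itself completes the Future.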

2,supplyAsync / runAsync

supplyAsync creates an asynchronous task with a return value, comparable to ExecutorService.submit(Callable<T> task). runAsync creates an asynchronous task without a return value, comparable to ExecutorService.submit(Runnable task). Both behave like submit: the task runs asynchronously and its result or exception is reported through the returned future. The test cases are as follows:

    @Test
    public void test2() throws Exception {
        // Create asynchronous execution task with return value
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(true){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        });
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //Wait for subtask execution to complete
        System.out.println("run result->"+cf.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }
 
    @Test
    public void test4() throws Exception {
        // Create asynchronous execution task, no return value
        CompletableFuture<Void> cf = CompletableFuture.runAsync(()->{
            System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(false){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
            }
        });
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //Wait for subtask execution to complete
        System.out.println("run result->"+cf.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

Each of these two methods has an overloaded version that accepts the Executor used to run the asynchronous task. If none is specified, ForkJoinPool.commonPool() is used by default. If the common pool does not support a parallelism of at least two (for example, on a single-core machine), ThreadPerTaskExecutor is used instead. This is an internal class that creates a new thread for every task it executes. The test cases are as follows:

    @Test
    public void test2() throws Exception {
        ForkJoinPool pool=new ForkJoinPool();
        // To create an asynchronous execution task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(true){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        },pool);
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //Wait for subtask execution to complete
        System.out.println("run result->"+cf.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }
 
    @Test
    public void test4() throws Exception {
        ExecutorService executorService= Executors.newSingleThreadExecutor();
        // To create an asynchronous execution task:
        CompletableFuture<Void> cf = CompletableFuture.runAsync(()->{
            System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(false){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
            }
        },executorService);
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //Wait for subtask execution to complete
        System.out.println("run result->"+cf.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

3, Asynchronous callback

1,thenApply / thenApplyAsync

thenApply registers an action, that is, a callback, to run after a task completes. The result of the task (its return value) is passed to the callback as an input parameter. The test case is as follows:

    @Test
    public void test5() throws Exception {
        ForkJoinPool pool=new ForkJoinPool();
        // To create an asynchronous execution task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        },pool);
        //The return value of cf's asynchronous task is passed to the thenApply callback as its input parameter
        //thenApply returns a new CompletableFuture instance here
        CompletableFuture<String> cf2=cf.thenApply((result)->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return "test:"+result;
        });
        System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
        //Wait for subtask execution to complete
        System.out.println("run result->"+cf.get());
        System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
        System.out.println("run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

The execution results are as follows:

After job1 finishes, its return value is passed to job2 as an input parameter, and job2 runs immediately. The difference between thenApplyAsync and thenApply is that the former submits job2 to the thread pool for asynchronous execution, so the thread that actually runs job2 may be a different one. With the latter, job2 is run directly by the thread that completes job1, so both jobs run on the same thread (if job1 has already finished when thenApply is called, the callback runs on the calling thread instead). After changing thenApply to thenApplyAsync in the above test case, the execution results are as follows:

From the output, we can see that job1 and job2 run on two different threads. thenApplyAsync has an overloaded version that accepts the Executor used to run the asynchronous task. If it is not specified, ForkJoinPool.commonPool() is used by default. Each of the callback methods described below likewise has two variants ending in Async: one uses the default Executor and the other uses a specified Executor. A method without Async runs the callback on the thread that triggers it, while a method with Async has the triggering thread submit the callback to the thread pool, so the thread that runs the callback is not necessarily the thread that triggered it.
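To see which thread runs an Async callback when an Executor is passed explicitly, a small check can help. The following is a minimal sketch under stated assumptions: the class name ThenApplyAsyncExecutorDemo, the helper callbackThreadName, and the thread name callback-worker are all hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; the names are illustrative only.
public class ThenApplyAsyncExecutorDemo {

    // Returns the name of the thread that ran the thenApplyAsync callback.
    static String callbackThreadName() {
        // A single-thread pool whose only thread has a recognizable name.
        ExecutorService callbackPool = Executors.newSingleThreadExecutor(
                runnable -> new Thread(runnable, "callback-worker"));
        try {
            return CompletableFuture.supplyAsync(() -> 1.2) // job1 on the default pool
                    // job2 is submitted to callbackPool instead of the default pool
                    .thenApplyAsync(result -> Thread.currentThread().getName(), callbackPool)
                    .join();
        } finally {
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName()); // prints callback-worker
    }
}
```

With the plain thenApply variant, the thread name would instead depend on which thread completes job1, so it is not fixed in advance.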

2,thenAccept / thenRun

Like thenApply, thenAccept receives the return value of the previous task as a parameter, but its callback has no return value. The callback of thenRun has no input parameter and no return value. The test case is as follows:

    @Test
    public void test6() throws Exception {
        ForkJoinPool pool=new ForkJoinPool();
        // To create an asynchronous execution task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        },pool);
        //The return value of cf's asynchronous task is passed to the thenApply callback as its input parameter
        CompletableFuture<Void> cf2=cf.thenApply((result)->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return "test:"+result;
        }).thenAccept((result)-> { //Receive the execution result of the previous task as an input parameter, but there is no return value
            System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(result);
            System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
        }).thenRun(()->{ //There are no input parameters and no return values
            System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println("thenRun do something");
            System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
        });
        System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
        //Wait for subtask execution to complete
        System.out.println("run result->"+cf.get());
        System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
        //Wait for the last thenRun execution to complete
        System.out.println("run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

The execution results are as follows:

3,exceptionally 

The exceptionally method specifies the callback to run when a task completes with an exception. The thrown exception is passed to the callback as a parameter, and the callback's return value becomes the result. If the task completes normally, the callback is not invoked and the CompletionStage returned by exceptionally carries the task's normal result. The test case is as follows:

    @Test
    public void test2() throws Exception {
        ForkJoinPool pool=new ForkJoinPool();
        // To create an asynchronous execution task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(true){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        },pool);
        //If cf completes with an exception, the thrown exception is passed to the callback as its input parameter
        CompletableFuture<Double> cf2= cf.exceptionally((param)->{
            System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println("error stack trace->");
            param.printStackTrace();
            System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
            return -1.1;
        });
        //This logic runs only when cf completes normally. If cf completes with an exception, it is not called
        CompletableFuture<Void> cf3=cf.thenAccept((param)->{
            System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println("param->"+param);
            System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
        });
        System.out.println("main thread start,time->"+System.currentTimeMillis());
        //Wait for the subtask to complete. Waiting on either cf2 or cf3 here keeps the main thread from exiting until job2 has exited. Waiting on cf instead would let the main thread exit without waiting for job2 to complete
        //If cf completes without an exception, cf2.get() still returns a value, namely the result of cf
        System.out.println("run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

Its output is as follows:

After an exception is thrown, only cf2 executes and cf3 does not. Change the if(true) in the above example to if(false), and the output is as follows:

 

Because cf did not throw, the result of cf2 is simply the result of cf. In theory, cf2.get() should return as soon as cf completes, but here it waits for cf3, that is, it returns only after job2 has finished. The specific reason will be discussed in the follow-up source code analysis.
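The two outcomes of exceptionally can also be compared side by side without the sleeps. The following is a minimal sketch: the class name ExceptionallyDemo and the helper recover are hypothetical, and the values 1.2 and -1.1 are reused from the test case above.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; the names are illustrative only.
public class ExceptionallyDemo {

    // Runs a task that optionally fails and returns the result seen after exceptionally.
    static double recover(boolean fail) {
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new RuntimeException("test");
            }
            return 1.2;
        });
        // The fallback is used only if cf completed with an exception;
        // otherwise the normal result of cf passes through unchanged.
        return cf.exceptionally(ex -> -1.1).join();
    }

    public static void main(String[] args) {
        System.out.println(recover(true));  // prints -1.1
        System.out.println(recover(false)); // prints 1.2
    }
}
```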

4,whenComplete 

whenComplete registers a callback that runs after a task completes, whether normally or exceptionally. The execution result and the exception thrown during execution are both passed to the callback; if the task completed normally, the exception is null. The result of the CompletableFuture returned by whenComplete is consistent with the original task: if the task completed normally, get returns its result; if it completed with an exception, get throws that exception. The test case is as follows:

    @Test
    public void test10() throws Exception {
        // To create an asynchronous execution task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(false){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        });
        //After cf completes, its result and the exception thrown during execution are passed to the callback. If cf completed normally, the exception is null
        CompletableFuture<Double> cf2=cf.whenComplete((a,b)->{
            System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(b!=null){
                System.out.println("error stack trace->");
                b.printStackTrace();
            }else{
                System.out.println("run succ,result->"+a);
            }
            System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
        });
        //Wait for subtask execution to complete
        System.out.println("main thread start wait,time->"+System.currentTimeMillis());
        //If cf completed normally, cf2.get() returns the result of cf
        //If cf completed with an exception, cf2.get() throws that exception
        System.out.println("run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

The results are as follows:

Change the if(false) in the above example to if(true), and the output is as follows:

 

5,handle

handle is basically the same as whenComplete, except that its callback has a return value. The result of the CompletableFuture returned by handle is the return value of the callback, or the exception thrown while running the callback; it is independent of the result of the original CompletableFuture. The test case is as follows:

    @Test
    public void test10() throws Exception {
        // To create an asynchronous execution task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(true){
                throw new RuntimeException("test");
            }else{
                System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
                return 1.2;
            }
        });
        //After cf completes, its result and the exception thrown during execution are passed to the callback. If cf completed normally, the exception is null
        CompletableFuture<String> cf2=cf.handle((a,b)->{
            System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            if(b!=null){
                System.out.println("error stack trace->");
                b.printStackTrace();
            }else{
                System.out.println("run succ,result->"+a);
            }
            System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
            if(b!=null){
                return "run error";
            }else{
                return "run succ";
            }
        });
        //Wait for subtask execution to complete
        System.out.println("main thread start wait,time->"+System.currentTimeMillis());
        //get returns the result of cf2, that is, the callback's return value, which is independent of cf's result
        System.out.println("run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

The execution results are as follows:

Change the if(true) in the above example to if(false), and the output is as follows:

 

4, Combination processing

1,thenCombine / thenAcceptBoth / runAfterBoth

These three methods combine two CompletableFuture instances: the dependent task runs only after both have completed normally. The difference is that thenCombine passes the results of both tasks to the specified function as parameters, and the function has a return value; thenAcceptBoth also passes both results as parameters but has no return value; runAfterBoth has no input parameters and no return value. Note that if either of the two tasks completes with an exception, that exception becomes the result of the dependent task. The test case is as follows:

    @Test
    public void test7() throws Exception {
        ForkJoinPool pool=new ForkJoinPool();
        // To create an asynchronous execution task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        });
        CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return 3.2;
        });
        //After the asynchronous tasks of cf and cf2 both complete, their results are passed to the cf3 callback as input parameters; the callback has a return value
        CompletableFuture<Double> cf3=cf.thenCombine(cf2,(a,b)->{
            System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
            System.out.println("job3 param a->"+a+",b->"+b);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
            return a+b;
        });
 
        //After the asynchronous tasks of cf and cf2 both complete, their results are passed to the cf4 callback as input parameters; the callback has no return value
        CompletableFuture<Void> cf4=cf.thenAcceptBoth(cf2,(a,b)->{
            System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
            System.out.println("job4 param a->"+a+",b->"+b);
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
        });
 
        //After both cf4 and cf3 complete, run cf5; the callback has no input parameters and no return value
        CompletableFuture<Void> cf5=cf4.runAfterBoth(cf3,()->{
            System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println("cf5 do something");
            System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis());
        });
 
        System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
        //Wait for subtask execution to complete
        System.out.println("cf run result->"+cf.get());
        System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis());
        System.out.println("cf5 run result->"+cf5.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

The operation results are as follows:

job1 and job2 start at almost the same time, and job2 finishes before job1. After job1 exits, job3 and job4 start at almost the same time, and job4 exits first. After job3 finishes, job5 starts. After job5 finishes, the main thread exits.
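The exception rule for these methods is not exercised by the test case above, so a compact check may be useful. The following is a minimal sketch: the class name ThenCombineDemo and its helpers are hypothetical, and integers are used instead of the doubles above to keep the arithmetic exact.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; the names are illustrative only.
public class ThenCombineDemo {

    // Both inputs succeed: the function receives both results.
    static int combine() {
        CompletableFuture<Integer> left = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> right = CompletableFuture.supplyAsync(() -> 3);
        return left.thenCombine(right, Integer::sum).join();
    }

    // One input fails: the function is never called and the
    // combined future completes exceptionally.
    static boolean failsWhenOneInputFails() {
        CompletableFuture<Integer> ok = CompletableFuture.completedFuture(2);
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("test"));
        CompletableFuture<Integer> combined = ok.thenCombine(failed, Integer::sum);
        return combined.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(combine());                // prints 5
        System.out.println(failsWhenOneInputFails()); // prints true
    }
}
```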

2,applyToEither / acceptEither / runAfterEither 

These three methods also combine two CompletableFuture instances, but the dependent task runs as soon as either one completes. The difference is that applyToEither passes the result of the task that completed to the specified function as a parameter, and the function has a return value; acceptEither also passes that result as a parameter but has no return value; runAfterEither has no input parameter and no return value. Note that if the task that triggers the callback completed with an exception, that exception becomes the result of the dependent task; when only one of the two tasks fails, the Javadoc makes no guarantee about whether the dependent task completes normally or exceptionally. The test case is as follows:

    @Test
    public void test8() throws Exception {
        // To create an asynchronous execution task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        });
        CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return 3.2;
        });
        //As soon as either cf or cf2 completes, its result is passed to the cf3 callback as an input parameter; the callback has a return value
        CompletableFuture<Double> cf3=cf.applyToEither(cf2,(result)->{
            System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
            System.out.println("job3 param result->"+result);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
            return result;
        });
 
        //As soon as either cf or cf2 completes, its result is passed to the cf4 callback as an input parameter; the callback has no return value
        CompletableFuture<Void> cf4=cf.acceptEither(cf2,(result)->{
            System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
            System.out.println("job4 param result->"+result);
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
        });
 
        //As soon as either cf4 or cf3 completes, run cf5; the callback has no input parameters and no return value
        CompletableFuture<Void> cf5=cf4.runAfterEither(cf3,()->{
            System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println("cf5 do something");
            System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis());
        });
 
        System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
        //Wait for subtask execution to complete
        System.out.println("cf run result->"+cf.get());
        System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis());
        System.out.println("cf5 run result->"+cf5.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

The operation results are as follows:

job1 and job2 start at the same time. job2 finishes first, and then job4 starts. In theory, job3 and job4 should start at the same time, but only job4 starts at this point; job3 starts only after job1 finishes. job4 finishes before job3, and then job5 starts. After job5 finishes, the main thread exits. The reason for this difference will be discussed in the follow-up source code analysis.
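Because the timing in the test above depends on thread scheduling, a deterministic variant may make the "either" rule easier to see. The following is a minimal sketch, assuming one input that is never completed and one that is already complete; the class name ApplyToEitherDemo and the helper firstResult are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; the names are illustrative only.
public class ApplyToEitherDemo {

    // Only one of the two inputs ever completes, so its result must be the one used.
    static String firstResult() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        // The function runs as soon as either input has a result.
        return slow.applyToEither(fast, result -> "winner=" + result).join();
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // prints winner=fast
    }
}
```

The call to join returns even though slow is still incomplete, which is the key difference from thenCombine.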

3,thenCompose

After a task completes, thenCompose passes its result to the specified function as a parameter and runs that function. The function must return a new CompletableFuture instance, and the CompletableFuture returned by thenCompose completes with the result of that instance, so the new task is effectively chained after the original one. The test case is as follows:

    @Test
    public void test9() throws Exception {
        // To create an asynchronous execution task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        });
        CompletableFuture<String> cf2= cf.thenCompose((param)->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
                return "job3 test";
            });
        });
        System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
        //Wait for subtask execution to complete
        System.out.println("cf run result->"+cf.get());
        System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
        System.out.println("cf2 run result->"+cf2.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

Its output is as follows:

Job2 starts after job1 has finished. Job2 submits job3 through supplyAsync just before it returns, so job3 starts once job2 has finished. cf2 completes with the result of job3, so cf2.get() returns only after job3 has finished, and the main thread then exits.
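The difference between thenApply and thenCompose is easiest to see in the return types. The following is a minimal sketch, not part of the original test cases; findUserId and loadProfile are hypothetical asynchronous steps invented for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class ThenComposeDemo {

    // Hypothetical async step: derives an id from the name.
    public static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    // Hypothetical async step that depends on the result of the first one.
    public static CompletableFuture<String> loadProfile(int userId) {
        return CompletableFuture.supplyAsync(() -> "profile-" + userId);
    }

    // thenApply wraps whatever the function returns, so the result is nested.
    public static CompletableFuture<CompletableFuture<String>> nested(String name) {
        return findUserId(name).thenApply(id -> loadProfile(id));
    }

    // thenCompose flattens it: the result completes with the inner future's value.
    public static CompletableFuture<String> flat(String name) {
        return findUserId(name).thenCompose(id -> loadProfile(id));
    }

    public static void main(String[] args) {
        System.out.println(flat("alice").join());          // profile-5
        System.out.println(nested("alice").join().join()); // profile-5
    }
}
```

Both calls produce the same value, but only the thenCompose version can be passed on to further stages without unwrapping a future inside a future.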

4,allOf / anyOf 

The CompletableFuture returned by allOf completes only after all of the given tasks have completed. If any one of the tasks completes exceptionally, calling get on the returned CompletableFuture throws an exception (an ExecutionException wrapping the original one). If all tasks complete normally, get returns null.
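Because allOf itself only yields null, the individual results have to be read from the original futures. The following is a minimal sketch of one common way to do that, assuming all futures share one result type; the class and the helper names collect, demoSuccess and demoFailure are made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

public class AllOfDemo {

    // Waits for every future, then reads each result. join() does not block here,
    // because the function only runs after allOf has completed normally.
    public static <T> CompletableFuture<List<T>> collect(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // All tasks succeed: results come back in the order of the input list.
    public static List<Double> demoSuccess() {
        CompletableFuture<Double> a = CompletableFuture.supplyAsync(() -> 1.2);
        CompletableFuture<Double> b = CompletableFuture.supplyAsync(() -> 3.2);
        CompletableFuture<Double> c = CompletableFuture.supplyAsync(() -> 2.2);
        return collect(Arrays.asList(a, b, c)).join();
    }

    // One task fails: the combined future fails too, and join() reports the cause.
    public static Throwable demoFailure() {
        CompletableFuture<Double> ok = CompletableFuture.completedFuture(1.2);
        CompletableFuture<Double> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("test"));
        try {
            collect(Arrays.asList(ok, failed)).join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(demoSuccess()); // [1.2, 3.2, 2.2]
        System.out.println(demoFailure()); // java.lang.IllegalStateException: test
    }
}
```

The sketch uses join instead of get only to avoid checked exceptions; get would report the same failure as an ExecutionException.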

    @Test
    public void test11() throws Exception {
        // To create an asynchronous execution task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
            return 1.2;
        });
        CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
            return 3.2;
        });
        CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(1300);
            } catch (InterruptedException e) {
            }
            // throw new RuntimeException("test");
            System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
            return 2.2;
        });
        // allOf waits for all tasks to complete before cf4 completes. If any task terminates abnormally, cf4.get() throws an exception; if all tasks complete normally, cf4.get() returns null.
        // anyOf waits for only one task to complete. Whether that task completes normally or abnormally, cf4 then completes, and cf4.get() returns the result of that task or throws its exception.
        // The wildcard type lets the same declaration compile for both allOf (Void) and anyOf (Object).
        CompletableFuture<?> cf4=CompletableFuture.allOf(cf,cf2,cf3).whenComplete((a,b)->{
           if(b!=null){
               System.out.println("error stack trace->");
               b.printStackTrace();
           }else{
               System.out.println("run succ,result->"+a);
           }
        });
 
        System.out.println("main thread start cf4.get(),time->"+System.currentTimeMillis());
        //Wait for subtask execution to complete
        System.out.println("cf4 run result->"+cf4.get());
        System.out.println("main thread exit,time->"+System.currentTimeMillis());
    }

Its output is as follows:

With allOf, the main thread exits only after job1, the slowest of the three jobs, has finished. The CompletableFuture returned by anyOf completes as soon as any one of the given tasks completes. Its get returns the result of that task, or throws an exception if that task completed exceptionally. After changing allOf to anyOf in the above test case, cf4 completes as soon as the first job finishes. When the three jobs run in parallel, that is job3, which has the shortest sleep (1300 ms), so cf4.get() returns 2.2 without waiting for job1 and job2.
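The anyOf behavior described above can also be sketched in isolation. The example below is an illustration rather than part of the original tests: it completes the futures by hand so that the order of completion is deterministic, and the class and method names are invented for this purpose.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AnyOfDemo {

    // The first future to complete decides the result of anyOf.
    public static Object firstResult() {
        CompletableFuture<Double> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();
        // The result type is Object because the inputs may have different types.
        CompletableFuture<Object> any = CompletableFuture.anyOf(slow, fast);

        fast.complete("fast"); // completes first, so anyOf takes this value
        slow.complete(1.2);    // a later completion does not change the result
        return any.join();
    }

    // If the first future to complete fails, anyOf fails with the same cause.
    public static Throwable firstFailure() {
        CompletableFuture<Double> pending = new CompletableFuture<>();
        CompletableFuture<Double> failed = new CompletableFuture<>();
        CompletableFuture<Object> any = CompletableFuture.anyOf(pending, failed);

        failed.completeExceptionally(new IllegalStateException("test"));
        pending.complete(1.2);
        try {
            any.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstResult());  // fast
        System.out.println(firstFailure()); // java.lang.IllegalStateException: test
    }
}
```

Note that anyOf does not cancel the remaining tasks; they keep running, and their results are simply not used by the returned future.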

Keywords: Java Multithreading

Added by itreP on Wed, 15 Dec 2021 07:40:37 +0200