Changwen. Take you through with code examples: be proficient in the use of completable future

preface

There are only two ways to create threads: inherit Thread or implement Runnable interface. However, both methods have a defect that they have no return value

After Java 1.5, you can obtain a Future object containing the return value by submitting a Callable to the thread pool

Limitations of Future interface

When the thread of Future performs a very time-consuming operation, our main thread will be blocked.

When we are in simple business, we can use another overloaded method get(long,TimeUnit) of Future to set the timeout time to avoid our main thread being blocked endlessly.

Simply using the Future interface or FutureTask class can not complete the following business we need

  • Merge two asynchronous calculations into one. The two asynchronous calculations are independent of each other, and the second one depends on the result of the first one
  • Wait for all tasks in the Future collection to be completed.
  • Only wait for the fastest ending task in the Future collection to complete and return its results.
  • Complete the execution of a Future task by programming
  • When the completion time of Future is completed, you will be notified, and you can use the calculation results of Future for the next operation, not just blocking the results of waiting operation

What is completable future

In Java 8, a new class, completable Future, combines the advantages of Future, provides a very powerful extension function of Future, helps us simplify the complexity of asynchronous programming, provides the ability of functional programming, and can process the calculation results through callback

Completable future is designed for asynchronous programming in Java. The main thread does not have to block / wait for the completion of the task. You can use the main thread to execute other tasks in parallel. Using this parallel method greatly improves the performance of the program.

  • Completable Future implements the Future interface, so it has the ability to return results asynchronously.
  • Completabilefuture implements the CompletionStage interface, which is a new interface in Java 8. It is used for stage processing in asynchronous execution. It is widely used in the calculation of Lambda expressions. At present, there is only one implementation class of completabilefuture.
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

Method naming rules

  • Methods with Async suffix are executed asynchronously by another thread, and there is no thread that reuses the previous task
  • All methods with the Apply identifier can obtain the return value + those with a return value
  • All methods with Accept can obtain the return value
  • The method with run ID can not get the return value or no return value, but just run

get method and join method

  • join: block getting results or throw non checked exceptions.
  • Get: block to get the result or throw the detected exception, which needs to be displayed for try Catch processing

Use of different thread pools

Default thread pool execution

/**
 * Default thread pool
 * Operation results:
 * main.................start.....
 * main.................end......
 * Current thread: forkjoinpool commonPool-worker-9
 * Operation result: 5
 */
@Test
public void defaultThread() {
    System.out.println("main.................start.....");
    CompletableFuture.runAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getName());
        int i = 10 / 2;
        System.out.println("Operation results:" + i);
    });
    System.out.println("main.................end......");
}

Forkjoinpool is used by default commonPool (), which is a thread pool shared by many tasks, is designed to run non blocking CPU intensive tasks. In order to maximize the use of CPU, the number of threads is assumed to be - 1 by default

  • Where is commonPool used
    • CompletableFuture
    • Parallel Streams
  • Why introduce commonPool
    • In order to avoid introducing a thread pool for any parallel operation, in the worst case, too many pool threads will be created on a single JVM, reducing efficiency.
  • How is the commonPool thread pool created and used
    • ForkJoinTask must run in a ForkJoinPool. If it is not explicitly submitted to the ForkJoinPool, a common pool (full process sharing) will be used to execute the task.

Custom thread pool execution

Customize a thread pool

private ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());

Use defined thread pool

/**
 * Custom thread pool
 * Operation results:
 * main.................start.....
 * main.................end......
 * Current thread: pool-1-thread-1
 * Operation result: 5
 */
@Test
public void myThread() {
    System.out.println("main.................start.....");
    CompletableFuture.runAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getName());
        int i = 10 / 2;
        System.out.println("Operation results:" + i);
    },executor);
    System.out.println("main.................end......");
}

Turn on an asynchronous

runAsync - no return value

Use runAsync to start an asynchronous task thread. This method returns no results. It is suitable for some asynchronous tasks that do not need results

/***
 * No return value
 *  runAsync
 *  result:
 * main.................start.....
 * main.................end......
 * Current thread: 33
 * Operation result: 5
 */
@Test
public void runAsync() {
    System.out.println("main.................start.....");
    CompletableFuture.runAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Operation results:" + i);
    }, executor);
    System.out.println("main.................end......");
}

supplyAsync return value

Use completable future The get () method gets the result, and the program will block here until the result is returned.

/**
 * There is a return value
 * supplyAsync
 * result:
 * main.................start.....
 * Current thread: 33
 * Operation result: 5
 * main.................end.....5
 */
@Test
public void supplyAsync() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Operation results:" + i);
        return i;
    }, executor);
    System.out.println("main.................end....." + completableFuture.get());
}

If you have to go ahead to time out, use completable future Get (long timeout, timeunit) method.

Thread serialization method

Methods with Async suffix are executed asynchronously by another thread, and there is no thread that reuses the previous task

thenApply - after the above task is executed + get the return value + have a return value

/**
 * After the above task is executed, you can get the result + you can return the value
 * result:
 * thenApplyAsync Current thread: 33
 * thenApplyAsync Operation result: 5
 * thenApplyAsync Task 2 started..... Previous result: 5
 * main.................end.....hello10
 * 
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void thenApplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> thenApplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println("thenApplyAsync Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("thenApplyAsync Operation results:" + i);
        return i;
    }, executor).thenApplyAsync(result -> {
        System.out.println("thenApplyAsync Task 2 started..... Previous results:" + result);
        return "hello" + result * 2;
    }, executor);
    System.out.println("main.................end....." + thenApplyAsync.get());
}

thenAccept - after the above task is executed + get the return value

/**
 * After the above tasks are executed, you can get the results
 * result:
 * thenAcceptAsync Current thread: 33
 * thenAcceptAsync Operation result: 5
 * thenAcceptAsync Task 2 started..... Previous result: 5
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void thenAcceptAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> thenAcceptAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println("thenAcceptAsync Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("thenAcceptAsync Operation results:" + i);
        return i;
    }, executor).thenAcceptAsync(result -> {
        System.out.println("thenAcceptAsync Task 2 started..... Previous results:" + result);
    }, executor);
}

thenRun - the above task is executed

/**
 * After the above tasks are executed
 * result
 * main.................start.....
 * Current thread: 33
 * Operation result: 5
 * Task 2 started.....
 */
@Test
public void thenRunAsync() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Operation results:" + i);
        return i;
    }, executor).thenRunAsync(() -> {
        System.out.println("Task 2 started.....");
    }, executor);
}

Thenpose - receives the return value and generates a new task

When the original task is completed, receive the return value and return a new task

  • thenApply () converts the type in the generic type, which is equivalent to converting completabilefuture into a new completabilefuture
  • Thenpose () is used to connect two completabilefuture and generate a new completabilefuture.
/**
 * When the original task is completed, receive the return value and return a new task
 * result:
 * hello: thenCompose
 */
@Test
public void thenCompose() {
    CompletableFuture cf = CompletableFuture.completedFuture("hello")
            .thenCompose(str -> CompletableFuture.supplyAsync(() -> {
                return str + ": thenCompose";
            },executor));
    System.out.println(cf.join());
}

Task combination

thenCombine - consume two results + return results

/**
 * Both task combinations need to be completed
 * completableFuture.thenCombine()Get two future return results with return value
 * result:
 * Task 1 thread: 33
 * Task 1 running result: 5
 * Task 2 thread: 34
 * Task 2 running result:
 * Task 5 start... Result 1:5... Result 2: hello
 * Task 5 result hello -- > 5
 */
@Test
public void thenCombine() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Task 1 running result:" + i);
        return i;
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);
    CompletableFuture<String> thenCombineAsync = future1.thenCombineAsync(future2, (result1, result2) -> {
        System.out.println("Task 5 start... Result 1:" + result1 + ". . . Result 2:" + result2);
        return result2 + "-->" + result1;
    }, executor);
    System.out.println("Task 5 Results" + thenCombineAsync.get());
}

thenAcceptBoth - consume two results + no return

/**
 * Both task combinations need to be completed
 * completableFuture.thenAcceptBoth() Get two future return results, no return value
 * result:
 * Task 1 thread: 33
 * Task 1 running result: 5
 * Task 2 thread: 34
 * Task 2 running result:
 * Task 4 start... Result 1:5... Result 2: hello
 */
@Test
public void thenAcceptBothAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Task 1 running result:" + i);
        return i;
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);

    CompletableFuture<Void> thenAcceptBothAsync = future1.thenAcceptBothAsync(future2, (result1, result2) -> {
        System.out.println("Task 4 start... Result 1:" + result1 + ". . . Result 2:" + result2);
    }, executor);

}

runAfterBoth - two tasks are completed and then run

/**
 * Both task combinations need to be completed
 * completableFuture.runAfterBoth() Combine two future s
 * result:
 * Task 1 thread: 33
 * Task 1 running result: 5
 * Task 2 thread: 34
 * Task 2 running result:
 * Task 3 start...
 */
@Test
public void runAfterBothAsync() {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Task 1 running result:" + i);
        return i;
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);

    CompletableFuture<Void> runAfterBothAsync = future1.runAfterBothAsync(future2, () -> {
        System.out.println("Task 3 start...");
    }, executor);

}

Perform one of the two tasks

applyToEither - one of them is executed + get return value + have return value

/**
 * Two tasks are combined, and one task is executed upon completion
 * objectCompletableFuture.applyToEither() One of them is executed + get return value + have return value
 * result:
 * Task 1 thread: 33
 * Task 2 thread: 34
 * Task 2 running result:
 * Task 5 begins... Result: hello
 * Task 5 result: hello world
 * <p>
 * Process finished with exit code 0
 */
@Test
public void applyToEither() throws ExecutionException, InterruptedException {
    CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        try {
            Thread.sleep(3000);
            System.out.println("Task 1 running result:" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }, executor);

    CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);

    CompletableFuture<String> applyToEitherAsync = future1.applyToEitherAsync(future2, result -> {
        System.out.println("Task 5 begins... result:" + result);
        return result.toString() + " world";
    }, executor);
    System.out.println("Task 5 result:" + applyToEitherAsync.get());
}

acceptEither - one of them is executed + get the return value

/**
 * Two tasks are combined, and one task is executed upon completion
 * objectCompletableFuture.acceptEither() After one of them is executed, execute + get the return value
 * result:
 * Task 1 thread: 33
 * Task 2 thread: 34
 * Task 2 running result:
 * Task 4 begins... Result: hello
 */
@Test
public void acceptEither() {
    CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        try {
            Thread.sleep(3000);
            System.out.println("Task 1 running result:" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }, executor);

    CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);

    CompletableFuture<Void> acceptEitherAsync = future1.acceptEitherAsync(future2, result -> {
        System.out.println("Task 4 begins... result:" + result);
    }, executor);

}

runAfterEither - execute as soon as a task is completed

/**
 * Two tasks are combined, and one task is executed upon completion
 * <p>
 * objectCompletableFuture.runAfterEither() One of them is finished
 * result:
 * Task 1 thread: 33
 * Task 2 thread: 34
 * Task 2 running result:
 * Task 3 begins...
 */
@Test
public void runAfterEither() {
    CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        try {
            Thread.sleep(3000);
            System.out.println("Task 1 running result:" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }, executor);

    CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);

    CompletableFuture<Void> runAfterEitherAsync = future1.runAfterEitherAsync(future2, () -> {
        System.out.println("Task 3 begins...");
    }, executor);
}

Multi task combination

allOf - wait until all are completed

/**
 * Multi task combination
 * allOf Wait for all tasks to complete
 * result:
 * Task 1
 * Task 3
 * Task 2
 * allOf Task 1 ------ task 2 ------ task 3
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void allOf() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1");
        return "Task 1";
    }, executor);
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Task 2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 2";
    }, executor);
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 3");
        return "Task 3";
    }, executor);

    CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
    //Wait for all tasks to complete
    //allOf.get();
    allOf.join();
    System.out.println("allOf" + future1.get() + "-------" + future2.get() + "-------" + future3.get());

}

anyOf - wait until one of them is completed

/**
 * Multi task combination
 * anyOf As long as one task is completed
 * result:
 * Task 1
 * anyOf--The first thing to complete is task 1
 * Task 3
 * Wait, task 2
 * Task 2
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void anyOf() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1");
        return "Task 1";
    }, executor);
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Task 2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 2";
    }, executor);

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 3");
        return "Task 3";
    }, executor);
    CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
    System.out.println("anyOf--The first thing to finish is" + anyOf.get());
    //Waiting for future2 to print
    System.out.println("Wait, task 2");
    Thread.sleep(3000);
}

sensation abnormality

handle - catch the result or exception and return a new result

If the input parameter is a result or exception, a new result will be returned

/**
 * If the input parameter is a result or exception, a new result is returned
 * result:
 * main.................start.....
 * Current thread: 33
 * main.................end.....Error reporting return
 */
@Test
public void handle() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 0;
        System.out.println("Operation results:" + i);
        return i;
    }, executor).handleAsync((in, throwable) -> {
        if (throwable != null) {
            return "Error reporting return";
        }
        return "Right";
    });
    System.out.println("main.................end....." + completableFuture.get());

}

whenComplete - perceive the result or exception and return the corresponding information

Although whenComplete gets exception information, it cannot modify the returned information

/**
 * There is a return value and subsequent operation whenComplete
 * <p>
 * result:
 * main.................start.....
 * Current thread: 33
 * Asynchronous completion.... The result is: null Exception: Java util. concurrent. CompletionException: java. Lang. arithmeticexception: divide by zero
 * Reported wrong 2
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void whenComplete() {
    System.out.println("main.................start.....");
    final CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 0;
        System.out.println("Operation results:" + i);
        return i;
    }, executor).whenComplete((result, throwable) -> {
        //Although whenComplete gets exception information, it cannot modify the returned information
        System.out.println("Asynchronous completion.... The result is:" + result + "...The exception is:" + throwable);
    });

    try {
        System.out.println("main.................end..T..." + completableFuture.get());
    } catch (InterruptedException e) {
        System.out.println("Reported wrong 1");
    } catch (ExecutionException e) {
        System.out.println("Reported wrong 2");
    }
}

exceptionally - catches an exception and returns the specified value

/**
 * Perception after method completion
 * Perceives the error and returns the specified value exceptionally
 * result:
 * main.................start.....
 * Current thread: 33
 * exceptionally executed
 * main.................end.....0
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void exceptionally() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 0;
        System.out.println("Operation results:" + i);
        return i;
    }, executor).exceptionally(throwable -> {
        //R apply(T t);
        //exceptionally can sense the error and return the specified value
        System.out.println("Yes exceptionally");
        return 0;
    });
    System.out.println("main.................end....." + completableFuture.get());
}

The content of this article is a detailed explanation of the use of completable future

  1. Friends who think the article is good can forward this article and pay attention to the Xiaobian;
  2. Thank you for your support!

Keywords: Java

Added by digitalecartoons on Mon, 07 Mar 2022 13:38:46 +0200