preface
There are only two ways to create threads: inherit Thread or implement Runnable interface. However, both methods have a defect that they have no return value
After Java 1.5, you can obtain a Future object containing the return value by submitting a Callable to the thread pool
Limitations of Future interface
When the thread of Future performs a very time-consuming operation, our main thread will be blocked.
When we are in simple business, we can use another overloaded method get(long,TimeUnit) of Future to set the timeout time to avoid our main thread being blocked endlessly.
Simply using the Future interface or FutureTask class can not complete the following business we need
- Merge two asynchronous calculations into one. The two asynchronous calculations are independent of each other, and the second one depends on the result of the first one
- Wait for all tasks in the Future collection to be completed.
- Only wait for the fastest ending task in the Future collection to complete and return its results.
- Complete the execution of a Future task by programming
- When the completion time of Future is completed, you will be notified, and you can use the calculation results of Future for the next operation, not just blocking the results of waiting operation
What is completable future
In Java 8, a new class, completable Future, combines the advantages of Future, provides a very powerful extension function of Future, helps us simplify the complexity of asynchronous programming, provides the ability of functional programming, and can process the calculation results through callback
Completable future is designed for asynchronous programming in Java. The main thread does not have to block / wait for the completion of the task. You can use the main thread to execute other tasks in parallel. Using this parallel method greatly improves the performance of the program.
- Completable Future implements the Future interface, so it has the ability to return results asynchronously.
- Completabilefuture implements the CompletionStage interface, which is a new interface in Java 8. It is used for stage processing in asynchronous execution. It is widely used in the calculation of Lambda expressions. At present, there is only one implementation class of completabilefuture.
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
Method naming rules
- Methods with Async suffix are executed asynchronously by another thread, and there is no thread that reuses the previous task
- All methods with the Apply identifier can obtain the return value + those with a return value
- All methods with Accept can obtain the return value
- The method with run ID can not get the return value or no return value, but just run
get method and join method
- join: block getting results or throw non checked exceptions.
- Get: block to get the result or throw the detected exception, which needs to be displayed for try Catch processing
Use of different thread pools
Default thread pool execution
/** * Default thread pool * Operation results: * main.................start..... * main.................end...... * Current thread: forkjoinpool commonPool-worker-9 * Operation result: 5 */ @Test public void defaultThread() { System.out.println("main.................start....."); CompletableFuture.runAsync(() -> { System.out.println("Current thread:" + Thread.currentThread().getName()); int i = 10 / 2; System.out.println("Operation results:" + i); }); System.out.println("main.................end......"); }
Forkjoinpool is used by default commonPool (), which is a thread pool shared by many tasks, is designed to run non blocking CPU intensive tasks. In order to maximize the use of CPU, the number of threads is assumed to be - 1 by default
- Where is commonPool used
- CompletableFuture
- Parallel Streams
- Why introduce commonPool
- In order to avoid introducing a thread pool for any parallel operation, in the worst case, too many pool threads will be created on a single JVM, reducing efficiency.
- How is the commonPool thread pool created and used
- ForkJoinTask must run in a ForkJoinPool. If it is not explicitly submitted to the ForkJoinPool, a common pool (full process sharing) will be used to execute the task.
Custom thread pool execution
Customize a thread pool
private ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
Use defined thread pool
/** * Custom thread pool * Operation results: * main.................start..... * main.................end...... * Current thread: pool-1-thread-1 * Operation result: 5 */ @Test public void myThread() { System.out.println("main.................start....."); CompletableFuture.runAsync(() -> { System.out.println("Current thread:" + Thread.currentThread().getName()); int i = 10 / 2; System.out.println("Operation results:" + i); },executor); System.out.println("main.................end......"); }
Turn on an asynchronous
runAsync - no return value
Use runAsync to start an asynchronous task thread. This method returns no results. It is suitable for some asynchronous tasks that do not need results
/*** * No return value * runAsync * result: * main.................start..... * main.................end...... * Current thread: 33 * Operation result: 5 */ @Test public void runAsync() { System.out.println("main.................start....."); CompletableFuture.runAsync(() -> { System.out.println("Current thread:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("Operation results:" + i); }, executor); System.out.println("main.................end......"); }
supplyAsync return value
Use completable future The get () method gets the result, and the program will block here until the result is returned.
/** * There is a return value * supplyAsync * result: * main.................start..... * Current thread: 33 * Operation result: 5 * main.................end.....5 */ @Test public void supplyAsync() throws ExecutionException, InterruptedException { System.out.println("main.................start....."); CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Current thread:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("Operation results:" + i); return i; }, executor); System.out.println("main.................end....." + completableFuture.get()); }
If you have to go ahead to time out, use completable future Get (long timeout, timeunit) method.
Thread serialization method
Methods with Async suffix are executed asynchronously by another thread, and there is no thread that reuses the previous task
thenApply - after the above task is executed + get the return value + have a return value
/** * After the above task is executed, you can get the result + you can return the value * result: * thenApplyAsync Current thread: 33 * thenApplyAsync Operation result: 5 * thenApplyAsync Task 2 started..... Previous result: 5 * main.................end.....hello10 * * @throws ExecutionException * @throws InterruptedException */ @Test public void thenApplyAsync() throws ExecutionException, InterruptedException { CompletableFuture<String> thenApplyAsync = CompletableFuture.supplyAsync(() -> { System.out.println("thenApplyAsync Current thread:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("thenApplyAsync Operation results:" + i); return i; }, executor).thenApplyAsync(result -> { System.out.println("thenApplyAsync Task 2 started..... Previous results:" + result); return "hello" + result * 2; }, executor); System.out.println("main.................end....." + thenApplyAsync.get()); }
thenAccept - after the above task is executed + get the return value
/** * After the above tasks are executed, you can get the results * result: * thenAcceptAsync Current thread: 33 * thenAcceptAsync Operation result: 5 * thenAcceptAsync Task 2 started..... Previous result: 5 * @throws ExecutionException * @throws InterruptedException */ @Test public void thenAcceptAsync() throws ExecutionException, InterruptedException { CompletableFuture<Void> thenAcceptAsync = CompletableFuture.supplyAsync(() -> { System.out.println("thenAcceptAsync Current thread:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("thenAcceptAsync Operation results:" + i); return i; }, executor).thenAcceptAsync(result -> { System.out.println("thenAcceptAsync Task 2 started..... Previous results:" + result); }, executor); }
thenRun - the above task is executed
/** * After the above tasks are executed * result * main.................start..... * Current thread: 33 * Operation result: 5 * Task 2 started..... */ @Test public void thenRunAsync() throws ExecutionException, InterruptedException { System.out.println("main.................start....."); CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Current thread:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("Operation results:" + i); return i; }, executor).thenRunAsync(() -> { System.out.println("Task 2 started....."); }, executor); }
Thenpose - receives the return value and generates a new task
When the original task is completed, receive the return value and return a new task
- thenApply () converts the type in the generic type, which is equivalent to converting completabilefuture into a new completabilefuture
- Thenpose () is used to connect two completabilefuture and generate a new completabilefuture.
/** * When the original task is completed, receive the return value and return a new task * result: * hello: thenCompose */ @Test public void thenCompose() { CompletableFuture cf = CompletableFuture.completedFuture("hello") .thenCompose(str -> CompletableFuture.supplyAsync(() -> { return str + ": thenCompose"; },executor)); System.out.println(cf.join()); }
Task combination
thenCombine - consume two results + return results
/** * Both task combinations need to be completed * completableFuture.thenCombine()Get two future return results with return value * result: * Task 1 thread: 33 * Task 1 running result: 5 * Task 2 thread: 34 * Task 2 running result: * Task 5 start... Result 1:5... Result 2: hello * Task 5 result hello -- > 5 */ @Test public void thenCombine() throws ExecutionException, InterruptedException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 1 thread:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("Task 1 running result:" + i); return i; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 2 thread:" + Thread.currentThread().getId()); System.out.println("Task 2 running result:"); return "hello"; }, executor); CompletableFuture<String> thenCombineAsync = future1.thenCombineAsync(future2, (result1, result2) -> { System.out.println("Task 5 start... Result 1:" + result1 + ". . . Result 2:" + result2); return result2 + "-->" + result1; }, executor); System.out.println("Task 5 Results" + thenCombineAsync.get()); }
thenAcceptBoth - consume two results + no return
/** * Both task combinations need to be completed * completableFuture.thenAcceptBoth() Get two future return results, no return value * result: * Task 1 thread: 33 * Task 1 running result: 5 * Task 2 thread: 34 * Task 2 running result: * Task 4 start... Result 1:5... Result 2: hello */ @Test public void thenAcceptBothAsync() throws ExecutionException, InterruptedException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 1 thread:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("Task 1 running result:" + i); return i; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 2 thread:" + Thread.currentThread().getId()); System.out.println("Task 2 running result:"); return "hello"; }, executor); CompletableFuture<Void> thenAcceptBothAsync = future1.thenAcceptBothAsync(future2, (result1, result2) -> { System.out.println("Task 4 start... Result 1:" + result1 + ". . . Result 2:" + result2); }, executor); }
runAfterBoth - two tasks are completed and then run
/** * Both task combinations need to be completed * completableFuture.runAfterBoth() Combine two future s * result: * Task 1 thread: 33 * Task 1 running result: 5 * Task 2 thread: 34 * Task 2 running result: * Task 3 start... */ @Test public void runAfterBothAsync() { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 1 thread:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("Task 1 running result:" + i); return i; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 2 thread:" + Thread.currentThread().getId()); System.out.println("Task 2 running result:"); return "hello"; }, executor); CompletableFuture<Void> runAfterBothAsync = future1.runAfterBothAsync(future2, () -> { System.out.println("Task 3 start..."); }, executor); }
Perform one of the two tasks
applyToEither - one of them is executed + get return value + have return value
/** * Two tasks are combined, and one task is executed upon completion * objectCompletableFuture.applyToEither() One of them is executed + get return value + have return value * result: * Task 1 thread: 33 * Task 2 thread: 34 * Task 2 running result: * Task 5 begins... Result: hello * Task 5 result: hello world * <p> * Process finished with exit code 0 */ @Test public void applyToEither() throws ExecutionException, InterruptedException { CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 1 thread:" + Thread.currentThread().getId()); int i = 10 / 2; try { Thread.sleep(3000); System.out.println("Task 1 running result:" + i); } catch (InterruptedException e) { e.printStackTrace(); } return i; }, executor); CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 2 thread:" + Thread.currentThread().getId()); System.out.println("Task 2 running result:"); return "hello"; }, executor); CompletableFuture<String> applyToEitherAsync = future1.applyToEitherAsync(future2, result -> { System.out.println("Task 5 begins... result:" + result); return result.toString() + " world"; }, executor); System.out.println("Task 5 result:" + applyToEitherAsync.get()); }
acceptEither - one of them is executed + get the return value
/** * Two tasks are combined, and one task is executed upon completion * objectCompletableFuture.acceptEither() After one of them is executed, execute + get the return value * result: * Task 1 thread: 33 * Task 2 thread: 34 * Task 2 running result: * Task 4 begins... Result: hello */ @Test public void acceptEither() { CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 1 thread:" + Thread.currentThread().getId()); int i = 10 / 2; try { Thread.sleep(3000); System.out.println("Task 1 running result:" + i); } catch (InterruptedException e) { e.printStackTrace(); } return i; }, executor); CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 2 thread:" + Thread.currentThread().getId()); System.out.println("Task 2 running result:"); return "hello"; }, executor); CompletableFuture<Void> acceptEitherAsync = future1.acceptEitherAsync(future2, result -> { System.out.println("Task 4 begins... result:" + result); }, executor); }
runAfterEither - execute as soon as a task is completed
/** * Two tasks are combined, and one task is executed upon completion * <p> * objectCompletableFuture.runAfterEither() One of them is finished * result: * Task 1 thread: 33 * Task 2 thread: 34 * Task 2 running result: * Task 3 begins... */ @Test public void runAfterEither() { CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 1 thread:" + Thread.currentThread().getId()); int i = 10 / 2; try { Thread.sleep(3000); System.out.println("Task 1 running result:" + i); } catch (InterruptedException e) { e.printStackTrace(); } return i; }, executor); CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 2 thread:" + Thread.currentThread().getId()); System.out.println("Task 2 running result:"); return "hello"; }, executor); CompletableFuture<Void> runAfterEitherAsync = future1.runAfterEitherAsync(future2, () -> { System.out.println("Task 3 begins..."); }, executor); }
Multi task combination
allOf - wait until all are completed
/** * Multi task combination * allOf Wait for all tasks to complete * result: * Task 1 * Task 3 * Task 2 * allOf Task 1 ------ task 2 ------ task 3 * * @throws ExecutionException * @throws InterruptedException */ @Test public void allOf() throws ExecutionException, InterruptedException { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 1"); return "Task 1"; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); System.out.println("Task 2"); } catch (InterruptedException e) { e.printStackTrace(); } return "Task 2"; }, executor); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 3"); return "Task 3"; }, executor); CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3); //Wait for all tasks to complete //allOf.get(); allOf.join(); System.out.println("allOf" + future1.get() + "-------" + future2.get() + "-------" + future3.get()); }
anyOf - wait until one of them is completed
/** * Multi task combination * anyOf As long as one task is completed * result: * Task 1 * anyOf--The first thing to complete is task 1 * Task 3 * Wait, task 2 * Task 2 * * @throws ExecutionException * @throws InterruptedException */ @Test public void anyOf() throws ExecutionException, InterruptedException { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 1"); return "Task 1"; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); System.out.println("Task 2"); } catch (InterruptedException e) { e.printStackTrace(); } return "Task 2"; }, executor); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { System.out.println("Task 3"); return "Task 3"; }, executor); CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3); System.out.println("anyOf--The first thing to finish is" + anyOf.get()); //Waiting for future2 to print System.out.println("Wait, task 2"); Thread.sleep(3000); }
sensation abnormality
handle - catch the result or exception and return a new result
If the input parameter is a result or exception, a new result will be returned
/** * If the input parameter is a result or exception, a new result is returned * result: * main.................start..... * Current thread: 33 * main.................end.....Error reporting return */ @Test public void handle() throws ExecutionException, InterruptedException { System.out.println("main.................start....."); final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Current thread:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("Operation results:" + i); return i; }, executor).handleAsync((in, throwable) -> { if (throwable != null) { return "Error reporting return"; } return "Right"; }); System.out.println("main.................end....." + completableFuture.get()); }
whenComplete - perceive the result or exception and return the corresponding information
Although whenComplete gets exception information, it cannot modify the returned information
/** * There is a return value and subsequent operation whenComplete * <p> * result: * main.................start..... * Current thread: 33 * Asynchronous completion.... The result is: null Exception: Java util. concurrent. CompletionException: java. Lang. arithmeticexception: divide by zero * Reported wrong 2 * * @throws ExecutionException * @throws InterruptedException */ @Test public void whenComplete() { System.out.println("main.................start....."); final CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Current thread:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("Operation results:" + i); return i; }, executor).whenComplete((result, throwable) -> { //Although whenComplete gets exception information, it cannot modify the returned information System.out.println("Asynchronous completion.... The result is:" + result + "...The exception is:" + throwable); }); try { System.out.println("main.................end..T..." + completableFuture.get()); } catch (InterruptedException e) { System.out.println("Reported wrong 1"); } catch (ExecutionException e) { System.out.println("Reported wrong 2"); } }
exceptionally - catches an exception and returns the specified value
/** * Perception after method completion * Perceives the error and returns the specified value exceptionally * result: * main.................start..... * Current thread: 33 * exceptionally executed * main.................end.....0 * @throws ExecutionException * @throws InterruptedException */ @Test public void exceptionally() throws ExecutionException, InterruptedException { System.out.println("main.................start....."); CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Current thread:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("Operation results:" + i); return i; }, executor).exceptionally(throwable -> { //R apply(T t); //exceptionally can sense the error and return the specified value System.out.println("Yes exceptionally"); return 0; }); System.out.println("main.................end....." + completableFuture.get()); }
The content of this article is a detailed explanation of the use of completable future
- Friends who think the article is good can forward this article and pay attention to the Xiaobian;
- Thank you for your support!