Combinatorial asynchronous programming: completable future
1, Future interface
Using the example in the following code, submit a code that takes 5000ms to execute to the thread pool, and then immediately execute and print other code No blocking will be received. When the get() method of future is executed, the code will be blocked and wait for the task to complete and return the result. In this example, we use asynchronous tasks to perform time-consuming operations. Get execution results through future
/** * @author kenewstar * @date 2022/01/25 */ public class TestAsync1 { public static void main(String[] args) throws Exception { // Create a thread pool ExecutorService pool = Executors.newCachedThreadPool(); // Submit a task to the thread pool for execution Future<Integer> future = pool.submit(() -> { Thread.sleep(5000); return 10; }); // Non blocking code System.out.println("other code ..."); // The get() method is executed after blocking for 5s System.out.println(future.get()); // 10 // Close thread pool pool.shutdown(); } }
The limitation in the example is that when the task execution time is too long and the result cannot be returned, the get method will be blocked all the time. We can use a timeout setting. When the processing time expires, the execution operation will be interrupted.
Next, we will use the completable future in Java 8 to build asynchronous tasks
Synchronous API and asynchronous API (Java 8 actual combat)
In fact, synchronization API is just another name for traditional method calls: if you call a method, the caller will wait while the callee is running, and the callee will return after running. The caller will obtain the return value of the callee and continue running. Even if the caller and callee run in different threads, the caller still needs to wait for the callee to finish running. This is the origin of the term blocking call.
On the contrary, the asynchronous API will return directly, or at least hand over the remaining calculation tasks to another thread before the callee completes the calculation. The thread and the caller are asynchronous -- that's the origin of non blocking calls. The thread executing the remaining calculation tasks will return its calculation results to the caller. The method of return is either through the callback function, or the caller performs a method call of "wait until the calculation is completed" again. This kind of calculation is very common in 1O system programming: you initiate a disk access, which is asynchronous with your other calculation operations. When you complete other tasks, the data of the disk block may not be loaded into the memory, so you just need to wait for the data to be loaded.
2, Completable future uses
The asynchronous execution process of the following code can return good results under normal circumstances, and the execution time is about 2s. However, when the asynchronous task throws an exception and fails to execute the complete method of completable future, the thread will be permanently blocked when the executed get method obtains the value.
/** * @author kenewstar * @date 2022/01/25 */ public class TestAsync2 { @SneakyThrows private static void delay() { Thread.sleep(2000); } private static Future<Integer> getAsync() { CompletableFuture<Integer> cf = new CompletableFuture<>(); new Thread(() -> { delay(); cf.complete(100); }).start(); return cf; } @SneakyThrows public static void main(String[] args) { long start = System.nanoTime(); Future<Integer> async = getAsync(); delay(); System.out.println(async.get()); // 100 long end = System.nanoTime(); System.out.println((end - start)/1000/1000 + "ms"); // 2097ms } }
Let's take a look at the following optimized code, because the above code may permanently block the thread in the event of an exception. We should avoid the above situation. Catch the exception in the task and throw it to the client through the completeexceptional method. At this time, the client thread will not cause blocking
/** * @author kenewstar * @date 2022/01/25 */ public class TestAsync2 { @SneakyThrows private static void delay() { Thread.sleep(2000); throw new RuntimeException("abnormal"); } private static Future<Integer> getAsync() { CompletableFuture<Integer> cf = new CompletableFuture<>(); new Thread(() -> { try { delay(); cf.complete(100); } catch (Exception e) { cf.completeExceptionally(e); } }).start(); return cf; } @SneakyThrows public static void main(String[] args) { long start = System.nanoTime(); Future<Integer> async = getAsync(); //delay(); System.out.println(async.get()); long end = System.nanoTime(); System.out.println((end - start)/1000/1000 + "ms"); } }
The results are as follows:
Create a completable future using the factory method supplyAsync
The asynchronous task is executed in the following way. When an exception occurs to the task, it will be automatically thrown to the client to end the execution without permanently blocking the thread
/** * @author kenewstar * @date 2022/01/25 */ public class TestAsync3 { @SneakyThrows private static void delay() { Thread.sleep(2000); //throw new RuntimeException("exception"); } private static void throwException() { throw new RuntimeException(); } private static Future<Integer> getAsync() { return CompletableFuture.supplyAsync(() -> { delay(); throwException(); return 100; }); } @SneakyThrows public static void main(String[] args) { Future<Integer> async = getAsync(); delay(); System.out.println(async.get()); } }