Understand the use of completable future

preface

There are only two ways to create a Thread: inherit the Thread or implement the Runnable interface. However, both methods have a defect that they do not return a value

After Java 1.5, you can obtain a Future object containing the return value by submitting a Callable to the thread pool

Limitations of Future interface

When the Future thread performs a very time-consuming operation, our main thread will be blocked.

When we are in a simple business, we can use another overloaded method get(long,TimeUnit) of Future to set the timeout to avoid our main thread being blocked endlessly.

Simply using the Future interface or FutureTask class can not do the following business well

  • Merge two asynchronous calculations into one. The two asynchronous calculations are independent of each other, and the second depends on the result of the first
  • Wait for all tasks in the Future collection to complete.
  • Only wait for the fastest ending task in the Future collection to complete and return its results.
  • Complete the execution of a Future task by programming
  • When the completion time of Future is completed, you will be notified, and you can use the calculation results of Future for the next operation, not just blocking the results of waiting operations

What is completable future

In Java 8, a new class, completable Future, combines the advantages of Future, provides a very powerful extension function of Future, helps us simplify the complexity of asynchronous programming, provides the ability of functional programming, and can process the calculation results through callback

Completable future is designed for asynchronous programming in Java. The main thread does not have to block / wait for the completion of the task. You can use the main thread to execute other tasks in parallel. Using this parallel method greatly improves the performance of the program.

  • Completable Future implements the Future interface, so it has the ability to return results asynchronously.
  • Completabilefuture implements the CompletionStage interface, which is a new interface in Java 8. It is used for stage processing in asynchronous execution. It is widely used in the calculation of Lambda expressions. At present, there is only one implementation class of completabilefuture.
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

Method naming rules

  • Methods with Async suffix are executed asynchronously by another thread, and there is no thread that reuses the previous task

  • All methods with the Apply ID can obtain the return value + with a return value

  • All methods with Accept ID can obtain the return value

  • The method with run ID can not get the return value or no return value, but just run

get method and join method

  • join: block getting results or throw non checked exceptions.
  • get: blocking getting results or throwing detected exceptions, which needs to be displayed for try Catch processing

Use of different thread pools

Default thread pool execution

/**
 * Default thread pool
 * Operation results:
 * main.................start.....
 * main.................end......
 * Current thread: forkjoinpool commonPool-worker-9
 * Operation result: 5
 */
@Test
public void defaultThread() {
    System.out.println("main.................start.....");
    CompletableFuture.runAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getName());
        int i = 10 / 2;
        System.out.println("Operation results:" + i);
    });
    System.out.println("main.................end......");
}

Forkjoinpool is used by default commonPool (), which is a thread pool shared by many tasks, is designed to run non blocking CPU intensive tasks. In order to maximize CPU utilization, the number of threads is assumed to be - 1 by default

  • Where is commonPool used
    • CompletableFuture
    • Parallel Streams
  • Why introduce commonPool
    • In order to avoid introducing a thread pool for any parallel operation, in the worst case, too many pool threads will be created on a single JVM, reducing efficiency.
  • How is the commonPool thread pool created and used
    • ForkJoinTask must run in a ForkJoinPool. If it is not explicitly submitted to the ForkJoinPool, a common pool (full process sharing) will be used to execute the task.

Custom thread pool execution

Customize a thread pool

private ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());

Use defined thread pool

/**
 * Custom thread pool
 * Operation results:
 * main.................start.....
 * main.................end......
 * Current thread: pool-1-thread-1
 * Operation result: 5
 */
@Test
public void myThread() {
    System.out.println("main.................start.....");
    CompletableFuture.runAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getName());
        int i = 10 / 2;
        System.out.println("Operation results:" + i);
    },executor);
    System.out.println("main.................end......");
}

Turn on an asynchronous

runAsync - no return value

Use runAsync to start an asynchronous task thread. This method returns no results. It is suitable for some asynchronous tasks that do not need results

/***
 * No return value
 *  runAsync
 *  result:
 * main.................start.....
 * main.................end......
 * Current thread: 33
 * Operation result: 5
 */
@Test
public void runAsync() {
    System.out.println("main.................start.....");
    CompletableFuture.runAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Operation results:" + i);
    }, executor);
    System.out.println("main.................end......");
}

supplyAsync - has a return value

Use completabilefuture The get () method gets the result, and the program will block here until the result is returned.

/**
 * There is a return value
 * supplyAsync
 * result:
 * main.................start.....
 * Current thread: 33
 * Operation result: 5
 * main.................end.....5
 */
@Test
public void supplyAsync() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Operation results:" + i);
        return i;
    }, executor);
    System.out.println("main.................end....." + completableFuture.get());
}

If you have to go ahead to time out, use completable future Get (long timeout, timeunit) method.

Thread serialization method

Methods with Async suffix are executed asynchronously by another thread, and there is no thread that reuses the previous task

thenApply - after the above task is executed + get the return value + have a return value

/**
 * After the above task is executed, you can get the result + you can return the value
 * result:
 * thenApplyAsync Current thread: 33
 * thenApplyAsync Operation result: 5
 * thenApplyAsync Task 2 started..... Previous result: 5
 * main.................end.....hello10
 * 
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void thenApplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> thenApplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println("thenApplyAsync Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("thenApplyAsync Operation results:" + i);
        return i;
    }, executor).thenApplyAsync(result -> {
        System.out.println("thenApplyAsync Task 2 started..... Previous results:" + result);
        return "hello" + result * 2;
    }, executor);
    System.out.println("main.................end....." + thenApplyAsync.get());
}

thenAccept - after the above task is executed + get the return value

/**
 * After the above tasks are executed, you can get the results
 * result:
 * thenAcceptAsync Current thread: 33
 * thenAcceptAsync Operation result: 5
 * thenAcceptAsync Task 2 started..... Previous result: 5
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void thenAcceptAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> thenAcceptAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println("thenAcceptAsync Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("thenAcceptAsync Operation results:" + i);
        return i;
    }, executor).thenAcceptAsync(result -> {
        System.out.println("thenAcceptAsync Task 2 started..... Previous results:" + result);
    }, executor);
}

thenRun - the above task is executed

/**
 * The above task is completed
 * result
 * main.................start.....
 * Current thread: 33
 * Operation result: 5
 * Task 2 started.....
 */
@Test
public void thenRunAsync() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Operation results:" + i);
        return i;
    }, executor).thenRunAsync(() -> {
        System.out.println("Task 2 started.....");
    }, executor);
}

Thenpose - receives the return value and generates a new task

When the original task is completed, it receives the return value and returns a new task

  • Theapply () converts types in generics, which is equivalent to converting completabilefuture into a new completabilefuture
  • Thenpose () is used to connect two completabilefuture and generate a new completabilefuture.
/**
 * When the original task is completed, it receives the return value and returns a new task
 * result:
 * hello: thenCompose
 */
@Test
public void thenCompose() {
    CompletableFuture cf = CompletableFuture.completedFuture("hello")
            .thenCompose(str -> CompletableFuture.supplyAsync(() -> {
                return str + ": thenCompose";
            },executor));
    System.out.println(cf.join());
}

Task combination

thenCombine - consume two results + return results

/**
 * Both task combinations need to be completed
 * completableFuture.thenCombine()Get two future return results with return value
 * result:
 * Task 1 thread: 33
 * Task 1 running result: 5
 * Task 2 thread: 34
 * Task 2 running results:
 * Task 5 start... Result 1:5... Result 2: hello
 * Task 5 result hello -- > 5
 */
@Test
public void thenCombine() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Task 1 running result:" + i);
        return i;
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);
    CompletableFuture<String> thenCombineAsync = future1.thenCombineAsync(future2, (result1, result2) -> {
        System.out.println("Task 5 start... Result 1:" + result1 + ". . . Result 2:" + result2);
        return result2 + "-->" + result1;
    }, executor);
    System.out.println("Task 5 Results" + thenCombineAsync.get());
}

thenAcceptBoth - consume two results + no return

/**
 * Both task combinations need to be completed
 * completableFuture.thenAcceptBoth() Get two future return results with no return value
 * result:
 * Task 1 thread: 33
 * Task 1 running result: 5
 * Task 2 thread: 34
 * Task 2 running results:
 * Task 4 start... Result 1:5... Result 2: hello
 */
@Test
public void thenAcceptBothAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Task 1 running result:" + i);
        return i;
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);

    CompletableFuture<Void> thenAcceptBothAsync = future1.thenAcceptBothAsync(future2, (result1, result2) -> {
        System.out.println("Task 4 start... Result 1:" + result1 + ". . . Result 2:" + result2);
    }, executor);

}

runAfterBoth - two tasks are completed and then run

/**
 * Both task combinations need to be completed
 * completableFuture.runAfterBoth() Combine two future s
 * result:
 * Task 1 thread: 33
 * Task 1 running result: 5
 * Task 2 thread: 34
 * Task 2 running result:
 * Task 3 start...
 */
@Test
public void runAfterBothAsync() {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Task 1 running result:" + i);
        return i;
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);

    CompletableFuture<Void> runAfterBothAsync = future1.runAfterBothAsync(future2, () -> {
        System.out.println("Task 3 start...");
    }, executor);

}

Perform one of the two tasks

applyToEither - one of them is executed + get return value + have return value

/**
 * Two tasks are combined, and one task is executed upon completion
 * objectCompletableFuture.applyToEither() One of them is executed + get return value + have return value
 * result:
 * Task 1 thread: 33
 * Task 2 thread: 34
 * Task 2 running result:
 * Task 5 begins... Result: hello
 * Task 5 result: hello world
 * <p>
 * Process finished with exit code 0
 */
@Test
public void applyToEither() throws ExecutionException, InterruptedException {
    CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        try {
            Thread.sleep(3000);
            System.out.println("Task 1 running result:" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }, executor);

    CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);

    CompletableFuture<String> applyToEitherAsync = future1.applyToEitherAsync(future2, result -> {
        System.out.println("Task 5 begins... result:" + result);
        return result.toString() + " world";
    }, executor);
    System.out.println("Task 5 Results:" + applyToEitherAsync.get());
}

acceptEither - one of them executes + gets the return value

/**
 * Two tasks are combined, and one task is executed when it is completed
 * objectCompletableFuture.acceptEither() After one of them is executed, execute + get the return value
 * result:
 * Task 1 thread: 33
 * Task 2 thread: 34
 * Task 2 running result:
 * Task 4 begins... Result: hello
 */
@Test
public void acceptEither() {
    CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        try {
            Thread.sleep(3000);
            System.out.println("Task 1 running result:" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }, executor);

    CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);

    CompletableFuture<Void> acceptEitherAsync = future1.acceptEitherAsync(future2, result -> {
        System.out.println("Task 4 begins... result:" + result);
    }, executor);

}

runAfterEither - execute as soon as a task is completed

/**
 * Two tasks are combined, and one task is executed upon completion
 * <p>
 * objectCompletableFuture.runAfterEither() One of them finished executing
 * result:
 * Task 1 thread: 33
 * Task 2 thread: 34
 * Task 2 running result:
 * Task 3 begins...
 */
@Test
public void runAfterEither() {
    CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1 thread:" + Thread.currentThread().getId());
        int i = 10 / 2;
        try {
            Thread.sleep(3000);
            System.out.println("Task 1 running result:" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }, executor);

    CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 2 thread:" + Thread.currentThread().getId());
        System.out.println("Task 2 running result:");
        return "hello";
    }, executor);

    CompletableFuture<Void> runAfterEitherAsync = future1.runAfterEitherAsync(future2, () -> {
        System.out.println("Task 3 begins...");
    }, executor);
}

Multi task combination

allOf - wait until all are completed

/**
 * Multi task combination
 * allOf Wait for all tasks to complete
 * result:
 * Task 1
 * Task 3
 * Task 2
 * allOf Task 1 ------ task 2 ------ task 3
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void allOf() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1");
        return "Task 1";
    }, executor);
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Task 2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 2";
    }, executor);
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 3");
        return "Task 3";
    }, executor);

    CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
    //Wait for all tasks to complete
    //allOf.get();
    allOf.join();
    System.out.println("allOf" + future1.get() + "-------" + future2.get() + "-------" + future3.get());

}

anyOf - wait for one of them to finish

/**
 * Multi task combination
 * anyOf As long as one task is completed
 * result:
 * Task 1
 * anyOf--The first thing to complete is task 1
 * Task 3
 * Wait, task 2
 * Task 2
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void anyOf() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 1");
        return "Task 1";
    }, executor);
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Task 2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 2";
    }, executor);

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Task 3");
        return "Task 3";
    }, executor);
    CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
    System.out.println("anyOf--The first thing to finish is" + anyOf.get());
    //Waiting for future2 to print
    System.out.println("Wait, task 2");
    Thread.sleep(3000);
}

sensation abnormality

handle - catch a result or exception and return a new result

If the input parameter is a result or exception, a new result is returned

/**
 * If the input parameter is a result or exception, a new result is returned
 * result:
 * main.................start.....
 * Current thread: 33
 * main.................end.....Error return
 */
@Test
public void handle() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 0;
        System.out.println("Operation results:" + i);
        return i;
    }, executor).handleAsync((in, throwable) -> {
        if (throwable != null) {
            return "Error return";
        }
        return "Right";
    });
    System.out.println("main.................end....." + completableFuture.get());

}

whenComplete - perceives the result or exception and returns the corresponding information

Although whenComplete gets exception information, it cannot modify the returned information

/**
 * There is a return value and a subsequent operation whenComplete
 * <p>
 * result:
 * main.................start.....
 * Current thread: 33
 * Asynchronous completion.... The result is: null The exception is: Java util. concurrent. CompletionException: java. Lang. arithmeticexception: divide by zero
 * Wrong 2
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void whenComplete() {
    System.out.println("main.................start.....");
    final CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 0;
        System.out.println("Operation results:" + i);
        return i;
    }, executor).whenComplete((result, throwable) -> {
        //Although whenComplete gets exception information, it cannot modify the returned information
        System.out.println("Asynchronous completion.... The result is:" + result + "...The exception is:" + throwable);
    });

    try {
        System.out.println("main.................end..T..." + completableFuture.get());
    } catch (InterruptedException e) {
        System.out.println("Wrong 1");
    } catch (ExecutionException e) {
        System.out.println("Wrong 2");
    }
}

exceptionally - catches an exception and returns the specified value

/**
 * Perception after method completion
 * Perceives the error and returns the specified value exceptionally
 * result:
 * main.................start.....
 * Current thread: 33
 * exceptionally executed
 * main.................end.....0
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void exceptionally() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread:" + Thread.currentThread().getId());
        int i = 10 / 0;
        System.out.println("Operation results:" + i);
        return i;
    }, executor).exceptionally(throwable -> {
        //R apply(T t);
        //exceptionally can sense the error and return the specified value
        System.out.println("Yes exceptionally");
        return 0;
    });
    System.out.println("main.................end....." + completableFuture.get());
}

Write at the end

  • If you have something to gain, praise and encourage!
  • Collect articles for easy review!
  • Comments and exchanges, mutual progress!


 

Keywords: Java Back-end Interview Programmer architecture

Added by morphy@6deex on Mon, 24 Jan 2022 21:53:29 +0200