Java concurrent asynchronous programming, which used to work with ten interfaces, now only needs one interface

Author: Jin Cheng
Source: juejin.im/post/5d3c46d2f265da1b9163dbce

what? Yes, you didn't hear wrong or read wrong.. multithreading executes tasks concurrently and collects results ~ ~ no longer worry

introduction

Let's take a look at some APP acquisition data, and so on. A page obtains more than N, up to about 10 user behavior data, such as the number of likes, published articles, likes, messages, concerns, collections, fans, cards, coupons, red envelopes... It's really much ~ let's see some pictures:

We usually need 10 + interfaces to get data (because when you write 10 + queries together, it is estimated that it will take half a minute to respond). N multiple interfaces on a page are really tired of the front-end baby, and it is also tired to turn on multi threads at the front-end. We need to volume the front-end babies as the back-end. After all, there is a saying "why bother programmers ~"

Today, we can also use an interface to return these data quickly, so as to solve the problems caused by serial programming and blocking programming~

Multithreading executes tasks concurrently and collects results

Today's pig feet are: Future, FutureTask, ExecutorService

  • Using FutureTask to obtain results is suitable for all ages, that is, CPU consumption. FutureTask can also be locked (it implements the semantics of Future and represents an abstract and computable result). By taking callable (equivalent to a runnable that can generate results) as an attribute, and then inheriting runnable as an actuator, FutureTask is actually an asynchronous task actuator that supports cancellation behavior.
  • Callable is a callback interface, which can generically declare the return type, and Runnable is the method executed by the thread. This is very simple ~ if you want to know more about it, you can go in and look at the source code ~   because it's really simple ~
  • FutureTask implements Future, provides start, cancel, query and other functions, and implements the Runnable interface, which can be submitted to threads for execution.
  • Three board axe status of Java concurrency tool class, queue, CAS

state

/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:        //Possible state transition process
 * NEW -> COMPLETING -> NORMAL        // Create -- > complete -- > Normal
 * NEW -> COMPLETING -> EXCEPTIONAL   // Create -- > complete -- > exception
 * NEW -> CANCELLED                   // Create -- > cancel
 * NEW -> INTERRUPTING -> INTERRUPTED // Creating -- > interrupt -- > interrupt end
 */

private volatile int state;                  // Actuator status

private static final int NEW = 0;            // The initial value is guaranteed by the constructor
private static final int COMPLETING = 1;     // Setting task results when finished in progress
private static final int NORMAL = 2;         // Normal completion of tasks
private static final int EXCEPTIONAL = 3;    // An exception occurred during task execution
private static final int CANCELLED = 4;      // The task has been cancelled
private static final int INTERRUPTING = 5;   // The thread that is running the task when the interrupt is in progress
private static final int INTERRUPTED = 6;    // Interrupt end task interrupted

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

If you don't understand, just look at the picture:

public interface Future<T> {
    /**
    *Cancel task
    *@param mayInterruptIfRunning
    *Whether to allow canceling tasks that are being executed but have not been completed. If true is set, it means that tasks in the process of execution can be cancelled
    *Returns true if the task is executing
    *If the task has not been executed, return true whether mayInterruptIfRunning is true or false
    *If the task has been completed, return false whether mayInterruptIfRunning is true or false
    */
    boolean cancel(boolean mayInterruptIfRunning);
    /**
    *Whether the task was successfully cancelled. If it was successfully cancelled before the task was completed normally, true will be returned
    */
    boolean isCancelled();
    /**
    *Is the task completed
    */
    boolean isDone();
    /**
    *Get execution results through blocking
    */
    T get() throws InterruptedException, ExecutionException;
    /**
    *Get execution results through blocking. If it does not return within the specified time, null is returned
    */
    T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future

  • cancle can stop the execution of the task, but it may not succeed. See the return value true or false
  • Get blocks the result of the callable task, that is, get blocks the calling thread until the calculation is completed and the result is returned
  • isCancelled successfully
  • isDone complete

Key notes:

Future. Get () gets the value of the execution result, which depends on the execution status. If the task is completed, it will return the result immediately. Otherwise, it will be blocked until the task enters the completion status, and then return the result or throw an exception.

"Run complete" indicates all possible end states of calculation, including normal end, end due to cancellation and end due to exception. When it enters the completion state, it will stop in this state. As long as the state is not in the NEW state, it means that the task has been executed.

FutureTask is responsible for transferring the calculation results from the thread executing the task to the thread calling the thread, and ensures the safe release of the results during the transfer process

UNSAFE lock free programming technology ensures thread safety ~ in order to maintain the CPU consumption of lock free programming, the status flag is used to reduce the CPU pressure during idling

  • Task master: callable
  • Task Performer: runner
  • Result of task: outcome
  • Get the result of the task: state + outcome + waiters
  • Interrupt or cancel a task: state + runner + waiters

run method

1. Check the state. If it is not NEW, it indicates that it has been started and returns directly; Otherwise, set the runner as the current thread. If successful, continue. Otherwise, return.

2. Call the Callable.call() method to execute the task. If it succeeds, call the set(result) method. If it fails, call the setException(ex) method. Finally, set the state and call the finishcompletement () method to wake up the threads blocking the get() method.

3. As the comment shows, what happens if you omit the ran variable and move the "set(result);" statement to the "ran = true;" statement in the try code block? First of all, from the perspective of code logic, there is no problem. However, considering that the "set(result);" method throws an exception or even an error? The set() method will eventually call the user-defined done() method, so it cannot be omitted.

4. If the state is INTERRUPTING, actively give up the CPU and wait for other threads to complete the interrupt process. See the handlepossible cancellationinterrupt (int s) method.

public void run() {
        // UNSAFE.compareAndSwapObject, CAS ensures that the Callable task is only executed once without lock programming
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable; // Get the job
            if (c != null && state == NEW) { // Only when the task is not empty and the actuator status is the initial value can it be executed; If you cancel, it will not be executed
                V result;
                boolean ran; // Record whether the execution was successful
                try {
                    result = c.call(); // Perform tasks
                    ran = true; // success
                } catch (Throwable ex) {
                    result = null; // Exception, clear result
                    ran = false; // fail
                    setException(ex); // Record exception
                }
                if (ran) // Question: can the ran variable be omitted? set(result); Move into the try block?
                    set(result); // Setting results
            }
        } finally {
            runner = null; // Until the set state, the runner is always non empty. In order to prevent concurrent calls to the run() method.
            int s = state;
            if (s >= INTERRUPTING) // There are other threads to interrupt the current thread. Let the CPU out. Wait a minute
                handlePossibleCancellationInterrupt(s);
        }
    }
private void handlePossibleCancellationInterrupt(int s) {
     if (s == INTERRUPTING) // When state is INTERRUPTING
         while (state == INTERRUPTING) // Indicates that a thread is interrupting the current thread
             Thread.yield(); // Give up the CPU and wait for the interrupt
}

Let's talk a little more: the run method focuses on the following things:

  • Set the runner property to the thread that is currently executing the run method
  • Call the call method of the callable member variable to execute the task
  • Set the execution result outcome. If the execution is successful, the outcome will save the execution result; If an exception occurs during execution, the exception is saved in the outcome. Before setting the result, set the state state to the intermediate state
  • After the assignment of the outcome is completed, set the state state to the termination state (NORMAL or EXCEPTIONAL)
  • Wake up all waiting threads in Treiber stack
  • Clean up (waiters, callable, runner set to null)
  • Check for missing interrupts, and if so, wait for the interrupt status to complete.

How can we eliminate the get method and block the acquisition all the time? See: awaitDone

public V get() throws InterruptedException, ExecutionException {
    int s = state; // Actuator status
     if (s <= COMPLETING) // If the status is less than or equal to COMPLETING, the task is executing and needs to wait
         s = awaitDone(false, 0L); // wait for
     return report(s); // Report results
}

By the way, secretly look at get(long, TimeUnit), which is the method extension of get, which increases the timeout time. After the timeout, I am angry and throw exceptions before I get it

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null) // Parameter verification
        throw new NullPointerException();
    int s = state; // Actuator status
    if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) // If the status is less than or equal to COMPLETING, it indicates that the task is being executed and needs to wait; Wait for the specified time, and the state is still less than or equal to COMPLETING
        throw new TimeoutException(); // Throw timeout exception
    return report(s); // Report results
}

Then look at awaitDone. You should know that those who can write the loop while(true)|for (;;) are experts~

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L; // Calculate deadline
    WaitNode q = null; // Waiting node
    boolean queued = false; // Have you joined the team
    for (;;) {
        if (Thread.interrupted()) { // If the current thread has marked an interrupt, the node will be removed directly and an interrupt exception will be thrown
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state; // Actuator status
        if (s > COMPLETING) { // If the status is greater than COMPLETING, it indicates that the task has been completed or cancelled. Return directly
            if (q != null)
                q.thread = null; // Reset thread properties
            return s; // return
        } else if (s == COMPLETING) // If the status is equal to COMPLETING, it means that the results are being sorted and the spin is waiting for a while
            Thread.yield();
        else if (q == null) // Initial, build node
            q = new WaitNode();
        else if (!queued) // If you haven't joined the team, CAS will join the team
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        else if (timed) { // Allow timeout
            nanos = deadline - System.nanoTime(); // Calculate waiting time
            if (nanos <= 0L) { // overtime
                removeWaiter(q); // Remove node
                return state; // Return results
            }
            LockSupport.parkNanos(this, nanos); // Thread blocking specified time
        } else
            LockSupport.park(this); // Blocking thread
    }
}

So far, I won't be wordy about thread task arrangement and acquisition. I still have a lot to explore. After all, paid chat is tense, so I won't go into details

queue

Next, let's look at the queue. In FutureTask, the implementation of the queue is a one-way linked list, which represents the collection of all threads waiting for the completion of Task execution. We know that FutureTask implements the Future interface and can obtain the execution results of "Task". What if the Task has not been completed when the results are obtained? Then the thread that gets the result will hang in a waiting queue until the Task is awakened after execution. This is somewhat similar to sync queue in AQS. In the following analysis, you can compare their similarities and differences.

As we said earlier, when using queues in concurrent programming, we usually wrap the current thread into some type of data structure and throw it into the waiting queue. Let's first look at the structure of each node in the queue:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

It can be seen that this WaitNode is much simpler than the nodes in the bidirectional linked list used by AQS sync queue. It only contains the thread attribute of a recording thread and the next attribute pointing to the next Node.

It is worth mentioning that the one-way linked list in FutureTask is used as a stack, specifically as a Treiber stack. If you don't know what the Treiber stack is, you can simply regard it as a thread safe stack, which uses CAS to complete the operation of entering and exiting the stack (see this article for further information).

Why use a thread safe stack? Because multiple threads may get the execution results of the task at the same time. If the task is still executing, these threads will be packaged as WaitNode and thrown to the top of the Treiber stack, that is, to complete the stacking operation. In this way, multiple threads may be stacked at the same time, Therefore, it is necessary to use CAS operation to ensure thread safety in the stack, and the same is true for out of the stack.

Since the queue in FutureTask is essentially a Treiber stack, you only need a pointer to the top node of the stack to use this queue. In FutureTask, it is the waiters attribute:

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

In fact, it is the head node of the whole one-way linked list.

To sum up, the queue structure used in FutureTask is as follows:

CAS operation

CAS operations are mostly used to change state, and FutureTask is no exception. We generally initialize the offset of attributes requiring CAS operation in static code blocks:

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

From this static code block, we can also see that CAS operations mainly target three attributes, including state, runner and waiters, indicating that these three attributes will be accessed by multiple threads at the same time. The state attribute represents the status of the task, and the waiters attribute represents the pointer to the top node of the stack. We have analyzed these two above.

The runner property represents the thread that executes the "Task" in FutureTask. Why do you need a property to record the thread executing the Task? This is to prepare for interrupting or canceling the Task. Only when we know who the thread executing the Task is, can we interrupt it.

After defining the offset of the attribute, the next step is the CAS operation itself. In FutureTask, the CAS operation finally calls the compareAndSwapXXX method of Unsafe class. As for Unsafe, the paid code text will not be repeated here.

Actual combat drill

All explanations without examples are playing hooligans > > > onion, ginger and foam ~ ~ join the source of life

The actual combat project takes springboot as the project scaffold, github address:

https://github.com/javastacks/spring-boot-best-practice

1.MyFutureTask implementation class

A thread pool is defined internally for task scheduling, thread management and thread reuse. You can configure it according to your actual project situation

Thread scheduling example: core thread 8, maximum thread 20, keep alive time 30s, storage queue 10 has daemon thread rejection policy: return the overloaded task to the caller

explain:

By default, the number of core threads (8) is used to execute tasks. If the number of tasks exceeds the number of core threads, they will be thrown into the queue. When the queue (10) is full, new threads will be started. The maximum number of new threads is 20. When the task is completed, the newly opened threads will survive for 30s. If there is no task, they will die, and the thread pool will return to the number of core threads

import com.boot.lea.mybot.dto.UserBehaviorDataDTO;
import com.boot.lea.mybot.service.UserService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.*;

/**
 * @author Lijing
 * @date 2019 July 29
 */
@Slf4j
@Component
public class MyFutureTask {

    @Resource
    UserService userService;

    /**
     * Core thread 8 maximum thread 20 live time 30s storage queue 10 has a daemon thread rejection policy: return the overloaded task to the caller
     */
    private static ExecutorService executor = new ThreadPoolExecutor(8, 20,
            30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10),
            new ThreadFactoryBuilder().setNameFormat("User_Async_FutureTask-%d").setDaemon(true).build(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @SuppressWarnings("all")
    public UserBehaviorDataDTO getUserAggregatedResult(final Long userId) {

        System.out.println("MyFutureTask Thread:" + Thread.currentThread());

        long fansCount = 0, msgCount = 0, collectCount = 0,
                followCount = 0, redBagCount = 0, couponCount = 0;

//        fansCount = userService.countFansCountByUserId(userId);
//        msgCount = userService.countMsgCountByUserId(userId);
//        collectCount = userService.countCollectCountByUserId(userId);
//        followCount = userService.countFollowCountByUserId(userId);
//        redBagCount = userService.countRedBagCountByUserId(userId);
//        couponCount = userService.countCouponCountByUserId(userId);

        try {

            Future<Long> fansCountFT = executor.submit(() -> userService.countFansCountByUserId(userId));
            Future<Long> msgCountFT = executor.submit(() -> userService.countMsgCountByUserId(userId));
            Future<Long> collectCountFT = executor.submit(() -> userService.countCollectCountByUserId(userId));
            Future<Long> followCountFT = executor.submit(() -> userService.countFollowCountByUserId(userId));
            Future<Long> redBagCountFT = executor.submit(() -> userService.countRedBagCountByUserId(userId));
            Future<Long> couponCountFT = executor.submit(() -> userService.countCouponCountByUserId(userId));

            //get blocking
            fansCount = fansCountFT.get();
            msgCount = msgCountFT.get();
            collectCount = collectCountFT.get();
            followCount = followCountFT.get();
            redBagCount = redBagCountFT.get();
            couponCount = couponCountFT.get();

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            log.error(">>>>>>Aggregation query user aggregation information exception:" + e + "<<<<<<<<<");
        }
        UserBehaviorDataDTO userBehaviorData =
                UserBehaviorDataDTO.builder().fansCount(fansCount).msgCount(msgCount)
                        .collectCount(collectCount).followCount(followCount)
                        .redBagCount(redBagCount).couponCount(couponCount).build();
        return userBehaviorData;
    }

}

2.service business method

For conventional business query methods, we delay each method in order to achieve special effects and see the actual effect

import com.boot.lea.mybot.mapper.UserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    UserMapper userMapper;

    @Override
    public long countFansCountByUserId(Long userId) {
        try {
            Thread.sleep(10000);
            System.out.println("obtain FansCount===sleep:" + 10 + "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("UserService obtain FansCount Thread  " + Thread.currentThread().getName());
        return 520;
    }

    @Override
    public long countMsgCountByUserId(Long userId) {
        System.out.println("UserService obtain MsgCount Thread  " + Thread.currentThread().getName());
        try {
            Thread.sleep(10000);
            System.out.println("obtain MsgCount===sleep:" + 10 + "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 618;
    }

    @Override
    public long countCollectCountByUserId(Long userId) {
        System.out.println("UserService obtain CollectCount Thread  " + Thread.currentThread().getName());
        try {
            Thread.sleep(10000);
            System.out.println("obtain CollectCount==sleep:" + 10 + "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 6664;
    }

    @Override
    public long countFollowCountByUserId(Long userId) {
        System.out.println("UserService obtain FollowCount Thread  " + Thread.currentThread().getName());
        try {
            Thread.sleep(10000);
            System.out.println("obtain FollowCount===sleep:" + 10+ "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return userMapper.countFollowCountByUserId(userId);
    }

    @Override
    public long countRedBagCountByUserId(Long userId) {
        System.out.println("UserService obtain RedBagCount Thread  " + Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(4);
            System.out.println("obtain RedBagCount===sleep:" + 4 + "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 99;
    }

    @Override
    public long countCouponCountByUserId(Long userId) {
        System.out.println("UserService obtain CouponCount Thread  " + Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(8);
            System.out.println("obtain CouponCount===sleep:" + 8+ "s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 66;
    }
}

3.controller call

/**
 * @author LiJing
 * @ClassName: UserController
 * @Description: User controller
 * @date 2019/7/29 15:16
 */
@RestController
@RequestMapping("user/")
public class UserController {

    @Autowired
    private UserService userService;

    @Autowired
    private MyFutureTask myFutureTask;

    @GetMapping("/index")
    @ResponseBody
    public String index() {
        return "User module started successfully~~~~~~~~";
    }

    //http://localhost:8080/api/user/get/data?userId=4

    @GetMapping("/get/data")
    @ResponseBody
    public UserBehaviorDataDTO getUserData(Long userId) {
        System.out.println("UserController Thread:" + Thread.currentThread());
        long begin = System.currentTimeMillis();
        UserBehaviorDataDTO userAggregatedResult = myFutureTask.getUserAggregatedResult(userId);
        long end = System.currentTimeMillis();
        System.out.println("===============Total time:" + (end - begin) /1000.0000+ "second");
        return userAggregatedResult;
    }

}

We start the project: start the call http://localhost:8080/api/user/get/data?userId=4

When our thread pool is configured as: core thread 8, maximum thread 20, keep alive time 30s, and store queue 10, our test results are as follows:

Result: we can see that the execution thread of each server method is initiated from the thread pool. The thread name is User_Async_FutureTask-%d, the total time is reduced from 52 seconds to 10 seconds, which depends on the most time-consuming method query time

Then let's release the comment code for serial query and test:

Results: we use serial query, and the result summary will reach 52 seconds. That's terrible~~

summary

When using FutureTask, we call back the task runner in the form of caller to block the acquisition. Finally, we summarize the results, that is, we have completed the multi-threaded asynchronous call of our business methods

Future<Long> fansCountFT = executor.submit(new Callable<Long>() {
    @Override
    public Long call() throws Exception {
        return userService.countFansCountByUserId(userId);
    }
});

Here is only a simple example. Specific business methods can be defined for specific projects for merging. In fact, after JDK1.8, executorcompletionservice, forkjointask and completable future can implement the above methods. We will do some cases of the use of these methods in the future. We hope you will pay attention to them. If there are deficiencies in the article, please correct them~

Dessert

So: we need to use Spring's asynchronous programming. There are many ways of asynchronous programming: for example, the common Future sync, completable Future. Supplyasync, @ async, ha ha, actually can't do without Thread.start()..., etc. Let me tell a joke:

Dad has two children: Xiao Hong and Xiao Ming. Dad wants to drink. He asks Xiaohong to buy wine. Xiaohong goes out. Then Dad suddenly wanted to smoke, so dad asked Xiao Ming to buy cigarettes. In the object-oriented thinking, we usually take shopping and then buying back as a method. If we follow the sequential structure or use multi-threaded synchronization, Xiaoming must wait for Xiaohong to finish the shopping operation if he wants to buy cigarettes. This undoubtedly increases the cost of time (what if Dad holds his urine?). Asynchronous is to solve this problem. You can give instructions to Xiaohong and Xiaoming respectively to let them go shopping, and then you can do your own thing by yourself and receive the results when they buy it back.

package com.boot.lea.mybot.futrue;

/**
 * @ClassName: TestFuture
 * @Description: Demonstrate asynchronous programming
 * @author LiJing
 * @date 2019/8/5 15:16
 */
@SuppressWarnings("all")
public class TestFuture {
    static ExecutorService executor = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws InterruptedException {
        //Thread pool for two threads
        //For Xiaohong's wine shopping task, future2 here represents Xiaohong's future operation and returns the result of Xiaohong's shopping operation
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Dad: Xiao Hong, go buy a bottle of wine!");
            try {
                System.out.println("Xiao Hong went out to buy wine. The girls ran slowly. It's estimated that 5 s I won't come back until...");
                Thread.sleep(5000);
                return "I bought it back!";
            } catch (InterruptedException e) {
                System.err.println("Something happened on Xiaohong road");
                return "See you in the afterlife!";
            }
        }, executor);

        //For Xiaoming's task of buying cigarettes, future1 here represents what will happen to Xiaoming in the future. The return value is the result of Xiaoming's shopping
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Dad: Xiao Ming, go buy a pack of cigarettes!");
            try {
                System.out.println("Xiao Ming has gone out to buy cigarettes. It may take 3 hours s Come back later...");
                Thread.sleep(3000);

                throw new InterruptedException();
//                return "I bought it!";
            } catch (InterruptedException e) {
                System.out.println("Something unexpected happened on Xiaoming road!");
                return "This is a message from my client. I'm gone.";
            }
        }, executor);

        //Get Xiaohong's wine buying results, get the results from Xiaohong's operations, and print the results
        future2.thenAccept((e) -> {
            System.out.println("Xiao Hong said:" + e);
        });
        //Get the result of Xiaoming buying cigarettes
        future1.thenAccept((e) -> {
            System.out.println("Xiao Ming said:" + e);
        });

        System.out.println("Dad: wait, wait for the beautiful scenery of the West Lake in March......");
        System.out.println("dad: I felt bored and even went to the bathroom.");
        Thread.currentThread().join(9 * 1000);
        System.out.println("Dad: finally bought it for me......huo Alcohol");
        //Close thread pool
        executor.shutdown();
    }
}

Operation results:

Recent hot article recommendations:

1.1000 + Java interview questions and answers (2021 latest version)

2.Stop playing if/ else on the full screen. Try the strategy mode. It's really fragrant!!

3.what the fuck! What is the new syntax of xx ≠ null in Java?

4.Spring Boot 2.6 was officially released, a wave of new features..

5.Java development manual (Songshan version) is the latest release. Download it quickly!

Feel good, don't forget to like + forward!

Keywords: Java

Added by toshesh on Wed, 08 Dec 2021 08:25:43 +0200