Future and Callable principle

This paper mainly introduces the principles of Future and Callable, that is, how to obtain thread execution results outside the online process and its principle.

1 example

1.1 example 1

The following example code executes a Callable through the thread pool, and then obtains the return result through Future.

public static void main(String[] args) throws Exception {
    Callable<Integer> callable = () -> {
        Thread.sleep(1000);
        Random random = new Random();
        return random.nextInt(100);
    };
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Future<Integer> future = executorService.submit(callable);
    System.out.println(DateUtil.getCurrentTime() + " ready to do task");
    Integer result = future.get();
    System.out.println(DateUtil.getCurrentTime() + " get task result! result=" + result);
}

The execution results of this method are as follows. It can be seen that the main thread waits for the task to execute. After 1s, the thread completes execution. After returning the results, the main thread obtains the results and outputs them.

14:45:57:090 ready to do task
14:45:58:123 get task result! result=46

1.2 example 2

In the following example, we create and execute threads ourselves:

public static void main(String[] args) throws Exception {
    Callable<Integer> callable = () -> {
        Thread.sleep(1000);
        Random random = new Random();
        return random.nextInt(100);
    };

    FutureTask<Integer> task = new FutureTask<>(callable);
    Thread thread = new Thread(task);
    System.out.println(DateUtil.getCurrentTime() + " ready to do task");
    thread.start();
    Integer result = task.get();
    System.out.println(DateUtil.getCurrentTime() + " get task result! result=" + result);
}

The output results are as follows

15:51:47:615 ready to do task
15:52:13:885 get task result! result=31

2 Principle Analysis

The reason why the above two examples can obtain the execution results of threads is the same. They all come from the support of Callable, Future, Runnable and FutureTask. Next, we will analyze how to implement them.

2.1 principle overview

One thread (e.g. threadA) obtains the execution result of another thread (e.g. threadB). This function is implemented based on two points: convert the execution of Runnable#run to the call of Callable#call method, and store the returned result; Threads that wait for results are managed by waiting queues (similar to AQS).
For the FutureTask objects we commonly use, it can be understood that FutureTask acts as a converter between Runnable and Callable to convert the execution of the thread's Runnable#run method to the call of the Callable#call method. Because the run method is a method for executing tasks in the thread, and the call itself returns results, FutureTask only needs to execute the call method when running the run method, Then save the execution results to a place so that other threads can obtain the execution results of other threads through Future.
In short, as shown in the figure below:

  • Create a Callable object and add tasks to its call() method.
  • Create FutureTask (taskB) through Callable.
  • Create a thread (threadB) through task, and then execute the thread start() method.
  • The operating system schedules and executes the run() method of threadB. Because FutureTask implements the Runnable interface, the run() method in FutureTask (taskB) is executed at this time.
  • threadA through taskb The get () method obtains the execution result of threadB. If threadB is not completed, threadA will be suspended.
  • The main logic of the run() method of FutureTask is to execute the call() method of Callable; Then set the return result of the call method to the outcome field of FutrueTask; Then wake up the thread waiting for the running result of this thread (i.e. threadA).

2.2 Callable

The Callable interface is very simple, only declaring a call method.

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

2.3 Future

Future can represent the result of an asynchronous calculation, which can be obtained through the get method: if the calculation has not been completed, the current thread will be suspended; If the calculation has been completed, the current thread will wake up and get this result.
The following is a sample code using Futrue provided by Jdk. This code is very similar to "example 1" above, so I won't introduce it more.

interface ArchiveSearcher {
    String search(String target);
}

ExecutorService executor = Executors.newSingleThreadExecutor();
ArchiveSearcher searcher = (target) -> {
    return "query=" + target + "  content=hello world";
};

void showSearch(final String target) throws InterruptedException {
    Future<String> future = executor.submit(() -> {
        return searcher.search(target);
    });

    try {
        displayText(future.get()); // use future
    } catch (ExecutionException ex) {
        cleanup();
        return;
    }
}

2.4 FutureTask

2.4.1 inheritance structure

FutureTask is the most commonly used subclass of Future. Its inheritance results are shown in the following figure:

It can be seen from the inheritance relationship that FutureTask implements Runnable, so a thread can be created through FutureTask to run some specified tasks; FutrueTask implements Future, so it can return the results of asynchronous calculation.

2.4.2 encapsulation of Runnable#run method

FutureTask implements the Runnable interface and the run method, which is the most important reason why it can return thread results. Because the Callable#call method will be called in the run method and the results will be saved, so that subsequent threads can obtain the saved results as long as they can access FutureTask. Next, we will discuss several methods in detail.

2.4.2.1 run() method

The main logic of this method is:

  • Check the status. If the thread has been started, it will return directly.
  • Execute the call() method of Callable.
  • Through the set method, update the execution result to the outcome field of FutrueTask, and wake up the thread waiting for the result.
    The source code is as follows:
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
2.4.2.2 set() method

The main logic of the set() method is:

  • Updates the status of the thread to the completion status
  • Update the return result of the call method to the outcome field of FutureTask;
  • Wake up the thread waiting for the result of this thread.
    The source code is as follows:
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

2.4.3 manage threads waiting for results through the waiting queue

2.4.3.1 manage pending threads

Suppose that there are three threads waiting for execution results, and the order of calling get() to obtain results is thread1, thread2 and thread3 (assuming that threads are suspended in this order), then the waiters in FutureTask will point to a linked list, as shown below:

When each thread obtains the result through the get() method, because the task has not been completed, they need to enter the waiting state, that is, they are suspended. Before being suspended, each thread will create a WaitNode node and hang it on the waiters property. When the thread execution is completed, find the suspended thread through the linked list, then wake up these suspended threads, and finally return the execution result of the thread

2.4.3.2 get() method

Threads can obtain the execution results of other threads through the FutureTask#get method. The value returned here is actually the value saved in the run method above.
As can be seen from the source code, as long as the task is not completed, the state will not be completed, and the thread will be suspended through awaitDone.

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
2.4.3.3 awaitDone() method

This method is used to suspend the thread that needs to wait. For the suspended logic, please refer to the "managing suspended threads" section above and the following source code. Note that if the thread is interrupted or the waiting time exceeds the time limit, the waiting queue will be cleaned up and the waiting thread will be awakened.

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}
2.4.3.4 finishcompletement() method

In the set() method, we first save the results, and then clean up the pending threads through the finishcompletement method: clean up the linked list and wake up the threads.

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

Keywords: Java Multithreading

Added by erikw46 on Sun, 16 Jan 2022 03:16:41 +0200