JDK source code analysis: FutureTask

In the previous article we introduced the ThreadPoolExecutor thread pool. When tasks are submitted through its submit method, they are wrapped in a FutureTask, which tracks the execution status of the task. How is FutureTask implemented in the JDK?

Preface

JDK version: 1.8.0

When using ThreadPoolExecutor, if we don't need the status or result of a task, we can call ThreadPoolExecutor.execute(Runnable command) directly. But what if we do need to know the status and result of a submitted task?

The JDK provides the Callable and Future interfaces for this purpose, and FutureTask is an implementation of the Future interface.

FutureTask can be used to obtain a result asynchronously or to cancel a task. It wraps the Runnable or Callable passed to its constructor (see the constructors below); its run method is then either called directly or handed to a thread pool for execution. Afterwards, other threads can retrieve the result through FutureTask's get method. FutureTask is therefore well suited to time-consuming computations: the main thread can start the task, carry on with its own work, and collect the result later, which reduces the time spent waiting.
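As a minimal sketch of that workflow (the class name FutureTaskBasics and the summation task are hypothetical, chosen only for illustration), the following wraps a Callable in a FutureTask, runs it on a plain Thread, and collects the result with get:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; the name and the summation task are illustrative only.
class FutureTaskBasics {

    // Starts a computation on a worker thread and collects its result later.
    static int sumInBackground(int n) {
        Callable<Integer> work = () -> {
            int sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        };
        FutureTask<Integer> task = new FutureTask<>(work);
        new Thread(task, "worker").start();   // FutureTask is itself a Runnable

        // ... the calling thread is free to do other work here ...

        try {
            return task.get();                // blocks only if the result is not ready yet
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInBackground(100)); // prints 5050
    }
}
```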

Callable

Callable is similar to Runnable, but Runnable's run method has no return value and cannot throw checked exceptions. Callable is a generic interface with a single call method whose return type is the type parameter V and which may throw an exception. The return value is the result of the submitted task, which we can retrieve for subsequent processing

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
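The two properties of call, a return value and permission to throw checked exceptions, can be seen in a small sketch. The class name CallableContract and the messages are made up for this example:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical demo class; names and messages are illustrative only.
class CallableContract {

    // Invokes the Callable on the current thread and reports what happened.
    static String describe(Callable<String> c) {
        try {
            return "value: " + c.call();         // call() returns a V ...
        } catch (Exception e) {
            return "failed: " + e.getMessage();  // ... or throws, checked exceptions included
        }
    }

    public static void main(String[] args) {
        Callable<String> ok = () -> "done";
        Callable<String> bad = () -> { throw new IOException("disk unavailable"); };
        System.out.println(describe(ok));   // prints "value: done"
        System.out.println(describe(bad));  // prints "failed: disk unavailable"
    }
}
```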

Future

Future is an interface that defines the basic operations on an asynchronous task created from a Runnable or Callable. They fall into three categories:

  • Cancel task execution
  • Get task execution status
  • Get task execution results

Obtaining a computed result also relies on the Callable interface described above

public interface Future<V> {
    
    // Attempt to cancel the task. mayInterruptIfRunning controls whether a task that is already running may be interrupted
    boolean cancel(boolean mayInterruptIfRunning);
    // Whether the task was cancelled before it completed normally
    boolean isCancelled();
    // Whether the task has completed. Normal completion, an exception, and cancellation all count as completed
    boolean isDone();
    // Get the execution result, blocking until the task has finished
    V get() throws InterruptedException, ExecutionException;
    // Get the execution result, waiting at most the given time. If the result is not available in time, TimeoutException is thrown
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
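A short sketch can touch each of the three categories through the Future interface. To keep it deterministic, the task is run synchronously on the calling thread; the class name FutureApiTour is hypothetical:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; runs the task on the calling thread for determinism.
class FutureApiTour {

    static String tour() {
        Callable<String> work = () -> "result";
        FutureTask<String> task = new FutureTask<>(work);
        Future<String> future = task;              // use only the Future view from here on

        StringBuilder log = new StringBuilder();
        log.append("done=").append(future.isDone());           // status: not run yet
        task.run();                                             // execute synchronously
        log.append(" done=").append(future.isDone());          // status: completed
        log.append(" cancel=").append(future.cancel(false));   // too late to cancel
        log.append(" cancelled=").append(future.isCancelled());
        try {
            log.append(" get=").append(future.get());           // result, no blocking needed
        } catch (Exception e) {
            log.append(" get failed: ").append(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        // prints "done=false done=true cancel=false cancelled=false get=result"
        System.out.println(tour());
    }
}
```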

Class definition

FutureTask implements RunnableFuture, which extends both Runnable and Future. It can therefore be executed by a thread as a Runnable and also used as a Future to obtain the return value of the Callable

public class FutureTask<V> implements RunnableFuture<V>


Constants / fields

    // Task running status
    private volatile int state;
    // 7 states
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    /** The underlying callable; nulled out after running */
    // The Callable task passed in at construction
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    // Holds the execution result or the exception; returned or thrown by get
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    // Thread executing task
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    // Waiting thread list
    private volatile WaitNode waiters;
    
    // Unsafe mechanics
    // Used for CAS operations
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

The source comments also document the state transitions of a task. The possible transitions are:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

The seven states are described below; read them alongside the transitions above:

  • NEW: the task has been created, or is running but has not finished
  • COMPLETING: the task has finished or thrown an exception, but the result or the exception has not yet been saved to outcome. This is an intermediate state
  • NORMAL: the task finished normally and its result has been saved to outcome. The state changes from COMPLETING to NORMAL. This is a final state
  • EXCEPTIONAL: the task threw an exception and the exception has been saved to outcome. The state changes from COMPLETING to EXCEPTIONAL. This is a final state
  • CANCELLED: the task has not started, or has started but not finished, and the user calls cancel(false), which cancels the task without interrupting the executing thread. The state changes from NEW to CANCELLED. This is a final state
  • INTERRUPTING: the task has not started, or has started but not finished, and the user calls cancel(true), which cancels the task and interrupts the executing thread. Before the thread is interrupted, the state changes from NEW to INTERRUPTING. This is an intermediate state
  • INTERRUPTED: after interrupt() has been called on the executing thread, the state changes from INTERRUPTING to INTERRUPTED. This is a final state
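The transitions listed above can be written down as a small lookup table. This is only a model for reasoning about the state machine, not code from the JDK: FutureTask itself uses plain int constants, and the names FutureTaskStates, canMove, and isFinal are assumptions of this sketch. The enum ordinals are chosen to match the int values in the source.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the FutureTask state machine; not part of the JDK.
class FutureTaskStates {

    // Declared in the same order as the int constants, so ordinal() matches their values.
    enum State { NEW, COMPLETING, NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTING, INTERRUPTED }

    private static final Map<State, Set<State>> NEXT = new EnumMap<>(State.class);
    static {
        NEXT.put(State.NEW, EnumSet.of(State.COMPLETING, State.CANCELLED, State.INTERRUPTING));
        NEXT.put(State.COMPLETING, EnumSet.of(State.NORMAL, State.EXCEPTIONAL));
        NEXT.put(State.INTERRUPTING, EnumSet.of(State.INTERRUPTED));
    }

    // True if the documented transitions allow moving directly from 'from' to 'to'.
    static boolean canMove(State from, State to) {
        return NEXT.getOrDefault(from, EnumSet.noneOf(State.class)).contains(to);
    }

    // A state with no outgoing transition is final.
    static boolean isFinal(State s) {
        return !NEXT.containsKey(s);
    }

    public static void main(String[] args) {
        System.out.println(canMove(State.NEW, State.COMPLETING)); // prints true
        System.out.println(canMove(State.NEW, State.NORMAL));     // prints false
        System.out.println(isFinal(State.INTERRUPTING));          // prints false
        System.out.println(isFinal(State.CANCELLED));             // prints true
    }
}
```

Because the numeric order follows the transitions, the source can test groups of states with a single comparison, for example state > COMPLETING or state >= CANCELLED.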

Please refer to the following figure for understanding:



Constructors

There are two constructors, one for Callable tasks and one for Runnable tasks. Both set the initial state to NEW

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        // Wrap the Runnable in a Callable that returns the given result
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
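The Runnable constructor may look odd at first, since a Runnable produces no value. A small sketch, with the hypothetical class name RunnableResultDemo, suggests how the fixed result argument is handed back by get once the Runnable has run:

```java
import java.util.concurrent.FutureTask;

// Hypothetical demo class; the Runnable and the result string are illustrative.
class RunnableResultDemo {

    static String demo() {
        StringBuilder log = new StringBuilder();
        Runnable sideEffect = () -> log.append("ran");

        // The second argument is what get() returns after the Runnable completes.
        FutureTask<String> task = new FutureTask<>(sideEffect, "fixed-result");
        task.run();   // executed on the calling thread to keep the sketch deterministic
        try {
            return log + ":" + task.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "ran:fixed-result"
    }
}
```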

Recall the submit method used with ThreadPoolExecutor. It is defined in AbstractExecutorService with several overloads. Each overload first wraps the task through newTaskFor, which uses one of the two FutureTask constructors, and then passes the wrapped task to execute

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
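One way to observe this wrapping from the outside is to check the runtime type of the Future returned by submit. The sketch below assumes the stock ThreadPoolExecutor returned by Executors.newFixedThreadPool; other ExecutorService implementations may override newTaskFor and return a different type. The class name SubmitWrapping is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; assumes the default newTaskFor of AbstractExecutorService.
class SubmitWrapping {

    static boolean submitReturnsFutureTask() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<Integer> future = pool.submit(() -> 42); // goes through newTaskFor(Callable)
            return future instanceof FutureTask;
        } finally {
            pool.shutdown();   // let the worker thread exit
        }
    }

    public static void main(String[] args) {
        System.out.println(submitReturnsFutureTask()); // prints true
    }
}
```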

Important methods

get

get returns the result of the task. As the state transitions show, a state at or before COMPLETING means the result has not been saved yet, so get blocks in awaitDone until the result has been saved or, for the timed version, until the timeout elapses, in which case TimeoutException is thrown. It then calls report to return the result or throw the corresponding exception.

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
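The difference between the two get variants can be sketched with a task that is held back by a latch. The class name TimedGetDemo, the 50 ms timeout, and the latch are all assumptions of this example:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; the latch keeps the task unfinished until we release it.
class TimedGetDemo {

    static String demo() {
        CountDownLatch release = new CountDownLatch(1);
        Callable<String> slow = () -> { release.await(); return "late"; };
        FutureTask<String> task = new FutureTask<>(slow);
        Thread worker = new Thread(task, "worker");
        worker.setDaemon(true);
        worker.start();
        try {
            String first;
            try {
                first = task.get(50, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                first = "timeout";            // state was still NEW when the deadline passed
            }
            release.countDown();              // now allow the task to finish
            return first + "," + task.get();  // untimed get waits for the result
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "timeout,late"
    }
}
```

A timed-out get does not cancel the task; the second call still receives the result.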

awaitDone

If get is called before the task has finished, it blocks in awaitDone until the task completes, the wait times out, or the waiting thread is interrupted

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // Compute the deadline when a timeout is set
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            // The waiting thread has been interrupted
            if (Thread.interrupted()) {
                // Remove this thread's WaitNode from the waiters list
                removeWaiter(q);
                // Propagate the interruption to the caller
                throw new InterruptedException();
            }
            
            // Execution complete, exception or cancel
            // Return status value
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            // In COMPLETING state:
            // the task has finished but the result has not yet been saved to outcome
            // Yield the CPU and check again
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // No wait node yet, create one
            else if (q == null)
                q = new WaitNode();
            // Not queued yet: CAS the node onto the head of the waiters stack
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // A timeout was requested
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    // Timed out: remove node q from the waiters list
                    removeWaiter(q);
                    // Return the current task state
                    return state;
                }
                // Not timed out yet: park for the remaining time
                LockSupport.parkNanos(this, nanos);
            }
            // Blocking wait
            else
                LockSupport.park(this);
        }
    }
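The waiting protocol is easier to follow in a stripped-down model. The sketch below is not the JDK code: it replaces Unsafe with AtomicReference, replaces the int state with a boolean, and leaves out timeouts and removeWaiter. The names MiniGate, Node, await, and finish are all hypothetical. It keeps the core idea: push a node onto a CAS-based stack, re-check the condition, then park until the finishing thread unparks every queued node.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical simplified model of awaitDone/finishCompletion; not the JDK implementation.
class MiniGate {

    static final class Node {
        volatile Thread thread = Thread.currentThread(); // the waiting thread creates its own node
        volatile Node next;
    }

    private final AtomicReference<Node> waiters = new AtomicReference<>();
    private volatile boolean done;

    // Blocks until finish() has been called. An interrupted waiter leaves its node behind
    // (the real code calls removeWaiter instead).
    void await() throws InterruptedException {
        Node q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (done) {
                if (q != null)
                    q.thread = null;
                return;
            } else if (q == null) {
                q = new Node();
            } else if (!queued) {
                q.next = waiters.get();                        // link to the current head
                queued = waiters.compareAndSet(q.next, q);     // CAS push, retried on failure
            } else {
                LockSupport.park(this);                        // sleep until unparked
            }
        }
    }

    // Marks the gate as done and wakes every queued thread.
    void finish() {
        done = true;
        for (Node q; (q = waiters.get()) != null;) {
            if (waiters.compareAndSet(q, null)) {              // detach the whole stack at once
                for (; q != null; q = q.next) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                }
                break;
            }
        }
    }

    // Starts one waiter, opens the gate, and reports whether the waiter terminated.
    static boolean demo() {
        MiniGate gate = new MiniGate();
        Thread waiter = new Thread(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.setDaemon(true);
        waiter.start();
        gate.finish();
        try {
            waiter.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

Re-checking the condition after queuing and before parking is what prevents a lost wakeup when finish runs between the push and the park.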

report

Called by get. Returns the task result (the outcome field) or throws the corresponding exception

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
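The three branches of report correspond to three things a caller of get can observe. A minimal sketch, with the hypothetical class name ReportOutcomes and tasks settled on the calling thread so that get never blocks:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; every task is settled before get() is called.
class ReportOutcomes {

    static String outcomeOf(FutureTask<String> task) {
        try {
            return "value:" + task.get();                    // state NORMAL
        } catch (ExecutionException e) {
            return "failed:" + e.getCause().getMessage();    // state EXCEPTIONAL, cause is the outcome
        } catch (CancellationException e) {
            return "cancelled";                              // state >= CANCELLED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String demo() {
        Callable<String> ok = () -> "ok";
        Callable<String> boom = () -> { throw new IllegalStateException("boom"); };

        FutureTask<String> normal = new FutureTask<>(ok);
        normal.run();
        FutureTask<String> failed = new FutureTask<>(boom);
        failed.run();                     // run() catches the exception and stores it
        FutureTask<String> cancelled = new FutureTask<>(ok);
        cancelled.cancel(false);          // cancelled before it ever ran

        return outcomeOf(normal) + " | " + outcomeOf(failed) + " | " + outcomeOf(cancelled);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "value:ok | failed:boom | cancelled"
    }
}
```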

isCancelled

Checks whether the task has been cancelled, which covers the CANCELLED, INTERRUPTING, and INTERRUPTED states. As the state transitions show, these are exactly the states a task enters after a successful call to cancel, so isCancelled returns true from then on

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

isDone

Checks whether the task is done. Every state other than NEW counts as done, whether the task completed normally, threw an exception, or was cancelled (see the state transitions above)

    public boolean isDone() {
        return state != NEW;
    }

cancel

Cancels the task. The mayInterruptIfRunning parameter controls whether a task that is already running may be interrupted

    public boolean cancel(boolean mayInterruptIfRunning) {
        // Return false if the state is not NEW (the task has already completed or been cancelled), or if the CAS on the state fails. Only when the CAS succeeds does the code below run
        // Refer to task state transition diagram
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                // When mayInterruptIfRunning is true,
                // interrupt the thread that is running the task
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    // Ordered write of the final state (not a CAS)
                    // Corresponds to the last transition: INTERRUPTING -> INTERRUPTED
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // Subsequent processing
            finishCompletion();
        }
        return true;
    }
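The effect of cancel(true) on a running task can be sketched as follows. The class name CancelInterruptDemo, the 60-second sleep, and the latch used to make sure the task has started are assumptions of this example:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; shows cancel(true) interrupting the runner thread.
class CancelInterruptDemo {

    // Returns { first cancel, isCancelled, task saw the interrupt, second cancel }.
    static boolean[] demo() {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean();
        Callable<Void> sleeper = () -> {
            started.countDown();
            try {
                Thread.sleep(60_000);            // stands in for a long-running job
            } catch (InterruptedException e) {
                sawInterrupt.set(true);          // cancel(true) called interrupt() on this thread
            }
            return null;
        };
        FutureTask<Void> task = new FutureTask<>(sleeper);
        Thread worker = new Thread(task, "worker");
        worker.setDaemon(true);
        worker.start();
        try {
            started.await();                     // the task is running, state is still NEW
            boolean first = task.cancel(true);   // NEW -> INTERRUPTING -> INTERRUPTED
            worker.join();
            boolean second = task.cancel(true);  // state is no longer NEW
            return new boolean[] { first, task.isCancelled(), sawInterrupt.get(), second };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        // prints "true true true false"
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
    }
}
```

Interruption is cooperative: a task that never checks its interrupt status and never blocks would keep running even though the FutureTask already reports itself as cancelled.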

run

run is what the worker thread ultimately executes, in the same way a Thread executes the run method of its Runnable. The main steps are:

  • Check the task state. If the task has already run or been cancelled, return immediately. If the state is NEW but the CAS that sets runner fails, also return immediately
  • Execute the task
  • Obtain the result of the task
  • Set the task state and the result

    public void run() {
        // If the task has been completed or cancelled, return directly
        // If the task is in NEW status, try to save the current execution thread to the runner. If it succeeds, continue to execute. If it fails, return directly
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            // Get the callable to execute
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // Execute tasks and get execution results
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    // On exception, set the result to null
                    // setException performs the state transition and the follow-up processing
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    // After normal execution, the state transition and subsequent processing are completed through set
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                // Interrupt handling
                handlePossibleCancellationInterrupt(s);
        }
    }
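A consequence of the state check at the top of run is that the wrapped callable executes at most once, however often run is invoked. A minimal sketch, with the hypothetical class name RunOnceDemo:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; counts how often the wrapped callable is invoked.
class RunOnceDemo {

    static int runTwice() {
        AtomicInteger calls = new AtomicInteger();
        Callable<Integer> counting = calls::incrementAndGet;
        FutureTask<Integer> task = new FutureTask<>(counting);

        task.run();   // state NEW: the callable runs and the state becomes NORMAL
        task.run();   // state != NEW: returns immediately
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(runTwice()); // prints 1
    }
}
```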

set

set stores the result after the task completes normally and is called from run. The state is updated twice, first by a CAS and then by an ordered write, matching the first transition listed above: NEW -> COMPLETING -> NORMAL

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // Save the task result in outcome
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            // Subsequent processing
            finishCompletion();
        }
    }

setException

Similar to set, but called when the task throws an exception. It matches the second transition listed above: NEW -> COMPLETING -> EXCEPTIONAL

    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
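Because set and setException are protected, a subclass can expose them and complete the future by hand. The sketch below does that with a hypothetical ManualFuture class whose callable is never meant to run; it illustrates that only the first completion wins, since the CAS from NEW to COMPLETING can succeed only once:

```java
import java.util.concurrent.FutureTask;

// Hypothetical subclass that widens set/setException to public; not a JDK class.
class ManualFuture<V> extends FutureTask<V> {

    ManualFuture() {
        // Placeholder callable: this future is completed by hand and never run.
        super(() -> { throw new UnsupportedOperationException("completed manually"); });
    }

    @Override
    public void set(V v) {
        super.set(v);
    }

    @Override
    public void setException(Throwable t) {
        super.setException(t);
    }

    static String demo() {
        ManualFuture<String> future = new ManualFuture<>();
        future.set("first");                                     // NEW -> COMPLETING -> NORMAL
        future.set("second");                                    // CAS fails, ignored
        future.setException(new RuntimeException("too late"));   // CAS fails, ignored
        try {
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "first"
    }
}
```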

finishCompletion

Runs whenever the task finishes, whether normally, with an exception, or by cancellation. It traverses the waiters list, wakes the waiting thread in every node, calls the done hook, and finally sets callable to null

    private void finishCompletion() {
        // assert state > COMPLETING;
        // Traverse the waiters
        for (WaitNode q; (q = waiters) != null;) {
            // Detach the whole list by setting waiters to null via CAS
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        // Wake the thread parked by LockSupport.park in awaitDone
                        LockSupport.unpark(t);
                    }
                    // Move on to the next node and unlink the current one
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
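The done method called near the end of finishCompletion is an empty protected hook that subclasses may override, for example to fire a completion callback. A sketch with the hypothetical class name DoneHookDemo shows it being invoked after both normal completion and cancellation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; records when the callable and the done() hook run.
class DoneHookDemo {

    static FutureTask<String> newLoggingTask(List<String> events) {
        Callable<String> work = () -> { events.add("call"); return "x"; };
        return new FutureTask<String>(work) {
            @Override
            protected void done() {
                // The state is already final when done() is invoked.
                events.add(isCancelled() ? "done(cancelled)" : "done(completed)");
            }
        };
    }

    static String demo() {
        List<String> events = new ArrayList<>();
        newLoggingTask(events).run();           // normal completion
        newLoggingTask(events).cancel(false);   // cancelled before running
        return String.join(",", events);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "call,done(completed),done(cancelled)"
    }
}
```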

runAndReset

Similar to run, except that the result is not saved: when the task completes normally, set is not called, so the state stays NEW and the task can be run again. This suits tasks that execute more than once and whose result is not needed, much like a plain Runnable

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    // If the task throws, record the exception via setException
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }
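Since runAndReset is protected, it can only be reached from a subclass. The sketch below uses a hypothetical RepeatableTask subclass to show that the task stays in the NEW state across calls to runAndReset, and that a regular run afterwards settles it for good:

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass exposing runAndReset; not a JDK class.
class RepeatableTask extends FutureTask<Void> {

    RepeatableTask(Runnable body) {
        super(body, null);
    }

    boolean runOnce() {
        return runAndReset();   // true if the body ran and the task is still resettable
    }

    static String demo() {
        AtomicInteger count = new AtomicInteger();
        RepeatableTask task = new RepeatableTask(count::incrementAndGet);

        boolean first = task.runOnce();           // body runs, state stays NEW
        boolean second = task.runOnce();          // body runs again
        boolean doneAfterResets = task.isDone();  // still NEW
        task.run();                               // regular run: state becomes NORMAL
        boolean third = task.runOnce();           // state != NEW, body does not run

        return first + "," + second + "," + doneAfterResets + ","
                + task.isDone() + "," + third + "," + count.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true,true,false,true,false,3"
    }
}
```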

removeWaiter

Removes the given node from the waiters list

    private void removeWaiter(WaitNode node) {
        // Null check
        if (node != null) {
            node.thread = null;
            // Unlink every node whose thread is null, restarting on a race
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

Summary

That completes the walkthrough of the FutureTask source code, which is not especially complex. We implement the Callable interface, submit the task to a thread pool, and obtain the result through get; we can also cancel the task when needed. FutureTask has seven states, and keeping the state transitions in mind makes the source code much easier to follow

If you find any problems in the above, please point them out; the author will verify and correct them promptly. Thank you



Added by vocoder on Fri, 28 Feb 2020 05:15:24 +0200