In the previous article, we introduced the ThreadPoolExecutor thread pool. When submitting execution tasks through the submit method, we mentioned FutureTask, which can track the execution status of tasks. How is it implemented in jdk?
Preface
JDK version No.: 1.8.0«
When using ThreadPoolExecutor, if we don't need to get the status and result of task execution, we can directly call ThreadPoolExecutor.execute(Runnable command) method. However, when we need to know the status and result of task execution submitted, how do we proceed?
jdk itself provides us with the Callable interface and the Future interface to achieve the functions we need. FutureTask is the implementation class that implements the Future interface
FutureTask can be used to asynchronously get execution results or cancel execution tasks. FutureTask is given the task passed in to Runnable or Callable (you can see it in the source code of the construction method below), and its run method is called directly or put into the thread pool for execution. After that, the execution result can be obtained asynchronously through the get method of FutureTask externally. Therefore, FutureTask is suitable for more time-consuming calculations. The main thread can obtain the results after completing its own tasks to reduce the waiting consumption
Callable
Similar to Runnable, unlike Runnable, the run method has no return value. Callable provides a call generic interface. The return type is the V type passed in. At the same time, it can also throw errors. The return value is the result obtained after the task is submitted. We can get the task execution result for subsequent processing
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
Future
Future is an interface, which defines the basic task operation methods to implement the Runnable or Callable interface. It is mainly divided into three categories:
- Cancel task execution
- Get task execution status
- Get task execution results
The Callable interface mentioned above is also needed to obtain the execution results
public interface Future<V> { // Cancel task. mayInterruptIfRunning is whether to allow canceling tasks that are being executed but not completed boolean cancel(boolean mayInterruptIfRunning); // Whether the task is cancelled successfully. If it is cancelled successfully before the task completes normally, return true boolean isCancelled(); // Whether the task has been completed. If the task is completed, return true boolean isDone(); // Get the execution result. This method will block until the task is finished V get() throws InterruptedException, ExecutionException; // Get the execution result. If no result is obtained within the specified time, return null directly V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Class definition
We can see that FutureTask finally implements the Runnable and Future interfaces at the same time, so it can be executed by threads as Runnable, and can be used as Future to get the Callable return value
public class FutureTask<V> implements RunnableFuture<V>
Constant / variable
// Task running status private volatile int state; // 7 states private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ // The task to implement the Callable interface is passed in during construction private Callable<V> callable; /** The result to return or exception to throw from get() */ // Save execution result or exception information, get method gets private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ // Thread executing task private volatile Thread runner; /** Treiber stack of waiting threads */ // Waiting thread list private volatile WaitNode waiters; // Unsafe mechanics // CAS correlation private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); } }
The note part of task running state transition is also described. The possible state transitions are as follows:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
7 states are described as follows for your understanding with reference to the above state transition:
- NEW: indicates a newly created task or a task that has not been completed
- COMPLETING: when the task has been completed or executed, an exception occurs, but the execution result or the cause of the exception has not been saved to the output, which belongs to the intermediate state
- NORMAL: the task has been completed and the task execution result has been saved to the output. The status will change from COMPLETING to NORMAL, which belongs to the final status
- Exception: if an exception occurs during task execution and the reason for the exception has been saved to output, the status will change from COMPLETING to exception, which belongs to the final status
- CANCELLED: when the task has not been executed or started but not finished, the user calls the cancel(false) method to cancel the task and does not interrupt the task execution thread. At this time, the status will change from NEW to CANCELLED, which is also the final status
- INTERRUPTING: when the task has not been executed or has been executed but not finished, the user calls cancel(true) method to cancel the task and interrupt the task execution thread, but before the task execution thread is interrupted, the status will change from NEW to INTERRUPTING, which belongs to the intermediate status
- INTERRUPTED: after calling interrupt() to interrupt the task execution thread, the state will change from INTERRUPTING to INTERRUPTED, which belongs to the final state
Please refer to the following figure for understanding:
Construction method
There are two construction methods. For the tasks of the Callable interface and the Runnable interface, the task status is initially set to NEW
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { // Runnable is encapsulated as Callable this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
Looking back at the submit method in ThreadPoolExecutor, and looking at the submit method of AbstractExecutorService, we can see that there are many overloaded methods. Through newTaskFor, a layer of packaging is carried out first, while in newTaskFor, two construction methods of FutureTask are used to complete the packaging, and then the execution method is continued
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
Important method
get
For the task execution result, referring to the state transition diagram, the state before COMPLETING has not saved the result, so it needs to wait through the awaitDone blocking wait, the result of the task is saved or the time is thrown out, and then the report is called to get the result of the task.
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
awaitDone
The get method gets the task result, but it is not finished. It will block and wait through awaitDone
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // Set the waiting time calculation end time stamp final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // Be interrupted if (Thread.interrupted()) { // Remove WaitNode from waiting queue removeWaiter(q); // Throw wrong throw new InterruptedException(); } // Execution complete, exception or cancel // Return status value int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // In COMPLETING state // The execution result has not been saved to the output after the task is completed // Concession operation else if (s == COMPLETING) // cannot time out yet Thread.yield(); // Wait for the node to be empty, create a new one else if (q == null) q = new WaitNode(); // Update waiters before joining the team else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // Wait timeout set else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { // Remove the node q waiting for the queue if it has timed out removeWaiter(q); // Return to task status return state; } // Continue to block waiting before timeout LockSupport.parkNanos(this, nanos); } // Blocking wait else LockSupport.park(this); } }
report
Called by get method, return task execution result (i.e. output) or throw error
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
isCancelled
Judge whether the task has been CANCELLED, including the status of cancel, INTERRUPTING and INTERRUPTED. Refer to the above state transition diagram, it can be seen that the task state transition will return true after calling the cancel method
public boolean isCancelled() { return state >= CANCELLED; }
isDone
Judge whether the task is completed, not in the NEW state, and other States belong to the completed task. No matter whether it is abnormal, cancelled or completed normally, refer to the description of the above state transition diagram
public boolean isDone() { return state != NEW; }
cancel
Cancel task execution. The mayInterruptIfRunning parameter is allow to cancel a task that is executing but not completed
public boolean cancel(boolean mayInterruptIfRunning) { // If the status is not NEW creation / task execution or is in NEW status and the update status fails, it will directly return false. Only after the update is successful, can the following code continue to execute // Refer to task state transition diagram if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { // Get thread to execute interrupt when true // That is, interrupt the executed task thread try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state // CAS update state // Corresponding to the last one in the state transition diagram UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // Subsequent processing finishCompletion(); } return true; }
run
The execution method of the final task Thread is the same as the run method of Thread. The main process is as follows:
- Judge the current task status. If it has been executed or cancelled, it will return directly. If the status is NEW, it will also return directly if CAS fails to update the runner
- Perform tasks
- Get task execution results
- Set task status and execution result
public void run() { // If the task has been completed or cancelled, return directly // If the task is in NEW status, try to save the current execution thread to the runner. If it succeeds, continue to execute. If it fails, return directly if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // Get specific execution tasks Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // Execute tasks and get execution results result = c.call(); ran = true; } catch (Throwable ex) { // Execute exception set execution result to null // The state transition and subsequent processing are completed through setException result = null; ran = false; setException(ex); } if (ran) // After normal execution, the state transition and subsequent processing are completed through set set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) // Interrupt handling handlePossibleCancellationInterrupt(s); } }
set
Set the result after the normal execution of the task, which is called in the run method. We can see that CAS in the method updates the execution status of the task twice, corresponding to the first situation in the state transition diagram: New - > completing - > normal
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // outcome save task results outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // Subsequent processing finishCompletion(); } }
setException
Similar to set, it is mainly called when there is an exception in task execution, which also corresponds to the second situation in the state transition diagram: New - > completing - > excel
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
finishCompletion
If the task is executed normally, if it is abnormal or cancelled, it will execute. Traverse the waiters list, wake up the waiting thread in all nodes, and then leave the callable in it empty
private void finishCompletion() { // assert state > COMPLETING; // ergodic for (WaitNode q; (q = waiters) != null;) { // Empty space if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // Release. awaitDone has the corresponding LockSupport.park LockSupport.unpark(t); } // Empty space WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
runAndReset
Similar to run, the difference is that the task execution result will not be saved, and if the normal execution is completed, the set method will not be called to update the task status, similar to Runnable task
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { // setException will be called in case of throw error setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
removeWaiter
Remove the node node in the waiting list waiters
private void removeWaiter(WaitNode node) { // Non empty calibration if (node != null) { node.thread = null; // Update waiters retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
summary
So far, the source code of FutureTask has been basically analyzed, which is not too complex in essence. To implement the tasks of the Callable() interface, then submit the tasks to the thread pool for execution. We only need to get the execution results through the get method, and at the same time, we can cancel the execution of the tasks, which is convenient for operation. FutureTask itself has seven states, which can be understood by referring to the state transition diagram or better understanding of its source code implementation
If there is any problem in the above content, please point out that the author will correct it in time after verification. Thank you