How do I get the Thread return value? (FutureTask Interpretation)

Fallen life...

How do I get the Thread return value?

This question comes up from time to time. On a standard JDK 1.8, can't we answer it in a minute?
Answer: easy, Callable solves it perfectly. Next question...

But how did our predecessors on JDK 1.4 (and earlier), before Callable existed, get a thread's return value? Put differently: with Callable off the table, how do we get the return value?
This is really a problem of inter-thread communication. With Runnable as the only weapon, it is tedious, but a good challenge!

First, define the task class, Task:

// Task
class Task implements Runnable{

    @Getter
    volatile Object result;    // Return value; volatile so that a polling thread is guaranteed to see the write

    @Override
    public void run() {
        try {
            // Simulate a time-consuming logic
            System.out.println(String.format("[%s] Execution in progress..",Thread.currentThread().getName()));
            TimeUnit.SECONDS.sleep(2L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Calculate to get the final result
        result = Integer.valueOf(9987);
        
        System.out.println(String.format("[%s] Execution finished..",Thread.currentThread().getName()));
    }
}

Then start the thread:

public static void main(String[] args) {
    // Task is an inner class, so it has to be created through an instance of the outer class
    CallbackTest callbackTest = new CallbackTest();
    Task task = callbackTest.new Task();
    
    final String threadName = "T-1";
    Thread thread = new Thread(task,threadName);

    thread.start();
    
}

Okay, the T-1 thread has started, and it appears to run the task just fine. The question is: how does the main method get the Task's result?
As the code stands, T-1 flies solo and the main thread has no connection to it at all.

My instinct is always to try the simplest approach first.

Simple Way

The main thread does the legwork and keeps checking whether the result is ready:

public static void main(String[] args) throws InterruptedException {
    CallbackTest callbackTest = new CallbackTest();
    Task task = callbackTest.new Task();
    final String threadName = "T-1";
    Thread thread = new Thread(task,threadName);
    thread.start();
    
    // The main thread repeatedly checks on the T-1 thread
    while (true){
        if(task.getResult()!=null){
            System.out.println(String.format("Result task=%s",task.getResult()));
            break;
        }

        // Let the CPU rest a bit between checks
        TimeUnit.MILLISECONDS.sleep(200L);
        System.out.println(String.format("[main] Check results diligently (result=%s)",task.getResult()));
    }
}

The output may look like this:

[T-1] Execution in progress..
[main] Check results diligently (result=null)
[main] Check results diligently (result=null)
[main] Check results diligently (result=null)
[main] Check results diligently (result=null)
[main] Check results diligently (result=null)
[main] Check results diligently (result=null)
[main] Check results diligently (result=null)
[main] Check results diligently (result=null)
[main] Check results diligently (result=null)
[T-1] Execution finished..
[main] Check results diligently (result=9987)
Result task=9987

Even with the sleep added to spare the CPU, this is still not a good solution. It burdens the main thread, which has to check over and over whether the sub-thread has assigned the final result.
Is there a way for T-1 to tell the main thread when it has finished?
As an old-school (read: technically obsolete) programmer, the first thing that comes to my mind is the wait/notify combination.

The wait/notify combination

  • The wait side checks the result value. If it is null, T-1 has not finished yet, so wait patiently.
public static void main(String[] args){

        CallbackTest callbackTest = new CallbackTest();
        Task task = callbackTest.new Task();
        final String threadName = "T-1";
        Thread thread = new Thread(task,threadName);
        thread.start();

        // Check and wait under the same lock that T-1 notifies on, so that a
        // notify() issued between the check and the wait() cannot be lost
        synchronized (task){
            // Loop to guard against spurious wakeups
            while (task.getResult()==null){
                System.out.println(String.format("[%s] Waiting for execution..",Thread.currentThread().getName()));
                try {
                    task.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        // The result has been assigned by now
        System.out.println(String.format("Result task=%s",task.getResult()));

    }
  • The notify side adds the wake-up logic
class Task implements Runnable{

    @Getter
    Object result;
    
    @Override
    public void run() {
        try {
            System.out.println(String.format("[%s] Execution in progress..",Thread.currentThread().getName()));
            TimeUnit.SECONDS.sleep(2L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
        result = Integer.valueOf(9987);
    
        // Wake up the thread waiting on this object
        synchronized (this){
            this.notify();
        }
        System.out.println(String.format("[%s] Execution finished..",Thread.currentThread().getName()));
    }
}

After this change, the output is as follows:

[T-1] Execution in progress..
[main] Waiting for execution..
[T-1] Execution finished..
Result task=9987

LockSupport Implementation

You can also use LockSupport, which works much like wait/notify. Here is the complete code:

public class CallbackTest {
    class Task implements Runnable{
    
        @Getter
        volatile Object result;    // volatile so that the main thread is guaranteed to see the write
    
        // The calling thread (the main thread), passed in through the constructor
        Thread runner;
        Task(Thread runner){
            this.runner = runner;
        }
    
        @Override
        public void run() {
            try {
                System.out.println(String.format("[%s] Execution in progress..",Thread.currentThread().getName()));
                TimeUnit.SECONDS.sleep(2L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            result = Integer.valueOf(9987);
    
            // Wake up the main thread; unpark() does not require holding a lock
            LockSupport.unpark(runner);
            System.out.println(String.format("[%s] Execution finished..",Thread.currentThread().getName()));
        }
    }
    
    public static void main(String[] args) {
    
        CallbackTest callbackTest = new CallbackTest();
        Task task = callbackTest.new Task(Thread.currentThread());
        final String threadName = "T-1";
        Thread thread = new Thread(task,threadName);
    
        thread.start();
    
        while (true){
            if(task.getResult()==null){
                System.out.println(String.format("[%s] Waiting for execution..",Thread.currentThread().getName()));
                LockSupport.park(); //main thread blocking
            }
    
            if(task.getResult()!=null){
                System.out.println(String.format("Result task=%s",task.getResult()));
                break;
            }
        }
    
    }
}
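
One detail worth seeing in isolation: unlike notify(), an unpark() that arrives before the matching park() is not lost. unpark() hands the target thread a permit, and the next park() consumes it and returns at once. The sketch below is only an illustration of that behavior; the class name PermitDemo and its helper method are made up for this example and are not part of the code above.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not part of the article's CallbackTest
class PermitDemo {

    // Grants the permit first, then parks; returns roughly how long park() blocked, in milliseconds
    static long parkAfterUnparkMillis() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);          // the permit is now available
        long start = System.nanoTime();
        LockSupport.park();                // consumes the permit and returns without blocking
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("park() blocked for about " + parkAfterUnparkMillis() + " ms");
    }
}
```

This is why the LockSupport version can check the result outside any lock: if T-1 finishes between the check and the park(), the permit is already waiting.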

Callable uses

So far we have obtained a thread's return value by our own means. Now let's revisit the answer from the beginning of the article:

This question comes up from time to time. On a standard JDK 1.8, can't we answer it in a minute?
Answer: easy, Callable solves it perfectly. Next question...

The obvious answer was Callable, so let's first see how it is used.

public class CallbackTest {

    class Task implements Callable<Object> {
    
        @Override
        public Object call() {
            try {
                // A Time-consuming Logic
                System.out.println(String.format("[%s] Execution in progress..",Thread.currentThread().getName()));
                TimeUnit.SECONDS.sleep(2L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return Integer.valueOf(9987);
        }
    }
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CallbackTest callbackTest = new CallbackTest();
        Task task = callbackTest.new Task();

        ExecutorService es = Executors.newSingleThreadExecutor();
        Future<Object> future = es.submit(task);
        System.out.println("Result:"+future.get());

        es.shutdown();

    }
}

The code is not complicated. The demo gets the return value through future.get(), which is a blocking method: the caller blocks until the sub-thread has finished executing (returned). Readers who have not used it before can look up the details on their own; we won't explain more here.
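
For readers who want to see the blocking behavior for themselves, here is a small sketch using the timed overload get(long, TimeUnit), which gives up with a TimeoutException instead of blocking indefinitely. The class name TimedGetDemo and the chosen durations are assumptions made for this illustration only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the article's CallbackTest
class TimedGetDemo {

    // Returns true if a 200 ms timed get() gives up before the 1 s task has finished
    static boolean timedGetTimesOut() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = es.submit(() -> {
                TimeUnit.SECONDS.sleep(1L);   // simulate a time-consuming task
                return 9987;
            });
            try {
                future.get(200L, TimeUnit.MILLISECONDS);
                return false;                 // the task finished in time (not expected here)
            } catch (TimeoutException e) {
                return true;                  // still running, so the timed get() gave up
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            es.shutdownNow();                 // interrupt the sleeping task and release the pool thread
        }
    }

    public static void main(String[] args) {
        System.out.println("Timed out: " + timedGetTimesOut());
    }
}
```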

The Callable source code is as follows:

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable itself is just an interface; there is nothing mysterious about it. The mystery lies in FutureTask.

As usual, let's read the source code to see how the return value is obtained from another thread.

FutureTask Analysis

First, look at where FutureTask sits in the class hierarchy.

FutureTask implements RunnableFuture, which extends both Runnable and Future, so FutureTask is an implementation of the Future interface and of the Runnable interface.
Next, let's see how FutureTask relates to Callable.
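
Because FutureTask is both a Runnable and a Future, it can be handed to a plain Thread with no thread pool involved, which answers the title question quite literally. The following is a minimal sketch; the class name FutureTaskThreadDemo and the 200 ms sleep are assumptions for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's CallbackTest
class FutureTaskThreadDemo {

    // Runs a Callable on a plain Thread and returns its value through the FutureTask
    static Integer runOnPlainThread() throws InterruptedException, ExecutionException {
        FutureTask<Integer> futureTask = new FutureTask<>(() -> {
            TimeUnit.MILLISECONDS.sleep(200L);   // simulate a time-consuming task
            return 9987;
        });
        Thread thread = new Thread(futureTask, "T-1");   // used as a Runnable here
        thread.start();
        return futureTask.get();                         // used as a Future here
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Result: " + runOnPlainThread());
    }
}
```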

The relationship between FutureTask and Callable

(The source is traced step by step below; if it is hard to follow, skip to the conclusion at the end of this chapter.)

ExecutorService es = Executors.newSingleThreadExecutor();
Future<Object> future = es.submit(task);

Take the submit call on ExecutorService from the example as the entry point. The method that actually runs is submit in AbstractExecutorService:

/* `AbstractExecutorService` */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);    //Note 1 - Build the FutureTask
    execute(ftask);    //Note 2 - Eventually calls the run method of ftask, that is, the run method of the FutureTask object built in Note 1
    return ftask;
}
...

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);    //1.1 - Call FutureTask's constructor
}
  • Note 1 - Observe the constructor of FutureTask:

// callable is a member variable of FutureTask
private Callable<V> callable;
    
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;    //Assignment of member variables
    this.state = NEW;       // ensure visibility of callable
}

Conclusion 1: after being threaded through ExecutorService, the Callable ends up assigned to a member variable of FutureTask.

  • Note 2 - Now follow the execute call, paying attention to the comments:
/* AbstractExecutorService.submit */
public <T> Future<T> submit(Callable<T> task) {
    ...
    execute(ftask);    //Note 2 - Eventually calls the run method of ftask, that is, the run method of the FutureTask object built in Note 1
    ...
}

    ↓↓↓↓↓
    ↓↓↓↓↓

/* ThreadPoolExecutor.execute */
public void execute(Runnable command) {
    ...
    addWorker(command, true);    //Wrap the task in a Worker
    ...
}

    ↓↓↓↓↓
    ↓↓↓↓↓
    
/* ThreadPoolExecutor.addWorker */
private boolean addWorker(Runnable firstTask, boolean core) {
    w = new Worker(firstTask);    //Wrap the task in a Worker
    final Thread t = w.thread;
    
    ...
            t.start();    //Note 3 - Start the Worker's thread, which invokes the run method of its Runnable (the Worker itself)
    ...
}

    ↓↓↓↓↓
    ↓↓↓↓↓
    
/* Inner class Worker */
final Thread thread;    // Member variable
Runnable firstTask;    // Member variable

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;    //Assign the member variable firstTask
    this.thread = getThreadFactory().newThread(this);    //Create a new thread that runs this Worker and assign the member variable thread
}

// 3.1 - The start() in Note 3 ends up executing this run method, which then calls runWorker
public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    ...
    Runnable task = w.firstTask;
    ...
                    task.run();    //##### Note that it will eventually be called here.#####
    ...
}

So what is the task in task.run()? It is the FutureTask built at the beginning (Note 1). Now look at its run method:

public void run() {
    ...
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();    //callable's call method, which is our own defined logic
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);    //Note 4 - Assignment Action
        }
    ...
}

Conclusion 2: after a series of hops, the call chain ends up invoking Callable's call method (that is, our custom logic).

  • Note 4 - Look at the assignment here
//Member variable
private Object outcome;

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;    //Assignment to member variables
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

To sum up: FutureTask wraps the Callable and, once the call method has executed, assigns its return value to the member variable outcome.

Next, we explore the acquisition of the return value, which is the implementation of Future.get().

Return value acquisition

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)    // 1 - Unfinished state, thread blocking
        s = awaitDone(false, 0L);
    return report(s);    // 2 - Completed state, directly acquired
}

// 1.1 - Blocking
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    ...
        LockSupport.park(this);    //block
    ...
}

// 2.1 - Returns result
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

The logic of get() is not complicated:

  1. Check the state. If execution has not completed, that is, the member variable outcome (which holds the return value of call()) has not been assigned yet, block.
  2. If outcome has already been assigned, return it.
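
The report() method shown above has three exits: return the value, throw CancellationException, or wrap the task's own exception in an ExecutionException. The sketch below drives a FutureTask into each of these final states and shows what get() does in each case. The class name GetOutcomesDemo, the describe helper and the "boom" message are made up for this illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the article's CallbackTest
class GetOutcomesDemo {

    // Describes what get() does for a task that has already reached a final state
    static String describe(Future<?> future) throws InterruptedException {
        try {
            return "value: " + future.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();   // the cause is what call() threw
        } catch (CancellationException e) {
            return "cancelled";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FutureTask<Integer> ok = new FutureTask<>(() -> 9987);
        ok.run();                      // call() returns normally

        FutureTask<Integer> failed = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        failed.run();                  // call() throws, the exception is stored as the outcome

        FutureTask<Integer> cancelled = new FutureTask<>(() -> 9987);
        cancelled.cancel(false);       // cancelled before it ever ran

        System.out.println(describe(ok));
        System.out.println(describe(failed));
        System.out.println(describe(cancelled));
    }
}
```

Running the tasks directly with run() on the calling thread keeps the example deterministic; the outcomes are the same when the tasks run on another thread.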

Looks familiar, doesn't it? This matches the logic of our own version!
Look at the set() method again: the LockSupport.unpark(Thread) call reachable from it is the evidence.

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();    //It looks suspicious here.
    }
}

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);    //Gotcha! Sure enough, LockSupport.unpark(t)
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

We have found the unpark that pairs with the park call, which confirms our inference. The core idea of FutureTask is the same as in our own implementations (especially the LockSupport version): the thread calling get() is blocked while the sub-thread has not completed, and is released once it has.
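
To make that core idea concrete, here is a heavily simplified stand-in for FutureTask. It is a sketch under stated assumptions, not how the JDK does it: MiniFuture is a made-up name, it supports a single waiting thread, has no cancellation, and reports a failed call() as null instead of rethrowing it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.locks.LockSupport;

// Hypothetical, simplified stand-in for FutureTask: one waiter, no cancellation
class MiniFuture<V> implements Runnable {
    private final Callable<V> callable;
    private volatile boolean done;      // plays the role of FutureTask's state field
    private V outcome;                  // written before done is set, read after done is seen
    private volatile Thread waiter;     // the single thread blocked in get(), if any

    MiniFuture(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        try {
            outcome = callable.call();
        } catch (Exception e) {
            outcome = null;             // simplification: a failure surfaces as null
        }
        done = true;                    // the volatile write publishes outcome
        Thread t = waiter;
        if (t != null) {
            LockSupport.unpark(t);      // release the thread blocked in get()
        }
    }

    V get() throws InterruptedException {
        waiter = Thread.currentThread();
        while (!done) {                 // loop because park() may return spuriously
            LockSupport.park(this);
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        }
        return outcome;
    }

    public static void main(String[] args) throws InterruptedException {
        MiniFuture<Integer> future = new MiniFuture<>(() -> {
            Thread.sleep(200L);         // simulate a time-consuming task
            return 9987;
        });
        new Thread(future, "T-1").start();
        System.out.println("Result: " + future.get());
    }
}
```

The real FutureTask differs mainly in bookkeeping: a multi-valued state instead of a boolean, and a linked list of WaitNode entries instead of a single waiter field.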

With the main logic analyzed, here are two side dishes.

Key points

LockSupport

Compared with our implementation, the source also uses LockSupport, but it blocks with a different method: park() vs park(Object blocker).

What is the difference? According to the official documentation:

The three forms of park each also support a blocker object parameter. 
This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. 
(Such tools may access blockers using method getBlocker(java.lang.Thread).) 

The use of these forms rather than the original forms without this parameter is strongly encouraged. 

The normal argument to supply as a blocker within a lock implementation is this.

As the documentation says, the blocker passed in acts as a marker object that is recorded while the thread is blocked. The following example shows the difference.

Example (from https://www.jianshu.com/p/835...):

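private static void parkVsParkBlocker() {
    // t1 parks without a blocker
    Thread t1 = new Thread(() -> {
        LockSupport.park();
    }, "t1");
    t1.start();

    // t2 parks with a blocker, which diagnostic tools can report
    Object blocker = new Object();
    Thread t2 = new Thread(() -> {
        LockSupport.park(blocker);
    }, "t2");
    t2.start();

    // Returns the blocker recorded for t2, or null if t2 is not parked (yet)
    LockSupport.getBlocker(t2);

    // unpark(Thread, int) is a helper from the linked example whose body is not shown here
    unpark(t1, 60);
    unpark(t2, 60);
}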

Print the Java stack trace of the JVM process:
jstack $(jps -l | grep LockSupport | awk '{print $1}')
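
The same difference can be observed without jstack by asking LockSupport.getBlocker directly. The sketch below is an illustration only: the class name BlockerDemo and the startParked helper are made up, and polling Thread.getState() for WAITING is just a simple way to wait until the thread is parked.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not taken from the linked example
class BlockerDemo {

    // Starts a daemon thread that parks (with or without a blocker) and returns once it is parked
    static Thread startParked(String name, Object blocker) throws InterruptedException {
        Thread t = new Thread(() -> {
            // Re-park after spurious wakeups; an interrupt ends the loop
            while (!Thread.currentThread().isInterrupted()) {
                if (blocker == null) {
                    LockSupport.park();
                } else {
                    LockSupport.park(blocker);
                }
            }
        }, name);
        t.setDaemon(true);
        t.start();
        while (t.getState() != Thread.State.WAITING) {   // a parked thread reports WAITING
            Thread.sleep(10L);
        }
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Object blocker = new Object();
        Thread t1 = startParked("t1", null);
        Thread t2 = startParked("t2", blocker);

        System.out.println("t1 blocker: " + LockSupport.getBlocker(t1));
        System.out.println("t2 blocker is ours: " + (LockSupport.getBlocker(t2) == blocker));

        t1.interrupt();   // let both threads finish
        t2.interrupt();
    }
}
```

In a jstack dump, the thread parked with a blocker additionally shows a "parking to wait for" line naming the blocker object, which is what makes blocked threads easier to diagnose.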

VarHandle

FutureTask, as a general-purpose utility class, has to handle get() being called in a multithreaded environment, a case that was deliberately ignored earlier in this article.
Data consistency under concurrency is achieved mainly with the volatile keyword plus CAS shown below (the classic pattern).

// State, recording the progress of the task
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

// The thread that runs Callable.call()
private volatile Thread runner;
// Waiting nodes, as a linked list
private volatile WaitNode waiters;

The volatile keyword mainly guarantees visibility to other threads. And what is CAS (Compare And Swap)?

Essentially, it is an optimistic lock:

  1. Compare the current value of a variable at a memory address with the expected value. If they are equal, atomically replace it with the new value.
  2. If they are not equal, return false.
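
The two steps can be tried out with AtomicInteger.compareAndSet, which exposes exactly this operation. This is a minimal sketch; the class name CasDemo and the tryComplete helper are made up, and the NEW and COMPLETING constants simply mirror the FutureTask values listed above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the constants mirror FutureTask's state values
class CasDemo {
    static final int NEW = 0;
    static final int COMPLETING = 1;

    // Attempts the NEW -> COMPLETING transition; only succeeds if the state is still NEW
    static boolean tryComplete(AtomicInteger state) {
        return state.compareAndSet(NEW, COMPLETING);
    }

    public static void main(String[] args) {
        AtomicInteger state = new AtomicInteger(NEW);
        boolean first = tryComplete(state);    // expected value matches, so the swap happens
        boolean second = tryComplete(state);   // state is no longer NEW, so the swap is rejected
        System.out.println(first + " " + second + " " + state.get());   // prints "true false 1"
    }
}
```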

Before JDK 9 this relied mainly on Unsafe. JDK 9 introduced VarHandle, intended as a standard replacement for many uses of the AtomicXX classes and of Unsafe, giving developers easier access to some of Unsafe's capabilities.

Take updates to the state field as an example:

private volatile int state;

/* Declarations and assignments */
private static final VarHandle STATE;
static{
    try {
        MethodHandles.Lookup l = MethodHandles.lookup();    //1 - Obtain a MethodHandles.Lookup object through MethodHandles.lookup()
        STATE = l.findVarHandle(FutureTask.class, "state", int.class);    //2 - Initialize the VarHandle STATE, which from now on refers to the state field
    } catch (ReflectiveOperationException e) {
        throw new ExceptionInInitializerError(e);
    }
}

/* Usage */
protected void set(V v) {
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {    // 3 - On the current object, CAS the state field from NEW to COMPLETING
        outcome = v;
        STATE.setRelease(this, NORMAL); // final state
        finishCompletion();
    }
}
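
The excerpt above only compiles inside FutureTask itself, so here is the same pattern on a small stand-alone class that can be run directly (JDK 9 or later). The class name VarHandleDemo and its accessor methods are made up for this sketch; only the lookup and the compareAndSet/setRelease calls follow the excerpt.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical stand-alone class that mimics the state handling of FutureTask.set()
class VarHandleDemo {
    static final int NEW = 0;
    static final int COMPLETING = 1;
    static final int NORMAL = 2;

    private volatile int state = NEW;
    private Object outcome;

    private static final VarHandle STATE;
    static {
        try {
            // Bind the handle to the private field "state" of this class
            STATE = MethodHandles.lookup().findVarHandle(VarHandleDemo.class, "state", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Only the first caller wins the NEW -> COMPLETING transition and stores its value
    boolean set(Object v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = v;
            STATE.setRelease(this, NORMAL);   // final state
            return true;
        }
        return false;
    }

    int state() {
        return state;
    }

    Object outcome() {
        return outcome;
    }

    public static void main(String[] args) {
        VarHandleDemo demo = new VarHandleDemo();
        System.out.println(demo.set("first"));    // wins the CAS
        System.out.println(demo.set("second"));   // loses, state is no longer NEW
        System.out.println(demo.state() + " " + demo.outcome());
    }
}
```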

Keywords: Java JDK jvm

Added by vichiq on Thu, 29 Aug 2019 11:15:22 +0300