How do I get the Thread return value?
I see this question from time to time. With JDK 1.8 as the baseline, can't we answer it in seconds?
Answer: simple. Callable solves it perfectly. Next question...
But how did our predecessors on JDK 1.4 (or even earlier) get a thread's return value? Put differently: with Callable off the table, how do we get it?
This is really a problem of inter-thread communication. With Runnable as the only weapon, it is tedious and challenging!
First, define the task class Task:
// Task
class Task implements Runnable {
    // Return value; volatile so that the polling main thread is guaranteed to see the write
    @Getter
    volatile Object result;

    @Override
    public void run() {
        try {
            // Simulate time-consuming logic
            System.out.println(String.format("[%s] Execution in progress..", Thread.currentThread().getName()));
            TimeUnit.SECONDS.sleep(2L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Compute the final result
        result = Integer.valueOf(9987);
        System.out.println(String.format("[%s] Execution complete..", Thread.currentThread().getName()));
    }
}
Then start the thread:
public static void main(String[] args) {
    // Task is an inner class, so it is instantiated through an instance of the outer class
    CallbackTest callbackTest = new CallbackTest();
    Task task = callbackTest.new Task();
    final String threadName = "T-1";
    Thread thread = new Thread(task, threadName);
    thread.start();
}
Good, the T-1 thread has started and seems to run the task just fine. The question is: how does the main method get the Task's return value?
With the current code, T-1 simply goes off on its own and the main thread has no connection to it.
My habit is to reach for the simplest approach first.
Simple Way
The main thread does the legwork and keeps polling for the result:
public static void main(String[] args) throws InterruptedException {
    CallbackTest callbackTest = new CallbackTest();
    Task task = callbackTest.new Task();
    final String threadName = "T-1";
    Thread thread = new Thread(task, threadName);
    thread.start();
    // The main thread repeatedly checks on T-1
    while (true) {
        if (task.getResult() != null) {
            System.out.println(String.format("Result task=%s", task.getResult()));
            break;
        }
        // Let the CPU rest a little
        TimeUnit.MILLISECONDS.sleep(200L);
        System.out.println(String.format("[main] Checking result diligently (result=%s)", task.getResult()));
    }
}
The output may look like this:
[T-1] Execution in progress..
[main] Checking result diligently (result=null)
[main] Checking result diligently (result=null)
[main] Checking result diligently (result=null)
[main] Checking result diligently (result=null)
[main] Checking result diligently (result=null)
[main] Checking result diligently (result=null)
[main] Checking result diligently (result=null)
[main] Checking result diligently (result=null)
[main] Checking result diligently (result=null)
[T-1] Execution complete..
[main] Checking result diligently (result=9987)
Result task=9987
Even with the artificial sleep added to spare the CPU, this is still not a good solution. It burdens the main thread, which has to check over and over whether the child thread has assigned the final result.
Is there a way for T-1 to tell the main thread when it has finished?
As an old-school (technically outdated) programmer, the first thing that comes to my mind is the wait/notify combination.
wait/notify combination
- On the wait side, check the result value. If it is null, T-1 has not finished yet, so wait patiently.
public static void main(String[] args) {
    CallbackTest callbackTest = new CallbackTest();
    Task task = callbackTest.new Task();
    final String threadName = "T-1";
    Thread thread = new Thread(task, threadName);
    thread.start();
    synchronized (task) {
        // Check the result while holding the lock, so that notify() cannot slip in
        // between the check and wait(); the loop also covers spurious wakeups
        while (task.getResult() == null) {
            System.out.println(String.format("[%s] Waiting for execution..", Thread.currentThread().getName()));
            try {
                task.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    System.out.println(String.format("Result task=%s", task.getResult()));
}
- On the notify side, add the wake-up logic:
class Task implements Runnable {
    @Getter
    Object result;

    @Override
    public void run() {
        try {
            System.out.println(String.format("[%s] Execution in progress..", Thread.currentThread().getName()));
            TimeUnit.SECONDS.sleep(2L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (this) {
            // Publish the result, then wake up the thread waiting on this object
            result = Integer.valueOf(9987);
            this.notify();
        }
        System.out.println(String.format("[%s] Execution complete..", Thread.currentThread().getName()));
    }
}
After this change, the output is as follows:
[T-1] Execution in progress..
[main] Waiting for execution..
[T-1] Execution complete..
Result task=9987
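For readers who want to run the idea without Lombok or the outer CallbackTest class, here is a minimal self-contained sketch of the same wait/notify handoff. The class name WaitNotifyResult and the methods await() and demo() are made up for this illustration, and the sleep is left out to keep it short.

```java
// A minimal sketch (hypothetical names): hand a result from a worker thread
// to the caller using wait/notify on the task object itself.
class WaitNotifyResult implements Runnable {
    private Object result; // guarded by the monitor of "this"

    @Override
    public void run() {
        Object computed = Integer.valueOf(9987); // stand-in for the real work
        synchronized (this) {
            result = computed;
            notifyAll(); // wake every thread waiting on this task
        }
    }

    // Blocks the caller until run() has published a result.
    public synchronized Object await() throws InterruptedException {
        while (result == null) { // loop guards against spurious wakeups
            wait();
        }
        return result;
    }

    // Starts the worker and waits for its value.
    static Object demo() {
        WaitNotifyResult task = new WaitNotifyResult();
        new Thread(task, "T-1").start();
        try {
            return task.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Result task=" + demo());
    }
}
```

Because both the check and the assignment happen under the same monitor, it does not matter whether the worker finishes before or after the caller starts waiting.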
LockSupport Implementation
You can also use LockSupport, which works much like wait/notify. Here is the complete code:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import lombok.Getter;

public class CallbackTest {

    class Task implements Runnable {
        // volatile so that the main thread sees the write after it is unparked
        @Getter
        volatile Object result;
        // The calling thread (the main thread), passed in through the constructor
        Thread runner;

        Task(Thread runner) {
            this.runner = runner;
        }

        @Override
        public void run() {
            try {
                System.out.println(String.format("[%s] Execution in progress..", Thread.currentThread().getName()));
                TimeUnit.SECONDS.sleep(2L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            result = Integer.valueOf(9987);
            // Wake up the main thread; unpark needs no lock
            LockSupport.unpark(runner);
            System.out.println(String.format("[%s] Execution complete..", Thread.currentThread().getName()));
        }
    }

    public static void main(String[] args) {
        CallbackTest callbackTest = new CallbackTest();
        Task task = callbackTest.new Task(Thread.currentThread());
        final String threadName = "T-1";
        Thread thread = new Thread(task, threadName);
        thread.start();
        while (true) {
            if (task.getResult() == null) {
                System.out.println(String.format("[%s] Waiting for execution..", Thread.currentThread().getName()));
                LockSupport.park(); // block the main thread
            }
            if (task.getResult() != null) {
                System.out.println(String.format("Result task=%s", task.getResult()));
                break;
            }
        }
    }
}
Using Callable
So far we have obtained a thread's return value in our own ways. Now let's revisit the answer from the beginning of the article.
I see this question from time to time. With JDK 1.8 as the baseline, can't we answer it in seconds? Answer: simple. Callable solves it perfectly. Next question...
Since Callable was the obvious answer, let's first see how it is used.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CallbackTest {

    class Task implements Callable<Object> {
        @Override
        public Object call() {
            try {
                // Simulate time-consuming logic
                System.out.println(String.format("[%s] Execution in progress..", Thread.currentThread().getName()));
                TimeUnit.SECONDS.sleep(2L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return Integer.valueOf(9987);
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CallbackTest callbackTest = new CallbackTest();
        Task task = callbackTest.new Task();
        ExecutorService es = Executors.newSingleThreadExecutor();
        Future<Object> future = es.submit(task);
        System.out.println("Result:" + future.get());
        es.shutdown();
    }
}
The code is not complicated. The demo obtains the return value through future.get(), which is a blocking method: it blocks until the child thread has finished executing and returned. Readers who have not used it can look it up on their own; we won't explain more here.
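To make the blocking behaviour visible, here is a small sketch that first calls the timed overload get(timeout, unit), which gives up with a TimeoutException, and then the untimed get(), which waits for the value. The class name FutureGetDemo and the 50 ms / 300 ms figures are arbitrary choices for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// A minimal sketch (hypothetical class name): timed get() versus blocking get().
class FutureGetDemo {
    static String demo() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = es.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(300L); // simulated slow work
                return 9987;
            });
            String early;
            try {
                future.get(50L, TimeUnit.MILLISECONDS); // shorter than the task needs
                early = "done early";
            } catch (TimeoutException e) {
                early = "timed out";
            }
            // The untimed get() blocks until the task has returned
            return early + ", then " + future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The timed overload is useful when the caller cannot afford to wait indefinitely; the task itself keeps running after the timeout.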
The Callable source code is as follows:
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
Callable itself is just an interface with nothing mysterious about it; the mystery lies in Future, or rather FutureTask.
What matters is seeing how the source code obtains a return value from another thread.
FutureTask Analysis
First, look at where FutureTask stands in the class hierarchy.
As you can see, FutureTask implements both the Future interface and the Runnable interface (through RunnableFuture, which extends the two).
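Because FutureTask is a Runnable as well as a Future, it can be handed to a plain Thread without any ExecutorService. The sketch below shows this under that assumption only; the class name FutureTaskAsRunnable is invented for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// A minimal sketch (hypothetical class name): the same object is passed to
// Thread as a Runnable and queried afterwards as a Future.
class FutureTaskAsRunnable {
    static Object demo() {
        FutureTask<Object> futureTask = new FutureTask<Object>(() -> Integer.valueOf(9987));
        Thread thread = new Thread(futureTask, "T-1"); // accepted because FutureTask is a Runnable
        thread.start();
        try {
            return futureTask.get(); // Future side: block until call() has returned
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Result:" + demo());
    }
}
```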
Let's take a look at how FutureTask relates to Callable.
The relationship between FutureTask and Callable
(The source code is traced below; if it is hard to follow, you can jump straight to the conclusion at the end of this section.)
ExecutorService es = Executors.newSingleThreadExecutor();
Future<Object> future = es.submit(task);
Taking the submit method of ExecutorService in the example as the entry point, the actual implementation is the submit method of AbstractExecutorService:
/* AbstractExecutorService */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task); // Note 1 - build the FutureTask
    execute(ftask); // Note 2 - eventually calls ftask.run(), i.e. the run method of the FutureTask built in Note 1
    return ftask;
}
...
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable); // 1.1 - call FutureTask's constructor
}
- Note 1 - Observe the constructor of FutureTask:
// callable is a member variable of FutureTask
private Callable<V> callable;

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable; // assign the member variable
    this.state = NEW;         // ensure visibility of callable
}
Conclusion 1: with ExecutorService acting as the go-between, the Callable ends up assigned to a member variable of FutureTask.
- Note 2 - Next, follow the execute call, paying attention to the comments:
/* AbstractExecutorService.submit */
public <T> Future<T> submit(Callable<T> task) {
    ...
    execute(ftask); // Note 2 - eventually calls ftask.run(), i.e. the run method of the FutureTask built in Note 1
    ...
}
↓↓↓↓↓ ↓↓↓↓↓
/* ThreadPoolExecutor.execute */
public void execute(Runnable command) {
    ...
    addWorker(command, true); // hand the task to a new worker
    ...
}
↓↓↓↓↓ ↓↓↓↓↓
/* ThreadPoolExecutor.addWorker */
private boolean addWorker(Runnable firstTask, boolean core) {
    w = new Worker(firstTask); // wrap the task in a Worker
    final Thread t = w.thread;
    ...
    t.start(); // Note 3 - the worker's thread is started, which calls the run method of its Runnable
    ...
}
↓↓↓↓↓ ↓↓↓↓↓
/* Inner class Worker */
final Thread thread;  // member variable
Runnable firstTask;   // member variable

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask; // assign the member variable firstTask
    this.thread = getThreadFactory().newThread(this); // create a new thread and assign the member variable thread
}

// 3.1 - the start() in Note 3 executes this run method, which then calls runWorker
public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    ...
    Runnable task = w.firstTask;
    ...
    task.run(); // ##### Note: this is where the call finally lands #####
    ...
}
What is the task in task.run()? It is the FutureTask that was built in Note 1. Now look at its run method:
public void run() {
    ...
    Callable<V> c = callable;
    if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
            result = c.call(); // callable's call method, which is our own logic
            ran = true;
        } catch (Throwable ex) {
            result = null;
            ran = false;
            setException(ex);
        }
        if (ran)
            set(result); // Note 4 - the assignment
    }
    ...
}
Conclusion 2: after this chain of calls, Callable's call method (that is, our custom logic) is eventually invoked.
- Note 4 - Look at the assignment here
// Member variable
private Object outcome;

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v; // assign the member variable
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
The conclusion is that FutureTask wraps Callable and assigns the return value to the member variable after execution of the call method.
Next, we explore the acquisition of the return value, which is the implementation of Future.get().
Return value acquisition
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING) // 1 - not finished yet: block the calling thread
        s = awaitDone(false, 0L);
    return report(s); // 2 - finished: fetch the result directly
}

// 1.1 - blocking
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    ...
    LockSupport.park(this); // block
    ...
}

// 2.1 - return the result
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
The logic of get() is not complicated:
- Check the state. If execution has not completed, that is, the member variable outcome (which holds the return value of call()) has not been assigned yet, block.
- If outcome has already been assigned, return it.
Doesn't that look familiar? It is the same logic as our own version!
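The last line of report() covers a case our hand-written versions ignored: when call() throws, the exception is stored in outcome and get() rethrows it wrapped in an ExecutionException. A small sketch of that path follows; the class name FutureExceptionDemo is invented, and the task is run on the calling thread only to keep the example short.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// A minimal sketch (hypothetical class name): an exception thrown inside call()
// resurfaces from get() as the cause of an ExecutionException.
class FutureExceptionDemo {
    static String demo() {
        FutureTask<Object> task = new FutureTask<Object>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run(); // run synchronously; the exception is captured, not propagated
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```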
Look at the set() method again, and you will find LockSupport.unpark(Thread) inside as evidence.
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion(); // this looks suspicious
    }
}

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // JDK 9+ form; JDK 8 uses UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); // Aha, caught you: sure enough, LockSupport.unpark(t)
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null; // to reduce footprint
}
We found the unpark that matches the park call, which confirms our inference. The core idea of FutureTask is the same as our own implementation (especially the LockSupport version): the thread calling get() is blocked while the task is unfinished and released once it completes.
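To tie the pieces together, here is a heavily simplified, hypothetical MiniFutureTask that follows the same shape: wrap a Callable, store its return value in a field, park the caller in get(), and unpark it when the value is ready. It is only a sketch under loud assumptions: a single waiting thread, no cancellation, no interruption handling, and an exception from call() is simply dropped (the real FutureTask stores it).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.locks.LockSupport;

// A minimal sketch (hypothetical class): the park/unpark core of a future.
class MiniFutureTask<V> implements Runnable {
    private final Callable<V> callable;
    private V outcome;               // written before "done" is set, read after it
    private volatile boolean done;
    private volatile Thread waiter;  // the single thread blocked in get(), if any

    MiniFutureTask(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        try {
            outcome = callable.call();
        } catch (Exception e) {
            outcome = null; // simplification: the exception is dropped
        }
        done = true;        // publish the outcome
        Thread t = waiter;
        if (t != null) {
            LockSupport.unpark(t); // release the caller blocked in get()
        }
    }

    public V get() {
        waiter = Thread.currentThread();
        while (!done) {             // re-check after every wakeup
            LockSupport.park(this); // pass "this" as the blocker object
        }
        return outcome;
    }

    static Object demo() {
        MiniFutureTask<Object> task = new MiniFutureTask<Object>(() -> {
            Thread.sleep(200L); // simulated work
            return Integer.valueOf(9987);
        });
        new Thread(task, "T-1").start();
        return task.get();
    }

    public static void main(String[] args) {
        System.out.println("Result task=" + demo());
    }
}
```

The real class replaces the boolean with the int state machine and the single waiter field with a linked list of WaitNode entries, so that several threads can call get() at once.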
With the main logic covered, here are two side dishes.
Focus
LockSupport
Compared with the source code, our implementation also uses LockSupport, but the blocking method differs: park() vs park(Object blocker).
What is the difference? The official documentation says:
The three forms of park each also support a blocker object parameter. This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. (Such tools may access blockers using method getBlocker(java.lang.Thread).) The use of these forms rather than the original forms without this parameter is strongly encouraged. The normal argument to supply as a blocker within a lock implementation is this.
In other words, the blocker passed in acts as a marker object that is recorded while the thread is blocked. The following example shows the difference.
The example is as follows (from https://www.jianshu.com/p/835...):
private static void parkVsParkBlocker() {
    Thread t1 = new Thread(() -> {
        LockSupport.park();
    }, "t1");
    t1.start();

    Object blocker = new Object();
    Thread t2 = new Thread(() -> {
        LockSupport.park(blocker);
    }, "t2");
    t2.start();

    LockSupport.getBlocker(t2);
    // unpark(thread, 60) is a helper from the linked example; its definition is not shown here
    unpark(t1, 60);
    unpark(t2, 60);
}
Print the Java stack trace of the JVM process:
jstack $(jps -l | grep LockSupport | awk '{print $1}')
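The same difference can also be observed from inside the program with LockSupport.getBlocker, without attaching jstack. The sketch below is an illustration only: the class name BlockerDemo, the released flag, and the awaitParked helper are invented here, and the helper simply spins until the thread reports the WAITING state.

```java
import java.util.concurrent.locks.LockSupport;

// A minimal sketch (hypothetical names): park() records no blocker,
// park(blocker) records the object passed in.
class BlockerDemo {
    private static volatile boolean released;

    // Spin until the thread has actually parked (state WAITING).
    private static void awaitParked(Thread t) {
        while (t.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
    }

    static String demo() {
        released = false;
        Object blocker = new Object();
        Thread t1 = new Thread(() -> {
            while (!released) LockSupport.park();
        }, "t1");
        Thread t2 = new Thread(() -> {
            while (!released) LockSupport.park(blocker);
        }, "t2");
        t1.start();
        t2.start();
        awaitParked(t1);
        awaitParked(t2);

        boolean t1Recorded = LockSupport.getBlocker(t1) != null;
        boolean t2Recorded = LockSupport.getBlocker(t2) == blocker;

        released = true; // let both loops exit, then wake the threads
        LockSupport.unpark(t1);
        LockSupport.unpark(t2);
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "t1=" + t1Recorded + ", t2=" + t2Recorded;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```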
VarHandle
As a general-purpose utility class, FutureTask has to handle get() being called in a multithreaded environment, a case the earlier sections deliberately left out.
Data consistency under concurrency is achieved mainly through the volatile keyword plus CAS shown below (the classic pattern).
// State, recording how far the child thread's execution has progressed
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

// The child thread, i.e. the thread running Callable.call()
private volatile Thread runner;
// Waiting nodes, a linked list
private volatile WaitNode waiters;
The volatile keyword mainly makes writes visible to other threads (visibility). And what is CAS (Compare And Swap)?
Essentially, it is an optimistic lock:
- Compare the current value of a variable at a memory address with the expected value. If they are identical, atomically replace it with the new value.
- If not, return false.
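The two bullet points can be tried out with AtomicInteger.compareAndSet, which exposes the same compare-and-swap operation through a public API. The class name CasDemo is invented, and the values 0, 1, and 2 merely stand in for states such as NEW and COMPLETING.

```java
import java.util.concurrent.atomic.AtomicInteger;

// A minimal sketch (hypothetical class name): one successful and one failed CAS.
class CasDemo {
    static String demo() {
        AtomicInteger state = new AtomicInteger(0);   // 0 plays the role of NEW
        boolean first = state.compareAndSet(0, 1);    // expected 0, found 0: swapped to 1
        boolean second = state.compareAndSet(0, 2);   // expected 0, found 1: left unchanged
        return first + " " + second + " " + state.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true false 1"
    }
}
```

The failed second call is what makes the pattern optimistic: nothing is locked up front, and the loser simply learns that someone else got there first.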
Before JDK 9, this relied mainly on Unsafe. JDK 9 introduced VarHandle, intended as an alternative to the AtomicXX classes and as a supported way for developers to use some of Unsafe's capabilities.
Take the update of the state field as an example:
private volatile int state;

/* Declaration and assignment */
private static final VarHandle STATE;
static {
    try {
        // 1 - obtain a MethodHandles.Lookup object through MethodHandles.lookup()
        MethodHandles.Lookup l = MethodHandles.lookup();
        // 2 - assign the VarHandle STATE; from here on STATE is bound to the state field
        STATE = l.findVarHandle(FutureTask.class, "state", int.class);
    } catch (ReflectiveOperationException e) {
        throw new ExceptionInInitializerError(e);
    }
}

/* Usage */
protected void set(V v) {
    // 3 - on the current object, change the state field from NEW to COMPLETING
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        outcome = v;
        STATE.setRelease(this, NORMAL); // final state
        finishCompletion();
    }
}
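The same pattern can be reproduced on a class of our own, which makes it easy to experiment with on JDK 9 or later. In the sketch below the class VarHandleDemo and its complete() method are invented for illustration; only the lookup and the two VarHandle calls mirror the excerpt above.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// A minimal sketch (hypothetical class, JDK 9+): a VarHandle bound to our own field.
class VarHandleDemo {
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;

    private volatile int state = NEW;

    private static final VarHandle STATE;
    static {
        try {
            // Bind the handle to the private field "state" of this class
            STATE = MethodHandles.lookup().findVarHandle(VarHandleDemo.class, "state", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Only the first caller wins the NEW -> COMPLETING transition.
    boolean complete() {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            STATE.setRelease(this, NORMAL); // final state
            return true;
        }
        return false;
    }

    static String demo() {
        VarHandleDemo d = new VarHandleDemo();
        boolean first = d.complete();
        boolean second = d.complete();
        return first + " " + second + " " + d.state;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true false 2"
    }
}
```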