Future implementation analysis

Future implementation analysis

Or a little demo first

 @Test
    public void testFuture() throws ExecutionException, InterruptedException {
        FutureTask<String> test = new FutureTask<>(() -> {
            logMessage("I'm going to sleep");

            TimeUnit.SECONDS.sleep(5);
            logMessage("I wake up");
            return "1";
        });
        Thread thread = new Thread(test);
        thread.start();

        logMessage(test.get());

    }

    public static void logMessage(Object o) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(simpleDateFormat.format(new Date()) + "-" + Thread.currentThread().getName() + ":" + o);
    }

FutureTask parsing

Or from his inheritance relationship, what is the Future interface? Runnable, I know, but what is the Future?

/*
 * @see FutureTask
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 * Future Indicates asynchronous execution. Several methods are provided to check whether the calculation is completed, wait for the calculation to be completed, and obtain the calculation results.
 * The main thing to remember is that it is used to perform asynchronous tasks. It also provides some useful APIs
 */
public interface Future<V> {

    /*
     1. Try to cancel the execution of the task. If the task has not been executed, cancel it directly and the task will not run again
     2. If the task has been completed, or has been cancelled, or cannot be cancelled for some other reason, false is returned
     3. If the task has started running, the mayInterruptIfRunning parameter indicates whether to interrupt the thread executing the task to stop the task
     
     After this method is called, the method calling isDone later will return true. If this method returns true, the subsequent call of isCancelled method will also return true
    */
    boolean cancel(boolean mayInterruptIfRunning);

    /*
     Determine whether the task has been cancelled
    */
    boolean isCancelled();

    /**
     If the task is completed, or is terminated normally, or is abnormal, or cancelled when it is completed, true will be returned
     */
    boolean isDone();

    /**
    Wait for the result, wait all the time
     */
    V get() throws InterruptedException, ExecutionException;

    /**
      Wait for the result and increase the timeout
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future means to obtain results asynchronously. It specifies some methods, depending on how the specific implementation is done. So let's look at FutureTask directly here

Look at the properties

   // Indicates the current state of this asynchronous calculation, 
   private volatile int state;

 // Here are some code values in different states.
    private static final int NEW          = 0; //newly build
    private static final int COMPLETING   = 1; // In progress
    private static final int NORMAL       = 2; // normal
    private static final int EXCEPTIONAL  = 3;// abnormal
    private static final int CANCELLED    = 4;// cancel
    private static final int INTERRUPTING = 5;// Interrupt in progress
    private static final int INTERRUPTED  = 6;// interrupt

   // Wait for the callable to run
    private Callable<V> callable;

 //Where the results are stored and are not volatile modified. Is to protect the read and write state
     // When there is a result, this is the result. If there is an exception, this is the exception. Therefore, if there is an exception in Future, it must be obtained through the get method
 // This reminds me of the thread pool. The submit method will also return a future. What will happen if the thread pool is abnormal?
    private Object outcome; 
   // Running thread
    private volatile Thread runner;
  // Waiting node queue, 
    private volatile WaitNode waiters;

These attributes will be continuously referenced in the following code and articles.

Start with the construction method

  1. public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    

    This construction method is certainly the most seen. It is just an assignment operation, which assigns the passed in callable and changes the state to NEW.

  2.   public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    

    I haven't used it. I haven't cared about it before. It's really special here. There is a resutl, and the two parameters are wrapped as Executors.callable(runnable, result);, Then look at this method.

      static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    

    Run the original task and return the result. Does that mean passing a Runnable, operating on the result, and finally returning the result. Like this one below

      @Test
        public void testFuture() throws ExecutionException, InterruptedException {
            HashMap<String, String> map = new HashMap<>();
            FutureTask<Map> test = new FutureTask<>(() -> {
                logMessage("I'm going to sleep");
                map.put("a","a");
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                logMessage("I wake up");
            },map);
            
            Thread thread = new Thread(test);
            thread.start();
            logMessage(test.get());
        }
    

Pass an empty map in, and then perform an operation and return. A little bit of a problem.

You know, he still inherits the Runnable interface and passes it to the Thread class, so Runnable must depend on the run method, and the Future must be manipulated in the run method.

run method analysis

 public void run() {
       //If the current status is not NEw or replacing cas, changing the runner to the current thread fails and returns directly without running.
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
           // Get callable
            Callable<V> c = callable;
            // Ready to run
            if (c != null && state == NEW) {
               // Accept results
                V result;
                boolean ran;
                try {
                  // Operation method,
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                   //If any exception occurs, change result to null and ran to false.
                    result = null;
                    ran = false;
                   //
                    setException(ex);
                }
              // If there are exceptions, you won't go to this method,
               // Normally, it must be this method. Here, it must set the result to the outcome.
                if (ran)
                    set(result);
            }
        } finally {
            // The cas operation is not used here. It is directly null to prevent calling again.
            runner = null;
            // After the runner becomes null, reread the value of state.
            // Why do you need to read again? state is modified by volatile. Is it to prevent instruction reordering.
            int s = state;
            // If the state is the state before INTERRUPTING, (new, COMPLETING, NORMAL, EXCEPTIONAL, canceled, INTERRUPTING) as long as it is not interrupted.
            if (s >= INTERRUPTING)
              
              // Looking at what this method does?
                handlePossibleCancellationInterrupt(s);
        }
    }
  

setException (exception occurs during operation, set the value, and change the state)

  // After an exception occurs, the method is called, and the operation is clear,
   protected void setException(Throwable t) {
      // Use the cas operation to change the status of the current future to being completed
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
           // Assign exception information to outcome
            outcome = t;
            //Change state to exception
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
           // Look at what this has done
            finishCompletion();
        }
    }
 

Handlepossibleconcellationinterrupt (if the state is INTERRUPTING, give up the right to use the cpu.)

 //The function of this method is not understood.
 // Look at the comments on the source code
 // This method is to ensure that the interrupt generated when cancel ing (true) is called can be delivered only when task run or task runAndReset is called.
 private void handlePossibleCancellationInterrupt(int s) {
        // If the status of the current future is INTERRUPTING, it means that the current future is being interrupted, and so on.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
               // The current thread gives up execution and gives up the right to use the cpu.
                Thread.yield(); 

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    } 

set (the task is executed successfully. set the value and change the status)

  // Or the same operation, state first change to COMPLETING, finally change to NORMAL, set the value, then call the finishCompletion method, wake-up wait.
  protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

Finishcompletement (wake up the waiting node and remove the waiting queue)

  // Look what's done in this method.
 // After reading this operation, you get all the waiting threads, wake up, remove them, and call the done method.
  private void finishCompletion() {
        // assert state > COMPLETING;
    
     // Get the waiting queue, start from scratch, loop through, unpark, remove, and start from scratch,
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                  
                    // Remove, help gc,
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        //This method is reserved for expansion
        done();

        callable = null;        // to reduce footprint
    }

Summary:

Future is essentially a Runnable. In the run method, the current thread will be set as the thread executing future. Call the call method of callable. When an exception occurs, it will first change the state to being completed, then set the exception to the result, and then change the state to exception. And wake up the waiting thread (loop from scratch, wake up) and remove it from the waiting queue (remove from scratch). If there is no exception, state is in the process of completion, setting the value, and finally becomes normal.

After the above execution, at the end, change the runner to null, and judge if the status is INTERRUPTING, the current thread waits, and so on.

Question:

  1. In the finally method, why should we judge to obtain state again.

    I really don't understand this. Is it to prevent instruction reordering.

  2. In the finally method, why make runner null

    Because finally is executed at the end of the method, the runner will be null regardless of success or failure. It can ensure that after the task is executed once, the currently executing thread is assigned null

  3. The handlepossibleconcellationinterrupt method appears only under what specific scenario.

get method analysis

 // get method. If state < = completing, it indicates that the operation has not ended. You have to wait. If not, report
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

awaitDone analysis

 // This method is to add the current thread to the waiting queue. Note that WaitNode is decorated with volatile. I guess it must be the operation of cas.
// It should be noted that for the state returned by this method, the actual operation to obtain the value is obtained in the report method through different values of s.
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
  
      // Dead cycle
        for (;;) {
             // Whether the current Thread is interrupted. The interrupted method belongs to the Thread class, and this method will clear the status of thread intercept states.
           // If the current thread is interrupted,
            if (Thread.interrupted()) {
               // Remove it and throw an exception
                removeWaiter(q);
                throw new InterruptedException();
            }
            
           // future has been completed. Direct return to s
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // Just wait and give up the execution right of the cpu.
                Thread.yield();
          
            //If q is null, is this the first cycle? Here, it means that its stats < = completing, which means that the task has not been executed or just new,
           // Build a waiting node, connect to the queue, join the queue,
           // In the first loop, q must be null, so the wait queue is built
           // The first cycle is over,
           // When the second cycle comes, it won't q stop= null
            else if (q == null)
               //If q is constructed in the first loop, and then the interrupt is sent, passing it to the removeWaiter method will cause invalid linked list lookup.
                q = new WaitNode();
          
           // The second cycle came and began to join the team.
            else if (!queued)
               //The head insertion method is used here.
               // If the insertion is successful, the queued is true, and the third loop will not come here.
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
           // After joining the queue, if there is time, unpark the current thread and block the current thread on the current object,
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
               //Block on current object
                LockSupport.parkNanos(this, nanos);
            }
            else
               // Keep blocking and wait for unpark.
                LockSupport.park(this);
        }
    }

report method analysis

 //Judging from the status passed in, if the current status is NORMAL,
private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
   // If cancelled, interrupt, interrupt, throw exception
        if (s >= CANCELLED)
            throw new CancellationException();
  
  
      // If not, throw an ExecutionException and wrap the original exception in the ExecutionException
        throw new ExecutionException((Throwable)x);
    }

removeWaiter analysis

  // The premise is that the current node has been built. If the node is null, it really doesn't make sense
/**
 The premise is that the current thread has been interrupted.
1. At first, change the thread of node to null. In the awaitDone method, get the reference of the WaitNode node. First, make it null. The WaitNode thread must be modified by volatile. Therefore, it can be directly null here.
2. When it becomes null at first, it will be found in the subsequent operations, and then removed from the waiting queue.
3. Here is the deletion of a single linked list. There must be two pointers (the front node and the current node)
	- There are two cases
		1. The node to be deleted is the head node. if it is the head node to be deleted, the first two IFS will not go in. Go directly to the fourth and change q to s, that is, change the head node to the next head node.
    2. If not the head node
         The first if can always go in. If it is found, it will go in. The second if will be deleted directly first. pred.next = s; (to be deleted is p).
         		After looking at the following judgment, if the front node becomes null, it means that there are other threads to delete the pred node at this moment, then the current one will be abandoned directly
        
*/
private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

summary

  1. When the get method is called, it will judge the state through the state. If the state is completed, it will go to the report method, or continue to judge through the state. If it is a normal result, it will return directly. If it is not normal, it will throw an exception.

    If it is not completed, it will go to the awaitDone method, where the waiting node will be built, added to the waiting queue through cas operation, and then park. In this process, it will also judge whether the current thread is interrupted. If an interrupt is sent, it will also remove the current thread from the head node.

  2. In other words, in get, a thread will cycle from blocking to at least twice. One is to build a node, the other is to replace cas, and then blocking. If cas replacement fails, it will continue. Therefore, there are at least two attempts.

Analysis of cancel method

    public boolean cancel(boolean mayInterruptIfRunning) {
       // Conditions not met
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    
           // 
            if (mayInterruptIfRunning) {
                try {
                   //Call the interrupt method to run the future thread
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                   // The status changes to INTERRUPTED
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
          // It has been analyzed above
            finishCompletion();
        }
        return true;
    }

That's all for the Future analysis. If there is anything wrong, please point it out. thank you.

Keywords: Java Multithreading

Added by Bramme on Mon, 20 Sep 2021 21:33:09 +0300