The Future pattern in the JUC package

Future pattern

The Future pattern is a common design pattern in multithreaded development. Its core idea is the asynchronous call: work that would otherwise run serially can run in parallel, which saves time.

When a program starts a task that runs slowly, the task cannot return its result immediately, but it can return a contract. While the task is running we can do other work, and later use the contract to obtain the result.

For example:

After you buy a mobile phone online, the phone takes three days to arrive, but an order is generated immediately. This order is the contract mentioned above. You do not have to wait idly for the phone; you can do other things. When the delivery arrives, you check it against the order and receive the final result.

The Future pattern in the JDK

The Future interface plays the role of the contract described above: calling the get method on a Future object eventually returns the result.

The FutureTask class adapts a Callable object into a Runnable object. The work itself is still done by the Callable, whose call method returns the final result.

FutureTask class description

The comment block below summarizes, in plain language, the source comments that follow it.

/**
1. Revision notes:

   1. This version differs from previous versions of this class, which relied on AbstractQueuedSynchronizer.
      The main purpose is to avoid surprising users by retaining interrupt status during cancellation races.

   2. Synchronization control in the current design relies on a "state" field, updated via CAS, to track completion,
      together with a simple Treiber stack that holds waiting threads.

2. Style note:

   1. As usual, the overhead of AtomicXFieldUpdaters is bypassed and Unsafe intrinsics are used directly.

Task state:
1. State description
   1. The run state of this task is initially NEW.
   2. The run state moves to a terminal state only in the methods set, setException and cancel.
   3. During completion, the state may take the transient value COMPLETING (while the outcome is being set) or INTERRUPTING (only while interrupting the runner to satisfy cancel(true)).
   4. Transitions from these intermediate states to the final states use cheaper ordered/lazy writes, because the values are unique and cannot be modified further.

2. Possible state transitions:

   1. NEW -> COMPLETING -> NORMAL
   2. NEW -> COMPLETING -> EXCEPTIONAL
   3. NEW -> CANCELLED
   4. NEW -> INTERRUPTING -> INTERRUPTED
**/

/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     */

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    // Inner class WaitNode:
    // a simple linked-list node that records a waiting thread in the Treiber stack.
    // For a more detailed explanation, see other classes such as Phaser and SynchronousQueue.
    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
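
The state field is private, but the four transitions listed in the source comment can be observed indirectly through isDone, isCancelled and get. The following is a minimal sketch of that idea, not part of the JDK: the class FutureStateDemo and its describe helper are hypothetical names introduced here. The tasks are run on the calling thread by invoking run() directly, so the transient COMPLETING and INTERRUPTING states are never visible in it.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class: infers FutureTask's terminal state from its public API.
class FutureStateDemo {

    // Maps what isDone, isCancelled and get reveal onto the state names in the comment.
    static String describe(FutureTask<?> task) {
        if (!task.isDone()) {
            return "NEW";
        }
        if (task.isCancelled()) {
            return "CANCELLED/INTERRUPTED";
        }
        try {
            task.get(); // the task is already done, so this does not block
            return "NORMAL";
        } catch (ExecutionException e) {
            return "EXCEPTIONAL"; // the callable threw; the cause is wrapped in e
        } catch (CancellationException e) {
            return "CANCELLED/INTERRUPTED";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted while reading the result";
        }
    }

    public static void main(String[] args) {
        FutureTask<Integer> ok = new FutureTask<>(() -> 42);
        System.out.println(describe(ok)); // not run yet
        ok.run();                         // NEW -> COMPLETING -> NORMAL
        System.out.println(describe(ok));

        FutureTask<Integer> failing = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        failing.run();                    // NEW -> COMPLETING -> EXCEPTIONAL
        System.out.println(describe(failing));

        FutureTask<Integer> cancelled = new FutureTask<>(() -> 42);
        cancelled.cancel(false);          // NEW -> CANCELLED
        cancelled.run();                  // no effect: the state is no longer NEW
        System.out.println(describe(cancelled));
    }
}
```

The public API cannot tell CANCELLED from INTERRUPTED, which is why the helper reports them together.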

Simple use

get method

package com.github.excelent01;

import java.util.concurrent.*;

/**
 * @author plg
 * @date 2019/5/17 16:54
 */
public class TestFuture {
    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        Future<Integer> future =  service.submit(()->{
            TimeUnit.SECONDS.sleep(10); // Simulate a slow task
            return 10;
        });
        //==============================
        System.out.println("do other works.");
        //==============================

        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        service.shutdown();

    }
}

Future interface API

public interface Future<V> {

    /**
     * Attempts to cancel the task. Returns true if the cancellation succeeds and false if it fails.
     * The mayInterruptIfRunning parameter indicates whether the thread executing the task may be
     * interrupted in an attempt to stop a task that has already started.
     * If the task has already completed (or has already been cancelled), this method returns false
     * regardless of mayInterruptIfRunning.
     * If the task is running, mayInterruptIfRunning decides whether its thread is interrupted. In
     * FutureTask the call returns true in both cases; with false the task is simply allowed to run
     * to the end, but its result can no longer be retrieved through get.
     * If the task has not started yet, this method normally returns true regardless of
     * mayInterruptIfRunning, and the task will never run.
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * Returns true if the task was cancelled before it completed normally.
     */
    boolean isCancelled();

    /**
     * Returns true if the task has completed. Completion may be due to normal termination,
     * an exception, or cancellation; in all of these cases this method returns true.
     */
    boolean isDone();

    /**
     * Waits if necessary for the task to complete, then returns its result.
     * Throws ExecutionException if the task threw an exception, and CancellationException if it was cancelled.
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits at most the specified time for the result. If the result is not available within that time,
     * a TimeoutException is thrown.
     */
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
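
To make the contract of each method concrete, here is a small sketch that exercises the timed get, cancel, isCancelled and isDone on a task submitted to a single-thread executor. The class FutureApiDemo and its observe method are hypothetical names used only for illustration; the observations are collected in a list so they can be printed or checked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class exercising the methods of the Future interface.
class FutureApiDemo {

    static List<String> observe() throws InterruptedException, ExecutionException {
        List<String> log = new ArrayList<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> slow = pool.submit(() -> {
                TimeUnit.SECONDS.sleep(5); // simulated long-running work
                return "done";
            });

            try {
                slow.get(100, TimeUnit.MILLISECONDS); // far too short for a 5 s task
            } catch (TimeoutException e) {
                log.add("timed get: TimeoutException");
            }

            // The task is running; cancel(true) interrupts the sleeping worker.
            log.add("cancel(true): " + slow.cancel(true));
            log.add("isCancelled: " + slow.isCancelled());
            log.add("isDone: " + slow.isDone());
            try {
                slow.get();
            } catch (CancellationException e) {
                log.add("get after cancel: CancellationException");
            }

            // A task that has already completed can no longer be cancelled.
            Future<String> quick = pool.submit(() -> "done");
            log.add("quick result: " + quick.get());
            log.add("cancel after completion: " + quick.cancel(true));
        } finally {
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws Exception {
        observe().forEach(System.out::println);
    }
}
```

Note that isDone reports true for the cancelled task as well: "done" means the task has reached a terminal state, not that it produced a result.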

Future lets you cancel a Runnable or Callable task, query whether it has completed, and obtain its result.

A classic example is making tea. Boiling the water takes 15 minutes, so there is no need to stand idle and waste that time: meanwhile you can prepare the tea set, and once the tea set is ready you wait for the water to boil. This is a typical problem that the Future pattern solves.

Code implementation:

package Future;

import java.util.concurrent.*;

/**
 * @author plg
 * @date 2019/5/17 17:54
 */
public class TeaTest2 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        BoilWater boilWater = new BoilWater();
        FutureTask<String> futureTask1 = new FutureTask<>(boilWater);
        ReadyTeaSet readyTeaSet = new ReadyTeaSet(futureTask1);
        FutureTask<String> futureTask2 = new FutureTask<>(readyTeaSet);
        new Thread(futureTask1).start();
        Thread.sleep(2000);
        new Thread(futureTask2).start();
        System.out.println(futureTask2.get());

    }
}
// T1 thread: boils the water
class BoilWater implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("T1: Kettle");
        Thread.sleep(1000);
        System.out.println("T1: Boil water");
        Thread.sleep(10000);
        return "T1: The water boiled.";
    }
}
// T2 thread: prepares the tea set, then waits for the water
class ReadyTeaSet implements Callable<String> {
    private final FutureTask<String> futureTask;

    public ReadyTeaSet(FutureTask<String> futureTask) {
        this.futureTask = futureTask;
    }

    @Override
    public String call() throws Exception {
        System.out.println("T2: Washing cup");
        Thread.sleep(1000);
        System.out.println("T2: Wash the teapot");
        Thread.sleep(2000);
        System.out.println("T2: Take tea");
        Thread.sleep(1000);
        System.out.println("T2: Wait for the water to boil.");
        System.out.println(futureTask.get());
        return "A pot of good tea.";
    }
}

Output:
T1: Kettle
T1: Boil water
T2: Washing cup
T2: Wash the teapot
T2: Take tea
T2: Wait for the water to boil.
T1: The water boiled.
A pot of good tea.

Process finished with exit code 0

A simple comparison between the ordinary approach and the Future pattern:

  1. The ordinary approach handles multiple tasks serially. When it meets a time-consuming operation, it can only wait until that operation stops blocking before moving on to the next task.
  2. With the Future pattern, the call only initiates the time-consuming operation and returns immediately. The operation itself is carried out by another worker thread, so the client thread is not blocked.

Therefore, while the worker thread performs the time-consuming operation, the client does not have to wait. It can carry on with other things and obtain the result from the worker thread when it needs it.
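
The comparison can be sketched with two independent slow operations, run first one after the other and then as two Futures. The class SerialVsFuture and the helper slowSquare are hypothetical, and the 300 ms sleep merely stands in for real work. On an otherwise idle machine the serial version should take roughly twice as long as the Future version, but the exact figures will vary.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SerialVsFuture {

    // Hypothetical time-consuming operation: sleeps, then returns x squared.
    static int slowSquare(int x) {
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return x * x;
    }

    // Ordinary approach: the second call cannot start until the first returns.
    static int serial() {
        return slowSquare(3) + slowSquare(4);
    }

    // Future pattern: both operations are started first, results are collected afterwards.
    static int withFutures() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> a = pool.submit(() -> slowSquare(3));
            Future<Integer> b = pool.submit(() -> slowSquare(4));
            return a.get() + b.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        long t0 = System.currentTimeMillis();
        int r1 = serial();
        long t1 = System.currentTimeMillis();
        int r2 = withFutures();
        long t2 = System.currentTimeMillis();
        System.out.println("serial:  " + r1 + " in " + (t1 - t0) + " ms");
        System.out.println("futures: " + r2 + " in " + (t2 - t1) + " ms");
    }
}
```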

The Future pattern in detail:

1. The Future pattern is a common design pattern in multithreaded design.

2. Its core idea is the asynchronous call.

The Future pattern cannot return the data you need immediately, but it returns a contract, and later you can use this contract to obtain that data.

Put simply: I have a time-consuming task, I do not want to wait for it, and I do not need its result right away. So I submit the task to a Future, which completes the task for me and hands me back an order for it.

Then I can do whatever I want without waiting.

When I need the result, I use the order to get it from the Future (if the task has not finished by then, the call blocks).

Because the task may not have finished at that point, Future also lets me ask whether it has completed (isDone), so I can design different logic for the two cases.
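
As a rough illustration of branching on completion, the sketch below keeps doing other work while isDone returns false and calls get only once the task has finished, so get does not block. The class IsDoneDemo is a hypothetical name, and the 50 ms sleep in the loop stands in for useful work.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class IsDoneDemo {

    static int pollThenGet() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(300); // simulated slow task
                return 10;
            });

            int rounds = 0;
            while (!future.isDone()) {
                rounds++;                          // not finished: do something else
                TimeUnit.MILLISECONDS.sleep(50);   // stands in for other work
            }
            System.out.println("rounds of other work: " + rounds);

            return future.get();                   // finished: returns without blocking
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("result: " + pollThenGet());
    }
}
```

Polling is only worthwhile when there really is other work to do in the loop; otherwise a plain get, or a get with a timeout, is simpler.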

FutureTask

Future is an interface, so it cannot be instantiated directly. That is where FutureTask comes in.

Let's look at how FutureTask is implemented:

The FutureTask class implements the RunnableFuture interface. Here is the source code of RunnableFuture:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

As shown, the RunnableFuture interface extends both Runnable and Future. A FutureTask can therefore be executed by a thread as a Runnable, and can also be used as a Future to obtain the return value of its Callable.
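
The two roles can be shown side by side in a short sketch. The class RunnableFutureDemo and its method names are hypothetical. The first method passes one FutureTask to an executor as a Runnable and reads it back as a Future; the second uses the constructor that wraps a Runnable together with a fixed result.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class RunnableFutureDemo {

    // One object, two views: executed as a Runnable, queried as a Future.
    static int viaExecutor() throws InterruptedException, ExecutionException {
        FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
        Runnable asRunnable = task;
        Future<Integer> asFuture = task;

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.execute(asRunnable);
            return asFuture.get();
        } finally {
            pool.shutdown();
        }
    }

    // FutureTask can also wrap a Runnable plus the value that get returns on success.
    static String viaThread() throws InterruptedException, ExecutionException {
        FutureTask<String> task = new FutureTask<>(
                () -> System.out.println("running on " + Thread.currentThread().getName()),
                "finished");
        new Thread(task, "worker").start();
        return task.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(viaExecutor());
        System.out.println(viaThread());
    }
}
```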

Implementing the Future pattern by hand:

The DataFuture class below is just a wrapper; creating it never blocks.

Once the worker thread has prepared the data, it passes the data in through the setRealData method.

The client calls the getRealData method only when it actually needs the data. If the data is ready, the method returns immediately; otherwise it waits until the data has been set.

DataFuture

public class DataFuture<T> {

    private T realData;
    private boolean isOK = false;

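    public synchronized T getRealData() {
        while (!isOK) {
            try {
                // Wait until setRealData signals that the data is ready
                wait();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up instead of swallowing the interrupt
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for data", e);
            }
        }
        return realData;
    }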

    public synchronized void setRealData(T data) {
        isOK = true;
        realData = data;
        notifyAll();
    }

}
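
Future.get has a timed overload, and the hand-written wrapper can be given one in the same spirit. The following is only a sketch of one possible variant, not part of the original DataFuture: the class name TimedDataFuture and the timed getRealData signature are assumptions made here. It tracks a deadline and waits for the remaining time on each pass through the loop, so wake-ups that arrive before the data is set do not extend the total wait.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical variant of DataFuture whose getter gives up after a timeout.
class TimedDataFuture<T> {

    private T realData;
    private boolean isOK = false;

    public synchronized T getRealData(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!isOK) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("data not ready in time");
            }
            // Releases the monitor and waits for notifyAll or for the remaining time
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
        return realData;
    }

    public synchronized void setRealData(T data) {
        realData = data;
        isOK = true;
        notifyAll();
    }

    public static void main(String[] args) throws Exception {
        TimedDataFuture<String> future = new TimedDataFuture<>();
        try {
            future.getRealData(100, TimeUnit.MILLISECONDS); // nothing has been set yet
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
        future.setRealData("Final data");
        System.out.println(future.getRealData(100, TimeUnit.MILLISECONDS));
    }
}
```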

The server below does not load the real data immediately when a client requests it. It creates a DataFuture, starts a worker thread to load the real data, and returns the DataFuture right away.

Server

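import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {

    public DataFuture<String> getData() {
        final DataFuture<String> data = new DataFuture<>();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Simulate slow loading of the real data
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                data.setRealData("Final data");
            }
        });
        // The submitted task still runs; shutting down lets the worker thread exit afterwards
        executor.shutdown();
        return data;
    }
}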

Test code

The calling code of the client is as follows:

TestDataFuture



public class TestDataFuture {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        Server server = new Server();
        DataFuture<String> dataFuture = server.getData();

        try {
            // Perform other operations first; the sleep simulates time-consuming work
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Result data:" + dataFuture.getRealData());
        System.out.println("time consuming: " + (System.currentTimeMillis() - start));
    }
}

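Test result

Result data:Final data
time consuming: 5006

Process finished with exit code 0

The total is about 5 seconds rather than 10 because the server's 5-second load and the client's 5 seconds of other work run at the same time.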

Shortcomings of Future

Future has many advantages, but does it have no disadvantages?

As the examples above show, the Future pattern is considerably more efficient than the traditional approach: it lets tasks in a thread pool run asynchronously, at least to some extent.

But it also has obvious disadvantages:

A callback cannot be executed on a thread other than the task's. The biggest problem with traditional callbacks is that they cannot split the control flow across different event handlers.

For example, if the main thread needs the results of several asynchronous tasks before it can take the next step, it has to block in future.get() until each result is returned, which in effect makes the code synchronous again. If one of the tasks runs for a long time, the situation is even worse.

Java 8 introduced a new implementation class, CompletableFuture, to make up for these shortcomings. CompletableFuture will be explained in the next chapter.
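
As a brief preview, and only as a sketch, the following shows the kind of chaining that CompletableFuture allows: follow-up steps are registered on the future instead of the caller blocking in get between them. The class name CompletableFutureSketch and the pipeline method are hypothetical; supplyAsync, thenApply, thenAccept and join are standard CompletableFuture methods.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureSketch {

    // Each stage runs when the previous one completes; no explicit get in between.
    static CompletableFuture<String> pipeline() {
        return CompletableFuture
                .supplyAsync(() -> 10)             // runs asynchronously, by default on the common pool
                .thenApply(n -> n * 2)             // transform the result when it is ready
                .thenApply(n -> "result = " + n);
    }

    public static void main(String[] args) {
        // join waits for the whole chain so the JVM does not exit before it prints
        pipeline().thenAccept(System.out::println).join();
    }
}
```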

Keywords: JUC

Added by jdesilva on Fri, 25 Feb 2022 05:22:28 +0200