Asynchronous artifact: implementation principle and usage scenario of completable future

1. General

Completable future is jdk1 8 introduced implementation classes. Future and CompletionStage are extended. It is a future that can trigger some operations in the task completion stage. In short, asynchronous callback can be implemented.

2. Why is completable future introduced

For jdk1 The Future of 5 provides the ability to process tasks asynchronously, but the way to obtain results is not elegant. It still needs to be blocked (or rotation training). How to avoid blocking? It's actually a registration callback.

The industry implements asynchronous callback in combination with observer mode. That is, notify the observer when the task is completed. For example, Netty's ChannelFuture can process asynchronous results by registering and listening.

Netty's ChannelFuture

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");
    synchronized (this) {
        addListener0(listener);
    }
    if (isDone()) {
        notifyListeners();
    }
    return this;
}
private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        if (checkNotifyWaiters()) {
            notifyListeners();
        }
        return true;
    }
    return false;
}

Register the listener through the addListener method. If the task is completed, the notifyListeners notification is called.

Completable Future extends the Future, introduces functional programming, and processes the results through callback.

3. Function

The function of completable future is mainly reflected in its CompletionStage.

It can realize the following functions

  • Convert (thenpose)
  • thenCombine
  • Consumption (thenAccept)
  • Run (thenRun).
  • Consumption with return (thenApply)
    Difference between consumption and operation:
    Consumption uses execution results. Run only runs specific tasks. You can check other functions according to your needs.

Completable future can realize chain call with the help of CompletionStage method. And you can choose synchronous or asynchronous.

Here is a simple example to experience its functions.

public static void thenApply() {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
        try {
            //  Thread.sleep(2000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("supplyAsync " + Thread.currentThread().getName());
        return "hello";
    }, executorService).thenApplyAsync(s -> {
        System.out.println(s + "world");
        return "hhh";
    }, executorService);
    cf.thenRunAsync(() -> {
        System.out.println("ddddd");
    });
    cf.thenRun(() -> {
        System.out.println("ddddsd");
    });
    cf.thenRun(() -> {
        System.out.println(Thread.currentThread());
        System.out.println("dddaewdd");
    });
}

results of enforcement

supplyAsync pool-1-thread-1
helloworld
ddddd
ddddsd
Thread[main,5,main]
dddaewdd

According to the results, we can see that the corresponding tasks will be executed in an orderly manner.

be careful:

If yes, execute cf.thenRun synchronously. Its execution thread may be the main thread or the thread executing the source task. If the thread executing the source task finishes executing the task before the main call. Then the cf.thenRun method will be called by the main thread.

Here's an explanation. If there are multiple dependent tasks for the same task:

If these dependent tasks are executed synchronously. So if these tasks are executed by the current calling thread (main), they are executed in an orderly way. If the thread executing the source task is executed, it will be executed in reverse order. Because the internal task data structure is LIFO.
If these dependent tasks are executed asynchronously, they will execute tasks through the asynchronous thread pool. The execution order of tasks cannot be guaranteed.
The above conclusion is obtained by reading the source code. Let's dig into the source code.

3. Source code tracking
Create completable future
There are many methods to create, and you can even directly create a new one. Let's take a look at the method of supplyAsync asynchronous creation.

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                   Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}
static Executor screenExecutor(Executor e) {
    if (!useCommonPool && e == ForkJoinPool.commonPool())
        return asyncPool;
    if (e == null) throw new NullPointerException();
    return e;
}

Input parameter Supplier, function with return value. If the method is asynchronous and the executor is passed, the passed in executor will be used to execute the task. Otherwise, use the public ForkJoin parallel thread pool. If parallelism is not supported, create a new thread to execute.

Here we need to note that ForkJoin performs tasks through daemon threads. Therefore, there must be a non daemon thread.

asyncSupplyStage method

static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                 Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}

A completable future for return will be created here.

Then construct an AsyncSupply and pass the created completable future as a construction parameter.
Then, the execution of the task depends entirely on AsyncSupply.

AsyncSupply#run

public void run() {
    CompletableFuture<T> d; Supplier<T> f;
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        if (d.result == null) {
            try {
                d.completeValue(f.get());
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        d.postComplete();
    }
}

1. This method will call the get method of Supplier. And set the result to completable future. We should be aware that these operations are invoked in asynchronous threads.

2. The d.postcomplete method is to notify the completion of task execution. Trigger the execution of subsequent dependent tasks, which is the key point to realize CompletionStage.
Before looking at the postComplete method, let's look at the logic for creating dependent tasks.

thenAcceptAsync method

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}
private CompletableFuture<Void> uniAcceptStage(Executor e,
                                               Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.uniAccept(this, f, null)) {
        # 1
        UniAccept<T> c = new UniAccept<T>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

Mentioned above. thenAcceptAsync is used to consume completable future. This method calls uniAcceptStage.

Unacceptstage logic:

1. Construct a completable future, mainly for chain call.

2. If it is an asynchronous task, return directly. Because the asynchronous thread will be triggered to execute the corresponding logic after the end of the source task.

3. If it is a synchronization task (e==null), d.uniAccept method will be called. This method is logical here: if the source task is completed, call f and return true. Otherwise, enter the if code block (Mark 1).

4. If it is an asynchronous task, directly enter if (Mark 1).

Mark1 logic:

1. Construct a UniAccept and push it into the stack. Here, optimistic locking is realized through CAS.

2. Call c.tryFire method.

final CompletableFuture<Void> tryFire(int mode) {
    CompletableFuture<Void> d; CompletableFuture<T> a;
    if ((d = dep) == null ||
        !d.uniAccept(a = src, fn, mode > 0 ? null : this))
        return null;
    dep = null; src = null; fn = null;
    return d.postFire(a, mode);
}

1. The d.uniAccept method will be called. In fact, this method determines whether the source task is completed. If it is completed, the dependent task will be executed, otherwise false will be returned.

2. If the dependent task has been executed, call d.postFire, which is mainly the subsequent processing of Fire. The logic is different according to different modes.
To be brief here, in fact, mode has synchronization, asynchrony, and iteration. Iteration in order to avoid infinite recursion.

Here's the third parameter of the d.uniAccept method.

If it is an asynchronous call (mode > 0), null is passed in. Otherwise, this is passed in.
See the following code for the difference. If C is not null, the c.claim method will be called.

try {
    if (c != null && !c.claim())
        return false;
    @SuppressWarnings("unchecked") S s = (S) r;
    f.accept(s);
    completeNull();
} catch (Throwable ex) {
    completeThrowable(ex);
}

final boolean claim() {
    Executor e = executor;
    if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
        if (e == null)
            return true;
        executor = null; // disable
        e.execute(this);
    }
    return false;
}

The claim method is logical:

If the asynchronous thread is null. If synchronization is specified, return true directly. Finally, the upper function will call f.accept(s) to execute the task synchronously.
If the asynchronous thread is not null, use the asynchronous thread to execute this.
The run task of this is as follows. That is to call the tryFire method synchronously in the asynchronous thread. Achieve its purpose of being executed by asynchronous threads.

public final void run()                { tryFire(ASYNC); }

After reading the above logic, we basically understand the logic of dependent tasks.

In fact, it is to judge whether the source task is completed first. If it is completed, the task will be directly executed in the corresponding thread (if it is synchronous, it will be processed in the current thread, otherwise it will be processed in the asynchronous thread)

If the task is not completed, return directly, because after the task is completed, it will trigger the call of dependent tasks through postComplete.

postComplete method

final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

Called after the source task is completed.

In fact, the logic is very simple, which is the dependent task of the iteration stack. Call the h.tryFire method. NESTED is to avoid recursive dead loops. Because FirePost will call postComplete. If it is NESTED, it is not called.

The contents of the stack are actually added when the dependent task is created. We have already mentioned above.

4. Summary

Basically, the above source code has analyzed the logic.

Because asynchronous operations are involved, we need to sort them out (here for fully asynchronous tasks):

1. After creating completable future successfully, the corresponding task will be executed through asynchronous thread.

2. If completable future has dependent tasks (asynchronous), the tasks will be added to the stack of completable future and saved. For subsequent execution of dependent tasks.

Of course, creating dependent tasks is not just adding them to the stack. If the source task has been executed when the dependent task is created, the current thread will trigger the asynchronous thread of the dependent task to process the dependent task directly. And it will tell the stack that other dependent task source tasks have been completed.

Mainly consider code reuse. So logic is relatively difficult to understand.

The postComplete method is called after the source task thread executes the source task. It can also be invoked after relying on the task thread.

The method of executing dependent tasks mainly depends on the tryFire method. Because this method may be triggered by many different types of threads, the logic also goes around a little. (other dependent task threads, source task threads, current dependent task threads)

If it is the current dependent task thread, the dependent task will be executed and other dependent tasks will be notified.
If it is a source task thread and other dependent task threads, convert the task to the dependent thread for execution. There is no need to notify other dependent tasks to avoid dead recursion.

I have to say that Doug Lea's coding is really art. The reusability of code is fully reflected in logic.

Link: https://blog.csdn.net/weixin_...

Finally, I would like to share with you some good github projects in my collection. The contents are still good. If you feel it helpful, you can give it to star by the way.

Keywords: Java

Added by trent2800 on Mon, 14 Feb 2022 17:43:05 +0200