The Thinking Logic of Computer Programs (77) - Asynchronous Task Execution Service

The Java concurrency package provides a framework that greatly simplifies the development of asynchronous tasks. This section takes a first look at that framework.

In the earlier sections, a Thread represented both the task to be performed and the mechanism that executes it. This framework introduces the concept of an "execution service", which separates "task submission" from "task execution" and encapsulates the details of execution. A task submitter can focus on the task itself (submitting it, obtaining its result, cancelling it) without worrying about execution details such as thread creation, task scheduling, and thread shutdown.

The above description may be abstract. Next, we will elaborate step by step.

Basic interface

First, let's look at the basic interfaces involved in task execution services:

  • Runnable and Callable: represent the asynchronous task to be executed
  • Executor and ExecutorService: represent the execution service
  • Future: represents the result of an asynchronous task

Runnable and Callable

We have already seen Runnable and Callable in earlier sections. Both represent tasks: Runnable returns no result and cannot throw checked exceptions, whereas Callable returns a result and can throw exceptions.
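To make the difference concrete, here is a minimal sketch. The class name RunnableVsCallableDemo and its two helper methods are hypothetical names chosen for illustration; only Runnable and Callable come from the JDK.

```java
import java.util.concurrent.Callable;

class RunnableVsCallableDemo {
    // A Callable returns a value and is allowed to throw a checked exception.
    static Callable<Integer> lengthOf(String text) {
        return () -> {
            if (text == null) {
                throw new Exception("no input");
            }
            return text.length();
        };
    }

    // A Runnable returns nothing, and run() declares no checked exceptions,
    // so InterruptedException has to be handled inside the task.
    static Runnable printer(String text) {
        return () -> {
            try {
                Thread.sleep(10);
                System.out.println(text);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) throws Exception {
        printer("hello").run();
        System.out.println(lengthOf("hello").call());
    }
}
```

Here both tasks are invoked directly in the calling thread; nothing asynchronous happens until a task is handed to an execution service.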

Executor and ExecutorService

Executor represents the simplest execution service. It is defined as:

public interface Executor {
    void execute(Runnable command);
}

It executes a Runnable and returns no result. The interface does not specify how the task is executed: it may run in a newly created thread, in a reused thread from a thread pool, or in the caller's thread.
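As a sketch of that freedom, the two hypothetical implementations below (DirectExecutor and ThreadPerTaskExecutor are illustrative names, not classes from this series) satisfy the same interface while running tasks in very different ways:

```java
import java.util.concurrent.Executor;

class ExecutorStylesDemo {
    // Runs the task synchronously in the caller's thread.
    static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Starts a brand-new thread for every task.
    static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    public static void main(String[] args) {
        Runnable task = () ->
                System.out.println("running in " + Thread.currentThread().getName());
        new DirectExecutor().execute(task);
        new ThreadPerTaskExecutor().execute(task);
    }
}
```

The submitter's code is identical in both cases; only the executor decides which thread does the work.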

ExecutorService extends Executor and defines more services. Its basic methods are:

public interface ExecutorService extends Executor {
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    //... Other methods
}

All three submit methods submit a task and return a Future. When submit returns, the task has only been submitted; it has not necessarily been executed. Through the Future, the caller can query the status of the asynchronous task, obtain its final result, and cancel it. A Callable task eventually produces a return value, but a Runnable does not. The second method, which submits a Runnable, also accepts a result that the Future returns once the asynchronous task has finished; with the third method, the Future's final result is null.
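The following sketch shows the value each of the three submit methods eventually yields. SubmitDemo and submitThreeWays are hypothetical names; the executor comes from the factory method Executors.newSingleThreadExecutor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    static Object[] submitThreeWays() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> callable = () -> 1 + 1;
            Runnable runnable = () -> { };

            Future<Integer> f1 = executor.submit(callable);        // result of call()
            Future<String> f2 = executor.submit(runnable, "done"); // the supplied result
            Future<?> f3 = executor.submit(runnable);               // always null

            return new Object[] { f1.get(), f2.get(), f3.get() };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (Object value : submitThreeWays()) {
            System.out.println(value);
        }
    }
}
```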

Future

Let's look at the definition of the Future interface:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException,
        ExecutionException, TimeoutException;
}

get returns the final result of the asynchronous task, blocking until the task completes if it has not yet done so. The other get method limits how long to block: if the task has not finished when the timeout expires, it throws TimeoutException.
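A minimal sketch of the time-limited get, assuming a hypothetical helper fetchWithTimeout whose task simply sleeps for a given number of milliseconds:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {
    static String fetchWithTimeout(long taskMillis, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<String> task = () -> {
            Thread.sleep(taskMillis); // stands in for real work
            return "finished";
        };
        Future<String> future = executor.submit(task);
        try {
            // Blocks for at most timeoutMillis.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // we no longer need the result
            return "timed out";
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchWithTimeout(10, 2000));
        System.out.println(fetchWithTimeout(5000, 50));
    }
}
```

Whether to cancel the task after a timeout is a design choice of this sketch; get itself only stops waiting and leaves the task running.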

cancel cancels the asynchronous task. If the task has already completed, has already been cancelled, or cannot be cancelled for some other reason, cancel returns false; otherwise it returns true. If the task has not yet started, it will never run. If the task is already running, it may not actually be possible to stop it. The parameter mayInterruptIfRunning indicates whether to call interrupt on the thread executing the task if the task is running: if false, interrupt is not called; if true, an attempt is made to interrupt the thread, but as we saw in Section 69, an interrupt does not necessarily stop a thread.
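The behaviour of cancel can be observed with a single-thread executor, where a second task is guaranteed to wait in the queue while the first is running. This is a sketch under that assumption; CancelDemo and its variable names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelDemo {
    static String run() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch firstExited = new CountDownLatch(1);
        AtomicBoolean firstInterrupted = new AtomicBoolean(false);
        AtomicBoolean secondRan = new AtomicBoolean(false);

        Future<?> first = executor.submit(() -> {
            firstStarted.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                firstInterrupted.set(true); // this task responds to interrupts
            } finally {
                firstExited.countDown();
            }
        });
        Future<?> second = executor.submit(() -> secondRan.set(true));

        firstStarted.await();                        // first is running, second is queued
        boolean secondCancelled = second.cancel(false); // not started: it will never run
        boolean firstCancelled = first.cancel(true);    // running: the thread is interrupted
        firstExited.await(5, TimeUnit.SECONDS);
        boolean cancelledAgain = first.cancel(true);    // already cancelled: false

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return "secondCancelled=" + secondCancelled + " secondRan=" + secondRan.get()
                + " firstCancelled=" + firstCancelled
                + " firstInterrupted=" + firstInterrupted.get()
                + " cancelledAgain=" + cancelledAgain;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```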

isDone and isCancelled query the task's status. isCancelled indicates whether the task was cancelled: once cancel returns true, subsequent calls to isCancelled return true, even if the thread executing the task has not actually finished. isDone indicates whether the task has ended, for whatever reason: it may have completed normally, thrown an exception, or been cancelled.
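The sketch below prints both flags for a task that succeeds, one that throws, and one that is cancelled. FutureStatusDemo and describe are hypothetical names; the slow task sleeps for a minute so that it is certainly unfinished when it is cancelled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FutureStatusDemo {
    static String describe(Future<?> future) {
        return "done=" + future.isDone() + ", cancelled=" + future.isCancelled();
    }

    static List<String> run() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        Callable<String> okTask = () -> "ok";
        Callable<String> failingTask = () -> { throw new IllegalStateException("boom"); };
        Callable<String> slowTask = () -> { Thread.sleep(60_000); return "late"; };

        Future<String> ok = executor.submit(okTask);
        Future<String> failed = executor.submit(failingTask);
        Future<String> slow = executor.submit(slowTask);

        String slowBefore = describe(slow); // still pending or running
        slow.cancel(true);

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return Arrays.asList(slowBefore, describe(ok), describe(failed), describe(slow));
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

Note that isDone is true in all three final cases, so it cannot be used on its own to tell success from failure.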

Let's look at the get method again. A task can end in one of three ways:

  1. The task completes normally. get returns its result; if the task is a Runnable and no result was provided, get returns null.
  2. The task throws an exception. get wraps the exception in an ExecutionException and rethrows it; the original exception can be retrieved through the exception's getCause method.
  3. The task is cancelled. get throws CancellationException.

If the thread calling the get method is interrupted, the get method throws an InterruptedException.
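The outcomes listed above can be handled in one place. This is a sketch with hypothetical names (GetOutcomeDemo, outcomeOf); it turns each outcome of get into a short description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class GetOutcomeDemo {
    static String outcomeOf(Future<?> future) throws InterruptedException {
        try {
            return "result: " + future.get();
        } catch (ExecutionException e) {
            // The task's own exception is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (CancellationException e) {
            return "cancelled";
        }
    }

    static List<String> run() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> ok = () -> 42;
            Callable<Integer> failing = () -> { throw new IllegalArgumentException("bad input"); };
            Runnable noResult = () -> { };
            Callable<Integer> slow = () -> { Thread.sleep(60_000); return 0; };

            List<String> outcomes = new ArrayList<>();
            outcomes.add(outcomeOf(executor.submit(ok)));
            outcomes.add(outcomeOf(executor.submit(failing)));
            outcomes.add(outcomeOf(executor.submit(noResult)));
            Future<Integer> cancelled = executor.submit(slow);
            cancelled.cancel(true);
            outcomes.add(outcomeOf(cancelled));
            return outcomes;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```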

Future is an important concept. It is the key to separating "task submission" from "task execution": it is the link between the task submitter and the execution service, which lets the two keep their concerns isolated while still collaborating.

Basic Usage

Basic examples

With these interfaces introduced, how are they used? Let's look at a simple example:

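import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BasicDemo {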
    static class Task implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            // Thread.sleep takes milliseconds, so this sleeps for up to one second
            int sleepMillis = new Random().nextInt(1000);
            Thread.sleep(sleepMillis);
            return sleepMillis;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> future = executor.submit(new Task());

        // Simulate other tasks
        Thread.sleep(100);

        try {
            System.out.println(future.get());
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }
}

We used the factory class Executors to create the execution service. Executors has several static methods for creating an ExecutorService. Here we use:

public static ExecutorService newSingleThreadExecutor()

It uses a single thread to execute all tasks. We will introduce Executors in detail later. Note the difference between Executors and Executor: the latter is singular and is an interface.

No matter how an ExecutorService is created, it is used in the same way. The example submits a task, goes on to do other work, and later uses the Future to get the final result or handle an exception thrown by the task.

Finally, we call the shutdown method of ExecutorService, which closes the task execution service.

More Methods of ExecutorService

We have introduced the three submit methods of ExecutorService, but it also has the following methods:

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

There are two shutdown methods, shutdown and shutdownNow. shutdown stops accepting new tasks, but tasks that have already been submitted continue to execute, even those that have not yet started. shutdownNow also stops accepting new tasks, and in addition discards submitted tasks that have not yet started; for tasks that are already running, it typically calls the thread's interrupt method to try to interrupt them, although the thread may not respond to the interrupt. shutdownNow returns the list of tasks that were submitted but never started.

Neither shutdown nor shutdownNow blocks. Their return does not mean that all tasks have ended, although isShutdown returns true from then on. The caller can wait for all tasks to end with awaitTermination, which limits the waiting time: it returns true if all tasks finish before the timeout (that is, isTerminated returns true), and false otherwise.
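The two shutdown styles can be compared side by side. The sketch below uses hypothetical names (ShutdownDemo, graceful, abrupt, sleepQuietly) and relies on the executors created by Executors.newSingleThreadExecutor, which, as far as we know, reject new tasks after shutdown by throwing RejectedExecutionException.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop early when interrupted
        }
    }

    // shutdown(): queued tasks still run; new submissions are refused.
    static String graceful() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> {
                sleepQuietly(20);
                completed.incrementAndGet();
            });
        }
        executor.shutdown();
        boolean rejected = false;
        try {
            executor.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        return "completed=" + completed.get() + " rejected=" + rejected
                + " terminated=" + terminated;
    }

    // shutdownNow(): the running task is interrupted, queued tasks are returned.
    static String abrupt() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        executor.execute(() -> {
            started.countDown();
            sleepQuietly(60_000);
        });
        executor.execute(() -> { });
        executor.execute(() -> { });
        started.await(); // make sure the first task is running
        List<Runnable> neverStarted = executor.shutdownNow();
        boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        return "neverStarted=" + neverStarted.size() + " terminated=" + terminated;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(graceful());
        System.out.println(abrupt());
    }
}
```

The abrupt case terminates quickly only because the long-running task reacts to the interrupt; a task that ignores interrupts would keep awaitTermination waiting until its timeout.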

ExecutorService has two sets of batch submission methods, invokeAll and invokeAny, both of which have two versions, one of which limits the waiting time.

invokeAll waits for all tasks to complete. In the returned list, every Future's isDone method returns true, but isDone returning true does not mean the task succeeded: it may have thrown an exception or been cancelled. The timed version of invokeAll cancels any tasks that have not completed when the timeout expires.
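A small sketch of the timed invokeAll, assuming one task that returns immediately and one that sleeps far longer than the timeout. InvokeAllTimeoutDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllTimeoutDemo {
    static List<String> run() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Callable<String> fast = () -> "fast";
            Callable<String> slow = () -> { Thread.sleep(60_000); return "slow"; };

            // Returns after at most 500 ms; unfinished tasks are cancelled.
            List<Future<String>> futures =
                    executor.invokeAll(Arrays.asList(fast, slow), 500, TimeUnit.MILLISECONDS);

            List<String> report = new ArrayList<>();
            for (Future<String> future : futures) {
                if (future.isCancelled()) {
                    report.add("cancelled");
                    continue;
                }
                try {
                    report.add(future.get()); // does not block: isDone() is already true
                } catch (ExecutionException e) {
                    report.add("failed");
                }
            }
            return report;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The returned list is in the same order as the submitted tasks, which is why the report can be matched back to them by position.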

invokeAny returns as soon as one task completes successfully within the time limit: it returns that task's result and cancels the other tasks. If no task completes successfully within the time limit, it throws TimeoutException; if all tasks end within the time limit but every one of them throws an exception, it throws ExecutionException.
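To illustrate, the sketch below races three hypothetical "mirror" tasks: one slow, one that fails, and one that succeeds quickly. The names InvokeAnyDemo and mirror are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class InvokeAnyDemo {
    // A task that waits, then either returns its name or fails.
    static Callable<String> mirror(String name, long delayMillis, boolean fail) {
        return () -> {
            Thread.sleep(delayMillis);
            if (fail) {
                throw new IllegalStateException(name + " is down");
            }
            return name;
        };
    }

    static String firstSuccessful(List<Callable<String>> tasks) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            return executor.invokeAny(tasks, 5, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstSuccessful(Arrays.asList(
                mirror("slow", 60_000, false),
                mirror("broken", 0, true),
                mirror("fast", 50, false))));
        try {
            firstSuccessful(Arrays.asList(mirror("a", 0, true), mirror("b", 0, true)));
        } catch (ExecutionException e) {
            System.out.println("all failed: " + e.getCause().getMessage());
        }
    }
}
```

A failed task does not end the race; invokeAny only gives up with ExecutionException once every task has failed.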

ExecutorService's invokeAll example

In Section 64 we introduced using jsoup to download and parse HTML. Here we use it in an invokeAll example that downloads two URLs concurrently, parses their titles, and prints them. The code is:

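import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.select.Elements;

public class InvokeAllDemo {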
    static class UrlTitleParser implements Callable<String> {
        private String url;

        public UrlTitleParser(String url) {
            this.url = url;
        }

        @Override
        public String call() throws Exception {
            Document doc = Jsoup.connect(url).get();
            Elements elements = doc.select("head title");
            if (elements.size() > 0) {
                return elements.get(0).text();
            }
            return null;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        String url1 = "http://www.cnblogs.com/swiftma/p/5396551.html";
        String url2 = "http://www.cnblogs.com/swiftma/p/5399315.html";

        Collection<UrlTitleParser> tasks = Arrays.asList(new UrlTitleParser[] {
                new UrlTitleParser(url1), new UrlTitleParser(url2) });
        try {
            List<Future<String>> results = executor.invokeAll(tasks, 10,
                    TimeUnit.SECONDS);
            for (Future<String> result : results) {
                try {
                    System.out.println(result.get());
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }

}

Here, another factory method of Executors, newFixedThreadPool, is used to create a thread pool so that multiple tasks can be executed concurrently. We will introduce the thread pool in the next section.

The rest of the code is straightforward, so we won't explain it. With ExecutorService, writing concurrent asynchronous code feels like writing a sequential program: there is no need to deal with creating and coordinating threads, only with submitting tasks and processing results, which greatly simplifies development.

Basic Implementation Principles

Now that we understand the basic usage of ExecutorService and Future, let's look at how they are implemented.

The main implementation class of ExecutorService is ThreadPoolExecutor, which is based on a thread pool. We will introduce thread pools in the next section. ExecutorService also has an abstract implementation class, AbstractExecutorService. In this section, we briefly analyze how it works and implement a simple ExecutorService on top of it. The main implementation class of Future is FutureTask, whose principles we will also discuss briefly.

AbstractExecutorService

AbstractExecutorService provides default implementations of submit, invokeAll and invokeAny. Subclasses only need to implement the following methods:

public void shutdown()
public List<Runnable> shutdownNow()
public boolean isShutdown()
public boolean isTerminated()
public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException
public void execute(Runnable command) 

Apart from execute, these methods all concern lifecycle management of the execution service. To keep things simple, we ignore their implementation and focus on execute.

submit, invokeAll and invokeAny all end up calling execute, which decides how a task is actually run. To keep things simple, we create one thread per task. A complete, minimal ExecutorService implementation looks like this:

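import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

public class SimpleExecutorService extends AbstractExecutorService {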

    @Override
    public void shutdown() {
    }

    @Override
    public List<Runnable> shutdownNow() {
        return null;
    }

    @Override
    public boolean isShutdown() {
        return false;
    }

    @Override
    public boolean isTerminated() {
        return false;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return false;
    }

    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

For the previous example, the code to create an ExecutorService can be replaced by:

ExecutorService executor = new SimpleExecutorService();

The same effect can be achieved.

The most basic method of ExecutorService is submit. How does it work? Let's look at the code for AbstractExecutorService:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

It calls newTaskFor to create a RunnableFuture. RunnableFuture is an interface that extends both Runnable and Future without defining any new methods. As a Runnable, it represents the task to be executed and is passed to execute; as a Future, it represents the asynchronous result of that task. This may sound confusing, so let's look at the code of newTaskFor:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

It simply creates a FutureTask object; FutureTask implements the RunnableFuture interface. How is FutureTask implemented?
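Before looking inside FutureTask, it may help to see its two roles in isolation. In this sketch (FutureTaskDualRoleDemo is a hypothetical name) the same object is handed to a plain Thread as a Runnable and then queried as a Future, with no execution service involved:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class FutureTaskDualRoleDemo {
    static int compute() throws Exception {
        Callable<Integer> work = () -> 6 * 7;
        FutureTask<Integer> task = new FutureTask<>(work);

        Runnable asRunnable = task;      // the role seen by whoever executes it
        Future<Integer> asFuture = task; // the role seen by the submitter

        new Thread(asRunnable).start();
        return asFuture.get();           // blocks until run() has stored the result
    }

    public static void main(String[] args) throws Exception {
        System.out.println(compute());
    }
}
```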

FutureTask

It has a member variable that represents the task to be performed, declared as:

private Callable<V> callable;

An integer variable state represents the state, declared as:

private volatile int state;

The possible values are:

NEW          = 0; // initial state; also the state while the task is running
COMPLETING   = 1; // transient state: the task has ended and the result is being set
NORMAL       = 2; // the task completed normally
EXCEPTIONAL  = 3; // the task ended by throwing an exception
CANCELLED    = 4; // the task was cancelled
INTERRUPTING = 5; // the task is being interrupted
INTERRUPTED  = 6; // the task was interrupted

There is a variable that holds the final result or the exception, declared as:

private Object outcome; 

There is a variable that represents the thread running the task:

private volatile Thread runner;

There is also a singly linked list of the threads waiting for the task's result:

private volatile WaitNode waiters;

The constructor of FutureTask initializes callable and state. If FutureTask is given a Runnable, it calls Executors.callable to convert it into a Callable, as follows:

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
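The adapter used by that constructor can also be called directly. A short sketch (RunnableAdapterDemo is a hypothetical name) shows that the resulting Callable runs the Runnable and then returns the fixed result it was given:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

class RunnableAdapterDemo {
    static String run() throws Exception {
        StringBuilder log = new StringBuilder();
        Runnable runnable = () -> log.append("ran");

        // Wraps the Runnable; call() runs it and returns the given value.
        Callable<String> adapted = Executors.callable(runnable, "fixed result");
        String result = adapted.call();

        return log + "/" + result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "ran/fixed result"
    }
}
```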

The task execution service uses a thread to execute the run method of FutureTask. The run() code is:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

Its basic logic is:

  • Call the callable's call method, catching any exception
  • If it completes normally, call set to record the result, which is saved in outcome
  • If an exception is thrown during execution, call setException to record it. The exception is also saved in outcome, but the state is different
  • Besides setting the result and updating the state, set and setException call finishCompletion, which wakes up all threads waiting for the result
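The logic above can be mimicked in a few lines. The class below is a deliberately simplified sketch, not the JDK implementation: SimpleFutureTask is a hypothetical name, it has only three states, it does not support cancellation, and it uses synchronized with wait/notifyAll in place of the CAS operations and waiter list of the real FutureTask.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

class SimpleFutureTask<V> implements Runnable {
    private static final int NEW = 0, NORMAL = 1, EXCEPTIONAL = 2;

    private final Callable<V> callable;
    private int state = NEW;   // guarded by "this"
    private Object outcome;    // the result or the Throwable, guarded by "this"

    SimpleFutureTask(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        V result;
        try {
            result = callable.call();
        } catch (Throwable ex) {
            finish(EXCEPTIONAL, ex);   // plays the role of setException
            return;
        }
        finish(NORMAL, result);        // plays the role of set
    }

    private synchronized void finish(int newState, Object value) {
        outcome = value;
        state = newState;
        notifyAll();                   // wake up every thread blocked in get()
    }

    @SuppressWarnings("unchecked")
    public synchronized V get() throws InterruptedException, ExecutionException {
        while (state == NEW) {
            wait();                    // released by notifyAll() in finish()
        }
        if (state == NORMAL) {
            return (V) outcome;
        }
        throw new ExecutionException((Throwable) outcome);
    }

    public static void main(String[] args) throws Exception {
        SimpleFutureTask<Integer> task = new SimpleFutureTask<Integer>(() -> 1 + 2);
        new Thread(task).start();
        System.out.println(task.get());
    }
}
```

As in the real class, a single outcome field holds either the result or the exception, and the state tells get which of the two it is.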

For the task submitter, it obtains the result through the get method, and the code of the time-limited get method is as follows:

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

The basic logic is: if the task has not completed, wait; then call report to report the outcome. report returns the result or throws an exception, depending on the state. Its code is:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

The code of cancel method is:

public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
        return false;
    if (mayInterruptIfRunning) {
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
            return false;
        Thread t = runner;
        if (t != null)
            t.interrupt();
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
        return false;
    finishCompletion();
    return true;
} 

Its basic logic is:

  • If the task has already ended or been cancelled, return false
  • If mayInterruptIfRunning is true, call interrupt on the thread running the task and set the state to INTERRUPTED
  • If mayInterruptIfRunning is false, set the state to CANCELLED
  • Call finishCompletion to wake up all threads waiting for results
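These state checks can be observed on a FutureTask directly, without any execution service. In this sketch (FutureTaskCancelDemo is a hypothetical name) the task is cancelled before it runs, so the later call to run finds a state other than NEW and returns without invoking the Callable:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

class FutureTaskCancelDemo {
    static String run() {
        AtomicBoolean called = new AtomicBoolean(false);
        Callable<String> work = () -> {
            called.set(true);
            return "value";
        };
        FutureTask<String> task = new FutureTask<>(work);

        boolean firstCancel = task.cancel(false);  // NEW -> CANCELLED
        task.run();                                // state is not NEW: returns at once
        boolean secondCancel = task.cancel(false); // state is not NEW: false

        return "firstCancel=" + firstCancel + " called=" + called.get()
                + " secondCancel=" + secondCancel + " isCancelled=" + task.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```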

invokeAll and invokeAny

Now that we understand FutureTask, let's look at the other methods of AbstractExecutorService. The basic logic of invokeAll is simple: for each task, create a FutureTask and call execute to run it, then wait for all tasks to end.
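That logic can be sketched as a stand-alone helper. This is a simplified illustration, not the JDK source: SimpleInvokeAll is a hypothetical name, it has no timeout, and it leaves out cleanup such as cancelling outstanding tasks when the waiting thread is interrupted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class SimpleInvokeAll {
    static <T> List<Future<T>> invokeAll(Executor executor,
                                         Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        // Step 1: wrap every task in a FutureTask and hand it to execute.
        for (Callable<T> task : tasks) {
            FutureTask<T> futureTask = new FutureTask<>(task);
            futures.add(futureTask);
            executor.execute(futureTask);
        }
        // Step 2: wait for each task to end, whatever its outcome.
        for (Future<T> future : futures) {
            try {
                future.get();
            } catch (ExecutionException | CancellationException ignored) {
                // The caller inspects the Future later; here we only wait.
            }
        }
        return futures;
    }

    public static void main(String[] args) throws Exception {
        Executor threadPerTask = command -> new Thread(command).start();
        Callable<Integer> one = () -> 1;
        Callable<Integer> two = () -> 2;
        for (Future<Integer> future : invokeAll(threadPerTask, Arrays.asList(one, two))) {
            System.out.println(future.get());
        }
    }
}
```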

The implementation of invokeAny is slightly more complex. It uses ExecutorCompletionService; we will introduce that class and the implementation of invokeAny in later sections.

Summary

This section introduced the basic concepts and principles of the task execution service in the Java concurrency package. The service embodies the idea of "separation of concerns" in concurrent asynchronous development: users submit tasks through ExecutorService and work with tasks and results through Future, without having to deal with the details of creating and coordinating threads.

This section also covered the basic principles of AbstractExecutorService and FutureTask and implemented a minimal execution service, SimpleExecutorService, which creates a separate thread for each task. In practice, the most commonly used execution service is ThreadPoolExecutor, which is based on a thread pool. Thread pools are a very important concept and technique in concurrent programs; we will discuss them in the next section.

(As in other chapters, all the code in this section is located at https://github.com/swiftma/program-logic)


Added by matifibrahim on Sun, 14 Jul 2019 21:30:42 +0300