Executor thread pool for java Concurrent Programming

Executor thread pool for java Concurrent Programming

1 Thread-Multithread-Process

a. Threads:
Threads are an entity of a process and the basic unit for CPU scheduling and allocation. Threads are smaller than processes and can run independently.Threads themselves have little system resources and only a few resources (such as program counters, a set of registers, and stacks) that are essential to running, but they can share all the resources owned by processes with other threads belonging to the same process.

b. Multithreading:
Multithreading means that multiple different threads can run simultaneously in a single program to perform different tasks.The purpose of multithreaded programming is to "maximize the use of CPU resources". When a thread's processing does not need to consume CPU but only deals with IOS and other resources, other threads that need to consume Cpu have other opportunities to obtain CPU resources.Fundamentally, this is the ultimate goal of multithreaded programming.

c. The difference between threads and processes

  • Processes are the smallest unit of resources allocated by the operating system, and threads are the smallest unit of operating system scheduling.
  • A program has at least one process and a process has at least one thread.
2-Thread Implementation
  • Inherit Thread class, override run method
public class ThreadDemo extends Thread {
    
    private static final Logger logger = LoggerFactory.getLogger(ThreadDemo.class);

    public static void main(String[] args) {
        ThreadDemo thread =  new ThreadDemo();
        thread.start();
    }

    @Override
    public void run() {
        logger.info("The mountains and rivers are different places, the wind and the moon are the same boat");
    }
}

  • Implement the Runnable interface, override the run method.
    public class RunnableDemo implements Runnable{
    private static final Logger logger = LoggerFactory.getLogger(RunnableDemo.class);
    
    public static void main(String[] args) {
        new Thread(new RunnableDemo()).start();
    }
    
    @Override
    public void run() {
        logger.info("The mountains and rivers are different places, the wind and the moon are the same boat");
    }
    

}
```

  • Create threads through Callable and FutureTask and implement their call() method
public class CallableDemo implements Callable<String> {
    private static final Logger logger = LoggerFactory.getLogger(CallableDemo.class);
    
    @Override
    public String call() throws Exception {
        return "Heterogeneous Mountains and Rivers";
    }

    public static void main(String[] args) throws InterruptedException,ExecutionException {
        FutureTask task = new FutureTask(new CallableDemo());
        new Thread(task).start();
        logger.info(task.get().toString());
    }
}
Differences among the three implementation threads

Implementing the Runnable interface offers the following advantages over inheriting the Thread class

  • Avoid limitations due to Java's single inheritance feature
  • Enhance the robustness of the program, code can be shared by multiple threads, code and data are independent
  • Thread pools can only be placed on threads that implement the Runable or Callable class, not directly on classes that inherit Thread
    The difference between implementing the Runnable interface and implementing the Callable interface
  • Runnable is available from Java 1.1 and Callable is added after 1.5
  • Task threads implementing the Callable interface can return execution results, while task threads implementing the Runnable interface cannot return results
  • The call() method of the Callable interface allows exceptions to be thrown, whereas the run() method of the Runnable interface can only digest exceptions internally and cannot continue to be thrown
  • Join the thread pool, Runnable uses ExecutorService's execute method, Callable uses submit method
    Note: The Callable interface supports returning execution results, at which point you need to call the FutureTask.get() method implementation, which will block the main thread until you get the return results. When this method is not called, the main thread will not block
4 Threads Life Cycle-State

state Explain
NEW Initial state, threads are built, but no start() method has been called
RUNNABLE Running state, JAVA threads collectively refer to the ready and running states in the operating system as'running'
BLOCKED Blocking state, indicating that a thread is blocking a lock
WAITING The wait state, which indicates that the thread has entered a wait state, indicates that the current thread needs to wait for other threads to make certain actions (notifications or interruptions)
TIME_WAITING Timeout wait state, which is different from WAITING and can be returned on its own at a specified time
TERMINATED Termination state indicating that the current thread has completed execution
5 tips View thread status:

JPS -> jstack PID (often used to determine deadlocks)

6 What is the Executor framework?
  • The main internal implementation of Executor is the thread pool, which controls the upper level scheduling through the thread pool.So Executor acts as a thread factory in a way that allows us to create thread pools for specific functions through the Executor framework.
7 Why is the Executor framework introduced?
  • Each time a task is executed, creating a thread, new Thread(), is more time consuming and resource consuming.
  • The threads created by the call to new Thread() are poorly managed, are called wild threads, and can be created indefinitely, competing with each other, resulting in excessive consumption of system resources and paralysis of the system.
  • Threads started with new Thread() are not conducive to extension, such as timed execution, periodic execution, timed periodic execution, thread interruption, etc. To solve this problem, JDK1.5 introduced the Executor framework to manage threads and dispatch threads, referring to the two-level dispatch model of the Executor framework.
Two-level scheduling model for 8 Executor framework

In the HotSpot virtual machine thread model, Java threads (java.lang.Thread) are mapped one-to-one to local operating system threads.A native operating system thread is created when the Java thread starts.However, when the Java thread terminates, the native operating system thread is also recycled.The operating system schedules all threads and assigns them to the available CPU s.
With the Executor framework, we can divide thread scheduling into two levels:

  • The upper layer is the application thread layer at the Java level
  • The lower layer is the lower thread layer at the local operating system level
    The application controls the upper scheduling through the Executor framework, while the lower scheduling is controlled by the operating system kernel, and the lower scheduling is not controlled by the application.
9 Thread Pool Framework

  • First layer structure

    • sun.nio.ch.AsynchronousChannelGroupImpl(Iocp) Asynchronous channel-AIO related implementation

    • java.util.concurrent.CompletableFuture.ThreadPerTaskExecutor (starts a thread execution)

    • sun.net.httpserver.ServerImpl.DefaultExecutor (more executor, direct execution)

    • com.sun.jmx.remote.internal.ClientNotifForwarder.LinearExecutor (Linear Executor)
      java.util.concurrent.ExecutorService (Core Executor Service)

  • Introduction to Interfaces

    • java.util.concurrent.Executor (executor, execution method)
    • java.util.concurrent.ExecutorService (Execute Service) contains the life cycle of a service
    • java.util.concurrent.ScheduledExecutorService (dispatch related services)
  • Core Implementation Class

    • java.util.concurrent.ThreadPoolExecutor (common thread pool implementation class)
    • java.util.concurrent.ScheduledThreadPoolExecutor (core implementation class for scheduling)
  • Auxiliary Classes

    • java.util.concurrent.Executors
  • Complete Service

    • java.util.concurrent.CompletionService
    • java.util.concurrent.ExecutorCompletionService
10 Thread Pool Execution Principle
  • The most complete construction method for the ThreadPoolExecutor parameter:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

corePoolSize: Number of threads saved in the pool, including idle threads
* maximumPoolSize: Maximum number of threads allowed in the pool
Keep AliveTime: When the number of threads is greater than the core, this is the maximum amount of time that extra idle threads wait for a new task before terminating
* Unit:time unit of keepAliveTime parameter
* workQueue: Queue used to keep tasks in place before execution.This queue maintains only unnable tasks submitted by the execute method
* threadFactory: The factory used by the executor to create new threads
handler: executed due to exceeding thread scope and queue capacity

  • Thread Pool Running Ideas
    • If the current pool size poolSize is less than corePoolSize, a new thread is created to perform the task
    • If the current pool size poolSize is larger than corePoolSize and the waiting queue is not full, enter the waiting queue
    • If the current pool size poolSize is greater than corePoolSize and less than maximumPoolSize and the waiting queue is full, a new thread is created
      Execute Tasks
    • If the current pool size poolSize is greater than corePoolSize and greater than maximumPoolSize and the waiting queue is full, a rejection policy is invoked
      Briefly handle the task
    • Instead of exiting immediately after each thread in the thread pool has executed a task, it checks to see if there are any threaded tasks remaining in the waiting queue.
    • If you can't wait for a new task in keepAliveTime, the thread will exit
  • The addWorker method implements java.util.concurrent.ThreadPoolExecutor#addWorker
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary. 
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;// Two scenarios 1. If not running 2. This is not the case (stop state and null object and workQueue is not equal to null)

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;// Determine if the capacity is saturated
        if (compareAndIncrementWorkerCount(c)) //Increase the number of jobs and skip out
            break retry;
        c = ctl.get();  // Re-read ctl continues recursion after failing to increase work
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);//Add a worker
    final Thread t = w.thread;
    if (t != null) {//Determine whether null
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.After locking and re-checking for thread factory failures or pre-lock closures
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();  
                workers.add(w);   //Add work
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) { //start run will be called this time if new work is added successfully
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;

  • java.util.concurrent.ThreadPoolExecutor#execute
if (command == null)
    throw new NullPointerException();
int c = ctl.get();
//Determining if it is smaller than the core number is to exit directly after a successful direct addition of work 
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();// Continue getting tags after adding failures
}
//Once you have determined that it is running and thrown into workQueue successfully
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
//check again to determine if the running state is not running then remove & reject
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0) //Otherwise, it is found that the number of possible running threads is 0, so add a null worker.
        addWorker(null, false);
}
else if (!addWorker(command, false)) //Increase worker directly If unsuccessful direct reject
    reject(command);

  • java.util.concurrent.ThreadPoolExecutor#addWorker
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary. 
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;// Two scenarios 1. If not running 2. This is not the case (stop state and null object and workQueue is not equal to null)

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;// Determine if the capacity is saturated
        if (compareAndIncrementWorkerCount(c)) //Increase the number of jobs and skip out
            break retry;
        c = ctl.get();  // Re-read ctl continues recursion after failing to increase work
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);//Add a worker
    final Thread t = w.thread;
    if (t != null) {//Determine whether null
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.After locking and re-checking for thread factory failures or pre-lock closures
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();  
                workers.add(w);   //Add work
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) { //start run will be called this time if new work is added successfully
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;
Nineteen original articles were published. 5. 20,000 visits+
Private letter follow

Keywords: Java Programming less iOS

Added by M. Abdel-Ghani on Mon, 10 Feb 2020 04:39:56 +0200