Java blocking queue -- BlockingQueue

1. What is a blocking queue?

A blocking queue is a queue that supports two additional operations: when the queue is empty, a thread that tries to take an element waits until the queue becomes non-empty; when the queue is full, a thread that tries to store an element waits until space becomes available. Blocking queues are commonly used in producer-consumer scenarios. Producers are threads that add elements to the queue, and consumers are threads that take elements from it. The blocking queue is the container in which producers store elements and from which consumers retrieve them.
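The producer-consumer scenario described above can be sketched as follows. This is a minimal illustration, not code from the JDK: the class name ProducerConsumerDemo, the helper transfer() and the capacity of 2 are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {
    // Hypothetical helper: one producer puts 1..n, one consumer takes n items.
    static List<Integer> transfer(int n) {
        // A small capacity forces the producer to block from time to time
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i);               // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    received.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

With a single producer and a single consumer, the FIFO order of the queue means the elements arrive in the order they were produced.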

Blocking queue provides four processing methods:

Method       Throw exception   Return special value   Always block   Timeout exit
Insertion    add(e)            offer(e)               put(e)         offer(e, time, unit)
Removal      remove()          poll()                 take()         poll(time, unit)
Inspection   element()         peek()                 Not available  Not available

  • Throw exception: when the blocking queue is full, inserting an element throws IllegalStateException("Queue full"). When the queue is empty, removing an element throws NoSuchElementException.
  • Return special value: the insertion method returns whether it succeeded (true on success, false otherwise). The removal method takes an element from the queue and returns it, or returns null if the queue is empty.
  • Always block: when the blocking queue is full and a producer thread puts an element into it, the queue blocks the producer thread until space becomes available or the thread is interrupted. When the queue is empty and a consumer thread tries to take an element, the queue blocks the consumer thread until an element is available.
  • Timeout exit: when the blocking queue is full, the queue blocks the producer thread for a limited period. If the timeout elapses before space becomes available, the producer thread gives up and the method returns false.
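The non-blocking handling styles listed above can be observed on a queue of capacity 1. The following is a small sketch under that assumption; the class and method names (QueueModesDemo, insertIntoFull, removeFromEmpty) are made up for the illustration.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueModesDemo {
    // Inserts into a full queue using each style except the always-blocking put().
    static String insertIntoFull() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();
        log.append(q.add("a"));                   // succeeds, the queue is now full
        log.append(' ').append(q.offer("b"));     // special value: false
        try {
            q.add("b");                           // exception style
        } catch (IllegalStateException e) {
            log.append(" IllegalStateException");
        }
        try {
            // timeout style: waits up to 50 ms, then gives up with false
            log.append(' ').append(q.offer("b", 50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }

    // Removes from an empty queue using each style except the always-blocking take().
    static String removeFromEmpty() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();
        log.append(q.poll());                     // special value: null
        log.append(' ').append(q.peek());         // special value: null
        try {
            q.remove();                           // exception style
        } catch (NoSuchElementException e) {
            log.append(" NoSuchElementException");
        }
        try {
            q.element();                          // exception style
        } catch (NoSuchElementException e) {
            log.append(" NoSuchElementException");
        }
        try {
            log.append(' ').append(q.poll(50, TimeUnit.MILLISECONDS)); // null after timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(insertIntoFull());
        System.out.println(removeFromEmpty());
    }
}
```

put() and take() are left out on purpose: on a full or empty queue they would block the calling thread until another thread changes the queue.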

This article introduces BlockingQueue in detail. The main topics are:

  • The core method of BlockingQueue
  • Overview of the members of the blocking queue
  • The principles of DelayQueue, ArrayBlockingQueue and LinkedBlockingQueue are introduced in detail
  • Thread pool and BlockingQueue
1. Initial knowledge of blocking queue

In the java.util.concurrent package, BlockingQueue solves the problem of how to transfer data between threads efficiently and safely. These efficient, thread-safe queue classes make it much easier to build high-quality multithreaded programs quickly. This article introduces the members of the BlockingQueue family in detail, including their functions and common usage scenarios.

The core method of BlockingQueue:

public interface BlockingQueue<E> extends Queue<E> {

    // Inserts the given element into the queue. Returns true on success; throws IllegalStateException if no space is available. For a capacity-restricted queue, offer() is generally preferable.
    boolean add(E e);

    // Inserts the given element into the queue. Returns true on success and false if the queue is full. The element must not be null, otherwise a NullPointerException is thrown.
    boolean offer(E e);

    // Inserts the element into the queue. If the queue has no free space, the method blocks until space becomes available.
    void put(E e) throws InterruptedException;

    // Inserts the given element, waiting up to the given time for space to become available. Returns true on success, or false if the timeout elapses first.
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    // Retrieves and removes the head of the queue. If the queue is empty, the thread blocks until an element becomes available.
    E take() throws InterruptedException;

    // Retrieves and removes the head of the queue, waiting up to the given time. Returns null (no exception is thrown) if the timeout elapses before an element is available.
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    // Returns the number of additional elements the queue can accept without blocking.
    int remainingCapacity();

    // Removes a single instance of the specified element from the queue, if present.
    boolean remove(Object o);

    // Returns true if the queue contains the specified element.
    public boolean contains(Object o);

    // Removes all available elements from the queue and adds them to the given collection.
    int drainTo(Collection<? super E> c);

    // Removes at most maxElements elements from the queue and adds them to the given collection.
    int drainTo(Collection<? super E> c, int maxElements);
}
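The bulk and capacity methods of the interface are less familiar than put and take, so here is a short sketch of remainingCapacity() and both drainTo() overloads. The class name DrainDemo and the capacity of 5 are assumptions for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DrainDemo {
    // Returns {free slots, elements moved by the limited drain, elements moved by the full drain, final size}.
    static int[] run() {
        BlockingQueue<Integer> q = new LinkedBlockingQueue<>(5);
        for (int i = 0; i < 4; i++) {
            q.offer(i);                       // queue now holds 0, 1, 2, 3
        }
        int free = q.remainingCapacity();     // 5 - 4 = 1

        List<Integer> batch = new ArrayList<>();
        int moved = q.drainTo(batch, 3);      // moves at most 3 elements: 0, 1, 2

        List<Integer> rest = new ArrayList<>();
        int movedRest = q.drainTo(rest);      // moves everything that is left: 3

        return new int[] {free, moved, movedRest, q.size()};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]); // prints 1 3 1 0
    }
}
```

drainTo() never blocks: it moves whatever is available at the moment of the call, which makes it convenient for batch consumers.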

Before going deeper, it helps to understand ReentrantLock and Condition.
ReentrantLock:
A ReentrantLock can be held by only one thread at a time. Reentrant means that the thread holding the lock can acquire it again multiple times.
ReentrantLock comes in two flavors, "fair lock" and "unfair lock", which differ in whether the lock acquisition mechanism is fair. A lock protects a contended resource and prevents multiple threads from operating on it at the same time. While one thread holds the lock, other threads must wait; ReentrantLock manages the waiting threads through a FIFO wait queue. Under the "fair lock" mechanism, threads acquire the lock in queue order. Under the "unfair lock" mechanism, a thread may acquire the lock as soon as it is available, whether or not it is at the head of the queue.
Main methods:

  • lock(): acquires the lock
  • lockInterruptibly(): acquires the lock unless the current thread is interrupted, giving priority to responding to interrupts
  • tryLock(): attempts to acquire the lock and returns true on success; otherwise it returns false immediately without waiting
  • tryLock(long time, TimeUnit unit): attempts to acquire the lock within the given time
  • unlock(): releases the lock
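The methods above can be tried out in a few lines. This sketch assumes a fair lock and a second thread that calls tryLock() while the main thread holds the lock; the class name LockDemo is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockDemo {
    static String run() {
        ReentrantLock lock = new ReentrantLock(true); // true selects the fair policy
        lock.lock();
        lock.lock();                                  // reentrant: the owner may acquire it again
        int holds = lock.getHoldCount();              // 2

        boolean[] otherGotIt = new boolean[1];
        Thread other = new Thread(() -> {
            // Returns immediately; false because the main thread still holds the lock
            otherGotIt[0] = lock.tryLock();
            if (otherGotIt[0]) {
                lock.unlock();
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        lock.unlock();                                // one unlock() per lock()
        lock.unlock();
        return holds + " " + lock.isFair() + " " + otherGotIt[0] + " " + lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 2 true false false
    }
}
```

Each lock() must be paired with an unlock(); in real code the unlock() calls belong in a finally block, as the JDK sources later in this article show.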

Condition: the await() and signal() methods correspond to wait() and notify() of Object, respectively

  • Used together with a reentrant lock
  • await() makes the current thread wait and releases the lock at the same time
  • awaitUninterruptibly() does not respond to interrupts while waiting
  • signal() wakes up one waiting thread; the corresponding signalAll() wakes up all waiting threads
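To show how a lock and its conditions work together, here is a minimal bounded buffer built from one ReentrantLock and two Condition objects. It is a simplified sketch in the spirit of the queues discussed in this article, not their actual implementation; the class BoundedBuffer and its demo() helper are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();       // releases the lock while waiting
            }
            items.addLast(item);
            notEmpty.signal();         // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal();          // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical demo: a producer thread puts 1, 2, 3 through a buffer of capacity 1.
    static String demo() {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 0; i < 3; i++) {
                out.append(buffer.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 123
    }
}
```

The await() calls sit inside while loops because a thread can wake up without the condition actually holding, so the condition has to be re-checked after every wake-up.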

2. Blocking queue members

Queue                   Boundedness                 Lock             Data structure
ArrayBlockingQueue      bounded                     lock             array
LinkedBlockingQueue     optionally bounded          lock             linked list
PriorityBlockingQueue   unbounded                   lock             heap
DelayQueue              unbounded                   lock             heap
SynchronousQueue        bounded (zero capacity)     no lock (CAS)    none
LinkedTransferQueue     unbounded                   no lock (CAS)    linked list
LinkedBlockingDeque     optionally bounded          lock             doubly linked list

The following is a brief introduction:

  • ArrayBlockingQueue: a bounded blocking queue backed by an array. It orders elements first in, first out (FIFO) and supports both fair and unfair locks. [Note: threads may have to queue while acquiring a lock. If the thread that has waited longest is always served first, the lock is fair; otherwise it is unfair.]

  • LinkedBlockingQueue: an optionally bounded queue backed by a linked list. If no capacity is specified, it defaults to Integer.MAX_VALUE. Elements are ordered first in, first out.
  • PriorityBlockingQueue: an unbounded queue that orders elements by priority. By default elements are sorted in their natural order; you can also implement compareTo() on the element class, or supply a Comparator, to define the ordering. The relative order of elements with equal priority is not guaranteed.
  • DelayQueue: an unbounded queue that uses a priority queue internally to support delayed retrieval. Each element specifies how long it must wait before it can be taken, and it can be obtained from the queue only after that delay has expired. (DelayQueue can be used in the following scenarios: 1. Cache system design: use a DelayQueue to hold the validity period of cache entries and have a thread poll the DelayQueue in a loop. Once an element can be obtained from the DelayQueue, the corresponding cache entry has expired. 2. Scheduled task scheduling: use a DelayQueue to hold the tasks to run that day together with their execution times; a task is executed as soon as it can be obtained from the DelayQueue. For example, TimerQueue is implemented using DelayQueue.)
  • SynchronousQueue: a blocking queue that does not store elements. Each put operation must wait for a matching take operation, otherwise the element cannot be added. It supports fair and unfair access policies. One usage scenario is the thread pool: Executors.newCachedThreadPool() uses a SynchronousQueue. This thread pool creates new threads as needed (when a new task arrives), reuses idle threads if there are any, and reclaims threads that have been idle for 60 seconds.
  • LinkedTransferQueue: an unbounded blocking queue backed by a linked list. Compared with the other blocking queues, it adds the transfer and tryTransfer methods.
  • LinkedBlockingDeque: a double-ended blocking queue backed by a linked list. Elements can be added and removed at both the head and the tail of the queue. Because elements can enter at either end, contention among threads enqueuing concurrently can be reduced by up to half.
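A few of the differences between these members are easy to observe directly. The sketch below assumes no consumer thread is waiting, which is what makes the hand-off attempts fail; the class name QueueMembersDemo is made up for the example.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueMembersDemo {
    static String run() {
        // PriorityBlockingQueue: elements come out in natural order, not insertion order
        PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
        pq.add(3);
        pq.add(1);
        pq.add(2);
        String order = "" + pq.poll() + pq.poll() + pq.poll();

        // SynchronousQueue: offer() only succeeds if a consumer is already waiting
        SynchronousQueue<String> sq = new SynchronousQueue<>();
        boolean handedOff = sq.offer("x");

        // LinkedTransferQueue: tryTransfer() hands over only to a waiting consumer
        // and does not enqueue the element otherwise
        LinkedTransferQueue<String> tq = new LinkedTransferQueue<>();
        boolean transferred = tq.tryTransfer("x");

        return order + " " + handedOff + " " + sq.size() + " " + transferred + " " + tq.size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 123 false 0 false 0
    }
}
```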

Next, focus on ArrayBlockingQueue, LinkedBlockingQueue and DelayQueue

3. Principle and application of blocking queue

(1)DelayQueue

The element type of DelayQueue must implement the Delayed interface, which extends Comparable. Internally, DelayQueue uses a non-thread-safe PriorityQueue guarded by a lock, and applies the Leader/Followers pattern to minimize unnecessary waiting. DelayQueue does not allow null elements.

Leader/Followers mode:

  1. Several threads (generally forming a thread pool) handle a large number of events.
  2. One thread acts as the leader and waits for an event to occur; the other threads are followers and simply sleep.
  3. When an event arrives, the leader promotes one of the followers to be the new leader and then handles the event itself.
  4. The promoted follower wakes up and, as the new leader, waits for the next event.
  5. When a thread finishes handling its event, it becomes a follower until it is woken up to become the leader.
  6. If there are too many events and not enough threads (leaving aside the ability to create threads dynamically), some events may go unprocessed.

Every thread is in one of three states: leader, follower, or processor (the working state). The basic principle is that there is at most one leader at any time, and all followers are waiting to become the leader. When the thread pool starts, a leader is automatically chosen to wait for network I/O events. When an event occurs, the leader thread first promotes a follower thread to be the new leader and then goes on to handle the event. After it finishes, it joins the queue of waiting followers until it becomes the leader again. This approach improves CPU cache affinity and eliminates dynamic memory allocation and data exchange between threads.
Parameters and constructors:

    // Reentrant lock
    private final transient ReentrantLock lock = new ReentrantLock();
    
    // Queue for storing queue elements - priority queue
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    //Thread element for optimizing blocking notification: leader, Leader/Followers mode
    private Thread leader = null;

    //Condition object used to implement blocking and notification
    private final Condition available = lock.newCondition();
    
    public DelayQueue() {}
    
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

First look at the offer() method:

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // If the new element is now at the head of the queue, reset the leader thread and signal the available condition
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Because DelayQueue is unbounded, inserting never blocks on a full queue, so the timeout of this offer method has no effect
    public boolean offer(E e, long timeout, TimeUnit unit) {
        // It is the same as the offer method without timeout
        return offer(e);
    }

The plain poll() method: if the queue is empty or the delay of the head element has not yet expired, it returns null immediately

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

Take another look at the take() method:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // If the queue is empty, you need to wait for the available condition to be notified
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    // If the delay time has expired, the first element is returned directly
                    if (delay <= 0)
                        return q.poll();
                    // A non-null leader means another thread is already waiting for the head element, so the current thread waits indefinitely
                    else if (leader != null)
                        available.await();
                    else {
                        // There is no leader, so the current thread becomes the leader
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        // Wait until the delay expires (the wait may return early, in which case
                        // the next loop iteration handles it)
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // While this thread was waiting on the available condition, the lock was released
                            // and another thread may have changed the leader, so check before resetting it
                            // for the next loop iteration.
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // If there is no leader and the queue is not empty, signal the available condition to wake a waiting thread
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

Finally, take a look at the poll method with timeout:

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        // Try to wait for the available condition and record the remaining time
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    // If the timeout is shorter than the delay, or another thread is already the leader,
                    // wait at most nanos. When a leader exists, waiting for delay might seem more natural,
                    // but nanos also works: the thread ahead of this one signals the available condition when it returns.
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // nanos needs to be updated
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
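From the caller's side, the DelayQueue behaviour analysed in this section looks as follows. This is a minimal sketch: the element class DelayedTask, its fields and the delays of 100 ms and 300 ms are assumptions for the example, not part of the JDK.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueDemo {
    // Hypothetical element type: becomes available delayMillis after creation.
    static final class DelayedTask implements Delayed {
        final String name;
        final long readyAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the element has expired
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Order by remaining delay so the element that expires first is at the head
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    static String run() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.offer(new DelayedTask("slow", 300));
        queue.offer(new DelayedTask("fast", 100));

        // Non-blocking poll(): nothing has expired yet, so it returns null
        boolean nothingReady = queue.poll() == null;
        try {
            String first = queue.take().name;   // blocks until the shortest delay expires
            String second = queue.take().name;  // blocks until the next delay expires
            return nothingReady + " " + first + " " + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // expected under normal scheduling: true fast slow
    }
}
```

The elements come out in order of expiry rather than insertion, which is the effect of the internal priority queue combined with the getDelay() check in poll() and take().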

(2)ArrayBlockingQueue

Parameters and constructors:

    // An array of storage queue elements
    final Object[] items;

    // Take the index of the data and use it for take, poll, peek and remove methods
    int takeIndex;

    // Put the index of data, which is used for put, offer and add methods
    int putIndex;

    // Number of elements
    int count;

    // Reentrant lock
    final ReentrantLock lock;
    // notEmpty condition object, created by lock
    private final Condition notEmpty;
    // notFull condition object, created by lock
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false); // Uses an unfair lock by default
    }
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        // Initialize the ReentrantLock. The same lock is used for both enqueue and dequeue operations
        lock = new ReentrantLock(fair);
        // Initialize the notEmpty condition (wait queue for consumers)
        notEmpty = lock.newCondition();
        // Initialize the notFull condition (wait queue for producers)
        notFull =  lock.newCondition();
    }
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            //Adds a collection to a queue of arrays 
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

How insertion works:

Both add and offer end up calling enqueue(E x), which stores the element in the items array at the putIndex index. You may wonder why putIndex is reset to 0 when it reaches the array length. The reason is that elements are always taken from the head of the queue and added at the tail, so the array is used as a circular buffer: once the index (starting from 0) reaches the end of the array, the next element must be added at the start of the array.

    // Enqueue operation
    private void enqueue(E x) {
        final Object[] items = this.items;
        //The array is assigned a value through the putIndex index
        items[putIndex] = x;
        //The index increases automatically. If it is the last position, reset putIndex = 0;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
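The wrap-around of putIndex and takeIndex can be traced with a stripped-down, single-threaded ring buffer. This sketch deliberately leaves out the lock and conditions of the real class; the name RingBufferSketch and the capacity of 3 are assumptions.

```java
class RingBufferSketch {
    private final Object[] items;
    private int putIndex;
    private int takeIndex;
    private int count;

    RingBufferSketch(int capacity) {
        items = new Object[capacity];
    }

    boolean enqueue(Object x) {
        if (count == items.length) {
            return false;                  // full: the real queue would block or return false
        }
        items[putIndex] = x;
        if (++putIndex == items.length) {
            putIndex = 0;                  // wrap around to the start of the array
        }
        count++;
        return true;
    }

    Object dequeue() {
        if (count == 0) {
            return null;                   // empty
        }
        Object x = items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) {
            takeIndex = 0;                 // wrap around to the start of the array
        }
        count--;
        return x;
    }

    static String demo() {
        RingBufferSketch q = new RingBufferSketch(3);
        q.enqueue("a");
        q.enqueue("b");
        q.enqueue("c");
        int afterFill = q.putIndex;        // 0: the index wrapped after filling slot 2
        Object head = q.dequeue();         // "a", frees slot 0
        q.enqueue("d");                    // reuses slot 0
        return afterFill + " " + head + " " + q.items[0] + " " + q.putIndex;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 0 a d 1
    }
}
```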

Next, let's look at the put method:
The put method is a blocking method. If the queue is full, the current thread is suspended on the notFull condition object and joins its wait queue; it is not woken up to perform the insertion until the queue has free space. If the queue is not full, enqueue(e) is called directly to add the element to the array. That covers the three insertion methods: put, offer and add. offer and add are non-blocking, while put is blocking. In short, when the queue is full, a thread calling put is blocked by the Condition object until it is woken up again. An inserting thread therefore sees one of two situations: either the queue is full and the new put thread joins the notFull condition queue to wait, or a removing thread successfully removes an element and wakes up a waiting put thread.

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //Cannot add an element when the number of queue elements is equal to the length of the array
            while (count == items.length)
                //Suspend the current calling thread and add it to the notFull conditional queue for wake-up call.
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
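The blocking behaviour of put can be observed from outside with a queue of capacity 1. In this sketch the class name BlockingPutDemo and the 200 ms observation window are assumptions for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingPutDemo {
    static String run() {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                q.put(1);   // succeeds immediately
                q.put(2);   // queue is full: waits on the notFull condition
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            producer.join(200);                   // give the producer time to reach the second put
            boolean blocked = producer.isAlive(); // still alive because put(2) cannot complete
            int first = q.take();                 // frees a slot and wakes the producer
            producer.join();                      // now the producer can finish
            return blocked + " " + first + " " + q.peek();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true 1 2
    }
}
```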

How removal works:

The poll method retrieves and removes the head element of the queue. If the queue is empty, it returns null.

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // If the queue is empty, return null; otherwise call dequeue()
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    // Removes the head element of the queue and returns it
    private E dequeue() {
        // Get the backing array
        final Object[] items = this.items;
        // Get the element to remove
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        // Set the slot at takeIndex to null
        items[takeIndex] = null;
        // Advance takeIndex; if it reaches the array length, wrap around to 0
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--; // One element fewer in the queue
        if (itrs != null)
            itrs.elementDequeued(); // Keep active iterators in sync
        // Removing an element frees a slot, so signal notFull to wake a thread waiting to insert
        notFull.signal();
        return x;
    }

Next, look at the remove(Object o) method

    public boolean remove(Object o) {
        if (o == null) return false;
        //Get array data
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();//Lock
        try {
            // Only search when the queue is not empty
            if (count > 0) {
                // Index at which the next element would be added; marks the end of the scan
                final int putIndex = this.putIndex;
                // Start at the head of the queue
                int i = takeIndex;
                // Loop over the elements to find the one to remove
                do {
                    // Found the element to remove
                    if (o.equals(items[i])) {
                        removeAt(i); // Perform the removal
                        return true; // Removal succeeded
                    }
                    // Advance the index; when it reaches the end of the array, wrap around to 0
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex); // Keep searching until the tail is reached
            }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    // Removes the element at the given index; elements after it are shifted forward by one position
    void removeAt(final int removeIndex) {
        final Object[] items = this.items;
        // First check whether the element to remove is the current head of the queue
        if (removeIndex == takeIndex) {
            // If so, remove it directly
            items[takeIndex] = null;
            // Advance takeIndex; if it reaches the array length, wrap around to 0
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--; // One element fewer in the queue
            if (itrs != null)
                itrs.elementDequeued(); // Keep active iterators in sync
        } else {
            // The element is not at the head of the queue, so shift every element
            // after it forward by one position.
            // putIndex marks where the loop must stop
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                // Index of the element following i
                int next = i + 1;
                // Wrap around to the start of the array if needed
                if (next == items.length)
                    next = 0;
                // As long as next has not reached putIndex, there is another element to shift
                if (next != putIndex) {
                    items[i] = items[next]; // Move the following element forward, overwriting slot i
                    i = next;
                } else {
                    // All following elements have been shifted; clear the last slot and move putIndex back
                    items[i] = null;
                    this.putIndex = i;
                    break; // End of loop
                }
            }
            count--; // One element fewer in the queue
            if (itrs != null)
                itrs.removedAt(removeIndex); // Keep active iterators in sync
        }
        notFull.signal(); // Wake up a thread waiting to insert
    }

Removal through remove(Object o) is more involved because the element is not necessarily at the head of the queue. The thread first acquires the lock and then checks that count > 0, so that the search runs only on a non-empty queue and is safe under concurrency. It then reads takeIndex as the starting point and putIndex as the end condition of the loop: as long as the scan index has not reached putIndex, the end of the queue has not been reached. The do-while loop looks for the element and calls removeAt(i) when it finds it. removeAt(i) handles two cases. If the element to remove is the head of the queue, it is removed directly. Otherwise, the elements after removeIndex are shifted forward by one position, so the removed element is overwritten by its successor. In both cases the method finishes by waking up a waiting insert thread.
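The effect of removing an interior element can be checked on a queue whose contents already wrap around the end of the backing array. The class name RemoveDemo and the sample values are assumptions for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

class RemoveDemo {
    static String run() {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(4);
        q.addAll(Arrays.asList("a", "b", "c", "d")); // fills slots 0..3
        q.poll();                                    // removes "a", the head moves to slot 1
        q.offer("e");                                // wraps around into slot 0

        boolean removed = q.remove("c");             // interior element: later elements shift forward
        boolean missing = q.remove("zzz");           // not present: returns false

        // Copy to a list to show the remaining elements in queue order
        return removed + " " + missing + " " + new ArrayList<>(q) + " " + q.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true false [b, d, e] 1
    }
}
```

The FIFO order of the remaining elements is preserved, and the freed slot shows up in remainingCapacity().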

Next, look at the take() method.
The take method is straightforward. If the queue contains data, the head element is removed and returned immediately. If the queue is empty, the calling thread joins the notEmpty condition queue and blocks; this blocking wait can be interrupted. When a put thread later adds data, the put operation wakes up the take thread, which then performs the take.

    // Removes the element at the head of the queue; blocks while the queue is empty, and the wait can be interrupted
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); // Acquire the lock, responding to interrupts
        try {
            // While the queue has no elements
            while (count == 0)
                // Block on the notEmpty condition
                notEmpty.await();
            return dequeue(); // The queue has an element, so remove and return it
        } finally {
            lock.unlock();
        }
    }
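That the wait inside take() can be interrupted is easy to verify. In this sketch the class name InterruptTakeDemo is hypothetical; the queue stays empty so that take() can never complete normally.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptTakeDemo {
    static boolean run() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread consumer = new Thread(() -> {
            try {
                q.take();                 // the queue is empty, so this can only end by interruption
            } catch (InterruptedException e) {
                sawInterrupt.set(true);   // take() responded to the interrupt
            }
        });
        consumer.start();
        consumer.interrupt();             // works whether or not the thread is already waiting
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true
    }
}
```

If the interrupt arrives before the thread starts waiting, lockInterruptibly() throws; if it arrives during the wait, notEmpty.await() throws. Either way the caller sees an InterruptedException.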

Finally, take a look at the peek() method, which is relatively simple. It directly returns the header element of the current queue without deleting any elements.

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Return the head element of the queue without removing it
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    final E itemAt(int i) {
        return (E) items[i];
    }

(3)LinkedBlockingQueue

Parameters and constructors:

    //Node class for storing data
    static class Node<E> {
        E item;
        Node<E> next;

        Node(E x) { item = x; }
    }
    // Capacity size
    private final int capacity;

    // Number of elements. Because there are 2 locks and race conditions exist, AtomicInteger is used
    private final AtomicInteger count = new AtomicInteger(0);

    // Head node
    private transient Node<E> head;

    // Tail node
    private transient Node<E> last;

    // The locks used to obtain and remove elements, such as take, poll, etc
    private final ReentrantLock takeLock = new ReentrantLock();

    // notEmpty condition object, which is used to suspend the thread performing deletion when there is no data in the queue
    private final Condition notEmpty = takeLock.newCondition();

    // Locks used when adding elements, such as put, offer, etc 
    private final ReentrantLock putLock = new ReentrantLock();

    // notFull condition object, which is used to suspend the added thread when the queue data is full 
    private final Condition notFull = putLock.newCondition();


    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
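The three constructors shown above can be exercised as follows. This is a small sketch; the class name LinkedQueueCapacityDemo and the sample values are assumptions.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

class LinkedQueueCapacityDemo {
    static String run() {
        // No-argument constructor: the capacity defaults to Integer.MAX_VALUE
        LinkedBlockingQueue<Integer> byDefault = new LinkedBlockingQueue<>();
        boolean maxCapacity = byDefault.remainingCapacity() == Integer.MAX_VALUE;

        // Explicit capacity: the queue behaves as a bounded queue
        LinkedBlockingQueue<Integer> bounded = new LinkedBlockingQueue<>(2);
        bounded.offer(1);
        bounded.offer(2);
        boolean thirdAccepted = bounded.offer(3);   // false: the queue is full

        // Collection constructor: elements are enqueued in iteration order
        LinkedBlockingQueue<Integer> copied = new LinkedBlockingQueue<>(Arrays.asList(1, 2, 3));

        // A capacity of zero or less is rejected
        boolean zeroRejected = false;
        try {
            new LinkedBlockingQueue<Integer>(0);
        } catch (IllegalArgumentException e) {
            zeroRejected = true;
        }

        return maxCapacity + " " + thirdAccepted + " " + copied.size() + " " + copied.peek() + " " + zeroRejected;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true false 3 1 true
    }
}
```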

4. BlockingQueue in the thread pool

First look at the constructor

public ThreadPoolExecutor(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                         BlockingQueue<Runnable> workQueue,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler){...}

TimeUnit: the time unit of keepAliveTime; BlockingQueue: the queue that holds tasks waiting to be executed; keepAliveTime: the idle timeout for non-core threads, after which they are reclaimed; RejectedExecutionHandler: the policy the thread pool applies to tasks it rejects.
Custom thread pool: the type of queue passed to this constructor is critical.

  • With a bounded queue, when a new task arrives and the number of threads in the pool is less than corePoolSize, a new thread is created first,
  • If the number of threads has reached corePoolSize, the task is added to the queue,
  • If the queue is full, a new thread is created as long as the total number of threads does not exceed maximumPoolSize,
  • If the queue is full and the number of threads has reached maximumPoolSize, the rejection policy (or another custom handler) is executed.
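The four steps above can be reproduced with a deliberately tiny pool. The sketch assumes corePoolSize 1, maximumPoolSize 2, a bounded queue of capacity 1 and the AbortPolicy handler; the class name PoolSaturationDemo and the latch-based blocking task are made up for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSaturationDemo {
    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until the latch is released, so no worker becomes free
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocker);   // below corePoolSize: a new thread is created
            log.append(pool.getPoolSize()).append('/').append(pool.getQueue().size()).append(' ');
            pool.execute(blocker);   // corePoolSize reached: the task is queued
            log.append(pool.getPoolSize()).append('/').append(pool.getQueue().size()).append(' ');
            pool.execute(blocker);   // queue full: a second thread is created
            log.append(pool.getPoolSize()).append('/').append(pool.getQueue().size()).append(' ');
            pool.execute(blocker);   // queue full and maximumPoolSize reached: rejected
            log.append("accepted");
        } catch (RejectedExecutionException e) {
            log.append("rejected");
        } finally {
            release.countDown();     // let the blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 1/0 1/1 2/1 rejected (threads/queued after each step)
    }
}
```

With an unbounded queue such as a default LinkedBlockingQueue, the second step would always succeed, so the pool would never grow beyond corePoolSize and never reject a task while running.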

Next, let's look at the source code (this version of execute() is from JDK 6; JDK 7 and later restructure the logic):

  public void execute(Runnable command) {
          if (command == null) // The task must not be null
              throw new NullPointerException();
          // If the pool has fewer than corePoolSize threads, add a new thread to run the task
          if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
              // corePoolSize has been reached, so try to put the task into the work queue
              if (runState == RUNNING && workQueue.offer(command)) {
                  // Re-check after queuing: the pool may have been shut down or may have no threads left
                  if (runState != RUNNING || poolSize == 0)
                      ensureQueuedTaskHandled(command);
              }
              // The pool is not running or the work queue is full: if the number of threads is still below maximumPoolSize, create a new thread to run the task
              else if (!addIfUnderMaximumPoolSize(command))
                  // The maximum number of threads has been reached and the queue is full, so invoke the rejection handler
                  reject(command); // is shutdown or saturated
          }
  }
  
  private boolean addIfUnderCorePoolSize(Runnable firstTask) {  
          Thread t = null;  
          final ReentrantLock mainLock = this.mainLock;  
          mainLock.lock();  
          //Changing several important control fields requires locking  
          try {  
              // Only add a thread if the pool has fewer than corePoolSize threads and is in the RUNNING state
              if (poolSize < corePoolSize && runState == RUNNING)  
                  t = addThread(firstTask);  
          } finally {  
              mainLock.unlock();  
          }  
          if (t == null)  
              return false;  
          t.start(); // Start the new thread, which runs the task immediately
          return true;  
      }  
  
  private Thread addThread(Runnable firstTask) {  
          Worker w = new Worker(firstTask);  
          Thread t = threadFactory.newThread(w); // Created by the thread factory; with the default factory, threads share the same group and priority and are non-daemon threads
          if (t != null) {  
              w.thread = t;  
              workers.add(w); //Add to worker thread collection  
              int nt = ++poolSize;  
              if (nt > largestPoolSize)  
                  largestPoolSize = nt;  
          }  
          return t;  
      }  

Keywords: Java Multithreading

Added by feidakila on Wed, 09 Mar 2022 11:20:55 +0200