Java JUC: ConcurrentLinkedQueue analysis

How ConcurrentLinkedQueue works

Introduction

ConcurrentLinkedQueue is a thread-safe, unbounded, non-blocking queue. It is backed by a singly linked list, and both enqueue and dequeue operations rely on CAS (compare-and-swap) rather than locks to achieve thread safety.

Internally, the queue is a singly linked list in which two volatile Node fields hold the head and tail of the queue.

As the no-argument constructor below shows, head and tail initially point to the same sentinel node, whose item is null. New elements are inserted at the tail of the queue, and elements are removed from the head.

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

Each Node holds a volatile item field that stores the node's value and a volatile next field that points to the following node in the list. In the source shown in this article, the CAS operations provided by the Unsafe utility class make updates to the linked list atomic during enqueue and dequeue.
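
The Node structure described above can be approximated in ordinary application code. The sketch below is a minimal stand-in, not the JDK class: it assumes AtomicReference in place of the Unsafe-based CAS, and the names NodeSketch and Node are hypothetical. It shows that only one of two competing casNext calls on the same node can succeed.

```java
import java.util.concurrent.atomic.AtomicReference;

class NodeSketch {
    // Hypothetical stand-in for the queue's internal Node; AtomicReference
    // replaces the Unsafe-based CAS used by the source discussed here.
    static final class Node<E> {
        final AtomicReference<E> item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();

        Node(E item) {
            this.item = new AtomicReference<>(item);
        }

        // Atomically replace item only if it is still cmp (reference comparison).
        boolean casItem(E cmp, E val) {
            return item.compareAndSet(cmp, val);
        }

        // Atomically replace next only if it is still cmp.
        boolean casNext(Node<E> cmp, Node<E> val) {
            return next.compareAndSet(cmp, val);
        }
    }

    public static void main(String[] args) {
        Node<String> sentinel = new Node<>(null);
        Node<String> first = new Node<>("item1");
        Node<String> second = new Node<>("item2");

        System.out.println(sentinel.casNext(null, first));  // true: next was null
        System.out.println(sentinel.casNext(null, second)); // false: next is already set
    }
}
```

The losing caller learns from the false return value that the node changed underneath it and must re-read the list before retrying.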

offer operation

The offer operation adds an element at the tail of the queue. If the argument is null, a NullPointerException is thrown; otherwise, because ConcurrentLinkedQueue is unbounded, the method always returns true. Because it uses CAS instead of locks, the method never blocks and the calling thread is never suspended. Let's look at the source code:

public boolean offer(E e) {
    // (1) Throw NullPointerException if e is null
    checkNotNull(e);
    // (2) Build the new node; the Node constructor stores the item via UNSAFE.putObject
    final Node<E> newNode = new Node<E>(e);
    // (3) Start from the tail node and loop until the insert succeeds
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // (4) q == null means p is the last node, so try to insert after it
        if (q == null) {
            // (5) CAS p.next from null to the new node
            if (p.casNext(null, newNode)) {
                // (6) If p != t, tail is lagging, so try to make the new node the tail.
                //     A failed CAS is fine: it means another thread has already updated tail.
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
        }
        else if (p == q)
            // (7) p is self-linked: a concurrent poll has removed it from the list.
            //     Jump to the new tail if tail has changed, otherwise restart from head.
            p = (t != (t = tail)) ? t : head;
        else
            // (8) Advance toward the last node: jump to the new tail if tail has changed,
            //     otherwise step to q.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

Let's walk through this method. When it is called, code (1) checks the argument and throws an exception if it is null. Otherwise, code (2) creates a new node with the item as the constructor argument. Code (3) then starts the loop at the tail node of the queue, intending to append the element there. Consider the state of an empty queue when code (4) is first reached.

At this point p, t, head and tail all point to the sentinel node, whose item is null. Because the sentinel's next is null, q is null as well.

Code (4) finds q == null, so code (5) runs: a CAS checks that p.next is still null and replaces it with newNode. Code (6) follows, but since p == t the tail is not updated, and offer returns. Afterwards head and tail still point to the sentinel, and the sentinel's next points to the new node.

That covers a single thread calling offer. When several threads call it at the same time, they may all reach code (5) together.

Suppose thread 1 calls offer(item1) and thread 2 calls offer(item2), and both reach p.casNext(null, newNode) in code (5) at the same time. CAS is atomic, so assume thread 1 goes first: it finds that p.next is null and sets it to the node holding item1. Thread 2's CAS then fails because p.next is no longer null, so thread 2 goes back to code (3) and then runs code (4). At that point p and t still point to the sentinel, and q points to the item1 node.

Thread 2 finds that the condition in code (4) is not satisfied, and p != q, so it runs code (8). Since p == t, q is assigned to p, and p now points to the item1 node.

Thread 2 then returns to code (3) and reads q = p.next, which is null, before checking code (4) again.

Now q == null, so thread 2 runs code (5) and uses CAS to check whether p.next is null. If it is, p.next is set to the node holding item2; if not, the loop retries. Assuming the CAS succeeds, code (6) runs, and because p != t the tail is set to the item2 node and offer returns. The list is now sentinel -> item1 -> item2, with head on the sentinel and tail on the item2 node.

So far we have not discussed code (7). This branch is mainly taken after a poll operation. A poll can leave the old head node with its next pointing to itself (a self-linked node) while tail still points to that node.

If offer is called in this state, then at code (4) p and t both point to the self-linked node, and q = p.next is that same node.

Here q is not null and p == q, so code (7) runs. Because t == tail (tail has not changed), p is set to head, and the loop restarts. When code (4) is reached again, q = p.next is null.

Since q == null, code (5) performs the CAS. If no other thread is running offer concurrently, the CAS succeeds and p.next is set to the new node. Code (6) then runs, and because p != t the new node becomes the tail of the queue.

📢 Note that once tail has moved off the self-linked node, that node is no longer reachable from the queue and will be garbage collected.

Summary: the key step in offer is code (5). The CAS ensures that only one thread at a time can append a node after the current last node; a thread whose CAS fails retries in the loop until it succeeds.
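
The behavior summarized above can be observed from the outside with a small experiment. The sketch below is only an illustration under assumed names (OfferDemo, concurrentOffers and offerNullThrows are hypothetical); it starts several threads that call offer on one shared queue and checks that no element is lost, and that a null argument is rejected.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

class OfferDemo {
    // Each thread offers perThread distinct integers; returns the final element count.
    static int concurrentOffers(int threads, int perThread) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int base = i * perThread;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.offer(base + j); // never blocks; retries its CAS internally
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.size(); // safe to count here: all writers have finished
    }

    // Returns true if offer(null) throws NullPointerException.
    static boolean offerNullThrows() {
        try {
            new ConcurrentLinkedQueue<String>().offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(concurrentOffers(4, 10_000)); // 40000
        System.out.println(offerNullThrows());           // true
    }
}
```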

add operation

The add operation appends an element to the tail of the linked list; internally it simply calls offer.

public boolean add(E e) {
    return offer(e);
}

poll operation

The poll operation retrieves and removes the element at the head of the queue, returning null if the queue is empty. Let's look at how it is implemented.

public E poll() {
    // (1) Label used to restart from the current head
    restartFromHead:
    // (2) Infinite loop
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            // (3) Read the item of the current node
            E item = p.item;
            // (4) If the node holds a value, try to CAS its item to null
            if (item != null && p.casItem(item, null)) {
                // (5) The CAS succeeded, so the element is logically removed
                if (p != h) // hop two nodes at a time
                    // Update the head node
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // (6) There is no next node: the queue is empty, so return null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // (7) The current node is self-linked: restart from the new head
            else if (p == q)
                continue restartFromHead;
            else  // (8) Otherwise move on to the next node
                p = q;
        }
    }
}

final void updateHead(Node<E> h, Node<E> p) {
    // CAS head from h to p; on success, self-link the old head so that
    // other threads can detect that it has been removed
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

Because poll takes an element from the head and removes it, the inner loop of code (2) starts at the head node, and code (3) reads the item of the current head. When the queue is still empty, head points to the sentinel node, and h and p both point to it.

Because head points to the sentinel node, whose item is null, the check in code (4) fails and code (6) runs. Assuming no other thread calls offer in the meantime, q is null.

updateHead is then called, but since h == p the head is not changed, and null is returned.

Now suppose that by the time code (6) runs, another thread has called offer and successfully added an element to the queue. Then q points to the newly added node.

The condition (q = p.next) == null in code (6) is not satisfied, and p != q, so code (8) runs and p is moved to node q.

The loop returns to code (3). The item of p is now not null, so p.casItem(item, null) tries to set it to null through CAS. If the CAS succeeds, code (5) runs: because p != h, updateHead sets head to p and points the old head's next at itself, and poll returns the removed item. The old sentinel is now self-linked, and head points to the node whose item has just been set to null.

This is exactly the state that leads offer into code (7), as discussed in the offer section.

Code (7) of poll has not been covered yet. Let's see when it runs. Suppose that when thread 1 performs poll, head points to a node whose item is null, and thread 1's p has advanced to the next node, which holds an element.

Thread 1 then executes p.casItem(item, null) and tries to set the item of p to null through CAS. If the CAS succeeds, the node is logically removed from the queue.

Because p != h, thread 1 is about to call updateHead. Suppose that before it does so, thread 2 starts a poll: thread 2's p points to the current head node, but thread 2 has not yet executed code (6).

Thread 1 now runs updateHead and returns. The old head node, which thread 2's p still references, is now self-linked.

Thread 2 then continues with code (6) and reads q = p.next. Because the node is self-linked, p == q, so code (7) runs and control jumps to the outer loop at restartFromHead to reload the current head.

Summary: to remove an element, poll simply uses CAS to set the current node's item to null and then unlinks the node from the queue by moving head forward. The removed node becomes unreachable and is reclaimed by the garbage collector. If poll detects during its traversal that head has been changed (the current node is self-linked), it jumps back to the outer loop and reloads head.
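
A practical consequence of the casItem step is that each element is handed to exactly one caller, even when several threads poll at once. The following sketch (the class and method names PollDemo and drainConcurrently are hypothetical) fills a queue, drains it from several threads, and verifies that nothing is returned twice or lost.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class PollDemo {
    // Drains a pre-filled queue from several threads; returns the number of distinct elements seen.
    static int drainConcurrently(int elements, int threads) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < elements; i++) {
            queue.offer(i);
        }
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger polled = new AtomicInteger();

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                Integer item;
                // poll returns null once the queue is empty, which ends the loop
                while ((item = queue.poll()) != null) {
                    seen.add(item);
                    polled.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        if (polled.get() != seen.size()) {
            throw new IllegalStateException("an element was polled more than once");
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(drainConcurrently(10_000, 4));                // 10000
        System.out.println(new ConcurrentLinkedQueue<String>().poll()); // null
    }
}
```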

peek operation

The peek operation retrieves the element at the head of the queue without removing it, returning null if the queue is empty. Let's look at its implementation.

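public E peek() {
    // (1) Label used to restart from the current head
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            // (2) Read the item of the current node
            E item = p.item;
            // (3) Found an element, or reached the end of the list
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            // (4) The current node is self-linked: restart from the new head
            else if (p == q)
                continue restartFromHead;
            // (5) Otherwise move on to the next node
            else
                p = q;
        }
    }
}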

The structure of peek is similar to that of poll, except that code (3) has no casItem call. That is expected, because peek only reads the head element and does not clear it. As described earlier, after the first offer, head still points to the sentinel node (the node whose item is null). On the first peek, code (3) finds item == null and then evaluates q = p.next. q now points to the first real element in the queue, or is null if the queue is empty.

When the queue is empty, head points to the sentinel and q is null.

updateHead is then called, but since h and p are the same node nothing happens, and peek returns null.

When the queue contains at least one element (assume exactly one), the sentinel's next points to that element's node, so q is not null.

Code (5) runs and p is moved to node q; code (3) then runs again with p on the element's node.

This time code (3) finds that item is not null, so updateHead is called. Because h != p, head is set to p and the old sentinel's next is pointed at itself.

As a result, the sentinel node is removed from the list.

Summary: peek is similar to poll, except that peek only reads the head element without removing it from the queue, whereas poll removes it. In addition, the first call to peek on a non-empty queue discards the sentinel node and moves head to the first real element; if the queue is empty, head stays on the sentinel and peek returns null.
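
The difference between the two methods is easy to check in a single thread. The sketch below uses a hypothetical helper class PeekDemo and records what peek, size and poll return in sequence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class PeekDemo {
    // Records the results of a fixed sequence of peek/size/poll calls.
    static List<Object> observe() {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("a");
        queue.offer("b");

        List<Object> seen = new ArrayList<>();
        seen.add(queue.peek()); // "a": head element, not removed
        seen.add(queue.peek()); // "a": a second peek sees the same element
        seen.add(queue.size()); // 2: peek did not change the contents
        seen.add(queue.poll()); // "a": poll removes the head element
        seen.add(queue.peek()); // "b": the next element is now at the head
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe());                                   // [a, a, 2, a, b]
        System.out.println(new ConcurrentLinkedQueue<String>().peek()); // null
    }
}
```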

size operation

The size operation counts the elements currently in the queue, which is of limited use in a concurrent environment. No lock is held during the traversal, so elements may be added or removed between the call to size and its return, and the reported count may be inaccurate.

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}
// Returns the first node that holds an element (skipping the sentinel), or null if the queue is empty
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
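
Because size walks the whole list, callers that need a cheap element count sometimes keep one alongside the queue. The wrapper below is a hypothetical sketch, not a JDK class: CountedQueue and its method names are assumptions, and its counter is still only approximate while other threads are in the middle of an offer or poll.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper that tracks an approximate element count next to the queue.
class CountedQueue<E> {
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger count = new AtomicInteger();

    boolean offer(E e) {
        queue.offer(e);           // throws NullPointerException for null, otherwise always succeeds
        count.incrementAndGet();  // updated after the insert, so the counter can briefly lag
        return true;
    }

    E poll() {
        E item = queue.poll();
        if (item != null) {
            count.decrementAndGet();
        }
        return item;
    }

    // Constant time, but may be momentarily off (even negative) under contention.
    int approximateSize() {
        return count.get();
    }

    // Full traversal of the list, as in the size() source shown above.
    int traversedSize() {
        return queue.size();
    }

    // Prefer this over size() == 0: it only needs to find the first element.
    boolean isEmpty() {
        return queue.isEmpty();
    }
}
```

With no concurrent writers, for example after three offer calls and one poll, approximateSize() and traversedSize() both report 2.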

remove operation

The remove operation deletes the given element if it is present in the queue. If the element occurs more than once, only the first occurrence is removed. It returns true if an element was removed and false otherwise.

public boolean remove(Object o) {
    if (o != null) {
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            if (item != null) {
                // Not a match: move on to the next node
                if (!o.equals(item)) {
                    next = succ(p);
                    continue;
                }
                // Match: try to CAS the item to null
                removed = p.casItem(item, null);
            }
            // Get the successor of the current node
            next = succ(p);
            // If there is both a predecessor and a successor, link the predecessor to the successor
            if (pred != null && next != null) // unlink
                // Unlink the current node from the list through CAS
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}
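
The first-occurrence rule and the return values can be checked with a short single-threaded example. RemoveDemo and removeFirstOccurrence are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class RemoveDemo {
    // Removes "a" once from [a, b, a, c] and returns the remaining elements in order.
    static List<String> removeFirstOccurrence() {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("a");
        queue.offer("b");
        queue.offer("a");
        queue.offer("c");

        boolean removed = queue.remove("a");  // true: the first "a" is removed
        boolean missing = queue.remove("z");  // false: no such element
        boolean nullArg = queue.remove(null); // false: null is never in the queue

        if (!removed || missing || nullArg) {
            throw new IllegalStateException("unexpected remove result");
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(removeFirstOccurrence()); // [b, a, c]
    }
}
```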

contains operation

The contains operation checks whether the queue holds the specified object. Because it traverses the whole queue, its result, like that of size, is only a snapshot. For example, the element may be in the queue when the method is called but be removed by another thread during the traversal, in which case false is returned.

public boolean contains(Object o) {
    if (o == null) return false;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && o.equals(item))
            return true;
    }
    return false;
}
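
Since a contains result may already be stale when it is used, a check-then-act sequence such as contains followed by remove is racy. One alternative, sketched below with hypothetical names (ClaimDemo, countWinners), is to act directly and rely on the return value of remove: when several threads try to claim the same element, exactly one of them gets true.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ClaimDemo {
    // Several threads race to remove the same element; returns how many succeeded.
    static int countWinners(int threads) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("job");
        AtomicInteger claimed = new AtomicInteger();

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                // Racy alternative (avoid): if (queue.contains("job")) { queue.remove("job"); ... }
                if (queue.remove("job")) {
                    claimed.incrementAndGet(); // only the thread whose CAS succeeded gets here
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return claimed.get();
    }

    public static void main(String[] args) {
        System.out.println(countWinners(8)); // 1
    }
}
```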

Summary

ConcurrentLinkedQueue stores its elements in a singly linked list, wrapping each element in a Node. The queue is tracked through head and tail nodes. When the queue is created, head and tail both point to a sentinel Node whose item is null. The first peek or first call on a non-empty queue moves head to the first real queue element. Because the queue uses a non-blocking CAS algorithm with no locks, offer, poll or remove operations may run while size is counting, so the count may be inaccurate. The size method is therefore of limited use under concurrency.
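
To tie the pieces together, here is a deliberately simplified lock-free queue in the same spirit. It is a sketch under stated assumptions, not the JDK implementation: it follows the classic Michael-Scott style (tail is helped forward on every lagging observation rather than hopping two nodes at a time), it uses AtomicReference instead of Unsafe, it does not null out items or self-link removed nodes, and the class name SimpleLockFreeQueue is hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified Michael-Scott style queue; an illustration only, not ConcurrentLinkedQueue itself.
class SimpleLockFreeQueue<E> {
    private static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    SimpleLockFreeQueue() {
        Node<E> sentinel = new Node<>(null); // head and tail start on a sentinel node
        head = new AtomicReference<>(sentinel);
        tail = new AtomicReference<>(sentinel);
    }

    boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> newNode = new Node<>(e);
        for (;;) {
            Node<E> t = tail.get();
            Node<E> next = t.next.get();
            if (next != null) {
                // tail is lagging behind the last node: help move it forward, then retry
                tail.compareAndSet(t, next);
            } else if (t.next.compareAndSet(null, newNode)) {
                // linked successfully; a failed tail update is fine, another thread will help
                tail.compareAndSet(t, newNode);
                return true;
            }
        }
    }

    E poll() {
        for (;;) {
            Node<E> h = head.get();
            Node<E> first = h.next.get();
            if (first == null) {
                return null; // only the sentinel is left: the queue is empty
            }
            // move head forward; the node holding the returned item becomes the new sentinel
            if (head.compareAndSet(h, first)) {
                return first.item;
            }
        }
    }

    public static void main(String[] args) {
        SimpleLockFreeQueue<String> queue = new SimpleLockFreeQueue<>();
        queue.offer("a");
        queue.offer("b");
        System.out.println(queue.poll()); // a
        System.out.println(queue.poll()); // b
        System.out.println(queue.poll()); // null
    }
}
```

Compared with the source analyzed in this article, this version performs a tail CAS on almost every offer, which is part of what the lazier tail update in ConcurrentLinkedQueue avoids.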

Keywords: Java queue JUC

Added by ben2k8 on Thu, 27 Jan 2022 12:23:55 +0200