ReentrantLock introduction and AQS source code

Lock

Lock is the core tool in J.U.C. Its function is the same as that of synchronized, explained earlier: it solves thread safety problems in a multithreaded environment. The lock mechanism is used in many places within the J.U.C package.

The full name of J.U.C is java.util.concurrent. It is a common toolkit for concurrent programming. The package contains many components used in concurrent scenarios, such as thread pools, blocking queues, timers, synchronizers, and concurrent collections. The author of the package is the famous Doug Lea.

Before the Lock interface appeared, thread safety in Java applications could only be handled with the synchronized keyword. However, synchronized has shortcomings in some scenarios, for example it cannot give up after a timeout or respond to interruption while waiting for a lock. The Lock interface, introduced in Java 5, addresses these shortcomings and is more flexible than synchronized.
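
To make the flexibility concrete, here is a minimal sketch of a bounded-wait acquisition with tryLock, something synchronized cannot express. The class name TryLockSketch and the method tryIncrement are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TryLockSketch {
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static int counter = 0;

    // Waits at most timeoutMillis for the lock and returns false instead of blocking forever.
    static boolean tryIncrement(long timeoutMillis) throws InterruptedException {
        if (!LOCK.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // the lock was not obtained within the timeout
        }
        try {
            counter++;
            return true;
        } finally {
            LOCK.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("uncontended: " + tryIncrement(100));

        LOCK.lock(); // main now holds the lock, so another thread has to time out
        try {
            boolean[] result = new boolean[1];
            Thread t = new Thread(() -> {
                try {
                    result[0] = tryIncrement(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            t.join();
            System.out.println("contended: " + result[0]);
        } finally {
            LOCK.unlock();
        }
    }
}
```

The timed tryLock also responds to interruption, which is why the method declares InterruptedException.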

Lock is an interface that defines abstract methods for acquiring and releasing locks. Several classes implement or build on the Lock interface. The following are common lock implementations:

  • ReentrantLock: a reentrant lock, and the main standalone implementation of the Lock interface. Reentrant means that after a thread has obtained the lock, it does not block when it obtains the same lock again. Instead, a counter associated with the lock records the number of reentries.

  • ReentrantReadWriteLock: a reentrant read-write lock. It implements the ReadWriteLock interface and maintains two locks, ReadLock and WriteLock, each of which implements the Lock interface. A read-write lock suits scenarios with many reads and few writes. The basic rules are: read and read are not mutually exclusive, read and write are mutually exclusive, and write and write are mutually exclusive. In other words, any operation that changes the data is mutually exclusive with everything else.

  • StampedLock: a locking mechanism introduced in JDK 8. It can be regarded as an improved read-write lock. A read-write lock lets reads run fully concurrently, but reads and writes still conflict, so a large number of reader threads may starve writer threads. StampedLock adds an optimistic read mode in which readers do not block writer threads at all.
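
The optimistic read mode mentioned in the last item can be sketched as follows. This is a minimal illustration, assuming a hypothetical OptimisticPoint class with two int fields; only the StampedLock calls come from the JDK.

```java
import java.util.concurrent.locks.StampedLock;

class OptimisticPoint {
    private final StampedLock lock = new StampedLock();
    private int x;
    private int y;

    void move(int dx, int dy) {
        long stamp = lock.writeLock(); // exclusive write lock
        try {
            x += dx;
            y += dy;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    int sum() {
        long stamp = lock.tryOptimisticRead(); // takes no lock, so writers are not blocked
        int cx = x;
        int cy = y;
        if (!lock.validate(stamp)) {
            // A write happened in between: fall back to a real read lock.
            stamp = lock.readLock();
            try {
                cx = x;
                cy = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return cx + cy;
    }

    public static void main(String[] args) {
        OptimisticPoint p = new OptimisticPoint();
        p.move(3, 4);
        System.out.println(p.sum());
    }
}
```

The fields are copied into locals before validate is called, so a result is only used if no write intervened.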

ReentrantLock

Usage

As mentioned in the earlier article on synchronized and lock upgrading, running count++ 10000 times from multiple threads can give a result smaller than 10000, because count++ is not atomic. One fix is to add synchronized, and the other is to add a Lock. The code is as follows:

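import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {

    static ReentrantLock lock = new ReentrantLock();

    static int count = 0;

    public static void incr() {
        //Acquire the lock. If another thread holds it, the current thread blocks here
        lock.lock();
        try {
            count++;
        } finally {
            //Always release in finally so the lock is freed even if the body throws
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            Thread t = new Thread(ReentrantLockDemo::incr);
            threads.add(t);
            t.start();
        }
        //Wait for every thread to finish instead of sleeping for a fixed time
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("result:" + count);
    }

}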

The final result is 10000.

That is the basic usage. Note that unlock() must be called in a finally block, so that the lock is released even if the protected code throws an exception.

Reentry lock

A reentrant lock means that the same thread can obtain the same lock repeatedly. If the current thread t1 has obtained the lock by calling lock(), then calling lock() again does not block; it only increases the reentry count. Both synchronized and ReentrantLock are reentrant locks.
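
Reentry can be observed through getHoldCount(). The following is a minimal sketch; the class name ReentrySketch and the methods outer and inner are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrySketch {
    static final ReentrantLock LOCK = new ReentrantLock();

    static int outer() {
        LOCK.lock(); // first acquisition by this thread
        try {
            return inner();
        } finally {
            LOCK.unlock();
        }
    }

    static int inner() {
        LOCK.lock(); // same thread again: does not block, only raises the hold count
        try {
            return LOCK.getHoldCount();
        } finally {
            LOCK.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("hold count inside inner(): " + outer());
        System.out.println("locked afterwards: " + LOCK.isLocked());
    }
}
```

Every lock() needs a matching unlock(); the lock is only free again once the hold count is back to 0.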

Class diagram

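The class structure of ReentrantLock is as follows: the abstract inner class Sync extends AbstractQueuedSynchronizer, and the two inner classes NonfairSync and FairSync extend Sync.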


The no-argument ReentrantLock constructor creates a nonfair lock by default. The fairness policy can also be specified explicitly.

public ReentrantLock() {
    sync = new NonfairSync();
}


public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
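
The effect of the two constructors can be checked with isFair(). A minimal sketch, with FairnessSketch as a hypothetical class name:

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessSketch {
    // The no-argument constructor selects the nonfair policy.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // Passing true selects the fair policy.
    static boolean explicitIsFair() {
        return new ReentrantLock(true).isFair();
    }

    public static void main(String[] args) {
        System.out.println("default fair?  " + defaultIsFair());
        System.out.println("explicit fair? " + explicitIsFair());
    }
}
```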

Now for the key part: the source code of AbstractQueuedSynchronizer (AQS).

AQS

Importance of AQS

Let's first introduce the importance of AQS (AbstractQueuedSynchronizer) and see which classes AQS is used in.

AQS is used in ReentrantLock, ReentrantReadWriteLock, Semaphore, CountDownLatch and the Worker class of ThreadPoolExecutor (JDK 1.8). AQS is the underlying mechanism of all these classes.

We use many of these classes often, and most of them have been introduced in detail earlier. Many important tool classes in the J.U.C package are built on the AQS framework, so its importance is self-evident. AQS is the cornerstone of J.U.C.
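
To show how a synchronizer is built on AQS, here is a minimal sketch of a non-reentrant mutex, written in the spirit of the example in the AQS documentation. SimpleMutex and its method names are hypothetical; the subclass only defines what acquiring and releasing mean for the state, while queuing and blocking come from AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 = free, 1 = held; no reentry support in this sketch
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }            // queues and parks if tryAcquire fails
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }          // wakes up the next queued thread, if any
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println("locked: " + mutex.isLocked());
        System.out.println("second tryLock by same thread: " + mutex.tryLock());
        mutex.unlock();
        System.out.println("locked after unlock: " + mutex.isLocked());
    }
}
```

ReentrantLock follows the same pattern, except that its tryAcquire and tryRelease count reentries in the state.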

tryAcquire(arg)

First, enter the lock method of the fair lock (FairSync). It calls acquire(1).

acquire is defined in the parent class AQS, and the first thing it does is call tryAcquire.

tryAcquire is implemented by the specific subclass, in this case FairSync:

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //c is the lock state. 0 means the lock is free; a value n > 0 means the lock is held and the owner has acquired it n times (reentries included)
        int c = getState();
        //Indicates the unlocked state
        if (c == 0) {
            //If no thread is queued ahead of the current one, try to grab the lock with CAS
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                //If the grab is successful, save the current thread in exclusiveOwnerThread
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //If the lock is already held, check whether the owner is the current thread
        else if (current == getExclusiveOwnerThread()) {
            //If so, this is a reentry: add acquires to state
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

Now let's look at the lock method of the nonfair lock (NonfairSync)

final void lock() {
    //This is where the nonfair lock shows: whether or not threads are waiting in the queue, first try a CAS to see if the lock can be taken directly
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //If you don't succeed, continue to acquire
        acquire(1);
}

This again enters the acquire method of the AQS class

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

This time tryAcquire resolves to the implementation in the nonfair lock

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

That jumps back to the Sync class. The call path is quite winding and bounces between several classes, so it helps to be clear about the inheritance relationship of the classes above first

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //No lock
    if (c == 0) {
        //CAS lock grab
        if (compareAndSetState(0, acquires)) {
           //If successful, record the current thread
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //If the lock is already held and the owner is the current thread, add acquires to state
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

At this point, we can talk about the difference between fair locks and nonfair locks.

Suppose that thread A holds a lock and thread B requests it. Since thread A already holds the lock, thread B has to wait: it is suspended and enters the blocked state. When thread A releases the lock, it should be thread B's turn to wake up and obtain it. But if thread C suddenly cuts in line and requests the lock at that moment, then under the nonfair strategy the lock is given to thread C. The reason is that waking up thread B carries a lot of overhead, and thread C may well acquire the lock, run its task, and release the lock before thread B has finished waking up. By jumping the queue, thread C skips the blocking process entirely.

If the code under the lock is short, thread C can finish and hand the lock over before thread B is fully awake. This is a win-win situation: thread C does not need to wait, which improves its efficiency, and thread B obtains the lock no later than it otherwise would, because thread C releases the lock before thread B is fully awakened (thread C runs very fast compared with the time it takes to wake thread B). This is why the Java designers provide nonfair locks: they improve overall throughput.

This is reflected in the source code above. The tryAcquire and nonfairTryAcquire methods are almost identical, and the only difference is that the fair lock has one extra condition: !hasQueuedPredecessors(). With a fair lock, once another thread is already queued, the current thread does not try to take a free lock and queues up instead. With a nonfair lock, the current thread tries to take the lock regardless of whether other threads are queued, and only queues up if that attempt fails. The window for jumping the queue is the moment right after the previous owner has released the lock: state is 0 and the next queued thread has been woken up to grab the lock with CAS, but a newly arrived thread breaks the rules and wins the CAS first.
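
The single-condition difference can be condensed into one class. The following is a minimal sketch, assuming a hypothetical PolicySync class with a fair flag; it is not how the JDK structures the code (the JDK uses two subclasses), but the logic of each branch follows the methods shown above.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class PolicySync extends AbstractQueuedSynchronizer {
    private final boolean fair;

    PolicySync(boolean fair) {
        this.fair = fair;
    }

    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // The only policy difference: a fair lock refuses to barge past queued threads.
            if (fair && hasQueuedPredecessors()) {
                return false;
            }
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
            return false;
        }
        if (current == getExclusiveOwnerThread()) {
            int next = c + acquires; // reentry by the owner
            if (next < 0) {
                throw new Error("Maximum lock count exceeded");
            }
            setState(next);
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread()) {
            throw new IllegalMonitorStateException();
        }
        int c = getState() - releases;
        boolean free = (c == 0);
        if (free) {
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    int holds() {
        return getState();
    }

    public static void main(String[] args) {
        PolicySync sync = new PolicySync(true);
        sync.acquire(1);
        sync.acquire(1); // reentry
        System.out.println("holds: " + sync.holds());
        System.out.println("first release frees lock: " + sync.release(1));
        System.out.println("second release frees lock: " + sync.release(1));
    }
}
```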

ReentrantLock is a nonfair lock by default.

public ReentrantLock() {
    sync = new NonfairSync();
}

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

If tryAcquire above fails, either another thread holds the lock or the current thread is not the owner and so cannot re-enter. The current thread then has to be added to the doubly linked list (the wait queue) and wait. acquireQueued(addWaiter(Node.EXCLUSIVE), arg) can be split into two methods.

  • addWaiter(Node.EXCLUSIVE) -> adds a node in exclusive (mutex) mode
  • acquireQueued() -> spins to retry the lock and blocks when it cannot get it

First look at addWaiter(Node.EXCLUSIVE):

private Node addWaiter(Node mode) {
    //Encapsulate the current thread into a Node node.
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    //Tail node is not empty
    if (pred != null) {
        node.prev = pred;
        //Try to append the node to the tail of the list with CAS
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //If the tail is null or the CAS append failed, fall back to enq
    enq(node);
    return node;
}


private Node enq(final Node node) {
    //spin
    for (;;) {
        Node t = tail;
        //The tail node is empty. Initialize
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                //Both the head node and the tail node point to an empty node
                tail = head;
        } else {
            node.prev = t;
            //Retry the CAS append to the tail until it succeeds
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
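
The enqueue logic above can be tried out in isolation. The following is a simplified sketch that mirrors enq with AtomicReference instead of the internal CAS helpers of AQS; CasQueueSketch, its Node class and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Mirrors enq: lazily create a dummy head, then CAS the node onto the tail in a retry loop.
    Node enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                if (head.compareAndSet(null, new Node("dummy"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t; // the previous tail, i.e. the new node's predecessor
                }
            }
        }
    }

    // Walks backwards from the tail and counts the nodes behind the dummy head.
    int length() {
        Node h = head.get();
        int n = 0;
        for (Node t = tail.get(); t != null && t != h; t = t.prev) {
            n++;
        }
        return n;
    }

    static int enqueueConcurrently(int threads, int perThread) throws InterruptedException {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enqueue(new Node("n"));
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return queue.length();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(enqueueConcurrently(4, 1000));
    }
}
```

No node is lost under contention because a thread whose CAS fails simply re-reads the tail and tries again. The prev link is set before the CAS, which is why traversing backwards from the tail is always safe.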

Next, look at acquireQueued()

//After the node has been appended to the tail, spin here trying to acquire the lock
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //spin
        for (;;) {
            final Node p = node.predecessor();
            //If the predecessor is the head node, this node is first in line, so try tryAcquire
            //tryAcquire has been analyzed above and differs between fair and nonfair locks
            if (p == head && tryAcquire(arg)) {
                //If you grab it, set yourself as the head node, and then return directly
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //Otherwise, let the thread park
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


//This method checks whether the thread can really rest, that is, enter the waiting state.
//If the predecessor has been cancelled, parking behind it would mean never being woken up, so cancelled predecessors are skipped first
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


private final boolean parkAndCheckInterrupt() {
    //LockSupport.park blocking
    LockSupport.park(this);
    //Return whether the thread was interrupted while parked, and clear the interrupt flag
    return Thread.interrupted();
}

  • SIGNAL = -1 indicates that the successor node is waiting for the current node to wake it up. When a successor node joins the queue, it updates the status of its predecessor to SIGNAL.
  • When ws > 0, the only possible value is CANCELLED = 1, which means that the thread has given up waiting. In this case the node has to be removed from the queue.
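
The queuing can be observed from outside through the monitoring methods of ReentrantLock. A minimal sketch, with QueueObservationSketch and queuedWhileHeld as hypothetical names:

```java
import java.util.concurrent.locks.ReentrantLock;

class QueueObservationSketch {
    // Holds the lock, lets a second thread queue up behind it, and reports the queue length.
    static int queuedWhileHeld() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Thread waiter = new Thread(() -> {
            lock.lock(); // fails tryAcquire, so the thread is enqueued and parked
            lock.unlock();
        }, "waiter");

        int observed;
        lock.lock();
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1); // wait until the waiter has been added to the queue
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock(); // release wakes up the queued waiter
        }
        waiter.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("queued threads while lock was held: " + queuedWhileHeld());
    }
}
```

getQueueLength() is documented as an estimate, so it is suited to monitoring rather than to synchronization decisions.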

unlock()

Next, let's look at the unlocking process

public void unlock() {
    sync.release(1);
}

Enter the release method of AQS

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        //Get the head node in the current AQS queue.
        Node h = head;
        //The head node is not null and its waitStatus is not 0, so a successor may need to be woken up
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease, implemented in Sync, does the actual unlocking

protected final boolean tryRelease(int releases) {
    //Subtract the release count instead of setting 0, because the lock may have been re-entered
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //If it is 0, it is completely released
    if (c == 0) {
        //Mark as released
        free = true;
        //Clear the owner thread of the lock
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
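
Both branches of tryRelease can be observed from the public API. A minimal sketch, with ReleaseSketch and its method names being hypothetical:

```java
import java.util.concurrent.locks.ReentrantLock;

class ReleaseSketch {
    // A thread that does not own the lock gets IllegalMonitorStateException from unlock().
    static boolean nonOwnerUnlockThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // After two lock() calls, one unlock() leaves the lock held (state goes from 2 to 1).
    static boolean stillHeldAfterOneUnlock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();
        boolean held = lock.isHeldByCurrentThread();
        lock.unlock(); // state reaches 0 and the lock is fully released
        return held && !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println("non-owner unlock throws: " + nonOwnerUnlockThrows());
        System.out.println("still held after one unlock: " + stillHeldAfterOneUnlock());
    }
}
```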

If the lock is fully released, release calls unparkSuccessor to wake up the next waiting thread

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    //A negative status (for example SIGNAL) means a successor is waiting to be woken up
    if (ws < 0)
        //Restore to 0
        compareAndSetWaitStatus(node, ws, 0);

    //The next node of the head node
    Node s = node.next;
    //If the successor is null or has been cancelled (waitStatus > 0)
    if (s == null || s.waitStatus > 0) {
        //Discard it
        s = null;
        //Scan backwards from the tail and keep the non-cancelled node (waitStatus <= 0) closest to the head
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //Wake up the thread if it is not empty
    if (s != null)
        LockSupport.unpark(s.thread);
}
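
The park and unpark calls used by AQS can be tried on their own. The following is a minimal sketch, assuming hypothetical names ParkSketch and parkThenUnpark. The flag is re-checked in a loop because park may return without a matching unpark.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkSketch {
    // Parks a thread until a flag is set, then wakes it up with unpark.
    static boolean parkThenUnpark() throws InterruptedException {
        AtomicBoolean released = new AtomicBoolean(false);
        Thread sleeper = new Thread(() -> {
            while (!released.get()) {
                LockSupport.park(released); // the argument is only a blocker object for diagnostics
            }
        });
        sleeper.start();

        released.set(true);          // publish the condition first
        LockSupport.unpark(sleeper); // then hand the thread a permit, as unparkSuccessor does
        sleeper.join(5000);
        return !sleeper.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("sleeper finished: " + parkThenUnpark());
    }
}
```

An unpark that arrives before the park is not lost: the permit is kept and the next park returns immediately. AQS relies on the same property when a release races with a thread that is about to park.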

Flow chart

That completes the analysis of the AQS source code behind ReentrantLock. A flow chart of the process above helps to reinforce it.


Finally, thank you for watching~

Keywords: Java Concurrent Programming lock

Added by pixelsoul on Sat, 04 Sep 2021 23:11:06 +0300