ReentrantReadWriteLock source code analysis and AQS shared lock

Preface

The previous article explained the exclusive lock part of AQS (see: ReentrantLock source code analysis and AQS (1) ), this article will introduce the shared lock of AQS and ReentrantReadWriteLock based on the shared lock to separate the read and write locks. (if the method mentioned before is met, it will not be repeated)

First of all, why do we use read-write lock to separate?

We know that ReentrantLock uses exclusive lock. No matter whether the thread is in read or write state, it will block, which will undoubtedly reduce the amount of concurrency.

However, we know that when multiple threads read data at the same time, there is no thread safety problem because they do not interfere with each other. So why not design a scheme so that all the reading threads can share and read data together at the same time, just block the writing thread. While improving the concurrency, there will be no data inconsistency.

Similarly, if a thread is writing data, it will also block other reading threads (also block other writing threads), and the data can be read only after the data is written, so as to ensure that the data read is up-to-date.

Therefore, we can use read and write locks to control the reading and writing of data respectively. Realize read sharing, read-write mutual exclusion and write write mutual exclusion. This is also the origin of ReentrantReadWriteLock. It is very suitable for reading more and writing less scenes.

ReentrantReadWriteLock

Like ReentrantLock, it is also a reentrant lock, and based on AQS shared lock, it realizes read-write separation. Its internal structure is also similar, supporting fair lock and unfair lock. Let's take a look at its constructor,

public ReentrantReadWriteLock() {
    //Default unfairness
    this(false);
}

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

It defines two inner classes to represent read lock and write lock, and both of them implement lock adding and lock releasing functions through inner class Sync.

public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    ...
}

public static class WriteLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -4992448646407690164L;
    private final Sync sync;
    
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    ...
}

abstract static class Sync extends AbstractQueuedSynchronizer {
}

Let's look at fair lock and unfair lock again. There are two important methods to judge whether read lock and write lock should be blocked. They will be used later when adding lock (in fact, whether the actual situation should be blocked still needs to be considered, which will be said later).

static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    //The read and write of fair lock need to judge whether there is a thread waiting in front of it.
    //In some cases, the current thread needs to be blocked, which also reflects fairness.
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    //Unfair lock. It does not need to block when writing. It returns false directly
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    final boolean readerShouldBlock() {
        //To avoid write thread starvation, it is necessary to determine whether the first queued (head.next) in the synchronization queue is an exclusive lock (write thread)
        //If so, the current read thread needs to be blocked, which is the method in AQS
        return apparentlyFirstQueuedIsExclusive();
    }
}

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

Reflection:

We know that the synchronization state and reentrant times of ReentrantLock are directly represented by the state value. Now, I need to read and write two locks. How can I use an int type value to represent the state of two locks? Moreover, the lock is reentrant. How to record the number of reentries?

Don't worry, said the next one.

How to use a state value to express the two locks of reading and writing?

State is a 32-bit int value. In the read-write lock, it is divided into two parts. The high 16 bits are used to represent the read state, and its value represents the number of threads of the read lock. As shown in the figure, there are three. The low 16 bits represent the write state, and its value represents the number of re entrances of the write lock (because it is an exclusive lock). In this way, the number of read locks and write locks can be calculated respectively. Its related properties and methods are defined in the Sync class.

static final int SHARED_SHIFT   = 16;
//It indicates that the actual value of state increases by 2 ^ 16 for each additional read lock
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
//The maximum number of write locks and read locks
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

//The number of threads holding the read lock. The parameter c represents the state value
//The 32-bit binary bit of state is actually the value of the higher 16 bits after the unsigned right shift of 16 bits
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
//Number of write locks, that is, the number of re entrances of write locks  
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

The calculation of the number of read locks is relatively simple, and it can be done by directly moving 16 bits to the right without symbols. Let's see how the number of write lock reentries is calculated. First, look at the value of exclusive mask, which is (1 < < 16) - 1. We use binary to express the calculation process as follows:

//Binary of 1
0000 0000 0000 0000 0000 0000 0000 0001      
//1 shift left 16 bits
0000 0000 0000 0001 0000 0000 0000 0000
 / / decrease by 1
0000 0000 0000 0000 1111 1111 1111 1111
 //Any 32-bit binary number c, and the "and" operation of the above value is the lower 16 bit value of its own c
 //Don't explain this. If it doesn't, you need to add some basic knowledge...

How is the number of lock reentry calculated?

Write lock is relatively simple, and the calculated low 16 bit value can represent the number of write lock reentry.

Reading lock is more complicated, because the high 16 bits can only indicate the number of threads holding the shared lock, which is really a lack of skill. Therefore, within Sync, a class is maintained to represent the number of times each thread has been re entered,

static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}

Here, a counter is defined to represent the number of re entrances and tid to represent the current thread id. However, this is not enough. We need to bind HoldCounter and thread, so that we can distinguish the number of locks (reentry times) held by each thread, which requires ThreadLocal.

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    //Overriding this method allows you to,
    //Directly using get method, initializing an object instead of a new one
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

In addition, there are some other read lock related properties defined in Sync,

//Saves the number of read locks for the current thread's reentry, which will be removed when the number of reentry is reduced to 0
//Removal should be for performance, because HoldCounter can be initialized at any time through get method
private transient ThreadLocalHoldCounter readHolds;

//Save the thread count of the last successful read lock acquisition. The purpose of this variable is to:
//If the last thread obtaining the read lock repeatedly obtains the read lock, it can be used directly without updating.
//Equivalent to cache, improve efficiency
private transient HoldCounter cachedHoldCounter;

//The first thread to acquire a read lock
private transient Thread firstReader = null;
//Thread count of the first read lock acquisition
private transient int firstReaderHoldCount;
//These two parameters are for efficiency. When only one thread obtains a read lock, it avoids looking for readHolds

The basic knowledge is finished, then the next step is the acquisition and release of locks. Let's talk about write lock first, because it's easy to understand the basis of the last exclusive lock.

Acquisition of write lock

The acquisition of the write lock starts with the lock method,

//ReentrantReadWriteLock.WriteLock.lock 
public void lock() {
    sync.acquire(1);
}
//AbstractQueuedSynchronizer.acquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//Fair locks and unfair locks call the same method, defined in the Sync class
//ReentrantReadWriteLock.Sync.tryAcquire
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    //Get synchronization state
    int c = getState();
    //Write lock status
    int w = exclusiveCount(c);
    //If the synchronization status is not 0, a thread obtains a read lock or a write lock
    if (c != 0) {
        //If the synchronization status is not 0 and the write lock status is 0, it means that the read lock is occupied. Because the read and write locks are mutually exclusive, false is returned
        //If the write lock status is not 0 and the current thread has not obtained the write lock, it cannot be reentered, and false is returned
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        //If the maximum number of write locks is exceeded, an exception is thrown
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //If you go to this step, it indicates that the current thread has been reentered, then calculate the number of reenters, and return true
        setState(c + acquires);
        return true;
    }
    //This indicates that c is 0, and neither the read lock nor the write lock is occupied
    //false if the write lock should be blocked or CAS fails to obtain the write lock
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    //Make the current thread the owner of the exclusive lock
    setExclusiveOwnerThread(current);
    return true;
}   

Write lock release

Similarly, the release of a write lock starts with the unlock method,

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    //Throw an exception if the owner of the exclusive lock is not the current thread
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //state minus 1 for each release
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

As you can see, the acquisition and release of write lock is similar to the basic idea of reentrant lock. Next, it focuses on the acquisition and release of read lock, which is relatively complex.

Acquisition of read lock

tryAcquireShared

Starting with the ReadLock.lock method,

public void lock() {
    //Method calling AQS
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    //If the tryAcquireShared method returns less than 0, the read lock acquisition fails
    if (tryAcquireShared(arg) < 0)
        //Join the synchronization queue in sharing mode, and then spin the lock
        doAcquireShared(arg);
}

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    //If a thread acquires a write lock and is not the current thread, then - 1 is returned.
    //This is because if the thread obtains the write lock first, it can re-enter and acquire the read lock again. This is lock degradation.
    //Otherwise, do not re-enter.
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    //Number of read locks
    int r = sharedCount(c);
    //If the following three conditions are met at the same time (the read thread should not be blocked, the number of read locks is less than the maximum limit, and CAS succeeds),
    //It indicates that the read lock is obtained successfully and returns 1. Then set the value of the related property.
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //If the read lock status is 0, no other thread has acquired the read lock
        if (r == 0) {
            //Set the current thread as the first thread to acquire the read lock
            firstReader = current;
            //First read thread count set to 1
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            //If the current thread is the first thread to acquire the read lock, it will be reentered, and the count will be increased by 1
            firstReaderHoldCount++;
        } else { //The read lock status is not 0 and the current thread is not firstReader
            //Last thread counter to successfully acquire read lock
            HoldCounter rh = cachedHoldCounter;
            //If the counter is empty or the tid of the counter is not the current thread id, there are two situations
            //1.rh has not been set by any thread. At this time, only one thread of firstReader obtains the read lock.
            //2.rh has been set and is not the current thread, indicating that before the current thread, except for firstReader,
            //If other threads acquire the read lock, then the current thread is the third one (at least).
            if (rh == null || rh.tid != getThreadId(current))
                //In either case, you need to create and initialize a counter for the current thread and assign it to the cachedHoldCounter
                //Because the current thread is the last thread to obtain the read lock at this time, it needs to be cached
                cachedHoldCounter = rh = readHolds.get();
            //If the current thread is the latest thread to acquire the read lock, and the count is 0,
            else if (rh.count == 0)
                //Put the number of times the rh thread holds the lock into the local thread readHolds
                readHolds.set(rh);
            //Finally, add 1 to the count 
            rh.count++;
        }
        return 1;
    }
    //If any of the above three conditions is not satisfied, call this method and try to acquire the lock again with all your strength
    return fullTryAcquireShared(current);
}

The fullTryAcquireShared method is very similar to the tryAcquireShared method, except that there is an additional spin process until a certain value (- 1 or 1) is returned.

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    //Spin until a certain value (1 or - 1) is returned
    for (;;) {
        int c = getState();
        //If the write lock status is not 0, a thread acquires the write lock
        if (exclusiveCount(c) != 0) {
            //If the thread getting the write lock is not the current thread, return - 1 
            if (getExclusiveOwnerThread() != current)
                return -1;
            //else is omitted here, which indicates that the current thread has acquired a write lock, so it needs to do lock degradation,
            //Demote write lock to read lock. Because if you don't, the thread will block here and cause a deadlock.
            //Then jump to ① to continue
            //===========//
        } else if (readerShouldBlock()) {  //The write lock is idle and the read lock should be blocked, indicating that head.next is waiting to acquire the write lock
            //Although the read lock should be blocked, it should not be immediately blocked here, because there may be a re-entry of the read lock, which needs to be reconfirmed.
            if (firstReader == current) {//Current thread is the first read lock, reentrant
                // Jump to ①
            } else {
                if (rh == null) { //Must be null the first time it loops in
                    rh = cachedHoldCounter;  //Counter of the last read lock obtained in the cache
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        //The count is 0, indicating that the current thread has not acquired the read lock
                        if (rh.count == 0)
                            //For performance reasons, if the count is 0, it needs to be removed
                            readHolds.remove();
                    }
                }
                //This indicates that the current thread is not the first reader and has not acquired the read lock, which does not meet the re-entry condition,
                //Then make sure to block, only to queue, return - 1.
                if (rh.count == 0)
                    return -1;
            }
        }
        // 1.
        //An exception is thrown if the number of read locks reaches max'count
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //CAS obtains the read lock, which is the same as the processing logic of tryAcquireShared and will not be discussed in detail
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

doAcquireShared

If tryAcquireShared fails in the end, the doAcquireShared method is executed.

private void doAcquireShared(int arg) {
    //Join synchronization queue in shared mode
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                //If the predecessor of the current node is the head node, try to acquire the read lock again
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //Set the current node as the head node and propagate the shared state
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //Failed to get read lock, judge whether the current thread can be suspended
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {
    //Old head node
    Node h = head; 
    //Set the current node as a new header node
    setHead(node);

    //propagate is the return value of the tryAcquireShared method
    //If it is greater than 0, or the old header node is empty, or the ws of the header node is less than 0
    //Or the new head node is empty, or the ws of the new head node is less than 0, then the successor node is obtained
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //If there is no successor node or the successor node is a shared node, the wake-up is performed
        if (s == null || s.isShared())
            //Release resources and wake up subsequent nodes, which will be explained later
            doReleaseShared();
    }
}

Release of read lock

tryReleaseShared

Starting with the ReadLock.unlock method,

public void unlock() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //Current thread is the first read thread
    if (firstReader == current) {
        //Set firstReader to null if its count is 1
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
        //Otherwise, count minus 1, indicating the number of reentry times minus 1
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            //Remove if the current thread count is less than or equal to 1
            readHolds.remove();
            if (count <= 0)
                //Throw an exception if the count is less than or equal to 0
                throw unmatchedUnlockException();
        }
        //Count minus 1
        --rh.count;
    }
    for (;;) {
        int c = getState();
        //Reading lock status minus 1 is actually state value minus 65536
        //Because of the high 16 bit read lock actual value, the performance difference in state is 65536
        int nextc = c - SHARED_UNIT;
        // CAS sets the latest state
        if (compareAndSetState(c, nextc))
            //Returns true if the read lock status is reduced to 0
            //Releasing a read lock has no effect on other read threads,
            //However, if both the read and write locks are idle, the waiting write threads can be allowed to continue executing
            return nextc == 0;
    }
}

doReleaseShared

If the tryreleased method returns true, it indicates that the read lock is released successfully, and the subsequent node needs to be awakened,

private void doReleaseShared() {
    for (;;) {
        //Head node
        Node h = head;
        //Indicates that there are at least two nodes in the queue
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                //If the ws of the header node is - 1, CAS sets it to 0, because after waking up the successor node,
                //It doesn't need to do anything. Failed to continue spin attempt
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;                       // loop to recheck cases
                // If CAS succeeds, wake up the successor node
                unparkSuccessor(h);
            }
            //If ws is 0, set it to - 3, indicating that the shared state can propagate backward. If it fails, continue the spin attempt
            //Later, I kept thinking, why do I need to set a state like promote, but I still haven't got a clue
            //You can see the analysis of this article, which may have some reference value:
            //https://www.cnblogs.com/micrari/p/6937995.html
            //It can only be said that the logic of Doug Lea is too meticulous. I'll figure it out later, and then add it.
            //This is a sign of unconditional communication
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //If h is equal to the head node at the moment, it means that the head node has not changed, then the whole cycle will be skipped
        //Otherwise, if the header node has been modified by other threads, the next cycle judgment will continue
        if (h == head)                   // loop if head changed
            break;
    }
}

epilogue

About exclusive locks, it's relatively simple. But read lock involves many critical points and instantaneous states. In fact, it's not as simple as it seems on the surface. It's easy to understand. After all, the thought of Doug Lea is not something that ordinary people can figure out.

This article is just some of my personal understanding, if there is not enough explanation, welcome to clap bricks.

In fact, there are many details, this article did not expand. For example, why does the setHeadAndPropagate method judge the ws state of new and old nodes twice and what is the meaning. Why does the doReleaseShared method finally need to design a judgment like h == head? What's the meaning. It includes why to design the promote state and how to design without it.

It's a long way to go... Come to mend the hole later, this article can only be called analysis.  ̄□ ̄||

If this article is useful to you, please like, comment and forward.

Learning is boring and interesting. I am "the rain and the sky". Welcome to follow me. I can receive the article push as soon as possible.

Keywords: Java less

Added by stpra123 on Tue, 17 Mar 2020 15:55:13 +0200