Detailed implementation of the java lock read-write lock ReentrantReadWriteLock

1. ReentrantReadWriteLock feature

1.1. Fairness

  • Unfair Lock (default): This is the same as ReentrantLock's unfairness, since there is no lock competition between read threads, there is no fairness or unfairness in the read operation. During the write operation, one or more read or write operations are postponed because the write operation may acquire the lock immediately.Therefore, the throughput of unfair locks is higher than that of fair locks.
  • Fair Lock uses the AQS CLH queue to assign a write lock to the write thread that has the longest wait time when releasing the currently held lock (read or write), provided that the write thread has a longer wait time than all the read threads.If the same thread holds a write lock or a write thread is already waiting, all threads that attempt to obtain a fair lock, including read and write threads, will be blocked until the first write thread releases the lock.If a read thread waits longer than a write thread, the set of read threads acquires locks once the last write thread releases the lock.

1.2, Reentrance

  • Read-write locks allow read and write threads to reacquire read or write locks in the order in which they are requested.Read threads can acquire reentrant locks only if the write thread releases the lock.
  • Write threads can acquire read locks again after acquiring them, but read threads cannot acquire write locks after acquiring read locks.
  • Additionally, read-write locks support a maximum of 65535 recursive write locks and 65535 recursive read locks.(The source code uses an int type of data to record, the number of read locks for high 16-bit records and the number of write locks for low 16-bit records)

1.3, Lock Degradation

Write threads can acquire read locks after acquiring them, and then release them, thereby changing from a write lock to a read lock, which implements the feature of lock degradation.

1.4, Lock upgrade

Read locks cannot be upgraded directly to write locks.Since acquiring a write lock requires all read locks to be released, a deadlock occurs when two read locks attempt to acquire a write lock without releasing the read lock.

1.5, Interruption of lock acquisition

Both read and write locks support interruption during lock acquisition.This is consistent with the exclusive lock.

1.6. Conditional variable

Write locks provide support for conditional variables, which are consistent with exclusive locks, but read locks do not allow acquisition of conditional variables, resulting in an UnsupportedOper ationException exception.

1.7, Reentries

The maximum number of read and write locks can only be 65535 (including reentrants), respectively.

1.8. Monitoring

ReentrantReadWriteLock supports some methods for determining whether to keep or contend for locks.These methods are designed to monitor system state rather than synchronize control.

2. Implementation principle of ReentrantReadWriteLock

2.1, ReentrantReadWriteLock Inheritance and Aggregation Relationships

The following are the inheritance and aggregation implementation relationships for ReentrantReadWriteLock.

ReentrantReadWriteLock Inheritance and Aggregation Relationships.png

ReentrantReadWriteLock inherits the ReadWriteLock interface and is implemented through ReadLock and WriteLock.ReadLock and WriteLock implement read-write lock through aggregation, while AQS implements lock at the bottom.

2.2. Sync implementation of synchronous lock

Sync inherits from AQS and implements interfaces for acquiring exclusive locks and sharing locks, while saving read and write counts of locks through lock state.

//Synchronization lock implementation, inherited from AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;

    //Save the number of lock times for shared and exclusive locks in an int type.
    //The high 16 digits represent the occupancy count of shared locks and the low 16 digits represent the occupancy count of exclusive locks
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    //Occupancy count for shared locks
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    //Occupancy count for exclusive locks
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    //Count of locks
    static final class HoldCounter {
        int count = 0;
        // Thread id
        final long tid = getThreadId(Thread.currentThread());
    }

    //Save Thread Lock Count via ThreadLocal
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

    //Save Thread Read-Write Count Data
    private transient ThreadLocalHoldCounter readHolds;

    //Save the first thread to acquire a read lock and lock count
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }

   //Read threads should be blocked
    abstract boolean readerShouldBlock();

   //Write thread should be blocked
    abstract boolean writerShouldBlock();

    //Release exclusive lock
    protected final boolean tryRelease(int releases) {
        //Does the current thread hold an exclusive lock?
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
            
        //Decrease the count of exclusive locks if 0 means that the wireless process has exclusive locks
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }

    //Acquire exclusive locks
    protected final boolean tryAcquire(int acquires) {
         //1. If the read or write lock count is not zero and the current thread does not own an exclusive lock, acquisition of the lock fails.
         //2. Throw an exception if the exclusive lock count is greater than the maximum (65535);
         //3. If the current thread can acquire a read lock and the CAS set state succeeds, the acquire lock succeeds;
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

    //Release shared lock
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        //If the first thread when locking is the current thread, the first Reader and first ReaderHoldCount are processed;
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } 
        //Is the first thread to lock not the current thread? Remove the cache counter cachedHoldCounter if its thread is not the current thread;
        //Get the lock counter from readHolds
        else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            //Lock counter <=1 indicates that the current thread last released the lock and then no longer held it
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        //CAS Modify Lock State
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                return nextc == 0;
        }
    }

    
    //Acquire shared locks
    protected final int tryAcquireShared(int unused) {
        //1. If the exclusive lock count is not equal to 0 and the current thread does not hold an exclusive lock, acquiring the read lock fails.
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        //2. If the synchronization queue is empty or the current thread is the header node (!readerShouldBlock())
        //The shared lock count is less than the maximum value, and CAS successfully updates the lock state; the associated read lock count is updated
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            //Only one read lock?Only the threads and counts of the first read lock are updated    
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            }
            //Is the first read lock the current thread?
             else if (firstReader == current) {
                firstReaderHoldCount++;
            } 
            //Update Lock Count Cache
            else {
                //Read the last thread information to acquire the lock;
                //If the thread that last acquired the lock is the current thread, put the count cache in ThreadLocal;
                //Otherwise, get the lock count from ThreadLocal and put it in the cache
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        //Spin Retry
        return fullTryAcquireShared(current);
    }

    
    //CAS acquires a shared lock with approximately the same logic as tryAcquireShared()
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            } else if (readerShouldBlock()) {
                // Make sure we're not acquiring read lock reentrantly
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }

    //Attempting to acquire a write lock
    final boolean tryWriteLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        //Current Lock Count Value
        if (c != 0) {
            //The exclusive lock count is 0 () indicating a shared lock) or the current thread is not an exclusive lock thread
            int w = exclusiveCount(c);
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
        }
        //CAS changes lock state (possibly concurrently with other threads CAS changes lock state)
        if (!compareAndSetState(c, c + 1))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

    //Attempting to acquire a read lock
    final boolean tryReadLock() {
        Thread current = Thread.currentThread();
        for (;;) {
            int c = getState();
            //The exclusive lock count is not zero, and the exclusive lock thread is not the current thread, acquiring the read lock failed
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return false;
            int r = sharedCount(c);
            if (r == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //CAS Set Lock State
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return true;
            }
        }
    }
}

2.3, ReadLock and WriteLock implementations

Acquiring read locks and acquiring write locks in ReadLock and WriteLock are accomplished by calling related Sync methods directly.

ReadLock implementation:

public static class ReadLock implements Lock {
    private final Sync sync;
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquireShared(1);
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryReadLock();
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    public void unlock() {
        sync.releaseShared(1);
    }
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}

WriteLock implementation:

public static class WriteLock implements Lock{
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    public void lock() {
        sync.acquire(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock( ) {
        return sync.tryWriteLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    public int getHoldCount() {
        return sync.getWriteHoldCount();
    }
}

Keywords: less

Added by knobby2k on Fri, 19 Jul 2019 03:23:22 +0300