[Concurrent programming] Read-write locks: ReentrantReadWriteLock from getting started to mastering the source code

What is a read-write lock?

  • When no write is in progress, multiple threads can safely read a shared resource at the same time (reads can run concurrently).
  • While one thread is writing the shared resource, no other thread may read or write it (read-write, write-read, and write-write are mutually exclusive).
  • When reads far outnumber writes, a read-write lock can provide better concurrency and throughput than an exclusive lock.
  • A read-write lock internally maintains a pair of related locks: one for read-only operations, called the read lock, and one for write operations, called the write lock.
  • Precondition for a thread to acquire the read lock: no other thread holds the write lock, and either there is no pending write request, or there is one but the calling thread is the thread that holds the lock.
  • Precondition for a thread to acquire the write lock: no thread holds the read lock (including the calling thread itself, since upgrading is not supported), and no other thread holds the write lock.

Important characteristics of read-write lock

  • Fairness option: supports both nonfair (the default) and fair lock acquisition. Nonfair mode generally gives higher throughput than fair mode.
  • Reentrancy: both the read lock and the write lock are reentrant. A reader thread that holds the read lock can acquire it again. A writer thread that holds the write lock can acquire the write lock again, and can also acquire the read lock.
  • Lock demotion: by acquiring the write lock, then acquiring the read lock, and finally releasing the write lock, a write lock can be demoted to a read lock.
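The reentrancy rules above can be checked with the lock's own monitoring methods. This is a minimal sketch: the class name ReentrancyDemo and the method nestedHolds are made up for illustration, while getWriteHoldCount and getReadHoldCount are part of ReentrantReadWriteLock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReentrancyDemo {
    // Returns {write hold count, read hold count} observed by the calling thread
    // while it holds the write lock twice and the read lock once.
    static int[] nestedHolds() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); // nonfair by default
        rwl.writeLock().lock();
        rwl.writeLock().lock();   // the writer re-enters the write lock
        rwl.readLock().lock();    // the writer may also take the read lock
        try {
            return new int[] { rwl.getWriteHoldCount(), rwl.getReadHoldCount() };
        } finally {
            // Every lock() needs a matching unlock()
            rwl.readLock().unlock();
            rwl.writeLock().unlock();
            rwl.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        int[] holds = nestedHolds();
        System.out.println("write holds = " + holds[0] + ", read holds = " + holds[1]);
    }
}
```

The counts are per thread, so each lock() call has to be paired with its own unlock() before the lock becomes free again.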

Use of ReentrantReadWriteLock

  • ReentrantReadWriteLock is a reentrant read-write lock implementation class.
  • Inside it, a pair of related locks are maintained, one for read-only operation and the other for write operation.
  • As long as there is no Writer thread, the read lock can be held by multiple Reader threads at the same time.
  • In other words, write locks are exclusive and read locks are shared.

Precautions for using ReentrantReadWriteLock

  • The read lock does not support condition variables: readLock().newCondition() throws UnsupportedOperationException.
  • Upgrading during reentry is not supported: acquiring the write lock while holding the read lock blocks forever.
  • Demotion during reentry is supported: you can acquire the read lock while holding the write lock.
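The three precautions can be observed without risking a hang by using tryLock() instead of lock(). This is a sketch under that assumption; the class and method names (PrecautionsDemo and its helpers) are hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class PrecautionsDemo {
    // true if the read lock refuses to create a Condition
    static boolean readLockRejectsCondition() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        try {
            rwl.readLock().newCondition();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    // Tries to upgrade read -> write. tryLock() is used so the attempt fails
    // fast instead of blocking forever the way lock() would.
    static boolean upgradeSucceeds() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        rwl.readLock().lock();
        try {
            boolean acquired = rwl.writeLock().tryLock();
            if (acquired) {
                rwl.writeLock().unlock();
            }
            return acquired;
        } finally {
            rwl.readLock().unlock();
        }
    }

    // Tries to take the read lock while holding the write lock (demotion).
    static boolean demotionSucceeds() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        rwl.writeLock().lock();
        try {
            boolean acquired = rwl.readLock().tryLock();
            if (acquired) {
                rwl.readLock().unlock();
            }
            return acquired;
        } finally {
            rwl.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("read lock rejects Condition: " + readLockRejectsCondition());
        System.out.println("upgrade succeeds: " + upgradeSucceeds());
        System.out.println("demotion succeeds: " + demotionSucceeds());
    }
}
```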

Application scenario of ReentrantReadWriteLock

  • Scenarios where shared variables are read often and written rarely!

Lock degradation

  • Lock degradation (also called lock downgrading) means demoting a held write lock to a read lock. Its main purpose is data consistency: the thread keeps seeing the value it just wrote, because no other writer can get in before the read lock is held.
  • Lock upgrading is not supported: if many threads hold the read lock and one of them could upgrade to the write lock and change the data, the other readers would not know about the change and could observe inconsistent values!
  • If the current thread holds the write lock, releases it, and only then acquires the read lock, this two-step process is not lock degradation.
  • Lock degradation is the process of holding the (currently owned) write lock, acquiring the read lock, and then releasing the (previously owned) write lock.
  • Lock degradation lets the current thread go on reading its own modifications without another thread overwriting them in between, which prevents lost updates.

How to use ReentrantReadWriteLock

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Read-read operations run concurrently; read-write, write-read, and write-write operations are mutually exclusive!
public class ReentrantReadWriteLockTest1 {
	static Map<String, Object> map = new HashMap<String, Object>();
	static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
	static Lock r = rwl.readLock();
	static Lock w = rwl.writeLock();

	// Read operation, with read lock
	public final Object read(String key) throws InterruptedException {
		r.lock();
		try {
			System.out.println(System.currentTimeMillis() + "Read lock acquisition succeeded...");
			Thread.sleep(6000);
			System.out.println(System.currentTimeMillis() + "Read lock execution completed...");
			return map.get(key);
		} finally {
			System.out.println(System.currentTimeMillis() + "Read lock release...");
			r.unlock();
		}
	}

	// Write operation, write lock
	public final Object write(String key, Object value) throws InterruptedException {
		w.lock();
		try {
			System.out.println(System.currentTimeMillis() + "Write lock obtained successfully...");
			Thread.sleep(6000);
			System.out.println(System.currentTimeMillis() + "Write lock execution completed...");
			return map.put(key, value);
		} finally {
			System.out.println(System.currentTimeMillis() + "Write lock release...");
			w.unlock();
		}
	}

	public static void main(String[] args) {
		ReentrantReadWriteLockTest1 lock = new ReentrantReadWriteLockTest1();

		new Thread(() -> {
			try {
				lock.read("1");
				// lock.write("1","2");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}).start();

		new Thread(() -> {
			try {
				lock.read("1");
				// lock.write("1","2");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}).start();
	}
}
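The example above shows its behavior through timestamps and six-second sleeps. A quicker way to see the same shared/exclusive rules is to probe the lock with tryLock() from a second thread. This is only a sketch; SharedVsExclusiveDemo, otherThreadCanTake, and probe are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SharedVsExclusiveDemo {
    // Tries to take `lock` from a fresh thread without blocking
    // and reports whether the attempt succeeded.
    static boolean otherThreadCanTake(Lock lock) {
        boolean[] result = new boolean[1];
        Thread t = new Thread(() -> {
            if (lock.tryLock()) {
                result[0] = true;
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join(); // join() also makes the write to result[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    // Returns {read while read held, write while read held,
    //          read while write held, write while write held}
    static boolean[] probe() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        boolean[] out = new boolean[4];
        rwl.readLock().lock();
        try {
            out[0] = otherThreadCanTake(rwl.readLock());
            out[1] = otherThreadCanTake(rwl.writeLock());
        } finally {
            rwl.readLock().unlock();
        }
        rwl.writeLock().lock();
        try {
            out[2] = otherThreadCanTake(rwl.readLock());
            out[3] = otherThreadCanTake(rwl.writeLock());
        } finally {
            rwl.writeLock().unlock();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(probe()));
    }
}
```

Only the read-read combination succeeds; every combination that involves a writer is refused.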

Use of lock degradation

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReentrantReadWriteLockTest1 {

	private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
	private final Lock readLock = rwl.readLock();
	private final Lock writeLock = rwl.writeLock();
	private volatile boolean update = false;

	public void test() {
		readLock.lock();
		if (!update) {
			// The read lock must be released first, because upgrading from read to write is not supported
			readLock.unlock();
			// Lock degradation starts with acquiring the write lock
			writeLock.lock();
			try {
				if (!update) {
					// TODO data preparation process (omitted)
					update = true;
				}
				// =====Lock degradation: acquire the read lock while still holding the write lock=====
				readLock.lock();
			} finally {
				writeLock.unlock();
			}
			// Lock degradation is complete: the write lock has been released and only the read lock is held
		}
		try {
			// TODO use the data (omitted)
		} finally {
			// =====Release the read lock: this is where lock degradation ends=====
			readLock.unlock();
		}
	}
}

Flow chart of ReentrantReadWriteLock source code

ReentrantReadWriteLock read/write state source code analysis: one variable maintains multiple states!

// Shift that separates the shared (read) count from the exclusive (write) count
static final int SHARED_SHIFT   = 16;
// One shared hold: 00000000 00000001 00000000 00000000
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// Maximum count for either lock: 00000000 00000000 11111111 11111111 (65535)
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// Mask for the exclusive count: 00000000 00000000 11111111 11111111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count  */
// Read-lock count: stored in the high 16 bits. Several threads can hold the read lock at once and each of them can re-enter it, so the per-thread hold count has to be tracked separately. That is the job of HoldCounter
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
// Write-lock reentrant hold count: stored in the low 16 bits.
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
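To make the bit layout concrete, here is a small worked example that packs and unpacks a state value by hand. The constants and the two helper methods are copied from the snippet above; the class StateBits and the method demotedState are hypothetical and only model the arithmetic, not the real lock.

```java
class StateBits {
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    // State of a lock whose owner holds the write lock twice (reentrant)
    // and has additionally taken one read hold, as happens during demotion.
    static int demotedState() {
        int state = 0;
        state += 2;            // two write holds live in the low 16 bits
        state += SHARED_UNIT;  // one read hold adds 1 to the high 16 bits
        return state;
    }

    public static void main(String[] args) {
        int state = demotedState();
        System.out.println("state = 0x" + Integer.toHexString(state)); // 0x10002
        System.out.println("read holds  = " + sharedCount(state));     // 1
        System.out.println("write holds = " + exclusiveCount(state));  // 2

        int threeReaders = 3 * SHARED_UNIT; // three read holds, no writer
        System.out.println("read holds  = " + sharedCount(threeReaders));    // 3
        System.out.println("write holds = " + exclusiveCount(threeReaders)); // 0
    }
}
```

Because both counts share one int, a single compare-and-set on the state updates the read and write bookkeeping atomically.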

Source code analysis of HoldCounter, the read-lock counter of ReentrantReadWriteLock

/**
 * A read lock is essentially a shared lock. Each acquire or release of the shared lock is reflected in a HoldCounter.
 * Acquiring the shared lock increments the counter; releasing it decrements the counter. A thread can release or re-enter the shared lock only after it has acquired it.
 * HoldCounter is the object that records how many times a thread has re-entered the read lock
 */
static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}

/**
 * ThreadLocalHoldCounter binds a HoldCounter to each thread.
 * HoldCounter is the per-thread counter; ThreadLocalHoldCounter is the ThreadLocal that holds it.
 * It stores the read-lock reentry count for every thread other than the first reader (the first reader is tracked in firstReader and firstReaderHoldCount)
 */
static final class ThreadLocalHoldCounter
	extends ThreadLocal<HoldCounter> {
	public HoldCounter initialValue() {
	    return new HoldCounter();
	}
}
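The effect of this per-thread bookkeeping is visible through the public API: getReadHoldCount() reports the calling thread's own holds, while getReadLockCount() reports the total across all threads. The following sketch compares the two from two threads; PerThreadHoldsDemo and observe are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class PerThreadHoldsDemo {
    // Returns {second thread's own hold count,
    //          total read holds while both threads hold the lock,
    //          calling thread's own hold count}
    static int[] observe() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        int[] out = new int[3];
        rwl.readLock().lock();
        rwl.readLock().lock(); // the calling thread re-enters: 2 holds
        try {
            Thread other = new Thread(() -> {
                rwl.readLock().lock(); // a second reader with 1 hold of its own
                try {
                    out[0] = rwl.getReadHoldCount(); // per-thread count
                    out[1] = rwl.getReadLockCount(); // total over all threads
                } finally {
                    rwl.readLock().unlock();
                }
            });
            other.start();
            try {
                other.join(); // join() makes the writes to out[] visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            out[2] = rwl.getReadHoldCount();
        } finally {
            rwl.readLock().unlock();
            rwl.readLock().unlock();
        }
        return out;
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println("other thread holds = " + r[0]
                + ", total holds = " + r[1]
                + ", main thread holds = " + r[2]);
    }
}
```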

Source code analysis of ReentrantReadWriteLock construction method

/**
 * The no-arg constructor delegates to the one-arg constructor with fair = false, so the default is nonfair mode!
 */
public ReentrantReadWriteLock() {
    this(false);
}

/**
 * Constructor that takes the fairness mode
 */
public ReentrantReadWriteLock(boolean fair) {
    // Fair and nonfair mode differ only in which Sync subclass is used
    sync = fair ? new FairSync() : new NonfairSync();
    // Initialize read lock
    readerLock = new ReadLock(this);
    // Initialize write lock
    writerLock = new WriteLock(this);
}
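The fairness mode chosen at construction time can be read back with isFair(). A tiny sketch (FairnessDemo is a hypothetical class name):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class FairnessDemo {
    // The no-arg constructor selects nonfair mode
    static boolean defaultIsFair() {
        return new ReentrantReadWriteLock().isFair();
    }

    // Passing true selects fair mode
    static boolean explicitFairIsFair() {
        return new ReentrantReadWriteLock(true).isFair();
    }

    public static void main(String[] args) {
        System.out.println("default fair?  " + defaultIsFair());
        System.out.println("explicit fair? " + explicitFairIsFair());
    }
}
```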

ReentrantReadWriteLock write lock acquisition source code analysis

/**
 * Directly call AQS to acquire exclusive lock logic
 */
public void lock() {
    sync.acquire(1);
}

/**
 * AQS exclusive acquire logic
 */
public final void acquire(int arg) {
    // tryAcquire is shown below; acquireQueued lives in AQS and works the same way as for ReentrantLock.
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

/**
 * Attempt to acquire lock
 */
protected final boolean tryAcquire(int acquires) {
    // Get current thread
    Thread current = Thread.currentThread();
    // Get current status
    int c = getState();
    // Gets the status of the write lock
    int w = exclusiveCount(c);
    // State is nonzero: a read lock or a write lock is currently held
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // Fail if only read locks are held (w == 0), or if another thread holds the write lock
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // The write hold count would exceed the maximum: throw an Error
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // The current thread already holds the write lock, so no CAS is needed to record the reentry
        setState(c + acquires);
        // The lock was acquired
        return true;
    }
    // State is 0. Fail if the fairness policy says the writer must queue,
    // or if the CAS on the state loses a race with another thread
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // Reaching here means the lock was acquired
    // Record the current thread as the owner of the write lock
    setExclusiveOwnerThread(current);
    // The lock was acquired
    return true;
}

/**
 * Fair mode (FairSync): a writer must block if another thread is queued ahead of it
 */
final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

/**
 * Nonfair mode (NonfairSync): a writer never checks the queue and always returns false
 */
final boolean writerShouldBlock() {
    return false; // writers can always barge
}

/**
 * Checks whether any thread has been waiting in the queue longer than the current thread
 */
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // h == t: the queue is uninitialized (both null) or holds only the dummy head, so nobody is waiting: return false
    // (s = h.next) == null: another thread is in the middle of enqueuing, so treat it as a predecessor: return true
    // s.thread != current: the first waiting node belongs to another thread: return true
    // Otherwise the current thread is itself first in the queue: return false
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
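The decision made by the write-lock tryAcquire can be summarized as a pure function of three inputs. The following is a simplified model for illustration only, not the JDK code: it ignores the overflow check and CAS races, and the names WriteAcquireModel and canAcquireWrite are hypothetical.

```java
class WriteAcquireModel {
    static final int SHARED_UNIT    = 1 << 16;
    static final int EXCLUSIVE_MASK = (1 << 16) - 1;

    // state               : the packed lock state (high 16 bits reads, low 16 bits writes)
    // callerOwnsWriteLock : whether the caller is the exclusive owner thread
    // writerShouldBlock   : result of the fairness check (always false in nonfair mode)
    static boolean canAcquireWrite(int state, boolean callerOwnsWriteLock, boolean writerShouldBlock) {
        int writeHolds = state & EXCLUSIVE_MASK;
        if (state != 0) {
            // Some lock is held: succeed only as a reentrant write by the owner.
            // writeHolds == 0 here means readers are present, which always fails.
            return writeHolds != 0 && callerOwnsWriteLock;
        }
        // The lock is free: only the fairness policy can hold the writer back.
        return !writerShouldBlock;
    }

    public static void main(String[] args) {
        System.out.println(canAcquireWrite(0, false, false));           // free lock
        System.out.println(canAcquireWrite(SHARED_UNIT, false, false)); // one reader present
        System.out.println(canAcquireWrite(1, true, false));            // reentrant write by the owner
        System.out.println(canAcquireWrite(1, false, false));           // another thread owns the write lock
        System.out.println(canAcquireWrite(0, false, true));            // fair mode with queued predecessors
    }
}
```

Note that the reentrant branch never consults the fairness check, which matches the source above: a thread that already owns the write lock is never made to queue behind others.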

ReentrantReadWriteLock write lock release source code analysis

/**
 * The release of write lock directly calls the release logic of AQS.
 */
public void unlock() {
    sync.release(1);
}

/**
 * AQS exclusive release logic. The write lock shares this method with ReentrantLock;
 * the read lock goes through releaseShared instead.
 */
public final boolean release(int arg) {
    // Attempt to release lock
    if (tryRelease(arg)) {
        // The lock is released successfully, and the header node is obtained
        Node h = head;
        // The head node exists and its waitStatus is not 0, so a successor may need to be woken
        if (h != null && h.waitStatus != 0)
            // unpark wakes up the next thread in the queue
            unparkSuccessor(h);
        // Return to unlock successfully
        return true;
    }
    // tryRelease returned false: the write lock is still held reentrantly
    return false;
}

/**
 * ReentrantReadWriteLock Write lock attempt to release lock logic
 */
protected final boolean tryRelease(int releases) {
    // The current thread does not hold the write lock: throw an exception
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // Compute the state after this release
    int nextc = getState() - releases;
    // The lock is fully released only when the write hold count (low 16 bits) drops to 0
    boolean free = exclusiveCount(nextc) == 0;
    // Fully released (otherwise this was just one level of reentry): clear the owner thread
    if (free)
        setExclusiveOwnerThread(null);
    // Store the new state
    setState(nextc);
    // true only if the write lock is now completely free
    return free;
}
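The two outcomes of tryRelease (still held reentrantly versus completely free) and the exception path can be observed from user code. A minimal sketch, with WriteReleaseDemo and its methods as hypothetical names:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WriteReleaseDemo {
    // Returns {still write-locked after the first unlock,
    //          still write-locked after the second unlock}
    static boolean[] releaseTwice() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        rwl.writeLock().lock();
        rwl.writeLock().lock();   // reentrant: the write hold count is now 2
        boolean[] out = new boolean[2];
        rwl.writeLock().unlock(); // hold count 2 -> 1, the lock stays owned
        out[0] = rwl.isWriteLocked();
        rwl.writeLock().unlock(); // hold count 1 -> 0, the lock becomes free
        out[1] = rwl.isWriteLocked();
        return out;
    }

    // true if unlocking a write lock the thread does not hold is rejected
    static boolean unlockWithoutHoldingThrows() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        try {
            rwl.writeLock().unlock();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        boolean[] r = releaseTwice();
        System.out.println("locked after 1st unlock: " + r[0]);
        System.out.println("locked after 2nd unlock: " + r[1]);
        System.out.println("unlock without holding throws: " + unlockWithoutHoldingThrows());
    }
}
```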

ReentrantReadWriteLock read lock acquisition source code analysis

/**
 * The logic of directly calling AQS to obtain the shared lock
 */
public void lock() {
    sync.acquireShared(1);
}

/**
 * Get shared lock
 */
public final void acquireShared(int arg) {
    // Try to acquire the shared lock; a negative result means failure
    if (tryAcquireShared(arg) < 0)
        // Enqueue and keep retrying (parking in between) until the shared lock is acquired
        doAcquireShared(arg);
}

/**
 * Try to get the lock
 */
protected final int tryAcquireShared(int unused) {
    // Gets the current thread
    Thread current = Thread.currentThread();
    // Gets the status of the current lock object
    int c = getState();
    // The exclusive lock is not 0 and the thread holding the write lock is not the current thread! Returns a negative number (no lock obtained)
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // Get the read state of the current lock
    int r = sharedCount(c);
    // The fairness policy does not require the reader to block,
    // the read count is below the maximum,
    // and the CAS that adds one read hold to the state succeeds
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // Read lock equals 0, the first thread
        if (r == 0) {
            // Set the thread currently holding the lock as the current thread
            firstReader = current;
            // Set the number of shares held to 1
            firstReaderHoldCount = 1;
        // If the first thread holding the lock is the current thread, the number of reentries is increased by 1
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            // Any reader other than the first one lands here
            // Start from the most recently used (cached) counter
            HoldCounter rh = cachedHoldCounter;
            // No cached counter, or it belongs to another thread: fetch this thread's own counter
            if (rh == null || rh.tid != getThreadId(current))
                // readHolds.get() creates the counter on first use; cache it for the next call
                cachedHoldCounter = rh = readHolds.get();
            // The cached counter is ours but its count is 0, so it was removed from the ThreadLocal on the last release
            else if (rh.count == 0)
                // Put it back into the ThreadLocal
                readHolds.set(rh);
            // One more read hold for this thread
            rh.count++;
        }
        // Return attempt succeeded
        return 1;
    }
    // The fast path failed: fall back to the full retry loop
    return fullTryAcquireShared(current);
}

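/**
 * Full version of the read acquire: retries in a loop and handles the CAS misses and reentrant reads that the fast path in tryAcquireShared does not
 */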
final int fullTryAcquireShared(Thread current) {
    // Define counter
    HoldCounter rh = null;
    for (;;) {
        // Gets the status of the current lock
        int c = getState();
        // The exclusive lock state is not 0, indicating that there is an exclusive lock
        if (exclusiveCount(c) != 0) {
            // The thread holding the write lock is not the current thread and returns a negative number (no lock was obtained)
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        // No write lock is held, but the fairness policy says the reader should block
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            // The current thread is the first reader, so this is a reentrant acquire: do not block
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // No counter has been looked up in this loop yet
                if (rh == null) {
                    // Start from the cached counter
                    rh = cachedHoldCounter;
                    // No cached counter, or it belongs to another thread: fetch this thread's own counter
                    if (rh == null || rh.tid != getThreadId(current)) {
                        // readHolds.get() creates the counter if it does not exist
                        rh = readHolds.get();
                        // A count of 0 means this thread holds no read lock: remove the counter that was just created
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // The thread holds no read lock, so this is not a reentrant acquire: fail and queue
                if (rh.count == 0)
                    return -1;
            }
        }
        // If the number of shares is equal to the maximum, an exception is thrown
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // cas successfully changed the current lock (acquired lock)
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // The number of shared locks (read locks) is equal to 0
            if (sharedCount(c) == 0) {
                // Set the thread currently holding the lock as the current thread
                firstReader = current;
                // Set the number of shares held to 1
                firstReaderHoldCount = 1;
            // If the first thread holding the lock is the current thread, the number of reentries is increased by 1
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                // No counter has been looked up yet: start from the cached one
                if (rh == null)
                    rh = cachedHoldCounter;
                // No cached counter, or it belongs to another thread: fetch this thread's own counter
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                // The counter is ours but its count is 0, so it is no longer stored in the ThreadLocal
                else if (rh.count == 0)
                    // Put it back into the ThreadLocal
                    readHolds.set(rh);
                // One more read hold for this thread
                rh.count++;
                // Remember this counter as the most recently used one
                cachedHoldCounter = rh; // cache for release
            }
            // Returns the success of obtaining the lock
            return 1;
        }
    }
}

/**
 * Nonfair mode (NonfairSync): a reader blocks only if the first node waiting in the queue appears to be a writer
 */
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}

/**
 * Fair mode (FairSync): a reader blocks if any thread is queued ahead of it
 */
final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}

/**
 * Returns true if the first queued thread is waiting for the write lock. This keeps barging readers from starving a waiting writer indefinitely
 */
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    // The head node of the synchronization queue is not null,
    // the node after the head is not null,
    // that node is not in shared mode (it is a writer),
    // and that node still has a thread attached
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

/**
 * Loop to get shared lock
 */
private void doAcquireShared(int arg) {
    // Add to the synchronization queue and return to the current node. addWaiter is implemented in AQS in the same way as ReentrantLock
    final Node node = addWaiter(Node.SHARED);
    // Define failure flag bit true
    boolean failed = true;
    try {
        // Define interrupt flag as false
        boolean interrupted = false;
        for (;;) {
            // Get the precursor node of the current node
            final Node p = node.predecessor();
            // If the front node is the head node
            if (p == head) {
                // Try to get the lock
                int r = tryAcquireShared(arg);
                // Lock acquisition succeeded
                if (r >= 0) {
                    // Set the header node and linked list. setHeadAndPropagate is implemented in AQS in the same way as CountDownLatch
                    setHeadAndPropagate(node, r);
                    // Unlink the old head so the GC can reclaim it
                    p.next = null; // help GC
                    // The thread was interrupted while waiting: restore its interrupt status
                    if (interrupted)
                        selfInterrupt();
                    // Change failure flag bit false
                    failed = false;
                    // End cycle
                    return;
                }
            }
            // shouldParkAfterFailedAcquire and parkAndCheckInterrupt are implemented in AQS, the same as for CountDownLatch
            // Reaching here means the attempt to acquire the lock failed.
            // shouldParkAfterFailedAcquire returns true once the predecessor's waitStatus is SIGNAL (-1), meaning it is safe to park
            // parkAndCheckInterrupt blocks the thread until it is woken up, then reports whether it was interrupted while parked
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // When the above code throws an exception, it will execute the logic here
        if (failed)
            // Cancel the logic to acquire the lock. cancelAcquire is implemented in AQS in the same way as CountDownLatch
            cancelAcquire(node);
    }
}

ReentrantReadWriteLock read lock unlock source code analysis

/**
 * Delegates to the AQS logic for releasing a shared lock
 */
public void unlock() {
    sync.releaseShared(1);
}

/**
 * Logic for releasing shared locks
 */
public final boolean releaseShared(int arg) {
    // Try to release the shared lock
    if (tryReleaseShared(arg)) {
        // Wake up subsequent nodes and ensure continuous propagation. doReleaseShared is implemented in AQS in the same way as CountDownLatch
        doReleaseShared();
        // The whole returns true, indicating that the shared lock is released successfully
        return true;
    }
    // tryReleaseShared returned false: read holds remain, so no waiting thread is woken
    return false;
}

/**
 * Try to release the shared lock
 */
protected final boolean tryReleaseShared(int unused) {
    // Gets the current thread
    Thread current = Thread.currentThread();
    // The first thread is the current thread
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        // Last hold of the first reader: clear firstReader; otherwise just decrement its hold count
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // Not the first thread, get its counter
        HoldCounter rh = cachedHoldCounter;
        // The counter of the current thread is empty or the current counter does not belong to the current thread: get this counter
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
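        // Read this thread's current hold count
        int count = rh.count;
        // This is the thread's last hold (or it holds none): drop the counter from the ThreadLocal
        if (count <= 1) {
            readHolds.remove();
            // The thread holds no read lock at all: unlock() without a matching lock()
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        // One read hold fewer for this thread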
        --rh.count;
    }
    for (;;) {
        // Read the current lock state
        int c = getState();
        // Compute the next state: subtract one read hold (1 in the high 16 bits)
        int nextc = c - SHARED_UNIT;
        // CAS the new state in; retry on contention
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
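Because tryReleaseShared returns nextc == 0, a waiting writer can only proceed once every read hold is gone. The sketch below shows this with a second thread that probes the write lock with tryLock(), plus the unmatched-unlock case; ReadReleaseDemo and its methods are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadReleaseDemo {
    // Asks a fresh thread whether it can take the write lock without blocking.
    static boolean writerCanAcquire(ReentrantReadWriteLock rwl) {
        boolean[] result = new boolean[1];
        Thread t = new Thread(() -> {
            if (rwl.writeLock().tryLock()) {
                result[0] = true;
                rwl.writeLock().unlock();
            }
        });
        t.start();
        try {
            t.join(); // join() makes the write to result[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    // Returns {writer can acquire after the first read unlock,
    //          writer can acquire after the second read unlock,
    //          a third, unmatched unlock throws}
    static boolean[] observe() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        boolean[] out = new boolean[3];
        rwl.readLock().lock();
        rwl.readLock().lock();      // two read holds
        rwl.readLock().unlock();    // one hold remains: the state is not yet 0
        out[0] = writerCanAcquire(rwl);
        rwl.readLock().unlock();    // the last hold is gone: the state is 0
        out[1] = writerCanAcquire(rwl);
        try {
            rwl.readLock().unlock(); // no matching lock()
        } catch (IllegalMonitorStateException e) {
            out[2] = true;
        }
        return out;
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("writer after 1st unlock: " + r[0]);
        System.out.println("writer after 2nd unlock: " + r[1]);
        System.out.println("unmatched unlock throws: " + r[2]);
    }
}
```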

Keywords: Java Concurrent Programming

Added by Kaboom on Fri, 04 Feb 2022 15:46:08 +0200