Locks in Java Concurrent Programming


Locks control how multiple threads access shared resources. Generally speaking, a lock prevents multiple threads from accessing a shared resource at the same time (although some locks, such as read-write locks, allow concurrent access by multiple threads). Before the Lock interface appeared, Java programs relied on the synchronized keyword for locking. Java SE 5 added the Lock interface (and related implementation classes) to the concurrency package (java.util.concurrent.locks). It provides synchronization similar to the synchronized keyword, but the lock must be acquired and released explicitly. Although it gives up the convenience of implicit acquisition and release (provided by synchronized blocks and methods), it offers capabilities that synchronized lacks: flexible control over acquisition and release, interruptible lock acquisition, and lock acquisition with a timeout.

Using the synchronized keyword acquires the lock implicitly, but it fixes the order of acquisition and release: the lock is acquired on entry and released on exit. This simplifies synchronization management, but it is less flexible than explicit lock acquisition and release.
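As a minimal sketch of one capability that synchronized does not offer, the following example acquires a lock with a timeout. The class name TimedLockDemo and the helper runWithTimeout are hypothetical names chosen for illustration; tryLock(long, TimeUnit) is the standard method of the Lock interface.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TimedLockDemo {
    // Tries to take the lock for at most timeoutMillis; returns whether the work ran.
    static boolean runWithTimeout(Lock lock, long timeoutMillis, Runnable work) {
        try {
            // tryLock(time, unit) gives up after the timeout and responds to interruption
            if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        try {
            work.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        boolean ran = runWithTimeout(lock, 100, () -> System.out.println("working"));
        System.out.println("ran: " + ran);
    }
}
```

A caller that cannot get the lock in time simply receives false and can do something else, which a synchronized block cannot express.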

How to use the lock

lock.lock();
try{
} finally{
    lock.unlock();
}

The purpose of releasing the lock in the finally block is to ensure that the lock can be released after it is obtained.

Do not acquire the lock inside the try block. If acquiring the lock throws an exception (for example, in a custom lock implementation), the finally block would still run and call unlock() on a lock that was never acquired.
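The idiom above can be sketched with a small shared counter. The class LockedCounter and its runDemo helper are hypothetical names used only for this illustration.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockedCounter {
    private final Lock lock = new ReentrantLock();
    private int count; // guarded by lock

    void increment() {
        lock.lock();       // acquired outside try: if this throws, nothing is unlocked
        try {
            count++;
        } finally {
            lock.unlock(); // always runs once the lock is held
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // Starts `threads` workers that each increment `perThread` times, then returns the total.
    static int runDemo(int threads, int perThread) throws InterruptedException {
        LockedCounter counter = new LockedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(4, 1000)); // 4 threads x 1000 increments
    }
}
```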

Reentrant lock ReentrantLock

A reentrant lock, as the name suggests, is a lock that supports re-entry: a thread that already holds the lock can lock the resource again. In addition, the lock supports a choice between fair and nonfair acquisition.

The synchronized keyword supports re-entry implicitly. For example, when a recursive method is marked synchronized, the executing thread can acquire the lock again on every recursive call after it first obtains it.

ReentrantLock does not support implicit re-entry the way synchronized does, but a thread that already holds the lock can call lock() again and acquire it without blocking.
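A minimal sketch of re-entry, assuming a hypothetical class ReentryDemo: a recursive method locks the same ReentrantLock at every level, and getHoldCount() reports how many times the current thread holds it.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentryDemo {
    static final ReentrantLock lock = new ReentrantLock();

    // Re-acquires the lock at each recursion level and returns the hold count at the deepest level.
    static int descend(int depth) {
        lock.lock(); // does not block, because the current thread may already be the owner
        try {
            if (depth == 1) {
                return lock.getHoldCount();
            }
            return descend(depth - 1);
        } finally {
            lock.unlock(); // one unlock per lock
        }
    }

    public static void main(String[] args) {
        System.out.println("hold count at depth 3: " + descend(3));
        System.out.println("still locked: " + lock.isLocked());
    }
}
```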

ReentrantLock provides a constructor to control whether the lock is fair:

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

How re-entry is implemented

ReentrantLock implements lock acquisition and release by composing a custom synchronizer built on AbstractQueuedSynchronizer (AQS).

The thread that has acquired the lock acquires the lock again

Take the nonfair (default) implementation as an example:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

Whether the acquisition succeeds is decided by checking whether the current thread is the thread that already holds the lock. If the owning thread requests the lock again, the synchronization state is incremented and true is returned, indicating that the acquisition succeeded.

Final release of lock

A thread that already holds the lock only increments the synchronization state when it acquires the lock again, so ReentrantLock must decrement the synchronization state when the lock is released:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

If the lock has been acquired n times, the first (n-1) calls to tryRelease(int releases) must return false; only when the synchronization state is fully released does it return true. In other words, the method uses "synchronization state equals 0" as the condition for the final release. When the state reaches 0, the owner thread is set to null and true is returned, indicating that the release succeeded.
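To see acquisition and release working together, here is a simplified sketch of a reentrant lock built on AQS. SimpleReentrantLock and stateValue() are hypothetical names; unlike the real ReentrantLock, this sketch omits the overflow check, fairness, conditions, and the rest of the Lock interface.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleReentrantLock {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // Lock is free: race with other threads using CAS
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                // Re-entry: only the owner reaches this branch, so a plain write is enough
                setState(c + acquires);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            int c = getState() - releases;
            boolean free = (c == 0); // fully released only when the count drops to 0
            if (free) {
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        int currentState() {
            return getState();
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }

    void unlock() { sync.release(1); }

    // Exposes the raw AQS state (the re-entry count) for demonstration purposes.
    int stateValue() { return sync.currentState(); }

    public static void main(String[] args) {
        SimpleReentrantLock lock = new SimpleReentrantLock();
        lock.lock();
        lock.lock();
        System.out.println("state after two locks: " + lock.stateValue());
        lock.unlock();
        lock.unlock();
        System.out.println("state after two unlocks: " + lock.stateValue());
    }
}
```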

The difference between fair and unfair lock acquisition

Fair lock acquisition:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

Both modes acquire the lock in the same way. First, check whether state is 0. If it is, no thread holds the lock, so a CAS changes state from 0 to acquires (compareAndSetState(0, acquires)), and the current thread is recorded as the owner (setExclusiveOwnerThread(current)). If state is not 0, the lock is already held by some thread, so check whether the current thread is the owner (the re-entry check). If it is, state is increased by acquires with a plain setState() call (no CAS is needed, because only the owner can reach this branch) and true is returned, indicating a successful re-entry. Otherwise false is returned, indicating that the acquisition failed.

The only difference between tryAcquire(int acquires) in fair mode and nonfairTryAcquire(int acquires) in nonfair mode is that, when state is 0 (no thread holds the lock), fair mode first calls hasQueuedPredecessors() to check whether another thread is queued ahead of the current thread in the synchronization queue. If it returns true, another thread requested the lock earlier than the current thread, so the current thread must wait until that predecessor has acquired and released the lock.

Reentrant read-write lock ReentrantReadWriteLock

ReentrantLock and synchronized are exclusive locks: they allow only one thread to access the resource at a time. A read-write lock allows multiple reader threads to access the resource at the same time, but when a writer thread accesses it, all reader threads and all other writer threads are blocked. The read-write lock maintains a pair of locks, a read lock and a write lock. By separating the two, it greatly improves concurrency compared with an ordinary exclusive lock.
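A typical use is a cache that is read far more often than it is written. The following is a minimal sketch under that assumption; the class RwCache is a hypothetical name for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RwCache<K, V> {
    private final Map<K, V> map = new HashMap<>(); // guarded by the read-write lock
    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    // Many threads may hold the read lock at once.
    V get(K key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }

    // The write lock excludes all readers and all other writers.
    V put(K key, V value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }

    public static void main(String[] args) {
        RwCache<String, Integer> cache = new RwCache<>();
        cache.put("a", 1);
        System.out.println(cache.get("a"));
    }
}
```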

ReentrantReadWriteLock implements the ReadWriteLock interface. ReadWriteLock defines only two methods, readLock() and writeLock(), which return the read lock and the write lock. In addition to these interface methods, ReentrantReadWriteLock provides methods that let outside code monitor its internal working state.

ReentrantReadWriteLock has five inner classes. Sync extends AQS; NonfairSync and FairSync extend Sync (the boolean passed to the constructor determines which Sync instance is created); ReadLock and WriteLock implement the Lock interface.
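As an illustration of the monitoring methods, the sketch below queries getReadLockCount(), getReadHoldCount(), isWriteLocked() and getWriteHoldCount(), which are public methods of ReentrantReadWriteLock. The class MonitorDemo and its snapshot() helper are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class MonitorDemo {
    // Takes the write lock twice and the read lock once, then reports the lock's state.
    static String snapshot() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        rwl.writeLock().lock();
        rwl.writeLock().lock(); // re-entry
        rwl.readLock().lock();  // allowed while the same thread holds the write lock
        try {
            return "writeLocked=" + rwl.isWriteLocked()
                    + " writeHolds=" + rwl.getWriteHoldCount()  // re-entry count of the write lock
                    + " readLocks=" + rwl.getReadLockCount()    // read locks held across all threads
                    + " readHolds=" + rwl.getReadHoldCount();   // read locks held by this thread
        } finally {
            rwl.readLock().unlock();
            rwl.writeLock().unlock();
            rwl.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(snapshot());
    }
}
```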

Design of read-write state

The read lock (ReadLock) is a shared lock:

public void lock() {
    sync.acquireShared(1);
}

The write lock (WriteLock) is an exclusive lock:

public void lock() {
    sync.acquire(1);
}

However, an AQS object has only one state field. To represent two different states in it, the integer is split bitwise into two parts: the high 16 bits represent the read state and the low 16 bits represent the write state. For example, if a thread has acquired the write lock, re-entered it twice, and then acquired the read lock twice in succession, the low 16 bits hold 3 and the high 16 bits hold 2.

Assume that the current synchronization state value is c.

Get the write state (the write lock count, exclusiveCount): c & ((1 << 16) - 1), that is, c & 0x0000FFFF (clears the upper 16 bits)

static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

Get the read state (the read lock count, sharedCount): c >>> 16 (unsigned right shift by 16 bits)

static final int SHARED_SHIFT   = 16;
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

Write state plus one: c + 1

compareAndSetState(c, c + 1)

Read state plus one: c + (1 << 16)

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
compareAndSetState(c, c + SHARED_UNIT)

When c is not 0 but the write state (c & 0x0000FFFF) is 0, the read state (c >>> 16) must be greater than 0, which means the read lock is held.
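The bit arithmetic can be checked with a small standalone sketch. The constants and the two accessor methods mirror the snippets above; the class RwState and the method demoState() are hypothetical names, and the sketch works on a plain int rather than on a real AQS state.

```java
class RwState {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    // Builds the state for: write lock acquired once, re-entered twice, read lock acquired twice.
    static int demoState() {
        int c = 0;
        c += 1;           // write lock acquired
        c += 1;           // write lock re-entered
        c += 1;           // write lock re-entered again
        c += SHARED_UNIT; // read lock acquired
        c += SHARED_UNIT; // read lock acquired again
        return c;
    }

    public static void main(String[] args) {
        int c = demoState();
        System.out.println(String.format("0x%08x", c)); // prints 0x00020003
        System.out.println("write state: " + exclusiveCount(c)); // 3
        System.out.println("read state: " + sharedCount(c));     // 2
    }
}
```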

Acquisition and release of write lock

A write lock is an exclusive lock that supports reentry. If the current thread has acquired a write lock, the write state is increased. If the read lock has been acquired when the current thread acquires the write lock (the read state is not 0) or the thread is not a thread that has acquired the write lock, the current thread enters the waiting state.

Acquire

The acquisition of write lock is realized by tryAcquire():

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    //Write state (the re-entry count of the exclusive write lock)
    int w = exclusiveCount(c);
    //State != 0 means some thread holds the read lock or the write lock
    if (c != 0) {
        //State is not 0 but the write state is 0, so the read lock is held: fail. Also fail if another thread owns the write lock
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

In addition to the re-entry condition (the current thread is the thread that holds the write lock), this method also checks whether the read lock is held. If it is, the write lock cannot be acquired. The reason is that a read-write lock must make the write lock's operations visible to readers: if the write lock could be acquired while the read lock is held, other running reader threads could not perceive the current writer's changes. Therefore the write lock can be acquired only after all other reader threads have released the read lock. Once the write lock is held, all subsequent access by other reader and writer threads is blocked.

Release

The release process of write lock is basically similar to that of ReentrantLock. Each release reduces the write state. When the write state is 0, it indicates that the write lock has been released, so that the waiting read-write thread can continue to access the read-write lock. At the same time, the modification of the previous write thread is visible to subsequent read-write threads.

protected final boolean tryRelease(int releases) {
    //Only the thread that holds the write lock may release it
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //New state value after this release
    int nextc = getState() - releases;
    //The write lock is fully released once its re-entry count drops to 0
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        //No owner remains, so clear the owner thread
        setExclusiveOwnerThread(null);
    //Store the new state whether or not the lock was fully released
    setState(nextc);
    return free;
}

Acquisition and release of read lock

Acquire

The read lock is a shared lock that supports re-entry and can be held by several threads at the same time. When no other thread holds the write lock (the write state is 0), acquiring the read lock always succeeds, and all it does is increase the read state (in a thread-safe way). If the current thread already holds the read lock, the read state is increased again. If the write lock is held by another thread when the current thread tries to acquire the read lock, the current thread enters the waiting state.

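protected final int tryAcquireShared(int unused) {
    for (;;) {
        int c = getState();
        int nextc = c + (1 << 16);
        //Overflow of the read state
        if (nextc < c)
            throw new Error("Maximum lock count exceeded");
        //Fail if another thread holds the write lock
        if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != Thread.currentThread())
            return -1;
        //Retry the loop if another thread changed the state in the meantime
        if (compareAndSetState(c, nextc))
            return 1;
    }
}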

If another thread holds the write lock (exclusiveCount(c) != 0 and the owner is not the current thread), the current thread fails to acquire the read lock (the method returns -1) and enters the waiting state. If the current thread itself holds the write lock, or the write lock is not held, the current thread increases the read state (thread-safely, guaranteed by CAS) and acquires the read lock (the method returns 1).

Release

Each release of the read lock reduces the read state by (1 << 16). The release must be thread-safe, because several reader threads may release the read lock at the same time, which is why it uses CAS in a loop:

static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);

for (;;) {
    int c = getState();
    int nextc = c - SHARED_UNIT;
    if (compareAndSetState(c, nextc))
        return nextc == 0;
}

Lock downgrading

Lock downgrading means that a write lock is downgraded to a read lock. If the current thread holds the write lock, releases it, and only then acquires the read lock, this step-by-step process is not lock downgrading. Lock downgrading means holding the write lock, acquiring the read lock, and then releasing the previously held write lock.
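The sequence can be sketched as follows. The class DowngradeDemo and the method updateAndRead are hypothetical names; the point is only the order of the lock calls.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class DowngradeDemo {
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private int value; // guarded by rwl

    // Writes a new value, then keeps reading it under the read lock with no gap in between.
    int updateAndRead(int newValue) {
        rwl.writeLock().lock();
        try {
            value = newValue;
            // Acquire the read lock while the write lock is still held
            rwl.readLock().lock();
        } finally {
            // Releasing the write lock completes the downgrade; the read lock is still held
            rwl.writeLock().unlock();
        }
        try {
            // No other writer can have changed the value since the update
            return value;
        } finally {
            rwl.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(new DowngradeDemo().updateAndRead(42));
    }
}
```

Because the read lock is taken before the write lock is released, no other writer can slip in between the update and the read.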

ReentrantReadWriteLock summary

  • When a thread holds a read lock, it cannot acquire a write lock (because when acquiring a write lock, if it finds that the current read lock is occupied, it immediately fails to acquire it, regardless of whether the read lock is held by the current thread or not)
  • When a thread holds a write lock, it can continue to acquire a read lock (if a write lock is found to be occupied when acquiring a read lock, the acquisition will fail only if the write lock is not occupied by the current thread)
  • If a thread wants to hold a write lock and a read lock at the same time, it must first obtain a write lock and then obtain a read lock
  • A write lock can be "downgraded" to a read lock. A thread that holds the write lock has exclusive use of the read-write lock, so it can go on to acquire the read lock. Once it holds both, it can release the write lock and keep the read lock. In this way the write lock is "downgraded" to a read lock
  • A read lock cannot be "upgraded" to a write lock. When a thread holds the read lock, other threads may hold it too, so the thread cannot "upgrade" its read lock to a write lock
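The first two rules in this summary can be observed with tryLock(), which returns immediately instead of blocking. This is a minimal sketch with hypothetical names (UpgradeDowngradeDemo, readThenWrite, writeThenRead); note that calling writeLock().lock() while holding the read lock would block forever, which is why the sketch uses tryLock().

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class UpgradeDowngradeDemo {
    // Holds the read lock and then tries to take the write lock (an "upgrade").
    static boolean readThenWrite() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        rwl.readLock().lock();
        try {
            boolean acquired = rwl.writeLock().tryLock();
            if (acquired) {
                rwl.writeLock().unlock();
            }
            return acquired;
        } finally {
            rwl.readLock().unlock();
        }
    }

    // Holds the write lock and then tries to take the read lock (the first step of a downgrade).
    static boolean writeThenRead() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        rwl.writeLock().lock();
        try {
            boolean acquired = rwl.readLock().tryLock();
            if (acquired) {
                rwl.readLock().unlock();
            }
            return acquired;
        } finally {
            rwl.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("read then write: " + readThenWrite()); // false
        System.out.println("write then read: " + writeThenRead()); // true
    }
}
```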

LockSupport

LockSupport is a utility class that is mainly used to block and wake up threads.

It has two main methods: park() (block the current thread) and unpark() (wake up a given thread). Both delegate to the park and unpark methods of the Unsafe class.

Features

  1. unpark() may be called before park(): the permit is remembered, so the later park() returns immediately.

  2. unpark() can be called multiple times.

  3. LockSupport is not reentrant. synchronized tracks re-entry with a counter in the JVM's monitor implementation, and ReentrantLock tracks it by incrementing and decrementing the AQS state, but LockSupport has only a single "permit". The permit does not accumulate: however many times unpark() is called, only one permit is available, so once park() has consumed it, the next park() blocks until unpark() is called again.
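These three properties can be sketched in a few lines. PermitDemo and parkAfterUnpark are hypothetical names. The timed second park is included only to show that no permit is left; the LockSupport documentation allows park to return spuriously, so the measured wait is printed rather than relied upon.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class PermitDemo {
    // Calls unpark() twice before park(); returns once park() has come back.
    static boolean parkAfterUnpark() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self); // makes the single permit available in advance
        LockSupport.unpark(self); // the permit is already available, so nothing is added
        LockSupport.park();       // consumes the permit and returns without blocking
        return true;
    }

    public static void main(String[] args) {
        System.out.println("first park returned: " + parkAfterUnpark());
        long start = System.nanoTime();
        // No permit is left, so this call normally waits for the full timeout
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("second park waited about " + waitedMs + " ms");
    }
}
```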

The difference between park/unpark and wait/notify

  1. wait() and notify() are methods of Object, and the calling thread must hold the object's monitor lock before calling them. park() does not need any object's lock to block the thread, so LockSupport calls do not have to sit inside a synchronized block. Threads therefore do not need to share a synchronization object, which decouples them from each other.
  2. notify() wakes an arbitrary one of the waiting threads and cannot target a specific thread, whereas unpark() wakes exactly the thread passed to it.

Here is some example code. Thread A runs a piece of business logic and then calls wait() to block itself. The main thread calls notify() to wake thread A, and thread A then prints its result.

public class Main {
    public static void main(String[] args) throws InterruptedException {
        final Object lock = new Object();
        Thread A = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            try {
                lock.wait();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(sum);
        });
        A.start();
        Thread.sleep(1000);
        lock.notify();
    }
}

This code throws IllegalMonitorStateException, because wait(), notify() and notifyAll() can only be called inside a synchronized block on the same object:

public class Main {
    public static void main(String[] args) throws InterruptedException {
        final Object lock = new Object();
        Thread A = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            try {
                synchronized (lock) {
                    lock.wait();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(sum);
        });
        A.start();
        Thread.sleep(1000);
        synchronized (lock) {
            lock.notify();
        }
    }
}

Once the lock object is locked with synchronized, calling wait()/notify() no longer throws.

LockSupport achieves the same result:

import java.util.concurrent.locks.LockSupport;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread A = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            LockSupport.park();
            System.out.println(sum);
        });
        A.start();
        Thread.sleep(1000);
        LockSupport.unpark(A);
    }
}

In the wait()/notify() version, Thread.sleep() is called after thread A starts so that thread A has time to finish its work and enter the waiting state. Without it, the main thread is very likely to call notify() before thread A has finished the accumulation, that is, notify() runs before wait(), and thread A is then never awakened:

public class Main {
    public static void main(String[] args) {
        final Object lock = new Object();
        Thread A = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            try {
                synchronized (lock) {
                    lock.wait();
                    System.out.println("wait");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(sum);
        });
        A.start();
        //Thread.sleep(1000);
        synchronized (lock) {
            lock.notify();
            System.out.println("notify");
        }
    }
}


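LockSupport does not have this problem, because an unpark() issued before park() leaves a permit that the later park() consumes:

import java.util.concurrent.locks.LockSupport;

public class Main {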
    public static void main(String[] args){
        Thread A = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            LockSupport.park();
            System.out.println(sum);
        });
        A.start();
        //Thread.sleep(1000);
        LockSupport.unpark(A);
    }
}
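With wait()/notify(), the usual way to avoid a lost notification without sleeping is to record the event in a flag and wait in a loop that checks it. The following is a minimal sketch of that pattern; GuardedWait, awaitReady, markReady and runDemo are hypothetical names.

```java
class GuardedWait {
    private final Object lock = new Object();
    private boolean ready; // guarded by lock

    // Blocks until markReady() has been called, even if it was called earlier.
    void awaitReady() throws InterruptedException {
        synchronized (lock) {
            while (!ready) { // the loop also protects against spurious wakeups
                lock.wait();
            }
        }
    }

    void markReady() {
        synchronized (lock) {
            ready = true;     // the flag remembers the notification
            lock.notifyAll();
        }
    }

    // Same scenario as above: thread A sums 0..9, waits, and reports the sum once released.
    static int runDemo() throws InterruptedException {
        GuardedWait gate = new GuardedWait();
        int[] result = new int[1];
        Thread a = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            try {
                gate.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            result[0] = sum;
        });
        a.start();
        gate.markReady(); // may run before A starts waiting; the flag makes that harmless
        a.join();         // join() makes A's write to result[0] visible here
        return result[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // prints 45
    }
}
```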

Condition interface

Every Java object has a set of monitor methods (defined on java.lang.Object), mainly wait(), wait(long timeout), notify() and notifyAll(). Used together with the synchronized keyword, these methods implement the wait/notify pattern. The Condition interface provides similar monitor methods, which work together with a Lock to implement the same pattern.

Condition defines wait and notify methods. Before calling them, the current thread must hold the Lock associated with the Condition object. A Condition object is created by a Lock object (by calling its newCondition() method); in other words, a Condition depends on a Lock:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

ConditionObject

ConditionObject is an inner class of the synchronizer AbstractQueuedSynchronizer and implements the Condition interface. Each Condition object contains a queue (referred to below as the waiting queue), which is the key to the Condition object's wait/notify function.

AQS synchronization queue and Condition waiting queue

Each ConditionObject contains a waiting queue and holds a reference to its first node (firstWaiter) and its last node (lastWaiter). The waiting queue is a FIFO queue. Each node in the queue (AbstractQueuedSynchronizer.Node) contains a thread reference, and that thread is the one waiting on the ConditionObject. When a thread calls await(), it releases the lock, wraps itself in a node that joins the waiting queue, and enters the waiting state (addConditionWaiter()).

The ConditionObject holds references to the first and last nodes, so adding a node only requires pointing the nextWaiter of the original last node at the new node and then updating the last-node reference. This update does not need CAS, because the thread calling await() must already hold the lock, so the lock itself guarantees thread safety:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

In the monitor model of Object, an object has one synchronization queue and one waiting queue, whereas a Lock in the java.util.concurrent package (more precisely, its synchronizer) has one synchronization queue and multiple waiting queues, because newCondition() can be called on a Lock several times to obtain several Condition objects.

Wait

Calling the await() method (or any method whose name starts with await) of a ConditionObject makes the current thread join the waiting queue and release the lock, and the thread's state changes to waiting. When await() returns, the current thread is guaranteed to hold the lock associated with the ConditionObject again.
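A bounded queue is a common illustration of one lock with two waiting queues: producers wait on one Condition and consumers on another. The sketch below is a minimal version under that assumption; BoundedQueue, notFull, notEmpty and runDemo are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedQueue<T> {
    private final Deque<T> items = new ArrayDeque<>(); // guarded by lock
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    BoundedQueue(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting, reacquires before returning
            }
            items.addLast(item);
            notEmpty.signal();   // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal();    // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    // A producer puts 1..5 into a queue of capacity 2 while the caller takes and sums them.
    static int runDemo() throws InterruptedException {
        BoundedQueue<Integer> queue = new BoundedQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += queue.take();
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // prints 15
    }
}
```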

If you look at the await() method from the perspective of queues (synchronization queue and waiting queue), calling the await() method is equivalent to moving the first node of the synchronization queue (the node that has obtained the lock) to the waiting queue (adding it as the tail node of the waiting queue):

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //The current thread joins the waiting queue
    Node node = addConditionWaiter();
    //Release synchronization status (release lock)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

The thread calling await() already holds the lock, which means its node is the head of the synchronization queue. The method wraps the current thread in a new node and adds it to the waiting queue, then releases the synchronization state and wakes the successor node in the synchronization queue, after which the current thread enters the waiting state.

When a node in the waiting queue is awakened, its thread starts trying to acquire the synchronization state. If the waiting thread was interrupted rather than awakened by another thread calling signal(), an InterruptedException is thrown.

Signal

Calling the signal() method moves the node that has waited longest in the waiting queue (the first node) to the synchronization queue (as its tail node), so that its thread can be woken up to reacquire the lock:

public final void signal() {
    //The current thread must be the thread that has acquired the lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //Add the head node of the waiting queue to the synchronization queue (as the tail node of the synchronization queue)
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //Wake up the node that has just joined the synchronization queue from the waiting queue
        LockSupport.unpark(node.thread);
    return true;
}


The signalAll() method is equivalent to executing the signal() method once for each node in the waiting queue. The effect is to move all nodes in the waiting queue to the synchronization queue and wake up the threads of each node.
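The effect of signalAll() can be observed with ReentrantLock.getWaitQueueLength(Condition), which estimates the number of threads in a condition's waiting queue and must be called while holding the lock. The sketch below is illustrative only; SignalAllDemo and runDemo are hypothetical names, and the short polling sleep is just a simple way to wait until every worker is parked.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalAllDemo {
    // Parks `waiters` threads on one condition, releases them with signalAll(),
    // and returns how many of them woke up.
    static int runDemo(int waiters) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] go = new boolean[1]; // guarded by lock
        AtomicInteger woken = new AtomicInteger();

        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!go[0]) {
                        condition.awaitUninterruptibly(); // joins the waiting queue
                    }
                    woken.incrementAndGet();
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        boolean signalled = false;
        while (!signalled) {
            lock.lock();
            try {
                // Once every worker is in the waiting queue, move them all to the sync queue
                if (lock.getWaitQueueLength(condition) == waiters) {
                    go[0] = true;
                    condition.signalAll();
                    signalled = true;
                }
            } finally {
                lock.unlock();
            }
            if (!signalled) {
                Thread.sleep(10);
            }
        }
        for (Thread t : threads) {
            t.join();
        }
        return woken.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("threads woken: " + runDemo(3));
    }
}
```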

Keywords: Java Multithreading Concurrent Programming lock

Added by tylerdurden on Sat, 02 Oct 2021 03:20:15 +0300