1. AQS
1.1 overview
Its full name is AbstractQueuedSynchronizer. It is a framework for building blocking locks and related synchronizers. Its main characteristics:
- The state field represents the state of the resource, in either exclusive or shared mode. Subclasses define how this state is maintained and, through it, how the lock is acquired and released
- getState - read state
- setState - write state
- compareAndSetState - set state atomically with CAS
- Exclusive mode allows only one thread to access the resource, while shared mode allows multiple threads to access it
- It provides a FIFO-based wait queue, similar to the EntryList of a Monitor
- Condition variables implement the wait and wake-up mechanism. Multiple condition variables are supported; each is similar to the WaitSet of a Monitor, and they are what the Condition of ReentrantLock is built on
Subclasses choose which of the following methods to implement; by default AQS implements each of them by throwing UnsupportedOperationException
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
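The default behaviour of these methods is easy to check. The sketch below uses a hypothetical subclass, BareSync, that overrides nothing; calling acquire on it reaches the default tryAcquire and fails with UnsupportedOperationException.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class BareSyncDemo {
    // Hypothetical synchronizer that overrides none of the template methods.
    static class BareSync extends AbstractQueuedSynchronizer { }

    // Returns true if acquire() fails with UnsupportedOperationException.
    static boolean acquireIsUnsupported() {
        try {
            new BareSync().acquire(1); // acquire() calls the default tryAcquire
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("acquire unsupported: " + acquireIsUnsupported());
    }
}
```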
1.2 User-defined non-reentrant lock
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

class MyLock implements Lock {

    static class MySync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 -> 1 means the lock was free and now belongs to this thread
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            // state is volatile, so it is written last
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public ConditionObject newCondition() {
            return new ConditionObject();
        }
    }

    private final MySync sync = new MySync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
Test
MyLock myLock = new MyLock(); new Thread(() -> { myLock.lock(); log.info("lock...{}", LocalDateTime.now()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } myLock.unlock(); log.info("unlock...{}", LocalDateTime.now()); }).start(); new Thread(() -> { myLock.lock(); log.info("lock...{}", LocalDateTime.now()); myLock.unlock(); log.info("unlock...{}", LocalDateTime.now()); }).start();
Repeated locking test
MyLock myLock = new MyLock(); new Thread(() -> { myLock.lock(); log.info("lock...{}", LocalDateTime.now()); myLock.lock(); log.info("lock...{}", LocalDateTime.now()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } myLock.unlock(); log.info("unlock...{}", LocalDateTime.now()); }).start();
The lock is non-reentrant: the second lock() call blocks the thread, because the lock can be acquired again only after it has been released
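The same behaviour can be observed without blocking forever by using a non-blocking attempt. This is a minimal sketch, assuming a stripped-down synchronizer (the names SimpleSync and attempts are illustrative) with the same acquire and release rules as the lock above.

```java
import java.util.Arrays;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class NonReentrantDemo {
    // Illustrative synchronizer: state 0 = free, 1 = held.
    static class SimpleSync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // state is already 1, even if the caller is the owner
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean tryLock() {
            return tryAcquire(1);
        }
    }

    // Results of three consecutive tryLock calls made by one thread.
    static boolean[] attempts() {
        SimpleSync sync = new SimpleSync();
        boolean first = sync.tryLock();  // lock is free
        boolean second = sync.tryLock(); // same thread, lock already held
        sync.release(1);
        boolean third = sync.tryLock();  // free again after release
        sync.release(1);
        return new boolean[] { first, second, third };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(attempts()));
    }
}
```

The second attempt fails even though the caller already owns the lock, which is exactly why a blocking lock() on the same thread never returns.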
2. ReentrantLock
2.1 Non-fair lock
2.1.1 Lock and unlock process
Starting from the constructor: the default implementation is the non-fair lock (whichever thread wins the CAS gets the lock, regardless of arrival order)
NonfairSync inherits from Sync, and Sync inherits from AQS
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
- When there is no contention, the CAS of t1 succeeds and exclusiveOwnerThread is set to t1
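Both points can be checked through the public API of ReentrantLock. The class and method names in this sketch are illustrative; isFair, isHeldByCurrentThread and hasQueuedThreads are standard ReentrantLock methods.

```java
import java.util.concurrent.locks.ReentrantLock;

class DefaultFairnessDemo {
    // The no-argument constructor creates a non-fair lock.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // Passing true selects the fair implementation.
    static boolean explicitFairIsFair() {
        return new ReentrantLock(true).isFair();
    }

    // Without contention the caller becomes the owner and nothing is queued.
    static boolean uncontendedLockHasNoQueue() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            return lock.isHeldByCurrentThread() && !lock.hasQueuedThreads();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair() + " " + explicitFairIsFair()
                + " " + uncontendedLockHasNoQueue());
    }
}
```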
2.1.1.1 locking failure
- Now t2 arrives. Its CAS from 0 to 1 fails, so it enters the acquire method
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
- acquire calls tryAcquire again, which still fails at this point. It then calls addWaiter to create a Node and append it to the wait queue, and passes that Node to acquireQueued
- The Node queue is a doubly linked list, and the default waitStatus of each Node is 0. The head of the list is a placeholder Node that is not associated with any thread; it is called the dummy or sentinel node
- Enter the acquireQueued method
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //Get the precursor Node of the current newly created Node, which is now the sentinel final Node p = node.predecessor(); //If you are next to the head (second), try to acquire the lock again. Of course, the state is still 1 and fails if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //If it fails, first judge whether park is required. If park is required, execute parkAndCheckInterrupt if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
- The code runs an endless loop that keeps trying to acquire the lock; after a failed attempt the thread parks
- On the first failure, shouldParkAfterFailedAcquire changes the waitStatus of the predecessor node (here, head) to -1 and returns false. SIGNAL = -1 means that the node is responsible for waking up its successor
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //Node.SIGNAL = -1 if (ws == Node.SIGNAL) /* * The node has set the state to require release to signal, so it can park safely. */ return true; if (ws > 0) { /* * The precursor node was canceled. Skip the precursor and instruct to retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {//ws = 0 /* * waitStatus Must be 0 or PROPAGATE = -3. It means we need a signal, but don't park. The caller needs to retry to ensure that the lock cannot be acquired before park. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
- The loop runs again and the lock attempt fails again. This time shouldParkAfterFailedAcquire sees that the waitStatus of the predecessor is -1 and returns true, so parkAndCheckInterrupt is called and the thread blocks
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
- Now another thread tries to acquire the lock and fails. A new Node is created for it and it enters acquireQueued. Its predecessor is not head, so it skips the lock attempt and goes straight to shouldParkAfterFailedAcquire
- There it sets the waitStatus of its predecessor to -1 and then parks, just as the previous thread did
- Every further thread that arrives repeats the same steps
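The growing queue can be observed from outside with getQueueLength. In this sketch (class and method names are illustrative) the calling thread plays the role of t1 and two more threads queue up behind it.

```java
import java.util.concurrent.locks.ReentrantLock;

class WaitQueueDemo {
    // Returns how many threads were queued while the lock was held.
    static int queuedWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        Runnable lockAndUnlock = () -> {
            lock.lock();
            lock.unlock();
        };
        Thread t2 = new Thread(lockAndUnlock, "t2");
        Thread t3 = new Thread(lockAndUnlock, "t3");
        int queued;
        lock.lock(); // the calling thread plays the role of t1
        try {
            t2.start();
            t3.start();
            // Spin until both threads have been added to the wait queue.
            while (lock.getQueueLength() < 2) {
                Thread.yield();
            }
            queued = lock.getQueueLength();
        } finally {
            lock.unlock(); // wakes the first queued thread
        }
        try {
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return queued;
    }

    public static void main(String[] args) {
        System.out.println("queued threads: " + queuedWhileHeld());
    }
}
```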
2.1.1.2 Unlock and contention
- Now t1 releases the lock: unlock -> release -> tryRelease, which finally sets state to 0
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; //The head node has been created and is followed by a node if (h != null && h.waitStatus != 0) //Wake up its successor nodes unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) //Change to 0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; //Look forward from the end until you find the node node for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //Wake up the thread corresponding to the node node if (s != null) LockSupport.unpark(s.thread); }
- The thread of the first queued node is still blocked in parkAndCheckInterrupt. After being woken up, it loops again and tries to acquire the lock
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //After obtaining the lock, set the head node as the current node because the previous node has been executed setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
- If another thread that is not in the queue arrives while t2 is being woken up, it may grab the lock first; t2 then has to park again (this is where the non-fairness shows)
2.2 Reentrancy principle
See the comments in the code
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { //Sync from parent class return nonfairTryAcquire(acquires); } } //Methods in parent class Sync final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //When the same thread comes, it must not be able to obtain the lock. If the locked thread is the same as the current thread, it indicates that lock reentry has occurred, and then enter the following branches else if (current == getExclusiveOwnerThread()) { //state++ int nextc = c + acquires; if (nextc < 0) // Overflow (who enters so much) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } //The method of releasing the lock is also in the parent class Sync protected final boolean tryRelease(int releases) { //The release lock cannot be set to 0 directly because it can be re entered int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //If it has been reduced to 0, the lock will be released if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); //Only after releasing can it be unlocked return free; }
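The effect of counting state up and down can be seen with getHoldCount and isLocked. A minimal sketch with illustrative class and method names:

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Locks twice, unlocks twice, and records what the lock reports in between.
    static String trace() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock(); // reentry: state goes from 1 to 2
        int holdsAfterTwoLocks = lock.getHoldCount();
        lock.unlock(); // state back to 1, lock still held
        boolean lockedAfterOneUnlock = lock.isLocked();
        lock.unlock(); // state 0, lock released
        boolean lockedAfterTwoUnlocks = lock.isLocked();
        return holdsAfterTwoLocks + "," + lockedAfterOneUnlock + "," + lockedAfterTwoUnlocks;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints 2,true,false
    }
}
```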
2.3 Interruptible principle
ReentrantLock supports both modes: lock() is non-interruptible and lockInterruptibly() is interruptible
2.3.1 Non-interruptible mode
If the thread is interrupted while parked, Thread.interrupted() returns true and clears the interrupt flag; the thread only records this and keeps waiting
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; //After obtaining the lock, return to whether you have been interrupted return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //If you are interrupted, you will set the representation to true. This flag will only be used after you really get the lock interrupted = true; } } finally { if (failed) cancelAcquire(node); } } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //If it is interrupted, enter this method selfInterrupt(); } static void selfInterrupt() { Thread.currentThread().interrupt(); }
In this mode an interrupted thread stays in the AQS queue. It only learns that it was interrupted after it has acquired the lock, when selfInterrupt restores the interrupt flag
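A small experiment makes this visible. In the sketch below (names are illustrative) a thread waiting in lock() is interrupted; it stays queued, and only after it finally gets the lock does it see its interrupt flag set.

```java
import java.util.concurrent.locks.ReentrantLock;

class UninterruptibleLockDemo {
    // result[0]: waiter still queued after being interrupted
    // result[1]: interrupt flag set once the waiter holds the lock
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] result = new boolean[2];
        Thread waiter = new Thread(() -> {
            lock.lock(); // non-interruptible wait
            try {
                result[1] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        }, "waiter");
        lock.lock();
        try {
            waiter.start();
            // Spin until the waiter has been added to the AQS queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt();
            result[0] = lock.hasQueuedThread(waiter);
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("still queued: " + r[0] + ", flag after acquire: " + r[1]);
    }
}
```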
2.3.2 interruptible mode
When the lockInterruptibly method is called:
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) //If the lock is not obtained, enter this method doAcquireInterruptibly(arg); } private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //In the same place, but if it is interrupted, it will throw an exception directly, stop waiting in the AQS queue, and directly jump out of the dead loop throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
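For comparison, the same experiment with lockInterruptibly ends differently: the waiting thread leaves the queue with an InterruptedException while the lock is still held by someone else. A sketch with illustrative names:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleLockDemo {
    // Returns true if the waiter gave up with InterruptedException.
    static boolean waiterWasInterrupted() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                lock.unlock(); // not reached in this scenario
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        }, "waiter");
        lock.lock();
        try {
            waiter.start();
            // Spin until the waiter has been added to the AQS queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt();
            // The waiter terminates although the lock is never released to it.
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted while waiting: " + waiterWasInterrupted());
    }
}
```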
2.4 fair lock
With the non-fair lock, tryAcquire competes for the lock immediately without checking whether other threads are already waiting in the AQS queue. The fair lock checks the queue first
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire. Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // The lock is not held
        if (c == 0) {
            // Before taking the lock, check whether a predecessor is queued;
            // compete only if there is none
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // head != tail means that there are nodes in the queue
    return h != t &&
        // either there is no second node yet ...
        ((s = h.next) == null ||
         // ... or the second node belongs to a different thread
         s.thread != Thread.currentThread());
}
2.5 Condition variable ConditionObject
Each condition variable corresponds to its own waiting queue; the implementation class is ConditionObject
2.5.1 await
await first calls addConditionWaiter to create a new Node
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //Create a new Node Node node = addConditionWaiter(); //Why is it called fully? Because locks can be re entered, all locks need to be released int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //Node.CONDITION = -2 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else //Tail interpolation t.nextWaiter = node; lastWaiter = node; return node; }
- Initially t1 holds the lock and calls await. It enters addConditionWaiter, which creates a new Node with waitStatus -2 (CONDITION), associates it with t1, and appends it to the tail of the condition queue
- It then calls fullyRelease. It is called "fully" because the lock is reentrant, so every hold on the lock has to be released
final int fullyRelease(Node node) { boolean failed = true; try { //Get the total state int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //Wake up other nodes in the blocking queue unparkSuccessor(h); return true; } return false; }
- fullyRelease wakes up the next node in the AQS queue. If no other thread competes for the lock, t2 acquires it
2.5.2 signal
Now suppose t2 wants to wake up t1. signal moves the first node of the condition queue to the AQS wait queue. If the transfer fails, that node has been cancelled, and signal moves on to the next node. If the transfer succeeds, the node waits in the AQS queue like any other and competes for the lock once it is woken up; it is unparked immediately only when its predecessor is cancelled or cannot be set to SIGNAL
public final void signal() {
    // Check that the current thread holds the lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // Find the first node of the condition queue
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // Remove the first node from the condition queue
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    // Transfer the node to the lock queue; if that fails, continue with the next node
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled,
     * so there is nothing to transfer.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    // Set the waitStatus of the predecessor to -1
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // Unpark the thread so that it can resync
        LockSupport.unpark(node.thread);
    return true;
}
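Seen from the caller's side, the await and signal flow looks like the following sketch. The names are illustrative; hasWaiters is a standard ReentrantLock method that reports whether the condition queue is non-empty and must be called while holding the lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionDemo {
    // Returns true if the waiting thread was woken by signal and finished.
    static boolean waiterWokeUp() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] state = new boolean[2]; // [0] = ready flag, [1] = waiter finished
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!state[0]) {
                    ready.await(); // releases the lock fully and parks in the condition queue
                }
                state[1] = true;   // runs only after the lock has been re-acquired
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }, "t1");
        waiter.start();
        boolean signalled = false;
        while (!signalled) {
            lock.lock();
            try {
                if (lock.hasWaiters(ready)) {
                    state[0] = true;
                    ready.signal(); // moves the waiter's node to the lock queue
                    signalled = true;
                }
            } finally {
                lock.unlock();      // the transferred node can now acquire the lock
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return state[1];
    }

    public static void main(String[] args) {
        System.out.println("waiter woke up: " + waiterWokeUp());
    }
}
```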
3. Read write lock
3.1 use of ReentrantReadWriteLock
When reads far outnumber writes, a read-write lock lets reads run concurrently with each other and so improves performance. This is similar to select ... from ... lock in share mode in a database. A simple test class:
@Slf4j class DataContainer { private Object data; private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.ReadLock r = rw.readLock(); private final ReentrantReadWriteLock.WriteLock w = rw.writeLock(); @SneakyThrows public Object read() { log.info("Acquire read lock...{}", LocalDateTime.now()); r.lock(); try { log.info("read {}", LocalDateTime.now()); TimeUnit.SECONDS.sleep(1); return data; } finally { log.info("Release read lock...{}", LocalDateTime.now()); r.unlock(); } } @SneakyThrows public void write() { log.info("Get write lock...{}", LocalDateTime.now()); w.lock(); try { log.info("write in {}", LocalDateTime.now()); TimeUnit.SECONDS.sleep(1); } finally { log.info("Release write lock...{}", LocalDateTime.now()); w.unlock(); } } }
- Test read read
DataContainer dataContainer = new DataContainer(); new Thread(dataContainer::read, "t1").start(); new Thread(dataContainer::read, "t2").start();
Read operations do not block each other
- Test read-write
DataContainer dataContainer = new DataContainer(); new Thread(dataContainer::read, "t1").start(); new Thread(dataContainer::write, "t2").start();
It can be found that read and write operations are mutually exclusive
- Test write write
DataContainer dataContainer = new DataContainer(); new Thread(dataContainer::write, "t1").start(); new Thread(dataContainer::write, "t2").start();
Writing is also mutually exclusive
Notes:
- The read lock does not support condition variables
- Upgrading during reentry is not supported: acquiring the write lock while holding the read lock waits forever, because the write lock cannot be granted while any read lock is held, and other threads may hold read locks as well
- Downgrading is supported: a thread that holds the write lock can also acquire the read lock
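These three notes can be checked without risking a permanent wait by using tryLock for the upgrade attempt. A minimal sketch with illustrative class and method names:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteNotesDemo {
    // Trying to take the write lock while holding the read lock.
    static boolean upgradeSucceeds() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            boolean upgraded = rw.writeLock().tryLock(); // non-blocking attempt
            if (upgraded) {
                rw.writeLock().unlock();
            }
            return upgraded;
        } finally {
            rw.readLock().unlock();
        }
    }

    // Taking the read lock while holding the write lock, then dropping the write lock.
    static int readHoldsAfterDowngrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();
        rw.readLock().lock();    // allowed: same thread owns the write lock
        rw.writeLock().unlock(); // downgrade complete, only the read lock remains
        try {
            return rw.isWriteLocked() ? -1 : rw.getReadHoldCount();
        } finally {
            rw.readLock().unlock();
        }
    }

    // The read lock rejects newCondition().
    static boolean readConditionUnsupported() {
        try {
            new ReentrantReadWriteLock().readLock().newCondition();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(upgradeSucceeds() + " " + readHoldsAfterDowngrade()
                + " " + readConditionUnsupported());
    }
}
```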
3.2 principle of read-write lock
The read lock and the write lock share the same Sync synchronizer, so they also share the same wait queue and state
- t1 write, t2 read
- t1 acquires the write lock successfully. The process is the same as for ReentrantLock, except that the write lock count occupies the lower 16 bits of state and the read lock count the upper 16 bits
// lock() of the write lock
public void lock() {
    sync.acquire(1);
}

// The AQS method seen before
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// Overridden by ReentrantReadWriteLock
protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    // Write lock part of state
    int w = exclusiveCount(c);
    // Non-zero state: some thread holds a read lock or a write lock
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // w == 0 means a read lock is held; otherwise a write lock is held,
        // so check whether this thread is its owner (reentry)
        if (w == 0 || current != getExclusiveOwnerThread())
            // Read and write are mutually exclusive: fail
            return false;
        // The write count would exceed the maximum (16 bits, 65535): too many reentries
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire: write count + 1
        setState(c + acquires);
        return true;
    }
    // No lock is held. Check whether this writer should block; for the non-fair
    // lock writerShouldBlock always returns false, so it goes straight to the CAS
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // Set the current thread as the owner
    setExclusiveOwnerThread(current);
    return true;
}
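The split of state into two 16-bit counters used by exclusiveCount can be reproduced in a few lines. The sketch below re-declares the constants and helpers in a standalone class (RwStateBits is an illustrative name) with the same arithmetic, purely to show how one int carries both counts.

```java
class RwStateBits {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;          // adding this bumps the read count by 1
    static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;      // 65535
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // lower 16 bits

    // Read lock count: upper 16 bits.
    static int sharedCount(int c) {
        return c >>> SHARED_SHIFT;
    }

    // Write lock count: lower 16 bits.
    static int exclusiveCount(int c) {
        return c & EXCLUSIVE_MASK;
    }

    public static void main(String[] args) {
        int twoReaders = 2 * SHARED_UNIT; // two read holds, no writer
        int downgrade = SHARED_UNIT + 2;  // writer entered twice and also holds one read lock
        System.out.println(sharedCount(twoReaders) + " " + exclusiveCount(twoReaders));
        System.out.println(sharedCount(downgrade) + " " + exclusiveCount(downgrade));
    }
}
```

Because both counts live in one int, a single CAS on state is enough to update either of them atomically.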
- Now t2 arrives and calls lock() on the read lock, which enters sync.acquireShared(1) and first calls tryAcquireShared. Because the write lock is held by another thread, tryAcquireShared returns -1, indicating failure
- -1 means failure
- 0 means success, but successor nodes are not woken up (not used by the read-write lock)
- A positive number means success, and the value is how many successor nodes should also be woken up; the read-write lock returns 1
// lock() of the read lock
public void lock() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    // If the write lock is held ...
    if (exclusiveCount(c) != 0 &&
        // ... and the holder is another thread, fail. If the holder is this
        // thread, taking the read lock is allowed (downgrade)
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
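The return-value convention of tryAcquireShared is easier to see on a much smaller synchronizer. The sketch below is a hypothetical one-shot latch (LatchSync is an illustrative name, not part of ReentrantReadWriteLock): it returns -1 while closed and 1 once open, so a single releaseShared wakes every queued thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SharedLatchDemo {
    // Illustrative latch: state 0 = closed, 1 = open.
    static class LatchSync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() != 0 ? 1 : -1; // positive: success, successors may proceed too
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // true: AQS goes on to wake the queued threads
        }
    }

    // Returns {threads passed before opening, threads passed after opening}.
    static int[] passedBeforeAndAfterOpen() {
        LatchSync latch = new LatchSync();
        AtomicInteger passed = new AtomicInteger();
        Runnable waitThenCount = () -> {
            latch.acquireShared(1); // parks in the queue while the latch is closed
            passed.incrementAndGet();
        };
        Thread a = new Thread(waitThenCount, "a");
        Thread b = new Thread(waitThenCount, "b");
        a.start();
        b.start();
        // Spin until both threads are queued in shared mode.
        while (latch.getQueueLength() < 2) {
            Thread.yield();
        }
        int before = passed.get();
        latch.releaseShared(1); // one release, the wake-up propagates to both
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] { before, passed.get() };
    }

    public static void main(String[] args) {
        int[] r = passedBeforeAndAfterOpen();
        System.out.println(r[0] + " -> " + r[1]);
    }
}
```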
- If locking fails, doAcquireShared is entered. It first calls addWaiter to add a node; the difference is that the node is in Node.SHARED mode (read locks are shared) instead of Node.EXCLUSIVE mode (the write lock is exclusive). Note that t2 is still running at this point
// If locking fails, this method is entered
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor node
            final Node p = node.predecessor();
            // If this node is second in line (right behind head),
            // it may try for the lock again
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // Decide whether to park. The first pass sets the waitStatus of the
            // predecessor to -1 (it becomes responsible for waking this thread)
            // and returns false, so the loop runs once more
            if (shouldParkAfterFailedAcquire(p, node) &&
                // Reached on the second pass: park here
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
- t3 read, t4 write
- Starting from this state, suppose t3 requests the read lock and t4 requests the write lock while t1 still holds the lock. Both are queued behind t2
- Now t1 unlocks: unlock -> release -> tryRelease
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //Start wake-up process unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; //Determine whether to re-enter boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
- After the lock is released, unparkSuccessor starts the wake-up process. t2 resumes from parkAndCheckInterrupt, runs the for loop again, and acquires the read lock
- setHeadAndPropagate is then called, which makes the node of t2 the new head node
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
- setHeadAndPropagate also checks whether the next node is shared. If so, it calls doReleaseShared(), which changes the waitStatus of the head from -1 to 0 and wakes up the successor (the status is changed to 0 first so that other threads do not repeat the same wake-up). Now t2 and t3 are both running. After t3 resumes, it executes tryAcquireShared and the read count is incremented again
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
t3 then executes setHeadAndPropagate with the same logic: the node of t3 becomes the head node, and since t4 behind it is not shared, the wake-up stops there
- t2 unlock, t3 unlock
- t2 enters sync.releaseShared(1) and calls tryReleaseShared(1), which decrements the read count. The count is not yet zero, so nothing more happens until t3 unlocks as well and brings it down to 0
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //All can be released only when it is reduced to 0 doReleaseShared(); return true; } return false; }
- Once the count reaches 0, doReleaseShared is entered and wakes up the waiting writer t4
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //Avoid other thread operations if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //Wake up write lock unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
3.3 StampedLock
This class was added in JDK 8 to further optimize read performance. Its distinguishing feature is that read locks and write locks must be used together with a stamp
- Acquire / release read lock
long stamp = lock.readLock();
lock.unlockRead(stamp);
- Acquire / release write lock
long stamp = lock.writeLock();
lock.unlockWrite(stamp);
Optimistic read. StampedLock supports the tryOptimisticRead() method. After reading, the stamp must be validated. If validation passes, no write happened in the meantime and the data can be used safely. If validation fails, a read lock has to be acquired and the data read again to ensure consistency.
long stamp = lock.tryOptimisticRead();
// Validate the stamp
if (!lock.validate(stamp)) {
    // Lock upgrade
}
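The effect of validate can be checked directly: a stamp taken before a write is valid until the write lock has been acquired, and invalid afterwards. A minimal sketch, with illustrative class and method names:

```java
import java.util.concurrent.locks.StampedLock;

class OptimisticReadDemo {
    private final StampedLock lock = new StampedLock();
    private int data = 1;

    void write(int newData) {
        long stamp = lock.writeLock();
        try {
            data = newData;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    int read() {
        long stamp = lock.tryOptimisticRead();
        int snapshot = data; // read without holding any lock
        if (lock.validate(stamp)) {
            return snapshot; // no write happened in between
        }
        stamp = lock.readLock(); // fall back to a real read lock
        try {
            return data;
        } finally {
            lock.unlockRead(stamp);
        }
    }

    // Returns {stamp valid before a write, same stamp valid after the write}.
    static boolean[] validateAroundWrite() {
        OptimisticReadDemo demo = new OptimisticReadDemo();
        long stamp = demo.lock.tryOptimisticRead();
        boolean before = demo.lock.validate(stamp);
        demo.write(100);
        boolean after = demo.lock.validate(stamp);
        return new boolean[] { before, after };
    }

    public static void main(String[] args) {
        boolean[] r = validateAroundWrite();
        System.out.println("valid before write: " + r[0] + ", after write: " + r[1]);
    }
}
```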
Write a data container class with a read() method, which reads the data optimistically and falls back to the read lock, and a write() method, which updates the data under the write lock
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
class DataContainerStamped {
    private int data;
    private final StampedLock lock = new StampedLock();

    public DataContainerStamped(int data) {
        this.data = data;
    }

    @SneakyThrows
    public int read(int readTime) {
        long stamp = lock.tryOptimisticRead();
        log.info("optimistic read locking...{},{}", stamp, LocalDateTime.now());
        // Simulate a slow read
        TimeUnit.SECONDS.sleep(readTime);
        // Copy the field BEFORE validating; a value read after validate()
        // could already belong to a later write
        int current = data;
        if (lock.validate(stamp)) {
            log.info("read finish...{}, data:{},{}", stamp, current, LocalDateTime.now());
            return current;
        }
        // Lock upgrade - read lock
        log.info("updating to read lock... {},{}", stamp, LocalDateTime.now());
        // Acquire outside try so that finally only unlocks a stamp that was really obtained
        stamp = lock.readLock();
        try {
            log.info("read lock {},{}", stamp, LocalDateTime.now());
            TimeUnit.SECONDS.sleep(readTime);
            log.info("read finish...{}, data:{},{}", stamp, data, LocalDateTime.now());
            return data;
        } finally {
            log.info("read unlock {},{}", stamp, LocalDateTime.now());
            lock.unlockRead(stamp);
        }
    }

    @SneakyThrows
    public void write(int newData) {
        long stamp = lock.writeLock();
        log.info("write lock {},{}", stamp, LocalDateTime.now());
        try {
            TimeUnit.SECONDS.sleep(2);
            this.data = newData;
        } finally {
            log.info("write unlock {},{}", stamp, LocalDateTime.now());
            lock.unlockWrite(stamp);
        }
    }
}
Read and read (both optimistic reads pass validation, so neither thread takes a lock)
DataContainerStamped dataContainer = new DataContainerStamped(1); new Thread(() -> { dataContainer.read(1); }, "t1").start(); TimeUnit.MILLISECONDS.sleep(500); new Thread(() -> { dataContainer.read(0); }, "t2").start();
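Read and write (the write invalidates t1's stamp, so t1 upgrades to the read lock and waits until the write lock is released)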
DataContainerStamped dataContainer = new DataContainerStamped(1); new Thread(() -> { dataContainer.read(1); }, "t1").start(); TimeUnit.MILLISECONDS.sleep(500); new Thread(() -> { dataContainer.write(100); }, "t2").start();
Note: StampedLock does not support condition variables and is not reentrant
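Both limitations can be observed directly. The sketch below is only an illustration (the class and method names are invented here). It uses tryWriteLock() rather than writeLock() for the second acquisition, because a blocking re-acquire by the same thread would deadlock, and it asks the Lock view returned by asWriteLock() for a Condition, which the JDK documents as unsupported.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.StampedLock;

public class StampedLockLimitsSketch {

    // The owning thread tries to take the write lock a second time.
    // A reentrant lock would succeed; StampedLock returns 0 (failure).
    static long reacquireWhileHeld() {
        StampedLock lock = new StampedLock();
        long first = lock.writeLock();
        try {
            return lock.tryWriteLock();
        } finally {
            lock.unlockWrite(first);
        }
    }

    // The Lock views of a StampedLock reject newCondition().
    static boolean conditionSupported() {
        Lock view = new StampedLock().asWriteLock();
        try {
            view.newCondition();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(reacquireWhileHeld());  // 0
        System.out.println(conditionSupported());  // false
    }
}
```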
4,Semaphore
A Semaphore limits the maximum number of threads that can access a shared resource at the same time.
4.1 basic use
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
As the constructors show, the one without the boolean parameter creates a nonfair semaphore by default (whichever thread grabs a permit first gets it, regardless of arrival order). Passing true creates a fair semaphore (waiting threads are granted permits first come, first served)
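The choice made by each constructor can be checked with Semaphore.isFair(). This is a small sketch; the class and method names are invented for illustration.

```java
import java.util.concurrent.Semaphore;

public class SemaphoreFairnessSketch {

    // Single-argument constructor: nonfair by default.
    static boolean defaultIsFair() {
        return new Semaphore(3).isFair();
    }

    // Two-argument constructor with fair = true.
    static boolean explicitIsFair() {
        return new Semaphore(3, true).isFair();
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());  // false
        System.out.println(explicitIsFair()); // true
    }
}
```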
// 1. Create a semaphore with 3 permits
Semaphore semaphore = new Semaphore(3);
// 2. Start 10 threads at the same time
for (int i = 0; i < 10; i++) {
    new Thread(() -> {
        // 3. Acquire a permit
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // No permit was acquired, so do not fall through to release()
            return;
        }
        try {
            log.info("running...");
            TimeUnit.SECONDS.sleep(1);
            log.info("end...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 4. Release the permit
            semaphore.release();
        }
    }).start();
}
4.2 application
When studying the Flyweight pattern, we used wait/notify to implement a mock database connection pool
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Test {
    @SneakyThrows
    public static void main(String[] args) {
        MockPool mockPool = new MockPool(5);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                MockConnection connection = mockPool.getConnection();
                System.out.println("Get the connection:" + connection.getConnectionName());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Return connection:" + connection.getConnectionName());
                mockPool.free(connection);
            }).start();
        }
    }
}

final class MockPool {
    private final MockConnection[] connections;
    // 0 = free, 1 = in use
    private final AtomicIntegerArray state;
    private static final Integer DEFAULT_POOL_SIZE = 5;

    public MockPool() {
        connections = new MockConnection[DEFAULT_POOL_SIZE];
        state = new AtomicIntegerArray(DEFAULT_POOL_SIZE);
        initPool();
    }

    public MockPool(Integer poolSize) {
        connections = new MockConnection[poolSize];
        state = new AtomicIntegerArray(poolSize);
        initPool();
    }

    private void initPool() {
        for (int i = 0; i < connections.length; i++) {
            connections[i] = new MockConnection("connect-" + (i + 1));
        }
    }

    @SneakyThrows
    public MockConnection getConnection() {
        for (; ; ) {
            for (int i = 0; i < connections.length; i++) {
                // Check slot i (the original checked index 1 on every iteration)
                if (state.get(i) == 0) {
                    if (state.compareAndSet(i, 0, 1)) {
                        return connections[i];
                    }
                }
            }
            synchronized (this) {
                System.out.println("No connection available, waiting...");
                // Caveat: a notifyAll() issued between the scan above and this wait() is missed
                wait();
            }
        }
    }

    public void free(MockConnection connection) {
        for (int i = 0; i < connections.length; i++) {
            if (connections[i] == connection) {
                state.set(i, 0);
                synchronized (this) {
                    notifyAll();
                }
                return;
            }
        }
        System.out.println("The connection to be released does not exist");
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class MockConnection {
    private String connectionName;
}
wait/notify is a low-level mechanism that is hard for beginners to master and easy to get wrong, so we use Semaphore to improve the pool
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * @author People below two meters are mortals
 */
@Slf4j
public class Test {
    @SneakyThrows
    public static void main(String[] args) {
        MockPool mockPool = new MockPool(5);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                MockConnection connection = mockPool.getConnection();
                System.out.println("Get the connection:" + connection.getConnectionName());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Return connection:" + connection.getConnectionName());
                mockPool.free(connection);
            }).start();
        }
    }
}

final class MockPool {
    private final MockConnection[] connections;
    // 0 = free, 1 = in use
    private final AtomicIntegerArray state;
    private static final Integer DEFAULT_POOL_SIZE = 5;
    // One permit per connection
    private final Semaphore semaphore;

    public MockPool() {
        connections = new MockConnection[DEFAULT_POOL_SIZE];
        state = new AtomicIntegerArray(DEFAULT_POOL_SIZE);
        semaphore = new Semaphore(DEFAULT_POOL_SIZE);
        initPool();
    }

    public MockPool(Integer poolSize) {
        connections = new MockConnection[poolSize];
        state = new AtomicIntegerArray(poolSize);
        semaphore = new Semaphore(poolSize);
        initPool();
    }

    private void initPool() {
        for (int i = 0; i < connections.length; i++) {
            connections[i] = new MockConnection("connect-" + (i + 1));
        }
    }

    @SneakyThrows
    public MockConnection getConnection() {
        // Blocks until a permit, and therefore a connection, is available
        semaphore.acquire();
        for (int i = 0; i < connections.length; i++) {
            if (state.get(i) == 0) {
                if (state.compareAndSet(i, 0, 1)) {
                    return connections[i];
                }
            }
        }
        // Should not be reached, because a permit is only available while some connection is free
        return null;
    }

    public void free(MockConnection connection) {
        for (int i = 0; i < connections.length; i++) {
            if (connections[i] == connection) {
                // Mark the slot free first, then hand the permit back
                state.set(i, 0);
                semaphore.release();
                return;
            }
        }
        System.out.println("The connection to be released does not exist");
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class MockConnection {
    private String connectionName;
}
4.3 principle
4.3.1 lock and unlock process
Semaphore is a bit like a parking lot. The permits are the number of parking spaces. When a thread acquires a permit, it is like taking a parking space, and the lot's display of free spaces goes down by one. Initially permits (state) is 3, and five threads then try to acquire a permit
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

// The permits are in fact stored in the AQS state
Sync(int permits) {
    setState(permits);
}
- Suppose t1, t2 and t4 acquire a permit successfully while t3 and t5 fail; t3 and t5 then enter the AQS blocking queue and park:
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // A non-negative result means the permit was acquired.
    // The fourth attempt returns -1, so the thread enters the method below
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        // The first three acquires succeed; on the fourth, remaining becomes -1
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            // When remaining < 0 the CAS is skipped and -1 is returned directly
            return remaining;
    }
}

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Create a SHARED node and append it to the queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // If the predecessor is the head, try to acquire again
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // Set the predecessor's waitStatus to SIGNAL (-1)
            if (shouldParkAfterFailedAcquire(p, node) &&
                // Park the current thread
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
- Now t4 releases its permit
public void release() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        // In this walk-through: 0 + 1 = 1
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // Does the successor need to be woken up?
            if (ws == Node.SIGNAL) {
                // CAS to avoid interference from other threads
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Wake up the successor
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
- t3, the successor of the head node, is now woken up. It re-enters the for loop and acquires the permit, which takes state from 1 back to 0. It then executes setHeadAndPropagate, which may go on to wake the following shared node t5. That wake-up achieves nothing, because t3 has already taken the only permit: state is still 0 when t5 retries, so t5 parks again
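The permit accounting in this walk-through can be replayed on a single thread. The sketch below is a simplification and not how the five threads really run: it uses the non-blocking tryAcquire() in place of acquire(), so a failed attempt returns false instead of parking, and the class name and the t1..t5 labels in the comments are only illustrative.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

public class PermitAccountingSketch {

    // Records availablePermits() after each step of the walk-through.
    static int[] replay() {
        Semaphore semaphore = new Semaphore(3);
        int[] permits = new int[4];

        // t1, t2 and t4 each take a permit: 3 -> 0
        semaphore.tryAcquire();
        semaphore.tryAcquire();
        semaphore.tryAcquire();
        permits[0] = semaphore.availablePermits();

        // t3 finds no permit left; the count stays at 0
        boolean t3GotPermit = semaphore.tryAcquire();
        permits[1] = t3GotPermit ? -1 : semaphore.availablePermits();

        // t4 releases its permit: 0 -> 1
        semaphore.release();
        permits[2] = semaphore.availablePermits();

        // t3 retries and succeeds: 1 -> 0, so t5 would still have to wait
        semaphore.tryAcquire();
        permits[3] = semaphore.availablePermits();
        return permits;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(replay())); // [0, 0, 1, 0]
    }
}
```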
5,CountDownLatch
CountDownLatch is used for thread synchronization and cooperation: one or more threads wait until a set of other threads have finished counting down. The constructor argument initializes the count, await() waits for the count to reach zero, and countDown() decrements the count by one
CountDownLatch latch = new CountDownLatch(3); new Thread(() -> { log.info("begin..."); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.info("end...{}", latch.getCount()); }).start(); new Thread(() -> { log.info("begin..."); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.info("end...{}", latch.getCount()); }).start(); new Thread(() -> { log.info("begin..."); try { TimeUnit.MILLISECONDS.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.info("end...{}", latch.getCount()); }).start(); log.info("waiting..."); latch.await(); log.info("wait end...");
It is arguably the simplest tool in JUC. Its source code is shown below; after the analysis of the previous tools, it is easy to read and understand
public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
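Three consequences of tryAcquireShared and tryReleaseShared in the source above can be checked with a short sketch (the class and method names are invented for illustration): a timed await() gives up with false while the count is above zero, await() returns immediately once the count is zero, and an extra countDown() at zero is ignored because tryReleaseShared returns false when c == 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchBehaviourSketch {

    // Waits briefly on the latch; wraps the checked exception to keep callers simple.
    private static boolean awaitBriefly(CountDownLatch latch) {
        try {
            return latch.await(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Count is still 1 after one countDown(), so the timed wait times out.
    static boolean timedAwaitBeforeZero() {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown();
        return awaitBriefly(latch);
    }

    // Count has reached 0, so await returns true without waiting.
    static boolean awaitAfterZero() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        return awaitBriefly(latch);
    }

    // The second countDown() has no effect; the count never goes below 0.
    static long countAfterExtraCountDown() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown();
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(timedAwaitBeforeZero());     // false
        System.out.println(awaitAfterZero());           // true
        System.out.println(countAfterExtraCountDown()); // 0
    }
}
```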
The demo above behaves just like join, so what is the point of the latch? In real applications, threads usually come from a thread pool so that they can be reused. In a fixed-size pool the worker threads keep running and do not terminate when a task finishes, so a low-level API such as join cannot be used to wait for them. Here is a simple application that counts down with a thread pool
CountDownLatch latch = new CountDownLatch(3); ExecutorService service = Executors.newFixedThreadPool(4); service.submit(() -> { log.info("begin..."); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.info("end...{}", latch.getCount()); }); service.submit(() -> { log.info("begin..."); try { TimeUnit.MILLISECONDS.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.info("end...{}", latch.getCount()); }); service.submit(() -> { log.info("begin..."); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.info("end...{}", latch.getCount()); }); service.submit(() -> { try { log.info("waiting..."); latch.await(); log.info("wait end..."); } catch (InterruptedException e) { e.printStackTrace(); } });
6,CyclicBarrier
6.1. Problem raising
- A CountDownLatch can only be used once: after it is created, the main thread waits until the count drops to 0 and then continues. Now there is a new requirement: the tasks must be executed three times, and the main thread must wait each time. The usual approach is a for loop:
ExecutorService service = Executors.newFixedThreadPool(4); for (int i = 0; i < 3; i++) { CountDownLatch latch = new CountDownLatch(3); service.submit(() -> { log.info("begin..."); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.info("end...{}", latch.getCount()); }); service.submit(() -> { log.info("begin..."); try { TimeUnit.MILLISECONDS.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.info("end...{}", latch.getCount()); }); service.submit(() -> { log.info("begin..."); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.info("end...{}", latch.getCount()); }); try { latch.await(); log.info("finish,{}", latch.getCount()); } catch (InterruptedException e) { e.printStackTrace(); } }
However, a new latch has to be created in every iteration, because the latch offers no way to reset its count. Is there a better way to solve this problem?
6.2 basic use
CyclicBarrier: a reusable barrier used for thread cooperation, where threads wait for each other until a given number of them have arrived. The count is set at construction. Each thread calls await() when it reaches the point where synchronization is needed; once the number of waiting threads reaches the count, they all continue, and the barrier resets so that it can be used again
So we can use this tool to solve the problem above
// Execution will continue only when the number is 2 CyclicBarrier cb = new CyclicBarrier(2, () -> { System.out.println("task1 task2 end"); }); for (int i = 0; i < 3; i++) { int j = i; new Thread(() -> { System.out.println("thread " + j + "-1 start.." + new Date()); try { // When the number is insufficient, wait cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("thread " + j + "-1 Continue to run down..." + new Date()); }).start(); new Thread(() -> { System.out.println("thread " + j + "-2 start.." + new Date()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } try { // After 2 seconds, the number of threads is enough to continue running cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("thread " + j + "-2 Continue to run down..." + new Date()); }).start();