Tool for java Concurrent Programming sharing model - AQS + ReentrantLock

preface

This article discusses the AQS interface provided in JDK and one of its implementation classes, ReentrantLock. The article is based on the book "the art of Java Concurrent Programming" and the video of dark horse Dark horse multithreading Take notes.

1. AQS

1. Concept

  1. Overview: the full name is AbstractQueuedSynchronizer, which is the framework of blocking lock and related synchronizer tools

  2. characteristic:

    • The state attribute is used to represent the state of resources (exclusive mode and shared mode). Subclasses need to define how to maintain this state and control how to obtain and release locks

      ⅰ. getState - get state
      ⅱ. setState - set state
      ⅲ. compareAndSetState - cas mechanism sets state state
      ⅳ. Exclusive mode allows only one thread to access resources, while shared mode allows multiple threads to access resources

    1. FIFO based waiting queue is provided, which is similar to the EntryList of Monitor
    2. Condition variables are used to realize the waiting and wake-up mechanism. Multiple condition variables are supported, similar to the WaitSet of Monitor

Subclasses mainly implement such methods (UnsupportedOperationException is thrown by default)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively
//Get lock pose
// If lock acquisition fails
if (!tryAcquire(arg)) {
 // After joining the queue, you can choose to block the current thread aqs and use park unpark to resume running and prohibit running
}

//Release lock posture
// If the lock is released successfully
if (tryRelease(arg)) {
 // Let the blocked thread resume operation
}

2. Code

We first implement a non reentrant lock ourselves, and then look at the source code

@Slf4j
//Custom non reentrant lock
class MyLoack implements Lock{

    //Exclusive lock, synchronizer class
    class MySync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0, 1)){
                //The lock is added and the owner is set to the current thread
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            //Set that no thread occupies this lock. exclusiveOwnerThread is the current owner of exclusive mode synchronization
            setExclusiveOwnerThread(null);
            setState(0);    //Put the writing of volatile variable in the back, and make the previous setExclusiveOwnerThread visible to other threads
            return true;
        }

        @Override       //Do you hold an exclusive lock
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public Condition newCondition(){
            return new ConditionObject();
        }
    }

    private MySync sync = new MySync();

    @Override   //Lock
    public void lock() {
        sync.acquire(1);
    }

    @Override   //Lock, interruptible lock
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override   //Attempt to acquire lock
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override   //Attempt to acquire lock parameter: time
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override   //Unlock
    public void unlock() {
        //release wakes up the waiting thread
        sync.release(1);
    }

    @Override   //Set up a new container
    public Condition newCondition() {
        return sync.newCondition();
    }
}

Test:

  MyLoack lock = new MyLoack();
        new Thread(()->{
            lock.lock();
            try {
                log.debug("Lock successfully");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.debug("Unlocked successfully");
                lock.unlock();
            }
        }, "t1").start();

        new Thread(()->{
            lock.lock();
            try {
                log.debug("Locking succeeded");
            }finally {
                log.debug("Unlocked successfully");
                lock.unlock();
            }
        }, "t2").start();

Result output:



2. ReentrantLock principle


1. Implementation principle of unfair lock

From the perspective of constructors, ReentrantLock mode implements unfair locks

1. Locking process

1. Successful process

public void lock() {
    sync.lock();
  }
 final void lock() {
 	//Try changing state from 0 to 1
    if (compareAndSetState(0, 1))
    		//Successfully set the thread to be the current thread
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

2. Failure process

Based on the above, when another thread attempts to acquire a lock, compareAndSetState(0, 1) fails. At this time, enter the process of acquire(1)

public final void acquire(int arg) {
  //First, try acquire to acquire the lock again. If the acquisition fails, enter the acquirequeueueueueueueueued process
  if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }

In this method, first tryAcquire attempts to acquire the lock again. If the acquisition fails, enter the acquireQueued process. The process of this method is to create a node and add it to the waiting queue. The process is as follows:

Thread-1 executed

  1. CAS tried to change the state from 0 to 1 in the lock method, but failed

  2. In the lock method, call the acquire method further and enter the tryAcquire logic. Here, we think that the state is already 1, and the result still fails

  3. Next, enter the addWaiter logic of the acquire method to construct the Node queue

    • The yellow triangle in the figure indicates the waitStatus status of the Node, where 0 is the default normal status
    • Node creation is lazy
    • The first Node is called Dummy or sentinel. It is used to occupy a bit and is not associated with a thread



 private Node addWaiter(Node mode) {
 	//Create a node and associate the node with the current thread
     Node node = new Node(Thread.currentThread(), mode);
     //Get tail node
     Node pred = tail;
     //This step is similar to the implementation in enq
     if (pred != null) {
     	//Make a connection between the tail node and the current node. Note that the following is a two-way linked list
         node.prev = pred;
         //Connect node < - > tail
         if (compareAndSetTail(pred, node)) {
             pred.next = node;
             return node;
         }
     }
     //If the tail node is empty, it means that the queue has not been initialized. You need to initialize the head node and join the new node
     enq(node);
     return node;
 }



Next, the current thread enters the acquirequeueueueueueueued logic of the acquire method

  1. Acquirequeueueued will keep trying to obtain locks in an endless loop, and enter park blocking after failure
  2. If you are next to the head (in the second place), try to acquire the lock again. We set the state here as 1 and fail
  3. Enter the shouldParkAfterFailedAcquire logic, change the waitStatus of the precursor node, that is, head, to - 1, and return false this time- 1 indicates that this node has the responsibility to wake up its successor nodes

  1. shouldParkAfterFailedAcquire returns to acquirequeueueueued after execution. Try to acquire the lock again. Of course, the state is still 1 at this time and fails
  2. When entering shouldParkAfterFailedAcquire again, it returns true this time because the waitStatus of its precursor node is - 1
  3. Enter parkAndCheckInterrupt, Thread-1 park (gray indicates blocked)


final boolean acquireQueued(final Node node, int arg) {
	//Set to true
     boolean failed = true;
     try {
         boolean interrupted = false;
         for (;;) {
         	 //Gets the precursor node of the current node
             final Node p = node.predecessor();
             //Judge that if the precursor node is the head node (placeholder node), it will try to obtain the lock again
             if (p == head && tryAcquire(arg)) {
             	 //If the lock is obtained successfully
                 //If it is a header node, set the current node as the header, so the first node in the queue is
                 //The working node is the waiting node at the beginning of the second node. You can also see from the above figure
                 setHead(node);
                 p.next = null; //It is convenient for GC to recycle old header nodes
                 failed = false;
                 //Returns false, indicating that it cannot be interrupted
                 return interrupted;
             }
             //The lock cannot be obtained. The following is to determine whether the thread should be suspended or not
             if (shouldParkAfterFailedAcquire(p, node) &&
             	//The following is to suspend the current thread. At this time, you will find that not only will the thread be suspended, but also
             	//The interrupt flag bit is set to true. If wait, sleep and join methods are used
             	//Any one of them, throw the exception directly
                 parkAndCheckInterrupt())
                 //This method is blocked here. Wait until the previous thread wakes up before running down
                 interrupted = true;
         }
     } finally {
         if (failed)
         	//You can only enter here by throwing an exception
             cancelAcquire(node);
     }
 }


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	   //Gets the status of the precursor node
        int ws = pred.waitStatus;
        //If the status is - 1
        if (ws == Node.SIGNAL)
        	//This node has been set to require the release of the signal, so it can be safely suspended
            return true;
        if (ws > 0) {
        	//If it is greater than 0, the current node will be put first
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
           //Equal to 0 (equal to 0 at the beginning), the old set the status of the header node to - 1 
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }


 //In this method, the park method is called and the interrupted flag is set to true
  private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }



2. Unlocking process

public void unlock() {
        sync.release(1);
    }

1. Successfully unlocked

Now suppose that many threads have failed to compete, it will look like this

Thread-0 releases the lock and enters the tryrelease process. If successful

  • Setting exclusiveOwnerThread to null
  • state = 0
  • If the current queue header Node is not null and the waitstatus of the head = - 1, enter the unparksuccess process: unparksuccess will find the nearest Node in the queue to the head (not cancelled), and unpark will resume its operation, which is Thread-1 in this example
  • At this point, return to the acquirequeueueueueueueued method. The thread continues to run, enters the for loop again, and then locks
  public final boolean release(int arg) {
  //Let's not consider the details here
  //Until exclusive ownerthread = null is set and star is set to 0
    if (tryRelease(arg)) {
    	 //Get the first node
          Node h = head;
          //If the head is not null and the state of the head is 0
          if (h != null && h.waitStatus != 0)
          	  //Wake up the next node, and then continue to run in the acquirequeueueueued method to try to lock
              unparkSuccessor(h);
          return true;
      }
      return false;
  }

 private void unparkSuccessor(Node node) {
 
     	/**
     		Omit the previous code
     	*/
        if (s != null)
        	//Here, the unpark method of the thread is called to wake up
            LockSupport.unpark(s.thread);
    }

If the locking is successful (no other i thread competes with Thread-1), it will be set (in the acquirequeueueueueueueueued method)

  • Thread state = 1, thread state = 1
  • The head points to the Node where Thread-1 is just located, and the Node clears the Thread
  • The original head can be garbage collected because it is disconnected from the linked list



2. Failed to unlock

If there are other threads competing at this time (unfair embodiment), for example, Thread-4 comes at this time. If Thread-4 happens to take the lead

  • Thread-4 is set to exclusiveOwnerThread, state = 1
  • Thread-1 enters acquirequeueueueued again. The process of this method obtains it again, and then rewrites it to enter park blocking



2. Reentrant principle

static final class NonfairSync extends Sync {
    // ...
	//Get lock reentrant
    // Sync inherited method, easy to read, put here
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //Get status first
        int c = getState();
        //If it is 0, it means that no one has obtained the lock
        if (c == 0) {
        	//Then change from 0 to 1
            if (compareAndSetState(0, acquires)) {
            	//Then set exclusiveOwnerThread to the current thread
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // If the lock has been obtained and the thread is still the current thread, it indicates that lock reentry has occurred
        else if (current == getExclusiveOwnerThread()) {
            // state++
            //For example, the second time, c is 1. At this time, c + acquire means state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }	
	
	//Releasable lock
    // Sync inherited method, easy to read, put here
    protected final boolean tryRelease(int releases) {
        // state -- if it is reentrant, subtract one layer. For example, if it is 2 now, it is 2-1
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // Lock reentry is supported. It can be released successfully only when the state is reduced to 0
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        //If the lock is re entered, false will be returned for the first time, and true will not be returned until the number of lock re-entry is reduced to 0, indicating that the release is successful
        return free;
    }
}



3. Interruptible principle

1. Non interruptible mode

Non interruptible mode: in this mode, even if it is interrupted, it will still stay in the AQS queue and can not run until the lock is obtained (to continue running, interruption means that the interrupt flag is set to true, which has no impact on the operation of the thread)

Let's look at the following code: try to acquire the lock in acquire, and then enter the acquirequeueueueueued method. These are all mentioned above. Let's look at the acquirequeueueueueueueueued method. When the lock is not obtained, it will be park. When the previous thread completes execution, it will call the unpark method to wake up the thread. Then the thread will execute interrupted = true downward, but after executing the statement, it will continue the for loop until the lock is obtained, At this point, you can enter if (P = = head & tryAcquire (ARG)) statement to return interrupted = true to acquire, and to acquire, call selfInterrupt() to continue to taste a break mark, because the parkAndCheckInterrupt in this method is parkAndCheckInterrupt. Interrupted clears the break flag

// Sync inherited from AQS
static final class NonfairSync extends Sync {
    // ...

    private final boolean parkAndCheckInterrupt() {
        // If the break flag is already true, the park will be invalidated
        LockSupport.park(this);
        // interrupted will clear the break mark so that you can park next time. Otherwise, if there is a break mark, you can't park
        return Thread.interrupted();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    // You still need to obtain the lock before you can return to the broken state
                    return interrupted;
                }
                if (
                        shouldParkAfterFailedAcquire(p, node) &&
                                parkAndCheckInterrupt()
                ) {
                    // If it is because interrupt is awakened, the interrupt status returned is true
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
	
    public final void acquire(int arg) {
        if (	
            
            	//Here you can return to enter if to prove that the above method returns true
                !tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            // If the interrupt status is true
            selfInterrupt();
        }
    }

    static void selfInterrupt() {
        // Re generate an interrupt. If the thread is running normally, the interrupt method will not report an error if it is not in sleep or other states
        Thread.currentThread().interrupt();
    }
}



2. Interruptible mode

We call the acquireinterruptible method instead of the acquire method to acquire the lock. In this method, if the doacquireinterruptible fails to obtain the lock, it will still enter the park. At this time, it will throw an exception directly after being awakened by other threads. Therefore, if you are interrupted while waiting in the queue, you will throw an exception.

static final class NonfairSync extends Sync {
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // If the lock is not obtained, enter one
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    // An interruptible lock acquisition process
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt()) {
                    // In the process of park, if it is interrupt ed, it will enter this field
                    // At this time, an exception is thrown instead of entering for (;) again
                    
                   //Therefore, if you are interrupted while waiting in the queue, you will throw an exception directly
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}



4. Fair lock

1. Compare the following unfair locks

The code is the re-entry lock. You can see that the non fair lock will not check the AQS queue, but directly compare and setstate.

 // Three inherited methods are convenient to read and put here
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // If the lock has not been obtained, this is the first time to enter
        if (c == 0) {
            // Try to obtain with cas, which reflects the unfairness: do not check the AQS queue
            if (compareAndSetState(0, acquires)) {
            	//Set exclusiveOwnerThread to the current thread
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // If the lock has been obtained and the thread is still the current thread, it indicates that a lock re-entry lock has occurred
        else if (current == getExclusiveOwnerThread()) {
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Failed to get. Return to the calling function
        return false;
    }



2. Fair lock

In the tryAcquire method, it is judged that if there is no precursor node in the AQS queue, the current thread is allowed to go to CAS to obtain the lock

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {
        acquire(1);
    }

    // The method inherited from AQS is easy to read and placed here
    public final void acquire(int arg) {
        if (
                !tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }
    // The main difference from non fair lock is the implementation of tryAcquire method
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // First check whether there are precursor nodes in the AQS queue. If not, compete
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //The following paragraph is also the principle of reentry, which is described above
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // A method inherited from AQS, which is easy to read and put here
    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        // h !=  When t (the head is not equal to the tail), it indicates that there are nodes in the queue. It is proved that there are nodes. If your thread is not in the second (with the highest weight), the eldest is occupied.
        return h != t &&
                (
                        //Indicates whether there is a dick in the queue
                        (s = h.next) == null || 
                        		//Or the current thread is not the second thread
                                s.thread != Thread.currentThread()
                );
    }
}



5. Realization principle of conditional variable

Each condition variable actually corresponds to a waiting queue, and its implementation class is ConditionObject


1. await process

Start Thread-0 to hold the lock, call await, enter the addConditionWaiter process of ConditionObject (single linked list), create a new Node with the status of - 2 (Node.CONDITION), associate Thread-0, and add it to the tail of the waiting queue

Next, enter the fully release process of AQS, release the lock on the synchronizer and wake up the successor nodes of the current node

Then thread-0 unpark the next node in AQS queue, competing for lock. Assuming there are no other competing threads, then Thread-1 competes successfully

park blocking Thread-0


 public final void await() throws InterruptedException {
     //If the thread has been interrupted, an exception is thrown
     if (Thread.interrupted())
         throw new InterruptedException();
     //Add the node to the ConditionObject queue
     Node node = addConditionWaiter();
     //fullyRelease: release all the locks on this thread. fullyRelease allows for the release of reentry locks
     int savedState = fullyRelease(node);
     int interruptMode = 0;
     //Judge if the above node is 2 and is the first
     while (!isOnSyncQueue(node)) {
     	 //The thread is blocked here
         LockSupport.park(this);
         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
             break;
     }
     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
         interruptMode = REINTERRUPT;
     if (node.nextWaiter != null) // clean up if cancelled
         unlinkCancelledWaiters();
     if (interruptMode != 0)
         reportInterruptAfterWait(interruptMode);
 }

//Implementation of single linked list
 private Node addConditionWaiter() {
      Node t = lastWaiter;
      if (t != null && t.waitStatus != Node.CONDITION) {
          unlinkCancelledWaiters();
          t = lastWaiter;
      }
      //Create a node with the status set to 2
      Node node = new Node(Thread.currentThread(), Node.CONDITION);
      //Connect to single linked list
      if (t == null)
          firstWaiter = node;
      else
          t.nextWaiter = node;
      lastWaiter = node;
      return node;
  }

//Release all locks
final int fullyRelease(Node node) {
     boolean failed = true;
     try {
         //Here is the status obtained and the number of reentries
         int savedState = getState();
         //Release the lock according to the number of reentries. In the process of failed, the successor nodes of this node will be awakened
         if (release(savedState)) {
             failed = false;
             return savedState;
         } else {
             throw new IllegalMonitorStateException();
         }
     } finally {
         if (failed)
             node.waitStatus = Node.CANCELLED;
     }
 }



2. signal process

Suppose Thread-1 wants to wake up Thread-0

Enter the doSignal process of ConditionObject and obtain the first Node in the waiting queue, that is, the Node where Thread-0 is located

Execute the transferForSignal process, add the Node to the end of the AQS queue, change the waitStatus of Thread-0 to 0 and the waitStatus of Thread-3 to - 1.

Thread-1 releases the lock and enters the unlock process


public final void signal() {
	//First, judge whether the thread calling signal is the lock holder. Only the owner thread is qualified to wake up
   if (!isHeldExclusively())
        //No, just throw the exception
        throw new IllegalMonitorStateException();
   //Find the element of team leader
    Node first = firstWaiter;
    //Call doSignal
    if (first != null)
        doSignal(first);
}


 private void doSignal(Node first) {
     do {
          //Find the next element, then null, and set lastWaiter to null, indicating that there is no node
          //firstWaiter= first.nextWaiter: assign the next node as the first node
          if ( (firstWaiter = first.nextWaiter) == null)
              lastWaiter = null;
          first.nextWaiter = null;
          //transferForSignal: transfer the above node to the waiting queue
          //If the transfer fails, we will see if there are more nodes. If there are, we will recycle
          //Sometimes when the element in the waiting queue is interrupted, it will be cancelled, and false will be returned
      } while (!transferForSignal(first) &&
               (first = firstWaiter) != null);
  }

 final boolean transferForSignal(Node node) {
        //Set state = 0. Why 0? Because the last element added to the AQS competition queue is usually 0
        //Other elements will be changed to - 1
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
		//enq method is to add a node to the AQS queue and return the precursor node of the added node, that is
		//Original tail node
        Node p = enq(node);
        //Gets the status of the precursor node
        int ws = p.waitStatus;
        //Then try to change the state of the precursor node to - 1, indicating that the precursor node is obliged to wake up the newly added tail node
        //Then wake up the node we just joined. In the above example
        //p = Thread-3, joined node - Thread-0
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

Keywords: Java Back-end Multithreading

Added by IronWarrior on Sun, 06 Feb 2022 23:44:45 +0200