Easy to understand AQS source code

A step-by-step walkthrough of the AQS source code

1, Overview

This article uses ReentrantLock as an example to walk you through the AQS source code. Below is a small fair-lock example that you can run to get a feel for it. After that, we will read the source code piece by piece; if you follow along carefully, you will find it is not that difficult. The listings appear to match the JDK 8 sources; later JDK versions organize these methods differently.

import java.util.concurrent.locks.ReentrantLock;

/**
 * @author VernHe
 * @date 2021 December 2
 */
public class TestAQS {
    /**
     * true creates a fair lock; false creates a non-fair lock
     */
    static ReentrantLock reentrantLock = new ReentrantLock(true);

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(new Target(i)).start();
        }
    }

    static class Target implements Runnable {
        // Number of each task
        private int num;

        public Target(int num) {
            this.num = num;
        }

        @Override
        public void run() {
            for (int i = 0; i < 2; i++) {
                reentrantLock.lock();
                try {
                    System.out.println("Task " + this.num + " executed");
                } finally {
                    // Call unlock() in finally so the lock is released even if an exception is thrown
                    reentrantLock.unlock();
                }
            }
        }
    }
}

2, Source code walkthrough

ReentrantLock

Hold Ctrl and left-click the lock() method to jump to its definition

public void lock() {
	sync.lock();
}

Here you can see that lock() simply delegates to sync.lock(). Hold Ctrl and click sync, and you will find that its type, Sync, is an abstract static nested class

public class ReentrantLock implements Lock, java.io.Serializable {


    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
    	//Omitted here
    }
}

According to the English comment, ReentrantLock implements its fair and non-fair locks on top of the Sync class, and the state field in AQS represents the number of holds on the lock. AQS is simply short for AbstractQueuedSynchronizer. Continue by clicking the lock() method and selecting FairSync, and you will see the following source code

static final class FairSync extends Sync {

	final void lock() {
		acquire(1);
	}
	// ... omitted
}
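Before going further, the "number of holds" from the comment can be observed from the outside. The following is only a small sketch: the class name HoldCountDemo and the helper holdCounts() are made up for illustration, while getHoldCount() is a public method of ReentrantLock that reports how many times the current thread has locked without unlocking.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {

    // Hypothetical helper: records the hold count at four points in time.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true);
        int[] counts = new int[4];

        counts[0] = lock.getHoldCount(); // not locked yet
        lock.lock();
        lock.lock();                     // reentrant: the same thread locks again
        counts[1] = lock.getHoldCount();
        lock.unlock();
        counts[2] = lock.getHoldCount(); // still held once
        lock.unlock();
        counts[3] = lock.getHoldCount(); // fully released
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [0, 2, 1, 0]
    }
}
```

The lock only becomes available to other threads once the count is back at 0, which is why every lock() needs a matching unlock().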

Step into acquire(1)

public final void acquire(int arg) {
	if (!tryAcquire(arg) &&
		acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

The if condition calls three methods:

  • tryAcquire()

    As the name implies, it makes a single attempt to acquire the lock. The logic differs slightly between the fair and non-fair locks; the fair version is shown here

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // Read the current value of state
        int c = getState();
        if (c == 0) { // state == 0 means no thread currently holds the lock
            if (!hasQueuedPredecessors() && // Fair lock only: give way if another thread is queued ahead of us; the non-fair version skips this check
                compareAndSetState(0, acquires)) { // CAS state from 0 to acquires to take the lock
                setExclusiveOwnerThread(current); // Record the current thread as the exclusive owner
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { // The lock is held, and the holder is the current thread (reentrant acquire)
            int nextc = c + acquires; // Each reentrant lock() increases state by 1
            if (nextc < 0) // The int counter overflowed
                throw new Error("Maximum lock count exceeded");
            setState(nextc); // A plain write is enough: only the owner thread can reach this branch
            return true;
        }
        return false;
    }
    

    To sum up, this method is simple. It returns true in only two cases: either the lock is free, no thread is queued ahead of the current one (the fair-lock check), and the CAS on state succeeds; or the lock is already held by the current thread, in which case the hold count is increased. Put simply, it makes one attempt to take the lock and never blocks

  • addWaiter()

    Put bluntly, once the attempt above has failed, the current thread is added to the wait queue

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode); // Create a node for the current thread in the given mode (exclusive here)
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // The node currently at the end of the wait queue
        if (pred != null) { // The queue has already been initialized
            node.prev = pred;
            if (compareAndSetTail(pred, node)) { // CAS the tail from pred to the new node
                pred.next = node; // Link in behind the previous tail
                return node; // Enqueued: return the node for the current thread
            }
        }
        enq(node); // The queue is not initialized yet, or the CAS lost a race: enq() retries in a loop until the node is enqueued
        return node; // Enqueued: return the node for the current thread
    }
    

    To sum up in plain words: get in line

  • acquireQueued()

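    This method runs a loop. Whenever the node's predecessor is the head, it retries tryAcquire(); if the node is not first in line or the attempt fails, the current thread is parked (blocked) until it is woken up and goes around the loop again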

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; // Whether the acquire ended without getting the lock
        try {
            boolean interrupted = false; // Whether the thread was interrupted while waiting
            for (;;) { // Loop until the lock is acquired
                final Node p = node.predecessor(); // The previous node
                if (p == head && tryAcquire(arg)) { // The predecessor is head, so this node is first in line, and the acquire succeeded
                    setHead(node); // The current node becomes the new head
                    p.next = null; // help GC
                    failed = false;
                    return interrupted; // Leave the loop and report whether an interrupt occurred
                }
                if (shouldParkAfterFailedAcquire(p, node) && // Check whether the thread should block
                    parkAndCheckInterrupt()) // Block the current thread (a debugger will stop at this step)
                    interrupted = true; // Woken up with the interrupt flag set: remember it, then go around the loop again
            }
        } finally {
            if (failed)
                cancelAcquire(node); // Reached only if the loop exits abnormally (for example, an exception): cancel this node so it no longer waits in the queue
        }
    }
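One consequence of the interrupted flag is easy to miss: lock() does not respond to interrupts while the thread is queued. The interrupt is only remembered, and acquire() restores it through selfInterrupt() once the lock has been obtained. Here is a minimal sketch of that behavior; the class and method names are invented for this example, and hasQueuedThread() is used only to wait until the waiter is really in the queue.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptWhileQueuedDemo {

    // Hypothetical helper: returns the waiter's interrupt status as seen right after lock() returns.
    static boolean interruptStatusAfterLock() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock(true);
        boolean[] observed = new boolean[1];

        Thread waiter = new Thread(() -> {
            lock.lock(); // not interruptible: the thread stays queued until the lock is free
            try {
                observed[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });

        lock.lock(); // the calling thread holds the lock, so the waiter has to queue
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.sleep(1); // wait until the waiter's node is in the queue
            }
            waiter.interrupt(); // lock() neither throws nor returns early because of this
        } finally {
            lock.unlock(); // now the waiter can acquire the lock
        }

        waiter.join(); // join() makes the write to observed[0] visible here
        return observed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupted after lock(): " + interruptStatusAfterLock()); // prints true
    }
}
```

If the waiting thread should give up when interrupted, ReentrantLock offers lockInterruptibly() for that purpose.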
    

AbstractQueuedSynchronizer

Components

1. The wait queue (CLH queue), which is essentially a doubly linked list

2. The int state variable, which holds the synchronization state

3. The head and tail pointers, which reference the first and last nodes of the wait queue

4. The nested class Node, which links to its predecessor and successor through the prev and next pointers

5. Methods that operate on the queue, plus a series of CAS methods backed by native code
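The wait queue itself is private to AQS, but ReentrantLock exposes a few monitoring methods that let us watch it from the outside. The sketch below is an illustration under assumptions rather than anything taken from the JDK sources: the class name and helper are made up, and the threads are queued one at a time so that the queue order is known. With a fair lock they should then acquire the lock in exactly that order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class FairQueueOrderDemo {

    // Hypothetical helper: queues `waiters` threads in order and returns the order in which they got the lock.
    static List<Integer> acquisitionOrder(int waiters) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock(true); // fair lock
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();

        lock.lock(); // hold the lock so every new thread has to queue
        try {
            for (int i = 0; i < waiters; i++) {
                final int id = i;
                Thread t = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock();
                    }
                });
                threads.add(t);
                t.start();
                while (!lock.hasQueuedThread(t)) {
                    Thread.sleep(1); // start the next thread only after this one is queued
                }
            }
            // getQueueLength() is documented as an estimate, meant for monitoring only
            System.out.println("queued threads: " + lock.getQueueLength());
        } finally {
            lock.unlock(); // wakes the first queued thread; each unlock() wakes the next one
        }

        for (Thread t : threads) {
            t.join();
        }
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(acquisitionOrder(3)); // prints [0, 1, 2]
    }
}
```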

3, Summary

In my view, AQS factors out what many synchronization strategies have in common. Semaphores, mutexes, and other kinds of locks all have situations in which some threads must wait, so AQS provides a wait queue (the CLH queue) that holds blocked threads until they are woken up. Different strategies allow different numbers of threads to proceed at the same time, so AQS provides a state variable whose meaning each synchronizer defines for itself. On top of that sit the CAS methods that update state and the queue; underneath, they rely on atomic operations implemented in native code.
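To make the division of labor concrete, here is a minimal sketch of a non-reentrant mutex built on AQS, loosely following the idea of the example in the AbstractQueuedSynchronizer class documentation. SimpleMutex and countWithMutex() are names chosen for this illustration. The subclass only defines what state means (0 is free, 1 is held) by overriding tryAcquire() and tryRelease(); the queuing, parking, and waking all come from acquire() and release() in AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) { // CAS state from 0 (free) to 1 (held)
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // not reentrant: the owner calling lock() again would wait forever
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write that publishes the release
            return true; // true tells AQS to wake the next queued thread
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }   // tryAcquire(), then queue and park on failure
    void unlock() { sync.release(1); } // tryRelease(), then unpark the successor
    boolean isLocked() { return sync.isLocked(); }

    // Hypothetical usage: several threads increment a plain int under the mutex.
    static int countWithMutex(int threads, int perThread) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithMutex(4, 10_000)); // prints 40000
    }
}
```

A semaphore-style synchronizer would follow the same pattern, except that it would override the shared-mode hooks tryAcquireShared() and tryReleaseShared() and treat state as a number of permits.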

Keywords: Java cas JUC aqs

Added by gabriel kent on Sat, 04 Dec 2021 02:19:20 +0200