Proficient in concurrent programming from semare to Semaphore

What is Semaphore?

  • Semaphore, commonly known as semaphore, is the implementation of the primitive of PV operation in the operating system in java.
  • Implementation based on AbstractQueuedSynchronizer!
  • Semaphore is very powerful. Semaphores with a size of 1 are similar to mutually exclusive locks, which can be achieved by only one thread acquiring semaphores at the same time.
  • The semaphore with size n (n > 0) can realize the function of current limiting. It can realize that only n threads can obtain semaphores at the same time.

What is PV operation?

  • PV operation is an effective method to realize process mutual exclusion and synchronization in operating system.
  • PV operation is related to the processing of semaphore (S). P means passing and V means releasing.
  • When using PV operation to manage shared resources, we must first ensure the correctness of the execution of PV operation itself.

Main actions of P operation:

  • S minus 1;
  • If S minus 1 is still greater than or equal to 0, the process continues to execute;
  • If S minus 1 is less than 0, the process is blocked and placed in the waiting queue waiting for the semaphore, and then transferred to process scheduling.

Main actions of V operation:

  • S plus 1;
  • If the added result is greater than 0, the process continues to execute;
  • If the added result is less than or equal to 0, release a waiting process from the waiting queue of the signal, and then return to the original process to continue execution or transfer to process scheduling.

Usage scenario of Semaphore

  • It can be used for flow control, especially in application scenarios with limited public resources!

How Semaphore is used

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SemaphoneTest1 {

	/**
	 * Implement a current limiter that can only process three requests at the same time
	 */
	private static Semaphore semaphore = new Semaphore(3);
	

	/**
	 * Define a thread pool
	 */
	private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS,
			new LinkedBlockingDeque<>(200));

	/**
	 * Simulation execution method
	 */
	public static void exec() {
		try {
			semaphore.acquire(1);
			// Simulate real method execution
			System.out.println("implement exec method" + System.currentTimeMillis());
			Thread.sleep(2000);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			semaphore.release(1);
		}
	}

	public static void main(String[] args) throws InterruptedException {
		{
			for (;;) {
				Thread.sleep(100);
				// The simulation request is at a speed of 10 / s
				executor.execute(() -> exec());
			}
		}
	}
}
  • results of enforcement

Construction method of Semaphore

/**
 * Default to use unfair methods
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

/**
 * permits Indicates the number of licenses (number of resources)
 * fair Indicates fairness. If this is set to true, the next thread to execute will be the thread that waits the longest
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Source code analysis of Semaphore construction method

/**
 * Unfair lock: the implementation of the parent class (AQS) is called directly
 */
NonfairSync(int permits) {
    super(permits);
}

/**
 * Fair lock: the implementation of the parent class (AQS) is called directly
 */
FairSync(int permits) {
    super(permits);
}

/**
 * Call setState(permits); Method to set the number of resources
 */
Sync(int permits) {
    setState(permits);
}

/**
 * Assign a value to the state of AQS 
 */
protected final void setState(int newState) {
    state = newState;
}

Semaphore's lock acquisition method: acquire (int permissions); Source code analysis

/**
 * Get lock source code
 */
public void acquire(int permits) throws InterruptedException {
    // Throw an exception when the number of incoming licenses is less than 0
    if (permits < 0) throw new IllegalArgumentException();
    // Call the method of obtaining interruptible shared resources in AQS
    sync.acquireSharedInterruptibly(permits);
}

/**
 * Get interruptible shared resources
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // If the thread is interrupted, an exception is thrown
    if (Thread.interrupted())
        throw new InterruptedException();
    // Try to obtain a lock. If it is less than 0, there is no lock
    if (tryAcquireShared(arg) < 0)
        // Attempt to acquire a shared lock or interrupt
        doAcquireSharedInterruptibly(arg);
}

Semaphore's attempt to acquire lock logic: tryAcquireShared(arg)

/**
 * tryAcquireShared implementation of fair lock.
 * Compared with the unfair lock, there is one more check: hasQueuedPredecessors()
 */
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // If there are queued objects in the linked list! Return - 1
        if (hasQueuedPredecessors())
            return -1;
        // Gets the current number of resources
        int available = getState();
        // The number of resources to be consumed is subtracted from the current number of resources to calculate the number of resources to be consumed
        int remaining = available - acquires;
        // Only when the number of remaining resources is less than 0 (there is no competition) or the current number of resources can be changed to a reduced value through CAS (these threads can obtain locks or wait for locks), the result will be returned. Otherwise, the loop will continue.
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            // Returns the number of remaining resources
            return remaining;
    }
}

/**
 * tryAcquireShared implementation method of unfair lock: call nonfairtryacquireshared (acquire)
 */
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

/**
 * Unfair lock attempts to acquire shared resources
 */
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // Gets the current number of resources
        int available = getState();
        // The number of resources to be consumed is subtracted from the current number of resources to calculate the number of resources to be consumed
        int remaining = available - acquires;
        // If the number of remaining resources is less than 0 (there is no competition) or the current number of resources can be reduced through CAS (these threads can obtain locks or wait for locks)
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            // Returns the number of remaining resources
            return remaining;
    }
}

/**
 * Judge whether there is a queue in the queue
 */
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // When not initialized, the value at the head and tail of the queue is null, equal - not satisfied
    // When there is only one, the head and tail are equal - not satisfied
    // The next one in the header is null, indicating that there is only one -- satisfied
    // The next thread is the current thread, and the re-entry is satisfied
    // Simply put: if there is only one linked list or the linked list is empty, return false. There are multiple in the linked list. If the reentry mechanism is not satisfied, false is returned. Only when there are multiple data in the linked list and the holding thread is the current thread will it return true!
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

Attempt to acquire a shared lock or interrupt: doacquiressharedinterruptible

/**
 * Attempt to acquire a shared lock or interrupt
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // Add to waiting queue
    final Node node = addWaiter(Node.SHARED);
    // Definition failure marked as true
    boolean failed = true;
    try {
        for (;;) {
            // Get the precursor node of the current node
            final Node p = node.predecessor();
            // If the front node is the head node
            if (p == head) {
                 // Try to obtain the lock: the implementation of fair lock and non fair lock is inconsistent!
                 int r = tryAcquireShared(arg);
                 // The number of resources obtained is greater than 0 (resources obtained, Semaphore)
                 if (r >= 0) {
                     // Set the header node and linked list, and prepare to wake up the successor node
                     setHeadAndPropagate(node, r);
                     // Cancel the reference of nodes to facilitate GC to recycle
                     p.next = null; // help GC
                     // Change the failure flag to false
                     failed = false;
                     // Jump out of the way!
                     return;
                 }
            }
            // The code executes here, indicating that the attempt to obtain the lock failed.
            // The preparation operation before blocking is successful (successful when the status is - 1)
            // Block the thread and wait for him to wake up. After waking up, return to the interrupt state of the thread!
            if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 throw new InterruptedException();
        }
    } finally {
        // When the above code throws an exception, it will execute the logic here
        if (failed)
            // Logic to cancel lock acquisition
            cancelAcquire(node);
    }
}

/**
 * Add thread to synchronization queue
 */
private Node addWaiter(Node mode) {
    // Create a Node of the current thread
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure: try the fast path of enq; Backup to full enq in case of failure
    // Get the previous tail node
    Node pred = tail;
    // The tail node is not empty, indicating that the queue exists
    if (pred != null) {
        // Set the previous node of the current thread node to be the previous tail node
        node.prev = pred;
        // cas attempts to set the tail node as the node of the current thread
        if (compareAndSetTail(pred, node)) {
            // Set the previous tail node. The next node of the penultimate node is the current thread node
            pred.next = node;
            // Returns the current node
            return node;
        }
    }
    // If CAS fails or the node is not created, the queued operation will be executed. See the following code for details
    enq(node);
    // After joining the queue successfully, return to the current node
    return node;
}

/**
 * Design essence: 100% create queue or 100% join queue
 */
private Node enq(final Node node) {
    for (;;) {
        // Get the previous tail node
        Node t = tail;
        // The previous tail node is empty and the queue needs to be initialized
        if (t == null) { // Must initialize
            // Set the header node as the current node through CAS
            if (compareAndSetHead(new Node()))
                // After the head node is set successfully, it is copied to the tail node. State of only one node
                tail = head;
        } else {
            // Set the previous node of the current thread node to be the previous tail node
            node.prev = t;
            // cas attempts to set the tail node as the node of the current thread
            if (compareAndSetTail(t, node)) {
                // Set the previous tail node. The next node of the penultimate node is the current thread node
                t.next = node;
                // Return to the current node! Note: This is the only place this method returns! In other words, after initialization, it will continue to cycle once to set the previous node and the next node, and then return.
                return t;
            }
        }
    }
}

/**
 * Set header node and linked list
 */
private void setHeadAndPropagate(Node node, int propagate) {
    // Get the old head
    Node h = head; // Record old head for check below
    // Set the current node as the head node, the owning thread as null, and the precursor node as null
    setHead(node);
    // Propagate > 0: it indicates that there are still remaining shared locks available, so the conditions behind the short circuit.
    // h == null and (h = head) == null are standard ways to prevent control, because after the addWaiter method, there must be a node!
    // h. Waitstatus < 0: the status is less than 0. When calling doReleaseShared, the cas status is returned to 0! It indicates that there are other threads calling doReleaseShared(). The first is the old head node, and the second is the new head node (current node)!
    // (h = head) == null: in addition to preventing null pointers, the current node will be copied to the h variable here
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // Get the next node of the current node
        Node s = node.next;
        // The next node is empty (the node is the end of the queue) or the next node is in shared mode
        if (s == null || s.isShared())
            // Wake up subsequent nodes and ensure continuous propagation
            doReleaseShared();
    }
}

/**
 * Wake up subsequent nodes and ensure continuous propagation
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
    * in-progress acquires/releases.  This proceeds in the usual
    * way of trying to unparkSuccessor of head if it needs
    * signal. But if it does not, status is set to PROPAGATE to
    * ensure that upon release, propagation continues.
    * Additionally, we must loop in case a new node is added
    * while we are doing this. Also, unlike other uses of
    * unparkSuccessor, we need to know if CAS to reset status
    * fails, if so rechecking.
    */
    for (;;) {
        // Get the current header node
        Node h = head;
        // When the head node is not empty and the head node is not the tail node
        if (h != null && h != tail) {
            // Get the status of the header node
            int ws = h.waitStatus;
            // The status of the head node is - 1: the threads contained in the subsequent nodes of the current node need to run
            if (ws == Node.SIGNAL) {
                // CAS failed to set the current header node status to 0 (the current node is in the sync queue, waiting to obtain the lock). Enter the next cycle! CAS continues successfully!
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Execute the process of wake-up lock
                unparkSuccessor(h);
            }
            // The status of the head node is 0 and cannot be changed to cas status to - 3 (subsequent acquireShared can be executed). Continue! Otherwise, enter the next cycle
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // If the head changes, continue the cycle. Jump out of the loop without changing!
        if (h == head)                   // loop if head changed
            break;
    }
}

/**
 * Wake up lock process
 */
private void unparkSuccessor(Node node) {
    /*
    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling.  It is OK if this
    * fails or if status is changed by waiting thread.
    */
    // Gets the status of the current node
    int ws = node.waitStatus;
    // If the status is less than 0, try changing the status to 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
    * Thread to unpark is held in successor, which is normally
    * just the next node.  But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
    // Gets the next node of the current node (head node)
    Node s = node.next;
    // The next node of the head node is empty or the state of the next node is 1 (the current thread is cancelled)
    if (s == null || s.waitStatus > 0) {
        // Set next node to null
        s = null;
        // Traverse forward from the tail to find the first node in the normal blocking state until the nodes coincide
        // The reason for traversing from the tail is to prevent missing threads in high concurrency scenarios
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // The next node after filtering is not null. Wake him up
    if (s != null)
        // Wake up the next node
        LockSupport.unpark(s.thread);
}


/**
 * Preparation logic after lock acquisition failure and before blocking
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Get the waiting state of the current node of the precursor
    int ws = pred.waitStatus;
    // When the status is - 1: the threads contained in the subsequent nodes of the current node need to run, that is, unpark;
    if (ws == Node.SIGNAL)
        /*
        * This node has already set status asking a release
        * to signal it, so it can safely park.
        */
        // The current node state of the precursor has been set to SIGNAL and can be blocked safely
        return true;
    // Greater than 0 (CANCELLED status): indicates that the current thread is canceled;
    if (ws > 0) {
        /*
        * Predecessor was cancelled. Skip over predecessors and
        * indicate retry.
        */
        // The precursor node has timed out or responded to an interrupt. You need to skip these nodes with a state greater than 0 until you find a node with a state not greater than 0.
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // After skipping the interrupted thread, set the next node of the precursor node as the current node.
        pred.next = node;
    } else {
        /*
        * waitStatus must be 0 or PROPAGATE.  Indicate that we
        * need a signal, but don't park yet.  Caller will need to
        * retry to make sure it cannot acquire before parking.
        */
        // For ReentrantLock, the status here can only be 0 or PROPAGATE (- 3)
        // Set the status of the front node to SIGNAL (- 1) through CAS
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

/**
 * The status is SIGNAL (- 1) successful. He needs to queue, so he directly calls the park method to block
 */
private final boolean parkAndCheckInterrupt() {
    // block
    LockSupport.park(this);
    // After unpark, return to the current interrupt state and clear the interrupt flag bit
    return Thread.interrupted();
}

/**
 * Logic to cancel lock acquisition
 */
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    // Ignore when the node does not exist
    if (node == null)
        return;

    // Set the thread of the current node to null
    node.thread = null;

    // Skip cancelled predecessors: a precursor node has been cancelled, skipping all cancelled nodes
    // Get precursor node
    Node pred = node.prev;
    // The status of the precursor node is greater than 0 and is canceled
    while (pred.waitStatus > 0)
        // Set the predecessor node of the predecessor node as the predecessor node of the current node. The simple understanding is to set the precursor node of the current node as the first found precursor node in the normal state (< = 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    // Gets the next node of the current node
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // Set the current node status to 1 (cancel status). The reason why CAS is not used here is that other threads will skip the cancellation state after the execution. There are no other threads executing before the execution!
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // If the current node is the tail node, set the tail node to the previous node. The simple understanding is to remove the current node.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // Entering else indicates that the node is not the tail of the queue (or the tail of the queue but the tail of cas fails (in fact, the result is not the tail of the queue because it is preempted by other threads))
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        // Define a status identifier
        int ws;
        // The filtered precursor node is not a header node
        // And the current node state is - 1 (waiting for wake-up) or (the current node is not running or is not cancelled (< = 0) and the current node CAS can be set to - 1 state (waiting for wake-up))
        // And the precursor node has thread holding!
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // Get the next node of the current node
            Node next = node.next;
            // The next node is not empty, and the next node has not been cancelled
            if (next != null && next.waitStatus <= 0)
                // CAS connects the previous node to the next node. The simple understanding is to skip (cancel) the current node!
                compareAndSetNext(pred, predNext, next);
        } else {
            // Wake up the next node that will not be cancelled!
            unparkSuccessor(node);
        }
        // The next node of the current node is not pointed to
        node.next = node; // help GC
    }
}

Semaphore's lock release method: release(int permits); Source code analysis

/**
 * Logic for releasing locks
 */
public void release(int permits) {
    // If the quantity passed in is less than 0, an exception will be thrown directly
    if (permits < 0) throw new IllegalArgumentException();
    // Call the method to release the sharing statement
    sync.releaseShared(permits);
}

/**
 * Logic for releasing shared locks
 */
public final boolean releaseShared(int arg) {
    // Try to release the shared lock
    if (tryReleaseShared(arg)) {
        // Wake up subsequent nodes and ensure continuous propagation
        doReleaseShared();
        // The whole returns true, indicating that the shared lock is released successfully
        return true;
    }
    // java specification: there must be a return value, which will not be executed here!
    return false;
}

/**
 * Try to release the shared lock
 */
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // Acquisition status: the number of locks and the number of construction methods passed in
        int current = getState();
        // Get the total quantity if enough is released
        int next = current + releases;
        // Greater than the maximum value of int, next becomes negative! Overflow, throw exception!
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // CAS sets the held status (number of remaining resources) to a new value
        if (compareAndSetState(current, next))
            return true;
    }
}

Concluding remarks

  • Get more valuable articles and let's become architects together!
  • Paying attention to the official account gives you a deep understanding of MySQL.
  • Pay attention to official account and keep continuous understanding of concurrent programming every day!
  • Pay attention to the official account, and follow the continuous and efficient understanding of spring source code.
  • This official account is not advertising!!! Update daily!!!

Keywords: Concurrent Programming

Added by ClosetGeek on Tue, 01 Feb 2022 19:09:12 +0200