In-depth analysis of how Semaphore works

Simple understanding of Semaphore

What is Semaphore?

  1. Semaphore, as its name suggests, is a semaphore: a counter of permits;
  2. Semaphore can be used to control the number of threads accessing a specific resource at the same time, so as to coordinate the work of the threads;
  3. Semaphore also has static nested classes for the fair lock and the non-fair lock. As in ReentrantLock, the public methods of Semaphore basically delegate through sync.xxx;
  4. Semaphore internally maintains a virtual pool of permits. If the number of permits is 0, an acquiring thread blocks and waits until the number of permits is greater than 0;

The state field in Semaphore

  1. The implementation of Semaphore makes good use of the state variable of AQS, the parent class of its internal Sync class;
  2. A quantity, say N, is set at initialization as the size of the permit pool. Each time a thread acquires a permit, the count is reduced by 1 until it reaches 0, after which subsequent threads have to wait;
  3. Put simply, suppose threads A, B, C and D compete for resources at the same time and there are currently 2 slots. If threads A and B are executing and have not finished, threads C and D wait outside the door. Once A or B finishes, C and D compete to see which of them executes first;
  4. In terms of state: the initial value is N, and each successful tryAcquireShared() reduces state by 1 through CAS. When state is 0, other threads stay in the waiting state until a release makes state greater than 0 again, at which point a waiting thread can obtain a permit and carry on with its own work;
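The slot example above can be sketched as a small program. This is only an illustration under assumptions: the class `SlotDemo` and the method `peakConcurrency` are hypothetical names, and `Thread.yield()` stands in for real work. It counts how many threads are inside the guarded section at once.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SlotDemo {
    // Runs `threads` workers through a section guarded by `slots` permits
    // and returns the highest concurrency observed inside that section.
    static int peakConcurrency(int slots, int threads) {
        Semaphore semaphore = new Semaphore(slots);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                semaphore.acquireUninterruptibly();   // state goes down by 1, or the thread waits
                try {
                    peak.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    Thread.yield();                   // stand-in for real work
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();              // state goes back up by 1
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Threads A, B, C, D with 2 slots: the peak never exceeds 2.
        System.out.println("peak = " + peakConcurrency(2, 4));
    }
}
```

The observed peak depends on scheduling, so it may be 1 on a quiet machine, but it cannot be larger than the number of permits.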

Common and important methods

public Semaphore(int permits)
// Creates a semaphore with the given number of permits; permits are handed out in non-fair mode by default

public Semaphore(int permits, boolean fair)
// Creates a semaphore with the given number of permits; whether it is fair is determined by the fair Boolean parameter

public void acquire() throws InterruptedException
// Acquires one permit from this semaphore. If no permit is available, blocks and waits; throws InterruptedException if the thread is interrupted

public void acquire(int permits) throws InterruptedException
// Acquires the given number of permits from this semaphore. If not enough permits are available, blocks and waits. If the waiting thread finds after waking up that it has been interrupted, it throws InterruptedException

public void acquireUninterruptibly(int permits)
// Acquires the given number of permits from this semaphore. If not enough permits are available, blocks and waits. If the waiting thread finds after waking up that it has been interrupted, it does not throw InterruptedException

public void release()
// Releases one permit

public void release(int permits)
// Releases the given number of permits
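A short single-threaded walk through these methods may help. It is a sketch, not part of the original article: `PermitApiDemo` and `walkThrough` are hypothetical names, and it also uses `tryAcquire()` and `availablePermits()`, which exist on `java.util.concurrent.Semaphore` but are not in the list above.

```java
import java.util.concurrent.Semaphore;

class PermitApiDemo {
    // Returns the values observed at four points of the walk-through.
    static int[] walkThrough() {
        Semaphore s = new Semaphore(3);        // non-fair by default
        int[] seen = new int[4];
        s.acquireUninterruptibly();            // takes 1 permit
        seen[0] = s.availablePermits();        // 2 permits left
        s.acquireUninterruptibly(2);           // takes 2 more
        seen[1] = s.availablePermits();        // 0 permits left
        seen[2] = s.tryAcquire() ? 1 : 0;      // nothing left: returns false without blocking
        s.release();                           // gives back 1 permit
        s.release(2);                          // gives back 2 permits
        seen[3] = s.availablePermits();        // back to 3
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(walkThrough()));
    }
}
```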
    
### Design and implementation of pseudo code
    
#### Get shared lock:
```java
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
```

acquire {
    If the thread's interrupt status is set, throw InterruptedException.
    If the attempt to acquire the shared lock fails (how the attempt is made is decided by the AQS subclass),
    add a new shared-lock node to the queue in a spin loop, and use the predecessor's waitStatus to decide whether to call LockSupport.park and rest.
}

#### Release shared lock:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
release {
      If the attempt to release the shared lock succeeds (how the attempt is made is decided by the AQS subclass),
      wake up the blocked threads in a spin loop.
}
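The two templates above leave only `tryAcquireShared` and `tryReleaseShared` to the subclass. As a rough sketch of that division of labour, here is a tiny semaphore built directly on `AbstractQueuedSynchronizer`. `TinySemaphore` is a hypothetical class written for this illustration, not the JDK implementation, although its two hooks follow the same shape as the Semaphore source analyzed later.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class TinySemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }   // state holds the permit count

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;              // negative means "enqueue me"
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;                   // AQS then wakes the waiters
            }
        }

        boolean tryOnce() { return tryAcquireShared(1) >= 0; }
        int permits() { return getState(); }
    }

    private final Sync sync;

    TinySemaphore(int permits) { sync = new Sync(permits); }

    // Queueing, parking and interrupt handling all come from AQS.
    void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    boolean tryAcquire() { return sync.tryOnce(); }
    void release() { sync.releaseShared(1); }
    int availablePermits() { return sync.permits(); }

    public static void main(String[] args) throws InterruptedException {
        TinySemaphore s = new TinySemaphore(1);
        s.acquire();
        System.out.println("second attempt succeeds? " + s.tryAcquire());
        s.release();
        System.out.println("permits after release: " + s.availablePermits());
    }
}
```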

Understanding Semaphore through an everyday example

Most of us eat fast food outside every day, so I take a fast food restaurant as an example to illustrate how Semaphore works:

  • 1. Scene: the restaurant has only one queuing corridor and only ten serving windows;
  • 2. At the start of the dinner period there are few people and many free windows, so everyone gets food quickly. As time passes, more people arrive, it gets crowded and a long queue forms;
  • 3. There are more and more people but still only ten windows. Latecomers have to wait in line on a "first come, first served" basis. When a window becomes free, the first person in line goes up to get a meal in an orderly manner;
  • 4. In short, everyone lines up one by one, first come, first served, so that everyone can eat in peace;
  • 5. Up to this point, items 1, 2, 3 and 4 can be seen as Semaphore acquiring shared locks in the fair way;
  • 6. However, some people are in a hurry. If they arrive at the restaurant just as a server has finished serving someone, they cut in, pick up their meal at that window, and are lucky with the timing;
  • 7. If the people in a hurry arrive and find that every server is still busy, they have to wait in line like everyone else;
  • 8. At this point, items 1, 2, 6 and 7 can be seen as Semaphore acquiring shared locks in the non-fair way;
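The fair half of the restaurant story can be reproduced deterministically. In the sketch below, `FairQueueDemo` and `serveOrder` are hypothetical names; the windows all start busy (0 permits), customers join the queue one at a time, and then one window is freed per step. Cutting in line under the non-fair mode depends on timing, so it is not shown here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;

class FairQueueDemo {
    // Queues `customers` threads on a fair semaphore with no free window,
    // then frees one window at a time and records who gets served.
    static List<Integer> serveOrder(int customers) {
        Semaphore windows = new Semaphore(0, true);   // fair mode, every window busy
        List<Integer> served = Collections.synchronizedList(new ArrayList<>());
        List<Thread> queue = new ArrayList<>();
        for (int i = 0; i < customers; i++) {
            final int id = i;
            Thread t = new Thread(() -> {
                windows.acquireUninterruptibly();     // wait in the corridor
                served.add(id);
            });
            t.start();
            queue.add(t);
            // Let customer i join the queue before the next one arrives.
            while (windows.getQueueLength() < i + 1) Thread.yield();
        }
        for (Thread t : queue) {
            windows.release();                        // one window becomes free
            try {
                t.join();                             // the head of the queue is served
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return served;
    }

    public static void main(String[] args) {
        System.out.println(serveOrder(3));
    }
}
```

Because each customer is queued before the next one starts, the service order matches the arrival order.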

Semaphore source code analysis

Semaphore constructor

Constructor source code:

   /**
     * Creates a {@code Semaphore} with the given number of
     * permits and nonfair fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
    * Creates a {@code Semaphore} with the given number of
    * permits and the given fairness setting.
    *
    * @param permits the initial number of permits available.
    *        This value may be negative, in which case releases
    *        must occur before any acquires will be granted.
    * @param fair {@code true} if this semaphore will guarantee
    *        first-in first-out granting of permits under contention,
    *        else {@code false}
    */
   public Semaphore(int permits, boolean fair) {
       sync = fair ? new FairSync(permits) : new NonfairSync(permits);
   }

The constructors create a semaphore object with the given number of permits. The non-fair lock is used by default; the two-argument constructor lets the fair Boolean parameter choose between the fair lock and the non-fair lock;
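A quick check of both constructors, including the negative initial value that the Javadoc allows. This is a sketch with hypothetical names (`ConstructorDemo`, `check`); `isFair()` and `tryAcquire()` are existing Semaphore methods that the article does not otherwise cover.

```java
import java.util.concurrent.Semaphore;

class ConstructorDemo {
    // Returns { nonfair.isFair(), fair.isFair(), acquire before releases, acquire after releases }.
    static boolean[] check() {
        Semaphore nonfair = new Semaphore(2);           // NonfairSync
        Semaphore fair = new Semaphore(2, true);        // FairSync
        Semaphore negative = new Semaphore(-1);         // releases must come first
        boolean beforeReleases = negative.tryAcquire(); // -1 permits: fails
        negative.release(2);                            // -1 + 2 = 1 permit
        boolean afterReleases = negative.tryAcquire();  // succeeds now
        return new boolean[] { nonfair.isFair(), fair.isFair(), beforeReleases, afterReleases };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(check()));
    }
}
```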

Sync synchronizer

1. Class hierarchy: AQS --> Sync --> FairSync (fair lock) or NonfairSync (non-fair lock);

2. Semaphore works with its synchronizers through the abstract class Sync. A closer look shows that its methods basically delegate through calls of the form sync.xxx;

acquire()

1. Source code:

   /**
     * Acquires a permit from this semaphore, blocking until one is
     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>Acquires a permit, if one is available and returns immediately,
     * reducing the number of available permits by one.
     *
     * <p>If no permit is available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until
     * one of two things happens:
     * <ul>
     * <li>Some other thread invokes the {@link #release} method for this
     * semaphore and the current thread is next to be assigned a permit; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul> 
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for a permit,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     */ 
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); // Call the method in the parent class AQS to obtain the shared lock resource
    }

acquire is the entry point through which a thread obtains a shared resource from the semaphore. It attempts to obtain the lock resource: if it succeeds, the method returns immediately; if it does not, the method blocks and waits. The method it calls, acquireSharedInterruptibly, belongs to AQS, the parent class of Sync;

acquireSharedInterruptibly(int)

1. Source code:

    /**
    * Acquires in shared mode, aborting if interrupted.  Implemented
    * by first checking interrupt status, then invoking at least once
    * {@link #tryAcquireShared}, returning on success.  Otherwise the
    * thread is queued, possibly repeatedly blocking and unblocking,
    * invoking {@link #tryAcquireShared} until success or the thread
    * is interrupted.
    * @param arg the acquire argument.
    * This value is conveyed to {@link #tryAcquireShared} but is
    * otherwise uninterpreted and can represent anything
    * you like.
    * @throws InterruptedException if the current thread is interrupted
    */
   public final void acquireSharedInterruptibly(int arg)
           throws InterruptedException {
       if (Thread.interrupted()) // Before calling, first detect the thread interrupt flag bit to detect whether the thread has been interrupted before
           throw new InterruptedException(); // If it is interrupted, an interrupt exception is thrown
       if (tryAcquireShared(arg) < 0) // An attempt is made to acquire a shared resource lock. If it is less than 0, the acquisition fails. This method is implemented by a specific subclass of AQS
           doAcquireSharedInterruptibly(arg); // Enqueue the thread trying to acquire the lock resource
   }

2. acquireSharedInterruptibly is the AQS base-class method through which threads obtain lock resources in shared mode. Each successful acquisition subtracts from the shared resource value; once too few permits remain for a request, the requesting thread enters the queue and blocks;

3. Moreover, as the method name implies, it supports interruption. When the method detects an interrupt, it immediately throws InterruptedException so that the caller notices the interrupt right away;
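The interrupt check happens before any attempt to take a permit, which the following sketch makes visible. `InterruptDemo` and `check` are hypothetical names; the behaviour shown follows from the `acquireSharedInterruptibly` source above and from the Javadoc of `acquire()` and `acquireUninterruptibly()`.

```java
import java.util.concurrent.Semaphore;

class InterruptDemo {
    // Returns { acquire threw, flag cleared by the throw, permit untouched, flag kept by acquireUninterruptibly }.
    static boolean[] check() {
        Semaphore s = new Semaphore(1);
        boolean threw = false;
        Thread.currentThread().interrupt();        // set the flag before calling acquire()
        try {
            s.acquire();
        } catch (InterruptedException e) {
            threw = true;                          // thrown even though a permit was free
        }
        boolean flagCleared = !Thread.currentThread().isInterrupted();
        boolean permitUntouched = s.availablePermits() == 1;

        Thread.currentThread().interrupt();
        s.acquireUninterruptibly();                // no interrupt check: takes the permit
        boolean flagKept = Thread.interrupted();   // reads and clears the flag
        return new boolean[] { threw, flagCleared, permitUntouched, flagKept };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(check()));
    }
}
```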

tryAcquireShared(int)

1. tryAcquireShared source code of fair lock:

  // tryAcquireShared method of the FairSync fair lock
  protected int tryAcquireShared(int acquires) {
    for (;;) { // Spin until a definite result is reached
      if (hasQueuedPredecessors()) // Check whether another thread is queued ahead of the current thread
        return -1; // Someone is already waiting, so to stay fair return -1 and join the queue behind them
      int available = getState(); // Read the latest number of available permits
      int remaining = available - acquires; // Calculate how many permits would remain
      if (remaining < 0 || // Not enough permits: return the negative number so that the caller enqueues the thread
        compareAndSetState(available, remaining)) // Enough permits: claim them with CAS; if another thread got in first, the CAS fails and the loop retries
        return remaining; // Either way remaining is returned, and the caller decides from its value whether the thread must be queued
    }
  }

2. Non fair lock tryAcquireShared source code:

  // tryAcquireShared method of the NonfairSync non-fair lock
  protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires); // Delegate to the method inherited from Sync
  }

  // nonfairTryAcquireShared method of the Sync class, the parent class of NonfairSync
  final int nonfairTryAcquireShared(int acquires) {
    for (;;) { // Spin until a definite result is reached
      int available = getState(); // Read the latest number of available permits
      int remaining = available - acquires; // Calculate how many permits would remain
      if (remaining < 0 || // Not enough permits: return the negative number so that the caller enqueues the thread
        compareAndSetState(available, remaining)) // Enough permits: claim them with CAS; if another thread got in first, the CAS fails and the loop retries
        return remaining; // Either way remaining is returned, and the caller decides from its value whether the thread must be queued
    }
  }

3. The tryAcquireShared method is implemented by the subclasses of AQS, that is, by the two static nested classes of Semaphore. Its purpose is to try to obtain the shared lock resource through CAS. It returns a number greater than or equal to 0 on success and a negative number on failure. The only difference between the two versions is that the fair one first checks hasQueuedPredecessors();
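To see the return values in isolation, the non-fair loop can be modelled outside AQS. This is only a model under assumptions: `PermitCounter` is a hypothetical class, and an `AtomicInteger` stands in for the AQS state field, which cannot be reached from outside a synchronizer.

```java
import java.util.concurrent.atomic.AtomicInteger;

class PermitCounter {
    private final AtomicInteger state;   // stands in for the AQS state

    PermitCounter(int permits) { state = new AtomicInteger(permits); }

    // Same shape as nonfairTryAcquireShared: returns the remaining permits,
    // or a negative number (leaving state untouched) when there are too few.
    int tryAcquireShared(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            if (remaining < 0 || state.compareAndSet(available, remaining))
                return remaining;
        }
    }

    int state() { return state.get(); }

    public static void main(String[] args) {
        PermitCounter c = new PermitCounter(2);
        System.out.println(c.tryAcquireShared(1)); // 1 permit remains
        System.out.println(c.tryAcquireShared(1)); // 0 permits remain
        System.out.println(c.tryAcquireShared(1)); // negative: the caller would be queued
    }
}
```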

doAcquireSharedInterruptibly(int)

1. Source code:

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // Create a new node for the given mode. There are two modes: Node.EXCLUSIVE (exclusive) and Node.SHARED (shared)
        final Node node = addWaiter(Node.SHARED); // Create a shared-mode node and append it to the queue
        boolean failed = true;
        try {
            for (;;) { // Spin until the lock is acquired or the thread is interrupted
                final Node p = node.predecessor(); // Get the predecessor of the current node
                if (p == head) { // If the predecessor is head, the current node is first in line
                    int r = tryAcquireShared(arg); // Being first in line, try again: a permit may have been released in the meantime
                    if (r >= 0) { // r >= 0 means the shared lock resource has been obtained
                        setHeadAndPropagate(node, r); // Make the current node the new head and, where appropriate, call doReleaseShared to wake the following shared nodes
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && // Decide from the predecessor's waitStatus whether the thread should park
                    parkAndCheckInterrupt()) // Park the thread. Normally, when the shared lock cannot be obtained, the code stops here until the thread is woken up
                    // parkAndCheckInterrupt() returned true, so the thread was interrupted while parked: throw the exception
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

2. doAcquireSharedInterruptibly also runs as a spin loop, which ends only when the lock is obtained and the method returns normally, or when the thread is woken by an interrupt and throws InterruptedException;
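The interrupt path of this loop can be observed from the outside. In the sketch below (hypothetical names `ParkedInterruptDemo` and `interruptedWhileWaiting`), a thread waits on a semaphore with no permits and is then interrupted; `hasQueuedThreads()` is used only to be sure the thread has entered the queue first.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class ParkedInterruptDemo {
    // Returns true if the waiting thread left acquire() with InterruptedException
    // and no permit was consumed.
    static boolean interruptedWhileWaiting() {
        Semaphore s = new Semaphore(0);
        AtomicBoolean threw = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            try {
                s.acquire();                     // no permit: the thread is queued and parks
            } catch (InterruptedException e) {
                threw.set(true);                 // thrown from doAcquireSharedInterruptibly
            }
        });
        waiter.start();
        while (!s.hasQueuedThreads()) Thread.yield();  // wait until the thread is queued
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threw.get() && s.availablePermits() == 0;
    }

    public static void main(String[] args) {
        System.out.println(interruptedWhileWaiting());
    }
}
```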

release()

1. Source code:

    /**
     * Releases a permit, returning it to the semaphore.
     *
     * <p>Releases a permit, increasing the number of available permits by
     * one.  If any threads are trying to acquire a permit, then one is
     * selected and given the permit that was just released.  That thread
     * is (re)enabled for thread scheduling purposes.
     *
     * <p>There is no requirement that a thread that releases a permit must
     * have acquired that permit by calling {@link #acquire}.
     * Correct usage of a semaphore is established by programming convention
     * in the application.
     */ 
    public void release() {
        sync.releaseShared(1); // Release a licensed resource
    }

2. This method calls releaseShared, a base-class method of AQS (the parent class of Sync), to release the shared resource;

releaseShared(int)

1. Source code:

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // Attempt to release the shared lock resource. This method is implemented by a concrete subclass of AQS
            doReleaseShared(); // Spin operation, wake up subsequent nodes
            return true;
        }
        return false;
    }

2. releaseShared is mainly used to release shared lock resources. If the release succeeds, the nodes waiting in the queue are woken up. If it fails, false is returned and the caller decides how to handle it. In Semaphore itself, tryReleaseShared either returns true or throws an Error, so the false branch is never taken;

tryReleaseShared(int)

1. Source code:

  // tryReleaseShared method of the Sync class, the parent class of NonfairSync and FairSync
  protected final boolean tryReleaseShared(int releases) {
      for (;;) { // Spin until the CAS succeeds
          int current = getState(); // Get the latest shared lock resource value
          int next = current + releases; // Add the number of permits being released
          // If next is smaller than current, the int state value has overflowed, which means more permits were released than an int can hold
          if (next < current) // overflow
              throw new Error("Maximum permit count exceeded");
          if (compareAndSetState(current, next)) // Publish the new value with CAS; on failure, loop and retry
              return true; // Report success so that the caller goes on to wake up the waiting threads
      }
  }

2. tryReleaseShared adds the released permits back to state through a CAS operation, so that they become available for other threads to compete for;
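Two consequences of this code are easy to check: releasing does not require a prior acquire, and the overflow test guards the upper bound. A minimal sketch, with hypothetical names `ReleaseDemo`, `releaseWithoutAcquire` and `overflowMessage`:

```java
import java.util.concurrent.Semaphore;

class ReleaseDemo {
    // release() simply adds to state, so the count can exceed the initial value.
    static int releaseWithoutAcquire() {
        Semaphore s = new Semaphore(1);
        s.release();
        return s.availablePermits();             // 1 + 1
    }

    // At Integer.MAX_VALUE one more release overflows and hits the Error branch.
    static String overflowMessage() {
        Semaphore s = new Semaphore(Integer.MAX_VALUE);
        try {
            s.release();
            return "no error";
        } catch (Error e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseWithoutAcquire());
        System.out.println(overflowMessage());
    }
}
```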

doReleaseShared()

1. Source code:

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) { // Spin until head stays unchanged for a whole pass
            Node h = head; // Read the current head node of the queue on each pass
            if (h != null && h != tail) { // The head node is not empty and is not the tail node, so at least one node is waiting
                int ws = h.waitStatus; // Get the waitStatus value of the head node
                if (ws == Node.SIGNAL) { // If the head node is in SIGNAL state, its successor node needs to be woken up
                    // Try to reset the waitStatus of the head node to 0 through CAS. If that fails, loop again, because another thread may be releasing concurrently
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); // Wake up the successor node of the head node
                }
                // If the waitStatus of the head node is 0, change it to PROPAGATE. If that fails, it was changed concurrently, so loop again
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // If the head node has not changed, the work of this pass is complete and the loop exits
            // If it has changed (a woken thread became the new head in the meantime), retry so that the wake-up keeps propagating
            if (h == head)                   // loop if head changed
                break;
        }
    }

2. doReleaseShared is called after shared permits have been released. Its most important purpose is to make sure the wake-up is passed on to the subsequent nodes, so that the waiting threads can obtain the permits that have become available;
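The effect of propagation is that a single release of several permits wakes several waiters, not just the first one. A small sketch, with hypothetical names `PropagationDemo` and `allWoken`, and a 5-second timeout chosen arbitrarily:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class PropagationDemo {
    // Parks `waiters` threads on an empty semaphore, releases that many permits
    // in one call, and reports whether every waiter got through.
    static boolean allWoken(int waiters) {
        Semaphore s = new Semaphore(0);
        CountDownLatch done = new CountDownLatch(waiters);
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                s.acquireUninterruptibly();
                done.countDown();
            });
            t.setDaemon(true);                   // do not keep the JVM alive on failure
            t.start();
        }
        while (s.getQueueLength() < waiters) Thread.yield();  // all waiters queued
        s.release(waiters);                      // one releaseShared call
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(allWoken(3));
    }
}
```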

Summary

1. Once you have analyzed AQS, analyzing Semaphore becomes relatively simple;

2. Here I briefly summarize some features of how Semaphore works:

  • It manages a set of permits, stored as the shared state value of AQS;
  • Each acquire reduces state by 1; once no permit is left, further acquiring threads block and wait;
  • When permits are released, the successor nodes in the queue are woken up so that the waiting threads can obtain the released permits;
  • It can be seen as a generalization of synchronized: synchronized admits only one thread at a time, as if it had a single permit, while Semaphore can have multiple permits;
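The comparison with synchronized can be tried with a one-permit semaphore guarding a plain counter. This is a sketch with hypothetical names (`BinarySemaphoreDemo`, `count`). Note two differences that the analogy hides: a semaphore is not reentrant, and a permit may be released by a thread that never acquired it.

```java
import java.util.concurrent.Semaphore;

class BinarySemaphoreDemo {
    // Increments a plain int from several threads, using a 1-permit semaphore as the lock.
    static int count(int threads, int perThread) {
        Semaphore mutex = new Semaphore(1);
        int[] counter = new int[1];              // not atomic on purpose
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.acquireUninterruptibly();
                    try {
                        counter[0]++;            // only one thread at a time gets here
                    } finally {
                        mutex.release();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(count(4, 1000));
    }
}
```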

Keywords: Java

Added by RickChase on Wed, 10 Nov 2021 18:30:27 +0200