Related reading
- A brief analysis of JUC source code: AbstractQueuedSynchronizer
- A brief analysis of JUC source code: ReentrantLock
Brief introduction
Semaphore is a thread-synchronization utility. Internally it maintains a count of available permits (stored in the AQS state) and blocks or admits threads according to that count.
Using Semaphore, you can limit the number of threads that access a resource at the same time.
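As a minimal sketch of that idea, the following example guards a simulated resource with a two-permit semaphore and records how many workers are inside the guarded section at once. The class name PermitLimitSketch, the helper methods and the 20 ms pause are assumptions made for illustration; only the Semaphore calls come from the JDK.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PermitLimitSketch {

    // Runs `tasks` workers behind a semaphore holding `permits` permits and
    // returns the largest number of workers observed inside the guarded section.
    static int peakConcurrency(int permits, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                semaphore.acquireUninterruptibly();   // blocks while no permit is free
                try {
                    peak.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    pause(20);                        // simulate work on the resource
                    inside.decrementAndGet();
                } finally {
                    semaphore.release();              // always hand the permit back
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    // Hypothetical helper: sleeps and restores the interrupt flag if interrupted.
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Five workers, two permits: the observed peak never exceeds 2.
        System.out.println("peak concurrency = " + peakConcurrency(2, 5));
    }
}
```

Releasing in a finally block is a deliberate choice here: a worker that fails midway would otherwise keep its permit forever and shrink the effective limit.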
Brief analysis of source code
Internal class - Sync
Brief introduction
Sync inherits from AbstractQueuedSynchronizer and overrides the parent's tryReleaseShared method; it also provides the nonfairTryAcquireShared, reducePermits and drainPermits methods.
tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
    // Loop until the release succeeds: several threads may be updating the permit count at the same time
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // The permit count overflowed, so an error is thrown
            throw new Error("Maximum permit count exceeded");
        // Try to update the permit count with CAS.
        // If the CAS fails, another thread acquired or released permits concurrently, so retry in the next iteration
        if (compareAndSetState(current, next))
            // true indicates that the permits were released successfully
            return true;
    }
}
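The overflow check in tryReleaseShared can be observed from the public API. The sketch below, which assumes a semaphore created with Integer.MAX_VALUE permits purely to reach the boundary, releases one more permit and captures the resulting Error. The class and method names are hypothetical.

```java
import java.util.concurrent.Semaphore;

class ReleaseOverflowSketch {

    // Returns the message of the Error raised when a release would push the
    // permit count past Integer.MAX_VALUE, or null if no Error was thrown.
    static String releasePastMax() {
        Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
        try {
            semaphore.release();   // current + 1 wraps around to a negative int
            return null;
        } catch (Error e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(releasePastMax());
    }
}
```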
nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) {
    // Loop until the attempt is decided: several threads may be updating the permit count at the same time
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        // If too few permits are available (remaining < 0), return the negative value at once to signal failure.
        // Otherwise try to update the permit count with CAS:
        // if the CAS succeeds, return the remaining permit count;
        // if it fails, another thread acquired or released permits concurrently, so retry in the next iteration
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}
reducePermits
final void reducePermits(int reductions) {
    // Loop until the update succeeds: several threads may be updating the permit count at the same time
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // The permit count underflowed, so an error is thrown
            throw new Error("Permit count underflow");
        // Try to update the permit count with CAS:
        // if the CAS succeeds, return;
        // if it fails, another thread acquired or released permits concurrently, so retry in the next iteration
        if (compareAndSetState(current, next))
            return;
    }
}
drainPermits
final int drainPermits() {
    // Loop until the update succeeds: several threads may be updating the permit count at the same time
    for (;;) {
        int current = getState();
        // If no permits are available, return 0 at once.
        // Otherwise try to set the permit count to 0 with CAS:
        // if the CAS succeeds, return the number of permits that were drained;
        // if it fails, another thread acquired or released permits concurrently, so retry in the next iteration
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}
Internal class - NonfairSync
Brief introduction
NonfairSync extends the inner class Sync to implement the nonfair semaphore.
The only method it has to implement is tryAcquireShared, the AbstractQueuedSynchronizer hook method (whose default implementation throws UnsupportedOperationException) that it inherits through the parent class Sync.
tryAcquireShared
protected int tryAcquireShared(int acquires) {
    // Delegate directly to nonfairTryAcquireShared in the parent class Sync
    return nonfairTryAcquireShared(acquires);
}
This method competes for permits in a nonfair way. The unfairness shows in that a new thread does not check whether other threads are already queued waiting for permits; it tries to acquire immediately. If a thread holding a permit releases it at just that moment, the new thread may succeed. The queued successor thread that the releasing thread woke up then fails to acquire, sets the wait status of the head node (which has not changed) back to SIGNAL, and parks again until a later release wakes it up.
Internal class - FairSync
Brief introduction
FairSync extends the inner class Sync to implement the fair semaphore.
The only method it has to implement is tryAcquireShared, the AbstractQueuedSynchronizer hook method (whose default implementation throws UnsupportedOperationException) that it inherits through the parent class Sync.
tryAcquireShared
protected int tryAcquireShared(int acquires) {
    // Loop until the attempt is decided: several threads may be updating the permit count at the same time
    for (;;) {
        if (hasQueuedPredecessors())
            // Another thread is already queued ahead of this one, so return -1 to signal failure
            return -1;
        int available = getState();
        int remaining = available - acquires;
        // If too few permits are available (remaining < 0), return the negative value at once to signal failure.
        // Otherwise try to update the permit count with CAS:
        // if the CAS succeeds, return the remaining permit count;
        // if it fails, another thread acquired or released permits concurrently, so retry in the next iteration
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

// AbstractQueuedSynchronizer
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // h == t: the head node is also the tail node, so no other node is waiting in the queue.
    // h != t: the queue contains waiting nodes, and either
    //   h.next == null: a new node is being enqueued; tail has already been updated but the old tail's
    //   next has not been linked yet, so another thread (the new node) is waiting; or
    //   h.next exists but does not belong to the current thread: another thread is waiting ahead of it.
    // Otherwise h.next is the current thread's own node, which is the case when a thread already in the
    // queue retries the acquisition, and it has no predecessor.
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
This method competes for permits in a fair way. Fairness shows in that, before trying to acquire, a new thread first checks whether other threads are already queued waiting for permits. If so, it gives up the immediate attempt; if not, it tries to acquire.
Semaphore
Constructor
public Semaphore(int permits) {
    // Nonfair by default; this is the mode used in most scenarios
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    // Validate the argument
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
acquireUninterruptibly
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

public void acquireUninterruptibly(int permits) {
    // Validate the argument
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
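The difference between acquire and acquireUninterruptibly is easiest to see on a thread whose interrupt flag is already set. The sketch below is illustrative only and the class name is hypothetical. It relies on acquireSharedInterruptibly checking the interrupt flag before it attempts to acquire, while acquireShared does not look at the flag when a permit is free.

```java
import java.util.concurrent.Semaphore;

class InterruptSketch {

    // Returns {acquireThrew, uninterruptiblyAcquired, flagStillSet}.
    static boolean[] compare() {
        Semaphore semaphore = new Semaphore(1);

        boolean acquireThrew = false;
        Thread.currentThread().interrupt();            // set the interrupt flag
        try {
            semaphore.acquire();                       // checks the flag before trying to acquire
            semaphore.release();                       // not reached when the flag is set
        } catch (InterruptedException e) {
            acquireThrew = true;                       // throwing also cleared the flag
        }

        Thread.currentThread().interrupt();            // set the flag again
        semaphore.acquireUninterruptibly();            // ignores the flag and takes the free permit
        boolean acquired = semaphore.availablePermits() == 0;
        boolean flagStillSet = Thread.interrupted();   // read and clear the flag
        semaphore.release();

        return new boolean[] {acquireThrew, acquired, flagStillSet};
    }

    public static void main(String[] args) {
        boolean[] r = compare();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```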
tryAcquire
public boolean tryAcquire() {
    // Always takes the nonfair path, even on a fair semaphore.
    // A return value of at least 0 means the permit was acquired
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(int permits) {
    // Validate the argument
    if (permits < 0) throw new IllegalArgumentException();
    // A return value of at least 0 means the permits were acquired
    return sync.nonfairTryAcquireShared(permits) >= 0;
}
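Because tryAcquire calls nonfairTryAcquireShared directly, it can barge past queued threads even on a fair semaphore. The sketch below tries to make that visible: a waiter thread queues for two permits while only one is free, and the main thread then compares the timed tryAcquire(0, TimeUnit.SECONDS), which goes through the fair tryAcquireShared, with the plain tryAcquire(). The class name and the two-permit waiter are assumptions chosen to keep the scenario deterministic.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class FairBargingSketch {

    // Returns {timedTryAcquire, plainTryAcquire} observed while another
    // thread is already queued on a fair semaphore.
    static boolean[] probe() {
        Semaphore semaphore = new Semaphore(1, true);   // fair, one permit
        // The waiter asks for two permits, so it queues while one permit stays free.
        Thread waiter = new Thread(() -> {
            semaphore.acquireUninterruptibly(2);
            semaphore.release(2);
        });
        waiter.start();
        while (!semaphore.hasQueuedThreads()) {
            Thread.yield();                             // wait until the waiter is enqueued
        }

        boolean timed;
        try {
            // Fair path: a queued predecessor makes the attempt fail.
            timed = semaphore.tryAcquire(0, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timed = false;
        }
        // Nonfair path: takes the free permit regardless of the queue.
        boolean plain = semaphore.tryAcquire();

        semaphore.release(2);                           // give the waiter the two permits it needs
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {timed, plain};
    }

    public static void main(String[] args) {
        boolean[] r = probe();
        System.out.println("timed=" + r[0] + ", plain=" + r[1]);
    }
}
```

When fairness matters for a non-blocking attempt, the timed variant with a zero timeout is the one that respects the queue.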
release
public void release() {
    sync.releaseShared(1);
}

public void release(int permits) {
    // Validate the argument
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
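Since release only adds to the permit count, a thread does not have to hold a permit before releasing one, and the count can grow beyond the value passed to the constructor. A small sketch of that behavior, with a hypothetical class name:

```java
import java.util.concurrent.Semaphore;

class ReleaseWithoutAcquireSketch {

    // Returns the permit count after releasing into a semaphore created with zero permits.
    static int permitsAfterRelease() {
        Semaphore semaphore = new Semaphore(0);
        semaphore.release();      // no prior acquire is required
        semaphore.release(2);     // the count grows past its initial value
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterRelease());
    }
}
```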
Other methods
public int availablePermits() {
    return sync.getPermits();
}

public int drainPermits() {
    return sync.drainPermits();
}

protected void reducePermits(int reduction) {
    // Validate the argument
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}
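reducePermits is protected, so it is only reachable from a subclass. The sketch below assumes a hypothetical ShrinkableSemaphore subclass that exposes it, then shows how it interacts with availablePermits and drainPermits.

```java
import java.util.concurrent.Semaphore;

// Hypothetical subclass: exposes the protected reducePermits method.
class ShrinkableSemaphore extends Semaphore {

    ShrinkableSemaphore(int permits) {
        super(permits);
    }

    // Removes permits without blocking, unlike acquire.
    void shrink(int reduction) {
        reducePermits(reduction);
    }

    // Returns {permits after shrinking, permits drained, permits after draining}.
    static int[] demo() {
        ShrinkableSemaphore semaphore = new ShrinkableSemaphore(5);
        semaphore.shrink(2);                              // 5 - 2 permits remain
        int afterShrink = semaphore.availablePermits();
        int drained = semaphore.drainPermits();           // takes everything that is left
        int afterDrain = semaphore.availablePermits();
        return new int[] {afterShrink, drained, afterDrain};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```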
Test
Demo
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class Demo {
    private int parties = 5;
    // Two permits: at most two tasks hold the semaphore at the same time
    private Semaphore semaphore = new Semaphore(2);

    public static void main(String[] args) throws InterruptedException {
        Demo demo = new Demo();
        ExecutorService es = Executors.newFixedThreadPool(demo.parties);
        for (int i = 0; i < demo.parties; i++) {
            es.execute(new SemaphoreTask(i, demo.semaphore));
        }
        es.shutdown();
    }

    private static class SemaphoreTask implements Runnable {
        private Semaphore semaphore;
        private int index;

        SemaphoreTask(int index, Semaphore semaphore) {
            this.semaphore = semaphore;
            this.index = index;
        }

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread:" + index + " waits to acquire semaphore");
                semaphore.acquire();
                try {
                    System.out.println("Thread:" + index + " has acquired semaphore");
                    TimeUnit.SECONDS.sleep(2);
                } finally {
                    // Always return the permit, even if the sleep is interrupted
                    semaphore.release();
                }
                System.out.println("Thread:" + index + " has released semaphore");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Test result (one possible output; the exact interleaving depends on thread scheduling)
Thread:0 waits to acquire semaphore
Thread:0 has acquired semaphore
Thread:2 waits to acquire semaphore
Thread:2 has acquired semaphore
Thread:1 waits to acquire semaphore
Thread:3 waits to acquire semaphore
Thread:4 waits to acquire semaphore
Thread:0 has released semaphore
Thread:1 has acquired semaphore
Thread:2 has released semaphore
Thread:3 has acquired semaphore
Thread:1 has released semaphore
Thread:4 has acquired semaphore
Thread:3 has released semaphore
Thread:4 has released semaphore