A brief analysis of the JUC source code: Semaphore

Brief introduction

Semaphore is a utility class for thread synchronization. Internally it maintains a count of available permits, stored in the AbstractQueuedSynchronizer state: acquiring takes permits and blocks when too few are available, and releasing returns them.
Using Semaphore, you can limit the number of threads that access a resource at the same time.
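
As a minimal sketch of that limit (the class name PermitLimitSketch and the helper runAndMeasurePeak are made up for illustration), the following pushes several tasks through a semaphore and records the highest number of tasks that were inside the guarded region at once. With two permits, the peak should never exceed two.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PermitLimitSketch {
    // Runs `tasks` workers through a semaphore with `permits` permits and
    // returns the peak number of workers observed inside the guarded region.
    static int runAndMeasurePeak(int permits, int tasks) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit held, nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(50); // simulate work on the shared resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency: " + runAndMeasurePeak(2, 6));
    }
}
```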

Brief analysis of source code

Internal class - Sync

Brief introduction

Sync extends AbstractQueuedSynchronizer and overrides the parent class's tryReleaseShared method; it also provides the nonfairTryAcquireShared, reducePermits and drainPermits methods. The AQS state holds the number of available permits.

tryReleaseShared

protected final boolean tryReleaseShared(int releases) {
    // Loop until the CAS succeeds: several threads may be modifying the permit count (the AQS state) at the same time
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current)
            // Integer overflow: the permit count would exceed Integer.MAX_VALUE, so an Error is thrown
            throw new Error("Maximum permit count exceeded");
        // Try to update the permit count with CAS
        // If the CAS fails, another thread acquired or released permits concurrently, so retry in the next iteration
        if (compareAndSetState(current, next))
            // true means the permits were released successfully
            return true;
    }
}
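
Two consequences of this method can be observed through the public API. The sketch below (the class and method names are illustrative only) shows that release is not tied to an earlier acquire, so it can raise the count above the initial value, and that pushing the count past Integer.MAX_VALUE triggers the overflow check.

```java
import java.util.concurrent.Semaphore;

class ReleaseSketch {
    // release() only adds to the count; no prior acquire() is required.
    static int releaseWithoutAcquire() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.release();
        return semaphore.availablePermits();
    }

    // Starting at Integer.MAX_VALUE, one more permit overflows the int state.
    static boolean overflowThrowsError() {
        Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
        try {
            semaphore.release();
            return false;
        } catch (Error e) {
            return true; // "Maximum permit count exceeded"
        }
    }

    public static void main(String[] args) {
        System.out.println("permits after extra release: " + releaseWithoutAcquire());
        System.out.println("overflow throws Error: " + overflowThrowsError());
    }
}
```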

nonfairTryAcquireShared

final int nonfairTryAcquireShared(int acquires) {
    // Loop until a result is decided: several threads may be modifying the permit count at the same time
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        // If not enough permits remain (remaining < 0), return the negative value immediately; the caller treats it as a failed attempt
        // Otherwise try to update the permit count with CAS
        //     If the CAS succeeds, return the remaining permit count
        //     If the CAS fails, another thread acquired or released permits concurrently, so retry in the next iteration
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
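
The same read, compute, compare-and-set retry pattern can be reproduced outside AQS. The following is only a sketch that uses AtomicInteger in place of the AQS state; the class CasPermitCounter is hypothetical and has no queueing or blocking.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasPermitCounter {
    private final AtomicInteger state;

    CasPermitCounter(int permits) {
        state = new AtomicInteger(permits);
    }

    // Same shape as nonfairTryAcquireShared: returns the remaining count,
    // which is negative when the request could not be satisfied.
    int tryAcquire(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // Short-circuit: when remaining < 0 the CAS is never attempted
            if (remaining < 0 || state.compareAndSet(available, remaining))
                return remaining;
        }
    }

    int available() {
        return state.get();
    }

    public static void main(String[] args) {
        CasPermitCounter counter = new CasPermitCounter(3);
        System.out.println(counter.tryAcquire(2)); // succeeds, 1 permit left
        System.out.println(counter.tryAcquire(2)); // fails, negative result
        System.out.println(counter.available());   // count unchanged by the failure
    }
}
```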

reducePermits

final void reducePermits(int reductions) {
    // Loop until the CAS succeeds: several threads may be modifying the permit count at the same time
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current)
            // Integer underflow: the subtraction wrapped around, so an Error is thrown
            throw new Error("Permit count underflow");
        // Try to update the permit count with CAS
        if (compareAndSetState(current, next))
            // The CAS succeeded, so return
            // Had it failed, another thread acquired or released permits concurrently and the loop would retry
            return;
    }
}
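
Note that reducePermits never blocks and never checks for a negative result, so the count can drop below zero. Because the public-facing method on Semaphore is protected, a subclass is needed to call it; ShrinkableSemaphore and its shrink method below are hypothetical names used only for this sketch.

```java
import java.util.concurrent.Semaphore;

class ShrinkableSemaphore extends Semaphore {
    ShrinkableSemaphore(int permits) {
        super(permits);
    }

    // reducePermits is protected in Semaphore, so expose it via a subclass method.
    void shrink(int reduction) {
        reducePermits(reduction);
    }
}

class ReducePermitsSketch {
    public static void main(String[] args) {
        ShrinkableSemaphore semaphore = new ShrinkableSemaphore(2);
        semaphore.shrink(3); // does not block, even though only 2 permits exist
        System.out.println("after shrink: " + semaphore.availablePermits());
        System.out.println("tryAcquire: " + semaphore.tryAcquire());
        semaphore.release(2); // bring the count back above zero
        System.out.println("after release(2): " + semaphore.availablePermits());
    }
}
```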

drainPermits

final int drainPermits() {
    // Loop until a result is decided: several threads may be modifying the permit count at the same time
    for (;;) {
        int current = getState();
        // If no permits are available, return 0 immediately
        // Otherwise try to set the permit count to 0 with CAS
        //     If the CAS succeeds, return the number of permits that were drained
        //     If the CAS fails, another thread acquired or released permits concurrently, so retry in the next iteration
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}
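
Seen from the public API, draining takes every available permit in one step and reports how many were taken. A small sketch (the class name DrainSketch is illustrative):

```java
import java.util.concurrent.Semaphore;

class DrainSketch {
    // Returns {first drain result, permits left afterwards, second drain result}.
    static int[] drainTwice() {
        Semaphore semaphore = new Semaphore(3);
        int first = semaphore.drainPermits();    // takes all 3 permits
        int left = semaphore.availablePermits(); // nothing remains
        int second = semaphore.drainPermits();   // nothing to take
        return new int[] { first, left, second };
    }

    public static void main(String[] args) {
        int[] r = drainTwice();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```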

Internal class - NonfairSync

Brief introduction

NonfairSync extends the internal class Sync and implements the nonfair semaphore operations.
The only method it has to implement is tryAcquireShared, the AbstractQueuedSynchronizer hook that Sync leaves unimplemented (the AbstractQueuedSynchronizer default throws UnsupportedOperationException).

tryAcquireShared

protected int tryAcquireShared(int acquires) {
    // Delegate to nonfairTryAcquireShared in the parent class Sync
    return nonfairTryAcquireShared(acquires);
}

This method implements nonfair acquisition. It is nonfair in that a newly arriving thread does not check whether other threads are already waiting; it tries to take permits immediately. If a holder has just released permits at that moment, the new thread may take them ahead of the queue. The queued successor thread that the releasing thread woke up then fails its own attempt, sets the wait status of the head node (which has not changed) back to SIGNAL, and parks again until the new thread releases its permits and wakes it up.

Internal class - FairSync

Brief introduction

FairSync extends the internal class Sync and implements the fair semaphore operations.
The only method it has to implement is tryAcquireShared, the AbstractQueuedSynchronizer hook that Sync leaves unimplemented (the AbstractQueuedSynchronizer default throws UnsupportedOperationException).

tryAcquireShared

protected int tryAcquireShared(int acquires) {
    // Loop until a result is decided: several threads may be modifying the permit count at the same time
    for (;;) {
        if (hasQueuedPredecessors())
            // Another thread is already queued ahead of the current thread, so return -1 to report a failed attempt
            return -1;
        int available = getState();
        int remaining = available - acquires;
        // If not enough permits remain (remaining < 0), return the negative value immediately
        // Otherwise try to update the permit count with CAS
        //     If the CAS succeeds, return the remaining permit count
        //     If the CAS fails, another thread acquired or released permits concurrently, so retry in the next iteration
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

// AbstractQueuedSynchronizer
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // If head == tail, no thread is waiting in the queue
    // Otherwise the queue has waiting nodes, and the current thread has a predecessor when either
    //     head.next is null: another thread is in the middle of enqueuing (tail has been updated, but the old tail's next link is not set yet), so a waiter exists ahead of the current thread
    //     or head.next exists but belongs to a different thread, which is therefore ahead of the current thread
    // If head.next belongs to the current thread, it is first in the queue and has no predecessor; this is the case when a queued thread retries the acquire
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

This method implements fair acquisition. It is fair in that, before trying to take permits, a thread first checks whether other threads are already waiting in the queue. If so, it gives up the immediate attempt and the caller queues it behind them; if not, it tries to acquire the permits.
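
The queue order that this check protects can be observed with a small experiment. The sketch below (FairOrderSketch and acquisitionOrder are illustrative names) enqueues waiters one by one on a fair semaphore with no permits, then hands out one permit at a time and records who gets it. It only shows first-in, first-out hand-off among queued threads; it does not by itself demonstrate the absence of barging.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;

class FairOrderSketch {
    static List<Integer> acquisitionOrder(int waiters) throws InterruptedException {
        Semaphore semaphore = new Semaphore(0, true); // fair, no permits yet
        List<Integer> order = new CopyOnWriteArrayList<>();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                semaphore.acquireUninterruptibly();
                order.add(id);
            });
            threads[i].start();
            // Wait until this thread is enqueued before starting the next one,
            // so the queue order is known: 0, 1, 2, ...
            while (semaphore.getQueueLength() < i + 1) Thread.sleep(1);
        }
        for (int i = 0; i < waiters; i++) {
            semaphore.release();
            // One permit at a time, so the recorded order is unambiguous
            while (order.size() < i + 1) Thread.sleep(1);
        }
        for (Thread t : threads) t.join();
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("acquisition order: " + acquisitionOrder(3));
    }
}
```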

Semaphore

Constructor

public Semaphore(int permits) {
    // Nonfair by default, which is the more commonly used mode
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
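
A short sketch of what the two constructors produce, using only public methods (the class name ConstructorSketch is illustrative). It also relies on a detail from the Semaphore Javadoc that the listing above does not show: the initial permit count may be negative, in which case releases must happen before any acquire can succeed.

```java
import java.util.concurrent.Semaphore;

class ConstructorSketch {
    static boolean defaultIsFair() {
        return new Semaphore(1).isFair(); // one-argument constructor: nonfair
    }

    static boolean explicitFair() {
        return new Semaphore(1, true).isFair();
    }

    // Returns {tryAcquire after one release, tryAcquire after two releases}.
    static boolean[] negativeInitialPermits() {
        Semaphore semaphore = new Semaphore(-1);
        semaphore.release();                        // count is now 0
        boolean afterFirst = semaphore.tryAcquire();
        semaphore.release();                        // count is now 1
        boolean afterSecond = semaphore.tryAcquire();
        return new boolean[] { afterFirst, afterSecond };
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair() + " " + explicitFair());
        boolean[] r = negativeInitialPermits();
        System.out.println(r[0] + " " + r[1]);
    }
}
```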

acquire

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    // Check input parameters
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
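
Because acquire goes through acquireSharedInterruptibly, a thread that is blocked waiting for a permit can be cancelled by interrupting it. A minimal sketch, with illustrative class and method names:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptibleAcquireSketch {
    static boolean interruptedWhileWaiting() throws InterruptedException {
        Semaphore semaphore = new Semaphore(0); // no permits: acquire() must wait
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // the wait was abandoned, no permit is held
            }
        });
        waiter.start();
        // Wait until the thread is actually queued on the semaphore
        while (!semaphore.hasQueuedThreads()) Thread.sleep(1);
        waiter.interrupt();
        waiter.join();
        return sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupted while waiting: " + interruptedWhileWaiting());
    }
}
```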

acquireUninterruptibly

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

public void acquireUninterruptibly(int permits) {
    // Check input parameters
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
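
In contrast, acquireUninterruptibly keeps waiting when the thread is interrupted and only returns once a permit is available; per the Javadoc, the thread's interrupt status is set when the method returns. The sketch below (names are illustrative) checks both points.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class UninterruptibleAcquireSketch {
    // Returns {still waiting after the interrupt, interrupt status on return}.
    static boolean[] run() throws InterruptedException {
        Semaphore semaphore = new Semaphore(0);
        AtomicBoolean statusOnReturn = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            semaphore.acquireUninterruptibly();
            statusOnReturn.set(Thread.currentThread().isInterrupted());
        });
        waiter.start();
        while (!semaphore.hasQueuedThreads()) Thread.sleep(1);
        waiter.interrupt();   // does not abort the wait
        Thread.sleep(50);
        boolean stillWaiting = waiter.isAlive();
        semaphore.release();  // only a permit lets the thread proceed
        waiter.join();
        return new boolean[] { stillWaiting, statusOnReturn.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println("still waiting: " + r[0] + ", interrupt status: " + r[1]);
    }
}
```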

tryAcquire

public boolean tryAcquire() {
    // A non-negative return value means the permit was acquired
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(int permits) {
    // Check input parameters
    if (permits < 0) throw new IllegalArgumentException();
    // A non-negative return value means the permits were acquired
    return sync.nonfairTryAcquireShared(permits) >= 0;
}
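
Since both overloads call nonfairTryAcquireShared directly, tryAcquire ignores the queue even on a fair semaphore. The Javadoc suggests tryAcquire(0, TimeUnit.SECONDS) when the fairness setting should be honored. The following sketch (TryAcquireBargingSketch is an illustrative name) keeps one thread queued on a fair semaphore and compares the two calls.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TryAcquireBargingSketch {
    // Returns {result of tryAcquire(0, SECONDS), result of tryAcquire()}.
    static boolean[] run() throws InterruptedException {
        Semaphore semaphore = new Semaphore(2, true); // fair, 2 permits
        // This waiter asks for 3 permits but only 2 exist, so it stays queued.
        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire(3);
            } catch (InterruptedException e) {
                // expected: interrupted below to end the sketch
            }
        });
        waiter.start();
        while (!semaphore.hasQueuedThreads()) Thread.sleep(1);

        // Timed variant goes through the fair tryAcquireShared and sees the waiter
        boolean timedZero = semaphore.tryAcquire(0, TimeUnit.SECONDS);
        // Untimed variant skips the queue check and takes an available permit
        boolean untimed = semaphore.tryAcquire();

        waiter.interrupt();
        waiter.join();
        return new boolean[] { timedZero, untimed };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println("timed(0): " + r[0] + ", untimed: " + r[1]);
    }
}
```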

release

public void release() {
    sync.releaseShared(1);
}

public void release(int permits) {
    // Check input parameters
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

Other methods

public int availablePermits() {
    return sync.getPermits();
}

public int drainPermits() {
    return sync.drainPermits();
}

protected void reducePermits(int reduction) {
    // Check input parameters
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

Test

Demo

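import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class Demo {
    private final int parties = 5;
    // Two permits: at most two tasks hold the semaphore at the same time
    private final Semaphore semaphore = new Semaphore(2);


    public static void main(String[] args) throws InterruptedException {
        Demo demo = new Demo();
        ExecutorService es = Executors.newFixedThreadPool(demo.parties);

        for (int i = 0; i < demo.parties; i++) {
            es.execute(new SemaphoreTask(i, demo.semaphore));
        }
        // Stop accepting new tasks; the submitted tasks still run to completion
        es.shutdown();
    }


    private static class SemaphoreTask implements Runnable {

        private final Semaphore semaphore;
        private final int index;


        SemaphoreTask(int index, Semaphore semaphore) {
            this.semaphore = semaphore;
            this.index = index;
        }

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread:" + index + " waits to acquire semaphore");
                semaphore.acquire();
            } catch (InterruptedException e) {
                // No permit is held here, so there is nothing to release
                Thread.currentThread().interrupt();
                return;
            }
            try {
                System.out.println("Thread:" + index + " has acquired semaphore");
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always return the permit, even if the sleep was interrupted
                semaphore.release();
                System.out.println("Thread:" + index + " has released semaphore");
            }
        }
    }
}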

Test result (one possible run; the interleaving between threads can differ from run to run)

Thread:0 waits to acquire semaphore
Thread:0 has acquired semaphore
Thread:2 waits to acquire semaphore
Thread:2 has acquired semaphore
Thread:1 waits to acquire semaphore
Thread:3 waits to acquire semaphore
Thread:4 waits to acquire semaphore
Thread:0 has released semaphore
Thread:1 has acquired semaphore
Thread:2 has released semaphore
Thread:3 has acquired semaphore
Thread:1 has released semaphore
Thread:4 has acquired semaphore
Thread:3 has released semaphore
Thread:4 has released semaphore

Keywords: JUC

Added by the-botman on Sat, 19 Feb 2022 17:13:06 +0200