Implementing a Rate Limiter with a Semaphore

Semaphore

Semaphores were in use before the monitor was proposed.

Semaphore model

The model consists of one counter, one waiting queue, and three methods. The counter and the waiting queue are hidden from the outside world, so they can only be accessed through the three methods the semaphore model provides: init(), down(), and up().

  • init(): Sets the initial value of the counter.
  • down(): Decreases the counter by 1. If the counter is then less than 0, the current thread is blocked; otherwise the current thread continues to execute.
  • up(): Increases the counter by 1. If the counter is then less than or equal to 0, one thread in the waiting queue is woken up and removed from the queue.

All three methods are atomic, and guaranteeing this is the job of whoever implements the semaphore model. In the Java SDK, the model is implemented by java.util.concurrent.Semaphore, which guarantees that these operations are atomic.

The counter in plain terms
  1. The counter holds the number of concurrent requests the server is configured to allow.
  2. Each time the server accepts a request, the counter decreases by 1.
  3. When the counter reaches zero, further requests enter the waiting queue and wait.
  4. Each time the server finishes processing a request, the counter increases by 1. The initial value is the upper limit: every increase follows an earlier decrease, so the counter never exceeds its initial value.
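The four steps above can be traced with java.util.concurrent.Semaphore. This is a minimal sketch, not part of the original article: the class name CounterWalkthrough and the limit of 2 are made up for illustration. It uses acquireUninterruptibly(), the variant of acquire() that does not throw InterruptedException, and tryAcquire(), which reports whether a request would be admitted without blocking. Note that availablePermits() in the Java class never drops below zero; waiting threads are tracked separately.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class CounterWalkthrough {
    // Returns the values observed after each step (hypothetical helper).
    static int[] run() {
        Semaphore sem = new Semaphore(2);        // step 1: allow 2 concurrent requests
        int[] seen = new int[4];

        sem.acquireUninterruptibly();            // step 2: first request accepted
        seen[0] = sem.availablePermits();        // counter is now 1

        sem.acquireUninterruptibly();            // step 2: second request accepted
        seen[1] = sem.availablePermits();        // counter is now 0

        // step 3: a third request would have to wait; tryAcquire() shows this
        // without blocking the current thread
        seen[2] = sem.tryAcquire() ? 1 : 0;      // 0 means "not admitted"

        sem.release();                           // step 4: one request finished
        seen[3] = sem.availablePermits();        // counter is back to 1
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [1, 0, 0, 1]
    }
}
```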

Implementing a semaphore by hand

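import java.util.LinkedList;
import java.util.Queue;

/**
 * Semaphore model: one counter, one waiting queue, and the operations down() and up().
 * Blocking and wake-up are left as comments; a real implementation must also make
 * down() and up() atomic.
 **/
public class Semaphore {
    // Counter
    int count;
    // Waiting queue of blocked threads
    final Queue<Thread> queue = new LinkedList<>();

    // Initialization: set the initial value of the counter
    Semaphore(int count) {
        this.count = count;
    }

    void down() {
        this.count--;
        if (this.count < 0) {
            // Insert the current thread into the waiting queue
            // Block the current thread
        }
    }

    void up() {
        this.count++;
        if (this.count <= 0) {
            // Remove a thread T from the waiting queue
            // Wake up thread T
        }
    }
}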
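The listing above leaves blocking and wake-up as comments. One way to fill them in is sketched here, under stated assumptions: the class name SimpleSemaphore and the helper maxConcurrent are hypothetical, and this is a common variant in which the counter never goes negative. The object's built-in monitor (synchronized, wait, notify) makes down() and up() atomic, and its wait set plays the role of the waiting queue.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SimpleSemaphore {
    // Number of permits currently available; never negative in this variant
    private int count;

    SimpleSemaphore(int count) {
        this.count = count;
    }

    // Blocks while no permit is available, then takes one
    synchronized void down() throws InterruptedException {
        while (count == 0) {
            wait();              // releases the monitor and joins its wait set
        }
        count--;
    }

    // Returns a permit and wakes one waiting thread, if any
    synchronized void up() {
        count++;
        notify();
    }

    // Runs `threads` workers through a section guarded by `permits` permits
    // and returns the highest number of workers seen inside it at once.
    static int maxConcurrent(int permits, int threads) {
        SimpleSemaphore sem = new SimpleSemaphore(permits);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    sem.down();
                    try {
                        int now = running.incrementAndGet();
                        max.accumulateAndGet(now, Math::max);
                        Thread.sleep(20);    // simulate work
                    } finally {
                        running.decrementAndGet();
                        sem.up();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        // The result can never exceed the number of permits (2 here)
        System.out.println("max concurrent: " + maxConcurrent(2, 5));
    }
}
```

The while loop around wait() matters: a woken thread re-checks the counter before proceeding, which also covers spurious wake-ups.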

In the Java SDK's concurrency package, down() and up() correspond to acquire() and release().

Implementing a rate limiter:

import java.util.List;
import java.util.Vector;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/**
 * Object pool guarded by a semaphore: at most `size` threads can use
 * pooled objects at the same time.
 **/
public class ObjPool<T, R> {
    // Object pool
    final List<T> pool;
    // Semaphore that limits concurrent access
    final Semaphore sem;

    /**
     * Fills the object pool.
     * @param size number of objects in the pool
     * @param t    object to add; the same reference is added size times
     */
    public ObjPool(int size, T t) {
        this.pool = new Vector<>();
        for (int i = 0; i < size; i++) {
            pool.add(t);
        }
        sem = new Semaphore(size);
    }

    R exec(Function<T, R> func) throws Exception {
        T t = null;
        sem.acquire(); // Counter - 1; blocks while no permit is available
        try {
            t = pool.remove(0); // Take an object from the pool
            return func.apply(t);
        } finally {
            if (t != null) {
                pool.add(t); // Return the object to the pool
            }
            sem.release(); // Counter + 1
        }
    }

    public static void main(String[] args) throws Exception {
        ObjPool<Long, String> pool = new ObjPool<>(10, 2L);
        pool.exec(t -> {
            System.out.println(t);
            return t.toString();
        });
    }
}
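The main method above makes a single call, so the limiter never has to hold anyone back. The following sketch exercises the same idea from several threads. It is an illustration under assumptions, not the author's code: PoolDemo, Pool, countConflicts, and pause are hypothetical names, and the pool is filled with distinct Integer ids instead of one shared object so that it is possible to check that no id is ever handed to two threads at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class PoolDemo {
    // Compact pool modelled on ObjPool, but filled with distinct objects
    static class Pool<T, R> {
        private final List<T> items;
        private final Semaphore sem;

        Pool(List<T> initial) {
            this.items = new Vector<>(initial);
            this.sem = new Semaphore(initial.size());
        }

        R exec(Function<T, R> func) throws InterruptedException {
            sem.acquire();                   // wait until an object is free
            T t = null;
            try {
                t = items.remove(0);         // take an object from the pool
                return func.apply(t);
            } finally {
                if (t != null) {
                    items.add(t);            // give the object back
                }
                sem.release();
            }
        }
    }

    // Runs `tasks` threads against a pool of `poolSize` ids and counts how
    // often an id was handed out while another thread still held it.
    static int countConflicts(int poolSize, int tasks) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add(i);
        }
        Pool<Integer, Boolean> pool = new Pool<>(ids);
        Set<Integer> inUse = ConcurrentHashMap.newKeySet();
        AtomicInteger conflicts = new AtomicInteger();

        Thread[] workers = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            workers[i] = new Thread(() -> {
                try {
                    pool.exec(id -> {
                        if (!inUse.add(id)) {    // id already held by someone else
                            conflicts.incrementAndGet();
                        }
                        pause(10);               // simulate work
                        inUse.remove(id);
                        return true;
                    });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return conflicts.get();
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("conflicts: " + countConflicts(3, 10)); // prints conflicts: 0
    }
}
```

Because a thread only removes an object after acquiring a permit, and only releases the permit after putting the object back, the list is never empty when remove(0) runs and no object is shared between two callers.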


Added by pt4siek on Sat, 05 Oct 2019 20:49:41 +0300