Semaphore
Semaphores were already in use before the monitor was proposed.
Semaphore model
The model consists of one counter, one waiting queue, and three methods. The counter and the waiting queue are hidden from the outside world, so they can only be accessed through the three methods the semaphore model provides: init(), down(), and up().
- init(): sets the initial value of the counter.
- down(): decrements the counter by 1; if the counter is then less than 0, the current thread is blocked, otherwise the current thread continues to execute.
- up(): increments the counter by 1; if the counter is then less than or equal to 0, one thread in the waiting queue is woken up and removed from the waiting queue.
All three methods are atomic, and that atomicity is guaranteed by the implementer of the semaphore model. In the Java SDK, the semaphore model is implemented by java.util.concurrent.Semaphore, which guarantees that its corresponding operations are atomic.
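As a rough illustration of the model, the sketch below uses java.util.concurrent.Semaphore with an initial value of 1 as a mutual-exclusion lock around a shared counter. The class name SemaphoreMutexDemo and the methods addOne and run are made up for this example; only Semaphore, acquire() and release() come from the Java SDK.

```java
import java.util.concurrent.Semaphore;

class SemaphoreMutexDemo {
    // One permit: at most one thread may be inside the critical section.
    private static final Semaphore mutex = new Semaphore(1);
    private static int count = 0;

    static void addOne() throws InterruptedException {
        mutex.acquire();          // plays the role of down(): take the permit or block
        try {
            count += 1;           // critical section
        } finally {
            mutex.release();      // plays the role of up(): give the permit back
        }
    }

    // Hypothetical helper: starts the workers, waits for them, returns the final count.
    static int run(int threads, int iterations) throws InterruptedException {
        count = 0;
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < iterations; j++) {
                        addOne();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```

Without the acquire()/release() pair, count += 1 would be a data race and the final value could come out lower than threads * iterations.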
The counter in plain terms
- The counter is initialized to the maximum number of requests allowed to run concurrently.
- Each time the server accepts a request, the counter is decremented by 1.
- When the counter equals zero, a new request enters the waiting queue and waits.
- Each time the server finishes processing a request, the counter is incremented by 1. Because every increment pairs with an earlier decrement, the counter never exceeds its initial value, which is the upper limit.
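The four steps above can be traced with java.util.concurrent.Semaphore. This is only a single-threaded sketch: the class name PermitCounterDemo and the method trace are invented for the example, and tryAcquire() is used instead of acquire() so that the refused third request returns false rather than blocking the demo.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class PermitCounterDemo {
    // Returns the available-permit count observed after each step.
    static int[] trace() {
        Semaphore sem = new Semaphore(2);      // limit: 2 concurrent requests
        int[] seen = new int[5];
        seen[0] = sem.availablePermits();      // initial value of the counter
        sem.tryAcquire();                      // request 1 accepted: counter - 1
        seen[1] = sem.availablePermits();
        sem.tryAcquire();                      // request 2 accepted: counter - 1
        seen[2] = sem.availablePermits();
        boolean third = sem.tryAcquire();      // counter is 0: acquire() would wait here
        seen[3] = third ? -1 : sem.availablePermits();
        sem.release();                         // request 1 finished: counter + 1
        seen[4] = sem.availablePermits();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(trace())); // prints [2, 1, 0, 0, 1]
    }
}
```

Note that java.util.concurrent.Semaphore does not itself cap the counter at its initial value; the cap holds only as long as each release() matches an earlier successful acquire.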
Implementing a semaphore by hand

```java
import java.util.Queue;

/**
 * Semaphore model (blocking and wake-up are left as comments)
 */
public class Semaphore {
    // Counter
    int count;
    // Waiting queue
    Queue<String> queue;

    // Initialization
    Semaphore(int count) {
        this.count = count;
    }

    void down() {
        this.count--;
        if (this.count < 0) {
            // Insert the current thread into the waiting queue
            // Block the current thread
        }
    }

    void up() {
        this.count++;
        // A count that is still <= 0 after the increment means threads are waiting
        if (this.count <= 0) {
            // Remove a thread T from the waiting queue
            // Wake up thread T
        }
    }
}
```
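The model code leaves blocking and wake-up as comments. One possible way to make it runnable, sketched here under a few assumptions, is to lean on Java's built-in synchronized/wait/notify: the object's own wait set stands in for the waiting queue, and the counter is kept non-negative (a thread waits while it is 0) rather than going below zero as in the model. The names SimpleSemaphore and maxObserved are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SimpleSemaphore {
    private int permits;

    SimpleSemaphore(int permits) {
        this.permits = permits;
    }

    synchronized void down() throws InterruptedException {
        // Loop, not if: a woken thread must re-check the counter before proceeding.
        while (permits == 0) {
            wait();               // releases the monitor and blocks the current thread
        }
        permits--;
    }

    synchronized void up() {
        permits++;
        notify();                 // wake one waiting thread, if there is one
    }

    // Hypothetical helper: reports the largest number of threads seen
    // between down() and up() at the same moment.
    static int maxObserved(int limit, int threads) throws InterruptedException {
        SimpleSemaphore sem = new SimpleSemaphore(limit);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    sem.down();
                    try {
                        max.accumulateAndGet(inside.incrementAndGet(), Math::max);
                        Thread.sleep(10);   // simulate some work
                    } finally {
                        inside.decrementAndGet();
                        sem.up();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return max.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(maxObserved(2, 8) <= 2); // prints true
    }
}
```

In real code java.util.concurrent.Semaphore is the better choice; this version only shows where the "block" and "wake up" comments of the model could map onto Java primitives.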
In the Java SDK's concurrency package (java.util.concurrent), down() and up() correspond to acquire() and release().
Implementing a limiter:

```java
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/**
 * Object pool that limits concurrent access with a semaphore
 */
public class ObjPool<T, R> {
    // Object pool
    final List<T> pool;
    // Semaphore
    final Semaphore sem;

    /**
     * Fills the object pool
     * @param size number of objects
     * @param t    the object to put into the pool
     */
    public ObjPool(int size, T t) {
        this.pool = new Vector<>();
        for (int i = 0; i < size; i++) {
            pool.add(t);
        }
        sem = new Semaphore(size);
    }

    R exec(Function<T, R> func) throws Exception {
        T t = null;
        sem.acquire(); // Counter - 1
        try {
            t = pool.remove(0); // Take an object out of the pool
            return func.apply(t);
        } finally {
            pool.add(t); // Put the object back
            sem.release(); // Counter + 1
        }
    }

    public static void main(String[] args) throws Exception {
        ObjPool<Long, String> pool = new ObjPool<>(10, 2L);
        pool.exec(t -> {
            System.out.println(t);
            return t.toString();
        });
    }
}
```
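The main method above calls exec from a single thread only. To see the limiter at work under contention, here is a small sketch with several threads sharing a pool of distinct objects. The names PoolLimiterDemo, Pool and run are invented for this example, and Pool is a compact stand-in with the same shape as ObjPool, not a replacement for it. Each pooled object is an AtomicBoolean used as an "in use" flag, so the demo can detect whether two threads ever hold the same object at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

class PoolLimiterDemo {
    // Hypothetical compact pool, same idea as ObjPool.
    static final class Pool<T, R> {
        private final List<T> items;      // Vector: several threads may be in exec() at once
        private final Semaphore sem;

        Pool(List<T> initial) {
            this.items = new Vector<>(initial);
            this.sem = new Semaphore(initial.size());
        }

        R exec(Function<T, R> func) throws InterruptedException {
            sem.acquire();                // wait until an object is free
            T t = items.remove(0);
            try {
                return func.apply(t);
            } finally {
                items.add(t);             // return the object before releasing the permit
                sem.release();
            }
        }
    }

    // Returns true if no pooled object was ever held by two threads at the same time.
    static boolean run(int poolSize, int threads, int rounds) throws InterruptedException {
        List<AtomicBoolean> flags = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            flags.add(new AtomicBoolean(false));
        }
        Pool<AtomicBoolean, Boolean> pool = new Pool<>(flags);
        AtomicBoolean clash = new AtomicBoolean(false);

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        pool.exec(flag -> {
                            if (!flag.compareAndSet(false, true)) {
                                clash.set(true);   // another thread already holds this object
                            }
                            Thread.yield();        // give other threads a chance to interleave
                            flag.set(false);
                            return Boolean.TRUE;
                        });
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return !clash.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3, 8, 1000)); // prints true
    }
}
```

Unlike a lock, the semaphore lets up to poolSize threads into exec() at the same time, which is why the backing list has to be thread-safe.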