Main contents: semaphores and monitors
Semaphore
Abstract data type of semaphore
- An integer sem with two atomic operations
- P(): decrement sem by one. If sem < 0, the caller waits
- V(): increment sem by one. If sem <= 0, wake up one waiting process
Semaphores are protected variables
- After initialization, the only way to change the value of a semaphore is through the P and V operations
- Both operations must be atomic
P can block; V never blocks
Usage scenarios of semaphores
- Mutual exclusion
- Condition synchronization
Use of semaphores
Mutual exclusion
    mutex = new Semaphore(1);
    mutex->P();
    ... // critical section
    mutex->V();
Scheduling constraints
    condition = new Semaphore(0);
    // Thread A
    ...
    condition->P(); // Block here until thread B has completed the required instructions
    ...
    // Thread B
    ...
    condition->V(); // Incrementing the semaphore wakes up thread A
    ...
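The two usage patterns above can be tried out in user space. The following is a minimal sketch, assuming a hypothetical `Semaphore` class built on `std::mutex` and `std::condition_variable` (unlike the abstract definition, its count never goes negative). The helper names `run_mutex_demo` and `run_ordering_demo` are illustrative only.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical user-space semaphore; the count never goes negative here.
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void P() {  // wait until the count is positive, then take one unit
        std::unique_lock<std::mutex> guard(m_);
        cv_.wait(guard, [this] { return count_ > 0; });
        --count_;
    }
    void V() {  // give one unit back and wake one waiter, if any
        {
            std::lock_guard<std::mutex> guard(m_);
            ++count_;
        }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

// Mutual exclusion: initial value 1, P()/V() bracket the critical section.
int run_mutex_demo(int per_thread) {
    Semaphore mutex(1);
    int counter = 0;
    std::vector<std::thread> workers;
    for (int t = 0; t < 4; ++t) {
        workers.emplace_back([&] {
            for (int i = 0; i < per_thread; ++i) {
                mutex.P();
                ++counter;  // critical section
                mutex.V();
            }
        });
    }
    for (auto& w : workers) w.join();
    return counter;
}

// Scheduling constraint: initial value 0, so A cannot pass P() before B calls V().
std::string run_ordering_demo() {
    Semaphore condition(0);
    std::string log;
    std::thread a([&] {
        condition.P();  // blocks until thread B has signalled
        log += "A";
    });
    std::thread b([&] {
        log += "B";
        condition.V();  // wakes up thread A
    });
    a.join();
    b.join();
    return log;
}
```

With four workers, `run_mutex_demo(1000)` should return 4000, and `run_ordering_demo()` should always return "BA" regardless of which thread starts first.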
Cooperation
Producer-consumer problem
Correctness requirements
- Only one thread can operate on the buffer at any time (mutual exclusion)
- When the buffer is empty, the consumer must wait for the producer (scheduling/synchronization constraint)
- When the buffer is full, the producer must wait for the consumer (scheduling/synchronization constraint)
class BoundedBuffer{
    mutex = new Semaphore(1);
    fullBuffers = new Semaphore(0);  // Number of filled slots: the buffer is initially empty
    emptyBuffers = new Semaphore(n); // Number of free slots: n deposits can complete before a producer blocks
};
BoundedBuffer::Deposit(c){
    emptyBuffers->P();
    mutex->P();
    Add c to the buffer;
    mutex->V();
    fullBuffers->V();
}
BoundedBuffer::Remove(c){
    fullBuffers->P();
    mutex->P();
    Remove c from the buffer;
    mutex->V();
    emptyBuffers->V();
}
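The pseudocode above can be turned into a runnable sketch. This version assumes a hypothetical user-space `Semaphore` (built on `std::mutex` and `std::condition_variable`), stores `int` items in a `std::deque`, and lets `Remove` return the item instead of taking a parameter. The helper `run_buffer_demo` is illustrative only.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical user-space semaphore; the count never goes negative here.
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void P() {
        std::unique_lock<std::mutex> guard(m_);
        cv_.wait(guard, [this] { return count_ > 0; });
        --count_;
    }
    void V() {
        {
            std::lock_guard<std::mutex> guard(m_);
            ++count_;
        }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

class BoundedBuffer {
public:
    explicit BoundedBuffer(int n)
        : mutex_(1), fullBuffers_(0), emptyBuffers_(n) {}

    void Deposit(int c) {
        emptyBuffers_.P();   // wait for a free slot
        mutex_.P();          // enter the critical section
        items_.push_back(c);
        mutex_.V();
        fullBuffers_.V();    // announce one more filled slot
    }

    int Remove() {
        fullBuffers_.P();    // wait for a filled slot
        mutex_.P();
        int c = items_.front();
        items_.pop_front();
        mutex_.V();
        emptyBuffers_.V();   // announce one more free slot
        return c;
    }

private:
    Semaphore mutex_;
    Semaphore fullBuffers_;
    Semaphore emptyBuffers_;
    std::deque<int> items_;
};

// One producer deposits 1..total, one consumer sums what it removes.
long run_buffer_demo(int total) {
    BoundedBuffer buffer(4);
    long sum = 0;
    std::thread producer([&] {
        for (int i = 1; i <= total; ++i) buffer.Deposit(i);
    });
    std::thread consumer([&] {
        for (int i = 0; i < total; ++i) sum += buffer.Remove();
    });
    producer.join();
    consumer.join();
    return sum;
}
```

The order of the P operations matters: if `Deposit` took `mutex_` before `emptyBuffers_`, a producer could block on a full buffer while holding the mutex, and the consumer could never free a slot.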
Semaphore implementation
Using hardware primitives
- Disabling interrupts
- Atomic instructions
class Semaphore{
    int sem;
    WaitQueue q;
};
Semaphore::P(){
    --sem;
    if(sem < 0){
        Add this thread t to q;
        block(t);
    }
};
Semaphore::V(){
    ++sem;
    if(sem <= 0){
        Remove a thread t from q;
        wakeup(t);
    }
}
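As a rough user-space analogue of the structure above, the sketch below keeps the same counting rule (sem may go negative, and its magnitude is then the number of waiters). It is not the kernel implementation: atomicity comes from a `std::mutex` instead of disabled interrupts or atomic instructions, the wait queue is replaced by a `std::condition_variable`, and the `wakeups_` counter is an added assumption that protects against spurious wakeups. `value()` and `run_semaphore_demo` are hypothetical helpers for inspection.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

class Semaphore {
public:
    explicit Semaphore(int initial) : sem_(initial) {}

    void P() {
        std::unique_lock<std::mutex> guard(m_);
        --sem_;
        if (sem_ < 0) {
            // Sleep until some V() has granted a wakeup to a waiter.
            cv_.wait(guard, [this] { return wakeups_ > 0; });
            --wakeups_;
        }
    }

    void V() {
        std::lock_guard<std::mutex> guard(m_);
        ++sem_;
        if (sem_ <= 0) {  // at least one thread was waiting
            ++wakeups_;
            cv_.notify_one();
        }
    }

    int value() {
        std::lock_guard<std::mutex> guard(m_);
        return sem_;
    }

private:
    std::mutex m_;                // stands in for "disable interrupts"
    std::condition_variable cv_;  // stands in for the wait queue q
    int sem_;
    int wakeups_ = 0;             // wakeups granted by V() but not yet consumed
};

// A waiter calls P() on a semaphore initialized to 0; the caller then calls V().
// Returns the semaphore value the waiter observes after it gets through P().
int run_semaphore_demo() {
    Semaphore s(0);
    int observed = -1;
    std::thread waiter([&] {
        s.P();
        observed = s.value();
    });
    s.V();
    waiter.join();
    return observed;
}
```

Whichever of P() and V() runs first, the waiter gets through exactly once and the value ends at 0.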
Monitor
Objective: to separate the concerns of mutual exclusion and conditional synchronization
A monitor can be understood as a further encapsulation of semaphores that manages a group of them
A monitor consists of
- A lock
- 0 or more condition variables
Usage:
- Collect the related shared data in an object or module
- Define methods that access the shared data
Lock
- Lock::Acquire(): wait until the lock is available, then acquire it
- Lock::Release(): release the lock
Condition Variable
- Allows waiting inside the critical section
- A thread may wait (sleep) while it is inside the critical section
- At some point, the thread atomically releases the lock and goes to sleep
- Wait() operation
- Release the lock, sleep, then reacquire the lock before returning
- Signal() operation (or Broadcast() operation)
- Wake up one waiter (or all waiters), if any
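The Wait()/Signal() behaviour described above corresponds closely to `std::condition_variable` in standard C++: `wait` atomically releases the lock and sleeps, then reacquires the lock before returning, and `notify_one` / `notify_all` play the roles of Signal() / Broadcast(). The following is a minimal sketch; the names `ready`, `data` and `run_wait_signal_demo` are made up for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// A waiter sleeps inside the critical section until the main thread
// publishes a value and signals. Returns the value the waiter saw.
int run_wait_signal_demo() {
    std::mutex lock;
    std::condition_variable ready_cv;
    bool ready = false;
    int data = 0;
    int seen = -1;

    std::thread waiter([&] {
        std::unique_lock<std::mutex> guard(lock);  // Acquire()
        while (!ready) {
            // Wait(): release the lock, sleep, reacquire the lock on return.
            ready_cv.wait(guard);
        }
        seen = data;  // still holding the lock here
    });                // guard's destructor performs Release()

    {
        std::lock_guard<std::mutex> guard(lock);
        data = 42;
        ready = true;
    }
    ready_cv.notify_one();  // Signal(): wake up one waiter, if any

    waiter.join();
    return seen;
}
```

The `while (!ready)` loop matters: the condition is rechecked after every wakeup, so the code stays correct even if the signal is sent before the waiter starts waiting or if a wakeup is spurious.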
Monitor implementation of the producer-consumer problem
Implementation of condition variables
class Condition{
    int numWaiting = 0;
    WaitQueue q;
};
Condition::Wait(lock){
    numWaiting++;
    Add this thread t to q;
    release(lock);
    schedule(); // need mutex
    acquire(lock);
}
Condition::Signal(){
    if(numWaiting > 0){
        Remove a thread t from q;
        wakeup(t); // need mutex
        numWaiting--;
    }
}
Producer-consumer
class BoundedBuffer{
    Lock lock;
    int count = 0; // buffer is empty
    Condition notFull, notEmpty;
};
BoundedBuffer::Deposit(c){
    lock.Acquire(); // Monitor rule: only one thread can be inside the monitor at a time
    while(count == n)
        notFull.Wait(&lock); // Releases the lock acquired above while waiting
    Add c to the buffer;
    count++;
    notEmpty.Signal();
    lock.Release();
}
BoundedBuffer::Remove(c){
    lock.Acquire();
    while(count == 0)
        notEmpty.Wait(&lock);
    Remove c from the buffer;
    count--;
    notFull.Signal();
    lock.Release();
}
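A runnable counterpart of this monitor can be sketched with `std::mutex` as the lock and two `std::condition_variable` objects as `notFull` and `notEmpty`. The item type (`int`), the `std::deque` storage, `Remove` returning the item, and the helper `run_monitor_buffer_demo` are assumptions made for the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t n) : capacity_(n) {}

    void Deposit(int c) {
        std::unique_lock<std::mutex> guard(lock_);  // enter the monitor
        while (items_.size() == capacity_) {
            notFull_.wait(guard);                   // releases lock_ while asleep
        }
        items_.push_back(c);
        notEmpty_.notify_one();
    }                                               // leaving scope releases lock_

    int Remove() {
        std::unique_lock<std::mutex> guard(lock_);
        while (items_.empty()) {
            notEmpty_.wait(guard);
        }
        int c = items_.front();
        items_.pop_front();
        notFull_.notify_one();
        return c;
    }

private:
    std::mutex lock_;
    std::condition_variable notFull_;
    std::condition_variable notEmpty_;
    std::deque<int> items_;  // items_.size() plays the role of count
    std::size_t capacity_;
};

// One producer deposits 1..total, one consumer sums what it removes.
long run_monitor_buffer_demo(int total) {
    BoundedBuffer buffer(4);
    long sum = 0;
    std::thread producer([&] {
        for (int i = 1; i <= total; ++i) buffer.Deposit(i);
    });
    std::thread consumer([&] {
        for (int i = 0; i < total; ++i) sum += buffer.Remove();
    });
    producer.join();
    consumer.join();
    return sum;
}
```

Compared with the semaphore version, mutual exclusion (the lock) and the waiting conditions (buffer full, buffer empty) are expressed separately, which is the stated objective of the monitor.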
Classical synchronization problems
Readers-writers problem
Constraints
- Multiple readers may read at the same time, but at most one writer may be active at any time
- When there is no writer, readers can access the data
- When there are no readers and no writers, a writer can access the data
- Only one thread can manipulate the shared variables at any time
Writer-priority design
Once a writer is ready, it performs its write as soon as possible. If writers keep arriving, readers stay blocked indefinitely
// writer
Database::Write(){
    Wait until no readers/writers;
    write database;
    check out - wake up waiting readers/writers;
}
// reader
Database::Read(){
    Wait until no writers;
    read database;
    check out - wake up waiting writers;
}
// Monitor implementation
AR = 0; // # of active readers
AW = 0; // # of active writers
WR = 0; // # of waiting readers
WW = 0; // # of waiting writers
Condition okToRead;
Condition okToWrite;
Lock lock;
// writer
public Database::Write(){
    // Wait until no readers/writers;
    StartWrite();
    write database;
    // check out - wake up waiting readers/writers;
    DoneWrite();
}
private Database::StartWrite(){
    lock.Acquire();
    while((AW + AR) > 0){
        WW++;
        okToWrite.Wait(&lock);
        WW--;
    }
    AW++;
    lock.Release();
}
private Database::DoneWrite(){
    lock.Acquire();
    AW--;
    if(WW > 0){
        okToWrite.Signal();
    }
    else if(WR > 0){
        okToRead.Broadcast(); // Wake up all readers
    }
    lock.Release();
}
// reader
public Database::Read(){
    // Wait until no writers;
    StartRead();
    read database;
    // check out - wake up waiting writers;
    DoneRead();
}
private Database::StartRead(){
    lock.Acquire();
    while(AW + WW > 0){ // Waiting writers are counted too, which is what gives writers priority
        WR++;
        okToRead.Wait(&lock);
        WR--;
    }
    AR++;
    lock.Release();
}
private Database::DoneRead(){
    lock.Acquire();
    AR--;
    if(AR == 0 && WW > 0){ // A writer needs to be woken only when the last reader has left
        okToWrite.Signal();
    }
    lock.Release();
}
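The writer-priority monitor above maps fairly directly onto `std::mutex` and `std::condition_variable`. The sketch below assumes the "database" is a single `int`, that `Write` adds a delta, and that `Read` returns the current value. These choices and the helper `run_database_demo` are illustrative and not part of the original design.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

class Database {
public:
    int Read() {
        StartRead();
        int v = value_;  // read database (other readers may be active too)
        DoneRead();
        return v;
    }

    void Write(int delta) {
        StartWrite();
        value_ += delta;  // write database (exclusive access)
        DoneWrite();
    }

private:
    void StartRead() {
        std::unique_lock<std::mutex> guard(lock_);
        while (AW_ + WW_ > 0) {  // waiting writers also block new readers
            ++WR_;
            okToRead_.wait(guard);
            --WR_;
        }
        ++AR_;
    }

    void DoneRead() {
        std::lock_guard<std::mutex> guard(lock_);
        --AR_;
        if (AR_ == 0 && WW_ > 0) okToWrite_.notify_one();  // last reader out
    }

    void StartWrite() {
        std::unique_lock<std::mutex> guard(lock_);
        while (AW_ + AR_ > 0) {
            ++WW_;
            okToWrite_.wait(guard);
            --WW_;
        }
        ++AW_;
    }

    void DoneWrite() {
        std::lock_guard<std::mutex> guard(lock_);
        --AW_;
        if (WW_ > 0) {
            okToWrite_.notify_one();  // writers first
        } else if (WR_ > 0) {
            okToRead_.notify_all();   // then all waiting readers
        }
    }

    std::mutex lock_;
    std::condition_variable okToRead_;
    std::condition_variable okToWrite_;
    int AR_ = 0;  // active readers
    int AW_ = 0;  // active writers
    int WR_ = 0;  // waiting readers
    int WW_ = 0;  // waiting writers
    int value_ = 0;
};

// Two writers each add 1 five hundred times while three readers keep reading.
int run_database_demo() {
    Database db;
    std::vector<std::thread> threads;
    for (int w = 0; w < 2; ++w) {
        threads.emplace_back([&] {
            for (int i = 0; i < 500; ++i) db.Write(1);
        });
    }
    for (int r = 0; r < 3; ++r) {
        threads.emplace_back([&] {
            for (int i = 0; i < 500; ++i) (void)db.Read();
        });
    }
    for (auto& t : threads) t.join();
    return db.Read();
}
```

Because writes are exclusive, no increment is lost and the final value equals the number of writes.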
Dining philosophers problem
Semaphore implementation
#define N 5
#define LEFT ((i + N - 1) % N)  // left neighbor
#define RIGHT ((i + 1) % N)     // right neighbor
#define THINKING 0
#define HUNGRY 1
#define EATING 2
typedef int semaphore;
int state[N];        // Tracks the state of each philosopher
semaphore mutex = 1; // The state array is the critical section; modifying it requires mutual exclusion
semaphore s[N];      // One semaphore per philosopher
void check(int i);
void philosopher(int i)
{
    while(TRUE) {
        think(i);
        take_two(i);
        eat(i);
        put_two(i);
    }
}
void take_two(int i)
{
    down(&mutex);
    state[i] = HUNGRY;
    check(i);
    up(&mutex);
    down(&s[i]); // Start eating only after being notified; otherwise keep waiting
}
void put_two(int i)
{
    down(&mutex);
    state[i] = THINKING;
    check(LEFT);  // Tell the neighbors that this philosopher has finished, so they may start eating
    check(RIGHT);
    up(&mutex);
}
void eat(int i)
{
    down(&mutex);
    state[i] = EATING;
    up(&mutex);
}
// If philosopher i is hungry and neither neighbor is eating, mark i as eating
// and call up(&s[i]) so that the pending down(&s[i]) in take_two can continue
void check(int i)
{
    if(state[i] == HUNGRY && state[LEFT] != EATING && state[RIGHT] != EATING) {
        state[i] = EATING;
        up(&s[i]);
    }
}
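The scheme above can be exercised with real threads. The following sketch assumes a hypothetical user-space `Semaphore` (built on `std::mutex` and `std::condition_variable`) in place of `down`/`up`, wraps the shared state in a `Table` class, and replaces `think`/`eat` with a shared meal counter. All of these names are illustrative.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical user-space semaphore; P() is down, V() is up.
class Semaphore {
public:
    Semaphore(int initial = 0) : count_(initial) {}
    void P() {
        std::unique_lock<std::mutex> guard(m_);
        cv_.wait(guard, [this] { return count_ > 0; });
        --count_;
    }
    void V() {
        {
            std::lock_guard<std::mutex> guard(m_);
            ++count_;
        }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

constexpr int N = 5;
enum State { THINKING, HUNGRY, EATING };

class Table {
public:
    void take_two(int i) {
        mutex_.P();
        state_[i] = HUNGRY;
        check(i);      // grab both forks if neither neighbor is eating
        mutex_.V();
        s_[i].P();     // block here until the forks have been granted
    }
    void put_two(int i) {
        mutex_.P();
        state_[i] = THINKING;
        check(left(i));   // a hungry neighbor may now be able to eat
        check(right(i));
        mutex_.V();
    }
private:
    static int left(int i) { return (i + N - 1) % N; }
    static int right(int i) { return (i + 1) % N; }
    void check(int i) {
        if (state_[i] == HUNGRY &&
            state_[left(i)] != EATING && state_[right(i)] != EATING) {
            state_[i] = EATING;
            s_[i].V();
        }
    }
    Semaphore mutex_{1};   // protects state_
    Semaphore s_[N];       // one semaphore per philosopher, initially 0
    State state_[N] = {};  // everyone starts THINKING
};

// Each of the N philosophers eats meals_each times; returns the total meal count.
int run_dinner(int meals_each) {
    Table table;
    std::atomic<int> meals{0};
    std::vector<std::thread> philosophers;
    for (int i = 0; i < N; ++i) {
        philosophers.emplace_back([&, i] {
            for (int m = 0; m < meals_each; ++m) {
                table.take_two(i);
                ++meals;  // eat
                table.put_two(i);
            }
        });
    }
    for (auto& p : philosophers) p.join();
    return meals.load();
}
```

A philosopher never holds one fork while waiting for the other: both are granted together inside `check`, so the circular wait that causes deadlock cannot form.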