[Linux] Multithreading (producer-consumer model, POSIX semaphores)

Producer consumer model

Why use the producer-consumer model

The producer-consumer model removes the tight coupling between producers and consumers by placing a container between them. Producers and consumers do not communicate directly; they communicate through a blocking queue. After producing data, a producer does not wait for a consumer to process it but simply puts it into the blocking queue. A consumer does not ask a producer for data but takes it directly from the blocking queue. The blocking queue acts as a buffer that balances the processing capacity of producers and consumers, and it is what decouples the two sides.

Here is a common real-life example:
A manufacturer arranges with a supermarket to deliver goods to it. When consumers see that the supermarket has what they want, they go there to buy it. If the supermarket is out of stock, consumers let the supermarket know, and the supermarket urges the manufacturer to deliver more.

Advantages of the producer-consumer model

  • Decoupling
  • Support for concurrency
  • Tolerance of uneven load (producers and consumers can be busy or idle at different times)


The model can be understood more intuitively through a diagram:

In Linux, producers and consumers are generally represented by threads or processes.

In this picture, we can clearly see three kinds of relationship:

1. Producer and producer
2. Consumer and consumer
3. Consumer and producer

  • First relationship:
    • Producers are mutually exclusive. While one brand is being put on the shelf, another brand has to queue until the first one has finished, and only then takes its turn.
  • Second relationship:
    • The same as the first: consumers are mutually exclusive with one another.
  • Third relationship:
    • This can be understood as synchronization. When the queue is empty or full, the two sides signal each other, which is done with what we call condition variables.
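
The mutual exclusion between producers can be sketched with a pthread mutex. This is only an illustration: the names `shelf_lock`, `shelf_count`, `stock_shelf` and `run_two_producers` are hypothetical and do not appear in the code later in this article.

```cpp
#include <pthread.h>

// Hypothetical shared "shelf" counter protected by one mutex.
static pthread_mutex_t shelf_lock = PTHREAD_MUTEX_INITIALIZER;
static long shelf_count = 0;

// Each producer puts 100000 items on the shelf, one at a time.
static void *stock_shelf(void *)
{
  for (int i = 0; i < 100000; ++i)
  {
    pthread_mutex_lock(&shelf_lock);    // only one producer may touch the shelf
    ++shelf_count;
    pthread_mutex_unlock(&shelf_lock);  // let the next producer in
  }
  return NULL;
}

// Runs two producers concurrently and returns the final item count.
long run_two_producers()
{
  shelf_count = 0;
  pthread_t a, b;
  pthread_create(&a, NULL, stock_shelf, NULL);
  pthread_create(&b, NULL, stock_shelf, NULL);
  pthread_join(a, NULL);
  pthread_join(b, NULL);
  return shelf_count;
}
```

With the lock in place, `run_two_producers()` returns 200000. Without it, the two increments could interleave and some updates could be lost.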

Producer consumer model based on BlockingQueue

BlockingQueue

In multithreaded programming, a blocking queue is a data structure commonly used to implement the producer-consumer model. It differs from an ordinary queue in two ways: when the queue is empty, an operation that takes an element blocks until an element is put into the queue; when the queue is full, an operation that stores an element blocks until an element is taken out. (These operations are performed by different threads; it is the thread operating on the blocking queue that blocks.)
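
The blocking behaviour described above can be sketched as follows. Note that this sketch deliberately uses C++11 `std::mutex` and `std::condition_variable` instead of the pthread calls used in the rest of this article, and that `BoundedQueue` and `sum_through_queue` are hypothetical names introduced only for illustration.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded queue: Push blocks while full, Pop blocks while empty.
class BoundedQueue
{
public:
  explicit BoundedQueue(std::size_t cap) : cap_(cap) {}

  void Push(int v)
  {
    std::unique_lock<std::mutex> lk(m_);
    // Sleeps until there is room; the predicate is re-checked on every wakeup.
    not_full_.wait(lk, [this] { return q_.size() < cap_; });
    q_.push(v);
    not_empty_.notify_one();
  }

  int Pop()
  {
    std::unique_lock<std::mutex> lk(m_);
    // Sleeps until there is at least one element.
    not_empty_.wait(lk, [this] { return !q_.empty(); });
    int v = q_.front();
    q_.pop();
    not_full_.notify_one();
    return v;
  }

private:
  std::queue<int> q_;
  std::size_t cap_;
  std::mutex m_;
  std::condition_variable not_full_, not_empty_;
};

// Sends 1..n through a queue of capacity 2 and returns the sum the consumer saw.
int sum_through_queue(int n)
{
  BoundedQueue bq(2);
  int sum = 0;
  std::thread consumer([&] { for (int i = 0; i < n; ++i) sum += bq.Pop(); });
  std::thread producer([&] { for (int i = 1; i <= n; ++i) bq.Push(i); });
  producer.join();
  consumer.join();
  return sum;
}
```

Because the capacity is only 2, the producer is forced to block repeatedly, yet `sum_through_queue(100)` still returns 5050: no element is lost or duplicated.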

Hands-on demonstration

Thread execution code

**The consumer takes data from the queue and prints it**

**The producer generates data and puts it into the queue**

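// Requires <iostream>, <unistd.h> and <pthread.h>, "using namespace std;",
// and the BlockQueue class shown in the next section.

// Consumer thread: repeatedly takes a value from the queue and prints it.
void *consumer_run(void *arg)
{
  BlockQueue *bq = (BlockQueue*)arg;

  while(true)
  {
    int n=0;
    bq->Get(n);        // blocks while the queue is empty
    cout<<n<<endl;
  }
  return NULL;         // not reached; keeps the signature honest
}

// Producer thread: puts the values 1..5 into the queue over and over.
void *productor_run(void* arg)
{
  sleep(1);            // give the consumer time to start first
  BlockQueue *bq=(BlockQueue*)arg;
  int count=0;
  while(true)
  {
    count=count%5+1;   // cycles through 1, 2, 3, 4, 5
    bq->Put(count);    // blocks while the queue is full
    cout<<count<<endl;
    sleep(1);
  }
  return NULL;         // not reached
}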

BlockQueue creation

First of all, for consumers and producers to signal each other we need two condition variables, because their relationship is one of synchronization. To keep every queue operation atomic, we also have to protect the queue with a lock (a mutex).

When the queue is empty, the consumer signals the producer and then waits.
When the queue is full, the producer signals the consumer and then waits.
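
The general pattern for waiting on a condition variable can be sketched as below. It is a minimal illustration, not the queue itself: `ready`, `payload`, `setter` and `wait_for_payload` are hypothetical names. The important detail is that the waiter re-checks its condition in a `while` loop, because `pthread_cond_wait` is allowed to return spuriously.

```cpp
#include <pthread.h>

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ready_cond = PTHREAD_COND_INITIALIZER;
static bool ready = false;   // the condition, always accessed under mtx
static int payload = 0;

// Publishes a value and signals the waiter.
static void *setter(void *)
{
  pthread_mutex_lock(&mtx);
  payload = 42;
  ready = true;
  pthread_cond_signal(&ready_cond);
  pthread_mutex_unlock(&mtx);
  return NULL;
}

// Waits until the setter thread has published its value, then returns it.
int wait_for_payload()
{
  ready = false;
  payload = 0;
  pthread_t t;
  pthread_create(&t, NULL, setter, NULL);

  pthread_mutex_lock(&mtx);
  while (!ready)                           // re-check after every wakeup
    pthread_cond_wait(&ready_cond, &mtx);  // releases mtx while blocked, re-acquires on return
  int v = payload;
  pthread_mutex_unlock(&mtx);

  pthread_join(t, NULL);
  return v;
}
```

If the setter runs before the waiter takes the lock, `ready` is already true and the loop body is skipped, so the signal cannot be "missed".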

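#include <cstdio>
#include <queue>
#include <pthread.h>

class BlockQueue
{
  private:
    std::queue<int> q;       // underlying storage
    size_t cap;              // maximum number of elements
    pthread_mutex_t lock;    // protects q
    pthread_cond_t c_cond;   // the consumer waits here while the queue is empty
    pthread_cond_t p_cond;   // the producer waits here while the queue is full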
    bool Isempty()
    {
      return q.empty();
    }
    bool IsFull()
    {
      return cap==q.size();
    }
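    // Note: "Protector" in these names refers to the producer.
    void WakeUpProtector()
    {
      printf("wake up producer\n");
      pthread_cond_signal(&p_cond);
    }
    void WakeUpComsumer()
    {
      printf("wake up consumer\n");
      pthread_cond_signal(&c_cond);
    }
    // Both wait functions must be called with the lock held;
    // pthread_cond_wait releases it while blocked and re-acquires it on return.
    void ProtectorWait()
    {
      printf("producer waits\n");
      pthread_cond_wait(&p_cond,&lock);
    }
    void ConsumerWait()
    {
      printf("consumer waits\n");
      pthread_cond_wait(&c_cond,&lock);
    }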
    void LockQueue()
    {
      pthread_mutex_lock(&lock);
    }
    void UnLockQueue()
    {
      pthread_mutex_unlock(&lock);
    }
  public:
    BlockQueue(size_t _cap)
      :cap(_cap)
    {
      pthread_mutex_init(&lock,NULL);
      pthread_cond_init(&c_cond,NULL);
      pthread_cond_init(&p_cond,NULL);
    }
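    void Put(int t)
    {
      LockQueue();
      // while, not if: pthread_cond_wait may wake up spuriously,
      // so the condition has to be re-checked after every wakeup.
      while(IsFull())
      {
        WakeUpComsumer();   // the queue is full: let the consumer drain it
        ProtectorWait();
      }
      q.push(t);
      UnLockQueue();
    }
    void Get(int &t)
    {
      LockQueue();
      while(Isempty())
      {
        WakeUpProtector();  // the queue is empty: let the producer refill it
        ConsumerWait();
      }
      t=q.front();
      q.pop();
      UnLockQueue();
    }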
    ~BlockQueue()
    {
      pthread_mutex_destroy(&lock);
      pthread_cond_destroy(&c_cond);
      pthread_cond_destroy(&p_cond);

    }
}; 

Result presentation:
The results show that when the consumer finds the queue empty, it notifies the producer and blocks; when the producer finds the queue full, it signals the consumer and blocks. Neither thread busy-waits while it has nothing to do, which improves efficiency.

POSIX semaphore

Basic concepts and creation

POSIX semaphores serve the same purpose as System V semaphores: both are used for synchronization, so that shared resources can be accessed without conflict. POSIX semaphores, however, can also be used for synchronization between threads.

Initializing a semaphore

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
Parameters:
sem: the semaphore to initialize
pshared: 0 means the semaphore is shared between the threads of one process; non-zero means it is shared between processes
value: initial value of the semaphore

Destroy semaphore

int sem_destroy(sem_t *sem);
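
As a minimal sketch of the two calls above, the following function initializes a thread-shared semaphore, reads its counter back with `sem_getvalue`, and destroys it. The function name `initial_semaphore_value` is hypothetical.

```cpp
#include <semaphore.h>

// Initializes a semaphore with the given value, reads the counter back,
// destroys the semaphore, and returns what was read (-1 on failure).
int initial_semaphore_value(unsigned int value)
{
  sem_t sem;
  if (sem_init(&sem, 0, value) != 0)   // pshared = 0: shared between threads only
    return -1;
  int current = 0;
  sem_getvalue(&sem, &current);        // current counter value
  sem_destroy(&sem);                   // release the semaphore's resources
  return current;
}
```

For example, `initial_semaphore_value(3)` returns 3.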

Waiting on a semaphore

Function: waits on the semaphore and decrements its value by 1. If the value is already 0, the call blocks until another thread posts.
int sem_wait(sem_t *sem); //P()

Releasing a semaphore

Function: releases the semaphore, indicating that the resource is no longer in use and can be handed back. The semaphore value is incremented by 1.
int sem_post(sem_t *sem);//V()
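
The effect of P and V on the counter can be checked in a single thread. In this sketch (the name `pv_demo` is hypothetical), `sem_trywait` stands in for a third `sem_wait`: instead of blocking when the counter is 0, it fails with `EAGAIN`.

```cpp
#include <cerrno>
#include <semaphore.h>

// Returns true if the counter behaves as described for P and V.
bool pv_demo()
{
  sem_t sem;
  sem_init(&sem, 0, 2);   // two units of the resource
  sem_wait(&sem);         // P: 2 -> 1
  sem_wait(&sem);         // P: 1 -> 0
  // A third sem_wait would block here; sem_trywait reports EAGAIN instead.
  bool would_block = (sem_trywait(&sem) == -1 && errno == EAGAIN);
  sem_post(&sem);         // V: 0 -> 1
  int value = 0;
  sem_getvalue(&sem, &value);
  sem_destroy(&sem);
  return would_block && value == 1;
}
```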

A semaphore is essentially a counter that records how many units of a critical resource are currently available.

The critical resource can be treated as several independent units. Threads that use different units do not conflict, which improves efficiency.

This is equivalent to dividing the critical section into multiple slots. When a thread comes in and occupies a slot, the counter is decremented, which corresponds to the P() operation. When a thread leaves, the counter is incremented (++), which corresponds to the V() operation. When all the slots are occupied, threads that arrive later wait until a slot becomes free.
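
The "slots" idea can be sketched by letting several threads compete for a semaphore and recording how many are inside at once. All names here (`slots`, `visitor`, `max_concurrent_visitors`) are hypothetical, and the 10 ms pause only makes the overlap visible.

```cpp
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

static sem_t slots;                                    // counts free slots
static pthread_mutex_t stat_lock = PTHREAD_MUTEX_INITIALIZER;
static int inside = 0, max_inside = 0;                 // statistics, under stat_lock

static void *visitor(void *)
{
  sem_wait(&slots);                    // P: take a slot, or wait for one
  pthread_mutex_lock(&stat_lock);
  if (++inside > max_inside) max_inside = inside;
  pthread_mutex_unlock(&stat_lock);

  usleep(10000);                       // pretend to use the resource

  pthread_mutex_lock(&stat_lock);
  --inside;
  pthread_mutex_unlock(&stat_lock);
  sem_post(&slots);                    // V: give the slot back
  return NULL;
}

// Starts up to 16 threads and returns the largest number seen inside at once.
int max_concurrent_visitors(int threads, unsigned int capacity)
{
  inside = max_inside = 0;
  sem_init(&slots, 0, capacity);
  pthread_t tids[16];
  if (threads > 16) threads = 16;
  for (int i = 0; i < threads; ++i) pthread_create(&tids[i], NULL, visitor, NULL);
  for (int i = 0; i < threads; ++i) pthread_join(tids[i], NULL);
  sem_destroy(&slots);
  return max_inside;
}
```

With six threads and a capacity of 2, the returned maximum never exceeds 2: the late threads wait in `sem_wait` until a slot is posted back.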

Producer-consumer model based on a ring queue

The ring queue relies on how a ring (circular) structure works. If you have forgotten what Edison analyzed before, you can follow the link below to review it.
Link: Joseph Ring.
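
The only ring property the queue needs is that an index wraps back to 0 after the last slot. A small sketch of that arithmetic, with the hypothetical helpers `next_index` and `fill_ring`:

```cpp
#include <cstddef>
#include <vector>

// Advances a ring index by one slot, wrapping around at cap.
std::size_t next_index(std::size_t i, std::size_t cap)
{
  return (i + 1) % cap;
}

// Writes the values 0..n-1 into a ring of cap slots (later values overwrite
// earlier ones once the index wraps) and returns the final slot contents.
std::vector<int> fill_ring(int n, std::size_t cap)
{
  std::vector<int> ring(cap, -1);
  std::size_t pos = 0;
  for (int v = 0; v < n; ++v)
  {
    ring[pos] = v;
    pos = next_index(pos, cap);
  }
  return ring;
}
```

For example, `fill_ring(6, 4)` ends with the slots holding 4, 5, 2, 3: the fifth and sixth writes wrapped around to slots 0 and 1. In the real queue the semaphores prevent such overwriting, because a producer may only write after it has obtained an empty slot.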

Simulation code implementation

#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<semaphore.h>
#include<stdlib.h>
#include<vector>
using namespace std;
class RingQueue
{
private:
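  vector<int> q;       // ring storage
  int _cap;            // number of slots
  sem_t data_sem;      // counts filled slots (what the consumer waits for)
  sem_t blank_sem;     // counts empty slots (what the producer waits for)
  int consumer_step;   // index of the next slot to read
  int product_step;    // index of the next slot to write
public:
  RingQueue(int cap=10)
    :q(cap),_cap(cap)
  {
    sem_init(&data_sem,0,0);       // no data at the start
    sem_init(&blank_sem,0,_cap);   // every slot is empty at the start
    consumer_step=0;
    product_step=0;
  }
  void PutData(const int &data)
  {
    sem_wait(&blank_sem);          // P(blank): wait for an empty slot
    q[product_step]=data;          // the producer writes at its own index
    product_step++;
    product_step%=_cap;            // wrap around at the end of the ring
    sem_post(&data_sem);           // V(data): one more item to consume
  }
  void GetData(int &data)
  {
    sem_wait(&data_sem);           // P(data): wait for an item
    data=q[consumer_step];         // the consumer reads at its own index
    consumer_step++;
    consumer_step%=_cap;
    sem_post(&blank_sem);          // V(blank): one more empty slot
  }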
  ~RingQueue()
  {
    sem_destroy(&data_sem);
    sem_destroy(&blank_sem);
  }

};

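// Consumer thread: takes one item per second and prints it.
void* consumer(void* arg)
{
  RingQueue* q=(RingQueue*)arg;
  int data;
  while(1)
  {
    q->GetData(data);
    cout<<"consume data done:"<<data<<endl;
    sleep(1);
  }
  return NULL;   // not reached
}
// Producer thread: puts one random value in 0..9 into the ring per second.
void* productor(void* arg)
{
  RingQueue *q = (RingQueue*)arg;
  while(1)
  {
    int data=rand()%10;
    q->PutData(data);
    cout<<"produce data done:"<<data<<endl;
    sleep(1);
  }
  return NULL;   // not reached
}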
int main()
{
  RingQueue rq;
  pthread_t c,p;
  pthread_create(&c,NULL,consumer,(void*)&rq);
  pthread_create(&p,NULL,productor,(void*)&rq);
  pthread_join(c,NULL);
  pthread_join(p,NULL);
}
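
The ring queue above has one index per side, so it is intended for exactly one producer thread and one consumer thread. A possible extension to several producers and consumers is sketched below, under the assumption that each index is additionally protected by its own mutex. `MpmcRing`, `produce50`, `consume50` and `run_mpmc` are hypothetical names, not part of the original code.

```cpp
#include <pthread.h>
#include <semaphore.h>
#include <vector>

// Hypothetical ring queue for several producers and several consumers.
class MpmcRing
{
  std::vector<int> slots;
  int cap;
  sem_t data_sem, blank_sem;        // filled / empty slot counters
  pthread_mutex_t p_lock, c_lock;   // protect p_pos and c_pos respectively
  int p_pos, c_pos;
public:
  explicit MpmcRing(int n) : slots(n), cap(n), p_pos(0), c_pos(0)
  {
    sem_init(&data_sem, 0, 0);
    sem_init(&blank_sem, 0, n);
    pthread_mutex_init(&p_lock, NULL);
    pthread_mutex_init(&c_lock, NULL);
  }
  ~MpmcRing()
  {
    sem_destroy(&data_sem);
    sem_destroy(&blank_sem);
    pthread_mutex_destroy(&p_lock);
    pthread_mutex_destroy(&c_lock);
  }
  void Put(int v)
  {
    sem_wait(&blank_sem);           // reserve an empty slot before locking
    pthread_mutex_lock(&p_lock);    // producers take turns at the write index
    slots[p_pos] = v;
    p_pos = (p_pos + 1) % cap;
    pthread_mutex_unlock(&p_lock);
    sem_post(&data_sem);
  }
  int Get()
  {
    sem_wait(&data_sem);            // reserve a filled slot before locking
    pthread_mutex_lock(&c_lock);    // consumers take turns at the read index
    int v = slots[c_pos];
    c_pos = (c_pos + 1) % cap;
    pthread_mutex_unlock(&c_lock);
    sem_post(&blank_sem);
    return v;
  }
};

static MpmcRing ring(4);
static pthread_mutex_t sum_lock = PTHREAD_MUTEX_INITIALIZER;
static long total = 0;

static void *produce50(void *)
{
  for (int i = 1; i <= 50; ++i) ring.Put(i);
  return NULL;
}

static void *consume50(void *)
{
  long local = 0;
  for (int i = 0; i < 50; ++i) local += ring.Get();
  pthread_mutex_lock(&sum_lock);
  total += local;
  pthread_mutex_unlock(&sum_lock);
  return NULL;
}

// Two producers each send 1..50; two consumers each receive 50 items.
long run_mpmc()
{
  total = 0;
  pthread_t t[4];
  pthread_create(&t[0], NULL, produce50, NULL);
  pthread_create(&t[1], NULL, produce50, NULL);
  pthread_create(&t[2], NULL, consume50, NULL);
  pthread_create(&t[3], NULL, consume50, NULL);
  for (int i = 0; i < 4; ++i) pthread_join(t[i], NULL);
  return total;
}
```

Waiting on the semaphore before taking the mutex keeps a thread from sleeping while it holds the lock. `run_mpmc()` returns 2550, twice the sum of 1..50, which shows that no item was lost or read twice.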

Keywords: Linux

Added by maexus on Sun, 20 Feb 2022 06:24:52 +0200