linux: thread POSIX semaphore & & thread pool

POSIX semaphore

POSIX semaphores and SystemV semaphores have the same function. They are used for synchronous operation to achieve the purpose of accessing shared resources without conflict. However, POSIX can be used for inter thread synchronization.

1. System call function of POSIX semaphore

Header file: #include < semaphore h>

(1) Initialization semaphore

int sem_init(sem_t *sem, int pshared, unsigned int value);

Parameters:
pshared:0 indicates inter thread sharing, and non-zero indicates inter process sharing.
Value: initial value of semaphore.

(2) Destroy semaphore

int sem_destroy(sem_t *sem);

(3) Waiting semaphore

P operation, waiting for semaphore, will reduce the value of semaphore by 1 and request resources.

int sem_wait(sem_t *sem); //P operation

(4) Release semaphore

V operation, release the semaphore, increase the semaphore value by 1, and release resources.

int sem_post(sem_t *sem);//V operation

2. Production and consumption model based on ring queue

The ring queue is simulated by array, and the ring characteristics are simulated by module operation; Set the subscript + + to 0 when it reaches the end of the array.

#pragma once
#include<iostream>
#include<semaphore.h>
#include<sys/sem.h>
#include<vector>
#include<unistd.h>
using namespace std;
#define NUM 10 / / the macro defines the global variable NUM as the capacity of the ring queue

class ringQueue
{
private:
  vector<int> v;//vector an array that simulates a ring queue
  int max_cap;//Capacity of ring queue
  sem_t sem_blank;//Spaces in the queue; Producer concerns
  sem_t sem_data;//Data in the queue; Consumer concerns
  int con_i;//Producer index
  int pro_i;//Consumer index
public:
  //P operation, waiting for semaphore, will reduce the value of semaphore by 1 and request resources
  void P(sem_t &s)
  {
    sem_wait(&s);
  }
  //V operation, release the semaphore, increase the semaphore value by 1, and release resources
  void V(sem_t &s)
  {
    sem_post(&s);
  }
public:
  ringQueue(int _cap=NUM):max_cap(_cap),v(_cap)
  {
    //At the beginning, the number of blank is NUM and the number of data is 0; Both subscripts are 0;
    sem_init(&sem_blank,0,NUM);
    sem_init(&sem_data,0,0);
    con_i=0;pro_i=0;
  }
  ~ringQueue()
  {
    sem_destroy(&sem_blank);
    sem_destroy(&sem_data);
    con_i=pro_i=0;
  }
  //Consumer consumption; Fetch value
  void Get(int &out)
  {
    //Request data resources and fetch
    P(sem_data);
    out=v[con_i];
    con_i++;
    //After getting the data, the consumer subscript con_i return to zero
    con_i%=max_cap;
    //Free blank resources
    V(sem_blank);
  }
  //Producer production: put in value
  void Put(const int &in)
  {
    //Request blank resources and put data
    P(sem_blank);
    v[pro_i]=in;
    pro_i++;
    //When the data is full, the consumer subscript con_i return to zero
    pro_i%=max_cap;
    //Free data resources
    V(sem_data);
  }
};

(1) Fast production and slow consumption

#include "ringQueue.h"
using namespace std;
//Take out the data in the ring queue, and print "consumer done!" after each data is taken out.
void *consumer(void *ring_queue)
{
  ringQueue *rq=(ringQueue*)ring_queue;
  while(true)
  {
    sleep(1);
    int data=1;
    rq->Get(data);
    cout<<"consumer done# "<<data<<endl;
  }
}
//Put data into the ring queue, and print "product done!" after each time.
void *productor(void *ring_queue)
{
  ringQueue *rq=(ringQueue*)ring_queue;
  int count=0;
  while(true)
  {
    //sleep(1);
    rq->Put(count);
    count++;
    if(count==9)
    {
      count=0;
    }
    cout<<"productor done!"<<endl;
  }
}
int main()
{
  pthread_t con,pro;
  ringQueue *rq=new ringQueue();

  pthread_create(&con,nullptr,consumer,rq);
  pthread_create(&pro,nullptr,productor,rq);

  pthread_join(con,nullptr);
  pthread_join(pro,nullptr);

  return 0;
}

Running screenshot

(1) Slow production and fast consumption

Just let the consumer take out the data and not in sleep(1); Instead, let the producer put in the data and sleep(1).

#include "ringQueue.h"
using namespace std;
//Take out the data in the ring queue, and print "consumer done!" after each data is taken out.
void *consumer(void *ring_queue)
{
  ringQueue *rq=(ringQueue*)ring_queue;
  while(true)
  {
    //sleep(1);
    int data=1;
    rq->Get(data);
    cout<<"consumer done# "<<data<<endl;
  }
}
//Put data into the ring queue, and print "product done!" after each time.
void *productor(void *ring_queue)
{
  ringQueue *rq=(ringQueue*)ring_queue;
  int count=0;
  while(true)
  {
    sleep(1);
    rq->Put(count);
    count++;
    if(count==9)
    {
      count=0;
    }
    cout<<"productor done!"<<endl;
  }
}
int main()
{
  pthread_t con,pro;
  ringQueue *rq=new ringQueue();

  pthread_create(&con,nullptr,consumer,rq);
  pthread_create(&pro,nullptr,productor,rq);

  pthread_join(con,nullptr);
  pthread_join(pro,nullptr);

  return 0;
}

Thread pool

1. What is a thread pool

A thread usage mode. Too many threads will bring scheduling overhead, which will affect cache locality and overall performance. The thread pool maintains multiple threads, waiting for the supervisor to assign concurrent tasks. This avoids the cost of creating and destroying threads when processing short-time tasks. Thread pool can not only ensure the full utilization of the kernel, but also prevent over scheduling. The number of available threads should depend on the number of available concurrent processors, processor cores, memory, network sockets, etc.

2. Usage scenario

  1. A large number of threads are required to complete the task, and the time to complete the task is relatively short. For example, the WEB server completes the task of WEB page request. Because a single task is small and the number of tasks is huge. But for long-time tasks, such as a Telnet connection request, the advantage of thread pool is not obvious. Because Telnet session time is much longer than thread creation time.
  2. Applications that require strict performance, such as requiring the server to respond quickly to customer requests.
  3. An application that accepts a large number of sudden requests without causing the server to produce a large number of threads. A large number of sudden customer requests will generate a large number of threads without thread pool. Although in theory, the maximum number of threads in most operating systems is not a problem. Generating a large number of threads in a short time may make the memory reach the limit and make errors.

3. Simple code implementation of thread pool

#pragma once
#include<iostream>
#include<queue>
#include<unistd.h>
#include<pthread.h>
#include<math.h>
using namespace std;
#define NUM 5
//Create a thread to implement the task. Here is to calculate the square of a number; A task is a thread in a thread pool
class Task
{
public:
  int num;
public:
  Task(){}
  Task(int n):num(n)
  {}
  void Run()
  {
    cout<<"thread id ["<<pthread_self()<<"]"<<"task run ! num "<<num<<" pow is "<<pow(num,2)<<endl;
  }
  ~Task()
  {}
};
//Thread pool
class threadPool
{
private:
  queue<Task*> q;
  int max_num;
  pthread_mutex_t lock;
  pthread_cond_t cond;//It refers to allowing consumers to operate
  //static bool quit;
public:
  //Lock
  void LockQueue()
  {
    pthread_mutex_lock(&lock);
  }
  //Unlock
  void UnlockQueue()
  {
    pthread_mutex_unlock(&lock);
  }
  bool IsEmpty()
  {
    return q.size()==0;
  }
  //Thread waiting
  void ThreadWait()
  {
    pthread_cond_wait(&cond,&lock);
  }
  //Wake up thread
  void ThreadWakeUp()
  {
    pthread_cond_signal(&cond);
  }
public:
  threadPool(int _max=NUM):max_num(_max)
  {}
  //Realize the operation of a single thread
  static void *Rontine(void *arg)
  {
    threadPool *this_p=(threadPool*)arg;
    while(true)
    {
      this_p->LockQueue();
      while(this_p->IsEmpty())
      {
        this_p->ThreadWait();
      }
      Task t;
      this_p->Get(t);
      this_p->UnlockQueue();
      t.Run();
    }
  }
  //Thread pool initialization
  void ThreadPoolInit()
  {
  	//Initialization lock and waiting condition
    pthread_mutex_init(&lock,nullptr);
    pthread_cond_init(&cond,nullptr);
    pthread_t t;
    //If the thread pool queue is not full, it will always be put into the Task and run
    for(int i=0;i<max_num;i++)
    {
      pthread_create(&t,nullptr,Rontine,this);
    }
  }
  //Put a task in the thread pool queue
  void Put(Task &in)
  {
    LockQueue();
    q.push(&in);
    UnlockQueue();
    ThreadWakeUp();
  }
  //Take out and execute the task
  void Get(Task &out)
  {
    Task*t=q.front();
    q.pop();
    out=*t;
  }
  ~threadPool()
  {
    pthread_mutex_destroy(&lock);
    pthread_cond_destroy(&cond);
  }
};

#include "threadPool.h"
using namespace std;

int main()
{
  threadPool *tp = new threadPool();
  tp->ThreadPoolInit();
  while(true)
  {
    sleep(1);
    int x=rand()%10+1;
    Task t(x);
    tp->Put(t);
  }
  return 0;
}

Screenshot of running: it can be seen in the following figure that many different threads run together, and the thread id is different

Keywords: Linux

Added by Brian Swan on Sat, 19 Feb 2022 01:03:53 +0200