Thread pool and Simulation Implementation

Thread pool

A thread pool is a thread usage pattern. Creating too many threads leads to scheduling overhead, which hurts cache locality and overall performance. A thread pool instead maintains a set of worker threads that wait for a supervisor to assign them tasks to run concurrently. This avoids the cost of creating and destroying a thread for every short-lived task. A thread pool not only keeps the processor cores fully utilized but also prevents over-scheduling. The number of threads in the pool should depend on the available resources: concurrent processors, processor cores, memory, network sockets, etc.

Application scenarios for thread pools

1. A large number of tasks must be handled, and the time needed to complete each task is relatively short. Thread pools are very well suited to a web server handling page requests: each individual task is small, but the number of tasks is huge (imagine the number of clicks on a popular website). For long-running tasks, such as a Telnet session, the advantage of a thread pool is not obvious, because the session lasts far longer than the time it takes to create a thread.

2. Performance-critical applications, such as servers that must respond quickly to client requests.

3. Applications that must accept a large burst of requests without making the server create a correspondingly large number of threads. Without a thread pool, a sudden burst of client requests creates one thread per request. Although the maximum number of threads is theoretically not a problem on most operating systems, creating a large number of threads in a short time may exhaust memory and cause errors.

Simulated Implementation of Thread Pool

    #include<pthread.h>
    #include<stdio.h>
    #include<stdlib.h>
    #include<time.h>
    #include<unistd.h>

    #include<iostream>
    #include<queue>
    #include<vector>
    
    using namespace std;
    #define MAX_THREAD 5
    
    // Signature of a task callback: it receives the task's integer payload
    // and returns a bool. run() currently ignores the returned value.
    typedef bool (*handler_t)(int);

    // A unit of work: an integer payload plus the callback that processes it.
    class ThreadTask
    {
      private:
        int _data;           // payload handed to the callback
        handler_t _handler;  // callback to invoke; NULL means "no work"
      public:
        // Creates an empty task (payload -1, no callback). Running it is a no-op.
        ThreadTask()
          :_data(-1)
          ,_handler(NULL)
        {}

        // Creates a task that will call handler(data) when run.
        ThreadTask(int data, handler_t handler)
          :_data(data)
          ,_handler(handler)
        {}

        // Invokes the callback with the stored payload.
        // A task without a callback (default-constructed, or constructed with
        // a NULL handler) is skipped instead of calling through a null pointer.
        void run()
        {
          if(_handler != NULL)
          {
            _handler(_data);
          }
        }
    };
   
    class ThreadPool
    {
      private:
        int _thread_max;
        int _thread_cur;
        bool _tp_quit;
        queue<ThreadTask *> _task_queue;
        pthread_mutex_t _lock;
        pthread_cond_t _cond;
      private:
        void LockQueue(){
          pthread_mutex_lock(&_lock);
        }
        void UnLockQueue(){
          pthread_mutex_unlock(&_lock);
        }
        void WakeUpOne(){
          pthread_cond_signal(&_cond);
        }
        void WakeUpAll(){
          pthread_cond_broadcast(&_cond);
        }
        void ThreadQuit(){
          _thread_cur--;
          UnLockQueue();
          pthread_exit(NULL);
        }
    
        void ThreadWait(){
          if(_tp_quit){
            ThreadQuit();
          }
          pthread_cond_wait(&_cond, &_lock);
        }
        bool IsEmpty(){
          return _task_queue.empty();
        }
    
        static void *thr_start(void *arg)
        {
          ThreadPool *tp = (ThreadPool*)arg;
          while(1)                                                                                  
          {
            tp->LockQueue();
            while(tp->IsEmpty())
            {
              tp->ThreadWait();
            }
            ThreadTask *tt;
            tp->PopTask(&tt);
            tp->UnLockQueue();
            tt->run();
            delete tt;
          }
          return NULL;
        }
    
      public:
        ThreadPool(int max = MAX_THREAD)
          :_thread_max(max)
          ,_thread_cur(max)
          ,_tp_quit(false)
      {
        pthread_mutex_init(&_lock,NULL);
        pthread_cond_init(&_cond,NULL);
      }
        ~ThreadPool(){
          pthread_mutex_destroy(&_lock);
         pthread_cond_destroy(&_cond);
       }
   
       bool PoolInit()
       {
         pthread_t tid;
         for(int i = 0; i < _thread_max; i++)
         {
           int ret = pthread_create(&tid, NULL, thr_start, this);
           if(ret != 0)
           {
             cout << "create pool thread error" << endl;                                           
             return false;
           }
         }
         return true;
       }
   
       bool PushTask(ThreadTask *tt)
      {
         LockQueue();
         if(_tp_quit)                                                                              
         {
           UnLockQueue();
           return false;
         }
         _task_queue.push(tt);
         WakeUpOne();
         UnLockQueue();
         return true;
       }
   
       bool PopTask(ThreadTask **tt)
       {
         *tt = _task_queue.front();
         _task_queue.pop();
         return true;
       }
  
       bool PoolQuit()
       {
         LockQueue();
         _tp_quit = true;
         UnLockQueue();
         while(_thread_cur > 0)
         {
           WakeUpAll();
           usleep(1000);
         }
         return true;
       }
   };
   
   
   // Demo task body: prints the calling thread and the task number, then
   // sleeps for a pseudo-random 0-4 seconds to simulate work.
   // Always returns true.
   bool handler(int data)
   {
     // Use a private seed with rand_r() instead of srand()/rand(): this
     // function runs on several threads at once, and re-seeding the shared
     // generator with the current second gave every task started in the same
     // second the same delay. Mixing in the task number varies the delay
     // per task.
     unsigned int seed = static_cast<unsigned int>(time(NULL))
                         ^ (static_cast<unsigned int>(data) * 2654435761u);
     int n = rand_r(&seed)%5;
     // %p requires a void*; pthread_t is an opaque handle, so convert it
     // explicitly rather than passing it through varargs as-is.
     printf("Thread: %p Run Task: %d--sleep %d sec\n",
            reinterpret_cast<void*>(pthread_self()), data, n);
     sleep(n);
     return true;
   }
   
   int main()
   {
     int i;
     ThreadPool pool;
     pool.PoolInit();
     for(i = 0; i < 10; i++){
       ThreadTask *tt = new ThreadTask(i, handler);
       pool.PushTask(tt);
     }
   
     pool.PoolQuit();
     return 0;
   }
                                      

 

Keywords: network Web Server Session

Added by 9902468 on Sat, 17 Aug 2019 11:12:26 +0300