Project learning address: [ niuke.com C + + server project learning ]
It took three hours to write the thread pool class. It was actually tested and can run normally. The performance test has not been done yet. I don't know how fast it can be compared with the traditional [immediate creation and immediate destruction] mechanism.
I still stepped on some holes in writing this program. Let me summarize the problems I encountered in writing the program:
- It's easy to write using template classes. At first, I didn't want to use the template class to develop the thread pool class. I thought I couldn't use it. However, problems were encountered soon:
- Adding a task function to the thread pool usually needs to be encapsulated with a structure. Because the function is used as a parameter, the expression is too long, which is a little inconvenient. However, encapsulating functions in structures also brings new problems: member functions in structures cannot be modified. Therefore, it is more convenient to use template programming.
void * func(void *arg) // This is a task function // Encapsulated in structure struct Job { void * func(void *arg); }; // Member functions in a structure cannot be modified Job *newjob = new Job(); newjob->func = anotherfunc; // The compiler will report an error: functions can only be called and cannot be modified
- There is no need to encapsulate the [synchronous thread] class: This is another problem I encountered when writing programs. [niuke.com] the video encapsulates three thread synchronization mechanisms, but, but, but. My favorite semaphore. I want to initialize the constructor with parameters in the class, and the compiler will report an error!. Finally, I give up using the written Locker class and call the semaphore function directly.
- Change the thread synchronization mechanism: in the [niuke.com] video, he used a mutual exclusion lock to ensure that only one thread can access shared resources at a time. He also used a semaphore to represent the number of resources in the task job queue. But this is not a very standard thread synchronization mechanism. The correct posture is to use semaphores to create three variables: full, empty and mutex, which respectively represent available resources, free slots and mutexes. And the locking and unlocking methods are also fixed. I have implemented them in my V3 version of code.
- What are the benefits of the new thread synchronization mechanism:
- First, standardize the marking, and there will be no unexpected situation;
- Second: most importantly, please think about the append function in the [original video]. If the task queue is full, it does not block the append function, but returns directly. If the corresponding processing is not performed according to the return value of append when calling the append function, the current task will be lost.
- What are the benefits of the new thread synchronization mechanism:
- Pay attention to various addresses. This program has some variables in the heap area and many variables in the stack area. Also note whether it should be an incoming address or a value.
talk is cheap, show me the code:
// Encapsulate a thread pool class #ifndef __THREADPOOLV3_H__ #define __THREADPOOLV3_H__ // Import header file #include <queue> #include <stdio.h> #include <semaphore.h> // In class declaration, out of class definition template <typename T> class ThreadPool { public: // Constructor ThreadPool(unsigned int t_nums, unsigned int j_nums); // Destructor ~ThreadPool(); // Member function: add a task event to the thread pool bool append(T *newjob); private: // Number of core threads unsigned int m_thread_num; // Maximum number of threads unsigned int m_thread_max; // Task queue length unsigned int m_queue_len; // Work queue (where threads are stored) pthread_t *m_work_threads; // Implementation with dynamic array // Task queue (to store the task function to be executed by the thread) std::queue<T *> m_job_queue; // queue, list and array can be used // Callback function of thread static void *worker(void *arg); void m_run(); // Wrapper function of worker void m_add(T *newjob); // Wrapper function for append // Thread synchronization mechanism sem_t sem_job_src; // Number of task queue resources sem_t sem_job_empty; //Number of idle locations of task queue sem_t sem_mutex; // Mutex semaphore // Set a flag bit to indicate whether the entire thread pool is in progress bool isRun; }; /********************** The following are all definitions of classes**********************/ // Constructor template <typename T> ThreadPool<T>::ThreadPool(unsigned int t_nums, unsigned int j_nums) { // Initialize member variables such as the number of threads // Judge the legitimacy of input data if (t_nums <= 0 || j_nums <= 0) { printf("Parameter input error\n"); throw std::exception(); } m_thread_num = t_nums; m_queue_len = j_nums; // The initialization semaphore indicates that the semaphore of the job queue resource remains unchanged, and the semaphore of the mutex is initialized to 1 sem_init(&sem_job_src, 0 , 0); sem_init(&sem_job_empty, 0, m_queue_len); sem_init(&sem_mutex, 0, 1); // Initialize thread pool status isRun = true; // Apply for heap memory and store the thread number of the child thread m_work_threads = new pthread_t[m_thread_num]; if (!m_work_threads) { // If it fails to open up heap dynamic memory, an exception is thrown isRun = false; printf("Heap memory opening failed\n"); throw std::exception(); } // Create m_thread_num sub thread for (int i = 0; i < m_thread_num; ++i) { int ret; ret = pthread_create(m_work_threads + i, NULL, worker, this); if (ret != 0) { // If an exception occurs in the creation thread, terminate the whole program and clear the resources delete[] m_work_threads; isRun = false; printf("Failed to create thread\n"); throw std::exception(); } } // After the thread is created, set thread separation for (int i = 0; i < m_thread_num; ++i) { int ret; ret = pthread_detach(m_work_threads[i]); if (ret != 0) { // If an exception occurs in the creation thread, terminate the whole program and clear the resources delete[] m_work_threads; isRun = false; printf("Thread detach failed\n"); throw std::exception(); } } } // Destructor template <typename T> ThreadPool<T>::~ThreadPool() { // Destroy pointers, free heap memory, etc delete[] m_work_threads; isRun = false; } // public member function: append template <typename T> bool ThreadPool<T>::append(T *newjob) { printf("Add task\n"); m_add(newjob); return true; } template <typename T> void ThreadPool<T>::m_add(T *newjob) { // Add a work event to the memory pool. The event should be added to the task queue // In the main thread, data written to the task queue must be locked // Upper mutex // Use semaphores to determine whether the job queue still has space sem_wait(&sem_job_empty); // The work queue is full and blocked here sem_wait(&sem_mutex); m_job_queue.push(newjob); sem_post(&sem_mutex); // Unlock sem_post(&sem_job_src); // Semaphore of job resource plus 1 } // private member function: m_run template <typename T> void *ThreadPool<T>::worker(void *arg) { // Threads in the memory pool, tasks to be executed, and tasks are fetched from the task queue // Take a task from the task queue and a thread from the work queue (thread s), and let the thread execute the task (function) // What should be the form of the task: // Functions // In fact, it is still a producer and consumer model in essence printf("Perform work tasks\n"); ThreadPool *pool = (ThreadPool *)arg; pool->m_run(); return NULL; } template <typename T> void ThreadPool<T>::m_run() { while (isRun) { // Consume a resource. If the task queue has no resources, it will block the waiting (which is equivalent to making the thread sleep) sem_wait(&sem_job_src); sem_wait(&sem_mutex); // mutex // Take out a task T *Newjob = m_job_queue.front(); m_job_queue.pop(); // Exit, unlock sem_post(&sem_mutex); sem_post(&sem_job_empty); // After getting the job, execute the specific functions in the job outside the lock printf("The thread successfully acquired the task\n"); Newjob->func(); } } #endif