[Niuke C++ server project learning] Day15 - Writing the thread pool class

Project learning address: [niuke.com C++ server project learning]

It took me three hours to write the thread pool class. It has been tested and runs correctly, but I have not benchmarked it yet, so I don't know how much faster it is than the traditional [create a thread on demand, destroy it immediately] approach.

I still hit a few pitfalls while writing it. Here is a summary of the problems I ran into:

  • A template class makes this much easier to write. At first I didn't want to build the thread pool as a template class, thinking I could do without it, but I ran into a problem almost immediately:
    • A task handed to the thread pool usually has to be wrapped in a struct, because passing the function itself as a parameter makes the expressions too long and unwieldy. Wrapping the function in a struct brings a new problem, though: a member function of a struct cannot be reassigned. That makes template programming the more convenient choice.
void *func(void *arg);          // This is a task function
// Wrapped in a struct
struct Job
{
	void *func(void *arg);      // declared as a member function
};
// A member function of a struct cannot be reassigned
Job *newjob = new Job();
newjob->func = anotherfunc;     // compile error: a member function can only be called, not assigned to
  • There is no need to wrap the [thread synchronization] mechanisms in a class: this is another problem I ran into. The [niuke.com] video wraps three thread synchronization mechanisms in a Locker class, and my favourite of the three is the semaphore. But when I tried to initialize the semaphore member with constructor parameters inside my class, the compiler reported an error (a minimal sketch of the issue and the fix appears after this list). In the end I gave up on the wrapper class I had written and called the semaphore functions directly.
  • Change the thread synchronization scheme: in the [niuke.com] video, a mutex ensures that only one thread accesses the shared resource at a time, and a single semaphore counts the number of items in the task queue. That is not the standard synchronization pattern, though. The proper approach is to create three semaphores: full, empty and mutex, representing the available items, the free slots and the mutual exclusion lock respectively. The order of locking and unlocking is also fixed. I implemented this in the V3 version of my code.
    • Benefits of the new synchronization scheme:
      • First, it follows the standard pattern, so there are no unexpected corner cases;
      • Second, and most importantly, consider the append function in the [original video]: if the task queue is full, append does not block, it simply returns. If the caller does not check append's return value and handle the failure, the current task is lost.
  • Pay attention to where things live. Some variables in this program are on the heap and many are on the stack; also be careful about whether a parameter should be passed as an address or as a value.
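
As promised above, here is a minimal sketch of the wrapper-class problem. The Sem class and the pool name below are my own hypothetical examples, not the course's Locker code; the point is only that a data member whose constructor takes parameters has to be initialized in the constructor's member initializer list (or, since C++11, with a brace initializer), not with parentheses at its declaration:

#include <semaphore.h>

// Hypothetical RAII-style semaphore wrapper (not the course's locker.h)
class Sem
{
public:
    explicit Sem(unsigned int value) { sem_init(&m_sem, 0, value); }
    ~Sem() { sem_destroy(&m_sem); }
    void wait() { sem_wait(&m_sem); }
    void post() { sem_post(&m_sem); }
private:
    sem_t m_sem;
};

class PoolWithWrapper
{
public:
    // Members with parameterized constructors are initialized here
    explicit PoolWithWrapper(unsigned int queue_len)
        : m_job_src(0), m_job_empty(queue_len), m_mutex(1) {}
private:
    Sem m_job_src;
    Sem m_job_empty;
    Sem m_mutex;
    // Sem m_bad(1); // error: parentheses are not allowed at a member's declaration;
                     // use the initializer list above or, in C++11, Sem m_bad{1};
};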

Talk is cheap, show me the code:

// Encapsulate a thread pool class
#ifndef __THREADPOOLV3_H__
#define __THREADPOOLV3_H__

// Headers
#include <queue>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <exception>
// Declared inside the class, defined outside it
template <typename T>
class ThreadPool
{
public:
    // Constructor
    ThreadPool(unsigned int t_nums, unsigned int j_nums);

    // Destructor
    ~ThreadPool();

    // Member function: add a task event to the thread pool
    bool append(T *newjob);

private:
    // Number of core threads
    unsigned int m_thread_num;
    // Maximum number of threads (reserved; not used yet)
    unsigned int m_thread_max;
    // Task queue length
    unsigned int m_queue_len;
    // Work queue (where threads are stored)
    pthread_t *m_work_threads; // Implementation with dynamic array
    // Task queue (to store the task function to be executed by the thread)
    std::queue<T *> m_job_queue;
    // queue, list and array can be used
    // Thread entry point (must be static to be passed to pthread_create)
    static void *worker(void *arg);
    void m_run();          // The loop actually executed by worker
    void m_add(T *newjob); // Does the actual work behind append
    // Thread synchronization (three semaphores: full / empty / mutex)
    sem_t sem_job_src;   // Number of jobs currently in the task queue
    sem_t sem_job_empty; // Number of free slots in the task queue
    sem_t sem_mutex;     // Binary semaphore used as a mutex
    // Flag indicating whether the thread pool is running
    bool isRun;
};

/********************** Definitions of the class members **********************/
// Constructor
template <typename T>
ThreadPool<T>::ThreadPool(unsigned int t_nums, unsigned int j_nums)
{
    // Initialize member variables such as the thread count
    // Validate the input parameters (they are unsigned, so only 0 is invalid)
    if (t_nums == 0 || j_nums == 0)
    {
        printf("Parameter input error\n");
        throw std::exception();
    }
    m_thread_num = t_nums;
    m_queue_len = j_nums;
    // Initialize the semaphores: job resources start at 0,
    // free slots start at the queue length, and the mutex starts at 1
    sem_init(&sem_job_src, 0, 0);
    sem_init(&sem_job_empty, 0, m_queue_len);
    sem_init(&sem_mutex, 0, 1);
    // Initialize thread pool status
    isRun = true;
    // Allocate heap memory to hold the worker threads' IDs
    // (note: plain new throws std::bad_alloc on failure rather than returning
    // nullptr, so this check is mostly defensive)
    m_work_threads = new pthread_t[m_thread_num];
    if (!m_work_threads)
    {
        isRun = false;
        printf("Heap memory allocation failed\n");
        throw std::exception();
    }
    // Create m_thread_num worker threads
    for (unsigned int i = 0; i < m_thread_num; ++i)
    {
        int ret;
        ret = pthread_create(m_work_threads + i, NULL, worker, this);
        if (ret != 0)
        {
            // If thread creation fails, clean up the resources and throw
            delete[] m_work_threads;
            isRun = false;
            printf("Failed to create thread\n");
            throw std::exception();
        }
    }
    // After the threads are created, detach them so their resources are reclaimed automatically
    for (unsigned int i = 0; i < m_thread_num; ++i)
    {
        int ret;
        ret = pthread_detach(m_work_threads[i]);
        if (ret != 0)
        {
            // If detaching fails, clean up the resources and throw
            delete[] m_work_threads;
            isRun = false;
            printf("Thread detach failed\n");
            throw std::exception();
        }
    }
}
// Destructor
template <typename T>
ThreadPool<T>::~ThreadPool()
{
    // Stop the pool, free the heap memory and release the semaphores
    isRun = false;
    delete[] m_work_threads;
    sem_destroy(&sem_job_src);
    sem_destroy(&sem_job_empty);
    sem_destroy(&sem_mutex);
}
// public member function: append
template <typename T>
bool ThreadPool<T>::append(T *newjob)
{
    printf("Add task\n");
    m_add(newjob);
    return true;
}
template <typename T>
void ThreadPool<T>::m_add(T *newjob)
{
    // Add a job to the thread pool's task queue.
    // Writes to the task queue must be protected by the mutex.
    // First use the "empty" semaphore to check whether the queue still has a free slot
    sem_wait(&sem_job_empty); // blocks here if the task queue is full
    sem_wait(&sem_mutex);     // lock

    m_job_queue.push(newjob);

    sem_post(&sem_mutex);   // unlock
    sem_post(&sem_job_src); // one more job available in the queue
}
// private static member function: worker (the thread entry point)
template <typename T>
void *ThreadPool<T>::worker(void *arg)
{
    // Each worker thread in the pool takes a task from the task queue and executes it.
    // A task is simply an object whose func() member does the actual work.
    // In essence this is the classic producer-consumer model.
    printf("Perform work tasks\n");
    ThreadPool *pool = (ThreadPool *)arg;
    pool->m_run();
    return NULL;
}
template <typename T>
void ThreadPool<T>::m_run()
{
    while (isRun)
    {
        // Consume one job resource. If the task queue is empty, block here
        // (which effectively puts the thread to sleep)
        sem_wait(&sem_job_src);
        sem_wait(&sem_mutex); // lock

        // Take out a task
        T *Newjob = m_job_queue.front();
        m_job_queue.pop();

        // Unlock and signal one more free slot
        sem_post(&sem_mutex);
        sem_post(&sem_job_empty);

        // Execute the job's function outside the lock
        printf("The thread successfully acquired the task\n");
        Newjob->func();
    }
}
#endif
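
For completeness, here is a minimal, hypothetical test driver showing how the class is meant to be used. The Job type, the file names and the build line are my own assumptions, not part of the course code; it should build with something like g++ test.cpp -pthread.

// test.cpp -- hypothetical usage sketch for ThreadPool<T>
#include "threadpoolV3.h" // assumed file name for the header above
#include <stdio.h>
#include <unistd.h>

// Any type with a public func() member can be used as a task
struct Job
{
    int id;
    void func()
    {
        printf("job %d is running\n", id);
    }
};

int main()
{
    // 4 worker threads, task queue of length 8
    ThreadPool<Job> pool(4, 8);

    Job jobs[16];
    for (int i = 0; i < 16; ++i)
    {
        jobs[i].id = i;
        pool.append(&jobs[i]); // blocks while the queue is full
    }

    sleep(1); // crude wait so the detached workers get a chance to finish
    return 0;
}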

Keywords: C++ Linux Back-end server
