Implementation of a thread pool based on POSIX threads (pthreads)

1, Functions of thread pool

A thread pool reduces the resource cost (time and memory) of repeatedly creating and destroying threads by reusing a fixed set of worker threads.

2, Thread pool infrastructure

  • Message queue
  • Task execution queue
  • Management components

2.1 simple data structure

2.1.1 message queue

The queue is implemented as a doubly linked list. Each node (a task) holds the function to execute and a pointer to the data passed to that function.

// One unit of work in the pool's task list.
// The pool itself never frees a task; whoever supplies task_func is
// responsible for releasing the task and its user_data.
struct nTask {
    // Function run by a worker thread; it receives the task itself.
    void (*task_func)(struct nTask *task);
    // Opaque payload for task_func; never interpreted by the pool.
    void *user_data;

    // Doubly linked list linkage, maintained by LIST_INSERT / LIST_REMOVE.
    struct nTask *prev;
    struct nTask *next;
};
2.1.2 task execution queue

This queue is also implemented as a doubly linked list. Each node holds the worker's thread ID, a termination flag, and a pointer to the management component.

// Bookkeeping record for one worker thread of the pool.
struct nWorker {
    // Thread ID filled in by pthread_create() when the worker is started.
    pthread_t threadid;
    // Non-zero asks the worker loop to stop; the worker checks it while
    // holding the manager's mutex.
    int terminate;
    // Back-pointer to the pool this worker takes tasks from.
    struct nManager *manager;

    // Doubly linked list linkage, maintained by LIST_INSERT / LIST_REMOVE.
    struct nWorker *prev;
    struct nWorker *next;
};
2.1.3 management components

The management component manages message queues and task execution queues. At the same time, locks and condition variables are used to protect shared data.

// The thread pool: owns the list of pending tasks and the list of workers.
typedef struct nManager {
    // Head of the pending-task list; accessed with `mutex` held.
    struct nTask *tasks;
    // Head of the worker list.
    struct nWorker *workers;

    // Protects `tasks` and is the mutex paired with `cond`.
    pthread_mutex_t mutex;
    // Workers wait on this while `tasks` is empty; it is signalled when a
    // task is pushed and broadcast when the pool is destroyed.
    pthread_cond_t cond;
} ThreadPool;

2.2 implementation of underlying data structure interface

2.2.1 inserting nodes into queue headers
// LIST_INSERT(item, list): push `item` onto the front of a doubly linked list.
//   item - pointer to a node that has `prev` and `next` members
//   list - modifiable lvalue holding the list head (NULL when empty)
// Both arguments are evaluated more than once, so they must not have side
// effects. Every use is parenthesized so that expressions such as `&node`
// or `*pp` can be passed safely.
#define LIST_INSERT(item, list) do {              \
    (item)->prev = NULL;                          \
    (item)->next = (list);                        \
    if ((list) != NULL) (list)->prev = (item);    \
    (list) = (item);                              \
} while(0)
2.2.2 delete the specified node in the queue
// LIST_REMOVE(item, list): unlink `item` from a doubly linked list.
//   item - pointer to a node currently in the list
//   list - modifiable lvalue holding the list head; advanced to the next
//          node when `item` is the head
// After removal, item->prev and item->next are reset to NULL.
// Both arguments are evaluated more than once, so they must not have side
// effects, and `item` must not be the same lvalue expression as `list`
// (the head is reassigned before the item's links are cleared).
#define LIST_REMOVE(item, list) do {                                   \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next;       \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev;       \
    if ((list) == (item)) (list) = (item)->next;                       \
    (item)->prev = (item)->next = NULL;                                \
} while(0)

2.3 thread task definition

Every thread in the pool runs the same callback function. Inside the callback, the thread repeatedly takes a pending message from the message queue and executes it.

// callback != task
static void *nThreadPoolCallback(void *arg) {

    struct nWorker *worker = (struct nWorker*)arg;

    while (1) {

        pthread_mutex_lock(&worker->manager->mutex);
        while (worker->manager->tasks == NULL) {
            if (worker->terminate) break;
            pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex);
        }
        if (worker->terminate) {
            pthread_mutex_unlock(&worker->manager->mutex);
            break;
        }

        struct nTask *task = worker->manager->tasks;
        LIST_REMOVE(task, worker->manager->tasks);

        pthread_mutex_unlock(&worker->manager->mutex);

        task->task_func(task);
    }

    free(worker);
    
}

All operations on the shared message queue must be performed while holding the lock. The task itself is executed outside the lock, so that the critical section stays as short as possible.

3, Business interface implementation

3.1 thread pool creation function

The function parameters are the thread pool object pointer and the maximum number of threads.

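Mutexes and condition variables can be initialized either statically (with PTHREAD_MUTEX_INITIALIZER / PTHREAD_COND_INITIALIZER) or dynamically (with pthread_mutex_init() / pthread_cond_init()). The simplest approach for a pool object that has been zeroed with memset is to copy a statically initialized object into it with memcpy; however, POSIX leaves the behavior of using a copy of a mutex or condition variable undefined, so the dynamic initialization functions are the portable choice.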

Whenever memory is allocated, check whether the allocation succeeded.

// API
// Initialize `pool` and start its worker threads.
//
// pool       - caller-owned pool object; its previous contents are erased.
// numWorkers - number of worker threads to start; values below 1 are
//              treated as 1.
//
// Returns 0 on success, or a negative value on failure:
//   -1  pool is NULL
//   -2  allocating a worker record failed
//   -3  pthread_create() failed
//   -4  initializing the mutex or the condition variable failed
// When -2 or -3 is returned, the workers started before the failure are
// left running and linked in pool->workers.
int nThreadPoolCreate(ThreadPool *pool, int numWorkers) {

    if (pool == NULL) return -1;
    if (numWorkers < 1) numWorkers = 1;
    memset(pool, 0, sizeof(ThreadPool));

    // Initialize the synchronization objects in place. POSIX leaves the
    // result of using a *copy* of a mutex or condition variable undefined,
    // so a static initializer must not be memcpy'd into the pool.
    int ret = pthread_mutex_init(&pool->mutex, NULL);
    if (ret != 0) {
        fprintf(stderr, "pthread_mutex_init: %s\n", strerror(ret));
        return -4;
    }

    ret = pthread_cond_init(&pool->cond, NULL);
    if (ret != 0) {
        fprintf(stderr, "pthread_cond_init: %s\n", strerror(ret));
        pthread_mutex_destroy(&pool->mutex);
        return -4;
    }

    int i = 0;
    for (i = 0;i < numWorkers;i ++) {
        struct nWorker *worker = malloc(sizeof *worker);
        if (worker == NULL) {
            perror("malloc");
            return -2;
        }
        memset(worker, 0, sizeof *worker);
        worker->manager = pool;

        ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);
        if (ret != 0) {
            // pthread_create() returns the error number and does not set
            // errno, so perror() would print an unrelated message.
            fprintf(stderr, "pthread_create: %s\n", strerror(ret));
            free(worker);
            return -3;
        }

        // Workers free their own record on exit and are never joined, so
        // detach them to let the system reclaim thread resources.
        (void)pthread_detach(worker->threadid);

        LIST_INSERT(worker, pool->workers);
    }

    // success
    return 0;

}

3.2 thread pool destruction function

Destroying the thread pool mainly consists of setting each worker's terminate flag and then broadcasting on the condition variable, so that every waiting worker wakes up, sees the flag, and exits. Condition variables must always be used together with a mutex.

// API
// Ask every worker of `pool` to stop and detach the pool from its lists.
//
// pool    - pool to shut down.
// nWorker - unused; kept so the signature stays the same for callers.
//
// Each worker's terminate flag is set and all waiting workers are woken.
// A worker that is running a task finishes it first and exits on its next
// pass through the loop. This function does not wait for workers to exit.
// Tasks still pending are dropped from the pool but not freed.
//
// Returns 0 on success, -1 if pool is NULL.
int nThreadPoolDestory(ThreadPool *pool, int nWorker) {

    (void)nWorker;

    if (pool == NULL) return -1;

    struct nWorker *worker = NULL;

    // Hold the mutex for the whole walk: workers test `terminate` only
    // while holding it, so none of them can exit and free its record
    // while the list is being traversed.
    pthread_mutex_lock(&pool->mutex);

    for (worker = pool->workers;worker != NULL;worker = worker->next) {
        worker->terminate = 1;
    }

    pool->workers = NULL;
    pool->tasks = NULL;

    pthread_cond_broadcast(&pool->cond);

    pthread_mutex_unlock(&pool->mutex);

    return 0;

}

3.3 message adding function

This function modifies the shared message queue, so it must hold the lock while doing so. The pthread_cond_signal() function wakes up at least one of the threads waiting on the condition variable; which thread is chosen is up to the scheduler.

// API
// Hand `task` to the pool for execution by a worker thread.
//
// pool - pool to receive the task.
// task - task to run; task->task_func must be set.
//
// The task is inserted at the head of the pending list and one waiting
// worker is signalled. Workers also take tasks from the head, so the most
// recently pushed task is picked up first.
//
// Returns 0 on success, -1 if pool, task or task->task_func is NULL.
int nThreadPoolPushTask(ThreadPool *pool, struct nTask *task) {

    if (pool == NULL || task == NULL || task->task_func == NULL) return -1;

    pthread_mutex_lock(&pool->mutex);

    LIST_INSERT(task, pool->tasks);

    pthread_cond_signal(&pool->cond);

    pthread_mutex_unlock(&pool->mutex);

    return 0;

}

4, Test sample

 

#if 1

// Number of worker threads the sample passes to nThreadPoolCreate().
#define THREADPOOL_INIT_COUNT    20
// Number of tasks the sample pushes into the pool.
#define TASK_INIT_SIZE            1000


// Sample task function: prints the integer stored in the task's user_data,
// then releases both the payload and the task itself.
void task_entry(struct nTask *task) {

    int *payload = (int *)task->user_data;

    printf("idx: %d\n", *payload);

    // The task owns its payload; both were heap-allocated by the producer.
    free(payload);
    free(task);
}


// Sample driver: starts a pool of THREADPOOL_INIT_COUNT workers, pushes
// TASK_INIT_SIZE tasks that each print their index, then blocks on
// getchar() so the workers have time to run before the process exits.
int main(void) {

    ThreadPool pool = {0};

    if (nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT) != 0) {
        fprintf(stderr, "nThreadPoolCreate failed\n");
        exit(1);
    }

    int i = 0;
    for (i = 0;i < TASK_INIT_SIZE;i ++) {
        struct nTask *task = malloc(sizeof *task);
        if (task == NULL) {
            perror("malloc");
            exit(1);
        }
        memset(task, 0, sizeof *task);

        // Check the payload allocation before writing through it.
        int *idx = malloc(sizeof *idx);
        if (idx == NULL) {
            perror("malloc");
            free(task);
            exit(1);
        }
        *idx = i;

        task->task_func = task_entry;
        task->user_data = idx;

        nThreadPoolPushTask(&pool, task);
    }

    getchar();

    return 0;
}


#endif

 

Added by Tezread on Tue, 04 Jan 2022 14:07:47 +0200