1, Functions of thread pool
A thread pool keeps a fixed set of worker threads alive and reuses them, which avoids the CPU and memory cost of creating and destroying a thread for every task.
2, Thread pool infrastructure
- Message queue
- Task execution queue
- Management components
2.1 simple data structure
2.1.1 message queue
The queue is implemented as a doubly linked list. Each node holds the function to execute for the message and a pointer to the data passed to that function.
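/* Headers used by the code in this article. */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct nTask {
    void (*task_func)(struct nTask *task); /* function that runs the task */
    void *user_data;                       /* argument data for task_func */

    struct nTask *prev;
    struct nTask *next;
};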
2.1.2 task execution queue
The worker queue is also a doubly linked list. Each node holds the thread ID, a termination flag, and a pointer back to the management component.
struct nWorker {
    pthread_t threadid;
    int terminate;
    struct nManager *manager;

    struct nWorker *prev;
    struct nWorker *next;
};
2.1.3 management components
The management component owns the message queue and the worker queue. A mutex and a condition variable protect this shared data.
typedef struct nManager {
    struct nTask *tasks;
    struct nWorker *workers;

    pthread_mutex_t mutex;
    pthread_cond_t cond;
} ThreadPool;
2.2 implementation of underlying data structure interface
2.2.1 inserting a node at the head of the queue
#define LIST_INSERT(item, list) do {              \
    (item)->prev = NULL;                          \
    (item)->next = (list);                        \
    if ((list) != NULL) (list)->prev = (item);    \
    (list) = (item);                              \
} while(0)
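To make the head-insert behavior concrete, here is a minimal sketch that pushes three nodes with the macro. The `demo_node` type and the `demo_push_three` helper are hypothetical names used only for this illustration; the macro is repeated so the snippet compiles on its own.

```c
#include <assert.h>
#include <stddef.h>

/* Same head-insert macro as in the article, repeated for a standalone build. */
#define LIST_INSERT(item, list) do {              \
    (item)->prev = NULL;                          \
    (item)->next = (list);                        \
    if ((list) != NULL) (list)->prev = (item);    \
    (list) = (item);                              \
} while (0)

/* Hypothetical node type, not part of the thread pool itself. */
struct demo_node {
    int value;
    struct demo_node *prev;
    struct demo_node *next;
};

/* Pushes a, then b, then c, and returns the head of the resulting list.
 * Because every insert goes to the head, the last node pushed ends up first. */
struct demo_node *demo_push_three(struct demo_node *a,
                                  struct demo_node *b,
                                  struct demo_node *c)
{
    struct demo_node *head = NULL;

    LIST_INSERT(a, head);
    LIST_INSERT(b, head);
    LIST_INSERT(c, head);

    assert(head == c);
    return head;
}
```

Walking the list from the returned head visits c, then b, then a, so the list order is the reverse of the insertion order.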
2.2.2 delete the specified node in the queue
#define LIST_REMOVE(item, list) do {                                  \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next;      \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev;      \
    if ((list) == (item)) (list) = (item)->next;                      \
    (item)->prev = (item)->next = NULL;                               \
} while(0)
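The removal macro has to handle three cases: a node in the middle, the head node, and the last remaining node. The sketch below wraps the macro in a small function so each case can be exercised; `demo_node` and `demo_unlink` are hypothetical names used only here.

```c
#include <assert.h>
#include <stddef.h>

/* Same removal macro as in the article, repeated for a standalone build. */
#define LIST_REMOVE(item, list) do {                                  \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next;      \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev;      \
    if ((list) == (item)) (list) = (item)->next;                      \
    (item)->prev = (item)->next = NULL;                               \
} while (0)

/* Hypothetical node type, not part of the thread pool itself. */
struct demo_node {
    int value;
    struct demo_node *prev;
    struct demo_node *next;
};

/* Unlinks item from the list whose head pointer is *head.
 * The head pointer is updated when item was the first node. */
void demo_unlink(struct demo_node **head, struct demo_node *item)
{
    assert(head != NULL && item != NULL);
    LIST_REMOVE(item, *head);
}
```

After the call, the removed node has both links cleared, so it can be freed or inserted into another list without carrying stale pointers.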
2.3 thread task definition
Every worker thread in the pool runs the same callback function. Inside the callback, the worker repeatedly takes a pending message from the message queue and executes it.
// callback != task
static void *nThreadPoolCallback(void *arg) {
    struct nWorker *worker = (struct nWorker*)arg;

    while (1) {
        pthread_mutex_lock(&worker->manager->mutex);

        // Sleep until a task arrives or the pool is shutting down.
        while (worker->manager->tasks == NULL) {
            if (worker->terminate) break;
            pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex);
        }

        if (worker->terminate) {
            pthread_mutex_unlock(&worker->manager->mutex);
            break;
        }

        // Take one task off the queue while holding the lock.
        struct nTask *task = worker->manager->tasks;
        LIST_REMOVE(task, worker->manager->tasks);

        pthread_mutex_unlock(&worker->manager->mutex);

        // Run the task outside the lock.
        task->task_func(task);
    }

    free(worker);
    return NULL; // a thread start routine must return a void *
}
Every operation on the shared message queue must happen while holding the lock. The task itself runs outside the lock, so the critical section stays as short as possible.
3, Business interface implementation
3.1 thread pool creation function
The function takes a pointer to the thread pool object and the number of worker threads to create.
Mutexes and condition variables can be initialized statically (PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER) or dynamically (pthread_mutex_init(), pthread_cond_init()). Here the pool is first zeroed with memset, and then its mutex and condition variable are set up by copying statically initialized objects into it with memcpy.
Every memory allocation should be checked for failure before the memory is used.
// API
int nThreadPoolCreate(ThreadPool *pool, int numWorkers) {
    if (pool == NULL) return -1;
    if (numWorkers < 1) numWorkers = 1;
    memset(pool, 0, sizeof(ThreadPool));

    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));

    // Dynamic alternative: pthread_mutex_init(&pool->mutex, NULL);
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));

    int i = 0;
    for (i = 0; i < numWorkers; i++) {
        struct nWorker *worker = (struct nWorker*)malloc(sizeof(struct nWorker));
        if (worker == NULL) {
            perror("malloc");
            return -2;
        }
        memset(worker, 0, sizeof(struct nWorker));
        worker->manager = pool;

        // pthread_create returns an error number and does not set errno.
        int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);
        if (ret) {
            fprintf(stderr, "pthread_create: %s\n", strerror(ret));
            free(worker);
            return -3;
        }

        LIST_INSERT(worker, pool->workers);
    }

    // success
    return 0;
}
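The create function above copies static initializers with memcpy. POSIX leaves the result of using a copy of a mutex undefined, so the dynamic initialization functions are the more portable route. The following is a minimal sketch of that alternative, assuming a hypothetical `demo_sync` struct that stands in for the `mutex` and `cond` members of the pool; `demo_sync_init` and `demo_sync_destroy` are illustrative names, not part of the original code.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical pair of synchronization objects, standing in for the
 * mutex and cond members of the pool. */
struct demo_sync {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
};

/* Dynamically initializes both objects with default attributes.
 * Returns 0 on success or the pthread error number on failure. */
int demo_sync_init(struct demo_sync *s)
{
    assert(s != NULL);

    int ret = pthread_mutex_init(&s->mutex, NULL);
    if (ret != 0)
        return ret;

    ret = pthread_cond_init(&s->cond, NULL);
    if (ret != 0) {
        /* Undo the first step so nothing is left half-initialized. */
        pthread_mutex_destroy(&s->mutex);
        return ret;
    }
    return 0;
}

/* Releases both objects. Returns 0 on success or the first error number. */
int demo_sync_destroy(struct demo_sync *s)
{
    assert(s != NULL);

    int r1 = pthread_cond_destroy(&s->cond);
    int r2 = pthread_mutex_destroy(&s->mutex);
    return r1 != 0 ? r1 : r2;
}
```

Objects initialized this way should be released with the matching destroy calls once no thread is using them.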
3.2 thread pool destruction function
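The main job of the destroy function is to set each worker's termination flag and then broadcast on the condition variable, so that every waiting worker wakes up, sees the flag, and exits. The flag is set and the broadcast is issued while holding the mutex, because a condition variable is always used together with the lock that protects its condition.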
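// API
int nThreadPoolDestory(ThreadPool *pool, int nWorker) {
    (void)nWorker; // unused

    struct nWorker *worker = NULL;

    pthread_mutex_lock(&pool->mutex);

    // Set the flag under the lock so workers read a consistent value.
    for (worker = pool->workers; worker != NULL; worker = worker->next) {
        worker->terminate = 1;
    }

    // Each worker frees its own node on exit, so only the list heads are
    // cleared here. Tasks still queued are dropped without being run.
    pool->workers = NULL;
    pool->tasks = NULL;

    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);

    return 0;
}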
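The destroy function above signals the workers but does not wait for them to finish. One possible extension, shown here as a self-contained sketch rather than as the original design, is to join every worker after the broadcast. All names (`demo_pool`, `demo_pool_start`, `demo_pool_shutdown`, `DEMO_WORKERS`) are hypothetical, and the workers in this sketch do nothing except wait for the stop flag.

```c
#include <assert.h>
#include <pthread.h>

#define DEMO_WORKERS 4   /* hypothetical worker count for the illustration */

/* Hypothetical shared state: one stop flag guarded by a mutex/cond pair. */
struct demo_pool {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int terminate;                    /* set to 1 to ask workers to exit */
    int exited;                       /* workers that have left their loop */
    int started;                      /* threads successfully created */
    pthread_t threads[DEMO_WORKERS];
};

/* Worker: sleeps on the condition variable until the stop flag is set. */
static void *demo_worker(void *arg)
{
    struct demo_pool *p = arg;

    pthread_mutex_lock(&p->mutex);
    while (!p->terminate)
        pthread_cond_wait(&p->cond, &p->mutex);
    p->exited++;
    pthread_mutex_unlock(&p->mutex);
    return NULL;
}

/* Starts the workers. Returns the number of threads created, or -1 if
 * the synchronization objects could not be initialized. */
int demo_pool_start(struct demo_pool *p)
{
    assert(p != NULL);
    p->terminate = 0;
    p->exited = 0;
    p->started = 0;

    if (pthread_mutex_init(&p->mutex, NULL) != 0)
        return -1;
    if (pthread_cond_init(&p->cond, NULL) != 0) {
        pthread_mutex_destroy(&p->mutex);
        return -1;
    }
    for (int i = 0; i < DEMO_WORKERS; i++) {
        if (pthread_create(&p->threads[p->started], NULL, demo_worker, p) != 0)
            break;          /* shutdown still joins the ones that started */
        p->started++;
    }
    return p->started;
}

/* Sets the stop flag under the lock, wakes every waiter, then joins each
 * worker. Returns the number of workers that exited their loop. */
int demo_pool_shutdown(struct demo_pool *p)
{
    assert(p != NULL);

    pthread_mutex_lock(&p->mutex);
    p->terminate = 1;
    pthread_cond_broadcast(&p->cond);
    pthread_mutex_unlock(&p->mutex);

    for (int i = 0; i < p->started; i++)
        pthread_join(p->threads[i], NULL);

    int exited = p->exited;   /* safe to read: all workers are joined */
    pthread_cond_destroy(&p->cond);
    pthread_mutex_destroy(&p->mutex);
    return exited;
}
```

Joining gives the caller a point after which no worker can still touch the pool, which matters when the pool object lives on the stack or is about to be freed.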
3.3 message adding function
This function modifies the shared message queue, so it must take the lock. pthread_cond_signal() wakes at least one thread waiting on the condition variable; which thread is woken is up to the scheduler.
// API
int nThreadPoolPushTask(ThreadPool *pool, struct nTask *task) {
    pthread_mutex_lock(&pool->mutex);

    LIST_INSERT(task, pool->tasks);   // queue the task at the head
    pthread_cond_signal(&pool->cond); // wake one waiting worker

    pthread_mutex_unlock(&pool->mutex);
    return 0;
}
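Because LIST_INSERT adds at the head and a worker also takes the head of the list, the most recently pushed task is run first. If first-in, first-out order is wanted, one option is to keep a tail pointer and append there. The sketch below shows that variant with hypothetical `demo_task` and `demo_queue` types; locking is left out and is assumed to be done by the caller, as in the push function above.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical task node, mirroring the prev/next layout of struct nTask. */
struct demo_task {
    int id;
    struct demo_task *prev;
    struct demo_task *next;
};

/* Hypothetical queue with both ends tracked. */
struct demo_queue {
    struct demo_task *head;  /* oldest task, next one to run */
    struct demo_task *tail;  /* most recently pushed task */
};

/* Appends t at the tail. The caller is assumed to hold the pool lock. */
void demo_queue_push(struct demo_queue *q, struct demo_task *t)
{
    assert(q != NULL && t != NULL);

    t->next = NULL;
    t->prev = q->tail;
    if (q->tail != NULL)
        q->tail->next = t;
    else
        q->head = t;          /* queue was empty */
    q->tail = t;
}

/* Detaches and returns the oldest task, or NULL when the queue is empty.
 * The caller is assumed to hold the pool lock. */
struct demo_task *demo_queue_pop(struct demo_queue *q)
{
    assert(q != NULL);

    struct demo_task *t = q->head;
    if (t == NULL)
        return NULL;

    q->head = t->next;
    if (q->head != NULL)
        q->head->prev = NULL;
    else
        q->tail = NULL;       /* queue is now empty */
    t->prev = t->next = NULL;
    return t;
}
```

With this layout, tasks pushed as 1, 2, 3 are popped in the same order.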
4, Test sample
#if 1

#define THREADPOOL_INIT_COUNT 20
#define TASK_INIT_SIZE 1000

void task_entry(struct nTask *task) {
    int idx = *(int *)task->user_data;
    printf("idx: %d\n", idx);

    // The task owns its data and its own node.
    free(task->user_data);
    free(task);
}

int main(void) {
    ThreadPool pool = {0};

    // nThreadPoolCreate() zeroes the pool with memset() before use.
    if (nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT) != 0) exit(1);

    int i = 0;
    for (i = 0; i < TASK_INIT_SIZE; i++) {
        struct nTask *task = (struct nTask *)malloc(sizeof(struct nTask));
        if (task == NULL) {
            perror("malloc");
            exit(1);
        }
        memset(task, 0, sizeof(struct nTask));

        task->task_func = task_entry;
        task->user_data = malloc(sizeof(int));
        if (task->user_data == NULL) {
            perror("malloc");
            exit(1);
        }
        *(int*)task->user_data = i;

        nThreadPoolPushTask(&pool, task);
    }

    getchar(); // keep main alive while the workers drain the queue
    return 0;
}

#endif
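The test keeps the process alive with getchar() until a key is pressed. A possible alternative, sketched here under the assumption that each task reports its own completion, is to count finished tasks and have main wait on a condition variable until the count reaches the expected value. `demo_latch` and its functions are hypothetical names; `demo_latch_selfcheck` uses plain threads in place of pool tasks to keep the example standalone.

```c
#include <assert.h>
#include <pthread.h>

#define DEMO_LATCH_THREADS 8   /* hypothetical number of simulated tasks */

/* Hypothetical completion tracker shared by the tasks and the waiter. */
struct demo_latch {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int done;                  /* number of tasks that have finished */
};

/* Called at the end of each task: bump the counter and wake the waiter. */
void demo_latch_mark(struct demo_latch *l)
{
    assert(l != NULL);
    pthread_mutex_lock(&l->mutex);
    l->done++;
    pthread_cond_signal(&l->cond);
    pthread_mutex_unlock(&l->mutex);
}

/* Blocks until at least `expected` tasks are marked; returns the count. */
int demo_latch_wait(struct demo_latch *l, int expected)
{
    assert(l != NULL);
    pthread_mutex_lock(&l->mutex);
    while (l->done < expected)
        pthread_cond_wait(&l->cond, &l->mutex);
    int done = l->done;
    pthread_mutex_unlock(&l->mutex);
    return done;
}

/* Stand-in for a pool task: it only reports completion. */
static void *demo_latch_thread(void *arg)
{
    demo_latch_mark(arg);
    return NULL;
}

/* Starts the simulated tasks, waits for all of them, and returns how many
 * completed, or -1 if setup failed or the count does not match. */
int demo_latch_selfcheck(void)
{
    struct demo_latch l;
    pthread_t tids[DEMO_LATCH_THREADS];
    int started = 0;

    l.done = 0;
    if (pthread_mutex_init(&l.mutex, NULL) != 0)
        return -1;
    if (pthread_cond_init(&l.cond, NULL) != 0) {
        pthread_mutex_destroy(&l.mutex);
        return -1;
    }

    for (int i = 0; i < DEMO_LATCH_THREADS; i++)
        if (pthread_create(&tids[started], NULL, demo_latch_thread, &l) == 0)
            started++;

    int done = demo_latch_wait(&l, started);

    for (int i = 0; i < started; i++)
        pthread_join(tids[i], NULL);
    pthread_cond_destroy(&l.cond);
    pthread_mutex_destroy(&l.mutex);
    return done == started ? done : -1;
}
```

In the test program, the equivalent would be calling a function like `demo_latch_mark` at the end of `task_entry` and replacing getchar() with a wait for TASK_INIT_SIZE completions.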