On thread pool:
A thread pool is a multi-threaded mechanism for processing tasks that can be executed in parallel.
A task can be executed in parallel when it can be divided into many parts that can be processed at the same time; the whole task is finished once every part has been processed. (This is like producing a toy: it can be broken down into many parts, the parts do not have to be produced in any particular order, and the toy is complete as soon as all of them have been produced.)
Multiple threads are started in the thread pool. Each thread can work independently, and the threads may or may not interact with one another.
Within the pool, the threads coordinate with each other through mutual exclusion and synchronization (a mutex and a condition variable).
Thread pool internal components
/* Maximum number of tasks that may wait in the queue at any time. */
#define MAX_WAITING_TASKS 2000

/* Maximum number of worker threads a pool can hold (size of the id array). */
#define MAX_ACTIVE_THREADS 20

/* One queued task: a node of a singly linked list. */
typedef struct task {
    void *(*task_func)(void *arg); /* task processing function */
    void *arg;                     /* argument handed to task_func */
    struct task *next;             /* next node in the list, NULL at the tail */
} task_t;

/*
 * Thread pool state.
 *
 * `lock` and `cond` are the mutex / condition variable pair that the pool
 * functions in this file use around the task list and the waiting-task
 * counter.
 */
typedef struct thread_pool {
    pthread_mutex_t lock;                /* mutex */
    pthread_cond_t cond;                 /* condition variable */
    task_t *task_list;                   /* task list */
    int shutdown;                        /* end flag: non-zero means the pool is shutting down */
    unsigned int max_waiting_tasks;      /* maximum number of waiting tasks */
    unsigned int current_waiting_tasks;  /* number of tasks currently waiting */
    unsigned int max_active_threads;     /* maximum number of active threads */
    unsigned int current_active_threads; /* number of currently active threads */
    pthread_t id[MAX_ACTIVE_THREADS];    /* thread ids of the pool's threads */
} thread_pool_t;
Thread function and thread exit processing function
//Thread exit handler void func(void *arg) { printf("thread%lu Exit processing!\n",pthread_self()); //Unlock pthread_mutex_unlock((pthread_mutex_t *)arg); } //Thread function void *start_routine(void *arg) { task_t *p = NULL; thread_pool_t *thread_pool = (thread_pool_t *)arg; pthread_cleanup_push(func,&thread_pool->lock); while(1){ //Lock up pthread_mutex_lock(&thread_pool->lock); //If there is no task to execute, there is no exit sign, and sleep is waiting to wake up if(thread_pool->current_waiting_tasks==0&&thread_pool->shutdown==0){ pthread_cond_wait(&thread_pool->cond,&thread_pool->lock); } //No task, but exit flag if(thread_pool->current_waiting_tasks==0&&thread_pool->shutdown!=0){ pthread_exit(NULL); } //With tasks, perform tasks //Exit not allowed pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL); p = thread_pool->task_list->next;//Take a task from the list if(!p){ printf("task_list error!\n"); pthread_mutex_unlock(&thread_pool->lock); continue; } //Delete task from linked list thread_pool->task_list->next = p->next; //Number of tasks currently waiting - 1 thread_pool->current_waiting_tasks--; //Unlock pthread_mutex_unlock(&thread_pool->lock); //Execute task processing function p->task_func(p->arg); //Release p free(p); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); } pthread_exit(NULL); pthread_cleanup_pop(0); }
Thread pool related external interfaces
-
Initialize thread pool
Parameters:
thread_pool: the thread pool
threads_num: number of threads in the thread pool
Return value:
Returns 0 on success, -1 on failure
/*
 * Initialize a thread pool and start its worker threads.
 *
 * thread_pool: pool object to initialize (caller-owned storage).
 * threads_num: number of worker threads to start, 0..MAX_ACTIVE_THREADS.
 *
 * Returns 0 on success, -1 on failure. On failure every resource acquired so
 * far (threads, task list head, condition variable, mutex) is released again,
 * so the caller must not call destroy_thread_pool() on the pool.
 */
int init_thread_pool(thread_pool_t *thread_pool, int threads_num)
{
    if (!thread_pool) {
        return -1;
    }
    if (threads_num > MAX_ACTIVE_THREADS) {
        printf("too many threads_num!\n");
        return -1;
    }
    /* A negative count would turn into a huge value in the unsigned counter. */
    if (threads_num < 0) {
        printf("invalid threads_num!\n");
        return -1;
    }

    /* Initialize the mutex and the condition variable. */
    if (pthread_mutex_init(&thread_pool->lock, NULL) != 0) {
        return -1;
    }
    if (pthread_cond_init(&thread_pool->cond, NULL) != 0) {
        goto fail_mutex;
    }

    /* The task list starts with a head node that carries no task. */
    thread_pool->task_list = malloc(sizeof *thread_pool->task_list);
    if (!thread_pool->task_list) {
        perror("malloc");
        goto fail_cond;
    }
    thread_pool->task_list->task_func = NULL;
    thread_pool->task_list->arg = NULL;
    thread_pool->task_list->next = NULL;

    thread_pool->shutdown = 0; /* not ending */
    thread_pool->max_waiting_tasks = MAX_WAITING_TASKS;
    thread_pool->current_waiting_tasks = 0;
    thread_pool->max_active_threads = MAX_ACTIVE_THREADS;
    /* Counted up as threads are actually created, so it never overstates. */
    thread_pool->current_active_threads = 0;

    for (int i = 0; i < threads_num; i++) {
        int rc = pthread_create(&thread_pool->id[i], NULL, start_routine, (void *)thread_pool);
        if (rc != 0) {
            printf("pthread_create failed: %d\n", rc);
            goto fail_threads;
        }
        thread_pool->current_active_threads++;
        printf("%lu Thread start!\n", (unsigned long)thread_pool->id[i]);
    }
    return 0;

fail_threads:
    /* Ask the workers that did start to exit, then reap them. */
    pthread_mutex_lock(&thread_pool->lock);
    thread_pool->shutdown = 1;
    pthread_cond_broadcast(&thread_pool->cond);
    pthread_mutex_unlock(&thread_pool->lock);
    for (unsigned int j = 0; j < thread_pool->current_active_threads; j++) {
        pthread_join(thread_pool->id[j], NULL);
    }
    thread_pool->current_active_threads = 0;
    free(thread_pool->task_list);
    thread_pool->task_list = NULL;
fail_cond:
    pthread_cond_destroy(&thread_pool->cond);
fail_mutex:
    pthread_mutex_destroy(&thread_pool->lock);
    return -1;
}
-
Add tasks
Parameters:
thread_pool: the thread pool
task_func(void *arg): task processing function
arg: argument passed to the task processing function
Return value:
Returns 0 on success, -1 on failure
/*
 * Append a task to the end of the pool's task list and wake one worker.
 *
 * thread_pool: the pool to add the task to.
 * task_func:   task processing function; must not be NULL.
 * arg:         argument passed to task_func (ownership stays with the caller
 *              and the task function; the pool only frees its own list node).
 *
 * Returns 0 on success, -1 if an argument is NULL, allocation fails, the pool
 * is shutting down / has no task list, or the waiting queue is full.
 */
int add_task(thread_pool_t *thread_pool, void *(*task_func)(void *arg), void *arg)
{
    /* A NULL task function would be called by a worker later on. */
    if (!thread_pool || !task_func) {
        return -1;
    }

    /* Build the node before taking the lock to keep the critical section short. */
    task_t *new_task = malloc(sizeof *new_task);
    if (!new_task) {
        perror("malloc");
        return -1;
    }
    new_task->task_func = task_func;
    new_task->arg = arg;
    new_task->next = NULL;

    pthread_mutex_lock(&thread_pool->lock);

    /* Nothing may be queued once shutdown has begun or the list is gone. */
    if (thread_pool->shutdown != 0 || thread_pool->task_list == NULL) {
        pthread_mutex_unlock(&thread_pool->lock);
        free(new_task);
        return -1;
    }

    /*
     * The limit is checked under the lock: workers decrement the counter
     * concurrently, so reading it unlocked would be a data race.
     */
    if (thread_pool->current_waiting_tasks >= thread_pool->max_waiting_tasks) {
        pthread_mutex_unlock(&thread_pool->lock);
        free(new_task);
        printf("too many tasks!\n");
        return -1;
    }

    /* Walk to the last node and link the new task behind it. */
    task_t *tail = thread_pool->task_list;
    while (tail->next) {
        tail = tail->next;
    }
    tail->next = new_task;
    thread_pool->current_waiting_tasks++;

    pthread_mutex_unlock(&thread_pool->lock);

    /* Wake up one waiting worker. */
    pthread_cond_signal(&thread_pool->cond);
    return 0;
}
-
Add thread
Parameters:
thread_pool: the thread pool
num: number of threads to add
Return value:
On success, returns the number of threads that were actually added
Returns 0 on failure
/*
 * Start additional worker threads in the pool.
 *
 * thread_pool: the pool to grow.
 * num:         number of threads to add; the request is clamped so that the
 *              pool never exceeds max_active_threads.
 *
 * Returns the number of threads actually created. Returns 0 if num is not
 * positive, the pool is already at its maximum, or no thread could be created.
 *
 * NOTE(review): current_active_threads and id[] are updated without taking the
 * pool mutex, as in the rest of the thread-management functions; this assumes
 * add_thread/del_thread/destroy_thread_pool are called from one controlling
 * thread - confirm against callers.
 */
int add_thread(thread_pool_t *thread_pool, int num)
{
    if (!thread_pool || num <= 0 ||
        thread_pool->current_active_threads >= thread_pool->max_active_threads) {
        return 0;
    }

    /* Free slots left; computing the room first avoids overflow in cur + num. */
    unsigned int room = thread_pool->max_active_threads - thread_pool->current_active_threads;
    unsigned int wanted = ((unsigned int)num < room) ? (unsigned int)num : room;

    int inc_threads = 0;
    for (unsigned int k = 0; k < wanted; k++) {
        /* New threads always go into the first unused slot of the id array. */
        unsigned int slot = thread_pool->current_active_threads;
        int rc = pthread_create(&thread_pool->id[slot], NULL, start_routine, (void *)thread_pool);
        if (rc != 0) {
            /* Stop here: counting a failed thread would leave a bogus id behind. */
            printf("pthread_create failed: %d\n", rc);
            break;
        }
        thread_pool->current_active_threads++;
        inc_threads++;
        printf("thread%lu Create success!\n", (unsigned long)thread_pool->id[slot]);
    }

    /* Number of threads added by this call. */
    return inc_threads;
}
-
Delete thread
Parameters:
thread_pool: the thread pool
num: number of threads to delete
Return value:
On success, returns the current number of threads after the deletion
Returns 0 on failure
/*
 * Remove worker threads from the pool, always keeping at least one.
 *
 * thread_pool: the pool to shrink.
 * num:         number of threads to delete; if it would remove every thread,
 *              the request is clamped so that one thread remains.
 *
 * The threads stored last in the id array are cancelled and then joined, so
 * their resources are reclaimed and their slots can safely be reused by
 * add_thread(). Workers disable cancellation while running a task, so the
 * join can block until a cancelled worker next waits for work.
 *
 * Returns the number of threads left in the pool after the deletion, or 0 if
 * num is not positive or the pool has at most one thread.
 */
int del_thread(thread_pool_t *thread_pool, int num)
{
    if (!thread_pool || num <= 0 || thread_pool->current_active_threads <= 1) {
        return 0;
    }

    unsigned int current = thread_pool->current_active_threads;

    /*
     * Compare before subtracting: the counters are unsigned, so
     * `current - num` would wrap around to a huge value when num > current.
     */
    unsigned int remaining = ((unsigned int)num >= current) ? 1u : current - (unsigned int)num;

    /* Request cancellation of the trailing threads first ... */
    for (unsigned int i = current; i > remaining; i--) {
        pthread_cancel(thread_pool->id[i - 1]);
        printf("thread%lu Cancelled!\n", (unsigned long)thread_pool->id[i - 1]);
    }
    /* ... then reap them, so no terminated thread is left unjoined. */
    for (unsigned int i = current; i > remaining; i--) {
        pthread_join(thread_pool->id[i - 1], NULL);
    }

    /* Update the number of currently active threads. */
    thread_pool->current_active_threads = remaining;
    return (int)remaining;
}
-
Destroy thread pool
Parameters:
thread_pool: the thread pool
/*
 * Shut the pool down and release everything it owns.
 *
 * thread_pool: the pool to destroy.
 *
 * Sets the shutdown flag, wakes every sleeping worker, waits for all workers
 * to end, frees the task list (head node and any nodes still queued) and
 * destroys the condition variable and the mutex. After this call the pool
 * must be re-initialized with init_thread_pool() before it is used again.
 *
 * Returns 0 on success, -1 if thread_pool is NULL.
 */
int destroy_thread_pool(thread_pool_t *thread_pool)
{
    if (!thread_pool) {
        return -1;
    }

    /*
     * Set the flag and broadcast while holding the mutex. Workers test the
     * flag under the same mutex right before sleeping; changing it unlocked
     * could slip in between their test and their wait, and that worker would
     * then miss the wakeup and never end.
     */
    pthread_mutex_lock(&thread_pool->lock);
    thread_pool->shutdown = 1;
    pthread_cond_broadcast(&thread_pool->cond);
    pthread_mutex_unlock(&thread_pool->lock);

    /* Wait for all threads to end and reclaim their resources. */
    for (unsigned int i = 0; i < thread_pool->current_active_threads; i++) {
        pthread_join(thread_pool->id[i], NULL);
        printf("%lu Thread end!\n", (unsigned long)thread_pool->id[i]);
    }
    thread_pool->current_active_threads = 0;

    /* Free the whole list, not just its head: tasks may still be queued
     * (for example when the pool had no worker thread to run them). */
    task_t *node = thread_pool->task_list;
    while (node) {
        task_t *next = node->next;
        free(node);
        node = next;
    }
    thread_pool->task_list = NULL;
    thread_pool->current_waiting_tasks = 0;

    /* No thread uses the synchronization objects any more. */
    pthread_cond_destroy(&thread_pool->cond);
    pthread_mutex_destroy(&thread_pool->lock);
    return 0;
}