Thread pools and their implementation

About thread pools:

A thread pool is a multithreaded mechanism for processing tasks that can be executed in parallel.
A task can be executed in parallel when it can be divided into many parts that are processed at the same time; the whole task is finished once all of its parts have been processed. (This is like manufacturing a toy: it can be broken down into many components, the components do not have to be produced in any particular order, and the toy is complete as soon as every component has been produced.)
Multiple threads are started in the thread pool. Each thread works independently and may or may not communicate with the other threads.
The threads cooperate on the tasks through thread mutual exclusion (mutexes) and synchronization (condition variables).

Thread pool internal components

//Maximum number of waiting tasks (init_thread_pool stores it in max_waiting_tasks)
#define MAX_WAITING_TASKS 2000
//Maximum number of active threads (also the size of the pool's thread id array)
#define MAX_ACTIVE_THREADS 20

//Task linked list node: one unit of work queued for the pool's worker threads
typedef struct task{
    void *(*task_func)(void *arg);//Task processing function, called by a worker as task_func(arg)
    void *arg;//Argument passed to task_func

    struct task *next;//Next node in the singly linked list (NULL at the tail)
}task_t;

//Thread pool: state shared between the worker threads and the functions that manage the pool
typedef struct thread_pool{
    pthread_mutex_t lock;//Mutex held while the task list and the waiting-task counter are modified
    pthread_cond_t cond;//Condition variable the workers wait on for new tasks or shutdown

    task_t *task_list;//Task list; the first node is a dummy head, real tasks start at task_list->next

    int shutdown;//End flag: 0 = running, non-zero = workers exit once no tasks are waiting

    unsigned int max_waiting_tasks;//Maximum number of waiting tasks
    unsigned int current_waiting_tasks;//Number of tasks currently waiting
    unsigned int max_active_threads;//Maximum number of active threads
    unsigned int current_active_threads;//Number of currently active threads

    pthread_t id[MAX_ACTIVE_THREADS];//Thread ids; slots 0..current_active_threads-1 are in use
}thread_pool_t;

Thread function and thread exit (cleanup) handler

//Thread exit (cleanup) handler.
//arg: pointer to the mutex (pthread_mutex_t *) that the exiting thread holds.
//Prints a message identifying the calling thread and releases the mutex so
//that a thread which exits or is cancelled while holding the lock does not
//leave the other threads blocked.
void func(void *arg)
{
    //pthread_t is an opaque type: cast explicitly so the argument matches %lu
    printf("thread%lu Exit processing!\n",(unsigned long)pthread_self());
    //Unlock
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

//Thread function: the worker loop run by every thread in the pool.
//arg: pointer to the owning thread_pool_t.
//Repeatedly takes the first task from the list and runs it, sleeps on the
//condition variable while no task is waiting, and exits once the shutdown
//flag is set and no tasks are left. The loop only ends through pthread_exit
//or cancellation; the function never returns normally.
void *start_routine(void *arg)
{
    task_t *p = NULL;

    thread_pool_t *thread_pool = (thread_pool_t *)arg;

    //The handler releases the lock if the thread exits or is cancelled while holding it
    pthread_cleanup_push(func,&thread_pool->lock);

    while(1){
        //Lock up
        pthread_mutex_lock(&thread_pool->lock);
        //No task and no exit flag: sleep until woken up.
        //The predicate is re-checked in a loop because pthread_cond_wait may
        //wake up spuriously, and another worker may already have taken the
        //task this wakeup was meant for.
        while(thread_pool->current_waiting_tasks==0&&thread_pool->shutdown==0){
            pthread_cond_wait(&thread_pool->cond,&thread_pool->lock);
        }

        //No task, but exit flag
        if(thread_pool->current_waiting_tasks==0&&thread_pool->shutdown!=0){
            //The lock is still held here; the cleanup handler releases it
            pthread_exit(NULL);
        }

        //With tasks, perform tasks
        //Cancellation is disabled so a task is never abandoned half-way
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

        p = thread_pool->task_list->next;//Take a task from the list
        if(!p){
            //The counter says tasks are waiting but the list is empty:
            //resynchronise the counter so the worker sleeps instead of spinning
            printf("task_list error!\n");
            thread_pool->current_waiting_tasks = 0;
            pthread_mutex_unlock(&thread_pool->lock);
            //Re-enable cancellation before going back to wait
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
            continue;
        }
        //Delete task from linked list
        thread_pool->task_list->next = p->next;
        //Number of tasks currently waiting - 1
        thread_pool->current_waiting_tasks--;

        //Unlock before running the task so other workers can proceed
        pthread_mutex_unlock(&thread_pool->lock);
        //Execute task processing function
        p->task_func(p->arg);
        //Release p
        free(p);

        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
    }

    //Not reached. pthread_cleanup_pop must still pair with pthread_cleanup_push
    //in the same lexical scope.
    pthread_cleanup_pop(0);
    return NULL;
}

Thread pool related external interfaces

  • Initialize the thread pool
    Parameters:
    thread_pool: the thread pool
    threads_num: number of threads to start in the thread pool
    Return value:
    Returns 0 on success, -1 on failure
//Initialize a thread pool and start its worker threads.
//thread_pool: pool object to initialize (storage owned by the caller)
//threads_num: number of worker threads to start, 0..MAX_ACTIVE_THREADS
//Returns 0 on success, -1 on failure. On failure everything that had been
//set up (threads, task list, mutex, condition variable) is torn down again.
int init_thread_pool(thread_pool_t *thread_pool,int threads_num)
{
    if(!thread_pool){
        printf("invalid thread_pool!\n");
        return -1;
    }

    if(threads_num>MAX_ACTIVE_THREADS){
        printf("too many threads_num!\n");
        return -1;
    }

    //A negative count would wrap around when stored in the unsigned counter
    if(threads_num<0){
        printf("invalid threads_num!\n");
        return -1;
    }

    //Initializing mutexes and conditional variables
    if(pthread_mutex_init(&thread_pool->lock,NULL)!=0){
        printf("pthread_mutex_init failed!\n");
        return -1;
    }
    if(pthread_cond_init(&thread_pool->cond,NULL)!=0){
        printf("pthread_cond_init failed!\n");
        pthread_mutex_destroy(&thread_pool->lock);
        return -1;
    }

    //Initialize task list: a dummy head node, real tasks hang off ->next
    thread_pool->task_list = malloc(sizeof *thread_pool->task_list);
    if(!thread_pool->task_list){
        perror("malloc");
        pthread_cond_destroy(&thread_pool->cond);
        pthread_mutex_destroy(&thread_pool->lock);
        return -1;
    }

    thread_pool->task_list->next = NULL;

    //Initialization end flag
    thread_pool->shutdown = 0;//Not end

    //Maximum number of waiting tasks
    thread_pool->max_waiting_tasks = MAX_WAITING_TASKS;
    //Number of tasks currently waiting
    thread_pool->current_waiting_tasks = 0;
    //Maximum number of active threads
    thread_pool->max_active_threads = MAX_ACTIVE_THREADS;
    //Number of currently active threads: set once all threads really exist
    thread_pool->current_active_threads = 0;

    //Create thread
    int created = 0;
    while(created<threads_num){
        int rc = pthread_create(&thread_pool->id[created],NULL,start_routine,(void *)thread_pool);
        if(rc!=0){
            printf("pthread_create failed: %d\n",rc);
            break;
        }
        printf("%lu Thread start!\n",(unsigned long)thread_pool->id[created]);
        created++;
    }

    if(created<threads_num){
        //Roll back: ask the threads that did start to exit, then reclaim them
        pthread_mutex_lock(&thread_pool->lock);
        thread_pool->shutdown = 1;
        pthread_cond_broadcast(&thread_pool->cond);
        pthread_mutex_unlock(&thread_pool->lock);

        while(created>0){
            created--;
            pthread_join(thread_pool->id[created],NULL);
        }

        free(thread_pool->task_list);
        thread_pool->task_list = NULL;
        pthread_cond_destroy(&thread_pool->cond);
        pthread_mutex_destroy(&thread_pool->lock);
        return -1;
    }

    thread_pool->current_active_threads = (unsigned int)threads_num;

    return 0;
}
  • Add a task
    Parameters:
    thread_pool: the thread pool
    task_func(void *arg): the task processing function
    arg: argument passed to the task processing function
    Return value:
    Returns 0 on success, -1 on failure
//Append a task to the end of the pool's task list and wake one worker.
//thread_pool: the thread pool
//task_func:   task processing function; must not be NULL
//arg:         argument handed to task_func when a worker runs the task
//Returns 0 on success, -1 on failure (invalid argument, out of memory, or
//the number of waiting tasks has reached max_waiting_tasks).
int add_task(thread_pool_t *thread_pool,void *(*task_func)(void *arg),void *arg)
{
    //A NULL function pointer would crash the worker that picks the task up
    if(!thread_pool||!task_func){
        printf("invalid task!\n");
        return -1;
    }

    //Structural node: allocated before taking the lock to keep the critical section short
    task_t *new_task = malloc(sizeof *new_task);
    if(!new_task){
        perror("malloc");
        return -1;
    }

    //Initialize node
    new_task->task_func = task_func;
    new_task->arg = arg;
    new_task->next = NULL;

    //Insert node into linked list
    //Lock up
    pthread_mutex_lock(&thread_pool->lock);

    //The limit is checked under the lock: the counter is modified concurrently
    //by the workers, and an unlocked check would let producers exceed the limit
    if(thread_pool->current_waiting_tasks>=thread_pool->max_waiting_tasks){
        pthread_mutex_unlock(&thread_pool->lock);
        free(new_task);
        printf("too many tasks!\n");
        return -1;
    }

    //Insert the end of the list
    task_t *tail = thread_pool->task_list;
    while(tail->next){
        tail = tail->next;
    }
    tail->next = new_task;

    //Number of waiting tasks + 1
    thread_pool->current_waiting_tasks++;
    //Unlock
    pthread_mutex_unlock(&thread_pool->lock);

    //Wake up a waiting thread
    pthread_cond_signal(&thread_pool->cond);

    return 0;
}
  • Add threads
    Parameters:
    thread_pool: the thread pool
    num: number of threads to add
    Return value:
    Returns the number of threads actually added on success
    Returns 0 on failure
//Add worker threads to the pool.
//thread_pool: the thread pool
//num:         number of threads to add; the request is clamped so the pool
//             never exceeds max_active_threads
//Returns the number of threads actually created; 0 if num is not positive,
//the pool is already full, or no thread could be created.
int add_thread(thread_pool_t *thread_pool,int num)
{
    //If num is illegal or the maximum number of threads has been reached, return directly
    if(!thread_pool||num<=0||thread_pool->current_active_threads>=thread_pool->max_active_threads)
        return 0;

    //Clamp the request to the free slots; comparing against the remaining
    //room avoids overflowing current+num
    unsigned int first = thread_pool->current_active_threads;
    unsigned int room = thread_pool->max_active_threads-first;
    unsigned int want = (unsigned int)num<room ? (unsigned int)num : room;

    //Create thread
    unsigned int i = 0;
    int inc_threads = 0;
    for(i=first;i<first+want;i++){
        int rc = pthread_create(&thread_pool->id[i],NULL,start_routine,(void *)thread_pool);
        if(rc!=0){
            //Stop at the first failure so the used id slots stay contiguous
            printf("pthread_create failed: %d\n",rc);
            break;
        }
        //Increase number of threads + 1
        inc_threads++;
        printf("thread%lu Create success!\n",(unsigned long)thread_pool->id[i]);
    }

    //Only threads that really exist are counted as active
    thread_pool->current_active_threads += (unsigned int)inc_threads;

    //Returns the number of threads added
    return inc_threads;
}
  • Delete threads
    Parameters:
    thread_pool: the thread pool
    num: number of threads to delete
    Return value:
    Returns the current number of threads after deletion on success
    Returns 0 on failure
//Remove worker threads from the pool, always keeping at least one.
//thread_pool: the thread pool
//num:         number of threads to remove
//Returns the number of threads left in the pool after deletion; 0 if num is
//not positive or the pool has at most one thread.
//Each removed thread is cancelled and then joined, so the call waits until
//that worker reaches its cancellation point (workers disable cancellation
//while they are running a task).
int del_thread(thread_pool_t *thread_pool,int num)
{
    //If num is illegal or there is at most one thread, return directly
    if(!thread_pool||num<=0||thread_pool->current_active_threads<=1)
        return 0;

    unsigned int current = thread_pool->current_active_threads;
    //Number of threads after deletion
    unsigned int total_threads = 0;

    //Keep at least one thread.
    //Compare before subtracting: the counters are unsigned, so current-num
    //would wrap around to a huge value whenever num is larger than current
    if((unsigned int)num>=current){
        total_threads = 1;
    }
    else{
        total_threads = current-(unsigned int)num;
    }

    //Delete the last specified number of threads in the array
    unsigned int i = 0;
    for(i=current;i>total_threads;i--){
        pthread_t tid = thread_pool->id[i-1];
        if(pthread_cancel(tid)==0){
            //Join so the thread's resources are reclaimed and its id slot
            //can safely be reused when threads are added again
            pthread_join(tid,NULL);
        }
        printf("thread%lu Cancelled!\n",(unsigned long)tid);
    }

    //Update current active threads
    thread_pool->current_active_threads = total_threads;

    return (int)total_threads;
}
  • Destroy the thread pool
    Parameters:
    thread_pool: the thread pool
//Shut the pool down and release its resources.
//thread_pool: the thread pool
//Sets the end flag, wakes every sleeping worker, waits for all workers to
//finish (they exit once no tasks are waiting), then frees the task list and
//destroys the mutex and condition variable.
//Returns 0 on success, -1 if thread_pool is NULL.
//The pool must not be used again unless it is re-initialized.
int destroy_thread_pool(thread_pool_t *thread_pool)
{
    if(!thread_pool)
        return -1;

    //Set end flag and wake up all sleeping threads.
    //Both are done under the lock: otherwise a worker that has just tested
    //the flag but has not yet started waiting would miss the broadcast and
    //sleep forever, and the join below would never return
    pthread_mutex_lock(&thread_pool->lock);
    thread_pool->shutdown = 1;
    pthread_cond_broadcast(&thread_pool->cond);
    pthread_mutex_unlock(&thread_pool->lock);

    //Wait for all threads to end and recycle the resources of the end thread
    unsigned int i;
    for(i=0;i<thread_pool->current_active_threads;i++){
        pthread_join(thread_pool->id[i],NULL);
        printf("%lu Thread end!\n",(unsigned long)thread_pool->id[i]);
    }
    thread_pool->current_active_threads = 0;

    //Destroy linked list: the dummy head plus any task nodes still queued
    task_t *node = thread_pool->task_list;
    while(node){
        task_t *next = node->next;
        free(node);
        node = next;
    }
    thread_pool->task_list = NULL;
    thread_pool->current_waiting_tasks = 0;

    //No thread uses the pool any more, so its synchronization objects can go
    pthread_cond_destroy(&thread_pool->cond);
    pthread_mutex_destroy(&thread_pool->lock);

    return 0;
}
Published 17 original articles, won praise 4, visited 288
Private letter follow

Added by trillion on Fri, 17 Jan 2020 12:21:19 +0200