Writing a simple thread pool

Thread pool

Define a simple thread pool. The main structure includes two, one is task and the other is thread pool.
The maintainer thread queue and task array in the task pool. The sub threads in the thread queue will cycle to extract tasks from the task array. The addition of tasks and the consumption of threads are a pair of producers and consumers. When the tasks are full or insufficient, you need to lock them with conditional variables to stop adding tasks and taking out tasks.

The following is the structure

```cpp
typedef struct 
{
    void *(*function)(void *);          /* Function pointer */
    void *arg;                          /* Parameters of the above function */
} threadpool_task_t;                    /* Task structure of each sub thread */

/* Describes information about thread pools */
struct threadpool_t 
{
    pthread_mutex_t lock;               /* It is used to lock the structure */    
    pthread_mutex_t thread_counter;     /* Record the number of busy threads de trivial -- busy_thr_num */

    pthread_cond_t queue_not_full;      /* When the task queue is full, the thread adding the task is blocked and waits for this condition variable */
    pthread_cond_t queue_not_empty;     /* When the task queue is not empty, notify the thread waiting for the task */

    pthread_t *threads;                 /* Store the tid of each thread in the thread pool. array */
    pthread_t adjust_tid;               /* Memory management thread tid */
    threadpool_task_t *task_queue;      /* Task queue (first address of array) */

    int min_thr_num;                    /* Minimum number of threads in thread pool */
    int max_thr_num;                    /* Maximum number of threads in thread pool */
    int live_thr_num;                   /* Current number of surviving threads */
    int busy_thr_num;                   /* Number of busy threads */
    int wait_exit_thr_num;              /* Number of threads to destroy */

    int queue_front;                    /* task_queue Team head subscript */
    int queue_rear;                     /* task_queue Tail subscript */
    int queue_size;                     /* task_queue Actual tasks in the team */
    int queue_max_size;                 /* task_queue Maximum number of tasks the queue can hold */

    int shutdown;                       /* Flag bit, thread pool usage status, true or false */
};


Initialize the value of each variable of thread pool
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
Initialize the value in the thread pool, the initialization of this initialization, and the development space of this development space
Initialize all locks and condition variables, start the minimum working thread, set the thread separation property, and pass the parameter min_thr_num determines the number of threads to start
Create manager thread adjust_tid, the callback function is adjust, and the parameter is thread pool. Remember to free up space when the code call fails

Add a task to the thread pool
int threadpool_add(threadpool_t pool, void(*function)(void *arg), void *arg);
Lock
Task full blocking wait condition variable
The thread pool is in the closed state. Unlock and exit (pool - > shutdown records whether it is closed or not)
Add task
Unlock
The task has been published and people are called to do the task (the condition variable is used to contact the blocking of the working thread without the task)

Each worker thread in the thread pool
void *threadpool_thread(void *threadpool)
Create thread pool pointer receive parameters
Create task copy
Circular processing task
Lock
Judge the status first: the thread pool is open. If there is no task, the waiting signal will be blocked
Judge whether there are threads to end. If so, first count the number of idle threads –. Check whether the number of threads is greater than the minimum value. If it is greater than the host itself
Judge whether the wire path pool is in the open state without self explosion,
Get the task and get the task out of the team
Notify that new tasks can be added
And immediately unlocked it
Execute the task (note that when executing the task, you need to record the busy thread + 1, and remember to padlock when using)

Management threads use loops and sleep to maintain the number of threads in the thread pool regularly
void *adjust_thread(void *threadpool);
Receive parameters
When the thread pool is in the non closed state, the loop starts, and the closed state explodes
Stop an update time to facilitate the work of other threads
Lock record: number of tasks, number of busy threads, and number of threads in stock
Customize the algorithm to control the addition and deletion of threads and remember to lock (for example, create a new thread algorithm: when the number of tasks is greater than the minimum thread pool and the number of surviving threads is less than the maximum number of threads, destroy the redundant idle thread algorithm: when the number of busy threads X2 is less than the number of surviving threads and the number of surviving threads is greater than the minimum number of threads.)

The specific codes are as follows:

#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "pthreadpool.h"

#define DEFAULT_ Time 10 / * 10s test once*/
#define MIN_WAIT_TASK_NUM 10 / * if queue_ size > MIN_ WAIT_ TASK_ Num adds a new thread to the thread pool*/ 
#define DEFAULT_ THREAD_ Var 10 / * number of threads created and destroyed each time*/
#define true 1
#define false 0

typedef struct 
{
    void *(*function)(void *);          /* Function pointer */
    void *arg;                          /* Parameters of the above function */
} threadpool_task_t;                    /* Task structure of each sub thread */

/* Describes information about thread pools */
struct threadpool_t 
{
    pthread_mutex_t lock;               /* It is used to lock the structure */    
    pthread_mutex_t thread_counter;     /* Record the number of threads in busy state de trivial -- busy_thr_num */

    pthread_cond_t queue_not_full;      /* When the task queue is full, the thread adding the task is blocked and waits for this condition variable */
    pthread_cond_t queue_not_empty;     /* When the task queue is not empty, notify the thread waiting for the task */

    pthread_t *threads;                 /* Store the tid of each thread in the thread pool. array */
    pthread_t adjust_tid;               /* Memory management thread tid */
    threadpool_task_t *task_queue;      /* Task queue (first address of array) */

    int min_thr_num;                    /* Minimum number of threads in thread pool */
    int max_thr_num;                    /* Maximum number of threads in thread pool */
    int live_thr_num;                   /* Current number of surviving threads */
    int busy_thr_num;                   /* Number of busy threads */
    int wait_exit_thr_num;              /* Number of threads to destroy */

    int queue_front;                    /* task_queue Team head subscript */
    int queue_rear;                     /* task_queue Tail subscript */
    int queue_size;                     /* task_queue Actual tasks in the team */
    int queue_max_size;                 /* task_queue Maximum number of tasks the queue can hold */

    int shutdown;                       /* Flag bit, thread pool usage status, true or false */
};

void *threadpool_thread(void *threadpool);

void *adjust_thread(void *threadpool);

int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);

//threadpool_create(3,100,100);  
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
    int i;
    threadpool_t *pool = NULL;
    do 
	{
        if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) 
		{  
            printf("malloc threadpool fail");
            break;                                      /*Jump out of do while*/
        }

        pool->min_thr_num = min_thr_num;
        pool->max_thr_num = max_thr_num;
        pool->busy_thr_num = 0;
        pool->live_thr_num = min_thr_num;               /* Number of threads alive initial value = minimum number of threads */
        pool->wait_exit_thr_num = 0;
        pool->queue_size = 0;                           /* There are 0 products */
        pool->queue_max_size = queue_max_size;
        pool->queue_front = 0;
        pool->queue_rear = 0;
        pool->shutdown = false;                         /* Do not close thread pool */

        /* According to the maximum number of threads, open up space for the worker thread array and clear it */
        pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); 
        if (pool->threads == NULL) 
		{
            printf("malloc threads fail");
            break;
        }
        memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);

        /* Queue open space */
        pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
        if (pool->task_queue == NULL) 
		{
            printf("malloc task_queue fail\n");
            break;
        }

        /* Initialize mutually exclusive and conditional variables */
        if (pthread_mutex_init(&(pool->lock), NULL) != 0
                || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
                || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
                || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
        {
            printf("init the lock or cond fail\n");
            break;
        }

		//Start worker thread
		pthread_attr_t attr;
		pthread_attr_init(&attr);
		pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
        for (i = 0; i < min_thr_num; i++) 
		{
            pthread_create(&(pool->threads[i]), &attr, threadpool_thread, (void *)pool);/*pool Point to the current thread pool*/
            printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);
        }

		//Create manager thread
        pthread_create(&(pool->adjust_tid), &attr, adjust_thread, (void *)pool);

        return pool;

    } while (0);

	/* When the previous code call fails, free the poll storage space */
    threadpool_free(pool);

    return NULL;
}

/* Add a task to the thread pool */
//threadpool_ add(thp, process, (void*)&num[i]);   /*  Add task process to thread pool: lowercase ----- > uppercase*/

int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{
    pthread_mutex_lock(&(pool->lock));

    /* ==If true, the queue is full. Call wait to block */
    while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) 
	{
        pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    }

    if (pool->shutdown) 
	{
        pthread_cond_broadcast(&(pool->queue_not_empty));
        pthread_mutex_unlock(&(pool->lock));
        return 0;
    }

    /* Clears the parameter arg of the callback function called by the worker thread */
    if (pool->task_queue[pool->queue_rear].arg != NULL) 
	{
        pool->task_queue[pool->queue_rear].arg = NULL;
    }

    /*Add task to task queue*/
    pool->task_queue[pool->queue_rear].function = function;
    pool->task_queue[pool->queue_rear].arg = arg;
    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* The pointer at the end of the queue moves to simulate a ring */
    pool->queue_size++;

    /*After adding a task, the queue is not empty. Wake up the threads waiting to process the task in the thread pool*/
    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));

    return 0;
}

/* Each worker thread in the thread pool */
void *threadpool_thread(void *threadpool)
{
    threadpool_t *pool = (threadpool_t *)threadpool;
    threadpool_task_t task;

    while (true) 
	{
        /* Lock must be taken to wait on conditional variable */
        /*Just after the thread is created, wait for a task in the task queue. Otherwise, block and wait for a task in the task queue before waking up to receive a task*/
        pthread_mutex_lock(&(pool->lock));

        /*queue_size == 0 It indicates that there is no task. Call wait to block the condition variable. If there is a task, skip the while*/
        while ((pool->queue_size == 0) && (!pool->shutdown)) 
		{  
            printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));//Pause here

            /*Clear the specified number of idle threads. If the number of threads to end is greater than 0, end the thread*/
            if (pool->wait_exit_thr_num > 0) 
			{
                pool->wait_exit_thr_num--;

                /*If the number of threads in the thread pool is greater than the minimum, the current thread can be terminated*/
                if (pool->live_thr_num > pool->min_thr_num) 
				{
                    printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
                    pool->live_thr_num--;
                    pthread_mutex_unlock(&(pool->lock));
					//pthread_detach(pthread_self());
                    pthread_exit(NULL);
                }
            }
        }

        /*If true is specified, close each thread in the thread pool and exit the processing by itself - destroy the thread pool*/
        if (pool->shutdown) 
		{
            pthread_mutex_unlock(&(pool->lock));
            printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
            //pthread_detach(pthread_self());
            pthread_exit(NULL);     /* The thread ends itself */
        }

        /*Getting a task from a task queue is an out of queue operation*/
        task.function = pool->task_queue[pool->queue_front].function;
        task.arg = pool->task_queue[pool->queue_front].arg;

        pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* Out of line, simulate a ring queue */
        pool->queue_size--;

        /*Notification can add new tasks*/
        pthread_cond_broadcast(&(pool->queue_not_full));

        /*After the task is taken out, the thread pool will be released immediately*/
        pthread_mutex_unlock(&(pool->lock));

        /*Perform tasks*/ 
        printf("thread 0x%x start working\n", (unsigned int)pthread_self());
        pthread_mutex_lock(&(pool->thread_counter));                            /*Busy thread count variable*/
        pool->busy_thr_num++;                                                   /*Number of busy threads + 1*/
        pthread_mutex_unlock(&(pool->thread_counter));

        (*(task.function))(task.arg);                                           /*Execute callback function task*/
        //task.function(task.arg);                                              /* Execute callback function task*/

        /*Task end processing*/ 
        printf("thread 0x%x end working\n", (unsigned int)pthread_self());
        pthread_mutex_lock(&(pool->thread_counter));
        pool->busy_thr_num--;                                       /*Handle a task, number of busy States, number of threads - 1*/
        pthread_mutex_unlock(&(pool->thread_counter));
    }

    pthread_exit(NULL);
}

/* Management thread */
void *adjust_thread(void *threadpool)
{
    int i;
    threadpool_t *pool = (threadpool_t *)threadpool;
    while (!pool->shutdown) 
	{

        sleep(DEFAULT_TIME);                                    /*Regular thread pool management*/

        pthread_mutex_lock(&(pool->lock));
        int queue_size = pool->queue_size;                      /* Number of focus tasks */
        int live_thr_num = pool->live_thr_num;                  /* Number of surviving threads */
        pthread_mutex_unlock(&(pool->lock));

        pthread_mutex_lock(&(pool->thread_counter));
        int busy_thr_num = pool->busy_thr_num;                  /* Number of busy threads */
        pthread_mutex_unlock(&(pool->thread_counter));

        /* Create a new thread algorithm: when the number of tasks is greater than the minimum thread pool and the number of surviving threads is less than the maximum number of threads, such as 30 > = 10 & & 40 < 100*/
        if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) 
		{
            pthread_mutex_lock(&(pool->lock));  
            int add = 0;

            /*Add default at one time_ Thread threads*/
            for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
                    && pool->live_thr_num < pool->max_thr_num; i++) 
			{
                if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) 
				{
                    pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
                    add++;
                    pool->live_thr_num++;
                }
            }

            pthread_mutex_unlock(&(pool->lock));
        }

        /* Algorithm for destroying redundant idle threads: when the number of busy threads X2 is less than the number of surviving threads and the number of surviving threads is greater than the minimum number of threads*/
        if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) 
		{
            /* One time destroy DEFAULT_THREAD, 10 threads at random */
            pthread_mutex_lock(&(pool->lock));
            pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* The number of threads to destroy is set to 10 */
            pthread_mutex_unlock(&(pool->lock));

            for (i = 0; i < DEFAULT_THREAD_VARY; i++) 
			{
                /* Notify idle threads that they will terminate themselves*/
                pthread_cond_signal(&(pool->queue_not_empty));
            }
        }
    }

    return NULL;
}

int threadpool_destroy(threadpool_t *pool)
{
    int i;
    if (pool == NULL) 
	{
        return -1;
    }
    pool->shutdown = true;

    /*Destroy the management thread first*/
    //pthread_join(pool->adjust_tid, NULL);

    for (i = 0; i < pool->live_thr_num; i++) 
	{
        /*Notify all idle threads*/
        pthread_cond_broadcast(&(pool->queue_not_empty));
    }

    /*for (i = 0; i < pool->live_thr_num; i++) 
	{
        pthread_join(pool->threads[i], NULL);
    }*/

    threadpool_free(pool);

    return 0;
}

int threadpool_free(threadpool_t *pool)
{
    if (pool == NULL) 
	{
        return -1;
    }

    if (pool->task_queue) 
	{
        free(pool->task_queue);
    }

    if (pool->threads) 
	{
        free(pool->threads);
        pthread_mutex_lock(&(pool->lock));
        pthread_mutex_destroy(&(pool->lock));
        pthread_mutex_lock(&(pool->thread_counter));
        pthread_mutex_destroy(&(pool->thread_counter));
        pthread_cond_destroy(&(pool->queue_not_empty));
        pthread_cond_destroy(&(pool->queue_not_full));
    }

    free(pool);
    pool = NULL;

    return 0;
}

int threadpool_all_threadnum(threadpool_t *pool)
{
    int all_threadnum = -1;

    pthread_mutex_lock(&(pool->lock));
    all_threadnum = pool->live_thr_num;
    pthread_mutex_unlock(&(pool->lock));

    return all_threadnum;
}

int threadpool_busy_threadnum(threadpool_t *pool)
{
    int busy_threadnum = -1;

    pthread_mutex_lock(&(pool->thread_counter));
    busy_threadnum = pool->busy_thr_num;
    pthread_mutex_unlock(&(pool->thread_counter));

    return busy_threadnum;
}

int is_thread_alive(pthread_t tid)
{
    int kill_rc = pthread_kill(tid, 0);     //Send signal 0 to test whether the thread is alive
    if (kill_rc == ESRCH) 
	{
        return false;
    }

    return true;
}

Keywords: C++

Added by cl77 on Sun, 23 Jan 2022 03:29:00 +0200