November 12, Concurrency 7: Thread pool (structure + template)

1. Thread pool
    As we all know, a thread is an execution branch of a process.
    The operating system schedules work on a per-thread basis.
    => In theory, the more threads a process has, the more work it can get done.
    In practice, however, this is limited by the operating system and the hardware:
    a process's processing efficiency does not grow in proportion to (linearly with) the number of threads it creates.

    Therefore, threads cannot be created without restraint.
    If you create thousands of threads at once to run the copy code from the previous section,
    the system will grind to a halt.

    To solve this problem, we borrow the thread pool concept familiar from C++.
        => C has no built-in thread pool, so we build one ourselves on top of POSIX threads.

2. Principle of the thread pool: the producer-consumer model
    Producer
        Responsible for generating the data to be processed
    Consumer
        Responsible for consuming (processing) that data

    There is a task queue:
    the producer adds tasks to it,
    and the consumers take tasks out of it and process them.

    When there are too many tasks in the task queue,
        threads need to be added to handle them.
    When there are too few tasks in the task queue,
        threads should be removed to reduce resource consumption.
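The text says to add threads when the queue grows and remove them when it shrinks. The template leaves that decision to the caller, who calls add_thread()/remove_thread() directly. Below is a minimal sketch of one possible policy. The name resize_delta and its thresholds are hypothetical choices and are not part of the template:

- grow when more than two tasks are queued per thread;
- shrink by half when the queue is empty.

```c
#include <assert.h>

/*
 * Hypothetical resize policy (not part of the template; thresholds are assumptions).
 * Returns > 0: number of threads to add, < 0: number to remove, 0: leave as is.
 */
int resize_delta(unsigned waiting, unsigned active, unsigned max_threads)
{
    assert(active > 0 && active <= max_threads);

    /* Grow when more than two tasks are queued per thread. */
    if (waiting > 2u * active && active < max_threads) {
        unsigned want = (waiting + 1u) / 2u;   /* aim for about two tasks per thread */
        if (want > max_threads)
            want = max_threads;
        return (int)(want - active);
    }

    /* Shrink by half when the queue is empty, but keep at least one thread. */
    if (waiting == 0 && active > 1)
        return -(int)(active / 2u);

    return 0;
}
```

The result would map onto the template's API as follows: a positive value n would go to add_thread(pool, n), and a negative value n to remove_thread(pool, -n).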
    


    Producer
        Responsible for adding tasks.
        For example, in the previous program that copied files, whenever a
            regular file was found, the producer put it into the queue.

    Thread pool
        if (queue is not empty)
            {
                wake up a consumer to execute the task
            }
    Consumer
        Waits to be woken up.
            Once woken, it takes a node out of the queue and processes the task,
                i.e. copies the regular file stored in that node.
        
        Normally consumers do not exit; they are only cancelled when
        there are too few tasks in the task queue.

        void *thread_fun(void *arg)
        {
            while(1)
            {
                // wait to be woken up when a task is available

                // process the task
            }
        }
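The loop above leaves "wait" and "process" abstract. Here is a self-contained sketch of the same producer/consumer loop, built on one mutex and one condition variable.

The following parts are illustrative assumptions and not the template's code:

- the queue of plain int tasks;
- the names run_demo and QCAP;
- the "processing" step, which just adds each task to a sum.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define QCAP 64                      /* queue capacity (illustrative assumption) */

static int queue[QCAP];              /* circular buffer of "tasks" (plain ints) */
static int head, tail, count;
static bool done;                    /* set when the producer has no more tasks */
static long sum;                     /* result of "processing" the tasks */

static pthread_mutex_t lock     = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t sum_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond     = PTHREAD_COND_INITIALIZER;

/* Consumer: the thread_fun loop above, made concrete */
static void *thread_fun(void *arg)
{
    (void)arg;
    while (1) {
        pthread_mutex_lock(&lock);
        while (count == 0 && !done)          /* wait to be woken up */
            pthread_cond_wait(&cond, &lock);
        if (count == 0 && done) {            /* nothing left and nothing more coming */
            pthread_mutex_unlock(&lock);
            return NULL;
        }
        int task = queue[head];              /* take one task out of the queue */
        head = (head + 1) % QCAP;
        count--;
        pthread_mutex_unlock(&lock);

        pthread_mutex_lock(&sum_lock);       /* process it outside the queue lock */
        sum += task;
        pthread_mutex_unlock(&sum_lock);
    }
}

/* Producer: starts up to nthreads consumers, queues tasks 1..ntasks, returns their sum */
long run_demo(int nthreads, int ntasks)
{
    pthread_t tids[8];
    assert(nthreads > 0 && nthreads <= 8 && ntasks >= 0 && ntasks <= QCAP);

    head = tail = count = 0;
    done = false;
    sum = 0;

    int started = 0;                         /* stop at the first pthread_create failure */
    while (started < nthreads &&
           pthread_create(&tids[started], NULL, thread_fun, NULL) == 0)
        started++;

    for (int i = 1; i <= ntasks; i++) {      /* ntasks <= QCAP, so the buffer never overflows */
        pthread_mutex_lock(&lock);
        queue[tail] = i;
        tail = (tail + 1) % QCAP;
        count++;
        pthread_mutex_unlock(&lock);
        pthread_cond_signal(&cond);          /* wake one sleeping consumer */
    }

    pthread_mutex_lock(&lock);               /* tell consumers to exit once the queue is empty */
    done = true;
    pthread_mutex_unlock(&lock);
    pthread_cond_broadcast(&cond);           /* wake every consumer */

    for (int i = 0; i < started; i++)
        pthread_join(tids[i], NULL);
    return sum;
}
```

For example, run_demo(4, 10) should return 55 (1 + 2 + ... + 10), no matter which thread ends up handling which task.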

Template code: cp.c, thread_pool.c, thread_pool.h

        thread_pool.h

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS	1000
#define MAX_ACTIVE_THREADS	20

#define BUFSIZE			100
#define PATHSIZE		100

// A "task" is a sequence of instructions,
//		and those instructions are wrapped inside a function,
// so a "task" is represented by a function.
// A task is not executed at the moment it is submitted,
// so the function must be saved for a worker thread to run later.
//	"Saving a function" means storing a function pointer.

// Why function pointers? A function pointer stores the address of a function,
// because we do not call the function now, but later:
//   "call me back", hence the name "callback".



struct task // task node
{
	void *(*do_task)(void *arg); // pointer to the function this task runs
	void *arg;  // argument passed to do_task when the task runs

	struct task *next;
};

typedef struct thread_pool // thread pool head node
{
	pthread_mutex_t lock; // mutex protecting the thread pool, in particular the task list
	pthread_cond_t  cond; // condition variable signalled when a task is available

	bool shutdown;  // whether the pool is shutting down

	struct task *task_list; // task list; points to the first (dummy head) task node
	

	pthread_t *tids; // array of thread IDs, because the pool may create several threads
	// size = n * sizeof(pthread_t)

	unsigned int max_waiting_tasks; // maximum number of tasks allowed to wait on the list
	unsigned int waiting_tasks; // number of tasks currently on the list, i.e. waiting to run
	unsigned int active_threads; // number of worker threads in service
}thread_pool;

/*
	func: initialize a thread pool
	param: pool: pointer to an already allocated thread_pool
		threads_number: number of active threads to start with
	return: true on success, false on failure
*/
bool init_pool(thread_pool *pool, unsigned int threads_number);
/*
	func: add a task
	param: pool: pointer to an initialized thread_pool
		do_task: the function to execute
		task: the argument passed to that function
	return: true on success, false on failure
*/
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);
/*
	func: add worker (consumer) threads
	param: pool: pointer to an initialized thread_pool
		additional_threads_number: how many threads to add
	return: the number of threads actually added on success, -1 on failure
*/
int  add_thread(thread_pool *pool, unsigned int additional_threads_number);
/*
	func: remove worker threads
	param: pool: pointer to an initialized thread_pool
		removing_threads_number: how many threads to remove
	return: the number of threads remaining on success, -1 on failure
*/
int  remove_thread(thread_pool *pool, unsigned int removing_threads_number);
/*
	func: destroy the thread pool
	param: pool: pointer to an initialized thread_pool
*/
bool destroy_pool(thread_pool *pool);

void *routine(void *arg); // worker thread function that executes tasks


#endif
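The header comments describe a task as a saved function pointer plus an argument, which is called back later. The minimal sketch below shows only that mechanism, using the same struct task layout. make_task, run_and_free and double_it are hypothetical helpers for illustration. run_and_free() mirrors the (p->do_task)(p->arg) call in routine().

```c
#include <assert.h>
#include <stdlib.h>

/* Same layout as struct task in thread_pool.h */
struct task {
    void *(*do_task)(void *arg);   /* the saved function ("what to do") */
    void *arg;                     /* its saved argument ("with what") */
    struct task *next;
};

/* Hypothetical example task: doubles the int that arg points to */
static void *double_it(void *arg)
{
    int *n = arg;
    *n *= 2;
    return NULL;
}

/* Hypothetical helper: save the function and its argument now ... */
struct task *make_task(void *(*do_task)(void *), void *arg)
{
    struct task *t = malloc(sizeof *t);
    if (t == NULL)
        return NULL;
    t->do_task = do_task;
    t->arg = arg;
    t->next = NULL;
    return t;
}

/* ... and call the function back later, as routine() does with (p->do_task)(p->arg) */
void run_and_free(struct task *t)
{
    assert(t != NULL && t->do_task != NULL);
    (t->do_task)(t->arg);
    free(t);
}
```

For example, with int x = 21, creating a task for double_it with &x and then running it leaves x equal to 42.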

        thread_pool.c

#include "thread_pool.h"

void handler(void *arg)
{
	pthread_mutex_unlock((pthread_mutex_t *)arg);//Unlock
}

void *routine(void *arg)//Task execution function
{
	#ifdef DEBUG
	printf("[%u] is started.\n",
		(unsigned)pthread_self());
	#endif

	thread_pool *pool = (thread_pool *)arg;//Parameter is actually a thread pool header node
	struct task *p;//Point to the task I want to perform

	while(1)
	{
		/*
		** push a cleanup function handler(), make sure that
		** the calling thread will release the mutex properly
		** even if it is cancelled while holding the mutex.
		**
		** NOTE:
		** pthread_cleanup_push() is a macro which includes a
		** loop in it, so a 'break' used in the code between
		** pthread_cleanup_push() and pthread_cleanup_pop() may
		** NOT break out of the real loop, only out of these two
		** macros. See the pthread_exit() call in step 2 below.
		*/
		//================================================//
		// pthread_cleanup_push and pthread_cleanup_pop are used in pairs.
		// If the thread is cancelled or exits anywhere between push and pop,
		// it executes the function registered by push.
		// This prevents a thread that ends unexpectedly while holding the lock from causing a deadlock.
		pthread_cleanup_push(handler, (void *)&pool->lock); // register a cleanup function that unlocks the mutex

		// Lock
		pthread_mutex_lock(&pool->lock);
		//================================================//

		// 1, no task, and is NOT shutting down, then wait
		while(pool->waiting_tasks == 0 && !pool->shutdown)
		{
			// Wait on the condition variable: with no task and the pool not shut down,
			// the thread blocks here. It is woken up in two places:
			// in add_task() when a task is added, and in destroy_pool() when the pool is destroyed
			pthread_cond_wait(&pool->cond, &pool->lock);
		}

		// 2, no task, and is shutting down, then exit
		if(pool->waiting_tasks == 0 && pool->shutdown == true)
		{
			// pthread_exit() runs the cleanup handler pushed above, which unlocks
			// the mutex, so it must not also be unlocked here (that would be a double unlock)
			pthread_exit(NULL); // CANNOT use 'break';
		}

		// 3, have some task, then consume it
		// Take task p off the list
		p = pool->task_list->next; // the first node is a dummy head without a task, so skip it
		pool->task_list->next = p->next; // update pool->task_list->next
		pool->waiting_tasks--;
		// The task node p is now removed from the list

		//================================================//
		pthread_mutex_unlock(&pool->lock);
		pthread_cleanup_pop(0); // end of the lock-protected region; 0 = do not run the handler
		//================================================//

		// Why change the cancel state? This changes the state of the current
		// thread, i.e. the pool's worker thread that is about to run the task,
		// so that the worker cannot be cancelled while it is executing a task.
		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
		(p->do_task)(p->arg); // execute the task function
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

		p->next = NULL;
		free(p); // free the node of the task that has just been executed
	}

	pthread_exit(NULL);
}

/*
	init_pool: initialize a thread pool
	@pool: pointer to the thread pool to initialize
	@threads_number: number of resident threads in the pool
*/
bool init_pool(thread_pool *pool, unsigned int threads_number) // initialize the task list and create the worker threads that run routine()
{
	// tids has room for only MAX_ACTIVE_THREADS IDs, so reject larger requests
	if(threads_number > MAX_ACTIVE_THREADS)
	{
		fprintf(stderr, "too many threads.\n");
		return false;
	}

	// Initialize the mutex and the condition variable
	pthread_mutex_init(&pool->lock, NULL);
	pthread_cond_init(&pool->cond, NULL);

	pool->shutdown = false; // the pool is running
	// Create the first task node and point task_list at it. It never carries a task:
	// this dummy head means adding and removing tasks never special-cases an empty list.
	pool->task_list = malloc(sizeof(struct task));
	// MAX_ACTIVE_THREADS (20) is the array length: tids can store 20 thread IDs,
	// i.e. this pool can hold at most 20 threads
	pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);

	if(pool->task_list == NULL || pool->tids == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	pool->task_list->next = NULL;

	pool->max_waiting_tasks = MAX_WAITING_TASKS;
	pool->waiting_tasks = 0;
	
	// threads_number: how many active threads the pool starts with
	pool->active_threads = threads_number;

	int i;
	// Create active_threads threads, each running the task execution function routine()
	for(i=0; i < pool->active_threads; i++)   
	{
		// the new thread's ID is stored in tids[i]
		if(pthread_create(&((pool->tids)[i]), NULL,
					routine, (void *)pool) != 0)
		{
			perror("create threads error");
			return false;
		}
		//Conditional compilation test
		#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	return true;
}

bool add_task(thread_pool *pool,void *(*do_task)(void *arg), void *arg)//Add task
{
	struct task *new_task = malloc(sizeof(struct task));//Create a new task node
	if(new_task == NULL)
	{
		perror("allocate memory error");
		return false;
	}
	new_task->do_task = do_task;  //Assign a value to the new task node
	new_task->arg = arg;
	new_task->next = NULL;

	//============ LOCK =============//
	pthread_mutex_lock(&pool->lock);
	//===============================//

	if(pool->waiting_tasks >= pool->max_waiting_tasks) // the number of waiting tasks has reached the maximum
	{
		pthread_mutex_unlock(&pool->lock);

		fprintf(stderr, "too many tasks.\n");
		free(new_task);

		return false;//Discard the addition and end the function
	}
	
	struct task *tmp = pool->task_list;
	while(tmp->next != NULL)
	{
		tmp = tmp->next;//Find the last node
	}

	tmp->next = new_task;
	pool->waiting_tasks++;

	//=========== UNLOCK ============//
	pthread_mutex_unlock(&pool->lock);
	//===============================//

	#ifdef DEBUG
	printf("[%u][%s] ==> a new task has been added.\n",
		(unsigned)pthread_self(), __FUNCTION__);
	#endif

	// What does signalling the condition variable do? It wakes up one worker
	// blocked in pthread_cond_wait(). We do not choose which thread: the system
	// picks one of the waiting threads to run the new task.
	pthread_cond_signal(&pool->cond); // wake up one waiting worker
	return true;
}

int add_thread(thread_pool *pool, unsigned additional_threads)//Create active thread
{
	if(additional_threads == 0)
	{
		return 0;
	}
	// number of active threads we want to end up with
	unsigned total_threads = pool->active_threads + additional_threads;

	int i, actual_increment = 0;
	for(i = pool->active_threads;
	    i < total_threads && i < MAX_ACTIVE_THREADS;
	    i++)	
	{
		// the new thread's ID is stored in tids[i]
		if(pthread_create(&((pool->tids)[i]),
					NULL, routine, (void *)pool) != 0)//Create thread
		{
			perror("add threads error");

			// no threads has been created, return fail
			if(actual_increment == 0)
				return -1;

			break;
		}
		actual_increment++; 

		#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	pool->active_threads += actual_increment;
	return actual_increment;
}

int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
	if(removing_threads == 0)
		return pool->active_threads;

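	// Threads remaining after removal; signed arithmetic so that removing more
	// threads than exist gives a negative number instead of wrapping around
	int remaining_threads = (int)pool->active_threads - (int)removing_threads;
	
	// Always keep at least one worker so that queued tasks can still be processed
	remaining_threads = remaining_threads > 0 ? remaining_threads : 1;

	int i;
	// Cancel from the last thread backwards. Cancellation is deferred, and a worker
	// running a task has disabled it, so the task finishes first: the goal is to
	// relieve the pressure on hardware resources, not to abort tasks.
	for(i = pool->active_threads - 1; i > remaining_threads - 1; i--)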
	{
		//Cancel thread
		errno = pthread_cancel(pool->tids[i]);

		if(errno != 0)
			break;

		#ifdef DEBUG
		printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	if(i == pool->active_threads-1)
	{	
		// failed to cancel even the first thread
		return -1;
	}
	else
	{
		// record and return the number of threads left
		pool->active_threads = i+1;
		return i+1;
	}
}

bool destroy_pool(thread_pool *pool)
{
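	// 1, activate all threads
	// Set the flag under the lock so that no worker can check it and then
	// go to sleep after the broadcast below (a lost wake-up)
	pthread_mutex_lock(&pool->lock);
	pool->shutdown = true;
	pthread_mutex_unlock(&pool->lock);
	// pthread_cond_broadcast() wakes every thread blocked on pool->cond
	// (pthread_cond_signal() wakes only one). Each woken thread re-acquires the
	// mutex inside pthread_cond_wait() before returning, so the caller does not
	// have to hold the mutex while broadcasting.
	pthread_cond_broadcast(&pool->cond);

	// 2, wait for their exiting
	int i;
	for(i=0; i < pool->active_threads; i++)
	{
		// The thread IDs were stored in tids by init_pool() and add_thread()
		errno = pthread_join(pool->tids[i], NULL);
		if(errno != 0)
		{
			printf("join tids[%d] error: %s\n",
					i, strerror(errno));
		}
	}

	// 3, free memories (the workers drained every task, so only the dummy head is left)
	free(pool->task_list);
	free(pool->tids);
	pthread_mutex_destroy(&pool->lock);
	pthread_cond_destroy(&pool->cond);
	free(pool);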

	return true;
}
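This explains why routine() pushes handler() before locking. Suppose remove_thread() cancels a worker while it is blocked in pthread_cond_wait(). The worker re-acquires the mutex before its cleanup handlers run, so without a handler that mutex would stay locked forever.

The standalone sketch below demonstrates this. It cancels a waiting thread, then uses pthread_mutex_trylock() to check that the lock was released. The names worker, unlock_handler and cancel_releases_lock are illustrative and are not from the template.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdbool.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
static bool ready   = false;   /* never set: the worker waits until it is cancelled */
static bool waiting = false;   /* set by the worker just before it waits */

static void unlock_handler(void *arg)           /* same job as handler() in thread_pool.c */
{
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

static void *worker(void *arg)
{
    (void)arg;
    pthread_cleanup_push(unlock_handler, &lock);
    pthread_mutex_lock(&lock);
    waiting = true;
    while (!ready)
        pthread_cond_wait(&cond, &lock);        /* cancellation point */
    pthread_mutex_unlock(&lock);
    pthread_cleanup_pop(0);
    return NULL;
}

/* Returns true if the mutex is free again after cancelling a waiting worker. */
bool cancel_releases_lock(void)
{
    pthread_t tid;
    void *res = NULL;

    waiting = false;
    int rc = pthread_create(&tid, NULL, worker, NULL);
    assert(rc == 0);

    for (;;) {          /* waiting == true under the lock means the worker is inside cond_wait */
        pthread_mutex_lock(&lock);
        bool w = waiting;
        pthread_mutex_unlock(&lock);
        if (w)
            break;
        sched_yield();
    }

    pthread_cancel(tid);
    pthread_join(tid, &res);
    if (res != PTHREAD_CANCELED)
        return false;

    if (pthread_mutex_trylock(&lock) != 0)      /* EBUSY would mean the lock leaked */
        return false;
    pthread_mutex_unlock(&lock);
    return true;
}
```

If the pthread_cleanup_push()/pthread_cleanup_pop() pair is removed from worker(), the trylock fails with EBUSY. That stuck mutex is exactly the deadlock the template guards against.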

The thread pool can grow to at most 20 threads (MAX_ACTIVE_THREADS). cp.c below initializes it with 10 threads, and at most 1000 tasks (MAX_WAITING_TASKS) can wait in the queue at once.
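add_task() rejects new tasks once MAX_WAITING_TASKS are waiting, and task_list starts with a dummy head node. This single-threaded sketch isolates that list logic so the FIFO order and the capacity check can be tested on their own. It does no locking. The struct queue type and the queue_* functions are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

struct node {
    int value;
    struct node *next;
};

/* Hypothetical bounded FIFO with a dummy head, modelled on thread_pool's task_list */
struct queue {
    struct node head;     /* dummy node: never holds a value */
    unsigned count;       /* like waiting_tasks */
    unsigned max;         /* like max_waiting_tasks */
};

void queue_init(struct queue *q, unsigned max)
{
    q->head.next = NULL;
    q->count = 0;
    q->max = max;
}

/* Append at the tail; refuse when full, like add_task() */
bool queue_push(struct queue *q, int value)
{
    if (q->count >= q->max)
        return false;
    struct node *n = malloc(sizeof *n);
    if (n == NULL)
        return false;
    n->value = value;
    n->next = NULL;

    struct node *tail = &q->head;     /* start at the dummy head ... */
    while (tail->next != NULL)        /* ... and walk to the last node */
        tail = tail->next;
    tail->next = n;                   /* no special case for an empty list */
    q->count++;
    return true;
}

/* Remove the first real node, like routine() does with task_list->next */
bool queue_pop(struct queue *q, int *value)
{
    struct node *p = q->head.next;
    if (p == NULL)
        return false;
    q->head.next = p->next;
    q->count--;
    *value = p->value;
    free(p);
    return true;
}
```

Walking to the tail on every push is O(n), just as in add_task(). Keeping a tail pointer in the pool would make appends O(1), at the cost of one more field to maintain under the lock.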

        cp.c     (a simple program that uses this thread pool)

#include <sys/types.h>
#include <dirent.h>

#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <errno.h>
#include <libgen.h>
#include <pthread.h>


#include "thread_pool.h"

	// char *dirname(char *path);

	// char *basename(char *path);



//Point to a "thread pool"
thread_pool *pool = NULL;



// Source and destination paths of one regular file to copy
struct cp_files
{	
	char file1[512];
	char file2[512];
	
};


// Task function executed by a pool thread; its argument is a struct cp_files
void* cp_files_routine(void *data)
//void cp_file(char *file1, char *file2)
{
	char *file1, *file2;

	int fd1 = -1, fd2 = -1; // -1 means "not opened"; checked before close() below
	int ret;
	char buf[1024];

	// Take out the two file names stored in the structure
	struct cp_files *p = (struct cp_files *)data;
	file1 = p->file1;
	file2 = p->file2;


	//pthread_detach(pthread_self());

	
	fd1 = open(file1, O_RDONLY);
	if (fd1 == -1)
	{
		printf("%s \n", file1);
		perror("open error");
		//return NULL;
		goto cp_return;
	}

	fd2 = open(file2, O_WRONLY | O_CREAT | O_TRUNC, 0777);
	if (fd2 == -1)
	{
		printf("%s \n", file2);
		perror("open error");

		//close(fd1);
		//return NULL;
		goto cp_return;
	}


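	while (1)
	{
		// ret is the number of bytes actually read
		ret = read(fd1, buf, 1024);
		if (ret == 0)
		{
			break; // end of file
		}
		else if (ret > 0)
		{
			// write() may write fewer bytes than requested, so loop until all are written
			int done = 0;
			while (done < ret)
			{
				int w = write(fd2, buf + done, ret - done);
				if (w < 0)
				{
					perror("write error");
					goto cp_return;
				}
				done += w;
			}
		}
		else
		{
			perror("read error");
			break;
		}
	}

cp_return:
	free(data);
	if (fd1 != -1)
		close(fd1);
	if (fd2 != -1)
		close(fd2);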

	return NULL;
}


// Copy a directory (recursively)

void cp_dir(char *path1, char *path2)
{
	struct dirent * dirp = NULL;
	DIR *dir = opendir(path1);
	if (dir == NULL)
	{
		perror("opendir error");
		return ;
	}

	char path2_name[512];
	char *p_name = basename(path1);

	sprintf(path2_name, "%s/%s", path2, p_name);
	
	// Create the destination directory path2/<name of path1> with full permissions
	mkdir(path2_name, 0777);
	path2 = path2_name;

	while ((dirp = readdir(dir)) != NULL)
	{
		// strcmp() compares the entry name with "." and "..": these refer to the
		// current and parent directories and must be skipped, or the recursion
		// below would never end
		if (strcmp(dirp->d_name, ".") == 0 || 
				strcmp(dirp->d_name, "..") == 0)
		{
			continue;
		}
		
		char filename[512];

		sprintf(filename, "%s/%s", path1, dirp->d_name);
	

		struct stat st;
		if (lstat(filename, &st) == -1)
		{
			perror("lstat error");
			continue;
		}

		// Regular file || symbolic link (shortcut)
		if (S_ISREG(st.st_mode) || S_ISLNK(st.st_mode))
		{

			char file[512];
			sprintf(file, "%s/%s", path2, dirp->d_name);
			struct cp_files*  cp = malloc(sizeof(*cp));
			if (cp == NULL)
			{
				perror("malloc error");
				continue;
			}

			// Copy filename into cp->file1, i.e. store both paths in the structure cp
			strcpy(cp->file1, filename);
			strcpy(cp->file2, file);
			
			//cp_file( filename, file);

			// Add the copy as a task node on the pool's task list;
			// cp_files_routine() frees cp once the copy is done
			if (!add_task(pool, cp_files_routine, cp))
				free(cp);
		
		}

		// A directory entry
		else if (S_ISDIR(st.st_mode))
		{
			// Recurse: cp_dir() itself creates path2/<d_name>, so pass the current
			// destination directory instead of creating (and nesting) it here
			cp_dir(filename, path2);
			
		}
		
		
	}


	closedir(dir);

	return ;

}

// cp f/d1  f/d2
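int main(int argc, char *argv[])
{
	struct stat st1, st2;

	if (argc != 3)
	{
		fprintf(stderr, "usage: %s <src> <dst>\n", argv[0]);
		return 1;
	}

	if(	stat(argv[1], &st1) || stat(argv[2], &st2) )
	{
		printf("-----\n");
		perror("stat error");
		return 1;
	}

	if (S_ISREG(st1.st_mode) && S_ISREG(st2.st_mode))
	{
		//cp_file(argv[1], argv[2]);
	}

	// Allocate a "thread pool"
	pool = malloc(sizeof(*pool));
	if (pool == NULL)
	{
		perror("malloc error");
		return 1;
	}

	// Initialize the thread pool with 10 worker threads
	if (!init_pool(pool, 10))
		return 1;
	
	if (S_ISDIR(st1.st_mode) && S_ISDIR(st2.st_mode))
	{
		// Copy the directory tree; each regular file becomes one pool task
		cp_dir(argv[1], argv[2]);
	}

	// Destroy the thread pool: the workers finish all queued tasks before exiting
	destroy_pool( pool);
	
	return 0;
}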
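cp_files_routine() copies data with a read()/write() loop. Factored out into its own function, the loop could look like the minimal sketch below. copy_fd and write_all are hypothetical helper names, and retrying on EINTR is an added assumption that the template does not make.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper: write all len bytes, retrying on short writes and EINTR. */
static int write_all(int fd, const char *buf, size_t len)
{
    while (len > 0) {
        ssize_t w = write(fd, buf, len);
        if (w < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        buf += w;
        len -= (size_t)w;
    }
    return 0;
}

/* Hypothetical helper: copy everything from in to out; returns bytes copied or -1. */
long copy_fd(int in, int out)
{
    char buf[1024];
    long total = 0;

    assert(in >= 0 && out >= 0);
    for (;;) {
        ssize_t r = read(in, buf, sizeof buf);
        if (r == 0)
            return total;                 /* end of file */
        if (r < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        if (write_all(out, buf, (size_t)r) < 0)
            return -1;
        total += r;
    }
}
```

In cp_files_routine(), the body between the two open() calls and the cleanup would then reduce to a single copy_fd(fd1, fd2) call.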

                                                                                                                                                           

Structure of the thread pool code:

Overall:

        Two structures, six functions

Structure 1: task node (struct task)

                        Function pointer to the function the task runs     do_task

                        Pointer passed in as that function's argument     arg  

                        Pointer to the next task node     next

Structure 2: thread pool head node (thread_pool)

                        Mutex protecting the thread pool     lock

                        Condition variable used to wake up threads     cond

                        Flag telling the threads the pool is shutting down     shutdown

                        Linked list of task nodes     task_list

                        Array of thread IDs     tids

        Thread pool capacity:

                        Maximum number of waiting tasks     max_waiting_tasks

                        Number of tasks on the linked list     waiting_tasks

                        Number of running threads     active_threads     (number of IDs in use in tids)

Functions: (the arguments below are taken from the template above)

        Initialize the thread pool     init_pool(pool, 10);

        Add a task     add_task(pool, cp_files_routine, cp);     [1. thread pool 2. task function 3. its argument]

        Add threads     add_thread(pool, 10);

        Remove threads     remove_thread(pool, 2);

        Destroy the thread pool     destroy_pool(pool);

        Task execution function (worker thread)     routine();

Specific structure of each function:

init_pool(pool, 10);

        Initialize the thread pool structure:

                mutex, condition variable, task list head, tids array

        Create the threads that run the task execution function:

        .......

add_task(pool, cp_files_routine, cp);

        Create and fill in new_task

        Lock the thread pool

        Check the number of waiting tasks --> walk tmp to the tail node of task_list --> append new_task

        Unlock the thread pool

        Signal the condition variable (the system picks a waiting thread to run the task)

add_thread(pool, 10);

        Safety check (nothing to add: return 0)

        Compute the final number of active threads

        Create the new threads, stopping at MAX_ACTIVE_THREADS

        Update the number of active threads

remove_thread(pool, 2);

        Safety check (nothing to remove: return the current count)

        Compute the number of threads that will remain

        Keep at least one active thread

        Cancel the corresponding threads, starting from the last one
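The thread-count arithmetic of add_thread() and remove_thread() can be checked in isolation. This sketch uses the hypothetical helper names threads_after_add and threads_after_remove. It assumes every pthread_create()/pthread_cancel() call succeeds, which the real functions do not guarantee.

```c
#include <assert.h>

/* Hypothetical: workers after add_thread(pool, additional), capped at max_threads */
unsigned threads_after_add(unsigned active, unsigned additional, unsigned max_threads)
{
    assert(active <= max_threads);
    unsigned total = active + additional;
    return total < max_threads ? total : max_threads;
}

/* Hypothetical: workers after remove_thread(pool, removing); never fewer than one */
unsigned threads_after_remove(unsigned active, unsigned removing)
{
    if (removing == 0)
        return active;                    /* the "safety check" branch */
    return removing < active ? active - removing : 1;
}
```

remove_thread() cancels the threads at indices active - 1 down to the returned count, so the lowest-numbered tids entries are the ones that survive.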

destroy_pool(pool);

        Set the shutdown flag

        Broadcast to wake up all threads

        Join each thread by its ID in a for loop

        .....

routine();

        Lock, then check:

                1. No task and not shutting down: wait on the condition variable

                2. No task and shutting down: exit the thread

                3. A task is available: remove the task node (waiting_tasks--)   ----> unlock

                                                  Execute the task function

                .....


Added by r_honey on Wed, 17 Nov 2021 03:36:16 +0200