1. Thread pool
As we all know, a thread is an execution branch of a process.
The operating system schedules execution in units of threads.
=> In theory, the more threads a process has, the higher its processing efficiency.
In practice, however, efficiency is limited by the operating system and the hardware: every thread needs its own stack, and once there are more runnable threads than CPU cores, the extra threads only add context-switching overhead.
So the number of threads a process creates is not directly (linearly) proportional to its processing efficiency.
Therefore, threads should not be created without limit.
If the earlier file-copy program created thousands of threads at the same time,
the system would grind to a halt.
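One common way to respect that limit is to size the pool from the number of online CPUs rather than from the number of jobs. The sketch below is only an illustration: it assumes a Linux/glibc-style system where `sysconf(_SC_NPROCESSORS_ONLN)` is available (a widespread extension, not required by POSIX), and the helper name `suggested_pool_size` is hypothetical.

```c
#include <unistd.h>

/* Hypothetical helper: suggest a worker count equal to the number of
 * online CPUs, clamped to the range [1, max_threads]. */
unsigned int suggested_pool_size(unsigned int max_threads)
{
    long cpus = sysconf(_SC_NPROCESSORS_ONLN); /* -1 if not supported */
    if (cpus < 1)
        cpus = 1;
    if (max_threads < 1)
        max_threads = 1;
    if ((unsigned long)cpus > max_threads)
        cpus = (long)max_threads;
    return (unsigned int)cpus;
}
```

For I/O-bound work such as copying files, a few more threads than cores can still help, so treat the result as a starting point, not a rule.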
To solve this problem, the thread-pool concept is borrowed from C++.
=> C has no built-in thread pool, so we build one ourselves on top of POSIX threads (pthreads).
2. Principle of the thread pool: the producer-consumer model
Producer
Responsible for generating the data (tasks) to be processed
Consumer
Responsible for consuming (processing) that data
Between them there is a task queue:
The producer is responsible for adding tasks
The consumers are responsible for processing tasks
When there are too many tasks in the task queue,
more threads need to be added to handle them.
When there are too few tasks in the task queue,
threads should be removed to reduce resource consumption.
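A minimal sketch of the producer-consumer model with one mutex and two condition variables follows. Unlike the template below, it assumes a fixed-size ring buffer of ints instead of a linked list of tasks and a fixed number of consumers (adding and removing threads is left out); all names are hypothetical.

```c
#include <pthread.h>

#define QUEUE_CAP   8   /* ring buffer capacity (arbitrary) */
#define N_CONSUMERS 3   /* number of consumer threads (arbitrary) */

static int queue[QUEUE_CAP];
static int head, tail, count;   /* ring buffer state */
static int done;                /* set when the producer has finished */
static long total;              /* sum of all consumed items */
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
static pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;

static void *consumer(void *arg)
{
    (void)arg;
    for (;;)
    {
        pthread_mutex_lock(&mtx);
        while (count == 0 && !done)          /* nothing to do: wait */
            pthread_cond_wait(&not_empty, &mtx);
        if (count == 0 && done)              /* no data and none coming */
        {
            pthread_mutex_unlock(&mtx);
            return NULL;
        }
        int item = queue[head];              /* take one item */
        head = (head + 1) % QUEUE_CAP;
        count--;
        total += item;                       /* "process" it */
        pthread_cond_signal(&not_full);      /* a slot is free again */
        pthread_mutex_unlock(&mtx);
    }
}

/* Producer: push 1..n, then tell the consumers to stop.
 * Returns the sum computed by the consumers. */
long produce_and_consume(int n)
{
    pthread_t tids[N_CONSUMERS];
    head = tail = count = done = 0;
    total = 0;
    for (int i = 0; i < N_CONSUMERS; i++)
        pthread_create(&tids[i], NULL, consumer, NULL);

    for (int v = 1; v <= n; v++)
    {
        pthread_mutex_lock(&mtx);
        while (count == QUEUE_CAP)           /* queue full: wait */
            pthread_cond_wait(&not_full, &mtx);
        queue[tail] = v;
        tail = (tail + 1) % QUEUE_CAP;
        count++;
        pthread_cond_signal(&not_empty);     /* wake one consumer */
        pthread_mutex_unlock(&mtx);
    }

    pthread_mutex_lock(&mtx);
    done = 1;
    pthread_cond_broadcast(&not_empty);      /* wake all consumers to exit */
    pthread_mutex_unlock(&mtx);

    for (int i = 0; i < N_CONSUMERS; i++)
        pthread_join(tids[i], NULL);
    return total;
}
```

For example, `produce_and_consume(100)` has three consumers share the numbers 1 to 100 and returns their total, 5050.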
Producer
Responsible for adding tasks
For example, when the earlier program reads a regular file,
it puts that regular file into the queue.
Thread pool
If the queue is not empty, wake up a thread to execute a task.
Consumer
Waits to be woken up
When woken, takes a node out of the queue and processes the task,
e.g. copies the regular file taken from the queue.
Under normal circumstances consumers do not exit; a consumer is cancelled only when there are too few tasks in the task queue (and all of them exit when the pool is destroyed).
```c
void *thread_fun(void *arg)
{
    while (1)
    {
        // Wait until a task is available (the producer wakes us up)
        // Take the task out of the queue and process it
    }
    return NULL;
}
```
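The skeleton above can be made concrete in a small, self-contained form. This sketch is not the template's code: it assumes the "queue" is just a counter of pending jobs, and `mini_pool`, `worker` and `run_workers` are hypothetical names. Each worker waits while there is no work and the pool is not shutting down, takes a job when one is available, and exits only when the queue is empty and shutdown is set, mirroring the three cases in `routine()` in thread_pool.c.

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical minimal pool state: the "queue" is a count of jobs. */
struct mini_pool
{
    pthread_mutex_t lock;
    pthread_cond_t cond;
    unsigned pending;    /* jobs waiting to be processed */
    unsigned processed;  /* jobs finished so far */
    bool shutdown;
};

static void *worker(void *arg)
{
    struct mini_pool *mp = arg;
    while (1)
    {
        pthread_mutex_lock(&mp->lock);
        /* 1. no job and not shutting down: wait to be woken */
        while (mp->pending == 0 && !mp->shutdown)
            pthread_cond_wait(&mp->cond, &mp->lock);
        /* 2. no job and shutting down: exit */
        if (mp->pending == 0 && mp->shutdown)
        {
            pthread_mutex_unlock(&mp->lock);
            return NULL;
        }
        /* 3. take one job off the "queue" */
        mp->pending--;
        pthread_mutex_unlock(&mp->lock);

        /* process the job outside the queue lock (here: just count it) */
        pthread_mutex_lock(&mp->lock);
        mp->processed++;
        pthread_mutex_unlock(&mp->lock);
    }
}

/* Start n_threads workers (at most 16), submit n_jobs, shut down,
 * and return how many jobs were processed. */
unsigned run_workers(unsigned n_threads, unsigned n_jobs)
{
    struct mini_pool mp;
    pthread_t tids[16];
    pthread_mutex_init(&mp.lock, NULL);
    pthread_cond_init(&mp.cond, NULL);
    mp.pending = mp.processed = 0;
    mp.shutdown = false;
    if (n_threads > 16)
        n_threads = 16;

    for (unsigned i = 0; i < n_threads; i++)
        pthread_create(&tids[i], NULL, worker, &mp);

    for (unsigned j = 0; j < n_jobs; j++)   /* producer: add jobs */
    {
        pthread_mutex_lock(&mp.lock);
        mp.pending++;
        pthread_mutex_unlock(&mp.lock);
        pthread_cond_signal(&mp.cond);      /* wake one worker */
    }

    pthread_mutex_lock(&mp.lock);           /* shut down: set the flag */
    mp.shutdown = true;                     /* under the lock and wake */
    pthread_cond_broadcast(&mp.cond);       /* every worker */
    pthread_mutex_unlock(&mp.lock);

    for (unsigned i = 0; i < n_threads; i++)
        pthread_join(tids[i], NULL);

    unsigned result = mp.processed;
    pthread_mutex_destroy(&mp.lock);
    pthread_cond_destroy(&mp.cond);
    return result;
}
```

Because workers only exit once `pending` reaches zero, every submitted job is processed before `run_workers()` returns.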
Template code: three files, thread_pool.h, thread_pool.c and cp.c
thread_pool.h
```c
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS  1000 // maximum number of tasks that may wait in the queue
#define MAX_ACTIVE_THREADS 20   // length of the tids array: at most 20 worker threads

#define BUFSIZE  100
#define PATHSIZE 100

// A "task" is a sequence of instructions, encapsulated in a function:
// "task" -> a function.
// A task is not executed when it is submitted, so the function must be saved
// until a worker thread can run it later: "save a function" -> function pointer.
// A function pointer stores the address of a function that is not called now
// but later ("call me back"), which is why it is called a callback.
struct task // task node
{
    void *(*do_task)(void *arg); // pointer to the function the task executes
    void *arg;                   // argument passed to do_task when it runs
    struct task *next;           // next task node in the queue
};

typedef struct thread_pool // thread pool head node
{
    pthread_mutex_t lock;      // protects the "thread pool", in particular the task list
    pthread_cond_t cond;       // signalled when a task is added or the pool shuts down
    bool shutdown;             // true once the pool is exiting

    struct task *task_list;    // points to the first (dummy head) node of the task list
    pthread_t *tids;           // array of thread IDs, because one pool runs several threads
                               // size = MAX_ACTIVE_THREADS * sizeof(pthread_t)

    unsigned int max_waiting_tasks; // maximum number of tasks allowed to wait
    unsigned int waiting_tasks;     // number of tasks currently in the list (to be executed)
    unsigned int active_threads;    // number of threads in service
} thread_pool;

/*
 func:   initialize a thread pool
 param:  pool           pointer to an already allocated thread_pool
         threads_number number of active threads to start with
 return: true on success, false on failure
*/
bool init_pool(thread_pool *pool, unsigned int threads_number);

/*
 func:   add a task
 param:  pool    pointer to an initialized thread_pool
         do_task function the task should execute
         task    argument passed to do_task
 return: true on success, false on failure
*/
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);

/*
 func:   add worker (consumer) threads
 param:  pool                      pointer to an initialized thread_pool
         additional_threads_number how many threads to add
 return: number of threads actually added on success, -1 on failure
*/
int add_thread(thread_pool *pool, unsigned int additional_threads_number);

/*
 func:   remove (cancel) worker threads
 param:  pool                    pointer to an initialized thread_pool
         removing_threads_number how many threads to remove
 return: number of threads remaining on success, -1 on failure
*/
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);

/*
 func:   run the remaining tasks, stop all threads and free the pool
 param:  pool pointer to a thread_pool allocated with malloc()
*/
bool destroy_pool(thread_pool *pool);

void *routine(void *arg); // task execution function (worker thread)

#endif
```
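The header comments describe a task as a saved function pointer plus its argument that is "called back" later. The standalone sketch below illustrates only that idea; `demo_task`, `add_one`, `add_ten` and `run_saved_tasks` are hypothetical names, not part of the template. It records two tasks first and invokes them afterwards, the same way `routine()` calls `(p->do_task)(p->arg)`.

```c
#include <stddef.h>

/* Same shape as the template's struct task, renamed to avoid a clash. */
struct demo_task
{
    void *(*do_task)(void *arg);  /* saved function: called back later */
    void *arg;                    /* argument saved with it */
};

/* Two example task functions; each adds a fixed amount to an int. */
static void *add_one(void *arg) { *(int *)arg += 1;  return NULL; }
static void *add_ten(void *arg) { *(int *)arg += 10; return NULL; }

/* Record the tasks first, run them afterwards; returns the final value. */
int run_saved_tasks(void)
{
    int value = 0;
    struct demo_task tasks[] = {
        { add_one, &value },   /* saving a task does not run it... */
        { add_ten, &value },
    };
    /* value is still 0 at this point */
    for (size_t i = 0; i < sizeof(tasks) / sizeof(tasks[0]); i++)
        (tasks[i].do_task)(tasks[i].arg);   /* ...calling it back does */
    return value;
}
```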
thread_pool.c
```c
#include "thread_pool.h"

// Cleanup handler: unlock the mutex passed in arg
void handler(void *arg)
{
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

void *routine(void *arg) // task execution function (worker thread)
{
#ifdef DEBUG
    printf("[%u] is started.\n", (unsigned)pthread_self());
#endif

    thread_pool *pool = (thread_pool *)arg; // the argument is the thread pool head node
    struct task *p;                         // the task this thread will execute

    while (1)
    {
        /*
        ** Push a cleanup function handler() to make sure that the calling
        ** thread releases the mutex properly even if it is cancelled while
        ** holding it (pthread_cond_wait() is a cancellation point).
        **
        ** NOTE: pthread_cleanup_push() and pthread_cleanup_pop() are macros
        ** that open and close a block (on some systems a loop), so they must
        ** be paired in the same scope, and a 'break' between them may leave
        ** only that block instead of the real while loop.
        */
        //================================================//
        // Between push and pop, if the thread is cancelled or calls
        // pthread_exit(), the function registered by push runs, so a thread
        // never terminates while still holding the lock (deadlock).
        pthread_cleanup_push(handler, (void *)&pool->lock);
        pthread_mutex_lock(&pool->lock);
        //================================================//

        // 1. No task and NOT shutting down: wait on the condition variable.
        //    There are two places that wake us up: add_task() and destroy_pool().
        while (pool->waiting_tasks == 0 && !pool->shutdown)
        {
            pthread_cond_wait(&pool->cond, &pool->lock);
        }

        // 2. No task and shutting down: exit.
        //    pthread_exit() runs handler(), which unlocks pool->lock, so the
        //    mutex must NOT also be unlocked here (that would unlock it twice).
        if (pool->waiting_tasks == 0 && pool->shutdown == true)
        {
            pthread_exit(NULL); // CANNOT use 'break' here
        }

        // 3. There is a task: remove the first real node from the queue.
        //    task_list is a dummy head node that never holds a task, so skip it.
        p = pool->task_list->next;
        pool->task_list->next = p->next;
        pool->waiting_tasks--;

        //================================================//
        pthread_mutex_unlock(&pool->lock);
        pthread_cleanup_pop(0); // leave the protected region without running handler
        //================================================//

        // Q: Why change the cancel state, and of which thread?
        // A: pthread_setcancelstate() only affects the calling thread, i.e. this
        //    worker, which is also the thread that runs the task. While the task
        //    runs, remove_thread() cannot cancel the worker halfway through it.
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
        (p->do_task)(p->arg); // execute the task
        free(p);              // free the finished task node
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        pthread_testcancel(); // act on a cancel request that arrived meanwhile
    }

    pthread_exit(NULL);
}

/*
 init_pool: initialize a thread pool
 @pool:           pointer to the thread pool to initialize
 @threads_number: number of resident threads in the pool
*/
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
    // Initialize the mutex and the condition variable
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->cond, NULL);

    pool->shutdown = false; // the pool is running

    // Create the first task node and let task_list point to it. It is a dummy
    // head node that never holds a task, so adding to and removing from the
    // list never needs a special case for an empty queue.
    pool->task_list = malloc(sizeof(struct task));
    // MAX_ACTIVE_THREADS (20) is the length of tids: the pool can hold 20 thread IDs.
    pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);
    if (pool->task_list == NULL || pool->tids == NULL)
    {
        perror("allocate memory error");
        free(pool->task_list);
        free(pool->tids);
        return false;
    }

    pool->task_list->next = NULL;

    pool->max_waiting_tasks = MAX_WAITING_TASKS;
    pool->waiting_tasks = 0;

    // Never start more threads than tids can hold
    if (threads_number > MAX_ACTIVE_THREADS)
        threads_number = MAX_ACTIVE_THREADS;

    // Count only the threads that were actually created
    pool->active_threads = 0;

    unsigned int i;
    for (i = 0; i < threads_number; i++)
    {
        // Store each thread ID in tids[i]
        int err = pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool);
        if (err != 0)
        {
            fprintf(stderr, "create threads error: %s\n", strerror(err));
            return false;
        }
        pool->active_threads++;

#ifdef DEBUG // conditional compilation for testing
        printf("[%u]:[%s] ==> tids[%u]: [%u] is created.\n",
               (unsigned)pthread_self(), __FUNCTION__, i, (unsigned)pool->tids[i]);
#endif
    }

    return true;
}

bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg) // add a task
{
    struct task *new_task = malloc(sizeof(struct task)); // create a new task node
    if (new_task == NULL)
    {
        perror("allocate memory error");
        return false;
    }
    new_task->do_task = do_task; // fill in the new task node
    new_task->arg = arg;
    new_task->next = NULL;

    //============ LOCK =============//
    pthread_mutex_lock(&pool->lock);
    //===============================//

    if (pool->waiting_tasks >= pool->max_waiting_tasks) // the queue is full
    {
        pthread_mutex_unlock(&pool->lock);
        fprintf(stderr, "too many tasks.\n");
        free(new_task);
        return false; // discard the task
    }

    struct task *tmp = pool->task_list;
    while (tmp->next != NULL)
    {
        tmp = tmp->next; // find the last node
    }
    tmp->next = new_task;
    pool->waiting_tasks++;

    //=========== UNLOCK ============//
    pthread_mutex_unlock(&pool->lock);
    //===============================//

#ifdef DEBUG
    printf("[%u][%s] ==> a new task has been added.\n",
           (unsigned)pthread_self(), __FUNCTION__);
#endif

    // Wake up one thread waiting on cond. We do not choose which one:
    // the system lets one of the blocked workers take the task.
    pthread_cond_signal(&pool->cond);
    return true;
}

int add_thread(thread_pool *pool, unsigned additional_threads) // create more workers
{
    if (additional_threads == 0)
    {
        return 0;
    }

    // Number of active threads we would like to end up with
    unsigned total_threads = pool->active_threads + additional_threads;

    unsigned i;
    int actual_increment = 0;
    for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++)
    {
        // Store the new thread ID in tids[i]
        int err = pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool);
        if (err != 0)
        {
            fprintf(stderr, "add threads error: %s\n", strerror(err));
            // no thread has been created: return failure
            if (actual_increment == 0)
                return -1;
            break;
        }
        actual_increment++;

#ifdef DEBUG
        printf("[%u]:[%s] ==> tids[%u]: [%u] is created.\n",
               (unsigned)pthread_self(), __FUNCTION__, i, (unsigned)pool->tids[i]);
#endif
    }

    pool->active_threads += actual_increment;
    return actual_increment;
}

int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
    if (removing_threads == 0)
        return pool->active_threads;

    // Threads remaining after removal, computed in signed arithmetic
    int remaining_threads = (int)pool->active_threads - (int)removing_threads;
    // Always keep at least one worker, otherwise queued tasks would never run
    remaining_threads = remaining_threads > 0 ? remaining_threads : 1;

    // Cancel from the last thread downwards. The goal is to relieve a shortage
    // of hardware resources, not to abort tasks: a worker that is running a
    // task has cancellation disabled, so it finishes that task first.
    int i;
    for (i = (int)pool->active_threads - 1; i > remaining_threads - 1; i--)
    {
        errno = pthread_cancel(pool->tids[i]);
        if (errno != 0)
            break;
        // Reap the cancelled thread so that its resources are released
        pthread_join(pool->tids[i], NULL);

#ifdef DEBUG
        printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
               (unsigned)pthread_self(), __FUNCTION__, i, (unsigned)pool->tids[i]);
#endif
    }

    if (i == (int)pool->active_threads - 1)
    {
        // Failed to cancel even the first thread
        return -1;
    }
    else
    {
        // Record and return the number of threads left
        pool->active_threads = i + 1;
        return i + 1;
    }
}

bool destroy_pool(thread_pool *pool)
{
    // 1. Set the exit flag and wake up ALL threads.
    //    pthread_cond_broadcast() does not itself require holding the mutex,
    //    but shutdown must be changed under the lock: otherwise a worker that
    //    has just seen shutdown == false, but has not yet started waiting,
    //    could miss the broadcast and block forever.
    pthread_mutex_lock(&pool->lock);
    pool->shutdown = true;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->lock);

    // 2. Wait for every worker to exit. The IDs were stored in tids by
    //    init_pool() and add_thread().
    unsigned int i;
    for (i = 0; i < pool->active_threads; i++)
    {
        errno = pthread_join(pool->tids[i], NULL);
        if (errno != 0)
        {
            printf("join tids[%u] error: %s\n", i, strerror(errno));
        }
    }

    // 3. Free memory. Workers exit only when the queue is empty, so only the
    //    dummy head node is left in task_list.
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->cond);
    free(pool->task_list);
    free(pool->tids);
    free(pool);

    return true;
}
```

With these settings the pool can grow to at most 20 threads (MAX_ACTIVE_THREADS); cp.c initializes it with 10 threads, and up to 1000 tasks (MAX_WAITING_TASKS) can wait in the queue.
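The reason for `pthread_cleanup_push()` in `routine()` can be checked in isolation. In this sketch (hypothetical names, not part of the template), a thread blocks in `pthread_cond_wait()`, which is a cancellation point, and is then cancelled. POSIX specifies that the mutex is re-acquired before the first cleanup handler runs, so the handler's unlock leaves the mutex free; without the handler it would stay locked by a thread that no longer exists.

```c
#include <pthread.h>
#include <sched.h>

static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t demo_cond = PTHREAD_COND_INITIALIZER;
static int waiter_ready = 0;   /* set by the waiter while it holds demo_lock */

/* Cleanup handler, same idea as handler() in thread_pool.c */
static void unlock_handler(void *arg)
{
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

static void *waiter(void *arg)
{
    (void)arg;
    pthread_cleanup_push(unlock_handler, &demo_lock);
    pthread_mutex_lock(&demo_lock);
    waiter_ready = 1;
    for (;;)                                   /* nobody ever signals */
        pthread_cond_wait(&demo_cond, &demo_lock);
    pthread_cleanup_pop(0);                    /* not reached; pairs with push */
    return NULL;
}

/* Cancel a thread blocked in pthread_cond_wait(); returns 0 if demo_lock
 * is free afterwards, i.e. the cleanup handler released it. */
int cancel_waiting_thread(void)
{
    pthread_t tid;
    void *res;

    waiter_ready = 0;
    pthread_create(&tid, NULL, waiter, NULL);

    /* Once we can lock demo_lock and see waiter_ready == 1, the waiter has
     * released the lock inside pthread_cond_wait(), so it is blocked there. */
    for (;;)
    {
        pthread_mutex_lock(&demo_lock);
        int ready = waiter_ready;
        pthread_mutex_unlock(&demo_lock);
        if (ready)
            break;
        sched_yield();
    }

    pthread_cancel(tid);                       /* acted on in pthread_cond_wait() */
    pthread_join(tid, &res);
    if (res != PTHREAD_CANCELED)
        return -1;

    int rc = pthread_mutex_trylock(&demo_lock); /* 0: nobody holds the lock */
    if (rc == 0)
        pthread_mutex_unlock(&demo_lock);
    return rc;
}
```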
cp.c (a simple program that uses this thread pool to copy a directory tree)
```c
#include <sys/types.h>
#include <dirent.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <libgen.h>  // basename(), dirname()
#include <pthread.h>
#include "thread_pool.h"

// Points to the "thread pool"
thread_pool *pool = NULL;

// One copy task for a regular file: source and destination paths
struct cp_files
{
    char file1[512];
    char file2[512];
};

// Task function run by a worker thread. Its argument is a struct cp_files
// allocated with malloc(), which is freed here.
void *cp_files_routine(void *data) // originally: void cp_file(char *file1, char *file2)
{
    char *file1, *file2;
    int fd1 = -1, fd2 = -1; // -1 means "not opened", so the cleanup below is safe
    ssize_t ret;
    char buf[1024];

    // Take the two file names out of the structure
    struct cp_files *p = (struct cp_files *)data;
    file1 = p->file1;
    file2 = p->file2;

    fd1 = open(file1, O_RDONLY);
    if (fd1 == -1)
    {
        printf("%s \n", file1);
        perror("open error");
        goto cp_return;
    }

    fd2 = open(file2, O_WRONLY | O_CREAT | O_TRUNC, 0777);
    if (fd2 == -1)
    {
        printf("%s \n", file2);
        perror("open error");
        goto cp_return;
    }

    while (1)
    {
        // ret is the number of bytes actually read
        ret = read(fd1, buf, sizeof(buf));
        if (ret == 0)
        {
            break; // end of file
        }
        else if (ret > 0)
        {
            ssize_t w = write(fd2, buf, ret);
            if (w != ret)
            {
                perror("write error");
                break;
            }
        }
        else
        {
            perror("read error");
            break;
        }
    }

cp_return:
    free(data);
    if (fd1 != -1)
        close(fd1);
    if (fd2 != -1)
        close(fd2);
    return NULL;
}

// Copy a directory: create path2/basename(path1), then copy the entries of
// path1 into it (regular files as pool tasks, subdirectories recursively)
void cp_dir(char *path1, char *path2)
{
    struct dirent *dirp = NULL;
    DIR *dir = opendir(path1);
    if (dir == NULL)
    {
        perror("opendir error");
        return;
    }

    char path2_name[512];
    char *p_name = basename(path1);
    snprintf(path2_name, sizeof(path2_name), "%s/%s", path2, p_name);
    // Create the destination directory (mode 0777, reduced by the umask)
    mkdir(path2_name, 0777);
    path2 = path2_name;

    while ((dirp = readdir(dir)) != NULL)
    {
        // Skip "." (this directory) and ".." (its parent): recursing into
        // them would never terminate.
        if (strcmp(dirp->d_name, ".") == 0 || strcmp(dirp->d_name, "..") == 0)
        {
            continue;
        }

        char filename[512];
        snprintf(filename, sizeof(filename), "%s/%s", path1, dirp->d_name);

        struct stat st;
        if (lstat(filename, &st) == -1)
        {
            perror("lstat error");
            continue;
        }

        // Regular file or symbolic link (shortcut): copy it as a pool task.
        // open() follows a symbolic link, so the target's contents are copied.
        if (S_ISREG(st.st_mode) || S_ISLNK(st.st_mode))
        {
            struct cp_files *cp = malloc(sizeof(*cp));
            if (cp == NULL)
            {
                perror("malloc error");
                continue;
            }
            // Copy the source and destination paths into the structure
            snprintf(cp->file1, sizeof(cp->file1), "%s", filename);
            snprintf(cp->file2, sizeof(cp->file2), "%s/%s", path2, dirp->d_name);

            // Add the task to the linked list; if the queue is full, copy the
            // file in this thread instead (cp_files_routine() frees cp)
            if (!add_task(pool, cp_files_routine, cp))
                cp_files_routine(cp);
        }
        // Directory: recurse. cp_dir() itself creates path2/<name>, so the
        // subdirectory must not also be created here (that would nest it twice).
        else if (S_ISDIR(st.st_mode))
        {
            cp_dir(filename, path2);
        }
    }

    closedir(dir);
}

// Usage: cp src_dir dst_dir   (dst_dir must already exist)
int main(int argc, char *argv[])
{
    struct stat st1, st2;

    if (argc != 3)
    {
        fprintf(stderr, "usage: %s src dst\n", argv[0]);
        return 1;
    }

    if (stat(argv[1], &st1) || stat(argv[2], &st2))
    {
        printf("-----\n");
        perror("stat error");
        return 1;
    }

    if (S_ISREG(st1.st_mode) && S_ISREG(st2.st_mode))
    {
        // cp_file(argv[1], argv[2]);  single-file copy is not implemented here
    }

    // Create a "thread pool" and initialize it with 10 worker threads
    pool = malloc(sizeof(*pool));
    if (pool == NULL || !init_pool(pool, 10))
    {
        fprintf(stderr, "failed to initialize the thread pool\n");
        return 1;
    }

    if (S_ISDIR(st1.st_mode) && S_ISDIR(st2.st_mode))
    {
        // Copy the files in the directory tree
        cp_dir(argv[1], argv[2]);
    }

    // Wait until every queued copy has finished, then destroy the thread pool
    destroy_pool(pool);
    return 0;
}
```
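The copy loop in `cp_files_routine()` issues a single `write()` per `read()` and does not retry when `write()` returns fewer bytes than requested. POSIX allows such short writes, and both calls can fail with `EINTR` when interrupted by a signal, so a more robust loop writes the remainder until the whole buffer is out. A sketch, using the hypothetical helper name `copy_fd`:

```c
#include <errno.h>
#include <unistd.h>

/* Hypothetical helper: copy everything from fd_in to fd_out.
 * Returns the number of bytes copied, or -1 on error. */
long copy_fd(int fd_in, int fd_out)
{
    char buf[1024];
    long total = 0;
    for (;;)
    {
        ssize_t n = read(fd_in, buf, sizeof(buf));
        if (n == 0)
            return total;                 /* end of file */
        if (n < 0)
        {
            if (errno == EINTR)
                continue;                 /* interrupted: retry the read */
            return -1;
        }
        ssize_t off = 0;
        while (off < n)                   /* write() may be partial */
        {
            ssize_t w = write(fd_out, buf + off, (size_t)(n - off));
            if (w < 0)
            {
                if (errno == EINTR)
                    continue;             /* interrupted: retry the write */
                return -1;
            }
            off += w;
        }
        total += n;
    }
}
```

Inside `cp_files_routine()`, the `while (1)` loop could then be replaced by a single `copy_fd(fd1, fd2)` call, with a return value of -1 reported as an error.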
Structure of the thread pool code:
Overall:
Two structures, six functions
Structure 1: task node (struct task)
Function pointer to the task function: do_task
Pointer to the argument passed to that function: arg
Pointer to the next task node: next
Structure 2: thread pool head node (thread_pool)
Lock that protects the thread pool: lock
Condition variable used to wake up threads: cond
Flag indicating that the pool is exiting: shutdown
Linked list of task nodes: task_list
Array of thread IDs: tids
Thread pool capacity:
Maximum number of waiting tasks: max_waiting_tasks
Number of tasks currently in the list: waiting_tasks
Number of running threads: active_threads (the number of valid thread IDs in tids)
Functions (the example arguments are taken from the template above):
Initialize the thread pool: init_pool(pool, 10);
Add a task: add_task(pool, cp_files_routine, cp); [1. thread pool 2. task function 3. its argument]
Add threads: add_thread(pool, 10);
Remove threads: remove_thread(pool, 2);
Destroy the thread pool: destroy_pool(pool);
Task execution function (worker thread): routine();
Steps inside each function:
init_pool(pool, 10);
Initialize the thread pool structure:
the lock and the condition variable,
then create the threads that run the task execution function (routine):
.......
add_task(pool, cp_files_routine, cp);
Initialize new_task
Lock the thread pool
Check the number of waiting tasks, move tmp to the tail node of task_list and append new_task after it
Unlock the thread pool
Signal the condition variable (the system picks which waiting thread executes the task)
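`add_task()` appends at the tail by walking `tmp` from `task_list`, and `routine()` removes `task_list->next`. Because `task_list` is a dummy head node, neither operation needs a special case for an empty list. A reduced sketch of just that list logic, assuming ints instead of function pointers and no locking (`node`, `enqueue` and `dequeue` are hypothetical names):

```c
#include <stdlib.h>

struct node { int value; struct node *next; };

/* Append at the tail, walking from the dummy head like add_task() does. */
int enqueue(struct node *head, int value)
{
    struct node *n = malloc(sizeof(*n));
    if (n == NULL)
        return -1;
    n->value = value;
    n->next = NULL;
    struct node *tmp = head;
    while (tmp->next != NULL)      /* find the last node */
        tmp = tmp->next;
    tmp->next = n;
    return 0;
}

/* Remove the first real node (head->next), like routine() does.
 * Returns 0 and stores the value, or -1 if the queue is empty. */
int dequeue(struct node *head, int *value)
{
    struct node *p = head->next;
    if (p == NULL)
        return -1;
    head->next = p->next;
    *value = p->value;
    free(p);
    return 0;
}
```

Walking to the tail costs O(n) per added task; keeping an extra tail pointer in the head node would make appending O(1), at the cost of one more field to keep up to date.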
add_thread(pool, 10);
Check the argument (0 means nothing to do)
Compute the final number of active threads
Create the new threads (never more than MAX_ACTIVE_THREADS in total)
Update the number of active threads
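The creation step in `add_thread()` stores each new ID in the next free slot of `tids`, stopping at `MAX_ACTIVE_THREADS` or at the first `pthread_create()` failure. The sketch below isolates that logic; `spawn_threads` and `idle_thread` are hypothetical names, and the threads simply return so they can be joined.

```c
#include <pthread.h>

static void *idle_thread(void *arg) { return arg; }

/* Start up to `want` threads in tids[start..cap-1]. Returns how many were
 * created (0 if the array is already full), or -1 if the very first
 * pthread_create() failed. */
int spawn_threads(pthread_t *tids, unsigned start, unsigned want, unsigned cap)
{
    int created = 0;
    for (unsigned i = start; i < start + want && i < cap; i++)
    {
        if (pthread_create(&tids[i], NULL, idle_thread, NULL) != 0)
            return created == 0 ? -1 : created;
        created++;
    }
    return created;
}
```

With 10 threads already active and a capacity of 20, asking for 15 more creates only 10, just as `add_thread(pool, 15)` would.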
remove_thread(pool, 2);
Check the argument (0 means nothing to do)
Compute the number of threads that will remain active
Keep at least one active thread
Cancel the threads, starting from the last one
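The arithmetic at the start of `remove_thread()` is easy to get wrong because `active_threads` is unsigned: the subtraction is done in unsigned arithmetic and wraps around, and only the implementation-defined conversion back to `int` makes it look negative on common platforms. A sketch of the same computation in signed arithmetic, with the hypothetical name `threads_after_remove`:

```c
/* Hypothetical helper: number of worker threads left after trying to
 * remove `removing` of `active`; at least one worker is always kept. */
unsigned int threads_after_remove(unsigned int active, unsigned int removing)
{
    if (removing == 0)
        return active;
    long remaining = (long)active - (long)removing;   /* may be negative */
    return remaining > 0 ? (unsigned int)remaining : 1u;
}
```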
destroy_pool(pool);
Set the exit flag
Broadcast to wake up all threads
Join each thread by its ID in a for loop
.....
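`destroy_pool()` uses `pthread_cond_broadcast()` rather than `pthread_cond_signal()` because every idle worker has to see the shutdown flag, while a signal wakes at most one waiter. The sketch below (hypothetical names) parks several threads on one condition variable, then sets a flag under the mutex and broadcasts: all of them wake and exit, so every join returns. Setting the flag while holding the mutex prevents a thread from checking the flag and then missing the wakeup before it starts waiting; a thread that starts late simply sees the flag already set and never waits.

```c
#include <pthread.h>
#include <stdbool.h>

#define N_WAITERS 5

static pthread_mutex_t bc_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t bc_cond = PTHREAD_COND_INITIALIZER;
static bool bc_shutdown = false;
static int bc_exited = 0;

static void *bc_waiter(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&bc_lock);
    while (!bc_shutdown)                   /* re-check after every wakeup */
        pthread_cond_wait(&bc_cond, &bc_lock);
    bc_exited++;
    pthread_mutex_unlock(&bc_lock);
    return NULL;
}

/* Start N_WAITERS threads, broadcast shutdown, join them all;
 * returns how many threads exited. */
int broadcast_shutdown(void)
{
    pthread_t tids[N_WAITERS];
    for (int i = 0; i < N_WAITERS; i++)
        pthread_create(&tids[i], NULL, bc_waiter, NULL);

    pthread_mutex_lock(&bc_lock);
    bc_shutdown = true;                    /* set the flag under the lock */
    pthread_cond_broadcast(&bc_cond);      /* wake every waiter, not just one */
    pthread_mutex_unlock(&bc_lock);

    for (int i = 0; i < N_WAITERS; i++)
        pthread_join(tids[i], NULL);
    return bc_exited;
}
```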
routine();
Lock, then check:
1. No task and not shutting down: wait on the condition variable
2. No task and shutting down: exit the thread
3. A task is waiting: remove the first task node from task_list (waiting_tasks--), then unlock
Execute the task function
.....