Threads and processes
1, What is a process / thread
1. Introduction
The CPU is the core of a computer and carries out all of its computing tasks. The operating system is the computer's manager: it is responsible for task scheduling and for resource allocation and management, and it coordinates all of the hardware. An application is a program that provides certain functions, and it runs on top of the operating system
2. Thread
Early computers had no concept of a thread, but over time the shortcomings of using only processes became apparent. For example, when a process blocks, the whole program stops at the blocking point, and frequent switching between processes wastes system resources. Threads were introduced to address this
A thread is the smallest unit that the operating system can schedule and run independently, and therefore the smallest unit of program execution. Resources are owned by the process: a process can have multiple threads, and all threads belonging to the same process share that process's resources
3. Process
A process is one dynamic execution of a program with certain functions on a data set. A process consists of a program, a data set and a process control block. The program describes what the process is to do; it is the set of instructions that controls the execution of the process. The data set is the data and working area the program needs during execution. The process control block (PCB) contains the descriptive and control information of the process, and it is the unique sign that the process exists
4. Distinction
- A process consists of one or more threads. Threads are different execution paths through the code of a process
- Switching between processes costs more resources than switching between threads
- Processes are independent of each other, while threads in the same process share the memory space of the program (code segment, data and heap); each thread only has its own stack and registers. The threads of one process are not visible to other processes. In other words, threads share the same memory space, while processes have independent memory spaces
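To make the shared-memory point concrete, here is a minimal sketch. The names `shared`, `pids` and `worker` are made up for this illustration. Several threads append to one list, and every thread reports the same process ID.

```python
import os
import threading

shared = []                 # one list, visible to every thread in this process
pids = set()                # process IDs observed by the workers
guard = threading.Lock()    # protects both containers

def worker(n):
    # Every thread writes into the same objects because the threads
    # of one process share that process's memory.
    with guard:
        shared.append(n)
        pids.add(os.getpid())

threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(shared))   # all four values arrived in the same list
print(len(pids))        # one process ID: the threads live in a single process
```

Separate processes would each get their own copy of `shared`, which is why processes need explicit mechanisms to exchange data.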
5. Use
In Python 3, thread support is provided by two standard library modules, _thread and threading; threading is a higher-level wrapper around _thread. The threading module provides Thread, Lock, RLock, Condition and other components
2, Multithreading
In Python, threads are used through the Thread class, which lives in the threading module. We usually import it from threading
If the import below raises no error in the interpreter, threading is available
from threading import Thread
1. Common methods
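    Thread.run(self)                  # Runs in the new thread; by default it calls the function given by the target parameter
    Thread.start(self)                # Start the thread; start() arranges for run() to be called in the new thread
    Thread.join(self, timeout=None)   # Block the caller (for example the main thread) until the thread finishes or the timeout expires
    Thread.setDaemon(self, daemonic)  # Mark the child thread as a daemon thread (deprecated since Python 3.10; set the daemon attribute instead)
    Thread.getName(self)              # Get the thread name (deprecated since Python 3.10; read the name attribute instead)
    Thread.setName(self, name)        # Set the thread name (deprecated since Python 3.10; assign to the name attribute instead)
    # Note: Thread has no terminate() method, so a thread cannot be killed by force.
    # terminate() exists only on multiprocessing.Process.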
2. Common parameters
parameter | description |
---|---|
target | The callable to run, that is, the task executed by the child thread |
name | The name of the child thread |
args | Positional arguments passed to the target function, given as a tuple; a one-element tuple needs a trailing comma, for example (1,) |
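A small sketch of how these three parameters fit together. The function `greet` and the thread name `worker-1` are invented for the example.

```python
import threading

results = []

def greet(person, punctuation="!"):
    # Runs in the child thread and records which thread executed it.
    name = threading.current_thread().name
    results.append((name, f"hello {person}{punctuation}"))

t = threading.Thread(
    target=greet,       # the callable the child thread runs
    name="worker-1",    # thread name, handy in logs
    args=("world",),    # one-element tuple: note the trailing comma
)
t.start()
t.join()
print(results)
```

Without the trailing comma, `("world")` is just a string, and the thread would pass each character as a separate positional argument.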
3. Application of multithreading
3.1 Subclassing Thread

    import queue
    import threading
    import time

    class MyThread(threading.Thread):
        def __init__(self):
            super().__init__()
            self.daemon = True            # Daemon mode: the thread does not keep the program alive
            self.queue = queue.Queue(3)   # Task queue holding at most three pending tasks
            self.start()                  # Start the thread on instantiation, so no manual start() is needed

        def run(self) -> None:
            # run() is called automatically in the new thread once start() has been called
            while True:  # Keep processing tasks
                func, args, kwargs = self.queue.get()
                func(*args, **kwargs)     # Unpack the positional and keyword arguments
                self.queue.task_done()    # Mark the task as done so that queue.join() can return

        # Producer side
        def submit_tasks(self, func, args=(), kwargs=None):
            # func is the task to execute; args and kwargs are its optional arguments
            self.queue.put((func, args, kwargs or {}))  # Submit the task

        # Override join() to wait for the queue, because the worker loop never exits
        def join(self, timeout=None) -> None:
            self.queue.join()  # Block until every submitted task has been marked done

    def f2(*args, **kwargs):
        time.sleep(2)
        print("Task 2 complete", args, kwargs)

    mt = MyThread()  # Instantiate the thread object
    mt.submit_tasks(f2, args=("aa", "aasd"), kwargs={"a": 2, "s": 3})  # Submit a task
    mt.join()  # Let the main thread wait until the submitted tasks have finished
Daemon mode:
- The main thread finishes only after all other non-daemon threads have finished (daemon threads are reclaimed at that point). Because the end of the main thread means the end of the process, and all resources of the process are then reclaimed, the process must make sure that every non-daemon thread has finished before it can end
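The difference between daemon and non-daemon threads can be sketched as follows. The functions `background` and `short_job` are hypothetical names chosen for this illustration.

```python
import threading
import time

def background():
    # Loops forever; acceptable only because the thread is a daemon.
    while True:
        time.sleep(0.1)

def short_job(log):
    time.sleep(0.2)
    log.append("short job done")

log = []
d = threading.Thread(target=background, daemon=True)   # daemon thread
w = threading.Thread(target=short_job, args=(log,))    # non-daemon thread
d.start()
w.start()

w.join()              # wait only for the non-daemon thread
print(log)
print(d.is_alive())   # the daemon is still running; it is dropped at interpreter exit
```

The program can exit even though `background` never returns, because the interpreter does not wait for daemon threads.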
3.2 Calling Thread directly

    import time
    from threading import Thread

    def f2(i):
        time.sleep(2)
        print("Task 2 complete", i)

    lis = []
    for i in range(5):
        t = Thread(target=f2, args=(i,))
        t.start()  # Start 5 threads
        lis.append(t)
    for t in lis:
        t.join()  # Wait for each thread to finish
4. Data sharing between threads
Once a program has multiple threads that operate on the same data, how can that data be shared safely?
For this you can use the Lock object in the threading library to protect the shared data
The acquire method of a Lock object requests the lock
Before operating on a shared data object, each thread should first obtain the right to do so by calling the acquire method of the lock that guards that object. If thread A calls acquire() while another thread B already holds the lock and has not released it, thread A waits at that point and does not execute the code that follows.
Only after thread B calls the lock's release method can thread A acquire the lock and continue with the code that follows
For example:
    import threading

    var = 1
    # Create a mutex
    lock = threading.Lock()

    # Define the tasks to be run by the two threads
    def func1():
        global var  # Declare the global variable
        for i in range(1000000):
            lock.acquire()  # Lock before the operation
            var += i
            lock.release()  # Release the lock after the operation

    def func2():
        global var  # Declare the global variable
        for i in range(1000000):
            lock.acquire()  # Lock before the operation
            var -= i
            lock.release()  # Release the lock after the operation

    # Create 2 threads
    t1 = threading.Thread(target=func1)
    t2 = threading.Thread(target=func2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(var)
When using multithreading, if the data is not what you expect, check whether shared data is being read and overwritten by several threads at once
If so, protect it with a Lock from the threading library
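A Lock can also be used as a context manager, which releases the lock even if the protected code raises an exception. The following is a minimal sketch with an invented `counter` and `add` function, not a rewrite of the example above.

```python
import threading

counter = 0
lock = threading.Lock()

def add(n):
    global counter
    for _ in range(n):
        # "with lock" acquires on entry and releases on exit,
        # even if the body raises an exception.
        with lock:
            counter += 1

threads = [threading.Thread(target=add, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)
```

Because every increment happens while the lock is held, the four threads together always add exactly 40000.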
3, Multi process usage
1. Introduction
Multiprocessing in Python is implemented by the multiprocessing package and is used much like multithreading with Thread. The multiprocessing.Process class creates a process object whose methods are similar to those of a thread object: start(), run(), join() and so on. One difference: a thread object offers a setDaemon() method in addition to its daemon attribute, whereas a process object becomes a daemon only by setting its daemon attribute
2. Application
2.1 Subclassing Process

    import time
    from multiprocessing import Process

    class MyProcess(Process):  # Inherit from the Process class
        def __init__(self, target, args=(), kwargs=None):
            super().__init__()
            self.daemon = True  # Run as a daemon process
            self.target = target
            self.args = args
            self.kwargs = kwargs or {}
            self.start()  # Start the process automatically on instantiation

        def run(self):
            self.target(*self.args, **self.kwargs)

    def fun(*args, **kwargs):
        print(time.time())
        print(args[0])

    if __name__ == '__main__':
        lis = []
        for i in range(5):
            p = MyProcess(fun, args=(1,))
            lis.append(p)
        for p in lis:
            p.join()  # Wait for each process to finish
Daemon mode:
- The main process counts as finished once its own code has run (daemon child processes are terminated at that point). It then waits until the non-daemon child processes have finished and reclaims their resources (otherwise zombie processes would be left behind)
2.2 Calling Process directly

    import time
    from multiprocessing import Process

    def fun(*args, **kwargs):
        print(time.time())
        print(args[0])

    if __name__ == '__main__':
        lis = []
        for i in range(5):
            p = Process(target=fun, args=(1,))
            p.start()  # A process must be started before it can be joined
            lis.append(p)
        for p in lis:
            p.join()  # Wait for each process to finish
3. Data sharing between processes
3.1 Lock method
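It is used in the same way as the thread Lock: multiprocessing.Lock offers the same acquire() and release() methods as threading.Lock, and it is passed to each child process as an argument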
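As a rough sketch of the Lock approach for processes, the example below protects a shared integer. The helper `add_many` and the use of `multiprocessing.Value` as the shared object are assumptions made for this illustration; the text above only names the Lock.

```python
from multiprocessing import Lock, Process, Value

def add_many(counter, lock, times):
    # counter.value += 1 is a read-modify-write, so it is done under the lock.
    for _ in range(times):
        with lock:
            counter.value += 1

def main():
    counter = Value("i", 0)   # an integer stored in shared memory
    lock = Lock()
    procs = [Process(target=add_many, args=(counter, lock, 1000))
             for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)      # total number of increments from all processes

if __name__ == "__main__":
    main()
```

The lock and the shared value are handed to each child as arguments, which is the usual way to make sure every process works with the same objects.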
3.2 Manager method
A Manager provides data that can be shared by multiple processes. Manager() returns a manager object that controls a server process; this server process holds the Python objects, and other processes operate on them through proxies
The types supported by a Manager are: list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array
Example:
    from multiprocessing import Process, Lock, Manager

    def f(n, d, l, lock):
        lock.acquire()
        d[str(n)] = n
        l[n] = -99
        lock.release()

    if __name__ == '__main__':
        lock = Lock()
        with Manager() as manager:
            d = manager.dict()            # Empty dictionary
            l = manager.list(range(10))   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
            # Start 10 processes; each process operates on a different element of d and l
            processes = []
            for i in range(10):
                p = Process(target=f, args=(i, d, l, lock))
                p.start()
                processes.append(p)
            for p in processes:
                p.join()  # Wait for all processes before reading the results
            print(d)
            print(l)
4, Pool concurrency
1. Syntax
The base class of the thread pool and the process pool is Executor in the concurrent.futures module. Executor has two subclasses, ThreadPoolExecutor and ProcessPoolExecutor. ThreadPoolExecutor creates a thread pool and ProcessPoolExecutor creates a process pool
When a thread pool or process pool manages the concurrency, you only need to submit the task function to the pool; the pool takes care of scheduling and running it
Executor provides the following common methods:
- submit(fn, *args, **kwargs): submit the function fn to the pool. *args are the positional arguments passed to fn, and **kwargs are the keyword arguments passed to fn
- map(func, *iterables, timeout=None, chunksize=1): similar to the built-in map(func, *iterables), except that the calls to func are made asynchronously and may run concurrently on several workers. chunksize only has an effect with ProcessPoolExecutor
- shutdown(wait=True): shut down the pool
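The three methods can be tried out with a short sketch like the one below. The task function `square` is a made-up example.

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

pool = ThreadPoolExecutor(max_workers=2)
try:
    future = pool.submit(square, 7)            # schedule one call, get a Future back
    single = future.result()                   # block until that call has finished
    many = list(pool.map(square, range(5)))    # results come back in input order
finally:
    pool.shutdown(wait=True)                   # release the worker threads

print(single, many)
```

Note that `map` returns results in the order of the inputs, regardless of which worker finishes first.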
After the program submits a task function to the pool, the submit method returns a Future object. The Future class is mainly used to obtain the return value of the task function. Because the task runs asynchronously in another thread, the function call is in effect a task "to be completed in the future", which is why Python represents it with a Future
Future provides the following methods:
- cancel(): try to cancel the task represented by the Future. If the task is already running or has finished and cannot be cancelled, the method returns False; otherwise the task is cancelled and the method returns True.
- cancelled(): return whether the task represented by the Future was successfully cancelled.
- running(): return True if the task represented by the Future is currently executing and cannot be cancelled.
- done(): return True if the task represented by the Future was cancelled or has finished executing.
- result(timeout=None): return the value returned by the task represented by the Future. If the task has not finished yet, this method blocks the current thread; the timeout parameter specifies the maximum number of seconds to wait.
- exception(timeout=None): return the exception raised by the task represented by the Future. If the task finished without raising, the method returns None.
- add_done_callback(fn): register a callback function for the task represented by the Future. When the task finishes or is cancelled, fn is called automatically with the Future as its only argument
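Several of these Future methods can be seen together in a small sketch. The function `divide` and the callback `on_done` are hypothetical names used only here.

```python
from concurrent.futures import ThreadPoolExecutor

def divide(a, b):
    return a / b

seen = []

def on_done(future):
    # Called once the future has finished (or was cancelled).
    seen.append(future.done())

with ThreadPoolExecutor(max_workers=1) as pool:
    ok = pool.submit(divide, 6, 3)
    bad = pool.submit(divide, 1, 0)
    ok.add_done_callback(on_done)
    bad.add_done_callback(on_done)

# Leaving the with block waits for both tasks to finish.
print(ok.result())                               # value returned by divide(6, 3)
print(type(bad.exception()).__name__)            # exception type raised by divide(1, 0)
print(ok.cancelled(), ok.running(), ok.done())   # state flags of a finished task
```

Calling `bad.result()` instead of `bad.exception()` would re-raise the exception in the calling thread.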
2. Get the number of CPUs

    from multiprocessing import cpu_count  # cpu_count returns the number of CPU cores

    n = cpu_count()  # Get the number of CPU cores
3. Thread pool
The steps of using thread pool to execute thread tasks are as follows:
- Call the constructor of the ThreadPoolExecutor class to create a thread pool
- Define a normal function as a thread task
- Call the submit() method of the ThreadPoolExecutor object to submit the thread task
- When there are no more tasks to submit, call the shutdown() method of the ThreadPoolExecutor object to shut down the thread pool
    from concurrent.futures import ThreadPoolExecutor
    import threading
    import time

    # Define a function to be used as the thread task
    def action(limit):
        my_sum = 0
        for i in range(limit):
            print(threading.current_thread().name + ' ' + str(i))
            my_sum += i
        return my_sum

    # Create a thread pool with 2 threads
    pool = ThreadPoolExecutor(max_workers=2)
    # Submit a task to the pool; 50 is passed as the argument of action()
    future1 = pool.submit(action, 50)
    # Submit another task to the pool; 100 is passed as the argument of action()
    future2 = pool.submit(action, 100)

    def get_result(future):
        print(future.result())

    # Register a completion callback for future1
    future1.add_done_callback(get_result)
    # Register a completion callback for future2
    future2.add_done_callback(get_result)
    # Check whether the task represented by future1 has finished
    print(future1.done())
    time.sleep(3)
    # Check whether the task represented by future2 has finished
    print(future2.done())
    # View the result returned by the task represented by future1
    print(future1.result())
    # View the result returned by the task represented by future2
    print(future2.result())
    # Shut down the thread pool
    pool.shutdown()
    # A with statement can also manage the thread pool, which avoids shutting it down manually
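The with statement mentioned in the last comment could look like the sketch below. The task function `total` is a simplified stand-in for action(), and as_completed is one possible way to collect the results; neither is taken from the original example.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def total(limit):
    return sum(range(limit))

results = {}
with ThreadPoolExecutor(max_workers=2) as pool:
    # Map each Future back to the argument it was submitted with.
    futures = {pool.submit(total, n): n for n in (50, 100)}
    for future in as_completed(futures):   # yields futures as they finish
        results[futures[future]] = future.result()
# Leaving the with block shuts the pool down automatically.

print(results)
```

The with block calls shutdown(wait=True) on exit, so no explicit pool.shutdown() is needed.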
Optimal number of threads = ((thread waiting time + thread CPU time) / thread CPU time) * number of CPUs
This is only a guideline; the pool can also use fewer threads than there are CPU cores
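The formula is easy to turn into a helper. The function name `optimal_threads` and the sample timings below are invented for illustration; real waiting and CPU times have to be measured for the workload in question.

```python
import os

def optimal_threads(wait_time, cpu_time, cpus):
    # ((waiting time + CPU time) / CPU time) * number of CPUs,
    # rounded to a whole number of threads and never below 1.
    return max(1, round((wait_time + cpu_time) / cpu_time * cpus))

# A task that waits 3 time units for every 1 unit of CPU work, on 4 CPUs:
print(optimal_threads(3, 1, 4))   # 16
# A task that never waits needs only one thread per CPU:
print(optimal_threads(0, 1, 4))   # 4

# Using the CPU count of the current machine (falls back to 1 if unknown):
cpus = os.cpu_count() or 1
print(optimal_threads(9, 1, cpus))
```

The more time a task spends waiting (for I/O, for example), the more threads the formula suggests per CPU.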
4. Process pool
The steps of using a process pool to execute process tasks are as follows:
- Call the constructor of the ProcessPoolExecutor class to create a process pool
- Define a normal function as a process task
- Call the submit() method of the ProcessPoolExecutor object to submit the process task
- When there are no more tasks to submit, call the shutdown() method of the ProcessPoolExecutor object to shut down the process pool
The code that starts the processes must be reached only through an if __name__ == '__main__': guard. It must not run at import time, because child processes may import the main module again when they start
Tip for choosing the number of processes:

    from concurrent.futures import ProcessPoolExecutor
    from multiprocessing import cpu_count

    pool = ProcessPoolExecutor(max_workers=cpu_count())  # Start as many processes as there are CPU cores

The number of processes started should not exceed the number of CPU cores
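Putting the steps together, a process pool might be used as in the following sketch. The task function `cube` is hypothetical, and leaving one core free is an assumption of this example rather than a rule.

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

def cube(x):
    # Defined at module level so that worker processes can import it.
    return x ** 3

def main():
    # Assumption for this sketch: keep one core free for the rest of the system.
    workers = max(1, cpu_count() - 1)
    with ProcessPoolExecutor(max_workers=workers) as pool:
        print(list(pool.map(cube, range(5))))

if __name__ == "__main__":
    main()
```

The pool is created inside main(), which is called only under the `__main__` guard, so importing this file from a worker process does not start new processes.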