Processes and threads in Python

Threads and processes

1, What is a process / thread

1. Introduction

As we all know, the CPU is the core of the computer and carries out all computing tasks. The operating system is the computer's manager and housekeeper: it is responsible for task scheduling, resource allocation and management, and it commands all of the computer's hardware. An application program is a program with a certain function that runs on top of the operating system

2. Thread

In the early days, computers had no concept of a thread; everything was handled with processes. As time went on, the shortcomings of using only processes became apparent: when a process blocks, the whole program stops at the blocking point, and switching between processes frequently wastes system resources. This is why threads were introduced

A thread is the smallest unit of program execution and the smallest unit the CPU schedules. A process can have multiple threads, and all threads belonging to the same process share that process's resources

3. Process

A process is a dynamic execution of a program with a certain function over a data set. A process consists of the program, the data set and the process control block. The program describes the functions the process is to complete and is the set of instructions that control its execution; the data set is the data and working area the program needs while executing; the process control block (PCB) contains the process's descriptive and control information and is the only sign of the process's existence

4. Distinction

  1. A process consists of one or more threads; threads are different execution paths of the code within a process
  2. Switching between processes costs more resources than switching between threads
  3. Processes are independent of each other, while the threads within a process share that program's memory space (code segments, data, stacks, etc.). A process's threads are not visible to other processes. In other words, threads share the same memory space, whereas processes each have their own independent memory space

5. Use

In Python, thread support is provided by two standard library modules, _thread and threading, where threading is a higher-level wrapper around _thread. The threading module provides Thread, Lock, RLock, Condition and other components

2, Multithreading

In Python, threads are used through the Thread class, which lives in the threading module (the higher-level wrapper over _thread); we usually import it from threading

By default, if the interpreter reports no error on the import, threads are available

from threading import Thread

1. Common methods

Thread.run(self)  # The method executed in the new thread; by default it calls the function passed via the target parameter
Thread.start(self)  # Start the thread; the new thread then invokes run()
Thread.join(self, timeout=None)  # Block the calling thread (e.g. the main thread) until this thread finishes or the timeout expires
Thread.setDaemon(self, daemonic)  # Mark the child thread as a daemon thread (equivalent to setting the daemon attribute)
Thread.getName(self)  # Get the thread name
Thread.setName(self, name)  # Set the thread name

2. Common parameters

parameter | explanation
target    | the callable object, i.e. the task the child thread will execute
name      | the name of the child thread
args      | the positional arguments passed to the target function, as a tuple; a single argument still needs a trailing comma
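
A minimal sketch of these parameters (the download function, URL and thread name below are made up for illustration); note the trailing comma when args contains a single value:

import time
from threading import Thread

def download(url):  # hypothetical task function
    time.sleep(1)
    print("finished downloading", url)

t = Thread(target=download, name="worker-1", args=("http://example.com",))  # single argument: trailing comma required
t.start()      # start the thread; run() is invoked in the new thread
print(t.name)  # worker-1
t.join()       # the main thread waits here until the child thread finishes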

3. Application of multithreading

3.1 Subclassing Thread and overriding its methods

import time, queue, threading


class MyThread(threading.Thread):

    def __init__(self):
        super().__init__()
        self.daemon = True  # Run as a daemon thread
        self.queue = queue.Queue(3)  # Task queue holding at most three pending tasks
        self.start()  # Start the thread at instantiation time; no manual start() call is needed

    def run(self) -> None:  # run() is invoked automatically in the new thread once it starts
        while True:  # Continuous processing of tasks
            func, args, kwargs = self.queue.get()
            func(*args, **kwargs)  # Execute the task, unpacking the variable-length argument tuple and keyword dict
            self.queue.task_done()  # Mark the task as done (decrements the unfinished-task counter so join() will not block forever)

    # Producer model
    def submit_tasks(self, func, args=(), kwargs={}):  # func is the task to execute; args and kwargs default to an empty tuple and dict
        self.queue.put((func, args, kwargs))  # Submit task

    # Override join method
    def join(self) -> None:
        self.queue.join()  # Block until the unfinished-task counter reaches zero, i.e. all submitted tasks are done
        
        
def f2(*args, **kwargs):
    time.sleep(2)
    print("Task 2 complete", args, kwargs)

# Instantiate thread object
mt = MyThread()
# Submit task
mt.submit_tasks(f2, args=("aa", "aasd"), kwargs={"a": 2, "s": 3})

# Let the main thread wait for the child thread to finish
mt.join()

Daemon mode:

  • The main thread can only finish after all non-daemon threads have finished running (any daemon threads are reclaimed at that point). Because the end of the main thread means the end of the process, and the process's resources are reclaimed as a whole, the process must ensure that every non-daemon thread has run to completion before it can end
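
A small sketch of this behaviour (the sleep duration is arbitrary): the daemon thread below is reclaimed as soon as the main thread finishes, so its print is never reached, whereas a non-daemon thread would keep the program alive until it completed:

import time
from threading import Thread

def background():
    time.sleep(10)
    print("never printed")  # the daemon thread is reclaimed when the main thread ends

t = Thread(target=background, daemon=True)
t.start()
print("main thread finished")  # the process exits here and the daemon thread is discarded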

3.2 Using the Thread class directly

import time
from threading import Thread

def f2(i):
    time.sleep(2)
    print("Task 2 complete", i)

lis = []
for i in range(5):
    t = Thread(target=f2, args=(i,))
    t.start()  # Start 5 threads
    lis.append(t)

for i in lis:
    i.join()  # Thread waiting

4. Data sharing between threads

Now our program has multiple threads, and several of them operate on the same piece of data. How do we keep this shared data consistent?

At this point you can use the Lock object from the threading library to protect it

The acquire method of the Lock object applies for the lock

Before operating on the shared data, each thread should apply for the right to operate on it, i.e. call the acquire method of the lock object that protects the shared data. If thread A executes acquire() while another thread B has already acquired the lock and not yet released it, thread A waits at that point for thread B to release the lock and does not execute the code that follows.

Only after thread B executes the lock's release method and releases the lock can thread A acquire it and go on to execute the following code

For example:

import threading

var = 1
# Create a mutex lock
lock = threading.Lock()

# Define the tasks to be used by two threads
def func1():
    global var  # Declare global variables
    for i in range(1000000):
        lock.acquire()  # Lock before operation
        var += i
        lock.release()  # Release the lock after operation
        
def func2():
    global var  # Declare global variables
    for i in range(1000000):
        lock.acquire()  # Lock before operation
        var -= i
        lock.release()  # Release the lock after operation
        
# Create 2 threads
t1 = threading.Thread(target=func1)
t2 = threading.Thread(target=func2)
t1.start()
t2.start()
t1.join()
t2.join()
print(var)

When using multiple threads, if the data does not match your expectations, consider whether shared data is being read and overwritten by different threads

Use the Lock object in the threading library to protect shared data
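
Lock also supports the with statement, which acquires the lock on entry and releases it on exit even if the protected code raises an exception. A minimal sketch (the two incrementing threads are just an illustration):

import threading

var = 0
lock = threading.Lock()

def add():
    global var
    for _ in range(1000000):
        with lock:  # acquire() on entry, release() on exit
            var += 1

threads = [threading.Thread(target=add) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(var)  # 2000000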

3, Multi process usage

1. Introduction

Multiprocessing in Python is implemented through the multiprocessing package and is similar to multithreading with Thread. A multiprocessing.Process object is used to create a process, and its methods are similar to those of a thread object: there are start(), run(), join(), and so on. One method differs: the thread object uses the setDaemon method for daemon behaviour, while for a process object this is done by setting the daemon attribute

2. Usage

2.1 Subclassing the Process class

import time
from multiprocessing import Process


class MyProcess(Process):  # Inherit Process class
    def __init__(self, target, args=(), kwargs={}):
        super(MyProcess, self).__init__()
        self.daemon = True  # Start daemon
        self.target = target
        self.args = args
        self.kwargs = kwargs
        self.start()  # Auto start process

    def run(self):
        self.target(*self.args, **self.kwargs)


def fun(*args, **kwargs):
    print(time.time())
    print(args[0])


if __name__ == '__main__':
    lis = []
    for i in range(5):
        p = MyProcess(fun, args=(1, ))
        lis.append(p)
    for i in lis:
        i.join()  # Let the process wait

Daemon mode:

  • The main process is considered to have finished once its own code has run (any daemon child processes are reclaimed at that point); after that it waits for all non-daemon child processes to finish and reclaims their resources (otherwise zombie processes would be produced)
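
A brief sketch of this behaviour (the sleep duration is arbitrary): the daemon child below is terminated as soon as the main process's code finishes, so its print never appears:

import time
from multiprocessing import Process

def worker():
    time.sleep(10)
    print("never printed")  # the daemon child is terminated when the main process's code ends

if __name__ == '__main__':
    p = Process(target=worker, daemon=True)
    p.start()
    print("main process code finished")  # the daemon child is terminated and reclaimed here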

2.2 Using the Process class directly

import time
from multiprocessing import Process

def fun(*args, **kwargs):
    print(time.time())
    print(args[0])

if __name__ == '__main__':
    lis = []
    for i in range(5):
        p = Process(target=fun, args=(1, ))
        p.start()  # Start the process
        lis.append(p)
    for i in lis:
        i.join()  # Wait for each process to finish

3. Data sharing between processes

3.1 Lock method

Its usage is similar to that of the thread Lock described above, except that the multiprocessing Lock is typically passed to each child process as an argument
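
A minimal sketch under that assumption, using a multiprocessing.Value as the shared data (the counter, the increment count and the number of processes are made up for illustration):

from multiprocessing import Process, Lock, Value

def add(counter, lock):
    for _ in range(100000):
        with lock:  # same acquire()/release() semantics as the thread Lock
            counter.value += 1

if __name__ == '__main__':
    lock = Lock()
    counter = Value('i', 0)  # an integer stored in shared memory
    ps = [Process(target=add, args=(counter, lock)) for _ in range(2)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    print(counter.value)  # 200000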

3.2 Manager method

The function of Manager is to provide global variables shared by multiple processes. The Manager() method returns a manager object that controls a server process; the shared objects live in that server process, and other processes operate on them through proxies

The types supported by Manager are: list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array

Syntax:

from multiprocessing import Process, Lock, Manager

def f(n, d, l, lock):
    lock.acquire()
    d[str(n)] = n
    l[n] = -99
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    with Manager() as manager:
        d = manager.dict()  # Empty dictionary
        l = manager.list(range(10))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        # Start 10 processes, and different processes operate on different elements in d and l
        for i in range(10):
            p = Process(target=f, args=(i, d, l, lock))
            p.start()
            p.join()

        print(d)
        print(l)

4, Pool concurrency

1. Syntax

The base class of the pools is Executor in the concurrent.futures module. Executor provides two subclasses, ThreadPoolExecutor and ProcessPoolExecutor: ThreadPoolExecutor is used to create thread pools and ProcessPoolExecutor is used to create process pools

If you use a thread pool / process pool to manage concurrent programming, you only need to submit the corresponding task functions to the pool; the rest is handled by the pool

Executor provides the following common methods:

  • submit(fn, *args, **kwargs): submits the function fn to the pool. *args are the positional arguments passed to fn, and **kwargs are the keyword arguments passed to fn
  • map(func, *iterables, timeout=None, chunksize=1): similar to the built-in map(func, *iterables), except that this function uses multiple threads to start processing the iterables asynchronously and immediately
  • shutdown(wait=True): shuts down the pool
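
A small sketch of map() together with the with statement, which calls shutdown() automatically (the square function is hypothetical):

from concurrent.futures import ThreadPoolExecutor

def square(x):  # hypothetical task function
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:  # shutdown(wait=True) is called automatically on exit
    for result in pool.map(square, range(5)):  # results come back in the order of the inputs
        print(result)  # 0 1 4 9 16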

After the program submits a task function to the thread pool, the submit method returns a Future object. The Future class is mainly used to obtain the return value of the task function. Since the task is executed asynchronously in a new thread, it is equivalent to a "to be completed" job, so Python uses a Future to represent it

Future provides the following methods:

  • cancel(): cancels the task represented by the Future. If the task is already executing and cannot be cancelled, the method returns False; otherwise the task is cancelled and the method returns True
  • cancelled(): returns whether the task represented by the Future was successfully cancelled
  • running(): returns True if the task represented by the Future is currently executing and cannot be cancelled
  • done(): returns True if the task represented by the Future was successfully cancelled or has finished executing
  • result(timeout=None): gets the result returned by the task represented by the Future. If the task has not finished yet, this method blocks the current thread; the timeout parameter specifies the maximum number of seconds to block
  • exception(timeout=None): gets the exception raised by the task represented by the Future. If the task completed successfully without raising, the method returns None
  • add_done_callback(fn): registers a callback function for the task represented by the Future; when the task completes, fn is called automatically with the Future as its argument
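
A short sketch of exception() and result() (the failing task is contrived for illustration):

from concurrent.futures import ThreadPoolExecutor

def fail():
    raise ValueError("boom")  # contrived failure

with ThreadPoolExecutor() as pool:
    future = pool.submit(fail)
    print(future.exception())  # the ValueError raised inside the task, or None on success
    # future.result() would re-raise the same ValueError in the calling thread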

2. Getting the number of CPUs

from multiprocessing import cpu_count  # CPU core number module, which can obtain the number of CPU cores

n = cpu_count()  # Get the number of cpu cores

3. Thread pool

The steps of using thread pool to execute thread tasks are as follows:

  1. Call the constructor of the ThreadPoolExecutor class to create a thread pool
  2. Define a normal function as a thread task
  3. Call the submit() method of the ThreadPoolExecutor object to submit the thread task
  4. When you no longer want to submit any tasks, call the shutdown() method of the ThreadPoolExecutor object to shut down the thread pool

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# Define a function that is prepared as a thread task
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + '  ' + str(i))
        my_sum += i
    return my_sum

# Create a thread pool with 2 threads
pool = ThreadPoolExecutor(max_workers=2)
# Submit a task to the thread pool, and 50 will be used as the parameter of the action() function
future1 = pool.submit(action, 50)
# Submit another task to the thread pool, and 100 will be used as the parameter of the action() function
future2 = pool.submit(action, 100)
def get_result(future):
    print(future.result())
    
# Add thread completed callback function for future1
future1.add_done_callback(get_result)
# Add a thread completed callback function for future2
future2.add_done_callback(get_result)
# Judge whether the task represented by future1 is over
print(future1.done())
time.sleep(3)
# Judge whether the task represented by future2 is over
print(future2.done())
# View the results returned by the tasks represented by future1
print(future1.result())
# View the results returned by the tasks represented by future2
print(future2.result())
# Close thread pool
pool.shutdown()  # Alternatively, a with statement can manage the thread pool and avoid closing it manually

Optimal number of threads = ((thread waiting time + thread CPU time) / thread CPU time) * number of CPUs

The number of threads can also be set lower than the number of CPU cores
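
For example, with hypothetical figures of 9 ms of waiting time and 1 ms of CPU time per task on a 4-core machine, the formula gives ((9 + 1) / 1) * 4 = 40 threads.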

4. Process pool

The steps for using a process pool to execute process tasks are as follows:

  1. Call the constructor of the ProcessPoolExecutor class to create a process pool
  2. Define a normal function as the process task
  3. Call the submit() method of the ProcessPoolExecutor object to submit the process task
  4. When you no longer want to submit any tasks, call the shutdown() method of the ProcessPoolExecutor object to shut down the process pool

The code that starts processes must be placed under if __name__ == '__main__':, so that it is not re-executed when child processes import the module

Tip for starting processes:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

pool = ProcessPoolExecutor(max_workers=cpu_count())  # Start as many worker processes as there are CPU cores

The number of processes opened should not exceed the number of CPU cores
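
Putting the steps together, a minimal sketch of a process pool (the cube function is made up); note the __main__ guard:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

def cube(x):  # hypothetical CPU-bound task
    return x ** 3

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=cpu_count()) as pool:  # shutdown happens automatically on exit
        futures = [pool.submit(cube, i) for i in range(5)]
        print([f.result() for f in futures])  # [0, 1, 8, 27, 64]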
