Python multiprocessing in one article

1. The Python multiprocessing module
Multiprocessing in Python is implemented through the multiprocessing package, which is used much like the threading module for multithreading. You create a process with a multiprocessing.Process object, whose methods are similar to those of a thread object: it also has start(), run(), join() and so on. One method differs: a thread object is made a daemon by calling setDaemon(), while a Process object becomes a daemon by setting its daemon attribute.
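A minimal sketch of that daemon difference (the sleep durations here are just for illustration):

from multiprocessing import Process
import time

def background():
    time.sleep(10)  # long-running work; a daemon child is killed when the main process exits
    print('This line is never reached for a daemon process')

if __name__ == '__main__':
    p = Process(target=background)
    p.daemon = True   # set the attribute before start(); in threading this was setDaemon(True)
    p.start()
    time.sleep(1)
    print('Main process exits; the daemon child is terminated with it')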
Let's look at the ways Python multiprocessing can be implemented; they are similar to multithreading.
2. Python multiprocessing implementation method I

from multiprocessing import Process

def fun1(name):
    print('test %s multiprocess' % name)

if __name__ == '__main__':
    process_list = []
    for i in range(5):  # Start 5 subprocesses to execute the fun1 function
        p = Process(target=fun1, args=('Python',))  # Instantiate the process object
        p.start()
        process_list.append(p)

    for p in process_list:  # join each process, not just the last one
        p.join()

    print('End test')

Result:

test Python multiprocess
test Python multiprocess
test Python multiprocess
test Python multiprocess
test Python multiprocess
End test

Process finished with exit code 0

The code above starts five subprocesses to execute the function, and we can observe that the results are printed at essentially the same time. This is true parallelism: multiple CPU cores executing tasks simultaneously. Remember that a process is the smallest unit of resource allocation, and memory is not shared between processes. Every time a process starts, resources must be allocated and the data it accesses copied independently, so the cost of starting and destroying a process is relatively high. In practice, the number of processes should be set according to the server's configuration.
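A minimal sketch illustrating that memory is not shared (the counter variable is my own example name):

from multiprocessing import Process

counter = 0  # lives in the main process; each child works on its own copy

def increment():
    global counter
    counter += 1
    print('In child process: counter =', counter)  # prints 1

if __name__ == '__main__':
    p = Process(target=increment)
    p.start()
    p.join()
    print('In main process: counter =', counter)  # still 0: the child modified only its own copy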
3. Python multiprocessing implementation method II
Remember the second implementation of Python multithreading? It was implemented by inheriting from a class, and the second implementation of Python multiprocessing works the same way.

from multiprocessing import Process

class MyProcess(Process):  # Inherit from the Process class
    def __init__(self, name):
        super(MyProcess, self).__init__()
        self.name = name

    def run(self):
        print('test %s multiprocess' % self.name)


if __name__ == '__main__':
    process_list = []
    for i in range(5):  # Start 5 subprocesses to execute the run method
        p = MyProcess('Python')  # Instantiate the process object
        p.start()
        process_list.append(p)

    for p in process_list:  # join each process, not just the last one
        p.join()

    print('End test')

Result:

test Python multiprocess
test Python multiprocess
test Python multiprocess
test Python multiprocess
test Python multiprocess
End test

Process finished with exit code 0

The effect is the same as with the first method.
As you can see, Python multiprocessing is implemented almost identically to multithreading.
Other methods of the Process class

Construction method:

Process([group [, target [, name [, args [, kwargs]]]]])
  group: Process group; should always be None (kept for compatibility with threading.Thread)
  target: The callable to execute
  name: Process name
  args/kwargs: Arguments to pass to the callable

Instance methods:
  is_alive(): Returns whether the process is running, bool type.
  join([timeout]): Blocks the calling process until the process whose join() was called terminates, or until the optional timeout is reached.
  start(): Marks the process as ready, waiting for CPU scheduling.
  run(): start() calls run(); if a target was passed in, run() executes that target by default.
  terminate(): Stops the worker process immediately, whether or not its task has completed.

Properties:
  daemon: Same function as setDaemon in threading
  name: Process name
  pid: Process ID

join and daemon are used the same way as in Python multithreading, so we won't repeat that here; you can look at the earlier articles in the Python multithreading series.
4. Python multiprocess communication
A process is the basic unit by which the system schedules work and allocates resources (CPU and memory). Processes are independent of each other: starting a new process is equivalent to cloning the parent's data, so modifications made in a subprocess cannot affect the main process's data, and data cannot be shared between subprocesses. This is the most obvious difference between multiprocessing and multithreading. But are Python processes completely isolated? Of course not. Python provides several mechanisms for communication and data sharing between processes (meaning the same piece of data can be modified).
Process queue: Queue
Queue was also covered in the multithreading articles. It is thread-safe when used in the producer-consumer pattern and acts as the data pipeline between producers and consumers. In Python multiprocessing, it is the data pipeline between processes, used to realize inter-process communication.

from multiprocessing import Process, Queue


def fun1(q, i):
    print('Subprocess %s starts to put data' % i)
    q.put('I am %s, communicating through Queue' % i)

if __name__ == '__main__':
    q = Queue()

    process_list = []
    for i in range(3):
        # Note: pass the q object in args so the subprocess can use the Queue to talk to the main process
        p = Process(target=fun1, args=(q, i,))
        p.start()
        process_list.append(p)

    for p in process_list:  # join each process, not just the last one
        p.join()

    print('Main process gets Queue data')
    print(q.get())
    print(q.get())
    print(q.get())
    print('End test')

Result:

Subprocess 0 starts to put data
Subprocess 1 starts to put data
Subprocess 2 starts to put data
Main process gets Queue data
I am 0, communicating through Queue
I am 1, communicating through Queue
I am 2, communicating through Queue
End test

Process finished with exit code 0

From the result above we can see that the main process can obtain the data put by the subprocesses through the Queue, realizing inter-process communication.
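Since the Queue is a pipeline between processes, it also supports the producer-consumer pattern directly. A minimal sketch, using a None sentinel (my own convention here) to tell the consumer to stop:

from multiprocessing import Process, Queue

def producer(q):
    for i in range(3):
        q.put('item %s' % i)
    q.put(None)  # sentinel: tells the consumer there is nothing more to read

def consumer(q):
    while True:
        item = q.get()  # blocks until data is available
        if item is None:
            break
        print('consumed', item)

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()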
Pipe
Pipe serves roughly the same purpose as Queue: it also realizes inter-process communication. Let's see how to use it.

from multiprocessing import Process, Pipe

def fun1(conn):
    print('Subprocess sends message:')
    conn.send('Hello, main process')
    print('Subprocess receives message:')
    print(conn.recv())
    conn.close()

if __name__ == '__main__':
    conn1, conn2 = Pipe()  # Key point: Pipe() produces the two ends of a two-way pipe
    p = Process(target=fun1, args=(conn2,))  # conn2 is passed to the subprocess
    p.start()
    print('Main process receives message:')
    print(conn1.recv())
    print('Main process sends message:')
    conn1.send('Hello, subprocess')
    p.join()
    print('End test')

Result:

Main process receives message:
Subprocess sends message:
Subprocess receives message:
Hello, main process
Main process sends message:
Hello, subprocess
End test

Process finished with exit code 0

As you can see above, the main process and the subprocess can send messages to each other.
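By default Pipe() returns a duplex (two-way) pipe. If you only need one-way communication, you can pass duplex=False, in which case the first returned end can only receive and the second can only send. A minimal sketch:

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send('one-way message')
    conn.close()

if __name__ == '__main__':
    recv_end, send_end = Pipe(duplex=False)  # recv_end can only recv(), send_end can only send()
    p = Process(target=sender, args=(send_end,))
    p.start()
    print(recv_end.recv())
    p.join()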
Managers
Queue and Pipe only implement data interaction, not data sharing (that is, one process changing another process's data). For that, it is time for Managers to take the stage.

from multiprocessing import Process, Manager

def fun1(dic, lis, index):

    dic[index] = 'a'
    dic['2'] = 'b'
    lis.append(index)  # final list: [0,1,2,3,4] plus the ten appended indexes
    #print(lis)

if __name__ == '__main__':
    with Manager() as manager:
        dic = manager.dict()  # Note: the shared dict must be created via the manager, not defined directly with {}
        l = manager.list(range(5))  # [0, 1, 2, 3, 4]

        process_list = []
        for i in range(10):
            p = Process(target=fun1, args=(dic, l, i))
            p.start()
            process_list.append(p)

        for res in process_list:
            res.join()
        print(dic)
        print(l)

Result:

{0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'}
[0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]

As you can see, the main process defined a dictionary and a list; the subprocesses can add to and modify the dictionary's contents and insert new data into the list. This realizes data sharing between processes: multiple processes can modify the same data together.
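Note that the order of the values appended to the list above is not deterministic: the ten subprocesses run concurrently. When several processes perform a read-modify-write on the same shared object, you may need a lock. A minimal sketch using a manager Lock (the 'count' key is my own example name):

from multiprocessing import Process, Manager

def add_one(dic, lock):
    with lock:  # serialize the read-modify-write on the shared dict
        dic['count'] = dic['count'] + 1

if __name__ == '__main__':
    with Manager() as manager:
        dic = manager.dict()
        dic['count'] = 0
        lock = manager.Lock()  # a lock that can be shared between processes

        process_list = [Process(target=add_one, args=(dic, lock)) for _ in range(10)]
        for p in process_list:
            p.start()
        for p in process_list:
            p.join()
        print(dic['count'])  # reliably 10; without the lock, updates could be lost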
5. Process pool
A process pool maintains a sequence of processes internally. When work arrives, a process is taken from the pool; if no process in the pool is currently available, the program waits until one becomes free. In other words, only a fixed number of processes can run at any one time.
There are two methods in the process pool:
apply: synchronous, generally not used
apply_async: asynchronous

from multiprocessing import Pool
import os, time, random

def fun1(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__ == '__main__':
    pool = Pool(5)  # Create a process pool of 5 processes

    for i in range(10):
        pool.apply_async(func=fun1, args=(i,))

    pool.close()
    pool.join()
    print('End test')

Result:

Run task 0 (37476)...
Run task 1 (4044)...
Task 0 runs 0.03 seconds.
Run task 2 (37476)...
Run task 3 (17252)...
Run task 4 (16448)...
Run task 5 (24804)...
Task 2 runs 0.27 seconds.
Run task 6 (37476)...
Task 1 runs 0.58 seconds.
Run task 7 (4044)...
Task 3 runs 0.98 seconds.
Run task 8 (17252)...
Task 5 runs 1.13 seconds.
Run task 9 (24804)...
Task 6 runs 1.46 seconds.
Task 4 runs 2.73 seconds.
Task 8 runs 2.18 seconds.
Task 7 runs 2.93 seconds.
Task 9 runs 2.93 seconds.
End test

Calling join() on the Pool object waits for all child processes to finish executing. You must call close() before join(); once close() has been called, no new tasks can be submitted to the pool.
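The example above fires the tasks with apply_async and ignores their return values. apply blocks until a single task finishes and returns its result directly, while apply_async returns an AsyncResult whose get() method retrieves the value later. A minimal sketch (the square function is just for illustration):

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(3)

    print(pool.apply(square, (2,)))  # synchronous: blocks until done, returns 4

    results = [pool.apply_async(square, (i,)) for i in range(5)]  # asynchronous: returns immediately
    pool.close()
    pool.join()
    print([r.get() for r in results])  # [0, 1, 4, 9, 16]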
Process pool map method
This example comes from the Internet (if there is any infringement, please let me know, thank you).
I saw this example online and think it is good, so I won't write my own case here; this example is more persuasive.

import os

from multiprocessing import Pool
from PIL import Image

SIZE = (75, 75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
    return (os.path.join(folder, f)
            for f in os.listdir(folder)
            if 'jpeg' in f)

def create_thumbnail(filename):
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename)
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == '__main__':
    folder = os.path.abspath(
        '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(create_thumbnail, images)  # Key point: images is an iterable; map distributes it over the pool
    pool.close()
    pool.join()

The main job of the code above is to traverse the image files in the given folder, generate a thumbnail for each, and save the thumbnails to a specific folder. On my machine, this program took 27.9 seconds to process 6000 pictures. The map function does not expose manual process management, which makes the related debugging work extremely simple.
map can also be used in crawling. For example, to crawl the content of multiple URLs, you can put the URLs in a tuple (or list) and pass it to the mapped function, as sketched below.
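A minimal sketch of that idea, assuming the third-party requests library is installed; the URLs are placeholders:

from multiprocessing import Pool
import requests

URLS = (
    'https://www.example.com',
    'https://www.example.org',
)  # placeholder URLs; put your own targets in this tuple

def fetch(url):
    resp = requests.get(url, timeout=10)
    return url, resp.status_code, len(resp.text)

if __name__ == '__main__':
    pool = Pool(4)
    for url, status, size in pool.map(fetch, URLS):  # map distributes the URLs over the pool
        print('%s -> %s (%s bytes)' % (url, status, size))
    pool.close()
    pool.join()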
