Using a process Pool together with the Queue from Manager

Process Pool:

When only a few child processes are needed, you can create them directly with the Process class in multiprocessing. When there are hundreds of tasks to run, creating a process for each one by hand becomes a heavy workload. In that case, you can use the Pool class in the multiprocessing module.

When initializing the Pool, you can specify a maximum number of processes. When a new task is submitted to the Pool, an idle process in the Pool picks it up and executes it. If every process in the Pool is busy, the task waits until one of them finishes its current task, and that process is then reused to run the new task.
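The reuse of worker processes described above can be observed directly. The following is a minimal sketch, not part of the original example; the helper names worker_pid and pids_used are made up for illustration. It runs six tasks on a pool capped at two processes and collects the IDs of the processes that served them.

```python
from multiprocessing import Pool
import os
import time


def worker_pid(_):
    """Return the ID of the worker process that ran this task."""
    time.sleep(0.05)  # keep the worker busy briefly so tasks spread out
    return os.getpid()


def pids_used(pool_size, n_tasks):
    """Run n_tasks on a pool of pool_size processes; return the set of worker PIDs."""
    with Pool(pool_size) as pool:
        # map() blocks until every task has finished
        return set(pool.map(worker_pid, range(n_tasks)))


if __name__ == "__main__":
    pids = pids_used(pool_size=2, n_tasks=6)
    # Six tasks were served by at most two distinct processes
    print(len(pids) <= 2)
```

However many tasks are submitted, the set never holds more process IDs than the pool size, because finished workers are reused rather than replaced.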

Common methods of Pool:

apply_async(func[, args[, kwds]]): calls func in a non-blocking manner, so tasks run in parallel (in blocking mode, each task must finish before the next one can start). args is the tuple of positional arguments passed to func and kwds is the dictionary of keyword arguments passed to func.

close(): close the Pool so that it will no longer receive new tasks.

terminate(): stops the worker processes immediately, regardless of whether their tasks have completed.

join(): blocks the main process until the worker processes exit. It must be called after close() or terminate().
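The methods listed above can be combined as follows. This is a small sketch under assumptions: the task function power and the wrapper run_tasks are hypothetical names chosen for illustration. It shows that apply_async returns a handle immediately, that args and kwds carry positional and keyword arguments, and that the handle's get() method yields the task's return value once the task has finished.

```python
from multiprocessing import Pool


def power(base, exponent=2):
    """Toy task: raise base to exponent."""
    return base ** exponent


def run_tasks():
    pool = Pool(2)
    # apply_async returns immediately with an AsyncResult handle
    squared = pool.apply_async(power, args=(3,))
    cubed = pool.apply_async(power, args=(3,), kwds={"exponent": 3})
    pool.close()  # no more tasks may be submitted
    pool.join()   # wait for the worker processes to exit
    # get() returns the task's return value (or re-raises its exception)
    return squared.get(), cubed.get()


if __name__ == "__main__":
    print(run_tasks())  # (9, 27)
```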

I. Using the process Pool in the multiprocessing module:

        from multiprocessing import Pool

        pool = Pool(number of processes in the process pool)

        pool.apply_async(func=task function, args=(tuple of arguments,))

        pool.close()

        pool.join()

from multiprocessing import Pool
import os, time, random


def work1(msg):
    start_time = time.time()  # start time
    print("Loop task %d is executed by process number %d" % (msg, os.getpid()))
    time.sleep(random.random())  # Sleep for a random duration between 0 and 1 second
    end_time = time.time()  # End time
    print(msg, "Execution completed, time consuming %0.2f" % (end_time - start_time))


if __name__ == '__main__':
    pool = Pool(3)  # Define a process pool with a maximum of 3 processes
    for i in range(1, 11):
        # Each iteration submits one task; an idle worker process in the pool runs it
        pool.apply_async(func=work1, args=(i,))
    print("------start------")
    pool.close()  # Close the pool; after this it accepts no new tasks
    pool.join()  # Wait for all worker processes in the pool to finish; must be called after pool.close()
    print("-----end------")


Output:
------start------
Loop task 1 is executed by process number 18809
Loop task 2 is executed by process number 18810
Loop task 3 is executed by process number 18811
3 Execution completed, time consuming 0.42
Loop task 4 is executed by process number 18811
1 Execution completed, time consuming 0.47
Loop task 5 is executed by process number 18809
2 Execution completed, time consuming 0.54
Loop task 6 is executed by process number 18810
4 Execution completed, time consuming 0.19
Loop task 7 is executed by process number 18811
7 Execution completed, time consuming 0.06
Loop task 8 is executed by process number 18811
8 Execution completed, time consuming 0.12
Loop task 9 is executed by process number 18811
5 Execution completed, time consuming 0.82
Loop task 10 is executed by process number 18809
6 Execution completed, time consuming 0.86
9 Execution completed, time consuming 0.85
10 Execution completed, time consuming 0.65
-----end------

Summary: with three processes in the pool, tasks 1, 2 and 3 start at the same time, one per process. Process 18811 finishes task 3 first and immediately picks up task 4; each later task is likewise taken by whichever process becomes idle next.

_____________________________________________________________________________

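II. Inter-process communication in a process pool:

Using the process Pool together with the Queue from Manager:

Queue in the process Pool: if the processes are created by a Pool, you need to use multiprocessing.Manager().Queue() instead of multiprocessing.Queue(). A plain multiprocessing.Queue() cannot be passed to a pool task as an argument, whereas the queue returned by Manager().Queue() is a proxy object that can.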
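The difference between the two queue types can be checked with a short experiment. This is a sketch under assumptions, not the author's code: put_hello and try_queue are hypothetical helpers, and the 10-second timeouts are arbitrary. It submits the same task once with a plain multiprocessing.Queue() and once with a Manager().Queue(), and reports what happens in each case.

```python
from multiprocessing import Manager, Pool, Queue


def put_hello(q):
    """Toy task: put one message on the queue it was given."""
    q.put("hello")


def try_queue(q):
    """Submit put_hello with q to a pool; return the message, or the error text."""
    with Pool(1) as pool:
        result = pool.apply_async(put_hello, args=(q,))
        try:
            result.get(timeout=10)  # surfaces any error raised while sending q
        except RuntimeError as exc:
            return "error: %s" % exc
        return q.get(timeout=10)


if __name__ == "__main__":
    # A plain multiprocessing.Queue is rejected when sent as a task argument
    print(try_queue(Queue()))
    # A Manager().Queue() is a picklable proxy, so the worker can use it
    with Manager() as manager:
        print(try_queue(manager.Queue()))
```

On a typical Python 3 installation the first call is expected to report a RuntimeError saying that Queue objects should only be shared between processes through inheritance, while the second returns the message put by the worker.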

from multiprocessing import Pool, Manager
import os, time, queue


def reader(q):
    print("reader start (%s), parent process is (%s)" % (os.getpid(), os.getppid()))
    try:
        # Do not block: more reader tasks may be submitted than there are items,
        # and a blocking q.get() on an empty queue would keep pool.join() waiting forever
        item = q.get_nowait()
    except queue.Empty:
        return
    print("reader process %s got from the queue: %s" % (os.getpid(), item))


def writer(q):
    print("writer start (%s), parent process is (%s)" % (os.getpid(), os.getppid()))
    for i in range(1, 11):
        q.put(i)


if __name__ == '__main__':
    print("(%s) start" % os.getpid())  # Print the main (parent) process ID
    pool = Pool(5)  # Create the process pool
    q = Manager().Queue()  # Use the Queue from Manager
    pool.apply_async(func=writer, args=(q,))
    time.sleep(1)  # Let the writer task finish putting data on the queue before the reader tasks consume it
    while True:
        # Submit reader tasks for as long as data remains in the queue
        if q.qsize() > 0:
            pool.apply_async(func=reader, args=(q,))
        else:
            break
    pool.close()
    pool.join()

Output:
(23003) start
writer start (23005), parent process is (23003)
reader start (23005), parent process is (23003)
reader process 23005 got from the queue: 1
reader start (23005), parent process is (23003)
reader process 23005 got from the queue: 2
reader start (23005), parent process is (23003)
reader process 23005 got from the queue: 3
reader start (23005), parent process is (23003)
reader process 23005 got from the queue: 4
reader start (23005), parent process is (23003)
reader process 23005 got from the queue: 5
reader start (23005), parent process is (23003)
reader process 23005 got from the queue: 6
reader start (23006), parent process is (23003)
reader start (23007), parent process is (23003)
reader start (23009), parent process is (23003)
reader process 23006 got from the queue: 7
reader start (23008), parent process is (23003)
reader process 23007 got from the queue: 8
reader process 23009 got from the queue: 9
reader start (23005), parent process is (23003)
reader process 23005 got from the queue: 10
reader start (23007), parent process is (23003)
reader start (23009), parent process is (23003)
reader start (23006), parent process is (23003)
reader start (23005), parent process is (23003)

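Summary: the five processes in the pool (23005 to 23009) are dynamically assigned the writer and reader tasks, which exchange data through the Manager().Queue() queue. All five processes run reader tasks in turn. The reader start lines that have no matching result line come from reader tasks that were submitted while the queue still held data but ran after it had been emptied.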
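An alternative to submitting one reader task per item is a single long-running reader that stops when it sees an end-of-data marker. The following is a minimal sketch of that variation, not the author's method; SENTINEL, run, and the n parameter are hypothetical names introduced for illustration. It avoids both the fixed sleep and the polling of qsize().

```python
from multiprocessing import Manager, Pool

SENTINEL = None  # hypothetical end-of-data marker, not in the original example


def writer(q, n):
    for i in range(1, n + 1):
        q.put(i)
    q.put(SENTINEL)  # tell the reader that no more data is coming


def reader(q):
    items = []
    while True:
        item = q.get()  # blocks until the writer has produced something
        if item is SENTINEL:
            return items
        items.append(item)


def run(n):
    with Manager() as manager, Pool(2) as pool:
        q = manager.Queue()
        consumed = pool.apply_async(reader, args=(q,))
        pool.apply_async(writer, args=(q, n))
        # Wait for the reader to see the sentinel and return what it collected
        return consumed.get(timeout=30)


if __name__ == "__main__":
    print(run(10))
```

With one writer and one reader, the items come back in the order they were put, since the queue is first-in, first-out.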

Keywords: Python queue

Added by PowersWithin on Mon, 07 Feb 2022 08:51:23 +0200