Python distributed processes

Distributed processes:
A distributed process means distributing `Process` workers across multiple machines, making full use of the combined performance of several machines to complete complex tasks. Between threads and processes, processes should be preferred: a process is more stable, and processes can be distributed across multiple machines, while threads can at most be spread across the CPUs of a single machine.

Python's multiprocessing module not only supports multiple processes; its managers submodule also supports distributing processes across multiple machines. A service process can act as a scheduler, distributing tasks to other processes and relying on network communication. Because the managers module is well encapsulated, a distributed multi-process program can be written without knowing the details of the network communication.

For example, crawler programs often involve a scenario like this: one process grabs image link addresses and stores them in a queue, while another process reads the addresses from the queue, downloads the images, and stores them locally. To make this distributed, the processes on one machine fetch the links while processes on other machines download and store them. The main problem is then exposing the queue on the network so that processes on other machines can access it. The distributed process machinery encapsulates exactly this; we can call it the networking of the queue.

Creating a distributed process requires a service process and a task process:

Service process creation:
1. Establish the queues for inter-process communication. The service process creates a task queue task_queue as the channel for passing tasks to the task processes, and a result queue result_queue as the channel for the task processes to reply to the service process after completing a task. In a distributed multi-process environment, tasks must be added through the queue interface obtained from the queue manager.
2. Register the queues from step 1 on the network to expose them to other processes (hosts). After registration, the networked queues are obtained, which act as images of the original queues.
3. Create an instance of the queue manager (BaseManager), binding the port and the authentication key.
4. Start the instance created in step 3, i.e. start the manager that supervises the information channel.
5. Obtain the Queue objects accessible through the network via the manager instance, i.e. materialize the networked queues as local queues that can be used.
6. Put tasks into the "local" queue; they are automatically uploaded to the networked queue and assigned to task processes for handling.

Note: the code here targets the Windows operating system; on Linux it differs slightly.

```python
# coding: utf-8
# Service process (Windows version; the code is Python 2)

import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# Number of tasks
task_num = 10

# Define the send and receive queues
task_queue = Queue.Queue(task_num)
result_queue = Queue.Queue(task_num)

def get_task():
    return task_queue

def get_result():
    return result_queue

# Subclass BaseManager
class QueueManager(BaseManager):
    pass

def win_run():
    # On Windows a lambda cannot be bound as the registered callable,
    # so named functions must be defined before binding
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # Bind the port and set the authentication key. On Windows the IP
    # address must be filled in; on Linux it may be omitted and defaults
    # to the local host
    manager = QueueManager(address=('127.0.0.1', 4000), authkey='qty')
    # Start the manager
    manager.start()
    try:
        # Obtain the task queue and result queue through the network
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        # Add tasks
        for i in range(task_num):
            print 'put task %s...' % i
            task.put(i)
        print 'try get result...'
        for i in range(task_num):
            print 'result is %s' % result.get(timeout=10)
    except:
        print 'manage error'
    finally:
        # Be sure to shut the manager down, otherwise an error is reported
        # that the manager was not closed
        manager.shutdown()
        print 'master exit!'

if __name__ == '__main__':
    # Multiprocessing on Windows can be problematic;
    # this call alleviates those problems
    freeze_support()
    win_run()
```

Task process:
1. Use QueueManager to register the method names used to obtain the queues. The task process can only obtain the networked queues through these names.
2. Connect to the server; the port and authentication key must match those of the service process exactly.
3. Obtain the queues from the network, i.e. localize them.
4. Get tasks from the task queue and write results to the result queue.


```python
# coding: utf-8
# Task process (Python 2)

from multiprocessing.managers import BaseManager

# Subclass BaseManager
class QueueManager(BaseManager):
    pass

# Step 1: register the method names used to get the queues; only the
# names are registered here, so lookups go to the service process
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# Step 2: connect to the server
server_addr = '127.0.0.1'
print 'Connect to server %s...' % server_addr
# The port and authentication key must match the service process exactly
m = QueueManager(address=(server_addr, 4000), authkey='qty')
# Connect over the network
m.connect()

# Step 3: get the Queue objects
task = m.get_task_queue()
result = m.get_result_queue()

# Step 4: get tasks from the task queue and write results to the result queue
while not task.empty():
    index = task.get(True, timeout=10)
    print 'run task download %s' % str(index)
    result.put('%s---->success ' % str(index))

# End of processing
print 'worker exit.'
```

Execution results

Run the service process first; its output:

put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...

Then run the task process immediately, so that the service process can still collect the results before its get times out:

Connect to server
run task download 0
run task download 1
run task download 2
run task download 3
run task download 4
run task download 5
run task download 6
run task download 7
run task download 8
run task download 9
worker exit.

Finally, look back at the output in the service process window:

put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...
result is 0---->success 
result is 1---->success 
result is 2---->success 
result is 3---->success 
result is 4---->success 
result is 5---->success 
result is 6---->success 
result is 7---->success 
result is 8---->success 
result is 9---->success 
master exit!

This is a simple but genuine piece of distributed computing. With slight modifications to the code and by starting multiple workers, tasks can be distributed to several or even dozens of machines, for example to build a large-scale distributed crawler.


Added by lightkeepr on Sat, 08 Jan 2022 08:51:33 +0200