Distributed Process refers to distributing Process processes to multiple machines, making full use of the performance of multiple machines to complete complex tasks. Among threads and processes, Process should be preferred because Process is more stable, and Process can be distributed to multiple machines, while threads can only be distributed to multiple CPU s of the same machine at most.
Python's multiprocessing module not only supports multi processes, but also the managers sub module supports the distribution of multi processes to multiple machines. A service process can act as a scheduler to distribute tasks to other processes and rely on network communication. Because the managers module is well encapsulated, it is easy to write distributed multi process programs without knowing the details of network communication.
For example, when doing crawler programs, we often encounter such scenes. We want to grab the link address of the picture and store the link address in the Queue. Another process is responsible for reading the link address from the Queue, downloading and storing it locally. Now make this process distributed. The processes on one machine are responsible for fetching links and the processes on other machines are responsible for downloading and storing. The main problem is to expose the Queue to the network so that other machine processes can access it. The distributed process encapsulates this process. We can call this process the networking of this Queue.
Creating a distributed process requires a service process and a task process:
Service process creation:
Establish a queue for inter process communication. The service process creates a task queue task_queue, which is used as a channel to transfer tasks to task processes; Service process creates result queue result_queue, as the channel for the task process to reply to the service process after completing the task. In the distributed multi - process environment, tasks must be added by obtaining the queue interface from the queue manager
The queue established in the first step is registered on the network and exposed to other processes (hosts). After registration, the network queue is obtained, which is equivalent to the image of the queue
Create an instance of queue manager (base manager), bind the port and verify the password.
Start the instance established in step 3, that is, start the management manager to supervise the information channel
The Queue object accessed through the network is obtained by managing instances, that is, the network Queue is materialized into a local Queue that can be used
Create tasks to the "local" queue, automatically upload tasks to the network queue, and assign them to the task process for processing.
Note: I'm based on the window s operating system here, and the linux system will be different
# coding:utf-8 # taskManager.py for win import Queue from multiprocessing.managers import BaseManage from multiprocessing import freeze_support
Number of tasks
task_num = 10
Define send receive queue
task_queue = Queue.Queue(task_num) result_queue = Queue.Queue(task_num) def get_task(): return task_queue def get_result(): return result_queue
Create a similar QueueManage
class QueueManager(BaseManager): pass def win_run(): # lambda cannot be used for binding and calling interfaces under windows, so you can only define functions before binding QueueManager.register('get_task_queue', callable=get_task) QueueManager.register('get_result_queue', callable=get_result) # Bind the port and set the authentication password. The IP address needs to be filled in under windows, not under Linux. It is local by default manager = QueueManager(address=('127.0.0.1', 4000), authkey='qty')
Get task queue and result queue through network
task = manager.get_task_queue() result = manager.get_result_queue() try: # Add task for i in range(10): print 'put task %s...' % i task.put(i) print 'try get result...' for i in range(10): print 'result is %s' % result.get(timeout=10)
except: print 'manage error' finally:
Be sure to close it, otherwise an error will be reported that the management is not closed
manager.shutdown() print 'master exit!'
if __name__ == '__main__':
There may be problems with multiple processes under windows. Adding this sentence can alleviate these problems
Use the QueueManager to register the method name used to obtain the Queue. The task process can only obtain the Queue on the network through the name
When connecting to the server, the port and authentication password should be completely consistent with those in the service process
Get the Queue from the network for localization
Get the Task from the Task queue and the result to the result queue
import time from multiprocessing.managers import BaseManage
Create a similar QueueManager:
class QueueManager(BaseManager): pass
Step 1: use the Queue manager to register the method name used to get the Queue
Step 2: connect to the server
server_addr = '127.0.0.1' print "Connect to server %s" % server_add
The port and authentication password should be completely consistent with the service process
m = QueueManager(address=(server_addr, 4000), authkey='qty')
Connect from network
Step 3: get the object of the Queue
task = m.get_task_queue() result = m.get_result_queue()
Step 4: get the task from the task queue and write the result to the result queue:
while not task.empty(): index = task.get(True, timeout=10) print 'run task download %s' % str(index) result.put('%s---->success ' % str(index))
End of processing
print 'worker exit.'
results of enforcement
Run first: the service process gets the result
put task 0... put task 1... put task 2... put task 3... put task 4... put task 5... put task 6... put task 7... put task 8... put task 9... try get result...
Run again immediately: the task process gets the result to prevent the process from not getting the result after running. Here, it must be executed immediately
Connect to server 127.0.0.1 run task download 0 run task download 1 run task download 2 run task download 3 run task download 4 run task download 5 run task download 6 run task download 7 run task download 8 run task download 9 worker exit.
Finally, look back at the results of the service process window
put task 0... put task 1... put task 2... put task 3... put task 4... put task 5... put task 6... put task 7... put task 8... put task 9... try get result... result is 0---->success result is 1---->success result is 2---->success result is 3---->success result is 4---->success result is 5---->success result is 6---->success result is 7---->success result is 8---->success result is 9---->success master exit!
This is a simple but true distributed computing. Slightly modify the code, start multiple worker s, and distribute the tasks to several or even dozens of machines to realize large-scale distributed crawlers