A concurrency model built on multiprocessing

Requirements:
1. Periodically perform some operation on a batch of machines;
2. The time the operation takes is unstable and fluctuates;
3. Many machines have to be handled in each cycle, so the work is done with multiple processes, but starting a separate process for every machine is not suitable because that would create too many processes.

Because the time each cycle takes is not fixed, two cycles will inevitably overlap at some point, so each cycle should run as independently as possible and the cycles should not affect one another.
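
A rough back-of-the-envelope check shows why overlap has to be expected. This is only a sketch: it assumes every host takes the same time and that the workers pick up hosts in rounds, which is a simplification. `estimate_cycle_seconds` is a hypothetical helper, and the numbers (6 workers, 10 to 15 seconds per host, a 30-second period) are the ones used in the script below.

```python
import math

def estimate_cycle_seconds(host_count, worker_count, seconds_per_host):
    """Rough cycle duration, assuming every host takes the same time."""
    rounds = int(math.ceil(host_count / float(worker_count)))
    return rounds * seconds_per_host

PERIOD = 30  # seconds between two cycles, as in the script below

for host_count in (10, 1000):
    fastest = estimate_cycle_seconds(host_count, 6, 10)
    slowest = estimate_cycle_seconds(host_count, 6, 15)
    print('%d hosts: %d-%d s per cycle, period %d s'
          % (host_count, fastest, slowest, PERIOD))
```

Under these assumptions, ten hosts only just fit into one period, while a thousand hosts with the same six workers would keep dozens of cycles in flight at once.
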
Based on these considerations, the general design is: the main process periodically forks a child process, and that child uses multiprocessing to create several worker processes that perform the actual operations. The code is as follows:

 

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    #
    import os
    import sys
    import time
    import random
    import multiprocessing

    # Target machines; in practice there are hundreds or even thousands
    hosts = [ '192.168.1.1',
              '192.168.1.2',
              '192.168.1.3',
              '192.168.1.4',
              '192.168.1.5',
              '192.168.1.6',
              '192.168.1.7',
              '192.168.1.8',
              '192.168.1.9',
              '192.168.1.10',
             ]

    # The operation performed on each machine; this stub only sleeps to simulate work
    def handle(host):
        print('Handling %s' % host)
        time.sleep(random.randint(10, 15))
        return host

    # Worker loop: take a pending machine from the task queue, process it and
    # put the result on the done queue, until 'STOP' is encountered
    def grandchild(task_queue, done_queue):
        for item in iter(task_queue.get, 'STOP'):
            result = handle(item)
            done_queue.put(result)

    # Each cycle runs in its own child process
    def child():
        print('Hi! This is child %d, my father is %d' %
              (os.getpid(), os.getppid()))
        task_queue = multiprocessing.Queue()
        done_queue = multiprocessing.Queue()
        # Put the machines to be processed into the task queue
        for host in hosts:
            print('put %s in task_queue' % host)
            task_queue.put(host)
        # Create a fixed number of processes for concurrent processing
        PN = 6
        processes = []
        for i in range(PN):
            process = multiprocessing.Process(target=grandchild, args=(task_queue, done_queue))
            process.start()
            processes.append(process)
        # Print the results from the done queue. This is where the results
        # could be processed, for example saved to a database
        print('Unordered results:')
        for i in range(len(hosts)):
            print('get %s from done_queue' % done_queue.get())
        # Tell every worker to stop: one 'STOP' per worker process
        for i in range(PN):
            task_queue.put('STOP')
            print('Stopping process #%d' % i)
        # Join the workers so that they do not become zombies
        print('joining.....')
        for process in processes:
            process.join()

        # Exit here so the child never falls back into the parent's fork loop
        sys.exit()
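
    children = []

    # Reap finished child processes so that they do not linger as zombies
    def clear_defunct():
        # Iterate over a copy, because finished PIDs are removed from the list
        for child in children[:]:
            pid, status = os.waitpid(child, os.WNOHANG)
            if pid:
                children.remove(pid)
                print('clear defunct %d' % pid)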

    if __name__ == '__main__':
        # Periodically create child processes
        while True:
            try:
                pid = os.fork()
            except OSError as e:
                # os.fork() raises OSError on failure; it never returns a negative value
                print('fork error: %s' % e)
            else:
                if pid == 0:
                    child()
                else:
                    children.append(pid)
                    print('Hi! This is parent %d' % os.getpid())
            clear_defunct()
            time.sleep(30)
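
The worker loop in `grandchild` relies on the two-argument form of `iter`, which calls a function repeatedly until it returns the sentinel value. The following is a minimal sketch of that idiom. It uses a `collections.deque` in place of the `multiprocessing.Queue` so that it runs without starting any processes; the `drain` helper and the sample addresses are illustrative only.

```python
from collections import deque

def drain(get_next, sentinel='STOP'):
    """Collect items from get_next() until it returns the sentinel."""
    handled = []
    # iter(callable, sentinel) keeps calling get_next() and stops as soon
    # as the returned value equals the sentinel
    for item in iter(get_next, sentinel):
        handled.append(item)
    return handled

tasks = deque(['192.168.1.1', '192.168.1.2', 'STOP', '192.168.1.3'])
print(drain(tasks.popleft))  # only the items queued before 'STOP'
print(list(tasks))           # whatever was queued after 'STOP' is left over
```

This is why `child` puts exactly one 'STOP' per worker on the task queue: each worker consumes one sentinel and then leaves its loop.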

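The zombie cleanup can also be tried out on its own. The following is a sketch, assuming a Unix-like system where `os.fork` is available; `spawn_short_lived_children` and `reap_finished` are hypothetical helpers, not part of the script above. It loops over a copy of the PID list, since removing entries from a list while iterating over it can skip elements.

```python
import os
import time

def spawn_short_lived_children(count):
    """Fork `count` children that exit immediately; return their PIDs."""
    pids = []
    for _ in range(count):
        pid = os.fork()
        if pid == 0:
            os._exit(0)  # child: exit at once, skipping cleanup handlers
        pids.append(pid)
    return pids

def reap_finished(children):
    """Remove exited children from `children` and return their PIDs."""
    reaped = []
    for child_pid in list(children):  # copy: `children` is mutated below
        pid, _status = os.waitpid(child_pid, os.WNOHANG)
        if pid:  # 0 means the child is still running
            children.remove(pid)
            reaped.append(pid)
    return reaped

if __name__ == '__main__':
    children = spawn_short_lived_children(3)
    while children:
        for pid in reap_finished(children):
            print('reaped %d' % pid)
        time.sleep(0.1)
```

With `os.WNOHANG` the call returns immediately, so the parent can poll between cycles without blocking on a child that is still working.
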
Added by BhA on Sun, 05 Jan 2020 10:46:07 +0200