[source code analysis] multi process architecture and model of parallel distributed task queue Celery

0x00 summary

Celery is a simple, flexible, and reliable distributed system for processing large numbers of messages. It is an asynchronous task queue focused on real-time processing that also supports task scheduling. Because Celery improves execution efficiency through multiple processes, this article will walk you through a preliminary understanding of Celery's multiprocess architecture and model.

Through this article, you can understand what thoughts and abstractions Celery has made in order to implement a multi process architecture, such as:

  • As a whole, how does Celery integrate the multi process model to get the process pool;
  • How to instantiate different multi process models according to different OS;
  • How to establish the communication mechanism between parent-child processes and how to separate reading and writing;
  • How to generate subprocesses, what is the working logic of subprocesses, and how to abstract subprocesses;
  • How to assist in managing sub processes;
  • How to assign tasks to child processes;
  • How to handle the return of subprocess;

Let's start with a rough picture of the logic, so that you have an overall impression for the subsequent analysis. Note in the following figure:

  • TaskHandler is the logic that the parent process assigns tasks to the child process;
  • ResultHandler is the logic that the parent process handles the return of the child process;
  • Supervisor is the assistant management handler;
  • Worker is the business logic code of the child process; _pool is the process pool; ForkProcess (i.e., WorkerProcess) is the child-process abstraction, and each child-process abstraction runs a Worker.

These logical concepts must be clearly distinguished.

+--------------------------+
| AsynPool                 |
|                          |
|                          |
|         ResultHandler  +------->  celery.concurrency.asynpool.ResultHandler
|                          |
|         Supervisor     +------->  billiard.pool.Supervisor
|                          |
|         TaskHandler    +------->  billiard.pool.TaskHandler
|                          |
|         TimeoutHandler +------->  billiard.pool.TimeoutHandler
|                          |
|         Worker         +------->  celery.concurrency.asynpool.Worker
|                          |
|         _pool +-----------------+--->  <ForkProcess(ForkPoolWorker-1, started daemon)>
+--------------------------+      |
                                  +--->  <ForkProcess(ForkPoolWorker-2, started daemon)>
                                  |
                                  +--->  <ForkProcess(ForkPoolWorker-3, started daemon)>
                                  |
                                  +--->  <ForkProcess(ForkPoolWorker-4, started daemon)>


The multiprocess entry point is located in the Consumer's pool bootstep, so we start from the startup of the Consumer component.

0x01 Consumer component Pool bootstep

First, the Consumer's Pool bootstep is started. This bootstep is the real execution engine of the worker.

The reason the Pool bootstep adds another layer of encapsulation here is that it needs to support a scaling value, the so-called autoscaler. The various pools are built here, and the tasks that arrive later are simply thrown into the Pool.

1.1 bootsteps

The code is located in celery/worker/components.py. This is an entry point that:

  • Performs various configurations;
  • Introduces TaskPool, which is the entry to the worker's multiprocess support;
class Pool(bootsteps.StartStopStep):

    def __init__(self, w, autoscale=None, **kwargs):
        w.pool = None
        if isinstance(autoscale, str):
            max_c, _, min_c = autoscale.partition(',')
            autoscale = [int(max_c), min_c and int(min_c) or 0]
        w.autoscale = autoscale
        if w.autoscale:
            w.max_concurrency, w.min_concurrency = w.autoscale
        super().__init__(w, **kwargs)

    def create(self, w):
        semaphore = None
        max_restarts = None

        if w.app.conf.worker_pool in GREEN_POOLS:  # pragma: no cover
            # worker_pool is 'eventlet' or 'gevent'; the default pool is prefork
            warnings.warn(UserWarning(W_POOL_SETTING))
        threaded = not w.use_eventloop or IS_WINDOWS           # use threads if the event loop is not used, or on Windows
        procs = w.min_concurrency                              # minimum concurrency, 4 by default
        w.process_task = w._process_task                       # bind the worker's _process_task as process_task
        if not threaded:
            semaphore = w.semaphore = LaxBoundedSemaphore(procs)   # LaxBoundedSemaphore (queue based) gives atomic acquire/release
            w._quick_acquire = w.semaphore.acquire                 # expose the semaphore operations on the worker
            w._quick_release = w.semaphore.release
            max_restarts = 100                                     # maximum number of restarts
            if w.pool_putlocks and w.pool_cls.uses_semaphore:      # does the pool class use a semaphore?
                w.process_task = w._process_task_sem               # if so, use the semaphore-aware process_task
        allow_restart = w.pool_restarts                            # allow pool restarts
 
        pool = w.pool = self.instantiate(
            w.pool_cls, w.min_concurrency,  # w.pool_cls defaults to prefork TaskPool
            initargs=(w.app, w.hostname),
            maxtasksperchild=w.max_tasks_per_child,
            max_memory_per_child=w.max_memory_per_child,
            timeout=w.time_limit,
            soft_timeout=w.soft_time_limit,
            putlocks=w.pool_putlocks and threaded,
            lost_worker_timeout=w.worker_lost_wait,
            threads=threaded,
            max_restarts=max_restarts,
            allow_restart=allow_restart,
            forking_enable=True,
            semaphore=semaphore,
            sched_strategy=self.optimization,
            app=w.app,
        )
        _set_task_join_will_block(pool.task_join_will_block)
        return pool
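
As a quick sanity check of the autoscale parsing in __init__ above (the string typically comes from the worker's --autoscale=MAX,MIN command-line option), the partition logic behaves like this:

# Reproduces the parsing done in Pool.__init__ above: "max,min" -> [max, min].
autoscale = '10,3'                              # e.g. from `celery worker --autoscale=10,3`
max_c, _, min_c = autoscale.partition(',')
print([int(max_c), min_c and int(min_c) or 0])  # -> [10, 3]

# With only a single number, the minimum defaults to 0:
max_c, _, min_c = '10'.partition(',')
print([int(max_c), min_c and int(min_c) or 0])  # -> [10, 0]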

Here w.pool_cls is <class 'celery.concurrency.prefork.TaskPool'>, and the logic is as follows:

+-------------------------------+
| Pool(bootsteps.StartStopStep) |
|                               |
|                               |
|  celery/worker/components.py  |
+---------------+---------------+
                |
                |
                |
                v
            __init__
                +
                |
                |
                |
                v
             create
                +
                |
                |
                v
       +--------+----------+
       |       TaskPool    |
       |                   |
       |            Pool +------->  celery.concurrency.asynpool.AsynPool
       |                   |
       |            app  +------->  Celery
       |                   |
       +-------------------+

0x02 process pool entry – TaskPool

TaskPool is the multiprocess entry point, and it is the same for all operating systems.

Because w.pool_cls is <class 'celery.concurrency.prefork.TaskPool'>, the code comes to TaskPool. During initialization, instantiate first reaches the base class BasePool, located in celery/concurrency/base.py.

2.1 process pool initialization

In __init__, note self.app = app: the Celery application itself is passed in during initialization.

class BasePool:
    """Task pool."""

    Timer = timer2.Timer
    _state = None
    _pool = None

    def __init__(self, limit=None, putlocks=True, forking_enable=True,
                 callbacks_propagate=(), app=None, **options):
        self.limit = limit
        self.putlocks = putlocks
        self.options = options
        self.forking_enable = forking_enable
        self.callbacks_propagate = callbacks_propagate
        self.app = app

2.2 process pool start

Blueprint calls start.

class Blueprint:
    def start(self, parent):
        self.state = RUN
        if self.on_start:
            self.on_start()
        for i, step in enumerate(s for s in parent.steps if s is not None):
            self.started = i + 1
            step.start(parent)

Since TaskPool does not declare a start function, the one defined in its parent class BasePool is called; it is defined as follows:

class BasePool(object):
    """Task pool."""

    def start(self):
        self._does_debug = logger.isEnabledFor(logging.DEBUG)
        self.on_start()
        self._state = self.RUN
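
The start method above is a small template method: it only flips the state and delegates the real work to on_start, which each pool subclass overrides. A minimal standalone sketch of this dispatch pattern (hypothetical class names, not Celery code):

# Sketch of the start()/on_start() template-method dispatch used by BasePool (hypothetical classes).
class Base:
    def start(self):
        self.on_start()      # dispatch to whatever the subclass provides
        self._state = 'RUN'

    def on_start(self):      # default hook: nothing to do
        pass

class Prefork(Base):
    def on_start(self):      # subclass hook: this is where the real pool would be built
        print('building the process pool')

Prefork().start()            # prints: building the process pool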

BasePool.start thus calls on_start. Since each pool subclass overrides this function, the subclass's on_start is invoked. Taking TaskPool as an example, on_start is defined as follows:

class TaskPool(BasePool):
    """Multiprocessing Pool implementation."""

    Pool = AsynPool
    BlockingPool = BlockingPool

    uses_semaphore = True
    write_stats = None

    def on_start(self):
        forking_enable(self.forking_enable)
        Pool = (self.BlockingPool if self.options.get('threads', True)
                else self.Pool) # If multithreading is used, use BlockingPool; otherwise, use AsynPool
        P = self._pool = Pool(processes=self.limit,
                              initializer=process_initializer,
                              on_process_exit=process_destructor,
                              enable_timeouts=True,
                              synack=False,
                              **self.options) # Create Pool

        # Create proxy methods
        self.on_apply = P.apply_async # expose the billiard pool's methods on the TaskPool
        self.maintain_pool = P.maintain_pool
        self.terminate_job = P.terminate_job
        self.grow = P.grow
        self.shrink = P.shrink
        self.flush = getattr(P, 'flush', None)  # FIXME add to billiard

As you can see, the on_start function mainly completes three tasks:

  • Determine whether to use BlockingPool or AsynPool according to the option parameters (billiard.pool.Pool and celery.concurrency.asynpool.AsynPool respectively);
  • Create a Pool;
  • Create proxy method;

Here, on a Windows system the corresponding _pool is <class 'billiard.pool.Pool'>; on macOS it is AsynPool.

At this time, the specific logic is as follows:

+------------------------------+
| Pool(bootsteps.StartStopStep)|
+-------------+--------------+
              |
              |
              |
            1 | instantiate
              |                          2 on_start
              |        +--------------------------------+
              v        |                                |
      +-------+--------+--+                             |
      |    TaskPool       |                             |
      |                   |      +------+               |
      |       app   +----------> |celery|               |
      |                   |      +------+               |
      |                   |                             |
      |                   |      +-----------+          |
      |      _pool  +----------> | AsynPool  |          |
      |                   |      +-----------+          |
      +----------------+--+                             |
                       ^                                |
                       |                                |
                       +--------------------------------+

0x03 process pool implementation – AsynPool

_pool is implemented differently depending on the operating system. You can see that pipes, files, queues, and the actual pool are configured here.

Assuming the system is macOS, we come to the AsynPool process pool. Location: celery/concurrency/asynpool.py

3.1 instantiation

It mainly implements the instantiation of the process pool. This instantiation is the concrete implementation of prefork; the Pool here is actually AsynPool.

The specific work is as follows:

  • Configure scheduling strategy;

  • Determine the number of child processes that need to be forked: by default the number of CPU cores, or the value of the -c parameter if it is set on the command line;

  • Create a batch of read/write pipes. Depending on the data flow direction, the parent and the child each close the end they do not use. The pipes are wrapped in an abstract data structure to make them easy to manage; each instance provides two-way data transfer between the main process and the child process about to be forked. One such pipe group is created per child process;

  • Call the base-class constructor, which is where the fork actually happens;

  • Configure the mappings between file descriptors and queues based on the created child processes;

The code is as follows:

class AsynPool(_pool.Pool):
    """AsyncIO Pool (no threads)."""

    ResultHandler = ResultHandler
    Worker = Worker

    def WorkerProcess(self, worker):
        worker = super().WorkerProcess(worker)
        worker.dead = False
        return worker

    def __init__(self, processes=None, synack=False,
                 sched_strategy=None, proc_alive_timeout=None,
                 *args, **kwargs):
        self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
                                                   sched_strategy)
        processes = self.cpu_count() if processes is None else processes #The number of child processes that need to fork. The default is the number of cpu cores. If the - c parameter is specified on the command line, it is the value of the - c parameter
        self.synack = synack
        # create queue-pairs for all our processes in advance.
        self._queues = {
            self.create_process_queues(): None for _ in range(processes) #Create a bunch of read and write pipes
        }

        # inqueue fileno -> process mapping
        self._fileno_to_inq = {}
        # outqueue fileno -> process mapping
        self._fileno_to_outq = {}
        # synqueue fileno -> process mapping
        self._fileno_to_synq = {}

        # We keep track of processes that haven't yet
        # sent a WORKER_UP message.  If a process fails to send
        # this message within _proc_alive_timeout we terminate it
        # and hope the next process will recover.
        self._proc_alive_timeout = (
            PROC_ALIVE_TIMEOUT if proc_alive_timeout is None
            else proc_alive_timeout
        )
        self._waiting_to_start = set()

        # denormalized set of all inqueues.
        self._all_inqueues = set()

        # Set of fds being written to (busy)
        self._active_writes = set()

        # Set of active co-routines currently writing jobs.
        self._active_writers = set()

        # Set of fds that are busy (executing task)
        self._busy_workers = set()
        self._mark_worker_as_available = self._busy_workers.discard

        # Holds jobs waiting to be written to child processes.
        self.outbound_buffer = deque()

        self.write_stats = Counter()

        super().__init__(processes, *args, **kwargs) #Call the base class constructor

        for proc in self._pool:
            # create initial mappings, these will be updated
            # as processes are recycled, or found lost elsewhere.
            self._fileno_to_outq[proc.outqR_fd] = proc
            self._fileno_to_synq[proc.synqW_fd] = proc

        self.on_soft_timeout = getattr(
            self._timeout_handler, 'on_soft_timeout', noop,
        )
        self.on_hard_timeout = getattr(
            self._timeout_handler, 'on_hard_timeout', noop,
        )

3.2 establish communication mechanism

In the instantiation code, the establishment of queues needs to be emphasized, because the parent process and child process use queues for communication.

The code is as follows:

self._queues = {
    self.create_process_queues(): None for _ in range(processes)
}

A batch of read/write pipes is created here. The number of processes is 4, so four pipe groups are created, and each group includes two _SimpleQueues, as follows.

self._queues = {dict: 4} 
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object>, None) = {NoneType} None
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object>, None) = {NoneType} None
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object>, None) = {NoneType} None
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object>, None) = {NoneType} None
 __len__ = {int} 4

The method that creates the queues is as follows; inq, outq, and synq are established here:

def create_process_queues(self):
    """Create new in, out, etc. queues, returned as a tuple."""
    # NOTE: Pipes must be set O_NONBLOCK at creation time (the original
    # fd), otherwise it won't be possible to change the flags until
    # there's an actual reader/writer on the other side.
    inq = _SimpleQueue(wnonblock=True)
    outq = _SimpleQueue(rnonblock=True)
    synq = None
    if self.synack:
        synq = _SimpleQueue(wnonblock=True)
    return inq, outq, synq
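
The NOTE above matters because the parent writes into inq from its event loop. A minimal sketch of the behaviour that wnonblock=True asks for, using a raw os.pipe (assumes a Unix-like system):

# Sketch: a non-blocking pipe write end raises instead of blocking when the buffer is full.
import os

r, w = os.pipe()
os.set_blocking(w, False)          # the effect requested by wnonblock=True above
try:
    while True:
        os.write(w, b'x' * 65536)  # keep filling the pipe buffer
except BlockingIOError:
    print('pipe full; a blocking write here would have stalled the event loop')
os.close(r)
os.close(w)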

3.2.1 _SimpleQueue

_SimpleQueue is really just a locked pipe. It is defined as follows:

class _SimpleQueue(object):
    '''
    Simplified Queue type -- really just a locked pipe
    '''
    def __init__(self, rnonblock=False, wnonblock=False, ctx=None):
        self._reader, self._writer = connection.Pipe(
            duplex=False, rnonblock=rnonblock, wnonblock=wnonblock,
        )
        self._poll = self._reader.poll
        self._rlock = self._wlock = None

Examples of variables are as follows:

self = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7fc46ae049e8>
  _poll = {method} <bound method _ConnectionBase.poll of <billiard.connection.Connection object at 0x7fc46ae68c18>>
  _reader = {Connection} <billiard.connection.Connection object at 0x7fc46ae68c18>
  _writer = {Connection} <billiard.connection.Connection object at 0x7fc46ae726a0>

3.2.2 Pipe

The _reader and _writer of the _SimpleQueue above are created by Pipe, so we need to look at it.

Pipe is defined as follows. It actually creates two Connections and returns them to _SimpleQueue: one is the read abstraction and the other is the write abstraction.

if sys.platform != 'win32':

    def Pipe(duplex=True, rnonblock=False, wnonblock=False):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(not rnonblock)
            s2.setblocking(not wnonblock)
            c1 = Connection(detach(s1))
            c2 = Connection(detach(s2))
        else:
            fd1, fd2 = os.pipe()
            if rnonblock:
                setblocking(fd1, 0)
            if wnonblock:
                setblocking(fd2, 0)
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2
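
To make the read/write split concrete, here is a minimal standalone sketch using the standard-library multiprocessing module (billiard is a fork of it with a nearly identical API): a non-duplex pipe gives one end a receive-only Connection and the other a send-only Connection, just like the inq created above.

# Sketch: parent writes into a one-way pipe, child reads from it (stdlib multiprocessing as a stand-in for billiard).
from multiprocessing import Pipe, Process

def child(reader):
    print('child got:', reader.recv())   # blocks until the parent sends something

if __name__ == '__main__':
    reader, writer = Pipe(duplex=False)  # reader is receive-only, writer is send-only
    p = Process(target=child, args=(reader,))
    p.start()
    writer.send({'task': 'myTest.add', 'args': (2, 3)})  # object is pickled and written to the pipe
    p.join()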

3.2.3 Connection

The above also involves Connection. Note that this is not Kombu's Connection, but the multiprocessing (billiard) Connection.

class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

Connection is a connection class based on file descriptor.

class _ConnectionBase(object):
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        if isinstance(handle, _SocketContainer):
            self._socket = handle.sock  # keep ref so not collected
            handle = handle.sock.fileno()
        handle = handle.__index__()
        self._handle = handle
        self._readable = readable
        self._writable = writable

Now the variables are as follows:

c1 = {Connection} <billiard.connection.Connection object at 0x7fc46ae68c18>
c2 = {Connection} <billiard.connection.Connection object at 0x7fc46ae726a0>

So AsynPool ended up as follows:

    +------------------------------+                                                                     +----------------+
    | Pool(bootsteps.StartStopStep)|                                          +-----------------+        | Connection     |
    +-------------+--------------+                                            |  _SimpleQueue   |        |                |
                  |                                                           |                 |        |        _write  |
                  |                                                           |      _reader +---------> |        _read   |
                  |                                                           |                 |        |        _send   |
                1 | instantiate                                               |                 |        |        _recv   |
                  |                                                           |                 |        |        _handle |
 2 on_start       |                                                           |                 |        +----------------+
                  |                                                           |      _poll   +--------->  _ConnectionBase.poll
+-------------+   |                                                           |                 |
|             |   |                                                           |                 |        +------------+
|             |   v                                                           |      _writer +---------> | Connection |
|         +---+---+-----------+                                               |                 |        +------------+
|         |    TaskPool       |                                               +-------+---------+
|         |                   |      +------+                                         ^
|         |       app   +----------> |celery|                                         |
|         |                   |      +------+                                         |
|         |                   |                                                       +
|         |                   |      +--------------------------+     +---->  (<_SimpleQueue>, <_SimpleQueue>)
|         |      _pool  +----------> | AsynPool                 |     |
|         |                   |      |                          |     |
|         +---+---------------+      |               _queues +------->----->  (<_SimpleQueue>, <_SimpleQueue>)
|             ^                      |                          |     |
|             |                      |                          |     |
|             |                      +--------------------------+     +---->  (<_SimpleQueue>, <_SimpleQueue>)
+-------------+                                                       |
                                                                      |
                                                                      +---->  (<_SimpleQueue>, <_SimpleQueue>)


3.3 construction method of process pool base class

As an aside, the author notes that this class was modified specifically for Celery. The various message handlers and the child processes are created here.

Location: billiard/pool.py

The key work here is as follows:

  • Set self._Process = self._ctx.Process, which is <class 'billiard.context.ForkProcess'>;

  • Create the child processes via _create_worker_process(i), once per configured process;

  • Create self._worker_handler = self.Supervisor(self);

  • Create a TaskHandler for assigning tasks;

  • Create a TimeoutHandler;

  • Create a ResultHandler;

The specific code is as follows:

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''

    def __init__(self, processes=None, initializer=None, initargs=(),..., **kwargs):
        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = Queue()
        self._cache = {}
        self._state = RUN
        .....
        self.readers = {}

        self._processes = self.cpu_count() if processes is None else processes
        self.max_restarts = max_restarts or round(self._processes * 100)
        self.restart_state = restart_state(max_restarts, max_restart_freq or 1)
        self._Process = self._ctx.Process
        self._pool = []
        self._poolctrl = {}
        self._on_ready_counters = {}
        self.putlocks = putlocks
        self._putlock = semaphore or LaxBoundedSemaphore(self._processes)
        for i in range(self._processes):
            self._create_worker_process(i)

        self._worker_handler = self.Supervisor(self)
        if threads:
            self._worker_handler.start()

        self._task_handler = self.TaskHandler(self._taskqueue,
                                              self._quick_put,
                                              self._outqueue,
                                              self._pool,
                                              self._cache)
        if threads:
            self._task_handler.start()

        # Thread killing timedout jobs.
        if self.enable_timeouts:
            self._timeout_handler = self.TimeoutHandler(
                self._pool, self._cache,
                self.soft_timeout, self.timeout,
            )
            self._timeout_handler_mutex = Lock()
            self._timeout_handler_started = False
            self._start_timeout_handler()
            # If running without threads, we need to check for timeouts
            # while waiting for unfinished work at shutdown.
            if not threads:
                self.check_timeouts = self._timeout_handler.handle_event

        # Thread processing results in the outqueue.
        self._result_handler = self.create_result_handler()
        self.handle_result_event = self._result_handler.handle_event

        if threads:
            self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue,
                  self._pool, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache,
                  self._timeout_handler,
                  self._help_stuff_finish_args()),
            exitpriority=15,
        )

Let's analyze them one by one.

3.3.1 establish sub process

The following code establishes a sub process.

for i in range(self._processes):
    self._create_worker_process(i)

The main work of _create_worker_process is as follows:

  • inq, outq, synq = self.get_process_queues() obtains the abstract objects of a read/write pipe group. These pipes were created in advance (by self.create_process_queues() above) and are intended for the child process about to be forked: the child will listen for read events on the pipe abstraction and can also write data through the write pipe.

  • w, an instance of self.WorkerProcess, is actually an abstract encapsulation of the forked child process. It is used to manage child processes conveniently and to abstract them into a process pool. This w records meta information about the forked child, such as its pid and the fds of its read/write pipes, and is registered in the main process so the main process can use it for task distribution;

  • The WorkerProcess instance is recorded in self._pool, which is very important: the parent process uses this variable to know which child processes exist;

  • w.start() contains the actual fork;

The code is as follows:

def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')
    
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)
    self._process_register_queues(w, (inq, outq, synq))
    w.name = w.name.replace('Process', 'PoolWorker')
    w.daemon = True
    w.index = i
    w.start()
    self._poolctrl[w.pid] = sentinel
    self._on_ready_counters[w.pid] = on_ready_counter
    if self.on_process_up:
        self.on_process_up(w)
    return w

Since self.WorkerProcess(self.Worker(...)) was mentioned above, let's introduce Worker and WorkerProcess respectively.

The logic is as follows:

    +----------------+
    |  StartStopStep |
    +-------+--------+
            |
            |   start
            |
            v
+-----------+-------------------+
|        BasePool               |
|   celery/concurrency/base.py  |
+-----------+-------------------+
            |
            |   start
            |
            v
+-----------+-------------------+
|        TaskPool               |
| celery/concurrency/prefork.py |
+-----------+-------------------+
            |
            |  on_start
            |
            v
+-----------+--------------------+
|        AsynPool                |
| celery/concurrency/asynpool.py |
+-----------+--------------------+
            |
            |
            v
   +--------+------------+
   |  class Pool(object) |
   |   billiard/pool.py  |
   +--------+------------+
            |
       +----+------+
       |           |
       v           v                          +----------------------+
   __init__    _create_worker_process  +--->  | class Worker(object) |
                                              +----------------------+

3.3.1.1 sub process working code

Worker is the working code of the child process. There are also several different implementation methods, such as:

celery.concurrency.asynpool.Worker and billiard.pool.Worker are both child-process work loops.

Take the Worker in billiard/pool.py as an example.

The main work of Worker.__init__ is to configure the various fds.

Here obj.inqW_fd = self.inq._writer.fileno() obtains the corresponding fd from the queue's Connection:

class _ConnectionBase(object):
    _handle = None

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

Specific Worker definitions are as follows:

class Worker(object):

    def __init__(self, inq, outq, synq=None, initializer=None, initargs=(),...):
        ......
        self.max_memory_per_child = max_memory_per_child
        self._shutdown = sentinel
        self.inq, self.outq, self.synq = inq, outq, synq
        self.contribute_to_object(self)

    def contribute_to_object(self, obj):
        obj.inq, obj.outq, obj.synq = self.inq, self.outq, self.synq
        obj.inqW_fd = self.inq._writer.fileno()    # inqueue write fd
        obj.outqR_fd = self.outq._reader.fileno()  # outqueue read fd
        if self.synq:
            obj.synqR_fd = self.synq._reader.fileno()  # synqueue read fd
            obj.synqW_fd = self.synq._writer.fileno()  # synqueue write fd
            obj.send_syn_offset = _get_send_offset(self.synq._writer)
        else:
            obj.synqR_fd = obj.synqW_fd = obj._send_syn_offset = None
        obj._quick_put = self.inq._writer.send
        obj._quick_get = self.outq._reader.recv
        obj.send_job_offset = _get_send_offset(self.inq._writer)
        return obj

The variables are:

self = {Worker}  
 initargs = {tuple: 2} (<Celery tasks at 0x7f8a0a70dd30>, )
 inq = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7f8a0b66aba8>
 inqW_fd = {int} 7
 max_memory_per_child = {NoneType} None
 maxtasks = {NoneType} None
 on_ready_counter = {Synchronized} <Synchronized wrapper for c_int(0)>
 outq = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7f8a0b6844a8>
 outqR_fd = {int} 8
 sigprotection = {bool} False
 synq = {NoneType} None
 synqR_fd = {NoneType} None
 synqW_fd = {NoneType} None
 wrap_exception = {bool} True

The simplified logic of AsynPool is as follows:

Note in the following figure:

Worker is the business logic code of the child process; _pool is the process pool; ForkProcess (i.e., WorkerProcess) is the child-process abstraction, and each child-process abstraction runs a Worker. These logical concepts must be clearly distinguished.

+--------------------------+
| AsynPool                 |
|                          |
|                          |
|         ResultHandler  +------->  celery.concurrency.asynpool.ResultHandler
|                          |
|         Supervisor     +------->  billiard.pool.Supervisor
|                          |
|         TaskHandler    +------->  billiard.pool.TaskHandler
|                          |
|         TimeoutHandler +------->  billiard.pool.TimeoutHandler
|                          |
|         Worker         +------->  celery.concurrency.asynpool.Worker
|                          |
|         _pool +-----------------+--->  <ForkProcess(ForkPoolWorker-1, started daemon)>
+--------------------------+      |
                                  +--->  <ForkProcess(ForkPoolWorker-2, started daemon)>
                                  |
                                  +--->  <ForkProcess(ForkPoolWorker-3, started daemon)>
                                  |
                                  +--->  <ForkProcess(ForkPoolWorker-4, started daemon)>


The detailed version of the logic is as follows:

    +------------------------------+                                                                     +----------------+
    | Pool(bootsteps.StartStopStep)|                                          +-----------------+        | Connection     |
    +-------------+--------------+                                            |  _SimpleQueue   |        |                |
                  |                                                           |                 |        |        _write  |
                  |                                                           |      _reader +---------> |        _read   |
                  |                                                           |                 |        |        _send   |
                1 | instantiate                                               |                 |        |        _recv   |
                  |                                                           |                 |        |        _handle+---> {int} 8  <-+
 2 on_start       |                                                           |                 |        +----------------+               |
                  |                                                           |      _poll   +--------->  _ConnectionBase.poll            |
+-------------+   |                                                           |                 |                                         |
|             |   |                                                           |                 |        +----------------+               |
|             |   v                                                           |      _writer +---------> | Connection     |               |
|         +---+---+-----------+                                               |                 |        |                |               |
|         |    TaskPool       |                                               +-------+---------+        |       _handle+----> {int} 7    |
|         |                   |      +------+                                         ^                  |                |               |
|         |       app   +----------> |celery|                                         |                  +----------------+      ^        |
|         |                   |      +------+                                         |                                          |        |
|         |                   |                                                       +                                          |        |
|         |                   |      +--------------------------+     +---->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
|         |      _pool  +----------> | AsynPool                 |     |                                                          |        |
|         |                   |      |                          |     |                                                          |        |
|         +---+---------------+      |               _queues +------->----->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
|             ^                      |                          |     |                                                          |        |
|             |                      |                          |     |                                                          |        |
|             |                      |                          |     +---->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
+-------------+                      |                          |     |                                                          |        |
                                     |                          |     |                                                          |        |
                                     +--------------------------+     +---->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
                                                                                                                                 |        |
                                                                     +----------------------+                                    |        |
                                                                     |                      |                                    |        |
                                                                     | Worker     inq       |                                    |        |
                                                                     |                      |                                    |        |
                                                                     |            outq      |                                    |        |
                                                                     |                      |                                    |        |
                                                                     |            synq      |                                    |        |
                                                                     |                      |                                    |        |
                                                                     |         inqW_fd +-----------------------------------------+        |
                                                                     |                      |                                             |
                                                                     |         outqR_fd  +------------------------------------------------+
                                                                     |                      |
                                                                     |         workloop     |
                                                                     |                      |
                                                                     |        after_fork    |
                                                                     |                      |
                                                                     +----------------------+


3.3.1.2 sub process Abstract encapsulation - WorkerProcess

WorkerProcess is actually an abstract encapsulation of the forked child process. It is used to manage child processes conveniently and to abstract them into a process pool. It records meta information about the forked child, such as its pid and the fds of its read/write pipes, and is registered in the main process so the main process can use it for task distribution.

The role of WorkerProcess is to encapsulate ForkProcess. ForkProcess is defined as follows:

class ForkProcess(process.BaseProcess):
    _start_method = 'fork'

    @staticmethod
    def _Popen(process_obj):
        from .popen_fork import Popen
        return Popen(process_obj)

3.3.1.2.1 specific implementation of WorkerProcess

The specific implementation of WorkerProcess is as follows:

def WorkerProcess(self, worker):
    worker = super().WorkerProcess(worker)
    worker.dead = False
    return worker

The base-class code is executed first, so a ForkProcess is finally returned:

def Process(self, *args, **kwds):
    return self._Process(*args, **kwds)

def WorkerProcess(self, worker):
    return worker.contribute_to_object(self.Process(target=worker))

In the self._Process(*args, **kwds) call, the related variables are:

self._Process = {type} <class 'billiard.context.ForkProcess'>
args = {tuple: 0} ()
kwds = {dict: 1} {'target': <celery.concurrency.asynpool.Worker object at 0x7f9c306326a0>}
self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7f9c30604da0>

So it calls the ForkProcess(process.BaseProcess) base class.

3.3.1.2.2 base class BaseProcess

The BaseProcess base class is as follows. Note that run here is the child process's loop, and _target is the code the child process runs.

 _target = {Worker} <celery.concurrency.asynpool.Worker object at 0x7f9ad358b240>

It is defined as follows:

class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process
    The class is analagous to `threading.Thread`
    '''

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, daemon=None, **_kw):
 
        count = next(_process_counter)
        self._identity = _current_process._identity + (count, )
        self._config = _current_process._config.copy()
        self._parent_pid = os.getpid()
        self._popen = None
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = (
            name or type(self).__name__ + '-' +
            ':'.join(str(i) for i in self._identity)
        )
        if daemon is not None:
            self.daemon = daemon
        if _dangling is not None:
            _dangling.add(self)
        
        self._controlled_termination = False

    def run(self):
        '''
        Method to be run in sub-process; can be overridden in sub-class
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)
               

After the base-class processing, a ForkProcess instance is obtained:

self = {ForkProcess} <ForkProcess(ForkProcess-1, initial)>
 authkey = {AuthenticationString: 32} b''
 daemon = {bool} False
 exitcode = {NoneType} None
 ident = {NoneType} None
 name = {str} 'ForkProcess-1'
 pid = {NoneType} None
  _args = {tuple: 0} ()
  _authkey = {AuthenticationString: 32} 
  _children = {set: 0} set()
  _config = {dict: 2} {'authkey': b'', 'semprefix': '/mp'}
  _counter = {count} count(2)
  _daemonic = {bool} False
  _identity = {tuple: 1} 1
  _kwargs = {dict: 0} {}
  _name = {str} 'ForkProcess-1'
  _parent_pid = {int} 14747
  _popen = {NoneType} None
  _start_method = {str} 'fork'
  _target = {Worker} <celery.concurrency.asynpool.Worker object at 0x7f9ad358b240>
  _tempdir = {NoneType} None

3.3.1.2.3 adding to the process list

After the child process object is created, self._pool.append(w) adds it to the parent process's list of child processes, and its queues are registered.

def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')
    
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    
    self._pool.append(w) # execution reaches here
    self._process_register_queues(w, (inq, outq, synq)) # and here: register the queues for this process

The variables are as follows:

self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7f9ad36680f0>
 ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
 SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
 Supervisor = {type} <class 'billiard.pool.Supervisor'>
 TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
 TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
 Worker = {type} <class 'celery.concurrency.asynpool.Worker'>
 ......
 outbound_buffer = {deque: 0} deque([])
 readers = {dict: 0} {}
 restart_state = {restart_state} <billiard.common.restart_state object at 0x7f9ad3668e80>
 sched_strategy = {int} 4
 timers = {dict: 1} {<bound method Pool.maintain_pool of <celery.concurrency.asynpool.AsynPool object at 0x7f9ad36680f0>>: 5.0}

 write_stats = {Counter: 0} Counter()
 _Process = {type} <class 'billiard.context.ForkProcess'>
 _active_writers = {set: 0} set()
 _active_writes = {set: 0} set()
 _all_inqueues = {set: 0} set()
 _busy_workers = {set: 0} set()
 _cache = {dict: 0} {}
 _ctx = {ForkContext} <billiard.context.ForkContext object at 0x7f9ad27ad828>
 _fileno_to_inq = {dict: 0} {}
 _fileno_to_outq = {dict: 0} {}
 _fileno_to_synq = {dict: 0} {}
 _initargs = {tuple: 2} (<Celery myTest at 0x7f9ad270c128>, 'celery@me2koreademini')
 _inqueue = {NoneType} None
 _max_memory_per_child = {NoneType} None
 _maxtasksperchild = {NoneType} None
 _on_ready_counters = {dict: 0} {}
 _outqueue = {NoneType} None
 _poll_result = {NoneType} None
 _pool = {list: 1} [<ForkProcess(ForkPoolWorker-1, initial daemon)>]
 _poolctrl = {dict: 0} {}
 _proc_alive_timeout = {float} 4.0
 _processes = {int} 4
 _putlock = {LaxBoundedSemaphore} <LaxBoundedSemaphore at 0x7f9ad354db70 value:4 waiting:0>
 _queues = {dict: 4} {(<billiard.queues._SimpleQueue object at 0x7f9ad35acef0>, <billiard.queues._SimpleQueue object at 0x7f9ad3668160>, None): <ForkProcess(ForkPoolWorker-1, initial daemon)>, (<billiard.queues._SimpleQueue object at 0x7f9ad36684a8>, <billiard.queues._SimpleQu
 _quick_get = {NoneType} None
 _quick_put = {NoneType} None
 _state = {int} 0
 _taskqueue = {Queue} <queue.Queue object at 0x7f9ad2a30908>
 _waiting_to_start = {set: 0} set()
 _wrap_exception = {bool} True
sentinel = {NoneType} None
synq = {NoneType} None

3.3.1.3 fork process

w.start() contains the actual fork procedure.

def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')
    
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)
    self._process_register_queues(w, (inq, outq, synq))
    w.name = w.name.replace('Process', 'PoolWorker')
    w.daemon = True
    w.index = i
    w.start() # At this point, we will fork.
    self._poolctrl[w.pid] = sentinel
    self._on_ready_counters[w.pid] = on_ready_counter
    if self.on_process_up:
        self.on_process_up(w)
    return w

The specific code is as follows:

class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process
    The class is analagous to `threading.Thread`
    '''

    def start(self):
        '''
        Start child process
        '''
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
            'can only start a process object created by current process'
        _cleanup()
        self._popen = self._Popen(self)
        self._sentinel = self._popen.sentinel
        _children.add(self)

The key point is self._popen = self._Popen(self). Let's look at the source code of Popen._launch:

class ForkProcess(process.BaseProcess):
    _start_method = 'fork'

    @staticmethod
    def _Popen(process_obj):
        from .popen_fork import Popen
        return Popen(process_obj)

The code is in billiard/popen_fork.py.

Seeing this, we should understand: when the _launch method executes, os.fork() spawns a child process and os.pipe() creates a pair of read/write pipe fds. Then, by checking whether self.pid is 0, different logic runs in the main process and in the child process:

  • The child process closes the read end and then executes the process_obj._bootstrap() method.
  • The parent process closes the write end and records the fd of the read end.
class Popen(object):
    method = 'fork'
    sentinel = None

    def __init__(self, process_obj):
        sys.stdout.flush()
        sys.stderr.flush()
        self.returncode = None
        self._launch(process_obj)

    def duplicate_for_child(self, fd):
        return fd

    def poll(self, flag=os.WNOHANG):
        if self.returncode is None:
            while True:
                try:
                    pid, sts = os.waitpid(self.pid, flag)
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    # Child process not yet created. See #1731717
                    # e.errno == errno.ECHILD == 10
                    return None
                else:
                    break
            if pid == self.pid:
                if os.WIFSIGNALED(sts):
                    self.returncode = -os.WTERMSIG(sts)
                else:
                    assert os.WIFEXITED(sts)
                    self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def _launch(self, process_obj):
        code = 1
        parent_r, child_w = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            try:
                os.close(parent_r)
                if 'random' in sys.modules:
                    import random
                    random.seed()
                code = process_obj._bootstrap()
            finally:
                os._exit(code)
        else:
            os.close(child_w)
            self.sentinel = parent_r
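
To see the same pattern in isolation, here is a minimal Unix-only sketch of what _launch does with the pipe: the child keeps only the write end, the parent keeps only the read end as a sentinel, and that read end becomes readable (EOF) as soon as the child exits.

# Sketch of the fork + sentinel-pipe pattern used in Popen._launch (Unix only).
import os
import select

parent_r, child_w = os.pipe()      # sentinel pipe shared between parent and child
pid = os.fork()
if pid == 0:                       # child process
    os.close(parent_r)             # child drops the read end
    # ... the child's work would run here (process_obj._bootstrap() in billiard) ...
    os._exit(0)                    # exiting closes child_w, which makes parent_r readable
else:                              # parent process
    os.close(child_w)              # parent drops the write end and keeps parent_r as the sentinel
    select.select([parent_r], [], [])   # returns once the child has exited
    os.waitpid(pid, 0)             # reap the child
    print('child', pid, 'has exited')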

3.3.2 auxiliary management - Supervisor

The Supervisor class periodically maintains the process pool, for example deciding whether dynamic scaling is needed.

class Supervisor(PoolThread):

    def __init__(self, pool):
        self.pool = pool
        super(Supervisor, self).__init__()

    def body(self):
 
        time.sleep(0.8)
        pool = self.pool

        try:
            # do a burst at startup to verify that we can start
            # our pool processes, and in that time we lower
            # the max restart frequency.
            prev_state = pool.restart_state
            pool.restart_state = restart_state(10 * pool._processes, 1)
            for _ in range(10):
                if self._state == RUN and pool._state == RUN:
                    pool._maintain_pool()
                    time.sleep(0.1)

            # Keep maintaining workers until the cache gets drained, unless
            # the pool is terminated
            pool.restart_state = prev_state
            while self._state == RUN and pool._state == RUN:
                pool._maintain_pool()
                time.sleep(0.8)
        except RestartFreqExceeded:
            pool.close()
            pool.join()
            raise

3.3.3 assigning tasks to subprocesses -- TaskHandler

This class is responsible for the specific business, that is, passing the task message from the parent process to the child process.

In the TaskHandler construction above, the important points are:

  • self._taskqueue is passed in, so that task messages can later be delivered through it; _taskqueue is a simple data structure used to buffer messages between the Celery Consumer and the pool.

  • self._quick_put is passed in and assigned to put, i.e., put points to self._inqueue.put;

  • In this way, TaskHandler sends messages through put(task) over the pipe (_inqueue) between the parent and child processes. In other words, inside TaskHandler, when the parent process receives a task message, it sends it to its child processes through this pipe's put function. self._taskqueue is just an intermediate buffer.

At this point, the various queues come from:

self._taskqueue = Queue()

def _setup_queues(self):
        self._inqueue = Queue()
        self._outqueue = Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get
        
self._task_handler = self.TaskHandler(self._taskqueue,
                                          self._quick_put,
                                          self._outqueue,
                                          self._pool,
                                          self._cache)

So the variables during initialization are:

outqueue = {SimpleQueue} <billiard.queues.SimpleQueue object at 0x000001B55131AE88>

pool = {list: 8} [<SpawnProcess(SpawnPoolWorker-1, started daemon)>, <SpawnProcess(SpawnPoolWorker-2, started daemon)>, <SpawnProcess(SpawnPoolWorker-3, started daemon)>, <SpawnProcess(SpawnPoolWorker-4, started daemon)>, <SpawnProcess(SpawnPoolWorker-5, started daemon)>, <SpawnProcess(SpawnPoolWorker-6, started daemon)>, <SpawnProcess(SpawnPoolWorker-7, started daemon)>, <SpawnProcess(SpawnPoolWorker-8, started daemon)>]

put = {method} <bound method _ConnectionBase.send of <billiard.connection.PipeConnection object at 0x000001B55131AF08>>

self = {TaskHandler} Unable to get repr for <class 'billiard.pool.TaskHandler'>
taskqueue = {Queue} <queue.Queue object at 0x000001B551334308>

The code of the simplified version of TaskHandler is as follows:

class TaskHandler(PoolThread):

    def __init__(self, taskqueue, put, outqueue, pool, cache):
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.pool = pool
        self.cache = cache
        super(TaskHandler, self).__init__()

    def body(self):
        cache = self.cache
        taskqueue = self.taskqueue
        put = self.put

        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            i = -1
            try:
                for i, task in enumerate(taskseq):
                    if self._state:
                        break
                    put(task)  # send the task down the pipe to a child process
                else:
                    if set_length:
                        set_length(i + 1)
                    continue
                break
            except Exception:
                break  # the real code logs the error here

        self.tell_others()

    def tell_others(self):
        outqueue = self.outqueue
        put = self.put
        pool = self.pool

        try:
            # tell result handler to finish when cache is empty
            outqueue.put(None)

            # tell workers there is no more work
            for p in pool:
                put(None)
        except IOError:
            pass  # the real code logs the error here

    def on_stop_not_started(self):
        self.tell_others()
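
To make the role of _taskqueue concrete, here is a minimal standalone sketch of the same buffering pattern (not Celery code): a plain queue.Queue buffers work items in the parent, a handler thread drains it, and each item is forwarded through the pipe that a child process reads.

# Sketch: queue.Queue buffers tasks; a handler thread forwards each one into the child's pipe.
import queue
import threading
from multiprocessing import Pipe, Process

def child(reader):
    for _ in range(3):
        print('child executing:', reader.recv())

def task_handler(taskqueue, put):
    for task in iter(taskqueue.get, None):     # None acts as the shutdown sentinel
        put(task)                              # same role as put(task) in TaskHandler.body

if __name__ == '__main__':
    reader, writer = Pipe(duplex=False)
    taskqueue = queue.Queue()
    Process(target=child, args=(reader,)).start()
    threading.Thread(target=task_handler, args=(taskqueue, writer.send)).start()
    for i in range(3):
        taskqueue.put(('myTest.add', (i, i)))  # analogous to how tasks end up on self._taskqueue
    taskqueue.put(None)                        # stop the handler thread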

At this time, the logic is:

Note: the Worker in the figure refers to celery/apps/worker.py, which belongs to Celery's logical layer and is not a child-process concept (the same applies to the figures below). There are multiple classes with the same name in Celery, which is quite confusing.

                           +
    Consumer               |
                   message |
                           v         strategy  +------------------------------------+
              +------------+------+            | strategies                         |
              | on_task_received  | <--------+ |                                    |
              |                   |            |[myTest.add : task_message_handler] |
              +------------+------+            +------------------------------------+
                           |
                           |
   +------------------------------------------------------------------------------------+
   strategy                |
                           |
                           |
                           v                Request [myTest.add]
              +------------+-------------+                       +---------------------+
              | task_message_handler     | <-------------------+ | create_request_cls  |
              |                          |                       |                     |
              +------------+-------------+                       +---------------------+
                           | _process_task_sem
                           |
  +------------------------------------------------------------------------------------+
   Worker                  | req[{Request} myTest.add]
                           v
                  +--------+-----------+
                  | WorkController     |
                  |                    |
                  |            pool +-------------------------+
                  +--------+-----------+                      |
                           |                                  |
                           |               apply_async        v
               +-----------+----------+                   +---+-------------------+
               |{Request} myTest.add  | +---------------> | TaskPool              |
               +----------------------+                   +----+------------------+
                                          myTest.add           |
                                                               |
+--------------------------------------------------------------------------------------+
                                                               |
                                                               v
                                                          +----+------------------+
                                                          | billiard.pool.Pool    |
                                                          +-------+---------------+
                                                                  |
                                                                  |
 Pool              +---------------------------+                  |
                   | TaskHandler               |                  |
                   |                           |                  |  self._taskqueue.put
                   |              _taskqueue   |  <---------------+
                   |                           |
                   +---------------------------+


3.4.3 processing subprocess return - ResultHandler

The parent process uses ResultHandler to handle the results returned by the child processes.

def create_result_handler(self):
    return super().create_result_handler(
        fileno_to_outq=self._fileno_to_outq,
        on_process_alive=self.on_process_alive,
    )

class ResultHandler(_pool.ResultHandler):
    """Handles messages from the pool processes."""

    def __init__(self, *args, **kwargs):
        self.fileno_to_outq = kwargs.pop('fileno_to_outq')
        self.on_process_alive = kwargs.pop('on_process_alive')
        super().__init__(*args, **kwargs)
        # add our custom message handler
        self.state_handlers[WORKER_UP] = self.on_process_alive
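Conceptually, state_handlers is a dispatch table from a message tag to a callback: when the result handler reads a message off the outqueue, it looks at the tag and invokes the registered handler, and the line above simply registers on_process_alive for the WORKER_UP tag. The tag values and message layout in the following sketch are illustrative, not billiard's actual constants.

# Minimal sketch of a state_handlers-style dispatch table (illustrative values).
WORKER_UP, ACK, READY = 0x3, 0x4, 0x5        # hypothetical tags for the sketch

class MiniResultHandler:
    def __init__(self):
        self.state_handlers = {
            READY: self.on_ready,
            ACK: self.on_ack,
            WORKER_UP: self.on_process_alive,  # the custom handler ResultHandler adds
        }

    def on_state_change(self, message):
        # message is (tag, *args): look up the tag and call its handler
        tag, args = message[0], message[1:]
        self.state_handlers[tag](*args)

    def on_ready(self, job, result):
        print('job %r finished with %r' % (job, result))

    def on_ack(self, job):
        print('job %r acknowledged' % job)

    def on_process_alive(self, pid):
        print('worker %r is up' % pid)

handler = MiniResultHandler()
handler.on_state_change((WORKER_UP, 12345))
handler.on_state_change((READY, 1, 1024))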

Specific variables are as follows:

ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
 daemon = {property} <property object at 0x7f847454d638>
  fdel = {NoneType} None
 exitcode = {property} <property object at 0x7f8475c9e8b8>
  fdel = {NoneType} None
  fset = {NoneType} None
 ident = {property} <property object at 0x7f847454d4f8>
  fdel = {NoneType} None
  fset = {NoneType} None
 name = {property} <property object at 0x7f847454d598>
  fdel = {NoneType} None
  _initialized = {bool} False

The code is as follows. Note how poll is used to block while waiting for a message.

    def _process_result(self, timeout=1.0):
        poll = self.poll
        on_state_change = self.on_state_change

        while 1:
            try:
                ready, task = poll(timeout)
            except (IOError, EOFError):
                raise CoroStop()

            if ready:
                on_state_change(task)
                if timeout != 0:  # blocking
                    break
            else:
                break
            yield

    def handle_event(self, fileno=None, events=None):
        if self._state == RUN:
            if self._it is None:
                self._it = self._process_result(0)  # non-blocking
            try:
                next(self._it)
            except (StopIteration, CoroStop):
                self._it = None

The poll here corresponds to _poll_result, i.e. self._outqueue._reader.poll(timeout).

So the result handler blocks on the outqueue, which is the read end of the pipe coming back from the child processes.

def _setup_queues(self):
    self._inqueue = self._ctx.SimpleQueue()
    self._outqueue = self._ctx.SimpleQueue()
    self._quick_put = self._inqueue._writer.send
    self._quick_get = self._outqueue._reader.recv

    def _poll_result(timeout):
        if self._outqueue._reader.poll(timeout):
            return True, self._quick_get()
        return False, None
    self._poll_result = _poll_result
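The same blocking pattern can be reproduced with a bare Pipe: the child writes its result to the write end, and the parent recv()s only after poll(timeout) reports that data is ready. A self-contained toy sketch (illustrative, not the billiard implementation):

# Toy sketch of _poll_result: poll the reader end with a timeout, recv only when ready.
import multiprocessing as mp

def worker(conn):
    conn.send(('ready', 2 ** 10))           # child pushes its result into the "outqueue" pipe

if __name__ == '__main__':
    reader, writer = mp.Pipe(duplex=False)  # reader stays in the parent, writer goes to the child
    proc = mp.Process(target=worker, args=(writer,))
    proc.start()

    def poll_result(timeout):
        # mirrors: if self._outqueue._reader.poll(timeout): return True, self._quick_get()
        if reader.poll(timeout):
            return True, reader.recv()
        return False, None

    ready, task = False, None
    while not ready:                        # keep polling, as ResultHandler does in its loop
        ready, task = poll_result(1.0)
    print(task)                             # ('ready', 1024)
    proc.join()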

So the logic is as follows:

                           +
    Consumer               |
                   message |
                           v         strategy  +------------------------------------+
              +------------+------+            | strategies                         |
              | on_task_received  | <--------+ |                                    |
              |                   |            |[myTest.add : task_message_handler] |
              +------------+------+            +------------------------------------+
                           |
                           |
   +------------------------------------------------------------------------------------+
   strategy                |
                           |
                           |
                           v                Request [myTest.add]
              +------------+-------------+                       +---------------------+
              | task_message_handler     | <-------------------+ | create_request_cls  |
              |                          |                       |                     |
              +------------+-------------+                       +---------------------+
                           | _process_task_sem
                           |
  +------------------------------------------------------------------------------------+
   Worker                  | req[{Request} myTest.add]
                           v
                  +--------+-----------+
                  | WorkController     |
                  |                    |
                  |            pool +-------------------------+
                  +--------+-----------+                      |
                           |                                  |
                           |               apply_async        v
               +-----------+----------+                   +---+-------------------+
               |{Request} myTest.add  | +---------------> | TaskPool              |
               +----------------------+                   +----+------------------+
                                          myTest.add           |
                                                               |
+--------------------------------------------------------------------------------------+
                                                               |
                                                               v
                                                          +----+------------------+
                                                          | billiard.pool.Pool    |
                                                          +-------+---------------+
                                                                  |
                                                                  |
 Pool              +---------------------------+                  |
                   | TaskHandler               |                  |
                   |                           |                  |  self._taskqueue.put
                   |              _taskqueue   |  <---------------+
                   |                           |
                   +------------+--------------+
                                |
                                |  put(task)
                                |
                                |                       +------------------+
                                |                       |  ResultHandler   |
                                |                       +------------------+
                                |
                                |                                 ^
                                |                                 |
                                |                                 |
+--------------------------------------------------------------------------------------+
                                |                                 |
 Sub process                    |                                 |
                                v                                 +
                            self._inqueue                   self._outqueue


3.5 Configure the relationship between file descriptors and queues

Finally, after the child processes have been created, the mapping between file descriptors and queues is configured.

Here the outq and synq file descriptors are mapped to the child process they belong to, i.e. we record which child process each queue points to.

The code is as follows:

class AsynPool(_pool.Pool):
    """AsyncIO Pool (no threads)."""

    def __init__(self, processes=None, synack=False,
                 sched_strategy=None, proc_alive_timeout=None,
                 *args, **kwargs):

        ......
        super().__init__(processes, *args, **kwargs)

        for proc in self._pool:
            # create initial mappings, these will be updated
            # as processes are recycled, or found lost elsewhere.
            self._fileno_to_outq[proc.outqR_fd] = proc
            self._fileno_to_synq[proc.synqW_fd] = proc

After the configuration completes, the fd mappings are:

self._fileno_to_outq = {dict: 4} 
 8 = {ForkProcess} <ForkProcess(ForkPoolWorker-1, started daemon)>
 12 = {ForkProcess} <ForkProcess(ForkPoolWorker-2, started daemon)>
 16 = {ForkProcess} <ForkProcess(ForkPoolWorker-3, started daemon)>
 20 = {ForkProcess} <ForkProcess(ForkPoolWorker-4, started daemon)>
 __len__ = {int} 4
    
self._fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, started daemon)>}
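Once every child's outqueue read fd is known, an event loop can wait on all of them at once and map a readable fd back to its owning process; that is exactly what _fileno_to_outq enables. The sketch below shows the mechanism with the standard selectors module and is an illustration only (Celery's hub actually runs on kombu's asynchronous event loop, not selectors directly):

# Sketch: wait on many child outqueue fds at once and dispatch by fileno.
import selectors

def run_result_loop(fileno_to_outq, handle_result, timeout=1.0):
    """fileno_to_outq: maps a child's outqueue read fd -> the owning process object."""
    sel = selectors.DefaultSelector()
    for fd, proc in fileno_to_outq.items():
        sel.register(fd, selectors.EVENT_READ, data=proc)

    while True:                                 # simplified event loop
        for key, _ in sel.select(timeout):
            proc = key.data                     # the child that produced the event
            handle_result(proc, key.fd)         # e.g. recv one message from that child's pipe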

3.6 Overall result of AsynPool

The final state of AsynPool is as follows. The various internal variables can be understood from the preceding text:

self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7fe44f664128>
 ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
 SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
 Supervisor = {type} <class 'billiard.pool.Supervisor'>
 TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
 TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
 Worker = {type} <class 'celery.concurrency.asynpool.Worker'>
 allow_restart = {bool} False
 enable_timeouts = {bool} True
 lost_worker_timeout = {float} 10.0
 max_restarts = {int} 100
 on_process_down = {NoneType} None
 on_process_up = {NoneType} None
 on_timeout_cancel = {NoneType} None
 on_timeout_set = {NoneType} None
 outbound_buffer = {deque: 0} deque([])
 process_sentinels = {list: 4} [25, 27, 29, 31]
 putlocks = {bool} False
 readers = {dict: 0} {}
 restart_state = {restart_state} <billiard.common.restart_state object at 0x7fe44f6644a8>
 sched_strategy = {int} 4
 soft_timeout = {NoneType} None
 synack = {bool} False
 threads = {bool} False
 timeout = {NoneType} None
 timers = {dict: 1} {<bound method Pool.maintain_pool of <celery.concurrency.asynpool.AsynPool object at 0x7fe44f664128>>: 5.0}
 write_stats = {Counter: 0} Counter()
  _Process = {type} <class 'billiard.context.ForkProcess'>
  _active_writers = {set: 0} set()
  _active_writes = {set: 0} set()
  _all_inqueues = {set: 0} set()
  _busy_workers = {set: 0} set()
  _cache = {dict: 0} {}
  _ctx = {ForkContext} <billiard.context.ForkContext object at 0x7fe44e7ac7f0>
  _fileno_to_inq = {dict: 0} {}
  _fileno_to_outq = {dict: 4} {8: <ForkProcess(ForkPoolWorker-1, started daemon)>, 12: <ForkProcess(ForkPoolWorker-2, started daemon)>, 16: <ForkProcess(ForkPoolWorker-3, started daemon)>, 20: <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>}
  _fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>}
  _initargs = {tuple: 2} (<Celery myTest at 0x7fe44e61cb38>, 'celery@me2koreademini')
  _inqueue = {NoneType} None
  _max_memory_per_child = {NoneType} None
  _maxtasksperchild = {NoneType} None
  _on_ready_counters = {dict: 4} {14802: <Synchronized wrapper for c_int(0)>, 14803: <Synchronized wrapper for c_int(0)>, 14804: <Synchronized wrapper for c_int(0)>, 14806: <Synchronized wrapper for c_int(0)>}
  _outqueue = {NoneType} None
  _poll_result = {NoneType} None
  _pool = {list: 4} [<ForkProcess(ForkPoolWorker-1, started daemon)>, <ForkProcess(ForkPoolWorker-2, started daemon)>, <ForkProcess(ForkPoolWorker-3, started daemon)>, <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>]
  _poolctrl = {dict: 4} {14802: None, 14803: None, 14804: None, 14806: None}
  _proc_alive_timeout = {float} 4.0
  _processes = {int} 4
  _putlock = {LaxBoundedSemaphore} <LaxBoundedSemaphore at 0x7fe44f54bf98 value:4 waiting:0>
  _queues = {dict: 4} {(<billiard.queues._SimpleQueue object at 0x7fe44f664160>, <billiard.queues._SimpleQueue object at 0x7fe44f664240>, None): <ForkProcess(ForkPoolWorker-1, started daemon)>, (<billiard.queues._SimpleQueue object at 0x7fe44f664550>, <billiard.queues._SimpleQu
  _quick_get = {NoneType} None
  _quick_put = {NoneType} None
  _result_handler = {ResultHandler} <ResultHandler(Thread-170, initial daemon)>
  _state = {int} 0
  _task_handler = {TaskHandler} <TaskHandler(Thread-168, initial daemon)>
  _taskqueue = {Queue} <queue.Queue object at 0x7fe44f664978>
  _terminate = {Finalize} <Finalize object, callback=_terminate_pool, args=(<queue.Queue object at 0x7fe44f664978>, None, None, [<ForkProcess(ForkPoolWorker-1, started daemon)>, <ForkProcess(ForkPoolWorker-2, started daemon)>, <ForkProcess(ForkPoolWorker-3, started daemon)>, <ForkP
  _timeout_handler = {TimeoutHandler} <TimeoutHandler(Thread-169, initial daemon)>
  _timeout_handler_mutex = {DummyLock} <kombu.asynchronous.semaphore.DummyLock object at 0x7fe44f6cb7b8>
  _timeout_handler_started = {bool} False
  _waiting_to_start = {set: 0} set()
  _worker_handler = {Supervisor} <Supervisor(Thread-151, initial daemon)>
  _wrap_exception = {bool} True

Therefore, the final figure of this article is as follows, in which Worker is the code that runs inside a child process and ForkProcess is the abstraction of a child process (only one is shown here):

    +------------------------------+                                                                     +----------------+
    | Pool(bootsteps.StartStopStep)|                                          +-----------------+        | Connection     |
    +-------------+--------------+                                            |  _SimpleQueue   |        |                |
                  |                                                           |                 |        |        _write  |
                  |                                                           |      _reader +---------> |        _read   |
                  |                                                           |                 |        |        _send   |
                1 | instantiate                                               |                 |        |        _recv   |
                  |                                                           |                 |        |        _handle+---> {int} 8  <-+
 2 on_start       |                                                           |                 |        +----------------+               |
                  |                                                           |      _poll   +--------->  _ConnectionBase.poll            |
+-------------+   |                                                           |                 |                                         |
|             |   |                                                           |                 |        +----------------+               |
|             |   v                                                           |      _writer +---------> | Connection     |               |
|         +---+---+-----------+                                               |                 |        |                |               |
|         |    TaskPool       |                                               +-------+---------+        |       _handle+----> {int} 7    |
|         |                   |      +------+                                         ^                  |                |               |
|         |       app   +----------> |celery|                                         |                  +----------------+      ^        |
|         |                   |      +------+                                         |                                          |        |
|         |                   |                                                       +                                          |        |
|         |                   |      +--------------------------+     +---->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
|         |      _pool  +----------> | AsynPool                 |     |                                                          |        |
|         |                   |      |                          |     |                                                          |        |
|         +---+---------------+      |               _queues +------->----->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
|             ^                      |                          |     |                                                          |        |
|             |                      |          _fileno_to_inq  |     |                                                          |        |
|             |                      |                          |     +---->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
+-------------+                      |         _fileno_to_outq +--+   |                                                          |        |
                                     |                          | |   |                                                          |        |
                                     |          _queues[queues] | |   +---->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
                                     |                       +  | |                                                              |        |
                                     |               _pool   |  | |  +----------------------+                                    |        |
                                     |                +      |  | |  |                      |                                    |        |
                                     +--------------------------+ |  | Worker     inq       |                                    |        |
                                                      |      |    |  |                      |                                    |        |
                                                      |      |    |  |            outq      |                                    |        |
                                       2.1 append(w)  |      |    |  |                      |                                    |        |
                                                      |      |    |  |            synq      |                                    |        |
                                                      v      |    |  |                      |                                    |        |
                                        +-------------+--+   |    |  |         inqW_fd +-----------------------------------------+        |
                                        |                | <-+    |  |                      |                                             |
                                        |  ForkProcess   |        |  |         outqR_fd  +------------------------------------------------+
                                        |                | <------+  |                      |
                                        |                |           |         workloop     |
                                        |     _target +------------> |                      |
                                        |                |           |        after_fork    |
                                        |                |           |                      |
                                        +----------------+           +----------------------+


