[Source code analysis] The multi-process architecture and model of the parallel distributed task queue Celery
0x00 summary
Celery is a simple, flexible, and reliable distributed system for processing large numbers of messages. It focuses on an asynchronous task queue for real-time processing, and it also supports task scheduling. Because Celery improves execution efficiency through multiple processes, this article gives you a first look at Celery's multi-process architecture and model.
Through this article, you can see the design decisions and abstractions Celery makes in order to implement a multi-process architecture, for example:
- As a whole, how does Celery integrate the multi-process model to build the process pool;
- How to instantiate different multi-process models according to different operating systems;
- How to establish the communication mechanism between parent and child processes, and how to separate reading and writing;
- How to generate child processes, what the working logic of a child process is, and how child processes are abstracted;
- How to assist in managing child processes;
- How to assign tasks to child processes;
- How to handle the return values of child processes;
Let's start with a rough picture of the logic so that you have an outline for what follows. Note in the following figure:
- TaskHandler is the logic by which the parent process assigns tasks to the child processes;
- ResultHandler is the logic by which the parent process handles the return values of the child processes;
- Supervisor is the auxiliary management handler;
- Worker is the business logic run in the child process; _pool is the process pool; ForkProcess (that is, WorkerProcess) is the abstraction of a child process, and each child process abstraction runs a Worker.
These logical concepts must be clearly distinguished.
+--------------------------+
| AsynPool                 |
|                          |
|    ResultHandler   +----------> celery.concurrency.asynpool.ResultHandler
|                          |
|    Supervisor      +----------> billiard.pool.Supervisor
|                          |
|    TaskHandler     +----------> billiard.pool.TaskHandler
|                          |
|    TimeoutHandler  +----------> billiard.pool.TimeoutHandler
|                          |
|    Worker          +----------> celery.concurrency.asynpool.Worker
|                          |
|    _pool           +------+---> <ForkProcess(ForkPoolWorker-1, started daemon)>
|                          | |
+--------------------------+ +---> <ForkProcess(ForkPoolWorker-2, started daemon)>
                             |
                             +---> <ForkProcess(ForkPoolWorker-3, started daemon)>
                             |
                             +---> <ForkProcess(ForkPoolWorker-4, started daemon)>
The multi-process entry point is the Pool bootstep of the Consumer, so we start from the startup of that component.
0x01 Consumer component Pool bootstep
First, the Pool starts as a bootstep. This bootstep is the real execution engine of the worker.
The reason the Pool bootstep adds another layer of encapsulation is that it needs to handle a scaling value, the so-called autoscaler. Various pools are built here; the tasks that arrive later are simply thrown into the Pool.
1.1 bootsteps
The code is located in celery/worker/components.py. This is the entry point that:
- performs various configuration;
- introduces TaskPool, which is the entry of the worker's multi-process support;
class Pool(bootsteps.StartStopStep):

    def __init__(self, w, autoscale=None, **kwargs):
        w.pool = None
        if isinstance(autoscale, str):
            max_c, _, min_c = autoscale.partition(',')
            autoscale = [int(max_c), min_c and int(min_c) or 0]
        w.autoscale = autoscale
        if w.autoscale:
            w.max_concurrency, w.min_concurrency = w.autoscale
        super().__init__(w, **kwargs)

    def create(self, w):
        semaphore = None
        max_restarts = None
        if w.app.conf.worker_pool in GREEN_POOLS:  # pragma: no cover
            # worker_pool is 'eventlet' or 'gevent'; the default is prefork
            warnings.warn(UserWarning(W_POOL_SETTING))
        threaded = not w.use_eventloop or IS_WINDOWS  # use threads when the worker does not use the event loop, or on Windows
        procs = w.min_concurrency  # minimum concurrency, 4 by default
        w.process_task = w._process_task  # bind the worker's _process_task to process_task
        if not threaded:  # not using threads
            semaphore = w.semaphore = LaxBoundedSemaphore(procs)  # LaxBoundedSemaphore provides atomic operations, implemented with a queue
            w._quick_acquire = w.semaphore.acquire  # hand the semaphore operations to the worker
            w._quick_release = w.semaphore.release
            max_restarts = 100  # maximum number of restarts
            if w.pool_putlocks and w.pool_cls.uses_semaphore:  # check whether the pool class wants the semaphore-based task method
                w.process_task = w._process_task_sem  # default configuration: update process_task
        allow_restart = w.pool_restarts  # allow restart
        pool = w.pool = self.instantiate(
            w.pool_cls, w.min_concurrency,  # w.pool_cls defaults to the prefork TaskPool
            initargs=(w.app, w.hostname),
            maxtasksperchild=w.max_tasks_per_child,
            max_memory_per_child=w.max_memory_per_child,
            timeout=w.time_limit,
            soft_timeout=w.soft_time_limit,
            putlocks=w.pool_putlocks and threaded,
            lost_worker_timeout=w.worker_lost_wait,
            threads=threaded,
            max_restarts=max_restarts,
            allow_restart=allow_restart,
            forking_enable=True,
            semaphore=semaphore,
            sched_strategy=self.optimization,
            app=w.app,
        )
        _set_task_join_will_block(pool.task_join_will_block)
        return pool
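As a hedged illustration (not Celery's own code), the following minimal sketch mirrors the partition logic in __init__ above, which turns an autoscale string such as "10,3" into a [max_concurrency, min_concurrency] pair; the helper name parse_autoscale is made up for this example.

# Minimal sketch of the autoscale-string parsing shown above.
# parse_autoscale is a hypothetical helper, not part of Celery.
def parse_autoscale(value):
    max_c, _, min_c = value.partition(',')
    return [int(max_c), int(min_c) if min_c else 0]

print(parse_autoscale('10,3'))  # [10, 3]
print(parse_autoscale('8'))     # [8, 0]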
Here w.pool_cls is <class 'celery.concurrency.prefork.TaskPool'>, and the logic is as follows:
+-------------------------------+
| Pool(bootsteps.StartStopStep) |
|                               |
|  celery/worker/components.py  |
+---------------+---------------+
                |
                v
            __init__
                |
                v
             create
                |
                v
     +----------+--------+
     | TaskPool          |
     |                   |
     |   Pool +--------------->  celery.concurrency.asynpool.AsynPool
     |                   |
     |   app  +--------------->  Celery
     |                   |
     +-------------------+
0x02 process pool entry – TaskPool
TaskPool is the multi-process entry point, and it is the same for all operating systems.
Because w.pool_cls is <class 'celery.concurrency.prefork.TaskPool'>, the code comes to TaskPool. During initialization, instantiate first goes through the base class BasePool, located at celery/concurrency/base.py.
2.1 process pool initialization
In __init__, note self.app = app: the Celery application itself is passed in during initialization.
class BasePool:
    """Task pool."""

    Timer = timer2.Timer

    _state = None
    _pool = None

    def __init__(self, limit=None, putlocks=True, forking_enable=True,
                 callbacks_propagate=(), app=None, **options):
        self.limit = limit
        self.putlocks = putlocks
        self.options = options
        self.forking_enable = forking_enable
        self.callbacks_propagate = callbacks_propagate
        self.app = app
2.2 process pool start
Blueprint calls start.
class Blueprint:

    def start(self, parent):
        self.state = RUN
        if self.on_start:
            self.on_start()
        for i, step in enumerate(s for s in parent.steps if s is not None):
            self.started = i + 1
            step.start(parent)
Since start is not declared in TaskPool, the method defined in its parent class BasePool is called here, as defined below:
class BasePool(object):
    """Task pool."""

    def start(self):
        self._does_debug = logger.isEnabledFor(logging.DEBUG)
        self.on_start()
        self._state = self.RUN
on_start is called here. Since each subclass overrides this method, the subclass's on_start is invoked. Taking TaskPool as an example, on_start is defined as follows:
class TaskPool(BasePool):
    """Multiprocessing Pool implementation."""

    Pool = AsynPool
    BlockingPool = BlockingPool

    uses_semaphore = True
    write_stats = None

    def on_start(self):
        forking_enable(self.forking_enable)
        Pool = (self.BlockingPool if self.options.get('threads', True)
                else self.Pool)  # use BlockingPool when threads are enabled, otherwise AsynPool
        P = self._pool = Pool(processes=self.limit,
                              initializer=process_initializer,
                              on_process_exit=process_destructor,
                              enable_timeouts=True,
                              synack=False,
                              **self.options)  # create the pool

        # Create proxy methods
        self.on_apply = P.apply_async  # expose the pool's methods on the TaskPool
        self.maintain_pool = P.maintain_pool
        self.terminate_job = P.terminate_job
        self.grow = P.grow
        self.shrink = P.shrink
        self.flush = getattr(P, 'flush', None)  # FIXME add to billiard
As you can see, on_start mainly completes three tasks:
- Determine whether BlockingPool or AsynPool is used according to the options (billiard.pool.Pool and celery.concurrency.asynpool.AsynPool respectively);
- Create a Pool;
- Create proxy method;
On Windows, the corresponding _pool is <class 'billiard.pool.Pool'>; on macOS it is AsynPool.
At this time, the specific logic is as follows:
+------------------------------+
| Pool(bootsteps.StartStopStep)|
+--------------+---------------+
               |
     1         |  instantiate             2  on_start
               |              +-------------------------------+
               v              |                               |
   +-----------+--------------+-+                             |
   | TaskPool                   |      +--------+             |
   |                            |      | Celery |             |
   |       app  +------------------>   +--------+             |
   |                            |                             |
   |                            |      +----------+           |
   |      _pool +------------------>   | AsynPool |           |
   |                            |      +-----+----+           |
   +----------------------------+            ^                |
                                             |                |
                                             +----------------+
0x03 process pool implementation – AsynPool
_pool is implemented differently for different operating systems. You can see that pipes, files, queues, and the actual pool are configured here.
Assuming the system is macOS, we come to the AsynPool process pool, located at celery/concurrency/asynpool.py.
3.1 instantiation
This mainly implements the instantiation of the process pool, which is the concrete implementation of prefork. The pool here is actually AsynPool.
The specific work is as follows:
- Configure the scheduling strategy;
- Determine the number of child processes to fork according to the local configuration: the default is the number of CPU cores, or the value of the -c parameter if it is set on the command line;
- Create a batch of read/write pipes. Depending on the direction of flow and on whether we are in the main process or the child process, the corresponding ends of the pipes are closed: for example, the parent process closes the write end and the child process closes the read end. The pipes are wrapped in an abstract data structure to make them easier to manage; an instance of this data structure provides two-way data transfer between the main process and the child process about to be forked. One pipe instance is created per child process;
- Call the base class constructor; this is where the fork actually happens;
- Configure the relationship between file descriptors and queues according to the result of creating the child processes;
The code is as follows:
class AsynPool(_pool.Pool):
    """AsyncIO Pool (no threads)."""

    ResultHandler = ResultHandler
    Worker = Worker

    def WorkerProcess(self, worker):
        worker = super().WorkerProcess(worker)
        worker.dead = False
        return worker

    def __init__(self, processes=None, synack=False,
                 sched_strategy=None, proc_alive_timeout=None,
                 *args, **kwargs):
        self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
                                                   sched_strategy)
        # number of child processes to fork; defaults to the number of CPU cores,
        # or the value of the -c option if given on the command line
        processes = self.cpu_count() if processes is None else processes
        self.synack = synack
        # create queue-pairs for all our processes in advance.
        self._queues = {
            self.create_process_queues(): None for _ in range(processes)  # create the read/write pipes
        }

        # inqueue fileno -> process mapping
        self._fileno_to_inq = {}
        # outqueue fileno -> process mapping
        self._fileno_to_outq = {}
        # synqueue fileno -> process mapping
        self._fileno_to_synq = {}

        # We keep track of processes that haven't yet
        # sent a WORKER_UP message.  If a process fails to send
        # this message within _proc_alive_timeout we terminate it
        # and hope the next process will recover.
        self._proc_alive_timeout = (
            PROC_ALIVE_TIMEOUT if proc_alive_timeout is None
            else proc_alive_timeout
        )
        self._waiting_to_start = set()

        # denormalized set of all inqueues.
        self._all_inqueues = set()

        # Set of fds being written to (busy)
        self._active_writes = set()

        # Set of active co-routines currently writing jobs.
        self._active_writers = set()

        # Set of fds that are busy (executing task)
        self._busy_workers = set()
        self._mark_worker_as_available = self._busy_workers.discard

        # Holds jobs waiting to be written to child processes.
        self.outbound_buffer = deque()

        self.write_stats = Counter()

        super().__init__(processes, *args, **kwargs)  # call the base class constructor

        for proc in self._pool:
            # create initial mappings, these will be updated
            # as processes are recycled, or found lost elsewhere.
            self._fileno_to_outq[proc.outqR_fd] = proc
            self._fileno_to_synq[proc.synqW_fd] = proc

        self.on_soft_timeout = getattr(
            self._timeout_handler, 'on_soft_timeout', noop,
        )
        self.on_hard_timeout = getattr(
            self._timeout_handler, 'on_hard_timeout', noop,
        )
3.2 establish communication mechanism
In the instantiation code, the establishment of queues needs to be emphasized, because the parent process and child process use queues for communication.
The code is as follows:
self._queues = {
    self.create_process_queues(): None for _ in range(processes)
}
A batch of read/write pipes is created here. The number of processes is 4, so four queue tuples are created, each containing two _SimpleQueue instances, as follows:
self._queues = {dict: 4}
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
 __len__ = {int} 4
The method for creating the queues is as follows; inq, outq, and synq are created here:
def create_process_queues(self):
    """Create new in, out, etc. queues, returned as a tuple."""
    # NOTE: Pipes must be set O_NONBLOCK at creation time (the original
    # fd), otherwise it won't be possible to change the flags until
    # there's an actual reader/writer on the other side.
    inq = _SimpleQueue(wnonblock=True)
    outq = _SimpleQueue(rnonblock=True)
    synq = None
    if self.synack:
        synq = _SimpleQueue(wnonblock=True)
    return inq, outq, synq
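To see why the NOTE above insists on setting O_NONBLOCK at creation time, here is a minimal standard-library sketch (not billiard code, and assuming a Unix system) that creates a raw pipe and marks the write end non-blocking, which is roughly what wnonblock=True asks billiard's Pipe to do:

import os

# Create a raw pipe and make the write end non-blocking, roughly what
# _SimpleQueue(wnonblock=True) amounts to for its underlying fd.
r_fd, w_fd = os.pipe()
os.set_blocking(w_fd, False)

os.write(w_fd, b"job-1")
print(os.read(r_fd, 5))   # b'job-1'

os.close(r_fd)
os.close(w_fd)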
3.2.1 _SimpleQueue
_SimpleQueue is essentially just a locked pipe. It is defined as follows:
class _SimpleQueue(object):
    '''
    Simplified Queue type -- really just a locked pipe
    '''

    def __init__(self, rnonblock=False, wnonblock=False, ctx=None):
        self._reader, self._writer = connection.Pipe(
            duplex=False, rnonblock=rnonblock, wnonblock=wnonblock,
        )
        self._poll = self._reader.poll
        self._rlock = self._wlock = None
Examples of variables are as follows:
self = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7fc46ae049e8>
 _poll = {method} <bound method _ConnectionBase.poll of <billiard.connection.Connection>>
 _reader = {Connection} <billiard.connection.Connection object at 0x7fc46ae68c18>
 _writer = {Connection} <billiard.connection.Connection object at 0x7fc46ae726a0>
3.2.2 Pipe
The self._reader and self._writer of _SimpleQueue above come from Pipe, so we need to look at it.
Pipe is defined as follows.
In fact, two Connection objects are created and returned to _SimpleQueue: one is the read abstraction and the other is the write abstraction.
if sys.platform != 'win32':

    def Pipe(duplex=True, rnonblock=False, wnonblock=False):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(not rnonblock)
            s2.setblocking(not wnonblock)
            c1 = Connection(detach(s1))
            c2 = Connection(detach(s2))
        else:
            fd1, fd2 = os.pipe()
            if rnonblock:
                setblocking(fd1, 0)
            if wnonblock:
                setblocking(fd2, 0)
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2
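A hedged usage sketch with the standard library's multiprocessing.Pipe (billiard is a fork of multiprocessing, so the interface has the same shape): with duplex=False, the first Connection returned is read-only and the second is write-only, exactly the pair that _SimpleQueue stores as _reader and _writer.

from multiprocessing import Pipe

# duplex=False: `reader` can only recv(), `writer` can only send().
reader, writer = Pipe(duplex=False)

writer.send({'task': 'myTest.add', 'args': (2, 2)})
print(reader.poll(0))   # True: data is waiting
print(reader.recv())    # {'task': 'myTest.add', 'args': (2, 2)}
print(reader.poll(0))   # False: the pipe is drained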
3.2.3 Connection
The above also involves Connection. Note that this is not Kombu's Connection, but the multiprocessing Connection.
class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """
Connection is a connection class based on file descriptor.
class _ConnectionBase(object):
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        if isinstance(handle, _SocketContainer):
            self._socket = handle.sock  # keep ref so not collected
            handle = handle.sock.fileno()
        handle = handle.__index__()
        self._handle = handle
        self._readable = readable
        self._writable = writable
Now the variables are as follows:
c1 = {Connection} <billiard.connection.Connection object at 0x7fc46ae68c18>
c2 = {Connection} <billiard.connection.Connection object at 0x7fc46ae726a0>
So AsynPool ended up as follows:
+------------------------------+
| Pool(bootsteps.StartStopStep)|
+--------------+---------------+
               |
   1 instantiate / 2 on_start
               |
               v
   +-----------+------------+
   | TaskPool               |        +--------+
   |        app  +-------------->    | Celery |
   |                        |        +--------+
   |                        |        +--------------------+
   |      _pool  +-------------->    | AsynPool           |
   +------------------------+        |                    |    +---> (<_SimpleQueue>, <_SimpleQueue>, None)
                                     |    _queues +------------+---> (<_SimpleQueue>, <_SimpleQueue>, None)
                                     |                    |    +---> (<_SimpleQueue>, <_SimpleQueue>, None)
                                     +--------------------+    +---> (<_SimpleQueue>, <_SimpleQueue>, None)

   +-----------------+               +--------------------+
   | _SimpleQueue    |               | Connection         |
   |                 |               |   _write   _read   |
   |    _reader +------------------> |   _send    _recv   |
   |                 |               |   _handle          |
   |    _writer +----------+         +--------------------+
   |                 |     |         +--------------------+
   |    _poll   +------+   +------>  | Connection         |
   +-----------------+ |             +--------------------+
                       |
                       +---------->  _ConnectionBase.poll
3.3 constructor of the process pool base class
Incidentally, the billiard author notes that this Pool class was modified specifically for Celery. The various message-handling threads and the child processes are created here.
Location: billiard/pool.py
The key work here is as follows:
- Set self._Process = self._ctx.Process, i.e. <class 'billiard.context.ForkProcess'>;
- Create the child processes via _create_worker_process(i), once per configured process;
- Create self._worker_handler = self.Supervisor(self);
- Create the TaskHandler used to assign tasks;
- Create the TimeoutHandler;
- Create the ResultHandler;
The specific code is as follows:
class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''

    def __init__(self, processes=None, initializer=None, initargs=(), ..., **kwargs):
        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = Queue()
        self._cache = {}
        self._state = RUN
        .....
        self.readers = {}

        self._processes = self.cpu_count() if processes is None else processes
        self.max_restarts = max_restarts or round(self._processes * 100)
        self.restart_state = restart_state(max_restarts, max_restart_freq or 1)

        self._Process = self._ctx.Process
        self._pool = []
        self._poolctrl = {}
        self._on_ready_counters = {}
        self.putlocks = putlocks
        self._putlock = semaphore or LaxBoundedSemaphore(self._processes)

        for i in range(self._processes):
            self._create_worker_process(i)

        self._worker_handler = self.Supervisor(self)
        if threads:
            self._worker_handler.start()

        self._task_handler = self.TaskHandler(self._taskqueue,
                                              self._quick_put,
                                              self._outqueue,
                                              self._pool,
                                              self._cache)
        if threads:
            self._task_handler.start()

        # Thread killing timedout jobs.
        if self.enable_timeouts:
            self._timeout_handler = self.TimeoutHandler(
                self._pool, self._cache,
                self.soft_timeout, self.timeout,
            )
            self._timeout_handler_mutex = Lock()
            self._timeout_handler_started = False
            self._start_timeout_handler()

        # If running without threads, we need to check for timeouts
        # while waiting for unfinished work at shutdown.
        if not threads:
            self.check_timeouts = self._timeout_handler.handle_event

        # Thread processing results in the outqueue.
        self._result_handler = self.create_result_handler()
        self.handle_result_event = self._result_handler.handle_event

        if threads:
            self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue,
                  self._pool, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache,
                  self._timeout_handler,
                  self._help_stuff_finish_args()),
            exitpriority=15,
        )
Let's analyze them one by one.
3.3.1 establish sub process
The following code creates the child processes.
for i in range(self._processes):
    self._create_worker_process(i)
The main work of _create_worker_process is as follows:
- inq, outq, synq = self.get_process_queues() obtains an abstraction of the read/write pipes. These pipes were created in advance (by self.create_process_queues() above) and are meant for the child process about to be forked: the child listens for read events on the pipe abstraction and can also write data to the write pipe;
- w, i.e. the instance returned by self.WorkerProcess, is actually an abstract wrapper around the forked child process. It is used to manage the child processes conveniently and to abstract them into a process pool. This w records some metadata of the forked child process, such as its pid and the fds of its read/write pipes, and registers it in the main process so that the main process can use it for task distribution;
- The WorkerProcess instance is recorded in self._pool. This is very important: the parent process uses this variable to know which child processes exist;
- w.start() contains the actual fork;
The code is as follows:
def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)
    self._process_register_queues(w, (inq, outq, synq))
    w.name = w.name.replace('Process', 'PoolWorker')
    w.daemon = True
    w.index = i
    w.start()
    self._poolctrl[w.pid] = sentinel
    self._on_ready_counters[w.pid] = on_ready_counter
    if self.on_process_up:
        self.on_process_up(w)
    return w
Because self.WorkerProcess(self.Worker(...)) appears above, let's introduce WorkerProcess and Worker separately.
The logic is as follows:
+----------------+
| StartStopStep  |
+-------+--------+
        |
        |  start
        v
+-------+-----------------------+
| BasePool                      |
|  celery/concurrency/base.py   |
+-------+-----------------------+
        |
        |  start
        v
+-------+-----------------------+
| TaskPool                      |
|  celery/concurrency/prefork.py|
+-------+-----------------------+
        |
        |  on_start
        v
+-------+------------------------+
| AsynPool                       |
|  celery/concurrency/asynpool.py|
+-------+------------------------+
        |
        v
+-------+-------------+
| class Pool(object)  |
|  billiard/pool.py   |
+-------+-------------+
        |
   +----+-------------------+
   |                        |
   v                        v
__init__       _create_worker_process +---> +----------------------+
                                            | class Worker(object) |
                                            +----------------------+
3.3.1.1 sub process working code
Worker is the working code of the child process. There are several implementations, for example celery.concurrency.asynpool.Worker and billiard.pool.Worker; each is the work loop of a child process.
Let's take the Worker in billiard/pool.py as an example.
The main work of Worker.__init__ is to configure the various fds.
Here obj.inqW_fd = self.inq._writer.fileno() obtains the fd from the corresponding Connection of the queue:
class _ConnectionBase(object):
    _handle = None

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle
The specific Worker definition is as follows:
class Worker(object):

    def __init__(self, inq, outq, synq=None, initializer=None, initargs=(), ...):
        ......
        self.max_memory_per_child = max_memory_per_child
        self._shutdown = sentinel
        self.inq, self.outq, self.synq = inq, outq, synq
        self.contribute_to_object(self)

    def contribute_to_object(self, obj):
        obj.inq, obj.outq, obj.synq = self.inq, self.outq, self.synq
        obj.inqW_fd = self.inq._writer.fileno()    # inqueue write fd
        obj.outqR_fd = self.outq._reader.fileno()  # outqueue read fd
        if self.synq:
            obj.synqR_fd = self.synq._reader.fileno()  # synqueue read fd
            obj.synqW_fd = self.synq._writer.fileno()  # synqueue write fd
            obj.send_syn_offset = _get_send_offset(self.synq._writer)
        else:
            obj.synqR_fd = obj.synqW_fd = obj._send_syn_offset = None
        obj._quick_put = self.inq._writer.send
        obj._quick_get = self.outq._reader.recv
        obj.send_job_offset = _get_send_offset(self.inq._writer)
        return obj
The variables are:
self = {Worker}
 initargs = {tuple: 2} (<Celery tasks at 0x7f8a0a70dd30>, )
 inq = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7f8a0b66aba8>
 inqW_fd = {int} 7
 max_memory_per_child = {NoneType} None
 maxtasks = {NoneType} None
 on_ready_counter = {Synchronized} <Synchronized wrapper for c_int(0)>
 outq = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7f8a0b6844a8>
 outqR_fd = {int} 8
 sigprotection = {bool} False
 synq = {NoneType} None
 synqR_fd = {NoneType} None
 synqW_fd = {NoneType} None
 wrap_exception = {bool} True
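A minimal sketch (standard multiprocessing, not billiard) of the pattern contribute_to_object uses: cache the pipe ends' bound send/recv methods and their filenos on the object, so the hot loop reads and writes without re-resolving attributes each time. MiniWorker is a made-up name for illustration only.

from multiprocessing import Pipe

class MiniWorker:
    """Toy stand-in for billiard's Worker fd bookkeeping."""
    def __init__(self):
        self._in_r, self._in_w = Pipe(duplex=False)    # parent writes jobs here
        self._out_r, self._out_w = Pipe(duplex=False)  # child writes results here
        self.inqW_fd = self._in_w.fileno()             # like obj.inqW_fd
        self.outqR_fd = self._out_r.fileno()           # like obj.outqR_fd
        self._quick_put = self._in_w.send              # like obj._quick_put
        self._quick_get = self._out_r.recv             # like obj._quick_get

w = MiniWorker()
w._quick_put('job-1')
print(w._in_r.recv(), w.inqW_fd, w.outqR_fd)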
The simplified logic of AsynPool is as follows.
Note in the following figure:
Worker is the business logic run in the child process; _pool is the process pool; ForkProcess (that is, WorkerProcess) is the abstraction of a child process, and each child process abstraction runs a Worker. These logical concepts must be clearly distinguished.
+--------------------------+
| AsynPool                 |
|                          |
|    ResultHandler   +----------> celery.concurrency.asynpool.ResultHandler
|                          |
|    Supervisor      +----------> billiard.pool.Supervisor
|                          |
|    TaskHandler     +----------> billiard.pool.TaskHandler
|                          |
|    TimeoutHandler  +----------> billiard.pool.TimeoutHandler
|                          |
|    Worker          +----------> celery.concurrency.asynpool.Worker
|                          |
|    _pool           +------+---> <ForkProcess(ForkPoolWorker-1, started daemon)>
|                          | |
+--------------------------+ +---> <ForkProcess(ForkPoolWorker-2, started daemon)>
                             |
                             +---> <ForkProcess(ForkPoolWorker-3, started daemon)>
                             |
                             +---> <ForkProcess(ForkPoolWorker-4, started daemon)>
The detailed version of the logic is as follows:
+------------------------------+
| Pool(bootsteps.StartStopStep)|
+--------------+---------------+
               |
   1 instantiate / 2 on_start
               |
               v
   +-----------+------------+
   | TaskPool               |        +--------+
   |        app  +-------------->    | Celery |
   |                        |        +--------+
   |                        |        +--------------------+
   |      _pool  +-------------->    | AsynPool           |
   +------------------------+        |                    |    +---> (<_SimpleQueue>, <_SimpleQueue>, None)
                                     |    _queues +------------+---> (<_SimpleQueue>, <_SimpleQueue>, None)
                                     |                    |    +---> (<_SimpleQueue>, <_SimpleQueue>, None)
                                     +--------------------+    +---> (<_SimpleQueue>, <_SimpleQueue>, None)

   +-----------------+               +-----------------------------+
   | _SimpleQueue    |               | Connection                  |
   |                 |               |   _write  _read             |
   |    _reader +------------------> |   _send   _recv             |
   |                 |               |   _handle +---> {int} 8 <--------------------+
   |    _writer +----------+         +-----------------------------+                |
   |                 |     |         +-----------------------------+                |
   |    _poll   +------+   +------>  | Connection                  |                |
   +-----------------+ |             |   _handle +---> {int} 7 <--------+           |
                       |             +-----------------------------+    |           |
                       +---------->  _ConnectionBase.poll               |           |
                                                                         |           |
   +----------------------+                                              |           |
   | Worker               |                                              |           |
   |   inq  outq  synq    |                                              |           |
   |   inqW_fd  +----------------------------------------------------------+           |
   |   outqR_fd +------------------------------------------------------------------------+
   |   workloop           |
   |   after_fork         |
   +----------------------+
3.3.1.2 child process abstract encapsulation - WorkerProcess
WorkerProcess is an abstract wrapper around the forked child process. It is used to manage the child processes conveniently and to abstract them into a process pool. This w records some metadata of the forked child process, such as its pid and the fds of its read/write pipes, and registers it in the main process so that the main process can use it for task distribution.
The role of WorkerProcess is to encapsulate ForkProcess. ForkProcess is defined as follows:
class ForkProcess(process.BaseProcess):
    _start_method = 'fork'

    @staticmethod
    def _Popen(process_obj):
        from .popen_fork import Popen
        return Popen(process_obj)
3.3.1.2.1 specific implementation of WorkerProcess
The specific implementation of WorkerProcess is as follows:
def WorkerProcess(self, worker):
    worker = super().WorkerProcess(worker)
    worker.dead = False
    return worker
The base class code is executed first, so a ForkProcess is ultimately returned:
def Process(self, *args, **kwds):
    return self._Process(*args, **kwds)

def WorkerProcess(self, worker):
    return worker.contribute_to_object(self.Process(target=worker))
In the self._Process(*args, **kwds) call, the related variables are:
self._Process = {type} <class 'billiard.context.ForkProcess'>
args = {tuple: 0} ()
kwds = {dict: 1} {'target': <celery.concurrency.asynpool.Worker object at 0x7f9c306326a0>}
self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7f9c30604da0>
So it calls the ForkProcess(process.BaseProcess) base class.
3.3.1.2.2 base class BaseProcess
The BaseProcess base class is as follows. Note that run here is the child process's entry, and _target is the code the child process runs:
_target = {Worker} <celery.concurrency.asynpool.Worker object at 0x7f9ad358b240>
It is defined as follows:
class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analogous to `threading.Thread`
    '''

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, daemon=None, **_kw):
        count = next(_process_counter)
        self._identity = _current_process._identity + (count, )
        self._config = _current_process._config.copy()
        self._parent_pid = os.getpid()
        self._popen = None
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = (
            name or type(self).__name__ + '-' +
            ':'.join(str(i) for i in self._identity)
        )
        if daemon is not None:
            self.daemon = daemon
        if _dangling is not None:
            _dangling.add(self)
        self._controlled_termination = False

    def run(self):
        '''
        Method to be run in sub-process; can be overridden in sub-class
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)
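As a hedged analogy with the standard library (billiard is a fork of multiprocessing, and BaseProcess behaves the same way here): whatever object is passed as target is simply called inside the child when run() executes, which is how the Worker instance becomes the child's main loop. ToyWorker is a made-up class for this sketch.

import multiprocessing as mp

class ToyWorker:
    """Callable stand-in for celery's Worker: run() will invoke __call__()."""
    def __call__(self):
        print('child', mp.current_process().pid, 'entering its work loop')

if __name__ == '__main__':
    p = mp.Process(target=ToyWorker(), daemon=True)
    p.start()   # forks/spawns the child; the child executes run(), which calls target()
    p.join()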
After the base class finishes, a ForkProcess is obtained:
self = {ForkProcess} <ForkProcess(ForkProcess-1, initial)>
 authkey = {AuthenticationString: 32} b''
 daemon = {bool} False
 exitcode = {NoneType} None
 ident = {NoneType} None
 name = {str} 'ForkProcess-1'
 pid = {NoneType} None
 _args = {tuple: 0} ()
 _authkey = {AuthenticationString: 32}
 _children = {set: 0} set()
 _config = {dict: 2} {'authkey': b'', 'semprefix': '/mp'}
 _counter = {count} count(2)
 _daemonic = {bool} False
 _identity = {tuple: 1} 1
 _kwargs = {dict: 0} {}
 _name = {str} 'ForkProcess-1'
 _parent_pid = {int} 14747
 _popen = {NoneType} None
 _start_method = {str} 'fork'
 _target = {Worker} <celery.concurrency.asynpool.Worker object at 0x7f9ad358b240>
 _tempdir = {NoneType} None
3.3.1.2.3 adding to the process list
After the child process is generated, self._pool.append(w) adds the child process to the parent process's list of children, and the queues are then registered.
def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)                                 # run to here
    self._process_register_queues(w, (inq, outq, synq))  # and here
The variables are as follows:
self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7f9ad36680f0>
 ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
 SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
 Supervisor = {type} <class 'billiard.pool.Supervisor'>
 TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
 TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
 Worker = {type} <class 'celery.concurrency.asynpool.Worker'>
 ......
 outbound_buffer = {deque: 0} deque([])
 readers = {dict: 0} {}
 restart_state = {restart_state} <billiard.common.restart_state object at 0x7f9ad3668e80>
 sched_strategy = {int} 4
 timers = {dict: 1} {<bound method Pool.maintain_pool of <celery.concurrency.asynpool.AsynPool object at 0x7f9ad36680f0>>: 5.0}
 write_stats = {Counter: 0} Counter()
 _Process = {type} <class 'billiard.context.ForkProcess'>
 _active_writers = {set: 0} set()
 _active_writes = {set: 0} set()
 _all_inqueues = {set: 0} set()
 _busy_workers = {set: 0} set()
 _cache = {dict: 0} {}
 _ctx = {ForkContext} <billiard.context.ForkContext object at 0x7f9ad27ad828>
 _fileno_to_inq = {dict: 0} {}
 _fileno_to_outq = {dict: 0} {}
 _fileno_to_synq = {dict: 0} {}
 _initargs = {tuple: 2} (<Celery myTest at 0x7f9ad270c128>, 'celery@me2koreademini')
 _inqueue = {NoneType} None
 _max_memory_per_child = {NoneType} None
 _maxtasksperchild = {NoneType} None
 _on_ready_counters = {dict: 0} {}
 _outqueue = {NoneType} None
 _poll_result = {NoneType} None
 _pool = {list: 1} [<ForkProcess(ForkPoolWorker-1, initial daemon)>]
 _poolctrl = {dict: 0} {}
 _proc_alive_timeout = {float} 4.0
 _processes = {int} 4
 _putlock = {LaxBoundedSemaphore} <LaxBoundedSemaphore at 0x7f9ad354db70 value:4 waiting:0>
 _queues = {dict: 4} {(<billiard.queues._SimpleQueue object at 0x7f9ad35acef0>, <billiard.queues._SimpleQueue object at 0x7f9ad3668160>, None): <ForkProcess(ForkPoolWorker-1, initial daemon)>, (<billiard.queues._SimpleQueue object at 0x7f9ad36684a8>, <billiard.queues._SimpleQu
 _quick_get = {NoneType} None
 _quick_put = {NoneType} None
 _state = {int} 0
 _taskqueue = {Queue} <queue.Queue object at 0x7f9ad2a30908>
 _waiting_to_start = {set: 0} set()
 _wrap_exception = {bool} True
sentinel = {NoneType} None
synq = {NoneType} None
3.3.1.3 fork process
w.start() contains the actual fork.
def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)
    self._process_register_queues(w, (inq, outq, synq))
    w.name = w.name.replace('Process', 'PoolWorker')
    w.daemon = True
    w.index = i
    w.start()  # at this point, we fork
    self._poolctrl[w.pid] = sentinel
    self._on_ready_counters[w.pid] = on_ready_counter
    if self.on_process_up:
        self.on_process_up(w)
    return w
The specific code is as follows:
class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analogous to `threading.Thread`
    '''

    def start(self):
        '''
        Start child process
        '''
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
            'can only start a process object created by current process'
        _cleanup()
        self._popen = self._Popen(self)
        self._sentinel = self._popen.sentinel
        _children.add(self)
The key line is self._popen = self._Popen(self). Let's take a look at the source code of Popen and its _launch:
class ForkProcess(process.BaseProcess):
    _start_method = 'fork'

    @staticmethod
    def _Popen(process_obj):
        from .popen_fork import Popen
        return Popen(process_obj)
The code is in billiard/popen_fork.py.
Looking at this, things become clear: when _launch executes, os.pipe() creates a pair of read/write fds and os.fork() derives a child process. Then, depending on whether self.pid is 0, different logic runs in the parent and the child:
- The child process closes the read end and then executes the process_obj._bootstrap() method.
- The parent process closes the write end and records the fd of the read end as the sentinel.
class Popen(object):
    method = 'fork'
    sentinel = None

    def __init__(self, process_obj):
        sys.stdout.flush()
        sys.stderr.flush()
        self.returncode = None
        self._launch(process_obj)

    def duplicate_for_child(self, fd):
        return fd

    def poll(self, flag=os.WNOHANG):
        if self.returncode is None:
            while True:
                try:
                    pid, sts = os.waitpid(self.pid, flag)
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    # Child process not yet created. See #1731717
                    # e.errno == errno.ECHILD == 10
                    return None
                else:
                    break
            if pid == self.pid:
                if os.WIFSIGNALED(sts):
                    self.returncode = -os.WTERMSIG(sts)
                else:
                    assert os.WIFEXITED(sts)
                    self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def _launch(self, process_obj):
        code = 1
        parent_r, child_w = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            try:
                os.close(parent_r)
                if 'random' in sys.modules:
                    import random
                    random.seed()
                code = process_obj._bootstrap()
            finally:
                os._exit(code)
        else:
            os.close(child_w)
            self.sentinel = parent_r
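A minimal sketch (Unix only, standard library) of the same fork-and-pipe pattern used by _launch above: the child closes the read end and exits after doing its work, while the parent closes the write end and keeps the read end as a sentinel that becomes readable (EOF) once the child is gone.

import os

parent_r, child_w = os.pipe()
pid = os.fork()
if pid == 0:                      # child
    os.close(parent_r)
    # the real code would run process_obj._bootstrap() here
    os._exit(0)
else:                             # parent
    os.close(child_w)
    sentinel = parent_r
    os.read(sentinel, 1)          # returns b'' (EOF) once the child has exited
    _, status = os.waitpid(pid, 0)
    print('child exit code:', os.WEXITSTATUS(status))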
3.3.2 auxiliary management - Supervisor
The Supervisor class periodically maintains the process pool, for example deciding whether dynamic scaling is required.
class Supervisor(PoolThread):

    def __init__(self, pool):
        self.pool = pool
        super(Supervisor, self).__init__()

    def body(self):
        time.sleep(0.8)
        pool = self.pool

        try:
            # do a burst at startup to verify that we can start
            # our pool processes, and in that time we lower
            # the max restart frequency.
            prev_state = pool.restart_state
            pool.restart_state = restart_state(10 * pool._processes, 1)
            for _ in range(10):
                if self._state == RUN and pool._state == RUN:
                    pool._maintain_pool()
                    time.sleep(0.1)

            # Keep maintaining workers until the cache gets drained, unless
            # the pool is terminated
            pool.restart_state = prev_state
            while self._state == RUN and pool._state == RUN:
                pool._maintain_pool()
                time.sleep(0.8)
        except RestartFreqExceeded:
            pool.close()
            pool.join()
            raise
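A hedged sketch of the same supervision idea: a helper thread periodically calls a maintenance callback (in billiard this is pool._maintain_pool(), which reaps dead workers and forks replacements) until it is asked to stop. The names supervise and stop_event are made up for this example.

import threading
import time

def supervise(maintain, stop_event, interval=0.8):
    # keep maintaining workers until told to stop, like Supervisor.body()
    while not stop_event.is_set():
        maintain()
        time.sleep(interval)

stop_event = threading.Event()
t = threading.Thread(
    target=supervise,
    args=(lambda: print('maintaining pool'), stop_event),
    daemon=True,
)
t.start()
time.sleep(2)
stop_event.set()
t.join()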
3.3.3 assigning tasks to subprocesses -- TaskHandler
This class is responsible for the specific business, that is, passing the task message from the parent process to the child process.
For the TaskHandler constructed earlier, the important points are:
- self._taskqueue is passed in, so that task messages can later be delivered through it. _taskqueue is a simple data structure used to buffer messages between the Celery Consumer and the pool;
- self._quick_put is passed in as put, that is, put points to self._inqueue.put;
- In this way, TaskHandler sends messages through put(task) over the pipe between parent and child processes, _inqueue. In other words, inside TaskHandler, when the parent process receives a task it forwards it to its child processes via this pipe's put function, self._inqueue.put. self._taskqueue is just an intermediate variable.
At this point, the various queues come from:
self._taskqueue = Queue()

def _setup_queues(self):
    self._inqueue = Queue()
    self._outqueue = Queue()
    self._quick_put = self._inqueue.put
    self._quick_get = self._outqueue.get

self._task_handler = self.TaskHandler(self._taskqueue,
                                      self._quick_put,
                                      self._outqueue,
                                      self._pool,
                                      self._cache)
So the variables during initialization are:
outqueue = {SimpleQueue} <billiard.queues.SimpleQueue object at 0x000001B55131AE88>
pool = {list: 8} [<SpawnProcess(SpawnPoolWorker-1, started daemon)>, <SpawnProcess(SpawnPoolWorker-2, started daemon)>, <SpawnProcess(SpawnPoolWorker-3, started daemon)>, <SpawnProcess(SpawnPoolWorker-4, started daemon)>, <SpawnProcess(SpawnPoolWorker-5, started daemon)>, <SpawnProcess(SpawnPoolWorker-6, started daemon)>, <SpawnProcess(SpawnPoolWorker-7, started daemon)>, <SpawnProcess(SpawnPoolWorker-8, started daemon)>]
put = {method} <bound method _ConnectionBase.send of <billiard.connection.PipeConnection object at 0x000001B55131AF08>>
self = {TaskHandler} Unable to get repr for <class 'billiard.pool.TaskHandler'>
taskqueue = {Queue} <queue.Queue object at 0x000001B551334308>
The code of the simplified version of TaskHandler is as follows:
class TaskHandler(PoolThread):

    def __init__(self, taskqueue, put, outqueue, pool, cache):
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.pool = pool
        self.cache = cache
        super(TaskHandler, self).__init__()

    def body(self):
        cache = self.cache
        taskqueue = self.taskqueue
        put = self.put

        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            i = -1
            # error handling elided in this simplified excerpt
            for i, task in enumerate(taskseq):
                if self._state:
                    break
                put(task)
            else:
                if set_length:
                    set_length(i + 1)
                continue
            break
        self.tell_others()

    def tell_others(self):
        outqueue = self.outqueue
        put = self.put
        pool = self.pool

        # tell result handler to finish when cache is empty
        outqueue.put(None)

        # tell workers there is no more work
        for p in pool:
            put(None)

    def on_stop_not_started(self):
        self.tell_others()
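A minimal sketch (not billiard code) of what the TaskHandler thread boils down to: drain an in-process queue.Queue and forward each task through the bound put (here a pipe's send stands in for self._quick_put), with None as the shutdown sentinel.

import queue
import threading
from multiprocessing import Pipe

taskqueue = queue.Queue()
child_end, parent_end = Pipe(duplex=False)   # child_end receives, parent_end sends
put = parent_end.send                        # stands in for self._quick_put

def task_handler_body():
    for task in iter(taskqueue.get, None):   # None is the shutdown sentinel
        put(task)

threading.Thread(target=task_handler_body, daemon=True).start()

taskqueue.put({'task': 'myTest.add', 'args': (2, 2)})
print(child_end.recv())                      # {'task': 'myTest.add', 'args': (2, 2)}
taskqueue.put(None)                          # tell the handler to finish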
At this time, the logic is:
Note: the Worker in this figure is the one in celery/apps/worker.py, which belongs to Celery's logical layer and is not related to the child-process concept (the same applies to the following figures). There are multiple classes with the same name in Celery, which is quite confusing.
     +-----------+
     | Consumer  |
     +-----+-----+
           |  message
           v
  +--------+----------+    strategy     +--------------------------------------+
  | on_task_received  |  <------------+ | strategies                           |
  +--------+----------+                 | [myTest.add : task_message_handler]  |
           |                            +--------------------------------------+
           |  strategy
           v
  +--------+---------------+   Request [myTest.add]   +---------------------+
  |  task_message_handler  | <----------------------+ | create_request_cls  |
  |  _process_task_sem     |                          +---------------------+
  +--------+---------------+
           |  req [{Request} myTest.add]
           v
  +--------+-----------+
  |  WorkController    |
  |       pool +-----------------+
  +--------------------+         |  apply_async
                                 v
  +----------------------+    +--+--------------------+
  | {Request} myTest.add +--> |       TaskPool        |
  +----------------------+    +--+--------------------+
                                 |  myTest.add
                                 v
                              +--+--------------------+
                              |  billiard.pool.Pool   |
                              +--+--------------------+
                                 |
                                 v
                     +-----------+---------------+
                     |  _taskqueue               |  <-----+  TaskHandler : self._taskqueue.put
                     +---------------------------+
3.3.4 processing the child process's return - ResultHandler
The parent process uses ResultHandler to handle the results returned by the child processes.
def create_result_handler(self):
    return super().create_result_handler(
        fileno_to_outq=self._fileno_to_outq,
        on_process_alive=self.on_process_alive,
    )


class ResultHandler(_pool.ResultHandler):
    """Handles messages from the pool processes."""

    def __init__(self, *args, **kwargs):
        self.fileno_to_outq = kwargs.pop('fileno_to_outq')
        self.on_process_alive = kwargs.pop('on_process_alive')
        super().__init__(*args, **kwargs)
        # add our custom message handler
        self.state_handlers[WORKER_UP] = self.on_process_alive
Specific variables are as follows:
ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
 daemon = {property} <property object at 0x7f847454d638>
  fdel = {NoneType} None
 exitcode = {property} <property object at 0x7f8475c9e8b8>
  fdel = {NoneType} None
  fset = {NoneType} None
 ident = {property} <property object at 0x7f847454d4f8>
  fdel = {NoneType} None
  fset = {NoneType} None
 name = {property} <property object at 0x7f847454d598>
  fdel = {NoneType} None
  fset = {NoneType} None
 _initialized = {bool} False
The specific code is as follows. You can see that poll is used to block while waiting for messages.
def _process_result(self, timeout=1.0):
    poll = self.poll
    on_state_change = self.on_state_change

    while 1:
        # error handling around poll() elided in this excerpt
        ready, task = poll(timeout)
        if ready:
            on_state_change(task)
            if timeout != 0:  # blocking
                break
        else:
            break
        yield

def handle_event(self, fileno=None, events=None):
    if self._state == RUN:
        if self._it is None:
            self._it = self._process_result(0)  # non-blocking
        try:
            next(self._it)
        except (StopIteration, CoroStop):
            self._it = None
The specific poll corresponds to _poll_result, i.e. self._outqueue._reader.poll(timeout).
It can be seen that it blocks on the outqueue, which is the outgoing end of the child process's pipe.
def _setup_queues(self):
    self._inqueue = self._ctx.SimpleQueue()
    self._outqueue = self._ctx.SimpleQueue()
    self._quick_put = self._inqueue._writer.send
    self._quick_get = self._outqueue._reader.recv

    def _poll_result(timeout):
        if self._outqueue._reader.poll(timeout):
            return True, self._quick_get()
        return False, None
    self._poll_result = _poll_result
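A hedged sketch of the _poll_result idea with a plain Pipe: check the read end for readability with a timeout, and only recv() when data is ready, so the result handler never blocks indefinitely on an idle child.

from multiprocessing import Pipe

out_reader, out_writer = Pipe(duplex=False)

def poll_result(timeout):
    # mirrors the _poll_result shape above: poll with a timeout, then recv
    if out_reader.poll(timeout):
        return True, out_reader.recv()
    return False, None

out_writer.send(('job-1', 'SUCCESS'))
print(poll_result(1.0))    # (True, ('job-1', 'SUCCESS'))
print(poll_result(0.1))    # (False, None)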
So the logic is as follows:
     +-----------+
     | Consumer  |
     +-----+-----+
           |  message
           v
  +--------+----------+    strategy     +--------------------------------------+
  | on_task_received  |  <------------+ | strategies                           |
  +--------+----------+                 | [myTest.add : task_message_handler]  |
           |                            +--------------------------------------+
           |  strategy
           v
  +--------+---------------+   Request [myTest.add]   +---------------------+
  |  task_message_handler  | <----------------------+ | create_request_cls  |
  |  _process_task_sem     |                          +---------------------+
  +--------+---------------+
           |  req [{Request} myTest.add]
           v
  +--------+-----------+
  |  WorkController    |
  |       pool +-----------------+
  +--------------------+         |  apply_async
                                 v
  +----------------------+    +--+--------------------+
  | {Request} myTest.add +--> |       TaskPool        |
  +----------------------+    +--+--------------------+
                                 |  myTest.add
                                 v
                              +--+--------------------+
                              |  billiard.pool.Pool   |
                              +--+--------------------+
                                 |
                                 v
                     +-----------+---------------+
                     |  _taskqueue               |  <-----+  TaskHandler : self._taskqueue.put
                     +-----------+---------------+               |
                                                                  |  put(task)
                                                                  v
                              self._inqueue  +----->  Sub process  +----->  self._outqueue
                                                                                   |
                                                                                   v
                                                                            ResultHandler
3.5 configure the relationship between file and queue
Finally, the relationship between file descriptors and queues is configured according to the result of creating the child processes.
You can see that the mappings for outq and synq are configured here, that is, which child process each queue belongs to.
The code is as follows:
class AsynPool(_pool.Pool):
    """AsyncIO Pool (no threads)."""

    def __init__(self, processes=None, synack=False,
                 sched_strategy=None, proc_alive_timeout=None,
                 *args, **kwargs):
        ......
        super().__init__(processes, *args, **kwargs)

        for proc in self._pool:
            # create initial mappings, these will be updated
            # as processes are recycled, or found lost elsewhere.
            self._fileno_to_outq[proc.outqR_fd] = proc
            self._fileno_to_synq[proc.synqW_fd] = proc
After the fd configuration is complete, it looks like this:
self._fileno_to_outq = {dict: 4}
 8 = {ForkProcess} <ForkProcess(ForkPoolWorker-1, started daemon)>
 12 = {ForkProcess} <ForkProcess(ForkPoolWorker-2, started daemon)>
 16 = {ForkProcess} <ForkProcess(ForkPoolWorker-3, started daemon)>
 20 = {ForkProcess} <ForkProcess(ForkPoolWorker-4, started daemon)>
 __len__ = {int} 4

self._fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, started daemon)>}
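A minimal sketch of why these maps exist: when the event loop reports that some out-pipe fd is readable, the pool looks the fd up to find which child process produced the result. The mapping below is hypothetical, in the spirit of self._fileno_to_outq above.

# hypothetical fd -> process-name mapping, like self._fileno_to_outq
fileno_to_outq = {
    8: 'ForkPoolWorker-1',
    12: 'ForkPoolWorker-2',
    16: 'ForkPoolWorker-3',
    20: 'ForkPoolWorker-4',
}

def on_readable(ready_fds):
    for fd in ready_fds:
        proc = fileno_to_outq.get(fd)
        print(f'result ready on fd {fd} from {proc}')

on_readable([12, 20])
# result ready on fd 12 from ForkPoolWorker-2
# result ready on fd 20 from ForkPoolWorker-4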
3.6 overall result of AsynPool
The final result of AsynPool is as follows. We can see various internal variables, which can be understood according to the previous text:
self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7fe44f664128>
 ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
 SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
 Supervisor = {type} <class 'billiard.pool.Supervisor'>
 TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
 TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
 Worker = {type} <class 'celery.concurrency.asynpool.Worker'>
 allow_restart = {bool} False
 enable_timeouts = {bool} True
 lost_worker_timeout = {float} 10.0
 max_restarts = {int} 100
 on_process_down = {NoneType} None
 on_process_up = {NoneType} None
 on_timeout_cancel = {NoneType} None
 on_timeout_set = {NoneType} None
 outbound_buffer = {deque: 0} deque([])
 process_sentinels = {list: 4} [25, 27, 29, 31]
 putlocks = {bool} False
 readers = {dict: 0} {}
 restart_state = {restart_state} <billiard.common.restart_state object at 0x7fe44f6644a8>
 sched_strategy = {int} 4
 soft_timeout = {NoneType} None
 synack = {bool} False
 threads = {bool} False
 timeout = {NoneType} None
 timers = {dict: 1} {<bound method Pool.maintain_pool of <celery.concurrency.asynpool.AsynPool object at 0x7fe44f664128>>: 5.0}
 write_stats = {Counter: 0} Counter()
 _Process = {type} <class 'billiard.context.ForkProcess'>
 _active_writers = {set: 0} set()
 _active_writes = {set: 0} set()
 _all_inqueues = {set: 0} set()
 _busy_workers = {set: 0} set()
 _cache = {dict: 0} {}
 _ctx = {ForkContext} <billiard.context.ForkContext object at 0x7fe44e7ac7f0>
 _fileno_to_inq = {dict: 0} {}
 _fileno_to_outq = {dict: 4} {8: <ForkProcess(ForkPoolWorker-1, started daemon)>, 12: <ForkProcess(ForkPoolWorker-2, started daemon)>, 16: <ForkProcess(ForkPoolWorker-3, started daemon)>, 20: <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>}
 _fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>}
 _initargs = {tuple: 2} (<Celery myTest at 0x7fe44e61cb38>, 'celery@me2koreademini')
 _inqueue = {NoneType} None
 _max_memory_per_child = {NoneType} None
 _maxtasksperchild = {NoneType} None
 _on_ready_counters = {dict: 4} {14802: <Synchronized wrapper for c_int(0)>, 14803: <Synchronized wrapper for c_int(0)>, 14804: <Synchronized wrapper for c_int(0)>, 14806: <Synchronized wrapper for c_int(0)>}
 _outqueue = {NoneType} None
 _poll_result = {NoneType} None
 _pool = {list: 4} [<ForkProcess(ForkPoolWorker-1, started daemon)>, <ForkProcess(ForkPoolWorker-2, started daemon)>, <ForkProcess(ForkPoolWorker-3, started daemon)>, <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>]
 _poolctrl = {dict: 4} {14802: None, 14803: None, 14804: None, 14806: None}
 _proc_alive_timeout = {float} 4.0
 _processes = {int} 4
 _putlock = {LaxBoundedSemaphore} <LaxBoundedSemaphore at 0x7fe44f54bf98 value:4 waiting:0>
 _queues = {dict: 4} {(<billiard.queues._SimpleQueue object at 0x7fe44f664160>, <billiard.queues._SimpleQueue object at 0x7fe44f664240>, None): <ForkProcess(ForkPoolWorker-1, started daemon)>, (<billiard.queues._SimpleQueue object at 0x7fe44f664550>, <billiard.queues._SimpleQu
 _quick_get = {NoneType} None
 _quick_put = {NoneType} None
 _result_handler = {ResultHandler} <ResultHandler(Thread-170, initial daemon)>
 _state = {int} 0
 _task_handler = {TaskHandler} <TaskHandler(Thread-168, initial daemon)>
 _taskqueue = {Queue} <queue.Queue object at 0x7fe44f664978>
 _terminate = {Finalize} <Finalize object, callback=_terminate_pool, args=(<queue.Queue object at 0x7fe44f664978>, None, None, [<ForkProcess(ForkPoolWorker-1, started daemon)>, <ForkProcess(ForkPoolWorker-2, started daemon)>, <ForkProcess(ForkPoolWorker-3, started daemon)>, <ForkP
 _timeout_handler = {TimeoutHandler} <TimeoutHandler(Thread-169, initial daemon)>
 _timeout_handler_mutex = {DummyLock} <kombu.asynchronous.semaphore.DummyLock object at 0x7fe44f6cb7b8>
 _timeout_handler_started = {bool} False
 _waiting_to_start = {set: 0} set()
 _worker_handler = {Supervisor} <Supervisor(Thread-151, initial daemon)>
 _wrap_exception = {bool} True
Therefore, the final figure of this article is as follows, in which Worker is the working code of the child process and ForkProcess is the abstraction of the child process (only one is shown here):
+------------------------------+
| Pool(bootsteps.StartStopStep)|
+--------------+---------------+
               |
   1 instantiate / 2 on_start
               |
               v
   +-----------+------------+          +--------+
   | TaskPool               |          | Celery |
   |        app  +--------------->     +--------+
   |      _pool  +--------------+
   +------------------------+  |
                               v
   +---------------------------+--------+
   | AsynPool                           |
   |                                    |    +---> (<_SimpleQueue>, <_SimpleQueue>, None)
   |    _queues +----------------------------+---> (<_SimpleQueue>, <_SimpleQueue>, None)
   |    _fileno_to_inq                  |    +---> (<_SimpleQueue>, <_SimpleQueue>, None)
   |    _fileno_to_outq                 |    +---> (<_SimpleQueue>, <_SimpleQueue>, None)
   |                                    |
   |    _pool  <----- 2.1 append(w) ------------------------------+
   +------------------------------------+                         |
                                                                   |
   +-----------------+               +-----------------------------+-+
   | _SimpleQueue    |               | ForkProcess                   |
   |                 |               |                               |
   |    _reader +---------+          |    _target +-------------------------+
   |                 |    |          +-------------------------------+      |
   |    _writer +-------+ |                                                 |
   |                 |  | |          +-----------------------------+        |
   |    _poll   +-------------->     | Connection                  |        |
   |                 |  | |          |   _handle +---> {int} 8 <------------------------+
   +-----------------+  | +------->  +-----------------------------+        |           |
     ( _poll points to  |            +-----------------------------+        |           |
     _ConnectionBase.poll )          | Connection                  |        |           |
                        +--------->  |   _handle +---> {int} 7 <----------------+       |
                                     +-----------------------------+        |   |       |
                                                                             v   |       |
                                     +----------------------------+             |       |
                                     | Worker                     |             |       |
                                     |    inq   outq   synq       |             |       |
                                     |    inqW_fd  +-----------------------------+       |
                                     |    outqR_fd +-------------------------------------+
                                     |    workloop                |
                                     |    after_fork              |
                                     +----------------------------+