[source code analysis] how child processes of the parallel distributed task queue Celery process messages

0x00 summary

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It focuses on asynchronous task queues for real-time processing and also supports task scheduling. The previous article introduced Celery's multithreading model. This article describes how a child process handles messages.

After reading this article, you should be able to trace the following:

  • How the parent process sends messages to the child process;
  • How the child process receives messages from the parent process;
  • How the child process parses a message step by step, peeling off layer by layer the information needed to run the task;
  • How the child process runs the task once it has the task information;
  • Why Celery wraps tasks in so many seemingly cumbersome layers.

0x01 origin

Let's first review the previous article. In the Celery worker, the apply_async function calls into the Pool; that is, when a user's task message arrives, Celery is ready to hand it to the Pool.

def apply_async(self, func, args=(), kwds={},...):           
        if self.threads:
            self._taskqueue.put(([(TASK, (result._job, None,
                                func, args, kwds))], None))
        else:
            self._quick_put((TASK, (result._job, None, func, args, kwds)))
        return result

Then, in billiard/pool.py, you can see that the Pool uses self._taskqueue as the medium for passing the message to the TaskHandler, which in turn passes it on to the child process.

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    Worker = Worker
    Supervisor = Supervisor
    TaskHandler = TaskHandler
    TimeoutHandler = TimeoutHandler
    ResultHandler = ResultHandler

    def __init__(self, processes=None, initializer=None, initargs=(),...):

        self._task_handler = self.TaskHandler(self._taskqueue,
                                              self._quick_put,
                                              self._outqueue,
                                              self._pool,
                                              self._cache)
        if threads:
            self._task_handler.start()

At this point, the logic is as shown in the diagram below:

                           +
    Consumer               |
                   message |
                           v         strategy  +------------------------------------+
              +------------+------+            | strategies                         |
              | on_task_received  | <--------+ |                                    |
              |                   |            |[myTest.add : task_message_handler] |
              +------------+------+            +------------------------------------+
                           |
                           |
   +------------------------------------------------------------------------------------+
   strategy                |
                           |
                           |
                           v                Request [myTest.add]
              +------------+-------------+                       +---------------------+
              | task_message_handler     | <-------------------+ | create_request_cls  |
              |                          |                       |                     |
              +------------+-------------+                       +---------------------+
                           | _process_task_sem
                           |
  +------------------------------------------------------------------------------------+
   Worker                  | req[{Request} myTest.add]
                           v
                  +--------+-----------+
                  | WorkController     |
                  |                    |
                  |            pool +-------------------------+
                  +--------+-----------+                      |
                           |                                  |
                           |               apply_async        v
               +-----------+----------+                   +---+-------------------+
               |{Request} myTest.add  | +---------------> | TaskPool              |
               +----------------------+                   +----+------------------+
                                          myTest.add           |
                                                               |
+--------------------------------------------------------------------------------------+
                                                               |
                                                               v
                                                          +----+------------------+
                                                          | billiard.pool.Pool    |
                                                          +-------+---------------+
                                                                  |
                                                                  |
 Pool              +---------------------------+                  |
                   | TaskHandler               |                  |
                   |                           |                  |  self._taskqueue.put
                   |              _taskqueue   |  <---------------+
                   |                           |
                   +------------+--------------+
                                |
                                |  put(task)
                                |
+--------------------------------------------------------------------------------------+
                                |
 Sub process                    |
                                v
                            self._inqueue                       

Note: some readers pointed out an error regarding Celery multithreading on Windows, so here is an explanation.

My working environment is complicated: it includes Mac, Linux, and Windows machines, and I have to switch between them as circumstances dictate.

When I analyzed part of the code for this article, only Windows was available, so the debugging variables posted here were captured under Windows.

In fact, the specific OS is not important. What matters is understanding Celery's design through its code.

So we followed the taskqueue to the TaskHandler.

0x02 parent process TaskHandler

This section describes how the parent process passes task messages to the child process.

At this point we are still in the parent process. The code is located in billiard\pool.py. The call stack is:

_send_bytes, connection.py:314
send, connection.py:233
body, pool.py:596
run, pool.py:504
_bootstrap_inner, threading.py:926
_bootstrap, threading.py:890

The variables are:

self = {TaskHandler} <TaskHandler(Thread-16, started daemon 14980)>
 additional_info = {PyDBAdditionalThreadInfo} State:2 Stop:None Cmd: 107 Kill:False
 cache = {dict: 1} {0: <%s: 0 ack:False ready:False>}
 daemon = {bool} True
 name = {str} 'Thread-16'
 outqueue = {SimpleQueue} <billiard.queues.SimpleQueue object at 0x000001E2C07DD6C8>
 pool = {list: 8} [<SpawnProcess(SpawnPoolWorker-1, started daemon)>, <SpawnProcess(SpawnPoolWorker-2, started daemon)>, <SpawnProcess(SpawnPoolWorker-3, started daemon)>, <SpawnProcess(SpawnPoolWorker-4, started daemon)>, <SpawnProcess(SpawnPoolWorker-5, started daemon)>, <SpawnProcess(SpawnPoolWorker-6, started daemon)>, <SpawnProcess(SpawnPoolWorker-7, started daemon)>, <SpawnProcess(SpawnPoolWorker-8, started daemon)>]
 taskqueue = {Queue} <queue.Queue object at 0x000001E2C07DD208>
  _args = {tuple: 0} ()
  _children = {WeakKeyDictionary: 0} <WeakKeyDictionary at 0x1e2c0883448>
  _daemonic = {bool} True
  _kwargs = {dict: 0} {}
  _name = {str} 'Thread-16'
  _parent = {_MainThread} <_MainThread(MainThread, started 13408)>
  _pid = {NoneType} None
  _start_called = {bool} True
  _started = {Event} <threading.Event object at 0x000001E2C0883D88>
  _state = {int} 0
  _stderr = {LoggingProxy} <celery.utils.log.LoggingProxy object at 0x000001E2C07DD188>
  _target = {NoneType} None
  _tstate_lock = {lock} <locked _thread.lock object at 0x000001E2C081FDB0>
  _was_started = {bool} True

2.1 sending messages

When the parent process receives a task message, it calls put(task) to write the message into the pipe between the parent process and the child process.

Note that the queues were assigned earlier as follows:

self._taskqueue = Queue()

def _setup_queues(self):
        self._inqueue = Queue()
        self._outqueue = Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

In other words, when the TaskHandler receives a message, it sends it to the child process through self._inqueue.put, the write function of this pipe. self._taskqueue is only an intermediate relay.
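The relay described above can be sketched with the standard library alone. This is a simplified illustration, not billiard's implementation: `forward_tasks` is a hypothetical helper, the task tuple is shortened, and a `multiprocessing.Pipe` read within the same process stands in for the child's end of the pipe.

```python
import queue
import threading
from multiprocessing import Pipe


def forward_tasks(taskqueue, put):
    """Relay items from taskqueue to put() until a None sentinel arrives."""
    while True:
        task = taskqueue.get()
        if task is None:
            break
        put(task)


# One-way pipe: `reader` plays the child's end, `writer` the parent's end.
reader, writer = Pipe(duplex=False)
taskqueue = queue.Queue()   # stands in for self._taskqueue
quick_put = writer.send     # stands in for self._quick_put

handler = threading.Thread(
    target=forward_tasks, args=(taskqueue, quick_put), daemon=True
)
handler.start()

taskqueue.put(("TASK", (0, None, "myTest.add", (2, 8), {})))
taskqueue.put(None)         # sentinel: stop the relay thread
handler.join()

received = reader.recv()    # what the child side would read from the pipe
```

The point of the intermediate queue is that the caller never blocks on the pipe itself; only the relay thread does.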

The variables at this point are as follows:

put = {method} <bound method _ConnectionBase.send of <billiard.connection.PipeConnection object at 0x000001E2C07DD2C8>>

self = {TaskHandler} <TaskHandler(Thread-16, started daemon 14980)>

task = {tuple: 2} 
 0 = {int} 2
 1 = {tuple: 5} (0, None, <function _trace_task_ret at 0x000001E2BFCA3438>, ('myTest.add', 'dee72291-5614-4106-a7bf-007023286e9e', {'lang': 'py', 'task': 'myTest.add', 'id': 'dee72291-5614-4106-a7bf-007023286e9e', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen17456@DESKTOP-0GO3RPO', 'reply_to': '21660796-c7e7-3736-9d42-e1be6ff7eaa8', 'correlation_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'hostname': 'celery@DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {})
 __len__ = {int} 2
    
taskqueue = {Queue} <queue.Queue object at 0x000001E2C07DD208>

The code is as follows. It sends each message into the pipe and, once the task queue is finished, notifies the result handler and the workers:

class TaskHandler(PoolThread):

    def __init__(self, taskqueue, put, outqueue, pool, cache):
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.pool = pool
        self.cache = cache
        super(TaskHandler, self).__init__()

    def body(self):
        cache = self.cache
        taskqueue = self.taskqueue
        put = self.put

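        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            i = -1
            try:
                for i, task in enumerate(taskseq):
                    try:
                        put(task)  # write the task into the pipe to the child process
                    except IOError:
                        break  # the pipe could not be written (logging elided)
                else:
                    continue  # the whole sequence was sent; wait for the next one
                break  # a put failed, so stop the handler loop
            except Exception:
                pass  # error bookkeeping elided in this excerpt

        self.tell_others()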
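The outer loop of body relies on the two-argument form of iter, which keeps calling a function until it returns a sentinel. A minimal sketch of that idiom follows; the queue contents are made up for illustration, and appending to a list stands in for the real put(task).

```python
import queue

taskqueue = queue.Queue()
# Each item mirrors the shape used by apply_async: ([task, ...], set_length).
taskqueue.put(([("TASK", 1), ("TASK", 2)], None))
taskqueue.put(([("TASK", 3)], None))
taskqueue.put(None)  # sentinel that ends the loop

sent = []
# iter(callable, sentinel) calls taskqueue.get() repeatedly and stops
# as soon as the returned value equals the sentinel (None here).
for taskseq, set_length in iter(taskqueue.get, None):
    for i, task in enumerate(taskseq):
        sent.append(task)  # the real code calls put(task) here
```

Because the sentinel check happens before unpacking, putting None on the queue ends the loop cleanly without a separate "is it time to stop?" test in the body.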

2.2 other notices

tell_others notifies the result handler and the workers that there is no more work.

def tell_others(self):
    outqueue = self.outqueue
    put = self.put
    pool = self.pool

    try:
        # tell result handler to finish when cache is empty
        outqueue.put(None)

        # tell workers there is no more work
        for p in pool:
            put(None)
    except IOError:
        pass  # the real code only logs the failure here (elided)
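Why one None per worker? Each read consumes exactly one message, so every worker needs its own sentinel. The sketch below shows this with threads and a shared queue.Queue; it is an analogy under that assumption, not billiard code, and the worker function is hypothetical.

```python
import queue
import threading

NUM_WORKERS = 3
inqueue = queue.Queue()
finished = []
lock = threading.Lock()


def worker(name):
    """Consume items until this worker reads its own None sentinel."""
    while True:
        item = inqueue.get()
        if item is None:  # sentinel: no more work for this worker
            with lock:
                finished.append(name)
            return


workers = [threading.Thread(target=worker, args=(n,)) for n in range(NUM_WORKERS)]
for w in workers:
    w.start()

# Like tell_others: one sentinel per worker, because each get() consumes one.
for _ in workers:
    inqueue.put(None)
for w in workers:
    w.join()
```

Sending fewer sentinels than there are workers would leave some of them blocked on get() forever.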

0x03 sub process worker

This section describes how the Worker subprocess accepts tasks and executes them.

Now that the task message has been sent to the child process through the pipe, execution moves to the child process. Note that self here is billiard.pool.Worker.

3.1 sub process loop

In the worker, the message loop, which parses the message in several stages, works as follows:

  • Call wait_for_job to wait for a message written to the pipe by the parent process;
  • After the message req is obtained, unpack it: type_, args_ = req;
  • Unpack args_ again: job, i, fun, args, kwargs = args_, which yields the job, the function the child process must execute, that function's arguments, and so on;
  • Send an ACK to the parent process;
  • If wait_for_syn is required, wait for the parent's confirmation;
  • Call the user-defined function indirectly through fun: result = (True, prepare_result(fun(*args, **kwargs))). Note that fun here is _trace_task_ret, and the user-defined function is called inside _trace_task_ret;
  • Carry out follow-up processing, such as sending READY to the parent process.

The code is as follows:

def workloop(self, debug=debug, now=monotonic, pid=None):
    pid = pid or os.getpid()
    put = self.outq.put
    inqW_fd = self.inqW_fd
    synqW_fd = self.synqW_fd
    maxtasks = self.maxtasks
    max_memory_per_child = self.max_memory_per_child or 0
    prepare_result = self.prepare_result

    wait_for_job = self.wait_for_job
    _wait_for_syn = self.wait_for_syn

    def wait_for_syn(jid):
        i = 0
        while 1:
            if i > 60:
                error('!!!WAIT FOR ACK TIMEOUT: job:%r fd:%r!!!',
                      jid, self.synq._reader.fileno(), exc_info=1)
            req = _wait_for_syn()
            if req:
                type_, args = req  # Unpack the reply sent by the parent process
                if type_ == NACK:
                    return False
                assert type_ == ACK
                return True
            i += 1

    completed = 0
    try:
        while maxtasks is None or (maxtasks and completed < maxtasks):
            req = wait_for_job()
            if req:
                type_, args_ = req
                assert type_ == TASK
                job, i, fun, args, kwargs = args_  # Unpack again. fun here is _trace_task_ret; the user-defined function is called inside _trace_task_ret
                put((ACK, (job, i, now(), pid, synqW_fd)))
                if _wait_for_syn:
                    confirm = wait_for_syn(job)
                    if not confirm:
                        continue  # received NACK

                # Run the task whether or not a SYN handshake was required.
                result = (True, prepare_result(fun(*args, **kwargs)))

                put((READY, (job, i, result, inqW_fd)))

                completed += 1
                if max_memory_per_child > 0:
                    used_kb = mem_rss()
                    if used_kb > 0 and used_kb > max_memory_per_child:
                        warning(MAXMEM_USED_FMT.format(
                            used_kb, max_memory_per_child))
                        return EX_RECYCLE

        if maxtasks:
            return EX_RECYCLE if completed == maxtasks else EX_FAILURE
        return EX_OK
    finally:
        self._ensure_messages_consumed(completed=completed)
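Stripped of timeouts, the SYN handshake, and memory checks, the loop above reduces to "unpack, ACK, run, READY". The following sketch simulates that with in-memory queues. It is a simplification under stated assumptions: mini_workloop and add are hypothetical names, TASK = 2 matches the debugger output in this article while the ACK and READY values are placeholders, and a None sentinel ends the loop here purely for convenience.

```python
import os
import queue
import time

# TASK = 2 matches the debugger output in this article; the other
# values are placeholders chosen for this sketch.
ACK, READY, TASK = 0, 1, 2


def mini_workloop(inq, outq):
    """Process requests from inq until a None sentinel, reporting on outq."""
    pid = os.getpid()
    completed = 0
    for req in iter(inq.get, None):
        type_, args_ = req                    # first unpacking
        assert type_ == TASK
        job, i, fun, args, kwargs = args_     # second unpacking
        outq.put((ACK, (job, i, time.monotonic(), pid)))
        try:
            result = (True, fun(*args, **kwargs))
        except Exception as exc:
            result = (False, exc)
        outq.put((READY, (job, i, result)))
        completed += 1
    return completed


def add(x, y):
    return x + y


inq, outq = queue.Queue(), queue.Queue()
inq.put((TASK, (6, None, add, (2, 8), {})))
inq.put(None)
completed = mini_workloop(inq, outq)
ack = outq.get()
ready = outq.get()
```

In the real worker, fun is _trace_task_ret rather than the user function itself; add is called directly here only to keep the sketch short.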

At this point the variables are as follows. The req variable is the message sent by the parent process through the pipe, and the child process first unpacks it into type_ and args_:

prepare_result = {method} <bound method Worker.prepare_result of <billiard.pool.Worker object at 0x000001BFAE5AE308>>
    
put = {method} <bound method _SimpleQueue.put of <billiard.queues.SimpleQueue object at 0x000001BFAE1BE7C8>>
    
type_ = 2 // TASK = 2 is defined in pool.py
  
req = {tuple: 2} (2, (6, None, <function _trace_task_ret at 0x000001BFAE53EA68>, ('myTest.add', '2c6d431f-a86a-4972-886b-472662401d20', {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen14656@DESKTOP-0GO3RPO', 'reply_to': '3c9cc3a7-65d6-349b-ba66-399dc47b7cad', 'correlation_id': '2c6d431f-a86a-4972-886b-472662401d20', 'hostname': 'DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {}))

self = {Worker} <billiard.pool.Worker object at 0x000001BFAE5AE308>
    
kwargs = {dict: 0} {}

args_ = (6, None, <function _trace_task_ret at 0x000001BFAE53EA68>, ('myTest.add', '2c6d431f-a86a-4972-886b-472662401d20', {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen14656@DESKTOP-0GO3RPO', 'reply_to': '3c9cc3a7-65d6-349b-ba66-399dc47b7cad', 'correlation_id': '2c6d431f-a86a-4972-886b-472662401d20', 'hostname': 'DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {}))

Extending the previous logic diagram, the logic is now as follows:

                                                               +
                                                               |
                                                               |
                                                               v
                                                          +----+------------------+
                                                          | billiard.pool.Pool    |
                                                          +-------+---------------+
                                                                  |
                                                                  |
 Pool              +---------------------------+                  |
                   | TaskHandler               |                  |
                   |                           |                  |  self._taskqueue.put
                   |              _taskqueue   |  <---------------+
                   |                           |
                   +------------+--------------+
                                |
                                |  put(task)
                                |
+--------------------------------------------------------------------------------------+
                                |
 billiard.pool.Worker           |  get                             Sub process
                                v
                     +----------+-----------------------------+
                     |  workloop                              |
                     |                                        |
                     |                                        |
                     |          wait_for_job                  |
                     |                                        |
                     +----------------------------------------+

3.2 get parent process message

The wait_for_job function eventually calls the function built by _make_recv_method, which reads from the pipe connection.

What it reads is the message req passed from the parent process. See the previous section for details.

To review, the message written by the parent process was:

put = {method} <bound method _ConnectionBase.send of <billiard.connection.PipeConnection object at 0x000001E2C07DD2C8>>

self = {TaskHandler} <TaskHandler(Thread-16, started daemon 14980)>

task = {tuple: 2} 
 0 = {int} 2
 1 = {tuple: 5} (0, None, <function _trace_task_ret at 0x000001E2BFCA3438>, ('myTest.add', 'dee72291-5614-4106-a7bf-007023286e9e', {'lang': 'py', 'task': 'myTest.add', 'id': 'dee72291-5614-4106-a7bf-007023286e9e', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen17456@DESKTOP-0GO3RPO', 'reply_to': '21660796-c7e7-3736-9d42-e1be6ff7eaa8', 'correlation_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'hostname': 'celery@DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {})
 __len__ = {int} 2

You can see that the content written by the parent process is exactly what the child process reads. Specifically, the child process reads the message through the function built by _make_recv_method, which uses the read function of the pipe connection.

The following code runs in the child process.

    def _make_recv_method(self, conn):
        get = conn.get

        if hasattr(conn, '_reader'):
            _poll = conn._reader.poll
            if hasattr(conn, 'get_payload') and conn.get_payload:
                get_payload = conn.get_payload

                def _recv(timeout, loads=pickle_loads):
                    return True, loads(get_payload())
            else:
                def _recv(timeout):  # noqa
                    if _poll(timeout):
                        return True, get()
                    return False, None
        else:
            def _recv(timeout):  # noqa
                try:
                    return True, get(timeout=timeout)
                except Queue.Empty:
                    return False, None
        return _recv
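The middle branch of _make_recv_method (poll, then get) can be tried out with a standard-library pipe. The sketch below assumes a multiprocessing.Pipe connection rather than billiard's queue objects, and make_recv is a hypothetical name. It returns the same (ready, message) pair.

```python
from multiprocessing import Pipe


def make_recv(conn):
    """Return a function that waits up to `timeout` seconds for a message."""
    def _recv(timeout):
        if conn.poll(timeout):       # True once data is ready to be read
            return True, conn.recv()
        return False, None           # timed out with nothing to read
    return _recv


reader, writer = Pipe(duplex=False)
recv = make_recv(reader)

empty = recv(0.01)                   # nothing has been sent yet
writer.send((2, (6, None, "fun", ("myTest.add",), {})))
full = recv(1.0)                     # the message is now available
```

Returning a flag alongside the message lets the caller tell "timed out" apart from a message whose value happens to be None.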

3.3 parsing messages

After the child process reads the message, it unpacks it: job, i, fun, args, kwargs = args_

In effect, this unpacks the contents of args_ one by one.

args_ = (6, None, <function _trace_task_ret at 0x000001BFAE53EA68>, ('myTest.add', '2c6d431f-a86a-4972-886b-472662401d20', {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen14656@DESKTOP-0GO3RPO', 'reply_to': '3c9cc3a7-65d6-349b-ba66-399dc47b7cad', 'correlation_id': '2c6d431f-a86a-4972-886b-472662401d20', 'hostname': 'DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {}))

So we get:

job = {int} 6

i = {NoneType} None

fun = {function} <function _trace_task_ret at 0x000001BFAE53EA68>

kwargs = {dict: 0} {}

args = {tuple: 6} 
 0 = {str} 'myTest.add'
 1 = {str} '2c6d431f-a86a-4972-886b-472662401d20'
 2 = {dict: 26} {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20',
 3 = {bytes: 81} b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
 4 = {str} 'application/json'
 5 = {str} 'utf-8'
 __len__ = {int} 6

In this way, the child process knows which function it needs to call (here myTest.add) and with what arguments (here (2, 8)).

Let's summarize the message reading and parsing process:

  • The parent process writes task;
  • The child process reads it as req;
  • The child process unpacks req into type_, args_;
  • The child process unpacks args_ into job, i, fun, args, kwargs. The fun here is _trace_task_ret, and the user-defined function is called inside _trace_task_ret;
  • args contains the name of the user-defined function and its arguments.

3.3.1 configuration of callback function in parent process

As mentioned just now, the fun obtained by the first round of parsing is _trace_task_ret, and the user-defined function is called inside _trace_task_ret.

We need to see where the callback function fun is configured in the parent process.

As we know from the above, when a task is received, task_message_handler hands it to the process pool through the Request class.

Note: the Worker scope in this figure is celery/apps/worker.py. It belongs to Celery's own logic and has nothing to do with the child process. Celery has several classes with the same name, which is easy to confuse.

                         +
  Consumer               |
                 message |
                         v         strategy  +------------------------------------+
            +------------+------+            | strategies                         |
            | on_task_received  | <--------+ |                                    |
            |                   |            |[myTest.add : task_message_handler] |
            +------------+------+            +------------------------------------+
                         |
                         |
 +------------------------------------------------------------------------------------+
 strategy                |
                         |
                         |
                         v                Request [myTest.add]
            +------------+-------------+                       +---------------------+
            | task_message_handler     | <-------------------+ | create_request_cls  |
            |                          |                       |                     |
            +------------+-------------+                       +---------------------+
                         | _process_task_sem
                         |
+--------------------------------------------------------------------------------------+
 Worker                  | req[{Request} myTest.add]
                         v
                +--------+-----------+
                | WorkController     |
                |                    |       apply_async
                |            pool +-------------------------+
                +--------+-----------+                      |
                         |                                  |
                         |                                  v
             +-----------+----------+                   +---+-------+
             |{Request} myTest.add  | +---------------> | TaskPool  |
             +----------------------+                   +-----------+
                                        myTest.add

The apply_async called at this point is actually the pool.apply_async method.

In execute_using_pool of the Request class, we find that the function passed to pool.apply_async is trace_task_ret, so trace_task_ret must be the function the parent process passes to the child.

class Request:
    """A request for task execution."""
    
    def execute_using_pool(self, pool, **kwargs):
        """Used by the worker to send this task to the pool.
        """

        result = pool.apply_async(
            trace_task_ret,  # the wrapper function the child process will run
            args=(self._type, task_id, self._request_dict, self._body,
                  self._content_type, self._content_encoding),  # self._type is the name of the user-defined task
            accept_callback=self.on_accepted,
            timeout_callback=self.on_timeout,
            callback=self.on_success,
            error_callback=self.on_failure,
            soft_timeout=soft_time_limit or task.soft_time_limit,
            timeout=time_limit or task.time_limit,
            correlation_id=task_id,
        )
        # cannot create weakref to None
        self._apply_result = maybe(ref, result)
        return result    
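The same calling pattern (always submit one generic wrapper and ship the task name as data) can be tried with the standard library's ThreadPool, whose apply_async also accepts callback and error_callback. This is only an analogy: TASKS and run_by_name are hypothetical, and billiard's apply_async takes additional arguments that the standard library does not have.

```python
from multiprocessing.pool import ThreadPool


def add(x, y):
    return x + y


TASKS = {"myTest.add": add}  # hypothetical name -> function mapping


def run_by_name(name, args, kwargs):
    """Generic wrapper: the pool only ever sees this function."""
    return TASKS[name](*args, **kwargs)


results, errors = [], []

with ThreadPool(processes=2) as pool:
    async_result = pool.apply_async(
        run_by_name,                       # always the same wrapper
        args=("myTest.add", (2, 8), {}),   # the task name travels as data
        callback=results.append,           # invoked in the submitting process on success
        error_callback=errors.append,      # invoked in the submitting process on failure
    )
    value = async_result.get(timeout=5)
```

Because the pool only ever runs the wrapper, the message needs to carry just a task name and serialized arguments, not the user's function object.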

Call function

As we know from the above, the function the Pool calls is _trace_task_ret; that is, _trace_task_ret is a uniform outer wrapper around user functions. The Pool calls _trace_task_ret, and _trace_task_ret calls the user function internally.

Why not call the user function myTest.add directly, instead of adding another layer with _trace_task_ret? The "trace" in the name suggests that this is a compromise that balances extensibility, debugging, tracing, and running speed.

There are two key pieces of code:

3.3.1 get the Celery application

The first key point is to obtain the Celery application that was set up in the child process in advance. The code is as follows:

app = app or current_app._get_current_object()

This raises a question: how does the child process get the Celery application of the parent process?

In some multi-process mechanisms the variables of the parent process are copied to the child process, but this is not guaranteed, so there must be a mechanism by which the parent process sets the Celery application in the child process.

For a detailed analysis of how the parent process configures the Celery application for the child process and how the child process obtains it, see the previous article.
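As a rough intuition for the expression app or current_app._get_current_object(), the sketch below models a "current application" proxy that resolves its target lazily. All names here (Proxy, App, set_current_app, trace) are hypothetical stand-ins written for this illustration; Celery's actual mechanism is covered in the previous article and is more involved.

```python
class Proxy:
    """Tiny stand-in for a lazy proxy: resolves its target on every access."""

    def __init__(self, resolver):
        self._resolver = resolver

    def _get_current_object(self):
        return self._resolver()

    def __getattr__(self, name):
        # Only called for attributes not found on the proxy itself.
        return getattr(self._get_current_object(), name)


class App:
    def __init__(self, main):
        self.main = main


_current = {"app": None}


def set_current_app(app):
    """Would be called once during child-process initialization."""
    _current["app"] = app


current_app = Proxy(lambda: _current["app"])


def trace(app=None):
    # Same shape as the line quoted above: an explicit app wins,
    # otherwise fall back to whatever was registered as current.
    app = app or current_app._get_current_object()
    return app.main


set_current_app(App("myTest"))
```

With this arrangement a function such as trace needs no application argument in the common case, yet a caller can still pass one explicitly.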

3.3.2 obtaining tasks

The second key point is how to obtain and execute the registered task. The code is as follows:

R, I, T, Rstr = trace_task(app.tasks[name], uuid, args, kwargs, request, app=app)

Here, app.tasks is a registry populated in advance. It holds all tasks in Celery, including built-in tasks and user tasks.

So app.tasks[name] retrieves the corresponding task by its task name.

app.tasks = {TaskRegistry: 9} 
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x1bfae596d48>
 'celery.chord' = {chord} <@task: celery.chord of myTest at 0x1bfae596d48>
 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x1bfae596d48>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x1bfae596d48>
 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x1bfae596d48>
 'celery.group' = {group} <@task: celery.group of myTest at 0x1bfae596d48>
 'celery.map' = {xmap} <@task: celery.map of myTest at 0x1bfae596d48>
 'celery.chain' = {chain} <@task: celery.chain of myTest at 0x1bfae596d48>
 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x1bfae596d48>
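A name-to-task registry of this kind can be sketched as a dict subclass. The class names below mirror the ones visible in the debugger output, but the implementation is an assumption made for illustration, and a plain lambda stands in for a task object.

```python
class NotRegistered(KeyError):
    """Raised when a task name is unknown to the registry."""


class TaskRegistry(dict):
    def register(self, name, fun):
        self[name] = fun

    def __missing__(self, key):
        # dict calls this hook when registry[key] finds no entry.
        raise NotRegistered(key)


tasks = TaskRegistry()
tasks.register("myTest.add", lambda x, y: x + y)

# Look the task up by name, then call it with the decoded arguments.
result = tasks["myTest.add"](2, 8)

try:
    tasks["myTest.unknown"]
    missing = None
except NotRegistered as exc:
    missing = exc.args[0]
```

Looking tasks up by name is what allows the parent and child processes to exchange only strings and serialized arguments.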

The logic is as follows:

                                                                   +
                                                                   |
                                                                   |
                                                                   v
                                                           +-------+---------------+
                                                           | billiard.pool.Pool    |
                                                           +-------+---------------+
                                                                   |
                                                                   |
    +---------------------------+                                  |
    | TaskHandler               |                                  |
    |                           |                                  | self._taskqueue.put
    |              _taskqueue   |  <-------------------------------+
    |                           |
    +------------+--------------+
                 |
                 |  put(task)                                                     Pool
                 |
 +-------------------------------------------------------------------------------------+
                 |
                 |  get                               billiard.pool.Worker   Sub process
                 v
+----------------+------+           +--------------------------------------------------+
|  workloop             |           | app.tasks                                        |
|                       |           |                                                  |
|       wait_for_job    |           |'celery.chord' =  @task: celery.chord of myTest   |
|                       |           |'celery.chunks' =  @task: celery.chunks of myTest |
|     app.tasks[name] <-------------+'celery.group' =   @task: celery.group of myTest  |
|                       |           | ......                                           |
|                       |           |                                                  |
+-----------------------+           +--------------------------------------------------+
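The flow in the diagram can be sketched with the standard library alone. This is a simplified model, not billiard's actual code: `tasks`, `register`, `run_by_name` and the `workloop` function here are hypothetical stand-ins, the value of `TASK` is illustrative, and a `None` sentinel is assumed to stop the loop. Only the tuple layout `(TASK, (job, None, func, args, kwds))` is taken from `apply_async` above.

```python
import queue

TASK = 2  # illustrative message-type tag (assumption, not read from billiard)

# Hypothetical stand-in for app.tasks: task name -> callable.
tasks = {}


def register(name):
    """Register a function under a task name, loosely like @app.task."""
    def decorator(fun):
        tasks[name] = fun
        return fun
    return decorator


@register('myTest.add')
def add(x, y):
    return x + y


def run_by_name(name, args, kwargs):
    """Stand-in for the callback sent by the parent: resolve the name, then call."""
    return tasks[name](*args, **kwargs)


def workloop(inqueue, results):
    """Drain the queue until a None sentinel arrives, running each job."""
    while True:
        message = inqueue.get()
        if message is None:  # sentinel: stop the loop
            break
        kind, (job, _, fun, args, kwargs) = message
        if kind == TASK:
            results[job] = fun(*args, **kwargs)


taskqueue = queue.Queue()
# Same tuple layout as in apply_async: (TASK, (job, None, func, args, kwds))
taskqueue.put((TASK, (0, None, run_by_name, ('myTest.add', [2, 8], {}), {})))
taskqueue.put(None)

results = {}
workloop(taskqueue, results)
print(results)  # {0: 10}
```

The point of the sketch is that the message carries a task name rather than the user function itself, so the receiving side must resolve the name through its own registry before it can run anything.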


3.3.3 call task

Now that we know which task to call, let's see how to call it.

3.3.3.1 obtaining tasks

As shown above, the callback function is passed in from the parent process:

fun = {function} <function _trace_task_ret at 0x000001BFAE53EA68>

_trace_task_ret is defined in celery/app/trace.py.

The logic is:

  • Get the Celery application and assign it to app.

  • Extract the message content and update the Request, such as:

    • request = {dict: 26} 
       'lang' = {str} 'py'
       'task' = {str} 'myTest.add'
       'id' = {str} 'a8928c1e-1e56-4502-9929-80a01b1bbfd8'
       'shadow' = {NoneType} None
       'eta' = {NoneType} None
       'expires' = {NoneType} None
       'group' = {NoneType} None
       'group_index' = {NoneType} None
       'retries' = {int} 0
       'timelimit' = {list: 2} [None, None]
       'root_id' = {str} 'a8928c1e-1e56-4502-9929-80a01b1bbfd8'
       'parent_id' = {NoneType} None
       'argsrepr' = {str} '(2, 8)'
       'kwargsrepr' = {str} '{}'
       'origin' = {str} 'gen17060@DESKTOP-0GO3RPO'
       'reply_to' = {str} '5a520373-7712-3326-9ce8-325df14aa2ad'
       'correlation_id' = {str} 'a8928c1e-1e56-4502-9929-80a01b1bbfd8'
       'hostname' = {str} 'DESKTOP-0GO3RPO'
       'delivery_info' = {dict: 4} {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}
       'args' = {list: 2} [2, 8]
       'kwargs' = {dict: 0} {}
       'is_eager' = {bool} False
       'callbacks' = {NoneType} None
       'errbacks' = {NoneType} None
       'chain' = {NoneType} None
       'chord' = {NoneType} None
       __len__ = {int} 26
      
  • Get the user Task from the Task name

  • Use request to call user Task.
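The four steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not Celery's implementation: `tasks` and `trace_task_ret_sketch` are hypothetical names, only JSON payloads are handled, the `hostname` default is made up, and the tracing layer is skipped so the task is called directly. The body bytes are the ones captured in the debugger for this article.

```python
import json

# Hypothetical stand-in for app.tasks.
tasks = {'myTest.add': lambda x, y: x + y}


def trace_task_ret_sketch(name, uuid, request, body, content_type,
                          content_encoding='utf-8', hostname='worker-host'):
    """Simplified walk through the steps above (JSON only, no tracing)."""
    # 1. Deserialize the body into (args, kwargs, embed).
    if content_type == 'application/json':
        args, kwargs, embed = json.loads(body.decode(content_encoding))
    else:
        args, kwargs, embed = body  # payload was already decoded

    # 2. Merge the call arguments and the embedded options into the request.
    request.update({
        'args': args, 'kwargs': kwargs,
        'hostname': hostname, 'is_eager': False,
    }, **embed or {})

    # 3. Resolve the task by name, then 4. call it.
    return tasks[name](*args, **kwargs)


body = (b'[[2, 8], {}, {"callbacks": null, "errbacks": null, '
        b'"chain": null, "chord": null}]')
request = {'lang': 'py', 'task': 'myTest.add',
           'id': '2c6d431f-a86a-4972-886b-472662401d20'}

result = trace_task_ret_sketch('myTest.add', request['id'], request,
                               body, 'application/json')
print(result)           # 10
print(request['args'])  # [2, 8]
```

Note that the body is a three-element list: positional arguments, keyword arguments, and a dict of embedded options (callbacks, errbacks, chain, chord) that is folded into the request.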

The specific code is as follows:

def trace_task(task, uuid, args, kwargs, request={}, **opts):
    """Trace task execution."""
    try:
        if task.__trace__ is None:
            task.__trace__ = build_tracer(task.name, task, **opts)
        return task.__trace__(uuid, args, kwargs, request) # Call the tracer that was set in update_strategy


def _trace_task_ret(name, uuid, request, body, content_type,
                    content_encoding, loads=loads_message, app=None,
                    **extra_request):
    
    app = app or current_app._get_current_object()    # Get Celery app
    
    embed = None
    if content_type:
        accept = prepare_accept_content(app.conf.accept_content)
        args, kwargs, embed = loads(
            body, content_type, content_encoding, accept=accept,
        )
    else:
        args, kwargs, embed = body
    
    request.update({
        'args': args, 'kwargs': kwargs,
        'hostname': hostname, 'is_eager': False,
    }, **embed or {})
    
    R, I, T, Rstr = trace_task(app.tasks[name],
                        uuid, args, kwargs, request, app=app)    # Call trace_task to execute the task
    
    return (1, R, T) if I else (0, Rstr, T)

trace_task_ret = _trace_task_ret

The variables at this point are:

accept = {set: 1} {'application/json'}
app = {Celery} <Celery myTest at 0x1bfae596d48>
args = {list: 2} [2, 8]
body = {bytes: 81} b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
content_encoding = {str} 'utf-8'
content_type = {str} 'application/json'
embed = {dict: 4} {'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
extra_request = {dict: 0} {}
kwargs = {dict: 0} {}
loads = {method} <bound method SerializerRegistry.loads of <kombu.serialization.SerializerRegistry object at 0x000001BFAE329408>>
name = {str} 'myTest.add'
request = {dict: 26} {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20',
uuid = {str} '2c6d431f-a86a-4972-886b-472662401d20'

3.3.3.2 call task

The call goes through trace_task, which is defined as follows:

def trace_task(task, uuid, args, kwargs, request=None, **opts):
    """Trace task execution."""
    request = {} if not request else request
    try:
        if task.__trace__ is None:
            task.__trace__ = build_tracer(task.name, task, **opts)
        return task.__trace__(uuid, args, kwargs, request)

The tracer that was set during update_strategy is:

task.__trace__ = build_tracer(name, task, loader, self.hostname,
                                          app=self.app) 

Part of the build_tracer function is as follows:

def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 Info=TraceInfo, eager=False, propagate=False, app=None,
                 monotonic=monotonic, truncate=truncate,
                 trace_ok_t=trace_ok_t, IGNORE_STATES=IGNORE_STATES):
  
    fun = task if task_has_custom(task, '__call__') else task.run   # Get the run function corresponding to the task

    ...
    def trace_task(uuid, args, kwargs, request=None):
        # R      - is the possibly prepared return value.
        # I      - is the Info object.
        # T      - runtime
        # Rstr   - textual representation of return value
        # retval - is the always unmodified return value.
        # state  - is the resulting task state.

        # This function is very long because we've unrolled all the calls
        # for performance reasons, and because the function is so long
        # we want the main variables (I, and R) to stand out visually from the
        # the rest of the variables, so breaking PEP8 is worth it ;)
        
        R = I = T = Rstr = retval = state = None
        task_request = None
        time_start = monotonic()
        ...
        # -*- TRACE -*-
            try:
                R = retval = fun(*args, **kwargs) # Execute the corresponding function
                state = SUCCESS
            except Reject as exc:
                    ...
    return trace_task

Here, fun is the function the task was written to execute (myTest.add). Calling it runs the task and produces the function's return value.
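The build-once, call-many pattern used by trace_task and build_tracer can be sketched as below. This is a reduced model under stated assumptions: the `Task` class is a hypothetical minimal stand-in, the tracer here returns a plain `(retval, state, runtime)` tuple rather than Celery's own result structure, and every failure is collapsed into a single `FAILURE` state, whereas the real tracer distinguishes several cases such as Reject.

```python
from time import monotonic

SUCCESS, FAILURE = 'SUCCESS', 'FAILURE'


class Task:
    """Hypothetical minimal task: a name, a run() body, a cached tracer slot."""
    __trace__ = None

    def __init__(self, name, run):
        self.name = name
        self.run = run


def build_tracer(name, task):
    """Return a closure that runs the task body and reports the outcome."""
    fun = task.run  # the user's function, resolved once at build time

    def trace_task(uuid, args, kwargs, request=None):
        time_start = monotonic()
        try:
            retval = fun(*args, **kwargs)
            state = SUCCESS
        except Exception as exc:  # simplified: one failure state only
            retval, state = exc, FAILURE
        runtime = monotonic() - time_start
        return retval, state, runtime

    return trace_task


def trace_task(task, uuid, args, kwargs, request=None):
    """Build the tracer on first use, then reuse the cached one."""
    if task.__trace__ is None:
        task.__trace__ = build_tracer(task.name, task)
    return task.__trace__(uuid, args, kwargs, request or {})


add = Task('myTest.add', lambda x, y: x + y)
retval, state, runtime = trace_task(add, 'id-1', [2, 8], {})
first_tracer = add.__trace__
trace_task(add, 'id-2', [1, 1], {})  # second call reuses the cached tracer
print(retval, state, add.__trace__ is first_tracer)  # 10 SUCCESS True
```

Because the closure captures fun when the tracer is built, each later call only pays for the task body and the bookkeeping around it, not for resolving the function again.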

This completes one message-consumption cycle.

Starting with the next article, we will introduce some auxiliary functions of Celery, such as load balancing and fault tolerance.



Added by jawinn on Sat, 19 Feb 2022 17:24:43 +0200