[source code analysis] How the child process of the parallel distributed task queue Celery processes messages
0x00 summary
Celery is a simple, flexible and reliable distributed system for processing large numbers of messages. It focuses on an asynchronous task queue for real-time processing, and also supports task scheduling. In the previous article, we introduced the Celery multi-process (pool) model. This article introduces how the child process processes messages.
Through this article, you can trace the following flows:
- How the parent process sends messages to the child process;
- How the child process receives the parent process's messages;
- How the child process parses the message step by step, peeling off, layer by layer, all the information needed to run the task;
- How the child process runs the task once it has the task information;
- Why Celery needs so many layers of complex, cumbersome wrapping.
0x01 origin
Let's first review the foregoing. Earlier we saw that in the Celery worker, the apply_async function ends up calling into the Pool; that is, when a user's task message arrives, Celery hands it to the Pool.
def apply_async(self, func, args=(), kwds={}, ...):
    if self.threads:
        self._taskqueue.put(([(TASK, (result._job, None, func, args, kwds))], None))
    else:
        self._quick_put((TASK, (result._job, None, func, args, kwds)))
    return result
Then, in billiard/pool.py, you can see that the Pool uses self._taskqueue as the medium to pass the message to the TaskHandler, which in turn forwards it to the child process.
class Pool(object):
    '''Class which supports an async version of applying functions to arguments.'''
    Worker = Worker
    Supervisor = Supervisor
    TaskHandler = TaskHandler
    TimeoutHandler = TimeoutHandler
    ResultHandler = ResultHandler

    def __init__(self, processes=None, initializer=None, initargs=(), ...):
        self._task_handler = self.TaskHandler(self._taskqueue, self._quick_put,
                                              self._outqueue, self._pool, self._cache)
        if threads:
            self._task_handler.start()
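To make the enqueued message concrete, here is a minimal runnable sketch (standard library only; the queue is a stand-in for Pool._taskqueue, and TASK = 2 matches the constant defined in pool.py, as noted later) of what apply_async actually puts on the queue:

import queue

TASK = 2                      # message type tag, matching billiard's pool.py
_taskqueue = queue.Queue()    # stand-in for Pool._taskqueue

def apply_async_sketch(job, func, args=(), kwds={}):
    # one task message: (type, (job id, index, function, args, kwargs))
    _taskqueue.put(([(TASK, (job, None, func, args, kwds))], None))

apply_async_sketch(0, print, ('hello',))
taskseq, set_length = _taskqueue.get()   # this is what TaskHandler.body() receives
print(taskseq)   # [(2, (0, None, <built-in function print>, ('hello',), {}))]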
At this point, the logic is as shown in the figure below:
     +-----------+
     | Consumer  |
     +-----+-----+
           |  message
           v
 +---------+--------+   strategy   +--------------------------------------+
 | on_task_received | <----------- | strategies                           |
 +---------+--------+              | [myTest.add : task_message_handler]  |
           |                       +--------------------------------------+
           |  strategy
           v
 +---------+------------+  Request [myTest.add]  +---------------------+
 | task_message_handler | <--------------------- | create_request_cls  |
 +---------+------------+                        +---------------------+
           |  _process_task_sem
-----------+---------------------------------------------------- Worker
           |  req[{Request} myTest.add]
           v
 +---------+------+
 | WorkController |
 |      pool      |
 +---------+------+
           |  apply_async
           v
 +---------+-------+      myTest.add      +----------------------+
 |    TaskPool     | <------------------- | {Request} myTest.add |
 +---------+-------+
           |
-----------+----------------------------------------------------------
           v
 +---------+----------+
 | billiard.pool.Pool |
 +---------+----------+
           |                                                      Pool
 +---------+-------------+
 |      TaskHandler      |   self._taskqueue.put
 |      _taskqueue       | <---------------------+
 +---------+-------------+
           |  put(task)
-----------+----------------------------------------------- Sub process
           v
     self._inqueue
Note: some readers pointed out an error in the Celery multithreading code on Windows, so let me explain.
My environment spans Mac, Linux and Windows; since several operating systems are involved, I have to switch between them.
Part of the code in this article was analyzed on Windows only, so the debugging variables shown are the ones captured under Windows.
In fact, the specific OS does not matter; what matters is analyzing Celery's design ideas through the code.
So let's follow self._taskqueue into the TaskHandler.
0x02 parent process TaskHandler
This section describes how the parent process passes task messages to the child process.
At this point we are still in the parent process. The code is located in billiard/pool.py. The call stack is:
_send_bytes, connection.py:314
send, connection.py:233
body, pool.py:596
run, pool.py:504
_bootstrap_inner, threading.py:926
_bootstrap, threading.py:890
The variables are:
self = {TaskHandler} <TaskHandler(Thread-16, started daemon 14980)>
 additional_info = {PyDBAdditionalThreadInfo} State:2 Stop:None Cmd: 107 Kill:False
 cache = {dict: 1} {0: <%s: 0 ack:False ready:False>}
 daemon = {bool} True
 name = {str} 'Thread-16'
 outqueue = {SimpleQueue} <billiard.queues.SimpleQueue object at 0x000001E2C07DD6C8>
 pool = {list: 8} [<SpawnProcess(SpawnPoolWorker-1, started daemon)>, <SpawnProcess(SpawnPoolWorker-2, started daemon)>, <SpawnProcess(SpawnPoolWorker-3, started daemon)>, <SpawnProcess(SpawnPoolWorker-4, started daemon)>, <SpawnProcess(SpawnPoolWorker-5, started daemon)>, <SpawnProcess(SpawnPoolWorker-6, started daemon)>, <SpawnProcess(SpawnPoolWorker-7, started daemon)>, <SpawnProcess(SpawnPoolWorker-8, started daemon)>]
 taskqueue = {Queue} <queue.Queue object at 0x000001E2C07DD208>
 _args = {tuple: 0} ()
 _children = {WeakKeyDictionary: 0} <WeakKeyDictionary at 0x1e2c0883448>
 _daemonic = {bool} True
 _kwargs = {dict: 0} {}
 _name = {str} 'Thread-16'
 _parent = {_MainThread} <_MainThread(MainThread, started 13408)>
 _pid = {NoneType} None
 _start_called = {bool} True
 _started = {Event} <threading.Event object at 0x000001E2C0883D88>
 _state = {int} 0
 _stderr = {LoggingProxy} <celery.utils.log.LoggingProxy object at 0x000001E2C07DD188>
 _target = {NoneType} None
 _tstate_lock = {lock} <locked _thread.lock object at 0x000001E2C081FDB0>
 _was_started = {bool} True
2.1 sending messages
When the parent process receives a task message, it calls put(task) to write the message into the pipe between the parent process and the child process.
Note the assignments made earlier:
self._taskqueue = Queue()

def _setup_queues(self):
    self._inqueue = Queue()
    self._outqueue = Queue()
    self._quick_put = self._inqueue.put
    self._quick_get = self._outqueue.get
In other words, when the TaskHandler receives a message, it uses the put function of the self._inqueue pipe to send the message on to its child processes; self._taskqueue is just an intermediate medium.
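To see this indirection in isolation, here is a hedged sketch using only the standard library (multiprocessing.SimpleQueue plays the role of billiard's _inqueue; none of this is Celery's actual code): binding the queue's put method hands the caller a direct handle on the inter-process channel.

import multiprocessing as mp

def child(inqueue):
    # the child reads exactly what the parent's _quick_put wrote
    print('child got:', inqueue.get())

if __name__ == '__main__':
    inqueue = mp.SimpleQueue()    # plays the role of Pool._inqueue
    _quick_put = inqueue.put      # plays the role of Pool._quick_put
    p = mp.Process(target=child, args=(inqueue,))
    p.start()
    _quick_put(('TASK', (0, None, 'myTest.add', (2, 8), {})))   # parent side
    p.join()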
So the variables are as follows:
put = {method} <bound method _ConnectionBase.send of <billiard.connection.PipeConnection object at 0x000001E2C07DD2C8>> self = {TaskHandler} <TaskHandler(Thread-16, started daemon 14980)> task = {tuple: 2} 0 = {int} 2 1 = {tuple: 5} (0, None, <function _trace_task_ret at 0x000001E2BFCA3438>, ('myTest.add', 'dee72291-5614-4106-a7bf-007023286e9e', {'lang': 'py', 'task': 'myTest.add', 'id': 'dee72291-5614-4106-a7bf-007023286e9e', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen17456@DESKTOP-0GO3RPO', 'reply_to': '21660796-c7e7-3736-9d42-e1be6ff7eaa8', 'correlation_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'hostname': 'celery@DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {}) __len__ = {int} 2 taskqueue = {Queue} <queue.Queue object at 0x000001E2C07DD208>
The specific code is as follows: it sends the message into the pipe and then notifies the result handler and the other workers:
class TaskHandler(PoolThread):

    def __init__(self, taskqueue, put, outqueue, pool, cache):
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.pool = pool
        self.cache = cache
        super(TaskHandler, self).__init__()

    def body(self):
        cache = self.cache
        taskqueue = self.taskqueue
        put = self.put

        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            i = -1
            try:
                for i, task in enumerate(taskseq):
                    try:
                        put(task)        # write the task into the pipe to the child
                    except Exception:
                        break            # (error handling elided in this excerpt)
            except Exception:
                pass                     # (elided)

        self.tell_others()
2.2 notifying others
tell_others is used to notify the result handler and the other workers.
def tell_others(self):
    outqueue = self.outqueue
    put = self.put
    pool = self.pool

    try:
        # tell result handler to finish when cache is empty
        outqueue.put(None)

        # tell workers there is no more work
        for p in pool:
            put(None)
    except Exception:
        pass   # (error handling elided in this excerpt)
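Both body and tell_others rely on the classic sentinel idiom: iter(taskqueue.get, None) keeps yielding messages until a None sentinel arrives, and putting None into the out-queue and the worker pipes tells the peers to shut down. A tiny self-contained sketch of the idiom:

import queue

q = queue.Queue()
for msg in ('task-1', 'task-2', None):   # None is the shutdown sentinel
    q.put(msg)

# iter(q.get, None) calls q.get() repeatedly and stops when it returns None,
# which is exactly how TaskHandler's loop and the shutdown path cooperate
for task in iter(q.get, None):
    print('handling', task)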
0x03 sub process worker
This section describes how the Worker child process receives tasks and executes them.
Now that the task message has been sent through the pipe to the child process, execution moves into the child process. Note that self here is billiard.pool.Worker.
3.1 sub process loop
In the worker, the message loop (which parses the message several times) works as follows:
- Call wait_for_job to wait for the message the parent process writes into the pipe;
- Once the user message req is obtained, parse it: type_, args_ = req;
- If an ACK needs to be sent, send it;
- Parse args_ again: job, i, fun, args, kwargs = args_, which yields the job, the function the child process should run, the function's arguments, and so on;
- If wait_for_syn is required, handle it;
- Call the user-defined function indirectly through fun, result = (True, prepare_result(fun(*args, **kwargs))), and return the result. Note that fun here is _trace_task_ret; the user-defined function is called inside _trace_task_ret;
- Carry out follow-up processing, such as sending READY to the parent process.
The code is as follows:
def workloop(self, debug=debug, now=monotonic, pid=None):
    pid = pid or os.getpid()
    put = self.outq.put
    inqW_fd = self.inqW_fd
    synqW_fd = self.synqW_fd
    maxtasks = self.maxtasks
    prepare_result = self.prepare_result

    wait_for_job = self.wait_for_job
    _wait_for_syn = self.wait_for_syn

    def wait_for_syn(jid):
        i = 0
        while 1:
            if i > 60:
                error('!!!WAIT FOR ACK TIMEOUT: job:%r fd:%r!!!',
                      jid, self.synq._reader.fileno(), exc_info=1)
            req = _wait_for_syn()
            if req:
                type_, args = req   # parse the message req sent by the parent
                if type_ == NACK:
                    return False
                assert type_ == ACK
                return True
            i += 1

    completed = 0
    try:
        while maxtasks is None or (maxtasks and completed < maxtasks):
            req = wait_for_job()
            if req:
                type_, args_ = req
                assert type_ == TASK
                # parse again to get the variables; fun here is `_trace_task_ret`,
                # and the user-defined function is called inside `_trace_task_ret`
                job, i, fun, args, kwargs = args_
                put((ACK, (job, i, now(), pid, synqW_fd)))
                if _wait_for_syn:
                    confirm = wait_for_syn(job)
                    if not confirm:
                        continue    # received NACK
                result = (True, prepare_result(fun(*args, **kwargs)))
                put((READY, (job, i, result, inqW_fd)))
                completed += 1
                if max_memory_per_child > 0:
                    used_kb = mem_rss()
                    if used_kb > 0 and used_kb > max_memory_per_child:
                        warning(MAXMEM_USED_FMT.format(
                            used_kb, max_memory_per_child))
                        return EX_RECYCLE
        if maxtasks:
            return EX_RECYCLE if completed == maxtasks else EX_FAILURE
        return EX_OK
    finally:
        self._ensure_messages_consumed(completed=completed)
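Stripped of billiard's details, the request/ACK/result protocol of workloop looks like the following runnable sketch (standard library only; TASK follows the TASK = 2 convention noted below, while the ACK/READY values and all other names are illustrative, not billiard's actual constants):

import multiprocessing as mp

TASK, ACK, READY = 2, 3, 5   # TASK matches pool.py; ACK/READY values are illustrative

def workloop(inq, outq):
    while True:
        req = inq.get()                      # wait_for_job
        if req is None:                      # sentinel: no more work
            break
        type_, args_ = req
        assert type_ == TASK
        job, i, fun, args, kwargs = args_    # second level of parsing
        outq.put((ACK, (job, i)))            # tell the parent the job was accepted
        result = (True, fun(*args, **kwargs))
        outq.put((READY, (job, i, result)))  # send the result back

def add(x, y):
    return x + y

if __name__ == '__main__':
    inq, outq = mp.SimpleQueue(), mp.SimpleQueue()
    p = mp.Process(target=workloop, args=(inq, outq))
    p.start()
    inq.put((TASK, (0, None, add, (2, 8), {})))
    inq.put(None)
    print(outq.get())   # (3, (0, None))              <- ACK
    print(outq.get())   # (5, (0, None, (True, 10)))  <- READY
    p.join()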
At this point, the variables are as follows. The req variable is the message sent by the parent process through the pipe, which the child process first parses into args_:
prepare_result = {method} <bound method Worker.prepare_result of <billiard.pool.Worker object at 0x000001BFAE5AE308>>
put = {method} <bound method _SimpleQueue.put of <billiard.queues.SimpleQueue object at 0x000001BFAE1BE7C8>>
type_ = 2   # TASK = 2 is defined in pool.py
req = {tuple: 2} (2, (6, None, <function _trace_task_ret at 0x000001BFAE53EA68>, ('myTest.add', '2c6d431f-a86a-4972-886b-472662401d20', {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen14656@DESKTOP-0GO3RPO', 'reply_to': '3c9cc3a7-65d6-349b-ba66-399dc47b7cad', 'correlation_id': '2c6d431f-a86a-4972-886b-472662401d20', 'hostname': 'DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {}))
self = {Worker} <billiard.pool.Worker object at 0x000001BFAE5AE308>
kwargs = {dict: 0} {}
args_ = (6, None, <function _trace_task_ret at 0x000001BFAE53EA68>, ('myTest.add', '2c6d431f-a86a-4972-886b-472662401d20', {...same request dict as above...}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {})
Extending the previous diagram, the logic is now as follows:
           +
           |
           v
 +---------+----------+
 | billiard.pool.Pool |
 +---------+----------+
           |                                                      Pool
 +---------+-------------+
 |      TaskHandler      |   self._taskqueue.put
 |      _taskqueue       | <---------------------+
 +---------+-------------+
           |  put(task)
-----------+----------------------------------------------------------
           |         billiard.pool.Worker               Sub process
           |  get
           v
 +---------+---------+
 |     workloop      |
 |                   |
 |   wait_for_job    |
 +-------------------+
3.2 get parent process message
The wait_for_job function ends up calling the _recv closure produced by _make_recv_method, i.e., it reads using the pipe connection's receive and poll functions.
What it reads is the message req passed over from the parent process; see the previous section for details.
Review the write message content of the parent process:
put = {method} <bound method _ConnectionBase.send of <billiard.connection.PipeConnection object at 0x000001E2C07DD2C8>>
self = {TaskHandler} <TaskHandler(Thread-16, started daemon 14980)>
task = {tuple: 2}
 0 = {int} 2
 1 = {tuple: 5} (0, None, <function _trace_task_ret at 0x000001E2BFCA3438>, ('myTest.add', 'dee72291-5614-4106-a7bf-007023286e9e', {'lang': 'py', 'task': 'myTest.add', 'id': 'dee72291-5614-4106-a7bf-007023286e9e', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen17456@DESKTOP-0GO3RPO', 'reply_to': '21660796-c7e7-3736-9d42-e1be6ff7eaa8', 'correlation_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'hostname': 'celery@DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {})
 __len__ = {int} 2
You can see that what the parent process wrote is exactly what the child process reads. Concretely, the child process builds its receive function through _make_recv_method, which reads via the pipe connection.
The following code runs in the child process.
def _make_recv_method(self, conn):
    get = conn.get

    if hasattr(conn, '_reader'):
        _poll = conn._reader.poll
        if hasattr(conn, 'get_payload') and conn.get_payload:
            get_payload = conn.get_payload

            def _recv(timeout, loads=pickle_loads):
                return True, loads(get_payload())
        else:
            def _recv(timeout):  # noqa
                if _poll(timeout):
                    return True, get()
                return False, None
    else:
        def _recv(timeout):  # noqa
            try:
                return True, get(timeout=timeout)
            except Queue.Empty:
                return False, None
    return _recv
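The same poll-then-read pattern can be reproduced with a raw standard-library Pipe (a sketch of the mechanism, not billiard's code):

from multiprocessing import Pipe

parent_conn, child_conn = Pipe()
parent_conn.send(('TASK', (0, None, 'myTest.add', (2, 8), {})))

# child side: poll with a timeout first, then read, mirroring _recv above
if child_conn.poll(1.0):              # True if data arrives within 1 second
    ok, req = True, child_conn.recv()
else:
    ok, req = False, None
print(ok, req)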
3.3 parsing messages
After the child process reads the message, it parses it: job, i, fun, args, kwargs = args_.
That is, the elements of args_ are unpacked one by one:
args_ = (6, None, <function _trace_task_ret at 0x000001BFAE53EA68>, ('myTest.add', '2c6d431f-a86a-4972-886b-472662401d20', {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen14656@DESKTOP-0GO3RPO', 'reply_to': '3c9cc3a7-65d6-349b-ba66-399dc47b7cad', 'correlation_id': '2c6d431f-a86a-4972-886b-472662401d20', 'hostname': 'DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {}))
So we get:
job = {int} 6
i = {NoneType} None
fun = {function} <function _trace_task_ret at 0x000001BFAE53EA68>
kwargs = {dict: 0} {}
args = {tuple: 6}
 0 = {str} 'myTest.add'
 1 = {str} '2c6d431f-a86a-4972-886b-472662401d20'
 2 = {dict: 26} {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', ...
 3 = {bytes: 81} b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
 4 = {str} 'application/json'
 5 = {str} 'utf-8'
 __len__ = {int} 6
In this way, the child process knows which function it needs to call (here, myTest.add) and what the function's arguments are (here, (2, 8)).
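Note that at this stage the positional and keyword arguments are still inside the serialized body (element 3 above). Here is a sketch of that final decoding step, using plain json for clarity (Celery actually goes through kombu's serializer registry, as _trace_task_ret shows later):

import json

body = b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
content_type, content_encoding = 'application/json', 'utf-8'

# the json body holds (args, kwargs, embed), matching the dump above
args, kwargs, embed = json.loads(body.decode(content_encoding))
print(args, kwargs, embed)   # [2, 8] {} {'callbacks': None, 'errbacks': None, ...}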
Let's sort out the message reading and parsing process:
- The parent process writes a task into the pipe;
- The child process reads it as req;
- The child process parses req into type_, args_;
- The child process parses args_ into job, i, fun, args, kwargs. The fun here is _trace_task_ret; the user-defined function is called inside _trace_task_ret;
- args contains the user-defined function's name and its parameters.
3.3.1 configuring the callback function in the parent process
As mentioned just now, the fun obtained from the first round of parsing is _trace_task_ret; the user-defined function is called inside _trace_task_ret.
We need to see where the callback function fun is configured in the parent process.
As we know from the above, when a task is accepted, task_message_handler hands it to the multiprocessing pool by way of the Request class.
Note: the Worker scope in this figure is celery/apps/worker.py; it belongs to Celery's logical layer and is not a child-process concept. There are several classes with the same name in Celery, which is quite confusing.
     +-----------+
     | Consumer  |
     +-----+-----+
           |  message
           v
 +---------+--------+   strategy   +--------------------------------------+
 | on_task_received | <----------- | strategies                           |
 +---------+--------+              | [myTest.add : task_message_handler]  |
           |                       +--------------------------------------+
           |  strategy
           v
 +---------+------------+  Request [myTest.add]  +---------------------+
 | task_message_handler | <--------------------- | create_request_cls  |
 +---------+------------+                        +---------------------+
           |  _process_task_sem
-----------+---------------------------------------------------- Worker
           |  req[{Request} myTest.add]
           v
 +---------+------+
 | WorkController |
 |      pool      |
 +---------+------+
           |  apply_async
           v
 +---------+-------+      myTest.add      +----------------------+
 |    TaskPool     | <------------------- | {Request} myTest.add |
 +-----------------+                      +----------------------+
The apply_async called at this point is actually the pool.apply_async method.
In the Request class's execute_using_pool, we find that the first argument of pool.apply_async is trace_task_ret, so trace_task_ret must be the function that the parent process passes across.
class Request:
    """A request for task execution."""

    def execute_using_pool(self, pool, **kwargs):
        """Used by the worker to send this task to the pool."""
        result = pool.apply_async(
            trace_task_ret,   # here it is
            args=(self._type, task_id, self._request_dict, self._body,
                  self._content_type, self._content_encoding),   # the user function info is in here
            accept_callback=self.on_accepted,
            timeout_callback=self.on_timeout,
            callback=self.on_success,
            error_callback=self.on_failure,
            soft_timeout=soft_time_limit or task.soft_time_limit,
            timeout=time_limit or task.time_limit,
            correlation_id=task_id,
        )
        # cannot create weakref to None
        self._apply_result = maybe(ref, result)
        return result
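The pattern here is worth noting: what crosses the process boundary is a module-level dispatcher plus the task name, not the task object itself, so the child can look the task up in its own registry. A hedged standard-library sketch of the same idea (dispatcher and the TASKS dict are hypothetical stand-ins for trace_task_ret and app.tasks):

from multiprocessing import Pool

def add(x, y):
    return x + y

TASKS = {'myTest.add': add}   # stand-in for app.tasks

def dispatcher(name, args, kwargs):
    # what crosses the process boundary is this function plus the *name*;
    # the task itself is looked up on the child side, like app.tasks[name]
    return TASKS[name](*args, **kwargs)

if __name__ == '__main__':
    with Pool(2) as pool:
        result = pool.apply_async(dispatcher, args=('myTest.add', (2, 8), {}))
        print(result.get())   # 10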
3.4 call function
As we know from the above, the function the Pool calls is _trace_task_ret; that is, _trace_task_ret is a uniform outer wrapper around user functions. As far as the Pool is concerned, it simply calls _trace_task_ret, and _trace_task_ret calls the user function internally.
Why not call the user function myTest.add directly instead of adding another layer with _trace_task_ret? As the word "trace" in its name suggests, this is an overall trade-off among extensibility, debugging, tracing and running speed.
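What such a wrapper buys can be sketched in a few lines: a single choke point where every task can be timed, traced, and shielded so that an exception never escapes into the pool machinery (illustrative only, not Celery's implementation):

import time

def trace_wrapper(fun, *args, **kwargs):
    # one place to time, trace and catch errors for *any* user function
    start = time.monotonic()
    try:
        retval, state = fun(*args, **kwargs), 'SUCCESS'
    except Exception as exc:
        retval, state = exc, 'FAILURE'   # the pool never sees a raw exception
    return state, retval, time.monotonic() - start

print(trace_wrapper(lambda x, y: x + y, 2, 8))   # ('SUCCESS', 10, ...)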
There are two core pieces of code:
3.4.1 get the Celery application
The first key point is obtaining the Celery application that was set up in the child process in advance. The code is as follows:
app = app or current_app._get_current_object()
Here a question arises: how can the child process get hold of the Celery application that lives in the parent process?
Although some multiprocessing start methods copy the parent's variables into the child (fork, for example), others do not (spawn, as used on Windows), so there has to be an explicit mechanism for the parent process to hand the Celery application to the child process.
For a detailed analysis of how the parent process configures the Celery application for the child process and how the child process obtains it, see the previous article.
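One common mechanism, and roughly the approach taken by prefork pools, is a process initializer: the parent passes the application to a function that runs once inside every freshly started child. A hedged standard-library sketch (set_current_app and the string app are illustrative stand-ins, not Celery's code):

import multiprocessing as mp

_current_app = None   # child-side global, stand-in for Celery's current-app state

def set_current_app(app):
    """Pool initializer: runs once in every child process."""
    global _current_app
    _current_app = app

def run_task(x, y):
    # the task body can now rely on the app the parent configured
    return f'{_current_app}: {x + y}'

if __name__ == '__main__':
    with mp.Pool(2, initializer=set_current_app, initargs=('myTest',)) as pool:
        print(pool.apply(run_task, (2, 8)))   # myTest: 10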
3.4.2 obtaining tasks
The second key point is how to obtain the registered task so that it can be executed. The code is as follows:
R, I, T, Rstr = trace_task(app.tasks[name], uuid, args, kwargs, request, app=app)
Here, app.tasks is the registry populated in advance: it holds all the tasks of the Celery application, both built-in tasks and user tasks.
So app.tasks[name] retrieves the task object itself by its task name.
app.tasks = {TaskRegistry: 9}
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x1bfae596d48>
 'celery.chord' = {chord} <@task: celery.chord of myTest at 0x1bfae596d48>
 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x1bfae596d48>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x1bfae596d48>
 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x1bfae596d48>
 'celery.group' = {group} <@task: celery.group of myTest at 0x1bfae596d48>
 'celery.map' = {xmap} <@task: celery.map of myTest at 0x1bfae596d48>
 'celery.chain' = {chain} <@task: celery.chain of myTest at 0x1bfae596d48>
 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x1bfae596d48>
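You can observe this registry directly. A small example (assuming Celery is installed and this file is run as a script; because the app is named myTest, a task defined in __main__ is registered under myTest.add):

from celery import Celery

app = Celery('myTest')

@app.task
def add(x, y):
    return x + y

# the @app.task decorator registered the task under its fully qualified name
print(app.tasks['myTest.add'])         # <@task: myTest.add of myTest at 0x...>
print(app.tasks['myTest.add'](2, 8))   # 10  (calling the task object runs it locally)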
The logic is as follows:
           +
           |
           v
 +---------+----------+
 | billiard.pool.Pool |
 +---------+----------+
           |
 +---------+-------------+
 |      TaskHandler      |   self._taskqueue.put
 |      _taskqueue       | <---------------------+
 +---------+-------------+
           |  put(task)                                           Pool
-----------+----------------------------------------------------------
           |  get        billiard.pool.Worker            Sub process
           v
 +---------+----------+   +---------------------------------------------------+
 |     workloop       |   | app.tasks                                         |
 |                    |   |                                                   |
 |   wait_for_job     |   | 'celery.chord'  = @task: celery.chord of myTest   |
 |                    |   | 'celery.chunks' = @task: celery.chunks of myTest  |
 |   app.tasks[name] <----+ 'celery.group'  = @task: celery.group of myTest   |
 |                    |   |  ......                                           |
 +--------------------+   +---------------------------------------------------+
3.4.3 call task
Now that we know which task to call, let's see how to call it.
3.4.3.1 obtaining the task function
As can be seen above, the callback function passed in from the parent process is:
fun = {function} <function _trace_task_ret at 0x000001BFAE53EA68>
_trace_task_ret is defined in celery/app/trace.py.
Its logic is:

- Get the Celery application into app.
- Extract the message content and use it to update the request, for example:

request = {dict: 26}
 'lang' = {str} 'py'
 'task' = {str} 'myTest.add'
 'id' = {str} 'a8928c1e-1e56-4502-9929-80a01b1bbfd8'
 'shadow' = {NoneType} None
 'eta' = {NoneType} None
 'expires' = {NoneType} None
 'group' = {NoneType} None
 'group_index' = {NoneType} None
 'retries' = {int} 0
 'timelimit' = {list: 2} [None, None]
 'root_id' = {str} 'a8928c1e-1e56-4502-9929-80a01b1bbfd8'
 'parent_id' = {NoneType} None
 'argsrepr' = {str} '(2, 8)'
 'kwargsrepr' = {str} '{}'
 'origin' = {str} 'gen17060@DESKTOP-0GO3RPO'
 'reply_to' = {str} '5a520373-7712-3326-9ce8-325df14aa2ad'
 'correlation_id' = {str} 'a8928c1e-1e56-4502-9929-80a01b1bbfd8'
 'hostname' = {str} 'DESKTOP-0GO3RPO'
 'delivery_info' = {dict: 4} {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}
 'args' = {list: 2} [2, 8]
 'kwargs' = {dict: 0} {}
 'is_eager' = {bool} False
 'callbacks' = {NoneType} None
 'errbacks' = {NoneType} None
 'chain' = {NoneType} None
 'chord' = {NoneType} None
 __len__ = {int} 26

- Get the user task by its task name.
- Call the user task with the request.
The specific code is as follows:
def trace_task(task, uuid, args, kwargs, request={}, **opts):
    """Trace task execution."""
    try:
        if task.__trace__ is None:
            task.__trace__ = build_tracer(task.name, task, **opts)
        return task.__trace__(uuid, args, kwargs, request)   # call the tracer set up in update_strategy
    except Exception:
        ...   # error handling elided in this excerpt


def _trace_task_ret(name, uuid, request, body, content_type,
                    content_encoding, loads=loads_message, app=None,
                    **extra_request):
    app = app or current_app._get_current_object()   # get the Celery app
    embed = None
    if content_type:
        accept = prepare_accept_content(app.conf.accept_content)
        args, kwargs, embed = loads(
            body, content_type, content_encoding, accept=accept,
        )
    else:
        args, kwargs, embed = body
    request.update({
        'args': args, 'kwargs': kwargs,
        'hostname': hostname, 'is_eager': False,
    }, **embed or {})
    R, I, T, Rstr = trace_task(app.tasks[name],
                               uuid, args, kwargs, request, app=app)   # call trace_task to execute the task
    return (1, R, T) if I else (0, Rstr, T)


trace_task_ret = _trace_task_ret
The variables are:
accept = {set: 1} {'application/json'}
app = {Celery} <Celery myTest at 0x1bfae596d48>
args = {list: 2} [2, 8]
body = {bytes: 81} b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
content_encoding = {str} 'utf-8'
content_type = {str} 'application/json'
embed = {dict: 4} {'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
extra_request = {dict: 0} {}
kwargs = {dict: 0} {}
loads = {method} <bound method SerializerRegistry.loads of <kombu.serialization.SerializerRegistry object at 0x000001BFAE329408>>
name = {str} 'myTest.add'
request = {dict: 26} {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', ...
uuid = {str} '2c6d431f-a86a-4972-886b-472662401d20'
3.4.3.2 call task
trace_task is used to make the call; it is defined as follows:
def trace_task(task, uuid, args, kwargs, request=None, **opts):
    """Trace task execution."""
    request = {} if not request else request
    try:
        if task.__trace__ is None:
            task.__trace__ = build_tracer(task.name, task, **opts)
        return task.__trace__(uuid, args, kwargs, request)
    except Exception:
        ...   # error handling elided in this excerpt
The tracer installed in update_strategy is:
task.__trace__ = build_tracer(name, task, loader, self.hostname, app=self.app)
Part of the build_tracer function is:
def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 Info=TraceInfo, eager=False, propagate=False, app=None,
                 monotonic=monotonic, truncate=truncate,
                 trace_ok_t=trace_ok_t, IGNORE_STATES=IGNORE_STATES):

    fun = task if task_has_custom(task, '__call__') else task.run   # get the run function of the task
    ...

    def trace_task(uuid, args, kwargs, request=None):
        # R      - is the possibly prepared return value.
        # I      - is the Info object.
        # T      - runtime
        # Rstr   - textual representation of return value
        # retval - is the always unmodified return value.
        # state  - is the resulting task state.

        # This function is very long because we've unrolled all the calls
        # for performance reasons, and because the function is so long
        # we want the main variables (I, and R) to stand out visually from
        # the rest of the variables, so breaking PEP8 is worth it ;)
        R = I = T = Rstr = retval = state = None
        task_request = None
        time_start = monotonic()
        ...
        # -*- TRACE -*-
        try:
            R = retval = fun(*args, **kwargs)   # execute the actual task function
            state = SUCCESS
        except Reject as exc:
            ...

    return trace_task
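The pattern to notice here is the lazy, cached closure: build_tracer runs at most once per task, the resulting trace_task closure is stashed on task.__trace__, and every later execution reuses it, so the hot path pays no setup cost. A minimal sketch of that caching pattern (all names illustrative, not Celery's code):

def build_tracer(name, fun):
    # built once per task; the closure captures everything the hot path needs
    def trace_task(args, kwargs):
        try:
            return 'SUCCESS', fun(*args, **kwargs)
        except Exception as exc:
            return 'FAILURE', exc
    return trace_task

class Task:
    __trace__ = None
    def __init__(self, name, run):
        self.name, self.run = name, run

task = Task('myTest.add', lambda x, y: x + y)

if task.__trace__ is None:                               # mirrors trace_task() above
    task.__trace__ = build_tracer(task.name, task.run)   # build once, cache
print(task.__trace__((2, 8), {}))                        # ('SUCCESS', 10)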
In build_tracer above, the fun that trace_task finally calls is the function the task was meant to execute (myTest.add). At this point the task runs and the function's return value is obtained.
With that, one complete message-consumption cycle is finished.
Starting with the next article, we will introduce some of Celery's auxiliary features, such as load balancing and fault tolerance.