Preface
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, and it provides the tools needed to operate and maintain such a system. It is a task queue focused on real-time processing, with support for task scheduling.
Typical usage scenarios include:
- Sending e-mail asynchronously: just submit the task to Celery, and a worker sends the e-mail in the background (see the sketch after this list)
- Long-running batch interface jobs, which can also be turned into asynchronous tasks
- Scheduled (periodic) tasks, etc.
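As a minimal sketch of the asynchronous e-mail scenario (the broker URL is a placeholder and the "sending" is only simulated, not a real mail setup):

```python
from celery import Celery

# Illustrative only: the broker URL is an assumption and the mail sending
# is simulated with a print instead of a real mail library.
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def send_email(to, subject):
    print('sending "%s" to %s' % (subject, to))

# The caller just pushes the task onto the queue and returns immediately;
# a worker process picks it up and does the slow work in the background.
send_email.delay('user@example.com', 'Welcome')
```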
Introduction to Celery
Celery involves both producers and consumers, so let's first understand the producer-consumer model.
The model also needs a buffer between producers and consumers as an intermediary. The producer puts the data into the buffer, and the consumer takes the data out of the buffer, as shown in the figure below:
Next, a few questions need answering: who produces the data (Task), what sits in the middle (Broker), who consumes the data (Worker), and where do the results end up after consumption (Backend)?
The picture below makes this clear.
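Before mapping these roles onto Celery, the producer-consumer model itself can be sketched with Python's standard queue module (purely illustrative, no Celery involved):

```python
import queue
import threading

buf = queue.Queue()  # the buffer sitting between producer and consumer

def producer():
    for i in range(3):
        buf.put(i)            # producer puts data into the buffer
        print('produced', i)

def consumer():
    for _ in range(3):
        item = buf.get()      # consumer takes data out of the buffer
        print('consumed', item)
        buf.task_done()

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
buf.join()                    # wait until everything produced is consumed
```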
Five roles of Celery
- Task: the unit of work, covering both asynchronous tasks and scheduled (Celery Beat) tasks
- Broker: receives messages (i.e. tasks) from the producer and stores them in a queue; the consumer of the tasks is the Worker. Celery itself does not provide a queue service, so Redis or RabbitMQ is recommended.
- Worker: the unit that executes tasks; it monitors the message queue in real time and, when a task arrives, fetches and runs it.
- Beat: the periodic task scheduler; it sends tasks to the Broker on the configured schedule.
- Backend: stores the execution results of tasks.
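To make the five roles concrete, here is a minimal sketch of where each one appears when configuring a Celery app (the Redis URLs and the 30-second schedule are illustrative assumptions, not from the tutorial):

```python
from datetime import timedelta
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',   # Broker: queues the tasks
    backend='redis://localhost:6379/1',  # Backend: stores task results
)

@app.task
def add(x, y):        # Task: the unit of work a Worker will execute
    return x + y

# Beat: periodic scheduler that pushes tasks to the Broker on a timer
app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (2, 3),
    },
}

# Worker: started from the command line, e.g.
#   celery -A tasks worker --loglevel=info
#   celery -A tasks beat    (starts the Beat scheduler)
```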
Environment setup
Celery itself does not provide a queue service; Redis or RabbitMQ is recommended for that, so install a middleware such as Redis first:
docker pull redis:latest
docker run -itd --name redis-test -p 6379:6379 redis
The command above starts Redis without a password; use the following command to set one:
docker run -itd --name myredis -p 6379:6379 --restart=always redis --requirepass "123456" --appendonly yes
Install the dependencies with pip:
pip install celery==3.1.26.post2
pip install redis==2.10.6
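As a quick sanity check that the broker is reachable before starting Celery, you can ping Redis from Python (host, port and password below are assumptions matching the docker example above):

```python
import redis

# Connect to the Redis container started above; drop the password if none was set.
r = redis.StrictRedis(host='localhost', port=6379, password='123456')
print(r.ping())  # True means the broker connection works
```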
Task
First write the simplest demo: create a new tasks.py file. A task function needs the @shared_task decorator.
from celery import Celery
from celery import shared_task

# Instantiate and add the broker address
app = Celery('tasks', broker='redis://ip:6379')


@shared_task
def add(x, y):
    return x + y
cd into the directory containing tasks.py and start the worker. The -A parameter is the name of the Celery app, which here refers to tasks.py.
The worker is the role that executes tasks; --loglevel=info sets the worker's log level.
celery -A tasks worker --loglevel=info
Output:
D:\demo\demo\aaa>celery -A tasks worker --loglevel=info
[2021-10-19 09:12:01,168: WARNING/MainProcess] e:\python36\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x2148e024c50
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2021-10-19 09:12:01,276: INFO/MainProcess] Connected to redis://localhost:6379//
[2021-10-19 09:12:01,365: INFO/MainProcess] mingle: searching for neighbors
[2021-10-19 09:12:02,761: INFO/MainProcess] mingle: all alone
[2021-10-19 09:12:03,313: WARNING/MainProcess] celery@DESKTOP-HJ487C8 ready.
The startup log shows the registered tasks:
[tasks]
  . tasks.add
Seeing Connected to redis indicates that the broker connection succeeded.
Trigger task (delay)
The task has been defined, but when is it triggered? We trigger it in code; continue adding to the script above:
@shared_task
def add(x, y):
    return x + y


if __name__ == '__main__':
    # Trigger task
    res = add.delay(10, 15)
    print(res)
    print(type(res))  # AsyncResult
Output:
7492f49b-6735-46fb-a16d-9ec24bd31e56
<class 'celery.result.AsyncResult'>
Calling the .delay() method on the add task triggers it and returns an AsyncResult object; the task's execution result is then accessed through that AsyncResult.
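Note that .delay() is a shortcut for apply_async(), which also accepts extra options such as a countdown (a small sketch, values are illustrative):

```python
# Equivalent ways to trigger the task; apply_async allows extra options.
res = add.delay(10, 15)
res = add.apply_async(args=(10, 15), countdown=5)  # execute 5 seconds later
```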
While it runs, check the worker log: a Received task line shows the task has been picked up, and each task gets a unique uuid task_id.
[2021-10-19 09:24:14,356: INFO/MainProcess] Received task: tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98]
[2021-10-19 09:24:14,395: INFO/MainProcess] Task tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98] succeeded in 0.046999999998661224s: 25
The worker automatically listens for pushed tasks and executes them; the log shows the task succeeded.
Backend task results
After calling .delay() to trigger a task, the returned AsyncResult object can be used to view the task's status, id and result.
D:\demo\demo\aaa>python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> res = add.delay(10, 15)
>>> res.task_id
'6a7c8e10-7192-4865-9108-3e98596b9d37'
>>>
>>> res.status
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "E:\python36\lib\site-packages\celery\result.py", line 394, in state
    return self._get_task_meta()['status']
  File "E:\python36\lib\site-packages\celery\result.py", line 339, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "E:\python36\lib\site-packages\celery\backends\base.py", line 307, in get_task_meta
    meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
>>>
A task_id is generated after the task is triggered, but querying the task's status raises an exception: AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'.
This is because the task's result has to be saved to a backend, but the app was instantiated with only a broker address; no backend address was configured to receive the result.
from celery import Celery
from celery import shared_task

# backend receives the execution result
app = Celery('tasks',
             broker='redis://ip:6379',
             backend='redis://ip:6379')


@shared_task
def add(x, y):
    return x + y
After changing the configuration, be sure to restart the worker:
celery -A tasks worker --loglevel=info
In the startup log's [config] section you will now see that the results backend is configured. Trigger the task again:
D:\demo\demo\aaa>python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> res = add.delay(10,15)
>>> res.task_id
'5ce249c9-a15b-426a-949b-d1b94bf9f8fa'
>>>
>>> res.status
'SUCCESS'
>>>
>>> res.get()
25
Several common attributes
- res.task_id: the unique task id; the result can be fetched later by this id
- res.status: the task status: PENDING, STARTED, RETRY, FAILURE, SUCCESS
- res.get(): the task's execution result; only available once the status is SUCCESS
- res.successful(): returns a Boolean; True if the task executed successfully
- res.ready(): returns a Boolean; True once the task has finished, otherwise False
- res.wait(): waits for the task to complete and returns the execution result
- res.result: the task's execution result
- res.state: same as res.status; the task status (PENDING, STARTED, SUCCESS, ...)
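A short sketch pulling these attributes together (the add task is the one defined above; output values are illustrative):

```python
from tasks import add

res = add.delay(10, 15)
print(res.task_id)          # unique id of this task
print(res.status)           # PENDING at first, SUCCESS once finished
print(res.ready())          # True when the task has completed
print(res.get(timeout=10))  # blocks until the result (25) is available
print(res.successful())     # True if the task ran without raising
print(res.result)           # same value as res.get() after success
```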
Get results with AsyncResult
When a task is triggered you get a task_id, but you will not always poll the status right away; you may come back and check the result some time later.
So, given a known task_id, how do you query the status and result? Use the AsyncResult class.
from celery.result import AsyncResult

res = AsyncResult(id='5ce249c9-a15b-426a-949b-d1b94bf9f8fa')
print(res.state)  # 'SUCCESS'
print(res.get())  # 25
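If AsyncResult cannot locate the result backend (for example when used outside tasks.py), you can pass the Celery app explicitly; a minimal sketch, assuming app is the instance defined in tasks.py:

```python
from celery.result import AsyncResult
from tasks import app  # the Celery instance configured with the backend

res = AsyncResult('5ce249c9-a15b-426a-949b-d1b94bf9f8fa', app=app)
print(res.state)  # 'SUCCESS'
print(res.get())  # 25
```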
For use with Django, refer to the previous article: https://www.cnblogs.com/yoyoketang/p/15422804.html
More reference tutorials https://blog.csdn.net/u010339879/article/details/97691231
More reference tutorials https://www.pianshen.com/article/2176289575/