Python test development Django-158. Celery learning and use

Preface

Celery is a simple, flexible, and reliable distributed system for processing large numbers of messages, and it provides the tools needed to operate and maintain such a system. It is a task queue focused on real-time processing that also supports task scheduling.
Typical usage scenarios include:

  • Sending e-mail asynchronously: submit the task to Celery and a worker sends the mail in the background (a minimal sketch follows this list)
  • Long-running batch or interface jobs, which can also be made into asynchronous tasks
  • Scheduled (periodic) tasks, etc.
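For illustration only, a minimal sketch of the first scenario might look like the following. It assumes a Django project with e-mail settings already configured; the send_welcome_email task and the addresses are hypothetical and not part of the original article.

from celery import shared_task
from django.core.mail import send_mail  # assumes Django e-mail settings are configured


# Hypothetical task: the web view only enqueues it; a worker actually sends the mail
@shared_task
def send_welcome_email(to_address):
    send_mail(
        subject='Welcome',
        message='Thanks for signing up!',
        from_email='noreply@example.com',
        recipient_list=[to_address],
    )

# In a view: send_welcome_email.delay('user@example.com')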

Introduction to Celery

Celery is built on the producer-consumer model, so let's first understand what that model is.
The model places a buffer between producers and consumers as an intermediary: the producer puts data into the buffer, and the consumer takes data out of it.

Next, we need to answer several questions: who produces the data (the Task), who acts as the intermediary (the Broker), who consumes the data (the Worker), and where do the results end up after the task runs (the Backend)?

The five roles described below make this clear.

Five roles of Celery

  • Task: the unit of work, including asynchronous tasks and scheduled (Beat) tasks
  • Broker: the message middleman; it receives tasks from the producer and stores them in a queue, from which the Worker consumes them. Celery itself does not provide a queue service; Redis or RabbitMQ is recommended. (A configuration sketch mapping these roles onto one app follows this list.)
  • Worker: the unit that executes tasks. It monitors the message queue in real time and executes any task it finds there.
  • Beat: the periodic-task scheduler; it sends tasks to the Broker on the schedule defined in the configuration.
  • Backend: stores the execution results of tasks.
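To make the roles concrete, here is a minimal configuration sketch that maps them onto one Celery app. The hosts, the schedule, and the example task are placeholders rather than part of the original article; CELERYBEAT_SCHEDULE is the setting name used by the Celery 3.x series installed below.

from datetime import timedelta

from celery import Celery

# Broker: where tasks are queued; Backend: where results are stored
app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')


# Task: the unit of work that a Worker executes
@app.task
def add(x, y):
    return x + y


# Beat: scheduler entry that pushes tasks.add to the Broker every 30 seconds
app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16),
    },
}

The Worker and the Beat scheduler then run as separate processes, e.g. celery -A tasks worker --loglevel=info and celery -A tasks beat.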

Environment setup

Celery itself does not provide a queue service; Redis or RabbitMQ is recommended for that, so first install middleware such as Redis:

docker pull redis:latest
docker run -itd --name redis-test -p 6379:6379 redis

The command above starts Redis without a password. Use the following command to start it with a password:

docker run -itd --name myredis -p 6379:6379 --restart=always redis --requirepass "123456" --appendonly yes
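If Redis is started with a password, the broker (and backend) URL must include it. A minimal sketch, assuming the password 123456 set above and a local Redis:

from celery import Celery

# URL format: redis://:password@host:port/db_number
app = Celery('tasks', broker='redis://:123456@localhost:6379/0')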

Install the dependent packages with pip:

pip install celery==3.1.26.post2
pip install redis==2.10.6
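Before starting a worker, you can quickly verify that the Redis service is reachable with the redis package just installed (host and password below are placeholders):

import redis

# Quick connectivity check with redis-py
r = redis.StrictRedis(host='localhost', port=6379, password=None)
print(r.ping())  # prints True when Redis is reachable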

Task

First write the simplest demo: create a new tasks.py file. The task function needs the @shared_task decorator.

from celery import Celery
from celery import shared_task

# Instantiate Celery and set the broker address (replace ip with your Redis host, e.g. localhost)
app = Celery('tasks', broker='redis://ip:6379')


@shared_task
def add(x, y):
    return x + y

From the directory containing tasks.py, start the worker. The -A parameter is the name of the Celery app, which here refers to the tasks.py module;
worker is the role that executes tasks, and --loglevel=info sets the log level to info.

celery -A tasks worker --loglevel=info

Run result

D:\demo\demo\aaa>celery -A tasks worker --loglevel=info
[2021-10-19 09:12:01,168: WARNING/MainProcess] e:\python36\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x2148e024c50
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2021-10-19 09:12:01,276: INFO/MainProcess] Connected to redis://localhost:6379//
[2021-10-19 09:12:01,365: INFO/MainProcess] mingle: searching for neighbors
[2021-10-19 09:12:02,761: INFO/MainProcess] mingle: all alone
[2021-10-19 09:12:03,313: WARNING/MainProcess] celery@DESKTOP-HJ487C8 ready.

From the startup log, you can see the registered tasks:

[tasks]
  . tasks.add

Seeing "Connected to redis" indicates that the connection succeeded.

Trigger task (delay)

Now that the task is created, how do we trigger it? We trigger it in code, continuing the tasks.py above:

@shared_task
def add(x, y):
    return x + y

if __name__ == '__main__':
    # Trigger task
    res = add.delay(10, 15)
    print(res)
    print(type(res))  # AsyncResult

Run result

7492f49b-6735-46fb-a16d-9ec24bd31e56
<class 'celery.result.AsyncResult'>

Calling the .delay() method on the add task triggers it and returns an AsyncResult instance; the result of the executed task is accessed through that AsyncResult.
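As a side note, .delay() is a convenience shortcut for .apply_async(). If you need extra execution options, apply_async can be used instead; the countdown value below is only illustrative:

# Equivalent to add.delay(10, 15)
res = add.apply_async(args=(10, 15))

# Execution options are also supported, e.g. run the task 10 seconds from now
res = add.apply_async(args=(10, 15), countdown=10)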

While it runs, check the worker log: you will see "Received task", and each task is assigned a unique uuid task_id.

[2021-10-19 09:24:14,356: INFO/MainProcess] Received task: tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98]
[2021-10-19 09:24:14,395: INFO/MainProcess] Task tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98] succeeded in 0.046999999998661224s: 25

The worker automatically listens for pushed tasks and executes them; the log shows the execution result "succeeded".

Backend task results

After calling the .delay() method to trigger the task, the returned AsyncResult can be used to view the task's status, id, and result.

D:\demo\demo\aaa>python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.

>>> from tasks import add
>>> res = add.delay(10, 15)
>>> res.task_id
'6a7c8e10-7192-4865-9108-3e98596b9d37'
>>>
>>> res.status
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "E:\python36\lib\site-packages\celery\result.py", line 394, in state
    return self._get_task_meta()['status']
  File "E:\python36\lib\site-packages\celery\result.py", line 339, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "E:\python36\lib\site-packages\celery\backends\base.py", line 307, in get_task_meta
    meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
>>>

A task_id is generated after the task is triggered, but when you query the task's status you get an exception: AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'.
This is because the task's results need to be saved to a backend; when the app was instantiated, only a broker address was configured, and no backend address was configured to receive the results.

from celery import Celery
from celery import shared_task
# backend receives the execution result
app = Celery('tasks',
             broker='redis://ip:6379',
             backend='redis://ip:6379')


@shared_task
def add(x, y):
    return x + y

After reconfiguring, be sure to restart the worker so it picks up the new settings:

celery -A tasks worker --loglevel=info


In the [config] section of the worker startup log, you will see that the results item is now configured.

D:\demo\demo\aaa>python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> res = add.delay(10,15)
>>> res.task_id
'5ce249c9-a15b-426a-949b-d1b94bf9f8fa'
>>>
>>> res.status
'SUCCESS'
>>>
>>> res.get()
25

Several common attributes

  • res.task_id: the unique id of the task; the result can be fetched later by this id
  • res.status: the task status, one of PENDING, STARTED, RETRY, FAILURE, SUCCESS
  • res.get(): the task's return value; only available once the status is SUCCESS
  • res.successful(): returns a Boolean, True if the task executed successfully
  • res.ready(): returns a Boolean, True when the task has finished, otherwise False (a small usage sketch follows this list)
  • res.wait(): waits for the task to complete and returns its result
  • res.result: the task's return value
  • res.state: same as res.status, the task status (PENDING, STARTED, SUCCESS, ...)
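Putting a few of these attributes together, here is a small sketch that waits for the result instead of reading it immediately (the polling interval is arbitrary):

import time

from tasks import add

res = add.delay(10, 15)

# Poll until the task has finished, then read the result
while not res.ready():
    time.sleep(0.5)

if res.successful():
    print(res.get())   # 25
else:
    print(res.state)   # e.g. 'FAILURE'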

Get results with AsyncResult

When a task is triggered you get a task_id, but you won't necessarily poll the status right away; you may want to look at the result some time later.
So, given that you already know the task_id, how do you query the status and result? Use the AsyncResult class:

from celery.result import AsyncResult
from tasks import app  # import the app so the result backend is known

res = AsyncResult(id='5ce249c9-a15b-426a-949b-d1b94bf9f8fa', app=app)
print(res.state)  # 'SUCCESS'
print(res.get())  # 25

For use with Django, refer to the previous article: https://www.cnblogs.com/yoyoketang/p/15422804.html
More reference tutorials https://blog.csdn.net/u010339879/article/details/97691231
More reference tutorials https://www.pianshen.com/article/2176289575/
