Introduction to the Python asynchronous task framework Celery

Brief introduction

 

Celery is a distributed task queue framework written in Python.

 

It has several main concepts:

celery application

  • A user-written script that defines the tasks to be executed and sends them to the message queue through the broker.

broker

  • The broker mediates between clients and workers through a message queue.
  • Celery itself does not include a message queue; it supports RabbitMQ, Redis, Amazon SQS, and Zookeeper.
  • For more information about brokers, see the official documentation.

backend

  • The result store (usually a database), used to save the results returned by tasks.

worker

  • The worker process, which executes tasks assigned by the broker.

task

  • Tasks: the defined units of work to be performed.

Version requirements

Celery 5.1 requires:

  • Python (3.6, 3.7, 3.8)

Celery is a project with minimal funding, so Microsoft Windows is not supported.

See the official documentation for more detailed version requirements.

 

Install

Install using pip:

pip install -U Celery

Bundles

Celery also defines a set of bundles for installing Celery together with the dependencies for a given feature.

You can specify these dependencies using brackets in the pip command:

pip install "celery[librabbitmq]"
pip install "celery[librabbitmq,redis,auth,msgpack]"

See the official documentation for the full list of supported bundles.

Simple use

1. Select a broker

To use Celery, you first need to choose a message queue. Install any message queue you are familiar with from those Celery supports, as listed above.

2. Write a celery application

First, we need to write a Celery application that creates tasks and manages workers; it can then be imported by other modules.

Create a tasks.py file:

from celery import Celery


app = Celery('tasks', broker='redis://localhost:6379/0')


@app.task
def add(x, y):   
    return x + y

The first argument, tasks, is the name of the current module; it can be omitted, but using the current module name is recommended.

The second keyword argument, broker='redis://localhost:6379/0', specifies that we use Redis as the message queue and gives its connection address.

3. Run the Celery worker service

cd into the directory containing tasks.py, then run the following command to start the worker service:

celery -A tasks worker --loglevel=INFO

4. Call task

>>> from tasks import add
>>> add.delay(4,4)

Execute a task by calling its delay() method. Celery sends the execution message to the broker, and the broker delivers it to the worker service for execution. If everything is normal, you will see the task being received and executed in the worker service's log output.

5. Save results

If you want to track task state and save the results tasks return, Celery needs to send them somewhere. Celery provides a variety of result backends.

Let's take Redis as an example: modify tasks.py to add a Redis backend.

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

See the official documentation for more result backends.

Restart the worker service and reopen the Python interpreter:

>>> from tasks import add
>>> result = add.delay(4,4)

The ready() method returns whether the task has completed:

>>> result.ready()
False

You can also wait for the result to complete, but this is rarely used because it turns the asynchronous call into a synchronous one:

>>> result.get(timeout=1)
8

 

Using Celery in applications

Create project

Project structure:

proj/__init__.py
    /celery.py
    /tasks.py

proj/celery.py

from celery import Celery


app = Celery('proj',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1',
             include=['proj.tasks'])

# Configuration
app.conf.update(
    result_expires=3600,  # Result expiration time
)

In this module, we created a Celery application instance. To use Celery in your project, just import this instance.

proj/tasks.py

from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

Start worker

celery -A proj worker -l INFO

Call task

>>> from proj.tasks import add
>>> add.delay(2, 2)

Using Celery in Django

To use Celery in your Django project, you first need to define a Celery instance.

If your Django project looks like this:

- proj/ 
 - manage.py
 - proj/ 
   - __init__.py
   - settings.py
   - urls.py

The recommended method is to create a new proj/proj/celery.py module to define the Celery instance. File: proj/proj/celery.py

import os


from celery import Celery


# Set the default Django settings module for the 'celery' program
os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')


app = Celery('proj')


# Set configuration source
app.config_from_object('django.conf:settings',namespace='CELERY')


# Load tasks in all registered django applications
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self): 
   print(f'Request: {self.request!r}')

Then you need to import this application in your proj/proj/__init__.py module. This ensures the app is loaded when Django starts, so that the @shared_task decorator can use it.

proj/proj/__init__.py:

from .celery import app as celery_app
__all__ = ('celery_app',)

Note that this sample project layout is suitable for larger projects; for simple projects, you can use a single module that defines both the application and the tasks.

Next, let's walk through celery.py. First, we set the default value of the DJANGO_SETTINGS_MODULE environment variable for the celery command-line program:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

This line loads the environment settings of the current Django project, which matters especially when the ORM is used in asynchronous tasks. It must come before creating the application instance.

app = Celery('proj')

We also add the Django settings module as the configuration source for Celery. This means we don't need a separate configuration file; Celery can be configured directly in Django's settings.

app.config_from_object('django.conf:settings', namespace='CELERY')

An uppercase namespace means that all Celery configuration options must be specified in uppercase and prefixed with CELERY_; so, for example, the broker_url setting becomes CELERY_BROKER_URL.

For example, a configuration file for a Django project might include:

settings.py

CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30*60

Next, a common practice for reusable apps is to define all tasks in a separate tasks.py module. Celery has a way to automatically discover these modules:

app.autodiscover_tasks()

With the line above, Celery follows the tasks.py convention to automatically discover tasks from all installed apps:

- app1/
   - tasks.py
   - models.py
- app2/
   - tasks.py
   - models.py

This eliminates the need to manually add individual modules to the CELERY_IMPORTS setting.

Use the @shared_task decorator

The tasks we write may live in reusable apps, which cannot depend on the project itself, so we cannot directly import the Celery application instance.

The @shared_task decorator lets us create tasks without any specific Celery instance. File: demoapp/tasks.py

# Create your tasks here

from celery import shared_task

from demoapp.models import Widget


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)


@shared_task
def count_widgets():
    return Widget.objects.count()


@shared_task
def rename_widget(widget_id, name):
    w = Widget.objects.get(id=widget_id)
    w.name = name
    w.save()


Added by joey3002 on Mon, 13 Dec 2021 13:54:01 +0200