Brief introduction
Celery is a distributed task queue framework written in Python.
It has several main concepts:
celery application
- User-written code that defines the tasks to be executed and sends them to the message queue through the broker.
broker
- The broker mediates between clients and workers through a message queue.
- Celery itself does not include a message queue; it supports several, including RabbitMQ, Redis, Amazon SQS, and Zookeeper.
- For more information about brokers, see the official documentation.
backend
- The result store, used to save the results returned by tasks.
worker
- The process that executes the tasks delivered by the broker.
task
- A unit of work to be performed, defined in user code.
Version requirements
Celery 5.1 requires:
- Python (3.6, 3.7, 3.8)
Celery is a project with minimal funding, so Microsoft Windows is not supported.
See the official documentation for more detailed version requirements.
Install
Install using pip:
pip install -U Celery
Bundles
Celery also defines a set of bundles for installing Celery together with a given set of dependencies.
You can specify these dependencies using brackets in the pip command:
pip install "celery[librabbitmq]" pip install "celery[librabbitmq,redis,auth,msgpack]"
See the official documentation for specific supported dependency packages.
Simple usage
1. Select a broker
To use Celery, you first need to choose a message queue. Install whichever of the previously mentioned supported message queues you are familiar with.
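For example, if you choose Redis, one quick way to get a local broker running (a sketch assuming Docker is available; 6379 is the Redis default port) is:

docker run -d -p 6379:6379 redis
pip install "celery[redis]"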
2. Write a Celery application
First, we need to write a Celery application that creates tasks and manages workers; this module must be importable by other modules.
Create a tasks.py file:
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y
The first argument, tasks, is the name of the current module; it can be omitted, but using the current module name is recommended.
The second argument, the keyword broker='redis://localhost:6379/0', specifies Redis as the message queue and gives its connection address.
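Because the app and module are both named tasks, the task's full name is generated automatically from them. A quick check in the interpreter (assuming the tasks.py above):

>>> from tasks import add
>>> add.name
'tasks.add'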
3. Run the Celery worker service
cd to the directory containing tasks.py, then run the following command to start the worker service:
celery -A tasks worker --loglevel=INFO
4. Call task
>>> from tasks import add
>>> add.delay(4, 4)
Execute the task by calling its delay() method. Celery sends the execution message to the broker, and the broker delivers it to the worker service. If everything is normal, you will see the task being received and executed in the worker's log.
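delay() is a shortcut for the more general apply_async(), which accepts extra execution options. For example, countdown delays execution by the given number of seconds:

>>> add.apply_async((4, 4), countdown=10)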
5. Save results
If you want to track task state and store the values tasks return, Celery needs to send them somewhere. Celery provides a variety of result backends.
Let's take Redis as an example: modify tasks.py to add a Redis backend.
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
See the official documentation for the other available result backends.
Restart the worker service and reopen the Python interpreter:
>>> from tasks import add
>>> result = add.delay(4, 4)
The ready() method reports whether the task has completed:
>>> result.ready()
False
You can also block until the result is ready, but this is rarely used because it turns the asynchronous call into a synchronous one:
>>> result.get(timeout=1)
8
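Note that if the task raised an exception, get() re-raises it by default. A small sketch of inspecting the outcome without re-raising, using the standard AsyncResult attributes:

>>> result.get(timeout=1, propagate=False)
8
>>> result.state
'SUCCESS'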
Using Celery in applications
Create project
Project structure:
proj/__init__.py
    /celery.py
    /tasks.py
proj/celery.py
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1',
             include=['proj.tasks'])

# Configuration
app.conf.update(
    result_expires=3600,  # Result expiration time in seconds
)
In this module we created a Celery application instance. To use Celery in your project, just import this instance.
proj/tasks.py
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)
Start worker
celery -A proj worker -l INFO
Call task
>>> from proj.tasks import add
>>> add.delay(2, 2)
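Because this project configures a result backend, you can also compose the tasks with Celery's canvas primitives. A brief sketch chaining add and mul via task signatures (the result of add is passed as the first argument of mul):

>>> from celery import chain
>>> from proj.tasks import add, mul
>>> chain(add.s(2, 2), mul.s(10))().get(timeout=5)
40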
Using Celery in Django
To use Celery in your Django project, you first need to define a Celery instance.
If you have a Django project laid out as follows:
- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py
The recommended approach is to create a new proj/proj/celery.py module to define the Celery instance.
File: proj/proj/celery.py
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Use Django's settings as the configuration source
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load tasks from all registered Django apps
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
Then you need to import this application in your proj/proj/__init__.py module. This ensures the app is loaded when Django starts, so that the @shared_task decorator (shown later) can use it.
proj/proj/__init__.py:
from .celery import app as celery_app

__all__ = ('celery_app',)
Note that this sample project layout is suitable for larger projects; for simple projects you can use a single module that defines both the application and the tasks.
Next, let's walk through celery.py. First, we set a default value for the DJANGO_SETTINGS_MODULE environment variable used by the celery command-line program:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
This line loads the settings of the current Django project, which is especially important when the ORM is used in asynchronous tasks. It must come before the application instance is created.
app = Celery('proj')
We also use Django's settings module as Celery's configuration source. This means we don't need multiple configuration files; Celery is configured directly in Django's settings file.
app.config_from_object('django.conf:settings', namespace='CELERY')
An uppercase namespace means that all Celery configuration options must be written in uppercase and prefixed with CELERY_. So, for example, the broker_url setting becomes CELERY_BROKER_URL.
For example, a configuration file for a Django project might include:
settings.py
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
Next, a common practice for reusable apps is to define all tasks in a separate tasks.py module, and Celery has a way to automatically discover these modules:
app.autodiscover_tasks()
With the line above, Celery follows the tasks.py convention and automatically discovers tasks from all installed apps:
- app1/
  - tasks.py
  - models.py
- app2/
  - tasks.py
  - models.py
This eliminates the need to manually add individual modules to the CELERY_IMPORTS setting.
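With the instance defined and autodiscovery in place, you can start a worker for the Django project from the directory containing manage.py, the same way as before:

celery -A proj worker -l INFO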
Using the @shared_task decorator
The tasks we write may live in reusable apps, and a reusable app cannot depend on the project itself, so we cannot import the Celery application instance directly.
The @shared_task decorator lets us create tasks without any concrete Celery instance.
demoapp/tasks.py:
# Create your tasks here
from demoapp.models import Widget

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

@shared_task
def count_widgets():
    return Widget.objects.count()

@shared_task
def rename_widget(widget_id, name):
    w = Widget.objects.get(id=widget_id)
    w.name = name
    w.save()
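Once Django is configured and a worker is running, shared tasks are called exactly like app-bound tasks; a quick sketch assuming demoapp is in INSTALLED_APPS:

>>> from demoapp.tasks import count_widgets
>>> count_widgets.delay()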