How to use Celery to implement distributed task scheduling
Overview:
Celery is one of the most widely used distributed task queue libraries in Python and can be used to implement asynchronous task scheduling. This article introduces how to use Celery to implement distributed task scheduling, with code examples attached.
First, we need to install the Celery library. Celery can be installed through the following command:
pip install celery
After the installation is complete, we need to create a Celery configuration file. Create a file called celeryconfig.py
and add the following content:
broker_url = 'amqp://guest@localhost//'         # RabbitMQ server address
result_backend = 'db+sqlite:///results.sqlite'  # Result storage backend (a SQLite database)
task_serializer = 'json'                        # Task serialization format
result_serializer = 'json'                      # Result serialization format
accept_content = ['json']                       # Accepted content types
timezone = 'Asia/Shanghai'                      # Timezone setting
In our code, we need to import the Celery library and create a Celery application. Here is an example:
from celery import Celery

app = Celery('mytasks', include=['mytasks.tasks'])
app.config_from_object('celeryconfig')
In the above code, we create a Celery application named mytasks and load the configuration from celeryconfig.py into it.
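The article does not show a project layout, but one arrangement consistent with the imports used here (an assumption) is to put the application object in mytasks/__init__.py next to a tasks.py module, so that include=['mytasks.tasks'] and the later from mytasks import app both resolve:

project/
    celeryconfig.py      # the configuration file shown above
    main.py              # code that submits tasks
    mytasks/
        __init__.py      # contains the Celery application (app)
        tasks.py         # task definitions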
Next, we need to create a task. A task is an independent function that performs a single operation. Here is an example:
# tasks.py
from mytasks import app

@app.task
def add(x, y):
    return x + y
In the above code, we define a task named add that calculates the sum of two numbers.
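Tasks can also take options such as automatic retries. The following sketch is not part of the original example; flaky_add is a hypothetical task, while bind=True, max_retries, and self.retry are standard Celery features:

# tasks.py (additional, illustrative task)
import random

from mytasks import app

@app.task(bind=True, max_retries=3)
def flaky_add(self, x, y):
    """Add two numbers, retrying on a simulated transient failure."""
    try:
        if random.random() < 0.3:  # simulate an occasional transient error
            raise RuntimeError("transient failure")
        return x + y
    except RuntimeError as exc:
        # Retry after 5 seconds; Celery gives up after max_retries attempts
        raise self.retry(exc=exc, countdown=5)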
To enable distributed execution of tasks, we need to start one or more Celery workers to process them. A Celery worker can be started with the following command:
celery -A mytasks worker --loglevel=info
Once started, the Celery worker will listen for tasks on the queue and process them.
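If heavier throughput is needed, the same command can be run on several machines, or multiple workers can be started on one host. The worker name and concurrency value below are illustrative assumptions; --concurrency and -n are standard Celery worker options:

celery -A mytasks worker --loglevel=info --concurrency=4 -n worker1@%h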
From other code, we can submit tasks to the Celery queue. Here is an example:
# main.py
from mytasks.tasks import add

result = add.delay(4, 6)
print(result.get())
In the above code, we import the add task defined previously and then submit it using the delay method. The delay method returns an AsyncResult object, and we can get the result of the task by calling its get method.
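If the worker may be slow or unavailable, get can be given a timeout so the caller does not block forever. The 10-second value below is an illustrative assumption; the sketch uses Celery's TimeoutError exception:

# main.py (variation with a timeout, illustrative)
from celery.exceptions import TimeoutError

from mytasks.tasks import add

result = add.delay(4, 6)
try:
    # Wait at most 10 seconds for the worker to produce a result
    print(result.get(timeout=10))
except TimeoutError:
    print("The task did not finish within 10 seconds.")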
We can use the AsyncResult object to monitor the execution status of the task. The following is an example:
# main.py
import time

from mytasks.tasks import add

result = add.delay(4, 6)
while not result.ready():
    print("Task is still running...")
    time.sleep(1)
print(result.get())
In the above code, we monitor the execution status of the task with a loop. The ready method returns a Boolean value indicating whether the task has completed.
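Besides submitting tasks immediately with delay, tasks can also be scheduled for a later time with apply_async, which accepts options such as countdown and eta. The 10-second delay below is an illustrative assumption:

# main.py (scheduling a task for later, illustrative)
from datetime import datetime, timedelta, timezone

from mytasks.tasks import add

# Run the task roughly 10 seconds from now
result = add.apply_async(args=(4, 6), countdown=10)

# Equivalent, using an absolute time (eta)
eta = datetime.now(timezone.utc) + timedelta(seconds=10)
result = add.apply_async(args=(4, 6), eta=eta)

print(result.get())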
Summary:
This article briefly introduces how to use Celery to implement distributed task scheduling. By installing and configuring Celery, creating a Celery application, defining tasks, starting Celery Workers, and submitting tasks to the queue, we can implement distributed task scheduling. Using Celery can improve task execution efficiency and is suitable for situations where parallel computing or asynchronous processing is required.