How to use Celery to implement distributed task scheduling
Overview:
Celery is one of the most commonly used distributed task queue libraries in Python, which can be used to implement asynchronous task scheduling. This article will introduce how to use Celery to implement distributed task scheduling, and attach code examples.
First, we need to install the Celery library. Celery can be installed through the following command:
pip install celery
After the installation is complete, we need to create a Celery configuration file. Create a file calledceleryconfig.pyand add the following content:
broker_url = 'amqp://guest@localhost//' # RabbitMQ服务器地址 result_backend = 'db+sqlite:///results.sqlite' # 结果存储方式(使用SQLite数据库) task_serializer = 'json' # 任务序列化方式 result_serializer = 'json' # 结果序列化方式 accept_content = ['json'] # 接受的内容类型 timezone = 'Asia/Shanghai' # 时区设置
In the code we need to import Celery library and create a Celery application. Here is an example:
from celery import Celery app = Celery('mytasks', include=['mytasks.tasks']) app.config_from_object('celeryconfig')
In the above code, we create a Celery application namedmytasksand apply the configuration inceleryconfig.pyinto the Celery application.
Next, we need to create a task. A task is an independent function that can perform individual operations. Here is an example:
# tasks.py from mytasks import app @app.task def add(x, y): return x + y
In the above code, we have defined a task namedaddto calculate the sum of two numbers.
To enable distributed execution of tasks, we need to start one or more Celery Workers to process tasks. Celery Worker can be started through the following command:
celery -A mytasks worker --loglevel=info
After the startup is completed, Celery Worker will listen and process tasks in the queue.
In other code, we can submit tasks to the Celery queue. Here is an example:
# main.py from mytasks.tasks import add result = add.delay(4, 6) print(result.get())
In the above code, we import theaddtask defined previously and then submit a task using thedelaymethod. Thedelaymethod will return anAsyncResultobject, and we can get the result of the task by calling thegetmethod.
We can use theAsyncResultobject to monitor the execution status of the task. The following is an example:
# main.py from mytasks.tasks import add result = add.delay(4, 6) while not result.ready(): print("Task is still running...") time.sleep(1) print(result.get())
In the above code, we monitor the execution status of the task through a loop.readyThe method will return a Boolean value indicating whether the task has been completed.
Summary:
This article briefly introduces how to use Celery to implement distributed task scheduling. By installing and configuring Celery, creating a Celery application, defining tasks, starting Celery Workers, and submitting tasks to the queue, we can implement distributed task scheduling. Using Celery can improve task execution efficiency and is suitable for situations where parallel computing or asynchronous processing is required.
The above is the detailed content of How to use Celery to implement distributed task scheduling. For more information, please follow other related articles on the PHP Chinese website!