Another snippet has joined my collection of code that gets copy-pasted from project to project: tracking the execution time of Celery tasks.
Each Celery task actually has two different "execution" times: the time a worker spends actually running it, and the total wall-clock time from the moment the task is published until it completes, which also includes the time spent waiting in the queue.
Both are important, because our ultimate goal is to know when the task will be done.
After triggering a task, we need to know when it will finish and when we can expect its results. It's like project estimation: what managers really want to know is when the project will be completed, not that it takes a week of work that nobody will have time for in the next six months.
We can use Celery signals to time tasks.
Tip 1: Celery signals pass all of their arguments as keyword arguments. This means we can list only the keyword arguments we're interested in and collect the rest into **kwargs. This is a great design! All signals should work this way!
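A quick illustration (the task_retry signal and the handler below are my own example, not part of the timing code): a handler names only the argument it cares about and lets **kwargs absorb everything else, including sender.
<code class="language-python">from celery import signals

@signals.task_retry.connect
def task_retry(*, reason, **kwargs):
    # `sender`, `request`, `einfo`, etc. all land in **kwargs.
    print(f"Task is being retried, reason: {reason}")</code>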
Tip 2: We can store the timestamps in the "headers" of the task message; on the worker side they become accessible through the task's request object.
When the Celery task enters the queue, record the current time:
<code class="language-python">from celery import signals from dateutil.parser import isoparse from datetime import datetime, timezone @signals.before_task_publish.connect def before_task_publish(*, headers: dict, **kwargs): raw_eta = headers.get("eta") publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc) headers["__publish_time"] = publish_time.isoformat()</code>
When a worker picks up the task and is about to run it, record the current time:
<code class="language-python">from celery import signals from datetime import datetime, timezone @signals.task_prerun.connect def task_prerun(*, task: Task, **kwargs): setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())</code>
When the task completes, calculate the durations and store them somewhere, such as StatsD or another monitoring tool.
StatsD is a de-facto industry standard for monitoring applications and instrumenting software with custom metrics.
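The snippet below calls a module-level statsd client. As a minimal sketch, assuming the Datadog `datadog` package (one client whose timing() accepts the tags= argument used below) and a DogStatsD agent on the default port, the client could be created like this:
<code class="language-python"># Assumption: the `datadog` package is installed and a DogStatsD agent is
# listening on localhost:8125. Plain StatsD has no concept of tags; `tags=`
# is a DogStatsD extension.
from datadog.dogstatsd import DogStatsd

statsd = DogStatsd(host="localhost", port=8125)</code>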
<code class="language-python">from celery import signals, Task from dateutil.parser import isoparse from datetime import datetime, timezone, timedelta def to_milliseconds(td: timedelta) -> int: return int(td.total_seconds() * 1000) @signals.task_postrun.connect def task_postrun(*, task: Task, **kwargs): now = datetime.now(tz=timezone.utc) publish_time = isoparse(getattr(task.request, "__publish_time", "")) prerun_time = isoparse(getattr(task.request, "__prerun_time", "")) exec_time = now - prerun_time if prerun_time else timedelta(0) waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0) waiting_and_exec_time = now - publish_time if publish_time else timedelta(0) stats = { "exec_time_ms": to_milliseconds(exec_time), "waiting_time_ms": to_milliseconds(waiting_time), "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time), } # TODO: 将统计数据发送到 StatsD 或其他监控工具 statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"]) # ... 发送其他统计数据 ...</code>
We can also add a hardcoded threshold to the function above:
<code class="language-python">if exec_time > timedelta(hours=1): logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")</code>
Alternatively, we can set multi-level thresholds, derive the threshold from the task definition, or do whatever else can be expressed in code.
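For example, a lookup table keyed by task name could go inside the same task_postrun handler (the task names and limits below are made up for illustration):
<code class="language-python">from datetime import timedelta

# Hypothetical per-task limits, with a default for everything else.
THRESHOLDS = {
    "myapp.tasks.generate_report": timedelta(hours=2),
    "myapp.tasks.send_welcome_email": timedelta(seconds=30),
}
DEFAULT_THRESHOLD = timedelta(hours=1)

threshold = THRESHOLDS.get(task.name, DEFAULT_THRESHOLD)
if exec_time > threshold:
    logger.error(f"Task {task.name} took too long to run: {exec_time}. Please investigate!")</code>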