Der Sammlung doppelter Codes wurde ein neues Mitglied hinzugefügt: Verfolgung der Ausführungszeit von Celery-Aufgaben.
Jede Celery-Aufgabe hat tatsächlich zwei unterschiedliche „Ausführungszeiten“:
Beides ist wichtig, denn unser oberstes Ziel ist es, zu wissen, wann die Aufgabe abgeschlossen ist.
Nachdem wir eine Aufgabe ausgelöst haben, müssen wir wissen, wann die Aufgabe abgeschlossen ist und wann wir mit den Ergebnissen rechnen können. Es ist wie eine Projektschätzung. Was Manager wirklich wissen wollen, ist, wann das Projekt abgeschlossen sein wird, und nicht, dass es in einer Woche abgeschlossen sein wird, aber in den nächsten sechs Monaten niemand Zeit dafür haben wird.
Wir können Celery-Signale verwenden, um Aufgaben zeitlich zu steuern.
Tipp 1: Alle Parameter von Celery-Signalen sind Schlüsselwortparameter. Das bedeutet, dass wir einfach die Schlüsselwortargumente auflisten können, die uns interessieren, und den Rest in **kwargs
packen können. Das ist ein tolles Design! Alle Signale sollten auf diese Weise erfolgen!
Tipp 2: Wir können die Start- und Endzeit der Ausführung in der Eigenschaft „headers“ des Aufgabenobjekts speichern.
Wenn die Celery-Aufgabe in die Warteschlange gelangt, notieren Sie die aktuelle Uhrzeit:
<code class="language-python">from celery import signals from dateutil.parser import isoparse from datetime import datetime, timezone @signals.before_task_publish.connect def before_task_publish(*, headers: dict, **kwargs): raw_eta = headers.get("eta") publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc) headers["__publish_time"] = publish_time.isoformat()</code>
Wenn der Arbeitsprozess die Aufgabe empfängt, notieren Sie die aktuelle Zeit:
<code class="language-python">from celery import signals from datetime import datetime, timezone @signals.task_prerun.connect def task_prerun(*, task: Task, **kwargs): setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())</code>
Wenn die Aufgabe abgeschlossen ist, berechnen Sie die Ausführungszeit und speichern Sie sie irgendwo, z. B. in StatsD oder einem anderen Überwachungstool.
StatsD ist der branchenübliche Technologie-Stack zur Überwachung von Anwendungen und zur Instrumentierung jeglicher Software, um benutzerdefinierte Metriken bereitzustellen.
<code class="language-python">from celery import signals, Task from dateutil.parser import isoparse from datetime import datetime, timezone, timedelta def to_milliseconds(td: timedelta) -> int: return int(td.total_seconds() * 1000) @signals.task_postrun.connect def task_postrun(*, task: Task, **kwargs): now = datetime.now(tz=timezone.utc) publish_time = isoparse(getattr(task.request, "__publish_time", "")) prerun_time = isoparse(getattr(task.request, "__prerun_time", "")) exec_time = now - prerun_time if prerun_time else timedelta(0) waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0) waiting_and_exec_time = now - publish_time if publish_time else timedelta(0) stats = { "exec_time_ms": to_milliseconds(exec_time), "waiting_time_ms": to_milliseconds(waiting_time), "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time), } # TODO: 将统计数据发送到 StatsD 或其他监控工具 statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"]) # ... 发送其他统计数据 ...</code>
Es ist möglich, in der obigen Funktion einen fest codierten Schwellenwert hinzuzufügen:
<code class="language-python">if exec_time > timedelta(hours=1): logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")</code>
Alternativ kann man mehrstufige Schwellenwerte oder Schwellenwerte basierend auf der Aufgabendefinition oder was auch immer im Code ausgedrückt werden kann, festlegen.
Das obige ist der detaillierte Inhalt vonWie messe ich die Ausführungszeit von Celery-Aufgaben?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!