Heim > Backend-Entwicklung > Python-Tutorial > Wie messe ich die Ausführungszeit von Celery-Aufgaben?

Wie messe ich die Ausführungszeit von Celery-Aufgaben?

Barbara Streisand
Freigeben: 2025-01-13 22:28:44
Original
880 Leute haben es durchsucht

How do I measure the execution time of Celery tasks?

Der Sammlung doppelter Codes wurde ein neues Mitglied hinzugefügt: Verfolgung der Ausführungszeit von Celery-Aufgaben.

Jede Celery-Aufgabe hat tatsächlich zwei unterschiedliche „Ausführungszeiten“:

  • Tatsächliche Ausführungszeit: Die Zeit, die der Code benötigt, um ausgeführt zu werden.
  • „Abschlusszeit“: Beinhaltet die Zeit, die in der Warteschlange auf einen verfügbaren Arbeitsprozess gewartet wurde.

Beides ist wichtig, denn unser oberstes Ziel ist es, zu wissen, wann die Aufgabe abgeschlossen ist.

Nachdem wir eine Aufgabe ausgelöst haben, müssen wir wissen, wann die Aufgabe abgeschlossen ist und wann wir mit den Ergebnissen rechnen können. Es ist wie eine Projektschätzung. Was Manager wirklich wissen wollen, ist, wann das Projekt abgeschlossen sein wird, und nicht, dass es in einer Woche abgeschlossen sein wird, aber in den nächsten sechs Monaten niemand Zeit dafür haben wird.

Verwenden Sie Selleriesignale

Wir können Celery-Signale verwenden, um Aufgaben zeitlich zu steuern.

Tipp 1: Alle Parameter von Celery-Signalen sind Schlüsselwortparameter. Das bedeutet, dass wir einfach die Schlüsselwortargumente auflisten können, die uns interessieren, und den Rest in **kwargs packen können. Das ist ein tolles Design! Alle Signale sollten auf diese Weise erfolgen!

Tipp 2: Wir können die Start- und Endzeit der Ausführung in der Eigenschaft „headers“ des Aufgabenobjekts speichern.

Aufgabenbeitritt

Wenn die Celery-Aufgabe in die Warteschlange gelangt, notieren Sie die aktuelle Uhrzeit:

<code class="language-python">from celery import signals
from dateutil.parser import isoparse
from datetime import datetime, timezone

@signals.before_task_publish.connect
def before_task_publish(*, headers: dict, **kwargs):
    raw_eta = headers.get("eta")
    publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc)
    headers["__publish_time"] = publish_time.isoformat()</code>
Nach dem Login kopieren

Aufgabenausführung beginnt

Wenn der Arbeitsprozess die Aufgabe empfängt, notieren Sie die aktuelle Zeit:

<code class="language-python">from celery import signals
from datetime import datetime, timezone

@signals.task_prerun.connect
def task_prerun(*, task: Task, **kwargs):
    setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())</code>
Nach dem Login kopieren

Aufgabenausführung beendet

Wenn die Aufgabe abgeschlossen ist, berechnen Sie die Ausführungszeit und speichern Sie sie irgendwo, z. B. in StatsD oder einem anderen Überwachungstool.

StatsD ist der branchenübliche Technologie-Stack zur Überwachung von Anwendungen und zur Instrumentierung jeglicher Software, um benutzerdefinierte Metriken bereitzustellen.

  • Netdata: StatsD-Einführung [1]
<code class="language-python">from celery import signals, Task
from dateutil.parser import isoparse
from datetime import datetime, timezone, timedelta

def to_milliseconds(td: timedelta) -> int:
    return int(td.total_seconds() * 1000)

@signals.task_postrun.connect
def task_postrun(*, task: Task, **kwargs):
    now = datetime.now(tz=timezone.utc)
    publish_time = isoparse(getattr(task.request, "__publish_time", ""))
    prerun_time = isoparse(getattr(task.request, "__prerun_time", ""))

    exec_time = now - prerun_time if prerun_time else timedelta(0)
    waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0)
    waiting_and_exec_time = now - publish_time if publish_time else timedelta(0)

    stats = {
        "exec_time_ms": to_milliseconds(exec_time),
        "waiting_time_ms": to_milliseconds(waiting_time),
        "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time),
    }
    # TODO: 将统计数据发送到 StatsD 或其他监控工具
    statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"])
    # ... 发送其他统计数据 ...</code>
Nach dem Login kopieren

Zusätzliche Funktion: Warnung vor langer Ausführungszeit einstellen

Es ist möglich, in der obigen Funktion einen fest codierten Schwellenwert hinzuzufügen:

<code class="language-python">if exec_time > timedelta(hours=1):
    logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")</code>
Nach dem Login kopieren

Alternativ kann man mehrstufige Schwellenwerte oder Schwellenwerte basierend auf der Aufgabendefinition oder was auch immer im Code ausgedrückt werden kann, festlegen.

Das obige ist der detaillierte Inhalt vonWie messe ich die Ausführungszeit von Celery-Aufgaben?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Neueste Artikel des Autors
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage