Maison > développement back-end > Tutoriel Python > Comment mesurer le temps d'exécution des tâches Celery ?

Comment mesurer le temps d'exécution des tâches Celery ?

Barbara Streisand
Libérer: 2025-01-13 22:28:44
original
880 Les gens l'ont consulté

How do I measure the execution time of Celery tasks?

Un nouveau membre a été ajouté à la collection de codes en double : le suivi du temps d'exécution des tâches Celery.

Chaque tâche Céleri a en fait deux temps "d'exécution" différents :

  • Temps d'exécution réel : Le temps nécessaire à l'exécution du code.
  • « Temps d'achèvement » : Inclut le temps passé à attendre dans la file d'attente un processus de travail disponible.

Les deux sont importants car notre objectif ultime est de savoir quand la tâche est terminée.

Après avoir déclenché une tâche, nous devons savoir quand la tâche est terminée et quand nous pouvons en attendre les résultats. C'est comme l'estimation d'un projet. Ce que les managers veulent vraiment savoir, c'est quand le projet sera terminé, non pas qu'il sera terminé en une semaine mais que personne n'aura le temps de le faire dans les six prochains mois.

Utilisez des signaux de céleri

Nous pouvons utiliser les signaux de céleri pour chronométrer les tâches.

Astuce 1 : Tous les paramètres des signaux de céleri sont des paramètres de mots clés. Cela signifie que nous pouvons simplement lister les arguments de mots-clés qui nous intéressent et regrouper le reste dans **kwargs. C'est une superbe conception ! Tous les signaux doivent être effectués de cette façon !

Astuce 2 : Nous pouvons stocker l'heure de début et de fin d'exécution dans la propriété "headers" de l'objet tâche.

Rejoindre une tâche

Lorsque la tâche Céleri entre dans la file d'attente, enregistrez l'heure actuelle :

<code class="language-python">from celery import signals
from dateutil.parser import isoparse
from datetime import datetime, timezone

@signals.before_task_publish.connect
def before_task_publish(*, headers: dict, **kwargs):
    raw_eta = headers.get("eta")
    publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc)
    headers["__publish_time"] = publish_time.isoformat()</code>
Copier après la connexion

L'exécution de la tâche commence

Lorsque le processus de travail reçoit la tâche, enregistrez l'heure actuelle :

<code class="language-python">from celery import signals
from datetime import datetime, timezone

@signals.task_prerun.connect
def task_prerun(*, task: Task, **kwargs):
    setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())</code>
Copier après la connexion

Exécution de la tâche terminée

Lorsque la tâche est terminée, calculez le temps d'exécution et stockez-le quelque part, comme StatsD ou un autre outil de surveillance.

StatsD est la pile technologique standard de l'industrie pour surveiller les applications et instrumenter tout logiciel afin de fournir des métriques personnalisées.

  • Netdata : Introduction à StatsD [1]
<code class="language-python">from celery import signals, Task
from dateutil.parser import isoparse
from datetime import datetime, timezone, timedelta

def to_milliseconds(td: timedelta) -> int:
    return int(td.total_seconds() * 1000)

@signals.task_postrun.connect
def task_postrun(*, task: Task, **kwargs):
    now = datetime.now(tz=timezone.utc)
    publish_time = isoparse(getattr(task.request, "__publish_time", ""))
    prerun_time = isoparse(getattr(task.request, "__prerun_time", ""))

    exec_time = now - prerun_time if prerun_time else timedelta(0)
    waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0)
    waiting_and_exec_time = now - publish_time if publish_time else timedelta(0)

    stats = {
        "exec_time_ms": to_milliseconds(exec_time),
        "waiting_time_ms": to_milliseconds(waiting_time),
        "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time),
    }
    # TODO: 将统计数据发送到 StatsD 或其他监控工具
    statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"])
    # ... 发送其他统计数据 ...</code>
Copier après la connexion

Fonctionnalité supplémentaire : définir un avertissement de temps d'exécution long

Il est possible d'ajouter un seuil codé en dur dans la fonction ci-dessus :

<code class="language-python">if exec_time > timedelta(hours=1):
    logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")</code>
Copier après la connexion

Alternativement, on peut définir des seuils à plusieurs niveaux ou des seuils basés sur la définition de la tâche, ou tout ce qui peut être exprimé dans le code.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Derniers articles par auteur
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal