ホームページ > バックエンド開発 > Python チュートリアル > Celery タスクの実行時間を測定するにはどうすればよいですか?

Celery タスクの実行時間を測定するにはどうすればよいですか?

Barbara Streisand
リリース: 2025-01-13 22:28:44
オリジナル
880 人が閲覧しました

How do I measure the execution time of Celery tasks?

Celery タスクの実行時間を追跡する新しいメンバーが重複コードのコレクションに追加されました。

Celery の各タスクには、実際には 2 つの異なる「実行」時間があります。

  • 実際の実行時間: コードの実行にかかる時間。
  • "完了時間": 使用可能なワーカー プロセスをキューで待機している時間も含まれます。

私たちの最終目標はタスクがいつ完了するかを知ることであるため、どちらも重要です。

タスクをトリガーした後、タスクがいつ完了し、いつ結果が期待できるかを知る必要があります。プロジェクトの見積もりの​​ようなものです。マネージャーが本当に知りたいのは、プロジェクトがいつ完了するかということであり、1 週間で完了するが、今後 6 か月以内にプロジェクトを実行する時間が誰もないということではありません。

セロリシグナルを使用する

Celery シグナルを使用してタスクの時間を計ることができます。

ヒント 1: Celery シグナルのすべてのパラメーターは キーワード パラメーター です。つまり、関心のあるキーワード引数をリストし、残りを **kwargs に詰め込むことができます。これは素晴らしいデザインですね!すべての信号はこのように行う必要があります。

ヒント 2: 実行の開始時刻と終了時刻をタスク オブジェクトの "headers" プロパティに保存できます。

タスク参加

Celery タスクがキューに入ったら、現在の時間を記録します:

<code class="language-python">from celery import signals
from dateutil.parser import isoparse
from datetime import datetime, timezone

@signals.before_task_publish.connect
def before_task_publish(*, headers: dict, **kwargs):
    raw_eta = headers.get("eta")
    publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc)
    headers["__publish_time"] = publish_time.isoformat()</code>
ログイン後にコピー

タスクの実行が開始されます

ワーカー プロセスがタスクを受信したら、現在の時刻を記録します:

<code class="language-python">from celery import signals
from datetime import datetime, timezone

@signals.task_prerun.connect
def task_prerun(*, task: Task, **kwargs):
    setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())</code>
ログイン後にコピー

タスクの実行が終了しました

タスクが完了したら、実行時間を計算し、StatsD やその他の監視ツールなどの場所に保存します。

StatsD は、アプリケーションを監視し、カスタム メトリクスを提供するソフトウェアを計測するための業界標準のテクノロジー スタックです。

  • ネットデータ: StatsD の概要 [1]
<code class="language-python">from celery import signals, Task
from dateutil.parser import isoparse
from datetime import datetime, timezone, timedelta

def to_milliseconds(td: timedelta) -> int:
    return int(td.total_seconds() * 1000)

@signals.task_postrun.connect
def task_postrun(*, task: Task, **kwargs):
    now = datetime.now(tz=timezone.utc)
    publish_time = isoparse(getattr(task.request, "__publish_time", ""))
    prerun_time = isoparse(getattr(task.request, "__prerun_time", ""))

    exec_time = now - prerun_time if prerun_time else timedelta(0)
    waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0)
    waiting_and_exec_time = now - publish_time if publish_time else timedelta(0)

    stats = {
        "exec_time_ms": to_milliseconds(exec_time),
        "waiting_time_ms": to_milliseconds(waiting_time),
        "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time),
    }
    # TODO: 将统计数据发送到 StatsD 或其他监控工具
    statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"])
    # ... 发送其他统计数据 ...</code>
ログイン後にコピー

追加機能: 長い実行時間の警告を設定します

上記の関数でハードコードされたしきい値を追加することができます:

<code class="language-python">if exec_time > timedelta(hours=1):
    logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")</code>
ログイン後にコピー

あるいは、タスク定義やコードで表現できるものに基づいて、マルチレベルのしきい値やしきい値を設定することもできます。

以上がCelery タスクの実行時間を測定するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
著者別の最新記事
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート