중복 코드 모음에 Celery 작업 실행 시간 추적이라는 새 멤버가 추가되었습니다.
각 Celery 작업에는 실제로 두 가지 "실행" 시간이 있습니다.
우리의 궁극적인 목표는 작업이 언제 완료되는지 아는 것이기 때문에 두 가지 모두 중요합니다.
작업을 트리거한 후 작업이 언제 완료되고 언제 결과를 기대할 수 있는지 알아야 합니다. 그것은 프로젝트 견적과 같습니다. 관리자들이 정말로 알고 싶어하는 것은 프로젝트가 언제 완료될 것인지, 일주일 안에 완료될 것인지가 아니라 앞으로 6개월 안에 그것을 할 시간이 아무도 없을 것인지입니다.
Celery 신호를 사용하여 작업 시간을 측정할 수 있습니다.
팁 1: Celery 신호의 모든 매개변수는 키워드 매개변수입니다. 즉, 관심 있는 키워드 인수만 나열하고 나머지는 **kwargs
에 넣을 수 있습니다. 이것은 훌륭한 디자인입니다! 모든 신호는 이렇게 해야 합니다!
팁 2: 작업 개체의 "headers" 속성에 실행 시작 및 종료 시간을 저장할 수 있습니다.
Celery 작업이 대기열에 들어가면 현재 시간을 기록합니다.
<code class="language-python">from celery import signals from dateutil.parser import isoparse from datetime import datetime, timezone @signals.before_task_publish.connect def before_task_publish(*, headers: dict, **kwargs): raw_eta = headers.get("eta") publish_time = isoparse(raw_eta) if raw_eta else datetime.now(tz=timezone.utc) headers["__publish_time"] = publish_time.isoformat()</code>
작업자 프로세스가 작업을 수신하면 현재 시간을 기록합니다.
<code class="language-python">from celery import signals from datetime import datetime, timezone @signals.task_prerun.connect def task_prerun(*, task: Task, **kwargs): setattr(task.request, "__prerun_time", datetime.now(tz=timezone.utc).isoformat())</code>
작업이 완료되면 실행 시간을 계산하여 StatsD나 기타 모니터링 도구 같은 곳에 저장합니다.
StatsD는 애플리케이션을 모니터링하고 소프트웨어를 계측하여 맞춤형 측정항목을 제공하기 위한 업계 표준 기술 스택입니다.
<code class="language-python">from celery import signals, Task from dateutil.parser import isoparse from datetime import datetime, timezone, timedelta def to_milliseconds(td: timedelta) -> int: return int(td.total_seconds() * 1000) @signals.task_postrun.connect def task_postrun(*, task: Task, **kwargs): now = datetime.now(tz=timezone.utc) publish_time = isoparse(getattr(task.request, "__publish_time", "")) prerun_time = isoparse(getattr(task.request, "__prerun_time", "")) exec_time = now - prerun_time if prerun_time else timedelta(0) waiting_time = prerun_time - publish_time if publish_time and prerun_time else timedelta(0) waiting_and_exec_time = now - publish_time if publish_time else timedelta(0) stats = { "exec_time_ms": to_milliseconds(exec_time), "waiting_time_ms": to_milliseconds(waiting_time), "waiting_and_exec_time_ms": to_milliseconds(waiting_and_exec_time), } # TODO: 将统计数据发送到 StatsD 或其他监控工具 statsd.timing(f"celery.task.exec_time_ms", stats["exec_time_ms"], tags=[f"task:{task.name}"]) # ... 发送其他统计数据 ...</code>
위 함수에 하드코딩된 임계값을 추가할 수 있습니다.
<code class="language-python">if exec_time > timedelta(hours=1): logger.error(f"任务 {task.name} 执行时间过长: {exec_time}。请检查!")</code>
또는 작업 정의 또는 코드로 표현할 수 있는 모든 항목을 기반으로 다중 수준 임계값 또는 임계값을 설정할 수 있습니다.
위 내용은 Celery 작업의 실행 시간을 어떻게 측정합니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!