So implementieren Sie geplante Aufgaben und periodische Aufgaben in FastAPI
Einführung:
FastAPI ist ein modernes, hochleistungsfähiges Python-Framework, das sich auf die Erstellung von API-Anwendungen konzentriert. Manchmal müssen wir jedoch geplante Aufgaben und regelmäßige Aufgaben in FastAPI-Anwendungen ausführen. Dieser Artikel beschreibt, wie diese Aufgaben in einer FastAPI-Anwendung implementiert werden, und stellt entsprechende Codebeispiele bereit.
1. Implementierung geplanter Aufgaben
Verwendung der APScheduler-Bibliothek
APScheduler ist eine leistungsstarke Python-Bibliothek zum Planen und Verwalten geplanter Aufgaben. Es unterstützt mehrere Aufgabenplaner, z. B. basierend auf Datum, Zeitintervall und Cron-Ausdruck. Im Folgenden sind die Schritte aufgeführt, um mit APScheduler geplante Aufgaben in FastAPI zu implementieren:
pip install apscheduler
im Terminal aus, um die APScheduler-Bibliothek zu installieren. pip install apscheduler
来安装APScheduler库。tasks.py
的文件,用于定义定时任务。from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() @scheduler.scheduled_job('interval', seconds=10) def job(): print("This is a scheduled job") scheduler.start()
from fastapi import FastAPI from .tasks import scheduler app = FastAPI() app.mount("/tasks", scheduler.app)
使用Celery库
Celery是一个强大的分布式任务队列库,支持异步和定时任务。以下是在FastAPI中使用Celery实现定时任务的步骤:
pip install celery
来安装Celery库。tasks.py
tasks.py
, um geplante Aufgaben zu definieren. from celery import Celery app = Celery('tasks', broker='redis://localhost:6379') @app.task def job(): print("This is a scheduled job")
from fastapi import FastAPI from .tasks import app as celery_app app = FastAPI() app.mount("/tasks", celery_app)
Celery ist eine leistungsstarke verteilte Aufgabenwarteschlangenbibliothek, die asynchrone und geplante Aufgaben unterstützt. Im Folgenden sind die Schritte aufgeführt, um Celery zum Implementieren geplanter Aufgaben in FastAPI zu verwenden:
pip install celery
im Terminal aus, um die Celery-Bibliothek zu installieren. Erstellen Sie ein Modul für geplante Aufgaben: Erstellen Sie im Stammverzeichnis der FastAPI-Anwendung eine Datei mit dem Namen tasks.py
, um geplante Aufgaben zu definieren. from apscheduler.triggers.cron import CronTrigger scheduler = BackgroundScheduler() @scheduler.scheduled_job(CronTrigger.from_crontab('* * * * *')) def job(): print("This is a periodic job") scheduler.start()
from celery import Celery from celery.schedules import crontab app = Celery('tasks', broker='redis://localhost:6379') @app.task def job(): print("This is a periodic job") app.conf.beat_schedule = { 'job': { 'task': 'tasks.job', 'schedule': crontab(minute='*'), }, }
Erstellen Sie ein periodisches Aufgabenmodul: siehe Schritt 2 im vorherigen Artikel.
Installieren Sie die Celery-Bibliothek: Siehe Schritt 1 im vorherigen Artikel.
🎜Erstellen Sie ein periodisches Aufgabenmodul: siehe Schritt 2 im vorherigen Artikel. 🎜🎜🎜🎜rrreee🎜Fazit: 🎜Durch die Verwendung der APScheduler- oder Celery-Bibliothek können wir geplante Aufgaben und periodische Aufgaben problemlos in FastAPI-Anwendungen implementieren. Die oben bereitgestellten Codebeispiele können als Referenz verwendet werden, um Ihnen bei der schnellen Implementierung dieser Aufgabenfunktionen in Ihrem FastAPI-Projekt zu helfen. Obwohl es sich bei den oben genannten Beispielen um einfache Beispiele handelt, können Sie Ihre eigene Aufgabenlogik basierend auf diesen Beispielen weiter erweitern und anpassen. 🎜🎜Referenzmaterialien: 🎜🎜🎜Offizielle Dokumentation von APScheduler: https://apscheduler.readthedocs.io/🎜🎜Offizielle Dokumentation von Celery: https://docs.celeryproject.org/🎜🎜🎜 (Dieser Artikel dient bitte nur als Referenz Basieren Sie es auf der tatsächlichen Situation. Passen Sie es entsprechend an und modifizieren Sie es bei Bedarf)🎜.Das obige ist der detaillierte Inhalt vonSo implementieren Sie geplante und periodische Aufgaben in FastAPI. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!