FastAPI 中的後台任務執行
在FastAPI 中,開發者可能會遇到需要在後台執行某個函數的場景,該函數與他們的API。為此,需要採用線程機制。
解決方案 1:在 uvicorn.run 之前啟動線程
一種方法是在呼叫 uvicorn.run 之前啟動線程,因為 uvicorn.run 會阻塞線程。這種方法可以如下實現:
import time import threading from fastapi import FastAPI import uvicorn app = FastAPI() class BackgroundTasks(threading.Thread): def run(self,*args,**kwargs): while True: print('Hello') time.sleep(5) if __name__ == '__main__': t = BackgroundTasks() t.start() uvicorn.run(app, host="0.0.0.0", port=8000)
或者,開發人員可以利用 FastAPI 的啟動事件在應用程式啟動之前啟動執行緒:
@app.on_event("startup") async def startup_event(): t = BackgroundTasks() t.start()
解決方案2:事件排程程式
另一個選項是為下列物件實作一個重複事件排程器後台任務:
import sched, time from threading import Thread from fastapi import FastAPI import uvicorn app = FastAPI() s = sched.scheduler(time.time, time.sleep) def print_event(sc): print("Hello") sc.enter(5, 1, print_event, (sc,)) def start_scheduler(): s.enter(5, 1, print_event, (s,)) s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target=start_scheduler) thread.start() if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=8000)
解決方案3:使用事件循環
對於非同步後台任務(async def函數),開發人員可以將當前事件循環與asyncio.create_task() 函數:
from asyncio import create_task from fastapi import FastAPI app = FastAPI() @app.on_event("startup") async def startup_task(): await create_task(print_task(5))
在這種方法中,事件循環在啟動時建立uvicorn 伺服器。
以上是如何在FastAPI中執行後台任務?的詳細內容。更多資訊請關注PHP中文網其他相關文章!