在 FastAPI 应用中,如果需要在异步路由中执行无限循环,直接使用 while True 可能会导致整个应用死锁,其他路由无法响应。这是因为异步函数在执行时,如果没有适当的让出控制权,会阻塞事件循环,导致 FastAPI 无法处理其他请求。下面将介绍两种避免死锁的解决方案。
FastAPI 提供了 BackgroundTasks 类,可以将耗时任务放入后台执行,从而避免阻塞主线程。这种方式非常适合处理无限循环任务。
from fastapi import FastAPI, BackgroundTasks import random app = FastAPI() @app.get("/hello") async def hello(): return {"Hello": "World"} @app.get("/normal") def route_normal(): while True: print({"route_normal": random.randint(0, 10)}) @app.get("/async") async def route_async(background_tasks: BackgroundTasks): def background_task(): while True: print({"route_async": random.randint(0, 10)}) background_tasks.add_task(background_task) return {"message": "Background task started"}
代码解释:
注意事项:
另一种解决方案是在无限循环中加入 asyncio.sleep(),让出控制权,允许事件循环处理其他任务。
import asyncio from fastapi import FastAPI import random app = FastAPI() @app.get("/hello") async def hello(): return {"Hello": "World"} @app.get("/normal") def route_normal(): while True: print({"route_normal": random.randint(0, 10)}) @app.get("/async") async def route_async(): while True: await asyncio.sleep(0) # do a sleep here so that the main thread can do its magic, at least once per loop, changing the sleep duration will allow the main thread to process other threads longer, please read up more on the specifics print({"route_async": random.randint(0, 10)})
代码解释:
注意事项:
在 FastAPI 异步路由中使用无限循环时,需要特别注意避免阻塞事件循环。使用 BackgroundTasks 可以将任务放入后台执行,而使用 asyncio.sleep() 可以让出控制权。选择哪种方案取决于具体的应用场景和需求。如果任务不需要立即返回结果,且对实时性要求不高,建议使用 BackgroundTasks。如果需要在循环中进行一些操作,且对实时性有一定的要求,可以使用 asyncio.sleep()。
以上就是解决 FastAPI 异步路由中无限循环导致的死锁问题的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 //m.sbmmt.com/ All Rights Reserved | php.cn | 湘ICP备2023035733号