在web开发过程当中,总会需要一种情况,就是用户发送完请求之后只需要确认自己已经发送了指令,并不着急等待最终结果的返回,仅仅需要一个响应状态,如果这个时候程序要等待执行完毕后返回结果,肯定是不合理的,因此fastapi提供一种自己的后台程序处理方法。
BackgroundTasks
代码案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""): with open("log.txt", mode="w") as email_file: content = f"notification for {email}: {message}" email_file.write(content)
@app.post("/send-notification/{email}") async def send_notification(email: str, background_tasks: BackgroundTasks): background_tasks.add_task(write_notification, email, message="some notification") return {"message": "Notification sent in the background"}
|
结合依赖注入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| from typing import Optional
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str): with open("log.txt", mode="a") as log: log.write(message)
def get_query(background_tasks: BackgroundTasks, q: Optional[str] = None): if q: message = f"found query: {q}\n" background_tasks.add_task(write_log, message) return q
@app.post("/send-notification/{email}") async def send_notification(email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)): message = f"message to {email}\n" background_tasks.add_task(write_log, message) return {"message": "Message sent"}
|