在web开发过程当中,总会需要一种情况,就是用户发送完请求之后只需要确认自己已经发送了指令,并不着急等待最终结果的返回,仅仅需要一个响应状态,如果这个时候程序要等待执行完毕后返回结果,肯定是不合理的,因此fastapi提供一种自己的后台程序处理方法。
BackgroundTasks
代码案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
   | from fastapi import BackgroundTasks, FastAPI
  app = FastAPI()
 
  def write_notification(email: str, message=""):     with open("log.txt", mode="w") as email_file:         content = f"notification for {email}: {message}"         email_file.write(content)
 
  @app.post("/send-notification/{email}") async def send_notification(email: str, background_tasks: BackgroundTasks):     background_tasks.add_task(write_notification, email, message="some notification")      return {"message": "Notification sent in the background"}
   | 
 
结合依赖注入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
   | from typing import Optional
  from fastapi import BackgroundTasks, Depends, FastAPI
  app = FastAPI()
 
  def write_log(message: str):     with open("log.txt", mode="a") as log:         log.write(message)
 
  def get_query(background_tasks: BackgroundTasks, q: Optional[str] = None):     if q:         message = f"found query: {q}\n"         background_tasks.add_task(write_log, message)     return q
 
  @app.post("/send-notification/{email}") async def send_notification(email: str,                              background_tasks: BackgroundTasks,                              q: str = Depends(get_query)):     message = f"message to {email}\n"     background_tasks.add_task(write_log, message)     return {"message": "Message sent"}
   |