Fastapi+Celery实现异步回调
用户请求接口 /analyze,传入一个文本;FastAPI 处理后,用 Celery 异步任务 模拟调用大模型进行文本分析;分析完成后,调用用户提供的回调地址(比如 /callback)并把分析结果发回去。
这里写目录标题
场景简介(模拟大模型调用):
用户请求接口 /analyze,传入一个文本; FastAPI 处理后,用 Celery 异步任务 模拟调用大模型进行文本分析; 分析完成后,调用用户提供的回调地址(比如 /callback)并把分析结果发回去。
一、准备工作
安装依赖
我们先安装所需的包,使用 pip install(不加 sudo):
pip install fastapi uvicorn celery redis httpx
安装并启动redis
确保本地已经装好 redis 并且是启动状态(默认假设开启端口为6371)。
安装方法:
- 服务器直接安装
- 通过docker安装
二、FastAPI + Celery 项目结构
callback_project/
├── main.py # FastAPI 主程序
├── celery_worker.py # 启动 Celery worker 的入口
├── tasks.py # Celery 任务定义
└── test_client.py # 模拟客户端回调服务
└── test.py # 模拟客户端调用的测试脚本
注意:celery_worker.py也可以和tasks.py 合并。
三、项目代码
test_client.py
from fastapi import FastAPI
from fastapi.responses import JSONResponse
# 这是我们用来接收回调的“客户端”服务
callback_app = FastAPI()
@callback_app.post("/callback")
async def callback(data: dict):
print("收到回调结果:", data)
return JSONResponse(content={"status": "received"})
# 启动测试用 callback 服务(用 uvicorn)
# 另起一个终端执行:
# uvicorn test_client:callback_app --port 8001
main.py
from fastapi import FastAPI
from pydantic import BaseModel
from tasks import analyze_text_task
app = FastAPI()
class AnalyzeRequest(BaseModel):
text: str
callback_url: str
@app.post("/analyze")
async def analyze(request: AnalyzeRequest):
# 异步调用任务
analyze_text_task.delay(request.text, request.callback_url)
return {"message": "Analysis started"}
tasks.py
from celery import Celery
import time
import httpx
celery_app = Celery(
"worker",
broker="redis://127.0.0.1:6371/1",
backend="redis://localhost:6371/1"
)
@celery_app.task
def analyze_text_task(text, callback_url):
# 模拟调用大模型(这里用 sleep)
print(f"Start analyzing: {text}")
time.sleep(5) # 模拟耗时
result = {"summary": f"这段文本的长度是 {len(text)} 个字符。"}
# 发送回调
try:
httpx.post(callback_url, json=result)
except Exception as e:
print(f"Callback failed: {e}")
test.py
import requests
url = "http://localhost:8000/analyze"
payload = {
"text": "这是一个测试文本李昌奎中杰克曼去哪里",
"callback_url": "http://localhost:8001/callback"
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=payload, headers=headers)
print("响应状态码:", response.status_code)
print("响应内容:", response.json())
"""
等价命令:
curl -X POST http://localhost:8000/analyze \
-H "Content-Type: application/json" \
-d '{"text": "这是一个测试文本", "callback_url": "http://localhost:8001/callback"}'
"""
四、测试流程
-
启动 Redis:redis-server --port 6371
-
启动回调服务器:uvicorn test_client:callback_app --port 8001
-
启动 Celery worker:celery -A tasks.celery_app worker --loglevel=info
-
启动主服务:uvicorn main:app --port 8000
-
用脚本或 curl 发起请求
-
查看 callback 服务是否打印出“收到回调结果”
注意:如果需要长期运行使用screen命令
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)