异步流式服务中的取消处理与数据持久化一致性方案
摘要: 本文描述了在FastAPI+SSE的LLM流式对话服务中,客户端断开导致已生成响应丢失的问题。原代码仅在流式结束后保存数据,但异常中断(如CancelledError)会跳过保存环节。首次尝试捕获异常直接保存时,因请求级DB会话失效导致失败。最终解决方案采用:1)独立后台任务执行保存操作;2)创建全新DB会话避免资源冲突;3)加入重试机制处理连接异常。部署后验证显示,中断场景下部分响应内容
异步流式服务中的取消处理与数据持久化一致性方案
1. 问题背景
在基于 FastAPI 和 Server-Sent Events (SSE) 的 LLM 流式对话服务中,观察到当客户端在生成过程中断开连接(Client Disconnection)或发生服务端超时(Server-side Timeout)时,已生成的 Assistant 响应内容未能持久化至 PostgreSQL 数据库。
故障现象:
- 数据库中存在用户提问记录,但对应的 Assistant 回复缺失。
- GKE 日志显示,在异常中断场景下,正常的
Streaming finished和数据保存逻辑未被执行。

可见,切换窗口后中间的消息丢失了
检查数据库
ai的回复没存下来 
2. 初始分析
通过分析 src/services/chat_service.py 的原始代码,我们发现数据保存逻辑仅存在于流式生成完成之后:
# 原始逻辑示意
try:
llm_stream = llm_service.llm.astream(chat_history)
async for chunk in llm_stream:
full_response_content += chunk.content
yield f"data: {chunk.content}\n\n"
# 只有当循环正常结束,才会执行到这里
logger.info("Streaming finished.")
if full_response_content:
# 保存到数据库
await message_dao.create_message(db, ...)
except Exception as e:
logger.exception(...)
原因推断:
如果客户端(前端 UI)在流式传输过程中断开连接(例如用户刷新页面、关闭标签页),或者 LLM 生成超时,FastAPI/Starlette 会抛出 asyncio.CancelledError,导致协程在 async for 循环中被立即中断。因此,代码永远无法执行到循环后的保存逻辑,导致已生成的部分内容丢失。
3. 第一次尝试:捕获 CancelledError
为了解决这个问题,我们尝试捕获 asyncio.CancelledError 并在捕获块中执行保存操作。
修改后的代码结构:
except asyncio.CancelledError:
logger.warning(f"Stream cancelled...")
if full_response_content:
# 尝试使用当前的 db session 保存
await message_dao.create_message(db, ...)
raise
遇到的新问题:
部署后,我们在日志中观察到了新的错误:sqlalchemy.dialects.postgresql.asyncpg.InterfaceError: cannot call Transaction.rollback(): the underlying connection is closed
原因分析:
当 CancelledError 发生时,FastAPI 框架正在取消当前的请求处理任务(Task)。
- 任务被取消导致与其关联的依赖资源(如通过
Depends(get_db_session)获取的dbSession)开始清理或已经失效。 - 此时在
except块中尝试复用这个已经处于“半关闭”或“被取消”状态的 Session 进行数据库操作,会引发InterfaceError。 - 此外,在已取消的 Task 中直接
await耗时操作本身就是不安全的,可能会被再次取消。
4. 最终解决方案:独立任务 + 新 Session + 重试机制
为了彻底解决问题,我们采取了以下策略:
- 使用后台任务 (
asyncio.create_task):将保存操作从当前(正在被取消的)请求上下文中剥离,放入一个新的后台 Task 中执行。这确保了即使主请求结束,保存逻辑也能继续运行。 - 创建全新 Session:不再复用请求级别的
dbSession,而是使用AsyncSessionFactory()手动创建一个全新的、干净的数据库连接。 - 引入重试机制:为了防止在这个时间窗口内连接池出现竞态条件(例如拿到了一个正好被 Engine 清理的连接),我们添加了简单的重试逻辑。
最终代码实现 (src/services/chat_service.py):
from sqlalchemy.exc import InterfaceError, OperationalError
from src.configs.db import AsyncSessionFactory
async def save_partial_response_task(conversation_id: int, content: str):
"""
后台任务:在流被中断时保存部分回复。
创建一个全新的 DB session,并包含重试机制以处理连接竞态条件。
"""
for attempt in range(3):
try:
# 关键点:手动创建新 Session,与原请求隔离
async with AsyncSessionFactory() as session:
assistant_message_to_save = MessageCreateSchema(
conversation_id=conversation_id,
role="assistant",
content=content,
)
await message_dao.create_message(session, message=assistant_message_to_save)
logger.info(f"Saved partial assistant response in background task: conv={conversation_id} len={len(content)}")
return # 成功则退出
except (InterfaceError, OperationalError, OSError) as e:
# 如果遇到连接关闭等错误,进行重试
if attempt < 2:
logger.warning(f"Failed to save partial response (attempt {attempt + 1}), retrying: {e}")
await asyncio.sleep(0.1)
else:
logger.error(f"Failed to save partial response after {attempt + 1} attempts: {e}")
except Exception as e:
logger.error(f"Failed to save partial response: {e}")
break
async def stream_chat_response(request: ChatRequest, llm_service: LLMService, db: AsyncSession):
# ... (初始化代码) ...
full_response_content = ""
response_saved = False
try:
# ... (流式生成循环) ...
except asyncio.CancelledError:
# 捕获客户端断开
logger.warning(f"Stream cancelled (client disconnected)...")
if full_response_content and not response_saved:
# 关键点:将保存操作放入后台任务,不 await 它
asyncio.create_task(save_partial_response_task(request.conversation_id, full_response_content))
raise # 重新抛出异常以让框架正确处理取消
except asyncio.TimeoutError:
# 处理 LLM 超时
# ... 类似逻辑,使用后台任务保存 ...
5. 验证结果
部署修复版本后,我们在 GKE 日志中成功观察到了预期的行为:
- 触发断开:日志显示
Stream cancelled (client disconnected) for conversation 8... - 后台保存成功:紧接着显示
Saved partial assistant response in background task: conv=8 len=2007
数据成功写入数据库,彻底解决了因网络中断或超时导致的消息丢失问题。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)