以下是对该 FastAPI 代码的逐行解析和详细说明:


代码结构概览

from fastapi import Depends  # 导入依赖注入模块

def get_embedder():
    return OllamaBgeEmbeddings(
        base_url="http://ollama-cluster:11434",
        timeout=30,
        max_retries=5
    )

@app.post("/embed")
async def embed_text(
    text: str,
    embedder: OllamaBgeEmbeddings = Depends(get_embedder)
):
    return {"embedding": embedder.embed_query(text)}

逐行代码解析

1. 依赖项定义
def get_embedder():
    return OllamaBgeEmbeddings(
        base_url="http://ollama-cluster:11434",
        timeout=30,
        max_retries=5
    )
  • 功能:创建并返回一个 OllamaBgeEmbeddings 实例
  • 关键参数
    • base_url:Ollama 服务集群地址(可以是负载均衡器地址)
    • timeout=30:请求超时时间(秒)
    • max_retries=5:失败请求最大重试次数
  • 设计意图
    将嵌入器的初始化逻辑封装为可复用的依赖项,实现配置集中管理

2. 路由定义
@app.post("/embed")
async def embed_text(
    text: str,
    embedder: OllamaBgeEmbeddings = Depends(get_embedder)
):
    return {"embedding": embedder.embed_query(text)}
  • 路由特性
    • @app.post("/embed"):定义 POST 方法端点 /embed
    • async def:声明异步处理函数(需配合异步客户端)
  • 参数解析
    • text: str:从请求体中获取的文本字段
    • embedder=Depends(...):通过依赖注入获取嵌入器实例
  • 返回值
    将嵌入结果包装为 JSON 格式返回

关键机制详解

1. 单例模式实现原理
  • 默认行为
    FastAPI 的 Depends() 会在 每个请求 中调用 get_embedder(),产生 新实例
  • 实现真正单例
    需添加缓存装饰器(修正代码):
    from functools import lru_cache
    
    @lru_cache(maxsize=1)
    def get_embedder():
        return OllamaBgeEmbeddings(...)
    
    • lru_cache 确保函数 只执行一次,后续调用直接返回缓存实例
    • maxsize=1 限制缓存单个实例

2. 依赖注入优势
特性 说明
解耦 路由函数无需关心嵌入器的初始化细节
可测试性 可轻松替换为 Mock 对象进行单元测试
生命周期管理 配合 FastAPI 的依赖注入系统管理资源(需结合上下文管理器实现更精细控制)
配置集中化 所有相关路由共享同一配置源

3. 异步处理优化
  • 当前代码问题
    OllamaBgeEmbeddings 使用同步 requests 库,在异步路由中会 阻塞事件循环
  • 改进方案
    # 使用异步HTTP客户端(需修改OllamaBgeEmbeddings实现)
    import httpx
    
    class AsyncOllamaBgeEmbeddings:
        async def embed_query_async(self, text: str):
            async with httpx.AsyncClient() as client:
                response = await client.post(...)
    

生产环境增强建议

1. 添加健康检查
@app.get("/health")
def health_check(embedder: OllamaBgeEmbeddings = Depends(get_embedder)):
    try:
        embedder.embed_query("health check")
        return {"status": "OK"}
    except Exception as e:
        raise HTTPException(500, detail=str(e))
2. 限流保护
from fastapi import FastAPI
from fastapi_limiter import FastAPILimiter

app = FastAPI()
FastAPILimiter.init(app, key_func=lambda: "global_limit")

@app.post("/embed")
@RateLimiter(times=100, minutes=1)  # 允许每分钟100次请求
async def embed_text(...):
    ...
3. 性能监控集成
from prometheus_fastapi_instrumentator import Instrumentator

Instrumentator().instrument(app).expose(app)

完整优化版代码

from fastapi import FastAPI, Depends, HTTPException
from functools import lru_cache
from typing import Optional
import httpx
import logging

app = FastAPI()

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AsyncOllamaBgeEmbeddings:
    def __init__(self, base_url: str, timeout: int = 30):
        self.client = httpx.AsyncClient(base_url=base_url, timeout=timeout)

    async def embed_query(self, text: str) -> Optional[list[float]]:
        try:
            response = await self.client.post(
                "/api/embeddings",
                json={"model": "bge-m3:latest", "prompt": text}
            )
            response.raise_for_status()
            return response.json()["embedding"]
        except Exception as e:
            logger.error(f"Embedding failed: {str(e)}")
            return None

@lru_cache(maxsize=1)
def get_embedder() -> AsyncOllamaBgeEmbeddings:
    return AsyncOllamaBgeEmbeddings(
        base_url="http://ollama-cluster:11434",
        timeout=30
    )

@app.post("/embed")
async def embed_text(
    text: str,
    embedder: AsyncOllamaBgeEmbeddings = Depends(get_embedder)
) -> dict:
    if not text:
        raise HTTPException(400, detail="Text cannot be empty")
    
    embedding = await embedder.embed_query(text)
    if not embedding:
        raise HTTPException(500, detail="Embedding service error")
    
    return {"embedding": embedding}

架构图示

客户端
│
└─▶ FastAPI 服务 (http://api-server:8000)
    │
    ├─▶ /embed 端点 (POST)
    │   │
    │   └─▶ 依赖注入系统
    │       │
    │       └─▶ OllamaBgeEmbeddings 单例
    │           │
    │           └─▶ Ollama 集群 (http://ollama-cluster:11434)
    │
    └─▶ /health 端点 (GET) → 健康状态检测

通过这种设计,您可以获得:

  • 高并发处理能力(依赖异步客户端)
  • 资源高效利用(单例模式 + 连接池)
  • 完善的错误处理和监控
  • 易于扩展的微服务架构
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐