AI之FastAPI+ollama调用嵌入模型OllamaBgeEmbeddings
AI之FastAPI+ollama调用嵌入模型OllamaBgeEmbeddings
·
以下是对该 FastAPI 代码的逐行解析和详细说明:
代码结构概览
from fastapi import Depends # 导入依赖注入模块
def get_embedder():
return OllamaBgeEmbeddings(
base_url="http://ollama-cluster:11434",
timeout=30,
max_retries=5
)
@app.post("/embed")
async def embed_text(
text: str,
embedder: OllamaBgeEmbeddings = Depends(get_embedder)
):
return {"embedding": embedder.embed_query(text)}
逐行代码解析
1. 依赖项定义
def get_embedder():
return OllamaBgeEmbeddings(
base_url="http://ollama-cluster:11434",
timeout=30,
max_retries=5
)
- 功能:创建并返回一个
OllamaBgeEmbeddings实例 - 关键参数:
base_url:Ollama 服务集群地址(可以是负载均衡器地址)timeout=30:请求超时时间(秒)max_retries=5:失败请求最大重试次数
- 设计意图:
将嵌入器的初始化逻辑封装为可复用的依赖项,实现配置集中管理
2. 路由定义
@app.post("/embed")
async def embed_text(
text: str,
embedder: OllamaBgeEmbeddings = Depends(get_embedder)
):
return {"embedding": embedder.embed_query(text)}
- 路由特性:
@app.post("/embed"):定义 POST 方法端点/embedasync def:声明异步处理函数(需配合异步客户端)
- 参数解析:
text: str:从请求体中获取的文本字段embedder=Depends(...):通过依赖注入获取嵌入器实例
- 返回值:
将嵌入结果包装为 JSON 格式返回
关键机制详解
1. 单例模式实现原理
- 默认行为:
FastAPI 的Depends()会在 每个请求 中调用get_embedder(),产生 新实例 - 实现真正单例:
需添加缓存装饰器(修正代码):from functools import lru_cache @lru_cache(maxsize=1) def get_embedder(): return OllamaBgeEmbeddings(...)lru_cache确保函数 只执行一次,后续调用直接返回缓存实例maxsize=1限制缓存单个实例
2. 依赖注入优势
| 特性 | 说明 |
|---|---|
| 解耦 | 路由函数无需关心嵌入器的初始化细节 |
| 可测试性 | 可轻松替换为 Mock 对象进行单元测试 |
| 生命周期管理 | 配合 FastAPI 的依赖注入系统管理资源(需结合上下文管理器实现更精细控制) |
| 配置集中化 | 所有相关路由共享同一配置源 |
3. 异步处理优化
- 当前代码问题:
若OllamaBgeEmbeddings使用同步requests库,在异步路由中会 阻塞事件循环 - 改进方案:
# 使用异步HTTP客户端(需修改OllamaBgeEmbeddings实现) import httpx class AsyncOllamaBgeEmbeddings: async def embed_query_async(self, text: str): async with httpx.AsyncClient() as client: response = await client.post(...)
生产环境增强建议
1. 添加健康检查
@app.get("/health")
def health_check(embedder: OllamaBgeEmbeddings = Depends(get_embedder)):
try:
embedder.embed_query("health check")
return {"status": "OK"}
except Exception as e:
raise HTTPException(500, detail=str(e))
2. 限流保护
from fastapi import FastAPI
from fastapi_limiter import FastAPILimiter
app = FastAPI()
FastAPILimiter.init(app, key_func=lambda: "global_limit")
@app.post("/embed")
@RateLimiter(times=100, minutes=1) # 允许每分钟100次请求
async def embed_text(...):
...
3. 性能监控集成
from prometheus_fastapi_instrumentator import Instrumentator
Instrumentator().instrument(app).expose(app)
完整优化版代码
from fastapi import FastAPI, Depends, HTTPException
from functools import lru_cache
from typing import Optional
import httpx
import logging
app = FastAPI()
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AsyncOllamaBgeEmbeddings:
def __init__(self, base_url: str, timeout: int = 30):
self.client = httpx.AsyncClient(base_url=base_url, timeout=timeout)
async def embed_query(self, text: str) -> Optional[list[float]]:
try:
response = await self.client.post(
"/api/embeddings",
json={"model": "bge-m3:latest", "prompt": text}
)
response.raise_for_status()
return response.json()["embedding"]
except Exception as e:
logger.error(f"Embedding failed: {str(e)}")
return None
@lru_cache(maxsize=1)
def get_embedder() -> AsyncOllamaBgeEmbeddings:
return AsyncOllamaBgeEmbeddings(
base_url="http://ollama-cluster:11434",
timeout=30
)
@app.post("/embed")
async def embed_text(
text: str,
embedder: AsyncOllamaBgeEmbeddings = Depends(get_embedder)
) -> dict:
if not text:
raise HTTPException(400, detail="Text cannot be empty")
embedding = await embedder.embed_query(text)
if not embedding:
raise HTTPException(500, detail="Embedding service error")
return {"embedding": embedding}
架构图示
客户端
│
└─▶ FastAPI 服务 (http://api-server:8000)
│
├─▶ /embed 端点 (POST)
│ │
│ └─▶ 依赖注入系统
│ │
│ └─▶ OllamaBgeEmbeddings 单例
│ │
│ └─▶ Ollama 集群 (http://ollama-cluster:11434)
│
└─▶ /health 端点 (GET) → 健康状态检测
通过这种设计,您可以获得:
- 高并发处理能力(依赖异步客户端)
- 资源高效利用(单例模式 + 连接池)
- 完善的错误处理和监控
- 易于扩展的微服务架构
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)