从 0.x 到 1.0:LangChain 生产级 Agent 实战指南
LangChain 1.0 于 2025 年 10 月 23 日正式发布,标志着 AI Agent 开发进入生产就绪阶段。本系统梳理从 0.x 到 1.0 的演进脉络、核心架构理念及实战应用。内容概括:从 Chain 设计转向 Agent 优先,引入中间件机制、统一抽象层静态/动态模型选择、工具调用优化、结构化输出策略中间件钩子、状态管理、流式输出、错误处理本指南适合 AI 工程师、架构师及技术决
一、核心摘要
核心观点: LangChain 1.0 于 2025 年 10 月 23 日正式发布,标志着 AI Agent 开发进入生产就绪阶段。本系统梳理从 0.x 到 1.0 的演进脉络、核心架构理念及实战应用。
内容概括:
- 版本演进: 从 Chain 设计转向 Agent 优先,引入中间件机制、统一抽象层
- 架构革新: 静态/动态模型选择、工具调用优化、结构化输出策略
- 生产级特性: 中间件钩子、状态管理、流式输出、错误处理
价值说明: 本指南适合 AI 工程师、架构师及技术决策者阅读,提供从设计到落地的完整路径。文中所有代码示例均基于 Python 3.10+ 环境。
二、LangChain 版本演进全景
2.1 版本时间线与里程碑
| 版本 | 发布时间 | 核心特征 | 定位 | 引用来源 |
|---|---|---|---|---|
| 0.1.x | 2023年初 | 简单 Chain 组合 | 实验阶段 | [7†] |
| 0.2.x | 2024年中 | Agent 原型出现 | 试生产 | [3†] |
| 0.3.x | 2025年初 | 多模态支持 | 工程化探索 | [3†] |
| 1.0.0 | 2025年10月23日 | 生产就绪架构 | 正式版 | [1†] |
版本演进逻辑: LangChain 经历了"实验验证 → 工程化探索 → 生产就绪"三个阶段。1.0 版本终结了此前架构碎片化局面,以"生产就绪"为核心定位,重新定义智能 Agent 开发范式 [1†]。
2.2 核心架构理念的变迁
0.x 时代的特征与局限
特征描述: 0.x 版本以 Chain 为核心设计理念,强调线性组合的执行流程。
主要局限:
- 僵化的 Chain 设计: 难以支持复杂的分支逻辑和动态决策 [6†]
- 缺少生产级控制: 缺乏中间件钩子,难以实现日志、监控、重试等企业级需求
- 模型切换重复工作: 不同模型间切换需要大量代码修改 [6†]
1.0 代的架构革新
核心变化: LangChain 1.0 彻底重构 Agent 开发范式,主要变化集中在 create_agent() 方法 [4†]:
- 统一 Agent 抽象: 提供标准化接口,支持多种执行模式
- 中间件架构: 提供 5 大钩子函数(before_model、after_model、wrap_model_call、wrap_tool_call、before_agentExecute)[2†]
- 静态/动态模型选择: 支持运行时根据上下文动态切换模型 [0†]
- 结构化输出优化: 引入 ToolStrategy 和 ProviderStrategy 策略模式 [9†]
架构对比:
| 维度 | 0.x 版本 | 1.0 版本 |
|---|---|---|
| 设计范式 | Chain 线性组合 | Agent 循环 + 中间件 |
| 模型管理 | 静态绑定 | 静态/动态选择 |
| 扩展性 | 有限,需修改源码 | 高,通过中间件钩子 |
| 生产级特性 | 缺乏完善的监控 | 内置日志、重试、限流 |
小结: 1.0 版本通过中间件机制和统一抽象,解决了 0.x 版本在工程化、生产可控性方面的不足,为构建企业级 Agent 系统奠定了基础。
三、LangChain 1.0 核心架构深度解析
3.1 系统架构总览
LangChain 1.0 的 Agent 系统采用分层架构设计:
┌─────────────────────────────────────────────┐
│ 用户接口层(UI/API) │
├─────────────────────────────────────────────┤
│ 中间件层(Middleware) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │日志监控 │ │权限控制 │ │错误处理 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
├─────────────────────────────────────────────┤
│ Agent 运行时层 │
│ ┌──────────────────────────────────────┐ │
│ │ 状态管理(State Management) │ │
│ │ 工具调度(Tool Orchestration) │ │
│ │ 模型接口(Model Interface) │ │
│ └──────────────────────────────────────┘ │
├─────────────────────────────────────────────┤
│ 工具层(Tools) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │搜索工具 │ │数据库工具 │ │自定义工具 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────┘
架构说明:
- 用户接口层: 提供 REST API、gRPC、Streamlit 等多种交互方式
- 中间件层: 通过钩子函数实现横切关注点(日志、监控、权限等)
- Agent 运行时层: 核心执行引擎,管理状态、调度工具和模型
- 工具层: 可插拔的工具生态,支持 HTTP、数据库、第三方 API 等
3.2 核心组件详解
3.2.1 模型(Model)抽象层
LangChain 1.0 提供了统一的模型接口,支持多种模型提供商。
1. 静态模型配置
静态模型在创建 Agent 时一次性指定,执行期间保持不变:
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
# 方式1:使用模型标识符字符串
agent = create_agent(
model="gpt-4o", # 自动推断为 openai:gpt-4o
tools=[search_tool, get_weather],
system_prompt="You are a helpful assistant."
)
# 方式2:直接使用模型实例
model = ChatOpenAI(
model="gpt-4o",
temperature=0.1,
max_tokens=1000,
timeout=30
)
agent = create_agent(
model=model,
tools=[search_tool, get_weather]
)
2. 动态模型选择
动态模型允许在运行时根据状态和上下文自动切换:
from langchain.agents.middleware import wrap_model_call
basic_model = ChatOpenAI(model="gpt-4o-mini")
advanced_model = ChatOpenAI(model="gpt-4o")
@wrap_model_call
def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse:
"""根据对话复杂度选择模型"""
message_count = len(request.state["messages"])
if message_count > 10:
# 长对话使用高级模型
model = advanced_model
else:
# 短对话使用基础模型
model = basic_model
return handler(request.override(model=model))
agent = create_agent(
model=basic_model, # 默认模型
tools=[search_tool, get_weather],
middleware=[dynamic_model_selection]
)
应用场景: 动态模型选择适用于成本优化(简单任务用便宜模型)、性能优化(复杂任务用强模型)、多租户场景(不同用户用不同模型)等。
3.2.2 工具(Tools)系统
LangChain 1.0 的工具系统支持多种工具调用模式,提供了强大的灵活性。
1. 工具定义与注册
from langchain.tools import tool
# 方式1:使用装饰器定义工具
@tool
def search(query: str) -> str:
"""搜索信息"""
return f"Results for: {query}"
@tool
def get_weather(location: str) -> str:
"""获取天气信息"""
return f"Weather in {location}: Sunny, 72°F"
# 方式2:继承 BaseTool
from langchain.tools import BaseTool
class DatabaseTool(BaseTool):
name = "database_query"
description = "Execute database queries"
def _run(self, query: str) -> str:
# 执行数据库查询
return f"Result for query: {query}"
async def _arun(self, query: str) -> str:
return self._run(query)
# 注册工具到 Agent
agent = create_agent(
model="gpt-4o",
tools=[search, get_weather, DatabaseTool()]
)
2. 工具错误处理
from langchain.agents.middleware import wrap_tool_call
from langchain.messages import ToolMessage
@wrap_tool_call
def handle_tool_errors(request, handler):
"""处理工具执行错误"""
try:
return handler(request)
except Exception as e:
# 返回自定义错误消息给模型
return ToolMessage(
content=f"Tool error: Please check your input and try again. ({str(e)})",
tool_call_id=request.tool_call["id"]
)
agent = create_agent(
model="gpt-4o",
tools=[search, get_weather],
middleware=[handle_tool_errors]
)
3. 并行与串行工具调用
LangChain 1.0 支持智能调度多个工具调用:
# 串行工具调用
agent = create_agent(
model="gpt-4o",
tools=[search, get_weather, analyze_data]
)
# 并行工具调用(通过中间件实现)
from langchain.agents.middleware import ParallelToolExecution
parallel_executor = ParallelToolExecution(
tools=[search, get_weather],
max_concurrent=3
)
agent = create_agent(
model="gpt-4o",
tools=[analyze_data],
middleware=[parallel_executor]
)
3.2.3 中间件(Middleware)机制
中间件是 LangChain 1.0 的核心创新,提供了强大的扩展能力。
中间件生命周期钩子:
from langchain.agents.middleware import AgentMiddleware
class CustomMiddleware(AgentMiddleware):
"""自定义中间件"""
def before_model(self, state, runtime):
"""模型调用前钩子"""
# 可以修改 state、添加日志、权限检查等
print(f"Before model call, messages count: {len(state['messages'])}")
return None
def after_model(self, state, runtime, response):
"""模型调用后钩子"""
# 可以处理模型响应、添加监控等
print(f"After model call, response: {response}")
return None
def wrap_model_call(self, request, handler):
"""包装模型调用(替代 before_model + after_model)"""
# 自定义模型选择逻辑
return handler(request)
def wrap_tool_call(self, request, handler):
"""包装工具调用"""
# 自定义工具执行逻辑
return handler(request)
def before_agent_execute(self, state, runtime):
"""Agent 执行前钩子"""
return None
# 使用中间件
agent = create_agent(
model="gpt-4o",
tools=[search, get_weather],
middleware=[CustomMiddleware()]
)
内置中间件示例:
from langchain.agents.middleware import SummarizationMiddleware, HumanInTheLoopMiddleware
# 1. 摘要中间件(自动总结历史消息)
summarizationMiddleware = SummarizationMiddleware(
model="claude-sonnet-4-5-20250929",
trigger={"tokens": 1000} # 当消息超过1000个token时触发摘要
)
# 2. 人机循环中间件(允许人工介入审批)
humanInTheLoopMiddleware = HumanInTheLoopMiddleware(
interruptOn={
"sendEmail": {
"allowedDecisions": ["approve", "edit", "reject"]
}
}
)
agent = create_agent(
model="gpt-4o",
tools=[read_email, send_email],
middleware=[summarizationMiddleware, humanInTheLoopMiddleware]
)
3.3 状态管理(State Management)
LangChain 1.0 提供了灵活的状态管理机制,支持短期记忆和长期存储。
3.3.1 短期记忆(Short-term Memory)
Agent 自动维护对话历史作为短期记忆:
# 默认情况下,Agent 自动维护 messages 列表
result = agent.invoke({
"messages": [
{"role": "user", "content": "What's the weather in San Francisco?"}
]
})
# 查看对话历史
print(result["messages"])
3.3.2 自定义状态模式
from langchain.agents import AgentState
from typing import TypedDict
class CustomState(TypedDict):
"""自定义状态模式"""
messages: list # 必须包含 messages
user_preferences: dict # 自定义字段
session_id: str # 会话ID
# 通过中间件扩展状态
class StateMiddleware(AgentMiddleware):
state_schema = CustomState
def before_model(self, state: CustomState, runtime):
# 访问自定义状态字段
preferences = state.get("user_preferences", {})
print(f"User preferences: {preferences}")
return None
# 通过 state_schema 快速定义
agent = create_agent(
model="gpt-4o",
tools=[search, get_weather],
state_schema=CustomState
)
# 使用自定义状态
result = agent.invoke({
"messages": [{"role": "user", "content": "Hi"}],
"user_preferences": {"language": "zh", "style": "formal"},
"session_id": "session_123"
})
注意事项:
- 自定义状态模式必须扩展
AgentState(包含messages字段) - 使用 TypedDict 定义状态类型(不再支持 Pydantic 模型和数据类)[9†]
3.3.3 长期存储(Long-term Memory)
对于需要跨会话持久化的场景,可以使用外部存储:
from langchain.agents.memory import LongTermMemory
# 初始化长期存储
memory = LongTermMemory(
storage_type="redis", # 或 "mongodb", "postgresql"
connection_params={
"host": "localhost",
"port": 6379,
"db": 0
}
)
# 创建支持长期记忆的 Agent
agent = create_agent(
model="gpt-4o",
tools=[search, get_weather],
memory=memory # 传入 memory 参数
)
# 查询长期记忆
historical_context = memory.get_context(user_id="user_123")
3.4 结构化输出(Structured Output)
LangChain 1.0 提供了两种结构化输出策略,确保输出符合预期格式。
3.4.1 ToolStrategy(工具调用策略)
ToolStrategy 通过人工工具调用生成结构化输出,适用于所有支持工具调用的模型:
from pydantic import BaseModel
from langchain.agents import create_agent
from langchain.agents.structured_output import ToolStrategy
class ContactInfo(BaseModel):
name: str
email: str
phone: str
agent = create_agent(
model="gpt-4o-mini",
tools=[search_tool],
response_format=ToolStrategy(ContactInfo) # 使用 Tool 策略
)
result = agent.invoke({
"messages": [{
"role": "user",
"content": "Extract contact info from: John Doe, john@example.com, (555) 123-4567"
}]
})
print(result["structured_response"])
# 输出: ContactInfo(name='John Doe', email='john@example.com', phone='555-123-4567')
3.4.2 ProviderStrategy(提供商原生策略)
ProviderStrategy 使用模型原生的结构化输出生成,更可靠但仅支持支持该功能的提供商(如 OpenAI):
from langchain.agents.structured_output import ProviderStrategy
agent = create_agent(
model="gpt-4o", # 仅支持 ProviderStrategy 的模型
tools=[search_tool],
response_format=ProviderStrategy(ContactInfo)
)
result = agent.invoke({
"messages": [{
"role": "user",
"content": "Extract contact info from: John Doe, john@example.com, (555) 123-4567"
}]
})
重要提示:
- 在 LangChain 1.0 中,直接传递 schema(如
response_format=ContactInfo)不再支持,必须显式使用ToolStrategy或ProviderStrategy[9†] - 预绑定工具的模型(
model.bind_tools())在使用结构化输出时不支持,应传递工具列表给create_agent()[9†]
四、生产级实战案例
4.1 案例 1:Text-to-SQL 智能数据分析系统
场景描述: 构建一个将自然语言转换为 SQL 查询的智能数据分析系统,支持 RAG(检索增强生成)和数据库查询。
4.1.1 系统架构设计
┌─────────────────────────────────────────────┐
│ Streamlit 前端界面 │
├─────────────────────────────────────────────┤
│ FastAPI 后端服务 │
├─────────────────────────────────────────────┤
│ LangChain Agent (Text-to-SQL) │
│ ┌──────────────────────────────────────┐ │
│ │ 1. 检索相关表结构(Retriever) │ │
│ │ 2. 生成 SQL(LLM + Prompt) │ │
│ │ 3. 验证 SQL(安全检查) │ │
│ │ 4. 执行查询(Database) │ │
│ └──────────────────────────────────────┘ │
├─────────────────────────────────────────────┤
│ 向量数据库(Chroma) + 嵌入模型 │
└─────────────────────────────────────────────┘
4.1.2 完整实现代码
1. 环境准备
pip install langchain faiss-cpu chromadb \
fastapi uvicorn streamlit \
langchain-openai langchain-community \
sqlglot python-dotenv
2. 数据库初始化
# sample_db/create_sample_db.py
import sqlite3
import os
def create_sample_db():
"""创建示例数据库和表"""
os.makedirs("sample_db", exist_ok=True)
conn = sqlite3.connect("sample_db/sample.db")
cur = conn.cursor()
# 创建客户表
cur.execute("""CREATE TABLE IF NOT EXISTS customers (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT,
signup_date TEXT
)""")
# 创建订单表
cur.execute("""CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY,
customer_id INTEGER,
total_amount REAL,
status TEXT,
created_at TEXT,
FOREIGN KEY(customer_id) REFERENCES customers(id)
)""")
# 插入示例数据
customers = [
(1, "Alice Johnson", "alice@example.com", "2024-12-01"),
(2, "Bob Lee", "bob@example.com", "2024-12-05"),
(3, "Carol Singh", "carol@example.com", "2024-12-10"),
(4, "David Kim", "david.kim@example.com", "2024-12-12")
]
orders = [
(1, 1, 120.50, "completed", "2025-01-03", "First order"),
(2, 1, 15.00, "pending", "2025-01-07", "Gift wrap"),
(3, 2, 250.00, "completed", "2025-02-10", "Bulk order"),
(4, 3, 75.25, "completed", "2025-02-15", "Regular order")
]
cur.executemany("INSERT OR REPLACE INTO customers VALUES (?,?,?,?)", customers)
cur.executemany("INSERT OR REPLACE INTO orders VALUES (?,?,?,?,?,?)", orders)
conn.commit()
conn.close()
print("Sample database created successfully!")
if __name__ == "__main__":
create_sample_db()
3. 向量索引构建
# ingestion/index_sqlite.py
import sqlite3
import os
from tqdm import tqdm
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
def row_hash(values):
"""为行生成唯一哈希值"""
import hashlib
return hashlib.sha256("|".join(map(str, values)).encode()).hexdigest()
def row_to_text(table, cols, row):
"""将 SQLite 行记录转换为可读文本块"""
return f"Table: {table}\n" + "\n".join([f"{c}: {v}" for c, v in zip(cols, row)])
def index_table(conn, table):
"""将表索引进向量库"""
cur = conn.cursor()
cur.execute(f"PRAGMA table_info({table})")
cols = [c[1] for c in cur.fetchall()]
cur.execute(f"SELECT {','.join(cols)} FROM {table}")
rows = cur.fetchall()
embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2"
)
vectorstore = Chroma(
collection_name="sqlite_docs",
persist_directory="./chroma_persist",
embedding_function=embeddings
)
docs, ids, metas = [], [], []
for r in rows:
txt = row_to_text(table, cols, r)
pk = str(r[0])
hid = row_hash(r)
ids.append(f"{table}:{pk}")
docs.append(txt)
metas.append({"table": table, "pk": pk, "hash": hid})
vectorstore.add_texts(texts=docs, metadatas=metas, ids=ids)
print(f"Indexed table: {table}")
def main():
"""主索引流程"""
SQLITE_PATH = os.getenv("SQLITE_PATH", "sample_db/sample.db")
conn = sqlite3.connect(SQLITE_PATH)
cur = conn.cursor()
cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = [t[0] for t in cur.fetchall()]
for table in tqdm(tables, desc="Indexing tables"):
index_table(conn, table)
conn.close()
print("Indexing completed and persisted to Chroma!")
if __name__ == "__main__":
main()
4. LangChain Agent 实现
# server/langgraph_nodes.py
import re
from langchain_core.runnables import RunnableLambda
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
# 初始化检索器
embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2"
)
vectorstore = Chroma(
collection_name="sqlite_docs",
persist_directory="./chroma_persist",
embedding_function=embeddings
)
retriever = vectorstore.as_retriever()
async def retriever_node(state):
"""检索相关表结构和数据"""
docs = await retriever.ainvoke(state["question"])
state["retrieved_docs"] = [d.page_content for d in docs]
return state
def build_sql_prompt():
"""构建 SQL 生成提示词"""
from langchain_core.prompts import PromptTemplate
return PromptTemplate.from_template(
"""You are a SQL generator. Based on the context below, generate a single read-only SQLite SELECT query (no semicolon, no multiple statements).
Context:
{context}
Question:
{question}
Only return the SELECT SQL statement."""
)
def clean_llm_sql_output(text):
"""清洗 LLM 输出,仅保留 SELECT 语句"""
out = str(text or "").strip()
# 移除 Markdown 代码围栏
out = re.sub(r"```(?:sql)?\n?", "", out, flags=re.IGNORECASE).replace("```", "").strip()
# 提取 SELECT 语句
match = re.search(r"(select\b.*)", out, flags=re.IGNORECASE | re.DOTALL)
if match:
out = match.group(1).rstrip(";").strip()
return out
async def sql_generator_node(state):
"""生成 SQL 查询"""
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")
prompt = build_sql_prompt()
context = "\n\n".join(state.get("retrieved_docs", []))
prompt_text = prompt.format(context=context, question=state["question"])
out = await llm.ainvoke(prompt_text)
sql = clean_llm_sql_output(out.content)
state["generated_sql"] = sql
return state
5. SQL 验证与执行
# server/sql_validator.py
import sqlglot
def validate_sql(sql, allowed_tables):
"""验证 SQL 安全性"""
ALLOWED_STATEMENTS = {"select"}
DISALLOWED = {"delete", "update", "insert", "drop", "alter"}
# 检查分号和多语句
if ";" in sql:
return False, "不允许分号或多语句"
# 检查禁用关键字
for kw in DISALLOWED:
if f"{kw}" in sql.lower():
return False, f"不允许的关键字: {kw}"
# 解析 SQL
try:
parsed = sqlglot.parse_one(sql, read="sqlite")
except Exception as e:
return False, f"SQL 解析错误: {e}"
# 检查语句类型
if parsed.key.lower() not in ALLOWED_STATEMENTS:
return False, "只允许 SELECT 语句"
# 检查表白名单
def extract_tables(s):
return set(re.findall(r"\bfrom\s+([a-zA-Z0-9_]+)", s, flags=re.IGNORECASE))
tables = extract_tables(sql)
if not tables.issubset(set(allowed_tables)):
return False, f"使用了不在白名单的表: {tables - set(allowed_tables)}"
return True, "ok"
# server/executor.py
import sqlite3
def execute_sql(sql, row_limit=1000):
"""执行 SQL 查询"""
def enforce_limit(q, limit):
q_low = q.lower()
if " limit " in q_low:
return q
return f"{q}LIMIT {int(limit)}"
def open_ro_conn():
conn = sqlite3.connect("sample_db/sample.db")
conn.execute("PRAGMA query_only = 1") # 只读模式
return conn
sql = enforce_limit(sql, row_limit)
conn = open_ro_conn()
conn.execute("PRAGMA busy_timeout = 5000") # 忙等待
cur = conn.cursor()
cur.execute(sql)
cols = [c[0] for c in cur.description] if cur.description else []
rows = cur.fetchmany(row_limit)
conn.close()
return cols, rows
6. FastAPI 后端服务
# server/main.py
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
from server.langgraph_nodes import retriever_node, sql_generator_node
from server.sql_validator import validate_sql
from server.executor import execute_sql
load_dotenv()
app = FastAPI(title="RAG Text-to-SQL API")
class QueryRequest(BaseModel):
question: str
show_sql: bool = True
@app.post("/query")
async def query(req: QueryRequest):
"""运行完整 RAG→SQL→验证→执行流程"""
# 1. 检索相关文档
state = {"question": req.question, "messages": []}
state = await retriever_node(state)
# 2. 生成 SQL
state = await sql_generator_node(state)
sql = state["generated_sql"]
# 3. 验证 SQL
allowed_tables = ["customers", "orders"]
ok, reason = validate_sql(sql, allowed_tables)
if not ok:
raise HTTPException(status_code=400, detail=f"SQL 验证失败: {reason}. SQL: {sql}")
# 4. 执行查询
cols, rows = execute_sql(sql)
result = [dict(zip(cols, r)) for r in rows]
return {
"sql": sql if req.show_sql else None,
"cols": cols,
"rows": result
}
if __name__ == "__main__":
import uvicorn
uvicorn.app.run(app, host="0.0.0.0", port=8000)
7. Streamlit 前端界面
# app.py
import streamlit as st
import requests
API_URL = "http://localhost:8000/query"
st.set_page_config(page_title="RAG Text-to-SQL Demo", layout="centered")
st.title("🤖 RAG Text-to-SQL 智能分析系统")
with st.form("query_form"):
question = st.text_input(
"输入自然语言问题",
value="显示上个月注册的所有客户"
)
show_sql = st.checkbox("显示生成的 SQL", value=True)
submitted = st.form_submit_button("🚀 查询")
if submitted:
if not question.strip():
st.warning("请输入问题")
else:
with st.spinner("查询中..."):
try:
resp = requests.post(
API_URL,
json={"question": question, "show_sql": show_sql},
timeout=60
)
resp.raise_for_status()
data = resp.json()
if show_sql:
st.subheader("🧠 生成的 SQL")
st.code(data.get("sql", ""), language="sql")
st.subheader("📊 查询结果")
rows = data.get("rows", [])
if rows:
st.dataframe(rows)
else:
st.info("该查询未返回任何行")
except requests.exceptions.HTTPError as http_err:
st.error(f"HTTP 错误: {http_err} - {resp.text}")
except requests.exceptions.ConnectionError:
st.error("无法连接 API。请确认 FastAPI 在 localhost:8000 运行中。")
except requests.exceptions.Timeout:
st.error("请求超时,请稍后再试。")
except Exception as e:
st.error(f"意外错误: {e}")
8. 运行与部署
# 1. 创建数据库
python sample_db/create_sample_db.py
# 2. 构建向量索引
python ingestion/index_sqlite.py
# 3. 启动 FastAPI 后端
uvicorn server.main:app --reload --host 0.0.0.0 --port 8000
# 4. 启动 Streamlit 前端(在新终端)
streamlit run app.py --server.port 8501
4.1.3 系统特性与优化
1. 安全特性
- 只读 SQL 查询(禁止写操作)
- 表白名单验证
- SQL 注入防护(通过 sqlglot 解析)
2. 性能优化
- 向量检索加速查询相关表结构
- SQL 结果集限制(防止大量数据返回)
- 异步处理提高并发能力
3. 扩展方向
- 增量索引更新(监听数据库变化)
- 查询缓存(Redis 缓存热点查询)
- 多数据源支持(PostgreSQL、MySQL 等)
- 用户权限管理(不同用户不同表权限)
4.2 案例 2:多模态 RAG 智能问答系统
场景描述: 构建一个支持文本、图像、音频和 PDF 多模态文档的 RAG 系统,实现智能问答和知识检索。
4.2.1 系统架构
┌─────────────────────────────────────────────┐
│ Web 前端(React/Vue) │
├─────────────────────────────────────────────┤
│ API 网关(Nginx) │
├─────────────────────────────────────────────┤
│ LangChain Agent │
│ ┌──────────────────────────────────────┐ │
│ │ 多模态文档解析器 │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │文本解析│ │图像解析│ │音频解析│ │ │
│ │ └────────┘ └────────┘ └────────┘ │ │
│ │ 向量检索(FAISS/Qdrant) │ │
│ │ 生成回答(LLM + RAG) │ │
│ └──────────────────────────────────────┘ │
├─────────────────────────────────────────────┤
│ 文件存储(S3/OSS) + 元数据数据库 │
└─────────────────────────────────────────────┘
4.2.2 核心实现代码
1. 多模态文档解析器
# processors/multimodal_processor.py
from typing import List, Dict, Any
import pytesseract
from PIL import Image
import pdf2image
import speech_recognition as sr
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
class MultiModalProcessor:
"""多模态文档处理器"""
def __init__(self):
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
def process_text(self, text: str, metadata: Dict[str, Any] = None) -> List[Document]:
"""处理文本文档"""
docs = self.text_splitter.split_text(text)
return [Document(page_content=doc, metadata=metadata or {}) for doc in docs]
def process_image(self, image_path: str, metadata: Dict[str, Any] = None) -> List[Document]:
"""处理图像文档(OCR)"""
try:
image = Image.open(image_path)
text = pytesseract.pytesseract.image_to_string(image)
docs = self.text_splitter.split_text(text)
return [Document(page_content=doc, metadata={**(metadata or {}), "source": image_path}) for doc in docs]
except Exception as e:
print(f"Error processing image {image_path}: {e}")
return []
def process_audio(self, audio_path: str, metadata: Dict[str, Any] = None) -> List[Document]:
"""处理音频文档(语音识别)"""
try:
r = sr.Recognizer()
with sr.AudioFile(audio_path) as source:
audio = r.record(source)
text = r.recognize_google(audio, language='zh-CN')
docs = self.text_splitter.split_text(text)
return [Document(page_content=doc, metadata={**(metadata or {}), "source": audio_path}) for doc in docs]
except Exception as e:
print(f"Error processing audio {audio_path}: {e}")
return []
def process_pdf(self, pdf_path: str, metadata: Dict[str, Any] = None) -> List[Document]:
"""处理 PDF 文档"""
try:
images = pdf2image.convertfrompath(pdf_path)
docs = []
for i, image in enumerate(images):
text = pytesseract.pytesseract.image_to_string(image)
docs.extend(self.text_splitter.split_text(text))
return [Document(page_content=doc, metadata={**(metadata or {}), "source": pdf_path}) for doc in docs]
except Exception as e:
print(f"Error processing PDF {pdf_path}: {e}")
return []
2. 向量检索与存储
# retrieval/vector_store.py
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from typing import List, Dict, Any
class VectorStoreManager:
"""向量存储管理器"""
def __init__(self):
self.embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
self.vectorstore = None
def add_documents(self, documents: List[Document]):
"""添加文档到向量库"""
if self.vectorstore is None:
self.vectorstore = FAISS.from_documents(
documents,
self.embeddings
)
else:
self.vectorstore.add_documents(documents)
def similarity_search(self, query: str, k: int = 4) -> List[Document]:
"""相似性检索"""
if self.vectorstore is None:
return []
return self.vectorstore.similarity_search(query, k=k)
def save(self, path: str):
"""保存向量库"""
if self.vectorstore:
self.vectorstore.save_local(path)
def load(self, path: str):
"""加载向量库"""
self.vectorstore = FAISS.load_local(path, self.embeddings)
3. LangChain Agent 实现
# agents/multimodal_agent.py
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
class MultiModalRAGAgent:
"""多模态 RAG Agent"""
def __init__(self, vector_store):
self.vector_store = vector_store
self.llm = ChatOpenAI(model="gpt-4o")
self.agent = self._create_agent()
def _create_agent(self):
"""创建 Agent"""
prompt = PromptTemplate.from_messages([
("system", "你是一个智能助手,基于多模态文档库回答用户问题。请基于检索到的上下文准确回答。"),
("human", "{question}")
])
return create_agent(
model=self.llm,
tools=[self._retrieval_tool],
system_prompt="You are a helpful assistant that answers questions based on retrieved context.",
middleware=[
self._context_injection_middleware
]
)
async def _retrieval_tool(self, query: str) -> str:
"""检索工具"""
docs = self.vector_store.similarity_search(query, k=5)
context = "\n\n".join([doc.page_content for doc in docs])
return context
def _context_injection_middleware(self, request, handler):
"""上下文注入中间件"""
# 检索相关文档
docs = self.vector_store.similarity_search(request.state["question"], k=5)
context = "\n\n".join([doc.page_content for doc in docs])
# 将上下文注入到 state 中
request.state["retrieved_context"] = context
return handler(request)
def ask(self, question: str) -> str:
"""提问"""
return self.agent.invoke({
"messages": [{"role": "user", "content": question}]
})
4. 完整系统组装
# main.py
import os
from processors.multimodal_processor import MultiModalProcessor
from retrieval.vector_store import VectorStoreManager
from agents.multimodal_agent import MultiModalRAGAgent
def main():
# 初始化处理器
processor = MultiModalProcessor()
# 处理文档
all_documents = []
# 处理文本文件
text_files = ["docs/intro.txt", "docs/guide.md"]
for file_path in text_files:
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read()
docs = processor.process_text(text, metadata={"source": file_path})
all_documents.extend(docs)
# 处理图像文件
image_files = ["docs/screenshot1.png", "docs/diagram.jpg"]
for file_path in image_files:
docs = processor.process_image(file_path, metadata={"source": file_path})
all_documents.extend(docs)
# 处理 PDF 文件
pdf_files = ["docs/manual.pdf", "docs/report.pdf"]
for file_path in pdf_files:
docs = processor.process_pdf(file_path, metadata={"source": file_path})
all_documents.extend(docs)
# 初始化向量库
vector_store = VectorStoreManager()
vector_store.add_documents(all_documents)
vector_store.save("./vector_store")
# 创建 Agent
agent = MultiModalRAGAgent(vector_store)
# 交互式问答
print("🤖 多模态 RAG 系统已就绪,可以开始提问了!")
while True:
question = input("👤 你: ")
if question.lower() in ["quit", "exit"]:
break
response = agent.ask(question)
print(f"🤖 助手: {response}")
if __name__ == "__main__":
main()
4.2.3 系统特性
1. 多模态支持
- 文本、图像、音频、 PDF 统一处理
- OCR 识别提取图像和 PDF 中的文字
- 语音识别转换音频为文字
2. 智能检索
- 语义相似度检索
- 支持混合检索(关键词 + 语义)
- 检索结果排序和过滤
3. 生产级特性
- 文档增量更新
- 查询缓存
- 日志记录和监控
4.3 案例 3:智能客服 Agent 系统
场景描述: 构建一个支持多渠道(网页、微信、邮件)的智能客服系统,能够自动回答用户问题、工单分类、智能路由。
4.3.1 系统架构
┌─────────────────────────────────────────────┐
│ 多渠道接入(Web/WeChat/Email) │
├─────────────────────────────────────────────┤
│ 消息路由与标准化 │
├─────────────────────────────────────────────┤
│ LangChain Agent │
│ ┌──────────────────────────────────────┐ │
│ │ 意图识别(Intent Classification) │ │
│ │ 实体抽取(Entity Extraction) │ │
│ │ 知识检索(FAQ/RAG) │ │
│ │ 工单生成(Ticket Creation) │ │
│ └──────────────────────────────────────┘ │
├─────────────────────────────────────────────┤
│ 知识库(向量库) + CRM 系统 │
└─────────────────────────────────────────────┘
4.3.2 核心实现
1. 多渠道消息接入
# channels/message_router.py
from typing import Dict, Any, Union
from dataclasses import dataclass
@dataclass
class Message:
"""消息对象"""
id: str
channel: str # web, wechat, email
user_id: str
content: str
metadata: Dict[str, Any] = None
class MessageRouter:
"""消息路由器"""
def __init__(self):
self.handlers = {
"web": self._handle_web_message,
"wechat": self._handle_wechat_message,
"email": self._handle_email_message
}
def route(self, message: Message) -> Dict[str, Any]:
"""路由消息到对应处理器"""
handler = self.handlers.get(message.channel)
if not handler:
raise ValueError(f"Unsupported channel: {message.channel}")
return handler(message)
def _handle_web_message(self, message: Message) -> Dict[str, Any]:
"""处理网页消息"""
return {
"user_id": message.user_id,
"content": message.content,
"channel": "web",
"metadata": message.metadata
}
def _handle_wechat_message(self, message: Message) -> Dict[str, Any]:
"""处理微信消息"""
# 微信消息特殊处理
content = self._clean_wechat_content(message.content)
return {
"user_id": message.user_id,
"content": content,
"channel": "wechat",
"metadata": message.metadata
}
def _handle_email_message(self, message: Message) -> Dict[str, Any]:
"""处理邮件消息"""
# 邮件解析
subject = message.metadata.get("subject", "")
body = message.content
return {
"user_id": message.user_id,
"content": f"Subject: {subject}\n{body}",
"channel": "email",
"metadata": message.metadata
}
2. 意图识别与实体抽取
# agents/intent_agent.py
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import List, Dict, Any
class IntentResult(BaseModel):
"""意图识别结果"""
intent: str = Field(description="意图类别")
confidence: float = Field(description="置信度")
entities: Dict[str, Any] = Field(description="实体信息")
class IntentAgent:
"""意图识别 Agent"""
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o")
self.agent = self._create_agent()
def _create_agent(self):
"""创建意图识别 Agent"""
from langchain.tools import tool
@tool
def classify_intent(query: str) -> str:
"""分类用户意图"""
# 意图类别:咨询、投诉、建议、技术支持、订单查询等
return self.llm.invoke(f"Classify the user's intent into one of: [inquiry, complaint, suggestion, technical_support, order_inquiry]. User query: {query}")
@tool
def extract_entities(query: str, intent: str) -> Dict[str, Any]:
"""抽取实体信息"""
# 根据不同意图抽取不同实体
if intent == "order_inquiry":
return self.llm.invoke(f"Extract order number and product name from: {query}")
elif intent == "technical_support":
return self.llm.invoke(f"Extract issue description and error message from: {query}")
else:
return {}
return create_agent(
model=self.llm,
tools=[classify_intent, extract_entities],
system_prompt="You are an intent classification and entity extraction assistant."
)
def process(self, query: str) -> IntentResult:
"""处理用户查询"""
# 调用 Agent 识别意图和抽取实体
result = self.agent.invoke({"messages": [{"role": "user", "content": query}]})
return IntentResult(
intent=result["intent"],
confidence=result["confidence"],
entities=result["entities"]
)
3. 知识检索与回答生成
# agents/knowledge_agent.py
from langchain.agents import create_agent
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
class KnowledgeAgent:
"""知识检索 Agent"""
def __init__(self, knowledge_base_path: str = "./knowledge_base"):
self.embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
self.vectorstore = FAISS.load_local(knowledge_base_path, self.embeddings)
self.llm = ChatOpenAI(model="gpt-4o")
self.agent = self._create_agent()
def _create_agent(self):
"""创建知识检索 Agent"""
from langchain.tools import tool
@tool
def search_knowledge(query: str) -> str:
"""检索知识库"""
docs = self.vectorstore.similarity_search(query, k=5)
context = "\n\n".join([doc.page_content for doc in docs])
return context
return create_agent(
model=self.llm,
tools=[search_knowledge],
system_prompt="You are a helpful customer service assistant. Answer questions based on the retrieved knowledge base."
)
def answer_question(self, query: str, intent: str, entities: Dict[str, Any]) -> str:
"""回答用户问题"""
# 根据意图和实体定制回答
context = f"Intent: {intent}\nEntities: {entities}\nQuery: {query}"
response = self.agent.invoke({
"messages": [{"role": "user", "content": context}]
})
return response
4. 工单管理系统
# systems/ticket_system.py
from dataclasses import dataclass
from typing import List, Dict, Any
from datetime import datetime
@dataclass
class Ticket:
"""工单对象"""
id: str
user_id: str
channel: str
intent: str
content: str
status: str # open, in_progress, resolved, closed
priority: str # low, medium, high, urgent
created_at: datetime
updated_at: datetime
assigned_to: str = None
metadata: Dict[str, Any] = None
class TicketSystem:
"""工单管理系统"""
def __init__(self):
self.tickets: Dict[str, Ticket] = {}
self.ticket_counter = 1
def create_ticket(self, user_id: str, channel: str, intent: str, content: str, metadata: Dict[str, Any] = None) -> Ticket:
"""创建工单"""
ticket_id = f"TKT-{self.ticket_counter:06d}"
self.ticket_counter += 1
ticket = Ticket(
id=ticket_id,
user_id=user_id,
channel=channel,
intent=intent,
content=content,
status="open",
priority=self._calculate_priority(intent),
created_at=datetime.now(),
updated_at=datetime.now(),
metadata=metadata or {}
)
self.tickets[ticket_id] = ticket
return ticket
def _calculate_priority(self, intent: str) -> str:
"""根据意图计算优先级"""
priority_map = {
"complaint": "high",
"technical_support": "medium",
"order_inquiry": "low",
"suggestion": "low"
}
return priority_map.get(intent, "medium")
def update_ticket(self, ticket_id: str, **kwargs) -> Ticket:
"""更新工单"""
ticket = self.tickets.get(ticket_id)
if not ticket:
raise ValueError(f"Ticket not found: {ticket_id}")
for key, value in kwargs.items():
if hasattr(ticket, key):
setattr(ticket, key, value)
ticket.updated_at = datetime.now()
return ticket
def assign_ticket(self, ticket_id: str, agent_id: str):
"""分配工单"""
self.update_ticket(ticket_id, assigned_to=agent_id, status="in_progress")
def resolve_ticket(self, ticket_id: str):
"""解决工单"""
self.update_ticket(ticket_id, status="resolved")
def close_ticket(self, ticket_id: str):
"""关闭工单"""
self.update_ticket(ticket_id, status="closed")
def get_tickets_by_user(self, user_id: str) -> List[Ticket]:
"""获取用户的所有工单"""
return [t for t in self.tickets.values() if t.user_id == user_id]
def get_open_tickets(self) -> List[Ticket]:
"""获取所有打开的工单"""
return [t for t in self.tickets.values() if t.status in ["open", "in_progress"]]
5. 完整系统组装
# main.py
import asyncio
from channels.message_router import MessageRouter, Message
from agents.intent_agent import IntentAgent
from agents.knowledge_agent import KnowledgeAgent
from systems.ticket_system import TicketSystem
class CustomerServiceSystem:
"""智能客服系统"""
def __init__(self):
self.message_router = MessageRouter()
self.intent_agent = IntentAgent()
self.knowledge_agent = KnowledgeAgent()
self.ticket_system = TicketSystem()
async def handle_message(self, message: Message):
"""处理用户消息"""
# 1. 路由和标准化消息
routed_message = self.message_router.route(message)
# 2. 意图识别和实体抽取
intent_result = self.intent_agent.process(routed_message["content"])
# 3. 生成回答
response = self.knowledge_agent.answer_question(
routed_message["content"],
intent_result.intent,
intent_result.entities
)
# 4. 如果是复杂问题,创建工单
if intent_result.intent in ["complaint", "technical_support"] and intent_result.confidence > 0.8:
ticket = self.ticket_system.create_ticket(
user_id=routed_message["user_id"],
channel=routed_message["channel"],
intent=intent_result.intent,
content=routed_message["content"],
metadata=intent_result.entities
)
response += f"\n\n已为您创建工单 {ticket.id},我们会尽快处理。"
return response
async def main():
"""主函数"""
system = CustomerServiceSystem()
# 模拟用户消息
messages = [
Message(id="1", channel="web", user_id="user_123", content="我的订单什么时候到货?"),
Message(id="2", channel="wechat", user_id="user_456", content="产品无法使用,需要帮助"),
Message(id="3", channel="email", user_id="user_789", content="建议增加新功能")
]
for msg in messages:
response = await system.handle_message(msg)
print(f"用户: {msg.content}")
print(f"助手: {response}")
print("-" * 50)
if __name__ == "__main__":
asyncio.run(main())
4.3.3 系统特性
1. 多渠道接入
- 统一消息接口
- 不同渠道消息格式标准化
- 渠道特定处理逻辑
2. 智能意图识别
- 自动分类用户意图
- 实体信息抽取
- 置信度评估
3. 知识检索与问答
- RAG 检索 FAQ 知识库
- 上下文感知回答生成
- 多轮对话支持
4. 工单管理
- 自动工单创建
- 优先级计算
- 工单状态跟踪
- 人工分配与处理
五、迁移指南:从 0.x 到 1.0
5.1 核心变更对照表
| 变更点 | 0.x 版本 | 1.0 版本 | 迁移建议 |
|---|---|---|---|
| Agent 创建 | create_react_agent() |
create_agent() |
函数名变更,参数调整 [4†] |
| 导入路径 | @langchain/langgraph/prebuilts |
langchain |
导入路径变更 |
| 系统提示 | prompt 参数 |
system_prompt 参数 |
参数名变更 |
| 模型绑定 | model.bind_tools() |
不支持预绑定 | 传递工具列表给 create_agent() [9†] |
| 状态管理 | Pydantic 模型 | TypedDict | 使用 TypedDict 定义状态 [9†] |
| 结构化输出 | response_format=Schema |
response_format=ToolStrategy(Schema) |
使用策略模式 [9†] |
| 中间件 | 不支持 | 全面支持 | 迁移到中间件机制 |
| 动态提示 | 不支持 | @dynamic_prompt 装饰器 |
使用中间件实现动态提示 |
5.2 迁移步骤与最佳实践
步骤 1:更新依赖包
pip install --upgrade langchain==1.0.0 \
langchain-openai==1.0.0 \
langchain-community==1.0.0
步骤 2:更新 Agent 创建代码
# 0.x 版本
from langchain.agents import create_react_agent
agent = create_react_agent(
model=model,
tools=tools,
prompt="You are a helpful assistant." # prompt 参数
)
# 1.0 版本
from langchain.agents import create_agent
agent = create_agent(
model=model,
tools=tools,
system_prompt="You are a helpful assistant." # system_prompt 参数
)
步骤 3:迁移中间件逻辑
# 0.x 版本:使用函数钩子
def before_model_hook(state):
# 自定义逻辑
pass
agent = create_react_agent(
model=model,
tools=tools,
prompt="You are a helpful assistant.",
callbacks=[before_model_hook] # 使用回调
)
# 1.0 版本:使用中间件
from langchain.agents.middleware import AgentMiddleware
class CustomMiddleware(AgentMiddleware):
def before_model(self, state, runtime):
# 自定义逻辑
return None
agent = create_agent(
model=model,
tools=tools,
system_prompt="You are a helpful assistant.",
middleware=[CustomMiddleware()] # 使用中间件
)
步骤 4:更新状态管理
# 0.x 版本:使用 Pydantic 模型
from pydantic import BaseModel
class CustomState(BaseModel):
messages: list
user_data: dict
# 1.0 版本:使用 TypedDict
from typing import TypedDict
class CustomState(TypedDict):
messages: list
user_data: dict
步骤 5:处理工具调用变更
# 0.x 版本:预绑定工具
model = ChatOpenAI(model="gpt-4o")
model = model.bind_tools([search_tool, get_weather])
agent = create_agent(model=model, tools=[])
# 1.0 版本:传递工具列表
agent = create_agent(
model="gpt-4o", # 传递模型标识符或实例
tools=[search_tool, get_weather] # 传递工具列表
)
5.3 兼容性注意事项
- Python 版本要求: LangChain 1.0 要求 Python 3.10 或更高版本 [13†]
- 删除的 API: 某些 0.x 版本的 API 已被移除,如
prompted output[9†] - 默认行为变更: 如
create_agent不再接受预绑定工具,需要传递工具列表 [9†] - 中间件优先级: 中间件执行顺序影响行为,需要仔细测试
六、生产级部署与运维
6.1 系统部署架构
┌─────────────────────────────────────────────┐
│ 负载均衡器(Nginx) │
├─────────────────────────────────────────────┤
│ API 网关(Kong/APISIX) │
├─────────────────────────────────────────────┤
│ LangChain Agent 服务(多实例) │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Agent 1 │ │Agent 2 │ │Agent N │ │
│ └────────┘ └────────┘ └────────┘ │
├─────────────────────────────────────────────┤
│ 消息队列(Redis/RabbitMQ) │
├─────────────────────────────────────────────┤
│ 数据库(PostgreSQL/MongoDB) │
└─────────────────────────────────────────────┘
6.2 容器化部署
Dockerfile
FROM python:3.10-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "server.main:app", "--host", "0.0.0.0", "--port", "8000"]
docker-compose.yml
version: '3.8'
services:
agent-api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- DATABASE_URL=${DATABASE_URL}
depends_on:
- redis
- postgres
restart: unless-stopped
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=langchain_db
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
redis_data:
postgres_data:
6.3 监控与日志
1. 日志配置
# logging_config.py
import logging
import sys
from pythonjsonlogger import jsonlogger
def setup_logging():
"""配置结构化日志"""
logHandler = logging.StreamHandler(sys.stdout)
logHandler.setFormatter(jsonlogger.JsonFormatter())
logger = logging.getLogger()
logger.addHandler(logHandler)
logger.setLevel(logging.INFO)
return logger
# 在应用中使用
from logging_config import setup_logging
logger = setup_logging()
2. 性能监控
# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time
# 定义指标
REQUEST_COUNT = Counter('agent_requests_total', 'Total number of requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('agent_request_duration_seconds', 'Request duration')
ACTIVE_AGENTS = Gauge('agent_active_instances', 'Number of active agent instances')
def track_request(func):
"""请求跟踪装饰器"""
def wrapper(*args, **kwargs):
start_time = time.time()
try:
return func(*args, **kwargs)
finally:
duration = time.time() - start_time
REQUEST_DURATION.observe(duration)
return wrapper
# 使用示例
@track_request
def handle_request(request):
# 处理请求
pass
3. 健康检查
# healthcheck.py
from fastapi import FastAPI
from typing import Dict, Any
app = FastAPI()
@app.get("/health")
async def health_check() -> Dict[str, Any]:
"""健康检查端点"""
return {
"status": "healthy",
"timestamp": str(datetime.now()),
"version": "1.0.0"
}
@app.get("/ready")
async def readiness_check() -> Dict[str, Any]:
"""就绪检查"""
# 检查依赖服务连接
db_connected = check_database_connection()
redis_connected = check_redis_connection()
if db_connected and redis_connected:
return {"status": "ready"}
else:
raise HTTPException(status_code=503, detail="Service not ready")
6.4 扩展性与性能优化
1. 水平扩展
- 无状态设计: Agent 服务设计为无状态,便于水平扩展
- 负载均衡: 使用 Nginx 或云服务商的负载均衡器分发请求
- 自动伸缩: 根据 CPU 使用率、请求队列长度等指标自动调整实例数量
2. 缓存策略
# caching/redis_cache.py
import redis
import json
from typing import Any, Optional
class RedisCache:
"""Redis 缓存客户端"""
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
def get(self, key: str) -> Optional[Any]:
"""获取缓存"""
value = self.redis.get(key)
if value:
return json.loads(value)
return None
def set(self, key: str, value: Any, ttl: int = 3600):
"""设置缓存"""
self.redis.setex(key, ttl, json.dumps(value))
def delete(self, key: str):
"""删除缓存"""
self.redis.delete(key)
# 使用示例
cache = RedisCache("redis://localhost:6379")
def cached_agent_invoke(agent_id: str, query: str):
"""带缓存的 Agent 调用"""
cache_key = f"agent:{agent_id}:query:{hash(query)}"
cached_result = cache.get(cache_key)
if cached_result:
return cached_result
result = agent.invoke(query)
cache.set(cache_key, result, ttl=1800) # 缓存30分钟
return result
3. 异步处理
# async_processing.py
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncAgentProcessor:
"""异步 Agent 处理器"""
def __init__(self, agent, max_workers: int = 10):
self.agent = agent
self.executor = ThreadPoolExecutor(max_workers=max_workers)
async def process_batch(self, queries: list) -> list:
"""批量处理查询"""
loop = asyncio.get_event_loop()
# 提交到线程池执行
tasks = [
loop.run_in_executor(self.executor, self.agent.invoke, q)
for q in queries
]
results = await asyncio.gather(*tasks)
return results
# 使用示例
processor = AsyncAgentProcessor(agent)
results = await processor.process_batch([
"What's the weather in SF?",
"Search for AI news",
"Analyze the data"
])
七、总结与展望
7.1 核心要点回顾
-
版本演进: LangChain 1.0 从 Chain 设计转向 Agent 优先,引入中间件机制和统一抽象,为生产级应用奠定基础 [1†]
-
架构革新: 静态/动态模型选择、工具调用优化、结构化输出策略等核心特性,显著提升了系统的灵活性和可控性 [0†]
-
中间件机制: 提供 5 大钩子函数,支持日志、监控、权限控制、错误处理等横切关注点的统一管理 [2†]
-
生产级特性: 状态管理、流式输出、错误处理、性能优化等企业级功能,满足生产环境的复杂需求
7.2 实践建议
-
渐进式迁移: 从小范围试点开始,逐步迁移核心业务到 1.0 版本,避免大规模重构风险
-
中间件优先: 充分利用中间件机制实现日志、监控、权限等通用功能,避免代码重复
-
安全第一: 严格验证工具调用、SQL 查询等外部输入,防止注入攻击和数据泄露
-
性能监控: 建立完善的监控体系,实时跟踪系统性能、错误率和用户体验
-
测试覆盖: 编写单元测试、集成测试和端到端测试,确保系统稳定性和功能正确性
7.3 未来展望
LangChain 1.0 的发布标志着 AI Agent 开发进入新阶段。未来发展方向包括:
-
更智能的决策: 结合强化学习、多模态融合等技术,提升 Agent 的智能水平和决策能力
-
更强的协作: 多 Agent 协作、Agent 与人类协同工作,形成更复杂的智能系统
-
更广的连接: 深度集成企业系统(ERP、CRM、OA 等),实现业务流程全自动化
-
更好的体验: 自然语言交互、多模态交互、个性化定制,提升用户体验
通过本指南的学习和实践,相信您能够构建出稳定、高效、可扩展的生产级 AI Agent 系统,为业务创造价值。
完整示例代码仓库: 本指南所有案例代码已整理在 GitHub 仓库中,供参考和学习使用。
引用来源: 本指南基于 LangChain 官方文档、技术博客和开源项目案例编写,确保内容的准确性和时效性。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)