LangChain 1.1 版本“中间件“特性解读与实战运用
LangChain 1.1版本引入了强大的中间件(Middleware)特性,为Agent开发提供了灵活的生命周期管理。笔者对该特性进行了详细解读,并在实战案例中进行讲解
LangChain v1.1.0 于 2025 年 11 月 25 日发布,中间件(Middleware)作为
create_agent的核心特性,为 Agent 开发带来了前所未有的灵活性和可扩展性。
本文将深入解读中间件机制,并通过实战示例帮助你快速上手。
一、什么是中间件?
中间件(Middleware) 是 LangChain v1 引入的核心抽象,它允许开发者在 Agent 执行流程的各个阶段插入自定义逻辑,实现上下文工程(Context Engineering)——在正确的时间将正确的信息传递给模型。
中间件的核心价值:
- 动态提示词管理:根据上下文动态调整系统提示
- 对话历史压缩:自动摘要过长的对话历史
- 工具访问控制:根据用户权限选择性暴露工具
- 状态管理:跨执行周期维护自定义状态
- 安全护栏:输入输出验证、PII 脱敏、内容审核
二、中间件钩子详解
LangChain 中间件提供 6 个核心钩子,覆盖 Agent 执行的完整生命周期:
| 钩子 | 执行时机 | 典型用例 |
|---|---|---|
before_agent |
Agent 调用前 | 加载记忆、验证输入 |
before_model |
每次 LLM 调用前 | 更新提示词、裁剪消息 |
wrap_model_call |
包裹 LLM 调用 | 拦截/修改请求和响应 |
wrap_tool_call |
包裹工具调用 | 拦截/修改工具执行 |
after_model |
每次 LLM 响应后 | 验证输出、应用护栏 |
after_agent |
Agent 完成后 | 保存结果、清理资源 |
三、内置中间件一览
3.1 SummarizationMiddleware - 对话历史摘要
当对话历史接近 Token 上限时,自动使用 LLM 压缩旧消息,保留近期上下文。
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
agent = create_agent(
model="gpt-4o",
tools=[weather_tool, calculator_tool],
middleware=[
SummarizationMiddleware(
model="gpt-4o-mini", # 用于生成摘要的模型
trigger={"tokens": 4000}, # 触发摘要的 Token 阈值
keep={"messages": 20}, # 保留最近 20 条消息
),
],
)
v1.1 增强:支持基于模型 Profile 的灵活触发点配置,实现上下文感知的摘要策略。
3.2 PIIMiddleware - PII 敏感信息处理
检测并处理个人身份信息(PII),支持脱敏(redact)、掩码(mask)、阻断(block)等策略。
from langchain.agents.middleware import PIIMiddleware
agent = create_agent(
model="claude-sonnet-4-5-20250929",
tools=[read_email, send_email],
middleware=[
# 脱敏邮箱地址
PIIMiddleware("email", strategy="redact", apply_to_input=True),
# 使用正则检测并阻断电话号码
PIIMiddleware(
"phone_number",
detector=r"(?:\+?\d{1,3}[\s.-]?)?(?:\(?\d{2,4}\)?[\s.-]?)?\d{3,4}[\s.-]?\d{4}",
strategy="block"
),
]
)
3.3 HumanInTheLoopMiddleware - 人机协作审批
对敏感工具调用暂停执行,等待人工审批。
from langchain.agents.middleware import HumanInTheLoopMiddleware
agent = create_agent(
model="claude-sonnet-4-5-20250929",
tools=[read_email, send_email],
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={
"send_email": {
"description": "请审核此邮件后再发送",
"allowed_decisions": ["approve", "edit", "reject"]
}
}
)
]
)
3.4 TodoListMiddleware - 任务规划
为 Agent 提供任务规划和跟踪能力,自动注入 write_todos 工具和相关系统提示。
from langchain.agents.middleware import TodoListMiddleware
agent = create_agent(
model="gpt-4o",
tools=[read_file, write_file, run_tests],
middleware=[TodoListMiddleware()],
)
适用场景:
- 需要跨多个工具协调的复杂多步骤任务
- 需要进度可见性的长时间运行操作
3.5 ModelRetryMiddleware - 模型调用重试(v1.1 新增)
自动重试失败的模型调用,支持可配置的指数退避策略,提升 Agent 可靠性。
from langchain.agents.middleware import ModelRetryMiddleware
agent = create_agent(
model="gpt-4o",
tools=[...],
middleware=[
ModelRetryMiddleware(
max_retries=3,
backoff_factor=2.0, # 指数退避因子
),
],
)
3.6 OpenAI Content Moderation - 内容审核(v1.1 新增)
使用 OpenAI 的审核端点检测和处理不安全内容,支持检查用户输入、模型输出和工具结果。
from langchain.agents.middleware import OpenAIModerationMiddleware
agent = create_agent(
model="openai:gpt-4o",
tools=[search_tool, database_tool],
middleware=[
OpenAIModerationMiddleware(
model="openai:gpt-4o",
moderation_model="omni-moderation-latest",
check_input=True, # 检查用户输入
check_output=True, # 检查模型输出
exit_behavior="end",
),
],
)
四、自定义中间件实战
4.1 基础结构
自定义中间件需要继承 AgentMiddleware 类并实现所需的钩子方法:
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware, AgentState
from typing import Any
class MyCustomMiddleware(AgentMiddleware):
def before_model(self, state: AgentState, runtime) -> dict[str, Any] | None:
"""在每次模型调用前执行"""
print(f"即将调用模型,当前消息数: {len(state['messages'])}")
return None # 返回 None 表示不修改状态
def after_model(self, state: AgentState, runtime) -> dict[str, Any] | None:
"""在每次模型响应后执行"""
print("模型调用完成")
return None
4.2 实战案例:调用计数器中间件
实现一个跟踪模型调用次数并在超限时终止的中间件:
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.agents.middleware import AgentState, AgentMiddleware
from typing_extensions import NotRequired
from typing import Any
# 扩展状态 Schema
class CustomState(AgentState):
model_call_count: NotRequired[int]
user_id: NotRequired[str]
class CallCounterMiddleware(AgentMiddleware[CustomState]):
"""调用计数器中间件:限制模型调用次数"""
state_schema = CustomState
def __init__(self, max_calls: int = 10):
self.max_calls = max_calls
def before_model(self, state: CustomState, runtime) -> dict[str, Any] | None:
count = state.get("model_call_count", 0)
if count >= self.max_calls:
print(f"已达到最大调用次数 {self.max_calls},终止执行")
return {"jump_to": "end"} # 跳转到结束节点
return None
def after_model(self, state: CustomState, runtime) -> dict[str, Any] | None:
current_count = state.get("model_call_count", 0)
return {"model_call_count": current_count + 1}
# 使用中间件
agent = create_agent(
model="gpt-4o",
middleware=[CallCounterMiddleware(max_calls=5)],
tools=[],
)
# 调用时传入自定义状态
result = agent.invoke({
"messages": [HumanMessage("你好,请介绍一下自己")],
"model_call_count": 0,
"user_id": "user-123",
})
4.3 实战案例:基于用户等级的动态模型选择
根据用户专业等级动态切换模型和工具:
from dataclasses import dataclass
from typing import Callable
from langchain_openai import ChatOpenAI
from langchain.agents.middleware import AgentMiddleware, ModelRequest
from langchain.agents.middleware.types import ModelResponse
@dataclass
class UserContext:
user_expertise: str = "beginner" # beginner | expert
class ExpertiseBasedMiddleware(AgentMiddleware):
"""根据用户等级动态选择模型和工具"""
def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse]
) -> ModelResponse:
user_level = request.runtime.context.user_expertise
if user_level == "expert":
# 专家用户:更强大的模型 + 高级工具
model = ChatOpenAI(model="gpt-4o")
tools = [advanced_search, data_analysis, code_execution]
else:
# 初学者:轻量模型 + 基础工具
model = ChatOpenAI(model="gpt-4o-mini")
tools = [simple_search, basic_calculator]
return handler(request.override(model=model, tools=tools))
agent = create_agent(
model="gpt-4o",
tools=[simple_search, advanced_search, basic_calculator, data_analysis, code_execution],
middleware=[ExpertiseBasedMiddleware()],
context_schema=UserContext
)
五、中间件组合与执行顺序
多个中间件按照列表顺序依次执行,形成洋葱模型:
agent = create_agent(
model="gpt-4o",
tools=[...],
middleware=[
PIIMiddleware("email", strategy="redact"), # 第 1 层
SummarizationMiddleware(model="gpt-4o-mini"), # 第 2 层
HumanInTheLoopMiddleware(interrupt_on={...}), # 第 3 层
CallCounterMiddleware(max_calls=10), # 第 4 层
],
)
执行流程:
- 进入阶段:按顺序执行
before_*钩子(1→2→3→4) - 核心执行:模型调用 / 工具调用
- 退出阶段:按逆序执行
after_*钩子(4→3→2→1)
六、最佳实践
- 从简单开始:先使用静态提示词和工具,仅在必要时添加动态特性
- 增量测试:每次只添加一个中间件,验证其行为
- 监控性能:跟踪模型调用次数、Token 使用量和延迟
- 善用内置中间件:优先使用
SummarizationMiddleware、PIIMiddleware等成熟方案 - 区分瞬态与持久化:
- 瞬态(Transient):
before_model中的消息裁剪仅影响当前调用 - 持久化(Persistent):
SummarizationMiddleware会永久更新状态
- 瞬态(Transient):
七、完整项目案例
本项目取自B站 九天Hector ℓ DeepSeek-V3.2+LangChain 1.1智能体开发实战
项目开源地址(笔者开源版,非原版):https://gitee.com/ye_sheng0839/agentic-rag
核心代码解释
from typing import List, Optional, Tuple
from langchain.tools import tool
from langchain.agents import create_agent
from langchain_core.messages import AIMessage, ToolMessage
from langchain_core.documents import Document
from app.core.config import get_llm
from app.services.file_service import FileService
from app.schemas.api_schemas import DocSource
class AgentService:
@staticmethod
def chat_with_agent(query: str, kb_name: Optional[str], top_k: int) -> Tuple[str, List[DocSource]]:
"""
Agentic RAG 主流程:
1. 动态加载 Metadata
2. 动态生成 System Prompt
3. 动态绑定 VectorStore Tool
"""
llm = get_llm()
tools = []
# 默认 System Prompt
system_context = "你是一名乐于助人的AI助手,请直接回答用户的问题。用户可以上传文档,你会基于用户上传的文档知识进行回答。"
# === 中间件逻辑:如果有知识库,则注入上下文 ===
if kb_name:
vector_store = FileService.load_vector_store(kb_name)
if vector_store:
# 1. 读取元数据,构建动态 Prompt
metadata = FileService.load_kb_metadata(kb_name)
topics = metadata.get("topics", [])
topics_str = "、".join(topics) if topics else "通用文档"
system_context = (
f"你是一名基于知识库【{kb_name}】的智能助手。\n"
f"该知识库主要包含以下主题内容:**{topics_str}**。\n"
"当用户的问题涉及到上述内容或细节时,请务必调用 retrieve_context 工具检索信息来回答。\n"
"如果问题与知识库无关(例如闲聊),请用你的通用知识回答,并简要告知用户该问题超出了当前知识库范围。"
)
# 2. 定义绑定了当前 vector_store 的工具
@tool(response_format="content_and_artifact")
def retrieve_context(search_query: str):
"""Retrieve information to help answer a query."""
# 使用 with_score 是为了给前端提供置信度,虽然 LLM 主要看 content
docs_and_scores = vector_store.similarity_search_with_score(search_query, k=top_k)
# 序列化给 LLM 看 (仅文本)
serialized = "\n\n".join(
(f"Source: {doc.metadata}\nContent: {doc.page_content}")
for doc, score in docs_and_scores
)
# 构造 Artifact (包含分数,给前端用)
artifacts = []
for doc, score in docs_and_scores:
# 兼容处理:确保 artifact 里存的是易于解析的对象或原始 Document
# 这里我们存原始 Document 对象,稍后在外部解析
# 为了携带 score,我们动态给 doc 加个属性,或者封装一下
doc.metadata["score"] = float(score) # 将分数注入 metadata 方便携带
artifacts.append(doc)
return serialized, artifacts
tools = [retrieve_context]
# === 创建 Agent ===
# 使用 create_agent (LangChain 1.1 标准)
agent = create_agent(llm, tools, system_prompt=system_context)
# === 执行 Agent ===
messages = [{"role": "user", "content": query}]
response = agent.invoke({"messages": messages})
# === 解析结果 ===
# 从 response['messages'] 中提取最终回答和 Artifact
final_answer = ""
sources = []
if "messages" in response:
msg_list = response["messages"]
# 1. 获取最后一条 AI 回复
last_msg = msg_list[-1]
if isinstance(last_msg, AIMessage):
final_answer = last_msg.content
# 2. 遍历获取 ToolMessage 中的 Artifact
for msg in msg_list:
if isinstance(msg, ToolMessage) and msg.artifact:
for doc in msg.artifact:
if isinstance(doc, Document):
# 从 metadata 中取出我们刚才塞进去的 score
score = doc.metadata.get("score", 0.0)
sources.append(DocSource(
content=doc.page_content,
metadata=doc.metadata,
score=score
))
return final_answer, sources
@staticmethod
def recall_test(kb_name: str, query: str, top_k: int) -> List[DocSource]:
"""
召回测试 (不走 Agent,直接查向量库)
"""
vector_store = FileService.load_vector_store(kb_name)
if not vector_store:
raise ValueError(f"Knowledge base '{kb_name}' not found.")
docs_and_scores = vector_store.similarity_search_with_score(query, k=top_k)
results = []
for doc, score in docs_and_scores:
results.append(DocSource(
content=doc.page_content,
metadata=doc.metadata,
score=float(score)
))
return results
模块一:动态上下文注入 (Dynamic Context Injection)
核心概念:拒绝千篇一律的 System Prompt,根据知识库内容实时调整 AI 人设。
在 chat_with_agent 函数的开头,我们并没有使用写死的 Prompt,而是引入了一个"中间件"逻辑:
# === 中间件逻辑 ===
if kb_name:
# 1. 实时读取磁盘上的元数据 (metadata.json)
metadata = FileService.load_kb_metadata(kb_name)
topics = metadata.get("topics", [])
# 2. 动态组装 System Prompt
system_context = (
f"你是一名基于知识库【{kb_name}】的智能助手。\n"
f"该知识库主要包含以下主题内容:**{topics_str}**。\n"
"..."
)
- 技术解读:
- 按需加载:只有当用户指定了
kb_name时,系统才会去加载对应的向量库和元数据。 - Prompt 模板化:利用 Python 的 f-string,将提取到的
topics(如"薪资管理"、“API接口”)嵌入到系统提示词中。 - 价值:这让 Agent 具备了元认知能力。它知道自己"懂什么",从而在回答问题时更自信,或者在遇到无关问题时能准确拒绝。
- 按需加载:只有当用户指定了
模块二:运行时工具绑定 (Runtime Tool Binding)
核心概念:利用 Python 的闭包 (Closure) 特性,为每一次对话创建专属的检索工具。
请注意 retrieve_context 函数定义的位置——它是在 chat_with_agent 函数内部定义的,而不是全局定义的。
# 在函数内部定义 Tool
@tool(response_format="content_and_artifact")
def retrieve_context(search_query: str):
# 这里直接使用了外部作用域的 vector_store 变量
docs_and_scores = vector_store.similarity_search_with_score(search_query, k=top_k)
# ...
- 技术解读:
- 闭包机制:这个 Tool 捕获了当前请求上下文中的
vector_store和top_k参数。 - 隔离性:用户 A 的请求会生成一个绑定了 A 知识库的 Tool;用户 B 的请求会生成另一个。两者互不干扰,即使并发执行也不会串库。
- 动态挂载:
tools = [retrieve_context]这一行是在运行时决定的。如果没有知识库,tools就是空的,Agent 自动退化为普通聊天模式。
- 闭包机制:这个 Tool 捕获了当前请求上下文中的
模块三:双轨制数据流 (Content & Artifact)
核心概念:LangChain 1.1 的杀手级特性,解决"AI 看的内容"与"前端展示的内容"需求不一致的问题。
我们在 @tool 装饰器中指定了 response_format="content_and_artifact",这是实现引用透明化的关键。
# 1. 给 LLM 看的 (Serialized):纯文本,省 Token,易理解
serialized = "\n\n".join(f"Source: {doc.metadata}\nContent: {doc.page_content}" ...)
# 2. 给前端看的 (Artifact):原始对象,带分数,结构化
artifacts = []
for doc, score in docs_and_scores:
doc.metadata["score"] = float(score) # 注入置信度分数
artifacts.append(doc)
return serialized, artifacts
- 技术解读:
- Content (serialized):这是喂给大模型的上下文。我们去掉了不必要的干扰信息,只保留文本和必要的元数据,帮助模型生成答案。
- Artifact (artifacts):这是"副作用"数据。大模型看不到这个列表,但 LangChain 会把它保留在
ToolMessage中。我们利用它将 相关性评分 (Score) 和 原始文档对象 透传给前端,用于渲染"引用来源"卡片。
模块四:标准化执行与解析 (Standard Execution & Parsing)
核心概念:遵循 LangChain 标准协议,精准提取多模态输出。
最后是 Agent 的执行和结果提取环节,这里体现了后端开发的严谨性。
# 1. 标准化创建 Agent
agent = create_agent(llm, tools, system_prompt=system_context)
# 2. 执行并获取完整消息历史
response = agent.invoke({"messages": messages})
# 3. 解析逻辑
if "messages" in response:
msg_list = response["messages"]
# 提取回答:最后一条消息通常是 AI 的回答
last_msg = msg_list[-1]
# 提取引用:遍历寻找 ToolMessage 中的 artifact 字段
for msg in msg_list:
if isinstance(msg, ToolMessage) and msg.artifact:
# ... 转换为前端所需的 DocSource 格式 ...
- 技术解读:
create_agent:这是 LangChain 0.2/0.3 (1.1+ API) 推荐的工厂函数,它屏蔽了底层的 Prompt 拼装细节(如AgentScratchPad)。- 消息回溯:由于 Agent 可能进行多轮思考(虽然 RAG 通常是一轮),我们需要遍历
messages列表来找回 Tool 的执行结果(即artifact)。这是获取检索来源最准确的方式,比正则匹配文本要可靠得多。
八、总结
LangChain 1.1 的中间件系统为 Agent 开发提供了强大的扩展能力:
| 特性 | 说明 |
|---|---|
| 可组合性 | 多个中间件可自由组合,各司其职 |
| 生命周期钩子 | 6 个钩子覆盖完整执行流程 |
| 状态扩展 | 自定义状态 Schema 支持跨调用数据传递 |
| 生产就绪 | 内置中间件经过充分测试,可直接用于生产环境 |
无论是实现对话历史压缩、敏感信息脱敏、人机协作审批,还是构建复杂的动态上下文工程策略,中间件都能帮助你以优雅、可维护的方式实现目标。
参考资料
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)