LangChain v1.1.0 于 2025 年 11 月 25 日发布,中间件(Middleware)作为 create_agent 的核心特性,为 Agent 开发带来了前所未有的灵活性和可扩展性。

本文将深入解读中间件机制,并通过实战示例帮助你快速上手。


一、什么是中间件?

中间件(Middleware) 是 LangChain v1 引入的核心抽象,它允许开发者在 Agent 执行流程的各个阶段插入自定义逻辑,实现上下文工程(Context Engineering)——在正确的时间将正确的信息传递给模型。

中间件的核心价值:

  • 动态提示词管理:根据上下文动态调整系统提示
  • 对话历史压缩:自动摘要过长的对话历史
  • 工具访问控制:根据用户权限选择性暴露工具
  • 状态管理:跨执行周期维护自定义状态
  • 安全护栏:输入输出验证、PII 脱敏、内容审核

Agent Flow

Yes

No

Input

before_agent

before_model

wrap_model_call

Model Call

after_model

Tool Call?

wrap_tool_call

Tool Exec

after_agent

Output


二、中间件钩子详解

LangChain 中间件提供 6 个核心钩子,覆盖 Agent 执行的完整生命周期:

钩子 执行时机 典型用例
before_agent Agent 调用前 加载记忆、验证输入
before_model 每次 LLM 调用前 更新提示词、裁剪消息
wrap_model_call 包裹 LLM 调用 拦截/修改请求和响应
wrap_tool_call 包裹工具调用 拦截/修改工具执行
after_model 每次 LLM 响应后 验证输出、应用护栏
after_agent Agent 完成后 保存结果、清理资源

三、内置中间件一览

3.1 SummarizationMiddleware - 对话历史摘要

当对话历史接近 Token 上限时,自动使用 LLM 压缩旧消息,保留近期上下文。

from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware

agent = create_agent(
    model="gpt-4o",
    tools=[weather_tool, calculator_tool],
    middleware=[
        SummarizationMiddleware(
            model="gpt-4o-mini",        # 用于生成摘要的模型
            trigger={"tokens": 4000},   # 触发摘要的 Token 阈值
            keep={"messages": 20},      # 保留最近 20 条消息
        ),
    ],
)

v1.1 增强:支持基于模型 Profile 的灵活触发点配置,实现上下文感知的摘要策略。


3.2 PIIMiddleware - PII 敏感信息处理

检测并处理个人身份信息(PII),支持脱敏(redact)、掩码(mask)、阻断(block)等策略。

from langchain.agents.middleware import PIIMiddleware

agent = create_agent(
    model="claude-sonnet-4-5-20250929",
    tools=[read_email, send_email],
    middleware=[
        # 脱敏邮箱地址
        PIIMiddleware("email", strategy="redact", apply_to_input=True),
        # 使用正则检测并阻断电话号码
        PIIMiddleware(
            "phone_number",
            detector=r"(?:\+?\d{1,3}[\s.-]?)?(?:\(?\d{2,4}\)?[\s.-]?)?\d{3,4}[\s.-]?\d{4}",
            strategy="block"
        ),
    ]
)

3.3 HumanInTheLoopMiddleware - 人机协作审批

对敏感工具调用暂停执行,等待人工审批。

from langchain.agents.middleware import HumanInTheLoopMiddleware

agent = create_agent(
    model="claude-sonnet-4-5-20250929",
    tools=[read_email, send_email],
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                "send_email": {
                    "description": "请审核此邮件后再发送",
                    "allowed_decisions": ["approve", "edit", "reject"]
                }
            }
        )
    ]
)

3.4 TodoListMiddleware - 任务规划

为 Agent 提供任务规划和跟踪能力,自动注入 write_todos 工具和相关系统提示。

from langchain.agents.middleware import TodoListMiddleware

agent = create_agent(
    model="gpt-4o",
    tools=[read_file, write_file, run_tests],
    middleware=[TodoListMiddleware()],
)

适用场景:

  • 需要跨多个工具协调的复杂多步骤任务
  • 需要进度可见性的长时间运行操作

3.5 ModelRetryMiddleware - 模型调用重试(v1.1 新增)

自动重试失败的模型调用,支持可配置的指数退避策略,提升 Agent 可靠性。

from langchain.agents.middleware import ModelRetryMiddleware

agent = create_agent(
    model="gpt-4o",
    tools=[...],
    middleware=[
        ModelRetryMiddleware(
            max_retries=3,
            backoff_factor=2.0,  # 指数退避因子
        ),
    ],
)

3.6 OpenAI Content Moderation - 内容审核(v1.1 新增)

使用 OpenAI 的审核端点检测和处理不安全内容,支持检查用户输入、模型输出和工具结果。

from langchain.agents.middleware import OpenAIModerationMiddleware

agent = create_agent(
    model="openai:gpt-4o",
    tools=[search_tool, database_tool],
    middleware=[
        OpenAIModerationMiddleware(
            model="openai:gpt-4o",
            moderation_model="omni-moderation-latest",
            check_input=True,   # 检查用户输入
            check_output=True,  # 检查模型输出
            exit_behavior="end",
        ),
    ],
)

四、自定义中间件实战

4.1 基础结构

自定义中间件需要继承 AgentMiddleware 类并实现所需的钩子方法:

from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware, AgentState
from typing import Any

class MyCustomMiddleware(AgentMiddleware):
    
    def before_model(self, state: AgentState, runtime) -> dict[str, Any] | None:
        """在每次模型调用前执行"""
        print(f"即将调用模型,当前消息数: {len(state['messages'])}")
        return None  # 返回 None 表示不修改状态
    
    def after_model(self, state: AgentState, runtime) -> dict[str, Any] | None:
        """在每次模型响应后执行"""
        print("模型调用完成")
        return None

4.2 实战案例:调用计数器中间件

实现一个跟踪模型调用次数并在超限时终止的中间件:

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.agents.middleware import AgentState, AgentMiddleware
from typing_extensions import NotRequired
from typing import Any


# 扩展状态 Schema
class CustomState(AgentState):
    model_call_count: NotRequired[int]
    user_id: NotRequired[str]


class CallCounterMiddleware(AgentMiddleware[CustomState]):
    """调用计数器中间件:限制模型调用次数"""
    
    state_schema = CustomState
    
    def __init__(self, max_calls: int = 10):
        self.max_calls = max_calls

    def before_model(self, state: CustomState, runtime) -> dict[str, Any] | None:
        count = state.get("model_call_count", 0)
        if count >= self.max_calls:
            print(f"已达到最大调用次数 {self.max_calls},终止执行")
            return {"jump_to": "end"}  # 跳转到结束节点
        return None

    def after_model(self, state: CustomState, runtime) -> dict[str, Any] | None:
        current_count = state.get("model_call_count", 0)
        return {"model_call_count": current_count + 1}


# 使用中间件
agent = create_agent(
    model="gpt-4o",
    middleware=[CallCounterMiddleware(max_calls=5)],
    tools=[],
)

# 调用时传入自定义状态
result = agent.invoke({
    "messages": [HumanMessage("你好,请介绍一下自己")],
    "model_call_count": 0,
    "user_id": "user-123",
})

4.3 实战案例:基于用户等级的动态模型选择

根据用户专业等级动态切换模型和工具:

from dataclasses import dataclass
from typing import Callable
from langchain_openai import ChatOpenAI
from langchain.agents.middleware import AgentMiddleware, ModelRequest
from langchain.agents.middleware.types import ModelResponse


@dataclass
class UserContext:
    user_expertise: str = "beginner"  # beginner | expert


class ExpertiseBasedMiddleware(AgentMiddleware):
    """根据用户等级动态选择模型和工具"""
    
    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse]
    ) -> ModelResponse:
        user_level = request.runtime.context.user_expertise

        if user_level == "expert":
            # 专家用户:更强大的模型 + 高级工具
            model = ChatOpenAI(model="gpt-4o")
            tools = [advanced_search, data_analysis, code_execution]
        else:
            # 初学者:轻量模型 + 基础工具
            model = ChatOpenAI(model="gpt-4o-mini")
            tools = [simple_search, basic_calculator]

        return handler(request.override(model=model, tools=tools))


agent = create_agent(
    model="gpt-4o",
    tools=[simple_search, advanced_search, basic_calculator, data_analysis, code_execution],
    middleware=[ExpertiseBasedMiddleware()],
    context_schema=UserContext
)

五、中间件组合与执行顺序

多个中间件按照列表顺序依次执行,形成洋葱模型

agent = create_agent(
    model="gpt-4o",
    tools=[...],
    middleware=[
        PIIMiddleware("email", strategy="redact"),      # 第 1 层
        SummarizationMiddleware(model="gpt-4o-mini"),   # 第 2 层
        HumanInTheLoopMiddleware(interrupt_on={...}),   # 第 3 层
        CallCounterMiddleware(max_calls=10),            # 第 4 层
    ],
)

执行流程:

  1. 进入阶段:按顺序执行 before_* 钩子(1→2→3→4)
  2. 核心执行:模型调用 / 工具调用
  3. 退出阶段:按逆序执行 after_* 钩子(4→3→2→1)

六、最佳实践

  1. 从简单开始:先使用静态提示词和工具,仅在必要时添加动态特性
  2. 增量测试:每次只添加一个中间件,验证其行为
  3. 监控性能:跟踪模型调用次数、Token 使用量和延迟
  4. 善用内置中间件:优先使用 SummarizationMiddlewarePIIMiddleware 等成熟方案
  5. 区分瞬态与持久化
    • 瞬态(Transient)before_model 中的消息裁剪仅影响当前调用
    • 持久化(Persistent)SummarizationMiddleware 会永久更新状态

七、完整项目案例

本项目取自B站 九天Hector ℓ DeepSeek-V3.2+LangChain 1.1智能体开发实战
项目开源地址(笔者开源版,非原版):https://gitee.com/ye_sheng0839/agentic-rag

核心代码解释

from typing import List, Optional, Tuple
from langchain.tools import tool
from langchain.agents import create_agent
from langchain_core.messages import AIMessage, ToolMessage
from langchain_core.documents import Document

from app.core.config import get_llm
from app.services.file_service import FileService
from app.schemas.api_schemas import DocSource

class AgentService:
    
    @staticmethod
    def chat_with_agent(query: str, kb_name: Optional[str], top_k: int) -> Tuple[str, List[DocSource]]:
        """
        Agentic RAG 主流程:
        1. 动态加载 Metadata 
        2. 动态生成 System Prompt
        3. 动态绑定 VectorStore Tool
        """
        llm = get_llm()
        tools = []
        
        # 默认 System Prompt
        system_context = "你是一名乐于助人的AI助手,请直接回答用户的问题。用户可以上传文档,你会基于用户上传的文档知识进行回答。"

        # === 中间件逻辑:如果有知识库,则注入上下文 ===
        if kb_name:
            vector_store = FileService.load_vector_store(kb_name)
            if vector_store:
                
                # 1. 读取元数据,构建动态 Prompt
                metadata = FileService.load_kb_metadata(kb_name)
                topics = metadata.get("topics", [])
                topics_str = "、".join(topics) if topics else "通用文档"
                
                system_context = (
                    f"你是一名基于知识库【{kb_name}】的智能助手。\n"
                    f"该知识库主要包含以下主题内容:**{topics_str}**。\n"
                    "当用户的问题涉及到上述内容或细节时,请务必调用 retrieve_context 工具检索信息来回答。\n"
                    "如果问题与知识库无关(例如闲聊),请用你的通用知识回答,并简要告知用户该问题超出了当前知识库范围。"
                )

                # 2. 定义绑定了当前 vector_store 的工具
                @tool(response_format="content_and_artifact")
                def retrieve_context(search_query: str):
                    """Retrieve information to help answer a query."""
                    # 使用 with_score 是为了给前端提供置信度,虽然 LLM 主要看 content
                    docs_and_scores = vector_store.similarity_search_with_score(search_query, k=top_k)
                    
                    # 序列化给 LLM 看 (仅文本)
                    serialized = "\n\n".join(
                        (f"Source: {doc.metadata}\nContent: {doc.page_content}")
                        for doc, score in docs_and_scores
                    )
                    
                    # 构造 Artifact (包含分数,给前端用)
                    artifacts = []
                    for doc, score in docs_and_scores:
                        # 兼容处理:确保 artifact 里存的是易于解析的对象或原始 Document
                        # 这里我们存原始 Document 对象,稍后在外部解析
                        # 为了携带 score,我们动态给 doc 加个属性,或者封装一下
                        doc.metadata["score"] = float(score) # 将分数注入 metadata 方便携带
                        artifacts.append(doc)
                    
                    return serialized, artifacts
                
                tools = [retrieve_context]

        # === 创建 Agent ===
        # 使用 create_agent (LangChain 1.1 标准)
        agent = create_agent(llm, tools, system_prompt=system_context)

        # === 执行 Agent ===
        messages = [{"role": "user", "content": query}]
        response = agent.invoke({"messages": messages})
        
        # === 解析结果 ===
        # 从 response['messages'] 中提取最终回答和 Artifact
        final_answer = ""
        sources = []

        if "messages" in response:
            msg_list = response["messages"]
            
            # 1. 获取最后一条 AI 回复
            last_msg = msg_list[-1]
            if isinstance(last_msg, AIMessage):
                final_answer = last_msg.content

            # 2. 遍历获取 ToolMessage 中的 Artifact
            for msg in msg_list:
                if isinstance(msg, ToolMessage) and msg.artifact:
                    for doc in msg.artifact:
                        if isinstance(doc, Document):
                            # 从 metadata 中取出我们刚才塞进去的 score
                            score = doc.metadata.get("score", 0.0)
                            
                            sources.append(DocSource(
                                content=doc.page_content,
                                metadata=doc.metadata,
                                score=score
                            ))
        
        return final_answer, sources

    @staticmethod
    def recall_test(kb_name: str, query: str, top_k: int) -> List[DocSource]:
        """
        召回测试 (不走 Agent,直接查向量库)
        """
        vector_store = FileService.load_vector_store(kb_name)
        if not vector_store:
            raise ValueError(f"Knowledge base '{kb_name}' not found.")
            
        docs_and_scores = vector_store.similarity_search_with_score(query, k=top_k)
        
        results = []
        for doc, score in docs_and_scores:
            results.append(DocSource(
                content=doc.page_content,
                metadata=doc.metadata,
                score=float(score)
            ))
        return results
模块一:动态上下文注入 (Dynamic Context Injection)

核心概念:拒绝千篇一律的 System Prompt,根据知识库内容实时调整 AI 人设。

chat_with_agent 函数的开头,我们并没有使用写死的 Prompt,而是引入了一个"中间件"逻辑:

# === 中间件逻辑 ===
if kb_name:
    # 1. 实时读取磁盘上的元数据 (metadata.json)
    metadata = FileService.load_kb_metadata(kb_name)
    topics = metadata.get("topics", [])
    
    # 2. 动态组装 System Prompt
    system_context = (
        f"你是一名基于知识库【{kb_name}】的智能助手。\n"
        f"该知识库主要包含以下主题内容:**{topics_str}**。\n"
        "..."
    )
  • 技术解读
    • 按需加载:只有当用户指定了 kb_name 时,系统才会去加载对应的向量库和元数据。
    • Prompt 模板化:利用 Python 的 f-string,将提取到的 topics(如"薪资管理"、“API接口”)嵌入到系统提示词中。
    • 价值:这让 Agent 具备了元认知能力。它知道自己"懂什么",从而在回答问题时更自信,或者在遇到无关问题时能准确拒绝。
模块二:运行时工具绑定 (Runtime Tool Binding)

核心概念:利用 Python 的闭包 (Closure) 特性,为每一次对话创建专属的检索工具。

请注意 retrieve_context 函数定义的位置——它是在 chat_with_agent 函数内部定义的,而不是全局定义的。

# 在函数内部定义 Tool
@tool(response_format="content_and_artifact")
def retrieve_context(search_query: str):
    # 这里直接使用了外部作用域的 vector_store 变量
    docs_and_scores = vector_store.similarity_search_with_score(search_query, k=top_k)
    # ...
  • 技术解读
    • 闭包机制:这个 Tool 捕获了当前请求上下文中的 vector_storetop_k 参数。
    • 隔离性:用户 A 的请求会生成一个绑定了 A 知识库的 Tool;用户 B 的请求会生成另一个。两者互不干扰,即使并发执行也不会串库。
    • 动态挂载tools = [retrieve_context] 这一行是在运行时决定的。如果没有知识库,tools 就是空的,Agent 自动退化为普通聊天模式。
模块三:双轨制数据流 (Content & Artifact)

核心概念:LangChain 1.1 的杀手级特性,解决"AI 看的内容"与"前端展示的内容"需求不一致的问题。

我们在 @tool 装饰器中指定了 response_format="content_and_artifact",这是实现引用透明化的关键。

# 1. 给 LLM 看的 (Serialized):纯文本,省 Token,易理解
serialized = "\n\n".join(f"Source: {doc.metadata}\nContent: {doc.page_content}" ...)

# 2. 给前端看的 (Artifact):原始对象,带分数,结构化
artifacts = []
for doc, score in docs_and_scores:
    doc.metadata["score"] = float(score) # 注入置信度分数
    artifacts.append(doc)

return serialized, artifacts
  • 技术解读
    • Content (serialized):这是喂给大模型的上下文。我们去掉了不必要的干扰信息,只保留文本和必要的元数据,帮助模型生成答案。
    • Artifact (artifacts):这是"副作用"数据。大模型看不到这个列表,但 LangChain 会把它保留在 ToolMessage 中。我们利用它将 相关性评分 (Score)原始文档对象 透传给前端,用于渲染"引用来源"卡片。
模块四:标准化执行与解析 (Standard Execution & Parsing)

核心概念:遵循 LangChain 标准协议,精准提取多模态输出。

最后是 Agent 的执行和结果提取环节,这里体现了后端开发的严谨性。

# 1. 标准化创建 Agent
agent = create_agent(llm, tools, system_prompt=system_context)

# 2. 执行并获取完整消息历史
response = agent.invoke({"messages": messages})

# 3. 解析逻辑
if "messages" in response:
    msg_list = response["messages"]
    # 提取回答:最后一条消息通常是 AI 的回答
    last_msg = msg_list[-1]
    
    # 提取引用:遍历寻找 ToolMessage 中的 artifact 字段
    for msg in msg_list:
        if isinstance(msg, ToolMessage) and msg.artifact:
            # ... 转换为前端所需的 DocSource 格式 ...
  • 技术解读
    • create_agent:这是 LangChain 0.2/0.3 (1.1+ API) 推荐的工厂函数,它屏蔽了底层的 Prompt 拼装细节(如 AgentScratchPad)。
    • 消息回溯:由于 Agent 可能进行多轮思考(虽然 RAG 通常是一轮),我们需要遍历 messages 列表来找回 Tool 的执行结果(即 artifact)。这是获取检索来源最准确的方式,比正则匹配文本要可靠得多。

八、总结

LangChain 1.1 的中间件系统为 Agent 开发提供了强大的扩展能力:

特性 说明
可组合性 多个中间件可自由组合,各司其职
生命周期钩子 6 个钩子覆盖完整执行流程
状态扩展 自定义状态 Schema 支持跨调用数据传递
生产就绪 内置中间件经过充分测试,可直接用于生产环境

无论是实现对话历史压缩、敏感信息脱敏、人机协作审批,还是构建复杂的动态上下文工程策略,中间件都能帮助你以优雅、可维护的方式实现目标。


参考资料

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐