在这里插入图片描述

文章目录


本节学习目标

通过本节课的学习,你将全面掌握:

  1. 理解HIL应用场景:准确判断哪些业务场景需要人机交互,识别缺信息补全、高风险审批、质量兜底三大经典模式
  2. 掌握HIL核心原理:深入理解interrupt()的工作机制、检查点持久化、以及暂停→持久化→恢复的三步闭环
  3. 实现工作流暂停与恢复:通过interrupt()Command搭建完整的可中断工作流
  4. 构建人工审批流程:实现创建草稿→人工审批→根据审批结果执行的完整模式
  5. 实现输入拦截与二次确认:在关键操作前校验并拦截,等待人工确认或参数修正
  6. 开发对话式交互系统:构建Agent主动向用户求助的完整对话工作流
  7. 掌握生产级HIL设计规范:可重入节点设计、幂等性保证、会话超时控制、多中断顺序管理
  8. 排查常见问题:中断后无法恢复、状态丢失、重复执行等典型故障的定位与修复

人机交互应用场景

在真实业务里,智能体经常会遇到三类“必须让人上场”的情况:

场景一:信息缺失(User-fixable)

典型问题:智能体缺账号、缺订单号、缺日期、指令不清、缺少必要信息。

示例场景

  • 客服Agent需要用户的订单号才能查询物流,但用户没有提供
  • 预订系统需要用户提供出行日期,但用户只说了“帮我订张机票”
  • 数据分析Agent需要明确的时间范围才能生成报表

HIL方案:工作流暂停,提示用户补齐信息,恢复后继续执行。调研显示,78%的智能客服系统在处理复杂查询时需要人工介入,HIL机制让系统具备实时纠错能力。

场景二:高风险动作(Approval required)

典型动作:退款、下单、发通知消息、写数据库、删除数据、修改线上配置、调用外部系统执行不可逆操作。

示例场景

  • 退货Agent决定为用户办理退款,需要人工审批金额和时间
  • 运维Agent准备删除数据库表或文件,需要双重确认
  • 金融交易Agent发起转账,需要高级审批

为什么需要HIL:这些动作一旦执行错误,影响可能非常大。行业调研显示,引入人工校验环节的自动化系统,任务完成准确率可提升40%以上。

场景三:质量兜底(Quality gate)

典型内容:对外回复、对外邮件、对外发布文案、用户协议、合同条款。

示例场景

  • 内容生成Agent撰写了对外发布的公告,需要人工审核
  • 销售Agent生成了报价单,需要上级确认
  • 客服Agent准备发送正式回复,需要质检人员审核

HIL方案:智能体生成内容后暂停,等待人工审核,审核通过/修改后恢复执行。

其他应用场景一览

应用场景 HIL作用 典型触发条件
金融交易验证 高风险操作双重确认 单笔交易金额 > 阈值
医疗诊断辅助 人类专家最终决策 模型置信度低于阈值
数据标注平台 AI预标+人工校验 每批次完成后抽样
企业采购审批 多层审批流转 采购金额超过审批线
智能客服转人工 复杂问题转接 模型三次失败或用户主动要求

HIL核心原理

什么是Human-in-the-Loop?

Human-in-the-loop(HIL)是一种架构模式,在这种模式中,系统在执行关键步骤时需要等待外部人工反馈才能继续执行。它背后的核心思想用一个口诀概括就是:Pause → Persist → Resume(暂停 → 持久化 → 恢复)。

对比传统的input(),传统方案只是简单阻塞,无法跨请求恢复,而HIL才是真正的“可恢复的长流程”能力:

维度 传统input() LangGraph HIL
状态保存 仅内存,程序退出即丢失 检查点持久化,永久保存
跨会话恢复 ❌ 不支持 ✅ 通过thread_id恢复
异步审批 ❌ 必须同步输入 ✅ 可等待数小时/数天
历史追踪 ❌ 无 ✅ 完整状态历史
生产可用 ❌ 否 ✅ 是

interrupt()函数详解

interrupt()是LangGraph提供的内置函数,用于在节点内部触发可恢复的中断。它的工作机制如下:

  1. 暂停执行:调用interrupt()时,当前节点立即停止执行,LangGraph抛出一个可恢复异常
  2. 状态持久化:当前完整的graph状态(包括所有字段、待执行节点等)通过checkpointer被保存
  3. 返回中断信息:执行结果中会包含__interrupt__字段,其中包含传给interrupt()的值
  4. 等待恢复:graph无限期等待,直到外部提供恢复指令
  5. 恢复执行:使用Command(resume=value)重新调用graph,interrupt()返回提供的恢复值,节点从开头重新执行

基础示例

from langgraph.types import interrupt, Command

def ask_human_node(state):
    # 暂停并询问用户年龄
    answer = interrupt("what is your age?")
    print(f"用户回答: {answer}")
    return {"age": answer}

关键特性总结

特性 说明
返回值类型 任意JSON可序列化值
checkpoint必需 ✅ 必须配合checkpointer使用
中断标识 返回__interrupt__字段
恢复方式 Command(resume=value)
节点重入 恢复时从节点开头重新执行

静态中断 vs 动态中断

LangGraph支持两种中断机制:

类型 触发方式 适用场景 示例
静态中断 compile()时配置interrupt_before/interrupt_after 固定位置的强制拦截,如每一步都需要审核 graph.compile(checkpointer=memory, interrupt_after=["send_node"])
动态中断 节点内调用interrupt() 条件性暂停,仅在特定条件下才触发 高风险工具调用前审批

静态中断示例

# 在敏感节点执行后强制中断
graph = builder.compile(
    checkpointer=memory,
    interrupt_after=["send_email"]  # 发邮件后暂停等待确认
)

Checkpointer的必要性

interrupt必须配合checkpointer使用。原因是:中断会跨请求、跨时间、跨服务器实例,如果没有持久化存储,当人工审批结束、系统重新调用graph时,LangGraph无法知道之前的执行进度。

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

生产环境应使用持久化存储:

Checkpointer类型 适用场景 特点
MemorySaver 开发/测试 内存存储,服务重启后丢失
SqliteSaver 中小规模生产 SQLite文件持久化,单机部署
PostgresSaver 高并发生产 PostgreSQL数据库,支持集群
RedisSaver 高性能场景 分布式缓存,低延迟
自定义Checkpointer 特殊需求 实现BaseCheckpointSaver接口

thread_id与会话隔离

使用checkpointer时,需要通过thread_id来区分不同的会话/用户:

config = {"configurable": {"thread_id": "user_123_session_456"}}

# 首次执行,触发中断
graph.invoke(initial_state, config=config)  # 返回包含__interrupt__的结果

# 恢复执行,使用相同的thread_id
graph.invoke(Command(resume="批准"), config=config)
  • thread_id相同 → 从同一个检查点恢复继续执行
  • thread_id不同 → 启动全新的独立会话,互不干扰

暂停/恢复工作流

核心三步流程

from typing import TypedDict, Optional
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command

class HILState(TypedDict):
    value: str
    human_input: Optional[str]

def node_with_interrupt(state: HILState):
    # 步骤1:调用interrupt,暂停执行
    user_response = interrupt("请输入您的意见")
    
    # 步骤3:恢复执行时,user_response接收到Command(resume=...)的值
    return {"human_input": user_response}

builder = StateGraph(HILState)
builder.add_node("ask", node_with_interrupt)
builder.add_edge(START, "ask")
builder.add_edge("ask", END)

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "test_001"}}

# 步骤1执行 → 遇到interrupt,返回中断信息
result = graph.invoke({"value": "initial", "human_input": None}, config=config)
print(result)  # 输出包含__interrupt__字段

# 步骤2:人工审批
human_decision = "批准"

# 步骤3:恢复执行
result = graph.invoke(Command(resume=human_decision), config=config)
print(result)  # 最终结果

使用流式接口(.stream)处理中断

使用.stream()方法可以更优雅地捕获中断事件:

config = {"configurable": {"thread_id": "test_002"}}

for chunk in graph.stream({"value": "initial", "human_input": None}, config=config):
    if "__interrupt__" in chunk:
        print(f"工作流已暂停,等待输入: {chunk['__interrupt__']}")
        # 在实际应用中,这里会触发通知(短信/邮件/Webhook)通知人工审批
    else:
        print(f"执行完成: {chunk}")

人工介入审批流程

完整审批流程实现

以下是一个完整的邮件发送审批工作流:Agent起草邮件 → 人工审批 → 根据审批结果(批准/拒绝/修改参数)执行相应操作。

from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command
from langchain_openai import ChatOpenAI

class ApprovalState(TypedDict):
    # 邮件草稿内容
    draft: str
    # 审批状态: pending, approved, rejected, modified
    approval_status: Literal["pending", "approved", "rejected", "modified"]
    # 审批反馈意见
    approval_feedback: str
    # 发送状态
    email_sent: bool
    # 错误信息
    error_message: str

def draft_email_node(state: ApprovalState) -> dict:
    """节点1:Agent起草邮件内容(模拟调用大模型)"""
    # 实际应用中,这里会调用LLM生成邮件
    print("[Agent] 正在起草邮件...")
    draft = "尊敬的客户:\n\n您的会员等级已自动升级为黄金会员,尊享8折优惠!\n\n如需取消,请联系客服。\n\n此致\n智能客服系统"
    return {"draft": draft, "approval_status": "pending"}

def human_approval_node(state: ApprovalState) -> dict:
    """
    节点2:人工审批节点
    调用interrupt暂停,等待人工审批决策
    支持:批准(approve)、拒绝(reject)、修改参数(edit)
    """
    print("\n" + "="*50)
    print("等待人工审批")
    print("="*50)
    print(f"邮件草稿:\n{state['draft']}\n")
    print("请选择操作:")
    print("  1. 批准发送")
    print("  2. 拒绝发送")
    print("  3. 修改草稿后再发")
    print("="*50)
    
    # 暂停并等待人工输入
    # 恢复时可以传入字符串表示决策: "approve", "reject", 或包含新草稿的字典
    decision = interrupt({
        "message": "请审批此邮件草稿",
        "draft": state["draft"],
        "options": ["approve", "reject", "edit"]
    })
    
    # 解析决策结果
    if decision == "approve" or decision.get("action") == "approve":
        return {"approval_status": "approved", "approval_feedback": "已批准"}
    elif decision == "reject" or decision.get("action") == "reject":
        return {"approval_status": "rejected", "approval_feedback": "已拒绝"}
    elif decision.get("action") == "edit":
        # 人工修改了草稿(可在真实系统中调用编辑器)
        new_draft = decision.get("new_draft", state["draft"])
        return {
            "approval_status": "modified",
            "approval_feedback": "已修改",
            "draft": new_draft
        }
    else:
        return {"approval_status": "rejected", "approval_feedback": "无效的审批决策"}

def decision_router(state: ApprovalState) -> Literal["send_email", "log_rejection", "human_approval"]:
    """根据审批结果路由到不同节点"""
    if state["approval_status"] == "approved":
        print("[路由] 审批通过 → 执行发送")
        return "send_email"
    elif state["approval_status"] == "modified":
        print("[路由] 草稿已修改 → 重新进入审批")
        return "human_approval"
    else:  # rejected
        print("[路由] 已拒绝 → 记录日志")
        return "log_rejection"

def send_email_node(state: ApprovalState) -> dict:
    """节点3:实际发送邮件"""
    print(f"\n[系统] 正在发送邮件...")
    print(f"邮件内容:\n{state['draft']}")
    print(f"[系统] 邮件已发送!")
    return {"email_sent": True}

def log_rejection_node(state: ApprovalState) -> dict:
    """节点3b:记录拒绝日志"""
    print(f"\n[系统] 邮件审批被拒绝,已记录到日志")
    print(f"  拒绝前草稿: {state['draft'][:50]}...")
    return {"email_sent": False}

# 构建工作流
builder = StateGraph(ApprovalState)
builder.add_node("draft_email", draft_email_node)
builder.add_node("human_approval", human_approval_node)
builder.add_node("send_email", send_email_node)
builder.add_node("log_rejection", log_rejection_node)

builder.add_edge(START, "draft_email")
builder.add_edge("draft_email", "human_approval")
builder.add_conditional_edges("human_approval", decision_router)
builder.add_edge("send_email", END)
builder.add_edge("log_rejection", END)

# 编译时配置检查点
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# 执行测试
config = {"configurable": {"thread_id": "approval_001"}}
initial_state = {
    "draft": "",
    "approval_status": "pending",
    "approval_feedback": "",
    "email_sent": False,
    "error_message": ""
}

print("="*60)
print("首次执行工作流(触发审批中断)")
print("="*60)

for chunk in graph.stream(initial_state, config=config):
    if "__interrupt__" in chunk:
        print(f"\n【中断信息】{chunk['__interrupt__']}")
        print("\n模拟人工审批(批准发送)...")
        # 恢复执行,传入审批决策
        result = graph.invoke(Command(resume="approve"), config=config)
        print(f"\n【最终结果】邮件已发送: {result['email_sent']}")
        break

审批模式进阶:支持参数编辑

允许人工在审批时修改参数是一个非常实用的功能:

def advanced_approval_node(state):
    # 发送更丰富的中断信息,让审批界面可以展示更多上下文
    ctx = interrupt({
        "type": "approval",
        "title": "请审批操作",
        "details": {
            "action": "delete_table",
            "table_name": state["target_table"],
            "affected_rows": state["row_count"],
            "backup_exists": state["has_backup"]
        },
        "options": ["approve", "reject", "edit_params"]
    })
    
    if ctx.get("action") == "edit_params":
        # 人工编辑了参数(如改表名或限制影响范围)
        new_params = ctx.get("params", {})
        return {
            "approved": True,
            "modified_params": new_params,
            "approval_note": "参数已人工修正"
        }
    elif ctx.get("action") == "approve":
        return {"approved": True, "modified_params": None}
    else:
        return {"approved": False}

输入拦截与二次确认

校验用户输入并拦截

实际生产中常常需要:拦截用户输入 → 验证合法性 → 不合法则循环要求重新输入。

from typing import TypedDict, Optional
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command

class ValidationState(TypedDict):
    user_input: Optional[str]
    is_valid: bool
    error_message: str
    final_result: Optional[str]

def ask_for_input(state: ValidationState) -> dict:
    """节点1:请求用户输入"""
    if state.get("user_input") is None:
        # 首次执行,请求输入
        user_input = interrupt({
            "question": "请输入一个1到100之间的数字:",
            "description": "用于后续计算的数值输入"
        })
        return {"user_input": user_input}
    else:
        # 重试场景,已有输入
        return {}

def validate_input(state: ValidationState) -> dict:
    """节点2:校验输入"""
    try:
        num = int(state["user_input"])
        if 1 <= num <= 100:
            return {
                "is_valid": True,
                "error_message": "",
                "final_result": f"✓ 验证通过: {num}"
            }
        else:
            return {
                "is_valid": False,
                "error_message": "❌ 输入超出范围,请输入1-100之间的数字",
                "final_result": None
            }
    except ValueError:
        return {
            "is_valid": False,
            "error_message": "❌ 输入无效,请输入一个有效的数字",
            "final_result": None
        }

def process_input(state: ValidationState) -> dict:
    """节点3:处理有效输入"""
    num = int(state["user_input"])
    return {"final_result": f"✔ 您输入的数字 {num} 的平方是 {num ** 2}"}

def after_validation_router(state: ValidationState) -> str:
    """路由:校验通过则处理,不通过则重新输入"""
    if state["is_valid"]:
        return "process"
    else:
        return "retry"

def retry_node(state: ValidationState) -> dict:
    """重试节点:清空输入,准备重新收集"""
    print(f"⚠️ {state['error_message']}")
    # 清空之前的输入,让ask节点重新请求
    return {"user_input": None, "is_valid": False}

# 构建工作流
builder = StateGraph(ValidationState)
builder.add_node("ask", ask_for_input)
builder.add_node("validate", validate_input)
builder.add_node("process", process_input)
builder.add_node("retry", retry_node)

builder.add_edge(START, "ask")
builder.add_edge("ask", "validate")
builder.add_conditional_edges("validate", after_validation_router, {
    "process": "process",
    "retry": "retry"
})
builder.add_edge("retry", "ask")
builder.add_edge("process", END)

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# 执行测试
config = {"configurable": {"thread_id": "input_001"}}
initial_state = {"user_input": None, "is_valid": False, "error_message": "", "final_result": None}

print("="*60)
print("用户输入收集与校验流程")
print("="*60)

for chunk in graph.stream(initial_state, config=config):
    if "__interrupt__" in chunk:
        # 模拟用户输入无效值
        print("\n【触发中断,等待用户输入】")
        print("模拟输入: 150(超出范围)...")
        result = graph.invoke(Command(resume="150"), config=config)
        # 再次检查是否还有中断(校验不通过,应再次请求输入)
        if "__interrupt__" in result:
            print("【再次中断】输入无效,请重新输入")
            print("模拟输入: 42(有效值)...")
            final = graph.invoke(Command(resume="42"), config=config)
            print(f"\n最终结果: {final['final_result']}")
        break

关键操作前的二次确认

def dangerous_operation_node(state):
    """高风险操作节点,执行前必须二次确认"""
    confirmation = interrupt({
        "type": "confirmation",
        "severity": "high",
        "operation": "delete_all_records",
        "will_affect": state["record_count"],
        "warning": "此操作不可逆,请仔细确认"
    })
    
    if confirmation.get("confirmed") is True:
        # 执行危险操作
        result = perform_deletion(state)
        return {"operation_result": result, "confirmed": True}
    elif confirmation.get("confirmed") is False:
        return {"operation_result": "cancelled", "confirmed": False}
    else:
        # 等待有效决策
        return {}

对话式交互实战

Agent主动向用户求助

构建一个餐厅预订系统,Agent自主分析用户需求,当信息不足时主动向用户提问并等待回复:用户说出意图 → Agent分析缺失信息 → 如缺项则暂停提问 → 用户补充后循环 → 信息完整则完成预订。

from typing import TypedDict, Annotated, Literal
from operator import add
from langgraph.graph import StateGraph, START, END, add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI

class ReservationState(TypedDict):
    """餐厅预订系统状态"""
    messages: Annotated[list, add_messages]
    collected_info: dict  # 已收集的信息: date, time, people, name, phone
    missing_info: list   # 缺失的信息字段
    booking_confirmed: bool
    booking_reference: str
    error_message: str

class BusinessHours:
    """业务规则库"""
    @staticmethod
    def is_valid_time(time_str: str) -> bool:
        # 营业时间 11:00-15:00, 17:00-22:00
        hour = int(time_str.split(":")[0])
        if 11 <= hour < 15:
            return True
        if 17 <= hour < 22:
            return True
        return False
    
    @staticmethod
    def date_is_future(date_str: str) -> bool:
        from datetime import datetime
        try:
            date = datetime.strptime(date_str, "%Y-%m-%d")
            return date >= datetime.now()
        except:
            return False

def agent_analyze(state: ReservationState) -> dict:
    """Agent分析用户需求,判断缺失哪些信息"""
    model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
    
    # 系统提示
    system_prompt = f"""你是餐厅预订助手。你需要从对话中提取以下预订信息:
    - date: 预订日期(格式 YYYY-MM-DD,必须是未来日期,营业时间11:00-15:00和17:00-22:00)
    - time: 预订时间(格式 HH:MM)
    - people: 用餐人数(数字,必须>0)
    - name: 预订人姓名
    - phone: 联系电话

    当前已收集的信息: {state.get('collected_info', {})}
    
    只输出 JSON 格式,例如: {{"missing": ["date", "time"], "extracted": {{"date": "2025-12-25", "people": 4}}}}
    """
    
    response = model.invoke([
        SystemMessage(content=system_prompt),
        *state["messages"]
    ])
    
    import json
    try:
        analysis = json.loads(response.content)
    except:
        # 解析失败,继续缺省判断
        analysis = {"missing": ["date", "time", "people"]}
    
    missing = analysis.get("missing", [])
    extracted = analysis.get("extracted", {})
    
    # 合并提取的信息
    collected = {**state.get("collected_info", {}), **extracted}
    
    return {"collected_info": collected, "missing_info": missing}

def ask_human_node(state: ReservationState) -> dict:
    """当信息不足时,生成问题并向用户提问"""
    missing = state["missing_info"]
    collected = state.get("collected_info", {})
    
    # 构建提问内容
    if "date" in missing:
        question = "请问您计划几号来用餐?(格式:YYYY-MM-DD)"
    elif "time" in missing:
        question = "请问您几点到店?(午餐 11:00-15:00, 晚餐 17:00-22:00)"
    elif "people" in missing:
        question = "请问一共几位用餐?"
    elif "name" in missing:
        question = "请问您的姓名是?"
    elif "phone" in missing:
        question = "请问您的联系电话是?"
    else:
        # 理论上不应该走到这里
        question = "请补充缺少的预订信息"
    
    print(f"\n[Agent] 需要补充信息: {missing}")
    
    # 暂停等待用户回答
    user_response = interrupt({
        "question": question,
        "collected_so_far": collected,
        "missing_fields": missing
    })
    
    # 将用户回答添加到消息历史
    new_message = HumanMessage(content=str(user_response))
    return {"messages": [new_message]}

def verify_collected(state: ReservationState) -> dict:
    """验证已收集的信息是否完整且有效"""
    collected = state.get("collected_info", {})
    missing = []
    errors = []
    
    # 验证日期
    if not collected.get("date"):
        missing.append("date")
    elif not BusinessHours.date_is_future(collected["date"]):
        errors.append("日期必须是未来日期")
        missing.append("date")
    
    # 验证时间
    if not collected.get("time"):
        missing.append("time")
    elif not BusinessHours.is_valid_time(collected["time"]):
        errors.append("时间必须在营业时间内")
        missing.append("time")
    
    # 验证人数
    if not collected.get("people"):
        missing.append("people")
    elif collected["people"] <= 0:
        errors.append("人数必须大于0")
        missing.append("people")
    
    # 验证姓名
    if not collected.get("name"):
        missing.append("name")
    
    # 验证电话
    if not collected.get("phone"):
        missing.append("phone")
    
    return {"missing_info": missing, "error_message": "; ".join(errors) if errors else ""}

def route_after_verify(state: ReservationState) -> Literal["ask_human", "confirm_booking", "abort"]:
    """路由:有缺失则继续询问,完全收集则确认预订"""
    missing = state.get("missing_info", [])
    if missing:
        return "ask_human"
    elif state.get("error_message"):
        # 如果验证有错误(如日期无效),记录后放弃本次预订
        return "abort"
    else:
        return "confirm_booking"

def confirm_booking(state: ReservationState) -> dict:
    """最终确认并完成预订"""
    collected = state.get("collected_info", {})
    
    confirmation = interrupt({
        "type": "confirmation",
        "message": f"请确认预订信息:",
        "details": collected,
        "options": ["confirm", "cancel"]
    })
    
    if confirmation == "confirm":
        import uuid
        ref = str(uuid.uuid4())[:8]
        print(f"\n✅ 预订已确认!预订号: {ref}")
        return {"booking_confirmed": True, "booking_reference": ref}
    else:
        print("\n❌ 预订已取消")
        return {"booking_confirmed": False, "booking_reference": ""}

def abort_booking(state: ReservationState) -> dict:
    """处理预订失败"""
    print(f"\n❌ 预订失败: {state.get('error_message', '信息无效')}")
    return {"booking_confirmed": False, "booking_reference": ""}

# 构建工作流
builder = StateGraph(ReservationState)
builder.add_node("agent_analyze", agent_analyze)
builder.add_node("ask_human", ask_human_node)
builder.add_node("verify_collected", verify_collected)
builder.add_node("confirm_booking", confirm_booking)
builder.add_node("abort", abort_booking)

builder.add_edge(START, "agent_analyze")
builder.add_edge("agent_analyze", "verify_collected")
builder.add_conditional_edges("verify_collected", route_after_verify)
builder.add_edge("ask_human", "agent_analyze")  # 用户回答后重新分析
builder.add_edge("confirm_booking", END)
builder.add_edge("abort", END)

memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# 执行测试
config = {"configurable": {"thread_id": "booking_001"}}
initial_state = {
    "messages": [HumanMessage(content="我想预订餐厅")],
    "collected_info": {},
    "missing_info": [],
    "booking_confirmed": False,
    "booking_reference": "",
    "error_message": ""
}

print("="*60)
print("智能餐厅预订助手")
print("="*60)
result = graph.invoke(initial_state, config=config)

生产级HIL设计规范

规范1:节点必须设计为可重入和幂等

恢复时会从被中断节点的开头重新执行。因此,绝对不能这样写:

# ❌ 危险:中断前执行了不可逆操作
def dangerous_node(state):
    # 发送邮件 → 会重复发送!
    send_email(state["draft"])
    # 然后才中断
    approval = interrupt("请审批")
    return {"approved": approval}

而应该这样做:

# ✅ 安全:中断后再执行外部操作
def safe_node(state):
    # 先中断,等待确认
    approval = interrupt("请审批")
    # 确认通过后再执行外部操作
    if approval == "approve":
        send_email(state["draft"])
    return {}

核心原则:把写数据库、扣费、发消息等事务性动作放在interrupt()之后,并为每个操作加上幂等键(如“订单号 + step_id”)。

规范2:中断值必须JSON可序列化

# ❌ 不可序列化
interrupt(open("/tmp/file", "r"))

# ✅ 可序列化
interrupt({"file_path": "/tmp/file", "line_count": 100})

规范3:一个节点中使用多个中断的注意事项

LangGraph会维护一个恢复值列表,与中断调用顺序匹配。当节点包含多个interrupt调用时:

def multiple_interrupts_node(state):
    # 第一次中断
    name = interrupt("请输入姓名")
    # 第二次中断
    age = interrupt("请输入年龄")
    return {"name": name, "age": age}

两次恢复需要分别传入值:

# 第一次恢复:传入姓名
result = graph.invoke(Command(resume="张三"), config)
# 第二次恢复:传入年龄
result = graph.invoke(Command(resume="25"), config)

规范4:会话超时控制与清理机制

生产环境中需要清理长时间挂起的中断会话:

from datetime import datetime, timedelta

class SessionManager:
    def __init__(self, timeout_minutes: int = 30):
        self.session_timeout = timedelta(minutes=timeout_minutes)
        self.sessions = {}  # thread_id -> last_active
    
    def is_expired(self, thread_id: str) -> bool:
        if thread_id not in self.sessions:
            return True
        return datetime.now() - self.sessions[thread_id] > self.session_timeout
    
    def touch(self, thread_id: str):
        self.sessions[thread_id] = datetime.now()
    
    def cleanup_expired(self, graph, configs: list):
        """清理超时会话的中断状态"""
        for cfg in configs:
            thread_id = cfg["configurable"]["thread_id"]
            if self.is_expired(thread_id):
                state = graph.get_state(cfg)
                if state and state.tasks:  # 存在中断
                    # 记录并清理
                    print(f"清理超时会话: {thread_id}")

规范5:生产环境必须使用持久化Checkpointer

# 开发/单机测试
from langgraph.checkpoint.sqlite import SqliteSaver

sqlite_saver = SqliteSaver.from_conn_string("checkpoints.db")
graph = builder.compile(checkpointer=sqlite_saver)

# 高并发生产环境
from langgraph.checkpoint.postgres import PostgresSaver

conn_string = "postgresql://user:pass@localhost:5432/langgraph"
with PostgresSaver.from_conn_string(conn_string) as checkpointer:
    checkpointer.setup()
    graph = builder.compile(checkpointer=checkpointer)

规范6:合理使用中断时的状态检测

当Graph被中断后(无论是静态还是动态),可以通过get_state()来检查当前的快照状态,包括正在等待执行的下一个节点和活跃的中断信息:

# 获取当前graph的状态快照
state_snapshot = graph.get_state(config)
print(f"当前状态值: {state_snapshot.values}")
print(f"下一个待执行节点: {state_snapshot.next}")
print(f"活跃的中断信息: {state_snapshot.interrupts}")

# 根据state_snapshot.next来判断是从哪个节点恢复
if state_snapshot.next:
    print(f"将从节点 {state_snapshot.next} 恢复执行")

规范7:生产部署的配置模板

# langgraph_production.yaml
human_in_loop:
  enabled: true
  checkpointer:
    type: postgres
    dsn: postgresql://user:pass@localhost:5432/langgraph
    pool_size: 20
    timeout: 30
  session:
    timeout_minutes: 30
    max_sessions_per_user: 5
  interrupts:
    max_per_node: 5
    recovery_limit: 10
  notifications:
    webhook: https://alerts.company.com/langgraph/interrupt

常见问题排查

问题1:中断后无法恢复,graph一直卡住

现象:调用Command(resume=...)后graph没有继续执行。

原因

  1. 没有使用相同的thread_id
  2. checkpointer未正确配置
  3. 恢复调用时传入了新的初始状态而不是Command

解决方案

# 错误用法
graph.invoke({"value": "new"}, config)  # 创建了新会话

# 正确用法
graph.invoke(Command(resume="approved"), config)  # 恢复会话

问题2:节点重复执行导致副作用

现象:恢复后节点内部代码执行了两次,例如发送了两次邮件。

原因:恢复执行时节点会从开头重新执行,且interrupt()之前有对外部系统的操作。

解决方案:把有副作用的操作移到interrupt()之后,并添加幂等控制。

问题3:服务重启后中断状态丢失

现象:服务重启后,之前的中断会话无法恢复。

原因:使用了MemorySaver,重启后检查点数据全部丢失。

解决方案:生产环境必须使用持久化Checkpointer(SQLite/PostgreSQL/Redis)。

问题4:多个中断的恢复值顺序错误

现象:多个interrupt()的恢复值与调用顺序不匹配。

原因:自己维护了错误的恢复值列表。

解决方案:LangGraph会自动按顺序匹配,保持Command(resume=...)的调用顺序与interrupt()的调用顺序一致即可。

问题5:静态中断无法在节点内动态判断条件

现象:使用interrupt_before强制拦截后,人工发现不需要审批但无法跳过。

解决方案:改用动态interrupt(),在节点内根据条件决定是否触发中断:

def conditional_approval_node(state):
    if state.get("amount") > 10000:  # 仅大额交易需要审批
        approval = interrupt("大额交易需要审批,请确认")
        return {"approved": approval}
    else:
        return {"approved": True}

问题6:会话超时导致资源泄漏

现象:大量中断会话未被清理,内存/数据库压力增大。

解决方案:实现清理机制,定时扫描超时(如30分钟未操作)的中断状态并标记为过期或自动拒绝。

问题7:LLM调用中断与人工审批混淆

现象:Agent调用工具失败时触发了人工审批,但实际只需自动重试。

解决方案:按官方建议分类处理:

错误类型 处理方式
临时错误(网络波动/限流) 自动重试(RetryPolicy)
LLM可恢复错误(工具失败/解析失败) 写入state,回到agent再试
用户可修复错误(缺信息/指令不清) interrupt()暂停,等人补齐
未知错误 冒泡给开发者

本节总结+思考题

核心知识点回顾

概念 说明
HIL三步曲 Pause → Persist → Resume(暂停 → 持久化 → 恢复)
interrupt() 节点内触发中断,必须配合checkpointer
Command(resume=...) 提供恢复值,使中断节点继续执行
thread_id 隔离不同用户会话,相同ID恢复相同会话
静态中断 compile(interrupt_before/after=[]),固定位置拦截
动态中断 节点内interrupt(),条件性触发
可重入节点 重复执行不会产生副作用
幂等操作 即使重复执行也产生相同结果

课后思考题

基础题

  1. interrupt()和静态中断(interrupt_before/interrupt_after)的根本区别是什么?各适合什么场景?
  2. 为什么interrupt()必须配合checkpointer使用?MemorySaverPostgresSaver在生产环境各有哪些适用场景?
  3. 恢复执行时,被中断的节点是从哪个位置开始执行的?这对节点的代码编写提出了什么要求?

进阶题

  1. 设计一个多级审批流程(一级审批通过后自动进入二级审批,两级都通过才最终执行),请画出图结构并用代码实现关键逻辑。
  2. 解释一个节点内包含多个interrupt()调用时,LangGraph是如何按顺序匹配恢复值的?为什么这种设计便于处理复杂的交互流程?
  3. 如果工作流中某个节点可能因以下原因被反复中断:缺信息、工具调用失败、审批拒绝并编辑参数。如何判断该是重新请求用户补充信息、尝试自动重试还是终止流程?

实践题

  1. 运行本节课的邮件审批示例,用自己的测试数据模拟批准/拒绝/修改三种场景,最终验证工作流的恢复和分支逻辑是否按预期运行。
  2. 为餐厅预订系统添加一个modify_booking节点,允许用户在预订完成后修改已有预订信息(日期、人数等),修改时重新触发审批流程。
  3. 在生产场景中,某些审批可能需要数小时甚至跨天。设计与LangGraph配合的异步审批状态管理(含超时处理与状态回查),用代码实现核心部分。

思考题

  1. 设想一个金融交易系统:下单→风控校验→人工审批→执行。如果审批节点因为人工决策需要退回修改订单参数(如修改金额),你会如何设计状态管理来保证修改后的参数正确传递并继续后续审批流程?讨论State结构、中断恢复参数和幂等设计之间的配合策略。

下节课预告

第11节课:断点调试、状态回溯与时间旅行功能精讲

HIL让我们明白“暂停工作流等人工”的巨大价值,但调试时如果能在任意历史节点暂停、回滚、分析状态、重新执行,效率会更高。下一节课我们将学习:

  • 断点原理:检查点存储机制与状态快照结构
  • 手动断点配置interrupt_before/interrupt_after的调试用法
  • 自动断点策略:条件触发式断点
  • 状态快照保存:使用检查点保存任意时刻的状态
  • 时间旅行回溯get_state_history()查看历史,update_state()回滚重建
  • 回滚与重试流程:从失败节点重新执行
  • 调试工具使用:结合VSCode调试器与LangSmith
  • 复杂工作流排错实战:通过时间旅行定位状态突变源头

课前准备

  • 熟悉第5课的状态管理和本节课的checkpointer机制
  • 准备一个包含5个以上节点的复杂工作流用于调试实践
  • 预习graph.get_state_history()graph.update_state()的API

下一节课后,你将拥有专业级的LangGraph调试能力,任何复杂工作流都能轻松排查问题!


🔗《20节课 LangGraph 从入门到精通》系列课程导航

去订阅

🌟 感谢您耐心阅读到这里!
💡 如果本文对您有所启发欢迎:
👍 点赞📌 收藏 📤 分享给更多需要的伙伴。
🗣️ 期待在评论区看到您的想法, 共同进步。
🔔 关注我,持续获取更多干货内容~
🤗 我们下篇文章见~

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐