注:以下所提的“文档”,是指LangGraph官方指南(https://langchain-ai.github.io/langgraph/guides/),参考版本是0.6.8。

以下代码的开发环境

[project]
name = "my-langgraph"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "dotenv>=0.9.9",
    "langchain>=0.3.27",
    "langchain-openai>=0.3.33",
    "langfuse>=3.5.0",
    "langgraph>=0.6.7",
    "langgraph-checkpoint-sqlite>=2.0.11",
    "numpy>=2.3.3",
]

一、核心概念与功能总览

“流式输出”(streaming)在 LangGraph 中是指在图(Graph)、Agent 或 工作流执行过程中,边执行边产出中间结果,而不是等整个执行结束再一次性返回全部结果。这个机制在对话、反馈、进度更新、长任务等场景非常有意义。

文档从以下几个维度介绍如何在 LangGraph 中使用流式输出:

维度 / 视角

说明 / 作用

支持的流式模式(stream modes)

定义可选的输出类型,如 state 的更新、完整状态、LLM token、工具更新、自定义数据、调试信息等

在 Agent 上使用

如何让 Agent 在执行期间就产出进展、LLM token、工具调用输出等

在 Graph / Workflow 上使用

让工作流节点(StateGraph)支持流输出,中间节点、子图输出、节点状态更新等

组合与控制

如何组合多种模式、过滤、禁用某些模型的流、在不同 Python 版本下兼容性处理等

下面我依次展开各部分内容。

二、支持的流式模式(Stream Modes)

文档列出了几种可传给 .stream() 或 .astream() 的模式(可以是一个,也可以是多个组合):

模式名

功能 / 输出内容

values

每一步执行后,流式输出当前图(Graph)的完整状态值(即 state 全量)

updates

每一步执行后,输出那一步对状态的 增量更新(哪些节点改了哪些字段)

custom

输出用户在节点或工具内部使用 get_stream_writer() 发出的 自定义数据(如进度、日志、内部状态)

messages

在涉及 LLM 的节点(如调用聊天模型)时,按 token / 元数据分段输出 LLM 的响应(或中间 token)

debug

尽可能输出最全面的信息,包括节点名、状态、更新等,是调试用途的流式模式

组合模式:可以将多个模式同时指定,比如 stream_mode=["updates", "messages", "custom"],然后返回的流会以 (模式名, chunk) 的形式标识来自哪种模式的输出。

三、在 Agent 上启用流式输出

文档说明了如何在 LangGraph 的 Agent 上使用 .stream(...) 或 .astream(...) 方法启动流式输出,以及常见的几种用途:

1. Agent 进度(Agent Progress)
用 stream_mode="updates",可以实时看到 Agent 在每一步的节点活动,比如:

  • LLM 节点发出工具调用请求
  • 工具节点执行
  • LLM 节点输出最终回答

这样可以把 Agent 的执行过程拆解出来,便于监控、可视化等。

2. LLM Token 输出
用 stream_mode="messages",可以在 Agent 执行中,逐 Token 获取 LLM 的输出片段 + 对应元数据(如所属节点、调用上下文等)。

3. 工具内部输出 / 更新
如果工具内部希望输出中间状态(如进度、日志等),可以在工具内部调用 get_stream_writer()。工具执行时可发出这些中间消息,若用户开启了 custom 模式,就能被捕获到。
需要注意:如果在异步环境(尤其是 Python 早期版本如 < 3.11)下,get_stream_writer() 用法会受限,需要用特殊方式传 writer 参数。

4. 混合流 / 多模式流
如前所述,可以同时用多个模式(updates, messages, custom 等),从不同维度观察执行情况。

5. 禁用某些模型的流式输出
在一些复杂场景(如多 Agent 协同),你可能希望部分模型不要以流式输出。文档指出可以在初始化模型时设置 disable_streaming=True 来关闭该模型的流式能力。

6. 异步与 Python 版本兼容性

  • 在 Python < 3.11 的版本中,async 语境下自动 context 传播受限,LangGraph 无法自动处理流式 writer 的上下文传递。此时你要显式在节点 / 工具函数签名中加入 writer 或传 RunnableConfig。
  • 在 Python ≥ 3.11(或更高版本)环境中,这些限制大部分可以被自动处理。文档建议优先使用较新版本。

四、在 Graph / Workflow 上启用流式输出

除了 Agent,LangGraph 的核心之一是 StateGraph / workflow(节点 + 有向边 + 状态传播)机制,文档也说明了如何在这些图结构上使用流式输出:

1. 基本方式
Graph/Webflow 对象提供 .stream(inputs, stream_mode=…) 和 .astream(...) 方法,可将流输出作为迭代器返回。用法与 Agent 部分类似。

2. 节点输出 / 子图输出

  • 使用 updates / values 模式可以观察每个节点对状态的更新或中间状态整体。
  • 若 Graph 内部包含子图(subgraphs),可以指定参数 subgraphs=True,则流式输出中也会包含子图的中间输出。输出项会带上命名空间路径(namespace),用来标识这个输出是哪个子图 / 节点出来的。
  • debug 模式可以输出更丰富的信息,便于调试 Graph 执行流程与状态演变。

3. 过滤 / 选择性流

  • 按 LLM 调用标签(tags)过滤:在初始化模型时可以附加 tags,流出的 metadata 会包含这些标签。结合 stream_mode="messages",可以根据 metadata 的 tags 过滤只要某些特定模型的 token 输出。
  • 按节点过滤:同样在 messages 模式下,metadata 包含 langgraph_node 字段,可以检查这个字段,只输出某个节点的 token 输出。

4. 自定义节点 / 工具中的数据流
节点或工具函数内部可用 get_stream_writer() 发出任意结构的数据(字典、日志、状态等)。外部若用 custom 模式接收,就可以得到这些自定义数据片段。适用于你希望在节点内部插入 progress、日志、阶段性结果等的场景。

5. 兼容任意 LLM / 自定义流模型客户端
文档指出,即便某个 LLM API 本身没有被 LangChain / LangGraph 支持其 std 流式接口,你也可以在节点中使用你的自定义流式客户端,把流片段通过 writer 发出。这样结合 custom 模式也能把这个外部 LLM 的流式结果纳入 LangGraph 的流体系中。

五、使用建议 / 注意事项

以下是文档中以及我自己的理解中,需要特别注意或推荐的地方:

1. 版本兼容性
由于文档本身提示在 v1.0 版本后会废弃,所以在未来升级时要注意流式 API 是否有变动。

2. Python 版本差异
在 Python 小于 3.11 的环境下,异步流式机制有一些手工操作(传 writer、显式 config 等)需要开发者注意。建议尽量使用现代版本(>= 3.11)以简化处理。

3. 流模式组合要合理
虽然可以组合多个模式(updates, messages, custom 等),但过多组合可能导致输出噪声或冗余信息。应根据使用场景挑选所需维度(是否关心 token、状态更新、内部进度等)。

4. 子图 / 嵌套结构
在复杂图 / 有子图调用的场景下,启用 subgraphs=True 能更完整地捕获子图的活动;但也会引入输出中的 namespace、路径等复杂性。使用者需要处理好命名与层次。

5. 在工具 / 节点中使用 get_stream_writer 的限制

  • 在同步环境中比较直接可用。
  • 在异步环境 / Python < 3.11,可能需要手工将 writer 作为参数传入节点 / 工具函数。文档有示例说明这类场景如何处理。
  • 如果在工具内部调用 get_stream_writer,则该工具不能单独在图以外场景调用(文档里有这一提示)

6. 调试模式用途
如果遇到 Graph 执行异常、状态异常或输出不一致的问题,可以先用 debug 模式观察整个执行过程、状态演变、节点行为等,辅助定位问题。

六、总结(映射对比 & 应用场景)

总体来说,在指南文档中把 LangGraph 在执行期间的 增量输出能力 做了一个比较全面的介绍。其核心价值在于:

  • 允许在长流程 / 多节点 / 多模型调用的系统中,边执行边观察,而不是要等整个流程完毕才能拿到结果。
  • 提供多个维度(状态更新、完整状态、LLM token、工具内自定义数据、调试信息)来观察执行过程。
  • 支持在 Agent、Graph / Workflow 两个层面使用流式输出。Agent 适合对话 + 工具调用这类场景;Graph 则适合更通用和复杂的状态驱动流程。
  • 具备较强的灵活性:可以在节点 / 工具内部自定义输出流,也可以集成外部不兼容的 LLM 流客户端。
  • 兼顾异步 / 同步、不同 Python 版本等环境下的兼容性(虽然在某些早期版本下需要额外配置或手工操作)。

在实际应用上,这种机制尤其适合:

  • 聊天机器人 / Agent:希望用户看到系统“说话”的过程,而不是屏幕空白直到回复完整;
  • 多阶段流程:例如信息检索 → 处理 → 生成,可能希望在每个阶段都向前端推进进度或中间结果;
  • 大模型调用过程监控 / 日志 / 可视化:用于调试、监控或展示;
  • 长任务 / 批处理任务:可以把中间结果、状态、日志逐步输出,而不是 “卡住” 等结束。

七、演示代码

以下代码演示使用不同流模式的输出。代码中使用的LLM是本地部署,兼容OpenAI接口的模型推理框架。需要运行时根据情况作出修改。

import time
from typing import TypedDict, Literal, Dict, Any

from langgraph.graph import StateGraph, START, END
from langgraph.config import get_stream_writer  # 用于在节点内发自定义流
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage


# 1) 定义流程状态
class FlowState(TypedDict, total=False):
    task: str
    plan: str
    progress: str
    result: str


# 2) 各节点函数 —— 返回“状态增量”(partial state)
def node_plan(state: FlowState) -> Dict[str, Any]:
    """
    做一个简单规划,并通过自定义流输出。
    """
    writer = get_stream_writer()  # custom 流
    writer({"event": "plan:start", "msg": f"Planning for task: {state.get('task')}"})
    plan_text = f"Steps: parse → compute → summarize ({state.get('task')})"
    writer({"event": "plan:done", "plan": plan_text})
    return {"plan": plan_text, "progress": "planned"}


def node_work(state: FlowState) -> Dict[str, Any]:
    """
    模拟计算过程,持续通过自定义流输出进度(progress)。
    """
    writer = get_stream_writer()
    for i in range(1, 4):
        time.sleep(0.4)  # 模拟耗时
        writer({"event": "work:tick", "step": i, "msg": f"processing chunk {i}/3"})
    writer({"event": "work:done", "msg": "all chunks processed"})
    return {"progress": "computed"}


def node_finalize(state: FlowState) -> Dict[str, Any]:
    """
    总结结果,并输出一个最终事件。
    """
    writer = get_stream_writer()
    result = f"✅ Done: {state.get('task')} | plan=({state.get('plan')})"
    writer({"event": "final", "result_preview": result[:80]})
    return {"result": result, "progress": "finished"}

def node_finalize_llm(state: FlowState) -> Dict[str, Any]:
    """
    让 LLM 负责总结;LangGraph 会把 token 以 (token, metadata) 形式推到“messages”流
    """
    llm = ChatOpenAI(model="qwen", api_key="empty", base_url="http://192.168.0.156:8000/v1", temperature=0.2)
    prompt = f"总结任务的计划和结果: {state.get('task')}\nPlan: {state.get('plan')}"
    msg = llm.invoke([HumanMessage(content=prompt)])
    return {"result": msg.content, "progress": "finished"}

# 3) 组装图
def build_graph():
    graph = StateGraph(FlowState)
    graph.add_node("plan", node_plan)
    graph.add_node("work", node_work)
    graph.add_node("finalize", node_finalize_llm)

    graph.add_edge(START, "plan")
    graph.add_edge("plan", "work")
    graph.add_edge("work", "finalize")
    graph.add_edge("finalize", END)

    return graph.compile()


def stream_example_main():
    graph = build_graph()

    # 关键:用 stream_mode 同时订阅“状态增量 updates”与“自定义流 custom”
    # - updates: 哪个节点改了哪些字段
    # - custom: 节点内 get_stream_writer() 发出的任意数据
    for mode, chunk in graph.stream(
        {"task": "demo: build a tiny streaming flow"},
        stream_mode=["updates", "values"],
    ):
        if mode == "updates":
            # 每个节点执行后,会吐出该节点对状态的增量
            print("🔸 [updates]", chunk)
        elif mode == "custom":
            # 节点内部进度/日志/阶段性结果
            print("🟣 [custom ]", chunk)
        elif mode == "messages":
            token, metadata = chunk
            print("🔹 [token  ]", token)  # 逐 token
            # print("meta:", metadata)             # 需要的话也可以看元数据(包含节点名等)
        elif mode == "values":
            # 每个节点执行完成后,此时整个 state 的完整快照
            print("🟢 [values ]", chunk)

    # 执行完成后,也可以直接拿最终状态(普通 .invoke 亦可)
    final_state = graph.invoke({"task": "demo: build a tiny streaming flow"})
    print("\n🏁 FINAL STATE:", final_state)

if __name__ == "__main__":
    stream_example_main()

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐