LangGraph-流式输出(Streaming)
(如进度、日志、内部状态)messages在涉及 LLM 的节点(如调用聊天模型)时,按 token / 元数据分段输出 LLM 的响应(或中间 token)debug尽可能输出最全面的信息,包括节点名、状态、更新等,是调试用途的流式模式。
注:以下所提的“文档”,是指LangGraph官方指南(https://langchain-ai.github.io/langgraph/guides/),参考版本是0.6.8。
以下代码的开发环境:
[project]
name = "my-langgraph"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"dotenv>=0.9.9",
"langchain>=0.3.27",
"langchain-openai>=0.3.33",
"langfuse>=3.5.0",
"langgraph>=0.6.7",
"langgraph-checkpoint-sqlite>=2.0.11",
"numpy>=2.3.3",
]
一、核心概念与功能总览
“流式输出”(streaming)在 LangGraph 中是指在图(Graph)、Agent 或 工作流执行过程中,边执行边产出中间结果,而不是等整个执行结束再一次性返回全部结果。这个机制在对话、反馈、进度更新、长任务等场景非常有意义。
文档从以下几个维度介绍如何在 LangGraph 中使用流式输出:
|
维度 / 视角 |
说明 / 作用 |
|
支持的流式模式(stream modes) |
定义可选的输出类型,如 state 的更新、完整状态、LLM token、工具更新、自定义数据、调试信息等 |
|
在 Agent 上使用 |
如何让 Agent 在执行期间就产出进展、LLM token、工具调用输出等 |
|
在 Graph / Workflow 上使用 |
让工作流节点(StateGraph)支持流输出,中间节点、子图输出、节点状态更新等 |
|
组合与控制 |
如何组合多种模式、过滤、禁用某些模型的流、在不同 Python 版本下兼容性处理等 |
下面我依次展开各部分内容。
二、支持的流式模式(Stream Modes)
文档列出了几种可传给 .stream() 或 .astream() 的模式(可以是一个,也可以是多个组合):
|
模式名 |
功能 / 输出内容 |
|
values |
每一步执行后,流式输出当前图(Graph)的完整状态值(即 state 全量) |
|
updates |
每一步执行后,输出那一步对状态的 增量更新(哪些节点改了哪些字段) |
|
custom |
输出用户在节点或工具内部使用 get_stream_writer() 发出的 自定义数据(如进度、日志、内部状态) |
|
messages |
在涉及 LLM 的节点(如调用聊天模型)时,按 token / 元数据分段输出 LLM 的响应(或中间 token) |
|
debug |
尽可能输出最全面的信息,包括节点名、状态、更新等,是调试用途的流式模式 |
组合模式:可以将多个模式同时指定,比如 stream_mode=["updates", "messages", "custom"],然后返回的流会以 (模式名, chunk) 的形式标识来自哪种模式的输出。
三、在 Agent 上启用流式输出
文档说明了如何在 LangGraph 的 Agent 上使用 .stream(...) 或 .astream(...) 方法启动流式输出,以及常见的几种用途:
1. Agent 进度(Agent Progress)
用 stream_mode="updates",可以实时看到 Agent 在每一步的节点活动,比如:
- LLM 节点发出工具调用请求
- 工具节点执行
- LLM 节点输出最终回答
- …
这样可以把 Agent 的执行过程拆解出来,便于监控、可视化等。
2. LLM Token 输出
用 stream_mode="messages",可以在 Agent 执行中,逐 Token 获取 LLM 的输出片段 + 对应元数据(如所属节点、调用上下文等)。
3. 工具内部输出 / 更新
如果工具内部希望输出中间状态(如进度、日志等),可以在工具内部调用 get_stream_writer()。工具执行时可发出这些中间消息,若用户开启了 custom 模式,就能被捕获到。
需要注意:如果在异步环境(尤其是 Python 早期版本如 < 3.11)下,get_stream_writer() 用法会受限,需要用特殊方式传 writer 参数。
4. 混合流 / 多模式流
如前所述,可以同时用多个模式(updates, messages, custom 等),从不同维度观察执行情况。
5. 禁用某些模型的流式输出
在一些复杂场景(如多 Agent 协同),你可能希望部分模型不要以流式输出。文档指出可以在初始化模型时设置 disable_streaming=True 来关闭该模型的流式能力。
6. 异步与 Python 版本兼容性
- 在 Python < 3.11 的版本中,async 语境下自动 context 传播受限,LangGraph 无法自动处理流式 writer 的上下文传递。此时你要显式在节点 / 工具函数签名中加入 writer 或传 RunnableConfig。
- 在 Python ≥ 3.11(或更高版本)环境中,这些限制大部分可以被自动处理。文档建议优先使用较新版本。
四、在 Graph / Workflow 上启用流式输出
除了 Agent,LangGraph 的核心之一是 StateGraph / workflow(节点 + 有向边 + 状态传播)机制,文档也说明了如何在这些图结构上使用流式输出:
1. 基本方式
Graph/Webflow 对象提供 .stream(inputs, stream_mode=…) 和 .astream(...) 方法,可将流输出作为迭代器返回。用法与 Agent 部分类似。
2. 节点输出 / 子图输出
- 使用 updates / values 模式可以观察每个节点对状态的更新或中间状态整体。
- 若 Graph 内部包含子图(subgraphs),可以指定参数 subgraphs=True,则流式输出中也会包含子图的中间输出。输出项会带上命名空间路径(namespace),用来标识这个输出是哪个子图 / 节点出来的。
- debug 模式可以输出更丰富的信息,便于调试 Graph 执行流程与状态演变。
3. 过滤 / 选择性流
- 按 LLM 调用标签(tags)过滤:在初始化模型时可以附加 tags,流出的 metadata 会包含这些标签。结合 stream_mode="messages",可以根据 metadata 的 tags 过滤只要某些特定模型的 token 输出。
- 按节点过滤:同样在 messages 模式下,metadata 包含 langgraph_node 字段,可以检查这个字段,只输出某个节点的 token 输出。
4. 自定义节点 / 工具中的数据流
节点或工具函数内部可用 get_stream_writer() 发出任意结构的数据(字典、日志、状态等)。外部若用 custom 模式接收,就可以得到这些自定义数据片段。适用于你希望在节点内部插入 progress、日志、阶段性结果等的场景。
5. 兼容任意 LLM / 自定义流模型客户端
文档指出,即便某个 LLM API 本身没有被 LangChain / LangGraph 支持其 std 流式接口,你也可以在节点中使用你的自定义流式客户端,把流片段通过 writer 发出。这样结合 custom 模式也能把这个外部 LLM 的流式结果纳入 LangGraph 的流体系中。
五、使用建议 / 注意事项
以下是文档中以及我自己的理解中,需要特别注意或推荐的地方:
1. 版本兼容性
由于文档本身提示在 v1.0 版本后会废弃,所以在未来升级时要注意流式 API 是否有变动。
2. Python 版本差异
在 Python 小于 3.11 的环境下,异步流式机制有一些手工操作(传 writer、显式 config 等)需要开发者注意。建议尽量使用现代版本(>= 3.11)以简化处理。
3. 流模式组合要合理
虽然可以组合多个模式(updates, messages, custom 等),但过多组合可能导致输出噪声或冗余信息。应根据使用场景挑选所需维度(是否关心 token、状态更新、内部进度等)。
4. 子图 / 嵌套结构
在复杂图 / 有子图调用的场景下,启用 subgraphs=True 能更完整地捕获子图的活动;但也会引入输出中的 namespace、路径等复杂性。使用者需要处理好命名与层次。
5. 在工具 / 节点中使用 get_stream_writer 的限制
- 在同步环境中比较直接可用。
- 在异步环境 / Python < 3.11,可能需要手工将 writer 作为参数传入节点 / 工具函数。文档有示例说明这类场景如何处理。
- 如果在工具内部调用 get_stream_writer,则该工具不能单独在图以外场景调用(文档里有这一提示)
6. 调试模式用途
如果遇到 Graph 执行异常、状态异常或输出不一致的问题,可以先用 debug 模式观察整个执行过程、状态演变、节点行为等,辅助定位问题。
六、总结(映射对比 & 应用场景)
总体来说,在指南文档中把 LangGraph 在执行期间的 增量输出能力 做了一个比较全面的介绍。其核心价值在于:
- 允许在长流程 / 多节点 / 多模型调用的系统中,边执行边观察,而不是要等整个流程完毕才能拿到结果。
- 提供多个维度(状态更新、完整状态、LLM token、工具内自定义数据、调试信息)来观察执行过程。
- 支持在 Agent、Graph / Workflow 两个层面使用流式输出。Agent 适合对话 + 工具调用这类场景;Graph 则适合更通用和复杂的状态驱动流程。
- 具备较强的灵活性:可以在节点 / 工具内部自定义输出流,也可以集成外部不兼容的 LLM 流客户端。
- 兼顾异步 / 同步、不同 Python 版本等环境下的兼容性(虽然在某些早期版本下需要额外配置或手工操作)。
在实际应用上,这种机制尤其适合:
- 聊天机器人 / Agent:希望用户看到系统“说话”的过程,而不是屏幕空白直到回复完整;
- 多阶段流程:例如信息检索 → 处理 → 生成,可能希望在每个阶段都向前端推进进度或中间结果;
- 大模型调用过程监控 / 日志 / 可视化:用于调试、监控或展示;
- 长任务 / 批处理任务:可以把中间结果、状态、日志逐步输出,而不是 “卡住” 等结束。
七、演示代码
以下代码演示使用不同流模式的输出。代码中使用的LLM是本地部署,兼容OpenAI接口的模型推理框架。需要运行时根据情况作出修改。
import time
from typing import TypedDict, Literal, Dict, Any
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_stream_writer # 用于在节点内发自定义流
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
# 1) 定义流程状态
class FlowState(TypedDict, total=False):
task: str
plan: str
progress: str
result: str
# 2) 各节点函数 —— 返回“状态增量”(partial state)
def node_plan(state: FlowState) -> Dict[str, Any]:
"""
做一个简单规划,并通过自定义流输出。
"""
writer = get_stream_writer() # custom 流
writer({"event": "plan:start", "msg": f"Planning for task: {state.get('task')}"})
plan_text = f"Steps: parse → compute → summarize ({state.get('task')})"
writer({"event": "plan:done", "plan": plan_text})
return {"plan": plan_text, "progress": "planned"}
def node_work(state: FlowState) -> Dict[str, Any]:
"""
模拟计算过程,持续通过自定义流输出进度(progress)。
"""
writer = get_stream_writer()
for i in range(1, 4):
time.sleep(0.4) # 模拟耗时
writer({"event": "work:tick", "step": i, "msg": f"processing chunk {i}/3"})
writer({"event": "work:done", "msg": "all chunks processed"})
return {"progress": "computed"}
def node_finalize(state: FlowState) -> Dict[str, Any]:
"""
总结结果,并输出一个最终事件。
"""
writer = get_stream_writer()
result = f"✅ Done: {state.get('task')} | plan=({state.get('plan')})"
writer({"event": "final", "result_preview": result[:80]})
return {"result": result, "progress": "finished"}
def node_finalize_llm(state: FlowState) -> Dict[str, Any]:
"""
让 LLM 负责总结;LangGraph 会把 token 以 (token, metadata) 形式推到“messages”流
"""
llm = ChatOpenAI(model="qwen", api_key="empty", base_url="http://192.168.0.156:8000/v1", temperature=0.2)
prompt = f"总结任务的计划和结果: {state.get('task')}\nPlan: {state.get('plan')}"
msg = llm.invoke([HumanMessage(content=prompt)])
return {"result": msg.content, "progress": "finished"}
# 3) 组装图
def build_graph():
graph = StateGraph(FlowState)
graph.add_node("plan", node_plan)
graph.add_node("work", node_work)
graph.add_node("finalize", node_finalize_llm)
graph.add_edge(START, "plan")
graph.add_edge("plan", "work")
graph.add_edge("work", "finalize")
graph.add_edge("finalize", END)
return graph.compile()
def stream_example_main():
graph = build_graph()
# 关键:用 stream_mode 同时订阅“状态增量 updates”与“自定义流 custom”
# - updates: 哪个节点改了哪些字段
# - custom: 节点内 get_stream_writer() 发出的任意数据
for mode, chunk in graph.stream(
{"task": "demo: build a tiny streaming flow"},
stream_mode=["updates", "values"],
):
if mode == "updates":
# 每个节点执行后,会吐出该节点对状态的增量
print("🔸 [updates]", chunk)
elif mode == "custom":
# 节点内部进度/日志/阶段性结果
print("🟣 [custom ]", chunk)
elif mode == "messages":
token, metadata = chunk
print("🔹 [token ]", token) # 逐 token
# print("meta:", metadata) # 需要的话也可以看元数据(包含节点名等)
elif mode == "values":
# 每个节点执行完成后,此时整个 state 的完整快照
print("🟢 [values ]", chunk)
# 执行完成后,也可以直接拿最终状态(普通 .invoke 亦可)
final_state = graph.invoke({"task": "demo: build a tiny streaming flow"})
print("\n🏁 FINAL STATE:", final_state)
if __name__ == "__main__":
stream_example_main()
更多推荐
所有评论(0)