注:以下所提的“文档”,是指LangGraph官方指南(https://langchain-ai.github.io/langgraph/guides/),参考版本是0.6.8。

以下代码的开发环境

[project]
name = "my-langgraph"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "dotenv>=0.9.9",
    "langchain>=0.3.27",
    "langchain-openai>=0.3.33",
    "langfuse>=3.5.0",
    "langgraph>=0.6.7",
    "langgraph-checkpoint-sqlite>=2.0.11",
    "numpy>=2.3.3",
]

一、核心机制与模块

1. Checkpointer(检查点保存器)与 Threads

  • Checkpointer:是负责把图(Graph)执行过程中的状态快照(checkpoint)保存下来(磁盘、数据库、内存等)的组件。
  • 线程(Thread):当 graph 被执行(invoke / stream 等)时,需要指定一个 thread_id,用来标识该执行历程。每一步(super-step)执行完毕后,当前状态就会以 checkpoint 的形式“落盘”(或保存)到这个 thread 上。这个 thread 记录了一系列 checkpoint 的状态历史。
    • thread_id 是外部传入 config 的一部分:{"configurable": {"thread_id": "some_id"}}。
    • 执行完成后,可以通过 thread 拿到最新状态或历史状态。

2. Checkpoint(状态快照)

一个 checkpoint(状态快照)记录了当时图的执行状态。它主要包含以下几个维度:

  • config:对应的配置(thread_id, checkpoint_id 等)
  • metadata:元数据(如哪些节点做了写操作、step 编号、来源等)
  • values:各条 state 通道(state variables / channels)在这一点上的值
  • next:下一个应执行的节点名称(或空,如果执行结束)
  • tasks:接下来要执行的任务(Pregel 执行模型下的任务队列)
  • 时间戳 / 父 checkpoint 信息等也被记录以便追踪历史 / 关系。

文档举了一个简单例子:图含有两个节点 node_a、node_b,使用 InMemorySaver 作为 checkpointer,执行时会产生 4 个 checkpoint(包括起始、两个中间、结束)

Checkpoint 数量 & 含义

  • 初始空状态(START)
  • 每个节点执行前 / 执行后状态
  • 最终状态后无下一个节点

这些 checkpoint 形成线程的历史轨迹。

3. 获取 / 回看 / 重放(Get State, State History, Replay)

  • get_state(config)
    给定某个 thread_id(和可选 checkpoint_id),可以获取该线程在那一刻的状态快照 (StateSnapshot)。这个快照包括 state values、metadata、next 任务等。
  • get_state_history(config)
    获取某个线程的全部历史 checkpoint(按时间排序),最早到最新。便于回顾执行轨迹。
  • Replay / Time Travel
    可以通过在 invoke 时传入 checkpoint_id,让图“重放”到某个 checkpoint,然后从那一点继续执行,而不是重新从头跑。
    • 若步骤已存在,LangGraph 会“回放”(即不重新执行)那些前面已经执行过的步骤,只对 checkpoint_id 之后的节点重新运行。
    • 这样可以“时光旅行”(time travel)、“分叉 (fork)”执行轨迹、调试、探索不同路径等。

4. 更新状态(update_state)

在某些场景下,我们希望在中途人工或程序地修改状态(state),甚至回到某个 checkpoint 修改后再继续(类似游走 / 分叉)。文档提供了 graph.update_state(...) 的接口支持。

  • config:指定要更新哪个 thread(或哪个 checkpoint)
  • values:要更新的 state 通道值(可能覆盖、或与 reducer 合并)
  • as_node(可选):指定这次更新应被视为哪个节点发出的更新,以决定接下来执行哪个节点
  • 注意 reducer 机制
    • 对于有 reducer(累加 / 合并逻辑)的 state 通道,update_state 会将新值 “合并上去” 而不是简单覆盖
    • 对于没有 reducer 的通道,则直接覆盖
    • 例如:如果 state 有通道 bar 类型是 list[str] 和定义了累加 reducer,那么 update_state(..., {"bar": ["b"]}) 会把新的 ["b"] append 到原来的 bar 而不是替换。
  • as_node 的作用是控制哪一个节点被视为“最后修改状态”的节点,从而决定下一个执行节点。

这种能力很强,可用于“人工干预 / 审核 / 分支探索 / 校正状态”等场景。

5. Store(Memory Store / 跨线程状态存储)

注:checkpointer和store的实现不一样,一个是xxSaver,另一个是xxStore。

Checkpointer 保存的是针对某个 thread 的状态快照 —— 也就是图在当前线程 / 会话中的状态轨迹。但如果你想跨线程(不同 thread_id)共享某些信息(如用户的长期记忆、配置、偏好等),单靠 checkpoint 是不够的。于是文档引入了 Store 接口Memory Store 概念。

  • BaseStore / Store:抽象接口,允许存储 键值 / 记忆(namespace + key + value)
  • InMemoryStore:一个内存实现(适用于实验 / 本地)
  • 语义搜索支持:Store 支持给记忆做 embedding,以便后续的语义检索(semantic search)
    • 可以在 store 初始化时传入 embedding model、维度、要嵌入的字段等配置
    • 搜索时可按自然语言查询获取相关记忆
  • 在 LangGraph 中,把 store 与 checkpointer 一起装入 graph.compile(...),然后在节点内部可以访问 store(通过 node 参数注入)
  • 节点可以用 store.put(...) 保存记忆条目,用 store.search(...) 检索历史记忆
  • Store 存储是跨线程共享(只要命名空间一致、user_id 相同等)
  • 文档以对话机器人为例,演示如何用 user_id 做命名空间,把用户对话记忆存在 store 中,然后在不同 thread / 会话中继续检索使用这些记忆。

6. Checkpointer 实现 / 序列化 / 加密

Checkpointer 实现(Libraries)

LangGraph 提供多个持久化后端选项,以适应不同的使用场景:

  • langgraph-checkpoint(基础包,包含 BaseCheckpointSaver 接口和 InMemorySaver)
  • langgraph-checkpoint-sqlite:SQLite 后端实现 SqliteSaver(同步)及 AsyncSqliteSaver(异步)
  • langgraph-checkpoint-postgres:PostgreSQL 后端实现 PostgresSaver / AsyncPostgresSaver,用于生产或平台级持久化

序列化 / Serializer

  • Checkpointer 在保存 checkpoint 时需要把 state 中的值(可能是复杂对象)序列化。文档定义了 SerializerProtocol,默认提供的实现是 JsonPlusSerializer,支持多种类型(日期、枚举、LangChain/LangGraph 类型等)
  • 若某些类型不被默认序列化器支持(如 Pandas DataFrame、复杂自定义对象等),可以开启 pickle_fallback=True,在 JsonPlus 序列化失败时 fallback 到 pickle。
  • 加密:如果希望保存的状态是加密的,可使用 EncryptedSerializer(例如基于 AES)作为 serde 参数传给 checkpointer。
    • 文档提供 EncryptedSerializer.from_pycryptodome_aes() 方法,它会从环境变量 LANGGRAPH_AES_KEY 中读取密钥,也支持手动传入密钥。
    • SQLite / PostgreSQL 的 saver 实现都能接受加密 serde。

7. 支持能力 / 应用场景

通过持久化机制,LangGraph 支持以下高级能力 / 应用场景:

  • Human-in-the-loop(人工干预 / 审核)
    用户可以查看某个 checkpoint 的状态,做校正 / 审批 / 修改,然后让图从那一点继续执行。持久化机制使得这种中断 + 恢复执行成为可能。
  • Memory(跨会话记忆 / 长期记忆)
    通过 Store,可以把用户级别 / 会话级别长期记忆持久化,且在不同 thread / 会话中共享、检索。非常适合对话机器人 / Agent 的持久记忆能力。
  • Time Travel / Replay / Forking
    可以回到过去某个 checkpoint,重放或分叉执行不同路线用于调试 / 策略探索。
  • Fault Tolerance / 错误恢复
    如果某一步节点执行失败,系统可以从最近成功的 checkpoint 恢复,跳过那些已成功执行的节点,不必从头重新跑。
    • 当某个 super-step 内部部分节点成功、部分失败时,LangGraph 会把成功节点的写入(pending writes)持久化下来,失败的节点可以重试。
    • 这样即使程序 crash、断电、bug 中断,也能恢复执行。

二、优势、限制与注意事项

优势

  • 自动化 vs 可配置灵活:对于大多数普通用例,LangGraph API 会自动处理 checkpoint / persistence,无需手动插桩。
  • 多后端支持:可以选 InMemory(开发 / 测试用)、SQLite、Postgres 等,适配轻量 / 中型 / 规模化场景。
  • 加密支持:可选加密存储,增强安全性。
  • 丰富应用场景:支持人工干预、重放、记忆、容错、状态更新等复杂功能。
  • Reducer 与 更新一致性:update_state 与节点写入路径共用 reducer 逻辑,保持一致的处理方式。

限制 / 风险 / 注意点

  • 文档即将废弃:在当前版本的文档中所述,这部分在 v1.0 中可能被重构或替换,未来使用应留意新版文档。
  • 复杂对象序列化:默认 JsonPlusSerializer 虽然支持较多类型,但并不保证能序列化任意自定义对象。若遇到不支持的类型需要用 pickle_fallback 或自写 Serializer。
  • 性能与存储开销:保存 checkpoint、记录历史、加密、数据库读写等会带来开销。在设计高并发 / 大状态 / 长历史时要关注性能瓶颈。
  • 一致性 / 冲突 / 更新控制:在有多个并发执行 / 多线程 / 分叉场景中,状态更新、checkpoint 冲突、fork 管理、合并策略等可能较复杂。要理解 as_node、reducer 合并逻辑、pending writes 管理等机制。
  • Store 与 Checkpointer 的协同:Store 是跨线程共享的内存 / 存储,而 checkpointer 是线程内部状态快照机制。使用时要设计好命名空间、隔离性与一致性。
  • 加密密钥管理:如果启用加密,需要妥善管理密钥(如环境变量 LANGGRAPH_AES_KEY)。
  • 错误恢复边界:在极端错误 /节点中断场景,pending writes、部分写成功的状态、事务边界等要仔细理解其内部机制。

三、总结(核心思路 + 使用建议)

在这篇文章中系统地讲了 LangGraph 的持久性设计 —— 通过 checkpoint / thread 架构保存执行状态、支持重放 / 更新 / 并行 / 恢复 / 分叉;再结合 Store 支持跨线程 / 跨会话的长期记忆能力。借助这些机制,LangGraph 可以支持:

  • 在 Agent / 工作流运行过程中 “可暂停 → 审核 → 恢复”
  • 多次对话 / 会话之间的用户记忆 / 上下文保留
  • 重放 / 调试 / 时间旅行 / 分支探索
  • 出错恢复 / 容错执行

四、演示代码

import sqlite3
import time
import uuid
from typing import TypedDict, Annotated, Dict, Any, List
from operator import add

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.store.memory import InMemoryStore
from langgraph.config import get_stream_writer

"""
使用 SQLite checkpointer(自动记录每一步 checkpoint)
使用 thread_id 在同一个线程里多次运行、查看历史
使用 update_state 做“人工干预/回写后继续跑”
使用 Store(InMemoryStore) 在节点里读写“跨线程记忆”
仅用控制台消费执行过程(无 Web 界面)
"""

# ---------- 1) 定义“带 reducer 的状态结构” ----------
# notes/result 等字段将写入 checkpoint;其中 notes 使用 list 合并(add reducer)
class FlowState(TypedDict, total=False):
    task: str
    notes: Annotated[List[str], add]
    result: str


# ---------- 2) 节点函数 ----------
def node_plan(state: FlowState, *, store=None, config=None) -> Dict[str, Any]:
    """
    - 产生一个计划说明,写入 notes
    - 向 Store 里写一条“跨线程记忆”(基于 user_id 命名空间)
    - 通过 get_stream_writer() 发一条自定义日志(纯控制台演示)
    """
    writer = get_stream_writer()
    task = state.get("task", "no-task")
    plan = f"Plan for: {task} -> steps(parse, compute, summarize)"
    writer({"node": "plan", "event": "start", "msg": f"planning [{task}]"})
    writer({"node": "plan", "event": "done", "plan": plan})

    # 写一条“长期记忆”(跨线程共享):namespace 通常包含用户维度
    try:
        user_id = (config or {}).get("configurable", {}).get("user_id", "anon")
        ns = ("user", user_id)  # 命名空间 (family, key)
        # 🚩 InMemoryStore 采用 key-value;这里 value 用 dict,包含一个 text 字段,便于未来做语义搜索
        key = uuid.uuid4().hex  # 这个key值相当于记录主键,如果值相同会覆盖
        store.put(namespace=ns, key=key, value={"text": f"remember: handled task '{task}'"})
    except Exception as e:
        writer({"node": "plan", "event": "store_put_failed", "error": str(e)})

    return {"notes": [plan]}  # 使用 reducer: 会append到 notes


def node_work(state: FlowState) -> Dict[str, Any]:
    """
    - 模拟耗时处理(sleep)
    - 追加 notes
    """
    writer = get_stream_writer()
    for i in range(1, 4):
        time.sleep(0.3)
        writer({"node": "work", "event": "tick", "step": i})
    return {"notes": ["work: processed 3 chunks"]}


def node_finalize(state: FlowState, *, store=None, config=None) -> Dict[str, Any]:
    """
    - 汇总结果(result)
    - 从 Store 里读取该用户的“记忆条目”并统计条数,写入 notes
    """
    writer = get_stream_writer()
    task = state.get("task", "no-task")
    result = f"✅ Done: {task} | notes={len(state.get('notes', []))}"
    writer({"node": "finalize", "event": "preview", "result": result})

    # 读取用户记忆
    try:
        user_id = (config or {}).get("configurable", {}).get("user_id", "anon")
        ns = ("user", user_id)
        # InMemoryStore 没有强制要求查询参数;此处用最简单方式把该 ns 下所有条目取出
        # search 的具体签名可能包含 query/k 等参数,这里做最兼容的兜底
        items = []
        try:
            items = list(store.search(ns, query=None, limit=100))  # 尝试通用 search
        except TypeError:
            # 某些版本需要显式 k 参数或不接受 query=None,这里做兜底
            try:
                items = list(store.search(ns, limit=100))
            except Exception:
                items = []

        result += f" | memory_items={len(items)}"
        return {"result": result, "notes": ["finalize: summarized and counted memory"]}
    except Exception as e:
        writer({"node": "finalize", "event": "store_search_failed", "error": str(e)})
        return {"result": result}


# ---------- 3) 组装图并挂载“持久化 + Store” ----------
def build_app(db_path: str = "checkpoints.sqlite"):
    # 3.1 checkpointer(SQLite)
    conn = sqlite3.connect(db_path, check_same_thread=False)
    saver = SqliteSaver(conn)

    # 3.2 store(内存版;可替换为 PG/向量库等)
    store = InMemoryStore()

    # 3.3 构建图
    g = StateGraph(FlowState)
    g.add_node("plan", node_plan)
    g.add_node("work", node_work)
    g.add_node("finalize", node_finalize)

    g.add_edge(START, "plan")
    g.add_edge("plan", "work")
    g.add_edge("work", "finalize")
    g.add_edge("finalize", END)

    # 3.4 编译:接入 checkpointer 和 store
    app = g.compile(checkpointer=saver, store=store)
    return app


def print_line(title: str):
    print("\n" + "=" * 12, title, "=" * 12)


def main():
    # 可重复运行:保留同一个 checkpoints.sqlite,实现跨多次运行的持久性
    app = build_app("persistence_example.sqlite")

    # 统一用户、不同线程可共享 store 里的记忆
    user_id = "u-10086"
    thread_id = "t-001"

    base_config = {"configurable": {"thread_id": thread_id, "user_id": user_id}}

    print_line("RUN #1 (stream with updates+custom)")
    for mode, chunk in app.stream(
        {"task": "make a persistent flow"},
        config=base_config,
        stream_mode=["updates", "custom"],
    ):
        if mode == "updates":
            print("🔸 [updates]", chunk)
        elif mode == "custom":
            print("🟣 [custom ]", chunk)

    # 1) 读取“当前状态/检查点”
    print_line("GET CURRENT STATE")
    st = app.get_state(base_config)
    # st 里含有:config / values / next / tasks / metadata 等
    # 为了演示简洁,我们只打印 values 和下一步节点提示
    print("values:", st.values)
    print("next :", st.next)

    # 2) 查看“历史(全 checkpoint 轨迹)”
    print_line("STATE HISTORY")
    hist = list(app.get_state_history(base_config))
    print(f"history length: {len(hist)}")
    if hist:
        print("first checkpoint values:", hist[0].values)
        print("last  checkpoint values:", hist[-1].values)

    # 3) “人工干预”:把 notes 回写一条,自行指定 as_node=work,让流程从 finalize 继续
    print_line("UPDATE STATE (human-in-the-loop)")
    app.update_state(
        config=base_config,
        values={"notes": ["manual: hotfix before finalize"]},  # 使用 reducer:会 append
        as_node="work",  # 视作最近一次写入来自节点 'work',所以下一跳应到 'finalize'
    )

    # 4) 继续跑(从最新 checkpoint 的“下一步”开始)
    # input参数必需为None才会从“下一步”开始,如果指定了值,例如“{}”,则会从头开始
    print_line("RESUME AFTER UPDATE")
    for mode, chunk in app.stream(None, config=base_config, stream_mode=["updates", "custom"]):
        if mode == "updates":
            print("🔸 [updates]", chunk)
        elif mode == "custom":
            print("🟣 [custom ]", chunk)

    # 可以看到“manual: hotfix before finalize”在notes的倒数第2个
    st = app.get_state(base_config)
    print("values:", st.values)

    # 5) 展示跨线程共享的“记忆”:新开一个线程,但 user_id 不变
    print_line("NEW THREAD SHARES MEMORY")
    thread2_config = {"configurable": {"thread_id": "t-002", "user_id": user_id}}
    final2 = app.invoke({"task": "second thread uses shared memory"}, config=thread2_config)
    print("thread-2 final:", final2)

    # 6) 再看一下新线程的历史长度
    print_line("THREAD-2 HISTORY")
    hist2 = list(app.get_state_history(thread2_config))
    print(f"thread-2 history length: {len(hist2)}")
    if hist2:
        print("thread-2 last values:", hist2[-1].values)

def f1():
    store = InMemoryStore()
    ns = ("user", "k1")
    task = "fff"
    store.put(namespace=ns, key="k1", value={"text": f"remember: handled task '{task}'", "key": "k1"})
    store.put(namespace=ns, key="k2", value={"text": "second thread uses shared memory", "key": "k2"})
    store.put(namespace=ns, key="k3", value={"text": "hotfix before finalize", "key": "k3"})
    # r = store.search(ns, filter={"key": "k3"})
    # r = store.search(ns, query="aa")
    r = store.search(ns, query=None, limit=100)
    print(r)

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐