Eino检查点:执行状态保存与恢复

【免费下载链接】eino 【免费下载链接】eino 项目地址: https://gitcode.com/GitHub_Trending/ei/eino

概述

在复杂的LLM应用开发中,长时间运行的图(Graph)执行过程经常会遇到中断、故障或需要暂停的情况。Eino框架提供了强大的检查点(Checkpoint)机制,能够自动保存和恢复执行状态,确保应用在中断后能够从断点继续执行,而不是从头开始。

检查点机制是Eino框架的核心功能之一,它解决了以下几个关键问题:

  • 执行中断恢复:在长时间运行的AI应用中,网络波动、服务重启等情况时有发生
  • 状态持久化:保存中间计算结果,避免重复计算
  • 调试和监控:提供执行快照,便于问题排查和性能分析
  • 分布式协调:支持多节点间的状态同步和恢复

检查点核心概念

检查点存储接口

Eino定义了CheckPointStore接口来抽象检查点的存储机制:

type CheckPointStore interface {
    Get(ctx context.Context, checkPointID string) ([]byte, bool, error)
    Set(ctx context.Context, checkPointID string, checkPoint []byte) error
}

序列化接口

支持自定义序列化机制:

type Serializer interface {
    Serialize(v any) ([]byte, error)
    Deserialize(data []byte, v any) error
}

检查点数据结构

检查点包含完整的执行状态信息:

type checkpoint struct {
    Channels       map[string]channel          // 通道数据
    Inputs         map[string]any              // 节点输入
    State          any                         // 全局状态
    SkipPreHandler map[string]bool             // 跳过预处理标记
    RerunNodes     []string                    // 需要重新运行的节点
    
    ToolsNodeExecutedTools map[string]map[string]string  // 已执行工具记录
    SubGraphs       map[string]*checkpoint     // 子图检查点
}

检查点使用场景

1. 基本检查点使用

// 创建内存存储
store := newInMemoryStore()

// 编译图时启用检查点
g := NewGraph[string, string]()
// ... 添加节点和边
r, err := g.Compile(ctx, WithCheckPointStore(store))

// 执行时指定检查点ID
result, err := r.Invoke(ctx, "input", WithCheckPointID("checkpoint_1"))

2. 中断和恢复

// 设置中断点 - 在特定节点前后中断
r, err := g.Compile(ctx, 
    WithCheckPointStore(store),
    WithInterruptAfterNodes([]string{"node1"}),
    WithInterruptBeforeNodes([]string{"node2"}))

// 执行时会抛出中断错误
_, err := r.Invoke(ctx, "input", WithCheckPointID("checkpoint_1"))
if err != nil {
    // 提取中断信息
    info, existed := ExtractInterruptInfo(err)
    if existed {
        // 处理中断状态
        fmt.Printf("中断节点: %v\n", info.AfterNodes)
    }
}

// 从检查点恢复执行
result, err := r.Invoke(ctx, "", WithCheckPointID("checkpoint_1"))

3. 状态修改器

// 恢复时修改状态
result, err := r.Invoke(ctx, "", WithCheckPointID("checkpoint_1"),
    WithStateModifier(func(ctx context.Context, path NodePath, state any) error {
        // 修改状态值
        state.(*MyState).Value = "modified"
        return nil
    }))

检查点配置选项

Eino提供了丰富的检查点配置选项:

选项 描述 使用场景
WithCheckPointStore 设置检查点存储 持久化存储配置
WithCheckPointID 指定检查点ID 标识特定执行实例
WithWriteToCheckPointID 指定写入检查点ID 分离读写检查点
WithForceNewRun 强制全新运行 忽略现有检查点
WithStateModifier 状态修改器 恢复时修改状态
WithInterruptBeforeNodes 节点前中断 调试和监控
WithInterruptAfterNodes 节点后中断 执行控制

流处理与检查点

Eino的检查点机制完美支持流处理:

mermaid

// 流处理检查点示例
streamResult, err := r.Stream(ctx, "input", WithCheckPointID("stream_checkpoint"))
if err != nil {
    info, existed := ExtractInterruptInfo(err)
    if existed {
        // 从检查点恢复流处理
        streamResult, err = r.Stream(ctx, "", WithCheckPointID("stream_checkpoint"))
        // 继续处理流数据
        for {
            chunk, err := streamResult.Recv()
            if err == io.EOF {
                break
            }
            // 处理数据块
        }
    }
}

子图检查点

Eino支持嵌套子图的检查点管理:

// 创建子图
subG := NewGraph[string, string]()
// ... 配置子图

// 主图中包含子图
g := NewGraph[string, string]()
err := g.AddGraphNode("subgraph_node", subG, 
    WithGraphCompileOptions(WithInterruptAfterNodes([]string{"sub_node1"})))

// 子图的检查点会自动嵌套在主图检查点中
info, existed := ExtractInterruptInfo(err)
if existed && info.SubGraphs != nil {
    subInfo := info.SubGraphs["subgraph_node"]
    // 处理子图中断信息
}

自定义类型序列化

对于自定义类型,需要注册序列化支持:

// 注册自定义类型
type MyCustomStruct struct {
    Value string
}

func init() {
    RegisterSerializableType[MyCustomStruct]("my_custom_struct")
}

// 或者在运行时注册
err := RegisterSerializableType[MyCustomStruct]("my_custom_struct")
if err != nil {
    // 处理注册错误
}

错误处理与中断

中断错误类型

Eino提供了多种中断错误处理机制:

// 1. 普通中断错误
if err != nil {
    info, existed := ExtractInterruptInfo(err)
    if existed {
        // 处理中断
    }
}

// 2. 重新运行中断
func myNodeFunc(ctx context.Context, input string) (string, error) {
    if shouldRerun {
        return "", NewInterruptAndRerunErr("需要重新运行")
    }
    return "result", nil
}

// 3. 检查重新运行错误
extra, isRerun := IsInterruptRerunError(err)
if isRerun {
    fmt.Printf("重新运行原因: %v\n", extra)
}

中断信息结构

type InterruptInfo struct {
    State           any                    // 全局状态
    BeforeNodes     []string              // 待执行节点
    AfterNodes      []string              // 已执行节点
    RerunNodes      []string              // 需要重新运行的节点
    RerunNodesExtra map[string]any        // 重新运行额外信息
    SubGraphs       map[string]*InterruptInfo  // 子图中断信息
}

实践示例

示例1:简单的检查点使用

func SimpleCheckpointExample() {
    store := newInMemoryStore()
    
    g := NewGraph[string, string]()
    g.AddLambdaNode("node1", func(ctx context.Context, input string) (string, error) {
        return input + "_processed", nil
    })
    g.AddEdge(START, "node1")
    g.AddEdge("node1", END)
    
    // 编译时启用检查点
    r, _ := g.Compile(ctx, WithCheckPointStore(store))
    
    // 第一次执行并保存检查点
    result, err := r.Invoke(ctx, "input", WithCheckPointID("exec_1"))
    if err != nil {
        if info, ok := ExtractInterruptInfo(err); ok {
            // 从检查点恢复
            result, _ = r.Invoke(ctx, "", WithCheckPointID("exec_1"))
        }
    }
}

示例2:带状态修改的检查点

func CheckpointWithStateModification() {
    type ExecutionState struct {
        Attempts int
        LastResult string
    }
    
    g := NewGraph[string, string](WithGenLocalState(func(ctx context.Context) *ExecutionState {
        return &ExecutionState{}
    }))
    
    // ... 图配置
    
    r, _ := g.Compile(ctx, WithCheckPointStore(store))
    
    _, err := r.Invoke(ctx, "input", WithCheckPointID("run_1"))
    if err != nil {
        result, _ := r.Invoke(ctx, "", WithCheckPointID("run_1"),
            WithStateModifier(func(ctx context.Context, path NodePath, state any) error {
                state.(*ExecutionState).Attempts++
                state.(*ExecutionState).LastResult = "recovered"
                return nil
            }))
    }
}

性能考虑

使用检查点时需要考虑以下性能因素:

  1. 序列化开销:大型状态的序列化/反序列化成本
  2. 存储IO:检查点存储的读写性能
  3. 内存占用:检查点数据的内存使用
  4. 网络传输:分布式环境下的检查点传输

建议:

  • 对于频繁执行的短任务,可以禁用检查点
  • 使用高效的序列化格式(如Protocol Buffers)
  • 选择高性能的存储后端(如Redis、内存存储)
  • 定期清理过期的检查点

最佳实践

  1. 检查点命名策略:使用有意义的检查点ID,便于管理和查找
  2. 存储后端选择:根据业务需求选择合适的存储后端
  3. 错误处理:正确处理各种中断和错误场景
  4. 状态设计:设计简洁的状态结构,减少序列化开销
  5. 监控告警:监控检查点使用情况和性能指标

总结

Eino的检查点机制为LLM应用提供了强大的执行状态管理能力。通过检查点,开发者可以:

  • ✅ 实现执行中断的自动恢复
  • ✅ 保存和恢复复杂的执行状态
  • ✅ 支持流处理的中断和继续
  • ✅ 管理嵌套子图的执行状态
  • ✅ 自定义状态修改和序列化策略

检查点功能使得Eino框架特别适合构建需要高可靠性和长时间运行的AI应用,为生产环境部署提供了坚实的技术保障。

【免费下载链接】eino 【免费下载链接】eino 项目地址: https://gitcode.com/GitHub_Trending/ei/eino

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐