Eino检查点:执行状态保存与恢复
在复杂的LLM应用开发中,长时间运行的图(Graph)执行过程经常会遇到中断、故障或需要暂停的情况。Eino框架提供了强大的检查点(Checkpoint)机制,能够自动保存和恢复执行状态,确保应用在中断后能够从断点继续执行,而不是从头开始。检查点机制是Eino框架的核心功能之一,它解决了以下几个关键问题:- **执行中断恢复**:在长时间运行的AI应用中,网络波动、服务重启等情况时有发生...
Eino检查点:执行状态保存与恢复
【免费下载链接】eino 项目地址: https://gitcode.com/GitHub_Trending/ei/eino
概述
在复杂的LLM应用开发中,长时间运行的图(Graph)执行过程经常会遇到中断、故障或需要暂停的情况。Eino框架提供了强大的检查点(Checkpoint)机制,能够自动保存和恢复执行状态,确保应用在中断后能够从断点继续执行,而不是从头开始。
检查点机制是Eino框架的核心功能之一,它解决了以下几个关键问题:
- 执行中断恢复:在长时间运行的AI应用中,网络波动、服务重启等情况时有发生
- 状态持久化:保存中间计算结果,避免重复计算
- 调试和监控:提供执行快照,便于问题排查和性能分析
- 分布式协调:支持多节点间的状态同步和恢复
检查点核心概念
检查点存储接口
Eino定义了CheckPointStore接口来抽象检查点的存储机制:
type CheckPointStore interface {
Get(ctx context.Context, checkPointID string) ([]byte, bool, error)
Set(ctx context.Context, checkPointID string, checkPoint []byte) error
}
序列化接口
支持自定义序列化机制:
type Serializer interface {
Serialize(v any) ([]byte, error)
Deserialize(data []byte, v any) error
}
检查点数据结构
检查点包含完整的执行状态信息:
type checkpoint struct {
Channels map[string]channel // 通道数据
Inputs map[string]any // 节点输入
State any // 全局状态
SkipPreHandler map[string]bool // 跳过预处理标记
RerunNodes []string // 需要重新运行的节点
ToolsNodeExecutedTools map[string]map[string]string // 已执行工具记录
SubGraphs map[string]*checkpoint // 子图检查点
}
检查点使用场景
1. 基本检查点使用
// 创建内存存储
store := newInMemoryStore()
// 编译图时启用检查点
g := NewGraph[string, string]()
// ... 添加节点和边
r, err := g.Compile(ctx, WithCheckPointStore(store))
// 执行时指定检查点ID
result, err := r.Invoke(ctx, "input", WithCheckPointID("checkpoint_1"))
2. 中断和恢复
// 设置中断点 - 在特定节点前后中断
r, err := g.Compile(ctx,
WithCheckPointStore(store),
WithInterruptAfterNodes([]string{"node1"}),
WithInterruptBeforeNodes([]string{"node2"}))
// 执行时会抛出中断错误
_, err := r.Invoke(ctx, "input", WithCheckPointID("checkpoint_1"))
if err != nil {
// 提取中断信息
info, existed := ExtractInterruptInfo(err)
if existed {
// 处理中断状态
fmt.Printf("中断节点: %v\n", info.AfterNodes)
}
}
// 从检查点恢复执行
result, err := r.Invoke(ctx, "", WithCheckPointID("checkpoint_1"))
3. 状态修改器
// 恢复时修改状态
result, err := r.Invoke(ctx, "", WithCheckPointID("checkpoint_1"),
WithStateModifier(func(ctx context.Context, path NodePath, state any) error {
// 修改状态值
state.(*MyState).Value = "modified"
return nil
}))
检查点配置选项
Eino提供了丰富的检查点配置选项:
| 选项 | 描述 | 使用场景 |
|---|---|---|
WithCheckPointStore |
设置检查点存储 | 持久化存储配置 |
WithCheckPointID |
指定检查点ID | 标识特定执行实例 |
WithWriteToCheckPointID |
指定写入检查点ID | 分离读写检查点 |
WithForceNewRun |
强制全新运行 | 忽略现有检查点 |
WithStateModifier |
状态修改器 | 恢复时修改状态 |
WithInterruptBeforeNodes |
节点前中断 | 调试和监控 |
WithInterruptAfterNodes |
节点后中断 | 执行控制 |
流处理与检查点
Eino的检查点机制完美支持流处理:
// 流处理检查点示例
streamResult, err := r.Stream(ctx, "input", WithCheckPointID("stream_checkpoint"))
if err != nil {
info, existed := ExtractInterruptInfo(err)
if existed {
// 从检查点恢复流处理
streamResult, err = r.Stream(ctx, "", WithCheckPointID("stream_checkpoint"))
// 继续处理流数据
for {
chunk, err := streamResult.Recv()
if err == io.EOF {
break
}
// 处理数据块
}
}
}
子图检查点
Eino支持嵌套子图的检查点管理:
// 创建子图
subG := NewGraph[string, string]()
// ... 配置子图
// 主图中包含子图
g := NewGraph[string, string]()
err := g.AddGraphNode("subgraph_node", subG,
WithGraphCompileOptions(WithInterruptAfterNodes([]string{"sub_node1"})))
// 子图的检查点会自动嵌套在主图检查点中
info, existed := ExtractInterruptInfo(err)
if existed && info.SubGraphs != nil {
subInfo := info.SubGraphs["subgraph_node"]
// 处理子图中断信息
}
自定义类型序列化
对于自定义类型,需要注册序列化支持:
// 注册自定义类型
type MyCustomStruct struct {
Value string
}
func init() {
RegisterSerializableType[MyCustomStruct]("my_custom_struct")
}
// 或者在运行时注册
err := RegisterSerializableType[MyCustomStruct]("my_custom_struct")
if err != nil {
// 处理注册错误
}
错误处理与中断
中断错误类型
Eino提供了多种中断错误处理机制:
// 1. 普通中断错误
if err != nil {
info, existed := ExtractInterruptInfo(err)
if existed {
// 处理中断
}
}
// 2. 重新运行中断
func myNodeFunc(ctx context.Context, input string) (string, error) {
if shouldRerun {
return "", NewInterruptAndRerunErr("需要重新运行")
}
return "result", nil
}
// 3. 检查重新运行错误
extra, isRerun := IsInterruptRerunError(err)
if isRerun {
fmt.Printf("重新运行原因: %v\n", extra)
}
中断信息结构
type InterruptInfo struct {
State any // 全局状态
BeforeNodes []string // 待执行节点
AfterNodes []string // 已执行节点
RerunNodes []string // 需要重新运行的节点
RerunNodesExtra map[string]any // 重新运行额外信息
SubGraphs map[string]*InterruptInfo // 子图中断信息
}
实践示例
示例1:简单的检查点使用
func SimpleCheckpointExample() {
store := newInMemoryStore()
g := NewGraph[string, string]()
g.AddLambdaNode("node1", func(ctx context.Context, input string) (string, error) {
return input + "_processed", nil
})
g.AddEdge(START, "node1")
g.AddEdge("node1", END)
// 编译时启用检查点
r, _ := g.Compile(ctx, WithCheckPointStore(store))
// 第一次执行并保存检查点
result, err := r.Invoke(ctx, "input", WithCheckPointID("exec_1"))
if err != nil {
if info, ok := ExtractInterruptInfo(err); ok {
// 从检查点恢复
result, _ = r.Invoke(ctx, "", WithCheckPointID("exec_1"))
}
}
}
示例2:带状态修改的检查点
func CheckpointWithStateModification() {
type ExecutionState struct {
Attempts int
LastResult string
}
g := NewGraph[string, string](WithGenLocalState(func(ctx context.Context) *ExecutionState {
return &ExecutionState{}
}))
// ... 图配置
r, _ := g.Compile(ctx, WithCheckPointStore(store))
_, err := r.Invoke(ctx, "input", WithCheckPointID("run_1"))
if err != nil {
result, _ := r.Invoke(ctx, "", WithCheckPointID("run_1"),
WithStateModifier(func(ctx context.Context, path NodePath, state any) error {
state.(*ExecutionState).Attempts++
state.(*ExecutionState).LastResult = "recovered"
return nil
}))
}
}
性能考虑
使用检查点时需要考虑以下性能因素:
- 序列化开销:大型状态的序列化/反序列化成本
- 存储IO:检查点存储的读写性能
- 内存占用:检查点数据的内存使用
- 网络传输:分布式环境下的检查点传输
建议:
- 对于频繁执行的短任务,可以禁用检查点
- 使用高效的序列化格式(如Protocol Buffers)
- 选择高性能的存储后端(如Redis、内存存储)
- 定期清理过期的检查点
最佳实践
- 检查点命名策略:使用有意义的检查点ID,便于管理和查找
- 存储后端选择:根据业务需求选择合适的存储后端
- 错误处理:正确处理各种中断和错误场景
- 状态设计:设计简洁的状态结构,减少序列化开销
- 监控告警:监控检查点使用情况和性能指标
总结
Eino的检查点机制为LLM应用提供了强大的执行状态管理能力。通过检查点,开发者可以:
- ✅ 实现执行中断的自动恢复
- ✅ 保存和恢复复杂的执行状态
- ✅ 支持流处理的中断和继续
- ✅ 管理嵌套子图的执行状态
- ✅ 自定义状态修改和序列化策略
检查点功能使得Eino框架特别适合构建需要高可靠性和长时间运行的AI应用,为生产环境部署提供了坚实的技术保障。
【免费下载链接】eino 项目地址: https://gitcode.com/GitHub_Trending/ei/eino
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)