📚 QuantumFlow Workflow Automation, from Beginner to Expert - Part 2
In the previous article we finished technology selection and set up the development environment. This one dives into database design, the foundation of the whole workflow engine: a well-designed data model determines the system's scalability, performance, and maintainability.


📋 Overview

Learning goals:

  • Design the core data model of a workflow engine
  • Understand advanced uses of PostgreSQL's JSONB type
  • Learn to design high-performance database indexes
  • Master the Alembic database migration tool

Technical topics:

  • SQLAlchemy ORM model definitions
  • PostgreSQL JSONB index optimization
  • Database constraints and relationship design
  • Writing Alembic migration scripts

Estimated reading time: 20 minutes
Prerequisites: basic SQL and relational database concepts


🎯 1. Data Model Design Principles

Before designing anything concrete, we need to settle a few core principles:

1.1 Design Philosophy

1. Flexibility first

# ❌ Anti-pattern: a separate table for every node type
class HttpRequestNode(Base):
    url = Column(String)
    method = Column(String)
    headers = Column(String)  # the column list keeps growing

class DatabaseQueryNode(Base):
    connection_string = Column(String)
    query = Column(Text)
    # every node type needs its own columns...

# ✅ Better: store the flexible configuration in JSONB
class Node(Base):
    id = Column(UUID(as_uuid=True), primary_key=True)
    type = Column(String(50))  # 'http_request', 'database_query'
    config = Column(JSONB)  # all type-specific configuration lives here
    
    # Example config:
    # {
    #   "url": "https://api.example.com",
    #   "method": "POST",
    #   "headers": {"Authorization": "Bearer xxx"}
    # }

2. Balancing performance and normalization

-- Denormalize selectively to speed up reads
CREATE TABLE executions (
    id UUID PRIMARY KEY,
    workflow_id UUID NOT NULL,
    workflow_name VARCHAR(255),  -- redundant copy, avoids a JOIN in list views
    status VARCHAR(20),
    created_at TIMESTAMP,
    -- snapshot of the definition, so history survives workflow edits
    workflow_snapshot JSONB
);

3. Auditability

# Every core table gains audit columns through a shared mixin
from sqlalchemy.orm import declared_attr

class AuditMixin:
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # ForeignKey columns in a mixin must be wrapped in declared_attr
    # so each subclass gets its own Column object
    @declared_attr
    def created_by(cls):
        return Column(UUID(as_uuid=True), ForeignKey('users.id'))

    @declared_attr
    def updated_by(cls):
        return Column(UUID(as_uuid=True), ForeignKey('users.id'))

1.2 Core Entity Relationships

┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│    User     │────────▶│  Workflow   │────────▶│  Execution  │
│             │  owns   │             │ triggers│             │
└─────────────┘         └─────────────┘         └─────────────┘
                              │                        │
                              │ contains               │ contains
                              ▼                        ▼
                        ┌─────────────┐         ┌─────────────┐
                        │    Node     │         │   NodeRun   │
                        └─────────────┘         └─────────────┘
                              │
                              │ connects
                              ▼
                        ┌─────────────┐
                        │    Edge     │
                        └─────────────┘

🗄️ 2. Core Table Design

2.1 Users Table

-- Users table: system user accounts
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) UNIQUE NOT NULL,
    username VARCHAR(100) UNIQUE NOT NULL,
    hashed_password VARCHAR(255) NOT NULL,
    full_name VARCHAR(255),
    is_active BOOLEAN DEFAULT TRUE,
    is_superuser BOOLEAN DEFAULT FALSE,
    
    -- Audit columns
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_login_at TIMESTAMP,
    
    -- Constraints
    CONSTRAINT users_email_check CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$')
);

-- Indexes: the UNIQUE constraints on email and username already create
-- B-tree indexes, so only the partial index below is needed.
CREATE INDEX idx_users_is_active ON users(is_active) WHERE is_active = TRUE;

The SQLAlchemy model:

from sqlalchemy import Column, String, Boolean, DateTime
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import declarative_base
import uuid
from datetime import datetime

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    email = Column(String(255), unique=True, nullable=False, index=True)
    username = Column(String(100), unique=True, nullable=False, index=True)
    hashed_password = Column(String(255), nullable=False)
    full_name = Column(String(255))
    is_active = Column(Boolean, default=True, index=True)
    is_superuser = Column(Boolean, default=False)
    
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    last_login_at = Column(DateTime)
    
    def __repr__(self):
        return f"<User(username='{self.username}', email='{self.email}')>"

2.2 Workflows Table

This is the system's core table: it stores the workflow definitions.

-- Workflows table: stores workflow definitions
CREATE TABLE workflows (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    description TEXT,
    
    -- Workflow status
    status VARCHAR(20) DEFAULT 'draft',  -- draft, active, paused, archived
    
    -- Trigger configuration (flexible config stored as JSONB)
    trigger_config JSONB NOT NULL DEFAULT '{}',
    -- Example:
    -- {
    --   "type": "webhook",
    --   "config": {
    --     "method": "POST",
    --     "path": "/webhook/abc123"
    --   }
    -- }
    
    -- Global workflow settings
    settings JSONB DEFAULT '{}',
    -- Example:
    -- {
    --   "timeout": 3600,
    --   "retry_policy": {
    --     "max_retries": 3,
    --     "retry_delay": 60
    --   },
    --   "error_handling": "continue"
    -- }
    
    -- Versioning
    version INTEGER DEFAULT 1,
    is_latest BOOLEAN DEFAULT TRUE,
    parent_workflow_id UUID REFERENCES workflows(id),
    
    -- Statistics (redundant fields that speed up reads)
    total_executions INTEGER DEFAULT 0,
    successful_executions INTEGER DEFAULT 0,
    failed_executions INTEGER DEFAULT 0,
    last_execution_at TIMESTAMP,
    
    -- Owner
    owner_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    
    -- Audit columns
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    created_by UUID REFERENCES users(id),
    updated_by UUID REFERENCES users(id)
);

-- Index optimization
CREATE INDEX idx_workflows_owner ON workflows(owner_id);
CREATE INDEX idx_workflows_status ON workflows(status);
CREATE INDEX idx_workflows_is_latest ON workflows(is_latest) WHERE is_latest = TRUE;
CREATE INDEX idx_workflows_created_at ON workflows(created_at DESC);

-- JSONB indexes: a B-tree expression index matches the ->> equality
-- queries used later; GIN serves containment queries on settings
CREATE INDEX idx_workflows_trigger_type ON workflows ((trigger_config->>'type'));
CREATE INDEX idx_workflows_settings ON workflows USING GIN (settings);

-- Full-text search index
CREATE INDEX idx_workflows_search ON workflows USING GIN (
    to_tsvector('english', coalesce(name, '') || ' ' || coalesce(description, ''))
);

The SQLAlchemy model:

from sqlalchemy import Column, String, Text, Integer, Boolean, DateTime, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import relationship

class Workflow(Base):
    __tablename__ = "workflows"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(255), nullable=False)
    description = Column(Text)
    status = Column(String(20), default='draft', index=True)
    
    # JSONB columns (default=dict rather than default={},
    # so rows never share one mutable dict object)
    trigger_config = Column(JSONB, nullable=False, default=dict)
    settings = Column(JSONB, default=dict)
    
    # Versioning
    version = Column(Integer, default=1)
    is_latest = Column(Boolean, default=True, index=True)
    parent_workflow_id = Column(UUID(as_uuid=True), ForeignKey('workflows.id'))
    
    # Statistics
    total_executions = Column(Integer, default=0)
    successful_executions = Column(Integer, default=0)
    failed_executions = Column(Integer, default=0)
    last_execution_at = Column(DateTime)
    
    # Relationships
    owner_id = Column(UUID(as_uuid=True), ForeignKey('users.id', ondelete='CASCADE'), nullable=False)
    owner = relationship("User", back_populates="workflows")
    
    nodes = relationship("Node", back_populates="workflow", cascade="all, delete-orphan")
    edges = relationship("Edge", back_populates="workflow", cascade="all, delete-orphan")
    executions = relationship("Execution", back_populates="workflow")
    
    # Audit columns
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    created_by = Column(UUID(as_uuid=True), ForeignKey('users.id'))
    updated_by = Column(UUID(as_uuid=True), ForeignKey('users.id'))
    
    def __repr__(self):
        return f"<Workflow(name='{self.name}', status='{self.status}')>"

2.3 Nodes Table

Nodes are the basic units of execution in a workflow.

-- Nodes table: the nodes within a workflow
CREATE TABLE nodes (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
    
    -- Basic node information
    name VARCHAR(255) NOT NULL,
    description TEXT,
    type VARCHAR(50) NOT NULL,  -- 'trigger', 'action', 'condition', 'transform'
    
    -- Node configuration (the key idea: flexible config in JSONB)
    config JSONB NOT NULL DEFAULT '{}',
    -- Example (HTTP request node):
    -- {
    --   "connector": "http",
    --   "method": "POST",
    --   "url": "https://api.example.com/users",
    --   "headers": {
    --     "Authorization": "Bearer {{secrets.api_token}}",
    --     "Content-Type": "application/json"
    --   },
    --   "body": {
    --     "name": "{{input.name}}",
    --     "email": "{{input.email}}"
    --   },
    --   "timeout": 30
    -- }
    
    -- Input/output mappings
    input_mapping JSONB DEFAULT '{}',
    output_mapping JSONB DEFAULT '{}',
    
    -- Canvas position (for the visual editor)
    position JSONB DEFAULT '{"x": 0, "y": 0}',
    
    -- Node state
    is_enabled BOOLEAN DEFAULT TRUE,
    
    -- Audit columns
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes
CREATE INDEX idx_nodes_workflow ON nodes(workflow_id);
CREATE INDEX idx_nodes_type ON nodes(type);
CREATE INDEX idx_nodes_is_enabled ON nodes(is_enabled) WHERE is_enabled = TRUE;

-- JSONB indexes: GIN for containment checks, a B-tree expression
-- index for equality lookups on the connector name
CREATE INDEX idx_nodes_config ON nodes USING GIN (config);
CREATE INDEX idx_nodes_connector ON nodes ((config->>'connector'));

The SQLAlchemy model:

class Node(Base):
    __tablename__ = "nodes"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    workflow_id = Column(UUID(as_uuid=True), ForeignKey('workflows.id', ondelete='CASCADE'), nullable=False)
    
    name = Column(String(255), nullable=False)
    description = Column(Text)
    type = Column(String(50), nullable=False, index=True)
    
    # JSONB configuration (callable defaults avoid shared mutable dicts)
    config = Column(JSONB, nullable=False, default=dict)
    input_mapping = Column(JSONB, default=dict)
    output_mapping = Column(JSONB, default=dict)
    position = Column(JSONB, default=lambda: {"x": 0, "y": 0})
    
    is_enabled = Column(Boolean, default=True, index=True)
    
    # Relationships
    workflow = relationship("Workflow", back_populates="nodes")
    outgoing_edges = relationship("Edge", foreign_keys="Edge.source_node_id", back_populates="source_node")
    incoming_edges = relationship("Edge", foreign_keys="Edge.target_node_id", back_populates="target_node")
    
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    def __repr__(self):
        return f"<Node(name='{self.name}', type='{self.type}')>"

2.4 Edges Table

Edges define the connections between nodes and together form the workflow's DAG (directed acyclic graph).

-- Edges table: connections between nodes
CREATE TABLE edges (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
    
    -- Source and target nodes
    source_node_id UUID NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
    target_node_id UUID NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
    
    -- Edge type
    type VARCHAR(20) DEFAULT 'default',  -- 'default', 'conditional', 'error'
    
    -- Condition configuration (for conditional branches)
    condition JSONB DEFAULT '{}',
    -- Example:
    -- {
    --   "expression": "{{output.status}} == 'success'",
    --   "operator": "equals",
    --   "value": "success"
    -- }
    
    -- Edge label (for the UI)
    label VARCHAR(100),
    
    -- Audit columns
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    
    -- Constraints: block direct self-loops and duplicate edges.
    -- Note: cycles spanning several nodes still pass these checks and
    -- must be rejected in application code (see the sketch below).
    CONSTRAINT no_self_loop CHECK (source_node_id != target_node_id),
    CONSTRAINT unique_edge UNIQUE (workflow_id, source_node_id, target_node_id, type)
);

-- Indexes
CREATE INDEX idx_edges_workflow ON edges(workflow_id);
CREATE INDEX idx_edges_source ON edges(source_node_id);
CREATE INDEX idx_edges_target ON edges(target_node_id);
CREATE INDEX idx_edges_type ON edges(type);

-- JSONB index
CREATE INDEX idx_edges_condition ON edges USING GIN (condition);

The SQLAlchemy model:

class Edge(Base):
    __tablename__ = "edges"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    workflow_id = Column(UUID(as_uuid=True), ForeignKey('workflows.id', ondelete='CASCADE'), nullable=False)
    
    source_node_id = Column(UUID(as_uuid=True), ForeignKey('nodes.id', ondelete='CASCADE'), nullable=False)
    target_node_id = Column(UUID(as_uuid=True), ForeignKey('nodes.id', ondelete='CASCADE'), nullable=False)
    
    type = Column(String(20), default='default', index=True)
    condition = Column(JSONB, default=dict)
    label = Column(String(100))
    
    # Relationships
    workflow = relationship("Workflow", back_populates="edges")
    source_node = relationship("Node", foreign_keys=[source_node_id], back_populates="outgoing_edges")
    target_node = relationship("Node", foreign_keys=[target_node_id], back_populates="incoming_edges")
    
    created_at = Column(DateTime, default=datetime.utcnow)
    
    def __repr__(self):
        return f"<Edge(source={self.source_node_id}, target={self.target_node_id})>"

2.5 Executions Table

Records every run of a workflow.

-- Executions table: workflow run history
CREATE TABLE executions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
    
    -- Denormalized fields (speed up list queries)
    workflow_name VARCHAR(255),
    workflow_version INTEGER,
    
    -- Execution status
    status VARCHAR(20) DEFAULT 'pending',  -- pending, running, success, failed, cancelled
    
    -- Trigger information
    trigger_type VARCHAR(50),  -- 'manual', 'webhook', 'schedule', 'api'
    trigger_data JSONB DEFAULT '{}',
    
    -- Execution context (a snapshot of the workflow, so history stays
    -- accurate even after the workflow is edited)
    workflow_snapshot JSONB NOT NULL,
    
    -- Input and output
    input_data JSONB DEFAULT '{}',
    output_data JSONB DEFAULT '{}',
    
    -- Error details
    error_message TEXT,
    error_stack TEXT,
    
    -- Timing
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    duration_ms INTEGER,  -- execution time in milliseconds
    
    -- Who triggered it
    triggered_by UUID REFERENCES users(id),
    
    -- Audit columns
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Index optimization
CREATE INDEX idx_executions_workflow ON executions(workflow_id);
CREATE INDEX idx_executions_status ON executions(status);
CREATE INDEX idx_executions_created_at ON executions(created_at DESC);
CREATE INDEX idx_executions_triggered_by ON executions(triggered_by);

-- Composite indexes (common query patterns)
CREATE INDEX idx_executions_workflow_status ON executions(workflow_id, status);
CREATE INDEX idx_executions_workflow_created ON executions(workflow_id, created_at DESC);

-- JSONB indexes
CREATE INDEX idx_executions_trigger_data ON executions USING GIN (trigger_data);
CREATE INDEX idx_executions_input_data ON executions USING GIN (input_data);

The SQLAlchemy model:

class Execution(Base):
    __tablename__ = "executions"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    workflow_id = Column(UUID(as_uuid=True), ForeignKey('workflows.id', ondelete='CASCADE'), nullable=False)
    
    # Denormalized fields
    workflow_name = Column(String(255))
    workflow_version = Column(Integer)
    
    status = Column(String(20), default='pending', index=True)
    
    trigger_type = Column(String(50))
    trigger_data = Column(JSONB, default=dict)
    
    # Snapshot and data
    workflow_snapshot = Column(JSONB, nullable=False)
    input_data = Column(JSONB, default=dict)
    output_data = Column(JSONB, default=dict)
    
    # Error details
    error_message = Column(Text)
    error_stack = Column(Text)
    
    # Timing
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    duration_ms = Column(Integer)
    
    triggered_by = Column(UUID(as_uuid=True), ForeignKey('users.id'))
    
    # Relationships
    workflow = relationship("Workflow", back_populates="executions")
    node_runs = relationship("NodeRun", back_populates="execution", cascade="all, delete-orphan")
    
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
    
    def __repr__(self):
        return f"<Execution(workflow='{self.workflow_name}', status='{self.status}')>"

2.6 Node Runs Table

Records the execution details of a single node.

-- Node runs table: per-node execution records
CREATE TABLE node_runs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    execution_id UUID NOT NULL REFERENCES executions(id) ON DELETE CASCADE,
    node_id UUID NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
    
    -- Snapshot of node metadata
    node_name VARCHAR(255),
    node_type VARCHAR(50),
    
    -- Execution status
    status VARCHAR(20) DEFAULT 'pending',  -- pending, running, success, failed, skipped
    
    -- Input and output
    input_data JSONB DEFAULT '{}',
    output_data JSONB DEFAULT '{}',
    
    -- Error details
    error_message TEXT,
    error_stack TEXT,
    
    -- Retry bookkeeping
    retry_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3,
    
    -- Timing
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    duration_ms INTEGER,
    
    -- Audit columns
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes
CREATE INDEX idx_node_runs_execution ON node_runs(execution_id);
CREATE INDEX idx_node_runs_node ON node_runs(node_id);
CREATE INDEX idx_node_runs_status ON node_runs(status);
CREATE INDEX idx_node_runs_created_at ON node_runs(created_at DESC);

-- Composite index
CREATE INDEX idx_node_runs_execution_status ON node_runs(execution_id, status);

The SQLAlchemy model:

class NodeRun(Base):
    __tablename__ = "node_runs"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    execution_id = Column(UUID(as_uuid=True), ForeignKey('executions.id', ondelete='CASCADE'), nullable=False)
    node_id = Column(UUID(as_uuid=True), ForeignKey('nodes.id', ondelete='CASCADE'), nullable=False)
    
    # Snapshot of node metadata
    node_name = Column(String(255))
    node_type = Column(String(50))
    
    status = Column(String(20), default='pending', index=True)
    
    input_data = Column(JSONB, default=dict)
    output_data = Column(JSONB, default=dict)
    
    error_message = Column(Text)
    error_stack = Column(Text)
    
    retry_count = Column(Integer, default=0)
    max_retries = Column(Integer, default=3)
    
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    duration_ms = Column(Integer)
    
    # Relationships
    execution = relationship("Execution", back_populates="node_runs")
    
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
    
    def __repr__(self):
        return f"<NodeRun(node='{self.node_name}', status='{self.status}')>"

2.7 Connectors Table

Manages connection configurations for third-party applications.

-- Connectors table: third-party application connections
CREATE TABLE connectors (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    
    -- Basic connector information
    name VARCHAR(255) NOT NULL,
    type VARCHAR(50) NOT NULL,  -- 'http', 'database', 'email', 'slack', etc.
    description TEXT,
    
    -- Connector configuration
    config JSONB NOT NULL DEFAULT '{}',
    -- Example (Slack connector):
    -- {
    --   "workspace": "my-workspace",
    --   "bot_token": "xoxb-xxx",
    --   "webhook_url": "https://hooks.slack.com/services/xxx"
    -- }
    
    -- Credentials (stored encrypted)
    credentials JSONB DEFAULT '{}',
    
    -- Connector state
    is_active BOOLEAN DEFAULT TRUE,
    last_tested_at TIMESTAMP,
    test_status VARCHAR(20),  -- 'success', 'failed', 'pending'
    
    -- Owner
    owner_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    
    -- Audit columns
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    created_by UUID REFERENCES users(id),
    updated_by UUID REFERENCES users(id)
);

-- Indexes
CREATE INDEX idx_connectors_owner ON connectors(owner_id);
CREATE INDEX idx_connectors_type ON connectors(type);
CREATE INDEX idx_connectors_is_active ON connectors(is_active) WHERE is_active = TRUE;

-- JSONB index
CREATE INDEX idx_connectors_config ON connectors USING GIN (config);

The SQLAlchemy model:

class Connector(Base):
    __tablename__ = "connectors"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    
    name = Column(String(255), nullable=False)
    type = Column(String(50), nullable=False, index=True)
    description = Column(Text)
    
    config = Column(JSONB, nullable=False, default=dict)
    credentials = Column(JSONB, default=dict)  # encrypted at rest (see the sketch below)
    
    is_active = Column(Boolean, default=True, index=True)
    last_tested_at = Column(DateTime)
    test_status = Column(String(20))
    
    owner_id = Column(UUID(as_uuid=True), ForeignKey('users.id', ondelete='CASCADE'), nullable=False)
    
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    created_by = Column(UUID(as_uuid=True), ForeignKey('users.id'))
    updated_by = Column(UUID(as_uuid=True), ForeignKey('users.id'))
    
    def __repr__(self):
        return f"<Connector(name='{self.name}', type='{self.type}')>"

🚀 3. Advanced PostgreSQL JSONB Usage

3.1 Why JSONB?

Advantages of JSONB:

  1. Flexibility: no fixed schema required, well suited to dynamic configuration
  2. Performance: stored in a binary format, so queries are fast
  3. Index support: GIN indexes enable efficient lookups
  4. Rich operators: JSON path queries, containment checks, and more

3.2 JSONB Query Examples

-- 1. Basic query: find workflows with a given trigger type
SELECT * FROM workflows 
WHERE trigger_config->>'type' = 'webhook';

-- 2. Nested lookup: find HTTP nodes using POST
--    (-> returns jsonb, hence the quoted '"POST"'; ->> would return text)
SELECT * FROM nodes 
WHERE config->'method' = '"POST"';

-- 3. Containment check: find workflows containing a given fragment
SELECT * FROM workflows 
WHERE trigger_config @> '{"type": "webhook"}';

-- 4. Key existence: find nodes whose headers include Authorization
SELECT * FROM nodes 
WHERE config->'headers' ? 'Authorization';

-- 5. JSON path query (PostgreSQL 12+)
SELECT * FROM nodes 
WHERE config @@ '$.headers.Authorization like_regex "Bearer.*"';

-- 6. Update a JSONB field
UPDATE workflows 
SET settings = jsonb_set(
    settings, 
    '{timeout}', 
    '7200'
) 
WHERE id = 'xxx';

-- 7. Delete a JSONB key
UPDATE nodes 
SET config = config - 'deprecated_field' 
WHERE type = 'http_request';

-- 8. Merge JSONB objects
UPDATE workflows 
SET settings = settings || '{"new_option": true}'::jsonb 
WHERE id = 'xxx';
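
For reference, the same lookups from the ORM side, using the JSONB operators SQLAlchemy exposes (a sketch against the models from section 2):

# trigger_config->>'type' = 'webhook'
session.query(Workflow).filter(
    Workflow.trigger_config['type'].astext == 'webhook'
)

# trigger_config @> '{"type": "webhook"}'  (can use the GIN index)
session.query(Workflow).filter(
    Workflow.trigger_config.contains({"type": "webhook"})
)

# config->'headers' ? 'Authorization'
session.query(Node).filter(
    Node.config['headers'].has_key('Authorization')
)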

3.3 JSONB Indexing Strategies

-- 1. GIN index (general-purpose; supports the jsonb operators @>, ?, etc.)
CREATE INDEX idx_workflows_trigger_config ON workflows USING GIN (trigger_config);

-- 2. Expression index (B-tree on a single extracted field)
CREATE INDEX idx_workflows_trigger_type ON workflows ((trigger_config->>'type'));

-- 3. Partial index (only rows matching a predicate)
CREATE INDEX idx_active_webhooks ON workflows USING GIN (trigger_config) 
WHERE trigger_config->>'type' = 'webhook' AND status = 'active';

-- 4. Composite index (a plain column plus a JSONB expression)
CREATE INDEX idx_workflow_status_trigger ON workflows(status, (trigger_config->>'type'));

Performance comparison:

import time
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://...")

# Test 1: query on trigger_config->>'type' (no matching index)
start = time.time()
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT * FROM workflows 
        WHERE trigger_config->>'type' = 'webhook'
    """))
    rows = result.fetchall()
print(f"No index: {time.time() - start:.3f}s, {len(rows)} rows")
# Example output: No index: 2.456s, 1000 rows

# Test 2: containment query that the GIN index can serve
start = time.time()
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT * FROM workflows 
        WHERE trigger_config @> '{"type": "webhook"}'
    """))
    rows = result.fetchall()
print(f"GIN index: {time.time() - start:.3f}s, {len(rows)} rows")
# Example output: GIN index: 0.023s, 1000 rows (roughly 100x faster here;
# actual numbers depend on table size and cache state)

📊 4. Database Index Optimization

4.1 Index Design Principles

1. Index the fields your hot queries filter on

-- Analyze the query log to find hot queries
-- Example: fetch the last 7 days of executions
EXPLAIN ANALYZE
SELECT * FROM executions 
WHERE workflow_id = 'xxx' 
  AND created_at > NOW() - INTERVAL '7 days'
ORDER BY created_at DESC 
LIMIT 20;

-- Optimization: create a composite index
CREATE INDEX idx_executions_workflow_created 
ON executions(workflow_id, created_at DESC);

2. Avoid over-indexing

-- ❌ Wrong: an index on every column
CREATE INDEX idx_workflows_name ON workflows(name);
CREATE INDEX idx_workflows_description ON workflows(description);
CREATE INDEX idx_workflows_status ON workflows(status);
CREATE INDEX idx_workflows_version ON workflows(version);
-- too many indexes slow down writes

-- ✅ Right: index only the fields hot queries actually filter on
CREATE INDEX idx_workflows_status ON workflows(status);
CREATE INDEX idx_workflows_owner_status ON workflows(owner_id, status);

3. Use partial indexes to shrink index size

-- Index only active workflows (can shrink the index substantially,
-- e.g. around 70% when most workflows are inactive)
CREATE INDEX idx_active_workflows 
ON workflows(owner_id, created_at DESC) 
WHERE status = 'active';

-- Index only failed executions
CREATE INDEX idx_failed_executions 
ON executions(workflow_id, created_at DESC) 
WHERE status = 'failed';

4.2 Index Monitoring and Maintenance

-- 1. Index usage per table
SELECT 
    schemaname,
    tablename,
    indexname,
    idx_scan,  -- number of index scans
    idx_tup_read,  -- index entries read
    idx_tup_fetch  -- table rows fetched via the index
FROM pg_stat_user_indexes 
WHERE schemaname = 'public'
ORDER BY idx_scan DESC;

-- 2. Find unused indexes
SELECT 
    schemaname,
    tablename,
    indexname
FROM pg_stat_user_indexes 
WHERE idx_scan = 0 
  AND indexname NOT LIKE '%_pkey';

-- 3. Index sizes
SELECT 
    tablename,
    indexname,
    pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes 
WHERE schemaname = 'public'
ORDER BY pg_relation_size(indexrelid) DESC;

-- 4. Rebuild an index (periodic maintenance; PostgreSQL 12+)
REINDEX INDEX CONCURRENTLY idx_workflows_owner;

🔧 5. Database Migrations with Alembic

5.1 Initializing Alembic

cd backend

# Initialize Alembic
alembic init alembic

# Resulting directory structure
# alembic/
# ├── versions/
# ├── env.py
# ├── script.py.mako
# └── README

5.2 Configuring Alembic

Edit alembic/env.py:

from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import os
import sys

# Add the project root to the Python path
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))

from app.core.config import settings
from app.models.base import Base  # imports all models

# Alembic configuration
config = context.config

# Read the database URL from application settings
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)

# Point target_metadata at our models
target_metadata = Base.metadata

def run_migrations_offline():
    """Run migrations in offline mode."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    """Run migrations in online mode."""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, 
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

5.3 Creating the Initial Migration

# Generate the migration script
alembic revision --autogenerate -m "Initial migration: create core tables"

# Inspect the generated file
# alembic/versions/xxxx_initial_migration.py

An example of the generated migration script:

"""Initial migration: create core tables

Revision ID: 001_initial
Revises: 
Create Date: 2025-12-06 10:00:00.000000

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers
revision = '001_initial'
down_revision = None
branch_labels = None
depends_on = None

def upgrade():
    # Create the users table
    op.create_table(
        'users',
        sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('email', sa.String(length=255), nullable=False),
        sa.Column('username', sa.String(length=100), nullable=False),
        sa.Column('hashed_password', sa.String(length=255), nullable=False),
        sa.Column('full_name', sa.String(length=255)),
        sa.Column('is_active', sa.Boolean(), default=True),
        sa.Column('is_superuser', sa.Boolean(), default=False),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.Column('updated_at', sa.DateTime()),
        sa.Column('last_login_at', sa.DateTime()),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('email'),
        sa.UniqueConstraint('username')
    )
    op.create_index('idx_users_email', 'users', ['email'])
    op.create_index('idx_users_username', 'users', ['username'])
    
    # Create the workflows table
    op.create_table(
        'workflows',
        sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('name', sa.String(length=255), nullable=False),
        sa.Column('description', sa.Text()),
        sa.Column('status', sa.String(length=20), default='draft'),
        sa.Column('trigger_config', postgresql.JSONB(), nullable=False),
        sa.Column('settings', postgresql.JSONB()),
        sa.Column('version', sa.Integer(), default=1),
        sa.Column('is_latest', sa.Boolean(), default=True),
        sa.Column('parent_workflow_id', postgresql.UUID(as_uuid=True)),
        sa.Column('total_executions', sa.Integer(), default=0),
        sa.Column('successful_executions', sa.Integer(), default=0),
        sa.Column('failed_executions', sa.Integer(), default=0),
        sa.Column('last_execution_at', sa.DateTime()),
        sa.Column('owner_id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.Column('updated_at', sa.DateTime()),
        sa.Column('created_by', postgresql.UUID(as_uuid=True)),
        sa.Column('updated_by', postgresql.UUID(as_uuid=True)),
        sa.ForeignKeyConstraint(['owner_id'], ['users.id'], ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['parent_workflow_id'], ['workflows.id']),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index('idx_workflows_owner', 'workflows', ['owner_id'])
    op.create_index('idx_workflows_status', 'workflows', ['status'])
    
    # Create the expression index for the trigger type (matches the
    # ->> equality queries used elsewhere)
    op.execute("""
        CREATE INDEX idx_workflows_trigger_type 
        ON workflows ((trigger_config->>'type'))
    """)
    
    # ... create the remaining tables (nodes, edges, executions, etc.)

def downgrade():
    # Drop tables in reverse dependency order
    op.drop_table('node_runs')
    op.drop_table('executions')
    op.drop_table('edges')
    op.drop_table('nodes')
    op.drop_table('connectors')
    op.drop_table('workflows')
    op.drop_table('users')

5.4 Running Migrations

# Show the current version
alembic current

# Show migration history
alembic history

# Upgrade to the latest version
alembic upgrade head

# Downgrade one revision
alembic downgrade -1

# Upgrade to a specific revision
alembic upgrade 001_initial

# Emit the SQL without executing it
alembic upgrade head --sql
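
Later schema changes follow the same loop: edit the models, autogenerate, review, upgrade. For example, adding a hypothetical tags column to workflows might produce a migration like this (purely an illustration, not part of this project's actual history):

"""Add tags to workflows"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

revision = '002_add_tags'
down_revision = '001_initial'

def upgrade():
    # server_default keeps existing rows valid without a separate backfill
    op.add_column('workflows', sa.Column('tags', postgresql.JSONB(), server_default=sa.text("'[]'::jsonb")))

def downgrade():
    op.drop_column('workflows', 'tags')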

🧪 6. Generating Test Data

Create backend/scripts/seed_data.py:

"""
测试数据生成脚本
"""
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from app.models import User, Workflow, Node, Edge, Connector
from app.core.security import get_password_hash
import uuid
from datetime import datetime

# 数据库连接
DATABASE_URL = "postgresql+asyncpg://quantumflow:quantumflow_dev_password@localhost:5432/quantumflow"

async def seed_users(session: AsyncSession):
    """Create test users"""
    users = [
        User(
            email="admin@quantumflow.com",
            username="admin",
            hashed_password=get_password_hash("admin123"),
            full_name="Admin User",
            is_superuser=True
        ),
        User(
            email="demo@quantumflow.com",
            username="demo",
            hashed_password=get_password_hash("demo123"),
            full_name="Demo User"
        )
    ]
    
    for user in users:
        session.add(user)
    
    await session.commit()
    print(f"✅ Created {len(users)} users")
    return users

async def seed_connectors(session: AsyncSession, owner: User):
    """Create test connectors"""
    connectors = [
        Connector(
            name="Slack Webhook",
            type="slack",
            description="Send messages to a Slack channel",
            config={
                "webhook_url": "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXX"
            },
            owner_id=owner.id
        ),
        Connector(
            name="SendGrid Email",
            type="email",
            description="Send email via SendGrid",
            config={
                "api_key": "SG.xxxxxxxxxxxx",
                "from_email": "noreply@quantumflow.com"
            },
            owner_id=owner.id
        )
    ]
    
    for connector in connectors:
        session.add(connector)
    
    await session.commit()
    print(f"✅ Created {len(connectors)} connectors")
    return connectors

async def seed_workflows(session: AsyncSession, owner: User):
    """Create test workflows"""
    # Workflow 1: webhook-triggered Slack notification
    workflow1 = Workflow(
        name="New User Signup Notification",
        description="Send a Slack notification when a new user signs up",
        status="active",
        trigger_config={
            "type": "webhook",
            "config": {
                "method": "POST",
                "path": "/webhook/new-user"
            }
        },
        settings={
            "timeout": 60,
            "retry_policy": {
                "max_retries": 3,
                "retry_delay": 10
            }
        },
        owner_id=owner.id
    )
    session.add(workflow1)
    await session.flush()  # flush to obtain workflow1.id
    
    # Create nodes
    trigger_node = Node(
        workflow_id=workflow1.id,
        name="Webhook Trigger",
        type="trigger",
        config={
            "connector": "webhook",
            "method": "POST"
        },
        position={"x": 100, "y": 100}
    )
    
    transform_node = Node(
        workflow_id=workflow1.id,
        name="Transform Data",
        type="transform",
        config={
            "script": """
                output = {
                    "message": f"New user signed up: {input['username']} ({input['email']})",
                    "timestamp": datetime.now().isoformat()
                }
            """
        },
        position={"x": 300, "y": 100}
    )
    
    slack_node = Node(
        workflow_id=workflow1.id,
        name="Send Slack Message",
        type="action",
        config={
            "connector": "slack",
            "channel": "#notifications",
            "message": "{{input.message}}"
        },
        position={"x": 500, "y": 100}
    )
    
    session.add_all([trigger_node, transform_node, slack_node])
    await session.flush()
    
    # Create edges
    edge1 = Edge(
        workflow_id=workflow1.id,
        source_node_id=trigger_node.id,
        target_node_id=transform_node.id,
        type="default"
    )
    
    edge2 = Edge(
        workflow_id=workflow1.id,
        source_node_id=transform_node.id,
        target_node_id=slack_node.id,
        type="default"
    )
    
    session.add_all([edge1, edge2])
    
    await session.commit()
    print(f"✅ Created workflow: {workflow1.name}")
    return [workflow1]

async def main():
    """Entry point"""
    engine = create_async_engine(DATABASE_URL, echo=True)
    async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
    
    async with async_session() as session:
        print("🌱 Seeding test data...")
        
        # Create users
        users = await seed_users(session)
        admin_user = users[0]
        
        # Create connectors
        await seed_connectors(session, admin_user)
        
        # Create workflows
        await seed_workflows(session, admin_user)
        
        print("✅ Test data seeding complete!")

if __name__ == "__main__":
    asyncio.run(main())

Run the script:

cd backend
python scripts/seed_data.py

📈 7. Performance Tuning Tips

7.1 Query Optimization

# ❌ The N+1 query problem
workflows = session.query(Workflow).all()
for workflow in workflows:
    print(workflow.owner.username)  # one extra query per iteration

# ✅ Eager-load the relationship with joinedload
from sqlalchemy.orm import joinedload

workflows = session.query(Workflow)\
    .options(joinedload(Workflow.owner))\
    .all()
for workflow in workflows:
    print(workflow.owner.username)  # no additional queries

7.2 Bulk Operations

# ❌ Row-by-row inserts
for i in range(1000):
    node = Node(name=f"Node {i}", ...)
    session.add(node)
    session.commit()  # 1000 database round trips

# ✅ Bulk insert
nodes = [Node(name=f"Node {i}", ...) for i in range(1000)]
session.bulk_save_objects(nodes)
session.commit()  # a single round trip

7.3 Connection Pool Configuration

from sqlalchemy import create_engine

engine = create_engine(
    DATABASE_URL,
    pool_size=10,  # connection pool size
    max_overflow=20,  # extra connections allowed beyond the pool
    pool_pre_ping=True,  # test each connection before handing it out
    pool_recycle=3600  # recycle connections after one hour
)
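
Since the seed script connects through the asyncpg driver, note that the same pool settings pass straight through to the async engine as well:

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    pool_recycle=3600
)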

💡 Summary

Key takeaways:

  1. Data model design

    • 7 core tables: Users, Workflows, Nodes, Edges, Executions, NodeRuns, Connectors
    • JSONB stores flexible configuration, balancing flexibility with performance
    • Selective denormalization speeds up common reads
  2. Putting JSONB to work

    • Stores dynamic configuration (node config, workflow settings)
    • GIN indexes enable efficient queries
    • A rich operator set (@>, ?, ->, and more)
  3. Index optimization strategy

    • Index the fields that hot queries filter on
    • Use composite indexes for multi-condition queries
    • Use partial indexes to shrink index size
    • Monitor and maintain indexes regularly
  4. Alembic migrations

    • Versioned management of the database schema
    • Supports both upgrades and rollbacks
    • Friendly to team collaboration

Coming up next:

In the next article, "Building the Backend API: FastAPI + SQLAlchemy Best Practices", we will:

  • Design a complete RESTful API
  • Implement CRUD operations for workflows
  • Integrate JWT authentication
  • Write the API documentation

📦 Resources

Code files:

  • models/ - the complete SQLAlchemy models
  • alembic/versions/001_initial.py - the initial migration script
  • scripts/seed_data.py - the test data seeding script
  • ER图.pdf - the entity-relationship diagram

How to get them:
All of the code is bundled in the article attachments, ready to download and use.


🤔 Questions to Ponder

  1. JSONB vs. relational tables: when should configuration live in JSONB, and when should it be split out into proper relational tables?

  2. Index trade-offs: why not create an index on every column? What do indexes cost?

  3. Data snapshots: why store workflow_snapshot on the executions table? What alternatives are there?

Share your thoughts in the comments!


Author: DREAMVFIA
Focused on enterprise application architecture and workflow automation.

Copyright notice:
This article is original content from the QuantumFlow column, released under the MIT license. Reposting is welcome; please credit the source.


Last updated: 2025-12-10
