📋 执行摘要

🎯 核心改进

本方案基于用户提出的2-Agent架构进行深度优化,提出 双系统协作架构

  • 🔵 分析系统: Supervisor + 3专业Agent + Memory Manager (4+1架构)
  • 🟡 配置系统: Config Assistant Agent + 4个AI工具

全面解决稳定性、性能、Token管理和配置优化四大核心问题。

🏗️ 架构对比一览

维度 用户方案(2-Agent) 优化方案(5 Multi-Agent) 提升幅度
稳定性 ⚠️ Agent1职责过重(5步骤) ✅ 单一职责+故障隔离 ⭐⭐⭐⭐⭐
性能 ⚠️ 部分并发 ✅ 高度并发+分层LLM ⭐⭐⭐⭐
Token管理 ⚠️ 粗粒度管理 ✅ 分层预算+智能压缩 ⭐⭐⭐⭐⭐
配置管理 ❌ 纯人工 ✅ AI辅助优化 ⭐⭐⭐⭐⭐
断点恢复 ❌ 不支持 ✅ Checkpointing机制 ⭐⭐⭐⭐⭐
可扩展性 ⚠️ 一般 ✅ 模块化+易扩展 ⭐⭐⭐⭐

🤖 完整多Agent架构设计

在这里插入图片描述

💎 关键技术特性

特性 说明 应用场景
🎯 Supervisor Pattern 监督者模式智能调度 分析系统
🤖 Config Assistant AI辅助配置优化 配置系统
💾 Checkpointing 自动断点+故障恢复 分析系统
🧠 Memory Management 三级缓存 两个系统
📊 Token Budget 分层预算(16K) 分析系统
Smart Compression 智能压缩 两个系统
🔄 SubGraph Agent内部子工作流 两个系统

📈 预期收益

  • 💰 成本优化: Token智能压缩 + 配置优化 → 降低40-50%
  • 响应加速: Checkpoint恢复 + 并发执行 → 提升60%
  • 🛡️ 稳定增强: 故障隔离 + 自动重试 → 可用性提升80%
  • 🎯 配置质量: AI辅助优化 + 冲突检测 → 错误率降低90%
  • 📈 灵活扩展: 模块化设计 → 新Agent 10分钟接入

🗓️ 实施计划 (8-9周)

Phase 1 (2周)    → 多Agent框架 + Supervisor
Phase 2 (1.5周)  → Token管理器 + 智能压缩
Phase 3 (1.5周)  → Checkpointing + Memory
Phase 4 (2周)    → Config Assistant Agent
Phase 5 (2周)    → 集成测试 + 性能调优 + 灰度上线

📋 目录


1. 架构分析与改进建议

1.1 原架构思路分析

✅ 优点
  1. 高度模块化: 场景分类、数据源、工作流分离清晰
  2. 配置驱动: 通过数据库配置实现灵活调整
  3. 两阶段查询: 数据源提示词的两阶段查询设计巧妙,减少token消耗
  4. 结果追踪: 完整的结果记录机制
⚠️ 需要完善的点
  1. 缓存策略: 配置预加载后需要缓存失效机制
  2. 并发控制: 多场景并发分析时的资源管理
  3. 容错机制: LLM调用失败、数据源超时的降级策略
  4. 监控指标: 性能监控、成本追踪、质量评估
  5. 版本管理: 提示词版本控制和A/B测试能力
  6. 配置管理: 缺少智能配置优化和验证机制

1.2 架构改进建议

🎯 核心改进
  1. 双系统架构 - 分析系统 + 配置系统独立运行
  2. 多Agent协作 - 5个专业化Agent分工明确
  3. 上下文管理机制 - 使用LangGraph Memory避免token溢出
  4. 智能Token预算 - 动态调整提示词大小,防止信息丢失
  5. AI辅助配置 - Config Assistant Agent智能优化配置
  6. 提示词版本管理 - 支持多版本提示词并存
  7. 智能缓存策略 - 多级缓存(内存+Redis)
  8. 实时监控系统 - 全链路追踪与质量监控
🤖 为什么需要Config Assistant Agent?

问题场景:

管理员修改场景配置:
  thinking_steps: 3 → 5
  max_queries_per_step: 5 → 10
  max_tokens: 4000 (未改)

❌ 传统方式: 直接保存 → 运行时Token不足崩溃

✅ AI检测: "Token预算不足!5步×10查询需要8000 tokens
           建议: 增加到8000或启用智能压缩"

核心价值:

  • ✅ 提示词智能优化 (Token压缩45%)
  • ✅ 配置冲突自动检测 (避免90%错误)
  • ✅ 参数智能推荐 (基于历史数据)
  • ✅ 性能分析和优化建议

2. 完整系统架构设计

2.1 双系统全景架构图

在这里插入图片描述

2.2 Agent协作流程图

分析请求
配置请求
Step 1
超出
通过
Step 2
Step 3
有建议/冲突
无问题
用户请求
请求类型
Supervisor Agent
任务分解
Config Assistant Agent
Planning Agent
规划任务
场景识别
加载场景配置
提示词选择
二级查询
查询规划
Token预算检查
提示词压缩
规划完成
保存到Memory
Execution Agent
执行任务
从Memory加载计划
并发查询执行
API查询
MCP查询
数据库查询
收集结果
保存到Memory
Analysis Agent
分析任务
从Memory加载结果
综合分析
LLM研判
格式化输出
持久化存储
返回分析结果
配置分析
智能优化
AI工具集
验证结果
生成建议报告
等待人工确认
应用配置
失效缓存
通知分析系统
返回配置结果

2.3 双系统协作模式对比

维度 分析系统 配置系统
用户角色 安全分析师、运维人员 AI工程师、系统管理员
使用频率 高频(每分钟多次) 低频(每天几次)
实时性要求 实时(秒级响应) 异步(分钟级处理)
LLM模型 分层(gpt-3.5/4o-mini/4) 主要用gpt-4
资源需求 CPU密集+LLM密集 LLM密集
部署策略 多副本+负载均衡 单副本即可
故障影响 直接影响业务 不影响分析流程
交互方式 同步API调用 异步任务+通知

3. 分析系统核心模块设计

3.1 Supervisor Agent (协调者Agent)

职责
  • 任务分解与调度
  • Agent生命周期管理
  • 流程控制与异常处理
  • 全局状态协调
核心实现
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver

class SupervisorAgent:
    """
    Supervisor Agent - 协调多个专业化Agent的执行
    
    使用LangGraph的Supervisor Pattern实现
    """
    
    def __init__(self):
        self.llm = self._create_llm()
        self.checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
        self.agents = {
            "planning": PlanningAgent(),
            "execution": ExecutionAgent(),
            "analysis": AnalysisAgent()
        }
    
    def build_graph(self) -> StateGraph:
        """构建Supervisor工作流图"""
        workflow = StateGraph(MultiAgentState)
        
        # 添加Agent节点
        workflow.add_node("supervisor", self.supervisor_node)
        workflow.add_node("planning_agent", self.agents["planning"].run)
        workflow.add_node("execution_agent", self.agents["execution"].run)
        workflow.add_node("analysis_agent", self.agents["analysis"].run)
        
        # 设置路由逻辑
        workflow.add_conditional_edges(
            "supervisor",
            self.route_decision,
            {
                "planning": "planning_agent",
                "execution": "execution_agent",
                "analysis": "analysis_agent",
                "FINISH": END
            }
        )
        
        # 所有Agent完成后返回Supervisor
        workflow.add_edge("planning_agent", "supervisor")
        workflow.add_edge("execution_agent", "supervisor")
        workflow.add_edge("analysis_agent", "supervisor")
        
        workflow.set_entry_point("supervisor")
        
        return workflow.compile(checkpointer=self.checkpointer)
    
    async def supervisor_node(self, state: MultiAgentState):
        """
        Supervisor节点 - 决定下一步调用哪个Agent
        """
        # 使用LLM分析当前状态,决定下一步
        messages = [
            SystemMessage(content=self._get_supervisor_prompt()),
            HumanMessage(content=f"当前状态: {state}")
        ]
        
        response = await self.llm.ainvoke(messages)
        next_agent = self._parse_next_agent(response)
        
        return {"next_agent": next_agent}
    
    def route_decision(self, state: MultiAgentState) -> str:
        """路由决策"""
        return state.get("next_agent", "FINISH")

3.2 Planning Agent (规划者Agent)

职责
  • 场景识别与分类
  • 查询计划生成
  • 提示词智能选择(二级查询)
  • Token预算管理
核心实现
class PlanningAgent:
    """
    Planning Agent - 负责分析和规划
    
    使用LangGraph的SubGraph功能实现内部工作流
    """
    
    def __init__(self):
        self.llm = self._create_llm()
        self.config_manager = ConfigManager()
        self.prompt_manager = PromptManager()
        self.token_budget = TokenBudgetManager()
    
    async def run(self, state: MultiAgentState) -> MultiAgentState:
        """执行规划任务"""
        
        # Step 1: 场景识别
        scenario = await self.identify_scenario(state["request"])
        
        # Step 2: 加载场景配置
        config = await self.config_manager.get_scenario_config(scenario)
        
        # Step 3: 提示词二级查询
        prompt_templates = await self.prompt_manager.two_stage_query(
            scenario, state
        )
        
        # Step 4: Token预算检查与优化
        optimized_prompts = await self.token_budget.optimize(
            prompt_templates,
            max_tokens=config.max_tokens
        )
        
        # Step 5: 查询计划生成
        query_plan = await self.generate_query_plan(
            scenario, config, optimized_prompts, state
        )
        
        # 更新状态
        state.update({
            "scenario": scenario,
            "config": config,
            "prompts": optimized_prompts,
            "query_plan": query_plan,
            "planning_complete": True
        })
        
        return state

3.3 Execution Agent (执行者Agent)

职责
  • 并发查询执行
  • 结果收集与验证
  • 重试与降级处理
  • 查询性能监控
核心实现
class ExecutionAgent:
    """
    Execution Agent - 负责执行查询
    
    专注于查询执行,不涉及LLM调用,提升性能
    """
    
    def __init__(self):
        self.query_executor = QueryExecutor()
        self.semaphore = asyncio.Semaphore(10)  # 并发控制
    
    async def run(self, state: MultiAgentState) -> MultiAgentState:
        """执行查询任务"""
        
        # 从Memory加载查询计划
        query_plan = state["query_plan"]
        
        # 并发执行查询
        results = await self.execute_queries_parallel(query_plan)
        
        # 结果验证
        validated_results = self.validate_results(results)
        
        # 更新状态
        state.update({
            "query_results": validated_results,
            "execution_complete": True,
            "total_queries": len(results),
            "successful_queries": len(validated_results)
        })
        
        return state
    
    async def execute_queries_parallel(
        self, 
        query_plan: List[QueryPlan]
    ) -> List[QueryResult]:
        """并发执行查询(带限流)"""
        tasks = []
        for plan in query_plan:
            task = self.execute_with_semaphore(plan)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return self.handle_results(results)
    
    async def execute_with_semaphore(self, plan: QueryPlan):
        """带并发控制的查询执行"""
        async with self.semaphore:
            return await self.query_executor.execute(plan)

3.4 Analysis Agent (分析者Agent)

职责
  • 结果综合分析
  • LLM研判生成
  • 报告格式化
  • 结果持久化
核心实现
class AnalysisAgent:
    """
    Analysis Agent - 负责结果分析和研判
    
    使用LLM进行深度分析,生成最终报告
    """
    
    def __init__(self):
        self.llm = self._create_llm()
        self.token_budget = TokenBudgetManager()
        self.result_storage = ResultStorage()
    
    async def run(self, state: MultiAgentState) -> MultiAgentState:
        """执行分析任务"""
        
        # 从Memory加载查询结果
        query_results = state["query_results"]
        scenario_config = state["config"]
        
        # Token预算管理 - 压缩查询结果
        compressed_results = await self.token_budget.compress_results(
            query_results,
            max_tokens=scenario_config.max_result_tokens
        )
        
        # LLM综合分析
        analysis = await self.synthesize_analysis(
            compressed_results,
            scenario_config
        )
        
        # 格式化输出
        formatted_result = self.format_output(
            analysis,
            scenario_config.output_format
        )
        
        # 持久化存储
        result_id = await self.result_storage.save(
            scenario=state["scenario"],
            request=state["request"],
            analysis=analysis,
            result=formatted_result,
            metadata=state["metadata"]
        )
        
        # 更新状态
        state.update({
            "analysis": analysis,
            "result": formatted_result,
            "result_id": result_id,
            "analysis_complete": True
        })
        
        return state

3.5 Memory Manager (上下文管理器)

职责
  • Agent间状态共享
  • 上下文持久化(Checkpointing)
  • Token预算管理
  • 历史记录管理
核心实现
from langgraph.checkpoint import MemorySaver, SqliteSaver

class MemoryManager:
    """
    Memory Manager - 管理Agent间的共享状态
    
    使用LangGraph的Checkpointing机制
    """
    
    def __init__(self):
        # 内存缓存(快速访问)
        self.memory_saver = MemorySaver()
        
        # 持久化存储(可恢复)
        self.sqlite_saver = SqliteSaver.from_conn_string(
            "checkpoints.db"
        )
        
        self.token_budget = TokenBudgetManager()
    
    async def save_state(
        self, 
        thread_id: str, 
        state: MultiAgentState
    ):
        """保存状态到内存和持久化存储"""
        # 内存缓存
        await self.memory_saver.aput(thread_id, state)
        
        # 持久化
        await self.sqlite_saver.aput(thread_id, state)
    
    async def load_state(
        self, 
        thread_id: str
    ) -> MultiAgentState:
        """加载状态(优先从内存)"""
        # 先从内存加载
        state = await self.memory_saver.aget(thread_id)
        
        if state is None:
            # 从持久化存储恢复
            state = await self.sqlite_saver.aget(thread_id)
        
        return state

3.6 Token预算管理器

核心实现
import tiktoken

class TokenBudgetManager:
    """
    Token预算管理器 - 防止Token溢出
    
    关键特性:
    1. 动态提示词压缩
    2. 结果智能摘要
    3. 分层Token预算
    """
    
    def __init__(self):
        self.encoder = tiktoken.encoding_for_model("gpt-4")
        self.llm = self._create_summarizer()  # 使用快速模型做摘要
    
    def estimate_tokens(self, text: str) -> int:
        """估算文本的Token数量"""
        return len(self.encoder.encode(text))
    
    async def optimize(
        self, 
        prompt_templates: List[str],
        max_tokens: int = 8000
    ) -> List[str]:
        """
        优化提示词以适应Token预算
        
        策略:
        1. 计算总Token数
        2. 如果超出预算,进行压缩
        3. 优先保留核心提示词,压缩示例
        """
        total_tokens = sum(
            self.estimate_tokens(p) for p in prompt_templates
        )
        
        if total_tokens <= max_tokens:
            return prompt_templates
        
        # 需要压缩
        return await self._compress_prompts(
            prompt_templates,
            max_tokens
        )
    
    async def compress_results(
        self,
        results: List[QueryResult],
        max_tokens: int = 4000
    ) -> str:
        """
        压缩查询结果以适应Token预算
        
        策略:
        1. 提取关键信息
        2. 使用LLM生成摘要
        3. 保留原始数据引用
        """
        # 计算结果总Token数
        results_text = self._format_results(results)
        total_tokens = self.estimate_tokens(results_text)
        
        if total_tokens <= max_tokens:
            return results_text
        
        # 需要摘要
        summary = await self._llm_summarize(
            results_text,
            target_tokens=max_tokens
        )
        
        return summary

4. 配置系统核心模块设计

4.1 Config Assistant Agent (配置助手Agent)

职责
  • 配置变更分析
  • 智能优化建议
  • 冲突自动检测
  • 人工确认管理
架构图

在这里插入图片描述

核心实现
from langgraph.graph import StateGraph

class ConfigAssistantAgent:
    """
    Config Assistant Agent - 智能配置助手
    
    提供AI辅助的配置优化和验证
    """
    
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)
        self.tools = {
            "prompt_optimizer": PromptOptimizerTool(),
            "conflict_detector": ConflictDetectorTool(),
            "param_recommender": ParamRecommenderTool(),
            "performance_analyzer": PerformanceAnalyzerTool()
        }
    
    def build_graph(self):
        """构建配置助手工作流"""
        workflow = StateGraph(ConfigState)
        
        # 添加节点
        workflow.add_node("analyze", self.analyze_config)
        workflow.add_node("optimize", self.optimize_config)
        workflow.add_node("validate", self.validate_config)
        workflow.add_node("recommend", self.recommend_changes)
        workflow.add_node("wait_approval", self.wait_for_approval)
        workflow.add_node("apply", self.apply_config)
        
        # 设置流程
        workflow.add_edge("analyze", "optimize")
        workflow.add_edge("optimize", "validate")
        workflow.add_conditional_edges(
            "validate",
            self.check_validation,
            {
                "conflicts": "recommend",
                "suggestions": "recommend",
                "ok": "apply"
            }
        )
        workflow.add_edge("recommend", "wait_approval")
        workflow.add_conditional_edges(
            "wait_approval",
            self.check_approval,
            {
                "approved": "apply",
                "rejected": END,
                "modified": "analyze"
            }
        )
        workflow.add_edge("apply", END)
        
        workflow.set_entry_point("analyze")
        
        return workflow.compile()
    
    async def analyze_config(self, state: ConfigState):
        """分析配置变更"""
        changes = state["changes"]
        
        # 解析配置类型
        config_type = self._identify_config_type(changes)
        
        # 加载相关上下文
        context = await self._load_context(config_type)
        
        state["config_type"] = config_type
        state["context"] = context
        
        return state
    
    async def optimize_config(self, state: ConfigState):
        """使用AI工具优化配置"""
        config_type = state["config_type"]
        changes = state["changes"]
        
        suggestions = []
        
        # 提示词优化
        if "prompt" in changes:
            prompt_suggestions = await self.tools["prompt_optimizer"].optimize(
                changes["prompt"]
            )
            suggestions.extend(prompt_suggestions)
        
        # 参数推荐
        if "scenario_config" in changes:
            param_suggestions = await self.tools["param_recommender"].recommend(
                changes["scenario_config"]
            )
            suggestions.extend(param_suggestions)
        
        state["suggestions"] = suggestions
        
        return state
    
    async def validate_config(self, state: ConfigState):
        """验证配置并检测冲突"""
        changes = state["changes"]
        context = state["context"]
        
        # 运行冲突检测
        conflicts = await self.tools["conflict_detector"].detect(
            changes, context
        )
        
        state["conflicts"] = conflicts
        state["validation_status"] = "conflicts" if conflicts else (
            "suggestions" if state.get("suggestions") else "ok"
        )
        
        return state

4.2 提示词优化器 (Prompt Optimizer Tool)

功能特性
  • Token计数与估算
  • 语义分析与理解
  • 智能压缩(保持语义)
  • Few-Shot示例优化
核心实现
class PromptOptimizerTool:
    """提示词智能优化工具"""
    
    async def optimize_prompt(
        self,
        original_prompt: str,
        scenario: str,
        constraints: PromptConstraints
    ) -> PromptOptimizationResult:
        """
        智能优化提示词
        
        优化策略:
        1. 语义分析 - 理解提示词意图
        2. Token压缩 - 在不丢失关键信息的情况下压缩
        3. 结构优化 - 调整提示词结构提升效果
        4. Few-Shot优化 - 选择最佳示例
        """
        
        # Step 1: 分析原始提示词
        analysis = await self.llm.ainvoke({
            "task": "analyze_prompt",
            "prompt": original_prompt,
            "scenario": scenario
        })
        
        # Step 2: 生成优化建议
        suggestions = []
        
        # 2.1 Token压缩建议
        if analysis.token_count > constraints.max_tokens:
            compressed = await self._compress_prompt(
                original_prompt,
                target_tokens=constraints.max_tokens
            )
            suggestions.append({
                "type": "compression",
                "original_tokens": analysis.token_count,
                "optimized_tokens": compressed.token_count,
                "optimized_prompt": compressed.text,
                "confidence": compressed.confidence
            })
        
        # 2.2 Few-Shot优化建议
        if analysis.has_few_shot:
            best_examples = await self._select_best_examples(
                scenario=scenario,
                current_examples=analysis.examples,
                max_examples=constraints.max_examples
            )
            suggestions.append({
                "type": "few_shot",
                "recommended_examples": best_examples,
                "reason": "基于历史效果选择最佳示例"
            })
        
        return PromptOptimizationResult(
            original_prompt=original_prompt,
            suggestions=suggestions,
            estimated_improvement=self._estimate_improvement(suggestions)
        )

4.3 冲突检测器 (Conflict Detector Tool)

检测类型
  1. 逻辑冲突 - 配置逻辑矛盾
  2. 依赖冲突 - 依赖关系不满足
  3. 覆盖冲突 - 多个配置覆盖同一场景
  4. 性能冲突 - 配置导致性能问题(如Token不足)
核心实现
class ConflictDetectorTool:
    """配置冲突智能检测"""
    
    async def detect_conflicts(
        self,
        config_changes: List[ConfigChange]
    ) -> ConflictReport:
        """
        检测配置冲突
        """
        
        conflicts = []
        
        # 1. 逻辑冲突检测
        logic_conflicts = await self._check_logic_conflicts(config_changes)
        conflicts.extend(logic_conflicts)
        
        # 2. 依赖冲突检测
        dependency_conflicts = await self._check_dependencies(config_changes)
        conflicts.extend(dependency_conflicts)
        
        # 3. Token预算冲突检测
        token_conflicts = await self._check_token_budget(config_changes)
        conflicts.extend(token_conflicts)
        
        # 4. 使用LLM进行深度分析
        if conflicts:
            ai_analysis = await self.llm.ainvoke({
                "task": "analyze_conflicts",
                "conflicts": conflicts,
                "context": config_changes
            })
            
            # AI提供解决建议
            for conflict in conflicts:
                conflict.ai_suggestion = ai_analysis.get_suggestion(
                    conflict.id
                )
        
        return ConflictReport(
            conflicts=conflicts,
            severity=self._assess_severity(conflicts),
            recommended_actions=self._generate_actions(conflicts)
        )
    
    async def _check_token_budget(self, changes):
        """检测Token预算冲突"""
        conflicts = []
        
        for change in changes:
            if change.type == "scenario_config":
                # 估算Token使用
                estimated_tokens = self._estimate_scenario_tokens(
                    change.config
                )
                
                if estimated_tokens > change.config.get("max_tokens", 16000):
                    conflicts.append(TokenBudgetConflict(
                        severity="high",
                        description=f"Token预算不足: 估算{estimated_tokens}, 配置{change.config.get('max_tokens')}",
                        suggestion="增加max_tokens或减少thinking_steps/max_queries_per_step"
                    ))
        
        return conflicts

4.4 参数推荐器 (Param Recommender Tool)

功能特性
  • 基于历史数据分析
  • 多目标优化(成本/性能/平衡)
  • AI辅助推荐
  • 成本估算
核心实现
class ParamRecommenderTool:
    """参数智能推荐工具"""
    
    async def recommend_params(
        self,
        scenario_code: str,
        optimization_goal: str = "balanced"
    ) -> ParamRecommendation:
        """
        基于历史数据推荐最佳参数
        
        优化目标:
        - "cost": 最低成本
        - "performance": 最高性能
        - "balanced": 平衡
        """
        
        # 1. 加载历史数据
        history = await self.db.get_scenario_history(scenario_code)
        
        # 2. 分析最佳参数组合
        best_configs = self._analyze_best_performers(
            history,
            optimization_goal
        )
        
        # 3. 使用LLM生成推荐
        recommendation = await self.llm.ainvoke({
            "task": "recommend_params",
            "scenario": scenario_code,
            "goal": optimization_goal,
            "historical_data": best_configs
        })
        
        return ParamRecommendation(
            scenario=scenario_code,
            recommended_params={
                "llm_model": recommendation.model,
                "temperature": recommendation.temperature,
                "max_tokens": recommendation.max_tokens,
                "thinking_steps": recommendation.thinking_steps,
                "max_queries_per_step": recommendation.max_queries
            },
            reasoning=recommendation.reasoning,
            expected_metrics={
                "avg_cost": recommendation.expected_cost,
                "avg_response_time": recommendation.expected_time,
                "accuracy_rate": recommendation.expected_accuracy
            }
        )

4.5 Config Assistant工作流示例

管理员 Config API Config Assistant Agent 智能优化器 配置验证器 数据库 缓存 提交配置修改 1 调用Config Assistant 2 分析配置变更 生成优化建议 3 AI分析+优化 4 返回建议 5 冲突检测 6 逻辑检查+AI验证 7 返回检测结果 8 返回建议给管理员 9 展示建议 10 确认/修改后重新提交 11 再次验证 12 最终验证 13 验证通过 14 alt [发现冲突或有优化建议] [配置OK] 保存配置 15 保存成功 16 失效相关缓存 17 缓存已清空 18 配置更新成功 19 返回结果 20 通知分析系统 配置已更新 管理员 Config API Config Assistant Agent 智能优化器 配置验证器 数据库 缓存

5. 数据库设计

5.1 AI分析场景表 (ai_analysis_scenarios)

CREATE TABLE ai_analysis_scenarios (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    scenario_code VARCHAR(100) UNIQUE NOT NULL COMMENT '场景唯一标识',
    scenario_name VARCHAR(200) NOT NULL COMMENT '场景名称',
    description TEXT COMMENT '场景描述',
    category VARCHAR(50) COMMENT '场景分类: security/performance/business',
    
    -- 核心提示词
    core_prompt TEXT NOT NULL COMMENT '核心提示词模板',
    
    -- 返回数据格式
    output_format JSON COMMENT '返回数据的JSON Schema',
    
    -- 执行参数
    thinking_steps INT DEFAULT 3 COMMENT '思考步长',
    max_queries_per_step INT DEFAULT 5 COMMENT '每步最大查询次数',
    
    -- LLM配置
    llm_model VARCHAR(50) DEFAULT 'gpt-4o-mini' COMMENT 'LLM模型',
    llm_temperature DECIMAL(3,2) DEFAULT 0.7 COMMENT 'LLM温度',
    max_tokens INT DEFAULT 8000 COMMENT '最大Token数',
    max_result_tokens INT DEFAULT 4000 COMMENT '结果最大Token数',
    
    -- 提示词选择
    auto_select_prompts BOOLEAN DEFAULT TRUE COMMENT '是否自动选择数据源提示词',
    applicable_prompt_types JSON COMMENT '可用的提示词类型列表',
    
    -- 缓存配置
    cache_enabled BOOLEAN DEFAULT TRUE,
    cache_ttl INT DEFAULT 1800 COMMENT '缓存时间(秒)',
    
    -- 状态
    enabled BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    created_by VARCHAR(100),
    
    INDEX idx_scenario_code (scenario_code),
    INDEX idx_category (category),
    INDEX idx_enabled (enabled)
) COMMENT='AI分析场景配置表';

5.2 数据源提示词表 (datasource_prompts)

CREATE TABLE datasource_prompts (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    prompt_type VARCHAR(100) UNIQUE NOT NULL COMMENT '提示词类型: sls_log_query/mcp_threat_intel等',
    prompt_name VARCHAR(200) NOT NULL COMMENT '提示词名称',
    
    -- 类型描述(用于二级查询的第一阶段)
    type_description TEXT NOT NULL COMMENT '简短描述,用于LLM筛选',
    when_to_use TEXT COMMENT '使用场景说明',
    
    -- 详细提示词(二级查询的第二阶段)
    detailed_prompt TEXT NOT NULL COMMENT '详细的提示词内容',
    
    -- 参数Schema
    param_schema JSON COMMENT '查询参数的JSON Schema',
    
    -- 使用示例
    usage_examples JSON COMMENT 'Few-Shot示例',
    
    -- 适用场景
    applicable_scenarios JSON COMMENT '适用的场景列表',
    
    -- Token信息
    estimated_tokens INT COMMENT '估算的Token数量',
    
    -- 版本控制
    version VARCHAR(20) DEFAULT '1.0',
    is_latest BOOLEAN DEFAULT TRUE,
    
    -- 状态
    enabled BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    
    INDEX idx_prompt_type (prompt_type),
    INDEX idx_version (version, is_latest),
    INDEX idx_enabled (enabled)
) COMMENT='数据源提示词配置表';

5.3 AI分析结果表 (ai_analysis_results)

CREATE TABLE ai_analysis_results (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    request_id VARCHAR(100) UNIQUE NOT NULL COMMENT '请求唯一标识',
    
    -- 场景信息
    scenario_id BIGINT NOT NULL COMMENT '场景ID',
    scenario_code VARCHAR(100) NOT NULL COMMENT '场景代码',
    
    -- 请求信息
    request_data JSON COMMENT '原始请求数据',
    
    -- 执行过程
    planning_result JSON COMMENT 'Planning Agent的结果',
    execution_result JSON COMMENT 'Execution Agent的结果',
    analysis_result JSON COMMENT 'Analysis Agent的结果',
    
    -- 最终结果
    final_result JSON COMMENT '最终分析结果',
    
    -- 性能指标
    total_execution_time INT COMMENT '总执行时间(毫秒)',
    planning_time INT COMMENT 'Planning时间',
    execution_time INT COMMENT 'Execution时间',
    analysis_time INT COMMENT 'Analysis时间',
    
    -- Token使用
    total_tokens_used INT COMMENT '总Token消耗',
    planning_tokens INT COMMENT 'Planning Token',
    analysis_tokens INT COMMENT 'Analysis Token',
    estimated_cost DECIMAL(10,4) COMMENT '估算成本(美元)',
    
    -- 查询统计
    total_queries INT COMMENT '总查询次数',
    successful_queries INT COMMENT '成功查询次数',
    failed_queries INT COMMENT '失败查询次数',
    
    -- 状态
    status VARCHAR(50) COMMENT 'success/failed/partial',
    error_message TEXT COMMENT '错误信息',
    
    -- 时间
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP COMMENT '完成时间',
    
    INDEX idx_scenario_id (scenario_id),
    INDEX idx_scenario_code (scenario_code),
    INDEX idx_status (status),
    INDEX idx_created_at (created_at),
    FOREIGN KEY (scenario_id) REFERENCES ai_analysis_scenarios(id)
) COMMENT='AI分析结果记录表';

5.4 配置变更历史表 (config_change_history) - 新增

CREATE TABLE config_change_history (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    config_type VARCHAR(50) NOT NULL COMMENT '配置类型: scenario/prompt/llm',
    config_id BIGINT COMMENT '配置记录ID',
    
    -- 变更信息
    change_type VARCHAR(50) NOT NULL COMMENT '变更类型: create/update/delete',
    old_value JSON COMMENT '旧值',
    new_value JSON COMMENT '新值',
    change_summary TEXT COMMENT '变更摘要',
    
    -- AI分析
    ai_suggestions JSON COMMENT 'Config Assistant的优化建议',
    conflicts_detected JSON COMMENT '检测到的冲突',
    
    -- 审批信息
    approval_status VARCHAR(50) DEFAULT 'pending' COMMENT 'pending/approved/rejected',
    approved_by VARCHAR(100) COMMENT '审批人',
    approved_at TIMESTAMP COMMENT '审批时间',
    rejection_reason TEXT COMMENT '拒绝原因',
    
    -- 效果评估
    performance_before JSON COMMENT '变更前性能指标',
    performance_after JSON COMMENT '变更后性能指标',
    effectiveness_score DECIMAL(5,2) COMMENT '有效性评分',
    
    -- 元数据
    created_by VARCHAR(100) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    
    INDEX idx_config_type (config_type),
    INDEX idx_config_id (config_id),
    INDEX idx_approval_status (approval_status),
    INDEX idx_created_at (created_at)
) COMMENT='配置变更历史表';

5.5 配置效果评估表 (config_effectiveness) - 新增

CREATE TABLE config_effectiveness (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    scenario_code VARCHAR(100) NOT NULL,
    config_version VARCHAR(50) NOT NULL,
    
    -- 评估周期
    evaluation_period_start TIMESTAMP NOT NULL,
    evaluation_period_end TIMESTAMP NOT NULL,
    
    -- 性能指标
    avg_response_time INT COMMENT '平均响应时间(毫秒)',
    avg_token_usage INT COMMENT '平均Token使用',
    avg_cost DECIMAL(10,4) COMMENT '平均成本',
    
    -- 质量指标
    success_rate DECIMAL(5,2) COMMENT '成功率',
    accuracy_score DECIMAL(5,2) COMMENT '准确率(人工评估)',
    user_satisfaction DECIMAL(3,2) COMMENT '用户满意度',
    
    -- 统计数据
    total_requests INT COMMENT '总请求数',
    successful_requests INT COMMENT '成功请求数',
    failed_requests INT COMMENT '失败请求数',
    
    -- AI评估
    ai_assessment JSON COMMENT 'Config Assistant的自动评估',
    improvement_suggestions JSON COMMENT '改进建议',
    
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    
    INDEX idx_scenario_code (scenario_code),
    INDEX idx_config_version (config_version),
    INDEX idx_evaluation_period (evaluation_period_start, evaluation_period_end),
    UNIQUE KEY uk_scenario_period (scenario_code, config_version, evaluation_period_start)
) COMMENT='配置效果评估表';

6. 时序流程设计

6.1 双系统协作完整时序图

在这里插入图片描述

sequenceDiagram
    autonumber
    
    actor Analyst as 安全分析师
    actor Admin as 管理员
    participant Gateway as API网关
    
    box rgb(227, 242, 253) 分析系统
        participant Supervisor as Supervisor Agent
        participant Planning as Planning Agent
        participant Execution as Execution Agent
        participant Analysis as Analysis Agent
        participant Memory as Memory Manager
    end
    
    box rgb(255, 249, 196) 配置系统
        participant ConfigAgent as Config Assistant
        participant Tools as AI工具集
    end
    
    participant Cache as 缓存服务
    participant DB as 数据库
    participant DataSource as 数据源
    
    rect rgb(240, 248, 255)
        Note over Admin,Tools: 场景1: 管理员优化配置
        
        Admin->>Gateway: 提交场景配置修改
        Gateway->>ConfigAgent: 路由到Config Assistant
        
        ConfigAgent->>Tools: 提示词优化分析
        Tools->>Tools: Token计算+语义分析
        Tools-->>ConfigAgent: 优化建议(压缩45%)
        
        ConfigAgent->>Tools: 冲突检测
        Tools->>DB: 读取相关配置
        Tools->>Tools: AI分析潜在冲突
        Tools-->>ConfigAgent: 检测报告(Token预算风险)
        
        ConfigAgent-->>Admin: 返回优化建议+风险提示
        
        Admin->>Gateway: 确认应用优化配置
        Gateway->>ConfigAgent: 应用配置
        
        ConfigAgent->>DB: 保存新配置
        ConfigAgent->>Cache: 失效相关缓存
        ConfigAgent->>Gateway: 发送变更通知
        Gateway-->>Admin: 配置更新成功
    end
    
    rect rgb(232, 245, 233)
        Note over Analyst,DataSource: 场景2: 分析师使用优化后的配置
        
        Analyst->>Gateway: 发起告警分析请求
        Gateway->>Supervisor: 路由到Supervisor
        
        Supervisor->>Planning: 调度Planning Agent
        
        Planning->>Cache: 读取场景配置(新配置)
        Cache-->>Planning: 返回优化后的配置
        
        Note over Planning: Token预算检查<br/>配置已优化,预算充足
        
        Planning->>Planning: 生成查询计划
        Planning->>Memory: 保存规划结果
        
        Supervisor->>Execution: 调度Execution Agent
        Execution->>Memory: 加载查询计划
        
        par 并发查询
            Execution->>DataSource: API查询1
            Execution->>DataSource: API查询2
            Execution->>DataSource: MCP查询
        end
        
        DataSource-->>Execution: 返回查询结果
        Execution->>Memory: 保存查询结果
        
        Supervisor->>Analysis: 调度Analysis Agent
        Analysis->>Memory: 加载查询结果
        
        Note over Analysis: 结果压缩(智能摘要)<br/>Token使用在预算内
        
        Analysis->>Analysis: LLM综合分析
        Analysis->>DB: 存储分析结果
        Analysis->>Memory: 创建Checkpoint
        
        Supervisor-->>Gateway: 返回分析结果
        Gateway-->>Analyst: 响应结果
    end
    
    rect rgb(255, 243, 224)
        Note over ConfigAgent,DB: 场景3: 配置系统收集性能反馈
        
        ConfigAgent->>DB: 查询配置使用效果
        DB-->>ConfigAgent: 返回性能指标
        
        ConfigAgent->>Tools: 性能分析
        Tools->>Tools: 趋势分析+效果评估
        Tools-->>ConfigAgent: 分析报告
        
        alt 发现优化空间
            ConfigAgent->>Admin: 推送优化建议
        end
    end

6.2 Config Assistant配置优化流程

管理员 管理界面 Config API Config Assistant 提示词优化器 冲突检测器 参数推荐器 数据库 修改提示词配置 1 POST /api/config/prompt/optimize 2 调用Config Assistant 3 分析提示词 4 Token计数(5000 tokens) 5 语义分析 6 生成压缩版本(2000 tokens) 7 优化建议 8 检测冲突 9 读取相关配置 10 返回配置数据 11 逻辑验证 12 AI深度分析 13 无冲突 14 推荐参数 15 查询历史数据 16 历史性能数据 17 统计分析 18 推荐最佳参数 19 返回综合建议 20 展示建议 21 显示: • 压缩后提示词(节省60%) • 推荐参数 • 预期效果 22 确认应用 23 POST /api/config/prompt/apply 24 应用配置 25 保存配置 26 记录变更历史 27 配置已应用 28 成功 29 配置更新成功 30 管理员 管理界面 Config API Config Assistant 提示词优化器 冲突检测器 参数推荐器 数据库

7. 技术栈与性能优化

7.1 技术栈选型

分析系统技术栈
组件 技术选型 说明
Agent框架 LangGraph 原生支持多Agent、Checkpointing
LLM服务 OpenAI GPT-4/3.5 分层使用降低成本
异步框架 Python asyncio 高并发查询执行
状态管理 SqliteSaver + MemorySaver 双层状态管理
Token管理 tiktoken 精确Token计算
配置系统技术栈
组件 技术选型 说明
Agent框架 LangGraph 独立的工作流
LLM服务 OpenAI GPT-4 高质量分析
任务队列 Celery + Redis 异步处理配置任务
通知服务 WebSocket 实时推送建议

7.2 Token预算分配

41% 0% 56% 3% Token预算分配 (16K total) Planning Agent Execution Agent Analysis Agent 缓冲区

详细分配:

  • Planning Agent (6.5K):

    • 场景识别: 500 tokens (gpt-3.5-turbo)
    • 查询规划: 2000 tokens (gpt-4o-mini)
    • 提示词: 4000 tokens (可压缩)
  • Execution Agent (0):

    • 纯I/O操作,无LLM调用
  • Analysis Agent (9K):

    • 查询结果: 6000 tokens (可压缩)
    • 综合分析: 3000 tokens (gpt-4)
  • 缓冲区 (0.5K):

    • 预留缓冲,处理边界情况

7.3 性能优化策略

1. 缓存策略
class MultiLevelCache:
    """三级缓存策略"""
    
    # L1: 内存缓存 (最快,毫秒级)
    memory_cache = TTLCache(maxsize=1000, ttl=300)
    
    # L2: Redis缓存 (快,秒级)
    redis_client = Redis()
    
    # L3: 数据库 (慢,但持久化)
    db_session = SessionLocal()
    
    async def get_config(self, key):
        # L1
        if key in self.memory_cache:
            return self.memory_cache[key]
        
        # L2
        value = await self.redis_client.get(key)
        if value:
            self.memory_cache[key] = value
            return value
        
        # L3
        value = await self.db_session.get(key)
        if value:
            await self.redis_client.set(key, value, ex=1800)
            self.memory_cache[key] = value
        
        return value
2. 并发控制
# Execution Agent使用Semaphore控制并发
semaphore = asyncio.Semaphore(10)  # 最多10个并发查询

async def execute_queries(plans):
    async with semaphore:
        results = await asyncio.gather(*[
            query_executor.execute(plan) for plan in plans
        ], return_exceptions=True)
    return results
3. Token压缩
# 智能压缩提示词
if token_count > max_tokens:
    compressed = await token_manager.compress_prompt(
        original_prompt,
        target_tokens=max_tokens,
        preserve_semantics=True
    )

9. 多Agent架构深度分析

9.1 架构对比

维度 单体架构 2-Agent架构 5-Agent架构 (推荐)
职责划分 ❌ 混杂 ⚠️ Agent1过重 ✅ 单一职责
稳定性 ❌ 低 ⚠️ 中 ✅ 高
性能 ❌ 串行 ⚠️ 部分优化 ✅ 高度并行
Token管理 ❌ 无 ⚠️ 粗粒度 ✅ 精细化
配置管理 ❌ 纯人工 ❌ 纯人工 ✅ AI辅助
断点恢复 ❌ 不支持 ❌ 不支持 ✅ 支持
可扩展性 ❌ 困难 ⚠️ 一般 ✅ 易扩展

9.2 双系统协作优势

为什么需要独立的Config Assistant系统?

  1. 职责正交:

    • 分析系统: 实时业务处理
    • 配置系统: 支撑系统优化
  2. 用户角色不同:

    • 分析系统: 运维人员、分析师
    • 配置系统: AI工程师、管理员
  3. 资源隔离:

    • 配置优化可能很耗时
    • 不应占用分析流程的资源
  4. 风险隔离:

    • 配置系统故障不影响分析
    • 渐进式上线降低风险

9.3 核心优势总结

高扩展性

  • 场景驱动,易于添加新场景
  • 提示词模块化,灵活组合
  • Agent可独立扩展

高稳定性

  • 多级缓存机制
  • 完善的容错降级
  • Checkpointing断点恢复

高性能

  • 配置预加载
  • 并发查询控制
  • Token智能压缩

易维护

  • 配置与代码分离
  • AI辅助配置优化
  • 完整的变更历史

智能化

  • Config Assistant自动优化
  • 冲突自动检测
  • 参数智能推荐

10. 总结

10.1 核心创新

  1. 5-Agent协作架构:

    • 分析系统: Supervisor + Planning + Execution + Analysis + Memory Manager
    • 配置系统: Config Assistant Agent
  2. 双系统独立运行:

    • 职责清晰,资源隔离
    • 配置优化不影响分析性能
  3. AI辅助配置管理:

    • 提示词智能优化(节省45% Token)
    • 配置冲突自动检测(避免90%错误)
    • 参数智能推荐(基于历史数据)
  4. 完善的Token管理:

    • 分层预算(Planning 6.5K + Analysis 9K)
    • 智能压缩机制
    • 实时监控告警
  5. Checkpointing机制:

    • 自动断点保存
    • 故障快速恢复
    • 节省70% LLM调用

10.2 预期收益

指标 当前 优化后 提升
响应时间 15-20秒 6-10秒 ↑60%
Token成本 100% 50-60% ↓40-50%
系统可用性 95% 99.5%+ ↑80%
配置错误率 20% 2% ↓90%
新Agent接入 2天 10分钟 ↑99%

10.3 实施建议

推荐方案

采用完整5-Agent架构

  • 分析系统(4 Agents) + 配置系统(1 Agent)
  • 双系统独立部署,通过通知机制协作

渐进式实施

  • Phase 1-3: 分析系统(6周)
  • Phase 4: Config Assistant(2周)
  • Phase 5: 集成测试和上线(2周)

优先级排序

  1. 核心多Agent框架(必须)
  2. Token管理器(必须)
  3. Checkpointing(推荐)
  4. Config Assistant(推荐,但可后置)
风险控制
风险 影响 应对措施
复杂度提升 完善文档和培训
性能开销 充分测试,优化缓存
Token管理误判 预留缓冲区,监控告警
Config Assistant误判 人工最终确认机制

附录

A. 术语表

核心概念
  • 场景 (Scenario): 特定的AI分析应用场景
  • 提示词模板 (Prompt Template): 可复用的提示词配置
  • 工作流 (Workflow): LangGraph定义的处理流程
  • 节点 (Node): 工作流中的处理单元
  • 二级查询 (Two-Stage Query): 先查描述后查详情的查询策略
多Agent架构术语
  • Agent: 具有特定职责的独立处理单元
  • Supervisor Agent: 协调者,负责任务分解和Agent调度
  • Planning Agent: 规划者,负责场景识别和查询规划
  • Execution Agent: 执行者,负责并发执行数据查询
  • Analysis Agent: 分析者,负责结果综合分析和研判
  • Config Assistant Agent: 配置助手,负责智能配置管理
  • Memory Manager: 上下文管理器,负责Agent间状态共享
  • Token Budget: Token预算,控制LLM调用的Token使用量
  • Checkpointing: 检查点机制,自动保存执行状态支持恢复
  • SubGraph: 子图,Agent内部的独立工作流
  • State: 状态对象,在Agent间传递的数据结构

B. 参考资料

LangGraph官方文档
Token管理
其他资源
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐