前言

在构建 AI 应用(Agent)时,我们经常面临一个权衡:是追求响应速度,还是追求逻辑深度

  • 反应式(Reactive):像条件反射一样,查汇率、查股价,要求秒级响应。
  • 深思熟虑(Deliberative):像专家会诊一样,做资产配置、退休规划,需要多步推理和数据综合。

本文将基于 LangGraph通义千问 (Qwen),实现一个混合智能体 (Hybrid Agent)。它具备“大脑皮层”和“脑干”双重架构,能够根据用户问题的复杂程度,自动在“即时响应模式”和“深度分析模式”之间切换。

1. 系统架构设计

本系统的核心在于 协调层(Router),它像一个交通指挥官,将请求分发给不同的处理单元。

1.1 架构分层

  1. 感知层 (Assess):分析用户意图,判断是紧急查询还是复杂规划。
  2. 反应层 (Reactive):配备工具(Tools),直接调用 API 回答问题(如查询上证指数)。
  3. 认知层 (Deliberative):包含数据收集、深度分析、策略生成三个顺序步骤。

1.2 状态流转图 (Mermaid)

在这里插入图片描述


2. 核心代码实现

2.1 环境准备与状态定义

使用 LangGraph 的核心在于定义一个全局状态(State),它贯穿整个工作流。

from typing import TypedDict, Optional, Literal, Dict, Any
from langgraph.graph import StateGraph, END

# 定义智能体的“记忆”结构
class WealthAdvisorState(TypedDict):
    # 输入
    user_query: str                  # 用户问题
    customer_profile: Optional[Dict] # 客户画像
    
    # 内部状态
    query_type: Optional[Literal["emergency", "informational", "analytical"]]
    processing_mode: Optional[Literal["reactive", "deliberative"]] # 关键决策变量
    market_data: Optional[Dict]      # 收集到的数据
    analysis_results: Optional[Dict] # 分析结果
    
    # 输出
    final_response: Optional[str]    # 最终回复
    error: Optional[str]

2.2 协调层:大脑的决策 (Assess)

这是智能体的入口。我们通过 Prompt 让 LLM 判断用户的意图。

# 提示词简略版:请判断查询是 emergency 还是 analytical,返回 JSON
ASSESSMENT_PROMPT = """...请评估以下用户查询...
1. 查询类型: emergency / analytical
2. 建议的处理模式: reactive / deliberative..."""

def assess_query(state: WealthAdvisorState) -> WealthAdvisorState:
    """评估节点"""
    # 调用 LLM 进行分类
    chain = prompt | llm | JsonOutputParser()
    result = chain.invoke({"user_query": state["user_query"]})
    
    # 更新状态中的 processing_mode,决定后续走向
    return {
        **state,
        "query_type": result.get("query_type", "emergency"),
        "processing_mode": result.get("processing_mode", "reactive"),
    }

2.3 反应式路径:快思考 (Reactive)

利用 LangChain 经典的 Agent 结构,绑定工具(Tools),实现“看-想-做”的快速闭环。

def query_shanghai_index(_: str = "") -> str:
    """模拟工具:查询上证指数"""
    return "上证指数 当前点位: 3125.62,涨跌: +0.20%(模拟数据)"

def reactive_processing(state: WealthAdvisorState) -> WealthAdvisorState:
    """反应式节点"""
    tools = [Tool(name="上证指数查询", func=query_shanghai_index, ...)]
    
    # 初始化一个能够调用工具的 Agent
    agent_executor = AgentExecutor.from_agent_and_tools(...)
    
    # 执行并获取结果
    result = agent_executor.run(state["user_query"])
    return {**state, "final_response": result}

2.4 深思熟虑路径:慢思考 (Deliberative)

这是一条长链条,分为三个步骤,模拟人类顾问的工作流。

  1. Collect Data: 决定需要哪些数据(宏观经济、行业趋势等)。
  2. Analyze: 结合客户画像(风险偏好)和收集到的数据进行推理。
  3. Recommend: 生成人性化的建议书。
def collect_data(state: WealthAdvisorState) -> WealthAdvisorState:
    """步骤1:数据收集"""
    # LLM 决定需要什么数据,并模拟获取(实际项目中这里会调用外部API)
    result = chain.invoke(...) 
    return {**state, "market_data": result.get("collected_data")}

def analyze_data(state: WealthAdvisorState) -> WealthAdvisorState:
    """步骤2:深度分析"""
    # 输入:用户查询 + 客户画像 + 市场数据
    result = chain.invoke(...)
    return {**state, "analysis_results": result}

def generate_recommendations(state: WealthAdvisorState) -> WealthAdvisorState:
    """步骤3:生成建议"""
    # 将复杂的分析结果转化为自然语言
    result = chain.invoke(...)
    return {**state, "final_response": result}

2.5 构建图 (Graph Construction)

这是 LangGraph 最精彩的部分。我们将节点连接起来,并定义条件边(Conditional Edges)

def create_wealth_advisor_workflow() -> StateGraph:
    workflow = StateGraph(WealthAdvisorState)

    # 1. 添加节点
    workflow.add_node("assess", assess_query)
    workflow.add_node("reactive", reactive_processing)
    workflow.add_node("collect_data", collect_data)
    workflow.add_node("analyze", analyze_data)
    workflow.add_node("recommend", generate_recommendations)
    workflow.add_node("respond", respond_function)

    # 2. 设置入口
    workflow.set_entry_point("assess")

    # 3. 添加条件路由(关键!)
    # 如果 mode 是 reactive,走 reactive 节点;否则走 collect_data 节点
    workflow.add_conditional_edges(
        "assess",
        lambda x: "reactive" if x.get("processing_mode") == "reactive" else "collect_data",
        {
            "reactive": "reactive",
            "collect_data": "collect_data"
        }
    )

    # 4. 添加固定边(深思熟虑模式的顺序流)
    workflow.add_edge("collect_data", "analyze")
    workflow.add_edge("analyze", "recommend")
    workflow.add_edge("recommend", "respond")
    
    # 反应式模式也汇聚到响应节点
    workflow.add_edge("reactive", "respond")
    workflow.add_edge("respond", END)

    return workflow.compile()

3. 运行效果演示

我们使用相同的智能体,测试两种不同类型的查询。

场景 A:快速查询

用户输入:“今天上证指数表现如何?”

智能体日志

  1. assess 节点:识别为 emergency 类型,模式 reactive
  2. reactive 节点:调用工具 上证指数查询
  3. 输出:“上证指数当前点位 3125.62,上涨 0.20%。”
    (耗时:约 1.5 秒)

场景 B:复杂规划

用户输入:“考虑到我准备退休,请根据当前市场评估我的投资组合。”(客户画像:平衡型)

智能体日志

  1. assess 节点:识别为 analytical 类型,模式 deliberative
  2. collect_data 节点:模拟收集了债券收益率、通胀数据。
  3. analyze 节点:评估发现客户当前股票占比过高,风险暴露较大。
  4. recommend 节点:生成建议报告。
  5. 输出:“建议您将股票仓位从 60% 降至 40%,增加固收类资产配置…”
    (耗时:约 8 秒,经过了深度的推理链)

4. 总结

通过 LangGraph,我们成功打破了传统 LLM 应用“一条道走到黑”的线性限制。这个混合智能体的优势在于:

  1. 成本效益:简单问题不需要消耗大量 Token 去做深度推理。
  2. 专业性:复杂问题通过分步骤(Chain of Thought)处理,逻辑更严密,幻觉更少。
  3. 可扩展性:可以在 reactive 层添加更多 API,也可以在 deliberative 层增加“风险控制审核”等更多环节。

下一步优化方向

  • 接入真实的金融数据库(如 Tushare, Bloomberg)。
  • deliberative 模式中加入“人机交互”节点(Human-in-the-loop),在发出建议前由人工顾问确认。

5. 完整代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
混合智能体(Hybrid Agent)- 财富管理投顾AI助手

基于LangGraph实现的混合型智能体,结合反应式架构的即时响应能力和深思熟虑架构的长期规划能力,
通过协调层动态切换处理模式,提供智能化财富管理咨询服务。

三层架构:
1. 底层(反应式):即时响应客户查询,提供快速反馈
2. 中层(协调):评估任务类型和优先级,动态选择处理模式
3. 顶层(深思熟虑):进行复杂的投资分析和长期财务规划
"""

import os
import json
import re
from datetime import datetime
from typing import Dict, List, Any, Literal, TypedDict, Optional, Union, Tuple, cast

from langchain_classic.agents import AgentOutputParser, LLMSingleActionAgent, AgentExecutor
from langchain_classic.chains.llm import LLMChain
from langchain_core.agents import AgentFinish
from langchain_core.prompts import ChatPromptTemplate, StringPromptTemplate
from langchain_community.llms import Tongyi
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.tools import Tool
from langgraph.graph import StateGraph, END
import requests  # 新增:用于实时行情API请求
import warnings

from pydantic.v1 import BaseModel, Field

warnings.filterwarnings("ignore")

# 设置API密钥
DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY")

# 创建LLM实例
llm = Tongyi(model_name="qwen3-max", dashscope_api_key=DASHSCOPE_API_KEY)


# 定义客户信息数据结构
class CustomerProfile(BaseModel):
    """客户画像信息"""
    customer_id: str = Field(..., description="客户ID")
    risk_tolerance: Literal["保守型", "稳健型", "平衡型", "成长型", "进取型"] = Field(..., description="风险承受能力")
    investment_horizon: Literal["短期", "中期", "长期"] = Field(..., description="投资期限")
    financial_goals: List[str] = Field(..., description="财务目标")
    investment_preferences: List[str] = Field(..., description="投资偏好")
    portfolio_value: float = Field(..., description="投资组合总价值")
    current_allocations: Dict[str, float] = Field(..., description="当前资产配置")


# 定义应急响应输出
class EmergencyResponseOutput(BaseModel):
    """紧急查询的即时响应"""
    response_type: str = Field(..., description="响应类型")
    direct_answer: str = Field(..., description="直接回答")
    data_points: Optional[Dict[str, Any]] = Field(None, description="相关数据点")
    suggested_actions: Optional[List[str]] = Field(None, description="建议操作")


# 定义深度分析输出
class InvestmentAnalysisOutput(BaseModel):
    """深度投资分析结果"""
    market_assessment: str = Field(..., description="市场评估")
    portfolio_analysis: Dict[str, Any] = Field(..., description="投资组合分析")
    recommendations: List[Dict[str, Any]] = Field(..., description="投资建议")
    risk_analysis: Dict[str, Any] = Field(..., description="风险分析")
    expected_outcomes: Dict[str, Any] = Field(..., description="预期结果")


# 定义状态类型
class WealthAdvisorState(TypedDict):
    """财富顾问智能体的状态"""
    # 输入
    user_query: str  # 用户查询
    customer_profile: Optional[Dict[str, Any]]  # 客户画像

    # 处理状态
    query_type: Optional[Literal["emergency", "informational", "analytical"]]  # 查询类型
    processing_mode: Optional[Literal["reactive", "deliberative"]]  # 处理模式
    emergency_response: Optional[Dict[str, Any]]  # 紧急响应结果
    market_data: Optional[Dict[str, Any]]  # 市场数据
    analysis_results: Optional[Dict[str, Any]]  # 分析结果

    # 输出
    final_response: Optional[str]  # 最终响应

    # 控制流
    current_phase: Optional[str]
    error: Optional[str]  # 错误信息


# 提示模板
ASSESSMENT_PROMPT = """你是一个财富管理投顾AI助手的协调层。请评估以下用户查询,确定其类型和应该采用的处理模式。

用户查询: {user_query}

请判断:
1. 查询类型: 
   - "emergency": 紧急的或直接的查询,需要立即响应(如市场状况、账户信息、产品信息等)
   - "informational": 信息性的查询,需要特定领域知识(如税务政策、投资工具介绍等)
   - "analytical": 需要深度分析的查询(如投资组合优化、长期理财规划等)

2. 建议的处理模式:
   - "reactive": 适用于需要快速反应的查询
   - "deliberative": 适用于需要深度思考和分析的查询

请以JSON格式返回结果,包含以下字段:
- query_type: 查询类型(上述三种类型之一)
- processing_mode: 处理模式(上述两种模式之一)
- reasoning: 决策理由的简要说明
"""

REACTIVE_PROMPT = """你是一个财富管理投顾AI助手,专注于提供快速准确的响应。请针对用户的查询提供直接的回答。

用户查询: {user_query}

客户信息:
{customer_profile}

请提供:
1. 直接回答用户问题
2. 相关的关键数据点(如适用)
3. 建议的后续操作(如适用)

以JSON格式返回响应,包含以下字段:
- response_type: 响应类型
- direct_answer: 直接回答
- data_points: 相关数据点(可选)
- suggested_actions: 建议操作(可选)
"""

DATA_COLLECTION_PROMPT = """你是一个财富管理投顾AI助手的数据收集模块。基于以下用户查询,确定需要收集哪些市场和财务数据进行深入分析。

用户查询: {user_query}

客户信息:
{customer_profile}

请确定需要收集的数据类型,例如:
- 资产类别表现数据
- 经济指标
- 行业趋势
- 历史回报率
- 风险指标
- 税收信息
- 其他相关数据

以JSON格式返回结果,包含以下字段:
- required_data_types: 需要收集的数据类型列表
- data_sources: 建议的数据来源列表
- collected_data: 模拟收集的数据(为简化示例,请生成合理的模拟数据)
"""

ANALYSIS_PROMPT = """你是一个财富管理投顾AI助手的分析引擎。请根据收集的数据对用户的投资情况进行深入分析。

用户查询: {user_query}

客户信息:
{customer_profile}

市场数据:
{market_data}

请提供全面的投资分析,包括:
1. 当前市场状况评估
2. 客户投资组合分析
3. 个性化投资建议
4. 风险评估
5. 预期结果和回报预测

以JSON格式返回分析结果,包含以下字段:
- market_assessment: 市场评估
- portfolio_analysis: 投资组合分析
- recommendations: 投资建议列表
- risk_analysis: 风险分析
- expected_outcomes: 预期结果
"""

RECOMMENDATION_PROMPT = """你是一个财富管理投顾AI助手。请根据深入分析结果,为客户准备最终的咨询建议。

用户查询: {user_query}

客户信息:
{customer_profile}

分析结果:
{analysis_results}

请提供专业、个性化且详细的投资建议,语言应友好易懂,避免过多专业术语。建议应包括:
1. 总体投资策略
2. 具体行动步骤
3. 资产配置建议
4. 风险管理策略
5. 时间框架
6. 预期收益
7. 后续跟进计划

返回格式应为自然语言文本,适合直接呈现给客户。
"""


def query_shanghai_index(_: str = "") -> str:
    """上证指数实时查询工具(模拟版),返回固定的行情数据"""
    # 直接返回模拟数据,避免外部API不可用导致报错
    name = "上证指数"
    price = "3125.62"
    change = "6.32"
    pct = "0.20"
    return f"{name} 当前点位: {price},涨跌: {change},涨跌幅: {pct}%(模拟数据)"


# 第一阶段:情境评估 - 确定查询类型和处理模式
def assess_query(state: WealthAdvisorState) -> WealthAdvisorState:
    print("[DEBUG] 进入节点: assess_query")
    """评估用户查询,确定类型和处理模式"""

    try:
        # 准备提示
        prompt = ChatPromptTemplate.from_template(ASSESSMENT_PROMPT)

        # 构建输入
        input_data = {
            "user_query": state["user_query"],
        }

        # 调用LLM
        chain = prompt | llm | JsonOutputParser()
        result = chain.invoke(input_data)
        print("[DEBUG] LLM评估输出:", result)
        print(
            f"[DEBUG] 分支判断: processing_mode={result.get('processing_mode', '未知')}, query_type={result.get('query_type', '未知')}")
        # 获取处理模式,确保有值
        processing_mode = result.get("processing_mode", "reactive")
        if processing_mode not in ["reactive", "deliberative"]:
            processing_mode = "reactive"  # 默认使用反应式处理
        # 获取查询类型,确保有值
        query_type = result.get("query_type", "emergency")
        if query_type not in ["emergency", "informational", "analytical"]:
            query_type = "emergency"  # 默认为紧急查询
        # ==========================
        # 更新状态
        updated_state = {
            **state,
            "query_type": query_type,
            "processing_mode": processing_mode,
        }
        return updated_state
    except Exception as e:
        return {
            **state,
            "error": f"评估阶段出错: {str(e)}",
            "final_response": "评估查询时发生错误,无法处理您的请求。"
        }


# 反应式处理 - 快速响应简单查询
def reactive_processing(state: WealthAdvisorState) -> WealthAdvisorState:
    print("[DEBUG] 进入节点: reactive_processing")
    """反应式处理模式,提供快速响应,支持工具调用"""
    try:
        # 定义工具列表
        tools = [
            Tool(
                name="上证指数查询",
                func=query_shanghai_index,
                description="用于查询上证指数的最新行情,输入内容可为空或任意字符串"
            ),
        ]

        # 可扩展:此处可继续添加其他反应式工具

        # 构建Agent提示模板
        class SimplePromptTemplate(StringPromptTemplate):
            def format(self, **kwargs):
                return f"用户问题: {kwargs['input']}\n请根据需要调用工具,直接给出答案。"

        prompt = SimplePromptTemplate(input_variables=["input", "intermediate_steps"])
        llm_chain = LLMChain(llm=llm, prompt=prompt)
        tool_names = [tool.name for tool in tools]

        # 修正:继承AgentOutputParser,确保兼容性
        class SimpleOutputParser(AgentOutputParser):
            def parse(self, text):
                # 直接将LLM输出作为最终答案
                return AgentFinish(return_values={"output": text.strip()}, log=text)

        output_parser = SimpleOutputParser()
        agent = LLMSingleActionAgent(
            llm_chain=llm_chain,
            output_parser=output_parser,
            stop=["\nObservation:"],
            allowed_tools=tool_names,
        )
        agent_executor = AgentExecutor.from_agent_and_tools(
            agent=agent, tools=tools, verbose=False
        )
        # 运行Agent
        user_query = state["user_query"]
        result = agent_executor.run(user_query)
        return {
            **state,
            "final_response": result
        }
    except Exception as e:
        return {
            **state,
            "error": f"反应式处理出错: {str(e)}",
            "final_response": "处理您的查询时发生错误,无法提供响应。"
        }


# 数据收集 - 收集进行深度分析所需的数据
def collect_data(state: WealthAdvisorState) -> WealthAdvisorState:
    print("[DEBUG] 进入节点: collect_data")
    """收集市场数据和客户信息进行深入分析"""

    try:
        # 准备提示
        prompt = ChatPromptTemplate.from_template(DATA_COLLECTION_PROMPT)

        # 构建输入
        input_data = {
            "user_query": state["user_query"],
            "customer_profile": json.dumps(state.get("customer_profile", {}), ensure_ascii=False, indent=2)
        }

        # 调用LLM
        chain = prompt | llm | JsonOutputParser()
        result = chain.invoke(input_data)

        # 更新状态
        return {
            **state,
            "market_data": result.get("collected_data", {}),
            "current_phase": "analyze"
        }
    except Exception as e:
        return {
            **state,
            "error": f"数据收集阶段出错: {str(e)}",
            "current_phase": "collect_data"  # 保持在当前阶段
        }


# 深度分析 - 分析数据和客户情况
def analyze_data(state: WealthAdvisorState) -> WealthAdvisorState:
    print("[DEBUG] 进入节点: analyze_data")
    """进行深度投资分析"""

    try:
        # 确保必要数据已收集
        if not state.get("market_data"):
            return {
                **state,
                "error": "分析阶段缺少市场数据",
                "current_phase": "collect_data"  # 回到数据收集阶段
            }

        # 准备提示
        prompt = ChatPromptTemplate.from_template(ANALYSIS_PROMPT)

        # 构建输入
        input_data = {
            "user_query": state["user_query"],
            "customer_profile": json.dumps(state.get("customer_profile", {}), ensure_ascii=False, indent=2),
            "market_data": json.dumps(state.get("market_data", {}), ensure_ascii=False, indent=2)
        }

        # 调用LLM
        chain = prompt | llm | JsonOutputParser()
        result = chain.invoke(input_data)

        # 更新状态
        return {
            **state,
            "analysis_results": result,
            "current_phase": "recommend"
        }
    except Exception as e:
        return {
            **state,
            "error": f"分析阶段出错: {str(e)}",
            "current_phase": "analyze"  # 保持在当前阶段
        }


# 生成建议 - 根据分析结果提供投资建议
def generate_recommendations(state: WealthAdvisorState) -> WealthAdvisorState:
    print("[DEBUG] 进入节点: generate_recommendations")
    """生成投资建议和行动计划"""

    try:
        # 确保分析结果已存在
        if not state.get("analysis_results"):
            return {
                **state,
                "error": "建议生成阶段缺少分析结果",
                "current_phase": "analyze"  # 回到分析阶段
            }

        # 准备提示
        prompt = ChatPromptTemplate.from_template(RECOMMENDATION_PROMPT)

        # 构建输入
        input_data = {
            "user_query": state["user_query"],
            "customer_profile": json.dumps(state.get("customer_profile", {}), ensure_ascii=False, indent=2),
            "analysis_results": json.dumps(state.get("analysis_results", {}), ensure_ascii=False, indent=2)
        }

        # 调用LLM
        chain = prompt | llm | StrOutputParser()
        result = chain.invoke(input_data)

        # 更新状态
        return {
            **state,
            "final_response": result,
            "current_phase": "respond"
        }
    except Exception as e:
        return {
            **state,
            "error": f"建议生成阶段出错: {str(e)}",
            "current_phase": "recommend"  # 保持在当前阶段
        }


# 创建智能体工作流
def create_wealth_advisor_workflow() -> StateGraph:
    """创建财富顾问混合智能体工作流"""

    # 创建状态图
    workflow = StateGraph(WealthAdvisorState)

    # 添加节点,每个节点都确保返回完整的状态
    workflow.add_node("assess", assess_query)
    workflow.add_node("reactive", reactive_processing)
    workflow.add_node("collect_data", collect_data)
    workflow.add_node("analyze", analyze_data)
    workflow.add_node("recommend", generate_recommendations)

    # 定义一个显式的响应节点函数
    def respond_function(state: WealthAdvisorState) -> WealthAdvisorState:
        """最终响应生成节点,原样返回状态"""
        # 确保final_response字段有值
        if not state.get("final_response"):
            state = {
                **state,
                "final_response": "无法生成响应。请检查处理流程。",
                "error": state.get("error", "未知错误")
            }
        return state

    workflow.add_node("respond", respond_function)

    # 设置入口点
    workflow.set_entry_point("assess")

    # 添加分支路由
    workflow.add_conditional_edges(
        "assess",
        lambda x: "reactive" if x.get("processing_mode") == "reactive" else "collect_data",
        {
            "reactive": "reactive",
            "collect_data": "collect_data"
        }
    )

    # 添加固定路径边
    workflow.add_edge("reactive", "respond")
    workflow.add_edge("collect_data", "analyze")
    workflow.add_edge("analyze", "recommend")
    workflow.add_edge("recommend", "respond")
    workflow.add_edge("respond", END)

    # 编译工作流
    return workflow.compile()


# 示例客户画像数据
SAMPLE_CUSTOMER_PROFILES = {
    "customer1": {
        "customer_id": "C10012345",
        "risk_tolerance": "平衡型",
        "investment_horizon": "中期",
        "financial_goals": ["退休规划", "子女教育金"],
        "investment_preferences": ["ESG投资", "科技行业"],
        "portfolio_value": 1500000.0,
        "current_allocations": {
            "股票": 0.40,
            "债券": 0.30,
            "现金": 0.10,
            "另类投资": 0.20
        }
    },
    "customer2": {
        "customer_id": "C10067890",
        "risk_tolerance": "进取型",
        "investment_horizon": "长期",
        "financial_goals": ["财富增长", "资产配置多元化"],
        "investment_preferences": ["新兴市场", "高成长行业"],
        "portfolio_value": 3000000.0,
        "current_allocations": {
            "股票": 0.65,
            "债券": 0.15,
            "现金": 0.05,
            "另类投资": 0.15
        }
    }
}


# 运行智能体
def run_wealth_advisor(user_query: str, customer_id: str = "customer1") -> Dict[str, Any]:
    """运行财富顾问智能体并返回结果"""

    # 创建工作流
    agent = create_wealth_advisor_workflow()

    # 获取客户画像
    customer_profile = SAMPLE_CUSTOMER_PROFILES.get(customer_id, SAMPLE_CUSTOMER_PROFILES["customer1"])

    # 准备初始状态
    initial_state = {
        "user_query": user_query,
        "customer_profile": customer_profile,
        "query_type": None,
        "processing_mode": None,
        "emergency_response": None,
        "market_data": None,
        "analysis_results": None,
        "final_response": None,
        "current_phase": "assess",
        "error": None
    }

    try:
        print("LangGraph Mermaid流程图:")
        print(agent.get_graph().draw_mermaid())

        # 运行智能体并捕获可能的异常
        result = agent.invoke(initial_state)
        return result
    except Exception as e:
        error_msg = str(e)
        print(f"捕获异常: {error_msg}")
        # 返回带有错误信息的状态
        return {
            **initial_state,
            "error": f"执行过程中发生错误: {error_msg}",
            "final_response": "很抱歉,处理您的请求时出现了问题。"
        }


# 主函数
if __name__ == "__main__":
    print("=== 混合智能体 - 财富管理投顾AI助手 ===\n")
    print("使用模型:Qwen-Turbo-2025-04-28\n")
    print("\n" + "-" * 50 + "\n")

    # 示例查询
    SAMPLE_QUERIES = [
        # 紧急/简单查询 - 适合反应式处理
        "今天上证指数的表现如何?",
        "我的投资组合中科技股占比是多少?",
        "请解释一下什么是ETF?",

        # 分析性查询 - 适合深思熟虑处理
        "根据当前市场情况,我应该如何调整投资组合以应对可能的经济衰退?",
        "考虑到我的退休目标,请评估我当前的投资策略并提供优化建议。",
        "我想为子女准备教育金,请帮我设计一个10年期的投资计划。"
    ]

    # 用户选择查询示例或输入自定义查询
    print("请选择一个示例查询或输入您自己的查询:\n")
    for i, query in enumerate(SAMPLE_QUERIES, 1):
        print(f"{i}. {query}")
    print("0. 输入自定义查询")

    choice = input("\n请输入选项数字(0-6): ")

    if choice == "0":
        user_query = input("请输入您的查询: ")
    else:
        try:
            idx = int(choice) - 1
            if 0 <= idx < len(SAMPLE_QUERIES):
                user_query = SAMPLE_QUERIES[idx]
            else:
                print("无效选择,使用默认查询")
                user_query = SAMPLE_QUERIES[0]
        except ValueError:
            print("无效输入,使用默认查询")
            user_query = SAMPLE_QUERIES[0]

    # 选择客户
    customer_id = "customer1"  # 默认客户
    customer_choice = input("\n选择客户 (1: 平衡型投资者, 2: 进取型投资者): ")
    if customer_choice == "2":
        customer_id = "customer2"

    print(f"\n用户查询: {user_query}")
    print(f"选择客户: {SAMPLE_CUSTOMER_PROFILES[customer_id]['risk_tolerance']} 投资者")
    print("\n正在处理...\n")

    try:
        # 运行智能体
        start_time = datetime.now()
        result = run_wealth_advisor(user_query, customer_id)
        end_time = datetime.now()

        # 如果有错误,显示错误信息并退出
        if result.get("error"):
            print(f"处理过程中发生错误: {result['error']}")
            print(f"\n最终响应: {result.get('final_response', '未能生成响应')}")
            process_time = (end_time - start_time).total_seconds()
            print(f"\n处理用时: {process_time:.2f}秒")
            exit(1)

        # 显示处理模式
        process_mode = result.get("processing_mode", "未知")
        if process_mode == "reactive":
            print("【处理模式: 反应式】- 快速响应简单查询")
        else:
            print("【处理模式: 深思熟虑】- 深度分析复杂查询")

        # 显示结果
        print("\n=== 响应结果 ===\n")
        print(result.get("final_response", "未生成响应"))

        # 显示处理时间
        process_time = (end_time - start_time).total_seconds()
        print(f"\n处理用时: {process_time:.2f}秒")

    except Exception as e:
        print(f"\n运行过程中发生意外错误: {str(e)}")

附:运行环境

  • Python 3.9+
  • langgraph
  • langchain
  • langchain-community
  • dashscope (通义千问 SDK)
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐