实战 LangGraph:打造“快思考”与“慢思考”结合的财富管理 AI 智能体
实战 LangGraph:打造“快思考”与“慢思考”结合的财富管理 AI 智能体
前言
在构建 AI 应用(Agent)时,我们经常面临一个权衡:是追求响应速度,还是追求逻辑深度?
- 反应式(Reactive):像条件反射一样,查汇率、查股价,要求秒级响应。
- 深思熟虑(Deliberative):像专家会诊一样,做资产配置、退休规划,需要多步推理和数据综合。
本文将基于 LangGraph 和 通义千问 (Qwen),实现一个混合智能体 (Hybrid Agent)。它具备“大脑皮层”和“脑干”双重架构,能够根据用户问题的复杂程度,自动在“即时响应模式”和“深度分析模式”之间切换。
1. 系统架构设计
本系统的核心在于 协调层(Router),它像一个交通指挥官,将请求分发给不同的处理单元。
1.1 架构分层
- 感知层 (Assess):分析用户意图,判断是紧急查询还是复杂规划。
- 反应层 (Reactive):配备工具(Tools),直接调用 API 回答问题(如查询上证指数)。
- 认知层 (Deliberative):包含数据收集、深度分析、策略生成三个顺序步骤。
1.2 状态流转图 (Mermaid)

2. 核心代码实现
2.1 环境准备与状态定义
使用 LangGraph 的核心在于定义一个全局状态(State),它贯穿整个工作流。
from typing import TypedDict, Optional, Literal, Dict, Any
from langgraph.graph import StateGraph, END
# 定义智能体的“记忆”结构
class WealthAdvisorState(TypedDict):
# 输入
user_query: str # 用户问题
customer_profile: Optional[Dict] # 客户画像
# 内部状态
query_type: Optional[Literal["emergency", "informational", "analytical"]]
processing_mode: Optional[Literal["reactive", "deliberative"]] # 关键决策变量
market_data: Optional[Dict] # 收集到的数据
analysis_results: Optional[Dict] # 分析结果
# 输出
final_response: Optional[str] # 最终回复
error: Optional[str]
2.2 协调层:大脑的决策 (Assess)
这是智能体的入口。我们通过 Prompt 让 LLM 判断用户的意图。
# 提示词简略版:请判断查询是 emergency 还是 analytical,返回 JSON
ASSESSMENT_PROMPT = """...请评估以下用户查询...
1. 查询类型: emergency / analytical
2. 建议的处理模式: reactive / deliberative..."""
def assess_query(state: WealthAdvisorState) -> WealthAdvisorState:
"""评估节点"""
# 调用 LLM 进行分类
chain = prompt | llm | JsonOutputParser()
result = chain.invoke({"user_query": state["user_query"]})
# 更新状态中的 processing_mode,决定后续走向
return {
**state,
"query_type": result.get("query_type", "emergency"),
"processing_mode": result.get("processing_mode", "reactive"),
}
2.3 反应式路径:快思考 (Reactive)
利用 LangChain 经典的 Agent 结构,绑定工具(Tools),实现“看-想-做”的快速闭环。
def query_shanghai_index(_: str = "") -> str:
"""模拟工具:查询上证指数"""
return "上证指数 当前点位: 3125.62,涨跌: +0.20%(模拟数据)"
def reactive_processing(state: WealthAdvisorState) -> WealthAdvisorState:
"""反应式节点"""
tools = [Tool(name="上证指数查询", func=query_shanghai_index, ...)]
# 初始化一个能够调用工具的 Agent
agent_executor = AgentExecutor.from_agent_and_tools(...)
# 执行并获取结果
result = agent_executor.run(state["user_query"])
return {**state, "final_response": result}
2.4 深思熟虑路径:慢思考 (Deliberative)
这是一条长链条,分为三个步骤,模拟人类顾问的工作流。
- Collect Data: 决定需要哪些数据(宏观经济、行业趋势等)。
- Analyze: 结合客户画像(风险偏好)和收集到的数据进行推理。
- Recommend: 生成人性化的建议书。
def collect_data(state: WealthAdvisorState) -> WealthAdvisorState:
"""步骤1:数据收集"""
# LLM 决定需要什么数据,并模拟获取(实际项目中这里会调用外部API)
result = chain.invoke(...)
return {**state, "market_data": result.get("collected_data")}
def analyze_data(state: WealthAdvisorState) -> WealthAdvisorState:
"""步骤2:深度分析"""
# 输入:用户查询 + 客户画像 + 市场数据
result = chain.invoke(...)
return {**state, "analysis_results": result}
def generate_recommendations(state: WealthAdvisorState) -> WealthAdvisorState:
"""步骤3:生成建议"""
# 将复杂的分析结果转化为自然语言
result = chain.invoke(...)
return {**state, "final_response": result}
2.5 构建图 (Graph Construction)
这是 LangGraph 最精彩的部分。我们将节点连接起来,并定义条件边(Conditional Edges)。
def create_wealth_advisor_workflow() -> StateGraph:
workflow = StateGraph(WealthAdvisorState)
# 1. 添加节点
workflow.add_node("assess", assess_query)
workflow.add_node("reactive", reactive_processing)
workflow.add_node("collect_data", collect_data)
workflow.add_node("analyze", analyze_data)
workflow.add_node("recommend", generate_recommendations)
workflow.add_node("respond", respond_function)
# 2. 设置入口
workflow.set_entry_point("assess")
# 3. 添加条件路由(关键!)
# 如果 mode 是 reactive,走 reactive 节点;否则走 collect_data 节点
workflow.add_conditional_edges(
"assess",
lambda x: "reactive" if x.get("processing_mode") == "reactive" else "collect_data",
{
"reactive": "reactive",
"collect_data": "collect_data"
}
)
# 4. 添加固定边(深思熟虑模式的顺序流)
workflow.add_edge("collect_data", "analyze")
workflow.add_edge("analyze", "recommend")
workflow.add_edge("recommend", "respond")
# 反应式模式也汇聚到响应节点
workflow.add_edge("reactive", "respond")
workflow.add_edge("respond", END)
return workflow.compile()
3. 运行效果演示
我们使用相同的智能体,测试两种不同类型的查询。
场景 A:快速查询
用户输入:“今天上证指数表现如何?”
智能体日志:
assess节点:识别为emergency类型,模式reactive。reactive节点:调用工具上证指数查询。- 输出:“上证指数当前点位 3125.62,上涨 0.20%。”
(耗时:约 1.5 秒)
场景 B:复杂规划
用户输入:“考虑到我准备退休,请根据当前市场评估我的投资组合。”(客户画像:平衡型)
智能体日志:
assess节点:识别为analytical类型,模式deliberative。collect_data节点:模拟收集了债券收益率、通胀数据。analyze节点:评估发现客户当前股票占比过高,风险暴露较大。recommend节点:生成建议报告。- 输出:“建议您将股票仓位从 60% 降至 40%,增加固收类资产配置…”
(耗时:约 8 秒,经过了深度的推理链)
4. 总结
通过 LangGraph,我们成功打破了传统 LLM 应用“一条道走到黑”的线性限制。这个混合智能体的优势在于:
- 成本效益:简单问题不需要消耗大量 Token 去做深度推理。
- 专业性:复杂问题通过分步骤(Chain of Thought)处理,逻辑更严密,幻觉更少。
- 可扩展性:可以在
reactive层添加更多 API,也可以在deliberative层增加“风险控制审核”等更多环节。
下一步优化方向:
- 接入真实的金融数据库(如 Tushare, Bloomberg)。
- 在
deliberative模式中加入“人机交互”节点(Human-in-the-loop),在发出建议前由人工顾问确认。
5. 完整代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
混合智能体(Hybrid Agent)- 财富管理投顾AI助手
基于LangGraph实现的混合型智能体,结合反应式架构的即时响应能力和深思熟虑架构的长期规划能力,
通过协调层动态切换处理模式,提供智能化财富管理咨询服务。
三层架构:
1. 底层(反应式):即时响应客户查询,提供快速反馈
2. 中层(协调):评估任务类型和优先级,动态选择处理模式
3. 顶层(深思熟虑):进行复杂的投资分析和长期财务规划
"""
import os
import json
import re
from datetime import datetime
from typing import Dict, List, Any, Literal, TypedDict, Optional, Union, Tuple, cast
from langchain_classic.agents import AgentOutputParser, LLMSingleActionAgent, AgentExecutor
from langchain_classic.chains.llm import LLMChain
from langchain_core.agents import AgentFinish
from langchain_core.prompts import ChatPromptTemplate, StringPromptTemplate
from langchain_community.llms import Tongyi
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.tools import Tool
from langgraph.graph import StateGraph, END
import requests # 新增:用于实时行情API请求
import warnings
from pydantic.v1 import BaseModel, Field
warnings.filterwarnings("ignore")
# 设置API密钥
DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY")
# 创建LLM实例
llm = Tongyi(model_name="qwen3-max", dashscope_api_key=DASHSCOPE_API_KEY)
# 定义客户信息数据结构
class CustomerProfile(BaseModel):
"""客户画像信息"""
customer_id: str = Field(..., description="客户ID")
risk_tolerance: Literal["保守型", "稳健型", "平衡型", "成长型", "进取型"] = Field(..., description="风险承受能力")
investment_horizon: Literal["短期", "中期", "长期"] = Field(..., description="投资期限")
financial_goals: List[str] = Field(..., description="财务目标")
investment_preferences: List[str] = Field(..., description="投资偏好")
portfolio_value: float = Field(..., description="投资组合总价值")
current_allocations: Dict[str, float] = Field(..., description="当前资产配置")
# 定义应急响应输出
class EmergencyResponseOutput(BaseModel):
"""紧急查询的即时响应"""
response_type: str = Field(..., description="响应类型")
direct_answer: str = Field(..., description="直接回答")
data_points: Optional[Dict[str, Any]] = Field(None, description="相关数据点")
suggested_actions: Optional[List[str]] = Field(None, description="建议操作")
# 定义深度分析输出
class InvestmentAnalysisOutput(BaseModel):
"""深度投资分析结果"""
market_assessment: str = Field(..., description="市场评估")
portfolio_analysis: Dict[str, Any] = Field(..., description="投资组合分析")
recommendations: List[Dict[str, Any]] = Field(..., description="投资建议")
risk_analysis: Dict[str, Any] = Field(..., description="风险分析")
expected_outcomes: Dict[str, Any] = Field(..., description="预期结果")
# 定义状态类型
class WealthAdvisorState(TypedDict):
"""财富顾问智能体的状态"""
# 输入
user_query: str # 用户查询
customer_profile: Optional[Dict[str, Any]] # 客户画像
# 处理状态
query_type: Optional[Literal["emergency", "informational", "analytical"]] # 查询类型
processing_mode: Optional[Literal["reactive", "deliberative"]] # 处理模式
emergency_response: Optional[Dict[str, Any]] # 紧急响应结果
market_data: Optional[Dict[str, Any]] # 市场数据
analysis_results: Optional[Dict[str, Any]] # 分析结果
# 输出
final_response: Optional[str] # 最终响应
# 控制流
current_phase: Optional[str]
error: Optional[str] # 错误信息
# 提示模板
ASSESSMENT_PROMPT = """你是一个财富管理投顾AI助手的协调层。请评估以下用户查询,确定其类型和应该采用的处理模式。
用户查询: {user_query}
请判断:
1. 查询类型:
- "emergency": 紧急的或直接的查询,需要立即响应(如市场状况、账户信息、产品信息等)
- "informational": 信息性的查询,需要特定领域知识(如税务政策、投资工具介绍等)
- "analytical": 需要深度分析的查询(如投资组合优化、长期理财规划等)
2. 建议的处理模式:
- "reactive": 适用于需要快速反应的查询
- "deliberative": 适用于需要深度思考和分析的查询
请以JSON格式返回结果,包含以下字段:
- query_type: 查询类型(上述三种类型之一)
- processing_mode: 处理模式(上述两种模式之一)
- reasoning: 决策理由的简要说明
"""
REACTIVE_PROMPT = """你是一个财富管理投顾AI助手,专注于提供快速准确的响应。请针对用户的查询提供直接的回答。
用户查询: {user_query}
客户信息:
{customer_profile}
请提供:
1. 直接回答用户问题
2. 相关的关键数据点(如适用)
3. 建议的后续操作(如适用)
以JSON格式返回响应,包含以下字段:
- response_type: 响应类型
- direct_answer: 直接回答
- data_points: 相关数据点(可选)
- suggested_actions: 建议操作(可选)
"""
DATA_COLLECTION_PROMPT = """你是一个财富管理投顾AI助手的数据收集模块。基于以下用户查询,确定需要收集哪些市场和财务数据进行深入分析。
用户查询: {user_query}
客户信息:
{customer_profile}
请确定需要收集的数据类型,例如:
- 资产类别表现数据
- 经济指标
- 行业趋势
- 历史回报率
- 风险指标
- 税收信息
- 其他相关数据
以JSON格式返回结果,包含以下字段:
- required_data_types: 需要收集的数据类型列表
- data_sources: 建议的数据来源列表
- collected_data: 模拟收集的数据(为简化示例,请生成合理的模拟数据)
"""
ANALYSIS_PROMPT = """你是一个财富管理投顾AI助手的分析引擎。请根据收集的数据对用户的投资情况进行深入分析。
用户查询: {user_query}
客户信息:
{customer_profile}
市场数据:
{market_data}
请提供全面的投资分析,包括:
1. 当前市场状况评估
2. 客户投资组合分析
3. 个性化投资建议
4. 风险评估
5. 预期结果和回报预测
以JSON格式返回分析结果,包含以下字段:
- market_assessment: 市场评估
- portfolio_analysis: 投资组合分析
- recommendations: 投资建议列表
- risk_analysis: 风险分析
- expected_outcomes: 预期结果
"""
RECOMMENDATION_PROMPT = """你是一个财富管理投顾AI助手。请根据深入分析结果,为客户准备最终的咨询建议。
用户查询: {user_query}
客户信息:
{customer_profile}
分析结果:
{analysis_results}
请提供专业、个性化且详细的投资建议,语言应友好易懂,避免过多专业术语。建议应包括:
1. 总体投资策略
2. 具体行动步骤
3. 资产配置建议
4. 风险管理策略
5. 时间框架
6. 预期收益
7. 后续跟进计划
返回格式应为自然语言文本,适合直接呈现给客户。
"""
def query_shanghai_index(_: str = "") -> str:
"""上证指数实时查询工具(模拟版),返回固定的行情数据"""
# 直接返回模拟数据,避免外部API不可用导致报错
name = "上证指数"
price = "3125.62"
change = "6.32"
pct = "0.20"
return f"{name} 当前点位: {price},涨跌: {change},涨跌幅: {pct}%(模拟数据)"
# 第一阶段:情境评估 - 确定查询类型和处理模式
def assess_query(state: WealthAdvisorState) -> WealthAdvisorState:
print("[DEBUG] 进入节点: assess_query")
"""评估用户查询,确定类型和处理模式"""
try:
# 准备提示
prompt = ChatPromptTemplate.from_template(ASSESSMENT_PROMPT)
# 构建输入
input_data = {
"user_query": state["user_query"],
}
# 调用LLM
chain = prompt | llm | JsonOutputParser()
result = chain.invoke(input_data)
print("[DEBUG] LLM评估输出:", result)
print(
f"[DEBUG] 分支判断: processing_mode={result.get('processing_mode', '未知')}, query_type={result.get('query_type', '未知')}")
# 获取处理模式,确保有值
processing_mode = result.get("processing_mode", "reactive")
if processing_mode not in ["reactive", "deliberative"]:
processing_mode = "reactive" # 默认使用反应式处理
# 获取查询类型,确保有值
query_type = result.get("query_type", "emergency")
if query_type not in ["emergency", "informational", "analytical"]:
query_type = "emergency" # 默认为紧急查询
# ==========================
# 更新状态
updated_state = {
**state,
"query_type": query_type,
"processing_mode": processing_mode,
}
return updated_state
except Exception as e:
return {
**state,
"error": f"评估阶段出错: {str(e)}",
"final_response": "评估查询时发生错误,无法处理您的请求。"
}
# 反应式处理 - 快速响应简单查询
def reactive_processing(state: WealthAdvisorState) -> WealthAdvisorState:
print("[DEBUG] 进入节点: reactive_processing")
"""反应式处理模式,提供快速响应,支持工具调用"""
try:
# 定义工具列表
tools = [
Tool(
name="上证指数查询",
func=query_shanghai_index,
description="用于查询上证指数的最新行情,输入内容可为空或任意字符串"
),
]
# 可扩展:此处可继续添加其他反应式工具
# 构建Agent提示模板
class SimplePromptTemplate(StringPromptTemplate):
def format(self, **kwargs):
return f"用户问题: {kwargs['input']}\n请根据需要调用工具,直接给出答案。"
prompt = SimplePromptTemplate(input_variables=["input", "intermediate_steps"])
llm_chain = LLMChain(llm=llm, prompt=prompt)
tool_names = [tool.name for tool in tools]
# 修正:继承AgentOutputParser,确保兼容性
class SimpleOutputParser(AgentOutputParser):
def parse(self, text):
# 直接将LLM输出作为最终答案
return AgentFinish(return_values={"output": text.strip()}, log=text)
output_parser = SimpleOutputParser()
agent = LLMSingleActionAgent(
llm_chain=llm_chain,
output_parser=output_parser,
stop=["\nObservation:"],
allowed_tools=tool_names,
)
agent_executor = AgentExecutor.from_agent_and_tools(
agent=agent, tools=tools, verbose=False
)
# 运行Agent
user_query = state["user_query"]
result = agent_executor.run(user_query)
return {
**state,
"final_response": result
}
except Exception as e:
return {
**state,
"error": f"反应式处理出错: {str(e)}",
"final_response": "处理您的查询时发生错误,无法提供响应。"
}
# 数据收集 - 收集进行深度分析所需的数据
def collect_data(state: WealthAdvisorState) -> WealthAdvisorState:
print("[DEBUG] 进入节点: collect_data")
"""收集市场数据和客户信息进行深入分析"""
try:
# 准备提示
prompt = ChatPromptTemplate.from_template(DATA_COLLECTION_PROMPT)
# 构建输入
input_data = {
"user_query": state["user_query"],
"customer_profile": json.dumps(state.get("customer_profile", {}), ensure_ascii=False, indent=2)
}
# 调用LLM
chain = prompt | llm | JsonOutputParser()
result = chain.invoke(input_data)
# 更新状态
return {
**state,
"market_data": result.get("collected_data", {}),
"current_phase": "analyze"
}
except Exception as e:
return {
**state,
"error": f"数据收集阶段出错: {str(e)}",
"current_phase": "collect_data" # 保持在当前阶段
}
# 深度分析 - 分析数据和客户情况
def analyze_data(state: WealthAdvisorState) -> WealthAdvisorState:
print("[DEBUG] 进入节点: analyze_data")
"""进行深度投资分析"""
try:
# 确保必要数据已收集
if not state.get("market_data"):
return {
**state,
"error": "分析阶段缺少市场数据",
"current_phase": "collect_data" # 回到数据收集阶段
}
# 准备提示
prompt = ChatPromptTemplate.from_template(ANALYSIS_PROMPT)
# 构建输入
input_data = {
"user_query": state["user_query"],
"customer_profile": json.dumps(state.get("customer_profile", {}), ensure_ascii=False, indent=2),
"market_data": json.dumps(state.get("market_data", {}), ensure_ascii=False, indent=2)
}
# 调用LLM
chain = prompt | llm | JsonOutputParser()
result = chain.invoke(input_data)
# 更新状态
return {
**state,
"analysis_results": result,
"current_phase": "recommend"
}
except Exception as e:
return {
**state,
"error": f"分析阶段出错: {str(e)}",
"current_phase": "analyze" # 保持在当前阶段
}
# 生成建议 - 根据分析结果提供投资建议
def generate_recommendations(state: WealthAdvisorState) -> WealthAdvisorState:
print("[DEBUG] 进入节点: generate_recommendations")
"""生成投资建议和行动计划"""
try:
# 确保分析结果已存在
if not state.get("analysis_results"):
return {
**state,
"error": "建议生成阶段缺少分析结果",
"current_phase": "analyze" # 回到分析阶段
}
# 准备提示
prompt = ChatPromptTemplate.from_template(RECOMMENDATION_PROMPT)
# 构建输入
input_data = {
"user_query": state["user_query"],
"customer_profile": json.dumps(state.get("customer_profile", {}), ensure_ascii=False, indent=2),
"analysis_results": json.dumps(state.get("analysis_results", {}), ensure_ascii=False, indent=2)
}
# 调用LLM
chain = prompt | llm | StrOutputParser()
result = chain.invoke(input_data)
# 更新状态
return {
**state,
"final_response": result,
"current_phase": "respond"
}
except Exception as e:
return {
**state,
"error": f"建议生成阶段出错: {str(e)}",
"current_phase": "recommend" # 保持在当前阶段
}
# 创建智能体工作流
def create_wealth_advisor_workflow() -> StateGraph:
"""创建财富顾问混合智能体工作流"""
# 创建状态图
workflow = StateGraph(WealthAdvisorState)
# 添加节点,每个节点都确保返回完整的状态
workflow.add_node("assess", assess_query)
workflow.add_node("reactive", reactive_processing)
workflow.add_node("collect_data", collect_data)
workflow.add_node("analyze", analyze_data)
workflow.add_node("recommend", generate_recommendations)
# 定义一个显式的响应节点函数
def respond_function(state: WealthAdvisorState) -> WealthAdvisorState:
"""最终响应生成节点,原样返回状态"""
# 确保final_response字段有值
if not state.get("final_response"):
state = {
**state,
"final_response": "无法生成响应。请检查处理流程。",
"error": state.get("error", "未知错误")
}
return state
workflow.add_node("respond", respond_function)
# 设置入口点
workflow.set_entry_point("assess")
# 添加分支路由
workflow.add_conditional_edges(
"assess",
lambda x: "reactive" if x.get("processing_mode") == "reactive" else "collect_data",
{
"reactive": "reactive",
"collect_data": "collect_data"
}
)
# 添加固定路径边
workflow.add_edge("reactive", "respond")
workflow.add_edge("collect_data", "analyze")
workflow.add_edge("analyze", "recommend")
workflow.add_edge("recommend", "respond")
workflow.add_edge("respond", END)
# 编译工作流
return workflow.compile()
# 示例客户画像数据
SAMPLE_CUSTOMER_PROFILES = {
"customer1": {
"customer_id": "C10012345",
"risk_tolerance": "平衡型",
"investment_horizon": "中期",
"financial_goals": ["退休规划", "子女教育金"],
"investment_preferences": ["ESG投资", "科技行业"],
"portfolio_value": 1500000.0,
"current_allocations": {
"股票": 0.40,
"债券": 0.30,
"现金": 0.10,
"另类投资": 0.20
}
},
"customer2": {
"customer_id": "C10067890",
"risk_tolerance": "进取型",
"investment_horizon": "长期",
"financial_goals": ["财富增长", "资产配置多元化"],
"investment_preferences": ["新兴市场", "高成长行业"],
"portfolio_value": 3000000.0,
"current_allocations": {
"股票": 0.65,
"债券": 0.15,
"现金": 0.05,
"另类投资": 0.15
}
}
}
# 运行智能体
def run_wealth_advisor(user_query: str, customer_id: str = "customer1") -> Dict[str, Any]:
"""运行财富顾问智能体并返回结果"""
# 创建工作流
agent = create_wealth_advisor_workflow()
# 获取客户画像
customer_profile = SAMPLE_CUSTOMER_PROFILES.get(customer_id, SAMPLE_CUSTOMER_PROFILES["customer1"])
# 准备初始状态
initial_state = {
"user_query": user_query,
"customer_profile": customer_profile,
"query_type": None,
"processing_mode": None,
"emergency_response": None,
"market_data": None,
"analysis_results": None,
"final_response": None,
"current_phase": "assess",
"error": None
}
try:
print("LangGraph Mermaid流程图:")
print(agent.get_graph().draw_mermaid())
# 运行智能体并捕获可能的异常
result = agent.invoke(initial_state)
return result
except Exception as e:
error_msg = str(e)
print(f"捕获异常: {error_msg}")
# 返回带有错误信息的状态
return {
**initial_state,
"error": f"执行过程中发生错误: {error_msg}",
"final_response": "很抱歉,处理您的请求时出现了问题。"
}
# 主函数
if __name__ == "__main__":
print("=== 混合智能体 - 财富管理投顾AI助手 ===\n")
print("使用模型:Qwen-Turbo-2025-04-28\n")
print("\n" + "-" * 50 + "\n")
# 示例查询
SAMPLE_QUERIES = [
# 紧急/简单查询 - 适合反应式处理
"今天上证指数的表现如何?",
"我的投资组合中科技股占比是多少?",
"请解释一下什么是ETF?",
# 分析性查询 - 适合深思熟虑处理
"根据当前市场情况,我应该如何调整投资组合以应对可能的经济衰退?",
"考虑到我的退休目标,请评估我当前的投资策略并提供优化建议。",
"我想为子女准备教育金,请帮我设计一个10年期的投资计划。"
]
# 用户选择查询示例或输入自定义查询
print("请选择一个示例查询或输入您自己的查询:\n")
for i, query in enumerate(SAMPLE_QUERIES, 1):
print(f"{i}. {query}")
print("0. 输入自定义查询")
choice = input("\n请输入选项数字(0-6): ")
if choice == "0":
user_query = input("请输入您的查询: ")
else:
try:
idx = int(choice) - 1
if 0 <= idx < len(SAMPLE_QUERIES):
user_query = SAMPLE_QUERIES[idx]
else:
print("无效选择,使用默认查询")
user_query = SAMPLE_QUERIES[0]
except ValueError:
print("无效输入,使用默认查询")
user_query = SAMPLE_QUERIES[0]
# 选择客户
customer_id = "customer1" # 默认客户
customer_choice = input("\n选择客户 (1: 平衡型投资者, 2: 进取型投资者): ")
if customer_choice == "2":
customer_id = "customer2"
print(f"\n用户查询: {user_query}")
print(f"选择客户: {SAMPLE_CUSTOMER_PROFILES[customer_id]['risk_tolerance']} 投资者")
print("\n正在处理...\n")
try:
# 运行智能体
start_time = datetime.now()
result = run_wealth_advisor(user_query, customer_id)
end_time = datetime.now()
# 如果有错误,显示错误信息并退出
if result.get("error"):
print(f"处理过程中发生错误: {result['error']}")
print(f"\n最终响应: {result.get('final_response', '未能生成响应')}")
process_time = (end_time - start_time).total_seconds()
print(f"\n处理用时: {process_time:.2f}秒")
exit(1)
# 显示处理模式
process_mode = result.get("processing_mode", "未知")
if process_mode == "reactive":
print("【处理模式: 反应式】- 快速响应简单查询")
else:
print("【处理模式: 深思熟虑】- 深度分析复杂查询")
# 显示结果
print("\n=== 响应结果 ===\n")
print(result.get("final_response", "未生成响应"))
# 显示处理时间
process_time = (end_time - start_time).total_seconds()
print(f"\n处理用时: {process_time:.2f}秒")
except Exception as e:
print(f"\n运行过程中发生意外错误: {str(e)}")
附:运行环境
- Python 3.9+
langgraphlangchainlangchain-communitydashscope(通义千问 SDK)
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐

所有评论(0)