基于Qwen-Agent实现门票助手及详解
基于qwen-agent实现门票助手及详解
·
代码实现:
import os
import asyncio
from typing import Optional
import dashscope
from qwen_agent.agents import Assistant
from qwen_agent.gui import WebUI
import pandas as pd
from sqlalchemy import create_engine
from qwen_agent.tools.base import BaseTool, register_tool
import matplotlib.pyplot as plt
import io
import base64
import time
import numpy as np
# 解决中文显示问题
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'Arial Unicode MS'] # 优先使用的中文字体
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
# 定义资源文件根目录
ROOT_RESOURCE = os.path.join(os.path.dirname(__file__), 'resource')
# 配置 DashScope
dashscope.api_key = os.getenv('DASHSCOPE_API_KEY', '') # 从环境变量获取 API Key
dashscope.timeout = 30 # 设置超时时间为 30 秒
# ====== 门票助手 system prompt 和函数描述 ======
system_prompt = """我是门票助手,以下是关于门票订单表相关的字段,我可能会编写对应的SQL,对数据进行查询
-- 门票订单表
CREATE TABLE tkt_orders (
order_time DATETIME, -- 订单日期
account_id INT, -- 预定用户ID
gov_id VARCHAR(18), -- 商品使用人ID(身份证号)
gender VARCHAR(10), -- 使用人性别
age INT, -- 年龄
province VARCHAR(30), -- 使用人省份
SKU VARCHAR(100), -- 商品SKU名
product_serial_no VARCHAR(30), -- 商品ID
eco_main_order_id VARCHAR(20), -- 订单ID
sales_channel VARCHAR(20), -- 销售渠道
status VARCHAR(30), -- 商品状态
order_value DECIMAL(10,2), -- 订单金额
quantity INT -- 商品数量
);
一日门票,对应多种SKU:
Universal Studios Beijing One-Day Dated Ticket-Standard
Universal Studios Beijing One-Day Dated Ticket-Child
Universal Studios Beijing One-Day Dated Ticket-Senior
二日门票,对应多种SKU:
USB 1.5-Day Dated Ticket Standard
USB 1.5-Day Dated Ticket Discounted
一日门票、二日门票查询
SUM(CASE WHEN SKU LIKE 'Universal Studios Beijing One-Day%' THEN quantity ELSE 0 END) AS one_day_ticket_sales,
SUM(CASE WHEN SKU LIKE 'USB%' THEN quantity ELSE 0 END) AS two_day_ticket_sales
我将回答用户关于门票相关的问题
每当 exc_sql 工具返回 markdown 表格和图片时,你必须原样输出工具返回的全部内容(包括图片 markdown),不要只总结表格,也不要省略图片。这样用户才能直接看到表格和图片。
"""
functions_desc = [
{
"name": "exc_sql",
"description": "对于生成的SQL,进行SQL查询",
"parameters": {
"type": "object",
"properties": {
"sql_input": {
"type": "string",
"description": "生成的SQL语句",
}
},
"required": ["sql_input"],
},
},
]
# ====== 会话隔离 DataFrame 存储 ======
# 用于存储每个会话的 DataFrame,避免多用户数据串扰
_last_df_dict = {}
def get_session_id(kwargs):
"""根据 kwargs 获取当前会话的唯一 session_id,这里用 messages 的 id"""
messages = kwargs.get('messages')
if messages is not None:
return id(messages)
return None
# ====== exc_sql 工具类实现 ======
@register_tool('exc_sql')
class ExcSQLTool(BaseTool):
"""
SQL查询工具,执行传入的SQL语句并返回结果,并自动进行可视化。
"""
description = '对于生成的SQL,进行SQL查询,并自动可视化'
parameters = [{
'name': 'sql_input',
'type': 'string',
'description': '生成的SQL语句',
'required': True
}]
def call(self, params: str, **kwargs) -> str:
import json
import matplotlib.pyplot as plt
import io, os, time
import numpy as np
args = json.loads(params)
sql_input = args['sql_input']
database = args.get('database', 'ubr')
engine = create_engine(
f'mysql+mysqlconnector://student123:student321@rm-uf6z891lon6dxuqblqo.mysql.rds.aliyuncs.com:3306/{database}?charset=utf8mb4',
connect_args={'connect_timeout': 10}, pool_size=10, max_overflow=20
)
try:
df = pd.read_sql(sql_input, engine)
md = df.head(10).to_markdown(index=False)
# 自动创建目录
save_dir = os.path.join(os.path.dirname(__file__), 'image_show')
os.makedirs(save_dir, exist_ok=True)
filename = f'bar_{int(time.time() * 1000)}.png'
save_path = os.path.join(save_dir, filename)
# 生成图表
generate_chart_png(df, save_path)
img_path = os.path.join('image_show', filename)
img_md = f''
return f"{md}\n\n{img_md}"
except Exception as e:
return f"SQL执行或可视化出错: {str(e)}"
# ========== 通用可视化函数 ==========
def generate_chart_png(df_sql, save_path):
columns = df_sql.columns
x = np.arange(len(df_sql))
# 获取object类型
object_columns = df_sql.select_dtypes(include='O').columns.tolist()
if columns[0] in object_columns:
object_columns.remove(columns[0])
num_columns = df_sql.select_dtypes(exclude='O').columns.tolist()
if len(object_columns) > 0:
# 对数据进行透视,以便为每个日期和销售渠道创建堆积柱状图
pivot_df = df_sql.pivot_table(index=columns[0], columns=object_columns,
values=num_columns,
fill_value=0)
# 绘制堆积柱状图
fig, ax = plt.subplots(figsize=(10, 6))
# 为每个销售渠道和票类型创建柱状图
bottoms = None
for col in pivot_df.columns:
ax.bar(pivot_df.index, pivot_df[col], bottom=bottoms, label=str(col))
if bottoms is None:
bottoms = pivot_df[col].copy()
else:
bottoms += pivot_df[col]
else:
print('进入到else...')
bottom = np.zeros(len(df_sql))
for column in columns[1:]:
plt.bar(x, df_sql[column], bottom=bottom, label=column)
bottom += df_sql[column]
plt.xticks(x, df_sql[columns[0]])
plt.legend()
plt.title("销售统计")
plt.xlabel(columns[0])
plt.ylabel("门票数量")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(save_path)
plt.close()
# ====== 初始化门票助手服务 ======
def init_agent_service():
"""初始化门票助手服务"""
llm_cfg = {
'model': 'qwen-turbo-2025-04-28',
'timeout': 30,
'retry_count': 3,
}
try:
bot = Assistant(
llm=llm_cfg,
name='门票助手',
description='门票查询与订单分析',
system_message=system_prompt,
function_list=['exc_sql'], # 移除绘图工具
)
print("助手初始化成功!")
return bot
except Exception as e:
print(f"助手初始化失败: {str(e)}")
raise
def app_tui():
"""终端交互模式
提供命令行交互界面,支持:
- 连续对话
- 文件输入
- 实时响应
"""
try:
# 初始化助手
bot = init_agent_service()
# 对话历史
messages = []
while True:
try:
# 获取用户输入
query = input('user question: ')
# 获取可选的文件输入
file = input('file url (press enter if no file): ').strip()
# 输入验证
if not query:
print('user question cannot be empty!')
continue
# 构建消息
if not file:
messages.append({'role': 'user', 'content': query})
else:
messages.append({'role': 'user', 'content': [{'text': query}, {'file': file}]})
print("正在处理您的请求...")
# 运行助手并处理响应
response = []
for response in bot.run(messages):
print('bot response:', response)
messages.extend(response)
except Exception as e:
print(f"处理请求时出错: {str(e)}")
print("请重试或输入新的问题")
except Exception as e:
print(f"启动终端模式失败: {str(e)}")
def app_gui():
"""图形界面模式,提供 Web 图形界面"""
try:
print("正在启动 Web 界面...")
# 初始化助手
bot = init_agent_service()
# 配置聊天界面,列举3个典型门票查询问题
chatbot_config = {
'prompt.suggestions': [
'2023年4、5、6月一日门票,二日门票的销量多少?帮我按照周进行统计',
'2023年7月的不同省份的入园人数统计',
'帮我查看2023年10月1-7日销售渠道订单金额排名',
]
}
print("Web 界面准备就绪,正在启动服务...")
# 启动 Web 界面
WebUI(
bot,
chatbot_config=chatbot_config
).run()
except Exception as e:
print(f"启动 Web 界面失败: {str(e)}")
print("请检查网络连接和 API Key 配置")
if __name__ == '__main__':
# 运行模式选择
app_gui() # 图形界面模式(默认)

1. 核心架构:大脑与手脚
代码中,Qwen-Agent 把复杂的 AI 应用拆解成了两个清晰的部分:
- 大脑(Assistant):负责理解用户意图、生成 SQL、决定是否调用工具。
- 手脚(Tools):负责连接真实的数据库,执行 SQL 并返回数据。
A. 赋予“大脑”上下文 (System Prompt)
在代码中,system_prompt 起到了至关重要的作用。
bot = Assistant(
...,
system_message=system_prompt, # 这里注入了数据库表结构
...
)
- 作用:Qwen-Agent 会自动将这段包含
CREATE TABLE tkt_orders...的信息作为系统指令发送给模型。 - 意义:这让大模型变成了“数据库专家”。它不需要真正看到数据库里的数据,只需要知道表结构(Schema),就能写出正确的 SQL 语句。
B. 赋予“手脚”工具能力 (Tool Registration)
这是 Qwen-Agent 最便利的地方。定义了一个类 ExcSQLTool,并用装饰器注册了它。
@register_tool('exc_sql') # 1. 注册工具名
class ExcSQLTool(BaseTool):
# 2. 定义工具描述(给模型看的)
description = '对于生成的SQL,进行SQL查询'
parameters = [...] # 3. 定义参数结构
# 4. 定义具体执行逻辑(Python代码)
def call(self, params: str, **kwargs) -> str:
# 连接数据库、执行查询、返回Markdown表格
...
- 自动化 Schema 生成:不需要像原生 Function Calling 那样手写复杂的 JSON Schema 字典传给模型。Qwen-Agent 会读取
parameters属性,自动转换成模型能理解的格式。 - 参数自动解析:当模型决定调用工具时,框架会自动解析模型返回的 JSON 字符串,并将其传入
call函数的params参数中。
2. 自动化闭环 (The Loop)
在 app_tui 或 app_gui 中,你只写了一行核心代码:
bot.run(messages)
这一个 run 函数背后,Qwen-Agent 帮你默默完成了以下复杂流程(ReAct Loop):
- 思考:将用户问题(“2023年7月各省份人数”)+ 表结构发给模型。模型判断需要写 SQL。
- 生成:模型生成 SQL 语句,并发出调用
exc_sql工具的指令。 - 路由 & 执行:框架捕获到调用指令,找到你写的
ExcSQLTool类,执行call函数(连接MySQL 数据库查询)。 - 观察:数据库返回 DataFrame,你的代码将其转为 Markdown 表格。
- 回复:框架将查询结果(表格数据)再次发给模型。模型阅读数据,生成最终的自然语言回答(“根据数据,7月份山东省人数最多…”)。
3. 可视化流程图
实际的运行流程如下:
4. 代码中的亮点与优势
相比于直接使用 OpenAI SDK,这段代码体现了以下优势:
-
零胶水代码:
- 没有写
if tool_calls:的判断逻辑。 - 没有写
messages.append(...)来拼接工具结果。 - 所有对话历史管理、工具调用链都由
Assistant类内部处理了。
- 没有写
-
极速 UI 开发:
WebUI(bot, chatbot_config=chatbot_config).run()- 仅仅这一行代码,就提供了一个类似 ChatGPT 的网页界面。
- 它支持流式输出、支持渲染 Markdown 表格(这对于 SQL 查询结果非常重要,因为
df.to_markdown()生成的表格可以在 WebUI 中完美展示)。
-
结构化数据处理:
- 代码中利用
pandas处理 SQL 结果,并截取前10行 (df.head(10))。防止过大的 Context Window 消耗,同时让 LLM 更容易阅读数据。
- 代码中利用
总结
这段代码是一个标准的 RAG (Retrieval-Augmented Generation) + Tool Use 案例。
- RAG 的体现:虽然没有用向量库,但把 Schema 放在 Prompt 里,属于 Context RAG。
- Agent 的体现:模型不仅仅是回答问题,而是拥有了“操作数据库”的实体能力。
Qwen-Agent 在这里充当了操作系统的角色,管理着用户、模型和数据库之间的所有交互细节。
相关资源,百度网盘:https://pan.baidu.com/s/1aO4rifTeo0pyoGkOuQ1R7w?pwd=2r5d
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐

所有评论(0)