代码实现:

import os
import asyncio
from typing import Optional
import dashscope
from qwen_agent.agents import Assistant
from qwen_agent.gui import WebUI
import pandas as pd
from sqlalchemy import create_engine
from qwen_agent.tools.base import BaseTool, register_tool
import matplotlib.pyplot as plt
import io
import base64
import time
import numpy as np

# 解决中文显示问题
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'Arial Unicode MS']  # 优先使用的中文字体
plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题

# 定义资源文件根目录
ROOT_RESOURCE = os.path.join(os.path.dirname(__file__), 'resource')

# 配置 DashScope
dashscope.api_key = os.getenv('DASHSCOPE_API_KEY', '')  # 从环境变量获取 API Key
dashscope.timeout = 30  # 设置超时时间为 30 秒

# ====== 门票助手 system prompt 和函数描述 ======
system_prompt = """我是门票助手,以下是关于门票订单表相关的字段,我可能会编写对应的SQL,对数据进行查询
-- 门票订单表
CREATE TABLE tkt_orders (
    order_time DATETIME,             -- 订单日期
    account_id INT,                  -- 预定用户ID
    gov_id VARCHAR(18),              -- 商品使用人ID(身份证号)
    gender VARCHAR(10),              -- 使用人性别
    age INT,                         -- 年龄
    province VARCHAR(30),           -- 使用人省份
    SKU VARCHAR(100),                -- 商品SKU名
    product_serial_no VARCHAR(30),  -- 商品ID
    eco_main_order_id VARCHAR(20),  -- 订单ID
    sales_channel VARCHAR(20),      -- 销售渠道
    status VARCHAR(30),             -- 商品状态
    order_value DECIMAL(10,2),       -- 订单金额
    quantity INT                     -- 商品数量
);
一日门票,对应多种SKU:
Universal Studios Beijing One-Day Dated Ticket-Standard
Universal Studios Beijing One-Day Dated Ticket-Child
Universal Studios Beijing One-Day Dated Ticket-Senior
二日门票,对应多种SKU:
USB 1.5-Day Dated Ticket Standard
USB 1.5-Day Dated Ticket Discounted
一日门票、二日门票查询
SUM(CASE WHEN SKU LIKE 'Universal Studios Beijing One-Day%' THEN quantity ELSE 0 END) AS one_day_ticket_sales,
SUM(CASE WHEN SKU LIKE 'USB%' THEN quantity ELSE 0 END) AS two_day_ticket_sales
我将回答用户关于门票相关的问题

每当 exc_sql 工具返回 markdown 表格和图片时,你必须原样输出工具返回的全部内容(包括图片 markdown),不要只总结表格,也不要省略图片。这样用户才能直接看到表格和图片。
"""

functions_desc = [
    {
        "name": "exc_sql",
        "description": "对于生成的SQL,进行SQL查询",
        "parameters": {
            "type": "object",
            "properties": {
                "sql_input": {
                    "type": "string",
                    "description": "生成的SQL语句",
                }
            },
            "required": ["sql_input"],
        },
    },
]

# ====== 会话隔离 DataFrame 存储 ======
# 用于存储每个会话的 DataFrame,避免多用户数据串扰
_last_df_dict = {}


def get_session_id(kwargs):
    """根据 kwargs 获取当前会话的唯一 session_id,这里用 messages 的 id"""
    messages = kwargs.get('messages')
    if messages is not None:
        return id(messages)
    return None


# ====== exc_sql 工具类实现 ======
@register_tool('exc_sql')
class ExcSQLTool(BaseTool):
    """
    SQL查询工具,执行传入的SQL语句并返回结果,并自动进行可视化。
    """
    description = '对于生成的SQL,进行SQL查询,并自动可视化'
    parameters = [{
        'name': 'sql_input',
        'type': 'string',
        'description': '生成的SQL语句',
        'required': True
    }]

    def call(self, params: str, **kwargs) -> str:
        import json
        import matplotlib.pyplot as plt
        import io, os, time
        import numpy as np
        args = json.loads(params)
        sql_input = args['sql_input']
        database = args.get('database', 'ubr')
        engine = create_engine(
            f'mysql+mysqlconnector://student123:student321@rm-uf6z891lon6dxuqblqo.mysql.rds.aliyuncs.com:3306/{database}?charset=utf8mb4',
            connect_args={'connect_timeout': 10}, pool_size=10, max_overflow=20
        )
        try:
            df = pd.read_sql(sql_input, engine)
            md = df.head(10).to_markdown(index=False)
            # 自动创建目录
            save_dir = os.path.join(os.path.dirname(__file__), 'image_show')
            os.makedirs(save_dir, exist_ok=True)
            filename = f'bar_{int(time.time() * 1000)}.png'
            save_path = os.path.join(save_dir, filename)
            # 生成图表
            generate_chart_png(df, save_path)
            img_path = os.path.join('image_show', filename)
            img_md = f'![柱状图]({img_path})'
            return f"{md}\n\n{img_md}"
        except Exception as e:
            return f"SQL执行或可视化出错: {str(e)}"


# ========== 通用可视化函数 ==========
def generate_chart_png(df_sql, save_path):
    columns = df_sql.columns
    x = np.arange(len(df_sql))
    # 获取object类型
    object_columns = df_sql.select_dtypes(include='O').columns.tolist()
    if columns[0] in object_columns:
        object_columns.remove(columns[0])
    num_columns = df_sql.select_dtypes(exclude='O').columns.tolist()
    if len(object_columns) > 0:
        # 对数据进行透视,以便为每个日期和销售渠道创建堆积柱状图
        pivot_df = df_sql.pivot_table(index=columns[0], columns=object_columns,
                                      values=num_columns,
                                      fill_value=0)
        # 绘制堆积柱状图
        fig, ax = plt.subplots(figsize=(10, 6))
        # 为每个销售渠道和票类型创建柱状图
        bottoms = None
        for col in pivot_df.columns:
            ax.bar(pivot_df.index, pivot_df[col], bottom=bottoms, label=str(col))
            if bottoms is None:
                bottoms = pivot_df[col].copy()
            else:
                bottoms += pivot_df[col]
    else:
        print('进入到else...')
        bottom = np.zeros(len(df_sql))
        for column in columns[1:]:
            plt.bar(x, df_sql[column], bottom=bottom, label=column)
            bottom += df_sql[column]
        plt.xticks(x, df_sql[columns[0]])
    plt.legend()
    plt.title("销售统计")
    plt.xlabel(columns[0])
    plt.ylabel("门票数量")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig(save_path)
    plt.close()


# ====== 初始化门票助手服务 ======
def init_agent_service():
    """初始化门票助手服务"""
    llm_cfg = {
        'model': 'qwen-turbo-2025-04-28',
        'timeout': 30,
        'retry_count': 3,
    }
    try:
        bot = Assistant(
            llm=llm_cfg,
            name='门票助手',
            description='门票查询与订单分析',
            system_message=system_prompt,
            function_list=['exc_sql'],  # 移除绘图工具
        )
        print("助手初始化成功!")
        return bot
    except Exception as e:
        print(f"助手初始化失败: {str(e)}")
        raise


def app_tui():
    """终端交互模式

    提供命令行交互界面,支持:
    - 连续对话
    - 文件输入
    - 实时响应
    """
    try:
        # 初始化助手
        bot = init_agent_service()

        # 对话历史
        messages = []
        while True:
            try:
                # 获取用户输入
                query = input('user question: ')
                # 获取可选的文件输入
                file = input('file url (press enter if no file): ').strip()

                # 输入验证
                if not query:
                    print('user question cannot be empty!')
                    continue

                # 构建消息
                if not file:
                    messages.append({'role': 'user', 'content': query})
                else:
                    messages.append({'role': 'user', 'content': [{'text': query}, {'file': file}]})

                print("正在处理您的请求...")
                # 运行助手并处理响应
                response = []
                for response in bot.run(messages):
                    print('bot response:', response)
                messages.extend(response)
            except Exception as e:
                print(f"处理请求时出错: {str(e)}")
                print("请重试或输入新的问题")
    except Exception as e:
        print(f"启动终端模式失败: {str(e)}")


def app_gui():
    """图形界面模式,提供 Web 图形界面"""
    try:
        print("正在启动 Web 界面...")
        # 初始化助手
        bot = init_agent_service()
        # 配置聊天界面,列举3个典型门票查询问题
        chatbot_config = {
            'prompt.suggestions': [
                '2023年4、5、6月一日门票,二日门票的销量多少?帮我按照周进行统计',
                '2023年7月的不同省份的入园人数统计',
                '帮我查看2023年10月1-7日销售渠道订单金额排名',
            ]
        }
        print("Web 界面准备就绪,正在启动服务...")
        # 启动 Web 界面
        WebUI(
            bot,
            chatbot_config=chatbot_config
        ).run()
    except Exception as e:
        print(f"启动 Web 界面失败: {str(e)}")
        print("请检查网络连接和 API Key 配置")


if __name__ == '__main__':
    # 运行模式选择
    app_gui()  # 图形界面模式(默认)

在这里插入图片描述

1. 核心架构:大脑与手脚

代码中,Qwen-Agent 把复杂的 AI 应用拆解成了两个清晰的部分:

  • 大脑(Assistant):负责理解用户意图、生成 SQL、决定是否调用工具。
  • 手脚(Tools):负责连接真实的数据库,执行 SQL 并返回数据。
A. 赋予“大脑”上下文 (System Prompt)

在代码中,system_prompt 起到了至关重要的作用。

bot = Assistant(
    ...,
    system_message=system_prompt, # 这里注入了数据库表结构
    ...
)
  • 作用:Qwen-Agent 会自动将这段包含 CREATE TABLE tkt_orders... 的信息作为系统指令发送给模型。
  • 意义:这让大模型变成了“数据库专家”。它不需要真正看到数据库里的数据,只需要知道表结构(Schema),就能写出正确的 SQL 语句。
B. 赋予“手脚”工具能力 (Tool Registration)

这是 Qwen-Agent 最便利的地方。定义了一个类 ExcSQLTool,并用装饰器注册了它。

@register_tool('exc_sql')  # 1. 注册工具名
class ExcSQLTool(BaseTool):
    # 2. 定义工具描述(给模型看的)
    description = '对于生成的SQL,进行SQL查询'
    parameters = [...] # 3. 定义参数结构

    # 4. 定义具体执行逻辑(Python代码)
    def call(self, params: str, **kwargs) -> str:
        # 连接数据库、执行查询、返回Markdown表格
        ...
  • 自动化 Schema 生成:不需要像原生 Function Calling 那样手写复杂的 JSON Schema 字典传给模型。Qwen-Agent 会读取 parameters 属性,自动转换成模型能理解的格式。
  • 参数自动解析:当模型决定调用工具时,框架会自动解析模型返回的 JSON 字符串,并将其传入 call 函数的 params 参数中。

2. 自动化闭环 (The Loop)

app_tuiapp_gui 中,你只写了一行核心代码:

bot.run(messages)

这一个 run 函数背后,Qwen-Agent 帮你默默完成了以下复杂流程(ReAct Loop):

  1. 思考:将用户问题(“2023年7月各省份人数”)+ 表结构发给模型。模型判断需要写 SQL。
  2. 生成:模型生成 SQL 语句,并发出调用 exc_sql 工具的指令。
  3. 路由 & 执行:框架捕获到调用指令,找到你写的 ExcSQLTool 类,执行 call 函数(连接MySQL 数据库查询)。
  4. 观察:数据库返回 DataFrame,你的代码将其转为 Markdown 表格。
  5. 回复:框架将查询结果(表格数据)再次发给模型。模型阅读数据,生成最终的自然语言回答(“根据数据,7月份山东省人数最多…”)。

3. 可视化流程图

实际的运行流程如下:
在这里插入图片描述

4. 代码中的亮点与优势

相比于直接使用 OpenAI SDK,这段代码体现了以下优势:

  1. 零胶水代码

    • 没有写 if tool_calls: 的判断逻辑。
    • 没有写 messages.append(...) 来拼接工具结果。
    • 所有对话历史管理、工具调用链都由 Assistant 类内部处理了。
  2. 极速 UI 开发

    WebUI(bot, chatbot_config=chatbot_config).run()
    
    • 仅仅这一行代码,就提供了一个类似 ChatGPT 的网页界面。
    • 它支持流式输出、支持渲染 Markdown 表格(这对于 SQL 查询结果非常重要,因为 df.to_markdown() 生成的表格可以在 WebUI 中完美展示)。
  3. 结构化数据处理

    • 代码中利用 pandas 处理 SQL 结果,并截取前10行 (df.head(10))。防止过大的 Context Window 消耗,同时让 LLM 更容易阅读数据。

总结

这段代码是一个标准的 RAG (Retrieval-Augmented Generation) + Tool Use 案例。

  • RAG 的体现:虽然没有用向量库,但把 Schema 放在 Prompt 里,属于 Context RAG。
  • Agent 的体现:模型不仅仅是回答问题,而是拥有了“操作数据库”的实体能力。

Qwen-Agent 在这里充当了操作系统的角色,管理着用户、模型和数据库之间的所有交互细节。

相关资源,百度网盘:https://pan.baidu.com/s/1aO4rifTeo0pyoGkOuQ1R7w?pwd=2r5d

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐