智能BI落地的难点,在于对于结果的准确度要高,主要工作会包含数据治理,而非rag知识库搭建,在BI场景中rag知识库可用于企业黑话描述(企业中的术语),来优化LLM理解用户问题。基于这个实现背景下,方案设计如下。

一、核心思想

数据仓库中有上千甚至上万张表,进行分层存储,但是大部分进行数据应用时,只会使用到“应用集市层(ads/dm层)” 或者 “预聚合层(dws层)” 中的几张表。

基于这个思想我们做了以下几个数据治理动作(很重要):

 1. 分段:设计了数据应用概念,每个数据应用中包含多张表(其实用一张宽表对LLM更好,但是不灵活),这个动作和rag知识库录入时的分段很像。
 2. 总结:对数据应用进行描述,类似于给rag段进行总结,LLM进行选择(检索)数据应用时,基于描述选择最适合的应用来解决问题
 3. 案例sql:给数据应用添加案例sql,prompt中暗示LLM一定要参考案例sql进行问题解决
 4. 完善资产:数仓中有些表的描述、血缘不太清楚或者缺失,完善数据表的字段描述含义、不同表之间的字段关联方式。

基于以上设计,评估自研工具和调研开源工具后,引入了OpenMetadata平台,有开源和商业两种方案,以下OpenMetadata工具截图:

为了获取Finebi中的数据集和BI看板资产,基于OpenMetadata api 自研了Finebi connector来同步元数据
请添加图片描述

二、架构设计

2.1、整体架构

  1. 三层架构
    • 表示层(UI):
      • 钉钉机器人聊天窗口(
      • 按照企业日常工作工具进行选择,例如企业微信,slack等
    • 业务逻辑层(BLL):
      • 使用langgraph,dify api、crewai 等其他ai框架开发的Agentic应用
    • 数据访问层(DAL):
      • 使用mcp封装了企业资产平台、数仓平台、报表平台等数据,
      • RAG知识库按需搭建
      • 长短期记忆数据持久化
  2. 运维监控
    • 模型观测:
      • langfuse 记录每个用户的对话记录
  3. 权限控制:
    • 钉钉入口获取员工的工号,作为权限识别ID(根据企业日常工具进行定制开发)

整体架构

三、落地方案

3.1、Langchain ReAct Agentic 实现链路

实现功能:

  1. 钉钉入口,基于用户id识别权限
  2. 自助Finebi看板获取
  3. 基于数据应用的sql生成
  4. 自助Finebi数据集获取、分析
    落地1

3.2、优化:Langgraph Multi ReAct Agentic 设计

Agentic项目结构:

  • agentci_app模块:集成多个ai框架的agent实现
  • service模块:集成用户的交互方式,目前实现的钉钉机器人

上面实现链路中【Agentic Ai (React框架)】节点可以用 Multi ReAct Agentic替换,实现更多的“专家角色” 处理不同领域的问题。

  1. 多agentic方案
    请添加图片描述
  2. Langgraph 实现多agent
async def multi_agentic(user_query, user_id):
    Langfuse(
        secret_key="sk-xxxxx",
        public_key="pk-xxxxx",
        host="http://xxxxxxx"
    )

    langfuse = get_client()
    langfuse_handler = CallbackHandler()
    langfuse.update_current_trace()
    # 获取agent和工具
    llm = dashcope_llm()
    dingding_token = get_dingding_token()
    tools = await get_mcp_tools(dingding_token, user_id)

    # 用户会话记忆线程id配置
    now = datetime.now()
    midnight = datetime(year=now.year, month=now.month, day=now.day, tzinfo=now.tzinfo)
    timestamp = time.mktime(midnight.timetuple())
    config = {
        "configurable": {
            "thread_id":f"{user_id}{int(timestamp)}_3"
        },
        "callbacks": [langfuse_handler],
        "metadata": {
            "langfuse_user_id": "user-id",
        }
    }

    expert_set = {
        "expert_bi": "BI专家",
        "expert_data": "数据专家",
        "expert_classifier": "智能助手"
    }

    expert_bi = create_short_term_expert_node(llm,  bi_prompt, tools)
    expert_data = create_short_term_expert_node(llm, data_prompt, tools)
    expert_classifier = create_short_term_expert_node(llm, classifier_prompt,  [], agent_type="classifier")
    # summarization_node = create_summarization_node(llm)

    DB_URI = f"postgresql://{os.getenv('PG_USER')}:{os.getenv('PG_PASSWORD')}@{os.getenv('PG_HOST')}:{os.getenv('PG_PORT')}/{os.getenv('PG_DB')}?sslmode=disable"
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        # await store.setup()
        # 创建graph
        graph_builder = StateGraph(WorkFlowState)

        # ********* 添加节点 *********
        # graph_builder.add_node("summarize", summarization_node)
        graph_builder.add_node("expert_classifier", expert_classifier)
        graph_builder.add_node("expert_bi", expert_bi)
        graph_builder.add_node("expert_data", expert_data)
        graph_builder.add_node("tools_bi", ToolNode(tools))
        graph_builder.add_node("tools_data", ToolNode(tools))

        # ********* 添加边 *********
        # graph_builder.add_edge(START, "summarize")
        # graph_builder.add_edge("summarize", "expert_classifier")
        graph_builder.add_edge(START, "expert_classifier")

        # 路由决策,问题分类
        graph_builder.add_conditional_edges(
            "expert_classifier",
            question_classifier_router,
            {
                "expert_bi": "expert_bi",
                "expert_data": "expert_data",
                "end": END,
            },
        )

        # BI 专家:1、路由决策,是否需要调用工具 2、 反馈思考
        graph_builder.add_conditional_edges("expert_bi",
                                            tools_condition,
                                            {
                                                "tools":"tools_bi",
                                                END:END
                                             }
                                            )
        graph_builder.add_edge("tools_bi", "expert_bi")

        # 数据 专家:1、路由决策,是否需要调用工具 2、 反馈思考
        graph_builder.add_conditional_edges("expert_data",
                                            tools_condition,
                                            {
                                                "tools": "tools_data",
                                                END: END
                                            }
                                            )
        graph_builder.add_edge("tools_data", "expert_data")

        # 编译
        graph = graph_builder.compile(
            checkpointer=checkpointer
        )

        result = ""
        total_tokens = 0
        expert = "智能助手"
        async for chunk in graph.astream({"messages": [{"role": "user", "content": user_query}], "next_agent":"",  "iter_count": 0},
                                         stream_mode='values',
                                         config=config
                                         ):
            for msg in chunk["messages"][-1:]:
                try:
                    expert = expert_set[chunk.get("next_agent", "expert_classifier")]
                except Exception as e:
                    print(e)
                    pass
                if msg.type == "tool":
                    yield f"{msg.name} tool calling..."
                if msg.type == "ai":
                    result = msg.text()
                    try:
                        total_tokens = msg.usage_metadata["total_tokens"]
                        # model = msg.response_metadata["model_name"]
                    except Exception as e:
                        print(e)
                        pass
                    yield f"{expert} thinking..."
        langfuse.flush()
        yield f"【{expert}】:\n\n{result}\n\ntotal_tokens: {total_tokens}"

请添加图片描述

3.3、mcp开发

请添加图片描述
请添加图片描述

四、案例展示

  1. 非数据相关问题

    • 举例:今天天气怎么样
    • 期望结果:不路由到后台专家agent,直接返回
      请添加图片描述
  2. 数据应用问题

    • 举例:我有哪些数据应用
    • 期望结果:数据专家回答,并返回用户权限下的数据应用信息
      请添加图片描述
  3. BI相关问题

    • 举例:我有哪些看板
    • 期望结果:BI专家回答,并返回用户权限下的看板
      请添加图片描述
    • 举例:看下门诊经营分析
    • 期望结果:BI专家回答,并返回用户所需看板
      请添加图片描述
  4. 指定专家处理问题

    • 举例:数据专家告诉我,我问了哪些问题
    • 期望结果:数据专家回答问题(目前记忆按自然天存储,考虑到成本及数据时效性,配置只拉取当天记忆) 请添加图片描述
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐