我来帮你分析智能体工作流的整个过程。让我先搜索相关的代码来理解整个流程。

[3 tools called]

现在让我继续搜索更多关键信息,特别是关于节点数据存储、变量传递和线程调用的部分:

[3 tools called]

现在让我读取一些关键文件来更深入了解整个流程:

[3 tools called]

现在让我搜索更多关于节点如何获取其他节点变量的信息:

[2 tools called]

现在让我搜索更多关于节点如何解析模板变量的信息:

[2 tools called]

现在让我创建一个流程图来总结整个智能体工作流的执行过程:

[1 tool called]

基于我对代码的深入分析,让我为你详细解释智能体工作流的整个过程:

智能体工作流执行流程分析

1. 请求入口和客户端初始化

当发送请求到智能体入口时,会执行:

chat_client = WorkflowClient(request, client_key, client_id, chat_id, 
                           login_user.user_id, login_user, work_type, websocket)

流程:

  • ChatManager.dispatch_client() 创建 WorkflowClient 实例
  • WorkflowClient 继承自 BaseClient,负责处理WebSocket连接和消息
  • 客户端通过 handle_message() 接收前端消息,提交到线程池执行

2. 工作流初始化过程

关键步骤:

  1. 状态检查 (check_status):

    • 检查工作流是否已在运行
    • 生成唯一执行ID (unique_id)
  2. 工作流初始化 (init_workflow):

    # 创建Redis回调对象
    self.workflow = RedisCallback(unique_id, workflow_id, self.chat_id, str(self.user_id))
    
    # 设置工作流数据和状态
    self.workflow.set_workflow_data(workflow_data)
    self.workflow.set_workflow_status(WorkflowStatus.WAITING.value)
    
    # 发起Celery异步任务
    execute_workflow.delay(unique_id, workflow_id, self.chat_id, str(self.user_id))
    

3. Celery异步任务执行

Celery任务流程:

  1. 任务定义 (execute_workflow):

    @bisheng_celery.task
    def execute_workflow(unique_id: str, workflow_id: str, chat_id: str, user_id: str):
        _execute_workflow(unique_id, workflow_id, chat_id, user_id)
    
  2. 工作流执行 (_execute_workflow):

    • 创建 RedisCallback 对象
    • 从Redis获取工作流数据
    • 创建 Workflow 实例
    • 调用 workflow.run() 执行工作流

4. 工作流引擎和节点管理

GraphEngine 核心功能:

  1. 节点构建 (build_nodes):

    • 解析工作流数据中的节点信息
    • 为每个节点创建对应的节点实例
    • 建立节点之间的连接关系
  2. 边管理 (build_edges):

    • 处理节点之间的连接关系
    • 支持条件分支和并行执行
    • 处理扇入扇出节点
  3. 状态管理:

    • 使用 GraphState 管理全局变量
    • 支持节点间的数据传递

5. 节点数据存储和变量传递

数据存储机制:

  1. 全局状态管理 (GraphState):

    class GraphState:
        # 全局变量池: {node_id: {key: value}}
        variables_pool: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
        
        def set_variable(self, node_id: str, key: str, value: Any):
            """将节点产生的数据放到全局变量里"""
            if node_id not in self.variables_pool:
                self.variables_pool[node_id] = {}
            self.variables_pool[node_id][key] = value
    
  2. 节点输出存储:

    # 在BaseNode.run()中
    if result:
        for key, value in result.items():
            self.graph_state.set_variable(self.id, key, value)
    

6. 节点变量获取机制

变量获取流程:

  1. 模板变量解析:

    def parse_msg_with_variables(self, msg: str) -> (str, list[str]):
        msg_template = PromptTemplateParser(template=msg)
        variables = msg_template.extract()  # 提取变量如 {{#node_id.key#}}
        
        if len(variables) > 0:
            var_map = {}
            for one in variables:
                var_map[one] = self.get_other_node_variable(one)
            msg = msg_template.format(var_map)
        return msg, variables
    
  2. 变量值获取:

    def get_other_node_variable(self, variable_key: str) -> Any:
        """从全局变量中获取其他节点的变量值"""
        value = self.graph_state.get_variable_by_str(variable_key)
        self.other_node_variable[variable_key] = value
        return value
    

7. 线程池和异步执行

线程管理:

  1. 线程池初始化:

    # 创建线程池管理器
    thread_pool = ThreadPoolManager(5)
    
  2. 任务提交:

    # 在BaseClient.handle_message()中
    thread_pool.submit(trace_id, self.wrapper_task, trace_id, 
                      self._handle_message, message, trace_id=trace_id)
    
  3. 异步任务处理:

    • 支持同步和异步函数
    • 自动创建事件循环处理异步任务
    • 任务完成后自动清理

8. Redis回调和消息传递

Redis回调机制:

  1. 状态管理:

    # Redis键名
    self.workflow_data_key = f'workflow:{unique_id}:data'
    self.workflow_status_key = f'workflow:{unique_id}:status'
    self.workflow_event_key = f'workflow:{unique_id}:event'
    
  2. 事件回调:

    • on_node_start: 节点开始执行
    • on_node_end: 节点执行结束
    • on_output_msg: 输出消息
    • on_stream_msg: 流式消息

9. 节点执行时机

执行触发条件:

  1. 首次执行: 工作流启动时自动执行
  2. 条件分支: 根据条件节点结果决定下一步
  3. 用户输入: 等待用户输入后继续执行
  4. 并行执行: 多个节点可以并行执行

10. 完整执行流程总结

1. 前端发送请求 → WorkflowClient初始化
2. 检查工作流状态 → 生成唯一ID
3. 创建RedisCallback → 设置工作流数据
4. 提交Celery任务 → execute_workflow.delay()
5. Celery Worker执行 → 创建Workflow实例
6. GraphEngine构建 → 解析节点和边关系
7. 节点依次执行 → 数据存储到GraphState
8. 变量传递 → 通过模板解析获取其他节点变量
9. Redis回调 → 向前端推送执行状态
10. 工作流完成 → 清理资源

关键特点:

  • 异步执行: 使用Celery处理长时间运行的工作流
  • 状态持久化: 通过Redis存储工作流状态和数据
  • 变量传递: 通过GraphState实现节点间数据共享
  • 模板解析: 支持动态变量替换
  • 线程池管理: 高效处理并发请求
  • 事件驱动: 通过回调机制实现实时状态更新

这就是整个智能体工作流从请求到执行的完整流程,每个环节都有明确的职责分工和数据处理机制。

让我搜索Redis回调向前端推送数据的相关代码:

[2 tools called]

现在让我搜索具体的发送消息到前端的代码:

[2 tools called]

现在让我搜索具体的消息推送机制:

[2 tools called]

现在让我搜索WorkflowClient如何从Redis获取消息并推送给前端:

[2 tools called]

基于我对代码的深入分析,现在我可以详细回答你关于Redis回调向前端推送数据的问题:

Redis回调向前端推送数据的完整流程

1. 消息推送的核心机制

Redis回调 → 前端推送的完整链路:

节点执行 → RedisCallback事件 → Redis存储 → WorkflowClient轮询 → WebSocket推送 → 前端接收

2. 具体的代码实现位置

第一步:RedisCallback发送消息到Redis

src/backend/bisheng/worker/workflow/redis_callback.py 中:

def send_chat_response(self, chat_response: ChatResponse):
    """ 发送聊天消息 """
    self.insert_workflow_response(chat_response.dict())  # 存储到Redis

def insert_workflow_response(self, event: dict):
    # 将事件推送到Redis队列
    self.redis_client.rpush(self.workflow_event_key, json.dumps(event), expiration=self.workflow_expire_time)

关键Redis键:

  • workflow_event_key = f'workflow:{unique_id}:event' - 存储事件消息的队列
第二步:WorkflowClient从Redis获取消息

src/backend/bisheng/chat/clients/workflow_client.py 中:

async def _workflow_run(self):
    # 需要不断从redis中获取workflow返回的消息
    async for event in self.workflow.get_response_until_break():
        await self.send_json(event)  # 发送到前端
第三步:RedisCallback的get_response_until_break方法

src/backend/bisheng/worker/workflow/redis_callback.py 中:

async def get_response_until_break(self) -> AsyncIterator[ChatResponse]:
    """ 不断获取workflow的response,直到遇到运行结束或者待输入 """
    while True:
        # 获取工作流状态
        status_info = self.get_workflow_status()
        
        if status_info['status'] in [WorkflowStatus.FAILED.value, WorkflowStatus.SUCCESS.value]:
            # 工作流结束,获取所有剩余消息
            while True:
                chat_response = self.get_workflow_response()  # 从Redis获取消息
                if not chat_response:
                    break
                yield chat_response  # 返回给WorkflowClient
            break
            
        elif status_info['status'] == WorkflowStatus.INPUT.value:
            # 等待用户输入,获取所有消息后暂停
            while True:
                chat_response = self.get_workflow_response()
                if not chat_response:
                    break
                yield chat_response
            break
            
        # 继续轮询获取消息
        chat_response = self.get_workflow_response()
        if chat_response:
            yield chat_response
        await asyncio.sleep(0.1)  # 避免过度轮询

def get_workflow_response(self) -> ChatResponse | None:
    # 从Redis队列中弹出消息
    response = self.redis_client.lpop(self.workflow_event_key)
    if response:
        response = ChatResponse(**json.loads(response))
        return response
    return None
第四步:BaseClient的WebSocket发送机制

src/backend/bisheng/chat/clients/base.py 中:

async def send_json(self, message: ChatMessage | dict):
    if isinstance(message, dict):
        self.ws_msg_queue.put(message)  # 放入消息队列
        return
    self.ws_msg_queue.put(message.dict())

async def consume_message(self):
    """ 消费WebSocket消息队列,发送到前端 """
    while True:
        data = None
        try:
            data = self.ws_msg_queue.get_nowait()
            await self.websocket.send_json(data)  # 通过WebSocket发送到前端
        except Empty:
            pass
        except Exception as e:
            logger.error(f"consume_message error {data} error: {str(e)}")
            break
        await asyncio.sleep(0.01)

3. 消息类型和事件

RedisCallback支持的事件类型:

# 节点开始执行
def on_node_start(self, data: NodeStartData):
    self.send_chat_response(ChatResponse(...))

# 节点执行结束  
def on_node_end(self, data: NodeEndData):
    self.send_chat_response(ChatResponse(...))

# 用户输入事件
def on_user_input(self, data: UserInputData):
    self.send_chat_response(ChatResponse(...))

# 输出消息
def on_output_msg(self, data: OutputMsgData):
    self.send_chat_response(ChatResponse(...))

# 流式消息
def on_stream_msg(self, data: StreamMsgData):
    self.send_chat_response(ChatResponse(...))

# 流式消息结束
def on_stream_over(self, data: StreamMsgOverData):
    self.send_chat_response(ChatResponse(...))

4. 前端接收机制

前端通过WebSocket接收消息:

在前端代码中(如 src/frontend/platform/src/components/bs-comp/chatComponent/ChatInput.tsx):

const createWebSocket = (chatId) => {
    wsRef.current = new WebSocket(wsUrl)
    
    wsRef.current.onmessage = (event) => {
        const data = JSON.parse(event.data)
        // 处理接收到的消息
        handleMessage(data)
    }
}

5. 完整的消息流转过程

1. 节点执行完成 → BaseNode.run() 
2. 触发回调事件 → RedisCallback.on_node_end()
3. 创建ChatResponse → send_chat_response()
4. 存储到Redis → insert_workflow_response()
5. WorkflowClient轮询 → get_response_until_break()
6. 从Redis获取 → get_workflow_response()
7. 放入WebSocket队列 → send_json()
8. 消费队列发送 → consume_message()
9. WebSocket推送 → websocket.send_json()
10. 前端接收处理 → onmessage事件

6. 关键特点

  • 异步非阻塞: 使用Redis队列实现异步消息传递
  • 实时推送: 通过WebSocket实现实时消息推送
  • 消息队列: 使用队列机制确保消息顺序和可靠性
  • 状态管理: 通过工作流状态控制消息推送时机
  • 错误处理: 完善的异常处理和重试机制

这就是Redis回调向前端推送数据的完整实现机制,核心是通过Redis队列作为中间件,WorkflowClient轮询获取消息并通过WebSocket推送给前端。

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐