bisheng 的 智能体执行的流程
1. 前端发送请求 → WorkflowClient初始化2. 检查工作流状态 → 生成唯一ID3. 创建RedisCallback → 设置工作流数据4. 提交Celery任务 → execute_workflow.delay()5. Celery Worker执行 → 创建Workflow实例6. GraphEngine构建 → 解析节点和边关系7. 节点依次执行 → 数据存储到GraphSt
我来帮你分析智能体工作流的整个过程。让我先搜索相关的代码来理解整个流程。
[3 tools called]
现在让我继续搜索更多关键信息,特别是关于节点数据存储、变量传递和线程调用的部分:
[3 tools called]
现在让我读取一些关键文件来更深入了解整个流程:
[3 tools called]
现在让我搜索更多关于节点如何获取其他节点变量的信息:
[2 tools called]
现在让我搜索更多关于节点如何解析模板变量的信息:
[2 tools called]
现在让我创建一个流程图来总结整个智能体工作流的执行过程:
[1 tool called]
基于我对代码的深入分析,让我为你详细解释智能体工作流的整个过程:
智能体工作流执行流程分析
1. 请求入口和客户端初始化
当发送请求到智能体入口时,会执行:
chat_client = WorkflowClient(request, client_key, client_id, chat_id,
login_user.user_id, login_user, work_type, websocket)
流程:
ChatManager.dispatch_client()创建WorkflowClient实例WorkflowClient继承自BaseClient,负责处理WebSocket连接和消息- 客户端通过
handle_message()接收前端消息,提交到线程池执行
2. 工作流初始化过程
关键步骤:
-
状态检查 (
check_status):- 检查工作流是否已在运行
- 生成唯一执行ID (
unique_id)
-
工作流初始化 (
init_workflow):# 创建Redis回调对象 self.workflow = RedisCallback(unique_id, workflow_id, self.chat_id, str(self.user_id)) # 设置工作流数据和状态 self.workflow.set_workflow_data(workflow_data) self.workflow.set_workflow_status(WorkflowStatus.WAITING.value) # 发起Celery异步任务 execute_workflow.delay(unique_id, workflow_id, self.chat_id, str(self.user_id))
3. Celery异步任务执行
Celery任务流程:
-
任务定义 (
execute_workflow):@bisheng_celery.task def execute_workflow(unique_id: str, workflow_id: str, chat_id: str, user_id: str): _execute_workflow(unique_id, workflow_id, chat_id, user_id) -
工作流执行 (
_execute_workflow):- 创建
RedisCallback对象 - 从Redis获取工作流数据
- 创建
Workflow实例 - 调用
workflow.run()执行工作流
- 创建
4. 工作流引擎和节点管理
GraphEngine 核心功能:
-
节点构建 (
build_nodes):- 解析工作流数据中的节点信息
- 为每个节点创建对应的节点实例
- 建立节点之间的连接关系
-
边管理 (
build_edges):- 处理节点之间的连接关系
- 支持条件分支和并行执行
- 处理扇入扇出节点
-
状态管理:
- 使用
GraphState管理全局变量 - 支持节点间的数据传递
- 使用
5. 节点数据存储和变量传递
数据存储机制:
-
全局状态管理 (
GraphState):class GraphState: # 全局变量池: {node_id: {key: value}} variables_pool: Dict[str, Dict[str, Any]] = Field(default_factory=dict) def set_variable(self, node_id: str, key: str, value: Any): """将节点产生的数据放到全局变量里""" if node_id not in self.variables_pool: self.variables_pool[node_id] = {} self.variables_pool[node_id][key] = value -
节点输出存储:
# 在BaseNode.run()中 if result: for key, value in result.items(): self.graph_state.set_variable(self.id, key, value)
6. 节点变量获取机制
变量获取流程:
-
模板变量解析:
def parse_msg_with_variables(self, msg: str) -> (str, list[str]): msg_template = PromptTemplateParser(template=msg) variables = msg_template.extract() # 提取变量如 {{#node_id.key#}} if len(variables) > 0: var_map = {} for one in variables: var_map[one] = self.get_other_node_variable(one) msg = msg_template.format(var_map) return msg, variables -
变量值获取:
def get_other_node_variable(self, variable_key: str) -> Any: """从全局变量中获取其他节点的变量值""" value = self.graph_state.get_variable_by_str(variable_key) self.other_node_variable[variable_key] = value return value
7. 线程池和异步执行
线程管理:
-
线程池初始化:
# 创建线程池管理器 thread_pool = ThreadPoolManager(5) -
任务提交:
# 在BaseClient.handle_message()中 thread_pool.submit(trace_id, self.wrapper_task, trace_id, self._handle_message, message, trace_id=trace_id) -
异步任务处理:
- 支持同步和异步函数
- 自动创建事件循环处理异步任务
- 任务完成后自动清理
8. Redis回调和消息传递
Redis回调机制:
-
状态管理:
# Redis键名 self.workflow_data_key = f'workflow:{unique_id}:data' self.workflow_status_key = f'workflow:{unique_id}:status' self.workflow_event_key = f'workflow:{unique_id}:event' -
事件回调:
on_node_start: 节点开始执行on_node_end: 节点执行结束on_output_msg: 输出消息on_stream_msg: 流式消息
9. 节点执行时机
执行触发条件:
- 首次执行: 工作流启动时自动执行
- 条件分支: 根据条件节点结果决定下一步
- 用户输入: 等待用户输入后继续执行
- 并行执行: 多个节点可以并行执行
10. 完整执行流程总结
1. 前端发送请求 → WorkflowClient初始化
2. 检查工作流状态 → 生成唯一ID
3. 创建RedisCallback → 设置工作流数据
4. 提交Celery任务 → execute_workflow.delay()
5. Celery Worker执行 → 创建Workflow实例
6. GraphEngine构建 → 解析节点和边关系
7. 节点依次执行 → 数据存储到GraphState
8. 变量传递 → 通过模板解析获取其他节点变量
9. Redis回调 → 向前端推送执行状态
10. 工作流完成 → 清理资源
关键特点:
- 异步执行: 使用Celery处理长时间运行的工作流
- 状态持久化: 通过Redis存储工作流状态和数据
- 变量传递: 通过GraphState实现节点间数据共享
- 模板解析: 支持动态变量替换
- 线程池管理: 高效处理并发请求
- 事件驱动: 通过回调机制实现实时状态更新
这就是整个智能体工作流从请求到执行的完整流程,每个环节都有明确的职责分工和数据处理机制。
让我搜索Redis回调向前端推送数据的相关代码:
[2 tools called]
现在让我搜索具体的发送消息到前端的代码:
[2 tools called]
现在让我搜索具体的消息推送机制:
[2 tools called]
现在让我搜索WorkflowClient如何从Redis获取消息并推送给前端:
[2 tools called]
基于我对代码的深入分析,现在我可以详细回答你关于Redis回调向前端推送数据的问题:
Redis回调向前端推送数据的完整流程
1. 消息推送的核心机制
Redis回调 → 前端推送的完整链路:
节点执行 → RedisCallback事件 → Redis存储 → WorkflowClient轮询 → WebSocket推送 → 前端接收
2. 具体的代码实现位置
第一步:RedisCallback发送消息到Redis
在 src/backend/bisheng/worker/workflow/redis_callback.py 中:
def send_chat_response(self, chat_response: ChatResponse):
""" 发送聊天消息 """
self.insert_workflow_response(chat_response.dict()) # 存储到Redis
def insert_workflow_response(self, event: dict):
# 将事件推送到Redis队列
self.redis_client.rpush(self.workflow_event_key, json.dumps(event), expiration=self.workflow_expire_time)
关键Redis键:
workflow_event_key = f'workflow:{unique_id}:event'- 存储事件消息的队列
第二步:WorkflowClient从Redis获取消息
在 src/backend/bisheng/chat/clients/workflow_client.py 中:
async def _workflow_run(self):
# 需要不断从redis中获取workflow返回的消息
async for event in self.workflow.get_response_until_break():
await self.send_json(event) # 发送到前端
第三步:RedisCallback的get_response_until_break方法
在 src/backend/bisheng/worker/workflow/redis_callback.py 中:
async def get_response_until_break(self) -> AsyncIterator[ChatResponse]:
""" 不断获取workflow的response,直到遇到运行结束或者待输入 """
while True:
# 获取工作流状态
status_info = self.get_workflow_status()
if status_info['status'] in [WorkflowStatus.FAILED.value, WorkflowStatus.SUCCESS.value]:
# 工作流结束,获取所有剩余消息
while True:
chat_response = self.get_workflow_response() # 从Redis获取消息
if not chat_response:
break
yield chat_response # 返回给WorkflowClient
break
elif status_info['status'] == WorkflowStatus.INPUT.value:
# 等待用户输入,获取所有消息后暂停
while True:
chat_response = self.get_workflow_response()
if not chat_response:
break
yield chat_response
break
# 继续轮询获取消息
chat_response = self.get_workflow_response()
if chat_response:
yield chat_response
await asyncio.sleep(0.1) # 避免过度轮询
def get_workflow_response(self) -> ChatResponse | None:
# 从Redis队列中弹出消息
response = self.redis_client.lpop(self.workflow_event_key)
if response:
response = ChatResponse(**json.loads(response))
return response
return None
第四步:BaseClient的WebSocket发送机制
在 src/backend/bisheng/chat/clients/base.py 中:
async def send_json(self, message: ChatMessage | dict):
if isinstance(message, dict):
self.ws_msg_queue.put(message) # 放入消息队列
return
self.ws_msg_queue.put(message.dict())
async def consume_message(self):
""" 消费WebSocket消息队列,发送到前端 """
while True:
data = None
try:
data = self.ws_msg_queue.get_nowait()
await self.websocket.send_json(data) # 通过WebSocket发送到前端
except Empty:
pass
except Exception as e:
logger.error(f"consume_message error {data} error: {str(e)}")
break
await asyncio.sleep(0.01)
3. 消息类型和事件
RedisCallback支持的事件类型:
# 节点开始执行
def on_node_start(self, data: NodeStartData):
self.send_chat_response(ChatResponse(...))
# 节点执行结束
def on_node_end(self, data: NodeEndData):
self.send_chat_response(ChatResponse(...))
# 用户输入事件
def on_user_input(self, data: UserInputData):
self.send_chat_response(ChatResponse(...))
# 输出消息
def on_output_msg(self, data: OutputMsgData):
self.send_chat_response(ChatResponse(...))
# 流式消息
def on_stream_msg(self, data: StreamMsgData):
self.send_chat_response(ChatResponse(...))
# 流式消息结束
def on_stream_over(self, data: StreamMsgOverData):
self.send_chat_response(ChatResponse(...))
4. 前端接收机制
前端通过WebSocket接收消息:
在前端代码中(如 src/frontend/platform/src/components/bs-comp/chatComponent/ChatInput.tsx):
const createWebSocket = (chatId) => {
wsRef.current = new WebSocket(wsUrl)
wsRef.current.onmessage = (event) => {
const data = JSON.parse(event.data)
// 处理接收到的消息
handleMessage(data)
}
}
5. 完整的消息流转过程
1. 节点执行完成 → BaseNode.run()
2. 触发回调事件 → RedisCallback.on_node_end()
3. 创建ChatResponse → send_chat_response()
4. 存储到Redis → insert_workflow_response()
5. WorkflowClient轮询 → get_response_until_break()
6. 从Redis获取 → get_workflow_response()
7. 放入WebSocket队列 → send_json()
8. 消费队列发送 → consume_message()
9. WebSocket推送 → websocket.send_json()
10. 前端接收处理 → onmessage事件
6. 关键特点
- 异步非阻塞: 使用Redis队列实现异步消息传递
- 实时推送: 通过WebSocket实现实时消息推送
- 消息队列: 使用队列机制确保消息顺序和可靠性
- 状态管理: 通过工作流状态控制消息推送时机
- 错误处理: 完善的异常处理和重试机制
这就是Redis回调向前端推送数据的完整实现机制,核心是通过Redis队列作为中间件,WorkflowClient轮询获取消息并通过WebSocket推送给前端。
更多推荐
所有评论(0)