bisheng 节点执行流程
我来帮您详细分析logger的导入方式、工作流保存变量的原理,以及整个流程,这样您就能更好地分析IndexError问题。
·
我来帮您详细分析logger的导入方式、工作流保存变量的原理,以及整个流程,这样您就能更好地分析IndexError问题。
[5 tools called]
现在我来为您详细分析整个流程:
1. Logger导入方式
主要导入方式:
# 在大多数文件中使用
from loguru import logger
# 在settings.py中
from loguru import logger
# 在utils/logger.py中
from loguru import logger
Logger配置:
# src/backend/bisheng/utils/logger.py
def configure(logger_conf: LoggerConf):
logger.remove() # 移除默认处理器
logger.patch(patching) # 添加序列化补丁
# 配置Rich处理器用于控制台输出
logger.configure(handlers=[{
'sink': RichHandler(console=Console(width=300), markup=True),
'format': logger_conf.format,
'level': log_level.upper(),
}])
# 添加文件处理器
for one in logger_conf.handlers:
logger.add(**one)
2. 工作流变量保存原理
核心机制:GraphState类
class GraphState(BaseModel):
# 全局变量池:{node_id: {key: value}}
variables_pool: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
def set_variable(self, node_id: str, key: str, value: Any):
"""将节点产生的数据放到全局变量里"""
if node_id not in self.variables_pool:
self.variables_pool[node_id] = {}
self.variables_pool[node_id][key] = value
def get_variable(self, node_id: str, key: str, count: Optional[int] = None) -> Any:
"""从全局变量中获取数据"""
if node_id not in self.variables_pool:
return None
return self.variables_pool[node_id].get(key)
3. 完整工作流执行流程
3.1 工作流启动流程
3.2 节点执行流程
# 1. 节点初始化
def __init__(self, node_data, workflow_id, user_id, graph_state, target_edges, max_steps, callback):
self.graph_state = graph_state # 共享的全局状态
self.node_params = {} # 节点参数
self.other_node_variable = {} # 其他节点的变量
# 2. 节点运行
def run(self, state: dict) -> Any:
exec_id = uuid.uuid4().hex
try:
result = self._run(exec_id) # 具体节点逻辑
# 把节点输出存储到全局变量中
if result:
for key, value in result.items():
self.graph_state.set_variable(self.id, key, value)
except Exception as e:
raise e
3.3 变量引用机制
# 节点间变量引用格式:node_id.key#index
def get_variable_by_str(self, contact_key: str) -> Any:
# 解析格式:node_id.key#index
tmp_list = contact_key.split('.', 1)
node_id = tmp_list[0]
var_key = tmp_list[1]
# 处理数组索引
if var_key.find('#') != -1:
var_key, variable_val_index = var_key.split('#')
variable_val = self.get_variable(node_id, var_key)
if isinstance(variable_val, list):
variable_val_index = int(variable_val_index)
if len(variable_val) <= variable_val_index: # 这里可能出错!
raise Exception(f'variable {contact_key} index out of range')
return variable_val[variable_val_index]
4. IndexError问题分析
4.1 问题发生的具体位置
# 文件:src/backend/bisheng/workflow/graph/graph_state.py:75-77
if len(variable_val) <= variable_val_index:
raise Exception(f'variable {contact_key} index out of range')
return variable_val[variable_val_index] # 这里抛出IndexError
4.2 可能的原因
- 变量未初始化:
variable_val为None或空列表 - 索引超出范围:
variable_val_index大于等于列表长度 - 变量类型错误:期望列表但实际是其他类型
- 计算节点引用错误:计算节点引用了不存在的数组变量
4.3 调试建议
添加详细日志:
def get_variable_by_str(self, contact_key: str, history_count: Optional[int] = None) -> Any:
try:
tmp_list = contact_key.split('.', 1)
if len(tmp_list) < 2:
raise Exception(f'Invalid contact_key format: {contact_key}')
node_id = tmp_list[0]
var_key = tmp_list[1]
variable_val_index = None
if var_key.find('#') != -1:
var_key, variable_val_index = var_key.split('#')
variable_val = self.get_variable(node_id, var_key, history_count)
# 添加详细日志
logger.debug(f"Getting variable: {contact_key}")
logger.debug(f"Node ID: {node_id}, Var Key: {var_key}")
logger.debug(f"Variable value: {variable_val}, Type: {type(variable_val)}")
logger.debug(f"Index: {variable_val_index}")
if variable_val_index:
if isinstance(variable_val, list):
variable_val_index = int(variable_val_index)
logger.debug(f"List length: {len(variable_val)}, Index: {variable_val_index}")
if len(variable_val) <= variable_val_index:
raise Exception(f'variable {contact_key} index {variable_val_index} out of range, list length: {len(variable_val)}')
return variable_val[variable_val_index]
elif isinstance(variable_val, dict):
return variable_val.get(variable_val_index)
else:
raise Exception(f'variable {contact_key} is not a list or dict, not support #index')
return variable_val
except Exception as e:
logger.error(f"Error getting variable {contact_key}: {str(e)}")
logger.error(f"Available variables: {list(self.variables_pool.keys())}")
raise e
在计算节点中添加验证:
def _parse_code_input(self) -> dict:
ret = {}
for one in self._code_input:
if one["type"] == "ref":
try:
variable_value = self.get_other_node_variable(one['value'])
logger.debug(f"CodeNode {self.name} getting variable {one['value']}: {variable_value}")
ret[one['key']] = variable_value
except Exception as e:
logger.error(f"CodeNode {self.name} error getting variable {one['value']}: {str(e)}")
ret[one['key']] = "" # 使用默认值
else:
ret[one['key']] = one['value']
return ret
这样您就能通过日志快速定位是哪个变量引用导致了IndexError,以及具体的错误原因。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)