我来帮您详细分析logger的导入方式、工作流保存变量的原理,以及整个流程,这样您就能更好地分析IndexError问题。

[5 tools called]

现在我来为您详细分析整个流程:

1. Logger导入方式

主要导入方式:

# 在大多数文件中使用
from loguru import logger

# 在settings.py中
from loguru import logger

# 在utils/logger.py中
from loguru import logger

Logger配置:

# src/backend/bisheng/utils/logger.py
def configure(logger_conf: LoggerConf):
    logger.remove()  # 移除默认处理器
    logger.patch(patching)  # 添加序列化补丁
    
    # 配置Rich处理器用于控制台输出
    logger.configure(handlers=[{
        'sink': RichHandler(console=Console(width=300), markup=True),
        'format': logger_conf.format,
        'level': log_level.upper(),
    }])
    
    # 添加文件处理器
    for one in logger_conf.handlers:
        logger.add(**one)

2. 工作流变量保存原理

核心机制:GraphState类

class GraphState(BaseModel):
    # 全局变量池:{node_id: {key: value}}
    variables_pool: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
    
    def set_variable(self, node_id: str, key: str, value: Any):
        """将节点产生的数据放到全局变量里"""
        if node_id not in self.variables_pool:
            self.variables_pool[node_id] = {}
        self.variables_pool[node_id][key] = value
    
    def get_variable(self, node_id: str, key: str, count: Optional[int] = None) -> Any:
        """从全局变量中获取数据"""
        if node_id not in self.variables_pool:
            return None
        return self.variables_pool[node_id].get(key)

3. 完整工作流执行流程

3.1 工作流启动流程

用户点击运行
Celery任务execute_workflow
RedisCallback获取工作流数据
创建Workflow实例
GraphEngine初始化
构建节点和边
执行工作流
节点依次执行
变量保存到GraphState
下一个节点获取变量

3.2 节点执行流程

# 1. 节点初始化
def __init__(self, node_data, workflow_id, user_id, graph_state, target_edges, max_steps, callback):
    self.graph_state = graph_state  # 共享的全局状态
    self.node_params = {}  # 节点参数
    self.other_node_variable = {}  # 其他节点的变量

# 2. 节点运行
def run(self, state: dict) -> Any:
    exec_id = uuid.uuid4().hex
    try:
        result = self._run(exec_id)  # 具体节点逻辑
        # 把节点输出存储到全局变量中
        if result:
            for key, value in result.items():
                self.graph_state.set_variable(self.id, key, value)
    except Exception as e:
        raise e

3.3 变量引用机制

# 节点间变量引用格式:node_id.key#index
def get_variable_by_str(self, contact_key: str) -> Any:
    # 解析格式:node_id.key#index
    tmp_list = contact_key.split('.', 1)
    node_id = tmp_list[0]
    var_key = tmp_list[1]
    
    # 处理数组索引
    if var_key.find('#') != -1:
        var_key, variable_val_index = var_key.split('#')
        variable_val = self.get_variable(node_id, var_key)
        
        if isinstance(variable_val, list):
            variable_val_index = int(variable_val_index)
            if len(variable_val) <= variable_val_index:  # 这里可能出错!
                raise Exception(f'variable {contact_key} index out of range')
            return variable_val[variable_val_index]

4. IndexError问题分析

4.1 问题发生的具体位置

# 文件:src/backend/bisheng/workflow/graph/graph_state.py:75-77
if len(variable_val) <= variable_val_index:
    raise Exception(f'variable {contact_key} index out of range')
return variable_val[variable_val_index]  # 这里抛出IndexError

4.2 可能的原因

  1. 变量未初始化variable_valNone或空列表
  2. 索引超出范围variable_val_index大于等于列表长度
  3. 变量类型错误:期望列表但实际是其他类型
  4. 计算节点引用错误:计算节点引用了不存在的数组变量

4.3 调试建议

添加详细日志:

def get_variable_by_str(self, contact_key: str, history_count: Optional[int] = None) -> Any:
    try:
        tmp_list = contact_key.split('.', 1)
        if len(tmp_list) < 2:
            raise Exception(f'Invalid contact_key format: {contact_key}')
            
        node_id = tmp_list[0]
        var_key = tmp_list[1]
        variable_val_index = None
        
        if var_key.find('#') != -1:
            var_key, variable_val_index = var_key.split('#')
            
        variable_val = self.get_variable(node_id, var_key, history_count)
        
        # 添加详细日志
        logger.debug(f"Getting variable: {contact_key}")
        logger.debug(f"Node ID: {node_id}, Var Key: {var_key}")
        logger.debug(f"Variable value: {variable_val}, Type: {type(variable_val)}")
        logger.debug(f"Index: {variable_val_index}")
        
        if variable_val_index:
            if isinstance(variable_val, list):
                variable_val_index = int(variable_val_index)
                logger.debug(f"List length: {len(variable_val)}, Index: {variable_val_index}")
                if len(variable_val) <= variable_val_index:
                    raise Exception(f'variable {contact_key} index {variable_val_index} out of range, list length: {len(variable_val)}')
                return variable_val[variable_val_index]
            elif isinstance(variable_val, dict):
                return variable_val.get(variable_val_index)
            else:
                raise Exception(f'variable {contact_key} is not a list or dict, not support #index')
        return variable_val
        
    except Exception as e:
        logger.error(f"Error getting variable {contact_key}: {str(e)}")
        logger.error(f"Available variables: {list(self.variables_pool.keys())}")
        raise e

在计算节点中添加验证:

def _parse_code_input(self) -> dict:
    ret = {}
    for one in self._code_input:
        if one["type"] == "ref":
            try:
                variable_value = self.get_other_node_variable(one['value'])
                logger.debug(f"CodeNode {self.name} getting variable {one['value']}: {variable_value}")
                ret[one['key']] = variable_value
            except Exception as e:
                logger.error(f"CodeNode {self.name} error getting variable {one['value']}: {str(e)}")
                ret[one['key']] = ""  # 使用默认值
        else:
            ret[one['key']] = one['value']
    return ret

这样您就能通过日志快速定位是哪个变量引用导致了IndexError,以及具体的错误原因。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐