本篇为spring-ai-alibaba学习系列第四十篇

前面介绍 ParalellExecutorNode 会为后续的 m 个 CoderNode 分配任务

现在来看一下处理型任务的处理节点 coder_{i}

该类节点主要负责执行一些操作,例如执行python代码、调用mcp等

提示词

以下是该文档的中文翻译:

---
当前时间: {{ CURRENT_TIME }}
---

你是一个由 `supervisor` 代理管理的 `coder` 代理。
你是一名专业的软件工程师,精通 Python 脚本编写。你的任务是分析需求、使用 Python 实现高效的解决方案,并清晰地记录你的方法和结果。

# 步骤

1. **需求分析**:仔细审查任务描述,理解目标、约束条件和预期成果。
2. **制定解决方案**:确定任务是否需要 Python。概述实现解决方案所需的步骤。
3. **实现解决方案**:
   - 使用 Python 进行数据分析、算法实现或问题解决。
   - 在 Python 中使用 `print(...)` 打印输出以显示结果或调试值。
4. **列出所有必需的第三方依赖项**:以 `requirements.txt` 的格式列出此 Python 代码所需的所有第三方依赖项,例如 `numpy==2.2.6`。如果没有第三方依赖项,则跳过此步骤。
5. **测试解决方案**:验证实现以确保其满足要求并处理边缘情况。
6. **记录方法论**:提供对你方法的清晰解释,包括你选择背后的原因和所做的任何假设。
7. **展示结果**:清楚地显示最终输出和任何必要的中间结果。

# 注意事项

- 始终确保解决方案高效且遵循最佳实践。
- 优雅地处理边缘情况,例如空文件或缺失输入。
- 在代码中使用注释以提高可读性和可维护性。
- 如果你想查看某个值的输出,必须使用 `print(...)` 将其打印出来。
- 始终且仅使用 Python 进行数学运算。
- 始终使用 `yfinance` 获取金融市场数据:
  - 使用 `yf.download()` 获取历史数据
  - 使用 `Ticker` 对象访问公司信息
  - 使用适当的时间范围进行数据检索
- 所需的 Python 包未预安装,你应该提供 `requirements.txt`:
  - `pandas` 用于数据操作
  - `numpy` 用于数值运算
  - `yfinance` 用于金融市场数据
- 始终以 **{{ locale }}** 语言环境进行输出。
- 如果代码执行失败,在最多重试 3 次后必须中止。

使用方法

spring.ai.alibaba.deepresearch.python-coder 相关配置

Coder节点的Python执行器跑在Docker容器中,需要额外为其配置Docker信息

在配置文件的spring.ai.alibaba.deepreserch.python-coder.docker-host字段中设置DockerHost,默认为unix:///var/run/docker.sock

本项目需要使用python:3-slim镜像创建临时容器,也可以自己定制包含一些常用的第三方库的镜像,第三方库需要安装在镜像的/app/dependency文件夹里,在配置文件中设置spring.ai.alibaba.deepreserch.python-coder.image-name的值指定镜像名称

节点产出

coder_content_{i}:码农节点大模型返回的响应数据

源码跟踪

跟踪:在 DeepResearchConfiguration 中,会根据用户配置的码农节点个数生成 m 个coder_{i} 节点实例,类型为 CoderNode,每个节点有唯一的节点 id;

添加从 paralell_executor 到 coder_{i} 的边和从 coder_{i} 到 research_team 的边,这意味着 m 个 coder_{i} 节点会并行处理

创建时需要4个参数 coderAgent, String.valueOf(i), reflectionProcessor, mcpProviderFactory

i 为当前节点编号

coderAgent:ChatClient类型,添加了 coder 提示词,并注册了一个可以执行 python 代码的工具

reflectionProcessor:反思处理器

mcpProviderFactory:mcp提供者工厂

研究:CoderNode 的 apply 方法整体流程如下:

1)首先获取研究计划中属于当前节点需要处理的步骤

2)若开启反思且步骤状态为待反思,则进入反思处理逻辑,反思通过则修改状态为完成,否则修改状态为待处理

3)修改步骤状态为处理中

4)若开启mcp,将mcp注册进 coderAgent,然后将当前步骤内容及反思内容传入 coderAgent 获取响应

5)根据是否开启反思,将步骤状态修改为待反思或完成

附 apply 方法源码

    public Map<String, Object> apply(OverAllState state) throws Exception {
		logger.info("coder node {} is running for thread: {}", executorNodeId, state.value("thread_id", "__default__"));

		Plan currentPlan = StateUtil.getPlan(state);
		Map<String, Object> updated = new HashMap<>();

		Plan.Step assignedStep = findAssignedStep(currentPlan);

		if (assignedStep == null) {
			logger.info("No remaining steps to be executed by {}", nodeName);
			return updated;
		}

		// Handle reflection logic
		if (reflectionProcessor != null) {
			ReflectionProcessor.ReflectionHandleResult reflectionResult = reflectionProcessor
				.handleReflection(assignedStep, nodeName, "coder");

			if (!ReflectionUtil.shouldContinueAfterReflection(reflectionResult)) {
				logger.debug("Step {} reflection processing completed, skipping execution", assignedStep.getTitle());
				return updated;
			}
		}

		// Mark step as processing
		assignedStep.setExecutionStatus(StateUtil.EXECUTION_STATUS_PROCESSING_PREFIX + nodeName);

		try {
			// Build task messages
			List<Message> messages = List.of(new UserMessage(
					buildTaskMessageWithReflectionHistory(assignedStep, state.value("locale", "en-US"))));
			logger.debug("{} Node message: {}", nodeName, messages);

			// 调用agent
			var requestSpec = coderAgent.prompt().messages(messages);

			// 使用MCP工厂创建MCP客户端
			AsyncMcpToolCallbackProvider mcpProvider = mcpFactory != null
					? mcpFactory.createProvider(state, "coderAgent") : null;
			if (mcpProvider != null) {
				requestSpec = requestSpec.toolCallbacks(mcpProvider.getToolCallbacks());
			}

			// Create stream with error handling
			var streamResult = requestSpec.stream()
				.chatResponse()
				.doOnError(error -> StateUtil.handleStepError(assignedStep, nodeName, error, logger));

			// Add step title
			boolean isReflectionNode = assignedStep.getReflectionHistory() != null
					&& !assignedStep.getReflectionHistory().isEmpty();
			String prefix = isReflectionNode ? StreamNodePrefixEnum.CODER_REFLECT_LLM_STREAM.getPrefix()
					: StreamNodePrefixEnum.CODER_LLM_STREAM.getPrefix();
			String nodeNum = NodeStepTitleUtil.registerStepTitle(state, isReflectionNode, executorNodeId, "Coder",
					assignedStep.getTitle(), prefix);

			logger.info("CoderNode {} starting streaming with key: {}", executorNodeId, nodeNum);

			var generator = StreamingChatGenerator.builder()
				.startingNode(nodeNum)
				.startingState(state)
				.mapResult(response -> {
					// Only handle successful responses - errors are handled in doOnError
					String coderContent = response.getResult().getOutput().getText();
					assignedStep
						.setExecutionStatus(ReflectionUtil.getCompletionStatus(reflectionProcessor != null, nodeName));
					assignedStep.setExecutionRes(Objects.requireNonNull(coderContent));
					logger.info("{} completed, content: {}", nodeName, coderContent);

					updated.put("coder_content_" + executorNodeId, coderContent);
					return updated;
				})
				.buildWithChatResponse(streamResult);

			updated.put("coder_content_" + executorNodeId, generator);
			return updated;
		}
		catch (Exception e) {
			// Handle any exception that occurs before or during stream setup
			StateUtil.handleStepError(assignedStep, nodeName, e, logger);
			return updated;
		}
	}

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐