GraphMindStudio Workflow Node Relationship Algorithm
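ModularMethod Module Documentation
Overview
ModularMethod is a C# static class library for configuring, connecting, and managing workflows. It provides tools for creating, connecting, and configuring complex workflow chains, with support for batch processing, key mapping, and data-flow analysis.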
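1. Configuration parameter classes (ModularParameter)
1.1 WorkflowParameterConfig
Main workflow parameter configuration class; it holds the complete configuration of a set of workflows.
Properties:
Workflows - list of JobjWorkflow, the workflow definitions
PathMappingsJson - path mappings as a JSON string
BatchConfigJson - batch configuration as a JSON string
KeyMappingJson - key mappings as a JSON string
ChainCount - number of chains, defaults to 1
AdditionalData - dictionary of additional data
Main methods:
ToStringWorkflowConfig() - converts to a StringWorkflowConfig
FromSingleJson() - creates an instance from a single JSON configuration
CreateStringWorkflowConfigFromJson() - creates a StringWorkflowConfig from JSON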
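1.2 WorkflowItemConfig
Workflow item configuration; the basic information for a single workflow.
Properties:
WorkflowName - workflow name
NodeJson - node configuration as JSON
ChainCount - number of chains
1.3 UniversalWorkflowConfig
Universal workflow configuration, used as the unified configuration format.
Properties:
PathMappings - dictionary of path mappings
BatchConfig - BatchConfig object
Workflows - list of WorkflowItemConfig
KeyMapping - dictionary of key mappings
ChainCount - number of chains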
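1.4 StringWorkflowConfig
String-based workflow configuration, used for JSON serialization.
Properties:
PathMappingsJson - path mappings JSON
BatchConfigJson - batch configuration JSON
WorkflowsJson - workflows JSON
KeyMappingJson - key mappings JSON
ChainCount - number of chains
1.5 JobjWorkflow
Workflow wrapper that holds the node configuration as a JToken.
Properties:
NodeJson - node configuration as a JToken
ChainCount - number of chains
1.6 BatchConfig and PathConfiguration
Batch configuration classes:
BatchConfig - main batch configuration class
PathConfiguration - a single path configuration entry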
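2. Workflow connection module
2.1 WorkflowConnectionConfig
Workflow connection configuration.
Properties:
Workflows - list of ChainInfo
KeyMapping - KeyMappingConfig object
2.2 WorkflowConnectionResult
Workflow connection result.
Properties:
ConnectedWorkflow - the connected workflow chain
Connections - list of connections
KeyMappingReport - key mapping report
Success - whether the connection succeeded
Message - status or error message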
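2.3 KeyRelationshipGraph
Key relationship graph, used to analyze the data flow.
Methods:
GetInputKeysForNode() - returns the input keys of a node
GetOutputKeysForNode() - returns the output keys of a node
GetSourceNodeForKey() - returns the node that produces a key
GetTargetNodesForKey() - returns the nodes that consume a key
2.4 ChainInfo
Workflow chain information.
Main methods:
GetAllNodeCallkeys() - returns the Callkeys of all nodes
GetPredecessorNodes() - returns the predecessor nodes
GetSuccessorNodes() - returns the successor nodes
AnalyzeDataFlow() - analyzes the data flow
2.5 DataFlowAnalysisResult
Data-flow analysis result.
Properties:
StartNode - start node
KeyRelationships - list of key relationships
DataFlowPaths - list of data-flow paths
AnalysisReport - analysis report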
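3. Core utility modules
3.1 SimplifyNodes
Utility class for simplified node handling.
Main methods:
ParseKeysFromJson() - parses a list of keys from JSON
CalculateKeyRelationships() - computes the key relationships
CreateChainInfo() - creates the chain info
Chain creation flow:
Parse the base node configuration
Create a copy of the nodes for each chain
Apply the chain configuration (key suffixes, node links)
Compute the dependencies between nodes
Build the complete chain info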
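The steps above can be sketched in isolation. This is a minimal sketch, not the module's implementation: `SketchNode` and `ChainSketch` are hypothetical stand-ins for `NodeConfig` and `SimplifyNodes`, and only the naming rules visible in the source further down are reproduced (node identifiers of the form `{Symbol}_c{chainIndex}`, keys suffixed as `{key}._{n}_` with n starting at 1, and the last node of one chain pointing at the first node of the next).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-in for NodeConfig (illustration only).
public sealed class SketchNode
{
    public string Symbol { get; set; } = "";
    public List<string> ParameterKeys { get; set; } = new List<string>();
    public List<string> ResultKeys { get; set; } = new List<string>();
    public string NextNodeKey { get; set; }   // null marks the end of the whole flow
}

public static class ChainSketch
{
    // Expands a base chain into chainCount copies and links them end to end.
    public static List<SketchNode> Expand(IReadOnlyList<SketchNode> baseNodes, int chainCount)
    {
        var all = new List<SketchNode>();
        if (baseNodes == null || baseNodes.Count == 0 || chainCount < 1) return all;

        for (int c = 0; c < chainCount; c++)
        {
            string suffix = (c + 1).ToString();   // key suffix is 1-based
            for (int n = 0; n < baseNodes.Count; n++)
            {
                // Next node: the following node in this chain, else the first node of the next chain.
                string next = null;
                if (n < baseNodes.Count - 1) next = $"{baseNodes[n + 1].Symbol}_c{c}";
                else if (c < chainCount - 1) next = $"{baseNodes[0].Symbol}_c{c + 1}";

                all.Add(new SketchNode
                {
                    Symbol = $"{baseNodes[n].Symbol}_c{c}",
                    ParameterKeys = baseNodes[n].ParameterKeys.Select(k => $"{k}._{suffix}_").ToList(),
                    ResultKeys = baseNodes[n].ResultKeys.Select(k => $"{k}._{suffix}_").ToList(),
                    NextNodeKey = next
                });
            }
        }
        return all;
    }
}
```

Expanding a two-node base chain (`load`, `parse`) with a chain count of 2 yields four nodes, `load_c0`, `parse_c0`, `load_c1`, `parse_c1`, where `parse_c0` points at `load_c1` and only `parse_c1` has no next node.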
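3.2 WorkflowConnector
Workflow connector.
Nested classes:
KeyMappingService - key mapping service
WorkflowConnectTools - workflow connection tools
WorkflowConnectionOrchestrator - workflow connection orchestrator
Connection flow:
Validate the configuration
Build the node cache
Merge the nodes of all workflows
Establish the connections between workflows
Apply the key mappings
Produce the connection result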
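The key-mapping step of this flow can be illustrated on plain string lists. The sketch below is an assumption-laden simplification: `KeyMappingSketch` and its two methods are hypothetical names, and keys are passed as lists rather than parsed from node JSON. It mirrors the rule in the source further down, where a mapping is kept only if its source key is among the upstream node's result keys and its target key is among the downstream node's parameter keys, after which the downstream parameter keys are renamed to the source keys.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class KeyMappingSketch
{
    // Keeps only the mappings whose source key is produced upstream
    // and whose target key is consumed downstream.
    public static List<KeyValuePair<string, string>> FilterMappings(
        IEnumerable<string> upstreamResultKeys,
        IEnumerable<string> downstreamParameterKeys,
        IDictionary<string, string> customMappings)
    {
        var sources = new HashSet<string>(upstreamResultKeys);
        var targets = new HashSet<string>(downstreamParameterKeys);
        return customMappings
            .Where(m => sources.Contains(m.Key) && targets.Contains(m.Value))
            .ToList();
    }

    // Renames each mapped parameter key to the upstream key that feeds it;
    // unmapped keys are left unchanged.
    public static List<string> RewriteParameterKeys(
        IEnumerable<string> parameterKeys,
        IEnumerable<KeyValuePair<string, string>> mappings)
    {
        var sourceByTarget = new Dictionary<string, string>();
        foreach (var m in mappings)
        {
            // If two mappings share a target, the first one wins.
            if (!sourceByTarget.ContainsKey(m.Value)) sourceByTarget[m.Value] = m.Key;
        }
        return parameterKeys
            .Select(k => sourceByTarget.TryGetValue(k, out var src) ? src : k)
            .ToList();
    }
}
```

With upstream result keys `output1` and `x`, downstream parameter keys `input1` and `input2`, and the mappings `output1 -> input1` and `output2 -> input2`, only the first mapping survives, and the downstream parameter keys become `output1` and `input2`.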
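3.3 DataFlowKeyExtractor
Data-flow key extractor.
Main methods:
GetAllInputKeys() - returns all input keys (keys that no node produces, i.e. external inputs)
GetAllOutputKeys() - returns all output keys (keys that no node consumes)
GetInputKeysByNode() - returns the input keys grouped by node
GetOutputKeysByNode() - returns the output keys grouped by node
3.4 DynamicBatchConfigurator
Dynamic batch configurator.
Main methods:
Configure() - configures batch processing
ConfigureFromJson() - configures batch processing from JSON
Batch configuration features:
Three value sources: fixed value, path value, and path key
Customizable suffix format
Chained batch creation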
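3.5 StringWorkflowConnector
String-based workflow connector.
Main methods:
ConfigureWorkflows() - configures the workflows
Configuration flow:
Convert to the universal configuration format
Apply the path mappings
Apply the batch configuration
Create and connect the workflows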
4. Usage examples
4.1 Creating a simple workflow chain
csharp
// Create the chain info from JSON
var json = @"[{""basicConfig"":{""Callkey"":""node1""},…}]";
var chainInfo = SimplifyNodes.CreateChainInfo(json, chainCount: 3, name: "Example chain");
// Analyze the data flow
var analysisResult = chainInfo.AnalyzeDataFlow();
Console.WriteLine(analysisResult.AnalysisReport);
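4.2 Connecting multiple workflows
csharp
// Create the workflows to connect (json1 and json2 are node-configuration JSON strings)
var workflow1 = SimplifyNodes.CreateChainInfo(json1, 2);
var workflow2 = SimplifyNodes.CreateChainInfo(json2, 2);
// Configure the key mapping (source result key -> target parameter key)
var keyMapping = new Dictionary<string, string>
{
{"output1", "input1"},
{"output2", "input2"}
};
// Connect the workflows; returns the connected ChainInfo and throws if the connection fails
var connectedChain = WorkflowConnector.WorkflowConnectionOrchestrator.ConnectAndRunWorkflows(
new List<ChainInfo> { workflow1, workflow2 },
keyMapping
);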
4.3 Batch configuration example
csharp
// Batch configuration JSON
var batchConfigJson = @"{
""defaultCount"": 3,
""pathConfigurations"": [{
""targetPrefix"": ""data.items"",
""value"": ""template"",
""valueSource"": ""fixed"",
""suffixFormat"": ""{index}""
}]
}";
// Apply it to a data dictionary
var data = new Dictionary<string, object>();
DynamicBatchConfigurator.ConfigureFromJson(data, batchConfigJson);
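5. Data-flow analysis
5.1 Key relationship types
ParameterKey - a parameter key, consumed as a node input; a key that no node produces is recorded with this type and treated as an external input
ResultKey - a result key, produced as a node output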
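How keys end up with one type or the other can be shown with a two-pass scan over the nodes. The following is a minimal sketch under stated assumptions: `KeyLink` and `KeyGraphSketch` are hypothetical names, and nodes are given as plain tuples instead of `NodeConfig` objects. The first pass registers every output key with its producer, and the second pass attaches consumers and records any key without a producer as an external input.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical record of one key: who produces it and who consumes it.
public sealed class KeyLink
{
    public string Key { get; set; } = "";
    public string SourceNode { get; set; }                       // null means external input
    public List<string> TargetNodes { get; } = new List<string>();
    public bool IsResultKey => SourceNode != null;               // otherwise a parameter key
}

public static class KeyGraphSketch
{
    public static Dictionary<string, KeyLink> Build(
        IReadOnlyList<(string Node, string[] Inputs, string[] Outputs)> nodes)
    {
        var links = new Dictionary<string, KeyLink>();

        // Pass 1: register every output key; the first producer wins.
        foreach (var n in nodes)
        {
            foreach (var key in n.Outputs)
            {
                if (!links.ContainsKey(key))
                    links[key] = new KeyLink { Key = key, SourceNode = n.Node };
            }
        }

        // Pass 2: attach consumers; keys nobody produces become external inputs.
        foreach (var n in nodes)
        {
            foreach (var key in n.Inputs)
            {
                if (!links.TryGetValue(key, out var link))
                {
                    link = new KeyLink { Key = key };
                    links[key] = link;
                }
                if (!link.TargetNodes.Contains(n.Node)) link.TargetNodes.Add(n.Node);
            }
        }
        return links;
    }
}
```

For a node `a` that reads `ext` and writes `x`, and a node `b` that reads `x`, the result has `x` as a result key produced by `a` and consumed by `b`, and `ext` as a parameter key with no producer.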
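5.2 Data-flow path analysis
Starting from the start node, the analysis enumerates every possible data-flow path and reports:
Complete paths (those that reach a terminal node)
Cyclic dependencies
Path depth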
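The path enumeration described above amounts to a depth-first search that tracks the nodes on the current path. The sketch below is illustrative only: `FlowPathSketch` is a hypothetical name, the successor relation is passed in as a plain dictionary, and it backtracks on a shared path list, whereas the source further down copies the path and the visited set for each branch. Both approaches report a cycle when a node reappears on its own path.

```csharp
using System;
using System.Collections.Generic;

public static class FlowPathSketch
{
    public sealed class FlowPath
    {
        public FlowPath(IEnumerable<string> nodes, bool isComplete, bool hasCycle)
        {
            Nodes = new List<string>(nodes);   // snapshot of the current path
            IsComplete = isComplete;
            HasCycle = hasCycle;
        }
        public List<string> Nodes { get; }
        public bool IsComplete { get; }
        public bool HasCycle { get; }
        public int Depth => Nodes.Count;
    }

    // Enumerates every path from start; successors maps a node to the nodes it feeds.
    public static List<FlowPath> Enumerate(
        string start, IReadOnlyDictionary<string, List<string>> successors)
    {
        var paths = new List<FlowPath>();
        Visit(start, new List<string>(), new HashSet<string>(), successors, paths);
        return paths;
    }

    private static void Visit(string node, List<string> path, HashSet<string> onPath,
        IReadOnlyDictionary<string, List<string>> successors, List<FlowPath> paths)
    {
        path.Add(node);
        if (!onPath.Add(node))
        {
            // The node is already on the current path: record the cycle and stop.
            paths.Add(new FlowPath(path, isComplete: false, hasCycle: true));
            path.RemoveAt(path.Count - 1);
            return;
        }

        bool hasNext = successors.TryGetValue(node, out var next) && next.Count > 0;
        if (!hasNext)
        {
            // Terminal node: the path is complete.
            paths.Add(new FlowPath(path, isComplete: true, hasCycle: false));
        }
        else
        {
            foreach (var s in next) Visit(s, path, onPath, successors, paths);
        }

        // Backtrack so sibling branches start from a clean state.
        onPath.Remove(node);
        path.RemoveAt(path.Count - 1);
    }
}
```

For the graph `a -> b, c`, `b -> d`, `c -> a`, this yields one complete path `a, b, d` and one cyclic path `a, c, a`, each with depth 3.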
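6. Configuration
6.1 JSON configuration structure
json
{
"PathMappings": {…},
"BatchConfig": {…},
"Workflows": […],
"KeyMapping": {…},
"Count": 2
}
6.2 Node configuration requirements
Each node configuration must contain:
basicConfig.Callkey - unique node identifier
funcPipelineConfig.ParameterKeys - list of parameter keys
funcPipelineConfig.ResultKeys - list of result keys
nextNodeConfig.NextNodeKey - key of the next node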
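A pre-flight check for these four fields might look like the sketch below. It is not part of the module: `NodeConfigCheck` is a hypothetical helper, and it uses `System.Text.Json` instead of the Newtonsoft.Json types the module itself relies on, purely to keep the example dependency-free. It assumes the node configuration is a JSON array of node objects, as in example 4.1.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class NodeConfigCheck
{
    // The four fields listed as required for every node.
    private static readonly (string Section, string Field)[] Required =
    {
        ("basicConfig", "Callkey"),
        ("funcPipelineConfig", "ParameterKeys"),
        ("funcPipelineConfig", "ResultKeys"),
        ("nextNodeConfig", "NextNodeKey")
    };

    // Returns one message per missing field; an empty list means every node passed.
    public static List<string> FindMissingFields(string nodesJson)
    {
        var missing = new List<string>();
        using var doc = JsonDocument.Parse(nodesJson);
        if (doc.RootElement.ValueKind != JsonValueKind.Array)
        {
            missing.Add("root is not an array");
            return missing;
        }

        int index = 0;
        foreach (var node in doc.RootElement.EnumerateArray())
        {
            foreach (var (section, field) in Required)
            {
                // TryGetProperty is only valid on objects, so check the kind first.
                bool present = node.ValueKind == JsonValueKind.Object
                    && node.TryGetProperty(section, out var sec)
                    && sec.ValueKind == JsonValueKind.Object
                    && sec.TryGetProperty(field, out _);
                if (!present) missing.Add($"node {index}: {section}.{field}");
            }
            index++;
        }
        return missing;
    }
}
```

The check only verifies that the fields are present; it does not validate their contents or the uniqueness of `Callkey`.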
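7. Error handling
The module handles errors as follows:
Configuration validation failures throw ArgumentException (ArgumentNullException for a null configuration); ConnectWorkflows catches these and reports them through its result
Connection failures return a WorkflowConnectionResult with Success set to false and the error in Message
Invalid JSON passed to FromSingleJson surfaces as the JSON parser's exception, while ParseKeysFromJson falls back to an empty list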
8. Extensibility
8.1 Custom configuration actions
Custom node configuration logic can be added through ChainedNodeConfig.ConfigActions.
8.2 Additional data
WorkflowParameterConfig.AdditionalData can hold custom extension data.
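9. Performance considerations
A node cache (Callkey to node) speeds up node lookups while connecting workflows
Batch operations reduce repeated configuration
The data-flow analysis report (path depth, cycle warnings) gives input for optimization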
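10. Notes
Every node must have a unique Callkey
The chain count must be greater than 0; otherwise CreateChainInfo returns an empty ChainInfo
A key mapping takes effect only when its source key exists among the upstream node's result keys and its target key among the downstream node's parameter keys
Check path formats carefully when using batch configuration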
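The first two notes can be checked before any chain is built. This is a small sketch with a hypothetical helper, `ChainPreconditions`, that takes the Callkeys as plain strings; it is not something the module provides.

```csharp
using System;
using System.Collections.Generic;

public static class ChainPreconditions
{
    // Returns a list of problems; an empty list means the inputs look usable.
    public static List<string> Check(IEnumerable<string> callkeys, int chainCount)
    {
        var problems = new List<string>();
        if (chainCount < 1) problems.Add("chain count must be greater than 0");

        var seen = new HashSet<string>();
        foreach (var key in callkeys)
        {
            if (string.IsNullOrEmpty(key)) problems.Add("empty Callkey");
            else if (!seen.Add(key)) problems.Add($"duplicate Callkey: {key}");
        }
        return problems;
    }
}
```

Calling `ChainPreconditions.Check(new[] { "node1", "node2", "node1" }, 0)` reports both the invalid chain count and the duplicate `node1`.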
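using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
public static class ModularMethod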
{
#region Configuration parameter classes
public static class ModularParameter
{
public class WorkflowParameterConfig
{
public List<JobjWorkflow> Workflows { get; set; } = new List<JobjWorkflow>();
public string PathMappingsJson { get; set; } = "{}";
public string BatchConfigJson { get; set; } = "{}";
public string KeyMappingJson { get; set; } = "{}";
public int ChainCount { get; set; } = 1;
public Dictionary<string, object> AdditionalData { get; set; } = new Dictionary<string, object>();
public WorkflowParameterConfig() { }
public WorkflowParameterConfig(
List<JobjWorkflow> workflows,
string pathMappingsJson,
string batchConfigJson,
string keyMappingJson,
int chainCount,
Dictionary<string, object> additionalData = null)
{
Workflows = workflows ?? new List<JobjWorkflow>();
PathMappingsJson = pathMappingsJson ?? "{}";
BatchConfigJson = batchConfigJson ?? "{}";
KeyMappingJson = keyMappingJson ?? "{}";
ChainCount = chainCount;
AdditionalData = additionalData ?? new Dictionary<string, object>();
}
public StringWorkflowConfig ToStringWorkflowConfig()
{
var workflowsArray = new JArray(Workflows.Select(w => w.ToJObject()));
string workflowsJson = workflowsArray.ToString(Formatting.None);
return new StringWorkflowConfig
{
PathMappingsJson = PathMappingsJson,
BatchConfigJson = BatchConfigJson,
WorkflowsJson = workflowsJson,
KeyMappingJson = KeyMappingJson,
ChainCount = ChainCount
};
}
public static WorkflowParameterConfig FromSingleJson(string completeConfigJson,
Dictionary<string, JToken> workflowDictionary = null)
{
var jObject = JObject.Parse(completeConfigJson);
int globalCount = jObject["Count"]?.Value<int>() ?? 1;
var workflows = BuildWorkflowsFromConfig(jObject, globalCount, workflowDictionary);
var configurations = ProcessConfigurations(jObject, globalCount);
return new WorkflowParameterConfig(
workflows: workflows,
pathMappingsJson: configurations.pathMappings,
batchConfigJson: configurations.batchConfig,
keyMappingJson: configurations.keyMapping,
chainCount: globalCount
);
}
public static StringWorkflowConfig CreateStringWorkflowConfigFromJson(
string completeConfigJson, Dictionary<string, JToken> workflowDictionary = null)
{
return FromSingleJson(completeConfigJson, workflowDictionary).ToStringWorkflowConfig();
}
private static List<JobjWorkflow> BuildWorkflowsFromConfig(JObject jObject, int globalCount,
Dictionary<string, JToken> workflowDictionary)
{
var workflows = new List<JobjWorkflow>();
if (jObject["Workflows"] is JArray workflowsArray)
{
foreach (var workflowObj in workflowsArray)
{
var workflow = CreateWorkflowFromObject(workflowObj, globalCount, workflowDictionary);
if (workflow != null) workflows.Add(workflow);
}
}
return workflows;
}
private static JobjWorkflow CreateWorkflowFromObject(JToken workflowObj, int globalCount,
Dictionary<string, JToken> workflowDictionary)
{
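var workflowKey = workflowObj["WorkflowKey"]?.ToString();
// Skip entries without a key (a null key would make TryGetValue throw) or with an unknown key.
if (workflowKey == null || workflowDictionary == null ||
!workflowDictionary.TryGetValue(workflowKey, out JToken nodeJson))
return null;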
return new JobjWorkflow(nodeJson, globalCount);
}
private static (string pathMappings, string batchConfig, string keyMapping)
ProcessConfigurations(JObject jObject, int globalCount)
{
return (
pathMappings: GetJsonString(jObject, "PathMappings"),
batchConfig: ProcessBatchConfig(jObject, globalCount),
keyMapping: GetJsonString(jObject, "KeyMapping")
);
}
private static string GetJsonString(JObject jObject, string propertyName)
{
return jObject[propertyName]?.ToString(Formatting.None) ?? "{}";
}
private static string ProcessBatchConfig(JObject jObject, int globalCount)
{
var batchConfigJson = GetJsonString(jObject, "BatchConfig");
if (batchConfigJson == "{}") return batchConfigJson;
var batchConfig = JObject.Parse(batchConfigJson);
batchConfig["defaultCount"] = globalCount;
if (batchConfig["pathConfigurations"] is JArray pathConfigs)
{
foreach (var config in pathConfigs)
{
if (config["customCount"] != null) config["customCount"] = globalCount;
}
}
return batchConfig.ToString(Formatting.None);
}
}
public class WorkflowItemConfig
{
public string WorkflowName { get; set; } = string.Empty;
public string NodeJson { get; set; } = string.Empty;
public int ChainCount { get; set; } = 1;
}
public class UniversalWorkflowConfig
{
public Dictionary<string, string> PathMappings { get; set; } = new Dictionary<string, string>();
public BatchConfig BatchConfig { get; set; } = new BatchConfig();
public List<WorkflowItemConfig> Workflows { get; set; } = new List<WorkflowItemConfig>();
public Dictionary<string, string> KeyMapping { get; set; } = new Dictionary<string, string>();
public int ChainCount { get; set; } = 1;
}
public class StringWorkflowConfig
{
public string PathMappingsJson { get; set; } = "{}";
public string BatchConfigJson { get; set; } = "{}";
public string WorkflowsJson { get; set; } = "[]";
public string KeyMappingJson { get; set; } = "{}";
public int ChainCount { get; set; } = 1;
}
public class JobjWorkflow
{
public JToken NodeJson { get; set; }
public int ChainCount { get; set; }
public JobjWorkflow(JToken nodeJson, int chainCount)
{
NodeJson = nodeJson;
ChainCount = chainCount;
}
public JObject ToJObject()
{
return new JObject
{
["NodeJson"] = NodeJson,
["ChainCount"] = ChainCount
};
}
}
public class PathConfiguration
{
[JsonProperty("targetPrefix")]
public string TargetPrefix { get; set; } = string.Empty;
[JsonProperty("value")]
public string Value { get; set; } = string.Empty;
[JsonProperty("valueSource")]
public string ValueSource { get; set; } = "fixed";
[JsonProperty("suffixFormat")]
public string SuffixFormat { get; set; } = "_{index}_";
[JsonProperty("customCount")]
public int? CustomCount { get; set; }
}
public class BatchConfig
{
[JsonProperty("defaultCount")]
public int DefaultCount { get; set; } = 1;
[JsonProperty("pathConfigurations")]
public List<PathConfiguration> PathConfigurations { get; set; } = new();
}
}
#endregion
#region Workflow connection module
public class WorkflowConnectionConfig
{
public List<ChainInfo> Workflows { get; set; } = new List<ChainInfo>();
public KeyMappingConfig KeyMapping { get; set; } = new KeyMappingConfig();
}
public class KeyMappingConfig
{
public Dictionary<string, string> CustomMappings { get; set; } = new Dictionary<string, string>();
}
public class WorkflowConnectionResult
{
public ChainInfo ConnectedWorkflow { get; set; }
public List<WorkflowConnection> Connections { get; set; } = new List<WorkflowConnection>();
public KeyMappingReport KeyMappingReport { get; set; }
public bool Success { get; set; }
public string Message { get; set; }
}
public class WorkflowConnection
{
public string FromWorkflow { get; set; }
public string ToWorkflow { get; set; }
public string FromNode { get; set; }
public string ToNode { get; set; }
public List<KeyMapping> KeyMappings { get; set; } = new List<KeyMapping>();
public DateTime ConnectedAt { get; set; } = DateTime.Now;
}
public class KeyMapping
{
public string SourceKey { get; set; }
public string TargetKey { get; set; }
}
public class KeyMappingReport
{
public int TotalMappings { get; set; }
public int SuccessfulMappings { get; set; }
public int FailedMappings { get; set; }
public List<KeyMapping> AllMappings { get; set; } = new List<KeyMapping>();
public List<string> UnmappedKeys { get; set; } = new List<string>();
public string Summary { get; set; }
}
public enum KeyType { ParameterKey, ResultKey }
public class KeyRelationship
{
public string Key { get; set; }
public string SourceNodeCallkey { get; set; }
public List<string> TargetNodeCallkeys { get; set; } = new List<string>();
public KeyType Type { get; set; }
public bool IsExternalInput => string.IsNullOrEmpty(SourceNodeCallkey);
}
public class KeyRelationshipGraph
{
public Dictionary<string, KeyRelationship> KeyRelationships { get; set; } = new Dictionary<string, KeyRelationship>();
public Dictionary<string, List<string>> NodeInputKeys { get; set; } = new Dictionary<string, List<string>>();
public Dictionary<string, List<string>> NodeOutputKeys { get; set; } = new Dictionary<string, List<string>>();
public List<string> GetInputKeysForNode(string nodeCallkey) =>
NodeInputKeys.GetValueOrDefault(nodeCallkey, new List<string>());
public List<string> GetOutputKeysForNode(string nodeCallkey) =>
NodeOutputKeys.GetValueOrDefault(nodeCallkey, new List<string>());
public string GetSourceNodeForKey(string key) =>
KeyRelationships.GetValueOrDefault(key)?.SourceNodeCallkey;
public List<string> GetTargetNodesForKey(string key) =>
KeyRelationships.GetValueOrDefault(key)?.TargetNodeCallkeys ?? new List<string>();
}
public class NodeChainSettings
{
public int ChainIndex { get; set; }
public int NodeIndex { get; set; }
public string KeySuffix { get; set; }
public string NextNodeCallkey { get; set; }
public bool IsLastInChain { get; set; }
public bool IsLastInEntireFlow { get; set; }
}
public class ChainedNodeConfig
{
public string ChaineName { get; set; }
public List<NodeConfig> BaseChainConfig { get; set; }
public List<Func<NodeConfig, NodeConfig>> ConfigActions { get; set; }
public int ChainCount { get; set; }
public string CreatedChain { get; set; }
}
public class DataFlowPath
{
public List<string> Nodes { get; set; } = new List<string>();
public int Depth => Nodes.Count;
public bool IsComplete { get; set; }
public bool HasCycle { get; set; }
}
public class DataFlowAnalysisResult
{
public string StartNode { get; set; } = string.Empty;
public List<KeyRelationship> KeyRelationships { get; set; } = new List<KeyRelationship>();
public List<DataFlowPath> DataFlowPaths { get; set; } = new List<DataFlowPath>();
public string AnalysisReport { get; set; } = string.Empty;
}
public class ChainInfo
{
public string ChainName { get; set; } = null;
public NodeConfig StartNode { get; set; }
public NodeConfig EndNode { get; set; }
public List<NodeConfig> AllNodes { get; set; } = new List<NodeConfig>();
public KeyRelationshipGraph KeyRelationships { get; set; } = new KeyRelationshipGraph();
public List<string> AllParameterKeys { get; set; } = new List<string>();
public List<string> AllResultKeys { get; set; } = new List<string>();
public List<string> GetAllNodeCallkeys() =>
AllNodes?.Select(n => n.basicConfig.Callkey).ToList() ?? new List<string>();
public string GetStartNodeCallkey() => StartNode?.basicConfig.Callkey;
public string GetEndNodeCallkey() => EndNode?.basicConfig.Callkey;
public NodeConfig GetNodeByCallkey(string callkey) =>
AllNodes?.FirstOrDefault(n => n.basicConfig.Callkey == callkey);
public List<NodeConfig> GetPredecessorNodes(string nodeCallkey)
{
var node = GetNodeByCallkey(nodeCallkey);
if (node == null) return new List<NodeConfig>();
return KeyRelationships?.GetInputKeysForNode(nodeCallkey)
.Select(key => KeyRelationships.GetSourceNodeForKey(key))
.Where(callkey => !string.IsNullOrEmpty(callkey))
.Select(GetNodeByCallkey)
.Where(n => n != null)
.Distinct()
.ToList() ?? new List<NodeConfig>();
}
public List<NodeConfig> GetSuccessorNodes(string nodeCallkey)
{
var node = GetNodeByCallkey(nodeCallkey);
if (node == null) return new List<NodeConfig>();
return KeyRelationships?.GetOutputKeysForNode(nodeCallkey)
.SelectMany(key => KeyRelationships.GetTargetNodesForKey(key))
.Select(GetNodeByCallkey)
.Where(n => n != null)
.Distinct()
.ToList() ?? new List<NodeConfig>();
}
public DataFlowAnalysisResult AnalyzeDataFlow()
{
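// A null node list must be rejected too; "AllNodes?.Count == 0" is false when AllNodes is null.
if (KeyRelationships?.KeyRelationships == null || AllNodes == null || AllNodes.Count == 0)
return new DataFlowAnalysisResult { AnalysisReport = "Invalid chain info" };
var startNode = GetStartNodeCallkey();
if (string.IsNullOrEmpty(startNode))
return new DataFlowAnalysisResult { AnalysisReport = "Error: cannot determine the start node" };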
var result = new DataFlowAnalysisResult
{
StartNode = startNode,
KeyRelationships = KeyRelationships.KeyRelationships.Values.ToList(),
DataFlowPaths = AnalyzeFlowPaths(startNode)
};
result.AnalysisReport = GenerateAnalysisReport(result);
return result;
}
private List<DataFlowPath> AnalyzeFlowPaths(string startNode)
{
var flowPaths = new List<DataFlowPath>();
TraverseDataFlow(startNode, new List<string>(), flowPaths, new HashSet<string>());
return flowPaths;
}
private void TraverseDataFlow(string currentNode, List<string> currentPath,
List<DataFlowPath> flowPaths, HashSet<string> visited)
{
if (visited.Contains(currentNode))
{
var cyclePath = new List<string>(currentPath) { currentNode };
flowPaths.Add(new DataFlowPath { Nodes = cyclePath, IsComplete = false, HasCycle = true });
return;
}
visited.Add(currentNode);
currentPath.Add(currentNode);
var successorNodes = GetSuccessorNodes(currentNode);
if (successorNodes == null || successorNodes.Count == 0)
{
flowPaths.Add(new DataFlowPath { Nodes = new List<string>(currentPath), IsComplete = true, HasCycle = false });
}
else
{
foreach (var successorNode in successorNodes)
{
var nextNodeCallkey = successorNode.basicConfig.Callkey;
if (!string.IsNullOrEmpty(nextNodeCallkey))
{
TraverseDataFlow(nextNodeCallkey, new List<string>(currentPath), flowPaths, new HashSet<string>(visited));
}
}
}
}
private static string GenerateAnalysisReport(DataFlowAnalysisResult result)
{
var output = new StringBuilder();
output.AppendLine("=== Data-flow analysis ===");
output.AppendLine($"\n--- Key relationship statistics ({result.KeyRelationships.Count} total) ---");
// Keys without a source node come from outside the chain.
var externalInputs = result.KeyRelationships.Count(r => r.IsExternalInput);
var internalKeys = result.KeyRelationships.Count - externalInputs;
output.AppendLine($"External input keys: {externalInputs}");
output.AppendLine($"Internally produced keys: {internalKeys}");
var typeGroups = result.KeyRelationships.GroupBy(r => r.Type);
foreach (var group in typeGroups)
output.AppendLine($"{group.Key}: {group.Count()}");
output.AppendLine($"\n--- Data-flow paths (from '{result.StartNode}', {result.DataFlowPaths.Count} total) ---");
var hasCycles = result.DataFlowPaths.Any(p => p.HasCycle);
if (hasCycles) output.AppendLine("Warning: cyclic dependency detected!");
var orderedPaths = result.DataFlowPaths
.OrderByDescending(p => p.Depth)
.ThenBy(p => p.HasCycle)
.ThenBy(p => string.Join("", p.Nodes));
foreach (var path in orderedPaths)
{
var statusIndicator = path.HasCycle ? "[cycle] " : (path.IsComplete ? "[complete] " : "[broken] ");
output.AppendLine($"{statusIndicator}{string.Join(" → ", path.Nodes)} (depth: {path.Depth})");
}
var completePaths = result.DataFlowPaths.Count(p => p.IsComplete && !p.HasCycle);
var cyclePaths = result.DataFlowPaths.Count(p => p.HasCycle);
var maxDepth = result.DataFlowPaths.DefaultIfEmpty().Max(p => p?.Depth ?? 0);
output.AppendLine($"\nPath statistics: {completePaths} complete, {cyclePaths} cyclic, maximum depth {maxDepth}");
return output.ToString();
}
}
#endregion
#region Core utility modules
public static class SimplifyNodes
{
public static List<string> ParseKeysFromJson(string keysJson)
{
if (string.IsNullOrEmpty(keysJson)) return new List<string>();
try
{
return JsonTransation.ParseParameterKeys(keysJson);
}
catch
{
try
{
return JsonConvert.DeserializeObject<List<string>>(keysJson) ?? new List<string>();
}
catch
{
return new List<string>();
}
}
}
private static string ProcessKeysWithSuffix(string keysJson, string suffix)
{
if (string.IsNullOrEmpty(keysJson)) return keysJson;
var keys = ParseKeysFromJson(keysJson);
var processedKeys = keys.Select(key => $"{key}._{suffix}_").ToList();
return JsonTransation.StringifyParameterKeys(processedKeys);
}
public static void CalculateKeyRelationships(ChainInfo chainInfo)
{
if (chainInfo?.AllNodes == null) return;
var graph = new KeyRelationshipGraph();
var allParameterKeys = new HashSet<string>();
var allResultKeys = new HashSet<string>();
foreach (var node in chainInfo.AllNodes)
{
var callkey = node.basicConfig.Callkey;
if (string.IsNullOrEmpty(callkey)) continue;
var parameterKeys = ParseKeysFromJson(node.funcPipelineConfig.ParameterKeys);
var resultKeys = ParseKeysFromJson(node.funcPipelineConfig.ResultKeys);
graph.NodeInputKeys[callkey] = parameterKeys;
graph.NodeOutputKeys[callkey] = resultKeys;
allParameterKeys.UnionWith(parameterKeys);
allResultKeys.UnionWith(resultKeys);
foreach (var resultKey in resultKeys)
{
if (!graph.KeyRelationships.ContainsKey(resultKey))
{
graph.KeyRelationships[resultKey] = new KeyRelationship
{
Key = resultKey,
SourceNodeCallkey = callkey,
Type = KeyType.ResultKey
};
}
}
}
foreach (var node in chainInfo.AllNodes)
{
var callkey = node.basicConfig.Callkey;
if (string.IsNullOrEmpty(callkey)) continue;
var parameterKeys = graph.NodeInputKeys[callkey];
foreach (var paramKey in parameterKeys)
{
if (graph.KeyRelationships.TryGetValue(paramKey, out var relationship))
{
if (!relationship.TargetNodeCallkeys.Contains(callkey))
relationship.TargetNodeCallkeys.Add(callkey);
}
else
{
graph.KeyRelationships[paramKey] = new KeyRelationship
{
Key = paramKey,
SourceNodeCallkey = null,
TargetNodeCallkeys = new List<string> { callkey },
Type = KeyType.ParameterKey
};
}
}
}
chainInfo.KeyRelationships = graph;
chainInfo.AllParameterKeys = allParameterKeys.ToList();
chainInfo.AllResultKeys = allResultKeys.ToList();
}
public static ChainInfo CreateChainInfo(string json, int chainCount = 1, string name = null)
{
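if (string.IsNullOrEmpty(json) || chainCount < 1)
return CreateEmptyChainInfo();
var baseNodes = JsonConvert.DeserializeObject<List<NodeConfig>>(json);
// DeserializeObject returns null for the JSON literal "null"; treat that like an empty list.
if (baseNodes == null || baseNodes.Count == 0)
return CreateEmptyChainInfo();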
var chainConfig = new ChainedNodeConfig
{
ChaineName = name,
BaseChainConfig = baseNodes,
ConfigActions = CreateDefaultConfigActions(baseNodes.Count),
ChainCount = chainCount,
CreatedChain = json
};
var chainInfo = CreateChainedNodes(chainConfig);
DebugPrint(json, true, "CreateChainInfojson");
DebugPrint(baseNodes, true, "baseNodes");
DebugPrint(chainInfo, true, "CreateChainInfo");
CreateConfigsForWorkflow(chainInfo);
CalculateKeyRelationships(chainInfo);
return chainInfo;
}
private static ChainInfo CreateEmptyChainInfo() =>
new ChainInfo { AllNodes = new List<NodeConfig>() };
private static ChainInfo CreateChainedNodes(ChainedNodeConfig chainConfig)
{
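// Explicit null checks: the lifted comparisons on "?." evaluate to false for null and would let null through.
if (chainConfig == null || chainConfig.ChainCount <= 0 ||
chainConfig.BaseChainConfig == null || chainConfig.BaseChainConfig.Count == 0)
return CreateEmptyChainInfo();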
var allChains = new List<List<NodeConfig>>();
for (int chainIndex = 0; chainIndex < chainConfig.ChainCount; chainIndex++)
{
var chainNodes = new List<NodeConfig>();
var baseNodes = chainConfig.BaseChainConfig;
for (int nodeIndex = 0; nodeIndex < baseNodes.Count; nodeIndex++)
{
var node = CreateConfiguredNode(baseNodes[nodeIndex], chainConfig, chainIndex, nodeIndex, chainNodes);
chainNodes.Add(node);
}
allChains.Add(chainNodes);
}
return new ChainInfo
{
ChainName = chainConfig.ChaineName,
StartNode = allChains.FirstOrDefault()?.FirstOrDefault(),
EndNode = allChains.LastOrDefault()?.LastOrDefault(),
AllNodes = allChains.SelectMany(chain => chain).ToList()
};
}
private static NodeConfig CreateConfiguredNode(NodeConfig template, ChainedNodeConfig chainConfig,
int chainIndex, int nodeIndex, List<NodeConfig> currentChain)
{
var node = CreateNodeCopy(template, chainIndex);
var settings = CreateChainSettings(chainConfig, chainIndex, nodeIndex);
ApplyChainConfiguration(node, settings);
ApplyDependencyAndVisibility(node, chainIndex, nodeIndex, currentChain);
ApplyCustomConfiguration(node, chainConfig, nodeIndex);
return node;
}
private static NodeConfig CreateNodeCopy(NodeConfig template, int chainIndex)
{
var node = template.DeepClone();
var identifier = $"{template.Symbol}_c{chainIndex}";
node.Symbol = identifier;
node.basicConfig.Callkey = identifier;
return node;
}
private static NodeChainSettings CreateChainSettings(ChainedNodeConfig chainConfig, int chainIndex, int nodeIndex)
{
var baseNodes = chainConfig.BaseChainConfig;
string nextNodeKey = null;
if (nodeIndex < baseNodes.Count - 1)
{
nextNodeKey = $"{baseNodes[nodeIndex + 1].Symbol}_c{chainIndex}";
}
else if (chainIndex < chainConfig.ChainCount - 1)
{
nextNodeKey = $"{baseNodes[0].Symbol}_c{chainIndex + 1}";
}
return new NodeChainSettings
{
ChainIndex = chainIndex,
NodeIndex = nodeIndex,
KeySuffix = $"{chainIndex + 1}",
NextNodeCallkey = nextNodeKey,
IsLastInChain = nodeIndex == baseNodes.Count - 1,
IsLastInEntireFlow = (chainIndex == chainConfig.ChainCount - 1) && (nodeIndex == baseNodes.Count - 1)
};
}
private static void ApplyChainConfiguration(NodeConfig node, NodeChainSettings settings)
{
DebugPrint(node.nextNodeConfig.UpdateConfig, false, " Anode.nextNodeConfig.UpdateConfig ");
node.funcPipelineConfig.ParameterKeys = ProcessKeysWithSuffix(node.funcPipelineConfig.ParameterKeys, settings.KeySuffix);
node.funcPipelineConfig.ResultKeys = ProcessKeysWithSuffix(node.funcPipelineConfig.ResultKeys, settings.KeySuffix);
node.nextNodeConfig.UpdateConfig = ProcessKeysWithSuffix(node.nextNodeConfig.UpdateConfig, settings.KeySuffix);
node.nextNodeConfig.NextNodeKey = settings.NextNodeCallkey;
node.nextNodeConfig.HandleNextNodeRun = (settings.NextNodeCallkey != null).ToString().ToLower();
}
private static void ApplyDependencyAndVisibility(NodeConfig node, int chainIndex, int nodeIndex, List<NodeConfig> currentChain)
{
if (chainIndex == 0 && nodeIndex == 0) return;
if (nodeIndex > 0 && currentChain.Count > 0)
{
var previousNode = currentChain.Last();
node.getStrConfig.GetConfigSymbol = previousNode.Symbol;
node.addNodeConfig.AddConfigAddKey = previousNode.Symbol;
}
node.basicConfig.Show = "false";
}
private static void ApplyCustomConfiguration(NodeConfig node, ChainedNodeConfig chainConfig, int nodeIndex)
{
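// The Func's return value is discarded, so an action must mutate the node in place to have any effect.
if (chainConfig.ConfigActions?.Count > nodeIndex)
chainConfig.ConfigActions[nodeIndex]?.Invoke(node);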
}
private static List<Func<NodeConfig, NodeConfig>> CreateDefaultConfigActions(int count)
{
return Enumerable.Range(0, count)
.Select(_ => new Func<NodeConfig, NodeConfig>(config => TemplateConfig(config, "Default")))
.ToList();
}
}
public static class WorkflowConnector
{
public static class KeyMappingService
{
public static List<KeyMapping> MapKeys(List<string> sourceKeys, List<string> targetKeys, KeyMappingConfig config)
{
if (sourceKeys == null || targetKeys == null || config?.CustomMappings == null)
return new List<KeyMapping>();
return config.CustomMappings
.Where(mapping => sourceKeys.Contains(mapping.Key) && targetKeys.Contains(mapping.Value))
.Select(mapping => new KeyMapping { SourceKey = mapping.Key, TargetKey = mapping.Value })
.ToList();
}
}
public static class WorkflowConnectTools
{
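// Shared static cache (Callkey -> node); it is rebuilt and cleared on every ConnectWorkflows call,
// so ConnectWorkflows is not safe to call concurrently.
private static Dictionary<string, NodeConfig> _nodeCache = new Dictionary<string, NodeConfig>();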
public static WorkflowConnectionResult ConnectWorkflows(WorkflowConnectionConfig config)
{
var result = new WorkflowConnectionResult();
try
{
ValidateConfigOrThrow(config);
BuildNodeCache(config.Workflows);
var allNodes = MergeAllWorkflowNodes(config.Workflows);
var connections = EstablishWorkflowConnections(config);
var mappingResults = ExecuteKeyMappingAndUpdateNodes(connections, config.KeyMapping);
result.Connections = connections;
result.KeyMappingReport = GenerateMappingReport(mappingResults);
result.ConnectedWorkflow = CreateConnectedChainInfo(config, allNodes);
result.Success = true;
result.Message = $"Connected {config.Workflows.Count} workflows";
SimplifyNodes.CalculateKeyRelationships(result.ConnectedWorkflow);
}
catch (Exception ex)
{
result.Success = false;
result.Message = $"Error while connecting workflows: {ex.Message}";
}
finally
{
_nodeCache.Clear();
}
return result;
}
private static void ValidateConfigOrThrow(WorkflowConnectionConfig config)
{
if (config == null) throw new ArgumentNullException(nameof(config));
if (config.Workflows == null || config.Workflows.Count < 2)
throw new ArgumentException("At least 2 workflows are required to connect");
for (int i = 0; i < config.Workflows.Count; i++)
{
var workflow = config.Workflows[i];
if (workflow?.AllNodes == null || workflow.AllNodes.Count == 0)
throw new ArgumentException($"Workflow {i + 1} is invalid");
}
}
private static void BuildNodeCache(List<ChainInfo> workflows)
{
_nodeCache.Clear();
foreach (var workflow in workflows.Where(w => w?.AllNodes != null))
{
foreach (var node in workflow.AllNodes.Where(n => n?.basicConfig?.Callkey != null))
{
_nodeCache[node.basicConfig.Callkey] = node;
}
}
}
private static List<NodeConfig> MergeAllWorkflowNodes(List<ChainInfo> workflows)
{
return workflows
.Where(w => w?.AllNodes != null)
.SelectMany(w => w.AllNodes)
.ToList();
}
private static List<WorkflowConnection> EstablishWorkflowConnections(WorkflowConnectionConfig config)
{
var connections = new List<WorkflowConnection>();
for (int i = 0; i < config.Workflows.Count - 1; i++)
{
var connection = CreateWorkflowConnection(config.Workflows[i], config.Workflows[i + 1], i);
if (connection != null) connections.Add(connection);
}
return connections;
}
private static WorkflowConnection CreateWorkflowConnection(ChainInfo fromWorkflow, ChainInfo toWorkflow, int index)
{
var fromNode = fromWorkflow.EndNode;
var toNode = toWorkflow.StartNode;
if (fromNode == null || toNode == null) return null;
fromNode.nextNodeConfig.NextNodeKey = toNode.basicConfig.Callkey;
fromNode.nextNodeConfig.HandleNextNodeRun = "true";
return new WorkflowConnection
{
FromWorkflow = fromWorkflow.GetStartNodeCallkey() ?? $"Workflow_{index}",
ToWorkflow = toWorkflow.GetStartNodeCallkey() ?? $"Workflow_{index + 1}",
FromNode = fromNode.basicConfig.Callkey,
ToNode = toNode.basicConfig.Callkey
};
}
private static List<KeyMapping> ExecuteKeyMappingAndUpdateNodes(
List<WorkflowConnection> connections, KeyMappingConfig mappingConfig)
{
var allMappings = new List<KeyMapping>();
foreach (var connection in connections)
{
var fromNode = FindNodeByCallkey(connection.FromNode);
var toNode = FindNodeByCallkey(connection.ToNode);
if (fromNode == null || toNode == null) continue;
var sourceKeys = SimplifyNodes.ParseKeysFromJson(fromNode.funcPipelineConfig.ResultKeys);
var targetKeys = SimplifyNodes.ParseKeysFromJson(toNode.funcPipelineConfig.ParameterKeys);
var mappings = KeyMappingService.MapKeys(sourceKeys, targetKeys, mappingConfig);
connection.KeyMappings = mappings;
allMappings.AddRange(mappings);
UpdateNodeParameterKeys(toNode, mappings);
}
return allMappings;
}
private static void UpdateNodeParameterKeys(NodeConfig node, List<KeyMapping> mappings)
{
var originalKeys = SimplifyNodes.ParseKeysFromJson(node.funcPipelineConfig.ParameterKeys);
var updatedKeys = originalKeys.Select(targetKey =>
mappings.FirstOrDefault(m => m.TargetKey == targetKey)?.SourceKey ?? targetKey
).ToList();
node.funcPipelineConfig.ParameterKeys = JsonTransation.StringifyParameterKeys(updatedKeys);
}
private static KeyMappingReport GenerateMappingReport(List<KeyMapping> allMappings)
{
return new KeyMappingReport
{
AllMappings = allMappings,
TotalMappings = allMappings.Count,
SuccessfulMappings = allMappings.Count,
FailedMappings = 0,
Summary = $"Key mapping finished: {allMappings.Count} mappings in total"
};
}
private static ChainInfo CreateConnectedChainInfo(WorkflowConnectionConfig config, List<NodeConfig> allNodes)
{
return new ChainInfo
{
StartNode = config.Workflows.First().StartNode,
EndNode = config.Workflows.Last().EndNode,
AllNodes = allNodes
};
}
private static NodeConfig FindNodeByCallkey(string callkey) =>
_nodeCache.TryGetValue(callkey, out var node) ? node : null;
public static WorkflowConnectionResult ConnectWorkflowsSimple(List<ChainInfo> workflows)
{
var config = new WorkflowConnectionConfig
{
Workflows = workflows,
KeyMapping = new KeyMappingConfig()
};
return ConnectWorkflows(config);
}
}
public static class WorkflowConnectionOrchestrator
{
public static List<ChainInfo> CreateWorkflowsFromConfig(UniversalWorkflowConfig config)
{
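if (config?.Workflows == null || config.Workflows.Count == 0)
throw new Exception("Workflow configuration is empty");
return config.Workflows.Select(workflowConfig =>
{
var chainCount = workflowConfig.ChainCount > 0 ? workflowConfig.ChainCount : config.ChainCount;
var workflow = SimplifyNodes.CreateChainInfo(workflowConfig.NodeJson, chainCount);
// CreateChainInfo returns an empty ChainInfo rather than null on bad input, so check the node list too.
if (workflow == null || workflow.AllNodes == null || workflow.AllNodes.Count == 0)
throw new Exception($"Failed to create workflow: {workflowConfig.WorkflowName}");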
return workflow;
}).ToList();
}
    // Connects the workflows with the given custom key mapping (an empty mapping
    // when null) and returns the connected chain; throws if the connection fails.
    public static ChainInfo ConnectAndRunWorkflows(List<ChainInfo> workflows, Dictionary<string, string> keyMapping)
    {
        if (workflows == null || workflows.Count == 0)
            throw new ArgumentException("The workflow list must not be empty", nameof(workflows));
        var connectionConfig = new WorkflowConnectionConfig
        {
            Workflows = workflows,
            KeyMapping = new KeyMappingConfig { CustomMappings = keyMapping ?? new Dictionary<string, string>() }
        };
        var result = WorkflowConnector.WorkflowConnectTools.ConnectWorkflows(connectionConfig);
        if (!result.Success) throw new Exception($"Workflow connection failed: {result.Message}");
        CreateConfigsForWorkflow(result.ConnectedWorkflow);
        return result.ConnectedWorkflow;
    }
}
}
public static class DataFlowKeyExtractor
{
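    // Returns the distinct keys flagged as external inputs of the chain.
    // A null result (or null relationship list) yields an empty list.
    public static List<string> GetAllInputKeys(DataFlowAnalysisResult result)
    {
        return result?.KeyRelationships?
            .Where(r => r.IsExternalInput)
            .Select(r => r.Key)
            .Distinct()
            .ToList() ?? new List<string>();
    }

    // Returns the distinct keys that no node consumes (no target nodes);
    // these are treated as the outputs of the chain.
    public static List<string> GetAllOutputKeys(DataFlowAnalysisResult result)
    {
        return result?.KeyRelationships?
            .Where(r => r.TargetNodeCallkeys == null || !r.TargetNodeCallkeys.Any())
            .Select(r => r.Key)
            .Distinct()
            .ToList() ?? new List<string>();
    }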
    // Groups keys by the node that consumes them: target callkey -> input keys.
    public static Dictionary<string, List<string>> GetInputKeysByNode(DataFlowAnalysisResult result)
    {
        var nodeInputs = new Dictionary<string, List<string>>();
        if (result?.KeyRelationships == null) return nodeInputs;
        foreach (var relationship in result.KeyRelationships)
        {
            if (relationship.TargetNodeCallkeys == null) continue;
            foreach (var targetNode in relationship.TargetNodeCallkeys)
            {
                if (string.IsNullOrEmpty(targetNode)) continue;
                if (!nodeInputs.TryGetValue(targetNode, out var keys))
                    nodeInputs[targetNode] = keys = new List<string>();
                keys.Add(relationship.Key);
            }
        }
        return nodeInputs;
    }

    // Groups keys by the node that produces them: source callkey -> output keys.
    public static Dictionary<string, List<string>> GetOutputKeysByNode(DataFlowAnalysisResult result)
    {
        var nodeOutputs = new Dictionary<string, List<string>>();
        if (result?.KeyRelationships == null) return nodeOutputs;
        foreach (var relationship in result.KeyRelationships)
        {
            var sourceNode = relationship.SourceNodeCallkey;
            if (string.IsNullOrEmpty(sourceNode)) continue;
            if (!nodeOutputs.TryGetValue(sourceNode, out var keys))
                nodeOutputs[sourceNode] = keys = new List<string>();
            keys.Add(relationship.Key);
        }
        return nodeOutputs;
    }
}
public class DynamicBatchConfigurator
{
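    // Applies every path configuration in the batch config to the dictionary.
    // Does nothing when the dictionary, the config, or its path list is null.
    public static void Configure(Dictionary<string, object> dictionary, BatchConfig config)
    {
        if (dictionary == null || config?.PathConfigurations == null) return;
        foreach (var pathConfig in config.PathConfigurations)
        {
            ConfigureSinglePath(dictionary, pathConfig, config.DefaultCount);
        }
    }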
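    // Deserializes a BatchConfig from JSON and applies it to the dictionary.
    // Any failure is logged and rethrown with the original exception attached.
    public static void ConfigureFromJson(Dictionary<string, object> dictionary, string jsonString)
    {
        if (dictionary == null || string.IsNullOrEmpty(jsonString)) return;
        try
        {
            var config = JsonConvert.DeserializeObject<BatchConfig>(jsonString);
            if (config != null)
            {
                Configure(dictionary, config);
            }
        }
        catch (Exception ex)
        {
            DebugPrint($"Failed to deserialize batch configuration: {ex.Message}");
            throw new Exception($"Failed to deserialize batch configuration: {ex.Message}", ex);
        }
    }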
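    // Writes the resolved value under `count` generated keys
    // (TargetPrefix + formatted suffix). CustomCount overrides the default count.
    private static void ConfigureSinglePath(Dictionary<string, object> dictionary, PathConfiguration config, int defaultCount)
    {
        int count = config.CustomCount ?? defaultCount;
        if (count <= 0) return;
        string actualValue = GetActualValue(dictionary, config);
        // Compare case-insensitively so this check agrees with the lower-cased
        // switch in GetActualValue ("pathKey", "pathkey", ... are all accepted).
        bool isPathKey = string.Equals(config.ValueSource, "pathKey", StringComparison.OrdinalIgnoreCase);
        if (string.IsNullOrEmpty(actualValue) && !isPathKey) return;
        for (int i = 0; i < count; i++)
        {
            string suffix = FormatSuffix(config.SuffixFormat, i, count);
            string fullKey = BuildFullKey(config.TargetPrefix, suffix);
            SetValueFrame(dictionary, fullKey, actualValue);
        }
    }

    // Resolves the value to write. "path" reads the value stored at config.Value
    // in the dictionary; every other source uses config.Value literally.
    private static string GetActualValue(Dictionary<string, object> dictionary, PathConfiguration config)
    {
        // ToLowerInvariant avoids culture-specific casing (for example the Turkish
        // dotless i, which would stop "FIXED" from matching "fixed").
        return config.ValueSource?.ToLowerInvariant() switch
        {
            "fixed" => config.Value,
            "path" => GetValueFrame<string>(dictionary, config.Value),
            "pathkey" => config.Value,
            _ => config.Value
        };
    }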
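    // Expands the placeholders in a suffix format. {index} is 1-based and {total}
    // is the item count, e.g. FormatSuffix("item_{index}_of_{total}", 0, 3)
    // returns "item_1_of_3". The format string must not be null.
    private static string FormatSuffix(string format, int index, int total)
    {
        return format.Replace("{index}", (index + 1).ToString())
                     .Replace("{total}", total.ToString());
    }

    // Joins prefix and suffix with exactly one dot; an empty part is skipped,
    // e.g. BuildFullKey("images", "item_1") returns "images.item_1".
    private static string BuildFullKey(string baseKey, string suffix)
    {
        if (string.IsNullOrEmpty(baseKey)) return suffix;
        if (string.IsNullOrEmpty(suffix)) return baseKey;
        return baseKey.EndsWith(".") ? $"{baseKey}{suffix}" : $"{baseKey}.{suffix}";
    }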
}
public static class StringWorkflowConnector
{
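    // Entry point: accepts a JSON string, a UniversalWorkflowConfig, or a
    // StringWorkflowConfig, and returns the connected chain. Path mappings and
    // batch configuration are only applied when targetDic is not null.
    public static ChainInfo ConfigureWorkflows(object config, Dictionary<string, object> targetDic = null)
    {
        if (config == null) throw new ArgumentNullException(nameof(config));
        try
        {
            var universalConfig = ConvertToUniversalConfig(config);
            return ExecuteWorkflowConfiguration(universalConfig, targetDic);
        }
        catch (Exception ex)
        {
            throw new Exception($"Workflow configuration failed: {ex.Message}", ex);
        }
    }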
    // Applies path mappings first, then the batch configuration, and finally
    // creates and connects the workflows.
    private static ChainInfo ExecuteWorkflowConfiguration(UniversalWorkflowConfig config, Dictionary<string, object> targetDic)
    {
        ApplyPathMappings(config, targetDic);
        ApplyBatchConfiguration(config, targetDic);
        return CreateAndConnectWorkflows(config);
    }

    // Writes each path mapping (path -> value) into the target dictionary.
    private static void ApplyPathMappings(UniversalWorkflowConfig config, Dictionary<string, object> targetDic)
    {
        if (config.PathMappings != null && targetDic != null)
        {
            foreach (var mapping in config.PathMappings)
            {
                SetValueFrame(targetDic, mapping.Key, mapping.Value);
            }
        }
    }

    // Expands the batch configuration into the target dictionary.
    private static void ApplyBatchConfiguration(UniversalWorkflowConfig config, Dictionary<string, object> targetDic)
    {
        if (config.BatchConfig != null && targetDic != null)
        {
            DynamicBatchConfigurator.Configure(targetDic, config.BatchConfig);
        }
    }

    // Builds the individual chains and connects them using config.KeyMapping.
    private static ChainInfo CreateAndConnectWorkflows(UniversalWorkflowConfig config)
    {
        var workflows = WorkflowConnectionOrchestrator.CreateWorkflowsFromConfig(config);
        return WorkflowConnectionOrchestrator.ConnectAndRunWorkflows(workflows, config.KeyMapping);
    }
    // Normalizes the supported input types to a UniversalWorkflowConfig.
    private static UniversalWorkflowConfig ConvertToUniversalConfig(object config)
    {
        return config switch
        {
            string jsonConfig => Deserialize<UniversalWorkflowConfig>(jsonConfig),
            UniversalWorkflowConfig uc => uc,
            StringWorkflowConfig sc => ConvertFromLegacyConfig(sc),
            _ => throw new ArgumentException($"Unsupported configuration type: {config.GetType()}")
        };
    }

    // Converts the legacy string-based config by deserializing each JSON field.
    private static UniversalWorkflowConfig ConvertFromLegacyConfig(StringWorkflowConfig legacyConfig)
    {
        return new UniversalWorkflowConfig
        {
            PathMappings = Deserialize<Dictionary<string, string>>(legacyConfig.PathMappingsJson),
            BatchConfig = Deserialize<BatchConfig>(legacyConfig.BatchConfigJson),
            Workflows = Deserialize<List<WorkflowItemConfig>>(legacyConfig.WorkflowsJson),
            KeyMapping = Deserialize<Dictionary<string, string>>(legacyConfig.KeyMappingJson),
            ChainCount = legacyConfig.ChainCount
        };
    }
}
#endregion
}
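
The batch key expansion (ConfigureSinglePath, FormatSuffix, BuildFullKey) and the parameter-key remapping (UpdateNodeParameterKeys) in the listing above can be tried in isolation. The following is a minimal sketch, not the library's own code: the class KeySketch and its methods ExpandKeys and RemapKeys are hypothetical names introduced only for illustration, and the remapping uses a plain target-to-source dictionary as a simplifying assumption instead of the List<KeyMapping> used by the module.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical demo type; it is not part of ModularMethod.
public static class KeySketch
{
    // Same rule as FormatSuffix above: {index} is 1-based, {total} is the count.
    public static string FormatSuffix(string format, int index, int total) =>
        format.Replace("{index}", (index + 1).ToString())
              .Replace("{total}", total.ToString());

    // Same rule as BuildFullKey above: join with a single dot, skip empty parts.
    public static string BuildFullKey(string baseKey, string suffix)
    {
        if (string.IsNullOrEmpty(baseKey)) return suffix;
        if (string.IsNullOrEmpty(suffix)) return baseKey;
        return baseKey.EndsWith(".") ? baseKey + suffix : baseKey + "." + suffix;
    }

    // Generates the keys that the loop in ConfigureSinglePath would write to.
    public static List<string> ExpandKeys(string prefix, string suffixFormat, int count) =>
        Enumerable.Range(0, count)
                  .Select(i => BuildFullKey(prefix, FormatSuffix(suffixFormat, i, count)))
                  .ToList();

    // Replaces each parameter key that has a mapping with its source key;
    // unmapped keys pass through unchanged (assumption: dictionary lookup
    // stands in for the FirstOrDefault search over KeyMapping objects).
    public static List<string> RemapKeys(IEnumerable<string> parameterKeys,
                                         IDictionary<string, string> targetToSource) =>
        parameterKeys.Select(k => targetToSource.TryGetValue(k, out var source) ? source : k)
                     .ToList();

    public static void Main()
    {
        var keys = ExpandKeys("images", "frame_{index}_of_{total}", 3);
        Console.WriteLine(string.Join(", ", keys));
        // prints: images.frame_1_of_3, images.frame_2_of_3, images.frame_3_of_3

        var mapping = new Dictionary<string, string> { ["prompt"] = "text" };
        var remapped = RemapKeys(new[] { "prompt", "seed" }, mapping);
        Console.WriteLine(string.Join(", ", remapped));
        // prints: text, seed
    }
}
```

In this sketch the downstream node originally expects a "prompt" parameter; after remapping it reads the upstream node's "text" result instead, while "seed" has no mapping and is left untouched. That is the same effect UpdateNodeParameterKeys has on a node's ParameterKeys.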