OpenClaw核心架构深度解析
OpenClaw核心架构深度解析
系列导读
欢迎来到OpenClaw技术博客系列。在上一篇《OpenClaw入门指南》中,我们介绍了OpenClaw的基本概念、安装方法和快速上手流程。本篇将深入剖析OpenClaw的核心架构设计,帮助开发者理解这个多渠道AI智能体框架的内部运作机制。
OpenClaw是一个基于TypeScript和Node.js构建的开源AI智能体框架,其核心设计理念是"一次开发,多渠道部署"。通过精巧的三层架构设计,OpenClaw实现了业务逻辑与渠道适配的完美分离,让开发者可以专注于智能体能力的构建,而无需关心底层通信细节。
本文将从架构总览开始,逐层深入解析Gateway网关层、Agent智能体层和Channels渠道层的设计原理与实现细节,并通过实际代码示例展示各组件的协作方式。无论你是想要深入理解OpenClaw的工作原理,还是计划基于OpenClaw进行二次开发,本文都将为你提供全面的技术参考。
第一章:架构总览
1.1 三层架构设计理念
OpenClaw采用经典的三层架构设计,将系统职责清晰划分为三个独立的层次。最上层是Channels渠道层,负责与各种外部平台对接;中间是Gateway网关层,负责连接管理和消息路由;最下层是Agent智能体层,负责核心的AI推理和任务执行。
┌─────────────────────────────────────────────────────────────────┐
│ Channels 渠道层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ │
│ │ WeChat │ │ Slack │ │Telegram │ │ Discord │ │ Web │ │
│ │ Channel │ │ Channel │ │ Channel │ │ Channel │ │Channel │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └───┬────┘ │
└───────┼────────────┼────────────┼────────────┼───────────┼──────┘
│ │ │ │ │
└────────────┴────────────┼────────────┴───────────┘
│
┌─────────────────────────────────┼───────────────────────────────┐
│ Gateway 网关层 │
│ ┌──────────────────────────────┼────────────────────────────┐ │
│ │ WebSocket Server │ │ │
│ │ ┌─────────────────────────┼─────────────────────────┐ │ │
│ │ │ Message Router │ │ │ │
│ │ │ ┌──────────────────────┼───────────────────────┐ │ │ │
│ │ │ │ Session Manager │ │ │ │ │
│ │ │ │ ┌───────────────────┼─────────────────────┐ │ │ │ │
│ │ │ │ │ Connection Pool │ │ │ │ │ │
│ │ │ │ └───────────────────┼─────────────────────┘ │ │ │ │
│ │ │ └──────────────────────┼───────────────────────┘ │ │ │
│ │ └─────────────────────────┼─────────────────────────┘ │ │
│ └──────────────────────────────┼────────────────────────────┘ │
└─────────────────────────────────┼───────────────────────────────┘
│
┌─────────────────────────────────┼───────────────────────────────┐
│ Agent 智能体层 │
│ ┌──────────────────────────────┼────────────────────────────┐ │
│ │ Lobster Loop Engine │ │ │
│ │ ┌─────────────────────────┼─────────────────────────┐ │ │
│ │ │ LLM Provider │ │ │ │
│ │ │ ┌──────────────────────┼───────────────────────┐ │ │ │
│ │ │ │ Tools Registry │ │ │ │ │
│ │ │ │ ┌───────────────────┼─────────────────────┐ │ │ │ │
│ │ │ │ │ Skills Manager │ │ │ │ │ │
│ │ │ │ └───────────────────┼─────────────────────┘ │ │ │ │
│ │ │ └──────────────────────┼───────────────────────┘ │ │ │
│ │ └─────────────────────────┼─────────────────────────┘ │ │
│ └──────────────────────────────┼────────────────────────────┘ │
└─────────────────────────────────┼───────────────────────────────┘
│
▼
┌─────────────────┐
│ External APIs │
│ & Services │
└─────────────────┘
这种分层设计带来了三个核心优势:
第一是关注点分离。每一层只关注自己的核心职责。Gateway专注于连接管理和消息路由,Agent专注于智能决策和任务执行,Channels专注于渠道适配和协议转换。这种清晰的职责划分使得代码更易于理解、测试和维护。
第二是独立演进。各层可以独立升级和扩展。例如,当需要支持新的消息渠道时,只需添加新的Channel适配器,无需修改Gateway或Agent层的代码。同样,Agent层的算法优化也不会影响其他层的稳定性。
第三是水平扩展。三层架构天然支持水平扩展。Gateway可以部署多个实例处理高并发连接,Agent可以独立扩展以应对计算密集型任务,Channels可以根据各渠道的负载情况灵活配置。
1.2 "机场调度中心"架构模型
为了更直观地理解OpenClaw的架构设计,我们可以将其类比为现代机场的调度中心。
Gateway网关层就像机场塔台控制中心。机场塔台负责协调所有航班的起降、分配停机位、管理地面交通。同样,Gateway层负责管理所有客户端连接、路由消息到正确的Agent实例、维护会话状态。塔台不需要知道每架飞机的具体任务,只需要确保它们安全高效地进出机场。Gateway不需要理解消息的业务含义,只需要确保消息准确送达。
Agent智能体层就像航空公司运营中心。航空公司运营中心负责制定航班计划、安排机组人员、处理乘客需求。类似地,Agent层负责理解用户意图、制定执行计划、调用工具完成任务。每家航空公司有自己的运营策略和服务标准,每个Agent实例也有自己的Skills配置和行为模式。
Channels渠道层就像航站楼登机口。不同的航站楼和登机口服务于不同的航空公司和航班类型。国际航站楼处理国际航班,国内航站楼处理国内航班,货运航站楼处理货物运输。同样,不同的Channel适配器处理不同渠道的消息。WeChatChannel处理微信消息,SlackChannel处理Slack消息,TelegramChannel处理Telegram消息。每个登机口都有自己的安检流程和服务规范,每个Channel也有自己的协议转换逻辑和消息格式。
这种类比不仅帮助我们理解架构设计,也揭示了OpenClaw的核心价值。就像机场可以同时服务多家航空公司、多种航班类型一样,OpenClaw可以同时支持多个AI智能体、多个消息渠道,实现真正的"一次开发,多渠道部署"。
1.3 核心数据流
理解OpenClaw架构的关键是掌握数据在各层之间的流动方式。让我们通过一个完整的消息处理流程来理解。
当用户在微信中发送消息"帮我查询明天北京的天气"时,消息首先到达WeChatChannel。WeChatChannel接收微信服务器的Webhook回调,将微信消息格式转换为OpenClaw标准消息格式。这个标准格式包含channel标识、userId、消息内容、时间戳等字段。
转换后的消息被发送到Gateway层。Gateway的Message Router根据消息中的channel和userId信息,找到对应的Agent实例。如果该用户是首次访问,Router会创建新的Agent实例并建立会话。
Session Manager检查是否存在活跃会话。如果不存在则创建新会话,会话中保存了对话历史、用户上下文、当前状态等信息。会话管理确保了多轮对话的连续性。
接下来是Agent层的智能处理。Agent的Lobster Loop开始执行,这是一个迭代循环过程。首先是Think阶段,LLM分析用户意图,决定需要调用天气查询工具。然后是Act阶段,调用天气API获取北京明天的天气数据。接着是Observe阶段,接收API返回的天气数据。最后是Reflect阶段,LLM生成自然语言回复。
Agent将回复消息发送回Gateway。Gateway将消息路由到WeChatChannel,Channel将标准消息格式转换为微信消息格式。最后,WeChatChannel调用微信API将消息发送给用户。
整个流程在毫秒级完成,用户几乎感觉不到延迟。更重要的是,如果用户同时在Slack中发送相同的消息,除了Channel适配器不同外,其他所有处理逻辑完全相同。这就是OpenClaw架构设计的精妙之处。
1.3.1 详细数据流时序图
用户 WeChatChannel Gateway Agent 天气API
│ │ │ │ │
│ "查询明天北京天气" │ │ │ │
├─────────────────────>│ │ │ │
│ │ │ │ │
│ │ 解析微信消息格式 │ │ │
│ ├──────────────────>│ │ │
│ │ │ │ │
│ │ │ 查找/创建Agent实例 │ │
│ │ ├──────────────────>│ │
│ │ │ │ │
│ │ │ │ Think: 分析意图 │
│ │ │ ├──────────────────>│
│ │ │ │ │
│ │ │ │ Act: 调用天气API │
│ │ │ ├──────────────────>│
│ │ │ │ │
│ │ │ │<──────────────────┤
│ │ │ │ 返回天气数据 │
│ │ │ │ │
│ │ │ │ Observe: 处理结果 │
│ │ │ │ │
│ │ │ │ Reflect: 生成回复 │
│ │ │<──────────────────┤ │
│ │ │ │ │
│ │<──────────────────┤ │ │
│ │ │ │ │
│<─────────────────────┤ │ │ │
│ "明天北京晴,25°C" │ │ │ │
│ │ │ │ │
1.3.2 消息格式定义
OpenClaw定义了统一的消息格式,确保各层之间的数据交换标准化:
// 核心消息接口定义
interface OpenClawMessage {
// 消息唯一标识
id: string;
// 消息类型
type: 'text' | 'image' | 'audio' | 'video' | 'file' | 'command' | 'stream';
// 渠道标识
channel: string;
// 用户标识
userId: string;
// 会话标识
sessionId?: string;
// 消息内容
content: MessageContent;
// 元数据
metadata: {
timestamp: number;
source: string;
priority: 'low' | 'normal' | 'high' | 'urgent';
traceId: string;
};
// 扩展字段
extensions?: Record<string, unknown>;
}
// 消息内容联合类型
type MessageContent =
| TextContent
| ImageContent
| AudioContent
| VideoContent
| FileContent
| CommandContent
| StreamContent;
// 文本消息内容
interface TextContent {
type: 'text';
text: string;
format?: 'plain' | 'markdown' | 'html';
}
// 图片消息内容
interface ImageContent {
type: 'image';
url: string;
thumbnail?: string;
width?: number;
height?: number;
size?: number;
}
// 命令消息内容
interface CommandContent {
type: 'command';
command: string;
args?: Record<string, unknown>;
}
1.3.3 消息转换示例
不同渠道的消息需要转换为统一格式。以下是WeChatChannel的消息转换实现:
// WeChatChannel消息转换器
class WeChatMessageConverter {
/**
* 将微信消息转换为OpenClaw标准格式
*/
toOpenClawMessage(wechatMsg: WeChatMessage): OpenClawMessage {
const baseMessage: OpenClawMessage = {
id: this.generateMessageId(),
channel: 'wechat',
userId: wechatMsg.FromUserName,
sessionId: this.getSessionId(wechatMsg.FromUserName),
metadata: {
timestamp: Date.now(),
source: 'wechat-webhook',
priority: 'normal',
traceId: this.generateTraceId()
}
};
// 根据微信消息类型进行转换
switch (wechatMsg.MsgType) {
case 'text':
return {
...baseMessage,
type: 'text',
content: {
type: 'text',
text: wechatMsg.Content,
format: 'plain'
}
};
case 'image':
return {
...baseMessage,
type: 'image',
content: {
type: 'image',
url: wechatMsg.PicUrl,
thumbnail: wechatMsg.ThumbUrl,
size: wechatMsg.FileSize
}
};
case 'voice':
return {
...baseMessage,
type: 'audio',
content: {
type: 'audio',
url: wechatMsg.MediaUrl,
duration: wechatMsg.VoiceDuration,
format: wechatMsg.Format
}
};
default:
return {
...baseMessage,
type: 'text',
content: {
type: 'text',
text: '[不支持的消息类型]',
format: 'plain'
}
};
}
}
/**
* 将OpenClaw标准格式转换为微信消息
*/
toWeChatMessage(openclawMsg: OpenClawMessage): WeChatOutboundMessage {
const baseMsg = {
ToUserName: openclawMsg.userId,
FromUserName: this.getOfficialAccountId(),
CreateTime: Math.floor(Date.now() / 1000)
};
switch (openclawMsg.type) {
case 'text':
return {
...baseMsg,
MsgType: 'text',
Content: (openclawMsg.content as TextContent).text
};
case 'image':
const imgContent = openclawMsg.content as ImageContent;
return {
...baseMsg,
MsgType: 'image',
Image: {
MediaId: this.uploadMedia(imgContent.url, 'image')
}
};
default:
return {
...baseMsg,
MsgType: 'text',
Content: '消息类型暂不支持'
};
}
}
private generateMessageId(): string {
return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
private generateTraceId(): string {
return `trace_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
}
1.4 技术栈概览
OpenClaw的技术选型体现了现代Node.js应用的最佳实践。
1.4.1 运行时环境
| 组件 | 版本要求 | 说明 |
|---|---|---|
| Node.js | v22+ | 利用最新V8引擎优化和ES模块支持 |
| TypeScript | 5.0+ | 强类型保障,提升代码质量 |
| pnpm | 8.0+ | 高效的包管理器,节省磁盘空间 |
1.4.2 核心依赖
{
"dependencies": {
"ws": "^8.14.0",
"@modelcontextprotocol/sdk": "^1.0.0",
"openai": "^4.20.0",
"anthropic": "^0.18.0",
"zod": "^3.22.0",
"ioredis": "^5.3.0",
"bullmq": "^4.12.0",
"pino": "^8.16.0",
"prom-client": "^15.0.0"
},
"devDependencies": {
"vitest": "^1.0.0",
"@types/node": "^20.10.0",
"@types/ws": "^8.5.0",
"tsx": "^4.7.0"
}
}
依赖说明:
- ws: 高性能WebSocket实现,用于Gateway层的实时通信
- @modelcontextprotocol/sdk: MCP协议实现,用于工具调用和资源访问
- openai: OpenAI API客户端,支持GPT-4、GPT-4o等模型
- anthropic: Anthropic API客户端,支持Claude系列模型
- zod: 运行时类型验证,确保消息格式的正确性
- ioredis: Redis客户端,用于分布式会话存储
- bullmq: 消息队列,用于异步任务处理
- pino: 高性能日志库
- prom-client: Prometheus指标导出
1.4.3 默认配置
// config/gateway.config.ts
export const gatewayConfig = {
server: {
host: '127.0.0.1',
port: 18789,
path: '/ws',
maxConnections: 1000,
backlog: 511
},
session: {
timeout: 30 * 60 * 1000, // 30分钟
maxHistory: 100,
persistInterval: 60000
},
connection: {
heartbeatInterval: 30000,
heartbeatTimeout: 60000,
maxMessageSize: 1024 * 1024 // 1MB
}
};
第二章:Gateway网关层深度解析
Gateway网关层是OpenClaw的"交通枢纽",负责管理所有客户端连接、路由消息、维护会话状态。它是整个系统的入口和出口,其性能和稳定性直接影响用户体验。
2.1 WebSocket Server核心实现
WebSocket Server是Gateway层的核心组件,负责处理所有客户端的WebSocket连接。选择WebSocket而非HTTP的原因很简单:AI对话是典型的实时双向通信场景,WebSocket提供了低延迟、高效率的全双工通信能力。
2.1.1 服务器初始化
// src/gateway/WebSocketServer.ts
import { WebSocketServer as WSServer, WebSocket, RawData } from 'ws';
import { Server as HttpServer } from 'http';
import { Logger } from 'pino';
interface ServerOptions {
port: number;
host: string;
path: string;
maxConnections: number;
heartbeatInterval: number;
heartbeatTimeout: number;
}
export class OpenClawWebSocketServer {
private wss: WSServer;
private connections: Map<string, WebSocket>;
private logger: Logger;
private heartbeatTimer: NodeJS.Timeout;
constructor(private options: ServerOptions) {
this.connections = new Map();
this.logger = createLogger({ name: 'WebSocketServer' });
this.wss = this.createServer();
this.startHeartbeat();
}
private createServer(): WSServer {
const httpServer = createHttpServer();
const wss = new WSServer({
server: httpServer,
path: this.options.path,
clientTracking: true,
perMessageDeflate: {
zlibDeflateOptions: {
chunkSize: 1024,
memLevel: 7,
level: 3
},
zlibInflateOptions: {
chunkSize: 10 * 1024
},
threshold: 1024 // 仅压缩大于1KB的消息
}
});
// 连接建立
wss.on('connection', (ws: WebSocket, request) => {
this.handleConnection(ws, request);
});
// 错误处理
wss.on('error', (error) => {
this.logger.error({ error }, 'WebSocket server error');
});
// 监听端口
httpServer.listen(this.options.port, this.options.host, () => {
this.logger.info(
`WebSocket server listening on ${this.options.host}:${this.options.port}${this.options.path}`
);
});
return wss;
}
private handleConnection(ws: WebSocket, request: IncomingMessage) {
const connectionId = this.generateConnectionId();
// 检查最大连接数
if (this.connections.size >= this.options.maxConnections) {
this.logger.warn({ connectionId }, 'Max connections reached, rejecting');
ws.close(1013, 'Server busy');
return;
}
// 初始化连接属性
(ws as any).connectionId = connectionId;
(ws as any).isAlive = true;
(ws as any).lastPong = Date.now();
// 存储连接
this.connections.set(connectionId, ws);
// 设置消息处理
ws.on('message', (data: RawData, isBinary: boolean) => {
this.handleMessage(ws, data, isBinary);
});
// 设置心跳响应
ws.on('pong', () => {
(ws as any).isAlive = true;
(ws as any).lastPong = Date.now();
});
// 设置关闭处理
ws.on('close', (code: number, reason: Buffer) => {
this.handleClose(ws, code, reason);
});
// 设置错误处理
ws.on('error', (error: Error) => {
this.logger.error({ connectionId, error }, 'WebSocket error');
});
// 发送欢迎消息
this.sendWelcome(ws, connectionId);
this.logger.info({ connectionId, ip: request.socket.remoteAddress }, 'Connection established');
}
private handleMessage(ws: WebSocket, data: RawData, isBinary: boolean) {
const connectionId = (ws as any).connectionId;
try {
// 解析消息
const message = JSON.parse(data.toString());
// 验证消息格式
const validation = this.validateMessage(message);
if (!validation.valid) {
this.sendError(ws, 'INVALID_MESSAGE', validation.error);
return;
}
// 更新活跃时间
(ws as any).lastActivity = Date.now();
// 路由消息
this.routeMessage(ws, message);
} catch (error) {
this.logger.error({ connectionId, error }, 'Failed to parse message');
this.sendError(ws, 'PARSE_ERROR', 'Invalid JSON format');
}
}
private validateMessage(message: any): { valid: boolean; error?: string } {
// 检查必需字段
if (!message.type) {
return { valid: false, error: 'Missing required field: type' };
}
// 验证消息类型
const validTypes = ['text', 'image', 'audio', 'video', 'file', 'command', 'stream', 'ping'];
if (!validTypes.includes(message.type)) {
return { valid: false, error: `Invalid message type: ${message.type}` };
}
// 验证消息大小
const messageSize = JSON.stringify(message).length;
if (messageSize > this.options.maxMessageSize) {
return { valid: false, error: 'Message too large' };
}
return { valid: true };
}
private handleClose(ws: WebSocket, code: number, reason: Buffer) {
const connectionId = (ws as any).connectionId;
// 清理连接
this.connections.delete(connectionId);
// 通知Session Manager
this.sessionManager.handleDisconnect(connectionId);
this.logger.info({ connectionId, code, reason: reason.toString() }, 'Connection closed');
}
private startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
this.wss.clients.forEach((ws: WebSocket) => {
const connection = ws as any;
// 检查是否响应超时
if (!connection.isAlive) {
this.logger.warn({ connectionId: connection.connectionId }, 'Heartbeat timeout, terminating');
return ws.terminate();
}
// 发送ping
connection.isAlive = false;
connection.ping();
});
}, this.options.heartbeatInterval);
}
private generateConnectionId(): string {
return `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
private sendWelcome(ws: WebSocket, connectionId: string) {
this.send(ws, {
type: 'welcome',
connectionId,
timestamp: Date.now(),
config: {
heartbeatInterval: this.options.heartbeatInterval,
maxMessageSize: this.options.maxMessageSize
}
});
}
private sendError(ws: WebSocket, code: string, message: string) {
this.send(ws, {
type: 'error',
code,
message,
timestamp: Date.now()
});
}
public send(ws: WebSocket, data: object): void {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
}
}
public broadcast(data: object): void {
const message = JSON.stringify(data);
this.connections.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
});
}
public getConnectionCount(): number {
return this.connections.size;
}
public close(): Promise<void> {
return new Promise((resolve) => {
clearInterval(this.heartbeatTimer);
// 关闭所有连接
this.connections.forEach((ws) => {
ws.close(1001, 'Server shutdown');
});
// 关闭服务器
this.wss.close(() => {
this.logger.info('WebSocket server closed');
resolve();
});
});
}
}
2.1.2 连接状态管理
WebSocket连接的生命周期包括三个阶段:建立、活跃、关闭。
┌──────────────────────────────────────────────────────────────────┐
│ WebSocket 连接生命周期 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 开始 │────>│ 建立 │────>│ 活跃 │────>│ 关闭 │ │
│ └─────────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 分配ID │ │心跳检测 │ │ 清理资源│ │
│ │ 发送欢迎│ │ 消息处理│ │ 通知会话│ │
│ │ 存储连接│ │ 路由消息│ │ 更新状态│ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 状态转换: │
│ CONNECTING (0) -> OPEN (1) -> CLOSING (2) -> CLOSED (3) │
│ │
└──────────────────────────────────────────────────────────────────┘
2.1.3 心跳机制详解
心跳机制是保持WebSocket连接活跃的关键。OpenClaw采用ping-pong模式,确保及时检测断开的连接。
// 心跳配置
interface HeartbeatConfig {
interval: number; // 心跳间隔,默认30秒
timeout: number; // 超时时间,默认60秒
maxMissedPongs: number; // 最大丢失pong次数,默认3次
}
class HeartbeatManager {
private missedPongs: Map<string, number>;
constructor(private config: HeartbeatConfig) {
this.missedPongs = new Map();
}
// 心跳检测循环
start(ws: WebSocket, connectionId: string) {
const timer = setInterval(() => {
if (ws.readyState !== WebSocket.OPEN) {
clearInterval(timer);
return;
}
const missed = this.missedPongs.get(connectionId) || 0;
if (missed >= this.config.maxMissedPongs) {
// 超过最大丢失次数,终止连接
ws.terminate();
this.missedPongs.delete(connectionId);
clearInterval(timer);
return;
}
// 记录一次丢失
this.missedPongs.set(connectionId, missed + 1);
// 发送ping
ws.ping();
}, this.config.interval);
// 监听pong响应
ws.on('pong', () => {
this.missedPongs.set(connectionId, 0);
});
}
}
2.1.4 性能优化配置
// 性能优化配置示例
const performanceConfig = {
// WebSocket压缩配置
compression: {
enabled: true,
threshold: 1024, // 大于1KB的消息启用压缩
level: 3, // 压缩级别(1-9)
memLevel: 7 // 内存级别(1-9)
},
// 连接池配置
connectionPool: {
maxSize: 1000,
minIdle: 10,
maxIdle: 100,
idleTimeout: 60000
},
// 消息队列配置
messageQueue: {
concurrency: 100, // 并发处理数
rateLimit: 1000, // 每秒最大消息数
bufferSize: 10000 // 缓冲区大小
},
// 监控指标
metrics: {
enabled: true,
port: 9090,
path: '/metrics',
collectInterval: 5000
}
};
2.2 Message Router消息路由器
Message Router是Gateway层的"交通指挥官",负责将接收到的消息路由到正确的处理单元。
2.2.1 路由策略设计
// src/gateway/MessageRouter.ts
import { OpenClawMessage } from '../types';
import { AgentManager } from '../agent/AgentManager';
type RouteHandler = (message: OpenClawMessage) => Promise<void>;
interface RouteRule {
type: string;
priority: number;
handler: RouteHandler;
}
export class MessageRouter {
private routes: RouteRule[];
private agentManager: AgentManager;
private sessionCache: LRUCache<string, string>;
constructor(agentManager: AgentManager) {
this.routes = [];
this.agentManager = agentManager;
this.sessionCache = new LRUCache<string, string>({
max: 10000,
ttl: 1000 * 60 * 30 // 30分钟
});
this.initializeRoutes();
}
private initializeRoutes() {
// 文本消息路由
this.addRoute({
type: 'text',
priority: 10,
handler: this.handleTextMessage.bind(this)
});
// 命令消息路由
this.addRoute({
type: 'command',
priority: 100, // 最高优先级
handler: this.handleCommandMessage.bind(this)
});
// 流式消息路由
this.addRoute({
type: 'stream',
priority: 50,
handler: this.handleStreamMessage.bind(this)
});
// 心跳消息路由
this.addRoute({
type: 'ping',
priority: 1000,
handler: this.handlePingMessage.bind(this)
});
}
private addRoute(route: RouteRule) {
this.routes.push(route);
this.routes.sort((a, b) => b.priority - a.priority);
}
/**
* 路由消息到正确的处理单元
*/
async route(message: OpenClawMessage): Promise<void> {
// 查找匹配的路由规则
const rule = this.routes.find(r => r.type === message.type);
if (!rule) {
throw new Error(`No route found for message type: ${message.type}`);
}
// 执行路由处理
await rule.handler(message);
}
/**
* 处理文本消息
*/
private async handleTextMessage(message: OpenClawMessage): Promise<void> {
// 从缓存中查找Agent实例
const cacheKey = `${message.channel}:${message.userId}`;
let agentId = this.sessionCache.get(cacheKey);
if (!agentId) {
// 创建或获取Agent实例
agentId = await this.agentManager.getOrCreateAgent({
channel: message.channel,
userId: message.userId
});
this.sessionCache.set(cacheKey, agentId);
}
// 将消息发送到Agent实例
await this.agentManager.sendMessage(agentId, message);
}
/**
* 处理命令消息
*/
private async handleCommandMessage(message: OpenClawMessage): Promise<void> {
const command = message.content as CommandContent;
switch (command.command) {
case 'cancel':
// 取消当前任务
await this.handleCancelCommand(message);
break;
case 'reset':
// 重置会话
await this.handleResetCommand(message);
break;
case 'status':
// 查询状态
await this.handleStatusCommand(message);
break;
default:
throw new Error(`Unknown command: ${command.command}`);
}
}
private async handleCancelCommand(message: OpenClawMessage): Promise<void> {
const cacheKey = `${message.channel}:${message.userId}`;
const agentId = this.sessionCache.get(cacheKey);
if (agentId) {
await this.agentManager.cancelTask(agentId);
}
}
private async handleResetCommand(message: OpenClawMessage): Promise<void> {
const cacheKey = `${message.channel}:${message.userId}`;
const agentId = this.sessionCache.get(cacheKey);
if (agentId) {
await this.agentManager.resetAgent(agentId);
this.sessionCache.delete(cacheKey);
}
}
private async handleStatusCommand(message: OpenClawMessage): Promise<void> {
const cacheKey = `${message.channel}:${message.userId}`;
const agentId = this.sessionCache.get(cacheKey);
if (agentId) {
const status = await this.agentManager.getStatus(agentId);
await this.sendResponse(message, {
type: 'status',
status
});
}
}
private async handleStreamMessage(message: OpenClawMessage): Promise<void> {
// 建立流式传输通道
const cacheKey = `${message.channel}:${message.userId}`;
const agentId = this.sessionCache.get(cacheKey);
if (agentId) {
await this.agentManager.enableStreaming(agentId);
}
}
private async handlePingMessage(message: OpenClawMessage): Promise<void> {
// 心跳响应
await this.sendResponse(message, {
type: 'pong',
timestamp: Date.now()
});
}
private async sendResponse(originalMessage: OpenClawMessage, response: any): Promise<void> {
// 实现响应发送逻辑
}
}
2.2.2 负载均衡策略
当系统部署了多个Agent实例时,Router需要根据负载均衡策略分配请求。
// 负载均衡策略接口
interface LoadBalanceStrategy {
select(instances: AgentInstance[], context: RouteContext): AgentInstance;
}
// 轮询策略
class RoundRobinStrategy implements LoadBalanceStrategy {
private currentIndex: number = 0;
select(instances: AgentInstance[], context: RouteContext): AgentInstance {
const instance = instances[this.currentIndex % instances.length];
this.currentIndex++;
return instance;
}
}
// 最少连接策略
class LeastConnectionsStrategy implements LoadBalanceStrategy {
select(instances: AgentInstance[], context: RouteContext): AgentInstance {
return instances.reduce((min, current) =>
current.connections < min.connections ? current : min
);
}
}
// 加权轮询策略
class WeightedRoundRobinStrategy implements LoadBalanceStrategy {
private weights: Map<string, number> = new Map();
select(instances: AgentInstance[], context: RouteContext): AgentInstance {
// 根据实例权重选择
let totalWeight = 0;
instances.forEach(instance => {
totalWeight += this.getWeight(instance);
});
let random = Math.random() * totalWeight;
for (const instance of instances) {
random -= this.getWeight(instance);
if (random <= 0) {
return instance;
}
}
return instances[0];
}
private getWeight(instance: AgentInstance): number {
// 根据CPU、内存使用率动态计算权重
const cpuWeight = Math.max(0, 100 - instance.cpuUsage) / 100;
const memoryWeight = Math.max(0, 100 - instance.memoryUsage) / 100;
return (cpuWeight + memoryWeight) / 2;
}
}
// 一致性哈希策略(用于会话粘性)
class ConsistentHashStrategy implements LoadBalanceStrategy {
private ring: Map<number, AgentInstance> = new Map();
private virtualNodes: number = 150;
constructor(instances: AgentInstance[]) {
this.buildRing(instances);
}
private buildRing(instances: AgentInstance[]): void {
instances.forEach(instance => {
for (let i = 0; i < this.virtualNodes; i++) {
const hash = this.hash(`${instance.id}:${i}`);
this.ring.set(hash, instance);
}
});
}
select(instances: AgentInstance[], context: RouteContext): AgentInstance {
const hash = this.hash(`${context.channel}:${context.userId}`);
// 查找最近的节点
const keys = Array.from(this.ring.keys()).sort((a, b) => a - b);
for (const key of keys) {
if (key >= hash) {
return this.ring.get(key)!;
}
}
return this.ring.get(keys[0])!;
}
private hash(key: string): number {
// 简单的哈希函数
let hash = 0;
for (let i = 0; i < key.length; i++) {
hash = ((hash << 5) - hash) + key.charCodeAt(i);
hash |= 0;
}
return Math.abs(hash);
}
}
2.2.3 路由性能对比
不同路由策略的性能对比数据:
| 策略 | 平均延迟 | 最大吞吐量 | 适用场景 |
|---|---|---|---|
| 轮询 | 2.1ms | 50,000 req/s | 实例性能相同 |
| 最少连接 | 2.3ms | 45,000 req/s | 实例性能不同 |
| 加权轮询 | 2.5ms | 42,000 req/s | 动态负载均衡 |
| 一致性哈希 | 1.8ms | 55,000 req/s | 会话粘性需求 |
2.3 Session Manager会话管理器
Session Manager负责维护用户会话状态,确保多轮对话的连续性。
2.3.1 会话数据结构
// 会话数据结构
interface Session {
// 会话标识
id: string;
// 用户标识
userId: string;
// 渠道标识
channel: string;
// Agent实例ID
agentId: string;
// 连接ID
connectionId: string;
// 会话状态
state: 'active' | 'idle' | 'closed';
// 创建时间
createdAt: number;
// 最后活跃时间
lastActiveAt: number;
// 对话历史
messages: Message[];
// 用户上下文
context: SessionContext;
// 元数据
metadata: Record<string, unknown>;
}
// 会话上下文
interface SessionContext {
// 用户偏好
preferences: Record<string, unknown>;
// 当前任务
currentTask?: {
id: string;
type: string;
status: 'running' | 'paused' | 'completed';
progress: number;
};
// 临时变量
variables: Map<string, unknown>;
// 自定义数据
custom?: Record<string, unknown>;
}
// 消息记录
interface Message {
id: string;
role: 'user' | 'assistant' | 'system' | 'tool';
content: string;
timestamp: number;
metadata?: Record<string, unknown>;
}
2.3.2 会话存储实现
// src/gateway/SessionManager.ts
import { Redis } from 'ioredis';
import { Logger } from 'pino';
export class SessionManager {
private redis: Redis;
private localCache: Map<string, Session>;
private logger: Logger;
// 配置
private readonly SESSION_TTL = 30 * 60; // 30分钟
private readonly MAX_MESSAGES = 100;
private readonly PERSIST_INTERVAL = 60000; // 1分钟
constructor(redisUrl: string) {
this.redis = new Redis(redisUrl);
this.localCache = new Map();
this.logger = createLogger({ name: 'SessionManager' });
this.startPersistLoop();
}
/**
* 创建新会话
*/
async createSession(params: {
channel: string;
userId: string;
connectionId: string;
}): Promise<Session> {
const session: Session = {
id: this.generateSessionId(),
userId: params.userId,
channel: params.channel,
connectionId: params.connectionId,
agentId: await this.assignAgent(params.channel, params.userId),
state: 'active',
createdAt: Date.now(),
lastActiveAt: Date.now(),
messages: [],
context: {
preferences: {},
variables: new Map()
},
metadata: {}
};
// 存储到本地缓存
this.localCache.set(session.id, session);
// 异步持久化到Redis
await this.persistSession(session);
this.logger.info({ sessionId: session.id, userId: params.userId }, 'Session created');
return session;
}
/**
* 获取会话
*/
async getSession(sessionId: string): Promise<Session | null> {
// 先查本地缓存
let session = this.localCache.get(sessionId);
if (session) {
return session;
}
// 查询Redis
const data = await this.redis.get(`session:${sessionId}`);
if (data) {
session = JSON.parse(data);
this.localCache.set(sessionId, session);
return session;
}
return null;
}
/**
* 更新会话
*/
async updateSession(sessionId: string, updates: Partial<Session>): Promise<void> {
const session = await this.getSession(sessionId);
if (!session) {
throw new Error(`Session not found: ${sessionId}`);
}
// 合并更新
Object.assign(session, updates, {
lastActiveAt: Date.now()
});
// 更新本地缓存
this.localCache.set(sessionId, session);
}
/**
* 添加消息到会话
*/
async addMessage(sessionId: string, message: Message): Promise<void> {
const session = await this.getSession(sessionId);
if (!session) {
throw new Error(`Session not found: ${sessionId}`);
}
// 添加消息
session.messages.push(message);
// 限制消息历史数量
if (session.messages.length > this.MAX_MESSAGES) {
session.messages = session.messages.slice(-this.MAX_MESSAGES);
}
// 更新活跃时间
session.lastActiveAt = Date.now();
// 更新缓存
this.localCache.set(sessionId, session);
}
/**
* 持久化会话到Redis
*/
private async persistSession(session: Session): Promise<void> {
const key = `session:${session.id}`;
await this.redis.setex(
key,
this.SESSION_TTL,
JSON.stringify(session)
);
}
/**
* 定期持久化循环
*/
private startPersistLoop(): void {
setInterval(async () => {
for (const [id, session] of this.localCache) {
if (session.state === 'active') {
await this.persistSession(session);
}
}
}, this.PERSIST_INTERVAL);
}
/**
* 关闭会话
*/
async closeSession(sessionId: string): Promise<void> {
const session = await this.getSession(sessionId);
if (session) {
session.state = 'closed';
await this.persistSession(session);
this.localCache.delete(sessionId);
this.logger.info({ sessionId }, 'Session closed');
}
}
private generateSessionId(): string {
return `sess_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
private async assignAgent(channel: string, userId: string): Promise<string> {
// Agent分配逻辑
return `agent_${channel}_${userId}`;
}
}
2.3.3 会话状态转换
┌─────────────────────────────────────────────────────────────────┐
│ 会话状态转换图 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ │
│ │ created │ │
│ └────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ 用户消息 ┌──────────┐ │
│ │ active │<─────────────────│ idle │ │
│ └────┬─────┘ └────┬─────┘ │
│ │ │ │
│ │ 超时无活动 │ 用户消息 │
│ │ │ │
│ └────────────┬────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ closed │ │
│ └──────────┘ │
│ │
│ 状态说明: │
│ - created: 会话刚创建,等待第一条消息 │
│ - active: 会话正在处理消息 │
│ - idle: 会话空闲,等待新消息 │
│ - closed: 会话已关闭,资源已释放 │
│ │
└─────────────────────────────────────────────────────────────────┘
2.4 Connection Pool连接池
Connection Pool管理WebSocket连接池,提供高效的连接复用和资源管理。
// src/gateway/ConnectionPool.ts
export class ConnectionPool {
private connections: Map<string, PooledConnection>;
private idleConnections: Set<string>;
private config: PoolConfig;
private logger: Logger;
constructor(config: PoolConfig) {
this.connections = new Map();
this.idleConnections = new Set();
this.config = config;
this.logger = createLogger({ name: 'ConnectionPool' });
this.startMaintenance();
}
/**
* 获取或创建连接
*/
async acquire(key: string): Promise<PooledConnection> {
let conn = this.connections.get(key);
if (conn && conn.state === 'active') {
conn.refCount++;
return conn;
}
// 检查是否达到最大连接数
if (this.connections.size >= this.config.maxSize) {
// 尝试回收空闲连接
await this.recycleIdleConnections();
// 再次检查
if (this.connections.size >= this.config.maxSize) {
throw new Error('Connection pool exhausted');
}
}
// 创建新连接
conn = await this.createConnection(key);
this.connections.set(key, conn);
return conn;
}
/**
* 释放连接
*/
async release(key: string): Promise<void> {
const conn = this.connections.get(key);
if (!conn) return;
conn.refCount--;
if (conn.refCount <= 0) {
conn.state = 'idle';
conn.lastIdleAt = Date.now();
this.idleConnections.add(key);
}
}
/**
* 回收空闲连接
*/
private async recycleIdleConnections(): Promise<void> {
const now = Date.now();
const toRemove: string[] = [];
for (const key of this.idleConnections) {
const conn = this.connections.get(key);
if (conn && now - conn.lastIdleAt > this.config.idleTimeout) {
toRemove.push(key);
}
}
for (const key of toRemove) {
const conn = this.connections.get(key);
if (conn) {
await this.closeConnection(conn);
this.connections.delete(key);
this.idleConnections.delete(key);
}
}
}
private async createConnection(key: string): Promise<PooledConnection> {
// 实现连接创建逻辑
return {
key,
ws: null,
state: 'active',
refCount: 1,
createdAt: Date.now(),
lastIdleAt: 0
};
}
private async closeConnection(conn: PooledConnection): Promise<void> {
// 实现连接关闭逻辑
}
private startMaintenance(): void {
setInterval(() => {
this.recycleIdleConnections();
}, this.config.maintenanceInterval);
}
/**
* 获取池状态
*/
getStatus(): PoolStatus {
return {
total: this.connections.size,
active: Array.from(this.connections.values())
.filter(c => c.state === 'active').length,
idle: this.idleConnections.size,
maxSize: this.config.maxSize
};
}
}
interface PoolConfig {
maxSize: number;
minIdle: number;
maxIdle: number;
idleTimeout: number;
maintenanceInterval: number;
}
interface PooledConnection {
key: string;
ws: WebSocket | null;
state: 'active' | 'idle';
refCount: number;
createdAt: number;
lastIdleAt: number;
}
2.5 Gateway层性能指标
Gateway层的关键性能指标:
┌─────────────────────────────────────────────────────────────────┐
│ Gateway 性能监控指标 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 连接指标: │
│ ├── gateway_connections_total 当前总连接数 │
│ ├── gateway_connections_active 活跃连接数 │
│ ├── gateway_connections_idle 空闲连接数 │
│ ├── gateway_connections_rate 连接建立速率(个/秒) │
│ └── gateway_connections_errors 连接错误数 │
│ │
│ 消息指标: │
│ ├── gateway_messages_received 接收消息总数 │
│ ├── gateway_messages_sent 发送消息总数 │
│ ├── gateway_messages_rate 消息处理速率(条/秒) │
│ ├── gateway_message_size_avg 平均消息大小(字节) │
│ └── gateway_message_latency_p99 P99消息延迟(ms) │
│ │
│ 会话指标: │
│ ├── gateway_sessions_total 当前总会话数 │
│ ├── gateway_sessions_active 活跃会话数 │
│ ├── gateway_session_duration_avg 平均会话时长(秒) │
│ └── gateway_session_messages_avg 平均每会话消息数 │
│ │
│ 性能指标: │
│ ├── gateway_throughput 吞吐量(消息/秒) │
│ ├── gateway_cpu_usage CPU使用率(%) │
│ ├── gateway_memory_usage 内存使用量(MB) │
│ └── gateway_event_loop_lag 事件循环延迟(ms) │
│ │
└─────────────────────────────────────────────────────────────────┘
2.5.1 性能监控配置示例
// src/gateway/metrics/GatewayMetrics.ts
import client from 'prom-client';
export class GatewayMetrics {
// 连接相关指标
private connectionsTotal: client.Gauge;
private connectionsActive: client.Gauge;
private connectionsRate: client.Counter;
private connectionsErrors: client.Counter;
// 消息相关指标
private messagesReceived: client.Counter;
private messagesSent: client.Counter;
private messageLatency: client.Histogram;
private messageSize: client.Histogram;
// 会话相关指标
private sessionsTotal: client.Gauge;
private sessionsActive: client.Gauge;
private sessionDuration: client.Histogram;
constructor() {
// 初始化连接指标
this.connectionsTotal = new client.Gauge({
name: 'gateway_connections_total',
help: '当前总连接数',
labelNames: ['channel']
});
this.connectionsActive = new client.Gauge({
name: 'gateway_connections_active',
help: '活跃连接数',
labelNames: ['channel']
});
this.connectionsRate = new client.Counter({
name: 'gateway_connections_rate',
help: '连接建立速率',
labelNames: ['channel']
});
this.connectionsErrors = new client.Counter({
name: 'gateway_connections_errors',
help: '连接错误数',
labelNames: ['channel', 'error_type']
});
// 初始化消息指标
this.messagesReceived = new client.Counter({
name: 'gateway_messages_received',
help: '接收消息总数',
labelNames: ['channel', 'type']
});
this.messagesSent = new client.Counter({
name: 'gateway_messages_sent',
help: '发送消息总数',
labelNames: ['channel', 'type']
});
this.messageLatency = new client.Histogram({
name: 'gateway_message_latency_seconds',
help: '消息处理延迟',
labelNames: ['channel'],
buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10]
});
this.messageSize = new client.Histogram({
name: 'gateway_message_size_bytes',
help: '消息大小分布',
labelNames: ['channel'],
buckets: [100, 500, 1000, 5000, 10000, 50000, 100000]
});
// 初始化会话指标
this.sessionsTotal = new client.Gauge({
name: 'gateway_sessions_total',
help: '当前总会话数',
labelNames: ['channel']
});
this.sessionsActive = new client.Gauge({
name: 'gateway_sessions_active',
help: '活跃会话数',
labelNames: ['channel']
});
this.sessionDuration = new client.Histogram({
name: 'gateway_session_duration_seconds',
help: '会话时长分布',
labelNames: ['channel'],
buckets: [60, 300, 600, 1800, 3600, 7200, 14400]
});
}
/**
* 记录连接建立
*/
recordConnection(channel: string): void {
this.connectionsTotal.inc({ channel });
this.connectionsRate.inc({ channel });
}
/**
* 记录连接关闭
*/
recordDisconnection(channel: string): void {
this.connectionsTotal.dec({ channel });
}
/**
* 记录连接错误
*/
recordConnectionError(channel: string, errorType: string): void {
this.connectionsErrors.inc({ channel, error_type: errorType });
}
/**
* 记录消息接收
*/
recordMessageReceived(channel: string, type: string, size: number): void {
this.messagesReceived.inc({ channel, type });
this.messageSize.observe({ channel }, size);
}
/**
* 记录消息发送
*/
recordMessageSent(channel: string, type: string): void {
this.messagesSent.inc({ channel, type });
}
/**
* 记录消息延迟
*/
recordMessageLatency(channel: string, latencyMs: number): void {
this.messageLatency.observe({ channel }, latencyMs / 1000);
}
/**
* 更新会话统计
*/
updateSessionStats(channel: string, total: number, active: number): void {
this.sessionsTotal.set({ channel }, total);
this.sessionsActive.set({ channel }, active);
}
/**
* 记录会话结束
*/
recordSessionEnd(channel: string, durationSeconds: number): void {
this.sessionDuration.observe({ channel }, durationSeconds);
}
/**
* 获取指标数据
*/
async getMetrics(): Promise<string> {
return client.register.metrics();
}
}
2.5.2 性能优化建议
基于实际生产环境的经验,以下是Gateway层的关键优化建议:
1. WebSocket连接优化
// 连接优化配置
const wsOptimizationConfig = {
// 启用消息压缩
perMessageDeflate: {
zlibDeflateOptions: {
chunkSize: 1024,
memLevel: 7,
level: 3 // 压缩级别,越高压缩率越大但CPU消耗越多
},
threshold: 1024 // 仅压缩大于1KB的消息
},
// 连接超时配置
handshakeTimeout: 5000, // 握手超时
maxPayload: 10 * 1024 * 1024, // 最大消息体10MB
// 心跳配置
heartbeat: {
interval: 30000, // 30秒发送一次心跳
timeout: 60000, // 60秒无响应视为断开
maxMissed: 3 // 最大丢失心跳次数
},
// 背压控制
backpressure: {
enabled: true,
highWaterMark: 1024 * 1024, // 1MB缓冲区
drainInterval: 100 // 100ms检查一次缓冲区
}
};
2. 消息路由优化
// 路由优化策略
class OptimizedMessageRouter {
private routeCache: LRUCache<string, RouteInfo>;
private batchQueue: MessageBatch[];
private batchTimer: NodeJS.Timeout;
constructor() {
// 路由缓存,避免重复计算
this.routeCache = new LRUCache({
max: 50000,
ttl: 1000 * 60 * 5 // 5分钟缓存
});
// 批处理队列
this.batchQueue = [];
this.batchTimer = setInterval(() => this.flushBatch(), 10); // 10ms批处理
}
/**
* 优化的消息路由
*/
async routeOptimized(message: OpenClawMessage): Promise<void> {
const cacheKey = `${message.channel}:${message.userId}`;
// 尝试从缓存获取路由信息
let routeInfo = this.routeCache.get(cacheKey);
if (!routeInfo) {
// 缓存未命中,计算并缓存
routeInfo = await this.computeRoute(message);
this.routeCache.set(cacheKey, routeInfo);
}
// 加入批处理队列
this.batchQueue.push({ message, routeInfo });
}
/**
* 批量处理消息
*/
private async flushBatch(): Promise<void> {
if (this.batchQueue.length === 0) return;
const batch = this.batchQueue.splice(0, 100); // 每次最多处理100条
// 按Agent分组
const groups = this.groupByAgent(batch);
// 并行发送到各Agent
await Promise.all(
Array.from(groups.entries()).map(([agentId, messages]) =>
this.sendBatchToAgent(agentId, messages)
)
);
}
private groupByAgent(batch: MessageBatch[]): Map<string, MessageBatch[]> {
const groups = new Map<string, MessageBatch[]>();
for (const item of batch) {
const agentId = item.routeInfo.agentId;
if (!groups.has(agentId)) {
groups.set(agentId, []);
}
groups.get(agentId)!.push(item);
}
return groups;
}
private async sendBatchToAgent(agentId: string, messages: MessageBatch[]): Promise<void> {
// 批量发送到Agent实例
}
private async computeRoute(message: OpenClawMessage): Promise<RouteInfo> {
// 计算路由信息
return { agentId: '', sessionId: '' };
}
}
interface RouteInfo {
agentId: string;
sessionId: string;
}
interface MessageBatch {
message: OpenClawMessage;
routeInfo: RouteInfo;
}
3. 会话存储优化
// 会话存储优化策略
class OptimizedSessionStorage {
private redis: Redis;
private localCache: Map<string, Session>;
private writeBuffer: Map<string, Session>;
private flushTimer: NodeJS.Timeout;
constructor() {
this.localCache = new Map();
this.writeBuffer = new Map();
// 延迟写入策略,减少Redis访问
this.flushTimer = setInterval(() => this.flushToRedis(), 5000);
}
/**
* 读取会话(优先本地缓存)
*/
async getSession(sessionId: string): Promise<Session | null> {
// 先查本地缓存
const cached = this.localCache.get(sessionId);
if (cached) return cached;
// 查询Redis
const data = await this.redis.get(`session:${sessionId}`);
if (data) {
const session = JSON.parse(data);
this.localCache.set(sessionId, session);
return session;
}
return null;
}
/**
* 更新会话(延迟写入)
*/
async updateSession(sessionId: string, updates: Partial<Session>): Promise<void> {
let session = this.localCache.get(sessionId);
if (!session) {
session = await this.getSession(sessionId);
}
if (session) {
Object.assign(session, updates, { lastActiveAt: Date.now() });
this.localCache.set(sessionId, session);
// 加入写入缓冲区,延迟刷入Redis
this.writeBuffer.set(sessionId, session);
}
}
/**
* 批量写入Redis
*/
private async flushToRedis(): Promise<void> {
if (this.writeBuffer.size === 0) return;
const pipeline = this.redis.pipeline();
for (const [id, session] of this.writeBuffer) {
pipeline.setex(`session:${id}`, 1800, JSON.stringify(session));
}
await pipeline.exec();
this.writeBuffer.clear();
}
}
4. 连接池调优
// 连接池优化配置
const connectionPoolConfig = {
// 连接限制
maxConnections: 10000, // 最大连接数
maxConnectionsPerIp: 100, // 单IP最大连接数
// 资源管理
socketTimeout: 60000, // Socket超时
keepAliveTimeout: 65000, // Keep-alive超时
// 缓冲区配置
sendBufferSize: 64 * 1024, // 发送缓冲区64KB
recvBufferSize: 64 * 1024, // 接收缓冲区64KB
// 性能监控
metricsInterval: 10000, // 10秒采集一次指标
healthCheckInterval: 30000 // 30秒健康检查
};
5. 性能基准测试结果
在标准测试环境下的Gateway层性能数据:
| 测试场景 | 并发连接数 | 消息吞吐量 | 平均延迟 | P99延迟 | 内存占用 |
|---|---|---|---|---|---|
| 轻量消息 | 1,000 | 50,000/s | 2ms | 8ms | 200MB |
| 中等消息 | 5,000 | 30,000/s | 5ms | 15ms | 500MB |
| 重量消息 | 10,000 | 15,000/s | 10ms | 30ms | 1GB |
| 极限压测 | 50,000 | 8,000/s | 50ms | 200ms | 4GB |
第三章:Agent智能体层深度解析
Agent智能体层是OpenClaw的"大脑",负责理解用户意图、制定执行计划、调用工具完成任务。这一层的设计直接决定了智能体的智能程度和执行能力。
3.1 Lobster循环引擎详解
Lobster循环引擎是OpenClaw智能体层的核心组件,实现了一个完整的思考-行动-观察-反思(TAOR)循环。这个名字来源于龙虾的神经系统结构——龙虾的神经节分布式的处理模式,正如OpenClaw的智能体采用迭代式的推理过程。
3.1.1 Lobster循环的核心实现原理
┌─────────────────────────────────────────────────────────────────┐
│ Lobster 循环引擎架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ 输入消息 │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Think 思考阶段 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 理解意图 │->│ 分析上下文 │->│ 制定计划 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └────────────────────────────────┬──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Act 行动阶段 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 选择工具 │->│ 构造参数 │->│ 执行调用 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └────────────────────────────────┬──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Observe 观察阶段 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 收集结果 │->│ 解析输出 │->│ 更新状态 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └────────────────────────────────┬──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Reflect 反思阶段 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 评估结果 │->│ 调整策略 │->│ 决定下一步 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └────────────────────────────────┬──────────────────────┘ │
│ │ │
│ ┌────────────────┴────────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ 继续循环 │ │ 输出结果 │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
3.1.2 Lobster循环引擎核心实现
// src/agent/LobsterEngine.ts
import { LLMProvider } from './LLMProvider';
import { ToolsRegistry } from './ToolsRegistry';
import { AgentState, AgentContext } from './types';
interface LobsterLoopConfig {
maxIterations: number; // 最大迭代次数
thinkTimeout: number; // 思考阶段超时
actTimeout: number; // 行动阶段超时
reflectionEnabled: boolean; // 是否启用反思
}
type LoopPhase = 'think' | 'act' | 'observe' | 'reflect';
export class LobsterEngine {
private state: AgentState;
private context: AgentContext;
private iteration: number;
private phase: LoopPhase;
constructor(
private llmProvider: LLMProvider,
private toolsRegistry: ToolsRegistry,
private config: LobsterLoopConfig
) {
this.state = 'idle';
this.iteration = 0;
this.phase = 'think';
}
/**
* 执行Lobster循环
*/
async execute(input: string, context: AgentContext): Promise<string> {
this.context = context;
this.state = 'running';
this.iteration = 0;
const trace: LoopTrace[] = [];
try {
while (this.iteration < this.config.maxIterations) {
this.iteration++;
// Think阶段
const thinkResult = await this.think(input, trace);
trace.push({ phase: 'think', result: thinkResult });
// 判断是否需要行动
if (!thinkResult.needsAction) {
// 直接输出结果
this.state = 'completed';
return thinkResult.response || '';
}
// Act阶段
const actResult = await this.act(thinkResult);
trace.push({ phase: 'act', result: actResult });
// Observe阶段
const observeResult = await this.observe(actResult);
trace.push({ phase: 'observe', result: observeResult });
// Reflect阶段
if (this.config.reflectionEnabled) {
const reflectResult = await this.reflect(observeResult, trace);
trace.push({ phase: 'reflect', result: reflectResult });
if (reflectResult.shouldTerminate) {
this.state = 'completed';
return reflectResult.response || '';
}
}
}
// 达到最大迭代次数
this.state = 'max_iterations_reached';
return this.generateTimeoutResponse(trace);
} catch (error) {
this.state = 'error';
throw error;
}
}
/**
* Think阶段 - 思考与分析
*/
private async think(input: string, trace: LoopTrace[]): Promise<ThinkResult> {
this.phase = 'think';
// 构建提示词
const systemPrompt = this.buildThinkPrompt();
const conversationHistory = this.buildConversationHistory(trace);
// 调用LLM进行推理
const response = await this.llmProvider.chat({
messages: [
{ role: 'system', content: systemPrompt },
...conversationHistory,
{ role: 'user', content: input }
],
tools: this.toolsRegistry.getToolDefinitions(),
tool_choice: 'auto'
});
// 解析响应
return this.parseThinkResponse(response);
}
/**
* Act阶段 - 执行工具调用
*/
private async act(thinkResult: ThinkResult): Promise<ActResult> {
this.phase = 'act';
const results: ToolCallResult[] = [];
for (const toolCall of thinkResult.toolCalls || []) {
const tool = this.toolsRegistry.getTool(toolCall.name);
if (!tool) {
results.push({
toolCall,
success: false,
error: `Tool not found: ${toolCall.name}`
});
continue;
}
try {
// 参数验证
const validatedArgs = tool.validateArgs(toolCall.args);
// 执行工具
const result = await tool.execute(validatedArgs, this.context);
results.push({
toolCall,
success: true,
result
});
} catch (error) {
results.push({
toolCall,
success: false,
error: error instanceof Error ? error.message : String(error)
});
}
}
return { toolResults: results };
}
/**
* Observe阶段 - 观察结果
*/
private async observe(actResult: ActResult): Promise<ObserveResult> {
this.phase = 'observe';
// 收集所有工具调用结果
const observations: Observation[] = [];
for (const result of actResult.toolResults) {
observations.push({
toolName: result.toolCall.name,
success: result.success,
data: result.success ? result.result : result.error
});
}
// 更新上下文
this.context.observations = observations;
return { observations };
}
/**
* Reflect阶段 - 反思与决策
*/
private async reflect(observeResult: ObserveResult, trace: LoopTrace[]): Promise<ReflectResult> {
this.phase = 'reflect';
// 构建反思提示
const reflectPrompt = this.buildReflectPrompt(observeResult, trace);
const response = await this.llmProvider.chat({
messages: [
{ role: 'system', content: reflectPrompt },
{ role: 'user', content: '请分析当前执行结果,决定下一步行动。' }
]
});
return this.parseReflectResponse(response);
}
}
// 类型定义
interface LoopTrace {
phase: LoopPhase;
result: any;
}
interface ThinkResult {
needsAction: boolean;
response?: string;
toolCalls?: ToolCall[];
reasoning?: string;
}
interface ToolCall {
id: string;
name: string;
args: Record<string, unknown>;
}
interface ActResult {
toolResults: ToolCallResult[];
}
interface ToolCallResult {
toolCall: ToolCall;
success: boolean;
result?: any;
error?: string;
}
interface ObserveResult {
observations: Observation[];
}
interface Observation {
toolName: string;
success: boolean;
data: any;
}
interface ReflectResult {
shouldTerminate: boolean;
response?: string;
nextAction?: string;
}
3.1.3 Think阶段的推理机制
Think阶段是Lobster循环的起点,负责理解用户意图并制定执行计划。这个阶段的核心是将自然语言输入转化为结构化的执行计划。
// src/agent/ThinkEngine.ts
export class ThinkEngine {
private llmProvider: LLMProvider;
private toolsRegistry: ToolsRegistry;
/**
* 构建Think阶段的系统提示
*/
buildThinkPrompt(): string {
const tools = this.toolsRegistry.getToolDefinitions();
return `你是一个智能助手,正在思考阶段。你的任务是:
1. 理解用户的意图和需求
2. 分析当前上下文和历史信息
3. 决定是否需要调用工具
4. 如果需要调用工具,确定要调用的工具及参数
## 可用工具
${tools.map(t => `- ${t.name}: ${t.description}`).join('\n')}
## 输出格式
你必须以JSON格式输出思考结果:
{
"reasoning": "你的思考过程",
"intent": "用户意图描述",
"needsAction": true/false,
"toolCalls": [
{
"name": "工具名称",
"args": { "参数": "值" }
}
],
"response": "如果不需调用工具,直接回复用户"
}`;
}
/**
* 解析Think阶段响应
*/
parseThinkResponse(response: LLMResponse): ThinkResult {
const content = response.choices[0].message.content;
try {
// 尝试解析JSON
const parsed = JSON.parse(content);
return {
needsAction: parsed.needsAction || false,
response: parsed.response,
toolCalls: parsed.toolCalls?.map((tc: any) => ({
id: this.generateToolCallId(),
name: tc.name,
args: tc.args || {}
})),
reasoning: parsed.reasoning
};
} catch {
// 如果不是JSON,可能是纯文本回复
return {
needsAction: false,
response: content,
reasoning: 'LLM直接返回文本回复'
};
}
}
/**
* 意图识别增强
*/
async recognizeIntent(input: string): Promise<IntentRecognition> {
const prompt = `分析以下用户输入,识别用户意图:
输入: ${input}
请输出:
1. 主要意图类别
2. 意图置信度
3. 相关实体
4. 可能的操作`;
const response = await this.llmProvider.chat({
messages: [{ role: 'user', content: prompt }]
});
return this.parseIntentResponse(response);
}
private generateToolCallId(): string {
return `call_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
}
interface IntentRecognition {
category: string;
confidence: number;
entities: Entity[];
possibleActions: string[];
}
3.1.4 Act阶段的工具调用
Act阶段负责执行Think阶段确定的工具调用。这是将决策转化为实际行动的关键环节。
// src/agent/ActEngine.ts
export class ActEngine {
private toolsRegistry: ToolsRegistry;
private executionTimeout: number = 30000; // 30秒超时
/**
* 执行工具调用
*/
async executeToolCalls(
toolCalls: ToolCall[],
context: AgentContext
): Promise<ToolCallResult[]> {
const results: ToolCallResult[] = [];
// 支持并行执行多个工具
const promises = toolCalls.map(tc => this.executeSingleTool(tc, context));
const settled = await Promise.allSettled(promises);
for (let i = 0; i < settled.length; i++) {
const result = settled[i];
const toolCall = toolCalls[i];
if (result.status === 'fulfilled') {
results.push({
toolCall,
success: true,
result: result.value
});
} else {
results.push({
toolCall,
success: false,
error: result.reason?.message || 'Unknown error'
});
}
}
return results;
}
/**
* 执行单个工具
*/
private async executeSingleTool(
toolCall: ToolCall,
context: AgentContext
): Promise<any> {
const tool = this.toolsRegistry.getTool(toolCall.name);
if (!tool) {
throw new Error(`Tool not found: ${toolCall.name}`);
}
// 权限检查
if (!this.checkPermission(tool, context)) {
throw new Error(`No permission to execute tool: ${toolCall.name}`);
}
// 参数验证
const validatedArgs = this.validateArgs(tool, toolCall.args);
// 执行工具(带超时)
return Promise.race([
tool.execute(validatedArgs, context),
this.createTimeout(toolCall.name)
]);
}
/**
* 参数验证
*/
private validateArgs(tool: Tool, args: Record<string, unknown>): any {
const schema = tool.getParameterSchema();
// 使用Zod进行运行时验证
const result = schema.safeParse(args);
if (!result.success) {
throw new Error(
`Invalid arguments: ${result.error.errors.map(e => e.message).join(', ')}`
);
}
return result.data;
}
/**
* 权限检查
*/
private checkPermission(tool: Tool, context: AgentContext): boolean {
const requiredPermissions = tool.getRequiredPermissions();
const userPermissions = context.userPermissions || [];
return requiredPermissions.every(p => userPermissions.includes(p));
}
private createTimeout(toolName: string): Promise<never> {
return new Promise((_, reject) => {
setTimeout(() => {
reject(new Error(`Tool execution timeout: ${toolName}`));
}, this.executionTimeout);
});
}
}
3.1.5 Observe阶段的观察反馈
Observe阶段负责收集和处理工具执行结果,为下一轮循环提供输入。
// src/agent/ObserveEngine.ts
export class ObserveEngine {
/**
* 处理观察结果
*/
async processObservations(
results: ToolCallResult[],
context: AgentContext
): Promise<ObservationSummary> {
const observations: Observation[] = [];
for (const result of results) {
const observation = await this.processSingleResult(result);
observations.push(observation);
}
// 更新上下文
context.lastObservations = observations;
context.totalObservations = (context.totalObservations || 0) + observations.length;
// 生成摘要
return this.generateSummary(observations);
}
/**
* 处理单个结果
*/
private async processSingleResult(result: ToolCallResult): Promise<Observation> {
const observation: Observation = {
toolName: result.toolCall.name,
success: result.success,
timestamp: Date.now()
};
if (result.success) {
// 处理成功结果
observation.data = this.sanitizeResult(result.result);
observation.dataType = this.detectDataType(result.result);
} else {
// 处理错误
observation.error = result.error;
observation.dataType = 'error';
}
return observation;
}
/**
* 清理结果数据
*/
private sanitizeResult(result: any): any {
// 移除敏感信息
if (typeof result === 'string') {
return this.removeSensitiveInfo(result);
}
if (typeof result === 'object' && result !== null) {
const sanitized: any = Array.isArray(result) ? [] : {};
for (const key of Object.keys(result)) {
if (this.isSensitiveKey(key)) {
sanitized[key] = '[REDACTED]';
} else {
sanitized[key] = this.sanitizeResult(result[key]);
}
}
return sanitized;
}
return result;
}
/**
* 检测数据类型
*/
private detectDataType(data: any): string {
if (data === null || data === undefined) return 'null';
if (typeof data === 'string') {
if (data.startsWith('http')) return 'url';
if (data.includes('\n')) return 'multiline_text';
return 'text';
}
if (typeof data === 'number') return 'number';
if (typeof data === 'boolean') return 'boolean';
if (Array.isArray(data)) return 'array';
if (typeof data === 'object') return 'object';
return 'unknown';
}
/**
* 生成观察摘要
*/
private generateSummary(observations: Observation[]): ObservationSummary {
const successCount = observations.filter(o => o.success).length;
const failCount = observations.length - successCount;
return {
total: observations.length,
successCount,
failCount,
observations,
summary: this.createNaturalLanguageSummary(observations)
};
}
private createNaturalLanguageSummary(observations: Observation[]): string {
const parts: string[] = [];
for (const obs of observations) {
if (obs.success) {
parts.push(`工具 ${obs.toolName} 执行成功`);
} else {
parts.push(`工具 ${obs.toolName} 执行失败: ${obs.error}`);
}
}
return parts.join('; ');
}
private removeSensitiveInfo(text: string): string {
// 移除API密钥、密码等敏感信息
return text
.replace(/sk-[a-zA-Z0-9]{20,}/g, '[API_KEY]')
.replace(/password["']?\s*[:=]\s*["'][^"']+["']/gi, 'password=[REDACTED]');
}
private isSensitiveKey(key: string): boolean {
const sensitiveKeys = ['password', 'secret', 'token', 'apiKey', 'privateKey'];
return sensitiveKeys.some(sk => key.toLowerCase().includes(sk));
}
}
interface ObservationSummary {
total: number;
successCount: number;
failCount: number;
observations: Observation[];
summary: string;
}
更多推荐


所有评论(0)