突破协作瓶颈:Automa实时工作流编辑功能开发指南

【免费下载链接】automa A browser extension for automating your browser by connecting blocks 【免费下载链接】automa 项目地址: https://gitcode.com/gh_mirrors/au/automa

你是否还在为团队协作编辑工作流时的版本冲突而烦恼?是否经历过因无法实时同步修改导致的重复劳动?本文将带你从零开始实现基于WebRTC的Automa实时协作编辑功能,让多人协作编辑工作流如同面对面操作般流畅。

读完本文你将获得:

  • 掌握WebRTC在浏览器扩展中的集成方法
  • 实现工作流节点实时同步的核心技术
  • 解决多人编辑冲突的实用策略
  • 完整的功能开发步骤与代码示例

功能架构概览

Automa作为一款通过连接模块实现浏览器自动化的扩展,其工作流数据管理核心在src/stores/workflow.js中实现。该文件使用Pinia状态管理库,通过useWorkflowStore定义了工作流的CRUD操作、状态管理和持久化逻辑。

实时协作功能将基于以下架构实现:

mermaid

核心实现将涉及三个关键部分:

  1. WebRTC连接管理模块
  2. 操作变换(OT)算法实现
  3. 工作流编辑器状态同步

准备工作与依赖安装

首先需要安装WebRTC相关依赖,在项目根目录执行:

npm install simple-peer webrtc-adapter

创建WebRTC服务模块文件:

mkdir -p src/service/webrtc
touch src/service/webrtc/WebRTCSevice.js
touch src/service/webrtc/SignalingService.js

核心模块实现

1. WebRTC服务实现

创建src/service/webrtc/WebRTCSevice.js文件,实现P2P连接管理:

import Peer from 'simple-peer';
import adapter from 'webrtc-adapter';
import EventEmitter from 'events';

export default class WebRTCSevice extends EventEmitter {
  constructor() {
    super();
    this.peers = new Map();
    this.isInitiator = false;
    this.roomId = null;
  }

  // 初始化P2P连接
  init(roomId, isInitiator = false) {
    this.roomId = roomId;
    this.isInitiator = isInitiator;
    
    // 监听信令服务消息
    this.signaling = new SignalingService(roomId);
    this.signaling.on('signal', (data) => this.handleSignal(data));
    this.signaling.on('peer-disconnected', (peerId) => this.handlePeerDisconnect(peerId));
  }

  // 处理信令消息
  handleSignal({ peerId, signal }) {
    if (!this.peers.has(peerId)) {
      this.createPeer(peerId);
    }
    
    const peer = this.peers.get(peerId);
    peer.signal(signal);
  }

  // 创建新的P2P连接
  createPeer(peerId) {
    const peer = new Peer({
      initiator: this.isInitiator,
      trickle: false,
      config: {
        iceServers: [
          { urls: 'stun:stun.l.google.com:19302' },
          { urls: 'stun:stun1.l.google.com:19302' }
        ]
      }
    });

    // 监听数据通道事件
    peer.on('signal', (signal) => {
      this.signaling.sendSignal(peerId, signal);
    });

    peer.on('connect', () => {
      this.emit('peer-connected', peerId);
    });

    peer.on('data', (data) => {
      this.handleData(peerId, JSON.parse(data.toString()));
    });

    peer.on('error', (err) => {
      console.error('WebRTC error:', err);
      this.emit('error', err);
    });

    peer.on('close', () => {
      this.peers.delete(peerId);
      this.emit('peer-disconnected', peerId);
    });

    this.peers.set(peerId, peer);
    return peer;
  }

  // 处理接收到的数据
  handleData(peerId, data) {
    switch (data.type) {
      case 'workflow-operation':
        this.emit('workflow-operation', { peerId, operation: data.payload });
        break;
      case 'cursor-position':
        this.emit('cursor-update', { peerId, position: data.payload });
        break;
      default:
        console.warn('Unknown data type:', data.type);
    }
  }

  // 发送工作流操作
  sendWorkflowOperation(operation) {
    const data = JSON.stringify({
      type: 'workflow-operation',
      payload: operation,
      timestamp: Date.now()
    });

    this.broadcast(data);
  }

  // 广播消息到所有连接的对等端
  broadcast(data) {
    this.peers.forEach(peer => {
      if (peer.connected) {
        peer.send(data);
      }
    });
  }

  // 关闭所有连接
  destroy() {
    this.peers.forEach(peer => {
      peer.destroy();
    });
    this.peers.clear();
    this.signaling.disconnect();
  }
}

2. 信令服务实现

创建src/service/webrtc/SignalingService.js文件,处理连接协调:

import EventEmitter from 'events';

export default class SignalingService extends EventEmitter {
  constructor(roomId) {
    super();
    this.roomId = roomId;
    this.socket = null;
    this.peerId = this.generatePeerId();
  }

  // 生成唯一的对等端ID
  generatePeerId() {
    return Math.random().toString(36).substring(2, 10);
  }

  // 连接到信令服务器
  connect() {
    // 在实际实现中,这里应该连接到你的信令服务器
    // 此处使用localStorage模拟信令交换
    this.setupLocalStorageSignaling();
    this.emit('connected', this.peerId);
  }

  // 使用localStorage模拟信令交换(仅用于开发测试)
  setupLocalStorageSignaling() {
    const storageKey = `automa-signaling-${this.roomId}`;
    
    // 初始化房间
    const roomData = JSON.parse(localStorage.getItem(storageKey) || '{}');
    if (!roomData.peers) roomData.peers = {};
    roomData.peers[this.peerId] = { online: true };
    localStorage.setItem(storageKey, JSON.stringify(roomData));
    
    // 监听存储变化
    window.addEventListener('storage', (e) => {
      if (e.key !== storageKey) return;
      
      try {
        const newData = JSON.parse(e.newValue || '{}');
        const oldData = JSON.parse(e.oldValue || '{}');
        
        // 检测新加入的对等端
        Object.keys(newData.peers || {}).forEach(peerId => {
          if (!oldData.peers || !oldData.peers[peerId]) {
            if (peerId !== this.peerId) {
              this.sendOffer(peerId);
            }
          }
        });
        
        // 检测信号消息
        if (newData.signals && newData.signals[this.peerId]) {
          const signal = newData.signals[this.peerId];
          this.emit('signal', {
            peerId: signal.from,
            signal: signal.data
          });
          
          // 清除已处理的信号
          newData.signals[this.peerId] = null;
          localStorage.setItem(storageKey, JSON.stringify(newData));
        }
      } catch (err) {
        console.error('Signaling error:', err);
      }
    });
  }
  
  // 发送信号
  sendSignal(toPeerId, signalData) {
    const storageKey = `automa-signaling-${this.roomId}`;
    const roomData = JSON.parse(localStorage.getItem(storageKey) || '{}');
    
    if (!roomData.signals) roomData.signals = {};
    roomData.signals[toPeerId] = {
      from: this.peerId,
      data: signalData,
      timestamp: Date.now()
    };
    
    localStorage.setItem(storageKey, JSON.stringify(roomData));
  }
  
  // 断开连接
  disconnect() {
    const storageKey = `automa-signaling-${this.roomId}`;
    const roomData = JSON.parse(localStorage.getItem(storageKey) || '{}');
    
    if (roomData.peers && roomData.peers[this.peerId]) {
      roomData.peers[this.peerId].online = false;
      localStorage.setItem(storageKey, JSON.stringify(roomData));
    }
  }
}

3. 工作流状态同步

修改src/stores/workflow.js,添加协作编辑支持:

// 在文件顶部导入WebRTC服务
import WebRTCSevice from '@/service/webrtc/WebRTCSevice';
import { v4 as uuidv4 } from 'uuid';

// 在state中添加协作相关状态
state: () => ({
  // ... 现有状态
  collaboration: {
    active: false,
    roomId: null,
    userId: null,
    peers: [],
    webrtcService: null,
    operationQueue: [],
    isProcessing: false,
    version: 0
  }
}),

// 添加协作相关actions
actions: {
  // ... 现有方法
  
  // 初始化协作会话
  initCollaboration({ roomId, userId }) {
    if (this.collaboration.active) {
      this.endCollaboration();
    }
    
    this.collaboration.active = true;
    this.collaboration.roomId = roomId;
    this.collaboration.userId = userId;
    
    // 创建WebRTC服务实例
    const webrtcService = new WebRTCSevice();
    webrtcService.init(roomId);
    
    // 监听WebRTC事件
    webrtcService.on('peer-connected', (peerId) => {
      this.collaboration.peers.push(peerId);
      this.$patch({ collaboration: { peers: [...this.collaboration.peers] } });
      
      // 发送当前工作流状态给新连接的对等端
      this.broadcastWorkflowState();
    });
    
    webrtcService.on('peer-disconnected', (peerId) => {
      this.collaboration.peers = this.collaboration.peers.filter(id => id !== peerId);
      this.$patch({ collaboration: { peers: [...this.collaboration.peers] } });
    });
    
    webrtcService.on('workflow-operation', ({ peerId, operation }) => {
      this.processRemoteOperation(operation);
    });
    
    this.collaboration.webrtcService = webrtcService;
  },
  
  // 结束协作会话
  endCollaboration() {
    if (this.collaboration.webrtcService) {
      this.collaboration.webrtcService.destroy();
      this.collaboration.webrtcService = null;
    }
    
    this.$patch({
      collaboration: {
        active: false,
        roomId: null,
        peers: [],
        operationQueue: []
      }
    });
  },
  
  // 广播工作流状态
  broadcastWorkflowState() {
    if (!this.collaboration.active || !this.collaboration.webrtcService) return;
    
    const stateData = {
      workflows: this.workflows,
      version: this.collaboration.version,
      timestamp: Date.now()
    };
    
    this.collaboration.webrtcService.broadcast(JSON.stringify({
      type: 'workflow-state',
      payload: stateData
    }));
  },
  
  // 广播操作
  broadcastOperation(operation) {
    if (!this.collaboration.active || !this.collaboration.webrtcService) return;
    
    // 添加操作元数据
    const enrichedOp = {
      ...operation,
      id: uuidv4(),
      userId: this.collaboration.userId,
      timestamp: Date.now(),
      baseVersion: this.collaboration.version
    };
    
    // 先本地应用操作
    this.applyOperation(enrichedOp);
    
    // 广播操作
    this.collaboration.webrtcService.broadcast(JSON.stringify({
      type: 'workflow-operation',
      payload: enrichedOp
    }));
  },
  
  // 处理远程操作
  processRemoteOperation(operation) {
    if (this.collaboration.version > operation.baseVersion) {
      // 版本冲突,需要合并操作
      this.collaboration.operationQueue.push(operation);
      this.processOperationQueue();
    } else {
      // 直接应用操作
      this.applyOperation(operation);
    }
  },
  
  // 应用操作
  applyOperation(operation) {
    switch (operation.type) {
      case 'add-node':
        this.addCollaborativeNode(operation.data);
        break;
      case 'update-node':
        this.updateCollaborativeNode(operation.data.id, operation.data.changes);
        break;
      case 'delete-node':
        this.deleteCollaborativeNode(operation.data.id);
        break;
      case 'add-edge':
        this.addCollaborativeEdge(operation.data);
        break;
      case 'delete-edge':
        this.deleteCollaborativeEdge(operation.data.id);
        break;
    }
    
    // 更新版本号
    this.collaboration.version++;
  },
  
  // 处理操作队列,解决冲突
  processOperationQueue() {
    if (this.collaboration.isProcessing) return;
    
    this.collaboration.isProcessing = true;
    
    // 按时间戳排序操作
    this.collaboration.operationQueue.sort((a, b) => a.timestamp - b.timestamp);
    
    // 逐个处理操作
    while (this.collaboration.operationQueue.length > 0) {
      const op = this.collaboration.operationQueue[0];
      
      if (op.baseVersion <= this.collaboration.version) {
        // 可以安全应用
        this.applyOperation(op);
        this.collaboration.operationQueue.shift();
      } else {
        // 仍有冲突,等待更多操作
        break;
      }
    }
    
    this.collaboration.isProcessing = false;
  },
  
  // 协作添加节点
  addCollaborativeNode(nodeData) {
    const workflowId = nodeData.workflowId;
    if (!this.workflows[workflowId]) return;
    
    // 确保节点ID唯一
    const uniqueNodeId = `collab-${uuidv4()}`;
    const node = { ...nodeData, id: uniqueNodeId };
    
    // 更新工作流
    if (!this.workflows[workflowId].drawflow.nodes) {
      this.workflows[workflowId].drawflow.nodes = [];
    }
    
    this.workflows[workflowId].drawflow.nodes.push(node);
    this.saveToStorage('workflows');
  },
  
  // 其他协作方法...
}

4. 编辑器集成

修改工作流编辑器组件src/components/newtab/workflow/WorkflowEditor.vue,添加协作编辑支持:

<template>
  <!-- 现有编辑器代码 -->
  
  <!-- 添加协作状态显示 -->
  <div v-if="collaborationActive" class="absolute top-4 right-4 z-10 flex items-center gap-2">
    <div class="flex -space-x-2">
      <div v-for="peerId in collaborationPeers" :key="peerId" class="w-6 h-6 rounded-full bg-blue-500 border-2 border-white"></div>
      <div class="w-6 h-6 rounded-full bg-green-500 border-2 border-white flex items-center justify-center text-white text-xs">
        You
      </div>
    </div>
    <span class="text-sm bg-black/50 text-white px-2 py-1 rounded">{{ collaborationPeers.length + 1 }} online</span>
  </div>
</template>

<script setup>
// 导入协作相关状态
import { useWorkflowStore } from '@/stores/workflow';

// 获取协作状态
const workflowStore = useWorkflowStore();
const collaborationActive = computed(() => workflowStore.collaboration.active);
const collaborationPeers = computed(() => workflowStore.collaboration.peers);

// 修改操作方法,添加协作支持
function updateBlockData(nodeId, data = {}) {
  if (isDisabled.value) return;
  
  const node = editor.findNode(nodeId);
  node.data = { ...node.data, ...data };
  
  emit('update:node', node);
  
  // 如果处于协作模式,广播操作
  if (collaborationActive.value) {
    workflowStore.broadcastOperation({
      type: 'update-node',
      data: {
        id: nodeId,
        workflowId: props.workflowId,
        changes: data
      }
    });
  }
}

// 同样修改deleteBlock, onConnect等方法...

// 添加协作控制方法
function startCollaborationSession() {
  const roomId = prompt('Enter collaboration room ID:');
  if (!roomId) return;
  
  const userId = `user-${Math.random().toString(36).substring(2, 8)}`;
  workflowStore.initCollaboration({ roomId, userId });
}

function endCollaborationSession() {
  workflowStore.endCollaboration();
}
</script>

<style>
/* 添加协作相关样式 */
.collaboration-cursor {
  position: absolute;
  pointer-events: none;
  transition: transform 0.1s ease;
  z-index: 1000;
}

.collaboration-cursor::after {
  content: attr(data-user);
  position: absolute;
  top: -20px;
  left: 0;
  background: rgba(0, 0, 0, 0.7);
  color: white;
  padding: 2px 6px;
  border-radius: 3px;
  font-size: 12px;
  white-space: nowrap;
}
</style>

协作编辑流程

工作流协作编辑的完整流程如下:

  1. 创建/加入房间:用户创建或加入协作房间,获取唯一roomId
  2. 建立P2P连接:通过信令服务发现其他对等端,建立WebRTC连接
  3. 初始状态同步:新加入者获取当前最新的工作流状态
  4. 实时操作同步:所有编辑操作实时广播给其他对等端
  5. 冲突解决:通过操作变换算法处理并发编辑冲突
  6. 状态维护:保持版本号同步,确保协作一致性

工作流协作流程

测试与调试

创建协作测试页面src/newtab/pages/CollaborationTest.vue:

<template>
  <div class="p-6">
    <h1 class="text-2xl font-bold mb-6">Automa Collaboration Test</h1>
    
    <div class="mb-6">
      <input v-model="roomId" placeholder="Room ID" class="p-2 border rounded mr-2">
      <button @click="startCollaboration" class="p-2 bg-blue-500 text-white rounded">
        Start Collaboration
      </button>
      <button @click="endCollaboration" class="p-2 bg-red-500 text-white rounded ml-2">
        End Collaboration
      </button>
    </div>
    
    <div v-if="collaborationActive" class="mb-6 p-4 bg-green-50 border border-green-200 rounded">
      <p>Collaboration active in room: {{ roomId }}</p>
      <p>Connected peers: {{ peerCount }}</p>
    </div>
    
    <workflow-editor 
      v-if="collaborationActive"
      :workflow-id="testWorkflowId"
    />
  </div>
</template>

<script setup>
import { ref, computed } from 'vue';
import { useWorkflowStore } from '@/stores/workflow';

const roomId = ref(`test-${Math.random().toString(36).substring(2, 8)}`);
const workflowStore = useWorkflowStore();
const collaborationActive = computed(() => workflowStore.collaboration.active);
const peerCount = computed(() => workflowStore.collaboration.peers.length);
const testWorkflowId = ref(null);

// 创建测试工作流
async function createTestWorkflow() {
  const workflows = await workflowStore.insert({
    name: 'Collaboration Test Workflow',
    description: 'Test workflow for collaboration feature',
    drawflow: {
      nodes: [],
      edges: []
    }
  });
  
  testWorkflowId.value = Object.keys(workflows)[0];
}

function startCollaboration() {
  if (!roomId.value) return;
  
  createTestWorkflow().then(() => {
    const userId = `user-${Math.random().toString(36).substring(2, 8)}`;
    workflowStore.initCollaboration({ roomId: roomId.value, userId });
  });
}

function endCollaboration() {
  workflowStore.endCollaboration();
}
</script>

部署与使用

协作功能实现后,需要部署信令服务器(生产环境不应使用localStorage模拟)。可以使用简单的Node.js信令服务器:

# 创建信令服务器目录
mkdir -p server/signaling
cd server/signaling
npm init -y
npm install ws

# 创建server.js
cat > server.js << 'EOF'
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

const rooms = new Map();

wss.on('connection', (ws) => {
  let currentRoom = null;
  let peerId = null;

  ws.on('message', (data) => {
    try {
      const message = JSON.parse(data.toString());
      
      switch (message.type) {
        case 'join':
          currentRoom = message.roomId;
          peerId = message.peerId;
          
          if (!rooms.has(currentRoom)) {
            rooms.set(currentRoom, new Map());
          }
          
          const room = rooms.get(currentRoom);
          room.set(peerId, ws);
          
          // 通知房间内其他对等端
          room.forEach((client, id) => {
            if (id !== peerId && client.readyState === WebSocket.OPEN) {
              client.send(JSON.stringify({
                type: 'peer-joined',
                peerId
              }));
            }
          });
          
          break;
          
        case 'signal':
          if (!currentRoom || !peerId) return;
          
          const room = rooms.get(currentRoom);
          if (!room) return;
          
          const targetWs = room.get(message.targetPeerId);
          if (targetWs && targetWs.readyState === WebSocket.OPEN) {
            targetWs.send(JSON.stringify({
              type: 'signal',
              from: peerId,
              data: message.signal
            }));
          }
          
          break;
      }
    } catch (err) {
      console.error('Signaling server error:', err);
    }
  });

  ws.on('close', () => {
    if (currentRoom && peerId && rooms.has(currentRoom)) {
      const room = rooms.get(currentRoom);
      room.delete(peerId);
      
      // 通知其他对等端
      room.forEach((client, id) => {
        if (client.readyState === WebSocket.OPEN) {
          client.send(JSON.stringify({
            type: 'peer-left',
            peerId
          }));
        }
      });
      
      // 清理空房间
      if (room.size === 0) {
        rooms.delete(currentRoom);
      }
    }
  });
});

console.log('Signaling server running on ws://localhost:8080');
EOF

# 启动服务器
node server.js

总结与展望

通过本文实现的WebRTC集成方案,Automa现在支持多用户实时协作编辑工作流,主要优势包括:

  • 低延迟:P2P直连减少延迟,提供流畅协作体验
  • 去中心化:无需中心服务器存储工作流状态
  • 冲突解决:通过操作变换算法处理并发编辑冲突
  • 易于集成:模块化设计,与现有工作流系统无缝集成

未来可以进一步优化的方向:

  1. 添加用户在线状态和光标位置显示
  2. 实现操作历史记录和撤销/重做功能
  3. 添加权限控制,支持只读/编辑权限设置
  4. 优化冲突解决算法,处理更复杂的编辑场景

要开始使用协作功能,只需克隆仓库并按照README中的指南进行安装:

git clone https://gitcode.com/gh_mirrors/au/automa
cd automa
npm install
npm run dev

现在,你已经掌握了在Automa中实现实时协作编辑功能的完整方案。这个功能将极大提升团队协作效率,让工作流开发过程更加顺畅和高效。

希望本文对你有所帮助,如有任何问题或建议,请在项目仓库提交issue或PR。

【免费下载链接】automa A browser extension for automating your browser by connecting blocks 【免费下载链接】automa 项目地址: https://gitcode.com/gh_mirrors/au/automa

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐