13.1、Transport 核心定位与模块交互

Transport 是 Mediasoup 的"网络传输通道",其核心价值体现在:

  • 网络抽象:屏蔽底层协议差异(WebRTC/PlainRTP/Pipe)
  • 双向通道:支持客户端与 Router 之间的媒体流传输
  • ICE/DTLS协商:建立安全加密传输路径
  • 带宽自适应:动态调整传输策略适应网络变化

模块交互全景图

创建/管理
注册/管理
发送/接收
媒体流转发
媒体流接收
进程通信
路由管理
网络协议
媒体发布
媒体订阅
Transport
Worker
Router
RTP/RTCP
Producer
Consumer
Network

13.2、端到端工作流程

应用层 Mediasoup Router Transport Producer Consumer Worker进程 Network createTransport({type: 'webrtc'}) 创建Transport实例 注册到Worker管理 注册到Router管理 connect({iceParameters, ...}) 建立ICE连接 ICE连接成功 触发'connect'事件 produce() 创建Producer 注册回调 SendRtpPacket(packet) 发送RTP数据包 接收RTP数据包 OnRtpPacket(packet) 触发track事件 close() 通知关闭 通知关闭 应用层 Mediasoup Router Transport Producer Consumer Worker进程 Network

13.3、关键模块交互代码

13.3.1 Transport 与 Worker 的进程通信

Worker 创建 Transport

// mediasoup/src/Worker.cpp
Transport* Worker::CreateTransport(
  const std::string& type,
  const json& data
) {
  // 1. 生成唯一ID
  std::string id = GenerateId("transport");
  
  // 2. 根据类型创建Transport实例
  Transport* transport;
  if (type == "webrtc") {
    transport = new WebRtcTransport(this, id, data);
  } else if (type == "plain-rtp") {
    transport = new PlainRtpTransport(this, id, data);
  } else if (type == "pipe") {
    transport = new PipeTransport(this, id, data);
  } else {
    throw std::invalid_argument("Unknown transport type");
  }
  
  // 3. 注册到Worker管理
  transports_.insert({id, transport});
  
  return transport;
}

Transport 与 Worker 的 IPC 通信

// mediasoup/src/Transport.cpp
void Transport::SendToWorker(const std::string& method, const json& data) {
  worker_->SendRequest(method, data);
}
13.3.2 Transport 与 Router 的注册管理

Transport 注册到 Router

// mediasoup/src/Transport.cpp
void Transport::RegisterWithRouter(Router* router) {
  router->AddTransport(this);
}

// mediasoup/src/Router.cpp
void Router::AddTransport(Transport* transport) {
  transports_.insert({transport->id, transport});
}

Transport 关闭时通知 Router

// mediasoup/src/Transport.cpp
void Transport::Close() {
  if (isClosed) return;
  
  isClosed = true;
  Emit("close");
  
  // 通知Router
  if (router) {
    router->OnTransportClosed(this);
  }
}

Router 处理 Transport 关闭

// mediasoup/src/Router.cpp
void Router::OnTransportClosed(Transport* transport) {
  transports_.erase(transport->id);
  SendToWorker("transport-closed", {{"id", transport->id}});
}
13.3.3 Transport 与 Producer 的媒体传输

Producer 发送 RTP 包

// mediasoup/src/Producer.cpp
void Producer::SendRtpPacket(RtpPacket* packet) {
  if (isClosed || isPaused) return;
  
  // 1. 调用Transport发送
  transport->SendRtpPacket(packet);
  
  // 2. 通知RtpObserver
  if (rtpObserver) {
    rtpObserver->OnRtpPacket(packet);
  }
}

Transport 发送 RTP 包

// mediasoup/src/Transport.cpp
void Transport::SendRtpPacket(RtpPacket* packet) {
  // 1. 更新统计
  stats.totalPackets++;
  stats.totalBytes += packet->size();
  
  // 2. 发送到网络
  SendToNetwork(packet);
  
  // 3. 通知Producer传输状态
  if (producer) {
    producer->OnTransportData(packet);
  }
}
13.3.4 Transport 与 Consumer 的媒体接收

Transport 接收 RTP 包

// mediasoup/src/Transport.cpp
void Transport::ReceiveRtpPacket(RtpPacket* packet) {
  // 1. 查找对应的Consumer
  Consumer* consumer = GetConsumerByRtpPacket(packet);
  
  if (!consumer) {
    return; // 未找到Consumer
  }
  
  // 2. 转发给Consumer
  consumer->OnRtpPacket(packet);
}

Consumer 处理 RTP 包

// mediasoup/src/Consumer.cpp
void Consumer::OnRtpPacket(RtpPacket* packet) {
  // 1. 检查是否已关闭
  if (isClosed) return;
  
  // 2. 更新统计
  stats.totalPackets++;
  stats.totalBytes += packet->size();
  
  // 3. 通知应用层
  Emit("rtp", packet);
  
  // 4. 处理RTP包(转换为可播放格式)
  ProcessRtpPacket(packet);
}

13.4、Transport 的完整生命周期管理

13.4.1 生命流程图
Creating
Active
Closed
Paused
13.4.2 关键生命周期方法

创建

// mediasoup/src/WebRtcTransport.cpp
WebRtcTransport::WebRtcTransport(
  Worker* worker,
  const std::string& id,
  const json& data
) : 
  Transport(worker, id),
  iceParameters(data["iceParameters"]),
  dtlsParameters(data["dtlsParameters"]),
  isClosed(false),
  isPaused(false) {
  
  // 1. 初始化ICE/DTLS
  InitializeIceDtls();
  
  // 2. 注册到Worker
  worker->CreateTransport(this);
}

暂停/恢复

// mediasoup/src/Transport.cpp
void Transport::Pause() {
  if (isPaused) return;
  isPaused = true;
  Emit("pause");
}

void Transport::Resume() {
  if (!isPaused) return;
  isPaused = false;
  Emit("resume");
}

关闭

// mediasoup/src/Transport.cpp
void Transport::Close() {
  if (isClosed) return;
  
  isClosed = true;
  Emit("close");
  
  // 1. 通知Router
  if (router) {
    router->OnTransportClosed(this);
  }
  
  // 2. 通知Worker
  worker_->OnTransportClosed(this);
  
  // 3. 清理ICE/DTLS
  if (ice) {
    ice->Close();
    ice = nullptr;
  }
  
  if (dtls) {
    dtls->Close();
    dtls = nullptr;
  }
}

13.5、Transport 类型

13.5.1 WebRtcTransport
// mediasoup/src/WebRtcTransport.cpp
void WebRtcTransport::InitializeIceDtls() {
  // 1. 创建ICE服务器
  ice = new IceServer(iceParameters);
  
  // 2. 创建DTLS连接
  dtls = new DtlsConnection(dtlsParameters);
  
  // 3. 绑定ICE和DTLS事件
  ice->OnCandidateGathered([this](const IceCandidate& candidate) {
    Emit("icecandidate", candidate);
  });
  
  dtls->OnConnected([this]() {
    Emit("connect");
  });
}
13.5.2 PlainRtpTransport
// mediasoup/src/PlainRtpTransport.cpp
void PlainRtpTransport::InitializeUdpSocket() {
  // 1. 创建UDP socket
  udpSocket = new UdpSocket(localPort);
  
  // 2. 绑定端口
  if (!udpSocket->Bind(localPort)) {
    throw std::runtime_error("Failed to bind UDP port");
  }
}
13.5.3 PipeTransport
// mediasoup/src/PipeTransport.cpp
void PipeTransport::InitializePipe() {
  // 1. 创建本地管道
  pipe = new LocalPipe(router1, router2);
  
  // 2. 设置管道参数
  pipe->SetMaxBitrate(maxBitrate);
}

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐