mediasoup源码走读(十三)——Transport
摘要: Mediasoup的Transport模块作为核心网络传输通道,实现了WebRTC/PlainRTP/Pipe等协议的抽象统一,提供ICE/DTLS协商、带宽自适应等能力。其工作流程包含创建、连接、媒体传输三个阶段,通过Worker进程管理Transport实例,与Router协同实现媒体路由。关键交互包括:Transport与Worker的IPC通信、Transport在Router的注
·
13.1、Transport 核心定位与模块交互
Transport 是 Mediasoup 的"网络传输通道",其核心价值体现在:
- 网络抽象:屏蔽底层协议差异(WebRTC/PlainRTP/Pipe)
- 双向通道:支持客户端与 Router 之间的媒体流传输
- ICE/DTLS协商:建立安全加密传输路径
- 带宽自适应:动态调整传输策略适应网络变化
模块交互全景图:
13.2、端到端工作流程
13.3、关键模块交互代码
13.3.1 Transport 与 Worker 的进程通信
Worker 创建 Transport:
// mediasoup/src/Worker.cpp
Transport* Worker::CreateTransport(
const std::string& type,
const json& data
) {
// 1. 生成唯一ID
std::string id = GenerateId("transport");
// 2. 根据类型创建Transport实例
Transport* transport;
if (type == "webrtc") {
transport = new WebRtcTransport(this, id, data);
} else if (type == "plain-rtp") {
transport = new PlainRtpTransport(this, id, data);
} else if (type == "pipe") {
transport = new PipeTransport(this, id, data);
} else {
throw std::invalid_argument("Unknown transport type");
}
// 3. 注册到Worker管理
transports_.insert({id, transport});
return transport;
}
Transport 与 Worker 的 IPC 通信:
// mediasoup/src/Transport.cpp
void Transport::SendToWorker(const std::string& method, const json& data) {
worker_->SendRequest(method, data);
}
13.3.2 Transport 与 Router 的注册管理
Transport 注册到 Router:
// mediasoup/src/Transport.cpp
void Transport::RegisterWithRouter(Router* router) {
router->AddTransport(this);
}
// mediasoup/src/Router.cpp
void Router::AddTransport(Transport* transport) {
transports_.insert({transport->id, transport});
}
Transport 关闭时通知 Router:
// mediasoup/src/Transport.cpp
void Transport::Close() {
if (isClosed) return;
isClosed = true;
Emit("close");
// 通知Router
if (router) {
router->OnTransportClosed(this);
}
}
Router 处理 Transport 关闭:
// mediasoup/src/Router.cpp
void Router::OnTransportClosed(Transport* transport) {
transports_.erase(transport->id);
SendToWorker("transport-closed", {{"id", transport->id}});
}
13.3.3 Transport 与 Producer 的媒体传输
Producer 发送 RTP 包:
// mediasoup/src/Producer.cpp
void Producer::SendRtpPacket(RtpPacket* packet) {
if (isClosed || isPaused) return;
// 1. 调用Transport发送
transport->SendRtpPacket(packet);
// 2. 通知RtpObserver
if (rtpObserver) {
rtpObserver->OnRtpPacket(packet);
}
}
Transport 发送 RTP 包:
// mediasoup/src/Transport.cpp
void Transport::SendRtpPacket(RtpPacket* packet) {
// 1. 更新统计
stats.totalPackets++;
stats.totalBytes += packet->size();
// 2. 发送到网络
SendToNetwork(packet);
// 3. 通知Producer传输状态
if (producer) {
producer->OnTransportData(packet);
}
}
13.3.4 Transport 与 Consumer 的媒体接收
Transport 接收 RTP 包:
// mediasoup/src/Transport.cpp
void Transport::ReceiveRtpPacket(RtpPacket* packet) {
// 1. 查找对应的Consumer
Consumer* consumer = GetConsumerByRtpPacket(packet);
if (!consumer) {
return; // 未找到Consumer
}
// 2. 转发给Consumer
consumer->OnRtpPacket(packet);
}
Consumer 处理 RTP 包:
// mediasoup/src/Consumer.cpp
void Consumer::OnRtpPacket(RtpPacket* packet) {
// 1. 检查是否已关闭
if (isClosed) return;
// 2. 更新统计
stats.totalPackets++;
stats.totalBytes += packet->size();
// 3. 通知应用层
Emit("rtp", packet);
// 4. 处理RTP包(转换为可播放格式)
ProcessRtpPacket(packet);
}
13.4、Transport 的完整生命周期管理
13.4.1 生命流程图
13.4.2 关键生命周期方法
创建:
// mediasoup/src/WebRtcTransport.cpp
WebRtcTransport::WebRtcTransport(
Worker* worker,
const std::string& id,
const json& data
) :
Transport(worker, id),
iceParameters(data["iceParameters"]),
dtlsParameters(data["dtlsParameters"]),
isClosed(false),
isPaused(false) {
// 1. 初始化ICE/DTLS
InitializeIceDtls();
// 2. 注册到Worker
worker->CreateTransport(this);
}
暂停/恢复:
// mediasoup/src/Transport.cpp
void Transport::Pause() {
if (isPaused) return;
isPaused = true;
Emit("pause");
}
void Transport::Resume() {
if (!isPaused) return;
isPaused = false;
Emit("resume");
}
关闭:
// mediasoup/src/Transport.cpp
void Transport::Close() {
if (isClosed) return;
isClosed = true;
Emit("close");
// 1. 通知Router
if (router) {
router->OnTransportClosed(this);
}
// 2. 通知Worker
worker_->OnTransportClosed(this);
// 3. 清理ICE/DTLS
if (ice) {
ice->Close();
ice = nullptr;
}
if (dtls) {
dtls->Close();
dtls = nullptr;
}
}
13.5、Transport 类型
13.5.1 WebRtcTransport
// mediasoup/src/WebRtcTransport.cpp
void WebRtcTransport::InitializeIceDtls() {
// 1. 创建ICE服务器
ice = new IceServer(iceParameters);
// 2. 创建DTLS连接
dtls = new DtlsConnection(dtlsParameters);
// 3. 绑定ICE和DTLS事件
ice->OnCandidateGathered([this](const IceCandidate& candidate) {
Emit("icecandidate", candidate);
});
dtls->OnConnected([this]() {
Emit("connect");
});
}
13.5.2 PlainRtpTransport
// mediasoup/src/PlainRtpTransport.cpp
void PlainRtpTransport::InitializeUdpSocket() {
// 1. 创建UDP socket
udpSocket = new UdpSocket(localPort);
// 2. 绑定端口
if (!udpSocket->Bind(localPort)) {
throw std::runtime_error("Failed to bind UDP port");
}
}
13.5.3 PipeTransport
// mediasoup/src/PipeTransport.cpp
void PipeTransport::InitializePipe() {
// 1. 创建本地管道
pipe = new LocalPipe(router1, router2);
// 2. 设置管道参数
pipe->SetMaxBitrate(maxBitrate);
}
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)