mediasoup源码走读(五)——RTP流处理
fill:#333;color:#333;color:#333;fill:none;important;important;important;important;发送RTP包存储RTP包遍历Consumer处理RTP包发送RTP包检测丢包转发请求RTX发送RTX包转发推流客户端RouterProducerConsumer观看客户端NACK请求完整生命周期推流客户端发送RTP包 → WebRtcT
·
🧩 5.1、整体架构图
说明:
- Worker进程:C++子进程,负责媒体处理(ICE/DTLS/RTP)
- Router:媒体流逻辑中枢,维护Producer-Consumer关系
- Producer:媒体源(推流端),存储并管理RTP流
- Consumer:媒体接收端(观看端),请求并消费RTP流
- WebRtcTransport:网络传输层,处理RTP/RTCP协议
📦 5.2、RTP包流转全流程
🌟 关键流程图(含函数名中文注释)
说明:
sendRtpPacket():推流客户端发送RTP包onTransportProducerRtpPacketReceived():Worker接收RTP包后触发addRtpPacket():Producer存储RTP包onRtpPacket():Consumer处理RTP包sendRtpPacket():Consumer转发RTP包给观看端
⏱️ 5.3、关键时序图
说明:
onTransportProducerRtpPacketReceived():Worker接收RTP包的核心入口addRtpPacket():Producer存储RTP包,用于RTX重传mapProducerConsumers[producer]:Router维护的Producer-Consumer映射onRtpPacket():Consumer处理RTP包的核心方法
📊 5.4、关键类图关联
说明:
mapProducerConsumers:Router维护的Producer-Consumer映射(一对多)mapConsumerProducer:Router维护的Consumer-Producer映射(一对一)RtpStream:RTP流管理单元,存储RTP包队列addPacket():RtpStream存储RTP包
🔧 5.5、关键代码片段
5.5.1. Router核心处理逻辑(C++层)
// 文件: src/Router.cpp
/**
* 处理接收到的RTP包(核心入口函数)
* @param producer 关联的Producer
* @param packet 接收到的RTP包
*/
void Router::onTransportProducerRtpPacketReceived(Producer* producer, RtpPacket* packet) {
// 1. 检查是否为RTX包(重传包)
if (packet->isRtx()) {
handleRtxPacket(producer, packet); // 处理RTX重传包
return;
}
// 2. 检查RTP包是否为新流(SSRC首次出现)
if (!producer->hasRtpStream(packet->ssrc())) {
// 2.1 创建新的RTP流
RtpStream* rtpStream = new RtpStream(packet->ssrc(), packet);
producer->addRtpStream(rtpStream);
// 2.2 通知所有关联的Consumer新流已建立
auto& consumers = mapProducerConsumers[producer];
for (auto* consumer : consumers) {
consumer->onNewRtpStream(rtpStream); // Consumer处理新流
}
}
// 3. 将RTP包添加到Producer的RTP流中
producer->addRtpPacket(packet);
// 4. 遍历所有关联的Consumer,转发RTP包
auto& consumers = mapProducerConsumers[producer];
for (auto* consumer : consumers) {
consumer->onRtpPacket(packet); // Consumer处理RTP包
}
}
/**
* 处理RTX重传包
* @param producer 关联的Producer
* @param rtxPacket RTX重传包
*/
void Router::handleRtxPacket(Producer* producer, RtpPacket* rtxPacket) {
// 1. 从RTX包中提取原始RTP包信息
RtpPacket* originalPacket = rtxPacket->getOriginalPacket();
// 2. 检查原始包是否已存在(是否已接收过)
if (!producer->hasRtpStream(originalPacket->ssrc())) {
// 2.1 如果原始包不存在,请求重传
producer->requestRtx(originalPacket->ssrc(), originalPacket->sequenceNumber());
return;
}
// 3. 将原始RTP包转发给所有关联的Consumer
auto& consumers = mapProducerConsumers[producer];
for (auto* consumer : consumers) {
consumer->onRtpPacket(originalPacket); // 转发原始包
}
}
/**
* 新Producer创建时的处理
* @param transport 关联的Transport
* @param producer 新创建的Producer
*/
void Router::onTransportNewProducer(Transport* transport, Producer* producer) {
// 1. 将Producer添加到Router的Producer列表
producerMap[producer->id()] = producer;
// 2. 初始化Producer的Consumer集合
mapProducerConsumers[producer] = std::unordered_set<Consumer*>();
// 3. 通知应用层(可选)
emit("producer", producer);
}
/**
* 新Consumer创建时的处理
* @param transport 关联的Transport
* @param consumer 新创建的Consumer
* @param producerId 要消费的Producer ID
*/
void Router::onTransportNewConsumer(Transport* transport, Consumer* consumer, const std::string& producerId) {
// 1. 查找目标Producer
auto producerIt = producerMap.find(producerId);
if (producerIt == producerMap.end()) {
throw std::runtime_error("Producer not found: " + producerId);
}
Producer* producer = producerIt->second;
// 2. 建立Consumer与Producer的关联
mapConsumerProducer[consumer] = producer;
mapProducerConsumers[producer].insert(consumer);
// 3. 通知应用层(可选)
emit("consumer", consumer);
// 4. 如果Producer已有数据,立即发送
if (producer->isRtpStreamActive()) {
producer->sendRtpStreamToConsumer(consumer);
}
}
5.5.2. Producer核心处理逻辑(C++层)
// 文件: src/Producer.cpp
/**
* 添加RTP包到Producer的RTP流
* @param packet 要添加的RTP包
*/
void Producer::addRtpPacket(RtpPacket* packet) {
// 1. 获取或创建RTP流
RtpStream* rtpStream = getOrCreateRtpStream(packet->ssrc());
// 2. 将RTP包添加到RTP流队列
rtpStream->addPacket(packet);
// 3. 更新统计信息
updateStatistics(packet);
}
/**
* 获取或创建RTP流
* @param ssrc RTP流的SSRC
* @return RtpStream指针
*/
RtpStream* Producer::getOrCreateRtpStream(uint32_t ssrc) {
// 1. 检查是否已存在该SSRC的RTP流
auto it = rtpStreamMap.find(ssrc);
if (it != rtpStreamMap.end()) {
return it->second;
}
// 2. 创建新的RTP流
RtpStream* rtpStream = new RtpStream(ssrc);
rtpStreamMap[ssrc] = rtpStream;
return rtpStream;
}
/**
* 请求RTX重传(当Consumer请求重传时)
* @param ssrc RTP流的SSRC
* @param sequenceNumber 要重传的序列号
*/
void Producer::requestRtx(uint32_t ssrc, uint16_t sequenceNumber) {
// 1. 获取RTP流
auto it = rtpStreamMap.find(ssrc);
if (it == rtpStreamMap.end()) {
return; // 不存在该流
}
RtpStream* rtpStream = it->second;
// 2. 检查序列号是否在范围内
if (sequenceNumber < rtpStream->getFirstSequenceNumber() ||
sequenceNumber > rtpStream->getLastSequenceNumber()) {
return; // 序列号超出范围
}
// 3. 获取要重传的包
RtpPacket* packet = rtpStream->getPacket(sequenceNumber);
if (!packet) {
return; // 未找到包
}
// 4. 创建RTX包并发送
RtpPacket* rtxPacket = createRtxPacket(packet);
sendRtxPacket(rtxPacket);
}
/**
* 发送RTP流到Consumer(用于新Consumer建立连接)
* @param consumer 要发送的Consumer
*/
void Producer::sendRtpStreamToConsumer(Consumer* consumer) {
// 1. 遍历所有RTP流
for (auto& pair : rtpStreamMap) {
RtpStream* rtpStream = pair.second;
// 2. 获取当前RTP流的最新包
RtpPacket* latestPacket = rtpStream->getLastPacket();
if (latestPacket) {
// 3. 发送最新包给Consumer
consumer->onRtpPacket(latestPacket);
}
}
}
5.5.3. Consumer核心处理逻辑(C++层)
// 文件: src/Consumer.cpp
/**
* 处理接收到的RTP包
* @param packet 接收到的RTP包
*/
void Consumer::onRtpPacket(RtpPacket* packet) {
// 1. 更新接收统计信息
updateStatistics(packet);
// 2. 检查是否需要NACK(丢包重传请求)
if (isPacketLost(packet->sequenceNumber)) {
requestNack(packet->sequenceNumber()); // 请求重传
}
// 3. 将RTP包转发给WebRtcTransport
transport->sendRtpPacket(packet);
}
/**
* 检查序列号是否丢失
* @param sequenceNumber 要检查的序列号
* @return 是否丢失
*/
bool Consumer::isPacketLost(uint16_t sequenceNumber) {
// 1. 检查是否是第一个包
if (firstSequenceNumber == -1) {
firstSequenceNumber = sequenceNumber;
return false;
}
// 2. 检查序列号是否连续
if (sequenceNumber == nextExpectedSequenceNumber) {
nextExpectedSequenceNumber = (sequenceNumber + 1) % 65536;
return false;
}
// 3. 如果序列号不是下一个,说明有丢包
return true;
}
/**
* 请求NACK重传(发送NACK请求给Producer)
* @param sequenceNumber 丢失的序列号
*/
void Consumer::requestNack(uint16_t sequenceNumber) {
// 1. 创建NACK包
RtcpPacket* nackPacket = createNackPacket(sequenceNumber);
// 2. 发送NACK包到Producer
transport->sendRtcpPacket(nackPacket);
}
/**
* 处理新RTP流(当Producer有新流时)
* @param rtpStream 新的RTP流
*/
void Consumer::onNewRtpStream(RtpStream* rtpStream) {
// 1. 将RTP流添加到Consumer的RTP流集合
rtpStreamMap[rtpStream->ssrc()] = rtpStream;
// 2. 通知应用层
emit("newRtpStream", rtpStream);
}
5.5.4. RTP流管理核心逻辑(C++层)
// 文件: src/RtpStream.cpp
/**
* RtpStream构造函数
* @param ssrc RTP流的SSRC
* @param firstPacket 首个RTP包
*/
RtpStream::RtpStream(uint32_t ssrc, RtpPacket* firstPacket)
: ssrc(ssrc),
firstSequenceNumber(firstPacket->sequenceNumber()),
lastSequenceNumber(firstPacket->sequenceNumber()) {
// 1. 将首个包加入队列
packetList.push(firstPacket);
}
/**
* 添加RTP包到流
* @param packet 要添加的RTP包
*/
void RtpStream::addPacket(RtpPacket* packet) {
// 1. 检查序列号是否连续
if (packet->sequenceNumber() == lastSequenceNumber + 1) {
// 2. 序列号连续,添加到队列末尾
packetList.push(packet);
lastSequenceNumber = packet->sequenceNumber();
} else if (packet->sequenceNumber() > lastSequenceNumber) {
// 3. 序列号跳跃,可能有丢包
// 4. 但不处理,先存储
packetList.push(packet);
lastSequenceNumber = packet->sequenceNumber();
} else {
// 5. 序列号回绕(超过65535)
if (packet->sequenceNumber() < firstSequenceNumber) {
// 6. 重传包,直接覆盖
packetList.push(packet);
}
}
}
/**
* 获取指定序列号的RTP包
* @param sequenceNumber 要获取的序列号
* @return RtpPacket指针,若不存在返回nullptr
*/
RtpPacket* RtpStream::getPacket(uint16_t sequenceNumber) {
// 1. 遍历包队列
for (auto& packet : packetList) {
if (packet->sequenceNumber() == sequenceNumber) {
return packet;
}
}
return nullptr;
}
/**
* 获取最新RTP包
* @return 最新RTP包指针
*/
RtpPacket* RtpStream::getLastPacket() {
if (!packetList.empty()) {
return packetList.back();
}
return nullptr;
}
🌟 5.6、关键机制解析
5.6.1. RTX(重传)机制
工作流程:
- Consumer检测到丢包(序列号不连续)
- Consumer发送NACK请求给Router
- Router将NACK转发给Producer
- Producer生成RTX包(包含原始包信息)
- Router将RTX包转发给Consumer
5.6.2. NACK(丢包重传请求)机制
关键点:
- NACK是Consumer向Producer请求重传的机制
- Router作为中介转发NACK请求
- Producer生成RTX包(重传包)并发送
5.6.3. 流量控制机制
// 在Consumer::onRtpPacket()中
void Consumer::onRtpPacket(RtpPacket* packet) {
// 1. 更新接收统计
updateStatistics(packet);
// 2. 检查是否超过带宽限制
if (isBandwidthExceeded()) {
// 3. 请求降级(如降低码率)
requestBandwidthReduction();
return;
}
// 4. 正常转发
transport->sendRtpPacket(packet);
}
机制说明:
- Consumer实时监控接收带宽
- 超过阈值时请求Producer降级
- 通过RTCP反馈机制实现
📊 5.7、关键数据结构关系图
关系说明:
mapProducerConsumers:Router维护Producer-Consumer映射(一对多)mapConsumerProducer:Router维护Consumer-Producer映射(一对一)rtpStreamMap:Producer/Consumer存储RTP流packetList:RtpStream存储RTP包队列
💡 5.8、这样设计的优势
-
选择性转发:Router仅转发Consumer需要的RTP包,避免全量复制
- 4人视频会议:4个Producer → 4×3=12个RTP包 → 实际转发12个包(不是48个)
-
RTX重传优化:只重传丢失的包,而非整个流
- 丢包率10% → 仅重传10%的包,而非100%
-
内存高效:RtpStream仅存储最近的RTP包(滑动窗口)
- 通常只存储100个包(约2秒视频数据)
-
事件驱动:通过
EnhancedEventEmitter实现解耦- Producer、Consumer、Router之间通过事件通信
🌟 5.9、总结:RTP流处理全生命周期
完整生命周期:
- 推流客户端发送RTP包 → WebRtcTransport
- Router接收RTP包,存储到Producer
- Router遍历Consumer列表,转发RTP包
- Consumer处理RTP包,检测丢包
- Consumer请求NACK重传 → Router转发
- Producer生成RTX包 → Router转发给Consumer
- 观看客户端接收RTP包并渲染
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)