🧩 5.1、整体架构图

发送RTP包
接收RTP包
路由决策
维护关系
维护关系
存储流
请求流
转发数据
发送RTP包
推流客户端
WebRtcTransport
Worker进程
Router
Producer
Consumer
WebRtcTransport
观看客户端

说明

  • Worker进程:C++子进程,负责媒体处理(ICE/DTLS/RTP)
  • Router:媒体流逻辑中枢,维护Producer-Consumer关系
  • Producer:媒体源(推流端),存储并管理RTP流
  • Consumer:媒体接收端(观看端),请求并消费RTP流
  • WebRtcTransport:网络传输层,处理RTP/RTCP协议

📦 5.2、RTP包流转全流程

🌟 关键流程图(含函数名中文注释)

推流客户端
WebRtcTransport
Router
Producer
Consumer
WebRtcTransport
观看客户端
sendRtpPacket
onTransportProducerRtpPacketReceived
addRtpPacket
遍历Consumer列表
onRtpPacket

说明

  1. sendRtpPacket():推流客户端发送RTP包
  2. onTransportProducerRtpPacketReceived():Worker接收RTP包后触发
  3. addRtpPacket():Producer存储RTP包
  4. onRtpPacket():Consumer处理RTP包
  5. sendRtpPacket():Consumer转发RTP包给观看端

⏱️ 5.3、关键时序图

推流客户端 WebRtcTransport Worker Router Producer Consumer 观看客户端 sendRtpPacket(rtpPacket) onTransportProducerRtpPacketReceived(producer, rtpPacket) onTransportProducerRtpPacketReceived(producer, rtpPacket) addRtpPacket(rtpPacket) 遍历mapProducerConsumers[producer] onRtpPacket(rtpPacket) sendRtpPacket(rtpPacket) 发送RTP包 loop [每个Consumer] 解码并渲染 推流客户端 WebRtcTransport Worker Router Producer Consumer 观看客户端

说明

  • onTransportProducerRtpPacketReceived():Worker接收RTP包的核心入口
  • addRtpPacket():Producer存储RTP包,用于RTX重传
  • mapProducerConsumers[producer]:Router维护的Producer-Consumer映射
  • onRtpPacket():Consumer处理RTP包的核心方法

📊 5.4、关键类图关联

包含
关联
关联
包含
包含
关联
«interface»
EnhancedEventEmitter
+emit(event, data)
+on(event, callback)
Worker
-channel: Channel 与JS层通信
-routerMap: Map 管理多个Router
+createRouter(options)
+onTransportProducerRtpPacketReceived(producer, packet)
Router
-producerMap: Map 所有Producer
-consumerMap: Map 所有Consumer
-mapProducerConsumers: Map> Producer-Consumer关系
-mapConsumerProducer: Map Consumer-Producer关系
+onTransportProducerRtpPacketReceived(producer, packet)
+onTransportNewProducer(transport, producer)
+onTransportNewConsumer(transport, consumer, producerId)
Producer
-rtpStreamMap: Map 存储RTP流
-producerId: string Producer唯一ID
+addRtpPacket(packet)
+getRtpStream(ssrc)
+requestRtx(ssrc, seq)
Consumer
-rtpStreamMap: Map 存储RTP流
-producer: Producer 关联的Producer
-consumerId: string Consumer唯一ID
+onRtpPacket(packet)
+requestNack(seq)
WebRtcTransport
-localSdp: string 本地SDP
-remoteSdp: string 远端SDP
+sendRtpPacket(packet)
+sendRtcpPacket(packet)
RtpStream
-ssrc: uint32 SSRC
-sequenceNumber: uint16 序列号
-timestamp: uint32 时间戳
-packetList: Queue 包队列
+addPacket(packet)
+getPacket(seq)

说明

  • mapProducerConsumers:Router维护的Producer-Consumer映射(一对多)
  • mapConsumerProducer:Router维护的Consumer-Producer映射(一对一)
  • RtpStream:RTP流管理单元,存储RTP包队列
  • addPacket():RtpStream存储RTP包

🔧 5.5、关键代码片段

5.5.1. Router核心处理逻辑(C++层)

// 文件: src/Router.cpp

/**
 * 处理接收到的RTP包(核心入口函数)
 * @param producer 关联的Producer
 * @param packet 接收到的RTP包
 */
void Router::onTransportProducerRtpPacketReceived(Producer* producer, RtpPacket* packet) {
    // 1. 检查是否为RTX包(重传包)
    if (packet->isRtx()) {
        handleRtxPacket(producer, packet); // 处理RTX重传包
        return;
    }

    // 2. 检查RTP包是否为新流(SSRC首次出现)
    if (!producer->hasRtpStream(packet->ssrc())) {
        // 2.1 创建新的RTP流
        RtpStream* rtpStream = new RtpStream(packet->ssrc(), packet);
        producer->addRtpStream(rtpStream);
        
        // 2.2 通知所有关联的Consumer新流已建立
        auto& consumers = mapProducerConsumers[producer];
        for (auto* consumer : consumers) {
            consumer->onNewRtpStream(rtpStream); // Consumer处理新流
        }
    }

    // 3. 将RTP包添加到Producer的RTP流中
    producer->addRtpPacket(packet);

    // 4. 遍历所有关联的Consumer,转发RTP包
    auto& consumers = mapProducerConsumers[producer];
    for (auto* consumer : consumers) {
        consumer->onRtpPacket(packet); // Consumer处理RTP包
    }
}

/**
 * 处理RTX重传包
 * @param producer 关联的Producer
 * @param rtxPacket RTX重传包
 */
void Router::handleRtxPacket(Producer* producer, RtpPacket* rtxPacket) {
    // 1. 从RTX包中提取原始RTP包信息
    RtpPacket* originalPacket = rtxPacket->getOriginalPacket();
    
    // 2. 检查原始包是否已存在(是否已接收过)
    if (!producer->hasRtpStream(originalPacket->ssrc())) {
        // 2.1 如果原始包不存在,请求重传
        producer->requestRtx(originalPacket->ssrc(), originalPacket->sequenceNumber());
        return;
    }

    // 3. 将原始RTP包转发给所有关联的Consumer
    auto& consumers = mapProducerConsumers[producer];
    for (auto* consumer : consumers) {
        consumer->onRtpPacket(originalPacket); // 转发原始包
    }
}

/**
 * 新Producer创建时的处理
 * @param transport 关联的Transport
 * @param producer 新创建的Producer
 */
void Router::onTransportNewProducer(Transport* transport, Producer* producer) {
    // 1. 将Producer添加到Router的Producer列表
    producerMap[producer->id()] = producer;
    
    // 2. 初始化Producer的Consumer集合
    mapProducerConsumers[producer] = std::unordered_set<Consumer*>();
    
    // 3. 通知应用层(可选)
    emit("producer", producer);
}

/**
 * 新Consumer创建时的处理
 * @param transport 关联的Transport
 * @param consumer 新创建的Consumer
 * @param producerId 要消费的Producer ID
 */
void Router::onTransportNewConsumer(Transport* transport, Consumer* consumer, const std::string& producerId) {
    // 1. 查找目标Producer
    auto producerIt = producerMap.find(producerId);
    if (producerIt == producerMap.end()) {
        throw std::runtime_error("Producer not found: " + producerId);
    }
    Producer* producer = producerIt->second;
    
    // 2. 建立Consumer与Producer的关联
    mapConsumerProducer[consumer] = producer;
    mapProducerConsumers[producer].insert(consumer);
    
    // 3. 通知应用层(可选)
    emit("consumer", consumer);
    
    // 4. 如果Producer已有数据,立即发送
    if (producer->isRtpStreamActive()) {
        producer->sendRtpStreamToConsumer(consumer);
    }
}

5.5.2. Producer核心处理逻辑(C++层)

// 文件: src/Producer.cpp

/**
 * 添加RTP包到Producer的RTP流
 * @param packet 要添加的RTP包
 */
void Producer::addRtpPacket(RtpPacket* packet) {
    // 1. 获取或创建RTP流
    RtpStream* rtpStream = getOrCreateRtpStream(packet->ssrc());
    
    // 2. 将RTP包添加到RTP流队列
    rtpStream->addPacket(packet);
    
    // 3. 更新统计信息
    updateStatistics(packet);
}

/**
 * 获取或创建RTP流
 * @param ssrc RTP流的SSRC
 * @return RtpStream指针
 */
RtpStream* Producer::getOrCreateRtpStream(uint32_t ssrc) {
    // 1. 检查是否已存在该SSRC的RTP流
    auto it = rtpStreamMap.find(ssrc);
    if (it != rtpStreamMap.end()) {
        return it->second;
    }
    
    // 2. 创建新的RTP流
    RtpStream* rtpStream = new RtpStream(ssrc);
    rtpStreamMap[ssrc] = rtpStream;
    return rtpStream;
}

/**
 * 请求RTX重传(当Consumer请求重传时)
 * @param ssrc RTP流的SSRC
 * @param sequenceNumber 要重传的序列号
 */
void Producer::requestRtx(uint32_t ssrc, uint16_t sequenceNumber) {
    // 1. 获取RTP流
    auto it = rtpStreamMap.find(ssrc);
    if (it == rtpStreamMap.end()) {
        return; // 不存在该流
    }
    RtpStream* rtpStream = it->second;
    
    // 2. 检查序列号是否在范围内
    if (sequenceNumber < rtpStream->getFirstSequenceNumber() ||
        sequenceNumber > rtpStream->getLastSequenceNumber()) {
        return; // 序列号超出范围
    }
    
    // 3. 获取要重传的包
    RtpPacket* packet = rtpStream->getPacket(sequenceNumber);
    if (!packet) {
        return; // 未找到包
    }
    
    // 4. 创建RTX包并发送
    RtpPacket* rtxPacket = createRtxPacket(packet);
    sendRtxPacket(rtxPacket);
}

/**
 * 发送RTP流到Consumer(用于新Consumer建立连接)
 * @param consumer 要发送的Consumer
 */
void Producer::sendRtpStreamToConsumer(Consumer* consumer) {
    // 1. 遍历所有RTP流
    for (auto& pair : rtpStreamMap) {
        RtpStream* rtpStream = pair.second;
        
        // 2. 获取当前RTP流的最新包
        RtpPacket* latestPacket = rtpStream->getLastPacket();
        if (latestPacket) {
            // 3. 发送最新包给Consumer
            consumer->onRtpPacket(latestPacket);
        }
    }
}

5.5.3. Consumer核心处理逻辑(C++层)

// 文件: src/Consumer.cpp

/**
 * 处理接收到的RTP包
 * @param packet 接收到的RTP包
 */
void Consumer::onRtpPacket(RtpPacket* packet) {
    // 1. 更新接收统计信息
    updateStatistics(packet);
    
    // 2. 检查是否需要NACK(丢包重传请求)
    if (isPacketLost(packet->sequenceNumber)) {
        requestNack(packet->sequenceNumber()); // 请求重传
    }
    
    // 3. 将RTP包转发给WebRtcTransport
    transport->sendRtpPacket(packet);
}

/**
 * 检查序列号是否丢失
 * @param sequenceNumber 要检查的序列号
 * @return 是否丢失
 */
bool Consumer::isPacketLost(uint16_t sequenceNumber) {
    // 1. 检查是否是第一个包
    if (firstSequenceNumber == -1) {
        firstSequenceNumber = sequenceNumber;
        return false;
    }
    
    // 2. 检查序列号是否连续
    if (sequenceNumber == nextExpectedSequenceNumber) {
        nextExpectedSequenceNumber = (sequenceNumber + 1) % 65536;
        return false;
    }
    
    // 3. 如果序列号不是下一个,说明有丢包
    return true;
}

/**
 * 请求NACK重传(发送NACK请求给Producer)
 * @param sequenceNumber 丢失的序列号
 */
void Consumer::requestNack(uint16_t sequenceNumber) {
    // 1. 创建NACK包
    RtcpPacket* nackPacket = createNackPacket(sequenceNumber);
    
    // 2. 发送NACK包到Producer
    transport->sendRtcpPacket(nackPacket);
}

/**
 * 处理新RTP流(当Producer有新流时)
 * @param rtpStream 新的RTP流
 */
void Consumer::onNewRtpStream(RtpStream* rtpStream) {
    // 1. 将RTP流添加到Consumer的RTP流集合
    rtpStreamMap[rtpStream->ssrc()] = rtpStream;
    
    // 2. 通知应用层
    emit("newRtpStream", rtpStream);
}

5.5.4. RTP流管理核心逻辑(C++层)

// 文件: src/RtpStream.cpp

/**
 * RtpStream构造函数
 * @param ssrc RTP流的SSRC
 * @param firstPacket 首个RTP包
 */
RtpStream::RtpStream(uint32_t ssrc, RtpPacket* firstPacket)
    : ssrc(ssrc),
      firstSequenceNumber(firstPacket->sequenceNumber()),
      lastSequenceNumber(firstPacket->sequenceNumber()) {
    
    // 1. 将首个包加入队列
    packetList.push(firstPacket);
}

/**
 * 添加RTP包到流
 * @param packet 要添加的RTP包
 */
void RtpStream::addPacket(RtpPacket* packet) {
    // 1. 检查序列号是否连续
    if (packet->sequenceNumber() == lastSequenceNumber + 1) {
        // 2. 序列号连续,添加到队列末尾
        packetList.push(packet);
        lastSequenceNumber = packet->sequenceNumber();
    } else if (packet->sequenceNumber() > lastSequenceNumber) {
        // 3. 序列号跳跃,可能有丢包
        // 4. 但不处理,先存储
        packetList.push(packet);
        lastSequenceNumber = packet->sequenceNumber();
    } else {
        // 5. 序列号回绕(超过65535)
        if (packet->sequenceNumber() < firstSequenceNumber) {
            // 6. 重传包,直接覆盖
            packetList.push(packet);
        }
    }
}

/**
 * 获取指定序列号的RTP包
 * @param sequenceNumber 要获取的序列号
 * @return RtpPacket指针,若不存在返回nullptr
 */
RtpPacket* RtpStream::getPacket(uint16_t sequenceNumber) {
    // 1. 遍历包队列
    for (auto& packet : packetList) {
        if (packet->sequenceNumber() == sequenceNumber) {
            return packet;
        }
    }
    return nullptr;
}

/**
 * 获取最新RTP包
 * @return 最新RTP包指针
 */
RtpPacket* RtpStream::getLastPacket() {
    if (!packetList.empty()) {
        return packetList.back();
    }
    return nullptr;
}

🌟 5.6、关键机制解析

5.6.1. RTX(重传)机制

Consumer Router Producer 发送NACK请求 请求RTX重传 发送RTX包 转发RTX包 Consumer Router Producer

工作流程

  1. Consumer检测到丢包(序列号不连续)
  2. Consumer发送NACK请求给Router
  3. Router将NACK转发给Producer
  4. Producer生成RTX包(包含原始包信息)
  5. Router将RTX包转发给Consumer

5.6.2. NACK(丢包重传请求)机制

Consumer Router Producer 检测到丢包 发送NACK包 请求重传 发送RTX包 转发RTX包 Consumer Router Producer

关键点

  • NACK是Consumer向Producer请求重传的机制
  • Router作为中介转发NACK请求
  • Producer生成RTX包(重传包)并发送

5.6.3. 流量控制机制

// 在Consumer::onRtpPacket()中
void Consumer::onRtpPacket(RtpPacket* packet) {
    // 1. 更新接收统计
    updateStatistics(packet);
    
    // 2. 检查是否超过带宽限制
    if (isBandwidthExceeded()) {
        // 3. 请求降级(如降低码率)
        requestBandwidthReduction();
        return;
    }
    
    // 4. 正常转发
    transport->sendRtpPacket(packet);
}

机制说明

  • Consumer实时监控接收带宽
  • 超过阈值时请求Producer降级
  • 通过RTCP反馈机制实现

📊 5.7、关键数据结构关系图

mapProducerConsumers
mapConsumerProducer
rtpStreamMap
rtpStreamMap
packetList
Router
Producer
Consumer
RtpStream
RtpPacket

关系说明

  1. mapProducerConsumers:Router维护Producer-Consumer映射(一对多)
  2. mapConsumerProducer:Router维护Consumer-Producer映射(一对一)
  3. rtpStreamMap:Producer/Consumer存储RTP流
  4. packetList:RtpStream存储RTP包队列

💡 5.8、这样设计的优势

  1. 选择性转发:Router仅转发Consumer需要的RTP包,避免全量复制

    • 4人视频会议:4个Producer → 4×3=12个RTP包 → 实际转发12个包(不是48个)
  2. RTX重传优化:只重传丢失的包,而非整个流

    • 丢包率10% → 仅重传10%的包,而非100%
  3. 内存高效:RtpStream仅存储最近的RTP包(滑动窗口)

    • 通常只存储100个包(约2秒视频数据)
  4. 事件驱动:通过EnhancedEventEmitter实现解耦

    • Producer、Consumer、Router之间通过事件通信

🌟 5.9、总结:RTP流处理全生命周期

发送RTP包
onTransportProducerRtpPacketReceived
存储RTP包
遍历Consumer
处理RTP包
发送RTP包
检测丢包
转发
请求RTX
发送RTX包
转发
推流客户端
WebRtcTransport
Router
Producer
Consumer
WebRtcTransport
观看客户端
NACK请求

完整生命周期

  1. 推流客户端发送RTP包 → WebRtcTransport
  2. Router接收RTP包,存储到Producer
  3. Router遍历Consumer列表,转发RTP包
  4. Consumer处理RTP包,检测丢包
  5. Consumer请求NACK重传 → Router转发
  6. Producer生成RTX包 → Router转发给Consumer
  7. 观看客户端接收RTP包并渲染
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐