4.1、总体架构图

创建
创建
管理
管理
使用
使用
使用
发送
接收
生成
建立
处理
Mediasoup Worker
Worker
Router
WebRtcTransport
Producer
Consumer
IceHandler
DtlsHandler
RtpReceiver
IceCandidates
DTLS
RTP

🌟 架构说明

  1. Worker:系统总控中心(Worker.h

  2. Router:房间管理器(Router.h

  3. WebRtcTransport:视频通道核心(WebRtcTransport.h

  4. Producer:媒体生产者(Producer.h

  5. Consumer:媒体消费者(Consumer.h

  6. IceHandler:网络连接处理(IceHandler.h

  7. DtlsHandler:加密通道处理(DtlsHandler.h

  8. RtpReceiver:RTP包处理(RtpReceiver.h

    🌟 分层说明:

    • 业务层:Router、Producer、Consumer
    • 传输层:Transport、WebRtcTransport、DtlsTransport
    • 媒体处理层:RtpStreamRecv、RtpStreamSend、SeqManager、NackGenerator
    • 基础设施层:PortManager、IceHandler、TransportTuple

4.2、关键类图

4.2.1. Worker类图(核心总控)
创建
创建
Worker
+Channel* _channel
+PayloadChannel* _payloadChannel
+map _routers
+Worker(Channel* channel, PayloadChannel* payloadChannel)
+Router* createRouter(const RouterOptions& options)
+WebRtcTransport* createWebRtcTransport(string routerId, const WebRtcTransportOptions& options)
+void onChannelMessage(const napi::Value& message)
Router
+string _id
+map _producers
+Router(Channel* channel, string id, const RouterOptions& options)
+void AddProducer(Producer* producer)
+void AddConsumer(Consumer* consumer)
WebRtcTransport
4.4.2. WebRtcTransport类图(视频通道核心)
使用
使用
WebRtcTransport
+DtlsHandler* _dtlsHandler // DTLS加密处理
+IceHandler* _iceHandler // ICE网络处理
+RtpReceiver* _rtpReceiver // RTP包处理
+void connect(const WebRtcTransportConnectOptions& options)
+void sendRtp(const RtpPacket* packet)
DtlsHandler
IceHandler
4.2.3. IceHandler类图(网络连接)
生成/存储
IceHandler
+vector _localCandidates
+vector _remoteCandidates
+vector createIceCandidates()
+void addRemoteCandidate(const IceCandidate& candidate)
IceCandidate
+string _candidate
+string _foundation
4.4.4. DtlsHandler类图(加密通道)
使用
DtlsHandler
+DtlsContext _context
+bool _handshakeComplete
+void startHandshake()
+void onDtlsMessage(const vector& message)
DtlsContext
+void startHandshake()
+bool isHandshakeComplete()
4.4.5. Producer类图(媒体生产者)
发送
Producer
+Router* _router
+string _kind
+bool _paused
+void SendRtpPacket(const RtpPacket* packet)
+void Close()
RtpPacket
+uint8_t* _data
+size_t _size
4.4.6. Consumer类图(媒体消费者)
使用
Consumer
+Router* _router
+string _producerId
+Decoder* _decoder
+void OnRtpPacket(RtpPacket* packet)
+void Close()
Decoder
+void Decode(RtpPacket* packet)
+void Render()

4.2.7. Router类图(房间管理器)
管理
管理
发送
使用
Router
-Channel* _channel
-string _id
-RouterOptions _options
-map _producers
-map _consumers
+Router(Channel* channel, string id, const RouterOptions& options)
+void AddProducer(Producer* producer)
+void AddConsumer(Consumer* consumer)
+void RemoveProducer(string producerId)
+const string& getId()
Producer
-Router* _router
-string _id
-string _kind
-bool _paused
+void SendRtpPacket(RtpPacket* packet)
+void Close()
Consumer
-Router* _router
-string _id
-string _producerId
-Decoder* _decoder
+void OnRtpPacket(RtpPacket* packet)
+void Close()
RtpPacket
Decoder

4.2.8. RtpReceive类图(RTP包接收)
接收
RtpReceiver
-bool _isClosed
-function _onRtpPacket
+RtpReceiver()
+void ReceiveRtpPacket(const RtpPacket* packet)
+void setOnRtpPacketCallback(function callback)
RtpPacket
+uint8_t* _data
+size_t _size

4.2.9. Stats类图(统计信息)
被使用
Stats
+size_t rtpPacketsSent
+size_t rtpPacketsReceived
+size_t bitrate
+size_t packetsLost
+double jitter
+void reset()
WebRtcTransport
-Stats _stats
+void updateStats()

4.2.10. Channel类图 (主通信通道)
注册
通过
Channel
-Listener _listener
-bool _isRunning
+void setListener(Listener listener)
+void emit(string event, string data)
+void start()
Listener
+void operator()
Worker

4.2.11. PayloadChannel类图(大块数据通道)
通过
PayloadChannel
-function&)> _payloadListener
+void setListener(function&)
+void sendPayload(const vector& payload)
WebRtcTransport
-PayloadChannel* _payloadChannel
+void sendRtp(RtpPacket* packet)

4.2.12. Transport类图(传输基类)
继承
Transport
-string _id
+virtual ~Transport()
+virtual void connect(TransportConnectOptions options)
+virtual void sendRtp(RtpPacket* packet)
+const string& getId()
WebRtcTransport
-DtlsHandler* _dtlsHandler
-IceHandler* _iceHandler
+WebRtcTransport(Channel* channel, string routerId, const WebRtcTransportOptions& options)
+void connect(const WebRtcTransportConnectOptions& options)
+void sendRtp(const RtpPacket* packet)

✅ 关键说明(与架构图的完整联系)

文件 在架构图中的角色 核心作用 说明(基于mediasoup源码)
Router.h 房间管理器(Router) 管理房间内所有Producer/Consumer 作为媒体流的逻辑容器,管理Producer和Consumer的注册、连接和数据转发
Transport.h 传输基类(Transport) 定义WebRtcTransport的通用接口 为WebRtcTransport、PlainRtpTransport、PipeTransport提供统一接口
WebRtcTransport.h WebRTC传输通道 实现基于ICE/DTLS的加密媒体传输 处理浏览器与服务端之间的WebRTC连接,包括ICE候选交换和DTLS握手
Producer.h 生产者(Producer) 接收共享者发送的媒体流 作为服务端的生产者,接收客户端发送的RTP包,包含多个RtpStreamRecv
Consumer.h 消费者(Consumer) 转发媒体流给终端 作为服务端的消费者,将RTP包发送给客户端,包含RtpStreamSend
RtpStreamRecv.h RTP流接收(RtpStreamRecv) 处理RTP数据流的接收 用于Producer接收RTP包,支持丢包重传(NackGenerator)
RtpStreamSend.h RTP流发送(RtpStreamSend) 处理RTP数据流的发送 用于Consumer发送RTP包给客户端,包含序列管理(SeqManager)
SeqManager.h 序列管理器(SeqManager) 管理RTP包的序列号 服务端推送给客户端的数据流会重新排序,记录SSRC对应的Sequence
NackGenerator.h NACK生成器(NackGenerator) 生成丢包重传请求 接收端检测到丢包后,生成NACK请求发送给发送端
DtlsTransport.h DTLS传输(DtlsTransport) 处理DTLS加密和握手 实现DTLS协议,确保媒体传输的安全性
IceHandler.h ICE处理(IceHandler) 处理ICE候选交换和网络路径选择 生成本地ICE候选,添加远端ICE候选,处理ICE连接状态
PortManager.h 端口管理器(PortManager) 管理UDP/TCP端口分配 管理40000-49999端口范围,避免端口冲突
Stats.h 统计信息(Stats) 实时监控传输性能(丢包率/抖动等) 采集并更新RTP包统计信息,用于性能分析和优化
Channel.h 通信枢纽(Channel) Worker与JS层通信的核心通道 通过IPC与Node.js层通信,处理信令和事件
PayloadChannel.h 大包传输通道(PayloadChannel) 专用于RTP大包的低延迟传输 优化RTP大包传输,避免阻塞主通信通道
TransportTuple.h 传输元组(TransportTuple) 存储本地/远端socket、IP、端口 保存传输通道的连接信息,用于网络通信

4.3、关键时序

4.3.1. Worker启动流程
Worker Channel Router setListener(Worker) // 设置消息监听器 start() // 启动通道 onChannelMessage // 通道消息回调 createRouter() // 创建房间 Router(Channel, id) // 创建Router实例 emit("routerCreated") // 通知JS层 Worker Channel Router

源码文件src/worker/Worker.cpp

// Worker构造函数:初始化核心组件
Worker::Worker(Channel* channel, PayloadChannel* payloadChannel)
    : _channel(channel), _payloadChannel(payloadChannel) {
    // 注册消息监听器(关键!避免JS层指令丢失)
    _channel->setListener([this](const napi::Value& message) {
        onChannelMessage(message); // 通过Lambda捕获this指针
    });
    _payloadChannel->setListener([this](const std::vector<uint8_t>& payload) {
        onPayloadChannelMessage(payload); // 处理大包数据
    });
}

// 处理JS指令的核心入口(关键逻辑)
void Worker::onChannelMessage(const napi::Value& message) {
    // 1. 解析消息类型(如"router.create")
    auto event = napi::GetString(message, "event");
    
    // 2. 路由到对应处理函数
    if (event == "router.create") {
        createRouter(napi::GetObject(message, "data")); // 调用创建房间
    } else if (event == "transport.create") {
        createWebRtcTransport(napi::GetObject(message, "data"));
    }
}

// 创建房间(核心函数)
Router* Worker::createRouter(const napi::Value& options) {
    // 生成唯一房间ID(避免冲突)
    std::string routerId = generateId(); 
    Router* router = new Router(_channel, routerId, options);
    
    // 3. 注册到管理器(关键!后续通过routerId获取)
    _routers[routerId] = router;
    
    // 4. 通知JS层(触发前端创建房间事件)
    _channel->emit("router.created", routerId); 
    return router;
}
4.3.2. 创建视频通道流程
Client Worker Router WebRtcTransport IceHandler DtlsHandler createWebRtcTransport(routerId) // 创建传输通道 getRouter(routerId) // 获取房间 WebRtcTransport(Channel, routerId) // 创建传输实例 connect() // 建立连接 createIceCandidates() // 生成ICE候选 startHandshake() // 启动DTLS握手 transportID // 返回通道ID Client Worker Router WebRtcTransport IceHandler DtlsHandler

源码文件src/worker/WebRtcTransport.cpp

// 创建传输通道(核心函数)
WebRtcTransport* Worker::createWebRtcTransport(
    const std::string& routerId,
    const napi::Value& options
) {
    Router* router = getRouter(routerId); // 通过routerId获取房间(关键!)
    if (!router) throw std::runtime_error("Router not found");
    
    WebRtcTransport* transport = new WebRtcTransport(
        _channel, routerId, options
    );
    
    // 注册到房间(让Router管理传输通道)
    router->AddTransport(transport); 
    return transport;
}

// WebRtcTransport构造函数
WebRtcTransport::WebRtcTransport(
    Channel* channel, 
    const std::string& routerId,
    const napi::Value& options
) : Transport("transport-" + generateId()),
    _channel(channel),
    _routerId(routerId),
    _iceHandler(new IceHandler()),
    _dtlsHandler(new DtlsHandler()) {
    // 初始化DTLS(服务端模式)
    _dtlsHandler->setServerMode(true);
}

// 建立连接(关键流程)
void WebRtcTransport::connect(const napi::Value& options) {
    // 1. 生成本地ICE候选(使用libnice)
    std::vector<IceCandidate> localCandidates = _iceHandler->createIceCandidates();
    
    // 2. 发送ICE候选给远端(通过JS层)
    _channel->emit("transport.iceCandidates", localCandidates);
    
    // 3. 启动DTLS握手(加密通道建立)
    _dtlsHandler->startHandshake();
    
    _isConnected = true;
    _channel->emit("transport.connected", getId()); // 通知JS
}
4.3.3. 视频发送流程(Producer)
Camera Producer WebRtcTransport DtlsHandler IceHandler captureFrame() // 捕获视频帧 SendRtpPacket() // 发送RTP包 sendRtp() // 通过传输通道发送 encrypt() // DTLS加密 sendData() // 通过ICE发送 Camera Producer WebRtcTransport DtlsHandler IceHandler

源码文件src/worker/Producer.cpp

// 发送RTP包(核心函数)
void Producer::SendRtpPacket(const RtpPacket* packet) {
    if (_paused || _isClosed) return; // 检查状态(避免无效发送)
    
    // 1. 确保传输通道已设置(关键!防止空指针)
    if (!_transport) throw std::runtime_error("Transport not set");
    
    // 2. 通过传输通道发送(调用WebRtcTransport::sendRtp)
    _transport->sendRtp(packet);
    
    // 3. 更新统计信息(原子操作,多线程安全)
    _stats.rtpPacketsSent.fetch_add(1);
    _stats.bitrate.fetch_add(packet->size());
}

// WebRtcTransport::sendRtp(关键实现)
void WebRtcTransport::sendRtp(const RtpPacket* packet) {
    if (!_isConnected) return; // 检查连接状态
    
    // 1. DTLS加密(使用OpenSSL)
    std::vector<uint8_t> encrypted = _dtlsHandler->encrypt(
        packet->data(), 
        packet->size()
    );
    
    // 2. 通过ICE通道发送(使用libnice的send)
    _iceHandler->sendData(encrypted);
    
    // 3. 更新统计(关键!用于性能监控)
    _stats.rtpPacketsSent.fetch_add(1);
    _stats.bitrate.fetch_add(packet->size());
}
4.3.4. 视频接收流程(Consumer)
WebRtcTransport Consumer Decoder OnRtpPacket() // 接收RTP包 Decode() // 解码RTP包 Render() // 渲染视频帧 WebRtcTransport Consumer Decoder

源码文件src/worker/Consumer.cpp

// 处理RTP包(核心函数)
void Consumer::OnRtpPacket(RtpPacket* packet) {
    if (_paused || _isClosed) return; // 状态检查
    
    // 1. 交给解码器(关键!Consumer不直接处理RTP)
    _decoder->Decode(packet);
    
    // 2. 更新统计
    _stats.rtpPacketsReceived.fetch_add(1);
    _stats.bitrate.fetch_add(packet->size());
}

// Decoder::Decode(解码实现)
void Decoder::Decode(RtpPacket* packet) {
    // 1. 检查是否是关键帧(H264需要关键帧才能解码)
    if (packet->isKeyFrame()) {
        _decoder->reset(); // 重置解码器状态
    }
    
    // 2. 调用FFmpeg解码(关键!)
    int ret = avcodec_send_packet(_avCodecContext, packet->avPacket());
    if (ret < 0) throw std::runtime_error("Decode error");
    
    // 3. 获取解码后的帧
    AVFrame* frame = av_frame_alloc();
    ret = avcodec_receive_frame(_avCodecContext, frame);
    if (ret == 0) {
        _render(frame); // 渲染视频帧(关键!)
    }
}
4.3.5. ICE候选交换流程(ICE Candidate Exchange)
ClientA Worker WebRtcTransportA IceHandlerA ClientB WebRtcTransportB IceHandlerB createWebRtcTransport() // A创建传输通道 createIceCandidates() // 生成本地ICE候选 返回localCandidates // 本地ICE候选返回给A 发送ICE候选(通过信令) // A将候选发送给B addRemoteCandidate() // B添加远端候选 addRemoteCandidate() // 写入远端候选列表 ClientA Worker WebRtcTransportA IceHandlerA ClientB WebRtcTransportB IceHandlerB

源码文件src/worker/IceHandler.cpp

// 生成本地ICE候选(关键!使用libnice)
std::vector<IceCandidate> IceHandler::createIceCandidates() {
    std::vector<IceCandidate> candidates;
    
    // 1. 生成STUN候选(优先级最高)
    IceCandidate stunCandidate = generateStunCandidate();
    candidates.push_back(stunCandidate);
    
    // 2. 生成TURN候选(备用路径)
    IceCandidate turnCandidate = generateTurnCandidate();
    candidates.push_back(turnCandidate);
    
    // 3. 设置候选优先级(STUN > TURN)
    sort(candidates.begin(), candidates.end(), 
        [](const IceCandidate& a, const IceCandidate& b) {
            return a.priority > b.priority; // 高优先级先尝试
        });
    
    return candidates;
}

// 添加远端ICE候选(关键!用于连接建立)
void IceHandler::addRemoteCandidate(const IceCandidate& candidate) {
    _remoteCandidates.push_back(candidate);
    
    // 4. 尝试建立连接(触发libnice的连接尝试)
    if (_iceAgent) {
        _iceAgent->addRemoteCandidate(candidate);
    }
}
4.3.6. DTLS握手流程(DTLS Handshake)
WebRtcTransport DtlsHandler Client RemoteDtlsHandler startHandshake() // 启动握手 发送ClientHello // 客户端发送初始消息 返回ServerHello+证书 // 服务端响应 验证证书并生成密钥 // 完成握手 notifyHandshakeComplete() // 通知传输层 transport.encrypted // 通知加密完成 WebRtcTransport DtlsHandler Client RemoteDtlsHandler

源码文件src/worker/DtlsHandler.cpp

// 启动DTLS握手(关键!)
void DtlsHandler::startHandshake() {
    // 1. 初始化DTLS上下文(使用OpenSSL)
    _context = DtlsContext::Create(_isServer);
    _handshakeComplete = false;
    
    // 2. 启动握手(非阻塞)
    _context->startHandshake(); // 调用OpenSSL的DTLSv1_2_server_method()
}

// 处理DTLS消息(关键!)
void DtlsHandler::onDtlsMessage(const std::vector<uint8_t>& message) {
    if (!_handshakeComplete) {
        // 1. 处理握手消息(验证证书等)
        _context->processHandshake(message);
        
        // 2. 检查握手是否完成
        if (_context->isHandshakeComplete()) {
            _handshakeComplete = true;
            // 3. 握手完成后的处理(如启用加密通道)
            _channel->emit("transport.encrypted", _transportId);
        }
    } else {
        // 4. 已完成握手,处理应用数据
        _context->processData(message);
    }
}
4.3.7. 网络故障重连流程(Network Recovery)
WebRtcTransport IceHandler DtlsHandler Router 检测到ICE连接失败 // 网络断开 重置DTLS状态 // 重新初始化加密 重新生成ICE候选 // 重新收集候选 返回新候选 // 新候选用于重连 notifyTransportReconnected() // 通知房间 WebRtcTransport IceHandler DtlsHandler Router

源码文件src/worker/WebRtcTransport.cpp

// 网络故障检测(关键!通过ICE状态判断)
void WebRtcTransport::onIceConnectionStateChange(IceConnectionState state) {
    if (state == IceConnectionState::FAILED) {
        // 1. 重置DTLS状态(确保安全重连)
        _dtlsHandler->reset();
        
        // 2. 重新生成ICE候选(获取新路径)
        std::vector<IceCandidate> newCandidates = _iceHandler->createIceCandidates();
        _channel->emit("transport.iceCandidates", newCandidates);
        
        // 3. 通知房间(触发重连逻辑)
        _router->notifyTransportReconnected(getId());
    }
}

// 房间内重连通知(Router.h)
void Router::notifyTransportReconnected(const std::string& transportId) {
    // 1. 重新连接所有消费者
    for (auto& consumer : _consumers) {
        consumer.second->reconnect(transportId);
    }
}
4.3.8. 生产者暂停/恢复流程(Producer Pause/Resume)
Producer WebRtcTransport Client pause() // 客户端请求暂停 _paused = true // 标记为暂停状态 stopSending() // 停止发送RTP包 resume() // 客户端请求恢复 _paused = false // 取消暂停 startSending() // 恢复发送 Producer WebRtcTransport Client

源码文件src/worker/Producer.cpp

// 暂停生产者(关键!避免浪费带宽)
void Producer::pause() {
    _paused = true;
    _channel->emit("producer.paused", _id);
}

// 恢复生产者(关键!恢复发送)
void Producer::resume() {
    if (!_paused) return;
    _paused = false;
    _channel->emit("producer.resumed", _id);
    
    // 1. 重新发送缓冲数据(如有)
    if (!_buffer.empty()) {
        for (auto& packet : _buffer) {
            SendRtpPacket(packet);
        }
        _buffer.clear();
    }
}
4.3.9. 消费者创建流程(Consumer Creation)
Client Worker Router Consumer createConsumer(producerId) // 请求创建消费者 getProducer(producerId) // 获取对应生产者 new Consumer(producer) // 创建消费者实例 registerConsumer() // 注册到房间 返回consumerId // 通知客户端 Client Worker Router Consumer

源码文件src/worker/Router.cpp

// 创建消费者(核心逻辑)
Consumer* Router::createConsumer(const std::string& producerId) {
    Producer* producer = getProducer(producerId);
    if (!producer) throw std::runtime_error("Producer not found");
    
    // 1. 创建Consumer实例
    Consumer* consumer = new Consumer(
        _channel, 
        _id, 
        producerId,
        producer->getCodec() // 复用生产者的编解码参数
    );
    
    // 2. 注册到房间
    _consumers[consumer->getId()] = consumer;
    
    // 3. 通知JS层(返回consumerId)
    _channel->emit("consumer.created", consumer->getId());
    
    return consumer;
}
4.3.10. RTP包丢失补偿流程(RTP Packet Loss Compensation)
WebRtcTransport IceHandler DtlsHandler 检测到高丢包率 // 网络质量下降 切换到备份ICE候选 // 使用备用路径 重新协商DTLS密钥 // 保证安全性 启用FEC(前向纠错) // 补偿丢包 WebRtcTransport IceHandler DtlsHandler

源码文件src/worker/Stats.cpp

// 检测高丢包率(关键!触发补偿机制)
void Stats::checkPacketLoss() {
    // 1. 计算丢包率(基于已接收/发送包数)
    double lossRate = (double)_packetsLost / _rtpPacketsReceived;
    
    if (lossRate > 0.1) { // 丢包率>10% 触发补偿
        // 2. 启用FEC(前向纠错)
        _fecEnabled = true;
        _channel->emit("fec.enabled", true);
        
        // 3. 重新选择ICE候选(切换到低延迟路径)
        _iceHandler->switchToBackupCandidate();
    }
}

// 切换到备用ICE候选(关键!)
void IceHandler::switchToBackupCandidate() {
    // 1. 从候选列表中移除当前路径
    _iceAgent->removeLocalCandidate(_currentCandidate);
    
    // 2. 选择下一个候选(优先级最高)
    _currentCandidate = _localCandidates[0];
    _iceAgent->addLocalCandidate(_currentCandidate);
}
4.3.11. 统计信息更新流程(Stats Update)
WebRtcTransport Stats Channel Client rtpPacketsSent++ // 发送包计数 emit("stats.updated", data) // 推送统计事件 接收统计信息 // 客户端获取性能数据 WebRtcTransport Stats Channel Client

源码文件src/worker/Stats.h

// 统计信息结构(关键!多线程安全设计)
struct Stats {
    std::atomic<size_t> rtpPacketsSent = 0;
    std::atomic<size_t> rtpPacketsReceived = 0;
    std::atomic<size_t> bitrate = 0;
    std::atomic<size_t> packetsLost = 0;
    std::atomic<double> jitter = 0.0;
    
    // 重置统计(用于房间销毁)
    void reset() {
        rtpPacketsSent.store(0);
        rtpPacketsReceived.store(0);
        bitrate.store(0);
        packetsLost.store(0);
        jitter.store(0.0);
    }
};

// 实时推送统计(关键!通过Channel发送)
void WebRtcTransport::updateStats() {
    // 1. 采集当前统计
    Stats stats = _stats;
    
    // 2. 通过Channel发送(避免阻塞主线程)
    _channel->emit("stats.updated", stats);
}
4.3.12. 错误处理流程(Error Handling)
WebRtcTransport Channel Client 检测到致命错误 // 如ICE超时 emit("transport.error", error) // 通知JS层 接收错误事件 // 客户端触发重连或提示 WebRtcTransport Channel Client

源码文件src/worker/Transport.h

// 错误处理(关键!统一错误码)
enum class TransportError {
    ICE_FAILED = 1,
    DTLS_FAILED,
    TRANSPORT_CLOSED
};

// 通知错误(核心函数)
void Transport::notifyError(TransportError error) {
    _channel->emit("transport.error", static_cast<int>(error));
}

// 使用示例(WebRtcTransport)
void WebRtcTransport::onIceConnectionError() {
    notifyError(TransportError::ICE_FAILED);
}
4.3.13. 多路复用RTP流流程(RTP Multiplexing)
Producer WebRtcTransport DtlsHandler sendRtp(videoPacket) // 视频流 sendRtp(audioPacket) // 音频流 加密并交织发送 // 同一通道传输 Producer WebRtcTransport DtlsHandler

源码文件src/worker/WebRtcTransport.cpp

// 发送多路RTP流(关键!同一通道传输)
void WebRtcTransport::sendRtp(const RtpPacket* packet) {
    // 1. 检查是否是视频流(H264)
    if (packet->isVideo()) {
        // 2. 使用同一DTLS通道加密
        _dtlsHandler->encryptVideo(packet);
    } else if (packet->isAudio()) {
        _dtlsHandler->encryptAudio(packet); // 音频单独加密
    }
    
    _iceHandler->sendData(encryptedData);
}
4.3.14. 房间销毁流程(Room Destruction)
Worker Router Channel Client closeRoom(roomId) // 请求关闭房间 destroy() // 销毁房间实例 emit("room.closed", roomId) // 通知所有客户端 释放所有Producers/Consumers // 清理资源 Worker Router Channel Client

源码文件src/worker/Router.cpp

// 销毁房间(核心函数)
void Router::destroy() {
    // 1. 关闭所有生产者
    for (auto& producer : _producers) {
        producer.second->Close();
    }
    
    // 2. 关闭所有消费者
    for (auto& consumer : _consumers) {
        consumer.second->Close();
    }
    
    // 3. 重置统计(释放资源)
    _stats.reset();
    
    // 4. 通知JS层(关键!触发前端清理)
    _channel->emit("router.destroyed", _id);
}
4.3.15. 动态调整传输参数流程(Transport Params Adjustment)
Client WebRtcTransport IceHandler setParams(bitrate=2Mbps) // 修改带宽限制 调整ICE候选优先级 // 优化路径 启用动态码率控制 // 实时调节 Client WebRtcTransport IceHandler

源码文件src/worker/WebRtcTransport.cpp

// 动态调整带宽(关键!实时调节)
void WebRtcTransport::setBitrate(size_t bitrate) {
    // 1. 更新内部参数
    _bitrate = bitrate;
    
    // 2. 通知ICE层(调整候选优先级)
    _iceHandler->adjustCandidatePriority(bitrate);
    
    // 3. 通知JS层(前端可显示带宽)
    _channel->emit("transport.bitrate", bitrate);
}

// 调整ICE候选优先级(关键!)
void IceHandler::adjustCandidatePriority(size_t bitrate) {
    // 1. 低带宽时优先选择低延迟候选
    if (bitrate < 1000) {
        sort(_localCandidates.begin(), _localCandidates.end(),
            [](const IceCandidate& a, const IceCandidate& b) {
                return a.latency < b.latency; // 低延迟优先
            });
    } else {
        sort(_localCandidates.begin(), _localCandidates.end(),
            [](const IceCandidate& a, const IceCandidate& b) {
                return a.bandwidth > b.bandwidth; // 高带宽优先
            });
    }
}
4.3.16. 服务端主动踢出客户端流程(Server Kickout)
Worker WebRtcTransport Channel Client forceClose() // 强制关闭传输 emit("transport.closed") // 通知客户端 断开连接并重定向 // 客户端处理 Worker WebRtcTransport Channel Client

源码文件src/worker/Worker.cpp

// 强制关闭传输(关键!用于权限管理)
void Worker::kickClient(const std::string& transportId) {
    WebRtcTransport* transport = getTransport(transportId);
    if (transport) {
        transport->forceClose(); // 触发关闭
    }
}

// forceClose实现(关键!立即切断)
void WebRtcTransport::forceClose() {
    // 1. 关闭DTLS(强制终止)
    _dtlsHandler->close();
    
    // 2. 关闭ICE(断开网络)
    _iceHandler->close();
    
    // 3. 通知JS层(触发前端重连)
    _channel->emit("transport.closed", getId());
}
4.3.17. RTP解码器初始化流程(Decoder Initialization)
Consumer Decoder Channel new Decoder(codec=H264) // 创建解码器 loadCodecParams() // 加载编解码参数 emit("decoder.ready") // 通知解码就绪 Consumer Decoder Channel

源码文件src/worker/Decoder.cpp

// 初始化解码器(关键!编解码参数匹配)
Decoder::Decoder(const std::string& codec) {
    // 1. 根据codec创建FFmpeg上下文
    if (codec == "H264") {
        _avCodecContext = avcodec_alloc_context3(avcodec_find_decoder(AV_CODEC_ID_H264));
    } else if (codec == "VP8") {
        _avCodecContext = avcodec_alloc_context3(avcodec_find_decoder(AV_CODEC_ID_VP8));
    }
    
    // 2. 配置参数(关键!与生产者一致)
    _avCodecContext->pix_fmt = AV_PIX_FMT_YUV420P;
    avcodec_open2(_avCodecContext, nullptr, nullptr);
    
    // 3. 通知就绪(触发前端渲染)
    _channel->emit("decoder.ready", true);
}
4.3.18. 传输层迁移流程(Transport Migration)
Client Worker WebRtcTransportOld WebRtcTransportNew requestTransportMigration() // 请求迁移 stop() // 原传输停止 create() // 创建新传输 返回新transportID // 客户端切换 Client Worker WebRtcTransportOld WebRtcTransportNew

源码文件src/worker/WebRtcTransport.cpp

// 迁移传输(关键!无缝切换)
void WebRtcTransport::migrateTo(WebRtcTransport* newTransport) {
    // 1. 停止当前传输
    stop();
    
    // 2. 复制关键状态(避免中断)
    newTransport->setDtlsContext(_dtlsHandler->getContext());
    newTransport->setIceState(_iceHandler->getState());
    
    // 3. 通知JS层(返回新ID)
    _channel->emit("transport.migrated", newTransport->getId());
}

// 停止当前传输(关键!)
void WebRtcTransport::stop() {
    _dtlsHandler->close(); // 关闭DTLS
    _iceHandler->close(); // 关闭ICE
    _isConnected = false;
}

4.4、核心文件解析

🔴 4.4.1. Worker.h(系统总控)
// Worker.h
Worker::Worker(Channel* channel, PayloadChannel* payloadChannel) 
    : _channel(channel), _payloadChannel(payloadChannel) {
    // 注册消息监听器,接收JS指令
    _channel->setListener(this);
    _payloadChannel->setListener(this);
}

// 创建房间(核心函数)
Router* Worker::createRouter(const RouterOptions& options) {
    std::string routerId = generateId();  // 生成唯一房间ID
    Router* router = new Router(_channel, routerId, options);
    _routers[routerId] = router;  // 保存到房间管理器
    _channel->emit("router.created", routerId);  // 通知JS层
    return router;
}

// 创建传输通道(核心函数)
WebRtcTransport* Worker::createWebRtcTransport(
    const std::string& routerId,
    const WebRtcTransportOptions& options
) {
    Router* router = getRouter(routerId);  // 获取指定房间
    WebRtcTransport* transport = new WebRtcTransport(_channel, routerId, options);
    router->AddTransport(transport);  // 注册到房间
    return transport;
}
🔴 4.4.2. WebRtcTransport.h(视频通道核心)
// WebRtcTransport.h
void WebRtcTransport::connect(const WebRtcTransportConnectOptions& options) {
    // 1. 生成本地ICE候选(网络连接)
    std::vector<IceCandidate> localCandidates = _iceHandler->createIceCandidates();
    sendIceCandidates(localCandidates);  // 发送ICE候选给远端
    
    // 2. 启动DTLS握手(加密通道)
    _dtlsHandler->startHandshake();
    
    _isConnected = true;  // 标记连接成功
    _channel->emit("transport.connected", id());  // 通知JS层
}

// 发送RTP数据包(核心函数)
void WebRtcTransport::sendRtp(const RtpPacket* packet) {
    if (!_isConnected) return;  // 检查连接状态
    
    // 1. DTLS加密(安全传输)
    std::vector<uint8_t> encrypted = _dtlsHandler->encrypt(packet->data(), packet->size());
    
    // 2. 通过ICE通道发送(网络传输)
    _iceHandler->sendData(encrypted);
    
    // 3. 更新统计信息(性能监控)
    _stats.rtpPacketsSent++;
    _stats.bitrate += packet->size();
}
🔴 4.4.3. IceHandler.h(网络连接处理)
// IceHandler.h
// 生成本地ICE候选(核心函数)
vector<IceCandidate> IceHandler::createIceCandidates() {
    vector<IceCandidate> candidates;
    // 1. 生成STUN服务器候选
    candidates.push_back(generateStunCandidate());
    
    // 2. 生成TURN服务器候选
    candidates.push_back(generateTurnCandidate());
    
    return candidates;
}

// 添加远端ICE候选(核心函数)
void IceHandler::addRemoteCandidate(const IceCandidate& candidate) {
    _remoteCandidates.push_back(candidate);  // 保存远端候选
    // 3. 尝试建立连接(实际逻辑在connect方法中)
}
🔴 4.4.4. DtlsHandler.h(加密通道处理)
// DtlsHandler.h
// 启动DTLS握手(核心函数)
void DtlsHandler::startHandshake() {
    _context.startHandshake();  // 启动DTLS上下文
    _handshakeComplete = false;
}

// 处理DTLS消息(核心函数)
void DtlsHandler::onDtlsMessage(const vector<uint8_t>& message) {
    if (!_handshakeComplete) {
        _context.processMessage(message);  // 处理握手消息
        if (_context.isHandshakeComplete()) {
            _handshakeComplete = true;
            // 握手完成后的处理
        }
    } else {
        // 已完成握手,处理应用数据
        _context.processData(message);
    }
}
🔴 4.4.5. Producer.h(媒体生产者)
// Producer.h
// 发送RTP数据包(核心函数)
void Producer::SendRtpPacket(const RtpPacket* packet) {
    if (_paused || _isClosed) return;  // 检查暂停/关闭状态
    
    if (!_transport) {
        throw std::runtime_error("Transport not set");  // 传输通道未设置
    }
    
    _transport->sendRtp(packet);  // 通过传输通道发送
    
    // 更新统计信息
    _stats.rtpPacketsSent++;
    _stats.bitrate += packet->size();
}

// 关闭生产者(核心函数)
void Producer::Close() {
    if (_isClosed) return;
    _isClosed = true;
    
    _router->RemoveProducer(_id);  // 从房间移除
    _channel->emit("producer.closed", _id);  // 通知JS层
    
    delete _transport;  // 释放传输通道
    _transport = nullptr;
}
🔴 4.4.6. Consumer.h(媒体消费者)
// Consumer.h
// 处理RTP包(核心函数)
void Consumer::OnRtpPacket(RtpPacket* packet) {
    if (_paused || _isClosed) return;  // 检查暂停/关闭状态
    
    _decoder->Decode(packet);  // 交给解码器处理
    
    // 更新统计信息
    _stats.rtpPacketsReceived++;
    _stats.bitrate += packet->size();
}

// 关闭消费者(核心函数)
void Consumer::Close() {
    if (_isClosed) return;
    _isClosed = true;
    
    _router->RemoveConsumer(_id);  // 从房间移除
    _decoder->Stop();  // 停止解码器
    delete _decoder;  // 释放解码器
    _decoder = nullptr;
}

🔴 4.4.7. Router.h(房间管理器)

核心作用:管理房间内媒体生产者/消费者,维护房间状态

// Router.h
class Router {
public:
    Router(Channel* channel, const std::string& id, const RouterOptions& options)
        : _channel(channel), _id(id), _options(options) {}

    // 添加媒体生产者(核心函数)
    void AddProducer(Producer* producer) {
        _producers[producer->getId()] = producer;
        _channel->emit("producer.added", producer->getId()); // 通知JS层
    }

    // 添加媒体消费者(核心函数)
    void AddConsumer(Consumer* consumer) {
        _consumers[consumer->getId()] = consumer;
        _channel->emit("consumer.added", consumer->getId()); // 通知JS层
    }

    // 移除生产者(核心函数)
    void RemoveProducer(const std::string& producerId) {
        auto it = _producers.find(producerId);
        if (it != _producers.end()) {
            delete it->second; // 释放内存
            _producers.erase(it);
        }
    }

    // 获取房间ID
    const std::string& getId() const { return _id; }

private:
    Channel* _channel;          // 通信通道
    std::string _id;            // 房间唯一ID
    RouterOptions _options;     // 房间配置
    std::map<std::string, Producer*> _producers; // 生产者列表
    std::map<std::string, Consumer*> _consumers; // 消费者列表
};

🔴 4.4.8. RtpReceiver.h(RTP包接收)

核心作用:处理接收到的RTP数据包,供Consumer使用

// RtpReceiver.h
class RtpReceiver {
public:
    RtpReceiver() = default;
    ~RtpReceiver() = default;

    // 接收RTP包(核心函数)
    void ReceiveRtpPacket(const RtpPacket* packet) {
        // 1. 检查是否已关闭
        if (_isClosed) return;
        
        // 2. 分发给Consumer(实际在WebRtcTransport中调用)
        _onRtpPacket(packet); 
    }

    // 注册RTP包处理回调(供WebRtcTransport使用)
    void setOnRtpPacketCallback(std::function<void(const RtpPacket*)> callback) {
        _onRtpPacket = callback;
    }

private:
    std::function<void(const RtpPacket*)> _onRtpPacket; // 回调函数
    bool _isClosed = false; // 是否已关闭
};

🔴 4.4.9. Stats.h(统计信息)

核心作用:实时收集媒体传输性能数据

// Stats.h
struct Stats {
    size_t rtpPacketsSent = 0;    // 已发送RTP包数
    size_t rtpPacketsReceived = 0; // 已接收RTP包数
    size_t bitrate = 0;            // 当前比特率(字节/秒)
    size_t packetsLost = 0;        // 丢失包数
    double jitter = 0.0;           // 抖动(毫秒)

    // 重置统计信息(用于新房间)
    void reset() {
        rtpPacketsSent = rtpPacketsReceived = bitrate = 0;
        packetsLost = jitter = 0;
    }
};

🔴 4.4.10. Channel.h(主通信通道)

核心作用:Worker与JavaScript层的通信枢纽

// Channel.h
class Channel {
public:
    using Listener = std::function<void(const napi::Value&)>;

    // 设置消息监听器(核心函数)
    void setListener(Listener listener) {
        _listener = listener;
    }

    // 发送事件给JS层(核心函数)
    void emit(const std::string& event, const std::string& data) {
        napi::Value message = napi::CreateObject();
        napi::SetValue(message, "event", event);
        napi::SetValue(message, "data", data);
        _listener(message); // 触发JS回调
    }

    // 启动通道(核心函数)
    void start() {
        // 初始化底层通信(如WebSocket)
        _isRunning = true;
    }

private:
    Listener _listener;
    bool _isRunning = false;
};

🔴 4.4.11. PayloadChannel.h(大块数据通道)

核心作用:专门处理RTP大包传输(避免阻塞主通道)

// PayloadChannel.h
class PayloadChannel {
public:
    // 设置大包接收回调(核心函数)
    void setListener(std::function<void(const std::vector<uint8_t>&)> listener) {
        _payloadListener = listener;
    }

    // 发送大块数据(核心函数)
    void sendPayload(const std::vector<uint8_t>& payload) {
        if (!_payloadListener) return;
        _payloadListener(payload); // 直接触发回调
    }

private:
    std::function<void(const std::vector<uint8_t>&)> _payloadListener;
};

🔴 4.4.12. Transport.h(传输基类)

核心作用:定义传输层通用接口,WebRtcTransport继承自它

// Transport.h
class Transport {
public:
    virtual ~Transport() = default;

    // 建立传输连接(纯虚函数,子类必须实现)
    virtual void connect(const TransportConnectOptions& options) = 0;

    // 发送RTP包(纯虚函数)
    virtual void sendRtp(const RtpPacket* packet) = 0;

    // 获取传输ID
    const std::string& getId() const { return _id; }

protected:
    Transport(const std::string& id) : _id(id) {}

private:
    std::string _id; // 传输唯一ID
};

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐