mediasoup源码走读(四)woker
摘要: Mediasoup的核心架构分为业务层、传输层、媒体处理层和基础设施层。Worker作为总控中心,管理Router(房间)和WebRtcTransport(传输通道)。Router协调Producer(媒体生产者)和Consumer(媒体消费者),通过WebRtcTransport实现数据传输。传输层依赖IceHandler处理网络连接、DtlsHandler实现加密通信,RtpRecei
4.1、总体架构图
🌟 架构说明:
Worker:系统总控中心(
Worker.h)Router:房间管理器(
Router.h)WebRtcTransport:视频通道核心(
WebRtcTransport.h)Producer:媒体生产者(
Producer.h)Consumer:媒体消费者(
Consumer.h)IceHandler:网络连接处理(
IceHandler.h)DtlsHandler:加密通道处理(
DtlsHandler.h)RtpReceiver:RTP包处理(
RtpReceiver.h)🌟 分层说明:
- 业务层:Router、Producer、Consumer
- 传输层:Transport、WebRtcTransport、DtlsTransport
- 媒体处理层:RtpStreamRecv、RtpStreamSend、SeqManager、NackGenerator
- 基础设施层:PortManager、IceHandler、TransportTuple
4.2、关键类图
4.2.1. Worker类图(核心总控)
4.4.2. WebRtcTransport类图(视频通道核心)
4.2.3. IceHandler类图(网络连接)
4.4.4. DtlsHandler类图(加密通道)
4.4.5. Producer类图(媒体生产者)
4.4.6. Consumer类图(媒体消费者)
4.2.7. Router类图(房间管理器)
4.2.8. RtpReceive类图(RTP包接收)
4.2.9. Stats类图(统计信息)
4.2.10. Channel类图 (主通信通道)
4.2.11. PayloadChannel类图(大块数据通道)
4.2.12. Transport类图(传输基类)
✅ 关键说明(与架构图的完整联系)
| 文件 | 在架构图中的角色 | 核心作用 | 说明(基于mediasoup源码) |
|---|---|---|---|
| Router.h | 房间管理器(Router) | 管理房间内所有Producer/Consumer | 作为媒体流的逻辑容器,管理Producer和Consumer的注册、连接和数据转发 |
| Transport.h | 传输基类(Transport) | 定义WebRtcTransport的通用接口 | 为WebRtcTransport、PlainRtpTransport、PipeTransport提供统一接口 |
| WebRtcTransport.h | WebRTC传输通道 | 实现基于ICE/DTLS的加密媒体传输 | 处理浏览器与服务端之间的WebRTC连接,包括ICE候选交换和DTLS握手 |
| Producer.h | 生产者(Producer) | 接收共享者发送的媒体流 | 作为服务端的生产者,接收客户端发送的RTP包,包含多个RtpStreamRecv |
| Consumer.h | 消费者(Consumer) | 转发媒体流给终端 | 作为服务端的消费者,将RTP包发送给客户端,包含RtpStreamSend |
| RtpStreamRecv.h | RTP流接收(RtpStreamRecv) | 处理RTP数据流的接收 | 用于Producer接收RTP包,支持丢包重传(NackGenerator) |
| RtpStreamSend.h | RTP流发送(RtpStreamSend) | 处理RTP数据流的发送 | 用于Consumer发送RTP包给客户端,包含序列管理(SeqManager) |
| SeqManager.h | 序列管理器(SeqManager) | 管理RTP包的序列号 | 服务端推送给客户端的数据流会重新排序,记录SSRC对应的Sequence |
| NackGenerator.h | NACK生成器(NackGenerator) | 生成丢包重传请求 | 接收端检测到丢包后,生成NACK请求发送给发送端 |
| DtlsTransport.h | DTLS传输(DtlsTransport) | 处理DTLS加密和握手 | 实现DTLS协议,确保媒体传输的安全性 |
| IceHandler.h | ICE处理(IceHandler) | 处理ICE候选交换和网络路径选择 | 生成本地ICE候选,添加远端ICE候选,处理ICE连接状态 |
| PortManager.h | 端口管理器(PortManager) | 管理UDP/TCP端口分配 | 管理40000-49999端口范围,避免端口冲突 |
| Stats.h | 统计信息(Stats) | 实时监控传输性能(丢包率/抖动等) | 采集并更新RTP包统计信息,用于性能分析和优化 |
| Channel.h | 通信枢纽(Channel) | Worker与JS层通信的核心通道 | 通过IPC与Node.js层通信,处理信令和事件 |
| PayloadChannel.h | 大包传输通道(PayloadChannel) | 专用于RTP大包的低延迟传输 | 优化RTP大包传输,避免阻塞主通信通道 |
| TransportTuple.h | 传输元组(TransportTuple) | 存储本地/远端socket、IP、端口 | 保存传输通道的连接信息,用于网络通信 |
4.3、关键时序
4.3.1. Worker启动流程
源码文件:src/worker/Worker.cpp
// Worker构造函数:初始化核心组件
Worker::Worker(Channel* channel, PayloadChannel* payloadChannel)
: _channel(channel), _payloadChannel(payloadChannel) {
// 注册消息监听器(关键!避免JS层指令丢失)
_channel->setListener([this](const napi::Value& message) {
onChannelMessage(message); // 通过Lambda捕获this指针
});
_payloadChannel->setListener([this](const std::vector<uint8_t>& payload) {
onPayloadChannelMessage(payload); // 处理大包数据
});
}
// 处理JS指令的核心入口(关键逻辑)
void Worker::onChannelMessage(const napi::Value& message) {
// 1. 解析消息类型(如"router.create")
auto event = napi::GetString(message, "event");
// 2. 路由到对应处理函数
if (event == "router.create") {
createRouter(napi::GetObject(message, "data")); // 调用创建房间
} else if (event == "transport.create") {
createWebRtcTransport(napi::GetObject(message, "data"));
}
}
// 创建房间(核心函数)
Router* Worker::createRouter(const napi::Value& options) {
// 生成唯一房间ID(避免冲突)
std::string routerId = generateId();
Router* router = new Router(_channel, routerId, options);
// 3. 注册到管理器(关键!后续通过routerId获取)
_routers[routerId] = router;
// 4. 通知JS层(触发前端创建房间事件)
_channel->emit("router.created", routerId);
return router;
}
4.3.2. 创建视频通道流程
源码文件:src/worker/WebRtcTransport.cpp
// 创建传输通道(核心函数)
WebRtcTransport* Worker::createWebRtcTransport(
const std::string& routerId,
const napi::Value& options
) {
Router* router = getRouter(routerId); // 通过routerId获取房间(关键!)
if (!router) throw std::runtime_error("Router not found");
WebRtcTransport* transport = new WebRtcTransport(
_channel, routerId, options
);
// 注册到房间(让Router管理传输通道)
router->AddTransport(transport);
return transport;
}
// WebRtcTransport构造函数
WebRtcTransport::WebRtcTransport(
Channel* channel,
const std::string& routerId,
const napi::Value& options
) : Transport("transport-" + generateId()),
_channel(channel),
_routerId(routerId),
_iceHandler(new IceHandler()),
_dtlsHandler(new DtlsHandler()) {
// 初始化DTLS(服务端模式)
_dtlsHandler->setServerMode(true);
}
// 建立连接(关键流程)
void WebRtcTransport::connect(const napi::Value& options) {
// 1. 生成本地ICE候选(使用libnice)
std::vector<IceCandidate> localCandidates = _iceHandler->createIceCandidates();
// 2. 发送ICE候选给远端(通过JS层)
_channel->emit("transport.iceCandidates", localCandidates);
// 3. 启动DTLS握手(加密通道建立)
_dtlsHandler->startHandshake();
_isConnected = true;
_channel->emit("transport.connected", getId()); // 通知JS
}
4.3.3. 视频发送流程(Producer)
源码文件:src/worker/Producer.cpp
// 发送RTP包(核心函数)
void Producer::SendRtpPacket(const RtpPacket* packet) {
if (_paused || _isClosed) return; // 检查状态(避免无效发送)
// 1. 确保传输通道已设置(关键!防止空指针)
if (!_transport) throw std::runtime_error("Transport not set");
// 2. 通过传输通道发送(调用WebRtcTransport::sendRtp)
_transport->sendRtp(packet);
// 3. 更新统计信息(原子操作,多线程安全)
_stats.rtpPacketsSent.fetch_add(1);
_stats.bitrate.fetch_add(packet->size());
}
// WebRtcTransport::sendRtp(关键实现)
void WebRtcTransport::sendRtp(const RtpPacket* packet) {
if (!_isConnected) return; // 检查连接状态
// 1. DTLS加密(使用OpenSSL)
std::vector<uint8_t> encrypted = _dtlsHandler->encrypt(
packet->data(),
packet->size()
);
// 2. 通过ICE通道发送(使用libnice的send)
_iceHandler->sendData(encrypted);
// 3. 更新统计(关键!用于性能监控)
_stats.rtpPacketsSent.fetch_add(1);
_stats.bitrate.fetch_add(packet->size());
}
4.3.4. 视频接收流程(Consumer)
源码文件:src/worker/Consumer.cpp
// 处理RTP包(核心函数)
void Consumer::OnRtpPacket(RtpPacket* packet) {
if (_paused || _isClosed) return; // 状态检查
// 1. 交给解码器(关键!Consumer不直接处理RTP)
_decoder->Decode(packet);
// 2. 更新统计
_stats.rtpPacketsReceived.fetch_add(1);
_stats.bitrate.fetch_add(packet->size());
}
// Decoder::Decode(解码实现)
void Decoder::Decode(RtpPacket* packet) {
// 1. 检查是否是关键帧(H264需要关键帧才能解码)
if (packet->isKeyFrame()) {
_decoder->reset(); // 重置解码器状态
}
// 2. 调用FFmpeg解码(关键!)
int ret = avcodec_send_packet(_avCodecContext, packet->avPacket());
if (ret < 0) throw std::runtime_error("Decode error");
// 3. 获取解码后的帧
AVFrame* frame = av_frame_alloc();
ret = avcodec_receive_frame(_avCodecContext, frame);
if (ret == 0) {
_render(frame); // 渲染视频帧(关键!)
}
}
4.3.5. ICE候选交换流程(ICE Candidate Exchange)
源码文件:src/worker/IceHandler.cpp
// 生成本地ICE候选(关键!使用libnice)
std::vector<IceCandidate> IceHandler::createIceCandidates() {
std::vector<IceCandidate> candidates;
// 1. 生成STUN候选(优先级最高)
IceCandidate stunCandidate = generateStunCandidate();
candidates.push_back(stunCandidate);
// 2. 生成TURN候选(备用路径)
IceCandidate turnCandidate = generateTurnCandidate();
candidates.push_back(turnCandidate);
// 3. 设置候选优先级(STUN > TURN)
sort(candidates.begin(), candidates.end(),
[](const IceCandidate& a, const IceCandidate& b) {
return a.priority > b.priority; // 高优先级先尝试
});
return candidates;
}
// 添加远端ICE候选(关键!用于连接建立)
void IceHandler::addRemoteCandidate(const IceCandidate& candidate) {
_remoteCandidates.push_back(candidate);
// 4. 尝试建立连接(触发libnice的连接尝试)
if (_iceAgent) {
_iceAgent->addRemoteCandidate(candidate);
}
}
4.3.6. DTLS握手流程(DTLS Handshake)
源码文件:src/worker/DtlsHandler.cpp
// 启动DTLS握手(关键!)
void DtlsHandler::startHandshake() {
// 1. 初始化DTLS上下文(使用OpenSSL)
_context = DtlsContext::Create(_isServer);
_handshakeComplete = false;
// 2. 启动握手(非阻塞)
_context->startHandshake(); // 调用OpenSSL的DTLSv1_2_server_method()
}
// 处理DTLS消息(关键!)
void DtlsHandler::onDtlsMessage(const std::vector<uint8_t>& message) {
if (!_handshakeComplete) {
// 1. 处理握手消息(验证证书等)
_context->processHandshake(message);
// 2. 检查握手是否完成
if (_context->isHandshakeComplete()) {
_handshakeComplete = true;
// 3. 握手完成后的处理(如启用加密通道)
_channel->emit("transport.encrypted", _transportId);
}
} else {
// 4. 已完成握手,处理应用数据
_context->processData(message);
}
}
4.3.7. 网络故障重连流程(Network Recovery)
源码文件:src/worker/WebRtcTransport.cpp
// 网络故障检测(关键!通过ICE状态判断)
void WebRtcTransport::onIceConnectionStateChange(IceConnectionState state) {
if (state == IceConnectionState::FAILED) {
// 1. 重置DTLS状态(确保安全重连)
_dtlsHandler->reset();
// 2. 重新生成ICE候选(获取新路径)
std::vector<IceCandidate> newCandidates = _iceHandler->createIceCandidates();
_channel->emit("transport.iceCandidates", newCandidates);
// 3. 通知房间(触发重连逻辑)
_router->notifyTransportReconnected(getId());
}
}
// 房间内重连通知(Router.h)
void Router::notifyTransportReconnected(const std::string& transportId) {
// 1. 重新连接所有消费者
for (auto& consumer : _consumers) {
consumer.second->reconnect(transportId);
}
}
4.3.8. 生产者暂停/恢复流程(Producer Pause/Resume)
源码文件:src/worker/Producer.cpp
// 暂停生产者(关键!避免浪费带宽)
void Producer::pause() {
_paused = true;
_channel->emit("producer.paused", _id);
}
// 恢复生产者(关键!恢复发送)
void Producer::resume() {
if (!_paused) return;
_paused = false;
_channel->emit("producer.resumed", _id);
// 1. 重新发送缓冲数据(如有)
if (!_buffer.empty()) {
for (auto& packet : _buffer) {
SendRtpPacket(packet);
}
_buffer.clear();
}
}
4.3.9. 消费者创建流程(Consumer Creation)
源码文件:src/worker/Router.cpp
// 创建消费者(核心逻辑)
Consumer* Router::createConsumer(const std::string& producerId) {
Producer* producer = getProducer(producerId);
if (!producer) throw std::runtime_error("Producer not found");
// 1. 创建Consumer实例
Consumer* consumer = new Consumer(
_channel,
_id,
producerId,
producer->getCodec() // 复用生产者的编解码参数
);
// 2. 注册到房间
_consumers[consumer->getId()] = consumer;
// 3. 通知JS层(返回consumerId)
_channel->emit("consumer.created", consumer->getId());
return consumer;
}
4.3.10. RTP包丢失补偿流程(RTP Packet Loss Compensation)
源码文件:src/worker/Stats.cpp
// 检测高丢包率(关键!触发补偿机制)
void Stats::checkPacketLoss() {
// 1. 计算丢包率(基于已接收/发送包数)
double lossRate = (double)_packetsLost / _rtpPacketsReceived;
if (lossRate > 0.1) { // 丢包率>10% 触发补偿
// 2. 启用FEC(前向纠错)
_fecEnabled = true;
_channel->emit("fec.enabled", true);
// 3. 重新选择ICE候选(切换到低延迟路径)
_iceHandler->switchToBackupCandidate();
}
}
// 切换到备用ICE候选(关键!)
void IceHandler::switchToBackupCandidate() {
// 1. 从候选列表中移除当前路径
_iceAgent->removeLocalCandidate(_currentCandidate);
// 2. 选择下一个候选(优先级最高)
_currentCandidate = _localCandidates[0];
_iceAgent->addLocalCandidate(_currentCandidate);
}
4.3.11. 统计信息更新流程(Stats Update)
源码文件:src/worker/Stats.h
// 统计信息结构(关键!多线程安全设计)
struct Stats {
std::atomic<size_t> rtpPacketsSent = 0;
std::atomic<size_t> rtpPacketsReceived = 0;
std::atomic<size_t> bitrate = 0;
std::atomic<size_t> packetsLost = 0;
std::atomic<double> jitter = 0.0;
// 重置统计(用于房间销毁)
void reset() {
rtpPacketsSent.store(0);
rtpPacketsReceived.store(0);
bitrate.store(0);
packetsLost.store(0);
jitter.store(0.0);
}
};
// 实时推送统计(关键!通过Channel发送)
void WebRtcTransport::updateStats() {
// 1. 采集当前统计
Stats stats = _stats;
// 2. 通过Channel发送(避免阻塞主线程)
_channel->emit("stats.updated", stats);
}
4.3.12. 错误处理流程(Error Handling)
源码文件:src/worker/Transport.h
// 错误处理(关键!统一错误码)
enum class TransportError {
ICE_FAILED = 1,
DTLS_FAILED,
TRANSPORT_CLOSED
};
// 通知错误(核心函数)
void Transport::notifyError(TransportError error) {
_channel->emit("transport.error", static_cast<int>(error));
}
// 使用示例(WebRtcTransport)
void WebRtcTransport::onIceConnectionError() {
notifyError(TransportError::ICE_FAILED);
}
4.3.13. 多路复用RTP流流程(RTP Multiplexing)
源码文件:src/worker/WebRtcTransport.cpp
// 发送多路RTP流(关键!同一通道传输)
void WebRtcTransport::sendRtp(const RtpPacket* packet) {
// 1. 检查是否是视频流(H264)
if (packet->isVideo()) {
// 2. 使用同一DTLS通道加密
_dtlsHandler->encryptVideo(packet);
} else if (packet->isAudio()) {
_dtlsHandler->encryptAudio(packet); // 音频单独加密
}
_iceHandler->sendData(encryptedData);
}
4.3.14. 房间销毁流程(Room Destruction)
源码文件:src/worker/Router.cpp
// 销毁房间(核心函数)
void Router::destroy() {
// 1. 关闭所有生产者
for (auto& producer : _producers) {
producer.second->Close();
}
// 2. 关闭所有消费者
for (auto& consumer : _consumers) {
consumer.second->Close();
}
// 3. 重置统计(释放资源)
_stats.reset();
// 4. 通知JS层(关键!触发前端清理)
_channel->emit("router.destroyed", _id);
}
4.3.15. 动态调整传输参数流程(Transport Params Adjustment)
源码文件:src/worker/WebRtcTransport.cpp
// 动态调整带宽(关键!实时调节)
void WebRtcTransport::setBitrate(size_t bitrate) {
// 1. 更新内部参数
_bitrate = bitrate;
// 2. 通知ICE层(调整候选优先级)
_iceHandler->adjustCandidatePriority(bitrate);
// 3. 通知JS层(前端可显示带宽)
_channel->emit("transport.bitrate", bitrate);
}
// 调整ICE候选优先级(关键!)
void IceHandler::adjustCandidatePriority(size_t bitrate) {
// 1. 低带宽时优先选择低延迟候选
if (bitrate < 1000) {
sort(_localCandidates.begin(), _localCandidates.end(),
[](const IceCandidate& a, const IceCandidate& b) {
return a.latency < b.latency; // 低延迟优先
});
} else {
sort(_localCandidates.begin(), _localCandidates.end(),
[](const IceCandidate& a, const IceCandidate& b) {
return a.bandwidth > b.bandwidth; // 高带宽优先
});
}
}
4.3.16. 服务端主动踢出客户端流程(Server Kickout)
源码文件:src/worker/Worker.cpp
// 强制关闭传输(关键!用于权限管理)
void Worker::kickClient(const std::string& transportId) {
WebRtcTransport* transport = getTransport(transportId);
if (transport) {
transport->forceClose(); // 触发关闭
}
}
// forceClose实现(关键!立即切断)
void WebRtcTransport::forceClose() {
// 1. 关闭DTLS(强制终止)
_dtlsHandler->close();
// 2. 关闭ICE(断开网络)
_iceHandler->close();
// 3. 通知JS层(触发前端重连)
_channel->emit("transport.closed", getId());
}
4.3.17. RTP解码器初始化流程(Decoder Initialization)
源码文件:src/worker/Decoder.cpp
// 初始化解码器(关键!编解码参数匹配)
Decoder::Decoder(const std::string& codec) {
// 1. 根据codec创建FFmpeg上下文
if (codec == "H264") {
_avCodecContext = avcodec_alloc_context3(avcodec_find_decoder(AV_CODEC_ID_H264));
} else if (codec == "VP8") {
_avCodecContext = avcodec_alloc_context3(avcodec_find_decoder(AV_CODEC_ID_VP8));
}
// 2. 配置参数(关键!与生产者一致)
_avCodecContext->pix_fmt = AV_PIX_FMT_YUV420P;
avcodec_open2(_avCodecContext, nullptr, nullptr);
// 3. 通知就绪(触发前端渲染)
_channel->emit("decoder.ready", true);
}
4.3.18. 传输层迁移流程(Transport Migration)
源码文件:src/worker/WebRtcTransport.cpp
// 迁移传输(关键!无缝切换)
void WebRtcTransport::migrateTo(WebRtcTransport* newTransport) {
// 1. 停止当前传输
stop();
// 2. 复制关键状态(避免中断)
newTransport->setDtlsContext(_dtlsHandler->getContext());
newTransport->setIceState(_iceHandler->getState());
// 3. 通知JS层(返回新ID)
_channel->emit("transport.migrated", newTransport->getId());
}
// 停止当前传输(关键!)
void WebRtcTransport::stop() {
_dtlsHandler->close(); // 关闭DTLS
_iceHandler->close(); // 关闭ICE
_isConnected = false;
}
4.4、核心文件解析
🔴 4.4.1. Worker.h(系统总控)
// Worker.h
Worker::Worker(Channel* channel, PayloadChannel* payloadChannel)
: _channel(channel), _payloadChannel(payloadChannel) {
// 注册消息监听器,接收JS指令
_channel->setListener(this);
_payloadChannel->setListener(this);
}
// 创建房间(核心函数)
Router* Worker::createRouter(const RouterOptions& options) {
std::string routerId = generateId(); // 生成唯一房间ID
Router* router = new Router(_channel, routerId, options);
_routers[routerId] = router; // 保存到房间管理器
_channel->emit("router.created", routerId); // 通知JS层
return router;
}
// 创建传输通道(核心函数)
WebRtcTransport* Worker::createWebRtcTransport(
const std::string& routerId,
const WebRtcTransportOptions& options
) {
Router* router = getRouter(routerId); // 获取指定房间
WebRtcTransport* transport = new WebRtcTransport(_channel, routerId, options);
router->AddTransport(transport); // 注册到房间
return transport;
}
🔴 4.4.2. WebRtcTransport.h(视频通道核心)
// WebRtcTransport.h
void WebRtcTransport::connect(const WebRtcTransportConnectOptions& options) {
// 1. 生成本地ICE候选(网络连接)
std::vector<IceCandidate> localCandidates = _iceHandler->createIceCandidates();
sendIceCandidates(localCandidates); // 发送ICE候选给远端
// 2. 启动DTLS握手(加密通道)
_dtlsHandler->startHandshake();
_isConnected = true; // 标记连接成功
_channel->emit("transport.connected", id()); // 通知JS层
}
// 发送RTP数据包(核心函数)
void WebRtcTransport::sendRtp(const RtpPacket* packet) {
if (!_isConnected) return; // 检查连接状态
// 1. DTLS加密(安全传输)
std::vector<uint8_t> encrypted = _dtlsHandler->encrypt(packet->data(), packet->size());
// 2. 通过ICE通道发送(网络传输)
_iceHandler->sendData(encrypted);
// 3. 更新统计信息(性能监控)
_stats.rtpPacketsSent++;
_stats.bitrate += packet->size();
}
🔴 4.4.3. IceHandler.h(网络连接处理)
// IceHandler.h
// 生成本地ICE候选(核心函数)
vector<IceCandidate> IceHandler::createIceCandidates() {
vector<IceCandidate> candidates;
// 1. 生成STUN服务器候选
candidates.push_back(generateStunCandidate());
// 2. 生成TURN服务器候选
candidates.push_back(generateTurnCandidate());
return candidates;
}
// 添加远端ICE候选(核心函数)
void IceHandler::addRemoteCandidate(const IceCandidate& candidate) {
_remoteCandidates.push_back(candidate); // 保存远端候选
// 3. 尝试建立连接(实际逻辑在connect方法中)
}
🔴 4.4.4. DtlsHandler.h(加密通道处理)
// DtlsHandler.h
// 启动DTLS握手(核心函数)
void DtlsHandler::startHandshake() {
_context.startHandshake(); // 启动DTLS上下文
_handshakeComplete = false;
}
// 处理DTLS消息(核心函数)
void DtlsHandler::onDtlsMessage(const vector<uint8_t>& message) {
if (!_handshakeComplete) {
_context.processMessage(message); // 处理握手消息
if (_context.isHandshakeComplete()) {
_handshakeComplete = true;
// 握手完成后的处理
}
} else {
// 已完成握手,处理应用数据
_context.processData(message);
}
}
🔴 4.4.5. Producer.h(媒体生产者)
// Producer.h
// 发送RTP数据包(核心函数)
void Producer::SendRtpPacket(const RtpPacket* packet) {
if (_paused || _isClosed) return; // 检查暂停/关闭状态
if (!_transport) {
throw std::runtime_error("Transport not set"); // 传输通道未设置
}
_transport->sendRtp(packet); // 通过传输通道发送
// 更新统计信息
_stats.rtpPacketsSent++;
_stats.bitrate += packet->size();
}
// 关闭生产者(核心函数)
void Producer::Close() {
if (_isClosed) return;
_isClosed = true;
_router->RemoveProducer(_id); // 从房间移除
_channel->emit("producer.closed", _id); // 通知JS层
delete _transport; // 释放传输通道
_transport = nullptr;
}
🔴 4.4.6. Consumer.h(媒体消费者)
// Consumer.h
// 处理RTP包(核心函数)
void Consumer::OnRtpPacket(RtpPacket* packet) {
if (_paused || _isClosed) return; // 检查暂停/关闭状态
_decoder->Decode(packet); // 交给解码器处理
// 更新统计信息
_stats.rtpPacketsReceived++;
_stats.bitrate += packet->size();
}
// 关闭消费者(核心函数)
void Consumer::Close() {
if (_isClosed) return;
_isClosed = true;
_router->RemoveConsumer(_id); // 从房间移除
_decoder->Stop(); // 停止解码器
delete _decoder; // 释放解码器
_decoder = nullptr;
}
🔴 4.4.7. Router.h(房间管理器)
核心作用:管理房间内媒体生产者/消费者,维护房间状态
// Router.h
class Router {
public:
Router(Channel* channel, const std::string& id, const RouterOptions& options)
: _channel(channel), _id(id), _options(options) {}
// 添加媒体生产者(核心函数)
void AddProducer(Producer* producer) {
_producers[producer->getId()] = producer;
_channel->emit("producer.added", producer->getId()); // 通知JS层
}
// 添加媒体消费者(核心函数)
void AddConsumer(Consumer* consumer) {
_consumers[consumer->getId()] = consumer;
_channel->emit("consumer.added", consumer->getId()); // 通知JS层
}
// 移除生产者(核心函数)
void RemoveProducer(const std::string& producerId) {
auto it = _producers.find(producerId);
if (it != _producers.end()) {
delete it->second; // 释放内存
_producers.erase(it);
}
}
// 获取房间ID
const std::string& getId() const { return _id; }
private:
Channel* _channel; // 通信通道
std::string _id; // 房间唯一ID
RouterOptions _options; // 房间配置
std::map<std::string, Producer*> _producers; // 生产者列表
std::map<std::string, Consumer*> _consumers; // 消费者列表
};
🔴 4.4.8. RtpReceiver.h(RTP包接收)
核心作用:处理接收到的RTP数据包,供Consumer使用
// RtpReceiver.h
class RtpReceiver {
public:
RtpReceiver() = default;
~RtpReceiver() = default;
// 接收RTP包(核心函数)
void ReceiveRtpPacket(const RtpPacket* packet) {
// 1. 检查是否已关闭
if (_isClosed) return;
// 2. 分发给Consumer(实际在WebRtcTransport中调用)
_onRtpPacket(packet);
}
// 注册RTP包处理回调(供WebRtcTransport使用)
void setOnRtpPacketCallback(std::function<void(const RtpPacket*)> callback) {
_onRtpPacket = callback;
}
private:
std::function<void(const RtpPacket*)> _onRtpPacket; // 回调函数
bool _isClosed = false; // 是否已关闭
};
🔴 4.4.9. Stats.h(统计信息)
核心作用:实时收集媒体传输性能数据
// Stats.h
struct Stats {
size_t rtpPacketsSent = 0; // 已发送RTP包数
size_t rtpPacketsReceived = 0; // 已接收RTP包数
size_t bitrate = 0; // 当前比特率(字节/秒)
size_t packetsLost = 0; // 丢失包数
double jitter = 0.0; // 抖动(毫秒)
// 重置统计信息(用于新房间)
void reset() {
rtpPacketsSent = rtpPacketsReceived = bitrate = 0;
packetsLost = jitter = 0;
}
};
🔴 4.4.10. Channel.h(主通信通道)
核心作用:Worker与JavaScript层的通信枢纽
// Channel.h
class Channel {
public:
using Listener = std::function<void(const napi::Value&)>;
// 设置消息监听器(核心函数)
void setListener(Listener listener) {
_listener = listener;
}
// 发送事件给JS层(核心函数)
void emit(const std::string& event, const std::string& data) {
napi::Value message = napi::CreateObject();
napi::SetValue(message, "event", event);
napi::SetValue(message, "data", data);
_listener(message); // 触发JS回调
}
// 启动通道(核心函数)
void start() {
// 初始化底层通信(如WebSocket)
_isRunning = true;
}
private:
Listener _listener;
bool _isRunning = false;
};
🔴 4.4.11. PayloadChannel.h(大块数据通道)
核心作用:专门处理RTP大包传输(避免阻塞主通道)
// PayloadChannel.h
class PayloadChannel {
public:
// 设置大包接收回调(核心函数)
void setListener(std::function<void(const std::vector<uint8_t>&)> listener) {
_payloadListener = listener;
}
// 发送大块数据(核心函数)
void sendPayload(const std::vector<uint8_t>& payload) {
if (!_payloadListener) return;
_payloadListener(payload); // 直接触发回调
}
private:
std::function<void(const std::vector<uint8_t>&)> _payloadListener;
};
🔴 4.4.12. Transport.h(传输基类)
核心作用:定义传输层通用接口,
WebRtcTransport继承自它
// Transport.h
class Transport {
public:
virtual ~Transport() = default;
// 建立传输连接(纯虚函数,子类必须实现)
virtual void connect(const TransportConnectOptions& options) = 0;
// 发送RTP包(纯虚函数)
virtual void sendRtp(const RtpPacket* packet) = 0;
// 获取传输ID
const std::string& getId() const { return _id; }
protected:
Transport(const std::string& id) : _id(id) {}
private:
std::string _id; // 传输唯一ID
};
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)