WEBRTC 带你精通《平滑发送模块》下
📊图1:WebRTC平滑发送机制概览- 展示网络拥塞问题与WebRTC Pacing解决方案的对比在实时音视频通信的世界里,网络就像一条拥挤的高速公路。如果所有车辆(数据包)都同时涌入,必然会造成严重的交通拥堵。WebRTC通过精巧的Pacing机制来解决这一问题,就像在高速公路入口设置红绿灯一样,控制车辆有序进入。特性功能描述⚡平滑发送避免突发拥塞🎯优先级保证音频包优先发送📊自适应调整应对
前言:为什么需要平滑发送?

📊 图1:WebRTC平滑发送机制概览 - 展示网络拥塞问题与WebRTC Pacing解决方案的对比
在实时音视频通信的世界里,网络就像一条拥挤的高速公路。如果所有车辆(数据包)都同时涌入,必然会造成严重的交通拥堵。WebRTC通过精巧的Pacing机制来解决这一问题,就像在高速公路入口设置红绿灯一样,控制车辆有序进入。
现实中的痛点场景
想象这样几个场景:
- 视频会议中的卡顿:当你在视频会议中移动时,编码器可能瞬间产生大量关键帧数据
- 直播推流的不稳定:主播在游戏中快速移动,码率突然飙升导致推流中断
- 音视频不同步:网络波动时音频包被视频包"挤占",造成延迟不一致
这些问题的根源都指向一个核心:如何平滑、有序地发送数据包。
核心架构:七大模块的精妙协作
WebRTC的Pacing机制由七个核心模块组成,每个模块各司其职,又相互配合:
🏗️ WebRTC Pacing 核心架构 - 七大模块协同工作
═══════════════════════════════════════════════════════════════════
┌─────────────── 🎯 应用层 ───────────────────┐
│ │
│ 📦 RtpSender ┌──→ 🎪 TaskQueuePacedSender │
│ (数据包生产者) │ (异步任务调度器) │
│ ↓ │ │
└────────┼───────────────┼────────────────────────────┘
│ │
┌────────┼───── 🧠 调速控制层 ──────┼──────────────────┐
│ ↓ ↓ │ │
│ 🎛️ PacingController (核心控制器) │ │
│ ↓ ↙ ↘ │ │
│ 📦 PrioritizedPacketQueue 📡 BitrateProber │
│ (优先级队列) (码率探测器) │
│ ↓ │
│ ├─ 🔄 StreamQueue (流级队列) │
│ └─ 🎪 Round-Robin (轮询调度) │
└────────┼───────────────────────────┼────────────────┘
│ │
┌────────┼─── 🌐 网络发送层 ──────────┼────────────────┐
│ ↓ │ │
│ 📤 PacketSender ──→ 🚀 RtpSenderEgress │
│ (发送接口) (出口处理) │
│ ↓ │
│ 🌐 Network Transport │
│ (网络传输) │
└───────────────────────────┼────────────────────────┘
⚙️ 支撑组件: 📊 关键状态:
├─ ⏰ Clock (时钟服务) ├─ 💰 media_debt_ (媒体债务)
├─ 🔧 FieldTrials (配置) ├─ 🎯 pacing_rate_ (调速速率)
└─ 📈 Statistics (监控) └─ ⚡ adjusted_rate_ (调整速率)
1. TaskQueuePacedSender:异步任务的指挥官
class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
public:
void EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;
private:
PacingController pacing_controller_;
TimeDelta max_hold_back_window_; // 最大延迟窗口
bool is_started_;
};
设计思考:为什么需要TaskQueue?
- 避免阻塞:网络发送不能阻塞主线程
- 精确调度:需要精确的时间控制
- 异常隔离:网络异常不影响其他模块
2. PacingController:漏桶算法的大脑
PacingController是整个系统的核心,实现了改进的漏桶算法:
class PacingController {
private:
// 核心状态变量
DataSize media_debt_; // 媒体包债务
DataSize padding_debt_; // 填充包债务
DataRate pacing_rate_; // 目标发送速率
DataRate adjusted_media_rate_; // 调整后的媒体速率
// 核心算法
void UpdateBudgetWithElapsedTime(TimeDelta delta);
void UpdateBudgetWithSentData(DataSize size);
void ProcessPackets();
};
3. PrioritizedPacketQueue:智能的优先级调度器
class PrioritizedPacketQueue {
private:
static constexpr int kNumPriorityLevels = 5;
std::deque<QueuedPacket> packets_[kNumPriorityLevels];
std::unordered_map<uint32_t, std::unique_ptr<StreamQueue>> streams_;
};
优先级策略:
- Audio (最高优先级) - 音频包延迟敏感
- Retransmission - 重传包需要快速恢复
- Video - 视频包正常优先级
- FEC - 前向纠错包
- Padding (最低优先级) - 填充包
漏桶算法的工程实现详解
核心思想:预算管理
WebRTC的Pacing不是简单的令牌桶,而是基于"债务预算"的改进算法:

💰 图3:债务预算模型 vs 传统令牌桶算法对比 - 展示WebRTC独特的债务预算机制优势
💳 债务预算算法的四大核心优势:
- ⚡ 允许透支:应对突发流量,视频关键帧优先发送
- 🎯 精确控制:债务量精确对应发送速率,控制更细粒度
- 📊 动态调整:根据网络状况实时调整速率
- 🔄 平滑发送:避免过度突发,保证网络传输稳定性
📈 实际运行示例 (以2Mbps速率为例):
t=0ms: debt=0KB → 发送1KB包 → debt=1KBt=10ms: debt=1KB-2.5KB=-1.5KB → 可发送3.5KB突发t=20ms: debt=-1.5KB+2KB=0.5KB → 继续正常发送- 上限保护:
max_debt = 2Mbps × 30ms = 7.5KB
void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
// 时间流逝,减少债务(增加可用预算)
media_debt_ -= std::min(media_debt_, adjusted_media_rate_ * delta);
padding_debt_ -= std::min(padding_debt_, padding_rate_ * delta);
}
void PacingController::UpdateBudgetWithSentData(DataSize size) {
// 发送数据包,增加债务(减少可用预算)
media_debt_ += size;
media_debt_ = std::min(media_debt_, adjusted_media_rate_ * kMaxDebtInTime);
}
发送决策逻辑
std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
const PacedPacketInfo& pacing_info,
Timestamp target_send_time,
Timestamp now) {
// 1. 检查是否为探测包(探测包有最高优先级)
const bool is_probe = pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe;
// 2. 检查队列是否为空
if (packet_queue_.Empty()) {
return nullptr;
}
// 3. 检查是否允许发送(核心预算检查)
if (NextUnpacedSendTime().IsInfinite() && !is_probe) {
if (congested_) {
return nullptr; // 拥塞时暂停发送
}
// 预算检查:计算"清零"当前债务需要的时间
TimeDelta flush_time = media_debt_ / adjusted_media_rate_;
if (now + flush_time > target_send_time) {
return nullptr; // 预算不足,等待
}
}
return packet_queue_.Pop();
}
设计亮点:
- 提前发送检查:允许在预算"即将足够"时提前发送
- 探测包优先:网络探测包绕过正常的预算检查
- 拥塞感知:拥塞时自动暂停发送
ProcessPackets:主处理循环

🚀 图4:ProcessPackets主处理循环详解 - 展示WebRTC发送主循环的完整逻辑流程
🔒 双重保护机制:
- 断路器检查:
iteration < circuit_breaker_threshold(防死循环) - 队列积压检查:
media_debt vs max_wait_time(防过度积压)
void PacingController::ProcessPackets() {
const Timestamp now = CurrentTime();
// 1. 计算处理间隔,用于统计
TimeDelta pacer_interval = now - last_pacer_time_;
// 2. 保活机制:长时间无数据时发送keep-alive
if (ShouldSendKeepalive(now)) {
std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
packet_sender_->GeneratePadding(DataSize::Bytes(1));
// 发送1字节填充包维持连接
}
// 3. 更新预算(漏桶算法核心)
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time);
if (elapsed_time > TimeDelta::Zero()) {
UpdateBudgetWithElapsedTime(elapsed_time);
}
// 4. 探测模式处理
bool is_probing = prober_.is_probing();
if (is_probing) {
pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
recommended_probe_size = prober_.RecommendedMinProbeSize();
}
// 5. 主发送循环(带断路器保护)
DataSize data_sent = DataSize::Zero();
for (int iteration = 0; iteration < circuit_breaker_threshold_; ++iteration) {
std::unique_ptr<RtpPacketToSend> rtp_packet =
GetPendingPacket(pacing_info, target_send_time, now);
if (rtp_packet == nullptr) {
// 尝试发送填充包
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
// 生成并入队填充包
continue;
}
break; // 无包可发,退出循环
}
// 发送数据包
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
data_sent += packet_size;
// 更新预算和统计
UpdatePacingStats(packet_type, packet_size, pacer_interval);
OnPacketSent(packet_type, packet_size, now);
// 探测模式下的特殊逻辑
if (is_probing && data_sent >= recommended_probe_size) {
break;
}
// 动态退出条件:防止队列积压过多
if (media_debt_.bytes() > adjusted_media_rate_.bps() / 8 * 25 / 1000 &&
max_wait_time < 50) {
break;
}
}
}
优先级队列:智能调度的秘密
数据结构设计
class PrioritizedPacketQueue {
private:
// 每个优先级一个队列
std::deque<QueuedPacket> packets_[kNumPriorityLevels];
// 每个SSRC一个流队列,支持轮询调度
std::unordered_map<uint32_t, std::unique_ptr<StreamQueue>> streams_;
// 按优先级组织的活跃流队列
std::deque<StreamQueue*> streams_by_prio_[kNumPriorityLevels];
// 当前最高活跃优先级
int top_active_prio_level_;
};
调度算法:轮询+优先级

🎪 图5:智能优先级调度算法 - 展示5级优先级+Round-Robin轮询的完整调度机制
🔄 Round-Robin 轮询机制详解:
同优先级内的公平调度:
- 第1轮:流1发送 → 流2发送 → 流3发送 → 流4发送
- 第2轮:流2发送 → 流3发送 → 流4发送 → 流1发送
- 第3轮:流3发送 → 流4发送 → 流1发送 → 流2发送
✅ 核心保证:
- 每个流都有公平的发送机会
- 避免单个流独占带宽资源
std::unique_ptr<RtpPacketToSend> PrioritizedPacketQueue::Pop() {
// 1. 从最高优先级开始查找
for (int prio = top_active_prio_level_; prio < kNumPriorityLevels; ++prio) {
if (!streams_by_prio_[prio].empty()) {
// 2. 轮询该优先级下的所有流
StreamQueue* stream = streams_by_prio_[prio].front();
streams_by_prio_[prio].pop_front();
// 3. 从流中取出一个包
QueuedPacket packet = stream->DequeuePacket(prio);
// 4. 如果流还有包,重新加入轮询队列
if (stream->HasPacketsAtPrio(prio)) {
streams_by_prio_[prio].push_back(stream);
}
return std::move(packet.packet);
}
}
return nullptr;
}
调度公平性:
- 优先级保证:高优先级总是先于低优先级
- 流间公平:同优先级不同流轮询调度
- 防饥饿:低优先级流不会被完全饿死
TTL机制:包的生命周期管理
struct PacketQueueTTL {
TimeDelta audio_retransmission = TimeDelta::PlusInfinity();
TimeDelta video_retransmission = TimeDelta::PlusInfinity();
TimeDelta video = TimeDelta::PlusInfinity();
};
void PrioritizedPacketQueue::PurgeOldPacketsAtPriorityLevel(int prio_level, Timestamp now) {
// 清理过期的数据包,防止队列积压
TimeDelta ttl = time_to_live_per_prio_[prio_level];
// ... 清理逻辑
}
网络探测:BitrateProber的智能感知
探测集群的概念
struct ProbeCluster {
PacedPacketInfo pace_info;
int sent_probes = 0;
int sent_bytes = 0;
TimeDelta min_probe_delta = TimeDelta::Zero();
Timestamp requested_at = Timestamp::MinusInfinity();
Timestamp started_at = Timestamp::MinusInfinity();
};
探测时机控制
void BitrateProber::OnIncomingPacket(DataSize packet_size) {
// 只有足够大的包才能触发探测
if (ReadyToSetActiveState(packet_size)) {
MaybeSetActiveState(packet_size);
}
}
bool BitrateProber::ReadyToSetActiveState(DataSize packet_size) const {
// 检查包大小是否满足探测要求
return packet_size >= config_.min_packet_size ||
config_.allow_start_probing_immediately;
}
探测包发送控制

📡 图6:BitrateProber网络探测流程时序图 - 展示完整的带宽探测过程和时机控制
🔧 关键参数说明:
probe_cluster_id: 探测集群标识 (0, 1, 2…)target_data_rate: 目标探测速率 (2Mbps, 4Mbps…)min_probe_delta: 最小包间隔 (1ms, 2ms…)target_probe_count: 目标包数量 (通常5个包)
📊 探测结果应用:探测结果用于带宽估计和拥塞控制算法
Timestamp BitrateProber::NextProbeTime(Timestamp now) const {
if (clusters_.empty() || probing_state_ != ProbingState::kActive) {
return Timestamp::PlusInfinity();
}
const ProbeCluster& cluster = clusters_.front();
return CalculateNextProbeTime(cluster);
}
Timestamp BitrateProber::CalculateNextProbeTime(const ProbeCluster& cluster) const {
// 根据目标码率计算下一个探测包的发送时间
TimeDelta probe_interval = DataSize::Bytes(cluster.pace_info.probe_cluster_min_bytes) /
cluster.pace_info.probe_cluster_target_bitrate;
return cluster.started_at + probe_interval * cluster.sent_probes;
}
自适应速率调整:应对网络波动
队列长度感知
void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) {
// 基础速率:原始pacing_rate的2.6倍
adjusted_media_rate_ = pacing_rate_ * 13 / 5;
if (!drain_large_queues_) {
return;
}
DataSize queue_size_data = QueueSizeData();
if (queue_size_data > DataSize::Zero()) {
packet_queue_.UpdateAverageQueueTime(now);
TimeDelta avg_time_left = std::max(TimeDelta::Millis(1),
queue_time_limit_ - packet_queue_.AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
// 根据剩余时间动态调整权重
if (avg_time_left.ms() < 30) {
// 紧急情况:1:1权重
adjusted_media_rate_ = min_rate_needed / 2 + adjusted_media_rate_ / 2;
} else if (avg_time_left.ms() < 55) {
// 较紧急:6:4权重
adjusted_media_rate_ = min_rate_needed * 6 / 10 + adjusted_media_rate_ * 4 / 10;
} else if (avg_time_left.ms() < 75) {
// 一般:7:3权重
adjusted_media_rate_ = min_rate_needed * 7 / 10 + adjusted_media_rate_ * 3 / 10;
} else if (avg_time_left.ms() < 110) {
// 轻微:8:2权重
adjusted_media_rate_ = min_rate_needed * 8 / 10 + adjusted_media_rate_ * 2 / 10;
}
// 设置上限防止过度加速
adjusted_media_rate_ = std::min(adjusted_media_rate_,
DataRate::KilobitsPerSec(9 * 1050));
}
}
自适应策略:
- 渐进调整:根据队列压力程度分级调整
- 权重平衡:在所需速率和当前速率间加权平均
- 上限保护:防止速率无限增长
统计监控:可观测性的重要性
性能指标收集

📊 图8:PacingController实时监控仪表盘 - 展示完整的性能监控和告警体系
常见问题与解决方案
Q1: 为什么不用简单的令牌桶算法?
A: 令牌桶有几个局限性:
- 突发处理差:无法很好处理视频关键帧等突发流量
- 状态简单:只有令牌数量一个状态,难以精确控制
- 适应性差:无法根据网络状况动态调整
WebRTC的"债务预算"模型更灵活:
- 允许透支:可以在未来时间内"借债"发送
- 精确控制:债务量精确对应发送速率
- 动态调整:可以根据队列长度实时调整速率
Q2: 如何处理关键帧的优先发送?
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
if (keyframe_flushing_ &&
packet->packet_type() == RtpPacketMediaType::kVideo &&
packet->is_key_frame() && packet->is_first_packet_of_frame() &&
!packet_queue_.HasKeyframePackets(packet->Ssrc())) {
// 关键帧优先:清空该流的现有包
packet_queue_.RemovePacketsForSsrc(packet->Ssrc());
std::optional<uint32_t> rtx_ssrc =
packet_sender_->GetRtxSsrcForMedia(packet->Ssrc());
if (rtx_ssrc) {
packet_queue_.RemovePacketsForSsrc(*rtx_ssrc);
}
}
packet_queue_.Push(now, std::move(packet));
}
Q3: 如何防止音频包被视频包"饿死"?
A: 通过两种机制保护音频:
- 优先级机制:音频包始终在最高优先级队列
- Unpaced发送:音频包可以绕过Pacing直接发送
Timestamp PacingController::NextUnpacedSendTime() const {
if (!pace_audio_) {
Timestamp leading_audio_send_time =
packet_queue_.LeadingPacketEnqueueTime(RtpPacketMediaType::kAudio);
if (leading_audio_send_time.IsFinite()) {
return leading_audio_send_time; // 音频包立即发送
}
}
return Timestamp::MinusInfinity();
}
Q4: 如何避免Pacing过程中的死循环?
A: 通过断路器(Circuit Breaker)机制:
for (; iteration < circuit_breaker_threshold_; ++iteration) {
// 发送逻辑
}
if (iteration >= circuit_breaker_threshold_) {
RTC_LOG(LS_ERROR) << "PacingController exceeded max iterations";
// 记录详细的调试信息
// 强制退出避免死循环
return;
}
性能优化技巧
1. Burst模式优化
// 允许突发发送,减少系统调用次数
TimeDelta send_burst_interval_ = TimeDelta::Millis(11); // 默认11ms突发间隔
// 在GetPendingPacket中的检查
if (now <= target_send_time && send_burst_interval_.IsZero()) {
// 不允许突发时的严格检查
TimeDelta flush_time = media_debt_ / adjusted_media_rate_;
if (now + flush_time > target_send_time) {
return nullptr;
}
}
2. 任务队列优化
void TaskQueuePacedSender::MaybeProcessPackets(Timestamp scheduled_process_time) {
// 防止重入的保护机制
if (processing_packets_) return;
processing_packets_ = true;
// RAII自动清理
absl::Cleanup cleanup = [this] {
processing_packets_ = false;
};
// 处理逻辑...
}
3. 内存池优化
WebRTC在数据包管理中大量使用智能指针和移动语义,避免不必要的拷贝:
void TaskQueuePacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
// 移动语义避免拷贝
for (auto& packet : packets) {
pacing_controller_.EnqueuePacket(std::move(packet));
}
}
总结
🎪 WebRTC Pacing五大核心特性:
| 特性 | 功能描述 |
|---|---|
| ⚡ 平滑发送 | 避免突发拥塞 |
| 🎯 优先级保证 | 音频包优先发送 |
| 📊 自适应调整 | 应对网络变化 |
| 🔍 主动探测 | 感知带宽变化 |
| 📈 可观测性 | 完善监控体系 |
💡 设计理念与工程价值:
- 🔧 工程化思维:考虑边界情况和异常处理,生产级可靠性
- ⚡ 性能优化:Burst、任务队列等机制,追求极致效率
- 🎛️ 可配置性:FieldTrials支持,灵活的策略调整
- 🔍 可维护性:清晰模块划分,优雅的接口设计
- 📊 可观测性:完善监控统计,助力问题诊断
WebRTC的Pacing机制是一个精心设计的系统工程,它不仅仅是简单的流量控制,更是一个集成了:
- 智能调度:基于优先级和公平性的包调度
- 自适应控制:根据网络状况动态调整发送策略
- 网络感知:主动探测网络带宽变化
- 可观测性:完善的监控和统计机制
这套机制的成功之处在于:
- 工程化思维:考虑了各种边界情况和异常处理
- 性能优化:通过Burst、任务队列等机制提升效率
- 可配置性:通过FieldTrials支持灵活的策略调整
- 可维护性:清晰的模块划分和接口设计
对于实时音视频开发者而言,理解这套机制不仅有助于更好地使用WebRTC,也为设计自己的流量控制系统提供了宝贵的参考。
这篇文章基于WebRTC M114版本的源码分析,如果你对某个细节有疑问,欢迎在评论区讨论。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)