WEBRTC 带你精通《平滑发送模块》中
💡 体现的核心设计原则现在我们来到了整个视频发送流程的"终极大脑":PacingController::EnqueuePacket。这是决定何时、如何发送每个RTP包的智能调度中心!
现在我们来看 SendVideo 流程的最后一环:LogAndSendToNetwork 方法。这是整个视频发送链路的"最后一关"!
LogAndSendToNetwork 方法概览

第一部分:线程安全的统计计算
void RTPSenderVideo::LogAndSendToNetwork(
std::vector<std::unique_ptr<RtpPacketToSend>> packets,
size_t encoder_output_size) {
{
MutexLock lock(&stats_mutex_);
size_t packetized_payload_size = 0;
for (const auto& packet : packets) {
if (*packet->packet_type() == RtpPacketMediaType::kVideo) {
packetized_payload_size += packet->payload_size();
}
}
这就像快递站的"出库统计员",在包裹发出前进行最后的统计记录:
为什么只统计视频包?
第二部分:编码后开销计算
// AV1 and H264 packetizers may produce less packetized bytes than
// unpacketized.
if (packetized_payload_size >= encoder_output_size) {
post_encode_overhead_bitrate_.Update(
packetized_payload_size - encoder_output_size, clock_->CurrentTime());
}
这就像比较"原材料重量"和"成品重量"的差异:
为什么要检查 packetized_payload_size >= encoder_output_size?
第三部分:网络发送
} // 统计锁作用域结束
rtp_sender_->EnqueuePackets(std::move(packets));
}
这就像把所有准备好的包裹交给"发货部门":
设计分析:为什么分离统计和发送?

post_encode_overhead_bitrate_ 统计的重要性
完整的视频发送流程总结
现在让我们把从 OnEncodedImage 到 LogAndSendToNetwork 的完整流程串联起来:
void RTPSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
RTC_DCHECK(!packets.empty());
Timestamp now = clock_->CurrentTime();
for (auto& packet : packets) {
RTC_DCHECK(packet);
RTC_CHECK(packet->packet_type().has_value())
<< "Packet type must be set before sending.";
if (packet->capture_time() <= Timestamp::Zero()) {
packet->set_capture_time(now);
}
}
paced_sender_->EnqueuePackets(std::move(packets));
}
为什么 packet_type 必须设置?
将需要发送的packets添加到平滑发送模块,接下来就是真正的平滑发送的核心逻辑了。
第一部分:异步任务队列机制
void TaskQueuePacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
task_queue_->PostTask(
SafeTask(safety_.flag(), [this, packets = std::move(packets)]() mutable {
这就像现代快递公司的"智能调度中心":
关键设计解析
- SafeTask:对象生命周期保护

- task_queue_->PostTask:专业分工机制

- Lambda捕获的零拷贝设计
[this, packets = std::move(packets)]() mutable

task_queue_->PostTask(
SafeTask(safety_.flag(), [this, packets = std::move(packets)]() mutable {
RTC_DCHECK_RUN_ON(task_queue_);
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets");
for (auto& packet : packets) {
TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets::Loop",
"sequence_number", packet->SequenceNumber(),
"rtp_timestamp", packet->Timestamp());
size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) {
packet_size += packet->headers_size();
}
packet_size_.Apply(1, packet_size);
RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
pacing_controller_.EnqueuePacket(std::move(packet));
}
MaybeProcessPackets(Timestamp::MinusInfinity());
})
为什么要灵活计算包大小?
第二部分:线程检查与性能追踪
RTC_DCHECK_RUN_ON(task_queue_);
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets");
这就像工人上岗前的"安全检查"和"工作记录":
第三部分:逐包处理循环
for (auto& packet : packets) {
TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets::Loop",
"sequence_number", packet->SequenceNumber(),
"rtp_timestamp", packet->Timestamp());
这就像传送带上的"质检员"逐个检查包裹:
第四部分:统计更新
packet_size_.Apply(1, packet_size);
这就像快递公司的"实时统计系统":
第五部分:调度控制器入队
这就像把处理好的包裹交给"专业调度系统":
第六部分:立即处理尝试
MaybeProcessPackets(Timestamp::MinusInfinity());
这就像调度员说"看看能不能马上发货":
EnqueuePackets方法总结

💡 体现的核心设计原则
现在我们来到了整个视频发送流程的"终极大脑":PacingController::EnqueuePacket。这是决定何时、如何发送每个RTP包的智能调度中心!
PacingController::EnqueuePacket 整体架构

第一部分:前置验证与安全检查
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
RTC_DCHECK(pacing_rate_ > DataRate::Zero())
<< "SetPacingRate must be called before InsertPacket.";
RTC_CHECK(packet->packet_type());
这就像高速公路收费站的"入口检查":
第二部分:关键帧冲刷机制(超重要!)
if (keyframe_flushing_ &&
packet->packet_type() == RtpPacketMediaType::kVideo &&
packet->is_key_frame() && packet->is_first_packet_of_frame() &&
!packet_queue_.HasKeyframePackets(packet->Ssrc())) {
// First packet of a keyframe (and no keyframe packets currently in the
// queue). Flush any pending packets currently in the queue for that stream
// in order to get the new keyframe out as quickly as possible.
packet_queue_.RemovePacketsForSsrc(packet->Ssrc());
std::optional<uint32_t> rtx_ssrc =
packet_sender_->GetRtxSsrcForMedia(packet->Ssrc());
if (rtx_ssrc) {
packet_queue_.RemovePacketsForSsrc(*rtx_ssrc);
}
}
这就像医院急诊室的"紧急通道"机制:
为什么关键帧需要特殊处理?
第三部分:带宽探测通知
prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
这就像告诉"网络测速员"有新的数据包要发送:
第四部分:时间预算管理(复杂但重要)
const Timestamp now = CurrentTime();
if (packet_queue_.Empty()) {
// If queue is empty, we need to "fast-forward" the last process time,
// so that we don't use passed time as budget for sending the first new
// packet.
Timestamp target_process_time = now;
Timestamp next_send_time = NextSendTime();
if (next_send_time.IsFinite()) {
// There was already a valid planned send time, such as a keep-alive.
// Use that as last process time only if it's prior to now.
target_process_time = std::min(now, next_send_time);
}
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));
}
这就像银行的"信用额度管理系统":
时间预算管理的重要性
第五部分:包大小计算与入队
DataSize packet_size = DataSize::Bytes(packet->payload_size());
if (include_overhead_) {
packet_size += DataSize::Bytes(packet->headers_size());
}
packet_queue_.Push(now, std::move(packet));
seen_first_packet_ = true;
这就像快递公司的"计费称重和入库":
第六部分:流量统计监控(自定义扩展)
if (0 == last_instart_10ms_time_.ms()) {
in_video_cnt_ = 0;
in_data_sent_ = 0;
in_cur_10ms_cnt_ = 0;
in_max_10ms_cnt_ = 0;
last_instart_10ms_time_ = now;
}
in_video_cnt_++;
in_cur_10ms_cnt_++;
in_data_sent_ += packet_size.bytes();
if (now - last_instart_10ms_time_ > TimeDelta::Millis(10)) {
if (in_cur_10ms_cnt_ > in_max_10ms_cnt_) {
in_max_10ms_cnt_ = in_cur_10ms_cnt_;
}
last_instart_10ms_time_ = now;
in_cur_10ms_cnt_ = 0;
}
这就像工厂的"实时生产监控看板":
第七部分:队列长度检查与自适应调整
// Queue length has increased, check if we need to change the pacing rate.
MaybeUpdateMediaRateDueToLongQueue(now);
这就像高速公路的"智能流量控制系统":
现在我们来到了整个视频发送系统的"心脏":TaskQueuePacedSender::MaybeProcessPackets。这是真正决定何时发送包的核心调度方法!
// EnqueuePacket方法之后直接开始处理数据包发送
MaybeProcessPackets(Timestamp::MinusInfinity());
MaybeProcessPackets 整体架构

第一部分:状态检查与重入保护
void TaskQueuePacedSender::MaybeProcessPackets(
Timestamp scheduled_process_time) {
RTC_DCHECK_RUN_ON(task_queue_);
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::MaybeProcessPackets");
if (is_shutdown_ || !is_started_) {
return;
}
// Protects against re-entry from transport feedback calling into the task
// queue pacer.
RTC_DCHECK(!processing_packets_);
processing_packets_ = true;
absl::Cleanup cleanup = [this] {
RTC_DCHECK_RUN_ON(task_queue_);
processing_packets_ = false;
};
这就像工厂车间的"安全检查和门禁系统":
第二部分:时间计算与提前执行机制
Timestamp next_send_time = pacing_controller_.NextSendTime();
RTC_DCHECK(next_send_time.IsFinite());
const Timestamp now = clock_->CurrentTime();
TimeDelta early_execute_margin =
pacing_controller_.IsProbing()
? PacingController::kMaxEarlyProbeProcessing
: TimeDelta::Zero();
这就像精密的"列车调度系统":
第三部分:包处理循环(核心发送逻辑)
// Process packets and update stats.
while (next_send_time <= now + early_execute_margin) {
pacing_controller_.ProcessPackets();
next_send_time = pacing_controller_.NextSendTime();
RTC_DCHECK(next_send_time.IsFinite());
// Probing state could change. Get margin after process packets.
early_execute_margin = pacing_controller_.IsProbing()
? PacingController::kMaxEarlyProbeProcessing
: TimeDelta::Zero();
}
这就像"智能交通灯的发车控制":
第四部分:统计更新与任务调度管理
UpdateStats();
// Ignore retired scheduled task, otherwise reset `next_process_time_`.
if (scheduled_process_time.IsFinite()) {
if (scheduled_process_time != next_process_time_) {
return;
}
next_process_time_ = Timestamp::MinusInfinity();
}
这就像"工厂的班次交接和任务管理":
第五部分:延迟窗口计算(高级优化机制)
// Do not hold back in probing.
TimeDelta hold_back_window = TimeDelta::Zero();
if (!pacing_controller_.IsProbing()) {
hold_back_window = max_hold_back_window_;
DataRate pacing_rate = pacing_controller_.pacing_rate();
if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
!pacing_rate.IsZero() &&
packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
TimeDelta avg_packet_send_time =
DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
hold_back_window =
std::min(hold_back_window,
avg_packet_send_time * max_hold_back_window_in_packets_);
}
}
这就像"智能快递配送的批量优化策略":
第六部分:下次处理时间计算
// Calculate next process time.
TimeDelta time_to_next_process =
std::max(hold_back_window, next_send_time - now - early_execute_margin);
next_send_time = now + time_to_next_process;
这就像"精密的时钟调度算法":
第七部分:定时任务调度(自我驱动机制)
// If no in flight task or in flight task is later than `next_send_time`,
// schedule a new one. Previous in flight task will be retired.
if (next_process_time_.IsMinusInfinity() ||
next_process_time_ > next_send_time) {
// Prefer low precision if allowed and not probing.
task_queue_->PostDelayedHighPrecisionTask(
SafeTask(
safety_.flag(),
[this, next_send_time]() { MaybeProcessPackets(next_send_time); }),
time_to_next_process.RoundUpTo(TimeDelta::Millis(1)));
next_process_time_ = next_send_time;
}
这就像"智能闹钟的自动设置系统":
💡 体现的核心设计原则
完整流程总结
🎉 完整视频发送技术栈总结
下一章我们详解平滑发送的详细控制逻辑,期待~~~
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐

所有评论(0)