现在我们来看 SendVideo 流程的最后一环:LogAndSendToNetwork 方法。这是整个视频发送链路的"最后一关"!

LogAndSendToNetwork 方法概览

请添加图片描述

第一部分:线程安全的统计计算

void RTPSenderVideo::LogAndSendToNetwork(
    std::vector<std::unique_ptr<RtpPacketToSend>> packets,
    size_t encoder_output_size) {
  {
    MutexLock lock(&stats_mutex_);
    size_t packetized_payload_size = 0;
    for (const auto& packet : packets) {
      if (*packet->packet_type() == RtpPacketMediaType::kVideo) {
        packetized_payload_size += packet->payload_size();
      }
    }

这就像快递站的"出库统计员",在包裹发出前进行最后的统计记录:请添加图片描述
为什么只统计视频包?

请添加图片描述

第二部分:编码后开销计算

    // AV1 and H264 packetizers may produce less packetized bytes than
    // unpacketized.
    if (packetized_payload_size >= encoder_output_size) {
      post_encode_overhead_bitrate_.Update(
          packetized_payload_size - encoder_output_size, clock_->CurrentTime());
    }

这就像比较"原材料重量"和"成品重量"的差异:
请添加图片描述
为什么要检查 packetized_payload_size >= encoder_output_size?
请添加图片描述

第三部分:网络发送

  }  // 统计锁作用域结束

  rtp_sender_->EnqueuePackets(std::move(packets));
}

这就像把所有准备好的包裹交给"发货部门":请添加图片描述
设计分析:为什么分离统计和发送?
请添加图片描述
请添加图片描述
post_encode_overhead_bitrate_ 统计的重要性
请添加图片描述
完整的视频发送流程总结
现在让我们把从 OnEncodedImage 到 LogAndSendToNetwork 的完整流程串联起来:
请添加图片描述

void RTPSender::EnqueuePackets(
    std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
  RTC_DCHECK(!packets.empty());
  Timestamp now = clock_->CurrentTime();
  for (auto& packet : packets) {
    RTC_DCHECK(packet);
    RTC_CHECK(packet->packet_type().has_value())
        << "Packet type must be set before sending.";
    if (packet->capture_time() <= Timestamp::Zero()) {
      packet->set_capture_time(now);
    }
  }

  paced_sender_->EnqueuePackets(std::move(packets));
}

为什么 packet_type 必须设置?
请添加图片描述
将需要发送的packets添加到平滑发送模块,接下来就是真正的平滑发送的核心逻辑了。

第一部分:异步任务队列机制

void TaskQueuePacedSender::EnqueuePackets(
    std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
  task_queue_->PostTask(
      SafeTask(safety_.flag(), [this, packets = std::move(packets)]() mutable {

这就像现代快递公司的"智能调度中心":请添加图片描述

关键设计解析

  1. SafeTask:对象生命周期保护请添加图片描述
  2. task_queue_->PostTask:专业分工机制请添加图片描述
  3. Lambda捕获的零拷贝设计
[this, packets = std::move(packets)]() mutable

请添加图片描述

  task_queue_->PostTask(
      SafeTask(safety_.flag(), [this, packets = std::move(packets)]() mutable {
        RTC_DCHECK_RUN_ON(task_queue_);
        TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
                     "TaskQueuePacedSender::EnqueuePackets");
        for (auto& packet : packets) {
          TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
                       "TaskQueuePacedSender::EnqueuePackets::Loop",
                       "sequence_number", packet->SequenceNumber(),
                       "rtp_timestamp", packet->Timestamp());

          size_t packet_size = packet->payload_size() + packet->padding_size();
          if (include_overhead_) {
            packet_size += packet->headers_size();
          }
          packet_size_.Apply(1, packet_size);
          RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
          pacing_controller_.EnqueuePacket(std::move(packet));
        }
        MaybeProcessPackets(Timestamp::MinusInfinity());
      })

为什么要灵活计算包大小?
请添加图片描述

第二部分:线程检查与性能追踪

RTC_DCHECK_RUN_ON(task_queue_);
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
             "TaskQueuePacedSender::EnqueuePackets");

这就像工人上岗前的"安全检查"和"工作记录":请添加图片描述

第三部分:逐包处理循环

for (auto& packet : packets) {
  TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
               "TaskQueuePacedSender::EnqueuePackets::Loop",
               "sequence_number", packet->SequenceNumber(),
               "rtp_timestamp", packet->Timestamp());

这就像传送带上的"质检员"逐个检查包裹:
请添加图片描述

第四部分:统计更新

packet_size_.Apply(1, packet_size);

这就像快递公司的"实时统计系统":请添加图片描述

第五部分:调度控制器入队

这就像把处理好的包裹交给"专业调度系统":请添加图片描述

第六部分:立即处理尝试

MaybeProcessPackets(Timestamp::MinusInfinity());

这就像调度员说"看看能不能马上发货":请添加图片描述

EnqueuePackets方法总结

请添加图片描述
💡 体现的核心设计原则
请添加图片描述
现在我们来到了整个视频发送流程的"终极大脑":PacingController::EnqueuePacket。这是决定何时、如何发送每个RTP包的智能调度中心!

PacingController::EnqueuePacket 整体架构

请添加图片描述

第一部分:前置验证与安全检查

void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  RTC_DCHECK(pacing_rate_ > DataRate::Zero())
      << "SetPacingRate must be called before InsertPacket.";
  RTC_CHECK(packet->packet_type());

这就像高速公路收费站的"入口检查":
请添加图片描述

第二部分:关键帧冲刷机制(超重要!)

if (keyframe_flushing_ &&
    packet->packet_type() == RtpPacketMediaType::kVideo &&
    packet->is_key_frame() && packet->is_first_packet_of_frame() &&
    !packet_queue_.HasKeyframePackets(packet->Ssrc())) {
  // First packet of a keyframe (and no keyframe packets currently in the
  // queue). Flush any pending packets currently in the queue for that stream
  // in order to get the new keyframe out as quickly as possible.
  packet_queue_.RemovePacketsForSsrc(packet->Ssrc());
  std::optional<uint32_t> rtx_ssrc =
      packet_sender_->GetRtxSsrcForMedia(packet->Ssrc());
  if (rtx_ssrc) {
    packet_queue_.RemovePacketsForSsrc(*rtx_ssrc);
  }
}

这就像医院急诊室的"紧急通道"机制:
请添加图片描述
为什么关键帧需要特殊处理?
请添加图片描述

第三部分:带宽探测通知

prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));

这就像告诉"网络测速员"有新的数据包要发送:
请添加图片描述

第四部分:时间预算管理(复杂但重要)

const Timestamp now = CurrentTime();
if (packet_queue_.Empty()) {
  // If queue is empty, we need to "fast-forward" the last process time,
  // so that we don't use passed time as budget for sending the first new
  // packet.
  Timestamp target_process_time = now;
  Timestamp next_send_time = NextSendTime();
  if (next_send_time.IsFinite()) {
    // There was already a valid planned send time, such as a keep-alive.
    // Use that as last process time only if it's prior to now.
    target_process_time = std::min(now, next_send_time);
  }
  UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));
}

这就像银行的"信用额度管理系统":
请添加图片描述
时间预算管理的重要性
请添加图片描述

第五部分:包大小计算与入队

DataSize packet_size = DataSize::Bytes(packet->payload_size());
if (include_overhead_) {
    packet_size += DataSize::Bytes(packet->headers_size());
}
packet_queue_.Push(now, std::move(packet));
seen_first_packet_ = true;

这就像快递公司的"计费称重和入库":请添加图片描述

第六部分:流量统计监控(自定义扩展)

if (0 == last_instart_10ms_time_.ms()) {
   in_video_cnt_ = 0;
   in_data_sent_ = 0;
   in_cur_10ms_cnt_ = 0;
   in_max_10ms_cnt_ = 0;
   last_instart_10ms_time_ = now;
}

in_video_cnt_++;
in_cur_10ms_cnt_++;
in_data_sent_ += packet_size.bytes();
if (now - last_instart_10ms_time_ > TimeDelta::Millis(10)) {
  if (in_cur_10ms_cnt_ > in_max_10ms_cnt_) {
    in_max_10ms_cnt_ = in_cur_10ms_cnt_;
  }
  last_instart_10ms_time_ = now;
  in_cur_10ms_cnt_ = 0;
}

这就像工厂的"实时生产监控看板":请添加图片描述

第七部分:队列长度检查与自适应调整

// Queue length has increased, check if we need to change the pacing rate.
MaybeUpdateMediaRateDueToLongQueue(now);

这就像高速公路的"智能流量控制系统":请添加图片描述

现在我们来到了整个视频发送系统的"心脏":TaskQueuePacedSender::MaybeProcessPackets。这是真正决定何时发送包的核心调度方法!

// EnqueuePacket方法之后直接开始处理数据包发送
        MaybeProcessPackets(Timestamp::MinusInfinity());

MaybeProcessPackets 整体架构

请添加图片描述

第一部分:状态检查与重入保护

void TaskQueuePacedSender::MaybeProcessPackets(
    Timestamp scheduled_process_time) {
  RTC_DCHECK_RUN_ON(task_queue_);

  TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
               "TaskQueuePacedSender::MaybeProcessPackets");

  if (is_shutdown_ || !is_started_) {
    return;
  }

  // Protects against re-entry from transport feedback calling into the task
  // queue pacer.
  RTC_DCHECK(!processing_packets_);
  processing_packets_ = true;
  absl::Cleanup cleanup = [this] {
    RTC_DCHECK_RUN_ON(task_queue_);
    processing_packets_ = false;
  };

这就像工厂车间的"安全检查和门禁系统":
请添加图片描述

第二部分:时间计算与提前执行机制

Timestamp next_send_time = pacing_controller_.NextSendTime();
RTC_DCHECK(next_send_time.IsFinite());
const Timestamp now = clock_->CurrentTime();
TimeDelta early_execute_margin =
    pacing_controller_.IsProbing()
        ? PacingController::kMaxEarlyProbeProcessing
        : TimeDelta::Zero();

这就像精密的"列车调度系统":
请添加图片描述

第三部分:包处理循环(核心发送逻辑)

// Process packets and update stats.
while (next_send_time <= now + early_execute_margin) {
  pacing_controller_.ProcessPackets();
  next_send_time = pacing_controller_.NextSendTime();
  RTC_DCHECK(next_send_time.IsFinite());

  // Probing state could change. Get margin after process packets.
  early_execute_margin = pacing_controller_.IsProbing()
                             ? PacingController::kMaxEarlyProbeProcessing
                             : TimeDelta::Zero();
}

这就像"智能交通灯的发车控制":
请添加图片描述

第四部分:统计更新与任务调度管理

UpdateStats();

// Ignore retired scheduled task, otherwise reset `next_process_time_`.
if (scheduled_process_time.IsFinite()) {
  if (scheduled_process_time != next_process_time_) {
    return;
  }
  next_process_time_ = Timestamp::MinusInfinity();
}

这就像"工厂的班次交接和任务管理":
请添加图片描述

第五部分:延迟窗口计算(高级优化机制)

// Do not hold back in probing.
TimeDelta hold_back_window = TimeDelta::Zero();
if (!pacing_controller_.IsProbing()) {
  hold_back_window = max_hold_back_window_;
  DataRate pacing_rate = pacing_controller_.pacing_rate();
  if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
      !pacing_rate.IsZero() &&
      packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
    TimeDelta avg_packet_send_time =
        DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
    hold_back_window =
        std::min(hold_back_window,
                 avg_packet_send_time * max_hold_back_window_in_packets_);
  }
}

这就像"智能快递配送的批量优化策略":请添加图片描述

第六部分:下次处理时间计算

// Calculate next process time.
TimeDelta time_to_next_process =
    std::max(hold_back_window, next_send_time - now - early_execute_margin);
next_send_time = now + time_to_next_process;

这就像"精密的时钟调度算法":
请添加图片描述

第七部分:定时任务调度(自我驱动机制)

// If no in flight task or in flight task is later than `next_send_time`,
// schedule a new one. Previous in flight task will be retired.
if (next_process_time_.IsMinusInfinity() ||
    next_process_time_ > next_send_time) {
  // Prefer low precision if allowed and not probing.
  task_queue_->PostDelayedHighPrecisionTask(
      SafeTask(
          safety_.flag(),
          [this, next_send_time]() { MaybeProcessPackets(next_send_time); }),
      time_to_next_process.RoundUpTo(TimeDelta::Millis(1)));
  next_process_time_ = next_send_time;
}

这就像"智能闹钟的自动设置系统":
请添加图片描述
💡 体现的核心设计原则
请添加图片描述

完整流程总结
请添加图片描述
🎉 完整视频发送技术栈总结
请添加图片描述

下一章我们详解平滑发送的详细控制逻辑,期待~~~

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐