在ROS(Robot Operating System)中,如果发布频率(Publisher Rate)高于处理频率(Processing Rate),订阅者(Subscriber)会逐渐滞后,导致处理的始终是旧消息,而最新的消息可能会在队列中积压或被丢弃(取决于queue_size)。

1. 使用更智能的订阅逻辑

以下是一个基于最新消息的智能订阅方案,该方案在回调函数中只存储最新的消息,并在处理完成后继续处理最新的消息。

class MessageHandler:
    def __init__(self):
        self.latest_msg = None
        self.processing = False
        self.sub = rospy.Subscriber('/camera/image_raw', Image, self.callback, queue_size=1)
    
    def callback(self, msg):
        # 只保存最新消息,不立即处理
        self.latest_msg = msg
        
        # 如果没有正在处理,则启动处理
        if not self.processing:
            self.processing = True
            self.process_latest()
    
    def process_latest(self):
        while not rospy.is_shutdown():
            if self.latest_msg is None:
                self.processing = False
                return
                
            # 获取当前最新消息
            msg = self.latest_msg
            self.latest_msg = None
            
            # 处理消息(耗时操作)
            print("处理消息,时间戳:", msg.header.stamp.to_sec())
            rospy.sleep(4)  # 模拟耗时处理
            
            # 如果没有新消息了,退出处理循环
            if self.latest_msg is None:
                self.processing = False
                return

2. 使用独立的处理线程

为了确保消息处理不阻塞ROS回调线程,可以使用独立的处理线程,确保始终处理最新的消息。

import threading

class ThreadedProcessor:
    def __init__(self):
        self.lock = threading.Lock()
        self.latest_msg = None
        self.processing_thread = None
        self.sub = rospy.Subscriber('/camera/image_raw', Image, self.callback, queue_size=1)
    
    def callback(self, msg):
        with self.lock:
            # 始终更新为最新消息
            self.latest_msg = msg
            
        # 确保处理线程在运行
        if self.processing_thread is None or not self.processing_thread.is_alive():
            self.processing_thread = threading.Thread(target=self.processing_loop)
            self.processing_thread.daemon = True
            self.processing_thread.start()
    
    def processing_loop(self):
        while not rospy.is_shutdown():
            # 获取当前最新消息
            with self.lock:
                if self.latest_msg is None:
                    # 没有消息可处理,等待一会再检查
                    rospy.sleep(0.1)
                    continue
                
                msg = self.latest_msg
                self.latest_msg = None
            
            # 处理消息(耗时操作)
            print("处理消息,时间戳:", msg.header.stamp.to_sec())
            rospy.sleep(4)  # 模拟耗时处理

3. 使用消息过滤器和时间戳筛选

可以基于消息的时间戳进行筛选,避免处理过旧的消息。

import message_filters

def time_filtered_callback(msg):
    now = rospy.Time.now()
    delay = (now - msg.header.stamp).to_sec()
    
    if delay > 0.5:  # 如果消息延迟超过0.5秒则跳过
        rospy.logwarn(f"跳过旧消息,延迟 {delay:.2f}s")
        return
    
    # 处理最新消息
    print("处理消息,延迟:", delay)
    rospy.sleep(4)  # 模拟耗时处理

# 订阅消息,使用小队列和时间过滤
sub = rospy.Subscriber('/camera/image_raw', Image, time_filtered_callback, queue_size=1)

4. 降低发布频率以匹配处理能力

最简单的解决方案是降低发布频率,使其与处理能力相匹配:

# 将发布频率从1Hz降低到0.25Hz
rate = rospy.Rate(0.25)  # 每4秒发布一次

5. 使用ApproximateTime同步器

如果需要处理多个话题的数据,可以使用ApproximateTimeSynchronizer确保多个消息同步。

import message_filters

def sync_callback(image_msg, cloud_msg):
    # 处理同步的图像和点云消息
    print("处理同步消息,时间戳:", image_msg.header.stamp.to_sec())
    rospy.sleep(4)  # 模拟耗时处理

# 订阅器
image_sub = message_filters.Subscriber('/camera/image_raw', Image)
cloud_sub = message_filters.Subscriber('/lidar/points', PointCloud2)

# 使用ApproximateTime同步器
ts = message_filters.ApproximateTimeSynchronizer([image_sub, cloud_sub], queue_size=1, slop=0.1)
ts.registerCallback(sync_callback)

实际应用建议

  1. 评估优先级:是否所有消息都必须处理?还是只需最新状态?
  2. 监控系统:添加延迟监控,了解系统瓶颈。
  3. 混合方法:结合使用队列管理和多线程处理。
  4. 优化处理代码:减少每条消息的处理时间。

对于大多数应用,方案2(多线程处理)或方案3(时间戳筛选)是最佳选择。它们允许你保持较高的发布频率,同时确保总是处理最新的消息。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐