在 ROS(Robot Operating System)中,动作(Action)、**话题(Topic)和服务(Service)**是三种主要的通信机制,每种机制都有其特定的用途和特点。

  • 如果需要传输连续的数据流(如传感器数据),使用话题

  • 如果需要执行短时间的任务并获取即时响应,使用服务

  • 如果需要执行长时间的任务,并且需要中间状态更新或任务取消功能,使用动作

一,话题(Topic)

通信机制

  • 话题是一种单向异步通信机制,基于**发布-订阅(Publish-Subscribe)**模型。

  • 发布者(Publisher)将消息发送到话题,订阅者(Subscriber)从话题中接收消息。

  • 消息的传输是连续的,发布者和订阅者之间没有直接的交互

特点

  • 单向通信:数据只能从发布者流向订阅者。

  • 异步:发布者和订阅者不需要同时在线。

  • 多对多通信:一个话题可以有多个发布者和多个订阅者。

  • 实时性:适合传输连续的数据流(如传感器数据)

使用场景

  • 传感器数据的传输(如激光雷达、摄像头数据)。

  • 机器人状态的广播(如位置、速度)。

 二,服务(Service)

通信机制

  • 服务是一种双向同步通信机制,基于**请求-响应(Request-Response)**模型。

  • 客户端(Client)发送请求,服务器(Server)处理请求并返回响应

  • 通信是同步的,客户端会等待服务器的响应

特点

  • 双向通信:客户端发送请求,服务器返回响应。

  • 同步:客户端会阻塞,直到收到服务器的响应。

  • 一对一通信:一个服务通常只有一个服务器。

  • 即时性:适合短时间完成的任务。

使用场景

  • 执行简单的计算任务(如加法运算)。

  • 查询机器人状态(如获取当前电量)。

  • 触发一次性操作(如开关灯)。

三,动作(Action) 

通信机制

  • 动作是一种双向异步通信机制,结合了话题和服务的优点。适用于需要反馈和取消操作的任务

    Action 的核心思想是允许客户端向服务器发送一个目标(Goal),服务器执行任务并定期向客户端发送反馈(Feedback),最后返回结果(Result)。

  • 动作通信包括三个部分:

    1. Goal(目标):

      • 客户端向服务器发送一个目标(Goal),告诉服务器需要执行的任务。

      • 例如,在导航任务中,目标可以是移动到某个位置。

    2. Feedback(反馈):

      • 服务器在执行任务的过程中,定期向客户端发送反馈(Feedback)。

      • 例如,在导航任务中,反馈可以是当前的位置或进度。

    3. Result(结果):

      • 当任务完成后,服务器向客户端发送结果(Result)。

      • 例如,在导航任务中,结果可以是任务是否成功完成。

  • 动作还支持任务取消功能。

特点

  • 双向通信:客户端发送目标,服务器返回反馈和结果。

  • 异步:客户端不需要等待任务完成,可以继续执行其他操作。

  • 长时间任务:适合执行需要较长时间的任务。

  • 可中断:任务可以被取消。

  • 中间状态:服务器可以持续发送反馈。

使用场景

  • 机器人导航(如移动到目标位置)。

  • 机械臂抓取(如执行抓取任务)。

  • 任何需要长时间运行、可中断、并且需要中间状态更新的任务。

Action 的通信模型基于 ROS 的 ActionLib 库。actionlib 是 ROS(Robot Operating System)中的一个重要库,用于实现**动作(Action)**机制。

actionlib 的核心组件

  • Action Client客户端,用于向服务器发送目标并接收反馈和结果。客户端可以取消目标或查询目标的状态。

  • Action Server服务器,接收目标并执行任务,同时发送反馈和结果。服务器可以接受或拒绝目标,并在执行过程中定期发送反馈。

  • Action Protocol(动作协议):Action 的通信基于 ROS 的 Topic 和 Service。具体来说,Action 使用以下 Topic 和 Service:

  1. Goal Topic:客户端向服务器发送目标。
  2. Feedback Topic:服务器向客户端发送反馈。
  3. Result Topic:服务器向客户端发送结果。

  4. Cancel Topic:客户端向服务器发送取消请求。

  5. Status Topic:服务器向客户端发送任务状态

actionlib 的工作流程

  1. 客户端发送目标

    • 客户端通过ActionClient向服务器发送一个目标(Goal)。服务器可以选择接受或拒绝目标

  2. 服务器接收目标

    • 服务器通过ActionServer接收目标,开始执行任务并定期向客户端发送反馈(Feedback)。反馈可以包含任务的进度、状态或其他信息。

  3. 服务器发送反馈

    • 在执行任务的过程中,服务器可以定期向客户端发送反馈(Feedback)。

  4. 任务完成或取消

    • 任务完成后,服务器发送结果(Result)给客户端。

    • 如果任务被取消,服务器会停止执行并通知客户端

    • 客户端可以随时发送取消请求,服务器会停止任务并返回结果。

  5. 客户端接收结果

    • 客户端接收最终结果或取消通知。

    • 结果可以包含任务的成功状态、最终数据等。

action 的状态机

状态信息是由 Action Server 发布的,Action Client 可以通过订阅 status 主题来获取这些状态信息。

Action 的状态机定义了任务的生命周期,包括以下状态:

  1. PENDING

    • 目标已发送,但服务器尚未开始处理。

  2. ACTIVE

    • 服务器正在处理目标,并定期发送反馈。

  3. PREEMPTED

    • 目标被取消或抢占。

  4. SUCCEEDED

    • 目标成功完成。

  5. ABORTED

    • 目标因错误而中止。

  6. REJECTED

    • 目标被服务器拒绝。

  7. LOST

    • 目标因通信问题丢失。

Action Client 通过订阅 status 主题或调用 get_state() 接口来获取任务的状态。如下代码示例

state = client.get_state()
def status_cb(status):
    for goal_status in status.status_list:
        rospy.loginfo(f"Goal ID: {goal_status.goal_id}, State: {goal_status.status}")

rospy.Subscriber('/fibonacci/status', GoalStatusArray, status_cb)

actionlib 的使用场景

  • 导航:机器人移动到目标位置的过程中,可以持续发送反馈(如当前位置)。

  • 抓取:机械臂执行抓取任务时,可以发送中间状态(如关节角度)。

  • 长时间任务:任何需要较长时间执行的任务,并且需要中间状态更新或取消功能。

SimpleActionClient类

先介绍一个重要的类SimpleActionClient,SimpleActionClient是 ROS 中用于与 Action Server 进行通信的客户端类。它封装了与 Action Server 的交互细节,使得客户端可以方便地发送目标(Goal)、接收反馈(Feedback)和结果(Result),并监控任务的状态。可以通过实例化对象的方式在你的代码中创建的一个对象,用于与 Action Server 进行交互。有SimpleActionClient肯定就有SimpleActionServer,它是用来构造服务端的类,由于我的导航任务move_base已经是一个现成的 Action Server所以不需要再构造Server端,如果你需要可以自行查找,它和SimpleActionClient差不多

客户端可以通过 SimpleActionClient 向 Action Server 发送目标,告诉服务器需要执行的任务。

SimpleActionClient初始化需要的参数

  1. Action Name

    • Action Server 的名称(即 Action 的命名空间)。

    • 例如,如果我要实现的是导航功能,那么第一个参数就填写move_base

  2. Action Type

    • Action 的类型,通常是一个 ROS 消息类型。

    • 例如,如果我要实现的是导航功能,就填写MoveBaseAction,它是来自move_base_msgs.msg的消息类型。

  3. 其他可选参数

    • spin_thread:是否在单独的线程中运行 ROS 的 spin() 函数。默认为 True,建议保持默认值。

SimpleActionClient 的常用方法 :

1,wait_for_server()

  • 等待 Action Server 启动。

  • 在发送目标之前,必须确保服务器已启动。

client.wait_for_server()

2,send_goal(goal, done_cb=None, active_cb=None, feedback_cb=None)

  • 发送目标(Goal)到服务器。

  • 可选参数:

    • done_cb:任务完成时的回调函数。

    • active_cb:任务激活时的回调函数。

    • feedback_cb:接收反馈时的回调函数。

3,get_state()

  • 获取当前任务的状态。

state = client.get_state()

4,get_result()

  • 获取任务的结果。

result = client.get_result()

5,cancel_goal()

  • 取消当前任务。

client.cancel_goal()

6,wait_for_result(timeout=rospy.Duration())

  • 等待任务完成,并返回是否成功。

success = client.wait_for_result()

使用代码示例

使用Action机制的一般步骤如下:以实现导航为例

  •  第一步:初始化(实例化)SimpleActionClient
import rospy
import actionlib
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from geometry_msgs.msg import Point, Quaternion

class NavigationClient:
    def __init__(self):
        # 初始化 Action Client,连接到 move_base 的 Action Server
        self.client = actionlib.SimpleActionClient('move_base', MoveBaseAction)
        rospy.loginfo("等待 move_base Action Server 启动...")
        self.client.wait_for_server()
        rospy.loginfo("move_base Action Server 已启动!")
  • 第二步:封装send_goal()
''' 
下面的send_goal() 方法封装了 SimpleActionClient 的 send_goal() 方法,并添加了回调函数
(done_callback)来处理任务完成后的逻辑。通过封装 send_goal() 方法,主程序可以更简洁地调用导
航功能,而不需要直接与 SimpleActionClient 交互。除此之外,封装send_goal()功能,可以在
send_goal()外实现同步导航异步导航等多种导航方式,最后将处理后的目标点给send_goal()即可
'''
def send_goal(self, target_pose):
        """
        发送导航目标
        :param target_pose: 目标点的位置和方向(字典格式)
        """
        # 构造 MoveBaseGoal 对象
        goal = MoveBaseGoal()
        goal.target_pose.header.frame_id = "map"  # 设置参考坐标系
        goal.target_pose.header.stamp = rospy.Time.now()
        goal.target_pose.pose.position = Point(**target_pose['position'])  # 设置位置
        goal.target_pose.pose.orientation = Quaternion(**target_pose['orientation'])  # 设置方向

        # 发送目标
        rospy.loginfo(f"发送目标点: {target_pose}")
        self.client.send_goal(goal, done_cb=self.done_callback, feedback_cb=self.feedback_callback)

# 任务完成时的回调函数(可选)
def done_callback(self, status, result):
        """
        任务完成时的回调函数
        :param status: 任务状态
        :param result: 任务结果
        """
        if status == actionlib.GoalStatus.SUCCEEDED:
            rospy.loginfo("导航成功!")
        else:
            rospy.loginfo(f"导航失败,状态码: {status}")

# 任务反馈时的回调函数(可选)
def feedback_callback(self, feedback):
        """
        任务反馈时的回调函数
        :param feedback: 反馈信息
        """
        rospy.loginfo(f"当前进度: {feedback.base_position}")


def wait_for_result(self):
        """
        等待任务完成
        """
        self.client.wait_for_result()
  • 第三步:main函数 
if __name__ == '__main__':
    try:
        # 初始化 ROS 节点
        rospy.init_node('navigation_client')

        # 创建 NavigationClient 对象
        navigator = NavigationClient()

        # 定义目标点
        target_pose = {
            'position': {'x': 1.0, 'y': 0.5, 'z': 0.0},  # 目标位置
            'orientation': {'x': 0.0, 'y': 0.0, 'z': 0.0, 'w': 1.0}  # 目标方向
        }

        # 发送目标点
        navigator.send_goal(target_pose)

        # 等待任务完成
        navigator.wait_for_result()

    except rospy.ROSInterruptException:
        rospy.logerr("程序被中断!")

最终代码 

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rospy
import actionlib
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from geometry_msgs.msg import Point, Quaternion

class NavigationClient:
    def __init__(self):
        # 初始化 Action Client,连接到 move_base 的 Action Server
        self.client = actionlib.SimpleActionClient('move_base', MoveBaseAction)
        rospy.loginfo("等待 move_base Action Server 启动...")
        self.client.wait_for_server()
        rospy.loginfo("move_base Action Server 已启动!")

    def send_goal(self, target_pose):
        """
        发送导航目标
        :param target_pose: 目标点的位置和方向(字典格式)
        """
        # 构造 MoveBaseGoal 对象
        goal = MoveBaseGoal()
        goal.target_pose.header.frame_id = "map"  # 设置参考坐标系
        goal.target_pose.header.stamp = rospy.Time.now()
        goal.target_pose.pose.position = Point(**target_pose['position'])  # 设置位置
        goal.target_pose.pose.orientation = Quaternion(**target_pose['orientation'])  # 设置方向

        # 发送目标
        rospy.loginfo(f"发送目标点: {target_pose}")
        self.client.send_goal(goal, done_cb=self.done_callback, feedback_cb=self.feedback_callback)

    def done_callback(self, status, result):
        """
        任务完成时的回调函数
        :param status: 任务状态
        :param result: 任务结果
        """
        if status == actionlib.GoalStatus.SUCCEEDED:
            rospy.loginfo("导航成功!")
        else:
            rospy.loginfo(f"导航失败,状态码: {status}")

    def feedback_callback(self, feedback):
        """
        任务反馈时的回调函数
        :param feedback: 反馈信息
        """
        rospy.loginfo(f"当前进度: {feedback.base_position}")

    def wait_for_result(self):
        """
        等待任务完成
        """
        self.client.wait_for_result()

if __name__ == '__main__':
    try:
        # 初始化 ROS 节点
        rospy.init_node('navigation_client')

        # 创建 NavigationClient 对象
        navigator = NavigationClient()

        # 定义目标点
        target_pose = {
            'position': {'x': 1.0, 'y': 0.5, 'z': 0.0},  # 目标位置
            'orientation': {'x': 0.0, 'y': 0.0, 'z': 0.0, 'w': 1.0}  # 目标方向
        }

        # 发送目标点
        navigator.send_goal(target_pose)

        # 等待任务完成
        navigator.wait_for_result()

    except rospy.ROSInterruptException:
        rospy.logerr("程序被中断!")

对于导航任务,move_base 已经是一个现成的 Action Server。但如果你想自定义一个简单的 Action Server 来处理导航任务,可以按照以下步骤实现。 

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐