ROS是大家喜闻乐见的机器人操作系统,本Gist介绍一些基本用法
-
-
Save KuRRe8/b5252354ef7a377ef32c9d42de4e940f to your computer and use it in GitHub Desktop.
本教程将指导您如何在Ubuntu系统上安装和配置ROS 2,为后续的ROS 2开发打下坚实的基础。我们将主要关注ROS 2的LTS(长期支持)版本,例如Humble Hawksbill(对应Ubuntu 22.04)或Foxy Fitzroy(对应Ubuntu 20.04)。请根据您的Ubuntu版本选择合适的ROS 2发行版。
在开始之前,请确保您的系统满足基本要求:
- 操作系统: Ubuntu Linux。本教程将以Ubuntu 22.04 (Jammy Jellyfish) 为例,对应ROS 2 Humble Hawksbill。如果您使用的是Ubuntu 20.04 (Focal Fossa),则应选择ROS 2 Foxy Fitzroy。
- 区域设置 (Locale): 推荐使用支持UTF-8的区域设置。
- 系统更新: 建议在安装前更新您的系统包列表。
选择ROS 2发行版: ROS 2会定期发布新的版本,称为“发行版 (Distributions)”。每个发行版都有一个代号(如Humble, Foxy, Galactic, Iron)。LTS版本会获得更长时间的支持和更新。
- ROS 2 Humble Hawksbill: 推荐用于Ubuntu 22.04。
- ROS 2 Foxy Fitzroy: 推荐用于Ubuntu 20.04。
您可以访问 ROS 2官方文档 查看最新的发行版和支持的平台。
我们将安装 ros-<distro>-desktop
版本,它包含了ROS、RViz、demos、tutorials以及对2D/3D模拟器的支持。
确保您的系统支持UTF-8。如果您的系统区域设置不是UTF-8,请执行以下命令:
sudo apt update && sudo apt install locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8
您可以通过 locale
命令检查当前的区域设置。
您需要将ROS 2的APT仓库添加到您的系统中。
首先,确保Ubuntu Universe仓库已启用:
sudo apt install software-properties-common
sudo add-apt-repository universe
然后,添加ROS 2的GPG密钥:
sudo apt update && sudo apt install curl gnupg lsb-release
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
接着,将ROS 2的仓库添加到您的sources list中。请将 <distro>
替换为您选择的ROS 2发行版 (例如 humble
或 foxy
):
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(source /etc/os-release && echo $UBUNTU_CODENAME) main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null
更新您的APT包索引并安装ROS 2桌面版。同样,将 <distro>
替换为您选择的发行版:
sudo apt update
sudo apt upgrade # 可选,但建议更新已安装的包
sudo apt install ros-<distro>-desktop
例如,对于Humble版本:
sudo apt install ros-humble-desktop
安装过程可能需要一些时间,因为它会下载并安装大量的软件包。
colcon
是ROS 2推荐的构建工具。同时,rosdep
用于安装依赖项。
sudo apt install python3-colcon-common-extensions python3-rosdep
初始化 rosdep
(如果尚未初始化):
sudo rosdep init
rosdep update
安装完成后,您需要配置环境变量,以便ROS 2的命令能够被系统找到。每次打开新的终端时,都需要source这个setup文件。
将 <distro>
替换为您安装的ROS 2发行版:
source /opt/ros/<distro>/setup.bash
例如,对于Humble版本:
source /opt/ros/humble/setup.bash
为了避免每次打开新终端都手动执行此命令,您可以将其添加到您的 .bashrc
文件中:
echo "source /opt/ros/<distro>/setup.bash" >> ~/.bashrc
source ~/.bashrc
例如,对于Humble版本:
echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
source ~/.bashrc
注意: 如果您同时使用多个ROS发行版或需要在不同的ROS环境间切换,不建议将 source
命令添加到 .bashrc
。在这种情况下,您应该在需要使用特定ROS 2环境的终端中手动执行 source
命令。
打开一个新的终端 (如果已将source命令添加到 .bashrc
,则新终端会自动加载环境;否则,请手动source)。
尝试运行一些ROS 2命令来验证安装是否成功:
-
打印ROS发行版:
printenv ROS_DISTRO
这应该会输出您安装的ROS 2发行版的名称,例如
humble
。 -
运行一个简单的示例 (talker/listener): ROS 2提供了一些C++和Python的示例程序。
打开第一个终端,运行talker (发布者):
# 将 <distro> 替换为您安装的发行版,例如 humble ros2 run demo_nodes_cpp talker
或者Python版本的talker:
# 将 <distro> 替换为您安装的发行版,例如 humble ros2 run demo_nodes_py talker
您应该会看到类似 "Publishing: 'Hello World: X'" 的消息不断输出。
打开第二个终端 (确保也source了setup文件),运行listener (订阅者):
# 将 <distro> 替换为您安装的发行版,例如 humble ros2 run demo_nodes_cpp listener
或者Python版本的listener:
# 将 <distro> 替换为您安装的发行版,例如 humble ros2 run demo_nodes_py listener
您应该会看到类似 "I heard: [Hello World: X]" 的消息不断输出,与talker发布的消息对应。
如果这些命令都能正常工作,那么恭喜您,ROS 2已经成功安装并配置好了!
ROS 2的Python包通常通过APT安装,但有时您可能需要使用 pip
来安装一些ROS 2相关的Python库或您自己开发的Python包的依赖。
确保您的系统已安装Python 3和pip:
sudo apt install python3 python3-pip
大多数情况下,ROS 2安装过程中会处理好Python依赖。rosdep
工具也会帮助安装Python依赖项。
虚拟环境 (可选但推荐):
对于复杂的Python项目,使用Python虚拟环境(如 venv
或 conda
)是一个好习惯,可以隔离项目依赖,避免版本冲突。当在ROS 2工作区内开发Python节点时,您可以在工作区级别或包级别激活虚拟环境。
例如,使用 venv
:
# 在您的工作区根目录
python3 -m venv ros2_venv
source ros2_venv/bin/activate
# 在此虚拟环境中安装所需的Python包
pip install some_package
# 当您完成工作后,可以停用虚拟环境
# deactivate
当在虚拟环境中构建ROS 2工作区时,确保 colcon
能够找到正确的Python解释器和库。
ROS 2项目通常组织在称为“工作区 (Workspace)”的目录中。一个工作区可以包含一个或多个“功能包 (Packages)”。
创建工作区目录结构:
一个典型的工作区包含一个 src
目录,用于存放功能包的源代码。
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
构建工作区:
在工作区的根目录下(例如 ~/ros2_ws
),您可以使用 colcon build
命令来编译工作区中的所有功能包。
colcon build
构建完成后,会在工作区根目录下生成 build
、install
和 log
目录。
Sourcing工作区:
为了让ROS 2系统能够找到您工作区中编译好的功能包和可执行文件,您需要source工作区 install
目录下的 setup.bash
文件:
source ~/ros2_ws/install/setup.bash
重要: Sourcing工作区的 setup.bash
文件会覆盖 (overlay) 系统级的ROS 2环境。这意味着ROS 2会优先查找和使用您工作区中的包。
至此,您已经完成了ROS 2的安装和基本环境配置,并对工作区的概念有了初步了解。在接下来的教程中,我们将开始创建自己的ROS 2功能包和节点。
在ROS 2中,代码通常组织在“功能包 (Packages)”内。一个功能包包含了运行一个或多个ROS 2节点所需的代码、配置文件、launch文件、消息定义等。本教程将指导您如何创建一个Python功能包,并在其中编写一个简单的发布者(Publisher)节点和一个订阅者(Subscriber)节点。
首先,确保您已经创建并进入了一个ROS 2工作区 (Workspace)。如果还没有,请参考上一篇教程创建工作区,例如 ~/ros2_ws
。
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
现在,我们使用 ros2 pkg create
命令来创建一个新的Python功能包。我们将这个包命名为 my_first_package
。
ros2 pkg create --build-type ament_python --node-name my_publisher_node --node-name my_subscriber_node my_first_package
让我们分析一下这个命令:
ros2 pkg create
: 创建新包的命令。--build-type ament_python
: 指定包的构建类型为ament_python
,表示这是一个Python包。--node-name my_publisher_node
: (可选) 会在包内生成一个名为my_publisher_node.py
的骨架Python文件。--node-name my_subscriber_node
: (可选) 会在包内生成一个名为my_subscriber_node.py
的骨架Python文件。my_first_package
: 您要创建的功能包的名称。
执行后,src
目录下会生成一个 my_first_package
文件夹,其结构大致如下:
my_first_package/
├── my_first_package/ # Python模块目录
│ ├── __init__.py
│ ├── my_publisher_node.py # 骨架文件 (如果使用了 --node-name)
│ └── my_subscriber_node.py # 骨架文件 (如果使用了 --node-name)
├── resource/
│ └── my_first_package # 包标记文件,内容为空
├── setup.cfg
├── setup.py
├── package.xml
└── test/
├── test_copyright.py
├── test_flake8.py
└── test_pep257.py
package.xml
是一个清单文件,描述了包的元信息,如名称、版本、描述、维护者、许可证以及依赖关系。打开 my_first_package/package.xml
,您会看到类似以下的内容:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>my_first_package</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="[email protected]">user</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend> <!-- 依赖rclpy库 -->
<depend>std_msgs</depend> <!-- 我们将使用标准消息 -->
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
重要:
<name>
: 包名,应与文件夹名一致。<version>
: 包的版本号。<description>
: 包的简短描述。<maintainer>
: 维护者的邮箱和名字。<license>
: 包的许可证,例如Apache-2.0
,BSD-3-Clause
,MIT
。<depend>
: 列出包的依赖项。对于Python节点,rclpy
是必需的。如果使用标准消息类型,还需要添加相应的消息包依赖,如std_msgs
。<export>
:<build_type>ament_python</build_type>
表明这是一个ament Python包。
请根据您的实际情况修改 description
, maintainer
, 和 license
字段。确保添加了 <depend>rclpy</depend>
和 <depend>std_msgs</depend>
(因为我们接下来会用到它们)。
setup.py
是Python包的标准配置文件,用于描述如何构建和安装包。对于ROS 2 Python包,它尤其重要,因为它定义了节点的可执行文件入口点。
打开 my_first_package/setup.py
:
from setuptools import find_packages, setup
package_name = 'my_first_package'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='user',
maintainer_email='[email protected]',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'my_publisher_node = my_first_package.my_publisher_node:main',
'my_subscriber_node = my_first_package.my_subscriber_node:main',
],
},
)
关键部分 entry_points
:
entry_points
下的 console_scripts
字典定义了当您使用 ros2 run
时,哪些Python脚本可以作为可执行节点运行。
'my_publisher_node = my_first_package.my_publisher_node:main'
表示:- 创建一个名为
my_publisher_node
的可执行脚本。 - 当运行这个脚本时,它会执行
my_first_package
模块(即my_first_package/my_first_package/
目录下的Python代码)中的my_publisher_node.py
文件里的main
函数。
- 创建一个名为
- 类似地,
my_subscriber_node
也是如此。
如果您没有使用 --node-name
选项创建包,或者想添加新的节点,您需要手动修改 entry_points
。
setup.cfg
文件通常用于为 setup.py
提供一些默认配置。对于简单的Python包,它可能很简单:
[develop]
script_dir=$base/lib/my_first_package
[install]
install_scripts=$base/lib/my_first_package
这有助于指定脚本的安装位置。
这是一个空文件,作为ament构建系统识别包的标记。
现在,我们来编辑 my_first_package/my_first_package/my_publisher_node.py
文件。如果该文件不存在或您想重新开始,可以创建一个新文件。
这个节点将周期性地发布一个字符串消息到名为 chatter
的主题上。
# my_first_package/my_first_package/my_publisher_node.py
import rclpy # ROS 2 Python客户端库
from rclpy.node import Node # Node类
from std_msgs.msg import String # 标准字符串消息类型
class MyPublisherNode(Node):
def __init__(self):
super().__init__('simple_publisher') # 初始化节点,并设置节点名
# 创建一个发布者,消息类型为String,主题名为"chatter",队列大小为10
self.publisher_ = self.create_publisher(String, 'chatter', 10)
timer_period = 0.5 # seconds (发布频率为2Hz)
# 创建一个定时器,每隔timer_period秒调用一次timer_callback方法
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0 # 计数器
def timer_callback(self):
msg = String() # 创建一个String消息对象
msg.data = f'Hello World ROS 2: {self.i}' # 填充消息数据
self.publisher_.publish(msg) # 发布消息
# 在控制台打印日志信息
self.get_logger().info(f'Publishing: "{msg.data}"')
self.i += 1
def main(args=None):
rclpy.init(args=args) # 初始化rclpy库
publisher_node = MyPublisherNode() # 创建节点实例
try:
rclpy.spin(publisher_node) # 保持节点运行,等待回调函数被调用
except KeyboardInterrupt:
pass # 允许Ctrl+C中断
finally:
# 销毁节点,释放资源
publisher_node.destroy_node()
rclpy.shutdown() # 关闭rclpy库
if __name__ == '__main__':
main()
代码解释:
import rclpy
,from rclpy.node import Node
,from std_msgs.msg import String
: 导入必要的模块。class MyPublisherNode(Node):
: 定义一个继承自Node
的类。super().__init__('simple_publisher')
: 调用父类的构造函数,并为节点命名为simple_publisher
。这个名称将在ROS网络中唯一标识此节点。self.create_publisher(String, 'chatter', 10)
: 创建一个发布者。String
: 发布的消息类型。'chatter'
: 主题名称。10
: QoS(服务质量)设置中的队列大小。如果发布速度过快,订阅者处理不过来,这里会缓存最多10条消息。
self.create_timer(0.5, self.timer_callback)
: 创建一个定时器,每0.5秒调用一次timer_callback
方法。timer_callback(self)
:msg = String()
: 创建消息对象。msg.data = ...
: 设置消息内容。self.publisher_.publish(msg)
: 发布消息。self.get_logger().info(...)
: 打印日志信息。
main(args=None)
: 程序的入口点。rclpy.init(args=args)
: 初始化ROS 2 Python客户端库。publisher_node = MyPublisherNode()
: 创建节点实例。rclpy.spin(publisher_node)
: 使节点保持活动状态,处理回调(如定时器回调)。程序会阻塞在这里,直到节点被关闭或Ctrl+C
被按下。publisher_node.destroy_node()
: 清理节点资源。rclpy.shutdown()
: 关闭rclpy。
if __name__ == '__main__': main()
: Python的标准写法,确保main()
只在直接运行此脚本时被调用。
接下来,编辑 my_first_package/my_first_package/my_subscriber_node.py
。这个节点将订阅 chatter
主题,并在收到消息时打印出来。
# my_first_package/my_first_package/my_subscriber_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MySubscriberNode(Node):
def __init__(self):
super().__init__('simple_subscriber') # 初始化节点,并设置节点名
# 创建一个订阅者,消息类型为String,主题名为"chatter"
# 当收到消息时,调用listener_callback方法
# 队列大小为10,与发布者匹配
self.subscription = self.create_subscription(
String,
'chatter',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
def listener_callback(self, msg):
# 在控制台打印收到的消息
self.get_logger().info(f'I heard: "{msg.data}"')
def main(args=None):
rclpy.init(args=args) # 初始化rclpy库
subscriber_node = MySubscriberNode() # 创建节点实例
try:
rclpy.spin(subscriber_node) # 保持节点运行,等待回调函数被调用
except KeyboardInterrupt:
pass
finally:
# 销毁节点,释放资源
subscriber_node.destroy_node()
rclpy.shutdown() # 关闭rclpy库
if __name__ == '__main__':
main()
代码解释:
- 大部分结构与发布者节点类似。
self.create_subscription(String, 'chatter', self.listener_callback, 10)
: 创建一个订阅者。String
: 期望接收的消息类型。'chatter'
: 要订阅的主题名称(必须与发布者一致)。self.listener_callback
: 当收到消息时要调用的回调函数。10
: QoS队列大小。
listener_callback(self, msg)
:msg
: 接收到的消息对象,类型为std_msgs.msg.String
。self.get_logger().info(...)
: 打印收到的消息内容 (msg.data
)。
回到您的工作区根目录 (~/ros2_ws
),然后使用 colcon
构建您的包:
cd ~/ros2_ws
colcon build --packages-select my_first_package
colcon build
: 构建命令。--packages-select my_first_package
: (可选) 只构建指定的包,可以加快编译速度,尤其是在大型工作区中。如果工作区中只有这一个包,可以直接用colcon build
。
如果一切顺利,您应该会看到编译成功的消息。构建完成后,会在工作区根目录下生成 build
, install
, 和 log
目录。install
目录包含了编译好的包和可执行文件。
在运行节点之前,您需要source工作区的配置文件,以便ROS 2能够找到您刚刚编译的包和节点。打开一个新的终端,或者如果您当前的终端已经source过系统ROS 2环境,那么需要用工作区环境覆盖它。
# 在工作区根目录 (e.g., ~/ros2_ws)
source install/setup.bash
重要: 每次打开新终端并想运行此工作区中的节点时,都需要执行此 source
命令。您也可以将其添加到 .bashrc
,但要注意,如果同时处理多个工作区,这可能会导致混淆。
现在,我们可以运行这两个节点了。您需要两个终端。
在第一个终端,运行发布者节点:
ros2 run my_first_package my_publisher_node
您应该会看到类似以下的输出,每0.5秒打印一行:
[INFO] [simple_publisher]: Publishing: "Hello World ROS 2: 0"
[INFO] [simple_publisher]: Publishing: "Hello World ROS 2: 1"
...
在第二个终端 (确保也执行了 source install/setup.bash
),运行订阅者节点:
ros2 run my_first_package my_subscriber_node
您应该会看到类似以下的输出,与发布者发布的内容对应:
[INFO] [simple_subscriber]: I heard: "Hello World ROS 2: 0"
[INFO] [simple_subscriber]: I heard: "Hello World ROS 2: 1"
...
要停止节点,请在对应的终端按下 Ctrl+C
。
ROS 2提供了一些命令行工具来检查和调试正在运行的系统。
-
查看活动主题: 打开第三个终端 (source
install/setup.bash
),运行:ros2 topic list
您应该会看到
/chatter
和其他一些默认主题(如/parameter_events
,/rosout
)。 -
查看主题消息内容 (echo):
ros2 topic echo /chatter
这个命令会直接在终端打印
/chatter
主题上发布的消息。 -
查看节点图 (rqt_graph): 如果安装了
rqt_graph
(通常随ros-<distro>-desktop
安装),可以运行它来可视化节点和主题之间的连接:rqt_graph
在打开的图形界面中,您应该能看到
simple_publisher
节点通过chatter
主题连接到simple_subscriber
节点。
恭喜!您已经成功创建、编译并运行了您的第一个ROS 2 Python发布者和订阅者节点。这是理解ROS 2核心通信机制——主题(Topics)——的关键一步。
在ROS 2中,节点(Node)是执行计算的基本单元。一个机器人系统通常由多个协同工作的节点组成,每个节点负责一部分特定的功能,例如传感器数据处理、路径规划、电机控制等。本教程将深入探讨如何使用Python的rclpy
库创建和管理ROS 2节点,包括命名、日志记录以及与节点交互的命令行工具。
我们已经在之前的教程中接触过 rclpy.node.Node
类。它是创建任何ROS 2 Python节点的基础。
import rclpy
from rclpy.node import Node
class MyCustomNode(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.get_logger().info(f"节点 '{self.get_name()}' 已启动!")
# 在这里可以添加节点的其他初始化代码,如创建发布者、订阅者等
def main(args=None):
rclpy.init(args=args)
# 创建一个名为 "my_node" 的节点
custom_node_instance = MyCustomNode('my_node')
try:
rclpy.spin(custom_node_instance)
except KeyboardInterrupt:
pass
finally:
custom_node_instance.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
rclpy.init(args=args)
: 初始化ROS 2客户端库。这是任何ROS 2程序开始时必须调用的第一步。super().__init__(node_name)
: 调用父类Node
的构造函数,并传入node_name
。这个node_name
将成为该节点在ROS计算图中的唯一标识符(在当前命名空间下)。self.get_name()
: 获取节点在初始化时设置的名称。self.get_namespace()
: 获取节点当前的命名空间。rclpy.spin(node)
: 使节点保持活动状态,处理其回调函数(如订阅回调、定时器回调等)。程序会在此处阻塞,直到节点被外部信号(如Ctrl+C)或内部逻辑关闭。node.destroy_node()
: 清理节点创建的资源,如发布者、订阅者等。这是一个良好的实践,在节点结束前调用。rclpy.shutdown()
: 关闭ROS 2客户端库,释放所有rclpy资源。
节点名称和命名空间对于组织复杂的ROS系统至关重要。它们有助于避免名称冲突,并允许逻辑上对节点进行分组。
节点名称是在创建节点时通过 Node
构造函数的第一个参数指定的。例如,Node('camera_driver')
创建一个名为 camera_driver
的节点。
- 唯一性: 在同一个命名空间内,节点名称必须是唯一的。
- 命名约定: 通常使用小写字母和下划线 (
snake_case
)。
命名空间为节点、主题、服务等ROS实体提供了一个作用域。一个节点总是存在于某个命名空间中。默认情况下,节点位于全局命名空间 (/
)。
获取当前命名空间和完全限定名称:
class NodeInfoNode(Node):
def __init__(self):
super().__init__('info_node_example') # 节点基础名
node_name = self.get_name() # 获取基础名称
namespace = self.get_namespace() # 获取当前命名空间
fully_qualified_name = self.get_fully_qualified_name() # 获取完整的节点名
self.get_logger().info(f"基础节点名: {node_name}")
self.get_logger().info(f"命名空间: {namespace}")
self.get_logger().info(f"完全限定节点名: {fully_qualified_name}")
# ... (main function as above, instantiating NodeInfoNode) ...
如何设置命名空间? 命名空间通常不是在节点代码内部直接设置的,而是通过以下方式在节点启动时指定:
-
Launch 文件: 这是最常见和推荐的方式。在Launch文件中启动节点时,可以为其指定
namespace
参数。# In a launch file (e.g., my_launch.py) from launch_ros.actions import Node ld.add_action( Node( package='my_package', executable='my_node_executable', name='my_base_name', # 节点的基础名 namespace='/robot1' # 设置命名空间 ) )
如果这样启动
NodeInfoNode
,它的get_namespace()
会返回/robot1
,get_fully_qualified_name()
会返回/robot1/info_node_example
。 -
命令行参数: 使用
ros2 run
时,可以使用--ros-args -r __ns:=/your_namespace
。ros2 run my_package my_node_executable --ros-args -r __ns:=/robot1
相对名称与全局名称:
- 当在节点内部创建发布者/订阅者时,如果主题名不以
/
开头,它就是相对名称,会自动加上节点的当前命名空间。例如,在/robot1
命名空间下的节点创建一个名为scan
的主题,其完全限定名将是/robot1/scan
。 - 如果主题名以
/
开头 (例如/global_map
),则它是一个全局名称,不会受节点命名空间的影响。
良好的日志记录对于调试和监控ROS系统至关重要。rclpy
提供了方便的日志记录API。
每个节点实例都有自己的日志记录器,可以通过 self.get_logger()
获取。
class LoggingDemoNode(Node):
def __init__(self):
super().__init__('logging_node')
# 获取节点的日志记录器
logger = self.get_logger()
# 不同级别的日志消息
logger.debug("这是一个调试信息。通常用于详细的开发信息。")
logger.info("这是一个常规信息。用于报告节点状态或重要事件。")
logger.warn("这是一个警告信息。表明可能存在问题,但不影响当前操作。")
logger.error("这是一个错误信息。表明发生了错误,可能影响节点功能。")
logger.fatal("这是一个致命错误信息。表明发生了严重错误,节点可能无法继续运行。")
# 也可以使用条件日志和一次性日志
if self.get_parameter('enable_verbose_logging').value: # 假设有这个参数
logger.info("详细日志已启用。", throttle_duration_sec=5) # 每5秒最多打印一次
logger.info("这条消息只会在第一次调用时打印。", once=True)
logger.info("这条消息只会在第一次调用时打印。", once=True) # 第二次不会打印
# ... (main function as above, instantiating LoggingDemoNode) ...
ROS 2 定义了以下日志级别(严重程度从低到高):
DEBUG
: 最详细的日志,通常在开发和调试时使用。INFO
: 标准信息,用于报告正常操作和状态。WARN
: 警告,表明可能出现问题或非预期情况,但系统仍能运行。ERROR
: 错误,表明节点遇到问题,某些功能可能无法正常工作。FATAL
: 致命错误,表明节点遇到严重问题,很可能无法继续运行。
默认情况下,终端通常显示 INFO
及以上级别的日志。
可以为特定节点或全局配置日志级别:
-
通过命令行 (针对特定节点): 当运行一个节点时,可以使用参数设置其日志级别:
# 将 my_node_name 的日志级别设置为 DEBUG ros2 run my_package my_node_executable --ros-args --log-level my_node_name:=debug
注意:这里的
my_node_name
是节点初始化时super().__init__('my_node_name')
中使用的名称。 -
通过
rqt_console
和rqt_logger_level
:rqt_console
是一个GUI工具,用于查看所有节点的日志输出。rqt_logger_level
允许您在运行时动态更改节点的日志级别。 -
通过 Launch 文件: 可以在Launch文件中为节点设置启动时的日志级别。
- 控制台 (stdout/stderr): 默认情况下,日志信息会输出到运行节点的终端。
INFO
及以下级别通常到stdout
,WARN
及以上级别到stderr
。 - ROS 2 日志目录: 日志也会被写入文件,通常位于
~/.ros/log
或工作区的log
目录下。这对于事后分析非常有用。
ROS 2 提供了丰富的命令行工具来检查和管理正在运行的节点。
列出当前ROS计算图中所有活动的节点及其命名空间。
ros2 node list
输出示例:
/
_ros2cli_daemon_0
my_node
logging_node
/robot1
my_other_node
这显示了在全局命名空间 /
下有 _ros2cli_daemon_0
(ROS 2 CLI的内部节点), my_node
, logging_node
,以及在 /robot1
命名空间下有 my_other_node
。
显示特定节点的详细信息,包括它发布的的主题、订阅的主题、提供的服务、使用的动作等。
ros2 node info /my_node
或者,如果节点在特定命名空间下:
ros2 node info /robot1/my_other_node
输出示例 (假设 my_node
是一个发布者和订阅者):
/my_node
Subscribers:
/chatter: std_msgs/msg/String
Publishers:
/parameter_events: rcl_interfaces/msg/ParameterEvent
/rosout: rcl_interfaces/msg/Log
/status_topic: std_msgs/msg/String
Service Servers:
/my_node/describe_parameters: rcl_interfaces/srv/DescribeParameters
/my_node/get_parameter_types: rcl_interfaces/srv/GetParameterTypes
# ... (other default parameter services)
Service Clients:
None
Action Servers:
None
Action Clients:
None
rclpy.spin(node)
会阻塞并持续处理节点的回调,直到节点关闭。但在某些情况下,您可能希望对节点的执行有更精细的控制,例如在主循环中执行一些非回调驱动的逻辑。这时可以使用 rclpy.spin_once(node, timeout_sec=None)
。
spin_once
会检查并执行一次当前挂起的回调(例如,一个已到达的订阅消息或一个到期的定时器),然后立即返回。
node
: 要处理回调的节点实例。timeout_sec
: 可选的超时时间(秒)。如果在这段时间内没有回调被执行,函数也会返回。如果为None
(默认),则如果没有挂起的回调,它会立即返回。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
class SpinOnceDemoNode(Node):
def __init__(self):
super().__init__('spin_once_node')
self.publisher_ = self.create_publisher(String, 'heartbeat', 10)
self.subscription_ = self.create_subscription(
String, 'control_topic', self.control_callback, 10)
self.counter = 0
self.running = True
def control_callback(self, msg):
self.get_logger().info(f"Received control command: {msg.data}")
if msg.data == "stop":
self.running = False
def run_custom_loop(self):
while rclpy.ok() and self.running:
# 1. 执行一次回调检查
rclpy.spin_once(self, timeout_sec=0.1) # 0.1秒超时
# 2. 执行自定义逻辑
self.counter += 1
if self.counter % 10 == 0: # 每执行10次循环 (大约1秒)
heartbeat_msg = String()
heartbeat_msg.data = f"Node alive, count: {self.counter}"
self.publisher_.publish(heartbeat_msg)
self.get_logger().info("Published heartbeat.")
# 模拟一些其他工作
# time.sleep(0.01) # 注意:不要在这里用长时间阻塞的sleep,会影响回调响应
# spin_once 的 timeout_sec 已经起到了类似的作用
self.get_logger().info("Custom loop finished.")
def main(args=None):
rclpy.init(args=args)
spin_once_node = SpinOnceDemoNode()
try:
spin_once_node.run_custom_loop() # 调用包含spin_once的自定义循环
except KeyboardInterrupt:
spin_once_node.get_logger().info("Keyboard interrupt, shutting down.")
finally:
spin_once_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
何时使用 spin_once
?
- 当您需要在ROS回调之外执行周期性或条件性的任务。
- 当您想将ROS节点集成到具有自己主事件循环的现有应用程序中。
- 当您需要精确控制循环的频率,并确保在每个循环迭代中都能处理ROS事件。
注意事项:
spin_once
的timeout_sec
非常重要。如果设置得太小或为0,并且您的自定义逻辑执行很快,CPU使用率可能会很高。如果设置得太大,回调的响应可能会有延迟。- 确保
rclpy.ok()
在循环条件中,以便在ROS系统关闭时正确退出。
通过本教程,您应该对ROS 2节点有了更深入的理解,包括其创建、命名、日志记录以及如何通过命令行工具和 spin_once
进行更高级的控制。
在ROS 2中,主题(Topics)是一种核心的异步通信机制,它允许节点之间通过发布(Publishing)和订阅(Subscribing)消息(Messages)来进行数据交换。一个节点可以将数据发布到一个特定的主题上,而其他对这些数据感兴趣的节点可以订阅该主题以接收数据。本教程将深入探讨主题和消息的使用,包括标准消息类型、发布者和订阅者的创建与配置,以及相关的命令行工具。
发布/订阅模型是一种解耦的通信模式:
- 发布者 (Publisher): 生成数据并将其发送到一个命名的“主题”上,不关心是否有节点正在接收这些数据。
- 订阅者 (Subscriber): 声明对某个特定“主题”感兴趣,并接收该主题上发布的所有数据。
- 主题 (Topic): 充当消息的通道或总线,按名称标识。
- 消息 (Message): 结构化的数据单元,通过主题进行传输。消息具有特定的类型和字段定义。
关键特性:
- 多对多: 一个主题可以有多个发布者和多个订阅者。
- 异步: 发布者发送消息后不会等待订阅者接收或处理。
- 解耦: 发布者和订阅者之间没有直接的依赖关系,它们仅通过主题名称和消息类型进行匹配。
- 类型安全: 发布者和订阅者必须使用相同类型的消息才能在同一主题上通信。
ROS 2提供了一系列预定义的标准消息类型,用于常见的机器人应用场景。这些消息类型通常位于 std_msgs
, sensor_msgs
, geometry_msgs
, nav_msgs
等包中。
-
std_msgs
: 包含基本数据类型的消息,如:String
: 字符串Int32
,Int64
,Float32
,Float64
: 不同大小的整数和浮点数Bool
: 布尔值Header
: 包含时间戳 (stamp
) 和坐标系ID (frame_id
) 的元数据,常嵌入其他消息中。ColorRGBA
: RGBA颜色值Empty
: 空消息,用于触发事件等。
-
geometry_msgs
: 包含表示几何形状、姿态、速度等的消息,如:Point
: 3D点 (x, y, z)Vector3
: 3D向量Quaternion
: 四元数 (表示旋转)Pose
: 位姿 (位置Point
+姿态Quaternion
)PoseStamped
: 带时间戳和坐标系的位姿 (Header
+Pose
)Twist
: 线速度和角速度 (各是一个Vector3
)Transform
: 变换 (平移Vector3
+旋转Quaternion
)TransformStamped
: 带时间戳和坐标系的变换
-
sensor_msgs
: 包含来自各种传感器的消息,如:Image
: 图像数据LaserScan
: 2D激光雷达扫描数据PointCloud2
: 3D点云数据Imu
: 惯性测量单元数据 (加速度、角速度、姿态)NavSatFix
: GPS数据Temperature
: 温度数据Joy
: 游戏手柄/操纵杆输入
-
nav_msgs
: 包含用于导航的消息,如:Odometry
: 里程计数据 (位姿和速度,带协方差)Path
: 由一系列位姿组成的路径
要使用这些消息,您需要从相应的包中导入它们。例如:
from std_msgs.msg import String, Int32, Header
from geometry_msgs.msg import Point, Twist, PoseStamped
from sensor_msgs.msg import Image, LaserScan
消息对象在Python中表现为类实例,其字段是类的属性。
# 创建和填充一个String消息
str_msg = String()
str_msg.data = "Hello ROS 2 Topic!"
# 创建和填充一个Point消息
point_msg = Point()
point_msg.x = 1.0
point_msg.y = 2.5
point_msg.z = -0.5
# 创建和填充一个PoseStamped消息
pose_st_msg = PoseStamped()
pose_st_msg.header.stamp = Node().get_clock().now().to_msg() # 获取当前时间戳
pose_st_msg.header.frame_id = "map"
pose_st_msg.pose.position.x = 0.0
pose_st_msg.pose.orientation.w = 1.0 # 单位四元数 (无旋转)
注意: Node().get_clock().now().to_msg()
用于获取当前时间并转换为 builtin_interfaces.msg.Time
类型,这是 Header
消息中 stamp
字段所期望的。
发布者节点用于向特定主题发送消息。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
class SimplePublisher(Node):
def __init__(self):
super().__init__('my_string_publisher')
# 参数:
# 1. 消息类型 (e.g., String)
# 2. 主题名称 (e.g., 'status_updates')
# 3. QoS Profile (服务质量配置) 或队列大小 (旧版API中是队列大小,现在推荐使用QoSProfile)
self.publisher_ = self.create_publisher(String, 'status_updates', 10)
self.timer_period = 1.0 # seconds
self.timer = self.create_timer(self.timer_period, self.publish_message)
self.message_count = 0
def publish_message(self):
msg = String()
msg.data = f"Current status: OK, count: {self.message_count}"
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}"')
self.message_count += 1
def main(args=None):
rclpy.init(args=args)
publisher_node = SimplePublisher()
try:
rclpy.spin(publisher_node)
except KeyboardInterrupt:
pass
finally:
publisher_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
self.create_publisher(msg_type, topic_name, qos_profile_or_depth)
:
msg_type
: 要发布的消息的Python类,例如std_msgs.msg.String
。topic_name
: 主题的名称,字符串。命名约定:- 小写字母,数字,下划线。
- 可以是相对名称 (如
status_updates
),它会自动加上节点的命名空间。 - 也可以是全局名称 (如
/global_feed
),它不受节点命名空间影响。
qos_profile_or_depth
: 服务质量 (Quality of Service) 配置。- 可以直接传递一个整数作为队列深度 (e.g.,
10
),这会使用默认的QoS设置但指定历史记录深度。 - 更推荐的做法是传递一个
rclpy.qos.QoSProfile
对象,允许更细致地配置可靠性、持久性等。我们将在专门的QoS教程中详细讨论。目前,使用一个小的整数(如10)作为队列深度是常见的起点。
- 可以直接传递一个整数作为队列深度 (e.g.,
self.publisher_.publish(msg_object)
:
将一个已填充的消息对象 msg_object
发送到主题上。msg_object
的类型必须与创建发布者时指定的 msg_type
一致。
订阅者节点用于从特定主题接收消息。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class SimpleSubscriber(Node):
def __init__(self):
super().__init__('my_string_subscriber')
# 参数:
# 1. 消息类型 (e.g., String)
# 2. 主题名称 (e.g., 'status_updates') - 必须与发布者匹配
# 3. 回调函数 (e.g., self.message_callback) - 当收到消息时调用
# 4. QoS Profile 或队列大小
self.subscription_ = self.create_subscription(
String,
'status_updates',
self.message_callback,
10)
self.subscription_ # prevent unused variable warning
def message_callback(self, msg):
# 'msg' 参数是接收到的消息对象,类型与订阅时指定的一致
self.get_logger().info(f'Received: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
subscriber_node = SimpleSubscriber()
try:
rclpy.spin(subscriber_node)
except KeyboardInterrupt:
pass
finally:
subscriber_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
self.create_subscription(msg_type, topic_name, callback_function, qos_profile_or_depth)
:
msg_type
: 期望接收的消息的Python类。topic_name
: 要订阅的主题名称。callback_function
: 一个函数,当订阅者收到此主题上的消息时会被调用。此函数的第一个参数将是接收到的消息对象。qos_profile_or_depth
: QoS配置,与发布者类似。为了成功通信,发布者和订阅者的QoS设置需要兼容。
回调函数是订阅者处理接收到消息的逻辑所在。
- 它必须接受一个参数,即接收到的消息对象。
- 回调函数应该尽量快速执行,避免长时间阻塞,因为它们通常在节点的
spin
循环中被调用。如果需要进行耗时操作,应考虑使用其他机制(如单独的线程,或动作)。
ROS 2提供了强大的命令行工具来检查、调试和与主题交互。
列出系统中当前所有活动的主题。
ros2 topic list
可以添加 -t
选项来同时显示每个主题的消息类型:
ros2 topic list -t
输出示例:
/parameter_events [rcl_interfaces/msg/ParameterEvent]
/rosout [rcl_interfaces/msg/Log]
/status_updates [std_msgs/msg/String]
显示特定主题的详细信息,包括其消息类型、发布者数量和订阅者数量。
ros2 topic info /status_updates
输出示例:
Type: std_msgs/msg/String
Publisher count: 1
Subscription count: 1
实时打印指定主题上发布的消息内容。这是一个非常有用的调试工具。
ros2 topic echo /status_updates
如果 SimplePublisher
正在运行,您会看到它发布的消息不断输出到终端。
您可以指定 --no-arr
来避免打印数组的每个元素,或者 --truncate-length <N>
来截断长消息。
从命令行直接发布一条消息到指定主题。这对于测试订阅者或触发某些行为非常方便。
<topic_name>
: 目标主题。<message_type>
: 要发布的消息类型 (例如std_msgs/msg/String
,geometry_msgs/msg/Twist
)。'<yaml_message_data>'
: YAML格式的消息内容。
示例:发布一个 String
消息:
ros2 topic pub --once /status_updates std_msgs/msg/String '{data: "Manual message from CLI"}'
--once
: 只发布一次然后退出。--rate <N>
: 以N Hz的频率持续发布。
示例:发布一个 Twist
消息 (用于控制机器人移动):
# 假设有一个机器人订阅 /cmd_vel 主题,类型为 geometry_msgs/msg/Twist
ros2 topic pub --rate 1 /cmd_vel geometry_msgs/msg/Twist '{linear: {x: 0.5, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.2}}'
注意YAML格式中的大括号和字段名称。您可以使用 ros2 interface show <message_type>
来查看消息的结构。
测量并显示指定主题的发布频率 (Hz)。
ros2 topic hz /status_updates
这对于检查发布者是否按预期速率发布数据很有用。
测量并显示指定主题的带宽占用 (bytes/sec)。
ros2 topic bw /status_updates
这在处理图像等大数据量主题时有助于分析网络负载。
虽然本教程主要关注使用标准消息,但ROS 2允许您定义自己的自定义消息类型。这通过在功能包中创建 .msg
文件来实现。我们将在专门的“自定义接口”教程中详细介绍如何创建和使用自定义消息。
一个简单的 .msg
文件示例 (my_interfaces/msg/MyCustomData.msg
):
# 这是一个自定义消息的例子
std_msgs/Header header # 可以包含标准消息类型
int32 sensor_id
float64 temperature
string[] data_points
这个文件定义了一个名为 MyCustomData
的消息,它包含一个 Header
,一个 sensor_id
,一个 temperature
,以及一个字符串数组 data_points
。
通过本教程,您应该对ROS 2中的主题和消息有了深入的理解,包括如何使用标准消息类型创建发布者和订阅者,以及如何使用命令行工具进行交互和调试。这是构建分布式机器人应用程序的核心通信机制。
除了主题(Topics)提供的异步发布/订阅通信外,ROS 2还提供了服务(Services)机制,用于同步的请求/响应(Request/Response)式通信。当一个节点(客户端)需要从另一个节点(服务端)获取某些信息或请求其执行某个操作并等待结果时,服务是一个理想的选择。本教程将深入探讨如何在Python中使用rclpy
创建服务客户端和服务端,以及相关的命令行工具。
服务通信模型与主题有显著不同:
- 客户端 (Client): 发起一个请求(Request)到特定的服务,并等待服务端的响应(Response)。
- 服务端 (Server): 监听特定服务的请求。当收到请求时,执行相应的处理逻辑,并返回一个响应给客户端。
- 服务 (Service): 由名称和类型唯一标识。服务类型定义了请求和响应消息的结构。
- 同步性: 从客户端的角度看,服务调用通常是阻塞的(或者通过异步机制模拟阻塞等待),直到收到服务端的响应或超时。服务端处理请求的过程也可能花费一定时间。
关键特性:
- 一对一 (通常): 一个服务请求通常由一个服务端处理并返回一个响应给发起请求的客户端。虽然可以有多个客户端调用同一个服务,或多个服务端提供同名服务(ROS 2通过DDS支持,但通常一个请求只会被一个服务端处理),但基本交互模式是点对点的。
- 阻塞/等待: 客户端发起请求后会等待响应。
- 面向任务/查询: 适用于需要获取计算结果、触发特定动作并确认完成、或查询状态等场景。
与消息类似,ROS 2也提供了一些标准的服务类型。服务类型定义在一个 .srv
文件中,该文件分为两部分,用 ---
分隔:上面是请求(Request)的结构,下面是响应(Response)的结构。
-
std_srvs
: 包含一些非常基础的服务类型,如:Empty.srv
: 请求和响应都为空。常用于触发一个事件并确认其已处理。# Request part is empty --- # Response part is empty
SetBool.srv
: 请求包含一个布尔值,响应包含一个成功标志和一条消息。bool data # 请求:要设置的布尔值 --- bool success # 响应:操作是否成功 string message # 响应:可选的描述信息
-
example_interfaces
: 这个包提供了许多用于教学和示例的接口,包括一些服务类型,例如:AddTwoInts.srv
: 请求包含两个整数,响应包含它们的和。int64 a int64 b --- int64 sum
要使用这些服务,您需要从相应的包中导入它们。服务类型在Python中会生成两个内部类:Request
和 Response
。
from std_srvs.srv import Empty, SetBool
from example_interfaces.srv import AddTwoInts
创建请求和响应对象:
# 创建一个 AddTwoInts 服务的请求对象
add_ints_request = AddTwoInts.Request()
add_ints_request.a = 5
add_ints_request.b = 10
# 创建一个 AddTwoInts 服务的响应对象 (通常在服务端回调中创建和填充)
add_ints_response = AddTwoInts.Response()
add_ints_response.sum = 15 # 假设这是计算结果
服务端节点用于监听特定服务的请求,并在收到请求时执行处理逻辑并返回响应。
# service_server_node.py
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts # 导入服务类型
class MinimalServiceServer(Node):
def __init__(self):
super().__init__('minimal_service_server')
# 参数:
# 1. 服务类型 (e.g., AddTwoInts)
# 2. 服务名称 (e.g., 'add_two_ints_service')
# 3. 回调函数 (e.g., self.add_two_ints_callback) - 当收到请求时调用
self.srv = self.create_service(
AddTwoInts,
'add_two_ints_service',
self.add_two_ints_callback)
self.get_logger().info('Service "add_two_ints_service" is ready.')
def add_two_ints_callback(self, request, response):
# 'request' 参数是接收到的请求对象 (e.g., AddTwoInts.Request)
# 'response' 参数是需要填充并返回的响应对象 (e.g., AddTwoInts.Response)
response.sum = request.a + request.b
self.get_logger().info(
f'Incoming request\na: {request.a} b: {request.b}\nResponding with sum: {response.sum}')
return response # 必须返回填充好的响应对象
def main(args=None):
rclpy.init(args=args)
service_server_node = MinimalServiceServer()
try:
rclpy.spin(service_server_node) # 保持节点运行以接收服务请求
except KeyboardInterrupt:
pass
finally:
service_server_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
self.create_service(srv_type, srv_name, callback_function)
:
srv_type
: 服务的Python类,例如example_interfaces.srv.AddTwoInts
。srv_name
: 服务的名称,字符串。命名约定与主题类似。callback_function
: 一个函数,当服务端收到此服务的请求时会被调用。
回调函数的签名通常是 def callback_name(self, request, response):
self
: 指向节点实例。request
: 客户端发送的请求对象。其类型是服务类型的.Request
内部类 (例如AddTwoInts.Request
)。您可以从此对象中读取请求数据。response
: 一个空的响应对象,由rclpy
在调用回调前创建。其类型是服务类型的.Response
内部类 (例如AddTwoInts.Response
)。您需要在此回调函数中填充此对象的字段。- 返回值: 回调函数必须返回填充好的
response
对象。
服务端回调函数应该高效地完成其工作,因为它们会阻塞客户端(除非客户端使用纯异步调用且不等待结果)。
服务客户端节点用于向特定服务发送请求并接收响应。
# service_client_node.py
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
import sys # 用于获取命令行参数
class MinimalServiceClient(Node):
def __init__(self):
super().__init__('minimal_service_client')
# 参数:
# 1. 服务类型 (e.g., AddTwoInts)
# 2. 服务名称 (e.g., 'add_two_ints_service') - 必须与服务端匹配
self.cli = self.create_client(AddTwoInts, 'add_two_ints_service')
# 等待服务可用,可以设置超时时间 (秒)
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service "add_two_ints_service" not available, waiting again...')
self.req = AddTwoInts.Request() # 创建请求对象
def send_request(self, a, b):
self.req.a = a
self.req.b = b
# 异步调用服务
# call_async返回一个Future对象,可用于稍后获取结果
self.future = self.cli.call_async(self.req)
self.get_logger().info(f'Sending request: a={a}, b={b}')
# 您可以选择在这里等待结果 (同步行为) 或在其他地方处理Future
# 为了简单起见,我们在这里等待
rclpy.spin_until_future_complete(self, self.future) # 会阻塞直到Future完成
if self.future.result() is not None:
response = self.future.result()
self.get_logger().info(f'Service response: sum = {response.sum}')
return response.sum
else:
self.get_logger().error(f'Service call failed: {self.future.exception()}')
return None
def main(args=None):
rclpy.init(args=args)
if len(sys.argv) != 3: # 期望两个命令行参数作为整数
print("Usage: ros2 run <package_name> <executable_name> <int_a> <int_b>")
return
service_client_node = MinimalServiceClient()
try:
a = int(sys.argv[1])
b = int(sys.argv[2])
result = service_client_node.send_request(a, b)
if result is not None:
print(f"Sum: {result}")
except ValueError:
service_client_node.get_logger().error("Invalid input. Please provide two integers.")
except KeyboardInterrupt:
pass
finally:
service_client_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
self.create_client(srv_type, srv_name)
:
srv_type
: 服务的Python类。srv_name
: 要调用的服务名称。
self.cli.wait_for_service(timeout_sec=1.0)
:
在尝试调用服务之前,检查该服务是否已在网络上可用是一个好习惯。此方法会阻塞,直到服务可用或超时。它返回 True
(服务可用) 或 False
(超时)。
self.future = self.cli.call_async(request_object)
:
request_object
: 一个填充好的请求对象 (例如AddTwoInts.Request
)。call_async()
立即返回一个rclpy.task.Future
对象。这表示服务调用是异步发起的。您不会立即得到服务响应。
要获取服务调用的结果,您需要处理 Future
对象:
- 阻塞等待:
rclpy.spin_until_future_complete(node, future_object, timeout_sec=None)
- 这个函数会阻塞当前线程(同时允许节点的其他回调继续处理,因为它会适当地spin节点),直到指定的
future_object
完成(即服务端已响应或发生错误)。 node
: 当前节点实例。future_object
: 从call_async()
返回的Future。timeout_sec
: 可选的等待超时时间。
- 这个函数会阻塞当前线程(同时允许节点的其他回调继续处理,因为它会适当地spin节点),直到指定的
- 检查Future状态:
future_object.done()
返回True
如果Future已完成。 - 获取结果:
future_object.result()
- 如果服务调用成功,返回响应对象 (例如
AddTwoInts.Response
)。 - 如果服务调用期间发生异常(例如服务不存在、服务端崩溃、网络问题),此方法会重新抛出该异常。
- 如果Future尚未完成,调用
result()
可能会阻塞或引发异常,具体取决于Future的实现。通常在done()
为True
后调用。
- 如果服务调用成功,返回响应对象 (例如
- 获取异常:
future_object.exception()
- 如果服务调用失败,返回异常对象。如果成功,则返回
None
。
- 如果服务调用失败,返回异常对象。如果成功,则返回
更纯粹的异步处理 (使用回调):
Future
对象还允许您添加一个完成时的回调函数:
# ... 在 MinimalServiceClient 中 ...
# def send_request_async_with_callback(self, a, b):
# self.req.a = a
# self.req.b = b
# self.future = self.cli.call_async(self.req)
# self.future.add_done_callback(self.handle_service_response)
# self.get_logger().info(f'Sending request: a={a}, b={b} (async with callback)')
# def handle_service_response(self, future):
# try:
# response = future.result()
# self.get_logger().info(f'Service response (from callback): sum = {response.sum}')
# except Exception as e:
# self.get_logger().error(f'Service call failed (from callback): {e}')
这种方式下,send_request
函数会立即返回,而响应将在未来的某个时刻由 handle_service_response
处理。节点需要持续 spin()
才能使这个回调被触发。
ROS 2提供了用于检查和与服务交互的命令行工具。
列出系统中当前所有可用的服务。
ros2 service list
可以添加 -t
选项来同时显示每个服务的类型:
ros2 service list -t
输出示例 (如果 MinimalServiceServer
正在运行):
/add_two_ints_service [example_interfaces/srv/AddTwoInts]
/minimal_service_server/describe_parameters [rcl_interfaces/srv/DescribeParameters]
# ... (其他默认参数服务)
显示特定服务的类型。
ros2 service type /add_two_ints_service
输出:
example_interfaces/srv/AddTwoInts
显示服务类型定义的结构(请求和响应部分)。
ros2 interface show example_interfaces.srv.AddTwoInts
输出:
int64 a
int64 b
---
int64 sum
从命令行直接调用一个服务并打印其响应。
<service_name>
: 目标服务。<service_type>
: 服务的类型。'<yaml_request_data>'
: YAML格式的请求数据。
示例:调用我们创建的 add_two_ints_service
:
ros2 service call /add_two_ints_service example_interfaces/srv/AddTwoInts '{a: 7, b: 8}'
输出示例:
requester: making request: example_interfaces.srv.AddTwoInts_Request(a=7, b=8)
response:
example_interfaces.srv.AddTwoInts_Response(sum=15)
这对于快速测试服务端非常有用。
与消息类似,您可以通过在功能包中创建 .srv
文件来定义自己的自定义服务类型。 .srv
文件结构如下:
# Request part
request_field1 type1
request_field2 type2
---
# Response part
response_field1 type3
response_field2 type4
---
(三个连字符) 用于分隔请求和响应的定义。我们将在专门的“自定义接口”教程中详细介绍如何创建和使用自定义服务。
通过本教程,您应该对ROS 2中的服务有了深入的理解,包括其请求/响应模型、如何创建服务端和客户端节点,以及如何使用命令行工具进行交互。服务是实现节点间同步通信和任务执行的关键机制。
除了主题(Topics)用于异步流式数据传输和服务(Services)用于同步请求/响应交互外,ROS 2还提供了动作(Actions)机制。动作适用于需要执行长时间运行的任务,并且在此过程中能够提供反馈(Feedback)以及最终结果(Result)的场景。客户端发起一个目标(Goal),动作服务端接受目标并开始执行,期间可以周期性地向客户端发送反馈,任务完成后返回一个最终结果。客户端也可以请求取消正在执行的目标。
动作通信模型包含以下几个关键部分:
- 动作客户端 (Action Client):
- 向动作服务端发送一个目标 (Goal),描述了要执行的任务。
- 可以选择接收来自服务端的周期性反馈 (Feedback),了解任务的当前进展。
- 等待并接收任务完成后的最终结果 (Result)。
- 可以请求取消 (Cancel) 一个已发送的目标。
- 动作服务端 (Action Server):
- 接收来自客户端的目标请求。
- 决定是否接受 (Accept) 或拒绝 (Reject) 该目标。
- 如果接受,开始执行目标任务。
- 在执行过程中,可以周期性地向客户端发布反馈。
- 任务完成后,向客户端发送最终结果 (成功、失败或被抢占)。
- 处理来自客户端的取消请求。
- 动作 (Action): 由名称和类型唯一标识。动作类型定义了目标、结果和反馈消息的结构。
- 交互流程:
- 客户端发送目标。
- 服务端响应目标(接受/拒绝)。
- 如果接受,服务端开始执行,并可能发送反馈。
- 服务端完成任务,发送结果。
与服务的区别:
- 反馈: 服务是一次性的请求/响应,没有中间反馈。动作允许在任务执行期间持续提供反馈。
- 可取消性: 动作提供了标准的取消机制。服务一旦调用,通常无法轻易中途取消。
- 长时间运行: 动作设计用于可能需要较长时间才能完成的任务(如导航到某个点、执行一个复杂的机械臂动作序列)。
与消息和服务类似,ROS 2也提供了一些标准动作类型,主要位于 action_tutorials_interfaces
(用于教学) 和其他特定功能的包中 (如 nav2_msgs
包含导航相关的动作)。
动作类型定义在一个 .action
文件中,该文件分为三部分,用 ---
分隔:
- 第一部分:目标 (Goal) 的结构。
- 第二部分:结果 (Result) 的结构。
- 第三部分:反馈 (Feedback) 的结构。
一个常见的教学示例是计算斐波那契数列,其动作定义(例如在 action_tutorials_interfaces/action/Fibonacci.action
)可能如下:
# Goal definition
int32 order # 要计算的斐波那契数列的阶数
---
# Result definition
int32[] sequence # 计算得到的斐波那契数列
---
# Feedback definition
int32[] partial_sequence # 当前已计算的部分斐波那契数列
要使用这些动作,您需要从相应的包中导入它们。动作类型在Python中会生成三个内部类:Goal
, Result
, 和 Feedback
。
# 假设我们有一个名为 action_tutorials_interfaces 的包,其中定义了 Fibonacci.action
from action_tutorials_interfaces.action import Fibonacci
创建目标、结果、反馈对象:
# 创建一个 Fibonacci 动作的目标对象
fib_goal = Fibonacci.Goal()
fib_goal.order = 10
# 创建一个 Fibonacci 动作的结果对象 (通常在服务端回调中创建和填充)
fib_result = Fibonacci.Result()
fib_result.sequence = [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55] # 示例结果
# 创建一个 Fibonacci 动作的反馈对象 (通常在服务端回调中创建和填充)
fib_feedback = Fibonacci.Feedback()
fib_feedback.partial_sequence = [0, 1, 1, 2] # 示例部分反馈
动作服务端节点用于接收目标请求,执行任务,并提供反馈和结果。
# action_server_node.py
import rclpy
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci # 导入动作类型
import time
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci, # 动作类型
'fibonacci', # 动作名称
execute_callback=self.execute_callback, # 主执行回调
goal_callback=self.goal_callback, # 目标接收回调 (可选)
handle_accepted_callback=self.handle_accepted_callback, # 目标接受后回调 (可选)
cancel_callback=self.cancel_callback # 取消请求回调 (可选)
)
self.get_logger().info('Fibonacci Action Server is ready.')
def goal_callback(self, goal_request):
"""决定是否接受或拒绝一个新的目标。"""
self.get_logger().info(f'Received goal request with order {goal_request.order}')
# 示例:只接受阶数大于0的目标
if goal_request.order <= 0:
self.get_logger().info('Rejecting goal: order must be > 0')
return GoalResponse.REJECT
self.get_logger().info('Accepting goal.')
return GoalResponse.ACCEPT
def handle_accepted_callback(self, goal_handle):
"""当一个目标被接受后,在执行回调之前调用。可以用于启动异步任务。"""
self.get_logger().info(f'Goal accepted (ID: {goal_handle.goal_id.uuid.tobytes()}). Executing...')
# 重要:必须显式调用 execute() 来启动 execute_callback
# 通常在新的线程中执行,以避免阻塞 ActionServer 的内部处理
goal_handle.execute() # 将在 ActionServer 的执行器中调度 execute_callback
def cancel_callback(self, cancel_request_handle):
"""决定是否接受或拒绝一个取消请求。"""
self.get_logger().info(f'Received cancel request for goal (ID: {cancel_request_handle.goal_id.uuid.tobytes()})')
# 示例:总是接受取消请求
return CancelResponse.ACCEPT # 也可以是 CancelResponse.REJECT
def execute_callback(self, goal_handle):
"""执行目标任务,发布反馈,并设置最终结果。"""
self.get_logger().info('Executing goal...')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1] # 初始斐波那契序列
# 检查目标是否已被请求取消
if goal_handle.is_cancel_requested:
goal_handle.canceled() # 将目标状态设置为CANCELED
self.get_logger().info('Goal canceled before execution.')
return Fibonacci.Result() # 返回一个空结果或表示取消的结果
order = goal_handle.request.order
if order == 1:
feedback_msg.partial_sequence = [0]
elif order > 1:
for i in range(1, order -1): # 从第三项开始计算
if goal_handle.is_cancel_requested: # 在长时间循环中定期检查取消请求
goal_handle.canceled()
self.get_logger().info('Goal canceled during execution.')
return Fibonacci.Result()
next_val = feedback_msg.partial_sequence[-1] + feedback_msg.partial_sequence[-2]
feedback_msg.partial_sequence.append(next_val)
goal_handle.publish_feedback(feedback_msg) # 发布反馈
self.get_logger().info(f'Feedback: {feedback_msg.partial_sequence}')
time.sleep(0.5) # 模拟耗时操作
# 检查目标是否在执行过程中被最终确定(例如被另一个目标抢占)
if not goal_handle.is_active:
self.get_logger().info('Goal is no longer active (e.g. preempted).')
return Fibonacci.Result() # 如果被抢占,通常也返回一个表示未完成的结果
goal_handle.succeed() # 将目标状态设置为SUCCEEDED
result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
self.get_logger().info(f'Returning result: {result.sequence}')
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
# 使用多线程执行器,以便 ActionServer 的内部处理和 execute_callback 不会互相阻塞
# 如果 execute_callback 是耗时的,这一点尤其重要。
executor = rclpy.executors.MultiThreadedExecutor()
rclpy.spin(fibonacci_action_server, executor=executor)
fibonacci_action_server.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
ActionServer(node, action_type, action_name, execute_callback, goal_callback=None, handle_accepted_callback=None, cancel_callback=None, result_timeout=10.0, ...)
:
node
: 宿主节点实例。action_type
: 动作的Python类 (例如Fibonacci
)。action_name
: 动作的名称 (字符串)。execute_callback
: 必需。当目标被接受并准备执行时调用的主回调函数。原型:def execute_cb(self, goal_handle)
。它必须返回一个结果对象 (例如Fibonacci.Result
)。goal_callback
: (可选) 当收到新的目标请求时调用。原型:def goal_cb(self, goal_request)
。它必须返回GoalResponse.ACCEPT
或GoalResponse.REJECT
。如果未提供,默认接受所有目标。handle_accepted_callback
: (可选) 当目标被goal_callback
接受后,在execute_callback
之前立即调用。原型:def handle_accepted_cb(self, goal_handle)
。它可以用来启动一个后台线程来执行耗时任务,然后在此回调中调用goal_handle.execute()
。cancel_callback
: (可选) 当收到取消请求时调用。原型:def cancel_cb(self, cancel_request_handle)
。它必须返回CancelResponse.ACCEPT
或CancelResponse.REJECT
。如果未提供,默认拒绝所有取消请求。result_timeout
: (可选,默认10秒) 如果execute_callback
返回后,客户端在指定时间内没有请求结果,则结果将被丢弃。
在 execute_callback
, handle_accepted_callback
中,goal_handle
对象是与特定目标实例交互的关键。它提供了以下方法:
goal_handle.request
: 包含客户端发送的目标数据 (例如Fibonacci.Goal
对象)。goal_handle.goal_id
: 目标的唯一ID。goal_handle.is_active
: 如果目标当前处于活动状态(正在执行),则为True
。goal_handle.is_cancel_requested
: 如果客户端已请求取消此目标,则为True
。goal_handle.publish_feedback(feedback_msg)
: 发布反馈消息。goal_handle.succeed()
: 标记目标成功完成。必须在execute_callback
返回结果之前调用。goal_handle.canceled()
: 标记目标已被取消。goal_handle.abort()
: 标记目标执行失败/中止。goal_handle.execute()
: (主要在handle_accepted_callback
中使用)调度execute_callback
的执行。
重要: execute_callback
通常应该在一个单独的线程中运行,以避免阻塞 ActionServer
的主处理循环(例如,它需要能够响应新的目标请求或取消请求)。rclpy.executors.MultiThreadedExecutor
可以帮助实现这一点,或者您可以在 handle_accepted_callback
中手动创建线程来运行 goal_handle.execute()
或实际的业务逻辑。如果 execute_callback
本身是异步的(例如使用 async def
并且 ActionServer
配置为使用支持 asyncio
的执行器),那么 это也可以。
动作客户端节点用于向动作服务端发送目标,并处理反馈和最终结果。
# action_client_node.py
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci # 导入动作类型
import sys
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self.get_logger().info('Waiting for Fibonacci action server...')
self._action_client.wait_for_server() # 等待服务器可用
self.get_logger().info(f'Sending goal request with order {order}...')
# send_goal_async 返回一个 Future 对象,该 Future 解析为一个 GoalHandle
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback) # 提供反馈回调
# 为 GoalHandle 的 Future 添加完成回调
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
"""处理来自服务器的对目标请求的初始响应(接受/拒绝)。"""
goal_handle = future.result() # 获取 GoalHandle
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
# GoalHandle 也有一个 Future,用于获取最终结果
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
"""处理从服务器获得的最终结果。"""
result = future.result().result # 第一个 .result() 是 Future 的结果 (WrappedResult), 第二个 .result 是 WrappedResult 内部的实际结果消息
status = future.result().status # 获取目标状态 (e.g., SUCCEEDED, CANCELED, ABORTED)
self.get_logger().info(f'Result received: sequence={result.sequence}, status={status}')
rclpy.shutdown() # 示例:收到结果后关闭节点
def feedback_callback(self, feedback_msg_wrapper):
"""处理从服务器收到的反馈。"""
feedback = feedback_msg_wrapper.feedback # 反馈消息本身在 .feedback 属性中
self.get_logger().info(f'Received feedback: {feedback.partial_sequence}')
def cancel_goal(self): # 示例:如何发送取消请求
if hasattr(self, '_send_goal_future') and self._send_goal_future.done():
goal_handle = self._send_goal_future.result()
if goal_handle.is_active: # 确保目标仍在执行
self.get_logger().info('Canceling goal...')
cancel_future = goal_handle.cancel_goal_async()
cancel_future.add_done_callback(self.cancel_done_callback)
else:
self.get_logger().info('Goal is not active, cannot cancel.')
else:
self.get_logger().info('No active goal to cancel or goal sending not complete.')
def cancel_done_callback(self, future):
cancel_response = future.result()
if len(cancel_response.goals_canceling) > 0:
self.get_logger().info('Goal cancellation request accepted.')
else:
self.get_logger().info('Goal cancellation request rejected or goal already finished.')
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
if len(sys.argv) > 1:
try:
order = int(sys.argv[1])
action_client.send_goal(order)
# 保持节点运行以接收回调
# 在实际应用中,你可能会有更复杂的逻辑或GUI事件循环
rclpy.spin(action_client)
except ValueError:
action_client.get_logger().error("Invalid order provided. Please provide an integer.")
except KeyboardInterrupt:
action_client.get_logger().info('User interrupted. Attempting to cancel goal if active.')
action_client.cancel_goal() # 尝试取消
# 给取消操作一些时间
# 在实际应用中,需要更优雅地处理关闭和取消的同步
time.sleep(1)
else:
action_client.get_logger().info("Please provide an 'order' argument for Fibonacci.")
action_client.destroy_node()
# rclpy.shutdown() 已经在 get_result_callback 中调用,或在 KeyboardInterrupt 后确保调用
if __name__ == '__main__':
main()
ActionClient(node, action_type, action_name, ...)
:
node
: 宿主节点实例。action_type
: 动作的Python类。action_name
: 要调用的动作名称。
self._action_client.send_goal_async(goal_msg, feedback_callback=None)
:
goal_msg
: 填充好的目标消息对象 (例如Fibonacci.Goal
)。feedback_callback
: (可选) 一个回调函数,当收到来自服务端的反馈时调用。原型:def feedback_cb(self, feedback_msg_wrapper)
。注意feedback_msg_wrapper
的.feedback
属性包含实际的反馈消息。- 返回值: 返回一个
Future
对象。这个Future
的结果 (future.result()
) 是一个ClientGoalHandle
对象。
ClientGoalHandle
用于与已发送到服务器的目标进行交互:
goal_handle.accepted
: (在send_goal_async
的Future解析后可用) 如果服务器接受了目标,则为True
。goal_handle.goal_id
: 目标的唯一ID。goal_handle.get_result_async()
: 异步请求此目标的最终结果。返回一个Future
对象。这个Future
的结果 (future.result()
) 是一个rclpy.action.client.ClientGoalHandle.WrappedResult
对象,它包含:.status
: 目标的最终状态 (一个枚举值,如GoalStatus.STATUS_SUCCEEDED
)。.result
: 实际的结果消息 (例如Fibonacci.Result
)。
goal_handle.cancel_goal_async()
: 异步请求取消此目标。返回一个Future
对象。这个Future
的结果是一个CancelGoal.Response
消息,指示取消请求是否被接受。
整个交互过程是异步的,涉及多个Future和回调。
ROS 2提供了用于检查和与动作交互的命令行工具。
列出系统中当前所有可用的动作。
ros2 action list
可以添加 -t
选项来同时显示每个动作的类型:
ros2 action list -t
输出示例 (如果 FibonacciActionServer
正在运行):
/fibonacci [action_tutorials_interfaces/action/Fibonacci]
显示特定动作的详细信息,包括其类型、动作服务端数量和动作客户端数量。
ros2 action info /fibonacci
显示动作类型定义的结构(目标、结果、反馈)。
ros2 interface show action_tutorials_interfaces.action.Fibonacci
从命令行直接发送一个目标到指定的动作服务端,并可选地打印反馈和最终结果。
<action_name>
: 目标动作。<action_type>
: 动作的类型。'<yaml_goal_data>'
: YAML格式的目标数据。
示例:发送一个 Fibonacci
目标:
ros2 action send_goal /fibonacci action_tutorials_interfaces/action/Fibonacci '{order: 5}' --feedback
--feedback
: 选项会使工具在目标执行期间打印反馈信息。
工具会显示目标的ID,是否被接受,然后是反馈(如果指定了 --feedback
),最后是结果和状态。您可以在工具运行时按 Ctrl+C
来尝试取消目标。
如前所述,您可以通过在功能包中创建 .action
文件来定义自己的自定义动作类型。文件结构:
# Goal definition
goal_field1 type1
...
---
# Result definition
result_field1 typeA
...
---
# Feedback definition
feedback_field1 typeX
...
三个 ---
分隔符分别界定目标、结果和反馈的定义。我们将在专门的“自定义接口”教程中详细介绍。
通过本教程,您应该对ROS 2中的动作有了深入的理解,包括其目标/反馈/结果模型、如何创建动作服务端和客户端节点,以及如何使用命令行工具进行交互。动作是实现具有反馈和可取消性的长时间运行任务的关键机制。
虽然ROS 2提供了丰富的标准消息(.msg
)、服务(.srv
)和动作(.action
)类型,但在许多实际应用中,您需要定义自己的数据结构来满足特定需求。这些自定义的数据结构被称为“接口”。本教程将指导您如何创建和使用自定义的ROS 2接口。
- 特定数据需求: 当标准接口无法精确或高效地表示您的数据时。例如,您可能需要一个包含特定机器人传感器组合读数的消息,或者一个执行特定设备控制序列的服务。
- 项目专用: 为您的项目定义清晰、自解释的数据结构,可以提高代码的可读性和可维护性。
- 模块化: 将数据结构定义与使用它们的节点分开,有助于更好地组织代码。
自定义接口通过在功能包中创建具有特定扩展名的文本文件来定义:
.msg
文件: 定义消息类型。.srv
文件: 定义服务类型。.action
文件: 定义动作类型。
这些文件使用一种简单的IDL(接口定义语言)风格的语法。
.msg
文件用于定义一个数据结构。每一行定义一个字段,格式为:类型 字段名
。
基本类型:
bool
byte
char
float32
,float64
int8
,uint8
,int16
,uint16
,int32
,uint32
,int64
,uint64
string
数组:
- 固定大小数组:
类型[N] 字段名
(例如int32[5] five_integers
) - 可变大小数组 (动态数组/向量):
类型[] 字段名
(例如float64[] sensor_readings
) - 有界可变大小数组:
类型[<=N] 字段名
(例如string[<=10] limited_strings
,最多10个字符串)
嵌套类型:
您可以包含其他已定义的消息类型 (标准或自定义),格式为:包名/消息名 字段名
。
例如:
std_msgs/Header header # 包含一个标准Header
geometry_msgs/Point position
my_package/OtherMessage nested_data # 包含同包或其他包的自定义消息
常量:
可以定义常量,格式为:类型 常量名=值
。
例如:
uint8 STATUS_OK=0
uint8 STATUS_WARNING=1
uint8 STATUS_ERROR=2
string DEFAULT_NAME="my_robot"
示例 .msg
文件 (learning_interfaces/msg/SensorData.msg
):
# learning_interfaces/msg/SensorData.msg
std_msgs/Header header # 时间戳和坐标系
int32 sensor_id
float64 temperature
float64 humidity
string status
.srv
文件定义了一个请求(Request)结构和一个响应(Response)结构,两者之间用三个连字符 ---
分隔。每一部分的语法与 .msg
文件相同。
示例 .srv
文件 (learning_interfaces/srv/ProcessData.srv
):
# learning_interfaces/srv/ProcessData.srv
# --- Request ---
learning_interfaces/SensorData input_data # 使用上面定义的自定义消息
bool perform_analysis
---
# --- Response ---
bool success
string result_message
float64[] processed_values
.action
文件定义了一个目标(Goal)结构、一个结果(Result)结构和一个反馈(Feedback)结构。它们之间也用三个连字符 ---
分隔。
示例 .action
文件 (learning_interfaces/action/ControlGripper.action
):
# learning_interfaces/action/ControlGripper.action
# --- Goal ---
int32 gripper_id
float32 target_position # 0.0 (open) to 1.0 (closed)
float32 speed
---
# --- Result ---
bool success
string final_status
---
# --- Feedback ---
float32 current_position
float32 effort_applied
通常,自定义接口会放在一个专门的功能包中,例如 my_interfaces
或 learning_interfaces
。这个包主要包含接口定义文件,而不一定包含可执行的节点代码(尽管也可以包含)。
步骤:
-
创建接口功能包: 如果您还没有一个接口包,可以创建一个。这个包的构建类型可以是
ament_cmake
或ament_python
,但即使是纯Python项目,为了生成接口的Python绑定,CMakeLists.txt
也是必需的。推荐使用ament_cmake
作为接口包的构建类型,因为它更直接地支持接口生成。cd ~/ros2_ws/src # 推荐方式:使用 ament_cmake ros2 pkg create --build-type ament_cmake learning_interfaces # 或者,如果希望包主要是Python,仍然需要CMakeLists.txt来处理接口 # ros2 pkg create --build-type ament_python learning_interfaces
-
创建接口目录: 在包内创建与接口类型对应的目录:
msg/
存放.msg
文件srv/
存放.srv
文件action/
存放.action
文件
cd learning_interfaces mkdir msg mkdir srv mkdir action
-
创建接口定义文件: 在相应的目录中创建您的
.msg
,.srv
,.action
文件。例如,在learning_interfaces/msg/
目录下创建SensorData.msg
文件,内容如上所示。
package.xml
文件需要声明对接口生成工具的依赖,并指明该包提供了接口。
打开 learning_interfaces/package.xml
并进行如下修改/添加:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>learning_interfaces</name>
<version>0.0.0</version>
<description>Package for custom learning interfaces (messages, services, actions)</description>
<maintainer email="[email protected]">Your Name</maintainer>
<license>Apache-2.0</license> <!-- 或者您选择的许可证 -->
<!-- 构建工具依赖,用于生成接口代码 -->
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<!-- 如果接口中使用了标准消息类型,如 std_msgs/Header -->
<depend>std_msgs</depend>
<depend>geometry_msgs</depend> <!-- 如果使用了 geometry_msgs -->
<!-- <depend>action_msgs</depend> --><!-- 如果直接使用 action_msgs (通常由rosidl_default_generators处理) -->
<!-- 执行依赖,其他包使用此接口包时需要 -->
<exec_depend>rosidl_default_runtime</exec_depend>
<!-- 表明此包是接口包的一部分 -->
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type> <!-- 即使是Python包,如果主要目的是提供接口,ament_cmake更合适 -->
</export>
</package>
关键修改:
<buildtool_depend>rosidl_default_generators</buildtool_depend>
: 这是生成接口代码(C++, Python等)所必需的。<depend>...</depend>
: 如果您的自定义接口中嵌套了其他包的接口(例如std_msgs/Header
),需要在此处声明对那些包的依赖。<exec_depend>rosidl_default_runtime</exec_depend>
: 运行时依赖。<member_of_group>rosidl_interface_packages</member_of_group>
: 声明这是一个提供ROS IDL接口的包。
即使您的最终节点将使用Python,接口包的 CMakeLists.txt
也需要配置来生成接口代码,包括Python绑定。
打开 learning_interfaces/CMakeLists.txt
并进行如下修改/添加:
cmake_minimum_required(VERSION 3.8)
project(learning_interfaces)
# 默认C++标准
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
endif()
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# 查找必要的包
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED) # 用于生成接口
find_package(std_msgs REQUIRED) # 如果您的接口使用了std_msgs
find_package(geometry_msgs REQUIRED) # 如果您的接口使用了geometry_msgs
# find_package(action_msgs REQUIRED) # 通常包含在rosidl_default_generators中
# === 定义接口文件 ===
# rosidl_generate_interfaces() 函数负责处理 .msg, .srv, .action 文件
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/SensorData.msg"
"srv/ProcessData.srv"
"action/ControlGripper.action"
# 如果接口依赖其他包的接口,需要列出这些依赖
DEPENDENCIES std_msgs geometry_msgs # action_msgs
)
# 安装配置和ament索引
ament_package()
关键修改:
find_package(rosidl_default_generators REQUIRED)
: 查找接口生成器。find_package(std_msgs REQUIRED)
等: 如果您的接口文件(.msg
,.srv
,.action
)中使用了来自其他包(如std_msgs
)的类型,那么rosidl_generate_interfaces
需要知道这些依赖。因此,需要find_package
这些依赖包。rosidl_generate_interfaces(${PROJECT_NAME} ...)
: 这是核心命令。- 第一个参数是当前包名 (
${PROJECT_NAME}
通常就是learning_interfaces
)。 - 后续参数是相对于包根目录的接口文件路径列表 (例如
"msg/SensorData.msg"
)。 DEPENDENCIES
关键字后列出此接口包在生成代码时需要依赖的其他接口包 (例如std_msgs
,geometry_msgs
)。这些必须是之前find_package
找到的包。
- 第一个参数是当前包名 (
完成 package.xml
和 CMakeLists.txt
的修改后,回到您的工作区根目录 (~/ros2_ws
) 并构建接口包:
cd ~/ros2_ws
colcon build --packages-select learning_interfaces
如果构建成功,ROS 2的构建系统会在 install/learning_interfaces/
目录下生成各种语言的接口代码,包括Python模块,以便您可以在Python节点中导入和使用它们。
重要: 构建接口包后,您需要source工作区的 setup.bash
文件,以便ROS 2系统(和您的Python环境)能够找到新生成的接口模块。
source ~/ros2_ws/install/setup.bash
或者,如果您之前已经source过,重新source或打开一个新终端。
一旦接口包成功构建并且环境已source,您就可以像使用标准接口一样在Python节点中导入和使用它们了。
假设我们有一个名为 my_python_pkg
的Python功能包,它依赖于我们刚刚创建的 learning_interfaces
包。
在 my_python_pkg/package.xml
中添加对 learning_interfaces
的依赖:
<!-- ...其他内容... -->
<depend>learning_interfaces</depend>
<!-- ...其他内容... -->
发布者节点 (my_python_pkg/my_python_pkg/custom_sensor_publisher.py
):
import rclpy
from rclpy.node import Node
from learning_interfaces.msg import SensorData # 导入自定义消息
from std_msgs.msg import Header # 如果需要填充Header
class CustomSensorPublisher(Node):
def __init__(self):
super().__init__('custom_sensor_publisher')
self.publisher_ = self.create_publisher(SensorData, 'custom_sensor_topic', 10)
self.timer = self.create_timer(1.0, self.timer_callback)
self.sensor_id_counter = 0
def timer_callback(self):
msg = SensorData()
# 填充Header
msg.header = Header()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'sensor_frame'
# 填充自定义字段
msg.sensor_id = self.sensor_id_counter
msg.temperature = 25.5 + float(self.sensor_id_counter % 10) / 10.0
msg.humidity = 60.0 - float(self.sensor_id_counter % 5)
msg.status = "OK"
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing SensorData: ID={msg.sensor_id}, Temp={msg.temperature:.1f}')
self.sensor_id_counter += 1
def main(args=None):
rclpy.init(args=args)
node = CustomSensorPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
订阅者节点 (my_python_pkg/my_python_pkg/custom_sensor_subscriber.py
):
import rclpy
from rclpy.node import Node
from learning_interfaces.msg import SensorData # 导入自定义消息
class CustomSensorSubscriber(Node):
def __init__(self):
super().__init__('custom_sensor_subscriber')
self.subscription = self.create_subscription(
SensorData,
'custom_sensor_topic',
self.listener_callback,
10)
def listener_callback(self, msg):
self.get_logger().info(
f'Received SensorData:\n'
f' Header: stamp={msg.header.stamp.sec}.{msg.header.stamp.nanosec}, frame_id="{msg.header.frame_id}"\n'
f' ID: {msg.sensor_id}\n'
f' Temperature: {msg.temperature:.2f}\n'
f' Humidity: {msg.humidity:.2f}\n'
f' Status: "{msg.status}"'
)
def main(args=None):
rclpy.init(args=args)
node = CustomSensorSubscriber()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
服务端节点 (my_python_pkg/my_python_pkg/custom_process_server.py
):
import rclpy
from rclpy.node import Node
from learning_interfaces.srv import ProcessData # 导入自定义服务
from learning_interfaces.msg import SensorData # 可能需要,如果服务请求包含它
class CustomProcessServer(Node):
def __init__(self):
super().__init__('custom_process_server')
self.srv = self.create_service(ProcessData, 'process_sensor_data', self.process_data_callback)
self.get_logger().info("Custom ProcessData service is ready.")
def process_data_callback(self, request, response):
self.get_logger().info(
f'Received ProcessData request:\n'
f' Sensor ID: {request.input_data.sensor_id}\n'
f' Temperature: {request.input_data.temperature}\n'
f' Perform Analysis: {request.perform_analysis}'
)
if request.perform_analysis:
# 模拟数据处理
response.processed_values = [request.input_data.temperature * 2.0, request.input_data.humidity / 2.0]
response.result_message = "Analysis complete."
response.success = True
else:
response.processed_values = []
response.result_message = "Analysis not requested."
response.success = True
return response
def main(args=None):
rclpy.init(args=args)
node = CustomProcessServer()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
使用自定义动作与使用自定义消息和服务类似,您只需从 learning_interfaces.action
导入 ControlGripper
,然后像使用标准动作一样创建 ActionServer
和 ActionClient
。
关键导入:
from learning_interfaces.action import ControlGripper
然后,在 ActionServer
的构造函数和 ActionClient
的构造函数中使用 ControlGripper
作为动作类型。目标、结果和反馈对象将分别是 ControlGripper.Goal
, ControlGripper.Result
, ControlGripper.Feedback
。
-
确保
my_python_pkg
的setup.py
正确配置了入口点 (entry points) 以运行这些Python节点。 -
回到工作区根目录 (
~/ros2_ws
)。 -
构建
my_python_pkg
(如果learning_interfaces
之前已构建并source过,则无需再次构建它,除非接口有变动)。colcon build --packages-select my_python_pkg
-
Source工作区环境:
source install/setup.bash
-
在不同终端运行您的发布者/订阅者或服务端/客户端节点。
例如,运行自定义消息的例子:
- 终端1:
ros2 run my_python_pkg custom_sensor_publisher
- 终端2:
ros2 run my_python_pkg custom_sensor_subscriber
您也可以使用
ros2 topic echo /custom_sensor_topic
来查看消息内容,或使用ros2 service call /process_sensor_data learning_interfaces/srv/ProcessData "{input_data: {sensor_id: 1, temperature: 22.0, humidity: 55.0, status: 'test'}, perform_analysis: true}"
来调用服务。 - 终端1:
通过本教程,您学会了如何定义、构建和使用自定义的ROS 2消息、服务和动作接口,这是根据特定应用需求定制ROS 2系统的重要一步。记住,接口定义包的 CMakeLists.txt
对于生成Python绑定至关重要。
在ROS 2中,参数(Parameters)是一种允许节点在启动时或运行时进行配置的机制。它们使得节点行为更加灵活和可重用,因为可以通过外部方式调整节点的内部设置,而无需修改代码并重新编译。本教程将深入探讨如何在Python节点中使用参数,包括声明、获取、设置参数,响应参数变化,以及使用YAML文件进行配置和相关的命令行工具。
参数对于ROS 2节点至关重要,原因如下:
- 可配置性: 允许用户或系统根据不同环境或需求调整节点的行为。例如,相机节点的曝光时间、控制算法的增益值、传感器的滤波阈值等。
- 可重用性: 同一个节点可以通过不同的参数配置在多种场景下使用。
- 运行时调整: 某些参数可以在节点运行时动态修改,方便调试和优化。
- 分离配置与代码: 将配置信息(如参数值)从代码中分离出来,通常存储在YAML文件中,使得配置管理更加清晰。
每个ROS 2节点都自动拥有一个参数服务端,用于管理其自身的参数。
在使用参数之前,节点必须首先声明它们。声明参数时,可以指定参数的名称、默认值以及一个可选的参数描述符(ParameterDescriptor
),用于定义参数的类型、描述、约束等。
# parameter_basics_node.py
import rclpy
from rclpy.node import Node
from rcl_interfaces.msg import ParameterDescriptor, ParameterType, FloatingPointRange, IntegerRange
from rclpy.exceptions import ParameterNotDeclaredException
class ParameterBasicsNode(Node):
def __init__(self):
super().__init__('parameter_basics_node')
# 1. 声明一个简单的字符串参数带默认值
self.declare_parameter('my_string_param', 'default_string_value')
# 2. 声明一个整型参数带默认值和描述
int_descriptor = ParameterDescriptor(
type=ParameterType.PARAMETER_INTEGER,
description='An example integer parameter.',
# 附加约束:整数范围
integer_range=[IntegerRange(from_value=0, to_value=100, step=1)]
)
self.declare_parameter('my_int_param', 42, int_descriptor)
# 3. 声明一个双精度浮点型参数带默认值和描述
double_descriptor = ParameterDescriptor(
type=ParameterType.PARAMETER_DOUBLE,
description='A double parameter for some gain.',
# 附加约束:浮点数范围
floating_point_range=[FloatingPointRange(from_value=0.0, to_value=1.0)]
)
self.declare_parameter('my_double_param', 0.5, double_descriptor)
# 4. 声明一个布尔参数
bool_descriptor = ParameterDescriptor(
type=ParameterType.PARAMETER_BOOL,
description='A boolean flag to enable/disable a feature.'
)
self.declare_parameter('my_bool_param', True, bool_descriptor)
# 5. 声明一个只读参数 (声明后不能通过服务或回调修改)
readonly_descriptor = ParameterDescriptor(
description='This parameter is read-only after declaration.',
read_only=True # 关键
)
self.declare_parameter('readonly_param', 'cannot_change_me', readonly_descriptor)
self.get_logger().info("Parameters declared.")
# 尝试获取未声明的参数会引发异常
try:
undeclared_param = self.get_parameter('non_existent_param')
except ParameterNotDeclaredException:
self.get_logger().warn("Tried to get 'non_existent_param', which was not declared.")
def main(args=None):
rclpy.init(args=args)
node = ParameterBasicsNode()
# 节点将保持运行,允许外部通过服务查询/设置参数
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
declare_parameter(name, default_value=None, descriptor=ParameterDescriptor(), ignore_override=False)
:
name
(str): 参数的名称。default_value
: 参数的默认值。如果启动时没有提供其他值,则使用此默认值。如果default_value
是None
且没有外部提供值,则参数类型为PARAMETER_NOT_SET
。descriptor
(ParameterDescriptor
): 参数的元数据和约束。ignore_override
(bool): 如果为True
,即使外部提供了参数覆盖(例如通过Launch文件),节点也会使用代码中指定的default_value
。通常保持为False
。
可以一次性声明多个参数,通常用于同一命名空间下的参数组。
# ... (在 ParameterBasicsNode 中) ...
# 6. 声明一组参数,使用命名空间
namespace = 'motion_control'
self.declare_parameters(
namespace=namespace,
parameters=[
('max_velocity', 1.5, ParameterDescriptor(type=ParameterType.PARAMETER_DOUBLE, description="Max linear velocity")),
('max_acceleration', 0.5, ParameterDescriptor(type=ParameterType.PARAMETER_DOUBLE, description="Max linear acceleration")),
(namespace + '.pid.kp', 1.2), # 也可以直接在名称中包含命名空间
(namespace + '.pid.ki', 0.1),
(namespace + '.pid.kd', 0.05)
]
)
self.get_logger().info(f"Parameters in namespace '{namespace}' declared.")
declare_parameters(namespace, parameters)
:
namespace
(str): 应用于此批次所有参数的命名空间前缀。参数的最终名称将是namespace.parameter_name
。parameters
(list of tuples): 每个元组可以是(name, default_value)
或(name, default_value, descriptor)
。
ParameterDescriptor
用于提供关于参数的更多信息和约束:
type
(ParameterType
枚举): 指定参数的预期类型,如PARAMETER_BOOL
,PARAMETER_INTEGER
,PARAMETER_DOUBLE
,PARAMETER_STRING
,PARAMETER_BYTE_ARRAY
,PARAMETER_BOOL_ARRAY
,PARAMETER_INTEGER_ARRAY
,PARAMETER_DOUBLE_ARRAY
,PARAMETER_STRING_ARRAY
。如果声明时提供了默认值,类型通常可以从默认值推断出来,但显式指定类型更安全。name
(str): 参数的名称 (通常由declare_parameter
自动填充)。description
(str): 对参数用途的人类可读描述。additional_constraints
(str): 附加的文本约束描述。read_only
(bool): 如果为True
,则参数在声明后不能通过参数服务或回调函数修改。dynamic_typing
(bool): 如果为True
,允许参数类型在运行时改变(通常不推荐)。floating_point_range
(list ofFloatingPointRange
): 约束浮点参数的取值范围。integer_range
(list ofIntegerRange
): 约束整型参数的取值范围。IntegerRange
包含from_value
,to_value
,step
。
声明参数后,可以在节点代码中获取其当前值。
# ... (在 ParameterBasicsNode 中) ...
def get_and_log_parameters(self): # 可以在初始化后或定时器中调用
# 获取单个参数
my_str = self.get_parameter('my_string_param').value
my_int = self.get_parameter('my_int_param').value
# 或者直接获取值
my_double = self.get_parameter_value('my_double_param')
self.get_logger().info(f"Retrieved parameters: string='{my_str}', int={my_int}, double={my_double:.2f}")
# 获取带命名空间的参数
max_vel = self.get_parameter('motion_control.max_velocity').value
kp = self.get_parameter('motion_control.pid.kp').value
self.get_logger().info(f"Motion control: max_vel={max_vel}, Kp={kp}")
# 批量获取参数
param_names_to_get = ['my_string_param', 'my_int_param', 'motion_control.pid.ki']
retrieved_params = self.get_parameters(param_names_to_get)
for param in retrieved_params:
self.get_logger().info(f"Bulk get: {param.name} = {param.value} (type: {param.type_})")
# ... (在 main 中,可以调用 node.get_and_log_parameters() 一次) ...
# node = ParameterBasicsNode()
# node.get_and_log_parameters() # 调用一次以打印初始值
# rclpy.spin(node)
Node.get_parameter(name)
: 返回一个rcl_interfaces.msg.Parameter
对象。该对象包含.name
,.value
, 和.type_
(实际为ParameterValue
类型) 属性。要获取实际值,请使用.value
。Node.get_parameter_value(name)
: 一个便捷方法,直接返回参数的值。如果参数未声明,则引发ParameterNotDeclaredException
。Node.get_parameters(names_list)
: 批量获取参数,返回一个Parameter
对象列表。Node.has_parameter(name)
: 检查参数是否已声明。
参数值可以通过多种方式设置:
- 声明时的默认值: 如上所示。
- Launch 文件: 通过在启动节点时传递参数文件或直接指定值(见下文YAML部分)。
- 命令行: 使用
ros2 param set
(见下文命令行部分)。 - 节点内部: 节点可以设置自己的参数值,但这通常用于响应某些内部逻辑,而不是常规配置。
- 参数回调: 当外部尝试设置参数时,回调函数可以验证并接受或拒绝更改。
节点内部设置参数:
# ... (在 ParameterBasicsNode 中) ...
def set_some_parameters_internally(self):
# 假设我们想根据某些条件更新参数
# 注意:通常不建议节点频繁修改自己声明的、主要用于外部配置的参数
# 但对于内部状态或动态调整的参数可能是合理的。
new_string_param = rclpy.parameter.Parameter(
'my_string_param',
rclpy.parameter.Parameter.Type.STRING,
'internally_set_value'
)
new_int_param = rclpy.parameter.Parameter(
'my_int_param',
rclpy.parameter.Parameter.Type.INTEGER,
99
)
# set_parameters 需要一个 Parameter 列表
set_result_list = self.set_parameters([new_string_param, new_int_param])
for result in set_result_list:
if result.successful:
self.get_logger().info(f"Successfully set parameter (internally)")
else:
self.get_logger().warn(f"Failed to set parameter (internally): {result.reason}")
# 尝试设置只读参数将会失败 (如果通过参数服务,此处直接调用可能绕过只读检查,需谨慎)
# readonly_update = rclpy.parameter.Parameter('readonly_param', rclpy.parameter.Parameter.Type.STRING, 'new_value')
# set_readonly_result = self.set_parameters([readonly_update])
# self.get_logger().info(f"Set readonly param result: {set_readonly_result[0].successful}, reason: {set_readonly_result[0].reason}")
# ... (在 main 中,可以在 spin 之前调用一次 node.set_some_parameters_internally() 来观察效果) ...
Node.set_parameters(parameters_list)
: 尝试设置参数列表。parameters_list
是rclpy.parameter.Parameter
对象的列表。返回一个SetParametersResult
对象列表,指示每个参数的设置是否成功。Node.set_parameters_atomically(parameters_list)
: 与set_parameters
类似,但保证原子性:要么所有参数都成功设置,要么一个都不设置。
当外部(例如通过 ros2 param set
或其他节点的服务调用)尝试更改节点的参数时,可以注册一个回调函数。这个回调函数在参数实际更改之前被调用,允许节点验证新值并决定是否接受更改。
# parameter_callback_node.py
import rclpy
from rclpy.node import Node
from rcl_interfaces.msg import ParameterDescriptor, ParameterType, SetParametersResult
from rclpy.parameter import Parameter
class ParameterCallbackNode(Node):
def __init__(self):
super().__init__('parameter_callback_node')
# 声明一个可调参数
tunable_param_descriptor = ParameterDescriptor(
type=ParameterType.PARAMETER_INTEGER,
description='An integer parameter that can be tuned, must be positive.'
)
self.declare_parameter('tunable_int', 10, tunable_param_descriptor)
# 注册参数设置回调函数
self.add_on_set_parameters_callback(self.parameters_callback)
self.get_logger().info("Parameter callback node started. Try setting 'tunable_int'.")
self.get_logger().info(f"Initial tunable_int: {self.get_parameter('tunable_int').value}")
def parameters_callback(self, params: list[Parameter]):
# params 是一个包含尝试设置的参数的列表
# 回调必须返回一个 SetParametersResult 对象
result = SetParametersResult(successful=True) # 默认为成功
for param in params:
if param.name == "tunable_int":
current_value = self.get_parameter("tunable_int").value # 获取当前值,用于比较或记录
self.get_logger().info(f"Attempting to change 'tunable_int' from {current_value} to {param.value}")
if param.type_ != Parameter.Type.INTEGER:
result.successful = False
result.reason = f"Parameter 'tunable_int' must be an integer, got {param.type_}"
self.get_logger().warn(result.reason)
return result # 一旦失败,立即返回
if param.value <= 0:
result.successful = False
result.reason = "Parameter 'tunable_int' must be positive."
self.get_logger().warn(result.reason)
return result
self.get_logger().info(f"'tunable_int' successfully validated, will be set to {param.value}.")
else:
# 如果尝试设置未声明或不希望通过回调更改的参数
self.get_logger().warn(f"Ignoring attempt to set unhandled parameter '{param.name}' via callback.")
return result # 所有检查通过,返回成功
def main(args=None):
rclpy.init(args=args)
node = ParameterCallbackNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
# 当节点销毁时,回调会自动注销
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Node.add_on_set_parameters_callback(callback_function)
:
callback_function
: 原型为def my_callback(self, params: List[Parameter]) -> SetParametersResult
。params
: 一个rclpy.parameter.Parameter
对象列表,包含所有正在尝试设置的参数(名称和新值)。- 返回值: 必须是一个
rcl_interfaces.msg.SetParametersResult
对象。successful
(bool): 如果所有参数更改都被接受,则为True
。reason
(str): 如果successful
为False
,提供拒绝更改的原因。
如果回调返回 successful=True
,参数值将被更新。否则,参数值保持不变。一个节点可以注册多个参数回调,它们会按注册顺序被调用。任何一个回调返回 False
都会阻止参数的更改。
将参数配置存储在YAML文件中是一种常见的做法,便于管理和版本控制。Launch文件可以直接加载这些YAML文件。
YAML 文件结构:
# my_params.yaml
parameter_basics_node: # 节点名称 (与节点初始化时使用的名称完全匹配)
ros__parameters: # 固定前缀
my_string_param: "value_from_yaml"
my_int_param: 99
my_double_param: 0.75
my_bool_param: false
# readonly_param: "attempt_to_set_readonly" # 即使在YAML中设置只读参数,通常也会被忽略或报错
motion_control: # 对应声明时的命名空间
max_velocity: 2.0
max_acceleration: 0.8
pid: # 嵌套命名空间
kp: 1.5
ki: 0.15
kd: 0.03
# 如果有其他节点,可以在同一个YAML文件中为它们配置参数
# another_node_name:
# ros__parameters:
# some_param: value
- 顶层键是节点名称。
- 每个节点下必须有一个
ros__parameters:
键。 - 然后是参数名和对应的值。嵌套结构可以用于匹配参数声明时的命名空间。
通过Launch文件加载YAML参数:
# my_launch_file.py
from launch import LaunchDescription
from launch_ros.actions import Node
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
# 获取参数文件的路径
my_package_name = 'my_python_pkg' # 假设您的包名
config_file_path = os.path.join(
get_package_share_directory(my_package_name),
'config', # 假设YAML文件在包的 'config' 目录下
'my_params.yaml'
)
return LaunchDescription([
Node(
package=my_package_name,
executable='parameter_basics_node_executable', # 您的节点可执行文件名
name='parameter_basics_node', # 确保与YAML中的节点名匹配
output='screen',
emulate_tty=True,
parameters=[config_file_path] # 传递参数文件列表
),
Node(
package=my_package_name,
executable='parameter_callback_node_executable',
name='parameter_callback_node',
output='screen',
emulate_tty=True,
# 也可以直接在Launch文件中指定参数值
parameters=[
{'tunable_int': 25},
# {'another_param_for_this_node': 'value'}
]
)
])
在 launch_ros.actions.Node
的 parameters
参数中:
- 可以传递一个包含YAML文件路径的列表。
- 也可以传递一个字典列表,每个字典直接定义参数键值对,例如
[{'param_name': value}]
。 - 还可以混合使用文件路径和字典。
当节点通过Launch文件启动并加载了参数文件时,这些值会覆盖在代码中 declare_parameter
时指定的默认值(除非 ignore_override=True
)。
ROS 2提供了 ros2 param
系列命令行工具来与正在运行的节点的参数进行交互。
-
ros2 param list <node_name>
: 列出指定节点的所有已声明参数。ros2 param list /parameter_basics_node
-
ros2 param get <node_name> <param_name>
: 获取指定节点上特定参数的当前值。ros2 param get /parameter_basics_node my_int_param
-
ros2 param set <node_name> <param_name> <value>
: 设置指定节点上特定参数的值。ros2 param set /parameter_callback_node tunable_int 50 # 应该成功 ros2 param set /parameter_callback_node tunable_int -5 # 应该被回调拒绝
<value>
的格式取决于参数类型,通常是直接的字面量。对于字符串,如果包含空格,可能需要引号。对于布尔值,使用true
或false
。 -
ros2 param describe <node_name> <param_name>
: 显示特定参数的描述符信息(类型、描述、约束等)。ros2 param describe /parameter_basics_node my_int_param
-
ros2 param dump <node_name> [--output-dir <directory>] > params.yaml
: 将指定节点当前所有参数的值转储(保存)到一个YAML文件中。ros2 param dump /parameter_basics_node > current_basics_params.yaml
-
ros2 param load <node_name> <yaml_file_path>
: 从指定的YAML文件加载参数到正在运行的节点。# 假设有一个 new_params.yaml 文件 # ros2 param load /parameter_basics_node ./new_params.yaml
注意:
ros2 param load
尝试设置参数,如果节点有参数回调,这些回调会被触发并可能拒绝更改。
- 配置: 用于配置传感器采样率、控制算法增益、机器人物理属性、行为阈值等。
- 模式切换: 使用布尔或枚举型参数在不同操作模式间切换。
- 调试: 动态调整参数以观察系统行为变化。
- 可重用性: 通过参数化使节点适用于不同机器人或环境。
- 默认值: 为所有参数提供合理的默认值。
- 描述符: 尽可能使用
ParameterDescriptor
来指定类型、描述和约束,这有助于工具(如ros2 param describe
)和用户理解参数。 - YAML配置: 优先使用YAML文件进行静态配置,并将其纳入版本控制。
- 参数回调: 用于验证参数更改,防止无效值导致节点行为异常。仅在必要时使用,避免回调逻辑过于复杂。
- 命名约定: 使用清晰、一致的参数命名,可以利用命名空间进行组织。
通过本教程,您应该对ROS 2中的参数机制有了全面的理解,包括如何在Python节点中声明、获取、设置参数,如何响应参数变化,以及如何利用YAML文件和命令行工具进行参数管理。参数是构建健壮、灵活的ROS 2应用程序的关键组成部分。
在机器人系统中,通常存在多个坐标系(Frames of Reference)。例如,机器人本体有一个坐标系 (base_link
),每个传感器(如激光雷达、摄像头)有其自身的坐标系,地图也有一个坐标系 (map
),而目标物体可能在世界坐标系 (odom
或 world
) 中。TF2是一个ROS 2库,用于跟踪和管理这些坐标系之间的关系,并允许您在任意两个已连接的坐标系之间进行坐标点、向量或位姿的变换。
本教程将介绍TF2的基本概念,以及如何在Python节点中使用tf2_ros
库来发布(广播)和监听(查询)坐标变换。
- 坐标系 (Frame): 一个具有原点和三个正交轴(X, Y, Z)的参考系统。每个坐标系都有一个唯一的字符串ID,称为
frame_id
(例如,base_link
,laser_frame
,map
)。 - 变换 (Transform): 描述一个坐标系相对于另一个坐标系的位置和姿态(旋转)。一个变换通常由一个平移向量(translation)和一个旋转四元数(rotation quaternion)表示。
- TF树 (TF Tree): 所有已知的坐标系及其它们之间的变换关系构成一个树状结构(或有时是多个树,但不应有循环)。每个变换都定义了一个父坐标系(parent frame)和一个子坐标系(child frame)。
- 时间戳 (Timestamp): 每个变换都关联一个时间戳,表示该变换在该特定时间有效。TF2可以处理随时间动态变化的坐标系关系。
TF2的作用:
- 保持坐标系关系同步: 允许系统中的不同部分了解各个组件的相对位置和姿态。
- 数据转换: 将一个坐标系下的数据(如传感器读数、目标点)转换到另一个坐标系下,以便进行比较、融合或控制。
TF2使用 geometry_msgs/msg/TransformStamped
消息来表示和传输单个坐标变换。其结构如下:
# geometry_msgs/msg/TransformStamped.msg
std_msgs/Header header # 包含时间戳 (stamp) 和父坐标系的ID (frame_id)
string child_frame_id # 子坐标系的ID
geometry_msgs/Transform transform # 变换本身 (平移和旋转)
header.stamp
: 此变换有效的时间点。header.frame_id
: 父坐标系的名称。child_frame_id
: 子坐标系的名称。transform
: 一个geometry_msgs/msg/Transform
对象,包含:translation
(geometry_msgs/msg/Vector3
): 从父坐标系原点到子坐标系原点的平移向量 (x, y, z),表示在父坐标系下。rotation
(geometry_msgs/msg/Quaternion
): 将父坐标系的姿态旋转到子坐标系姿态的四元数 (x, y, z, w)。
重要理解: TransformStamped
描述的是 从 header.frame_id
(父) 到 child_frame_id
(子) 的变换。也就是说,它定义了子坐标系在父坐标系中的位置和姿态。
节点如果知道某些坐标系之间的关系(例如,通过机器人模型URDF、传感器标定或运动学计算),就需要将这些变换发布到TF2系统中,供其他节点使用。这通过 tf2_ros.TransformBroadcaster
(用于动态变换) 或 tf2_ros.StaticTransformBroadcaster
(用于不随时间改变的静态变换) 来完成。
动态变换是指随时间可能发生变化的坐标系关系,例如机器人底盘相对于里程计坐标系的移动。
# dynamic_tf_broadcaster_node.py
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import TransformStamped
from tf2_ros import TransformBroadcaster
import math # 用于计算旋转
class DynamicFrameBroadcaster(Node):
def __init__(self):
super().__init__('dynamic_frame_broadcaster')
self.br = TransformBroadcaster(self) # 创建TransformBroadcaster实例
self.timer = self.create_timer(0.1, self.broadcast_timer_callback) # 10Hz
self.declare_parameter('parent_frame', 'world')
self.declare_parameter('child_frame', 'dynamic_child')
self.x_translation = 0.0
self.angle = 0.0
def broadcast_timer_callback(self):
parent_frame = self.get_parameter('parent_frame').value
child_frame = self.get_parameter('child_frame').value
t = TransformStamped()
# 填充Header
t.header.stamp = self.get_clock().now().to_msg()
t.header.frame_id = parent_frame # 父坐标系
t.child_frame_id = child_frame # 子坐标系
# 模拟子坐标系在父坐标系下的平移和旋转
self.x_translation += 0.01
if self.x_translation > 1.0:
self.x_translation = 0.0
self.angle += 0.05 # 弧度
if self.angle > 2 * math.pi:
self.angle -= 2 * math.pi
# 设置平移
t.transform.translation.x = self.x_translation
t.transform.translation.y = 0.5 # 固定Y平移
t.transform.translation.z = 0.0
# 设置旋转 (绕Z轴旋转)
# 四元数表示: q = [cos(angle/2), sin(angle/2)*axis_x, sin(angle/2)*axis_y, sin(angle/2)*axis_z]
# 对于绕Z轴: axis_x=0, axis_y=0, axis_z=1
t.transform.rotation.x = 0.0
t.transform.rotation.y = 0.0
t.transform.rotation.z = math.sin(self.angle / 2)
t.transform.rotation.w = math.cos(self.angle / 2)
# 发送变换
self.br.sendTransform(t)
# self.get_logger().info(f"Publishing transform from '{parent_frame}' to '{child_frame}'")
def main(args=None):
rclpy.init(args=args)
node = DynamicFrameBroadcaster()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
TransformBroadcaster(self)
: 创建一个广播器实例,与当前节点关联。self.br.sendTransform(transform_stamped_message)
: 发布一个TransformStamped
消息。TF2系统会自动将这些消息发布到一个名为/tf
的特殊主题上(对于动态变换)。
静态变换是指那些一旦定义就不会改变的坐标系关系,例如机器人上固定安装的传感器相对于机器人基座的变换。静态变换由 StaticTransformBroadcaster
发布,它们被发布到 /tf_static
主题,并且通常只发布一次。TF2系统会缓存这些静态变换。
# static_tf_broadcaster_node.py
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import TransformStamped
from tf2_ros.static_transform_broadcaster import StaticTransformBroadcaster # 注意导入
import math
class StaticFrameBroadcaster(Node):
def __init__(self):
super().__init__('static_frame_broadcaster')
self.static_br = StaticTransformBroadcaster(self) # 创建StaticTransformBroadcaster
# 创建并填充一个TransformStamped消息
# 示例:激光雷达相对于机器人底盘 (`base_link`) 的变换
static_transform_stamped = TransformStamped()
static_transform_stamped.header.stamp = self.get_clock().now().to_msg() # 时间戳仍然需要
static_transform_stamped.header.frame_id = 'base_link' # 父坐标系
static_transform_stamped.child_frame_id = 'laser_sensor_frame' # 子坐标系
# 假设激光雷达在base_link前方0.2米,上方0.1米处
static_transform_stamped.transform.translation.x = 0.2
static_transform_stamped.transform.translation.y = 0.0
static_transform_stamped.transform.translation.z = 0.1
# 假设激光雷达没有额外的旋转 (与base_link姿态一致)
static_transform_stamped.transform.rotation.x = 0.0
static_transform_stamped.transform.rotation.y = 0.0
static_transform_stamped.transform.rotation.z = 0.0
static_transform_stamped.transform.rotation.w = 1.0
# 发送静态变换 (通常在初始化时发送一次即可)
self.static_br.sendTransform(static_transform_stamped)
self.get_logger().info("Static transform from 'base_link' to 'laser_sensor_frame' published.")
# 注意:对于静态广播器,节点通常不需要spin,除非它还有其他任务。
# 如果只是发布静态TF,节点可以完成初始化后就结束。
# 但为了示例的完整性,我们让它保持运行。
def main(args=None):
rclpy.init(args=args)
node = StaticFrameBroadcaster()
# 对于仅发布静态TF的节点,spin可能不是必需的,因为它只在启动时发布一次。
# 但如果节点还有其他功能,或者为了保持与其他节点结构一致,可以spin。
try:
# rclpy.spin(node) # 对于纯静态TF发布者,可以注释掉spin,让其执行完就退出
rclpy.get_global_executor().add_node(node) # 另一种保持节点活动的方式,如果需要
while rclpy.ok(): # 简单地保持主线程活动
rclpy.spin_once(node, timeout_sec=0.1) # 允许其他潜在的rclpy活动
if not node.executor or not node.executor.context.ok(): # 检查节点是否已销毁
break
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
StaticTransformBroadcaster(self)
: 创建静态广播器。self.static_br.sendTransform(transform_stamped_message or list_of_messages)
: 发布一个或多个静态变换。这些消息被发送到/tf_static
主题。
节点如果需要知道两个坐标系之间的变换关系,或者需要将数据从一个坐标系转换到另一个,就需要使用 tf2_ros.Buffer
和 tf2_ros.TransformListener
。
tf2_ros.Buffer
: 缓存所有接收到的TF数据(来自/tf
和/tf_static
),并提供查询变换的API。tf2_ros.TransformListener
: 订阅/tf
和/tf_static
主题,并将接收到的变换数据填充到Buffer
中。
# tf_listener_node.py
import rclpy
from rclpy.node import Node
from tf2_ros.buffer import Buffer # TF2 Buffer
from tf2_ros.transform_listener import TransformListener # TF2 Listener
from geometry_msgs.msg import PointStamped # 用于示例点变换
# 导入 tf2_geometry_msgs 以便使用 do_transform_point 等辅助函数 (如果安装了)
# 如果没有安装,需要手动进行点变换的数学计算
try:
from tf2_geometry_msgs import do_transform_point # Python helper
except ImportError:
print("tf2_geometry_msgs not found. Point transformation will be manual if needed.")
def do_transform_point(point_stamped, transform_stamped):
# 简化的手动变换 (仅平移,未实现旋转) - 生产代码中应使用完整的库或实现
# 这只是一个占位符,实际的变换需要四元数乘法等
transformed_point = PointStamped()
transformed_point.header.stamp = point_stamped.header.stamp
transformed_point.header.frame_id = transform_stamped.header.frame_id # Target frame
transformed_point.point.x = point_stamped.point.x + transform_stamped.transform.translation.x
transformed_point.point.y = point_stamped.point.y + transform_stamped.transform.translation.y
transformed_point.point.z = point_stamped.point.z + transform_stamped.transform.translation.z
return transformed_point
import tf_transformations # 用于四元数和变换矩阵操作
class FrameListener(Node):
def __init__(self):
super().__init__('tf_frame_listener')
# 声明参数
self.declare_parameter('target_frame', 'world') # 目标坐标系
self.declare_parameter('source_frame', 'dynamic_child') # 源坐标系
# 创建TF2 Buffer和Listener
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self) # Listener需要Buffer和Node
# 用于示例的点
self.point_in_source_frame = PointStamped()
self.point_in_source_frame.header.frame_id = self.get_parameter('source_frame').value
self.point_in_source_frame.point.x = 1.0
self.point_in_source_frame.point.y = 0.5
self.point_in_source_frame.point.z = 0.0
self.timer = self.create_timer(1.0, self.timer_callback) # 1Hz
def timer_callback(self):
target_frame = self.get_parameter('target_frame').value
source_frame = self.get_parameter('source_frame').value
self.point_in_source_frame.header.frame_id = source_frame # 更新点的源坐标系以防参数更改
try:
# 等待并获取从 source_frame 到 target_frame 的变换
# rclpy.time.Time() 获取最新的可用变换
# self.get_clock().now() 也可以,但 rclpy.time.Time() 更通用于TF
when = rclpy.time.Time() # 或者 self.get_clock().now().to_msg() for a specific time
# 检查变换是否存在
if self.tf_buffer.can_transform(target_frame, source_frame, when, timeout=rclpy.duration.Duration(seconds=0.5)):
trans_stamped = self.tf_buffer.lookup_transform(
target_frame, # 目标坐标系
source_frame, # 源坐标系
when # 时间点 (rclpy.time.Time() for latest)
# 也可以是 rclpy.time.Time(seconds=0) 表示最新的
# 或者是一个特定的时间戳 msg = self.get_clock().now().to_msg()
)
self.get_logger().info(
f"Transform from '{source_frame}' to '{target_frame}':\n"
f" Translation: [{trans_stamped.transform.translation.x:.2f}, "
f"{trans_stamped.transform.translation.y:.2f}, "
f"{trans_stamped.transform.translation.z:.2f}]\n"
f" Rotation (quaternion): [{trans_stamped.transform.rotation.x:.2f}, "
f"{trans_stamped.transform.rotation.y:.2f}, "
f"{trans_stamped.transform.rotation.z:.2f}, "
f"{trans_stamped.transform.rotation.w:.2f}]"
)
# 示例:变换一个点
self.point_in_source_frame.header.stamp = self.get_clock().now().to_msg() # 点的时间戳
# 使用 tf2_geometry_msgs 进行点变换 (推荐)
# transformed_point = do_transform_point(self.point_in_source_frame, trans_stamped)
# 或者手动进行点变换 (更复杂,需要处理旋转)
# 以下是一个使用tf_transformations库进行完整变换的示例
# 1. 将点和平移向量转换为numpy数组
p_source = [self.point_in_source_frame.point.x, self.point_in_source_frame.point.y, self.point_in_source_frame.point.z]
translation_s_to_t = [trans_stamped.transform.translation.x,
trans_stamped.transform.translation.y,
trans_stamped.transform.translation.z]
rotation_s_to_t_quat = [trans_stamped.transform.rotation.x,
trans_stamped.transform.rotation.y,
trans_stamped.transform.rotation.z,
trans_stamped.transform.rotation.w]
# 2. 将四元数转换为旋转矩阵
rot_matrix_s_to_t = tf_transformations.quaternion_matrix(rotation_s_to_t_quat)[:3,:3]
# 3. 变换点: p_target = R_s_t * p_source + T_s_t
p_target = rot_matrix_s_to_t.dot(p_source) + translation_s_to_t
transformed_point_manual = PointStamped()
transformed_point_manual.header.stamp = self.point_in_source_frame.header.stamp
transformed_point_manual.header.frame_id = target_frame
transformed_point_manual.point.x = p_target[0]
transformed_point_manual.point.y = p_target[1]
transformed_point_manual.point.z = p_target[2]
self.get_logger().info(
f"Point ({self.point_in_source_frame.point.x:.2f}, {self.point_in_source_frame.point.y:.2f}) "
f"in '{source_frame}' is "
f"({transformed_point_manual.point.x:.2f}, {transformed_point_manual.point.y:.2f}) "
f"in '{target_frame}' (manual calc)."
)
else:
self.get_logger().warn(
f"Could not get transform from '{source_frame}' to '{target_frame}' within timeout.")
except Exception as e: # tf2通常会抛出特定的异常,如 tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException
self.get_logger().error(f"Failed to get transform or transform point: {e}")
def main(args=None):
rclpy.init(args=args)
node = FrameListener()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
-
self.tf_buffer = Buffer()
: 创建TF缓存。可以指定缓存的持续时间 (默认10秒)。 -
self.tf_listener = TransformListener(self.tf_buffer, self)
: 创建监听器,它会自动订阅/tf
和/tf_static
并用数据填充self.tf_buffer
。 -
self.tf_buffer.can_transform(target_frame, source_frame, time, timeout=Duration)
: 检查在指定time
(或其附近,取决于TF2的插值策略) 是否存在从source_frame
到target_frame
的变换路径。timeout
是等待数据到达的可选超时。 -
self.tf_buffer.lookup_transform(target_frame, source_frame, time, timeout=Duration)
: 获取从source_frame
到target_frame
在指定time
的变换。target_frame
: 目标坐标系 (我们想将数据转换到的坐标系)。source_frame
: 源坐标系 (数据当前所在的坐标系)。time
:rclpy.time.Time
对象。rclpy.time.Time()
(或rclpy.time.Time(seconds=0, nanoseconds=0)
) 通常表示“获取最新的可用变换”。- 传递一个具体的时间戳(例如从消息头中获取)可以获取该时间点的历史变换。
timeout
: 可选的等待数据到达的超时时间 (rclpy.duration.Duration
)。- 如果找不到变换,会抛出相应的
tf2_ros
异常。
-
tf2_geometry_msgs.do_transform_point(point_stamped, transform_stamped)
: (需要安装tf2_geometry_msgs
包) 这是一个便捷函数,用于将geometry_msgs.msg.PointStamped
对象从其原始坐标系转换到由transform_stamped.header.frame_id
指定的目标坐标系。transform_stamped
必须是从点的原始坐标系到目标坐标系的变换。类似地,还有do_transform_pose
,do_transform_vector3
等函数用于其他geometry_msgs
类型。 -
手动变换: 如果不使用
tf2_geometry_msgs
,则需要手动进行矩阵和向量运算。tf_transformations
库 (通常随ROS安装) 提供了四元数、欧拉角、变换矩阵之间转换的函数,非常有用。tf_transformations.quaternion_matrix(quaternion)
: 将四元数转换为4x4齐次变换矩阵。tf_transformations.translation_matrix(translation_vector)
: 创建平移的4x4矩阵。- 可以组合这些矩阵,然后乘以点的齐次坐标
[x, y, z, 1]
。
ros2 run tf2_tools view_frames
: 生成一个PDF文件 (frames.pdf
),可视化当前的TF树结构。非常有用!ros2 run tf2_tools view_frames evince frames.pdf
ros2 run tf2_ros tf2_echo <source_frame> <target_frame>
: 在命令行实时打印从source_frame
到target_frame
的变换。ros2 run tf2_ros tf2_echo world dynamic_child
- RViz2: RViz2是强大的3D可视化工具。
- 添加一个 "TF" Display类型,可以可视化所有已发布的坐标系及其关系。
- 许多其他Display类型(如
LaserScan
,PointCloud2
,Marker
)都有一个 "Target Frame" 或 "Fixed Frame" 属性,RViz2会使用TF2将这些数据显示在选定的目标坐标系中。
- TF树断裂: 如果
lookup_transform
找不到两个坐标系之间的路径,会抛出tf2_ros.ConnectivityException
。确保所有必要的变换都被正确发布。 - 时间同步:
lookup_transform
对时间非常敏感。ExtrapolationException
: 如果请求的时间点早于Buffer中最早的数据或晚于最新的数据(超出了插值范围),会抛出此异常。- 确保发布变换的节点和监听变换的节点大致时间同步(NTP通常能解决)。
rclpy.time.Time()
(或Time(seconds=0)
) 请求最新变换,可以减少外插问题,但可能不适用于需要特定历史时刻变换的场景。- Buffer的缓存时间 (
Buffer(cache_time=Duration(seconds=10.0))
) 也会影响能查询多久以前的数据。
- 坐标系命名约定:
- 使用小写字母、数字和下划线。
- 避免使用
/
开头,除非您明确知道是全局根坐标系(如map
或odom
通常是相对于某个隐式世界根的)。 - 常用标准名称:
base_link
(机器人移动基座),base_footprint
(在地面上的投影),map
(静态世界地图),odom
(里程计坐标系,可能会漂移),camera_link
,laser_frame
等。
tf2_geometry_msgs
: 强烈建议安装并使用tf2_geometry_msgs
Python包(通常通过sudo apt install ros-<distro>-tf2-geometry-msgs
安装)来进行点、位姿等几何类型的变换,它处理了复杂的数学运算并减少了出错的可能。
通过本教程,您应该对ROS 2中的TF2坐标变换系统有了扎实的理解,包括如何发布动态和静态变换,以及如何监听和查询变换以在不同坐标系间转换数据。TF2是构建任何移动或操作机器人的基础。
ROS 2 提供了一系列强大的图形化工具和实用程序,用于可视化数据、监控系统状态、调试节点以及与ROS网络交互。本教程将介绍三个最常用的工具/系统:RViz2(3D可视化工具)、RQT工具套件(插件式图形界面框架)以及ROS 2的日志系统。
RViz2是ROS 2的主要3D可视化工具。它允许您查看来自传感器(如激光雷达、摄像头、点云)、机器人模型(URDF)、TF坐标系、导航数据(如地图、路径、里程计)、标记(Markers)等多种类型的数据,并将它们融合在一个3D场景中。
如果安装了 ros-<distro>-desktop
,RViz2通常已经包含在内。可以通过以下命令启动:
rviz2
或者,如果想加载一个特定的配置文件:
rviz2 -d /path/to/your/config.rviz
RViz2的主要界面组件包括:
- 3D视图 (3D View): 主窗口,显示3D场景。
- 显示面板 (Displays Panel): 左侧面板,用于添加、配置和管理各种“显示类型 (Displays)”。每个Display负责可视化特定类型的数据(例如,一个TF Display显示坐标系,一个LaserScan Display显示激光雷达数据)。
- 视图控制器 (Views Panel): 通常在右上方,允许您选择和配置不同的3D视图控制方式(例如,Orbit, FPS, TopDownOrtho)。
- 工具面板 (Tools Panel): 通常在顶部工具栏,提供交互工具,如发布目标点(
2D Nav Goal
)、测量距离等。 - 全局选项 (Global Options): 在显示面板的顶部,设置全局参数,如固定坐标系 (
Fixed Frame
)。 - 时间面板 (Time Panel): 通常在底部,显示当前ROS时间,并允许控制时间的显示(例如,回放bag文件时)。
-
设置固定坐标系 (Fixed Frame): 这是RViz2中最重要的设置之一。所有其他数据都将相对于这个选定的固定坐标系进行变换和显示。通常选择一个相对静止的坐标系,如
map
,odom
, 或者机器人模型的根连杆base_link
。 在显示面板的 "Global Options" -> "Fixed Frame" 中设置。 -
添加显示类型 (Adding Displays): 点击显示面板左下角的 "Add" 按钮。会弹出一个窗口,列出所有可用的显示类型。选择一个,例如 "TF",然后点击 "OK"。
-
配置显示类型: 每个添加的Display都有其特定的配置选项。例如:
- TF Display:
Show Names
: 是否显示坐标系名称。Show Axes
: 是否显示坐标系轴。Show Arrows
: 是否显示变换关系箭头。Marker Scale
: 调整显示大小。
- LaserScan Display:
Topic
: 选择要订阅的包含sensor_msgs/msg/LaserScan
消息的主题。Style
: 点的样式(Points, Lines, Squares)。Size (m)
或Size (Pixels)
: 点的大小。Color Transformer
: 如何为激光点着色(例如,按强度、按距离)。
- Image Display:
Image Topic
: 选择包含sensor_msgs/msg/Image
消息的主题。
- RobotModel Display:
Description Topic
: 通常是/robot_description
,包含机器人URDF模型的XML字符串。TF Prefix
: 如果机器人模型的TF坐标系有前缀。
- Map Display:
Topic
: 选择包含nav_msgs/msg/OccupancyGrid
消息的主题。Color Scheme
: 地图颜色方案。
- Marker Display / MarkerArray Display:
Marker Topic
: 选择包含visualization_msgs/msg/Marker
或MarkerArray
消息的主题。标记用于在RViz2中绘制自定义的几何形状、文本等。
- TF Display:
-
保存和加载配置:
- File -> Save Config As...: 将当前的RViz2布局和所有Display配置保存到一个
.rviz
文件中。 - File -> Open Config: 加载一个之前保存的
.rviz
文件。 这对于快速恢复常用的可视化设置非常有用。
- File -> Save Config As...: 将当前的RViz2布局和所有Display配置保存到一个
- TF: 可视化坐标系及其关系。
- RobotModel: 显示机器人的3D模型(需要URDF)。
- LaserScan: 显示2D激光雷达数据。
- PointCloud2: 显示3D点云数据。
- Image: 在RViz2的一个面板中显示图像。
- Map: 显示占据栅格地图。
- Path: 显示由一系列位姿组成的路径 (
nav_msgs/msg/Path
)。 - Odometry: 显示机器人的里程计信息(通常是一个箭头表示位姿,以及可选的协方差椭圆)。
- Pose: 显示单个位姿 (
geometry_msgs/msg/PoseStamped
)。 - PoseArray: 显示一组位姿。
- Marker/MarkerArray: 显示由节点发布的自定义可视化标记。非常灵活,可以绘制形状、文本、箭头等。
- Grid: 在3D场景中显示一个参考网格。
通过组合这些Display,您可以构建出强大的可视化界面来理解机器人的状态和其感知到的环境。
RQT是一个基于Qt的图形用户界面框架,它允许通过插件的形式组合各种GUI工具。许多标准的ROS工具都有RQT插件版本。
可以通过以下命令启动RQT主程序:
rqt
启动后,会打开一个空窗口。您可以从菜单栏的 "Plugins" 中选择并添加各种工具插件。
或者,您可以直接启动某个特定的RQT插件:
rqt_graph
rqt_plot
rqt_console
# 等等
-
rqt_graph
(Plugin: Introspection -> Node Graph): 可视化当前ROS网络中活动的节点、主题、服务和动作,以及它们之间的连接关系。非常有助于理解系统的通信架构。- 可以过滤显示的元素类型(例如,只显示节点和主题)。
- 可以高亮特定节点或主题。
-
rqt_plot
(Plugin: Visualization -> Plot): 实时绘制一个或多个主题发布的数据。可以绘制数值型消息字段随时间的变化曲线。- 在 "Topic" 字段输入主题名称(例如
/chatter
)。 - 在 "Field" 字段输入要绘制的消息字段路径(例如
/data
对于std_msgs/String
,或/pose/position/x
对于nav_msgs/Odometry
)。 - 可以添加多个曲线到同一个图表中。
- 在 "Topic" 字段输入主题名称(例如
-
rqt_console
(Plugin: Logging -> Console): 显示来自所有ROS节点的日志消息(DEBUG, INFO, WARN, ERROR, FATAL)。- 可以按严重级别、节点名称、消息内容等进行过滤。
- 可以高亮特定消息。
- 比直接在终端看日志更方便管理。
-
rqt_logger_level
(Plugin: Logging -> Logger Level): 允许您在运行时动态更改节点的日志级别。- 选择一个节点,然后为其日志记录器设置新的级别(例如,从INFO改为DEBUG以获取更详细的输出)。
-
rqt_topic
(Plugin: Introspection -> Topic Monitor): 监视主题的详细信息,包括发布频率、带宽、消息类型,并可以查看最近发布的消息内容。 -
rqt_publisher
(Plugin: Introspection -> Topic Publisher): 图形化地发布消息到指定主题。可以选择消息类型,填充字段,并设置发布频率。 -
rqt_service_caller
(Plugin: Introspection -> Service Caller): 图形化地调用服务。选择服务名称和类型,填充请求字段,然后发送请求并查看响应。 -
rqt_action
(Plugin: Introspection -> Action GUI): 图形化地与动作服务端交互。选择动作名称和类型,发送目标,查看反馈和结果。 -
rqt_image_view
(Plugin: Visualization -> Image View): 订阅图像主题并在窗口中显示图像。 -
rqt_reconfigure
(Plugin: Configuration -> Dynamic Reconfigure): (主要用于ROS 1的dynamic_reconfigure
,ROS 2中参数管理方式不同,但某些兼容层或特定节点可能仍使用类似机制)。ROS 2主要通过参数服务和ros2 param set
进行运行时配置。
RQT允许您在同一个窗口中打开和排列多个插件。您可以将常用的插件组合保存为一个“视角 (Perspective)”,方便下次快速加载。
- File -> Save Perspective As...
- File -> Load Perspective
良好的日志记录对于开发、调试和监控ROS应用程序至关重要。rclpy
(ROS 2 Python客户端库)提供了内置的日志功能。
在Python节点中,通过 self.get_logger()
方法获取与该节点关联的日志记录器实例:
# In your rclpy.node.Node subclass:
logger = self.get_logger()
日志记录器提供了对应不同严重级别的方法:
logger.debug("Detailed debugging message.")
logger.info("Informational message about normal operation.")
logger.warn("Warning: something unexpected happened but not critical.")
logger.error("Error: a problem occurred that might affect functionality.")
logger.fatal("Fatal error: a critical problem, node might be unable to continue.")
- 控制台 (Console): 默认情况下,
INFO
及以上级别的日志会输出到运行节点的终端(stdout
),WARN
及以上级别可能会输出到stderr
。 rqt_console
: 如前所述,rqt_console
会聚合所有节点的日志。- 日志文件: ROS 2会自动将日志消息写入文件。这些文件通常位于:
~/.ros/log/<run_id>/
(全局日志目录)<workspace>/log/<package_name>/<node_executable_name>/
(如果通过colcon test
或特定Launch配置运行) 日志文件名通常包含节点名和时间戳。
可以在运行时或启动时配置节点的日志级别。
-
通过命令行参数 (启动时):
# 将 my_node_name 节点的日志级别设置为 DEBUG ros2 run my_package my_node_executable --ros-args --log-level my_node_name:=debug # 将所有节点的默认日志级别设置为 DEBUG ros2 run my_package my_node_executable --ros-args --log-level:=debug
这里的
my_node_name
是节点初始化时super().__init__('my_node_name')
中使用的名称。 -
通过
rqt_logger_level
(运行时): 如图形化工具部分所述。 -
通过Launch文件 (启动时): 可以在
Node
Action中设置ros_arguments
或特定的日志配置参数 (取决于ROS 2发行版和具体实现,有时通过设置环境变量或传递给节点的参数)。 一个常见的方法是设置一个名为log_level
(或类似名称) 的ROS参数,并在节点代码中读取它来配置日志记录器,但这需要节点显式支持。 更通用的方法是使用extra_arguments
(较新ROS 2版本) 或通过prefix
来设置环境变量,例如:# In a launch file Node( package='my_package', executable='my_node_exe', name='my_logger_node', # For Foxy/Galactic, setting via arguments to the node's main might be needed # arguments=['--ros-args', '--log-level', 'debug'], # This can be tricky # For Humble and later, a more robust way might be setting an env var or if node supports parameters for logging # prefix=['MY_NODE_LOG_LEVEL=debug'], # If node internally checks this env var # Or using a parameter if the node is designed to accept it: # parameters=[{'log_level': 'debug'}] )
注意: 通过Launch文件直接控制日志级别有时不如命令行或
rqt_logger_level
直观,具体方法可能随ROS 2版本演进。最可靠的方法是使用命令行参数覆盖或rqt_logger_level
。
- 命名日志记录器 (Named Loggers):
rclpy.logging.get_logger('my_custom_logger_name')
允许您创建不直接与特定节点关联的命名日志记录器,或者在节点内部为不同子模块创建不同的日志记录器。 - 日志节流 (Throttling):
logger.info("This message will appear at most once every 5 seconds.", throttle_duration_sec=5.0)
logger.info("This message will appear only if the previous one was >1s ago.", skip_first=True, throttle_time_source_type=rclpy.logging. अभी समय है।)
(RCUTILS_STEADY_TIME
可能是RCUTILS_SYSTEM_TIME
) - 一次性日志 (Once):
logger.info("This message will only be logged once.", once=True)
- 条件日志:
logger.info("Only log if condition is met.", condition=lambda: some_variable > 10)
通过熟练使用RViz2进行3D数据可视化,利用RQT插件监控和与系统交互,以及有效地使用ROS 2的日志系统,您可以大大提高ROS 2应用程序的开发效率、调试能力和可维护性。这些工具是ROS 2生态系统中不可或缺的部分。
在复杂的机器人系统中,节点的启动、关闭和错误处理顺序非常重要。例如,在驱动程序完全初始化之前,控制节点不应该尝试发送命令。ROS 2的生命周期节点(Lifecycle Nodes)为此提供了一个标准的状态机和管理接口,允许节点以一种可控和协调的方式转换其状态。
本教程将介绍生命周期节点的基本概念、状态转换以及如何在Python中使用rclpy.lifecycle.LifecycleNode
来创建和管理它们。
常规的ROS 2节点在启动后立即进入活动状态,并开始执行其功能(例如,发布/订阅、提供服务等)。这种方式对于简单应用可能足够,但在大型系统中存在以下问题:
- 启动顺序: 难以保证依赖节点在主节点之前完全准备好。
- 资源管理: 没有标准方式来处理节点的初始化和清理。
- 错误恢复: 当节点遇到问题时,缺乏标准机制使其进入安全状态或尝试恢复。
- 系统集成: 外部系统(如任务调度器、状态监控器)难以与节点的内部状态同步。
生命周期节点通过定义一组明确的状态和转换来解决这些问题。
一个生命周期节点遵循一个定义良好的状态机,包含以下主要状态:
UNCONFIGURED
(未配置): 节点的初始状态或清理后的状态。在此状态下,节点不应分配大量资源或执行任何功能。INACTIVE
(非活动): 节点已配置并分配了必要的资源,但尚未激活其主要功能。它可以处理参数更改、查询等,但不进行数据处理或通信。ACTIVE
(活动): 节点的主要运行状态。在此状态下,它执行其核心功能,如发布数据、处理传感器输入、提供服务等。FINALIZED
(已终止): 节点的最终状态,表示节点已结束。
还有一些过渡状态,如 CONFIGURING
, ACTIVATING
, DEACTIVATING
, CLEANINGUP
, SHUTTINGDOWN
。
主要状态转换:
configure()
: 从UNCONFIGURED
到INACTIVE
。- 节点应在此转换中加载配置、分配资源(如内存、硬件句柄)、初始化发布者/订阅者等。
activate()
: 从INACTIVE
到ACTIVE
。- 节点应在此转换中启动其主要功能,例如开始发布数据、响应传感器输入。
deactivate()
: 从ACTIVE
到INACTIVE
。- 节点应在此转换中停止其主要功能,但保持资源已分配状态。例如,停止数据发布,但保持连接。
cleanup()
: 从INACTIVE
到UNCONFIGURED
。- 节点应在此转换中释放其在
configure()
期间分配的资源。
- 节点应在此转换中释放其在
shutdown()
: 从任何状态(通常是UNCONFIGURED
,INACTIVE
,ACTIVE
)到FINALIZED
。- 这是一个特殊的转换,用于彻底关闭节点。它会依次尝试调用
on_deactivate
(如果当前是ACTIVE
) 和on_cleanup
(如果当前是INACTIVE
或刚从ACTIVE
转换而来)。
- 这是一个特殊的转换,用于彻底关闭节点。它会依次尝试调用
外部工具或节点可以通过调用生命周期节点提供的标准服务来触发这些状态转换。
在Python中,生命周期节点通过继承 rclpy.lifecycle.LifecycleNode
来创建。您需要重写特定的回调方法来实现在每个状态转换期间的逻辑。
# lifecycle_talker_node.py
import rclpy
from rclpy.lifecycle import LifecycleNode, TransitionCallbackReturn, LifecycleState
from rclpy.node import Node # 用于获取时钟等
from std_msgs.msg import String
import time
class LifecycleTalker(LifecycleNode):
def __init__(self, node_name, **kwargs):
super().__init__(node_name, **kwargs)
self._publisher = None
self._timer = None
self._count = 0
self.get_logger().info(f"LifecycleTalker '{self.get_name()}' created in state: {self.get_current_state().label}")
# --- Lifecycle Transition Callbacks ---
def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info(f"'{self.get_name()}' is in state '{state.label}', transitioning to 'configuring'.")
try:
# 在这里进行配置:创建发布者、加载参数等
self._publisher = self.create_lifecycle_publisher(String, 'lifecycle_chatter', 10)
# 注意:定时器不应在这里创建,因为它会立即开始运行。
# 定时器应该在 on_activate 中创建和启动。
self.get_logger().info("Publisher created.")
# 假设配置成功
return TransitionCallbackReturn.SUCCESS
except Exception as e:
self.get_logger().error(f"Error during configure: {e}")
return TransitionCallbackReturn.FAILURE
def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info(f"'{self.get_name()}' is in state '{state.label}', transitioning to 'activating'.")
try:
# 激活功能:启动定时器
if self._publisher is None: # 确保已配置
self.get_logger().error("Publisher not configured. Cannot activate.")
return TransitionCallbackReturn.ERROR # 或 FAILURE
# 调用父类的 on_activate 是一个好习惯,它会激活所有通过 create_lifecycle_publisher 创建的发布者
super().on_activate(state) # This will activate the lifecycle publisher
# 创建并启动定时器
self._timer = self.create_timer(1.0, self.timer_callback)
self.get_logger().info("Timer started, node is now active and publishing.")
return TransitionCallbackReturn.SUCCESS
except Exception as e:
self.get_logger().error(f"Error during activate: {e}")
return TransitionCallbackReturn.FAILURE
def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info(f"'{self.get_name()}' is in state '{state.label}', transitioning to 'deactivating'.")
try:
# 停用功能:停止定时器
if self._timer is not None:
self._timer.cancel()
self._timer = None # 清除引用
self.get_logger().info("Timer stopped.")
# 调用父类的 on_deactivate 是一个好习惯,它会停用所有 lifecycle publishers
super().on_deactivate(state)
self.get_logger().info("Node is now inactive.")
return TransitionCallbackReturn.SUCCESS
except Exception as e:
self.get_logger().error(f"Error during deactivate: {e}")
return TransitionCallbackReturn.FAILURE
def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info(f"'{self.get_name()}' is in state '{state.label}', transitioning to 'cleaning up'.")
try:
# 清理资源:销毁发布者、定时器
if self._timer is not None: # 确保定时器已停止
self._timer.cancel()
self._timer = None
if self._publisher is not None:
self.destroy_publisher(self._publisher) # 手动销毁 create_lifecycle_publisher 创建的发布者
self._publisher = None
self.get_logger().info("Publisher and timer cleaned up.")
return TransitionCallbackReturn.SUCCESS
except Exception as e:
self.get_logger().error(f"Error during cleanup: {e}")
return TransitionCallbackReturn.FAILURE
def on_shutdown(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info(f"'{self.get_name()}' is in state '{state.label}', transitioning to 'shutting down'.")
try:
# 这是最后的清理机会,通常会尝试执行 on_deactivate 和 on_cleanup 的逻辑
# 您可以在这里添加特定于关闭的最终清理
if self._timer is not None:
self._timer.cancel()
self._timer = None
if self._publisher is not None:
self.destroy_publisher(self._publisher)
self._publisher = None
self.get_logger().info("Node shutdown gracefully.")
return TransitionCallbackReturn.SUCCESS
except Exception as e:
self.get_logger().error(f"Error during shutdown: {e}")
return TransitionCallbackReturn.FAILURE
def on_error(self, state: LifecycleState) -> TransitionCallbackReturn:
# 当任何转换回调返回ERROR或引发未捕获的异常时调用
self.get_logger().error(f"An error occurred while in state '{state.label}'. Transitioning to error processing.")
# 通常,节点会尝试进入 UNCONFIGURED 状态
# 这里的实现可以更复杂,例如尝试恢复或记录详细错误信息
return TransitionCallbackReturn.SUCCESS # 表示错误处理本身成功了
# --- 业务逻辑 ---
def timer_callback(self):
if self._publisher is not None and self._publisher.is_activated:
msg = String()
msg.data = f'Lifecycle Hello World: {self._count}'
self._publisher.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}" (from state: {self.get_current_state().label})')
self._count += 1
else:
self.get_logger().warn("Timer callback called but publisher is not active or not set.")
def main(args=None):
rclpy.init(args=args)
# 生命周期节点通常需要自己的执行器来处理其内部服务调用(用于状态转换)
executor = rclpy.executors.SingleThreadedExecutor()
lifecycle_node = LifecycleTalker('lc_talker')
executor.add_node(lifecycle_node)
try:
# spin()会使节点保持活动,并响应外部的生命周期管理命令
# 它不会自动触发状态转换,这些转换需要外部通过服务调用来发起
executor.spin()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
pass
finally:
lifecycle_node.get_logger().info("Shutting down lifecycle node...")
# 尝试优雅地关闭节点
# 注意:如果节点正处于某个转换中,destroy_node可能需要等待
# 在实际应用中,可能需要更复杂的关闭逻辑来确保所有转换完成
if lifecycle_node.get_current_state().label != "finalized":
lifecycle_node.on_shutdown(lifecycle_node.get_current_state()) # 手动触发关闭逻辑
lifecycle_node.destroy_node() # 这会处理一些内部清理
if executor.context.ok(): # 检查执行器上下文是否仍然有效
executor.shutdown()
rclpy.try_shutdown() # 尝试关闭rclpy,如果它尚未关闭
if __name__ == '__main__':
main()
on_configure(self, state: LifecycleState) -> TransitionCallbackReturn
: 配置资源。on_activate(self, state: LifecycleState) -> TransitionCallbackReturn
: 激活功能。on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn
: 停用功能。on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn
: 清理资源。on_shutdown(self, state: LifecycleState) -> TransitionCallbackReturn
: 最终关闭。on_error(self, state: LifecycleState) -> TransitionCallbackReturn
: 错误处理。
每个回调都接收当前的 LifecycleState
对象作为参数,并必须返回一个 TransitionCallbackReturn
枚举值:
TransitionCallbackReturn.SUCCESS
: 转换成功,节点进入目标状态。TransitionCallbackReturn.FAILURE
: 转换失败,节点通常会回到转换开始前的状态。TransitionCallbackReturn.ERROR
: 转换过程中发生严重错误,节点会进入ERRORPROCESSING
中间状态,并调用on_error
回调。
LifecycleNode
提供了 create_lifecycle_publisher()
方法。这种类型的发布者与节点的生命周期状态相关联:
- 它只在节点处于
ACTIVE
状态时才实际发布消息。 - 在
on_activate
中,需要调用super().on_activate(state)
(或手动激活发布者)。 - 在
on_deactivate
中,需要调用super().on_deactivate(state)
(或手动停用发布者)。 - 在
on_cleanup
(或on_shutdown
) 中,需要像普通发布者一样使用self.destroy_publisher(self._publisher)
来销毁它。
self.get_current_state() -> LifecycleState
: 获取节点当前的生命周期状态。LifecycleState
对象有id
(数字) 和label
(字符串) 属性。
ROS 2 提供了 ros2 lifecycle
命令行工具来与正在运行的生命周期节点交互并触发其状态转换。一个生命周期节点会自动提供一组标准服务,用于管理其状态。
假设 LifecycleTalker
节点正在运行,其ROS节点名为 lc_talker
。
-
获取当前状态:
ros2 lifecycle get /lc_talker
输出应为
unconfigured [1]
(或其当前状态)。 -
获取可用转换:
ros2 lifecycle list /lc_talker
会列出从当前状态可以进行的转换,例如,从
unconfigured
可以进行configure
。 -
触发状态转换: 使用
ros2 lifecycle set <node_name> <transition_name>
。-
Configure:
ros2 lifecycle set /lc_talker configure
如果
on_configure
返回SUCCESS
,节点将进入inactive
状态。ros2 lifecycle get /lc_talker
应显示inactive [2]
。 -
Activate:
ros2 lifecycle set /lc_talker activate
如果
on_activate
返回SUCCESS
,节点将进入active
状态,并开始发布消息。ros2 lifecycle get /lc_talker
应显示active [3]
。 您可以使用ros2 topic echo /lifecycle_chatter
来查看发布的消息。 -
Deactivate:
ros2 lifecycle set /lc_talker deactivate
节点进入
inactive
状态,应停止发布消息。 -
Cleanup:
ros2 lifecycle set /lc_talker cleanup
节点进入
unconfigured
状态。 -
Shutdown:
ros2 lifecycle set /lc_talker shutdown
节点将尝试优雅关闭并进入
finalized
状态。此后,该节点通常不再响应生命周期命令。
-
注意:
<node_name>
是ROS网络中节点的完全限定名称。<transition_name>
是小写的转换名称 (例如configure
,activate
,deactivate
,cleanup
,shutdown
)。
Launch文件可以用于启动生命周期节点。更高级的用法是使用Launch事件系统或专门的生命周期管理器节点(如 nav2_lifecycle_manager
)来自动协调多个生命周期节点的启动和转换顺序。
简单的Launch文件启动:
# launch_lifecycle_node.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(
package='my_lifecycle_pkg', # 您的包名
executable='lifecycle_talker_executable', # 您的可执行文件名
name='lc_talker', # ROS节点名
output='screen',
emulate_tty=True,
# 生命周期节点通常不需要额外的参数来启动,
# 状态管理通过服务进行
)
])
启动后,您仍然需要使用 ros2 lifecycle set
命令或另一个节点来管理其状态。
- 驱动程序: 传感器或执行器的驱动程序是生命周期节点的理想用例。
configure
打开设备,activate
开始数据流/运动,deactivate
停止,cleanup
关闭设备。 - 复杂算法模块: 计算密集型模块(如SLAM、路径规划)可以在
inactive
状态加载模型和数据,在active
状态处理请求。 - 系统编排: 在大型系统中,使用生命周期管理器来确保组件按正确顺序配置和激活。
- 故障恢复: 当某个组件失败时,可以将其转换到
inactive
或unconfigured
状态,尝试重新配置,或安全地关闭相关部分。
- 保持回调简短: 转换回调应该快速执行。如果需要长时间操作,应在回调中启动一个异步任务(例如,新线程),并在任务完成后报告转换的最终成功或失败。
- 资源管理: 严格遵守在
configure
中分配资源,在cleanup
中释放它们。 - 错误处理: 在回调中妥善处理异常,并返回适当的
TransitionCallbackReturn
值。 - 幂等性: 某些转换(如
activate
后再次activate
)可能没有效果,或应该被优雅处理。 - 父类调用: 在
on_activate
和on_deactivate
中,如果使用了create_lifecycle_publisher
,记得调用super().on_activate(state)
和super().on_deactivate(state)
来激活/停用这些发布者。
生命周期节点为构建健壮、可管理的ROS 2系统提供了一个强大的框架。虽然它们比常规节点需要更多的样板代码,但在需要精确控制节点状态和转换的复杂应用中,其带来的好处是显著的。
编写清晰、可维护、高效的ROS 2 Python节点和功能包对于构建健壮的机器人应用程序至关重要。本教程将总结一些在Python中进行ROS 2开发的最佳实践和代码组织建议,帮助您编写更高质量的ROS 2代码。
遵循PEP 8: PEP 8是Python代码的官方风格指南。遵循它能让您的代码更易读,也更易于被其他Python开发者理解和协作。
- 工具: 使用诸如
flake8
,pylint
,black
,autopep8
等工具可以帮助您检查和自动格式化代码以符合PEP 8。flake8
(结合pycodestyle
和pyflakes
):检查风格和基本错误。black
: 一个“不妥协的”代码格式化器,它会强制采用一种特定的风格。- 许多IDE(如VS Code, PyCharm)都集成了这些工具或有相应的插件。
- 关键点:
- 使用4个空格进行缩进。
- 行长度限制(通常79或88个字符,
black
默认为88)。 - 合适的空行来组织代码块。
- 清晰的命名约定(函数用小写加下划线
snake_case
,类用驼峰式CapWords
,常量用全大写ALL_CAPS
)。 - 有意义的变量名和函数名。
- 适当的注释和文档字符串 (docstrings)。
Python是动态类型语言,但从Python 3.5开始引入了类型提示(Type Hinting),允许您为变量、函数参数和返回值添加类型信息。
优势:
- 提高代码可读性: 类型提示使代码的意图更清晰,更容易理解函数期望什么类型的输入以及返回什么。
- 静态分析与错误检测: 工具如
mypy
可以使用类型提示进行静态类型检查,在代码运行前发现潜在的类型错误。 - 改善IDE支持: IDE可以利用类型提示提供更好的自动补全、代码导航和重构支持。
- 文档作用: 类型提示本身就是一种形式的文档。
在ROS 2 Python节点中的应用:
import rclpy
from rclpy.node import Node
from rclpy.publisher import Publisher # 导入具体的类型
from rclpy.timer import Timer # 导入Timer类型
from std_msgs.msg import String
from typing import Optional, List # 用于更复杂的类型
class TypedPublisherNode(Node):
# 为类成员变量添加类型提示
publisher_: Publisher
timer_: Timer
count_: int
def __init__(self) -> None: # -> None 表示构造函数不返回值
super().__init__('typed_publisher_node')
self.count_ = 0
# 明确指定发布者和定时器的类型
self.publisher_ = self.create_publisher(String, 'typed_topic', 10)
self.timer_ = self.create_timer(0.5, self.timer_callback)
self.get_logger().info("TypedPublisherNode has been started.")
def timer_callback(self) -> None: # 回调不返回值
msg = String()
msg.data = f'Typed Hello: {self.count_}'
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}"')
self.count_ += 1
# 示例:一个带类型提示的辅助方法
def format_message(self, prefix: str, count_val: int) -> str:
return f"{prefix}: {count_val}"
# 示例:可选参数和返回可选类型
def get_last_message(self) -> Optional[String]:
# 假设这里有逻辑获取最后一条消息,如果没有则返回None
if self.count_ > 0:
last_msg = String()
last_msg.data = f'Last was: {self.count_ -1}'
return last_msg
return None
def main(args: Optional[List[str]] = None) -> None: # main函数也可以有类型提示
rclpy.init(args=args)
node = TypedPublisherNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
from typing import ...
: 导入Optional
,List
,Dict
,Callable
等复杂类型。mypy
: 安装mypy
(pip install mypy
) 并在您的代码上运行它 (mypy your_script.py
或mypy your_package_dir
) 来进行类型检查。
在ROS 2中,回调函数的参数类型通常由rclpy
库隐式定义(例如,订阅回调的第一个参数是消息对象),为这些参数添加类型提示可以极大地帮助理解和静态检查。
将大型节点或复杂逻辑分解为更小的、可管理的模块(Python文件、类、函数)可以提高代码的可维护性和可测试性。
始终将ROS 2节点的主要逻辑封装在一个继承自 rclpy.node.Node
的类中。这提供了良好的组织结构。
对于与ROS无关的复杂计算、数据处理或业务逻辑,可以将其提取到单独的Python类或函数中,并由ROS节点类调用。
utils.py
: 创建一个utils.py
文件来存放通用的辅助函数。- 特定功能模块: 如果有复杂的算法(例如,一个特定的滤波器、一个路径规划算法的纯Python实现),可以将其放在自己的模块中。
示例包结构:
my_robot_controller_pkg/
├── my_robot_controller_pkg/ # Python 模块目录
│ ├── __init__.py
│ ├── robot_driver_node.py # 主ROS节点
│ ├── motor_controller.py # 电机控制逻辑 (非ROS类)
│ ├── sensor_processor.py # 传感器数据处理逻辑 (非ROS类)
│ └── utils.py # 通用工具函数
├── launch/
│ └── robot_controller.launch.py
├── config/
│ └── controller_params.yaml
├── test/
├── package.xml
└── setup.py
在 robot_driver_node.py
中:
# from .motor_controller import MotorController # 从同包导入
# from .sensor_processor import SensorProcessor
# from .utils import calculate_something
class RobotDriverNode(Node):
def __init__(self):
super().__init__('robot_driver')
# self.motor_ctrl = MotorController(...)
# self.sensor_proc = SensorProcessor(...)
# ...
如前一教程所述,使用ROS 2组件是将功能模块化的另一种强大方式,特别适用于需要进程内通信以提高性能的场景。
在ROS 2节点中,健壮的错误处理非常重要,以防止节点意外崩溃。
try...except
: 捕获可能发生的特定异常。- 捕获ROS相关的异常,如
rclpy.exceptions.ParameterNotDeclaredException
,rclpy.exceptions.ServiceException
。 - 捕获Python内置异常,如
ValueError
,IOError
。 - 记录错误信息:
self.get_logger().error(f"An error occurred: {e}")
。
- 捕获ROS相关的异常,如
finally
: 确保资源(如文件句柄、锁、硬件连接)在任何情况下都能被正确释放。- 在
main
函数的try...finally
块中,确保node.destroy_node()
和rclpy.shutdown()
被调用。
- 在
# ... 在节点方法中 ...
def process_file(self, filepath: str) -> bool:
file_handle = None
try:
file_handle = open(filepath, 'r')
# ... process file ...
self.get_logger().info(f"Successfully processed {filepath}")
return True
except FileNotFoundError:
self.get_logger().error(f"File not found: {filepath}")
return False
except IOError as e:
self.get_logger().error(f"IOError while processing {filepath}: {e}")
return False
except Exception as e: # 捕获其他意外异常
self.get_logger().fatal(f"Unexpected error processing {filepath}: {e}")
# 也许需要更激烈的措施,如使节点进入错误状态
return False
finally:
if file_handle:
file_handle.close()
self.get_logger().debug(f"Closed file {filepath}")
当节点不再需要某个发布者、订阅者、定时器、服务或动作客户端/服务端时,或者在节点关闭前,应该显式地销毁它们。
self.destroy_publisher(my_publisher)
self.destroy_subscription(my_subscription)
self.destroy_timer(my_timer)
(或者my_timer.cancel()
后让其被垃圾回收)self.destroy_service(my_service)
self.destroy_client(my_client)
self.destroy_action_server(my_action_server)
self.destroy_action_client(my_action_client)
虽然 rclpy
在节点销毁时会尝试清理这些资源,但显式销毁是良好的编程习惯,尤其是在复杂的生命周期管理或动态创建/销毁ROS实体时。通常在节点的 destroy_node()
方法(如果重写)或 on_cleanup
/ on_shutdown
(对于生命周期节点) 中执行。
class MyNodeWithResources(Node):
def __init__(self):
super().__init__('resource_node')
self.pub_ = self.create_publisher(...)
self.timer_ = self.create_timer(...)
# ...
def destroy_node(self): # 重写Node的destroy_node方法
self.get_logger().info("Custom destroy_node called.")
if hasattr(self, 'timer_') and self.timer_:
self.timer_.cancel() # 首先取消定时器
# self.destroy_timer(self.timer_) # destroy_timer通常在Node的基类destroy_node中处理
if hasattr(self, 'pub_') and self.pub_:
self.destroy_publisher(self.pub_)
# ...销毁其他资源...
super().destroy_node() # 调用父类的destroy_node以完成标准清理
对于需要精细控制资源分配和释放的节点(例如,与硬件交互的驱动程序),使用生命周期节点(Lifecycle Nodes)是最佳实践。
- 在
on_configure()
中分配资源。 - 在
on_cleanup()
中释放资源。 - 在
on_shutdown()
中确保所有资源都已释放。
- 声明所有参数: 在节点初始化时使用
declare_parameter()
或declare_parameters()
声明所有期望的参数及其默认值和描述符。 - 使用参数回调: 对于需要在设置时进行验证或触发特定行为的参数,使用
add_on_set_parameters_callback()
。 - YAML文件: 将默认配置和部署配置存储在YAML参数文件中,并通过Launch文件加载。
- 命名空间: 使用参数命名空间来组织相关的参数 (例如
motion_controller.pid.kp
)。
- 一个包一个或多个Launch文件: 将相关的节点启动逻辑组织在同一个Launch文件中。
- 使用
IncludeLaunchDescription
: 将大型系统分解为模块化的Launch文件。 - Launch参数: 使用
DeclareLaunchArgument
和LaunchConfiguration
使Launch文件可配置。 get_package_share_directory
: 可靠地引用包内的文件(如YAML配置、其他Launch文件、URDF)。- 注释: 为复杂的Launch逻辑添加注释。
为您的ROS 2 Python节点和功能包编写测试是确保其正确性和稳定性的关键。
- 单元测试 (Unit Tests):
- 使用Python的
unittest
或pytest
框架。 - 测试非ROS相关的辅助类和函数。
- 对于ROS节点,可以实例化节点类(可能需要mock
rclpy
的部分功能或在测试环境中初始化rclpy
),然后测试其方法的逻辑。
- 使用Python的
- 集成测试 (Integration Tests):
- 使用
launch_testing
包,它允许您编写测试来启动一组节点(通过Launch文件),然后在它们之间发送消息、调用服务等,并断言系统的行为。 launch_testing_ros
提供了与ROS 2更紧密集成的功能。
- 使用
- 测试覆盖率: 使用工具(如
coverage.py
)来衡量测试覆盖了多少代码。
在 package.xml
中声明测试依赖:
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<test_depend>launch_testing</test_depend>
<test_depend>launch_testing_ros</test_depend>
在 setup.py
中配置测试(通常由 ament_python
自动处理)。
- 文档字符串 (Docstrings): 为所有公共模块、类、函数和方法编写清晰的文档字符串。遵循PEP 257。
- 使用Sphinx等工具可以从文档字符串生成漂亮的HTML文档。
- README文件: 在包的根目录下提供一个
README.md
文件,解释包的用途、如何构建、如何运行示例以及其接口(发布的/订阅的主题、提供的服务/动作、参数等)。 - Launch文件文档: 复杂的Launch文件也应该有注释或附带的文档来解释其配置选项和行为。
- 使用Git: 将您的ROS 2功能包纳入Git版本控制系统。
.gitignore
: 创建一个.gitignore
文件来忽略构建产物(如build/
,install/
,log/
目录)、Python缓存文件 (__pycache__/
,*.pyc
)和IDE特定的配置文件。- 有意义的提交信息: 编写清晰、简洁的Git提交信息。
- 分支策略: 对于新功能或修复,使用特性分支。
回调函数(用于订阅、定时器、服务、动作)应该尽快执行完毕。如果一个回调阻塞时间过长:
- 对于单线程执行器: 它会阻塞所有其他回调的执行。
- 对于多线程执行器: 它会占用一个线程,如果所有线程都被阻塞,新的回调将无法及时处理。
处理耗时任务的方法:
- 单独的线程: 在回调中启动一个新的Python线程 (
threading.Thread
) 来执行耗时操作。注意线程安全。 - ROS 2动作 (Actions): 如果任务本身适合动作模型(有目标、反馈、结果),则使用动作。
- 异步编程 (
async/await
): (将在后续教程中详细介绍)对于I/O密集型任务,使用async/await
结合支持异步的执行器可以提高响应性,而不会阻塞线程。 - 任务队列: 将任务放入队列,由一个或多个工作线程处理。
遵循这些最佳实践将帮助您创建更可靠、可维护和高效的ROS 2 Python应用程序。虽然有些实践需要额外的工作,但它们在项目的长期发展中会带来巨大的回报。
当机器人系统变得复杂,包含多个节点、参数配置、重映射等时,手动逐个启动和配置它们会变得非常繁琐且容易出错。ROS 2的Launch系统(启动系统)允许您通过编写Python脚本来描述和启动整个应用程序或其中的一部分。这些Python脚本通常以 .launch.py
结尾。本教程将深入探讨如何使用Python Launch System来管理复杂的ROS 2应用。
- 自动化启动: 自动启动一个或多个ROS 2节点。
- 参数配置: 为节点加载参数,可以来自YAML文件或直接在Launch文件中指定。
- 命名空间与重映射: 为节点设置命名空间(Namespaces)和主题/服务/动作的重映射(Remappings)。
- 条件启动: 根据条件(例如环境变量、Launch参数)决定是否启动某些节点或配置。
- 事件处理: 响应系统事件,如节点启动、退出等,并执行相应操作。
- 模块化与复用: 将复杂的启动逻辑分解为多个可复用的Launch文件,并通过包含(Include)机制组合它们。
- 可扩展性: Python Launch System非常灵活,允许您使用Python的全部功能来编写复杂的启动逻辑。
ROS 2主要推荐使用Python编写Launch文件,因为它提供了比ROS 1 XML Launch文件更强大的编程能力。
一个Python Launch文件是一个标准的Python脚本,它必须定义一个名为 generate_launch_description()
的函数。这个函数返回一个 launch.LaunchDescription
对象,该对象包含了要执行的所有启动“动作”(Actions)。
# my_simple_launch.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node # 用于启动ROS节点的Action
def generate_launch_description():
# 创建一个LaunchDescription对象
ld = LaunchDescription()
# 创建一个Node Action来启动一个节点
my_node_action = Node(
package='my_package_name', # 节点所在的功能包名
executable='my_node_executable', # 节点的可执行文件名 (在setup.py中定义的)
name='custom_node_name', # (可选) 覆盖节点内部设置的名称
output='screen', # (可选) 将节点的stdout/stderr输出到屏幕
emulate_tty=True, # (可选) 模拟TTY,使颜色输出等正常工作
)
# 将Node Action添加到LaunchDescription中
ld.add_action(my_node_action)
# 可以添加更多的Action
# another_node_action = Node(...)
# ld.add_action(another_node_action)
return ld
关键组件:
from launch import LaunchDescription
: 导入LaunchDescription
类。from launch_ros.actions import Node
: 导入Node
Action,专门用于启动ROS 2节点。还有其他类型的Action。generate_launch_description()
: Launch系统会调用这个函数来获取启动描述。LaunchDescription()
: 这是所有启动动作的容器。Node(...)
: 一个Action,代表启动一个ROS 2节点。package
: 功能包名称。executable
: 节点的可执行文件名称(在Python包的setup.py
中通过entry_points
定义的脚本名)。name
(可选): 在ROS网络中为该节点设置的名称。如果指定,它会覆盖节点代码中super().__init__('node_name_in_code')
设置的名称。namespace
(可选): 为此节点设置命名空间。parameters
(可选): 设置节点参数(见下文)。remappings
(可选): 设置主题/服务/动作的重映射(见下文)。output
(可选):'screen'
表示将节点的标准输出和标准错误流直接打印到启动它的终端。'log'
表示输出到ROS日志文件。默认为'log'
。emulate_tty
(可选): 通常与output='screen'
一起使用,以确保颜色代码等终端特性正常工作。
ld.add_action(action_object)
: 将一个Action添加到启动描述中。
除了 Node
Action,还有其他一些常用的Actions:
用于执行任意的Shell命令或可执行文件。
from launch.actions import ExecuteProcess
# ... in generate_launch_description() ...
execute_action = ExecuteProcess(
cmd=['echo', 'Hello from Launch!'], # 要执行的命令及其参数列表
output='screen'
)
ld.add_action(execute_action)
# 启动一个ROS 1节点 (如果安装了 ros1_bridge)
# ros1_node_action = ExecuteProcess(
# cmd=['rosrun', 'rospy_tutorials', 'talker.py'],
# output='screen'
# )
# ld.add_action(ros1_node_action)
用于包含(导入)另一个Launch文件。这使得Launch文件可以模块化。
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
import os
from ament_index_python.packages import get_package_share_directory
# ... in generate_launch_description() ...
# 获取要包含的launch文件的路径
other_package_name = 'another_package'
included_launch_file_path = os.path.join(
get_package_share_directory(other_package_name),
'launch', # 假设launch文件在 'launch' 目录下
'another_example.launch.py'
)
include_action = IncludeLaunchDescription(
PythonLaunchDescriptionSource(included_launch_file_path),
# (可选) 为被包含的launch文件传递参数
launch_arguments={'some_argument_for_included_launch': 'value'}.items()
)
ld.add_action(include_action)
PythonLaunchDescriptionSource
: 用于指定要包含的Python Launch文件的路径。还有XMLLaunchDescriptionSource
和YAML ਲunchDescriptionSource
。launch_arguments
: 可以将被包含Launch文件暴露的Launch参数(见下文)设置为特定值。
设置一个环境变量,该变量对此Launch文件后续启动的进程有效。
from launch.actions import SetEnvironmentVariable
# ... in generate_launch_description() ...
set_env_var_action = SetEnvironmentVariable(
name='MY_CUSTOM_ENV_VAR',
value='some_value'
)
ld.add_action(set_env_var_action) # 通常放在其他Action之前
Node
Action的 parameters
参数允许您为节点设置参数。它可以是一个字典列表或包含YAML文件路径的列表。
from launch_ros.actions import Node
import os
from ament_index_python.packages import get_package_share_directory
# ... in generate_launch_description() ...
my_package_name = 'my_package_name'
param_config_file = os.path.join(
get_package_share_directory(my_package_name),
'config',
'my_node_params.yaml' # 您的参数YAML文件
)
parameter_node_action = Node(
package=my_package_name,
executable='my_parameter_node_exe',
name='parameter_node',
output='screen',
parameters=[
param_config_file, # 从YAML文件加载
{'some_other_param': 123}, # 直接指定参数值
{'group.nested_param': 'hello'} # 带命名空间的参数
]
)
ld.add_action(parameter_node_action)
参数会按照列表中的顺序被加载和覆盖。
为节点设置命名空间,将其所有相对名称的主题、服务等都限定在该命名空间下。
from launch_ros.actions import Node
# ... in generate_launch_description() ...
namespaced_node_action = Node(
package='my_package_name',
executable='my_node_executable',
name='node_in_ns',
namespace='robot1' # 设置命名空间
)
ld.add_action(namespaced_node_action)
此节点发布的主题 chatter
将变为 /robot1/chatter
。
改变节点内部使用的原始主题/服务/动作名称到ROS网络中实际使用的名称。
remappings
参数是一个元组列表,每个元组是 ('from_name', 'to_name')
。
from launch_ros.actions import Node
# ... in generate_launch_description() ...
remapped_node_action = Node(
package='my_package_name',
executable='my_node_executable', # 假设此节点内部使用 'input_topic' 和 'output_topic'
name='remapped_node',
remappings=[
('input_topic', '/global_sensor_data'), # 将内部的 'input_topic' 映射到 '/global_sensor_data'
('output_topic', 'processed_data'), # 将内部的 'output_topic' 映射到 'processed_data' (相对名称)
# 可以为服务、动作重映射,例如:
# ('internal_service_name', 'external_service_name'),
]
)
ld.add_action(remapped_node_action)
Launch参数允许您在调用Launch文件时从外部传递值,使Launch文件更加灵活和可配置。
-
声明Launch参数 (
DeclareLaunchArgument
): 在generate_launch_description()
函数的开头声明Launch文件接受的参数。from launch.actions import DeclareLaunchArgument from launch.substitutions import LaunchConfiguration # 用于获取Launch参数的值 # ... in generate_launch_description() ... # 声明一个名为 'use_simulation' 的Launch参数 declare_use_sim_arg = DeclareLaunchArgument( 'use_simulation', default_value='false', # 默认值 description='Set to "true" to use simulation time, "false" for real time.' ) ld.add_action(declare_use_sim_arg) # 声明另一个参数 declare_robot_name_arg = DeclareLaunchArgument( 'robot_name', default_value='my_default_robot', description='Name of the robot.' ) ld.add_action(declare_robot_name_arg)
DeclareLaunchArgument(name, default_value=None, description=None, choices=None)
name
: Launch参数的名称。default_value
: 如果未从外部提供,则使用此默认值。description
: 对参数用途的描述。choices
: (可选) 一个字符串列表,限制此参数的有效值。
-
使用Launch参数的值 (
LaunchConfiguration
): 使用LaunchConfiguration('param_name')
来在Launch文件的其他部分(例如Node
Action的参数中)引用Launch参数的值。这是一个“替换”(Substitution),它的值将在Launch文件执行时被解析。from launch.substitutions import LaunchConfiguration from launch_ros.actions import Node # ... (在声明Launch参数之后) ... # 获取Launch参数 'robot_name' 的值 robot_name_lc = LaunchConfiguration('robot_name') use_sim_lc = LaunchConfiguration('use_simulation') # 在Node的参数中使用LaunchConfiguration sim_time_node_param = {'use_sim_time': use_sim_lc} # 为节点设置 use_sim_time ROS参数 my_robot_node_action = Node( package='my_package_name', executable='robot_driver_exe', name=robot_name_lc, # 将节点名设置为Launch参数的值 namespace=robot_name_lc, # 将命名空间也设置为robot_name parameters=[ sim_time_node_param, # {'another_node_param': LaunchConfiguration('some_other_launch_arg')} ] ) ld.add_action(my_robot_node_action)
从命令行传递Launch参数:
使用 ros2 launch
时,可以通过 argument_name:=value
的形式传递Launch参数:
ros2 launch my_package_name my_launch_file.launch.py use_simulation:=true robot_name:=my_awesome_robot
在包含的Launch文件中传递Launch参数:
如前所述,IncludeLaunchDescription
的 launch_arguments
参数用于此目的。
允许根据一个条件(通常是 LaunchConfiguration
的布尔值)来决定是否执行某个Action。
from launch.actions import DeclareLaunchArgument, GroupAction
from launch.conditions import IfCondition, UnlessCondition
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
# ... in generate_launch_description() ...
# 假设已声明 'start_motor_driver' Launch参数 (true/false)
declare_start_motor_arg = DeclareLaunchArgument('start_motor_driver', default_value='true')
ld.add_action(declare_start_motor_arg)
start_motor_driver_lc = LaunchConfiguration('start_motor_driver')
motor_driver_node = Node(
package='my_drivers',
executable='motor_driver_node',
condition=IfCondition(start_motor_driver_lc) # 仅当start_motor_driver为'true'时启动
)
ld.add_action(motor_driver_node)
# 使用GroupAction来对一组Action应用同一个条件
simulation_group = GroupAction(
actions=[
Node(package='gazebo_ros', executable='spawn_entity.py', arguments=['...']),
Node(package='my_sim_tools', executable='sim_setup_node')
],
condition=IfCondition(LaunchConfiguration('use_simulation')) # 假设有 use_simulation 参数
)
ld.add_action(simulation_group)
# UnlessCondition: 当条件为false时执行
real_robot_interface_node = Node(
package='my_hardware_interface',
executable='real_robot_if_node',
condition=UnlessCondition(LaunchConfiguration('use_simulation'))
)
ld.add_action(real_robot_interface_node)
IfCondition(predicate)
: 当predicate
(通常是一个解析为'true'
或'false'
的LaunchConfiguration
) 为真时,Action被执行。UnlessCondition(predicate)
: 当predicate
为假时,Action被执行。GroupAction
: 可以将多个Action组合在一起,并对整个组应用一个条件或作用域(例如,统一设置命名空间)。
Launch系统有一个事件处理机制,允许您响应各种事件(如进程启动、退出、标准输出等)并执行相应的Action。这是一个高级特性,通常用于更复杂的编排。
from launch.actions import RegisterEventHandler, EmitEvent
from launch.event_handlers import OnProcessExit, OnProcessStart
from launch.events import Shutdown
from launch_ros.actions import Node
# ... in generate_launch_description() ...
# 启动一个关键节点
critical_node = Node(
package='my_critical_pkg',
executable='critical_process',
name='critical_node'
)
ld.add_action(critical_node)
# 注册一个事件处理器:当critical_node退出时,关闭整个Launch
shutdown_on_critical_exit = RegisterEventHandler(
event_handler=OnProcessExit(
target_action=critical_node, # 监控这个Action (Node也是一种Action)
on_exit=[
EmitEvent(event=Shutdown(reason='Critical node exited')) # 发出一个Shutdown事件
]
)
)
ld.add_action(shutdown_on_critical_exit)
# 注册另一个事件处理器:当某个节点启动时,打印一条消息
# print_on_start = RegisterEventHandler(
# event_handler=OnProcessStart(
# target_action=some_other_node, # 假设 some_other_node 已定义
# on_start=[
# LogInfo(msg="Some other node has started successfully!")
# ]
# )
# )
# ld.add_action(print_on_start)
RegisterEventHandler
: 注册一个事件处理器。event_handler
: 指定要处理的事件类型及其条件和响应。OnProcessExit
: 当指定的Action(通常是Node
或ExecuteProcess
)代表的进程退出时触发。OnProcessStart
: 当进程启动时触发。- 还有
OnExecutionComplete
,OnIncludeLaunchDescription
,OnShutdown
等。
on_exit
/on_start
等: 一个Action列表,当事件发生时执行这些Action。EmitEvent(event=...)
: 发出一个新的事件,例如Shutdown
。
- 一个包可以有多个Launch文件: 将它们放在包内的
launch/
目录下。 - 使用
IncludeLaunchDescription
: 将大型系统分解为更小、可管理的、可复用的Launch文件。例如,一个用于启动机器人硬件驱动,一个用于启动导航栈,一个用于启动感知模块。 - 使用Launch参数: 使Launch文件可配置,避免硬编码值。
- 使用
get_package_share_directory
: 可靠地获取其他包中文件(如参数YAML、其他Launch文件、URDF模型)的路径。from ament_index_python.packages import get_package_share_directory # path = get_package_share_directory('package_name')
使用 ros2 launch
命令:
ros2 launch <package_name> <launch_file_name.launch.py> [launch_arguments ...]
例如:
ros2 launch my_package_name my_simple_launch.launch.py
ros2 launch my_package_name my_configurable_launch.launch.py robot_name:=robby use_simulation:=true
按 Ctrl+C
可以关闭由Launch文件启动的所有进程。
通过本教程,您应该对ROS 2的Python Launch System有了全面的了解,包括如何启动节点、配置参数、设置重映射、使用Launch参数、条件启动以及组织复杂的启动描述。这是高效管理和部署ROS 2应用程序的关键工具。