av_msgs/msg/Odometry 是 ROS (Robot Operating System) 中一个极其重要和基础的消息类型。它用于表示机器人在空间中的位姿(位置和姿态)和速度(线速度和角速度)的估计值,并且包含了这些估计值的不确定性(协方差)。
简单来说,它回答了以下几个核心问题:
机器人现在在哪里? (位置) 机器人现在朝向哪里? (姿态/方向) 机器人正在以多快的速度移动? (线速度) 机器人正在以多快的速度转动? (角速度) 我们对这些估计有多大的信心? (协方差)
首先执行以下命令查看该数据格式:
ros2 interface show nav_msgs/msg/Odometry
然后我们会得到:
# This represents an estimate of a position and velocity in free space. # The pose in this message should be specified in the coordinate frame given by header.frame_id # The twist in this message should be specified in the coordinate frame given by the child_frame_id # Includes the frame id of the pose parent. std_msgs/Header header builtin_interfaces/Time stamp int32 sec uint32 nanosec string frame_id # Frame id the pose points to. The twist is in this coordinate frame. string child_frame_id # Estimated pose that is typically relative to a fixed world frame. geometry_msgs/PoseWithCovariance pose Pose pose Point position float64 x float64 y float64 z Quaternion orientation float64 x 0 float64 y 0 float64 z 0 float64 w 1 float64[36] covariance # Estimated linear and angular velocity relative to child_frame_id. geometry_msgs/TwistWithCovariance twist Twist twist Vector3 linear float64 x float64 y float64 z Vector3 angular float64 x float64 y float64 z float64[36] covariance
builtin_interfaces/Time stamp: 时间戳。表示这个里程计信息是在哪个时间点生成的或有效的。这是数据同步和时序分析的关键。
string frame_id: 坐标系ID。这个非常重要,它定义了 pose(位姿)是在哪个坐标系下表示的。通常,它是一个固定的、全局的参考坐标系,比如 odom 或 map。例如,frame_id: "odom" 意味着机器人的位置和姿态是相对于里程计坐标系原点的。
frame_id 和 child_frame_id 的关系: 这个消息描述的是 child_frame_id 相对于 frame_id 的状态。在 TF (Transform) 系统中,这个消息通常对应一个从 frame_id 到 child_frame_id 的变换。
顺序:矩阵的行和列对应于 (x, y, z, roll, pitch, yaw) 这六个自由度。 roll, pitch, yaw 是姿态的欧拉角表示,协方差在这里表示的是姿态估计在这些轴上的不确定性。 解读:
对角线元素:表示各个变量的方差。例如,第一个元素 covariance[0] 是 x 位置的方差。方差越大,表示对 x 位置的估计越不确定。
非对角线元素:表示不同变量之间的协方差。例如,covariance[1] 表示 x 和 y 之间的协方差,如果它不为零,说明对 x 的估计误差和对 y 的估计误差是相关的。
实际应用:在卡尔曼滤波器(如 EKF)等状态估计算法中,这个协方差矩阵至关重要,它决定了在融合新的传感器数据(如IMU、GPS)时,应该给予里程计数据多大的信任权重。
这个脚本会创建一个 ROS 2 节点,订阅里程计话题,并将每一条消息的关键信息(时间戳、位置、姿态、速度)追加到一个列表中,最后在程序退出时将所有数据一次性保存为 CSV 文件。
⚠️:先执行该脚本,然后再播放bag包
pythonimport rclpy
from rclpy.node import Node
import pandas as pd
import os
# 导入里程计消息类型
from nav_msgs.msg import Odometry
# 从scipy库中导入旋转处理工具,用于四元数到欧拉角的转换
from scipy.spatial.transform import Rotation as R
class OdometryExtractor(Node):
"""
一个用于从ROS 2 Bag文件中提取里程计数据并保存为CSV文件的节点。
"""
def __init__(self):
super().__init__('odometry_extractor')
# --- 参数配置 ---
# 从bag文件中监听的里程计话题
self.odom_topic = '/Odometry' # !! 重要:请根据你的bag文件信息修改此话题名 !!
# 保存CSV文件的输出路径
self.output_csv_path = 'odometry_data.csv'
# ----------------
# 创建订阅者
self.subscription = self.create_subscription(
Odometry,
self.odom_topic,
self.odom_callback,
10 # QoS profile depth
)
# 用于存储所有数据的列表
self.data_list = []
self.get_logger().info(f"节点已启动,正在监听话题: '{self.odom_topic}'")
self.get_logger().info(f"数据将在节点关闭时保存到: '{self.output_csv_path}'")
def odom_callback(self, msg: Odometry):
"""
回调函数,处理接收到的每一条Odometry消息。
"""
# --- 提取时间戳 ---
timestamp_sec = msg.header.stamp.sec
timestamp_nanosec = msg.header.stamp.nanosec
timestamp = timestamp_sec + timestamp_nanosec / 1e9
# --- 提取位置 ---
pos_x = msg.pose.pose.position.x
pos_y = msg.pose.pose.position.y
pos_z = msg.pose.pose.position.z
# --- 提取姿态 (四元数) ---
quat_x = msg.pose.pose.orientation.x
quat_y = msg.pose.pose.orientation.y
quat_z = msg.pose.pose.orientation.z
quat_w = msg.pose.pose.orientation.w
# --- 将四元数转换为欧拉角 (roll, pitch, yaw) ---
# Scipy的Rotation期望的四元数顺序是 (x, y, z, w)
r = R.from_quat([quat_x, quat_y, quat_z, quat_w])
# 'xyz' 是欧拉角旋转顺序,可以根据需要更改为 'zyx' 等
# as_euler 的结果单位是弧度 (radians)
roll, pitch, yaw = r.as_euler('xyz', degrees=False)
# --- 提取线速度 ---
lin_vel_x = msg.twist.twist.linear.x
lin_vel_y = msg.twist.twist.linear.y
lin_vel_z = msg.twist.twist.linear.z
# --- 提取角速度 ---
ang_vel_x = msg.twist.twist.angular.x
ang_vel_y = msg.twist.twist.angular.y
ang_vel_z = msg.twist.twist.angular.z
# 将提取的数据添加到列表中
self.data_list.append({
'timestamp': timestamp,
'pos_x': pos_x,
'pos_y': pos_y,
'pos_z': pos_z,
'quat_x': quat_x,
'quat_y': quat_y,
'quat_z': quat_z,
'quat_w': quat_w,
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'lin_vel_x': lin_vel_x,
'lin_vel_y': lin_vel_y,
'lin_vel_z': lin_vel_z,
'ang_vel_x': ang_vel_x,
'ang_vel_y': ang_vel_y,
'ang_vel_z': ang_vel_z,
})
# 打印一个简单的进度反馈
if len(self.data_list) % 100 == 0:
self.get_logger().info(f"已处理 {len(self.data_list)} 条里程计消息...")
def save_data(self):
"""
将收集到的数据转换为Pandas DataFrame并保存为CSV文件。
"""
if not self.data_list:
self.get_logger().warn("没有收集到任何数据,不创建CSV文件。")
return
self.get_logger().info(f"正在保存 {len(self.data_list)} 条数据到 {self.output_csv_path}...")
# 创建DataFrame
df = pd.DataFrame(self.data_list)
# 设置时间戳为索引(可选,但推荐)
# df.set_index('timestamp', inplace=True)
# 保存为CSV
df.to_csv(self.output_csv_path, index=False, float_format='%.6f')
self.get_logger().info("数据保存成功!")
def main(args=None):
rclpy.init(args=args)
odom_extractor = OdometryExtractor()
try:
# spin会阻塞,直到节点被关闭(例如通过Ctrl+C)
rclpy.spin(odom_extractor)
except KeyboardInterrupt:
odom_extractor.get_logger().info("接收到键盘中断信号 (Ctrl+C)...")
finally:
# 在节点销毁前保存数据
odom_extractor.save_data()
# 销毁节点并关闭rclpy
odom_extractor.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
本文作者:James
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!