2025-10-12
数据处理
00

目录

解析Odometry和提取数据
格式解析
订阅与发布的应用
发布者 (Producers):
订阅者 (Consumers):
从ROS2 Bag中提取数据

av_msgs/msg/Odometry 是 ROS (Robot Operating System) 中一个极其重要和基础的消息类型。它用于表示机器人在空间中的位姿(位置和姿态)和速度(线速度和角速度)的估计值,并且包含了这些估计值的不确定性(协方差)。

简单来说,它回答了以下几个核心问题:

机器人现在在哪里? (位置) 机器人现在朝向哪里? (姿态/方向) 机器人正在以多快的速度移动? (线速度) 机器人正在以多快的速度转动? (角速度) 我们对这些估计有多大的信心? (协方差)

解析Odometry和提取数据

首先执行以下命令查看该数据格式:

ros2 interface show nav_msgs/msg/Odometry

然后我们会得到:

# This represents an estimate of a position and velocity in free space. # The pose in this message should be specified in the coordinate frame given by header.frame_id # The twist in this message should be specified in the coordinate frame given by the child_frame_id # Includes the frame id of the pose parent. std_msgs/Header header builtin_interfaces/Time stamp int32 sec uint32 nanosec string frame_id # Frame id the pose points to. The twist is in this coordinate frame. string child_frame_id # Estimated pose that is typically relative to a fixed world frame. geometry_msgs/PoseWithCovariance pose Pose pose Point position float64 x float64 y float64 z Quaternion orientation float64 x 0 float64 y 0 float64 z 0 float64 w 1 float64[36] covariance # Estimated linear and angular velocity relative to child_frame_id. geometry_msgs/TwistWithCovariance twist Twist twist Vector3 linear float64 x float64 y float64 z Vector3 angular float64 x float64 y float64 z float64[36] covariance

格式解析

  1. std_msgs/Header header 这是一个标准的消息头,包含了所有ROS消息都应该有的基本信息。
  • builtin_interfaces/Time stamp: 时间戳。表示这个里程计信息是在哪个时间点生成的或有效的。这是数据同步和时序分析的关键。

  • string frame_id: 坐标系ID。这个非常重要,它定义了 pose(位姿)是在哪个坐标系下表示的。通常,它是一个固定的、全局的参考坐标系,比如 odom 或 map。例如,frame_id: "odom" 意味着机器人的位置和姿态是相对于里程计坐标系原点的。

  1. string child_frame_id 子坐标系ID。它定义了 twist(速度)是在哪个坐标系下表示的。通常,这是机器人自身的坐标系,比如 base_link 或 base_footprint。例如,child_frame_id: "base_link" 意味着报告的速度是机器人本体坐标系下的前向、左向和向上速度。

frame_id 和 child_frame_id 的关系: 这个消息描述的是 child_frame_id 相对于 frame_id 的状态。在 TF (Transform) 系统中,这个消息通常对应一个从 frame_id 到 child_frame_id 的变换。

  1. geometry_msgs/PoseWithCovariance pose 这部分描述了机器人的静态位姿和其不确定性。
  • geometry_msgs/Pose pose
  • geometry_msgs/Point position: 位置
  • float64 x: 在 frame_id 坐标系的 X 轴上的位置。
  • float64 y: 在 frame_id 坐标系的 Y 轴上的位置。
  • float64 z: 在 frame_id 坐标系的 Z 轴上的位置。
  • geometry_msgs/Quaternion orientation: 姿态/方向
  • float64 x
  • float64 y
  • float64 z
  • float64 w
  • 这四个值共同组成一个四元数,用于表示机器人在 frame_id 坐标系中的旋转。四元数是表示三维旋转的标准方式,可以避免万向锁问题。
  • float64[36] covariance
  • 位姿协方差矩阵。这是一个 6x6 的矩阵,被展平成一个长度为36的数组。它表示了位姿估计的不确定性。

顺序:矩阵的行和列对应于 (x, y, z, roll, pitch, yaw) 这六个自由度。 roll, pitch, yaw 是姿态的欧拉角表示,协方差在这里表示的是姿态估计在这些轴上的不确定性。 解读:

对角线元素:表示各个变量的方差。例如,第一个元素 covariance[0] 是 x 位置的方差。方差越大,表示对 x 位置的估计越不确定。

非对角线元素:表示不同变量之间的协方差。例如,covariance[1] 表示 x 和 y 之间的协方差,如果它不为零,说明对 x 的估计误差和对 y 的估计误差是相关的。

实际应用:在卡尔曼滤波器(如 EKF)等状态估计算法中,这个协方差矩阵至关重要,它决定了在融合新的传感器数据(如IMU、GPS)时,应该给予里程计数据多大的信任权重。

订阅与发布的应用

发布者 (Producers):

  • 轮式里程计节点:通过读取轮子编码器的读数来估算。
  • 视觉里程计 (VO) 节点:通过分析连续的相机图像来估算。
  • 激光雷达里程计 (LO) 节点:通过匹配连续的激光扫描点云来估算。
  • 惯性导航系统 (INS) 节点:通过融合 IMU 和 GPS 等数据。
  • 状态估计节点 (如 robot_localization):融合多种来源的里程计和传感器数据,发布一个更精确、更鲁棒的里程计信息。

订阅者 (Consumers):

  • TF广播节点:订阅 /odom 消息,并将其中的位姿信息广播为 odom -> base_link 的 TF 变换。
  • 导航栈 (Nav2):使用里程计信息进行路径规划和跟踪。
  • 建图算法 (SLAM):里程计是 SLAM 算法中的重要输入,用于提供运动先验。
  • 可视化工具 (RViz):显示机器人的轨迹和当前位置。

从ROS2 Bag中提取数据

这个脚本会创建一个 ROS 2 节点,订阅里程计话题,并将每一条消息的关键信息(时间戳、位置、姿态、速度)追加到一个列表中,最后在程序退出时将所有数据一次性保存为 CSV 文件。

⚠️:先执行该脚本,然后再播放bag包

python
import rclpy from rclpy.node import Node import pandas as pd import os # 导入里程计消息类型 from nav_msgs.msg import Odometry # 从scipy库中导入旋转处理工具,用于四元数到欧拉角的转换 from scipy.spatial.transform import Rotation as R class OdometryExtractor(Node): """ 一个用于从ROS 2 Bag文件中提取里程计数据并保存为CSV文件的节点。 """ def __init__(self): super().__init__('odometry_extractor') # --- 参数配置 --- # 从bag文件中监听的里程计话题 self.odom_topic = '/Odometry' # !! 重要:请根据你的bag文件信息修改此话题名 !! # 保存CSV文件的输出路径 self.output_csv_path = 'odometry_data.csv' # ---------------- # 创建订阅者 self.subscription = self.create_subscription( Odometry, self.odom_topic, self.odom_callback, 10 # QoS profile depth ) # 用于存储所有数据的列表 self.data_list = [] self.get_logger().info(f"节点已启动,正在监听话题: '{self.odom_topic}'") self.get_logger().info(f"数据将在节点关闭时保存到: '{self.output_csv_path}'") def odom_callback(self, msg: Odometry): """ 回调函数,处理接收到的每一条Odometry消息。 """ # --- 提取时间戳 --- timestamp_sec = msg.header.stamp.sec timestamp_nanosec = msg.header.stamp.nanosec timestamp = timestamp_sec + timestamp_nanosec / 1e9 # --- 提取位置 --- pos_x = msg.pose.pose.position.x pos_y = msg.pose.pose.position.y pos_z = msg.pose.pose.position.z # --- 提取姿态 (四元数) --- quat_x = msg.pose.pose.orientation.x quat_y = msg.pose.pose.orientation.y quat_z = msg.pose.pose.orientation.z quat_w = msg.pose.pose.orientation.w # --- 将四元数转换为欧拉角 (roll, pitch, yaw) --- # Scipy的Rotation期望的四元数顺序是 (x, y, z, w) r = R.from_quat([quat_x, quat_y, quat_z, quat_w]) # 'xyz' 是欧拉角旋转顺序,可以根据需要更改为 'zyx' 等 # as_euler 的结果单位是弧度 (radians) roll, pitch, yaw = r.as_euler('xyz', degrees=False) # --- 提取线速度 --- lin_vel_x = msg.twist.twist.linear.x lin_vel_y = msg.twist.twist.linear.y lin_vel_z = msg.twist.twist.linear.z # --- 提取角速度 --- ang_vel_x = msg.twist.twist.angular.x ang_vel_y = msg.twist.twist.angular.y ang_vel_z = msg.twist.twist.angular.z # 将提取的数据添加到列表中 self.data_list.append({ 'timestamp': timestamp, 'pos_x': pos_x, 'pos_y': pos_y, 'pos_z': pos_z, 'quat_x': quat_x, 'quat_y': quat_y, 'quat_z': quat_z, 'quat_w': quat_w, 'roll': roll, 'pitch': pitch, 'yaw': yaw, 'lin_vel_x': lin_vel_x, 'lin_vel_y': lin_vel_y, 'lin_vel_z': lin_vel_z, 'ang_vel_x': ang_vel_x, 'ang_vel_y': ang_vel_y, 'ang_vel_z': ang_vel_z, }) # 打印一个简单的进度反馈 if len(self.data_list) % 100 == 0: self.get_logger().info(f"已处理 {len(self.data_list)} 条里程计消息...") def save_data(self): """ 将收集到的数据转换为Pandas DataFrame并保存为CSV文件。 """ if not self.data_list: self.get_logger().warn("没有收集到任何数据,不创建CSV文件。") return self.get_logger().info(f"正在保存 {len(self.data_list)} 条数据到 {self.output_csv_path}...") # 创建DataFrame df = pd.DataFrame(self.data_list) # 设置时间戳为索引(可选,但推荐) # df.set_index('timestamp', inplace=True) # 保存为CSV df.to_csv(self.output_csv_path, index=False, float_format='%.6f') self.get_logger().info("数据保存成功!") def main(args=None): rclpy.init(args=args) odom_extractor = OdometryExtractor() try: # spin会阻塞,直到节点被关闭(例如通过Ctrl+C) rclpy.spin(odom_extractor) except KeyboardInterrupt: odom_extractor.get_logger().info("接收到键盘中断信号 (Ctrl+C)...") finally: # 在节点销毁前保存数据 odom_extractor.save_data() # 销毁节点并关闭rclpy odom_extractor.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()

本文作者:James

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!