iterative closest points
CEV ICP algorithm library
Loading...
Searching...
No Matches
rosbag_extract.py
Go to the documentation of this file.
1# Copyright (C) 2024-2025 Cornell Electric Vehicles.
2# SPDX-License-Identifier: MIT
3
4import argparse
5import os
6
7import matplotlib
8import numpy as np
9import pandas as pd
10from bagpy import bagreader
11from matplotlib import pyplot as plt
12
13parser = argparse.ArgumentParser(
14 prog="rosbag_extract",
15 description="Extract LiDAR data from ROS bags to use in ICP testing",
16)
17parser.add_argument(
18 "-i",
19 "--input",
20 type=str,
21 help="Input ROS bag file",
22 required=True,
23)
24parser.add_argument(
25 "-o",
26 "--output_dir",
27 type=str,
28 help="Output directory to save the extracted scans",
29 required=True,
30)
31parser.add_argument(
32 "-t",
33 "--topic",
34 type=str,
35 help="Topic name to extract",
36 required=True,
37)
38args = parser.parse_args()
39
40os.makedirs(args.output_dir, exist_ok=True)
41
42b = bagreader(args.input)
43LASER_MSG = b.message_by_topic(args.topic)
44df = pd.read_csv(LASER_MSG)
45
46current_i = 0
47exported = 0
48
49
50def on_press(event):
51 global current_i
52 if event.key == "l":
53 current_i = min(current_i + 1, len(df) - 1)
54 elif event.key == "h":
55 current_i = max(current_i - 1, 0)
56 elif event.key == "p":
57 current_i = min(current_i + 10, len(df) - 1)
58 elif event.key == "u":
59 current_i = max(current_i - 10, 0)
60 elif event.key == "e":
61 save(current_i)
62 draw(current_i)
63
64
65def get_ranges(row):
66 ranges = []
67 i = 0
68 while True:
69 if f"ranges_{i}" not in row:
70 break
71 ranges.append(row[f"ranges_{i}"])
72 i += 1
73 return np.array(ranges)
74
75
76def draw(index: int):
77 row = df.iloc[index]
78 ranges = get_ranges(row)
79
80 angle_increment = row["angle_increment"]
81 angle_min = row["angle_min"]
82 angle_max = row["angle_max"]
83 angles = np.arange(angle_min, angle_max + angle_increment, angle_increment)
84
85 x = np.cos(angles) * ranges
86 y = np.sin(angles) * ranges
87
88 ax.clear()
89 ax.set_xlabel("x")
90 ax.set_ylabel("y")
91 ax.set_aspect("equal")
92 ax.set_xscale("linear")
93 ax.set_yscale("linear")
94
95 lidar_max = df.iloc[0]["range_max"]
96 ax.set_xlim(-lidar_max, lidar_max)
97 ax.set_ylim(-lidar_max, lidar_max)
98 ax.plot(x, y, "o", linestyle="None")
99
100
101def save(index: int):
102 global exported
103 row = df.iloc[index]
104 angle_increment = row["angle_increment"]
105 angle_min = row["angle_min"]
106 angle_max = row["angle_max"]
107 range_min = row["range_min"]
108 range_max = row["range_max"]
109 ranges = get_ranges(row)
110 length = len(ranges)
111 with open(f"{args.output_dir}/scan_{exported}.conf", "w") as f:
112 f.write(f"angle_max = {angle_max}\n")
113 f.write(f"angle_min = {angle_min}\n")
114 f.write(f"angle_increment = {angle_increment}\n")
115 f.write(f"length = {length}\n")
116 f.write(f"range_max = {range_max}\n")
117 f.write(f"range_min = {range_min}\n")
118 for i, r in enumerate(ranges):
119 f.write(f"{i} = {r}\n")
120 print(f"Exported scan {exported}")
121 exported += 1
122
123
124matplotlib.use("qt5agg")
125fig, ax = plt.subplots()
126plt.ion()
127fig.canvas.mpl_connect("key_press_event", on_press)
128draw(current_i)
129plt.show(block=True)
draw(int index)