Skip to content

Instantly share code, notes, and snippets.

@vrbadev
Last active June 1, 2024 17:31
Show Gist options
  • Save vrbadev/6e9886a24509abcca59830f9ca3472bf to your computer and use it in GitHub Desktop.
Save vrbadev/6e9886a24509abcca59830f9ca3472bf to your computer and use it in GitHub Desktop.
A ROS-independent script that emulates the output of the ROS command "rosbag info".
# -*- coding: utf-8 -*-
"""
Created on Sat Jun 1 17:49:53 2024
@author: Vojtech Vrba ([email protected])
Required extra packages: rosbags
"""
from pathlib import Path
from rosbags.highlevel import AnyReader
import datetime, os, sys
def trailing_spaces(s, length=13):
    """Return *s* right-padded with spaces to at least *length* characters.

    Strings that are already *length* characters or longer are returned
    unchanged (they are never truncated).

    Args:
        s: The text to pad.
        length: Minimum width of the returned string.

    Returns:
        The padded string.
    """
    # str.ljust pads on the right with spaces and leaves longer strings as-is,
    # which is exactly what the manual "' ' * (length - len(s))" did.
    return s.ljust(length)
def format_stamp_sec(stamp_sec):
    """Render a timestamp given in seconds as a date string plus the raw value.

    The timestamp is converted with ``datetime.datetime.fromtimestamp`` (no
    timezone argument, i.e. local time) and formatted as
    ``DD/MM/YYYY HH:MM:SS.ffffff``; the original number of seconds follows in
    parentheses with six decimal places.

    Args:
        stamp_sec: Timestamp in seconds.

    Returns:
        The formatted string, e.g. ``"01/06/2024 17:49:53.250000 (1717256993.250000)"``.
    """
    moment = datetime.datetime.fromtimestamp(stamp_sec)
    readable = moment.strftime("%d/%m/%Y %H:%M:%S.%f")
    raw_suffix = " (%f)" % stamp_sec
    return readable + raw_suffix
def get_file_size_human(file_path):
    """Return the size of a file as a human-readable string.

    The size is scaled by powers of 1024 into the largest fitting unit and
    printed with three decimals, followed by the exact byte count, e.g.
    ``"1.500 KB (1536 B)"``.

    Args:
        file_path: Path of the file to inspect (anything ``os.stat`` accepts).

    Returns:
        The formatted size string.

    Raises:
        OSError: If the file cannot be stat'ed.
    """
    n_bytes = os.stat(file_path).st_size
    units = ["B", "KB", "MB", "GB", "TB"]
    n = n_bytes
    i = 0
    # ">=" so that exact multiples (e.g. 1024 B) move up to the next unit, and
    # the index bound keeps sizes beyond the last unit from running off the
    # end of the list (they are simply reported in the largest known unit).
    while n >= 1024 and i < len(units) - 1:
        n /= 1024
        i += 1
    return ("%.3f " % n) + units[i] + (" (%d B)" % n_bytes)
def get_max_len(max_val):
    """Return the number of characters needed to print *max_val* as an integer.

    Used to compute the column width for right-aligned counters. The previous
    power-of-ten loop under-counted exact powers of ten (10 -> 1, 1000 -> 3),
    which made the widest value overflow its column.

    Args:
        max_val: The largest number that will be printed in the column.

    Returns:
        The width of ``"%d" % max_val`` in characters.
    """
    # Measure the rendered text directly; this matches how the value is later
    # printed with "%*d" and is correct for every magnitude.
    return len("%d" % max_val)
if __name__ == "__main__":
    # Script entry point: print a "rosbag info"-like summary of one bag file.
    # Usage: <script> <path/to/file.bag>
    # Exit status: -1 on a wrong argument count, -2 if the bag cannot be
    # processed.
    if len(sys.argv) != 2:
        print("Missing argument: <path/to/file.bag>", file=sys.stderr)
        sys.exit(-1)

    bag_path = sys.argv[1]

    try:
        with AnyReader([Path(bag_path)]) as reader:
            # General header. Reader times are scaled by 1e-9 so that they
            # are shown in seconds.
            print(trailing_spaces("path:"), bag_path)
            print(trailing_spaces("version:"), "2.0" if reader.is2 else "1.0")
            print(trailing_spaces("duration:"), reader.duration * 1e-9, "s")
            print(trailing_spaces("start:"), format_stamp_sec(1e-9 * reader.start_time))
            print(trailing_spaces("end:"), format_stamp_sec(1e-9 * reader.end_time))
            print(trailing_spaces("size:"), get_file_size_human(bag_path))
            print(trailing_spaces("messages:"), reader.message_count)
            # NOTE: the "compression:" line of "rosbag info" is not reported.

            # Collect the unique (message type, checksum) pairs and one
            # summary row per topic.
            topics = []
            types = set()
            for tname, tinfo in reader.topics.items():
                # Show "pkg/Type" instead of "pkg/msg/Type".
                mt = tinfo.msgtype.replace("/msg/", "/")
                # NOTE(review): some rosbags releases appear to name this
                # Connection field `digest` instead of `md5sum` - confirm
                # against the installed version.
                types.add((mt, tinfo.connections[0].md5sum))
                topics.append((tname, tinfo.msgcount, mt, len(tinfo.connections)))

            # "types:" section, sorted by type name. default=0 keeps a bag
            # without any topics from failing in max() on an empty sequence.
            types = sorted(types, key=lambda x: x[0])
            mt_max_len = max((len(t) for t, h in types), default=0)
            for i, (mt, md) in enumerate(types):
                # Only the first row carries the section label.
                first_col = trailing_spaces("types:") if i == 0 else trailing_spaces("")
                print(first_col, trailing_spaces(mt, length=mt_max_len), "[%s]" % md)

            # "topics:" section, sorted by topic name, with aligned columns.
            topics = sorted(topics, key=lambda x: x[0])
            tn_max_len = max((len(tn) for tn, mc, mt, nc in topics), default=0)
            mc_max_len = get_max_len(max((mc for tn, mc, mt, nc in topics), default=0))
            for i, (tn, mc, mt, nc) in enumerate(topics):
                first_col = trailing_spaces("topics:") if i == 0 else trailing_spaces("")
                # The connection count is only mentioned when a topic has
                # more than one connection.
                conn_note = "(%d connections)" % nc if nc > 1 else ""
                print(
                    first_col,
                    trailing_spaces(tn, length=tn_max_len),
                    "%*d" % (mc_max_len, mc),
                    trailing_spaces("msg" if mc == 1 else "msgs", length=7),
                    ":",
                    trailing_spaces(mt, length=mt_max_len),
                    conn_note,
                )
    except Exception as err:
        # Top-level boundary: report any failure while opening or reading the
        # bag and exit with a non-zero status.
        print("Error: The file '%s' cannot be read:" % bag_path, str(err), file=sys.stderr)
        sys.exit(-2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment