Last active
October 10, 2022 08:55
-
-
Save ZdenekM/d84b178444950dbfe46afebf3fe1dd0d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rospy | |
import sys | |
from sensor_msgs.msg import Image | |
from std_msgs.msg import String | |
import rosbag | |
from cv_bridge import CvBridge | |
import cv2 | |
import json | |
import time | |
results = {} | |
times = {} | |
last_msg = time.time() | |
def results_cb(msg): | |
global last_msg | |
data = json.loads(msg.data) | |
ts = data["header"]["stamp"] | |
ats = time.time() | |
times[ts].append(ats) | |
results[ts] = data["detections"] | |
last_msg = ats | |
def main(): | |
bag_path = sys.argv[1] | |
topic = sys.argv[2] | |
bag = rosbag.Bag(bag_path) | |
rospy.init_node('listener', anonymous=True) | |
rospy.Subscriber("/results", String, results_cb) | |
pub = rospy.Publisher('/movie', Image, queue_size=10) | |
bridge = CvBridge() | |
out = None | |
msgs = 0 | |
print("Publishing...") | |
for topic, msg, t in bag.read_messages(topics=[topic]): | |
pub.publish(msg) | |
ts = "%s.%s" % (msg.header.stamp.secs, msg.header.stamp.nsecs) | |
times[ts] = [time.time()] | |
time.sleep(1.0) | |
msgs+=1 | |
print("Published msgs: " + str(msgs)) | |
print("Result msgs: " + str(len(results))) | |
while time.time() - last_msg < 5.0: # nevim jak lip poznat ze prisla posledni detekce | |
time.sleep(1) | |
print("Result msgs: " + str(len(results))) | |
if not results: | |
print("No results!") | |
return | |
frames = 0 | |
for topic, msg, t in bag.read_messages(topics=[topic]): | |
res_key = "%s.%s" % (msg.header.stamp.secs, msg.header.stamp.nsecs) | |
res = results.pop(res_key, None) | |
cv_image = bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8') | |
if not out: | |
out_path = bag_path + ".avi" | |
print("Saving to " + out_path) | |
shape = tuple(cv_image.shape[1::-1]) | |
print("Shape: " + str(shape)) | |
fourcc = cv2.VideoWriter_fourcc(*'X264') | |
out = cv2.VideoWriter(out_path, fourcc, 25.0, shape) | |
frames += 1 | |
if res: | |
for d in res: | |
cls_name = d["class_name"] | |
score = d["score"] | |
# Draw detection into frame. | |
if score < 0.5: | |
continue | |
# if cls_name in ["face", "person"]: | |
x1, y1, x2, y2 = [int(coord) for coord in d["bbox"]] | |
if abs(x2-x1) < 50 or abs(y2-y1) < 50: | |
continue | |
cv2.rectangle(cv_image, (x1, y1), (x2, y2), (255, 0, 0), 2) | |
cv2.putText(cv_image, cls_name, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (36,255,12), 1, cv2.LINE_AA) | |
out.write(cv_image) | |
print("Frames: " + str(frames)) | |
if out: | |
print("releasing") | |
out.release() | |
sum = 0 | |
for vals in times.values(): | |
if len(vals) < 2: | |
continue | |
sum += vals[1]-vals[0] | |
print("Mean latency: " + str(sum/len(times))) | |
if __name__ == '__main__': | |
try: | |
main() | |
except rospy.ROSInterruptException: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment