Created
October 28, 2016 15:36
-
-
Save ronekko/0ba55b65821450c73e63427a233ffded to your computer and use it in GitHub Desktop.
A script for Windows to communicate with ROS hokuyo_node over rosbridge.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Created on Sat Oct 28 22:14:57 2016 | |
@author: sakurai | |
--------------------------------------- | |
This script is for Windows to communicate with ROS hokuyo_node over rosbridge. | |
Install rosbridge on the remote machine. | |
$ sudo apt-get install ros-indigo-rosbridge-suite | |
$ sudo apt-get install ros-indigo-hokuyo-node | |
Run the youbot driver and the rosbridge TCP server. | |
$ roscore | |
$ rosparam set hokuyo_node/port /dev/ttyACM1 | |
$ rosrun hokuyo_node hokuyo_node | |
$ roslaunch rosbridge_server rosbridge_tcp.launch | |
Run this script and scans are plotted. | |
""" | |
import socket | |
import json | |
from datetime import datetime | |
import matplotlib.pyplot as plt | |
class HokuyoNode(object): | |
def __init__(self, rosbridge_host='192.168.100.40', rosbridge_port=9090): | |
self.sock = socket.socket() | |
self.sock.connect((host, port)) | |
json_str = json.dumps(dict(op='subscribe', topic='/scan')) | |
self.sock.send(json_str) | |
self.str_buffer = "" | |
self.jsons = [] | |
def receive_scan(self): | |
''' | |
returns `sensor_msgs/LaserScan` message as a dict. | |
see http://docs.ros.org/kinetic/api/sensor_msgs/html/msg/LaserScan.html | |
''' | |
while True: | |
partial_str = self.sock.recv(100000) | |
self.str_buffer += partial_str | |
# insert ! as delimiter | |
self.str_buffer = self.str_buffer.replace('{"topic"', | |
'!{"topic"') | |
splits = self.str_buffer.split("!") | |
self.str_buffer = splits[-1] | |
self.jsons += splits[:-1] | |
if len(self.jsons): | |
json_str = self.jsons.pop() | |
# discard if json_str is not a complete json string | |
if json_str.endswith('"publish"}'): | |
laser_scan = json.loads(json_str)['msg'] | |
return laser_scan | |
def close(self): | |
self.sock.close() | |
if __name__ == '__main__': | |
# address and port of rosbridge | |
host = '192.168.100.40' | |
port = 9090 | |
hokuyo = HokuyoNode(host, port) # subscribe `/scan` topic | |
try: | |
while True: | |
# receive `sensor_msgs/LaserScan` message | |
laser_scan = hokuyo.receive_scan() | |
header = laser_scan['header'] | |
seq = header['seq'] | |
print "seq:", seq | |
stamp = header['stamp'] | |
date_time = datetime.fromtimestamp( | |
stamp['secs']).strftime('%Y/%m/%d %H:%M:%S') | |
date_time += ".{}".format(stamp['nsecs'] // 100000) | |
print date_time | |
ranges = laser_scan['ranges'] | |
plt.plot(ranges) | |
plt.title('# ' + str(seq)) | |
plt.show() | |
except KeyboardInterrupt: | |
pass | |
hokuyo.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment