Last active
February 6, 2022 11:07
-
-
Save acidzebra/c866794aaf16da707f83894cb3dd8b74 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import anki_vector | |
import json | |
import paho.mqtt.client as mqtt | |
import requests | |
url = "YOURDISCORDWEBHOOKURL" | |
MQTT_HOST = "YOURMQTTHOST" | |
MQTT_PORT = 1883 | |
MQTT_KEEPALIVE_INTERVAL = 20 | |
MQTT_TOPIC = "YOURMQTTTOPIC" | |
vectorserial = "YOURVECTORSERIAL" | |
myrobot = anki_vector.Robot(vectorserial,default_logging=False,behavior_control_level=None) | |
robot_voltage = 0 | |
robot_batlevel = 0 | |
robot_charging = 0 | |
robot_docked = 0 | |
robot_discordmessage = "" | |
try: | |
myrobot.connect(timeout=12) | |
myrobot_battery_state = myrobot.get_battery_state() | |
myrobot_carryblock = myrobot.status.is_carrying_block | |
if myrobot_carryblock: | |
robot_discordmessage = ":robot::blue_square: Bluey is carrying a cube!" | |
myrobot_cliffdetect = myrobot.status.is_cliff_detected | |
if myrobot_cliffdetect: | |
robot_discordmessage = ":robot::blue_square: Bluey is getting himself into trouble!" | |
myrobot_docking = myrobot.status.is_docking_to_marker | |
if myrobot_docking: | |
robot_discordmessage = ":robot::blue_square: Bluey is docking!" | |
myrobot_falling = myrobot.status.is_falling | |
if myrobot_falling: | |
robot_discordmessage = ":robot::blue_square: Bluey is falling!" | |
myrobot_pickup = myrobot.status.is_picked_up | |
if myrobot_pickup: | |
robot_discordmessage = ":robot::blue_square: Bluey is getting himself into trouble!" | |
if myrobot_battery_state: | |
robot_voltage = round(myrobot_battery_state.battery_volts,2) | |
robot_batlevel = myrobot_battery_state.battery_level | |
robot_charging = myrobot_battery_state.is_charging | |
robot_docked = myrobot_battery_state.is_on_charger_platform | |
myrobot.disconnect() | |
except: | |
print("unable to get data from robot") | |
if robot_discordmessage: | |
robotdcdata = { | |
"content" : robot_discordmessage | |
} | |
try: | |
result = requests.post(url, json = robotdcdata) | |
except: | |
print("unable to post robot discord message") | |
data = {} | |
if robot_voltage and robot_voltage != 0: | |
data['robots'] = [] | |
data['robots'].append({ | |
'name': 'robot', | |
'voltage': robot_voltage, | |
'batlevel': robot_batlevel, | |
'charging': robot_charging, | |
'docked': robot_docked | |
}) | |
if data: | |
MQTT_MSG = str(data) | |
# Define on_publish event function | |
def on_publish(client, userdata, mid): | |
print("Robot Message Published...") | |
# Initiate MQTT Client | |
mqttc = mqtt.Client() | |
# Register publish callback function | |
mqttc.on_publish = on_publish | |
try: | |
# Connect with MQTT Broker | |
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE_INTERVAL) | |
# Publish message to MQTT Broker | |
mqttc.publish(MQTT_TOPIC,MQTT_MSG) | |
# Disconnect from MQTT_Broker | |
mqttc.disconnect() | |
except: | |
print("issue publishing robot data") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment