Created
December 30, 2017 16:47
-
-
Save MikahB/94d41ac42e1fd1199727e303f1b63476 to your computer and use it in GitHub Desktop.
Example file from alphaMesh project
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json, os, time | |
from datetime import datetime, timedelta | |
from alphamesh import device_management, mesh_configuration | |
from alphamesh.message_publisher import MessagePublisher, OutboundMessage | |
def handle_message(message, queue, mconfig):
    """Parse an inbound JSON message and dispatch it to the matching handler.

    Top-level keys are lower-cased before dispatch so that key lookups are
    case-insensitive; nested values (such as ``args``) are passed through
    untouched.

    Args:
        message: Raw JSON text of the inbound message.
        queue: Object with a ``put`` method; receives human-readable status
            and error strings.
        mconfig: Configuration object handed to the handler constructors.

    Returns:
        None. Parsing and dispatch problems are reported through ``queue``
        instead of being raised.
    """
    try:
        json_msg = json.loads(message)
    except Exception as e:
        queue.put("Error trying to parse message into JSON:" + str(e))
        return

    # Valid JSON is not necessarily an object (it may be a list, number, ...);
    # only an object has the keys needed for dispatch.
    if not isinstance(json_msg, dict):
        queue.put("Inbound message is not a JSON object, ignoring: "
                  + type(json_msg).__name__)
        return

    # Lower-case the top-level keys so lookups are case-insensitive.
    json_msg2 = {k.lower(): v for k, v in json_msg.items()}

    # A missing action falls through to the "unrecognized" report below.
    action = json_msg2.get('action', "No 'action' key found")
    if not isinstance(action, str):
        queue.put("Unrecognized action requested: " + str(action))
        return

    if action == "reboot":
        # There is no handler object for a reboot, so return right away
        # rather than falling through to the dispatch code.
        os.system('sudo shutdown -r now')
        return

    handlers = {
        "send_device_status": StatusMessageHandler,
        "set_device_condition": SetDeviceInfoMessageHandler,
        "update_alphamesh": UpdateMessageHandler,
        "burst_vibration_data": RawVibrationMessageHandler,
        "get_config_file": ConfigFileMessageHandler,
        "show_run_index": RunIndexMessageHandler,
        "shell_command": ShellCommandMessageHandler,
    }
    handler_class = handlers.get(action)
    if handler_class is None:
        queue.put("Unrecognized action requested: " + action)
        return

    proc = handler_class(json_msg2, queue, mconfig)
    proc.process_message(json_msg2)
class MessageHandler:
    """Base class for inbound-message handlers.

    Keeps the queue, configuration and parsed message for subclasses and
    provides the shared helper for publishing a reply.
    """

    def __init__(self, json_message, queue, mconfig, **kwargs):
        """Store the collaborators; extra keyword arguments are ignored.

        Args:
            json_message: Parsed inbound message.
            queue: Object with a ``put`` method used for status strings.
            mconfig: Configuration object read when publishing.
        """
        self._json_message = json_message
        self._mconfig = mconfig
        self._q = queue

    def send_message_to_cloud(self, outbound_message):
        """Publish ``outbound_message`` through a new MessagePublisher.

        The publisher is built with an empty exchange and the queue name,
        URL and hardware id taken from the configuration object.
        """
        config = self._mconfig
        publisher = MessagePublisher(
            exchange='',
            queue=config.OUTBOUND_QUEUE_NAME,
            url=config.RABBIT_INSTANCE_URL,
            sender_hardware_id=config.HARDWARE_ID,
        )
        publisher.publish_message(outbound_message)

    def process_message(self, json_message):
        """Do nothing; subclasses override this with the real handling."""
        return None
class RawVibrationMessageHandler(MessageHandler):
    """Handle ``burst_vibration_data`` requests with a burst of raw samples."""

    def process_message(self, json_message):
        """Collect raw vibration data and publish it with the request id.

        Args:
            json_message: Parsed message; ``args.data_request_id`` and
                ``args.data_points`` must both be present and convertible
                to ``int``. A ``data_points`` of 0 is replaced by 1000.

        Returns:
            None. Progress and errors are reported on the queue.
        """
        # The data_request_id has to be kept because it is sent back in the
        # headers of the reply.
        try:
            args = json_message['args']
            request_id = int(args['data_request_id'])
            num_data_points = int(args['data_points'])
        except (KeyError, TypeError, ValueError, OverflowError):
            # Only the errors that missing or non-numeric args can produce
            # are caught; anything else is a real bug and should surface.
            self._q.put('Inbound request for burst_vibration_data did not include required args.data_request_id')
            self._q.put('Ignoring badly formed message.')
            return

        # Imported only once the message is known to be usable, so a badly
        # formed request never loads the vibration bolt-on.
        from alphamesh.bolt_ons.vibration_sensing import vibration_service

        condition_id = int(self._mconfig.CONDITION_ID)

        if num_data_points == 0:  # unset int from web will be 0
            num_data_points = 1000

        self._q.put(str(num_data_points) + " Raw Vibration Data points requested...")

        try:
            msg_body = vibration_service.get_burst_vibration_data(num_data_points)
            msg = OutboundMessage(routing_key=self._mconfig.OUTBOUND_QUEUE_NAME,
                                  headers={'data_request_id': request_id,
                                           'condition_id': condition_id},
                                  data_points=msg_body,
                                  message_type='vibrationdataraw')
            self.send_message_to_cloud(msg)
            self._q.put("Raw Vibration Data sent.")
        except Exception as e:
            # Best effort: a failed read or publish is reported, not raised.
            self._q.put('An error occurred trying to get Vibration Data points - no data sent')
            print('Error trying to get vibration data: ' + str(e))
class StatusMessageHandler(MessageHandler):
    """Handle ``send_device_status`` requests."""

    def process_message(self, json_message):
        """Publish the current device status and note it on the queue.

        ``json_message`` is not read; the status comes from
        ``device_management.get_device_status_message()``.
        """
        status_lines = device_management.get_device_status_message()
        message_fields = {
            'routing_key': self._mconfig.OUTBOUND_QUEUE_NAME,
            'headers': '',
            'message_lines': status_lines,
            'message_type': 'devicestatus',
        }
        self.send_message_to_cloud(OutboundMessage(**message_fields))
        self._q.put("Device status requested and sent.")
class RunIndexMessageHandler(MessageHandler):
    """Handle ``show_run_index`` requests."""

    def process_message(self, json_message):
        """Put a ``run_index_value,<value>,<time>`` line on the queue.

        The value and time are read from the message stored at construction
        (``args.run_index_value`` and ``args.run_index_time``). If either
        cannot be read, the error is printed and whatever was obtained so
        far is still queued, falling back to 0.0 and the current UTC time.
        """
        # Fallbacks used when the message cannot be read.
        index_value, value_time = 0.0, datetime.utcnow()
        try:  # in case we get bad json message
            args = self._json_message['args']
            index_value = args['run_index_value']
            raw_time = args['run_index_time']
            value_time = datetime.strptime(raw_time, '%b %d %Y %I:%M%p')
            # NOTE(review): fixed six-hour shift, presumably a UTC-6
            # conversion - confirm against the sender.
            value_time += timedelta(hours=-6)
        except Exception as e:
            print('Error trying to get run_index_value or run_index_time from message: ' + str(e))

        fields = ('run_index_value', str(index_value), str(value_time))
        self._q.put(','.join(fields))
class ConfigFileMessageHandler(MessageHandler):
    """Handle ``get_config_file`` requests by publishing ``config.ini``."""

    def process_message(self, json_message):
        """Read ``config.ini`` and publish its lines.

        ``json_message`` is not read. Nothing is sent when the configuration
        helper returns no data or a first entry that is an empty string.
        """
        config_data = mesh_configuration.get_config_file_as_array('config.ini')
        # An empty first entry marks a problem; the emptiness check stops
        # the index lookup from raising when nothing came back at all.
        if not config_data or config_data[0] == '':
            # There was a problem, can't continue
            return

        msg = OutboundMessage(routing_key=self._mconfig.OUTBOUND_QUEUE_NAME,
                              headers={'file_name': 'config.ini', 'note': ''},
                              message_lines=config_data,
                              message_type='deviceconfiguration')
        self.send_message_to_cloud(msg)
        self._q.put("config.ini requested and sent")
class UpdateMessageHandler(MessageHandler):
    """Handle ``update_alphamesh`` requests by running the update script."""

    def process_message(self, json_message):
        """Count down on the queue, then run ``utils/update_alphamesh.sh``.

        Args:
            json_message: Parsed message; ``args.branch`` must be a string.
                An empty string runs the script without a ``-b`` option.

        Raises:
            subprocess.CalledProcessError: If the update script exits with a
                non-zero status.
        """
        import subprocess

        # this means the Cloud wants us to go see if there is a new
        # version of software, so go check
        try:
            branch = json_message['args']['branch']
        except (KeyError, TypeError):
            branch = None
        # A non-string branch (for example JSON null) cannot be used in the
        # status text or the command line, so it counts as badly formed.
        if not isinstance(branch, str):
            self._q.put('Received request update_alphamesh but could not read args.branch')
            self._q.put('Ignoring badly formed message.')
            return

        script_dir = os.path.dirname(__file__)
        update_file = os.path.join(script_dir, 'utils/update_alphamesh.sh')

        self._q.put("activate_startup_screen")
        countdown = 10
        self._q.put("Software update requested on " + branch + " branch, shutting down in " + str(countdown) + " seconds...")
        # Emits 9, 8, ..., 0 at one-second intervals.
        for remaining in range(countdown - 1, -1, -1):
            time.sleep(1)
            self._q.put(str(remaining))

        command = [update_file, "-r"]
        if branch != "":
            command += ["-b", branch]
        subprocess.check_call(command)
class SetDeviceInfoMessageHandler(MessageHandler):
    """Handle ``set_device_condition`` requests."""

    def process_message(self, json_message):
        """Store the new condition details sent down by the website.

        Args:
            json_message: Parsed message with top-level ``conditionid`` and
                ``conditiondescription`` keys; ``machinedescription`` is
                optional and defaults to an empty string.

        Returns:
            None. A confirmation is queued only when
            ``mesh_configuration.update_condition`` returns a truthy value.
        """
        # Whenever this device is switched to a different ConditionId online,
        # the website will send down a message telling us to update here
        try:
            new_id = json_message['conditionid']
            new_desc = json_message['conditiondescription']
        except (KeyError, TypeError):
            # Report a malformed request the same way the other handlers do
            # instead of letting the lookup error escape.
            self._q.put('Received request set_device_condition but could not read '
                        'conditionid / conditiondescription')
            self._q.put('Ignoring badly formed message.')
            return

        new_machine_desc = json_message.get('machinedescription', '')

        if mesh_configuration.update_condition(new_id, new_desc, new_machine_desc):
            # str() keeps a non-string description from breaking the report.
            self._q.put("Updated Condition to " + str(new_desc))
class ShellCommandMessageHandler(MessageHandler):
    """Handle ``shell_command`` requests by running the supplied command.

    SECURITY NOTE(review): the command, and whether it runs through a shell,
    come straight from the inbound message, so anyone able to publish to the
    inbound queue can execute arbitrary commands on this device. Confirm
    that access to that queue is restricted.
    """

    def process_message(self, json_message):
        """Run ``command`` and report the outcome locally and to the cloud.

        Args:
            json_message: Parsed message with a top-level ``command`` key
                (passed as ``args`` to ``subprocess.check_output``) and an
                optional ``use_shell`` key (passed as ``shell``; defaults to
                False when absent).

        Returns:
            None. Output and failures are reported on the queue; a
            successful run or a non-zero exit is also published as a
            ``devicestatus`` message.
        """
        import subprocess

        try:
            args = json_message['command']
        except (KeyError, TypeError):
            self._q.put('Received request shell_command but could not read command')
            self._q.put('Ignoring badly formed message.')
            return
        use_shell = json_message.get('use_shell', False)

        try:
            output = subprocess.check_output(args=args, shell=use_shell).decode('utf-8')
            self._q.put('Shell Command returned: ' + output)
            self._send_status_lines(['Shell Command Success: ', output])
        except subprocess.CalledProcessError as e:
            self._q.put(str(e) + str(e.output))
            self._send_status_lines(['Shell Command Error:', str(e)])
        except Exception as e:
            # Anything else (command not found, undecodable output, publish
            # failure on the success path) is reported locally only.
            self._q.put('Other Unspecified Exception: ' + str(e))

    def _send_status_lines(self, lines):
        """Publish ``lines`` to the cloud as a ``devicestatus`` message."""
        msg = OutboundMessage(routing_key=self._mconfig.OUTBOUND_QUEUE_NAME,
                              headers='',
                              message_lines=lines,
                              message_type='devicestatus')
        self.send_message_to_cloud(msg)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You can tighten up that first chunk using a hashmap of handler classes.
DRY: why are you passing json_msg2 to the Handler constructor and as an argument to the constructed object’s method?