Created
March 13, 2021 15:44
-
-
Save kk7ds/65e0c6db43c0092ce0e0a20eb47cd398 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import threading | |
import requests | |
# This is my dedicated house module. It's private, and includes details I | |
# can't share for obvious reasons. It has an Events module that ends up sending | |
# things to MQTT, but used to send them to a different bus technology before | |
# I moved to MQTT. | |
import myhouse | |
from nx584 import model | |
# Module-level logger; the name 'ext' is kept exactly as in the original so
# any existing logging configuration keyed on it keeps working.
LOG = logging.getLogger('ext')
def get_state(flags):
    """Collapse a partition's condition flags into a small state dict.

    :param flags: iterable of condition-flag strings for one partition.
    :returns: a dict with a ``'state'`` key holding one of ``'disarmed'``,
              ``'alarm'``, ``'arming'`` or ``'armed'``. Only the ``'armed'``
              result also carries ``'armtype'`` (``'stay'`` or ``'away'``).

    Precedence, highest first: not armed, siren sounding, exit delay
    running, then plain armed.
    """
    active = set(flags)

    if 'Armed' not in active:
        return {'state': 'disarmed'}
    if 'Siren on' in active:
        return {'state': 'alarm'}
    # Either exit-delay flag means the partition is still arming.
    if active & {'Exit 1', 'Exit 2'}:
        return {'state': 'arming'}

    # Conditional expression instead of the fragile ``x and a or b`` idiom.
    armtype = 'stay' if 'Entryguard (stay mode)' in active else 'away'
    return {'state': 'armed', 'armtype': armtype}
class MyHouseNXEvents(model.NX584Extension):
    """nx584 extension that republishes panel activity as myhouse events.

    Every hook builds a topic string and a payload dict and hands them to
    ``myhouse.Events.send_mq_event``. Partition updates are de-duplicated:
    an event is only published when the state derived by ``get_state``
    differs from the last one recorded for that partition number.
    """

    def __init__(self, *args, **kwargs):
        super(MyHouseNXEvents, self).__init__(*args, **kwargs)
        # Last published state dict per partition number.
        self._partitions = {}

    def _zone_event(self, zone):
        """Publish the current status of ``zone`` (sent with retain=False)."""
        # Spaces in the zone name are replaced so the topic segment has none.
        topic = 'alarm/zone/%03i-%s/status' % (zone.number,
                                               zone.name.replace(' ', '_'))
        payload = {'name': zone.name,
                   'number': zone.number,
                   'status': zone.state}
        myhouse.Events.send_mq_event(topic, payload, retain=False)

    def _partition_event(self, part):
        """Publish the state of ``part`` if it changed since the last call."""
        new_state = get_state(part.condition_flags)
        cur_state = self._partitions.get(part.number)
        if new_state == cur_state:
            return

        event = {'number': part.number}
        event.update(new_state)
        myhouse.Events.send_mq_event(
            'alarm/partition/%i/status' % part.number,
            event)
        # Lazy logger arguments: formatting only happens if INFO is enabled.
        LOG.info('State %s -> %s', cur_state, new_state)
        self._partitions[part.number] = new_state

    def zone_status(self, zone):
        """Extension hook: forward a zone status update."""
        self._zone_event(zone)

    def partition_status(self, part):
        """Extension hook: forward a partition status update."""
        self._partition_event(part)

    def device_command(self, house, unit, command):
        """Extension hook: publish a device command for ``house``/``unit``."""
        topic = 'alarm/device/%s%02i/command' % (house.upper(), unit)
        myhouse.Events.send_mq_event(
            topic,
            {'house': house,
             'unit': unit,
             'command': command})

    def log_event(self, event):
        """Extension hook: publish a panel log entry on ``alarm/log``."""
        myhouse.Events.send_mq_event(
            'alarm/log',
            {'event': event.event,
             'message': event.event_string})
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is the setup.py for the larger eventhandler package I use.
# It installs multiple things, including the event injection plugin that nx584 will
# pick up in order to start sending the events to MQTT. That's why you see phue
# and pywws in the requires list.
# NOTE(review): assumes `setup` is imported above this excerpt (e.g. from
# setuptools) -- the import is not visible in this listing; confirm.
setup(
    name='myhouse-events',
    version='1.0',
    py_modules=['myhouse_nx584'],
    entry_points={
        # The object reference after the colon is looked up by exact,
        # case-sensitive attribute name, so it must match the class
        # MyHouseNXEvents as it is spelled in the extension module.
        'pynx584': ['nxevents=myhouse_nx584:MyHouseNXEvents'],
    },
    install_requires=['myhouse', 'phue', 'pynx584', 'pywws'],
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment