Last active
August 29, 2015 14:07
-
-
Save volodymyrsmirnov/86ca1ca884aae757cf87 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from zway import ZWay, ZWayData | |
import sys | |
import signal | |
import binascii | |
from flask import Flask, jsonify, request, abort | |
app = Flask(__name__) | |
class ZWayHandler(ZWay): | |
def on_device(self, type, node_id, instance_id, command_id): | |
print "got on_device event", (type, node_id, instance_id, command_id) | |
zway_instance = ZWayHandler( | |
"/dev/ttyAMA0", | |
"/opt/z-way-server/config/", | |
"/opt/z-way-server/translations/", | |
"/opt/z-way-server/ZDDX/", | |
"/tmp/pyzway.log", 0 | |
) | |
zway_instance.start() | |
zway_instance.device_add_callback(0x01 | 0x02 | 0x04 | 0x08 | 0x10 | 0x20) | |
zway_instance.discover() | |
def check_response(data): | |
return jsonify({ | |
"success": type(data) is int or type(data) is bool or bool(data), | |
"data": data | |
}) | |
@app.route("/") | |
def home(): | |
return "PyZBerry Testing Application" | |
@app.route("/inclusion/<int:state>") | |
def inclusion(state): | |
return check_response(zway_instance.controller_add_node_to_network(state)) | |
@app.route("/zway/device_guess/<int:device_id>") | |
def device_guess(device_id): | |
return check_response(zway_instance.device_guess(device_id)) | |
@app.route("/exclusion/<int:state>") | |
def exclusion(state): | |
return check_response(zway_instance.controller_remove_node_from_network(state)) | |
@app.route("/devices") | |
def devices(): | |
return check_response(zway_instance.devices_list()) | |
@app.route("/instances") | |
def instances(): | |
return check_response(zway_instance.instances_list(int(request.values.get("device", 0)))) | |
@app.route("/command_classes") | |
def command_classes(): | |
return check_response(zway_instance.command_classes_list(int(request.values.get("device", 0)), int(request.values.get("instance", 0)))) | |
@app.route("/controller") | |
def controller(): | |
data = ZWayData.find_controller_data(zway_instance, request.values.get("path", "").encode("ascii", "ignore")) | |
data_output = [{ | |
"path": data_child.path, | |
"name": data_child.name, | |
"type": data_child.type, | |
"value": binascii.hexlify(data_child.value) if data_child.type == 5 else data_child.value | |
} for data_child in data.children] | |
data_output.insert(0, { | |
"path": data.path, | |
"name": data.name, | |
"type": data.type, | |
"value": data.value | |
}) | |
return check_response(data_output) | |
@app.route("/device") | |
def device(): | |
device_i = request.values.get("device", None) | |
instance_i = request.values.get("instance", None) | |
command_class_i = request.values.get("command_class", None) | |
if not device: | |
return abort(500) | |
data = ZWayData.find_device_data( | |
zway_instance, | |
request.values.get("path", "").encode("ascii", "ignore"), | |
int(device_i) if device_i is not None else None, | |
int(instance_i) if instance_i is not None else None, | |
int(command_class_i) if command_class_i is not None else None, | |
) | |
data_output = [{ | |
"path": data_child.path, | |
"name": data_child.name, | |
"type": data_child.type, | |
"value": binascii.hexlify(data_child.value) if data_child.type == 5 else data_child.value | |
} for data_child in data.children] | |
data_output.insert(0, { | |
"path": data.path, | |
"name": data.name, | |
"type": data.type, | |
"value": data.value | |
}) | |
return check_response(data_output) | |
def signal_handler(signal, frame): | |
print ("now exit") | |
zway_instance.stop() | |
zway_instance.terminate() | |
sys.exit(0) | |
signal.signal(signal.SIGINT, signal_handler) | |
if __name__ == '__main__': | |
app.run(host="0.0.0.0", debug=True, use_reloader=False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment