Created
October 18, 2021 21:05
-
-
Save rkachowski/f61eada9846e9062fdb94e9319153bd1 to your computer and use it in GitHub Desktop.
Get data from a DHT22 sensor, a CO2 meter and a noise level sensor.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import paho.mqtt.client as mqtt | |
import Adafruit_DHT | |
import usb.core | |
import statistics | |
import requests | |
import sdnotify | |
import sys, fcntl, time, os, argparse, socket | |
from time import sleep | |
# MQTT settings: broker host, topic prefix for published readings, and the
# topic used for the last-will ("dead") message.
broker = "kitchenpi.lan"
topic = "stats"
lwt = "alive/stats_relay"

# Seconds to sleep at the end of every polling cycle.
delay = 5
# systemd notifier; notify() uses it to send the "WATCHDOG=1" keep-alive.
n = sdnotify.SystemdNotifier()
# Rolling sample windows: h = humidity, t = temperature, s = sound level.
h, t, s = [], [], []

# Maximum number of samples kept in each window.
buffer_size = 5
# Constant slack added to the 2-sigma outlier threshold in app().
min_deviation = 5
def app(list, val, max_samples=None, tolerance=None):
    """Pass ``val`` through a rolling outlier filter backed by ``list``.

    While the window holds fewer than ``max_samples`` entries the value is
    appended and returned unchanged. Once the window is full, a value further
    than ``2 * stdev + tolerance`` from the window mean is treated as an
    outlier: the window is left untouched and its most recent sample is
    returned instead. Any other value replaces the oldest sample and is
    returned.

    Args:
        list: Mutable sample window; modified in place. The name shadows the
            builtin but is kept so the signature stays compatible.
        val: New reading, or ``None`` when the sensor read failed.
        max_samples: Window size; defaults to the module-level ``buffer_size``.
        tolerance: Constant slack added to the 2-sigma threshold; defaults to
            the module-level ``min_deviation``.

    Returns:
        The accepted value, the last stored sample when ``val`` is rejected or
        ``None``, or ``None`` when ``val`` is ``None`` and the window is empty.

    Raises:
        statistics.StatisticsError: If a full window holds fewer than two
            samples (i.e. ``max_samples`` < 2).
    """
    samples = list
    if max_samples is None:
        max_samples = buffer_size
    if tolerance is None:
        tolerance = min_deviation

    if val is None:
        # A failed read must not enter the window: a stored None would make
        # every later mean/stdev call raise TypeError.
        return samples[-1] if samples else None

    if len(samples) < max_samples:
        samples.append(val)
        return val

    spread = statistics.stdev(samples)
    if abs(statistics.mean(samples) - val) > spread * 2 + tolerance:
        # Outlier: report the last good sample and keep the window as is.
        return samples[-1]

    samples.pop(0)
    samples.append(val)
    return val
# USB device (vendor 0x16c0, product 0x05dc) that stat_loop() queries for the
# sound level. usb.core.find() returns None when nothing matches, so a missing
# device only surfaces when ``dev`` is first used.
dev = usb.core.find(idVendor=0x16C0, idProduct=0x05DC)
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: report the result and set the last-will.

    Args:
        client: The MQTT client instance that received the connect result.
        userdata: User data passed through to the callback (unused here).
        flags: Connect flags passed through to the callback (unused here).
        rc: Connect result code; 3 is reported as "Server unavailable".
    """
    if rc == 3:
        print("Server unavailable")
        # The original called a bare ``on_disconnect`` name that nothing
        # visible in this file defines (NameError). Forward to the disconnect
        # callback registered on the client instead, when there is one.
        handler = getattr(client, "on_disconnect", None)
        if handler is not None:
            handler(client, userdata, rc)
        return
    print("connected to mqtt broker")
    # ``QOS1`` was an undefined name (NameError); QoS level 1 is spelled as
    # the integer 1. NOTE(review): a will set after the connection is
    # established is only sent to the broker on the next (re)connect.
    client.will_set(lwt, "dead", qos=1, retain=True)
# Create the MQTT client, register the connect callback, then connect and
# start the client's network loop.
client = mqtt.Client("stats_relay")
client.on_connect = on_connect
# The last-will travels in the CONNECT packet, so it has to be registered
# before connect(); otherwise the broker never learns about it on the first
# connection.
client.will_set(lwt, "dead", qos=1, retain=True)
client.connect(broker, 1883, 60)
client.loop_start()
def notify():
    """Ping the external health check, then feed the systemd watchdog.

    The HTTP ping is best-effort: it is bounded by a timeout and request
    failures are only logged, so a network problem can neither block the
    polling loop nor prevent the local ``WATCHDOG=1`` notification.
    """
    try:
        requests.get(
            url="https://hc-ping.com/53816436-ddba-490d-b113-8a7d9017c629",
            timeout=10,
        )
    except requests.RequestException as err:
        print("health check ping failed: %s" % err)
    n.notify("WATCHDOG=1")
def decrypt_co2(key, data):
    """Decode one 8-byte CO2-meter report using ``key``.

    Steps: un-shuffle the bytes, XOR with the key, rotate the 64-bit block
    right by three bits, then subtract a nibble-swapped constant byte-wise
    (mod 256).

    Args:
        key: Sequence of at least 8 ints (0-255).
        data: Sequence of at least 8 ints (0-255), the raw report.

    Returns:
        A list of 8 ints (0-255) with the decoded report.
    """
    magic = [0x48, 0x74, 0x65, 0x6D, 0x70, 0x39, 0x39, 0x65]
    order = [2, 4, 0, 7, 1, 6, 5, 3]

    # Input byte at position ``src`` belongs at position ``dst``.
    unshuffled = [0] * 8
    for src, dst in enumerate(order):
        unshuffled[dst] = data[src]

    xored = [unshuffled[i] ^ key[i] for i in range(8)]

    # Each byte keeps its top five bits (shifted down) and receives the low
    # three bits of its predecessor; index -1 wraps around to the last byte.
    rotated = [((xored[i] >> 3) | (xored[i - 1] << 5)) & 0xff for i in range(8)]

    # Swap the high and low nibble of every constant byte.
    swapped = [((c >> 4) | (c << 4)) & 0xff for c in magic]

    return [(0x100 + r - m) & 0xff for r, m in zip(rotated, swapped)]
def _co2_frame_ok(frame):
    """Return True when ``frame`` has the 0x0d marker and a matching checksum."""
    return frame[4] == 0x0d and (sum(frame[:3]) & 0xff) == frame[3]


def stat_loop():
    """Poll all sensors forever and publish their readings over MQTT.

    Each cycle reads one 8-byte report from the CO2 meter on ``/dev/hidraw0``
    (decoding it when it is not already plain), reads the DHT22 on pin 18,
    queries the USB sound meter, publishes everything under ``topic``, pings
    the watchdogs and sleeps for ``delay`` seconds.

    NOTE(review): the source had lost its indentation. The DHT22, sound and
    watchdog steps are placed at loop level (once per HID report) - confirm
    against the original layout.
    """
    key = [0xc4, 0xc6, 0xc0, 0x92, 0x40, 0x23, 0xdc, 0x96]
    HIDIOCSFEATURE_9 = 0xC0094806
    # Report id 0x00 followed by the key. bytes(bytearray(...)) gives the same
    # 9 raw bytes on Python 2 (str) and Python 3 (bytes).
    set_report = bytes(bytearray([0x00] + key))

    values = {}
    data_encrypted_print = False

    with open("/dev/hidraw0", "a+b", 0) as fp:
        fcntl.ioctl(fp, HIDIOCSFEATURE_9, set_report)

        while True:
            # bytearray yields ints on both Python 2 and 3, unlike ord() over
            # the raw read result.
            data = list(bytearray(fp.read(8)))

            if _co2_frame_ok(data):
                decrypted = data
                if not data_encrypted_print:
                    print("Info: data not encrypted")
                    data_encrypted_print = True
            else:
                # Was ``decrypt(key, data)``: that name does not exist, the
                # decoder in this file is decrypt_co2.
                decrypted = decrypt_co2(key, data)

            if not _co2_frame_ok(decrypted):
                print("co2 device Checksum error")
            else:
                op = decrypted[0]
                val = decrypted[1] << 8 | decrypted[2]
                values[op] = val

                if (0x50 in values) and (0x42 in values):
                    co2 = values[0x50]
                    tmp = (values[0x42] / 16.0 - 273.15)
                    client.publish(topic + "/co2", co2)
                    client.publish(topic + "/co2_temp", tmp)
                    print("co2:  %s  co2_temp :  %s" % (co2, tmp))

            humidity, temperature = Adafruit_DHT.read_retry(Adafruit_DHT.DHT22, 18)
            print("raw dht22 temperature:  %s  humidity :  %s" % (temperature, humidity))

            # read_retry gives (None, None) when every attempt failed; keep
            # those out of the filter windows and skip publishing them.
            if humidity is not None and temperature is not None:
                humidity = app(h, humidity)
                temperature = app(t, temperature)
                client.publish(topic + "/humidity", humidity)
                client.publish(topic + "/temperature", temperature)

            ret = dev.ctrl_transfer(0xC0, 4, 0, 0, 200)
            dB = (ret[0] + ((ret[1] & 3) * 256)) * 0.1 + 30
            client.publish(topic + "/raw_sound_level", dB)

            # Publish the mean over the last ``buffer_size`` raw readings.
            s.append(dB)
            if len(s) > buffer_size:
                s.pop(0)
            dB = statistics.mean(s)
            client.publish(topic + "/sound_level", dB)

            print("dht22 temperature:  %s  humidity :  %s  db level:  %s"
                  % (temperature, humidity, dB))

            notify()
            sleep(delay)
# Script entry point: start the polling loop, which has no exit path of its own.
stat_loop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment