-
-
Save kenzodeluxe/464f2b6b4f810420fabb2f0251b4e913 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3 | |
""" | |
Script to read values from Qundis Qheat 5.5 (which seems to be "identical" to Landis&Gyr T230/T330) | |
It's necessary to send 210 leading zeros before the actual request | |
Check https://www.sipatec.rs/files/uploads/T230.pdf for further protcol details (and subsequent messages) | |
Uses https://github.com/ganehag/pyMeterBus to decode M-BUS datagrams | |
""" | |
import binascii | |
import datetime | |
import json | |
import logging | |
import sys | |
from os import getenv | |
from time import sleep | |
import serial | |
sys.path.append('/home/pi/pyMeterBus') | |
import meterbus | |
from influxdb import InfluxDBClient | |
error_sleep_interval = 10 | |
loglevel = getenv('LOGLEVEL', 'INFO').upper() | |
# InfluxDB | |
send_influx = bool(getenv('SEND_INFLUX', True)) | |
dbclient = InfluxDBClient(getenv('INFLUX_HOST'), | |
getenv('INFLUX_PORT', 8086), | |
getenv('INFLUX_USER', ''), | |
getenv('INFLUX_PASS', ''), | |
getenv('INFLUX_DB', 'wmz')) | |
json_body = [] | |
receiveTime = datetime.datetime.utcnow() | |
lookup_map = { 2: 'VIFUnit.ENERGY_WH', | |
3: 'VIFUnit.VOLUME'} | |
# Serial | |
serial_port = getenv('SERIAL_PORT', '/dev/ttyUSB0') | |
ser = serial.Serial(serial_port, baudrate=2400, bytesize=8, parity="E", stopbits=1, timeout=2, xonxoff=0, rtscts=0) | |
def write_zeros(): | |
i = 210 | |
while i: | |
ser.write(b'\x00') | |
i = i - 1 | |
def send_status_request(): | |
write_zeros() | |
ser.write(b'\x68\05\x05\x68\x53\xFE\x51\x0F\x0F\xC0\x16') | |
# see link from above to understand the actual values | |
try: | |
frame = meterbus.load(meterbus.recv_frame(ser)) | |
logging.debug(json.dumps(json.loads(frame.body.to_JSON()), indent=4, sort_keys=True)) # there might be an easier way | |
return True | |
except Exception as e: | |
logging.error(f'Unable to decode datagram, please retry: {e}') | |
return False | |
def get_data(): | |
write_zeros() | |
ser.write(b'\x10\x7b\xFE\x79\x16') | |
try: | |
frame = meterbus.load(meterbus.recv_frame(ser)) | |
json_response = json.loads(frame.body.to_JSON()) # there might be an easier way | |
return json_response | |
except Exception as e: | |
logging.error(f'Unable to parse data from WMZ: {e}') | |
return False | |
def main(): | |
retries = getenv('RETRIES', 5) | |
while retries: | |
success = send_status_request() | |
if not success: | |
logging.error('WMZ did not correctly respond to status request. Retrying.') | |
sleep(error_sleep_interval) | |
retries -= 1 | |
continue | |
wmz_data = get_data() | |
if wmz_data: | |
logging.debug(wmz_data) | |
# correct datagram should show this as its first entry (Qheat 5.5) | |
if wmz_data['records'][0]['type'] != 'VIFUnit.ACTUALITY_DURATION': | |
logging.error('Incorrect datagram, retrying.') | |
sleep(error_sleep_interval) | |
continue | |
else: | |
for key in lookup_map.keys(): | |
influx_entry = { "measurement": "", "time": receiveTime, "fields": { "value": "" }} | |
try: | |
influx_entry['measurement'] = lookup_map[key].strip() | |
influx_entry['fields']['value'] = wmz_data['records'][key]['value']/1000 | |
json_body.append(influx_entry) | |
except Exception as e: | |
logging.error(f'Unable to assign values from WMZ data to InfluxDB request data: {e}') | |
sleep(error_sleep_interval) | |
continue | |
if send_influx: | |
dbclient.write_points(json_body) | |
logging.debug(json_body) | |
print('Done.') | |
return | |
else: | |
retries -= 1 | |
sleep(error_sleep_interval) | |
continue | |
logging.error(f'Did not get a meaningful response within {retries} retries. Please re-run later.') | |
if __name__ == "__main__": | |
main() |
Hi,
I know I'm late to the party, however: I tried to recreate your script in Tasmota (I just have an wireless ESP32 reader available), however I didn't succeed. I'm not sure if I'm sending the correct initial hexstring. In the comments you mention "# see link from above to understand the actual values". Unfortunately I can't find anything regarding those values... Could you elaborate a bit on that? I also noticed the hexstring seems to miss an "x" ("\x68\05\x05...) which also confused me a bit. Would be great if someone is still looking into this. Any help is appreciated :-)
Thanks a lot,
regards,
Oli
Unfortunately I can't find anything regarding those values...
I agree - looks like the PDF was replaced. I found the original one I was referencing here.
I also noticed the hexstring seems to miss an "x" ("\x68\05\x05...) which also confused me a bit.
You're right - I just changed this locally and it doesn't seem to make a difference. Likely, the meter doesn't care about the full string that much, however adding the x
didn't break communication for me.
Does the script itself work with your meter? Are you using it with a Qundis Qheat 5.5
as well?
I haven't changed the script in years and it still runs fine on my machine
:)
Hi,
thanks for the quick response. The referenced document helped a lot to understand what I'm doing, but it doesn't look like I'm doing anything wrong. Unfortunately I can't test your script directly, because I only have that ESP32 reader (can't run python), so I tried to rebuild it in Tasmota. I'll get another (USB) reader end of the week, then I can try with a raspberry pi/python. My meter is a T330, but I understand it is similar to the T230.
For now, I think I didn't even manage to wake up the meter... I'll try with different numbers of zeroes now. Let's see if that helps (otherwise I'll verify that your script works, once the reader arrives)
Thanks,
Oli
Hi @kenzodeluxe,
I tried to download Landis+Gyr UltraAssist, but unfortunately an account is needed which I don't have.
Yes, the reader was working fine with my powermeter.