Last active
December 22, 2023 22:41
-
-
Save rufinus/ba4666d788efaa52d7222c9f085b2200 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import serial | |
import time | |
from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
from binascii import unhexlify | |
import sys | |
import string | |
import paho.mqtt.client as mqtt | |
import xml.etree.ElementTree as ET | |
from gurux_dlms.GXDLMSTranslator import GXDLMSTranslator | |
from bs4 import BeautifulSoup | |
from Cryptodome.Cipher import AES | |
from time import sleep | |
from gurux_dlms.TranslatorOutputType import TranslatorOutputType | |
# EVN key, entered as a string (i.e. inside the quotes); it is decoded
# from hex before being used as the AES key.
evn_schluessel = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

# Publish the readings via MQTT (True | False).
useMQTT = True

# IP address of the MQTT broker, without a port.
mqttBroker = "192.168.x.x"

# Serial port the script reads from.
comport = "/dev/ttyUSB0"

# Print the current values to the console (True | False).
printValue = True
# Fetches data from the serial interface.
def recv(serialIncoming, poll_interval=0.5):
    """Block until a burst of data has arrived on the port and return it.

    The port is polled with ``read_all()``.  Once data has started to
    arrive, polling continues until one poll comes back empty, so that a
    transmission that is still in progress is returned as a whole rather
    than cut off at the first read.

    The previous implementation compared the result with ``''``; on
    Python 3 ``read_all()`` yields bytes, so that comparison was never
    true and the function returned immediately, even with no data.

    Args:
        serialIncoming: Object exposing ``read_all()`` (the script passes
            an open ``serial.Serial`` instance).
        poll_interval: Seconds to sleep between polls.  It also serves as
            the silence gap that marks the end of a burst.

    Returns:
        The received data as ``bytes`` (never empty).

    NOTE(review): assumes the sender pauses for longer than
    ``poll_interval`` between transmissions; on a gap-free stream this
    call would not return -- confirm against the meter's send interval.
    """
    received = bytearray()
    while True:
        chunk = serialIncoming.read_all()
        if chunk:
            received += chunk
        elif received:
            # Nothing new since the last poll: the burst is complete.
            return bytes(received)
        sleep(poll_interval)
# MQTT init: connect once at startup and abort if that is not possible.
if useMQTT:
    try:
        client = mqtt.Client("SmartMeter")
        client.connect(mqttBroker)
        # Run paho's network loop in a background thread so that keepalive
        # and reconnects are handled while the main loop is busy or sleeping.
        client.loop_start()
    except Exception as err:
        # `Exception` rather than a bare `except`, so Ctrl+C / SystemExit
        # are not reported as a broker problem.
        print("Die Ip Adresse des Brokers ist falsch!")
        # Show the underlying cause as well; the hint above is only the
        # most likely reason for a failure here.
        print("Fehler: ", err)
        # Non-zero exit status so callers/supervisors can see the failure.
        sys.exit(1)
# Translator configured for the SIMPLE_XML output type; used later to turn
# the decrypted PDU into XML.
tr = GXDLMSTranslator(TranslatorOutputType.SIMPLE_XML)

# Serial line parameters: 2400 baud, 8 data bits, no parity, 1 stop bit.
_serial_settings = {
    'port': comport,
    'baudrate': 2400,
    'bytesize': serial.EIGHTBITS,
    'parity': serial.PARITY_NONE,
    'stopbits': serial.STOPBITS_ONE,
}
serIn = serial.Serial(**_serial_settings)
# Maps the OctetString values looked up in the decoded XML to the names
# used for console output and as the last segment of the MQTT topic.
octet_string_values = {
    '0100010800FF': 'WirkenergieP',
    '0100020800FF': 'WirkenergieN',
    '0100010700FF': 'MomentanleistungP',
    '0100020700FF': 'MomentanleistungN',
    '0100200700FF': 'SpannungL1',
    '0100340700FF': 'SpannungL2',
    '0100480700FF': 'SpannungL3',
    '01001F0700FF': 'StromL1',
    '0100330700FF': 'StromL2',
    '0100470700FF': 'StromL3',
    '01000D0700FF': 'Leistungsfaktor',
}
def _extract_readings(xml):
    """Collect the known readings from the translator's XML output.

    Every ``OctetString`` element whose ``Value`` is a key of
    ``octet_string_values`` is paired with the element that follows it in
    document order; that element's ``Value`` attribute is parsed as a
    hexadecimal integer.

    Args:
        xml: XML text as returned by ``tr.pduToXml``.

    Returns:
        A list of ``{'key': name, 'value': int}`` dicts.  If both
        'MomentanleistungP' and 'MomentanleistungN' were found, an extra
        'Momentanleistung' entry (P minus N) is appended.

    Raises:
        xml.etree.ElementTree.ParseError: if *xml* is not well-formed.
        ValueError: if a value attribute is not valid hexadecimal.
    """
    elements = list(ET.fromstring(xml).iter())
    readings = []
    # zip() stops one element short, so a code that happens to be the very
    # last element is skipped instead of raising IndexError and discarding
    # the whole telegram.
    for code_el, value_el in zip(elements, elements[1:]):
        if code_el.tag != 'OctetString':
            continue
        name = octet_string_values.get(code_el.attrib.get('Value'))
        if name is None or 'Value' not in value_el.attrib:
            continue
        readings.append({'key': name, 'value': int(value_el.attrib['Value'], 16)})

    # Special handling for Momentanleistung: look the two parts up by name
    # so the sign of the difference does not depend on element order.
    by_name = {entry['key']: entry['value'] for entry in readings}
    if 'MomentanleistungP' in by_name and 'MomentanleistungN' in by_name:
        readings.append({
            'key': 'Momentanleistung',
            'value': by_name['MomentanleistungP'] - by_name['MomentanleistungN'],
        })
    return readings


# Decode the key once instead of on every telegram; a malformed key now
# fails at startup rather than when the first valid telegram arrives.
encryption_key = unhexlify(evn_schluessel)

# Main loop: read a telegram, decrypt it, decode it and output the values.
while True:
    sleep(4.7)
    daten = recv(serIn).hex()

    # A telegram is only accepted if it begins with these start bytes
    # (an earlier variant of this check used "68010168").
    if not daten.startswith("68fafa68"):
        print("Invalid Start Bytes... waiting")
        continue

    # Offsets below are positions in the hex string (two characters per byte).
    systemTitel = daten[22:38]
    frameCounter = daten[44:52]
    frame = daten[52:512]

    # A truncated read would yield a short/empty nonce or payload and make
    # the cipher setup fail, so skip it and wait for the next telegram.
    if len(frameCounter) < 8 or not frame:
        print("Incomplete telegram... waiting")
        continue

    try:
        # The nonce is the system title followed by the frame counter.
        init_vector = unhexlify(systemTitel + frameCounter)
        cipher = AES.new(encryption_key, AES.MODE_GCM, nonce=init_vector)
        # decrypt() only; no authentication tag is verified here.
        apdu = cipher.decrypt(unhexlify(frame)).hex()
        found_lines = _extract_readings(tr.pduToXml(apdu))
    except Exception as err:
        # `Exception`, not `BaseException`: Ctrl+C must still stop the script.
        print("Fehler: ", err)
        continue

    try:
        for element in found_lines:
            # Console output
            if printValue:
                print(element['key'] + ': ' + str(element['value']))
            # MQTT
            if useMQTT:
                client.publish("Smartmeter/" + element['key'], element['value'])

        # Separator after each telegram in console mode.
        if printValue:
            print()
            print()
            print('----------------------------------------------')
    except Exception as err:
        print("Fehler: ", err)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment