Skip to content

Instantly share code, notes, and snippets.

@hotovson
Forked from CallMeFoxie/jkbms.conf
Created July 31, 2024 11:03
Show Gist options
  • Save hotovson/ad38e15e5a541b1e3f3c49d7fd6ff586 to your computer and use it in GitHub Desktop.
Save hotovson/ad38e15e5a541b1e3f3c49d7fd6ff586 to your computer and use it in GitHub Desktop.
Solar stuff monitoring, WIP stuff
{
"port": "/dev/ttyS2",
"baudrate": 115200,
"timeout": 5,
"batteries": [
{
"battery_id": 1,
"cells": 16,
"gpio_de": 74,
"gpio_re": 75,
"echo": true
},
{
"battery_id": 2,
"cells": 16,
"gpio_de": 68,
"gpio_re": 70,
"echo": true
},
{
"battery_id": 3,
"cells": 16,
"gpio_de": 73,
"gpio_re": 80,
"echo": true
},
{
"battery_id": 4,
"cells": 16,
"gpio_de": 230,
"gpio_re": 69,
"echo": true
}
]
}
import paho.mqtt.client as mqtt
import json
import os
import subprocess
import time
import struct
import serial
from periphery import GPIO
#config_path = '/etc/jkbms/config.json'
config_path = '/opt/jkbms/config.json'
with open(config_path, "r") as configfile:
config = json.loads(configfile.read())
port = config['port']
baudrate = int(config['baudrate'])
base_path = 'homeassistant'
global_availability_topic = 'availability/jkbms'
def create_discovery(client, hatype, deviceid, name, device_class, unit_of_measurement, availability_topic, device_name, state_class):
xpath = f"{base_path}/{hatype}/battery{deviceid}_{name}"
client.publish(f"{xpath}/config", json.dumps({
'name': name,
'state_topic': xpath,
'device_class': device_class,
'unit_of_measurement': unit_of_measurement,
'state_class': state_class,
'unique_id': f"solar_jkbms_{name}",
'availability': [{'topic': availability_topic}, {'topic': global_availability_topic}],
'device': {'name': device_name, 'identifiers': [f"solar_jkbms_{deviceid}"]}
}))
print(f"publishing discovery to {xpath}")
def publish_discovery(client):
for battery in config['batteries']:
deviceid = battery['battery_id']
cells = battery['cells']
availability_topic = f'{base_path}/battery{deviceid}/available'
device_name = f'JK-BMS Battery {deviceid}'
sensor = 'sensor'
binary_sensor = 'binary_sensor'
voltage = 'voltage'
temperature = 'temperature'
power = 'power'
for i in range(cells):
create_discovery(client, sensor, deviceid, f'cell_{i}_volts', voltage, 'mV', availability_topic, device_name, None)
create_discovery(client, sensor, deviceid, f'cell_{i}_volts_delta', voltage, 'mV', availability_topic, device_name, None)
for i in {'mos', 'battery', 'battery_box'}:
create_discovery(client, sensor, deviceid, f'temperature_{i}', temperature, 'C', availability_topic, device_name, None)
for i in {'out', 'in'}:
create_discovery(client, sensor, deviceid, f'power_{i}', power, 'W', availability_topic, device_name, 'measurement')
create_discovery(client, sensor, deviceid, 'power', power, 'W', availability_topic, device_name, 'measurement')
ampcur = {'voltage': 'V', 'current': 'A'}
for i in ampcur.keys():
create_discovery(client, sensor, deviceid, f'battery_{i}', i, ampcur[i], availability_topic, device_name, 'measurement')
create_discovery(client, sensor, deviceid, 'soc', 'battery', '%', availability_topic, device_name, 'measurement')
def publish_value(client, hatype, deviceid, name, value):
xpath = f"{base_path}/{hatype}/battery{deviceid}_{name}"
client.publish(xpath, value)
print(f"publishing value {value} to {xpath}")
def on_connect(client, userdata, flags, reason_code, properties):
client.publish(global_availability_topic, "online", retain=True)
client.subscribe("homeassistant/status")
publish_discovery(client)
print(f"Connected with result code {reason_code}")
def on_message(client, userdata, msg):
if msg.topic == "homeassistant/status":
publish_discovery(client)
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.username_pw_set("voltronic", "voltronicpwd")
mqttc.will_set(global_availability_topic, "offline", retain=True)
ser = serial.Serial(port, baudrate, timeout=int(config['timeout']))
mqttc.connect("ha.smeg", 1883, 60)
def set_available(client, availability_topic, battery):
client.publish(availability_topic, "online", retain=True)
def set_unavailable(client, availability_topic):
client.publish(availability_topic, "offline", retain=True)
def shift_ushort(bytes_in):
(ushort, ) = struct.unpack('>H', bytes_in[0:2])
return (ushort, bytes_in[2:])
def shift_uchar(bytes_in):
(uchar, ) = struct.unpack('B', bytes_in[0:1])
return (uchar, bytes_in[1:])
def shift_uint(bytes_in):
(uint, ) = struct.unpack('>I', bytes_in[0:4])
return (uint, bytes_in[4:])
def fb_volts_divide_100(val):
return float(val)/100
def fb_volts_divide_1000(val):
return float(val)/1000
def fb_convert_temperature(val):
if val > 100:
val = (val - 100) * -1
return val
def fb_convert_current(val):
if val > 32768:
return float(val - 32768) / 100
return -1 * (float(val) / 100)
mapping = {
0x80: ('mos_temperature', 2, fb_convert_temperature),
0x81: ('battery_box_temperature', 2, fb_convert_temperature),
0x82: ('battery_temperature', 2, fb_convert_temperature),
0x83: ('total_battery_voltage', 2, fb_volts_divide_100),
0x84: ('total_current', 2, fb_convert_current),
0x85: ('soc', 1, None),
0x86: ('temperature_sensor_count', 1, None),
0x87: ('battery_cycles', 2, None),
0x89: ('total_battery_cycle_capacity', 4, None),
0x8a: ('total_number_battery_strings', 2, None),
0x8b: ('battery_warnings', 2, None), # TODO parse flags
0x8c: ('battery_status', 2, None), # TODO parse flags
0x8e: ('total_overvoltage_voltage', 2, fb_volts_divide_100),
0x8f: ('total_undervoltage_voltage', 2, fb_volts_divide_100),
0x90: ('single_overvoltage_voltage', 2, fb_volts_divide_1000),
0x91: ('single_overvoltage_recovery_voltage', 2, fb_volts_divide_1000),
0x92: ('single_overvoltage_recovery_delay', 2, None),
0x93: ('single_undervoltage_voltage', 2, fb_volts_divide_1000),
0x94: ('single_undervoltage_recovery_voltage', 2, fb_volts_divide_1000),
0x95: ('single_undervoltage_recovery_delay', 2, None),
0x96: ('cell_pressure_difference', 2, fb_volts_divide_1000),
0x97: ('discharge_overcurrent_current', 2, None),
0x98: ('discharge_overcurrent_delay', 2, None),
0x99: ('charging_overcurrent_current', 2, None),
0x9a: ('charging_overcurrent_delay', 2, None),
0x9b: ('balancing_start_voltage', 2, fb_volts_divide_1000),
0x9c: ('balancing_voltage_difference_min', 2, fb_volts_divide_1000),
0x9d: ('balancing_enabled', 1, None),
0x9e: ('mos_overtemperature_celsius', 2, None),
0x9f: ('mos_overtemperature_recovery', 2, None),
0xa0: ('battery_box_overtemperature_celsius', 2, None),
0xa1: ('battery_box_overtemperature_recovery', 2, None),
0xa2: ('battery_temperature_difference', 2, None),
0xa3: ('battery_charging_high_temperature_limit', 2, None),
0xa4: ('battery_discharge_high_temperature_limit', 2, None),
0xa5: ('battery_charging_low_temperature_limit', 2, None),
0xa6: ('battery_charging_low_temperature_recovery', 2, None),
0xa7: ('battery_discharge_low_temperature_limit', 2, None),
0xa8: ('battery_discharge_low_temperature_recovery', 2, None),
0xa9: ('battery_string_setting', 1, None),
0xaa: ('battery_capacity_setting', 4, None),
0xab: ('charging_enabled', 1, None),
0xac: ('discharging_enabled', 1, None),
0xad: ('current_calibration', 2, None),
0xae: ('protection_board_address', 1, None),
0xaf: ('battery_type', 1, None),
0xb0: ('sleeping_time', 2, None),
0xb1: ('low_alarm_value', 1, None),
0xb2: ('modify_password', 10, None),
0xb3: ('dedicated_charger_switch', 1, None),
0xb4: ('device_id', 8, None),
0xb5: ('date_of_manufacture', 4, None),
0xb6: ('working_hours', 4, None),
0xb7: ('software_version_number', 15, None),
0xb8: ('start_calibration', 1, None),
0xb9: ('actual_battery_capacity', 4, None),
0xba: ('manufacturer_name', 24, None),
0xbb: ('restart_system', 1, None),
0xbc: ('reset', 1, None),
0xbd: ('remote_upgrade_logo', 1, None),
0xbe: ('battery_disable_gps_low_voltage', 2, fb_volts_divide_100),
0xbf: ('battery_disable_gps_low_voltage_recovery', 2, fb_volts_divide_100),
0xc0: ('protocol_version_number', 1, None)
}
mqttc.loop_start()
def flush(ser):
ser.read(ser.in_waiting)
ser.close()
ser.open()
def load_battery_info(battery):
bid = battery['battery_id']
device_name = f"JK-BMS battery {bid}"
availability_topic = f'{base_path}/battery{bid}/available'
request_all = b'\x4e\x57\x00\x13\x00\x00\x00\x00\x06\x03\x00\x00\x00\x00\x00\x00\x68\x00\x00\x01\x29'
ser.write(request_all)
time.sleep(1)
print(ser.in_waiting)
if battery['echo']:
if ser.in_waiting < len(request_all):
print("did not get back echo response...")
set_unavailable(mqttc, availability_topic)
return False
request_all_echo = ser.read(len(request_all))
print(f"orig: {request_all}")
print(f"repl: {request_all_echo}")
if ser.in_waiting < 2:
print(f"did not get enough bytes, wanted 2 got {ser.in_waiting}... delaying until next")
set_unavailable(mqttc, availability_topic)
return False
begin = ser.read(4)
(nn, ww) = struct.unpack('cc', begin[0:2])
if nn != b'N' or ww != b'W':
print(f"got wrong bytes at the beginning ... delaying ... {nn} {ww}")
set_unavailable(mqttc, availability_topic)
return False
# 2B length, 4B terminal id 0, 1B command word, 1B frame sourec, 1B transmission type, < data begin >, 4B record number, 1B end flag 0x68, 4b checksum
read_fmt = '>H'
(datalen,) = struct.unpack(read_fmt, begin[2:4])
print(f"data length: {datalen}")
datalen = datalen - 2 # already read 2 bytes with the size
read_rest = ser.read(datalen)
if len(read_rest) < 10:
print(f"too short reply: {datalen}")
set_unavailable(mqttc, availability_topic)
return False
print(f"rest: {read_rest}")
read_rest = read_rest[7:]
print(f"remaining: {read_rest}")
battery_info = {}
cells = {}
while len(read_rest) > 9:
idcode = read_rest[0]
read_rest = read_rest[1:]
match idcode:
case 0x79: # batteries
length = read_rest[0]
cell_voltages = read_rest[1:length + 1]
print(f"cells: {cell_voltages}, length: {length}")
read_rest = read_rest[length + 1:]
for i in range(int(len(cell_voltages) / 3)):
(cells[i],) = struct.unpack('>xH', cell_voltages[i * 3:i * 3 + 3])
print(f"cell {i} = {cells[i]}")
case _:
if idcode not in mapping.keys():
print(f"unknown type: {idcode}")
continue
(valuename, size, callback) = mapping[idcode]
if size == 1:
(battery_info[valuename], read_rest) = shift_uchar(read_rest)
elif size == 2:
(battery_info[valuename], read_rest) = shift_ushort(read_rest)
elif size == 4:
(battery_info[valuename], read_rest) = shift_uint(read_rest)
else:
battery_info[valuename] = read_rest[0:size]
read_rest = read_rest[size:]
if callback is not None:
battery_info[valuename] = callback(battery_info[valuename])
print(f"value {valuename} = {battery_info[valuename]}")
set_available(mqttc, availability_topic, battery)
allvolts = 0
for i in cells:
allvolts = allvolts + cells[i]
average = round(allvolts / len(cells))
print(f"average: {average}")
for i in cells:
publish_value(mqttc, 'sensor', bid, f'cell_{i}_volts', cells[i])
publish_value(mqttc, 'sensor', bid, f'cell_{i}_volts_delta', round(cells[i] - average, 1))
for i in {'battery', 'battery_box', 'mos'}:
temperature = battery_info[f'{i}_temperature']
publish_value(mqttc, 'sensor', bid, f'temperature_{i}', temperature)
# power
current = battery_info['total_current']
if current > 0:
charging_current = current
discharging_current = 0
else:
charging_current = 0
discharging_current = current * -1
power_in = round(charging_current * battery_info['total_battery_voltage'])
power_out = round(discharging_current * battery_info['total_battery_voltage'])
power = round(current * battery_info['total_battery_voltage'])
publish_value(mqttc, 'sensor', bid, 'battery_voltage', battery_info['total_battery_voltage'])
publish_value(mqttc, 'sensor', bid, 'battery_current', current)
publish_value(mqttc, 'sensor', bid, 'power_in', power_in)
publish_value(mqttc, 'sensor', bid, 'power_out', power_out)
publish_value(mqttc, 'sensor', bid, 'power', power)
# capacity nominal, remain
publish_value(mqttc, 'sensor', bid, 'soc', battery_info['soc'])
def main():
gpios = {}
for battery in config['batteries']:
gpio_re = GPIO("/dev/gpiochip0", battery['gpio_re'], 'out')
gpio_de = GPIO("/dev/gpiochip0", battery['gpio_de'], 'out')
gpio_de.write(True) # enable TX always
gpio_re.write(True)
gpios[battery['battery_id']] = {'de': gpio_de, 're': gpio_re}
while True:
for battery in config['batteries']:
print(f"Loading battery {battery['battery_id']}")
gpio = gpios[battery['battery_id']]
gpio['re'].write(False)
time.sleep(0.2)
retval = load_battery_info(battery)
if not retval:
data = ser.read(ser.in_waiting)
print(f"data buffer: {data}")
flush(ser)
time.sleep(0.1)
gpio['re'].write(True)
time.sleep(0.5)
time.sleep(10)
main()
[Unit]
Description=JK-BMS Serial Service
After=network.target
[Service]
ExecStart=/usr/bin/python3 -u /opt/jkbms/jkbms-serial.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
PORT=/dev/ttyS3
ID=1
import serial
import struct
import sys
import json
import time
import datetime
import os
import paho.mqtt.client as mqtt
fubar=b'\xAA\x55\x00\x00\x00\x55'
deviceid=os.environ.get('ID', 1)
topic_base = f"solar/makeskyblue{deviceid}"
discovery_base_sensor = f"homeassistant/sensor/makeskyblue{deviceid}_"
discovery_base_binary_sensor = f"homeassistant/binary_sensor/makeskyblue{deviceid}_"
device_name = f"MakeSkyBlue MPPT charger {deviceid}"
availability_topic = f"{topic_base}/available"
port=os.environ.get('PORT')
attempts_fail = 0
def set_available(client):
global attempts_fail
attempts_fail = 0
client.publish(availability_topic, "online", retain=True)
def set_unavailable(client):
global attempts_fail
if attempts_fail > 5:
client.publish(availability_topic, "offline", retain=True)
else:
print("adding one attempt fail...")
attempts_fail = attempts_fail + 1
def publish_discovery(client):
print("publishing discovery...")
client.publish(f"{discovery_base_sensor}batt_volts/config", json.dumps({
'name': "battery_volts",
'state_topic': f"{topic_base}/battery_volts",
'device_class': 'voltage',
'unit_of_measurement': 'V',
'unique_id': f"solar_makeskyblue_{deviceid}_batt_volts",
'availability_topic': availability_topic,
'device': {'name': device_name, 'identifiers': [f"solar_makeskyblue_{deviceid}"]}
}))
client.publish(f"{discovery_base_sensor}batt_amps/config", json.dumps({
'name': "battery_amps",
'state_topic': f"{topic_base}/battery_amps",
'device_class': 'current',
'unit_of_measurement': 'A',
'unique_id': f"solar_makeskyblue_{deviceid}_batt_amps",
'availability_topic': availability_topic,
'device': {'name': device_name, 'identifiers': [f"solar_makeskyblue_{deviceid}"]}
}))
client.publish(f"{discovery_base_sensor}solar_volts/config", json.dumps({
'name': "solar_volts",
'state_topic': f"{topic_base}/solar_volts",
'device_class': 'voltage',
'unit_of_measurement': 'V',
'unique_id': f"solar_makeskyblue_{deviceid}_solar_volts",
'availability_topic': availability_topic,
'device': {'name': device_name, 'identifiers': [f"solar_makeskyblue_{deviceid}"]}
}))
client.publish(f"{discovery_base_sensor}solar_watts/config", json.dumps({
'name': "solar_watts",
'state_topic': f"{topic_base}/solar_watts",
'device_class': 'power',
'unit_of_measurement': 'W',
'unique_id': f"solar_makeskyblue_{deviceid}_solar_watts",
'availability_topic': availability_topic,
'device': {'name': device_name, 'identifiers': [f"solar_makeskyblue_{deviceid}"]}
}))
client.publish(f"{discovery_base_sensor}temp_c/config", json.dumps({
'name': "temperature_celsius",
'state_topic': f"{topic_base}/temperature_celsius",
'device_class': 'temperature',
'unit_of_measurement': 'C',
'unique_id': f"solar_makeskyblue_{deviceid}_temperature_celsius",
'availability_topic': availability_topic,
'device': {'name': device_name, 'identifiers': [f"solar_makeskyblue_{deviceid}"]}
}))
client.publish(f"{discovery_base_sensor}cumm_kwh/config", json.dumps({
'name': "cummulative_kwh",
'state_topic': f"{topic_base}/cummulative_kwh",
'device_class': 'energy',
'unit_of_measurement': 'kWh',
'state_class': 'total',
'unique_id': f"solar_makeskyblue_{deviceid}_cummulative_kwh",
'availability_topic': availability_topic,
'device': {'name': device_name, 'identifiers': [f"solar_makeskyblue_{deviceid}"]}
}))
client.publish(f"{discovery_base_binary_sensor}mppt_active/config", json.dumps({
'name': "mppt_active",
'state_topic': f"{topic_base}/mppt_active",
'device_class': 'running',
'unique_id': f"solar_makeskyblue_{deviceid}_mppt_active",
'availability_topic': availability_topic,
'device': {'name': device_name, 'identifiers': [f"solar_makeskyblue_{deviceid}"]}
}))
client.publish(f"{discovery_base_binary_sensor}batt_undervolt/config", json.dumps({
'name': "battery_undervoltage",
'state_topic': f"{topic_base}/battery_undervoltage",
'device_class': 'problem',
'unique_id': f"solar_makeskyblue_{deviceid}_battery_undervoltage",
'availability_topic': availability_topic,
'device': {'name': device_name, 'identifiers': [f"solar_makeskyblue_{deviceid}"]}
}))
client.publish(f"{discovery_base_binary_sensor}batt_overvoltage/config", json.dumps({
'name': "battery_overvoltage",
'state_topic': f"{topic_base}/battery_overvoltage",
'device_class': 'problem',
'unique_id': f"solar_makeskyblue_{deviceid}_battery_overvoltage",
'availability_topic': availability_topic,
'device': {'name': device_name, 'identifiers': [f"solar_makeskyblue_{deviceid}"]}
}))
ser = serial.Serial(port, 9600, timeout=5)
ser.close()
ser.open()
def on_connect(client, userdata, flags, reason_code, properties):
print(f"Connected with result code {reason_code}")
publish_discovery(client)
client.subscribe("homeassistant/status")
client.publish(availability_topic, "online", retain=True)
def on_message(client, userdata, msg):
if msg.topic == "homeassistant/status":
publish_discovery(client)
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.username_pw_set("voltronic", "voltronicpwd")
mqttc.will_set(availability_topic, "offline", retain=True)
mqttc.connect("ha.smeg", 1883, 60)
mqttc.loop_start()
while True:
ser.write(fubar)
try:
reply = ser.read(20)
except Exception as e:
print("caught exception:")
print(e)
set_unavailable(mqttc)
ser.close()
ser.open()
time.sleep(30)
continue
if len(reply) != 20:
print("not enough bytes read back! only {}".format(len(reply)))
set_unavailable(mqttc)
ser.close()
ser.open()
time.sleep(30)
continue
checksum_check = 0
fmt='cchhhhhhhBBcB'
for i in range(1, 19):
checksum_check = checksum_check + reply[i]
checksum_check = checksum_check & 0xFF
(aa, bb, batt_volts, batt_amps, solar_volts, solar_watts, temperature_c, cummulative_kwh, unknown, modeflags, errorflags, zeroes, checksum) = struct.unpack(fmt, reply[:struct.calcsize(fmt)])
if aa != b'\xAA' or bb != b'\xBB':
print(f"wrong AA/BB: got {aa} and {bb}")
set_unavailable(mqttc)
ser.close()
ser.open()
time.sleep(30)
continue
if checksum_check != checksum:
print(f"checksum doesn't match! got {checksum} but calculated {checksum_check}")
set_unavailable(mqttc)
ser.close()
ser.open()
time.sleep(30)
continue
mppt_active = "ON" if modeflags & 0x4 != 0 else "OFF"
battery_undervoltage = "ON" if errorflags & 0x1 != 0 else "OFF"
battery_overvoltage = "ON" if errorflags & 0x2 != 0 else "OFF"
print_reply = {'battery_volts': batt_volts, 'battery_amps': batt_amps, 'solar_volts': solar_volts, 'solar_watts': solar_watts, 'temperature_celsius': temperature_c, 'cummulative_kwh': cummulative_kwh,
'flags': {
'mppt_active': mppt_active,
'battery_undervoltage': battery_undervoltage,
'battery_overvoltage': battery_overvoltage,
},
}
print(datetime.datetime.now())
print(print_reply)
mqttc.publish(f"{topic_base}/battery_volts", batt_volts / 10)
mqttc.publish(f"{topic_base}/battery_amps", batt_amps / 10)
mqttc.publish(f"{topic_base}/solar_volts", solar_volts / 10)
mqttc.publish(f"{topic_base}/solar_watts", solar_watts)
mqttc.publish(f"{topic_base}/temperature_celsius", temperature_c / 10)
mqttc.publish(f"{topic_base}/cummulative_kwh", cummulative_kwh)
mqttc.publish(f"{topic_base}/mppt_active", mppt_active)
mqttc.publish(f"{topic_base}/battery_undervoltage", battery_undervoltage)
mqttc.publish(f"{topic_base}/battery_overvoltage", battery_overvoltage)
set_available(mqttc)
time.sleep(10)
[Unit]
Description=MakeSkyBlue V119 scraper service
After=network.target
[Service]
EnvironmentFile=-/etc/makeskyblue/%i.conf
ExecStart=/usr/bin/python3 -u /opt/makeskyblue/scraper.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
{
"port": "/dev/ttyUSB0",
"baudrate": 2400,
"timeout": 5
}
import paho.mqtt.client as mqtt
import time
import struct
import json
import serial
from crccheck.crc import CrcXmodem
config_path = '/opt/voltronic/config.json'
with open(config_path, "r") as configfile:
config = json.loads(configfile.read())
port = config['port']
baudrate = int(config['baudrate'])
base_path = 'homeassistant'
global_availability_topic = 'availability/voltronic'
deviceid = 1
fails = 0
def create_discovery(client, hatype, name, device_class, unit_of_measurement, availability_topic, device_name, state_class):
xpath = f"{base_path}/{hatype}/voltronic{deviceid}_{name}"
client.publish(f"{xpath}/config", json.dumps({
'name': name,
'state_topic': xpath,
'device_class': device_class,
'unit_of_measurement': unit_of_measurement,
'state_class': state_class,
'unique_id': f"solar_voltronic_{name}",
'availability': [{'topic': availability_topic}, {'topic': global_availability_topic}],
'device': {'name': device_name, 'identifiers': [f"solar_voltronic_{deviceid}"]}
}))
print(f"publishing discovery to {xpath}")
def publish_discovery(client):
availability_topic = f'{base_path}/voltronic{deviceid}/available'
device_name = f'Voltronic Inverter {deviceid}'
sensor = 'sensor'
binary_sensor = 'binary_sensor'
voltage = {'class': 'voltage', 'units': 'V'}
temperature = {'class': 'temperature', 'units': '°C'}
power = {'class': 'power', 'units': 'W'}
frequency = {'class': 'frequency', 'units': 'Hz'}
apparent_power = {'class': 'apparent_power', 'units': 'VA'}
percent = {'class': None, 'units': '%'}
current = {'class': 'current', 'units': 'A'}
discovery = {
'ac_input_voltage': voltage,
'ac_input_frequency': frequency,
'ac_output_voltage': voltage,
'ac_output_frequency': frequency,
'ac_output_apparent_power': apparent_power,
'ac_output_active_power': power,
'ac_output_load': percent,
'bus_voltage': voltage,
'battery_voltage': voltage,
'battery_charging_current': current,
'heatsink_temperature': temperature,
'pv_current_for_battery': current,
'pv_input_voltage': voltage,
'battery_discharge_current': current,
'pv_input_power': power,
'battery_charge_power': power, # temp
'battery_discharge_power': power, # temp
}
for item in discovery.keys():
create_discovery(client, sensor, item, discovery[item]['class'], discovery[item]['units'], availability_topic, device_name, None)
for item in qpiws_data.keys():
if item.startswith("rsv"):
continue
create_discovery(client, binary_sensor, item, 'problem', None, availability_topic, device_name, None)
def publish_value(client, hatype, name, value):
xpath = f"{base_path}/{hatype}/voltronic{deviceid}_{name}"
client.publish(xpath, value)
print(f"publishing value {value} to {xpath}")
def on_connect(client, userdata, flags, reason_code, properties):
client.publish(global_availability_topic, "online", retain=True)
client.subscribe("homeassistant/status")
publish_discovery(client)
print(f"Connected with result code {reason_code}")
def on_message(client, userdata, msg):
if msg.topic == "homeassistant/status":
publish_discovery(client)
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.username_pw_set("voltronic", "voltronicpwd")
mqttc.will_set(global_availability_topic, "offline", retain=True)
ser = serial.Serial(port, baudrate, timeout=int(config['timeout']))
mqttc.connect("ha.smeg", 1883, 60)
def set_available(client, availability_topic):
fails = 0
client.publish(availability_topic, "online", retain=True) # TODO
def set_unavailable(client, availability_topic):
fails = fails + 1
if fails > 5:
client.publish(availability_topic, "offline", retain=True)
mqttc.loop_start()
def flush(ser):
ser.write(b'\r')
time.sleep(5)
ser.read(ser.in_waiting)
ser.close()
ser.open()
def shiftby(data, positions):
retval = data[0:positions]
newdata = data[positions + 1:] # strip also space afterwards
return (retval, newdata)
def send_and_read(ser, msg):
crcinst = CrcXmodem()
crcinst.process(msg)
ser.write(msg)
ser.write(crcinst.finalbytes())
ser.write(b'\r')
time.sleep(1)
print(ser.in_waiting)
data = ser.read_until(b'\r')
print(f"reply: {data}")
return data
qpiws_data = { # dict name -> critical
'pv_loss': False,
'inverter_fault': True,
'bus_over': True,
'bus_under': True,
'bus_soft_fail': True,
'line_fail': False,
'opvshort': True,
'inverter_voltage_too_low': True,
'inverter_voltage_too_high': True,
'over_temperature': True,
'fan_locked': True,
'battery_voltage_high': True,
'battery_low_alarm': False,
'rsv1': False,
'battery_under_shutdown': False,
'battery_derating': False,
'overload': True,
'eeprom_fault': False,
'inverter_over_current': True,
'inverter_soft_fail': True,
'self_test_fail': True,
'op_dc_voltage_over': True,
'bat_open': False,
'current_sensor_fail': True,
'rsv2': True,
'rsv3': True,
'rsv4': True,
'rsv5': True,
'rsv6': True,
'rsv7': True,
'rsv8': True,
'battery_weak': True,
'rsv9': True,
'rsv10': True,
'rsv11': True,
'battery_equalization': False
}
def parse_qpiws(data):
values = {}
for item in qpiws_data.keys():
data = data[1:] #initially strips (
if item.startswith("rsv"):
continue
values[item] = data[0:1] == b'1'
return values
def load_voltronic_info():
availability_topic = f'{base_path}/voltronic{deviceid}/available'
# qpi = send_and_read(ser, b'QPI')
# qpiri = send_and_read(ser, b'QPIRI')
# qmod = send_and_read(ser, b'QMOD')
qpiws = send_and_read(ser, b'QPIWS')
if len(qpiws) != 40:
print("too short QPIWS reply")
return False
# qflags = send_and_read(ser, b'QFLAG')
qpiws_parsed = parse_qpiws(qpiws)
data = send_and_read(ser, b'QPIGS')
if len(data) != 110:
print("too short reply")
return False
set_available(mqttc, availability_topic)
# if data[0] != b'40':
# print(f"bad header - {data[0]}")
# return False
data = data[:len(data)-3].decode('ascii')
values = {}
data = data[1:]
(values['ac_input_voltage'], data) = shiftby(data, 5)
(values['ac_input_frequency'], data) = shiftby(data, 4)
(values['ac_output_voltage'], data) = shiftby(data, 5)
(values['ac_output_frequency'], data) = shiftby(data, 4)
(values['ac_output_apparent_power'], data) = shiftby(data, 4)
(values['ac_output_active_power'], data) = shiftby(data, 4)
(values['ac_output_load'], data) = shiftby(data, 3)
(values['bus_voltage'], data) = shiftby(data, 3)
(values['battery_voltage'], data) = shiftby(data, 5)
(values['battery_charging_current'], data) = shiftby(data, 3)
(battery_percent, data) = shiftby(data, 3) # ignore, guessed
(values['heatsink_temperature'], data) = shiftby(data, 4)
(values['pv_input_for_battery'], data) = shiftby(data, 4)
(values['pv_input_voltage'], data) = shiftby(data, 5)
(battery_volt_from_scc, data) = shiftby(data, 5) # ignore
(values['battery_discharge_current'], data) = shiftby(data, 5)
(device_status, data) = shiftby(data, 8) # TODO
(rsv1, data) = shiftby(data, 2) # ignore
(rsv2, data) = shiftby(data, 2) # ignore
(values['pv_input_power'], data) = shiftby(data, 5)
(device_status2, data) = shiftby(data, 3) # TODO
for item in values.keys():
publish_value(mqttc, 'sensor', item, values[item])
for item in qpiws_parsed.keys():
publish_value(mqttc, 'binary_sensor', item, "ON" if qpiws_parsed[item] else "OFF")
publish_value(mqttc, 'sensor', 'battery_charge_power', float(values['battery_voltage']) * float(values['battery_charging_current']))
publish_value(mqttc, 'sensor', 'battery_discharge_power', float(values['battery_voltage']) * float(values['battery_discharge_current']))
# TODO parse flags
return True
#publish_value(mqttc, 'sensor', bid, 'battery_voltage', battery_info['total_battery_voltage'])
#publish_value(mqttc, 'sensor', bid, 'battery_current', current)
#publish_value(mqttc, 'sensor', bid, 'power_in', power_in)
#publish_value(mqttc, 'sensor', bid, 'power_out', power_out)
#publish_value(mqttc, 'sensor', bid, 'power', power)
def main():
while True:
retval = load_voltronic_info()
if not retval:
data = ser.read(ser.in_waiting)
print(f"data buffer: {data}")
flush(ser)
time.sleep(10)
main()
[Unit]
Description=Voltronic Serial Service
After=network.target
[Service]
ExecStart=/usr/bin/python3 -u /opt/voltronic/voltronic.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment