Last active
January 15, 2025 17:53
-
-
Save georg90/c6822fa28261059e4c8361bdcff13f32 to your computer and use it in GitHub Desktop.
Example of micropython code to read victron SmartShunt 500A
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pico_Victron_BT.py | |
# | |
# © Swâmi Petaramesh 2024 | |
# | |
# This program is free software: you can redistribute it and/or modify it under the terms | |
# of the GNU General Public License as published by the Free Software Foundation, | |
# either version 3 of the License, or (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; | |
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. | |
# See the GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License along with this program. | |
# If not, see <https://www.gnu.org/licenses/>. 3 | |
# | |
# This program is example code for receiving and decoding some Victron® devices information | |
# and statues over Bluetooth-LE on a Raspberry Pi® Pico W running : | |
# | |
# Works on : sys.version:3.4.0; MicroPython v1.22.1 on 2024-01-05 | |
# RPI_PICO_W-20240105-v1.22.1.uf2 | |
# Also confirmed on MicroPython v1.23.0-preview.91.g5a68e82d1 on 2024-02-07; Raspberry Pi Pico W with RP2040 (@georg90) | |
# | |
# You need to put your own devices MAC addresses and encryption keys in the code below. | |
# Lbraries ============================================================= | |
# | |
# Micropython built-in libraries | |
import time, struct, bluetooth, cryptolib, sys | |
from machine import Pin | |
from cryptolib import aes | |
# Libraries to be installed on the Raspberry Pi Pico | |
# With mpremote tool (from Linux PC terminal) : | |
# mpremote mip install github:peterhinch/micropython-async/v3/primitives | |
# mpremote mip install github:peterhinch/micropython-async/v3/threadsafe | |
import asyncio | |
from threadsafe import ThreadSafeQueue | |
import binascii | |
# Initialisations ========================================================== | |
_DEBUG = const(3) # Extra debug console output | |
_UNKN = const(-9999) # N/A value | |
pin_led_bt = Pin("WL_GPIO0", Pin.OUT, value=1) # Integrated LED, ON | |
# Victron devices parameters and values ----------------------------------- | |
# We need to put our Victron devices MAC addresses and encryption keys here | |
# The rest should be left _UNKN | |
# Get the values from Victron Connect app (Product details) and add '\x' every 2nd character to make this string byte | |
# bmv712 works for SmartShunt 500A to read values like Volt, Temp, SoC, Temp | |
victron = { | |
"bmv712" : { | |
"mac": b'\xc3\xb1\x20\xe2\x7b\x19', | |
"key": b'\x2d\x0a\x4d\x87\x1a\x34\xfe\x27\x7d\xb3\x4c\x7f\xb0\x24\x63\xc0', | |
"volt": _UNKN, | |
"amp": _UNKN, | |
"soc": _UNKN, | |
"temp": _UNKN, | |
"upd": _UNKN | |
} | |
} | |
maclist= [victron[d]["mac"] for d in victron] | |
# System parameters ------------------------------------------------ | |
_TMR_MAIN_LOOP = const(500) # Main loop duration | |
_BT_SCAN_DURATION_MS = const(0) # BT Scan default duration, 0 = forever | |
_BT_SCAN_INTERVAL_US = const(2000000) # Scan every 2 sec (uS) | |
_BT_SCAN_WINDOW_US = const(100000) # Scan for 100 mS (uS) | |
_BT_MIN_RSSI = const(-85) # Minimum RSSI | |
ble = bluetooth.BLE() | |
# Fixed values | |
_BT_EXPIRE = const(180 * 1000) # BT values expired after stalled (mS) | |
# This is our main status that we use everywhere | |
STATUS = ( | |
"CKSUM", # 0 | |
"?????", # 1 | |
"Unknown", # 2 | |
"ERROR", # 3 | |
"FAIL!", # 4 | |
"DELTA", # 5 | |
"Init", # 6 | |
"Off", # 7 | |
"Stop", # 8 | |
"Start", # 9 | |
"HI-V", # 10 | |
"HI-v", # 11 | |
"LOW-V", # 12 | |
"LOW-v", # 13 | |
"LOW-%", # 14 | |
"HI-T°", # 15 | |
"LO-T°", # 16 | |
"Drain", # 17 | |
"Timer", # 18 | |
"Timr", # 19 | |
"Stby", # 20 | |
"LOW", # 21 | |
"OK", # 22 | |
"High", # 23 | |
"Full", # 24 | |
"Chrg", # 25 | |
"Chg", # 26 | |
"Engine", # 27 | |
"Bulk", # 28 | |
"Absorption", # 29 | |
"Float", # 30 | |
"Storage", # 31 | |
"Remote", # 32 | |
"Lock", # 33 | |
"EngStop", # 34 | |
"R+Stop", # 35 | |
"RunTime", # 36 | |
"FloTime", # 37 | |
"Display", # 38 | |
"Manual", # 39 | |
"------------" # 40 | |
) | |
# Victron operation modes | |
VICTRON_OP = { 0: { "code": 7, "lib": "Off" }, | |
1: { "code": 12, "lib": "LOW-V" }, | |
2: { "code": 3, "lib": "ERROR" }, | |
3: { "code": 28, "lib": "Bulk" }, | |
4: { "code": 29, "lib": "Absorption" }, | |
5: { "code": 30, "lib": "Float" }, | |
6: { "code": 31, "lib": "Storage" }, | |
7: { "code": 1, "lib": "Equalize" }, | |
9: { "code": 1, "lib": "Inverting" }, | |
11: { "code": 1, "lib": "Supply" }, | |
245: { "code": 6, "lib": "Init" }, | |
246: { "code": 29, "lib": "Repeated absorption" }, | |
247: { "code": 1, "lib": "Recondition" }, | |
248: { "code": 1, "lib": "Bat safe" }, | |
252: { "code": 32, "lib": "Remote" } | |
} | |
# Off reason for Victron DC-DC charger | |
VICTRON_DC_OFF = { 0x00000000: { "code": 40, "lib": "None" }, | |
0x00000001: { "code": 4, "lib": "No-Input" }, | |
0x00000002: { "code": 7, "lib": "Off" }, | |
0x00000004: { "code": 7, "lib": "Off" }, | |
0x00000008: { "code": 32, "lib": "Remote" }, | |
0x00000010: { "code": 1, "lib": "Protection" }, | |
0x00000020: { "code": 1, "lib": "Pay" }, | |
0x00000040: { "code": 1, "lib": "BMS-CUT" }, | |
0x00000080: { "code": 27, "lib": "Engine" }, | |
0x00000081: { "code": 35, "lib": "R+Stop" }, | |
0x00000100: { "code": 1, "lib": "Analyzing" } | |
} | |
# Initial values -------------------------------------------------- | |
loop_time = 0 | |
comp_time = 0 | |
last_time = 0 | |
# Global exception handler ======================================= | |
def _handle_exception(loop, context): | |
print('Exception occurred !') | |
sys.print_exception(context["exception"]) | |
sys.exit() | |
# Classes ======================================================== | |
class BLEScanner: | |
def __init__(self, ble, target_mac_list): | |
self._ble = ble | |
self._ble.active(True) | |
self._ble.irq(self._irq) | |
self._target_mac_list = target_mac_list | |
self._start_time = None | |
# Interrupt proceessing routine | |
def _irq(self, event, data): | |
global bt_queue | |
pin_led_bt.on() | |
if event == 5: # Event value for _IRQ_SCAN_RESULT | |
addr_type, addr, adv_type, rssi, adv_data = data | |
if rssi > _BT_MIN_RSSI: | |
# enable for debug only.. | |
# if adv_type == 0: print("addr:",bytes(addr)) | |
if addr in self._target_mac_list and adv_type == 0: | |
try: | |
# Queue received data for async coro to process | |
bt_queue.put_sync([bytes(addr), addr_type, adv_type, rssi, bytes(adv_data)]) | |
except IndexError: | |
# Queue is full | |
pass | |
pin_led_bt.off() | |
# Start the scanner | |
def start_scan(self, duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US): | |
self._ble.active(True) | |
self._start_time = time.ticks_ms() | |
self._ble.gap_scan(0, duration_ms, interval_us, window_us) | |
# Stop the scanner | |
def stop_scan(self): | |
self._ble.gap_scan(None) | |
self._ble.active(False) | |
# Functions ====================================================== | |
def kelvin_to_celsius(kelvin): | |
return round(kelvin - 273.15, 2) | |
# Decode received and decrypted BT values | |
def bt_decode(dev,cleartext): | |
global victron | |
if _DEBUG: print(f"*** Found device : {dev}") | |
if _DEBUG >= 2: | |
print(" Raw Decrypted Data (Hex): ", ' '.join(['{:02X}'.format(b) for b in cleartext])) | |
if dev is "bmv712": | |
try: | |
if cleartext[2:4] != b'\xFF\x7F': | |
victron[dev]["volt"] = float(struct.unpack('h', cleartext[2:4])[0] / 100) | |
else: | |
victron[dev]["volt"] = _UNKN | |
except: | |
victron[dev]["volt"] = _UNKN | |
try: | |
if struct.unpack('B',cleartext[8:9])[0] & 0b11 == 0b10: | |
victron[dev]["temp"] = float(kelvin_to_celsius(struct.unpack('h', cleartext[6:8])[0] / 100)) | |
else: | |
victron[dev]["temp"] = _UNKN | |
except: | |
victron[dev]["temp"] = _UNKN | |
if victron[dev]["volt"] != _UNKN and victron[dev]["temp"] != _UNKN: | |
victron[dev]["upd"] = time.ticks_ms() | |
if _DEBUG >= 2: print(f"Volt : {victron[dev]["volt"]} Temp: {victron[dev]["temp"]}") | |
if dev is "bmv712": | |
try: | |
victron[dev]["soc"] = float(((struct.unpack('h', cleartext[13:15])[0] & 0x3FFF) >> 4) / 10) | |
if victron[dev]["soc"] == 0x3FF: | |
victron[dev]["soc"] = _UNKN | |
except: | |
victron[dev]["soc"] = _UNKN | |
try: | |
amp = bytearray(cleartext[8:11]) | |
if amp[2] & 0x80 == 0x80: | |
amp.extend(b'\xFF') | |
else: | |
amp.extend(b'\x00') | |
victron[dev]["amp"] = float(((struct.unpack('i', amp)[0]) >>2 ) / 1000) | |
except: | |
victron[dev]["amp"] = _UNKN | |
if _DEBUG >= 2: print(f"Soc : {victron[dev]["soc"]} Amp: {victron[dev]["amp"]}") | |
elif dev is "smartsolar": | |
try: | |
if cleartext[0:1] != b'\xFF': | |
victron[dev]["mode"] = struct.unpack('B', cleartext[0:1])[0] | |
else: | |
victron[dev]["mode"] = _UNKN | |
except: | |
victron[dev]["mode"] = _UNKN | |
try: | |
if cleartext[4:6] != b'\xFF\x7F': | |
victron[dev]["amp"] = float(struct.unpack('h', cleartext[4:6])[0] / 10) | |
else: | |
victron[dev]["amp"] = _UNKN | |
except: | |
victron[dev]["amp"] = _UNKN | |
try: | |
if cleartext[8:10] != b'\xFF\xFF': | |
victron[dev]["pwr"] = float(struct.unpack('H', cleartext[8:10])[0]) | |
else: | |
victron[dev]["pwr"] = _UNKN | |
except: | |
victron[dev]["pwr"] = _UNKN | |
try: | |
victron[dev]["lib_mode"] = VICTRON_OP[victron[dev]["mode"]]["code"] | |
except: | |
victron[dev]["lib_mode"] = 1 | |
if (victron[dev]["mode"] != _UNKN and victron[dev]["amp"] != _UNKN | |
and victron[dev]["pwr"] != _UNKN and victron[dev]["lib_mode"] != 1 | |
): | |
victron[dev]["upd"] = time.ticks_ms() | |
if _DEBUG >= 2: print(f"Mode : {victron[dev]["mode"]} PWR : {victron[dev]["pwr"]} Lib : {victron[dev]["lib_mode"]}:{STATUS[victron[dev]["lib_mode"]]}") | |
elif dev is "orion": | |
try: | |
if cleartext[0:1] != b'\xFF': | |
victron[dev]["mode"] = struct.unpack('B', cleartext[0:1])[0] | |
else: | |
victron[dev]["mode"] = _UNKN | |
except: | |
victron[dev]["mode"] = _UNKN | |
try: | |
if cleartext[2:4] != b'\xFF\xFF': | |
victron[dev]["v_in"] = float(struct.unpack('H', cleartext[2:4])[0] / 100) | |
else: | |
victron[dev]["v_in"] = _UNKN | |
except: | |
victron[dev]["v_in"] = _UNKN | |
try: | |
if cleartext[4:6] != b'\xFF\x7F': | |
victron[dev]["v_out"] = float(struct.unpack('h', cleartext[4:6])[0] / 100) | |
else: | |
victron[dev]["v_out"] = _UNKN | |
except: | |
victron[dev]["v_out"] = _UNKN | |
try: | |
if cleartext[6:10] != b'\xFF\xFF\xFF\xFF': | |
victron[dev]["cause"] = int(struct.unpack('I', cleartext[6:10])[0]) | |
else: | |
victron[dev]["cause"] = _UNKN | |
except: | |
victron[dev]["cause"] = _UNKN | |
try: | |
victron[dev]["lib_mode"] = VICTRON_OP[victron[dev]["mode"]]["code"] | |
except: | |
victron[dev]["lib_mode"] = 1 | |
try: | |
victron[dev]["lib_cause"] = VICTRON_DC_OFF[victron[dev]["cause"]]["code"] | |
except: | |
victron[dev]["lib_cause"] = 1 | |
if (victron[dev]["mode"] != _UNKN and victron[dev]["v_in"] != _UNKN | |
and (victron[dev]["mode"] == 0 or victron[dev]["v_out"] != _UNKN) | |
and victron[dev]["cause"] != _UNKN and victron[dev]["lib_mode"] != 1 | |
and victron[dev]["lib_cause"] != 1 | |
): | |
victron[dev]["upd"] = time.ticks_ms() | |
if _DEBUG >= 2: print(f"Mode : {victron[dev]["mode"]}:{victron[dev]["lib_mode"]}:{STATUS[victron[dev]["lib_mode"]]}, Cause: {victron[dev]["cause"]}:{victron[dev]["lib_cause"]}:{STATUS[victron[dev]["lib_cause"]]}") | |
if _DEBUG >= 2: print(f"V_in : {victron[dev]["v_in"]}, V_out: {victron[dev]["v_out"]}") | |
# Old BT values expiration ---------------------------------- | |
async def bt_expire(coro_freq): | |
global comp_time, victron | |
while True: | |
coro_begin = time.ticks_ms() | |
if victron["bmv712"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["bmv712"]["upd"],_BT_EXPIRE),coro_begin) <= 0 : | |
victron["bmv712"]["volt"] = _UNKN | |
victron["bmv712"]["amp"] = _UNKN | |
victron["bmv712"]["soc"] = _UNKN | |
victron["bmv712"]["temp"] = _UNKN | |
if _DEBUG >= 3: print(f"* CORO: bt_expire: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.") | |
coro_throttle = time.ticks_diff(time.ticks_add(coro_begin,coro_freq),time.ticks_ms()) | |
comp_time += time.ticks_diff(time.ticks_ms(), coro_begin) | |
if coro_throttle >= 0: | |
await asyncio.sleep_ms(coro_throttle) | |
else: | |
await asyncio.sleep(0) | |
# Decrypt and decode received BT values ------------------------------------------ | |
async def bt_decrypt(bt_queue): | |
global comp_time, scanner, victron | |
async for bt_input in bt_queue: | |
coro_begin = time.ticks_ms() | |
pin_led_bt.on() | |
mac = bt_input[0] # MAC address, bytes | |
# mac_type = bt_input[1] # Address type, integer | |
adv_data = bt_input[4] # Advertisement data, bytes | |
kb0 = struct.unpack('B',adv_data[14:15])[0] # 1st encryption key byte | |
if _DEBUG: | |
timestamp = time.ticks_diff(time.ticks_ms(), scanner._start_time) / 1000 | |
adv_type = bt_input[2] # Advertisement type, integer | |
rssi = bt_input[3] # RSSI, integer | |
print("\n{:.1f}s - Target Device Found - Address: {mac}, RSSI: {rssi}, Adv. Type: {adv_type}".format( | |
timestamp, | |
mac=':'.join(['{:02X}'.format(b) for b in mac]), | |
rssi=rssi, | |
adv_type=adv_type | |
)) | |
if _DEBUG >= 2: | |
record_type = struct.unpack('B',adv_data[11:12])[0] | |
nonce = struct.unpack('H',adv_data[12:14])[0] | |
# Print the entire advertising data as hex | |
print(" Raw Advertising Data (Hex):", ' '.join(['{:02X}'.format(b) for b in adv_data])) | |
print(f" Record type: {record_type:#04X} Nonce: {nonce:#06X} Key byte 0: {kb0:#04X}") | |
for dev in victron: | |
if victron[dev]["mac"] == mac: | |
print("key from dev:",kb0) | |
if victron[dev]["key"][0:1] == kb0.to_bytes(1,0): | |
if _DEBUG >= 2: print(" Encryption key matches.") | |
# AES-CTR Decryption | |
# We should use AES-CTR but it is not implemented into mycropython's | |
# cryptolib, so we need to fake it using ECB. | |
# We have at most 16 bytes to decrypt, so we can do it in a single | |
# pass with the nonce + a zero CTR value. | |
ctr = bytearray(adv_data[12:14]) # Start with nonce | |
ctr.extend(bytes(14)) # Counter is zero | |
# if _DEBUG >= 2: print(" Ctr feed (Hex):", ' '.join(['{:02X}'.format(b) for b in ctr])) | |
ciphertext = bytearray(adv_data[15:]) # Our ciphertext | |
if len(adv_data[15:]) < 16 : # Extend it to 16 bytes | |
ciphertext.extend(bytes(16 - len(adv_data[15:]))) # if needed | |
cipher = cryptolib.aes(victron[dev]["key"],1) # Initialize AES ECB with key | |
cipher.encrypt(ctr,ctr) # Encrypt counter | |
# if _DEBUG >= 2: print(" Encrypted CTR (Hex):", ' '.join(['{:02X}'.format(b) for b in ctr])) | |
cleartext = bytes(a ^ b for a, b in zip(ciphertext, ctr)) # XOR results with ciphertext | |
bt_decode(dev,cleartext) # Now decode what we got | |
else: | |
if _DEBUG: print(f" Encryption key mismatch ! Device {dev}:mac Ours: {victron[dev]["key"][0:1]}, got: {kb0.to_bytes(1,0)}") | |
break | |
break | |
if _DEBUG >= 3: print(f"* CORO: bt_decrypt: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.") | |
comp_time += time.ticks_diff(time.ticks_ms(), coro_begin) | |
pin_led_bt.off() | |
await asyncio.sleep(0) | |
# Main loop ================================================== | |
async def main(): | |
global comp_time, last_time, loop_time | |
global scanner, bt_queue, bt_stat | |
loop = asyncio.get_event_loop() | |
loop.set_exception_handler(_handle_exception) | |
print("we are in the loop") | |
# Bluetooth thread safe queue | |
bt_queue = ThreadSafeQueue([[bytes(6), int(0), int(0), int(0), bytes(48)] for _ in range(20)]) | |
# Create scheduled tasks | |
task_bt_expire = asyncio.create_task(bt_expire(5000)) # Expire old BT values | |
task_bt_decrypt = asyncio.create_task(bt_decrypt(bt_queue)) # Decrypt and decode BT values | |
await asyncio.sleep(0) | |
# Start Bluetooth BLE scanner | |
scanner = BLEScanner(ble, maclist) | |
scanner.start_scan(duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US) | |
while True: | |
# Sets the main loop defined duration | |
loop_begin_tick = time.ticks_ms() | |
loop_end_target = time.ticks_add(loop_begin_tick,_TMR_MAIN_LOOP) | |
# Wait until desired loop duration | |
loop_time = time.ticks_diff(time.ticks_ms(),loop_begin_tick) | |
last_time = comp_time + loop_time | |
comp_time = 0 | |
loop_throttle = time.ticks_diff(loop_end_target,time.ticks_ms()) - 1 | |
if loop_throttle >= 0 : | |
await asyncio.sleep_ms(loop_throttle) | |
else: | |
loop_throttle = 0 | |
# Let's do it ! | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment