Skip to content

Instantly share code, notes, and snippets.

@shaposhnikoff
Created January 8, 2026 20:55
Show Gist options
  • Select an option

  • Save shaposhnikoff/cefc18c931a9085cca9b3c879e31743f to your computer and use it in GitHub Desktop.

Select an option

Save shaposhnikoff/cefc18c931a9085cca9b3c879e31743f to your computer and use it in GitHub Desktop.
iot_device_checker
from math import log
import requests
import json
import logging
from requests.exceptions import (
ConnectionError,
Timeout,
RequestException,
HTTPError
)
# TODO: configure logging here (this module currently reports everything via print()).
# Timeout, in seconds, passed to every HTTP request made by this module.
REQUEST_TIMEOUT = 4

# Base URL of the ESP sensor (queried at its root path).
esp_sensor = "http://192.168.10.118"

# Base URL of the Shelly 3-phase device (queried via /rpc/Shelly.GetStatus).
shelly_3phase = "http://192.168.10.69"

# SONOFF devices: device name -> base URL of the device.
SONOFF_DEVICES = {
    "boyler": "http://192.168.11.111",  # Boiler
    "bath_heater": "http://192.168.11.141",  # Bathroom heater
}
def get_sensor_data():
    """Fetch the current reading from the ESP sensor.

    Sends a GET request to ``esp_sensor`` with ``REQUEST_TIMEOUT`` and
    decodes the response body as JSON.

    Returns:
        The decoded JSON payload, or ``None`` when the request fails, the
        device answers with an HTTP error status, or the body is not valid
        JSON. Every failure is reported on stdout.
    """
    try:
        response = requests.get(esp_sensor, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
    except ConnectionError:
        print(f"Connection error: ESP sensor ({esp_sensor}) is unavailable")
        return None
    except Timeout:
        print(f"Timeout: ESP sensor ({esp_sensor}) is not responding")
        return None
    except HTTPError as e:
        print(f"HTTP error from ESP sensor: {e.response.status_code}")
        return None
    except RequestException as e:
        print(f"Unknown error while requesting ESP sensor: {e}")
        return None

    # Decode separately from the request: ValueError is the common base of
    # every JSON decode error json() can raise (json.JSONDecodeError and the
    # equivalents of other JSON backends), but some request errors such as an
    # invalid URL are ValueError subclasses too and must not be reported as
    # parsing failures.
    try:
        return response.json()
    except ValueError:
        print("JSON parsing error from ESP sensor response")
        return None
def shelly_3phase_status():
    """Fetch the status of the Shelly 3-phase device.

    Sends a GET request to ``{shelly_3phase}/rpc/Shelly.GetStatus`` with
    ``REQUEST_TIMEOUT`` and decodes the response body as JSON.

    Returns:
        The decoded JSON payload, or ``None`` when the request fails, the
        device answers with an HTTP error status, or the body is not valid
        JSON. Every failure is reported on stdout.
    """
    url = f"{shelly_3phase}/rpc/Shelly.GetStatus"
    try:
        response = requests.get(url, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
    except ConnectionError:
        print(f"Connection error: Shelly 3-phase ({shelly_3phase}) is unavailable")
        return None
    except Timeout:
        print(f"Timeout: Shelly 3-phase ({shelly_3phase}) is not responding")
        return None
    except HTTPError as e:
        print(f"HTTP error from Shelly 3-phase: {e.response.status_code}")
        return None
    except RequestException as e:
        print(f"Unknown error while requesting Shelly 3-phase: {e}")
        return None

    # Decode separately from the request: ValueError is the common base of
    # every JSON decode error json() can raise (json.JSONDecodeError and the
    # equivalents of other JSON backends), but some request errors such as an
    # invalid URL are ValueError subclasses too and must not be reported as
    # parsing failures.
    try:
        return response.json()
    except ValueError:
        print("JSON parsing error from Shelly 3-phase response")
        return None
def _sonoff_set_power(device_name: str, turn_on: bool) -> bool:
    """Send a power command to one SONOFF device and verify its reply.

    Shared implementation of sonoff_turn_on() and sonoff_turn_off().

    Args:
        device_name: Device name from SONOFF_DEVICES dictionary.
        turn_on: True to send "Power On", False to send "Power Off".

    Returns:
        True when the device's JSON reply reports the requested state in its
        "POWER" field; False for an unknown device name, any request or
        parsing failure, or an unexpected reply. Every outcome is reported
        on stdout.
    """
    if device_name not in SONOFF_DEVICES:
        print(f"Error: device '{device_name}' not found")
        print(f"Available devices: {list(SONOFF_DEVICES)}")
        return False

    device_ip = SONOFF_DEVICES[device_name]
    command = "On" if turn_on else "Off"
    expected_state = command.upper()  # the reply reports "ON" / "OFF"
    url = f"{device_ip}/cm?cmnd=Power%20{command}"

    try:
        response = requests.get(url, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
    except ConnectionError:
        print(f"Connection error: SONOFF '{device_name}' ({device_ip}) is unavailable")
        return False
    except Timeout:
        print(f"Timeout: SONOFF '{device_name}' ({device_ip}) is not responding")
        return False
    except HTTPError as e:
        print(f"HTTP error from SONOFF '{device_name}': {e.response.status_code}")
        return False
    except RequestException as e:
        print(f"Unknown error while requesting SONOFF '{device_name}': {e}")
        return False

    # Decode separately from the request: ValueError is the common base of
    # every JSON decode error json() can raise, but some request errors
    # (e.g. an invalid URL) are ValueError subclasses too and must not be
    # reported as parsing failures.
    try:
        result = response.json()
    except ValueError:
        print(f"JSON parsing error from SONOFF '{device_name}' response")
        return False

    # A JSON body that is not an object (e.g. a list) has no .get(); treat it
    # as an unexpected response instead of letting AttributeError escape.
    if isinstance(result, dict) and result.get("POWER") == expected_state:
        print(f"SONOFF '{device_name}' ({device_ip}) turned {expected_state}")
        return True

    print(f"SONOFF '{device_name}': unexpected response - {result}")
    return False


def sonoff_turn_on(device_name: str):
    """
    Turns on SONOFF device.

    Args:
        device_name: Device name from SONOFF_DEVICES dictionary
            ('boyler', 'bath_heater')

    Returns:
        True if the device confirmed it is ON, False otherwise.
    """
    return _sonoff_set_power(device_name, turn_on=True)


def sonoff_turn_off(device_name: str):
    """
    Turns off SONOFF device.

    Args:
        device_name: Device name from SONOFF_DEVICES dictionary
            ('boyler', 'bath_heater')

    Returns:
        True if the device confirmed it is OFF, False otherwise.
    """
    return _sonoff_set_power(device_name, turn_on=False)
def sonoff_turn_on_all():
    """Switch on every device listed in SONOFF_DEVICES, one after another."""
    print("Turning on all SONOFF devices...")
    # Individual results are not collected; sonoff_turn_on() reports each
    # outcome itself.
    for name in SONOFF_DEVICES:
        sonoff_turn_on(name)
def sonoff_turn_off_all():
    """Switch off every device listed in SONOFF_DEVICES, one after another."""
    print("Turning off all SONOFF devices...")
    # Individual results are not collected; sonoff_turn_off() reports each
    # outcome itself.
    for name in SONOFF_DEVICES:
        sonoff_turn_off(name)
def main():
    """Detect the active power source and switch the SONOFF devices to match.

    Reads the ESP sensor and the Shelly 3-phase status. The system is
    considered to be on DTEK only when BOTH conditions hold:

    * the voltage difference between at least one pair of phases is more
      than 3 V, and
    * the sensor's ``DTEK`` value is truthy (a string counts only if it is
      "ON", case-insensitively).

    On DTEK all SONOFF devices are turned on; otherwise (Anker power
    station) all of them are turned off. Malformed data from either device
    also turns everything off, for safety.
    """
    phase_diff_threshold = 3  # volts

    # NOTE(review): when either device cannot be read, the function returns
    # without touching the relays, unlike the malformed-data cases below that
    # switch everything off. Confirm this is intended.
    dtek_status = get_sensor_data()
    if dtek_status is None:
        print("Failed to get data from ESP sensor")
        return

    shelly_status = shelly_3phase_status()
    if shelly_status is None:
        print("Failed to get Shelly 3-phase status")
        return

    # A valid-JSON but non-object sensor payload has no .get(); handle it
    # explicitly so it reaches the safety shutdown instead of crashing.
    if not isinstance(dtek_status, dict):
        print("Error: invalid data format from ESP sensor")
        sonoff_turn_off_all()
        return

    try:
        em_data = shelly_status['em:0']
        a_voltage = em_data['a_voltage']
        b_voltage = em_data['b_voltage']
        c_voltage = em_data['c_voltage']
        # Check voltage DIFFERENCE between phases
        # DTEK: phases are different (230V, 190V, 200V) - difference > 3V
        # Anker: inverter provides equal voltage - difference < 3V
        diff_ab = abs(a_voltage - b_voltage)
        diff_ac = abs(a_voltage - c_voltage)
        diff_bc = abs(b_voltage - c_voltage)
    except KeyError as e:
        print(f"Error: missing key {e} in Shelly response")
        # On error - turn off everything for safety
        sonoff_turn_off_all()
        return
    except TypeError:
        print("Error: invalid data format from Shelly")
        # On error - turn off everything for safety
        sonoff_turn_off_all()
        return

    # Check sensor data (DTEK == True or "ON")
    sensor_value = dtek_status.get('DTEK', False)
    if isinstance(sensor_value, str):
        sensor_dtek_on = sensor_value.upper() == "ON"
    else:
        sensor_dtek_on = bool(sensor_value)

    # If at least one difference exceeds the threshold, the phases look like DTEK
    phases_differ = any(
        diff > phase_diff_threshold for diff in (diff_ab, diff_ac, diff_bc)
    )

    voltage_line = f" Voltage: A={a_voltage}V, B={b_voltage}V, C={c_voltage}V"
    diff_line = f" Difference: A-B={diff_ab:.1f}V, A-C={diff_ac:.1f}V, B-C={diff_bc:.1f}V"

    # DTEK is ON only if BOTH conditions are met
    if phases_differ and sensor_dtek_on:
        print("DTEK is ON")
        print(voltage_line)
        print(diff_line)
        # DTEK is ON - turn on all devices
        sonoff_turn_on_all()
    else:
        print("Anker Power Station is ON (DTEK OFF)")
        print(voltage_line)
        print(diff_line)
        print(f" Sensor DTEK: {dtek_status.get('DTEK', 'N/A')}")
        # DTEK is OFF - STRICTLY turn off all devices
        sonoff_turn_off_all()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment