Created
March 12, 2025 00:53
-
-
Save cfpandrade/8f408cbf04c49f0552aaa0223987a5fa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
import time | |
from datetime import datetime | |
import appdaemon.plugins.hass.hassapi as hass | |
class SpoolmanSync(hass.Hass): | |
def initialize(self): | |
"""Inicializa la app y escucha cambios en los sensores AMS.""" | |
self.spoolman_url = self.args["spoolman_url"] | |
self.ha_url = self.args["homeassistant_url"] | |
self.ha_token = self.args["homeassistant_token"] | |
self.sensors = self.args["sensor_ams_trays"] | |
# Diccionario para rastrear carretes en cada bandeja | |
self.active_spools = {} | |
# Atributos importantes para monitorear | |
self.attributes_to_monitor = ["tag_uid", "type", "color", "remain", "active", "name"] | |
# Configurar listeners para cada sensor y cada atributo relevante | |
for sensor in self.sensors: | |
# Monitorear cambios en el estado principal | |
self.listen_state(self.sync_spoolman, sensor) | |
# Monitorear cambios en atributos específicos | |
for attribute in self.attributes_to_monitor: | |
self.listen_state(self.sync_spoolman, sensor, attribute=attribute) | |
# Añadir verificación periódica cada 6 horas para mayor seguridad | |
self.run_every(self.periodic_sync, "now", 6 * 60 * 60) | |
self.log("✅ Spoolman Sync AppDaemon iniciado correctamente.") | |
def periodic_sync(self, kwargs): | |
"""Ejecuta sincronización periódica para todos los sensores cada 6 horas.""" | |
self.log("🔄 Ejecutando sincronización periódica programada (cada 6 horas)") | |
for sensor in self.sensors: | |
self.sync_spoolman(sensor, None, None, None, kwargs) | |
def normalize_color(self, color_hex): | |
"""Normaliza un código de color removiendo el canal alpha (AA) si existe.""" | |
if not color_hex: | |
return None | |
color = color_hex.replace("#", "").lower() | |
if len(color) == 8: | |
color = color[:6] | |
return f"#{color}" | |
def get_ha_sensor(self, entity_id): | |
"""Obtiene datos de un sensor en Home Assistant con autenticación.""" | |
url = f"{self.ha_url}/states/{entity_id}" | |
headers = { | |
"Authorization": f"Bearer {self.ha_token}", | |
"Content-Type": "application/json" | |
} | |
try: | |
response = requests.get(url, headers=headers, timeout=5, verify=False) | |
response.raise_for_status() | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
self.log(f"❌ ERROR obteniendo sensor {entity_id}: {e}") | |
return None | |
def get_spoolman_spools(self): | |
"""Obtiene todos los carretes en Spoolman con reintentos.""" | |
url = f"{self.spoolman_url}/spool" | |
for intento in range(3): | |
try: | |
response = requests.get(url, timeout=5) | |
response.raise_for_status() | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
self.log(f"❌ ERROR obteniendo spools de Spoolman (Intento {intento+1}/3): {e}") | |
time.sleep(2) | |
return [] | |
def sync_spoolman(self, entity, attribute, old, new, kwargs): | |
"""Sincroniza Spoolman con los datos del AMS.""" | |
sensor_data = self.get_ha_sensor(entity) | |
if not sensor_data: | |
return | |
attributes = sensor_data.get('attributes', {}) | |
tag_uid = attributes.get('tag_uid') | |
material_type = attributes.get('type') | |
color_hex = self.normalize_color(attributes.get('color')) | |
filament_name = attributes.get('name') | |
remain_percent = attributes.get('remain') | |
is_active = attributes.get('active', False) | |
tray_position = entity.split('_')[-1] | |
if not tag_uid or tag_uid == "0000000000000000": | |
self.log(f"🚫 Bandeja {tray_position} sin RFID, ignorando...") | |
return | |
location = f"AMS {tray_position}" | |
self.log(f"📡 AMS Tray {tray_position}: Material {material_type}, Color {color_hex}, RFID {tag_uid}, Ubicación: {location}") | |
spools = self.get_spoolman_spools() | |
if not spools: | |
return | |
# 📌 Buscar carrete con el RFID en Spoolman | |
matching_spool = None | |
for spool in spools: | |
spool_rfid = spool.get('extra', {}).get('extra_rfid_tag_string', "").replace('"', '') | |
if spool_rfid == tag_uid: | |
matching_spool = spool | |
break | |
if matching_spool: | |
self.log(f"✅ Spool encontrado en Spoolman con RFID {tag_uid}, ID: {matching_spool['id']}") | |
else: | |
for spool in spools: | |
if spool.get('extra', {}).get('extra_rfid_tag_string', "").replace('"', '') == "": | |
spool_material = spool['filament']['material'] | |
spool_color = self.normalize_color(spool['filament']['color_hex']) | |
if spool_material == material_type and spool_color == color_hex: | |
matching_spool = spool | |
self.log(f"🔄 Asignando RFID {tag_uid} al spool {matching_spool['id']}") | |
break | |
if not matching_spool: | |
self.log(f"⚠️ No se encontró carrete en Spoolman para la bandeja {tray_position}, se debe registrar manualmente.") | |
return | |
spool_id = matching_spool["id"] | |
initial_weight = matching_spool.get("initial_weight", 0) | |
current_remaining = matching_spool.get("remaining_weight", 0) | |
update_data = {} | |
last_used = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") | |
# 🏷️ Si no tiene first_used, lo asignamos | |
if "first_used" not in matching_spool or not matching_spool["first_used"]: | |
self.log(f"🆕 Primera vez que se usa este carrete. Asignando first_used: {last_used}") | |
update_data["first_used"] = last_used | |
if remain_percent is not None and initial_weight > 0: | |
new_remaining = (remain_percent / 100) * initial_weight | |
if abs(new_remaining - current_remaining) > 1: | |
self.log(f"🔄 Actualizando peso restante de {current_remaining}g a {new_remaining}g") | |
update_data["remaining_weight"] = new_remaining | |
if is_active: | |
self.log(f"⏳ Carrete activo, actualizando last_used a {last_used}") | |
update_data["last_used"] = last_used | |
# ⚡ Verificar si la bandeja tenía otro carrete antes y moverlo a Home | |
previous_spool = self.active_spools.get(entity) | |
if previous_spool and previous_spool != tag_uid: | |
self.log(f"🏠 Moviendo carrete anterior {previous_spool} a Home") | |
for spool in spools: | |
if spool.get('extra', {}).get('extra_rfid_tag_string', "").replace('"', '') == previous_spool: | |
response = requests.patch(f"{self.spoolman_url}/spool/{spool['id']}", json={"location": "Home"}) | |
if response.status_code == 200: | |
self.log(f"✅ Carrete {spool['id']} movido a Home") | |
else: | |
self.log(f"❌ ERROR moviendo carrete {spool['id']} a Home: {response.text}") | |
# 🔄 Actualizar el nuevo carrete con la posición correcta | |
if matching_spool.get("location") != location: | |
self.log(f"📍 Actualizando ubicación a {location}") | |
update_data["location"] = location | |
self.active_spools[entity] = tag_uid | |
if update_data: | |
self.log(f"📡 Enviando actualización a Spoolman para spool {spool_id}: {update_data}") | |
response = requests.patch(f"{self.spoolman_url}/spool/{spool_id}", json=update_data) | |
if response.status_code == 200: | |
self.log(f"✅ Carrete {spool_id} actualizado correctamente") | |
else: | |
self.log(f"❌ ERROR al actualizar Spoolman: {response.text}") | |
else: | |
self.log(f"⚡ No hay cambios que actualizar para el carrete {spool_id}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment