Skip to content

Instantly share code, notes, and snippets.

@feliksyasnopolski
Created February 8, 2025 18:58
Show Gist options
  • Save feliksyasnopolski/509867bdc160b6fe008279db2afd8cef to your computer and use it in GitHub Desktop.
Save feliksyasnopolski/509867bdc160b6fe008279db2afd8cef to your computer and use it in GitHub Desktop.
import json
import os
import struct
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional
@dataclass
class ACEPacketRequest:
id: int
method: str
params: Optional[dict] = None
@dataclass
class ACEPacketResponse:
id: int
code: int
msg: str
result: Optional[dict]
@dataclass
class SwitchFilament:
index: int
@dataclass
class UnwindingFilament:
index: int
length: int
speed: int
mode: int # 0 - Normal mode; 1 - Enhanced mode
@dataclass
class StopUnwindingFilament:
index: int
@dataclass
class FeedFilament:
index: int
length: int
speed: int
@dataclass
class UpdateFeedingSpeed:
index: int
speed: int
@dataclass
class UpdateUnwindingSpeed:
index: int
speed: int
@dataclass
class StopFeedFilament:
index: int
@dataclass
class StartFeedAssist:
index: int
@dataclass
class StopFeedAssist:
index: int
@dataclass
class ContinueFilament:
auto: int
@dataclass
class SetFanSpeed:
fan_speed: int
@dataclass
class Drying:
temp: int
fan_speed: int
duration: int
@dataclass
class DryerStatus:
status: str
target_temp: int
duration: int
remain_time: int
class RfidStatus(Enum):
NOT_FOUND = 0
UNRECOGNIZED = 1
RECOGNIZED = 2
RECOGNIZING = 3
@dataclass
class SlotStatus:
index: int
status: str
sku: str
type: str
color: List[int]
rfid: Optional[RfidStatus] = None
source: Optional[int] = None
redirect: Optional[str] = None
@dataclass
class GetStatus:
status: str # startup/busy/ready
action: str # feeding/unwinding
temp: int
humidity: int
enable_rfid: int
dryer_status: DryerStatus
feed_assist_count: int
cont_assist_time: float # Continuous feeding time in ms
slots: List[SlotStatus]
@dataclass
class Info:
id: int
slots: int
sn: str
date: str
model: str
firmware: str
@dataclass
class IntRange:
min: int
max: int
@dataclass
class FilamentInfo:
id: Optional[int]
index: int
sku: str
brand: str
type: str
color: List[int]
extruder_temp: IntRange
hotbed_temp: IntRange
diameter: float
rfid: Optional[RfidStatus] = None
source: Optional[int] = None
@dataclass
class SetFilamentInfo:
index: int
type: str
color: List[int]
@dataclass
class RefreshFilamentInfo:
index: int
class ACEProto:
def __init__(self):
self.last_msg_id = 0
def calc_crc16(self, buf: bytes) -> int:
crc = 0xffff
for data in buf:
data ^= crc & 0xff
data ^= (data & 0x0f) << 4
crc = ((data << 8) | (crc >> 8)) ^ (data >> 4) ^ (data << 3)
return crc
def write_all(self, fd: int, buf: bytes) -> Optional[Exception]:
sent = 0
while sent < len(buf):
try:
w = os.write(fd, buf[sent:])
sent += w
except OSError as e:
print(f"Failed to write data to fd {fd}: {e}")
return e
return None
def build_packet(self, pkt: ACEPacketRequest) -> bytes:
buf = bytearray()
buf.extend([0xFF, 0xAA])
bts = json.dumps(pkt.__dict__).encode()
size = len(bts)
buf.extend(struct.pack('<H', size))
crc = self.calc_crc16(bts)
buf.extend(bts)
buf.extend(struct.pack('<H', crc))
buf.append(0xFE)
return bytes(buf)
def build_packet_by_buf(self, data: bytes, buf_len: int) -> bytes:
buf = bytearray()
buf.extend([0xFF, 0xAA])
buf.extend(struct.pack('<H', buf_len))
crc = self.calc_crc16(data)
buf.extend(data)
buf.extend(struct.pack('<H', crc))
buf.append(0xFE)
return bytes(buf)
def parse_packet(self, data: bytes) -> Optional[dict]:
if len(data) < 6 or data[0] != 0xFF or data[1] != 0xAA or data[-1] != 0xFE:
return None # Invalid packet
size = struct.unpack('<H', data[2:4])[0]
payload = data[4:4+size]
received_crc = struct.unpack('<H', data[4+size:6+size])[0]
if self.calc_crc16(payload) != received_crc:
return None # CRC mismatch
try:
return json.loads(payload.decode())
except json.JSONDecodeError:
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment