Skip to content

Instantly share code, notes, and snippets.

@marcelrv
Last active January 27, 2025 23:48
Show Gist options
  • Save marcelrv/a05ff45e91f05e64188f6f36562c73de to your computer and use it in GitHub Desktop.
Save marcelrv/a05ff45e91f05e64188f6f36562c73de to your computer and use it in GitHub Desktop.
csafe protocol
import random
import re
import socket
"""
csafeserver.py
copyright: 2024
Author: Marcel Verpaalen
This module implements a TCP server CSAFE device emulator that listens for incoming connections and processes CSAFE specific commands.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
commands = [
{'cmd':'0x01','short':'cmdAutoUpload','long':'Control automatic upload features. See table below for definition.'},
{'cmd':'0x02','short':'cmdUpList','long':'List of commands that specify data for batched upload. *'},
{'cmd':'0x04','short':'cmdUpStatusSec','long':'Interval between periodic automatic uploads of status **'},
{'cmd':'0x05','short':'cmdUpListSec','long':'Interval between periodic automatic uploads of UpList **'},
{'cmd':'0x10','short':'cmdIDDigits','long':'An integer between 2 - 5 defining the number of digits to accept from the user as a valid ID'},
{'cmd':'0x11','short':'cmdSetTIme','long':'Set current time of day'},
{'cmd':'0x12','short':'cmdSetDate','long':'Set current date.'},
{'cmd':'0x13','short':'cmdSetTimeout','long':'Set timeout period for exiting certain states. See state diagram for details.'},
{'cmd':'0x1A','short':'cmdUserCfg1','long':'Slave depended configuration information'},
{'cmd':'0x1B','short':'cmdUserCfg2','long':'Slave depended configuration information'},
{'cmd':'0x20','short':'cmdSetTWork','long':'Workout time goal'},
{'cmd':'0x21','short':'cmdSetHorizontal','long':'Horizontal distance goal'},
{'cmd':'0x22','short':'cmdSetVertical','long':'Vertical distance goal'},
{'cmd':'0x23','short':'cmdSetCalories','long':'Calories goal'},
{'cmd':'0x24','short':'cmdSetProgram','long':'Machine program and level'},
{'cmd':'0x25','short':'cmdSetSpeed','long':'Equipment speed'},
{'cmd':'0x28','short':'cmdSetGrade','long':'Equipment grade (incline)'},
{'cmd':'0x29','short':'cmdSetGear','long':'Equipment gear (resistance)'},
{'cmd':'0x2B','short':'cmdSetUserInfo','long':'General user information'},
{'cmd':'0x2C','short':'cmdSetTorque','long':'Equipment torque'},
{'cmd':'0x2D','short':'cmdSetLevel','long':'Level (bike=power, stepper=speed)'},
{'cmd':'0x30','short':'cmdSetTargetHR','long':'Target HR (bpm)'},
{'cmd':'0x32','short':'cmdSetGoal','long':'Sets a workout goal'},
{'cmd':'0x33','short':'cmdSetMETS','long':'METS goal'},
{'cmd':'0x34','short':'cmdSetPower','long':'Power goal'},
{'cmd':'0x35','short':'cmdSetHRZone','long':'Target HR zone (bpm)'},
{'cmd':'0x36','short':'cmdSetHRMax','long':'Maximum HR limit (bpm)'},
{'cmd':'0x40','short':'cmdSetChannelRange','long':'Audio channel range (inclusive)'},
{'cmd':'0x41','short':'cmdSetVolumeRange','long':'Audio volume range (inclusive)'},
{'cmd':'0x42','short':'cmdSetAudioMute','long':'Set audio muting state'},
{'cmd':'0x43','short':'cmdSetAudioChannel','long':'Set audio channel'},
{'cmd':'0x44','short':'cmdSetAudioVolume','long':'Set audio volume'},
{'cmd':'0x60','short':'cmdStartText2,3,4,5','long':'Start text upload'},
{'cmd':'0x61','short':'cmdAppendText2,3,4,5','long':'Append text to previous cmdStartText'},
{'cmd':'0x65','short':'cmdGetTextStatus','long':'Indicates the current status of a certain message type string on the slave'},
{'cmd':'0x70','short':'cmdGetCaps','long':'Query capabilities of slave'},
{'cmd':'0x71','short':'cmdGetInfo','long':'Query equipment Information of slave'},
{'cmd':'0x72','short':'cmdGetErrorString','long':'Returns a variable length printable ASCII string for associated error code.'},
{'cmd':'0x80','short':'cmdGetStatus','long':'Request Status from Slave. Status is sent even if the flgAck flag is off, i.e. this command can be added to a frame to force an acknowledgment of the frame even it the flgAck is off. Unlike the "Empty Frame" this command does update the Status.'},
{'cmd':'0x81','short':'cmdReset','long':'Reset Slave. Initialize variables to Ready State and reset Frame Toggle and Status of Previous Frame flag to zero.'},
{'cmd':'0x82','short':'cmdGoIdle','long':'go to Idle State, reset variables to Idle state'},
{'cmd':'0x83','short':'cmdGoHaveID','long':'go to HaveID state'},
{'cmd':'0x85','short':'cmdGoInUse','long':'go to InUse State '},
{'cmd':'0x86','short':'cmdGoFinished','long':'go to Finished State'},
{'cmd':'0x87','short':'cmdGoReady','long':'go to Ready State'},
{'cmd':'0x88','short':'cmdBadID','long':'Indicates to Slave that the user ID entered was invalid'},
{'cmd':'0x91','short':'cmdGetVersion','long':'Codes used to uniquely identify equipment and ROM version.2'},
{'cmd':'0x92','short':'cmdGetID','long':'ID # defined for the user'},
{'cmd':'0x93','short':'cmdGetUnits','long':'Unit Mode (Metric/English)'},
{'cmd':'0x94','short':'cmdGetSerial','long':'Return equipment serial number'},
{'cmd':'0x98','short':'cmdGetList','long':'List of batched commands configured with cmdUpList.3 '},
{'cmd':'0x99','short':'cmdGetUtilization','long':'Hours used since manufactured'},
{'cmd':'0x9A','short':'cmdGetMotorCurrent','long':'Motor current'},
{'cmd':'0x9B','short':'cmdGetOdometer','long':'Equipment odometer value','has_units':True},
{'cmd':'0x9C','short':'cmdGetErrorCode','long':'Equipment error code'},
{'cmd':'0x9D','short':'cmdGetServiceCode','long':'Equipment service code'},
{'cmd':'0x9E','short':'cmdGetUserCfg1','long':'Slave dependent configuration data'},
{'cmd':'0x9F','short':'cmdGetUserCfg2','long':'Slave dependent configuration data'},
{'cmd':'0xA0','short':'cmdGetTWork','long':'Workout duration ***'},
{'cmd':'0xA1','short':'cmdGetHorizontal','long':'Accumulated (for the workout) Distance (horizontal )','has_units':True},
{'cmd':'0xA2','short':'cmdGetVertical','long':'Accumulated (for the workout) Distance (vertical)','has_units':True},
{'cmd':'0xA3','short':'cmdGetCalories','long':'Accumulated Calories Burned'},
{'cmd':'0xA4','short':'cmdGetProgram','long':'Current Machine program and level '},
{'cmd':'0xA5','short':'cmdGetSpeed','long':'Current Speed','has_units':True},
{'cmd':'0xA6','short':'cmdGetPace','long':'Current Pace','has_units':True},
{'cmd':'0xA7','short':'cmdGetCadence','long':'Current Cadence','has_units':True},
{'cmd':'0xA8','short':'cmdGetGrade','long':'Current grade of exercise'},
{'cmd':'0xA9','short':'cmdGetGear','long':'Current Gear'},
{'cmd':'0xAA','short':'cmdGetUpList','long':'Batched workout results as defined by CmdUpList **'},
{'cmd':'0xAB','short':'cmdGetUserInfo','long':'Current user information'},
{'cmd':'0xAC','short':'cmdGetTorque','long':'Current torque','has_units':True},
{'cmd':'0xB0','short':'cmdGetHRCur ','long':'Current HR (BPM)'},
{'cmd':'0xB2','short':'cmdGetHRTZone','long':'Current time in target HR zone'},
{'cmd':'0xB3','short':'cmdGetMETS','long':'Current 3.5 ml/kg/min oxygen consumption rating'},
{'cmd':'0xB4','short':'cmdGetPower','long':'Current Power expenditure ( i.e. calories/min or watts )','has_units':True},
{'cmd':'0xB5','short':'cmdGetHRAvg','long':'Current average heart rate (bpm)'},
{'cmd':'0xB6','short':'cmdGetHRMax','long':'Workout maximum heart rate (bpm)'},
{'cmd':'0xBE','short':'cmdGetUserData1','long':'Slave dependent workout data'},
{'cmd':'0xBF','short':'cmdGetUserData2','long':'Slave dependent workout data'},
{'cmd':'0xC0','short':'cmdGetAudioChannel','long':'Audio channel number selection'},
{'cmd':'0xC1','short':'cmdGetAudioVolume','long':'Audio volume setting'},
{'cmd':'0xC2','short':'cmdGetAudioMute','long':'Audio muting'},
{'cmd':'0xE0','short':'cmdEndText2,6','long':'End of text upload'},
{'cmd':'0xE1','short':'cmdDisplayPopup7','long':'Display pop up'},
{'cmd':'0xE5','short':'cmdGetPopupStatus8','long':'Indicates the status of a previous cmdDisplayPopup request'},
]
# TODO: this module does not implement unstuffing for commands containing 0xf3
def get_command(cmd):
for command in commands:
if int(command['cmd'],0) == cmd:
return command
return None
def get_command_name(cmd):
for command in commands:
if int(command['cmd'],0) == cmd:
return command['short']
return 'Unknown'
def get_response(data, distance):
cmds = bytearray(data[1:-2])
if cmds[0] == 0x80:
return b"\xf1\x70\x70\xf2"
response = bytearray()
response.append(0xF1)
response.append(0x98)
for cmd in cmds:
i = len(response)
response.append(cmd)
# list emultated commands
match cmd:
case 0xA1: # cmdGetDistance
response.append(0x03)
response.extend(distance.to_bytes(2, byteorder="little"))
response.append(0x24)
case 0xA3: # cmdGetCalories
cal = int(distance / 5)
response.append(0x02)
response.extend(cal.to_bytes(2, byteorder="little"))
case 0xA5: # cmdGetSpeed
speed = random.randint(0x25, 0x3F)
response.append(0x03)
response.extend(speed.to_bytes(2, byteorder="little"))
response.append(0x52)
case 0xB0: # cmdGetHRCur
response.append(0x01)
response.append(random.randint(100, 110))
case 0xB4: # cmdGetPower
response.append(0x03)
response.extend(random.randint(90, 100).to_bytes(2, byteorder="little"))
response.append(0x58)
case 0x9B: # getOdometer
response.append(0x05)
response.extend(int(43485).to_bytes(4, byteorder="little"))
response.append(0x23)
case _: # default case
print (f"Unimplemented command: {cmd:02x} - {get_command_name (cmd)}")
print(f"Add response for cmd: {cmd:02x} - {get_command_name (cmd)} - data: {response[i:].hex()}")
# add checksum and end byte
response.append(calculate_checksum(bytes(response[1:])))
response.append(0xF2)
if len(response) <= 5:
print(f"not found cmd {data[1]:02x}")
return bytes(response)
def calculate_checksum(data):
# print(f"Checksum data: {data.hex()}")
checksum = 0
for byte in data:
checksum ^= byte
return checksum & 0xFF
def stuff_bytes(data):
j = 0
new_data = bytearray()
new_data.append(data[0])
for i in range(1, len(data) - 1):
if data[i] == 0xF1 or data[i] == 0xF2 or data[i] == 0xF3:
new_data.append(0xF3)
new_data.append(data[i] & 0x0F)
else:
new_data.append(data[i])
new_data.append(data[len(data) - 1])
return bytes(new_data)
def start_server():
host = "0.0.0.0"
port = 2000
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
server_socket.bind((host, port))
server_socket.listen(1)
print(f"Server listening on port {port}...")
distance = 0
while True:
client_socket, client_address = server_socket.accept()
with client_socket:
print(f"Connection from {client_address}")
while True:
data = client_socket.recv(1024)
if not data:
break
distance += 1
print(f"Received request data: {data.hex()}")
start = data.find(b"\xf1")
end = data.find(b"\xf2", start)
if start != -1 and end != -1:
selected_data = data[start : end + 1]
cmd = selected_data[2]
#print(f"Selected data: {selected_data.hex()}")
response = get_response(selected_data, distance)
checksum = calculate_checksum(response[1:-2])
# print(f"checksum: {checksum:02x} - {response[-2]:02x} - {response.hex()}")
response = stuff_bytes(response)
client_socket.send(response)
print(
f"Reply response: {response.hex()} response data {response[3:-2].hex(' ')} {response[4] if len(response) == 7 else ''} "
)
if __name__ == "__main__":
start_server()

request: f19191f2 <--> response: f1 89 91 05 09 02 06 09 0b 12 f2 0x91 command data response: 09 02 06 09 0b CSAFE_GETVERSION_CMD 0x91 Byte 0: Mfg ID Byte 1: CID = CID (CSAFE Class Identifier) Byte 2: Model Byte 3: HW Version (LS) Byte 4: HW Version (MS) Byte 5: SW Version (LS) Byte 6: SW Version (MS)

Mfg: 09 ==> LifeFitness CID: 02 ==> Elliptical crosstrainer rest meaning dependent on the manufacturer...

{'cmd': 145, 'hexcmd': '0x91', 'status': 1, 'status_description': 'Ready', 'command': 'cmdGetVersion', 'description': 'Codes used to uniquely identify equipment and ROM version.2', 'bytes': 5, 'data': '090206090b', 'byte': 9, 'value': 521}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment