Skip to content

Instantly share code, notes, and snippets.

@roobixx
Created October 30, 2024 21:00
Show Gist options
  • Save roobixx/366e31405c3befc8d1372f89828dbaec to your computer and use it in GitHub Desktop.
Save roobixx/366e31405c3befc8d1372f89828dbaec to your computer and use it in GitHub Desktop.
GS.py for Tempest
import serial
import time
import struct
import os
# Serial ports for the console (command) and data (telemetry) channels.
# Each default can be overridden without editing this file by setting the
# GS_CONSOLE_PORT / GS_DATA_PORT environment variables, e.g. to /dev/ttyACM0.
console_port = os.environ.get("GS_CONSOLE_PORT") or "COM3"  # port for usb_cdc.console
data_port = os.environ.get("GS_DATA_PORT") or "COM4"  # port for usb_cdc.data
# Display the banner
def display_banner():
    """Clear the terminal and print the ground-station ASCII-art banner.

    The screen is cleared with the platform's own shell command: ``cls`` when
    ``os.name`` is ``'nt'`` (Windows) and ``clear`` everywhere else.
    """
    # 'clear' is not a Windows shell command, yet this script defaults to
    # Windows-style COM ports, so choose the command from the running platform.
    os.system('cls' if os.name == 'nt' else 'clear')
    banner = """
________________________________________________
/ \\
| _________________________________________ |
| | | |
| | #:~ ICSS_satellite_conn | |
| | [*]Status: Connect | |
| | [-] Enter Command: | |
| |_________________________________________| |
| |
\\________________________________________________/
\\___________________________________/
___________________________________________
_-' .-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-. --- `-_
_-'.-.-. .---.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.--. .-.-.`-_
_-'.-.-.-. .---.-.-.-. .---.-.-.-.-.-.-.-.-.-.-`__`. .-.-.-.`-_
_-'.-.-.-.-. .---.-.-.-. .---.-.-.-.-.-.-.-.-.-.-.-.-.---. .-.-.-.`-_
_-'.-.-.-.-.-. .---.-.-.-. .---.-.-.-.-.-.-.-.-.-.-.-.-.-.-.`---'.-.-.-.-.`-_
:-----------------------------------------------------------------------------:
`---._.-----------------------------------------------------------------._.---'
"""
    print(banner)
# Show the banner once at startup
display_banner()
# Open both serial ports. The rest of the script cannot work without them, so
# a failure here releases whatever was opened and exits instead of continuing
# into a NameError on the first use of console_serial / data_serial.
console_serial = None
data_serial = None
try:
    console_serial = serial.Serial(console_port, baudrate=115200, timeout=1)
    print("[*] Connected to GS Transmit Port")
    data_serial = serial.Serial(data_port, baudrate=115200, timeout=0)  # Non-blocking read
    print("[*] Connected to GS Receive Port")
except (serial.SerialException, OSError, ValueError) as e:
    print(f"Error connecting to GS Ports: {e}")
    # The console port may have opened before the data port failed.
    if console_serial is not None:
        console_serial.close()
    raise SystemExit(1)
# Helper functions
def soft_reset_gs():
    """Interrupt and soft-reboot the ground-station board via the console port.

    Writes CTRL-C (0x03) and then CTRL-D (0x04) to ``console_serial``, pausing
    after each byte, then flushes the port's output.
    """
    print("[+] Performing soft reset...")
    # (control byte, seconds to wait afterwards): CTRL-C to stop any running
    # script, then CTRL-D to request the soft reboot.
    reset_sequence = ((b'\x03', 0.1), (b'\x04', 0.5))
    for control_byte, settle_delay in reset_sequence:
        console_serial.write(control_byte)
        time.sleep(settle_delay)
    console_serial.flush()
    print("[*] Ground Station device reset complete.")
def send_command(command):
    """Write *command*, newline-terminated and UTF-8 encoded, to the console port.

    The command is echoed to stdout first, and the port is flushed afterwards.
    """
    print(f"Sending command: {command}")
    wire_bytes = "{}\n".format(command).encode('utf-8')
    console_serial.write(wire_bytes)
    console_serial.flush()
def receive_replies(timeout=10):
    """Read and display replies from the data port until it goes quiet.

    The input buffer is cleared first so stale data is not reported. Lines
    beginning with ``Uplink`` or ``Downlink`` are printed as-is; every other
    non-empty line is passed to ``unpack_command``. The quiet-period timer
    restarts whenever data arrives.

    Args:
        timeout: Seconds without incoming data after which the function
            returns.
    """
    print("Waiting for replies...")
    # A monotonic clock keeps the quiet-period measurement immune to system
    # clock adjustments while waiting.
    last_activity = time.monotonic()
    received_any = False
    data_serial.reset_input_buffer()  # Clear input buffer to avoid reading old data
    while True:
        if data_serial.in_waiting > 0:
            reply = data_serial.readline().strip()
            # A line that is empty after stripping carries no packet; handing
            # it to the decoder would only fail on the missing identifier.
            if reply:
                if reply.startswith((b"Uplink", b"Downlink")):
                    print(f"Received: {reply}")
                else:
                    unpack_command(reply, last_activity)
                received_any = True
            last_activity = time.monotonic()  # Reset timer after receiving data
        if time.monotonic() - last_activity > timeout:
            if received_any:
                print("Timeout: No more packets received.")
            else:
                print("Timeout: No replies received.")
            break
        time.sleep(0.01)  # Small delay to prevent CPU hogging
# Fragments of a multi-packet OBCP process listing, kept between calls until
# the final fragment arrives. Initialised here so the first OBCP packet does
# not hit an undefined global.
obcl_data_array = []


def unpack_command(packed_data, start_time):
    """Decode one telemetry packet and print its contents.

    The first four bytes are read as an ASCII identifier that selects the
    ``struct`` layout of the remainder. Packets that do not match their
    expected layout produce a printed error message rather than an exception.

    Args:
        packed_data: Raw packet as ``bytes``; a ``str`` is UTF-8 encoded first.
        start_time: Accepted for interface compatibility; not used here.

    Returns:
        None. All results are written to stdout.
    """
    global obcl_data_array

    if isinstance(packed_data, str):
        packed_data = packed_data.encode('utf-8')
    print(packed_data)

    # A packet shorter than the identifier cannot be dispatched at all.
    if len(packed_data) < 4:
        print(f"Unknown identifier or data format: {packed_data}")
        return
    # errors='replace' keeps binary garbage from raising; such an identifier
    # simply matches nothing below and is reported as unknown.
    identifier = packed_data[:4].decode('utf-8', errors='replace').strip('\x00')

    # Packets made of the identifier followed by a single float.
    scalar_labels = {
        "OBCR": "Onboard Computer Ram Usage",
        "OBCD": "Onboard Computer Disk Usage",
        "OBCC": "Onboard Computer CPU Usage",
    }

    if identifier in ("GYRO", "ACCL", "MAGN", "GRAV", "EULR"):
        try:
            # Only the first 16 bytes (identifier + three floats) are decoded.
            unpacked = struct.unpack('4s3f', packed_data[:16])
            print(f"Formatted Data: {identifier} X: {unpacked[1]}, Y: {unpacked[2]}, Z: {unpacked[3]}")
        except struct.error as e:
            print(f"Unpacking error: {e}")
    elif identifier == "BMED":
        try:
            unpacked = struct.unpack('4s3f', packed_data)
            # Index 0 is the identifier itself; the readings start at index 1.
            print(f"BME Data: {identifier} Temp: {unpacked[1]}, Pressure: {unpacked[2]}, Altitude: {unpacked[3]}")
        except struct.error as e:
            print(f"Unpacking error: {e}")
    elif identifier == "POLL":
        # The poll packet is accepted with either 16 or 17 floats; anything
        # else is reported as malformed.
        for poll_format in ('4s16f', '4s17f'):
            try:
                unpacked = struct.unpack(poll_format, packed_data)
            except struct.error:
                continue
            print("Environmental Poll Data:")
            for i, value in enumerate(unpacked[1:], 1):
                print(f"Sensor {i}: {value}")
            break
        else:
            print("Malformed packet")
    elif identifier in scalar_labels:
        try:
            unpacked = struct.unpack('4sf', packed_data)
            print(f"{scalar_labels[identifier]}: {unpacked[1]}")
        except struct.error as e:
            print(f"Unpacking error: {e}")
    elif identifier == "OBCL":
        try:
            unpacked = struct.unpack('4s230s', packed_data)
        except struct.error as e:
            print(f"Unpacking error: {e}")
        else:
            listing = unpacked[1]
            # The header is skipped when the payload begins with b"DNE"; the
            # payload text itself is still shown so the reply stays visible.
            if listing[:3] != b"DNE":
                print("Onboard Computer File Listing:")
            print(listing.decode('utf-8', errors='replace').strip('\x00'))
    elif identifier == "ADCS":
        try:
            unpacked = struct.unpack('4s7f', packed_data)
            print("Satellite Orientation:")
            for i, value in enumerate(unpacked[1:], 1):
                print(f"Value {i}: {value}")
        except struct.error as e:
            print(f"Unpacking error: {e}")
    elif identifier == "EPSS":
        try:
            unpacked = struct.unpack('4s5if', packed_data)
            print("Satellite EPS Status:")
            print(f"EPS Error: {unpacked[1]}\nCH1 State: {unpacked[2]}\nCH2 State: {unpacked[3]}\nCH3 State: {unpacked[4]}\nCH4 State: {unpacked[5]}\nBattery Voltage: {unpacked[6]}v")
        except struct.error as e:
            print(f"Unpacking error: {e}")
    elif identifier == "HOST":
        try:
            unpacked = struct.unpack('4s11s', packed_data)
            hostname = unpacked[1].decode('utf-8', errors='replace').strip('\x00')
            print(f'Satellite Hostname: {hostname}')
        except struct.error as e:
            print(f"HOST unpacking error: {e}")
    elif identifier == "SOLR":
        try:
            unpacked = struct.unpack('4s8f', packed_data)
            print("Solar Panel Values:")
            print(f"X-: {unpacked[1]} V, {unpacked[2]} mA")
            print(f"X+: {unpacked[3]} V, {unpacked[4]} mA")
            print(f"Y-: {unpacked[5]} V, {unpacked[6]} mA")
            print(f"Y+: {unpacked[7]} V, {unpacked[8]} mA")
        except struct.error as e:
            print(f"SOLR unpacking error: {e}")
    elif identifier == "OBCP":
        # Each OBCP packet carries a 230-byte text fragment after the identifier.
        try:
            unpacked = struct.unpack('4s230s', packed_data)
        except struct.error as e:
            print(f"OBCP unpacking error: {e}")
        else:
            obcl_data_array.append(unpacked[1].decode('utf-8', errors='replace').strip('\x00'))
            # NOTE(review): is_obcl_data_complete is not defined in the visible
            # source; it must be supplied elsewhere for this branch to run.
            if is_obcl_data_complete(obcl_data_array[-1]):
                final_data = ''.join(obcl_data_array)
                print("Final OBC Process List Data:")
                # The piece after the last comma is not printed.
                for entry in final_data.split(',')[:-1]:
                    print(entry)
                obcl_data_array = []  # Start fresh for the next sequence
    elif identifier == "RETX":
        # Everything after the 4-byte identifier is the retransmitted payload.
        print(f"Retransmitted Packet: {packed_data[4:]}")
    else:
        print(f"Unknown identifier or data format: {packed_data}")
def list_commands():
    """Print the help menu of commands this client documents."""
    menu_lines = (
        "Available Commands:",
        "- help: Print help menu",
        "- GET_SOLAR: Get Solar Panel Voltages and Currents",
        "- EPS_STATUS: Get All EPS Values",
        "- EPS CH<n>,<0 or1>: Set EPS Channel Off/On",
        "- ENV_POLL: Poll All Environmental Sensors",
        "- GET_GYRO: Get Gyroscope Values (X,Y,Z)",
        "- GET_MAG: Get Magnetometer Values (X,Y,Z)",
        "- GET_ACCEL: Get Acceletation Values (X, Y, Z)",
        "- GET_ORIENTATION: Get Satellite Orientation in Eurler and Quaternion Values",
        "- GET_EULER: Get Satellite Orientation in Euler Values",
        "- GET_QUATERNION: Get Satellite Orientatoin in Quaternion Values",
        "- GET_BME: Get Pressure, Tempature, Humdty, and Altitude Values",
        "- GET_TEMP: Get Tempature from IMU",
        "- OBC_LIST_FILES <filepath>: List Files in Given File Path",
        "- OBC_PROCESSES: List Running Processes on OBC",
        "- OBC_DISK: Get OBC Disk Usage",
        "- OBC_RAM: Get OBC Ram Usage",
        "- OBC_SHUTDOWN: Shut Down OBC",
        "- OBC_RESTART: Reboot OBC",
        "- TAKE_PHOTO: Take Photo Using Onboard Camera",
        "- SEND_IMAGE <image file path>: Send Image Down to Ground Station",
        "- RETRANSMIT <File Path> <packets>: Retransmit Packets From Selected File",
        "- GET_HOSTNAME: Get Hostname of Satellite",
    )
    # One blank line before the menu and one after it, as a single print call.
    print("\n" + "\n".join(menu_lines) + "\n")
def configure_radio_over_serial():
    """Prompt for the uplink and downlink radio commands until both are accepted.

    Every entry is sent to the console port and replies are read for three
    seconds. An entry counts as accepted when it mentions ``rfm95`` or
    ``rfm69``; any radio still unaccepted is prompted for again on the next
    pass, uplink before downlink.
    """
    radio_models = ("rfm95", "rfm69")
    outstanding = [
        "Set Uplink Radio (e.g., set_uplink_radio rfm95 915.0): ",
        "Set Downlink Radio (e.g., set_downlink_radio rfm95 927.0): ",
    ]
    while outstanding:
        retry = []
        for prompt_text in outstanding:
            entry = input(prompt_text)
            send_command(entry)
            receive_replies(timeout=3)
            # The entry is sent regardless; only its acceptance depends on
            # whether it names a known radio model.
            if not any(model in entry for model in radio_models):
                retry.append(prompt_text)
        outstanding = retry
# Main loop: reset the ground station, configure both radios, then forward
# each typed command and show the replies until the operator interrupts.
try:
    soft_reset_gs()
    configure_radio_over_serial()  # Configure radios first
    while True:
        command = input("Enter command to send: ")
        if command.lower() in ["help", "?"]:
            # The help menu is implemented by list_commands(); no print_help
            # function exists in this script.
            list_commands()
        else:
            send_command(command)
            receive_replies(timeout=10)
        console_serial.reset_output_buffer()
        data_serial.reset_input_buffer()
        time.sleep(1)
except (KeyboardInterrupt, EOFError):
    # EOFError covers stdin being closed (e.g. piped input running out).
    print("Program interrupted.")
finally:
    # Close whichever ports were actually opened; after a failed connection
    # either name may be unbound or None.
    for port_name in ("console_serial", "data_serial"):
        port = globals().get(port_name)
        if port is not None:
            port.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment