Skip to content

Instantly share code, notes, and snippets.

@MihaelIsaev
Created April 26, 2025 11:57
Show Gist options
  • Save MihaelIsaev/52fd61b73bc66440923f9ed88d462a19 to your computer and use it in GitHub Desktop.
Save MihaelIsaev/52fd61b73bc66440923f9ed88d462a19 to your computer and use it in GitHub Desktop.
Serial Server + Client to forward serial port into docker container
#!/usr/bin/env python3
import os
import pty
import select
import socket
import serial
import threading
import termios
import fcntl
import sys
import time
# === CONFIG ===
TCP_HOST = "host.docker.internal"
TCP_PORT = 33329
BAUDRATE = 115200
PTY_NAME = "/dev/ttyS0"
def create_virtual_serial():
master_fd, slave_fd = pty.openpty()
pty_name = os.ttyname(slave_fd)
# Force PTY name if needed (e.g., /dev/ttyESP32 symlink)
try:
if os.path.exists(PTY_NAME):
os.remove(PTY_NAME)
os.symlink(pty_name, PTY_NAME)
print(f"[client] Linked PTY: {PTY_NAME}{pty_name}")
except Exception as e:
print(f"[client] Failed to link PTY: {e}")
return None, None, None
return master_fd, slave_fd, pty_name
def proxy_loop(master_fd, ser):
print("[client] Starting proxy loop...")
try:
while True:
rlist, _, _ = select.select([master_fd, ser], [], [])
if master_fd in rlist:
data = os.read(master_fd, 1024)
if data:
ser.write(data)
if ser in rlist:
data = ser.read(ser.in_waiting or 1)
if data:
os.write(master_fd, data)
except Exception as e:
print(f"[client] Proxy loop error: {e}")
def monitor_control_lines(master_fd, ser):
print("[client] Monitoring DTR/RTS...")
prev_dtr = None
prev_rts = None
while True:
try:
# Read current state from the PTY
buf = fcntl.ioctl(master_fd, termios.TIOCMGET, b"\x00\x00\x00\x00")
state = int.from_bytes(buf, byteorder='little')
dtr = bool(state & termios.TIOCM_DTR)
rts = bool(state & termios.TIOCM_RTS)
if dtr != prev_dtr:
ser.dtr = dtr
print(f"[client] DTR changed: {dtr}")
prev_dtr = dtr
if rts != prev_rts:
ser.rts = rts
print(f"[client] RTS changed: {rts}")
prev_rts = rts
time.sleep(0.05)
except Exception as e:
print(f"[client] Error monitoring DTR/RTS: {e}")
break
def main():
print(f"[client] Connecting to {TCP_HOST}:{TCP_PORT}...")
try:
ser = serial.serial_for_url(f"socket://{TCP_HOST}:{TCP_PORT}", baudrate=BAUDRATE, timeout=0)
except Exception as e:
print(f"[client] Could not open TCP socket: {e}")
return
master_fd, slave_fd, pty_name = create_virtual_serial()
if master_fd is None:
return
proxy_thread = threading.Thread(target=proxy_loop, args=(master_fd, ser), daemon=True)
control_thread = threading.Thread(target=monitor_control_lines, args=(master_fd, ser), daemon=True)
proxy_thread.start()
control_thread.start()
proxy_thread.join()
control_thread.join()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n[client] Exiting.")
# macOS: server.py
import socket
import serial
import threading
import time
import logging
import sys
from serial.serialutil import SerialException
# === Configuration ===
SERIAL_PORT = "/dev/cu.usbmodem101" # Update for your system
BAUD_RATE = 115200
TCP_HOST = "0.0.0.0"
TCP_PORT = 33329
RECONNECT_DELAY = 2 # seconds
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] [%(levelname)s] %(message)s",
datefmt="%H:%M:%S"
)
def handle_connection(client_socket, serial_port):
def tcp_to_serial():
try:
while True:
data = client_socket.recv(1024)
if not data:
break
# Control command: 1 byte prefix 0xFF + 1 byte state
if data.startswith(b"\xFF") and len(data) >= 2:
control = data[1]
serial_port.rts = bool(control & 0b10)
serial_port.dtr = bool(control & 0b01)
logging.info(f"[CTRL] Set RTS={serial_port.rts}, DTR={serial_port.dtr}")
continue
serial_port.write(data)
except Exception as e:
logging.warning(f"TCP→Serial thread ended: {e}")
finally:
client_socket.close()
def serial_to_tcp():
try:
while True:
if serial_port.in_waiting:
data = serial_port.read(serial_port.in_waiting)
client_socket.sendall(data)
else:
time.sleep(0.01)
except Exception as e:
logging.warning(f"Serial→TCP thread ended: {e}")
finally:
client_socket.close()
logging.info("TCP client connected, starting bridge threads")
tx_thread = threading.Thread(target=tcp_to_serial, daemon=True)
rx_thread = threading.Thread(target=serial_to_tcp, daemon=True)
tx_thread.start()
rx_thread.start()
tx_thread.join()
rx_thread.join()
logging.info("TCP client disconnected, closing bridge")
def run_bridge():
while True:
try:
logging.info(f"Waiting for serial device: {SERIAL_PORT}")
while True:
try:
serial_port = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=0)
break
except SerialException:
logging.warning(f"Serial port not available: {SERIAL_PORT}")
time.sleep(RECONNECT_DELAY)
logging.info(f"Serial port opened: {SERIAL_PORT} @ {BAUD_RATE} baud")
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((TCP_HOST, TCP_PORT))
server_socket.listen(1)
logging.info(f"Listening on TCP {TCP_HOST}:{TCP_PORT}")
while True:
logging.info("Waiting for TCP client...")
try:
client_socket, addr = server_socket.accept()
logging.info(f"TCP connection from {addr}")
handle_connection(client_socket, serial_port)
except Exception as e:
logging.warning(f"TCP accept failed: {e}")
time.sleep(1)
except Exception as e:
logging.error(f"Fatal error: {e}")
finally:
try:
serial_port.close()
except:
pass
try:
server_socket.close()
except:
pass
logging.info("Restarting bridge in a few seconds...")
time.sleep(RECONNECT_DELAY)
if __name__ == "__main__":
try:
run_bridge()
except KeyboardInterrupt:
logging.info("Bridge interrupted by user. Exiting.")
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment