Last active
July 20, 2022 15:14
-
-
Save chrisdel101/ac8d17ac32b8e7214f3fe2a21975d672 to your computer and use it in GitHub Desktop.
Example of the fake serial message exchange; 2 examples of trying to write and read to it, and the resuls
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# It hangs, or else I thik this would be almost correct | |
Python 3.7.3 (default, Jan 22 2021, 20:04:44) | |
[GCC 8.3.0] on linux | |
Type "help", "copyright", "credits" or "license" for more information. | |
>>> from Dyno.Classes.Dyno.Dyno import Dyno | |
ENV development | |
>>> dyno = Dyno() | |
Listening on ('localhost', 65432) | |
>>> dyno.makeSerialConnection() | |
Opening serial: If hangs, check e-stop | |
ENV development | |
command: b'test2\r\n' | |
result: b'I dont understand\r\n' |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# send some bytes to the serialPort right away | |
def makeSerialConnection(self): | |
try: | |
# ideall this would be inside the class. Niether work right now | |
serial_exchange = FakeSerialExchange() | |
# create a separate thread that listens on the master device for commands | |
thread = threading.Thread( | |
target=serial_exchange.listener, | |
args=[serial_exchange.pty_tuple[0]] | |
) | |
thread.start() | |
self.serial_port = serial.Serial( | |
port=serial_exchange.slave_file_name, baudrate=dyno_constants.UART_RX_BAUD_RATE, timeout=5 | |
) | |
#testing it immediatley | |
self.serial_port.write(b'test2\r\n') # write the first command | |
res = b"" | |
self.lock2.acquire() | |
while not res.endswith(b'\r\n'): | |
# read the response | |
res += self.serial_port.read() | |
print("result: %s" % res) | |
self.lock2.release() | |
self.serial_port.write(b'QPGS\r\n') # write a second command | |
res = b"" | |
while not res.endswith(b'\r\n'): | |
# read the response | |
res += self.serial_port.read() | |
print("result: %s" % res) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# it accepts only the first write to the serial port, then nothing | |
>>> from Dyno.Classes.Dyno.Dyno import Dyno | |
>>> dyno = Dyno() | |
Listening on ('localhost', 65432) | |
>>> dyno.makeSerialConnection() | |
Opening serial: If hangs, check e-stop | |
ENV development | |
makeSerialConnection: created OK | |
>>> dyno.serial_port | |
Serial<id=0xb5ecc830, open=True>(port='/dev/pts/6', baudrate=230400, bytesize=8, parity='N', stopbits=1, timeout=5, xonxoff=False, rtscts=False, dsrdtr=False) | |
>>> dyno.serial_port.write(b'test2\r\n') | |
7 | |
>>> command: b'test2\r\n' | |
dyno.serial_port.write(b'hello\r\n') | |
7 | |
>>> dyno.serial_port.write(b'test2\r\n') | |
7 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# send some bytes to the serialPort after init. Purpose of two sep checks is b/c this way, the proper way, was not working at all b4. | |
# Now it works a little bit | |
def makeSerialConnection(self): | |
try: | |
# ideall this would be inside the class. Niether work right now | |
serial_exchange = FakeSerialExchange() | |
# create a separate thread that listens on the master device for commands | |
thread = threading.Thread( | |
target=serial_exchange.listener, | |
args=[serial_exchange.pty_tuple[0]] | |
) | |
thread.start() | |
self.serial_port = serial.Serial( | |
port=serial_exchange.slave_file_name, baudrate=dyno_constants.UART_RX_BAUD_RATE, timeout=5 | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import Enum | |
import os | |
import pty | |
import time | |
import threading | |
import logging | |
format = "%(asctime)s: %(message)s" | |
logging.basicConfig( | |
format=format, | |
level=logging.INFO, | |
datefmt="%H:%M:%S" | |
) | |
class FakeSerialExchange: | |
def __init__(self): | |
# create 2 terminals, master/slave | |
self.pty_tuple = self.createPtys() | |
# get name of file_desc, dyno write to this file | |
self.slave_file_name = self.openPty(self.pty_tuple[1]) | |
def listener(self, master_pty): | |
res = b"" | |
while not res.endswith(b"\r\n"): | |
# keep reading one byte at a time until we have a full line | |
res += os.read(master_pty, 1) | |
print("command: %s" % res) | |
# write back the response | |
if res == b'QPGS\r\n': | |
os.write(master_pty, b"correct result\r\n") | |
else: | |
os.write(master_pty, b"I dont understand\r\n") | |
def createPtys(self): | |
master, slave = pty.openpty() | |
return master, slave | |
def openPty(self, slave_pty): | |
filename = os.ttyname(slave_pty) | |
return filename |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment