Skip to content

Instantly share code, notes, and snippets.

@justengel
Last active August 19, 2020 17:03
Show Gist options
  • Save justengel/fefa8d8fe195ea9e9db8626c5eb3739a to your computer and use it in GitHub Desktop.
Save justengel/fefa8d8fe195ea9e9db8626c5eb3739a to your computer and use it in GitHub Desktop.
Asyncio serial port logger. This works, but has not been tested for efficiency.
"""Asyncio serial port logger.
Requirements:
* pyserial
* pyserial-asyncio
* aiofiles
"""
import os
import datetime
import socket
import asyncio
import serial_asyncio
import aiofiles
# Globals
HOSTNAME = socket.gethostname()
FILE = None
async def split_file_every(timeout=2, log_directory='/log/'):
global FILE
# Make the log directory if it does not exist
if not os.path.exists(log_directory):
os.makedirs(log_directory)
if log_directory.endswith('/'):
log_directory = log_directory[:-1]
while True:
# Create the file
datestr = datetime.datetime.now().strftime('%Y-%m-%d-%H.%M.%S')
filename = '{}/{}-{}EST.dat'.format(log_directory, HOSTNAME, datestr)
async with aiofiles.open(filename, 'wb') as f:
FILE = f
print('Log opened at ' + str(datestr))
# file_starttime = time.time() # Not needed anymore
# Sleep until split
await asyncio.sleep(timeout * 60)
async def log_data(byts):
global FILE
await FILE.write(byts)
class Reader(asyncio.Protocol):
def connection_made(self, transport):
print('Connection Started')
self.transport = transport
def connection_lost(self, exc):
print('Connection Closed')
asyncio.get_event_loop().stop()
def data_received(self, data):
global FILE
print('Data Received', len(data))
asyncio.get_event_loop().create_task(log_data(data))
if __name__ == '__main__':
import argparse
PORT = '/dev/ttyUSB0'
# PORT = 'COM1'
BAUDRATE = 921600
TIMEOUT = 2 # Minutes
LOG_DIRECTORY = '/log/'
P = argparse.ArgumentParser(description='Log serial port data to a file.')
P.add_argument('--port', '--com', type=str, default=PORT, help='Serial com port to open.')
P.add_argument('--baudrate', '--baud', type=int, default=BAUDRATE, help='Serial port baudrate.')
P.add_argument('--timeout', '-t', type=float, default=TIMEOUT, help='File timeout in minutes.')
P.add_argument('--log', '-o', type=str, default=LOG_DIRECTORY, help='Log file directory to write to.')
ARGS = P.parse_args()
PORT = ARGS.port
BAUDRATE = ARGS.baudrate
TIMEOUT = ARGS.timeout
LOG_DIRECTORY = ARGS.log
# Run the asyncio loop
loop = asyncio.get_event_loop()
# Create the task that writes the buffer to the file
loop.create_task(split_file_every(TIMEOUT, log_directory=LOG_DIRECTORY))
# Create the serial task
reader = serial_asyncio.create_serial_connection(loop, Reader, PORT, baudrate=BAUDRATE)
loop.run_until_complete(reader)
loop.run_forever()
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment