Last active
March 2, 2018 10:40
-
-
Save pvergain/b779d29ae8deb2949894921d702dbcbd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
"""Exemple d'application Python basée sur pyserial, logging et click.""" | |
# Import standard modules | |
from os.path import abspath, basename, dirname, join, normpath | |
import binascii | |
import csv | |
import string | |
import time | |
from collections import namedtuple | |
import logging | |
from logging.handlers import RotatingFileHandler | |
# Import addons modules | |
######################### | |
# https://pythonhosted.org/pyserial/ | |
import serial | |
# http://click.pocoo.org | |
import click | |
__all__ = ('simul_beagle') | |
""" | |
Fonction exportée : simul_beagle | |
Voir http://www.dabeaz.com/modulepackage/ModulePackage.pdf | |
""" | |
# création de l'objet logger qui va nous servir à écrire dans les logs | |
# =============================================== | |
# CRITICAL 50 Le programme complet agonise. | |
# ERROR 40 Une opération a foirée. | |
# WARNING 30 Pour avertir que quelque chose mérite l’attention | |
# enclenchement d’un mode particulier, detection d’une situation | |
# rare, un lib optionelle peut être installée. | |
# INFO 20 Pour informer de la marche du programme. | |
# Par exemple : “Starting CSV parsing” | |
# DEBUG 10 Pour dumper des information quand vous débuggez. | |
logger = logging.getLogger() | |
# on met le niveau du logger à DEBUG, comme ça il écrit tout | |
logger.setLevel(logging.DEBUG) | |
# création d'un formateur qui va ajouter le temps, le niveau | |
# de chaque message quand on écrira un message dans le log | |
formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s') | |
# création d'un handler qui va rediriger une écriture du log vers | |
# un fichier en mode 'append', avec 1 backup et une taille max de 1Mo | |
file_handler = RotatingFileHandler('test_serial_id3.log', 'a', 1000000, 1) | |
# on lui met le niveau sur DEBUG donc tous les messages | |
# de log vont se retrouver dans le fichier de logs | |
file_handler.setLevel(logging.DEBUG) | |
# si on met "INFO", les messages de DEBUG ne seront pas dans le fichier de log | |
# file_handler.setLevel(logging.INFO) | |
file_handler.setFormatter(formatter) | |
logger.addHandler(file_handler) | |
# création d'un second handler qui va rediriger chaque écriture de log | |
# sur la console | |
stream_handler = logging.StreamHandler() | |
stream_handler.setLevel(logging.INFO) | |
# stream_handler.setFormatter(formatter) | |
logger.addHandler(stream_handler) | |
BeagleRecord = namedtuple('BeagleRecord', 'Index, Time, Dur, Len, Err, Dev, Ep, Level, Record Data') | |
""" | |
Définition d'un enregistrement beagle | |
""" | |
def beagle_records(filename): | |
"""Définition d'un générateur renvoyant un enregistrement 'beagle'. | |
Note:: | |
En employant un générateur, on économise beaucoup de mémoire. | |
Args: | |
filename (str) : le nom complet du fichier. | |
Yields: | |
BeagleRecord: a beagle record. | |
Raises: | |
Exception: the filename does not exist. | |
""" | |
try: | |
for beagle_record in map(BeagleRecord._make, csv.reader(open(filename, "r"))): | |
yield beagle_record | |
except Exception as e: | |
logger.error('In beagle_records() exception:{}'.format(filename)) | |
@click.command() | |
@click.option('--beagle_file', default='C:\projects_id3\P5E627\XLOG9X324_python_Simul\data\CR_beagle_2009_11_09_all_gen.csv', | |
help='Le nom du fichier contenant les enregistrements beagle') | |
@click.option('--nb_max_records', default=1000000, | |
help="Nombre maximal d'enregistrements a lire (1000000 par defaut)") | |
def read_beagle_file(beagle_file, nb_max_records): | |
"""read_beagle_file().""" | |
logger.info("beagle_file:{}".format(beagle_file)) | |
logger.info("abspath:{}".format(abspath(beagle_file))) | |
logger.info("nb_max_records:{}".format(nb_max_records)) | |
nb_records = 0 | |
for beagle_record in beagle_records(abspath(beagle_file)): | |
nb_records=nb_records+1 | |
if ((nb_records % 200) == 0): | |
logger.info("{} beagle records".format(nb_records)) | |
if (nb_records > nb_max_records): | |
break; | |
@click.command() | |
@click.option('--serial_port', default=5, | |
help='Le numero de port COM') | |
@click.option('--beagle_file', default='../CR_beagle_2009_11_09_all_gen.csv', | |
help='Le nom du fichier contenant les enregistrements beagle') | |
@click.option('--nb_max_records', default=100000000000, | |
help="Nombre maximal d'enregistrements a lire (100000000000 par defaut)") | |
def simul_beagle(serial_port, beagle_file, nb_max_records): | |
"""simul_beagle. | |
Exemples d'appel: | |
> python serial_id3.py --serial_port=4 | |
> python serial_id3.py --beagle_file='C:\projects_id3\P5E627\XLOG9X324_python_Simul\data\CR_beagle_20091109_small_gen.csv' | |
""" | |
logger.info("serial port:{}".format(serial_port)) | |
logger.info("beagle_file:{}".format(beagle_file)) | |
logger.info("normpath:{}".format(normpath(beagle_file))) | |
logger.info("nb_max_records:{}".format(nb_max_records)) | |
try: | |
port_com = 'COM'+ str(serial_port-1) | |
ser = serial.Serial(port_com, 115200) | |
logger.info("serial port:{}".format(serial_port)) | |
except serial.SerialException as e : | |
logger.error("Unable to open port:{}, please check port number".format(port_com)) | |
return | |
try : | |
Table = string.maketrans('','') | |
RemoveNonHexCharsTable = string.translate(Table, Table, string.hexdigits) | |
CheckBadCharTable = string.translate(Table, Table, string.punctuation) | |
UniformSepTable = string.maketrans(string.punctuation,' '.ljust(len(string.punctuation))) | |
LastTime = time.clock() | |
PreviousTimeStamp = None | |
nb_records=0; | |
for beagle_record in beagle_records(normpath(beagle_file)): | |
nb_records=nb_records+1 | |
if ((nb_records % 200) == 0): | |
logger.info("{} beagle records".format(nb_records)) | |
if (nb_records > nb_max_records): | |
break; | |
SplittedTime = string.translate(beagle_record.Time, UniformSepTable).split() | |
Minutes, Secondes, Millisecondes = int(SplittedTime[0]), int(SplittedTime[1]), int(SplittedTime[2]) | |
Microsecondes , Nanosecondes = int(SplittedTime[3]), int(SplittedTime[4]) | |
CurrentTimeStamp = (Minutes * 60) + Secondes + (Millisecondes / 1000.0) + (Microsecondes / 1000000.0) + (Nanosecondes / 1000000000.0) | |
# to comment if no delay between frames. | |
if PreviousTimeStamp is not None : | |
if CurrentTimeStamp > PreviousTimeStamp : | |
time.sleep(CurrentTimeStamp-PreviousTimeStamp) | |
else: | |
PreviousTimeStamp = CurrentTimeStamp | |
else: | |
PreviousTimeStamp = CurrentTimeStamp | |
if len(string.translate(beagle_record.Data, Table, CheckBadCharTable)) == 0 : | |
try : | |
RawData = binascii.a2b_hex(string.translate(beagle_record.Data, Table, RemoveNonHexCharsTable)) | |
ser.write(RawData) | |
except TypeError : | |
pass | |
PreviousTimeStamp = CurrentTimeStamp | |
finally : | |
logger.info("{} records processed".format(nb_records)) | |
logger.info("Closing serial port ...") | |
ser.close() | |
if __name__ == '__main__': | |
simul_beagle() | |
# read_beagle_file() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment