Skip to content

Instantly share code, notes, and snippets.

@dotchang
Last active September 28, 2022 08:37
Show Gist options
  • Save dotchang/8438a5df8487aa3ca94917ea697a99fa to your computer and use it in GitHub Desktop.
Save dotchang/8438a5df8487aa3ca94917ea697a99fa to your computer and use it in GitHub Desktop.
import serial
from serial import SerialException
import time
from datetime import datetime
import sys
import zmq
from pythonosc import osc_message_builder
from distutils.util import strtobool
# Module-level ZeroMQ publisher. It is created and bound as a side effect of
# importing/running this file; Omron_2jciebu.zmq_pub() sends every message
# through `sock`.
context = zmq.Context()
sock = context.socket(zmq.PUB)
# Bind on TCP port 5556 on all interfaces ("*").
sock.bind("tcp://*:5556")
# LED display rule. Normal Off.
# Sent as the rule byte of the LED command built in Omron_2jciebu.led_off().
DISPLAY_RULE_NORMALLY_OFF = 0
# LED display rule. Normal On.
# Sent as the rule byte of the LED command built in Omron_2jciebu.led_on().
DISPLAY_RULE_NORMALLY_ON = 1
def s16(value):
    """Reinterpret the low 16 bits of ``value`` as a signed two's-complement int.

    Bits above bit 15 are ignored.  The result lies in -32768..32767.
    """
    magnitude = value & 0x7fff
    sign_weight = value & 0x8000  # 0x8000 when the sign bit is set, else 0
    return magnitude - sign_weight
def calc_crc(buf, length):
    """
    CRC-16 calculation.

    Processes the first ``length`` items of ``buf`` (an indexable sequence of
    byte values) bit by bit, least-significant bit first, starting from
    0xFFFF and XOR-ing with 0xA001 whenever a set bit is shifted out.

    Returns a 2-byte ``bytearray`` holding the checksum low byte first.
    """
    register = 0xFFFF
    for position in range(length):
        register ^= buf[position]
        for _ in range(8):
            lsb_was_set = register & 1
            register >>= 1
            if lsb_was_set:
                register ^= 0xA001
    low_byte = register & 0x00FF
    high_byte = register >> 8
    return bytearray([low_byte, high_byte])
def now_utc_str():
    """
    Get now utc.

    Returns the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now" in UTC
    # formats to exactly the same string.  Imported locally because the file
    # only imports `datetime` from the datetime module.
    from datetime import timezone

    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
class Omron_2jciebu:
    """Serial client for one OMRON 2JCIE-BU sensor.

    The object owns a pyserial port (``self.ser``) and caches the most recent
    device information, readings and flags as *string* attributes, which the
    ``print_*`` methods concatenate and ``zmq_pub`` converts back to numbers.

    Every request frame is built the same way: the two bytes 0x52 0x42, a
    little-endian length, a command byte (0x01 for the reads, 0x02 for the LED
    write), a little-endian 16-bit address, optional data, and the two bytes
    returned by ``calc_crc``.  Responses are parsed by fixed byte offsets.
    """

    def __init__(self):
        self.ser = None
        self.port = ""
        # device information
        self.model_number = ""
        self.serial_number = ""
        self.firmware_revision = ""
        self.hardware_revision = ""
        self.manufacture_name = ""
        # latest_data
        self.time_measured = ""
        self.temperature = ""
        self.relative_humidity = ""
        self.ambient_light = ""
        self.barometric_pressure = ""
        self.sound_noise = ""
        self.eTVOC = ""
        self.eCO2 = ""
        self.discomfort_index = ""
        self.heat_stroke = ""
        self.vibration_information = ""
        self.si_value = ""
        self.pga = ""
        self.seismic_intensity = ""
        self.acceleration_x_axis = ""
        self.acceleration_y_axis = ""
        self.acceleration_z_axis = ""
        self.temperature_flag = ""
        self.relative_humidity_flag = ""
        self.ambient_light_flag = ""
        self.barometric_pressure_flag = ""
        self.sound_noise_flag = ""
        self.etvoc_flag = ""
        self.eco2_flag = ""
        self.discomfort_index_flag = ""
        self.heat_stroke_flag = ""
        self.si_value_flag = ""
        self.pga_flag = ""
        self.seismic_intensity_flag = ""

    def __del__(self):
        # getattr: __del__ can run on an object whose __init__ never finished.
        ser = getattr(self, "ser", None)
        if ser is not None:
            ser.close()

    # ------------------------------------------------------------------
    # Serial port handling
    # ------------------------------------------------------------------
    def setup(self, PORT):
        """Remember ``PORT`` (the serial port name) and try to open it."""
        # Serial
        self.port = PORT
        self.connect()

    def connect(self):
        """Try to open ``self.port`` at 115200 baud, 8 data bits, no parity.

        A failure to open is not an error here: ``self.ser`` is left as it
        was and callers find out through ``isOpen()``.
        """
        try:
            self.ser = serial.Serial(self.port, 115200, serial.EIGHTBITS, serial.PARITY_NONE)
        except SerialException:
            return

    def close(self):
        """Close the port if one was ever opened."""
        if self.ser is None:
            return None
        return self.ser.close()

    def isOpen(self):
        """Return the port's open state, or 0 when no port was ever opened."""
        if self.ser is None:
            return 0
        return self.ser.isOpen()

    def write(self, command):
        """Send ``command``; on a serial error close the port instead of raising."""
        if self.ser is None:
            return
        try:
            self.ser.write(command)
        except SerialException:
            self.close()

    def read(self):
        """Return the bytes currently waiting in the input buffer.

        The result may be empty.  Returns ``None`` when there is no port or
        the serial layer failed (the port is closed in that case).
        """
        if self.ser is None:
            return None
        try:
            return self.ser.read(self.ser.inWaiting())
        except SerialException:
            self.close()
            return None

    # ------------------------------------------------------------------
    # Frame helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _uint_le(data, start, size=2):
        """Decode ``size`` bytes at ``start`` as an unsigned little-endian int."""
        return int.from_bytes(data[start:start + size], "little")

    @staticmethod
    def _sint_le(data, start, size=2):
        """Decode ``size`` bytes at ``start`` as a signed little-endian int."""
        return int.from_bytes(data[start:start + size], "little", signed=True)

    def _transact(self, command):
        """Append the CRC to ``command``, send it and return the reply bytes.

        Returns whatever ``read()`` returns: possibly empty bytes, or ``None``
        when the port is unavailable.
        """
        frame = bytearray(command)
        frame = frame + calc_crc(frame, len(frame))
        self.write(frame)
        time.sleep(0.1)  # give the device time to answer before reading
        response = self.read()
        time.sleep(0.1)
        return response

    def _read_address(self, address, min_length):
        """Issue a read (command 0x01) of ``address``.

        Returns the reply only when it holds at least ``min_length`` bytes, so
        the fixed-offset parsers below can never index past the end of a late
        or truncated reply; otherwise returns ``None``.
        """
        response = self._transact(
            [0x52, 0x42, 0x05, 0x00, 0x01, address & 0xFF, address >> 8])
        if response is None or len(response) < min_length:
            return None
        return response

    # ------------------------------------------------------------------
    # Parsers shared by the short commands and latest_data_long();
    # ``base`` is the offset of the group's first byte in the reply.
    # ------------------------------------------------------------------
    def _store_sensing_data(self, data, base):
        """Decode the seven sensing readings starting at ``base``."""
        u16 = self._uint_le
        self.temperature = str(self._sint_le(data, base) / 100)
        self.relative_humidity = str(u16(data, base + 2) / 100)
        self.ambient_light = str(u16(data, base + 4))
        self.barometric_pressure = str(self._uint_le(data, base + 6, 4) / 1000)
        self.sound_noise = str(u16(data, base + 10) / 100)
        self.eTVOC = str(u16(data, base + 12))
        self.eCO2 = str(u16(data, base + 14))

    def _store_calculation_data(self, data, base):
        """Decode the calculated values (without acceleration) at ``base``."""
        u16 = self._uint_le
        self.discomfort_index = str(u16(data, base) / 100)
        self.heat_stroke = str(self._sint_le(data, base + 2) / 100)
        self.vibration_information = str(data[base + 4])
        self.si_value = str(u16(data, base + 5) / 10)
        self.pga = str(u16(data, base + 7) / 10)
        self.seismic_intensity = str(u16(data, base + 9) / 1000)

    def _store_sensing_flag(self, data, base):
        """Decode the seven 16-bit sensing flags starting at ``base``."""
        u16 = self._uint_le
        self.temperature_flag = str(u16(data, base))
        self.relative_humidity_flag = str(u16(data, base + 2))
        self.ambient_light_flag = str(u16(data, base + 4))
        self.barometric_pressure_flag = str(u16(data, base + 6))
        self.sound_noise_flag = str(u16(data, base + 8))
        self.etvoc_flag = str(u16(data, base + 10))
        self.eco2_flag = str(u16(data, base + 12))

    def _store_calculation_flag(self, data, base):
        """Decode the two 16-bit and three 8-bit calculation flags at ``base``."""
        self.discomfort_index_flag = str(self._uint_le(data, base))
        self.heat_stroke_flag = str(self._uint_le(data, base + 2))
        self.si_value_flag = str(data[base + 4])
        self.pga_flag = str(data[base + 5])
        self.seismic_intensity_flag = str(data[base + 6])

    # ------------------------------------------------------------------
    # Device information
    # ------------------------------------------------------------------
    def device_information(self, retries=5):
        """Read the device information (address 0x180A) into the attributes.

        ``retries`` bounds how many times the request is repeated when the
        reply cannot be decoded as text (the original retried by unbounded
        recursion).  The attributes are only updated when the whole reply
        decodes; nothing is changed when the port is unavailable.
        """
        slices = ((7, 17), (17, 27), (27, 32), (32, 37), (37, 42))
        for _ in range(max(1, retries)):
            # Get Device Information 0x180A
            info = self._transact([0x52, 0x42, 0x05, 0x00, 0x01, 0x0a, 0x18])
            if info is None:
                return
            try:
                fields = [info[start:stop].decode() for start, stop in slices]
            except UnicodeError:
                continue  # garbled reply: ask again
            (self.model_number,
             self.serial_number,
             self.firmware_revision,
             self.hardware_revision,
             self.manufacture_name) = fields
            return
        print("device_information: no decodable reply from " + str(self.port),
              file=sys.stderr)

    def print_device_information(self):
        print("")
        print("Model number:" + self.model_number)
        print("Serial Number:" + self.serial_number)
        print("Firmware Revision:" + self.firmware_revision)
        print("Hardware Revision:" + self.hardware_revision)
        print("Manufacturer Name:" + self.manufacture_name)

    # ------------------------------------------------------------------
    # Latest sensing data (0x5012)
    # ------------------------------------------------------------------
    def latest_sensing_data(self):
        """Refresh the sensing readings; keep the old ones on a bad reply."""
        # Get latest sensing data
        data = self._read_address(0x5012, 24)
        if data is None:
            return
        self.time_measured = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
        self._store_sensing_data(data, 8)

    def print_sensing_data(self):
        print("")
        print("Time measured:" + self.time_measured)
        print("Temperature:" + self.temperature)
        print("Relative humidity:" + self.relative_humidity)
        print("Ambient light:" + self.ambient_light)
        print("Barometric pressure:" + self.barometric_pressure)
        print("Sound noise:" + self.sound_noise)
        print("eTVOC:" + self.eTVOC)
        print("eCO2:" + self.eCO2)

    # ------------------------------------------------------------------
    # Latest calculation data (0x5013)
    # ------------------------------------------------------------------
    def latest_calculation_data(self):
        """Refresh the calculated values and the three acceleration axes."""
        # Get latest calculation data
        data = self._read_address(0x5013, 25)
        if data is None:
            return
        self.time_measured = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
        self._store_calculation_data(data, 8)
        self.acceleration_x_axis = str(self._sint_le(data, 19) / 100)
        self.acceleration_y_axis = str(self._sint_le(data, 21) / 100)
        self.acceleration_z_axis = str(self._sint_le(data, 23) / 100)

    def print_calculation_data(self):
        print("")
        print("Discomfort index:" + self.discomfort_index)
        print("Heat stroke:" + self.heat_stroke)
        print("Vibration information:" + self.vibration_information)
        print("SI value:" + self.si_value)
        print("PGA:" + self.pga)
        print("Seismic intensity:" + self.seismic_intensity)
        print("Acceleration(X-axis):" + self.acceleration_x_axis)
        print("Acceleration(Y-axis):" + self.acceleration_y_axis)
        print("Acceleration(Z-axis):" + self.acceleration_z_axis)

    # ------------------------------------------------------------------
    # Latest sensing flag (0x5014)
    # ------------------------------------------------------------------
    def latest_sensing_flag(self):
        """Refresh the sensing flags; keep the old ones on a bad reply."""
        # Get latest sensing flag
        data = self._read_address(0x5014, 22)
        if data is None:
            return
        self._store_sensing_flag(data, 8)

    def print_sensing_flag(self):
        print("")
        print("Temperature flag:" + self.temperature_flag)
        print("Relative humidity flag:" + self.relative_humidity_flag)
        print("Ambient light flag:" + self.ambient_light_flag)
        print("Barometric pressure flag:" + self.barometric_pressure_flag)
        print("Sound noise flag:" + self.sound_noise_flag)
        print("eTVOC flag:" + self.etvoc_flag)
        print("eCO2 flag:" + self.eco2_flag)

    # ------------------------------------------------------------------
    # Latest calculation flag (0x5015)
    # ------------------------------------------------------------------
    def latest_calculation_flag(self):
        """Refresh the calculation flags; keep the old ones on a bad reply."""
        # Get latest calculation flag
        data = self._read_address(0x5015, 15)
        if data is None:
            return
        self._store_calculation_flag(data, 8)

    def print_calculation_flag(self):
        print("")
        print("Discomfort index flag:" + self.discomfort_index_flag)
        print("Heat stroke flag:" + self.heat_stroke_flag)
        print("SI value flag:" + self.si_value_flag)
        print("PGA flag:" + self.pga_flag)
        print("Seismic intensity flag:" + self.seismic_intensity_flag)

    # ------------------------------------------------------------------
    # Latest data long (0x5021): all four groups in one reply
    # ------------------------------------------------------------------
    def latest_data_long(self):
        """Refresh readings and flags from one reply.

        The acceleration attributes are not part of this reply and are left
        untouched.
        """
        # Get Latest data Long. 0x5021
        data = self._read_address(0x5021, 56)
        if data is None:
            return
        self.time_measured = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
        self._store_sensing_data(data, 8)
        self._store_calculation_data(data, 24)
        self._store_sensing_flag(data, 35)
        self._store_calculation_flag(data, 49)

    def print_latest_data_long(self):
        print("")
        print("Time measured:" + self.time_measured)
        print("Temperature:" + self.temperature)
        print("Relative humidity:" + self.relative_humidity)
        print("Ambient light:" + self.ambient_light)
        print("Barometric pressure:" + self.barometric_pressure)
        print("Sound noise:" + self.sound_noise)
        print("eTVOC:" + self.eTVOC)
        print("eCO2:" + self.eCO2)
        print("Discomfort index:" + self.discomfort_index)
        print("Heat stroke:" + self.heat_stroke)
        print("Vibration information:" + self.vibration_information)
        print("SI value:" + self.si_value)
        print("PGA:" + self.pga)
        print("Seismic intensity:" + self.seismic_intensity)
        print("Temperature flag:" + self.temperature_flag)
        print("Relative humidity flag:" + self.relative_humidity_flag)
        print("Ambient light flag:" + self.ambient_light_flag)
        print("Barometric pressure flag:" + self.barometric_pressure_flag)
        print("Sound noise flag:" + self.sound_noise_flag)
        print("eTVOC flag:" + self.etvoc_flag)
        print("eCO2 flag:" + self.eco2_flag)
        print("Discomfort index flag:" + self.discomfort_index_flag)
        print("Heat stroke flag:" + self.heat_stroke_flag)
        print("SI value flag:" + self.si_value_flag)
        print("PGA flag:" + self.pga_flag)
        print("Seismic intensity flag:" + self.seismic_intensity_flag)

    # ------------------------------------------------------------------
    # LED control (write to 0x5111)
    # ------------------------------------------------------------------
    def _set_led(self, rule, red, green, blue):
        """Write the LED display rule and colour, then discard the reply.

        Reading the reply keeps it from being picked up as the answer to the
        next request.
        """
        self._transact(
            [0x52, 0x42, 0x0a, 0x00, 0x02, 0x11, 0x51, rule, 0x00, red, green, blue])

    def led_on(self):
        # LED On. Color of Green.
        self._set_led(DISPLAY_RULE_NORMALLY_ON, 0, 255, 0)

    def led_off(self):
        # LED Off.
        self._set_led(DISPLAY_RULE_NORMALLY_OFF, 0, 0, 0)

    # ------------------------------------------------------------------
    # Publishing
    # ------------------------------------------------------------------
    def zmq_pub(self):
        """Publish the cached values as one OSC message on the module socket.

        The argument order is fixed: five device strings, the timestamp, the
        thirteen readings, the twelve flags, then the three acceleration axes.
        Nothing is sent while any numeric attribute is still empty (no
        complete reading yet); a notice is written to stderr instead.

        Flags are published as their integer value.  For the values 0 and 1
        this is what ``strtobool`` produced; other values made ``strtobool``
        raise.  NOTE(review): assumes subscribers accept flag values other
        than 0/1 -- confirm against the consumers.
        """
        try:
            arguments = [
                # info
                (self.model_number, "s"),
                (self.serial_number, "s"),
                (self.firmware_revision, "s"),
                (self.hardware_revision, "s"),
                (self.manufacture_name, "s"),
                # latest data
                (self.time_measured, "s"),
                (float(self.temperature), "f"),
                (float(self.relative_humidity), "f"),
                (int(self.ambient_light), "i"),
                (float(self.barometric_pressure), "f"),
                (float(self.sound_noise), "f"),
                (int(self.eTVOC), "i"),
                (int(self.eCO2), "i"),
                (float(self.discomfort_index), "f"),
                (float(self.heat_stroke), "f"),
                (int(self.vibration_information), "i"),
                (float(self.si_value), "f"),
                (float(self.pga), "f"),
                (float(self.seismic_intensity), "f"),
                (int(self.temperature_flag), "i"),
                (int(self.relative_humidity_flag), "i"),
                (int(self.ambient_light_flag), "i"),
                (int(self.barometric_pressure_flag), "i"),
                (int(self.sound_noise_flag), "i"),
                (int(self.etvoc_flag), "i"),
                (int(self.eco2_flag), "i"),
                (int(self.discomfort_index_flag), "i"),
                (int(self.heat_stroke_flag), "i"),
                (int(self.si_value_flag), "i"),
                (int(self.pga_flag), "i"),
                (int(self.seismic_intensity_flag), "i"),
                # acceleration
                (float(self.acceleration_x_axis), "f"),
                (float(self.acceleration_y_axis), "f"),
                (float(self.acceleration_z_axis), "f"),
            ]
        except ValueError:
            print("zmq_pub: sensor data incomplete, nothing published",
                  file=sys.stderr)
            return
        msg = osc_message_builder.OscMessageBuilder(address="/2jcie-bu01")
        for value, type_tag in arguments:
            msg.add_arg(value, type_tag)
        m = msg.build()
        sock.send(m.dgram)
if __name__ == '__main__':
    # Poll up to `num_of_device` sensors on consecutive ports COM3, COM4, ...
    # once per second, print every reading and publish it over ZeroMQ.
    num_of_device = 3
    omron_2jciebu = []
    flag_connect = 0

    for i in range(num_of_device):
        bu = Omron_2jciebu()
        bu.setup("COM" + str(3 + i))
        omron_2jciebu.append(bu)

    for bu in omron_2jciebu:
        # Deliberately not guarded by bu.isOpen(): every device is counted so
        # the polling loop below starts even when no port opened yet.  The
        # calls are harmless on a device without a port.
        bu.led_on()
        bu.device_information()
        bu.print_device_information()
        flag_connect = flag_connect + 1

    try:
        # flag_connect counts the devices that are usable after each pass;
        # the loop ends once none is left.
        while flag_connect:
            flag_connect = 0
            for bu in omron_2jciebu:
                if bu.isOpen():
                    # bu.latest_data_long()
                    bu.latest_sensing_data()
                    bu.latest_calculation_data()
                    bu.latest_sensing_flag()
                    bu.latest_calculation_flag()
                    # bu.print_latest_data_long()
                    bu.print_sensing_data()
                    bu.print_calculation_data()
                    bu.print_sensing_flag()
                    bu.print_calculation_flag()
                    bu.zmq_pub()
                    flag_connect = flag_connect + 1
                else:
                    # Port lost (or never opened): try to get it back.
                    bu.connect()
                    if bu.isOpen():
                        bu.led_on()
                        bu.device_information()
                        bu.print_device_information()
                        # Count the device that just came back; otherwise the
                        # loop would stop right after a successful reconnect
                        # when no other device is open.
                        flag_connect = flag_connect + 1
            time.sleep(1)
    except KeyboardInterrupt:
        # LED Off.
        for bu in omron_2jciebu:
            bu.led_off()
        time.sleep(1)

    # script finish.
    # The original referenced `sys.exit` without calling it, which did nothing.
    sys.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment