Skip to content

Instantly share code, notes, and snippets.

@tai
Created December 8, 2021 14:46
Show Gist options
  • Save tai/6b96f91a11e5a24ab3293d4df6f3e134 to your computer and use it in GitHub Desktop.
Save tai/6b96f91a11e5a24ab3293d4df6f3e134 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Unofficial CLI tool to control FNIRSI Power Source (DC580, DC6006L, ...)
See Also:
- https://github.com/jcheger/fnirsi-dc580-protocol
"""
import sys
import os
import time
import re
import json
import logging
from serial import Serial
from argparse import ArgumentParser
# Module-level logger used by the driver for debug traces of serial traffic;
# its level is set by logging.basicConfig() in the script entry point.
log = logging.getLogger(__name__)
def usage_format():
    """Return the command-line help text.

    The program name shown in the text is the basename of ``sys.argv[0]``.
    Setter commands take values in V / A / W, which is why the example uses
    ``v=5 c=0.1`` for 5 V / 100 mA.
    """
    prog = os.path.basename(sys.argv[0])
    template = """
{p} - Control FNIRSI DC Power Supply (DC6006L)
Usage: {p} cmd [cmd...]
Commands:
on: Turn power on
off: Turn power off
stat: Show status
v=<V>: Set output voltage
c=<A>: Set output current
ovp=<V>: Set over-voltage protection
ocp=<A>: Set over-current protection
opp=<W>: Set over-power protection
ohp_enable=[01]: Enable over-hour protection
ohp=<HH:MM:SS>: Set OHP limit
sleep=<sec>: Sleep for given seconds
cmd=<buf>: Send raw command bytes
Example:
# Output 5V100mA for ~3s
$ {p} v=5 c=0.1 on sleep=3 off
# Show status
$ {p} stat
"""
    # Explicit keyword instead of format(**locals()) so that only the
    # intended name is visible to the template.
    return template.lstrip().format(p=prog)
def usage():
    """Emit the help text on standard error, then exit with status 0."""
    help_text = usage_format()
    print(help_text, end="", file=sys.stderr)
    raise SystemExit(0)
class GenericPS(object):
    """Driver for a FNIRSI DC power supply attached to a serial port.

    Commands are short ASCII strings terminated by CR/LF (see send()).
    Status arrives as decimal fields separated by the letter 'A', which
    parse_status() decodes into dictionaries.
    """

    # Names for the single-digit protection code of a live status frame,
    # indexed by that code.
    _PROTECTION_NAMES = ('none', 'OVP', 'OCP', 'OPP', 'OTP', 'OHP')

    # Field layout shared by the type-0 and type-3 frames.
    _LIVE_FIELDS = (
        r'(?P<voltage>\d{4})A' r'(?P<current>\d{4})A' r'(?P<power>\d{4})A'
        r'(\d)A'
        r'(?P<temperature>\d{3})A'
        r'(?P<cvcc>\d)A' r'(?P<protection>\d)A' r'(?P<enable>\d)A'
    )

    # Candidate frame layouts, tried in this order; the index is the frame
    # type used by parse_status().
    _STATUS_PATTERNS = (
        # type-0: live readings
        re.compile(_LIVE_FIELDS),
        # type-1: protection limits
        re.compile(
            r'(?P<ovp>\d{4})A' r'(?P<ocp>\d{4})A' r'(?P<opp>\d{4,5})A'
            r'(?P<ohp_enable>\d)A'
            r'(?P<ohp_h>\d\d)A' r'(?P<ohp_m>\d\d)A' r'(?P<ohp_s>\d\d)A'
        ),
        # type-2: target set-points
        re.compile(r'(?P<target_voltage>\d{4})A' r'(?P<target_current>\d{4})A'),
        # type-3: type-0 preceded by leading garbage
        re.compile(r'.*?' + _LIVE_FIELDS),
    )

    def __init__(self, port):
        """Open serial device *port* (115200 baud, XON/XOFF) and reset()."""
        self.sio = Serial(port, baudrate=115200, xonxoff=True)
        self.reset()

    def reset(self):
        """Stop logging and drain the input buffer.

        Returns True as soon as a read comes back empty, or False if data
        was still arriving after three attempts spaced 0.5 s apart.
        """
        self.set('log', 0)
        for _ in range(3):
            if self.sio.read(self.sio.in_waiting) == b'':
                return True
            time.sleep(0.5)
        return False

    def send(self, cmd, wait=0.5):
        """Send low-level command string *cmd* after pausing *wait* seconds.

        CR/LF is appended and the result is written ASCII-encoded.
        """
        log.debug("write: %s", cmd)
        time.sleep(wait)
        self.sio.write((cmd + "\r\n").encode('ascii'))

    def set(self, key, value):
        """Control the unit with a high-level parameter (units are V/A/W).

        Keys are compared exactly. Recognised keys: 'enable' (alias
        'power'), 'log'/'logging', 'protect'/'protection' (acts only on a
        falsy value), 'v'/'voltage', 'c'/'current', 'ovp', 'ocp', 'opp',
        'ohp' ("HH:MM:SS" string), 'ohp_enable' and 'mode' ('m1' or anything
        else). Unrecognised keys are ignored.

        Raises ValueError if an 'ohp' value is not three ':'-separated
        integers; nothing is sent in that case.
        """
        if key in ('enable', 'power'):
            self.send("N" if value else "F")
        elif key in ('log', 'logging'):
            self.send("Q" if value else "W")
        elif key in ('protect', 'protection') and not value:
            self.send("Z")
        # Numeric setters round() before formatting: "%d" truncates, so
        # 4.35 * 100 == 434.99999999999994 would otherwise be sent as 0434.
        elif key in ('v', 'voltage'):
            self.send("V%04d" % round(value * 100))
        elif key in ('c', 'current'):
            self.send("I%04d" % round(value * 1000))
        elif key == 'ovp':
            self.send("B%04d" % round(value * 100))
        elif key == 'ocp':
            self.send("D%04d" % round(value * 1000))
        elif key == 'opp':
            # NOTE(review): x10 here, while parse_status() divides the
            # reported opp by 100 -- confirm against the device protocol.
            self.send("E%04d" % round(value * 10))
        elif key == 'ohp':
            hms = [int(part) for part in value.split(":")]
            if len(hms) != 3:
                raise ValueError("ohp must be HH:MM:SS, got %r" % (value,))
            self.send("H%02d" % hms[0])
            self.send("M%02d" % hms[1])
            self.send("S%02d" % hms[2])
        elif key == 'ohp_enable':
            self.send("X" if value else "Y")
        elif key == 'mode':
            self.send("O" if value == 'm1' else "P")

    def on(self):
        """Power the output on."""
        # 'enable' is the key set() maps to the N/F commands.
        self.set('enable', 1)

    def off(self):
        """Power the output off."""
        self.set('enable', 0)

    def stat(self):
        """Return a merged dict of up to three trace captures, or None.

        Captures are collected for at most one second; later captures
        overwrite equal keys of earlier ones.
        """
        stat = {}
        for ret in self.trace(nr=3, timeout=1):
            if ret:
                stat.update(ret)
        return stat if stat else None

    def trace(self, nr=10, timeout=None):
        """Yield status dicts parsed from the serial stream.

        Stops after *nr* captures or once *timeout* seconds have elapsed
        (default ``nr * 1.5``). A negative *nr* removes the count limit, in
        which case a positive *timeout* must be passed explicitly because
        the loop only runs while the timeout is positive.
        """
        if timeout is None:
            timeout = nr * 1.5
        t0 = time.time()
        buf = b''
        while nr != 0 and timeout > 0 and (time.time() - t0) < timeout:
            buf += self.sio.read(self.sio.in_waiting)
            # errors='replace' keeps line noise from raising here.
            log.debug("buf: %s", buf.decode('ascii', errors='replace'))
            stat, rest = self.parse_status(buf)
            if stat:
                yield stat
                buf = rest
                if nr > 0:
                    nr -= 1
            else:
                # Nothing complete yet; wait for more bytes.
                time.sleep(0.5)

    def dump(self):
        """Print incoming serial data every 0.5 s; never returns."""
        while True:
            buf = self.sio.read(self.sio.in_waiting)
            print(buf.decode('ascii', errors='replace'))
            time.sleep(0.5)

    def parse_status(self, buf):
        """Extract the first valid status frame from bytes *buf*.

        Returns ``(stat, rest)``: *stat* is a dict of decoded fields and
        *rest* the bytes following the matched frame. Returns
        ``(None, buf)`` when no layout matches.

        Live frames (type 0/3) report voltage, current and power as floats,
        'cvcc' as 'CV'/'CC' and 'protection' as a name, or 'unknown(N)' for
        a code without a name.
        """
        # Each non-ASCII byte becomes exactly one replacement character, so
        # match offsets in the text are also valid offsets into buf.
        text = buf.decode('ascii', errors='replace')
        for kind, pattern in enumerate(self._STATUS_PATTERNS):
            found = pattern.match(text)
            if found is None:
                continue
            stat = {
                k: int(v)
                for k, v in found.groupdict().items()
                if v is not None
            }
            if kind in (0, 3):
                stat['voltage'] /= 100
                stat['current'] /= 1000
                stat['power'] /= 100
                stat['cvcc'] = 'CV' if stat['cvcc'] == 0 else 'CC'
                code = stat['protection']
                if code < len(self._PROTECTION_NAMES):
                    stat['protection'] = self._PROTECTION_NAMES[code]
                else:
                    stat['protection'] = 'unknown(%d)' % code
            elif kind == 1:
                # NOTE(review): 'ocp' is left as the raw integer although
                # set('ocp') scales by 1000 -- confirm whether it should be
                # divided here as well.
                stat['ovp'] /= 100
                stat['opp'] /= 100
            elif kind == 2:
                stat['target_voltage'] /= 100
                stat['target_current'] /= 1000
            return stat, buf[found.end():]
        return None, buf
class DC6006L(GenericPS):
    """Model name for the DC6006L; behaves exactly like GenericPS."""
class DC580(GenericPS):
    """Model name for the DC580; behaves exactly like GenericPS."""
def main(opt):
    """Run each command in ``opt.args`` against the selected power supply.

    *opt* is the parsed argument namespace; this function reads
    ``opt.model`` (name of a GenericPS class in this module), ``opt.port``
    and ``opt.args``.

    ``key=value`` commands: 'sleep' pauses, 'trace' enables logging and
    prints that many captures as JSON lines, 'cmd' sends the value as a raw
    command, 'ohp' and 'mode' pass the value through as a string, and any
    other key is passed to ``set()`` as a float. Bare commands: 'reset',
    'on', 'off', 'dump' and 'stat'.

    Raises ValueError if ``opt.model`` does not name a driver class.
    """
    # Look the model up by name rather than eval()-ing a command-line
    # string, and accept only driver classes.
    model = globals().get(opt.model)
    if not (isinstance(model, type) and issubclass(model, GenericPS)):
        raise ValueError("unknown model: %s" % opt.model)
    dc = model(opt.port)

    for cmd in opt.args:
        ret = re.match(r'(\w+)=(\S+)', cmd)
        if ret:
            key, value = ret.groups()
            if key == 'sleep':
                time.sleep(float(value))
            elif key == 'trace':
                dc.set('log', 1)
                for stat in dc.trace(int(value)):
                    print(json.dumps(stat))
            # Exact comparison: `key in ('cmd')` is a substring test on the
            # string 'cmd', which sent "c=<A>" as a raw command.
            elif key == 'cmd':
                dc.send(value)
            elif key in ('ohp', 'mode'):
                dc.set(key=key, value=value)
            else:
                dc.set(key=key, value=float(value))
        elif cmd == 'reset':
            dc.reset()
        elif cmd == 'on':
            dc.on()
        elif cmd == 'off':
            dc.off()
        elif cmd == 'dump':
            dc.dump()
        elif cmd == 'stat':
            dc.set('log', 1)
            stat = dc.stat()
            print(json.dumps(stat))
        else:
            log.warning("ignoring unknown command: %s", cmd)
if __name__ == '__main__':
    # Script entry point: parse the command line, configure logging, run.
    ap = ArgumentParser()
    # Replace argparse's generated help with the hand-written usage text.
    ap.print_help = usage
    # const='DEBUG' makes a bare "-D" mean debug logging; without it
    # argparse stores None and building the level name fails.
    ap.add_argument('-D', '--debug', nargs='?', const='DEBUG', default='INFO')
    ap.add_argument('-m', '--model', type=str, default='GenericPS')
    # NOTE(review): the next six options are parsed, but main() in this
    # file reads only model, port and args.
    ap.add_argument('-v', '--voltage', type=int, default=0)
    ap.add_argument('-c', '--current', type=int, default=0)
    ap.add_argument('-ovp', '--ovp', type=int, default=0)
    ap.add_argument('-ocp', '--ocp', type=int, default=0)
    ap.add_argument('-opp', '--opp', type=int, default=0)
    ap.add_argument('-ohp', '--ohp', type=str)
    ap.add_argument('-p', '--port', type=str, default='/dev/fnirsi-ps0')
    ap.add_argument('args', nargs='*')
    opt = ap.parse_args()

    # Resolve the level name with getattr instead of eval() on a
    # command-line string; only integer level constants are accepted.
    level = getattr(logging, str(opt.debug).upper(), None)
    if not isinstance(level, int):
        ap.error("invalid log level: %s" % opt.debug)
    logging.basicConfig(level=level)
    main(opt)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment