Skip to content

Instantly share code, notes, and snippets.

@bcopy
Created January 17, 2021 18:15
Show Gist options
  • Save bcopy/bc960a3409226fc8542a3b0fb733ed7d to your computer and use it in GitHub Desktop.
Save bcopy/bc960a3409226fc8542a3b0fb733ed7d to your computer and use it in GitHub Desktop.

ulineprotocol (micro line protocol)

A tiny line protocol parser for micropython

You can test it with a few simple calls:

# Usage example: ulp_parse returns a tuple
# (measurement, tags, values, timestamp).
from ulineprotocol import *

# Full message: measurement with tags, values and a timestamp.
parsed = ulp_parse("temperature,location=Paris c=28,f=82 28")
print(parsed[0])  # measurement name: temperature
print(parsed[1]["location"])  # tag value: Paris
print(parsed[2]["c"])  # values are kept as strings: 28
print(parsed[2]["f"])  # 82
print(parsed[3])  # timestamp, converted to int: 28

print("=======")
# Tags but no values: parsed[2] is an empty dict.
parsed = ulp_parse("temperature,location=Paris 28")
print(parsed[0])
print(parsed[1]["location"])
print(parsed[3])

print("=======")
# Measurement and timestamp only: both dicts are empty.
parsed = ulp_parse("temperature 28")
print(parsed[0])
print(parsed[3])

print("=======")
# Values but no tags: parsed[1] is an empty dict.
parsed = ulp_parse("temperature c=28,f=82 28")
print(parsed[0])
print(parsed[1])  # {}
print(parsed[2]["c"])
print(parsed[2]["f"])
print(parsed[3])

You can also serialize messages with the ulp_serialize function.

########
# A tiny line protocol parser for micropython
def _pop_head_or_none(arr, peek_only = False):
if arr and len(arr)>0:
if peek_only:
return arr[0]
else:
return arr.pop(0)
else:
return None
def ulp_parse(msg):
    """Parse one line-protocol message.

    Expected layout (the values and timestamp sections are optional):

        measurement[,tag=value...] [key=value[,key=value...]] [timestamp]

    Sections are separated by whitespace; repeated, leading and trailing
    whitespace is ignored.  The second section is treated as the values
    section only when it contains an "=" sign, otherwise it is taken to be
    the timestamp.  Anything after the timestamp is ignored.

    Returns a tuple (measurement, tags, values, timestamp):
      measurement -- str ("" for an empty message)
      tags        -- dict of str -> str, surrounding quotes stripped
      values      -- dict of str -> str (values are not converted)
      timestamp   -- int, or None when the message carries none

    Raises IndexError when a tag or value entry has no "=" sign and
    ValueError when the timestamp is not an integer.
    """
    tags = {}
    vals = {}
    tmstp = None

    # split() without a separator collapses runs of whitespace, so a double
    # space or a trailing newline cannot produce empty sections.
    sections = msg.split()

    head = sections[0] if sections else ""
    head_parts = head.split(",")
    meas = head_parts[0]
    for tag_pair in head_parts[1:]:
        # Split on the first "=" only so values may themselves contain "=".
        kv = tag_pair.split("=", 1)
        tags[kv[0]] = kv[1].strip('"\'')

    nxt = 1
    if nxt < len(sections) and "=" in sections[nxt]:
        for val_pair in sections[nxt].split(","):
            kv = val_pair.split("=", 1)
            vals[kv[0]] = kv[1]
        nxt += 1

    if nxt < len(sections):
        tmstp = int(sections[nxt])

    return (meas, tags, vals, tmstp)
def ulp_serialize(measurement, tags=None, values=None, timestamp=None):
    """Serialize a measurement into a line-protocol message.

    Produces "measurement[,tag="value"...] [key=value,...] timestamp".

    measurement -- str, the measurement name.
    tags        -- optional dict; each entry is written as key="value" and
                   appended to the measurement after a comma.
    values      -- optional dict; each entry is written as key=value.
    timestamp   -- optional int or str.  When None, running_time() is used.
                   NOTE(review): running_time is not defined in this file;
                   presumably the micro:bit built-in -- it must be in scope
                   for the default to work.

    Returns the serialized message as a str.
    """
    result = measurement
    if tags:
        # Tags belong to the first section, separated by commas.
        result += "," + ",".join(
            '{}="{}"'.format(key, value) for key, value in tags.items()
        )
    if values:
        result += " " + ",".join(
            '{}={}'.format(key, value) for key, value in values.items()
        )
    if timestamp is None:
        timestamp = running_time()
    # str() lets callers pass the int timestamps that ulp_parse returns.
    return result + " " + str(timestamp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment