Skip to content

Instantly share code, notes, and snippets.

@avrahamshukron
Last active April 17, 2020 20:37
Show Gist options
  • Save avrahamshukron/a9269a5da936bd2f36f1038ca1dcfc15 to your computer and use it in GitHub Desktop.
Save avrahamshukron/a9269a5da936bd2f36f1038ca1dcfc15 to your computer and use it in GitHub Desktop.
Telemetry sender
{
"ventilation_status": {
"timestamp": 2963.83161,
"inspiration_volume": 2139.4583284802043,
"expiration_volume": 0.0,
"avg_inspiration_volume": 2113.9426131820755,
"avg_expiration_volume": 0.0,
"peak_flow": 0.0,
"peak_pressure": 0.0,
"min_pressure": 13.194630759841317,
"bpm": 4,
"o2_saturation_percentage": 21.0,
"current_state": 3,
"alerts": [
{
"code": 1,
"message": "Low Pressure",
"timestamp": 2745.782418
},
{
"code": 256,
"message": "No Breathing",
"timestamp": 2759.035528
},
{
"code": 8,
"message": "High Volume",
"timestamp": 2772.812902
},
{
"code": 4,
"message": "Low Volume",
"timestamp": 2886.8161280000004
}
]
},
"system_status": {
"battery_percentage": 94.0,
"uptime": 280721.36,
"cpu_usage": 59.1,
"ram_usage": 63.1,
"load_avg": [
3.73,
2.4,
1.94
],
"time": "2020-04-17T19:22:49.007061",
"temperatures": {
"k10temp": [
70.25
],
"amdgpu": [
70.0
]
},
"alerts": []
},
"version": "1.4.0-5-gf9f53-*"
}
from dataclasses import field
from datetime import datetime
import psutil
from pydantic import BaseModel
from pydantic.dataclasses import dataclass
from uptime import uptime
class VentilationState(Enum):
Unknown = 0 # State unknown yet
Inhale = 1 # Air is flowing to the lungs
Hold = 2 # PIP is maintained
Exhale = 3 # Pressure is relieved, air flowing out
PEEP = 4 # Positive low pressure is maintained until next cycle.
def get_temperatures():
return {name: [t.current for t in v]
for name, v in psutil.sensors_temperatures().items()}
@dataclass
class Alert:
code: int
message: str
timestamp: float
@dataclass
class SystemStatus:
battery_percentage: float = 0
up_time: float = field(default_factory=uptime)
cpu_usage: float = field(default_factory=psutil.cpu_percent)
ram_usage: float = field(default_factory=lambda: psutil.virtual_memory().percent)
load_avg: tuple = field(default_factory=psutil.getloadavg)
time: datetime = field(default_factory=datetime.now)
temperatures: dict = field(default_factory=get_temperatures)
alerts: list = None
@dataclass
class VentilationStatus:
timestamp: float = None
inspiration_volume: float = None
expiration_volume: float = None
avg_inspiration_volume: float = None
avg_expiration_volume: float = None
peak_flow: float = None
peak_pressure: float = None
min_pressure: float = None
bpm: int = None
o2_saturation_percentage: float = None
current_state: VentilationState = VentilationState.Unknown
alerts: list = None
class StatusReport(BaseModel):
ventilation_status: VentilationStatus = None
system_status: SystemStatus = None
version: str
import time
from argparse import ArgumentParser
from random import random, randint
import requests
from pydantic import BaseModel
HEADERS = {
'Content-Type': 'application/json',
}
class Telemetry(BaseModel):
DeviceType: str = "VMS-19"
DeviceId: str = "123456"
DeviceIp: str = "1.1.1.1"
ComPort: str = "N/A"
P_PEAK: float
P_PEAK_HighAlarm: str = "NORMAL"
P_PEAK_LowAlarm: str = "NORMAL"
V_TE: float
V_TE_High_Alarm: str = "NORMAL"
V_TE_MAND: float = 0
V_TE_MAND_LowAlarm: str = "NORMAL"
V_TI: float
V_TI_HighAlarm1: str = "NORMAL"
V_TI_HighAlarm2: str = "NORMAL"
f_TOT: float = 0
f_TOT_HighAlarm: str = "NORMAL"
O2percent: float
O2percent_HighAlarm: str = "NORMAL"
O2percent_LowAlarm: str = "NORMAL"
I_ratio_E: str = ""
PEEP: float
ETCO2: str = ""
P_MEAN: float
P_I_END: float
V_E_SPONT: float = 0
T_I_to_T_TOT: float = 0
f_to_V_T: float = 0
MandatoryType: str = ""
Timestamp: int
Sender: float = 0
def parse_args():
parser = ArgumentParser()
parser.add_argument("url", help="The API endpoint URL")
parser.add_argument(
"--api-key", "-k", help="The server's API key, for 'x-api-key' header",
default=None)
parser.add_argument(
"--timeout", "-t", default=None, type=float,
help="Timeout for the request, in seconds")
return parser.parse_args()
def build_telemetry(args):
return Telemetry(
Timestamp=int(time.time()),
P_PEAK=(random() * 5 + 20),
O2percent=(random() * 60 + 20),
PEEP=(random() * 4 + 1),
P_I_END=20,
P_MEAN=(random() * 3 + 17),
V_TE=(randint(350, 550)),
V_TI=(randint(350, 550)),
)
def main():
args = parse_args()
if args.api_key is not None:
HEADERS["x-api-key"] = args.api_key
telem = build_telemetry(args)
as_json = telem.json()
print("Sending telemetry:")
print(f"Headers: {HEADERS}")
print("Data:")
print(as_json)
requests.put(args.url, headers=HEADERS, data=as_json, timeout=args.timeout)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment