Skip to content

Instantly share code, notes, and snippets.

@josephsamela
Last active April 2, 2020 23:46
Show Gist options
  • Save josephsamela/5eb3b4774396ebf1d15838948eb76def to your computer and use it in GitHub Desktop.
Save josephsamela/5eb3b4774396ebf1d15838948eb76def to your computer and use it in GitHub Desktop.
import os
import time
import re
from bs4 import BeautifulSoup
import csv
from prettytable import PrettyTable
import threading
import queue
# Shared run flag: the polling loop in main() and the display loop in the
# __main__ section both keep going while this is True; the display loop sets
# it to False to stop everything.
run = True
# ASCII-art banner printed above the table on every screen refresh.
# NOTE(review): in this copy the literal appears wrapped across several lines
# and its spacing looks collapsed; a plain "..." string must sit on one line
# to be valid Python -- confirm against the original file.
banner = " __ __ .______ .___ ___. ______ .__ __. __ .___________. ______ .______\n| | | | | _ \ | \/ | / __ \ | \ | | | | | | / __ \ | _ \ \n| |__|
| | |_) | | \ / | | | | | | \| | | | `---| |----`| | | | | |_) |\n| __ | | _ < | |\/| | | | | | | . ` | | | | | | | | | | /\n| | | | | |_) | | |
| | | `--' | | |\ | | | | | | `--' | | |\ \----.\n|__| |__| |______/ |__| |__| \______/ |__| \__| |__| |__| \______/ | _| `._____|\n"
def main(q):
    """Poll the hbmonitor websocket and enqueue one rendered table per update.

    Loops until the module-level ``run`` flag becomes false. Each cycle runs
    websocat to read a message, converts it with ``parse`` and puts the
    resulting string on ``q``. A cycle that fails (websocat cannot be
    started, or the message does not have the layout ``parse`` expects) is
    skipped and retried on the next cycle, so a single bad read no longer
    kills the polling thread and freezes the display.

    Args:
        q: Queue that receives one ASCII table string per successful poll.
    """
    # Imported here so the function stays self-contained and needs no new
    # file-level import.
    import subprocess

    # Same websocat invocation as before, but as an argument list (no shell)
    # and with stdout captured directly instead of redirected to temp.txt.
    command = ['./websocat_arm-linux', '-1', '-U', 'ws://dmrpi:9000']

    while run:
        try:
            result = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
            # Parse the raw message into an ASCII table and enqueue the update.
            q.put(parse(result.stdout))
        except (OSError, IndexError, ValueError):
            # OSError: websocat could not be launched.
            # IndexError / ValueError: empty or unexpected message layout.
            # Either way there is nothing to show for this cycle; try again.
            pass
        # Wait two seconds between polls.
        time.sleep(2)
def parse(raw):
    """Render the raw hbmonitor HTML message as a colorized ASCII table.

    The second <table> in the markup is read. Every row that directly
    follows another row is turned into two output rows: the first carries the
    connection details plus timeslot 1, the second carries timeslot 2.

    Args:
        raw: HTML text of a single message.

    Returns:
        The PrettyTable rendering as a string, with ANSI color codes wrapped
        around the slot, source and destination values.
    """
    # ANSI escape sequences used to color the timeslot columns
    red = "\033[0;31;40m"
    yellow = "\033[0;33;40m"
    blue = "\033[0;34;40m"
    reset = "\033[0m"

    def paint(color, text):
        # Wrap text in a color code followed by a reset
        return color + text + reset

    # Locate the table of interest in the html
    markup = BeautifulSoup(raw, features="html.parser")
    status = markup.find_all("table")[1]

    # The "pretty" table that is rendered for the ascii terminal
    rendered = PrettyTable()
    rendered.field_names = ["Channel", "Mode", "Callsign (DMR ID)", "Time Connected", "TX/RX/Lost", "Slot", "Source Subscriber", "Destination"]

    for tr in status.select("tr + tr"):
        cells = tr.find_all("td")
        # First cell: channel and mode, separated by the "Mode: " label
        channel, mode = cells[0].text.split("Mode: ")
        # Second cell: callsign, then the remainder after three non-breaking spaces
        callsign, _rest = cells[1].text.split("\xa0\xa0\xa0")
        # Third cell: time connected (first six characters), then loss rate
        timing = cells[2].text
        time_connected = timing[0:6]
        loss_rate = timing[6:]
        # Timeslot 1: slot, source, destination
        ts1 = [
            paint(red, cells[3].text),
            paint(yellow, cells[4].text),
            paint(yellow, cells[5].text),
        ]
        # Timeslot 2: slot, source, destination
        ts2 = [
            paint(blue, cells[6].text),
            paint(yellow, cells[7].text),
            paint(yellow, cells[8].text),
        ]
        rendered.add_row([channel, mode, callsign, time_connected, loss_rate] + ts1)
        rendered.add_row(['', '', '', '', ''] + ts2)

    return str(rendered)
if __name__ == '__main__':
    # Start the polling thread; it pushes rendered tables onto the queue.
    q = queue.Queue()
    threading.Thread(target=main, args=(q,)).start()
    try:
        while run:
            try:
                # Block until an update arrives instead of spinning on
                # q.empty(); the short timeout keeps Ctrl-C responsive.
                table = q.get(timeout=0.5)
            except queue.Empty:
                continue
            # Redraw the whole screen with the latest table.
            os.system('clear')
            print(banner + '\n' + table)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to quit.
        pass
    finally:
        # Tell the polling thread to stop after its current cycle, whatever
        # the reason for leaving the display loop.
        run = False
@josephsamela
Copy link
Author

Here's a screenshot of the software running. The idea is to make checking hbmonitor easy in non-graphical environments, for example a Raspberry Pi running without a desktop environment, or an SSH session.

hbmon

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment