Last active
January 1, 2022 17:06
-
-
Save kk7ds/44560412f8887eaf39fd5f4ff125e3ab to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# Copyright 2022 Dan Smith <[email protected]> | |
# | |
# This monitors an aprx rflog and compares packets heard on multiple | |
# interfaces to make sure they arrive reasonably close. It is intended | |
# to monitor a troublesome kpc3+ to make sure the packets received via | |
# it and APRSIS are close together. When using this, enable a wide | |
# range filter on your aprsis connection so that you receive things | |
# back from APRSIS you are likely to hear on RF. | |
# | |
# Run this like: | |
# | |
# tail /tmp/rflog | python3 kpc3mon.py | |
# | |
# For more information, see | |
# http://www.danplanet.com/blog/2021/12/31/fixing-kpc3-kiss-issues/ | |
import collections | |
import datetime | |
import sys | |
import time | |
import colorama as C | |
def cprint(color, string): | |
print(color + string + C.Style.RESET_ALL) | |
class Packet: | |
def __init__(self, line): | |
date, time, self.iface, self.action, packet = line.split(None, 4) | |
self.stamp = datetime.datetime.strptime('%s %s' % (date, time[:8]), | |
'%Y-%m-%d %H:%M:%S') | |
self.stamp = self.stamp.replace(microsecond=int(time[9:]) * 1000) | |
self.src, rest = packet.split('>', 1) | |
path, self.payload = rest.split(':', 1) | |
try: | |
self.dst, self.path = path.split(',', 1) | |
except ValueError: | |
self.dst = path | |
self.path = '' | |
self.fpath = ','.join(x for x in self.path.split(',') | |
if not x.startswith('WIDE')) | |
def __hash__(self): | |
return hash('%s/%s/%s' % (self.src, self.dst, self.payload)) | |
def __eq__(self, other): | |
return hash(self) == hash(other) | |
def __str__(self): | |
return '%s>%s via %s: %r' % (self.src, self.dst, self.iface, | |
self.payload) | |
def __repr__(self): | |
return str(self) | |
def clean(interfaces): | |
now = datetime.datetime.utcnow() | |
limit = datetime.timedelta(minutes=5) | |
stats = collections.defaultdict(int) | |
for interface, packets in interfaces.items(): | |
for packet in list(packets): | |
if now - packet.stamp > limit: | |
packets.remove(packet) | |
stats[interface] += 1 | |
print('Expired unaccounted packets: %s' % ( | |
' '.join('%s=%s' % (k, v) | |
for k, v in stats.items()))) | |
def report(new_packet, old_packet): | |
delta = new_packet.stamp - old_packet.stamp | |
timer = 5 | |
old_if = old_packet.iface | |
new_if = new_packet.iface | |
if old_if != 'APRSIS': | |
#old_if = '%s%s' % (old_if, '*' * old_packet.fpath.count('*')) | |
timer = 10 | |
if new_if != 'APRSIS': | |
#new_if = '%s%s' % (new_if, '*' * new_packet.fpath.count('*')) | |
timer = 10 | |
# Direct packets are held to a higher standard than | |
# ones that came through a digi | |
if delta > datetime.timedelta(seconds=timer): | |
color = C.Back.RED | |
else: | |
color = C.Fore.GREEN | |
if delta.total_seconds() < 10: | |
t = '%.1f sec' % delta.total_seconds() | |
else: | |
t = '%i sec' % delta.total_seconds() | |
cprint(color, '%s %s via %s is %s after %s (%s:%s)' % ( | |
new_packet.stamp.strftime('%H:%M:%S'), | |
new_packet.src, new_if, t, | |
old_if, new_packet.fpath, new_packet.payload)) | |
def match(interfaces, packet): | |
now = datetime.datetime.now() | |
matched = False | |
for interface, packets in interfaces.items(): | |
if interface == packet.iface: | |
continue | |
if packet in packets: | |
# This is silly, but to get the old packet out of the set, | |
# we have to iterate and find it. Set math may return the | |
# "same" packet we're comparing against. | |
old_packet = [x for x in packets if x == packet][0] | |
report(packet, old_packet) | |
packets.remove(packet) | |
matched = True | |
return matched | |
def process_stream(): | |
interfaces = collections.defaultdict(set) | |
last_clean = time.time() | |
while True: | |
line = sys.stdin.readline().strip() | |
packet = Packet(line) | |
if not match(interfaces, packet): | |
interfaces[packet.iface].add(packet) | |
if time.time() - last_clean > 300: | |
clean(interfaces) | |
last_clean = time.time() | |
process_stream() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment