|
# File: main.py |
|
# Purpose: Provide a FastAPI-based webapp that reads dump1090 BEAST messages,
# computes message rate statistics (msg/sec over 5s, 15s, 30s, 60s and 300s windows),
# computes signal strength statistics (min, max, average and percentiles over 30s),
# computes distance and coverage statistics for decoded positions,
# and serves two live line charts via WebSocket.
|
# |
|
# In this version, the WebSocket endpoint is guaranteed to be called |
|
# once the frontend attempts to connect. To confirm, we log when a new |
|
# client connects or disconnects, and we send a "ping" from the client. |
|
|
|
import asyncio
import json
import math
import socket
import statistics
import threading
import time
from collections import deque, defaultdict
from contextlib import asynccontextmanager
from math import radians, sin, cos, asin, sqrt, degrees, atan2
from typing import Deque, Tuple, AsyncGenerator

import pyModeS as pms
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from pyModeS.extra.tcpclient import TcpClient
|
|
|
# Rolling windows of message timestamps, one deque per window length.
# Entries are appended by ADSBClient.handle_messages() and dropped by
# update_message_sliding_windows() once they fall out of their window.
MESSAGE_TIMESTAMPS_5S: Deque[float] = deque()
MESSAGE_TIMESTAMPS_15S: Deque[float] = deque()
MESSAGE_TIMESTAMPS_30S: Deque[float] = deque()
MESSAGE_TIMESTAMPS_60S: Deque[float] = deque()
MESSAGE_TIMESTAMPS_300S: Deque[float] = deque()

# (timestamp, dBFS signal level) samples kept for 30 seconds.
SIGNAL_LEVELS_30S: Deque[Tuple[float, float]] = deque()

# (timestamp, distance in km from the reference point) samples kept for 30 seconds.
DISTANCES_30S: Deque[Tuple[float, float]] = deque()

# Reference position (degrees) used for position decoding and for all
# distance/bearing computations.
REF_LAT = 51.260765417701066
REF_LON = 6.338479359520795

# Global set of connected WebSockets for broadcast.
CONNECTED_WEBSOCKETS = set()

# Synchronization lock to protect shared data.
DATA_LOCK = threading.Lock()

# Coverage grid: BEARING_SEGMENTS sectors per ring, rings RING_DISTANCE km
# wide, covering distances below MAX_DISTANCE km.
BEARING_SEGMENTS = 16
RING_DISTANCE = 80
MAX_DISTANCE = 480
BEARING_LABELS = ["N", "NNE", "NE", "ENE", "E", "ESE", "SE", "SSE", "S", "SSW", "SW", "WSW", "W", "WNW", "NW", "NNW"]

# Maps (ring, segment) -> (timestamp of the latest reading, farthest distance in km).
# A plain dict is used on purpose: with defaultdict(int) a stray read of a
# missing cell would silently insert the integer 0, which the code that
# unpacks every value as a (timestamp, distance) pair cannot handle.
COVERAGE_60S: dict = {}
|
|
|
|
|
def get_current_time() -> float:
    """
    Return the current time, in seconds, as reported by ``time.time()``.
    """
    now = time.time()
    return now
|
|
|
|
|
def update_message_sliding_windows(now: float) -> None:
    """
    Drop timestamps that have aged out of each message-rate window.

    For every window (5s, 15s, 30s, 60s, 300s) the oldest entries are popped
    from the left for as long as they are older than ``now`` minus the
    window length.
    """
    windows = (
        (MESSAGE_TIMESTAMPS_5S, 5),
        (MESSAGE_TIMESTAMPS_15S, 15),
        (MESSAGE_TIMESTAMPS_30S, 30),
        (MESSAGE_TIMESTAMPS_60S, 60),
        (MESSAGE_TIMESTAMPS_300S, 300),
    )
    for stamps, span in windows:
        while stamps and stamps[0] < now - span:
            stamps.popleft()
|
|
|
|
|
def update_signal_sliding_window(now: float) -> None:
    """
    Drop signal-level samples whose timestamp is more than 30 seconds before ``now``.
    """
    while SIGNAL_LEVELS_30S:
        oldest_ts = SIGNAL_LEVELS_30S[0][0]
        if not oldest_ts < now - 30:
            # Samples are appended in arrival order, so the rest are newer.
            break
        SIGNAL_LEVELS_30S.popleft()
|
|
|
|
|
def update_distance_sliding_window(now: float) -> None:
    """Drop distance samples whose timestamp is more than 30 seconds before ``now``."""
    while DISTANCES_30S:
        oldest_ts = DISTANCES_30S[0][0]
        if not oldest_ts < now - 30:
            # Samples are appended in arrival order, so the rest are newer.
            break
        DISTANCES_30S.popleft()
|
|
|
|
|
def update_coverage_sliding_window(now: float) -> None:
    """Delete coverage cells whose stored timestamp is more than 60 seconds before ``now``."""
    # Collect first, delete afterwards: a dict must not change size while
    # it is being iterated.
    expired = []
    for cell, (seen_at, _) in COVERAGE_60S.items():
        if seen_at < now - 60:
            expired.append(cell)
    for cell in expired:
        del COVERAGE_60S[cell]
|
|
|
|
|
def compute_message_rates() -> Tuple[float, float, float, float, float]:
    """
    Return the average message rate (messages per second) for each window.

    The result is ordered 5s, 15s, 30s, 60s, 300s. Each rate is the number
    of timestamps currently held for the window divided by the window
    length; an empty window yields 0.0.
    """
    windows = (
        (MESSAGE_TIMESTAMPS_5S, 5.0),
        (MESSAGE_TIMESTAMPS_15S, 15.0),
        (MESSAGE_TIMESTAMPS_30S, 30.0),
        (MESSAGE_TIMESTAMPS_60S, 60.0),
        (MESSAGE_TIMESTAMPS_300S, 300.0),
    )
    return tuple(
        len(stamps) / span if stamps else 0.0
        for stamps, span in windows
    )
|
|
|
|
|
def compute_signal_stats() -> Tuple[float, float, float]:
    """
    Return ``(min, max, mean)`` of the signal levels currently in the 30s window.

    All three values are 0.0 when the window is empty.
    """
    levels = [sample[1] for sample in SIGNAL_LEVELS_30S]
    if not levels:
        return (0.0, 0.0, 0.0)
    lowest = min(levels)
    highest = max(levels)
    mean = sum(levels) / len(levels)
    return (lowest, highest, mean)
|
|
|
|
|
def compute_signal_percentiles() -> dict:
    """
    Computes selected percentiles of signal level over the last 30s.

    Returns a dict keyed by "25", "50", "75", "90", "95" and "99". All
    values are 0 when the window is empty. With a single sample every
    percentile equals that sample.
    """
    wanted = (25, 50, 75, 90, 95, 99)
    values = sorted(sample[1] for sample in SIGNAL_LEVELS_30S)
    if not values:
        return {str(p): 0 for p in wanted}
    if len(values) == 1:
        # statistics.quantiles() raises StatisticsError for fewer than two
        # data points on Python < 3.13, which would abort the caller.
        return {str(p): values[0] for p in wanted}
    # Compute the 99 cut points once instead of once per requested percentile.
    cuts = statistics.quantiles(values, n=100, method='inclusive')
    return {str(p): cuts[p - 1] for p in wanted}
|
|
|
|
|
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Returns distance in km between two lat/lon points using the Haversine formula.

    All four arguments are in degrees.
    """
    r = 6371  # sphere radius in km used by the formula
    d_lat = radians(lat2 - lat1)
    d_lon = radians(lon2 - lon1)
    a = sin(d_lat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(d_lon / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (nearly) antipodal points; clamp so sqrt/asin never see an
    # out-of-domain argument.
    a = min(1.0, max(0.0, a))
    return 2 * r * asin(sqrt(a))
|
|
|
|
|
def compute_bearing(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Return the bearing, in degrees within [0, 360), from (lat1, lon1) to (lat2, lon2).

    All four arguments are in degrees.
    """
    phi1 = radians(lat1)
    phi2 = radians(lat2)
    delta_lon = radians(lon2 - lon1)
    east = sin(delta_lon) * cos(phi2)
    north = cos(phi1) * sin(phi2) - sin(phi1) * cos(phi2) * cos(delta_lon)
    heading = degrees(atan2(east, north))
    # atan2 yields (-180, 180]; shift into the [0, 360) range.
    return (heading + 360) % 360
|
|
|
|
|
def compute_distance_stats() -> dict:
    """
    Computes min, max, 25%, 50%, 75% distance over the last 30s.

    Returns a dict keyed by "min", "25", "50", "75" and "max". All values
    are 0 when the window is empty. With a single sample every entry equals
    that sample.
    """
    values = sorted(sample[1] for sample in DISTANCES_30S)
    if not values:
        return {"min": 0, "25": 0, "50": 0, "75": 0, "max": 0}
    if len(values) == 1:
        # statistics.quantiles() raises StatisticsError for fewer than two
        # data points on Python < 3.13, which would abort the caller.
        only = values[0]
        return {"min": only, "25": only, "50": only, "75": only, "max": only}
    # Compute the 99 cut points once instead of once per requested percentile.
    cuts = statistics.quantiles(values, n=100, method='inclusive')
    return {
        "min": values[0],
        "25": cuts[24],
        "50": cuts[49],
        "75": cuts[74],
        "max": values[-1],
    }
|
|
|
|
|
def compute_distance_buckets() -> list:
    """
    Return a 10-bucket histogram of the distances in the 30s window.

    Buckets are 30 km wide; anything that would fall past the last bucket
    is counted in the last bucket.
    """
    histogram = [0] * 10
    last_slot = len(histogram) - 1
    for sample in DISTANCES_30S:
        slot = min(int(sample[1] // 30), last_slot)
        histogram[slot] += 1
    return histogram
|
|
|
|
|
def compute_coverage_stats() -> list:
    """
    Builds the coverage grid from the cells currently held in COVERAGE_60S.

    Returns a list with one entry per distance ring
    (MAX_DISTANCE // RING_DISTANCE rings), each a list of BEARING_SEGMENTS
    ``(max_distance, count)`` tuples. Cells with no data are ``(0, 0)``.
    COVERAGE_60S holds a single entry per (ring, segment), so ``count`` is
    1 for a populated cell.
    """
    ring_count = MAX_DISTANCE // RING_DISTANCE
    coverage = [[(0, 0)] * BEARING_SEGMENTS for _ in range(ring_count)]
    for (ring, segment), (_, dist) in COVERAGE_60S.items():
        # A reading at exactly MAX_DISTANCE maps to ring == ring_count, which
        # would raise IndexError; negative indices would silently wrap
        # around. Ignore anything that does not fall inside the grid.
        if not (0 <= ring < ring_count and 0 <= segment < BEARING_SEGMENTS):
            continue
        max_dist, count = coverage[ring][segment]
        coverage[ring][segment] = (max(max_dist, dist), count + 1)
    return coverage
|
|
|
|
|
class ADSBClient(TcpClient):
    """
    Custom ADS-B client that extends TcpClient to handle Mode-S messages.

    It parses the BEAST byte stream into (message, signal level, timestamp)
    triples and feeds the module-level rolling statistics.
    """

    def __init__(self, host, port, rawtype='beast'):
        super().__init__(host, port, rawtype)

    @staticmethod
    def _decode_position(msg):
        """Return ``(lat, lon)`` for a CRC-valid DF17/18 position message, else None."""
        if pms.df(msg) not in (17, 18) or pms.crc(msg) != 0:
            return None
        tc = pms.typecode(msg)
        if tc is None:
            return None
        if 5 <= tc <= 8:
            lat, lon = pms.adsb.surface_position_with_ref(msg, REF_LAT, REF_LON)
        elif 9 <= tc <= 18 or 20 <= tc <= 22:
            lat, lon = pms.adsb.airborne_position_with_ref(msg, REF_LAT, REF_LON)
        else:
            return None
        if lat is None or lon is None:
            return None
        return lat, lon

    def handle_messages(self, messages):
        """
        Record a batch of parsed messages in the shared rolling statistics.

        ``messages`` is an iterable of ``(hex_message, dbfs_rssi, timestamp)``
        triples as produced by read_beast_buffer_rssi_piaware().
        """
        for msg, dbfs_rssi, ts in messages:
            if len(msg) < 2:
                continue

            # Decode before taking the lock so the lock is held only while
            # the shared containers are being modified.
            position = self._decode_position(msg)

            with DATA_LOCK:
                MESSAGE_TIMESTAMPS_5S.append(ts)
                MESSAGE_TIMESTAMPS_15S.append(ts)
                MESSAGE_TIMESTAMPS_30S.append(ts)
                MESSAGE_TIMESTAMPS_60S.append(ts)
                MESSAGE_TIMESTAMPS_300S.append(ts)

                # -100 is the placeholder the parser assigns to a raw level of 0.
                if dbfs_rssi is not None and dbfs_rssi > -100:
                    SIGNAL_LEVELS_30S.append((ts, dbfs_rssi))

                if position is not None:
                    lat, lon = position
                    dist_km = haversine_distance(REF_LAT, REF_LON, lat, lon)
                    DISTANCES_30S.append((ts, dist_km))
                    # Strictly below MAX_DISTANCE: a reading at exactly
                    # MAX_DISTANCE would map to a ring index one past the
                    # last ring of the coverage grid.
                    if dist_km < MAX_DISTANCE:
                        ring = int(dist_km // RING_DISTANCE)
                        bearing = compute_bearing(REF_LAT, REF_LON, lat, lon)
                        segment = int(bearing // (360 / BEARING_SEGMENTS))
                        key = (ring, segment)
                        previous = COVERAGE_60S.get(key)
                        farthest = dist_km if previous is None else max(previous[1], dist_km)
                        COVERAGE_60S[key] = (ts, farthest)

                update_message_sliding_windows(ts)
                update_signal_sliding_window(ts)
                update_distance_sliding_window(ts)
                update_coverage_sliding_window(ts)

    def run(self, raw_pipe_in=None, stop_flag=None, exception_queue=None):
        """
        Connect and process incoming data forever.

        The three arguments are stored on the instance; this loop does not
        consult them itself.
        """
        self.raw_pipe_in = raw_pipe_in
        self.exception_queue = exception_queue
        self.stop_flag = stop_flag
        self.connect()

        while True:
            # Iterating the received bytes yields integer byte values.
            self.buffer.extend(self.socket.recv(4096))
            messages = self.read_beast_buffer_rssi_piaware()
            if messages:
                self.handle_messages(messages)

    def read_beast_buffer_rssi_piaware(self):
        """Handle mode-s beast data type.

        <esc> "1" : 6 byte MLAT timestamp, 1 byte signal level,
            2 byte Mode-AC
        <esc> "2" : 6 byte MLAT timestamp, 1 byte signal level,
            7 byte Mode-S short frame
        <esc> "3" : 6 byte MLAT timestamp, 1 byte signal level,
            14 byte Mode-S long frame
        <esc> "4" : 6 byte MLAT timestamp, status data, DIP switch
            configuration settings (not on Mode-S Beast classic)
        <esc><esc>: true 0x1a
        <esc> is 0x1a, and "1", "2" and "3" are 0x31, 0x32 and 0x33

        timestamp:
        wiki.modesbeast.com/Radarcape:Firmware_Versions#The_GPS_timestamp

        Returns a list of ``[hex_message, dbfs_rssi, timestamp]`` entries for
        the complete Mode-S frames found in ``self.buffer``. ``timestamp`` is
        the local ``time.time()`` at parsing, not the MLAT timestamp. Any
        trailing incomplete frame is put back into ``self.buffer`` for the
        next call.
        """
        frames = []
        msg = []
        buf = self.buffer
        end = len(buf)
        i = 0

        # Split the buffer on <esc> dividers, un-escaping <esc><esc> pairs.
        while i < end:
            byte = buf[i]
            if byte != 0x1A:
                msg.append(byte)
            elif i + 1 < end and buf[i + 1] == 0x1A:
                # escaped 0x1a: keep one and skip its twin
                msg.append(0x1A)
                i += 1
            elif i == end - 1:
                # Last byte is 0x1a: it cannot be classified until the next
                # byte arrives, so carry it over with the pending frame.
                msg.append(0x1A)
            elif msg:
                # divider: the frame collected so far is complete
                frames.append(msg)
                msg = []
            i += 1

        # Save the remainder for the next reading cycle, if not empty.
        if msg:
            remainder = []
            last = len(msg) - 1
            for pos, byte in enumerate(msg):
                if byte == 0x1A and pos < last:
                    # Re-escape 0x1a (except in the last position) so the
                    # next pass does not mistake it for a frame divider.
                    remainder.extend((0x1A, 0x1A))
                else:
                    remainder.append(byte)
            # The re-escaped copy is what must be stored; storing the
            # un-escaped bytes would split any frame containing a literal
            # 0x1a on the next pass.
            self.buffer = [0x1A] + remainder
        else:
            self.buffer = []

        # Extract messages.
        messages = []
        for mm in frames:
            ts = time.time()
            msgtype = mm[0]

            if msgtype == 0x32:
                # Mode-S Short Message, 7 byte, 14-len hexstr
                msg = "".join("%02X" % b for b in mm[8:15])
            elif msgtype == 0x33:
                # Mode-S Long Message, 14 byte, 28-len hexstr
                msg = "".join("%02X" % b for b in mm[8:22])
            else:
                # Other message type
                continue

            if len(msg) not in (14, 28):
                continue

            # The raw 0-255 signal byte is scaled to 0.0-1.0, squared, and
            # converted to a dBFS value.
            try:
                df = pms.df(msg)
                raw_rssi = mm[7]  # byte after the type byte and the 6-byte timestamp
                if raw_rssi == 0:
                    dbfs_rssi = -100
                else:
                    rssi_ratio = raw_rssi / 255
                    signal_level = rssi_ratio ** 2
                    dbfs_rssi = 10 * math.log10(signal_level)
            except Exception as e:
                print(f"Error: {e}")
                continue

            # skip incomplete message
            if df in (0, 4, 5, 11) and len(msg) != 14:
                continue
            if df in (16, 17, 18, 19, 20, 21, 24) and len(msg) != 28:
                continue

            messages.append([msg, dbfs_rssi, ts])
        return messages
|
|
|
|
|
async def broadcast_stats_task() -> None:
    """
    Async background task that periodically computes and broadcasts stats
    without relying on get_event_loop() in a synchronous thread.

    Once per second the rolling windows are pruned against the current
    time, the statistics are computed, and the resulting JSON payload is
    sent to every WebSocket in CONNECTED_WEBSOCKETS.
    """
    while True:
        await asyncio.sleep(1.0)
        try:
            with DATA_LOCK:
                # Prune here as well: the receiver only prunes when a message
                # arrives, so without this the stats would stay frozen at
                # their last values whenever the feed goes quiet.
                now = get_current_time()
                update_message_sliding_windows(now)
                update_signal_sliding_window(now)
                update_distance_sliding_window(now)
                update_coverage_sliding_window(now)

                rate_5s, rate_15s, rate_30s, rate_60s, rate_300s = compute_message_rates()
                sig_min, sig_max, sig_avg = compute_signal_stats()
                sig_p = compute_signal_percentiles()
                dist_stats = compute_distance_stats()
                dist_hist = compute_distance_buckets()
                coverage_stats = compute_coverage_stats()
        except Exception as exc:
            # Keep the broadcaster alive: one failed computation should cost
            # a single update, not end the task for the rest of the process.
            print(f"Error computing stats: {exc}")
            continue

        payload = {
            "msg5s": rate_5s,
            "msg15s": rate_15s,
            "msg30s": rate_30s,
            "msg60s": rate_60s,
            "msg300s": rate_300s,
            "sigMin": sig_min,
            "sigMax": sig_max,
            "sigAvg": sig_avg,
            "sig25": sig_p["25"],
            "sig50": sig_p["50"],
            "sig75": sig_p["75"],
            "sig90": sig_p["90"],
            "sig95": sig_p["95"],
            "sig99": sig_p["99"],
            "distMin": dist_stats["min"],
            "dist25": dist_stats["25"],
            "dist50": dist_stats["50"],
            "dist75": dist_stats["75"],
            "distMax": dist_stats["max"],
            "distHistogram": dist_hist,
            "coverageStats": coverage_stats,
        }
        # Serialize once; every client receives the same text.
        message = json.dumps(payload)

        # Broadcast to all connected clients. Iterate over a snapshot because
        # the set can change while a send is being awaited.
        for ws_conn in list(CONNECTED_WEBSOCKETS):
            try:
                await ws_conn.send_text(message)
            except Exception:
                # Best effort: a failing client must not block delivery to
                # the others. Registration and removal of sockets is left to
                # websocket_endpoint().
                continue
|
|
|
|
|
@asynccontextmanager
async def app_lifespan(app_ref: FastAPI) -> AsyncGenerator:
    """
    Lifespan context to replace deprecated @app.on_event usage.

    On startup it starts the BEAST receiver in a daemon thread and the
    stats broadcaster as an asyncio task. On shutdown it cancels the
    broadcaster and waits for it to finish.
    """
    # The receiver blocks on socket reads, so it gets its own thread.
    client = ADSBClient(host='localhost', port=30005, rawtype='beast')
    threading.Thread(target=client.run, daemon=True).start()

    # Keep a reference: the event loop holds tasks only weakly, so an
    # unreferenced task may be garbage-collected while still running.
    broadcaster = asyncio.create_task(broadcast_stats_task())
    try:
        yield  # The application serves requests while suspended here.
    finally:
        broadcaster.cancel()
        # return_exceptions=True absorbs the CancelledError (or any error the
        # task ended with) so shutdown itself does not fail.
        await asyncio.gather(broadcaster, return_exceptions=True)


app = FastAPI(lifespan=app_lifespan)
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
def get_index() -> str:
    """
    Serve the contents of the external ``msgrates.html`` file.

    The path is relative, so the file is looked up in the process's
    current working directory.
    """
    with open("msgrates.html", "r", encoding="utf-8") as page:
        html = page.read()
    return html
|
|
|
|
|
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
    """
    WebSocket endpoint that registers the connection and handles incoming messages.

    Periodic updates are broadcast from broadcast_stats_task(); this handler
    only logs whatever the client sends and unregisters the socket when the
    connection ends.
    """
    print("New WebSocket connection")
    await websocket.accept()
    CONNECTED_WEBSOCKETS.add(websocket)
    try:
        while True:
            # Wait for a message to confirm connection.
            msg = await websocket.receive_text()
            print("Received from client:", msg)
    except WebSocketDisconnect:
        print("WebSocket disconnected")
    finally:
        # Unregister on every exit path, not only on a clean disconnect, so
        # a socket that failed for another reason does not stay in the
        # broadcast set. discard() tolerates the socket already being gone.
        CONNECTED_WEBSOCKETS.discard(websocket)
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8000.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )