Created
December 4, 2025 09:25
-
-
Save RISCfuture/4096933ae425eef5fab98340bd0fcece to your computer and use it in GitHub Desktop.
Finds the closest airports to your home airport that you have NOT yet visited, according to your LogTen Pro for Mac logbook.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Find the closest airports you haven't flown to. | |
| This tool analyzes your LogTen Pro logbook to find airports you haven't visited, | |
| then uses FAA NASR data to calculate distances from your home airport and shows | |
| the closest unvisited airports. | |
| """ | |
| import argparse | |
| import csv | |
| import io | |
| import json | |
| import logging | |
| import math | |
| import os | |
| import sqlite3 | |
| import tempfile | |
| import zipfile | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, date | |
| from pathlib import Path | |
| from typing import Optional | |
| from urllib.parse import urljoin | |
| import requests | |
| from geopy.distance import geodesic | |
# Constants

# Base URL of the FAA NASR subscription pages. A cycle date formatted as
# YYYY-MM-DD is appended to it to build the per-cycle URL that gets probed.
NASR_BASE_URL = "https://www.faa.gov/air_traffic/flight_info/aeronav/aero_data/NASR_Subscription/"
# Schema version stored in the processed-airport JSON cache; a cache file whose
# "version" differs from this value is ignored and regenerated.
CACHE_VERSION = 1
# Radius (nautical miles) used to size the bounding-box prefilter around the
# home airport.
MAX_DISTANCE_NM = 500.0
MIN_DISTANCE_METERS = 200.0  # Skip airports closer than this (e.g., home airport)

# Configure logging
# Module-wide logger; its level/format are set up by configure_logging().
logger = logging.getLogger("closest_unvisited_airport")
| # ============================================================================= | |
| # Data Classes | |
| # ============================================================================= | |
@dataclass
class AirportInfo:
    """A single landing facility as parsed from the NASR airport data.

    The boolean ``is_*`` properties classify the facility by comparing
    ``ownership`` / ``facility_type`` against fixed code strings.
    """

    identifier: str  # FAA LID (e.g., "OAK")
    icao_identifier: Optional[str]  # ICAO code (e.g., "KOAK")
    name: str
    latitude: float
    longitude: float
    facility_type: str  # AIRPORT, HELIPORT, SEAPLANE BASE, etc.
    ownership: str  # Public, Private, Air Force, Navy, etc.
    public_use: bool

    def _is_type(self, expected: str) -> bool:
        """Return True when ``facility_type`` equals *expected* exactly."""
        return self.facility_type == expected

    @property
    def is_military(self) -> bool:
        """True when the ownership code is Air Force, Navy, or Army."""
        return self.ownership in {"MA", "MN", "MR"}

    @property
    def is_heliport(self) -> bool:
        """True when the facility type is HELIPORT."""
        return self._is_type("HELIPORT")

    @property
    def is_seaport(self) -> bool:
        """True when the facility type is SEAPLANE BASE."""
        return self._is_type("SEAPLANE BASE")

    @property
    def is_balloonport(self) -> bool:
        """True when the facility type is BALLOONPORT."""
        return self._is_type("BALLOONPORT")

    @property
    def is_gliderport(self) -> bool:
        """True when the facility type is GLIDERPORT."""
        return self._is_type("GLIDERPORT")

    @property
    def is_ultralight(self) -> bool:
        """True when the facility type is ULTRALIGHT."""
        return self._is_type("ULTRALIGHT")
@dataclass
class AirportWithDistance:
    """An airport paired with its distance from a reference point.

    Instances order by ``distance_nm`` via ``__lt__``, so a list of them
    can be sorted nearest-first with a plain ``sort()``.
    """

    identifier: str
    name: str
    latitude: float
    longitude: float
    distance_nm: float
    facility_type: str
    public_use: bool

    def __lt__(self, other: "AirportWithDistance") -> bool:
        """Return True when this airport is strictly nearer than *other*."""
        nearer = self.distance_nm < other.distance_nm
        return nearer
@dataclass
class VisitedAirports:
    """Set of visited airport identifiers, kept separately per code system.

    Identifiers are normalized (surrounding whitespace removed, upper-cased)
    both when they are stored and when they are looked up, so the two paths
    always agree.
    """

    faa_identifiers: set[str] = field(default_factory=set)
    icao_identifiers: set[str] = field(default_factory=set)
    iata_identifiers: set[str] = field(default_factory=set)

    @staticmethod
    def _normalize(code: Optional[str]) -> Optional[str]:
        """Return *code* stripped and upper-cased, or None if it is empty/blank."""
        if not code:
            return None
        cleaned = code.strip().upper()
        return cleaned or None

    def add_airport(
        self,
        faa: Optional[str] = None,
        icao: Optional[str] = None,
        iata: Optional[str] = None,
    ) -> None:
        """Add an airport by its identifiers.

        Each identifier that is non-empty after normalization is added to the
        set for its code system; missing or blank identifiers are ignored.
        """
        for raw, bucket in (
            (faa, self.faa_identifiers),
            (icao, self.icao_identifiers),
            (iata, self.iata_identifiers),
        ):
            code = self._normalize(raw)
            if code is not None:
                bucket.add(code)

    def has_visited(
        self, faa: Optional[str] = None, icao: Optional[str] = None
    ) -> bool:
        """Check if an airport has been visited.

        Args:
            faa: FAA location identifier to look up. It is matched against the
                stored FAA identifiers and also the stored IATA identifiers.
            icao: ICAO identifier to look up against the stored ICAO set.

        Returns:
            True if any supplied identifier matches; False otherwise
            (including when no identifier is supplied).
        """
        faa_code = self._normalize(faa)
        if faa_code is not None:
            if faa_code in self.faa_identifiers:
                return True
            # Check if FAA identifier matches an IATA code (common situation)
            if faa_code in self.iata_identifiers:
                return True
        icao_code = self._normalize(icao)
        return icao_code is not None and icao_code in self.icao_identifiers

    @property
    def count(self) -> int:
        """Number of distinct identifier strings across all three sets.

        This is the size of the union of the sets, so an airport stored under
        different strings (e.g. "OAK" and "KOAK") contributes once per
        distinct string.
        """
        return len(
            self.faa_identifiers | self.icao_identifiers | self.iata_identifiers
        )
@dataclass
class AirportFilters:
    """Switches that control which categories of airport are excluded.

    Every switch defaults to True, i.e. the category is excluded unless the
    caller turns the switch off.
    """

    exclude_private: bool = True
    exclude_military: bool = True
    exclude_heliports: bool = True
    exclude_seaports: bool = True
    exclude_balloonports: bool = True
    exclude_gliderports: bool = True
    exclude_ultralights: bool = True

    def passes(self, airport: "AirportInfo") -> bool:
        """Return True when no enabled exclusion applies to *airport*.

        An exclusion only inspects the airport when its switch is on; the
        first one that matches rejects the airport.
        """
        rejected = (
            (self.exclude_private and not airport.public_use)
            or (self.exclude_military and airport.is_military)
            or (self.exclude_heliports and airport.is_heliport)
            or (self.exclude_seaports and airport.is_seaport)
            or (self.exclude_balloonports and airport.is_balloonport)
            or (self.exclude_gliderports and airport.is_gliderport)
            or (self.exclude_ultralights and airport.is_ultralight)
        )
        return not rejected
@dataclass
class AirportCache:
    """Cached airport data.

    The field names match the top-level keys of the JSON document that
    NASRLoader._save_processed_cache writes.
    NOTE(review): nothing in the visible code instantiates this class; the
    cache is read and written as a plain dict -- confirm whether it is needed.
    """

    version: int  # cache schema version (the JSON cache stores CACHE_VERSION here)
    cycle_date: str  # YYYY-MM-DD format
    generated_at: str  # ISO timestamp string in the JSON cache
    airports: list[dict]  # one dict per airport
| # ============================================================================= | |
| # LogTen Pro Database Reader | |
| # ============================================================================= | |
class LogTenReader:
    """Reads visited airports from LogTen Pro SQLite database.

    The database is only ever opened read-only. Airports are collected from
    each flight's departure and arrival place and from any waypoint
    relationship tables found in the schema.
    """

    # Default LogTen Pro database path on macOS
    DEFAULT_LOGBOOK_PATH = Path.home() / (
        "Library/Group Containers/group.com.coradine.LogTenPro/"
        "LogTenProData_6583aa561ec1cc91302449b5/LogTenCoreDataStore.sql"
    )
    # Simulator category titles
    SIMULATOR_CATEGORIES = {"SIMULATOR", "TRAINING DEVICE", "PCATD"}

    def __init__(self, logbook_path: Path):
        """Remember the database location.

        Args:
            logbook_path: Path to the LogTen Pro SQLite file.

        Raises:
            FileNotFoundError: If no file exists at *logbook_path*.
        """
        self.logbook_path = logbook_path
        if not logbook_path.exists():
            raise FileNotFoundError(
                f"LogTen database not found at: {logbook_path}\n"
                "Ensure LogTen Pro is installed and has been opened at least once."
            )

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier (inner quotes doubled)."""
        return '"' + name.replace('"', '""') + '"'

    def _connect_readonly(self) -> sqlite3.Connection:
        """Open the logbook read-only and return a connection yielding sqlite3.Row.

        The URI is produced with ``Path.as_uri()`` so characters that are
        special in a URI (``#``, ``?``, ``%``, spaces) are percent-encoded
        instead of being misread as a fragment or query separator.
        """
        uri = f"{self.logbook_path.resolve().as_uri()}?mode=ro"
        conn = sqlite3.connect(uri, uri=True)
        conn.row_factory = sqlite3.Row
        return conn

    def fetch_visited_airports(self, exclude_simulators: bool = True) -> "VisitedAirports":
        """Fetch all visited airports from the LogTen database.

        Args:
            exclude_simulators: When True, flights flagged as simulator
                flights, or flown in an aircraft whose category title is in
                SIMULATOR_CATEGORIES, contribute no airports.

        Returns:
            A VisitedAirports holding every identifier found.
        """
        logger.info("Fetching visited airports from LogTen Pro")
        visited = VisitedAirports()
        conn = self._connect_readonly()
        try:
            # First, get all places indexed by their primary key
            places = self._fetch_places(conn)
            logger.debug(f"Loaded {len(places)} places from database")
            # Get aircraft categories for simulator detection
            aircraft_categories: dict[int, str] = {}
            if exclude_simulators:
                aircraft_categories = self._fetch_aircraft_categories(conn)
                logger.debug(f"Loaded {len(aircraft_categories)} aircraft records")
            # Get all flights
            cursor = conn.cursor()
            cursor.execute("""
                SELECT Z_PK, ZFLIGHT_FROMPLACE, ZFLIGHT_TOPLACE,
                       ZFLIGHT_SIMULATOR, ZFLIGHT_AIRCRAFT
                FROM ZFLIGHT
            """)
            flight_count = 0
            skipped_simulator = 0
            for row in cursor:
                flight_count += 1
                # Check if simulator flight
                if exclude_simulators and self._is_simulator_flight(
                    row, aircraft_categories
                ):
                    skipped_simulator += 1
                    logger.debug(f"Skipping simulator flight {row['Z_PK']}")
                    continue
                # Add departure and arrival airports
                for column in ("ZFLIGHT_FROMPLACE", "ZFLIGHT_TOPLACE"):
                    if row[column]:
                        self._record_place(places, visited, row[column])
            # Add waypoint places
            waypoint_count = self._add_waypoint_places(
                conn, places, visited, exclude_simulators, aircraft_categories
            )
            logger.debug(
                f"Processed {flight_count} flights, skipped {skipped_simulator} simulators"
            )
            logger.debug(f"Added {waypoint_count} waypoint airports")
            logger.info(f"Found {visited.count} unique visited airports")
        finally:
            conn.close()
        return visited

    @staticmethod
    def _record_place(places: dict[int, dict], visited: "VisitedAirports", place_pk) -> bool:
        """Add the place with primary key *place_pk* to *visited*.

        Returns True if the key was found in *places* (and therefore added),
        False otherwise.
        """
        place = places.get(place_pk)
        if not place:
            return False
        visited.add_airport(
            faa=place.get("identifier"),
            icao=place.get("icao"),
            iata=place.get("iata"),
        )
        return True

    def _fetch_places(self, conn: sqlite3.Connection) -> dict[int, dict]:
        """Fetch all places indexed by primary key.

        Each value is a dict with the keys "identifier", "icao" and "iata".
        Rows are read by column name, so *conn* must use sqlite3.Row.
        """
        cursor = conn.cursor()
        cursor.execute("""
            SELECT Z_PK, ZPLACE_IDENTIFIER, ZPLACE_ICAOID, ZPLACE_IATAID
            FROM ZPLACE
        """)
        return {
            row["Z_PK"]: {
                "identifier": row["ZPLACE_IDENTIFIER"],
                "icao": row["ZPLACE_ICAOID"],
                "iata": row["ZPLACE_IATAID"],
            }
            for row in cursor
        }

    def _fetch_aircraft_categories(self, conn: sqlite3.Connection) -> dict[int, str]:
        """Fetch aircraft categories indexed by aircraft primary key.

        The category is the upper-cased title of the customization property
        linked from the aircraft's type.
        """
        cursor = conn.cursor()
        # Join aircraft -> aircraft type -> customization property to get category title
        cursor.execute("""
            SELECT a.Z_PK, UPPER(cp.ZLOGTENCUSTOMIZATIONPROPERTY_TITLE) as category
            FROM ZAIRCRAFT a
            JOIN ZAIRCRAFTTYPE at ON a.ZAIRCRAFT_AIRCRAFTTYPE = at.Z_PK
            JOIN ZLOGTENCUSTOMIZATIONPROPERTY cp ON at.ZAIRCRAFTTYPE_CATEGORY = cp.Z_PK
        """)
        return {row["Z_PK"]: row["category"] for row in cursor}

    def _is_simulator_flight(
        self, flight_row: sqlite3.Row, aircraft_categories: dict[int, str]
    ) -> bool:
        """Check if a flight is a simulator flight.

        A flight counts as a simulator flight when its ZFLIGHT_SIMULATOR flag
        is 1, or when its aircraft's category is in SIMULATOR_CATEGORIES.
        """
        # Check simulator flag
        if flight_row["ZFLIGHT_SIMULATOR"] == 1:
            return True
        # Check aircraft category
        aircraft_pk = flight_row["ZFLIGHT_AIRCRAFT"]
        if aircraft_pk and aircraft_pk in aircraft_categories:
            return aircraft_categories[aircraft_pk] in self.SIMULATOR_CATEGORIES
        return False

    def _fetch_simulator_flight_pks(
        self, conn: sqlite3.Connection, aircraft_categories: dict[int, str]
    ) -> set[int]:
        """Return the primary keys of all flights that count as simulator flights.

        Uses the same two criteria as _is_simulator_flight, but reads columns
        positionally so it works with any row factory.
        """
        cursor = conn.cursor()
        cursor.execute(
            "SELECT Z_PK, ZFLIGHT_SIMULATOR, ZFLIGHT_AIRCRAFT FROM ZFLIGHT"
        )
        return {
            flight_pk
            for flight_pk, simulator_flag, aircraft_pk in cursor
            if simulator_flag == 1
            or (
                aircraft_pk
                and aircraft_categories.get(aircraft_pk) in self.SIMULATOR_CATEGORIES
            )
        }

    def _add_waypoint_places(
        self,
        conn: sqlite3.Connection,
        places: dict[int, dict],
        visited: "VisitedAirports",
        exclude_simulators: bool,
        aircraft_categories: dict[int, str],
    ) -> int:
        """Add waypoint places from flights.

        Every table whose name contains WAYPOINTPLACES is treated as a
        flight-to-place relationship table. A table that cannot be read or
        whose columns cannot be identified is skipped (with a log message)
        rather than aborting the run.

        Returns:
            The number of relationship rows whose place was found and added
            (rows, not distinct airports).
        """
        cursor = conn.cursor()
        # Try to find the waypoint relationship table
        # Core Data creates these with names like Z_<N>FLIGHT_WAYPOINTPLACES
        cursor.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name LIKE '%WAYPOINTPLACES%'
        """)
        waypoint_tables = [row[0] for row in cursor.fetchall()]
        if not waypoint_tables:
            logger.debug("No waypoint places table found")
            return 0

        # The simulator set does not depend on the table being processed, so
        # compute it once instead of once per table.
        simulator_flights: set[int] = set()
        if exclude_simulators:
            try:
                simulator_flights = self._fetch_simulator_flight_pks(
                    conn, aircraft_categories
                )
            except sqlite3.Error as e:
                # Without the set we cannot honour exclude_simulators, so add
                # no waypoint places at all rather than risk adding simulator ones.
                logger.warning(f"Error loading simulator flights: {e}")
                return 0

        waypoint_count = 0
        for table_name in waypoint_tables:
            logger.debug(f"Processing waypoint table: {table_name}")
            quoted_table = self._quote_identifier(table_name)
            try:
                # Get column names to find the flight and place columns
                cursor.execute(f"PRAGMA table_info({quoted_table})")
                columns = [row[1] for row in cursor.fetchall()]
                # First column mentioning FLIGHT is the flight key; the last
                # other column mentioning PLACE is the place key.
                flight_col = None
                place_col = None
                for col in columns:
                    upper = col.upper()
                    if "FLIGHT" in upper and flight_col is None:
                        flight_col = col
                    elif "PLACE" in upper or "WAYPOINTPLACES" in upper:
                        place_col = col
                if not flight_col or not place_col:
                    logger.debug(f"Could not identify columns in {table_name}: {columns}")
                    continue
                cursor.execute(
                    f"SELECT {self._quote_identifier(flight_col)}, "
                    f"{self._quote_identifier(place_col)} FROM {quoted_table}"
                )
                for flight_pk, place_pk in cursor.fetchall():
                    # Skip simulator flights (the set is empty when not excluding)
                    if flight_pk in simulator_flights:
                        continue
                    if self._record_place(places, visited, place_pk):
                        waypoint_count += 1
            except sqlite3.Error as e:
                logger.warning(f"Error processing {table_name}: {e}")
        return waypoint_count
| # ============================================================================= | |
| # FAA NASR Data Loader | |
| # ============================================================================= | |
class NASRLoader:
    """Loads and caches FAA NASR airport data.

    Two caches live under ``<tempdir>/NASRCache``: the downloaded
    subscription ZIP and a JSON file of already-parsed airports.
    """

    # SITE_TYPE_CODE -> facility type name used by AirportInfo.
    # A=Airport, H=Heliport, B=Balloonport, G=Gliderport, U=Ultralight, S=Seaplane base
    _FACILITY_TYPES = {
        "A": "AIRPORT",
        "H": "HELIPORT",
        "B": "BALLOONPORT",
        "G": "GLIDERPORT",
        "U": "ULTRALIGHT",
        "S": "SEAPLANE BASE",
    }

    def __init__(self):
        """Set up cache locations; no file or network access happens here."""
        self.cache_dir = Path(tempfile.gettempdir()) / "NASRCache"
        self.cache_file = self.cache_dir / "current_nasr.zip"
        self.processed_cache_file = self.cache_dir / "airports_processed.json"
        self.airports: "list[AirportInfo]" = []
        # Effective date (YYYY-MM-DD) of the cycle that was downloaded, when
        # known; recorded so the processed cache expires with the cycle.
        self._cycle_date: Optional[str] = None

    def load_airports(self) -> "list[AirportInfo]":
        """Load airports from cache or download fresh data.

        Returns:
            The list of parsed airports (also stored on ``self.airports``).

        Raises:
            RuntimeError: If the current cycle cannot be located or
                downloaded, or the archive has no airport CSV.
        """
        # Create cache directory
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        # Try to load from processed cache first
        cached = self._load_processed_cache()
        if cached:
            logger.info("Using cached airport data")
            self.airports = cached
            return cached
        # Download and process NASR data
        logger.info("Downloading NASR data from FAA")
        nasr_zip = self._download_nasr()
        logger.info("Parsing airport data")
        self.airports = self._parse_airports(nasr_zip)
        logger.info(f"Loaded {len(self.airports)} airports")
        # Save to cache
        self._save_processed_cache()
        return self.airports

    def find_airport(self, identifier: str) -> "Optional[AirportInfo]":
        """Find an airport by FAA LID or ICAO identifier.

        The lookup is case-insensitive and ignores surrounding whitespace.
        Returns None when nothing matches.
        """
        wanted = identifier.strip().upper()
        for airport in self.airports:
            if wanted in (airport.identifier, airport.icao_identifier):
                return airport
        return None

    def _get_current_cycle_url(self) -> str:
        """Get the URL for the current NASR cycle.

        Probes ``NASR_BASE_URL/<YYYY-MM-DD>/`` with HEAD requests, starting
        today and walking back one day at a time, and returns the first URL
        that answers 200.

        Raises:
            RuntimeError: If no date in the last 56 days answers 200.
        """
        # Imported here so this method does not depend on an import that
        # appears after it in the module.
        from datetime import timedelta

        # The FAA publishes NASR data on 28-day cycles
        today = date.today()
        for days_back in range(56):  # Check up to 2 cycles back
            check_date = today - timedelta(days=days_back)
            cycle_url = f"{NASR_BASE_URL}{check_date.strftime('%Y-%m-%d')}/"
            try:
                response = requests.head(cycle_url, timeout=10)
            except requests.RequestException:
                continue
            if response.status_code == 200:
                return cycle_url
        # There is no further fallback: give up with a clear error.
        raise RuntimeError("Could not determine current NASR cycle URL")

    def _download_nasr(self) -> bytes:
        """Download NASR data, using cache if available and fresh.

        A cached ZIP is reused only if it was written today. Otherwise the
        current cycle is located and each candidate ZIP URL is tried in turn;
        the first 200 response is cached to disk and returned.

        Raises:
            RuntimeError: If the cycle cannot be located or no candidate URL
                yields the archive.
        """
        # Check if we have a cached ZIP from today
        if self.cache_file.exists():
            mod_time = datetime.fromtimestamp(self.cache_file.stat().st_mtime)
            if mod_time.date() == date.today():
                logger.info("Using cached NASR ZIP from earlier today")
                return self.cache_file.read_bytes()
        # Find and download from current cycle
        cycle_url = self._get_current_cycle_url()
        cycle_date = cycle_url.rstrip("/").split("/")[-1]
        # The subscription ZIP is hosted on nfdc.faa.gov, not the main FAA site
        zip_urls = [
            f"https://nfdc.faa.gov/webContent/28DaySub/28DaySubscription_Effective_{cycle_date}.zip",
            f"https://nfdc.faa.gov/webContent/28DaySub/{cycle_date}/28DaySubscription.zip",
        ]
        for zip_url in zip_urls:
            try:
                logger.debug(f"Trying to download from: {zip_url}")
                response = requests.get(zip_url, timeout=300)
            except requests.RequestException as e:
                logger.debug(f"Failed to download from {zip_url}: {e}")
                continue
            if response.status_code != 200:
                logger.debug(
                    f"Unexpected status {response.status_code} from {zip_url}"
                )
                continue
            # Cache the ZIP and remember which cycle it belongs to
            self.cache_file.write_bytes(response.content)
            self._cycle_date = cycle_date
            return response.content
        raise RuntimeError(f"Could not download NASR data for cycle {cycle_date}")

    def _parse_airports(self, zip_data: bytes) -> "list[AirportInfo]":
        """Parse airports from the NASR ZIP file (handles nested CSV ZIP).

        If the archive contains a member whose name mentions CSV and ends in
        .zip, the first such member is opened as the CSV archive; otherwise
        the outer archive itself is searched for the CSV files.
        """
        with zipfile.ZipFile(io.BytesIO(zip_data)) as outer_zf:
            # Look for the nested CSV ZIP file in CSV_Data/ folder
            csv_zip_files = [
                n for n in outer_zf.namelist()
                if "CSV" in n.upper() and n.upper().endswith(".ZIP")
            ]
            if not csv_zip_files:
                # Maybe the CSV files are directly in the archive
                return self._parse_csv_zip(outer_zf)
            # Extract and parse the nested CSV ZIP
            csv_zip_name = csv_zip_files[0]
            logger.debug(f"Found nested CSV ZIP: {csv_zip_name}")
            with outer_zf.open(csv_zip_name) as csv_zip_file:
                csv_zip_data = csv_zip_file.read()
            with zipfile.ZipFile(io.BytesIO(csv_zip_data)) as csv_zf:
                return self._parse_csv_zip(csv_zf)

    def _parse_csv_zip(self, zf: zipfile.ZipFile) -> "list[AirportInfo]":
        """Parse airports from a ZIP file containing CSV files.

        Raises:
            RuntimeError: If no airport CSV can be found in the archive.
        """
        names = zf.namelist()
        # Find the base airport file
        apt_files = [
            n for n in names
            if "APT_BASE" in n.upper() and n.upper().endswith(".CSV")
        ]
        if not apt_files:
            # Try alternative naming patterns, filtering out runway and
            # other sub-files
            apt_files = [
                n for n in names
                if n.upper().endswith(".CSV")
                and "APT" in n.upper()
                and "RWY" not in n.upper()
                and "END" not in n.upper()
            ]
        if not apt_files:
            raise RuntimeError(f"No APT CSV file found in archive. Files: {zf.namelist()}")
        apt_file = apt_files[0]
        logger.debug(f"Reading {apt_file}")
        airports = []
        with zf.open(apt_file) as f:
            # Read as text; newline="" lets the csv module handle line
            # endings itself, as its documentation requires.
            content = io.TextIOWrapper(f, encoding="utf-8", newline="")
            for row in csv.DictReader(content):
                airport = self._parse_airport_row(row)
                if airport:
                    airports.append(airport)
        return airports

    def _parse_airport_row(self, row: dict) -> "Optional[AirportInfo]":
        """Parse a single airport row from CSV.

        Returns None when the row lacks usable coordinates or an ARPT_ID.
        Missing site type, ownership and facility-use codes default to
        "A", "PU" and "PU" respectively.
        """
        try:
            # Get coordinates
            lat = self._get_float(row, ["LAT_DECIMAL"])
            lon = self._get_float(row, ["LONG_DECIMAL"])
            if lat is None or lon is None:
                return None
            # Get identifiers
            identifier = self._get_str(row, ["ARPT_ID"])
            if not identifier:
                return None
            icao = self._get_str(row, ["ICAO_ID"])
            name = self._get_str(row, ["ARPT_NAME"]) or identifier
            # Get site type code and convert to facility type; unknown codes
            # are treated as a regular airport
            site_type_code = self._get_str(row, ["SITE_TYPE_CODE"]) or "A"
            facility_type = self._FACILITY_TYPES.get(site_type_code.upper(), "AIRPORT")
            # Get ownership type code
            # PU=Public, PR=Private, MA=Air Force, MN=Navy, MR=Army
            ownership = self._get_str(row, ["OWNERSHIP_TYPE_CODE"]) or "PU"
            # Determine public use from FACILITY_USE_CODE
            # PU = Public Use, PR = Private
            facility_use = self._get_str(row, ["FACILITY_USE_CODE"]) or "PU"
            public_use = facility_use.upper() == "PU"
            return AirportInfo(
                identifier=identifier.strip().upper(),
                icao_identifier=icao.strip().upper() if icao else None,
                name=name.strip(),
                latitude=lat,
                longitude=lon,
                facility_type=facility_type,
                ownership=ownership.strip().upper(),
                public_use=public_use,
            )
        except (ValueError, KeyError) as e:
            logger.debug(f"Failed to parse airport row: {e}")
            return None

    def _get_str(self, row: dict, keys: list[str]) -> Optional[str]:
        """Get a string value from row, trying multiple possible keys.

        Returns the stripped value of the first key that is present with a
        truthy value, or None if there is none.
        """
        for key in keys:
            value = row.get(key)
            if value:
                return str(value).strip()
        return None

    def _get_float(self, row: dict, keys: list[str]) -> Optional[float]:
        """Get a float value from row, trying multiple possible keys.

        Keys whose value is missing, empty, or not parseable as a float are
        skipped; None is returned if no key yields a number.
        """
        for key in keys:
            value = row.get(key)
            if not value:
                continue
            try:
                return float(value)
            except ValueError:
                continue
        return None

    def _load_processed_cache(self) -> "Optional[list[AirportInfo]]":
        """Load airports from processed JSON cache if valid.

        The cache is rejected (None is returned) when the file is missing,
        unreadable or malformed, when its version differs from
        CACHE_VERSION, or when its cycle date is 28 or more days old.
        """
        if not self.processed_cache_file.exists():
            return None
        try:
            with open(self.processed_cache_file, encoding="utf-8") as f:
                cache = json.load(f)
            # Check version
            if cache.get("version") != CACHE_VERSION:
                logger.debug("Cache version mismatch, regenerating")
                return None
            # Check if cycle is still current: a cycle lasts 28 days, so on
            # day 28 the next cycle is already in effect
            cycle_date = datetime.strptime(cache["cycle_date"], "%Y-%m-%d").date()
            if (date.today() - cycle_date).days >= 28:
                logger.debug("Cache cycle outdated, regenerating")
                return None
            # Parse airports
            return [
                AirportInfo(
                    identifier=apt["identifier"],
                    icao_identifier=apt.get("icao_identifier"),
                    name=apt["name"],
                    latitude=apt["latitude"],
                    longitude=apt["longitude"],
                    facility_type=apt["facility_type"],
                    ownership=apt["ownership"],
                    public_use=apt["public_use"],
                )
                for apt in cache["airports"]
            ]
        except (
            OSError,
            json.JSONDecodeError,
            KeyError,
            ValueError,
            TypeError,
            AttributeError,
        ) as e:
            # TypeError/AttributeError cover a JSON document of the wrong shape
            logger.warning(f"Failed to load cache: {e}")
            return None

    def _save_processed_cache(self) -> None:
        """Save processed airports to JSON cache.

        The cycle date written is the one recorded by the download step when
        available; if the data came from an already-cached ZIP the cycle is
        not known and today's date is used instead. Failure to write the
        cache is logged and otherwise ignored, since the airports are already
        loaded in memory.
        """
        cycle_date = getattr(self, "_cycle_date", None) or date.today().isoformat()
        cache = {
            "version": CACHE_VERSION,
            "cycle_date": cycle_date,
            "generated_at": datetime.now().isoformat(),
            "airports": [
                {
                    "identifier": a.identifier,
                    "icao_identifier": a.icao_identifier,
                    "name": a.name,
                    "latitude": a.latitude,
                    "longitude": a.longitude,
                    "facility_type": a.facility_type,
                    "ownership": a.ownership,
                    "public_use": a.public_use,
                }
                for a in self.airports
            ],
        }
        try:
            with open(self.processed_cache_file, "w", encoding="utf-8") as f:
                json.dump(cache, f)
        except OSError as e:
            logger.warning(f"Failed to save cache: {e}")
            return
        logger.debug(f"Saved processed cache to {self.processed_cache_file}")
| # ============================================================================= | |
| # Distance Calculator | |
| # ============================================================================= | |
class DistanceCalculator:
    """Calculates distances from a home airport to other airports."""

    def __init__(
        self, airports: "list[AirportInfo]", visited_airports: "VisitedAirports"
    ):
        """Store the candidate airports and the set of already-visited ones."""
        self.airports = airports
        self.visited_airports = visited_airports

    def find_closest_unvisited(
        self,
        home_lat: float,
        home_lon: float,
        count: int = 10,
        filters: "Optional[AirportFilters]" = None,
    ) -> "list[AirportWithDistance]":
        """Find the N closest unvisited airports.

        Args:
            home_lat: Latitude of the reference point, in degrees.
            home_lon: Longitude of the reference point, in degrees.
            count: Maximum number of airports to return; zero or a negative
                value yields an empty list.
            filters: Category exclusions; defaults to ``AirportFilters()``.

        Returns:
            Up to *count* airports sorted nearest-first. Only airports between
            MIN_DISTANCE_METERS and MAX_DISTANCE_NM from the reference point
            are considered.
        """
        if filters is None:
            filters = AirportFilters()
        logger.debug(f"Finding {count} closest unvisited airports")
        if count <= 0:
            # A negative slice bound would silently drop the farthest
            # results instead of limiting the list.
            return []
        # Calculate bounding box for optimization
        bbox = self._calculate_bounding_box(home_lat, home_lon, MAX_DISTANCE_NM)
        logger.debug(f"Using bounding box: {bbox}")
        results = []
        for airport in self.airports:
            # Quick bounding box check
            if not self._is_within_bbox(airport, bbox):
                continue
            # Check if visited
            if self.visited_airports.has_visited(
                faa=airport.identifier, icao=airport.icao_identifier
            ):
                logger.debug(f"Skipping visited: {airport.identifier}")
                continue
            # Apply filters
            if not filters.passes(airport):
                logger.debug(f"Filtered out: {airport.identifier}")
                continue
            # Calculate distance
            distance_nm = self._calculate_distance_nm(
                home_lat, home_lon, airport.latitude, airport.longitude
            )
            # The padded box is only a cheap prefilter: its corners reach
            # well past the radius, while equally distant airports outside
            # the box were already dropped. Enforcing the radius keeps the
            # result a true nearest-first list.
            if distance_nm > MAX_DISTANCE_NM:
                logger.debug(f"Skipping too far: {airport.identifier}")
                continue
            # Skip home airport or very close airports
            distance_m = distance_nm * 1852  # Convert to meters
            if distance_m < MIN_DISTANCE_METERS:
                logger.debug(f"Skipping too close: {airport.identifier}")
                continue
            results.append(
                AirportWithDistance(
                    identifier=airport.identifier,
                    name=airport.name,
                    latitude=airport.latitude,
                    longitude=airport.longitude,
                    distance_nm=distance_nm,
                    facility_type=airport.facility_type,
                    public_use=airport.public_use,
                )
            )
        # Sort by distance and return top N
        results.sort()
        logger.debug(f"Found {len(results)} airports within filters")
        return results[:count]

    def _calculate_distance_nm(
        self, lat1: float, lon1: float, lat2: float, lon2: float
    ) -> float:
        """Calculate distance in nautical miles using geopy."""
        return geodesic((lat1, lon1), (lat2, lon2)).nautical

    def _calculate_bounding_box(
        self, lat: float, lon: float, max_distance_nm: float
    ) -> dict:
        """Calculate a bounding box around a point.

        Returns a dict with the keys "min_lat", "max_lat", "west_bound" and
        "east_bound" (degrees). The longitude bounds are not wrapped and may
        fall outside [-180, 180]; _is_within_bbox handles that.
        """
        # 1 degree of latitude is taken as 60nm; a degree of longitude
        # shrinks with the cosine of the latitude
        lat_degree_nm = 60.0
        lon_degree_nm = 60.0 * math.cos(math.radians(lat))
        lat_range = (max_distance_nm / lat_degree_nm) * 1.2  # 20% padding
        # Floor of 10nm per degree keeps the box finite near the poles
        lon_range = (max_distance_nm / max(lon_degree_nm, 10.0)) * 1.2
        return {
            "min_lat": lat - lat_range,
            "max_lat": lat + lat_range,
            "west_bound": lon - lon_range,
            "east_bound": lon + lon_range,
        }

    def _is_within_bbox(self, airport: "AirportInfo", bbox: dict) -> bool:
        """Check if an airport is within the bounding box.

        *bbox* is a dict as produced by _calculate_bounding_box. A bound
        beyond +/-180 degrees is treated as wrapping across the date line.
        """
        lat = airport.latitude
        # Latitude check
        if lat < bbox["min_lat"] or lat > bbox["max_lat"]:
            return False
        # Longitude check (handle date line wrap-around)
        lon = airport.longitude
        west = bbox["west_bound"]
        east = bbox["east_bound"]
        if west >= -180 and east <= 180:
            # Normal case
            return west <= lon <= east
        if east > 180:
            # Eastern boundary crosses date line
            return lon >= west or lon <= east - 360
        if west < -180:
            # Western boundary crosses date line
            return lon <= east or lon >= west + 360
        return True
| # ============================================================================= | |
| # CLI and Main | |
| # ============================================================================= | |
def parse_args() -> argparse.Namespace:
    """Parse command-line arguments.

    Returns:
        A namespace with ``home_airport``, ``logbook``, ``count``,
        ``log_level``, ``include_simulators`` and one ``include_<category>``
        boolean per airport category (all False unless requested).
    """

    def positive_int(text: str) -> int:
        """argparse type: accept only integers >= 1."""
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
        if value < 1:
            raise argparse.ArgumentTypeError(f"must be at least 1, got {value}")
        return value

    parser = argparse.ArgumentParser(
        description="Find the closest airports you haven't flown to",
        epilog=(
            "This tool analyzes your LogTen Pro logbook to find airports you haven't "
            "visited, then uses FAA NASR data to calculate distances from your home "
            "airport and shows the closest unvisited airports."
        ),
    )
    parser.add_argument(
        "--home-airport",
        "-a",
        required=True,
        help="Your home airport ICAO/FAA code (e.g., KOAK or OAK)",
    )
    parser.add_argument(
        "--logbook",
        type=Path,
        default=LogTenReader.DEFAULT_LOGBOOK_PATH,
        help="Path to LogTen Pro database file",
    )
    parser.add_argument(
        "--count",
        "-c",
        type=positive_int,
        default=10,
        help="Number of closest airports to show (default: 10)",
    )

    # Filter flags with --include/--no-include pattern. BooleanOptionalAction
    # registers both spellings for each flag, writing to include_<category>.
    filter_flags = (
        ("private", "private airports"),
        ("military", "military airports"),
        ("heliports", "heliports"),
        ("seaports", "seaplane bases"),
        ("balloonports", "balloonports"),
        ("gliderports", "gliderports"),
        ("ultralights", "ultralight fields"),
    )
    for category, label in filter_flags:
        parser.add_argument(
            f"--include-{category}",
            action=argparse.BooleanOptionalAction,
            default=False,
            help=f"Include {label} (excluded by default)",
        )

    parser.add_argument(
        "--include-simulators",
        action="store_true",
        default=False,
        help="Include simulator flights when determining visited airports",
    )
    parser.add_argument(
        "--log-level",
        choices=["debug", "info", "warning", "error"],
        default="warning",
        help="Set logging level (default: warning)",
    )
    return parser.parse_args()
def configure_logging(level: str) -> None:
    """Set up root logging for the chosen verbosity.

    Args:
        level: One of "debug", "info", "warning" or "error".

    Raises:
        KeyError: If *level* is not one of the names above.
    """
    numeric_levels = dict(
        debug=logging.DEBUG,
        info=logging.INFO,
        warning=logging.WARNING,
        error=logging.ERROR,
    )
    chosen = numeric_levels[level]
    logging.basicConfig(level=chosen, format="%(levelname)s: %(message)s")
def main() -> None:
    """Main entry point.

    Reads the logbook, loads the NASR airport data, locates the home airport
    and prints the closest unvisited airports. Any failure along the way is
    reported on stderr and ends the process with exit status 1.
    """
    # Imported locally because sys is not among the module-level imports.
    import sys

    def fail(message: str) -> None:
        """Print *message* to stderr and exit with status 1."""
        print(message, file=sys.stderr)
        raise SystemExit(1)

    args = parse_args()
    configure_logging(args.log_level)
    logger.info("Starting ClosestUnvisitedAirport")
    logger.debug(f"Home airport: {args.home_airport}")

    # Load visited airports from LogTen Pro
    logger.info("Loading visited airports from LogTen Pro")
    try:
        logten_reader = LogTenReader(args.logbook)
        visited_airports = logten_reader.fetch_visited_airports(
            exclude_simulators=not args.include_simulators
        )
    except FileNotFoundError as e:
        fail(f"Error: {e}")
    except sqlite3.Error as e:
        # e.g. the file is not a LogTen database or lacks the expected tables
        fail(f"Error reading LogTen database: {e}")
    logger.debug(f"Found {visited_airports.count} visited airports")

    # Load NASR airport data
    logger.info("Loading NASR airport data")
    nasr_loader = NASRLoader()
    try:
        airports = nasr_loader.load_airports()
    except Exception as e:
        # Top-level boundary: network, archive and parsing errors all end here
        fail(f"Error loading NASR data: {e}")
    logger.debug(f"Loaded {len(airports)} airports from NASR")

    # Find home airport
    home_airport = nasr_loader.find_airport(args.home_airport)
    if not home_airport:
        fail(
            f"Error: Airport '{args.home_airport}' not found in NASR database.\n"
            "Verify the airport code is correct. Use either the FAA LID (e.g., 'OAK') "
            "or ICAO code (e.g., 'KOAK')."
        )
    logger.info(f"Home airport: {home_airport.identifier} - {home_airport.name}")

    # Set up filters
    filters = AirportFilters(
        exclude_private=not args.include_private,
        exclude_military=not args.include_military,
        exclude_heliports=not args.include_heliports,
        exclude_seaports=not args.include_seaports,
        exclude_balloonports=not args.include_balloonports,
        exclude_gliderports=not args.include_gliderports,
        exclude_ultralights=not args.include_ultralights,
    )

    # Find closest unvisited airports
    logger.info("Finding closest unvisited airports")
    calculator = DistanceCalculator(airports, visited_airports)
    closest = calculator.find_closest_unvisited(
        home_airport.latitude,
        home_airport.longitude,
        count=args.count,
        filters=filters,
    )
    logger.debug(f"Found {len(closest)} closest unvisited airports")

    # Display results
    if not closest:
        print("No unvisited airports found matching your criteria.")
        return
    print(f"Closest unvisited airports from {home_airport.identifier}:")
    print()
    for i, airport in enumerate(closest, 1):
        print(f"{i}. {airport.identifier} {airport.name}: {airport.distance_nm:.1f} nmi")
# Need to import timedelta
# NASRLoader._get_current_cycle_url refers to timedelta. This import sits at
# the bottom of the module rather than with the other imports; it still runs
# when the module is loaded, which is before main() is called below.
from datetime import timedelta

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment