Created
December 10, 2022 23:27
-
-
Save jgeboski/56eb625bc9072c5f11dde164cc7b9cb8 to your computer and use it in GitHub Desktop.
Prometheus collector for network interface addresses with timing data. Useful with a cron job on pfSense to export WAN addresses via the node exporter.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.8 | |
import argparse | |
import logging | |
import os | |
import re | |
import subprocess | |
import time | |
from datetime import datetime | |
from ipaddress import ip_address, IPv4Address, IPv6Address | |
from typing import Any, Dict, NamedTuple, Optional, Set, Union | |
# Interface name -> set of addresses currently assigned to that interface.
IfAddrs = Dict[str, Set[Union[IPv4Address, IPv6Address]]]

# Interface name -> {address: integer Unix timestamp recorded for it}.
IfAddrTimes = Dict[str, Dict[Union[IPv4Address, IPv6Address], int]]

# Name of the single metric this script reads and writes.
METRIC_NAME = "node_network_address_assigned_seconds"

# Matches one sample line: a metric name, an optional brace-enclosed label
# list and an unsigned decimal value.
PROMETHEUS_METRIC_PATTERN = re.compile(
    r"^(?P<name>\w+)\s*(?:{(?P<labels>.+)})?\s*(?P<value>\d+(?:\.\d+)?)$"
)

logger = logging.getLogger(__name__)
class PrometheusMetric(NamedTuple):
    """One sample line of a Prometheus text file: name, labels and value.

    Parsing is deliberately simple: the label list is split on ``,`` and
    each pair on the first ``=``, so label values containing commas or
    escaped quotes are not supported.
    """

    name: str
    value: float
    labels: Dict[str, str]

    def __str__(self) -> str:
        """Render the sample as ``name{k="v",...} value``, labels sorted by name."""
        labels_str = ",".join(
            f'{label}="{value}"' for label, value in sorted(self.labels.items())
        )
        return f"{self.name}{{{labels_str}}} {self.value}"

    @staticmethod
    def from_str(line: str) -> Optional["PrometheusMetric"]:
        """Parse a single text-format line into a metric.

        Args:
            line: One line of a ``.prom`` file; surrounding whitespace is
                ignored.

        Returns:
            The parsed metric, or ``None`` when the line is a comment or
            does not look like a well-formed sample.
        """
        line = line.strip()
        if line.startswith("#"):
            return None

        metric_match = PROMETHEUS_METRIC_PATTERN.match(line)
        if not metric_match:
            return None

        labels: Dict[str, str] = {}
        # The label group is optional in the pattern, so it is None for a
        # bare "name value" sample; treat that as "no labels".
        labels_str = metric_match.group("labels")
        label_strs = labels_str.split(",") if labels_str else []
        for label_str in label_strs:
            label_parts = label_str.split("=", 1)
            if len(label_parts) != 2:
                return None

            label = label_parts[0].strip()
            value_str = label_parts[1].strip()
            # The value must be wrapped in a pair of double quotes; a lone
            # quote character is not a valid quoted value.
            if (
                len(value_str) < 2
                or not value_str.startswith('"')
                or not value_str.endswith('"')
            ):
                return None

            labels[label] = value_str[1:-1]

        return PrometheusMetric(
            name=metric_match.group("name"),
            value=float(metric_match.group("value")),
            labels=labels,
        )
def get_if_addrs(ifconfig_output: Optional[str] = None) -> IfAddrs:
    """Collect the addresses assigned to each network interface.

    Args:
        ifconfig_output: Text in ``ifconfig`` output format to parse. When
            omitted, ``ifconfig`` is executed and its stdout is parsed.

    Returns:
        Mapping of interface name to the set of addresses found on its
        ``inet``/``inet6`` rows. Interfaces without such rows map to an
        empty set.

    Raises:
        subprocess.CalledProcessError: If ``ifconfig`` exits non-zero.
        ValueError: If an address row is malformed, appears before any
            interface header, or carries an unparsable address.
    """
    if ifconfig_output is None:
        proc = subprocess.run(["ifconfig"], stdout=subprocess.PIPE, check=True)
        # Undecodable bytes elsewhere in the output must not abort the run.
        ifconfig_output = proc.stdout.decode(errors="replace")

    if_addrs: IfAddrs = {}
    if_name: Optional[str] = None
    for line in ifconfig_output.splitlines():
        words = line.split()
        if not words:
            continue

        row_type = words[0]
        if row_type.endswith(":"):
            row_type = row_type[:-1]

        if row_type in ("inet", "inet6"):
            # Explicit checks instead of asserts so they survive ``python -O``.
            if len(words) < 2:
                raise ValueError(f"Invalid inet line: {line}")
            if not if_name:
                raise ValueError(f"No if name for line: {line}")

            # FreeBSD appends %{if_name} to IPv6 addresses
            addr_str = words[1].split("%", 1)[0]
            if_addrs[if_name].add(ip_address(addr_str))
        elif not line[0].isspace():
            # A non-indented row starts a new interface section.
            if_name = row_type
            if_addrs[if_name] = set()

    return if_addrs
def filter_public_if_addrs(if_addrs: IfAddrs) -> IfAddrs:
    """Keep only globally routable addresses.

    Interfaces left without any global address are omitted from the result;
    the input mapping is not modified.
    """
    globals_by_if = (
        (name, {candidate for candidate in candidates if candidate.is_global})
        for name, candidates in if_addrs.items()
    )
    return {name: kept for name, kept in globals_by_if if kept}
def update_if_addr_times(if_addr_times: IfAddrTimes, if_addrs: IfAddrs) -> IfAddrTimes:
    """Build the timing table for the addresses that are assigned right now.

    An address already present in ``if_addr_times`` for the same interface
    keeps its recorded time; any other address is stamped with the current
    Unix time. Addresses and interfaces absent from ``if_addrs`` are dropped,
    as are interfaces with no addresses.
    """
    stamp = int(time.time())
    refreshed: IfAddrTimes = {}
    for name, current_addrs in if_addrs.items():
        if not current_addrs:
            continue
        known = if_addr_times.get(name, {})
        refreshed[name] = {
            address: known.get(address, stamp) for address in current_addrs
        }
    return refreshed
def load_if_addr_times(prom_file: str) -> IfAddrTimes:
    """Load previously exported address times from a Prometheus text file.

    Blank lines and ``#`` comment lines are skipped silently. Any other line
    that is not a well-formed ``METRIC_NAME`` sample carrying both the
    ``addr`` and ``if_name`` labels (``addr_type`` is optional, nothing else
    is allowed) is logged and ignored.

    Args:
        prom_file: Path of the file to read.

    Returns:
        The recorded time of every address, grouped by interface name.
        Empty when the file does not exist.
    """
    required_labels = {"addr", "if_name"}
    allowed_labels = required_labels | {"addr_type"}

    if_addr_times: IfAddrTimes = {}
    # Open first and handle the miss, rather than checking for existence and
    # racing with whatever else may touch the file.
    try:
        fp = open(prom_file)
    except FileNotFoundError:
        return if_addr_times

    with fp:
        for line in fp:
            stripped_line = line.strip()
            if not stripped_line or stripped_line.startswith("#"):
                continue

            metric = PrometheusMetric.from_str(stripped_line)
            if (
                metric is None
                or metric.name != METRIC_NAME
                # Missing labels would otherwise surface as a KeyError below.
                or not required_labels <= set(metric.labels) <= allowed_labels
            ):
                logger.warning(
                    "Ignoring bad metric in %s: %s", prom_file, stripped_line
                )
                continue

            if_name = metric.labels["if_name"]
            try:
                addr = ip_address(metric.labels["addr"])
                acquire_time = int(metric.value)
            except ValueError:
                logger.warning(
                    "Ignoring bad metric in %s: %s", prom_file, stripped_line
                )
                continue

            if_addr_times.setdefault(if_name, {})[addr] = acquire_time

    return if_addr_times
def write_if_addr_times(if_addr_times: IfAddrTimes, prom_file: str) -> None:
    """Write the address times to ``prom_file`` as Prometheus text metrics.

    One ``METRIC_NAME`` sample is written per address, labelled with the
    address, its interface name and its IP version. Missing parent
    directories are created. The data is written to ``<prom_file>.tmp`` and
    then renamed over the target so a concurrent reader never sees a
    partially written file.

    Args:
        if_addr_times: Recorded times grouped by interface name.
        prom_file: Destination path; a bare file name is written to the
            current directory.
    """
    prom_dir = os.path.dirname(prom_file)
    # dirname is "" for a bare file name, and os.makedirs("") raises.
    if prom_dir:
        os.makedirs(prom_dir, exist_ok=True)

    tmp_file = f"{prom_file}.tmp"
    try:
        with open(tmp_file, "w") as fp:
            for if_name, addr_times in if_addr_times.items():
                for addr, acquire_time in addr_times.items():
                    metric = PrometheusMetric(
                        name=METRIC_NAME,
                        value=float(acquire_time),
                        labels={
                            "addr": f"{addr}",
                            "if_name": if_name,
                            "addr_type": f"IPv{addr.version}",
                        },
                    )
                    fp.write(f"{metric}\n")
        os.replace(tmp_file, prom_file)
    except Exception:
        # Do not leave a stale temporary file behind on failure.
        try:
            os.remove(tmp_file)
        except OSError:
            pass
        raise
def parse_args() -> Any:
    """Parse the command line and return the resulting argparse namespace.

    The namespace carries ``prom_file`` (required path), ``only_public`` and
    ``verbose`` (both boolean flags).
    """
    # (flags, add_argument keyword options), in the order shown by --help.
    option_table = (
        (
            ("--prom-file", "-f"),
            {
                "action": "store",
                "help": "File path for the PROM timing data.",
                "required": True,
            },
        ),
        (
            ("--only-public", "-p"),
            {"action": "store_true", "help": "Only export public addresses."},
        ),
        (
            ("--verbose", "-v"),
            {"action": "store_true", "help": "Show verbose logging messages."},
        ),
    )

    cli = argparse.ArgumentParser(
        description="Export interface addresses with timing for Prometheus",
    )
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    return cli.parse_args()
def main() -> None:
    """Refresh the Prometheus file with the host's current addresses.

    Reads the current interface addresses, optionally keeps only the public
    ones, merges them with the times recorded in ``--prom-file`` and, when
    the set of addresses changed, logs the difference and rewrites the file.
    """
    args = parse_args()
    logging.basicConfig(
        format="%(levelname)s: %(message)s",
        level=logging.DEBUG if args.verbose else logging.INFO,
    )

    if_addrs = get_if_addrs()
    if args.only_public:
        if_addrs = filter_public_if_addrs(if_addrs)

    if args.verbose:
        for if_name, addrs in if_addrs.items():
            for addr in addrs:
                logger.debug("Found address %s on %s", addr, if_name)

    # Count addresses, not interfaces: the mappings are keyed by interface.
    logger.info(
        "Found %s addresses on the host",
        sum(len(addrs) for addrs in if_addrs.values()),
    )

    if_addr_times = load_if_addr_times(args.prom_file)
    if args.verbose:
        for if_name, addr_times in if_addr_times.items():
            for addr, acquire_time in addr_times.items():
                logger.debug(
                    "Loaded previous address %s on %s from %s",
                    addr,
                    if_name,
                    datetime.utcfromtimestamp(acquire_time).strftime("%c"),
                )

    logger.info(
        "Loaded %s previous addresses",
        sum(len(addr_times) for addr_times in if_addr_times.values()),
    )

    updated_if_addr_times = update_if_addr_times(if_addr_times, if_addrs)
    if updated_if_addr_times != if_addr_times:
        old_addrs = {
            (if_name, f"{addr}")
            for if_name, addr_times in if_addr_times.items()
            for addr in addr_times
        }
        new_addrs = {
            (if_name, f"{addr}")
            for if_name, addr_times in updated_if_addr_times.items()
            for addr in addr_times
        }

        added_addrs = sorted(new_addrs - old_addrs)
        for if_name, str_addr in added_addrs:
            logger.info("Added address %s on %s", str_addr, if_name)

        removed_addrs = sorted(old_addrs - new_addrs)
        for if_name, str_addr in removed_addrs:
            logger.info("Removed address %s on %s", str_addr, if_name)

        logger.info(
            "Added %s new addresses and removed %s old addresses",
            len(added_addrs),
            len(removed_addrs),
        )
        write_if_addr_times(updated_if_addr_times, args.prom_file)
# Run the exporter only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
pfSense Cron job: