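"""Poll an OpenVPN status file and publish connection and per-user traffic
metrics to AWS CloudWatch.

Requires Python 3.8+ (uses assignment expressions) and boto3.
"""
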
import argparse
import csv
import logging
import queue
import signal
import sys
import threading
import time
from collections import defaultdict
from datetime import datetime, timedelta
from pprint import pprint

import boto3

log = logging.getLogger('openvpn-monitor')

def batch(items, chunk_size):
    """Yield successive chunk_size-sized slices of items."""
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]


def throttle(delay):
    """Yield immediately, then once every delay seconds thereafter."""
    while True:
        yield True
        time.sleep(delay)

def parse_status_file(fp):
    """Parse an OpenVPN status file (version 3, tab-delimited)."""
    title = None
    status_time = None
    clients = []
    routes = []
    headers = {}
    for row in csv.reader(fp, delimiter='\t'):
        row_type = row[0]
        if row_type == 'END':
            break
        elif row_type == 'TITLE':
            title = row[1]
        elif row_type == 'TIME':
            # TIME rows carry a human-readable date in row[1] and the
            # corresponding time_t in row[2]
            status_time = datetime.fromtimestamp(int(row[2]))
        elif row_type == 'HEADER':
            headers[row[1]] = row[2:]
            log.debug('parsed table=%s header=%s', row[1], headers[row[1]])
        elif row_type == 'CLIENT_LIST':
            client = dict(zip(headers['CLIENT_LIST'], row[1:]))
            client['Connected Since'] = datetime.fromtimestamp(
                int(client['Connected Since (time_t)']),
            )
            client['Bytes Sent'] = int(client['Bytes Sent'])
            client['Bytes Received'] = int(client['Bytes Received'])
            clients.append(client)
        elif row_type == 'ROUTING_TABLE':
            route = dict(zip(headers['ROUTING_TABLE'], row[1:]))
            route['Last Ref'] = datetime.fromtimestamp(int(route['Last Ref (time_t)']))
            routes.append(route)
    return {
        'title': title,
        'time': status_time,
        'clients': clients,
        # a client with no virtual address has not finished authenticating
        'connecting_clients': [
            c
            for c in clients
            if c['Virtual Address'] == ''
        ],
        'connected_clients': [
            c
            for c in clients
            if (
                c['Virtual Address']
                # UNDEF means the client has failed to authenticate
                and c['Common Name'] != 'UNDEF'
                and c['Username'] != 'UNDEF'
            )
        ],
        'routes': routes,
    }

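# Illustrative sketch of the status-v3 input the parser above expects
# (tab-separated; rows and field lists abbreviated, not from a real server):
#
#   TITLE<TAB>OpenVPN 2.4.x ...
#   TIME<TAB><human-readable date><TAB><time_t>
#   HEADER<TAB>CLIENT_LIST<TAB>Common Name<TAB>...<TAB>Username<TAB>Client ID<TAB>...
#   CLIENT_LIST<TAB><common name><TAB>...
#   HEADER<TAB>ROUTING_TABLE<TAB>...
#   ROUTING_TABLE<TAB>...
#   END
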
def initialize_accumulator(status):
    """Create a fresh accumulator seeded from the given status snapshot."""
    acc = {
        'initial_time': status['time'],
        'last_time': status['time'],
        'last_status': status,
        'connected_client_ids': {c['Client ID'] for c in status['connected_clients']},
        'known_client_ids': {c['Client ID'] for c in status['clients']},
        'new_client_ids': set(),
        'new_connected_client_ids': set(),
        'failed_authentication_attempts': 0,
        'user': defaultdict(lambda: {
            'connected_client_ids': set(),
            'net_bytes_sent': 0,
            'net_bytes_recv': 0,
        }),
    }
    accumulate_stats(status, acc)
    return acc

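# Note: main() rebuilds the accumulator from the latest status after every
# report, so interval counters (new clients, failed auths, byte deltas)
# restart from zero each reporting period.
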
def accumulate_stats(status, acc):
    """Fold a new status snapshot into the accumulator."""
    last_status = acc['last_status']
    acc['last_time'] = status['time']
    acc['last_status'] = status
    connected_client_ids = {c['Client ID'] for c in status['connected_clients']}
    connecting_client_ids = {c['Client ID'] for c in status['connecting_clients']}
    # if a previously connecting client fell out of connecting, determine if it
    # was successful or a failed authentication attempt
    for c in last_status['connecting_clients']:
        cid = c['Client ID']
        if cid not in connected_client_ids and cid not in connecting_client_ids:
            log.debug(
                'detected authentication failure from client=%s, username=%s',
                cid, c['Username'],
            )
            acc['failed_authentication_attempts'] += 1
    acc['connected_client_ids'] = connected_client_ids
    acc['new_client_ids'].update(
        c['Client ID']
        for c in status['clients']
        if c['Client ID'] not in acc['known_client_ids']
    )
    acc['known_client_ids'].update(acc['new_client_ids'])
    # if the new client id ends up connected, count it as a new connection
    for cid in acc['new_client_ids']:
        if cid in acc['connected_client_ids']:
            log.debug('detected new connected client id=%s', cid)
            acc['new_connected_client_ids'].add(cid)
    # track per-user byte counts as deltas relative to the previous poll
    for client in status['connected_clients']:
        client_bytes_sent = client['Bytes Sent']
        client_bytes_recv = client['Bytes Received']
        last_client = next(
            (c for c in last_status['clients'] if c['Client ID'] == client['Client ID']),
            None,
        )
        if last_client is not None:
            client_bytes_sent -= last_client['Bytes Sent']
            client_bytes_recv -= last_client['Bytes Received']
        username = client['Username']
        stats = acc['user'][username]
        stats['connected_client_ids'].add(client['Client ID'])
        stats['net_bytes_sent'] += client_bytes_sent
        stats['net_bytes_recv'] += client_bytes_recv

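# Known limitation of this delta approach: bytes transferred by a client that
# disconnects between two polls are never counted, because deltas are only
# computed for clients still present in connected_clients.
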
def generate_metrics(acc, timestamp, extra_dimensions):
    metrics = []
    metrics.append({
        'MetricName': 'total_connected_clients',
        'Values': [len(acc['connected_client_ids'])],
        'Timestamp': timestamp,
        'Unit': 'Count',
        'Dimensions': extra_dimensions,
    })
    metrics.append({
        'MetricName': 'new_connected_clients',
        'Values': [len(acc['new_connected_client_ids'])],
        'Timestamp': timestamp,
        'Unit': 'Count',
        'Dimensions': extra_dimensions,
    })
    metrics.append({
        'MetricName': 'new_connection_attempts',
        'Values': [len(acc['new_client_ids'])],
        'Timestamp': timestamp,
        'Unit': 'Count',
        'Dimensions': extra_dimensions,
    })
    metrics.append({
        'MetricName': 'failed_authentication_attempts',
        'Values': [acc['failed_authentication_attempts']],
        'Timestamp': timestamp,
        'Unit': 'Count',
        'Dimensions': extra_dimensions,
    })

    def record_user_metric(user, name, unit, value):
        metrics.append({
            'MetricName': name,
            'Dimensions': [
                {
                    'Name': 'Username',
                    'Value': user,
                },
                *extra_dimensions,
            ],
            'Timestamp': timestamp,
            'Values': [value],
            'Unit': unit,
        })

    for user, stats in acc['user'].items():
        record_user_metric(
            user, 'connected_clients', 'Count', len(stats['connected_client_ids']))
        record_user_metric(user, 'net_bytes_recv', 'Bytes', stats['net_bytes_recv'])
        record_user_metric(user, 'net_bytes_sent', 'Bytes', stats['net_bytes_sent'])
    return metrics

def upload_main(*, q, namespace, extra_dimensions, exc_info, dry_run=False):
    try:
        cw = boto3.client('cloudwatch')
        while (msg := q.get()) is not None:
            now, acc = msg
            metrics = generate_metrics(acc, now, extra_dimensions)
            if dry_run:
                pprint(metrics)
                continue
            # PutMetricData accepts a limited number of datums per request,
            # so upload in chunks of 20
            for chunk in batch(metrics, 20):
                try:
                    cw.put_metric_data(
                        Namespace=namespace,
                        MetricData=chunk,
                    )
                except Exception:
                    log.exception('failed to upload metric data')
                    continue
    except BaseException:
        log.exception('unhandled error occurred uploading metrics')
        exc_info.extend(sys.exc_info())

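# Uploads run on a background thread fed by a queue so that a slow or failing
# CloudWatch call cannot delay status-file polling; main() enqueues None as a
# sentinel to request shutdown, and any unhandled error is reported back to
# the main thread through exc_info.
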
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--status-file', required=True)
    parser.add_argument('--poll-interval', type=int, default=5)
    parser.add_argument('--cw-interval', type=int, default=60)
    parser.add_argument('--cw-namespace', default='OpenVPN')
    parser.add_argument('--dim', dest='dimensions', action='append', default=[])
    parser.add_argument('--dry-run', action='store_true')
    parser.add_argument('-v', '--verbose', action='count', default=0)
    args = parser.parse_args()

    root_log_level = logging.WARNING
    local_log_level = logging.INFO
    if args.verbose >= 1:
        local_log_level = logging.DEBUG
    if args.verbose >= 2:
        root_log_level = logging.INFO
    if args.verbose >= 3:
        root_log_level = logging.DEBUG
    log.setLevel(local_log_level)
    logging.basicConfig(level=root_log_level)

    # --dim entries are Name=Value pairs forwarded as CloudWatch dimensions
    extra_dimensions = []
    for entry in args.dimensions:
        name, value = entry.split('=', 1)
        extra_dimensions.append({
            'Name': name,
            'Value': value,
        })

    q = queue.Queue()
    upload_exc_info = []
    upload_thread = threading.Thread(
        target=upload_main,
        kwargs=dict(
            q=q,
            dry_run=args.dry_run,
            namespace=args.cw_namespace,
            extra_dimensions=extra_dimensions,
            exc_info=upload_exc_info,
        ),
    )
    upload_thread.daemon = True
    upload_thread.start()

    stop = False
    report_interval = timedelta(seconds=args.cw_interval)
    last_report = datetime.utcnow()
    acc = None

    def handle_stop(*a, **kw):
        nonlocal stop
        stop = True
    signal.signal(signal.SIGTERM, handle_stop)
    signal.signal(signal.SIGINT, handle_stop)

    for _ in throttle(args.poll_interval):
        if stop or not upload_thread.is_alive():
            break
        with open(args.status_file) as fp:
            try:
                status = parse_status_file(fp)
            except Exception:
                log.exception('failed to parse openvpn status')
                continue
        if acc is None:
            acc = initialize_accumulator(status)
            continue
        if acc['last_time'] != status['time']:
            accumulate_stats(status, acc)
            log.debug("accumulator=%s", acc)
        now = datetime.utcnow()
        if now - last_report > report_interval or stop:
            q.put((now, acc))
            acc = initialize_accumulator(status)
            last_report = now

    log.info('shutting down...')
    q.put(None)
    upload_thread.join()
    if upload_exc_info:
        return 1
    return 0


if __name__ == '__main__':
    raise SystemExit(main())
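# Example invocation (the script filename, status-file path, and dimension
# value are illustrative; the flags match the argparse configuration above):
#
#   python openvpn_monitor.py \
#       --status-file /var/run/openvpn/server.status \
#       --cw-interval 60 \
#       --dim Server=vpn-1 \
#       --dry-run -vv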