Created
April 18, 2020 21:46
-
-
Save Mikuana/50765a471915c65d435496dbf8f3d5ba to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Use the speedtest-cli package to perform a test of internet speeds, and log the
result to a CSV file. The results are broken up by month and stored in a folder
named `speed_tests`, unless you supply an alternate path in the first command
line argument.

```
python3 speed_testing.py /var/logs/my_speed_folder
```

This script is best used on a schedule.

Requires the speedtest-cli package:

```
pip install speedtest-cli
```
"""
import csv
import json
import sys
from datetime import datetime, timezone
from pathlib import Path

import speedtest
# Column order shared by the header row and every data row of a monthly log.
COLUMNS = (
    'timestamp', 'download', 'upload', 'ping', 'notes',
    'bytes_sent', 'bytes_received', 'client', 'server',
)

# Layout of the 'timestamp' string found in the speedtest results dict.
TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'


def run_speed_test():
    """Run one speed test and return its results as a dict ready for logging.

    The 'timestamp' entry is parsed into a ``datetime``, 'client' and 'server'
    are serialised to JSON strings so each fits in a single CSV cell, and
    'notes' is set to ``None`` to mark a run that completed without error.

    Raises:
        Exception: whatever the speedtest library raises when any step fails;
            the caller is expected to turn that into a failure row.
    """
    tester = speedtest.Speedtest()
    tester.get_servers([])
    tester.get_best_server()
    tester.download()
    tester.upload()
    # NOTE(review): share() presumably submits the result to speedtest.net;
    # nothing it produces is written to the log -- confirm it is still wanted.
    tester.results.share()

    results = tester.results.dict()
    results['timestamp'] = datetime.strptime(
        results['timestamp'], TIMESTAMP_FORMAT
    )
    results['notes'] = None
    results['client'] = json.dumps(results['client'])
    results['server'] = json.dumps(results['server'])
    return results


def failure_row(error, when):
    """Build the log row for a run that raised ``error`` at time ``when``.

    Every measurement column is ``None`` (written as an empty cell). 'notes'
    holds the exception type and message, so an exception raised without a
    message still leaves a non-empty, identifiable note.
    """
    row = dict.fromkeys(COLUMNS)
    row['timestamp'] = when
    message = str(error)
    kind = type(error).__name__
    row['notes'] = '{}: {}'.format(kind, message) if message else kind
    return row


def append_row(path, row):
    """Append ``row`` to the CSV file at ``path``, writing the header if needed.

    The header is written when the file is missing *or* empty. Keys of ``row``
    that are not in ``COLUMNS`` are ignored; missing columns are left blank.
    """
    needs_header = not path.exists() or path.stat().st_size == 0
    # newline='' lets the csv module control line endings itself; without it
    # rows end in '\r\r\n' on Windows. UTF-8 keeps a non-ASCII error message
    # from failing to encode on a legacy locale.
    with path.open('a', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, COLUMNS, extrasaction='ignore')
        if needs_header:
            writer.writeheader()
        writer.writerow(row)


def main(argv=None):
    """Run a speed test and append the outcome to this month's CSV log.

    Args:
        argv: command-line arguments (defaults to ``sys.argv``). ``argv[1]``,
            when present, is the log directory; otherwise ``speed_tests``.

    Returns:
        The ``Path`` of the CSV file that was appended to.
    """
    argv = sys.argv if argv is None else argv
    # Naive UTC start time: names the monthly file and timestamps failures.
    started = datetime.now(timezone.utc).replace(tzinfo=None)

    try:
        row = run_speed_test()
    except Exception as error:  # log any failure as a row instead of crashing
        row = failure_row(error, started)

    log_dir = Path(argv[1] if len(argv) > 1 else 'speed_tests')
    log_dir.mkdir(parents=True, exist_ok=True)
    log_path = log_dir / ('speed_tests_' + started.strftime('%Y%m') + '.csv')
    append_row(log_path, row)
    return log_path


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment