Discord webhook bot that generates and posts plots of the latest COVID-19 data from the JHU CSSE daily reports.
#!/usr/bin/python3
import argparse
import csv
import sys
import time
import datetime
import logging
import logging.handlers
import hashlib
import binascii
import traceback

# JournaldLogHandler comes from the unofficial `systemd` pip package;
# the official systemd-python package provides JournalHandler instead.
from systemd.journal import JournaldLogHandler

import requests

# discord.py 1.x API; RequestsWebhookAdapter was removed in discord.py 2.0.
from discord import Webhook, RequestsWebhookAdapter
from discord import File as DiscordFile

import matplotlib
matplotlib.use('Agg')  # non-interactive backend: render plots without a display
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
# Areas of interest: US states to track and plot
AOIs = ['New York', 'California', 'Washington']
# Set this to your Discord webhook URL before running
WEBHOOK_URL = 'TODO'
# Number of days of history to fetch from the JHU CSSE daily reports
NUMDAYS = 60
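# Webhook URLs normally take the form (id and token here are placeholders):
#   https://discord.com/api/webhooks/<id>/<token>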
# Log both to stdout and to the systemd journal
logger = logging.getLogger('covid19stat')
stdlog = logging.StreamHandler(sys.stdout)
logger.addHandler(stdlog)
journald_handler = JournaldLogHandler()
journald_handler.setFormatter(logging.Formatter('[%(levelname)s] %(message)s'))
logger.addHandler(journald_handler)
logger.setLevel(logging.INFO)
def get_daily(date):
    """Fetch the raw CSV bytes of the JHU CSSE daily report for `date`."""
    dt = date.strftime('%m-%d-%Y')
    url = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/{}.csv'.format(dt)
    r = requests.get(url)
    r.raise_for_status()
    return r.content
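# Example: get_daily(datetime.date(2020, 4, 1)) requests
#   .../csse_covid_19_daily_reports/04-01-2020.csv
# and raises requests.HTTPError (e.g. a 404) when no report exists yet.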
def process_daily(data):
    """Parse one daily report into a list of dicts with normalized keys."""
    lines = data.decode('utf-8-sig').splitlines()
    h = lines[0].split(',')
    # Support both of the following headers/CSV formats
    fmt1 = ['Province/State', 'Country/Region', 'Last Update', 'Confirmed', 'Deaths', 'Recovered', 'Latitude', 'Longitude']
    fmt2 = ['FIPS', 'Admin2', 'Province_State', 'Country_Region', 'Last_Update', 'Lat', 'Long_', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Combined_Key']
    # Header detection: compare the header row against each known format
    l1 = min(len(fmt1), len(h))
    l2 = min(len(fmt2), len(h))
    fmt = 1
    for i in range(l1):
        if fmt1[i] != h[i]:
            fmt = 2
            break
    if fmt == 2:
        for i in range(l2):
            if fmt2[i] != h[i]:
                fmt = 0
                break
    if fmt == 1:
        fieldnames = ['State', 'Country', 'LastUpdate', 'Confirmed', 'Deaths', 'Recovered', 'Latitude', 'Longitude']
    elif fmt == 2:
        fieldnames = ['FIPS', 'Admin2', 'State', 'Country', 'LastUpdate', 'Lat', 'Long', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Combined_Key']
    else:
        logger.warning(f'Unknown format: {h}')
        return None
    # Skip the header row: DictReader does not consume it when fieldnames is given
    cr = csv.DictReader(lines[1:], fieldnames=fieldnames)
    return list(cr)
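# Note: the older (fmt1) and newer (fmt2) JHU report layouts differ (fmt2 adds
# county-level FIPS/Admin2 rows), but both are mapped onto the shared 'State',
# 'Country', 'Confirmed', and 'Deaths' keys so callers can treat them uniformly.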
def do_plot(x, data, title):
    """Plot one time series per area and save the figure as '<title>.png'."""
    fig, ax = plt.subplots()
    locator = mdates.AutoDateLocator(minticks=3, maxticks=9)
    formatter = mdates.ConciseDateFormatter(locator)
    ax.xaxis.set_major_locator(locator)
    ax.xaxis.set_major_formatter(formatter)
    for area, y in data.items():
        ax.plot(x, y, '.-', label=area)
    filename = f'{title}.png'
    logger.debug(f'Saving {filename}')
    plt.title(title)
    plt.legend()
    plt.savefig(filename)
    plt.close(fig)  # free the figure; this script runs in a long-lived loop
    time.sleep(1)
    logger.info(f'Created new plot file {filename}')
    return filename
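# Design note: AutoDateLocator plus ConciseDateFormatter lets matplotlib pick
# readable date ticks automatically whether the x range spans days or months,
# which suits a plot whose date range grows over time.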
def process(multidata):
    """Build per-AOI cumulative confirmed/deaths series across all days."""
    confirmed = {}
    deaths = {}
    for aoi in AOIs:
        confirmed[aoi] = [0 for _ in multidata]
        deaths[aoi] = [0 for _ in multidata]
    for i, k in enumerate(multidata):
        data = process_daily(multidata[k])
        if data is None:
            logger.warning(f'Skipping {k}')
            continue
        # Keep only US rows for the states of interest
        y = [x for x in data if x['Country'] == 'US' and x['State'] in AOIs]
        for z in y:
            if z['State'] in AOIs:
                area = z['State']
            if z['Country'] in AOIs:
                area = z['Country']
            # Initialize zero-filled series for any area not pre-populated above
            if area not in confirmed:
                confirmed[area] = [0 for _ in multidata]
            if area not in deaths:
                deaths[area] = [0 for _ in multidata]
            try:
                # Sum across rows (fmt2 files have one row per county)
                confirmed[area][i] += int(z['Confirmed'])
            except ValueError:
                logger.warning(f'Invalid confirmed number for {area}: {z["Confirmed"]}')
            try:
                deaths[area][i] += int(z['Deaths'])
            except ValueError:
                logger.warning(f'Invalid deaths number for {area}: {z["Deaths"]}')
    dates = list(multidata.keys())
    return (dates, confirmed, deaths)
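# Note: `dates` relies on dict insertion order (guaranteed since Python 3.7);
# main() inserts days in sorted order, so the series come out chronological.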
def discord_msg(msg):
    webhook = Webhook.from_url(WEBHOOK_URL, adapter=RequestsWebhookAdapter())
    logger.debug(f'Discord message: {msg}')
    webhook.send(msg)
    time.sleep(1)

def discord_upload(f):
    webhook = Webhook.from_url(WEBHOOK_URL, adapter=RequestsWebhookAdapter())
    df = DiscordFile(f)
    logger.debug('Discord file upload')
    webhook.send(file=df)
    time.sleep(1)
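# The 1 s sleeps above look like a crude guard against Discord webhook rate
# limits (an assumption; the original does not say why they are there).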
def do_confirmed(dates, confirmed, do_discord):
    filename = do_plot(dates, confirmed, 'Confirmed Cases')
    if do_discord:
        s = 'Confirmed Cases:\n'
        for area, series in confirmed.items():
            s += f' {area} {series[-1]}\n'
        discord_msg(s)
        with open(filename, 'rb') as f:
            discord_upload(f)

def do_deaths(dates, deaths, do_discord):
    filename = do_plot(dates, deaths, 'Deaths')
    if do_discord:
        s = 'Deaths:\n'
        for area, series in deaths.items():
            s += f' {area} {series[-1]}\n'
        discord_msg(s)
        with open(filename, 'rb') as f:
            discord_upload(f)
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--nodiscord', action='store_true')
    args = parser.parse_args()
    do_discord = not args.nodiscord
    hashes = {}  # SHA-256 digest of each daily CSV, keyed by date string
    saved_dates = []
    saved_confirmed = {}
    saved_deaths = {}
    while True:
        data = {}
        update = False
        logger.debug('running...')
        # Use calendar dates (not timestamps) so the key set is stable across loops
        today = datetime.date.today()
        days = sorted([today - datetime.timedelta(days=x) for x in range(NUMDAYS)])
        for day in days:
            try:
                k = day.strftime('%m%d%Y')
                a = get_daily(day)
                data[day] = a
                m = hashlib.sha256()
                m.update(a)
                h = m.digest()
                if k in hashes and h != hashes[k]:
                    logger.info(f'Hash updated for {k}: {binascii.hexlify(hashes[k])} {binascii.hexlify(h)}')
                    hashes[k] = h
                    update = True
                elif k not in hashes:
                    logger.info(f'New date {k}: {binascii.hexlify(h)}')
                    hashes[k] = h
                    update = True
                else:
                    logger.debug(f'No new information for {k}')
            except requests.exceptions.HTTPError:
                # The newest day's report may not be published yet; skip it
                pass
            except Exception as e:
                logger.error(f'Exception encountered: {e}')
        if update:
            (dates, confirmed, deaths) = process(data)
            if saved_dates != dates:
                logger.info('Dates list changed, processing')
                do_confirmed(dates, confirmed, do_discord)
                saved_confirmed = confirmed
                do_deaths(dates, deaths, do_discord)
                saved_deaths = deaths
                saved_dates = dates  # remember the date range we last plotted
            else:
                # Check the two series independently so both post if both changed
                if saved_deaths != deaths:
                    logger.info('Deaths list changed, processing')
                    do_deaths(dates, deaths, do_discord)
                    saved_deaths = deaths
                if saved_confirmed != confirmed:
                    logger.info('Confirmed list changed, processing')
                    do_confirmed(dates, confirmed, do_discord)
                    saved_confirmed = confirmed
        logger.debug('sleeping...')
        time.sleep(60*60*12)  # poll every 12 hours
if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        logger.error(str(e))
        traceback.print_exc()
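Usage: set WEBHOOK_URL to your webhook, then run `./covid19stat.py` to poll the JHU CSSE repository every 12 hours and post updated plots, or `./covid19stat.py --nodiscord` to only write the PNG files locally. Dependencies: requests, matplotlib, discord.py 1.x (RequestsWebhookAdapter was removed in discord.py 2.0), and a systemd journal logging package providing JournaldLogHandler.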