Last active
June 2, 2020 12:38
-
-
Save viiru-/4da114fe94eb2cea2f4f7c708e8ef3b6 to your computer and use it in GitHub Desktop.
Script for loading electricity consumption data from several local power companies into InfluxDB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import sys | |
import asyncio | |
import json | |
import datetime | |
from urllib.parse import urljoin | |
from typing import NamedTuple | |
import aiohttp | |
import bs4 | |
import pytz | |
import yaml | |
from aioinflux import InfluxDBClient, lineprotocol, TIMEDT, MEASUREMENT, TAG, FLOAT | |
# Paths on the power company's online service; each is joined onto the
# configured base_url before use.
# Page that serves the login form (and the CSRF token embedded in it).
FORM_URL = '/eServices/Online/IndexNoAuth'
# Endpoint the login form fields are POSTed to.
LOGIN_URL = '/eServices/Online/Login'
# Endpoint that is POSTed to in order to retrieve the consumption JSON.
USAGE_URL = '/Reporting/CustomerConsumption/GetHourlyConsumption'
# Timezone used to interpret the timestamps found in the service's data.
LOCALTIME = pytz.timezone('Europe/Helsinki')
@lineprotocol
class PowerUsage(NamedTuple):
    """Definition of the Influxdb power measurement schema.

    One instance is built for every (timestamp, usage) pair of a
    consumption series in the service's JSON response.
    """
    # Time of the reading, as produced by convert_timestamp().
    timestamp: TIMEDT
    # Consumption value exactly as reported by the service.
    total_usage: FLOAT
    # Name of the series the reading belongs to, stored as a tag.
    tariff: TAG
    # Measurement name the points are written under.
    name: MEASUREMENT = 'power_usage'
@lineprotocol
class Temperature(NamedTuple):
    """Definition of the Influxdb temperature measurement schema.

    One instance is built for every (timestamp, temperature) pair of the
    temperature series in the service's JSON response.
    """
    # Time of the reading, as produced by convert_timestamp().
    timestamp: TIMEDT
    # Temperature value exactly as reported by the service.
    temperature: FLOAT
    # Name of the temperature series, stored as a tag.
    temperature_location: TAG
    # Measurement name the points are written under.
    name: MEASUREMENT = 'temperature'
async def get_login_token(session, config):
    """Parse the CSRF token from the login form.

    Args:
        session: aiohttp client session used to fetch the form page.
        config: service configuration; only ``config['base_url']`` is read.

    Returns:
        The ``value`` attribute of the ``__RequestVerificationToken`` input
        inside the element with id ``loginform``.

    Raises:
        RuntimeError: if the page does not contain the login form or the
            token input (for example when the service layout has changed).
        Whatever ``raise_for_status()`` raises on an HTTP error status.
    """
    form_url = urljoin(config['base_url'], FORM_URL)
    async with session.get(form_url) as login_form:
        login_form.raise_for_status()
        content = await login_form.text()

    soup = bs4.BeautifulSoup(content, features='html.parser')
    form_html = soup.find(id='loginform')
    # find() returns None when nothing matches; fail with a clear message
    # instead of an opaque AttributeError/TypeError further down.
    if form_html is None:
        raise RuntimeError('Login form not found at {}'.format(form_url))

    token_input = form_html.find('input', attrs={'name': '__RequestVerificationToken'})
    if token_input is None or token_input.get('value') is None:
        raise RuntimeError('CSRF token not found in login form at {}'.format(form_url))
    return token_input['value']
async def login(session, config):
    """Authenticate against the service.

    Fetches a fresh CSRF token, then POSTs it together with the configured
    credentials to the login endpoint. The useful result is the cookies
    stored on ``session``; the response object is returned as well, but its
    context has already been exited by then.
    """
    csrf_token = await get_login_token(session, config)
    form_fields = {
        'UserName': config['username'],
        'Password': config['password'],
        '__RequestVerificationToken': csrf_token,
    }
    login_endpoint = urljoin(config['base_url'], LOGIN_URL)
    async with session.post(login_endpoint, data=form_fields) as response:
        response.raise_for_status()
    return response
async def fetch_consumption(session, config):
    """Request the consumption report and return it as parsed JSON.

    The metering point is identified by the ``customerCode``,
    ``networkCode`` and ``meteringPointCode`` entries of ``config``; the
    remaining form fields are fixed.
    """
    # Identification fields are copied from the configuration as-is.
    form_fields = {key: config[key]
                   for key in ('customerCode', 'networkCode', 'meteringPointCode')}
    # Fixed report options.
    form_fields.update({
        'enableTemperature': 'true',
        'enablePriceSeries': 'false',
        'enableTemperatureCorrectedConsumption': 'true',
        'mpSourceCompanyCode': '',
        'activeTarificationId': '',
    })
    endpoint = urljoin(config['base_url'], USAGE_URL)
    async with session.post(endpoint, data=form_fields) as response:
        response.raise_for_status()
        body = await response.text()
    return json.loads(body)
def convert_timestamp(timestamp):
    """Convert a service timestamp into a timezone-aware local datetime.

    The service uses a strange format: milliseconds from the epoch, but
    counted in local wall-clock time rather than UTC. The value is therefore
    first turned into a naive datetime and then marked as LOCALTIME without
    shifting it.

    Args:
        timestamp: milliseconds since 1970-01-01 00:00:00.

    Returns:
        A datetime localized to LOCALTIME.
    """
    # Plain epoch arithmetic yields the same naive value as the deprecated
    # datetime.utcfromtimestamp(timestamp / 1000), without going through a
    # float number of seconds.
    naive = datetime.datetime(1970, 1, 1) + datetime.timedelta(milliseconds=timestamp)
    return LOCALTIME.localize(naive)
def process_consumptions(data):
    """Yield one PowerUsage per data point of every consumption series.

    ``data`` is the list of consumption entries from the JSON response;
    each entry holds a ``Series`` with a ``Name`` (used as the tariff tag)
    and ``Data``, a sequence of (timestamp, usage) pairs.
    """
    for entry in data:
        tariff_name = entry['Series']['Name']
        yield from (
            PowerUsage(timestamp=convert_timestamp(epoch_ms),
                       tariff=tariff_name,
                       total_usage=amount)
            for epoch_ms, amount in entry['Series']['Data']
        )
def process_temperature(data):
    """Yield one Temperature per data point of the temperature series.

    ``data`` is the temperature entry from the JSON response: its ``Name``
    becomes the location tag and ``Data`` is a sequence of
    (timestamp, temperature) pairs.
    """
    location = data['Name']
    yield from (
        Temperature(timestamp=convert_timestamp(epoch_ms),
                    temperature=reading,
                    temperature_location=location)
        for epoch_ms, reading in data['Data']
    )
async def load_influx(data, config):
    """Write the consumption and temperature points into Influxdb.

    ``config`` supplies the ``database``, ``username`` and ``password``
    used to open the client; both writes are awaited together.
    """
    connection = {
        'db': config['database'],
        'username': config['username'],
        'password': config['password'],
    }
    async with InfluxDBClient(**connection) as influx:
        await asyncio.gather(
            influx.write(process_consumptions(data['Consumptions'])),
            influx.write(process_temperature(data['Temperature'])),
        )
async def main(config):
    """Fetch the consumption data, load it into Influxdb and print a summary.

    Args:
        config: parsed configuration with a ``service`` section (used for
            login and fetching) and an ``influxdb`` section (used for
            loading).

    After loading, the most recent data point of every consumption series
    and of the temperature series is printed. Series without any data are
    skipped rather than raising IndexError.
    """
    async with aiohttp.ClientSession() as session:
        await login(session, config['service'])
        data = await fetch_consumption(session, config['service'])
        await load_influx(data, config['influxdb'])

        # Report every series the service returned instead of assuming
        # there are exactly two.
        for entry in data['Consumptions']:
            series = entry['Series']
            if not series['Data']:
                continue
            latest = series['Data'][-1]
            print(series['Name'], convert_timestamp(latest[0]), latest[1])

        temperature = data['Temperature']
        if temperature['Data']:
            latest = temperature['Data'][-1]
            print('Temperature', temperature['Name'],
                  convert_timestamp(latest[0]),
                  latest[1])
if __name__ == '__main__':
    # The only command line argument is the path to the YAML configuration.
    if len(sys.argv) < 2:
        sys.exit('usage: {} CONFIG_YAML'.format(sys.argv[0]))
    with open(sys.argv[1]) as config_file:
        orig_config = yaml.safe_load(config_file)
    # asyncio.run() creates and closes the event loop itself; calling
    # get_event_loop() with no running loop is deprecated.
    asyncio.run(main(orig_config))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment