Skip to content

Instantly share code, notes, and snippets.

@ed-davies
Created September 30, 2024 18:33
Show Gist options
  • Save ed-davies/9a4b6cb0b42f3eb875bfd0b310445ce1 to your computer and use it in GitHub Desktop.
Save ed-davies/9a4b6cb0b42f3eb875bfd0b310445ce1 to your computer and use it in GitHub Desktop.
Python program to download product and price information from the Octopus API
#!/usr/bin/env python3
# infundibulum.py
"""
Download data from the Octopus Energy API and store it in the mqttrunner
database.
Infundibulum is the Latin for funnel. It's used in various contexts in biology
including for the outer part of the suckers on an octopus's arms.
"""
import sys
import os
from os import path
import argparse
import requests
import datetime
import json
import re
import subprocess
import platform
from mqtt_utils import topic as topicLib
from mqtt_utils import rfc3339
from mqtt_utils.mctl import Mctl
from mqtt_utils import cache_dir
# Root of the Octopus Energy REST API.
BASE_URL = 'https://api.octopus.energy/v1/'
# Product listing endpoint; the starting point for finding a product's own URL.
PRODUCTS_URL = BASE_URL + 'products/'
# Product code used when --product is not given on the command line.
DEFAULT_PRODUCT = 'AGILE-23-12-06'
DEFAULT_GSP = '_P' # Grid supply point for northern Scotland.
# The kinds of charge this program can download. These strings are compared
# with the 'rel' field of tariff links returned by the API and are also used
# in cache keys and topic names, so their spelling matters.
CHARGE_TYPES = ['standing_charges', 'standard_unit_rates']
def parse_options(args):
    """ Parse the command-line arguments for this program.

        args is the list of argument strings without the program name
        (e.g. sys.argv[1:]).

        Returns the argparse namespace of parsed options. If --help was
        given the help text is printed and 0 is returned instead so the
        caller can exit.

        -h is used as the short form of --host, so argparse's automatic
        help option is switched off and --help is handled by hand here.
    """
    default_host = Mctl.default_host()
    parser = argparse.ArgumentParser(
        add_help=False,
        description=__doc__)
    parser.add_argument(
        '--help',
        action='store_true',
        help="Show this help message and exit")
    parser.add_argument(
        '--product', '-p',
        default=DEFAULT_PRODUCT,
        help=f'Octopus Energy product code ({DEFAULT_PRODUCT})')
    parser.add_argument(
        '--gsp',
        default=DEFAULT_GSP,
        help=f'Grid supply point ({DEFAULT_GSP})')
    parser.add_argument(
        '--unit-rate',
        action='store_const',
        dest='charge_type',
        # Must be spelt exactly as in CHARGE_TYPES (underscores, not
        # hyphens) or the check at the end of this function fails
        # whenever the flag is given explicitly.
        const='standard_unit_rates',
        default='standard_unit_rates',
        help='Download unit rates (default)')
    parser.add_argument(
        '--standing-charge', '-s',
        action='store_const',
        dest='charge_type',
        const='standing_charges',
        help='Download standing charges')
    parser.add_argument(
        '--conditional',
        action='store_true',
        help='Only do download if there\'s no data of this sort already in '
             'the mqttrunner database for tomorrow morning')
    parser.add_argument(
        '--product-list',
        action='store_const',
        dest='action',
        const='product-list',
        default='download-rates',
        help='Download current and historical products and store in products.json')
    parser.add_argument(
        '--host', '-h',
        default=default_host,
        help='Host to send command to (%s)' % (default_host,))
    parser.add_argument(
        '--localhost', '-l',
        action='store_const',
        const='localhost',
        dest='host',
        help='Shorthand for “--host localhost”')
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Chat about what we\'re doing as we do it')

    options = parser.parse_args(args)
    if options.help:
        parser.print_help()
        return 0

    # Internal consistency check: both flags that set charge_type must
    # store one of the known charge types.
    assert options.charge_type in CHARGE_TYPES
    return options
class KVCache:
    """ Small persistent key-value mapping kept in a JSON file.

        It holds information assumed to be stable between runs so the
        Octopus servers are not asked for it more often than necessary.
        The mapping protocol methods delegate to an in-memory dict; any
        change marks the cache dirty and save() writes it back to disk.
    """

    def __init__(self, filename='infundibulum.cache.json'):
        """ Load the cache from filename in the cache directory, starting
            empty if the file does not exist yet.
        """
        self.filename = path.join(cache_dir.cache_dir(), filename)
        # Nothing to write back until something is changed.
        self.clean = True
        try:
            with open(self.filename, 'rt') as f:
                self.data = json.load(f)
        except FileNotFoundError:
            self.data = dict()

    def save(self):
        """ Write the cache to its file, but only if it has been modified. """
        if self.clean:
            return
        with open(self.filename, 'wt') as f:
            json.dump(self.data, f, ensure_ascii=False, indent=4)
        self.clean = True

    def __len__(self):
        return len(self.data)

    def __getitem__(self, key):
        return self.data[key]

    def __setitem__(self, key, value):
        self.data[key] = value
        self.clean = False

    def __delitem__(self, key):
        # A missing key raises before the cache is marked dirty.
        del self.data[key]
        self.clean = False

    def __iter__(self):
        return iter(self.data)

    def __contains__(self, key):
        return key in self.data
def product_url_key(product_code):
    """ Return the cache key under which the information URL for the given
        Octopus product code is stored.
    """
    return ''.join(('product.', product_code))
def charge_url_key(product_code, gsp, charge_type):
    """ Return the cache key under which the URL for one type of charge
        (one of CHARGE_TYPES) is stored for an Octopus product at a given
        grid supply point.
    """
    assert charge_type in CHARGE_TYPES
    parts = (product_url_key(product_code), 'gsp', gsp, charge_type)
    return '.'.join(parts)
def fetch(op, url, timeout=30):
    """ Fetch a JSON data file from the Octopus server and decode it from JSON.

        op supplies the run's options (only options.verbose is used here),
        url is the address to GET and timeout is the number of seconds
        passed to requests as its timeout.

        Returns the decoded JSON body. Raises ValueError if the response
        status is not 200; errors raised by requests itself (including
        timeouts) propagate unchanged.
    """
    if op.options.verbose:
        print('Fetching:', url, file=sys.stderr)
    # Without a timeout requests waits indefinitely on a stalled server,
    # which would hang an unattended run.
    r = requests.get(url, timeout=timeout)
    if r.status_code != 200:
        raise ValueError(f'GET of {url} returned status {r.status_code}')
    return r.json()
def fetch_product_data_url(op, product_code):
    """ Look up the URL of the data file for one product.

        The current product listing is searched first and then the
        historical one, following each listing's 'next' links page by
        page. Returns the href of the product's 'self' GET link. Raises
        ValueError if the product is listed without such a link (after
        dumping the entry to stderr) or is not listed at all.
    """
    listings = (PRODUCTS_URL, PRODUCTS_URL + '?is_historical=true')
    for page_url in listings:
        while page_url is not None:
            listing = fetch(op, page_url)
            for product in listing['results']:
                if product['code'] != product_code:
                    continue
                for link in product['links']:
                    if link['rel'] == 'self' and link['method'] == 'GET':
                        return link['href']
                # Product found but it has no usable link: show what we got.
                json.dump(product, sys.stderr, indent=True)
                print(file=sys.stderr)
                raise ValueError(f'No link found for {product_code} in {page_url}')
            page_url = listing['next']
    raise ValueError(f'Product code {product_code} not found')
def fetch_product_data(op, product_code):
    """ Download and return the decoded data for one Octopus product.

        The product's URL is taken from op.cache when present; otherwise
        it is looked up via the product listings and remembered in the
        cache before the data itself is fetched.
    """
    cache_key = product_url_key(product_code)
    if cache_key in op.cache:
        return fetch(op, op.cache[cache_key])
    located = fetch_product_data_url(op, product_code)
    op.cache[cache_key] = located
    return fetch(op, located)
def fetch_charge_data_url(op, product_code, gsp, charge_type):
    """ Find the URL of the file for the standing charge or unit rates for a
        particular Octopus product in a particular grid supply point area.
        This will be the URL of the most recent data. Older data will be found
        by following the 'next' links back in time.

        The URL comes from op.cache when already known. Otherwise the
        product data is fetched, the single tariff for the area is picked
        out and the URLs of all its CHARGE_TYPES links are cached.

        Raises ValueError if the product has no single register electricity
        tariffs, the grid supply point is unknown, there is not exactly one
        candidate tariff or the tariff has no link of the requested type.
        On any failure the most specific data obtained so far is dumped to
        stderr before the exception propagates.
    """
    product_data = None
    tariffs = None
    tariff = None
    url = None
    try:
        key = charge_url_key(product_code, gsp, charge_type)
        if key in op.cache:
            url = op.cache[key]
        else:
            product_data = fetch_product_data(op, product_code)
            # Only lookup failures are translated into ValueError; a bare
            # except here would also swallow KeyboardInterrupt/SystemExit.
            try:
                tariffs = product_data['single_register_electricity_tariffs']
            except (KeyError, TypeError) as err:
                raise ValueError(f'Single register electricity tariffs not found for {product_code}') from err
            try:
                tariffs = tariffs[gsp]
            except (KeyError, TypeError) as err:
                raise ValueError(f'Grid supply point {gsp} not found for {product_code}') from err
            tariffs = [tariffs[k]
                       for k in ('direct_debit_monthly', 'varying')
                       if k in tariffs]
            if not tariffs:
                raise ValueError(f'No tariffs found for {product_code} in area {gsp}')
            elif len(tariffs) > 1:
                raise ValueError(f'Multiple tariffs found for {product_code} in area {gsp}')
            tariff = tariffs[0]
            for link in tariff['links']:
                if link['method'] == 'GET':
                    rel = link['rel']
                    href = link['href']
                    if rel == charge_type:
                        # More than one matching link would be ambiguous.
                        assert url is None
                        url = href
                    if rel in CHARGE_TYPES:
                        # Remember the other charge type's URL too while
                        # we have the tariff to hand.
                        op.cache[charge_url_key(product_code, gsp, rel)] = href
            if url is None:
                raise ValueError(f'{charge_type} URL not found for {product_code} in area {gsp}')
    except Exception:
        # Diagnostic aid: show the innermost structure we managed to
        # reach, then let the original exception carry on.
        if tariff is not None:
            json.dump(tariff, sys.stderr, indent=4)
        elif tariffs is not None:
            json.dump(tariffs, sys.stderr, indent=4)
        elif product_data is not None:
            json.dump(product_data, sys.stderr, indent=4)
        raise
    return url
def charge_topic(product_code, gsp, charge_type):
    """ Build the MQTT/database topic name under which values of the given
        charge type for a product and grid supply point are stored.
    """
    levels = ['octopus', product_code, gsp, charge_type, 'value']
    return topicLib.asString(levels)
def new_data(op, topic, page_data):
    """ Work out which entries of a page of Octopus data still need storing.

        page_data is a non-empty list of (timestamp, value) tuples. The
        mqttrunner database is read for topic over the span from the first
        to the last entry's timestamp (plus one second) and every entry
        whose timestamp is already stored is discounted, after asserting
        that the stored value agrees. Returns the sorted list of indices
        into page_data of the entries not yet in the database.
    """
    first_stamp = page_data[0][0]
    last_stamp = page_data[-1][0]
    stored = op.mctl.topic(
        topic,
        start=first_stamp,
        end=last_stamp + datetime.timedelta(seconds=1)
    )

    # Map each page timestamp to its position; entries are removed as
    # they are found in the database so what remains is what is missing.
    pending = {}
    for position, (stamp, _) in enumerate(page_data):
        pending[stamp] = position

    for stamp, stored_value in stored:
        if stamp not in pending:
            continue
        position = pending[stamp]
        if op.options.verbose:
            print('Already:',
                  rfc3339.format(stamp), position,
                  page_data[position][1], stored_value,
                  file=sys.stderr)
        assert page_data[position][1] == stored_value
        del pending[stamp]

    return sorted(pending.values())
def post(op, timestamp, topic, value):
    """ Store one message in the mqttrunner database, reporting it on
        stderr first.
    """
    stamp_text = rfc3339.format(timestamp)
    print('Post:', stamp_text, topic, value, file=sys.stderr)
    op.mctl.post(timestamp, topic, value)
def update_database(op, product_code, gsp, charge_type):
    """ Bring the mqttrunner database up to date with the unit rates or
        standing charges currently available from the Octopus API.

        Pages are read starting at the most recent. The database is
        assumed to be complete for everything older than the newest data
        it already holds, so paging back in time stops as soon as the
        earliest entry on a page turns out to be stored already.
    """
    def utc_time(record, field):
        # Parse an optional RFC 3339 field into a UTC datetime, or None.
        text = record.get(field, None)
        if text is None:
            return None
        return rfc3339.parse(text).astimezone(datetime.timezone.utc)

    topic = charge_topic(product_code, gsp, charge_type)
    url = fetch_charge_data_url(op, product_code, gsp, charge_type)
    one_tick = datetime.timedelta(microseconds=1)
    seen_urls = set()

    while True:
        page = fetch(op, url)
        seen_urls.add(url)

        # Each result contributes a point at the start of its validity
        # and, when it has an end, another just before that end.
        points = []
        for record in page['results']:
            price = record['value_inc_vat']
            starts = utc_time(record, 'valid_from')
            assert starts is not None
            points.append((starts, price))
            ends = utc_time(record, 'valid_to')
            if ends is not None:
                points.append((ends - one_tick, price))
        points.sort()
        if not points:
            break

        fresh = new_data(op, topic, points)
        for position in fresh:
            timestamp, value = points[position]
            post(op, timestamp, topic, value)

        # Only look further back if the earliest point here was new.
        if 0 not in fresh:
            break
        if page.get('next') is None:
            break
        url = page['next']
        assert url not in seen_urls
def data_to_hand(op, product_code, gsp, charge_type):
    """ Report whether the mqttrunner database already holds any data for
        the specified product, area and charge type for the following
        morning. Returns True if at least one record is found, else False.
    """
    topic = charge_topic(product_code, gsp, charge_type)
    reader = op.mctl.topic(topic, start='.@P1D@T06', end='T12')
    # Count every record so the reader is read through to its end.
    records_found = sum(1 for _ in reader)
    return records_found > 0
def download_product_list(op):
    """ Download current and historical product information and write to a
        products.json file in the current directory.

        Both listings are read page by page via their 'next' links and
        the combined results are written as a single JSON array encoded
        as UTF-8.
    """
    product_list = []
    for historical in (False, True):
        url = PRODUCTS_URL
        if historical:
            url += '?is_historical=true'
        while url is not None:
            data = fetch(op, url)
            product_list += data['results']
            url = data['next']
    products_file = 'products.json'
    # ensure_ascii=False writes non-ASCII characters as they are, so the
    # encoding must be fixed rather than left to the locale, which might
    # not be able to represent them.
    with open(products_file, 'wt', encoding='utf-8') as f:
        json.dump(product_list, f, ensure_ascii=False, indent=4)
        print(file=f)
    if op.options.verbose:
        print(f'File {products_file} written.')
def main(options):
    """ Carry out the action selected on the command line.

        options is the namespace returned by parse_options. For the
        'download-rates' action the mqttrunner database is updated with
        rates for the chosen product, area and charge type; with
        --conditional this is skipped if data for tomorrow morning is
        already present. For 'product-list' the product listing is written
        to a file. Raises ValueError for any other action.

        The cache is saved and the mqttrunner connection closed on the
        way out, whether or not the action succeeded.
    """
    class op:
        # Simple holder for the state shared by the functions called below.
        options = None
        cache = None
        mctl = None

    try:
        op.options = options
        if op.options.action == 'download-rates':
            print(f'{datetime.datetime.now().isoformat()}: infundibulum download start',
                  file=sys.stderr)
            op.mctl = Mctl(options)
            if options.conditional:
                if data_to_hand(
                        op,
                        op.options.product,
                        op.options.gsp,
                        op.options.charge_type):
                    print('At least some data in database for 06Z to 12Z tomorrow, fetch not done.',
                          file=sys.stderr)
                    return
            op.cache = KVCache()
            update_database(
                op,
                op.options.product,
                op.options.gsp,
                op.options.charge_type)
        elif op.options.action == 'product-list':
            download_product_list(op)
        else:
            raise ValueError(f'Unknown action {op.options.action}.')
    finally:
        # Nested so that a failure to write the cache cannot prevent the
        # mqttrunner connection from being closed. Identity tests are used
        # because an empty KVCache has length zero and so is falsy.
        try:
            if op.cache is not None:
                op.cache.save()
        finally:
            if op.mctl is not None:
                op.mctl.close()
if __name__ == '__main__':
    options = parse_options(sys.argv[1:])
    # parse_options returns 0 rather than a namespace when it has already
    # printed the help text, in which case there is nothing more to do.
    if options != 0:
        main(options)
    else:
        sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment