@paul121
Last active January 8, 2025 23:36
farmOS.py async Rothamsted example

This is an example Python script using async farmOS.py (via HTTPX) from this PR: farmOS/farmOS.py#67

This example was designed for Rothamsted Research to export quantities from all logs associated with a single given experiment. Using async requests improves performance by roughly 10x when querying Experiment plans that have many (50-100+) Plot assets associated with them.
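The speedup comes from fanning requests out with asyncio.gather rather than awaiting them one at a time. Here is a minimal sketch of the pattern, where fetch_logs is a hypothetical coroutine standing in for a single farmOS request:

import asyncio

# Hypothetical coroutine standing in for one farmOS HTTP request.
async def fetch_logs(asset_id):
    await asyncio.sleep(1)  # simulates a ~1 second round trip
    return asset_id

async def fetch_all(asset_ids):
    # Schedule every request at once; total time is roughly one round trip
    # (bounded by the client's connection limits), not one per asset.
    return await asyncio.gather(*(fetch_logs(a) for a in asset_ids))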

This example.py can be run in a Google Colab notebook. First install the dev version of async farmOS.py, then run the script below in a notebook cell:

pip install git+https://github.com/paul121/[email protected]
import asyncio
from collections import ChainMap
import csv
import time

from google.colab import userdata
from farmOS import AsyncFarmClient
from httpx_auth import OAuth2ResourceOwnerPasswordCredentials
from httpx import Limits

# farmOS hostname. Change me!
hostname = "https://example.com"

# Log types to query for each asset.
log_types = [
    "activity",
    "drilling",
    "harvest",
    "input",
    "lab_test",
    "maintenance",
    "observation",
    "seeding",
]

async def process_all_asset_logs(farm, asset_id):
    """
    Helper function to dispatch jobs and combine logs for the asset.
    """
    tasks = [process_asset_log_type(farm, log_type, asset_id) for log_type in log_types]
    results = await asyncio.gather(*tasks)
    return dict(ChainMap(*results))

async def process_asset_log_type(farm, log_type, asset_id):
    """
    Helper function to fetch all logs of a given type for an asset.
    """
    collected_logs = {}
    asset_id_filter = farm.filter('asset.id', asset_id)
    params = {
        **asset_id_filter,
        "sort": "timestamp",
        "include": "quantity,quantity.units,quantity.material_type",
    }
    logs = await farm.resource.get('log', log_type, {**params})

    # Index includes by id.
    includes = {}
    for included in logs.get("included", []):
        includes[included["id"]] = included

    # Process each log.
    for log in logs["data"]:
        uuid = log["id"]
        collected_logs[uuid] = {
            "uuid": uuid,
            "id": log["attributes"]["drupal_internal__id"],
            "quantity": {},
        }

        # Process quantities.
        quantity_ids = [quantity["id"] for quantity in log["relationships"]["quantity"]["data"]]
        for qid in quantity_ids:
            quantity = includes[qid]

            # Pull in units term.
            quantity["units"] = None
            if includes[qid]["relationships"]["units"]["data"] is not None:
                term_id = includes[qid]["relationships"]["units"]["data"]["id"]
                if term_id in includes:
                    quantity["units"] = includes[term_id]["attributes"]["name"]

            # Pull in material type term.
            quantity["material_type"] = []
            if includes[qid]["relationships"].get("material_type"):
                for term in includes[qid]["relationships"]["material_type"]["data"]:
                    if term["id"] in includes:
                        quantity["material_type"].append(includes[term["id"]]["attributes"]["name"])

            collected_logs[uuid]["quantity"][qid] = quantity

    return collected_logs

async def main():
    """
    Main function.
    """
    auth = OAuth2ResourceOwnerPasswordCredentials(
        token_url=f"{hostname}/oauth/token",
        username=userdata.get('username'),
        password=userdata.get('password'),
        client_id="farm",
        scope="rothamsted_data_admin",
    )
    limits = Limits(max_connections=4, max_keepalive_connections=4)
    async with AsyncFarmClient(hostname, auth=auth, limits=limits, timeout=200) as farm:

        # Hard-code plan id.
        # Change me!
        plan_id = 1

        # Query the plan.
        filters = farm.filter('drupal_internal__id', plan_id)
        plans = await farm.resource.get('plan', 'rothamsted_experiment', {**filters})
        plan = plans["data"][0]

        # Collect plot and asset IDs.
        asset_ids = [reference["id"] for reference in plan["relationships"]["asset"]["data"]]
        plot_ids = [reference["id"] for reference in plan["relationships"]["plot"]["data"]]
        all_asset_ids = asset_ids + plot_ids

        # Dispatch jobs to request all logs for each asset.
        tic = time.time()
        print(f"Fetching logs for {len(all_asset_ids)} assets...")
        jobs = [process_all_asset_logs(farm, asset_id) for asset_id in all_asset_ids]
        result = await asyncio.gather(*jobs)
        toc = time.time()
        print(f"Done fetching in {toc - tic} seconds.")

        # Merge a list of dicts into a single dict.
        all_logs = dict(ChainMap(*result))
        print(f"Found {len(all_logs)} unique logs.")

        # Build a CSV.
        with open(f"plan-{plan_id}-log-quantity.csv", 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)

            # Header row.
            writer.writerow(["log id", "log uuid", "type", "measure", "value", "units", "label", "material_type"])

            # Loop through logs.
            for log in all_logs.values():
                id = log["id"]
                uuid = log["uuid"]
                for quantity in log.get("quantity", {}).values():
                    value = None
                    if quantity["attributes"]["value"]:
                        value = quantity["attributes"]["value"].get("decimal")
                    writer.writerow([
                        id,
                        uuid,
                        quantity["relationships"]["quantity_type"]["data"]["meta"]["drupal_internal__target_id"],
                        quantity["attributes"]["measure"],
                        value,
                        quantity["units"],
                        quantity["attributes"]["label"],
                        "|".join(quantity["material_type"]),
                    ])

# Colab/Jupyter already runs an event loop, so await main() directly
# instead of calling asyncio.run().
loop = asyncio.get_running_loop()
await loop.create_task(main())
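When run against a valid farmOS instance with the correct hostname, credentials, and plan_id, the script writes plan-<plan_id>-log-quantity.csv containing one row per quantity on each log that matched the plan's assets and plots.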