
@JoaoRodrigues
Created December 19, 2024 21:40
Example of combining RCSB Search and Data APIs to retrieve entry release dates.
"""Retrieve RCSB entry release dates using the API.
Example of combining Search and Data APIs
"""
import csv
import itertools
import json
import urllib3
from datetime import datetime
import requests
urllib3.disable_warnings()
# Backport of itertools.batched, available in the standard library
# from Python 3.12 onwards:
# https://docs.python.org/3/library/itertools.html#itertools.batched
def batched(iterable, n, *, strict=False):
    # batched('ABCDEFG', 3) → ABC DEF G
    if n < 1:
        raise ValueError("n must be at least one")
    iterator = iter(iterable)
    while batch := tuple(itertools.islice(iterator, n)):
        if strict and len(batch) != n:
            raise ValueError("batched(): incomplete batch")
        yield batch
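
# The backport behaves like the stdlib version, e.g.:
#   list(batched("ABCDEFG", 3)) == [("A", "B", "C"), ("D", "E", "F"), ("G",)]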

if __name__ == "__main__":
    session = requests.Session()

    # Fetch all released structures (excluding obsolete entries)
    all_entries_q = {
        "query": {
            "type": "terminal",
            "service": "text",
            "parameters": {
                "operator": "exists",
                "attribute": "exptl.method"
            }
        },
        "request_options": {
            "return_all_hits": True
        },
        "return_type": "entry"
    }
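
    # Note: "return_all_hits" asks the Search API for every match in one
    # response. Alternatively (an assumption based on the public Search API
    # schema, not exercised here), results can be paged with the "paginate"
    # request option:
    #   "request_options": {"paginate": {"start": 0, "rows": 10000}}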
print("Grabbing all released PDB IDs")
search_url = "https://search.rcsb.org/rcsbsearch/v2/query"
response = requests.post(
search_url,
headers={"Content-Type": "application/json"},
json=all_entries_q,
verify=True,
)
assert response.status_code == 200, \
f"request failed with error {response.status_code}"
results = response.json()
n_expected_entries = results["total_count"]
entries = [d["identifier"] for d in results["result_set"]]
assert n_expected_entries == len(entries)

    # avoid abusing the RCSB Data API: limit of 15k IDs per query
    data_url = "https://data.rcsb.org/graphql"
    graphql_data_q = """
    {{
      entries(entry_ids: [{pdb_id_list}]) {{
        rcsb_entry_container_identifiers {{
          entry_id
        }}
        rcsb_accession_info {{
          initial_release_date
        }}
      }}
    }}
    """

    release_dates_per_entry = []
    seen_ids = set()  # guards against duplicate entries across batches
    batch_size = 14_000
    print("Querying release dates")
    for batch in batched(entries, n=batch_size):
        pdb_id_list = ", ".join(f'"{pdb_id}"' for pdb_id in batch)
        q = graphql_data_q.format(pdb_id_list=pdb_id_list)
        response = session.post(
            data_url,
            headers={"Content-Type": "application/json"},
            json={"query": q},
            verify=True,
        )
        assert response.status_code == 200

        data = response.json()
        for d in data["data"]["entries"]:
            entry_id = d["rcsb_entry_container_identifiers"]["entry_id"]
            isodate = d["rcsb_accession_info"]["initial_release_date"]
            date_ymd = datetime.fromisoformat(isodate).strftime("%Y/%m/%d")
            # Check membership in a set of IDs; testing against the list
            # of dicts would never match a bare entry_id string.
            assert entry_id not in seen_ids, f"duplicate entry: {entry_id}"
            seen_ids.add(entry_id)
            release_dates_per_entry.append({
                "pdb_id": entry_id,
                "release_date": date_ymd,
            })
        print(f"Queried {len(release_dates_per_entry)} entries so far")

    # newline="" is the documented way to open files for the csv module
    with open("rcsb_release_dates.csv", "w", newline="") as handle:
        headers = ["pdb_id", "release_date"]
        writer = csv.DictWriter(handle, fieldnames=headers)
        writer.writeheader()
        writer.writerows(release_dates_per_entry)

    print("Wrote release dates to rcsb_release_dates.csv")