Created
July 12, 2023 20:44
-
-
Save svenk/56aeebc4309dd91b43ee9cc302361906 to your computer and use it in GitHub Desktop.
Recursive Ref Resolver for OpenAPI
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# somewhat related to https://github.com/openapi-generators/openapi-python-client/issues/466 | |
from openapi3 import OpenAPI | |
import yaml | |
import posixpath, re | |
import requests | |
from pathlib import Path | |
# download from https://dev.crewmeister.com | |
# Parse the locally stored root specification; the handle is passed to the
# YAML parser directly instead of reading the text first.
with open('crewmeister.yaml') as spec_file:
    spec = yaml.safe_load(spec_file)

# URL the root specification was downloaded from; relative references are
# resolved against it.
cur_url = "https://dev.crewmeister.com/crewmeister.yaml"
# poor man's URI manipulations
def remove_path(url):
    """Return *url* with its ``#fragment`` part removed.

    Everything from the first ``#`` onwards is dropped, including a bare
    trailing ``#`` with an empty fragment. A URL without ``#`` is returned
    unchanged.
    """
    return url.partition("#")[0]


def urlbase(url):
    """Return the directory part of *url* (fragment ignored) with a trailing ``/``."""
    return posixpath.dirname(remove_path(url)) + "/"
# poor mans query engine | |
def lookup(dct, query):
    """Return the part of *dct* addressed by the JSON-Pointer-like *query*.

    An empty query or ``"/"`` addresses the whole document. Otherwise the
    query is resolved in two steps:

    1. Legacy behaviour: after decoding ``~1`` to ``/``, the query without
       its first character is tried as a single top-level key.
    2. The query is split on ``/`` and walked token by token through nested
       dicts and lists. Each token is decoded (``~1`` -> ``/``, then
       ``~0`` -> ``~``); list elements are addressed by decimal index.

    Raises:
        ValueError: if the query does not address anything in *dct*.
    """
    if not query:
        return dct
    flat = query.replace("~1", "/")  # whatever encoding
    if not flat or flat == "/":
        return dct

    # Step 1: flat first-level lookup, kept so existing callers see no change.
    if isinstance(dct, dict) and flat[1:] in dct:
        return dct[flat[1:]]  # stripping "/"

    # Step 2: nested lookup. Tokens are decoded only after splitting so an
    # encoded "/" inside a key is not mistaken for a path separator.
    if query.startswith("/"):
        node = dct
        for raw_token in query[1:].split("/"):
            token = raw_token.replace("~1", "/").replace("~0", "~")
            if isinstance(node, dict) and token in node:
                node = node[token]
            elif (isinstance(node, list) and token.isdecimal()
                    and int(token) < len(node)):
                node = node[int(token)]
            else:
                break
        else:
            return node

    raise ValueError(f"Cannot find '{flat}' in given document.")
class YamlPathLoader:
    """Load YAML documents by URL and resolve ``url#query`` paths in them.

    Parsed documents are kept in an in-memory dict per instance, and the raw
    downloaded text is stored as files below ``file_cache_dir`` so repeated
    runs do not need to fetch a document again.
    """

    # Prefix of an optional first line recording where a cached text came from.
    ORIGIN_URL = "# ORIGIN_URL = "

    @staticmethod
    def origin_obtainer(text):
        """Return the URL from a leading ``# ORIGIN_URL = <url>`` line of *text*.

        Raises:
            ValueError: if *text* does not start with such a line.
        """
        # A static method is used because a lambda in the class body cannot
        # see ORIGIN_URL: class scope is not visible from nested functions.
        match = re.match("^" + re.escape(YamlPathLoader.ORIGIN_URL) + "(.+)\n", text)
        if match is None:
            raise ValueError("Text does not start with an ORIGIN_URL line.")
        return match[1]

    def __init__(self):
        # I use requests session handling and a file cache because while
        # developing this code I am in a train with bad wifi.
        self.session = requests.Session()
        self.file_cache_dir = Path("cache/")
        self.struct_cache = {}

    def cache_key(self, url):
        """Return a file-name-safe key for *url*.

        Every character other than ASCII letters, digits, ``-`` and ``.`` is
        replaced by ``-``.
        """
        return re.sub(r"[^a-zA-Z0-9.-]", "-", url)

    def load_url(self, url):
        """Return the parsed YAML document found at *url*.

        Lookup order: in-memory cache, file cache, HTTP download. Downloaded
        text is written to the file cache before it is parsed.

        Raises:
            ValueError: if the HTTP response status is not 200.
            yaml.YAMLError: if the text is not valid YAML.
        """
        if url in self.struct_cache:
            return self.struct_cache[url]

        cache_candidate = self.file_cache_dir / self.cache_key(url)
        if cache_candidate.exists():
            text = cache_candidate.read_text(encoding="utf-8")
        else:
            print("Downloading ", url, " to ", cache_candidate)
            resp = self.session.get(url)
            if resp.status_code != 200:
                raise ValueError(f"Error at fetching {url}: Response {resp}")
            text = resp.content.decode("utf-8")
            # The ORIGIN_URL marker line is intentionally not written; the
            # cache stores the text exactly as downloaded.
            # The cache directory may not exist yet on a first run.
            cache_candidate.parent.mkdir(parents=True, exist_ok=True)
            cache_candidate.write_text(text, encoding="utf-8")

        try:
            structured = yaml.safe_load(text)
        except yaml.YAMLError:
            print("Could not parse YAML at ", url)
            print("Downloaded content: ", text)
            # Re-raise: there is no parsed document to cache or return.
            raise
        self.struct_cache[url] = structured
        return structured

    @staticmethod
    def _split_ref(path):
        """Split *path* at its first ``#`` into ``(url, query)``; query may be empty."""
        url, _, query = path.partition("#")
        return url, query

    def load_yaml(self, path):
        """Load the document named by *path* and return the part after ``#``.

        *path* has the form ``url`` or ``url#query``; the query is resolved
        with the module-level ``lookup`` function.
        """
        url, query = self._split_ref(path)
        document = self.load_url(url)
        return lookup(document, query)
# Single shared loader instance, used by replace_refs below, so every
# reference resolution goes through the same caches.
loader = YamlPathLoader()
def replace_refs(dct, cur_url, base_doc, tracekey="", _active=frozenset()):
    """Return a copy of *dct* in which every ``$ref`` is replaced by its target.

    Lists and dicts are walked recursively; any other value is returned
    unchanged. A dict whose ``"$ref"`` entry is a string is replaced as a
    whole by the (recursively resolved) content that the module-level
    ``loader`` returns for the referenced URL; sibling keys of ``$ref`` are
    dropped.

    Args:
        dct: The structure to process (dict, list or scalar).
        cur_url: URL of the document *dct* comes from; relative and
            document-local (``#...``) references are resolved against it.
        base_doc: The document belonging to *cur_url*. It is only passed
            along; local references are re-loaded via their full URL because
            local context can be lost when partial documents are embedded.
        tracekey: Path of the current position, used in progress output only.
        _active: Internal. Target URLs currently being expanded higher up in
            the recursion, used to detect circular references.

    Returns:
        The resolved structure. A reference that points back to a target
        already being expanded is not inlined again; it is returned as
        ``{"$ref": <absolute target URL>}``.
    """
    # Local import keeps this function self-contained within the file.
    from urllib.parse import urldefrag, urljoin

    if isinstance(dct, list):
        return [
            replace_refs(item, cur_url, base_doc,
                         tracekey=f"{tracekey}.{index}", _active=_active)
            for index, item in enumerate(dct)
        ]
    if not isinstance(dct, dict):
        return dct

    ref = dct.get("$ref")
    if not isinstance(ref, str):
        # Either no reference at all, or a mapping that merely has a key
        # named "$ref" (e.g. a schema property); treat it as plain data.
        # The f-string also copes with non-string keys such as the integer
        # status codes YAML produces for unquoted "200:".
        return {
            key: replace_refs(value, cur_url, base_doc,
                              tracekey=f"{tracekey}/{key}", _active=_active)
            for key, value in dct.items()
        }

    if ref.startswith("#"):
        print(f"{tracekey} @ {cur_url}, {ref} -> LOCAL")

    # urljoin covers document-local fragments, relative paths (including
    # "../") and absolute URLs.
    target_url = urljoin(cur_url, ref)
    if target_url in _active:
        # Circular reference: inlining it again would never terminate.
        print(f"{tracekey} @ {cur_url}: {ref} -> {target_url} (circular, kept as $ref)")
        return {"$ref": target_url}

    new_base_url = urldefrag(target_url).url
    print(f"{tracekey} @ {cur_url}: {ref} -> {target_url}")
    print("Replacing URL ", target_url)
    loaded_content = loader.load_yaml(target_url)
    return replace_refs(
        loaded_content,
        cur_url=new_base_url,
        base_doc=loaded_content,
        tracekey=tracekey,
        _active=_active | {target_url},
    )
# Resolve every reference reachable from the root specification and write the
# fully expanded document next to the input file.
resolved_struct = replace_refs(spec, cur_url=cur_url, base_doc=spec)
with open('crewmeister-resolved.yaml', 'w') as f:
    # yaml.dump writes to the given stream and returns None, so its result
    # is deliberately not assigned (previously it overwrote `spec`).
    yaml.dump(resolved_struct, f)
# Known issue: $refs located in subdirectories are resolved incorrectly when the base directory is not well defined.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment