Skip to content

Instantly share code, notes, and snippets.

@stuartcampbell
Created October 19, 2022 20:01
Show Gist options
  • Save stuartcampbell/462bf3a48650c163b8ae103d24196ae5 to your computer and use it in GitHub Desktop.
Save stuartcampbell/462bf3a48650c163b8ae103d24196ae5 to your computer and use it in GitHub Desktop.
Get filenames for a given bluesky scan
def get_scanid_filenames(run):
    """
    Collect the scan id, uid and on-disk Eiger file paths of a bluesky run.

    The run's document stream is scanned for ``resource``, ``datum`` and
    ``datum_page`` documents. For every resource whose ``spec`` contains
    "eiger" (case-insensitive), each of its datums contributes the files
    matching ``<root>/<resource_path>_<seq_id>*`` on the local filesystem.

    Parameters
    ----------
    run: a BlueskyRun
        Must provide ``documents()`` yielding ``(name, doc)`` pairs and a
        ``start`` mapping with ``"scan_id"`` and ``"uid"`` keys.

    Returns
    -------
    scan_id: integer
        ``run.start["scan_id"]``
    unique_id: string, a full string of a uid
        ``run.start["uid"]``
    filepaths: list of string
        Matching file paths without duplicates; empty when the run has no
        Eiger resource or nothing matches on disk.
    """
    resources = {}  # resource uid -> resource document
    datums = defaultdict(list)  # resource uid -> list of datum documents
    for name, doc in run.documents():
        if name == "resource":
            resources[doc["uid"]] = doc
        elif name == "datum":
            datums[doc["resource"]].append(doc)
        elif name == "datum_page":
            # A datum_page bundles several datums; expand it to single datums.
            for datum in event_model.unpack_datum_page(doc):
                datums[datum["resource"]].append(datum)

    filepaths = []
    seen = set()  # paths already collected, to keep the result duplicate-free
    for resource_uid, resource in resources.items():
        # Filter first so non-Eiger resources cost nothing further.
        if "eiger" not in resource["spec"].lower():
            continue
        file_prefix = Path(resource.get("root", "/"), resource["resource_path"])
        # Escape the prefix so glob metacharacters in a directory or file name
        # ("[", "*", "?") are matched literally instead of as wildcards.
        escaped_prefix = glob.escape(str(file_prefix))
        # .get() avoids inserting empty entries into the defaultdict for
        # resources that have no datum.
        for datum in datums.get(resource_uid, ()):
            seq_id = datum["datum_kwargs"]["seq_id"]
            # NOTE(review): this is a prefix match, so seq_id 1 also matches
            # files written for seq_id 10, 11, ...; confirm the on-disk naming
            # scheme before tightening the pattern. De-duplication below at
            # least keeps such files from being listed more than once.
            # Sorting makes the order deterministic (glob order is arbitrary).
            for filepath in sorted(glob.glob(f"{escaped_prefix}_{seq_id}*")):
                if filepath not in seen:
                    seen.add(filepath)
                    filepaths.append(filepath)
    return run.start["scan_id"], run.start["uid"], filepaths
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment