Skip to content

Instantly share code, notes, and snippets.

@danielballan
Last active July 22, 2025 15:52
Show Gist options
  • Save danielballan/e8e7124deb47f22cb05aedab86148626 to your computer and use it in GitHub Desktop.
Save danielballan/e8e7124deb47f22cb05aedab86148626 to your computer and use it in GitHub Desktop.

Clone tiled from tiled#999 PR branch and pip install -e ".[all]".

Start Tiled server with Redis cache:

tiled serve catalog --temp --api-key secret --redis-uri redis://localhost:6379

Start consumer:

python test_consumer.py

Start producer:

python test_producer.py
from tiled.client.stream import Subscription
from tiled.client import from_uri
c = from_uri("https://tiled-dev.nsls2.bnl.gov")
target = "pil2M_image"
# c = from_uri("http://localhost:8000", api_key="secret")
# target = "img"
def on_new_run(sub, data):
uid = data["key"]
print(f"New run {uid}")
run_sub = Subscription(c.context, [uid], start=0)
run_sub.add_callback(on_streams_namespace)
run_sub.start()
def on_streams_namespace(sub, data):
streams_sub = Subscription(c.context, sub.segments + ["streams"], start=0)
streams_sub.add_callback(on_new_stream)
streams_sub.start()
def on_new_stream(sub, data):
stream_name = data["key"]
print(f"new stream {stream_name}")
stream_sub = Subscription(c.context, sub.segments + [stream_name], start=0)
stream_sub.add_callback(on_node_in_stream)
stream_sub.start()
def on_node_in_stream(sub, data):
key = data["key"]
if key != target:
return
stream_sub = Subscription(c.context, sub.segments + [key], start=0)
stream_sub.add_callback(load_data)
stream_sub.start()
def load_data(sub, data):
patch = data['patch']
slice_ = tuple(slice(offset, offset + shape) for offset, shape in zip(patch["offset"], patch["shape"])) # GET /array/full/...
node = c['/'.join(sub.segments)] # GET /metadata/... wasteful to do it on each load_data call
print(node.read(slice=slice_)) # could be sub.node.read(...)
catalog_sub = Subscription(c.context)
catalog_sub.add_callback(on_new_run)
catalog_sub.start()
if __name__ == "__main__":
import time
while True:
time.sleep(1)
from bluesky import RunEngine
from bluesky.callbacks.tiled_writer import TiledWriter
from bluesky.plans import count
from ophyd.sim import img
from tiled.client import from_uri
RE = RunEngine()
client = from_uri('http://localhost:8000', api_key='secret')
tw = TiledWriter(client, batch_size=1)
RE.subscribe(tw)
RE(count([img], 10, delay=5))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment