Skip to content

Instantly share code, notes, and snippets.

@natekupp
Last active May 16, 2019 04:18
Show Gist options
  • Save natekupp/4b8059f7b59f5ed9cfda96390c5c718d to your computer and use it in GitHub Desktop.
Save natekupp/4b8059f7b59f5ed9cfda96390c5c718d to your computer and use it in GitHub Desktop.
Minimal reproducible example of a Dask `CommClosedError` raised during pub/sub cleanup
import dask
import dask.distributed
from tornado import gen
# Topic name shared by the worker-side Pub and the client-side Sub below.
PUBSUB_NAME = 'test'
class Task:
    """A unit of work that emits ``num`` copies of ``num`` and then its key.

    Attributes:
        key: identifier of the task; also the last value yielded by ``run()``.
        num: the value yielded by ``run()`` and the number of times it is
            yielded.
        upstream_tasks: keys of the tasks this one depends on.
    """

    def __init__(self, key, num, upstream_tasks):
        self.key = key
        self.num = num
        self.upstream_tasks = upstream_tasks

    def run(self):
        """Yield ``self.num`` exactly ``self.num`` times, then ``self.key``."""
        for _ in range(self.num):
            yield self.num
        # Sentinel: the key is emitted last so a consumer can tell that this
        # task has produced all of its values.
        yield self.key
def worker(task, dependencies):
    """Run ``task`` and publish every value it yields on ``PUBSUB_NAME``.

    Args:
        task: object whose ``run()`` yields the values to publish.
        dependencies: never read in this function. The caller passes the
            futures of the upstream tasks here; presumably this is what makes
            the scheduler run this call after them -- TODO confirm.
    """
    pub = dask.distributed.Pub(
        PUBSUB_NAME, worker=dask.distributed.get_worker()
    )
    for result in task.run():
        pub.put(result)
def run():
    """Submit a fixed set of tasks to a local Dask client and stream results.

    Each task is submitted with ``worker``, which publishes the task's values
    followed by the task's key. This generator subscribes to the same topic,
    yields every non-string value it receives, and treats each string as the
    "task finished" marker for the task with that key. It returns once a
    marker has been seen for every submitted task.

    Yields:
        The non-string values published by the workers, in arrival order.

    Raises:
        KeyError: if a task names an upstream key that has not been submitted
            yet (i.e. ``tasks`` is not topologically sorted), or if a string
            is received that is not a pending task key.

    NOTE(review): the traceback reported with this example shows the Sub's
    cleanup callback sending on the client's scheduler comm after that comm
    was closed. Presumably that happens because ``sub`` is still alive when
    the ``with`` block closes the client -- confirm. It is intentionally not
    worked around here, since reproducing it is the point of this script.
    """
    # Assume this is already topological sorted
    tasks = [
        Task('a', 1, []),
        Task('b', 2, []),
        Task('c', 3, []),
        Task('d', 4, ['a', 'b', 'c']),
        Task('e', 5, ['b', 'c']),
    ]

    with dask.distributed.Client() as client:
        # Subscribe before submitting anything so no published value is missed.
        sub = dask.distributed.Sub(PUBSUB_NAME, client=client)

        # Maps task key -> future; also holds a reference to every future.
        dependency_dict = {}
        task_keys_to_await = set()

        for task in tasks:
            # Index instead of .get(): an unknown upstream key means the
            # ordering assumption above was violated, which should fail
            # loudly rather than pass None along as a dependency.
            dependencies = [dependency_dict[up] for up in task.upstream_tasks]
            future = client.submit(worker, task=task, dependencies=dependencies)
            dependency_dict[task.key] = future
            task_keys_to_await.add(task.key)

        while task_keys_to_await:
            try:
                result = sub.get(timeout=1)
            except gen.TimeoutError:
                # Nothing arrived within the timeout; keep polling.
                continue

            # Stop waiting for this task when we see its key published
            if isinstance(result, str):
                task_keys_to_await.remove(result)
                continue

            yield result
if __name__ == "__main__":
    # Exhaust the generator first so all results are collected before printing.
    results = list(run())
    print(results)
@natekupp
Copy link
Author

Produces:

[1, 2, 2, 3, 3, 3, 5, 5, 5, 5, 5, 4, 4, 4, 4]
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x11bb76950>)
Traceback (most recent call last):
  File "/Users/nate/.pyenv/versions/dagster-3.6.8/lib/python3.6/site-packages/tornado/ioloop.py", line 758, in _run_callback
    ret = callback()
  File "/Users/nate/.pyenv/versions/dagster-3.6.8/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/Users/nate/.pyenv/versions/dagster-3.6.8/lib/python3.6/site-packages/distributed/pubsub.py", line 199, in cleanup
    self.client.scheduler_comm.send(msg)
  File "/Users/nate/.pyenv/versions/dagster-3.6.8/lib/python3.6/site-packages/distributed/batched.py", line 119, in send
    raise CommClosedError
distributed.comm.core.CommClosedError

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment