Dask - distributed computation
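The script below starts a Dask scheduler and a single worker as subprocesses, connects an asynchronous Client to them, submits a long-running task, and then stops that task cooperatively through a shared Variable before force-cancelling its future and shutting everything down.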
import asyncio
import logging
import sys
import socket
import time
from contextlib import closing

try:
    from contextlib import asynccontextmanager
except ImportError:
    # backport for Python 3.6
    from async_generator import asynccontextmanager

try:
    from contextlib import AsyncExitStack
except ImportError:
    # backport for Python 3.6
    from async_exit_stack import AsyncExitStack

from dask.distributed import (
    Client,
    Variable
)

logging.basicConfig(
    level=logging.DEBUG
)
logger = logging.getLogger()

def find_free_port():
    # Ask the OS for an unused TCP port by binding to port 0.
    with closing(
        socket.socket(
            socket.AF_INET, socket.SOCK_STREAM
        )
    ) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('', 0))
        return s.getsockname()[1]

@asynccontextmanager
async def run_process(command, args):
    async def _stream_logging(stream):
        # Relay the child's stderr lines until EOF.
        while True:
            line = await stream.readline()
            if not line:
                break
            print(line.decode().strip())

    proc = await asyncio.create_subprocess_exec(
        command,
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    future = asyncio.ensure_future(_stream_logging(proc.stderr))
    # give the process a moment to start up
    await asyncio.sleep(2)
    if proc.returncode is not None:
        raise Exception(
            'could not start %s with %s' % (command, args)
        )
    logger.info('%s started with args %s', command, args)
    try:
        yield proc
    finally:
        future.cancel()
        proc.terminate()
        await proc.wait()

@asynccontextmanager
async def dask_scheduler(port):
    # Run the scheduler in a subprocess via its CLI entry point.
    async with run_process(
        sys.executable,
        [
            "-m",
            "distributed.cli.dask_scheduler",
            "--host=0.0.0.0",
            f"--port={port}",
            # "--no-dashboard",
            "--dashboard-address=0.0.0.0:8000"
        ]
    ) as proc:
        yield proc

@asynccontextmanager
async def dask_worker(port, *resources):
    # Run a single-threaded worker pointed at the local scheduler.
    async with run_process(
        sys.executable,
        [
            "-m",
            "distributed.cli.dask_worker",
            f"127.0.0.1:{port}",
            "--nthreads=1",
            "--nprocs=1",
        ] + (
            ["--resources", *resources] if resources else []
        )
    ) as proc:
        yield proc

def long_running_task(stop):
    # Runs on the worker: tick until the shared `stop` Variable is set
    # or the iteration limit is reached.
    logging.basicConfig(
        level=logging.DEBUG
    )
    wlogger = logging.getLogger()
    counter = 0
    while not stop.get() and counter < 25:
        time.sleep(0.2)
        wlogger.info('tick %s', counter)
        counter += 1
    return counter

async def main():
    PORT = find_free_port()
    async with AsyncExitStack() as stack:
        await stack.enter_async_context(
            dask_scheduler(PORT))
        await stack.enter_async_context(
            dask_worker(PORT))

        client = await Client(
            f'127.0.0.1:{PORT}',
            asynchronous=True
        )
        logger.info('scheduler_info %s', client.scheduler_info())
        stop = Variable('stop', client)
        await stop.set(False)
        logger.info('client created %s', client)

        future = client.submit(long_running_task, stop)
        logger.info('task was scheduled, future=%s', future)

        await asyncio.sleep(2)
        # ask the task to stop cooperatively, then cancel the future
        await stop.set(True)
        await future.cancel(force=True)
        logger.info('cancel future=%s', future)
        await asyncio.sleep(3)
        await client.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
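When run with dask and distributed installed (Python 3.6 additionally needs the async_generator and async_exit_stack backports), the worker should log roughly ten 'tick' lines during the first two seconds; stop is then set to True, long_running_task leaves its loop, the future is force-cancelled, and the scheduler and worker subprocesses are terminated when the AsyncExitStack exits.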