Last active
September 23, 2019 19:36
-
-
Save niklasf/b9fbe22e207184a23304f676ae7acd13 to your computer and use it in GitHub Desktop.
A ChildWatcher that does not require the asyncio loop to be running in the main thread
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import asyncio | |
import os | |
import sys | |
import threading | |
import time | |
import warnings | |
def setup_loop(poll_interval=0.2):
    """Create a new event loop, make it current for this thread, and return it.

    On Windows the proactor event loop policy is installed first. On other
    platforms, when called outside the main thread on Python older than 3.8,
    a polling child watcher is attached to the new loop and registered with
    the event loop policy, so that subprocess exits are collected by
    periodic polling from the loop itself.

    Args:
        poll_interval: Seconds between two polls of the polling child
            watcher. Only relevant when that watcher is installed.

    Returns:
        The newly created event loop (already set as the current loop).
    """
    on_windows = sys.platform == "win32"
    if on_windows:
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    in_main_thread = threading.current_thread() is threading.main_thread()

    # Since Python 3.8 the default child watcher works when the loop runs
    # outside the main thread, and the child watcher API used below was
    # deprecated in 3.12 and removed in 3.14. Only fall back to the custom
    # polling watcher where it is both needed and available.
    if on_windows or in_main_thread or sys.version_info >= (3, 8):
        return loop

    class PollingChildWatcher(asyncio.SafeChildWatcher):
        """Child watcher that polls for exited children from the loop."""

        def __init__(self):
            super().__init__()
            # Handle of the next scheduled _poll() call, if any.
            self._poll_handle = None

        def attach_loop(self, loop):
            """Attach the watcher to *loop*, or detach it when *loop* is None.

            Replaces the inherited implementation entirely (super() is not
            called); polling is scheduled on the loop instead.
            """
            assert loop is None or isinstance(loop, asyncio.AbstractEventLoop)

            if self._loop is not None and loop is None and self._callbacks:
                warnings.warn("A loop is being detached from a child watcher with pending handlers", RuntimeWarning)

            if self._poll_handle is not None:
                self._poll_handle.cancel()
                # Do not keep a reference to a cancelled handle around.
                self._poll_handle = None

            self._loop = loop
            if loop is not None:
                self._poll_handle = loop.call_soon(self._poll)

                # Prevent a race condition in case a child terminated
                # during the switch.
                self._do_waitpid_all()

        def _poll(self):
            """Collect exited children and reschedule the next poll."""
            if self._loop is not None:
                self._do_waitpid_all()
                self._poll_handle = self._loop.call_later(poll_interval, self._poll)

    watcher = PollingChildWatcher()
    watcher.attach_loop(loop)
    asyncio.get_event_loop_policy().set_child_watcher(watcher)

    return loop
class ExampleProtocol(asyncio.SubprocessProtocol):
    """Subprocess protocol that logs events and exposes the exit code.

    Must be instantiated while an event loop is running, because the
    ``returncode`` future is created on the running loop.
    """

    def __init__(self):
        # Future resolved with the transport's return code once the
        # connection is lost.
        self.returncode = asyncio.get_running_loop().create_future()
        # Set by connection_made().
        self.transport = None

    def connection_made(self, transport):
        """Remember the transport so the return code can be read later."""
        print("Connection made")
        self.transport = transport

    def connection_lost(self, exc):
        """Publish the process return code through ``self.returncode``."""
        print("Connection lost")
        # The future may already be done (e.g. cancelled together with the
        # task awaiting it); set_result() would raise InvalidStateError then.
        if not self.returncode.done():
            self.returncode.set_result(self.transport.get_returncode())

    def pipe_data_received(self, fd, data):
        """Print data received from the child's pipe *fd*."""
        print("{}: {}".format(fd, data))
async def example_coroutine(command):
    """Run *command* through the shell and print its exit code."""
    running_loop = asyncio.get_running_loop()
    # Only the protocol is needed afterwards; its future carries the result.
    _transport, proto = await running_loop.subprocess_shell(ExampleProtocol, command)
    exit_code = await proto.returncode
    print("Exited with code", exit_code)
def example():
    """Run the shell command given as the first CLI argument on a fresh loop."""
    event_loop = setup_loop()
    try:
        # sys.argv[1] is read inside the try so the loop is closed even if
        # the argument is missing.
        job = example_coroutine(sys.argv[1])
        event_loop.run_until_complete(job)
    finally:
        event_loop.close()
def main():
    """Run example() in a separate thread and wait for it to finish."""
    worker = threading.Thread(target=example)
    worker.start()
    print("Running thread ...")
    worker.join()
if __name__ == "__main__":
    # Exactly one argument (the shell command) is expected.
    if len(sys.argv) == 2:
        main()
    else:
        print("Usage:", sys.argv[0], "<shell command>")
        sys.exit(128)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment