Last active
December 30, 2015 01:05
-
-
Save basak/c0a8f4f5a51d7fce1cc7 to your computer and use it in GitHub Desktop.
asyncio event and worker pattern thoughts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import weakref
class Worker:
    '''A Worker has background asyncio tasks that are cancelled on __del__.

    Tasks started through create_task() are tracked in a set for as long as
    they are unfinished. When the Worker is finalised, every task still in
    that set is cancelled.
    '''

    def __init__(self, loop=None):
        '''Create a worker bound to ``loop`` (default: the current event loop).'''
        self._loop = loop if loop is not None else asyncio.get_event_loop()
        # This set serves to keep incomplete tasks alive as long as this class
        # instance exists.
        self._tasks = set()

    def __del__(self):
        # __del__ also runs when __init__ failed part-way, in which case the
        # attribute may not exist yet.
        tasks = getattr(self, '_tasks', None)
        if not tasks:
            return
        # Tasks drop out of the set as they finish, which could invalidate
        # the iteration, so iterate over a snapshot.
        for task in list(tasks):
            task.cancel()

    def create_task(self, coroutine):
        '''Schedule ``coroutine`` as a task owned by this worker.

        The task is forgotten again as soon as it is done, whether it
        returned, raised or was cancelled.

        Returns the scheduled task.
        '''
        task = self._loop.create_task(coroutine)
        self._tasks.add(task)
        # The callback is the set's own bound method, so the task does not
        # hold a reference back to this Worker (a wrapper coroutine defined as
        # a method would, and would keep the Worker alive while tasks run).
        # discard() rather than remove(): never raises if already absent.
        task.add_done_callback(self._tasks.discard)
        return task
class _FutureLinkedListItem:
    '''One node of a singly linked list of futures.

    ``future`` is a fresh asyncio.Future created on the default event loop;
    ``next`` is the following node, or None while no successor exists yet.
    '''

    def __init__(self):
        # A new node starts out as the tail of the list.
        self.next = None
        self.future = asyncio.Future()
class EventQueue:
    '''A stream of events exposed as a linked list of futures.

    The queue always points at the item whose future will carry the next
    event. send_result() / send_exception() resolve that future and advance.
    Iterating the queue yields a reader that starts at the current item and
    walks the same list, so several readers can follow the same events.
    '''

    def __init__(self):
        self._next_future_item = _FutureLinkedListItem()
        # XXX This is a hack. Is there a better way?
        # In KeyedEventQueue below, an underlying EventQueue might disappear if
        # a caller holds on to a future generated by the EventQueue but not to
        # the EventQueue itself. Then an incoming event will fail to find the
        # EventQueue, create a new one, and then the original future will never
        # fire. So as a workaround make sure the future holds on to a reference
        # to the EventQueue itself, so that the EventQueue is not destroyed
        # before the future.
        # (Inside the class body the name is mangled, so the attribute is
        # actually stored on the future as ``_EventQueue__keep``.)
        self._next_future_item.future.__keep = self

    def __iter__(self):
        '''Return a new reader positioned at the next event to be sent.'''
        return _EventQueueReader(self)

    def _get_next_future_item(self, item):
        '''Return the successor of ``item``, creating it on first request.'''
        if item.next is None:
            item.next = _FutureLinkedListItem()
            # XXX same hack as in class constructor
            item.next.future.__keep = self
        return item.next

    def _move_along(self):
        '''Advance the queue to the item following the current one.'''
        self._next_future_item = self._get_next_future_item(self._next_future_item)

    def _pending_future(self):
        '''Return the first future from the current position that is not done.

        The futures are shared with readers, so a reader may already have
        cancelled the current one. Resolving a done future raises
        InvalidStateError before the queue advances, which would block every
        later event; such futures are skipped instead.
        '''
        while self._next_future_item.future.done():
            self._move_along()
        return self._next_future_item.future

    def send_result(self, result):
        '''Deliver ``result`` as the next event and advance the queue.'''
        self._pending_future().set_result(result)
        self._move_along()

    def send_exception(self, exception):
        '''Deliver ``exception`` as the next event and advance the queue.'''
        self._pending_future().set_exception(exception)
        self._move_along()
class _EventQueueReader:
    '''Iterator over the futures of an EventQueue.

    The reader starts at whatever item the parent queue was pointing at when
    the reader was created. Each next() call hands out that item's future
    and steps to the following item. The iterator never ends.
    '''

    def __init__(self, parent):
        self.parent = parent
        # Begin at the parent's current position.
        self._next_future_item = parent._next_future_item

    def _move_along(self):
        '''Step this reader to the item after its current one.'''
        current = self._next_future_item
        # The parent owns list growth: it creates the successor if needed.
        self._next_future_item = self.parent._get_next_future_item(current)

    def __iter__(self):
        return self

    def __next__(self):
        '''Return the future for the current event and advance past it.'''
        upcoming = self._next_future_item.future
        self._move_along()
        return upcoming
class KeyedEventQueue:
    '''Split the events of a parent queue into one EventQueue per key.

    For every result received from ``parent``, ``key_func(result)`` selects
    the per-key queue and ``data_func(result)`` (identity by default) is the
    value sent to it. Per-key queues are obtained with ``keyed[key]``.
    '''

    def __init__(self, parent, key_func, data_func=lambda x: x):
        self.parent = parent
        self.key_func = key_func
        self.data_func = data_func
        # Values are held weakly: this object alone does not keep a per-key
        # queue alive.
        self._queues = weakref.WeakValueDictionary()
        self._worker = Worker()
        self._worker.create_task(self._start())

    @asyncio.coroutine
    def _start(self):
        '''Forward each parent event to the queue selected by its key.'''
        for future in self.parent:
            try:
                # The parent's futures are shared with other readers. shield()
                # stops a cancellation of this task from cancelling the shared
                # future itself.
                result = yield from asyncio.shield(future)
            except asyncio.CancelledError:
                if future.cancelled():
                    # Someone else cancelled this particular event; there is
                    # nothing to forward, carry on with the next one.
                    continue
                # Otherwise it is this task that is being cancelled.
                raise
            except Exception:
                # Swallow exceptions. Keyed queue readers can never see them
                # by definition, since we don't know how to filter them.
                # Listeners interested in exception events need to watch the
                # master queue.
                # Skip the dispatch below: there is no result for this event.
                continue
            data_key = self.key_func(result)
            data_value = self.data_func(result)
            self[data_key].send_result(data_value)

    def __getitem__(self, k):
        '''Return the EventQueue for key ``k``, creating it if necessary.'''
        try:
            return self._queues[k]
        except KeyError:
            new_queue = EventQueue()
            self._queues[k] = new_queue
            return new_queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import asyncio | |
import functools | |
import subprocess | |
import time | |
import util | |
class PowerStateWatcher:
    '''Publish power state changes reported by ``powerd-cli listen``.

    A background task reads the command's output line by line. Each
    recognised ``SysPowerStateChange`` line is sent to ``power_events`` as the
    integer from its ``state=`` field (0 or 1).
    '''

    # Exact output lines of interest, mapped to the value that is published.
    # NOTE(review): what states 0 and 1 mean is defined by powerd, not here.
    _STATE_LINES = {
        b"Received SysPowerStateChange: state=0\n": 0,
        b"Received SysPowerStateChange: state=1\n": 1,
    }

    def __init__(self, loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self.power_events = util.EventQueue()
        self._worker = util.Worker(loop=loop)
        self._worker.create_task(self.watch_wakeups())

    @asyncio.coroutine
    def watch_wakeups(self):
        '''Run ``powerd-cli listen`` and forward its state changes.

        Returns when the child closes its output. The child is killed if it
        is still running when this coroutine ends for any reason, including
        cancellation.
        '''
        # stdbuf -o0 runs the command with unbuffered stdout.
        create = asyncio.create_subprocess_exec(
            'stdbuf', '-o0', 'powerd-cli', 'listen',
            stdout=subprocess.PIPE,
        )
        proc = yield from create
        try:
            while True:
                line = yield from proc.stdout.readline()
                if not line:
                    # End of stream. readline() would now keep returning b""
                    # without ever suspending, so looping on would starve the
                    # event loop.
                    break
                state = self._STATE_LINES.get(line)
                if state is not None:
                    self.power_events.send_result(state)
        finally:
            if proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    # The child exited before it could be killed.
                    pass
class Clock:
    '''Send an event to ``trigger_events`` each time ``period`` seconds pass.

    Deadlines are measured with time.time(). The value sent with each event
    is the number of seconds by which the deadline had been overshot when it
    was noticed.
    '''

    def __init__(self, period, loop=None):
        self.loop = loop if loop else asyncio.get_event_loop()
        self.trigger_events = util.EventQueue()
        self._worker = util.Worker(loop=loop)
        self._worker.create_task(self._watch(period))

    @asyncio.coroutine
    def _watch(self, period):
        '''Background loop: sleep until the deadline, then fire and re-arm.'''
        power_watcher = PowerStateWatcher(self.loop)
        power_futures = iter(power_watcher.power_events)
        pending_power_event = next(power_futures)
        deadline = time.time() + period
        while True:
            remaining = deadline - time.time()
            if remaining < 0:
                # Deadline passed: report by how much, then start a new period
                # counted from now.
                self.trigger_events.send_result(-remaining)
                deadline = time.time() + period
                continue
            # Sleep until the deadline, but wake early on a power state change
            # so the deadline is checked again against the current time.
            # NOTE(review): presumably this catches time that elapsed while
            # the system was suspended - confirm.
            yield from asyncio.wait(
                [pending_power_event],
                timeout=remaining,
                loop=self.loop,
            )
            if pending_power_event.done():
                # That event is consumed; wait on the following one next time.
                pending_power_event = next(power_futures)
@asyncio.coroutine
def create_periodic_reports(loop):
    '''Wait for each trigger of a 300 second Clock, forever.

    ``tardiness`` is the value the Clock sends with each trigger: how many
    seconds past its deadline the trigger fired.
    '''
    # Bound to a local name so the Clock stays referenced while this runs.
    report_clock = Clock(loop=loop, period=300)
    pending_triggers = iter(report_clock.trigger_events)
    for pending in pending_triggers:
        tardiness = yield from pending
        # DO SOMETHING HERE
def main():
    '''Run the periodic report coroutine on the default event loop.

    The coroutine is driven with run_until_complete() rather than scheduled
    as an unreferenced task under run_forever(). The loop itself holds the
    task, and if the coroutine ever fails the exception is raised here
    instead of leaving the process idling with nothing left to do.
    '''
    loop = asyncio.get_event_loop()
    loop.run_until_complete(create_periodic_reports(loop))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment