Last active
September 23, 2020 10:31
-
-
Save serpent213/a8612c49babccb4108e7630b64ad4fa4 to your computer and use it in GitHub Desktop.
Ray interval playground
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import logging | |
import ray | |
import time | |
@ray.remote | |
class Worker(): | |
def __init__(self): | |
self.logger = logging.getLogger(__name__) | |
self.counter = 0 | |
self.logger.warning("init") | |
def cycle(self): | |
self.logger.warning(f"counter is {self.counter}") | |
self.counter += 1 | |
@ray.remote | |
class Interval(): | |
""" | |
based on https://phoolish-philomath.com/asynchronous-task-scheduling-in-python.html | |
""" | |
async def run_periodically(self, wait_time, func, *args): | |
""" | |
Helper for schedule_task_periodically. | |
Wraps a function in a coroutine that will run the | |
given function indefinitely | |
:param wait_time: seconds to wait between iterations of func | |
:param func: the function that will be run | |
:param args: any args that need to be provided to func | |
""" | |
while True: | |
func(*args) | |
await asyncio.sleep(wait_time) | |
def schedule_task_periodically(self, wait_time, func, *args): | |
""" | |
Schedule a function to run periodically as an asyncio.Task | |
:param wait_time: interval (in seconds) | |
:param func: the function that will be run | |
:param args: any args needed to be provided to func | |
:return: an asyncio Task that has been scheduled to run | |
""" | |
self.task = asyncio.create_task( | |
self.run_periodically(wait_time, func, *args)) | |
async def cancel_scheduled_task(self, task): | |
""" | |
Gracefully cancels a task | |
:type task: asyncio.Task | |
""" | |
self.task.cancel() | |
try: | |
await self.task | |
except asyncio.CancelledError: | |
pass | |
if __name__ == "__main__": | |
ray.init(address="auto", _redis_password="5241590000000000") | |
interval = Interval.options(lifetime="detached", name="interval").remote() | |
worker = Worker.remote() | |
interval.schedule_task_periodically.remote(2, worker.cycle.remote) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment