Skip to content

Instantly share code, notes, and snippets.

@devsamuelv
Last active December 5, 2023 02:01
Show Gist options
  • Save devsamuelv/5c7b42a3a157267311e691a10e819908 to your computer and use it in GitHub Desktop.
Save devsamuelv/5c7b42a3a157267311e691a10e819908 to your computer and use it in GitHub Desktop.
from multiprocessing import Process
# A simple process manager to control a single daemon at a time.
class Orchestrator:
def __init__(self) -> None:
self._current_process = None
pass
def schedule(self, thread: Process) -> bool:
if self._current_process is not None:
return False
self._current_process = thread
self._current_process.daemon = True
self._current_process.start()
return True
def get_current_process(self) -> Process | None:
return self._current_process
def kill_process(self):
if self._current_process is not None:
self._current_process.terminate()
self._current_process = None
def thread_join(self):
self._current_process.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment