Created
May 19, 2022 15:28
-
-
Save thehesiod/0d2beed27c800e4f7286be6c3cd17a09 to your computer and use it in GitHub Desktop.
python sync watchdog
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import faulthandler | |
import warnings | |
from pathlib import Path | |
import io | |
from typing import Optional | |
# This is how often we'll trigger our callback to measure blockage | |
_MIN_RESOLUTION = 0.1 | |
LOGGING_FILE_PATH = os.environ.get('LOGGING_FILE_PATH') | |
class WatchDog: | |
def __init__(self, period_s: float = 5, resolution_s: float = 1): | |
""" | |
Create new watchdog instance. | |
:param period_s: this is the maximum number of seconds main thread can block without triggering traceback | |
:param resolution_s: this is how often we'll check for the main thread being blocked | |
""" | |
assert resolution_s >= _MIN_RESOLUTION, f'resolution must be >= {_MIN_RESOLUTION} to avoid high cpu usage' | |
self._period_s = period_s | |
self._resolution_s = resolution_s | |
self._loop = asyncio.get_running_loop() | |
self._watchdog_file: Optional[io.StringIO] = None | |
self._watchdog_task: Optional[asyncio.Task] = None | |
async def _timer_task(self): | |
while True: | |
faulthandler.dump_traceback_later(self._period_s, repeat=True, file=self._watchdog_file) | |
await asyncio.sleep(self._resolution_s) | |
def __enter__(self): | |
if not Path(LOGGING_FILE_PATH).exists(): | |
warnings.warn(f"unable to enable watchdog as {LOGGING_FILE_PATH} does not exist") | |
return self | |
if not self._watchdog_file: | |
self._watchdog_file = open(LOGGING_FILE_PATH, 'w+') | |
if not self._watchdog_task: | |
self._watchdog_task = asyncio.ensure_future(self._timer_task()) | |
return self | |
def __aenter__(self): | |
return self.__enter__() | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
if self._watchdog_task: | |
faulthandler.cancel_dump_traceback_later() | |
self._watchdog_task.cancel() | |
self._watchdog_task = None | |
if self._watchdog_file: | |
self._watchdog_file.close() | |
self._watchdog_file = None | |
def __aexit__(self, exc_type, exc_val, exc_tb): | |
self.__exit__(exc_type, exc_val, exc_tb) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment