Skip to content

Instantly share code, notes, and snippets.

@J3ronimo
Last active September 26, 2024 05:04
Show Gist options
  • Save J3ronimo/b4f9b48b5dec7ea1227f167d22067494 to your computer and use it in GitHub Desktop.
Save J3ronimo/b4f9b48b5dec7ea1227f167d22067494 to your computer and use it in GitHub Desktop.
Interruptable threading.Thread, based on https://gist.github.com/liuw/2407154
import threading
import ctypes
import time
import atexit
class InterruptableThread(threading.Thread):
    """Interruptable threading.Thread, based on:
    https://gist.github.com/liuw/2407154

    Calling this thread's interrupt() method from the main thread will raise
    an exception inside target() or run(), which will stop the execution.

    Blocking I/O or calls to time.sleep are not interruptable this way and
    will delay the stopping of the thread. Use self.sleep as a workaround.

    C extension functions inside run() may release the GIL, which goes along
    with calling Py_END_ALLOW_THREADS and thereby PyEval_RestoreThread before
    returning to the Python side. This will remove the exception state set
    via interrupt(). The same goes for calling PyErr_Clear. An interrupt()
    will have no effect in this case. Calling it multiple times consecutively
    may work, but falling back to a classical "_stopped" flag solution in the
    implementation of run() might be the only safe fallback.
    """

    # Internal exception type, raised on self.interrupt(), suppressed in
    # self.run(). It derives from BaseException, so "except Exception"
    # handlers inside run() do not catch it.
    class ThreadInterrupt(BaseException):
        pass

    # Set once start() has wrapped self.run and registered the atexit hook,
    # so that an (invalid) repeated start() does not wrap or register twice.
    _start_prepared = False

    def _suppress_threadinterrupts(self, run):
        """Decorator that swallows ThreadInterrupts silently.

        Returns a zero-argument callable that invokes ``run`` and suppresses
        ThreadInterrupt as well as any Exception whose ``__cause__`` is a
        ThreadInterrupt. Whatever the outcome, the atexit hook is removed
        and self.finalize() is called afterwards.
        """
        def wrapped_func():
            try:
                run()
            except self.ThreadInterrupt:
                pass
            except Exception as err:
                # anything reraised from ThreadInterrupt also counts
                if not isinstance(err.__cause__, self.ThreadInterrupt):
                    raise
            finally:
                # releases all references to self in atexit
                atexit.unregister(self.interrupt)
                self.finalize()
        return wrapped_func

    def start(self):
        """Start the thread with self.run wrapped to absorb ThreadInterrupts.

        Thread.start may only be called once. A repeated call is passed
        straight to Thread.start, which raises RuntimeError, without
        wrapping self.run or registering the atexit hook a second time.
        If Thread.start fails on the first call, the wrapping and the
        atexit registration are rolled back before the error propagates.
        """
        if self._start_prepared:
            super().start()  # raises RuntimeError for an already started thread
            return

        original_run = self.run
        self.run = self._suppress_threadinterrupts(original_run)
        # extra atexit watcher to prevent open threads from keeping
        # processes hanging after close
        atexit.register(self.interrupt)
        self._start_prepared = True
        try:
            super().start()
        except BaseException:
            # The thread never ran, so wrapped_func's cleanup will not run
            # either: undo the preparation here to avoid a stale atexit
            # handler pointing at a thread that was never started.
            atexit.unregister(self.interrupt)
            self.run = original_run
            self._start_prepared = False
            raise

    def sleep(self, seconds, interval=0.1):
        """Sleep for ``seconds`` in slices of at most ``interval`` seconds.

        time.sleep is not interruptable when called from a thread in Windows.
        Therefore, break it up into smaller blocks after which exception
        handling can happen.
        """
        t_end = time.monotonic() + seconds
        while t_end - time.monotonic() > interval:
            time.sleep(interval)
        # sleep the remainder; clamp at 0 in case the deadline already passed
        time.sleep(max(t_end - time.monotonic(), 0))

    def interrupt(self):
        """Inject a ThreadInterrupt into this thread to stop self.run().

        Returns:
            True if the exception was scheduled in the thread, False if the
            thread is no longer running (no-op).

        Raises:
            AssertionError: the thread has not been started yet.
            ValueError: the interpreter does not know the thread id although
                the thread is still alive.
            SystemError: more than one thread state was modified; the
                pending exception is cleared again before raising.
        """
        assert self.ident, "Thread has not been started yet."
        if not self.is_alive():
            return False  # NOOP

        # PyThreadState_SetAsyncExc takes an unsigned long id (CPython >= 3.7).
        tid = ctypes.c_ulong(self.ident)
        # BaseException subclass, not instance!
        exc = ctypes.py_object(self.ThreadInterrupt)
        # returns the number of thread states modified
        ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, exc)
        if ret == 0:
            if not self.is_alive():
                # The thread finished between the is_alive() check above and
                # the call: nothing left to interrupt.
                return False
            raise ValueError("Invalid thread ID")
        if ret > 1:
            # Defensive, as in the recipe this class is based on: clear the
            # pending exception again (None is passed as NULL) and report.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")
        return True

    def finalize(self):
        """Called after thread ends or gets interrupted. Override as needed."""
        pass
def test_interruptable_thread():
    """Manual demo: start a thread that sleeps 5 s and interrupt it after 1 s."""

    class DemoThread(InterruptableThread):
        """Thread that reports each stage of its life cycle on stdout."""

        def interrupt(self):
            print("Thread is interrupted")
            super().interrupt()

        def finalize(self):
            print("Thread is finalized")

        def run(self):
            print("Thread starts")
            self.sleep(5)
            print("Thread ends regularly")

    worker = DemoThread()
    worker.start()
    time.sleep(1)
    worker.interrupt()  # comment this out for regular thread end
# Run the manual demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    test_interruptable_thread()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment