Last active
February 6, 2021 17:44
-
-
Save PeterMitrano/ea3a89c1766a60400934b32844e48d84 to your computer and use it in GitHub Desktop.
Example of a class with a thread
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import threading | |
import time | |
import weakref | |
class BadRobot: | |
def __init__(self): | |
self.should_disconnect = False | |
self.command_thread = threading.Thread(target=self.command_thread_func) | |
print(f"Starting {self}") | |
self.command_thread.start() | |
self.latest_command = None | |
def __del__(self): | |
# NOTE: this will never be called :( | |
print(f'deleting BadRobot {self}') | |
self.should_disconnect = True | |
def set_command(self, command): | |
self.latest_command = command | |
def command_thread_func(self): | |
while True: | |
print(f"{self} sending self.latest_command to motor controllers...") | |
if self.should_disconnect: | |
break | |
time.sleep(0.1) | |
class Robot: | |
def __init__(self): | |
self.should_disconnect = False | |
# NOTE: The trick is to use the pure function name Robot.threaded_func, which is not already bound to 'self' | |
# and then to pass a weak ref to thread_func instead and | |
self.command_thread = threading.Thread(target=Robot.command_thread_func, args=(weakref.proxy(self),)) | |
print(f"Starting {self}") | |
self.command_thread.start() | |
self.latest_command = None | |
def __del__(self): | |
print(f'deleting Robot {self}') | |
self.should_disconnect = True | |
def set_command(self, command): | |
self.latest_command = command | |
def command_thread_func(self): | |
# NOTE: here self is a weakref, so Robot is eligable for garbage collection | |
# as even if this function is still running. Without the weak ref, | |
# Robot would be kept alive because this function still references 'self' | |
try: | |
while True: | |
print(f"{self} sending self.latest_command to motor controllers...") | |
if self.should_disconnect: | |
break | |
time.sleep(0.1) | |
except ReferenceError: | |
pass | |
def main(): | |
# NOTE: this function creates "scope" for robot, which is necessary for this test | |
robot = Robot() | |
# NOTE: the following would never exit :( | |
#robot = BadRobot() | |
if __name__ == '__main__': | |
main() | |
#print("ok, BadRobot should be dead now.... but it's not.") | |
# NOTE: the following would never exit, because robot is never considered for garbage collection | |
#robot = Robot() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment