Last active
November 10, 2022 10:22
-
-
Save pankajthekush/9fcaca4847124a89b42d4be5d7a2c344 to your computer and use it in GitHub Desktop.
Run a method for n seconds
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class FuncTimer(): | |
def __init__(self,fn,args,runtime): | |
self.fn = fn | |
self.args = args | |
self.queue = multiprocessing.Queue() | |
self.runtime = runtime | |
self.process = multiprocessing.Process(target=self.thread_caller) | |
def thread_caller(self): | |
with ThreadPoolExecutor() as executor: | |
future = executor.submit(self.fn, *self.args) | |
self.queue.put(future.result()) | |
def __enter__(self): | |
return self | |
def start_run(self): | |
self.process.start() | |
self.process.join(timeout=self.runtime) | |
# wait for runtime then get qsize | |
if self.queue.qsize(): | |
try: | |
out_res = self.queue.get(timeout=10) | |
except Exception: | |
out_res = None | |
else: | |
out_res = None | |
return out_res | |
def __exit__(self, exc_type, exc_value, exc_traceback): | |
if self.process.is_alive(): | |
self.process.kill() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# calling class with very less runtime, it should be killed and return none | |
>>> from test import FuncTimer,worker_1 | |
>>> with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp: | |
... res = fp.start_run() | |
... print(res) | |
... | |
very time consuming sleep | |
very time consuming sleep | |
very time consuming sleep | |
very time consuming sleep | |
very time consuming sleep | |
None | |
# calling class with increased runtime it , should return the calculation | |
>>> from test import FuncTimer,worker_1 | |
>>> with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 15) as fp: | |
... res = fp.start_run() | |
... print(res) | |
... | |
very time consuming sleep | |
very time consuming sleep | |
very time consuming sleep | |
very time consuming sleep | |
very time consuming sleep | |
very time consuming sleep | |
very time consuming sleep | |
very time consuming sleep | |
very time consuming sleep | |
very time consuming sleep | |
6 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment