Skip to content

Instantly share code, notes, and snippets.

@ciceroverneck
Created January 24, 2012 15:48
Show Gist options
  • Save ciceroverneck/1670790 to your computer and use it in GitHub Desktop.
Save ciceroverneck/1670790 to your computer and use it in GitHub Desktop.
Async Decorator
import threading
class _TimeoutError(RuntimeError):
pass
class _AsyncCall(object):
def __init__(self, fnc, callback = None):
self.Callable = fnc
self.Callback = callback
def __call__(self, *args, **kwargs):
self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
self.Thread.start()
return self
def wait(self, timeout = None):
self.Thread.join(timeout)
if self.Thread.isAlive():
raise _TimeoutError()
else:
return self.Result
def run(self, *args, **kwargs):
self.Result = self.Callable(*args, **kwargs)
if self.Callback:
self.Callback(self.Result)
class _AsyncMethod(object):
def __init__(self, fnc, callback=None):
self.Callable = fnc
self.Callback = callback
def __call__(self, *args, **kwargs):
return _AsyncCall(self.Callable, self.Callback)(*args, **kwargs)
def Async(fnc=None, callback=None):
    """Decorator that makes a callable run asynchronously when called.

    Usable both bare (``@Async``) and with arguments
    (``@Async(callback=...)``).

    Args:
        fnc: The callable to wrap. Left as ``None`` when the decorator is
            used with arguments.
        callback: Optional callable handed to the wrapper together with
            the wrapped callable.

    Returns:
        An ``_AsyncMethod`` wrapping ``fnc`` when ``fnc`` is given,
        otherwise a one-argument decorator that builds one.
    """
    # Identity test: "== None" could be hijacked by a callable that
    # defines its own __eq__.
    if fnc is None:
        def AddAsyncCallback(fnc):
            return _AsyncMethod(fnc, callback)
        return AddAsyncCallback
    return _AsyncMethod(fnc, callback)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment