Created
February 26, 2017 21:33
-
-
Save thodnev/d00e5eb406995541eea0b8a118a22602 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''A throughput-limiting message dispatcher for Telegram bots'''
import sys | |
import time | |
import threading | |
if sys.version_info.major > 2: | |
import queue as q | |
else: | |
import Queue as q | |
# We need to measure intervals shorter than 1 s, so the most accurate timer
# available is required.
# Starting from Python 3.3 there is time.perf_counter, the clock with the
# highest resolution available to the system, so it is used there.
# Older interpreters (including Python 2.7) have no perf_counter, so fall
# back on what they offer: time.clock on Windows and time.time on other
# platforms (M. Lutz, "Learning Python," 4ed, p.630-634).
# The version is compared as a tuple so that any interpreter newer than 3.3
# (whatever its major number) takes the perf_counter branch and never
# evaluates the legacy time.clock lookup.
if sys.version_info >= (3, 3):
    curtime = time.perf_counter
else:
    curtime = time.clock if sys.platform[:3] == 'win' else time.time
class MessageQueueError(RuntimeError):
    '''Raised when a callback cannot be dispatched through a queue.'''
class MessageQueue(threading.Thread):
    '''Dispatches callbacks from queue with specified throughput limits.

    Callbacks are submitted by calling the instance (see `__call__`) and are
    executed one at a time inside `run`, which sleeps whenever the number of
    dispatches recorded inside the sliding time-window reaches `burst_limit`.
    '''
    _instcnt = 0  # instance counter, used to build default thread names

    def __init__(self, queue=None, burst_limit=30, time_limit_ms=1000,
                 exc_route=None, autostart=True, name=None):
        '''Creates a separate thread to dispatch callbacks with delays.

        Args:
            queue (:obj:`queue.Queue`, optional): used to pass callbacks to
                thread.
                Creates `queue.Queue` implicitly if not provided.
            burst_limit (:obj:`int`, optional): number of maximum callbacks
                to dispatch per time-window defined by `time_limit_ms`.
                Defaults to 30.
            time_limit_ms (:obj:`int`, optional): defines width of the
                time-window in which the dispatching limit is calculated.
                Defaults to 1000.
            exc_route (:obj:`callable`, optional): a callable, accepting 1
                positional argument; it is called from the dispatcher thread
                with every `Exception` subclass instance raised by a
                callback.
                If not provided, exceptions are routed through dummy handler,
                which re-raises them.
            autostart (:obj:`bool`, optional): if True, dispatcher is started
                immediately after object's creation; if False, should be
                started manually by `start` method.
                Defaults to True.
            name (:obj:`str`, optional): thread's name.
                Defaults to ``'MessageQueue-N'``, where N is sequential
                number of object created.
        '''
        self._queue = queue if queue is not None else q.Queue()
        self.burst_limit = burst_limit
        # Divide by a float: on Python 2 `int / int` floors, which would
        # turn any sub-second window (e.g. 500 ms) into a zero-width one.
        self.time_limit = time_limit_ms / 1000.0
        self.exc_route = (exc_route if exc_route is not None
                          else self._default_exception_handler)
        self.__exit_req = False  # flag to gently exit thread
        self.__class__._instcnt += 1
        if name is None:
            name = '%s-%s' % (self.__class__.__name__,
                              self.__class__._instcnt)
        super(MessageQueue, self).__init__(name=name)
        self.daemon = False
        if autostart:  # immediately start dispatching
            super(MessageQueue, self).start()

    def run(self):
        '''Do not use the method except for unthreaded testing purposes,
        the method normally is automatically called by `start` method.

        Loops forever: takes one item from the queue, delays if the
        throughput limit was hit, then calls the item. Returns as soon as an
        item is fetched while the exit flag (set by `stop`) is raised; that
        item is not dispatched.
        '''
        times = []  # time stamps of items taken inside the current window
        while True:
            item = self._queue.get()
            if self.__exit_req:
                return  # shutdown thread
            # delay routine
            now = curtime()
            t_delta = now - self.time_limit  # left edge of the time-window
            if times and t_delta > times[-1]:
                # the last call happened before the time-window started, so
                # every older stamp is stale too; skip filtering the list
                times = [now]
            else:
                # keep only the stamps that are still inside the time-window
                times = [t for t in times if t >= t_delta]
                times.append(now)
            if len(times) >= self.burst_limit:  # throughput limit was hit
                # Sleep until the second stamp in the window becomes
                # `time_limit` old. With `burst_limit` <= 1 the list may hold
                # only the current stamp; indexing [1] would then raise
                # IndexError outside the `try` below and kill the thread, so
                # fall back on the only stamp (a full `time_limit` pause).
                ref = times[1] if len(times) > 1 else times[0]
                # time.sleep rejects negative values, hence the clamp
                time.sleep(max(0.0, ref - t_delta))
            # finally dispatch one
            try:
                func, args, kwargs = item
                func(*args, **kwargs)
            except Exception as exc:  # re-route any exceptions
                self.exc_route(exc)  # to prevent thread exit

    def stop(self, timeout=None):
        '''Used to gently stop dispatching process and shutdown its thread.

        Raises the exit flag, puts a dummy item into the queue to wake the
        thread up if it is blocked on an empty queue, then joins the thread.
        Callbacks that are still waiting in the queue are not dispatched.

        Args:
            timeout (:obj:`float`): indicates maximum time to wait for
                dispatcher to stop and its thread to exit.
                If timeout exceeds and dispatcher has not stopped, method
                silently returns. `is_alive` method could be used afterwards
                to check the actual status. If `timeout` set to None, blocks
                until dispatcher is shut down.
                Defaults to None.

        Returns:
            None
        '''
        self.__exit_req = True  # gently request
        self._queue.put(None)  # put something to unfreeze if frozen
        super(MessageQueue, self).join(timeout=timeout)

    @staticmethod
    def _default_exception_handler(exc):
        '''Dummy exception handler which re-raises exception in thread.
        Could be possibly overwritten by subclasses.
        '''
        raise exc

    def __call__(self, func, *args, **kwargs):
        '''Used to dispatch callbacks in throughput-limiting thread
        through queue.

        Args:
            func (:obj:`callable`): the actual function (or any callable)
                that is dispatched through queue.
            *args: variable-length `func` arguments.
            **kwargs: arbitrary keyword-arguments to `func`.

        Returns:
            None

        Raises:
            MessageQueueError: if the thread is not alive or `stop` was
                already requested.
        '''
        if not self.is_alive() or self.__exit_req:
            raise MessageQueueError('Could not dispatch into stopped thread')
        self._queue.put((func, args, kwargs))
# self-test below
if __name__ == '__main__':
    # Optional positional command-line arguments, in this order:
    #     N  burst_limit  time_limit_ms  margin_ms
    # `argv` already has the script name stripped, so the first argument
    # lives at index 0.
    argv = sys.argv[1:]
    N = int(argv[0]) if argv else 122
    burst_limit = int(argv[1]) if len(argv) > 1 else 30
    time_limit_ms = int(argv[2]) if len(argv) > 2 else 1000
    margin_ms = int(argv[3]) if len(argv) > 3 else 0

    print('Self-test with N = {} msgs, burst_limit = {} msgs, '
          'time_limit = {} ms, margin = {} ms'
          ''.format(N, burst_limit, time_limit_ms, margin_ms))

    testtimes = []  # time stamp of every dispatched test call

    def testcall():
        testtimes.append(curtime())

    dsp = MessageQueue(burst_limit=burst_limit,
                       time_limit_ms=time_limit_ms)
    print('Started dispatcher {}\nStatus: {}'
          ''.format(dsp, ['inactive', 'active'][dsp.is_alive()]))

    print('Dispatching {} calls @ {}'.format(N, time.asctime()))

    for i in range(N):
        dsp(testcall)

    print('Queue filled, waiting 4 dispatch finish @ ' + str(time.asctime()))

    # Poll with a short pause instead of spinning, so the waiting main
    # thread does not compete with the dispatcher thread for the CPU.
    while not dsp._queue.empty():
        time.sleep(0.01)

    dsp.stop()
    print('Dispatcher ' + ['stopped', '!NOT STOPPED!'][dsp.is_alive()] +
          ' @ ' + str(time.asctime()))

    print('Calculating call time windows')
    passes, fails = [], []
    # Shortest time span (in seconds) that burst_limit + 1 consecutive calls
    # have to cover. Float division keeps sub-second values on Python 2.
    min_span = (time_limit_ms - margin_ms) / 1000.0
    # NOTE(review): range() excludes its end, so the last window (the one
    # ending at len(testtimes)) is never examined -- confirm whether that
    # is intended.
    for start, stop in enumerate(range(burst_limit+1, len(testtimes))):
        part = testtimes[start:stop]
        if (part[-1] - part[0]) >= min_span:
            passes.append(part)
        else:
            print(start, stop)
            fails.append(part)

    print('Tested: {}, Passed: {}, Failed: {}'
          ''.format(len(passes+fails), len(passes), len(fails)))
    if fails:
        print('(!) Got mismatches: ' + ';\n'.join(map(str, fails)))
    else:
        print('3 parts (for reference)\n' +
              ('-'*80+'\n').join(map(str, passes[:3])))
        print('*'*80)

    print('RESULT: Test ' + ['failed', 'passed'][len(fails) == 0])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment