Created
October 8, 2014 13:07
-
-
Save leth/727c43d538768a51c7f8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import weakref | |
import threading | |
from twisted.python.threadpool import ThreadPool | |
from crochet import EventualResult | |
# Per-pool set of WorkerThread objects that are currently inside a patched
# EventualResult._result call.  Keyed weakly by the pool object.
_blocked_pool_threads = weakref.WeakKeyDictionary()
# Per-pool (min, max) limits captured when the first WorkerThread for that
# pool is constructed; used as the baseline whenever the pool is resized.
_orig_pool_limits = weakref.WeakKeyDictionary()
# Maps (owner, attribute name) -> attribute value saved by install_patches,
# so that uninstall_patches can put it back.
_originals = {}
def install_patches():
    """Monkey-patch crochet and Twisted so blocked pool workers are tracked.

    Saves the current ``EventualResult._result`` and
    ``ThreadPool.threadFactory`` in ``_originals``, then replaces them with
    the wrapper built by ``make_EventualResult_result`` and this module's
    ``threadFactory`` respectively.

    Calling this while the patches are already in place is a no-op.  Without
    that guard a second call would record the already-patched attributes as
    the "originals" (and wrap ``_result`` twice), after which
    ``uninstall_patches`` could no longer restore the real implementations.
    """
    # Inspect the class __dict__ so the raw function object is compared;
    # plain attribute access on the class may hand back a method wrapper.
    if ThreadPool.__dict__.get('threadFactory') is threadFactory:
        return

    _originals[(EventualResult, '_result')] = EventualResult._result
    _originals[(ThreadPool, 'threadFactory')] = ThreadPool.threadFactory

    EventualResult._result = make_EventualResult_result(EventualResult._result)
    ThreadPool.threadFactory = threadFactory
def uninstall_patches():
    """Undo ``install_patches`` and reset every tracked pool to its limits.

    Each attribute saved in ``_originals`` is set back on its owner, and each
    pool recorded in ``_orig_pool_limits`` is resized to the (min, max) pair
    that was captured for it.
    """
    # ``items()`` exists on both Python 2 and Python 3, whereas
    # ``iteritems()`` raises AttributeError on Python 3.
    for (owner, attr_name), original in _originals.items():
        setattr(owner, attr_name, original)

    # Iterate over a snapshot so that entries appearing in or vanishing from
    # the weak dictionary while pools are being resized cannot disturb the
    # loop.
    for pool, (min_threads, max_threads) in list(_orig_pool_limits.items()):
        pool.adjustPoolsize(min_threads, max_threads)
def threadFactory(pool, *args, **kwargs):
    """Create the worker thread for ``pool``.

    ``install_patches`` assigns this function to ``ThreadPool.threadFactory``.
    Every argument after ``pool`` is forwarded unchanged to ``WorkerThread``.

    Returns:
        The newly constructed ``WorkerThread``.
    """
    worker = WorkerThread(pool, *args, **kwargs)
    return worker
def make_EventualResult_result(orig):
    """Wrap ``EventualResult._result`` so blocked pool workers are compensated.

    Args:
        orig: The original ``_result`` callable; it is invoked as
            ``orig(self, *args, **kwargs)``.

    Returns:
        A replacement for ``_result``.  When called from a ``WorkerThread``
        it registers the thread as blocked and raises the owning pool's
        limits by the number of blocked workers for the duration of the
        call.  When the call finishes (normally or with an exception) the
        thread is unregistered and the limits are recomputed, so the pool
        returns to its recorded limits once no worker is blocked.  Calls
        from any other thread are passed straight through to ``orig``.
    """

    def _resize_for_blocked(pool):
        # Limits are always derived from the recorded baseline plus the
        # current number of blocked workers, never from the live limits.
        blocked_count = len(_blocked_pool_threads[pool])
        base_min, base_max = _orig_pool_limits[pool]
        pool.adjustPoolsize(base_min + blocked_count, base_max + blocked_count)

    def _EventualResult_result(self, *args, **kwargs):
        thread = threading.current_thread()
        if not isinstance(thread, WorkerThread):
            return orig(self, *args, **kwargs)

        pool = thread._thread_pool
        blocked_threads = _blocked_pool_threads[pool]
        blocked_threads.add(thread)
        try:
            # Resizing happens inside the ``try`` so that a failure here
            # still unregisters the thread below.
            _resize_for_blocked(pool)
            return orig(self, *args, **kwargs)
        finally:
            blocked_threads.discard(thread)
            # Shrink back now that this worker is no longer blocked;
            # otherwise the pool keeps the enlarged limits indefinitely.
            _resize_for_blocked(pool)

    return _EventualResult_result
class WorkerThread(threading.Thread):
    """Thread subclass that remembers the pool it was created for.

    Constructing an instance also makes sure the pool has an entry in the
    module-level registries ``_orig_pool_limits`` and
    ``_blocked_pool_threads``.
    """

    def __init__(self, pool, *args, **kwargs):
        """Initialise the thread and register ``pool`` if not yet known.

        Args:
            pool: The pool this thread belongs to; its ``min`` and ``max``
                attributes are read to record the baseline limits.
            *args: Forwarded to ``threading.Thread.__init__``.
            **kwargs: Forwarded to ``threading.Thread.__init__``.
        """
        super(WorkerThread, self).__init__(*args, **kwargs)
        self._thread_pool = pool

        # ``setdefault`` keeps whatever was recorded the first time this
        # pool was seen; later workers do not overwrite it.
        current_limits = (pool.min, pool.max)
        _orig_pool_limits.setdefault(pool, current_limits)
        _blocked_pool_threads.setdefault(pool, set())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment