Created
December 11, 2011 21:51
-
-
Save agrif/1462964 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing | |
import itertools | |
import Queue | |
from signals import Signal | |
## | |
## first, some generic interfaces | |
## | |
class JobProvider(object):
    """Interface for objects that supply work items to a JobManager.

    Concrete providers must override both methods below; the base class
    raises NotImplementedError for each.
    """
    def get_jobs(self):
        """Return an iterator of opaque job objects; each one is later
        handed back to do_job."""
        raise NotImplementedError("get_jobs")

    def do_job(self, job):
        """Perform the work associated with a single job object produced
        by get_jobs."""
        raise NotImplementedError("do_job")
class JobManager(object):
    """Interface for dispatching jobs to workers (processes or otherwise).

    An implementation is also responsible for intercepting signals raised
    in workers and relaying them back to the Python instance where
    do_all_jobs was invoked.
    """
    def do_all_jobs(self, job_providers):
        """Run every job yielded by the given list of JobProvider
        objects. Must be overridden by implementations."""
        raise NotImplementedError("do_all_jobs")
## | |
## now a specific JobManager that uses multiprocessing | |
## | |
# worker process class | |
class MultiprocessingJobManagerProcess(multiprocessing.Process):
    """Worker process that pulls (provider_index, job) pairs off a shared
    job queue, runs them, and reports results and intercepted signal
    emissions back to the parent through two more queues.

    The process runs until the parent calls terminate() on it."""
    def __init__(self, job_providers, job_queue, result_queue, signal_queue):
        """job_providers -- list of JobProvider objects, indexed by the
        first element of each queued job tuple
        job_queue -- queue of (provider_index, job) tuples to work on
        result_queue -- queue where do_job return values are reported
        signal_queue -- queue where intercepted signal emissions are
        forwarded as (name, args, kwargs) tuples
        """
        super(MultiprocessingJobManagerProcess, self).__init__()
        self.providers = job_providers
        self.job_queue = job_queue
        self.result_queue = result_queue
        self.signal_queue = signal_queue
    def run(self):
        # register an interceptor for every known signal, so that
        # emissions in this worker are shipped to the parent instead of
        # running the non-local handlers here
        def register_signal(name, sig):
            # helper function so the closure captures this iteration's
            # (name, sig) pair rather than the loop variables
            def handler(*args, **kwargs):
                self.signal_queue.put((name, args, kwargs), False)
            sig.set_interceptor(handler)
        for name, sig in Signal.signals.iteritems():
            register_signal(name, sig)
        # job loop: runs until the parent terminates this process
        while 1:
            try:
                # block with a short timeout rather than polling with
                # get(False) and passing on Queue.Empty, which busy-spins
                # at 100% CPU whenever the queue is momentarily empty
                i, job = self.job_queue.get(True, 0.1)
            except Queue.Empty:
                continue
            result = self.providers[i].do_job(job)
            self.result_queue.put(result, False)
class MultiprocessingJobManager(JobManager):
    """A JobManager that distributes jobs to a pool of worker processes
    via multiprocessing queues, replaying intercepted signals in this
    (the main) Python instance as results come in."""
    def __init__(self, pool_size=0):
        # pool_size <= 0 means "one worker per CPU core"
        if pool_size <= 0:
            pool_size = multiprocessing.cpu_count()
        self.pool_size = pool_size
    def _handle_messages(self, result_queue, signal_queue, timeout=0.0):
        """Drain both queues, re-emitting intercepted signals locally.

        Returns the number of finished jobs seen on result_queue. If
        timeout > 0, the first read from result_queue may block for up
        to that many seconds; every read after that is non-blocking."""
        # figure out how many jobs have finished
        finished_jobs = 0
        result_empty = False
        signal_empty = False
        # alternate between the two queues until both report empty
        while not (result_empty and signal_empty):
            if not result_empty:
                try:
                    if timeout > 0.0:
                        result = result_queue.get(True, timeout)
                        # timeout should only apply once
                        timeout = 0.0
                    else:
                        result = result_queue.get(False)
                    finished_jobs += 1
                except Queue.Empty:
                    result_empty = True
            if not signal_empty:
                try:
                    name, args, kwargs = signal_queue.get(False)
                    # look up the signal by its global name and run the
                    # handlers that the worker's interceptor deferred to
                    # this instance
                    sig = Signal.signals[name]
                    sig.emit_intercepted(*args, **kwargs)
                except Queue.Empty:
                    signal_empty = True
        return finished_jobs
    def do_all_jobs(self, job_providers):
        """Run every job from the given provider (or list of providers),
        blocking until all of them have completed."""
        # make sure job_providers is a list
        try:
            job_providers = list(job_providers)
        except TypeError:
            job_providers = [job_providers]
        # create the job queue, result queue and signal queue
        job_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()
        signal_queue = multiprocessing.Queue()
        # create the process pool
        pool = []
        for i in range(self.pool_size):
            proc = MultiprocessingJobManagerProcess(job_providers, job_queue, result_queue, signal_queue)
            pool.append(proc)
        for p in pool:
            p.start()
        # distribute work, shuffling amongst available providers
        outstanding_jobs = 0
        provider_jobs = map(lambda p: p.get_jobs(), job_providers)
        # izip_longest interleaves jobs round-robin across providers,
        # yielding None for a provider whose iterator is exhausted
        for joblist in itertools.izip_longest(*provider_jobs):
            for i, job in enumerate(joblist):
                if not job is None:
                    job_queue.put((i, job), False)
                    outstanding_jobs += 1
            # throttle: keep at most pool_size * 10 jobs queued so the
            # job queue does not grow without bound
            while outstanding_jobs > self.pool_size * 10:
                outstanding_jobs -= self._handle_messages(result_queue, signal_queue, timeout=1)
        # finish all jobs and terminate the processes
        while outstanding_jobs > 0:
            outstanding_jobs -= self._handle_messages(result_queue, signal_queue, timeout=1)
        for p in pool:
            p.terminate()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## | |
## a way to create named "signals" that, when emitted, call a set | |
## of registered functions | |
## | |
class Signal(object):
    """A named event that invokes registered handler functions when fired.

    Designed to cooperate with JobManager: handlers added via register()
    always execute in the main Python instance (a JobManager intercepts
    emissions in workers and replays them there), while handlers added
    via register_local() execute in whichever instance does the emitting.
    """
    # global registry of every Signal ever created, keyed by full name;
    # JobManagers use this to install interceptors and relay emissions
    signals = {}

    def __init__(self, namespace, name):
        """Create and globally register a signal.

        namespace and name should be the owning class's name and the
        event's name; joined with a dot they form a globally-unique
        identifier."""
        self.namespace = namespace
        self.name = name
        self.fullname = namespace + '.' + name
        self.interceptor = None
        self.local_functions = []
        self.functions = []
        # make this signal discoverable by name
        Signal.signals[self.fullname] = self

    def register(self, func):
        """Add a handler that always runs in the main Python instance.

        Returns func unchanged so this is usable as a decorator."""
        self.functions.append(func)
        return func

    def register_local(self, func):
        """Add a handler that runs in the instance where the emission
        happens.

        Returns func unchanged so this is usable as a decorator."""
        self.local_functions.append(func)
        return func

    def set_interceptor(self, func):
        """Install an interceptor that is called in place of the
        non-local handlers during emit(); used by JobManagers inside
        worker instances to capture emissions."""
        self.interceptor = func

    def emit(self, *args, **kwargs):
        """Fire the signal with the given arguments."""
        for handler in self.local_functions:
            handler(*args, **kwargs)
        # an interceptor, when present, replaces the non-local handlers
        if self.interceptor:
            self.interceptor(*args, **kwargs)
        else:
            for handler in self.functions:
                handler(*args, **kwargs)

    def emit_intercepted(self, *args, **kwargs):
        """Replay an intercepted emission: run only the non-local
        handlers, finishing the work the interceptor deferred."""
        for handler in self.functions:
            handler(*args, **kwargs)

    # convenience: sig(...) is shorthand for sig.emit(...)
    def __call__(self, *args, **kwargs):
        self.emit(*args, **kwargs)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import string | |
import jobs | |
import signals | |
## | |
## a test job provider | |
## | |
def md5(s):
    """Return the hex digest of the MD5 hash of s."""
    digest = hashlib.md5(s)
    return digest.hexdigest()
class HashChecker(jobs.JobProvider):
    """JobProvider that brute-force searches the space of fixed-length
    strings over an alphabet, emitting on_find for every string whose
    hash starts with a target prefix."""
    # fired as on_find(source_string, hex_digest) for each match
    on_find = signals.Signal("HashChecker", "on_find")
    def __init__(self, length, letters=string.ascii_lowercase, hasher=md5, target="000000"):
        """length -- length of the candidate strings to generate
        letters -- alphabet the candidate strings are drawn from
        hasher -- function mapping a string to its hex digest
        target -- hex prefix a digest must start with to count as a find
        """
        super(HashChecker, self).__init__()
        self.length = length
        self.letters = letters
        self.num_letters = len(letters)
        self.hasher = hasher
        self.target = target
    def get_jobs(self):
        """Yield (start, stop) index ranges that together cover the whole
        search space in batches."""
        batch_size = 32 ** 4
        space_size = self.num_letters ** self.length
        # round up so a final partial batch is not silently dropped:
        # floor division used to lose up to batch_size - 1 candidates,
        # or the entire space when it was smaller than a single batch
        batches = (space_size + batch_size - 1) // batch_size
        i = 0
        while i < batches:
            # min() clamps the last batch to the end of the space
            yield (batch_size * i, min(batch_size * (i + 1), space_size))
            i += 1
    def do_job(self, min_max):
        """Check every candidate index in the [start, stop) range."""
        for i in xrange(*min_max):
            # decode index i into base-num_letters digits...
            source_letters = []
            while i != 0:
                source_letters.append(i % self.num_letters)
                i = i // self.num_letters
            # ...and zero-pad up to the configured string length
            while len(source_letters) < self.length:
                source_letters.append(0)
            source = ''.join(map(lambda i: self.letters[i], source_letters))
            self.check_hash(source)
    def check_hash(self, src):
        """Hash src and emit on_find if the digest matches the target
        prefix."""
        hash = self.hasher(src)
        if hash.startswith(self.target):
            self.on_find(src, hash)
# default on_find handler: registered non-locally, so it always runs in
# the main Python instance, announcing each match found by any worker
@HashChecker.on_find.register
def on_find_listener(src, hash):
    print "found", repr(src), "hashes to", hash
if __name__ == "__main__":
    # demo: 3 worker processes searching length-5 strings over letters,
    # digits and space for MD5 digests beginning with "deadbeaf"
    manager = jobs.MultiprocessingJobManager(3)
    provider1 = HashChecker(5, letters=string.ascii_letters + string.digits + ' ', target="deadbeaf")
    manager.do_all_jobs([provider1])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Flow chart detailing how signals work: http://i.imgur.com/DoSs2.png
One other thing: the `@signal` decorator essentially turns a method into a Signal object. So this defines a class `A` with a signal `A.do_stuff`, and the default handler returns the string "stuff". This is a syntax I liked, but the whole thing can be rewritten without default handlers or the decorator. If you go that way, though, you lose one really neat side-effect of the decorator: calling `self.on_test()` automatically emits the "on_test" signal with `self` as the first argument.