parallel spacy parse for jupyter notebooks
import eidos
from toolz import partition
import spacy

nlp = spacy.load('en')

def parse(nlp, input_, n_threads, batch_size):
    # Drop the matcher before multi-threaded piping; it is not needed for
    # parsing and removing it avoids trouble in the worker processes.
    nlp.matcher = None
    out = []
    for doc in nlp.pipe(input_, batch_size=batch_size, n_threads=n_threads):
        # Serialize each Doc to bytes so it can cross the process boundary.
        out.append(doc.to_bytes())
    return out

n_workers, n_threads, spacy_batchsize, worker_batchsize = 5, 2, 500, 5000

# raw_data: your iterable of input texts.
factory = eidos.parallel.SpacyWorkerFactory(parse)
workers = [next(factory) for _ in range(n_workers)]
coop = eidos.parallel.Cooperate(workers, ((part, n_threads, spacy_batchsize)
                                          for part in partition(worker_batchsize, raw_data)))
results = coop.run()

# Rebuild Doc objects in the parent process from the serialized bytes.
data = [spacy.tokens.doc.Doc(nlp.vocab).from_bytes(doc_bytestring)
        for worker_group in results
        for doc_bytestring in worker_group]
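The trick above is that Doc objects never cross the process boundary; only their byte serializations travel over the queue, and the parent rebuilds them against its own vocab. A minimal round-trip sketch, assuming the same spacy 1.x-era 'en' model is loaded as nlp:

    doc = nlp(u'Only bytes cross the queue, not Doc objects.')
    payload = doc.to_bytes()                                        # worker side
    restored = spacy.tokens.doc.Doc(nlp.vocab).from_bytes(payload)  # parent side
    assert [t.text for t in restored] == [t.text for t in doc]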
# The eidos.parallel module used above.
import multiprocessing

import eidos
class Cooperate(multiprocessing.Process):
    # Note: run() is called directly in the parent (not via start()), so
    # Cooperate itself never forks; only the linked workers do.
    def __init__(self, workers, iterable, use_progress=True):
        super(Cooperate, self).__init__()
        self.job_queue = multiprocessing.Queue()
        self.solution_queue = multiprocessing.Queue()
        # Keep every job by index so unfinished work can be recovered in flush().
        self.cached = dict(enumerate(iterable))
        list(map(self.job_queue.put, self.cached.items()))
        self.workers = [worker.link(self.job_queue, self.solution_queue, w_i)
                        for w_i, worker in enumerate(workers)]
        self.total_n = self.job_queue.qsize()
        self.progress = eidos.ProgressBar(total=self.total_n) if use_progress else None

    def run(self):
        for w in self.workers:
            w.start()
        self.out = []
        # qsize() is approximate, so this loop can exit while results are
        # still in flight; the bookkeeping below reports any shortfall.
        while self.job_queue.qsize() > 0 or self.solution_queue.qsize() > 0:
            solution_index, solution = self.solution_queue.get()
            if self.progress:
                self.progress.update(1)
            del self.cached[solution_index]
            self.out.append(solution)
        print("I think I'm done;")
        print("solution queue: {}".format(self.solution_queue.qsize()))
        print("job queue: {}".format(self.job_queue.qsize()))
        print("total_n: {}".format(self.total_n))
        print("out array size: {}".format(len(self.out)))
        print("left in cache: {}".format(len(self.cached)))
        if len(self.out) != self.total_n:
            print("leftover jobs; sigsegv and annoying bug.")
            print("safer to quit and let you decide what to do")
            print("consider calling this.flush() to get remaining")
        return self.out

    def flush(self):
        # Drain any solutions that arrived after run() exited, then execute
        # the remaining cached jobs serially in the parent process.
        print('solution_queue: {}'.format(self.solution_queue.qsize()))
        print('cache: {}'.format(len(self.cached)))
        while self.solution_queue.qsize() > 0:
            solution_index, solution = self.solution_queue.get()
            self.out.append(solution)
            del self.cached[solution_index]
        print("Manually running {} left in cache".format(len(self.cached)))
        for key in list(self.cached.keys()):
            self.out.append(self.workers[0].handle(self.cached.pop(key)))
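
# A usage sketch for Cooperate with plain QueueWorkers (`square` is a
# hypothetical function, not part of the gist; results arrive in completion
# order, not submission order):
#
#   def square(x):
#       return x * x
#
#   workers = [QueueWorker(square) for _ in range(4)]
#   coop = Cooperate(workers, range(100))
#   results = coop.run()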
class QueueWorker(multiprocessing.Process):
    def __init__(self, func, unpack=False, name="worker"):
        # Pass the name through to Process so it is not overwritten.
        super(QueueWorker, self).__init__(name=name)
        self.func = func
        # With unpack=True, jobs are tuples splatted into func(*job);
        # otherwise each job is passed to func whole.
        self.unpack = unpack
        self.linked = False

    def link(self, job_queue, solution_queue, worker_id):
        self.worker_id = worker_id
        self.job_queue = job_queue
        self.solution_queue = solution_queue
        self.linked = True
        return self

    def run(self):
        assert self.linked, ("Workers should be linked with job_queue, "
                             "solution_queue, and worker_id first")
        # Pull (index, job) pairs until the queue looks empty; the index
        # travels with the solution so the parent can track completion.
        while self.job_queue.qsize() > 0:
            job_index, job = self.job_queue.get()
            self.solution_queue.put((job_index, self.handle(job)))

    def handle(self, job):
        if self.unpack:
            out = self.func(*job)
        else:
            out = self.func(job)
        return out
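
# Example of the unpack switch (hypothetical `add`, illustration only):
#
#   def add(a, b):
#       return a + b
#
#   QueueWorker(add, unpack=True)    # job (2, 3) is handled as add(2, 3)
#   QueueWorker(sum, unpack=False)   # job [2, 3] is handled as sum([2, 3])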
class SharedStateFactory(object):
    '''Build instances of `cls` that all share one preloaded attribute.'''
    def __init__(self, name, obj, cls, cls_kwargs):
        self.obj_name = name
        self.shared_obj = obj
        self.cls = cls
        self.cls_kwargs = cls_kwargs

    def make(self):
        out = self.cls(**self.cls_kwargs)
        # Attach the shared object directly, bypassing any lazy loading.
        out.__dict__[self.obj_name] = self.shared_obj
        return out

    def __call__(self):
        return self.make()
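
# This is how SpacyWorkerFactory (below) shares one loaded model across
# workers, e.g.:
#
#   factory = SharedStateFactory('_nlp', nlp, SpacyWorker,
#                                dict(func=parse, unpack=True, name='w'))
#   worker = factory()   # worker._nlp is the already-loaded shared model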
class SpacyWorker(QueueWorker):
    '''
    import eidos
    from toolz import partition

    def parse(nlp, input_, n_threads, batch_size):
        nlp.matcher = None
        out = []
        for doc in nlp.pipe(input_, batch_size=batch_size, n_threads=n_threads):
            out.append(doc.to_bytes())
        return out

    n_workers, n_threads, spacy_batchsize, worker_batchsize = 5, 2, 500, 5000
    factory = eidos.parallel.SpacyWorkerFactory(parse)
    workers = [next(factory) for _ in range(n_workers)]
    coop = eidos.parallel.Cooperate(workers, ((part, n_threads, spacy_batchsize)
                                              for part in partition(worker_batchsize, raw_data)))
    results = coop.run()
    data = [spacy.tokens.doc.Doc(nlp.vocab).from_bytes(doc_bytestring)
            for worker_group in results
            for doc_bytestring in worker_group]
    '''
    def link(self, *args, **kwargs):
        # Load the model lazily, once per worker, unless a shared nlp object
        # was already injected by SharedStateFactory.
        if not hasattr(self, '_nlp'):
            import spacy
            self._nlp = spacy.load('en')
            print("{} has loaded spacy".format(self))
        return super(SpacyWorker, self).link(*args, **kwargs)

    def handle(self, job):
        # The nlp object is always prepended as the first argument to func.
        if self.unpack:
            job = tuple([self._nlp] + list(job))
            out = self.func(*job)
        else:
            out = self.func(self._nlp, job)
        return out
def SpacyWorkerFactory(func, unpack=True, shared_nlp=False, name=None):
    '''Generator that yields SpacyWorker instances on demand.'''
    if name is None:
        name = "spacy-worker"
    if shared_nlp:
        # Load one model in the parent and hand the same object to every
        # worker via SharedStateFactory.
        print("sharing nlp object")
        import spacy
        nlp = spacy.load('en')
        factory = SharedStateFactory('_nlp', nlp, SpacyWorker,
                                     dict(func=func, unpack=unpack, name=name))
        while True:
            yield factory()
    else:
        # Each worker loads its own model the first time it is linked.
        print("No shared NLP object")
        while True:
            yield SpacyWorker(func, unpack, name=name)
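
For completeness, a sketch of the two factory modes (assuming this module is importable as eidos.parallel and the spacy 1.x-era 'en' model is installed): shared_nlp=True loads the model once in the parent and injects it into every worker through SharedStateFactory, while the default has each worker load its own copy inside link().

    from eidos.parallel import SpacyWorkerFactory

    def parse(nlp, texts, n_threads, batch_size):
        return [doc.to_bytes()
                for doc in nlp.pipe(texts, batch_size=batch_size, n_threads=n_threads)]

    # One model, loaded once in the parent and shared across workers:
    shared = SpacyWorkerFactory(parse, shared_nlp=True)

    # One model per worker, loaded lazily on link():
    per_worker = SpacyWorkerFactory(parse)

    workers = [next(shared) for _ in range(4)]  # the factory is a generator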