fetch and clean descriptions
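A small fan-out pipeline: the main process reads item IDs from items.csv and pushes them onto a multiprocessing queue; four worker processes each run 100 threads that fetch an item's description over HTTP (the API host is masked in the original), strip boilerplate with jusText using a Spanish stoplist, tokenize the surviving paragraphs, and cache them in Redis under site-prefixed SHA-224 content hashes.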
import hashlib
import re

import justext
import redis
import requests

from multiprocessing import Process, Queue
from queue import Queue as ThreadQueue
from threading import Thread


class DescriptionThreadWorker(Thread):
    """Pulls item IDs off a queue, fetches and cleans each description, and caches it in Redis."""

    def __init__(self, process_num, thread_num, queue, redis_sock):
        self.queue = queue
        self.retries = 3
        self.process_num = process_num
        self.thread_num = thread_num
        self.r = redis.Redis(unix_socket_path=redis_sock)
        # \w+ tokenizer; re.UNICODE keeps accented Spanish characters inside tokens.
        self.tokenizer = re.compile(r'\w+', flags=re.UNICODE | re.MULTILINE | re.DOTALL | re.I)
        super(DescriptionThreadWorker, self).__init__()

    def _clean(self, dirty):
        """Strip boilerplate from a raw description and tokenize the surviving paragraphs."""
        if not dirty:
            return []
        paragraphs = justext.justext(dirty, justext.get_stoplist('Spanish'))

        def valid(paragraph):
            # Keep every paragraph jusText classifies as content; keep
            # boilerplate paragraphs only when they carry more than 4 tokens.
            if not paragraph.is_boilerplate:
                return True
            return len(self.tokenizer.findall(paragraph.text)) > 4

        return [self.tokenizer.findall(p.text) for p in paragraphs if valid(p)]

    def _fetch_description(self, item_id):
        """Fetch the item's description from the API, retrying up to self.retries times."""
        url = 'https://api.xxxxxxxxx.com/items/{}/description'.format(item_id)
        for _ in range(self.retries):
            try:
                r = requests.get(url)
                if r.ok:
                    json_rsp = r.json()
                    return self._clean(json_rsp['text']), self._clean(json_rsp['plain_text'])
            except Exception:
                # Network errors and bad payloads just trigger another attempt.
                pass
        return [], []

    def _hash(self, val):
        return hashlib.sha224(val.encode('utf-8')).hexdigest()

    def _save(self, item_id, descriptions):
        if len(descriptions) == 0:
            return
        site_id = item_id[:3]
        try:
            mset_map = {}
            for description in descriptions:
                # Key: 3-char site prefix + content hash. Redis can't store a
                # Python list directly, so serialize the tokens back to text.
                key = site_id + self._hash(''.join(description))
                mset_map[key] = ' '.join(description)
            self.r.mset(mset_map)
        except Exception:
            # ATM we don't care if it fails to save
            pass

    def run(self):
        print('Starting Thread {}-{}'.format(self.process_num, self.thread_num))
        total_items = 0
        for item_id in iter(self.queue.get, None):
            try:
                if item_id == 'TIME_TO_DIE':
                    # Poison pill: stop this thread.
                    return
                if item_id is None or len(item_id) == 0:
                    continue
                print('Fetching {}'.format(item_id))
                text, plain_text = self._fetch_description(item_id)
                descriptions = text + plain_text
                self._save(item_id, descriptions)
                total_items += 1
                if total_items % 100 == 0:
                    print('P-{}-{} processed: {}'.format(self.process_num, self.thread_num, total_items))
            except Exception as e:
                print('Error fetching: {}'.format(e))


class DescriptionProcessWorker(Process):
    """Spawns a pool of DescriptionThreadWorkers and feeds them item IDs."""

    def __init__(self, process_num, q, redis_sock, num_threads=100):
        self.q = q
        self.process_num = process_num
        self.redis_sock = redis_sock
        self.num_threads = num_threads
        super(DescriptionProcessWorker, self).__init__()

    def run(self):
        queue = ThreadQueue()
        threads = []
        for i in range(self.num_threads):
            t = DescriptionThreadWorker(self.process_num, i, queue, self.redis_sock)
            threads.append(t)
            t.start()
        for item_id in iter(self.q.get, None):
            try:
                if item_id == 'TIME_TO_DIE':
                    # Forward one poison pill per thread, then wait for them
                    # to drain the queue before this process exits.
                    for _ in range(self.num_threads):
                        queue.put(item_id)
                    for t in threads:
                        t.join()
                    return
                if item_id is None or len(item_id) == 0:
                    continue
                queue.put(item_id)
            except Exception as e:
                print('Error delegating: {}'.format(e))


def read_item_ids(filename):
    with open(filename, 'r') as f:
        for line in f:
            yield line.strip()


if __name__ == '__main__':
    request_queue = Queue()
    redis_sock = '/tmp/redis.sock'
    jobs = []
    workers = 4
    for i in range(workers):
        p = DescriptionProcessWorker(i, request_queue, redis_sock)
        jobs.append(p)
        p.start()
    for item_id in read_item_ids('items.csv'):
        request_queue.put(item_id)
    # One poison pill per worker process, then wait for a clean shutdown.
    for _ in range(workers):
        request_queue.put('TIME_TO_DIE')
    for p in jobs:
        p.join()
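
To sanity-check the cache afterwards, here is a minimal inspection sketch, assuming the defaults above (Redis on the unix socket /tmp/redis.sock) and that at least one description was saved:

import itertools
import redis

r = redis.Redis(unix_socket_path='/tmp/redis.sock')
# Keys are a 3-character site prefix plus a SHA-224 hex digest; values are
# the whitespace-joined tokens of one cleaned description paragraph.
for key in itertools.islice(r.scan_iter(), 5):
    print(key, r.get(key)[:60])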