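"""Worker that pops crawl jobs from a Redis queue.

Each job is a JSON record in the ``jobs`` hash. The worker crawls the
job's URLs two levels deep via business_logic.process_url and pushes the
image URLs it finds onto a per-job results list.
"""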

import json
import os

import redis

import business_logic

# Use the host/port that a linked Redis Docker container injects via
# environment variables, falling back to a local Redis instance.
REDIS_HOST = os.getenv('REDIS_PORT_6379_TCP_ADDR', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT_6379_TCP_PORT', 6379))

JOBS = 'jobs'
JOBQUEUE = 'jobqueue'
RESULTS = 'results'
NUM_JOBS = 'num_jobs'
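
# A job record is stored as JSON in the JOBS hash, keyed by its job_id.
# Judging from the fields used below, a record looks roughly like:
#
#     {"job_id": "1", "urls": ["http://example.com"],
#      "inprogress": 0, "completed": 0}
#
# NUM_JOBS is unused in this file; presumably the producer keeps the total
# job count under that key.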

store = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0)
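# Note: redis-py clients connect lazily, so constructing the client does not
# touch the server; the first command issued does.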


def get_work_item():
    """Pop the next job id off the queue and load its JSON record."""
    job_id = store.rpop(JOBQUEUE)
    if job_id is None:
        # The queue is empty; there is nothing to work on.
        return None
    data = store.hget(JOBS, job_id)
    return json.loads(data)
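
# A long-running worker would typically block on the queue instead of popping
# once; a sketch using redis-py's BRPOP (returns None on timeout, otherwise a
# (queue_key, job_id) tuple):
#
#     item = store.brpop(JOBQUEUE, timeout=5)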


def incr_field(job, field):
    """Increment a numeric field on the job record and write it back."""
    job[field] += 1
    # redis-py cannot store a dict as a hash value, so re-serialize to JSON.
    store.hset(JOBS, job['job_id'], json.dumps(job))


def decr_field(job, field):
    """Decrement a numeric field on the job record and write it back."""
    job[field] -= 1
    store.hset(JOBS, job['job_id'], json.dumps(job))
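
# Neither helper above is atomic: two workers updating the same job can lose
# an update. Under a hypothetical alternative layout, one Redis hash per job
# (e.g. a 'job:<job_id>' key), the counter update would be a single atomic
# command instead:
#
#     store.hincrby('job:{0}'.format(job_id), field, 1)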


def update_results(job, images, visited):
    """Append newly seen image URLs to the job's result list."""
    job_id = job['job_id']
    result_key = '{0}:{1}'.format(RESULTS, job_id)
    for img in images:
        # Skip images we have already recorded for this job.
        if img in visited:
            continue
        visited.add(img)
        store.rpush(result_key, img)
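
# A design note: a Redis set deduplicates server-side, so replacing the list
# with, say, store.sadd(result_key, img) would drop the need for the
# client-side `visited` set, at the cost of losing insertion order.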


def work_on_one_item():
    """Pop one job and crawl its URLs breadth-first, two levels deep."""
    job = get_work_item()
    if job is None:
        return
    incr_field(job, 'inprogress')

    urls = job['urls'][:]
    maxlevel = 2
    visited = set()
    imgvisited = set()

    for curr_level in range(maxlevel):
        # Stop early if the previous level produced no new links.
        if not urls:
            break

        next_urls = []
        for url in urls:
            # Do not process the same url twice.
            if url in visited:
                continue

            # Mark the url as visited and process it.
            visited.add(url)
            links, images = business_logic.process_url(url)
            next_urls += links

            # Update the store with the images found on this page.
            update_results(job, images, imgvisited)

        # Descend a level: the links found here become the next level's urls.
        urls = next_urls

    incr_field(job, 'completed')
    decr_field(job, 'inprogress')
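
# For local testing, a matching producer might enqueue a job like this
# (a sketch; the field names simply mirror the ones this worker reads):
#
#     job = {'job_id': '1', 'urls': ['http://example.com'],
#            'inprogress': 0, 'completed': 0}
#     store.hset(JOBS, job['job_id'], json.dumps(job))
#     store.lpush(JOBQUEUE, job['job_id'])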


if __name__ == '__main__':
    work_on_one_item()