Created
July 11, 2019 08:52
-
-
Save ReallyLiri/54612a92fb7613fa316b5af7fd5f7679 to your computer and use it in GitHub Desktop.
Simple map-reduce with multiprocessing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import pandas as pd | |
import queue | |
from multiprocessing import Process, Queue | |
from time import sleep | |
DEFAULT_TIMEOUT_SEC = 60*10 | |
def _chunks(l, n): | |
"""Yield successive n chunks from l.""" | |
return (l[i::n] for i in range(n)) | |
def _worker_job(inputs, output_queue, inner_worker): | |
try: | |
logging.info("Worker started") | |
result = inner_worker(inputs) | |
except: | |
output_queue.put(None) | |
logging.exception("worker failed") | |
else: | |
output_queue.put(result) | |
logging.info("Worker completed") | |
def map_reduce(workers_count, inputs, mapper, reducer, timeout_sec=None):
    """
    Forks <workers_count> workers to process inputs using
    mapper function: accepts as an input a list of inputs (portion of <inputs>)
    reducer function: reduces the outputs of mapper
    timeout_sec: max seconds to wait for each worker's result;
                 defaults to DEFAULT_TIMEOUT_SEC
    Returns the mapped-reduced result, or None if no worker produced a result.
    """
    logging.info("Map-Reducing %d inputs using %d workers", len(inputs), workers_count)
    divided_inputs = list(_chunks(inputs, workers_count))
    processes = []
    output_queue = Queue()
    if not timeout_sec:
        timeout_sec = DEFAULT_TIMEOUT_SEC
    for inputs_portion in divided_inputs:  # fixed typo: was `inputs_ration`
        process = Process(target=_worker_job, args=(inputs_portion, output_queue, mapper))
        processes.append(process)
        process.start()
    final_result = None
    for i in range(workers_count):
        try:
            # Bug fix: the old code wrapped this in `while True` and slept
            # on queue.Empty, retrying forever -- timeout_sec never actually
            # took effect and a dead/hung worker stalled the parent for good.
            single_result = output_queue.get(timeout=timeout_sec)
        except queue.Empty:
            logging.warning("Timed out waiting for worker result %d", i)
            continue
        if single_result is None:
            # Worker crashed; _worker_job enqueued a None sentinel.
            logging.warning("No result from worker %d", i)
            continue
        if final_result is None:
            final_result = single_result
        else:
            final_result = reducer(final_result, single_result)
    for process in processes:
        # NOTE(review): an unresponsive worker can still block here; bounding
        # the join (process.join(timeout_sec)) would be stricter -- confirm
        # whether partial results should win over a guaranteed-clean join.
        process.join()
    return final_result
def map_reduce_dataframes(workers_count, inputs, df_from_inputs_mapper, output_file_path=None, timeout_sec=None):
    """
    DataFrame specialization of map_reduce: each worker maps its portion of
    <inputs> to a pandas DataFrame and the pieces are concatenated.

    df_from_inputs_mapper: callable(list_of_inputs) -> pandas.DataFrame
    output_file_path: if given, the combined frame is also written to Excel
    timeout_sec: forwarded to map_reduce
    Returns the combined DataFrame, or None if no worker produced a result.
    """
    final_df = map_reduce(
        workers_count, inputs,
        mapper=df_from_inputs_mapper,
        reducer=lambda df1, df2: pd.concat([df1, df2]),
        timeout_sec=timeout_sec
    )
    if final_df is None:
        # Robustness fix: previously this fell through to final_df.to_excel /
        # final_df.shape and raised AttributeError when every worker failed.
        logging.warning("Map-reduce produced no DataFrame")
        return None
    if output_file_path:
        # Fix: dropped the deprecated `encoding` kwarg -- it was removed from
        # DataFrame.to_excel in pandas 2.0 and raised TypeError there.
        final_df.to_excel(output_file_path)
        logging.info("Completed writing %d lines to %s", final_df.shape[0], output_file_path)
    else:
        # Switched to lazy %-style logging, consistent with the rest of the file.
        logging.info("finish map-reduce df: %d", final_df.shape[0])
    return final_df
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment