Created December 24, 2019 18:46
[python multiprocessing example] writing to file from a queue #python #multiprocessing
# https://stackoverflow.com/a/13530258/886938
import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'


def worker(arg, q):
    '''Stupidly simulates a long-running process by building a big string.'''
    start = time.perf_counter()  # time.clock() was removed in Python 3.8
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s
    done = time.perf_counter() - start
    # Note: this assumes the listener has already created the output file.
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res


def listener(q):
    '''Listens for messages on the queue and writes them to the file.'''
    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()


def main():
    # Must use a Manager queue here (a plain mp.Queue cannot be passed
    # to pool workers as an argument), or it will not work.
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    # Put the listener to work first so it owns the output file.
    watcher = pool.apply_async(listener, (q,))

    # Fire off the workers.
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # Collect results from the workers through the pool result queue.
    for job in jobs:
        job.get()

    # Now we are done, kill the listener.
    q.put('kill')
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()
Sorry, I know this is an old post, but where does the 80 in the range() call come from? And the same with the 2 in pool = mp.Pool(mp.cpu_count() + 2): if I had 20 cores, would it be set to 22? What is the logic behind this? I'd appreciate it if you could spare a minute to explain. Thanks!

@gorkem2020 The 80 is just an arbitrary number of jobs to create; it could have been 10, 100 or 1000 in this example.

The 2 in mp.Pool(mp.cpu_count() + 2) is just as arbitrary. You should test how many concurrent jobs your CPU, RAM and disk I/O can effectively support and set the pool size based on that. cpu_count() + 2 used to be a reasonable starting point when machines only had a few cores; with the 64-core monsters out there now, it's time to think differently :)
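If you want a more principled number than cpu_count() + 2, one option is simply to measure it. Below is a minimal sketch (not part of the original gist) that times the same batch of jobs with a few candidate pool sizes; dummy_worker is a hypothetical stand-in, so substitute something representative of your real workload before trusting the numbers.

import multiprocessing as mp
import time


def dummy_worker(i):
    # Hypothetical stand-in workload; replace with something representative.
    txt = ''
    for _ in range(50000):
        txt += 'x'
    return i


def time_pool(pool_size, n_jobs=80):
    '''Runs n_jobs through a pool of the given size and returns elapsed seconds.'''
    start = time.perf_counter()
    with mp.Pool(pool_size) as pool:
        pool.map(dummy_worker, range(n_jobs))
    return time.perf_counter() - start


if __name__ == '__main__':
    candidates = [max(1, mp.cpu_count() // 2), mp.cpu_count(),
                  mp.cpu_count() + 2, mp.cpu_count() * 2]
    for size in candidates:
        print('pool size %d: %.2fs' % (size, time_pool(size)))

As a rule of thumb, an I/O-heavy workload can benefit from a pool well above cpu_count(), while a CPU-bound one usually does best at or just below it.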
I found this in a StackOverflow thread (linked in the comment at the top of the gist) and captured it here as an example I could refer back to. These would be good questions to add to SO.