
parallel processing - Why do I have idle workers when using Python multiprocessing pools?

I am breaking a very large text file up into smaller chunks, and performing further processing on the chunks. For this example, let text_chunks be a list of lists, each list containing a section of text. The elements of text_chunks range in length from ~50 to ~15000. The class ProcessedText exists elsewhere in the code and does a large amount of subsequent processing and data classification based on the text fed to it. The different text chunks are processed into ProcessedText instances in parallel using code like the following:

def do_things_to_text(a, b):
    #pull out necessary things for ProcessedText initialization and return an instance
    print('Processing {0}'.format(a))
    return ProcessedText(a, b)

import multiprocessing as mp

#prepare inputs for starmap, pairing with list index so order can be reimposed later
pool_inputs = list(enumerate(text_chunks))

#parallel processing
pool = mp.Pool(processes=8)
results = pool.starmap_async(do_things_to_text, pool_inputs)
output = results.get()

The code executes successfully, but it seems that some of the worker processes created as part of the Pool randomly sit idle while the code runs. I track the memory usage, CPU usage, and status in top while the code executes.

At the beginning all 8 worker processes are engaged (status "R" in top and nonzero CPU usage), but after ~20 entries from text_chunks are completed, the workers' activity starts to vary wildly. At times as few as 1 worker process is running, and the others sit in status "S" with zero CPU usage. I can also see from my printed output statements that do_things_to_text() is being called less frequently. So far I haven't been able to identify why the processes start to idle. There are plenty of entries left to process, so leaving workers idle wastes time.

My questions are:

  1. Why are these worker processes sitting idle?
  2. Is there a better way to implement multiprocessing that will prevent this?

EDITED to ADD: I have further characterized the problem. It is clear from the indexes I print out in do_things_to_text() that multiprocessing is dividing the total number of jobs into batches at every tenth index. So my console output shows Jobs 0, 10, 20, 30, 40, 50, 60, 70 being submitted at the same time (8 processes). Some of the Jobs complete faster than others, so you might see Job 22 completed before you see Job 1 completed.

Up until this first round of batches is completed, all processes are active and nothing is idle. However, when that round is complete and Job 80 starts, only one process is active and the other 7 are idle. I have not confirmed it, but I believe it stays like this until the 80-series is complete.
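For reference, this stride of 10 matches the chunksize that Pool computes by default when none is supplied. A minimal sketch of that arithmetic, assuming roughly 300 jobs (the figure the answer below uses) and the 8 worker processes from the code above:

# Sketch of Pool's default chunksize calculation (divmod of the job count by
# 4 * number of workers, rounded up); the job count of 300 is an assumption.
n_jobs = 300      # approximate number of text chunks (assumed)
n_workers = 8     # pool size used above

chunksize, extra = divmod(n_jobs, n_workers * 4)
if extra:
    chunksize += 1

print(chunksize)  # -> 10, matching the observed stride of Jobs 0, 10, 20, ...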

question from: https://stackoverflow.com/questions/65926296/why-do-i-have-idle-workers-when-using-python-multiprocessing-pools


1 Reply


Here are some recommendations for better memory utilization:

I don't know how text_chunks is created, but ultimately you end up with 8GB worth of strings in pool_inputs. Ideally, you would have a generator function, for example make_text_chunks, that yields the individual "text chunks" that formerly comprised the text_chunks iterable (if text_chunks is already such a generator expression, then you are all set). The idea is to not create all 8GB worth of data at once, but only as the data is needed. With this strategy you can no longer use the Pool method starmap_async; we will use Pool.imap instead. This method, unlike starmap_async, submits jobs iteratively in chunksize-sized batches, and you can process the results as they become available (although that doesn't seem to be an issue here).

def make_text_chunks():
    # logic goes here to generate the next chunk
    yield text_chunk


def do_things_to_text(t):
    # t is now a tuple:
    a, b = t
    #pull out necessary things for ProcessedText initialization and return an instance
    print('Processing {0}'.format(a))
    return ProcessedText(a, b)


import multiprocessing as mp

# do not turn into a list!
pool_inputs = enumerate(make_text_chunks())

def compute_chunksize(n_jobs, poolsize):
    """
    function to compute chunksize as is done by Pool module
    """
    if n_jobs == 0:
        return 0
    chunksize, remainder = divmod(n_jobs, poolsize * 4)
    if remainder:
        chunksize += 1
    return chunksize

#parallel processing
# number of jobs approximately
# don't know exactly without turning pool_inputs into a list, which would be self-defeating
N_JOBS = 300
POOLSIZE = 8
CHUNKSIZE = compute_chunksize(N_JOBS, POOLSIZE)
with mp.Pool(processes=POOLSIZE) as pool:
    output = [result for result in pool.imap(do_things_to_text, pool_inputs, CHUNKSIZE)]
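Since the question is specifically about idle workers, one more hedged note on top of the answer above (not something the original answer states): with a chunksize of 10 and per-chunk processing times that vary widely, once every remaining job has been handed out in batches, workers that finish their batch early go idle while the slowest batch is still being worked through. Dropping the chunksize trades extra inter-process communication for better load balancing; a minimal sketch, assuming pool_inputs is re-created as a fresh generator:

with mp.Pool(processes=POOLSIZE) as pool:
    # chunksize=1 hands out one job at a time, so a worker never sits idle
    # while another worker still holds a batch of unstarted jobs; imap also
    # preserves input order, so results can still be matched to their index
    output = list(pool.imap(do_things_to_text, pool_inputs, chunksize=1))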
