
python - Using Multiprocessing in a Time and Memory Efficient Way

I have a problem and am running out of ideas, so I'm hoping someone here has a better one.

I am trying to run an optimization in Python over millions of data points. I have a function calculate(data), which receives a huge array data and returns an array results of the same size. The calculations are fairly simple, but each entry of results is computed from several entries of data and unfortunately can't be vectorized. For dimensionality reasons the size of data and results is massive and can't be reduced, and because of the sheer number of calculations the whole thing can take several days.
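To give an idea of the shape of the computation, here is a toy stand-in (my real rule is different, but like this one it needs a small, data-dependent group of entries per result and I haven't found a way to vectorize it):

def calculate(data):
    results = [0.0] * len(data)
    for i in range(len(data)):
        # each result entry depends on a few surrounding data entries
        lo, hi = max(0, i - 2), min(len(data), i + 3)
        results[i] = sum(data[lo:hi]) / (hi - lo)
    return results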

Therefore I started using multiprocessing, which improves the speed significantly. I implemented this by cutting results into chunks, giving each core the full (required) data array along with the task of calculating its own chunk of the results array, and merging the returned chunks afterwards (using pool.apply_async(func, (data,))).
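A minimal sketch of that setup, with toy stand-ins for my real data and per-chunk function (calc_chunk and the toy data here are made up for illustration):

from multiprocessing import Pool


def calc_chunk(data, start, stop):
    # toy per-chunk calculation: each result entry mixes a few data entries
    return start, [data[i] + data[max(0, i - 1)] for i in range(start, stop)]


if __name__ == '__main__':
    data = list(range(1000))          # stand-in for my huge array
    chunk_size = 100
    results = [None] * len(data)
    with Pool() as pool:
        # every apply_async call pickles the full data array over to the worker
        jobs = [pool.apply_async(calc_chunk, (data, start, min(start + chunk_size, len(data))))
                for start in range(0, len(data), chunk_size)]
        for job in jobs:
            start, part = job.get()
            results[start:start + len(part)] = part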

As the size of data increased further, however, I started getting memory errors. From my analysis this is because each core holds its own copy of the data array, so I end up with 1 + (number of cores) copies of data in RAM. To cut down on this I tried using a manager with a proxy dictionary for data that each core can then access (using data_shared = manager.dict(data)). Unfortunately this turned out to be incredibly slow, which is presumably why it's not recommended.
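For completeness, this is roughly what the manager attempt looked like (again with toy stand-ins, and with data keyed by index so it fits a proxy dict); every shared[i] lookup in the worker is a round trip to the manager process, which as far as I can tell is where the time goes:

from multiprocessing import Pool, Manager


def calc_chunk(shared, start, stop):
    # each shared[i] access goes through the manager process
    return start, [shared[i] + shared[max(0, i - 1)] for i in range(start, stop)]


if __name__ == '__main__':
    data = list(range(1000))
    with Manager() as manager:
        shared = manager.dict(dict(enumerate(data)))   # proxy dictionary keyed by index
        chunk_size = 100
        results = [None] * len(data)
        with Pool() as pool:
            jobs = [pool.apply_async(calc_chunk, (shared, start, min(start + chunk_size, len(data))))
                    for start in range(0, len(data), chunk_size)]
            for job in jobs:
                start, part = job.get()
                results[start:start + len(part)] = part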

Is there an obvious solution to this that I missed? I am very grateful for any ideas.

question from: https://stackoverflow.com/questions/65942647/using-multiprocessing-in-a-time-and-memory-efficient-way


1 Reply


Here is an example using a list of integers. The key idea is to put the large data array into a shared-memory multiprocessing.Array and hand it to every pool worker through a pool initializer, so the workers read it directly instead of each receiving its own pickled copy.

The code breaks an array results of 1000 integers 0, 1, 2, ... 999 into chunks of size 10, giving 1000 / 10 = 100 chunks. Each chunk, together with its starting index, is submitted to a worker that adds each chunk element to the corresponding element of the shared data array, sums everything up, and returns the starting index and the sum; the sums are then combined into a grand total. The starting index is not needed for the grand total here, but in another context (for example, writing partial results back to the right positions) it would be useful.

from multiprocessing import Pool, Array
import itertools


def init_pool(arr):
    global data
    data = arr


def my_worker(tpl):
    index, results_chunk = tpl
    # sum items from results_chunk and data array:
    data_index = index
    the_sum = 0
    for item in results_chunk:
        the_sum += item + data[data_index]
        data_index += 1
    return index, the_sum


def get_chunks(arr, size):
    index = 0
    it = iter(arr)
    while True:
        x = tuple(itertools.islice(it, size))
        if not x:
            return
        yield index, list(x)
        index += size


# required for Windows and other platforms that do not have a fork() call:
if __name__ == '__main__':
    data = list(range(1000, 2000))
    data_sum = sum(data)
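    # place data in a shared-memory Array so workers can read it without each getting a copy: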
    arr = Array('i', data, lock=False)
    results = list(range(1000))
    results_sum = sum(results)
    print('Expected total sum =', data_sum + results_sum)
    with Pool(initializer=init_pool, initargs=(arr,)) as pool:
        total = 0
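        # the third argument (chunksize=3) batches the (index, chunk) tasks sent to each worker: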
        for index, the_sum in pool.imap_unordered(my_worker, get_chunks(results, 10), 3):
            total += the_sum
        print('Actual total sum =', total)

Prints:

Expected total sum = 1999000
Actual total sum = 1999000
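
The same shared-memory idea carries over to numeric data: if data fits a ctypes type, a lock-free Array can be wrapped with numpy.frombuffer in each worker, giving a zero-copy view of the shared block. Here is a minimal sketch of that variation, assuming float64 data (init_shared and crunch are just illustrative names, and the per-chunk sum stands in for the real calculation):

from multiprocessing import Pool, Array

import numpy as np


def init_shared(shared_arr):
    global data
    # zero-copy numpy view onto the shared memory block
    data = np.frombuffer(shared_arr, dtype=np.float64)


def crunch(bounds):
    start, stop = bounds
    # toy stand-in for the real per-entry calculation
    return start, float(data[start:stop].sum())


if __name__ == '__main__':
    values = np.arange(1_000_000, dtype=np.float64)
    shared_arr = Array('d', len(values), lock=False)
    np.frombuffer(shared_arr, dtype=np.float64)[:] = values   # fill once in the parent
    bounds = [(i, min(i + 10_000, len(values))) for i in range(0, len(values), 10_000)]
    with Pool(initializer=init_shared, initargs=(shared_arr,)) as pool:
        total = sum(part for _, part in pool.imap_unordered(crunch, bounds))
    print(total == values.sum())   # prints True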
