python - Chunking data from a large file for multiprocessing?

I'm trying to parallelize an application using multiprocessing. It takes in a very large csv file (64MB to 500MB), does some work line by line, and then outputs a small, fixed-size file.

Currently I do a list(file_obj), which unfortunately loads the whole file into memory (I think), and then I break that list up into n parts, n being the number of processes I want to run. I then do a pool.map() on the broken-up lists.
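For reference, the approach described above looks roughly like this (a minimal sketch; do_work, naive_main, and the splitting logic are assumptions, not the actual code):

import multiprocessing as mp

def do_work(lines):
    # stand-in for the real per-line processing
    return len(lines)

def naive_main(path, n):
    with open(path) as f:
        rows = list(f)                        # the entire file is read into memory here
    size = max(1, (len(rows) + n - 1) // n)   # split the list into n roughly equal parts
    parts = [rows[i:i + size] for i in range(0, len(rows), size)]
    with mp.Pool(n) as pool:
        return pool.map(do_work, parts)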

This seems to have a really, really bad runtime compared to a single-threaded, just-open-the-file-and-iterate-over-it approach. Can someone suggest a better solution?

Additionally, I need to process the rows of the file in groups, where every row in a group shares the same value in a certain column. These groups of rows can be distributed across processes, but no group should contain more than one value for that column.


1 Reply


list(file_obj) can require a lot of memory when file_obj is large. We can reduce that memory requirement by using itertools to pull out chunks of lines as we need them.

In particular, we can use

reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)

to split the file into processable chunks, and

groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)

to have the multiprocessing pool work on num_chunks chunks at a time.

This way, we only need enough memory to hold a few chunks (num_chunks) at a time, instead of the whole file.
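Note that itertools.groupby only groups consecutive rows with equal keys, so rows sharing a value in the key column must already be adjacent in the file (which matches the grouping requirement in the question). A quick illustration:

import itertools

rows = [['a', 1], ['a', 2], ['b', 3], ['a', 4]]
print([(key, list(group)) for key, group in itertools.groupby(rows, key=lambda r: r[0])])
# [('a', [['a', 1], ['a', 2]]), ('b', [['b', 3]]), ('a', [['a', 4]])]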


import multiprocessing as mp
import itertools
import time
import csv

def worker(chunk):
    # `chunk` is a list of CSV rows that all share the same value in the key column.
    # Replace this with your real computation.
    # print(chunk)
    return len(chunk)

def keyfunc(row):
    # `row` is one row of the CSV file.
    # Replace this with the index of the column used for grouping.
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'test.dat'
    num_chunks = 10
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print(results)

if __name__ == '__main__':
    main()
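To try the script, you can generate a small test.dat whose rows are already grouped by the key column, for example (just a sketch; the column layout is an assumption):

import csv
import random

with open('test.dat', 'w', newline='') as f:
    writer = csv.writer(f)
    for name in ['alice', 'bob', 'carol']:          # key column values, already grouped
        for _ in range(random.randint(100, 200)):   # some rows per group
            writer.writerow([name, random.random()])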
