list(file_obj)
can require a lot of memory when file_obj
is large. We can reduce that memory requirement by using itertools to pull out chunks of lines as we need them.
In particular, we can use
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
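to split the file into processable chunks (note that groupby only groups consecutive rows, so rows sharing a key must already be adjacent in the file), and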
groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)
to have the multiprocessing pool work on num_chunks
chunks at a time.
By doing so, we need only enough memory to hold a few (num_chunks) chunks at a time, instead of the whole file.
import multiprocessing as mp
import itertools
import time
import csv
def worker(chunk):
    """Process one group of CSV rows and return its result.

    This is a placeholder computation: it simply counts the rows in the
    group. Replace the body with the real per-group work.

    Args:
        chunk: A list of CSV rows (each row a list of strings) that all
            share the same key as computed by ``keyfunc``.

    Returns:
        The number of rows in ``chunk``.
    """
    return len(chunk)
def keyfunc(row):
    """Return the grouping key for one CSV row.

    The key is taken from the first column; change the index if the
    column to group on lives elsewhere.

    Args:
        row: One row of the CSV file, as a sequence of column values.

    Returns:
        The value of the first column of ``row``.

    Raises:
        IndexError: If ``row`` is empty.
    """
    return row[0]
def main(largefile='test.dat', num_chunks=10):
    """Process a grouped CSV file in parallel, a few groups at a time.

    Rows of ``largefile`` are grouped with ``itertools.groupby`` using
    ``keyfunc``, so only *consecutive* rows sharing a key end up in the same
    chunk. At most ``num_chunks`` chunks are materialized at once; each
    batch is handed to ``worker`` through a multiprocessing pool, and the
    per-chunk results are collected in file order and printed.

    Args:
        largefile: Path of the CSV file to read.
        num_chunks: Maximum number of groups held in memory and dispatched
            to the pool per batch. Must be at least 1.

    Raises:
        ValueError: If ``num_chunks`` is smaller than 1.
    """
    if num_chunks < 1:
        raise ValueError('num_chunks must be at least 1')

    results = []
    pool = mp.Pool()
    try:
        # newline='' lets the csv module do its own newline handling, so
        # newlines embedded in quoted fields are parsed correctly.
        with open(largefile, newline='') as f:
            # csv.reader yields [] for blank lines; skip them so keyfunc
            # never has to index into an empty row.
            rows = (row for row in csv.reader(f) if row)
            chunks = itertools.groupby(rows, keyfunc)
            while True:
                # Each group must be turned into a list before groupby is
                # advanced, because advancing invalidates the previous group.
                groups = [list(chunk) for _, chunk in
                          itertools.islice(chunks, num_chunks)]
                if not groups:
                    break
                results.extend(pool.map(worker, groups))
    finally:
        # Always shut the pool down, even if reading or processing failed,
        # so worker processes are not left behind.
        pool.close()
        pool.join()
    print(results)
# Run only when executed as a script: multiprocessing may re-import this
# module in child processes, and the guard keeps them from re-running main().
if __name__ == '__main__':
    main()
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…