Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
439 views
in Technique[技术] by (71.8m points)

python - numpy vs. multiprocessing and mmap

I am using Python's multiprocessing module to process large numpy arrays in parallel. The arrays are memory-mapped using numpy.load(mmap_mode='r') in the master process. After that, multiprocessing.Pool() forks the process (I presume).

Everything seems to work fine, except I am getting lines like:

AttributeError("'NoneType' object has no attribute 'tell'",)
  in `<bound method memmap.__del__ of
       memmap([ 0.57735026,  0.57735026,  0.57735026,  0.        ,  0.        ,        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,        0.        ,  0.        ], dtype=float32)>`
     ignored

in the unittest logs. The tests pass fine, nevertheless.

Any idea what's going on there?

Using Python 2.7.2, OS X, NumPy 1.6.1.


UPDATE:

After some debugging, I hunted down the cause to a code path that was using a (small slice of) this memory-mapped numpy array as input to a Pool.imap call.

Apparently the "issue" is with the way multiprocessing.Pool.imap passes its input to the new processes: it uses pickle. This doesn't work with mmaped numpy arrays, and something inside breaks which leads to the error.

I found this reply by Robert Kern which seems to address the same issue. He suggests creating a special code path for when the imap input comes from a memory-mapped array: memory-mapping the same array manually in the spawned process.

This would be so complicated and ugly that I'd rather live with the error and the extra memory copies. Is there any other way that would be lighter on modifying existing code?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

My usual approach (if you can live with extra memory copies) is to do all IO in one process and then send things out to a pool of worker threads. To load a slice of a memmapped array into memory just do x = np.array(data[yourslice]) (data[yourslice].copy() doesn't actually do this, which can lead to some confusion.).

First off, let's generate some test data:

import numpy as np
np.random.random(10000).tofile('data.dat')

You can reproduce your errors with something like this:

import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield data[start:stop]

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()

And if you just switch to yielding np.array(data[start:stop]) instead, you'll fix the problem:

import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield np.array(data[start:stop])

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()

Of course, this does make an extra in-memory copy of each chunk.

In the long run, you'll probably find that it's easier to switch away from memmapped files and move to something like HDF. This especially true if your data is multidimensional. (I'd reccomend h5py, but pyTables is nice if your data is "table-like".)

Good luck, at any rate!


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...