Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
361 views
in Technique[技术] by (71.8m points)

python - How to properly set up multiprocessing proxy objects for objects that already exist

I'm trying to share an existing object across multiple processing using the proxy methods described here. My multiprocessing idiom is the worker/queue setup, modeled after the 4th example here.

The code needs to do some calculations on data that are stored in rather large files on disk. I have a class that encapsulates all the I/O interactions, and once it has read a file from disk, it saves the data in memory for the next time a task needs to use the same data (which happens often).

I thought I had everything working from reading the examples linked to above. Here is a mock up of the code that just uses numpy random arrays to model the disk I/O:

import numpy
from multiprocessing import Process, Queue, current_process, Lock
from multiprocessing.managers import BaseManager

nfiles = 200
njobs = 1000

class BigFiles:

    def __init__(self, nfiles):
        # Start out with nothing read in.
        self.data = [ None for i in range(nfiles) ]
        # Use a lock to make sure only one process is reading from disk at a time.
        self.lock = Lock()

    def access(self, i):
        # Get the data for a particular file
        # In my real application, this function reads in files from disk.
        # Here I mock it up with random numpy arrays.
        if self.data[i] is None:
            with self.lock:
                self.data[i] = numpy.random.rand(1024,1024)
        return self.data[i]

    def summary(self):
        return 'BigFiles: %d, %d Storing %d of %d files in memory'%(
                id(self),id(self.data),
                (len(self.data) - self.data.count(None)),
                len(self.data)  )


# I'm using a worker/queue setup for the multprocessing:
def worker(input, output):
    proc = current_process().name
    for job in iter(input.get, 'STOP'):
        (big_files, i, ifile) = job
        data = big_files.access(ifile)
        # Do some calculations on the data
        answer = numpy.var(data)
        msg = '%s, job %d'%(proc, i)
        msg += '
   Answer for file %d = %f'%(ifile, answer)
        msg += '
   ' + big_files.summary()
        output.put(msg)

# A class that returns an existing file when called.
# This is my attempted workaround for the fact that Manager.register needs a callable.
class ObjectGetter:
    def __init__(self, obj):
        self.obj = obj
    def __call__(self):
        return self.obj

def main():
    # Prior to the place where I want to do the multprocessing, 
    # I already have a BigFiles object, which might have some data already read in.
    # (Here I start it out empty.)
    big_files = BigFiles(nfiles)
    print 'Initial big_files.summary = ',big_files.summary()

    # My attempt at making a proxy class to pass big_files to the workers
    class BigFileManager(BaseManager): 
        pass
    getter = ObjectGetter(big_files)
    BigFileManager.register('big_files', callable = getter)
    manager = BigFileManager()
    manager.start()

    # Set up the jobs:
    task_queue = Queue()
    for i in range(njobs):
        ifile = numpy.random.randint(0, nfiles)
        big_files_proxy = manager.big_files()
        task_queue.put( (big_files_proxy, i, ifile) )

    # Set up the workers
    nproc = 12
    done_queue = Queue()
    process_list = []
    for j in range(nproc):
        p = Process(target=worker, args=(task_queue, done_queue))
        p.start()
        process_list.append(p)
        task_queue.put('STOP')

    # Log the results
    for i in range(njobs):
        msg = done_queue.get()
        print msg

    print 'Finished all jobs'
    print 'big_files.summary = ',big_files.summary()

    # Shut down the workers
    for j in range(nproc):
        process_list[j].join()
    task_queue.close()
    done_queue.close()

main()

This works in the sense that it calculates everything correctly, and it is caching the data that is read along the way. The only problem I'm having is that at the end, the big_files object doesn't have any of the files loaded. The final msg returned is:

Process-2, job 999.  Answer for file 198 = 0.083406
   BigFiles: 4303246400, 4314056248 Storing 198 of 200 files in memory

But then after it's all done, we have:

Finished all jobs
big_files.summary =  BigFiles: 4303246400, 4314056248 Storing 0 of 200 files in memory

So my question is: What happened to all the stored data? It's claiming to be using the same self.data according to the id(self.data). But it's empty now.

I want the end state of big_files to have all the saved data that it accumulated along the way, since I actually have to repeat this entire process many times, so I don't want to have to redo all the (slow) I/O each time.

I'm assuming it must have something to do with my ObjectGetter class. The examples for using BaseManager only show how to make a new object that will be shared, not share an existing one. So am I doing something wrong with way I get the existing big_files object? Can anyone suggest a better way to do this step?

Thanks much!

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...