Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
210 views
in Technique[技术] by (71.8m points)

python - Trouble using a lock with multiprocessing.Pool: pickling error

I'm building a python module to extract tags from a large corpus of text, and while its results are high quality it executes very slowly. I'm trying to speed the process up by using multiprocessing, and that was working too, until I tried to introduce a lock so that only one process was connecting to our database at a time. I can't figure out for the life of me how to make this work - despite much searching and tweaking I am still getting a PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed. Here's the offending code - it worked fine until I tried to pass a lock object as an argument for f.

def make_network(initial_tag, max_tags = 2, max_iter = 3):
    manager = Manager()
    lock = manager.Lock()
    pool = manager.Pool(8)

    # this is a very expensive function that I would like to parallelize 
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a list
    # of strings (tags) as its sole argument, and returns a list of sets with entries
    # corresponding to the input list.
    f = partial(get_more_tags, max_tags = max_tags, lock = lock) 

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print i + "|" + joined
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level+1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Your problem is that lock objects are not picklable. I can see two possible solutions for you in that case.

  • To avoid this, you can make your lock variable a global variable. Then you will be able to reference it within your pool process function directly as a global variable, and will not have to pass it as an argument to the pool process function. This works because Python uses the OS fork mechanism when creating the pool processes and hence copies the entire contents of the process that creates the pool processes to them. This is the only way of passing a lock to a Python process created with the multiprocessing package. Incidentally, it is not necessary to use the Manager class just for this lock. With this change your code would look like this:

    import multiprocessing
    from functools import partial
    
    lock = None  # Global definition of lock
    pool = None  # Global definition of pool
    
    
    def make_network(initial_tag, max_tags=2, max_iter=3):
        global lock
        global pool
        lock = multiprocessing.Lock()
        pool = multiprocessing.Pool(8)
    
    
    def get_more_tags():
        global lock
        pass
    
    
    # this is a very expensive function that I would like to parallelize
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a
    # list of strings (tags) as its sole argument, and returns a list of sets
    # with entries corresponding to the input list.
    f = partial(get_more_tags, max_tags=max_tags) 
    
    def _recursively_find_more_tags(tags, level):
        global pool
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print(i + "|" + joined)
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level + 1)
        except StopIteration:
            return None
    
    _recursively_find_more_tags([initial_tag], 0)
    

In your real code, it is possible that the lock and pool variables might be class instance variables.

  • A second solution which avoids the use of locks altogether but which might have slightly higher overhead would be to create another process with multiprocessing.Process and connect it via a multiprocessing.Queue to each of your pool processes. This process would be responsible for running your database query. You would use the queue to allow your pool processes to send parameters to the process that managed the database query. Since all the pool processes would use the same queue, access to the database would automatically be serialized. The additional overheads would come from the pickling/unpickling of the database query arguments and the query response. Note that you can pass a multiprocessing.Queue object to a pool process as an argument. Note also that the multiprocessing.Lock based solution would not work on Windows where process are not created with fork semantics.

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...