
Python multiprocessing possible deadlock with two queue as producer-consumer pattern?

I'm wondering whether a deadlock can occur in the following code. I have to read each element of a database (about 1 million items), process it, and then collect the results in a single output file.

I've parallelized the execution with multiprocessing, using two Queues and three types of processes:

  • Reader: the main process, which reads the database and puts each item it reads on task_queue
  • Worker: a pool of processes. Each worker gets an item from task_queue, processes it, saves the result in an intermediate file stored in item_name/item_name.txt and puts the item_name on completed_queue
  • Writer: a process which gets an item_name from completed_queue, reads the intermediate result from item_name/item_name.txt and writes it to results.txt
from multiprocessing import Pool, Process, Queue
class Computation():

    def __init__(self,K):
        self.task_queue = Queue()
        self.completed_queue = Queue()
        self.n_cpus = K

    def reader(self,):
        with open(db, "r") as db:
            ... # Read an item
            self.task_queue.put(item)
            
    def worker(self,):
        while True:
            item = self.task_queue.get(True)
            if item == "STOP":
                break
            self.process_item(item)

    def writer_process(self,):
        while True:
            f = self.completed_queue.get(True)
            if f == "DONE":
                break
            self.write_f(f)

    def run(self,):
        pool = Pool(self.n_cpus, self.worker)
        
        writer = Process(target=self.writer_process, args=())
        writer.start()

        self.reader()

        pool.close()
        pool.join()

        self.completed_queue.put("DONE")
        writer.join()

The code works, but it seems that sometimes the writer or the pool stops working (or becomes very slow). Is a deadlock possible in this scenario?

Question from: https://stackoverflow.com/questions/65917456/python-multiprocessing-possible-deadlock-with-two-queue-as-producer-consumer-pat


1 Reply


There are a couple of issues with your code. First, by using the queues as you are, you are in effect creating your own process pool and have no need for the multiprocessing.Pool class at all. You are using the pool initializer as the actual pool worker, which is a misuse of this class; you would be better off just using regular Process instances (my opinion, anyway).
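
For contrast, here is a minimal sketch of how Pool is normally used: the initializer only sets up per-worker state, and the items are handed out by pool.map rather than by a hand-rolled task queue. The names process_item and init_worker are placeholders for this illustration, not taken from your code:

from multiprocessing import Pool

def process_item(item):
    # stand-in for the real per-item work
    return item.upper()

def init_worker():
    # an initializer only prepares per-worker state (e.g. opening a file
    # or a database connection); it should not contain the worker loop
    pass

if __name__ == "__main__":
    items = ["a", "b", "c"]
    with Pool(processes=3, initializer=init_worker) as pool:
        # Pool dispatches the items to the workers and collects the results;
        # no sentinel messages are needed
        results = pool.map(process_item, items)
    print(results)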

Second, although it is well and good that you are putting the message 'DONE' on the completed_queue to signal writer_process to terminate, you have not done the same for the self.n_cpus worker processes, which are looking for 'STOP' messages; the reader function therefore needs to put self.n_cpus 'STOP' messages on the task queue:

from multiprocessing import Process, Queue


class Computation():

    def __init__(self, K):
        self.task_queue = Queue()
        self.completed_queue = Queue()
        self.n_cpus = K

    def reader(self,):
        with open(db, "r") as db:
            ... # Read an item
            self.task_queue.put(item)
        # signal to the worker processes to terminate:
        for _ in range(self.n_cpus):
            self.task_queue.put('STOP')
            
    def worker(self,):
        while True:
            item = self.task_queue.get(True)
            if item == "STOP":
                break
            self.process_item(item)

    def writer_process(self,):
        while True:
            f = self.completed_queue.get(True)
            if f == "DONE":
                break
            self.write_f(f)

    def run(self):
        processes = [Process(target=self.worker) for _ in range(self.n_cpus)]
        for p in processes:
            p.start()
        
        writer = Process(target=self.writer_process, args=())
        writer.start()

        self.reader()

        for p in processes:
            p.join()

        self.completed_queue.put("DONE")
        writer.join()
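
To run either version of this class, a small driver under a __main__ guard is enough (process_item and write_f still have to be implemented); the guard matters because multiprocessing may re-import the module in the child processes:

if __name__ == '__main__':
    computation = Computation(4)   # 4 = number of worker processes
    computation.run()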

Personally, instead of using 'STOP' and 'DONE' as the sentinel messages, I would use None, assuming that is not a valid actual message. I have tested the above code with a reader that just processed strings from a list, a self.process_item(item) that simply appended ' done' to each string and put the modified string on the completed_queue, and a writer_process whose self.write_f was replaced with a print call. I did not see any problems with the code as is.
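
For illustration, a minimal self-contained sketch of that None-sentinel pattern (the print call stands in for the real per-item work; all the names are placeholders):

from multiprocessing import Process, Queue

def worker(task_queue):
    while True:
        item = task_queue.get()
        if item is None:          # None marks the end of the input
            break
        print("processing", item)

if __name__ == "__main__":
    n_workers = 3
    task_queue = Queue()
    workers = [Process(target=worker, args=(task_queue,)) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for item in ["a", "b", "c", "d"]:
        task_queue.put(item)
    for _ in range(n_workers):    # one None sentinel per worker
        task_queue.put(None)
    for w in workers:
        w.join()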

Update to use a Managed Queue

Disclaimer: I have had no experience using mpi4py and have no idea how the queue proxies would get distributed across different computers. The above code may not be sufficient, as suggested by the article How to share multiprocessing queue object between multiple computers. However, the code in that article creates instances of Queue.Queue (it is Python 2 code) rather than the proxies that are returned by a multiprocessing SyncManager. The documentation on this is very poor. Try the change below to see whether it works better (it will be slower).

Because the queues are now proxies returned by manager.Queue(), I have rearranged the code a bit: the queues are passed explicitly as arguments to the process functions:

from multiprocessing import Process, Manager


class Computation():

    def __init__(self, K):
        self.n_cpus = K

    def reader(self, task_queue):
        with open(db, "r") as db:
            ... # Read an item
            task_queue.put(item)
        # signal to the worker processes to terminate:
        for _ in range(self.n_cpus):
            task_queue.put('STOP')

    def worker(self, task_queue, completed_queue):
        while True:
            item = task_queue.get(True)
            if item == "STOP":
                break
            self.process_item(item)

    def writer_process(self, completed_queue):
        while True:
            f = completed_queue.get(True)
            if f == "DONE":
                break
            self.write_f(f)

    def run(self):
        with Manager() as manager:
            task_queue = manager.Queue()
            completed_queue = manager.Queue()
            processes = [Process(target=self.worker, args=(task_queue, completed_queue)) for _ in range(self.n_cpus)]
            for p in processes:
                p.start()

            writer = Process(target=self.writer_process, args=(completed_queue,))
            writer.start()

            self.reader(task_queue)

            for p in processes:
                p.join()

            completed_queue.put("DONE")
            writer.join()
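
If the queues really do have to be shared across different machines (which is what the linked article is about), the standard-library route is to serve them from a manager over TCP, as described in the multiprocessing documentation under "Using a remote manager". A minimal sketch, with a made-up host name, port and authkey; none of this comes from your code.

On the machine that owns the queues:

from multiprocessing.managers import BaseManager
from queue import Queue

task_queue = Queue()
completed_queue = Queue()

class QueueManager(BaseManager):
    pass

# expose the queues under named accessors
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_completed_queue', callable=lambda: completed_queue)

if __name__ == '__main__':
    manager = QueueManager(address=('', 50000), authkey=b'abracadabra')
    server = manager.get_server()
    server.serve_forever()

On the other machines, connect and get back queue proxies that behave like ordinary queues:

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('get_task_queue')
QueueManager.register('get_completed_queue')

if __name__ == '__main__':
    manager = QueueManager(address=('serverhost', 50000), authkey=b'abracadabra')
    manager.connect()
    task_queue = manager.get_task_queue()
    completed_queue = manager.get_completed_queue()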
