python - Script using multiprocessing module does not terminate

The following code does not print "here". What is the problem? I tested it on both of my machines (Windows 7 and Ubuntu 12.10) and on http://www.compileonline.com/execute_python_online.php, and it does not print "here" in any case.

from multiprocessing import Queue, Process


def runLang(que):
    print "start"
    myDict=dict()
    for i in xrange(10000):
        myDict[i]=i
    que.put(myDict)
    print "finish"


def run(fileToAnalyze):
    que=Queue()
    processList=[]
    dicList=[]
    langs= ["chi","eng"]
    for lang in langs:
        p=Process(target=runLang,args=(que,))
        processList.append(p)
        p.start()

    for p1 in processList:
        p1.join()

    print "here"

    for _ in xrange(len(langs)):
        item=que.get()
        print item
        dicList.append(item)

if __name__=="__main__":
    processList = []
    for fileToAnalyse in ["abc.txt","def.txt"]:
        p=Process(target=run,args=(fileToAnalyse,))
        processList.append(p)
        p.start()
    for p1 in processList:
        p1.join()

1 Reply

This is because when you put lots of items into a multiprocessing.Queue, they eventually get buffered in memory once the underlying Pipe is full. The buffer won't get flushed until something starts reading from the other end of the Queue, which allows the Pipe to accept more data. A Process cannot terminate until the buffers for all of its Queue instances have been entirely flushed to their underlying Pipes. The implication is that if you try to join a process without having another process/thread call get on its Queue, you can deadlock. This is mentioned in the docs:

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue.
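That last point is worth illustrating: a queue created through multiprocessing.Manager() proxies each put to a separate server process, so the child has no feeder-thread buffer to flush and can safely be joined before the parent calls get. Here is a minimal sketch of the worker from the question using a manager-backed queue (illustrative only, not part of the original code):

from multiprocessing import Manager, Process

def runLang(que):
    # Same worker as in the question: build a large dict and put it on the queue.
    myDict = dict()
    for i in xrange(10000):
        myDict[i] = i
    que.put(myDict)

if __name__ == "__main__":
    manager = Manager()
    que = manager.Queue()   # proxied queue; puts go straight to the manager process
    processList = [Process(target=runLang, args=(que,)) for _ in xrange(2)]
    for p in processList:
        p.start()
    for p in processList:
        p.join()            # joining before get() is safe with a manager queue
    for _ in xrange(2):
        print(que.get())

The trade-off is that every put and get goes through the manager process, which is slower than a plain Queue.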

Alternatively, you can fix the issue without a manager by not calling join until after you empty the Queue in the parent:

for _ in xrange(len(langs)):
    item = que.get()
    print(item)
    dicList.append(item)

# join after emptying the queue.
for p in processList:
    p.join()

print("here")
