Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
139 views
in Technique[技术] by (71.8m points)

python - multiprocessing GUI schemas to combat the "Not Responding" blocking

What are the best ways to create a multiprocessing/ GUI coding system?

I would like to create a place for the internet community to come and find examples on how to use the multiprocessing module in python.

I have seen several small examples of multiprocessing processes on the internet of simple global functions which are called in a main module, but I have found that this rarely translates easily into anything that anyone actually does with regard to GUIs. I would think that many programs would have the functions which they want to use in a separate process as methods of objects (which may be aggregates of other objects etc.) and perhaps a single GUI element would have an associated object that needs to call this process, etc.

For example, I have a relatively complex program and I am having problems in getting a responsive GUI for it, which I believed to be due to my lack of understanding in multiprocessing and threading with QThread. However, I do know that the example given below will at least pass information between processes in the manner I desire (due to being able to execute print statements) but my GUI is still locking. Does anyone know what may be causing this, and if it is still a probelm with my lack of understanding in mutlithreaded/multiprocessing architectures?

Here is a small pseudo code example of what I am doing:

class Worker:
    ...
    def processing(self, queue):
        # put stuff into queue in a loop

# This thread gets data from Worker
class Worker_thread(QThread):
    def __init__(self):
        ...
        # make process with Worker inside
    def start_processing(self):
        # continuously get data from Worker
        # send data to Tab object with signals/slots

class Tab(QTabWidget):
    # spawn a thread separate from main GUI thread

    # update GUI using slot
    def update_GUI()

And this code is fully compilable example which embodies the overlying sturcture of my program:

from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import numpy as np
import sys
import time

# This object can hold several properties which will be used for the processing
# and will be run in the background, while it updates a thread with all of it's progress
class Worker:
    def __init__(self, some_var):
        self.some_var = some_var
        self.iteration = 0

    def some_complex_processing(self, queue):
        for i in range(0,5000):
            self.iteration += 1
            queue.put(self.iteration)
        queue.put('done with processing')

# This Woker_thread is a thread which will spawn a separate process (Worker).
# This separate is needed in order to separate the data retrieval
# from the main GUI thread, which should only quickly update when needed 
class Worker_thread(QtCore.QThread):
    # signals and slots are used to communicate back to the main GUI thread
    update_signal = QtCore.pyqtSignal(int)
    done_signal = QtCore.pyqtSignal()

    def __init__(self, parent, worker):
        QtCore.QThread.__init__(self, parent)
        self.queue = mp.Queue()
        self.worker = worker
        self.parent = parent
        self.process = mp.Process(target=self.worker.some_complex_processing, args=(self.queue,))

    # When the process button is pressed, this function will start getting data from Worker
    # this data is then retrieved by the queue and pushed through a signal
    # to Tab.update_GUI
    @QtCore.pyqtSlot()
    def start_computation(self):
        self.process.start()
        while(True):
            try:
                message = self.queue.get()
                self.update_signal.emit(message)
            except EOFError:
                pass
            if message == 'done with processing':
                self.done_signal.emit()
                break
            #self.parent.update_GUI(message)
        self.process.join()
        return

# Each tab will start it's own thread, which will spawn a process
class Tab(QtGui.QTabWidget):
    start_comp = QtCore.pyqtSignal()
    def __init__(self, parent, this_worker):
        self.parent = parent
        self.this_worker = this_worker
        QtGui.QTabWidget.__init__(self, parent)

        self.treeWidget = QtGui.QTreeWidget(self)
        self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
        self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])

        self.thread = Worker_thread(parent=self, worker=self.this_worker)
        self.thread.update_signal.connect(self.update_GUI)
        self.thread.done_signal.connect(self.thread.quit)
        self.start_comp.connect(self.thread.start_computation)
        self.thread.start()

    ###############################
    # Here is what should update the GUI at every iteration of Worker.some_complex_processing()
    # The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
    @QtCore.pyqtSlot(int)
    def update_GUI(self, iteration):
        self.step.setText(0, str(iteration))
        #time.sleep(0.1)
        print iteration

    def start_signal_emit(self):
        self.start_comp.emit()

# GUI stuff
class MainWindow(QtGui.QMainWindow):
    def __init__(self, parent = None):
        QtGui.QMainWindow.__init__(self)
        self.tab_list = []
        self.setTabShape(QtGui.QTabWidget.Rounded)
        self.centralwidget = QtGui.QWidget(self)
        self.top_level_layout = QtGui.QGridLayout(self.centralwidget)

        self.tabWidget = QtGui.QTabWidget(self.centralwidget)
        self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)

        process_button = QtGui.QPushButton("Process")
        self.top_level_layout.addWidget(process_button, 0, 1)
        QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)

        self.setCentralWidget(self.centralwidget)
        self.centralwidget.setLayout(self.top_level_layout)

        # Make Tabs in loop from button
        for i in range(0,10):
            name = 'tab' + str(i)
            self.tab_list.append(Tab(self.tabWidget, Worker(name)))
            self.tabWidget.addTab(self.tab_list[-1], name)

    # Do the processing
    def process(self):
        for tab in self.tab_list:
            tab.start_signal_emit()
        return

if __name__ == "__main__":
    app = QtGui.QApplication([])
    win = MainWindow()
    win.show()
    sys.exit(app.exec_())

More Information: I am writing a program which I would like to spawn several processes from and have them continuously show their progress throughout their processing. I would like the program to be multiprocessed in order to get the best speed out of the program as possible.

At the moment, I am trying to use a thread to spawn a process and use signals and slots to update the GUI while the data is continuously retrieved by a queue. It appears that the queues, signals, and slots work when using print statements, but can not update the GUI. If anyone has any other suggestions as to how I should structure this in order to keep the program more managable, I would like to learn.

EDIT: I have made the adjustments put forth by Min Lin, with the addition of making Worker a QObject so that moveToThread() would work.
Here is the new code I have at the moment:

from PyQt4 import QtCore, QtGui
import multiprocessing as mp
import numpy as np
import sys
import time

class Worker(QtCore.QObject):
    update_signal = QtCore.pyqtSignal(int)
    done_signal = QtCore.pyqtSignal()

    def __init__(self, some_var):
        QtCore.QObject.__init__(self, parent=None)
        self.some_var = some_var
        self.iteration = 0
        self.queue = mp.Queue()
        self.process = mp.Process(target=self.some_complex_processing, args=(self.queue,))

    def some_complex_processing(self, queue):
        for i in range(0,5000):
            self.iteration += 1
            queue.put(self.iteration)
        queue.put('done with processing')

    @QtCore.pyqtSlot()
    def start_computation(self):
        self.process.start()
        while(True):
            try:
                message = self.queue.get()
                self.update_signal.emit(message)
            except EOFError:
                pass
            if message == 'done with processing':
                self.done_signal.emit()
                break
        self.process.join()
        return



class Tab(QtGui.QTabWidget):
    start_comp = QtCore.pyqtSignal()
    def __init__(self, parent, this_worker):
        self.parent = parent
        self.this_worker = this_worker
        QtGui.QTabWidget.__init__(self, parent)

        self.treeWidget = QtGui.QTreeWidget(self)
        self.properties = QtGui.QTreeWidgetItem(self.treeWidget, ["Properties"])
        self.step = QtGui.QTreeWidgetItem(self.properties, ["Iteration #"])

        # Use QThread is enough
        self.thread = QtCore.QThread();
        # Change the thread affinity of worker to self.thread.
        self.this_worker.moveToThread(self.thread);
        self.this_worker.update_signal.connect(self.update_GUI)
        self.this_worker.done_signal.connect(self.thread.quit)
        self.start_comp.connect(self.this_worker.start_computation)
        self.thread.start()

    ###############################
    # Here is what should update the GUI at every iteration of Worker.some_complex_processing()
    # The message appears to be getting sent, due to seeing the print statement in the console, but the GUI is not updated.
    @QtCore.pyqtSlot(int)
    def update_GUI(self, iteration):
        self.step.setText(0, str(iteration))
        #time.sleep(0.1)
        print iteration

    def start_signal_emit(self):
        self.start_comp.emit()

# GUI stuff
class MainWindow(QtGui.QMainWindow):
    def __init__(self, parent = None):
        QtGui.QMainWindow.__init__(self)
        self.tab_list = []
        self.setTabShape(QtGui.QTabWidget.Rounded)
        self.centralwidget = QtGui.QWidget(self)
        self.top_level_layout = QtGui.QGridLayout(self.centralwidget)

        self.tabWidget = QtGui.QTabWidget(self.centralwidget)
        self.top_level_layout.addWidget(self.tabWidget, 1, 0, 25, 25)

        process_button = QtGui.QPushButton("Process")
        self.top_level_layout.addWidget(process_button, 0, 1)
        QtCore.QObject.connect(process_button, QtCore.SIGNAL("clicked()"), self.process)

        self.setCentralWidget(self.centralwidget)
        self.centralwidget.setLayout(self.top_level_layout)

        # Make Tabs in loop from button
        for i in range(0,10):
            name = 'tab' + str(i)
            self.tab_list.append(Tab(self.tabWidget, Worker(name)))
            self.tabWidget.addTab(self.tab_list[-1], name)

    # Do the processing
    def process(self):
        for tab in self.tab_list:
            tab.start_signal_emit()
        return

if __name__ == "__main__":
    app = QtGui.QApplication([])
    win = MainWindow()
    win.show()
    sys.exit(app.exec_())

Thank you for all of the answers, I appreciate the level of detail that everyone has gone into in describing the idea they believe to be solution, but unfortunately I have not yet been able to perform these types of processes which operate on the object they belong to while displaying the object's attribute on a GUI.
However, I have learned a decent amount from this post, which allo


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

A GUI application is perfect for testing stuff, as it is easy to spawn new tasks and visualize what is going on, so I wrote a little example app (Screenshot, code is below) as I did want to learn it for my self.

At first, i took a similar approach as yours, trying to implement the Consumer/Producer pattern and I struggeled with Background Processes doing endless loops to wait for new jobs and took care of communication back and forth for myself. Then I found out about the Pool Interface and then I could replace all that hidious code with just a few lines. All you need is a single pool and a few callbacks:

#!/usr/bin/env python3
import multiprocessing, time, random, sys
from PySide.QtCore import * # equivalent: from PyQt4.QtCore import *
from PySide.QtGui import *   # equivalent: from PyQt4.QtGui import *

def compute(num):
    print("worker() started at %d" % num)
    random_number = random.randint(1, 6)
    if random_number in (2, 4, 6):
        raise Exception('Random Exception in _%d' % num)
    time.sleep(random_number)
    return num

class MainWindow(QMainWindow):
    def __init__(self):
        QMainWindow.__init__(self)
        self.toolBar = self.addToolBar("Toolbar")
        self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask))
        self.list = QListWidget()
        self.setCentralWidget(self.list)

        # Pool of Background Processes
        self.pool = multiprocessing.Pool(processes=4)

    def addTask(self):
        num_row = self.list.count()
        self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult,
                              error_callback=self.receiveException)
        item = QListWidgetItem("item %d" % num_row)
        item.setForeground(Qt.gray)
        self.list.addItem(item)

    def receiveResult(self, result):
        assert isinstance(result, int)
        print("end_work(), where result is %s" % result)
        self.list.item(result).setForeground(Qt.darkGreen)

    def receiveException(self, exception):
        error = str(exception)
        _pos = error.find('_') + 1
        num_row = int(error[_pos:])
        item = self.list.item(num_row)
        item.setForeground(Qt.darkRed)
        item.setText(item.text() + ' Retry...')
        self.pool.apply_async(func=compute, args=(num_row,), callback=self.receiveResult,
                              error_callback=self.receiveException)

if __name__ == '__main__':
    app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    sys.exit(app.exec_())

Edit: I did another example using a QTimer instead of Callbacks, checking periodically for Entries in a Queue, updating a QProgressBar:

#!/usr/bin/env python3
import multiprocessing, multiprocessing.pool, time, random, sys
from PySide.QtCore import *
from PySide.QtGui import *

def compute(num_row):
    print("worker started at %d" % num_row)
    random_number = random.randint(1, 10)
    for second in range(random_number):
        progress = float(second) / float(random_number) * 100
        compute.queue.put((num_row, progress,))
        time.sleep(1)
    compute.queue.put((num_row, 100))

def pool_init(queue):
    # see http://stackoverflow.com/a/3843313/852994
    compute.queue = queue

class MainWindow(QMainWindow):
    def __init__(self):
        QMainWindow.__init__(self)
        self.toolBar = self.addToolBar("Toolbar")
        self.toolBar.addAction(QAction('Add Task', self, triggered=self.addTask))
        self.table = QTableWidget()
        self.table.verticalHeader().hide()
        self.table.setColumnCount(2)
        self.setCentralWidget(self.table)

        # Pool of Background Processes
        self.queue = multiprocessing.Queue()
        self.pool = multiprocessing.Pool(processes=4, initializer=pool_init, initargs=(self.queue,))

        # Check for progress periodically
        self.timer = QTimer()
        self.timer.timeout.connect(self.updateProgress)
        self.timer.start(2000)

    def addTask(self):
        num_row = self.table.rowCount()
        self.pool.apply_async(func=compute, args=(num_row,))
        label = QLabel("Queued")
        bar = QProgressBar()
        bar.setValue(0)
        self.table.setRowCount(num_row + 1)
        self.table.setCellWidget(num_row, 0, label)
        self.table.setCellWidget(num_row, 1, bar)

    def updateProgress(self):
        if self.queue.empty(): return
        num_row, progress = self.queue.get() # unpack
        print("received progress of %s at %s" % (progress, num_row))
        label = self.table.cellWidget(num_row, 0)
        bar = self.table.cellWidget(num_row, 1)
        bar.setValue(progress)
        if progress == 100:
            label.setText('Finished')
        elif label.text() == 'Queued':
            label.setText('Downloading')
        self.updateProgress() # recursion

if __name__ == '__main__':
    app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    sys.exit(app.exec_())

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...