Cleaning up a QThreadPool in Qt

I recently started learning PyQt5 and ran into a problem: cleaning up a QThreadPool. Specifically, I sometimes need to stop all the work in a QThreadPool and do the necessary cleanup. There are two main scenarios: a new input arrives, so the QThreadPool has to be cleaned up and restarted with the new input; or the window is closed and the program exits. An example of the first scenario is searching for keywords to fetch pictures from the network. The fetching is done in the thread pool, and the returned picture data is then displayed in the GUI through slots. A new search is completely independent of the previous one. The code I ended up with is as follows:

import sys
import time
from collections import deque
from PyQt5.QtCore import QObject, QRunnable, QThreadPool
from PyQt5.QtCore import pyqtSignal as Signal
from PyQt5.QtCore import pyqtSlot as Slot
from PyQt5.QtWidgets import QApplication, QMainWindow
import traceback


class WorkerSignals(QObject):
    '''
    Defines the signals available from a running worker thread.

    Supported signals are:

    finished
        No data

    error
        tuple (exctype, value, traceback.format_exc() )

    result
        object data returned from processing, anything

    progress
        int indicating % progress

    '''
    finished = Signal()
    error = Signal(tuple)
    result = Signal(object)
    progress = Signal(int)


class Worker(QRunnable):
    '''
    Worker thread

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param callback: The function callback to run on this worker thread. Supplied args and
                     kwargs will be passed through to the runner.
    :type callback: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function

    '''

    def __init__(self, fn, *args, **kwargs):
        super().__init__()

        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

        # Add the callback to our kwargs
        self.kwargs['progress_callback'] = self.signals.progress

    @Slot()
    def run(self):
        '''
        Initialise the runner function with passed args, kwargs.
        '''

        # Retrieve args/kwargs here; and fire processing using them
        # print(self.fn, self.args, self.kwargs)
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit((exctype, value, traceback.format_exc()))
        else:
            # Return the result of the processing
            self.signals.result.emit(result)
        finally:
            self.signals.finished.emit()  # Done


class GenericThreadPool(QThreadPool):
    def __init__(self, parent=None):
        super().__init__(parent=parent)
        self._signalsQObjectQueue = deque()

    def cleanup(self, timeout=10):
        # Disconnect first, so results emitted from now on reach no slot.
        while self._signalsQObjectQueue:
            signalsQObject = self._signalsQObjectQueue.pop()
            signalsQObject.disconnect()
        self.clear()  # drop tasks that have not started yet
        # waitForDone() takes an int in milliseconds; -1 waits indefinitely.
        return self.waitForDone(timeout)

    def start(self, worker, priority=0):
        self._signalsQObjectQueue.append(worker.signals)
        super().start(worker, priority)

    # I only need to call start instead of tryStart, so I didn't try to override tryStart


class Ui_MainWindow(object):
    pass


class MainWindow(QMainWindow):

    def __init__(self, *args, **kwargs):
        QMainWindow.__init__(self, *args, **kwargs)

        self.ui = Ui_MainWindow()
        # setup main window ui and others

        self.ui.someWidget.someSignal.connect(self.onSomeSignal)

        self.threadpool_1 = GenericThreadPool()
        self.threadpool_2 = GenericThreadPool()

    def cleanup(self):
        for threadpool in [self.threadpool_1, self.threadpool_2]:
            if not threadpool.cleanup(-1):
                time.sleep(30)

    @Slot()
    def onSomeSignal(self):
        self.threadpool_1.cleanup()
        # more cleanup and reset or initialization

        # get input

        for data in input:
            worker = Worker(self.someMember, data)
            worker.signals.result.connect(self.someResultSlot)
            worker.signals.error.connect(self.someErrorSlot)
            worker.signals.finished.connect(self.someFinishedSlot)
            worker.signals.progress.connect(self.someProgressSlot)
            self.threadpool_1.start(worker)

	# other members


def main():
    app = QApplication(sys.argv)

    mainWindow = MainWindow()
    mainWindow.show()

    app.aboutToQuit.connect(mainWindow.cleanup)

    sys.exit(app.exec())


if __name__ == '__main__':
    main()

The definitions of WorkerSignals and Worker above are copied from the Internet and are the basic pattern used with QThreadPool. However, I couldn't find a QThreadPool cleanup pattern on the Internet, even though it seems like an extremely basic problem. Getting there took countless twists and turns, although in the end it is only a few lines of code.

The rest is mainly the story and the lessons learned, which I hope will be useful to beginners like me. I hope my understanding is correct.

For cleanup, the most basic thing is:

threadpool.clear()
threadpool.waitForDone()

Here clear removes the pending tasks, and waitForDone waits for the run method of each worker that is already executing in a thread to finish. At first, I added a check:

if threadPool.activeThreadCount() > 0:

Later, I decided it was unnecessary. It is possible that none of the threads has started executing yet, in which case the check would skip the cleanup, although this is very unlikely.
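To see what each of the two calls does, here is a minimal sketch. It assumes a pool limited to one thread and a hypothetical GateTask runnable that blocks on a threading.Event; neither is part of the program above. clear should drop the tasks that are still queued, while the task already handed to a thread is unaffected and has to be waited for.

```python
import threading

from PyQt5.QtCore import QCoreApplication, QRunnable, QThreadPool


class GateTask(QRunnable):
    """Hypothetical task: waits for a gate to open, then records its name."""

    def __init__(self, name, gate, log):
        super().__init__()
        self.name = name
        self.gate = gate
        self.log = log

    def run(self):
        self.gate.wait()
        self.log.append(self.name)


app = QCoreApplication.instance() or QCoreApplication([])

pool = QThreadPool()
pool.setMaxThreadCount(1)      # one thread only, so later tasks must queue

gate = threading.Event()
log = []
for name in ("first", "second", "third"):
    pool.start(GateTask(name, gate, log))

pool.clear()                   # removes the tasks that are still queued
gate.set()                     # let the task that owns the thread finish
finished = pool.waitForDone()  # blocks until no thread is active

print(log, finished)
```

With this setup only the first task should ever run, which is why clear alone is not enough: anything already running still has to be dealt with by waitForDone.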

The key problem is that the code in run emits signals to the main thread, where they are queued. Since I am cleaning up, I don't want the connected slots to be executed, which requires disconnect. At first, I wanted to save the signal-to-slot correspondence (one to many, many to one) in a queue, then pop each (signal, slot) pair and disconnect them one by one after waitForDone. The result was a segmentation fault. In fact, I couldn't access the queue after waitForDone; it was this access that caused the segmentation fault. I didn't know why.

After that, the natural idea was to save all the workers, or their worker.signals objects, in the queue, and then disconnect through worker or worker.signals. This also took some trial and error.

  • From the beginning, I leaned towards storing worker.signals, although I didn't know whether storing the worker would work just as well (the worker is autoDelete by default). Later, I learned that a signal (pyqtSignal) must be defined as a class attribute of a class that inherits from QObject (multiple inheritance is allowed, of course), and that an instance of this class is the sender of the signal, so this preference was not a bad one.
  • When I first disconnected the connected slots through the signal, I also ran into the problem that a Python partial function (functools.partial) could not be disconnected as a slot: the traceback said there was no such slot. I then defined an inner function to use as the slot instead, but disconnect still failed. I don't know why.
  • At first, I didn't notice that you can call disconnect directly on a QObject to disconnect all of its signals from their connected slots, so I was also saving the correspondence between signals and slots. Once I knew, I simply put worker.signals in the queue.
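One possible explanation for the partial problem, offered as a guess and not confirmed by the original code, is that a new partial object was created for the disconnect call. The sketch below uses a hypothetical DemoSignals class and on_result function to show the difference between disconnecting a freshly built partial and disconnecting the stored one, and then the all-at-once disconnect on the QObject itself.

```python
from functools import partial

from PyQt5.QtCore import QCoreApplication, QObject, pyqtSignal


class DemoSignals(QObject):
    """Hypothetical stand-in for WorkerSignals with a single signal."""
    result = pyqtSignal(object)


app = QCoreApplication.instance() or QCoreApplication([])
received = []


def on_result(tag, value):
    received.append((tag, value))


signals = DemoSignals()
slot = partial(on_result, "kept")   # keep this exact object around
signals.result.connect(slot)
signals.result.emit(1)              # same thread: delivered immediately

# An equal-looking but new partial is a different object, so it is not
# recognised as a connected slot.
try:
    signals.result.disconnect(partial(on_result, "kept"))
    new_partial_disconnected = True
except TypeError:
    new_partial_disconnected = False

signals.result.disconnect(slot)     # the stored object can be disconnected
signals.result.emit(2)              # nobody is listening any more

# Disconnect everything at once through the QObject that owns the signals.
signals.result.connect(slot)
signals.result.connect(lambda value: received.append(("lambda", value)))
signals.disconnect()
signals.result.emit(3)

print(received, new_partial_disconnected)
```

Calling disconnect on the QObject avoids having to remember every slot, which is what makes storing only worker.signals sufficient.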

After all this, the code ran and was much cleaner. But I found that the slots were still executed, and I wondered why disconnect didn't work. Later, while searching for disconnect, I saw a simple, context-free disconnect test on Stack Overflow and tried it myself (the question itself seemed problematic). Suddenly I realized what the problem was: disconnect must be called before the signal is emitted. At that time I was also deleting worker.signals after disconnect, so the code was as follows:

threadpool.clear()
threadpool.waitForDone()
while queue:
	signalsQObject = queue.pop()
	signalsQObject.disconnect()
	del signalsQObject
	# signalsQObject.deleteLater()

Leaving aside the problem with del, my mistake was that I called disconnect after waitForDone instead of before clear. I don't know why I never thought of this before. So I changed it to the following:

for signalsQObject in queue:
	signalsQObject.disconnect()
threadpool.clear()
threadpool.waitForDone()
while queue:
	signalsQObject = queue.pop()
	del signalsQObject
	# signalsQObject.deleteLater()

The slots finally stopped executing!
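The effect of the ordering can be reproduced in a small sketch. DemoSignals, EmitTask and run_once are hypothetical names, and a threading.Event is used only to make sure the worker cannot emit before the main thread has had the chance to disconnect.

```python
import threading

from PyQt5.QtCore import (QCoreApplication, QObject, QRunnable,
                          QThreadPool, pyqtSignal)


class DemoSignals(QObject):
    result = pyqtSignal(object)


class EmitTask(QRunnable):
    """Hypothetical task: waits for a gate, then emits one result."""

    def __init__(self, gate):
        super().__init__()
        self.gate = gate
        self.signals = DemoSignals()

    def run(self):
        self.gate.wait()
        self.signals.result.emit(42)


def run_once(disconnect_first):
    received = []

    def on_result(value):
        received.append(value)

    gate = threading.Event()
    pool = QThreadPool()
    task = EmitTask(gate)
    signals = task.signals
    signals.result.connect(on_result)
    pool.start(task)
    if disconnect_first:
        signals.disconnect()           # before the worker has emitted anything
    pool.clear()
    gate.set()                         # only now can run() reach emit()
    pool.waitForDone()
    QCoreApplication.processEvents()   # deliver calls queued for this thread
    return received


app = QCoreApplication.instance() or QCoreApplication([])
delivered = run_once(disconnect_first=False)
suppressed = run_once(disconnect_first=True)
print(delivered, suppressed)
```

In the first call the result is queued for the main thread and delivered by processEvents; in the second, nothing is connected by the time run emits, so the slot should never be called.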

Something I had noticed all along was that restarting the threadpool was too laggy: the GUI froze every time new input arrived. This was obviously caused by the slowness of waitForDone. The threads in my threadpool perform IO-intensive tasks, and the website connection is slow, so they take a long time. If you wait for them in the main thread, the GUI freezes. However, waitForDone has a timeout parameter, so I tried setting it to 0.01 or even 0.001, thinking in seconds (the parameter is actually an integer number of milliseconds). Sure enough, the freezing was gone. Then I did some stress tests. New input always worked, but when I closed the window and exited, I got a RuntimeError:

Traceback (most recent call last):
  File "/path/to/python/script", line 79, in run
    self.signals.result.emit(result)
RuntimeError: wrapped C/C++ object of type WorkerSignals has been deleted

Was run still running? When was the QObject destroyed? I also wondered why it was only a problem on exit.

Fortunately, I remembered at this point that waitForDone returns a bool. I printed it and found that waitForDone returned False for the threadpool in question, that is, run was indeed still running. This was true both for new input and for closing the window and exiting.

After thinking about it, I came to believe that closing the window and exiting causes the QObject system to be destroyed, whereas del does not destroy signalsQObject. At first I thought that del signalsQObject and signalsQObject.deleteLater() would both destroy signalsQObject, just sooner or later. After looking it up, I realized that del only removes Python's reference to signalsQObject; the object still exists in the QObject system. When I tried calling signalsQObject.deleteLater(), new input ran into the same problem as closing the window.

How long does signalsQObject live, then? My workers are all temporary variables inside a function and are autoDelete. If I don't save worker.signals myself, the only remaining references are the signal-slot calls still waiting in the queue. I think the reference count of the signal sender drops to 0 once all of its signals have been processed, and it is garbage collected soon after. As for the segmentation fault I didn't understand earlier, I think it happened because I hadn't saved the worker or worker.signals: after the thread finished the worker's task, the worker was deleted, and after the slots connected to worker.signals had run, worker.signals was cleaned up as well, so its signals could no longer be accessed, even though a Python reference still existed.
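The difference between del and deleteLater can be illustrated in isolation. This is only a sketch under one assumption: here a parent QObject keeps the child alive, whereas in the program above it is presumably the Worker's own signals attribute that does so. Deferred deletions are flushed by hand because no event loop is running.

```python
from PyQt5.QtCore import QCoreApplication, QEvent, QObject

app = QCoreApplication.instance() or QCoreApplication([])

parent = QObject()
child = QObject(parent)        # the parent now owns the child on the Qt side
del child                      # drops only the Python name
alive_after_del = len(parent.children())

survivor = parent.children()[0]
survivor.deleteLater()         # schedules destruction; nothing happens yet
alive_before_events = len(parent.children())

# Deferred deletions are normally handled by the event loop; here they are
# flushed explicitly.
QCoreApplication.sendPostedEvents(None, QEvent.DeferredDelete)
alive_after_events = len(parent.children())

# Using the Python wrapper after the Qt object is gone raises RuntimeError.
try:
    survivor.objectName()
    access_error = None
except RuntimeError as exc:
    access_error = str(exc)

print(alive_after_del, alive_before_events, alive_after_events)
print(access_error)
```

The RuntimeError raised at the end is the same kind of error as in the traceback above: a Python wrapper being used after the Qt object behind it has been destroyed.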

Finally, I removed the code that deletes signalsQObject. When preparing to exit, the waitForDone timeout is set to -1, that is, it keeps waiting without interrupting run. To be safe, I added a check: if waitForDone still returns False, wait another 30 seconds. In the case of new input, I simply call cleanup, regardless of whether its waitForDone succeeds or fails, because the continued execution of run has no effect on me; I just don't want the slots connected to its signals to be executed. In fact, for new input I could even skip waitForDone altogether. But it is always necessary when closing the window and exiting, unless you can ensure that all threads in the thread pool have finished by the time you exit.
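The two ways of calling waitForDone can be compared in a short sketch. BlockingTask is a hypothetical runnable that keeps running until a threading.Event is set, standing in for a slow download; the 10 millisecond timeout is an arbitrary illustrative value.

```python
import threading

from PyQt5.QtCore import QCoreApplication, QRunnable, QThreadPool


class BlockingTask(QRunnable):
    """Hypothetical task that runs until the gate is opened."""

    def __init__(self, gate):
        super().__init__()
        self.gate = gate

    def run(self):
        self.gate.wait()


app = QCoreApplication.instance() or QCoreApplication([])
pool = QThreadPool()
gate = threading.Event()
pool.start(BlockingTask(gate))

# New-input case: give up after 10 ms; False means run() is still going.
done_quickly = pool.waitForDone(10)

# Exit case: -1 waits without a time limit, so the task must be able to end.
gate.set()
done_at_exit = pool.waitForDone(-1)

print(done_quickly, done_at_exit)
```

The return value is the only indication of whether the pool is really idle, so it is worth checking before anything the workers depend on is destroyed.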

Finally, there are two ways to clean up when closing the window and exiting. One is to override the closeEvent of QWidget, as follows:

def closeEvent(self, event):
	# do cleanup
	self.cleanup()

	event.accept()

The other is to connect the cleanup code to the aboutToQuit signal of QApplication. My cleanup can be very slow. With the first method the window may take a long time to close, while with the second the window closes immediately, although the program takes about the same time to finally exit.

Keywords: Python Programming Qt5 PyQt5

Added by techker on Mon, 27 Dec 2021 11:06:44 +0200