Misbehaving signals with QThreadPool and QRunnable when a QThread is put on top of them in Python

I'm stuck in a situation where my first few signals get delivered, and after that no further signals are emitted.

I'll explain in detail:

I have a job that needs heavy processing and a lot of RAM, and it takes 2-4 hours to complete when run linearly without threading, so I decided to use QThreadPool. While testing, I created 355 QRunnable instances (runners) that are started by the QThreadPool, and a few of them depend on other runners finishing first. I derive the dependencies by emitting signals and catching them. All of this works perfectly when I run it without a GUI.
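The dependency handling described above can be sketched without Qt. This is only a rough illustration for Python 3, with `concurrent.futures.ThreadPoolExecutor` standing in for QThreadPool. `DependencyPool`, `tasks` and `depends_on` are hypothetical names, not taken from the real code. It assumes the dependency graph has no cycles and that every prerequisite is itself a task.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class DependencyPool(object):
    """Start each task only after every task it depends on has finished."""

    def __init__(self, tasks, depends_on, max_workers=4):
        self.tasks = tasks              # name -> callable taking no arguments
        self.depends_on = depends_on    # name -> set of prerequisite names
        self.finished = []              # names, in completion order
        self._started = set()
        self._lock = threading.Lock()
        self._all_done = threading.Event()
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def _submit_ready(self):
        # Must be called with self._lock held.
        done = set(self.finished)
        for name in self.tasks:
            ready = self.depends_on.get(name, set()) <= done
            if ready and name not in self._started:
                self._started.add(name)
                self._pool.submit(self._run, name)

    def _run(self, name):
        try:
            self.tasks[name]()
        finally:
            # This bookkeeping plays the role of the operation_finished signal.
            with self._lock:
                self.finished.append(name)
                if len(self.finished) == len(self.tasks):
                    self._all_done.set()
                else:
                    self._submit_ready()

    def run(self):
        if not self.tasks:
            return []
        with self._lock:
            self._submit_ready()
        self._all_done.wait()           # block until the last task reports in
        self._pool.shutdown()
        return self.finished
```

For example, with tasks `a`, `b` and `c`, where `b` depends on `a` and `c` depends on both, `run()` returns the names in the order `a`, `b`, `c`. The point of the design is that the "what can start now" decision is made in one place under a lock, so it does not matter which worker thread reports the completion.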

For example (the code below is untested, I just typed it here):

from PyQt4 import QtCore, QtGui

class StreamPool(QtCore.QObject):
  def __init__(self, inputs, q_app):
    super(StreamPool, self).__init__()

    self.inputs = inputs
    self.q_app = q_app

    self.pool = QtCore.QThreadPool()

  def start(self):
    for each_input in self.inputs:
      runner = StreamRunner(each_input, self.q_app)
      runner.signal.operation_started.connect(self.mark_as_start)
      runner.signal.operation_finished.connect(self.mark_as_finish)
      self.pool.start(runner)

  def mark_as_start(self, name):
    print 'operation started..', name
    # Some operation...

  def mark_as_finish(self, name):
    print 'operation finished..', name
    # Some operation...

class StreamRunner(QtCore.QRunnable):
  def __init__(self, input, q_app):
    super(StreamRunner, self).__init__()

    self.input = input            
    self.q_app = q_app
    self.signal = WorkSignals()


  def run(self):
    self.signal.operation_started.emit(self.input)
    self.q_ap

    # Doing some operations

    self.signal.operation_finished.emit(self.input)
    self.q_app.processEvents()    

class WorkSignals(QtCore.QObject):
  operation_started = QtCore.pyqtSignal(str)
  operation_finished= QtCore.pyqtSignal(str)



if __name__ == '__main__':
  app = QtGui.QApplication([])      
  st = StreamPool(['a', 'b', 'c'], app)
  st.start()
  app.exec_()

It works brilliantly in the above case.

I also want to show the statuses in a UI, since hundreds of tasks may be executed. So I wrote a simple UI and now run StreamPool() from another QThread, say StreamWorkThread(), which is spawned by the UI. StreamWorkThread catches StreamPool.signal.* and sends the signals back to the UI. In this setup StreamPool emits only the first 4 or 5 signals. The tasks still get executed, but the dependent tasks never get started because of this, and no status updates are displayed in the UI.

I can't share the real code because it belongs to my workplace, but I can write a similar approach here:

class StreamWorkThread (QtCore.QThread):
  def __init__(self, inputs, q_app):
    super(StreamWorkThread, self).__init__()

    self.signal = WorkSignals()
    self.stream_pool = StreamPool(inputs, q_app)
    self.stream_pool.signal.operation_started.connect(self.signal.operation_started.emit)
    self.stream_pool.signal.operation_finished.connect(self.signal.operation_finished.emit)

  def run(self):
    self.stream_pool.start()


def print_start(name):
  print 'Started --', name

def print_finished(name):
  print 'Finished --', name

if __name__ == '__main__':
  app = QtGui.QApplication([])      
  th = StreamWorkThread (['a', 'b', 'c'], app)
  th.signal.operation_started.connect(print_start)
  th.signal.operation_finished.connect(print_finished)
  th.start()
  app.exec_()

Consolidated code:

from PyQt4 import QtCore, QtGui

class StreamPool(QtCore.QObject):
  def __init__(self, inputs, q_app):
    super(StreamPool, self).__init__()

    self.inputs = inputs
    self.q_app = q_app
    self.signal = WorkSignals()  # the signals StreamWorkThread connects to

    self.pool = QtCore.QThreadPool()

  def start(self):
    for each_input in self.inputs:
      runner = StreamRunner(each_input, self.q_app)
      runner.signal.operation_started.connect(self.mark_as_start)
      runner.signal.operation_finished.connect(self.mark_as_finish)
      self.pool.start(runner)

  def mark_as_start(self, name):
    print 'operation started..', name
    # Some operation...

  def mark_as_finish(self, name):
    print 'operation finished..', name
    # Some operation...

class StreamRunner(QtCore.QRunnable):
  def __init__(self, input, q_app):
    super(StreamRunner, self).__init__()

    self.input = input            
    self.q_app = q_app
    self.signal = WorkSignals()


  def run(self):
    self.signal.operation_started.emit(self.input)
    self.q_ap

    # Doing some operations

    self.signal.operation_finished.emit(self.input)
    self.q_app.processEvents()    

class WorkSignals(QtCore.QObject):
  operation_started = QtCore.pyqtSignal(str)
  operation_finished= QtCore.pyqtSignal(str)

class StreamWorkThread (QtCore.QThread):
  def __init__(self, inputs, q_app):
    super(StreamWorkThread, self).__init__()

    self.signal = WorkSignals()
    self.stream_pool = StreamPool(inputs,q_app)
    self.stream_pool.signal.operation_started.connect(self.signal.operation_started.emit)
    self.stream_pool.signal.operation_finished.connect(self.signal.operation_finished.emit)

  def run(self):
    self.stream_pool.start()


def print_start(name):
  print 'Started --', name

def print_finished(name):
  print 'Finished --', name

if __name__ == '__main__':
  app = QtGui.QApplication([])      
  th = StreamWorkThread (['a', 'b', 'c'], app)
  th.signal.operation_started.connect(print_start)
  th.signal.operation_finished.connect(print_finished)
  th.start()
  app.exec_()

Please help me, I can't figure out what exactly the problem is here! :(

There are 2 best solutions below

0
On BEST ANSWER

Okay, I found the solution for this behavior.

The root cause of the blocked signals was that StreamPool inherited from QObject. When I replaced QObject with QThread, it worked seamlessly.

Here are the changes I made, in only two places:

class StreamPool(QtCore.QThread):  # changed: was QtCore.QObject
  def __init__(self, inputs, q_app):
    super(StreamPool, self).__init__()

    self.inputs = inputs
    self.q_app = q_app

    self.pool = QtCore.QThreadPool()

  def run(self):  # changed: was start()
    for each_input in self.inputs:
      runner = StreamRunner(each_input, self.q_app)
      runner.signal.operation_started.connect(self.mark_as_start)
      runner.signal.operation_finished.connect(self.mark_as_finish)
      self.pool.start(runner)

With that change in place, it worked! :D
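One general Qt rule may be relevant here, though nothing in this post confirms it as the actual cause: a signal that crosses threads is delivered as a queued call, and the slot only runs once the receiver's thread returns to its event loop. The toy model below uses no Qt at all and targets Python 3. `EventLoopThread`, `post` and `demo` are made-up names for illustration. It only shows that calls queued for a thread whose `run()` returns without looping are never delivered.

```python
import queue
import threading


class EventLoopThread(threading.Thread):
    """Toy stand-in for a thread that owns receiver objects."""

    def __init__(self, run_loop):
        super().__init__()
        self.run_loop = run_loop
        self.mailbox = queue.Queue()   # queued slot calls waiting for this thread
        self.delivered = []

    def post(self, slot, arg):
        # Roughly what a queued connection does: enqueue the call
        # instead of running the slot in the emitting thread.
        self.mailbox.put((slot, arg))

    def run(self):
        if not self.run_loop:
            return                     # run() ends without ever entering a loop
        while True:
            slot, arg = self.mailbox.get()
            if slot is None:           # sentinel: stop the loop
                break
            slot(arg)


def demo(run_loop):
    receiver = EventLoopThread(run_loop)
    receiver.start()
    for name in ('a', 'b', 'c'):
        receiver.post(receiver.delivered.append, name)
    receiver.post(None, None)
    receiver.join()
    return receiver.delivered
```

Here `demo(True)` returns `['a', 'b', 'c']`, while `demo(False)` returns `[]` because nothing ever drains the mailbox. In real Qt code the equivalent check is which thread each receiving QObject lives in and whether that thread is running an event loop.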

8
On

It is very difficult to say without the source code, but the problem is probably that when "app.exec_()" is run, the main loop of the GUI starts and interferes with your Stream* classes.