Using Queue() to send commands to specific threads

1.4k Views Asked by At

I want to send messages to specific threads that are acting as controllers for multiple instances of a service. This is the code snippet each thread will use to check for messages that apply to it. The comment should explain what it does fairly well (and if my comment is bad, please let me know how I should be writing them!)

This isn't a bug fix question - I'm new to threading and want to know if there's a better way to go about doing this. It feels a bit kludgey to me.

def check_queue(self):
      """Gets a simple lock on the command queue, checks its size, then
      iterates through jobs, looking for one that matches its uuid.
      If there is a match, break the out
      If not, put the job back on the queue, set job equal to None, and try again
  """
  with simple_lock:
      for i in range(cmd_q.qsize()):
          self.job = cmd_q.get()
          if job[0] == self.uuid
                break
          cmd_q.push(self.job)
          self.job = None
def command_check(self)
    while not self.job:
        sleep(.5) #wait for things to happen
        self.checkQueue()
    if self.job == 'kill':
        die_gracefully()
     ...etc
#example of a what commands are being passed
def kill_service(uuid):
    job = [uuid, 1]
    cmd_queue.put(job)
    #handle clean up

Any help/comments/critiques would be much appreciated.

Edit: simple_lock is just threading.Lock(). I used it to ensure that calls to cmd_q.qsize() would be accurate - otherwise, another process could add to the queue in between the call to qsize and going through the jobs.

2

There are 2 best solutions below

1
On BEST ANSWER

You should really have each thread have its own unique queue: Create a dict that maps from the uuid to the Queue that goes with that uuid. The producer can use that dict to figure out which queue to push to, and then each consumer can just use q.get() without the need to take a lock, possibly re-queue if its the wrong uuid, etc.

I took your example code and expanded it a bit to demonstrate how that might look:

from Queue import Queue
import threading

class MyThread(threading.Thread):
    cmd_queues = {}  # Map of uuid -> Queue for all instances of MyThread

    def __init__(self, uuid):
        super(threading.Thread, self).__init__()
        self.cmd_q = Queue()
        self.uuid = uuid
        MyThread.cmd_queues[uuid] = self.cmd_q

    def run(self):
        while True:
            self.command_check()

    def command_check(self):
        job = self.cmd_q.get()
        if job == 'kill':
            self.die_gracefully()
         #...etc

    def die_gracefully(self):
       # stuff

def kill_service(uuid):
    job = 1
    MyThread.cmd_queues[uuid].put(job)
    #handle clean up

if __name__ == "__main__":
    for uuid in ['123', '234', '345']:
        t = MyThread(uuid)
        t.start()

    kill_service('123')
0
On

With Python 2.7, @dano's answer (which I think is awesome) raises a RuntimeError - RuntimeError: thread.__init__() not called. I believe that the exception can be remedied by replacing threading.Thread with the base class name when calling super.

from Queue import Queue
import threading

class MyThread(threading.Thread):
    cmd_queues = {}  # Map of uuid -> Queue for all instances of MyThread

    def __init__(self, uuid):
        super(MyThread, self).__init__()
        self.cmd_q = Queue()
        self.uuid = uuid
        MyThread.cmd_queues[uuid] = self.cmd_q

    def run(self):
        while True:
            self.command_check()

    def command_check(self):
        job = self.cmd_q.get()
        if job == 'kill':
            self.die_gracefully()
         #...etc

    def die_gracefully(self):
       # stuff
       pass

def kill_service(uuid):
    job = 1
    MyThread.cmd_queues[uuid].put(job)
    #handle clean up

if __name__ == "__main__":
    for uuid in ['123', '234', '345']:
        t = MyThread(uuid)
        t.start()

    kill_service('123')

Edits to this answer are appreciated if that's not the proper fix.