Interrupt paho mqtt client to reload subscriptions

854 Views Asked by At

I have an mqtt client app that subscribes to topics based on a configuration file. Something like:

def connectMQTT():
    global Connection
    Connection = Client()
    Connection.on_message = handleQuery
    for clientid in clientids.allIDs(): # clientids.allIDs() reads files to get this
        topic = '{}/{}/Q/+'.format(Basename, clientid)
        print('subscription:', topic)
        Connection.subscribe(topic)

I have been using it with a simple invocation like:

def main():
    connectMQTT()
    Connection.loop_forever()

The loop_forever will block forever. But I'd like to notice when the information read by clientids.allIDs() is out of date and I should reconnect forcing it to subscribe afresh.

I can detect a change in the files with pyinotify:

def filesChanged():
    # NOT SURE WHAT TO DO HERE

def watchForChanges():
    watchManager = pyinotify.WatchManager()
    notifier = pyinotify.ThreadedNotifier(watchManager, FileEventHandler(eventCallback))
    notifier.start()
    watchManager.add_watch('/etc/my/config/dir', pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE)

Basically, I need loop_forever (or some other paho mqtt mechanism) to run until some signal comes from the pyinotify machinery. I'm not sure how to weld those two together though. In pseudo code, I thing I want something like

def main():
    signal = setup_directory_change_signal()
    while True:
        connectMQTT()
        Connection.loop(until=signal)
        Connection.disconnect()

I'm not sure how to effect that though.

1

There are 1 best solutions below

0
On BEST ANSWER

I finally circled around to the following solution which seems to work. Whereas I was trying to run the notifier in another thread and the mqtt loop in the main thread, the trick seemed to be invert that setup:

def restartMQTT():
    if Connection:
        Connection.loop_stop()
    connectMQTT()
    Connection.loop_start()

class FileEventHandler(pyinotify.ProcessEvent):
    def process_IN_CREATE(self, fileEvent):
        restartMQTT()

    def process_IN_DELETE(self, fileEvent):
        restartMQTT()


def main():
    restartMQTT()
    watchManager = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(watchManager, FileEventHandler())
    watchManager.add_watch('/etc/my/config_directory', pyinotify.IN_CREATE | pyinotify.IN_DELETE)
    notifier.loop()

Where connectMQTT stores a newly connected and configured MQTT client in the Connection global.