Notify QML for 'usb device inserted' events using PyQt5 and pyudev

593 Views Asked by At

I have a GUI application (made with PyQt5 and QML) and would like to get notify when a usb device is plugged or unplugged from the computer. After some investigation, I have found that pyudev could be the library to use. But I have trouble using it with PyQt5 and QML. I have succeed to use the pyudev example for MonitorObservor, and there are other example provided in the documentation, here with PySide and here with Glib. I have also found an example using PyQt5 and widgets application here. But I have trouble implementing this on my PyQt5 QML application. I am sure it is very easy, so I think I'm just missing something but I can't found out what...

Here is what I have so far:

import sys
from PyQt5.QtWidgets import QApplication
from PyQt5.QtQml import QQmlApplicationEngine
from PyQt5.QtCore import QUrl
from pyudev import Context, Monitor, Device
from pyudev.pyqt5 import MonitorObserver
from Passerelle import *

# def device_connected(self, device):
def device_connected(self):
    print("Test")
    print("device action: ", device.action, ", path: ", device.device_path)
if __name__ == "__main__":
    app = QApplication(sys.argv)
    engine = QQmlApplicationEngine()
    p = Passerelle()
    engine.rootContext().setContextProperty("passerelle", p)
    engine.load(QUrl("main.qml"))
    if not engine.rootObjects:
        sys.exit(-1)

    context = Context()
    monitor = Monitor.from_netlink(context)
    # monitor.filter_by(subsystem='tty')
    observer = MonitorObserver(monitor)
    observer.deviceEvent.connect(device_connected)
    monitor.start()
    ret = app.exec_()
    sys.exit(ret)

I have succeeded to print "Test" on the console when unplugging or plugging back a device but can't seem to print the device information (TypeError: device_connected() missing 1 required positional argument: 'device' when I uncomment def device_connected(self, device):).

Here the first step would be to be able to print the device information on the console, then to find a way to notify the GUI and finally to notify the GUI only if the device plugged or unplugged has a specified VID/PID.

Edit: I have found a way to identify the device through VID PID using vid = device.get('ID_VENDOR_ID') and pid = device.get('ID_MODEL_ID')

For the 2nd step, I had thought of using the Passerelle class as the QML backend:

from PyQt5.QtCore import QObject, pyqtSlot, pyqtSignal#, pyqtProperty, QUrl
from pyudev import Context, Monitor
from pyudev.pyqt5 import MonitorObserver

def device_event(observer, device):
    print ("event ", device.action, " on device ", device)
class Passerelle(QObject):
    sendDeviceEvent = pyqtSignal(int)
    def __init__(self, parent=None):
        print("Passerelle constructor called")
        QObject.__init__(self, parent)
        print("end Passerelle constructor")
    @pyqtSlot()
    def setObserverForDeviceEvents(self):
        print("setObserverForDeviceEvents called")
        context = Context()
        monitor = Monitor.from_netlink(context)
        monitor.filter_by(subsystem='usb')
        observer = MonitorObserver(monitor)
        observer.deviceEvent.connect(self.device_connected)
        monitor.start()
        print("end setObserverForDeviceEvents")
    def device_connected(self, device):
        print("Test")
        print("device action: ", device.action, ", path: ", device.device_path)

But I am not sure it is a good idea as I read in that post that the monitor needed to be started before entering qt's main loop. Which I understand as : the monitor should be started in main.py before calling app.exec_()...

Thank you in advance for your help !

1

There are 1 best solutions below

3
On BEST ANSWER

The best thing to do is to modify the GUI in QML, for which the Monitor and Device object must be accessible from QML. Only QObjects receive notifications so I will create 2 classes that wrap with a light layer to both classes using q-properties and slots.

pyqtudev.py

from PyQt5 import QtCore
from pyudev import Context, Monitor, Device
from pyudev.pyqt5 import MonitorObserver

class QtUdevDevice(QtCore.QObject):
    def __init__(self, parent=None):
        super(QtUdevDevice, self).__init__(parent)
        self.m_dev = None

    def initialize(self, dev):
        self.m_dev = dev

    @QtCore.pyqtSlot(result=bool)
    def isValid(self):
        return self.m_dev is not None

    @QtCore.pyqtProperty(str, constant=True)
    def devType(self):
        if not self.isValid():
            return ""
        if self.m_dev.device_type is None:
            return ""
        return self.m_dev.device_type

    @QtCore.pyqtProperty(str, constant=True)
    def subsystem(self):
        if not self.isValid():
            return ""
        return self.m_dev.subsystem

    @QtCore.pyqtProperty(str, constant=True)
    def name(self):
        if not self.isValid():
            return ""
        return self.m_dev.sys_name

    @QtCore.pyqtProperty(str, constant=True)
    def driver(self):
        if not self.isValid():
            return ""
        if self.m_dev.driver is None:
            return ""
        return self.m_dev.driver

    @QtCore.pyqtProperty(str, constant=True)
    def deviceNode(self):
        if not self.isValid():
            return ""
        if self.m_dev.device_node is None:
            return ""
        return self.m_dev.device_node

    @QtCore.pyqtProperty(list, constant=True)
    def alternateDeviceSymlinks(self):
        return list(self.m_dev.device_links)

    @QtCore.pyqtProperty(str, constant=True)
    def sysfsPath(self):
        if not self.isValid():
            return ""
        return self.m_dev.sys_path

    @QtCore.pyqtProperty(int, constant=True)
    def sysfsNumber(self):
        if not self.isValid():
            return -1
        if self.m_dev.sys_number is None:
            return -1
        return int(self.m_dev.sys_number)

    @QtCore.pyqtSlot(str, result=str)
    def property(self, name):
        if not self.isValid():
            return ""
        v = self.m_dev.properties.get(name)
        return v if v is not None else ""

    @QtCore.pyqtSlot(str, result=bool)
    def hasProperty(self, name):
        if not self.isValid():
            return False
        return self.m_dev.properties.get(name) is not None

    @QtCore.pyqtProperty(list, constant=True)
    def deviceProperties(self):
        if not self.isValid():
            return []
        return list(self.m_dev.properties)

    @QtCore.pyqtProperty(list, constant=True)
    def sysfsProperties(self):
        if not self.isValid():
            return []
        return list(self.m_dev.attributes.available_attributes)

    @QtCore.pyqtProperty(QtCore.QObject, constant=True)
    def parentDevice(self):
        if not self.isValid:
            return 
        if self.m_dev.parent:
            parent_device = QtUdevDevice()
            parent_device.initialize(self.m_dev.parent)
            return parent_device

    @QtCore.pyqtProperty(str, constant=True)
    def action(self):
        if not self.isValid():
            return ""
        if self.m_dev.action is None:
            return ""
        return self.m_dev.action

    def __repr__(self):
        if self.isValid():
            return "UdevDevice({})".format(self.sysfsPath())
        return "Invalid UdevDevice"

class QtMonitorObserver(QtCore.QObject):
    deviceEvent = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceAdded = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceRemoved = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceChanged = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceOnlined = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])
    deviceOfflined = QtCore.pyqtSignal(QtUdevDevice, arguments=["device"])

    def __init__(self, parent=None):
        super(QtMonitorObserver, self).__init__(parent)
        context = Context()
        self._monitor = Monitor.from_netlink(context)
        self._observer = MonitorObserver(self._monitor, self)
        self._observer.deviceEvent.connect(self.setup_new_signals)

    @QtCore.pyqtSlot()
    def start(self):
        self._monitor.start()

    @QtCore.pyqtSlot(str)
    def filter_by(self, filter):
        self._monitor.filter_by(subsystem=filter)

    @QtCore.pyqtSlot(str)
    def filter_by_tag(self, tag):
        self._monitor.filter_by_tag(tag)

    @QtCore.pyqtSlot()
    def remove_filter(self):
        self._monitor.remove_filter()

    @QtCore.pyqtSlot(Device)
    def setup_new_signals(self, device):
        new_signals_map = {
            'add': self.deviceAdded,
            'remove': self.deviceRemoved,
            'change': self.deviceChanged,
            'online': self.deviceOnlined,
            'offline': self.deviceOfflined,
        }
        signal = new_signals_map.get(device.action)
        qtdevice = QtUdevDevice()
        qtdevice.initialize(device)
        if signal:
            signal.emit(qtdevice)
        self.deviceEvent.emit(qtdevice)

main.py

import os
import sys
from PyQt5 import QtCore, QtGui, QtQml

from pyqtudev import QtMonitorObserver

def run():
    app = QtGui.QGuiApplication(sys.argv)
    engine = QtQml.QQmlApplicationEngine()
    observer = QtMonitorObserver()
    engine.rootContext().setContextProperty("observer", observer)
    directory = os.path.dirname(os.path.abspath(__file__))
    engine.load(QtCore.QUrl.fromLocalFile(os.path.join(directory, 'main.qml')))
    if not engine.rootObjects():
        return -1
    return app.exec_()

if __name__ == "__main__":
    sys.exit(run())

main.qml

import QtQuick 2.11
import QtQuick.Window 2.2
import QtQuick.Controls 2.2

ApplicationWindow {    
    visible: true
    width: Screen.width/2
    height: Screen.height/2
    Connections {
        target: observer
        onDeviceEvent: {
            console.log(device, device.name, device.action, device.parentDevice)
            if(device.hasProperty("ID_VENDOR_ID")){
                console.log(device.property("ID_VENDOR_ID"))
            }
        }
    }
    Component.onCompleted: {
        observer.start()
        observer.filter_by("usb")
    } 
}