Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
155 views
in Technique[技术] by (71.8m points)

python - Multithreaded Keyboard Detector for Linux and Windows

I would like to add a keyboard detection for Linux to my existing Keyboard Detector for Windows. So I used pyudev to create a LinuxKeyboardDetector.

The script can be started and the graphical user interface appears, but unfortunately the keyboard detection does not recognize anything and does not report any error.

I suspect that there is a problem with multithreading using QRunnable.

Code

import sys
from datetime import datetime
import platform

from PyQt5. QtCore import QObject, QRunnable, QThreadPool, pyqtSignal
from PyQt5.QtWidgets import QApplication, QMainWindow, QTableWidget, QTableWidgetItem, QHeaderView


current_platform = platform.system()
if current_platform == "Windows":
    import pythoncom
    import wmi
elif current_platform == "Linux":
    import pyudev
    from pyudev.pyqt5 import MonitorObserver


def create_keyboard_detector():
    keyboard_detector = None
    if current_platform == "Windows":
        keyboard_detector = WindowsKeyboardDetector()
    elif current_platform == "Linux":
        keyboard_detector = LinuxKeyboardDetector()
    return keyboard_detector


class KeyboardDetectorSignals(QObject):
    keyboard_changed = pyqtSignal(str)


class WindowsKeyboardDetector(QRunnable):
    def __init__(self):
        super().__init__()

        self.signals = KeyboardDetectorSignals()

    def run(self):
        pythoncom.CoInitialize()
        device_connected_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Keyboard'"
        device_disconnected_wql = "SELECT * FROM __InstanceDeletionEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Keyboard'"

        c = wmi.WMI()
        connected_watcher = c.watch_for(raw_wql=device_connected_wql)
        disconnected_watcher = c.watch_for(raw_wql=device_disconnected_wql)

        while True:
            try:
                connected = connected_watcher(timeout_ms=10)
            except wmi.x_wmi_timed_out:
                pass
            else:
                if connected:
                    self.signals.keyboard_changed.emit("Keyboard connected.")

            try:
                disconnected = disconnected_watcher(timeout_ms=10)
            except wmi.x_wmi_timed_out:
                pass
            else:
                if disconnected:
                    self.signals.keyboard_changed.emit("Keyboard disconnected.")


class LinuxKeyboardDetector(QRunnable):
    def __init__(self):
        super().__init__()
        self.signals = KeyboardDetectorSignals()
        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        self.observer = MonitorObserver(self.monitor)

    def run(self):
        self.monitor.filter_by(subsystem="usb", device_type="usb_device")
        self.observer.deviceEvent.connect(self.process_device_event)
        self.monitor.start()

    def process_device_event(self, device):
        if device['ID_INPUT_KEYBOARD'] == '1':
            if device.action == "add":
                self.signals.keyboard_changed.emit("Keyboard connected.")
            if device.action == "remove":
                self.signals.keyboard_changed.emit("Keyboard disconnected.")


class MainWindow(QMainWindow):

    def __init__(self):
        super().__init__()

        self.setGeometry(100, 100, 500, 500)
        self.setWindowTitle("Keyboard Logger")

        self.log_table = QTableWidget()
        self.log_table.setColumnCount(2)
        self.log_table.setShowGrid(True)
        self.log_table.setHorizontalHeaderLabels(["Time", "Event"])
        self.log_table.horizontalHeader().setStretchLastSection(True)
        self.log_table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
        self.setCentralWidget(self.log_table)
        self.show()

        self.threadpool = QThreadPool()
        keyboard_detector = create_keyboard_detector()
        keyboard_detector.signals.keyboard_changed.connect(self.add_row)
        self.threadpool.start(keyboard_detector)

    def add_row(self, event: str):
        now = datetime.now()
        datetime_string = now.strftime("%Y-%m-%d %H:%M:%S")

        row_count = self.log_table.rowCount()
        self.log_table.insertRow(row_count)
        self.log_table.setItem(row_count, 0, QTableWidgetItem(datetime_string))
        self.log_table.setItem(row_count, 1, QTableWidgetItem(event))


def main():
    app = QApplication(sys.argv)
    window = MainWindow()
    sys.exit(app.exec_())


if __name__ == '__main__':

    main()

Edit 1: Update LinuxKeyboardDetector class to use basic pyudev.MonitorObserver, instead of the dedicated pyqt version.

class LinuxKeyboardDetector(QRunnable):
    def __init__(self):
        super().__init__()
        self.signals = KeyboardDetectorSignals()
        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        # self.observer = MonitorObserver(self.monitor)
        self.observer = pyudev.MonitorObserver(self.monitor, self.process_device_event)

    def run(self):
        self.monitor.filter_by(subsystem="usb", device_type="usb_device")
        # self.observer.deviceEvent.connect(self.process_device_event)
        # self.monitor.start()
        self.observer.start()

    def process_device_event(self, device):
        if device['ID_INPUT_KEYBOARD'] == '1':
            if device.action == "add":
                self.signals.keyboard_changed.emit("Keyboard connected.")
            if device.action == "remove":
                self.signals.keyboard_changed.emit("Keyboard disconnected.")

Result 1: The following error message appears when a USB keyboard is plugged in or off.

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/ata/source/venv/lib/python3.6/site-packages/pyudev/monitor.py", line 532, in run
    self._callback(device)
  File "/home/ata/source/venv/lib/python3.6/site-packages/pyudev/monitor.py", line 508, in <lambda>
    callback = lambda d: event_handler(d.action, d)
TypeError: process_device_event() takes 2 positional arguments but 3 were given
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

According to this answer, you have to start the monitor before the Qt app event loop. In this case, you must not use a QRunnable at all, as the monitor will work as a standard QObject, working asynchronously and sending signals whenever required.

If you still want to keep the same interface, using QRunnable, I think that the only solution is to use the basic pyudev.MonitorObserver, instead of the dedicated pyqt version.

class LinuxKeyboardDetector(QRunnable):
    def __init__(self):
        super().__init__()
        self.signals = KeyboardDetectorSignals()
        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        self.observer = pyudev.MonitorObserver(self.monitor, self.process_device_event)

    def run(self):
        self.monitor.filter_by(subsystem="usb", device_type="usb_device")
        self.observer.start()

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...