I would like to add a keyboard detection for Linux to my existing Keyboard Detector for Windows. So I used pyudev
to create a LinuxKeyboardDetector
.
The script can be started and the graphical user interface appears, but unfortunately the keyboard detection does not recognize anything and does not report any error.
I suspect that there is a problem with multithreading using QRunnable
.
Code
import sys
from datetime import datetime
import platform
from PyQt5. QtCore import QObject, QRunnable, QThreadPool, pyqtSignal
from PyQt5.QtWidgets import QApplication, QMainWindow, QTableWidget, QTableWidgetItem, QHeaderView
current_platform = platform.system()
if current_platform == "Windows":
import pythoncom
import wmi
elif current_platform == "Linux":
import pyudev
from pyudev.pyqt5 import MonitorObserver
def create_keyboard_detector():
keyboard_detector = None
if current_platform == "Windows":
keyboard_detector = WindowsKeyboardDetector()
elif current_platform == "Linux":
keyboard_detector = LinuxKeyboardDetector()
return keyboard_detector
class KeyboardDetectorSignals(QObject):
keyboard_changed = pyqtSignal(str)
class WindowsKeyboardDetector(QRunnable):
def __init__(self):
super().__init__()
self.signals = KeyboardDetectorSignals()
def run(self):
pythoncom.CoInitialize()
device_connected_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Keyboard'"
device_disconnected_wql = "SELECT * FROM __InstanceDeletionEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Keyboard'"
c = wmi.WMI()
connected_watcher = c.watch_for(raw_wql=device_connected_wql)
disconnected_watcher = c.watch_for(raw_wql=device_disconnected_wql)
while True:
try:
connected = connected_watcher(timeout_ms=10)
except wmi.x_wmi_timed_out:
pass
else:
if connected:
self.signals.keyboard_changed.emit("Keyboard connected.")
try:
disconnected = disconnected_watcher(timeout_ms=10)
except wmi.x_wmi_timed_out:
pass
else:
if disconnected:
self.signals.keyboard_changed.emit("Keyboard disconnected.")
class LinuxKeyboardDetector(QRunnable):
def __init__(self):
super().__init__()
self.signals = KeyboardDetectorSignals()
self.context = pyudev.Context()
self.monitor = pyudev.Monitor.from_netlink(self.context)
self.observer = MonitorObserver(self.monitor)
def run(self):
self.monitor.filter_by(subsystem="usb", device_type="usb_device")
self.observer.deviceEvent.connect(self.process_device_event)
self.monitor.start()
def process_device_event(self, device):
if device['ID_INPUT_KEYBOARD'] == '1':
if device.action == "add":
self.signals.keyboard_changed.emit("Keyboard connected.")
if device.action == "remove":
self.signals.keyboard_changed.emit("Keyboard disconnected.")
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setGeometry(100, 100, 500, 500)
self.setWindowTitle("Keyboard Logger")
self.log_table = QTableWidget()
self.log_table.setColumnCount(2)
self.log_table.setShowGrid(True)
self.log_table.setHorizontalHeaderLabels(["Time", "Event"])
self.log_table.horizontalHeader().setStretchLastSection(True)
self.log_table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
self.setCentralWidget(self.log_table)
self.show()
self.threadpool = QThreadPool()
keyboard_detector = create_keyboard_detector()
keyboard_detector.signals.keyboard_changed.connect(self.add_row)
self.threadpool.start(keyboard_detector)
def add_row(self, event: str):
now = datetime.now()
datetime_string = now.strftime("%Y-%m-%d %H:%M:%S")
row_count = self.log_table.rowCount()
self.log_table.insertRow(row_count)
self.log_table.setItem(row_count, 0, QTableWidgetItem(datetime_string))
self.log_table.setItem(row_count, 1, QTableWidgetItem(event))
def main():
app = QApplication(sys.argv)
window = MainWindow()
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Edit 1: Update LinuxKeyboardDetector
class to use basic pyudev.MonitorObserver, instead of the dedicated pyqt version.
class LinuxKeyboardDetector(QRunnable):
def __init__(self):
super().__init__()
self.signals = KeyboardDetectorSignals()
self.context = pyudev.Context()
self.monitor = pyudev.Monitor.from_netlink(self.context)
# self.observer = MonitorObserver(self.monitor)
self.observer = pyudev.MonitorObserver(self.monitor, self.process_device_event)
def run(self):
self.monitor.filter_by(subsystem="usb", device_type="usb_device")
# self.observer.deviceEvent.connect(self.process_device_event)
# self.monitor.start()
self.observer.start()
def process_device_event(self, device):
if device['ID_INPUT_KEYBOARD'] == '1':
if device.action == "add":
self.signals.keyboard_changed.emit("Keyboard connected.")
if device.action == "remove":
self.signals.keyboard_changed.emit("Keyboard disconnected.")
Result 1: The following error message appears when a USB keyboard is plugged in or off.
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/home/ata/source/venv/lib/python3.6/site-packages/pyudev/monitor.py", line 532, in run
self._callback(device)
File "/home/ata/source/venv/lib/python3.6/site-packages/pyudev/monitor.py", line 508, in <lambda>
callback = lambda d: event_handler(d.action, d)
TypeError: process_device_event() takes 2 positional arguments but 3 were given
See Question&Answers more detail:
os