from __future__ import annotations import time from PySide6 import QtCore from dashboard_controller import DashboardController class EventThread(QtCore.QThread): eventReceived = QtCore.Signal(object) error = QtCore.Signal(str) def __init__(self, controller: DashboardController, parent=None) -> None: super().__init__(parent) self.ctrl = controller self._stop = False self._since = 0 def stop(self) -> None: self._stop = True def run(self) -> None: # pragma: no cover - thread while not self._stop: try: for ev in self.ctrl.iter_events(since=self._since, stop=lambda: self._stop): if self._stop: break try: self._since = int(getattr(ev, "id", self._since)) except Exception: pass self.eventReceived.emit(ev) time.sleep(0.5) except Exception as e: self.error.emit(str(e)) time.sleep(1.5) class LocationsThread(QtCore.QThread): loaded = QtCore.Signal(object) error = QtCore.Signal(str) def __init__( self, controller: DashboardController, force_refresh: bool = False, parent=None, ) -> None: super().__init__(parent) self.ctrl = controller self.force_refresh = bool(force_refresh) def run(self) -> None: # pragma: no cover - thread try: if self.force_refresh: self.ctrl.vpn_locations_refresh_trigger() self.loaded.emit(self.ctrl.vpn_locations_state_view()) except Exception as e: self.error.emit(str(e)) __all__ = ["EventThread", "LocationsThread"]