from __future__ import annotations import time from PySide6 import QtCore from dashboard_controller import TraceMode from main_window.workers import EventThread, LocationsThread class RuntimeRefreshMixin: def _start_events_stream(self) -> None: if self.events_thread: return self.events_thread = EventThread(self.ctrl, self) self.events_thread.eventReceived.connect(self._handle_event) self.events_thread.error.connect(self._handle_event_error) self.events_thread.start() @QtCore.Slot(object) def _handle_event(self, ev) -> None: try: kinds = self.ctrl.classify_event(ev) except Exception: kinds = [] # Отдельно ловим routes_nft_progress, чтобы обновить лейбл прогресса. try: k = (getattr(ev, "kind", "") or "").strip().lower() except Exception: k = "" if k == "routes_nft_progress": try: prog_view = self.ctrl.routes_nft_progress_from_event(ev) self._update_routes_progress_label(prog_view) except Exception: # не роняем UI, просто игнор pass # Простая стратегия: триггерить существующие refresh-функции. if "status" in kinds: self.refresh_status_tab() if "login" in kinds: self.refresh_login_banner() if "vpn" in kinds: self.refresh_vpn_tab() if "routes" in kinds: self.refresh_routes_tab() if "dns" in kinds: self.refresh_dns_tab() if "transport" in kinds: self.refresh_singbox_tab() self._refresh_selected_transport_health_live(silent=True) if "trace" in kinds: self.refresh_trace_tab() @QtCore.Slot(str) def _handle_event_error(self, msg: str) -> None: # Логируем в trace, UI не блокируем. try: self.ctrl.log_gui(f"[sse-error] {msg}") except Exception: pass # ---------------- REFRESH ---------------- def refresh_everything(self) -> None: self.refresh_login_banner() self.refresh_status_tab() self.refresh_vpn_tab() self.refresh_singbox_tab() self.refresh_routes_tab() self.refresh_dns_tab() self.refresh_domains_tab() self.refresh_trace_tab() def refresh_login_banner(self) -> None: def work(): view = self.ctrl.get_login_view() self._set_auth_button(view.logged_in) if self._vpn_switching_active: if self._is_vpn_switching_expired(): self._stop_vpn_location_switching() else: target = (self._vpn_switching_target or "").strip() msg = "AdGuard VPN: Switching location..." if target: msg = f"AdGuard VPN: Switching location to {target}..." self.btn_login_banner.setText(msg) self.btn_login_banner.setStyleSheet( "text-align: left; border: none; color: #d4a200;" ) return self.btn_login_banner.setText(view.text) # Принудительно: зелёный если залогинен, серый если нет color = "green" if view.logged_in else "gray" base_style = "text-align: left; border: none;" self.btn_login_banner.setStyleSheet( f"{base_style} color: {color};" ) self._safe(work, title="Login state error") def refresh_status_tab(self) -> None: def work(): view = self.ctrl.get_status_overview() self.st_timestamp.setText(view.timestamp) self.st_counts.setText(view.counts) self.st_iface.setText(view.iface_table_mark) self._set_status_label_color( self.st_route, view.policy_route, kind="policy" ) self._set_status_label_color( self.st_routes_service, view.routes_service, kind="service" ) self._set_status_label_color( self.st_smartdns_service, view.smartdns_service, kind="service" ) self._set_status_label_color( self.st_vpn_service, view.vpn_service, kind="service" ) self._safe(work, title="Status error") def refresh_vpn_tab(self) -> None: def work(): view = self.ctrl.vpn_status_view() prev_desired = (self._vpn_desired_location_last_seen or "").strip().lower() self._vpn_desired_location = (view.desired_location or "").strip() current_desired = (self._vpn_desired_location or "").strip().lower() self._vpn_desired_location_last_seen = self._vpn_desired_location txt = [] if view.desired_location: txt.append(f"Desired location: {view.desired_location}") if view.pretty_text: txt.append(view.pretty_text.rstrip()) self._set_text(self.txt_vpn, "\n".join(txt).strip() + "\n") auto_view = self.ctrl.vpn_autoconnect_view() self.btn_autoconnect_toggle.setText( "Disable autoconnect" if auto_view.enabled else "Enable autoconnect" ) self.lbl_autoconnect_state.setText(auto_view.unit_text) self.lbl_autoconnect_state.setStyleSheet( f"color: {auto_view.color};" ) vpn_egress = self._refresh_egress_identity_scope( "adguardvpn", trigger_refresh=True, silent=True, ) self._render_vpn_egress_label(vpn_egress) self._maybe_trigger_vpn_egress_refresh_on_autoloop(auto_view.unit_text) if prev_desired and current_desired and prev_desired != current_desired: self._trigger_vpn_egress_refresh( reason=f"desired location changed: {prev_desired} -> {current_desired}" ) if self._vpn_switching_active: unit_low = (auto_view.unit_text or "").strip().lower() elapsed = self._vpn_switching_elapsed_sec() if any( x in unit_low for x in ("disconnected", "reconnecting", "unknown", "error", "inactive", "failed", "dead") ): self._vpn_switching_seen_non_connected = True desired_now = (self._vpn_desired_location or "").strip().lower() target_now = (self._vpn_switching_target or "").strip().lower() desired_matches = bool(target_now and desired_now and target_now == desired_now) if self._is_vpn_switching_expired(): self._stop_vpn_location_switching() elif ( "connected" in unit_low and "disconnected" not in unit_low and elapsed >= float(self._vpn_switching_min_visible_sec) and (self._vpn_switching_seen_non_connected or desired_matches) ): switched_to = (self._vpn_switching_target or "").strip() self._stop_vpn_location_switching() if switched_to: self._trigger_vpn_egress_refresh( reason=f"location switch completed: {switched_to}" ) self.refresh_login_banner() self._refresh_locations_async() self._safe(work, title="VPN error") def refresh_singbox_tab(self) -> None: def work(): self.refresh_transport_engines(silent=True) self.refresh_transport_policy_locks(silent=True) self._apply_singbox_profile_controls() self._safe(work, title="SingBox error") def _start_vpn_location_switching(self, target: str) -> None: self._vpn_switching_active = True self._vpn_switching_target = str(target or "").strip() self._vpn_switching_started_at = time.monotonic() self._vpn_switching_seen_non_connected = False def _stop_vpn_location_switching(self) -> None: self._vpn_switching_active = False self._vpn_switching_target = "" self._vpn_switching_started_at = 0.0 self._vpn_switching_seen_non_connected = False def _is_vpn_switching_expired(self) -> bool: if not self._vpn_switching_active: return False started = float(self._vpn_switching_started_at or 0.0) if started <= 0: return False return (time.monotonic() - started) >= float(self._vpn_switching_timeout_sec) def _vpn_switching_elapsed_sec(self) -> float: if not self._vpn_switching_active: return 0.0 started = float(self._vpn_switching_started_at or 0.0) if started <= 0: return 0.0 return max(0.0, time.monotonic() - started) def _refresh_locations_async(self, force_refresh: bool = False) -> None: if self.locations_thread and self.locations_thread.isRunning(): self._locations_refresh_pending = True if force_refresh: self._locations_force_refresh_pending = True return run_force_refresh = bool(force_refresh or self._locations_force_refresh_pending) self._locations_refresh_pending = False self._locations_force_refresh_pending = False self.locations_thread = LocationsThread( self.ctrl, force_refresh=run_force_refresh, parent=self, ) self.locations_thread.loaded.connect(self._on_locations_loaded) self.locations_thread.error.connect(self._on_locations_error) self.locations_thread.finished.connect(self._on_locations_finished) self.locations_thread.start() @QtCore.Slot(object) def _on_locations_loaded(self, state) -> None: try: self._apply_locations_state(state) except Exception as e: self._on_locations_error(str(e)) @QtCore.Slot(str) def _on_locations_error(self, msg: str) -> None: msg = (msg or "").strip() if not msg: msg = "failed to load locations" self.lbl_locations_meta.setText(f"Locations: {msg}") self.lbl_locations_meta.setStyleSheet("color: red;") try: self.ctrl.log_gui(f"[vpn-locations] {msg}") except Exception: pass @QtCore.Slot() def _on_locations_finished(self) -> None: self.locations_thread = None if self._locations_refresh_pending: force_refresh = self._locations_force_refresh_pending self._locations_refresh_pending = False self._locations_force_refresh_pending = False self._refresh_locations_async(force_refresh=force_refresh) def _apply_locations_state(self, state) -> None: all_items: list[tuple[str, str, str, str, int]] = [] for loc in getattr(state, "locations", []) or []: iso = str(getattr(loc, "iso", "") or "").strip().upper() label = str(getattr(loc, "label", "") or "").strip() target = str(getattr(loc, "target", "") or "").strip() if not iso or not label: continue if not target: target = iso name, ping = self._location_name_ping(label, iso, target) all_items.append((label, iso, target, name, ping)) self._all_locations = all_items self._apply_location_search_filter() self._render_locations_meta(state) def _render_locations_meta(self, state) -> None: parts = [] color = "gray" updated_at = str(getattr(state, "updated_at", "") or "").strip() stale = bool(getattr(state, "stale", False)) refreshing = bool(getattr(state, "refresh_in_progress", False)) last_error = str(getattr(state, "last_error", "") or "").strip() next_retry = str(getattr(state, "next_retry_at", "") or "").strip() if refreshing: parts.append("refreshing") color = "orange" if updated_at: parts.append(f"updated: {updated_at}") else: parts.append("updated: n/a") if stale: parts.append("stale cache") color = "orange" if last_error: cut = last_error if len(last_error) <= 120 else last_error[:117] + "..." parts.append(f"last error: {cut}") color = "red" if not refreshing else "orange" if next_retry: parts.append(f"next retry: {next_retry}") self.lbl_locations_meta.setText(" | ".join(parts)) self.lbl_locations_meta.setStyleSheet(f"color: {color};") def refresh_routes_tab(self) -> None: def work(): timer_enabled = self.ctrl.routes_timer_enabled() self.chk_timer.blockSignals(True) self.chk_timer.setChecked(bool(timer_enabled)) self.chk_timer.blockSignals(False) t = self.ctrl.traffic_mode_view() self._set_traffic_mode_state( t.desired_mode, t.applied_mode, t.preferred_iface, bool(t.advanced_active), bool(t.auto_local_bypass), bool(t.auto_local_active), bool(t.ingress_reply_bypass), bool(t.ingress_reply_active), int(t.bypass_candidates), int(t.overrides_applied), int(t.cgroup_resolved_uids), t.cgroup_warning, bool(t.healthy), bool(t.ingress_rule_present), bool(t.ingress_nft_active), bool(t.probe_ok), t.probe_message, t.active_iface, t.iface_reason, t.message, ) rs = self.ctrl.routes_resolve_summary_view() self.lbl_routes_resolve_summary.setText(rs.text) self.lbl_routes_resolve_summary.setStyleSheet(f"color: {rs.color};") self.lbl_routes_recheck_summary.setText(rs.recheck_text) self.lbl_routes_recheck_summary.setStyleSheet(f"color: {rs.recheck_color};") self._safe(work, title="Routes error") def refresh_dns_tab(self) -> None: def work(): self._dns_ui_refresh = True try: pool = self.ctrl.dns_upstream_pool_view() self._set_dns_resolver_summary(getattr(pool, "items", [])) st = self.ctrl.dns_status_view() self.ent_smartdns_addr.setText(st.smartdns_addr or "") mode = (getattr(st, "mode", "") or "").strip().lower() if mode in ("hybrid_wildcard", "hybrid"): hybrid_enabled = True mode = "hybrid_wildcard" else: hybrid_enabled = False mode = "direct" self.chk_dns_via_smartdns.blockSignals(True) self.chk_dns_via_smartdns.setChecked(hybrid_enabled) self.chk_dns_via_smartdns.blockSignals(False) unit_state = (st.unit_state or "unknown").strip().lower() unit_active = unit_state == "active" self.chk_dns_unit_relay.blockSignals(True) self.chk_dns_unit_relay.setChecked(unit_active) self.chk_dns_unit_relay.blockSignals(False) self.chk_dns_runtime_nftset.blockSignals(True) self.chk_dns_runtime_nftset.setChecked(bool(getattr(st, "runtime_nftset", True))) self.chk_dns_runtime_nftset.blockSignals(False) self._set_dns_unit_relay_state(unit_active) self._set_dns_runtime_state( bool(getattr(st, "runtime_nftset", True)), str(getattr(st, "wildcard_source", "") or ""), str(getattr(st, "runtime_config_error", "") or ""), ) self._set_dns_mode_state(mode) finally: self._dns_ui_refresh = False self._safe(work, title="DNS error") def refresh_domains_tab(self) -> None: def work(): # reload currently selected file self.on_domains_load() self._safe(work, title="Domains error") def refresh_trace_tab(self) -> None: def work(): if self.radio_trace_gui.isChecked(): mode: TraceMode = "gui" elif self.radio_trace_smartdns.isChecked(): mode = "smartdns" else: mode = "full" dump = self.ctrl.trace_view(mode) text = "\n".join(dump.lines).rstrip() if dump.lines: text += "\n" self._set_text(self.txt_trace, text, preserve_scroll=True) self._safe(work, title="Trace error")