from __future__ import annotations

from PySide6.QtWidgets import QLabel

from main_window.constants import LoginPage

# Entry shown in the files list -> name passed to ctrl.domains_file_load /
# ctrl.domains_file_save.
_DOMAINS_FILE_API_NAMES = {
    "bases": "bases",
    "meta-special": "meta",
    "subs": "subs",
    "static-ips": "static",
    "last-ips-map-direct": "last-ips-map-direct",
    "last-ips-map-wildcard": "last-ips-map-wildcard",
    "wildcard-observed-hosts": "wildcard-observed-hosts",
    "smartdns.conf": "smartdns",
}

# Entries that _save_file_content writes back; every other name is ignored.
_WRITABLE_DOMAINS_FILES = frozenset(
    {"bases", "meta-special", "subs", "static-ips", "smartdns.conf"}
)

# Display paths for entries that do not follow the
# /etc/selective-vpn/domains/<name>.txt pattern.
_DOMAINS_FILE_PATH_HINTS = {
    "smartdns.conf": (
        "/var/lib/selective-vpn/smartdns-wildcards.json"
        " -> /etc/selective-vpn/smartdns.conf"
    ),
    "last-ips-map-direct": (
        "/var/lib/selective-vpn/last-ips-map-direct.txt (artifact: agvpn4)"
    ),
    "last-ips-map-wildcard": (
        "/var/lib/selective-vpn/last-ips-map-wildcard.txt (artifact: agvpn_dyn4)"
    ),
    "wildcard-observed-hosts": (
        "/var/lib/selective-vpn/last-ips-map-wildcard.txt (derived unique hosts)"
    ),
}

# String spellings that count as True when a stored setting comes back as text.
_TRUTHY_SETTING_STRINGS = ("1", "true", "yes", "on")

# Cap used for the "applied=N/12" part of the resolver summary.
_MAX_APPLIED_UPSTREAMS = 12
# Number of active upstream addresses listed before "+N more".
_UPSTREAM_PREVIEW_COUNT = 4

_NO_BENCHMARK = ("Resolver health: no benchmark yet", "gray")


def _on_off(flag) -> str:
    """Return "on" for a truthy flag, "off" otherwise."""
    return "on" if flag else "off"


def _setting_as_bool(raw: object) -> bool:
    """Interpret a stored setting value as a boolean.

    Strings are matched case-insensitively against "1", "true", "yes", "on";
    any other value goes through ``bool()``.
    """
    if isinstance(raw, str):
        return raw.strip().lower() in _TRUTHY_SETTING_STRINGS
    return bool(raw)


def _set_checked_silently(widget, checked: bool) -> None:
    """Set a checkable widget's state without emitting its signals.

    The widget's previous signal-blocking state is restored afterwards, so a
    widget that was already blocked by the caller stays blocked.
    """
    previously_blocked = widget.blockSignals(True)
    try:
        widget.setChecked(checked)
    finally:
        widget.blockSignals(bool(previously_blocked))


def _select_combo_data_silently(combo, data: str) -> None:
    """Select the combo item whose data equals *data* without emitting signals.

    Falls back to index 0 when no item carries that data.
    """
    index = combo.findData(data)
    if index < 0:
        index = 0
    previously_blocked = combo.blockSignals(True)
    try:
        combo.setCurrentIndex(index)
    finally:
        combo.blockSignals(bool(previously_blocked))


def _status_color(text: str, kind: str) -> str:
    """Pick a color name for a status text.

    ``kind == "policy"`` uses the policy-route keywords; any other kind is
    treated as a service status.
    """
    low = (text or "").lower()
    if kind == "policy":
        if "ok" in low and "missing" not in low and "error" not in low:
            return "green"
        if any(word in low for word in ("missing", "error", "failed")):
            return "red"
        return "orange"
    # Service status. The negative keywords are tested first on purpose:
    # "inactive" contains "active" and must not be reported as green.
    if any(
        word in low
        for word in ("failed", "error", "unknown", "inactive", "dead")
    ):
        return "red"
    if "active" in low or "running" in low:
        return "green"
    return "orange"


def _resolver_health(avg_ms, ok, fail, timeout) -> tuple[str, str]:
    """Build the (text, color) pair for the resolver health label.

    Returns the "no benchmark yet" pair when a required value is missing or
    cannot be converted to an integer.
    """
    if avg_ms is None or ok is None or fail is None:
        return _NO_BENCHMARK
    try:
        avg = int(avg_ms)
        ok_count = int(ok)
        fail_count = int(fail)
        timeout_count = int(timeout or 0)
    except (TypeError, ValueError, OverflowError):
        return _NO_BENCHMARK

    if avg < 200:
        color = "green"
    elif avg <= 400:
        color = "#b58900"
    else:
        color = "red"
    # Any timeout downgrades a green result, but never overrides red.
    if timeout_count > 0 and color != "red":
        color = "#b58900"

    text = (
        f"Resolver health: avg={avg}ms ok={ok_count} "
        f"fail={fail_count} timeout={timeout_count}"
    )
    return text, color


class RuntimeStateMixin:
    """Helpers that render runtime state into widgets and persist UI settings.

    The methods read and write attributes that the host class is expected to
    provide: ``self.ctrl``, ``self._ui_settings`` (an object with ``value``,
    ``setValue`` and ``sync``) and the widgets referenced by name below.
    """

    def _get_selected_domains_file(self) -> str:
        """Return the text of the current item in ``lst_files``, or "bases"."""
        item = self.lst_files.currentItem()
        if item is None:
            return "bases"
        return item.text()

    def _load_file_content(self, name: str) -> tuple[str, str, str]:
        """Load a domains file through ``self.ctrl``.

        Returns ``(content, source, path)``. ``source`` falls back to "api"
        when the loaded object has no (or an empty) ``source``. For a name
        that is not known, returns ``("", "unknown", name)`` without calling
        the controller.
        """
        if name not in _DOMAINS_FILE_API_NAMES:
            return "", "unknown", name

        loaded = self.ctrl.domains_file_load(_DOMAINS_FILE_API_NAMES[name])
        content = loaded.content or ""
        source = getattr(loaded, "source", "") or "api"
        path = _DOMAINS_FILE_PATH_HINTS.get(
            name, f"/etc/selective-vpn/domains/{name}.txt"
        )
        return content, source, path

    def _save_file_content(self, name: str, content: str) -> None:
        """Save *content* through ``self.ctrl`` if *name* is a writable entry.

        Names outside the writable set are ignored without an error.
        """
        if name in _WRITABLE_DOMAINS_FILES:
            self.ctrl.domains_file_save(_DOMAINS_FILE_API_NAMES[name], content)

    def _show_vpn_page(self, which: LoginPage) -> None:
        """Switch ``vpn_stack`` to index 1 for "login", otherwise to index 0."""
        self.vpn_stack.setCurrentIndex(1 if which == "login" else 0)

    def _set_auth_button(self, logged: bool) -> None:
        """Label the auth button "Logout" when logged in, else "Login"."""
        self.btn_auth.setText("Logout" if logged else "Login")

    def _set_status_label_color(self, lbl: QLabel, text: str, *, kind: str) -> None:
        """Set *text* on *lbl* and color it by status keywords.

        ``kind="policy"`` applies the policy-route rules; any other value
        applies the service rules.
        """
        lbl.setText(text)
        lbl.setStyleSheet(f"color: {_status_color(text, kind)};")

    def _set_dns_unit_relay_state(self, enabled: bool) -> None:
        """Show the SmartDNS unit relay state (green ON / red OFF)."""
        state = "ON" if enabled else "OFF"
        color = "green" if enabled else "red"
        self.chk_dns_unit_relay.setText(f"SmartDNS unit relay: {state}")
        self.chk_dns_unit_relay.setStyleSheet(f"color: {color};")

    def _set_dns_runtime_state(self, enabled: bool, source: str, cfg_error: str = "") -> None:
        """Show the runtime accelerator state and the wildcard source.

        A non-empty *cfg_error* is appended to the source label and forces
        that label to orange.
        """
        state = "ON" if enabled else "OFF"
        color = "green" if enabled else "orange"
        self.chk_dns_runtime_nftset.setText(
            f"SmartDNS runtime accelerator (nftset -> agvpn_dyn4): {state}"
        )
        self.chk_dns_runtime_nftset.setStyleSheet(f"color: {color};")

        src = (source or "").strip().lower()
        if src == "both":
            src_txt = "Wildcard source: both (resolver + smartdns_runtime)"
            src_color = "green"
        elif src == "smartdns_runtime":
            src_txt = "Wildcard source: smartdns_runtime"
            src_color = "orange"
        else:
            src_txt = "Wildcard source: resolver"
            src_color = "gray"
        if cfg_error:
            src_txt = f"{src_txt} | runtime cfg: {cfg_error}"
            src_color = "orange"
        self.lbl_dns_wildcard_source.setText(src_txt)
        self.lbl_dns_wildcard_source.setStyleSheet(f"color: {src_color};")

    def _set_dns_mode_state(self, mode: str) -> None:
        """Show the resolver mode; unrecognized modes are shown as unknown."""
        low = (mode or "").strip().lower()
        if low in ("hybrid_wildcard", "hybrid"):
            txt = "Resolver mode: hybrid wildcard (SmartDNS for wildcard domains)"
            color = "green"
        elif low == "direct":
            txt = "Resolver mode: direct upstreams"
            color = "red"
        else:
            txt = "Resolver mode: unknown"
            color = "orange"
        self.lbl_dns_mode_state.setText(txt)
        self.lbl_dns_mode_state.setStyleSheet(f"color: {color};")

    def _set_dns_resolver_summary(self, pool_items) -> None:
        """Show the upstream pool summary and the last benchmark result.

        *pool_items* is an iterable (or None) of objects read through
        ``getattr`` for ``addr`` and ``enabled``; entries with an empty
        address are skipped. The benchmark values are read from
        ``self._ui_settings`` under the ``dns_benchmark/last_*`` keys.
        """
        active = []
        total = 0
        for item in pool_items or []:
            addr = str(getattr(item, "addr", "") or "").strip()
            if not addr:
                continue
            total += 1
            if getattr(item, "enabled", False):
                active.append(addr)

        if active:
            preview = ", ".join(active[:_UPSTREAM_PREVIEW_COUNT])
            hidden = len(active) - _UPSTREAM_PREVIEW_COUNT
            if hidden > 0:
                preview += f", +{hidden} more"
            applied = min(len(active), _MAX_APPLIED_UPSTREAMS)
            text = (
                f"Resolver upstreams: active={len(active)}/{total}, "
                f"applied={applied}/{_MAX_APPLIED_UPSTREAMS} [{preview}]"
            )
        else:
            text = f"Resolver upstreams: active=0/{total} (empty set)"
        self.lbl_dns_resolver_upstreams.setText(text)
        self.lbl_dns_resolver_upstreams.setStyleSheet("color: gray;")

        settings = self._ui_settings
        health_text, health_color = _resolver_health(
            settings.value("dns_benchmark/last_avg_ms", None),
            settings.value("dns_benchmark/last_ok", None),
            settings.value("dns_benchmark/last_fail", None),
            settings.value("dns_benchmark/last_timeout", None),
        )
        self.lbl_dns_resolver_health.setText(health_text)
        self.lbl_dns_resolver_health.setStyleSheet(f"color: {health_color};")

    def _set_traffic_mode_state(
        self,
        desired_mode: str,
        applied_mode: str,
        preferred_iface: str,
        advanced_active: bool,
        auto_local_bypass: bool,
        auto_local_active: bool,
        ingress_reply_bypass: bool,
        ingress_reply_active: bool,
        bypass_candidates: int,
        overrides_applied: int,
        cgroup_resolved_uids: int,
        cgroup_warning: str,
        healthy: bool,
        ingress_rule_present: bool,
        ingress_nft_active: bool,
        probe_ok: bool,
        probe_message: str,
        active_iface: str,
        iface_reason: str,
        message: str,
    ) -> None:
        """Render the traffic mode headline and its diagnostics line.

        The headline shows desired/applied mode (defaulting to "selective" and
        "direct" when empty) and is green with "OK" when *healthy*, red with
        "MISMATCH" otherwise. The diagnostics label joins the individual
        fields with " | "; optional fields are included only when set.
        """
        desired = (desired_mode or "").strip().lower() or "selective"
        applied = (applied_mode or "").strip().lower() or "direct"
        color, health_txt = ("green", "OK") if healthy else ("red", "MISMATCH")
        text = f"Traffic mode: {desired} (applied: {applied}) [{health_txt}]"

        diag_parts = [
            f"preferred={preferred_iface or 'auto'}",
            f"advanced={_on_off(advanced_active)}",
            f"auto_local={_on_off(auto_local_bypass)}"
            f"({'active' if auto_local_active else 'saved'})",
            f"ingress_reply={_on_off(ingress_reply_bypass)}"
            f"({'active' if ingress_reply_active else 'saved'})",
        ]
        if auto_local_active and bypass_candidates > 0:
            diag_parts.append(f"bypass_routes={bypass_candidates}")
        diag_parts.append(f"overrides={overrides_applied}")
        if cgroup_resolved_uids > 0:
            diag_parts.append(f"cgroup_uids={cgroup_resolved_uids}")
        if cgroup_warning:
            diag_parts.append(f"cgroup_warning={cgroup_warning}")
        if active_iface:
            diag_parts.append(f"iface={active_iface}")
        if iface_reason:
            diag_parts.append(f"source={iface_reason}")
        diag_parts.append(
            f"ingress_diag=rule:{'ok' if ingress_rule_present else 'off'}"
            f"/nft:{'ok' if ingress_nft_active else 'off'}"
        )
        diag_parts.append(f"probe={'ok' if probe_ok else 'fail'}")
        if probe_message:
            diag_parts.append(probe_message)
        if message:
            diag_parts.append(message)

        self.lbl_traffic_mode_state.setText(text)
        self.lbl_traffic_mode_state.setStyleSheet(f"color: {color};")
        # diag_parts always holds the unconditional entries, so it is never empty.
        self.lbl_traffic_mode_diag.setText(" | ".join(diag_parts))
        self.lbl_traffic_mode_diag.setStyleSheet("color: gray;")

    def _update_routes_progress_label(self, view) -> None:
        """Update the NFT progress bar and label from a progress view.

        *view* is duck-typed and read for ``percent``, ``message`` and
        ``active``. Passing None resets the display to idle.
        """
        if view is None:
            # Reset to idle.
            self._routes_progress_last = 0
            self.routes_progress.setValue(0)
            self.lbl_routes_progress.setText("NFT: idle")
            self.lbl_routes_progress.setStyleSheet("color: gray;")
            return

        # Clamp to 0..100; an unreadable percent counts as 0.
        try:
            percent = max(0, min(100, int(view.percent)))
        except (AttributeError, TypeError, ValueError, OverflowError):
            percent = 0

        # Never let the progress move backwards, except for an explicit reset
        # (percent == 0). The last value defaults to 0 if it was never set.
        if percent > 0:
            percent = max(percent, getattr(self, "_routes_progress_last", 0))
        self._routes_progress_last = percent

        self.routes_progress.setValue(percent)
        text = f"{percent}% – {view.message}"
        if view.active:
            color = "orange"
        elif percent >= 100:
            color = "green"
        else:
            color = "gray"
        self.lbl_routes_progress.setText(text)
        self.lbl_routes_progress.setStyleSheet(f"color: {color};")

    def _load_ui_preferences(self) -> None:
        """Restore checkboxes, combo boxes and toggles from ``_ui_settings``.

        Widgets are updated with their signals blocked. A stored combo value
        that matches no item selects index 0. If both the profile-settings and
        global-defaults toggles are stored as shown, global-defaults is
        turned off.
        """
        settings = self._ui_settings

        def read_bool(key: str, default: bool) -> bool:
            return _setting_as_bool(settings.value(key, default))

        def read_choice(key: str, default: str) -> str:
            return str(settings.value(key, default) or default).strip().lower()

        _set_checked_silently(
            self.chk_routes_prewarm_aggressive,
            read_bool("routes/prewarm_aggressive", False),
        )
        self._update_prewarm_mode_label()

        combo_prefs = (
            (self.cmb_locations_sort, "vpn/locations_sort", "ping"),
            (self.cmb_singbox_global_routing, "singbox/global_routing", "selective"),
            (self.cmb_singbox_global_dns, "singbox/global_dns", "system_resolver"),
            (self.cmb_singbox_global_killswitch, "singbox/global_killswitch", "on"),
            (self.cmb_singbox_profile_routing, "singbox/profile_routing", "global"),
            (self.cmb_singbox_profile_dns, "singbox/profile_dns", "global"),
            (self.cmb_singbox_profile_killswitch, "singbox/profile_killswitch", "global"),
        )
        for combo, key, default in combo_prefs:
            _select_combo_data_silently(combo, read_choice(key, default))

        use_global_route = read_bool("singbox/profile_use_global_routing", True)
        use_global_dns = read_bool("singbox/profile_use_global_dns", True)
        use_global_kill = read_bool("singbox/profile_use_global_killswitch", True)
        _set_checked_silently(
            self.chk_singbox_profile_use_global_routing, use_global_route
        )
        _set_checked_silently(
            self.chk_singbox_profile_use_global_dns, use_global_dns
        )
        _set_checked_silently(
            self.chk_singbox_profile_use_global_killswitch, use_global_kill
        )

        show_profile = read_bool("singbox/ui_show_profile_settings", False)
        show_global = read_bool("singbox/ui_show_global_defaults", False)
        show_activity = read_bool("singbox/ui_show_activity_log", False)
        # The two panels are not restored as shown together; profile wins.
        if show_profile and show_global:
            show_global = False
        _set_checked_silently(self.btn_singbox_toggle_profile_settings, show_profile)
        _set_checked_silently(self.btn_singbox_toggle_global_defaults, show_global)
        _set_checked_silently(self.btn_singbox_toggle_activity, show_activity)

        self._apply_singbox_profile_controls()
        self._apply_singbox_compact_visibility()

    def _save_ui_preferences(self) -> None:
        """Write the current widget states to ``_ui_settings`` and sync it.

        Combo boxes store their current item data, falling back to the
        per-key default when that data is empty.
        """
        entries = (
            (
                "routes/prewarm_aggressive",
                bool(self.chk_routes_prewarm_aggressive.isChecked()),
            ),
            (
                "vpn/locations_sort",
                str(self.cmb_locations_sort.currentData() or "ping"),
            ),
            (
                "singbox/global_routing",
                str(self.cmb_singbox_global_routing.currentData() or "selective"),
            ),
            (
                "singbox/global_dns",
                str(self.cmb_singbox_global_dns.currentData() or "system_resolver"),
            ),
            (
                "singbox/global_killswitch",
                str(self.cmb_singbox_global_killswitch.currentData() or "on"),
            ),
            (
                "singbox/profile_use_global_routing",
                bool(self.chk_singbox_profile_use_global_routing.isChecked()),
            ),
            (
                "singbox/profile_use_global_dns",
                bool(self.chk_singbox_profile_use_global_dns.isChecked()),
            ),
            (
                "singbox/profile_use_global_killswitch",
                bool(self.chk_singbox_profile_use_global_killswitch.isChecked()),
            ),
            (
                "singbox/profile_routing",
                str(self.cmb_singbox_profile_routing.currentData() or "global"),
            ),
            (
                "singbox/profile_dns",
                str(self.cmb_singbox_profile_dns.currentData() or "global"),
            ),
            (
                "singbox/profile_killswitch",
                str(self.cmb_singbox_profile_killswitch.currentData() or "global"),
            ),
            (
                "singbox/ui_show_profile_settings",
                bool(self.btn_singbox_toggle_profile_settings.isChecked()),
            ),
            (
                "singbox/ui_show_global_defaults",
                bool(self.btn_singbox_toggle_global_defaults.isChecked()),
            ),
            (
                "singbox/ui_show_activity_log",
                bool(self.btn_singbox_toggle_activity.isChecked()),
            ),
        )
        settings = self._ui_settings
        for key, value in entries:
            settings.setValue(key, value)
        settings.sync()