#!/usr/bin/env python3
from __future__ import annotations

import re
import subprocess
import sys
import time
from typing import Literal

from PySide6 import QtCore
from PySide6.QtCore import Qt, QSettings, QTimer
from PySide6.QtGui import QTextCursor
from PySide6.QtWidgets import (
    QApplication,
    QComboBox,
    QFormLayout,
    QGroupBox,
    QHBoxLayout,
    QLabel,
    QListView,
    QListWidget,
    QListWidgetItem,
    QMainWindow,
    QMessageBox,
    QPushButton,
    QPlainTextEdit,
    QRadioButton,
    QStackedWidget,
    QTabWidget,
    QVBoxLayout,
    QWidget,
    QLineEdit,
    QCheckBox,
    QProgressBar,
)

from api_client import ApiClient
from dashboard_controller import DashboardController, TraceMode
from dns_benchmark_dialog import DNSBenchmarkDialog
from traffic_mode_dialog import TrafficModeDialog

# Matches "next check in <N>s" fragments (case-insensitive); they are stripped
# from login-flow output before it is shown in the UI.
_NEXT_CHECK_RE = re.compile(r"(?i)next check in \d+s")

# Names of the two pages of the VPN tab's stacked widget.
LoginPage = Literal["main", "login"]


class EventThread(QtCore.QThread):
    """Background thread that forwards controller events to the GUI via signals.

    Signals:
        eventReceived(object): emitted for every event yielded by
            ``controller.iter_events``.
        error(str): emitted with ``str(exc)`` when the event iteration raises.
    """

    eventReceived = QtCore.Signal(object)
    error = QtCore.Signal(str)

    # Granularity (seconds) at which a pause re-checks the stop flag.
    _PAUSE_SLICE = 0.1

    def __init__(self, controller: DashboardController, parent=None) -> None:
        super().__init__(parent)
        self.ctrl = controller
        self._stop = False
        # id of the last event seen; passed back as ``since`` on reconnect
        self._since = 0

    def stop(self) -> None:
        """Ask the loop in :meth:`run` to finish."""
        self._stop = True

    def _pause(self, seconds: float) -> None:
        """Sleep for up to *seconds*, returning early once stop() was called."""
        deadline = time.monotonic() + seconds
        while not self._stop:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(self._PAUSE_SLICE, remaining))

    def run(self) -> None:  # pragma: no cover - thread
        """Iterate controller events until stopped, re-iterating after each end/error."""
        while not self._stop:
            try:
                for ev in self.ctrl.iter_events(
                    since=self._since, stop=lambda: self._stop
                ):
                    if self._stop:
                        break
                    try:
                        self._since = int(getattr(ev, "id", self._since))
                    except Exception:
                        # best effort: keep the previous cursor if the id is unusable
                        pass
                    self.eventReceived.emit(ev)
                # graceful end -> short delay
                self._pause(0.5)
            except Exception as e:
                self.error.emit(str(e))
                self._pause(1.5)


class MainWindow(QMainWindow):
    """Main dashboard window: builds the tabs and refreshes them from the controller."""

    def __init__(self, controller: DashboardController) -> None:
        """Create the window, build the UI, do a first refresh and start the event thread."""
        super().__init__()
        self.ctrl = controller
        self.setWindowTitle("Selective VPN Dashboard (Qt)")
        self.resize(1024, 700)

        # login-flow state
        self._login_flow_active: bool = False
        self._login_cursor: int = 0
        self._login_url_opened: bool = False

        self.events_thread: EventThread | None = None
        self._routes_progress_last: int = 0
        self._dns_ui_refresh: bool = False
        self._ui_settings = QSettings("AdGuardVPN", "SelectiveVPNDashboardQt")

        self.login_poll_timer = QTimer(self)
        self.login_poll_timer.setInterval(250)
        self.login_poll_timer.timeout.connect(self._login_poll_tick)

        self.dns_save_timer = QTimer(self)
        self.dns_save_timer.setSingleShot(True)
        self.dns_save_timer.setInterval(700)
        self.dns_save_timer.timeout.connect(self._apply_dns_autosave)

        self._build_ui()
        self._load_ui_preferences()
        self.refresh_everything()
        self._start_events_stream()

    # ---------------- UI BUILD ----------------

    def _build_ui(self) -> None:
        """Build the central widget: top bar (banner, auth, refresh) and the tab widget."""
        root = QWidget()
        root_layout = QVBoxLayout(root)
        root.setLayout(root_layout)
        self.setCentralWidget(root)

        # top bar ---------------------------------------------------------
        top = QHBoxLayout()
        root_layout.addLayout(top)

        # clicking this banner shows whoami
        self.btn_login_banner = QPushButton("AdGuard VPN: —")
        self.btn_login_banner.setFlat(True)
        self.btn_login_banner.setStyleSheet(
            "text-align: left; border: none; color: gray;"
        )
        self.btn_login_banner.clicked.connect(self.on_login_banner_clicked)
        top.addWidget(self.btn_login_banner, stretch=1)

        self.btn_auth = QPushButton("Login")
        self.btn_auth.clicked.connect(self.on_auth_button)
        top.addWidget(self.btn_auth)

        self.btn_refresh_all = QPushButton("Refresh all")
        self.btn_refresh_all.clicked.connect(self.refresh_everything)
        top.addWidget(self.btn_refresh_all)

        # tabs -------------------------------------------------------------
        self.tabs = QTabWidget()
        root_layout.addWidget(self.tabs, stretch=1)

        self._build_tab_status()
        self._build_tab_vpn()
        self._build_tab_routes()
        self._build_tab_dns()
        self._build_tab_domains()
        self._build_tab_trace()

    # ---------------- STATUS TAB ----------------

    def _build_tab_status(self) -> None:
        """Build the "Status" tab: a form of read-only labels plus a Refresh button."""
        tab = QWidget()
        layout = QVBoxLayout(tab)

        grid = QFormLayout()
        layout.addLayout(grid)

        self.st_timestamp = QLabel("—")
        self.st_counts = QLabel("—")
        self.st_iface = QLabel("—")
        self.st_route = QLabel("—")
        self.st_routes_service = QLabel("—")
        self.st_smartdns_service = QLabel("—")
        self.st_vpn_service = QLabel("—")

        grid.addRow("Timestamp:", self.st_timestamp)
        grid.addRow("Counts:", self.st_counts)
        grid.addRow("Iface / table / mark:", self.st_iface)
        grid.addRow("Policy route:", self.st_route)
        grid.addRow("Routes service:", self.st_routes_service)
        grid.addRow("SmartDNS:", self.st_smartdns_service)
        grid.addRow("VPN service:", self.st_vpn_service)

        btns = QHBoxLayout()
        layout.addLayout(btns)
        btn_refresh = QPushButton("Refresh")
        btn_refresh.clicked.connect(self.refresh_status_tab)
        btns.addWidget(btn_refresh)
        btns.addStretch(1)

        self.tabs.addTab(tab, "Status")

    # ---------------- VPN TAB ----------------

    def _build_tab_vpn(self) -> None:
        """Build the "AdGuardVPN" tab: a stacked widget with a main page and a login page."""
        tab = QWidget()
        self.tab_vpn = tab  # needed to switch to this tab from the top bar
        layout = QVBoxLayout(tab)

        # stack: main vs login-flow page
        self.vpn_stack = QStackedWidget()
        layout.addWidget(self.vpn_stack, stretch=1)

        # ---- main page
        page_main = QWidget()
        main_layout = QVBoxLayout(page_main)

        # Autoconnect group
        auto_group = QGroupBox("Autoconnect (AdGuardVPN autoloop)")
        auto_layout = QHBoxLayout(auto_group)
        self.btn_autoconnect_toggle = QPushButton("Enable autoconnect")
        self.btn_autoconnect_toggle.clicked.connect(self.on_toggle_autoconnect)
        auto_layout.addWidget(self.btn_autoconnect_toggle)
        auto_layout.addStretch(1)

        # on the right: colored "unit: active/inactive" text
        self.lbl_autoconnect_state = QLabel("unit: —")
        self.lbl_autoconnect_state.setStyleSheet("color: gray;")
        auto_layout.addWidget(self.lbl_autoconnect_state)
        main_layout.addWidget(auto_group)

        # Locations group
        loc_group = QGroupBox("Location")
        loc_layout = QHBoxLayout(loc_group)
        self.cmb_locations = QComboBox()
        # compact scrollable popup instead of a full-screen one
        self.cmb_locations.setMaxVisibleItems(12)
        self.cmb_locations.setStyleSheet("combobox-popup: 0;")
        view = QListView()
        view.setUniformItemSizes(True)
        self.cmb_locations.setView(view)
        loc_layout.addWidget(self.cmb_locations, stretch=1)

        self.btn_set_location = QPushButton("Apply & restart loop")
        self.btn_set_location.clicked.connect(self.on_set_location)
        loc_layout.addWidget(self.btn_set_location)
        main_layout.addWidget(loc_group)

        # Status output
        self.txt_vpn = QPlainTextEdit()
        self.txt_vpn.setReadOnly(True)
        main_layout.addWidget(self.txt_vpn, stretch=1)

        self.vpn_stack.addWidget(page_main)

        # ---- login page
        page_login = QWidget()
        lf_layout = QVBoxLayout(page_login)

        top = QHBoxLayout()
        lf_layout.addLayout(top)
        self.lbl_login_flow_status = QLabel("Status: —")
        top.addWidget(self.lbl_login_flow_status)
        self.lbl_login_flow_email = QLabel("")
        self.lbl_login_flow_email.setStyleSheet("color: gray;")
        top.addWidget(self.lbl_login_flow_email)
        top.addStretch(1)

        # URL + buttons row
        row2 = QHBoxLayout()
        lf_layout.addLayout(row2)
        row2.addWidget(QLabel("URL:"))
        self.edit_login_url = QLineEdit()
        row2.addWidget(self.edit_login_url, stretch=1)

        self.btn_login_open = QPushButton("Open")
        self.btn_login_open.clicked.connect(self.on_login_open)
        row2.addWidget(self.btn_login_open)

        self.btn_login_copy = QPushButton("Copy")
        self.btn_login_copy.clicked.connect(self.on_login_copy)
        row2.addWidget(self.btn_login_copy)

        self.btn_login_check = QPushButton("Check")
        self.btn_login_check.clicked.connect(self.on_login_check)
        row2.addWidget(self.btn_login_check)

        self.btn_login_close = QPushButton("Cancel")
        self.btn_login_close.clicked.connect(self.on_login_cancel)
        row2.addWidget(self.btn_login_close)

        self.btn_login_stop = QPushButton("Stop session")
        self.btn_login_stop.clicked.connect(self.on_login_stop)
        row2.addWidget(self.btn_login_stop)

        # log text
        self.txt_login_flow = QPlainTextEdit()
        self.txt_login_flow.setReadOnly(True)
        lf_layout.addWidget(self.txt_login_flow, stretch=1)

        # bottom buttons
        bottom = QHBoxLayout()
        lf_layout.addLayout(bottom)

        # "Start login" is hidden visually, but the object is kept just in case
        self.btn_login_start = QPushButton("Start login")
        self.btn_login_start.clicked.connect(self.on_start_login)
        self.btn_login_start.setVisible(False)
        bottom.addWidget(self.btn_login_start)

        btn_back = QPushButton("Back to VPN")
        btn_back.clicked.connect(lambda: self._show_vpn_page("main"))
        bottom.addWidget(btn_back)
        bottom.addStretch(1)

        self.vpn_stack.addWidget(page_login)

        self.tabs.addTab(tab, "AdGuardVPN")

    # ---------------- ROUTES TAB ----------------

    def _build_tab_routes(self) -> None:
        """Build the "Routes" tab: service actions, timer, traffic mode, progress and log."""
        tab = QWidget()
        layout = QVBoxLayout(tab)

        # --- Service actions ---
        act_group = QGroupBox("Selective routes service")
        act_layout = QHBoxLayout(act_group)

        self.btn_routes_start = QPushButton("Start")
        self.btn_routes_start.clicked.connect(
            lambda: self.on_routes_action("start")
        )
        self.btn_routes_restart = QPushButton("Restart")
        self.btn_routes_restart.clicked.connect(
            lambda: self.on_routes_action("restart")
        )
        self.btn_routes_stop = QPushButton("Stop")
        self.btn_routes_stop.clicked.connect(
            lambda: self.on_routes_action("stop")
        )
        act_layout.addWidget(self.btn_routes_start)
        act_layout.addWidget(self.btn_routes_restart)
        act_layout.addWidget(self.btn_routes_stop)
        act_layout.addStretch(1)
        layout.addWidget(act_group)

        # --- Timer / policy route ---
        timer_group = QGroupBox("Timer")
        timer_layout = QHBoxLayout(timer_group)
        self.chk_timer = QCheckBox("Enable timer")
        self.chk_timer.stateChanged.connect(self.on_toggle_timer)
        timer_layout.addWidget(self.chk_timer)
        self.btn_fix_policy = QPushButton("Fix policy route")
        self.btn_fix_policy.clicked.connect(self.on_fix_policy_route)
        timer_layout.addWidget(self.btn_fix_policy)
        timer_layout.addStretch(1)
        layout.addWidget(timer_group)

        # --- Traffic mode relay ---
        traffic_group = QGroupBox("Traffic mode relay")
        traffic_layout = QVBoxLayout(traffic_group)

        relay_row = QHBoxLayout()
        self.btn_traffic_settings = QPushButton("Open traffic settings")
        self.btn_traffic_settings.clicked.connect(self.on_open_traffic_settings)
        relay_row.addWidget(self.btn_traffic_settings)
        self.btn_traffic_test = QPushButton("Test mode")
        self.btn_traffic_test.clicked.connect(self.on_test_traffic_mode)
        relay_row.addWidget(self.btn_traffic_test)
        self.btn_routes_prewarm = QPushButton("Prewarm wildcard now")
        # NOTE(review): in the reviewed text the EN/RU parts of every tooltip are
        # separated by a single space; if the literals originally contained line
        # breaks, restore them - TODO confirm.
        self.btn_routes_prewarm.setToolTip(
            "EN: Sends DNS queries for wildcard domains to prefill agvpn_dyn4 "
            "before traffic arrives. "
            "RU: Делает DNS-запросы wildcard-доменов, чтобы заранее наполнить "
            "agvpn_dyn4."
        )
        self.btn_routes_prewarm.clicked.connect(self.on_smartdns_prewarm)
        relay_row.addWidget(self.btn_routes_prewarm)
        relay_row.addStretch(1)
        traffic_layout.addLayout(relay_row)

        self.chk_routes_prewarm_aggressive = QCheckBox("Aggressive prewarm (use subs)")
        self.chk_routes_prewarm_aggressive.setToolTip(
            "EN: Aggressive mode also queries subs list. "
            "This can increase DNS load. "
            "RU: Агрессивный режим дополнительно дергает subs список. "
            "Может увеличить нагрузку на DNS."
        )
        self.chk_routes_prewarm_aggressive.stateChanged.connect(
            self._on_prewarm_aggressive_changed
        )
        traffic_layout.addWidget(self.chk_routes_prewarm_aggressive)

        self.lbl_routes_prewarm_mode = QLabel("Prewarm mode: wildcard-only")
        self.lbl_routes_prewarm_mode.setStyleSheet("color: gray;")
        traffic_layout.addWidget(self.lbl_routes_prewarm_mode)
        self._update_prewarm_mode_label()

        self.lbl_traffic_mode_state = QLabel("Traffic mode: —")
        self.lbl_traffic_mode_state.setStyleSheet("color: gray;")
        traffic_layout.addWidget(self.lbl_traffic_mode_state)

        self.lbl_traffic_mode_diag = QLabel("—")
        self.lbl_traffic_mode_diag.setStyleSheet("color: gray;")
        traffic_layout.addWidget(self.lbl_traffic_mode_diag)

        layout.addWidget(traffic_group)

        # --- NFT progress (agvpn4) ---
        progress_row = QHBoxLayout()
        self.routes_progress = QProgressBar()
        self.routes_progress.setRange(0, 100)
        self.routes_progress.setValue(0)
        self.routes_progress.setFormat("")  # text is shown in a separate label
        self.routes_progress.setTextVisible(False)
        self.routes_progress.setEnabled(False)  # idle by default

        self.lbl_routes_progress = QLabel("NFT: idle")
        self.lbl_routes_progress.setStyleSheet("color: gray;")

        progress_row.addWidget(self.routes_progress)
        progress_row.addWidget(self.lbl_routes_progress)
        layout.addLayout(progress_row)

        # --- Log output ---
        self.txt_routes = QPlainTextEdit()
        self.txt_routes.setReadOnly(True)
        layout.addWidget(self.txt_routes, stretch=1)

        self.tabs.addTab(tab, "Routes")

    # ---------------- DNS TAB ----------------

    def _build_tab_dns(self) -> None:
        """Build the "DNS" tab: resolver summary group and SmartDNS controls."""
        tab = QWidget()
        main_layout = QVBoxLayout(tab)

        tip = QLabel("Tip: hover fields for help. Подсказка: наведи на элементы для описания.")
        tip.setWordWrap(True)
        tip.setStyleSheet("color: gray;")
        main_layout.addWidget(tip)

        resolver_group = QGroupBox("Resolver DNS")
        resolver_group.setToolTip(
            "EN: Compact resolver DNS status. "
            "Open benchmark to test/apply upstreams. "
            "RU: Компактный статус DNS резолвера. "
            "Открой benchmark для проверки/применения апстримов."
        )
        resolver_layout = QVBoxLayout(resolver_group)

        row = QHBoxLayout()
        self.btn_dns_benchmark = QPushButton("Open DNS benchmark")
        self.btn_dns_benchmark.clicked.connect(self.on_open_dns_benchmark)
        row.addWidget(self.btn_dns_benchmark)
        row.addStretch(1)
        resolver_layout.addLayout(row)

        self.lbl_dns_resolver_upstreams = QLabel(
            "Resolver upstreams: default[—, —] meta[—, —]"
        )
        self.lbl_dns_resolver_upstreams.setStyleSheet("color: gray;")
        resolver_layout.addWidget(self.lbl_dns_resolver_upstreams)

        self.lbl_dns_resolver_health = QLabel("Resolver health: —")
        self.lbl_dns_resolver_health.setStyleSheet("color: gray;")
        resolver_layout.addWidget(self.lbl_dns_resolver_health)

        main_layout.addWidget(resolver_group)

        smart_group = QGroupBox("SmartDNS")
        smart_group.setToolTip(
            "EN: SmartDNS is used for wildcard domains in hybrid mode. "
            "RU: SmartDNS используется для wildcard-доменов в hybrid режиме."
        )
        smart_layout = QVBoxLayout(smart_group)

        smart_form = QFormLayout()
        self.ent_smartdns_addr = QLineEdit()
        self.ent_smartdns_addr.setToolTip(
            "EN: SmartDNS address in host#port format (example: 127.0.0.1#6053). "
            "RU: Адрес SmartDNS в формате host#port (пример: 127.0.0.1#6053)."
        )
        self.ent_smartdns_addr.setPlaceholderText("127.0.0.1#6053")
        self.ent_smartdns_addr.textEdited.connect(self._schedule_dns_autosave)
        smart_form.addRow("SmartDNS address", self.ent_smartdns_addr)
        smart_layout.addLayout(smart_form)

        self.chk_dns_via_smartdns = QCheckBox("Use SmartDNS for wildcard domains")
        self.chk_dns_via_smartdns.setToolTip(
            "EN: Hybrid wildcard mode: wildcard domains resolve via SmartDNS, "
            "other lists resolve via direct upstreams. "
            "RU: Hybrid wildcard режим: wildcard-домены резолвятся через SmartDNS, "
            "остальные списки через direct апстримы."
        )
        self.chk_dns_via_smartdns.stateChanged.connect(self.on_dns_mode_toggle)
        smart_layout.addWidget(self.chk_dns_via_smartdns)

        self.lbl_dns_mode_state = QLabel("Resolver mode: unknown")
        self.lbl_dns_mode_state.setToolTip(
            "EN: Current resolver mode reported by API. "
            "RU: Текущий режим резолвера по данным API."
        )
        smart_layout.addWidget(self.lbl_dns_mode_state)

        self.chk_dns_unit_relay = QCheckBox("SmartDNS unit relay: OFF")
        self.chk_dns_unit_relay.setToolTip(
            "EN: Starts/stops smartdns-local.service. "
            "Service state is independent from resolver mode. "
            "RU: Запускает/останавливает smartdns-local.service. "
            "Состояние сервиса не равно режиму резолвера."
        )
        self.chk_dns_unit_relay.stateChanged.connect(self.on_smartdns_unit_toggle)
        smart_layout.addWidget(self.chk_dns_unit_relay)

        self.chk_dns_runtime_nftset = QCheckBox(
            "SmartDNS runtime accelerator (nftset -> agvpn_dyn4): ON"
        )
        self.chk_dns_runtime_nftset.setToolTip(
            "EN: Optional accelerator: SmartDNS can add resolved IPs to agvpn_dyn4 "
            "in runtime (via nftset). "
            "EN: Wildcard still works without it (resolver job + prewarm). "
            "RU: Опциональный ускоритель: SmartDNS может добавлять IP в agvpn_dyn4 "
            "в runtime (через nftset). "
            "RU: Wildcard работает и без него (resolver job + prewarm)."
        )
        self.chk_dns_runtime_nftset.stateChanged.connect(self.on_smartdns_runtime_toggle)
        smart_layout.addWidget(self.chk_dns_runtime_nftset)

        self.lbl_dns_wildcard_source = QLabel("Wildcard source: resolver")
        self.lbl_dns_wildcard_source.setToolTip(
            "EN: Where wildcard IPs come from: resolver job, SmartDNS runtime "
            "nftset, or both. "
            "RU: Источник wildcard IP: резолвер, runtime nftset SmartDNS, или оба."
        )
        self.lbl_dns_wildcard_source.setStyleSheet("color: gray;")
        smart_layout.addWidget(self.lbl_dns_wildcard_source)

        main_layout.addWidget(smart_group)
        main_layout.addStretch(1)

        self.tabs.addTab(tab, "DNS")

    # ---------------- DOMAINS TAB ----------------

    def _build_tab_domains(self) -> None:
        """Build the "Domains" tab: file list with Save button and a text editor."""
        tab = QWidget()
        main_layout = QHBoxLayout(tab)

        left = QVBoxLayout()
        main_layout.addLayout(left)

        left.addWidget(QLabel("Files:"))
        self.lst_files = QListWidget()
        for name in (
            "bases",
            "meta-special",
            "subs",
            "static-ips",
            "last-ips-map-direct",
            "last-ips-map-wildcard",
            "smartdns.conf",
        ):
            QListWidgetItem(name, self.lst_files)
        self.lst_files.setCurrentRow(0)
        self.lst_files.itemSelectionChanged.connect(self.on_domains_load)
        left.addWidget(self.lst_files)

        self.btn_domains_save = QPushButton("Save file")
        self.btn_domains_save.clicked.connect(self.on_domains_save)
        left.addWidget(self.btn_domains_save)
        left.addStretch(1)

        right_layout = QVBoxLayout()
        main_layout.addLayout(right_layout, stretch=1)

        self.lbl_domains_info = QLabel("—")
        self.lbl_domains_info.setStyleSheet("color: gray;")
        right_layout.addWidget(self.lbl_domains_info)

        self.txt_domains = QPlainTextEdit()
        right_layout.addWidget(self.txt_domains, stretch=1)

        self.tabs.addTab(tab, "Domains")

    # ---------------- TRACE TAB ----------------

    def _build_tab_trace(self) -> None:
        """Build the "Trace" tab: mode radio buttons, Refresh button and a read-only log."""
        tab = QWidget()
        layout = QVBoxLayout(tab)

        top = QHBoxLayout()
        layout.addLayout(top)

        self.radio_trace_full = QRadioButton("Full")
        self.radio_trace_full.setChecked(True)
        self.radio_trace_full.toggled.connect(self.refresh_trace_tab)
        top.addWidget(self.radio_trace_full)

        self.radio_trace_gui = QRadioButton("Events")
        self.radio_trace_gui.toggled.connect(self.refresh_trace_tab)
        top.addWidget(self.radio_trace_gui)

        self.radio_trace_smartdns = QRadioButton("SmartDNS")
        self.radio_trace_smartdns.toggled.connect(self.refresh_trace_tab)
        top.addWidget(self.radio_trace_smartdns)

        btn_refresh = QPushButton("Refresh")
        btn_refresh.clicked.connect(self.refresh_trace_tab)
        top.addWidget(btn_refresh)
        top.addStretch(1)

        self.txt_trace = QPlainTextEdit()
        self.txt_trace.setReadOnly(True)
        layout.addWidget(self.txt_trace, stretch=1)

        self.tabs.addTab(tab, "Trace")

    # ---------------- UI HELPERS ----------------

    def _safe(self, fn, *, title: str = "Error"):
        """Call ``fn()`` and return its result; on any exception log it, show a
        critical message box titled *title* and return ``None``."""
        try:
            return fn()
        except Exception as e:  # pragma: no cover - GUI
            try:
                self.ctrl.log_gui(f"[ui-error] {title}: {e}")
            except Exception:
                # logging is best effort; the message box below still reports the error
                pass
            QMessageBox.critical(self, title, str(e))
            return None

    def _set_text(
        self, widget: QPlainTextEdit, text: str, *, preserve_scroll: bool = False
    ) -> None:
        """Set *text* on *widget*, optionally keeping the scroll position (used for trace)."""
        if not preserve_scroll:
            widget.setPlainText(text)
            return

        sb = widget.verticalScrollBar()
        old_max = sb.maximum()
        old_val = sb.value()
        # within 2 units of the bottom counts as "following the tail"
        at_end = old_val >= old_max - 2

        widget.setPlainText(text)

        new_max = sb.maximum()
        if at_end:
            sb.setValue(new_max)
        else:
            # shift by the growth of the scroll range, clamped to the valid range
            sb.setValue(max(0, min(new_max, old_val + (new_max - old_max))))

    def _append_text(self, widget: QPlainTextEdit, text: str) -> None:
        """Insert *text* at the end of *widget* and scroll so the cursor is visible."""
        cursor = widget.textCursor()
        cursor.movePosition(QTextCursor.End)
        cursor.insertText(text)
        widget.setTextCursor(cursor)
        widget.ensureCursorVisible()

    def _clean_ui_lines(self, lines) -> str:
        """Join *lines* into display text.

        Carriage returns are treated as line breaks, every line is stripped,
        "next check in Ns" fragments are removed and lines that end up empty
        are dropped. ``None`` or an empty iterable yields ``""``.
        """
        buf = "\n".join([str(x) for x in (lines or [])]).replace("\r", "\n")
        out_lines = []
        for ln in buf.splitlines():
            t = ln.strip()
            if not t:
                continue
            t2 = _NEXT_CHECK_RE.sub("", t).strip()
            if not t2:
                continue
            out_lines.append(t2)
        return "\n".join(out_lines).rstrip()

    def _get_selected_domains_file(self) -> str:
        """Return the text of the selected file-list item, or "bases" when none is selected."""
        item = self.lst_files.currentItem()
        return item.text() if item is not None else "bases"

    def _load_file_content(self, name: str) -> tuple[str, str, str]:
        """Load the domains file *name* through the controller.

        Returns:
            ``(content, source, path)``; *path* is a display string. For a
            name that is not in the mapping returns ``("", "unknown", name)``.
        """
        api_map = {
            "bases": "bases",
            "meta-special": "meta",
            "subs": "subs",
            "static-ips": "static",
            "last-ips-map-direct": "last-ips-map-direct",
            "last-ips-map-wildcard": "last-ips-map-wildcard",
            "smartdns.conf": "smartdns",
        }
        if name in api_map:
            f = self.ctrl.domains_file_load(api_map[name])
            content = f.content or ""
            source = getattr(f, "source", "") or "api"
            if name == "smartdns.conf":
                path = "/var/lib/selective-vpn/smartdns-wildcards.json -> /etc/selective-vpn/smartdns.conf"
            elif name == "last-ips-map-direct":
                path = "/var/lib/selective-vpn/last-ips-map-direct.txt (artifact: agvpn4)"
            elif name == "last-ips-map-wildcard":
                path = "/var/lib/selective-vpn/last-ips-map-wildcard.txt (artifact: agvpn_dyn4)"
            else:
                path = f"/etc/selective-vpn/domains/{name}.txt"
            return content, source, path
        return "", "unknown", name

    def _save_file_content(self, name: str, content: str) -> None:
        """Save *content* for file *name* through the controller.

        Names missing from the mapping below (including the two
        ``last-ips-map-*`` entries) are ignored without an error.
        """
        api_map = {
            "bases": "bases",
            "meta-special": "meta",
            "subs": "subs",
            "static-ips": "static",
            "smartdns.conf": "smartdns",
        }
        if name in api_map:
            self.ctrl.domains_file_save(api_map[name], content)
            return

    def _show_vpn_page(self, which: LoginPage) -> None:
        """Show the login page (index 1) for ``"login"``, otherwise the main page (index 0)."""
        self.vpn_stack.setCurrentIndex(1 if which == "login" else 0)

    def _set_auth_button(self, logged: bool) -> None:
        """Label the auth button "Logout" when logged in, "Login" otherwise."""
        self.btn_auth.setText("Logout" if logged else "Login")

    def _set_status_label_color(self, lbl: QLabel, text: str, *, kind: str) -> None:
        """Set *text* on *lbl* and color it by keywords (policy route / services).

        ``kind == "policy"`` uses the policy-route rules; any other value uses
        the service rules.
        """
        lbl.setText(text)
        low = (text or "").lower()
        color = "black"
        if kind == "policy":
            if "ok" in low and "missing" not in low and "error" not in low:
                color = "green"
            elif any(w in low for w in ("missing", "error", "failed")):
                color = "red"
            else:
                color = "orange"
        else:  # service
            # the red check runs first so that "inactive" is not matched by "active"
            if any(
                w in low
                for w in ("failed", "error", "unknown", "inactive", "dead")
            ):
                color = "red"
            elif "active" in low or "running" in low:
                color = "green"
            else:
                color = "orange"
        lbl.setStyleSheet(f"color: {color};")

    def _set_dns_unit_relay_state(self, enabled: bool) -> None:
        """Update text and color of the SmartDNS unit relay checkbox."""
        txt = "SmartDNS unit relay: ON" if enabled else "SmartDNS unit relay: OFF"
        color = "green" if enabled else "red"
        self.chk_dns_unit_relay.setText(txt)
        self.chk_dns_unit_relay.setStyleSheet(f"color: {color};")

    def _set_dns_runtime_state(self, enabled: bool, source: str, cfg_error: str = "") -> None:
        """Update the runtime-accelerator checkbox and the wildcard-source label.

        A non-empty *cfg_error* is appended to the source text and forces the
        label color to orange.
        """
        txt = (
            "SmartDNS runtime accelerator (nftset -> agvpn_dyn4): ON"
            if enabled
            else "SmartDNS runtime accelerator (nftset -> agvpn_dyn4): OFF"
        )
        color = "green" if enabled else "orange"
        self.chk_dns_runtime_nftset.setText(txt)
        self.chk_dns_runtime_nftset.setStyleSheet(f"color: {color};")

        src = (source or "").strip().lower()
        if src == "both":
            src_txt = "Wildcard source: both (resolver + smartdns_runtime)"
            src_color = "green"
        elif src == "smartdns_runtime":
            src_txt = "Wildcard source: smartdns_runtime"
            src_color = "orange"
        else:
            src_txt = "Wildcard source: resolver"
            src_color = "gray"
        if cfg_error:
            src_txt = f"{src_txt} | runtime cfg: {cfg_error}"
            src_color = "orange"
        self.lbl_dns_wildcard_source.setText(src_txt)
        self.lbl_dns_wildcard_source.setStyleSheet(f"color: {src_color};")

    def _set_dns_mode_state(self, mode: str) -> None:
        """Update the resolver-mode label for "hybrid"/"hybrid_wildcard", "direct" or unknown."""
        low = (mode or "").strip().lower()
        if low in ("hybrid_wildcard", "hybrid"):
            txt = "Resolver mode: hybrid wildcard (SmartDNS for wildcard domains)"
            color = "green"
        elif low == "direct":
            txt = "Resolver mode: direct upstreams"
            color = "red"
        else:
            txt = "Resolver mode: unknown"
            color = "orange"
        self.lbl_dns_mode_state.setText(txt)
        self.lbl_dns_mode_state.setStyleSheet(f"color: {color};")

    def _set_dns_resolver_summary(self, pool_items) -> None:
        """Update the resolver upstreams and resolver health labels.

        *pool_items* is read by duck typing (``addr`` and ``enabled``
        attributes); items with an empty address are skipped. Health figures
        come from the ``dns_benchmark/last_*`` keys of the UI settings.
        """
        max_applied = 12  # cap shown in the "applied=N/12" part of the summary

        active = []
        total = 0
        for item in pool_items or []:
            addr = str(getattr(item, "addr", "") or "").strip()
            if not addr:
                continue
            total += 1
            if bool(getattr(item, "enabled", False)):
                active.append(addr)

        applied = min(len(active), max_applied)
        if not active:
            text = f"Resolver upstreams: active=0/{total} (empty set)"
        else:
            preview = ", ".join(active[:4])
            if len(active) > 4:
                preview += f", +{len(active)-4} more"
            text = (
                f"Resolver upstreams: active={len(active)}/{total}, "
                f"applied={applied}/{max_applied} [{preview}]"
            )
        self.lbl_dns_resolver_upstreams.setText(text)
        self.lbl_dns_resolver_upstreams.setStyleSheet("color: gray;")

        def show_no_benchmark() -> None:
            self.lbl_dns_resolver_health.setText("Resolver health: no benchmark yet")
            self.lbl_dns_resolver_health.setStyleSheet("color: gray;")

        avg_ms = self._ui_settings.value("dns_benchmark/last_avg_ms", None)
        ok = self._ui_settings.value("dns_benchmark/last_ok", None)
        fail = self._ui_settings.value("dns_benchmark/last_fail", None)
        timeout = self._ui_settings.value("dns_benchmark/last_timeout", None)
        if avg_ms is None or ok is None or fail is None:
            show_no_benchmark()
            return
        try:
            avg = int(avg_ms)
            ok_i = int(ok)
            fail_i = int(fail)
            timeout_i = int(timeout or 0)
        except (TypeError, ValueError):
            # stored values are not integers -> treat as "no benchmark"
            show_no_benchmark()
            return

        color = "green" if avg < 200 else ("#b58900" if avg <= 400 else "red")
        if timeout_i > 0 and color != "red":
            color = "#b58900"
        self.lbl_dns_resolver_health.setText(
            f"Resolver health: avg={avg}ms ok={ok_i} fail={fail_i} timeout={timeout_i}"
        )
        self.lbl_dns_resolver_health.setStyleSheet(f"color: {color};")

    def _set_traffic_mode_state(
        self,
        desired_mode: str,
        applied_mode: str,
        preferred_iface: str,
        advanced_active: bool,
        auto_local_bypass: bool,
        auto_local_active: bool,
        ingress_reply_bypass: bool,
        ingress_reply_active: bool,
        bypass_candidates: int,
        overrides_applied: int,
        cgroup_resolved_uids: int,
        cgroup_warning: str,
        healthy: bool,
        ingress_rule_present: bool,
        ingress_nft_active: bool,
        probe_ok: bool,
        probe_message: str,
        active_iface: str,
        iface_reason: str,
        message: str,
    ) -> None:
        """Render the traffic-mode summary label and the diagnostics label.

        The summary is green with "OK" when *healthy*, red with "MISMATCH"
        otherwise; the diagnostics line joins the individual parts with " | ".
        """
        desired = (desired_mode or "").strip().lower() or "selective"
        applied = (applied_mode or "").strip().lower() or "direct"

        if healthy:
            color = "green"
            health_txt = "OK"
        else:
            color = "red"
            health_txt = "MISMATCH"

        text = f"Traffic mode: {desired} (applied: {applied}) [{health_txt}]"
        diag_parts = []
        diag_parts.append(f"preferred={preferred_iface or 'auto'}")
        diag_parts.append(
            f"advanced={'on' if advanced_active else 'off'}"
        )
        diag_parts.append(
            f"auto_local={'on' if auto_local_bypass else 'off'}"
            f"({'active' if auto_local_active else 'saved'})"
        )
        diag_parts.append(
            f"ingress_reply={'on' if ingress_reply_bypass else 'off'}"
            f"({'active' if ingress_reply_active else 'saved'})"
        )
        if auto_local_active and bypass_candidates > 0:
            diag_parts.append(f"bypass_routes={bypass_candidates}")
        diag_parts.append(f"overrides={overrides_applied}")
        if cgroup_resolved_uids > 0:
            diag_parts.append(f"cgroup_uids={cgroup_resolved_uids}")
        if cgroup_warning:
            diag_parts.append(f"cgroup_warning={cgroup_warning}")
        if active_iface:
            diag_parts.append(f"iface={active_iface}")
        if iface_reason:
            diag_parts.append(f"source={iface_reason}")
        diag_parts.append(
            f"ingress_diag=rule:{'ok' if ingress_rule_present else 'off'}"
            f"/nft:{'ok' if ingress_nft_active else 'off'}"
        )
        diag_parts.append(f"probe={'ok' if probe_ok else 'fail'}")
        if probe_message:
            diag_parts.append(probe_message)
        if message:
            diag_parts.append(message)
        diag = " | ".join(diag_parts) if diag_parts else "—"

        self.lbl_traffic_mode_state.setText(text)
        self.lbl_traffic_mode_state.setStyleSheet(f"color: {color};")
        self.lbl_traffic_mode_diag.setText(diag)
        self.lbl_traffic_mode_diag.setStyleSheet("color: gray;")

    def _update_routes_progress_label(self, view) -> None:
        """Update the NFT progress bar and label from a progress view.

        *view* is duck-typed with ``percent``, ``message`` and ``active``
        attributes; ``None`` resets the widgets to idle. A missing or empty
        ``message`` is rendered as an empty string and a missing ``active``
        counts as inactive.
        """
        if view is None:
            # reset to idle
            self._routes_progress_last = 0
            self.routes_progress.setValue(0)
            self.lbl_routes_progress.setText("NFT: idle")
            self.lbl_routes_progress.setStyleSheet("color: gray;")
            return

        # clamp to 0..100; anything unparsable counts as 0
        try:
            percent = max(0, min(100, int(view.percent)))
        except Exception:
            percent = 0

        # never let the progress move backwards, except on an explicit reset (percent == 0)
        if percent == 0:
            self._routes_progress_last = 0
        else:
            percent = max(percent, self._routes_progress_last)
            self._routes_progress_last = percent

        self.routes_progress.setValue(percent)

        message = getattr(view, "message", "") or ""
        active = bool(getattr(view, "active", False))

        text = f"{percent}% – {message}"
        if not active and percent >= 100:
            color = "green"
        elif active:
            color = "orange"
        else:
            color = "gray"

        self.lbl_routes_progress.setText(text)
        self.lbl_routes_progress.setStyleSheet(f"color: {color};")

    def _load_ui_preferences(self) -> None:
        """Restore the "aggressive prewarm" checkbox from the UI settings."""
        raw = self._ui_settings.value("routes/prewarm_aggressive", False)
        # QSettings may hand the value back as a string, so parse it explicitly
        if isinstance(raw, str):
            val = raw.strip().lower() in ("1", "true", "yes", "on")
        else:
            val = bool(raw)
        self.chk_routes_prewarm_aggressive.blockSignals(True)
        self.chk_routes_prewarm_aggressive.setChecked(val)
        self.chk_routes_prewarm_aggressive.blockSignals(False)
        self._update_prewarm_mode_label()

    def _save_ui_preferences(self) -> None:
        """Store the "aggressive prewarm" checkbox state in the UI settings and sync."""
        self._ui_settings.setValue(
            "routes/prewarm_aggressive",
            bool(self.chk_routes_prewarm_aggressive.isChecked()),
        )
        self._ui_settings.sync()

    # ---------------- EVENTS STREAM ----------------

    def _start_events_stream(self) -> None:
        """Create and start the event thread unless one already exists."""
        if self.events_thread:
            return
        self.events_thread = EventThread(self.ctrl, self)
        self.events_thread.eventReceived.connect(self._handle_event)
        self.events_thread.error.connect(self._handle_event_error)
        self.events_thread.start()

    @QtCore.Slot(object)
    def _handle_event(self, ev) -> None:
        """Refresh the tabs that the controller's classification names for *ev*."""
        try:
            kinds = self.ctrl.classify_event(ev)
        except Exception:
            kinds = []

        # Handle routes_nft_progress separately to update the progress label.
        try:
            k = (getattr(ev, "kind", "") or "").strip().lower()
        except Exception:
            k = ""

        if k == "routes_nft_progress":
            try:
                prog_view = self.ctrl.routes_nft_progress_from_event(ev)
                self._update_routes_progress_label(prog_view)
            except Exception:
                # do not crash the UI, just ignore
                pass

        # Simple strategy: trigger the existing refresh functions.
        if "status" in kinds:
            self.refresh_status_tab()
        if "login" in kinds:
            self.refresh_login_banner()
        if "vpn" in kinds:
            self.refresh_vpn_tab()
        if "routes" in kinds:
            self.refresh_routes_tab()
        if "dns" in kinds:
            self.refresh_dns_tab()
        if "trace" in kinds:
            self.refresh_trace_tab()

    @QtCore.Slot(str)
    def _handle_event_error(self, msg: str) -> None:
        """Log an event-thread error through the controller; never raises."""
        # Log to trace, do not block the UI.
        try:
            self.ctrl.log_gui(f"[sse-error] {msg}")
        except Exception:
            pass

    # ---------------- REFRESH ----------------

    def refresh_everything(self) -> None:
        """Refresh the login banner and every tab."""
        self.refresh_login_banner()
        self.refresh_status_tab()
        self.refresh_vpn_tab()
        self.refresh_routes_tab()
        self.refresh_dns_tab()
        self.refresh_domains_tab()
        self.refresh_trace_tab()

    def refresh_login_banner(self) -> None:
        """Update banner text/color and the auth button from the controller's login view."""

        def work():
            view = self.ctrl.get_login_view()

            self.btn_login_banner.setText(view.text)
            self._set_auth_button(view.logged_in)

            # Forced: green when logged in, gray otherwise
            color = "green" if view.logged_in else "gray"
            base_style = "text-align: left; border: none;"
            self.btn_login_banner.setStyleSheet(
                f"{base_style} color: {color};"
            )

        self._safe(work, title="Login state error")

    def refresh_status_tab(self) -> None:
        """Fill the Status tab labels from the controller's status overview."""

        def work():
            view = self.ctrl.get_status_overview()
            self.st_timestamp.setText(view.timestamp)
            self.st_counts.setText(view.counts)
            self.st_iface.setText(view.iface_table_mark)

            self._set_status_label_color(
                self.st_route, view.policy_route, kind="policy"
            )
            self._set_status_label_color(
                self.st_routes_service, view.routes_service, kind="service"
            )
            self._set_status_label_color(
                self.st_smartdns_service, view.smartdns_service, kind="service"
            )
            self._set_status_label_color(
                self.st_vpn_service, view.vpn_service, kind="service"
            )

        self._safe(work, title="Status error")

    def refresh_vpn_tab(self) -> None:
        """Refresh VPN status text, autoconnect state and the location combo box."""

        def work():
            view = self.ctrl.vpn_status_view()
            txt = []
            if view.desired_location:
                txt.append(f"Desired location: {view.desired_location}")
            if view.pretty_text:
                txt.append(view.pretty_text.rstrip())
            self._set_text(self.txt_vpn, "\n".join(txt).strip() + "\n")

            auto_view = self.ctrl.vpn_autoconnect_view()
            self.btn_autoconnect_toggle.setText(
                "Disable autoconnect" if auto_view.enabled else "Enable autoconnect"
            )
            self.lbl_autoconnect_state.setText(auto_view.unit_text)
            self.lbl_autoconnect_state.setStyleSheet(
                f"color: {auto_view.color};"
            )

            locs = self.ctrl.vpn_locations_view()
            self.cmb_locations.blockSignals(True)
            try:
                self.cmb_locations.clear()
                current_iso = (view.desired_location or "").strip().upper()
                current_index = 0
                for i, loc in enumerate(locs or []):
                    self.cmb_locations.addItem(loc.label, loc.iso)
                    if (loc.iso or "").upper() == current_iso:
                        current_index = i
                if self.cmb_locations.count() > 0:
                    self.cmb_locations.setCurrentIndex(current_index)
            finally:
                # always re-enable signals, even if a location entry is malformed
                self.cmb_locations.blockSignals(False)

        self._safe(work, title="VPN error")

    def refresh_routes_tab(self) -> None:
        """Refresh the timer checkbox and the traffic-mode labels."""

        def work():
            timer_enabled = self.ctrl.routes_timer_enabled()
            self.chk_timer.blockSignals(True)
            self.chk_timer.setChecked(bool(timer_enabled))
            self.chk_timer.blockSignals(False)

            t = self.ctrl.traffic_mode_view()
            self._set_traffic_mode_state(
                desired_mode=t.desired_mode,
                applied_mode=t.applied_mode,
                preferred_iface=t.preferred_iface,
                advanced_active=bool(t.advanced_active),
                auto_local_bypass=bool(t.auto_local_bypass),
                auto_local_active=bool(t.auto_local_active),
                ingress_reply_bypass=bool(t.ingress_reply_bypass),
                ingress_reply_active=bool(t.ingress_reply_active),
                bypass_candidates=int(t.bypass_candidates),
                overrides_applied=int(t.overrides_applied),
                cgroup_resolved_uids=int(t.cgroup_resolved_uids),
                cgroup_warning=t.cgroup_warning,
                healthy=bool(t.healthy),
                ingress_rule_present=bool(t.ingress_rule_present),
                ingress_nft_active=bool(t.ingress_nft_active),
                probe_ok=bool(t.probe_ok),
                probe_message=t.probe_message,
                active_iface=t.active_iface,
                iface_reason=t.iface_reason,
                message=t.message,
            )

        self._safe(work, title="Routes error")

    def refresh_dns_tab(self) -> None:
        """Refresh the DNS tab; ``_dns_ui_refresh`` is True while widgets are being updated."""

        def work():
            self._dns_ui_refresh = True
            try:
                pool = self.ctrl.dns_upstream_pool_view()
                self._set_dns_resolver_summary(getattr(pool, "items", []))

                st = self.ctrl.dns_status_view()

                self.ent_smartdns_addr.setText(st.smartdns_addr or "")

                # anything that is not a hybrid mode is shown as "direct"
                mode = (getattr(st, "mode", "") or "").strip().lower()
                if mode in ("hybrid_wildcard", "hybrid"):
                    hybrid_enabled = True
                    mode = "hybrid_wildcard"
                else:
                    hybrid_enabled = False
                    mode = "direct"

                self.chk_dns_via_smartdns.blockSignals(True)
                self.chk_dns_via_smartdns.setChecked(hybrid_enabled)
                self.chk_dns_via_smartdns.blockSignals(False)

                unit_state = (st.unit_state or "unknown").strip().lower()
                unit_active = unit_state == "active"
                self.chk_dns_unit_relay.blockSignals(True)
                self.chk_dns_unit_relay.setChecked(unit_active)
                self.chk_dns_unit_relay.blockSignals(False)

                self.chk_dns_runtime_nftset.blockSignals(True)
                self.chk_dns_runtime_nftset.setChecked(
                    bool(getattr(st, "runtime_nftset", True))
                )
                self.chk_dns_runtime_nftset.blockSignals(False)

                self._set_dns_unit_relay_state(unit_active)
                self._set_dns_runtime_state(
                    bool(getattr(st, "runtime_nftset", True)),
                    str(getattr(st, "wildcard_source", "") or ""),
                    str(getattr(st, "runtime_config_error", "") or ""),
                )
                self._set_dns_mode_state(mode)
            finally:
                self._dns_ui_refresh = False

        self._safe(work, title="DNS error")

    def refresh_domains_tab(self) -> None:
        """Reload the currently selected domains file."""

        def work():
            # reload currently selected file
            self.on_domains_load()

        self._safe(work, title="Domains error")

    def refresh_trace_tab(self) -> None:
        """Reload the trace text for the selected mode, preserving the scroll position."""

        def work():
            if self.radio_trace_gui.isChecked():
                mode: TraceMode = "gui"
            elif self.radio_trace_smartdns.isChecked():
                mode = "smartdns"
            else:
                mode = "full"
            dump = self.ctrl.trace_view(mode)
            text = "\n".join(dump.lines).rstrip()
            if dump.lines:
                text += "\n"
            self._set_text(self.txt_trace, text, preserve_scroll=True)

        self._safe(work, title="Trace error")

    # ---------------- TOP AUTH / BANNER ----------------

    def on_auth_button(self) -> None:
        """Log out when logged in; otherwise open the login page and start the login flow."""

        def work():
            view = self.ctrl.get_login_view()
            if view.logged_in:
                self.on_logout()
            else:
                # on login always switch to the AdGuardVPN tab and
                # show the login page
                self.tabs.setCurrentWidget(self.tab_vpn)
                self._show_vpn_page("login")
                self.on_start_login()

        self._safe(work, title="Auth error")

    def on_login_banner_clicked(self) -> None:
        """Show the controller's login banner CLI text in an information box."""

        def work():
            txt = self.ctrl.login_banner_cli_text()
            QMessageBox.information(self, "AdGuard VPN", txt)

        self._safe(work, title="Login banner error")

    # ---------------- LOGIN FLOW ACTIONS ----------------

    def on_start_login(self) -> None:
        """Start the login flow and populate the login page from its initial state."""

        def work():
            self.ctrl.log_gui("Top Login clicked")
            self._show_vpn_page("login")
            self._login_flow_reset_ui()

            start = self.ctrl.login_flow_start()

            self._login_cursor = int(start.cursor)
            self.lbl_login_flow_status.setText(
                f"Status: {start.status_text or '—'}"
            )
            self.lbl_login_flow_email.setText(
                f"User: {start.email}" if start.email else ""
            )
            self.edit_login_url.setText(start.url or "")

            self._login_flow_set_buttons(
                can_open=start.can_open,
                can_check=start.can_check,
                can_cancel=start.can_cancel,
            )

            if start.lines:
                cleaned = self._clean_ui_lines(start.lines)
                if cleaned:
                    self._append_text(self.txt_login_flow, cleaned + "\n")

            if not start.alive:
                # the flow is not running: stop polling, disable its buttons
                # and refresh the banner shortly afterwards
                self._login_flow_autopoll_stop()
                self._login_flow_set_buttons(
                    can_open=False, can_check=False, can_cancel=False
                )
                self.btn_login_stop.setEnabled(False)
                QTimer.singleShot(250, self.refresh_login_banner)
                return

            self._login_flow_autopoll_start()

        self._safe(work, title="Login start error")

    def _login_flow_reset_ui(self) -> None:
        """Reset login-flow cursor/flags and clear the login page widgets."""
        self._login_cursor = 0
        self._login_url_opened = False
        self.edit_login_url.setText("")
        self.lbl_login_flow_status.setText("Status: —")
        self.lbl_login_flow_email.setText("")
        self._set_text(self.txt_login_flow, "")

    def _login_flow_set_buttons(
        self,
        *,
        can_open: bool,
        can_check: bool,
        can_cancel: bool,
    ) -> None:
        """Enable/disable the login-flow buttons; Copy is enabled only for a non-empty URL."""
        self.btn_login_open.setEnabled(bool(can_open))
        self.btn_login_copy.setEnabled(bool(self.edit_login_url.text().strip()))
        self.btn_login_check.setEnabled(bool(can_check))
        self.btn_login_close.setEnabled(bool(can_cancel))
self.btn_login_stop.setEnabled(True) def _login_flow_autopoll_start(self) -> None: self._login_flow_active = True if not self.login_poll_timer.isActive(): self.login_poll_timer.start() def _login_flow_autopoll_stop(self) -> None: self._login_flow_active = False if self.login_poll_timer.isActive(): self.login_poll_timer.stop() def _login_poll_tick(self) -> None: if not self._login_flow_active: return def work(): view = self.ctrl.login_flow_poll(self._login_cursor) self._login_cursor = int(view.cursor) self.lbl_login_flow_status.setText( f"Status: {view.status_text or '—'}" ) self.lbl_login_flow_email.setText( f"User: {view.email}" if view.email else "" ) if view.url: self.edit_login_url.setText(view.url) self._login_flow_set_buttons( can_open=view.can_open, can_check=view.can_check, can_cancel=view.can_cancel, ) cleaned = self._clean_ui_lines(view.lines) if cleaned: self._append_text(self.txt_login_flow, cleaned + "\n") if (not self._login_url_opened) and view.url: self._login_url_opened = True try: subprocess.Popen(["xdg-open", view.url]) except Exception: pass phase = (view.phase or "").strip().lower() if (not view.alive) or phase in ( "success", "failed", "cancelled", "already_logged", ): self._login_flow_autopoll_stop() self._login_flow_set_buttons( can_open=False, can_check=False, can_cancel=False ) self.btn_login_stop.setEnabled(False) QTimer.singleShot(250, self.refresh_login_banner) self._safe(work, title="Login flow error") def on_login_copy(self) -> None: def work(): u = self.edit_login_url.text().strip() if u: QApplication.clipboard().setText(u) self.ctrl.log_gui("Login flow: copy-url") self._safe(work, title="Login copy error") def on_login_open(self) -> None: def work(): u = self.edit_login_url.text().strip() if u: try: subprocess.Popen(["xdg-open", u]) except Exception: pass self.ctrl.log_gui("Login flow: open") self._safe(work, title="Login open error") def on_login_check(self) -> None: def work(): # если ещё ничего не запущено — считаем это стартом 
логина if ( not self._login_flow_active and self._login_cursor == 0 and not self.edit_login_url.text().strip() and not self.txt_login_flow.toPlainText().strip() ): self.on_start_login() return self.ctrl.login_flow_action("check") self.ctrl.log_gui("Login flow: check") self._safe(work, title="Login check error") def on_login_cancel(self) -> None: def work(): self.ctrl.login_flow_action("cancel") self.ctrl.log_gui("Login flow: cancel") self._safe(work, title="Login cancel error") def on_login_stop(self) -> None: def work(): self.ctrl.login_flow_stop() self.ctrl.log_gui("Login flow: stop") self._login_flow_autopoll_stop() QTimer.singleShot(250, self.refresh_login_banner) self._safe(work, title="Login stop error") def on_logout(self) -> None: def work(): self.ctrl.log_gui("Top Logout clicked") res = self.ctrl.vpn_logout() self._set_text(self.txt_vpn, res.pretty_text or str(res)) QTimer.singleShot(250, self.refresh_login_banner) self._safe(work, title="Logout error") # ---- VPN actions --------------------------------------------------- def on_toggle_autoconnect(self) -> None: def work(): current = self.ctrl.vpn_autoconnect_enabled() enable = not current self.ctrl.vpn_set_autoconnect(enable) self.ctrl.log_gui(f"VPN autoconnect set to {enable}") self.refresh_vpn_tab() self._safe(work, title="Autoconnect error") def on_set_location(self) -> None: def work(): idx = self.cmb_locations.currentIndex() if idx < 0: return iso = self.cmb_locations.currentData() self.ctrl.vpn_set_location(iso) self.ctrl.log_gui(f"VPN location set to {iso}") self.refresh_vpn_tab() self._safe(work, title="Location error") # ---- Routes actions ------------------------------------------------ def on_routes_action( self, action: Literal["start", "stop", "restart"] ) -> None: def work(): res = self.ctrl.routes_service_action(action) self._set_text(self.txt_routes, res.pretty_text or str(res)) self.refresh_status_tab() self._safe(work, title="Routes error") def _append_routes_log(self, msg: str) -> 
None: line = (msg or "").strip() if not line: return self._append_text(self.txt_routes, line + "\n") self.ctrl.log_gui(line) def on_open_traffic_settings(self) -> None: def work(): def refresh_all_traffic() -> None: self.refresh_routes_tab() self.refresh_status_tab() dlg = TrafficModeDialog( self.ctrl, log_cb=self._append_routes_log, refresh_cb=refresh_all_traffic, parent=self, ) dlg.exec() refresh_all_traffic() self._safe(work, title="Traffic mode dialog error") def on_test_traffic_mode(self) -> None: def work(): view = self.ctrl.traffic_mode_test() msg = ( f"Traffic mode test: desired={view.desired_mode}, applied={view.applied_mode}, " f"iface={view.active_iface or '-'}, probe_ok={view.probe_ok}, " f"healthy={view.healthy}, auto_local_bypass={view.auto_local_bypass}, " f"bypass_routes={view.bypass_candidates}, overrides={view.overrides_applied}, " f"cgroup_uids={view.cgroup_resolved_uids}, cgroup_warning={view.cgroup_warning or '-'}, " f"message={view.message}, probe={view.probe_message}" ) self._append_routes_log(msg) self.refresh_routes_tab() self.refresh_status_tab() self._safe(work, title="Traffic mode test error") def on_toggle_timer(self) -> None: def work(): enabled = self.chk_timer.isChecked() res = self.ctrl.routes_timer_set(enabled) self.ctrl.log_gui(f"Routes timer set to {enabled}") self._set_text(self.txt_routes, res.pretty_text or str(res)) self.refresh_routes_tab() self._safe(work, title="Timer error") def on_fix_policy_route(self) -> None: def work(): res = self.ctrl.routes_fix_policy_route() self._set_text(self.txt_routes, res.pretty_text or str(res)) self.refresh_status_tab() self._safe(work, title="Policy route error") # ---- DNS actions --------------------------------------------------- def _schedule_dns_autosave(self, _text: str = "") -> None: if self._dns_ui_refresh: return self.dns_save_timer.start() def _apply_dns_autosave(self) -> None: def work(): if self._dns_ui_refresh: return self.ctrl.dns_mode_set( 
self.chk_dns_via_smartdns.isChecked(), self.ent_smartdns_addr.text().strip(), ) self.ctrl.log_gui("DNS settings autosaved") self._safe(work, title="DNS save error") def on_open_dns_benchmark(self) -> None: def work(): dlg = DNSBenchmarkDialog( self.ctrl, settings=self._ui_settings, refresh_cb=self.refresh_dns_tab, parent=self, ) dlg.exec() self.refresh_dns_tab() self._safe(work, title="DNS benchmark error") def on_dns_mode_toggle(self) -> None: def work(): via = self.chk_dns_via_smartdns.isChecked() self.ctrl.dns_mode_set(via, self.ent_smartdns_addr.text().strip()) mode = "hybrid_wildcard" if via else "direct" self.ctrl.log_gui(f"DNS mode changed: mode={mode}") self.refresh_dns_tab() self._safe(work, title="DNS mode error") def on_smartdns_unit_toggle(self) -> None: def work(): enable = self.chk_dns_unit_relay.isChecked() action = "start" if enable else "stop" self.ctrl.smartdns_service_action(action) self.ctrl.log_smartdns(f"SmartDNS unit action from GUI: {action}") self.refresh_dns_tab() self.refresh_status_tab() self._safe(work, title="SmartDNS error") def on_smartdns_runtime_toggle(self) -> None: def work(): if self._dns_ui_refresh: return enable = self.chk_dns_runtime_nftset.isChecked() st = self.ctrl.smartdns_runtime_set(enabled=enable, restart=True) self.ctrl.log_smartdns( f"SmartDNS runtime accelerator set from GUI: enabled={enable} changed={st.changed} restarted={st.restarted} source={st.wildcard_source}" ) self.refresh_dns_tab() self.refresh_trace_tab() self._safe(work, title="SmartDNS runtime error") def on_smartdns_prewarm(self) -> None: def work(): aggressive = bool(self.chk_routes_prewarm_aggressive.isChecked()) result = self.ctrl.smartdns_prewarm(aggressive_subs=aggressive) mode_txt = "aggressive_subs=on" if aggressive else "aggressive_subs=off" self.ctrl.log_smartdns(f"SmartDNS prewarm requested from GUI: {mode_txt}") txt = (result.pretty_text or "").strip() if result.ok: QMessageBox.information(self, "SmartDNS prewarm", txt or "OK") else: 
QMessageBox.critical(self, "SmartDNS prewarm", txt or "ERROR") self.refresh_trace_tab() self._safe(work, title="SmartDNS prewarm error") def _update_prewarm_mode_label(self, _state: int = 0) -> None: aggressive = bool(self.chk_routes_prewarm_aggressive.isChecked()) if aggressive: self.lbl_routes_prewarm_mode.setText("Prewarm mode: aggressive (subs enabled)") self.lbl_routes_prewarm_mode.setStyleSheet("color: orange;") else: self.lbl_routes_prewarm_mode.setText("Prewarm mode: wildcard-only") self.lbl_routes_prewarm_mode.setStyleSheet("color: gray;") def _on_prewarm_aggressive_changed(self, _state: int = 0) -> None: self._update_prewarm_mode_label(_state) self._save_ui_preferences() # ---- Domains actions ----------------------------------------------- def on_domains_load(self) -> None: def work(): name = self._get_selected_domains_file() content, source, path = self._load_file_content(name) is_readonly = name in ("last-ips-map-direct", "last-ips-map-wildcard") self.txt_domains.setReadOnly(is_readonly) self.btn_domains_save.setEnabled(not is_readonly) self._set_text(self.txt_domains, content) ro = "read-only" if is_readonly else "editable" self.lbl_domains_info.setText(f"{name} ({source}, {ro}) [{path}]") self._safe(work, title="Domains load error") def on_domains_save(self) -> None: def work(): name = self._get_selected_domains_file() content = self.txt_domains.toPlainText() self._save_file_content(name, content) self.ctrl.log_gui(f"Domains file saved: {name}") self._safe(work, title="Domains save error") # ---- close event --------------------------------------------------- def closeEvent(self, event) -> None: # pragma: no cover - GUI try: self._save_ui_preferences() self._login_flow_autopoll_stop() if self.events_thread: self.events_thread.stop() self.events_thread.wait(1500) finally: super().closeEvent(event) def main(argv: list[str] | None = None) -> int: if argv is None: argv = sys.argv[1:] base_url = "http://127.0.0.1:8080" if argv: base_url = argv[0] client = 
ApiClient(base_url) ctrl = DashboardController(client) app = QApplication(sys.argv) win = MainWindow(ctrl) win.show() return app.exec() if __name__ == "__main__": raise SystemExit(main())