482 lines
20 KiB
Python
482 lines
20 KiB
Python
from __future__ import annotations
|
||
|
||
from PySide6.QtWidgets import QLabel
|
||
|
||
from main_window.constants import LoginPage
|
||
|
||
|
||
class RuntimeStateMixin:
|
||
def _get_selected_domains_file(self) -> str:
|
||
item = self.lst_files.currentItem()
|
||
return item.text() if item is not None else "bases"
|
||
|
||
def _load_file_content(self, name: str) -> tuple[str, str, str]:
|
||
api_map = {
|
||
"bases": "bases",
|
||
"meta-special": "meta",
|
||
"subs": "subs",
|
||
"static-ips": "static",
|
||
"last-ips-map-direct": "last-ips-map-direct",
|
||
"last-ips-map-wildcard": "last-ips-map-wildcard",
|
||
"wildcard-observed-hosts": "wildcard-observed-hosts",
|
||
"smartdns.conf": "smartdns",
|
||
}
|
||
if name in api_map:
|
||
f = self.ctrl.domains_file_load(api_map[name])
|
||
content = f.content or ""
|
||
source = getattr(f, "source", "") or "api"
|
||
if name == "smartdns.conf":
|
||
path = "/var/lib/selective-vpn/smartdns-wildcards.json -> /etc/selective-vpn/smartdns.conf"
|
||
elif name == "last-ips-map-direct":
|
||
path = "/var/lib/selective-vpn/last-ips-map-direct.txt (artifact: agvpn4)"
|
||
elif name == "last-ips-map-wildcard":
|
||
path = "/var/lib/selective-vpn/last-ips-map-wildcard.txt (artifact: agvpn_dyn4)"
|
||
elif name == "wildcard-observed-hosts":
|
||
path = "/var/lib/selective-vpn/last-ips-map-wildcard.txt (derived unique hosts)"
|
||
else:
|
||
path = f"/etc/selective-vpn/domains/{name}.txt"
|
||
return content, source, path
|
||
return "", "unknown", name
|
||
|
||
def _save_file_content(self, name: str, content: str) -> None:
|
||
api_map = {
|
||
"bases": "bases",
|
||
"meta-special": "meta",
|
||
"subs": "subs",
|
||
"static-ips": "static",
|
||
"smartdns.conf": "smartdns",
|
||
}
|
||
if name in api_map:
|
||
self.ctrl.domains_file_save(api_map[name], content)
|
||
return
|
||
|
||
def _show_vpn_page(self, which: LoginPage) -> None:
|
||
self.vpn_stack.setCurrentIndex(1 if which == "login" else 0)
|
||
|
||
def _set_auth_button(self, logged: bool) -> None:
|
||
self.btn_auth.setText("Logout" if logged else "Login")
|
||
|
||
def _set_status_label_color(self, lbl: QLabel, text: str, *, kind: str) -> None:
|
||
"""Подкраска Policy route / services."""
|
||
lbl.setText(text)
|
||
low = (text or "").lower()
|
||
color = "black"
|
||
if kind == "policy":
|
||
if "ok" in low and "missing" not in low and "error" not in low:
|
||
color = "green"
|
||
elif any(w in low for w in ("missing", "error", "failed")):
|
||
color = "red"
|
||
else:
|
||
color = "orange"
|
||
else: # service
|
||
if any(w in low for w in ("failed", "error", "unknown", "inactive", "dead")):
|
||
color = "red"
|
||
elif "active" in low or "running" in low:
|
||
color = "green"
|
||
else:
|
||
color = "orange"
|
||
lbl.setStyleSheet(f"color: {color};")
|
||
|
||
def _set_dns_unit_relay_state(self, enabled: bool) -> None:
|
||
txt = "SmartDNS unit relay: ON" if enabled else "SmartDNS unit relay: OFF"
|
||
color = "green" if enabled else "red"
|
||
self.chk_dns_unit_relay.setText(txt)
|
||
self.chk_dns_unit_relay.setStyleSheet(f"color: {color};")
|
||
|
||
def _set_dns_runtime_state(self, enabled: bool, source: str, cfg_error: str = "") -> None:
|
||
txt = "SmartDNS runtime accelerator (nftset -> agvpn_dyn4): ON" if enabled else "SmartDNS runtime accelerator (nftset -> agvpn_dyn4): OFF"
|
||
color = "green" if enabled else "orange"
|
||
self.chk_dns_runtime_nftset.setText(txt)
|
||
self.chk_dns_runtime_nftset.setStyleSheet(f"color: {color};")
|
||
|
||
src = (source or "").strip().lower()
|
||
if src == "both":
|
||
src_txt = "Wildcard source: both (resolver + smartdns_runtime)"
|
||
src_color = "green"
|
||
elif src == "smartdns_runtime":
|
||
src_txt = "Wildcard source: smartdns_runtime"
|
||
src_color = "orange"
|
||
else:
|
||
src_txt = "Wildcard source: resolver"
|
||
src_color = "gray"
|
||
if cfg_error:
|
||
src_txt = f"{src_txt} | runtime cfg: {cfg_error}"
|
||
src_color = "orange"
|
||
self.lbl_dns_wildcard_source.setText(src_txt)
|
||
self.lbl_dns_wildcard_source.setStyleSheet(f"color: {src_color};")
|
||
|
||
def _set_dns_mode_state(self, mode: str) -> None:
|
||
low = (mode or "").strip().lower()
|
||
if low in ("hybrid_wildcard", "hybrid"):
|
||
txt = "Resolver mode: hybrid wildcard (SmartDNS for wildcard domains)"
|
||
color = "green"
|
||
elif low == "direct":
|
||
txt = "Resolver mode: direct upstreams"
|
||
color = "red"
|
||
else:
|
||
txt = "Resolver mode: unknown"
|
||
color = "orange"
|
||
self.lbl_dns_mode_state.setText(txt)
|
||
self.lbl_dns_mode_state.setStyleSheet(f"color: {color};")
|
||
|
||
def _set_dns_resolver_summary(self, pool_items) -> None:
|
||
active = []
|
||
total = 0
|
||
for item in pool_items or []:
|
||
addr = str(getattr(item, "addr", "") or "").strip()
|
||
if not addr:
|
||
continue
|
||
total += 1
|
||
if bool(getattr(item, "enabled", False)):
|
||
active.append(addr)
|
||
applied = len(active)
|
||
if applied > 12:
|
||
applied = 12
|
||
if not active:
|
||
text = f"Resolver upstreams: active=0/{total} (empty set)"
|
||
else:
|
||
preview = ", ".join(active[:4])
|
||
if len(active) > 4:
|
||
preview += f", +{len(active)-4} more"
|
||
text = f"Resolver upstreams: active={len(active)}/{total}, applied={applied}/12 [{preview}]"
|
||
self.lbl_dns_resolver_upstreams.setText(text)
|
||
self.lbl_dns_resolver_upstreams.setStyleSheet("color: gray;")
|
||
|
||
avg_ms = self._ui_settings.value("dns_benchmark/last_avg_ms", None)
|
||
ok = self._ui_settings.value("dns_benchmark/last_ok", None)
|
||
fail = self._ui_settings.value("dns_benchmark/last_fail", None)
|
||
timeout = self._ui_settings.value("dns_benchmark/last_timeout", None)
|
||
if avg_ms is None or ok is None or fail is None:
|
||
self.lbl_dns_resolver_health.setText("Resolver health: no benchmark yet")
|
||
self.lbl_dns_resolver_health.setStyleSheet("color: gray;")
|
||
return
|
||
try:
|
||
avg = int(avg_ms)
|
||
ok_i = int(ok)
|
||
fail_i = int(fail)
|
||
timeout_i = int(timeout or 0)
|
||
except Exception:
|
||
self.lbl_dns_resolver_health.setText("Resolver health: no benchmark yet")
|
||
self.lbl_dns_resolver_health.setStyleSheet("color: gray;")
|
||
return
|
||
color = "green" if avg < 200 else ("#b58900" if avg <= 400 else "red")
|
||
if timeout_i > 0 and color != "red":
|
||
color = "#b58900"
|
||
self.lbl_dns_resolver_health.setText(
|
||
f"Resolver health: avg={avg}ms ok={ok_i} fail={fail_i} timeout={timeout_i}"
|
||
)
|
||
self.lbl_dns_resolver_health.setStyleSheet(f"color: {color};")
|
||
|
||
def _set_traffic_mode_state(
|
||
self,
|
||
desired_mode: str,
|
||
applied_mode: str,
|
||
preferred_iface: str,
|
||
advanced_active: bool,
|
||
auto_local_bypass: bool,
|
||
auto_local_active: bool,
|
||
ingress_reply_bypass: bool,
|
||
ingress_reply_active: bool,
|
||
bypass_candidates: int,
|
||
overrides_applied: int,
|
||
cgroup_resolved_uids: int,
|
||
cgroup_warning: str,
|
||
healthy: bool,
|
||
ingress_rule_present: bool,
|
||
ingress_nft_active: bool,
|
||
probe_ok: bool,
|
||
probe_message: str,
|
||
active_iface: str,
|
||
iface_reason: str,
|
||
message: str,
|
||
) -> None:
|
||
desired = (desired_mode or "").strip().lower() or "selective"
|
||
applied = (applied_mode or "").strip().lower() or "direct"
|
||
|
||
if healthy:
|
||
color = "green"
|
||
health_txt = "OK"
|
||
else:
|
||
color = "red"
|
||
health_txt = "MISMATCH"
|
||
|
||
text = f"Traffic mode: {desired} (applied: {applied}) [{health_txt}]"
|
||
diag_parts = []
|
||
diag_parts.append(f"preferred={preferred_iface or 'auto'}")
|
||
diag_parts.append(
|
||
f"advanced={'on' if advanced_active else 'off'}"
|
||
)
|
||
diag_parts.append(
|
||
f"auto_local={'on' if auto_local_bypass else 'off'}"
|
||
f"({'active' if auto_local_active else 'saved'})"
|
||
)
|
||
diag_parts.append(
|
||
f"ingress_reply={'on' if ingress_reply_bypass else 'off'}"
|
||
f"({'active' if ingress_reply_active else 'saved'})"
|
||
)
|
||
if auto_local_active and bypass_candidates > 0:
|
||
diag_parts.append(f"bypass_routes={bypass_candidates}")
|
||
diag_parts.append(f"overrides={overrides_applied}")
|
||
if cgroup_resolved_uids > 0:
|
||
diag_parts.append(f"cgroup_uids={cgroup_resolved_uids}")
|
||
if cgroup_warning:
|
||
diag_parts.append(f"cgroup_warning={cgroup_warning}")
|
||
if active_iface:
|
||
diag_parts.append(f"iface={active_iface}")
|
||
if iface_reason:
|
||
diag_parts.append(f"source={iface_reason}")
|
||
diag_parts.append(
|
||
f"ingress_diag=rule:{'ok' if ingress_rule_present else 'off'}"
|
||
f"/nft:{'ok' if ingress_nft_active else 'off'}"
|
||
)
|
||
diag_parts.append(f"probe={'ok' if probe_ok else 'fail'}")
|
||
if probe_message:
|
||
diag_parts.append(probe_message)
|
||
if message:
|
||
diag_parts.append(message)
|
||
diag = " | ".join(diag_parts) if diag_parts else "—"
|
||
|
||
self.lbl_traffic_mode_state.setText(text)
|
||
self.lbl_traffic_mode_state.setStyleSheet(f"color: {color};")
|
||
self.lbl_traffic_mode_diag.setText(diag)
|
||
self.lbl_traffic_mode_diag.setStyleSheet("color: gray;")
|
||
|
||
def _update_routes_progress_label(self, view) -> None:
|
||
"""
|
||
Обновляет прогресс nft по RoutesNftProgressView.
|
||
view ожидаем с полями percent, message, active (duck-typing).
|
||
"""
|
||
if view is None:
|
||
# сброс до idle
|
||
self._routes_progress_last = 0
|
||
self.routes_progress.setValue(0)
|
||
self.lbl_routes_progress.setText("NFT: idle")
|
||
self.lbl_routes_progress.setStyleSheet("color: gray;")
|
||
return
|
||
|
||
# аккуратно ограничим 0..100
|
||
try:
|
||
percent = max(0, min(100, int(view.percent)))
|
||
except Exception:
|
||
percent = 0
|
||
|
||
# не даём прогрессу дёргаться назад, кроме явного сброса (percent==0)
|
||
if percent == 0:
|
||
self._routes_progress_last = 0
|
||
else:
|
||
percent = max(percent, self._routes_progress_last)
|
||
self._routes_progress_last = percent
|
||
|
||
self.routes_progress.setValue(percent)
|
||
|
||
text = f"{percent}% – {view.message}"
|
||
if not view.active and percent >= 100:
|
||
color = "green"
|
||
elif view.active:
|
||
color = "orange"
|
||
else:
|
||
color = "gray"
|
||
|
||
self.lbl_routes_progress.setText(text)
|
||
self.lbl_routes_progress.setStyleSheet(f"color: {color};")
|
||
|
||
def _load_ui_preferences(self) -> None:
|
||
raw = self._ui_settings.value("routes/prewarm_aggressive", False)
|
||
if isinstance(raw, str):
|
||
val = raw.strip().lower() in ("1", "true", "yes", "on")
|
||
else:
|
||
val = bool(raw)
|
||
self.chk_routes_prewarm_aggressive.blockSignals(True)
|
||
self.chk_routes_prewarm_aggressive.setChecked(val)
|
||
self.chk_routes_prewarm_aggressive.blockSignals(False)
|
||
self._update_prewarm_mode_label()
|
||
|
||
sort_mode = str(self._ui_settings.value("vpn/locations_sort", "ping") or "ping").strip().lower()
|
||
idx = self.cmb_locations_sort.findData(sort_mode)
|
||
if idx < 0:
|
||
idx = 0
|
||
self.cmb_locations_sort.blockSignals(True)
|
||
self.cmb_locations_sort.setCurrentIndex(idx)
|
||
self.cmb_locations_sort.blockSignals(False)
|
||
|
||
g_route = str(
|
||
self._ui_settings.value("singbox/global_routing", "selective") or "selective"
|
||
).strip().lower()
|
||
idx = self.cmb_singbox_global_routing.findData(g_route)
|
||
if idx < 0:
|
||
idx = 0
|
||
self.cmb_singbox_global_routing.blockSignals(True)
|
||
self.cmb_singbox_global_routing.setCurrentIndex(idx)
|
||
self.cmb_singbox_global_routing.blockSignals(False)
|
||
|
||
g_dns = str(
|
||
self._ui_settings.value("singbox/global_dns", "system_resolver") or "system_resolver"
|
||
).strip().lower()
|
||
idx = self.cmb_singbox_global_dns.findData(g_dns)
|
||
if idx < 0:
|
||
idx = 0
|
||
self.cmb_singbox_global_dns.blockSignals(True)
|
||
self.cmb_singbox_global_dns.setCurrentIndex(idx)
|
||
self.cmb_singbox_global_dns.blockSignals(False)
|
||
|
||
g_kill = str(
|
||
self._ui_settings.value("singbox/global_killswitch", "on") or "on"
|
||
).strip().lower()
|
||
idx = self.cmb_singbox_global_killswitch.findData(g_kill)
|
||
if idx < 0:
|
||
idx = 0
|
||
self.cmb_singbox_global_killswitch.blockSignals(True)
|
||
self.cmb_singbox_global_killswitch.setCurrentIndex(idx)
|
||
self.cmb_singbox_global_killswitch.blockSignals(False)
|
||
|
||
p_route = str(
|
||
self._ui_settings.value("singbox/profile_routing", "global") or "global"
|
||
).strip().lower()
|
||
idx = self.cmb_singbox_profile_routing.findData(p_route)
|
||
if idx < 0:
|
||
idx = 0
|
||
self.cmb_singbox_profile_routing.blockSignals(True)
|
||
self.cmb_singbox_profile_routing.setCurrentIndex(idx)
|
||
self.cmb_singbox_profile_routing.blockSignals(False)
|
||
|
||
p_dns = str(
|
||
self._ui_settings.value("singbox/profile_dns", "global") or "global"
|
||
).strip().lower()
|
||
idx = self.cmb_singbox_profile_dns.findData(p_dns)
|
||
if idx < 0:
|
||
idx = 0
|
||
self.cmb_singbox_profile_dns.blockSignals(True)
|
||
self.cmb_singbox_profile_dns.setCurrentIndex(idx)
|
||
self.cmb_singbox_profile_dns.blockSignals(False)
|
||
|
||
p_kill = str(
|
||
self._ui_settings.value("singbox/profile_killswitch", "global") or "global"
|
||
).strip().lower()
|
||
idx = self.cmb_singbox_profile_killswitch.findData(p_kill)
|
||
if idx < 0:
|
||
idx = 0
|
||
self.cmb_singbox_profile_killswitch.blockSignals(True)
|
||
self.cmb_singbox_profile_killswitch.setCurrentIndex(idx)
|
||
self.cmb_singbox_profile_killswitch.blockSignals(False)
|
||
|
||
raw = self._ui_settings.value("singbox/profile_use_global_routing", True)
|
||
use_global_route = (
|
||
raw.strip().lower() in ("1", "true", "yes", "on")
|
||
if isinstance(raw, str)
|
||
else bool(raw)
|
||
)
|
||
raw = self._ui_settings.value("singbox/profile_use_global_dns", True)
|
||
use_global_dns = (
|
||
raw.strip().lower() in ("1", "true", "yes", "on")
|
||
if isinstance(raw, str)
|
||
else bool(raw)
|
||
)
|
||
raw = self._ui_settings.value("singbox/profile_use_global_killswitch", True)
|
||
use_global_kill = (
|
||
raw.strip().lower() in ("1", "true", "yes", "on")
|
||
if isinstance(raw, str)
|
||
else bool(raw)
|
||
)
|
||
self.chk_singbox_profile_use_global_routing.blockSignals(True)
|
||
self.chk_singbox_profile_use_global_routing.setChecked(use_global_route)
|
||
self.chk_singbox_profile_use_global_routing.blockSignals(False)
|
||
self.chk_singbox_profile_use_global_dns.blockSignals(True)
|
||
self.chk_singbox_profile_use_global_dns.setChecked(use_global_dns)
|
||
self.chk_singbox_profile_use_global_dns.blockSignals(False)
|
||
self.chk_singbox_profile_use_global_killswitch.blockSignals(True)
|
||
self.chk_singbox_profile_use_global_killswitch.setChecked(use_global_kill)
|
||
self.chk_singbox_profile_use_global_killswitch.blockSignals(False)
|
||
|
||
raw = self._ui_settings.value("singbox/ui_show_profile_settings", False)
|
||
show_profile = (
|
||
raw.strip().lower() in ("1", "true", "yes", "on")
|
||
if isinstance(raw, str)
|
||
else bool(raw)
|
||
)
|
||
raw = self._ui_settings.value("singbox/ui_show_global_defaults", False)
|
||
show_global = (
|
||
raw.strip().lower() in ("1", "true", "yes", "on")
|
||
if isinstance(raw, str)
|
||
else bool(raw)
|
||
)
|
||
raw = self._ui_settings.value("singbox/ui_show_activity_log", False)
|
||
show_activity = (
|
||
raw.strip().lower() in ("1", "true", "yes", "on")
|
||
if isinstance(raw, str)
|
||
else bool(raw)
|
||
)
|
||
if show_profile and show_global:
|
||
show_global = False
|
||
|
||
self.btn_singbox_toggle_profile_settings.blockSignals(True)
|
||
self.btn_singbox_toggle_profile_settings.setChecked(show_profile)
|
||
self.btn_singbox_toggle_profile_settings.blockSignals(False)
|
||
self.btn_singbox_toggle_global_defaults.blockSignals(True)
|
||
self.btn_singbox_toggle_global_defaults.setChecked(show_global)
|
||
self.btn_singbox_toggle_global_defaults.blockSignals(False)
|
||
self.btn_singbox_toggle_activity.blockSignals(True)
|
||
self.btn_singbox_toggle_activity.setChecked(show_activity)
|
||
self.btn_singbox_toggle_activity.blockSignals(False)
|
||
|
||
self._apply_singbox_profile_controls()
|
||
self._apply_singbox_compact_visibility()
|
||
|
||
def _save_ui_preferences(self) -> None:
|
||
self._ui_settings.setValue(
|
||
"routes/prewarm_aggressive",
|
||
bool(self.chk_routes_prewarm_aggressive.isChecked()),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"vpn/locations_sort",
|
||
str(self.cmb_locations_sort.currentData() or "ping"),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/global_routing",
|
||
str(self.cmb_singbox_global_routing.currentData() or "selective"),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/global_dns",
|
||
str(self.cmb_singbox_global_dns.currentData() or "system_resolver"),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/global_killswitch",
|
||
str(self.cmb_singbox_global_killswitch.currentData() or "on"),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/profile_use_global_routing",
|
||
bool(self.chk_singbox_profile_use_global_routing.isChecked()),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/profile_use_global_dns",
|
||
bool(self.chk_singbox_profile_use_global_dns.isChecked()),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/profile_use_global_killswitch",
|
||
bool(self.chk_singbox_profile_use_global_killswitch.isChecked()),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/profile_routing",
|
||
str(self.cmb_singbox_profile_routing.currentData() or "global"),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/profile_dns",
|
||
str(self.cmb_singbox_profile_dns.currentData() or "global"),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/profile_killswitch",
|
||
str(self.cmb_singbox_profile_killswitch.currentData() or "global"),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/ui_show_profile_settings",
|
||
bool(self.btn_singbox_toggle_profile_settings.isChecked()),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/ui_show_global_defaults",
|
||
bool(self.btn_singbox_toggle_global_defaults.isChecked()),
|
||
)
|
||
self._ui_settings.setValue(
|
||
"singbox/ui_show_activity_log",
|
||
bool(self.btn_singbox_toggle_activity.isChecked()),
|
||
)
|
||
self._ui_settings.sync()
|