#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
DashboardController

Thin "brain" between the UI and ApiClient.
The UI must not know about URLs / JSON; it only calls this controller's methods.
"""

from __future__ import annotations

from dataclasses import dataclass
import os
import re
from typing import Iterable, List, Literal, Optional, cast, get_args

from api_client import (
    ApiClient,
    CmdResult,
    DNSStatus,
    DnsUpstreams,
    DomainsFile,
    DomainsTable,
    Event,
    LoginState,
    Status,
    TrafficCandidates,
    TrafficInterfaces,
    TrafficModeStatus,
    TraceDump,
    UnitState,
    VpnLocation,
    VpnStatus,
    SmartdnsRuntimeState,
    # login flow models
    LoginSessionStart,
    LoginSessionState,
    LoginSessionAction,
)

# Strips auto-check spam from log lines (the CLI likes to print
# "Next check in ...").
_NEXT_CHECK_RE = re.compile(
    r"(?:\b\d+s\.)?\s*Next check in\s+\d+s\.?", re.IGNORECASE
)

TraceMode = Literal["full", "gui", "smartdns"]
ServiceAction = Literal["start", "stop", "restart"]
LoginAction = Literal["open", "check", "cancel"]
DomainsFileName = Literal[
    "bases",
    "meta",
    "subs",
    "static",
    "smartdns",
    "last-ips-map",
    "last-ips-map-direct",
    "last-ips-map-wildcard",
]

# Runtime whitelists derived from the Literal aliases above, so the accepted
# values are declared in exactly one place.
_SERVICE_ACTIONS = get_args(ServiceAction)
_LOGIN_ACTIONS = get_args(LoginAction)
_DOMAINS_FILE_NAMES = get_args(DomainsFileName)

# Event kind -> UI areas that have to be refreshed when it arrives.
_EVENT_AREAS = {
    "status_changed": ("status", "routes", "vpn"),
    "status_error": ("status", "routes", "vpn"),
    "login_state_changed": ("login", "vpn"),
    "login_state_error": ("login", "vpn"),
    "autoloop_status_changed": ("vpn",),
    "unit_state_changed": ("status", "vpn", "routes", "dns"),
    "trace_changed": ("trace",),
    "trace_append": ("trace",),
    # redraw the "routes" block (buttons + progress)
    "routes_nft_progress": ("routes",),
    "traffic_mode_changed": ("routes", "status"),
}

# Fixed status texts for login-session phases ("already_logged" depends on
# the e-mail and is handled separately).
_LOGIN_PHASE_TEXT = {
    "waiting_browser": "Waiting for browser authorization…",
    "checking": "Checking…",
    "success": "✅ Logged in",
    "failed": "❌ Login failed",
    "cancelled": "Cancelled",
}


# ---------------------------
# View models (UI-friendly)
# ---------------------------


@dataclass(frozen=True)
class LoginView:
    """Login banner: text, color and the derived logged-in flag."""

    text: str
    color: str
    logged_in: bool
    email: str


@dataclass(frozen=True)
class StatusOverviewView:
    """Pre-formatted strings for the status overview panel."""

    timestamp: str
    counts: str
    iface_table_mark: str
    policy_route: str
    routes_service: str
    smartdns_service: str
    vpn_service: str


@dataclass(frozen=True)
class VpnStatusView:
    """Desired VPN location plus a multi-line human-readable status dump."""

    desired_location: str
    pretty_text: str


@dataclass(frozen=True)
class ActionView:
    """Result of a one-shot action: success flag and printable text."""

    ok: bool
    pretty_text: str


@dataclass(frozen=True)
class LoginFlowView:
    """Snapshot of the interactive login session for the login dialog."""

    phase: str
    level: str
    dot_color: str
    status_text: str
    url: str
    email: str
    alive: bool
    cursor: int
    lines: List[str]
    can_open: bool
    can_check: bool
    can_cancel: bool


@dataclass(frozen=True)
class VpnAutoconnectView:
    """For the Autoconnect block on the AdGuardVPN tab."""

    enabled: bool  # True = autoloop is enabled
    unit_text: str  # string like "unit: active"
    color: str  # "green" / "red" / "orange"


@dataclass(frozen=True)
class RoutesNftProgressView:
    """Progress of the nft set update (agvpn4)."""

    percent: int
    message: str
    # True while an update is running, False once it finished / nothing runs
    active: bool


@dataclass(frozen=True)
class TrafficModeView:
    """UI copy of the backend traffic-mode status with defaults filled in."""

    desired_mode: str
    applied_mode: str
    preferred_iface: str
    auto_local_bypass: bool
    bypass_candidates: int
    force_vpn_subnets: List[str]
    force_vpn_uids: List[str]
    force_vpn_cgroups: List[str]
    force_direct_subnets: List[str]
    force_direct_uids: List[str]
    force_direct_cgroups: List[str]
    overrides_applied: int
    cgroup_resolved_uids: int
    cgroup_warning: str
    active_iface: str
    iface_reason: str
    probe_ok: bool
    probe_message: str
    healthy: bool
    message: str


# ---------------------------
# Controller
# ---------------------------


class DashboardController:
    """Adapter that turns ApiClient responses into UI-friendly view models."""

    def __init__(
        self,
        client: ApiClient,
        *,
        routes_unit: Optional[str] = None,
        smartdns_unit: Optional[str] = None,
    ) -> None:
        """Store the client and resolve the systemd unit names.

        Each unit name is taken from the explicit argument first, then from
        the environment (``SELECTIVE_VPN_ROUTES_UNIT`` /
        ``SELECTIVE_VPN_SMARTDNS_UNIT``), then from the built-in default
        (empty for routes, ``smartdns-local.service`` for SmartDNS).
        """
        self.client = client
        self.routes_unit = (
            routes_unit
            or os.environ.get("SELECTIVE_VPN_ROUTES_UNIT")
            or ""
        )
        self.smartdns_unit = (
            smartdns_unit
            or os.environ.get("SELECTIVE_VPN_SMARTDNS_UNIT")
            or "smartdns-local.service"
        )

    # -------- logging --------

    def log_gui(self, msg: str) -> None:
        """Append ``msg`` to the "gui" trace."""
        self.client.trace_append("gui", msg)

    def log_smartdns(self, msg: str) -> None:
        """Append ``msg`` to the "smartdns" trace."""
        self.client.trace_append("smartdns", msg)

    # -------- events stream --------

    def iter_events(self, since: int = 0, stop=None):
        """Return the client's event stream, passing ``since``/``stop`` through."""
        return self.client.events_stream(since=since, stop=stop)

    def classify_event(self, ev: Event) -> List[str]:
        """Return list of areas to refresh for given event kind.

        The kind is compared case-insensitively with surrounding whitespace
        removed; an empty or unrecognised kind yields an empty list.
        """
        kind = (ev.kind or "").strip().lower()
        # A fresh list per call so callers may mutate the result freely.
        return list(_EVENT_AREAS.get(kind, ()))

    # -------- helpers --------

    def _is_logged_in_state(self, st: LoginState) -> bool:
        """Tell whether ``st`` describes a logged-in user.

        The backend "state" can be anything, so the check is lenient: a
        non-empty e-mail or any of the known "success" words counts.
        """
        if st.email:
            return True
        state = (st.state or "").strip().lower()
        return state in (
            "ok", "logged", "logged_in", "success", "authorized", "ready"
        )

    def _level_to_color(self, level: str) -> str:
        """Map a backend level word to "green", "red" or "orange" (default)."""
        lv = (level or "").strip().lower()
        if lv in ("green", "ok", "true", "success"):
            return "green"
        if lv in ("red", "error", "false", "failed"):
            return "red"
        return "orange"

    # -------- overview / status --------

    def get_login_view(self) -> LoginView:
        """Fetch the login state and build the login banner view."""
        st: LoginState = self.client.get_login_state()

        # Prefer backend UI-ready "text" if provided, else build it.
        if st.text:
            txt = st.text
        elif st.email:
            txt = f"AdGuard VPN: logged in as {st.email}"
        else:
            txt = "AdGuard VPN: (no login data)"

        logged_in = self._is_logged_in_state(st)

        # Color: either from the backend, or a simple normalised fallback.
        if st.color:
            color = st.color
        elif logged_in:
            color = "green"
        else:
            state = (st.state or "").strip().lower()
            color = "orange" if state in ("unknown", "checking") else "red"

        return LoginView(
            text=txt,
            color=color,
            logged_in=logged_in,
            email=st.email or "",
        )

    def get_status_overview(self) -> StatusOverviewView:
        """Collect status, unit states and VPN status into one overview."""
        st: Status = self.client.get_status()

        routes_unit = self._resolve_routes_unit(st.iface)
        routes_s: UnitState = (
            self.client.systemd_state(routes_unit)
            if routes_unit
            else UnitState(state="unknown")
        )
        smartdns_s: UnitState = self.client.systemd_state(self.smartdns_unit)
        vpn_st: VpnStatus = self.client.vpn_status()

        counts = f"domains={st.domain_count}, ips={st.ip_count}"
        iface = f"iface={st.iface} table={st.table} mark={st.mark}"
        policy_route = self._format_policy_route(st.policy_route_ok, st.route_ok)

        # SmartDNS: an empty/unknown (or failed) state is reported as an error.
        smart_state = smartdns_s.state or "unknown"
        if smart_state.lower() in ("", "unknown", "failed"):
            smart_state = "ERROR (unknown state)"

        return StatusOverviewView(
            timestamp=st.timestamp or "—",
            counts=counts,
            iface_table_mark=iface,
            policy_route=policy_route,
            routes_service=(
                f"{routes_unit or 'selective-vpn2@.service'}: {routes_s.state}"
            ),
            smartdns_service=f"{self.smartdns_unit}: {smart_state}",
            # This is the state of the VPN unit itself, NOT the autoloop:
            # i.e. whether the AdGuardVPN daemon / tunnel is running.
            vpn_service=f"VPN: {vpn_st.unit_state}",
        )

    def _format_policy_route(
        self,
        policy_ok: Optional[bool],
        route_ok: Optional[bool],
    ) -> str:
        """Describe the policy-route check; ``policy_ok`` wins over ``route_ok``."""
        if policy_ok is None and route_ok is None:
            return "unknown (not checked)"
        val = policy_ok if policy_ok is not None else route_ok
        if val is True:
            return "OK (default route present in VPN table)"
        return "MISSING default route in VPN table"

    def _resolve_routes_unit(self, iface: str) -> str:
        """Return the routes unit name.

        The configured ``routes_unit`` takes precedence; otherwise the name is
        derived from ``iface``. Returns "" when ``iface`` is empty or "-".
        """
        forced = (self.routes_unit or "").strip()
        if forced:
            return forced
        ifc = (iface or "").strip()
        if ifc and ifc != "-":
            return f"selective-vpn2@{ifc}.service"
        return ""

    # -------- VPN --------

    def vpn_locations_view(self) -> List[VpnLocation]:
        """Return the VPN locations as provided by the client."""
        return self.client.vpn_locations()

    def vpn_status_view(self) -> VpnStatusView:
        """Fetch the VPN status and format it for display."""
        st = self.client.vpn_status()
        return VpnStatusView(
            desired_location=st.desired_location,
            pretty_text=self._pretty_vpn_status(st),
        )

    # --- autoconnect / autoloop ---

    def _autoconnect_from_auto(self, auto) -> bool:
        """Extract True/False from the autoloop status response.

        Priority:
          1) the explicit ``auto.enabled`` field (when it is a bool)
          2) a heuristic over ``status_word`` / ``raw_text``
        """
        enabled_field = getattr(auto, "enabled", None)
        if isinstance(enabled_field, bool):
            return enabled_field

        word = (getattr(auto, "status_word", "") or "").strip().lower()
        raw = (getattr(auto, "raw_text", "") or "").lower()

        # Explicit status words take priority.
        if word in (
            "active", "running", "enabled", "on", "up", "started",
            "ok", "true", "yes",
        ):
            return True
        if word in (
            "inactive", "stopped", "disabled", "off", "down", "false", "no"
        ):
            return False

        # Fallback on raw_text. Negative markers go first because "inactive"
        # contains "active".
        if "inactive" in raw or "disabled" in raw or "failed" in raw:
            return False
        if "active" in raw or "running" in raw or "enabled" in raw:
            return True
        return False

    def vpn_autoconnect_view(self) -> VpnAutoconnectView:
        """Build the Autoconnect block view; a query failure becomes a red view."""
        try:
            auto = self.client.vpn_autoloop_status()
        except Exception as e:
            # Deliberate best-effort: the UI shows the error instead of crashing.
            return VpnAutoconnectView(
                enabled=False,
                unit_text=f"unit: ERROR ({e})",
                color="red",
            )

        enabled = self._autoconnect_from_auto(auto)

        unit_state = (
            getattr(auto, "unit_state", "")  # in case the backend provides it
            or (auto.status_word or "")
            or "unknown"
        )
        text = f"unit: {unit_state}"

        low = f"{unit_state} {(auto.raw_text or '')}".lower()
        # Negative markers are tested first because "inactive" contains "active".
        if any(
            x in low
            for x in ("failed", "error", "unknown", "inactive", "dead")
        ):
            color = "red"
        elif "active" in low or "running" in low or "enabled" in low:
            color = "green"
        else:
            color = "orange"

        return VpnAutoconnectView(enabled=enabled, unit_text=text, color=color)

    def vpn_autoconnect_enabled(self) -> bool:
        """Old interface, kept for the toggle button."""
        return self.vpn_autoconnect_view().enabled

    def vpn_set_autoconnect(self, enable: bool) -> VpnStatusView:
        """Toggle autoconnect, then return the command output plus new status."""
        res = self.client.vpn_autoconnect(enable)
        st = self.client.vpn_status()
        return VpnStatusView(
            desired_location=st.desired_location,
            pretty_text=self._pretty_cmd_then_status(res, st),
        )

    def vpn_set_location(self, iso: str) -> VpnStatusView:
        """Set the VPN location and return the refreshed status view."""
        self.client.vpn_set_location(iso)
        st = self.client.vpn_status()
        return VpnStatusView(
            desired_location=st.desired_location,
            pretty_text=self._pretty_vpn_status(st),
        )

    def _pretty_vpn_status(self, st: VpnStatus) -> str:
        """Format ``st`` as key/value lines followed by the raw text, if any."""
        lines = [
            f"unit_state: {st.unit_state}",
            f"desired_location: {st.desired_location or '—'}",
            f"status: {st.status_word}",
        ]
        if st.raw_text:
            lines.append("")
            lines.append(st.raw_text.strip())
        return "\n".join(lines).strip() + "\n"

    # -------- Login Flow (interactive) --------

    def login_flow_start(self) -> LoginFlowView:
        """Start a login session and return the initial dialog state."""
        s: LoginSessionStart = self.client.vpn_login_session_start()

        if not s.ok:
            txt = s.error or "Failed to start login session"
            return LoginFlowView(
                phase=s.phase or "failed",
                level=s.level or "red",
                dot_color="red",
                status_text=txt,
                url="",
                email="",
                alive=False,
                cursor=0,
                lines=[txt],
                can_open=False,
                can_check=False,
                can_cancel=False,
            )

        if (s.phase or "").lower() == "already_logged":
            txt = (
                f"Already logged in as {s.email}"
                if s.email
                else "Already logged in"
            )
            return LoginFlowView(
                phase="already_logged",
                level="green",
                dot_color="green",
                status_text=txt,
                url="",
                email=s.email or "",
                alive=False,
                cursor=0,
                lines=[txt],
                can_open=False,
                can_check=False,
                can_cancel=False,
            )

        txt = f"Login started (pid={s.pid})" if s.pid else "Login started"
        return LoginFlowView(
            phase=s.phase or "starting",
            level=s.level or "yellow",
            dot_color=self._level_to_color(s.level),
            status_text=txt,
            url="",
            email="",
            alive=True,
            cursor=0,
            lines=[],
            can_open=True,
            can_check=True,
            can_cancel=True,
        )

    def login_flow_poll(self, since: int) -> LoginFlowView:
        """Poll the login session state from cursor ``since`` and build the view."""
        st: LoginSessionState = self.client.vpn_login_session_state(since=since)

        phase = (st.phase or "").lower()
        if phase == "already_logged":
            status_txt = (
                f"Already logged in as {st.email}"
                if st.email
                else "Already logged in"
            )
        else:
            # Unknown phases are shown verbatim (or "…" when empty).
            status_txt = _LOGIN_PHASE_TEXT.get(phase) or st.phase or "…"

        return LoginFlowView(
            phase=st.phase,
            level=st.level,
            dot_color=self._level_to_color(st.level),
            status_text=status_txt,
            url=st.url,
            email=st.email,
            alive=st.alive,
            cursor=st.cursor,
            can_open=st.can_open,
            # Each button mirrors its own backend flag (previously can_check
            # was wired to can_cancel by mistake).
            can_check=st.can_check,
            can_cancel=st.can_cancel,
            lines=self._clean_login_lines(st.lines),
        )

    def login_flow_action(self, action: str) -> ActionView:
        """Send "open" / "check" / "cancel" to the login session.

        Raises:
            ValueError: if ``action`` is not one of the supported actions.
        """
        act = action.strip().lower()
        if act not in _LOGIN_ACTIONS:
            raise ValueError(f"Invalid login action: {action}")

        res: LoginSessionAction = self.client.vpn_login_session_action(
            cast(LoginAction, act)
        )

        if not res.ok:
            txt = res.error or "Login action failed"
            return ActionView(ok=False, pretty_text=txt + "\n")

        txt = f"OK: {act} → phase={res.phase} level={res.level}"
        return ActionView(ok=True, pretty_text=txt + "\n")

    def login_flow_stop(self) -> ActionView:
        """Stop the login session and wrap the command result."""
        res = self.client.vpn_login_session_stop()
        return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))

    def vpn_logout(self) -> ActionView:
        """Log out of the VPN and wrap the command result."""
        res = self.client.vpn_logout()
        return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))

    # Banner "AdGuard VPN: logged in as ...": on click we show CLI-like info.
    def login_banner_cli_text(self) -> str:
        """Return CLI-style text describing the current login state."""
        try:
            st: LoginState = self.client.get_login_state()
        except Exception as e:
            # Best-effort: the banner shows the failure instead of raising.
            return f"Failed to query login state: {e}"

        # The backend model may lack an "error" field, hence getattr.
        err = getattr(st, "error", None) or getattr(st, "message", None)
        if err:
            return str(err)

        if st.email:
            return f"You are already logged in.\nCurrent user is {st.email}"

        if st.state:
            return f"Login state: {st.state}"

        return "No login information available."

    # -------- Routes --------

    def routes_service_action(self, action: str) -> ActionView:
        """Start / stop / restart the routes service.

        Raises:
            ValueError: if ``action`` is not "start", "stop" or "restart".
        """
        act = action.strip().lower()
        if act not in _SERVICE_ACTIONS:
            raise ValueError(f"Invalid routes action: {action}")
        res = self.client.routes_service(cast(ServiceAction, act))
        return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))

    def routes_clear(self) -> ActionView:
        """Call ``routes_clear`` on the client and wrap the command result."""
        res = self.client.routes_clear()
        return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))

    def routes_cache_restore(self) -> ActionView:
        """Call ``routes_cache_restore`` on the client and wrap the result."""
        res = self.client.routes_cache_restore()
        return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))

    def routes_fix_policy_route(self) -> ActionView:
        """Call ``routes_fix_policy_route`` on the client and wrap the result."""
        res = self.client.routes_fix_policy_route()
        return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))

    def routes_timer_enabled(self) -> bool:
        """Return whether the routes timer is reported as enabled."""
        st = self.client.routes_timer_get()
        return bool(st.enabled)

    def routes_timer_set(self, enabled: bool) -> ActionView:
        """Enable or disable the routes timer and wrap the command result."""
        res = self.client.routes_timer_set(bool(enabled))
        return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))

    def _traffic_mode_view_from_status(
        self,
        st: TrafficModeStatus,
        fallback_mode: str,
    ) -> TrafficModeView:
        """Convert a backend traffic-mode status into the UI view.

        ``fallback_mode`` is used for ``desired_mode`` when the backend reports
        neither ``desired_mode`` nor ``mode``. Missing strings become "",
        missing lists become [] and ``applied_mode`` defaults to "direct".
        """
        return TrafficModeView(
            desired_mode=(st.desired_mode or st.mode or fallback_mode),
            applied_mode=(st.applied_mode or "direct"),
            preferred_iface=st.preferred_iface or "",
            auto_local_bypass=bool(st.auto_local_bypass),
            bypass_candidates=int(st.bypass_candidates),
            force_vpn_subnets=list(st.force_vpn_subnets or []),
            force_vpn_uids=list(st.force_vpn_uids or []),
            force_vpn_cgroups=list(st.force_vpn_cgroups or []),
            force_direct_subnets=list(st.force_direct_subnets or []),
            force_direct_uids=list(st.force_direct_uids or []),
            force_direct_cgroups=list(st.force_direct_cgroups or []),
            overrides_applied=int(st.overrides_applied),
            cgroup_resolved_uids=int(st.cgroup_resolved_uids),
            cgroup_warning=st.cgroup_warning or "",
            active_iface=st.active_iface or "",
            iface_reason=st.iface_reason or "",
            probe_ok=bool(st.probe_ok),
            probe_message=st.probe_message or "",
            healthy=bool(st.healthy),
            message=st.message or "",
        )

    def traffic_mode_view(self) -> TrafficModeView:
        """Fetch the current traffic mode (desired mode falls back to "selective")."""
        st: TrafficModeStatus = self.client.traffic_mode_get()
        return self._traffic_mode_view_from_status(st, "selective")

    def traffic_mode_set(
        self,
        mode: str,
        preferred_iface: Optional[str] = None,
        auto_local_bypass: Optional[bool] = None,
        force_vpn_subnets: Optional[List[str]] = None,
        force_vpn_uids: Optional[List[str]] = None,
        force_vpn_cgroups: Optional[List[str]] = None,
        force_direct_subnets: Optional[List[str]] = None,
        force_direct_uids: Optional[List[str]] = None,
        force_direct_cgroups: Optional[List[str]] = None,
    ) -> TrafficModeView:
        """Apply a traffic mode and return the resulting view.

        All arguments are forwarded positionally, unchanged, to
        ``client.traffic_mode_set``. If the backend response names no mode,
        the requested ``mode`` is shown as the desired one.
        """
        st: TrafficModeStatus = self.client.traffic_mode_set(
            mode,
            preferred_iface,
            auto_local_bypass,
            force_vpn_subnets,
            force_vpn_uids,
            force_vpn_cgroups,
            force_direct_subnets,
            force_direct_uids,
            force_direct_cgroups,
        )
        return self._traffic_mode_view_from_status(st, mode)

    def traffic_mode_test(self) -> TrafficModeView:
        """Run the traffic-mode test (desired mode falls back to "selective")."""
        st: TrafficModeStatus = self.client.traffic_mode_test()
        return self._traffic_mode_view_from_status(st, "selective")

    def traffic_interfaces(self) -> List[str]:
        """Return non-empty interface names, preferred interface included.

        If the preferred interface is set but missing from the list, it is
        inserted at the front.
        """
        st: TrafficInterfaces = self.client.traffic_interfaces_get()
        vals = [x for x in st.interfaces if x]
        if st.preferred_iface and st.preferred_iface not in vals:
            vals.insert(0, st.preferred_iface)
        return vals

    def traffic_candidates(self) -> TrafficCandidates:
        """Return the traffic candidates as provided by the client."""
        return self.client.traffic_candidates_get()

    def routes_nft_progress_from_event(self, ev: Event) -> RoutesNftProgressView:
        """Turn Event(kind='routes_nft_progress') into a progress-bar model.

        The payload is looked up in ``ev.data``, ``ev.payload`` or ``ev.extra``
        (first truthy one); anything that is not a dict is treated as empty.
        An unparsable percent becomes 0 and a missing/empty message becomes
        "Updating nft set…". ``active`` is True while 0 <= percent < 100.
        """
        payload = (
            getattr(ev, "data", None)
            or getattr(ev, "payload", None)
            or getattr(ev, "extra", None)
            or {}
        )

        if not isinstance(payload, dict):
            payload = {}

        try:
            percent = int(payload.get("percent", 0))
        except (TypeError, ValueError, OverflowError):
            percent = 0

        # `or ""` keeps an explicit None from turning into the text "None".
        msg = str(payload.get("message") or "")
        if not msg:
            msg = "Updating nft set…"

        return RoutesNftProgressView(
            percent=percent,
            message=msg,
            active=0 <= percent < 100,
        )

    # -------- DNS / SmartDNS --------

    def dns_upstreams_view(self) -> DnsUpstreams:
        """Return the DNS upstreams configuration from the client."""
        return self.client.dns_upstreams_get()

    def dns_upstreams_save(self, cfg: DnsUpstreams) -> None:
        """Send ``cfg`` to the client as the new DNS upstreams configuration."""
        self.client.dns_upstreams_set(cfg)

    def dns_status_view(self) -> DNSStatus:
        """Return the DNS status from the client."""
        return self.client.dns_status_get()

    def dns_mode_set(self, via: bool, smartdns_addr: str) -> DNSStatus:
        """Forward the DNS mode change to the client and return its status."""
        return self.client.dns_mode_set(via, smartdns_addr)

    def smartdns_service_action(self, action: str) -> DNSStatus:
        """Start / stop / restart the SmartDNS service.

        Raises:
            ValueError: if ``action`` is not "start", "stop" or "restart".
        """
        act = action.strip().lower()
        if act not in _SERVICE_ACTIONS:
            raise ValueError(f"Invalid SmartDNS action: {action}")
        return self.client.dns_smartdns_service_set(cast(ServiceAction, act))

    def smartdns_prewarm(
        self, limit: int = 0, aggressive_subs: bool = False
    ) -> ActionView:
        """Run the SmartDNS prewarm command and wrap the command result."""
        res = self.client.smartdns_prewarm(
            limit=limit, aggressive_subs=aggressive_subs
        )
        return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))

    def smartdns_runtime_view(self) -> SmartdnsRuntimeState:
        """Return the SmartDNS runtime state from the client."""
        return self.client.smartdns_runtime_get()

    def smartdns_runtime_set(
        self, enabled: bool, restart: bool = True
    ) -> SmartdnsRuntimeState:
        """Forward the SmartDNS runtime toggle to the client."""
        return self.client.smartdns_runtime_set(enabled=enabled, restart=restart)

    # -------- Domains --------

    def _checked_domains_file_name(self, name: str) -> DomainsFileName:
        """Normalise ``name`` (strip + lower-case) and validate it.

        Raises:
            ValueError: if the normalised name is not a known domains file.
        """
        nm = name.strip().lower()
        if nm not in _DOMAINS_FILE_NAMES:
            raise ValueError(f"Invalid domains file name: {name}")
        return cast(DomainsFileName, nm)

    def domains_table_view(self) -> DomainsTable:
        """Return the domains table from the client."""
        return self.client.domains_table()

    def domains_file_load(self, name: str) -> DomainsFile:
        """Load one of the known domains files.

        Raises:
            ValueError: if ``name`` is not a known domains file name.
        """
        return self.client.domains_file_get(self._checked_domains_file_name(name))

    def domains_file_save(self, name: str, content: str) -> None:
        """Save ``content`` into one of the known domains files.

        Raises:
            ValueError: if ``name`` is not a known domains file name.
        """
        self.client.domains_file_set(
            self._checked_domains_file_name(name), content
        )

    # -------- Trace --------

    def trace_view(self, mode: TraceMode = "full") -> TraceDump:
        """Return the trace dump for ``mode`` from the client."""
        return self.client.trace_get(mode)

    # -------- formatting helpers --------

    def _pretty_cmd(self, res: CmdResult) -> str:
        """Format a command result as printable text ending with a newline.

        The layout is an OK/ERROR header, then the message, the exit code and
        the stdout / stderr sections, each only when present. stderr is
        omitted when it merely repeats stdout. ``None`` output fields are
        treated as empty.
        """
        stdout = res.stdout or ""
        stderr = res.stderr or ""

        lines: List[str] = ["OK" if res.ok else "ERROR"]
        if res.message:
            lines.append(res.message.strip())
        if res.exit_code is not None:
            lines.append(f"exit_code: {res.exit_code}")
        if stdout.strip():
            lines.extend(["", "stdout:", stdout.rstrip()])
        if stderr.strip() and stderr.strip() != stdout.strip():
            lines.extend(["", "stderr:", stderr.rstrip()])
        return "\n".join(lines).strip() + "\n"

    def _pretty_cmd_then_status(self, res: CmdResult, st: VpnStatus) -> str:
        """Join the command output and the VPN status with a blank line."""
        return (
            self._pretty_cmd(res).rstrip()
            + "\n\n"
            + self._pretty_vpn_status(st).rstrip()
            + "\n"
        )

    def _clean_login_lines(self, lines: Iterable[str]) -> List[str]:
        """Split login output into non-empty lines without "Next check" spam.

        Carriage returns are treated as line breaks, every piece is stripped,
        and pieces that end up empty are dropped. ``None`` input or ``None``
        items are skipped.
        """
        out: List[str] = []
        for raw in lines or []:
            if raw is None:
                continue

            for part in str(raw).replace("\r", "\n").splitlines():
                text = part.strip()
                if not text:
                    continue

                # Strip the "Next check in ..." spam; the second pass catches a
                # match that only forms after the first removal.
                for _ in range(2):
                    text = _NEXT_CHECK_RE.sub("", text).strip()
                    if not text:
                        break

                if text:
                    out.append(text)
        return out