#!/usr/bin/env python3 from __future__ import annotations import re from typing import Iterable, List, Optional from api_client import CmdResult, Event, LoginState, VpnStatus # Вырезаем спам автопроверки из логов (CLI любит писать "Next check in ..."). _NEXT_CHECK_RE = re.compile( r"(?:\b\d+s\.)?\s*Next check in\s+\d+s\.?,?", re.IGNORECASE ) class ControllerCoreMixin: # -------- logging -------- def log_gui(self, msg: str) -> None: self.client.trace_append("gui", msg) def log_smartdns(self, msg: str) -> None: self.client.trace_append("smartdns", msg) # -------- events stream -------- def iter_events(self, since: int = 0, stop=None): return self.client.events_stream(since=since, stop=stop) def classify_event(self, ev: Event) -> List[str]: """Return list of areas to refresh for given event kind.""" k = (ev.kind or "").strip().lower() if not k: return [] if k in ("status_changed", "status_error"): return ["status", "routes", "vpn"] if k in ("login_state_changed", "login_state_error"): return ["login", "vpn"] if k == "autoloop_status_changed": return ["vpn"] if k == "vpn_locations_changed": return ["vpn"] if k == "unit_state_changed": return ["status", "vpn", "routes", "dns"] if k in ("trace_changed", "trace_append"): return ["trace"] if k == "routes_nft_progress": # Перерисовать блок "routes" (кнопки + прогресс). return ["routes"] if k == "traffic_mode_changed": return ["routes", "status"] if k == "traffic_profiles_changed": # Used by Traffic mode dialog (Apps/runtime) for persistent app profiles. return ["routes"] if k in ( "transport_client_state_changed", "transport_client_health_changed", "transport_client_provisioned", "transport_policy_validated", "transport_policy_applied", "transport_conflict_detected", ): return ["transport", "status"] if k == "egress_identity_changed": return ["vpn", "transport"] return [] # -------- helpers -------- def _is_logged_in_state(self, st: LoginState) -> bool: # Backend "state" может быть любым, делаем устойчивую проверку. s = (st.state or "").strip().lower() if st.email: return True if s in ("ok", "logged", "logged_in", "success", "authorized", "ready"): return True return False def _level_to_color(self, level: str) -> str: lv = (level or "").strip().lower() if lv in ("green", "ok", "true", "success"): return "green" if lv in ("red", "error", "false", "failed"): return "red" return "orange" def _format_policy_route( self, policy_ok: Optional[bool], route_ok: Optional[bool], ) -> str: if policy_ok is None and route_ok is None: return "unknown (not checked)" val = policy_ok if policy_ok is not None else route_ok if val is True: return "OK (default route present in VPN table)" return "MISSING default route in VPN table" def _resolve_routes_unit(self, iface: str) -> str: forced = (self.routes_unit or "").strip() if forced: return forced ifc = (iface or "").strip() if ifc and ifc != "-": return f"selective-vpn2@{ifc}.service" return "" # -------- formatting helpers -------- def _pretty_cmd(self, res: CmdResult) -> str: lines: List[str] = [] lines.append("OK" if res.ok else "ERROR") if res.message: lines.append(res.message.strip()) if res.exit_code is not None: lines.append(f"exit_code: {res.exit_code}") if res.stdout.strip(): lines.append("") lines.append("stdout:") lines.append(res.stdout.rstrip()) if res.stderr.strip() and res.stderr.strip() != res.stdout.strip(): lines.append("") lines.append("stderr:") lines.append(res.stderr.rstrip()) return "\n".join(lines).strip() + "\n" def _pretty_cmd_then_status(self, res: CmdResult, st: VpnStatus) -> str: return ( self._pretty_cmd(res).rstrip() + "\n\n" + self._pretty_vpn_status(st).rstrip() + "\n" ) def _clean_login_lines(self, lines: Iterable[str]) -> List[str]: out: List[str] = [] for raw in lines or []: if raw is None: continue s = str(raw).replace("\r", "\n") for part in s.splitlines(): t = part.strip() if not t: continue # Вырезаем спам "Next check in ...". t2 = _NEXT_CHECK_RE.sub("", t).strip() if not t2: continue # На всякий повторно. t2 = _NEXT_CHECK_RE.sub("", t2).strip() if not t2: continue out.append(t2) return out