Files
elmprodvpn/selective-vpn-gui/dashboard_controller.py
2026-02-16 01:36:42 +03:00

913 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DashboardController
Тонкий "мозг" между UI и ApiClient.
UI не должен знать URL'ы / JSON, только вызывать методы этого контроллера.
"""
from __future__ import annotations
from dataclasses import dataclass
import os
import re
from typing import Iterable, List, Literal, Optional, cast
# вырезаем спам автопроверки из логов (CLI любит писать "Next check in ...")
_NEXT_CHECK_RE = re.compile(
r"(?:\b\d+s\.)?\s*Next check in\s+\d+s\.?", re.IGNORECASE
)
from api_client import (
ApiClient,
CmdResult,
DNSStatus,
DnsUpstreams,
DomainsFile,
DomainsTable,
Event,
LoginState,
Status,
TrafficCandidates,
TrafficAppMarksResult,
TrafficAppMarksStatus,
TrafficAppMarkItem,
TrafficAppProfile,
TrafficAppProfileSaveResult,
TrafficAudit,
TrafficInterfaces,
TrafficModeStatus,
TraceDump,
UnitState,
VpnLocation,
VpnStatus,
SmartdnsRuntimeState,
# login flow models
LoginSessionStart,
LoginSessionState,
LoginSessionAction,
)
TraceMode = Literal["full", "gui", "smartdns"]
ServiceAction = Literal["start", "stop", "restart"]
LoginAction = Literal["open", "check", "cancel"]
# ---------------------------
# View models (UI-friendly)
# ---------------------------
@dataclass(frozen=True)
class LoginView:
text: str
color: str
logged_in: bool
email: str
@dataclass(frozen=True)
class StatusOverviewView:
timestamp: str
counts: str
iface_table_mark: str
policy_route: str
routes_service: str
smartdns_service: str
vpn_service: str
@dataclass(frozen=True)
class VpnStatusView:
desired_location: str
pretty_text: str
@dataclass(frozen=True)
class ActionView:
ok: bool
pretty_text: str
@dataclass(frozen=True)
class LoginFlowView:
phase: str
level: str
dot_color: str
status_text: str
url: str
email: str
alive: bool
cursor: int
lines: List[str]
can_open: bool
can_check: bool
can_cancel: bool
@dataclass(frozen=True)
class VpnAutoconnectView:
"""Для блока Autoconnect на вкладке AdGuardVPN."""
enabled: bool # True = включён autoloop
unit_text: str # строка вида "unit: active"
color: str # "green" / "red" / "orange"
@dataclass(frozen=True)
class RoutesNftProgressView:
"""Прогресс обновления nft-наборов (agvpn4)."""
percent: int
message: str
active: bool # True — пока идёт апдейт, False — когда закончили / ничего не идёт
@dataclass(frozen=True)
class TrafficModeView:
desired_mode: str
applied_mode: str
preferred_iface: str
auto_local_bypass: bool
bypass_candidates: int
force_vpn_subnets: List[str]
force_vpn_uids: List[str]
force_vpn_cgroups: List[str]
force_direct_subnets: List[str]
force_direct_uids: List[str]
force_direct_cgroups: List[str]
overrides_applied: int
cgroup_resolved_uids: int
cgroup_warning: str
active_iface: str
iface_reason: str
probe_ok: bool
probe_message: str
healthy: bool
message: str
# ---------------------------
# Controller
# ---------------------------
class DashboardController:
def __init__(
self,
client: ApiClient,
*,
routes_unit: Optional[str] = None,
smartdns_unit: Optional[str] = None,
) -> None:
self.client = client
self.routes_unit = (
routes_unit
or os.environ.get("SELECTIVE_VPN_ROUTES_UNIT")
or ""
)
self.smartdns_unit = (
smartdns_unit
or os.environ.get("SELECTIVE_VPN_SMARTDNS_UNIT")
or "smartdns-local.service"
)
# -------- logging --------
def log_gui(self, msg: str) -> None:
self.client.trace_append("gui", msg)
def log_smartdns(self, msg: str) -> None:
self.client.trace_append("smartdns", msg)
# -------- events stream --------
def iter_events(self, since: int = 0, stop=None):
return self.client.events_stream(since=since, stop=stop)
def classify_event(self, ev: Event) -> List[str]:
"""Return list of areas to refresh for given event kind."""
k = (ev.kind or "").strip().lower()
if not k:
return []
if k in ("status_changed", "status_error"):
return ["status", "routes", "vpn"]
if k in ("login_state_changed", "login_state_error"):
return ["login", "vpn"]
if k == "autoloop_status_changed":
return ["vpn"]
if k == "unit_state_changed":
return ["status", "vpn", "routes", "dns"]
if k in ("trace_changed", "trace_append"):
return ["trace"]
if k == "routes_nft_progress":
# перерисовать блок "routes" (кнопки + прогресс)
return ["routes"]
if k == "traffic_mode_changed":
return ["routes", "status"]
if k == "traffic_profiles_changed":
# Used by Traffic mode dialog (Apps/runtime) for persistent app profiles.
return ["routes"]
return []
# -------- helpers --------
def _is_logged_in_state(self, st: LoginState) -> bool:
# backend “state” может быть любым, делаем устойчивую проверку
s = (st.state or "").strip().lower()
if st.email:
return True
if s in ("ok", "logged", "logged_in", "success", "authorized", "ready"):
return True
return False
def _level_to_color(self, level: str) -> str:
lv = (level or "").strip().lower()
if lv in ("green", "ok", "true", "success"):
return "green"
if lv in ("red", "error", "false", "failed"):
return "red"
return "orange"
# -------- overview / status --------
def get_login_view(self) -> LoginView:
st: LoginState = self.client.get_login_state()
# Prefer backend UI-ready "text" if provided, else build it.
if st.text:
txt = st.text
else:
if st.email:
txt = f"AdGuard VPN: logged in as {st.email}"
else:
txt = "AdGuard VPN: (no login data)"
logged_in = self._is_logged_in_state(st)
# Цвет: либо из backend, либо простой нормализованный вариант
if st.color:
color = st.color
else:
if logged_in:
color = "green"
else:
s = (st.state or "").strip().lower()
color = "orange" if s in ("unknown", "checking") else "red"
return LoginView(
text=txt,
color=color,
logged_in=logged_in,
email=st.email or "",
)
def get_status_overview(self) -> StatusOverviewView:
st: Status = self.client.get_status()
routes_unit = self._resolve_routes_unit(st.iface)
routes_s: UnitState = (
self.client.systemd_state(routes_unit)
if routes_unit
else UnitState(state="unknown")
)
smartdns_s: UnitState = self.client.systemd_state(self.smartdns_unit)
vpn_st: VpnStatus = self.client.vpn_status()
counts = f"domains={st.domain_count}, ips={st.ip_count}"
iface = f"iface={st.iface} table={st.table} mark={st.mark}"
policy_route = self._format_policy_route(st.policy_route_ok, st.route_ok)
# SmartDNS: если state пустой/unknown — считаем это ошибкой
smart_state = smartdns_s.state or "unknown"
if smart_state.lower() in ("", "unknown", "failed"):
smart_state = "ERROR (unknown state)"
return StatusOverviewView(
timestamp=st.timestamp or "",
counts=counts,
iface_table_mark=iface,
policy_route=policy_route,
routes_service=f"{routes_unit or 'selective-vpn2@<auto>.service'}: {routes_s.state}",
smartdns_service=f"{self.smartdns_unit}: {smart_state}",
# это состояние самого VPN-юнита, НЕ autoloop:
# т.е. работает ли AdGuardVPN-daemon / туннель
vpn_service=f"VPN: {vpn_st.unit_state}",
)
def _format_policy_route(
self,
policy_ok: Optional[bool],
route_ok: Optional[bool],
) -> str:
if policy_ok is None and route_ok is None:
return "unknown (not checked)"
val = policy_ok if policy_ok is not None else route_ok
if val is True:
return "OK (default route present in VPN table)"
return "MISSING default route in VPN table"
def _resolve_routes_unit(self, iface: str) -> str:
forced = (self.routes_unit or "").strip()
if forced:
return forced
ifc = (iface or "").strip()
if ifc and ifc != "-":
return f"selective-vpn2@{ifc}.service"
return ""
# -------- VPN --------
def vpn_locations_view(self) -> List[VpnLocation]:
return self.client.vpn_locations()
def vpn_status_view(self) -> VpnStatusView:
st = self.client.vpn_status()
pretty = self._pretty_vpn_status(st)
return VpnStatusView(
desired_location=st.desired_location,
pretty_text=pretty,
)
# --- autoconnect / autoloop ---
def _autoconnect_from_auto(self, auto) -> bool:
"""
Вытаскиваем True/False из ответа /vpn/autoloop/status.
Приоритет:
1) явное поле auto.enabled (bool)
2) эвристика по status_word / raw_text
"""
enabled_field = getattr(auto, "enabled", None)
if isinstance(enabled_field, bool):
return enabled_field
word = (getattr(auto, "status_word", "") or "").strip().lower()
raw = (getattr(auto, "raw_text", "") or "").lower()
# приоритет — явные статусы
if word in (
"active",
"running",
"enabled",
"on",
"up",
"started",
"ok",
"true",
"yes",
):
return True
if word in ("inactive", "stopped", "disabled", "off", "down", "false", "no"):
return False
# фоллбек — по raw_text
if "inactive" in raw or "disabled" in raw or "failed" in raw:
return False
if "active" in raw or "running" in raw or "enabled" in raw:
return True
return False
def vpn_autoconnect_view(self) -> VpnAutoconnectView:
try:
auto = self.client.vpn_autoloop_status()
except Exception as e:
return VpnAutoconnectView(
enabled=False,
unit_text=f"unit: ERROR ({e})",
color="red",
)
enabled = self._autoconnect_from_auto(auto)
unit_state = (
getattr(auto, "unit_state", "") # если backend так отдаёт
or (auto.status_word or "")
or "unknown"
)
text = f"unit: {unit_state}"
low = f"{unit_state} {(auto.raw_text or '')}".lower()
if any(x in low for x in ("failed", "error", "unknown", "inactive", "dead")):
color = "red"
elif "active" in low or "running" in low or "enabled" in low:
color = "green"
else:
color = "orange"
return VpnAutoconnectView(enabled=enabled, unit_text=text, color=color)
def vpn_autoconnect_enabled(self) -> bool:
"""Старый интерфейс — оставляем для кнопки toggle."""
return self.vpn_autoconnect_view().enabled
def vpn_set_autoconnect(self, enable: bool) -> VpnStatusView:
res = self.client.vpn_autoconnect(enable)
st = self.client.vpn_status()
pretty = self._pretty_cmd_then_status(res, st)
return VpnStatusView(
desired_location=st.desired_location,
pretty_text=pretty,
)
def vpn_set_location(self, iso: str) -> VpnStatusView:
self.client.vpn_set_location(iso)
st = self.client.vpn_status()
pretty = self._pretty_vpn_status(st)
return VpnStatusView(
desired_location=st.desired_location,
pretty_text=pretty,
)
def _pretty_vpn_status(self, st: VpnStatus) -> str:
lines = [
f"unit_state: {st.unit_state}",
f"desired_location: {st.desired_location or ''}",
f"status: {st.status_word}",
]
if st.raw_text:
lines.append("")
lines.append(st.raw_text.strip())
return "\n".join(lines).strip() + "\n"
# -------- Login Flow (interactive) --------
def login_flow_start(self) -> LoginFlowView:
s: LoginSessionStart = self.client.vpn_login_session_start()
dot = self._level_to_color(s.level)
if not s.ok:
txt = s.error or "Failed to start login session"
return LoginFlowView(
phase=s.phase or "failed",
level=s.level or "red",
dot_color="red",
status_text=txt,
url="",
email="",
alive=False,
cursor=0,
lines=[txt],
can_open=False,
can_check=False,
can_cancel=False,
)
if (s.phase or "").lower() == "already_logged":
txt = (
f"Already logged in as {s.email}"
if s.email
else "Already logged in"
)
return LoginFlowView(
phase="already_logged",
level="green",
dot_color="green",
status_text=txt,
url="",
email=s.email or "",
alive=False,
cursor=0,
lines=[txt],
can_open=False,
can_check=False,
can_cancel=False,
)
txt = f"Login started (pid={s.pid})" if s.pid else "Login started"
return LoginFlowView(
phase=s.phase or "starting",
level=s.level or "yellow",
dot_color=dot,
status_text=txt,
url="",
email="",
alive=True,
cursor=0,
lines=[],
can_open=True,
can_check=True,
can_cancel=True,
)
def login_flow_poll(self, since: int) -> LoginFlowView:
st: LoginSessionState = self.client.vpn_login_session_state(since=since)
dot = self._level_to_color(st.level)
phase = (st.phase or "").lower()
if phase == "waiting_browser":
status_txt = "Waiting for browser authorization…"
elif phase == "checking":
status_txt = "Checking…"
elif phase == "success":
status_txt = "✅ Logged in"
elif phase == "failed":
status_txt = "❌ Login failed"
elif phase == "cancelled":
status_txt = "Cancelled"
elif phase == "already_logged":
status_txt = (
f"Already logged in as {st.email}"
if st.email
else "Already logged in"
)
else:
status_txt = st.phase or ""
clean_lines = self._clean_login_lines(st.lines)
return LoginFlowView(
phase=st.phase,
level=st.level,
dot_color=dot,
status_text=status_txt,
url=st.url,
email=st.email,
alive=st.alive,
cursor=st.cursor,
can_open=st.can_open,
can_check=st.can_cancel,
can_cancel=st.can_cancel,
lines=clean_lines,
)
def login_flow_action(self, action: str) -> ActionView:
act = action.strip().lower()
if act not in ("open", "check", "cancel"):
raise ValueError(f"Invalid login action: {action}")
res: LoginSessionAction = self.client.vpn_login_session_action(
cast(LoginAction, act)
)
if not res.ok:
txt = res.error or "Login action failed"
return ActionView(ok=False, pretty_text=txt + "\n")
txt = f"OK: {act} → phase={res.phase} level={res.level}"
return ActionView(ok=True, pretty_text=txt + "\n")
def login_flow_stop(self) -> ActionView:
res = self.client.vpn_login_session_stop()
return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))
def vpn_logout(self) -> ActionView:
res = self.client.vpn_logout()
return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))
# Баннер "AdGuard VPN: logged in as ...", по клику показываем инфу как в CLI
def login_banner_cli_text(self) -> str:
try:
st: LoginState = self.client.get_login_state()
except Exception as e:
return f"Failed to query login state: {e}"
# backend может не иметь поля error, поэтому через getattr
err = getattr(st, "error", None) or getattr(st, "message", None)
if err:
return str(err)
if st.email:
return f"You are already logged in.\nCurrent user is {st.email}"
if st.state:
return f"Login state: {st.state}"
return "No login information available."
# -------- Routes --------
def routes_service_action(self, action: str) -> ActionView:
act = action.strip().lower()
if act not in ("start", "stop", "restart"):
raise ValueError(f"Invalid routes action: {action}")
res = self.client.routes_service(cast(ServiceAction, act))
return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))
def routes_clear(self) -> ActionView:
res = self.client.routes_clear()
return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))
def routes_cache_restore(self) -> ActionView:
res = self.client.routes_cache_restore()
return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))
def routes_fix_policy_route(self) -> ActionView:
res = self.client.routes_fix_policy_route()
return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))
def routes_timer_enabled(self) -> bool:
st = self.client.routes_timer_get()
return bool(st.enabled)
def routes_timer_set(self, enabled: bool) -> ActionView:
res = self.client.routes_timer_set(bool(enabled))
return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))
def traffic_mode_view(self) -> TrafficModeView:
st: TrafficModeStatus = self.client.traffic_mode_get()
return TrafficModeView(
desired_mode=(st.desired_mode or st.mode or "selective"),
applied_mode=(st.applied_mode or "direct"),
preferred_iface=st.preferred_iface or "",
auto_local_bypass=bool(st.auto_local_bypass),
bypass_candidates=int(st.bypass_candidates),
force_vpn_subnets=list(st.force_vpn_subnets or []),
force_vpn_uids=list(st.force_vpn_uids or []),
force_vpn_cgroups=list(st.force_vpn_cgroups or []),
force_direct_subnets=list(st.force_direct_subnets or []),
force_direct_uids=list(st.force_direct_uids or []),
force_direct_cgroups=list(st.force_direct_cgroups or []),
overrides_applied=int(st.overrides_applied),
cgroup_resolved_uids=int(st.cgroup_resolved_uids),
cgroup_warning=st.cgroup_warning or "",
active_iface=st.active_iface or "",
iface_reason=st.iface_reason or "",
probe_ok=bool(st.probe_ok),
probe_message=st.probe_message or "",
healthy=bool(st.healthy),
message=st.message or "",
)
def traffic_mode_set(
self,
mode: str,
preferred_iface: Optional[str] = None,
auto_local_bypass: Optional[bool] = None,
force_vpn_subnets: Optional[List[str]] = None,
force_vpn_uids: Optional[List[str]] = None,
force_vpn_cgroups: Optional[List[str]] = None,
force_direct_subnets: Optional[List[str]] = None,
force_direct_uids: Optional[List[str]] = None,
force_direct_cgroups: Optional[List[str]] = None,
) -> TrafficModeView:
st: TrafficModeStatus = self.client.traffic_mode_set(
mode,
preferred_iface,
auto_local_bypass,
force_vpn_subnets,
force_vpn_uids,
force_vpn_cgroups,
force_direct_subnets,
force_direct_uids,
force_direct_cgroups,
)
return TrafficModeView(
desired_mode=(st.desired_mode or st.mode or mode),
applied_mode=(st.applied_mode or "direct"),
preferred_iface=st.preferred_iface or "",
auto_local_bypass=bool(st.auto_local_bypass),
bypass_candidates=int(st.bypass_candidates),
force_vpn_subnets=list(st.force_vpn_subnets or []),
force_vpn_uids=list(st.force_vpn_uids or []),
force_vpn_cgroups=list(st.force_vpn_cgroups or []),
force_direct_subnets=list(st.force_direct_subnets or []),
force_direct_uids=list(st.force_direct_uids or []),
force_direct_cgroups=list(st.force_direct_cgroups or []),
overrides_applied=int(st.overrides_applied),
cgroup_resolved_uids=int(st.cgroup_resolved_uids),
cgroup_warning=st.cgroup_warning or "",
active_iface=st.active_iface or "",
iface_reason=st.iface_reason or "",
probe_ok=bool(st.probe_ok),
probe_message=st.probe_message or "",
healthy=bool(st.healthy),
message=st.message or "",
)
def traffic_mode_test(self) -> TrafficModeView:
st: TrafficModeStatus = self.client.traffic_mode_test()
return TrafficModeView(
desired_mode=(st.desired_mode or st.mode or "selective"),
applied_mode=(st.applied_mode or "direct"),
preferred_iface=st.preferred_iface or "",
auto_local_bypass=bool(st.auto_local_bypass),
bypass_candidates=int(st.bypass_candidates),
force_vpn_subnets=list(st.force_vpn_subnets or []),
force_vpn_uids=list(st.force_vpn_uids or []),
force_vpn_cgroups=list(st.force_vpn_cgroups or []),
force_direct_subnets=list(st.force_direct_subnets or []),
force_direct_uids=list(st.force_direct_uids or []),
force_direct_cgroups=list(st.force_direct_cgroups or []),
overrides_applied=int(st.overrides_applied),
cgroup_resolved_uids=int(st.cgroup_resolved_uids),
cgroup_warning=st.cgroup_warning or "",
active_iface=st.active_iface or "",
iface_reason=st.iface_reason or "",
probe_ok=bool(st.probe_ok),
probe_message=st.probe_message or "",
healthy=bool(st.healthy),
message=st.message or "",
)
def traffic_interfaces(self) -> List[str]:
st: TrafficInterfaces = self.client.traffic_interfaces_get()
vals = [x for x in st.interfaces if x]
if st.preferred_iface and st.preferred_iface not in vals:
vals.insert(0, st.preferred_iface)
return vals
def traffic_candidates(self) -> TrafficCandidates:
return self.client.traffic_candidates_get()
def traffic_appmarks_status(self) -> TrafficAppMarksStatus:
return self.client.traffic_appmarks_status()
def traffic_appmarks_items(self) -> List[TrafficAppMarkItem]:
return self.client.traffic_appmarks_items()
def traffic_appmarks_apply(
self,
*,
op: str,
target: str,
cgroup: str = "",
unit: str = "",
command: str = "",
app_key: str = "",
timeout_sec: int = 0,
) -> TrafficAppMarksResult:
return self.client.traffic_appmarks_apply(
op=op,
target=target,
cgroup=cgroup,
unit=unit,
command=command,
app_key=app_key,
timeout_sec=timeout_sec,
)
def traffic_app_profiles_list(self) -> List[TrafficAppProfile]:
return self.client.traffic_app_profiles_list()
def traffic_app_profile_upsert(
self,
*,
id: str = "",
name: str = "",
app_key: str = "",
command: str,
target: str,
ttl_sec: int = 0,
vpn_profile: str = "",
) -> TrafficAppProfileSaveResult:
return self.client.traffic_app_profile_upsert(
id=id,
name=name,
app_key=app_key,
command=command,
target=target,
ttl_sec=ttl_sec,
vpn_profile=vpn_profile,
)
def traffic_app_profile_delete(self, id: str) -> CmdResult:
return self.client.traffic_app_profile_delete(id)
def traffic_audit(self) -> TrafficAudit:
return self.client.traffic_audit_get()
def routes_nft_progress_from_event(self, ev: Event) -> RoutesNftProgressView:
"""
Превращает Event(kind='routes_nft_progress') в удобную модель
для прогресс-бара/лейбла.
"""
payload = (
getattr(ev, "data", None)
or getattr(ev, "payload", None)
or getattr(ev, "extra", None)
or {}
)
if not isinstance(payload, dict):
payload = {}
try:
percent = int(payload.get("percent", 0))
except Exception:
percent = 0
msg = str(payload.get("message", "")) if payload is not None else ""
if not msg:
msg = "Updating nft set…"
active = 0 <= percent < 100
return RoutesNftProgressView(
percent=percent,
message=msg,
active=active,
)
# -------- DNS / SmartDNS --------
def dns_upstreams_view(self) -> DnsUpstreams:
return self.client.dns_upstreams_get()
def dns_upstreams_save(self, cfg: DnsUpstreams) -> None:
self.client.dns_upstreams_set(cfg)
def dns_status_view(self) -> DNSStatus:
return self.client.dns_status_get()
def dns_mode_set(self, via: bool, smartdns_addr: str) -> DNSStatus:
return self.client.dns_mode_set(via, smartdns_addr)
def smartdns_service_action(self, action: str) -> DNSStatus:
act = action.strip().lower()
if act not in ("start", "stop", "restart"):
raise ValueError(f"Invalid SmartDNS action: {action}")
return self.client.dns_smartdns_service_set(cast(ServiceAction, act))
def smartdns_prewarm(self, limit: int = 0, aggressive_subs: bool = False) -> ActionView:
res = self.client.smartdns_prewarm(limit=limit, aggressive_subs=aggressive_subs)
return ActionView(ok=res.ok, pretty_text=self._pretty_cmd(res))
def smartdns_runtime_view(self) -> SmartdnsRuntimeState:
return self.client.smartdns_runtime_get()
def smartdns_runtime_set(self, enabled: bool, restart: bool = True) -> SmartdnsRuntimeState:
return self.client.smartdns_runtime_set(enabled=enabled, restart=restart)
# -------- Domains --------
def domains_table_view(self) -> DomainsTable:
return self.client.domains_table()
def domains_file_load(self, name: str) -> DomainsFile:
nm = name.strip().lower()
if nm not in ("bases", "meta", "subs", "static", "smartdns", "last-ips-map", "last-ips-map-direct", "last-ips-map-wildcard"):
raise ValueError(f"Invalid domains file name: {name}")
return self.client.domains_file_get(
cast(Literal["bases", "meta", "subs", "static", "smartdns", "last-ips-map", "last-ips-map-direct", "last-ips-map-wildcard"], nm)
)
def domains_file_save(self, name: str, content: str) -> None:
nm = name.strip().lower()
if nm not in ("bases", "meta", "subs", "static", "smartdns", "last-ips-map", "last-ips-map-direct", "last-ips-map-wildcard"):
raise ValueError(f"Invalid domains file name: {name}")
self.client.domains_file_set(
cast(Literal["bases", "meta", "subs", "static", "smartdns", "last-ips-map", "last-ips-map-direct", "last-ips-map-wildcard"], nm), content
)
# -------- Trace --------
def trace_view(self, mode: TraceMode = "full") -> TraceDump:
return self.client.trace_get(mode)
# -------- formatting helpers --------
def _pretty_cmd(self, res: CmdResult) -> str:
lines: List[str] = []
lines.append("OK" if res.ok else "ERROR")
if res.message:
lines.append(res.message.strip())
if res.exit_code is not None:
lines.append(f"exit_code: {res.exit_code}")
if res.stdout.strip():
lines.append("")
lines.append("stdout:")
lines.append(res.stdout.rstrip())
if res.stderr.strip() and res.stderr.strip() != res.stdout.strip():
lines.append("")
lines.append("stderr:")
lines.append(res.stderr.rstrip())
return "\n".join(lines).strip() + "\n"
def _pretty_cmd_then_status(self, res: CmdResult, st: VpnStatus) -> str:
return (
self._pretty_cmd(res).rstrip()
+ "\n\n"
+ self._pretty_vpn_status(st).rstrip()
+ "\n"
)
def _clean_login_lines(self, lines: Iterable[str]) -> List[str]:
out: List[str] = []
for raw in lines or []:
if raw is None:
continue
s = str(raw).replace("\r", "\n")
for part in s.splitlines():
t = part.strip()
if not t:
continue
# вырезаем спам "Next check in ..."
t2 = _NEXT_CHECK_RE.sub("", t).strip()
if not t2:
continue
# на всякий — повторно
t2 = _NEXT_CHECK_RE.sub("", t2).strip()
if not t2:
continue
out.append(t2)
return out