Files
elmprodvpn/selective-vpn-gui/api_client.py

1598 lines
58 KiB
Python

#!/usr/bin/env python3
"""Selective-VPN API client (UI-agnostic).
Design goals:
- The dashboard (GUI) must NOT know any URLs, HTTP methods, JSON keys, or payload shapes.
- All REST details live here.
- Returned values are normalized into dataclasses for clean UI usage.
Env:
- SELECTIVE_VPN_API (default: http://127.0.0.1:8080)
This file is meant to be imported by a controller (dashboard_controller.py) and UI.
"""
from __future__ import annotations
from dataclasses import dataclass
import json
import os
import re
import time
from typing import Any, Callable, Dict, Iterator, List, Literal, Optional, cast
import requests
# ---------------------------
# Small utilities
# ---------------------------
_ANSI_RE = re.compile(r"\x1B\[[0-9;]*[A-Za-z]")


def strip_ansi(s: str) -> str:
    """Return *s* without ANSI CSI escape sequences.

    Falsy input (``""`` or ``None``) is normalized to an empty string.
    """
    return _ANSI_RE.sub("", s) if s else ""
# ---------------------------
# Models (UI-friendly)
# ---------------------------
# Routing status snapshot built by ApiClient.get_status() from GET /api/v1/status.
# get_status() normalizes missing strings to "" and missing counters to 0.
@dataclass(frozen=True)
class Status:
timestamp: str
ip_count: int
domain_count: int
iface: str
table: str
mark: str
# NOTE: backend uses omitempty for these, so they may be absent.
# get_status() passes them through unchanged, so None means "not reported".
policy_route_ok: Optional[bool]
route_ok: Optional[bool]
# Generic outcome of a backend command.
# Within this excerpt it is built directly by routes_service() (timeout path) and
# traffic_app_profile_delete(); the other routes_* methods delegate to
# _parse_cmd_result(), which is defined outside this excerpt.
@dataclass(frozen=True)
class CmdResult:
ok: bool
message: str
# None when no exit status is available (both direct builders above pass None).
exit_code: Optional[int] = None
stdout: str = ""
stderr: str = ""
# VPN login state built by ApiClient.get_login_state() from GET /api/v1/vpn/login-state.
# email, msg and text have ANSI escape sequences stripped; state falls back to "unknown".
@dataclass(frozen=True)
class LoginState:
state: str
email: str
msg: str
# backend may also provide UI-ready fields
text: str
color: str
# State string of a systemd unit, built by ApiClient.systemd_state();
# that method substitutes "unknown" when the backend reports nothing.
@dataclass(frozen=True)
class UnitState:
state: str
# Result of ApiClient.routes_timer_get() (GET /api/v1/routes/timer);
# enabled is False when the backend omits the field.
@dataclass(frozen=True)
class RoutesTimerState:
enabled: bool
# Traffic-mode status returned by traffic_mode_get(), traffic_mode_set(),
# traffic_mode_test() and traffic_advanced_reset().
# All four parsers fill missing fields with defaults: "" for strings, 0 for
# counters, False for flags (auto_local_bypass defaults to True),
# "direct" for applied_mode, and "selective" (or the requested mode in
# traffic_mode_set) for mode/desired_mode.
@dataclass(frozen=True)
class TrafficModeStatus:
mode: str
desired_mode: str
applied_mode: str
preferred_iface: str
advanced_active: bool
auto_local_bypass: bool
auto_local_active: bool
ingress_reply_bypass: bool
ingress_reply_active: bool
bypass_candidates: int
# force_* lists: entries are stringified and blank entries are dropped by the parsers.
force_vpn_subnets: List[str]
force_vpn_uids: List[str]
force_vpn_cgroups: List[str]
force_direct_subnets: List[str]
force_direct_uids: List[str]
force_direct_cgroups: List[str]
overrides_applied: int
cgroup_resolved_uids: int
cgroup_warning: str
active_iface: str
iface_reason: str
rule_mark: bool
rule_full: bool
ingress_rule_present: bool
ingress_nft_active: bool
table_default: bool
probe_ok: bool
probe_message: str
healthy: bool
message: str
# Interface listing built by traffic_interfaces_get() (GET /api/v1/traffic/interfaces).
# interfaces contains only non-blank names; the other fields default to "".
@dataclass(frozen=True)
class TrafficInterfaces:
interfaces: List[str]
preferred_iface: str
active_iface: str
iface_reason: str
# Summary built by traffic_appmarks_status() (GET /api/v1/traffic/appmarks);
# counts default to 0 and message to "".
@dataclass(frozen=True)
class TrafficAppMarksStatus:
vpn_count: int
direct_count: int
message: str
# Result of traffic_appmarks_apply() (POST /api/v1/traffic/appmarks).
# op, target and cgroup fall back to the values that were sent when the
# response omits them; numeric fields default to 0.
@dataclass(frozen=True)
class TrafficAppMarksResult:
ok: bool
message: str
op: str = ""
target: str = ""
cgroup: str = ""
cgroup_id: int = 0
timeout_sec: int = 0
# One entry of the list returned by traffic_appmarks_items().
# That parser only emits items with id > 0 and target in ("vpn", "direct");
# string fields are stripped and missing integers default to 0.
@dataclass(frozen=True)
class TrafficAppMarkItem:
id: int
target: str # vpn|direct
cgroup: str
cgroup_rel: str
level: int
unit: str
command: str
app_key: str
added_at: str
expires_at: str
remaining_sec: int
# Saved application profile, built by traffic_app_profiles_list() and
# traffic_app_profile_upsert(). Both parsers skip entries without an id,
# strip string fields, lower-case target and default ttl_sec to 0.
@dataclass(frozen=True)
class TrafficAppProfile:
id: str
name: str
app_key: str
command: str
target: str # vpn|direct
ttl_sec: int
vpn_profile: str
created_at: str
updated_at: str
# Result of traffic_app_profile_upsert().
# profile is the first profile echoed back by the backend, or None when the
# response carried no usable profile.
@dataclass(frozen=True)
class TrafficAppProfileSaveResult:
ok: bool
message: str
profile: Optional[TrafficAppProfile] = None
# Audit report built by traffic_audit_get() (GET /api/v1/traffic/audit).
# message, pretty and every issue have ANSI escape sequences stripped.
@dataclass(frozen=True)
class TrafficAudit:
ok: bool
message: str
now: str
pretty: str
issues: List[str]
# Subnet candidate produced by traffic_candidates_get();
# entries with an empty cidr are skipped by that parser.
@dataclass(frozen=True)
class TrafficCandidateSubnet:
cidr: str
dev: str
kind: str
linkdown: bool
# Unit candidate produced by traffic_candidates_get();
# entries with an empty unit name are skipped by that parser.
@dataclass(frozen=True)
class TrafficCandidateUnit:
unit: str
description: str
cgroup: str
# UID candidate produced by traffic_candidates_get();
# entries whose uid cannot be converted to int are skipped, and examples
# keeps only non-blank entries.
@dataclass(frozen=True)
class TrafficCandidateUID:
uid: int
user: str
examples: List[str]
# Aggregate returned by traffic_candidates_get() (GET /api/v1/traffic/candidates).
@dataclass(frozen=True)
class TrafficCandidates:
generated_at: str
subnets: List[TrafficCandidateSubnet]
units: List[TrafficCandidateUnit]
uids: List[TrafficCandidateUID]
# The four upstream slots read by dns_upstreams_get() and written by
# dns_upstreams_set() via /api/v1/dns-upstreams; missing values become "".
@dataclass(frozen=True)
class DnsUpstreams:
default1: str
default2: str
meta1: str
meta2: str
# One upstream address with an on/off flag. Used as the item type of
# DNSUpstreamPoolState and as input to dns_upstream_pool_set()/dns_benchmark().
# The pool parsers treat a missing "enabled" key as True.
@dataclass(frozen=True)
class DNSBenchmarkUpstream:
addr: str
enabled: bool = True
# Per-upstream benchmark row; item type of DNSBenchmarkResponse.results.
# NOTE(review): the code that fills this is outside this excerpt; the *_ms
# fields are presumably milliseconds — confirm against the parser.
@dataclass(frozen=True)
class DNSBenchmarkResult:
upstream: str
attempts: int
ok: int
fail: int
nxdomain: int
timeout: int
temporary: int
other: int
avg_ms: int
p95_ms: int
score: float
color: str
# Return type of ApiClient.dns_benchmark() (POST /api/v1/dns/benchmark).
# NOTE(review): the response parsing lies outside this excerpt.
@dataclass(frozen=True)
class DNSBenchmarkResponse:
results: List[DNSBenchmarkResult]
domains_used: List[str]
timeout_ms: int
attempts_per_domain: int
recommended_default: List[str]
recommended_meta: List[str]
# Upstream pool returned by dns_upstream_pool_get() and dns_upstream_pool_set();
# rows without an address are dropped by both parsers.
@dataclass(frozen=True)
class DNSUpstreamPoolState:
items: List[DNSBenchmarkUpstream]
# State string of the SmartDNS service.
# NOTE(review): the producing method is outside this excerpt.
@dataclass(frozen=True)
class SmartdnsServiceState:
state: str
# DNS status model.
# NOTE(review): the producing method is outside this excerpt, so field
# defaults and exact meanings should be confirmed there.
@dataclass(frozen=True)
class DNSStatus:
via_smartdns: bool
smartdns_addr: str
mode: str
unit_state: str
runtime_nftset: bool
wildcard_source: str
runtime_config_path: str
runtime_config_error: str
# SmartDNS runtime state model.
# NOTE(review): the producing method is outside this excerpt. The three
# defaulted fields (changed, restarted, message) may be omitted by callers.
@dataclass(frozen=True)
class SmartdnsRuntimeState:
enabled: bool
applied_enabled: bool
wildcard_source: str
unit_state: str
config_path: str
changed: bool = False
restarted: bool = False
message: str = ""
# Domains table as a list of text lines.
# NOTE(review): the producing method is outside this excerpt.
@dataclass(frozen=True)
class DomainsTable:
lines: List[str]
# Named domains file with its text content; source defaults to "".
# NOTE(review): the producing method is outside this excerpt.
@dataclass(frozen=True)
class DomainsFile:
name: str
content: str
source: str = ""
# VPN autoloop status: the raw text plus a short status word.
# NOTE(review): the producing method is outside this excerpt.
@dataclass(frozen=True)
class VpnAutoloopStatus:
raw_text: str
status_word: str
# VPN status model.
# NOTE(review): the producing method is outside this excerpt.
@dataclass(frozen=True)
class VpnStatus:
desired_location: str
status_word: str
raw_text: str
unit_state: str
# A selectable VPN location: display label plus an "iso" code.
# NOTE(review): presumably an ISO country code — confirm against the producer,
# which is outside this excerpt.
@dataclass(frozen=True)
class VpnLocation:
label: str
iso: str
# Trace output as a list of text lines.
# NOTE(review): the producing method is outside this excerpt.
@dataclass(frozen=True)
class TraceDump:
lines: List[str]
# One server-sent event, built by ApiClient._make_event().
# id is 0 when neither the SSE "id:" field nor the JSON payload supplies one;
# data is the payload's "data" member when the payload is a JSON object that
# has one, otherwise the whole decoded payload (or the raw text if it is not JSON).
@dataclass(frozen=True)
class Event:
id: int
kind: str
ts: str
data: Any
# ---------------------------
# AdGuard VPN interactive login-session (PTY)
# ---------------------------
# Result of starting an interactive login session.
# NOTE(review): the producing method is outside this excerpt.
@dataclass(frozen=True)
class LoginSessionStart:
ok: bool
phase: str
level: str
pid: Optional[int] = None
email: str = ""
error: str = ""
# Polled state of an interactive login session.
# NOTE(review): the producing method is outside this excerpt; cursor and lines
# look like an incremental log position plus new output lines — confirm there.
@dataclass(frozen=True)
class LoginSessionState:
ok: bool
phase: str
level: str
alive: bool
url: str
email: str
cursor: int
lines: List[str]
can_open: bool
can_check: bool
can_cancel: bool
# Result of an action performed on a login session.
# NOTE(review): the producing method is outside this excerpt.
@dataclass(frozen=True)
class LoginSessionAction:
ok: bool
phase: str = ""
level: str = ""
error: str = ""
# ---------------------------
# Errors
# ---------------------------
@dataclass(eq=False)
class ApiError(Exception):
    """Raised when an API call fails (network error or non-2xx response).

    Attributes:
        message: Short description ("API request failed" / "API returned error").
        method: Upper-cased HTTP method of the failed call.
        url: Full request URL.
        status_code: HTTP status for non-2xx responses; ``None`` for
            transport-level failures.
        response_text: Response body, or the text of the underlying
            exception for transport-level failures.

    The dataclass is deliberately *not* frozen: a frozen dataclass rejects
    every attribute assignment, including ``exc.__traceback__ = ...`` and
    ``exc.__cause__ = ...`` performed by Python-level machinery such as
    ``contextlib.contextmanager``, which would replace the real error with a
    ``FrozenInstanceError``. ``eq=False`` keeps the identity-based equality
    and hashing that exceptions normally have.
    """

    message: str
    method: str
    url: str
    status_code: Optional[int] = None
    response_text: str = ""

    def __post_init__(self) -> None:
        # Populate ``args`` even when the fields were passed by keyword so the
        # exception can be pickled/copied (Exception.__reduce__ replays args).
        super().__init__(
            self.message,
            self.method,
            self.url,
            self.status_code,
            self.response_text,
        )

    def __str__(self) -> str:
        code = f" ({self.status_code})" if self.status_code is not None else ""
        tail = f": {self.response_text}" if self.response_text else ""
        return f"{self.message}{code} [{self.method} {self.url}]{tail}"
# ---------------------------
# Client
# ---------------------------
# Closed sets of string values accepted by the client.
# ServiceAction is the parameter type of ApiClient.routes_service();
# TraceMode is not used within this excerpt.
TraceMode = Literal["full", "gui", "smartdns"]
ServiceAction = Literal["start", "stop", "restart"]
class ApiClient:
"""Domain API client.
Public methods here are the ONLY surface the dashboard/controller should use.
"""
def __init__(
self,
base_url: str,
*,
timeout: float = 5.0,
session: Optional[requests.Session] = None,
) -> None:
self.base_url = base_url.rstrip("/")
self.timeout = float(timeout)
self._s = session or requests.Session()
@classmethod
def from_env(
cls,
env_var: str = "SELECTIVE_VPN_API",
default: str = "http://127.0.0.1:8080",
*,
timeout: float = 5.0,
) -> "ApiClient":
base = os.environ.get(env_var, default).rstrip("/")
return cls(base, timeout=timeout)
# ---- low-level internals (private) ----
def _url(self, path: str) -> str:
if not path.startswith("/"):
path = "/" + path
return self.base_url + path
def _request(
self,
method: str,
path: str,
*,
params: Optional[Dict[str, Any]] = None,
json_body: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
accept_json: bool = True,
) -> requests.Response:
url = self._url(path)
headers: Dict[str, str] = {}
if accept_json:
headers["Accept"] = "application/json"
try:
resp = self._s.request(
method=method.upper(),
url=url,
params=params,
json=json_body,
timeout=self.timeout if timeout is None else float(timeout),
headers=headers,
)
except requests.RequestException as e:
raise ApiError("API request failed", method.upper(), url, None, str(e)) from e
if not (200 <= resp.status_code < 300):
txt = resp.text.strip()
raise ApiError("API returned error", method.upper(), url, resp.status_code, txt)
return resp
def _json(self, resp: requests.Response) -> Any:
if not resp.content:
return None
try:
return resp.json()
except ValueError:
# Backend should be JSON, but keep safe fallback.
return {"raw": resp.text}
# ---- event stream (SSE) ----
def events_stream(self, since: int = 0, stop: Optional[Callable[[], bool]] = None) -> Iterator[Event]:
"""
Iterate over server-sent events. Reconnects automatically on errors.
Args:
since: last seen event id (inclusive). Server will replay newer ones.
stop: optional callable returning True to stop streaming.
"""
last = max(0, int(since))
backoff = 1.0
while True:
if stop and stop():
return
try:
for ev in self._sse_once(last, stop):
if stop and stop():
return
last = ev.id if ev.id else last
yield ev
# normal end → reconnect
backoff = 1.0
except ApiError:
# bubble up API errors; caller decides
raise
except Exception:
# transient error, retry with backoff
time.sleep(backoff)
backoff = min(backoff * 2, 10.0)
def _sse_once(self, since: int, stop: Optional[Callable[[], bool]]) -> Iterator[Event]:
    """Open one SSE connection and yield its events until it ends.

    Args:
        since: Sent as the ``since`` query parameter when greater than 0.
        stop: Optional callable checked on every received line; iteration
            ends when it returns True.

    Raises:
        ApiError: If the connection cannot be opened or the server answers
            with a non-2xx status. Errors while reading the stream
            propagate as raised by ``requests``.
    """
    headers = {
        "Accept": "text/event-stream",
        "Cache-Control": "no-cache",
    }
    params: Dict[str, str] = {}
    if since > 0:
        params["since"] = str(since)
    url = self._url("/api/v1/events/stream")
    # The SSE connection is long-lived: the backend sends a heartbeat every
    # 15s, so use a longer read timeout; the default 5s would cause
    # spurious read errors.
    read_timeout = max(self.timeout * 3, 60.0)
    try:
        resp = self._s.request(
            method="GET",
            url=url,
            headers=headers,
            params=params,
            stream=True,
            timeout=(self.timeout, read_timeout),
        )
    except requests.RequestException as e:
        raise ApiError("API request failed", "GET", url, None, str(e)) from e
    # A streamed response keeps its connection checked out until it is closed,
    # so always close it - also when the consumer abandons the generator or a
    # read error occurs.
    try:
        if not (200 <= resp.status_code < 300):
            txt = resp.text.strip()
            raise ApiError("API returned error", "GET", url, resp.status_code, txt)
        # Event streams are UTF-8 by specification. Without an explicit
        # charset requests falls back to ISO-8859-1 for text/* content,
        # which garbles non-ASCII payloads.
        resp.encoding = "utf-8"
        ev_id: Optional[int] = None
        ev_kind: str = ""
        data_lines: List[str] = []
        for raw in resp.iter_lines(decode_unicode=True):
            if stop and stop():
                return
            if raw is None:
                continue
            line = raw.strip("\r")
            if line == "":
                # Blank line terminates the current event.
                if data_lines or ev_kind or ev_id is not None:
                    ev = self._make_event(ev_id, ev_kind, data_lines)
                    if ev:
                        yield ev
                ev_id = None
                ev_kind = ""
                data_lines = []
                continue
            if line.startswith(":"):
                # heartbeat/comment
                continue
            if line.startswith("id:"):
                try:
                    ev_id = int(line[3:].strip())
                except ValueError:
                    ev_id = None
                continue
            if line.startswith("event:"):
                ev_kind = line[6:].strip()
                continue
            if line.startswith("data:"):
                data_lines.append(line[5:].lstrip())
                continue
            # unknown field -> ignore
    finally:
        resp.close()
def _make_event(self, ev_id: Optional[int], ev_kind: str, data_lines: List[str]) -> Optional[Event]:
    """Assemble an ``Event`` from the fields collected for one SSE message.

    The ``data:`` lines are joined with newlines and decoded as JSON; text
    that is not JSON is kept verbatim. For a JSON object, ``id``, ``kind``
    and ``ts`` fall back to the object's own members and ``data`` is its
    ``"data"`` member (or the whole object when that is absent).
    """
    if not data_lines:
        payload: Any = None
    else:
        joined = "\n".join(data_lines)
        try:
            payload = json.loads(joined)
        except Exception:
            payload = joined

    if not isinstance(payload, dict):
        return Event(id=ev_id or 0, kind=ev_kind, ts="", data=payload)

    if ev_id is not None:
        event_id = ev_id
    else:
        try:
            event_id = int(payload.get("id", 0))
        except Exception:
            event_id = 0
    return Event(
        id=event_id,
        kind=ev_kind or str(payload.get("kind") or ""),
        ts=str(payload.get("ts") or ""),
        data=payload.get("data", payload),
    )
# ---- domain methods ----
# Status / system
def get_status(self) -> Status:
    """Fetch the routing status (GET /api/v1/status)."""
    resp = self._request("GET", "/api/v1/status")
    data = cast(Dict[str, Any], self._json(resp) or {})

    def text(key: str) -> str:
        return str(data.get(key) or "")

    def number(key: str) -> int:
        return int(data.get(key) or 0)

    return Status(
        timestamp=text("timestamp"),
        ip_count=number("ip_count"),
        domain_count=number("domain_count"),
        iface=text("iface"),
        table=text("table"),
        mark=text("mark"),
        # Optional on the wire: absent keys stay None.
        policy_route_ok=cast(Optional[bool], data.get("policy_route_ok", None)),
        route_ok=cast(Optional[bool], data.get("route_ok", None)),
    )
def systemd_state(self, unit: str) -> UnitState:
    """Query the state of systemd *unit* (2s timeout); "unknown" if not reported."""
    resp = self._request("GET", "/api/v1/systemd/state", params={"unit": unit}, timeout=2.0)
    data = cast(Dict[str, Any], self._json(resp) or {})
    reported = str(data.get("state") or "unknown").strip()
    return UnitState(state=reported if reported else "unknown")
def get_login_state(self) -> LoginState:
    """Fetch the VPN login state (2s timeout) with ANSI codes removed from text fields."""
    resp = self._request("GET", "/api/v1/vpn/login-state", timeout=2.0)
    data = cast(Dict[str, Any], self._json(resp) or {})

    def trimmed(key: str, fallback: str = "") -> str:
        return str(data.get(key) or fallback).strip()

    return LoginState(
        state=trimmed("state", "unknown"),
        email=strip_ansi(trimmed("email")),
        msg=strip_ansi(trimmed("msg")),
        text=strip_ansi(trimmed("text")),
        color=trimmed("color"),
    )
# Routes
def routes_service(self, action: ServiceAction) -> CmdResult:
    """Start, stop or restart the routes service.

    Args:
        action: One of "start", "stop", "restart" (case-insensitive).

    Returns:
        The parsed command result. If the backend accepted the request but
        did not answer within the 2s read timeout, an ``ok=True`` result
        stating that the action is still running is returned instead.

    Raises:
        ValueError: If *action* is not a supported value.
        ApiError: On connection failures (including connect timeouts) and
            non-2xx responses.
    """
    action_l = action.lower()
    if action_l not in ("start", "stop", "restart"):
        raise ValueError(f"Invalid action: {action}")
    url = self._url("/api/v1/routes/service")
    payload = {"action": action_l}
    try:
        # Short read timeout: if systemctl hangs for minutes we give up
        # waiting, but the server keeps executing (runCommand is not bound
        # to r.Context()).
        resp = self._s.post(
            url,
            json=payload,
            timeout=(self.timeout, 2.0),
            headers={"Accept": "application/json"},
        )
    except requests.ReadTimeout:
        # Only a *read* timeout means the request reached the backend.
        # A connect timeout must not be reported as success, so it falls
        # through to the generic handler below.
        return CmdResult(
            ok=True,
            message=f"{action_l} accepted; backend is still running systemctl",
            exit_code=None,
        )
    except requests.RequestException as e:
        raise ApiError("API request failed", "POST", url, None, str(e)) from e
    if not (200 <= resp.status_code < 300):
        txt = resp.text.strip()
        raise ApiError("API returned error", "POST", url, resp.status_code, txt)
    data = cast(Dict[str, Any], self._json(resp) or {})
    return self._parse_cmd_result(data)
def routes_clear(self) -> CmdResult:
    """POST /api/v1/routes/clear and return the parsed command result."""
    resp = self._request("POST", "/api/v1/routes/clear")
    body = cast(Dict[str, Any], self._json(resp) or {})
    return self._parse_cmd_result(body)
def routes_cache_restore(self) -> CmdResult:
    """POST /api/v1/routes/cache/restore and return the parsed command result."""
    resp = self._request("POST", "/api/v1/routes/cache/restore")
    body = cast(Dict[str, Any], self._json(resp) or {})
    return self._parse_cmd_result(body)
def routes_fix_policy_route(self) -> CmdResult:
    """POST /api/v1/routes/fix-policy-route and return the parsed command result."""
    resp = self._request("POST", "/api/v1/routes/fix-policy-route")
    body = cast(Dict[str, Any], self._json(resp) or {})
    return self._parse_cmd_result(body)
def routes_timer_get(self) -> RoutesTimerState:
    """Read the routes timer flag (GET /api/v1/routes/timer); False when absent."""
    resp = self._request("GET", "/api/v1/routes/timer")
    body = cast(Dict[str, Any], self._json(resp) or {})
    is_enabled = bool(body.get("enabled", False))
    return RoutesTimerState(enabled=is_enabled)
def routes_timer_set(self, enabled: bool) -> CmdResult:
    """Enable or disable the routes timer (POST /api/v1/routes/timer)."""
    request_body = {"enabled": bool(enabled)}
    resp = self._request("POST", "/api/v1/routes/timer", json_body=request_body)
    body = cast(Dict[str, Any], self._json(resp) or {})
    return self._parse_cmd_result(body)
def traffic_mode_get(self) -> TrafficModeStatus:
    """Fetch the current traffic-mode status (GET /api/v1/traffic/mode).

    Missing fields are filled with defaults; the mode falls back to
    "selective" and the applied mode to "direct".
    """
    resp = self._request("GET", "/api/v1/traffic/mode")
    data = cast(Dict[str, Any], self._json(resp) or {})

    def text(key: str, fallback: str = "") -> str:
        return str(data.get(key) or fallback)

    def flag(key: str, fallback: bool = False) -> bool:
        return bool(data.get(key, fallback))

    def count(key: str) -> int:
        return int(data.get(key, 0) or 0)

    def names(key: str) -> List[str]:
        # Stringify every entry and drop blank ones.
        return [str(x) for x in (data.get(key) or []) if str(x).strip()]

    return TrafficModeStatus(
        mode=text("mode", "selective"),
        desired_mode=str(data.get("desired_mode") or data.get("mode") or "selective"),
        applied_mode=text("applied_mode", "direct"),
        preferred_iface=text("preferred_iface"),
        advanced_active=flag("advanced_active"),
        auto_local_bypass=flag("auto_local_bypass", True),
        auto_local_active=flag("auto_local_active"),
        ingress_reply_bypass=flag("ingress_reply_bypass"),
        ingress_reply_active=flag("ingress_reply_active"),
        bypass_candidates=count("bypass_candidates"),
        force_vpn_subnets=names("force_vpn_subnets"),
        force_vpn_uids=names("force_vpn_uids"),
        force_vpn_cgroups=names("force_vpn_cgroups"),
        force_direct_subnets=names("force_direct_subnets"),
        force_direct_uids=names("force_direct_uids"),
        force_direct_cgroups=names("force_direct_cgroups"),
        overrides_applied=count("overrides_applied"),
        cgroup_resolved_uids=count("cgroup_resolved_uids"),
        cgroup_warning=text("cgroup_warning"),
        active_iface=text("active_iface"),
        iface_reason=text("iface_reason"),
        rule_mark=flag("rule_mark"),
        rule_full=flag("rule_full"),
        ingress_rule_present=flag("ingress_rule_present"),
        ingress_nft_active=flag("ingress_nft_active"),
        table_default=flag("table_default"),
        probe_ok=flag("probe_ok"),
        probe_message=text("probe_message"),
        healthy=flag("healthy"),
        message=text("message"),
    )
def traffic_mode_set(
    self,
    mode: str,
    preferred_iface: Optional[str] = None,
    auto_local_bypass: Optional[bool] = None,
    ingress_reply_bypass: Optional[bool] = None,
    force_vpn_subnets: Optional[List[str]] = None,
    force_vpn_uids: Optional[List[str]] = None,
    force_vpn_cgroups: Optional[List[str]] = None,
    force_direct_subnets: Optional[List[str]] = None,
    force_direct_uids: Optional[List[str]] = None,
    force_direct_cgroups: Optional[List[str]] = None,
) -> TrafficModeStatus:
    """Change the traffic mode (POST /api/v1/traffic/mode).

    Args:
        mode: "selective", "full_tunnel" or "direct" (case-insensitive,
            surrounding whitespace ignored).
        preferred_iface, auto_local_bypass, ingress_reply_bypass and the
        ``force_*`` lists: sent only when not ``None``.

    Returns:
        The status reported by the backend; mode/desired_mode fall back to
        the requested mode when the response omits them.

    Raises:
        ValueError: If *mode* is not one of the supported values.
    """
    m = str(mode or "").strip().lower()
    if m not in ("selective", "full_tunnel", "direct"):
        raise ValueError(f"Invalid traffic mode: {mode}")

    payload: Dict[str, Any] = {"mode": m}
    if preferred_iface is not None:
        payload["preferred_iface"] = str(preferred_iface).strip()
    for key, switch in (
        ("auto_local_bypass", auto_local_bypass),
        ("ingress_reply_bypass", ingress_reply_bypass),
    ):
        if switch is not None:
            payload[key] = bool(switch)
    for key, entries in (
        ("force_vpn_subnets", force_vpn_subnets),
        ("force_vpn_uids", force_vpn_uids),
        ("force_vpn_cgroups", force_vpn_cgroups),
        ("force_direct_subnets", force_direct_subnets),
        ("force_direct_uids", force_direct_uids),
        ("force_direct_cgroups", force_direct_cgroups),
    ):
        if entries is not None:
            payload[key] = [str(x) for x in entries]

    resp = self._request("POST", "/api/v1/traffic/mode", json_body=payload)
    data = cast(Dict[str, Any], self._json(resp) or {})

    def text(key: str, fallback: str = "") -> str:
        return str(data.get(key) or fallback)

    def flag(key: str, fallback: bool = False) -> bool:
        return bool(data.get(key, fallback))

    def count(key: str) -> int:
        return int(data.get(key, 0) or 0)

    def names(key: str) -> List[str]:
        # Stringify every entry and drop blank ones.
        return [str(x) for x in (data.get(key) or []) if str(x).strip()]

    return TrafficModeStatus(
        mode=text("mode", m),
        desired_mode=str(data.get("desired_mode") or data.get("mode") or m),
        applied_mode=text("applied_mode", "direct"),
        preferred_iface=text("preferred_iface"),
        advanced_active=flag("advanced_active"),
        auto_local_bypass=flag("auto_local_bypass", True),
        auto_local_active=flag("auto_local_active"),
        ingress_reply_bypass=flag("ingress_reply_bypass"),
        ingress_reply_active=flag("ingress_reply_active"),
        bypass_candidates=count("bypass_candidates"),
        force_vpn_subnets=names("force_vpn_subnets"),
        force_vpn_uids=names("force_vpn_uids"),
        force_vpn_cgroups=names("force_vpn_cgroups"),
        force_direct_subnets=names("force_direct_subnets"),
        force_direct_uids=names("force_direct_uids"),
        force_direct_cgroups=names("force_direct_cgroups"),
        overrides_applied=count("overrides_applied"),
        cgroup_resolved_uids=count("cgroup_resolved_uids"),
        cgroup_warning=text("cgroup_warning"),
        active_iface=text("active_iface"),
        iface_reason=text("iface_reason"),
        rule_mark=flag("rule_mark"),
        rule_full=flag("rule_full"),
        ingress_rule_present=flag("ingress_rule_present"),
        ingress_nft_active=flag("ingress_nft_active"),
        table_default=flag("table_default"),
        probe_ok=flag("probe_ok"),
        probe_message=text("probe_message"),
        healthy=flag("healthy"),
        message=text("message"),
    )
def traffic_mode_test(self) -> TrafficModeStatus:
    """Fetch the traffic-mode test result (GET /api/v1/traffic/mode/test).

    Missing fields are filled with defaults; the mode falls back to
    "selective" and the applied mode to "direct".
    """
    resp = self._request("GET", "/api/v1/traffic/mode/test")
    data = cast(Dict[str, Any], self._json(resp) or {})

    def text(key: str, fallback: str = "") -> str:
        return str(data.get(key) or fallback)

    def flag(key: str, fallback: bool = False) -> bool:
        return bool(data.get(key, fallback))

    def count(key: str) -> int:
        return int(data.get(key, 0) or 0)

    def names(key: str) -> List[str]:
        # Stringify every entry and drop blank ones.
        return [str(x) for x in (data.get(key) or []) if str(x).strip()]

    return TrafficModeStatus(
        mode=text("mode", "selective"),
        desired_mode=str(data.get("desired_mode") or data.get("mode") or "selective"),
        applied_mode=text("applied_mode", "direct"),
        preferred_iface=text("preferred_iface"),
        advanced_active=flag("advanced_active"),
        auto_local_bypass=flag("auto_local_bypass", True),
        auto_local_active=flag("auto_local_active"),
        ingress_reply_bypass=flag("ingress_reply_bypass"),
        ingress_reply_active=flag("ingress_reply_active"),
        bypass_candidates=count("bypass_candidates"),
        force_vpn_subnets=names("force_vpn_subnets"),
        force_vpn_uids=names("force_vpn_uids"),
        force_vpn_cgroups=names("force_vpn_cgroups"),
        force_direct_subnets=names("force_direct_subnets"),
        force_direct_uids=names("force_direct_uids"),
        force_direct_cgroups=names("force_direct_cgroups"),
        overrides_applied=count("overrides_applied"),
        cgroup_resolved_uids=count("cgroup_resolved_uids"),
        cgroup_warning=text("cgroup_warning"),
        active_iface=text("active_iface"),
        iface_reason=text("iface_reason"),
        rule_mark=flag("rule_mark"),
        rule_full=flag("rule_full"),
        ingress_rule_present=flag("ingress_rule_present"),
        ingress_nft_active=flag("ingress_nft_active"),
        table_default=flag("table_default"),
        probe_ok=flag("probe_ok"),
        probe_message=text("probe_message"),
        healthy=flag("healthy"),
        message=text("message"),
    )
def traffic_advanced_reset(self) -> TrafficModeStatus:
    """POST /api/v1/traffic/advanced/reset and return the resulting status.

    Missing fields are filled with defaults; the mode falls back to
    "selective" and the applied mode to "direct".
    """
    resp = self._request("POST", "/api/v1/traffic/advanced/reset")
    data = cast(Dict[str, Any], self._json(resp) or {})

    def text(key: str, fallback: str = "") -> str:
        return str(data.get(key) or fallback)

    def flag(key: str, fallback: bool = False) -> bool:
        return bool(data.get(key, fallback))

    def count(key: str) -> int:
        return int(data.get(key, 0) or 0)

    def names(key: str) -> List[str]:
        # Stringify every entry and drop blank ones.
        return [str(x) for x in (data.get(key) or []) if str(x).strip()]

    return TrafficModeStatus(
        mode=text("mode", "selective"),
        desired_mode=str(data.get("desired_mode") or data.get("mode") or "selective"),
        applied_mode=text("applied_mode", "direct"),
        preferred_iface=text("preferred_iface"),
        advanced_active=flag("advanced_active"),
        auto_local_bypass=flag("auto_local_bypass", True),
        auto_local_active=flag("auto_local_active"),
        ingress_reply_bypass=flag("ingress_reply_bypass"),
        ingress_reply_active=flag("ingress_reply_active"),
        bypass_candidates=count("bypass_candidates"),
        force_vpn_subnets=names("force_vpn_subnets"),
        force_vpn_uids=names("force_vpn_uids"),
        force_vpn_cgroups=names("force_vpn_cgroups"),
        force_direct_subnets=names("force_direct_subnets"),
        force_direct_uids=names("force_direct_uids"),
        force_direct_cgroups=names("force_direct_cgroups"),
        overrides_applied=count("overrides_applied"),
        cgroup_resolved_uids=count("cgroup_resolved_uids"),
        cgroup_warning=text("cgroup_warning"),
        active_iface=text("active_iface"),
        iface_reason=text("iface_reason"),
        rule_mark=flag("rule_mark"),
        rule_full=flag("rule_full"),
        ingress_rule_present=flag("ingress_rule_present"),
        ingress_nft_active=flag("ingress_nft_active"),
        table_default=flag("table_default"),
        probe_ok=flag("probe_ok"),
        probe_message=text("probe_message"),
        healthy=flag("healthy"),
        message=text("message"),
    )
def traffic_interfaces_get(self) -> TrafficInterfaces:
    """List interfaces known to the backend (GET /api/v1/traffic/interfaces)."""
    resp = self._request("GET", "/api/v1/traffic/interfaces")
    data = cast(Dict[str, Any], self._json(resp) or {})

    listed = data.get("interfaces") or []
    # Anything that is not a list is treated as "no interfaces".
    names = [str(x) for x in listed if str(x).strip()] if isinstance(listed, list) else []

    def text(key: str) -> str:
        return str(data.get(key) or "")

    return TrafficInterfaces(
        interfaces=names,
        preferred_iface=text("preferred_iface"),
        active_iface=text("active_iface"),
        iface_reason=text("iface_reason"),
    )
def traffic_candidates_get(self) -> TrafficCandidates:
    """Fetch override candidates (GET /api/v1/traffic/candidates).

    Non-dict rows are ignored. Subnets without a cidr, units without a
    name and uids that cannot be converted to int are skipped.
    """
    resp = self._request("GET", "/api/v1/traffic/candidates")
    data = cast(Dict[str, Any], self._json(resp) or {})

    def trimmed(row: Dict[str, Any], key: str) -> str:
        return str(row.get(key) or "").strip()

    subnets: List[TrafficCandidateSubnet] = []
    for row in (data.get("subnets") or []):
        if isinstance(row, dict):
            cidr = trimmed(row, "cidr")
            if cidr:
                subnets.append(
                    TrafficCandidateSubnet(
                        cidr=cidr,
                        dev=trimmed(row, "dev"),
                        kind=trimmed(row, "kind"),
                        linkdown=bool(row.get("linkdown", False)),
                    )
                )

    units: List[TrafficCandidateUnit] = []
    for row in (data.get("units") or []):
        if isinstance(row, dict):
            unit_name = trimmed(row, "unit")
            if unit_name:
                units.append(
                    TrafficCandidateUnit(
                        unit=unit_name,
                        description=trimmed(row, "description"),
                        cgroup=trimmed(row, "cgroup"),
                    )
                )

    uids: List[TrafficCandidateUID] = []
    for row in (data.get("uids") or []):
        if not isinstance(row, dict):
            continue
        try:
            uid_value = int(row.get("uid", 0) or 0)
        except Exception:
            continue
        user_name = trimmed(row, "user")
        sample = row.get("examples") or []
        if not isinstance(sample, list):
            sample = []
        uids.append(
            TrafficCandidateUID(
                uid=uid_value,
                user=user_name,
                examples=[str(x) for x in sample if str(x).strip()],
            )
        )

    return TrafficCandidates(
        generated_at=str(data.get("generated_at") or ""),
        subnets=subnets,
        units=units,
        uids=uids,
    )
def traffic_appmarks_status(self) -> TrafficAppMarksStatus:
    """Fetch app-mark counters (GET /api/v1/traffic/appmarks)."""
    resp = self._request("GET", "/api/v1/traffic/appmarks")
    data = cast(Dict[str, Any], self._json(resp) or {})
    vpn_total = int(data.get("vpn_count", 0) or 0)
    direct_total = int(data.get("direct_count", 0) or 0)
    note = str(data.get("message") or "")
    return TrafficAppMarksStatus(vpn_count=vpn_total, direct_count=direct_total, message=note)
def traffic_appmarks_items(self) -> List[TrafficAppMarkItem]:
    """List active app marks (GET /api/v1/traffic/appmarks/items).

    Rows that are not dicts, have a non-positive or unparsable id, or a
    target other than "vpn"/"direct" are skipped.
    """
    resp = self._request("GET", "/api/v1/traffic/appmarks/items")
    data = cast(Dict[str, Any], self._json(resp) or {})
    rows = data.get("items") or []
    if not isinstance(rows, list):
        rows = []

    def trimmed(row: Dict[str, Any], key: str) -> str:
        return str(row.get(key) or "").strip()

    def number(row: Dict[str, Any], key: str) -> int:
        return int(row.get(key, 0) or 0)

    marks: List[TrafficAppMarkItem] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        try:
            mark_id = number(row, "id")
        except Exception:
            mark_id = 0
        target = trimmed(row, "target").lower()
        if mark_id > 0 and target in ("vpn", "direct"):
            marks.append(
                TrafficAppMarkItem(
                    id=mark_id,
                    target=target,
                    cgroup=trimmed(row, "cgroup"),
                    cgroup_rel=trimmed(row, "cgroup_rel"),
                    level=number(row, "level"),
                    unit=trimmed(row, "unit"),
                    command=trimmed(row, "command"),
                    app_key=trimmed(row, "app_key"),
                    added_at=trimmed(row, "added_at"),
                    expires_at=trimmed(row, "expires_at"),
                    remaining_sec=number(row, "remaining_sec"),
                )
            )
    return marks
def traffic_appmarks_apply(
    self,
    *,
    op: str,
    target: str,
    cgroup: str = "",
    unit: str = "",
    command: str = "",
    app_key: str = "",
    timeout_sec: int = 0,
) -> TrafficAppMarksResult:
    """Apply an app-mark operation (POST /api/v1/traffic/appmarks).

    ``op`` and ``target`` are always sent (trimmed, lower-cased); the other
    string arguments are sent only when non-empty and ``timeout_sec`` only
    when positive. ``op``, ``target`` and ``cgroup`` of the result fall
    back to the sent values when the response omits them.
    """
    payload: Dict[str, Any] = {
        "op": str(op or "").strip().lower(),
        "target": str(target or "").strip().lower(),
    }
    for key, value in (
        ("cgroup", cgroup),
        ("unit", unit),
        ("command", command),
        ("app_key", app_key),
    ):
        if value:
            payload[key] = str(value).strip()
    if int(timeout_sec or 0) > 0:
        payload["timeout_sec"] = int(timeout_sec)

    resp = self._request("POST", "/api/v1/traffic/appmarks", json_body=payload)
    data = cast(Dict[str, Any], self._json(resp) or {})
    return TrafficAppMarksResult(
        ok=bool(data.get("ok", False)),
        message=str(data.get("message") or ""),
        op=str(data.get("op") or payload["op"]),
        target=str(data.get("target") or payload["target"]),
        cgroup=str(data.get("cgroup") or payload.get("cgroup") or ""),
        cgroup_id=int(data.get("cgroup_id", 0) or 0),
        timeout_sec=int(data.get("timeout_sec", 0) or 0),
    )
def traffic_app_profiles_list(self) -> List[TrafficAppProfile]:
    """List saved app profiles (GET /api/v1/traffic/app-profiles).

    Rows that are not dicts or lack an id are skipped.
    """
    resp = self._request("GET", "/api/v1/traffic/app-profiles")
    data = cast(Dict[str, Any], self._json(resp) or {})
    rows = data.get("profiles") or []
    if not isinstance(rows, list):
        rows = []

    def trimmed(row: Dict[str, Any], key: str) -> str:
        return str(row.get(key) or "").strip()

    profiles: List[TrafficAppProfile] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        profile_id = trimmed(row, "id")
        if profile_id:
            profiles.append(
                TrafficAppProfile(
                    id=profile_id,
                    name=trimmed(row, "name"),
                    app_key=trimmed(row, "app_key"),
                    command=trimmed(row, "command"),
                    target=trimmed(row, "target").lower(),
                    ttl_sec=int(row.get("ttl_sec", 0) or 0),
                    vpn_profile=trimmed(row, "vpn_profile"),
                    created_at=trimmed(row, "created_at"),
                    updated_at=trimmed(row, "updated_at"),
                )
            )
    return profiles
def traffic_app_profile_upsert(
    self,
    *,
    id: str = "",
    name: str = "",
    app_key: str = "",
    command: str,
    target: str,
    ttl_sec: int = 0,
    vpn_profile: str = "",
) -> TrafficAppProfileSaveResult:
    """Create or update an app profile (POST /api/v1/traffic/app-profiles).

    ``command`` and ``target`` are always sent; the remaining string
    arguments are sent only when non-empty and ``ttl_sec`` only when
    positive.

    Returns:
        A result whose ``profile`` is the first profile echoed back by the
        backend (``None`` if there is none). ``ok`` is true when a profile
        was returned and the message is empty, "saved" or "ok"
        (case-insensitive); an empty message is then replaced by "saved".
    """
    payload: Dict[str, Any] = {
        "command": str(command or "").strip(),
        "target": str(target or "").strip().lower(),
    }
    if id:
        payload["id"] = str(id).strip()
    if name:
        payload["name"] = str(name).strip()
    if app_key:
        payload["app_key"] = str(app_key).strip()
    if int(ttl_sec or 0) > 0:
        payload["ttl_sec"] = int(ttl_sec)
    if vpn_profile:
        payload["vpn_profile"] = str(vpn_profile).strip()
    data = cast(
        Dict[str, Any],
        self._json(
            self._request("POST", "/api/v1/traffic/app-profiles", json_body=payload)
        )
        or {},
    )
    msg = str(data.get("message") or "")
    raw = data.get("profiles") or []
    if not isinstance(raw, list):
        raw = []
    prof: Optional[TrafficAppProfile] = None
    if raw and isinstance(raw[0], dict):
        it = cast(Dict[str, Any], raw[0])
        pid = str(it.get("id") or "").strip()
        if pid:
            prof = TrafficAppProfile(
                id=pid,
                name=str(it.get("name") or "").strip(),
                app_key=str(it.get("app_key") or "").strip(),
                command=str(it.get("command") or "").strip(),
                target=str(it.get("target") or "").strip().lower(),
                ttl_sec=int(it.get("ttl_sec", 0) or 0),
                vpn_profile=str(it.get("vpn_profile") or "").strip(),
                created_at=str(it.get("created_at") or "").strip(),
                updated_at=str(it.get("updated_at") or "").strip(),
            )
    # NOTE(review): previously an empty message could never yield ok=True,
    # which made the "default the message to 'saved'" branch unreachable.
    # A 2xx response that echoes the stored profile without a message is
    # now treated as success - confirm this matches the backend contract.
    verdict = msg.strip().lower()
    ok = prof is not None and verdict in ("", "saved", "ok")
    if ok and not verdict:
        msg = "saved"
    return TrafficAppProfileSaveResult(ok=ok, message=msg, profile=prof)
def traffic_app_profile_delete(self, id: str) -> CmdResult:
    """Delete an app profile (DELETE /api/v1/traffic/app-profiles?id=...).

    Raises:
        ValueError: If *id* is empty or blank.
    """
    profile_id = str(id or "").strip()
    if profile_id == "":
        raise ValueError("missing id")
    resp = self._request("DELETE", "/api/v1/traffic/app-profiles", params={"id": profile_id})
    data = cast(Dict[str, Any], self._json(resp) or {})
    succeeded = bool(data.get("ok", False))
    note = str(data.get("message") or "")
    return CmdResult(ok=succeeded, message=note, exit_code=None, stdout="", stderr="")
def traffic_audit_get(self) -> TrafficAudit:
    """Fetch the traffic audit report (GET /api/v1/traffic/audit).

    ANSI escape sequences are removed from the message, the pretty text
    and every issue; blank issues are dropped.
    """
    resp = self._request("GET", "/api/v1/traffic/audit")
    data = cast(Dict[str, Any], self._json(resp) or {})

    def trimmed(key: str) -> str:
        return str(data.get(key) or "").strip()

    found = data.get("issues") or []
    if not isinstance(found, list):
        found = []
    cleaned_issues = [strip_ansi(str(x)).strip() for x in found if str(x).strip()]

    return TrafficAudit(
        ok=bool(data.get("ok", False)),
        message=strip_ansi(trimmed("message")),
        now=trimmed("now"),
        pretty=strip_ansi(trimmed("pretty")),
        issues=cleaned_issues,
    )
# DNS / SmartDNS
def dns_upstreams_get(self) -> DnsUpstreams:
    """Fetch the configured DNS upstreams.

    Sends GET /api/v1/dns-upstreams and maps the four address fields of the
    reply onto DnsUpstreams; a missing or falsy field becomes "".
    """
    reply = self._request("GET", "/api/v1/dns-upstreams")
    body = cast(Dict[str, Any], self._json(reply) or {})
    fields = {
        key: str(body.get(key) or "")
        for key in ("default1", "default2", "meta1", "meta2")
    }
    return DnsUpstreams(**fields)
def dns_upstreams_set(self, cfg: DnsUpstreams) -> None:
    """Store new DNS upstreams.

    Sends POST /api/v1/dns-upstreams with the four addresses taken from
    ``cfg``. The response body is not inspected.
    """
    body = {
        "default1": cfg.default1,
        "default2": cfg.default2,
        "meta1": cfg.meta1,
        "meta2": cfg.meta2,
    }
    self._request("POST", "/api/v1/dns-upstreams", json_body=body)
def dns_upstream_pool_get(self) -> DNSUpstreamPoolState:
    """Fetch the DNS upstream pool.

    Sends GET /api/v1/dns/upstream-pool. Entries that are not objects or
    have a blank "addr" are dropped; a missing "enabled" flag counts as
    enabled.
    """
    reply = self._request("GET", "/api/v1/dns/upstream-pool")
    body = cast(Dict[str, Any], self._json(reply) or {})

    entries = body.get("items") or []
    rows = entries if isinstance(entries, list) else []

    pool: List[DNSBenchmarkUpstream] = []
    for entry in rows:
        if isinstance(entry, dict):
            address = str(entry.get("addr") or "").strip()
            if address:
                is_enabled = bool(entry.get("enabled", True))
                pool.append(DNSBenchmarkUpstream(addr=address, enabled=is_enabled))
    return DNSUpstreamPoolState(items=pool)
def dns_upstream_pool_set(self, items: List[DNSBenchmarkUpstream]) -> DNSUpstreamPoolState:
    """Replace the DNS upstream pool and return the pool from the reply.

    Sends POST /api/v1/dns/upstream-pool with one {"addr", "enabled"} object
    per item. The reply's "items" list is normalized: non-object rows and
    rows with a blank "addr" are dropped, and a missing "enabled" flag
    counts as enabled.
    """
    outgoing = []
    for upstream in items or []:
        outgoing.append({"addr": upstream.addr, "enabled": bool(upstream.enabled)})

    reply = self._request(
        "POST",
        "/api/v1/dns/upstream-pool",
        json_body={"items": outgoing},
    )
    body = cast(Dict[str, Any], self._json(reply) or {})

    rows = body.get("items") or []
    if not isinstance(rows, list):
        rows = []

    # Pair each object row with its trimmed address, then keep the non-blank ones.
    dict_rows = (row for row in rows if isinstance(row, dict))
    addressed = ((str(row.get("addr") or "").strip(), row) for row in dict_rows)
    pool = [
        DNSBenchmarkUpstream(addr=address, enabled=bool(row.get("enabled", True)))
        for address, row in addressed
        if address
    ]
    return DNSUpstreamPoolState(items=pool)
def dns_benchmark(
    self,
    upstreams: List[DNSBenchmarkUpstream],
    domains: List[str],
    timeout_ms: int = 1800,
    attempts: int = 1,
    concurrency: int = 6,
) -> DNSBenchmarkResponse:
    """Run a DNS benchmark and return the normalized report.

    Sends POST /api/v1/dns/benchmark.

    Args:
        upstreams: Upstreams to test; each is sent as {"addr", "enabled"}.
        domains: Domains to resolve; blank entries are dropped before sending.
        timeout_ms: Sent as "timeout_ms".
        attempts: Sent as "attempts".
        concurrency: Sent as "concurrency".

    Returns:
        DNSBenchmarkResponse with one DNSBenchmarkResult per object row in
        the reply's "results" list, plus the reply's list-valued summary
        fields reduced to trimmed, non-empty strings.
    """

    def _str_list(value: Any) -> List[str]:
        # Same guard the "results" field gets below: a non-list payload
        # (e.g. a bare string) is ignored instead of being iterated
        # character by character.
        if not isinstance(value, list):
            return []
        cleaned = (str(item or "").strip() for item in value)
        return [text for text in cleaned if text]

    data = cast(
        Dict[str, Any],
        self._json(
            self._request(
                "POST",
                "/api/v1/dns/benchmark",
                json_body={
                    "upstreams": [{"addr": u.addr, "enabled": bool(u.enabled)} for u in (upstreams or [])],
                    "domains": [str(d or "").strip() for d in (domains or []) if str(d or "").strip()],
                    "timeout_ms": int(timeout_ms),
                    "attempts": int(attempts),
                    "concurrency": int(concurrency),
                },
            )
        )
        or {},
    )

    raw_results = data.get("results") or []
    if not isinstance(raw_results, list):
        raw_results = []

    results: List[DNSBenchmarkResult] = []
    for row in raw_results:
        if not isinstance(row, dict):
            continue
        results.append(
            DNSBenchmarkResult(
                upstream=str(row.get("upstream") or "").strip(),
                attempts=int(row.get("attempts", 0) or 0),
                ok=int(row.get("ok", 0) or 0),
                fail=int(row.get("fail", 0) or 0),
                nxdomain=int(row.get("nxdomain", 0) or 0),
                timeout=int(row.get("timeout", 0) or 0),
                temporary=int(row.get("temporary", 0) or 0),
                other=int(row.get("other", 0) or 0),
                avg_ms=int(row.get("avg_ms", 0) or 0),
                p95_ms=int(row.get("p95_ms", 0) or 0),
                score=float(row.get("score", 0.0) or 0.0),
                color=str(row.get("color") or "").strip().lower(),
            )
        )

    return DNSBenchmarkResponse(
        results=results,
        domains_used=_str_list(data.get("domains_used")),
        timeout_ms=int(data.get("timeout_ms", 0) or 0),
        attempts_per_domain=int(data.get("attempts_per_domain", 0) or 0),
        recommended_default=_str_list(data.get("recommended_default")),
        recommended_meta=_str_list(data.get("recommended_meta")),
    )
def dns_status_get(self) -> DNSStatus:
    """Fetch DNS status via GET /api/v1/dns/status and parse it into DNSStatus."""
    reply = self._request("GET", "/api/v1/dns/status")
    payload = self._json(reply) or {}
    return self._parse_dns_status(cast(Dict[str, Any], payload))
def dns_mode_set(self, via_smartdns: bool, smartdns_addr: str) -> DNSStatus:
    """Switch the DNS mode and return the resulting status.

    Sends POST /api/v1/dns/mode. The "mode" field is derived from
    ``via_smartdns``: "hybrid_wildcard" when truthy, "direct" otherwise.
    The reply is parsed into DNSStatus.
    """
    use_smartdns = bool(via_smartdns)
    if use_smartdns:
        mode_name = "hybrid_wildcard"
    else:
        mode_name = "direct"

    request_body = {
        "via_smartdns": use_smartdns,
        "smartdns_addr": str(smartdns_addr or ""),
        "mode": mode_name,
    }
    reply = self._request("POST", "/api/v1/dns/mode", json_body=request_body)
    payload = cast(Dict[str, Any], self._json(reply) or {})
    return self._parse_dns_status(payload)
def dns_smartdns_service_set(self, action: ServiceAction) -> DNSStatus:
    """Start, stop or restart SmartDNS and return the resulting DNS status.

    Sends POST /api/v1/dns/smartdns-service with the normalized action.

    Raises:
        ValueError: if ``action`` is not "start", "stop" or "restart"
            (compared after trimming and lower-casing), or if the reply's
            "ok" field is falsy; in the latter case the reply's "message"
            is used when present.

    Returns:
        DNSStatus parsed from the reply.
    """
    # Normalize the same way vpn_login_session_action does, so a non-str or
    # padded value yields the documented ValueError instead of AttributeError.
    act = str(action).strip().lower()
    if act not in ("start", "stop", "restart"):
        raise ValueError(f"Invalid action: {action}")
    data = cast(
        Dict[str, Any],
        self._json(
            self._request(
                "POST",
                "/api/v1/dns/smartdns-service",
                json_body={"action": act},
            )
        )
        or {},
    )
    if not bool(data.get("ok", False)):
        raise ValueError(str(data.get("message") or f"SmartDNS {act} failed"))
    return self._parse_dns_status(data)
def smartdns_service_get(self) -> SmartdnsServiceState:
    """Fetch the SmartDNS service state via GET /api/v1/smartdns/service.

    A missing or falsy "state" field is reported as "unknown".
    """
    reply = self._request("GET", "/api/v1/smartdns/service")
    body = cast(Dict[str, Any], self._json(reply) or {})
    current = body.get("state") or "unknown"
    return SmartdnsServiceState(state=str(current))
def smartdns_service_set(self, action: ServiceAction) -> CmdResult:
    """Start, stop or restart the SmartDNS service.

    Sends POST /api/v1/smartdns/service with the normalized action and
    returns the reply parsed as a CmdResult.

    Raises:
        ValueError: if ``action`` is not "start", "stop" or "restart"
            (compared after trimming and lower-casing).
    """
    # Normalize the same way vpn_login_session_action does, so a non-str or
    # padded value yields the documented ValueError instead of AttributeError.
    act = str(action).strip().lower()
    if act not in ("start", "stop", "restart"):
        raise ValueError(f"Invalid action: {action}")
    data = cast(
        Dict[str, Any],
        self._json(self._request("POST", "/api/v1/smartdns/service", json_body={"action": act})) or {},
    )
    return self._parse_cmd_result(data)
def smartdns_runtime_get(self) -> SmartdnsRuntimeState:
    """Fetch the SmartDNS runtime state via GET /api/v1/smartdns/runtime.

    When the reply has no "wildcard_source", it falls back to "both" if
    "enabled" is truthy and "resolver" otherwise. A missing "unit_state"
    is reported as "unknown".
    """
    reply = self._request("GET", "/api/v1/smartdns/runtime")
    body = cast(Dict[str, Any], self._json(reply) or {})

    is_enabled = bool(body.get("enabled", False))
    source = body.get("wildcard_source")
    if not source:
        source = "both" if is_enabled else "resolver"

    return SmartdnsRuntimeState(
        enabled=is_enabled,
        applied_enabled=bool(body.get("applied_enabled", False)),
        wildcard_source=str(source),
        unit_state=str(body.get("unit_state") or "unknown"),
        config_path=str(body.get("config_path") or ""),
        changed=bool(body.get("changed", False)),
        restarted=bool(body.get("restarted", False)),
        message=str(body.get("message") or ""),
    )
def smartdns_runtime_set(self, enabled: bool, restart: bool = True) -> SmartdnsRuntimeState:
    """Change the SmartDNS runtime setting and return the state from the reply.

    Sends POST /api/v1/smartdns/runtime with boolean "enabled" and "restart"
    fields. When the reply has no "wildcard_source", it falls back to "both"
    if the reply's "enabled" is truthy and "resolver" otherwise.
    """
    request_body = {"enabled": bool(enabled), "restart": bool(restart)}
    reply = self._request("POST", "/api/v1/smartdns/runtime", json_body=request_body)
    body = cast(Dict[str, Any], self._json(reply) or {})

    now_enabled = bool(body.get("enabled", False))
    if now_enabled:
        fallback_source = "both"
    else:
        fallback_source = "resolver"
    source = str(body.get("wildcard_source") or fallback_source)

    return SmartdnsRuntimeState(
        enabled=now_enabled,
        applied_enabled=bool(body.get("applied_enabled", False)),
        wildcard_source=source,
        unit_state=str(body.get("unit_state") or "unknown"),
        config_path=str(body.get("config_path") or ""),
        changed=bool(body.get("changed", False)),
        restarted=bool(body.get("restarted", False)),
        message=str(body.get("message") or ""),
    )
def smartdns_prewarm(self, limit: int = 0, aggressive_subs: bool = False) -> CmdResult:
    """Trigger a SmartDNS prewarm run.

    Sends POST /api/v1/smartdns/prewarm and returns the reply parsed as a
    CmdResult.

    Args:
        limit: Sent as "limit" only when positive; zero, negative or None
            omits the key.
        aggressive_subs: When truthy, "aggressive_subs": True is sent;
            otherwise the key is omitted.
    """
    payload: Dict[str, Any] = {}
    # `or 0` tolerates None (e.g. an unset caller value) instead of letting
    # int(None) raise TypeError.
    limit_i = int(limit or 0)
    if limit_i > 0:
        payload["limit"] = limit_i
    if aggressive_subs:
        payload["aggressive_subs"] = True
    data = cast(
        Dict[str, Any],
        self._json(self._request("POST", "/api/v1/smartdns/prewarm", json_body=payload)) or {},
    )
    return self._parse_cmd_result(data)
def _parse_dns_status(self, data: Dict[str, Any]) -> DNSStatus:
    """Convert a DNS status reply dict into DNSStatus.

    Defaults applied when a field is missing or falsy:
    - "mode": "hybrid_wildcard" if "via_smartdns" is truthy, else "direct";
    - "unit_state": "unknown";
    - "wildcard_source": "both" if "runtime_nftset" is truthy, else "resolver";
    - other string fields: "".
    "runtime_nftset" itself defaults to True when the key is absent.
    """
    uses_smartdns = bool(data.get("via_smartdns", False))
    nftset_runtime = bool(data.get("runtime_nftset", True))
    default_mode = "hybrid_wildcard" if uses_smartdns else "direct"
    default_source = "both" if nftset_runtime else "resolver"

    def text(key: str, fallback: str = "") -> str:
        return str(data.get(key) or fallback)

    return DNSStatus(
        via_smartdns=uses_smartdns,
        smartdns_addr=text("smartdns_addr"),
        mode=text("mode", default_mode),
        unit_state=text("unit_state", "unknown"),
        runtime_nftset=nftset_runtime,
        wildcard_source=text("wildcard_source", default_source),
        runtime_config_path=text("runtime_config_path"),
        runtime_config_error=text("runtime_config_error"),
    )
# Domains editor
def domains_table(self) -> DomainsTable:
    """Fetch the domains table via GET /api/v1/domains/table.

    Each element of the reply's "lines" list is converted to str; a missing
    or non-list value yields an empty table.
    """
    reply = self._request("GET", "/api/v1/domains/table")
    body = cast(Dict[str, Any], self._json(reply) or {})
    rows = body.get("lines") or []
    if isinstance(rows, list):
        rendered = [str(row) for row in rows]
    else:
        rendered = []
    return DomainsTable(lines=rendered)
def domains_file_get(
    self,
    name: Literal["bases", "meta", "subs", "static", "smartdns", "last-ips-map", "last-ips-map-direct", "last-ips-map-wildcard"],
) -> DomainsFile:
    """Fetch one domains file by name via GET /api/v1/domains/file.

    Returns a DomainsFile with the requested name plus the reply's
    "content" and "source" fields (each "" when missing or falsy).
    """
    query = {"name": name}
    reply = self._request("GET", "/api/v1/domains/file", params=query)
    body = cast(Dict[str, Any], self._json(reply) or {})
    return DomainsFile(
        name=name,
        content=str(body.get("content") or ""),
        source=str(body.get("source") or ""),
    )
def domains_file_set(
    self,
    name: Literal["bases", "meta", "subs", "static", "smartdns", "last-ips-map", "last-ips-map-direct", "last-ips-map-wildcard"],
    content: str,
) -> None:
    """Write one domains file via POST /api/v1/domains/file.

    The response body is not inspected.
    """
    request_body = {"name": name, "content": content}
    self._request("POST", "/api/v1/domains/file", json_body=request_body)
# VPN
def vpn_autoloop_status(self) -> VpnAutoloopStatus:
    """Fetch the VPN autoloop status (GET /api/v1/vpn/autoloop-status, 2 s timeout).

    "raw_text" is trimmed and has ANSI escape sequences removed;
    "status_word" is trimmed and defaults to "unknown".
    """
    reply = self._request("GET", "/api/v1/vpn/autoloop-status", timeout=2.0)
    body = cast(Dict[str, Any], self._json(reply) or {})
    trimmed_raw = str(body.get("raw_text") or "").strip()
    status = str(body.get("status_word") or "unknown").strip()
    return VpnAutoloopStatus(raw_text=strip_ansi(trimmed_raw), status_word=status)
def vpn_status(self) -> VpnStatus:
    """Fetch the VPN status (GET /api/v1/vpn/status, 2 s timeout).

    All string fields are trimmed; "status_word" and "unit_state" default to
    "unknown", and "raw_text" has ANSI escape sequences removed.
    """
    reply = self._request("GET", "/api/v1/vpn/status", timeout=2.0)
    body = cast(Dict[str, Any], self._json(reply) or {})

    location = str(body.get("desired_location") or "").strip()
    word = str(body.get("status_word") or "unknown").strip()
    raw = strip_ansi(str(body.get("raw_text") or "").strip())
    unit = str(body.get("unit_state") or "unknown").strip()

    return VpnStatus(
        desired_location=location,
        status_word=word,
        raw_text=raw,
        unit_state=unit,
    )
def vpn_autoconnect(self, enable: bool) -> CmdResult:
    """Turn VPN autoconnect on or off.

    Sends POST /api/v1/vpn/autoconnect with action "start" when ``enable``
    is truthy and "stop" otherwise; the reply is parsed as a CmdResult.
    """
    if enable:
        verb = "start"
    else:
        verb = "stop"
    reply = self._request("POST", "/api/v1/vpn/autoconnect", json_body={"action": verb})
    body = cast(Dict[str, Any], self._json(reply) or {})
    return self._parse_cmd_result(body)
def vpn_locations(self) -> List[VpnLocation]:
    """Fetch available VPN locations (GET /api/v1/vpn/locations, 10 s timeout).

    Only object entries with both a non-empty "label" and a non-empty "iso"
    are returned; a missing or non-list "locations" value yields [].
    """
    reply = self._request("GET", "/api/v1/vpn/locations", timeout=10.0)
    body = cast(Dict[str, Any], self._json(reply) or {})

    entries = body.get("locations") or []
    if not isinstance(entries, list):
        return []

    found: List[VpnLocation] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        label = str(entry.get("label") or "")
        iso = str(entry.get("iso") or "")
        if not (label and iso):
            continue
        found.append(VpnLocation(label=label, iso=iso))
    return found
def vpn_set_location(self, iso: str) -> None:
    """Select the VPN location by ISO code.

    Sends POST /api/v1/vpn/location with the trimmed code. The response
    body is not inspected.

    Raises:
        ValueError: if ``iso`` is None, empty or blank.
    """
    # `or ""` keeps None from being stringified to the literal "None",
    # which would pass the emptiness check and be sent as a location code.
    val = str(iso or "").strip()
    if not val:
        raise ValueError("iso is required")
    self._request("POST", "/api/v1/vpn/location", json_body={"iso": val})
# Trace
def trace_get(self, mode: TraceMode = "full") -> TraceDump:
    """Fetch trace lines (GET /api/v1/trace-json, 5 s timeout).

    ``mode`` is lower-cased and trimmed; anything other than "full", "gui"
    or "smartdns" falls back to "full". Each returned line is converted to
    str and has ANSI escape sequences removed; a missing or non-list
    "lines" value yields an empty dump.
    """
    allowed = ("full", "gui", "smartdns")
    requested = str(mode).lower().strip()
    selected = requested if requested in allowed else "full"

    reply = self._request(
        "GET",
        "/api/v1/trace-json",
        params={"mode": selected},
        timeout=5.0,
    )
    body = cast(Dict[str, Any], self._json(reply) or {})

    raw_lines = body.get("lines") or []
    if not isinstance(raw_lines, list):
        raw_lines = []

    cleaned = []
    for entry in raw_lines:
        cleaned.append(strip_ansi(str(entry)))
    return TraceDump(lines=cleaned)
def trace_append(self, kind: Literal["gui", "smartdns", "info"], line: str) -> None:
    """Append one line to the trace (POST /api/v1/trace/append, 2 s timeout).

    Best-effort: an ApiError from the request is swallowed on purpose so
    that logging can never crash the UI.
    """
    try:
        entry = {"kind": kind, "line": str(line)}
        self._request("POST", "/api/v1/trace/append", json_body=entry, timeout=2.0)
    except ApiError:
        # Deliberately ignored: see docstring.
        return None
# ---- AdGuard VPN interactive login-session (NEW) ----
def vpn_login_session_start(self) -> LoginSessionStart:
    """Start an interactive login session (POST /api/v1/vpn/login/session/start, 10 s timeout).

    "pid" is converted to int when present and convertible, otherwise None.
    "email" and "error" are trimmed and have ANSI escape sequences removed.
    """
    reply = self._request("POST", "/api/v1/vpn/login/session/start", timeout=10.0)
    body = cast(Dict[str, Any], self._json(reply) or {})

    raw_pid = body.get("pid")
    pid: Optional[int] = None
    if raw_pid is not None:
        try:
            pid = int(raw_pid)
        except (TypeError, ValueError):
            pid = None

    def clean(key: str) -> str:
        return strip_ansi(str(body.get(key) or "").strip())

    return LoginSessionStart(
        ok=bool(body.get("ok", False)),
        phase=str(body.get("phase") or ""),
        level=str(body.get("level") or ""),
        pid=pid,
        email=clean("email"),
        error=clean("error"),
    )
def vpn_login_session_state(self, since: int = 0) -> LoginSessionState:
    """Poll the login session (GET /api/v1/vpn/login/session/state, 5 s timeout).

    Args:
        since: Sent as the "since" query parameter; None counts as 0 and
            negative values are clamped to 0.

    Returns:
        LoginSessionState built from the reply. "cursor" falls back to 0
        when it cannot be converted to int; "url", "email" and every entry
        of "lines" have ANSI escape sequences removed.
    """
    start = 0 if since is None else int(since)
    query = {"since": str(max(0, start))}
    reply = self._request(
        "GET",
        "/api/v1/vpn/login/session/state",
        params=query,
        timeout=5.0,
    )
    body = cast(Dict[str, Any], self._json(reply) or {})

    raw_lines = body.get("lines") or []
    if not isinstance(raw_lines, list):
        raw_lines = []

    try:
        cursor = int(body.get("cursor", 0))
    except (TypeError, ValueError):
        cursor = 0

    def flag(key: str) -> bool:
        return bool(body.get(key, False))

    def clean(key: str) -> str:
        return strip_ansi(str(body.get(key) or "").strip())

    return LoginSessionState(
        ok=flag("ok"),
        phase=str(body.get("phase") or ""),
        level=str(body.get("level") or ""),
        alive=flag("alive"),
        url=clean("url"),
        email=clean("email"),
        cursor=cursor,
        lines=[strip_ansi(str(entry)) for entry in raw_lines],
        can_open=flag("can_open"),
        can_check=flag("can_check"),
        can_cancel=flag("can_cancel"),
    )
def vpn_login_session_action(self, action: Literal["open", "check", "cancel"]) -> LoginSessionAction:
    """Send an action to the login session (POST /api/v1/vpn/login/session/action, 10 s timeout).

    Raises:
        ValueError: if ``action`` is not "open", "check" or "cancel"
            (compared after trimming and lower-casing).
    """
    verb = str(action).strip().lower()
    if verb not in ("open", "check", "cancel"):
        raise ValueError(f"Invalid login-session action: {action}")

    reply = self._request(
        "POST",
        "/api/v1/vpn/login/session/action",
        json_body={"action": verb},
        timeout=10.0,
    )
    body = cast(Dict[str, Any], self._json(reply) or {})

    # The backend may return {ok: false, error: "..."} without phase/level,
    # hence the empty-string defaults below.
    succeeded = bool(body.get("ok", False))
    phase = str(body.get("phase") or "")
    level = str(body.get("level") or "")
    failure = strip_ansi(str(body.get("error") or "").strip())
    return LoginSessionAction(ok=succeeded, phase=phase, level=level, error=failure)
def vpn_login_session_stop(self) -> CmdResult:
    """Stop the login session (POST /api/v1/vpn/login/session/stop, 10 s timeout).

    The reply is just {"ok": true}; it is wrapped in a CmdResult so the
    UI/controller can print it the same way as other command results.
    """
    reply = self._request("POST", "/api/v1/vpn/login/session/stop", timeout=10.0)
    body = cast(Dict[str, Any], self._json(reply) or {})
    if bool(body.get("ok", False)):
        return CmdResult(ok=True, message="login session stopped")
    return CmdResult(ok=False, message="failed to stop login session")
def vpn_logout(self) -> CmdResult:
    """Log out of the VPN (POST /api/v1/vpn/logout, 20 s timeout); the reply is parsed as a CmdResult."""
    reply = self._request("POST", "/api/v1/vpn/logout", timeout=20.0)
    payload = self._json(reply) or {}
    return self._parse_cmd_result(cast(Dict[str, Any], payload))
# ---- helpers ----
def _parse_cmd_result(self, data: Dict[str, Any]) -> CmdResult:
    """Convert a command reply dict into CmdResult.

    Reads "ok", "message", "exitCode", "stdout" and "stderr". The exit code
    is None when the key is absent, null, or not convertible to int;
    stdout/stderr have ANSI escape sequences removed.
    """
    raw_code = data.get("exitCode")
    code: Optional[int] = None
    if raw_code is not None:
        try:
            code = int(raw_code)
        except (TypeError, ValueError):
            code = None

    return CmdResult(
        ok=bool(data.get("ok", False)),
        message=str(data.get("message") or ""),
        exit_code=code,
        stdout=strip_ansi(str(data.get("stdout") or "")),
        stderr=strip_ansi(str(data.get("stderr") or "")),
    )