283 lines
10 KiB
Python
283 lines
10 KiB
Python
from __future__ import annotations

from typing import Any, Dict, List, Literal, Optional, cast

from .models import *
from .utils import strip_ansi
|
|
|
|
|
|
class VpnApiMixin:
    """Client-side wrappers for the VPN and egress-identity HTTP endpoints.

    The mixin does not define ``_request``, ``_json``, ``_parse_cmd_result``
    or ``_to_int``; it expects the class it is mixed into to provide them.
    Every method decodes the JSON response defensively: missing or falsy
    fields fall back to a default instead of raising.
    """

    def _vpn_request_dict(self, method: str, path: str, **kwargs: Any) -> Dict[str, Any]:
        """Send a request and return its decoded JSON body as a dict.

        ``typing.cast`` performs no runtime check, so the payload type is
        verified here: anything that is not a dict (``None``, a list, a
        string, ...) is replaced by an empty dict, which lets callers use
        ``.get`` unconditionally.

        Args:
            method: HTTP method forwarded to ``self._request``.
            path: Endpoint path forwarded to ``self._request``.
            **kwargs: Extra keyword arguments forwarded unchanged
                (``timeout``, ``params``, ``json_body``).

        Returns:
            The decoded payload when it is a dict, otherwise ``{}``.
        """
        payload = self._json(self._request(method, path, **kwargs))
        if isinstance(payload, dict):
            return cast(Dict[str, Any], payload)
        return {}

    # VPN

    def vpn_autoloop_status(self) -> VpnAutoloopStatus:
        """Fetch the autoloop status from ``/api/v1/vpn/autoloop-status``.

        ``raw_text`` is stripped of ANSI sequences; ``status_word`` defaults
        to ``"unknown"`` when absent or empty.
        """
        data = self._vpn_request_dict("GET", "/api/v1/vpn/autoloop-status", timeout=2.0)
        raw = strip_ansi(str(data.get("raw_text") or "").strip())
        word = str(data.get("status_word") or "unknown").strip()
        return VpnAutoloopStatus(raw_text=raw, status_word=word)

    def vpn_status(self) -> VpnStatus:
        """Fetch the VPN status from ``/api/v1/vpn/status``.

        ``status_word`` and ``unit_state`` default to ``"unknown"``; the other
        text fields default to an empty string.
        """
        data = self._vpn_request_dict("GET", "/api/v1/vpn/status", timeout=2.0)
        return VpnStatus(
            desired_location=str(data.get("desired_location") or "").strip(),
            status_word=str(data.get("status_word") or "unknown").strip(),
            raw_text=strip_ansi(str(data.get("raw_text") or "").strip()),
            unit_state=str(data.get("unit_state") or "unknown").strip(),
        )

    def vpn_autoconnect(self, enable: bool) -> CmdResult:
        """POST a ``start``/``stop`` action to ``/api/v1/vpn/autoconnect``.

        Args:
            enable: ``True`` sends ``"start"``, ``False`` sends ``"stop"``.

        Returns:
            The response converted by ``self._parse_cmd_result``.
        """
        action = "start" if enable else "stop"
        data = self._vpn_request_dict(
            "POST",
            "/api/v1/vpn/autoconnect",
            json_body={"action": action},
        )
        return self._parse_cmd_result(data)

    def vpn_locations_state(self) -> VpnLocationsState:
        """Fetch the location list and its metadata from ``/api/v1/vpn/locations``.

        Entries that are not dicts, or that lack a non-blank ``label`` or
        ``iso``, are skipped. A blank ``target`` falls back to the entry's
        ``iso``.
        """
        data = self._vpn_request_dict("GET", "/api/v1/vpn/locations", timeout=3.0)
        raw_locations = data.get("locations") or []
        locations: List[VpnLocation] = []
        if isinstance(raw_locations, list):
            for item in raw_locations:
                if not isinstance(item, dict):
                    continue
                # Strip before validating so whitespace-only values are
                # rejected and can never leak into the ``target`` fallback.
                label = str(item.get("label") or "").strip()
                iso = str(item.get("iso") or "").strip()
                if not (label and iso):
                    continue
                target = str(item.get("target") or "").strip() or iso
                locations.append(VpnLocation(label=label, iso=iso, target=target))
        return VpnLocationsState(
            locations=locations,
            updated_at=str(data.get("updated_at") or "").strip(),
            stale=bool(data.get("stale", False)),
            refresh_in_progress=bool(data.get("refresh_in_progress", False)),
            last_error=strip_ansi(str(data.get("last_error") or "").strip()),
            next_retry_at=str(data.get("next_retry_at") or "").strip(),
        )

    def vpn_locations(self) -> List[VpnLocation]:
        """Return only the ``locations`` list of :meth:`vpn_locations_state`."""
        return self.vpn_locations_state().locations

    def vpn_locations_refresh_trigger(self) -> None:
        """GET ``/api/v1/vpn/locations`` with ``refresh=1``; the response is discarded."""
        self._request(
            "GET",
            "/api/v1/vpn/locations",
            params={"refresh": "1"},
            timeout=2.0,
        )

    def vpn_set_location(self, target: str, iso: str = "", label: str = "") -> None:
        """POST the selected location to ``/api/v1/vpn/location``.

        Args:
            target: Location target; required after stripping whitespace.
            iso: Optional ISO value sent alongside the target.
            label: Optional label sent alongside the target.

        Raises:
            ValueError: If ``target`` is blank.
        """
        val = str(target).strip()
        if not val:
            raise ValueError("target is required")
        self._request(
            "POST",
            "/api/v1/vpn/location",
            json_body={
                "target": val,
                "iso": str(iso).strip(),
                "label": str(label).strip(),
            },
        )

    def egress_identity_get(self, scope: str, *, refresh: bool = False) -> EgressIdentity:
        """Fetch the egress identity for ``scope`` from ``/api/v1/egress/identity``.

        Args:
            scope: Scope name; required after stripping whitespace.
            refresh: When true, ``refresh=1`` is added to the query string.

        Returns:
            The ``item`` object of the response parsed by
            ``_parse_egress_identity``; a missing or non-dict ``item`` is
            parsed as an empty dict with ``scope`` used as the fallback.

        Raises:
            ValueError: If ``scope`` is blank.
        """
        scope_v = str(scope or "").strip()
        if not scope_v:
            raise ValueError("scope is required")
        params: Dict[str, Any] = {"scope": scope_v}
        if refresh:
            params["refresh"] = "1"
        data = self._vpn_request_dict(
            "GET",
            "/api/v1/egress/identity",
            params=params,
            timeout=2.0,
        )
        item_raw = data.get("item") or {}
        if not isinstance(item_raw, dict):
            item_raw = {}
        return self._parse_egress_identity(item_raw, scope_fallback=scope_v)

    def egress_identity_refresh(
        self,
        *,
        scopes: Optional[List[str]] = None,
        force: bool = False,
    ) -> EgressIdentityRefreshResult:
        """POST a refresh request to ``/api/v1/egress/identity/refresh``.

        Args:
            scopes: Scope names to refresh. Blank entries are dropped, and the
                ``scopes`` key is omitted from the payload when none remain.
            force: When true, ``"force": True`` is added to the payload.

        Returns:
            The parsed result; rows of ``items`` that are not dicts are
            ignored.
        """
        payload: Dict[str, Any] = {}
        cleaned = (str(raw or "").strip() for raw in scopes or ())
        scope_items: List[str] = [text for text in cleaned if text]
        if scope_items:
            payload["scopes"] = scope_items
        if force:
            payload["force"] = True

        data = self._vpn_request_dict(
            "POST",
            "/api/v1/egress/identity/refresh",
            json_body=payload,
            timeout=2.0,
        )

        raw_items = data.get("items") or []
        if not isinstance(raw_items, list):
            raw_items = []
        items: List[EgressIdentityRefreshItem] = [
            EgressIdentityRefreshItem(
                scope=str(row.get("scope") or "").strip(),
                status=str(row.get("status") or "").strip().lower(),
                queued=bool(row.get("queued", False)),
                reason=strip_ansi(str(row.get("reason") or "").strip()),
            )
            for row in raw_items
            if isinstance(row, dict)
        ]
        return EgressIdentityRefreshResult(
            ok=bool(data.get("ok", False)),
            message=strip_ansi(str(data.get("message") or "").strip()),
            count=self._to_int(data.get("count")),
            queued=self._to_int(data.get("queued")),
            skipped=self._to_int(data.get("skipped")),
            items=items,
        )

    # ---- AdGuard VPN interactive login-session ----

    def vpn_login_session_start(self) -> LoginSessionStart:
        """POST to ``/api/v1/vpn/login/session/start`` and parse the reply.

        ``pid`` is ``None`` when the field is absent or cannot be converted
        to ``int``.
        """
        data = self._vpn_request_dict("POST", "/api/v1/vpn/login/session/start", timeout=10.0)
        pid_val = data.get("pid", None)
        pid: Optional[int]
        try:
            pid = int(pid_val) if pid_val is not None else None
        except (TypeError, ValueError):
            pid = None

        return LoginSessionStart(
            ok=bool(data.get("ok", False)),
            phase=str(data.get("phase") or ""),
            level=str(data.get("level") or ""),
            pid=pid,
            email=strip_ansi(str(data.get("email") or "").strip()),
            error=strip_ansi(str(data.get("error") or "").strip()),
        )

    def vpn_login_session_state(self, since: int = 0) -> LoginSessionState:
        """GET ``/api/v1/vpn/login/session/state`` and parse the reply.

        Args:
            since: Sent as the ``since`` query parameter; ``None`` is treated
                as 0 and negative values are clamped to 0.

        Returns:
            The parsed state. ``cursor`` falls back to 0 when it cannot be
            converted to ``int``; a non-list ``lines`` value becomes an empty
            list, and every line is stringified and stripped of ANSI
            sequences.
        """
        since_i = int(since) if since is not None else 0
        data = self._vpn_request_dict(
            "GET",
            "/api/v1/vpn/login/session/state",
            params={"since": str(max(0, since_i))},
            timeout=5.0,
        )

        lines = data.get("lines") or []
        if not isinstance(lines, list):
            lines = []

        cursor_val = data.get("cursor", 0)
        try:
            cursor = int(cursor_val)
        except (TypeError, ValueError):
            cursor = 0

        return LoginSessionState(
            ok=bool(data.get("ok", False)),
            phase=str(data.get("phase") or ""),
            level=str(data.get("level") or ""),
            alive=bool(data.get("alive", False)),
            url=strip_ansi(str(data.get("url") or "").strip()),
            email=strip_ansi(str(data.get("email") or "").strip()),
            cursor=cursor,
            lines=[strip_ansi(str(x)) for x in lines],
            can_open=bool(data.get("can_open", False)),
            can_check=bool(data.get("can_check", False)),
            can_cancel=bool(data.get("can_cancel", False)),
        )

    def vpn_login_session_action(self, action: Literal["open", "check", "cancel"]) -> LoginSessionAction:
        """POST an action to ``/api/v1/vpn/login/session/action``.

        Args:
            action: ``"open"``, ``"check"`` or ``"cancel"``; the value is
                stripped and lower-cased before validation.

        Raises:
            ValueError: If the normalized action is not one of the three
                accepted values.
        """
        act = str(action).strip().lower()
        if act not in ("open", "check", "cancel"):
            raise ValueError(f"Invalid login-session action: {action}")

        data = self._vpn_request_dict(
            "POST",
            "/api/v1/vpn/login/session/action",
            json_body={"action": act},
            timeout=10.0,
        )

        return LoginSessionAction(
            ok=bool(data.get("ok", False)),
            phase=str(data.get("phase") or ""),
            level=str(data.get("level") or ""),
            error=strip_ansi(str(data.get("error") or "").strip()),
        )

    def vpn_login_session_stop(self) -> CmdResult:
        """POST to ``/api/v1/vpn/login/session/stop`` and report the outcome."""
        # Stop returns {"ok": true}; wrap into CmdResult for controller consistency.
        data = self._vpn_request_dict("POST", "/api/v1/vpn/login/session/stop", timeout=10.0)
        ok = bool(data.get("ok", False))
        return CmdResult(ok=ok, message="login session stopped" if ok else "failed to stop login session")

    def vpn_logout(self) -> CmdResult:
        """POST to ``/api/v1/vpn/logout`` and parse the reply with ``_parse_cmd_result``."""
        data = self._vpn_request_dict("POST", "/api/v1/vpn/logout", timeout=20.0)
        return self._parse_cmd_result(data)

    def _parse_egress_identity(
        self,
        raw: Dict[str, Any],
        *,
        scope_fallback: str = "",
    ) -> EgressIdentity:
        """Build an ``EgressIdentity`` from a decoded response object.

        Args:
            raw: Decoded identity object; a non-dict value is treated as
                empty.
            scope_fallback: Scope used when the object has no ``scope``.

        Returns:
            The identity with text fields stripped, ``source`` lower-cased,
            ``country_code`` upper-cased and ``last_error`` stripped of ANSI
            sequences.
        """
        data = raw if isinstance(raw, dict) else {}
        return EgressIdentity(
            scope=str(data.get("scope") or scope_fallback).strip(),
            source=str(data.get("source") or "").strip().lower(),
            source_id=str(data.get("source_id") or "").strip(),
            ip=str(data.get("ip") or "").strip(),
            country_code=str(data.get("country_code") or "").strip().upper(),
            country_name=str(data.get("country_name") or "").strip(),
            updated_at=str(data.get("updated_at") or "").strip(),
            stale=bool(data.get("stale", False)),
            refresh_in_progress=bool(data.get("refresh_in_progress", False)),
            last_error=strip_ansi(str(data.get("last_error") or "").strip()),
            next_retry_at=str(data.get("next_retry_at") or "").strip(),
        )
|