from __future__ import annotations

from typing import Any, Dict, List, Literal, Optional, cast

from .models import *
from .utils import strip_ansi


class VpnApiMixin:
    """VPN, egress-identity and login-session endpoints of the API client.

    This is a mixin: it calls ``self._request``, ``self._json``,
    ``self._parse_cmd_result`` and ``self._to_int``, none of which are
    defined here, so the host class must provide them.

    Response parsing is deliberately lenient: missing or falsy fields fall
    back to defaults (``""``, ``"unknown"``, ``False``) instead of raising.
    """

    def _vpn_json_dict(self, method: str, path: str, **kwargs: Any) -> Dict[str, Any]:
        """Send a request and return its decoded JSON body as a dict.

        Args:
            method: HTTP method, passed to ``self._request`` unchanged.
            path: API path, passed to ``self._request`` unchanged.
            **kwargs: Extra keyword arguments (``params``, ``json_body``,
                ``timeout``) forwarded to ``self._request`` as given.

        Returns:
            The decoded payload when it is a dict; otherwise (``None``, empty,
            or a non-dict value such as a list) an empty dict, so callers can
            always use ``.get``.

        Anything raised by ``self._request`` or ``self._json`` propagates.
        """
        payload = self._json(self._request(method, path, **kwargs))
        if not isinstance(payload, dict):
            return {}
        return cast(Dict[str, Any], payload)

    # VPN
    def vpn_autoloop_status(self) -> VpnAutoloopStatus:
        """Fetch ``GET /api/v1/vpn/autoloop-status`` as a ``VpnAutoloopStatus``.

        ``raw_text`` is stripped of whitespace and passed through
        ``strip_ansi``; ``status_word`` defaults to ``"unknown"``.
        """
        data = self._vpn_json_dict("GET", "/api/v1/vpn/autoloop-status", timeout=2.0)
        return VpnAutoloopStatus(
            raw_text=strip_ansi(str(data.get("raw_text") or "").strip()),
            status_word=str(data.get("status_word") or "unknown").strip(),
        )

    def vpn_status(self) -> VpnStatus:
        """Fetch ``GET /api/v1/vpn/status`` as a ``VpnStatus``.

        ``status_word`` and ``unit_state`` default to ``"unknown"``;
        ``desired_location`` and ``raw_text`` default to ``""``.
        """
        data = self._vpn_json_dict("GET", "/api/v1/vpn/status", timeout=2.0)
        return VpnStatus(
            desired_location=str(data.get("desired_location") or "").strip(),
            status_word=str(data.get("status_word") or "unknown").strip(),
            raw_text=strip_ansi(str(data.get("raw_text") or "").strip()),
            unit_state=str(data.get("unit_state") or "unknown").strip(),
        )

    def vpn_autoconnect(self, enable: bool) -> CmdResult:
        """POST ``/api/v1/vpn/autoconnect`` with action ``start`` or ``stop``.

        Args:
            enable: Truthy sends ``{"action": "start"}``, falsy sends
                ``{"action": "stop"}``.

        Returns:
            The response dict converted by ``self._parse_cmd_result``.
        """
        action = "start" if enable else "stop"
        data = self._vpn_json_dict(
            "POST",
            "/api/v1/vpn/autoconnect",
            json_body={"action": action},
        )
        return self._parse_cmd_result(data)

    def vpn_locations_state(self) -> VpnLocationsState:
        """Fetch ``GET /api/v1/vpn/locations`` as a ``VpnLocationsState``.

        Entries of ``locations`` that are not dicts, or that lack a non-empty
        ``label`` or ``iso``, are skipped. An entry with an empty ``target``
        uses its ``iso`` as the target.
        """
        data = self._vpn_json_dict("GET", "/api/v1/vpn/locations", timeout=3.0)

        raw_locations = data.get("locations") or []
        locations: List[VpnLocation] = []
        if isinstance(raw_locations, list):
            for item in raw_locations:
                if not isinstance(item, dict):
                    continue
                # label and iso are taken verbatim; only target is stripped.
                label = str(item.get("label") or "")
                iso = str(item.get("iso") or "")
                if not (label and iso):
                    continue
                target = str(item.get("target") or "").strip() or iso
                locations.append(VpnLocation(label=label, iso=iso, target=target))

        return VpnLocationsState(
            locations=locations,
            updated_at=str(data.get("updated_at") or "").strip(),
            stale=bool(data.get("stale", False)),
            refresh_in_progress=bool(data.get("refresh_in_progress", False)),
            last_error=strip_ansi(str(data.get("last_error") or "").strip()),
            next_retry_at=str(data.get("next_retry_at") or "").strip(),
        )

    def vpn_locations(self) -> List[VpnLocation]:
        """Return only the ``locations`` list of ``vpn_locations_state()``."""
        return self.vpn_locations_state().locations

    def vpn_locations_refresh_trigger(self) -> None:
        """Send ``GET /api/v1/vpn/locations?refresh=1`` and discard the reply."""
        self._request(
            "GET",
            "/api/v1/vpn/locations",
            params={"refresh": "1"},
            timeout=2.0,
        )

    def vpn_set_location(self, target: str, iso: str = "", label: str = "") -> None:
        """POST the chosen location to ``/api/v1/vpn/location``.

        Args:
            target: Location target; stringified and stripped before sending.
            iso: Optional value sent as ``iso`` (stringified and stripped).
            label: Optional value sent as ``label`` (stringified and stripped).

        Raises:
            ValueError: If ``target`` is empty after stripping.
        """
        val = str(target).strip()
        if not val:
            raise ValueError("target is required")
        self._request(
            "POST",
            "/api/v1/vpn/location",
            json_body={
                "target": val,
                "iso": str(iso).strip(),
                "label": str(label).strip(),
            },
        )

    def egress_identity_get(self, scope: str, *, refresh: bool = False) -> EgressIdentity:
        """Fetch ``GET /api/v1/egress/identity`` for one scope.

        Args:
            scope: Scope name; sent stripped as the ``scope`` query parameter.
            refresh: When truthy, ``refresh=1`` is added to the query.

        Returns:
            The response's ``item`` object parsed into an ``EgressIdentity``.
            A missing or non-dict ``item`` is parsed as an empty dict, with
            the requested scope used as the fallback scope.

        Raises:
            ValueError: If ``scope`` is empty after stripping.
        """
        scope_v = str(scope or "").strip()
        if not scope_v:
            raise ValueError("scope is required")

        params: Dict[str, Any] = {"scope": scope_v}
        if refresh:
            params["refresh"] = "1"

        data = self._vpn_json_dict(
            "GET",
            "/api/v1/egress/identity",
            params=params,
            timeout=2.0,
        )
        item_raw = data.get("item") or {}
        if not isinstance(item_raw, dict):
            item_raw = {}
        return self._parse_egress_identity(item_raw, scope_fallback=scope_v)

    def egress_identity_refresh(
        self,
        *,
        scopes: Optional[List[str]] = None,
        force: bool = False,
    ) -> EgressIdentityRefreshResult:
        """POST ``/api/v1/egress/identity/refresh``.

        Args:
            scopes: Scope names. Each is stringified and stripped; blank
                entries are dropped. The ``scopes`` key is sent only when at
                least one name remains.
            force: When truthy, ``"force": True`` is added to the payload.

        Returns:
            An ``EgressIdentityRefreshResult``. Non-dict rows of the
            response's ``items`` list are ignored; ``count``, ``queued`` and
            ``skipped`` are converted with ``self._to_int``.
        """
        cleaned = (str(raw or "").strip() for raw in (scopes or []))
        scope_items: List[str] = [name for name in cleaned if name]

        payload: Dict[str, Any] = {}
        if scope_items:
            payload["scopes"] = scope_items
        if force:
            payload["force"] = True

        data = self._vpn_json_dict(
            "POST",
            "/api/v1/egress/identity/refresh",
            json_body=payload,
            timeout=2.0,
        )

        raw_items = data.get("items") or []
        if not isinstance(raw_items, list):
            raw_items = []

        items: List[EgressIdentityRefreshItem] = [
            EgressIdentityRefreshItem(
                scope=str(row.get("scope") or "").strip(),
                status=str(row.get("status") or "").strip().lower(),
                queued=bool(row.get("queued", False)),
                reason=strip_ansi(str(row.get("reason") or "").strip()),
            )
            for row in raw_items
            if isinstance(row, dict)
        ]

        return EgressIdentityRefreshResult(
            ok=bool(data.get("ok", False)),
            message=strip_ansi(str(data.get("message") or "").strip()),
            count=self._to_int(data.get("count")),
            queued=self._to_int(data.get("queued")),
            skipped=self._to_int(data.get("skipped")),
            items=items,
        )

    # ---- AdGuard VPN interactive login-session ----
    def vpn_login_session_start(self) -> LoginSessionStart:
        """POST ``/api/v1/vpn/login/session/start``.

        Returns:
            A ``LoginSessionStart``. ``pid`` is ``None`` when the response
            has no ``pid`` or its value cannot be converted to ``int``.
        """
        data = self._vpn_json_dict(
            "POST",
            "/api/v1/vpn/login/session/start",
            timeout=10.0,
        )

        pid_val = data.get("pid", None)
        pid: Optional[int]
        try:
            pid = int(pid_val) if pid_val is not None else None
        except (TypeError, ValueError):
            pid = None

        return LoginSessionStart(
            ok=bool(data.get("ok", False)),
            phase=str(data.get("phase") or ""),
            level=str(data.get("level") or ""),
            pid=pid,
            email=strip_ansi(str(data.get("email") or "").strip()),
            error=strip_ansi(str(data.get("error") or "").strip()),
        )

    def vpn_login_session_state(self, since: int = 0) -> LoginSessionState:
        """Fetch ``GET /api/v1/vpn/login/session/state``.

        Args:
            since: Sent as the ``since`` query parameter (as a string).
                ``None`` is treated as 0 and negative values are clamped to 0.

        Returns:
            A ``LoginSessionState``. ``cursor`` falls back to 0 when the
            response value cannot be converted to ``int``; a non-list
            ``lines`` value is treated as empty; every line is stringified
            and passed through ``strip_ansi``.

        Raises:
            TypeError, ValueError: If ``since`` cannot be converted by
                ``int()``.
        """
        since_i = int(since) if since is not None else 0
        data = self._vpn_json_dict(
            "GET",
            "/api/v1/vpn/login/session/state",
            params={"since": str(max(0, since_i))},
            timeout=5.0,
        )

        lines = data.get("lines") or []
        if not isinstance(lines, list):
            lines = []

        try:
            cursor = int(data.get("cursor", 0))
        except (TypeError, ValueError):
            cursor = 0

        return LoginSessionState(
            ok=bool(data.get("ok", False)),
            phase=str(data.get("phase") or ""),
            level=str(data.get("level") or ""),
            alive=bool(data.get("alive", False)),
            url=strip_ansi(str(data.get("url") or "").strip()),
            email=strip_ansi(str(data.get("email") or "").strip()),
            cursor=cursor,
            lines=[strip_ansi(str(line)) for line in lines],
            can_open=bool(data.get("can_open", False)),
            can_check=bool(data.get("can_check", False)),
            can_cancel=bool(data.get("can_cancel", False)),
        )

    def vpn_login_session_action(self, action: Literal["open", "check", "cancel"]) -> LoginSessionAction:
        """POST an action to ``/api/v1/vpn/login/session/action``.

        Args:
            action: One of ``"open"``, ``"check"`` or ``"cancel"``; it is
                stripped and lower-cased before validation and sending.

        Raises:
            ValueError: If the normalized action is not one of the three
                accepted values.
        """
        act = str(action).strip().lower()
        if act not in ("open", "check", "cancel"):
            raise ValueError(f"Invalid login-session action: {action}")

        data = self._vpn_json_dict(
            "POST",
            "/api/v1/vpn/login/session/action",
            json_body={"action": act},
            timeout=10.0,
        )
        return LoginSessionAction(
            ok=bool(data.get("ok", False)),
            phase=str(data.get("phase") or ""),
            level=str(data.get("level") or ""),
            error=strip_ansi(str(data.get("error") or "").strip()),
        )

    def vpn_login_session_stop(self) -> CmdResult:
        """POST ``/api/v1/vpn/login/session/stop`` and wrap the reply.

        Returns:
            A ``CmdResult`` whose ``ok`` mirrors the response's ``ok`` field
            and whose message is fixed text chosen by that flag.
        """
        # Stop returns {"ok": true}; wrap into CmdResult for controller consistency.
        data = self._vpn_json_dict(
            "POST",
            "/api/v1/vpn/login/session/stop",
            timeout=10.0,
        )
        ok = bool(data.get("ok", False))
        return CmdResult(ok=ok, message="login session stopped" if ok else "failed to stop login session")

    def vpn_logout(self) -> CmdResult:
        """POST ``/api/v1/vpn/logout``; parse via ``self._parse_cmd_result``."""
        data = self._vpn_json_dict("POST", "/api/v1/vpn/logout", timeout=20.0)
        return self._parse_cmd_result(data)

    def _parse_egress_identity(
        self,
        raw: Dict[str, Any],
        *,
        scope_fallback: str = "",
    ) -> EgressIdentity:
        """Build an ``EgressIdentity`` from a raw response dict.

        Args:
            raw: Decoded ``item`` object; a non-dict value is treated as
                empty.
            scope_fallback: Scope used when ``raw`` has no truthy ``scope``.

        Returns:
            An ``EgressIdentity`` with string fields stripped, ``source``
            lower-cased, ``country_code`` upper-cased and ``last_error``
            passed through ``strip_ansi``.
        """
        data = raw if isinstance(raw, dict) else {}
        return EgressIdentity(
            scope=str(data.get("scope") or scope_fallback).strip(),
            source=str(data.get("source") or "").strip().lower(),
            source_id=str(data.get("source_id") or "").strip(),
            ip=str(data.get("ip") or "").strip(),
            country_code=str(data.get("country_code") or "").strip().upper(),
            country_name=str(data.get("country_name") or "").strip(),
            updated_at=str(data.get("updated_at") or "").strip(),
            stale=bool(data.get("stale", False)),
            refresh_in_progress=bool(data.get("refresh_in_progress", False)),
            last_error=strip_ansi(str(data.get("last_error") or "").strip()),
            next_retry_at=str(data.get("next_retry_at") or "").strip(),
        )