#!/usr/bin/env python3 """Selective-VPN API client (UI-agnostic). Design goals: - The dashboard (GUI) must NOT know any URLs, HTTP methods, JSON keys, or payload shapes. - All REST details live here. - Returned values are normalized into dataclasses for clean UI usage. Env: - SELECTIVE_VPN_API (default: http://127.0.0.1:8080) This file is meant to be imported by a controller (dashboard_controller.py) and UI. """ from __future__ import annotations import json import os import time from typing import Any, Callable, Dict, Iterator, List, Optional import requests from .errors import ApiError from .models import * from .utils import strip_ansi from .dns import DNSApiMixin from .domains import DomainsApiMixin from .routes import RoutesApiMixin from .status import StatusApiMixin from .trace import TraceApiMixin from .traffic import TrafficApiMixin from .transport import TransportApiMixin from .vpn import VpnApiMixin class ApiClient( TransportApiMixin, StatusApiMixin, RoutesApiMixin, TrafficApiMixin, DNSApiMixin, DomainsApiMixin, VpnApiMixin, TraceApiMixin, ): """Domain API client. Public methods here are the ONLY surface the dashboard/controller should use. """ def __init__( self, base_url: str, *, timeout: float = 5.0, session: Optional[requests.Session] = None, ) -> None: self.base_url = base_url.rstrip("/") self.timeout = float(timeout) self._s = session or requests.Session() @classmethod def from_env( cls, env_var: str = "SELECTIVE_VPN_API", default: str = "http://127.0.0.1:8080", *, timeout: float = 5.0, ) -> "ApiClient": base = os.environ.get(env_var, default).rstrip("/") return cls(base, timeout=timeout) # ---- low-level internals (private) ---- def _url(self, path: str) -> str: if not path.startswith("/"): path = "/" + path return self.base_url + path def _request( self, method: str, path: str, *, params: Optional[Dict[str, Any]] = None, json_body: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None, accept_json: bool = True, ) -> requests.Response: url = self._url(path) headers: Dict[str, str] = {} if accept_json: headers["Accept"] = "application/json" try: resp = self._s.request( method=method.upper(), url=url, params=params, json=json_body, timeout=self.timeout if timeout is None else float(timeout), headers=headers, ) except requests.RequestException as e: raise ApiError("API request failed", method.upper(), url, None, str(e)) from e if not (200 <= resp.status_code < 300): txt = resp.text.strip() raise ApiError("API returned error", method.upper(), url, resp.status_code, txt) return resp def _json(self, resp: requests.Response) -> Any: if not resp.content: return None try: return resp.json() except ValueError: # Backend should be JSON, but keep safe fallback. return {"raw": resp.text} # ---- event stream (SSE) ---- def events_stream(self, since: int = 0, stop: Optional[Callable[[], bool]] = None) -> Iterator[Event]: """ Iterate over server-sent events. Reconnects automatically on errors. Args: since: last seen event id (inclusive). Server will replay newer ones. stop: optional callable returning True to stop streaming. """ last = max(0, int(since)) backoff = 1.0 while True: if stop and stop(): return try: for ev in self._sse_once(last, stop): if stop and stop(): return last = ev.id if ev.id else last yield ev # normal end -> reconnect backoff = 1.0 except ApiError: # bubble up API errors; caller decides raise except Exception: # transient error, retry with backoff time.sleep(backoff) backoff = min(backoff * 2, 10.0) def _sse_once(self, since: int, stop: Optional[Callable[[], bool]]) -> Iterator[Event]: headers = { "Accept": "text/event-stream", "Cache-Control": "no-cache", } params = {} if since > 0: params["since"] = str(since) url = self._url("/api/v1/events/stream") # SSE соединение живёт долго: backend шлёт heartbeat каждые 15s, # поэтому ставим более длинный read-timeout, иначе стандартные 5s # приводят к ложным ошибкам чтения. read_timeout = max(self.timeout * 3, 60.0) try: resp = self._s.request( method="GET", url=url, headers=headers, params=params, stream=True, timeout=(self.timeout, read_timeout), ) except requests.RequestException as e: raise ApiError("API request failed", "GET", url, None, str(e)) from e if not (200 <= resp.status_code < 300): txt = resp.text.strip() raise ApiError("API returned error", "GET", url, resp.status_code, txt) ev_id: Optional[int] = None ev_kind: str = "" data_lines: List[str] = [] for raw in resp.iter_lines(decode_unicode=True): if stop and stop(): resp.close() return if raw is None: continue line = raw.strip("\r") if line == "": if data_lines or ev_kind or ev_id is not None: ev = self._make_event(ev_id, ev_kind, data_lines) if ev: yield ev ev_id = None ev_kind = "" data_lines = [] continue if line.startswith(":"): # heartbeat/comment continue if line.startswith("id:"): try: ev_id = int(line[3:].strip()) except ValueError: ev_id = None continue if line.startswith("event:"): ev_kind = line[6:].strip() continue if line.startswith("data:"): data_lines.append(line[5:].lstrip()) continue # unknown field -> ignore def _make_event(self, ev_id: Optional[int], ev_kind: str, data_lines: List[str]) -> Optional[Event]: payload: Any = None if data_lines: data_str = "\n".join(data_lines) try: payload = json.loads(data_str) except Exception: payload = data_str if isinstance(payload, dict): id_val = ev_id if id_val is None: try: id_val = int(payload.get("id", 0)) except Exception: id_val = 0 kind_val = ev_kind or str(payload.get("kind") or "") ts_val = str(payload.get("ts") or "") data_val = payload.get("data", payload) return Event(id=id_val, kind=kind_val, ts=ts_val, data=data_val) return Event(id=ev_id or 0, kind=ev_kind, ts="", data=payload) # ---- shared helpers ---- def _to_int(self, value: Any, default: int = 0) -> int: try: return int(value) except (TypeError, ValueError): return int(default) def _parse_cmd_result(self, data: Dict[str, Any]) -> CmdResult: ok = bool(data.get("ok", False)) msg = str(data.get("message") or "") exit_code_val = data.get("exitCode", None) exit_code: Optional[int] try: exit_code = int(exit_code_val) if exit_code_val is not None else None except (TypeError, ValueError): exit_code = None stdout = strip_ansi(str(data.get("stdout") or "")) stderr = strip_ansi(str(data.get("stderr") or "")) return CmdResult(ok=ok, message=msg, exit_code=exit_code, stdout=stdout, stderr=stderr)