from __future__ import annotations from typing import Any, Dict, List, Optional, cast from .models import * from .utils import strip_ansi class TransportClientsApiMixin: def transport_interfaces_get(self) -> TransportInterfacesSnapshot: data = cast( Dict[str, Any], self._json(self._request("GET", "/api/v1/transport/interfaces")) or {}, ) raw = data.get("items") or [] if not isinstance(raw, list): raw = [] items: List[TransportInterfaceItem] = [] for row in raw: parsed = self._parse_transport_interface_item(row) if parsed is not None: items.append(parsed) return TransportInterfacesSnapshot( ok=bool(data.get("ok", False)), message=strip_ansi(str(data.get("message") or "").strip()), code=str(data.get("code") or "").strip(), count=self._to_int(data.get("count")), items=items, ) def transport_clients_get(self, enabled_only: bool = False, kind: str = "", include_virtual: bool = False) -> List[TransportClient]: params: Dict[str, Any] = {} if enabled_only: params["enabled_only"] = "true" kind_l = str(kind or "").strip().lower() if kind_l: params["kind"] = kind_l if include_virtual: params["include_virtual"] = "true" data = cast( Dict[str, Any], self._json( self._request("GET", "/api/v1/transport/clients", params=(params or None)) ) or {}, ) raw = data.get("items") or [] if not isinstance(raw, list): raw = [] out: List[TransportClient] = [] for row in raw: item = self._parse_transport_client(row) if item is not None: out.append(item) return out def transport_health_refresh( self, *, client_ids: Optional[List[str]] = None, force: bool = False, ) -> TransportHealthRefreshResult: payload: Dict[str, Any] = {} ids: List[str] = [] for raw in list(client_ids or []): cid = str(raw or "").strip() if cid: ids.append(cid) if ids: payload["client_ids"] = ids if force: payload["force"] = True data = cast( Dict[str, Any], self._json( self._request( "POST", "/api/v1/transport/health/refresh", json_body=payload, ) ) or {}, ) raw_items = data.get("items") or [] if not isinstance(raw_items, list): raw_items = [] items: List[TransportHealthRefreshItem] = [] for row in raw_items: if not isinstance(row, dict): continue items.append( TransportHealthRefreshItem( client_id=str(row.get("client_id") or "").strip(), status=str(row.get("status") or "").strip().lower(), queued=bool(row.get("queued", False)), reason=strip_ansi(str(row.get("reason") or "").strip()), ) ) return TransportHealthRefreshResult( ok=bool(data.get("ok", False)), message=strip_ansi(str(data.get("message") or "").strip()), code=str(data.get("code") or "").strip(), count=self._to_int(data.get("count")), queued=self._to_int(data.get("queued")), skipped=self._to_int(data.get("skipped")), items=items, ) def transport_client_health_get(self, client_id: str) -> TransportClientHealthSnapshot: cid = str(client_id or "").strip() if not cid: raise ValueError("missing transport client id") data = cast( Dict[str, Any], self._json( self._request( "GET", f"/api/v1/transport/clients/{cid}/health", ) ) or {}, ) raw_health = data.get("health") or {} if not isinstance(raw_health, dict): raw_health = {} latency_raw = raw_health.get("latency_ms") if latency_raw is None: latency_raw = data.get("latency_ms") last_err = ( str(raw_health.get("last_error") or "").strip() or str(data.get("last_error") or "").strip() ) last_check = ( str(raw_health.get("last_check") or "").strip() or str(data.get("last_check") or "").strip() ) return TransportClientHealthSnapshot( client_id=str(data.get("client_id") or cid).strip(), status=str(data.get("status") or "").strip().lower(), latency_ms=self._to_int(latency_raw), last_error=strip_ansi(last_err), last_check=last_check, ) def transport_client_create( self, *, client_id: str, kind: str, name: str = "", enabled: bool = True, config: Optional[Dict[str, Any]] = None, ) -> CmdResult: cid = str(client_id or "").strip() if not cid: raise ValueError("missing transport client id") k = str(kind or "").strip().lower() if not k: raise ValueError("missing transport client kind") payload: Dict[str, Any] = { "id": cid, "kind": k, "enabled": bool(enabled), } if str(name or "").strip(): payload["name"] = str(name).strip() if config is not None: payload["config"] = cast(Dict[str, Any], config) data = cast( Dict[str, Any], self._json( self._request( "POST", "/api/v1/transport/clients", json_body=payload, ) ) or {}, ) return CmdResult( ok=bool(data.get("ok", False)), message=strip_ansi(str(data.get("message") or "").strip()), exit_code=None, stdout="", stderr="", ) def transport_client_patch( self, client_id: str, *, name: Optional[str] = None, enabled: Optional[bool] = None, config: Optional[Dict[str, Any]] = None, ) -> CmdResult: cid = str(client_id or "").strip() if not cid: raise ValueError("missing transport client id") payload: Dict[str, Any] = {} if name is not None: payload["name"] = str(name).strip() if enabled is not None: payload["enabled"] = bool(enabled) if config is not None: payload["config"] = cast(Dict[str, Any], config) if not payload: raise ValueError("empty patch payload") data = cast( Dict[str, Any], self._json( self._request( "PATCH", f"/api/v1/transport/clients/{cid}", json_body=payload, ) ) or {}, ) return CmdResult( ok=bool(data.get("ok", False)), message=strip_ansi(str(data.get("message") or "").strip()), exit_code=None, stdout="", stderr="", ) def transport_client_action( self, client_id: str, action: TransportClientAction, ) -> TransportClientActionResult: cid = str(client_id or "").strip() if not cid: raise ValueError("missing transport client id") act = str(action or "").strip().lower() if act not in ("provision", "start", "stop", "restart"): raise ValueError(f"invalid transport action: {action}") data = cast( Dict[str, Any], self._json( self._request( "POST", f"/api/v1/transport/clients/{cid}/{act}", ) ) or {}, ) health_raw = data.get("health") or {} if not isinstance(health_raw, dict): health_raw = {} runtime_raw = data.get("runtime") or {} if not isinstance(runtime_raw, dict): runtime_raw = {} runtime_err_raw = runtime_raw.get("last_error") or {} if not isinstance(runtime_err_raw, dict): runtime_err_raw = {} last_error = ( str(health_raw.get("last_error") or "").strip() or str(runtime_err_raw.get("message") or "").strip() or str(data.get("stderr") or "").strip() ) exit_code_val = data.get("exitCode", None) exit_code: Optional[int] try: exit_code = int(exit_code_val) if exit_code_val is not None else None except (TypeError, ValueError): exit_code = None return TransportClientActionResult( ok=bool(data.get("ok", False)), message=strip_ansi(str(data.get("message") or "").strip()), code=str(data.get("code") or "").strip(), client_id=str(data.get("client_id") or cid).strip(), kind=str(data.get("kind") or "").strip().lower(), action=str(data.get("action") or act).strip().lower(), status_before=str(data.get("status_before") or "").strip().lower(), status_after=str(data.get("status_after") or "").strip().lower(), last_error=strip_ansi(last_error), exit_code=exit_code, stdout=strip_ansi(str(data.get("stdout") or "")), stderr=strip_ansi(str(data.get("stderr") or "")), ) def transport_client_delete( self, client_id: str, *, force: bool = False, cleanup: bool = True, ) -> CmdResult: cid = str(client_id or "").strip() if not cid: raise ValueError("missing transport client id") params: Dict[str, Any] = {} if force: params["force"] = "true" if not cleanup: params["cleanup"] = "false" data = cast( Dict[str, Any], self._json( self._request( "DELETE", f"/api/v1/transport/clients/{cid}", params=(params or None), ) ) or {}, ) return CmdResult( ok=bool(data.get("ok", False)), message=strip_ansi(str(data.get("message") or "").strip()), exit_code=None, stdout="", stderr="", ) def transport_netns_toggle( self, *, enabled: Optional[bool] = None, client_ids: Optional[List[str]] = None, provision: bool = True, restart_running: bool = True, ) -> TransportNetnsToggleResult: payload: Dict[str, Any] = { "provision": bool(provision), "restart_running": bool(restart_running), } if enabled is not None: payload["enabled"] = bool(enabled) if client_ids is not None: payload["client_ids"] = [ str(x).strip() for x in (client_ids or []) if str(x).strip() ] data = cast( Dict[str, Any], self._json( self._request( "POST", "/api/v1/transport/netns/toggle", json_body=payload, ) ) or {}, ) raw_items = data.get("items") or [] if not isinstance(raw_items, list): raw_items = [] items: List[TransportNetnsToggleItem] = [] for row in raw_items: if not isinstance(row, dict): continue items.append( TransportNetnsToggleItem( ok=bool(row.get("ok", False)), message=strip_ansi(str(row.get("message") or "").strip()), code=str(row.get("code") or "").strip(), client_id=str(row.get("client_id") or "").strip(), kind=str(row.get("kind") or "").strip().lower(), status_before=str(row.get("status_before") or "").strip().lower(), status_after=str(row.get("status_after") or "").strip().lower(), netns_enabled=bool(row.get("netns_enabled", False)), config_updated=bool(row.get("config_updated", False)), provisioned=bool(row.get("provisioned", False)), restarted=bool(row.get("restarted", False)), stdout=strip_ansi(str(row.get("stdout") or "")), stderr=strip_ansi(str(row.get("stderr") or "")), ) ) return TransportNetnsToggleResult( ok=bool(data.get("ok", False)), message=strip_ansi(str(data.get("message") or "").strip()), code=str(data.get("code") or "").strip(), enabled=bool(data.get("enabled", False)), count=self._to_int(data.get("count")), success_count=self._to_int(data.get("success_count")), failure_count=self._to_int(data.get("failure_count")), items=items, ) def _parse_transport_interface_item(self, row: Any) -> Optional[TransportInterfaceItem]: if not isinstance(row, dict): return None iface_id = str(row.get("id") or "").strip() if not iface_id: return None raw_ids = row.get("client_ids") or [] if not isinstance(raw_ids, list): raw_ids = [] client_ids = [str(x).strip() for x in raw_ids if str(x).strip()] cfg = row.get("config") or {} if not isinstance(cfg, dict): cfg = {} return TransportInterfaceItem( id=iface_id, name=str(row.get("name") or "").strip(), mode=str(row.get("mode") or "").strip().lower(), runtime_iface=str(row.get("runtime_iface") or "").strip(), netns_name=str(row.get("netns_name") or "").strip(), routing_table=str(row.get("routing_table") or "").strip(), client_ids=client_ids, client_count=self._to_int(row.get("client_count")), up_count=self._to_int(row.get("up_count")), updated_at=str(row.get("updated_at") or "").strip(), config=cast(Dict[str, Any], cfg), ) def _parse_transport_client(self, row: Any) -> Optional[TransportClient]: if not isinstance(row, dict): return None cid = str(row.get("id") or "").strip() if not cid: return None raw_health = row.get("health") or {} if not isinstance(raw_health, dict): raw_health = {} raw_caps = row.get("capabilities") or [] if not isinstance(raw_caps, list): raw_caps = [] raw_cfg = row.get("config") or {} if not isinstance(raw_cfg, dict): raw_cfg = {} return TransportClient( id=cid, name=str(row.get("name") or "").strip(), kind=str(row.get("kind") or "").strip().lower(), enabled=bool(row.get("enabled", False)), status=str(row.get("status") or "").strip().lower(), iface=str(row.get("iface") or "").strip(), routing_table=str(row.get("routing_table") or "").strip(), mark_hex=str(row.get("mark_hex") or "").strip(), priority_base=self._to_int(row.get("priority_base")), capabilities=[str(x).strip() for x in raw_caps if str(x).strip()], health=TransportClientHealth( last_check=str(raw_health.get("last_check") or "").strip(), latency_ms=self._to_int(raw_health.get("latency_ms")), last_error=str(raw_health.get("last_error") or "").strip(), ), config=cast(Dict[str, Any], raw_cfg), updated_at=str(row.get("updated_at") or "").strip(), )