from __future__ import annotations

from typing import Any, Callable, Dict, List, Optional, TypeVar, cast

from .models import *

_RowT = TypeVar("_RowT")


class TransportPolicyApiMixin:
    """Client-side wrappers for the ``/api/v1/transport/*`` endpoints.

    This mixin calls ``self._request``, ``self._json`` and ``self._to_int``,
    none of which are defined here; the class it is mixed into is expected to
    provide them.  Responses are parsed tolerantly: missing or wrongly-typed
    fields fall back to empty/zero values, and rows that lack required fields
    are dropped instead of raising.
    """

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _transport_request_json(
        self,
        method: str,
        path: str,
        payload: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Issue a request and return the decoded body as a dict.

        Args:
            method: HTTP method passed through to ``self._request``.
            path: Endpoint path passed through to ``self._request``.
            payload: When not ``None`` (an empty dict counts), forwarded as
                ``json_body``; when ``None`` no ``json_body`` is passed.

        Returns:
            The decoded body if it is a dict, otherwise ``{}``.
        """
        if payload is None:
            response = self._request(method, path)
        else:
            response = self._request(method, path, json_body=payload)
        data = self._json(response)
        # cast() performs no runtime check, so guard explicitly: a non-dict
        # body (e.g. a JSON list) would otherwise fail on the first ``.get``.
        return cast(Dict[str, Any], data) if isinstance(data, dict) else {}

    @staticmethod
    def _transport_collect_rows(
        raw: Any,
        parse_row: Callable[[Any], Optional[_RowT]],
    ) -> List[_RowT]:
        """Parse every element of ``raw`` and keep the non-``None`` results.

        Anything that is not a list (including ``None``) yields ``[]``.
        """
        if not isinstance(raw, list):
            return []
        parsed = (parse_row(row) for row in raw)
        return [item for item in parsed if item is not None]

    # ------------------------------------------------------------------
    # Policy endpoints
    # ------------------------------------------------------------------

    def transport_policy_get(self) -> TransportPolicy:
        """GET ``/api/v1/transport/policies`` and parse it into a ``TransportPolicy``."""
        data = self._transport_request_json("GET", "/api/v1/transport/policies")
        intents = self._transport_collect_rows(
            data.get("intents"), self._parse_transport_intent
        )
        return TransportPolicy(
            revision=self._to_int(data.get("policy_revision")),
            intents=intents,
        )

    def transport_policy_validate(
        self,
        *,
        base_revision: int = 0,
        intents: List[TransportPolicyIntent],
        allow_warnings: bool = True,
        force_override: bool = False,
    ) -> TransportPolicyValidateResult:
        """POST intents to ``/api/v1/transport/policies/validate``.

        Args:
            base_revision: Sent as ``base_revision`` only when greater than 0.
            intents: Intents to serialize via ``_transport_intent_payload``.
            allow_warnings: Sent as ``options.allow_warnings``.
            force_override: Sent as ``options.force_override``.

        Returns:
            The response parsed into a ``TransportPolicyValidateResult``.
        """
        payload: Dict[str, Any] = {
            "intents": [self._transport_intent_payload(it) for it in (intents or [])],
        }
        revision = int(base_revision or 0)
        if revision > 0:
            payload["base_revision"] = revision
        payload["options"] = {
            "allow_warnings": bool(allow_warnings),
            "force_override": bool(force_override),
        }

        data = self._transport_request_json(
            "POST", "/api/v1/transport/policies/validate", payload
        )

        summary_raw = data.get("summary")
        if not isinstance(summary_raw, dict):
            summary_raw = {}
        diff_raw = data.get("diff")
        if not isinstance(diff_raw, dict):
            diff_raw = {}
        conflicts = self._transport_collect_rows(
            data.get("conflicts"), self._parse_transport_conflict
        )

        return TransportPolicyValidateResult(
            ok=bool(data.get("ok", False)),
            # message/code are stripped like in every other result parser here.
            message=str(data.get("message") or "").strip(),
            code=str(data.get("code") or "").strip(),
            valid=bool(data.get("valid", False)),
            base_revision=self._to_int(data.get("base_revision")),
            confirm_token=str(data.get("confirm_token") or "").strip(),
            summary=TransportPolicyValidateSummary(
                block_count=self._to_int(summary_raw.get("block_count")),
                warn_count=self._to_int(summary_raw.get("warn_count")),
            ),
            conflicts=conflicts,
            diff=TransportPolicyDiff(
                added=self._to_int(diff_raw.get("added")),
                changed=self._to_int(diff_raw.get("changed")),
                removed=self._to_int(diff_raw.get("removed")),
            ),
        )

    def transport_policy_apply(
        self,
        *,
        base_revision: int,
        intents: List[TransportPolicyIntent],
        force_override: bool = False,
        confirm_token: str = "",
    ) -> TransportPolicyApplyResult:
        """POST intents to ``/api/v1/transport/policies/apply``.

        Args:
            base_revision: Always sent as ``base_revision``.
            intents: Intents to serialize via ``_transport_intent_payload``.
            force_override: When truthy, sent as ``options.force_override``.
            confirm_token: Sent as ``options.confirm_token`` when non-blank
                after stripping.

        Returns:
            The response parsed by ``_parse_transport_policy_apply``.
        """
        payload: Dict[str, Any] = {
            "base_revision": int(base_revision),
            "intents": [self._transport_intent_payload(it) for it in (intents or [])],
        }
        options: Dict[str, Any] = {}
        if force_override:
            options["force_override"] = True
        # Strip before testing so a whitespace-only token is omitted instead
        # of being sent as an empty string.
        token = str(confirm_token or "").strip()
        if token:
            options["confirm_token"] = token
        if options:
            payload["options"] = options

        data = self._transport_request_json(
            "POST", "/api/v1/transport/policies/apply", payload
        )
        return self._parse_transport_policy_apply(data)

    def transport_policy_rollback(self, *, base_revision: int = 0) -> TransportPolicyApplyResult:
        """POST to ``/api/v1/transport/policies/rollback``.

        ``base_revision`` is included in the body only when greater than 0;
        otherwise an empty JSON object is sent.
        """
        payload: Dict[str, Any] = {}
        revision = int(base_revision or 0)
        if revision > 0:
            payload["base_revision"] = revision
        data = self._transport_request_json(
            "POST", "/api/v1/transport/policies/rollback", payload
        )
        return self._parse_transport_policy_apply(data)

    # ------------------------------------------------------------------
    # Read-only snapshots
    # ------------------------------------------------------------------

    def transport_conflicts_get(self) -> TransportConflicts:
        """GET ``/api/v1/transport/conflicts`` and parse it into ``TransportConflicts``."""
        data = self._transport_request_json("GET", "/api/v1/transport/conflicts")
        return TransportConflicts(
            has_blocking=bool(data.get("has_blocking", False)),
            items=self._transport_collect_rows(
                data.get("items"), self._parse_transport_conflict
            ),
        )

    def transport_capabilities_get(self) -> TransportCapabilities:
        """GET ``/api/v1/transport/capabilities``.

        Returns:
            ``TransportCapabilities`` whose ``clients`` maps a lower-cased
            client kind to a ``{lower-cased capability name: bool}`` dict.
            Blank keys and non-dict capability sets are skipped.
        """
        data = self._transport_request_json("GET", "/api/v1/transport/capabilities")
        raw_clients = data.get("clients")
        if not isinstance(raw_clients, dict):
            raw_clients = {}

        clients: Dict[str, Dict[str, bool]] = {}
        for kind, raw_caps in raw_clients.items():
            kind_key = str(kind or "").strip().lower()
            if not kind_key or not isinstance(raw_caps, dict):
                continue
            caps: Dict[str, bool] = {}
            for cap_name, cap_value in raw_caps.items():
                cap_key = str(cap_name or "").strip().lower()
                if cap_key:
                    caps[cap_key] = bool(cap_value)
            clients[kind_key] = caps
        return TransportCapabilities(clients=clients)

    def transport_ownership_get(self) -> TransportOwnershipSnapshot:
        """GET ``/api/v1/transport/owners`` and parse it into a snapshot."""
        data = self._transport_request_json("GET", "/api/v1/transport/owners")
        return TransportOwnershipSnapshot(
            ok=bool(data.get("ok", False)),
            message=str(data.get("message") or "").strip(),
            code=str(data.get("code") or "").strip(),
            policy_revision=self._to_int(data.get("policy_revision")),
            plan_digest=str(data.get("plan_digest") or "").strip(),
            count=self._to_int(data.get("count")),
            lock_count=self._to_int(data.get("lock_count")),
            items=self._transport_collect_rows(
                data.get("items"), self._parse_transport_ownership_record
            ),
        )

    def transport_owner_locks_get(self) -> TransportOwnerLocksSnapshot:
        """GET ``/api/v1/transport/owner-locks`` and parse it into a snapshot."""
        data = self._transport_request_json("GET", "/api/v1/transport/owner-locks")
        return TransportOwnerLocksSnapshot(
            ok=bool(data.get("ok", False)),
            message=str(data.get("message") or "").strip(),
            code=str(data.get("code") or "").strip(),
            policy_revision=self._to_int(data.get("policy_revision")),
            count=self._to_int(data.get("count")),
            items=self._transport_collect_rows(
                data.get("items"), self._parse_transport_owner_lock_record
            ),
        )

    def transport_owner_locks_clear(
        self,
        *,
        base_revision: int = 0,
        client_id: str = "",
        destination_ip: str = "",
        destination_ips: Optional[List[str]] = None,
        confirm_token: str = "",
    ) -> TransportOwnerLocksClearResult:
        """POST to ``/api/v1/transport/owner-locks/clear``.

        Every argument is optional; a field is added to the request body only
        when it carries a value.

        Args:
            base_revision: Sent only when greater than 0.
            client_id: Sent when non-blank after stripping.
            destination_ip: Sent when non-blank after stripping.
            destination_ips: Blank entries are dropped; sent when any remain.
            confirm_token: Sent when non-blank after stripping.

        Returns:
            The response parsed into a ``TransportOwnerLocksClearResult``.
        """
        payload: Dict[str, Any] = {}
        revision = int(base_revision or 0)
        if revision > 0:
            payload["base_revision"] = revision
        cid = str(client_id or "").strip()
        if cid:
            payload["client_id"] = cid
        single_ip = str(destination_ip or "").strip()
        if single_ip:
            payload["destination_ip"] = single_ip
        stripped_ips = (str(ip or "").strip() for ip in (destination_ips or []))
        ips = [ip for ip in stripped_ips if ip]
        if ips:
            payload["destination_ips"] = ips
        token = str(confirm_token or "").strip()
        if token:
            payload["confirm_token"] = token

        data = self._transport_request_json(
            "POST", "/api/v1/transport/owner-locks/clear", payload
        )
        return TransportOwnerLocksClearResult(
            ok=bool(data.get("ok", False)),
            message=str(data.get("message") or "").strip(),
            code=str(data.get("code") or "").strip(),
            base_revision=self._to_int(data.get("base_revision")),
            confirm_required=bool(data.get("confirm_required", False)),
            confirm_token=str(data.get("confirm_token") or "").strip(),
            match_count=self._to_int(data.get("match_count")),
            cleared_count=self._to_int(data.get("cleared_count")),
            remaining_count=self._to_int(data.get("remaining_count")),
            items=self._transport_collect_rows(
                data.get("items"), self._parse_transport_owner_lock_record
            ),
        )

    # ------------------------------------------------------------------
    # Row parsers / serializers
    # ------------------------------------------------------------------

    def _parse_transport_intent(self, row: Any) -> Optional[TransportPolicyIntent]:
        """Build a ``TransportPolicyIntent`` from one response row.

        Returns ``None`` when ``row`` is not a dict or when ``selector_type``,
        ``selector_value`` or ``client_id`` is blank.  ``mode`` falls back to
        ``"strict"`` unless it is ``"strict"`` or ``"fallback"``; a
        non-positive ``priority`` falls back to 100.
        """
        if not isinstance(row, dict):
            return None
        selector_type = str(row.get("selector_type") or "").strip().lower()
        selector_value = str(row.get("selector_value") or "").strip()
        client_id = str(row.get("client_id") or "").strip()
        if not (selector_type and selector_value and client_id):
            return None

        mode = str(row.get("mode") or "strict").strip().lower()
        if mode not in ("strict", "fallback"):
            mode = "strict"
        priority = self._to_int(row.get("priority"), default=100)
        if priority <= 0:
            priority = 100
        return TransportPolicyIntent(
            selector_type=selector_type,
            selector_value=selector_value,
            client_id=client_id,
            priority=priority,
            mode=mode,
        )

    def _transport_intent_payload(self, intent: TransportPolicyIntent) -> Dict[str, Any]:
        """Serialize an intent into the dict sent in request bodies.

        Accepts either an object with intent attributes or a plain dict with
        the same keys.  Strings are stripped (and ``selector_type`` / ``mode``
        lower-cased); an unrecognized ``mode`` becomes ``"strict"``.
        """
        if isinstance(intent, dict):
            src = cast(Dict[str, Any], intent)

            def read(name: str) -> Any:
                return src.get(name)

            priority = self._to_int(src.get("priority"), default=100)
        else:

            def read(name: str) -> Any:
                return getattr(intent, name, None)

            priority = int(getattr(intent, "priority", 100) or 100)
        # NOTE(review): unlike _parse_transport_intent, a negative priority is
        # passed through unchanged here, and the two branches convert the
        # value differently — confirm whether that asymmetry is intended.

        mode = str(read("mode") or "strict").strip().lower()
        if mode not in ("strict", "fallback"):
            mode = "strict"
        return {
            "selector_type": str(read("selector_type") or "").strip().lower(),
            "selector_value": str(read("selector_value") or "").strip(),
            "client_id": str(read("client_id") or "").strip(),
            "priority": priority,
            "mode": mode,
        }

    def _parse_transport_conflict(self, row: Any) -> Optional[TransportConflict]:
        """Build a ``TransportConflict`` from one response row.

        Returns ``None`` when ``row`` is not a dict or its ``key`` is blank.
        ``severity`` defaults to ``"warn"``; blank and null ``owners`` entries
        are dropped.
        """
        if not isinstance(row, dict):
            return None
        key = str(row.get("key") or "").strip()
        if not key:
            return None

        raw_owners = row.get("owners")
        if not isinstance(raw_owners, list):
            raw_owners = []
        owners: List[str] = []
        for entry in raw_owners:
            if entry is None:
                # str(None) would yield the literal owner "None".
                continue
            owner = str(entry).strip()
            if owner:
                owners.append(owner)

        return TransportConflict(
            key=key,
            type=str(row.get("type") or "").strip().lower(),
            severity=str(row.get("severity") or "warn").strip().lower(),
            owners=owners,
            reason=str(row.get("reason") or "").strip(),
            suggested_resolution=str(row.get("suggested_resolution") or "").strip(),
        )

    def _parse_transport_ownership_record(self, row: Any) -> Optional[TransportOwnershipRecord]:
        """Build a ``TransportOwnershipRecord`` from one response row.

        Returns ``None`` when ``row`` is not a dict or when ``key``,
        ``selector_type``, ``selector_value`` or ``client_id`` is blank.
        """
        if not isinstance(row, dict):
            return None
        key = str(row.get("key") or "").strip()
        selector_type = str(row.get("selector_type") or "").strip().lower()
        selector_value = str(row.get("selector_value") or "").strip()
        client_id = str(row.get("client_id") or "").strip()
        if not (key and selector_type and selector_value and client_id):
            return None
        return TransportOwnershipRecord(
            key=key,
            selector_type=selector_type,
            selector_value=selector_value,
            client_id=client_id,
            client_kind=str(row.get("client_kind") or "").strip().lower(),
            owner_scope=str(row.get("owner_scope") or "").strip(),
            owner_status=str(row.get("owner_status") or "").strip().lower(),
            lock_active=bool(row.get("lock_active", False)),
            iface_id=str(row.get("iface_id") or "").strip(),
            routing_table=str(row.get("routing_table") or "").strip(),
            mark_hex=str(row.get("mark_hex") or "").strip(),
            priority_base=self._to_int(row.get("priority_base")),
            mode=str(row.get("mode") or "").strip().lower(),
            priority=self._to_int(row.get("priority")),
            updated_at=str(row.get("updated_at") or "").strip(),
        )

    def _parse_transport_owner_lock_record(self, row: Any) -> Optional[TransportOwnerLockRecord]:
        """Build a ``TransportOwnerLockRecord`` from one response row.

        Returns ``None`` when ``row`` is not a dict or when ``destination_ip``
        or ``client_id`` is blank.
        """
        if not isinstance(row, dict):
            return None
        destination_ip = str(row.get("destination_ip") or "").strip()
        client_id = str(row.get("client_id") or "").strip()
        if not (destination_ip and client_id):
            return None
        return TransportOwnerLockRecord(
            destination_ip=destination_ip,
            client_id=client_id,
            client_kind=str(row.get("client_kind") or "").strip().lower(),
            iface_id=str(row.get("iface_id") or "").strip(),
            mark_hex=str(row.get("mark_hex") or "").strip(),
            proto=str(row.get("proto") or "").strip().lower(),
            updated_at=str(row.get("updated_at") or "").strip(),
        )

    def _parse_transport_policy_apply(self, data: Dict[str, Any]) -> TransportPolicyApplyResult:
        """Build a ``TransportPolicyApplyResult`` from an apply/rollback response dict."""
        return TransportPolicyApplyResult(
            ok=bool(data.get("ok", False)),
            message=str(data.get("message") or "").strip(),
            code=str(data.get("code") or "").strip(),
            policy_revision=self._to_int(data.get("policy_revision")),
            current_revision=self._to_int(data.get("current_revision")),
            apply_id=str(data.get("apply_id") or "").strip(),
            rollback_available=bool(data.get("rollback_available", False)),
            conflicts=self._transport_collect_rows(
                data.get("conflicts"), self._parse_transport_conflict
            ),
        )