Files
elmprodvpn/selective-vpn-gui/api/transport_policy.py

423 lines
16 KiB
Python

from __future__ import annotations
from typing import Any, Dict, List, Optional, cast
from .models import *
class TransportPolicyApiMixin:
def transport_policy_get(self) -> TransportPolicy:
data = cast(
Dict[str, Any],
self._json(self._request("GET", "/api/v1/transport/policies")) or {},
)
raw = data.get("intents") or []
if not isinstance(raw, list):
raw = []
intents: List[TransportPolicyIntent] = []
for row in raw:
it = self._parse_transport_intent(row)
if it is not None:
intents.append(it)
return TransportPolicy(
revision=self._to_int(data.get("policy_revision")),
intents=intents,
)
def transport_policy_validate(
self,
*,
base_revision: int = 0,
intents: List[TransportPolicyIntent],
allow_warnings: bool = True,
force_override: bool = False,
) -> TransportPolicyValidateResult:
payload: Dict[str, Any] = {
"intents": [self._transport_intent_payload(it) for it in (intents or [])],
}
if int(base_revision or 0) > 0:
payload["base_revision"] = int(base_revision)
payload["options"] = {
"allow_warnings": bool(allow_warnings),
"force_override": bool(force_override),
}
data = cast(
Dict[str, Any],
self._json(
self._request(
"POST",
"/api/v1/transport/policies/validate",
json_body=payload,
)
)
or {},
)
summary_raw = data.get("summary") or {}
if not isinstance(summary_raw, dict):
summary_raw = {}
diff_raw = data.get("diff") or {}
if not isinstance(diff_raw, dict):
diff_raw = {}
conflicts_raw = data.get("conflicts") or []
if not isinstance(conflicts_raw, list):
conflicts_raw = []
conflicts: List[TransportConflict] = []
for row in conflicts_raw:
c = self._parse_transport_conflict(row)
if c is not None:
conflicts.append(c)
return TransportPolicyValidateResult(
ok=bool(data.get("ok", False)),
message=str(data.get("message") or ""),
code=str(data.get("code") or ""),
valid=bool(data.get("valid", False)),
base_revision=self._to_int(data.get("base_revision")),
confirm_token=str(data.get("confirm_token") or "").strip(),
summary=TransportPolicyValidateSummary(
block_count=self._to_int(summary_raw.get("block_count")),
warn_count=self._to_int(summary_raw.get("warn_count")),
),
conflicts=conflicts,
diff=TransportPolicyDiff(
added=self._to_int(diff_raw.get("added")),
changed=self._to_int(diff_raw.get("changed")),
removed=self._to_int(diff_raw.get("removed")),
),
)
def transport_policy_apply(
self,
*,
base_revision: int,
intents: List[TransportPolicyIntent],
force_override: bool = False,
confirm_token: str = "",
) -> TransportPolicyApplyResult:
payload: Dict[str, Any] = {
"base_revision": int(base_revision),
"intents": [self._transport_intent_payload(it) for it in (intents or [])],
}
opts: Dict[str, Any] = {}
if force_override:
opts["force_override"] = True
if confirm_token:
opts["confirm_token"] = str(confirm_token).strip()
if opts:
payload["options"] = opts
data = cast(
Dict[str, Any],
self._json(
self._request(
"POST",
"/api/v1/transport/policies/apply",
json_body=payload,
)
)
or {},
)
return self._parse_transport_policy_apply(data)
def transport_policy_rollback(self, *, base_revision: int = 0) -> TransportPolicyApplyResult:
payload: Dict[str, Any] = {}
if int(base_revision or 0) > 0:
payload["base_revision"] = int(base_revision)
data = cast(
Dict[str, Any],
self._json(
self._request(
"POST",
"/api/v1/transport/policies/rollback",
json_body=payload,
)
)
or {},
)
return self._parse_transport_policy_apply(data)
def transport_conflicts_get(self) -> TransportConflicts:
data = cast(
Dict[str, Any],
self._json(self._request("GET", "/api/v1/transport/conflicts")) or {},
)
raw = data.get("items") or []
if not isinstance(raw, list):
raw = []
items: List[TransportConflict] = []
for row in raw:
c = self._parse_transport_conflict(row)
if c is not None:
items.append(c)
return TransportConflicts(
has_blocking=bool(data.get("has_blocking", False)),
items=items,
)
def transport_capabilities_get(self) -> TransportCapabilities:
data = cast(
Dict[str, Any],
self._json(self._request("GET", "/api/v1/transport/capabilities")) or {},
)
raw = data.get("clients") or {}
if not isinstance(raw, dict):
raw = {}
clients: Dict[str, Dict[str, bool]] = {}
for kind, caps_raw in raw.items():
key = str(kind or "").strip().lower()
if not key:
continue
if not isinstance(caps_raw, dict):
continue
caps: Dict[str, bool] = {}
for cap_name, cap_value in caps_raw.items():
cname = str(cap_name or "").strip().lower()
if not cname:
continue
caps[cname] = bool(cap_value)
clients[key] = caps
return TransportCapabilities(clients=clients)
def transport_ownership_get(self) -> TransportOwnershipSnapshot:
data = cast(
Dict[str, Any],
self._json(self._request("GET", "/api/v1/transport/owners")) or {},
)
raw = data.get("items") or []
if not isinstance(raw, list):
raw = []
items: List[TransportOwnershipRecord] = []
for row in raw:
rec = self._parse_transport_ownership_record(row)
if rec is not None:
items.append(rec)
return TransportOwnershipSnapshot(
ok=bool(data.get("ok", False)),
message=str(data.get("message") or "").strip(),
code=str(data.get("code") or "").strip(),
policy_revision=self._to_int(data.get("policy_revision")),
plan_digest=str(data.get("plan_digest") or "").strip(),
count=self._to_int(data.get("count")),
lock_count=self._to_int(data.get("lock_count")),
items=items,
)
def transport_owner_locks_get(self) -> TransportOwnerLocksSnapshot:
data = cast(
Dict[str, Any],
self._json(self._request("GET", "/api/v1/transport/owner-locks")) or {},
)
raw = data.get("items") or []
if not isinstance(raw, list):
raw = []
items: List[TransportOwnerLockRecord] = []
for row in raw:
rec = self._parse_transport_owner_lock_record(row)
if rec is not None:
items.append(rec)
return TransportOwnerLocksSnapshot(
ok=bool(data.get("ok", False)),
message=str(data.get("message") or "").strip(),
code=str(data.get("code") or "").strip(),
policy_revision=self._to_int(data.get("policy_revision")),
count=self._to_int(data.get("count")),
items=items,
)
def transport_owner_locks_clear(
self,
*,
base_revision: int = 0,
client_id: str = "",
destination_ip: str = "",
destination_ips: Optional[List[str]] = None,
confirm_token: str = "",
) -> TransportOwnerLocksClearResult:
payload: Dict[str, Any] = {}
if int(base_revision or 0) > 0:
payload["base_revision"] = int(base_revision)
cid = str(client_id or "").strip()
if cid:
payload["client_id"] = cid
dst = str(destination_ip or "").strip()
if dst:
payload["destination_ip"] = dst
ips: List[str] = []
for raw in list(destination_ips or []):
value = str(raw or "").strip()
if not value:
continue
ips.append(value)
if ips:
payload["destination_ips"] = ips
token = str(confirm_token or "").strip()
if token:
payload["confirm_token"] = token
data = cast(
Dict[str, Any],
self._json(
self._request(
"POST",
"/api/v1/transport/owner-locks/clear",
json_body=payload,
)
)
or {},
)
raw = data.get("items") or []
if not isinstance(raw, list):
raw = []
items: List[TransportOwnerLockRecord] = []
for row in raw:
rec = self._parse_transport_owner_lock_record(row)
if rec is not None:
items.append(rec)
return TransportOwnerLocksClearResult(
ok=bool(data.get("ok", False)),
message=str(data.get("message") or "").strip(),
code=str(data.get("code") or "").strip(),
base_revision=self._to_int(data.get("base_revision")),
confirm_required=bool(data.get("confirm_required", False)),
confirm_token=str(data.get("confirm_token") or "").strip(),
match_count=self._to_int(data.get("match_count")),
cleared_count=self._to_int(data.get("cleared_count")),
remaining_count=self._to_int(data.get("remaining_count")),
items=items,
)
def _parse_transport_intent(self, row: Any) -> Optional[TransportPolicyIntent]:
if not isinstance(row, dict):
return None
selector_type = str(row.get("selector_type") or "").strip().lower()
selector_value = str(row.get("selector_value") or "").strip()
client_id = str(row.get("client_id") or "").strip()
if not selector_type or not selector_value or not client_id:
return None
mode = str(row.get("mode") or "strict").strip().lower() or "strict"
if mode not in ("strict", "fallback"):
mode = "strict"
priority = self._to_int(row.get("priority"), default=100)
if priority <= 0:
priority = 100
return TransportPolicyIntent(
selector_type=selector_type,
selector_value=selector_value,
client_id=client_id,
priority=priority,
mode=mode,
)
def _transport_intent_payload(self, intent: TransportPolicyIntent) -> Dict[str, Any]:
if isinstance(intent, dict):
src = cast(Dict[str, Any], intent)
priority = self._to_int(src.get("priority"), default=100)
mode = str(src.get("mode") or "strict").strip().lower()
selector_type = str(src.get("selector_type") or "").strip().lower()
selector_value = str(src.get("selector_value") or "").strip()
client_id = str(src.get("client_id") or "").strip()
else:
priority = int(getattr(intent, "priority", 100) or 100)
mode = str(getattr(intent, "mode", "strict") or "strict").strip().lower()
selector_type = str(getattr(intent, "selector_type", "") or "").strip().lower()
selector_value = str(getattr(intent, "selector_value", "") or "").strip()
client_id = str(getattr(intent, "client_id", "") or "").strip()
if mode not in ("strict", "fallback"):
mode = "strict"
payload: Dict[str, Any] = {
"selector_type": selector_type,
"selector_value": selector_value,
"client_id": client_id,
"priority": priority,
"mode": mode,
}
return payload
def _parse_transport_conflict(self, row: Any) -> Optional[TransportConflict]:
if not isinstance(row, dict):
return None
key = str(row.get("key") or "").strip()
if not key:
return None
raw_owners = row.get("owners") or []
if not isinstance(raw_owners, list):
raw_owners = []
owners = [str(x).strip() for x in raw_owners if str(x).strip()]
return TransportConflict(
key=key,
type=str(row.get("type") or "").strip().lower(),
severity=str(row.get("severity") or "warn").strip().lower(),
owners=owners,
reason=str(row.get("reason") or "").strip(),
suggested_resolution=str(row.get("suggested_resolution") or "").strip(),
)
def _parse_transport_ownership_record(self, row: Any) -> Optional[TransportOwnershipRecord]:
if not isinstance(row, dict):
return None
key = str(row.get("key") or "").strip()
selector_type = str(row.get("selector_type") or "").strip().lower()
selector_value = str(row.get("selector_value") or "").strip()
client_id = str(row.get("client_id") or "").strip()
if not key or not selector_type or not selector_value or not client_id:
return None
return TransportOwnershipRecord(
key=key,
selector_type=selector_type,
selector_value=selector_value,
client_id=client_id,
client_kind=str(row.get("client_kind") or "").strip().lower(),
owner_scope=str(row.get("owner_scope") or "").strip(),
owner_status=str(row.get("owner_status") or "").strip().lower(),
lock_active=bool(row.get("lock_active", False)),
iface_id=str(row.get("iface_id") or "").strip(),
routing_table=str(row.get("routing_table") or "").strip(),
mark_hex=str(row.get("mark_hex") or "").strip(),
priority_base=self._to_int(row.get("priority_base")),
mode=str(row.get("mode") or "").strip().lower(),
priority=self._to_int(row.get("priority")),
updated_at=str(row.get("updated_at") or "").strip(),
)
def _parse_transport_owner_lock_record(self, row: Any) -> Optional[TransportOwnerLockRecord]:
if not isinstance(row, dict):
return None
destination_ip = str(row.get("destination_ip") or "").strip()
client_id = str(row.get("client_id") or "").strip()
if not destination_ip or not client_id:
return None
return TransportOwnerLockRecord(
destination_ip=destination_ip,
client_id=client_id,
client_kind=str(row.get("client_kind") or "").strip().lower(),
iface_id=str(row.get("iface_id") or "").strip(),
mark_hex=str(row.get("mark_hex") or "").strip(),
proto=str(row.get("proto") or "").strip().lower(),
updated_at=str(row.get("updated_at") or "").strip(),
)
def _parse_transport_policy_apply(self, data: Dict[str, Any]) -> TransportPolicyApplyResult:
raw = data.get("conflicts") or []
if not isinstance(raw, list):
raw = []
conflicts: List[TransportConflict] = []
for row in raw:
c = self._parse_transport_conflict(row)
if c is not None:
conflicts.append(c)
return TransportPolicyApplyResult(
ok=bool(data.get("ok", False)),
message=str(data.get("message") or "").strip(),
code=str(data.get("code") or "").strip(),
policy_revision=self._to_int(data.get("policy_revision")),
current_revision=self._to_int(data.get("current_revision")),
apply_id=str(data.get("apply_id") or "").strip(),
rollback_available=bool(data.get("rollback_available", False)),
conflicts=conflicts,
)