423 lines
16 KiB
Python
423 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any, Dict, List, Optional, cast
|
|
|
|
from .models import *
|
|
|
|
|
|
class TransportPolicyApiMixin:
|
|
def transport_policy_get(self) -> TransportPolicy:
|
|
data = cast(
|
|
Dict[str, Any],
|
|
self._json(self._request("GET", "/api/v1/transport/policies")) or {},
|
|
)
|
|
raw = data.get("intents") or []
|
|
if not isinstance(raw, list):
|
|
raw = []
|
|
intents: List[TransportPolicyIntent] = []
|
|
for row in raw:
|
|
it = self._parse_transport_intent(row)
|
|
if it is not None:
|
|
intents.append(it)
|
|
return TransportPolicy(
|
|
revision=self._to_int(data.get("policy_revision")),
|
|
intents=intents,
|
|
)
|
|
|
|
def transport_policy_validate(
|
|
self,
|
|
*,
|
|
base_revision: int = 0,
|
|
intents: List[TransportPolicyIntent],
|
|
allow_warnings: bool = True,
|
|
force_override: bool = False,
|
|
) -> TransportPolicyValidateResult:
|
|
payload: Dict[str, Any] = {
|
|
"intents": [self._transport_intent_payload(it) for it in (intents or [])],
|
|
}
|
|
if int(base_revision or 0) > 0:
|
|
payload["base_revision"] = int(base_revision)
|
|
payload["options"] = {
|
|
"allow_warnings": bool(allow_warnings),
|
|
"force_override": bool(force_override),
|
|
}
|
|
|
|
data = cast(
|
|
Dict[str, Any],
|
|
self._json(
|
|
self._request(
|
|
"POST",
|
|
"/api/v1/transport/policies/validate",
|
|
json_body=payload,
|
|
)
|
|
)
|
|
or {},
|
|
)
|
|
|
|
summary_raw = data.get("summary") or {}
|
|
if not isinstance(summary_raw, dict):
|
|
summary_raw = {}
|
|
diff_raw = data.get("diff") or {}
|
|
if not isinstance(diff_raw, dict):
|
|
diff_raw = {}
|
|
|
|
conflicts_raw = data.get("conflicts") or []
|
|
if not isinstance(conflicts_raw, list):
|
|
conflicts_raw = []
|
|
conflicts: List[TransportConflict] = []
|
|
for row in conflicts_raw:
|
|
c = self._parse_transport_conflict(row)
|
|
if c is not None:
|
|
conflicts.append(c)
|
|
|
|
return TransportPolicyValidateResult(
|
|
ok=bool(data.get("ok", False)),
|
|
message=str(data.get("message") or ""),
|
|
code=str(data.get("code") or ""),
|
|
valid=bool(data.get("valid", False)),
|
|
base_revision=self._to_int(data.get("base_revision")),
|
|
confirm_token=str(data.get("confirm_token") or "").strip(),
|
|
summary=TransportPolicyValidateSummary(
|
|
block_count=self._to_int(summary_raw.get("block_count")),
|
|
warn_count=self._to_int(summary_raw.get("warn_count")),
|
|
),
|
|
conflicts=conflicts,
|
|
diff=TransportPolicyDiff(
|
|
added=self._to_int(diff_raw.get("added")),
|
|
changed=self._to_int(diff_raw.get("changed")),
|
|
removed=self._to_int(diff_raw.get("removed")),
|
|
),
|
|
)
|
|
|
|
def transport_policy_apply(
|
|
self,
|
|
*,
|
|
base_revision: int,
|
|
intents: List[TransportPolicyIntent],
|
|
force_override: bool = False,
|
|
confirm_token: str = "",
|
|
) -> TransportPolicyApplyResult:
|
|
payload: Dict[str, Any] = {
|
|
"base_revision": int(base_revision),
|
|
"intents": [self._transport_intent_payload(it) for it in (intents or [])],
|
|
}
|
|
opts: Dict[str, Any] = {}
|
|
if force_override:
|
|
opts["force_override"] = True
|
|
if confirm_token:
|
|
opts["confirm_token"] = str(confirm_token).strip()
|
|
if opts:
|
|
payload["options"] = opts
|
|
|
|
data = cast(
|
|
Dict[str, Any],
|
|
self._json(
|
|
self._request(
|
|
"POST",
|
|
"/api/v1/transport/policies/apply",
|
|
json_body=payload,
|
|
)
|
|
)
|
|
or {},
|
|
)
|
|
return self._parse_transport_policy_apply(data)
|
|
|
|
def transport_policy_rollback(self, *, base_revision: int = 0) -> TransportPolicyApplyResult:
|
|
payload: Dict[str, Any] = {}
|
|
if int(base_revision or 0) > 0:
|
|
payload["base_revision"] = int(base_revision)
|
|
data = cast(
|
|
Dict[str, Any],
|
|
self._json(
|
|
self._request(
|
|
"POST",
|
|
"/api/v1/transport/policies/rollback",
|
|
json_body=payload,
|
|
)
|
|
)
|
|
or {},
|
|
)
|
|
return self._parse_transport_policy_apply(data)
|
|
|
|
def transport_conflicts_get(self) -> TransportConflicts:
|
|
data = cast(
|
|
Dict[str, Any],
|
|
self._json(self._request("GET", "/api/v1/transport/conflicts")) or {},
|
|
)
|
|
raw = data.get("items") or []
|
|
if not isinstance(raw, list):
|
|
raw = []
|
|
items: List[TransportConflict] = []
|
|
for row in raw:
|
|
c = self._parse_transport_conflict(row)
|
|
if c is not None:
|
|
items.append(c)
|
|
return TransportConflicts(
|
|
has_blocking=bool(data.get("has_blocking", False)),
|
|
items=items,
|
|
)
|
|
|
|
def transport_capabilities_get(self) -> TransportCapabilities:
|
|
data = cast(
|
|
Dict[str, Any],
|
|
self._json(self._request("GET", "/api/v1/transport/capabilities")) or {},
|
|
)
|
|
raw = data.get("clients") or {}
|
|
if not isinstance(raw, dict):
|
|
raw = {}
|
|
clients: Dict[str, Dict[str, bool]] = {}
|
|
for kind, caps_raw in raw.items():
|
|
key = str(kind or "").strip().lower()
|
|
if not key:
|
|
continue
|
|
if not isinstance(caps_raw, dict):
|
|
continue
|
|
caps: Dict[str, bool] = {}
|
|
for cap_name, cap_value in caps_raw.items():
|
|
cname = str(cap_name or "").strip().lower()
|
|
if not cname:
|
|
continue
|
|
caps[cname] = bool(cap_value)
|
|
clients[key] = caps
|
|
return TransportCapabilities(clients=clients)
|
|
|
|
def transport_ownership_get(self) -> TransportOwnershipSnapshot:
|
|
data = cast(
|
|
Dict[str, Any],
|
|
self._json(self._request("GET", "/api/v1/transport/owners")) or {},
|
|
)
|
|
raw = data.get("items") or []
|
|
if not isinstance(raw, list):
|
|
raw = []
|
|
items: List[TransportOwnershipRecord] = []
|
|
for row in raw:
|
|
rec = self._parse_transport_ownership_record(row)
|
|
if rec is not None:
|
|
items.append(rec)
|
|
return TransportOwnershipSnapshot(
|
|
ok=bool(data.get("ok", False)),
|
|
message=str(data.get("message") or "").strip(),
|
|
code=str(data.get("code") or "").strip(),
|
|
policy_revision=self._to_int(data.get("policy_revision")),
|
|
plan_digest=str(data.get("plan_digest") or "").strip(),
|
|
count=self._to_int(data.get("count")),
|
|
lock_count=self._to_int(data.get("lock_count")),
|
|
items=items,
|
|
)
|
|
|
|
def transport_owner_locks_get(self) -> TransportOwnerLocksSnapshot:
|
|
data = cast(
|
|
Dict[str, Any],
|
|
self._json(self._request("GET", "/api/v1/transport/owner-locks")) or {},
|
|
)
|
|
raw = data.get("items") or []
|
|
if not isinstance(raw, list):
|
|
raw = []
|
|
items: List[TransportOwnerLockRecord] = []
|
|
for row in raw:
|
|
rec = self._parse_transport_owner_lock_record(row)
|
|
if rec is not None:
|
|
items.append(rec)
|
|
return TransportOwnerLocksSnapshot(
|
|
ok=bool(data.get("ok", False)),
|
|
message=str(data.get("message") or "").strip(),
|
|
code=str(data.get("code") or "").strip(),
|
|
policy_revision=self._to_int(data.get("policy_revision")),
|
|
count=self._to_int(data.get("count")),
|
|
items=items,
|
|
)
|
|
|
|
def transport_owner_locks_clear(
|
|
self,
|
|
*,
|
|
base_revision: int = 0,
|
|
client_id: str = "",
|
|
destination_ip: str = "",
|
|
destination_ips: Optional[List[str]] = None,
|
|
confirm_token: str = "",
|
|
) -> TransportOwnerLocksClearResult:
|
|
payload: Dict[str, Any] = {}
|
|
if int(base_revision or 0) > 0:
|
|
payload["base_revision"] = int(base_revision)
|
|
cid = str(client_id or "").strip()
|
|
if cid:
|
|
payload["client_id"] = cid
|
|
dst = str(destination_ip or "").strip()
|
|
if dst:
|
|
payload["destination_ip"] = dst
|
|
ips: List[str] = []
|
|
for raw in list(destination_ips or []):
|
|
value = str(raw or "").strip()
|
|
if not value:
|
|
continue
|
|
ips.append(value)
|
|
if ips:
|
|
payload["destination_ips"] = ips
|
|
token = str(confirm_token or "").strip()
|
|
if token:
|
|
payload["confirm_token"] = token
|
|
|
|
data = cast(
|
|
Dict[str, Any],
|
|
self._json(
|
|
self._request(
|
|
"POST",
|
|
"/api/v1/transport/owner-locks/clear",
|
|
json_body=payload,
|
|
)
|
|
)
|
|
or {},
|
|
)
|
|
|
|
raw = data.get("items") or []
|
|
if not isinstance(raw, list):
|
|
raw = []
|
|
items: List[TransportOwnerLockRecord] = []
|
|
for row in raw:
|
|
rec = self._parse_transport_owner_lock_record(row)
|
|
if rec is not None:
|
|
items.append(rec)
|
|
|
|
return TransportOwnerLocksClearResult(
|
|
ok=bool(data.get("ok", False)),
|
|
message=str(data.get("message") or "").strip(),
|
|
code=str(data.get("code") or "").strip(),
|
|
base_revision=self._to_int(data.get("base_revision")),
|
|
confirm_required=bool(data.get("confirm_required", False)),
|
|
confirm_token=str(data.get("confirm_token") or "").strip(),
|
|
match_count=self._to_int(data.get("match_count")),
|
|
cleared_count=self._to_int(data.get("cleared_count")),
|
|
remaining_count=self._to_int(data.get("remaining_count")),
|
|
items=items,
|
|
)
|
|
|
|
def _parse_transport_intent(self, row: Any) -> Optional[TransportPolicyIntent]:
|
|
if not isinstance(row, dict):
|
|
return None
|
|
selector_type = str(row.get("selector_type") or "").strip().lower()
|
|
selector_value = str(row.get("selector_value") or "").strip()
|
|
client_id = str(row.get("client_id") or "").strip()
|
|
if not selector_type or not selector_value or not client_id:
|
|
return None
|
|
mode = str(row.get("mode") or "strict").strip().lower() or "strict"
|
|
if mode not in ("strict", "fallback"):
|
|
mode = "strict"
|
|
priority = self._to_int(row.get("priority"), default=100)
|
|
if priority <= 0:
|
|
priority = 100
|
|
return TransportPolicyIntent(
|
|
selector_type=selector_type,
|
|
selector_value=selector_value,
|
|
client_id=client_id,
|
|
priority=priority,
|
|
mode=mode,
|
|
)
|
|
|
|
def _transport_intent_payload(self, intent: TransportPolicyIntent) -> Dict[str, Any]:
|
|
if isinstance(intent, dict):
|
|
src = cast(Dict[str, Any], intent)
|
|
priority = self._to_int(src.get("priority"), default=100)
|
|
mode = str(src.get("mode") or "strict").strip().lower()
|
|
selector_type = str(src.get("selector_type") or "").strip().lower()
|
|
selector_value = str(src.get("selector_value") or "").strip()
|
|
client_id = str(src.get("client_id") or "").strip()
|
|
else:
|
|
priority = int(getattr(intent, "priority", 100) or 100)
|
|
mode = str(getattr(intent, "mode", "strict") or "strict").strip().lower()
|
|
selector_type = str(getattr(intent, "selector_type", "") or "").strip().lower()
|
|
selector_value = str(getattr(intent, "selector_value", "") or "").strip()
|
|
client_id = str(getattr(intent, "client_id", "") or "").strip()
|
|
if mode not in ("strict", "fallback"):
|
|
mode = "strict"
|
|
payload: Dict[str, Any] = {
|
|
"selector_type": selector_type,
|
|
"selector_value": selector_value,
|
|
"client_id": client_id,
|
|
"priority": priority,
|
|
"mode": mode,
|
|
}
|
|
return payload
|
|
|
|
def _parse_transport_conflict(self, row: Any) -> Optional[TransportConflict]:
|
|
if not isinstance(row, dict):
|
|
return None
|
|
key = str(row.get("key") or "").strip()
|
|
if not key:
|
|
return None
|
|
raw_owners = row.get("owners") or []
|
|
if not isinstance(raw_owners, list):
|
|
raw_owners = []
|
|
owners = [str(x).strip() for x in raw_owners if str(x).strip()]
|
|
return TransportConflict(
|
|
key=key,
|
|
type=str(row.get("type") or "").strip().lower(),
|
|
severity=str(row.get("severity") or "warn").strip().lower(),
|
|
owners=owners,
|
|
reason=str(row.get("reason") or "").strip(),
|
|
suggested_resolution=str(row.get("suggested_resolution") or "").strip(),
|
|
)
|
|
|
|
def _parse_transport_ownership_record(self, row: Any) -> Optional[TransportOwnershipRecord]:
|
|
if not isinstance(row, dict):
|
|
return None
|
|
key = str(row.get("key") or "").strip()
|
|
selector_type = str(row.get("selector_type") or "").strip().lower()
|
|
selector_value = str(row.get("selector_value") or "").strip()
|
|
client_id = str(row.get("client_id") or "").strip()
|
|
if not key or not selector_type or not selector_value or not client_id:
|
|
return None
|
|
return TransportOwnershipRecord(
|
|
key=key,
|
|
selector_type=selector_type,
|
|
selector_value=selector_value,
|
|
client_id=client_id,
|
|
client_kind=str(row.get("client_kind") or "").strip().lower(),
|
|
owner_scope=str(row.get("owner_scope") or "").strip(),
|
|
owner_status=str(row.get("owner_status") or "").strip().lower(),
|
|
lock_active=bool(row.get("lock_active", False)),
|
|
iface_id=str(row.get("iface_id") or "").strip(),
|
|
routing_table=str(row.get("routing_table") or "").strip(),
|
|
mark_hex=str(row.get("mark_hex") or "").strip(),
|
|
priority_base=self._to_int(row.get("priority_base")),
|
|
mode=str(row.get("mode") or "").strip().lower(),
|
|
priority=self._to_int(row.get("priority")),
|
|
updated_at=str(row.get("updated_at") or "").strip(),
|
|
)
|
|
|
|
def _parse_transport_owner_lock_record(self, row: Any) -> Optional[TransportOwnerLockRecord]:
|
|
if not isinstance(row, dict):
|
|
return None
|
|
destination_ip = str(row.get("destination_ip") or "").strip()
|
|
client_id = str(row.get("client_id") or "").strip()
|
|
if not destination_ip or not client_id:
|
|
return None
|
|
return TransportOwnerLockRecord(
|
|
destination_ip=destination_ip,
|
|
client_id=client_id,
|
|
client_kind=str(row.get("client_kind") or "").strip().lower(),
|
|
iface_id=str(row.get("iface_id") or "").strip(),
|
|
mark_hex=str(row.get("mark_hex") or "").strip(),
|
|
proto=str(row.get("proto") or "").strip().lower(),
|
|
updated_at=str(row.get("updated_at") or "").strip(),
|
|
)
|
|
|
|
def _parse_transport_policy_apply(self, data: Dict[str, Any]) -> TransportPolicyApplyResult:
|
|
raw = data.get("conflicts") or []
|
|
if not isinstance(raw, list):
|
|
raw = []
|
|
conflicts: List[TransportConflict] = []
|
|
for row in raw:
|
|
c = self._parse_transport_conflict(row)
|
|
if c is not None:
|
|
conflicts.append(c)
|
|
return TransportPolicyApplyResult(
|
|
ok=bool(data.get("ok", False)),
|
|
message=str(data.get("message") or "").strip(),
|
|
code=str(data.get("code") or "").strip(),
|
|
policy_revision=self._to_int(data.get("policy_revision")),
|
|
current_revision=self._to_int(data.get("current_revision")),
|
|
apply_id=str(data.get("apply_id") or "").strip(),
|
|
rollback_available=bool(data.get("rollback_available", False)),
|
|
conflicts=conflicts,
|
|
)
|