Files
elmprodvpn/tests/transport_flow_smoke.py

207 lines
8.6 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import json
import os
import sys
import time
from typing import Dict, Optional, Tuple
import urllib.error
import urllib.request
# The GUI sources live in "selective-vpn-gui" under the repository root, which
# is the parent of the directory holding this file. Put that directory at the
# front of sys.path (once) so the GUI modules imported below can be found.
REPO_ROOT = os.path.dirname(
    os.path.dirname(
        os.path.abspath(__file__),
    ),
)
GUI_DIR = os.path.join(REPO_ROOT, "selective-vpn-gui")
if sys.path.count(GUI_DIR) == 0:
    sys.path[:0] = [GUI_DIR]
from api_client import ApiClient, ApiError
from dashboard_controller import DashboardController
def fail(msg: str) -> int:
    """Report *msg* as a transport error on stdout and return exit status 1.

    The return value lets callers write ``return fail("...")``.
    """
    exit_status = 1
    print("[transport] ERROR: {}".format(msg))
    return exit_status
def request_json(
    api_url: str,
    method: str,
    path: str,
    payload: Optional[Dict] = None,
    timeout: float = 15.0,
) -> Tuple[int, Dict]:
    """Send one request to ``api_url + path`` and return ``(status, json_dict)``.

    Args:
        api_url: Base URL; trailing slashes are stripped before *path* is appended.
        method: HTTP method name (upper-cased before sending).
        path: Path (and optional query string) appended to the base URL.
        payload: Optional object sent as a JSON body with a JSON Content-Type.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        A ``(status, body)`` tuple. ``status`` is the HTTP status code, also
        for error responses (``HTTPError``), or ``0`` when the request could
        not be performed at all. ``body`` is the decoded JSON object, or ``{}``
        when the body is empty, is not valid JSON, or is not a JSON object.

    This function does not raise for transport problems; those are reported
    on stderr and signalled by status ``0``.
    """
    data = None
    headers = {"Accept": "application/json"}
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    url = f"{api_url.rstrip('/')}{path}"

    try:
        # Request() raises ValueError for a URL without a scheme, so it has to
        # be built inside the try to keep the "(0, {}) on failure" contract.
        req = urllib.request.Request(
            url,
            data=data,
            method=method.upper(),
            headers=headers,
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            status = int(resp.getcode() or 200)
    except urllib.error.HTTPError as e:
        # An HTTP error status still carries a response; read its body so the
        # caller can inspect the error payload, then release the connection.
        try:
            raw = e.read().decode("utf-8", errors="replace")
        except Exception:
            # The status code is still meaningful without a readable body.
            raw = ""
        finally:
            e.close()
        status = int(e.code or 500)
    except Exception as e:
        # Best-effort by design: report why the request failed instead of
        # dropping the reason, then signal the failure with status 0.
        print(
            f"[transport] WARN: {method.upper()} {url} failed: {e}",
            file=sys.stderr,
        )
        return 0, {}

    try:
        data_json = json.loads(raw) if raw else {}
    except (ValueError, RecursionError):
        data_json = {}
    if not isinstance(data_json, dict):
        data_json = {}
    return status, data_json
def _report_raw_capabilities(api_url: str) -> None:
    """Fetch the raw capabilities payload and print notes about its contents.

    Advisory only: nothing here fails the run. If the request does not return
    status 200 with ``ok`` set, nothing is printed.
    """
    status, caps_raw = request_json(api_url, "GET", "/api/v1/transport/capabilities")
    if status != 200 or not bool(caps_raw.get("ok", False)):
        return

    clients_raw = caps_raw.get("clients") or {}
    dnstt_caps = clients_raw.get("dnstt") if isinstance(clients_raw, dict) else {}
    if isinstance(dnstt_caps, dict):
        if bool(dnstt_caps.get("ssh_tunnel", False)):
            print("[transport] dnstt ssh_tunnel capability detected")
        else:
            print("[transport] WARN: dnstt ssh_tunnel capability is not advertised")

    runtime_modes_raw = caps_raw.get("runtime_modes") or {}
    if isinstance(runtime_modes_raw, dict):
        if bool(runtime_modes_raw.get("exec", False)):
            print("[transport] runtime_mode exec supported")
        else:
            print("[transport] WARN: runtime_mode exec is not advertised")
        if "embedded" in runtime_modes_raw or "sidecar" in runtime_modes_raw:
            print(
                "[transport] runtime_modes map: "
                + ", ".join(f"{k}={v}" for k, v in sorted(runtime_modes_raw.items()))
            )

    packaging_profiles = caps_raw.get("packaging_profiles") or {}
    if isinstance(packaging_profiles, dict):
        if bool(packaging_profiles.get("system", False)):
            print("[transport] packaging profile system supported")
        else:
            print("[transport] WARN: packaging profile system is not advertised")

    error_codes = caps_raw.get("error_codes")
    if isinstance(error_codes, list):
        print(f"[transport] capabilities error_codes={len(error_codes)}")


def _exercise_client(api_url: str, client_id: str) -> Optional[str]:
    """Run provision, start, health, metrics and stop for an existing client.

    Returns ``None`` when every mandatory step passed, otherwise the error
    message describing the first failed step. A 404 from the provision or
    metrics endpoint only produces a WARN line.
    """
    base = f"/api/v1/transport/clients/{client_id}"

    status, provision_data = request_json(api_url, "POST", f"{base}/provision")
    if status == 404:
        print("[transport] WARN: provision endpoint not available on current backend build")
    elif status != 200:
        return f"provision failed status={status} payload={provision_data}"
    elif not bool(provision_data.get("ok", False)):
        print(f"[transport] WARN: provision returned ok=false payload={provision_data}")
    else:
        print("[transport] provision action ok")

    status, start_data = request_json(api_url, "POST", f"{base}/start")
    if status != 200 or not bool(start_data.get("ok", False)):
        return f"start failed status={status} payload={start_data}"
    status_after = str(start_data.get("status_after") or "").strip().lower()
    if status_after and status_after != "up":
        return f"start did not set status_up: {start_data}"

    status, health_data = request_json(api_url, "GET", f"{base}/health")
    if status != 200 or not bool(health_data.get("ok", False)):
        return f"health failed status={status} payload={health_data}"
    # The health status is only consulted when the start response did not
    # report a status_after value of its own.
    if status_after == "" and str(health_data.get("status") or "").strip().lower() not in ("up", "degraded"):
        return f"health status is not up/degraded after start: {health_data}"
    health_client_id = str(health_data.get("client_id") or "").strip()
    if health_client_id and health_client_id != client_id:
        return f"health client_id mismatch: {health_data}"
    runtime = health_data.get("runtime") or {}
    if isinstance(runtime, dict) and isinstance(runtime.get("metrics"), dict):
        print("[transport] health runtime.metrics found")
    else:
        print("[transport] WARN: legacy health payload without runtime.metrics")

    status, metrics_data = request_json(api_url, "GET", f"{base}/metrics")
    if status == 404:
        print("[transport] WARN: metrics endpoint not available on current backend build")
    else:
        if status != 200 or not bool(metrics_data.get("ok", False)):
            return f"metrics failed status={status} payload={metrics_data}"
        metrics = metrics_data.get("metrics") or {}
        if not isinstance(metrics, dict) or "state_changes" not in metrics:
            return f"metrics payload missing state_changes: {metrics_data}"

    status, stop_data = request_json(api_url, "POST", f"{base}/stop")
    if status != 200 or not bool(stop_data.get("ok", False)):
        return f"stop failed status={status} payload={stop_data}"
    return None


def _lifecycle_smoke(api_url: str) -> Optional[str]:
    """Create a throwaway client, exercise it, and always delete it afterwards.

    Returns ``None`` on success, otherwise the error message of the first
    failure. When a step fails, the client is still force-deleted so failed
    runs do not leave "smoke-*" clients behind; a failed delete in that case
    is only reported as a WARN so the original error stays the reported one.
    """
    # D4.1 contract smoke: lifecycle + health + metrics + unified runtime/error fields.
    client_id = f"smoke-{int(time.time())}-{os.getpid()}"
    status, create_data = request_json(
        api_url,
        "POST",
        "/api/v1/transport/clients",
        {
            "id": client_id,
            "name": "Smoke Transport",
            "kind": "singbox",
            "enabled": False,
        },
    )
    if status != 200 or not bool(create_data.get("ok", False)):
        return f"create client failed status={status} payload={create_data}"

    delete_status = 0
    try:
        error = _exercise_client(api_url, client_id)
    finally:
        # Runs on success, on a failed step and on an unexpected exception.
        delete_status, _ = request_json(
            api_url, "DELETE", f"/api/v1/transport/clients/{client_id}?force=true"
        )

    if error is not None:
        if delete_status != 200:
            print(f"[transport] WARN: cleanup delete failed status={delete_status}")
        return error
    if delete_status != 200:
        return f"cleanup delete failed status={delete_status}"
    return None


def _flow_smoke(ctrl: "DashboardController") -> Optional[str]:
    """Draft, validate, apply and roll back the current policy via *ctrl*.

    Returns ``None`` on success, otherwise an error message. When validation
    reports phase "risky", the flow is confirmed and applied with
    ``force_override=True``; otherwise it is applied without override.
    ``ApiError`` raised by the controller is not handled here.
    """
    pol = ctrl.transport_policy()
    print(f"[transport] current revision={pol.revision} intents={len(pol.intents)}")

    flow = ctrl.transport_flow_draft(pol.intents, base_revision=pol.revision)
    flow = ctrl.transport_flow_validate(flow)
    print(
        f"[transport] validate phase={flow.phase} valid={flow.valid} "
        f"blocks={flow.block_count} warns={flow.warn_count}"
    )

    if flow.phase == "risky":
        flow = ctrl.transport_flow_confirm(flow)
        flow = ctrl.transport_flow_apply(flow, force_override=True)
    else:
        flow = ctrl.transport_flow_apply(flow, force_override=False)
    if flow.phase != "applied":
        return f"apply phase={flow.phase} code={flow.code} message={flow.message}"
    print(f"[transport] apply ok revision={flow.applied_revision} apply_id={flow.apply_id}")

    flow = ctrl.transport_flow_rollback(flow)
    if flow.phase != "applied":
        return f"rollback phase={flow.phase} code={flow.code} message={flow.message}"
    print(f"[transport] rollback ok revision={flow.applied_revision} apply_id={flow.apply_id}")

    conflicts = ctrl.transport_conflicts()
    print(
        f"[transport] conflicts: count={len(conflicts.items)} has_blocking={conflicts.has_blocking}"
    )
    return None


def main() -> int:
    """Run the transport smoke test against the backend named by ``API_URL``.

    Steps: query capabilities through the dashboard controller, print notes
    about the raw capabilities payload, run the client lifecycle smoke, then
    run the policy flow (draft, validate, apply, rollback).

    Returns:
        0 on success, or when the capabilities call fails with an ``ApiError``
        whose ``status_code`` is 404 (treated as "endpoints not present" and
        skipped); 1 on any failure, after printing an ERROR line.
    """
    api_url = os.environ.get("API_URL", "http://127.0.0.1:8080").strip()
    if not api_url:
        return fail("empty API_URL")
    print(f"[transport] API_URL={api_url}")

    ctrl = DashboardController(ApiClient(api_url))
    try:
        caps = ctrl.transport_capabilities()
    except ApiError as e:
        if int(getattr(e, "status_code", 0) or 0) == 404:
            print("[transport] SKIP: running backend has no /api/v1/transport/* endpoints")
            return 0
        return fail(str(e))
    if not caps.clients:
        return fail("empty transport capabilities")
    print(f"[transport] capabilities: {', '.join(sorted(caps.clients.keys()))}")

    _report_raw_capabilities(api_url)

    error = _lifecycle_smoke(api_url)
    if error is not None:
        return fail(error)
    print("[transport] backend-contract smoke: lifecycle/health/metrics ok")

    # Report controller errors the same way as the capabilities call above,
    # rather than letting them escape as a traceback.
    try:
        error = _flow_smoke(ctrl)
    except ApiError as e:
        return fail(str(e))
    if error is not None:
        return fail(error)

    print("[transport] flow smoke passed")
    return 0
if __name__ == "__main__":
raise SystemExit(main())