#!/usr/bin/env python3 from __future__ import annotations import json import os import sys import time from typing import Dict, Optional, Tuple import urllib.error import urllib.request REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) GUI_DIR = os.path.join(REPO_ROOT, "selective-vpn-gui") if GUI_DIR not in sys.path: sys.path.insert(0, GUI_DIR) from api_client import ApiClient, ApiError from dashboard_controller import DashboardController def fail(msg: str) -> int: print(f"[transport] ERROR: {msg}") return 1 def request_json(api_url: str, method: str, path: str, payload: Optional[Dict] = None) -> Tuple[int, Dict]: data = None headers = {"Accept": "application/json"} if payload is not None: data = json.dumps(payload).encode("utf-8") headers["Content-Type"] = "application/json" req = urllib.request.Request( f"{api_url.rstrip('/')}{path}", data=data, method=method.upper(), headers=headers, ) try: with urllib.request.urlopen(req, timeout=15.0) as resp: raw = resp.read().decode("utf-8", errors="replace") status = int(resp.getcode() or 200) except urllib.error.HTTPError as e: raw = e.read().decode("utf-8", errors="replace") status = int(e.code or 500) except Exception: return 0, {} try: data_json = json.loads(raw) if raw else {} except Exception: data_json = {} if not isinstance(data_json, dict): data_json = {} return status, data_json def main() -> int: api_url = os.environ.get("API_URL", "http://127.0.0.1:8080").strip() if not api_url: return fail("empty API_URL") print(f"[transport] API_URL={api_url}") client = ApiClient(api_url) ctrl = DashboardController(client) try: caps = ctrl.transport_capabilities() except ApiError as e: if int(getattr(e, "status_code", 0) or 0) == 404: print("[transport] SKIP: running backend has no /api/v1/transport/* endpoints") return 0 return fail(str(e)) if not caps.clients: return fail("empty transport capabilities") print(f"[transport] capabilities: {', '.join(sorted(caps.clients.keys()))}") status, caps_raw = request_json(api_url, "GET", "/api/v1/transport/capabilities") if status == 200 and bool(caps_raw.get("ok", False)): clients_raw = caps_raw.get("clients") or {} dnstt_caps = clients_raw.get("dnstt") if isinstance(clients_raw, dict) else {} if isinstance(dnstt_caps, dict): if bool(dnstt_caps.get("ssh_tunnel", False)): print("[transport] dnstt ssh_tunnel capability detected") else: print("[transport] WARN: dnstt ssh_tunnel capability is not advertised") runtime_modes_raw = caps_raw.get("runtime_modes") or {} if isinstance(runtime_modes_raw, dict): if bool(runtime_modes_raw.get("exec", False)): print("[transport] runtime_mode exec supported") else: print("[transport] WARN: runtime_mode exec is not advertised") if "embedded" in runtime_modes_raw or "sidecar" in runtime_modes_raw: print( "[transport] runtime_modes map: " + ", ".join(f"{k}={v}" for k, v in sorted(runtime_modes_raw.items())) ) packaging_profiles = caps_raw.get("packaging_profiles") or {} if isinstance(packaging_profiles, dict): if bool(packaging_profiles.get("system", False)): print("[transport] packaging profile system supported") else: print("[transport] WARN: packaging profile system is not advertised") if isinstance(caps_raw.get("error_codes"), list): print(f"[transport] capabilities error_codes={len(caps_raw.get('error_codes') or [])}") # D4.1 contract smoke: lifecycle + health + metrics + unified runtime/error fields. client_id = f"smoke-{int(time.time())}-{os.getpid()}" status, create_data = request_json( api_url, "POST", "/api/v1/transport/clients", { "id": client_id, "name": "Smoke Transport", "kind": "singbox", "enabled": False, }, ) if status != 200 or not bool(create_data.get("ok", False)): return fail(f"create client failed status={status} payload={create_data}") status, provision_data = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_id}/provision") if status == 404: print("[transport] WARN: provision endpoint not available on current backend build") elif status != 200: return fail(f"provision failed status={status} payload={provision_data}") elif not bool(provision_data.get("ok", False)): print(f"[transport] WARN: provision returned ok=false payload={provision_data}") else: print("[transport] provision action ok") status, start_data = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_id}/start") if status != 200 or not bool(start_data.get("ok", False)): return fail(f"start failed status={status} payload={start_data}") status_after = str(start_data.get("status_after") or "").strip().lower() if status_after and status_after != "up": return fail(f"start did not set status_up: {start_data}") status, health_data = request_json(api_url, "GET", f"/api/v1/transport/clients/{client_id}/health") if status != 200 or not bool(health_data.get("ok", False)): return fail(f"health failed status={status} payload={health_data}") if status_after == "" and str(health_data.get("status") or "").strip().lower() not in ("up", "degraded"): return fail(f"health status is not up/degraded after start: {health_data}") health_client_id = str(health_data.get("client_id") or "").strip() if health_client_id and health_client_id != client_id: return fail(f"health client_id mismatch: {health_data}") runtime = health_data.get("runtime") or {} if isinstance(runtime, dict) and isinstance(runtime.get("metrics"), dict): print("[transport] health runtime.metrics found") else: print("[transport] WARN: legacy health payload without runtime.metrics") status, metrics_data = request_json(api_url, "GET", f"/api/v1/transport/clients/{client_id}/metrics") if status == 404: print("[transport] WARN: metrics endpoint not available on current backend build") else: if status != 200 or not bool(metrics_data.get("ok", False)): return fail(f"metrics failed status={status} payload={metrics_data}") metrics = metrics_data.get("metrics") or {} if not isinstance(metrics, dict) or "state_changes" not in metrics: return fail(f"metrics payload missing state_changes: {metrics_data}") status, stop_data = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_id}/stop") if status != 200 or not bool(stop_data.get("ok", False)): return fail(f"stop failed status={status} payload={stop_data}") status, _ = request_json(api_url, "DELETE", f"/api/v1/transport/clients/{client_id}?force=true") if status != 200: return fail(f"cleanup delete failed status={status}") print("[transport] backend-contract smoke: lifecycle/health/metrics ok") pol = ctrl.transport_policy() print(f"[transport] current revision={pol.revision} intents={len(pol.intents)}") flow = ctrl.transport_flow_draft(pol.intents, base_revision=pol.revision) flow = ctrl.transport_flow_validate(flow) print( f"[transport] validate phase={flow.phase} valid={flow.valid} " f"blocks={flow.block_count} warns={flow.warn_count}" ) if flow.phase == "risky": flow = ctrl.transport_flow_confirm(flow) flow = ctrl.transport_flow_apply(flow, force_override=True) else: flow = ctrl.transport_flow_apply(flow, force_override=False) if flow.phase != "applied": return fail(f"apply phase={flow.phase} code={flow.code} message={flow.message}") print(f"[transport] apply ok revision={flow.applied_revision} apply_id={flow.apply_id}") flow = ctrl.transport_flow_rollback(flow) if flow.phase != "applied": return fail(f"rollback phase={flow.phase} code={flow.code} message={flow.message}") print(f"[transport] rollback ok revision={flow.applied_revision} apply_id={flow.apply_id}") conflicts = ctrl.transport_conflicts() print( f"[transport] conflicts: count={len(conflicts.items)} has_blocking={conflicts.has_blocking}" ) print("[transport] flow smoke passed") return 0 if __name__ == "__main__": raise SystemExit(main())