#!/usr/bin/env python3 from __future__ import annotations import json import os import time from typing import Dict, Optional, Tuple import urllib.error import urllib.request def fail(msg: str) -> int: print(f"[transport_dnstt_e2e] ERROR: {msg}") return 1 def request_json(api_url: str, method: str, path: str, payload: Optional[Dict] = None) -> Tuple[int, Dict]: data = None headers = {"Accept": "application/json"} if payload is not None: data = json.dumps(payload).encode("utf-8") headers["Content-Type"] = "application/json" req = urllib.request.Request( f"{api_url.rstrip('/')}{path}", data=data, method=method.upper(), headers=headers, ) try: with urllib.request.urlopen(req, timeout=20.0) as resp: raw = resp.read().decode("utf-8", errors="replace") status = int(resp.getcode() or 200) except urllib.error.HTTPError as e: raw = e.read().decode("utf-8", errors="replace") status = int(e.code or 500) except Exception: return 0, {} try: data_json = json.loads(raw) if raw else {} except Exception: data_json = {} if not isinstance(data_json, dict): data_json = {} return status, data_json def ensure_client_deleted(api_url: str, client_id: str) -> None: request_json(api_url, "DELETE", f"/api/v1/transport/clients/{client_id}?force=true") def create_client(api_url: str, payload: Dict) -> Tuple[int, Dict]: return request_json(api_url, "POST", "/api/v1/transport/clients", payload) def main() -> int: api_url = os.environ.get("API_URL", "http://127.0.0.1:8080").strip() if not api_url: return fail("empty API_URL") print(f"[transport_dnstt_e2e] API_URL={api_url}") status, caps = request_json(api_url, "GET", "/api/v1/transport/capabilities") if status == 404: print("[transport_dnstt_e2e] SKIP: transport endpoints are not available on this backend") return 0 if status != 200 or not bool(caps.get("ok", False)): return fail(f"capabilities failed status={status} payload={caps}") clients_caps = caps.get("clients") or {} if not isinstance(clients_caps, dict) or "dnstt" not in clients_caps: return fail(f"dnstt capability is missing: {caps}") runtime_modes = caps.get("runtime_modes") or {} if isinstance(runtime_modes, dict) and runtime_modes: if not bool(runtime_modes.get("exec", False)): return fail(f"runtime_modes.exec is not supported: {caps}") else: print("[transport_dnstt_e2e] WARN: runtime_modes are not advertised by current backend build") ts = int(time.time()) pid = os.getpid() # Case 1: successful lifecycle on mock runner. client_ok = f"e2e-dnstt-ok-{ts}-{pid}" ensure_client_deleted(api_url, client_ok) status, create_ok = create_client( api_url, { "id": client_ok, "name": "E2E DNSTT Mock", "kind": "dnstt", "enabled": False, "config": { "runner": "mock", "runtime_mode": "exec", }, }, ) if status != 200 or not bool(create_ok.get("ok", False)): return fail(f"create mock dnstt failed status={status} payload={create_ok}") try: status, provision = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_ok}/provision") if status == 404: print("[transport_dnstt_e2e] SKIP: provision endpoint is not available on current backend build") return 0 if status != 200 or not bool(provision.get("ok", False)): return fail(f"provision mock dnstt failed status={status} payload={provision}") status, start = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_ok}/start") if status != 200 or not bool(start.get("ok", False)): return fail(f"start mock dnstt failed status={status} payload={start}") if str(start.get("status_after") or "").strip().lower() != "up": return fail(f"start did not set status_after=up: {start}") status, health = request_json(api_url, "GET", f"/api/v1/transport/clients/{client_ok}/health") if status != 200 or not bool(health.get("ok", False)): return fail(f"health mock dnstt failed status={status} payload={health}") if str(health.get("status") or "").strip().lower() not in ("up", "degraded"): return fail(f"unexpected health status after start: {health}") status, restart = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_ok}/restart") if status != 200 or not bool(restart.get("ok", False)): return fail(f"restart mock dnstt failed status={status} payload={restart}") status, stop = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_ok}/stop") if status != 200 or not bool(stop.get("ok", False)): return fail(f"stop mock dnstt failed status={status} payload={stop}") if str(stop.get("status_after") or "").strip().lower() != "down": return fail(f"stop did not set status_after=down: {stop}") status, metrics = request_json(api_url, "GET", f"/api/v1/transport/clients/{client_ok}/metrics") if status == 404: print("[transport_dnstt_e2e] WARN: metrics endpoint is not available on current backend build") else: if status != 200 or not bool(metrics.get("ok", False)): return fail(f"metrics mock dnstt failed status={status} payload={metrics}") metrics_obj = metrics.get("metrics") or {} if not isinstance(metrics_obj, dict): return fail(f"metrics payload is invalid: {metrics}") if int(metrics_obj.get("state_changes", 0) or 0) < 2: return fail(f"state_changes must be >=2 after lifecycle sequence: {metrics}") print("[transport_dnstt_e2e] case1 mock lifecycle: ok") finally: ensure_client_deleted(api_url, client_ok) # Case 2: ssh overlay requires ssh_host. client_ssh = f"e2e-dnstt-ssh-host-{ts}-{pid}" ensure_client_deleted(api_url, client_ssh) status, create_ssh = create_client( api_url, { "id": client_ssh, "name": "E2E DNSTT SSH Overlay", "kind": "dnstt", "enabled": False, "config": { "runner": "systemd", "runtime_mode": "exec", "unit": f"{client_ssh}.service", "exec_start": "/usr/bin/true", "ssh_tunnel": True, }, }, ) if status != 200 or not bool(create_ssh.get("ok", False)): return fail(f"create dnstt ssh overlay failed status={status} payload={create_ssh}") try: status, provision_ssh = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_ssh}/provision") if status != 200 or bool(provision_ssh.get("ok", True)): return fail(f"dnstt ssh overlay provision must fail status={status} payload={provision_ssh}") if str(provision_ssh.get("code") or "").strip() != "TRANSPORT_BACKEND_PROVISION_CONFIG_REQUIRED": return fail(f"dnstt ssh overlay wrong code: {provision_ssh}") msg = str(provision_ssh.get("message") or "").strip().lower() if "ssh_host" not in msg: return fail(f"dnstt ssh overlay message must reference ssh_host: {provision_ssh}") print("[transport_dnstt_e2e] case2 ssh overlay host guard: ok") finally: ensure_client_deleted(api_url, client_ssh) # Case 3: ssh overlay requires valid ssh_unit when provided. client_ssh_unit = f"e2e-dnstt-ssh-unit-{ts}-{pid}" ensure_client_deleted(api_url, client_ssh_unit) status, create_ssh_unit = create_client( api_url, { "id": client_ssh_unit, "name": "E2E DNSTT SSH Unit Validation", "kind": "dnstt", "enabled": False, "config": { "runner": "systemd", "runtime_mode": "exec", "unit": f"{client_ssh_unit}.service", "exec_start": "/usr/bin/true", "ssh_tunnel": True, "ssh_host": "127.0.0.1", "ssh_unit": "invalid unit name", }, }, ) if status != 200 or not bool(create_ssh_unit.get("ok", False)): return fail(f"create dnstt ssh unit validation client failed status={status} payload={create_ssh_unit}") try: status, provision_ssh_unit = request_json( api_url, "POST", f"/api/v1/transport/clients/{client_ssh_unit}/provision" ) if status != 200 or bool(provision_ssh_unit.get("ok", True)): return fail(f"dnstt ssh unit validation provision must fail status={status} payload={provision_ssh_unit}") if str(provision_ssh_unit.get("code") or "").strip() != "TRANSPORT_BACKEND_PROVISION_CONFIG_REQUIRED": return fail(f"dnstt ssh unit validation wrong code: {provision_ssh_unit}") msg = str(provision_ssh_unit.get("message") or "").strip().lower() if "ssh_unit" not in msg: return fail(f"dnstt ssh unit validation message must reference ssh_unit: {provision_ssh_unit}") print("[transport_dnstt_e2e] case3 ssh overlay unit guard: ok") finally: ensure_client_deleted(api_url, client_ssh_unit) # Case 4: dnstt template validation must reject incomplete config. client_tpl = f"e2e-dnstt-template-{ts}-{pid}" ensure_client_deleted(api_url, client_tpl) status, create_tpl = create_client( api_url, { "id": client_tpl, "name": "E2E DNSTT Template Validation", "kind": "dnstt", "enabled": False, "config": { "runner": "systemd", "runtime_mode": "exec", "unit": f"{client_tpl}.service", # no exec_start, missing template fields on purpose. }, }, ) if status != 200 or not bool(create_tpl.get("ok", False)): return fail(f"create dnstt template client failed status={status} payload={create_tpl}") try: status, provision_tpl = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_tpl}/provision") if status != 200 or bool(provision_tpl.get("ok", True)): return fail(f"dnstt template provision must fail status={status} payload={provision_tpl}") if str(provision_tpl.get("code") or "").strip() != "TRANSPORT_BACKEND_PROVISION_CONFIG_REQUIRED": return fail(f"dnstt template wrong code: {provision_tpl}") msg = str(provision_tpl.get("message") or "").strip().lower() if "dnstt template requires" not in msg: return fail(f"dnstt template message must mention required fields: {provision_tpl}") print("[transport_dnstt_e2e] case4 template validation guard: ok") finally: ensure_client_deleted(api_url, client_tpl) print("[transport_dnstt_e2e] passed") return 0 if __name__ == "__main__": raise SystemExit(main())