#!/usr/bin/env python3 from __future__ import annotations import json import os import tempfile import time from pathlib import Path from typing import Dict, List, Optional, Tuple import urllib.error import urllib.request SINGBOX_INSTANCE_DROPIN = "10-selective-vpn.conf" def fail(msg: str) -> int: print(f"[transport_production_like_e2e] ERROR: {msg}") return 1 def request_json(api_url: str, method: str, path: str, payload: Optional[Dict] = None) -> Tuple[int, Dict]: data = None headers = {"Accept": "application/json"} if payload is not None: data = json.dumps(payload).encode("utf-8") headers["Content-Type"] = "application/json" req = urllib.request.Request( f"{api_url.rstrip('/')}{path}", data=data, method=method.upper(), headers=headers, ) try: with urllib.request.urlopen(req, timeout=30.0) as resp: raw = resp.read().decode("utf-8", errors="replace") status = int(resp.getcode() or 200) except urllib.error.HTTPError as e: raw = e.read().decode("utf-8", errors="replace") status = int(e.code or 500) except Exception: return 0, {} try: data_json = json.loads(raw) if raw else {} except Exception: data_json = {} if not isinstance(data_json, dict): data_json = {} return status, data_json def ensure_client_deleted(api_url: str, client_id: str) -> None: request_json(api_url, "DELETE", f"/api/v1/transport/clients/{client_id}?force=true") def is_systemd_unavailable(resp: Dict) -> bool: text = ( str(resp.get("message") or "") + " " + str(resp.get("stderr") or "") + " " + str(resp.get("stdout") or "") ).lower() checks = ( "not been booted with systemd", "failed to connect to bus", "systemctl daemon-reload failed", "operation not permitted", ) return any(c in text for c in checks) def write_fake_binary(path: Path) -> None: body = "#!/usr/bin/env bash\nexec /usr/bin/sleep 120\n" path.write_text(body, encoding="utf-8") path.chmod(0o755) def unit_file_path(unit: str) -> Path: return Path("/etc/systemd/system") / unit def unit_dropin_path(unit: str, file_name: str = SINGBOX_INSTANCE_DROPIN) -> Path: return Path("/etc/systemd/system") / f"{unit}.d" / file_name def read_managed_unit_text(unit: str) -> str: unit_path = unit_file_path(unit) dropin_path = unit_dropin_path(unit) chunks: List[str] = [] if unit_path.exists(): chunks.append(unit_path.read_text(encoding="utf-8", errors="replace")) if dropin_path.exists(): chunks.append(dropin_path.read_text(encoding="utf-8", errors="replace")) if not chunks: raise AssertionError(f"unit artifacts are missing: {unit_path} {dropin_path}") return "\n".join(chunks) def assert_unit_contains(unit: str, expected_parts: List[str]) -> None: text = read_managed_unit_text(unit) for part in expected_parts: if part not in text: raise AssertionError(f"unit {unit} missing {part!r}") def assert_unit_removed(unit: str, client_id: str) -> None: marker = f"Environment=SVPN_TRANSPORT_ID={client_id}" unit_path = unit_file_path(unit) dropin_path = unit_dropin_path(unit) if dropin_path.exists(): raise AssertionError(f"drop-in file still exists after cleanup: {dropin_path}") if unit_path.exists(): text = unit_path.read_text(encoding="utf-8", errors="replace") if marker in text: raise AssertionError(f"owned unit still exists after cleanup: {unit_path}") def assert_file_exists(path: str) -> None: p = Path(path) if not p.exists(): raise AssertionError(f"expected file missing: {p}") def run_case( api_url: str, *, client_id: str, kind: str, cfg: Dict, units: List[str], expected_unit_parts: List[str], template_units: Optional[List[str]] = None, ) -> Tuple[bool, str]: ensure_client_deleted(api_url, client_id) status, created = request_json( api_url, "POST", "/api/v1/transport/clients", { "id": client_id, "name": f"E2E ProductionLike {kind}", "kind": kind, "enabled": False, "config": cfg, }, ) if status != 200 or not bool(created.get("ok", False)): raise AssertionError(f"create failed status={status} payload={created}") try: status, provision = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_id}/provision") if status == 404: return False, "provision endpoint is not available on current backend build" if status != 200: raise AssertionError(f"provision failed status={status} payload={provision}") if not bool(provision.get("ok", False)): if is_systemd_unavailable(provision): return False, f"systemd is unavailable: {provision}" raise AssertionError(f"provision returned ok=false payload={provision}") for unit in units: assert_unit_contains(unit, [f"Environment=SVPN_TRANSPORT_ID={client_id}"]) assert_unit_contains(units[0], expected_unit_parts) for t_unit in (template_units or []): assert_file_exists(str(unit_file_path(t_unit))) status, started = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_id}/start") if status != 200 or not bool(started.get("ok", False)): raise AssertionError(f"start failed status={status} payload={started}") if str(started.get("status_after") or "").strip().lower() != "up": raise AssertionError(f"start did not set status_after=up payload={started}") status, health = request_json(api_url, "GET", f"/api/v1/transport/clients/{client_id}/health") if status != 200 or not bool(health.get("ok", False)): raise AssertionError(f"health failed status={status} payload={health}") status, metrics = request_json(api_url, "GET", f"/api/v1/transport/clients/{client_id}/metrics") if status != 200 or not bool(metrics.get("ok", False)): raise AssertionError(f"metrics failed status={status} payload={metrics}") m = metrics.get("metrics") or {} if int(m.get("state_changes", 0) or 0) < 1: raise AssertionError(f"state_changes must be >=1 payload={metrics}") status, restarted = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_id}/restart") if status != 200 or not bool(restarted.get("ok", False)): raise AssertionError(f"restart failed status={status} payload={restarted}") status, stopped = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_id}/stop") if status != 200 or not bool(stopped.get("ok", False)): raise AssertionError(f"stop failed status={status} payload={stopped}") finally: status, deleted = request_json(api_url, "DELETE", f"/api/v1/transport/clients/{client_id}?force=true") if status != 200 or not bool(deleted.get("ok", False)): raise AssertionError(f"delete failed status={status} payload={deleted}") for unit in units: assert_unit_removed(unit, client_id) for t_unit in (template_units or []): assert_file_exists(str(unit_file_path(t_unit))) return True, "ok" def main() -> int: api_url = os.environ.get("API_URL", "http://127.0.0.1:8080").strip() if not api_url: return fail("empty API_URL") print(f"[transport_production_like_e2e] API_URL={api_url}") status, caps = request_json(api_url, "GET", "/api/v1/transport/capabilities") if status == 404: print("[transport_production_like_e2e] SKIP: transport endpoints are not available on this backend") return 0 if status != 200 or not bool(caps.get("ok", False)): return fail(f"capabilities failed status={status} payload={caps}") runtime_modes = caps.get("runtime_modes") or {} if isinstance(runtime_modes, dict) and runtime_modes: if not bool(runtime_modes.get("exec", False)): return fail(f"runtime_modes.exec is not supported: {caps}") packaging_profiles = caps.get("packaging_profiles") or {} if isinstance(packaging_profiles, dict) and packaging_profiles: if not bool(packaging_profiles.get("bundled", False)): return fail(f"packaging_profiles.bundled is not supported: {caps}") if not bool(packaging_profiles.get("system", False)): return fail(f"packaging_profiles.system is not supported: {caps}") ts = int(time.time()) pid = os.getpid() tag = f"{ts}-{pid}" with tempfile.TemporaryDirectory(prefix="svpn-prodlike-") as tmp: root = Path(tmp) bin_root = root / "bin" bin_root.mkdir(parents=True, exist_ok=True) singbox_bin = bin_root / "sing-box" phoenix_bin = bin_root / "phoenix-client" dnstt_bin = bin_root / "dnstt-client" write_fake_binary(singbox_bin) write_fake_binary(phoenix_bin) write_fake_binary(dnstt_bin) singbox_cfg = root / "singbox.json" phoenix_cfg = root / "phoenix.toml" singbox_cfg.write_text("{}", encoding="utf-8") phoenix_cfg.write_text("{}", encoding="utf-8") phoenix_unit = f"svpn-prodlike-phoenix-{tag}.service" dnstt_unit = f"svpn-prodlike-dnstt-{tag}.service" dnstt_ssh_unit = f"svpn-prodlike-dnstt-ssh-{tag}.service" singbox_client_id = f"e2e-prodlike-singbox-{tag}" singbox_unit = f"singbox@{singbox_client_id}.service" cases = [ { "client_id": singbox_client_id, "kind": "singbox", "cfg": { "runner": "systemd", "runtime_mode": "exec", "packaging_profile": "bundled", "bin_root": str(bin_root), "packaging_system_fallback": False, "require_binary": True, "singbox_config_path": str(singbox_cfg), "hardening_enabled": False, }, "units": [singbox_unit], "template_units": ["singbox@.service"], "expected": [str(singbox_bin), "run", str(singbox_cfg)], }, { "client_id": f"e2e-prodlike-phoenix-{tag}", "kind": "phoenix", "cfg": { "runner": "systemd", "runtime_mode": "exec", "unit": phoenix_unit, "packaging_profile": "bundled", "bin_root": str(bin_root), "packaging_system_fallback": False, "require_binary": True, "phoenix_config_path": str(phoenix_cfg), "hardening_enabled": False, }, "units": [phoenix_unit], "expected": [str(phoenix_bin), "-config", str(phoenix_cfg)], }, { "client_id": f"e2e-prodlike-dnstt-{tag}", "kind": "dnstt", "cfg": { "runner": "systemd", "runtime_mode": "exec", "unit": dnstt_unit, "packaging_profile": "bundled", "bin_root": str(bin_root), "packaging_system_fallback": False, "require_binary": True, "resolver_mode": "doh", "doh_url": "https://dns.google/dns-query", "pubkey": "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", "domain": "tunnel.example.com", "local_addr": "127.0.0.1:7005", "ssh_tunnel": True, "ssh_unit": dnstt_ssh_unit, "ssh_exec_start": "/usr/bin/sleep 120", "hardening_enabled": False, "ssh_hardening_enabled": False, }, "units": [dnstt_unit, dnstt_ssh_unit], "expected": [str(dnstt_bin), "-doh", "dns.google", "tunnel.example.com", "127.0.0.1:7005"], }, ] for case in cases: try: ok, reason = run_case( api_url, client_id=case["client_id"], kind=case["kind"], cfg=case["cfg"], units=case["units"], expected_unit_parts=case["expected"], template_units=case.get("template_units"), ) except AssertionError as e: return fail(f"{case['kind']} failed: {e}") if not ok: print(f"[transport_production_like_e2e] SKIP: {reason}") return 0 print(f"[transport_production_like_e2e] {case['kind']} production-like lifecycle: ok") print("[transport_production_like_e2e] passed") return 0 if __name__ == "__main__": raise SystemExit(main())