Files
elmprodvpn/tests/transport_systemd_real_e2e.py

297 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import json
import os
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import urllib.error
import urllib.request
# Default drop-in file name that unit_dropin_path() looks up under "<unit>.d/".
SINGBOX_INSTANCE_DROPIN = "10-selective-vpn.conf"
def fail(msg: str) -> int:
    """Print a tagged error line to stdout and return exit status 1.

    The return value lets callers write ``return fail("...")`` from ``main``.
    """
    print("[transport_systemd_real_e2e] ERROR: {}".format(msg))
    return 1
def request_json(api_url: str, method: str, path: str, payload: Optional[Dict] = None) -> Tuple[int, Dict]:
    """Send one HTTP request to the API and return ``(status, json_body)``.

    Args:
        api_url: Base URL of the API; trailing slashes are stripped before
            ``path`` is appended.
        method: HTTP verb (upper-cased before sending).
        path: Request path (including any query string) appended to ``api_url``.
        payload: Optional object sent as a JSON body. When given, a
            ``Content-Type: application/json`` header is added.

    Returns:
        A ``(status, body)`` tuple:

        * HTTP responses, including error statuses delivered as ``HTTPError``,
          yield their status code and the decoded JSON object. A body that is
          empty, not valid JSON, or not a JSON object becomes ``{}``.
        * Any failure to build or send the request (malformed URL, refused
          connection, timeout, ...) yields status ``0`` and a body of
          ``{"error": "<ExceptionType>: <message>"}`` so callers that print
          the payload show why the request never completed.
    """
    body_bytes = None
    headers = {"Accept": "application/json"}
    if payload is not None:
        body_bytes = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"

    try:
        # Request() itself raises ValueError for a URL without a scheme, so it
        # must live inside the try to honour the "status 0" contract.
        req = urllib.request.Request(
            f"{api_url.rstrip('/')}{path}",
            data=body_bytes,
            method=method.upper(),
            headers=headers,
        )
        with urllib.request.urlopen(req, timeout=30.0) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            status = int(resp.getcode() or 200)
    except urllib.error.HTTPError as e:
        # Non-2xx responses still carry a body worth parsing.
        raw = e.read().decode("utf-8", errors="replace")
        status = int(e.code or 500)
    except Exception as e:
        # Deliberately broad: this is a best-effort probe, but the reason is
        # surfaced instead of being silently dropped.
        return 0, {"error": f"{type(e).__name__}: {e}"}

    try:
        data_json = json.loads(raw) if raw else {}
    except (ValueError, RecursionError):
        data_json = {}
    if not isinstance(data_json, dict):
        data_json = {}
    return status, data_json
def ensure_client_deleted(api_url: str, client_id: str) -> None:
    """Issue a forced DELETE for ``client_id`` and ignore the outcome.

    Used as a pre-test cleanup step: the response (success, 404, or transport
    failure) is intentionally discarded.
    """
    delete_path = "/api/v1/transport/clients/{}?force=true".format(client_id)
    request_json(api_url, "DELETE", delete_path)
def is_systemd_unavailable(resp: Dict) -> bool:
    """Return True when a response looks like a "systemd cannot be used" error.

    The ``message``, ``stderr`` and ``stdout`` fields of ``resp`` are joined
    with spaces, lower-cased, and searched for a fixed set of substrings.
    Missing or falsy fields are treated as empty strings.
    """
    fields = ("message", "stderr", "stdout")
    haystack = " ".join(str(resp.get(field) or "") for field in fields).lower()

    needles = (
        "not been booted with systemd",
        "failed to connect to bus",
        "systemctl daemon-reload failed",
        "operation not permitted",
    )
    for needle in needles:
        if needle in haystack:
            return True
    return False
def unit_file_path(unit: str) -> Path:
    """Return the path of ``unit`` inside ``/etc/systemd/system``."""
    return Path("/etc/systemd/system", unit)
def unit_dropin_path(unit: str, file_name: str = SINGBOX_INSTANCE_DROPIN) -> Path:
    """Return the path of drop-in ``file_name`` in ``/etc/systemd/system/<unit>.d``."""
    dropin_dir = f"{unit}.d"
    return Path("/etc/systemd/system").joinpath(dropin_dir, file_name)
def assert_unit_owned(unit: str, client_id: str) -> None:
    """Assert that ``unit`` carries the ownership marker for ``client_id``.

    The marker line ``Environment=SVPN_TRANSPORT_ID=<client_id>`` is searched
    for first in the unit file and then in its drop-in file; finding it in
    either one is sufficient.

    Raises:
        AssertionError: if neither file exists ("unit artifacts are missing"),
            or if at least one exists but none contains the marker (the
            message lists every file that was inspected).
    """
    marker = f"Environment=SVPN_TRANSPORT_ID={client_id}"
    unit_path = unit_file_path(unit)
    dropin_path = unit_dropin_path(unit)

    inspected = []
    for candidate in (unit_path, dropin_path):
        if not candidate.exists():
            continue
        inspected.append(candidate)
        if marker in candidate.read_text(encoding="utf-8", errors="replace"):
            return

    if not inspected:
        raise AssertionError(f"unit artifacts are missing for ownership check: {unit_path} {dropin_path}")
    # Files exist but none carries the marker: say so, and say which ones were
    # read, rather than claiming the artifacts are missing.
    searched = " ".join(str(p) for p in inspected)
    raise AssertionError(f"ownership marker {marker} not found in unit: {searched}")
def assert_unit_removed(unit: str, client_id: str) -> None:
    """Assert that cleanup removed the artifacts owned by ``client_id``.

    Raises:
        AssertionError: if the drop-in file for ``unit`` still exists, or if
            the unit file still exists and contains the ownership marker
            ``Environment=SVPN_TRANSPORT_ID=<client_id>``. A unit file without
            the marker is left alone.
    """
    leftover_dropin = unit_dropin_path(unit)
    if leftover_dropin.exists():
        raise AssertionError(f"drop-in file still exists after cleanup: {leftover_dropin}")

    leftover_unit = unit_file_path(unit)
    if not leftover_unit.exists():
        return

    marker = f"Environment=SVPN_TRANSPORT_ID={client_id}"
    contents = leftover_unit.read_text(encoding="utf-8", errors="replace")
    if marker in contents:
        raise AssertionError(f"owned unit file still exists after cleanup: {leftover_unit}")
def assert_file_exists(path: str) -> None:
    """Raise ``AssertionError`` unless something exists at ``path``."""
    target = Path(path)
    if target.exists():
        return
    raise AssertionError(f"expected file missing: {target}")
def create_client(api_url: str, payload: Dict) -> Tuple[int, Dict]:
    """POST ``payload`` to the transport clients collection.

    Returns the ``(status, body)`` tuple produced by ``request_json``.
    """
    endpoint = "/api/v1/transport/clients"
    return request_json(api_url, "POST", endpoint, payload)
def _require_ok(api_url: str, method: str, path: str, action: str) -> Dict:
    """Call the API and return its payload, requiring HTTP 200 and a truthy ``ok``."""
    status, payload = request_json(api_url, method, path)
    if status != 200 or not bool(payload.get("ok", False)):
        raise AssertionError(f"{action} failed status={status} payload={payload}")
    return payload


def _provision_or_skip(api_url: str, client_id: str) -> Optional[str]:
    """Provision the client; return a skip reason, or None when provisioning succeeded."""
    status, provision = request_json(api_url, "POST", f"/api/v1/transport/clients/{client_id}/provision")
    if status == 404:
        return "provision endpoint is not available on current backend build"
    if status != 200:
        raise AssertionError(f"provision failed status={status} payload={provision}")
    if not bool(provision.get("ok", False)):
        if is_systemd_unavailable(provision):
            return f"systemd is unavailable: {provision}"
        raise AssertionError(f"provision returned ok=false payload={provision}")
    return None


def _exercise_lifecycle(api_url: str, client_id: str, units: List[str], template_units: List[str]) -> None:
    """Check on-disk artifacts, then drive start / health / metrics / restart / stop."""
    base = f"/api/v1/transport/clients/{client_id}"

    for unit in units:
        assert_unit_owned(unit, client_id)
    for t_unit in template_units:
        assert_file_exists(str(unit_file_path(t_unit)))

    started = _require_ok(api_url, "POST", f"{base}/start", "start")
    if str(started.get("status_after") or "").strip().lower() != "up":
        raise AssertionError(f"start did not set status_after=up payload={started}")

    health = _require_ok(api_url, "GET", f"{base}/health", "health")
    if str(health.get("status") or "").strip().lower() not in ("up", "degraded"):
        raise AssertionError(f"health status is unexpected payload={health}")

    metrics = _require_ok(api_url, "GET", f"{base}/metrics", "metrics")
    m = metrics.get("metrics") or {}
    if not isinstance(m, dict):
        raise AssertionError(f"metrics payload is invalid: {metrics}")
    if int(m.get("state_changes", 0) or 0) < 1:
        raise AssertionError(f"state_changes must be >=1 after start: {metrics}")

    _require_ok(api_url, "POST", f"{base}/restart", "restart")

    stopped = _require_ok(api_url, "POST", f"{base}/stop", "stop")
    if str(stopped.get("status_after") or "").strip().lower() != "down":
        raise AssertionError(f"stop did not set status_after=down payload={stopped}")


def _cleanup_case(api_url: str, client_id: str, units: List[str], template_units: List[str]) -> None:
    """Force-delete the client, then verify owned units are gone and templates remain."""
    _require_ok(api_url, "DELETE", f"/api/v1/transport/clients/{client_id}?force=true", "delete")
    for unit in units:
        assert_unit_removed(unit, client_id)
    for t_unit in template_units:
        assert_file_exists(str(unit_file_path(t_unit)))


def run_case(
    api_url: str,
    *,
    client_id: str,
    name: str,
    kind: str,
    cfg: Dict,
    units: List[str],
    template_units: Optional[List[str]] = None,
) -> Tuple[bool, str]:
    """Run one create -> provision -> start -> ... -> delete lifecycle test.

    Args:
        api_url: Base URL of the API under test.
        client_id: Identifier of the transport client to create.
        name: Display name sent in the create payload.
        kind: Client kind sent in the create payload.
        cfg: Client configuration sent as ``config`` in the create payload.
        units: Unit names expected to carry the ownership marker after
            provisioning and to be removed after deletion.
        template_units: Unit files expected to exist after provisioning and
            to survive deletion of the client.

    Returns:
        ``(True, "ok")`` when the whole lifecycle passed, or
        ``(False, reason)`` when the case must be skipped because the
        provision endpoint is absent or systemd is unavailable.

    Raises:
        AssertionError: when any step fails. If cleanup fails as well, the
            cleanup problem is printed as a warning and the original failure
            is the one raised, so the root cause is not masked.
    """
    ensure_client_deleted(api_url, client_id)
    status, created = create_client(
        api_url,
        {
            "id": client_id,
            "name": name,
            "kind": kind,
            "enabled": False,
            "config": cfg,
        },
    )
    if status != 200 or not bool(created.get("ok", False)):
        raise AssertionError(f"create failed status={status} payload={created}")

    templates = list(template_units or [])
    provisioned = False
    try:
        skip_reason = _provision_or_skip(api_url, client_id)
        if skip_reason is None:
            provisioned = True
            _exercise_lifecycle(api_url, client_id, units, templates)
    except BaseException:
        # Always attempt cleanup, but never let a cleanup problem replace the
        # failure that is already propagating.
        try:
            _cleanup_case(api_url, client_id, units, templates if provisioned else [])
        except Exception as cleanup_err:
            print(f"[transport_systemd_real_e2e] WARN: cleanup after failure also failed: {cleanup_err}")
        raise

    # Template units can only be expected to exist once provisioning succeeded;
    # checking them on a skip path would turn the skip into a failure.
    _cleanup_case(api_url, client_id, units, templates if provisioned else [])

    if skip_reason is not None:
        return False, skip_reason
    return True, "ok"
def main() -> int:
    """Run every systemd lifecycle case against the API named by ``API_URL``.

    Returns the process exit status: ``0`` when all cases pass or the run is
    skipped (transport endpoints missing, or a case reports a skip reason),
    ``1`` when the capabilities probe or any case fails.
    """
    api_url = os.environ.get("API_URL", "http://127.0.0.1:8080").strip()
    if api_url == "":
        return fail("empty API_URL")
    print(f"[transport_systemd_real_e2e] API_URL={api_url}")

    status, caps = request_json(api_url, "GET", "/api/v1/transport/capabilities")
    if status == 404:
        print("[transport_systemd_real_e2e] SKIP: transport endpoints are not available on this backend")
        return 0
    caps_ok = status == 200 and bool(caps.get("ok", False))
    if not caps_ok:
        return fail(f"capabilities failed status={status} payload={caps}")

    # Only enforce "exec" support when the backend actually reports a
    # non-empty runtime_modes mapping.
    runtime_modes = caps.get("runtime_modes") or {}
    if isinstance(runtime_modes, dict) and runtime_modes and not bool(runtime_modes.get("exec", False)):
        return fail(f"runtime_modes.exec is not supported: {caps}")

    # Timestamp + PID keeps client ids and unit names unique per run.
    tag = f"{int(time.time())}-{os.getpid()}"

    sleep_cmd = "/usr/bin/sleep 120"
    singbox_id = f"e2e-sys-singbox-{tag}"
    phoenix_unit = f"svpn-e2e-phoenix-{tag}.service"
    dnstt_unit = f"svpn-e2e-dnstt-{tag}.service"
    dnstt_ssh_unit = f"svpn-e2e-dnstt-ssh-{tag}.service"

    # Each dict's keys match run_case's keyword parameters exactly.
    cases = [
        {
            "client_id": singbox_id,
            "name": "E2E Systemd Singbox",
            "kind": "singbox",
            "cfg": {
                "runner": "systemd",
                "runtime_mode": "exec",
                "exec_start": sleep_cmd,
                "hardening_enabled": False,
            },
            "units": [f"singbox@{singbox_id}.service"],
            "template_units": ["singbox@.service"],
        },
        {
            "client_id": f"e2e-sys-phoenix-{tag}",
            "name": "E2E Systemd Phoenix",
            "kind": "phoenix",
            "cfg": {
                "runner": "systemd",
                "runtime_mode": "exec",
                "unit": phoenix_unit,
                "exec_start": sleep_cmd,
                "hardening_enabled": False,
            },
            "units": [phoenix_unit],
        },
        {
            "client_id": f"e2e-sys-dnstt-{tag}",
            "name": "E2E Systemd DNSTT",
            "kind": "dnstt",
            "cfg": {
                "runner": "systemd",
                "runtime_mode": "exec",
                "unit": dnstt_unit,
                "exec_start": sleep_cmd,
                "ssh_tunnel": True,
                "ssh_unit": dnstt_ssh_unit,
                "ssh_exec_start": sleep_cmd,
                "hardening_enabled": False,
                "ssh_hardening_enabled": False,
            },
            "units": [dnstt_unit, dnstt_ssh_unit],
        },
    ]

    for case in cases:
        kind = case["kind"]
        try:
            # A case without "template_units" falls back to run_case's default of None.
            ok, reason = run_case(api_url, **case)
        except AssertionError as err:
            return fail(f"{kind} failed: {err}")
        if not ok:
            # The first skip ends the whole run with a success status.
            print(f"[transport_systemd_real_e2e] SKIP: {reason}")
            return 0
        print(f"[transport_systemd_real_e2e] {kind} real-systemd lifecycle: ok")

    print("[transport_systemd_real_e2e] passed")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())