Files
elmprodvpn/selective-vpn-gui/svpn_run_profile.py
2026-02-15 22:28:56 +03:00

300 lines
9.4 KiB
Python

#!/usr/bin/env python3
"""
svpn_run_profile.py
EN:
Headless launcher for a saved "traffic app profile".
- Runs app via `systemd-run --user` (transient unit)
- Resolves effective cgroupv2 path for that unit
- Calls Selective-VPN API to apply a per-app routing mark (VPN/Direct)
RU:
Headless-лаунчер для сохраненного "app profile".
- Запускает приложение через `systemd-run --user` (transient unit)
- Получает реальный cgroupv2 путь для юнита
- Дергает Selective-VPN API чтобы применить per-app метку маршрутизации (VPN/Direct)
This script is used for "desktop shortcuts" and can be run manually.
"""
from __future__ import annotations
import argparse
import json
import os
import shlex
import subprocess
import sys
import time
from typing import Any, Dict, Optional
import requests
DEFAULT_API = "http://127.0.0.1:8080"
def _log_path() -> str:
base = os.path.join(os.path.expanduser("~"), ".local", "share", "selective-vpn")
os.makedirs(base, exist_ok=True)
return os.path.join(base, "svpn_run_profile.log")
def log(msg: str) -> None:
line = (msg or "").rstrip()
if not line:
return
ts = time.strftime("%Y-%m-%d %H:%M:%S")
out = f"[{ts}] {line}"
try:
with open(_log_path(), "a", encoding="utf-8", errors="replace") as f:
f.write(out + "\n")
except Exception:
pass
print(out, flush=True)
def api_base() -> str:
return str(os.environ.get("SELECTIVE_VPN_API") or DEFAULT_API).rstrip("/")
def api_request(method: str, path: str, *, json_body: Optional[Dict[str, Any]] = None, params=None, timeout: float = 5.0) -> Dict[str, Any]:
url = api_base() + path
try:
r = requests.request(method.upper(), url, json=json_body, params=params, timeout=timeout, headers={"Accept": "application/json"})
except Exception as e:
raise RuntimeError(f"api request failed: {method} {url}: {e}") from e
if not (200 <= r.status_code < 300):
raise RuntimeError(f"api error: {method} {url} ({r.status_code}): {r.text.strip()}")
if not r.content:
return {}
try:
v = r.json()
if isinstance(v, dict):
return v
return {"raw": v}
except ValueError:
return {"raw": r.text}
def list_profiles() -> list[dict]:
data = api_request("GET", "/api/v1/traffic/app-profiles", timeout=4.0)
raw = data.get("profiles") or []
if not isinstance(raw, list):
return []
out = []
for it in raw:
if isinstance(it, dict):
out.append(it)
return out
def get_profile(profile_id: str) -> dict:
pid = str(profile_id or "").strip()
if not pid:
raise ValueError("missing profile id")
for p in list_profiles():
if str(p.get("id") or "").strip() == pid:
return p
raise RuntimeError(f"profile not found: {pid}")
def infer_app_key(cmdline: str) -> str:
cmd = (cmdline or "").strip()
if not cmd:
return ""
try:
args = shlex.split(cmd)
if args:
return str(args[0] or "").strip()
except Exception:
pass
return (cmd.split() or [""])[0].strip()
def systemctl_user(args: list[str], *, timeout: float = 4.0) -> tuple[int, str]:
cmd = ["systemctl", "--user"] + list(args or [])
try:
p = subprocess.run(cmd, capture_output=True, text=True, check=False, timeout=timeout)
except subprocess.TimeoutExpired as e:
return 124, f"timeout: {' '.join(cmd)}"
out = ((p.stdout or "") + (p.stderr or "")).strip()
return int(p.returncode or 0), out
def cgroup_path_from_pid(pid: int) -> str:
p = int(pid or 0)
if p <= 0:
return ""
try:
with open(f"/proc/{p}/cgroup", "r", encoding="utf-8", errors="replace") as f:
for raw in f:
line = (raw or "").strip()
if line.startswith("0::"):
cg = line[len("0::"):].strip()
return cg
except Exception:
return ""
return ""
def effective_cgroup_for_unit(unit: str, *, timeout_sec: float = 2.5) -> str:
u = (unit or "").strip()
if not u:
return ""
deadline = time.time() + max(0.2, float(timeout_sec))
last = ""
while time.time() < deadline:
code, out = systemctl_user(["show", "-p", "MainPID", "--value", u], timeout=2.0)
last = out or ""
if code == 0:
try:
pid = int((out or "").strip() or "0")
except Exception:
pid = 0
if pid > 0:
cg = cgroup_path_from_pid(pid)
if cg:
return cg
time.sleep(0.1)
raise RuntimeError(f"failed to resolve effective cgroup for unit={u}: {last.strip() or '(no output)'}")
def run_systemd_unit(cmdline: str, *, unit: str) -> str:
cmd = (cmdline or "").strip()
if not cmd:
raise ValueError("empty command")
args = shlex.split(cmd)
if not args:
raise ValueError("empty args")
run_cmd = [
"systemd-run",
"--user",
"--unit",
unit,
"--collect",
"--same-dir",
] + args
try:
p = subprocess.run(run_cmd, capture_output=True, text=True, check=False, timeout=8)
except subprocess.TimeoutExpired as e:
raise RuntimeError("systemd-run timed out") from e
out = ((p.stdout or "") + (p.stderr or "")).strip()
if p.returncode != 0:
raise RuntimeError(f"systemd-run failed: rc={p.returncode}\n{out}".strip())
cg = effective_cgroup_for_unit(unit, timeout_sec=3.0)
return cg
def refresh_if_running(*, target: str, app_key: str, command: str, ttl_sec: int) -> bool:
tgt = (target or "").strip().lower()
key = (app_key or "").strip()
if tgt not in ("vpn", "direct") or not key:
return False
data = api_request("GET", "/api/v1/traffic/appmarks/items", timeout=4.0)
items = data.get("items") or []
if not isinstance(items, list):
return False
for it in items:
if not isinstance(it, dict):
continue
if str(it.get("target") or "").strip().lower() != tgt:
continue
if str(it.get("app_key") or "").strip() != key:
continue
unit = str(it.get("unit") or "").strip()
if not unit:
continue
code, out = systemctl_user(["is-active", unit], timeout=2.0)
if code != 0 or (out or "").strip().lower() != "active":
continue
cg = effective_cgroup_for_unit(unit, timeout_sec=2.5)
payload = {
"op": "add",
"target": tgt,
"cgroup": cg,
"unit": unit,
"command": command,
"app_key": key,
"timeout_sec": int(ttl_sec or 0),
}
res = api_request("POST", "/api/v1/traffic/appmarks", json_body=payload, timeout=4.0)
if not bool(res.get("ok", False)):
raise RuntimeError(f"appmark refresh failed: {res.get('message')}")
log(f"refreshed mark: target={tgt} app={key} unit={unit} cgroup_id={res.get('cgroup_id')}")
return True
return False
def apply_mark(*, target: str, cgroup: str, unit: str, command: str, app_key: str, ttl_sec: int) -> None:
payload = {
"op": "add",
"target": target,
"cgroup": cgroup,
"unit": unit,
"command": command,
"app_key": app_key,
"timeout_sec": int(ttl_sec or 0),
}
res = api_request("POST", "/api/v1/traffic/appmarks", json_body=payload, timeout=4.0)
if not bool(res.get("ok", False)):
raise RuntimeError(f"appmark failed: {res.get('message')}")
log(f"mark added: target={target} app={app_key} unit={unit} cgroup_id={res.get('cgroup_id')} ttl={res.get('timeout_sec')}")
def main(argv: list[str]) -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--id", required=True, help="profile id")
ap.add_argument("--json", action="store_true", help="print machine-readable json result")
args = ap.parse_args(argv)
pid = str(args.id or "").strip()
prof = get_profile(pid)
cmd = str(prof.get("command") or "").strip()
if not cmd:
raise RuntimeError("profile command is empty")
target = str(prof.get("target") or "vpn").strip().lower()
if target not in ("vpn", "direct"):
target = "vpn"
app_key = str(prof.get("app_key") or "").strip() or infer_app_key(cmd)
ttl = int(prof.get("ttl_sec", 0) or 0)
if ttl <= 0:
ttl = 24 * 60 * 60
# Try refresh first if already running.
if refresh_if_running(target=target, app_key=app_key, command=cmd, ttl_sec=ttl):
if args.json:
print(json.dumps({"ok": True, "op": "refresh", "id": pid, "target": target, "app_key": app_key}))
return 0
unit = f"svpn-{target}-{int(time.time())}.service"
log(f"launching profile id={pid} target={target} app={app_key} unit={unit}")
cg = run_systemd_unit(cmd, unit=unit)
log(f"ControlGroup: {cg}")
apply_mark(target=target, cgroup=cg, unit=unit, command=cmd, app_key=app_key, ttl_sec=ttl)
if args.json:
print(json.dumps({"ok": True, "op": "run", "id": pid, "target": target, "app_key": app_key, "unit": unit}))
return 0
if __name__ == "__main__":
try:
raise SystemExit(main(sys.argv[1:]))
except KeyboardInterrupt:
raise
except Exception as e:
log(f"ERROR: {e}")
raise SystemExit(1)