359 lines
11 KiB
Python
359 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
svpn_run_profile.py
|
|
|
|
EN:
|
|
Headless launcher for a saved "traffic app profile".
|
|
- Runs app via `systemd-run --user` (transient unit)
|
|
- Resolves effective cgroupv2 path for that unit
|
|
- Calls Selective-VPN API to apply a per-app routing mark (VPN/Direct)
|
|
|
|
RU:
|
|
Headless-лаунчер для сохраненного "app profile".
|
|
- Запускает приложение через `systemd-run --user` (transient unit)
|
|
- Получает реальный cgroupv2 путь для юнита
|
|
- Дергает Selective-VPN API чтобы применить per-app метку маршрутизации (VPN/Direct)
|
|
|
|
This script is used for "desktop shortcuts" and can be run manually.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from typing import Any, Dict, Optional
|
|
|
|
import requests
|
|
|
|
|
|
DEFAULT_API = "http://127.0.0.1:8080"
|
|
|
|
|
|
def _log_path() -> str:
|
|
base = os.path.join(os.path.expanduser("~"), ".local", "share", "selective-vpn")
|
|
os.makedirs(base, exist_ok=True)
|
|
return os.path.join(base, "svpn_run_profile.log")
|
|
|
|
|
|
def log(msg: str) -> None:
    """Append a timestamped line to the log file and echo it to stdout.

    Trailing whitespace is stripped from *msg*; blank messages are ignored.
    A failure to write the log file is non-fatal: the line is still printed.
    """
    text = (msg or "").rstrip()
    if text:
        stamp = time.strftime("%Y-%m-%d %H:%M:%S")
        entry = f"[{stamp}] {text}"
        try:
            with open(_log_path(), "a", encoding="utf-8", errors="replace") as fh:
                fh.write(f"{entry}\n")
        except Exception:
            # Best effort only: logging must never break the launcher.
            pass
        print(entry, flush=True)
|
|
|
|
|
|
def api_base() -> str:
    """Return the API base URL without trailing slashes.

    The ``SELECTIVE_VPN_API`` environment variable takes precedence; when it
    is unset or empty, ``DEFAULT_API`` is used.
    """
    configured = os.environ.get("SELECTIVE_VPN_API")
    base_url = configured if configured else DEFAULT_API
    return str(base_url).rstrip("/")
|
|
|
|
|
|
def api_request(
    method: str,
    path: str,
    *,
    json_body: Optional[Dict[str, Any]] = None,
    params=None,
    timeout: float = 5.0,
) -> Dict[str, Any]:
    """Send one HTTP request to the Selective-VPN API and return a dict.

    Args:
        method: HTTP verb; upper-cased before sending.
        path: Path appended verbatim to the base URL from ``api_base()``.
        json_body: Optional JSON payload.
        params: Optional query parameters, passed through to ``requests``.
        timeout: Request timeout in seconds.

    Returns:
        ``{}`` for an empty response body, the decoded JSON object when the
        body is a JSON object, ``{"raw": value}`` for any other JSON value,
        and ``{"raw": text}`` when the body is not valid JSON.

    Raises:
        RuntimeError: When the request itself fails or the status code is
            outside the 2xx range.
    """
    url = api_base() + path
    try:
        resp = requests.request(
            method.upper(),
            url,
            json=json_body,
            params=params,
            timeout=timeout,
            headers={"Accept": "application/json"},
        )
    except Exception as exc:
        raise RuntimeError(f"api request failed: {method} {url}: {exc}") from exc

    status = resp.status_code
    if status < 200 or status >= 300:
        raise RuntimeError(f"api error: {method} {url} ({status}): {resp.text.strip()}")

    if not resp.content:
        return {}

    try:
        decoded = resp.json()
    except ValueError:
        # Body is not JSON; hand the text back wrapped in a dict.
        return {"raw": resp.text}
    return decoded if isinstance(decoded, dict) else {"raw": decoded}
|
|
|
|
|
|
def list_profiles() -> list[dict]:
    """Fetch the saved app profiles from the API.

    Returns only the dict entries of the response's ``profiles`` field; a
    missing or non-list field yields an empty list.
    """
    response = api_request("GET", "/api/v1/traffic/app-profiles", timeout=4.0)
    entries = response.get("profiles") or []
    if not isinstance(entries, list):
        return []
    return [entry for entry in entries if isinstance(entry, dict)]
|
|
|
|
|
|
def get_profile(profile_id: str) -> dict:
    """Return the saved profile whose ``id`` equals *profile_id*.

    Both sides of the comparison are stringified and stripped.

    Raises:
        ValueError: When *profile_id* is empty or blank.
        RuntimeError: When no profile with that id exists.
    """
    wanted = str(profile_id or "").strip()
    if not wanted:
        raise ValueError("missing profile id")

    found = next(
        (prof for prof in list_profiles() if str(prof.get("id") or "").strip() == wanted),
        None,
    )
    if found is None:
        raise RuntimeError(f"profile not found: {wanted}")
    return found
|
|
|
|
|
|
def infer_app_key(cmdline: str) -> str:
    """Derive the canonical app key from *cmdline* alone (no stored key)."""
    return canonicalize_app_key(app_key="", cmdline=cmdline)
|
|
|
|
|
|
def canonicalize_app_key(app_key: str, cmdline: str) -> str:
    """Reduce a command line (or stored key) to a stable application key.

    The command line is tokenized shell-style. *app_key* is consulted only
    when *cmdline* yields no tokens. Rules, applied to the first token:

    - ``env [OPTS] [NAME=VALUE]... CMD``: the wrapper is peeled off (also
      when nested) and the rules are applied to ``CMD``; ``"env"`` is
      returned when nothing follows the wrapper.
    - ``flatpak ... run ... APPID`` -> ``"flatpak:APPID"`` (or ``"flatpak"``).
    - ``snap ... run ... NAME`` -> ``"snap:NAME"`` (or ``"snap"``).
    - ``gtk-launch ID`` -> ``"desktop:ID"``.
    - A path containing ``/`` -> its basename (the path itself when the
      basename is empty).
    - Anything else -> the first token unchanged.

    Args:
        app_key: Previously stored key; fallback when *cmdline* is empty.
        cmdline: Shell-style command line of the application.

    Returns:
        The canonical key, or ``""`` when neither argument provides a token.
    """
    key = (app_key or "").strip()
    cmd = (cmdline or "").strip()
    try:
        tokens = shlex.split(cmd) if cmd else []
    except ValueError:
        # Malformed quoting: fall back to plain whitespace splitting.
        tokens = cmd.split()
    if not tokens and key:
        tokens = [key]
    tokens = [tok.strip() for tok in tokens if tok.strip()]
    if not tokens:
        return ""

    def run_target(toks: list[str]) -> str:
        """Return the first non-option token after the ``run`` subcommand."""
        try:
            start = toks.index("run") + 1
        except ValueError:
            return ""
        return next((tok for tok in toks[start:] if not tok.startswith("-")), "")

    # Peel `env` wrappers on the token list itself. Re-joining the tokens
    # with spaces and re-parsing them would drop the original quoting, so a
    # wrapped path such as "/opt/My App/run" would be split at the space.
    while os.path.basename(tokens[0]).lower() == "env":
        rest_at = next(
            (
                idx
                for idx in range(1, len(tokens))
                if not tokens[idx].startswith("-") and "=" not in tokens[idx]
            ),
            None,
        )
        if rest_at is None:
            return "env"
        tokens = tokens[rest_at:]

    primary = tokens[0]
    program = os.path.basename(primary).lower()

    if program == "flatpak":
        appid = run_target(tokens)
        return f"flatpak:{appid}" if appid else "flatpak"

    if program == "snap":
        name = run_target(tokens)
        return f"snap:{name}" if name else "snap"

    if program == "gtk-launch" and len(tokens) >= 2:
        desktop_id = tokens[1]
        if not desktop_id.startswith("-"):
            return f"desktop:{desktop_id}"

    if "/" in primary:
        return os.path.basename(primary) or primary

    return primary
|
|
|
|
|
|
def systemctl_user(args: list[str], *, timeout: float = 4.0) -> tuple[int, str]:
    """Run ``systemctl --user`` with *args* and capture its output.

    Returns:
        ``(returncode, output)`` where *output* is stdout followed by stderr,
        stripped. When the command exceeds *timeout* seconds the result is
        ``(124, "timeout: <command line>")``.
    """
    argv = ["systemctl", "--user", *(args or [])]
    try:
        proc = subprocess.run(argv, capture_output=True, text=True, check=False, timeout=timeout)
    except subprocess.TimeoutExpired:
        return 124, f"timeout: {' '.join(argv)}"
    combined = f"{proc.stdout or ''}{proc.stderr or ''}".strip()
    return int(proc.returncode or 0), combined
|
|
|
|
|
|
def cgroup_path_from_pid(pid: int) -> str:
    """Return the cgroup path recorded on the ``0::`` line of ``/proc/<pid>/cgroup``.

    Returns ``""`` for a non-positive *pid*, when the file cannot be read,
    or when no line starts with ``0::``.
    """
    num = int(pid or 0)
    if num <= 0:
        return ""

    marker = "0::"
    try:
        with open(f"/proc/{num}/cgroup", "r", encoding="utf-8", errors="replace") as fh:
            for entry in map(str.strip, fh):
                if entry.startswith(marker):
                    return entry[len(marker):].strip()
    except Exception:
        # The process may be gone or /proc may be unreadable.
        return ""
    return ""
|
|
|
|
|
|
def effective_cgroup_for_unit(unit: str, *, timeout_sec: float = 2.5) -> str:
    """Resolve the cgroup path of a user unit through its main PID.

    Polls ``systemctl --user show -p MainPID --value <unit>`` roughly every
    0.1 s until the unit reports a positive PID whose ``/proc/<pid>/cgroup``
    yields a path, or until the deadline passes.

    Args:
        unit: Unit name; an empty or blank name returns ``""`` immediately.
        timeout_sec: Polling budget in seconds (at least 0.2 s is used).

    Returns:
        The cgroup path, or ``""`` for an empty unit name.

    Raises:
        RuntimeError: When no cgroup path could be resolved in time; the
            message carries the last ``systemctl`` output.
    """
    name = (unit or "").strip()
    if not name:
        return ""

    # A monotonic clock keeps the polling budget unaffected by wall-clock
    # adjustments (NTP steps, manual changes, suspend/resume corrections).
    deadline = time.monotonic() + max(0.2, float(timeout_sec))
    last = ""
    while time.monotonic() < deadline:
        code, out = systemctl_user(["show", "-p", "MainPID", "--value", name], timeout=2.0)
        last = out or ""
        if code == 0:
            try:
                pid = int(last.strip() or "0")
            except ValueError:
                pid = 0
            if pid > 0:
                cg = cgroup_path_from_pid(pid)
                if cg:
                    return cg
        time.sleep(0.1)

    raise RuntimeError(
        f"failed to resolve effective cgroup for unit={name}: {last.strip() or '(no output)'}"
    )
|
|
|
|
|
|
def run_systemd_unit(cmdline: str, *, unit: str) -> str:
    """Start *cmdline* as a transient user unit and return its cgroup path.

    The command is tokenized shell-style and launched through
    ``systemd-run --user --unit <unit> --no-block --collect --same-dir``.

    Raises:
        ValueError: When *cmdline* is empty or tokenizes to nothing.
        RuntimeError: When ``systemd-run`` times out (6 s) or exits non-zero,
            or when the unit's cgroup cannot be resolved afterwards.
    """
    command = (cmdline or "").strip()
    if not command:
        raise ValueError("empty command")
    argv = shlex.split(command)
    if not argv:
        raise ValueError("empty args")

    launcher = [
        "systemd-run",
        "--user",
        "--unit",
        unit,
        "--no-block",
        "--collect",
        "--same-dir",
    ]
    try:
        proc = subprocess.run(
            [*launcher, *argv], capture_output=True, text=True, check=False, timeout=6
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError("systemd-run timed out") from exc

    if proc.returncode != 0:
        output = ((proc.stdout or "") + (proc.stderr or "")).strip()
        raise RuntimeError(f"systemd-run failed: rc={proc.returncode}\n{output}".strip())

    return effective_cgroup_for_unit(unit, timeout_sec=3.0)
|
|
|
|
|
|
def refresh_if_running(*, target: str, app_key: str, command: str, ttl_sec: int) -> bool:
    """Re-apply the routing mark for an already running instance of the app.

    Looks through the API's current app-mark items for one with the same
    target and app key whose unit is reported ``active`` by
    ``systemctl --user is-active``, then re-posts the mark for that unit's
    current cgroup.

    Returns:
        ``True`` when a mark was refreshed; ``False`` when the target is not
        ``vpn``/``direct``, the app key is empty, or no matching active unit
        exists.

    Raises:
        RuntimeError: When the API rejects the refresh, or when an API or
            cgroup lookup fails.
    """
    wanted_target = (target or "").strip().lower()
    wanted_key = (app_key or "").strip()
    if not wanted_key or wanted_target not in ("vpn", "direct"):
        return False

    listing = api_request("GET", "/api/v1/traffic/appmarks/items", timeout=4.0)
    entries = listing.get("items") or []
    if not isinstance(entries, list):
        return False

    def field(entry: dict, name: str) -> str:
        # Normalize a possibly missing or non-string field to stripped text.
        return str(entry.get(name) or "").strip()

    for entry in entries:
        if not isinstance(entry, dict):
            continue
        if field(entry, "target").lower() != wanted_target or field(entry, "app_key") != wanted_key:
            continue
        unit = field(entry, "unit")
        if not unit:
            continue

        code, out = systemctl_user(["is-active", unit], timeout=2.0)
        is_active = code == 0 and (out or "").strip().lower() == "active"
        if not is_active:
            continue

        cgroup = effective_cgroup_for_unit(unit, timeout_sec=2.5)
        body = dict(
            op="add",
            target=wanted_target,
            cgroup=cgroup,
            unit=unit,
            command=command,
            app_key=wanted_key,
            timeout_sec=int(ttl_sec or 0),
        )
        res = api_request("POST", "/api/v1/traffic/appmarks", json_body=body, timeout=4.0)
        if not res.get("ok", False):
            raise RuntimeError(f"appmark refresh failed: {res.get('message')}")
        log(f"refreshed mark: target={wanted_target} app={wanted_key} unit={unit} cgroup_id={res.get('cgroup_id')}")
        return True

    return False
|
|
|
|
|
|
def apply_mark(*, target: str, cgroup: str, unit: str, command: str, app_key: str, ttl_sec: int) -> None:
    """Ask the API to add a routing mark for *cgroup* and log the outcome.

    Posts an ``add`` operation to ``/api/v1/traffic/appmarks``.

    Raises:
        RuntimeError: When the request fails or the response's ``ok`` field
            is missing or falsy.
    """
    body = dict(
        op="add",
        target=target,
        cgroup=cgroup,
        unit=unit,
        command=command,
        app_key=app_key,
        timeout_sec=int(ttl_sec or 0),
    )
    res = api_request("POST", "/api/v1/traffic/appmarks", json_body=body, timeout=4.0)
    if not res.get("ok", False):
        raise RuntimeError(f"appmark failed: {res.get('message')}")
    log(f"mark added: target={target} app={app_key} unit={unit} cgroup_id={res.get('cgroup_id')} ttl={res.get('timeout_sec')}")
|
|
|
|
|
|
def main(argv: list[str]) -> int:
    """Launch (or refresh) the saved profile named by ``--id``.

    Steps: load the profile from the API, normalize target / app key / TTL,
    try to refresh the mark of an already running instance, and otherwise
    start the command in a new transient unit and mark its cgroup. With
    ``--json`` a one-line JSON result is printed on success.

    Returns:
        ``0`` on success; failures propagate as exceptions.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--id", required=True, help="profile id")
    parser.add_argument("--json", action="store_true", help="print machine-readable json result")
    opts = parser.parse_args(argv)

    profile_id = str(opts.id or "").strip()
    profile = get_profile(profile_id)

    command = str(profile.get("command") or "").strip()
    if not command:
        raise RuntimeError("profile command is empty")

    # Anything other than "direct" falls back to the "vpn" target.
    target = str(profile.get("target") or "vpn").strip().lower()
    if target not in ("vpn", "direct"):
        target = "vpn"

    stored_key = str(profile.get("app_key") or "").strip()
    app_key = canonicalize_app_key(stored_key, command) or canonicalize_app_key("", command)

    # A missing or non-positive TTL means "one day".
    ttl = int(profile.get("ttl_sec", 0) or 0)
    ttl = ttl if ttl > 0 else 24 * 60 * 60

    def emit(op: str, **extra: str) -> None:
        # Print the machine-readable result only when --json was given.
        if opts.json:
            result = {"ok": True, "op": op, "id": profile_id, "target": target, "app_key": app_key, **extra}
            print(json.dumps(result))

    # Try refresh first if already running.
    if refresh_if_running(target=target, app_key=app_key, command=command, ttl_sec=ttl):
        emit("refresh")
        return 0

    unit = f"svpn-{target}-{int(time.time())}.service"
    log(f"launching profile id={profile_id} target={target} app={app_key} unit={unit}")
    cgroup = run_systemd_unit(command, unit=unit)
    log(f"ControlGroup: {cgroup}")
    apply_mark(target=target, cgroup=cgroup, unit=unit, command=command, app_key=app_key, ttl_sec=ttl)
    emit("run", unit=unit)
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: exit with main()'s return code, or log the error
    # and exit with status 1 when anything other than Ctrl-C goes wrong.
    try:
        exit_code = main(sys.argv[1:])
    except KeyboardInterrupt:
        raise
    except Exception as exc:
        log(f"ERROR: {exc}")
        raise SystemExit(1)
    else:
        raise SystemExit(exit_code)
|