diff --git a/selective-vpn-gui/svpn_run_profile.py b/selective-vpn-gui/svpn_run_profile.py new file mode 100644 index 0000000..93092af --- /dev/null +++ b/selective-vpn-gui/svpn_run_profile.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 +""" +svpn_run_profile.py + +EN: + Headless launcher for a saved "traffic app profile". + - Runs app via `systemd-run --user` (transient unit) + - Resolves effective cgroupv2 path for that unit + - Calls Selective-VPN API to apply a per-app routing mark (VPN/Direct) + +RU: + Headless-лаунчер для сохраненного "app profile". + - Запускает приложение через `systemd-run --user` (transient unit) + - Получает реальный cgroupv2 путь для юнита + - Дергает Selective-VPN API чтобы применить per-app метку маршрутизации (VPN/Direct) + +This script is used for "desktop shortcuts" and can be run manually. +""" + +from __future__ import annotations + +import argparse +import json +import os +import shlex +import subprocess +import sys +import time +from typing import Any, Dict, Optional + +import requests + + +DEFAULT_API = "http://127.0.0.1:8080" + + +def _log_path() -> str: + base = os.path.join(os.path.expanduser("~"), ".local", "share", "selective-vpn") + os.makedirs(base, exist_ok=True) + return os.path.join(base, "svpn_run_profile.log") + + +def log(msg: str) -> None: + line = (msg or "").rstrip() + if not line: + return + ts = time.strftime("%Y-%m-%d %H:%M:%S") + out = f"[{ts}] {line}" + try: + with open(_log_path(), "a", encoding="utf-8", errors="replace") as f: + f.write(out + "\n") + except Exception: + pass + print(out, flush=True) + + +def api_base() -> str: + return str(os.environ.get("SELECTIVE_VPN_API") or DEFAULT_API).rstrip("/") + + +def api_request(method: str, path: str, *, json_body: Optional[Dict[str, Any]] = None, params=None, timeout: float = 5.0) -> Dict[str, Any]: + url = api_base() + path + try: + r = requests.request(method.upper(), url, json=json_body, params=params, timeout=timeout, headers={"Accept": "application/json"}) + except Exception as e: + raise RuntimeError(f"api request failed: {method} {url}: {e}") from e + + if not (200 <= r.status_code < 300): + raise RuntimeError(f"api error: {method} {url} ({r.status_code}): {r.text.strip()}") + + if not r.content: + return {} + try: + v = r.json() + if isinstance(v, dict): + return v + return {"raw": v} + except ValueError: + return {"raw": r.text} + + +def list_profiles() -> list[dict]: + data = api_request("GET", "/api/v1/traffic/app-profiles", timeout=4.0) + raw = data.get("profiles") or [] + if not isinstance(raw, list): + return [] + out = [] + for it in raw: + if isinstance(it, dict): + out.append(it) + return out + + +def get_profile(profile_id: str) -> dict: + pid = str(profile_id or "").strip() + if not pid: + raise ValueError("missing profile id") + for p in list_profiles(): + if str(p.get("id") or "").strip() == pid: + return p + raise RuntimeError(f"profile not found: {pid}") + + +def infer_app_key(cmdline: str) -> str: + cmd = (cmdline or "").strip() + if not cmd: + return "" + try: + args = shlex.split(cmd) + if args: + return str(args[0] or "").strip() + except Exception: + pass + return (cmd.split() or [""])[0].strip() + + +def systemctl_user(args: list[str], *, timeout: float = 4.0) -> tuple[int, str]: + cmd = ["systemctl", "--user"] + list(args or []) + try: + p = subprocess.run(cmd, capture_output=True, text=True, check=False, timeout=timeout) + except subprocess.TimeoutExpired as e: + return 124, f"timeout: {' '.join(cmd)}" + out = ((p.stdout or "") + (p.stderr or "")).strip() + return int(p.returncode or 0), out + + +def cgroup_path_from_pid(pid: int) -> str: + p = int(pid or 0) + if p <= 0: + return "" + try: + with open(f"/proc/{p}/cgroup", "r", encoding="utf-8", errors="replace") as f: + for raw in f: + line = (raw or "").strip() + if line.startswith("0::"): + cg = line[len("0::"):].strip() + return cg + except Exception: + return "" + return "" + + +def effective_cgroup_for_unit(unit: str, *, timeout_sec: float = 2.5) -> str: + u = (unit or "").strip() + if not u: + return "" + deadline = time.time() + max(0.2, float(timeout_sec)) + last = "" + while time.time() < deadline: + code, out = systemctl_user(["show", "-p", "MainPID", "--value", u], timeout=2.0) + last = out or "" + if code == 0: + try: + pid = int((out or "").strip() or "0") + except Exception: + pid = 0 + if pid > 0: + cg = cgroup_path_from_pid(pid) + if cg: + return cg + time.sleep(0.1) + raise RuntimeError(f"failed to resolve effective cgroup for unit={u}: {last.strip() or '(no output)'}") + + +def run_systemd_unit(cmdline: str, *, unit: str) -> str: + cmd = (cmdline or "").strip() + if not cmd: + raise ValueError("empty command") + args = shlex.split(cmd) + if not args: + raise ValueError("empty args") + + run_cmd = [ + "systemd-run", + "--user", + "--unit", + unit, + "--collect", + "--same-dir", + ] + args + + try: + p = subprocess.run(run_cmd, capture_output=True, text=True, check=False, timeout=8) + except subprocess.TimeoutExpired as e: + raise RuntimeError("systemd-run timed out") from e + out = ((p.stdout or "") + (p.stderr or "")).strip() + if p.returncode != 0: + raise RuntimeError(f"systemd-run failed: rc={p.returncode}\n{out}".strip()) + + cg = effective_cgroup_for_unit(unit, timeout_sec=3.0) + return cg + + +def refresh_if_running(*, target: str, app_key: str, command: str, ttl_sec: int) -> bool: + tgt = (target or "").strip().lower() + key = (app_key or "").strip() + if tgt not in ("vpn", "direct") or not key: + return False + + data = api_request("GET", "/api/v1/traffic/appmarks/items", timeout=4.0) + items = data.get("items") or [] + if not isinstance(items, list): + return False + + for it in items: + if not isinstance(it, dict): + continue + if str(it.get("target") or "").strip().lower() != tgt: + continue + if str(it.get("app_key") or "").strip() != key: + continue + unit = str(it.get("unit") or "").strip() + if not unit: + continue + code, out = systemctl_user(["is-active", unit], timeout=2.0) + if code != 0 or (out or "").strip().lower() != "active": + continue + + cg = effective_cgroup_for_unit(unit, timeout_sec=2.5) + payload = { + "op": "add", + "target": tgt, + "cgroup": cg, + "unit": unit, + "command": command, + "app_key": key, + "timeout_sec": int(ttl_sec or 0), + } + res = api_request("POST", "/api/v1/traffic/appmarks", json_body=payload, timeout=4.0) + if not bool(res.get("ok", False)): + raise RuntimeError(f"appmark refresh failed: {res.get('message')}") + log(f"refreshed mark: target={tgt} app={key} unit={unit} cgroup_id={res.get('cgroup_id')}") + return True + + return False + + +def apply_mark(*, target: str, cgroup: str, unit: str, command: str, app_key: str, ttl_sec: int) -> None: + payload = { + "op": "add", + "target": target, + "cgroup": cgroup, + "unit": unit, + "command": command, + "app_key": app_key, + "timeout_sec": int(ttl_sec or 0), + } + res = api_request("POST", "/api/v1/traffic/appmarks", json_body=payload, timeout=4.0) + if not bool(res.get("ok", False)): + raise RuntimeError(f"appmark failed: {res.get('message')}") + log(f"mark added: target={target} app={app_key} unit={unit} cgroup_id={res.get('cgroup_id')} ttl={res.get('timeout_sec')}") + + +def main(argv: list[str]) -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--id", required=True, help="profile id") + ap.add_argument("--json", action="store_true", help="print machine-readable json result") + args = ap.parse_args(argv) + + pid = str(args.id or "").strip() + prof = get_profile(pid) + + cmd = str(prof.get("command") or "").strip() + if not cmd: + raise RuntimeError("profile command is empty") + target = str(prof.get("target") or "vpn").strip().lower() + if target not in ("vpn", "direct"): + target = "vpn" + + app_key = str(prof.get("app_key") or "").strip() or infer_app_key(cmd) + ttl = int(prof.get("ttl_sec", 0) or 0) + if ttl <= 0: + ttl = 24 * 60 * 60 + + # Try refresh first if already running. + if refresh_if_running(target=target, app_key=app_key, command=cmd, ttl_sec=ttl): + if args.json: + print(json.dumps({"ok": True, "op": "refresh", "id": pid, "target": target, "app_key": app_key})) + return 0 + + unit = f"svpn-{target}-{int(time.time())}.service" + log(f"launching profile id={pid} target={target} app={app_key} unit={unit}") + cg = run_systemd_unit(cmd, unit=unit) + log(f"ControlGroup: {cg}") + apply_mark(target=target, cgroup=cg, unit=unit, command=cmd, app_key=app_key, ttl_sec=ttl) + if args.json: + print(json.dumps({"ok": True, "op": "run", "id": pid, "target": target, "app_key": app_key, "unit": unit})) + return 0 + + +if __name__ == "__main__": + try: + raise SystemExit(main(sys.argv[1:])) + except KeyboardInterrupt: + raise + except Exception as e: + log(f"ERROR: {e}") + raise SystemExit(1) + diff --git a/selective-vpn-gui/traffic_mode_dialog.py b/selective-vpn-gui/traffic_mode_dialog.py index 6e79f20..7d72616 100644 --- a/selective-vpn-gui/traffic_mode_dialog.py +++ b/selective-vpn-gui/traffic_mode_dialog.py @@ -285,6 +285,27 @@ RU: Восстанавливает маршруты/nft из последнег row_prof_btn.addStretch(1) profiles_layout.addLayout(row_prof_btn) + row_prof_shortcuts = QHBoxLayout() + self.btn_app_profile_shortcut_create = QPushButton("Create shortcut") + self.btn_app_profile_shortcut_create.setToolTip( + "EN: Create/overwrite a .desktop shortcut for the selected profile.\n" + "EN: The shortcut will launch the app and apply routing mark automatically.\n" + "RU: Создать/перезаписать .desktop ярлык для выбранного профиля.\n" + "RU: Ярлык запускает приложение и автоматически применяет routing mark." + ) + self.btn_app_profile_shortcut_create.clicked.connect(self.on_app_profile_shortcut_create) + row_prof_shortcuts.addWidget(self.btn_app_profile_shortcut_create) + + self.btn_app_profile_shortcut_remove = QPushButton("Remove shortcut") + self.btn_app_profile_shortcut_remove.setToolTip( + "EN: Remove the .desktop shortcut for the selected profile.\n" + "RU: Удалить .desktop ярлык для выбранного профиля." + ) + self.btn_app_profile_shortcut_remove.clicked.connect(self.on_app_profile_shortcut_remove) + row_prof_shortcuts.addWidget(self.btn_app_profile_shortcut_remove) + row_prof_shortcuts.addStretch(1) + profiles_layout.addLayout(row_prof_shortcuts) + self.lst_app_profiles = QListWidget() self.lst_app_profiles.setSelectionMode(QAbstractItemView.SingleSelection) self.lst_app_profiles.setToolTip( @@ -294,6 +315,9 @@ RU: Восстанавливает маршруты/nft из последнег self.lst_app_profiles.itemDoubleClicked.connect( lambda _it: self.on_app_profile_load() ) + self.lst_app_profiles.currentItemChanged.connect( + lambda _cur, _prev: self._update_profile_shortcut_ui() + ) self.lst_app_profiles.setFixedHeight(140) profiles_layout.addWidget(self.lst_app_profiles) @@ -301,6 +325,11 @@ RU: Восстанавливает маршруты/nft из последнег self.lbl_app_profiles.setStyleSheet("color: gray;") profiles_layout.addWidget(self.lbl_app_profiles) + self.lbl_profile_shortcut = QLabel("Shortcut: —") + self.lbl_profile_shortcut.setWordWrap(True) + self.lbl_profile_shortcut.setStyleSheet("color: gray;") + profiles_layout.addWidget(self.lbl_profile_shortcut) + tab_profiles = QWidget() tab_profiles_layout = QVBoxLayout(tab_profiles) tab_profiles_layout.addWidget(profiles_group) @@ -1138,6 +1167,70 @@ RU: Применяет policy-rules и проверяет health. При оши return None return it.data(QtCore.Qt.UserRole) + def _profile_shortcuts_dir(self) -> str: + # ~/.local/share/applications is the standard per-user location. + return os.path.join(os.path.expanduser("~"), ".local", "share", "applications") + + def _profile_shortcut_path(self, profile_id: str) -> str: + pid = (profile_id or "").strip() + if not pid: + return "" + safe = re.sub(r"[^A-Za-z0-9._-]+", "-", pid).strip("-") + if not safe: + safe = "profile" + return os.path.join(self._profile_shortcuts_dir(), f"svpn-profile-{safe}.desktop") + + def _profile_shortcut_exists(self, profile_id: str) -> bool: + p = self._profile_shortcut_path(profile_id) + return bool(p) and os.path.isfile(p) + + def _render_profile_shortcut(self, prof) -> str: + pid = (getattr(prof, "id", "") or "").strip() + name = (getattr(prof, "name", "") or "").strip() or pid or "SVPN profile" + target = (getattr(prof, "target", "") or "").strip().lower() + if target not in ("vpn", "direct"): + target = "vpn" + + label_target = "VPN" if target == "vpn" else "Direct" + + script = os.path.abspath(os.path.join(os.path.dirname(__file__), "svpn_run_profile.py")) + # Use env python3 so the shortcut works even if python3 is not /usr/bin/python3. + exec_line = f"/usr/bin/env python3 {script} --id {pid}" + + # Keep .desktop content ASCII-ish. Values are UTF-8-safe by spec, but avoid surprises. + name_safe = (name or "SVPN profile").replace("\n", " ").replace("\r", " ").strip() + + return ( + "[Desktop Entry]\n" + "Version=1.0\n" + "Type=Application\n" + f"Name=SVPN: {name_safe} [{label_target}]\n" + f"Comment=Selective VPN: run traffic profile id={pid} target={target}\n" + f"Exec={exec_line}\n" + "Terminal=false\n" + "Categories=Network;\n" + f"X-SVPN-ProfileID={pid}\n" + f"X-SVPN-Target={target}\n" + ) + + def _update_profile_shortcut_ui(self) -> None: + prof = self._selected_app_profile() + if prof is None: + self.btn_app_profile_shortcut_create.setEnabled(False) + self.btn_app_profile_shortcut_remove.setEnabled(False) + self.lbl_profile_shortcut.setText("Shortcut: —") + return + + pid = (getattr(prof, "id", "") or "").strip() + path = self._profile_shortcut_path(pid) + installed = self._profile_shortcut_exists(pid) + + self.btn_app_profile_shortcut_create.setEnabled(True) + self.btn_app_profile_shortcut_remove.setEnabled(installed) + + state = "installed" if installed else "not installed" + self.lbl_profile_shortcut.setText(f"Shortcut: {state} ({path})") + def refresh_app_profiles(self, quiet: bool = False) -> None: def work() -> None: profs = list(self.ctrl.traffic_app_profiles_list() or []) @@ -1156,12 +1249,16 @@ RU: Применяет policy-rules и проверяет health. При оши if target in ("vpn", "direct"): label += f" [{target}]" it = QListWidgetItem(label) + sc_path = self._profile_shortcut_path(pid) + sc_state = "yes" if (sc_path and os.path.isfile(sc_path)) else "no" it.setToolTip( ( f"id: {pid}\n" f"app_key: {app_key}\n" f"target: {target}\n" f"ttl: {ttl_sec}s\n\n" + f"shortcut: {sc_state}\n" + f"shortcut_path: {sc_path}\n\n" f"{cmd}" ).strip() ) @@ -1169,6 +1266,7 @@ RU: Применяет policy-rules и проверяет health. При оши self.lst_app_profiles.addItem(it) self.lbl_app_profiles.setText(f"Saved profiles: {len(profs)}") + self._update_profile_shortcut_ui() if quiet: try: @@ -1208,6 +1306,18 @@ RU: Применяет policy-rules и проверяет health. При оши self._append_app_log(f"[profile] saved: id={pid or '-'} target={target} app={app_key or '-'}") self.refresh_app_profiles(quiet=True) + # If shortcut exists already, rewrite it to reflect updated name/target. + if prof is not None and pid and self._profile_shortcut_exists(pid): + try: + sc_path = self._profile_shortcut_path(pid) + os.makedirs(os.path.dirname(sc_path), exist_ok=True) + with open(sc_path, "w", encoding="utf-8", errors="replace") as f: + f.write(self._render_profile_shortcut(prof)) + self._append_app_log(f"[shortcut] updated: {sc_path}") + except Exception as e: + # Non-fatal: profile save is more important than shortcut rewrite. + self._append_app_log(f"[shortcut] update failed: {e}") + # Best-effort select newly saved profile. if pid: for i in range(self.lst_app_profiles.count()): @@ -1221,6 +1331,55 @@ RU: Применяет policy-rules и проверяет health. При оши self._safe(work, title="Save profile error") + def on_app_profile_shortcut_create(self) -> None: + prof = self._selected_app_profile() + if prof is None: + return + + pid = (getattr(prof, "id", "") or "").strip() + if not pid: + return + + def work() -> None: + path = self._profile_shortcut_path(pid) + if not path: + raise RuntimeError("cannot derive shortcut path") + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w", encoding="utf-8", errors="replace") as f: + f.write(self._render_profile_shortcut(prof)) + self._append_app_log(f"[shortcut] saved: {path} id={pid}") + self._set_action_status(f"Shortcut saved: {os.path.basename(path)}", ok=True) + self._update_profile_shortcut_ui() + + self._safe(work, title="Create shortcut error") + + def on_app_profile_shortcut_remove(self) -> None: + prof = self._selected_app_profile() + if prof is None: + return + + pid = (getattr(prof, "id", "") or "").strip() + if not pid: + return + + def work() -> None: + path = self._profile_shortcut_path(pid) + if not path: + raise RuntimeError("cannot derive shortcut path") + if not os.path.exists(path): + self._set_action_status("Shortcut not installed", ok=True) + self._update_profile_shortcut_ui() + return + try: + os.remove(path) + except FileNotFoundError: + pass + self._append_app_log(f"[shortcut] removed: {path} id={pid}") + self._set_action_status("Shortcut removed", ok=True) + self._update_profile_shortcut_ui() + + self._safe(work, title="Remove shortcut error") + def on_app_profile_load(self) -> None: prof = self._selected_app_profile() if prof is None: @@ -1290,6 +1449,15 @@ RU: Применяет policy-rules и проверяет health. При оши self._set_action_status(f"Delete profile failed: {res.message}", ok=False) raise RuntimeError(res.message or "delete failed") + # Keep a tight coupling: deleting profile removes its shortcut too. + sc_path = self._profile_shortcut_path(pid) + if sc_path and os.path.exists(sc_path): + try: + os.remove(sc_path) + self._append_app_log(f"[shortcut] removed: {sc_path} (profile deleted)") + except Exception as e: + self._append_app_log(f"[shortcut] remove failed: {e}") + self._set_action_status(f"Profile deleted: {pid}", ok=True) self._append_app_log(f"[profile] deleted: id={pid}") self.refresh_app_profiles(quiet=True)