ui: app profile desktop shortcuts
This commit is contained in:
299
selective-vpn-gui/svpn_run_profile.py
Normal file
299
selective-vpn-gui/svpn_run_profile.py
Normal file
@@ -0,0 +1,299 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
svpn_run_profile.py
|
||||||
|
|
||||||
|
EN:
|
||||||
|
Headless launcher for a saved "traffic app profile".
|
||||||
|
- Runs app via `systemd-run --user` (transient unit)
|
||||||
|
- Resolves effective cgroupv2 path for that unit
|
||||||
|
- Calls Selective-VPN API to apply a per-app routing mark (VPN/Direct)
|
||||||
|
|
||||||
|
RU:
|
||||||
|
Headless-лаунчер для сохраненного "app profile".
|
||||||
|
- Запускает приложение через `systemd-run --user` (transient unit)
|
||||||
|
- Получает реальный cgroupv2 путь для юнита
|
||||||
|
- Дергает Selective-VPN API чтобы применить per-app метку маршрутизации (VPN/Direct)
|
||||||
|
|
||||||
|
This script is used for "desktop shortcuts" and can be run manually.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import shlex
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
|
||||||
|
DEFAULT_API = "http://127.0.0.1:8080"
|
||||||
|
|
||||||
|
|
||||||
|
def _log_path() -> str:
|
||||||
|
base = os.path.join(os.path.expanduser("~"), ".local", "share", "selective-vpn")
|
||||||
|
os.makedirs(base, exist_ok=True)
|
||||||
|
return os.path.join(base, "svpn_run_profile.log")
|
||||||
|
|
||||||
|
|
||||||
|
def log(msg: str) -> None:
|
||||||
|
line = (msg or "").rstrip()
|
||||||
|
if not line:
|
||||||
|
return
|
||||||
|
ts = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
out = f"[{ts}] {line}"
|
||||||
|
try:
|
||||||
|
with open(_log_path(), "a", encoding="utf-8", errors="replace") as f:
|
||||||
|
f.write(out + "\n")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
print(out, flush=True)
|
||||||
|
|
||||||
|
|
||||||
|
def api_base() -> str:
|
||||||
|
return str(os.environ.get("SELECTIVE_VPN_API") or DEFAULT_API).rstrip("/")
|
||||||
|
|
||||||
|
|
||||||
|
def api_request(method: str, path: str, *, json_body: Optional[Dict[str, Any]] = None, params=None, timeout: float = 5.0) -> Dict[str, Any]:
|
||||||
|
url = api_base() + path
|
||||||
|
try:
|
||||||
|
r = requests.request(method.upper(), url, json=json_body, params=params, timeout=timeout, headers={"Accept": "application/json"})
|
||||||
|
except Exception as e:
|
||||||
|
raise RuntimeError(f"api request failed: {method} {url}: {e}") from e
|
||||||
|
|
||||||
|
if not (200 <= r.status_code < 300):
|
||||||
|
raise RuntimeError(f"api error: {method} {url} ({r.status_code}): {r.text.strip()}")
|
||||||
|
|
||||||
|
if not r.content:
|
||||||
|
return {}
|
||||||
|
try:
|
||||||
|
v = r.json()
|
||||||
|
if isinstance(v, dict):
|
||||||
|
return v
|
||||||
|
return {"raw": v}
|
||||||
|
except ValueError:
|
||||||
|
return {"raw": r.text}
|
||||||
|
|
||||||
|
|
||||||
|
def list_profiles() -> list[dict]:
|
||||||
|
data = api_request("GET", "/api/v1/traffic/app-profiles", timeout=4.0)
|
||||||
|
raw = data.get("profiles") or []
|
||||||
|
if not isinstance(raw, list):
|
||||||
|
return []
|
||||||
|
out = []
|
||||||
|
for it in raw:
|
||||||
|
if isinstance(it, dict):
|
||||||
|
out.append(it)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def get_profile(profile_id: str) -> dict:
|
||||||
|
pid = str(profile_id or "").strip()
|
||||||
|
if not pid:
|
||||||
|
raise ValueError("missing profile id")
|
||||||
|
for p in list_profiles():
|
||||||
|
if str(p.get("id") or "").strip() == pid:
|
||||||
|
return p
|
||||||
|
raise RuntimeError(f"profile not found: {pid}")
|
||||||
|
|
||||||
|
|
||||||
|
def infer_app_key(cmdline: str) -> str:
|
||||||
|
cmd = (cmdline or "").strip()
|
||||||
|
if not cmd:
|
||||||
|
return ""
|
||||||
|
try:
|
||||||
|
args = shlex.split(cmd)
|
||||||
|
if args:
|
||||||
|
return str(args[0] or "").strip()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return (cmd.split() or [""])[0].strip()
|
||||||
|
|
||||||
|
|
||||||
|
def systemctl_user(args: list[str], *, timeout: float = 4.0) -> tuple[int, str]:
|
||||||
|
cmd = ["systemctl", "--user"] + list(args or [])
|
||||||
|
try:
|
||||||
|
p = subprocess.run(cmd, capture_output=True, text=True, check=False, timeout=timeout)
|
||||||
|
except subprocess.TimeoutExpired as e:
|
||||||
|
return 124, f"timeout: {' '.join(cmd)}"
|
||||||
|
out = ((p.stdout or "") + (p.stderr or "")).strip()
|
||||||
|
return int(p.returncode or 0), out
|
||||||
|
|
||||||
|
|
||||||
|
def cgroup_path_from_pid(pid: int) -> str:
|
||||||
|
p = int(pid or 0)
|
||||||
|
if p <= 0:
|
||||||
|
return ""
|
||||||
|
try:
|
||||||
|
with open(f"/proc/{p}/cgroup", "r", encoding="utf-8", errors="replace") as f:
|
||||||
|
for raw in f:
|
||||||
|
line = (raw or "").strip()
|
||||||
|
if line.startswith("0::"):
|
||||||
|
cg = line[len("0::"):].strip()
|
||||||
|
return cg
|
||||||
|
except Exception:
|
||||||
|
return ""
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def effective_cgroup_for_unit(unit: str, *, timeout_sec: float = 2.5) -> str:
|
||||||
|
u = (unit or "").strip()
|
||||||
|
if not u:
|
||||||
|
return ""
|
||||||
|
deadline = time.time() + max(0.2, float(timeout_sec))
|
||||||
|
last = ""
|
||||||
|
while time.time() < deadline:
|
||||||
|
code, out = systemctl_user(["show", "-p", "MainPID", "--value", u], timeout=2.0)
|
||||||
|
last = out or ""
|
||||||
|
if code == 0:
|
||||||
|
try:
|
||||||
|
pid = int((out or "").strip() or "0")
|
||||||
|
except Exception:
|
||||||
|
pid = 0
|
||||||
|
if pid > 0:
|
||||||
|
cg = cgroup_path_from_pid(pid)
|
||||||
|
if cg:
|
||||||
|
return cg
|
||||||
|
time.sleep(0.1)
|
||||||
|
raise RuntimeError(f"failed to resolve effective cgroup for unit={u}: {last.strip() or '(no output)'}")
|
||||||
|
|
||||||
|
|
||||||
|
def run_systemd_unit(cmdline: str, *, unit: str) -> str:
|
||||||
|
cmd = (cmdline or "").strip()
|
||||||
|
if not cmd:
|
||||||
|
raise ValueError("empty command")
|
||||||
|
args = shlex.split(cmd)
|
||||||
|
if not args:
|
||||||
|
raise ValueError("empty args")
|
||||||
|
|
||||||
|
run_cmd = [
|
||||||
|
"systemd-run",
|
||||||
|
"--user",
|
||||||
|
"--unit",
|
||||||
|
unit,
|
||||||
|
"--collect",
|
||||||
|
"--same-dir",
|
||||||
|
] + args
|
||||||
|
|
||||||
|
try:
|
||||||
|
p = subprocess.run(run_cmd, capture_output=True, text=True, check=False, timeout=8)
|
||||||
|
except subprocess.TimeoutExpired as e:
|
||||||
|
raise RuntimeError("systemd-run timed out") from e
|
||||||
|
out = ((p.stdout or "") + (p.stderr or "")).strip()
|
||||||
|
if p.returncode != 0:
|
||||||
|
raise RuntimeError(f"systemd-run failed: rc={p.returncode}\n{out}".strip())
|
||||||
|
|
||||||
|
cg = effective_cgroup_for_unit(unit, timeout_sec=3.0)
|
||||||
|
return cg
|
||||||
|
|
||||||
|
|
||||||
|
def refresh_if_running(*, target: str, app_key: str, command: str, ttl_sec: int) -> bool:
|
||||||
|
tgt = (target or "").strip().lower()
|
||||||
|
key = (app_key or "").strip()
|
||||||
|
if tgt not in ("vpn", "direct") or not key:
|
||||||
|
return False
|
||||||
|
|
||||||
|
data = api_request("GET", "/api/v1/traffic/appmarks/items", timeout=4.0)
|
||||||
|
items = data.get("items") or []
|
||||||
|
if not isinstance(items, list):
|
||||||
|
return False
|
||||||
|
|
||||||
|
for it in items:
|
||||||
|
if not isinstance(it, dict):
|
||||||
|
continue
|
||||||
|
if str(it.get("target") or "").strip().lower() != tgt:
|
||||||
|
continue
|
||||||
|
if str(it.get("app_key") or "").strip() != key:
|
||||||
|
continue
|
||||||
|
unit = str(it.get("unit") or "").strip()
|
||||||
|
if not unit:
|
||||||
|
continue
|
||||||
|
code, out = systemctl_user(["is-active", unit], timeout=2.0)
|
||||||
|
if code != 0 or (out or "").strip().lower() != "active":
|
||||||
|
continue
|
||||||
|
|
||||||
|
cg = effective_cgroup_for_unit(unit, timeout_sec=2.5)
|
||||||
|
payload = {
|
||||||
|
"op": "add",
|
||||||
|
"target": tgt,
|
||||||
|
"cgroup": cg,
|
||||||
|
"unit": unit,
|
||||||
|
"command": command,
|
||||||
|
"app_key": key,
|
||||||
|
"timeout_sec": int(ttl_sec or 0),
|
||||||
|
}
|
||||||
|
res = api_request("POST", "/api/v1/traffic/appmarks", json_body=payload, timeout=4.0)
|
||||||
|
if not bool(res.get("ok", False)):
|
||||||
|
raise RuntimeError(f"appmark refresh failed: {res.get('message')}")
|
||||||
|
log(f"refreshed mark: target={tgt} app={key} unit={unit} cgroup_id={res.get('cgroup_id')}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def apply_mark(*, target: str, cgroup: str, unit: str, command: str, app_key: str, ttl_sec: int) -> None:
|
||||||
|
payload = {
|
||||||
|
"op": "add",
|
||||||
|
"target": target,
|
||||||
|
"cgroup": cgroup,
|
||||||
|
"unit": unit,
|
||||||
|
"command": command,
|
||||||
|
"app_key": app_key,
|
||||||
|
"timeout_sec": int(ttl_sec or 0),
|
||||||
|
}
|
||||||
|
res = api_request("POST", "/api/v1/traffic/appmarks", json_body=payload, timeout=4.0)
|
||||||
|
if not bool(res.get("ok", False)):
|
||||||
|
raise RuntimeError(f"appmark failed: {res.get('message')}")
|
||||||
|
log(f"mark added: target={target} app={app_key} unit={unit} cgroup_id={res.get('cgroup_id')} ttl={res.get('timeout_sec')}")
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv: list[str]) -> int:
|
||||||
|
ap = argparse.ArgumentParser()
|
||||||
|
ap.add_argument("--id", required=True, help="profile id")
|
||||||
|
ap.add_argument("--json", action="store_true", help="print machine-readable json result")
|
||||||
|
args = ap.parse_args(argv)
|
||||||
|
|
||||||
|
pid = str(args.id or "").strip()
|
||||||
|
prof = get_profile(pid)
|
||||||
|
|
||||||
|
cmd = str(prof.get("command") or "").strip()
|
||||||
|
if not cmd:
|
||||||
|
raise RuntimeError("profile command is empty")
|
||||||
|
target = str(prof.get("target") or "vpn").strip().lower()
|
||||||
|
if target not in ("vpn", "direct"):
|
||||||
|
target = "vpn"
|
||||||
|
|
||||||
|
app_key = str(prof.get("app_key") or "").strip() or infer_app_key(cmd)
|
||||||
|
ttl = int(prof.get("ttl_sec", 0) or 0)
|
||||||
|
if ttl <= 0:
|
||||||
|
ttl = 24 * 60 * 60
|
||||||
|
|
||||||
|
# Try refresh first if already running.
|
||||||
|
if refresh_if_running(target=target, app_key=app_key, command=cmd, ttl_sec=ttl):
|
||||||
|
if args.json:
|
||||||
|
print(json.dumps({"ok": True, "op": "refresh", "id": pid, "target": target, "app_key": app_key}))
|
||||||
|
return 0
|
||||||
|
|
||||||
|
unit = f"svpn-{target}-{int(time.time())}.service"
|
||||||
|
log(f"launching profile id={pid} target={target} app={app_key} unit={unit}")
|
||||||
|
cg = run_systemd_unit(cmd, unit=unit)
|
||||||
|
log(f"ControlGroup: {cg}")
|
||||||
|
apply_mark(target=target, cgroup=cg, unit=unit, command=cmd, app_key=app_key, ttl_sec=ttl)
|
||||||
|
if args.json:
|
||||||
|
print(json.dumps({"ok": True, "op": "run", "id": pid, "target": target, "app_key": app_key, "unit": unit}))
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
raise SystemExit(main(sys.argv[1:]))
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
log(f"ERROR: {e}")
|
||||||
|
raise SystemExit(1)
|
||||||
|
|
||||||
@@ -285,6 +285,27 @@ RU: Восстанавливает маршруты/nft из последнег
|
|||||||
row_prof_btn.addStretch(1)
|
row_prof_btn.addStretch(1)
|
||||||
profiles_layout.addLayout(row_prof_btn)
|
profiles_layout.addLayout(row_prof_btn)
|
||||||
|
|
||||||
|
row_prof_shortcuts = QHBoxLayout()
|
||||||
|
self.btn_app_profile_shortcut_create = QPushButton("Create shortcut")
|
||||||
|
self.btn_app_profile_shortcut_create.setToolTip(
|
||||||
|
"EN: Create/overwrite a .desktop shortcut for the selected profile.\n"
|
||||||
|
"EN: The shortcut will launch the app and apply routing mark automatically.\n"
|
||||||
|
"RU: Создать/перезаписать .desktop ярлык для выбранного профиля.\n"
|
||||||
|
"RU: Ярлык запускает приложение и автоматически применяет routing mark."
|
||||||
|
)
|
||||||
|
self.btn_app_profile_shortcut_create.clicked.connect(self.on_app_profile_shortcut_create)
|
||||||
|
row_prof_shortcuts.addWidget(self.btn_app_profile_shortcut_create)
|
||||||
|
|
||||||
|
self.btn_app_profile_shortcut_remove = QPushButton("Remove shortcut")
|
||||||
|
self.btn_app_profile_shortcut_remove.setToolTip(
|
||||||
|
"EN: Remove the .desktop shortcut for the selected profile.\n"
|
||||||
|
"RU: Удалить .desktop ярлык для выбранного профиля."
|
||||||
|
)
|
||||||
|
self.btn_app_profile_shortcut_remove.clicked.connect(self.on_app_profile_shortcut_remove)
|
||||||
|
row_prof_shortcuts.addWidget(self.btn_app_profile_shortcut_remove)
|
||||||
|
row_prof_shortcuts.addStretch(1)
|
||||||
|
profiles_layout.addLayout(row_prof_shortcuts)
|
||||||
|
|
||||||
self.lst_app_profiles = QListWidget()
|
self.lst_app_profiles = QListWidget()
|
||||||
self.lst_app_profiles.setSelectionMode(QAbstractItemView.SingleSelection)
|
self.lst_app_profiles.setSelectionMode(QAbstractItemView.SingleSelection)
|
||||||
self.lst_app_profiles.setToolTip(
|
self.lst_app_profiles.setToolTip(
|
||||||
@@ -294,6 +315,9 @@ RU: Восстанавливает маршруты/nft из последнег
|
|||||||
self.lst_app_profiles.itemDoubleClicked.connect(
|
self.lst_app_profiles.itemDoubleClicked.connect(
|
||||||
lambda _it: self.on_app_profile_load()
|
lambda _it: self.on_app_profile_load()
|
||||||
)
|
)
|
||||||
|
self.lst_app_profiles.currentItemChanged.connect(
|
||||||
|
lambda _cur, _prev: self._update_profile_shortcut_ui()
|
||||||
|
)
|
||||||
self.lst_app_profiles.setFixedHeight(140)
|
self.lst_app_profiles.setFixedHeight(140)
|
||||||
profiles_layout.addWidget(self.lst_app_profiles)
|
profiles_layout.addWidget(self.lst_app_profiles)
|
||||||
|
|
||||||
@@ -301,6 +325,11 @@ RU: Восстанавливает маршруты/nft из последнег
|
|||||||
self.lbl_app_profiles.setStyleSheet("color: gray;")
|
self.lbl_app_profiles.setStyleSheet("color: gray;")
|
||||||
profiles_layout.addWidget(self.lbl_app_profiles)
|
profiles_layout.addWidget(self.lbl_app_profiles)
|
||||||
|
|
||||||
|
self.lbl_profile_shortcut = QLabel("Shortcut: —")
|
||||||
|
self.lbl_profile_shortcut.setWordWrap(True)
|
||||||
|
self.lbl_profile_shortcut.setStyleSheet("color: gray;")
|
||||||
|
profiles_layout.addWidget(self.lbl_profile_shortcut)
|
||||||
|
|
||||||
tab_profiles = QWidget()
|
tab_profiles = QWidget()
|
||||||
tab_profiles_layout = QVBoxLayout(tab_profiles)
|
tab_profiles_layout = QVBoxLayout(tab_profiles)
|
||||||
tab_profiles_layout.addWidget(profiles_group)
|
tab_profiles_layout.addWidget(profiles_group)
|
||||||
@@ -1138,6 +1167,70 @@ RU: Применяет policy-rules и проверяет health. При оши
|
|||||||
return None
|
return None
|
||||||
return it.data(QtCore.Qt.UserRole)
|
return it.data(QtCore.Qt.UserRole)
|
||||||
|
|
||||||
|
def _profile_shortcuts_dir(self) -> str:
|
||||||
|
# ~/.local/share/applications is the standard per-user location.
|
||||||
|
return os.path.join(os.path.expanduser("~"), ".local", "share", "applications")
|
||||||
|
|
||||||
|
def _profile_shortcut_path(self, profile_id: str) -> str:
|
||||||
|
pid = (profile_id or "").strip()
|
||||||
|
if not pid:
|
||||||
|
return ""
|
||||||
|
safe = re.sub(r"[^A-Za-z0-9._-]+", "-", pid).strip("-")
|
||||||
|
if not safe:
|
||||||
|
safe = "profile"
|
||||||
|
return os.path.join(self._profile_shortcuts_dir(), f"svpn-profile-{safe}.desktop")
|
||||||
|
|
||||||
|
def _profile_shortcut_exists(self, profile_id: str) -> bool:
|
||||||
|
p = self._profile_shortcut_path(profile_id)
|
||||||
|
return bool(p) and os.path.isfile(p)
|
||||||
|
|
||||||
|
def _render_profile_shortcut(self, prof) -> str:
|
||||||
|
pid = (getattr(prof, "id", "") or "").strip()
|
||||||
|
name = (getattr(prof, "name", "") or "").strip() or pid or "SVPN profile"
|
||||||
|
target = (getattr(prof, "target", "") or "").strip().lower()
|
||||||
|
if target not in ("vpn", "direct"):
|
||||||
|
target = "vpn"
|
||||||
|
|
||||||
|
label_target = "VPN" if target == "vpn" else "Direct"
|
||||||
|
|
||||||
|
script = os.path.abspath(os.path.join(os.path.dirname(__file__), "svpn_run_profile.py"))
|
||||||
|
# Use env python3 so the shortcut works even if python3 is not /usr/bin/python3.
|
||||||
|
exec_line = f"/usr/bin/env python3 {script} --id {pid}"
|
||||||
|
|
||||||
|
# Keep .desktop content ASCII-ish. Values are UTF-8-safe by spec, but avoid surprises.
|
||||||
|
name_safe = (name or "SVPN profile").replace("\n", " ").replace("\r", " ").strip()
|
||||||
|
|
||||||
|
return (
|
||||||
|
"[Desktop Entry]\n"
|
||||||
|
"Version=1.0\n"
|
||||||
|
"Type=Application\n"
|
||||||
|
f"Name=SVPN: {name_safe} [{label_target}]\n"
|
||||||
|
f"Comment=Selective VPN: run traffic profile id={pid} target={target}\n"
|
||||||
|
f"Exec={exec_line}\n"
|
||||||
|
"Terminal=false\n"
|
||||||
|
"Categories=Network;\n"
|
||||||
|
f"X-SVPN-ProfileID={pid}\n"
|
||||||
|
f"X-SVPN-Target={target}\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _update_profile_shortcut_ui(self) -> None:
|
||||||
|
prof = self._selected_app_profile()
|
||||||
|
if prof is None:
|
||||||
|
self.btn_app_profile_shortcut_create.setEnabled(False)
|
||||||
|
self.btn_app_profile_shortcut_remove.setEnabled(False)
|
||||||
|
self.lbl_profile_shortcut.setText("Shortcut: —")
|
||||||
|
return
|
||||||
|
|
||||||
|
pid = (getattr(prof, "id", "") or "").strip()
|
||||||
|
path = self._profile_shortcut_path(pid)
|
||||||
|
installed = self._profile_shortcut_exists(pid)
|
||||||
|
|
||||||
|
self.btn_app_profile_shortcut_create.setEnabled(True)
|
||||||
|
self.btn_app_profile_shortcut_remove.setEnabled(installed)
|
||||||
|
|
||||||
|
state = "installed" if installed else "not installed"
|
||||||
|
self.lbl_profile_shortcut.setText(f"Shortcut: {state} ({path})")
|
||||||
|
|
||||||
def refresh_app_profiles(self, quiet: bool = False) -> None:
|
def refresh_app_profiles(self, quiet: bool = False) -> None:
|
||||||
def work() -> None:
|
def work() -> None:
|
||||||
profs = list(self.ctrl.traffic_app_profiles_list() or [])
|
profs = list(self.ctrl.traffic_app_profiles_list() or [])
|
||||||
@@ -1156,12 +1249,16 @@ RU: Применяет policy-rules и проверяет health. При оши
|
|||||||
if target in ("vpn", "direct"):
|
if target in ("vpn", "direct"):
|
||||||
label += f" [{target}]"
|
label += f" [{target}]"
|
||||||
it = QListWidgetItem(label)
|
it = QListWidgetItem(label)
|
||||||
|
sc_path = self._profile_shortcut_path(pid)
|
||||||
|
sc_state = "yes" if (sc_path and os.path.isfile(sc_path)) else "no"
|
||||||
it.setToolTip(
|
it.setToolTip(
|
||||||
(
|
(
|
||||||
f"id: {pid}\n"
|
f"id: {pid}\n"
|
||||||
f"app_key: {app_key}\n"
|
f"app_key: {app_key}\n"
|
||||||
f"target: {target}\n"
|
f"target: {target}\n"
|
||||||
f"ttl: {ttl_sec}s\n\n"
|
f"ttl: {ttl_sec}s\n\n"
|
||||||
|
f"shortcut: {sc_state}\n"
|
||||||
|
f"shortcut_path: {sc_path}\n\n"
|
||||||
f"{cmd}"
|
f"{cmd}"
|
||||||
).strip()
|
).strip()
|
||||||
)
|
)
|
||||||
@@ -1169,6 +1266,7 @@ RU: Применяет policy-rules и проверяет health. При оши
|
|||||||
self.lst_app_profiles.addItem(it)
|
self.lst_app_profiles.addItem(it)
|
||||||
|
|
||||||
self.lbl_app_profiles.setText(f"Saved profiles: {len(profs)}")
|
self.lbl_app_profiles.setText(f"Saved profiles: {len(profs)}")
|
||||||
|
self._update_profile_shortcut_ui()
|
||||||
|
|
||||||
if quiet:
|
if quiet:
|
||||||
try:
|
try:
|
||||||
@@ -1208,6 +1306,18 @@ RU: Применяет policy-rules и проверяет health. При оши
|
|||||||
self._append_app_log(f"[profile] saved: id={pid or '-'} target={target} app={app_key or '-'}")
|
self._append_app_log(f"[profile] saved: id={pid or '-'} target={target} app={app_key or '-'}")
|
||||||
self.refresh_app_profiles(quiet=True)
|
self.refresh_app_profiles(quiet=True)
|
||||||
|
|
||||||
|
# If shortcut exists already, rewrite it to reflect updated name/target.
|
||||||
|
if prof is not None and pid and self._profile_shortcut_exists(pid):
|
||||||
|
try:
|
||||||
|
sc_path = self._profile_shortcut_path(pid)
|
||||||
|
os.makedirs(os.path.dirname(sc_path), exist_ok=True)
|
||||||
|
with open(sc_path, "w", encoding="utf-8", errors="replace") as f:
|
||||||
|
f.write(self._render_profile_shortcut(prof))
|
||||||
|
self._append_app_log(f"[shortcut] updated: {sc_path}")
|
||||||
|
except Exception as e:
|
||||||
|
# Non-fatal: profile save is more important than shortcut rewrite.
|
||||||
|
self._append_app_log(f"[shortcut] update failed: {e}")
|
||||||
|
|
||||||
# Best-effort select newly saved profile.
|
# Best-effort select newly saved profile.
|
||||||
if pid:
|
if pid:
|
||||||
for i in range(self.lst_app_profiles.count()):
|
for i in range(self.lst_app_profiles.count()):
|
||||||
@@ -1221,6 +1331,55 @@ RU: Применяет policy-rules и проверяет health. При оши
|
|||||||
|
|
||||||
self._safe(work, title="Save profile error")
|
self._safe(work, title="Save profile error")
|
||||||
|
|
||||||
|
def on_app_profile_shortcut_create(self) -> None:
|
||||||
|
prof = self._selected_app_profile()
|
||||||
|
if prof is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
pid = (getattr(prof, "id", "") or "").strip()
|
||||||
|
if not pid:
|
||||||
|
return
|
||||||
|
|
||||||
|
def work() -> None:
|
||||||
|
path = self._profile_shortcut_path(pid)
|
||||||
|
if not path:
|
||||||
|
raise RuntimeError("cannot derive shortcut path")
|
||||||
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||||
|
with open(path, "w", encoding="utf-8", errors="replace") as f:
|
||||||
|
f.write(self._render_profile_shortcut(prof))
|
||||||
|
self._append_app_log(f"[shortcut] saved: {path} id={pid}")
|
||||||
|
self._set_action_status(f"Shortcut saved: {os.path.basename(path)}", ok=True)
|
||||||
|
self._update_profile_shortcut_ui()
|
||||||
|
|
||||||
|
self._safe(work, title="Create shortcut error")
|
||||||
|
|
||||||
|
def on_app_profile_shortcut_remove(self) -> None:
|
||||||
|
prof = self._selected_app_profile()
|
||||||
|
if prof is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
pid = (getattr(prof, "id", "") or "").strip()
|
||||||
|
if not pid:
|
||||||
|
return
|
||||||
|
|
||||||
|
def work() -> None:
|
||||||
|
path = self._profile_shortcut_path(pid)
|
||||||
|
if not path:
|
||||||
|
raise RuntimeError("cannot derive shortcut path")
|
||||||
|
if not os.path.exists(path):
|
||||||
|
self._set_action_status("Shortcut not installed", ok=True)
|
||||||
|
self._update_profile_shortcut_ui()
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
os.remove(path)
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
self._append_app_log(f"[shortcut] removed: {path} id={pid}")
|
||||||
|
self._set_action_status("Shortcut removed", ok=True)
|
||||||
|
self._update_profile_shortcut_ui()
|
||||||
|
|
||||||
|
self._safe(work, title="Remove shortcut error")
|
||||||
|
|
||||||
def on_app_profile_load(self) -> None:
|
def on_app_profile_load(self) -> None:
|
||||||
prof = self._selected_app_profile()
|
prof = self._selected_app_profile()
|
||||||
if prof is None:
|
if prof is None:
|
||||||
@@ -1290,6 +1449,15 @@ RU: Применяет policy-rules и проверяет health. При оши
|
|||||||
self._set_action_status(f"Delete profile failed: {res.message}", ok=False)
|
self._set_action_status(f"Delete profile failed: {res.message}", ok=False)
|
||||||
raise RuntimeError(res.message or "delete failed")
|
raise RuntimeError(res.message or "delete failed")
|
||||||
|
|
||||||
|
# Keep a tight coupling: deleting profile removes its shortcut too.
|
||||||
|
sc_path = self._profile_shortcut_path(pid)
|
||||||
|
if sc_path and os.path.exists(sc_path):
|
||||||
|
try:
|
||||||
|
os.remove(sc_path)
|
||||||
|
self._append_app_log(f"[shortcut] removed: {sc_path} (profile deleted)")
|
||||||
|
except Exception as e:
|
||||||
|
self._append_app_log(f"[shortcut] remove failed: {e}")
|
||||||
|
|
||||||
self._set_action_status(f"Profile deleted: {pid}", ok=True)
|
self._set_action_status(f"Profile deleted: {pid}", ok=True)
|
||||||
self._append_app_log(f"[profile] deleted: id={pid}")
|
self._append_app_log(f"[profile] deleted: id={pid}")
|
||||||
self.refresh_app_profiles(quiet=True)
|
self.refresh_app_profiles(quiet=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user