diff --git a/selective-vpn-gui/traffic_mode_dialog.py b/selective-vpn-gui/traffic_mode_dialog.py index cca8c86..de0b49b 100644 --- a/selective-vpn-gui/traffic_mode_dialog.py +++ b/selective-vpn-gui/traffic_mode_dialog.py @@ -1,11 +1,15 @@ #!/usr/bin/env python3 from __future__ import annotations +import configparser import os +import pathlib +import re import shlex import subprocess import time -from typing import Callable +from dataclasses import dataclass +from typing import Callable, Optional from PySide6 import QtCore, QtGui from PySide6.QtWidgets import ( @@ -33,6 +37,19 @@ from PySide6.QtWidgets import ( from dashboard_controller import DashboardController +_DESKTOP_BOOL_TRUE = {"1", "true", "yes", "y", "on"} +_DESKTOP_EXEC_FIELD_RE = re.compile(r"%[A-Za-z]") + + +@dataclass(frozen=True) +class DesktopAppEntry: + desktop_id: str + name: str + exec_raw: str + path: str + source: str # system|flatpak|user + + class TrafficModeDialog(QDialog): def __init__( self, @@ -52,6 +69,16 @@ class TrafficModeDialog(QDialog): root = QVBoxLayout(self) + # EN: Persist small UI state across dialog sessions. + # RU: Сохраняем небольшой UI state между открытиями окна. + self._settings = QtCore.QSettings("AdGuardVPN", "SelectiveVPNDashboardQt") + self._last_app_unit: str = str(self._settings.value("traffic_app_last_unit", "") or "") + self._last_app_target: str = str(self._settings.value("traffic_app_last_target", "") or "") + try: + self._last_app_cgroup_id: int = int(self._settings.value("traffic_app_last_cgroup_id", 0) or 0) + except Exception: + self._last_app_cgroup_id = 0 + hint_group = QGroupBox("Mode behavior") hint_layout = QVBoxLayout(hint_group) hint_layout.addWidget(QLabel("Selective: only marked traffic goes via VPN.")) @@ -190,6 +217,13 @@ RU: Восстанавливает маршруты/nft из последнег "RU: Команда запуска. Запускается от текущего пользователя в systemd --user scope." ) row_cmd.addWidget(self.ed_app_cmd, stretch=1) + self.btn_app_pick = QPushButton("Pick app...") + self.btn_app_pick.setToolTip( + "EN: Pick an installed app from .desktop entries (system + flatpak) and fill the command.\n" + "RU: Выбрать установленное приложение из .desktop (system + flatpak) и заполнить команду." + ) + self.btn_app_pick.clicked.connect(self.on_app_pick) + row_cmd.addWidget(self.btn_app_pick) run_layout.addLayout(row_cmd) row_target = QHBoxLayout() @@ -233,20 +267,42 @@ RU: Восстанавливает маршруты/nft из последнег self.btn_app_refresh = QPushButton("Refresh counts") self.btn_app_refresh.clicked.connect(self.refresh_appmarks_counts) row_btn.addWidget(self.btn_app_refresh) + row_btn.addStretch(1) + run_layout.addLayout(row_btn) + + row_btn2 = QHBoxLayout() self.btn_app_clear_vpn = QPushButton("Clear VPN marks") self.btn_app_clear_vpn.clicked.connect(lambda: self.on_appmarks_clear("vpn")) - row_btn.addWidget(self.btn_app_clear_vpn) + row_btn2.addWidget(self.btn_app_clear_vpn) self.btn_app_clear_direct = QPushButton("Clear Direct marks") self.btn_app_clear_direct.clicked.connect( lambda: self.on_appmarks_clear("direct") ) - row_btn.addWidget(self.btn_app_clear_direct) - row_btn.addStretch(1) - run_layout.addLayout(row_btn) + row_btn2.addWidget(self.btn_app_clear_direct) + self.btn_app_stop_last = QPushButton("Stop last scope") + self.btn_app_stop_last.setToolTip( + "EN: Stops the last launched systemd --user scope.\n" + "RU: Останавливает последний запущенный systemd --user scope." + ) + self.btn_app_stop_last.clicked.connect(self.on_app_stop_last_scope) + row_btn2.addWidget(self.btn_app_stop_last) + self.btn_app_unmark_last = QPushButton("Unmark last") + self.btn_app_unmark_last.setToolTip( + "EN: Removes routing mark for the last launched scope (by cgroup id).\n" + "RU: Удаляет метку маршрутизации для последнего scope (по cgroup id)." + ) + self.btn_app_unmark_last.clicked.connect(self.on_app_unmark_last) + row_btn2.addWidget(self.btn_app_unmark_last) + row_btn2.addStretch(1) + run_layout.addLayout(row_btn2) self.lbl_app_counts = QLabel("Marks: —") self.lbl_app_counts.setStyleSheet("color: gray;") run_layout.addWidget(self.lbl_app_counts) + self.lbl_app_last = QLabel("Last scope: —") + self.lbl_app_last.setStyleSheet("color: gray;") + run_layout.addWidget(self.lbl_app_last) + self._refresh_last_scope_ui() tab_apps_layout.addWidget(run_group) @@ -749,6 +805,53 @@ RU: Применяет policy-rules и проверяет health. При оши except Exception as e: self.lbl_app_counts.setText(f"Marks: error: {e}") + def _refresh_last_scope_ui(self) -> None: + unit = (self._last_app_unit or "").strip() + target = (self._last_app_target or "").strip().lower() + cg_id = int(self._last_app_cgroup_id or 0) + + parts = [] + if unit: + parts.append(f"unit={unit}") + if target in ("vpn", "direct"): + parts.append(f"target={target}") + if cg_id > 0: + parts.append(f"cgroup_id={cg_id}") + + if parts: + self.lbl_app_last.setText("Last scope: " + " | ".join(parts)) + else: + self.lbl_app_last.setText("Last scope: —") + + self.btn_app_stop_last.setEnabled(bool(unit)) + self.btn_app_unmark_last.setEnabled(bool(unit) and target in ("vpn", "direct") and cg_id > 0) + + def _set_last_scope(self, *, unit: str = "", target: str = "", cgroup_id: int = 0) -> None: + self._last_app_unit = str(unit or "").strip() + self._last_app_target = str(target or "").strip().lower() + try: + self._last_app_cgroup_id = int(cgroup_id or 0) + except Exception: + self._last_app_cgroup_id = 0 + + self._settings.setValue("traffic_app_last_unit", self._last_app_unit) + self._settings.setValue("traffic_app_last_target", self._last_app_target) + self._settings.setValue("traffic_app_last_cgroup_id", int(self._last_app_cgroup_id or 0)) + self._refresh_last_scope_ui() + + def on_app_pick(self) -> None: + def work() -> None: + dlg = AppPickerDialog(parent=self) + if dlg.exec() != QDialog.Accepted: + return + cmd = (dlg.selected_command() or "").strip() + if not cmd: + return + self.ed_app_cmd.setText(cmd) + self._append_app_log(f"[picker] command filled: {cmd}") + + self._safe(work, title="App picker error") + def _run_systemd_scope(self, cmdline: str, *, unit: str) -> tuple[str, str]: args = shlex.split(cmdline or "") if not args: @@ -801,6 +904,7 @@ RU: Применяет policy-rules и проверяет health. При оши if out: self._append_app_log(f"[app] systemd-run:\n{out}") self._append_app_log(f"[app] ControlGroup: {cg}") + self._set_last_scope(unit=unit, target=target, cgroup_id=0) res = self.ctrl.traffic_appmarks_apply( op="add", @@ -816,6 +920,7 @@ RU: Применяет policy-rules и проверяет health. При оши f"App mark added: target={target} cgroup_id={res.cgroup_id}", ok=True, ) + self._set_last_scope(unit=unit, target=target, cgroup_id=int(res.cgroup_id or 0)) else: self._append_app_log(f"[appmarks] ERROR: {res.message}") self._set_action_status( @@ -832,6 +937,85 @@ RU: Применяет policy-rules и проверяет health. При оши self._safe(work, title="Run app error") + def on_app_stop_last_scope(self) -> None: + unit = (self._last_app_unit or "").strip() + if not unit: + return + + def work() -> None: + self._append_app_log(f"[app] stop last scope: unit={unit}") + p = subprocess.run( + ["systemctl", "--user", "stop", unit], + capture_output=True, + text=True, + check=False, + ) + out = ((p.stdout or "") + (p.stderr or "")).strip() + if p.returncode == 0: + self._append_app_log("[app] stopped OK") + if out: + self._append_app_log(out) + self._set_action_status(f"Stopped scope: {unit}", ok=True) + return + + self._append_app_log(f"[app] stop failed: rc={p.returncode}") + if out: + self._append_app_log(out) + # fallback: kill + p2 = subprocess.run( + ["systemctl", "--user", "kill", unit], + capture_output=True, + text=True, + check=False, + ) + out2 = ((p2.stdout or "") + (p2.stderr or "")).strip() + if p2.returncode == 0: + self._append_app_log("[app] kill OK (fallback)") + if out2: + self._append_app_log(out2) + self._set_action_status(f"Killed scope: {unit}", ok=True) + return + + self._append_app_log(f"[app] kill failed: rc={p2.returncode}") + if out2: + self._append_app_log(out2) + self._set_action_status(f"Stop scope failed: {unit}", ok=False) + QMessageBox.critical(self, "Stop scope error", out2 or out or "stop failed") + + self._safe(work, title="Stop scope error") + + def on_app_unmark_last(self) -> None: + unit = (self._last_app_unit or "").strip() + target = (self._last_app_target or "").strip().lower() + cg_id = int(self._last_app_cgroup_id or 0) + if not unit or target not in ("vpn", "direct") or cg_id <= 0: + return + + def work() -> None: + if QMessageBox.question( + self, + "Unmark last", + f"Remove routing mark for last scope?\n\nunit={unit}\ntarget={target}\ncgroup_id={cg_id}", + ) != QMessageBox.StandardButton.Yes: + return + + res = self.ctrl.traffic_appmarks_apply( + op="del", + target=target, + cgroup=str(cg_id), + ) + if res.ok: + self._append_app_log(f"[appmarks] unmarked: target={target} cgroup_id={cg_id}") + self._set_action_status(f"Unmarked last: target={target} cgroup_id={cg_id}", ok=True) + else: + self._append_app_log(f"[appmarks] unmark error: {res.message}") + self._set_action_status(f"Unmark last failed: {res.message}", ok=False) + QMessageBox.critical(self, "Unmark error", res.message or "unmark failed") + + self.refresh_appmarks_counts() + + self._safe(work, title="Unmark error") + def on_appmarks_clear(self, target: str) -> None: tgt = (target or "").strip().lower() if tgt not in ("vpn", "direct"): @@ -1450,3 +1634,285 @@ RU: Скрывает элементы, которые уже есть в Force V layout.addLayout(row) self._add_tab("UIDs", "uid", items, extra=extra) + + +# --------------------------------------------------------------------- +# App picker (.desktop entries) +# --------------------------------------------------------------------- + + +def _desktop_bool(v: str) -> bool: + return str(v or "").strip().lower() in _DESKTOP_BOOL_TRUE + + +def _desktop_name_from_section(sec: configparser.SectionProxy) -> str: + name = str(sec.get("Name", "") or "").strip() + if name: + return name + + # Prefer Russian if present, then English, then any localized variant. + for key in ("Name[ru]", "Name[en_US]", "Name[en]"): + v = str(sec.get(key, "") or "").strip() + if v: + return v + + for k, v in sec.items(): + kk = str(k or "") + if kk.startswith("Name[") and str(v or "").strip(): + return str(v).strip() + return "" + + +def _sanitize_desktop_exec(exec_raw: str, *, name: str = "", desktop_path: str = "") -> str: + raw = str(exec_raw or "").strip() + if not raw: + return "" + + # Desktop spec: "%%" -> literal "%". + raw = raw.replace("%%", "%") + + # Best-effort expansion for a couple of common fields. + if name: + raw = raw.replace("%c", name) + if desktop_path: + raw = raw.replace("%k", desktop_path) + + try: + tokens = shlex.split(raw) + except Exception: + tokens = raw.split() + + out: list[str] = [] + for t in tokens: + tok = str(t or "").strip() + if not tok: + continue + # Drop standalone field codes (%u, %U, %f, ...). + if len(tok) == 2 and tok.startswith("%"): + continue + # Remove field codes inside tokens (e.g. --name=%c). + tok = _DESKTOP_EXEC_FIELD_RE.sub("", tok).strip() + if tok: + out.append(tok) + + if not out: + return "" + return " ".join(shlex.quote(x) for x in out) + + +def _desktop_entry_from_file(path: str, *, source: str) -> Optional[DesktopAppEntry]: + p = str(path or "").strip() + if not p or not p.endswith(".desktop"): + return None + + cp = configparser.ConfigParser(interpolation=None, strict=False) + cp.optionxform = str # keep case + try: + cp.read(p, encoding="utf-8") + except Exception: + try: + data = pathlib.Path(p).read_bytes() + cp.read_string(data.decode("utf-8", errors="replace")) + except Exception: + return None + + if "Desktop Entry" not in cp: + return None + + sec = cp["Desktop Entry"] + if _desktop_bool(sec.get("Hidden", "")) or _desktop_bool(sec.get("NoDisplay", "")): + return None + + typ = str(sec.get("Type", "") or "").strip().lower() + if typ and typ != "application": + return None + + exec_raw = str(sec.get("Exec", "") or "").strip() + if not exec_raw: + return None + + name = _desktop_name_from_section(sec) + desktop_id = pathlib.Path(p).name + if not name: + name = desktop_id.replace(".desktop", "") + + src = str(source or "").strip().lower() or "system" + return DesktopAppEntry( + desktop_id=desktop_id, + name=name, + exec_raw=exec_raw, + path=p, + source=src, + ) + + +def _scan_desktop_entries() -> list[DesktopAppEntry]: + home = os.path.expanduser("~") + dirs: list[tuple[str, str]] = [ + (os.path.join(home, ".local/share/applications"), "user"), + ("/usr/local/share/applications", "system"), + ("/usr/share/applications", "system"), + (os.path.join(home, ".local/share/flatpak/exports/share/applications"), "flatpak"), + ("/var/lib/flatpak/exports/share/applications", "flatpak"), + ] + + seen: set[tuple[str, str]] = set() + out: list[DesktopAppEntry] = [] + for d, src in dirs: + if not d or not os.path.isdir(d): + continue + try: + paths = sorted(pathlib.Path(d).glob("*.desktop")) + except Exception: + continue + for fp in paths: + ent = _desktop_entry_from_file(str(fp), source=src) + if ent is None: + continue + key = (ent.desktop_id, ent.source) + if key in seen: + continue + seen.add(key) + out.append(ent) + + out.sort(key=lambda e: (e.name.lower(), e.source, e.desktop_id.lower())) + return out + + +class AppPickerDialog(QDialog): + def __init__(self, *, parent=None) -> None: + super().__init__(parent) + self.setWindowTitle("Pick app (.desktop)") + self.resize(860, 640) + + self._selected_cmd: str = "" + self._entries: list[DesktopAppEntry] = _scan_desktop_entries() + + root = QVBoxLayout(self) + + note = QLabel( + "EN: Pick an installed GUI app from .desktop entries (system + flatpak). " + "The command will be filled without %u/%U/%f placeholders.\n" + "RU: Выбери приложение из .desktop (system + flatpak). " + "Команда будет заполнена без плейсхолдеров %u/%U/%f." + ) + note.setWordWrap(True) + note.setStyleSheet("color: gray;") + root.addWidget(note) + + row = QHBoxLayout() + row.addWidget(QLabel("Search")) + self.ed_search = QLineEdit() + self.ed_search.setPlaceholderText("Type to filter (name / id / exec)...") + row.addWidget(self.ed_search, stretch=1) + self.lbl_count = QLabel("—") + self.lbl_count.setStyleSheet("color: gray;") + row.addWidget(self.lbl_count) + root.addLayout(row) + + self.lst = QListWidget() + self.lst.setSelectionMode(QAbstractItemView.SingleSelection) + root.addWidget(self.lst, stretch=1) + + self.preview = QPlainTextEdit() + self.preview.setReadOnly(True) + self.preview.setFixedHeight(180) + root.addWidget(self.preview) + + row_btn = QHBoxLayout() + self.btn_use = QPushButton("Use selected") + self.btn_use.clicked.connect(self.on_use_selected) + row_btn.addWidget(self.btn_use) + row_btn.addStretch(1) + btn_close = QPushButton("Close") + btn_close.clicked.connect(self.reject) + row_btn.addWidget(btn_close) + root.addLayout(row_btn) + + self._populate() + self.ed_search.textChanged.connect(lambda _t: self._apply_filter()) + self.lst.currentItemChanged.connect(lambda _a, _b: self._update_preview()) + self.lst.itemDoubleClicked.connect(lambda _it: self.on_use_selected()) + + QtCore.QTimer.singleShot(0, self._apply_filter) + + def selected_command(self) -> str: + return str(self._selected_cmd or "") + + def _populate(self) -> None: + self.lst.clear() + for ent in self._entries: + src = ent.source + label = f"{ent.name} [{src}] ({ent.desktop_id})" + tip = ( + f"Name: {ent.name}\n" + f"ID: {ent.desktop_id}\n" + f"Source: {src}\n" + f"Path: {ent.path}\n\n" + f"Exec: {ent.exec_raw}" + ) + it = QListWidgetItem(label) + it.setToolTip(tip) + it.setData(QtCore.Qt.UserRole, ent) + # precomputed search string + it.setData( + QtCore.Qt.UserRole + 1, + (label + "\n" + ent.exec_raw + "\n" + ent.path).lower(), + ) + self.lst.addItem(it) + + if self.lst.count() > 0: + self.lst.setCurrentRow(0) + self._update_preview() + + def _apply_filter(self) -> None: + q = (self.ed_search.text() or "").strip().lower() + shown = 0 + for i in range(self.lst.count()): + it = self.lst.item(i) + if not it: + continue + hay = str(it.data(QtCore.Qt.UserRole + 1) or "") + hide = bool(q) and q not in hay + it.setHidden(hide) + if not hide: + shown += 1 + self.lbl_count.setText(f"Apps: {shown}/{self.lst.count()}") + + def _current_entry(self) -> Optional[DesktopAppEntry]: + it = self.lst.currentItem() + if not it: + return None + ent = it.data(QtCore.Qt.UserRole) + if isinstance(ent, DesktopAppEntry): + return ent + return None + + def _update_preview(self) -> None: + ent = self._current_entry() + if ent is None: + self.preview.setPlainText("—") + self.btn_use.setEnabled(False) + return + cmd = _sanitize_desktop_exec(ent.exec_raw, name=ent.name, desktop_path=ent.path) + text = ( + f"Name: {ent.name}\n" + f"ID: {ent.desktop_id}\n" + f"Source: {ent.source}\n" + f"Path: {ent.path}\n\n" + f"Exec (raw):\n{ent.exec_raw}\n\n" + f"Command (sanitized):\n{cmd}" + ) + self.preview.setPlainText(text) + self.btn_use.setEnabled(bool(cmd.strip())) + + def on_use_selected(self) -> None: + ent = self._current_entry() + if ent is None: + return + cmd = _sanitize_desktop_exec(ent.exec_raw, name=ent.name, desktop_path=ent.path).strip() + if not cmd: + QMessageBox.warning(self, "No command", "Selected app has no usable Exec command.") + return + self._selected_cmd = cmd + self.accept()