#!/usr/bin/env python3 from __future__ import annotations import configparser import os import pathlib import pwd import re import shlex import subprocess import time from dataclasses import dataclass from typing import Callable, Optional from PySide6 import QtCore, QtGui from PySide6.QtWidgets import ( QButtonGroup, QCheckBox, QComboBox, QDialog, QGroupBox, QHBoxLayout, QLabel, QLineEdit, QListWidget, QListWidgetItem, QAbstractItemView, QMessageBox, QPlainTextEdit, QPushButton, QRadioButton, QSpinBox, QTabWidget, QVBoxLayout, QWidget, ) from dashboard_controller import DashboardController _DESKTOP_BOOL_TRUE = {"1", "true", "yes", "y", "on"} _DESKTOP_EXEC_FIELD_RE = re.compile(r"%[A-Za-z]") @dataclass(frozen=True) class DesktopAppEntry: desktop_id: str name: str exec_raw: str path: str source: str # system|flatpak|user @dataclass(frozen=True) class RuntimeScopeInfo: unit: str target: str # vpn|direct|? cgroup_path: str cgroup_id: int @dataclass(frozen=True) class RunningProcessEntry: pid: int uid: int user: str comm: str exe: str cmdline: str cgroup: str class TrafficModeDialog(QDialog): def __init__( self, controller: DashboardController, *, log_cb: Callable[[str], None] | None = None, refresh_cb: Callable[[], None] | None = None, parent=None, ) -> None: super().__init__(parent) self.ctrl = controller self.log_cb = log_cb self.refresh_cb = refresh_cb self.setWindowTitle("Traffic mode settings") self.resize(780, 760) root = QVBoxLayout(self) # EN: Persist small UI state across dialog sessions. # RU: Сохраняем небольшой UI state между открытиями окна. self._settings = QtCore.QSettings("AdGuardVPN", "SelectiveVPNDashboardQt") self._last_app_unit: str = str(self._settings.value("traffic_app_last_unit", "") or "") self._last_app_target: str = str(self._settings.value("traffic_app_last_target", "") or "") self._last_app_key: str = str(self._settings.value("traffic_app_last_key", "") or "") self._last_app_cmdline: str = str(self._settings.value("traffic_app_last_cmdline", "") or "") try: self._last_app_cgroup_id: int = int(self._settings.value("traffic_app_last_cgroup_id", 0) or 0) except Exception: self._last_app_cgroup_id = 0 self._adv_auto_local_bypass: bool = True self._adv_ingress_reply_bypass: bool = False hint_group = QGroupBox("Mode behavior") hint_layout = QVBoxLayout(hint_group) hint_layout.addWidget(QLabel("Selective: only marked traffic goes via VPN.")) hint_layout.addWidget(QLabel("Full tunnel: all traffic goes via VPN.")) hint_layout.addWidget(QLabel("Direct: VPN routing rules are disabled.")) warn = QLabel( "Warning: Full tunnel can break local/LAN access depending on your host routes." ) warn.setStyleSheet("color: red;") hint_layout.addWidget(warn) root.addWidget(hint_group) tip = QLabel("Tip: hover any control for help. Подсказка: наведи на элемент для описания.") tip.setWordWrap(True) tip.setStyleSheet("color: gray;") root.addWidget(tip) self.tabs = QTabWidget() root.addWidget(self.tabs, stretch=1) tab_basic = QWidget() tab_basic_layout = QVBoxLayout(tab_basic) mode_group = QGroupBox("Traffic mode relay") mode_layout = QVBoxLayout(mode_group) row_mode = QHBoxLayout() self.rad_selective = QRadioButton("Selective") self.rad_selective.setToolTip("""EN: Only marked traffic (fwmark 0x66) uses VPN policy table (agvpn). RU: Только помеченный трафик (fwmark 0x66) идет через policy-table (agvpn).""") self.rad_selective.toggled.connect( lambda checked: self.on_mode_toggle("selective", checked) ) row_mode.addWidget(self.rad_selective) self.rad_full = QRadioButton("Full tunnel") self.rad_full.setToolTip("""EN: All traffic uses VPN policy table (agvpn). Use with auto-local bypass for LAN/docker. RU: Весь трафик идет через policy-table (agvpn). Для LAN/docker включай auto-local bypass.""") self.rad_full.toggled.connect( lambda checked: self.on_mode_toggle("full_tunnel", checked) ) row_mode.addWidget(self.rad_full) self.rad_direct = QRadioButton("Direct") self.rad_direct.setToolTip("""EN: Disables base VPN routing rules (no full/selective rule). RU: Отключает базовые VPN policy-rules (нет full/selective правила).""") self.rad_direct.toggled.connect( lambda checked: self.on_mode_toggle("direct", checked) ) row_mode.addWidget(self.rad_direct) row_mode.addStretch(1) mode_layout.addLayout(row_mode) row_iface = QHBoxLayout() row_iface.addWidget(QLabel("Preferred iface")) self.cmb_iface = QComboBox() self.cmb_iface.setToolTip("""EN: VPN interface for policy routing. Use auto unless you know the exact iface. RU: Интерфейс VPN для policy routing. Оставь auto, если не уверен.""") self.cmb_iface.setEditable(True) self.cmb_iface.setInsertPolicy(QComboBox.NoInsert) self.cmb_iface.setMinimumWidth(180) row_iface.addWidget(self.cmb_iface) self.btn_refresh_ifaces = QPushButton("Detect ifaces") self.btn_refresh_ifaces.setToolTip("""EN: Refresh list of available interfaces (UP). RU: Обновить список доступных интерфейсов (UP).""") self.btn_refresh_ifaces.clicked.connect(self.on_refresh_ifaces) row_iface.addWidget(self.btn_refresh_ifaces) row_iface.addStretch(1) mode_layout.addLayout(row_iface) row_adv_button = QHBoxLayout() self.btn_adv_bypass = QPushButton("Advanced bypass...") self.btn_adv_bypass.setToolTip( "EN: Open compact Full tunnel advanced bypass settings (auto-local + ingress-reply).\n" "RU: Открыть компактные расширенные bypass-настройки Full tunnel (auto-local + ingress-reply)." ) self.btn_adv_bypass.clicked.connect(self.on_open_advanced_bypass_dialog) row_adv_button.addWidget(self.btn_adv_bypass) self.btn_mode_checklist = QPushButton("Checklist...") self.btn_mode_checklist.setToolTip( "EN: Quick production checklist for traffic mode/full tunnel safety.\n" "RU: Короткий боевой чеклист по режимам трафика и безопасному full tunnel." ) self.btn_mode_checklist.clicked.connect(self.on_show_mode_checklist) row_adv_button.addWidget(self.btn_mode_checklist) self.lbl_adv_quick = QLabel("Advanced bypass: —") self.lbl_adv_quick.setToolTip( "EN: Saved and active state for Full tunnel advanced bypass.\n" "RU: Сохраненное и активное состояние advanced bypass для Full tunnel." ) self.lbl_adv_quick.setStyleSheet("color: gray;") row_adv_button.addWidget(self.lbl_adv_quick, stretch=1) row_adv_button.addStretch(1) mode_layout.addLayout(row_adv_button) self.lbl_state = QLabel("Traffic mode: —") self.lbl_state.setStyleSheet("color: gray;") mode_layout.addWidget(self.lbl_state) self.lbl_diag = QLabel("—") self.lbl_diag.setStyleSheet("color: gray;") mode_layout.addWidget(self.lbl_diag) tab_basic_layout.addWidget(mode_group) maint_group = QGroupBox("Rollback / cache") maint_layout = QHBoxLayout(maint_group) self.btn_rollback = QPushButton("Clear routes (save cache)") self.btn_rollback.setToolTip("""EN: Clears VPN routes and nft sets, but saves a cache snapshot for restore. RU: Очищает VPN маршруты и nft-сеты, но сохраняет снапшот для восстановления.""") self.btn_rollback.clicked.connect(self.on_rollback) maint_layout.addWidget(self.btn_rollback) self.btn_restore_cache = QPushButton("Restore cached routes") self.btn_restore_cache.setToolTip("""EN: Restores routes/nft from the last clear snapshot. Skips non-critical route restore errors. RU: Восстанавливает маршруты/nft из последнего снапшота clear. Некритичные ошибки восстановления пропускаются.""") self.btn_restore_cache.clicked.connect(self.on_restore_cache) maint_layout.addWidget(self.btn_restore_cache) maint_layout.addStretch(1) tab_basic_layout.addWidget(maint_group) tab_basic_layout.addStretch(1) self.tabs.addTab(tab_basic, "Traffic basics") # ----------------------------------------------------------------- # Apps (runtime): systemd --user unit + backend appmarks # ----------------------------------------------------------------- tab_apps = QWidget() tab_apps_layout = QVBoxLayout(tab_apps) apps_hint = QLabel( "Runtime per-app routing (Wayland-friendly):\n" "- Launch uses systemd-run --user (transient unit).\n" "- Backend adds the unit cgroup into nftset -> fwmark rules.\n" "- Marks are temporary (TTL). Use Policy overrides for persistent policy." ) apps_hint.setWordWrap(True) apps_hint.setStyleSheet("color: gray;") tab_apps_layout.addWidget(apps_hint) # EN: Split this UX into subtabs to fit smaller screens (no giant vertical stack). # RU: Разбиваем на под-вкладки, чтобы помещалось на маленьких экранах. self.apps_tabs = QTabWidget() tab_apps_layout.addWidget(self.apps_tabs, stretch=1) profiles_group = QGroupBox("Added apps (profiles)") profiles_layout = QVBoxLayout(profiles_group) profiles_hint = QLabel( "Persistent launch configs (saved):\n" "- These describe what to run and how to route it.\n" "- Separate from runtime marks/units (which are tied to a specific cgroup)." ) profiles_hint.setWordWrap(True) profiles_hint.setStyleSheet("color: gray;") profiles_layout.addWidget(profiles_hint) row_prof_name = QHBoxLayout() row_prof_name.addWidget(QLabel("Name")) self.ed_app_profile_name = QLineEdit() self.ed_app_profile_name.setPlaceholderText( "optional (default: basename of app)" ) self.ed_app_profile_name.setToolTip( "EN: Optional profile name (for display). Leave empty to auto-name.\n" "RU: Необязательное имя профиля (для отображения). Можно оставить пустым." ) row_prof_name.addWidget(self.ed_app_profile_name, stretch=1) self.btn_app_profiles_refresh = QPushButton("Refresh profiles") self.btn_app_profiles_refresh.setToolTip( "EN: Reload saved app profiles from backend.\n" "RU: Обновить список сохранённых профилей из backend." ) self.btn_app_profiles_refresh.clicked.connect(self.refresh_app_profiles) row_prof_name.addWidget(self.btn_app_profiles_refresh) profiles_layout.addLayout(row_prof_name) row_prof_btn = QHBoxLayout() self.btn_app_profile_save = QPushButton("Save / update profile") self.btn_app_profile_save.setToolTip( "EN: Save current command/route/TTL as a persistent profile (upsert).\n" "RU: Сохранить текущую команду/маршрут/TTL как постоянный профиль (upsert)." ) self.btn_app_profile_save.clicked.connect(self.on_app_profile_save) row_prof_btn.addWidget(self.btn_app_profile_save) self.btn_app_profile_load = QPushButton("Load to form") self.btn_app_profile_load.setToolTip( "EN: Load selected profile into the form (command/route/TTL).\n" "RU: Загрузить выбранный профиль в форму (команда/маршрут/TTL)." ) self.btn_app_profile_load.clicked.connect(self.on_app_profile_load) row_prof_btn.addWidget(self.btn_app_profile_load) self.btn_app_profile_run = QPushButton("Run profile") self.btn_app_profile_run.setToolTip( "EN: Launch selected profile via systemd-run --user and apply routing mark.\n" "RU: Запустить выбранный профиль через systemd-run --user и применить метку маршрутизации." ) self.btn_app_profile_run.clicked.connect(self.on_app_profile_run) row_prof_btn.addWidget(self.btn_app_profile_run) self.btn_app_profile_delete = QPushButton("Delete profile") self.btn_app_profile_delete.setToolTip( "EN: Delete selected saved profile.\n" "RU: Удалить выбранный сохранённый профиль." ) self.btn_app_profile_delete.clicked.connect(self.on_app_profile_delete) row_prof_btn.addWidget(self.btn_app_profile_delete) row_prof_btn.addStretch(1) profiles_layout.addLayout(row_prof_btn) row_prof_shortcuts = QHBoxLayout() self.btn_app_profile_shortcut_create = QPushButton("Create shortcut") self.btn_app_profile_shortcut_create.setToolTip( "EN: Create/overwrite a .desktop shortcut for the selected profile.\n" "EN: The shortcut will launch the app and apply routing mark automatically.\n" "RU: Создать/перезаписать .desktop ярлык для выбранного профиля.\n" "RU: Ярлык запускает приложение и автоматически применяет routing mark." ) self.btn_app_profile_shortcut_create.clicked.connect(self.on_app_profile_shortcut_create) row_prof_shortcuts.addWidget(self.btn_app_profile_shortcut_create) self.btn_app_profile_shortcut_remove = QPushButton("Remove shortcut") self.btn_app_profile_shortcut_remove.setToolTip( "EN: Remove the .desktop shortcut for the selected profile.\n" "RU: Удалить .desktop ярлык для выбранного профиля." ) self.btn_app_profile_shortcut_remove.clicked.connect(self.on_app_profile_shortcut_remove) row_prof_shortcuts.addWidget(self.btn_app_profile_shortcut_remove) row_prof_shortcuts.addStretch(1) profiles_layout.addLayout(row_prof_shortcuts) self.lst_app_profiles = QListWidget() self.lst_app_profiles.setSelectionMode(QAbstractItemView.SingleSelection) self.lst_app_profiles.setToolTip( "EN: Saved app profiles. Double click loads into the form.\n" "RU: Сохранённые профили приложений. Двойной клик загружает в форму." ) self.lst_app_profiles.itemDoubleClicked.connect( lambda _it: self.on_app_profile_load() ) self.lst_app_profiles.currentItemChanged.connect( lambda _cur, _prev: self._update_profile_shortcut_ui() ) self.lst_app_profiles.setFixedHeight(140) profiles_layout.addWidget(self.lst_app_profiles) self.lbl_app_profiles = QLabel("Saved profiles: —") self.lbl_app_profiles.setStyleSheet("color: gray;") profiles_layout.addWidget(self.lbl_app_profiles) self.lbl_profile_shortcut = QLabel("Shortcut: —") self.lbl_profile_shortcut.setWordWrap(True) self.lbl_profile_shortcut.setStyleSheet("color: gray;") profiles_layout.addWidget(self.lbl_profile_shortcut) tab_profiles = QWidget() tab_profiles_layout = QVBoxLayout(tab_profiles) tab_profiles_layout.addWidget(profiles_group) tab_profiles_layout.addStretch(1) self.apps_tabs.addTab(tab_profiles, "Profiles") run_group = QGroupBox("Run app (systemd unit) + apply mark") run_layout = QVBoxLayout(run_group) row_cmd = QHBoxLayout() row_cmd.addWidget(QLabel("Command")) self.ed_app_cmd = QLineEdit() self.ed_app_cmd.setPlaceholderText( "e.g. firefox --private-window https://example.com" ) self.ed_app_cmd.setToolTip( "EN: Command line to run. This runs as current user via systemd --user.\n" "RU: Команда запуска. Запускается от текущего пользователя через systemd --user." ) row_cmd.addWidget(self.ed_app_cmd, stretch=1) self.btn_app_pick = QPushButton("Pick app...") self.btn_app_pick.setToolTip( "EN: Pick an installed app from .desktop entries (system + flatpak + snap) and fill the command.\n" "RU: Выбрать установленное приложение из .desktop (system + flatpak + snap) и заполнить команду." ) self.btn_app_pick.clicked.connect(self.on_app_pick) row_cmd.addWidget(self.btn_app_pick) run_layout.addLayout(row_cmd) row_harden = QHBoxLayout() self.chk_app_browser_harden = QCheckBox("Browser anti-leak flags (WebRTC/QUIC)") self.chk_app_browser_harden.setChecked(True) self.chk_app_browser_harden.setToolTip( "EN: For Chromium-family browsers, auto-add flags to reduce WebRTC/STUN and QUIC leaks.\n" "RU: Для Chromium-подобных браузеров автоматически добавляет флаги против утечек WebRTC/STUN и QUIC." ) row_harden.addWidget(self.chk_app_browser_harden) row_harden.addStretch(1) run_layout.addLayout(row_harden) row_target = QHBoxLayout() row_target.addWidget(QLabel("Route via")) self.rad_app_vpn = QRadioButton("VPN") self.rad_app_vpn.setToolTip( "EN: Force this app traffic via VPN policy table (agvpn).\n" "RU: Форсировать трафик приложения через VPN policy-table (agvpn)." ) self.rad_app_direct = QRadioButton("Direct") self.rad_app_direct.setToolTip( "EN: Force this app traffic to bypass VPN (lookup main), even in full tunnel.\n" "RU: Форсировать трафик приложения мимо VPN (lookup main), даже в full tunnel." ) bg_app = QButtonGroup(self) bg_app.addButton(self.rad_app_vpn) bg_app.addButton(self.rad_app_direct) self.rad_app_vpn.setChecked(True) row_target.addWidget(self.rad_app_vpn) row_target.addWidget(self.rad_app_direct) row_target.addStretch(1) run_layout.addLayout(row_target) row_ttl = QHBoxLayout() self.chk_app_temporary = QCheckBox("Temporary mark (TTL)") self.chk_app_temporary.setToolTip( "EN: Off (default): mark is persistent until manual unmark/clear.\n" "EN: On: mark expires after TTL hours.\n" "RU: Выкл (по умолчанию): метка постоянная до ручного удаления.\n" "RU: Вкл: метка истекает через TTL часов." ) self.chk_app_temporary.setChecked(False) row_ttl.addWidget(self.chk_app_temporary) row_ttl.addWidget(QLabel("TTL (hours)")) self.spn_app_ttl = QSpinBox() self.spn_app_ttl.setRange(1, 24 * 30) # up to ~30 days self.spn_app_ttl.setValue(24) self.spn_app_ttl.setEnabled(False) self.spn_app_ttl.setToolTip( "EN: How long the runtime mark stays active (backend nftset element timeout).\n" "RU: Сколько живет runtime-метка (timeout элемента в nftset)." ) row_ttl.addWidget(self.spn_app_ttl) row_ttl.addStretch(1) run_layout.addLayout(row_ttl) self.chk_app_temporary.toggled.connect(self.spn_app_ttl.setEnabled) pid_group = QGroupBox("Mark existing PID (no launch)") pid_layout = QHBoxLayout(pid_group) pid_layout.addWidget(QLabel("PID")) self.ed_app_pid = QLineEdit() self.ed_app_pid.setPlaceholderText("e.g. 12345") self.ed_app_pid.setToolTip( "EN: Apply a runtime mark to an already running process.\n" "EN: Reads /proc//cgroup to get a cgroupv2 path.\n" "RU: Применить runtime-метку к уже запущенному процессу.\n" "RU: Читает /proc//cgroup чтобы получить cgroupv2 path." ) pid_layout.addWidget(self.ed_app_pid, stretch=1) self.btn_app_pick_pid = QPushButton("Pick process...") self.btn_app_pick_pid.setToolTip( "EN: Pick a running process and fill PID.\n" "RU: Выбрать запущенный процесс и заполнить PID." ) self.btn_app_pick_pid.clicked.connect(self.on_app_pick_pid) pid_layout.addWidget(self.btn_app_pick_pid) self.btn_app_mark_pid = QPushButton("Apply mark") self.btn_app_mark_pid.setToolTip( "EN: Apply routing mark to the PID (does not launch/stop the app).\n" "RU: Применить метку маршрутизации к PID (не запускает/не останавливает приложение)." ) self.btn_app_mark_pid.clicked.connect(self.on_app_mark_pid) pid_layout.addWidget(self.btn_app_mark_pid) run_layout.addWidget(pid_group) row_btn = QHBoxLayout() self.btn_app_run = QPushButton("Run + apply mark") self.btn_app_run.clicked.connect(self.on_app_run) row_btn.addWidget(self.btn_app_run) self.btn_app_refresh = QPushButton("Refresh counts") self.btn_app_refresh.clicked.connect(self.refresh_appmarks_counts) row_btn.addWidget(self.btn_app_refresh) row_btn.addStretch(1) run_layout.addLayout(row_btn) row_btn2 = QHBoxLayout() self.btn_app_clear_vpn = QPushButton("Clear VPN marks") self.btn_app_clear_vpn.clicked.connect(lambda: self.on_appmarks_clear("vpn")) row_btn2.addWidget(self.btn_app_clear_vpn) self.btn_app_clear_direct = QPushButton("Clear Direct marks") self.btn_app_clear_direct.clicked.connect( lambda: self.on_appmarks_clear("direct") ) row_btn2.addWidget(self.btn_app_clear_direct) self.btn_app_stop_last = QPushButton("Stop last scope") self.btn_app_stop_last.setToolTip( "EN: Stops the last launched systemd --user scope.\n" "RU: Останавливает последний запущенный systemd --user scope." ) self.btn_app_stop_last.clicked.connect(self.on_app_stop_last_scope) row_btn2.addWidget(self.btn_app_stop_last) self.btn_app_unmark_last = QPushButton("Unmark last") self.btn_app_unmark_last.setToolTip( "EN: Removes routing mark for the last launched scope (by cgroup id).\n" "RU: Удаляет метку маршрутизации для последнего scope (по cgroup id)." ) self.btn_app_unmark_last.clicked.connect(self.on_app_unmark_last) row_btn2.addWidget(self.btn_app_unmark_last) row_btn2.addStretch(1) run_layout.addLayout(row_btn2) self.lbl_app_counts = QLabel("Marks: —") self.lbl_app_counts.setStyleSheet("color: gray;") run_layout.addWidget(self.lbl_app_counts) self.lbl_app_last = QLabel("Last scope: —") self.lbl_app_last.setStyleSheet("color: gray;") run_layout.addWidget(self.lbl_app_last) self._refresh_last_scope_ui() tab_run = QWidget() tab_run_layout = QVBoxLayout(tab_run) tab_run_layout.addWidget(run_group) tab_run_layout.addStretch(1) self.apps_tabs.addTab(tab_run, "Run") marks_group = QGroupBox("Active runtime marks") marks_layout = QVBoxLayout(marks_group) marks_row = QHBoxLayout() self.btn_marks_refresh = QPushButton("Refresh marks") self.btn_marks_refresh.setToolTip( "EN: Reload active runtime marks from backend (prunes expired).\n" "RU: Обновить активные runtime-метки из backend (просроченные удаляются)." ) self.btn_marks_refresh.clicked.connect(self.refresh_appmarks_items) marks_row.addWidget(self.btn_marks_refresh) self.btn_marks_unmark = QPushButton("Unmark selected") self.btn_marks_unmark.setToolTip( "EN: Remove routing marks for selected items (does not necessarily stop the app).\n" "RU: Удалить метки маршрутизации для выбранных элементов (не обязательно останавливает приложение)." ) self.btn_marks_unmark.clicked.connect(self.on_appmarks_unmark_selected) marks_row.addWidget(self.btn_marks_unmark) marks_row.addStretch(1) marks_layout.addLayout(marks_row) self.lst_marks = QListWidget() self.lst_marks.setSelectionMode(QAbstractItemView.ExtendedSelection) self.lst_marks.setToolTip( "EN: Active runtime marks. Can be persistent or temporary (TTL).\n" "RU: Активные runtime-метки. Могут быть постоянными или временными (TTL)." ) self.lst_marks.setFixedHeight(140) marks_layout.addWidget(self.lst_marks) self.lbl_marks = QLabel("Active marks: —") self.lbl_marks.setStyleSheet("color: gray;") marks_layout.addWidget(self.lbl_marks) tab_marks = QWidget() tab_marks_layout = QVBoxLayout(tab_marks) tab_marks_layout.addWidget(marks_group) tab_marks_layout.addStretch(1) self.apps_tabs.addTab(tab_marks, "Marks") scopes_group = QGroupBox("Active svpn units (systemd --user)") scopes_layout = QVBoxLayout(scopes_group) scopes_row = QHBoxLayout() self.btn_scopes_refresh = QPushButton("Refresh units") self.btn_scopes_refresh.setToolTip( "EN: Refresh list of running svpn-* units (.service/.scope).\n" "RU: Обновить список запущенных svpn-* unit (.service/.scope)." ) self.btn_scopes_refresh.clicked.connect(self.refresh_running_scopes) scopes_row.addWidget(self.btn_scopes_refresh) self.btn_scopes_stop_selected = QPushButton("Stop selected") self.btn_scopes_stop_selected.setToolTip( "EN: Unmarks + stops selected units.\n" "RU: Удаляет метки + останавливает выбранные unit." ) self.btn_scopes_stop_selected.clicked.connect(self.on_scopes_stop_selected) scopes_row.addWidget(self.btn_scopes_stop_selected) self.btn_scopes_cleanup = QPushButton("Cleanup all svpn units") self.btn_scopes_cleanup.setToolTip( "EN: Unmarks + stops ALL running svpn-* units.\n" "RU: Удаляет метки + останавливает ВСЕ запущенные svpn-* unit." ) self.btn_scopes_cleanup.clicked.connect(self.on_scopes_cleanup_all) scopes_row.addWidget(self.btn_scopes_cleanup) scopes_row.addStretch(1) scopes_layout.addLayout(scopes_row) self.lst_scopes = QListWidget() self.lst_scopes.setSelectionMode(QAbstractItemView.ExtendedSelection) self.lst_scopes.setToolTip( "EN: Running svpn units. Double click to copy unit name.\n" "RU: Запущенные svpn unit. Двойной клик копирует имя unit." ) self.lst_scopes.itemDoubleClicked.connect(lambda it: self._copy_scope_unit(it)) self.lst_scopes.setFixedHeight(140) scopes_layout.addWidget(self.lst_scopes) self.lbl_scopes = QLabel("Running units: —") self.lbl_scopes.setStyleSheet("color: gray;") scopes_layout.addWidget(self.lbl_scopes) tab_units = QWidget() tab_units_layout = QVBoxLayout(tab_units) tab_units_layout.addWidget(scopes_group) tab_units_layout.addStretch(1) self.apps_tabs.addTab(tab_units, "Units") self.txt_app = QPlainTextEdit() self.txt_app.setReadOnly(True) tab_log = QWidget() tab_log_layout = QVBoxLayout(tab_log) row_log = QHBoxLayout() self.btn_app_audit = QPushButton("Run audit") self.btn_app_audit.setToolTip( "EN: Runs backend traffic audit (duplicates + nft consistency) and prints it here.\n" "RU: Запускает backend-аудит трафика (дубли + nft консистентность) и печатает сюда." ) self.btn_app_audit.clicked.connect(self.on_app_audit) row_log.addWidget(self.btn_app_audit) row_log.addStretch(1) tab_log_layout.addLayout(row_log) tab_log_layout.addWidget(self.txt_app, stretch=1) self.apps_tabs.addTab(tab_log, "Log") self.tabs.addTab(tab_apps, "Apps (runtime)") tab_adv = QWidget() tab_adv_layout = QVBoxLayout(tab_adv) adv_hint = QLabel( "Policy overrides are source-based rules (subnet/UID/cgroup).\n" "Full tunnel advanced bypass (auto-local + ingress-reply) is configured from Traffic basics." ) adv_hint.setWordWrap(True) adv_hint.setStyleSheet("color: gray;") tab_adv_layout.addWidget(adv_hint) self.ed_vpn_subnets = QPlainTextEdit() self.ed_vpn_subnets.setToolTip("""EN: Force VPN by source subnet. Useful for docker subnets when you want containers via VPN. RU: Принудительно через VPN по source subnet. Полезно для docker-подсетей, если хочешь контейнеры через VPN.""") self.ed_vpn_subnets.setPlaceholderText("Force VPN by source subnet, one per line (e.g. 172.18.0.0/16)") self.ed_vpn_subnets.setFixedHeight(72) self.ed_vpn_uids = QPlainTextEdit() self.ed_vpn_uids.setToolTip("""EN: Force VPN by UID/uidrange (host OUTPUT only). Does not affect forwarded docker traffic. RU: Принудительно через VPN по UID (только процессы хоста). На forwarded docker-трафик не влияет.""") self.ed_vpn_uids.setPlaceholderText("Force VPN by UID/UID range, one per line (e.g. 1000 or 1000-1010)") self.ed_vpn_uids.setFixedHeight(60) self.ed_vpn_cgroups = QPlainTextEdit() self.ed_vpn_cgroups.setToolTip("""EN: Force VPN by systemd cgroup. Backend resolves cgroup -> PIDs -> UID rules at apply time. RU: Принудительно через VPN по cgroup (systemd). Backend резолвит cgroup -> PID -> UID при применении.""") self.ed_vpn_cgroups.setPlaceholderText("Force VPN by cgroup path/name, one per line") self.ed_vpn_cgroups.setFixedHeight(60) self.ed_direct_subnets = QPlainTextEdit() self.ed_direct_subnets.setToolTip("""EN: Force Direct by source subnet. Useful to keep docker subnets direct in full tunnel. RU: Принудительно direct по source subnet. Полезно, чтобы docker-подсети были direct в full tunnel.""") self.ed_direct_subnets.setPlaceholderText("Force Direct by source subnet, one per line") self.ed_direct_subnets.setFixedHeight(72) self.ed_direct_uids = QPlainTextEdit() self.ed_direct_uids.setToolTip("""EN: Force Direct by UID/uidrange (host OUTPUT only). RU: Принудительно direct по UID (только процессы хоста).""") self.ed_direct_uids.setPlaceholderText("Force Direct by UID/UID range, one per line") self.ed_direct_uids.setFixedHeight(60) self.ed_direct_cgroups = QPlainTextEdit() self.ed_direct_cgroups.setToolTip("""EN: Force Direct by systemd cgroup (resolved to UID rules at apply time). RU: Принудительно direct по cgroup (резолвится в UID правила при применении).""") self.ed_direct_cgroups.setPlaceholderText("Force Direct by cgroup path/name, one per line") self.ed_direct_cgroups.setFixedHeight(60) cols = QHBoxLayout() vpn_group = QGroupBox("Force VPN") vpn_layout = QVBoxLayout(vpn_group) vpn_layout.addWidget(QLabel("Source subnets")) vpn_layout.addWidget(self.ed_vpn_subnets) vpn_layout.addWidget(QLabel("UIDs")) vpn_layout.addWidget(self.ed_vpn_uids) vpn_layout.addWidget(QLabel("Cgroups / services")) vpn_layout.addWidget(self.ed_vpn_cgroups) cols.addWidget(vpn_group, stretch=1) direct_group = QGroupBox("Force Direct") direct_layout = QVBoxLayout(direct_group) direct_layout.addWidget(QLabel("Source subnets")) direct_layout.addWidget(self.ed_direct_subnets) direct_layout.addWidget(QLabel("UIDs")) direct_layout.addWidget(self.ed_direct_uids) direct_layout.addWidget(QLabel("Cgroups / services")) direct_layout.addWidget(self.ed_direct_cgroups) cols.addWidget(direct_group, stretch=1) tab_adv_layout.addLayout(cols, stretch=1) row_adv = QHBoxLayout() self.btn_pick_detected = QPushButton("Add detected...") self.btn_pick_detected.setToolTip("""EN: Opens a selector with detected subnets/services/UIDs. Only fills fields; nothing is applied automatically. RU: Открывает список обнаруженных subnet/service/UID. Только заполняет поля; ничего не применяется автоматически.""") self.btn_pick_detected.clicked.connect(self.on_pick_detected) row_adv.addWidget(self.btn_pick_detected) self.btn_apply_overrides = QPushButton("Apply overrides") self.btn_apply_overrides.setToolTip("""EN: Applies policy rules and verifies health. On failure backend rolls back. RU: Применяет policy-rules и проверяет health. При ошибке backend делает откат.""") self.btn_apply_overrides.clicked.connect(self.on_apply_overrides) row_adv.addWidget(self.btn_apply_overrides) self.btn_reload_overrides = QPushButton("Reload overrides") self.btn_reload_overrides.clicked.connect(self.refresh_state) row_adv.addWidget(self.btn_reload_overrides) row_adv.addStretch(1) tab_adv_layout.addLayout(row_adv) self.tabs.addTab(tab_adv, "Policy overrides (Advanced)") # EN: Small status line for last action performed in this dialog. # RU: Строка статуса последнего действия в этом окне. self.lbl_action = QLabel("—") self.lbl_action.setWordWrap(True) self.lbl_action.setStyleSheet("color: gray;") root.addWidget(self.lbl_action) row_bottom = QHBoxLayout() row_bottom.addStretch(1) btn_close = QPushButton("Close") btn_close.clicked.connect(self.accept) row_bottom.addWidget(btn_close) root.addLayout(row_bottom) QtCore.QTimer.singleShot(0, self.refresh_state) QtCore.QTimer.singleShot(0, self.refresh_app_profiles) QtCore.QTimer.singleShot(0, self.refresh_appmarks_counts) QtCore.QTimer.singleShot(0, self.refresh_appmarks_items) QtCore.QTimer.singleShot(0, self.refresh_running_scopes) # EN: Auto-refresh runtime marks/units while dialog is open. # RU: Авто-обновление runtime меток/юнитов пока окно открыто. self._runtime_auto_timer = QtCore.QTimer(self) self._runtime_auto_timer.setInterval(5000) self._runtime_auto_timer.timeout.connect(self._auto_refresh_runtime) self._runtime_auto_timer.start() def _auto_refresh_runtime(self) -> None: # Keep this quiet: no modal popups. self.refresh_appmarks_counts() try: # Only refresh runtime lists when Apps(runtime) tab is visible. if int(self.tabs.currentIndex() or 0) == 1: sub = int(self.apps_tabs.currentIndex() or 0) # Profiles: keep state flags (SC/MARK/RUN) fresh. if sub == 0: self.refresh_app_profiles(quiet=True) # Marks tab. if sub == 2: self.refresh_appmarks_items(quiet=True) # Units tab. if sub == 3: self.refresh_running_scopes(quiet=True) except Exception: pass def _is_operation_error(self, message: str) -> bool: low = (message or "").strip().lower() return ("rolled back" in low) or ("apply failed" in low) or ("verification failed" in low) def _set_action_status(self, msg: str, ok: bool | None = None) -> None: text = (msg or "").strip() or "—" self.lbl_action.setText(text) if ok is True: self.lbl_action.setStyleSheet("color: green;") elif ok is False: self.lbl_action.setStyleSheet("color: red;") else: self.lbl_action.setStyleSheet("color: gray;") def _safe(self, fn, *, title: str = "Traffic mode error") -> None: try: fn() except Exception as e: msg = f"[ui-error] {title}: {e}" self._set_action_status(msg, ok=False) self._emit_log(msg) QMessageBox.critical(self, title, str(e)) def _emit_log(self, msg: str) -> None: text = (msg or "").strip() if not text: return if self.log_cb: self.log_cb(text) else: try: self.ctrl.log_gui(text) except Exception: pass def _preferred_iface_value(self) -> str: raw = self.cmb_iface.currentText().strip() if raw.lower() in ("", "auto", "-", "default"): return "" return raw def _set_preferred_iface_options(self, ifaces: list[str], selected: str) -> None: vals = ["auto"] + [x for x in ifaces if x] sel = selected.strip() if selected else "auto" if not sel: sel = "auto" if sel not in vals: vals.append(sel) self.cmb_iface.blockSignals(True) self.cmb_iface.clear() self.cmb_iface.addItems(vals) idx = self.cmb_iface.findText(sel) if idx < 0: idx = self.cmb_iface.findText("auto") if idx >= 0: self.cmb_iface.setCurrentIndex(idx) else: self.cmb_iface.setEditText(sel) self.cmb_iface.blockSignals(False) def _lines_from_text(self, txt: str) -> list[str]: out: list[str] = [] for raw in (txt or "").replace("\r", "\n").split("\n"): line = raw.strip() if line: out.append(line) return out def _set_lines(self, widget: QPlainTextEdit, vals: list[str]) -> None: widget.blockSignals(True) widget.setPlainText("\n".join([x for x in vals if str(x).strip()])) widget.blockSignals(False) def _merge_lines(self, widget: QPlainTextEdit, vals: list[str]) -> int: cur = self._lines_from_text(widget.toPlainText()) seen = {x.strip() for x in cur} added = 0 for v in (vals or []): vv = str(v).strip() if not vv or vv in seen: continue cur.append(vv) seen.add(vv) added += 1 if added > 0: self._set_lines(widget, cur) return added def _candidates_add(self, target: str, kind: str, values: list[str]) -> None: tgt = (target or "").strip().lower() k = (kind or "").strip().lower() if tgt not in ("vpn", "direct"): return widget: QPlainTextEdit | None = None if tgt == "vpn": if k == "subnet": widget = self.ed_vpn_subnets elif k == "uid": widget = self.ed_vpn_uids elif k == "cgroup": widget = self.ed_vpn_cgroups else: if k == "subnet": widget = self.ed_direct_subnets elif k == "uid": widget = self.ed_direct_uids elif k == "cgroup": widget = self.ed_direct_cgroups if widget is None: return added = self._merge_lines(widget, values or []) if added > 0: msg = f"Traffic candidates added: target={tgt} kind={k} added={added}" self._set_action_status(msg, ok=True) self._emit_log(msg) else: msg = f"Traffic candidates add: nothing new (target={tgt} kind={k})" self._set_action_status(msg, ok=None) self._emit_log(msg) def on_pick_detected(self) -> None: def work() -> None: cands = self.ctrl.traffic_candidates() existing = { "vpn": { "subnet": set(self._lines_from_text(self.ed_vpn_subnets.toPlainText())), "uid": set(self._lines_from_text(self.ed_vpn_uids.toPlainText())), "cgroup": set(self._lines_from_text(self.ed_vpn_cgroups.toPlainText())), }, "direct": { "subnet": set(self._lines_from_text(self.ed_direct_subnets.toPlainText())), "uid": set(self._lines_from_text(self.ed_direct_uids.toPlainText())), "cgroup": set(self._lines_from_text(self.ed_direct_cgroups.toPlainText())), }, } dlg = TrafficCandidatesDialog( cands, existing=existing, add_cb=self._candidates_add, parent=self, ) dlg.exec() self._safe(work, title="Traffic candidates error") def _set_mode_state( self, desired_mode: str, applied_mode: str, preferred_iface: str, advanced_active: bool, auto_local_bypass: bool, auto_local_active: bool, ingress_reply_bypass: bool, ingress_reply_active: bool, bypass_candidates: int, overrides_applied: int, cgroup_resolved_uids: int, cgroup_warning: str, healthy: bool, ingress_rule_present: bool, ingress_nft_active: bool, probe_ok: bool, probe_message: str, active_iface: str, iface_reason: str, message: str, ) -> None: desired = (desired_mode or "").strip().lower() or "selective" applied = (applied_mode or "").strip().lower() or "direct" if healthy: color = "green" health_txt = "OK" else: color = "red" health_txt = "MISMATCH" text = f"Traffic mode: {desired} (applied: {applied}) [{health_txt}]" diag_parts = [] diag_parts.append(f"preferred={preferred_iface or 'auto'}") diag_parts.append(f"advanced={'on' if advanced_active else 'off'}") diag_parts.append( f"auto_local={'on' if auto_local_bypass else 'off'}" f"({'active' if auto_local_active else 'saved'})" ) diag_parts.append( f"ingress_reply={'on' if ingress_reply_bypass else 'off'}" f"({'active' if ingress_reply_active else 'saved'})" ) if auto_local_active and bypass_candidates > 0: diag_parts.append(f"bypass_routes={bypass_candidates}") diag_parts.append(f"overrides={overrides_applied}") if cgroup_resolved_uids > 0: diag_parts.append(f"cgroup_uids={cgroup_resolved_uids}") if cgroup_warning: diag_parts.append(f"cgroup_warning={cgroup_warning}") if active_iface: diag_parts.append(f"iface={active_iface}") if iface_reason: diag_parts.append(f"source={iface_reason}") diag_parts.append( f"ingress_diag=rule:{'ok' if ingress_rule_present else 'off'}" f"/nft:{'ok' if ingress_nft_active else 'off'}" ) diag_parts.append(f"probe={'ok' if probe_ok else 'fail'}") if probe_message: diag_parts.append(probe_message) if message: diag_parts.append(message) diag = " | ".join(diag_parts) if diag_parts else "—" self.lbl_state.setText(text) self.lbl_state.setStyleSheet(f"color: {color};") self.lbl_diag.setText(diag) self.lbl_diag.setStyleSheet("color: gray;") quick = ( f"Advanced bypass: auto-local={'on' if auto_local_bypass else 'off'} " f"({('active' if auto_local_active else 'saved')}), " f"ingress-reply={'on' if ingress_reply_bypass else 'off'} " f"({('active' if ingress_reply_active else 'saved')})" ) if advanced_active: adv_color = "green" if (ingress_reply_active or auto_local_active) else "gray" self.lbl_adv_quick.setText(quick) self.lbl_adv_quick.setStyleSheet(f"color: {adv_color};") else: self.lbl_adv_quick.setText(f"{quick} | applies only in Full tunnel") self.lbl_adv_quick.setStyleSheet("color: gray;") def refresh_state(self) -> None: def work() -> None: view = self.ctrl.traffic_mode_view() mode = (view.desired_mode or "selective").strip().lower() self.rad_selective.blockSignals(True) self.rad_full.blockSignals(True) self.rad_direct.blockSignals(True) self.rad_selective.setChecked(mode == "selective") self.rad_full.setChecked(mode == "full_tunnel") self.rad_direct.setChecked(mode == "direct") self.rad_selective.blockSignals(False) self.rad_full.blockSignals(False) self.rad_direct.blockSignals(False) opts = self.ctrl.traffic_interfaces() self._set_preferred_iface_options(opts, view.preferred_iface) self._set_full_tunnel_advanced_enabled(mode) self._adv_auto_local_bypass = bool(view.auto_local_bypass) self._adv_ingress_reply_bypass = bool(view.ingress_reply_bypass) self._set_lines(self.ed_vpn_subnets, list(view.force_vpn_subnets or [])) self._set_lines(self.ed_vpn_uids, list(view.force_vpn_uids or [])) self._set_lines(self.ed_vpn_cgroups, list(view.force_vpn_cgroups or [])) self._set_lines(self.ed_direct_subnets, list(view.force_direct_subnets or [])) self._set_lines(self.ed_direct_uids, list(view.force_direct_uids or [])) self._set_lines(self.ed_direct_cgroups, list(view.force_direct_cgroups or [])) self._set_mode_state( view.desired_mode, view.applied_mode, view.preferred_iface, bool(view.advanced_active), bool(view.auto_local_bypass), bool(view.auto_local_active), bool(view.ingress_reply_bypass), bool(view.ingress_reply_active), int(view.bypass_candidates), int(view.overrides_applied), int(view.cgroup_resolved_uids), view.cgroup_warning, bool(view.healthy), bool(view.ingress_rule_present), bool(view.ingress_nft_active), bool(view.probe_ok), view.probe_message, view.active_iface, view.iface_reason, view.message, ) self._safe(work) def on_mode_toggle(self, mode: str, checked: bool) -> None: if not checked: return def work() -> None: preferred = self._preferred_iface_value() auto_local = bool(self._adv_auto_local_bypass) ingress_reply = bool(self._adv_ingress_reply_bypass) view = self.ctrl.traffic_mode_set(mode, preferred, auto_local, ingress_reply) msg = ( f"Traffic mode set: desired={view.desired_mode}, " f"applied={view.applied_mode}, iface={view.active_iface or '-'}, " f"preferred={preferred or 'auto'}, probe_ok={view.probe_ok}, " f"healthy={view.healthy}, auto_local_bypass={view.auto_local_bypass}, " f"ingress_reply_bypass={view.ingress_reply_bypass}, ingress_reply_active={view.ingress_reply_active}, " f"bypass_routes={view.bypass_candidates}, overrides={view.overrides_applied}, " f"cgroup_uids={view.cgroup_resolved_uids}, message={view.message}" ) self._emit_log(msg) op_ok = bool(view.healthy) and not self._is_operation_error(view.message) self._set_action_status( f"Traffic mode set: desired={view.desired_mode} applied={view.applied_mode} message={view.message}", ok=op_ok, ) self.refresh_state() if self.refresh_cb: self.refresh_cb() self._safe(work) def on_refresh_ifaces(self) -> None: def work() -> None: view = self.ctrl.traffic_mode_view() opts = self.ctrl.traffic_interfaces() self._set_preferred_iface_options(opts, view.preferred_iface) self._emit_log( "Traffic ifaces refreshed: " f"preferred={view.preferred_iface or 'auto'} " f"active={view.active_iface or '-'}" ) self._set_action_status("Traffic ifaces refreshed", ok=True) self.refresh_state() if self.refresh_cb: self.refresh_cb() self._safe(work, title="Traffic iface detect error") def _selected_mode(self) -> str: if self.rad_full.isChecked(): return "full_tunnel" if self.rad_direct.isChecked(): return "direct" return "selective" def _set_full_tunnel_advanced_enabled(self, mode: str) -> None: is_full = (mode or "").strip().lower() == "full_tunnel" self.btn_adv_bypass.setEnabled(True) if is_full: self.btn_adv_bypass.setText("Advanced bypass...") self.btn_adv_bypass.setStyleSheet("") else: self.btn_adv_bypass.setText("Advanced bypass... (saved only)") self.btn_adv_bypass.setStyleSheet("color: gray;") def on_show_mode_checklist(self) -> None: text = ( "Quick checklist\n\n" "1) Select mode:\n" "- Selective: safest default for mixed host/server workloads.\n" "- Full tunnel: all traffic via VPN (then review advanced bypass).\n" "- Direct: VPN policy rules disabled.\n\n" "2) For Full tunnel:\n" "- Open Advanced bypass.\n" "- Enable Auto-local bypass for LAN/container reachability.\n" "- Enable Ingress-reply bypass to keep public services reachable.\n\n" "3) Verify status line:\n" "- health must be [OK].\n" "- ingress_diag should be rule:ok/nft:ok when ingress-reply is ON.\n\n" "4) If something breaks:\n" "- Use Advanced bypass -> Reset bypass.\n" "- Or switch back to Selective and re-test." ) QMessageBox.information(self, "Traffic mode checklist", text) def on_open_advanced_bypass_dialog(self) -> None: mode = self._selected_mode() dlg = QDialog(self) dlg.setWindowTitle("Advanced bypass (Full tunnel)") dlg.setModal(True) layout = QVBoxLayout(dlg) hint = QLabel( "Applies only in Full tunnel.\n" "- Auto-local bypass: keep LAN/docker reachable.\n" "- Ingress-reply bypass: keep inbound/public services reachable." ) hint.setWordWrap(True) hint.setStyleSheet("color: gray;") layout.addWidget(hint) chk_auto = QCheckBox("Auto-local bypass (LAN/container subnets)") chk_auto.setChecked(bool(self._adv_auto_local_bypass)) chk_auto.setToolTip( "EN: Keeps LAN/container routes direct in Full tunnel.\n" "RU: Сохраняет LAN/контейнерные маршруты direct в Full tunnel." ) layout.addWidget(chk_auto) chk_ingress = QCheckBox("Ingress-reply bypass (keep public services reachable)") chk_ingress.setChecked(bool(self._adv_ingress_reply_bypass)) chk_ingress.setToolTip( "EN: Keeps replies for inbound WAN connections on main/direct route.\n" "RU: Оставляет ответы на входящие WAN-соединения по main/direct." ) layout.addWidget(chk_ingress) state = QLabel( "Current mode is Full tunnel: changes apply now." if mode == "full_tunnel" else "Current mode is not Full tunnel: changes are saved and applied later." ) state.setWordWrap(True) state.setStyleSheet("color: green;" if mode == "full_tunnel" else "color: #b07f00;") layout.addWidget(state) reset_note = QLabel( "Reset bypass = disable both toggles and apply to current mode." ) reset_note.setWordWrap(True) reset_note.setStyleSheet("color: gray;") layout.addWidget(reset_note) row = QHBoxLayout() row.addStretch(1) btn_cancel = QPushButton("Cancel") btn_reset = QPushButton("Reset bypass") btn_apply = QPushButton("Apply") row.addWidget(btn_cancel) row.addWidget(btn_reset) row.addWidget(btn_apply) layout.addLayout(row) btn_cancel.clicked.connect(dlg.reject) btn_apply.clicked.connect(dlg.accept) action = {"mode": "apply"} def on_reset_click() -> None: action["mode"] = "reset" dlg.accept() btn_reset.clicked.connect(on_reset_click) if dlg.exec() != QDialog.Accepted: return def work() -> None: if action["mode"] == "reset": view = self.ctrl.traffic_advanced_reset() self._adv_auto_local_bypass = bool(view.auto_local_bypass) self._adv_ingress_reply_bypass = bool(view.ingress_reply_bypass) self._emit_log( "Traffic advanced bypass reset: " f"mode={view.desired_mode}, auto_local={view.auto_local_bypass}, " f"ingress_reply={view.ingress_reply_bypass}, message={view.message}" ) op_ok = bool(view.healthy) and not self._is_operation_error(view.message) self._set_action_status( f"Advanced bypass reset ({view.message})", ok=op_ok, ) self.refresh_state() if self.refresh_cb: self.refresh_cb() return auto_local = bool(chk_auto.isChecked()) ingress_reply = bool(chk_ingress.isChecked()) preferred = self._preferred_iface_value() view = self.ctrl.traffic_mode_set(mode, preferred, auto_local, ingress_reply) self._adv_auto_local_bypass = bool(view.auto_local_bypass) self._adv_ingress_reply_bypass = bool(view.ingress_reply_bypass) self._emit_log( "Traffic advanced bypass set: " f"mode={view.desired_mode}, auto_local={view.auto_local_bypass}, " f"ingress_reply={view.ingress_reply_bypass}, ingress_active={view.ingress_reply_active}, " f"message={view.message}" ) op_ok = bool(view.healthy) and not self._is_operation_error(view.message) self._set_action_status( "Advanced bypass saved: " f"auto_local={'on' if view.auto_local_bypass else 'off'}, " f"ingress_reply={'on' if view.ingress_reply_bypass else 'off'} ({view.message})", ok=op_ok, ) self.refresh_state() if self.refresh_cb: self.refresh_cb() self._safe(work, title="Advanced bypass error") def on_apply_overrides(self) -> None: def work() -> None: mode = self._selected_mode() preferred = self._preferred_iface_value() auto_local = bool(self._adv_auto_local_bypass) ingress_reply = bool(self._adv_ingress_reply_bypass) vpn_subnets = self._lines_from_text(self.ed_vpn_subnets.toPlainText()) vpn_uids = self._lines_from_text(self.ed_vpn_uids.toPlainText()) vpn_cgroups = self._lines_from_text(self.ed_vpn_cgroups.toPlainText()) direct_subnets = self._lines_from_text(self.ed_direct_subnets.toPlainText()) direct_uids = self._lines_from_text(self.ed_direct_uids.toPlainText()) direct_cgroups = self._lines_from_text(self.ed_direct_cgroups.toPlainText()) view = self.ctrl.traffic_mode_set( mode, preferred, auto_local, ingress_reply, vpn_subnets, vpn_uids, vpn_cgroups, direct_subnets, direct_uids, direct_cgroups, ) msg = ( f"Traffic overrides applied: mode={view.desired_mode}, " f"auto_local={view.auto_local_bypass}, ingress_reply={view.ingress_reply_bypass}, ingress_active={view.ingress_reply_active}, " f"vpn_subnets={len(view.force_vpn_subnets)}, vpn_uids={len(view.force_vpn_uids)}, vpn_cgroups={len(view.force_vpn_cgroups)}, " f"direct_subnets={len(view.force_direct_subnets)}, direct_uids={len(view.force_direct_uids)}, direct_cgroups={len(view.force_direct_cgroups)}, " f"overrides={view.overrides_applied}, cgroup_uids={view.cgroup_resolved_uids}, " f"healthy={view.healthy}, message={view.message}" ) self._emit_log(msg) op_ok = bool(view.healthy) and not self._is_operation_error(view.message) self._set_action_status( f"Overrides applied: overrides={view.overrides_applied} message={view.message}", ok=op_ok, ) self.refresh_state() if self.refresh_cb: self.refresh_cb() self._safe(work, title="Apply overrides error") # ----------------------------------------------------------------- # Apps (runtime) tab # ----------------------------------------------------------------- def _append_app_log(self, msg: str) -> None: line = (msg or "").rstrip() if not line: return try: self.txt_app.appendPlainText(line) except Exception: pass self._emit_log(line) def _infer_app_key_from_cmdline(self, cmdline: str) -> str: # Keep this aligned with backend canonicalization (flatpak/snap/env/path). cmd = (cmdline or "").strip() if not cmd: return "" try: args = shlex.split(cmd) except Exception: args = cmd.split() return self._canonical_app_key_from_tokens(args) def _canonical_app_key_from_tokens(self, tokens: list[str]) -> str: toks = [str(x or "").strip() for x in (tokens or []) if str(x or "").strip()] if not toks: return "" def base(t: str) -> str: return os.path.basename(str(t or "").strip()) def extract_run_target(toks2: list[str]) -> str: idx = -1 for i, t in enumerate(toks2): if t == "run": idx = i break if idx < 0: return "" for j in range(idx + 1, len(toks2)): t = toks2[j].strip() if not t or t == "--": continue if t.startswith("-"): continue return t return "" primary = toks[0] b = base(primary).lower() if b == "env": # env VAR=1 /usr/bin/app ... for j in range(1, len(toks)): t = toks[j].strip() if not t or t == "--": continue if t.startswith("-"): continue if "=" in t: # VAR=VAL continue return self._canonical_app_key_from_tokens(toks[j:]) return "env" if b == "flatpak": appid = extract_run_target(toks) return f"flatpak:{appid}" if appid else "flatpak" if b == "snap": name = extract_run_target(toks) return f"snap:{name}" if name else "snap" if b == "gtk-launch" and len(toks) >= 2: did = toks[1].strip() if did and not did.startswith("-"): return f"desktop:{did}" if "/" in primary: return base(primary) or primary return primary def _ui_runtime_mark_ttl_sec(self) -> int: if bool(getattr(self, "chk_app_temporary", None)) and self.chk_app_temporary.isChecked(): return int(self.spn_app_ttl.value()) * 3600 return 0 def _is_chromium_like_cmd(self, tokens: list[str]) -> bool: toks = [str(x or "").strip() for x in (tokens or []) if str(x or "").strip()] if not toks: return False exe = os.path.basename(toks[0]).lower() known = { "google-chrome", "google-chrome-stable", "chromium", "chromium-browser", "microsoft-edge", "microsoft-edge-stable", "brave", "brave-browser", "opera", "opera-beta", "opera-developer", "vivaldi", "vivaldi-stable", } if exe in known: return True if any(x in exe for x in ("chrome", "chromium", "edge", "brave", "opera", "vivaldi")): return True # flatpak run if exe == "flatpak": for i, t in enumerate(toks): if t == "run": for cand in toks[i + 1:]: c = cand.strip().lower() if not c or c.startswith("-") or c == "--": continue return any(x in c for x in ("chrome", "chromium", "edge", "brave", "opera", "vivaldi")) break return False def _maybe_harden_browser_cmdline(self, cmdline: str) -> str: raw = (cmdline or "").strip() if not raw: return raw if not bool(getattr(self, "chk_app_browser_harden", None)) or not self.chk_app_browser_harden.isChecked(): return raw try: toks = shlex.split(raw) except Exception: return raw if not self._is_chromium_like_cmd(toks): return raw flags = [ "--disable-quic", "--force-webrtc-ip-handling-policy=disable_non_proxied_udp", ] low = [t.lower() for t in toks] changed = False for fl in flags: fl_low = fl.lower() if any(t == fl_low or t.startswith(fl_low + "=") for t in low): continue toks.append(fl) changed = True if not changed: return raw return " ".join(shlex.quote(t) for t in toks) def _launch_and_mark( self, *, cmdline: str, target: str, ttl_sec: int, app_key: str = "", ) -> None: cmdline = (cmdline or "").strip() if not cmdline: raise ValueError("empty command") tgt = (target or "").strip().lower() if tgt not in ("vpn", "direct"): raise ValueError("invalid target") ttl = int(ttl_sec or 0) if ttl <= 0: ttl = self._ui_runtime_mark_ttl_sec() ttl_log = "persistent" if ttl <= 0 else f"{ttl}s" key = (app_key or "").strip() or self._infer_app_key_from_cmdline(cmdline) run_cmdline = self._maybe_harden_browser_cmdline(cmdline) if run_cmdline != cmdline: self._append_app_log("[app] browser hardening: added anti-leak flags") # EN: If we already have a running unit for the same app_key+target, refresh mark instead of spawning. # RU: Если уже есть запущенный unit для того же app_key+target — обновляем метку, не плодим инстансы. try: items = list(self.ctrl.traffic_appmarks_items() or []) except Exception: items = [] for it in items: if (getattr(it, "target", "") or "").strip().lower() != tgt: continue if (getattr(it, "app_key", "") or "").strip() != key: continue unit = (getattr(it, "unit", "") or "").strip() if not unit: continue code, out = self._systemctl_user(["is-active", unit]) if code == 0 and (out or "").strip().lower() == "active": cg = self._effective_cgroup_for_unit_retry(unit, timeout_sec=3.0) self._append_app_log( f"[app] already running: app={key} target={tgt} unit={unit} (refreshing mark)" ) res = self.ctrl.traffic_appmarks_apply( op="add", target=tgt, cgroup=cg, unit=unit, command=run_cmdline, app_key=key, timeout_sec=ttl, ) if not res.ok: raise RuntimeError(res.message or "appmark refresh failed") self._set_action_status( f"App mark refreshed: target={tgt} cgroup_id={res.cgroup_id}", ok=True, ) self._set_last_scope( unit=unit, target=tgt, app_key=key, cmdline=run_cmdline, cgroup_id=int(res.cgroup_id or 0), ) self.refresh_appmarks_items(quiet=True) self.refresh_appmarks_counts() self.refresh_running_scopes(quiet=True) self.refresh_app_profiles(quiet=True) return unit = f"svpn-{tgt}-{int(time.time())}.service" self._append_app_log(f"[app] launching: app={key or '-'} target={tgt} ttl={ttl_log} unit={unit}") try: cg, out = self._run_systemd_unit(run_cmdline, unit=unit) except Exception as e: try: self._stop_scope_unit(unit) self._append_app_log(f"[app] fail-closed: stopped unit after launch error: {unit}") except Exception as stop_err: self._append_app_log(f"[app] fail-closed WARN: stop failed for {unit}: {stop_err}") raise RuntimeError(f"{e}\n\nUnit: {unit}") from e if out: self._append_app_log(f"[app] systemd-run:\n{out}") self._append_app_log(f"[app] ControlGroup: {cg}") res = self.ctrl.traffic_appmarks_apply( op="add", target=tgt, cgroup=cg, unit=unit, command=run_cmdline, app_key=key, timeout_sec=ttl, ) if not res.ok: stop_note = "" try: self._stop_scope_unit(unit) self._append_app_log(f"[app] fail-closed: stopped unit after mark failure: {unit}") stop_note = "\n\nUnit was stopped (fail-closed)." except Exception as stop_err: self._append_app_log(f"[app] fail-closed WARN: stop failed for {unit}: {stop_err}") stop_note = f"\n\nWARNING: failed to stop unit after mark error: {stop_err}" low = (res.message or "").lower() if "cgroupv2 path fails" in low or "no such file or directory" in low: raise RuntimeError( (res.message or "appmark apply failed") + stop_note + "\n\n" + "EN: This usually means the app didn't stay inside the new systemd unit " + "(often because it was already running). Close the app completely and run again.\n" + "RU: Обычно это значит, что приложение не осталось в новом systemd unit " + "(часто потому что оно уже было запущено). Полностью закрой приложение и запусти снова." ) raise RuntimeError((res.message or "appmark apply failed") + stop_note) timeout_txt = "persistent" if int(res.timeout_sec or 0) <= 0 else f"{res.timeout_sec}s" self._append_app_log(f"[appmarks] OK: {res.message} cgroup_id={res.cgroup_id} timeout={timeout_txt}") self._set_action_status(f"App mark added: target={tgt} cgroup_id={res.cgroup_id}", ok=True) self._set_last_scope( unit=unit, target=tgt, app_key=key, cmdline=run_cmdline, cgroup_id=int(res.cgroup_id or 0), ) self.refresh_appmarks_items(quiet=True) self.refresh_appmarks_counts() self.refresh_running_scopes(quiet=True) self.refresh_app_profiles(quiet=True) def _selected_app_profile(self): it = self.lst_app_profiles.currentItem() if not it: return None return it.data(QtCore.Qt.UserRole) def _profile_shortcuts_dir(self) -> str: # ~/.local/share/applications is the standard per-user location. return os.path.join(os.path.expanduser("~"), ".local", "share", "applications") def _profile_shortcut_path(self, profile_id: str) -> str: pid = (profile_id or "").strip() if not pid: return "" safe = re.sub(r"[^A-Za-z0-9._-]+", "-", pid).strip("-") if not safe: safe = "profile" return os.path.join(self._profile_shortcuts_dir(), f"svpn-profile-{safe}.desktop") def _profile_shortcut_exists(self, profile_id: str) -> bool: p = self._profile_shortcut_path(profile_id) return bool(p) and os.path.isfile(p) def _render_profile_shortcut(self, prof) -> str: pid = (getattr(prof, "id", "") or "").strip() name = (getattr(prof, "name", "") or "").strip() or pid or "SVPN profile" target = (getattr(prof, "target", "") or "").strip().lower() if target not in ("vpn", "direct"): target = "vpn" label_target = "VPN" if target == "vpn" else "Direct" script = os.path.abspath(os.path.join(os.path.dirname(__file__), "svpn_run_profile.py")) # Use env python3 so the shortcut works even if python3 is not /usr/bin/python3. exec_line = f"/usr/bin/env SVPN_BROWSER_HARDEN=1 python3 {script} --id {pid}" # Keep .desktop content ASCII-ish. Values are UTF-8-safe by spec, but avoid surprises. name_safe = (name or "SVPN profile").replace("\n", " ").replace("\r", " ").strip() return ( "[Desktop Entry]\n" "Version=1.0\n" "Type=Application\n" f"Name=SVPN: {name_safe} [{label_target}]\n" f"Comment=Selective VPN: run traffic profile id={pid} target={target}\n" f"Exec={exec_line}\n" "Terminal=false\n" "Categories=Network;\n" f"X-SVPN-ProfileID={pid}\n" f"X-SVPN-Target={target}\n" ) def _update_profile_shortcut_ui(self) -> None: prof = self._selected_app_profile() if prof is None: self.btn_app_profile_shortcut_create.setEnabled(False) self.btn_app_profile_shortcut_remove.setEnabled(False) self.lbl_profile_shortcut.setText("Shortcut: —") return pid = (getattr(prof, "id", "") or "").strip() path = self._profile_shortcut_path(pid) installed = self._profile_shortcut_exists(pid) self.btn_app_profile_shortcut_create.setEnabled(True) self.btn_app_profile_shortcut_remove.setEnabled(installed) state = "installed" if installed else "not installed" self.lbl_profile_shortcut.setText(f"Shortcut: {state} ({path})") def refresh_app_profiles(self, quiet: bool = False) -> None: def work() -> None: profs = list(self.ctrl.traffic_app_profiles_list() or []) self.lst_app_profiles.clear() # Best-effort runtime context for UI flags (MARK/RUN). try: mark_items = list(self.ctrl.traffic_appmarks_items() or []) except Exception: mark_items = [] marks_by_key: dict[tuple[str, str], list] = {} unit_active: dict[str, bool] = {} for it in mark_items: tgt = (getattr(it, "target", "") or "").strip().lower() key = (getattr(it, "app_key", "") or "").strip() if tgt not in ("vpn", "direct") or not key: continue marks_by_key.setdefault((tgt, key), []).append(it) unit = (getattr(it, "unit", "") or "").strip() if unit and unit not in unit_active: code, out = self._systemctl_user(["is-active", unit]) unit_active[unit] = bool(code == 0 and (out or "").strip().lower() == "active") shortcuts = 0 with_marks = 0 running = 0 for p in profs: # p is a UI-friendly dataclass from ApiClient. name = (getattr(p, "name", "") or "").strip() pid = (getattr(p, "id", "") or "").strip() target = (getattr(p, "target", "") or "").strip().lower() app_key = (getattr(p, "app_key", "") or "").strip() cmd = (getattr(p, "command", "") or "").strip() ttl_sec = int(getattr(p, "ttl_sec", 0) or 0) ttl_txt = "persistent" if ttl_sec <= 0 else f"{ttl_sec}s" label = name or pid or "(unnamed)" if target in ("vpn", "direct"): label += f" [{target}]" flags: list[str] = [] sc_path = self._profile_shortcut_path(pid) has_shortcut = bool(sc_path and os.path.isfile(sc_path)) if has_shortcut: flags.append("SC") shortcuts += 1 mkey = (target, app_key) items = marks_by_key.get(mkey) or [] if items: flags.append("MARK") with_marks += 1 run_units: list[str] = [] for it in items: unit = (getattr(it, "unit", "") or "").strip() if not unit: continue if unit_active.get(unit, False): run_units.append(unit) if run_units: flags.append("RUN") running += 1 if flags: label += " [" + ",".join(flags) + "]" it = QListWidgetItem(label) sc_state = "yes" if has_shortcut else "no" unit_txt = ", ".join(run_units[:3]) if len(run_units) > 3: unit_txt += f", +{len(run_units) - 3}" it.setToolTip( ( f"id: {pid}\n" f"app_key: {app_key}\n" f"target: {target}\n" f"ttl: {ttl_txt}\n\n" f"shortcut: {sc_state}\n" f"shortcut_path: {sc_path}\n\n" f"runtime_marks: {len(items)}\n" f"running_units: {len(run_units)}\n" f"units: {unit_txt or '-'}\n\n" f"{cmd}" ).strip() ) it.setData(QtCore.Qt.UserRole, p) self.lst_app_profiles.addItem(it) self.lbl_app_profiles.setText( f"Saved profiles: {len(profs)} (shortcut={shortcuts}, mark={with_marks}, run={running})" ) self._update_profile_shortcut_ui() if quiet: try: work() except Exception as e: self.lbl_app_profiles.setText(f"Saved profiles: error: {e}") return self._safe(work, title="Refresh profiles error") def on_app_profile_save(self) -> None: def work() -> None: cmdline = (self.ed_app_cmd.text() or "").strip() if not cmdline: QMessageBox.warning(self, "Missing command", "Please enter a command first.") return target = "vpn" if self.rad_app_vpn.isChecked() else "direct" ttl_sec = self._ui_runtime_mark_ttl_sec() name = (self.ed_app_profile_name.text() or "").strip() app_key = self._infer_app_key_from_cmdline(cmdline) res = self.ctrl.traffic_app_profile_upsert( name=name, app_key=app_key, command=cmdline, target=target, ttl_sec=ttl_sec, ) if not res.ok: self._set_action_status(f"Save profile failed: {res.message}", ok=False) raise RuntimeError(res.message or "save failed") prof = getattr(res, "profile", None) pid = (getattr(prof, "id", "") or "").strip() if prof is not None else "" self._set_action_status(f"Profile saved: {pid or '(ok)'}", ok=True) self._append_app_log(f"[profile] saved: id={pid or '-'} target={target} app={app_key or '-'}") self.refresh_app_profiles(quiet=True) # If shortcut exists already, rewrite it to reflect updated name/target. if prof is not None and pid and self._profile_shortcut_exists(pid): try: sc_path = self._profile_shortcut_path(pid) os.makedirs(os.path.dirname(sc_path), exist_ok=True) with open(sc_path, "w", encoding="utf-8", errors="replace") as f: f.write(self._render_profile_shortcut(prof)) self._append_app_log(f"[shortcut] updated: {sc_path}") except Exception as e: # Non-fatal: profile save is more important than shortcut rewrite. self._append_app_log(f"[shortcut] update failed: {e}") # Best-effort select newly saved profile. if pid: for i in range(self.lst_app_profiles.count()): it = self.lst_app_profiles.item(i) if not it: continue p = it.data(QtCore.Qt.UserRole) if (getattr(p, "id", "") or "").strip() == pid: self.lst_app_profiles.setCurrentRow(i) break self._safe(work, title="Save profile error") def on_app_profile_shortcut_create(self) -> None: prof = self._selected_app_profile() if prof is None: return pid = (getattr(prof, "id", "") or "").strip() if not pid: return def work() -> None: path = self._profile_shortcut_path(pid) if not path: raise RuntimeError("cannot derive shortcut path") os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8", errors="replace") as f: f.write(self._render_profile_shortcut(prof)) self._append_app_log(f"[shortcut] saved: {path} id={pid}") self._set_action_status(f"Shortcut saved: {os.path.basename(path)}", ok=True) self._update_profile_shortcut_ui() self._safe(work, title="Create shortcut error") def on_app_profile_shortcut_remove(self) -> None: prof = self._selected_app_profile() if prof is None: return pid = (getattr(prof, "id", "") or "").strip() if not pid: return def work() -> None: path = self._profile_shortcut_path(pid) if not path: raise RuntimeError("cannot derive shortcut path") if not os.path.exists(path): self._set_action_status("Shortcut not installed", ok=True) self._update_profile_shortcut_ui() return try: os.remove(path) except FileNotFoundError: pass self._append_app_log(f"[shortcut] removed: {path} id={pid}") self._set_action_status("Shortcut removed", ok=True) self._update_profile_shortcut_ui() self._safe(work, title="Remove shortcut error") def on_app_profile_load(self) -> None: prof = self._selected_app_profile() if prof is None: return def work() -> None: cmd = (getattr(prof, "command", "") or "").strip() target = (getattr(prof, "target", "") or "").strip().lower() ttl_sec = int(getattr(prof, "ttl_sec", 0) or 0) name = (getattr(prof, "name", "") or "").strip() if cmd: self.ed_app_cmd.setText(cmd) if target == "direct": self.rad_app_direct.setChecked(True) else: self.rad_app_vpn.setChecked(True) if ttl_sec > 0: # UI uses hours; round up. hours = max(1, (ttl_sec + 3599) // 3600) self.spn_app_ttl.setValue(int(hours)) self.chk_app_temporary.setChecked(True) else: self.chk_app_temporary.setChecked(False) self.ed_app_profile_name.setText(name) self._set_action_status("Profile loaded into form", ok=True) self._safe(work, title="Load profile error") def on_app_profile_run(self) -> None: prof = self._selected_app_profile() if prof is None: return def work() -> None: cmd = (getattr(prof, "command", "") or "").strip() target = (getattr(prof, "target", "") or "").strip().lower() ttl = int(getattr(prof, "ttl_sec", 0) or 0) app_key = (getattr(prof, "app_key", "") or "").strip() if not cmd: raise RuntimeError("profile has empty command") if target not in ("vpn", "direct"): target = "vpn" self._launch_and_mark(cmdline=cmd, target=target, ttl_sec=ttl, app_key=app_key) self._safe(work, title="Run profile error") def on_app_profile_delete(self) -> None: prof = self._selected_app_profile() if prof is None: return pid = (getattr(prof, "id", "") or "").strip() if not pid: return def work() -> None: if QMessageBox.question( self, "Delete profile", f"Delete saved profile?\n\nid={pid}", ) != QMessageBox.StandardButton.Yes: return res = self.ctrl.traffic_app_profile_delete(pid) if not res.ok: self._set_action_status(f"Delete profile failed: {res.message}", ok=False) raise RuntimeError(res.message or "delete failed") # Keep a tight coupling: deleting profile removes its shortcut too. sc_path = self._profile_shortcut_path(pid) if sc_path and os.path.exists(sc_path): try: os.remove(sc_path) self._append_app_log(f"[shortcut] removed: {sc_path} (profile deleted)") except Exception as e: self._append_app_log(f"[shortcut] remove failed: {e}") self._set_action_status(f"Profile deleted: {pid}", ok=True) self._append_app_log(f"[profile] deleted: id={pid}") self.refresh_app_profiles(quiet=True) self._safe(work, title="Delete profile error") def refresh_appmarks_counts(self) -> None: try: st = self.ctrl.traffic_appmarks_status() self.lbl_app_counts.setText( f"Marks: VPN={st.vpn_count}, Direct={st.direct_count}" ) except Exception as e: self.lbl_app_counts.setText(f"Marks: error: {e}") def refresh_appmarks_items(self, quiet: bool = False) -> None: def work() -> None: items = list(self.ctrl.traffic_appmarks_items() or []) self.lst_marks.clear() vpn = 0 direct = 0 for it in items: tgt = (getattr(it, "target", "") or "").strip().lower() if tgt == "vpn": vpn += 1 elif tgt == "direct": direct += 1 mid = int(getattr(it, "id", 0) or 0) app_key = (getattr(it, "app_key", "") or "").strip() unit = (getattr(it, "unit", "") or "").strip() cmd = (getattr(it, "command", "") or "").strip() rem = int(getattr(it, "remaining_sec", 0) or 0) if rem < 0: rem_txt = "persistent" else: rem_h = rem // 3600 rem_m = (rem % 3600) // 60 rem_s = rem % 60 rem_txt = f"ttl {rem_h:02d}:{rem_m:02d}:{rem_s:02d}" label = f"{tgt} {app_key or unit or mid} ({rem_txt})" q = QListWidgetItem(label) q.setToolTip( ( f"id: {mid}\n" f"target: {tgt}\n" f"app_key: {app_key}\n" f"unit: {unit}\n" f"remaining: {('persistent' if rem < 0 else str(rem) + 's')}\n\n" f"{cmd}" ).strip() ) q.setData(QtCore.Qt.UserRole, it) self.lst_marks.addItem(q) self.lbl_marks.setText(f"Active marks: {len(items)} (VPN={vpn}, Direct={direct})") self.btn_marks_unmark.setEnabled(self.lst_marks.count() > 0) if quiet: try: work() except Exception as e: self.lbl_marks.setText(f"Active marks: error: {e}") return self._safe(work, title="Refresh marks error") def on_appmarks_unmark_selected(self) -> None: sel = list(self.lst_marks.selectedItems() or []) if not sel: return # Convert selection to (target,id). pairs: list[tuple[str, int]] = [] for it in sel: obj = it.data(QtCore.Qt.UserRole) tgt = (getattr(obj, "target", "") or "").strip().lower() mid = int(getattr(obj, "id", 0) or 0) if tgt in ("vpn", "direct") and mid > 0: pairs.append((tgt, mid)) if not pairs: return def work() -> None: if QMessageBox.question( self, "Unmark selected", "Remove routing marks for selected items?\n\n" + "\n".join([f"{t}:{i}" for (t, i) in pairs[:20]]) + ("\n..." if len(pairs) > 20 else ""), ) != QMessageBox.StandardButton.Yes: return for (tgt, mid) in pairs: res = self.ctrl.traffic_appmarks_apply(op="del", target=tgt, cgroup=str(mid)) if not res.ok: raise RuntimeError(res.message or f"unmark failed: {tgt}:{mid}") self._set_action_status(f"Unmarked: {len(pairs)} item(s)", ok=True) self.refresh_appmarks_items(quiet=True) self.refresh_appmarks_counts() self.refresh_app_profiles(quiet=True) self._safe(work, title="Unmark selected error") def _refresh_last_scope_ui(self) -> None: unit = (self._last_app_unit or "").strip() target = (self._last_app_target or "").strip().lower() app_key = (self._last_app_key or "").strip() cg_id = int(self._last_app_cgroup_id or 0) parts = [] if unit: parts.append(f"unit={unit}") if target in ("vpn", "direct"): parts.append(f"target={target}") if app_key: parts.append(f"app={app_key}") if cg_id > 0: parts.append(f"cgroup_id={cg_id}") if parts: self.lbl_app_last.setText("Last scope: " + " | ".join(parts)) else: self.lbl_app_last.setText("Last scope: —") self.btn_app_stop_last.setEnabled(bool(unit)) self.btn_app_unmark_last.setEnabled(target in ("vpn", "direct") and cg_id > 0) def _set_last_scope( self, *, unit: str = "", target: str = "", app_key: str = "", cmdline: str = "", cgroup_id: int = 0, ) -> None: self._last_app_unit = str(unit or "").strip() self._last_app_target = str(target or "").strip().lower() self._last_app_key = str(app_key or "").strip() self._last_app_cmdline = str(cmdline or "").strip() try: self._last_app_cgroup_id = int(cgroup_id or 0) except Exception: self._last_app_cgroup_id = 0 self._settings.setValue("traffic_app_last_unit", self._last_app_unit) self._settings.setValue("traffic_app_last_target", self._last_app_target) self._settings.setValue("traffic_app_last_key", self._last_app_key) self._settings.setValue("traffic_app_last_cmdline", self._last_app_cmdline) self._settings.setValue("traffic_app_last_cgroup_id", int(self._last_app_cgroup_id or 0)) self._refresh_last_scope_ui() def on_app_pick(self) -> None: def work() -> None: dlg = AppPickerDialog(parent=self) if dlg.exec() != QDialog.Accepted: return cmd = (dlg.selected_command() or "").strip() if not cmd: return self.ed_app_cmd.setText(cmd) self._append_app_log(f"[picker] command filled: {cmd}") self._safe(work, title="App picker error") def on_app_audit(self) -> None: def work() -> None: audit = self.ctrl.traffic_audit() pretty = (getattr(audit, "pretty", "") or "").strip() if not pretty: pretty = f"ok={getattr(audit, 'ok', False)} message={getattr(audit, 'message', '')}" self._append_app_log("[audit]\n" + pretty) issues = list(getattr(audit, "issues", []) or []) ok = bool(getattr(audit, "ok", False)) and len(issues) == 0 if issues: self._set_action_status(f"Audit: issues={len(issues)}", ok=False) else: self._set_action_status("Audit: OK", ok=True) self._safe(work, title="Traffic audit error") def _run_systemd_unit(self, cmdline: str, *, unit: str) -> tuple[str, str]: args = shlex.split(cmdline or "") if not args: raise ValueError("empty command") run_cmd = [ "systemd-run", "--user", "--unit", unit, "--collect", "--same-dir", ] + args try: p = subprocess.run( run_cmd, capture_output=True, text=True, check=False, timeout=6, ) except subprocess.TimeoutExpired as e: raise RuntimeError( "systemd-run timed out (UI would freeze without this guard). " "Try again or verify that systemd --user is responsive.\n\n" f"command: {' '.join(run_cmd)}" ) from e out = ((p.stdout or "") + (p.stderr or "")).strip() if p.returncode != 0: raise RuntimeError(f"systemd-run failed: {p.returncode}\n{out}".strip()) # EN: Some apps can be migrated into a different app scope by the desktop/session # EN: integration. Using unit ControlGroup is then incorrect. Prefer reading the # EN: effective cgroup from the unit MainPID (/proc//cgroup) and fall back # EN: to systemctl ControlGroup only if needed. # RU: Некоторые приложения могут мигрировать в другой app scope (интеграция # RU: с desktop/session). Тогда ControlGroup юнита неверен. Предпочитаем читать # RU: реальный cgroup по MainPID (/proc//cgroup) и только потом fallback # RU: на systemctl ControlGroup. cg = self._effective_cgroup_for_unit_retry(unit, timeout_sec=3.0) return cg, out def _effective_cgroup_for_unit_retry(self, unit: str, *, timeout_sec: float = 2.0) -> str: u = (unit or "").strip() if not u: raise ValueError("empty unit") deadline = time.time() + max(0.2, float(timeout_sec or 0)) last_pid_out = "" while time.time() < deadline: code, out = self._systemctl_user(["show", "-p", "MainPID", "--value", u]) last_pid_out = out or "" if code == 0: try: pid = int((out or "").strip() or "0") except Exception: pid = 0 if pid > 0: cg = self._cgroup_path_from_pid(pid) if cg: return cg low = (out or "").lower() if "could not be found" in low or "not found" in low: break time.sleep(0.1) # Fallback: unit ControlGroup (may be wrong for migrated apps). try: cg = self._control_group_for_unit_retry(u, timeout_sec=1.0) if cg: return cg except Exception: pass raise RuntimeError( ( "failed to query effective cgroup\n" + (last_pid_out.strip() or "(no output)") + "\n\n" + "EN: Could not resolve unit MainPID->/proc//cgroup.\n" + "RU: Не удалось получить MainPID и прочитать /proc//cgroup." ).strip() ) def _cgroup_path_from_pid(self, pid: int) -> str: p = int(pid or 0) if p <= 0: return "" try: with open(f"/proc/{p}/cgroup", "r", encoding="utf-8", errors="replace") as f: for raw in f: line = (raw or "").strip() if not line: continue if line.startswith("0::"): cg = line[len("0::") :].strip() return cg except Exception: return "" return "" def _cmdline_from_pid(self, pid: int) -> str: p = int(pid or 0) if p <= 0: return "" try: with open(f"/proc/{p}/cmdline", "rb") as f: raw = f.read() or b"" parts = [x for x in raw.split(b"\x00") if x] out = " ".join([x.decode("utf-8", errors="replace") for x in parts]) return out.strip() except Exception: return "" def _exe_from_pid(self, pid: int) -> str: p = int(pid or 0) if p <= 0: return "" try: return os.readlink(f"/proc/{p}/exe").strip() except Exception: return "" def on_app_pick_pid(self) -> None: def work() -> None: dlg = ProcessPickerDialog(parent=self) if dlg.exec() != QDialog.Accepted: return ent = dlg.selected_entry() if ent is None: return self.ed_app_pid.setText(str(int(ent.pid or 0))) self._append_app_log( f"[picker] pid selected: pid={ent.pid} user={ent.user or ent.uid} comm={ent.comm or '-'} exe={ent.exe or '-'}" ) self._safe(work, title="Process picker error") def _control_group_for_unit_retry(self, unit: str, *, timeout_sec: float = 2.0) -> str: u = (unit or "").strip() if not u: raise ValueError("empty unit") deadline = time.time() + max(0.2, float(timeout_sec or 0)) last_out = "" while time.time() < deadline: code, out = self._systemctl_user(["show", "-p", "ControlGroup", "--value", u]) last_out = out or "" if code == 0: cg = (out or "").strip() if cg: return cg low = (out or "").lower() if "could not be found" in low or "not found" in low or "loaded units listed" in low: break time.sleep(0.1) # Provide a more actionable error for users. code_s, out_s = self._systemctl_user(["status", u, "--no-pager", "--plain"]) status_txt = (out_s or "").strip() hint = ( "EN: Scope unit may have exited immediately. If the app is already running, " "this launch may not create a new process tree inside the scope.\n" "EN: Try closing the app полностью and запуск again from here.\n" "RU: Scope мог завершиться сразу. Если приложение уже запущено, повторный запуск " "может не создать новый процесс внутри scope.\n" "RU: Попробуй полностью закрыть приложение и запустить снова отсюда." ) raise RuntimeError( ( "failed to query ControlGroup\n" + (last_out.strip() or "(no output)") + ("\n\nstatus:\n" + status_txt if status_txt else "") + "\n\n" + hint ).strip() ) def on_app_mark_pid(self) -> None: def work() -> None: raw = (self.ed_app_pid.text() or "").strip() if not raw: QMessageBox.warning(self, "Missing PID", "Please enter a PID first.") return try: pid = int(raw) except Exception: QMessageBox.warning(self, "Invalid PID", f"PID must be an integer: {raw!r}") return if pid <= 0: QMessageBox.warning(self, "Invalid PID", f"PID must be > 0: {pid}") return cg = self._cgroup_path_from_pid(pid) if not cg: raise RuntimeError(f"failed to read cgroup for pid={pid} (process may not exist)") cmdline = self._cmdline_from_pid(pid) or f"pid={pid}" # Prefer command-derived key to align with Profiles/Run behavior (less duplication). app_key = self._infer_app_key_from_cmdline(cmdline) if not app_key: app_key = self._exe_from_pid(pid) if not app_key: app_key = f"pid:{pid}" target = "vpn" if self.rad_app_vpn.isChecked() else "direct" ttl_sec = self._ui_runtime_mark_ttl_sec() ttl_txt = "persistent" if ttl_sec <= 0 else f"{ttl_sec}s" self._append_app_log(f"[pid] mark: pid={pid} target={target} ttl={ttl_txt}") self._append_app_log(f"[pid] cgroup: {cg}") if cmdline: self._append_app_log(f"[pid] cmdline: {cmdline}") res = self.ctrl.traffic_appmarks_apply( op="add", target=target, cgroup=cg, unit="", command=cmdline, app_key=app_key, timeout_sec=ttl_sec, ) if not res.ok: self._append_app_log(f"[pid] ERROR: {res.message}") self._set_action_status(f"PID mark failed: {res.message}", ok=False) QMessageBox.critical(self, "Mark PID error", res.message or "mark failed") return res_timeout_txt = "persistent" if int(res.timeout_sec or 0) <= 0 else f"{res.timeout_sec}s" self._append_app_log(f"[pid] OK: {res.message} cgroup_id={res.cgroup_id} timeout={res_timeout_txt}") self._set_action_status(f"PID marked: target={target} cgroup_id={res.cgroup_id}", ok=True) self._set_last_scope(unit="", target=target, app_key=app_key, cmdline=cmdline, cgroup_id=int(res.cgroup_id or 0)) self.refresh_appmarks_counts() self.refresh_appmarks_items(quiet=True) self.refresh_app_profiles(quiet=True) self._safe(work, title="Mark PID error") def on_app_run(self) -> None: def work() -> None: cmdline = (self.ed_app_cmd.text() or "").strip() if not cmdline: QMessageBox.warning(self, "Missing command", "Please enter a command to run.") return target = "vpn" if self.rad_app_vpn.isChecked() else "direct" ttl_sec = self._ui_runtime_mark_ttl_sec() app_key = self._infer_app_key_from_cmdline(cmdline) self._launch_and_mark(cmdline=cmdline, target=target, ttl_sec=ttl_sec, app_key=app_key) self._safe(work, title="Run app error") def on_app_stop_last_scope(self) -> None: unit = (self._last_app_unit or "").strip() if not unit: return def work() -> None: self._append_app_log(f"[app] stop last scope: unit={unit}") p = subprocess.run( ["systemctl", "--user", "stop", unit], capture_output=True, text=True, check=False, ) out = ((p.stdout or "") + (p.stderr or "")).strip() if p.returncode == 0: self._append_app_log("[app] stopped OK") if out: self._append_app_log(out) self._set_action_status(f"Stopped scope: {unit}", ok=True) self.refresh_running_scopes(quiet=True) self.refresh_app_profiles(quiet=True) return self._append_app_log(f"[app] stop failed: rc={p.returncode}") if out: self._append_app_log(out) # fallback: kill p2 = subprocess.run( ["systemctl", "--user", "kill", unit], capture_output=True, text=True, check=False, ) out2 = ((p2.stdout or "") + (p2.stderr or "")).strip() if p2.returncode == 0: self._append_app_log("[app] kill OK (fallback)") if out2: self._append_app_log(out2) self._set_action_status(f"Killed scope: {unit}", ok=True) self.refresh_running_scopes(quiet=True) self.refresh_app_profiles(quiet=True) return self._append_app_log(f"[app] kill failed: rc={p2.returncode}") if out2: self._append_app_log(out2) self._set_action_status(f"Stop scope failed: {unit}", ok=False) QMessageBox.critical(self, "Stop scope error", out2 or out or "stop failed") self._safe(work, title="Stop scope error") def on_app_unmark_last(self) -> None: unit = (self._last_app_unit or "").strip() target = (self._last_app_target or "").strip().lower() cg_id = int(self._last_app_cgroup_id or 0) if target not in ("vpn", "direct") or cg_id <= 0: return def work() -> None: if QMessageBox.question( self, "Unmark last", f"Remove routing mark for last item?\n\nunit={unit or '-'}\ntarget={target}\ncgroup_id={cg_id}", ) != QMessageBox.StandardButton.Yes: return res = self.ctrl.traffic_appmarks_apply( op="del", target=target, cgroup=str(cg_id), ) if res.ok: self._append_app_log(f"[appmarks] unmarked: target={target} cgroup_id={cg_id}") self._set_action_status(f"Unmarked last: target={target} cgroup_id={cg_id}", ok=True) else: self._append_app_log(f"[appmarks] unmark error: {res.message}") self._set_action_status(f"Unmark last failed: {res.message}", ok=False) QMessageBox.critical(self, "Unmark error", res.message or "unmark failed") self.refresh_appmarks_counts() self.refresh_app_profiles(quiet=True) self._safe(work, title="Unmark error") def on_appmarks_clear(self, target: str) -> None: tgt = (target or "").strip().lower() if tgt not in ("vpn", "direct"): return def work() -> None: if QMessageBox.question( self, "Clear marks", f"Clear ALL runtime app marks for target={tgt}?", ) != QMessageBox.StandardButton.Yes: return res = self.ctrl.traffic_appmarks_apply(op="clear", target=tgt) if res.ok: self._append_app_log(f"[appmarks] cleared: target={tgt}") self._set_action_status(f"App marks cleared: target={tgt}", ok=True) else: self._append_app_log(f"[appmarks] clear error: {res.message}") self._set_action_status( f"App marks clear failed: target={tgt} ({res.message})", ok=False, ) QMessageBox.critical( self, "Clear marks error", res.message or "unknown error" ) self.refresh_appmarks_counts() self.refresh_appmarks_items(quiet=True) self.refresh_app_profiles(quiet=True) self._safe(work, title="Clear app marks error") # ----------------------------------------------------------------- # Scopes list + cleanup (runtime) # ----------------------------------------------------------------- def _scope_target_from_unit(self, unit: str) -> str: u = (unit or "").strip() if not u.startswith("svpn-"): return "" rest = u[len("svpn-") :] if rest.startswith("vpn-"): return "vpn" if rest.startswith("direct-"): return "direct" return "" def _systemctl_user(self, args: list[str]) -> tuple[int, str]: cmd = ["systemctl", "--user"] + list(args or []) try: p = subprocess.run( cmd, capture_output=True, text=True, check=False, timeout=4, ) out = ((p.stdout or "") + (p.stderr or "")).strip() return int(p.returncode or 0), out except subprocess.TimeoutExpired: return 124, f"timeout running: {' '.join(cmd)}" def _list_running_svpn_units(self) -> list[str]: code, out = self._systemctl_user( [ "list-units", "--type=scope", "--type=service", "--state=running", "--no-legend", "--no-pager", "--plain", ] ) if code != 0: raise RuntimeError(out or f"systemctl list-units failed: rc={code}") units: list[str] = [] for raw in (out or "").splitlines(): line = raw.strip() if not line: continue fields = line.split() if not fields: continue unit = fields[0].strip() if unit.startswith("svpn-") and (unit.endswith(".scope") or unit.endswith(".service")): units.append(unit) units.sort() return units def _control_group_for_unit(self, unit: str) -> str: code, out = self._systemctl_user(["show", "-p", "ControlGroup", "--value", unit]) if code != 0: raise RuntimeError(out or f"systemctl show failed: rc={code}") return (out or "").strip() def _cgroup_inode_id(self, cgroup_path: str) -> int: cg = (cgroup_path or "").strip() if not cg: return 0 rel = cg.lstrip("/") if not rel: return 0 full = os.path.join("/sys/fs/cgroup", rel) try: st = os.stat(full) return int(getattr(st, "st_ino", 0) or 0) except Exception: return 0 def refresh_running_scopes(self, *, quiet: bool = False) -> None: def work() -> None: units = self._list_running_svpn_units() self.lst_scopes.clear() for unit in units: target = self._scope_target_from_unit(unit) cg = "" cg_id = 0 try: # Prefer effective cgroup via MainPID (/proc//cgroup) for services. # Some GUI apps can migrate into a different app scope after launch. if unit.endswith(".service"): code, out = self._systemctl_user(["show", "-p", "MainPID", "--value", unit]) if code == 0: try: pid = int((out or "").strip() or "0") except Exception: pid = 0 if pid > 0: cg = self._cgroup_path_from_pid(pid) if not cg: cg = self._control_group_for_unit(unit) cg_id = self._cgroup_inode_id(cg) except Exception: pass label = unit if target in ("vpn", "direct"): label += f" [{target.upper()}]" it = QListWidgetItem(label) info = RuntimeScopeInfo( unit=unit, target=target or "?", cgroup_path=cg, cgroup_id=int(cg_id or 0), ) it.setData(QtCore.Qt.UserRole, info) it.setToolTip( f"Unit: {unit}\n" f"Target: {target or '-'}\n" f"ControlGroup: {cg or '-'}\n" f"cgroup_id: {cg_id or '-'}\n\n" "EN: Stop selected will unmark by cgroup_id (if known) and stop the unit.\n" "RU: Stop selected удалит метку по cgroup_id (если известен) и остановит unit." ) self.lst_scopes.addItem(it) self.lbl_scopes.setText(f"Running units: {len(units)}") has_any = self.lst_scopes.count() > 0 self.btn_scopes_stop_selected.setEnabled(has_any) self.btn_scopes_cleanup.setEnabled(has_any) if quiet: try: work() except Exception: # Quiet mode: avoid modal popups on background refresh. pass return self._safe(work, title="Units refresh error") def _copy_scope_unit(self, it: QListWidgetItem) -> None: unit = "" info = it.data(QtCore.Qt.UserRole) if it else None if isinstance(info, RuntimeScopeInfo): unit = info.unit else: unit = (it.text() or "").split()[0].strip() if it else "" if not unit: return try: QtGui.QGuiApplication.clipboard().setText(unit) except Exception: pass self._append_app_log(f"[unit] copied unit: {unit}") self._set_action_status(f"Copied unit: {unit}", ok=True) def _stop_scope_unit(self, unit: str) -> None: u = (unit or "").strip() if not u: return code, out = self._systemctl_user(["stop", u]) if code == 0: if out: self._append_app_log(out) return code2, out2 = self._systemctl_user(["kill", u]) if code2 != 0: raise RuntimeError(out2 or out or f"stop/kill failed: {u}") def _unmark_scope(self, info: RuntimeScopeInfo) -> None: target = (info.target or "").strip().lower() cg_id = int(info.cgroup_id or 0) if target not in ("vpn", "direct") or cg_id <= 0: return res = self.ctrl.traffic_appmarks_apply( op="del", target=target, cgroup=str(cg_id), ) if not res.ok: raise RuntimeError(res.message or "unmark failed") def _selected_scope_infos(self) -> list[RuntimeScopeInfo]: infos: list[RuntimeScopeInfo] = [] for it in self.lst_scopes.selectedItems() or []: info = it.data(QtCore.Qt.UserRole) if it else None if isinstance(info, RuntimeScopeInfo): infos.append(info) return infos def on_scopes_stop_selected(self) -> None: def work() -> None: infos = self._selected_scope_infos() if not infos: QMessageBox.information(self, "No selection", "Select one or more units first.") return if QMessageBox.question( self, "Stop selected", f"Unmark + stop {len(infos)} selected unit(s)?", ) != QMessageBox.StandardButton.Yes: return for info in infos: self._append_app_log( f"[unit] stop: unit={info.unit} target={info.target} cgroup_id={info.cgroup_id}" ) try: self._unmark_scope(info) self._append_app_log("[unit] unmark OK") except Exception as e: self._append_app_log(f"[unit] unmark WARN: {e}") self._stop_scope_unit(info.unit) self._append_app_log("[unit] stop OK") self.refresh_running_scopes() self.refresh_appmarks_counts() self.refresh_appmarks_items(quiet=True) self.refresh_app_profiles(quiet=True) self._set_action_status(f"Stopped units: {len(infos)}", ok=True) self._safe(work, title="Stop selected units error") def on_scopes_cleanup_all(self) -> None: def work() -> None: units = self._list_running_svpn_units() if not units: self._set_action_status("No running svpn units", ok=True) self.refresh_running_scopes() return if QMessageBox.question( self, "Cleanup all", f"Unmark + stop ALL running svpn units ({len(units)})?", ) != QMessageBox.StandardButton.Yes: return stopped = 0 for unit in units: target = self._scope_target_from_unit(unit) cg = "" cg_id = 0 try: cg = self._control_group_for_unit(unit) cg_id = self._cgroup_inode_id(cg) except Exception: pass info = RuntimeScopeInfo( unit=unit, target=target or "?", cgroup_path=cg, cgroup_id=int(cg_id or 0), ) self._append_app_log( f"[unit] cleanup: unit={info.unit} target={info.target} cgroup_id={info.cgroup_id}" ) try: self._unmark_scope(info) self._append_app_log("[unit] unmark OK") except Exception as e: self._append_app_log(f"[unit] unmark WARN: {e}") try: self._stop_scope_unit(info.unit) stopped += 1 except Exception as e: self._append_app_log(f"[unit] stop ERROR: {e}") self.refresh_running_scopes() self.refresh_appmarks_counts() self.refresh_appmarks_items(quiet=True) self.refresh_app_profiles(quiet=True) self._set_action_status(f"Cleanup done: stopped={stopped}/{len(units)}", ok=True) self._safe(work, title="Cleanup units error") def on_rollback(self) -> None: def work() -> None: res = self.ctrl.routes_clear() self._emit_log(res.pretty_text or "rollback done") self._set_action_status(res.pretty_text or "routes cleared (cache saved)", ok=bool(res.ok)) self.refresh_state() if self.refresh_cb: self.refresh_cb() self._safe(work, title="Rollback error") def on_restore_cache(self) -> None: def work() -> None: res = self.ctrl.routes_cache_restore() self._emit_log(res.pretty_text or "cache restore done") self._set_action_status(res.pretty_text or "routes restored from cache", ok=bool(res.ok)) self.refresh_state() if self.refresh_cb: self.refresh_cb() self._safe(work, title="Restore cache error") class TrafficCandidatesDialog(QDialog): def __init__( self, candidates, *, existing: dict[str, dict[str, set[str]]] | None = None, add_cb: Callable[[str, str, list[str]], None], parent=None, ) -> None: super().__init__(parent) self.cands = candidates self.add_cb = add_cb self.existing = existing or {"vpn": {}, "direct": {}} self.setWindowTitle("Add detected overrides") self.resize(820, 680) root = QVBoxLayout(self) note = QLabel( "Tip: hover list items for details. Подсказка: наведи на элементы списка.\n" "Detect results from backend. Nothing is applied until you click Apply overrides." ) note.setWordWrap(True) note.setStyleSheet("color: gray;") root.addWidget(note) self.chk_hide_existing = QCheckBox("Hide already added") self.chk_hide_existing.setToolTip( """EN: Hides items that are already present in Force VPN/Force Direct fields. RU: Скрывает элементы, которые уже есть в Force VPN/Force Direct.""" ) self.chk_hide_existing.stateChanged.connect(lambda _s: self._refilter_current()) root.addWidget(self.chk_hide_existing) self.tabs = QTabWidget() root.addWidget(self.tabs, stretch=1) self._tab_kind: dict[QWidget, str] = {} self._tab_list: dict[QWidget, QListWidget] = {} self._tab_filter: dict[QWidget, QLineEdit] = {} self._list_kind: dict[QListWidget, str] = {} self._list_title: dict[QListWidget, str] = {} self.tabs.currentChanged.connect(lambda _idx: self._refilter_current()) self._build_subnets_tab() self._build_services_tab() self._build_uids_tab() row = QHBoxLayout() btn_vpn = QPushButton("Add to Force VPN") btn_vpn.clicked.connect(lambda: self._add_selected("vpn")) row.addWidget(btn_vpn) btn_direct = QPushButton("Add to Force Direct") btn_direct.clicked.connect(lambda: self._add_selected("direct")) row.addWidget(btn_direct) row.addStretch(1) btn_close = QPushButton("Close") btn_close.clicked.connect(self.accept) row.addWidget(btn_close) root.addLayout(row) self.lbl_status = QLabel("—") self.lbl_status.setWordWrap(True) self.lbl_status.setStyleSheet("color: gray;") root.addWidget(self.lbl_status) def _mark_state(self, kind: str, value: str) -> tuple[bool, bool]: k = (kind or "").strip().lower() v = (value or "").strip() if not k or not v: return False, False in_vpn = v in (self.existing.get("vpn", {}).get(k, set()) or set()) in_direct = v in (self.existing.get("direct", {}).get(k, set()) or set()) return bool(in_vpn), bool(in_direct) def _set_status(self, msg: str, ok: bool | None = None) -> None: text = (msg or "").strip() or "—" self.lbl_status.setText(text) if ok is True: self.lbl_status.setStyleSheet("color: green;") elif ok is False: self.lbl_status.setStyleSheet("color: red;") else: self.lbl_status.setStyleSheet("color: gray;") def _apply_filter(self, lst: QListWidget, query: str) -> None: q = (query or "").strip().lower() hide_existing = bool(self.chk_hide_existing.isChecked()) kind = (self._list_kind.get(lst) or "").strip().lower() # Subnets-only quick filters. allow_lan = True allow_docker = True allow_link = True allow_linkdown = True if kind == "subnet": allow_lan = bool(getattr(self, "chk_sub_show_lan", None) and self.chk_sub_show_lan.isChecked()) allow_docker = bool(getattr(self, "chk_sub_show_docker", None) and self.chk_sub_show_docker.isChecked()) allow_link = bool(getattr(self, "chk_sub_show_link", None) and self.chk_sub_show_link.isChecked()) allow_linkdown = not bool(getattr(self, "chk_sub_hide_linkdown", None) and self.chk_sub_hide_linkdown.isChecked()) for i in range(lst.count()): it = lst.item(i) if not it: continue if hide_existing and bool(it.data(QtCore.Qt.UserRole + 1) or False): it.setHidden(True) continue if kind == "subnet": it_kind = str(it.data(QtCore.Qt.UserRole + 4) or "").strip().lower() it_linkdown = bool(it.data(QtCore.Qt.UserRole + 5) or False) if it_linkdown and not allow_linkdown: it.setHidden(True) continue if it_kind == "docker" and not allow_docker: it.setHidden(True) continue if it_kind == "lan" and not allow_lan: it.setHidden(True) continue if it_kind == "link" and not allow_link: it.setHidden(True) continue if not q: it.setHidden(False) continue it.setHidden(q not in it.text().lower()) def _refilter_current(self) -> None: tab = self.tabs.currentWidget() if tab is None: return lst = self._tab_list.get(tab) filt = self._tab_filter.get(tab) if lst is None or filt is None: return self._apply_filter(lst, filt.text()) def _filter_for_title(self, title: str) -> QLineEdit | None: for i in range(self.tabs.count()): if self.tabs.tabText(i) == title: tab = self.tabs.widget(i) return self._tab_filter.get(tab) return None def _preset_set_filter(self, title: str, text: str) -> None: filt = self._filter_for_title(title) if filt is not None: filt.setText(text) def _preset_filter_subnets(self, *, lan: bool, docker: bool, link: bool) -> None: # EN: "Filter LAN/Docker" buttons should not rely on text search; they should # EN: toggle the kind checkboxes and clear the text filter. # RU: Кнопки "Filter LAN/Docker" не должны полагаться на текстовый поиск; # RU: они должны переключать чекбоксы видов и очищать текстовый фильтр. chk_lan = getattr(self, "chk_sub_show_lan", None) chk_docker = getattr(self, "chk_sub_show_docker", None) chk_link = getattr(self, "chk_sub_show_link", None) if chk_lan is None or chk_docker is None or chk_link is None: self._preset_set_filter("Subnets", "") self._refilter_current() return for chk, val in ( (chk_lan, lan), (chk_docker, docker), (chk_link, link), ): try: chk.blockSignals(True) chk.setChecked(bool(val)) finally: chk.blockSignals(False) self._preset_set_filter("Subnets", "") self._refilter_current() def _update_item_render(self, it: QListWidgetItem, kind: str) -> None: value = str(it.data(QtCore.Qt.UserRole) or "").strip() base_label = str(it.data(QtCore.Qt.UserRole + 2) or it.text() or "") base_tip = str(it.data(QtCore.Qt.UserRole + 3) or it.toolTip() or "") in_vpn, in_direct = self._mark_state(kind, value) flags = [] if in_vpn: flags.append("VPN") if in_direct: flags.append("DIRECT") label = base_label if flags: label = f"{base_label} [{' + '.join(flags)}]" it.setText(label) it.setData(QtCore.Qt.UserRole + 1, bool(in_vpn or in_direct)) if base_tip.strip(): extra_tip = ( f"\n\nAlready in Force VPN: {'yes' if in_vpn else 'no'}\n" f"Already in Force Direct: {'yes' if in_direct else 'no'}" ) it.setToolTip(base_tip + extra_tip) if in_vpn or in_direct: it.setForeground(QtGui.QBrush(QtGui.QColor("gray"))) def _add_tab(self, title: str, kind: str, items: list[tuple[str, str, str]], *, extra=None) -> None: tab = QWidget() layout = QVBoxLayout(tab) if extra is not None: extra(layout) filt = QLineEdit() filt.setPlaceholderText("Filter...") layout.addWidget(filt) lst = QListWidget() lst.setSelectionMode(QAbstractItemView.ExtendedSelection) for entry in items: label = str(entry[0]) if len(entry) > 0 else "" value = str(entry[1]) if len(entry) > 1 else "" tip = str(entry[2]) if len(entry) > 2 else "" meta_kind = str(entry[3]) if len(entry) > 3 else "" meta_linkdown = bool(entry[4]) if len(entry) > 4 else False it = QListWidgetItem(label) it.setData(QtCore.Qt.UserRole, value) it.setData(QtCore.Qt.UserRole + 2, label) # base label (without [VPN]/[DIRECT]) it.setData(QtCore.Qt.UserRole + 3, tip) # base tooltip (without existing-state) it.setData(QtCore.Qt.UserRole + 4, meta_kind) it.setData(QtCore.Qt.UserRole + 5, meta_linkdown) lst.addItem(it) self._update_item_render(it, kind) layout.addWidget(lst, stretch=1) filt.textChanged.connect(lambda txt, l=lst: self._apply_filter(l, txt)) self.tabs.addTab(tab, title) self._tab_kind[tab] = kind self._tab_list[tab] = lst self._tab_filter[tab] = filt self._list_kind[lst] = kind self._list_title[lst] = title def _current_kind_and_list(self) -> tuple[str, QListWidget | None]: tab = self.tabs.currentWidget() if tab is None: return "", None return self._tab_kind.get(tab, ""), self._tab_list.get(tab) def _add_selected(self, target: str) -> None: kind, lst = self._current_kind_and_list() if not kind or lst is None: return vals: list[str] = [] for it in lst.selectedItems(): v = it.data(QtCore.Qt.UserRole) vv = str(v or "").strip() if vv: vals.append(vv) # stable de-dupe out: list[str] = [] seen: set[str] = set() for v in vals: if v in seen: continue seen.add(v) out.append(v) if not out: self._set_status("Nothing selected", ok=None) return tgt = (target or "").strip().lower() k = (kind or "").strip().lower() other = "direct" if tgt == "vpn" else "vpn" have_tgt = self.existing.get(tgt, {}).get(k, set()) or set() have_other = self.existing.get(other, {}).get(k, set()) or set() to_add: list[str] = [] skipped = 0 conflicts = 0 for v in out: if v in have_tgt: skipped += 1 continue if v in have_other: conflicts += 1 to_add.append(v) if not to_add: self._set_status(f"Nothing new to add (skipped={skipped}, conflicts={conflicts})", ok=None) return self.add_cb(target, kind, to_add) # Update local state so UI marks newly added items immediately. if tgt not in self.existing: self.existing[tgt] = {} if k not in self.existing[tgt]: self.existing[tgt][k] = set() for v in to_add: self.existing[tgt][k].add(v) for i in range(lst.count()): it = lst.item(i) if it is None: continue self._update_item_render(it, kind) self._refilter_current() msg = f"Added {len(to_add)} item(s) to Force {tgt.upper()} ({k})." if skipped or conflicts: msg += f" skipped={skipped} conflicts={conflicts}" self._set_status(msg, ok=True) def _list_for_title(self, title: str) -> QListWidget | None: for i in range(self.tabs.count()): if self.tabs.tabText(i) == title: tab = self.tabs.widget(i) return self._tab_list.get(tab) return None def _preset_clear_selection(self, title: str) -> None: lst = self._list_for_title(title) if lst is not None: lst.clearSelection() def _preset_select_services(self, keywords: list[str]) -> None: lst = self._list_for_title("Services") if lst is None: return keys = [str(k).strip().lower() for k in (keywords or []) if str(k).strip()] if not keys: return lst.clearSelection() for i in range(lst.count()): it = lst.item(i) if it is None: continue txt = (it.text() or "").lower() val = str(it.data(QtCore.Qt.UserRole) or "").lower() if any(k in txt or k in val for k in keys): it.setSelected(True) def _preset_select_uids(self, uids: list[int]) -> None: lst = self._list_for_title("UIDs") if lst is None: return want = {f"{int(u)}-{int(u)}" for u in (uids or [])} if not want: return lst.clearSelection() for i in range(lst.count()): it = lst.item(i) if it is None: continue token = str(it.data(QtCore.Qt.UserRole) or "").strip() if token in want: it.setSelected(True) def _build_subnets_tab(self) -> None: subs = list(getattr(self.cands, "subnets", []) or []) items: list[tuple[str, str, str, str, bool]] = [] for s in subs: cidr = str(getattr(s, "cidr", "") or "").strip() if not cidr: continue dev = str(getattr(s, "dev", "") or "").strip() kind = str(getattr(s, "kind", "") or "").strip() linkdown = bool(getattr(s, "linkdown", False)) tags = [] if kind: tags.append(kind) if dev: tags.append(dev) if linkdown: tags.append("linkdown") tag_txt = " " + "[" + ", ".join(tags) + "]" if tags else "" tip = ( f"CIDR: {cidr}\n" f"kind={kind or '-'} dev={dev or '-'} linkdown={linkdown}\n\n" "EN: Source subnet overrides affect forwarded traffic (Docker).\n" "RU: Source subnet влияет на forwarded трафик (Docker)." ) items.append((f"{cidr}{tag_txt}", cidr, tip, kind, linkdown)) def extra(layout: QVBoxLayout) -> None: row = QHBoxLayout() btn_lan = QPushButton("Keep LAN direct") btn_lan.clicked.connect(lambda: self._preset_add_lan_direct()) row.addWidget(btn_lan) btn_docker = QPushButton("Keep Docker direct") btn_docker.clicked.connect(lambda: self._preset_add_docker_direct()) row.addWidget(btn_docker) row.addStretch(1) layout.addLayout(row) row2 = QHBoxLayout() btn_f_lan = QPushButton("Filter LAN") btn_f_lan.clicked.connect(lambda: self._preset_filter_subnets(lan=True, docker=False, link=True)) row2.addWidget(btn_f_lan) btn_f_docker = QPushButton("Filter Docker") btn_f_docker.clicked.connect(lambda: self._preset_filter_subnets(lan=False, docker=True, link=False)) row2.addWidget(btn_f_docker) btn_f_clear = QPushButton("Clear filter") btn_f_clear.clicked.connect(lambda: self._preset_filter_subnets(lan=True, docker=True, link=True)) row2.addWidget(btn_f_clear) row2.addStretch(1) layout.addLayout(row2) row3 = QHBoxLayout() self.chk_sub_show_lan = QCheckBox("LAN") self.chk_sub_show_lan.setChecked(True) self.chk_sub_show_lan.setToolTip("EN: Show LAN subnets.\nRU: Показать LAN подсети.") self.chk_sub_show_lan.stateChanged.connect(lambda _s: self._refilter_current()) row3.addWidget(self.chk_sub_show_lan) self.chk_sub_show_docker = QCheckBox("Docker") self.chk_sub_show_docker.setChecked(True) self.chk_sub_show_docker.setToolTip("EN: Show Docker/container subnets.\nRU: Показать Docker/контейнерные подсети.") self.chk_sub_show_docker.stateChanged.connect(lambda _s: self._refilter_current()) row3.addWidget(self.chk_sub_show_docker) self.chk_sub_show_link = QCheckBox("Link (scope link)") self.chk_sub_show_link.setChecked(True) self.chk_sub_show_link.setToolTip("EN: Show link-scope routes.\nRU: Показать маршруты scope link.") self.chk_sub_show_link.stateChanged.connect(lambda _s: self._refilter_current()) row3.addWidget(self.chk_sub_show_link) self.chk_sub_hide_linkdown = QCheckBox("Hide linkdown") self.chk_sub_hide_linkdown.setChecked(True) self.chk_sub_hide_linkdown.setToolTip("EN: Hide routes marked as linkdown.\nRU: Скрыть маршруты с меткой linkdown.") self.chk_sub_hide_linkdown.stateChanged.connect(lambda _s: self._refilter_current()) row3.addWidget(self.chk_sub_hide_linkdown) row3.addStretch(1) layout.addLayout(row3) self._add_tab("Subnets", "subnet", items, extra=extra) def _preset_add_lan_direct(self) -> None: subs = list(getattr(self.cands, "subnets", []) or []) vals: list[str] = [] for s in subs: kind = str(getattr(s, "kind", "") or "").strip() cidr = str(getattr(s, "cidr", "") or "").strip() if not cidr: continue if kind in ("lan", "link"): vals.append(cidr) if vals: self.add_cb("direct", "subnet", vals) def _preset_add_docker_direct(self) -> None: subs = list(getattr(self.cands, "subnets", []) or []) vals: list[str] = [] for s in subs: kind = str(getattr(s, "kind", "") or "").strip() cidr = str(getattr(s, "cidr", "") or "").strip() if not cidr: continue if kind == "docker": vals.append(cidr) if vals: self.add_cb("direct", "subnet", vals) def _build_services_tab(self) -> None: units = list(getattr(self.cands, "units", []) or []) items: list[tuple[str, str, str]] = [] for u in units: unit = str(getattr(u, "unit", "") or "").strip() if not unit: continue desc = str(getattr(u, "description", "") or "").strip() cgroup = str(getattr(u, "cgroup", "") or "").strip() or unit label = unit if desc: label += " - " + desc tip = ( f"Unit: {unit}\n" f"Cgroup token: {cgroup}\n\n" "EN: Adds a cgroup override; backend resolves it to UID rules at apply time.\n" "RU: Добавляет cgroup override; backend резолвит его в UID правила при применении." ) items.append((label, cgroup, tip)) def extra(layout: QVBoxLayout) -> None: row = QHBoxLayout() btn_docker = QPushButton("Select docker/container") btn_docker.clicked.connect(lambda: self._preset_select_services(["docker", "containerd", "podman"])) row.addWidget(btn_docker) btn_media = QPushButton("Select media (jellyfin/plex)") btn_media.clicked.connect(lambda: self._preset_select_services(["jellyfin", "plex", "emby"])) row.addWidget(btn_media) btn_clear = QPushButton("Clear selection") btn_clear.clicked.connect(lambda: self._preset_clear_selection("Services")) row.addWidget(btn_clear) row.addStretch(1) layout.addLayout(row) row2 = QHBoxLayout() btn_f_docker = QPushButton("Filter docker") btn_f_docker.clicked.connect(lambda: self._preset_set_filter("Services", "docker")) row2.addWidget(btn_f_docker) btn_f_media = QPushButton("Filter media") btn_f_media.clicked.connect(lambda: self._preset_set_filter("Services", "jellyfin")) row2.addWidget(btn_f_media) btn_f_clear = QPushButton("Clear filter") btn_f_clear.clicked.connect(lambda: self._preset_set_filter("Services", "")) row2.addWidget(btn_f_clear) row2.addStretch(1) layout.addLayout(row2) self._add_tab("Services", "cgroup", items, extra=extra) def _build_uids_tab(self) -> None: uids = list(getattr(self.cands, "uids", []) or []) items: list[tuple[str, str, str]] = [] for u in uids: try: uid = int(getattr(u, "uid", 0) or 0) except Exception: continue user = str(getattr(u, "user", "") or "").strip() examples = list(getattr(u, "examples", []) or []) ex_txt = ", ".join([str(x) for x in examples if str(x).strip()]) label = str(uid) if user: label += f" ({user})" if ex_txt: label += " - " + ex_txt token = f"{uid}-{uid}" tip = ( f"UID: {uid}\n" f"User: {user or '-'}\n" f"Examples: {ex_txt or '-'}\n\n" "EN: UID rules affect host-local processes (OUTPUT).\n" "RU: UID правила влияют на процессы хоста (OUTPUT)." ) items.append((label, token, tip)) def extra(layout: QVBoxLayout) -> None: row = QHBoxLayout() btn_me = QPushButton("Select my UID") btn_me.clicked.connect(lambda: self._preset_select_uids([os.getuid()])) row.addWidget(btn_me) btn_root = QPushButton("Select root UID") btn_root.clicked.connect(lambda: self._preset_select_uids([0])) row.addWidget(btn_root) btn_clear = QPushButton("Clear selection") btn_clear.clicked.connect(lambda: self._preset_clear_selection("UIDs")) row.addWidget(btn_clear) row.addStretch(1) layout.addLayout(row) self._add_tab("UIDs", "uid", items, extra=extra) # --------------------------------------------------------------------- # App picker (.desktop entries) # --------------------------------------------------------------------- def _desktop_bool(v: str) -> bool: return str(v or "").strip().lower() in _DESKTOP_BOOL_TRUE def _desktop_name_from_section(sec: configparser.SectionProxy) -> str: name = str(sec.get("Name", "") or "").strip() if name: return name # Prefer Russian if present, then English, then any localized variant. for key in ("Name[ru]", "Name[en_US]", "Name[en]"): v = str(sec.get(key, "") or "").strip() if v: return v for k, v in sec.items(): kk = str(k or "") if kk.startswith("Name[") and str(v or "").strip(): return str(v).strip() return "" def _sanitize_desktop_exec(exec_raw: str, *, name: str = "", desktop_path: str = "") -> str: raw = str(exec_raw or "").strip() if not raw: return "" # Desktop spec: "%%" -> literal "%". raw = raw.replace("%%", "%") # Best-effort expansion for a couple of common fields. if name: raw = raw.replace("%c", name) if desktop_path: raw = raw.replace("%k", desktop_path) try: tokens = shlex.split(raw) except Exception: tokens = raw.split() out: list[str] = [] for t in tokens: tok = str(t or "").strip() if not tok: continue # Drop standalone field codes (%u, %U, %f, ...). if len(tok) == 2 and tok.startswith("%"): continue # Remove field codes inside tokens (e.g. --name=%c). tok = _DESKTOP_EXEC_FIELD_RE.sub("", tok).strip() if tok: out.append(tok) if not out: return "" return " ".join(shlex.quote(x) for x in out) def _desktop_entry_from_file(path: str, *, source: str) -> Optional[DesktopAppEntry]: p = str(path or "").strip() if not p or not p.endswith(".desktop"): return None cp = configparser.ConfigParser(interpolation=None, strict=False) cp.optionxform = str # keep case try: cp.read(p, encoding="utf-8") except Exception: try: data = pathlib.Path(p).read_bytes() cp.read_string(data.decode("utf-8", errors="replace")) except Exception: return None if "Desktop Entry" not in cp: return None sec = cp["Desktop Entry"] if _desktop_bool(sec.get("Hidden", "")) or _desktop_bool(sec.get("NoDisplay", "")): return None typ = str(sec.get("Type", "") or "").strip().lower() if typ and typ != "application": return None exec_raw = str(sec.get("Exec", "") or "").strip() if not exec_raw: return None name = _desktop_name_from_section(sec) desktop_id = pathlib.Path(p).name if not name: name = desktop_id.replace(".desktop", "") src = str(source or "").strip().lower() or "system" return DesktopAppEntry( desktop_id=desktop_id, name=name, exec_raw=exec_raw, path=p, source=src, ) def _scan_desktop_entries() -> list[DesktopAppEntry]: home = os.path.expanduser("~") dirs: list[tuple[str, str]] = [ (os.path.join(home, ".local/share/applications"), "user"), ("/usr/local/share/applications", "system"), ("/usr/share/applications", "system"), (os.path.join(home, ".local/share/flatpak/exports/share/applications"), "flatpak"), ("/var/lib/flatpak/exports/share/applications", "flatpak"), ("/var/lib/snapd/desktop/applications", "snap"), ] seen: set[tuple[str, str]] = set() out: list[DesktopAppEntry] = [] for d, src in dirs: if not d or not os.path.isdir(d): continue try: paths = sorted(pathlib.Path(d).glob("*.desktop")) except Exception: continue for fp in paths: ent = _desktop_entry_from_file(str(fp), source=src) if ent is None: continue key = (ent.desktop_id, ent.source) if key in seen: continue seen.add(key) out.append(ent) out.sort(key=lambda e: (e.name.lower(), e.source, e.desktop_id.lower())) return out class AppPickerDialog(QDialog): def __init__(self, *, parent=None) -> None: super().__init__(parent) self.setWindowTitle("Pick app (.desktop)") self.resize(860, 640) self._selected_cmd: str = "" self._entries: list[DesktopAppEntry] = _scan_desktop_entries() root = QVBoxLayout(self) note = QLabel( "EN: Pick an installed GUI app from .desktop entries (system + flatpak + snap). " "The command will be filled without %u/%U/%f placeholders.\n" "RU: Выбери приложение из .desktop (system + flatpak + snap). " "Команда будет заполнена без плейсхолдеров %u/%U/%f." ) note.setWordWrap(True) note.setStyleSheet("color: gray;") root.addWidget(note) row = QHBoxLayout() row.addWidget(QLabel("Search")) self.ed_search = QLineEdit() self.ed_search.setPlaceholderText("Type to filter (name / id / exec)...") row.addWidget(self.ed_search, stretch=1) self.lbl_count = QLabel("—") self.lbl_count.setStyleSheet("color: gray;") row.addWidget(self.lbl_count) root.addLayout(row) self.lst = QListWidget() self.lst.setSelectionMode(QAbstractItemView.SingleSelection) root.addWidget(self.lst, stretch=1) self.preview = QPlainTextEdit() self.preview.setReadOnly(True) self.preview.setFixedHeight(180) root.addWidget(self.preview) row_btn = QHBoxLayout() self.btn_use = QPushButton("Use selected") self.btn_use.clicked.connect(self.on_use_selected) row_btn.addWidget(self.btn_use) row_btn.addStretch(1) btn_close = QPushButton("Close") btn_close.clicked.connect(self.reject) row_btn.addWidget(btn_close) root.addLayout(row_btn) self._populate() self.ed_search.textChanged.connect(lambda _t: self._apply_filter()) self.lst.currentItemChanged.connect(lambda _a, _b: self._update_preview()) self.lst.itemDoubleClicked.connect(lambda _it: self.on_use_selected()) QtCore.QTimer.singleShot(0, self._apply_filter) def selected_command(self) -> str: return str(self._selected_cmd or "") def _populate(self) -> None: self.lst.clear() for ent in self._entries: src = ent.source label = f"{ent.name} [{src}] ({ent.desktop_id})" tip = ( f"Name: {ent.name}\n" f"ID: {ent.desktop_id}\n" f"Source: {src}\n" f"Path: {ent.path}\n\n" f"Exec: {ent.exec_raw}" ) it = QListWidgetItem(label) it.setToolTip(tip) it.setData(QtCore.Qt.UserRole, ent) # precomputed search string it.setData( QtCore.Qt.UserRole + 1, (label + "\n" + ent.exec_raw + "\n" + ent.path).lower(), ) self.lst.addItem(it) if self.lst.count() > 0: self.lst.setCurrentRow(0) self._update_preview() def _apply_filter(self) -> None: q = (self.ed_search.text() or "").strip().lower() shown = 0 for i in range(self.lst.count()): it = self.lst.item(i) if not it: continue hay = str(it.data(QtCore.Qt.UserRole + 1) or "") hide = bool(q) and q not in hay it.setHidden(hide) if not hide: shown += 1 self.lbl_count.setText(f"Apps: {shown}/{self.lst.count()}") def _current_entry(self) -> Optional[DesktopAppEntry]: it = self.lst.currentItem() if not it: return None ent = it.data(QtCore.Qt.UserRole) if isinstance(ent, DesktopAppEntry): return ent return None def _update_preview(self) -> None: ent = self._current_entry() if ent is None: self.preview.setPlainText("—") self.btn_use.setEnabled(False) return cmd = _sanitize_desktop_exec(ent.exec_raw, name=ent.name, desktop_path=ent.path) text = ( f"Name: {ent.name}\n" f"ID: {ent.desktop_id}\n" f"Source: {ent.source}\n" f"Path: {ent.path}\n\n" f"Exec (raw):\n{ent.exec_raw}\n\n" f"Command (sanitized):\n{cmd}" ) self.preview.setPlainText(text) self.btn_use.setEnabled(bool(cmd.strip())) def on_use_selected(self) -> None: ent = self._current_entry() if ent is None: return cmd = _sanitize_desktop_exec(ent.exec_raw, name=ent.name, desktop_path=ent.path).strip() if not cmd: QMessageBox.warning(self, "No command", "Selected app has no usable Exec command.") return self._selected_cmd = cmd self.accept() def _proc_read_text(path: str) -> str: try: with open(path, "r", encoding="utf-8", errors="replace") as f: return (f.read() or "").strip() except Exception: return "" def _proc_read_cmdline(pid: int) -> str: p = int(pid or 0) if p <= 0: return "" try: with open(f"/proc/{p}/cmdline", "rb") as f: raw = f.read() or b"" parts = [x for x in raw.split(b"\x00") if x] out = " ".join([x.decode("utf-8", errors="replace") for x in parts]) return out.strip() except Exception: return "" def _proc_read_uid(pid: int) -> int: p = int(pid or 0) if p <= 0: return -1 try: with open(f"/proc/{p}/status", "r", encoding="utf-8", errors="replace") as f: for raw in f: line = (raw or "").strip() if not line.startswith("Uid:"): continue fields = line.split() if len(fields) >= 2: return int(fields[1]) except Exception: return -1 return -1 def _proc_read_exe(pid: int) -> str: p = int(pid or 0) if p <= 0: return "" try: return os.readlink(f"/proc/{p}/exe").strip() except Exception: return "" def _proc_read_cgroup(pid: int) -> str: p = int(pid or 0) if p <= 0: return "" try: with open(f"/proc/{p}/cgroup", "r", encoding="utf-8", errors="replace") as f: for raw in f: line = (raw or "").strip() if line.startswith("0::"): return line[len("0::") :].strip() except Exception: return "" return "" def _username_for_uid(uid: int) -> str: try: return pwd.getpwuid(int(uid)).pw_name except Exception: return str(int(uid)) class ProcessPickerDialog(QDialog): def __init__(self, *, parent=None) -> None: super().__init__(parent) self.setWindowTitle("Pick process (PID)") self.resize(900, 660) self._selected: RunningProcessEntry | None = None root = QVBoxLayout(self) note = QLabel( "EN: Pick a running process to apply a runtime mark by PID (no launch).\n" "RU: Выбери запущенный процесс, чтобы применить runtime-метку по PID (без запуска)." ) note.setWordWrap(True) note.setStyleSheet("color: gray;") root.addWidget(note) row = QHBoxLayout() row.addWidget(QLabel("Search")) self.ed_search = QLineEdit() self.ed_search.setPlaceholderText("Filter (pid / user / name / exe / cmdline / cgroup)...") row.addWidget(self.ed_search, stretch=1) self.chk_user_only = QCheckBox("Only my user") self.chk_user_only.setChecked(True) self.chk_user_only.setToolTip( "EN: Show only processes owned by current user.\n" "RU: Показать только процессы текущего пользователя." ) row.addWidget(self.chk_user_only) self.chk_hide_empty = QCheckBox("Hide empty cmdline") self.chk_hide_empty.setChecked(True) self.chk_hide_empty.setToolTip( "EN: Hide kernel threads / processes without cmdline.\n" "RU: Скрыть kernel threads / процессы без cmdline." ) row.addWidget(self.chk_hide_empty) self.btn_refresh = QPushButton("Refresh") self.btn_refresh.clicked.connect(self.refresh) row.addWidget(self.btn_refresh) self.lbl_count = QLabel("—") self.lbl_count.setStyleSheet("color: gray;") row.addWidget(self.lbl_count) root.addLayout(row) self.lst = QListWidget() self.lst.setSelectionMode(QAbstractItemView.SingleSelection) root.addWidget(self.lst, stretch=1) self.preview = QPlainTextEdit() self.preview.setReadOnly(True) self.preview.setFixedHeight(200) root.addWidget(self.preview) row_btn = QHBoxLayout() self.btn_use = QPushButton("Use selected") self.btn_use.clicked.connect(self.on_use_selected) row_btn.addWidget(self.btn_use) row_btn.addStretch(1) btn_close = QPushButton("Close") btn_close.clicked.connect(self.reject) row_btn.addWidget(btn_close) root.addLayout(row_btn) self.ed_search.textChanged.connect(lambda _t: self._apply_filter()) self.chk_user_only.stateChanged.connect(lambda _s: self.refresh()) self.chk_hide_empty.stateChanged.connect(lambda _s: self.refresh()) self.lst.currentItemChanged.connect(lambda _a, _b: self._update_preview()) self.lst.itemDoubleClicked.connect(lambda _it: self.on_use_selected()) self._set_loading() QtCore.QTimer.singleShot(0, self.refresh) def selected_entry(self) -> RunningProcessEntry | None: return self._selected def _set_loading(self) -> None: self.lbl_count.setText("Loading...") self.lst.clear() self.preview.setPlainText("—") self.btn_use.setEnabled(False) def _scan(self) -> list[RunningProcessEntry]: only_uid = os.getuid() if self.chk_user_only.isChecked() else None hide_empty = bool(self.chk_hide_empty.isChecked()) out: list[RunningProcessEntry] = [] try: names = os.listdir("/proc") except Exception: names = [] pids: list[int] = [] for n in names: if n.isdigit(): try: pids.append(int(n)) except Exception: pass pids.sort() for i, pid in enumerate(pids): uid = _proc_read_uid(pid) if uid < 0: continue if only_uid is not None and uid != int(only_uid): continue comm = _proc_read_text(f"/proc/{pid}/comm") cmdline = _proc_read_cmdline(pid) if hide_empty and not cmdline: continue exe = _proc_read_exe(pid) cg = _proc_read_cgroup(pid) user = _username_for_uid(uid) out.append( RunningProcessEntry( pid=int(pid), uid=int(uid), user=str(user or uid), comm=str(comm or "").strip(), exe=str(exe or "").strip(), cmdline=str(cmdline or "").strip(), cgroup=str(cg or "").strip(), ) ) # Keep UI responsive for large /proc. if (i % 200) == 0: try: QtCore.QCoreApplication.processEvents() except Exception: pass out.sort(key=lambda e: (e.user.lower(), e.comm.lower(), e.pid)) return out def refresh(self) -> None: self._set_loading() entries = self._scan() self.lst.clear() for ent in entries: comm = ent.comm or "(no comm)" exe_base = os.path.basename(ent.exe) if ent.exe else "" label = f"{ent.pid} {ent.user} {comm}" if exe_base and exe_base.lower() not in comm.lower(): label += f" [{exe_base}]" tip = ( f"PID: {ent.pid}\n" f"User: {ent.user} (uid={ent.uid})\n" f"Comm: {ent.comm or '-'}\n" f"Exe: {ent.exe or '-'}\n" f"Cgroup: {ent.cgroup or '-'}\n\n" f"Cmdline:\n{ent.cmdline or '-'}" ) it = QListWidgetItem(label) it.setToolTip(tip) it.setData(QtCore.Qt.UserRole, ent) it.setData( QtCore.Qt.UserRole + 1, (label + "\n" + tip).lower(), ) self.lst.addItem(it) if self.lst.count() > 0: self.lst.setCurrentRow(0) self._apply_filter() self._update_preview() def _apply_filter(self) -> None: q = (self.ed_search.text() or "").strip().lower() shown = 0 for i in range(self.lst.count()): it = self.lst.item(i) if not it: continue hay = str(it.data(QtCore.Qt.UserRole + 1) or "") hide = bool(q) and q not in hay it.setHidden(hide) if not hide: shown += 1 self.lbl_count.setText(f"Processes: {shown}/{self.lst.count()}") self.btn_use.setEnabled(shown > 0 and self._current_entry() is not None) def _current_entry(self) -> RunningProcessEntry | None: it = self.lst.currentItem() if not it: return None ent = it.data(QtCore.Qt.UserRole) if isinstance(ent, RunningProcessEntry): return ent return None def _update_preview(self) -> None: ent = self._current_entry() if ent is None: self.preview.setPlainText("—") self.btn_use.setEnabled(False) return text = ( f"PID: {ent.pid}\n" f"User: {ent.user} (uid={ent.uid})\n" f"Comm: {ent.comm or '-'}\n" f"Exe: {ent.exe or '-'}\n" f"Cgroup: {ent.cgroup or '-'}\n\n" f"Cmdline:\n{ent.cmdline or '-'}" ) self.preview.setPlainText(text) self.btn_use.setEnabled(True) def on_use_selected(self) -> None: ent = self._current_entry() if ent is None: return self._selected = ent self.accept()