Files
elmprodvpn/selective-vpn-gui/traffic_mode_dialog.py

3233 lines
131 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
from __future__ import annotations
import configparser
import os
import pathlib
import re
import shlex
import subprocess
import time
from dataclasses import dataclass
from typing import Callable, Optional
from PySide6 import QtCore, QtGui
from PySide6.QtWidgets import (
QButtonGroup,
QCheckBox,
QComboBox,
QDialog,
QGroupBox,
QHBoxLayout,
QLabel,
QLineEdit,
QListWidget,
QListWidgetItem,
QAbstractItemView,
QMessageBox,
QPlainTextEdit,
QPushButton,
QRadioButton,
QSpinBox,
QTabWidget,
QVBoxLayout,
QWidget,
)
from dashboard_controller import DashboardController
_DESKTOP_BOOL_TRUE = {"1", "true", "yes", "y", "on"}
_DESKTOP_EXEC_FIELD_RE = re.compile(r"%[A-Za-z]")
@dataclass(frozen=True)
class DesktopAppEntry:
desktop_id: str
name: str
exec_raw: str
path: str
source: str # system|flatpak|user
@dataclass(frozen=True)
class RuntimeScopeInfo:
unit: str
target: str # vpn|direct|?
cgroup_path: str
cgroup_id: int
class TrafficModeDialog(QDialog):
def __init__(
self,
controller: DashboardController,
*,
log_cb: Callable[[str], None] | None = None,
refresh_cb: Callable[[], None] | None = None,
parent=None,
) -> None:
super().__init__(parent)
self.ctrl = controller
self.log_cb = log_cb
self.refresh_cb = refresh_cb
self.setWindowTitle("Traffic mode settings")
self.resize(780, 760)
root = QVBoxLayout(self)
# EN: Persist small UI state across dialog sessions.
# RU: Сохраняем небольшой UI state между открытиями окна.
self._settings = QtCore.QSettings("AdGuardVPN", "SelectiveVPNDashboardQt")
self._last_app_unit: str = str(self._settings.value("traffic_app_last_unit", "") or "")
self._last_app_target: str = str(self._settings.value("traffic_app_last_target", "") or "")
self._last_app_key: str = str(self._settings.value("traffic_app_last_key", "") or "")
self._last_app_cmdline: str = str(self._settings.value("traffic_app_last_cmdline", "") or "")
try:
self._last_app_cgroup_id: int = int(self._settings.value("traffic_app_last_cgroup_id", 0) or 0)
except Exception:
self._last_app_cgroup_id = 0
hint_group = QGroupBox("Mode behavior")
hint_layout = QVBoxLayout(hint_group)
hint_layout.addWidget(QLabel("Selective: only marked traffic goes via VPN."))
hint_layout.addWidget(QLabel("Full tunnel: all traffic goes via VPN."))
hint_layout.addWidget(QLabel("Direct: VPN routing rules are disabled."))
warn = QLabel(
"Warning: Full tunnel can break local/LAN access depending on your host routes."
)
warn.setStyleSheet("color: red;")
hint_layout.addWidget(warn)
root.addWidget(hint_group)
tip = QLabel("Tip: hover any control for help. Подсказка: наведи на элемент для описания.")
tip.setWordWrap(True)
tip.setStyleSheet("color: gray;")
root.addWidget(tip)
self.tabs = QTabWidget()
root.addWidget(self.tabs, stretch=1)
tab_basic = QWidget()
tab_basic_layout = QVBoxLayout(tab_basic)
mode_group = QGroupBox("Traffic mode relay")
mode_layout = QVBoxLayout(mode_group)
row_mode = QHBoxLayout()
self.rad_selective = QRadioButton("Selective")
self.rad_selective.setToolTip("""EN: Only marked traffic (fwmark 0x66) uses VPN policy table (agvpn).
RU: Только помеченный трафик (fwmark 0x66) идет через policy-table (agvpn).""")
self.rad_selective.toggled.connect(
lambda checked: self.on_mode_toggle("selective", checked)
)
row_mode.addWidget(self.rad_selective)
self.rad_full = QRadioButton("Full tunnel")
self.rad_full.setToolTip("""EN: All traffic uses VPN policy table (agvpn). Use with auto-local bypass for LAN/docker.
RU: Весь трафик идет через policy-table (agvpn). Для LAN/docker включай auto-local bypass.""")
self.rad_full.toggled.connect(
lambda checked: self.on_mode_toggle("full_tunnel", checked)
)
row_mode.addWidget(self.rad_full)
self.rad_direct = QRadioButton("Direct")
self.rad_direct.setToolTip("""EN: Disables base VPN routing rules (no full/selective rule).
RU: Отключает базовые VPN policy-rules (нет full/selective правила).""")
self.rad_direct.toggled.connect(
lambda checked: self.on_mode_toggle("direct", checked)
)
row_mode.addWidget(self.rad_direct)
row_mode.addStretch(1)
mode_layout.addLayout(row_mode)
row_iface = QHBoxLayout()
row_iface.addWidget(QLabel("Preferred iface"))
self.cmb_iface = QComboBox()
self.cmb_iface.setToolTip("""EN: VPN interface for policy routing. Use auto unless you know the exact iface.
RU: Интерфейс VPN для policy routing. Оставь auto, если не уверен.""")
self.cmb_iface.setEditable(True)
self.cmb_iface.setInsertPolicy(QComboBox.NoInsert)
self.cmb_iface.setMinimumWidth(180)
row_iface.addWidget(self.cmb_iface)
self.btn_refresh_ifaces = QPushButton("Detect ifaces")
self.btn_refresh_ifaces.setToolTip("""EN: Refresh list of available interfaces (UP).
RU: Обновить список доступных интерфейсов (UP).""")
self.btn_refresh_ifaces.clicked.connect(self.on_refresh_ifaces)
row_iface.addWidget(self.btn_refresh_ifaces)
row_iface.addStretch(1)
mode_layout.addLayout(row_iface)
self.chk_auto_local = QCheckBox("Auto-local bypass (LAN/container subnets)")
self.chk_auto_local.setToolTip("""EN: Mirrors local/LAN/docker routes from main into agvpn table to prevent breakage in full tunnel.
EN: This does NOT force containers to use direct internet; use Force Direct subnets for that.
RU: Копирует локальные/LAN/docker маршруты из main в agvpn, чтобы не ломалась локалка в full tunnel.
RU: Это НЕ делает контейнеры direct в интернет; для этого используй Force Direct subnets.""")
self.chk_auto_local.stateChanged.connect(lambda _state: self.on_auto_local_toggle())
mode_layout.addWidget(self.chk_auto_local)
self.lbl_state = QLabel("Traffic mode: —")
self.lbl_state.setStyleSheet("color: gray;")
mode_layout.addWidget(self.lbl_state)
self.lbl_diag = QLabel("")
self.lbl_diag.setStyleSheet("color: gray;")
mode_layout.addWidget(self.lbl_diag)
tab_basic_layout.addWidget(mode_group)
maint_group = QGroupBox("Rollback / cache")
maint_layout = QHBoxLayout(maint_group)
self.btn_rollback = QPushButton("Clear routes (save cache)")
self.btn_rollback.setToolTip("""EN: Clears VPN routes and nft sets, but saves a cache snapshot for restore.
RU: Очищает VPN маршруты и nft-сеты, но сохраняет снапшот для восстановления.""")
self.btn_rollback.clicked.connect(self.on_rollback)
maint_layout.addWidget(self.btn_rollback)
self.btn_restore_cache = QPushButton("Restore cached routes")
self.btn_restore_cache.setToolTip("""EN: Restores routes/nft from the last clear snapshot. Skips non-critical route restore errors.
RU: Восстанавливает маршруты/nft из последнего снапшота clear. Некритичные ошибки восстановления пропускаются.""")
self.btn_restore_cache.clicked.connect(self.on_restore_cache)
maint_layout.addWidget(self.btn_restore_cache)
maint_layout.addStretch(1)
tab_basic_layout.addWidget(maint_group)
tab_basic_layout.addStretch(1)
self.tabs.addTab(tab_basic, "Traffic basics")
# -----------------------------------------------------------------
# Apps (runtime): systemd --user unit + backend appmarks
# -----------------------------------------------------------------
tab_apps = QWidget()
tab_apps_layout = QVBoxLayout(tab_apps)
apps_hint = QLabel(
"Runtime per-app routing (Wayland-friendly):\n"
"- Launch uses systemd-run --user (transient unit).\n"
"- Backend adds the unit cgroup into nftset -> fwmark rules.\n"
"- Marks are temporary (TTL). Use Policy overrides for persistent policy."
)
apps_hint.setWordWrap(True)
apps_hint.setStyleSheet("color: gray;")
tab_apps_layout.addWidget(apps_hint)
# EN: Split this UX into subtabs to fit smaller screens (no giant vertical stack).
# RU: Разбиваем на под-вкладки, чтобы помещалось на маленьких экранах.
self.apps_tabs = QTabWidget()
tab_apps_layout.addWidget(self.apps_tabs, stretch=1)
profiles_group = QGroupBox("Added apps (profiles)")
profiles_layout = QVBoxLayout(profiles_group)
profiles_hint = QLabel(
"Persistent launch configs (saved):\n"
"- These describe what to run and how to route it.\n"
"- Separate from runtime marks/units (which are tied to a specific cgroup)."
)
profiles_hint.setWordWrap(True)
profiles_hint.setStyleSheet("color: gray;")
profiles_layout.addWidget(profiles_hint)
row_prof_name = QHBoxLayout()
row_prof_name.addWidget(QLabel("Name"))
self.ed_app_profile_name = QLineEdit()
self.ed_app_profile_name.setPlaceholderText(
"optional (default: basename of app)"
)
self.ed_app_profile_name.setToolTip(
"EN: Optional profile name (for display). Leave empty to auto-name.\n"
"RU: Необязательное имя профиля (для отображения). Можно оставить пустым."
)
row_prof_name.addWidget(self.ed_app_profile_name, stretch=1)
self.btn_app_profiles_refresh = QPushButton("Refresh profiles")
self.btn_app_profiles_refresh.setToolTip(
"EN: Reload saved app profiles from backend.\n"
"RU: Обновить список сохранённых профилей из backend."
)
self.btn_app_profiles_refresh.clicked.connect(self.refresh_app_profiles)
row_prof_name.addWidget(self.btn_app_profiles_refresh)
profiles_layout.addLayout(row_prof_name)
row_prof_btn = QHBoxLayout()
self.btn_app_profile_save = QPushButton("Save / update profile")
self.btn_app_profile_save.setToolTip(
"EN: Save current command/route/TTL as a persistent profile (upsert).\n"
"RU: Сохранить текущую команду/маршрут/TTL как постоянный профиль (upsert)."
)
self.btn_app_profile_save.clicked.connect(self.on_app_profile_save)
row_prof_btn.addWidget(self.btn_app_profile_save)
self.btn_app_profile_load = QPushButton("Load to form")
self.btn_app_profile_load.setToolTip(
"EN: Load selected profile into the form (command/route/TTL).\n"
"RU: Загрузить выбранный профиль в форму (команда/маршрут/TTL)."
)
self.btn_app_profile_load.clicked.connect(self.on_app_profile_load)
row_prof_btn.addWidget(self.btn_app_profile_load)
self.btn_app_profile_run = QPushButton("Run profile")
self.btn_app_profile_run.setToolTip(
"EN: Launch selected profile via systemd-run --user and apply routing mark.\n"
"RU: Запустить выбранный профиль через systemd-run --user и применить метку маршрутизации."
)
self.btn_app_profile_run.clicked.connect(self.on_app_profile_run)
row_prof_btn.addWidget(self.btn_app_profile_run)
self.btn_app_profile_delete = QPushButton("Delete profile")
self.btn_app_profile_delete.setToolTip(
"EN: Delete selected saved profile.\n"
"RU: Удалить выбранный сохранённый профиль."
)
self.btn_app_profile_delete.clicked.connect(self.on_app_profile_delete)
row_prof_btn.addWidget(self.btn_app_profile_delete)
row_prof_btn.addStretch(1)
profiles_layout.addLayout(row_prof_btn)
row_prof_shortcuts = QHBoxLayout()
self.btn_app_profile_shortcut_create = QPushButton("Create shortcut")
self.btn_app_profile_shortcut_create.setToolTip(
"EN: Create/overwrite a .desktop shortcut for the selected profile.\n"
"EN: The shortcut will launch the app and apply routing mark automatically.\n"
"RU: Создать/перезаписать .desktop ярлык для выбранного профиля.\n"
"RU: Ярлык запускает приложение и автоматически применяет routing mark."
)
self.btn_app_profile_shortcut_create.clicked.connect(self.on_app_profile_shortcut_create)
row_prof_shortcuts.addWidget(self.btn_app_profile_shortcut_create)
self.btn_app_profile_shortcut_remove = QPushButton("Remove shortcut")
self.btn_app_profile_shortcut_remove.setToolTip(
"EN: Remove the .desktop shortcut for the selected profile.\n"
"RU: Удалить .desktop ярлык для выбранного профиля."
)
self.btn_app_profile_shortcut_remove.clicked.connect(self.on_app_profile_shortcut_remove)
row_prof_shortcuts.addWidget(self.btn_app_profile_shortcut_remove)
row_prof_shortcuts.addStretch(1)
profiles_layout.addLayout(row_prof_shortcuts)
self.lst_app_profiles = QListWidget()
self.lst_app_profiles.setSelectionMode(QAbstractItemView.SingleSelection)
self.lst_app_profiles.setToolTip(
"EN: Saved app profiles. Double click loads into the form.\n"
"RU: Сохранённые профили приложений. Двойной клик загружает в форму."
)
self.lst_app_profiles.itemDoubleClicked.connect(
lambda _it: self.on_app_profile_load()
)
self.lst_app_profiles.currentItemChanged.connect(
lambda _cur, _prev: self._update_profile_shortcut_ui()
)
self.lst_app_profiles.setFixedHeight(140)
profiles_layout.addWidget(self.lst_app_profiles)
self.lbl_app_profiles = QLabel("Saved profiles: —")
self.lbl_app_profiles.setStyleSheet("color: gray;")
profiles_layout.addWidget(self.lbl_app_profiles)
self.lbl_profile_shortcut = QLabel("Shortcut: —")
self.lbl_profile_shortcut.setWordWrap(True)
self.lbl_profile_shortcut.setStyleSheet("color: gray;")
profiles_layout.addWidget(self.lbl_profile_shortcut)
tab_profiles = QWidget()
tab_profiles_layout = QVBoxLayout(tab_profiles)
tab_profiles_layout.addWidget(profiles_group)
tab_profiles_layout.addStretch(1)
self.apps_tabs.addTab(tab_profiles, "Profiles")
run_group = QGroupBox("Run app (systemd unit) + apply mark")
run_layout = QVBoxLayout(run_group)
row_cmd = QHBoxLayout()
row_cmd.addWidget(QLabel("Command"))
self.ed_app_cmd = QLineEdit()
self.ed_app_cmd.setPlaceholderText(
"e.g. firefox --private-window https://example.com"
)
self.ed_app_cmd.setToolTip(
"EN: Command line to run. This runs as current user via systemd --user.\n"
"RU: Команда запуска. Запускается от текущего пользователя через systemd --user."
)
row_cmd.addWidget(self.ed_app_cmd, stretch=1)
self.btn_app_pick = QPushButton("Pick app...")
self.btn_app_pick.setToolTip(
"EN: Pick an installed app from .desktop entries (system + flatpak + snap) and fill the command.\n"
"RU: Выбрать установленное приложение из .desktop (system + flatpak + snap) и заполнить команду."
)
self.btn_app_pick.clicked.connect(self.on_app_pick)
row_cmd.addWidget(self.btn_app_pick)
run_layout.addLayout(row_cmd)
row_target = QHBoxLayout()
row_target.addWidget(QLabel("Route via"))
self.rad_app_vpn = QRadioButton("VPN")
self.rad_app_vpn.setToolTip(
"EN: Force this app traffic via VPN policy table (agvpn).\n"
"RU: Форсировать трафик приложения через VPN policy-table (agvpn)."
)
self.rad_app_direct = QRadioButton("Direct")
self.rad_app_direct.setToolTip(
"EN: Force this app traffic to bypass VPN (lookup main), even in full tunnel.\n"
"RU: Форсировать трафик приложения мимо VPN (lookup main), даже в full tunnel."
)
bg_app = QButtonGroup(self)
bg_app.addButton(self.rad_app_vpn)
bg_app.addButton(self.rad_app_direct)
self.rad_app_vpn.setChecked(True)
row_target.addWidget(self.rad_app_vpn)
row_target.addWidget(self.rad_app_direct)
row_target.addStretch(1)
run_layout.addLayout(row_target)
row_ttl = QHBoxLayout()
row_ttl.addWidget(QLabel("TTL (hours)"))
self.spn_app_ttl = QSpinBox()
self.spn_app_ttl.setRange(1, 24 * 30) # up to ~30 days
self.spn_app_ttl.setValue(24)
self.spn_app_ttl.setToolTip(
"EN: How long the runtime mark stays active (backend nftset element timeout).\n"
"RU: Сколько живет runtime-метка (timeout элемента в nftset)."
)
row_ttl.addWidget(self.spn_app_ttl)
row_ttl.addStretch(1)
run_layout.addLayout(row_ttl)
row_btn = QHBoxLayout()
self.btn_app_run = QPushButton("Run + apply mark")
self.btn_app_run.clicked.connect(self.on_app_run)
row_btn.addWidget(self.btn_app_run)
self.btn_app_refresh = QPushButton("Refresh counts")
self.btn_app_refresh.clicked.connect(self.refresh_appmarks_counts)
row_btn.addWidget(self.btn_app_refresh)
row_btn.addStretch(1)
run_layout.addLayout(row_btn)
row_btn2 = QHBoxLayout()
self.btn_app_clear_vpn = QPushButton("Clear VPN marks")
self.btn_app_clear_vpn.clicked.connect(lambda: self.on_appmarks_clear("vpn"))
row_btn2.addWidget(self.btn_app_clear_vpn)
self.btn_app_clear_direct = QPushButton("Clear Direct marks")
self.btn_app_clear_direct.clicked.connect(
lambda: self.on_appmarks_clear("direct")
)
row_btn2.addWidget(self.btn_app_clear_direct)
self.btn_app_stop_last = QPushButton("Stop last scope")
self.btn_app_stop_last.setToolTip(
"EN: Stops the last launched systemd --user scope.\n"
"RU: Останавливает последний запущенный systemd --user scope."
)
self.btn_app_stop_last.clicked.connect(self.on_app_stop_last_scope)
row_btn2.addWidget(self.btn_app_stop_last)
self.btn_app_unmark_last = QPushButton("Unmark last")
self.btn_app_unmark_last.setToolTip(
"EN: Removes routing mark for the last launched scope (by cgroup id).\n"
"RU: Удаляет метку маршрутизации для последнего scope (по cgroup id)."
)
self.btn_app_unmark_last.clicked.connect(self.on_app_unmark_last)
row_btn2.addWidget(self.btn_app_unmark_last)
row_btn2.addStretch(1)
run_layout.addLayout(row_btn2)
self.lbl_app_counts = QLabel("Marks: —")
self.lbl_app_counts.setStyleSheet("color: gray;")
run_layout.addWidget(self.lbl_app_counts)
self.lbl_app_last = QLabel("Last scope: —")
self.lbl_app_last.setStyleSheet("color: gray;")
run_layout.addWidget(self.lbl_app_last)
self._refresh_last_scope_ui()
tab_run = QWidget()
tab_run_layout = QVBoxLayout(tab_run)
tab_run_layout.addWidget(run_group)
tab_run_layout.addStretch(1)
self.apps_tabs.addTab(tab_run, "Run")
marks_group = QGroupBox("Active runtime marks (TTL)")
marks_layout = QVBoxLayout(marks_group)
marks_row = QHBoxLayout()
self.btn_marks_refresh = QPushButton("Refresh marks")
self.btn_marks_refresh.setToolTip(
"EN: Reload active runtime marks from backend (prunes expired).\n"
"RU: Обновить активные runtime-метки из backend (просроченные удаляются)."
)
self.btn_marks_refresh.clicked.connect(self.refresh_appmarks_items)
marks_row.addWidget(self.btn_marks_refresh)
self.btn_marks_unmark = QPushButton("Unmark selected")
self.btn_marks_unmark.setToolTip(
"EN: Remove routing marks for selected items (does not necessarily stop the app).\n"
"RU: Удалить метки маршрутизации для выбранных элементов (не обязательно останавливает приложение)."
)
self.btn_marks_unmark.clicked.connect(self.on_appmarks_unmark_selected)
marks_row.addWidget(self.btn_marks_unmark)
marks_row.addStretch(1)
marks_layout.addLayout(marks_row)
self.lst_marks = QListWidget()
self.lst_marks.setSelectionMode(QAbstractItemView.ExtendedSelection)
self.lst_marks.setToolTip(
"EN: Active runtime marks. Stored by backend with TTL.\n"
"RU: Активные runtime-метки. Хранятся backend с TTL."
)
self.lst_marks.setFixedHeight(140)
marks_layout.addWidget(self.lst_marks)
self.lbl_marks = QLabel("Active marks: —")
self.lbl_marks.setStyleSheet("color: gray;")
marks_layout.addWidget(self.lbl_marks)
tab_marks = QWidget()
tab_marks_layout = QVBoxLayout(tab_marks)
tab_marks_layout.addWidget(marks_group)
tab_marks_layout.addStretch(1)
self.apps_tabs.addTab(tab_marks, "Marks")
scopes_group = QGroupBox("Active svpn units (systemd --user)")
scopes_layout = QVBoxLayout(scopes_group)
scopes_row = QHBoxLayout()
self.btn_scopes_refresh = QPushButton("Refresh units")
self.btn_scopes_refresh.setToolTip(
"EN: Refresh list of running svpn-* units (.service/.scope).\n"
"RU: Обновить список запущенных svpn-* unit (.service/.scope)."
)
self.btn_scopes_refresh.clicked.connect(self.refresh_running_scopes)
scopes_row.addWidget(self.btn_scopes_refresh)
self.btn_scopes_stop_selected = QPushButton("Stop selected")
self.btn_scopes_stop_selected.setToolTip(
"EN: Unmarks + stops selected units.\n"
"RU: Удаляет метки + останавливает выбранные unit."
)
self.btn_scopes_stop_selected.clicked.connect(self.on_scopes_stop_selected)
scopes_row.addWidget(self.btn_scopes_stop_selected)
self.btn_scopes_cleanup = QPushButton("Cleanup all svpn units")
self.btn_scopes_cleanup.setToolTip(
"EN: Unmarks + stops ALL running svpn-* units.\n"
"RU: Удаляет метки + останавливает ВСЕ запущенные svpn-* unit."
)
self.btn_scopes_cleanup.clicked.connect(self.on_scopes_cleanup_all)
scopes_row.addWidget(self.btn_scopes_cleanup)
scopes_row.addStretch(1)
scopes_layout.addLayout(scopes_row)
self.lst_scopes = QListWidget()
self.lst_scopes.setSelectionMode(QAbstractItemView.ExtendedSelection)
self.lst_scopes.setToolTip(
"EN: Running svpn units. Double click to copy unit name.\n"
"RU: Запущенные svpn unit. Двойной клик копирует имя unit."
)
self.lst_scopes.itemDoubleClicked.connect(lambda it: self._copy_scope_unit(it))
self.lst_scopes.setFixedHeight(140)
scopes_layout.addWidget(self.lst_scopes)
self.lbl_scopes = QLabel("Running units: —")
self.lbl_scopes.setStyleSheet("color: gray;")
scopes_layout.addWidget(self.lbl_scopes)
tab_units = QWidget()
tab_units_layout = QVBoxLayout(tab_units)
tab_units_layout.addWidget(scopes_group)
tab_units_layout.addStretch(1)
self.apps_tabs.addTab(tab_units, "Units")
self.txt_app = QPlainTextEdit()
self.txt_app.setReadOnly(True)
tab_log = QWidget()
tab_log_layout = QVBoxLayout(tab_log)
tab_log_layout.addWidget(self.txt_app, stretch=1)
self.apps_tabs.addTab(tab_log, "Log")
self.tabs.addTab(tab_apps, "Apps (runtime)")
tab_adv = QWidget()
tab_adv_layout = QVBoxLayout(tab_adv)
self.ed_vpn_subnets = QPlainTextEdit()
self.ed_vpn_subnets.setToolTip("""EN: Force VPN by source subnet. Useful for docker subnets when you want containers via VPN.
RU: Принудительно через VPN по source subnet. Полезно для docker-подсетей, если хочешь контейнеры через VPN.""")
self.ed_vpn_subnets.setPlaceholderText("Force VPN by source subnet, one per line (e.g. 172.18.0.0/16)")
self.ed_vpn_subnets.setFixedHeight(72)
self.ed_vpn_uids = QPlainTextEdit()
self.ed_vpn_uids.setToolTip("""EN: Force VPN by UID/uidrange (host OUTPUT only). Does not affect forwarded docker traffic.
RU: Принудительно через VPN по UID (только процессы хоста). На forwarded docker-трафик не влияет.""")
self.ed_vpn_uids.setPlaceholderText("Force VPN by UID/UID range, one per line (e.g. 1000 or 1000-1010)")
self.ed_vpn_uids.setFixedHeight(60)
self.ed_vpn_cgroups = QPlainTextEdit()
self.ed_vpn_cgroups.setToolTip("""EN: Force VPN by systemd cgroup. Backend resolves cgroup -> PIDs -> UID rules at apply time.
RU: Принудительно через VPN по cgroup (systemd). Backend резолвит cgroup -> PID -> UID при применении.""")
self.ed_vpn_cgroups.setPlaceholderText("Force VPN by cgroup path/name, one per line")
self.ed_vpn_cgroups.setFixedHeight(60)
self.ed_direct_subnets = QPlainTextEdit()
self.ed_direct_subnets.setToolTip("""EN: Force Direct by source subnet. Useful to keep docker subnets direct in full tunnel.
RU: Принудительно direct по source subnet. Полезно, чтобы docker-подсети были direct в full tunnel.""")
self.ed_direct_subnets.setPlaceholderText("Force Direct by source subnet, one per line")
self.ed_direct_subnets.setFixedHeight(72)
self.ed_direct_uids = QPlainTextEdit()
self.ed_direct_uids.setToolTip("""EN: Force Direct by UID/uidrange (host OUTPUT only).
RU: Принудительно direct по UID (только процессы хоста).""")
self.ed_direct_uids.setPlaceholderText("Force Direct by UID/UID range, one per line")
self.ed_direct_uids.setFixedHeight(60)
self.ed_direct_cgroups = QPlainTextEdit()
self.ed_direct_cgroups.setToolTip("""EN: Force Direct by systemd cgroup (resolved to UID rules at apply time).
RU: Принудительно direct по cgroup (резолвится в UID правила при применении).""")
self.ed_direct_cgroups.setPlaceholderText("Force Direct by cgroup path/name, one per line")
self.ed_direct_cgroups.setFixedHeight(60)
cols = QHBoxLayout()
vpn_group = QGroupBox("Force VPN")
vpn_layout = QVBoxLayout(vpn_group)
vpn_layout.addWidget(QLabel("Source subnets"))
vpn_layout.addWidget(self.ed_vpn_subnets)
vpn_layout.addWidget(QLabel("UIDs"))
vpn_layout.addWidget(self.ed_vpn_uids)
vpn_layout.addWidget(QLabel("Cgroups / services"))
vpn_layout.addWidget(self.ed_vpn_cgroups)
cols.addWidget(vpn_group, stretch=1)
direct_group = QGroupBox("Force Direct")
direct_layout = QVBoxLayout(direct_group)
direct_layout.addWidget(QLabel("Source subnets"))
direct_layout.addWidget(self.ed_direct_subnets)
direct_layout.addWidget(QLabel("UIDs"))
direct_layout.addWidget(self.ed_direct_uids)
direct_layout.addWidget(QLabel("Cgroups / services"))
direct_layout.addWidget(self.ed_direct_cgroups)
cols.addWidget(direct_group, stretch=1)
tab_adv_layout.addLayout(cols, stretch=1)
row_adv = QHBoxLayout()
self.btn_pick_detected = QPushButton("Add detected...")
self.btn_pick_detected.setToolTip("""EN: Opens a selector with detected subnets/services/UIDs. Only fills fields; nothing is applied automatically.
RU: Открывает список обнаруженных subnet/service/UID. Только заполняет поля; ничего не применяется автоматически.""")
self.btn_pick_detected.clicked.connect(self.on_pick_detected)
row_adv.addWidget(self.btn_pick_detected)
self.btn_apply_overrides = QPushButton("Apply overrides")
self.btn_apply_overrides.setToolTip("""EN: Applies policy rules and verifies health. On failure backend rolls back.
RU: Применяет policy-rules и проверяет health. При ошибке backend делает откат.""")
self.btn_apply_overrides.clicked.connect(self.on_apply_overrides)
row_adv.addWidget(self.btn_apply_overrides)
self.btn_reload_overrides = QPushButton("Reload overrides")
self.btn_reload_overrides.clicked.connect(self.refresh_state)
row_adv.addWidget(self.btn_reload_overrides)
row_adv.addStretch(1)
tab_adv_layout.addLayout(row_adv)
self.tabs.addTab(tab_adv, "Policy overrides (Advanced)")
# EN: Small status line for last action performed in this dialog.
# RU: Строка статуса последнего действия в этом окне.
self.lbl_action = QLabel("")
self.lbl_action.setWordWrap(True)
self.lbl_action.setStyleSheet("color: gray;")
root.addWidget(self.lbl_action)
row_bottom = QHBoxLayout()
row_bottom.addStretch(1)
btn_close = QPushButton("Close")
btn_close.clicked.connect(self.accept)
row_bottom.addWidget(btn_close)
root.addLayout(row_bottom)
QtCore.QTimer.singleShot(0, self.refresh_state)
QtCore.QTimer.singleShot(0, self.refresh_app_profiles)
QtCore.QTimer.singleShot(0, self.refresh_appmarks_counts)
QtCore.QTimer.singleShot(0, self.refresh_appmarks_items)
QtCore.QTimer.singleShot(0, self.refresh_running_scopes)
# EN: Auto-refresh runtime marks/units while dialog is open.
# RU: Авто-обновление runtime меток/юнитов пока окно открыто.
self._runtime_auto_timer = QtCore.QTimer(self)
self._runtime_auto_timer.setInterval(5000)
self._runtime_auto_timer.timeout.connect(self._auto_refresh_runtime)
self._runtime_auto_timer.start()
def _auto_refresh_runtime(self) -> None:
# Keep this quiet: no modal popups.
self.refresh_appmarks_counts()
try:
# Only refresh units list when Apps(runtime) tab is visible.
if int(self.tabs.currentIndex() or 0) == 1:
self.refresh_appmarks_items(quiet=True)
self.refresh_running_scopes(quiet=True)
except Exception:
pass
def _is_operation_error(self, message: str) -> bool:
low = (message or "").strip().lower()
return ("rolled back" in low) or ("apply failed" in low) or ("verification failed" in low)
def _set_action_status(self, msg: str, ok: bool | None = None) -> None:
text = (msg or "").strip() or ""
self.lbl_action.setText(text)
if ok is True:
self.lbl_action.setStyleSheet("color: green;")
elif ok is False:
self.lbl_action.setStyleSheet("color: red;")
else:
self.lbl_action.setStyleSheet("color: gray;")
def _safe(self, fn, *, title: str = "Traffic mode error") -> None:
try:
fn()
except Exception as e:
msg = f"[ui-error] {title}: {e}"
self._set_action_status(msg, ok=False)
self._emit_log(msg)
QMessageBox.critical(self, title, str(e))
def _emit_log(self, msg: str) -> None:
text = (msg or "").strip()
if not text:
return
if self.log_cb:
self.log_cb(text)
else:
try:
self.ctrl.log_gui(text)
except Exception:
pass
def _preferred_iface_value(self) -> str:
raw = self.cmb_iface.currentText().strip()
if raw.lower() in ("", "auto", "-", "default"):
return ""
return raw
def _set_preferred_iface_options(self, ifaces: list[str], selected: str) -> None:
vals = ["auto"] + [x for x in ifaces if x]
sel = selected.strip() if selected else "auto"
if not sel:
sel = "auto"
if sel not in vals:
vals.append(sel)
self.cmb_iface.blockSignals(True)
self.cmb_iface.clear()
self.cmb_iface.addItems(vals)
idx = self.cmb_iface.findText(sel)
if idx < 0:
idx = self.cmb_iface.findText("auto")
if idx >= 0:
self.cmb_iface.setCurrentIndex(idx)
else:
self.cmb_iface.setEditText(sel)
self.cmb_iface.blockSignals(False)
def _lines_from_text(self, txt: str) -> list[str]:
out: list[str] = []
for raw in (txt or "").replace("\r", "\n").split("\n"):
line = raw.strip()
if line:
out.append(line)
return out
def _set_lines(self, widget: QPlainTextEdit, vals: list[str]) -> None:
widget.blockSignals(True)
widget.setPlainText("\n".join([x for x in vals if str(x).strip()]))
widget.blockSignals(False)
def _merge_lines(self, widget: QPlainTextEdit, vals: list[str]) -> int:
cur = self._lines_from_text(widget.toPlainText())
seen = {x.strip() for x in cur}
added = 0
for v in (vals or []):
vv = str(v).strip()
if not vv or vv in seen:
continue
cur.append(vv)
seen.add(vv)
added += 1
if added > 0:
self._set_lines(widget, cur)
return added
def _candidates_add(self, target: str, kind: str, values: list[str]) -> None:
tgt = (target or "").strip().lower()
k = (kind or "").strip().lower()
if tgt not in ("vpn", "direct"):
return
widget: QPlainTextEdit | None = None
if tgt == "vpn":
if k == "subnet":
widget = self.ed_vpn_subnets
elif k == "uid":
widget = self.ed_vpn_uids
elif k == "cgroup":
widget = self.ed_vpn_cgroups
else:
if k == "subnet":
widget = self.ed_direct_subnets
elif k == "uid":
widget = self.ed_direct_uids
elif k == "cgroup":
widget = self.ed_direct_cgroups
if widget is None:
return
added = self._merge_lines(widget, values or [])
if added > 0:
msg = f"Traffic candidates added: target={tgt} kind={k} added={added}"
self._set_action_status(msg, ok=True)
self._emit_log(msg)
else:
msg = f"Traffic candidates add: nothing new (target={tgt} kind={k})"
self._set_action_status(msg, ok=None)
self._emit_log(msg)
def on_pick_detected(self) -> None:
def work() -> None:
cands = self.ctrl.traffic_candidates()
existing = {
"vpn": {
"subnet": set(self._lines_from_text(self.ed_vpn_subnets.toPlainText())),
"uid": set(self._lines_from_text(self.ed_vpn_uids.toPlainText())),
"cgroup": set(self._lines_from_text(self.ed_vpn_cgroups.toPlainText())),
},
"direct": {
"subnet": set(self._lines_from_text(self.ed_direct_subnets.toPlainText())),
"uid": set(self._lines_from_text(self.ed_direct_uids.toPlainText())),
"cgroup": set(self._lines_from_text(self.ed_direct_cgroups.toPlainText())),
},
}
dlg = TrafficCandidatesDialog(
cands,
existing=existing,
add_cb=self._candidates_add,
parent=self,
)
dlg.exec()
self._safe(work, title="Traffic candidates error")
def _set_mode_state(
self,
desired_mode: str,
applied_mode: str,
preferred_iface: str,
auto_local_bypass: bool,
bypass_candidates: int,
overrides_applied: int,
cgroup_resolved_uids: int,
cgroup_warning: str,
healthy: bool,
probe_ok: bool,
probe_message: str,
active_iface: str,
iface_reason: str,
message: str,
) -> None:
desired = (desired_mode or "").strip().lower() or "selective"
applied = (applied_mode or "").strip().lower() or "direct"
if healthy:
color = "green"
health_txt = "OK"
else:
color = "red"
health_txt = "MISMATCH"
text = f"Traffic mode: {desired} (applied: {applied}) [{health_txt}]"
diag_parts = []
diag_parts.append(f"preferred={preferred_iface or 'auto'}")
diag_parts.append(
f"auto_local_bypass={'on' if auto_local_bypass else 'off'}"
)
if bypass_candidates > 0:
diag_parts.append(f"bypass_routes={bypass_candidates}")
diag_parts.append(f"overrides={overrides_applied}")
if cgroup_resolved_uids > 0:
diag_parts.append(f"cgroup_uids={cgroup_resolved_uids}")
if cgroup_warning:
diag_parts.append(f"cgroup_warning={cgroup_warning}")
if active_iface:
diag_parts.append(f"iface={active_iface}")
if iface_reason:
diag_parts.append(f"source={iface_reason}")
diag_parts.append(f"probe={'ok' if probe_ok else 'fail'}")
if probe_message:
diag_parts.append(probe_message)
if message:
diag_parts.append(message)
diag = " | ".join(diag_parts) if diag_parts else ""
self.lbl_state.setText(text)
self.lbl_state.setStyleSheet(f"color: {color};")
self.lbl_diag.setText(diag)
self.lbl_diag.setStyleSheet("color: gray;")
def refresh_state(self) -> None:
def work() -> None:
view = self.ctrl.traffic_mode_view()
mode = (view.desired_mode or "selective").strip().lower()
self.rad_selective.blockSignals(True)
self.rad_full.blockSignals(True)
self.rad_direct.blockSignals(True)
self.rad_selective.setChecked(mode == "selective")
self.rad_full.setChecked(mode == "full_tunnel")
self.rad_direct.setChecked(mode == "direct")
self.rad_selective.blockSignals(False)
self.rad_full.blockSignals(False)
self.rad_direct.blockSignals(False)
opts = self.ctrl.traffic_interfaces()
self._set_preferred_iface_options(opts, view.preferred_iface)
self.chk_auto_local.blockSignals(True)
self.chk_auto_local.setChecked(bool(view.auto_local_bypass))
self.chk_auto_local.blockSignals(False)
self._set_lines(self.ed_vpn_subnets, list(view.force_vpn_subnets or []))
self._set_lines(self.ed_vpn_uids, list(view.force_vpn_uids or []))
self._set_lines(self.ed_vpn_cgroups, list(view.force_vpn_cgroups or []))
self._set_lines(self.ed_direct_subnets, list(view.force_direct_subnets or []))
self._set_lines(self.ed_direct_uids, list(view.force_direct_uids or []))
self._set_lines(self.ed_direct_cgroups, list(view.force_direct_cgroups or []))
self._set_mode_state(
view.desired_mode,
view.applied_mode,
view.preferred_iface,
bool(view.auto_local_bypass),
int(view.bypass_candidates),
int(view.overrides_applied),
int(view.cgroup_resolved_uids),
view.cgroup_warning,
bool(view.healthy),
bool(view.probe_ok),
view.probe_message,
view.active_iface,
view.iface_reason,
view.message,
)
self._safe(work)
def on_mode_toggle(self, mode: str, checked: bool) -> None:
if not checked:
return
def work() -> None:
preferred = self._preferred_iface_value()
auto_local = self.chk_auto_local.isChecked()
view = self.ctrl.traffic_mode_set(mode, preferred, auto_local)
msg = (
f"Traffic mode set: desired={view.desired_mode}, "
f"applied={view.applied_mode}, iface={view.active_iface or '-'}, "
f"preferred={preferred or 'auto'}, probe_ok={view.probe_ok}, "
f"healthy={view.healthy}, auto_local_bypass={view.auto_local_bypass}, "
f"bypass_routes={view.bypass_candidates}, overrides={view.overrides_applied}, "
f"cgroup_uids={view.cgroup_resolved_uids}, message={view.message}"
)
self._emit_log(msg)
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
self._set_action_status(
f"Traffic mode set: desired={view.desired_mode} applied={view.applied_mode} message={view.message}",
ok=op_ok,
)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work)
def on_refresh_ifaces(self) -> None:
def work() -> None:
view = self.ctrl.traffic_mode_view()
opts = self.ctrl.traffic_interfaces()
self._set_preferred_iface_options(opts, view.preferred_iface)
self._emit_log(
"Traffic ifaces refreshed: "
f"preferred={view.preferred_iface or 'auto'} "
f"active={view.active_iface or '-'}"
)
self._set_action_status("Traffic ifaces refreshed", ok=True)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Traffic iface detect error")
def _selected_mode(self) -> str:
if self.rad_full.isChecked():
return "full_tunnel"
if self.rad_direct.isChecked():
return "direct"
return "selective"
def on_auto_local_toggle(self) -> None:
def work() -> None:
mode = self._selected_mode()
preferred = self._preferred_iface_value()
auto_local = self.chk_auto_local.isChecked()
view = self.ctrl.traffic_mode_set(mode, preferred, auto_local)
msg = (
f"Traffic auto-local set: mode={view.desired_mode}, "
f"auto_local_bypass={view.auto_local_bypass}, "
f"bypass_routes={view.bypass_candidates}, overrides={view.overrides_applied}, "
f"cgroup_uids={view.cgroup_resolved_uids}, message={view.message}"
)
self._emit_log(msg)
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
self._set_action_status(
f"Auto-local bypass set: {'on' if view.auto_local_bypass else 'off'} ({view.message})",
ok=op_ok,
)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Auto-local bypass error")
def on_apply_overrides(self) -> None:
def work() -> None:
mode = self._selected_mode()
preferred = self._preferred_iface_value()
auto_local = self.chk_auto_local.isChecked()
vpn_subnets = self._lines_from_text(self.ed_vpn_subnets.toPlainText())
vpn_uids = self._lines_from_text(self.ed_vpn_uids.toPlainText())
vpn_cgroups = self._lines_from_text(self.ed_vpn_cgroups.toPlainText())
direct_subnets = self._lines_from_text(self.ed_direct_subnets.toPlainText())
direct_uids = self._lines_from_text(self.ed_direct_uids.toPlainText())
direct_cgroups = self._lines_from_text(self.ed_direct_cgroups.toPlainText())
view = self.ctrl.traffic_mode_set(
mode,
preferred,
auto_local,
vpn_subnets,
vpn_uids,
vpn_cgroups,
direct_subnets,
direct_uids,
direct_cgroups,
)
msg = (
f"Traffic overrides applied: mode={view.desired_mode}, "
f"vpn_subnets={len(view.force_vpn_subnets)}, vpn_uids={len(view.force_vpn_uids)}, vpn_cgroups={len(view.force_vpn_cgroups)}, "
f"direct_subnets={len(view.force_direct_subnets)}, direct_uids={len(view.force_direct_uids)}, direct_cgroups={len(view.force_direct_cgroups)}, "
f"overrides={view.overrides_applied}, cgroup_uids={view.cgroup_resolved_uids}, "
f"healthy={view.healthy}, message={view.message}"
)
self._emit_log(msg)
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
self._set_action_status(
f"Overrides applied: overrides={view.overrides_applied} message={view.message}",
ok=op_ok,
)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Apply overrides error")
# -----------------------------------------------------------------
# Apps (runtime) tab
# -----------------------------------------------------------------
def _append_app_log(self, msg: str) -> None:
line = (msg or "").rstrip()
if not line:
return
try:
self.txt_app.appendPlainText(line)
except Exception:
pass
self._emit_log(line)
def _infer_app_key_from_cmdline(self, cmdline: str) -> str:
cmd = (cmdline or "").strip()
if not cmd:
return ""
try:
args = shlex.split(cmd)
if args:
return str(args[0] or "").strip()
except Exception:
pass
# Fallback: first token
return (cmd.split() or [""])[0].strip()
def _launch_and_mark(
self,
*,
cmdline: str,
target: str,
ttl_sec: int,
app_key: str = "",
) -> None:
cmdline = (cmdline or "").strip()
if not cmdline:
raise ValueError("empty command")
tgt = (target or "").strip().lower()
if tgt not in ("vpn", "direct"):
raise ValueError("invalid target")
ttl = int(ttl_sec or 0)
if ttl <= 0:
ttl = int(self.spn_app_ttl.value()) * 3600
key = (app_key or "").strip() or self._infer_app_key_from_cmdline(cmdline)
# EN: If we already have a running unit for the same app_key+target, refresh mark instead of spawning.
# RU: Если уже есть запущенный unit для того же app_key+target — обновляем метку, не плодим инстансы.
try:
items = list(self.ctrl.traffic_appmarks_items() or [])
except Exception:
items = []
for it in items:
if (getattr(it, "target", "") or "").strip().lower() != tgt:
continue
if (getattr(it, "app_key", "") or "").strip() != key:
continue
unit = (getattr(it, "unit", "") or "").strip()
if not unit:
continue
code, out = self._systemctl_user(["is-active", unit])
if code == 0 and (out or "").strip().lower() == "active":
cg = self._effective_cgroup_for_unit_retry(unit, timeout_sec=3.0)
self._append_app_log(
f"[profile] already running: app={key} target={tgt} unit={unit} (refreshing mark)"
)
res = self.ctrl.traffic_appmarks_apply(
op="add",
target=tgt,
cgroup=cg,
unit=unit,
command=cmdline,
app_key=key,
timeout_sec=ttl,
)
if not res.ok:
raise RuntimeError(res.message or "appmark refresh failed")
self._set_action_status(
f"App mark refreshed: target={tgt} cgroup_id={res.cgroup_id}",
ok=True,
)
self._set_last_scope(
unit=unit,
target=tgt,
app_key=key,
cmdline=cmdline,
cgroup_id=int(res.cgroup_id or 0),
)
self.refresh_appmarks_items(quiet=True)
self.refresh_appmarks_counts()
self.refresh_running_scopes(quiet=True)
return
unit = f"svpn-{tgt}-{int(time.time())}.service"
self._append_app_log(f"[profile] launching: app={key or '-'} target={tgt} ttl={ttl}s unit={unit}")
cg, out = self._run_systemd_unit(cmdline, unit=unit)
if out:
self._append_app_log(f"[profile] systemd-run:\n{out}")
self._append_app_log(f"[profile] ControlGroup: {cg}")
self._set_last_scope(unit=unit, target=tgt, app_key=key, cmdline=cmdline, cgroup_id=0)
res = self.ctrl.traffic_appmarks_apply(
op="add",
target=tgt,
cgroup=cg,
unit=unit,
command=cmdline,
app_key=key,
timeout_sec=ttl,
)
if not res.ok:
raise RuntimeError(res.message or "appmark apply failed")
self._append_app_log(f"[appmarks] OK: {res.message} cgroup_id={res.cgroup_id} timeout={res.timeout_sec}s")
self._set_action_status(f"App mark added: target={tgt} cgroup_id={res.cgroup_id}", ok=True)
self._set_last_scope(
unit=unit,
target=tgt,
app_key=key,
cmdline=cmdline,
cgroup_id=int(res.cgroup_id or 0),
)
self.refresh_appmarks_items(quiet=True)
self.refresh_appmarks_counts()
self.refresh_running_scopes(quiet=True)
def _selected_app_profile(self):
it = self.lst_app_profiles.currentItem()
if not it:
return None
return it.data(QtCore.Qt.UserRole)
def _profile_shortcuts_dir(self) -> str:
# ~/.local/share/applications is the standard per-user location.
return os.path.join(os.path.expanduser("~"), ".local", "share", "applications")
def _profile_shortcut_path(self, profile_id: str) -> str:
pid = (profile_id or "").strip()
if not pid:
return ""
safe = re.sub(r"[^A-Za-z0-9._-]+", "-", pid).strip("-")
if not safe:
safe = "profile"
return os.path.join(self._profile_shortcuts_dir(), f"svpn-profile-{safe}.desktop")
def _profile_shortcut_exists(self, profile_id: str) -> bool:
p = self._profile_shortcut_path(profile_id)
return bool(p) and os.path.isfile(p)
def _render_profile_shortcut(self, prof) -> str:
pid = (getattr(prof, "id", "") or "").strip()
name = (getattr(prof, "name", "") or "").strip() or pid or "SVPN profile"
target = (getattr(prof, "target", "") or "").strip().lower()
if target not in ("vpn", "direct"):
target = "vpn"
label_target = "VPN" if target == "vpn" else "Direct"
script = os.path.abspath(os.path.join(os.path.dirname(__file__), "svpn_run_profile.py"))
# Use env python3 so the shortcut works even if python3 is not /usr/bin/python3.
exec_line = f"/usr/bin/env python3 {script} --id {pid}"
# Keep .desktop content ASCII-ish. Values are UTF-8-safe by spec, but avoid surprises.
name_safe = (name or "SVPN profile").replace("\n", " ").replace("\r", " ").strip()
return (
"[Desktop Entry]\n"
"Version=1.0\n"
"Type=Application\n"
f"Name=SVPN: {name_safe} [{label_target}]\n"
f"Comment=Selective VPN: run traffic profile id={pid} target={target}\n"
f"Exec={exec_line}\n"
"Terminal=false\n"
"Categories=Network;\n"
f"X-SVPN-ProfileID={pid}\n"
f"X-SVPN-Target={target}\n"
)
def _update_profile_shortcut_ui(self) -> None:
prof = self._selected_app_profile()
if prof is None:
self.btn_app_profile_shortcut_create.setEnabled(False)
self.btn_app_profile_shortcut_remove.setEnabled(False)
self.lbl_profile_shortcut.setText("Shortcut: —")
return
pid = (getattr(prof, "id", "") or "").strip()
path = self._profile_shortcut_path(pid)
installed = self._profile_shortcut_exists(pid)
self.btn_app_profile_shortcut_create.setEnabled(True)
self.btn_app_profile_shortcut_remove.setEnabled(installed)
state = "installed" if installed else "not installed"
self.lbl_profile_shortcut.setText(f"Shortcut: {state} ({path})")
def refresh_app_profiles(self, quiet: bool = False) -> None:
def work() -> None:
profs = list(self.ctrl.traffic_app_profiles_list() or [])
self.lst_app_profiles.clear()
# Best-effort runtime context for UI flags (MARK/RUN).
try:
mark_items = list(self.ctrl.traffic_appmarks_items() or [])
except Exception:
mark_items = []
marks_by_key: dict[tuple[str, str], list] = {}
unit_active: dict[str, bool] = {}
for it in mark_items:
tgt = (getattr(it, "target", "") or "").strip().lower()
key = (getattr(it, "app_key", "") or "").strip()
if tgt not in ("vpn", "direct") or not key:
continue
marks_by_key.setdefault((tgt, key), []).append(it)
unit = (getattr(it, "unit", "") or "").strip()
if unit and unit not in unit_active:
code, out = self._systemctl_user(["is-active", unit])
unit_active[unit] = bool(code == 0 and (out or "").strip().lower() == "active")
shortcuts = 0
with_marks = 0
running = 0
for p in profs:
# p is a UI-friendly dataclass from ApiClient.
name = (getattr(p, "name", "") or "").strip()
pid = (getattr(p, "id", "") or "").strip()
target = (getattr(p, "target", "") or "").strip().lower()
app_key = (getattr(p, "app_key", "") or "").strip()
cmd = (getattr(p, "command", "") or "").strip()
ttl_sec = int(getattr(p, "ttl_sec", 0) or 0)
label = name or pid or "(unnamed)"
if target in ("vpn", "direct"):
label += f" [{target}]"
flags: list[str] = []
sc_path = self._profile_shortcut_path(pid)
has_shortcut = bool(sc_path and os.path.isfile(sc_path))
if has_shortcut:
flags.append("SC")
shortcuts += 1
mkey = (target, app_key)
items = marks_by_key.get(mkey) or []
if items:
flags.append("MARK")
with_marks += 1
run_units: list[str] = []
for it in items:
unit = (getattr(it, "unit", "") or "").strip()
if not unit:
continue
if unit_active.get(unit, False):
run_units.append(unit)
if run_units:
flags.append("RUN")
running += 1
if flags:
label += " [" + ",".join(flags) + "]"
it = QListWidgetItem(label)
sc_state = "yes" if has_shortcut else "no"
unit_txt = ", ".join(run_units[:3])
if len(run_units) > 3:
unit_txt += f", +{len(run_units) - 3}"
it.setToolTip(
(
f"id: {pid}\n"
f"app_key: {app_key}\n"
f"target: {target}\n"
f"ttl: {ttl_sec}s\n\n"
f"shortcut: {sc_state}\n"
f"shortcut_path: {sc_path}\n\n"
f"runtime_marks: {len(items)}\n"
f"running_units: {len(run_units)}\n"
f"units: {unit_txt or '-'}\n\n"
f"{cmd}"
).strip()
)
it.setData(QtCore.Qt.UserRole, p)
self.lst_app_profiles.addItem(it)
self.lbl_app_profiles.setText(
f"Saved profiles: {len(profs)} (shortcut={shortcuts}, mark={with_marks}, run={running})"
)
self._update_profile_shortcut_ui()
if quiet:
try:
work()
except Exception as e:
self.lbl_app_profiles.setText(f"Saved profiles: error: {e}")
return
self._safe(work, title="Refresh profiles error")
def on_app_profile_save(self) -> None:
def work() -> None:
cmdline = (self.ed_app_cmd.text() or "").strip()
if not cmdline:
QMessageBox.warning(self, "Missing command", "Please enter a command first.")
return
target = "vpn" if self.rad_app_vpn.isChecked() else "direct"
ttl_sec = int(self.spn_app_ttl.value()) * 3600
name = (self.ed_app_profile_name.text() or "").strip()
app_key = self._infer_app_key_from_cmdline(cmdline)
res = self.ctrl.traffic_app_profile_upsert(
name=name,
app_key=app_key,
command=cmdline,
target=target,
ttl_sec=ttl_sec,
)
if not res.ok:
self._set_action_status(f"Save profile failed: {res.message}", ok=False)
raise RuntimeError(res.message or "save failed")
prof = getattr(res, "profile", None)
pid = (getattr(prof, "id", "") or "").strip() if prof is not None else ""
self._set_action_status(f"Profile saved: {pid or '(ok)'}", ok=True)
self._append_app_log(f"[profile] saved: id={pid or '-'} target={target} app={app_key or '-'}")
self.refresh_app_profiles(quiet=True)
# If shortcut exists already, rewrite it to reflect updated name/target.
if prof is not None and pid and self._profile_shortcut_exists(pid):
try:
sc_path = self._profile_shortcut_path(pid)
os.makedirs(os.path.dirname(sc_path), exist_ok=True)
with open(sc_path, "w", encoding="utf-8", errors="replace") as f:
f.write(self._render_profile_shortcut(prof))
self._append_app_log(f"[shortcut] updated: {sc_path}")
except Exception as e:
# Non-fatal: profile save is more important than shortcut rewrite.
self._append_app_log(f"[shortcut] update failed: {e}")
# Best-effort select newly saved profile.
if pid:
for i in range(self.lst_app_profiles.count()):
it = self.lst_app_profiles.item(i)
if not it:
continue
p = it.data(QtCore.Qt.UserRole)
if (getattr(p, "id", "") or "").strip() == pid:
self.lst_app_profiles.setCurrentRow(i)
break
self._safe(work, title="Save profile error")
def on_app_profile_shortcut_create(self) -> None:
prof = self._selected_app_profile()
if prof is None:
return
pid = (getattr(prof, "id", "") or "").strip()
if not pid:
return
def work() -> None:
path = self._profile_shortcut_path(pid)
if not path:
raise RuntimeError("cannot derive shortcut path")
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8", errors="replace") as f:
f.write(self._render_profile_shortcut(prof))
self._append_app_log(f"[shortcut] saved: {path} id={pid}")
self._set_action_status(f"Shortcut saved: {os.path.basename(path)}", ok=True)
self._update_profile_shortcut_ui()
self._safe(work, title="Create shortcut error")
def on_app_profile_shortcut_remove(self) -> None:
prof = self._selected_app_profile()
if prof is None:
return
pid = (getattr(prof, "id", "") or "").strip()
if not pid:
return
def work() -> None:
path = self._profile_shortcut_path(pid)
if not path:
raise RuntimeError("cannot derive shortcut path")
if not os.path.exists(path):
self._set_action_status("Shortcut not installed", ok=True)
self._update_profile_shortcut_ui()
return
try:
os.remove(path)
except FileNotFoundError:
pass
self._append_app_log(f"[shortcut] removed: {path} id={pid}")
self._set_action_status("Shortcut removed", ok=True)
self._update_profile_shortcut_ui()
self._safe(work, title="Remove shortcut error")
def on_app_profile_load(self) -> None:
prof = self._selected_app_profile()
if prof is None:
return
def work() -> None:
cmd = (getattr(prof, "command", "") or "").strip()
target = (getattr(prof, "target", "") or "").strip().lower()
ttl_sec = int(getattr(prof, "ttl_sec", 0) or 0)
name = (getattr(prof, "name", "") or "").strip()
if cmd:
self.ed_app_cmd.setText(cmd)
if target == "direct":
self.rad_app_direct.setChecked(True)
else:
self.rad_app_vpn.setChecked(True)
if ttl_sec > 0:
# UI uses hours; round up.
hours = max(1, (ttl_sec + 3599) // 3600)
self.spn_app_ttl.setValue(int(hours))
self.ed_app_profile_name.setText(name)
self._set_action_status("Profile loaded into form", ok=True)
self._safe(work, title="Load profile error")
def on_app_profile_run(self) -> None:
prof = self._selected_app_profile()
if prof is None:
return
def work() -> None:
cmd = (getattr(prof, "command", "") or "").strip()
target = (getattr(prof, "target", "") or "").strip().lower()
ttl = int(getattr(prof, "ttl_sec", 0) or 0)
app_key = (getattr(prof, "app_key", "") or "").strip()
if not cmd:
raise RuntimeError("profile has empty command")
if target not in ("vpn", "direct"):
target = "vpn"
self._launch_and_mark(cmdline=cmd, target=target, ttl_sec=ttl, app_key=app_key)
self._safe(work, title="Run profile error")
def on_app_profile_delete(self) -> None:
prof = self._selected_app_profile()
if prof is None:
return
pid = (getattr(prof, "id", "") or "").strip()
if not pid:
return
def work() -> None:
if QMessageBox.question(
self,
"Delete profile",
f"Delete saved profile?\n\nid={pid}",
) != QMessageBox.StandardButton.Yes:
return
res = self.ctrl.traffic_app_profile_delete(pid)
if not res.ok:
self._set_action_status(f"Delete profile failed: {res.message}", ok=False)
raise RuntimeError(res.message or "delete failed")
# Keep a tight coupling: deleting profile removes its shortcut too.
sc_path = self._profile_shortcut_path(pid)
if sc_path and os.path.exists(sc_path):
try:
os.remove(sc_path)
self._append_app_log(f"[shortcut] removed: {sc_path} (profile deleted)")
except Exception as e:
self._append_app_log(f"[shortcut] remove failed: {e}")
self._set_action_status(f"Profile deleted: {pid}", ok=True)
self._append_app_log(f"[profile] deleted: id={pid}")
self.refresh_app_profiles(quiet=True)
self._safe(work, title="Delete profile error")
def refresh_appmarks_counts(self) -> None:
try:
st = self.ctrl.traffic_appmarks_status()
self.lbl_app_counts.setText(
f"Marks: VPN={st.vpn_count}, Direct={st.direct_count}"
)
except Exception as e:
self.lbl_app_counts.setText(f"Marks: error: {e}")
def refresh_appmarks_items(self, quiet: bool = False) -> None:
def work() -> None:
items = list(self.ctrl.traffic_appmarks_items() or [])
self.lst_marks.clear()
vpn = 0
direct = 0
for it in items:
tgt = (getattr(it, "target", "") or "").strip().lower()
if tgt == "vpn":
vpn += 1
elif tgt == "direct":
direct += 1
mid = int(getattr(it, "id", 0) or 0)
app_key = (getattr(it, "app_key", "") or "").strip()
unit = (getattr(it, "unit", "") or "").strip()
cmd = (getattr(it, "command", "") or "").strip()
rem = int(getattr(it, "remaining_sec", 0) or 0)
rem_h = rem // 3600
rem_m = (rem % 3600) // 60
rem_s = rem % 60
rem_txt = f"{rem_h:02d}:{rem_m:02d}:{rem_s:02d}"
label = f"{tgt} {app_key or unit or mid} (ttl {rem_txt})"
q = QListWidgetItem(label)
q.setToolTip(
(
f"id: {mid}\n"
f"target: {tgt}\n"
f"app_key: {app_key}\n"
f"unit: {unit}\n"
f"remaining: {rem}s\n\n"
f"{cmd}"
).strip()
)
q.setData(QtCore.Qt.UserRole, it)
self.lst_marks.addItem(q)
self.lbl_marks.setText(f"Active marks: {len(items)} (VPN={vpn}, Direct={direct})")
self.btn_marks_unmark.setEnabled(self.lst_marks.count() > 0)
if quiet:
try:
work()
except Exception as e:
self.lbl_marks.setText(f"Active marks: error: {e}")
return
self._safe(work, title="Refresh marks error")
def on_appmarks_unmark_selected(self) -> None:
sel = list(self.lst_marks.selectedItems() or [])
if not sel:
return
# Convert selection to (target,id).
pairs: list[tuple[str, int]] = []
for it in sel:
obj = it.data(QtCore.Qt.UserRole)
tgt = (getattr(obj, "target", "") or "").strip().lower()
mid = int(getattr(obj, "id", 0) or 0)
if tgt in ("vpn", "direct") and mid > 0:
pairs.append((tgt, mid))
if not pairs:
return
def work() -> None:
if QMessageBox.question(
self,
"Unmark selected",
"Remove routing marks for selected items?\n\n"
+ "\n".join([f"{t}:{i}" for (t, i) in pairs[:20]])
+ ("\n..." if len(pairs) > 20 else ""),
) != QMessageBox.StandardButton.Yes:
return
for (tgt, mid) in pairs:
res = self.ctrl.traffic_appmarks_apply(op="del", target=tgt, cgroup=str(mid))
if not res.ok:
raise RuntimeError(res.message or f"unmark failed: {tgt}:{mid}")
self._set_action_status(f"Unmarked: {len(pairs)} item(s)", ok=True)
self.refresh_appmarks_items(quiet=True)
self.refresh_appmarks_counts()
self.refresh_app_profiles(quiet=True)
self._safe(work, title="Unmark selected error")
def _refresh_last_scope_ui(self) -> None:
unit = (self._last_app_unit or "").strip()
target = (self._last_app_target or "").strip().lower()
app_key = (self._last_app_key or "").strip()
cg_id = int(self._last_app_cgroup_id or 0)
parts = []
if unit:
parts.append(f"unit={unit}")
if target in ("vpn", "direct"):
parts.append(f"target={target}")
if app_key:
parts.append(f"app={app_key}")
if cg_id > 0:
parts.append(f"cgroup_id={cg_id}")
if parts:
self.lbl_app_last.setText("Last scope: " + " | ".join(parts))
else:
self.lbl_app_last.setText("Last scope: —")
self.btn_app_stop_last.setEnabled(bool(unit))
self.btn_app_unmark_last.setEnabled(bool(unit) and target in ("vpn", "direct") and cg_id > 0)
def _set_last_scope(
self,
*,
unit: str = "",
target: str = "",
app_key: str = "",
cmdline: str = "",
cgroup_id: int = 0,
) -> None:
self._last_app_unit = str(unit or "").strip()
self._last_app_target = str(target or "").strip().lower()
self._last_app_key = str(app_key or "").strip()
self._last_app_cmdline = str(cmdline or "").strip()
try:
self._last_app_cgroup_id = int(cgroup_id or 0)
except Exception:
self._last_app_cgroup_id = 0
self._settings.setValue("traffic_app_last_unit", self._last_app_unit)
self._settings.setValue("traffic_app_last_target", self._last_app_target)
self._settings.setValue("traffic_app_last_key", self._last_app_key)
self._settings.setValue("traffic_app_last_cmdline", self._last_app_cmdline)
self._settings.setValue("traffic_app_last_cgroup_id", int(self._last_app_cgroup_id or 0))
self._refresh_last_scope_ui()
def on_app_pick(self) -> None:
def work() -> None:
dlg = AppPickerDialog(parent=self)
if dlg.exec() != QDialog.Accepted:
return
cmd = (dlg.selected_command() or "").strip()
if not cmd:
return
self.ed_app_cmd.setText(cmd)
self._append_app_log(f"[picker] command filled: {cmd}")
self._safe(work, title="App picker error")
def _run_systemd_unit(self, cmdline: str, *, unit: str) -> tuple[str, str]:
args = shlex.split(cmdline or "")
if not args:
raise ValueError("empty command")
run_cmd = [
"systemd-run",
"--user",
"--unit",
unit,
"--collect",
"--same-dir",
] + args
try:
p = subprocess.run(
run_cmd,
capture_output=True,
text=True,
check=False,
timeout=6,
)
except subprocess.TimeoutExpired as e:
raise RuntimeError(
"systemd-run timed out (UI would freeze without this guard). "
"Try again or verify that systemd --user is responsive.\n\n"
f"command: {' '.join(run_cmd)}"
) from e
out = ((p.stdout or "") + (p.stderr or "")).strip()
if p.returncode != 0:
raise RuntimeError(f"systemd-run failed: {p.returncode}\n{out}".strip())
# EN: Some apps can be migrated into a different app scope by the desktop/session
# EN: integration. Using unit ControlGroup is then incorrect. Prefer reading the
# EN: effective cgroup from the unit MainPID (/proc/<pid>/cgroup) and fall back
# EN: to systemctl ControlGroup only if needed.
# RU: Некоторые приложения могут мигрировать в другой app scope (интеграция
# RU: с desktop/session). Тогда ControlGroup юнита неверен. Предпочитаем читать
# RU: реальный cgroup по MainPID (/proc/<pid>/cgroup) и только потом fallback
# RU: на systemctl ControlGroup.
cg = self._effective_cgroup_for_unit_retry(unit, timeout_sec=3.0)
return cg, out
def _effective_cgroup_for_unit_retry(self, unit: str, *, timeout_sec: float = 2.0) -> str:
u = (unit or "").strip()
if not u:
raise ValueError("empty unit")
deadline = time.time() + max(0.2, float(timeout_sec or 0))
last_pid_out = ""
while time.time() < deadline:
code, out = self._systemctl_user(["show", "-p", "MainPID", "--value", u])
last_pid_out = out or ""
if code == 0:
try:
pid = int((out or "").strip() or "0")
except Exception:
pid = 0
if pid > 0:
cg = self._cgroup_path_from_pid(pid)
if cg:
return cg
low = (out or "").lower()
if "could not be found" in low or "not found" in low:
break
time.sleep(0.1)
# Fallback: unit ControlGroup (may be wrong for migrated apps).
try:
cg = self._control_group_for_unit_retry(u, timeout_sec=1.0)
if cg:
return cg
except Exception:
pass
raise RuntimeError(
(
"failed to query effective cgroup\n"
+ (last_pid_out.strip() or "(no output)")
+ "\n\n"
+ "EN: Could not resolve unit MainPID->/proc/<pid>/cgroup.\n"
+ "RU: Не удалось получить MainPID и прочитать /proc/<pid>/cgroup."
).strip()
)
def _cgroup_path_from_pid(self, pid: int) -> str:
p = int(pid or 0)
if p <= 0:
return ""
try:
with open(f"/proc/{p}/cgroup", "r", encoding="utf-8", errors="replace") as f:
for raw in f:
line = (raw or "").strip()
if not line:
continue
if line.startswith("0::"):
cg = line[len("0::") :].strip()
return cg
except Exception:
return ""
return ""
def _control_group_for_unit_retry(self, unit: str, *, timeout_sec: float = 2.0) -> str:
u = (unit or "").strip()
if not u:
raise ValueError("empty unit")
deadline = time.time() + max(0.2, float(timeout_sec or 0))
last_out = ""
while time.time() < deadline:
code, out = self._systemctl_user(["show", "-p", "ControlGroup", "--value", u])
last_out = out or ""
if code == 0:
cg = (out or "").strip()
if cg:
return cg
low = (out or "").lower()
if "could not be found" in low or "not found" in low or "loaded units listed" in low:
break
time.sleep(0.1)
# Provide a more actionable error for users.
code_s, out_s = self._systemctl_user(["status", u, "--no-pager", "--plain"])
status_txt = (out_s or "").strip()
hint = (
"EN: Scope unit may have exited immediately. If the app is already running, "
"this launch may not create a new process tree inside the scope.\n"
"EN: Try closing the app полностью and запуск again from here.\n"
"RU: Scope мог завершиться сразу. Если приложение уже запущено, повторный запуск "
"может не создать новый процесс внутри scope.\n"
"RU: Попробуй полностью закрыть приложение и запустить снова отсюда."
)
raise RuntimeError(
(
"failed to query ControlGroup\n"
+ (last_out.strip() or "(no output)")
+ ("\n\nstatus:\n" + status_txt if status_txt else "")
+ "\n\n"
+ hint
).strip()
)
def on_app_run(self) -> None:
def work() -> None:
cmdline = (self.ed_app_cmd.text() or "").strip()
if not cmdline:
QMessageBox.warning(self, "Missing command", "Please enter a command to run.")
return
app_key = ""
try:
args = shlex.split(cmdline or "")
if args:
app_key = str(args[0] or "").strip()
except Exception:
app_key = ""
if not app_key:
# Fallback: best-effort first token.
app_key = (cmdline.split() or [""])[0].strip()
target = "vpn" if self.rad_app_vpn.isChecked() else "direct"
ttl_sec = int(self.spn_app_ttl.value()) * 3600
# EN: If the app is already running inside the last svpn unit, don't spawn a new instance.
# RU: Если приложение уже запущено в последнем svpn unit, не запускаем второй экземпляр.
last_unit = (self._last_app_unit or "").strip()
last_target = (self._last_app_target or "").strip().lower()
last_key = (self._last_app_key or "").strip()
if last_unit and last_target == target and last_key and last_key == app_key:
code, out = self._systemctl_user(["is-active", last_unit])
if code == 0 and (out or "").strip().lower() == "active":
cg = self._query_control_group_for_unit(last_unit)
self._append_app_log(
f"[app] already running: app={app_key} target={target} unit={last_unit} (refreshing mark)"
)
res = self.ctrl.traffic_appmarks_apply(
op="add",
target=target,
cgroup=cg,
unit=last_unit,
command=cmdline,
app_key=app_key,
timeout_sec=ttl_sec,
)
if res.ok:
self._append_app_log(
f"[appmarks] OK: {res.message} cgroup_id={res.cgroup_id} timeout={res.timeout_sec}s"
)
self._set_action_status(
f"App mark refreshed: target={target} cgroup_id={res.cgroup_id}",
ok=True,
)
self._set_last_scope(
unit=last_unit,
target=target,
app_key=app_key,
cmdline=cmdline,
cgroup_id=int(res.cgroup_id or 0),
)
else:
self._append_app_log(f"[appmarks] ERROR: {res.message}")
self._set_action_status(
f"App mark refresh failed: target={target} ({res.message})",
ok=False,
)
QMessageBox.critical(self, "App mark error", res.message or "unknown error")
self.refresh_appmarks_counts()
self.refresh_running_scopes()
self.refresh_app_profiles(quiet=True)
return
unit = f"svpn-{target}-{int(time.time())}.service"
self._append_app_log(
f"[app] launching: app={app_key or '-'} target={target} ttl={ttl_sec}s unit={unit}"
)
cg, out = self._run_systemd_unit(cmdline, unit=unit)
if out:
self._append_app_log(f"[app] systemd-run:\n{out}")
self._append_app_log(f"[app] ControlGroup: {cg}")
self._set_last_scope(unit=unit, target=target, app_key=app_key, cmdline=cmdline, cgroup_id=0)
res = self.ctrl.traffic_appmarks_apply(
op="add",
target=target,
cgroup=cg,
unit=unit,
command=cmdline,
app_key=app_key,
timeout_sec=ttl_sec,
)
if res.ok:
self._append_app_log(
f"[appmarks] OK: {res.message} cgroup_id={res.cgroup_id} timeout={res.timeout_sec}s"
)
self._set_action_status(
f"App mark added: target={target} cgroup_id={res.cgroup_id}",
ok=True,
)
self._set_last_scope(
unit=unit,
target=target,
app_key=app_key,
cmdline=cmdline,
cgroup_id=int(res.cgroup_id or 0),
)
else:
self._append_app_log(f"[appmarks] ERROR: {res.message}")
self._set_action_status(
f"App mark failed: target={target} ({res.message})",
ok=False,
)
low = (res.message or "").lower()
if "cgroupv2 path fails" in low or "no such file or directory" in low:
QMessageBox.critical(
self,
"App mark error",
(res.message or "unknown error")
+ "\n\n"
+ "EN: This usually means the app didn't stay inside the new systemd unit "
+ "(often because it was already running). Close the app completely and run again from here.\n"
+ "RU: Обычно это значит, что приложение не осталось в новом systemd unit "
+ "(часто потому что оно уже было запущено). Полностью закрой приложение и запусти снова отсюда.",
)
else:
QMessageBox.critical(
self,
"App mark error",
res.message or "unknown error",
)
self.refresh_appmarks_counts()
self.refresh_running_scopes()
self.refresh_app_profiles(quiet=True)
self._safe(work, title="Run app error")
def on_app_stop_last_scope(self) -> None:
unit = (self._last_app_unit or "").strip()
if not unit:
return
def work() -> None:
self._append_app_log(f"[app] stop last scope: unit={unit}")
p = subprocess.run(
["systemctl", "--user", "stop", unit],
capture_output=True,
text=True,
check=False,
)
out = ((p.stdout or "") + (p.stderr or "")).strip()
if p.returncode == 0:
self._append_app_log("[app] stopped OK")
if out:
self._append_app_log(out)
self._set_action_status(f"Stopped scope: {unit}", ok=True)
self.refresh_running_scopes(quiet=True)
self.refresh_app_profiles(quiet=True)
return
self._append_app_log(f"[app] stop failed: rc={p.returncode}")
if out:
self._append_app_log(out)
# fallback: kill
p2 = subprocess.run(
["systemctl", "--user", "kill", unit],
capture_output=True,
text=True,
check=False,
)
out2 = ((p2.stdout or "") + (p2.stderr or "")).strip()
if p2.returncode == 0:
self._append_app_log("[app] kill OK (fallback)")
if out2:
self._append_app_log(out2)
self._set_action_status(f"Killed scope: {unit}", ok=True)
self.refresh_running_scopes(quiet=True)
self.refresh_app_profiles(quiet=True)
return
self._append_app_log(f"[app] kill failed: rc={p2.returncode}")
if out2:
self._append_app_log(out2)
self._set_action_status(f"Stop scope failed: {unit}", ok=False)
QMessageBox.critical(self, "Stop scope error", out2 or out or "stop failed")
self._safe(work, title="Stop scope error")
def on_app_unmark_last(self) -> None:
unit = (self._last_app_unit or "").strip()
target = (self._last_app_target or "").strip().lower()
cg_id = int(self._last_app_cgroup_id or 0)
if not unit or target not in ("vpn", "direct") or cg_id <= 0:
return
def work() -> None:
if QMessageBox.question(
self,
"Unmark last",
f"Remove routing mark for last scope?\n\nunit={unit}\ntarget={target}\ncgroup_id={cg_id}",
) != QMessageBox.StandardButton.Yes:
return
res = self.ctrl.traffic_appmarks_apply(
op="del",
target=target,
cgroup=str(cg_id),
)
if res.ok:
self._append_app_log(f"[appmarks] unmarked: target={target} cgroup_id={cg_id}")
self._set_action_status(f"Unmarked last: target={target} cgroup_id={cg_id}", ok=True)
else:
self._append_app_log(f"[appmarks] unmark error: {res.message}")
self._set_action_status(f"Unmark last failed: {res.message}", ok=False)
QMessageBox.critical(self, "Unmark error", res.message or "unmark failed")
self.refresh_appmarks_counts()
self.refresh_app_profiles(quiet=True)
self._safe(work, title="Unmark error")
def on_appmarks_clear(self, target: str) -> None:
tgt = (target or "").strip().lower()
if tgt not in ("vpn", "direct"):
return
def work() -> None:
if QMessageBox.question(
self,
"Clear marks",
f"Clear ALL runtime app marks for target={tgt}?",
) != QMessageBox.StandardButton.Yes:
return
res = self.ctrl.traffic_appmarks_apply(op="clear", target=tgt)
if res.ok:
self._append_app_log(f"[appmarks] cleared: target={tgt}")
self._set_action_status(f"App marks cleared: target={tgt}", ok=True)
else:
self._append_app_log(f"[appmarks] clear error: {res.message}")
self._set_action_status(
f"App marks clear failed: target={tgt} ({res.message})",
ok=False,
)
QMessageBox.critical(
self, "Clear marks error", res.message or "unknown error"
)
self.refresh_appmarks_counts()
self.refresh_appmarks_items(quiet=True)
self.refresh_app_profiles(quiet=True)
self._safe(work, title="Clear app marks error")
# -----------------------------------------------------------------
# Scopes list + cleanup (runtime)
# -----------------------------------------------------------------
def _scope_target_from_unit(self, unit: str) -> str:
u = (unit or "").strip()
if not u.startswith("svpn-"):
return ""
rest = u[len("svpn-") :]
if rest.startswith("vpn-"):
return "vpn"
if rest.startswith("direct-"):
return "direct"
return ""
def _systemctl_user(self, args: list[str]) -> tuple[int, str]:
cmd = ["systemctl", "--user"] + list(args or [])
try:
p = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False,
timeout=4,
)
out = ((p.stdout or "") + (p.stderr or "")).strip()
return int(p.returncode or 0), out
except subprocess.TimeoutExpired:
return 124, f"timeout running: {' '.join(cmd)}"
def _list_running_svpn_units(self) -> list[str]:
code, out = self._systemctl_user(
[
"list-units",
"--type=scope",
"--type=service",
"--state=running",
"--no-legend",
"--no-pager",
"--plain",
]
)
if code != 0:
raise RuntimeError(out or f"systemctl list-units failed: rc={code}")
units: list[str] = []
for raw in (out or "").splitlines():
line = raw.strip()
if not line:
continue
fields = line.split()
if not fields:
continue
unit = fields[0].strip()
if unit.startswith("svpn-") and (unit.endswith(".scope") or unit.endswith(".service")):
units.append(unit)
units.sort()
return units
def _control_group_for_unit(self, unit: str) -> str:
code, out = self._systemctl_user(["show", "-p", "ControlGroup", "--value", unit])
if code != 0:
raise RuntimeError(out or f"systemctl show failed: rc={code}")
return (out or "").strip()
def _cgroup_inode_id(self, cgroup_path: str) -> int:
cg = (cgroup_path or "").strip()
if not cg:
return 0
rel = cg.lstrip("/")
if not rel:
return 0
full = os.path.join("/sys/fs/cgroup", rel)
try:
st = os.stat(full)
return int(getattr(st, "st_ino", 0) or 0)
except Exception:
return 0
def refresh_running_scopes(self, *, quiet: bool = False) -> None:
def work() -> None:
units = self._list_running_svpn_units()
self.lst_scopes.clear()
for unit in units:
target = self._scope_target_from_unit(unit)
cg = ""
cg_id = 0
try:
# Prefer effective cgroup via MainPID (/proc/<pid>/cgroup) for services.
# Some GUI apps can migrate into a different app scope after launch.
if unit.endswith(".service"):
code, out = self._systemctl_user(["show", "-p", "MainPID", "--value", unit])
if code == 0:
try:
pid = int((out or "").strip() or "0")
except Exception:
pid = 0
if pid > 0:
cg = self._cgroup_path_from_pid(pid)
if not cg:
cg = self._control_group_for_unit(unit)
cg_id = self._cgroup_inode_id(cg)
except Exception:
pass
label = unit
if target in ("vpn", "direct"):
label += f" [{target.upper()}]"
it = QListWidgetItem(label)
info = RuntimeScopeInfo(
unit=unit,
target=target or "?",
cgroup_path=cg,
cgroup_id=int(cg_id or 0),
)
it.setData(QtCore.Qt.UserRole, info)
it.setToolTip(
f"Unit: {unit}\n"
f"Target: {target or '-'}\n"
f"ControlGroup: {cg or '-'}\n"
f"cgroup_id: {cg_id or '-'}\n\n"
"EN: Stop selected will unmark by cgroup_id (if known) and stop the unit.\n"
"RU: Stop selected удалит метку по cgroup_id (если известен) и остановит unit."
)
self.lst_scopes.addItem(it)
self.lbl_scopes.setText(f"Running units: {len(units)}")
has_any = self.lst_scopes.count() > 0
self.btn_scopes_stop_selected.setEnabled(has_any)
self.btn_scopes_cleanup.setEnabled(has_any)
if quiet:
try:
work()
except Exception:
# Quiet mode: avoid modal popups on background refresh.
pass
return
self._safe(work, title="Units refresh error")
def _copy_scope_unit(self, it: QListWidgetItem) -> None:
unit = ""
info = it.data(QtCore.Qt.UserRole) if it else None
if isinstance(info, RuntimeScopeInfo):
unit = info.unit
else:
unit = (it.text() or "").split()[0].strip() if it else ""
if not unit:
return
try:
QtGui.QGuiApplication.clipboard().setText(unit)
except Exception:
pass
self._append_app_log(f"[unit] copied unit: {unit}")
self._set_action_status(f"Copied unit: {unit}", ok=True)
def _stop_scope_unit(self, unit: str) -> None:
u = (unit or "").strip()
if not u:
return
code, out = self._systemctl_user(["stop", u])
if code == 0:
if out:
self._append_app_log(out)
return
code2, out2 = self._systemctl_user(["kill", u])
if code2 != 0:
raise RuntimeError(out2 or out or f"stop/kill failed: {u}")
def _unmark_scope(self, info: RuntimeScopeInfo) -> None:
target = (info.target or "").strip().lower()
cg_id = int(info.cgroup_id or 0)
if target not in ("vpn", "direct") or cg_id <= 0:
return
res = self.ctrl.traffic_appmarks_apply(
op="del",
target=target,
cgroup=str(cg_id),
)
if not res.ok:
raise RuntimeError(res.message or "unmark failed")
def _selected_scope_infos(self) -> list[RuntimeScopeInfo]:
infos: list[RuntimeScopeInfo] = []
for it in self.lst_scopes.selectedItems() or []:
info = it.data(QtCore.Qt.UserRole) if it else None
if isinstance(info, RuntimeScopeInfo):
infos.append(info)
return infos
def on_scopes_stop_selected(self) -> None:
def work() -> None:
infos = self._selected_scope_infos()
if not infos:
QMessageBox.information(self, "No selection", "Select one or more units first.")
return
if QMessageBox.question(
self,
"Stop selected",
f"Unmark + stop {len(infos)} selected unit(s)?",
) != QMessageBox.StandardButton.Yes:
return
for info in infos:
self._append_app_log(
f"[unit] stop: unit={info.unit} target={info.target} cgroup_id={info.cgroup_id}"
)
try:
self._unmark_scope(info)
self._append_app_log("[unit] unmark OK")
except Exception as e:
self._append_app_log(f"[unit] unmark WARN: {e}")
self._stop_scope_unit(info.unit)
self._append_app_log("[unit] stop OK")
self.refresh_running_scopes()
self.refresh_appmarks_counts()
self.refresh_appmarks_items(quiet=True)
self.refresh_app_profiles(quiet=True)
self._set_action_status(f"Stopped units: {len(infos)}", ok=True)
self._safe(work, title="Stop selected units error")
def on_scopes_cleanup_all(self) -> None:
def work() -> None:
units = self._list_running_svpn_units()
if not units:
self._set_action_status("No running svpn units", ok=True)
self.refresh_running_scopes()
return
if QMessageBox.question(
self,
"Cleanup all",
f"Unmark + stop ALL running svpn units ({len(units)})?",
) != QMessageBox.StandardButton.Yes:
return
stopped = 0
for unit in units:
target = self._scope_target_from_unit(unit)
cg = ""
cg_id = 0
try:
cg = self._control_group_for_unit(unit)
cg_id = self._cgroup_inode_id(cg)
except Exception:
pass
info = RuntimeScopeInfo(
unit=unit,
target=target or "?",
cgroup_path=cg,
cgroup_id=int(cg_id or 0),
)
self._append_app_log(
f"[unit] cleanup: unit={info.unit} target={info.target} cgroup_id={info.cgroup_id}"
)
try:
self._unmark_scope(info)
self._append_app_log("[unit] unmark OK")
except Exception as e:
self._append_app_log(f"[unit] unmark WARN: {e}")
try:
self._stop_scope_unit(info.unit)
stopped += 1
except Exception as e:
self._append_app_log(f"[unit] stop ERROR: {e}")
self.refresh_running_scopes()
self.refresh_appmarks_counts()
self.refresh_appmarks_items(quiet=True)
self.refresh_app_profiles(quiet=True)
self._set_action_status(f"Cleanup done: stopped={stopped}/{len(units)}", ok=True)
self._safe(work, title="Cleanup units error")
def on_rollback(self) -> None:
def work() -> None:
res = self.ctrl.routes_clear()
self._emit_log(res.pretty_text or "rollback done")
self._set_action_status(res.pretty_text or "routes cleared (cache saved)", ok=bool(res.ok))
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Rollback error")
def on_restore_cache(self) -> None:
def work() -> None:
res = self.ctrl.routes_cache_restore()
self._emit_log(res.pretty_text or "cache restore done")
self._set_action_status(res.pretty_text or "routes restored from cache", ok=bool(res.ok))
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Restore cache error")
class TrafficCandidatesDialog(QDialog):
def __init__(
self,
candidates,
*,
existing: dict[str, dict[str, set[str]]] | None = None,
add_cb: Callable[[str, str, list[str]], None],
parent=None,
) -> None:
super().__init__(parent)
self.cands = candidates
self.add_cb = add_cb
self.existing = existing or {"vpn": {}, "direct": {}}
self.setWindowTitle("Add detected overrides")
self.resize(820, 680)
root = QVBoxLayout(self)
note = QLabel(
"Tip: hover list items for details. Подсказка: наведи на элементы списка.\n"
"Detect results from backend. Nothing is applied until you click Apply overrides."
)
note.setWordWrap(True)
note.setStyleSheet("color: gray;")
root.addWidget(note)
self.chk_hide_existing = QCheckBox("Hide already added")
self.chk_hide_existing.setToolTip(
"""EN: Hides items that are already present in Force VPN/Force Direct fields.
RU: Скрывает элементы, которые уже есть в Force VPN/Force Direct."""
)
self.chk_hide_existing.stateChanged.connect(lambda _s: self._refilter_current())
root.addWidget(self.chk_hide_existing)
self.tabs = QTabWidget()
root.addWidget(self.tabs, stretch=1)
self._tab_kind: dict[QWidget, str] = {}
self._tab_list: dict[QWidget, QListWidget] = {}
self._tab_filter: dict[QWidget, QLineEdit] = {}
self._list_kind: dict[QListWidget, str] = {}
self._list_title: dict[QListWidget, str] = {}
self.tabs.currentChanged.connect(lambda _idx: self._refilter_current())
self._build_subnets_tab()
self._build_services_tab()
self._build_uids_tab()
row = QHBoxLayout()
btn_vpn = QPushButton("Add to Force VPN")
btn_vpn.clicked.connect(lambda: self._add_selected("vpn"))
row.addWidget(btn_vpn)
btn_direct = QPushButton("Add to Force Direct")
btn_direct.clicked.connect(lambda: self._add_selected("direct"))
row.addWidget(btn_direct)
row.addStretch(1)
btn_close = QPushButton("Close")
btn_close.clicked.connect(self.accept)
row.addWidget(btn_close)
root.addLayout(row)
self.lbl_status = QLabel("")
self.lbl_status.setWordWrap(True)
self.lbl_status.setStyleSheet("color: gray;")
root.addWidget(self.lbl_status)
def _mark_state(self, kind: str, value: str) -> tuple[bool, bool]:
k = (kind or "").strip().lower()
v = (value or "").strip()
if not k or not v:
return False, False
in_vpn = v in (self.existing.get("vpn", {}).get(k, set()) or set())
in_direct = v in (self.existing.get("direct", {}).get(k, set()) or set())
return bool(in_vpn), bool(in_direct)
def _set_status(self, msg: str, ok: bool | None = None) -> None:
text = (msg or "").strip() or ""
self.lbl_status.setText(text)
if ok is True:
self.lbl_status.setStyleSheet("color: green;")
elif ok is False:
self.lbl_status.setStyleSheet("color: red;")
else:
self.lbl_status.setStyleSheet("color: gray;")
def _apply_filter(self, lst: QListWidget, query: str) -> None:
q = (query or "").strip().lower()
hide_existing = bool(self.chk_hide_existing.isChecked())
kind = (self._list_kind.get(lst) or "").strip().lower()
# Subnets-only quick filters.
allow_lan = True
allow_docker = True
allow_link = True
allow_linkdown = True
if kind == "subnet":
allow_lan = bool(getattr(self, "chk_sub_show_lan", None) and self.chk_sub_show_lan.isChecked())
allow_docker = bool(getattr(self, "chk_sub_show_docker", None) and self.chk_sub_show_docker.isChecked())
allow_link = bool(getattr(self, "chk_sub_show_link", None) and self.chk_sub_show_link.isChecked())
allow_linkdown = not bool(getattr(self, "chk_sub_hide_linkdown", None) and self.chk_sub_hide_linkdown.isChecked())
for i in range(lst.count()):
it = lst.item(i)
if not it:
continue
if hide_existing and bool(it.data(QtCore.Qt.UserRole + 1) or False):
it.setHidden(True)
continue
if kind == "subnet":
it_kind = str(it.data(QtCore.Qt.UserRole + 4) or "").strip().lower()
it_linkdown = bool(it.data(QtCore.Qt.UserRole + 5) or False)
if it_linkdown and not allow_linkdown:
it.setHidden(True)
continue
if it_kind == "docker" and not allow_docker:
it.setHidden(True)
continue
if it_kind == "lan" and not allow_lan:
it.setHidden(True)
continue
if it_kind == "link" and not allow_link:
it.setHidden(True)
continue
if not q:
it.setHidden(False)
continue
it.setHidden(q not in it.text().lower())
def _refilter_current(self) -> None:
tab = self.tabs.currentWidget()
if tab is None:
return
lst = self._tab_list.get(tab)
filt = self._tab_filter.get(tab)
if lst is None or filt is None:
return
self._apply_filter(lst, filt.text())
def _filter_for_title(self, title: str) -> QLineEdit | None:
for i in range(self.tabs.count()):
if self.tabs.tabText(i) == title:
tab = self.tabs.widget(i)
return self._tab_filter.get(tab)
return None
def _preset_set_filter(self, title: str, text: str) -> None:
filt = self._filter_for_title(title)
if filt is not None:
filt.setText(text)
def _preset_filter_subnets(self, *, lan: bool, docker: bool, link: bool) -> None:
# EN: "Filter LAN/Docker" buttons should not rely on text search; they should
# EN: toggle the kind checkboxes and clear the text filter.
# RU: Кнопки "Filter LAN/Docker" не должны полагаться на текстовый поиск;
# RU: они должны переключать чекбоксы видов и очищать текстовый фильтр.
chk_lan = getattr(self, "chk_sub_show_lan", None)
chk_docker = getattr(self, "chk_sub_show_docker", None)
chk_link = getattr(self, "chk_sub_show_link", None)
if chk_lan is None or chk_docker is None or chk_link is None:
self._preset_set_filter("Subnets", "")
self._refilter_current()
return
for chk, val in (
(chk_lan, lan),
(chk_docker, docker),
(chk_link, link),
):
try:
chk.blockSignals(True)
chk.setChecked(bool(val))
finally:
chk.blockSignals(False)
self._preset_set_filter("Subnets", "")
self._refilter_current()
def _update_item_render(self, it: QListWidgetItem, kind: str) -> None:
value = str(it.data(QtCore.Qt.UserRole) or "").strip()
base_label = str(it.data(QtCore.Qt.UserRole + 2) or it.text() or "")
base_tip = str(it.data(QtCore.Qt.UserRole + 3) or it.toolTip() or "")
in_vpn, in_direct = self._mark_state(kind, value)
flags = []
if in_vpn:
flags.append("VPN")
if in_direct:
flags.append("DIRECT")
label = base_label
if flags:
label = f"{base_label} [{' + '.join(flags)}]"
it.setText(label)
it.setData(QtCore.Qt.UserRole + 1, bool(in_vpn or in_direct))
if base_tip.strip():
extra_tip = (
f"\n\nAlready in Force VPN: {'yes' if in_vpn else 'no'}\n"
f"Already in Force Direct: {'yes' if in_direct else 'no'}"
)
it.setToolTip(base_tip + extra_tip)
if in_vpn or in_direct:
it.setForeground(QtGui.QBrush(QtGui.QColor("gray")))
def _add_tab(self, title: str, kind: str, items: list[tuple[str, str, str]], *, extra=None) -> None:
tab = QWidget()
layout = QVBoxLayout(tab)
if extra is not None:
extra(layout)
filt = QLineEdit()
filt.setPlaceholderText("Filter...")
layout.addWidget(filt)
lst = QListWidget()
lst.setSelectionMode(QAbstractItemView.ExtendedSelection)
for entry in items:
label = str(entry[0]) if len(entry) > 0 else ""
value = str(entry[1]) if len(entry) > 1 else ""
tip = str(entry[2]) if len(entry) > 2 else ""
meta_kind = str(entry[3]) if len(entry) > 3 else ""
meta_linkdown = bool(entry[4]) if len(entry) > 4 else False
it = QListWidgetItem(label)
it.setData(QtCore.Qt.UserRole, value)
it.setData(QtCore.Qt.UserRole + 2, label) # base label (without [VPN]/[DIRECT])
it.setData(QtCore.Qt.UserRole + 3, tip) # base tooltip (without existing-state)
it.setData(QtCore.Qt.UserRole + 4, meta_kind)
it.setData(QtCore.Qt.UserRole + 5, meta_linkdown)
lst.addItem(it)
self._update_item_render(it, kind)
layout.addWidget(lst, stretch=1)
filt.textChanged.connect(lambda txt, l=lst: self._apply_filter(l, txt))
self.tabs.addTab(tab, title)
self._tab_kind[tab] = kind
self._tab_list[tab] = lst
self._tab_filter[tab] = filt
self._list_kind[lst] = kind
self._list_title[lst] = title
def _current_kind_and_list(self) -> tuple[str, QListWidget | None]:
tab = self.tabs.currentWidget()
if tab is None:
return "", None
return self._tab_kind.get(tab, ""), self._tab_list.get(tab)
def _add_selected(self, target: str) -> None:
kind, lst = self._current_kind_and_list()
if not kind or lst is None:
return
vals: list[str] = []
for it in lst.selectedItems():
v = it.data(QtCore.Qt.UserRole)
vv = str(v or "").strip()
if vv:
vals.append(vv)
# stable de-dupe
out: list[str] = []
seen: set[str] = set()
for v in vals:
if v in seen:
continue
seen.add(v)
out.append(v)
if not out:
self._set_status("Nothing selected", ok=None)
return
tgt = (target or "").strip().lower()
k = (kind or "").strip().lower()
other = "direct" if tgt == "vpn" else "vpn"
have_tgt = self.existing.get(tgt, {}).get(k, set()) or set()
have_other = self.existing.get(other, {}).get(k, set()) or set()
to_add: list[str] = []
skipped = 0
conflicts = 0
for v in out:
if v in have_tgt:
skipped += 1
continue
if v in have_other:
conflicts += 1
to_add.append(v)
if not to_add:
self._set_status(f"Nothing new to add (skipped={skipped}, conflicts={conflicts})", ok=None)
return
self.add_cb(target, kind, to_add)
# Update local state so UI marks newly added items immediately.
if tgt not in self.existing:
self.existing[tgt] = {}
if k not in self.existing[tgt]:
self.existing[tgt][k] = set()
for v in to_add:
self.existing[tgt][k].add(v)
for i in range(lst.count()):
it = lst.item(i)
if it is None:
continue
self._update_item_render(it, kind)
self._refilter_current()
msg = f"Added {len(to_add)} item(s) to Force {tgt.upper()} ({k})."
if skipped or conflicts:
msg += f" skipped={skipped} conflicts={conflicts}"
self._set_status(msg, ok=True)
def _list_for_title(self, title: str) -> QListWidget | None:
for i in range(self.tabs.count()):
if self.tabs.tabText(i) == title:
tab = self.tabs.widget(i)
return self._tab_list.get(tab)
return None
def _preset_clear_selection(self, title: str) -> None:
lst = self._list_for_title(title)
if lst is not None:
lst.clearSelection()
def _preset_select_services(self, keywords: list[str]) -> None:
lst = self._list_for_title("Services")
if lst is None:
return
keys = [str(k).strip().lower() for k in (keywords or []) if str(k).strip()]
if not keys:
return
lst.clearSelection()
for i in range(lst.count()):
it = lst.item(i)
if it is None:
continue
txt = (it.text() or "").lower()
val = str(it.data(QtCore.Qt.UserRole) or "").lower()
if any(k in txt or k in val for k in keys):
it.setSelected(True)
def _preset_select_uids(self, uids: list[int]) -> None:
lst = self._list_for_title("UIDs")
if lst is None:
return
want = {f"{int(u)}-{int(u)}" for u in (uids or [])}
if not want:
return
lst.clearSelection()
for i in range(lst.count()):
it = lst.item(i)
if it is None:
continue
token = str(it.data(QtCore.Qt.UserRole) or "").strip()
if token in want:
it.setSelected(True)
def _build_subnets_tab(self) -> None:
subs = list(getattr(self.cands, "subnets", []) or [])
items: list[tuple[str, str, str, str, bool]] = []
for s in subs:
cidr = str(getattr(s, "cidr", "") or "").strip()
if not cidr:
continue
dev = str(getattr(s, "dev", "") or "").strip()
kind = str(getattr(s, "kind", "") or "").strip()
linkdown = bool(getattr(s, "linkdown", False))
tags = []
if kind:
tags.append(kind)
if dev:
tags.append(dev)
if linkdown:
tags.append("linkdown")
tag_txt = " " + "[" + ", ".join(tags) + "]" if tags else ""
tip = (
f"CIDR: {cidr}\n"
f"kind={kind or '-'} dev={dev or '-'} linkdown={linkdown}\n\n"
"EN: Source subnet overrides affect forwarded traffic (Docker).\n"
"RU: Source subnet влияет на forwarded трафик (Docker)."
)
items.append((f"{cidr}{tag_txt}", cidr, tip, kind, linkdown))
def extra(layout: QVBoxLayout) -> None:
row = QHBoxLayout()
btn_lan = QPushButton("Keep LAN direct")
btn_lan.clicked.connect(lambda: self._preset_add_lan_direct())
row.addWidget(btn_lan)
btn_docker = QPushButton("Keep Docker direct")
btn_docker.clicked.connect(lambda: self._preset_add_docker_direct())
row.addWidget(btn_docker)
row.addStretch(1)
layout.addLayout(row)
row2 = QHBoxLayout()
btn_f_lan = QPushButton("Filter LAN")
btn_f_lan.clicked.connect(lambda: self._preset_filter_subnets(lan=True, docker=False, link=True))
row2.addWidget(btn_f_lan)
btn_f_docker = QPushButton("Filter Docker")
btn_f_docker.clicked.connect(lambda: self._preset_filter_subnets(lan=False, docker=True, link=False))
row2.addWidget(btn_f_docker)
btn_f_clear = QPushButton("Clear filter")
btn_f_clear.clicked.connect(lambda: self._preset_filter_subnets(lan=True, docker=True, link=True))
row2.addWidget(btn_f_clear)
row2.addStretch(1)
layout.addLayout(row2)
row3 = QHBoxLayout()
self.chk_sub_show_lan = QCheckBox("LAN")
self.chk_sub_show_lan.setChecked(True)
self.chk_sub_show_lan.setToolTip("EN: Show LAN subnets.\nRU: Показать LAN подсети.")
self.chk_sub_show_lan.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_show_lan)
self.chk_sub_show_docker = QCheckBox("Docker")
self.chk_sub_show_docker.setChecked(True)
self.chk_sub_show_docker.setToolTip("EN: Show Docker/container subnets.\nRU: Показать Docker/контейнерные подсети.")
self.chk_sub_show_docker.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_show_docker)
self.chk_sub_show_link = QCheckBox("Link (scope link)")
self.chk_sub_show_link.setChecked(True)
self.chk_sub_show_link.setToolTip("EN: Show link-scope routes.\nRU: Показать маршруты scope link.")
self.chk_sub_show_link.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_show_link)
self.chk_sub_hide_linkdown = QCheckBox("Hide linkdown")
self.chk_sub_hide_linkdown.setChecked(True)
self.chk_sub_hide_linkdown.setToolTip("EN: Hide routes marked as linkdown.\nRU: Скрыть маршруты с меткой linkdown.")
self.chk_sub_hide_linkdown.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_hide_linkdown)
row3.addStretch(1)
layout.addLayout(row3)
self._add_tab("Subnets", "subnet", items, extra=extra)
def _preset_add_lan_direct(self) -> None:
subs = list(getattr(self.cands, "subnets", []) or [])
vals: list[str] = []
for s in subs:
kind = str(getattr(s, "kind", "") or "").strip()
cidr = str(getattr(s, "cidr", "") or "").strip()
if not cidr:
continue
if kind in ("lan", "link"):
vals.append(cidr)
if vals:
self.add_cb("direct", "subnet", vals)
def _preset_add_docker_direct(self) -> None:
subs = list(getattr(self.cands, "subnets", []) or [])
vals: list[str] = []
for s in subs:
kind = str(getattr(s, "kind", "") or "").strip()
cidr = str(getattr(s, "cidr", "") or "").strip()
if not cidr:
continue
if kind == "docker":
vals.append(cidr)
if vals:
self.add_cb("direct", "subnet", vals)
def _build_services_tab(self) -> None:
units = list(getattr(self.cands, "units", []) or [])
items: list[tuple[str, str, str]] = []
for u in units:
unit = str(getattr(u, "unit", "") or "").strip()
if not unit:
continue
desc = str(getattr(u, "description", "") or "").strip()
cgroup = str(getattr(u, "cgroup", "") or "").strip() or unit
label = unit
if desc:
label += " - " + desc
tip = (
f"Unit: {unit}\n"
f"Cgroup token: {cgroup}\n\n"
"EN: Adds a cgroup override; backend resolves it to UID rules at apply time.\n"
"RU: Добавляет cgroup override; backend резолвит его в UID правила при применении."
)
items.append((label, cgroup, tip))
def extra(layout: QVBoxLayout) -> None:
row = QHBoxLayout()
btn_docker = QPushButton("Select docker/container")
btn_docker.clicked.connect(lambda: self._preset_select_services(["docker", "containerd", "podman"]))
row.addWidget(btn_docker)
btn_media = QPushButton("Select media (jellyfin/plex)")
btn_media.clicked.connect(lambda: self._preset_select_services(["jellyfin", "plex", "emby"]))
row.addWidget(btn_media)
btn_clear = QPushButton("Clear selection")
btn_clear.clicked.connect(lambda: self._preset_clear_selection("Services"))
row.addWidget(btn_clear)
row.addStretch(1)
layout.addLayout(row)
row2 = QHBoxLayout()
btn_f_docker = QPushButton("Filter docker")
btn_f_docker.clicked.connect(lambda: self._preset_set_filter("Services", "docker"))
row2.addWidget(btn_f_docker)
btn_f_media = QPushButton("Filter media")
btn_f_media.clicked.connect(lambda: self._preset_set_filter("Services", "jellyfin"))
row2.addWidget(btn_f_media)
btn_f_clear = QPushButton("Clear filter")
btn_f_clear.clicked.connect(lambda: self._preset_set_filter("Services", ""))
row2.addWidget(btn_f_clear)
row2.addStretch(1)
layout.addLayout(row2)
self._add_tab("Services", "cgroup", items, extra=extra)
def _build_uids_tab(self) -> None:
uids = list(getattr(self.cands, "uids", []) or [])
items: list[tuple[str, str, str]] = []
for u in uids:
try:
uid = int(getattr(u, "uid", 0) or 0)
except Exception:
continue
user = str(getattr(u, "user", "") or "").strip()
examples = list(getattr(u, "examples", []) or [])
ex_txt = ", ".join([str(x) for x in examples if str(x).strip()])
label = str(uid)
if user:
label += f" ({user})"
if ex_txt:
label += " - " + ex_txt
token = f"{uid}-{uid}"
tip = (
f"UID: {uid}\n"
f"User: {user or '-'}\n"
f"Examples: {ex_txt or '-'}\n\n"
"EN: UID rules affect host-local processes (OUTPUT).\n"
"RU: UID правила влияют на процессы хоста (OUTPUT)."
)
items.append((label, token, tip))
def extra(layout: QVBoxLayout) -> None:
row = QHBoxLayout()
btn_me = QPushButton("Select my UID")
btn_me.clicked.connect(lambda: self._preset_select_uids([os.getuid()]))
row.addWidget(btn_me)
btn_root = QPushButton("Select root UID")
btn_root.clicked.connect(lambda: self._preset_select_uids([0]))
row.addWidget(btn_root)
btn_clear = QPushButton("Clear selection")
btn_clear.clicked.connect(lambda: self._preset_clear_selection("UIDs"))
row.addWidget(btn_clear)
row.addStretch(1)
layout.addLayout(row)
self._add_tab("UIDs", "uid", items, extra=extra)
# ---------------------------------------------------------------------
# App picker (.desktop entries)
# ---------------------------------------------------------------------
def _desktop_bool(v: str) -> bool:
return str(v or "").strip().lower() in _DESKTOP_BOOL_TRUE
def _desktop_name_from_section(sec: configparser.SectionProxy) -> str:
name = str(sec.get("Name", "") or "").strip()
if name:
return name
# Prefer Russian if present, then English, then any localized variant.
for key in ("Name[ru]", "Name[en_US]", "Name[en]"):
v = str(sec.get(key, "") or "").strip()
if v:
return v
for k, v in sec.items():
kk = str(k or "")
if kk.startswith("Name[") and str(v or "").strip():
return str(v).strip()
return ""
def _sanitize_desktop_exec(exec_raw: str, *, name: str = "", desktop_path: str = "") -> str:
raw = str(exec_raw or "").strip()
if not raw:
return ""
# Desktop spec: "%%" -> literal "%".
raw = raw.replace("%%", "%")
# Best-effort expansion for a couple of common fields.
if name:
raw = raw.replace("%c", name)
if desktop_path:
raw = raw.replace("%k", desktop_path)
try:
tokens = shlex.split(raw)
except Exception:
tokens = raw.split()
out: list[str] = []
for t in tokens:
tok = str(t or "").strip()
if not tok:
continue
# Drop standalone field codes (%u, %U, %f, ...).
if len(tok) == 2 and tok.startswith("%"):
continue
# Remove field codes inside tokens (e.g. --name=%c).
tok = _DESKTOP_EXEC_FIELD_RE.sub("", tok).strip()
if tok:
out.append(tok)
if not out:
return ""
return " ".join(shlex.quote(x) for x in out)
def _desktop_entry_from_file(path: str, *, source: str) -> Optional[DesktopAppEntry]:
p = str(path or "").strip()
if not p or not p.endswith(".desktop"):
return None
cp = configparser.ConfigParser(interpolation=None, strict=False)
cp.optionxform = str # keep case
try:
cp.read(p, encoding="utf-8")
except Exception:
try:
data = pathlib.Path(p).read_bytes()
cp.read_string(data.decode("utf-8", errors="replace"))
except Exception:
return None
if "Desktop Entry" not in cp:
return None
sec = cp["Desktop Entry"]
if _desktop_bool(sec.get("Hidden", "")) or _desktop_bool(sec.get("NoDisplay", "")):
return None
typ = str(sec.get("Type", "") or "").strip().lower()
if typ and typ != "application":
return None
exec_raw = str(sec.get("Exec", "") or "").strip()
if not exec_raw:
return None
name = _desktop_name_from_section(sec)
desktop_id = pathlib.Path(p).name
if not name:
name = desktop_id.replace(".desktop", "")
src = str(source or "").strip().lower() or "system"
return DesktopAppEntry(
desktop_id=desktop_id,
name=name,
exec_raw=exec_raw,
path=p,
source=src,
)
def _scan_desktop_entries() -> list[DesktopAppEntry]:
home = os.path.expanduser("~")
dirs: list[tuple[str, str]] = [
(os.path.join(home, ".local/share/applications"), "user"),
("/usr/local/share/applications", "system"),
("/usr/share/applications", "system"),
(os.path.join(home, ".local/share/flatpak/exports/share/applications"), "flatpak"),
("/var/lib/flatpak/exports/share/applications", "flatpak"),
("/var/lib/snapd/desktop/applications", "snap"),
]
seen: set[tuple[str, str]] = set()
out: list[DesktopAppEntry] = []
for d, src in dirs:
if not d or not os.path.isdir(d):
continue
try:
paths = sorted(pathlib.Path(d).glob("*.desktop"))
except Exception:
continue
for fp in paths:
ent = _desktop_entry_from_file(str(fp), source=src)
if ent is None:
continue
key = (ent.desktop_id, ent.source)
if key in seen:
continue
seen.add(key)
out.append(ent)
out.sort(key=lambda e: (e.name.lower(), e.source, e.desktop_id.lower()))
return out
class AppPickerDialog(QDialog):
def __init__(self, *, parent=None) -> None:
super().__init__(parent)
self.setWindowTitle("Pick app (.desktop)")
self.resize(860, 640)
self._selected_cmd: str = ""
self._entries: list[DesktopAppEntry] = _scan_desktop_entries()
root = QVBoxLayout(self)
note = QLabel(
"EN: Pick an installed GUI app from .desktop entries (system + flatpak + snap). "
"The command will be filled without %u/%U/%f placeholders.\n"
"RU: Выбери приложение из .desktop (system + flatpak + snap). "
"Команда будет заполнена без плейсхолдеров %u/%U/%f."
)
note.setWordWrap(True)
note.setStyleSheet("color: gray;")
root.addWidget(note)
row = QHBoxLayout()
row.addWidget(QLabel("Search"))
self.ed_search = QLineEdit()
self.ed_search.setPlaceholderText("Type to filter (name / id / exec)...")
row.addWidget(self.ed_search, stretch=1)
self.lbl_count = QLabel("")
self.lbl_count.setStyleSheet("color: gray;")
row.addWidget(self.lbl_count)
root.addLayout(row)
self.lst = QListWidget()
self.lst.setSelectionMode(QAbstractItemView.SingleSelection)
root.addWidget(self.lst, stretch=1)
self.preview = QPlainTextEdit()
self.preview.setReadOnly(True)
self.preview.setFixedHeight(180)
root.addWidget(self.preview)
row_btn = QHBoxLayout()
self.btn_use = QPushButton("Use selected")
self.btn_use.clicked.connect(self.on_use_selected)
row_btn.addWidget(self.btn_use)
row_btn.addStretch(1)
btn_close = QPushButton("Close")
btn_close.clicked.connect(self.reject)
row_btn.addWidget(btn_close)
root.addLayout(row_btn)
self._populate()
self.ed_search.textChanged.connect(lambda _t: self._apply_filter())
self.lst.currentItemChanged.connect(lambda _a, _b: self._update_preview())
self.lst.itemDoubleClicked.connect(lambda _it: self.on_use_selected())
QtCore.QTimer.singleShot(0, self._apply_filter)
def selected_command(self) -> str:
return str(self._selected_cmd or "")
def _populate(self) -> None:
self.lst.clear()
for ent in self._entries:
src = ent.source
label = f"{ent.name} [{src}] ({ent.desktop_id})"
tip = (
f"Name: {ent.name}\n"
f"ID: {ent.desktop_id}\n"
f"Source: {src}\n"
f"Path: {ent.path}\n\n"
f"Exec: {ent.exec_raw}"
)
it = QListWidgetItem(label)
it.setToolTip(tip)
it.setData(QtCore.Qt.UserRole, ent)
# precomputed search string
it.setData(
QtCore.Qt.UserRole + 1,
(label + "\n" + ent.exec_raw + "\n" + ent.path).lower(),
)
self.lst.addItem(it)
if self.lst.count() > 0:
self.lst.setCurrentRow(0)
self._update_preview()
def _apply_filter(self) -> None:
q = (self.ed_search.text() or "").strip().lower()
shown = 0
for i in range(self.lst.count()):
it = self.lst.item(i)
if not it:
continue
hay = str(it.data(QtCore.Qt.UserRole + 1) or "")
hide = bool(q) and q not in hay
it.setHidden(hide)
if not hide:
shown += 1
self.lbl_count.setText(f"Apps: {shown}/{self.lst.count()}")
def _current_entry(self) -> Optional[DesktopAppEntry]:
it = self.lst.currentItem()
if not it:
return None
ent = it.data(QtCore.Qt.UserRole)
if isinstance(ent, DesktopAppEntry):
return ent
return None
def _update_preview(self) -> None:
ent = self._current_entry()
if ent is None:
self.preview.setPlainText("")
self.btn_use.setEnabled(False)
return
cmd = _sanitize_desktop_exec(ent.exec_raw, name=ent.name, desktop_path=ent.path)
text = (
f"Name: {ent.name}\n"
f"ID: {ent.desktop_id}\n"
f"Source: {ent.source}\n"
f"Path: {ent.path}\n\n"
f"Exec (raw):\n{ent.exec_raw}\n\n"
f"Command (sanitized):\n{cmd}"
)
self.preview.setPlainText(text)
self.btn_use.setEnabled(bool(cmd.strip()))
def on_use_selected(self) -> None:
ent = self._current_entry()
if ent is None:
return
cmd = _sanitize_desktop_exec(ent.exec_raw, name=ent.name, desktop_path=ent.path).strip()
if not cmd:
QMessageBox.warning(self, "No command", "Selected app has no usable Exec command.")
return
self._selected_cmd = cmd
self.accept()