2280 lines
90 KiB
Python
2280 lines
90 KiB
Python
#!/usr/bin/env python3
|
||
from __future__ import annotations
|
||
|
||
import configparser
|
||
import os
|
||
import pathlib
|
||
import re
|
||
import shlex
|
||
import subprocess
|
||
import time
|
||
from dataclasses import dataclass
|
||
from typing import Callable, Optional
|
||
|
||
from PySide6 import QtCore, QtGui
|
||
from PySide6.QtWidgets import (
|
||
QButtonGroup,
|
||
QCheckBox,
|
||
QComboBox,
|
||
QDialog,
|
||
QGroupBox,
|
||
QHBoxLayout,
|
||
QLabel,
|
||
QLineEdit,
|
||
QListWidget,
|
||
QListWidgetItem,
|
||
QAbstractItemView,
|
||
QMessageBox,
|
||
QPlainTextEdit,
|
||
QPushButton,
|
||
QRadioButton,
|
||
QSpinBox,
|
||
QTabWidget,
|
||
QVBoxLayout,
|
||
QWidget,
|
||
)
|
||
|
||
from dashboard_controller import DashboardController
|
||
|
||
|
||
_DESKTOP_BOOL_TRUE = {"1", "true", "yes", "y", "on"}
|
||
_DESKTOP_EXEC_FIELD_RE = re.compile(r"%[A-Za-z]")
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class DesktopAppEntry:
|
||
desktop_id: str
|
||
name: str
|
||
exec_raw: str
|
||
path: str
|
||
source: str # system|flatpak|user
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class RuntimeScopeInfo:
|
||
unit: str
|
||
target: str # vpn|direct|?
|
||
cgroup_path: str
|
||
cgroup_id: int
|
||
|
||
|
||
class TrafficModeDialog(QDialog):
|
||
def __init__(
|
||
self,
|
||
controller: DashboardController,
|
||
*,
|
||
log_cb: Callable[[str], None] | None = None,
|
||
refresh_cb: Callable[[], None] | None = None,
|
||
parent=None,
|
||
) -> None:
|
||
super().__init__(parent)
|
||
self.ctrl = controller
|
||
self.log_cb = log_cb
|
||
self.refresh_cb = refresh_cb
|
||
|
||
self.setWindowTitle("Traffic mode settings")
|
||
self.resize(780, 760)
|
||
|
||
root = QVBoxLayout(self)
|
||
|
||
# EN: Persist small UI state across dialog sessions.
|
||
# RU: Сохраняем небольшой UI state между открытиями окна.
|
||
self._settings = QtCore.QSettings("AdGuardVPN", "SelectiveVPNDashboardQt")
|
||
self._last_app_unit: str = str(self._settings.value("traffic_app_last_unit", "") or "")
|
||
self._last_app_target: str = str(self._settings.value("traffic_app_last_target", "") or "")
|
||
try:
|
||
self._last_app_cgroup_id: int = int(self._settings.value("traffic_app_last_cgroup_id", 0) or 0)
|
||
except Exception:
|
||
self._last_app_cgroup_id = 0
|
||
|
||
hint_group = QGroupBox("Mode behavior")
|
||
hint_layout = QVBoxLayout(hint_group)
|
||
hint_layout.addWidget(QLabel("Selective: only marked traffic goes via VPN."))
|
||
hint_layout.addWidget(QLabel("Full tunnel: all traffic goes via VPN."))
|
||
hint_layout.addWidget(QLabel("Direct: VPN routing rules are disabled."))
|
||
warn = QLabel(
|
||
"Warning: Full tunnel can break local/LAN access depending on your host routes."
|
||
)
|
||
warn.setStyleSheet("color: red;")
|
||
hint_layout.addWidget(warn)
|
||
root.addWidget(hint_group)
|
||
|
||
tip = QLabel("Tip: hover any control for help. Подсказка: наведи на элемент для описания.")
|
||
tip.setWordWrap(True)
|
||
tip.setStyleSheet("color: gray;")
|
||
root.addWidget(tip)
|
||
|
||
self.tabs = QTabWidget()
|
||
root.addWidget(self.tabs, stretch=1)
|
||
|
||
tab_basic = QWidget()
|
||
tab_basic_layout = QVBoxLayout(tab_basic)
|
||
|
||
mode_group = QGroupBox("Traffic mode relay")
|
||
mode_layout = QVBoxLayout(mode_group)
|
||
|
||
row_mode = QHBoxLayout()
|
||
self.rad_selective = QRadioButton("Selective")
|
||
self.rad_selective.setToolTip("""EN: Only marked traffic (fwmark 0x66) uses VPN policy table (agvpn).
|
||
RU: Только помеченный трафик (fwmark 0x66) идет через policy-table (agvpn).""")
|
||
self.rad_selective.toggled.connect(
|
||
lambda checked: self.on_mode_toggle("selective", checked)
|
||
)
|
||
row_mode.addWidget(self.rad_selective)
|
||
|
||
self.rad_full = QRadioButton("Full tunnel")
|
||
self.rad_full.setToolTip("""EN: All traffic uses VPN policy table (agvpn). Use with auto-local bypass for LAN/docker.
|
||
RU: Весь трафик идет через policy-table (agvpn). Для LAN/docker включай auto-local bypass.""")
|
||
self.rad_full.toggled.connect(
|
||
lambda checked: self.on_mode_toggle("full_tunnel", checked)
|
||
)
|
||
row_mode.addWidget(self.rad_full)
|
||
|
||
self.rad_direct = QRadioButton("Direct")
|
||
self.rad_direct.setToolTip("""EN: Disables base VPN routing rules (no full/selective rule).
|
||
RU: Отключает базовые VPN policy-rules (нет full/selective правила).""")
|
||
self.rad_direct.toggled.connect(
|
||
lambda checked: self.on_mode_toggle("direct", checked)
|
||
)
|
||
row_mode.addWidget(self.rad_direct)
|
||
row_mode.addStretch(1)
|
||
mode_layout.addLayout(row_mode)
|
||
|
||
row_iface = QHBoxLayout()
|
||
row_iface.addWidget(QLabel("Preferred iface"))
|
||
self.cmb_iface = QComboBox()
|
||
self.cmb_iface.setToolTip("""EN: VPN interface for policy routing. Use auto unless you know the exact iface.
|
||
RU: Интерфейс VPN для policy routing. Оставь auto, если не уверен.""")
|
||
self.cmb_iface.setEditable(True)
|
||
self.cmb_iface.setInsertPolicy(QComboBox.NoInsert)
|
||
self.cmb_iface.setMinimumWidth(180)
|
||
row_iface.addWidget(self.cmb_iface)
|
||
|
||
self.btn_refresh_ifaces = QPushButton("Detect ifaces")
|
||
self.btn_refresh_ifaces.setToolTip("""EN: Refresh list of available interfaces (UP).
|
||
RU: Обновить список доступных интерфейсов (UP).""")
|
||
self.btn_refresh_ifaces.clicked.connect(self.on_refresh_ifaces)
|
||
row_iface.addWidget(self.btn_refresh_ifaces)
|
||
row_iface.addStretch(1)
|
||
mode_layout.addLayout(row_iface)
|
||
|
||
self.chk_auto_local = QCheckBox("Auto-local bypass (LAN/container subnets)")
|
||
self.chk_auto_local.setToolTip("""EN: Mirrors local/LAN/docker routes from main into agvpn table to prevent breakage in full tunnel.
|
||
EN: This does NOT force containers to use direct internet; use Force Direct subnets for that.
|
||
RU: Копирует локальные/LAN/docker маршруты из main в agvpn, чтобы не ломалась локалка в full tunnel.
|
||
RU: Это НЕ делает контейнеры direct в интернет; для этого используй Force Direct subnets.""")
|
||
self.chk_auto_local.stateChanged.connect(lambda _state: self.on_auto_local_toggle())
|
||
mode_layout.addWidget(self.chk_auto_local)
|
||
|
||
self.lbl_state = QLabel("Traffic mode: —")
|
||
self.lbl_state.setStyleSheet("color: gray;")
|
||
mode_layout.addWidget(self.lbl_state)
|
||
|
||
self.lbl_diag = QLabel("—")
|
||
self.lbl_diag.setStyleSheet("color: gray;")
|
||
mode_layout.addWidget(self.lbl_diag)
|
||
|
||
tab_basic_layout.addWidget(mode_group)
|
||
|
||
maint_group = QGroupBox("Rollback / cache")
|
||
maint_layout = QHBoxLayout(maint_group)
|
||
self.btn_rollback = QPushButton("Clear routes (save cache)")
|
||
self.btn_rollback.setToolTip("""EN: Clears VPN routes and nft sets, but saves a cache snapshot for restore.
|
||
RU: Очищает VPN маршруты и nft-сеты, но сохраняет снапшот для восстановления.""")
|
||
self.btn_rollback.clicked.connect(self.on_rollback)
|
||
maint_layout.addWidget(self.btn_rollback)
|
||
self.btn_restore_cache = QPushButton("Restore cached routes")
|
||
self.btn_restore_cache.setToolTip("""EN: Restores routes/nft from the last clear snapshot. Skips non-critical route restore errors.
|
||
RU: Восстанавливает маршруты/nft из последнего снапшота clear. Некритичные ошибки восстановления пропускаются.""")
|
||
self.btn_restore_cache.clicked.connect(self.on_restore_cache)
|
||
maint_layout.addWidget(self.btn_restore_cache)
|
||
maint_layout.addStretch(1)
|
||
tab_basic_layout.addWidget(maint_group)
|
||
|
||
tab_basic_layout.addStretch(1)
|
||
self.tabs.addTab(tab_basic, "Traffic basics")
|
||
|
||
# -----------------------------------------------------------------
|
||
# Apps (runtime): systemd --user unit + backend appmarks
|
||
# -----------------------------------------------------------------
|
||
|
||
tab_apps = QWidget()
|
||
tab_apps_layout = QVBoxLayout(tab_apps)
|
||
|
||
apps_hint = QLabel(
|
||
"Runtime per-app routing (Wayland-friendly):\n"
|
||
"- Launch uses systemd-run --user (transient unit).\n"
|
||
"- Backend adds the unit cgroup into nftset -> fwmark rules.\n"
|
||
"- Marks are temporary (TTL). Use Policy overrides for persistent policy."
|
||
)
|
||
apps_hint.setWordWrap(True)
|
||
apps_hint.setStyleSheet("color: gray;")
|
||
tab_apps_layout.addWidget(apps_hint)
|
||
|
||
run_group = QGroupBox("Run app (systemd unit) + apply mark")
|
||
run_layout = QVBoxLayout(run_group)
|
||
|
||
row_cmd = QHBoxLayout()
|
||
row_cmd.addWidget(QLabel("Command"))
|
||
self.ed_app_cmd = QLineEdit()
|
||
self.ed_app_cmd.setPlaceholderText(
|
||
"e.g. firefox --private-window https://example.com"
|
||
)
|
||
self.ed_app_cmd.setToolTip(
|
||
"EN: Command line to run. This runs as current user via systemd --user.\n"
|
||
"RU: Команда запуска. Запускается от текущего пользователя через systemd --user."
|
||
)
|
||
row_cmd.addWidget(self.ed_app_cmd, stretch=1)
|
||
self.btn_app_pick = QPushButton("Pick app...")
|
||
self.btn_app_pick.setToolTip(
|
||
"EN: Pick an installed app from .desktop entries (system + flatpak + snap) and fill the command.\n"
|
||
"RU: Выбрать установленное приложение из .desktop (system + flatpak + snap) и заполнить команду."
|
||
)
|
||
self.btn_app_pick.clicked.connect(self.on_app_pick)
|
||
row_cmd.addWidget(self.btn_app_pick)
|
||
run_layout.addLayout(row_cmd)
|
||
|
||
row_target = QHBoxLayout()
|
||
row_target.addWidget(QLabel("Route via"))
|
||
self.rad_app_vpn = QRadioButton("VPN")
|
||
self.rad_app_vpn.setToolTip(
|
||
"EN: Force this app traffic via VPN policy table (agvpn).\n"
|
||
"RU: Форсировать трафик приложения через VPN policy-table (agvpn)."
|
||
)
|
||
self.rad_app_direct = QRadioButton("Direct")
|
||
self.rad_app_direct.setToolTip(
|
||
"EN: Force this app traffic to bypass VPN (lookup main), even in full tunnel.\n"
|
||
"RU: Форсировать трафик приложения мимо VPN (lookup main), даже в full tunnel."
|
||
)
|
||
bg_app = QButtonGroup(self)
|
||
bg_app.addButton(self.rad_app_vpn)
|
||
bg_app.addButton(self.rad_app_direct)
|
||
self.rad_app_vpn.setChecked(True)
|
||
row_target.addWidget(self.rad_app_vpn)
|
||
row_target.addWidget(self.rad_app_direct)
|
||
row_target.addStretch(1)
|
||
run_layout.addLayout(row_target)
|
||
|
||
row_ttl = QHBoxLayout()
|
||
row_ttl.addWidget(QLabel("TTL (hours)"))
|
||
self.spn_app_ttl = QSpinBox()
|
||
self.spn_app_ttl.setRange(1, 24 * 30) # up to ~30 days
|
||
self.spn_app_ttl.setValue(24)
|
||
self.spn_app_ttl.setToolTip(
|
||
"EN: How long the runtime mark stays active (backend nftset element timeout).\n"
|
||
"RU: Сколько живет runtime-метка (timeout элемента в nftset)."
|
||
)
|
||
row_ttl.addWidget(self.spn_app_ttl)
|
||
row_ttl.addStretch(1)
|
||
run_layout.addLayout(row_ttl)
|
||
|
||
row_btn = QHBoxLayout()
|
||
self.btn_app_run = QPushButton("Run + apply mark")
|
||
self.btn_app_run.clicked.connect(self.on_app_run)
|
||
row_btn.addWidget(self.btn_app_run)
|
||
self.btn_app_refresh = QPushButton("Refresh counts")
|
||
self.btn_app_refresh.clicked.connect(self.refresh_appmarks_counts)
|
||
row_btn.addWidget(self.btn_app_refresh)
|
||
row_btn.addStretch(1)
|
||
run_layout.addLayout(row_btn)
|
||
|
||
row_btn2 = QHBoxLayout()
|
||
self.btn_app_clear_vpn = QPushButton("Clear VPN marks")
|
||
self.btn_app_clear_vpn.clicked.connect(lambda: self.on_appmarks_clear("vpn"))
|
||
row_btn2.addWidget(self.btn_app_clear_vpn)
|
||
self.btn_app_clear_direct = QPushButton("Clear Direct marks")
|
||
self.btn_app_clear_direct.clicked.connect(
|
||
lambda: self.on_appmarks_clear("direct")
|
||
)
|
||
row_btn2.addWidget(self.btn_app_clear_direct)
|
||
self.btn_app_stop_last = QPushButton("Stop last scope")
|
||
self.btn_app_stop_last.setToolTip(
|
||
"EN: Stops the last launched systemd --user scope.\n"
|
||
"RU: Останавливает последний запущенный systemd --user scope."
|
||
)
|
||
self.btn_app_stop_last.clicked.connect(self.on_app_stop_last_scope)
|
||
row_btn2.addWidget(self.btn_app_stop_last)
|
||
self.btn_app_unmark_last = QPushButton("Unmark last")
|
||
self.btn_app_unmark_last.setToolTip(
|
||
"EN: Removes routing mark for the last launched scope (by cgroup id).\n"
|
||
"RU: Удаляет метку маршрутизации для последнего scope (по cgroup id)."
|
||
)
|
||
self.btn_app_unmark_last.clicked.connect(self.on_app_unmark_last)
|
||
row_btn2.addWidget(self.btn_app_unmark_last)
|
||
row_btn2.addStretch(1)
|
||
run_layout.addLayout(row_btn2)
|
||
|
||
self.lbl_app_counts = QLabel("Marks: —")
|
||
self.lbl_app_counts.setStyleSheet("color: gray;")
|
||
run_layout.addWidget(self.lbl_app_counts)
|
||
self.lbl_app_last = QLabel("Last scope: —")
|
||
self.lbl_app_last.setStyleSheet("color: gray;")
|
||
run_layout.addWidget(self.lbl_app_last)
|
||
self._refresh_last_scope_ui()
|
||
|
||
tab_apps_layout.addWidget(run_group)
|
||
|
||
scopes_group = QGroupBox("Active svpn units (systemd --user)")
|
||
scopes_layout = QVBoxLayout(scopes_group)
|
||
|
||
scopes_row = QHBoxLayout()
|
||
self.btn_scopes_refresh = QPushButton("Refresh units")
|
||
self.btn_scopes_refresh.setToolTip(
|
||
"EN: Refresh list of running svpn-* units (.service/.scope).\n"
|
||
"RU: Обновить список запущенных svpn-* unit (.service/.scope)."
|
||
)
|
||
self.btn_scopes_refresh.clicked.connect(self.refresh_running_scopes)
|
||
scopes_row.addWidget(self.btn_scopes_refresh)
|
||
|
||
self.btn_scopes_stop_selected = QPushButton("Stop selected")
|
||
self.btn_scopes_stop_selected.setToolTip(
|
||
"EN: Unmarks + stops selected units.\n"
|
||
"RU: Удаляет метки + останавливает выбранные unit."
|
||
)
|
||
self.btn_scopes_stop_selected.clicked.connect(self.on_scopes_stop_selected)
|
||
scopes_row.addWidget(self.btn_scopes_stop_selected)
|
||
|
||
self.btn_scopes_cleanup = QPushButton("Cleanup all svpn units")
|
||
self.btn_scopes_cleanup.setToolTip(
|
||
"EN: Unmarks + stops ALL running svpn-* units.\n"
|
||
"RU: Удаляет метки + останавливает ВСЕ запущенные svpn-* unit."
|
||
)
|
||
self.btn_scopes_cleanup.clicked.connect(self.on_scopes_cleanup_all)
|
||
scopes_row.addWidget(self.btn_scopes_cleanup)
|
||
|
||
scopes_row.addStretch(1)
|
||
scopes_layout.addLayout(scopes_row)
|
||
|
||
self.lst_scopes = QListWidget()
|
||
self.lst_scopes.setSelectionMode(QAbstractItemView.ExtendedSelection)
|
||
self.lst_scopes.setToolTip(
|
||
"EN: Running svpn units. Double click to copy unit name.\n"
|
||
"RU: Запущенные svpn unit. Двойной клик копирует имя unit."
|
||
)
|
||
self.lst_scopes.itemDoubleClicked.connect(lambda it: self._copy_scope_unit(it))
|
||
self.lst_scopes.setFixedHeight(140)
|
||
scopes_layout.addWidget(self.lst_scopes)
|
||
|
||
self.lbl_scopes = QLabel("Running units: —")
|
||
self.lbl_scopes.setStyleSheet("color: gray;")
|
||
scopes_layout.addWidget(self.lbl_scopes)
|
||
|
||
tab_apps_layout.addWidget(scopes_group)
|
||
|
||
self.txt_app = QPlainTextEdit()
|
||
self.txt_app.setReadOnly(True)
|
||
tab_apps_layout.addWidget(self.txt_app, stretch=1)
|
||
|
||
tab_apps_layout.addStretch(1)
|
||
self.tabs.addTab(tab_apps, "Apps (runtime)")
|
||
|
||
tab_adv = QWidget()
|
||
tab_adv_layout = QVBoxLayout(tab_adv)
|
||
|
||
self.ed_vpn_subnets = QPlainTextEdit()
|
||
self.ed_vpn_subnets.setToolTip("""EN: Force VPN by source subnet. Useful for docker subnets when you want containers via VPN.
|
||
RU: Принудительно через VPN по source subnet. Полезно для docker-подсетей, если хочешь контейнеры через VPN.""")
|
||
self.ed_vpn_subnets.setPlaceholderText("Force VPN by source subnet, one per line (e.g. 172.18.0.0/16)")
|
||
self.ed_vpn_subnets.setFixedHeight(72)
|
||
|
||
self.ed_vpn_uids = QPlainTextEdit()
|
||
self.ed_vpn_uids.setToolTip("""EN: Force VPN by UID/uidrange (host OUTPUT only). Does not affect forwarded docker traffic.
|
||
RU: Принудительно через VPN по UID (только процессы хоста). На forwarded docker-трафик не влияет.""")
|
||
self.ed_vpn_uids.setPlaceholderText("Force VPN by UID/UID range, one per line (e.g. 1000 or 1000-1010)")
|
||
self.ed_vpn_uids.setFixedHeight(60)
|
||
|
||
self.ed_vpn_cgroups = QPlainTextEdit()
|
||
self.ed_vpn_cgroups.setToolTip("""EN: Force VPN by systemd cgroup. Backend resolves cgroup -> PIDs -> UID rules at apply time.
|
||
RU: Принудительно через VPN по cgroup (systemd). Backend резолвит cgroup -> PID -> UID при применении.""")
|
||
self.ed_vpn_cgroups.setPlaceholderText("Force VPN by cgroup path/name, one per line")
|
||
self.ed_vpn_cgroups.setFixedHeight(60)
|
||
|
||
self.ed_direct_subnets = QPlainTextEdit()
|
||
self.ed_direct_subnets.setToolTip("""EN: Force Direct by source subnet. Useful to keep docker subnets direct in full tunnel.
|
||
RU: Принудительно direct по source subnet. Полезно, чтобы docker-подсети были direct в full tunnel.""")
|
||
self.ed_direct_subnets.setPlaceholderText("Force Direct by source subnet, one per line")
|
||
self.ed_direct_subnets.setFixedHeight(72)
|
||
|
||
self.ed_direct_uids = QPlainTextEdit()
|
||
self.ed_direct_uids.setToolTip("""EN: Force Direct by UID/uidrange (host OUTPUT only).
|
||
RU: Принудительно direct по UID (только процессы хоста).""")
|
||
self.ed_direct_uids.setPlaceholderText("Force Direct by UID/UID range, one per line")
|
||
self.ed_direct_uids.setFixedHeight(60)
|
||
|
||
self.ed_direct_cgroups = QPlainTextEdit()
|
||
self.ed_direct_cgroups.setToolTip("""EN: Force Direct by systemd cgroup (resolved to UID rules at apply time).
|
||
RU: Принудительно direct по cgroup (резолвится в UID правила при применении).""")
|
||
self.ed_direct_cgroups.setPlaceholderText("Force Direct by cgroup path/name, one per line")
|
||
self.ed_direct_cgroups.setFixedHeight(60)
|
||
|
||
cols = QHBoxLayout()
|
||
|
||
vpn_group = QGroupBox("Force VPN")
|
||
vpn_layout = QVBoxLayout(vpn_group)
|
||
vpn_layout.addWidget(QLabel("Source subnets"))
|
||
vpn_layout.addWidget(self.ed_vpn_subnets)
|
||
vpn_layout.addWidget(QLabel("UIDs"))
|
||
vpn_layout.addWidget(self.ed_vpn_uids)
|
||
vpn_layout.addWidget(QLabel("Cgroups / services"))
|
||
vpn_layout.addWidget(self.ed_vpn_cgroups)
|
||
cols.addWidget(vpn_group, stretch=1)
|
||
|
||
direct_group = QGroupBox("Force Direct")
|
||
direct_layout = QVBoxLayout(direct_group)
|
||
direct_layout.addWidget(QLabel("Source subnets"))
|
||
direct_layout.addWidget(self.ed_direct_subnets)
|
||
direct_layout.addWidget(QLabel("UIDs"))
|
||
direct_layout.addWidget(self.ed_direct_uids)
|
||
direct_layout.addWidget(QLabel("Cgroups / services"))
|
||
direct_layout.addWidget(self.ed_direct_cgroups)
|
||
cols.addWidget(direct_group, stretch=1)
|
||
|
||
tab_adv_layout.addLayout(cols, stretch=1)
|
||
|
||
row_adv = QHBoxLayout()
|
||
self.btn_pick_detected = QPushButton("Add detected...")
|
||
self.btn_pick_detected.setToolTip("""EN: Opens a selector with detected subnets/services/UIDs. Only fills fields; nothing is applied automatically.
|
||
RU: Открывает список обнаруженных subnet/service/UID. Только заполняет поля; ничего не применяется автоматически.""")
|
||
self.btn_pick_detected.clicked.connect(self.on_pick_detected)
|
||
row_adv.addWidget(self.btn_pick_detected)
|
||
self.btn_apply_overrides = QPushButton("Apply overrides")
|
||
self.btn_apply_overrides.setToolTip("""EN: Applies policy rules and verifies health. On failure backend rolls back.
|
||
RU: Применяет policy-rules и проверяет health. При ошибке backend делает откат.""")
|
||
self.btn_apply_overrides.clicked.connect(self.on_apply_overrides)
|
||
row_adv.addWidget(self.btn_apply_overrides)
|
||
self.btn_reload_overrides = QPushButton("Reload overrides")
|
||
self.btn_reload_overrides.clicked.connect(self.refresh_state)
|
||
row_adv.addWidget(self.btn_reload_overrides)
|
||
row_adv.addStretch(1)
|
||
tab_adv_layout.addLayout(row_adv)
|
||
|
||
self.tabs.addTab(tab_adv, "Policy overrides (Advanced)")
|
||
|
||
# EN: Small status line for last action performed in this dialog.
|
||
# RU: Строка статуса последнего действия в этом окне.
|
||
self.lbl_action = QLabel("—")
|
||
self.lbl_action.setWordWrap(True)
|
||
self.lbl_action.setStyleSheet("color: gray;")
|
||
root.addWidget(self.lbl_action)
|
||
|
||
row_bottom = QHBoxLayout()
|
||
row_bottom.addStretch(1)
|
||
btn_close = QPushButton("Close")
|
||
btn_close.clicked.connect(self.accept)
|
||
row_bottom.addWidget(btn_close)
|
||
root.addLayout(row_bottom)
|
||
|
||
QtCore.QTimer.singleShot(0, self.refresh_state)
|
||
QtCore.QTimer.singleShot(0, self.refresh_appmarks_counts)
|
||
QtCore.QTimer.singleShot(0, self.refresh_running_scopes)
|
||
|
||
def _is_operation_error(self, message: str) -> bool:
|
||
low = (message or "").strip().lower()
|
||
return ("rolled back" in low) or ("apply failed" in low) or ("verification failed" in low)
|
||
|
||
def _set_action_status(self, msg: str, ok: bool | None = None) -> None:
|
||
text = (msg or "").strip() or "—"
|
||
self.lbl_action.setText(text)
|
||
if ok is True:
|
||
self.lbl_action.setStyleSheet("color: green;")
|
||
elif ok is False:
|
||
self.lbl_action.setStyleSheet("color: red;")
|
||
else:
|
||
self.lbl_action.setStyleSheet("color: gray;")
|
||
|
||
def _safe(self, fn, *, title: str = "Traffic mode error") -> None:
|
||
try:
|
||
fn()
|
||
except Exception as e:
|
||
msg = f"[ui-error] {title}: {e}"
|
||
self._set_action_status(msg, ok=False)
|
||
self._emit_log(msg)
|
||
QMessageBox.critical(self, title, str(e))
|
||
|
||
def _emit_log(self, msg: str) -> None:
|
||
text = (msg or "").strip()
|
||
if not text:
|
||
return
|
||
if self.log_cb:
|
||
self.log_cb(text)
|
||
else:
|
||
try:
|
||
self.ctrl.log_gui(text)
|
||
except Exception:
|
||
pass
|
||
|
||
def _preferred_iface_value(self) -> str:
|
||
raw = self.cmb_iface.currentText().strip()
|
||
if raw.lower() in ("", "auto", "-", "default"):
|
||
return ""
|
||
return raw
|
||
|
||
def _set_preferred_iface_options(self, ifaces: list[str], selected: str) -> None:
|
||
vals = ["auto"] + [x for x in ifaces if x]
|
||
sel = selected.strip() if selected else "auto"
|
||
if not sel:
|
||
sel = "auto"
|
||
if sel not in vals:
|
||
vals.append(sel)
|
||
|
||
self.cmb_iface.blockSignals(True)
|
||
self.cmb_iface.clear()
|
||
self.cmb_iface.addItems(vals)
|
||
idx = self.cmb_iface.findText(sel)
|
||
if idx < 0:
|
||
idx = self.cmb_iface.findText("auto")
|
||
if idx >= 0:
|
||
self.cmb_iface.setCurrentIndex(idx)
|
||
else:
|
||
self.cmb_iface.setEditText(sel)
|
||
self.cmb_iface.blockSignals(False)
|
||
|
||
def _lines_from_text(self, txt: str) -> list[str]:
|
||
out: list[str] = []
|
||
for raw in (txt or "").replace("\r", "\n").split("\n"):
|
||
line = raw.strip()
|
||
if line:
|
||
out.append(line)
|
||
return out
|
||
|
||
def _set_lines(self, widget: QPlainTextEdit, vals: list[str]) -> None:
|
||
widget.blockSignals(True)
|
||
widget.setPlainText("\n".join([x for x in vals if str(x).strip()]))
|
||
widget.blockSignals(False)
|
||
|
||
def _merge_lines(self, widget: QPlainTextEdit, vals: list[str]) -> int:
|
||
cur = self._lines_from_text(widget.toPlainText())
|
||
seen = {x.strip() for x in cur}
|
||
added = 0
|
||
for v in (vals or []):
|
||
vv = str(v).strip()
|
||
if not vv or vv in seen:
|
||
continue
|
||
cur.append(vv)
|
||
seen.add(vv)
|
||
added += 1
|
||
if added > 0:
|
||
self._set_lines(widget, cur)
|
||
return added
|
||
|
||
def _candidates_add(self, target: str, kind: str, values: list[str]) -> None:
|
||
tgt = (target or "").strip().lower()
|
||
k = (kind or "").strip().lower()
|
||
if tgt not in ("vpn", "direct"):
|
||
return
|
||
|
||
widget: QPlainTextEdit | None = None
|
||
if tgt == "vpn":
|
||
if k == "subnet":
|
||
widget = self.ed_vpn_subnets
|
||
elif k == "uid":
|
||
widget = self.ed_vpn_uids
|
||
elif k == "cgroup":
|
||
widget = self.ed_vpn_cgroups
|
||
else:
|
||
if k == "subnet":
|
||
widget = self.ed_direct_subnets
|
||
elif k == "uid":
|
||
widget = self.ed_direct_uids
|
||
elif k == "cgroup":
|
||
widget = self.ed_direct_cgroups
|
||
|
||
if widget is None:
|
||
return
|
||
|
||
added = self._merge_lines(widget, values or [])
|
||
if added > 0:
|
||
msg = f"Traffic candidates added: target={tgt} kind={k} added={added}"
|
||
self._set_action_status(msg, ok=True)
|
||
self._emit_log(msg)
|
||
else:
|
||
msg = f"Traffic candidates add: nothing new (target={tgt} kind={k})"
|
||
self._set_action_status(msg, ok=None)
|
||
self._emit_log(msg)
|
||
|
||
def on_pick_detected(self) -> None:
|
||
def work() -> None:
|
||
cands = self.ctrl.traffic_candidates()
|
||
existing = {
|
||
"vpn": {
|
||
"subnet": set(self._lines_from_text(self.ed_vpn_subnets.toPlainText())),
|
||
"uid": set(self._lines_from_text(self.ed_vpn_uids.toPlainText())),
|
||
"cgroup": set(self._lines_from_text(self.ed_vpn_cgroups.toPlainText())),
|
||
},
|
||
"direct": {
|
||
"subnet": set(self._lines_from_text(self.ed_direct_subnets.toPlainText())),
|
||
"uid": set(self._lines_from_text(self.ed_direct_uids.toPlainText())),
|
||
"cgroup": set(self._lines_from_text(self.ed_direct_cgroups.toPlainText())),
|
||
},
|
||
}
|
||
dlg = TrafficCandidatesDialog(
|
||
cands,
|
||
existing=existing,
|
||
add_cb=self._candidates_add,
|
||
parent=self,
|
||
)
|
||
dlg.exec()
|
||
|
||
self._safe(work, title="Traffic candidates error")
|
||
|
||
|
||
def _set_mode_state(
|
||
self,
|
||
desired_mode: str,
|
||
applied_mode: str,
|
||
preferred_iface: str,
|
||
auto_local_bypass: bool,
|
||
bypass_candidates: int,
|
||
overrides_applied: int,
|
||
cgroup_resolved_uids: int,
|
||
cgroup_warning: str,
|
||
healthy: bool,
|
||
probe_ok: bool,
|
||
probe_message: str,
|
||
active_iface: str,
|
||
iface_reason: str,
|
||
message: str,
|
||
) -> None:
|
||
desired = (desired_mode or "").strip().lower() or "selective"
|
||
applied = (applied_mode or "").strip().lower() or "direct"
|
||
|
||
if healthy:
|
||
color = "green"
|
||
health_txt = "OK"
|
||
else:
|
||
color = "red"
|
||
health_txt = "MISMATCH"
|
||
|
||
text = f"Traffic mode: {desired} (applied: {applied}) [{health_txt}]"
|
||
diag_parts = []
|
||
diag_parts.append(f"preferred={preferred_iface or 'auto'}")
|
||
diag_parts.append(
|
||
f"auto_local_bypass={'on' if auto_local_bypass else 'off'}"
|
||
)
|
||
if bypass_candidates > 0:
|
||
diag_parts.append(f"bypass_routes={bypass_candidates}")
|
||
diag_parts.append(f"overrides={overrides_applied}")
|
||
if cgroup_resolved_uids > 0:
|
||
diag_parts.append(f"cgroup_uids={cgroup_resolved_uids}")
|
||
if cgroup_warning:
|
||
diag_parts.append(f"cgroup_warning={cgroup_warning}")
|
||
if active_iface:
|
||
diag_parts.append(f"iface={active_iface}")
|
||
if iface_reason:
|
||
diag_parts.append(f"source={iface_reason}")
|
||
diag_parts.append(f"probe={'ok' if probe_ok else 'fail'}")
|
||
if probe_message:
|
||
diag_parts.append(probe_message)
|
||
if message:
|
||
diag_parts.append(message)
|
||
diag = " | ".join(diag_parts) if diag_parts else "—"
|
||
|
||
self.lbl_state.setText(text)
|
||
self.lbl_state.setStyleSheet(f"color: {color};")
|
||
self.lbl_diag.setText(diag)
|
||
self.lbl_diag.setStyleSheet("color: gray;")
|
||
|
||
def refresh_state(self) -> None:
|
||
def work() -> None:
|
||
view = self.ctrl.traffic_mode_view()
|
||
mode = (view.desired_mode or "selective").strip().lower()
|
||
|
||
self.rad_selective.blockSignals(True)
|
||
self.rad_full.blockSignals(True)
|
||
self.rad_direct.blockSignals(True)
|
||
self.rad_selective.setChecked(mode == "selective")
|
||
self.rad_full.setChecked(mode == "full_tunnel")
|
||
self.rad_direct.setChecked(mode == "direct")
|
||
self.rad_selective.blockSignals(False)
|
||
self.rad_full.blockSignals(False)
|
||
self.rad_direct.blockSignals(False)
|
||
|
||
opts = self.ctrl.traffic_interfaces()
|
||
self._set_preferred_iface_options(opts, view.preferred_iface)
|
||
self.chk_auto_local.blockSignals(True)
|
||
self.chk_auto_local.setChecked(bool(view.auto_local_bypass))
|
||
self.chk_auto_local.blockSignals(False)
|
||
self._set_lines(self.ed_vpn_subnets, list(view.force_vpn_subnets or []))
|
||
self._set_lines(self.ed_vpn_uids, list(view.force_vpn_uids or []))
|
||
self._set_lines(self.ed_vpn_cgroups, list(view.force_vpn_cgroups or []))
|
||
self._set_lines(self.ed_direct_subnets, list(view.force_direct_subnets or []))
|
||
self._set_lines(self.ed_direct_uids, list(view.force_direct_uids or []))
|
||
self._set_lines(self.ed_direct_cgroups, list(view.force_direct_cgroups or []))
|
||
|
||
self._set_mode_state(
|
||
view.desired_mode,
|
||
view.applied_mode,
|
||
view.preferred_iface,
|
||
bool(view.auto_local_bypass),
|
||
int(view.bypass_candidates),
|
||
int(view.overrides_applied),
|
||
int(view.cgroup_resolved_uids),
|
||
view.cgroup_warning,
|
||
bool(view.healthy),
|
||
bool(view.probe_ok),
|
||
view.probe_message,
|
||
view.active_iface,
|
||
view.iface_reason,
|
||
view.message,
|
||
)
|
||
|
||
self._safe(work)
|
||
|
||
def on_mode_toggle(self, mode: str, checked: bool) -> None:
|
||
if not checked:
|
||
return
|
||
|
||
def work() -> None:
|
||
preferred = self._preferred_iface_value()
|
||
auto_local = self.chk_auto_local.isChecked()
|
||
view = self.ctrl.traffic_mode_set(mode, preferred, auto_local)
|
||
msg = (
|
||
f"Traffic mode set: desired={view.desired_mode}, "
|
||
f"applied={view.applied_mode}, iface={view.active_iface or '-'}, "
|
||
f"preferred={preferred or 'auto'}, probe_ok={view.probe_ok}, "
|
||
f"healthy={view.healthy}, auto_local_bypass={view.auto_local_bypass}, "
|
||
f"bypass_routes={view.bypass_candidates}, overrides={view.overrides_applied}, "
|
||
f"cgroup_uids={view.cgroup_resolved_uids}, message={view.message}"
|
||
)
|
||
self._emit_log(msg)
|
||
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
|
||
self._set_action_status(
|
||
f"Traffic mode set: desired={view.desired_mode} applied={view.applied_mode} message={view.message}",
|
||
ok=op_ok,
|
||
)
|
||
self.refresh_state()
|
||
if self.refresh_cb:
|
||
self.refresh_cb()
|
||
|
||
self._safe(work)
|
||
|
||
def on_refresh_ifaces(self) -> None:
|
||
def work() -> None:
|
||
view = self.ctrl.traffic_mode_view()
|
||
opts = self.ctrl.traffic_interfaces()
|
||
self._set_preferred_iface_options(opts, view.preferred_iface)
|
||
self._emit_log(
|
||
"Traffic ifaces refreshed: "
|
||
f"preferred={view.preferred_iface or 'auto'} "
|
||
f"active={view.active_iface or '-'}"
|
||
)
|
||
self._set_action_status("Traffic ifaces refreshed", ok=True)
|
||
self.refresh_state()
|
||
if self.refresh_cb:
|
||
self.refresh_cb()
|
||
|
||
self._safe(work, title="Traffic iface detect error")
|
||
|
||
def _selected_mode(self) -> str:
|
||
if self.rad_full.isChecked():
|
||
return "full_tunnel"
|
||
if self.rad_direct.isChecked():
|
||
return "direct"
|
||
return "selective"
|
||
|
||
def on_auto_local_toggle(self) -> None:
|
||
def work() -> None:
|
||
mode = self._selected_mode()
|
||
preferred = self._preferred_iface_value()
|
||
auto_local = self.chk_auto_local.isChecked()
|
||
view = self.ctrl.traffic_mode_set(mode, preferred, auto_local)
|
||
msg = (
|
||
f"Traffic auto-local set: mode={view.desired_mode}, "
|
||
f"auto_local_bypass={view.auto_local_bypass}, "
|
||
f"bypass_routes={view.bypass_candidates}, overrides={view.overrides_applied}, "
|
||
f"cgroup_uids={view.cgroup_resolved_uids}, message={view.message}"
|
||
)
|
||
self._emit_log(msg)
|
||
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
|
||
self._set_action_status(
|
||
f"Auto-local bypass set: {'on' if view.auto_local_bypass else 'off'} ({view.message})",
|
||
ok=op_ok,
|
||
)
|
||
self.refresh_state()
|
||
if self.refresh_cb:
|
||
self.refresh_cb()
|
||
|
||
self._safe(work, title="Auto-local bypass error")
|
||
|
||
def on_apply_overrides(self) -> None:
|
||
def work() -> None:
|
||
mode = self._selected_mode()
|
||
preferred = self._preferred_iface_value()
|
||
auto_local = self.chk_auto_local.isChecked()
|
||
vpn_subnets = self._lines_from_text(self.ed_vpn_subnets.toPlainText())
|
||
vpn_uids = self._lines_from_text(self.ed_vpn_uids.toPlainText())
|
||
vpn_cgroups = self._lines_from_text(self.ed_vpn_cgroups.toPlainText())
|
||
direct_subnets = self._lines_from_text(self.ed_direct_subnets.toPlainText())
|
||
direct_uids = self._lines_from_text(self.ed_direct_uids.toPlainText())
|
||
direct_cgroups = self._lines_from_text(self.ed_direct_cgroups.toPlainText())
|
||
|
||
view = self.ctrl.traffic_mode_set(
|
||
mode,
|
||
preferred,
|
||
auto_local,
|
||
vpn_subnets,
|
||
vpn_uids,
|
||
vpn_cgroups,
|
||
direct_subnets,
|
||
direct_uids,
|
||
direct_cgroups,
|
||
)
|
||
msg = (
|
||
f"Traffic overrides applied: mode={view.desired_mode}, "
|
||
f"vpn_subnets={len(view.force_vpn_subnets)}, vpn_uids={len(view.force_vpn_uids)}, vpn_cgroups={len(view.force_vpn_cgroups)}, "
|
||
f"direct_subnets={len(view.force_direct_subnets)}, direct_uids={len(view.force_direct_uids)}, direct_cgroups={len(view.force_direct_cgroups)}, "
|
||
f"overrides={view.overrides_applied}, cgroup_uids={view.cgroup_resolved_uids}, "
|
||
f"healthy={view.healthy}, message={view.message}"
|
||
)
|
||
self._emit_log(msg)
|
||
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
|
||
self._set_action_status(
|
||
f"Overrides applied: overrides={view.overrides_applied} message={view.message}",
|
||
ok=op_ok,
|
||
)
|
||
self.refresh_state()
|
||
if self.refresh_cb:
|
||
self.refresh_cb()
|
||
|
||
self._safe(work, title="Apply overrides error")
|
||
|
||
# -----------------------------------------------------------------
|
||
# Apps (runtime) tab
|
||
# -----------------------------------------------------------------
|
||
|
||
def _append_app_log(self, msg: str) -> None:
|
||
line = (msg or "").rstrip()
|
||
if not line:
|
||
return
|
||
try:
|
||
self.txt_app.appendPlainText(line)
|
||
except Exception:
|
||
pass
|
||
self._emit_log(line)
|
||
|
||
def refresh_appmarks_counts(self) -> None:
|
||
try:
|
||
st = self.ctrl.traffic_appmarks_status()
|
||
self.lbl_app_counts.setText(
|
||
f"Marks: VPN={st.vpn_count}, Direct={st.direct_count}"
|
||
)
|
||
except Exception as e:
|
||
self.lbl_app_counts.setText(f"Marks: error: {e}")
|
||
|
||
def _refresh_last_scope_ui(self) -> None:
|
||
unit = (self._last_app_unit or "").strip()
|
||
target = (self._last_app_target or "").strip().lower()
|
||
cg_id = int(self._last_app_cgroup_id or 0)
|
||
|
||
parts = []
|
||
if unit:
|
||
parts.append(f"unit={unit}")
|
||
if target in ("vpn", "direct"):
|
||
parts.append(f"target={target}")
|
||
if cg_id > 0:
|
||
parts.append(f"cgroup_id={cg_id}")
|
||
|
||
if parts:
|
||
self.lbl_app_last.setText("Last scope: " + " | ".join(parts))
|
||
else:
|
||
self.lbl_app_last.setText("Last scope: —")
|
||
|
||
self.btn_app_stop_last.setEnabled(bool(unit))
|
||
self.btn_app_unmark_last.setEnabled(bool(unit) and target in ("vpn", "direct") and cg_id > 0)
|
||
|
||
def _set_last_scope(self, *, unit: str = "", target: str = "", cgroup_id: int = 0) -> None:
|
||
self._last_app_unit = str(unit or "").strip()
|
||
self._last_app_target = str(target or "").strip().lower()
|
||
try:
|
||
self._last_app_cgroup_id = int(cgroup_id or 0)
|
||
except Exception:
|
||
self._last_app_cgroup_id = 0
|
||
|
||
self._settings.setValue("traffic_app_last_unit", self._last_app_unit)
|
||
self._settings.setValue("traffic_app_last_target", self._last_app_target)
|
||
self._settings.setValue("traffic_app_last_cgroup_id", int(self._last_app_cgroup_id or 0))
|
||
self._refresh_last_scope_ui()
|
||
|
||
def on_app_pick(self) -> None:
|
||
def work() -> None:
|
||
dlg = AppPickerDialog(parent=self)
|
||
if dlg.exec() != QDialog.Accepted:
|
||
return
|
||
cmd = (dlg.selected_command() or "").strip()
|
||
if not cmd:
|
||
return
|
||
self.ed_app_cmd.setText(cmd)
|
||
self._append_app_log(f"[picker] command filled: {cmd}")
|
||
|
||
self._safe(work, title="App picker error")
|
||
|
||
def _run_systemd_unit(self, cmdline: str, *, unit: str) -> tuple[str, str]:
|
||
args = shlex.split(cmdline or "")
|
||
if not args:
|
||
raise ValueError("empty command")
|
||
|
||
run_cmd = [
|
||
"systemd-run",
|
||
"--user",
|
||
"--unit",
|
||
unit,
|
||
"--collect",
|
||
"--same-dir",
|
||
] + args
|
||
|
||
try:
|
||
p = subprocess.run(
|
||
run_cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
check=False,
|
||
timeout=6,
|
||
)
|
||
except subprocess.TimeoutExpired as e:
|
||
raise RuntimeError(
|
||
"systemd-run timed out (UI would freeze without this guard). "
|
||
"Try again or verify that systemd --user is responsive.\n\n"
|
||
f"command: {' '.join(run_cmd)}"
|
||
) from e
|
||
out = ((p.stdout or "") + (p.stderr or "")).strip()
|
||
if p.returncode != 0:
|
||
raise RuntimeError(f"systemd-run failed: {p.returncode}\n{out}".strip())
|
||
|
||
# EN: Some apps (e.g. Chrome wrappers) can return quickly; the transient scope
|
||
# EN: may appear/disappear fast. Retry briefly to avoid race.
|
||
# RU: Некоторые приложения (например, chrome-wrapper) быстро завершаются; scope
|
||
# RU: может появиться/исчезнуть очень быстро. Делаем небольшой retry.
|
||
cg = self._control_group_for_unit_retry(unit, timeout_sec=3.0)
|
||
|
||
return cg, out
|
||
|
||
def _control_group_for_unit_retry(self, unit: str, *, timeout_sec: float = 2.0) -> str:
|
||
u = (unit or "").strip()
|
||
if not u:
|
||
raise ValueError("empty unit")
|
||
|
||
deadline = time.time() + max(0.2, float(timeout_sec or 0))
|
||
last_out = ""
|
||
while time.time() < deadline:
|
||
code, out = self._systemctl_user(["show", "-p", "ControlGroup", "--value", u])
|
||
last_out = out or ""
|
||
if code == 0:
|
||
cg = (out or "").strip()
|
||
if cg:
|
||
return cg
|
||
low = (out or "").lower()
|
||
if "could not be found" in low or "not found" in low or "loaded units listed" in low:
|
||
break
|
||
time.sleep(0.1)
|
||
|
||
# Provide a more actionable error for users.
|
||
code_s, out_s = self._systemctl_user(["status", u, "--no-pager", "--plain"])
|
||
status_txt = (out_s or "").strip()
|
||
hint = (
|
||
"EN: Scope unit may have exited immediately. If the app is already running, "
|
||
"this launch may not create a new process tree inside the scope.\n"
|
||
"EN: Try closing the app полностью and запуск again from here.\n"
|
||
"RU: Scope мог завершиться сразу. Если приложение уже запущено, повторный запуск "
|
||
"может не создать новый процесс внутри scope.\n"
|
||
"RU: Попробуй полностью закрыть приложение и запустить снова отсюда."
|
||
)
|
||
raise RuntimeError(
|
||
(
|
||
"failed to query ControlGroup\n"
|
||
+ (last_out.strip() or "(no output)")
|
||
+ ("\n\nstatus:\n" + status_txt if status_txt else "")
|
||
+ "\n\n"
|
||
+ hint
|
||
).strip()
|
||
)
|
||
|
||
def on_app_run(self) -> None:
|
||
def work() -> None:
|
||
cmdline = (self.ed_app_cmd.text() or "").strip()
|
||
if not cmdline:
|
||
QMessageBox.warning(self, "Missing command", "Please enter a command to run.")
|
||
return
|
||
|
||
target = "vpn" if self.rad_app_vpn.isChecked() else "direct"
|
||
ttl_sec = int(self.spn_app_ttl.value()) * 3600
|
||
unit = f"svpn-{target}-{int(time.time())}.service"
|
||
|
||
self._append_app_log(
|
||
f"[app] launching: target={target} ttl={ttl_sec}s unit={unit}"
|
||
)
|
||
|
||
cg, out = self._run_systemd_unit(cmdline, unit=unit)
|
||
if out:
|
||
self._append_app_log(f"[app] systemd-run:\n{out}")
|
||
self._append_app_log(f"[app] ControlGroup: {cg}")
|
||
self._set_last_scope(unit=unit, target=target, cgroup_id=0)
|
||
|
||
res = self.ctrl.traffic_appmarks_apply(
|
||
op="add",
|
||
target=target,
|
||
cgroup=cg,
|
||
timeout_sec=ttl_sec,
|
||
)
|
||
if res.ok:
|
||
self._append_app_log(
|
||
f"[appmarks] OK: {res.message} cgroup_id={res.cgroup_id} timeout={res.timeout_sec}s"
|
||
)
|
||
self._set_action_status(
|
||
f"App mark added: target={target} cgroup_id={res.cgroup_id}",
|
||
ok=True,
|
||
)
|
||
self._set_last_scope(unit=unit, target=target, cgroup_id=int(res.cgroup_id or 0))
|
||
else:
|
||
self._append_app_log(f"[appmarks] ERROR: {res.message}")
|
||
self._set_action_status(
|
||
f"App mark failed: target={target} ({res.message})",
|
||
ok=False,
|
||
)
|
||
QMessageBox.critical(
|
||
self,
|
||
"App mark error",
|
||
res.message or "unknown error",
|
||
)
|
||
|
||
self.refresh_appmarks_counts()
|
||
|
||
self._safe(work, title="Run app error")
|
||
|
||
def on_app_stop_last_scope(self) -> None:
|
||
unit = (self._last_app_unit or "").strip()
|
||
if not unit:
|
||
return
|
||
|
||
def work() -> None:
|
||
self._append_app_log(f"[app] stop last scope: unit={unit}")
|
||
p = subprocess.run(
|
||
["systemctl", "--user", "stop", unit],
|
||
capture_output=True,
|
||
text=True,
|
||
check=False,
|
||
)
|
||
out = ((p.stdout or "") + (p.stderr or "")).strip()
|
||
if p.returncode == 0:
|
||
self._append_app_log("[app] stopped OK")
|
||
if out:
|
||
self._append_app_log(out)
|
||
self._set_action_status(f"Stopped scope: {unit}", ok=True)
|
||
return
|
||
|
||
self._append_app_log(f"[app] stop failed: rc={p.returncode}")
|
||
if out:
|
||
self._append_app_log(out)
|
||
# fallback: kill
|
||
p2 = subprocess.run(
|
||
["systemctl", "--user", "kill", unit],
|
||
capture_output=True,
|
||
text=True,
|
||
check=False,
|
||
)
|
||
out2 = ((p2.stdout or "") + (p2.stderr or "")).strip()
|
||
if p2.returncode == 0:
|
||
self._append_app_log("[app] kill OK (fallback)")
|
||
if out2:
|
||
self._append_app_log(out2)
|
||
self._set_action_status(f"Killed scope: {unit}", ok=True)
|
||
return
|
||
|
||
self._append_app_log(f"[app] kill failed: rc={p2.returncode}")
|
||
if out2:
|
||
self._append_app_log(out2)
|
||
self._set_action_status(f"Stop scope failed: {unit}", ok=False)
|
||
QMessageBox.critical(self, "Stop scope error", out2 or out or "stop failed")
|
||
|
||
self._safe(work, title="Stop scope error")
|
||
|
||
def on_app_unmark_last(self) -> None:
|
||
unit = (self._last_app_unit or "").strip()
|
||
target = (self._last_app_target or "").strip().lower()
|
||
cg_id = int(self._last_app_cgroup_id or 0)
|
||
if not unit or target not in ("vpn", "direct") or cg_id <= 0:
|
||
return
|
||
|
||
def work() -> None:
|
||
if QMessageBox.question(
|
||
self,
|
||
"Unmark last",
|
||
f"Remove routing mark for last scope?\n\nunit={unit}\ntarget={target}\ncgroup_id={cg_id}",
|
||
) != QMessageBox.StandardButton.Yes:
|
||
return
|
||
|
||
res = self.ctrl.traffic_appmarks_apply(
|
||
op="del",
|
||
target=target,
|
||
cgroup=str(cg_id),
|
||
)
|
||
if res.ok:
|
||
self._append_app_log(f"[appmarks] unmarked: target={target} cgroup_id={cg_id}")
|
||
self._set_action_status(f"Unmarked last: target={target} cgroup_id={cg_id}", ok=True)
|
||
else:
|
||
self._append_app_log(f"[appmarks] unmark error: {res.message}")
|
||
self._set_action_status(f"Unmark last failed: {res.message}", ok=False)
|
||
QMessageBox.critical(self, "Unmark error", res.message or "unmark failed")
|
||
|
||
self.refresh_appmarks_counts()
|
||
|
||
self._safe(work, title="Unmark error")
|
||
|
||
def on_appmarks_clear(self, target: str) -> None:
|
||
tgt = (target or "").strip().lower()
|
||
if tgt not in ("vpn", "direct"):
|
||
return
|
||
|
||
def work() -> None:
|
||
if QMessageBox.question(
|
||
self,
|
||
"Clear marks",
|
||
f"Clear ALL runtime app marks for target={tgt}?",
|
||
) != QMessageBox.StandardButton.Yes:
|
||
return
|
||
|
||
res = self.ctrl.traffic_appmarks_apply(op="clear", target=tgt)
|
||
if res.ok:
|
||
self._append_app_log(f"[appmarks] cleared: target={tgt}")
|
||
self._set_action_status(f"App marks cleared: target={tgt}", ok=True)
|
||
else:
|
||
self._append_app_log(f"[appmarks] clear error: {res.message}")
|
||
self._set_action_status(
|
||
f"App marks clear failed: target={tgt} ({res.message})",
|
||
ok=False,
|
||
)
|
||
QMessageBox.critical(
|
||
self, "Clear marks error", res.message or "unknown error"
|
||
)
|
||
|
||
self.refresh_appmarks_counts()
|
||
|
||
self._safe(work, title="Clear app marks error")
|
||
|
||
# -----------------------------------------------------------------
|
||
# Scopes list + cleanup (runtime)
|
||
# -----------------------------------------------------------------
|
||
|
||
def _scope_target_from_unit(self, unit: str) -> str:
|
||
u = (unit or "").strip()
|
||
if not u.startswith("svpn-"):
|
||
return ""
|
||
rest = u[len("svpn-") :]
|
||
if rest.startswith("vpn-"):
|
||
return "vpn"
|
||
if rest.startswith("direct-"):
|
||
return "direct"
|
||
return ""
|
||
|
||
def _systemctl_user(self, args: list[str]) -> tuple[int, str]:
|
||
cmd = ["systemctl", "--user"] + list(args or [])
|
||
try:
|
||
p = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
check=False,
|
||
timeout=4,
|
||
)
|
||
out = ((p.stdout or "") + (p.stderr or "")).strip()
|
||
return int(p.returncode or 0), out
|
||
except subprocess.TimeoutExpired:
|
||
return 124, f"timeout running: {' '.join(cmd)}"
|
||
|
||
def _list_running_svpn_units(self) -> list[str]:
|
||
code, out = self._systemctl_user(
|
||
[
|
||
"list-units",
|
||
"--type=scope",
|
||
"--type=service",
|
||
"--state=running",
|
||
"--no-legend",
|
||
"--no-pager",
|
||
"--plain",
|
||
]
|
||
)
|
||
if code != 0:
|
||
raise RuntimeError(out or f"systemctl list-units failed: rc={code}")
|
||
|
||
units: list[str] = []
|
||
for raw in (out or "").splitlines():
|
||
line = raw.strip()
|
||
if not line:
|
||
continue
|
||
fields = line.split()
|
||
if not fields:
|
||
continue
|
||
unit = fields[0].strip()
|
||
if unit.startswith("svpn-") and (unit.endswith(".scope") or unit.endswith(".service")):
|
||
units.append(unit)
|
||
units.sort()
|
||
return units
|
||
|
||
def _control_group_for_unit(self, unit: str) -> str:
|
||
code, out = self._systemctl_user(["show", "-p", "ControlGroup", "--value", unit])
|
||
if code != 0:
|
||
raise RuntimeError(out or f"systemctl show failed: rc={code}")
|
||
return (out or "").strip()
|
||
|
||
def _cgroup_inode_id(self, cgroup_path: str) -> int:
|
||
cg = (cgroup_path or "").strip()
|
||
if not cg:
|
||
return 0
|
||
rel = cg.lstrip("/")
|
||
if not rel:
|
||
return 0
|
||
full = os.path.join("/sys/fs/cgroup", rel)
|
||
try:
|
||
st = os.stat(full)
|
||
return int(getattr(st, "st_ino", 0) or 0)
|
||
except Exception:
|
||
return 0
|
||
|
||
def refresh_running_scopes(self) -> None:
|
||
def work() -> None:
|
||
units = self._list_running_svpn_units()
|
||
self.lst_scopes.clear()
|
||
|
||
for unit in units:
|
||
target = self._scope_target_from_unit(unit)
|
||
cg = ""
|
||
cg_id = 0
|
||
try:
|
||
cg = self._control_group_for_unit(unit)
|
||
cg_id = self._cgroup_inode_id(cg)
|
||
except Exception:
|
||
pass
|
||
|
||
label = unit
|
||
if target in ("vpn", "direct"):
|
||
label += f" [{target.upper()}]"
|
||
|
||
it = QListWidgetItem(label)
|
||
info = RuntimeScopeInfo(
|
||
unit=unit,
|
||
target=target or "?",
|
||
cgroup_path=cg,
|
||
cgroup_id=int(cg_id or 0),
|
||
)
|
||
it.setData(QtCore.Qt.UserRole, info)
|
||
it.setToolTip(
|
||
f"Unit: {unit}\n"
|
||
f"Target: {target or '-'}\n"
|
||
f"ControlGroup: {cg or '-'}\n"
|
||
f"cgroup_id: {cg_id or '-'}\n\n"
|
||
"EN: Stop selected will unmark by cgroup_id (if known) and stop the unit.\n"
|
||
"RU: Stop selected удалит метку по cgroup_id (если известен) и остановит unit."
|
||
)
|
||
self.lst_scopes.addItem(it)
|
||
|
||
self.lbl_scopes.setText(f"Running units: {len(units)}")
|
||
has_any = self.lst_scopes.count() > 0
|
||
self.btn_scopes_stop_selected.setEnabled(has_any)
|
||
self.btn_scopes_cleanup.setEnabled(has_any)
|
||
|
||
self._safe(work, title="Units refresh error")
|
||
|
||
def _copy_scope_unit(self, it: QListWidgetItem) -> None:
|
||
unit = ""
|
||
info = it.data(QtCore.Qt.UserRole) if it else None
|
||
if isinstance(info, RuntimeScopeInfo):
|
||
unit = info.unit
|
||
else:
|
||
unit = (it.text() or "").split()[0].strip() if it else ""
|
||
if not unit:
|
||
return
|
||
try:
|
||
QtGui.QGuiApplication.clipboard().setText(unit)
|
||
except Exception:
|
||
pass
|
||
self._append_app_log(f"[unit] copied unit: {unit}")
|
||
self._set_action_status(f"Copied unit: {unit}", ok=True)
|
||
|
||
def _stop_scope_unit(self, unit: str) -> None:
|
||
u = (unit or "").strip()
|
||
if not u:
|
||
return
|
||
code, out = self._systemctl_user(["stop", u])
|
||
if code == 0:
|
||
if out:
|
||
self._append_app_log(out)
|
||
return
|
||
code2, out2 = self._systemctl_user(["kill", u])
|
||
if code2 != 0:
|
||
raise RuntimeError(out2 or out or f"stop/kill failed: {u}")
|
||
|
||
def _unmark_scope(self, info: RuntimeScopeInfo) -> None:
|
||
target = (info.target or "").strip().lower()
|
||
cg_id = int(info.cgroup_id or 0)
|
||
if target not in ("vpn", "direct") or cg_id <= 0:
|
||
return
|
||
res = self.ctrl.traffic_appmarks_apply(
|
||
op="del",
|
||
target=target,
|
||
cgroup=str(cg_id),
|
||
)
|
||
if not res.ok:
|
||
raise RuntimeError(res.message or "unmark failed")
|
||
|
||
def _selected_scope_infos(self) -> list[RuntimeScopeInfo]:
|
||
infos: list[RuntimeScopeInfo] = []
|
||
for it in self.lst_scopes.selectedItems() or []:
|
||
info = it.data(QtCore.Qt.UserRole) if it else None
|
||
if isinstance(info, RuntimeScopeInfo):
|
||
infos.append(info)
|
||
return infos
|
||
|
||
def on_scopes_stop_selected(self) -> None:
|
||
def work() -> None:
|
||
infos = self._selected_scope_infos()
|
||
if not infos:
|
||
QMessageBox.information(self, "No selection", "Select one or more units first.")
|
||
return
|
||
|
||
if QMessageBox.question(
|
||
self,
|
||
"Stop selected",
|
||
f"Unmark + stop {len(infos)} selected unit(s)?",
|
||
) != QMessageBox.StandardButton.Yes:
|
||
return
|
||
|
||
for info in infos:
|
||
self._append_app_log(
|
||
f"[unit] stop: unit={info.unit} target={info.target} cgroup_id={info.cgroup_id}"
|
||
)
|
||
try:
|
||
self._unmark_scope(info)
|
||
self._append_app_log("[unit] unmark OK")
|
||
except Exception as e:
|
||
self._append_app_log(f"[unit] unmark WARN: {e}")
|
||
|
||
self._stop_scope_unit(info.unit)
|
||
self._append_app_log("[unit] stop OK")
|
||
|
||
self.refresh_running_scopes()
|
||
self.refresh_appmarks_counts()
|
||
self._set_action_status(f"Stopped units: {len(infos)}", ok=True)
|
||
|
||
self._safe(work, title="Stop selected units error")
|
||
|
||
def on_scopes_cleanup_all(self) -> None:
|
||
def work() -> None:
|
||
units = self._list_running_svpn_units()
|
||
if not units:
|
||
self._set_action_status("No running svpn units", ok=True)
|
||
self.refresh_running_scopes()
|
||
return
|
||
|
||
if QMessageBox.question(
|
||
self,
|
||
"Cleanup all",
|
||
f"Unmark + stop ALL running svpn units ({len(units)})?",
|
||
) != QMessageBox.StandardButton.Yes:
|
||
return
|
||
|
||
stopped = 0
|
||
for unit in units:
|
||
target = self._scope_target_from_unit(unit)
|
||
cg = ""
|
||
cg_id = 0
|
||
try:
|
||
cg = self._control_group_for_unit(unit)
|
||
cg_id = self._cgroup_inode_id(cg)
|
||
except Exception:
|
||
pass
|
||
info = RuntimeScopeInfo(
|
||
unit=unit,
|
||
target=target or "?",
|
||
cgroup_path=cg,
|
||
cgroup_id=int(cg_id or 0),
|
||
)
|
||
self._append_app_log(
|
||
f"[unit] cleanup: unit={info.unit} target={info.target} cgroup_id={info.cgroup_id}"
|
||
)
|
||
try:
|
||
self._unmark_scope(info)
|
||
self._append_app_log("[unit] unmark OK")
|
||
except Exception as e:
|
||
self._append_app_log(f"[unit] unmark WARN: {e}")
|
||
try:
|
||
self._stop_scope_unit(info.unit)
|
||
stopped += 1
|
||
except Exception as e:
|
||
self._append_app_log(f"[unit] stop ERROR: {e}")
|
||
|
||
self.refresh_running_scopes()
|
||
self.refresh_appmarks_counts()
|
||
self._set_action_status(f"Cleanup done: stopped={stopped}/{len(units)}", ok=True)
|
||
|
||
self._safe(work, title="Cleanup units error")
|
||
|
||
def on_rollback(self) -> None:
|
||
def work() -> None:
|
||
res = self.ctrl.routes_clear()
|
||
self._emit_log(res.pretty_text or "rollback done")
|
||
self._set_action_status(res.pretty_text or "routes cleared (cache saved)", ok=bool(res.ok))
|
||
self.refresh_state()
|
||
if self.refresh_cb:
|
||
self.refresh_cb()
|
||
|
||
self._safe(work, title="Rollback error")
|
||
|
||
def on_restore_cache(self) -> None:
|
||
def work() -> None:
|
||
res = self.ctrl.routes_cache_restore()
|
||
self._emit_log(res.pretty_text or "cache restore done")
|
||
self._set_action_status(res.pretty_text or "routes restored from cache", ok=bool(res.ok))
|
||
self.refresh_state()
|
||
if self.refresh_cb:
|
||
self.refresh_cb()
|
||
|
||
self._safe(work, title="Restore cache error")
|
||
|
||
|
||
class TrafficCandidatesDialog(QDialog):
|
||
def __init__(
|
||
self,
|
||
candidates,
|
||
*,
|
||
existing: dict[str, dict[str, set[str]]] | None = None,
|
||
add_cb: Callable[[str, str, list[str]], None],
|
||
parent=None,
|
||
) -> None:
|
||
super().__init__(parent)
|
||
self.cands = candidates
|
||
self.add_cb = add_cb
|
||
self.existing = existing or {"vpn": {}, "direct": {}}
|
||
|
||
self.setWindowTitle("Add detected overrides")
|
||
self.resize(820, 680)
|
||
|
||
root = QVBoxLayout(self)
|
||
|
||
note = QLabel(
|
||
"Tip: hover list items for details. Подсказка: наведи на элементы списка.\n"
|
||
"Detect results from backend. Nothing is applied until you click Apply overrides."
|
||
)
|
||
note.setWordWrap(True)
|
||
note.setStyleSheet("color: gray;")
|
||
root.addWidget(note)
|
||
|
||
self.chk_hide_existing = QCheckBox("Hide already added")
|
||
self.chk_hide_existing.setToolTip(
|
||
"""EN: Hides items that are already present in Force VPN/Force Direct fields.
|
||
RU: Скрывает элементы, которые уже есть в Force VPN/Force Direct."""
|
||
)
|
||
self.chk_hide_existing.stateChanged.connect(lambda _s: self._refilter_current())
|
||
root.addWidget(self.chk_hide_existing)
|
||
|
||
self.tabs = QTabWidget()
|
||
root.addWidget(self.tabs, stretch=1)
|
||
|
||
self._tab_kind: dict[QWidget, str] = {}
|
||
self._tab_list: dict[QWidget, QListWidget] = {}
|
||
self._tab_filter: dict[QWidget, QLineEdit] = {}
|
||
self._list_kind: dict[QListWidget, str] = {}
|
||
self._list_title: dict[QListWidget, str] = {}
|
||
self.tabs.currentChanged.connect(lambda _idx: self._refilter_current())
|
||
|
||
self._build_subnets_tab()
|
||
self._build_services_tab()
|
||
self._build_uids_tab()
|
||
|
||
row = QHBoxLayout()
|
||
btn_vpn = QPushButton("Add to Force VPN")
|
||
btn_vpn.clicked.connect(lambda: self._add_selected("vpn"))
|
||
row.addWidget(btn_vpn)
|
||
|
||
btn_direct = QPushButton("Add to Force Direct")
|
||
btn_direct.clicked.connect(lambda: self._add_selected("direct"))
|
||
row.addWidget(btn_direct)
|
||
|
||
row.addStretch(1)
|
||
btn_close = QPushButton("Close")
|
||
btn_close.clicked.connect(self.accept)
|
||
row.addWidget(btn_close)
|
||
root.addLayout(row)
|
||
|
||
self.lbl_status = QLabel("—")
|
||
self.lbl_status.setWordWrap(True)
|
||
self.lbl_status.setStyleSheet("color: gray;")
|
||
root.addWidget(self.lbl_status)
|
||
|
||
def _mark_state(self, kind: str, value: str) -> tuple[bool, bool]:
|
||
k = (kind or "").strip().lower()
|
||
v = (value or "").strip()
|
||
if not k or not v:
|
||
return False, False
|
||
in_vpn = v in (self.existing.get("vpn", {}).get(k, set()) or set())
|
||
in_direct = v in (self.existing.get("direct", {}).get(k, set()) or set())
|
||
return bool(in_vpn), bool(in_direct)
|
||
|
||
def _set_status(self, msg: str, ok: bool | None = None) -> None:
|
||
text = (msg or "").strip() or "—"
|
||
self.lbl_status.setText(text)
|
||
if ok is True:
|
||
self.lbl_status.setStyleSheet("color: green;")
|
||
elif ok is False:
|
||
self.lbl_status.setStyleSheet("color: red;")
|
||
else:
|
||
self.lbl_status.setStyleSheet("color: gray;")
|
||
|
||
def _apply_filter(self, lst: QListWidget, query: str) -> None:
|
||
q = (query or "").strip().lower()
|
||
hide_existing = bool(self.chk_hide_existing.isChecked())
|
||
kind = (self._list_kind.get(lst) or "").strip().lower()
|
||
|
||
# Subnets-only quick filters.
|
||
allow_lan = True
|
||
allow_docker = True
|
||
allow_link = True
|
||
allow_linkdown = True
|
||
if kind == "subnet":
|
||
allow_lan = bool(getattr(self, "chk_sub_show_lan", None) and self.chk_sub_show_lan.isChecked())
|
||
allow_docker = bool(getattr(self, "chk_sub_show_docker", None) and self.chk_sub_show_docker.isChecked())
|
||
allow_link = bool(getattr(self, "chk_sub_show_link", None) and self.chk_sub_show_link.isChecked())
|
||
allow_linkdown = not bool(getattr(self, "chk_sub_hide_linkdown", None) and self.chk_sub_hide_linkdown.isChecked())
|
||
|
||
for i in range(lst.count()):
|
||
it = lst.item(i)
|
||
if not it:
|
||
continue
|
||
if hide_existing and bool(it.data(QtCore.Qt.UserRole + 1) or False):
|
||
it.setHidden(True)
|
||
continue
|
||
|
||
if kind == "subnet":
|
||
it_kind = str(it.data(QtCore.Qt.UserRole + 4) or "").strip().lower()
|
||
it_linkdown = bool(it.data(QtCore.Qt.UserRole + 5) or False)
|
||
if it_linkdown and not allow_linkdown:
|
||
it.setHidden(True)
|
||
continue
|
||
if it_kind == "docker" and not allow_docker:
|
||
it.setHidden(True)
|
||
continue
|
||
if it_kind == "lan" and not allow_lan:
|
||
it.setHidden(True)
|
||
continue
|
||
if it_kind == "link" and not allow_link:
|
||
it.setHidden(True)
|
||
continue
|
||
|
||
if not q:
|
||
it.setHidden(False)
|
||
continue
|
||
it.setHidden(q not in it.text().lower())
|
||
|
||
def _refilter_current(self) -> None:
|
||
tab = self.tabs.currentWidget()
|
||
if tab is None:
|
||
return
|
||
lst = self._tab_list.get(tab)
|
||
filt = self._tab_filter.get(tab)
|
||
if lst is None or filt is None:
|
||
return
|
||
self._apply_filter(lst, filt.text())
|
||
|
||
def _filter_for_title(self, title: str) -> QLineEdit | None:
|
||
for i in range(self.tabs.count()):
|
||
if self.tabs.tabText(i) == title:
|
||
tab = self.tabs.widget(i)
|
||
return self._tab_filter.get(tab)
|
||
return None
|
||
|
||
def _preset_set_filter(self, title: str, text: str) -> None:
|
||
filt = self._filter_for_title(title)
|
||
if filt is not None:
|
||
filt.setText(text)
|
||
|
||
def _preset_filter_subnets(self, *, lan: bool, docker: bool, link: bool) -> None:
|
||
# EN: "Filter LAN/Docker" buttons should not rely on text search; they should
|
||
# EN: toggle the kind checkboxes and clear the text filter.
|
||
# RU: Кнопки "Filter LAN/Docker" не должны полагаться на текстовый поиск;
|
||
# RU: они должны переключать чекбоксы видов и очищать текстовый фильтр.
|
||
chk_lan = getattr(self, "chk_sub_show_lan", None)
|
||
chk_docker = getattr(self, "chk_sub_show_docker", None)
|
||
chk_link = getattr(self, "chk_sub_show_link", None)
|
||
if chk_lan is None or chk_docker is None or chk_link is None:
|
||
self._preset_set_filter("Subnets", "")
|
||
self._refilter_current()
|
||
return
|
||
|
||
for chk, val in (
|
||
(chk_lan, lan),
|
||
(chk_docker, docker),
|
||
(chk_link, link),
|
||
):
|
||
try:
|
||
chk.blockSignals(True)
|
||
chk.setChecked(bool(val))
|
||
finally:
|
||
chk.blockSignals(False)
|
||
|
||
self._preset_set_filter("Subnets", "")
|
||
self._refilter_current()
|
||
|
||
def _update_item_render(self, it: QListWidgetItem, kind: str) -> None:
|
||
value = str(it.data(QtCore.Qt.UserRole) or "").strip()
|
||
base_label = str(it.data(QtCore.Qt.UserRole + 2) or it.text() or "")
|
||
base_tip = str(it.data(QtCore.Qt.UserRole + 3) or it.toolTip() or "")
|
||
in_vpn, in_direct = self._mark_state(kind, value)
|
||
flags = []
|
||
if in_vpn:
|
||
flags.append("VPN")
|
||
if in_direct:
|
||
flags.append("DIRECT")
|
||
label = base_label
|
||
if flags:
|
||
label = f"{base_label} [{' + '.join(flags)}]"
|
||
it.setText(label)
|
||
it.setData(QtCore.Qt.UserRole + 1, bool(in_vpn or in_direct))
|
||
if base_tip.strip():
|
||
extra_tip = (
|
||
f"\n\nAlready in Force VPN: {'yes' if in_vpn else 'no'}\n"
|
||
f"Already in Force Direct: {'yes' if in_direct else 'no'}"
|
||
)
|
||
it.setToolTip(base_tip + extra_tip)
|
||
if in_vpn or in_direct:
|
||
it.setForeground(QtGui.QBrush(QtGui.QColor("gray")))
|
||
|
||
def _add_tab(self, title: str, kind: str, items: list[tuple[str, str, str]], *, extra=None) -> None:
|
||
tab = QWidget()
|
||
layout = QVBoxLayout(tab)
|
||
|
||
if extra is not None:
|
||
extra(layout)
|
||
|
||
filt = QLineEdit()
|
||
filt.setPlaceholderText("Filter...")
|
||
layout.addWidget(filt)
|
||
|
||
lst = QListWidget()
|
||
lst.setSelectionMode(QAbstractItemView.ExtendedSelection)
|
||
for entry in items:
|
||
label = str(entry[0]) if len(entry) > 0 else ""
|
||
value = str(entry[1]) if len(entry) > 1 else ""
|
||
tip = str(entry[2]) if len(entry) > 2 else ""
|
||
meta_kind = str(entry[3]) if len(entry) > 3 else ""
|
||
meta_linkdown = bool(entry[4]) if len(entry) > 4 else False
|
||
it = QListWidgetItem(label)
|
||
it.setData(QtCore.Qt.UserRole, value)
|
||
it.setData(QtCore.Qt.UserRole + 2, label) # base label (without [VPN]/[DIRECT])
|
||
it.setData(QtCore.Qt.UserRole + 3, tip) # base tooltip (without existing-state)
|
||
it.setData(QtCore.Qt.UserRole + 4, meta_kind)
|
||
it.setData(QtCore.Qt.UserRole + 5, meta_linkdown)
|
||
lst.addItem(it)
|
||
self._update_item_render(it, kind)
|
||
|
||
layout.addWidget(lst, stretch=1)
|
||
filt.textChanged.connect(lambda txt, l=lst: self._apply_filter(l, txt))
|
||
|
||
self.tabs.addTab(tab, title)
|
||
self._tab_kind[tab] = kind
|
||
self._tab_list[tab] = lst
|
||
self._tab_filter[tab] = filt
|
||
self._list_kind[lst] = kind
|
||
self._list_title[lst] = title
|
||
|
||
def _current_kind_and_list(self) -> tuple[str, QListWidget | None]:
|
||
tab = self.tabs.currentWidget()
|
||
if tab is None:
|
||
return "", None
|
||
return self._tab_kind.get(tab, ""), self._tab_list.get(tab)
|
||
|
||
def _add_selected(self, target: str) -> None:
|
||
kind, lst = self._current_kind_and_list()
|
||
if not kind or lst is None:
|
||
return
|
||
|
||
vals: list[str] = []
|
||
for it in lst.selectedItems():
|
||
v = it.data(QtCore.Qt.UserRole)
|
||
vv = str(v or "").strip()
|
||
if vv:
|
||
vals.append(vv)
|
||
|
||
# stable de-dupe
|
||
out: list[str] = []
|
||
seen: set[str] = set()
|
||
for v in vals:
|
||
if v in seen:
|
||
continue
|
||
seen.add(v)
|
||
out.append(v)
|
||
|
||
if not out:
|
||
self._set_status("Nothing selected", ok=None)
|
||
return
|
||
|
||
tgt = (target or "").strip().lower()
|
||
k = (kind or "").strip().lower()
|
||
other = "direct" if tgt == "vpn" else "vpn"
|
||
have_tgt = self.existing.get(tgt, {}).get(k, set()) or set()
|
||
have_other = self.existing.get(other, {}).get(k, set()) or set()
|
||
|
||
to_add: list[str] = []
|
||
skipped = 0
|
||
conflicts = 0
|
||
for v in out:
|
||
if v in have_tgt:
|
||
skipped += 1
|
||
continue
|
||
if v in have_other:
|
||
conflicts += 1
|
||
to_add.append(v)
|
||
|
||
if not to_add:
|
||
self._set_status(f"Nothing new to add (skipped={skipped}, conflicts={conflicts})", ok=None)
|
||
return
|
||
|
||
self.add_cb(target, kind, to_add)
|
||
|
||
# Update local state so UI marks newly added items immediately.
|
||
if tgt not in self.existing:
|
||
self.existing[tgt] = {}
|
||
if k not in self.existing[tgt]:
|
||
self.existing[tgt][k] = set()
|
||
for v in to_add:
|
||
self.existing[tgt][k].add(v)
|
||
|
||
for i in range(lst.count()):
|
||
it = lst.item(i)
|
||
if it is None:
|
||
continue
|
||
self._update_item_render(it, kind)
|
||
self._refilter_current()
|
||
|
||
msg = f"Added {len(to_add)} item(s) to Force {tgt.upper()} ({k})."
|
||
if skipped or conflicts:
|
||
msg += f" skipped={skipped} conflicts={conflicts}"
|
||
self._set_status(msg, ok=True)
|
||
|
||
def _list_for_title(self, title: str) -> QListWidget | None:
|
||
for i in range(self.tabs.count()):
|
||
if self.tabs.tabText(i) == title:
|
||
tab = self.tabs.widget(i)
|
||
return self._tab_list.get(tab)
|
||
return None
|
||
|
||
def _preset_clear_selection(self, title: str) -> None:
|
||
lst = self._list_for_title(title)
|
||
if lst is not None:
|
||
lst.clearSelection()
|
||
|
||
def _preset_select_services(self, keywords: list[str]) -> None:
|
||
lst = self._list_for_title("Services")
|
||
if lst is None:
|
||
return
|
||
keys = [str(k).strip().lower() for k in (keywords or []) if str(k).strip()]
|
||
if not keys:
|
||
return
|
||
lst.clearSelection()
|
||
for i in range(lst.count()):
|
||
it = lst.item(i)
|
||
if it is None:
|
||
continue
|
||
txt = (it.text() or "").lower()
|
||
val = str(it.data(QtCore.Qt.UserRole) or "").lower()
|
||
if any(k in txt or k in val for k in keys):
|
||
it.setSelected(True)
|
||
|
||
def _preset_select_uids(self, uids: list[int]) -> None:
|
||
lst = self._list_for_title("UIDs")
|
||
if lst is None:
|
||
return
|
||
want = {f"{int(u)}-{int(u)}" for u in (uids or [])}
|
||
if not want:
|
||
return
|
||
lst.clearSelection()
|
||
for i in range(lst.count()):
|
||
it = lst.item(i)
|
||
if it is None:
|
||
continue
|
||
token = str(it.data(QtCore.Qt.UserRole) or "").strip()
|
||
if token in want:
|
||
it.setSelected(True)
|
||
|
||
def _build_subnets_tab(self) -> None:
|
||
subs = list(getattr(self.cands, "subnets", []) or [])
|
||
|
||
items: list[tuple[str, str, str, str, bool]] = []
|
||
for s in subs:
|
||
cidr = str(getattr(s, "cidr", "") or "").strip()
|
||
if not cidr:
|
||
continue
|
||
dev = str(getattr(s, "dev", "") or "").strip()
|
||
kind = str(getattr(s, "kind", "") or "").strip()
|
||
linkdown = bool(getattr(s, "linkdown", False))
|
||
tags = []
|
||
if kind:
|
||
tags.append(kind)
|
||
if dev:
|
||
tags.append(dev)
|
||
if linkdown:
|
||
tags.append("linkdown")
|
||
tag_txt = " " + "[" + ", ".join(tags) + "]" if tags else ""
|
||
tip = (
|
||
f"CIDR: {cidr}\n"
|
||
f"kind={kind or '-'} dev={dev or '-'} linkdown={linkdown}\n\n"
|
||
"EN: Source subnet overrides affect forwarded traffic (Docker).\n"
|
||
"RU: Source subnet влияет на forwarded трафик (Docker)."
|
||
)
|
||
items.append((f"{cidr}{tag_txt}", cidr, tip, kind, linkdown))
|
||
|
||
def extra(layout: QVBoxLayout) -> None:
|
||
row = QHBoxLayout()
|
||
btn_lan = QPushButton("Keep LAN direct")
|
||
btn_lan.clicked.connect(lambda: self._preset_add_lan_direct())
|
||
row.addWidget(btn_lan)
|
||
btn_docker = QPushButton("Keep Docker direct")
|
||
btn_docker.clicked.connect(lambda: self._preset_add_docker_direct())
|
||
row.addWidget(btn_docker)
|
||
row.addStretch(1)
|
||
layout.addLayout(row)
|
||
|
||
row2 = QHBoxLayout()
|
||
btn_f_lan = QPushButton("Filter LAN")
|
||
btn_f_lan.clicked.connect(lambda: self._preset_filter_subnets(lan=True, docker=False, link=True))
|
||
row2.addWidget(btn_f_lan)
|
||
btn_f_docker = QPushButton("Filter Docker")
|
||
btn_f_docker.clicked.connect(lambda: self._preset_filter_subnets(lan=False, docker=True, link=False))
|
||
row2.addWidget(btn_f_docker)
|
||
btn_f_clear = QPushButton("Clear filter")
|
||
btn_f_clear.clicked.connect(lambda: self._preset_filter_subnets(lan=True, docker=True, link=True))
|
||
row2.addWidget(btn_f_clear)
|
||
row2.addStretch(1)
|
||
layout.addLayout(row2)
|
||
|
||
row3 = QHBoxLayout()
|
||
self.chk_sub_show_lan = QCheckBox("LAN")
|
||
self.chk_sub_show_lan.setChecked(True)
|
||
self.chk_sub_show_lan.setToolTip("EN: Show LAN subnets.\nRU: Показать LAN подсети.")
|
||
self.chk_sub_show_lan.stateChanged.connect(lambda _s: self._refilter_current())
|
||
row3.addWidget(self.chk_sub_show_lan)
|
||
|
||
self.chk_sub_show_docker = QCheckBox("Docker")
|
||
self.chk_sub_show_docker.setChecked(True)
|
||
self.chk_sub_show_docker.setToolTip("EN: Show Docker/container subnets.\nRU: Показать Docker/контейнерные подсети.")
|
||
self.chk_sub_show_docker.stateChanged.connect(lambda _s: self._refilter_current())
|
||
row3.addWidget(self.chk_sub_show_docker)
|
||
|
||
self.chk_sub_show_link = QCheckBox("Link (scope link)")
|
||
self.chk_sub_show_link.setChecked(True)
|
||
self.chk_sub_show_link.setToolTip("EN: Show link-scope routes.\nRU: Показать маршруты scope link.")
|
||
self.chk_sub_show_link.stateChanged.connect(lambda _s: self._refilter_current())
|
||
row3.addWidget(self.chk_sub_show_link)
|
||
|
||
self.chk_sub_hide_linkdown = QCheckBox("Hide linkdown")
|
||
self.chk_sub_hide_linkdown.setChecked(True)
|
||
self.chk_sub_hide_linkdown.setToolTip("EN: Hide routes marked as linkdown.\nRU: Скрыть маршруты с меткой linkdown.")
|
||
self.chk_sub_hide_linkdown.stateChanged.connect(lambda _s: self._refilter_current())
|
||
row3.addWidget(self.chk_sub_hide_linkdown)
|
||
|
||
row3.addStretch(1)
|
||
layout.addLayout(row3)
|
||
|
||
self._add_tab("Subnets", "subnet", items, extra=extra)
|
||
|
||
def _preset_add_lan_direct(self) -> None:
|
||
subs = list(getattr(self.cands, "subnets", []) or [])
|
||
vals: list[str] = []
|
||
for s in subs:
|
||
kind = str(getattr(s, "kind", "") or "").strip()
|
||
cidr = str(getattr(s, "cidr", "") or "").strip()
|
||
if not cidr:
|
||
continue
|
||
if kind in ("lan", "link"):
|
||
vals.append(cidr)
|
||
if vals:
|
||
self.add_cb("direct", "subnet", vals)
|
||
|
||
def _preset_add_docker_direct(self) -> None:
|
||
subs = list(getattr(self.cands, "subnets", []) or [])
|
||
vals: list[str] = []
|
||
for s in subs:
|
||
kind = str(getattr(s, "kind", "") or "").strip()
|
||
cidr = str(getattr(s, "cidr", "") or "").strip()
|
||
if not cidr:
|
||
continue
|
||
if kind == "docker":
|
||
vals.append(cidr)
|
||
if vals:
|
||
self.add_cb("direct", "subnet", vals)
|
||
|
||
def _build_services_tab(self) -> None:
|
||
units = list(getattr(self.cands, "units", []) or [])
|
||
|
||
items: list[tuple[str, str, str]] = []
|
||
for u in units:
|
||
unit = str(getattr(u, "unit", "") or "").strip()
|
||
if not unit:
|
||
continue
|
||
desc = str(getattr(u, "description", "") or "").strip()
|
||
cgroup = str(getattr(u, "cgroup", "") or "").strip() or unit
|
||
label = unit
|
||
if desc:
|
||
label += " - " + desc
|
||
tip = (
|
||
f"Unit: {unit}\n"
|
||
f"Cgroup token: {cgroup}\n\n"
|
||
"EN: Adds a cgroup override; backend resolves it to UID rules at apply time.\n"
|
||
"RU: Добавляет cgroup override; backend резолвит его в UID правила при применении."
|
||
)
|
||
items.append((label, cgroup, tip))
|
||
|
||
def extra(layout: QVBoxLayout) -> None:
|
||
row = QHBoxLayout()
|
||
btn_docker = QPushButton("Select docker/container")
|
||
btn_docker.clicked.connect(lambda: self._preset_select_services(["docker", "containerd", "podman"]))
|
||
row.addWidget(btn_docker)
|
||
btn_media = QPushButton("Select media (jellyfin/plex)")
|
||
btn_media.clicked.connect(lambda: self._preset_select_services(["jellyfin", "plex", "emby"]))
|
||
row.addWidget(btn_media)
|
||
btn_clear = QPushButton("Clear selection")
|
||
btn_clear.clicked.connect(lambda: self._preset_clear_selection("Services"))
|
||
row.addWidget(btn_clear)
|
||
row.addStretch(1)
|
||
layout.addLayout(row)
|
||
|
||
row2 = QHBoxLayout()
|
||
btn_f_docker = QPushButton("Filter docker")
|
||
btn_f_docker.clicked.connect(lambda: self._preset_set_filter("Services", "docker"))
|
||
row2.addWidget(btn_f_docker)
|
||
btn_f_media = QPushButton("Filter media")
|
||
btn_f_media.clicked.connect(lambda: self._preset_set_filter("Services", "jellyfin"))
|
||
row2.addWidget(btn_f_media)
|
||
btn_f_clear = QPushButton("Clear filter")
|
||
btn_f_clear.clicked.connect(lambda: self._preset_set_filter("Services", ""))
|
||
row2.addWidget(btn_f_clear)
|
||
row2.addStretch(1)
|
||
layout.addLayout(row2)
|
||
|
||
self._add_tab("Services", "cgroup", items, extra=extra)
|
||
|
||
def _build_uids_tab(self) -> None:
|
||
uids = list(getattr(self.cands, "uids", []) or [])
|
||
|
||
items: list[tuple[str, str, str]] = []
|
||
for u in uids:
|
||
try:
|
||
uid = int(getattr(u, "uid", 0) or 0)
|
||
except Exception:
|
||
continue
|
||
user = str(getattr(u, "user", "") or "").strip()
|
||
examples = list(getattr(u, "examples", []) or [])
|
||
ex_txt = ", ".join([str(x) for x in examples if str(x).strip()])
|
||
|
||
label = str(uid)
|
||
if user:
|
||
label += f" ({user})"
|
||
if ex_txt:
|
||
label += " - " + ex_txt
|
||
|
||
token = f"{uid}-{uid}"
|
||
tip = (
|
||
f"UID: {uid}\n"
|
||
f"User: {user or '-'}\n"
|
||
f"Examples: {ex_txt or '-'}\n\n"
|
||
"EN: UID rules affect host-local processes (OUTPUT).\n"
|
||
"RU: UID правила влияют на процессы хоста (OUTPUT)."
|
||
)
|
||
items.append((label, token, tip))
|
||
|
||
def extra(layout: QVBoxLayout) -> None:
|
||
row = QHBoxLayout()
|
||
btn_me = QPushButton("Select my UID")
|
||
btn_me.clicked.connect(lambda: self._preset_select_uids([os.getuid()]))
|
||
row.addWidget(btn_me)
|
||
btn_root = QPushButton("Select root UID")
|
||
btn_root.clicked.connect(lambda: self._preset_select_uids([0]))
|
||
row.addWidget(btn_root)
|
||
btn_clear = QPushButton("Clear selection")
|
||
btn_clear.clicked.connect(lambda: self._preset_clear_selection("UIDs"))
|
||
row.addWidget(btn_clear)
|
||
row.addStretch(1)
|
||
layout.addLayout(row)
|
||
|
||
self._add_tab("UIDs", "uid", items, extra=extra)
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# App picker (.desktop entries)
|
||
# ---------------------------------------------------------------------
|
||
|
||
|
||
def _desktop_bool(v: str) -> bool:
|
||
return str(v or "").strip().lower() in _DESKTOP_BOOL_TRUE
|
||
|
||
|
||
def _desktop_name_from_section(sec: configparser.SectionProxy) -> str:
|
||
name = str(sec.get("Name", "") or "").strip()
|
||
if name:
|
||
return name
|
||
|
||
# Prefer Russian if present, then English, then any localized variant.
|
||
for key in ("Name[ru]", "Name[en_US]", "Name[en]"):
|
||
v = str(sec.get(key, "") or "").strip()
|
||
if v:
|
||
return v
|
||
|
||
for k, v in sec.items():
|
||
kk = str(k or "")
|
||
if kk.startswith("Name[") and str(v or "").strip():
|
||
return str(v).strip()
|
||
return ""
|
||
|
||
|
||
def _sanitize_desktop_exec(exec_raw: str, *, name: str = "", desktop_path: str = "") -> str:
|
||
raw = str(exec_raw or "").strip()
|
||
if not raw:
|
||
return ""
|
||
|
||
# Desktop spec: "%%" -> literal "%".
|
||
raw = raw.replace("%%", "%")
|
||
|
||
# Best-effort expansion for a couple of common fields.
|
||
if name:
|
||
raw = raw.replace("%c", name)
|
||
if desktop_path:
|
||
raw = raw.replace("%k", desktop_path)
|
||
|
||
try:
|
||
tokens = shlex.split(raw)
|
||
except Exception:
|
||
tokens = raw.split()
|
||
|
||
out: list[str] = []
|
||
for t in tokens:
|
||
tok = str(t or "").strip()
|
||
if not tok:
|
||
continue
|
||
# Drop standalone field codes (%u, %U, %f, ...).
|
||
if len(tok) == 2 and tok.startswith("%"):
|
||
continue
|
||
# Remove field codes inside tokens (e.g. --name=%c).
|
||
tok = _DESKTOP_EXEC_FIELD_RE.sub("", tok).strip()
|
||
if tok:
|
||
out.append(tok)
|
||
|
||
if not out:
|
||
return ""
|
||
return " ".join(shlex.quote(x) for x in out)
|
||
|
||
|
||
def _desktop_entry_from_file(path: str, *, source: str) -> Optional[DesktopAppEntry]:
|
||
p = str(path or "").strip()
|
||
if not p or not p.endswith(".desktop"):
|
||
return None
|
||
|
||
cp = configparser.ConfigParser(interpolation=None, strict=False)
|
||
cp.optionxform = str # keep case
|
||
try:
|
||
cp.read(p, encoding="utf-8")
|
||
except Exception:
|
||
try:
|
||
data = pathlib.Path(p).read_bytes()
|
||
cp.read_string(data.decode("utf-8", errors="replace"))
|
||
except Exception:
|
||
return None
|
||
|
||
if "Desktop Entry" not in cp:
|
||
return None
|
||
|
||
sec = cp["Desktop Entry"]
|
||
if _desktop_bool(sec.get("Hidden", "")) or _desktop_bool(sec.get("NoDisplay", "")):
|
||
return None
|
||
|
||
typ = str(sec.get("Type", "") or "").strip().lower()
|
||
if typ and typ != "application":
|
||
return None
|
||
|
||
exec_raw = str(sec.get("Exec", "") or "").strip()
|
||
if not exec_raw:
|
||
return None
|
||
|
||
name = _desktop_name_from_section(sec)
|
||
desktop_id = pathlib.Path(p).name
|
||
if not name:
|
||
name = desktop_id.replace(".desktop", "")
|
||
|
||
src = str(source or "").strip().lower() or "system"
|
||
return DesktopAppEntry(
|
||
desktop_id=desktop_id,
|
||
name=name,
|
||
exec_raw=exec_raw,
|
||
path=p,
|
||
source=src,
|
||
)
|
||
|
||
|
||
def _scan_desktop_entries() -> list[DesktopAppEntry]:
|
||
home = os.path.expanduser("~")
|
||
dirs: list[tuple[str, str]] = [
|
||
(os.path.join(home, ".local/share/applications"), "user"),
|
||
("/usr/local/share/applications", "system"),
|
||
("/usr/share/applications", "system"),
|
||
(os.path.join(home, ".local/share/flatpak/exports/share/applications"), "flatpak"),
|
||
("/var/lib/flatpak/exports/share/applications", "flatpak"),
|
||
("/var/lib/snapd/desktop/applications", "snap"),
|
||
]
|
||
|
||
seen: set[tuple[str, str]] = set()
|
||
out: list[DesktopAppEntry] = []
|
||
for d, src in dirs:
|
||
if not d or not os.path.isdir(d):
|
||
continue
|
||
try:
|
||
paths = sorted(pathlib.Path(d).glob("*.desktop"))
|
||
except Exception:
|
||
continue
|
||
for fp in paths:
|
||
ent = _desktop_entry_from_file(str(fp), source=src)
|
||
if ent is None:
|
||
continue
|
||
key = (ent.desktop_id, ent.source)
|
||
if key in seen:
|
||
continue
|
||
seen.add(key)
|
||
out.append(ent)
|
||
|
||
out.sort(key=lambda e: (e.name.lower(), e.source, e.desktop_id.lower()))
|
||
return out
|
||
|
||
|
||
class AppPickerDialog(QDialog):
|
||
def __init__(self, *, parent=None) -> None:
|
||
super().__init__(parent)
|
||
self.setWindowTitle("Pick app (.desktop)")
|
||
self.resize(860, 640)
|
||
|
||
self._selected_cmd: str = ""
|
||
self._entries: list[DesktopAppEntry] = _scan_desktop_entries()
|
||
|
||
root = QVBoxLayout(self)
|
||
|
||
note = QLabel(
|
||
"EN: Pick an installed GUI app from .desktop entries (system + flatpak + snap). "
|
||
"The command will be filled without %u/%U/%f placeholders.\n"
|
||
"RU: Выбери приложение из .desktop (system + flatpak + snap). "
|
||
"Команда будет заполнена без плейсхолдеров %u/%U/%f."
|
||
)
|
||
note.setWordWrap(True)
|
||
note.setStyleSheet("color: gray;")
|
||
root.addWidget(note)
|
||
|
||
row = QHBoxLayout()
|
||
row.addWidget(QLabel("Search"))
|
||
self.ed_search = QLineEdit()
|
||
self.ed_search.setPlaceholderText("Type to filter (name / id / exec)...")
|
||
row.addWidget(self.ed_search, stretch=1)
|
||
self.lbl_count = QLabel("—")
|
||
self.lbl_count.setStyleSheet("color: gray;")
|
||
row.addWidget(self.lbl_count)
|
||
root.addLayout(row)
|
||
|
||
self.lst = QListWidget()
|
||
self.lst.setSelectionMode(QAbstractItemView.SingleSelection)
|
||
root.addWidget(self.lst, stretch=1)
|
||
|
||
self.preview = QPlainTextEdit()
|
||
self.preview.setReadOnly(True)
|
||
self.preview.setFixedHeight(180)
|
||
root.addWidget(self.preview)
|
||
|
||
row_btn = QHBoxLayout()
|
||
self.btn_use = QPushButton("Use selected")
|
||
self.btn_use.clicked.connect(self.on_use_selected)
|
||
row_btn.addWidget(self.btn_use)
|
||
row_btn.addStretch(1)
|
||
btn_close = QPushButton("Close")
|
||
btn_close.clicked.connect(self.reject)
|
||
row_btn.addWidget(btn_close)
|
||
root.addLayout(row_btn)
|
||
|
||
self._populate()
|
||
self.ed_search.textChanged.connect(lambda _t: self._apply_filter())
|
||
self.lst.currentItemChanged.connect(lambda _a, _b: self._update_preview())
|
||
self.lst.itemDoubleClicked.connect(lambda _it: self.on_use_selected())
|
||
|
||
QtCore.QTimer.singleShot(0, self._apply_filter)
|
||
|
||
def selected_command(self) -> str:
|
||
return str(self._selected_cmd or "")
|
||
|
||
def _populate(self) -> None:
|
||
self.lst.clear()
|
||
for ent in self._entries:
|
||
src = ent.source
|
||
label = f"{ent.name} [{src}] ({ent.desktop_id})"
|
||
tip = (
|
||
f"Name: {ent.name}\n"
|
||
f"ID: {ent.desktop_id}\n"
|
||
f"Source: {src}\n"
|
||
f"Path: {ent.path}\n\n"
|
||
f"Exec: {ent.exec_raw}"
|
||
)
|
||
it = QListWidgetItem(label)
|
||
it.setToolTip(tip)
|
||
it.setData(QtCore.Qt.UserRole, ent)
|
||
# precomputed search string
|
||
it.setData(
|
||
QtCore.Qt.UserRole + 1,
|
||
(label + "\n" + ent.exec_raw + "\n" + ent.path).lower(),
|
||
)
|
||
self.lst.addItem(it)
|
||
|
||
if self.lst.count() > 0:
|
||
self.lst.setCurrentRow(0)
|
||
self._update_preview()
|
||
|
||
def _apply_filter(self) -> None:
|
||
q = (self.ed_search.text() or "").strip().lower()
|
||
shown = 0
|
||
for i in range(self.lst.count()):
|
||
it = self.lst.item(i)
|
||
if not it:
|
||
continue
|
||
hay = str(it.data(QtCore.Qt.UserRole + 1) or "")
|
||
hide = bool(q) and q not in hay
|
||
it.setHidden(hide)
|
||
if not hide:
|
||
shown += 1
|
||
self.lbl_count.setText(f"Apps: {shown}/{self.lst.count()}")
|
||
|
||
def _current_entry(self) -> Optional[DesktopAppEntry]:
|
||
it = self.lst.currentItem()
|
||
if not it:
|
||
return None
|
||
ent = it.data(QtCore.Qt.UserRole)
|
||
if isinstance(ent, DesktopAppEntry):
|
||
return ent
|
||
return None
|
||
|
||
def _update_preview(self) -> None:
|
||
ent = self._current_entry()
|
||
if ent is None:
|
||
self.preview.setPlainText("—")
|
||
self.btn_use.setEnabled(False)
|
||
return
|
||
cmd = _sanitize_desktop_exec(ent.exec_raw, name=ent.name, desktop_path=ent.path)
|
||
text = (
|
||
f"Name: {ent.name}\n"
|
||
f"ID: {ent.desktop_id}\n"
|
||
f"Source: {ent.source}\n"
|
||
f"Path: {ent.path}\n\n"
|
||
f"Exec (raw):\n{ent.exec_raw}\n\n"
|
||
f"Command (sanitized):\n{cmd}"
|
||
)
|
||
self.preview.setPlainText(text)
|
||
self.btn_use.setEnabled(bool(cmd.strip()))
|
||
|
||
def on_use_selected(self) -> None:
|
||
ent = self._current_entry()
|
||
if ent is None:
|
||
return
|
||
cmd = _sanitize_desktop_exec(ent.exec_raw, name=ent.name, desktop_path=ent.path).strip()
|
||
if not cmd:
|
||
QMessageBox.warning(self, "No command", "Selected app has no usable Exec command.")
|
||
return
|
||
self._selected_cmd = cmd
|
||
self.accept()
|