Files
elmprodvpn/selective-vpn-gui/traffic_mode_dialog.py

1919 lines
76 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
from __future__ import annotations
import configparser
import os
import pathlib
import re
import shlex
import subprocess
import time
from dataclasses import dataclass
from typing import Callable, Optional
from PySide6 import QtCore, QtGui
from PySide6.QtWidgets import (
QButtonGroup,
QCheckBox,
QComboBox,
QDialog,
QGroupBox,
QHBoxLayout,
QLabel,
QLineEdit,
QListWidget,
QListWidgetItem,
QAbstractItemView,
QMessageBox,
QPlainTextEdit,
QPushButton,
QRadioButton,
QSpinBox,
QTabWidget,
QVBoxLayout,
QWidget,
)
from dashboard_controller import DashboardController
_DESKTOP_BOOL_TRUE = {"1", "true", "yes", "y", "on"}
_DESKTOP_EXEC_FIELD_RE = re.compile(r"%[A-Za-z]")
@dataclass(frozen=True)
class DesktopAppEntry:
desktop_id: str
name: str
exec_raw: str
path: str
source: str # system|flatpak|user
class TrafficModeDialog(QDialog):
def __init__(
self,
controller: DashboardController,
*,
log_cb: Callable[[str], None] | None = None,
refresh_cb: Callable[[], None] | None = None,
parent=None,
) -> None:
super().__init__(parent)
self.ctrl = controller
self.log_cb = log_cb
self.refresh_cb = refresh_cb
self.setWindowTitle("Traffic mode settings")
self.resize(780, 760)
root = QVBoxLayout(self)
# EN: Persist small UI state across dialog sessions.
# RU: Сохраняем небольшой UI state между открытиями окна.
self._settings = QtCore.QSettings("AdGuardVPN", "SelectiveVPNDashboardQt")
self._last_app_unit: str = str(self._settings.value("traffic_app_last_unit", "") or "")
self._last_app_target: str = str(self._settings.value("traffic_app_last_target", "") or "")
try:
self._last_app_cgroup_id: int = int(self._settings.value("traffic_app_last_cgroup_id", 0) or 0)
except Exception:
self._last_app_cgroup_id = 0
hint_group = QGroupBox("Mode behavior")
hint_layout = QVBoxLayout(hint_group)
hint_layout.addWidget(QLabel("Selective: only marked traffic goes via VPN."))
hint_layout.addWidget(QLabel("Full tunnel: all traffic goes via VPN."))
hint_layout.addWidget(QLabel("Direct: VPN routing rules are disabled."))
warn = QLabel(
"Warning: Full tunnel can break local/LAN access depending on your host routes."
)
warn.setStyleSheet("color: red;")
hint_layout.addWidget(warn)
root.addWidget(hint_group)
tip = QLabel("Tip: hover any control for help. Подсказка: наведи на элемент для описания.")
tip.setWordWrap(True)
tip.setStyleSheet("color: gray;")
root.addWidget(tip)
self.tabs = QTabWidget()
root.addWidget(self.tabs, stretch=1)
tab_basic = QWidget()
tab_basic_layout = QVBoxLayout(tab_basic)
mode_group = QGroupBox("Traffic mode relay")
mode_layout = QVBoxLayout(mode_group)
row_mode = QHBoxLayout()
self.rad_selective = QRadioButton("Selective")
self.rad_selective.setToolTip("""EN: Only marked traffic (fwmark 0x66) uses VPN policy table (agvpn).
RU: Только помеченный трафик (fwmark 0x66) идет через policy-table (agvpn).""")
self.rad_selective.toggled.connect(
lambda checked: self.on_mode_toggle("selective", checked)
)
row_mode.addWidget(self.rad_selective)
self.rad_full = QRadioButton("Full tunnel")
self.rad_full.setToolTip("""EN: All traffic uses VPN policy table (agvpn). Use with auto-local bypass for LAN/docker.
RU: Весь трафик идет через policy-table (agvpn). Для LAN/docker включай auto-local bypass.""")
self.rad_full.toggled.connect(
lambda checked: self.on_mode_toggle("full_tunnel", checked)
)
row_mode.addWidget(self.rad_full)
self.rad_direct = QRadioButton("Direct")
self.rad_direct.setToolTip("""EN: Disables base VPN routing rules (no full/selective rule).
RU: Отключает базовые VPN policy-rules (нет full/selective правила).""")
self.rad_direct.toggled.connect(
lambda checked: self.on_mode_toggle("direct", checked)
)
row_mode.addWidget(self.rad_direct)
row_mode.addStretch(1)
mode_layout.addLayout(row_mode)
row_iface = QHBoxLayout()
row_iface.addWidget(QLabel("Preferred iface"))
self.cmb_iface = QComboBox()
self.cmb_iface.setToolTip("""EN: VPN interface for policy routing. Use auto unless you know the exact iface.
RU: Интерфейс VPN для policy routing. Оставь auto, если не уверен.""")
self.cmb_iface.setEditable(True)
self.cmb_iface.setInsertPolicy(QComboBox.NoInsert)
self.cmb_iface.setMinimumWidth(180)
row_iface.addWidget(self.cmb_iface)
self.btn_refresh_ifaces = QPushButton("Detect ifaces")
self.btn_refresh_ifaces.setToolTip("""EN: Refresh list of available interfaces (UP).
RU: Обновить список доступных интерфейсов (UP).""")
self.btn_refresh_ifaces.clicked.connect(self.on_refresh_ifaces)
row_iface.addWidget(self.btn_refresh_ifaces)
row_iface.addStretch(1)
mode_layout.addLayout(row_iface)
self.chk_auto_local = QCheckBox("Auto-local bypass (LAN/container subnets)")
self.chk_auto_local.setToolTip("""EN: Mirrors local/LAN/docker routes from main into agvpn table to prevent breakage in full tunnel.
EN: This does NOT force containers to use direct internet; use Force Direct subnets for that.
RU: Копирует локальные/LAN/docker маршруты из main в agvpn, чтобы не ломалась локалка в full tunnel.
RU: Это НЕ делает контейнеры direct в интернет; для этого используй Force Direct subnets.""")
self.chk_auto_local.stateChanged.connect(lambda _state: self.on_auto_local_toggle())
mode_layout.addWidget(self.chk_auto_local)
self.lbl_state = QLabel("Traffic mode: —")
self.lbl_state.setStyleSheet("color: gray;")
mode_layout.addWidget(self.lbl_state)
self.lbl_diag = QLabel("")
self.lbl_diag.setStyleSheet("color: gray;")
mode_layout.addWidget(self.lbl_diag)
tab_basic_layout.addWidget(mode_group)
maint_group = QGroupBox("Rollback / cache")
maint_layout = QHBoxLayout(maint_group)
self.btn_rollback = QPushButton("Clear routes (save cache)")
self.btn_rollback.setToolTip("""EN: Clears VPN routes and nft sets, but saves a cache snapshot for restore.
RU: Очищает VPN маршруты и nft-сеты, но сохраняет снапшот для восстановления.""")
self.btn_rollback.clicked.connect(self.on_rollback)
maint_layout.addWidget(self.btn_rollback)
self.btn_restore_cache = QPushButton("Restore cached routes")
self.btn_restore_cache.setToolTip("""EN: Restores routes/nft from the last clear snapshot. Skips non-critical route restore errors.
RU: Восстанавливает маршруты/nft из последнего снапшота clear. Некритичные ошибки восстановления пропускаются.""")
self.btn_restore_cache.clicked.connect(self.on_restore_cache)
maint_layout.addWidget(self.btn_restore_cache)
maint_layout.addStretch(1)
tab_basic_layout.addWidget(maint_group)
tab_basic_layout.addStretch(1)
self.tabs.addTab(tab_basic, "Traffic basics")
# -----------------------------------------------------------------
# Apps (runtime): systemd --user scope + backend appmarks
# -----------------------------------------------------------------
tab_apps = QWidget()
tab_apps_layout = QVBoxLayout(tab_apps)
apps_hint = QLabel(
"Runtime per-app routing (Wayland-friendly):\n"
"- Launch uses systemd-run --user --scope.\n"
"- Backend adds the scope cgroup into nftset -> fwmark rules.\n"
"- Marks are temporary (TTL). Use Policy overrides for persistent policy."
)
apps_hint.setWordWrap(True)
apps_hint.setStyleSheet("color: gray;")
tab_apps_layout.addWidget(apps_hint)
run_group = QGroupBox("Run app in scope + apply mark")
run_layout = QVBoxLayout(run_group)
row_cmd = QHBoxLayout()
row_cmd.addWidget(QLabel("Command"))
self.ed_app_cmd = QLineEdit()
self.ed_app_cmd.setPlaceholderText(
"e.g. firefox --private-window https://example.com"
)
self.ed_app_cmd.setToolTip(
"EN: Command line to run. This runs as current user in a systemd --user scope.\n"
"RU: Команда запуска. Запускается от текущего пользователя в systemd --user scope."
)
row_cmd.addWidget(self.ed_app_cmd, stretch=1)
self.btn_app_pick = QPushButton("Pick app...")
self.btn_app_pick.setToolTip(
"EN: Pick an installed app from .desktop entries (system + flatpak) and fill the command.\n"
"RU: Выбрать установленное приложение из .desktop (system + flatpak) и заполнить команду."
)
self.btn_app_pick.clicked.connect(self.on_app_pick)
row_cmd.addWidget(self.btn_app_pick)
run_layout.addLayout(row_cmd)
row_target = QHBoxLayout()
row_target.addWidget(QLabel("Route via"))
self.rad_app_vpn = QRadioButton("VPN")
self.rad_app_vpn.setToolTip(
"EN: Force this app traffic via VPN policy table (agvpn).\n"
"RU: Форсировать трафик приложения через VPN policy-table (agvpn)."
)
self.rad_app_direct = QRadioButton("Direct")
self.rad_app_direct.setToolTip(
"EN: Force this app traffic to bypass VPN (lookup main), even in full tunnel.\n"
"RU: Форсировать трафик приложения мимо VPN (lookup main), даже в full tunnel."
)
bg_app = QButtonGroup(self)
bg_app.addButton(self.rad_app_vpn)
bg_app.addButton(self.rad_app_direct)
self.rad_app_vpn.setChecked(True)
row_target.addWidget(self.rad_app_vpn)
row_target.addWidget(self.rad_app_direct)
row_target.addStretch(1)
run_layout.addLayout(row_target)
row_ttl = QHBoxLayout()
row_ttl.addWidget(QLabel("TTL (hours)"))
self.spn_app_ttl = QSpinBox()
self.spn_app_ttl.setRange(1, 24 * 30) # up to ~30 days
self.spn_app_ttl.setValue(24)
self.spn_app_ttl.setToolTip(
"EN: How long the runtime mark stays active (backend nftset element timeout).\n"
"RU: Сколько живет runtime-метка (timeout элемента в nftset)."
)
row_ttl.addWidget(self.spn_app_ttl)
row_ttl.addStretch(1)
run_layout.addLayout(row_ttl)
row_btn = QHBoxLayout()
self.btn_app_run = QPushButton("Run + apply mark")
self.btn_app_run.clicked.connect(self.on_app_run)
row_btn.addWidget(self.btn_app_run)
self.btn_app_refresh = QPushButton("Refresh counts")
self.btn_app_refresh.clicked.connect(self.refresh_appmarks_counts)
row_btn.addWidget(self.btn_app_refresh)
row_btn.addStretch(1)
run_layout.addLayout(row_btn)
row_btn2 = QHBoxLayout()
self.btn_app_clear_vpn = QPushButton("Clear VPN marks")
self.btn_app_clear_vpn.clicked.connect(lambda: self.on_appmarks_clear("vpn"))
row_btn2.addWidget(self.btn_app_clear_vpn)
self.btn_app_clear_direct = QPushButton("Clear Direct marks")
self.btn_app_clear_direct.clicked.connect(
lambda: self.on_appmarks_clear("direct")
)
row_btn2.addWidget(self.btn_app_clear_direct)
self.btn_app_stop_last = QPushButton("Stop last scope")
self.btn_app_stop_last.setToolTip(
"EN: Stops the last launched systemd --user scope.\n"
"RU: Останавливает последний запущенный systemd --user scope."
)
self.btn_app_stop_last.clicked.connect(self.on_app_stop_last_scope)
row_btn2.addWidget(self.btn_app_stop_last)
self.btn_app_unmark_last = QPushButton("Unmark last")
self.btn_app_unmark_last.setToolTip(
"EN: Removes routing mark for the last launched scope (by cgroup id).\n"
"RU: Удаляет метку маршрутизации для последнего scope (по cgroup id)."
)
self.btn_app_unmark_last.clicked.connect(self.on_app_unmark_last)
row_btn2.addWidget(self.btn_app_unmark_last)
row_btn2.addStretch(1)
run_layout.addLayout(row_btn2)
self.lbl_app_counts = QLabel("Marks: —")
self.lbl_app_counts.setStyleSheet("color: gray;")
run_layout.addWidget(self.lbl_app_counts)
self.lbl_app_last = QLabel("Last scope: —")
self.lbl_app_last.setStyleSheet("color: gray;")
run_layout.addWidget(self.lbl_app_last)
self._refresh_last_scope_ui()
tab_apps_layout.addWidget(run_group)
self.txt_app = QPlainTextEdit()
self.txt_app.setReadOnly(True)
tab_apps_layout.addWidget(self.txt_app, stretch=1)
tab_apps_layout.addStretch(1)
self.tabs.addTab(tab_apps, "Apps (runtime)")
tab_adv = QWidget()
tab_adv_layout = QVBoxLayout(tab_adv)
self.ed_vpn_subnets = QPlainTextEdit()
self.ed_vpn_subnets.setToolTip("""EN: Force VPN by source subnet. Useful for docker subnets when you want containers via VPN.
RU: Принудительно через VPN по source subnet. Полезно для docker-подсетей, если хочешь контейнеры через VPN.""")
self.ed_vpn_subnets.setPlaceholderText("Force VPN by source subnet, one per line (e.g. 172.18.0.0/16)")
self.ed_vpn_subnets.setFixedHeight(72)
self.ed_vpn_uids = QPlainTextEdit()
self.ed_vpn_uids.setToolTip("""EN: Force VPN by UID/uidrange (host OUTPUT only). Does not affect forwarded docker traffic.
RU: Принудительно через VPN по UID (только процессы хоста). На forwarded docker-трафик не влияет.""")
self.ed_vpn_uids.setPlaceholderText("Force VPN by UID/UID range, one per line (e.g. 1000 or 1000-1010)")
self.ed_vpn_uids.setFixedHeight(60)
self.ed_vpn_cgroups = QPlainTextEdit()
self.ed_vpn_cgroups.setToolTip("""EN: Force VPN by systemd cgroup. Backend resolves cgroup -> PIDs -> UID rules at apply time.
RU: Принудительно через VPN по cgroup (systemd). Backend резолвит cgroup -> PID -> UID при применении.""")
self.ed_vpn_cgroups.setPlaceholderText("Force VPN by cgroup path/name, one per line")
self.ed_vpn_cgroups.setFixedHeight(60)
self.ed_direct_subnets = QPlainTextEdit()
self.ed_direct_subnets.setToolTip("""EN: Force Direct by source subnet. Useful to keep docker subnets direct in full tunnel.
RU: Принудительно direct по source subnet. Полезно, чтобы docker-подсети были direct в full tunnel.""")
self.ed_direct_subnets.setPlaceholderText("Force Direct by source subnet, one per line")
self.ed_direct_subnets.setFixedHeight(72)
self.ed_direct_uids = QPlainTextEdit()
self.ed_direct_uids.setToolTip("""EN: Force Direct by UID/uidrange (host OUTPUT only).
RU: Принудительно direct по UID (только процессы хоста).""")
self.ed_direct_uids.setPlaceholderText("Force Direct by UID/UID range, one per line")
self.ed_direct_uids.setFixedHeight(60)
self.ed_direct_cgroups = QPlainTextEdit()
self.ed_direct_cgroups.setToolTip("""EN: Force Direct by systemd cgroup (resolved to UID rules at apply time).
RU: Принудительно direct по cgroup (резолвится в UID правила при применении).""")
self.ed_direct_cgroups.setPlaceholderText("Force Direct by cgroup path/name, one per line")
self.ed_direct_cgroups.setFixedHeight(60)
cols = QHBoxLayout()
vpn_group = QGroupBox("Force VPN")
vpn_layout = QVBoxLayout(vpn_group)
vpn_layout.addWidget(QLabel("Source subnets"))
vpn_layout.addWidget(self.ed_vpn_subnets)
vpn_layout.addWidget(QLabel("UIDs"))
vpn_layout.addWidget(self.ed_vpn_uids)
vpn_layout.addWidget(QLabel("Cgroups / services"))
vpn_layout.addWidget(self.ed_vpn_cgroups)
cols.addWidget(vpn_group, stretch=1)
direct_group = QGroupBox("Force Direct")
direct_layout = QVBoxLayout(direct_group)
direct_layout.addWidget(QLabel("Source subnets"))
direct_layout.addWidget(self.ed_direct_subnets)
direct_layout.addWidget(QLabel("UIDs"))
direct_layout.addWidget(self.ed_direct_uids)
direct_layout.addWidget(QLabel("Cgroups / services"))
direct_layout.addWidget(self.ed_direct_cgroups)
cols.addWidget(direct_group, stretch=1)
tab_adv_layout.addLayout(cols, stretch=1)
row_adv = QHBoxLayout()
self.btn_pick_detected = QPushButton("Add detected...")
self.btn_pick_detected.setToolTip("""EN: Opens a selector with detected subnets/services/UIDs. Only fills fields; nothing is applied automatically.
RU: Открывает список обнаруженных subnet/service/UID. Только заполняет поля; ничего не применяется автоматически.""")
self.btn_pick_detected.clicked.connect(self.on_pick_detected)
row_adv.addWidget(self.btn_pick_detected)
self.btn_apply_overrides = QPushButton("Apply overrides")
self.btn_apply_overrides.setToolTip("""EN: Applies policy rules and verifies health. On failure backend rolls back.
RU: Применяет policy-rules и проверяет health. При ошибке backend делает откат.""")
self.btn_apply_overrides.clicked.connect(self.on_apply_overrides)
row_adv.addWidget(self.btn_apply_overrides)
self.btn_reload_overrides = QPushButton("Reload overrides")
self.btn_reload_overrides.clicked.connect(self.refresh_state)
row_adv.addWidget(self.btn_reload_overrides)
row_adv.addStretch(1)
tab_adv_layout.addLayout(row_adv)
self.tabs.addTab(tab_adv, "Policy overrides (Advanced)")
# EN: Small status line for last action performed in this dialog.
# RU: Строка статуса последнего действия в этом окне.
self.lbl_action = QLabel("")
self.lbl_action.setWordWrap(True)
self.lbl_action.setStyleSheet("color: gray;")
root.addWidget(self.lbl_action)
row_bottom = QHBoxLayout()
row_bottom.addStretch(1)
btn_close = QPushButton("Close")
btn_close.clicked.connect(self.accept)
row_bottom.addWidget(btn_close)
root.addLayout(row_bottom)
QtCore.QTimer.singleShot(0, self.refresh_state)
QtCore.QTimer.singleShot(0, self.refresh_appmarks_counts)
def _is_operation_error(self, message: str) -> bool:
low = (message or "").strip().lower()
return ("rolled back" in low) or ("apply failed" in low) or ("verification failed" in low)
def _set_action_status(self, msg: str, ok: bool | None = None) -> None:
text = (msg or "").strip() or ""
self.lbl_action.setText(text)
if ok is True:
self.lbl_action.setStyleSheet("color: green;")
elif ok is False:
self.lbl_action.setStyleSheet("color: red;")
else:
self.lbl_action.setStyleSheet("color: gray;")
def _safe(self, fn, *, title: str = "Traffic mode error") -> None:
try:
fn()
except Exception as e:
msg = f"[ui-error] {title}: {e}"
self._set_action_status(msg, ok=False)
self._emit_log(msg)
QMessageBox.critical(self, title, str(e))
def _emit_log(self, msg: str) -> None:
text = (msg or "").strip()
if not text:
return
if self.log_cb:
self.log_cb(text)
else:
try:
self.ctrl.log_gui(text)
except Exception:
pass
def _preferred_iface_value(self) -> str:
raw = self.cmb_iface.currentText().strip()
if raw.lower() in ("", "auto", "-", "default"):
return ""
return raw
def _set_preferred_iface_options(self, ifaces: list[str], selected: str) -> None:
vals = ["auto"] + [x for x in ifaces if x]
sel = selected.strip() if selected else "auto"
if not sel:
sel = "auto"
if sel not in vals:
vals.append(sel)
self.cmb_iface.blockSignals(True)
self.cmb_iface.clear()
self.cmb_iface.addItems(vals)
idx = self.cmb_iface.findText(sel)
if idx < 0:
idx = self.cmb_iface.findText("auto")
if idx >= 0:
self.cmb_iface.setCurrentIndex(idx)
else:
self.cmb_iface.setEditText(sel)
self.cmb_iface.blockSignals(False)
def _lines_from_text(self, txt: str) -> list[str]:
out: list[str] = []
for raw in (txt or "").replace("\r", "\n").split("\n"):
line = raw.strip()
if line:
out.append(line)
return out
def _set_lines(self, widget: QPlainTextEdit, vals: list[str]) -> None:
widget.blockSignals(True)
widget.setPlainText("\n".join([x for x in vals if str(x).strip()]))
widget.blockSignals(False)
def _merge_lines(self, widget: QPlainTextEdit, vals: list[str]) -> int:
cur = self._lines_from_text(widget.toPlainText())
seen = {x.strip() for x in cur}
added = 0
for v in (vals or []):
vv = str(v).strip()
if not vv or vv in seen:
continue
cur.append(vv)
seen.add(vv)
added += 1
if added > 0:
self._set_lines(widget, cur)
return added
def _candidates_add(self, target: str, kind: str, values: list[str]) -> None:
tgt = (target or "").strip().lower()
k = (kind or "").strip().lower()
if tgt not in ("vpn", "direct"):
return
widget: QPlainTextEdit | None = None
if tgt == "vpn":
if k == "subnet":
widget = self.ed_vpn_subnets
elif k == "uid":
widget = self.ed_vpn_uids
elif k == "cgroup":
widget = self.ed_vpn_cgroups
else:
if k == "subnet":
widget = self.ed_direct_subnets
elif k == "uid":
widget = self.ed_direct_uids
elif k == "cgroup":
widget = self.ed_direct_cgroups
if widget is None:
return
added = self._merge_lines(widget, values or [])
if added > 0:
msg = f"Traffic candidates added: target={tgt} kind={k} added={added}"
self._set_action_status(msg, ok=True)
self._emit_log(msg)
else:
msg = f"Traffic candidates add: nothing new (target={tgt} kind={k})"
self._set_action_status(msg, ok=None)
self._emit_log(msg)
def on_pick_detected(self) -> None:
def work() -> None:
cands = self.ctrl.traffic_candidates()
existing = {
"vpn": {
"subnet": set(self._lines_from_text(self.ed_vpn_subnets.toPlainText())),
"uid": set(self._lines_from_text(self.ed_vpn_uids.toPlainText())),
"cgroup": set(self._lines_from_text(self.ed_vpn_cgroups.toPlainText())),
},
"direct": {
"subnet": set(self._lines_from_text(self.ed_direct_subnets.toPlainText())),
"uid": set(self._lines_from_text(self.ed_direct_uids.toPlainText())),
"cgroup": set(self._lines_from_text(self.ed_direct_cgroups.toPlainText())),
},
}
dlg = TrafficCandidatesDialog(
cands,
existing=existing,
add_cb=self._candidates_add,
parent=self,
)
dlg.exec()
self._safe(work, title="Traffic candidates error")
def _set_mode_state(
self,
desired_mode: str,
applied_mode: str,
preferred_iface: str,
auto_local_bypass: bool,
bypass_candidates: int,
overrides_applied: int,
cgroup_resolved_uids: int,
cgroup_warning: str,
healthy: bool,
probe_ok: bool,
probe_message: str,
active_iface: str,
iface_reason: str,
message: str,
) -> None:
desired = (desired_mode or "").strip().lower() or "selective"
applied = (applied_mode or "").strip().lower() or "direct"
if healthy:
color = "green"
health_txt = "OK"
else:
color = "red"
health_txt = "MISMATCH"
text = f"Traffic mode: {desired} (applied: {applied}) [{health_txt}]"
diag_parts = []
diag_parts.append(f"preferred={preferred_iface or 'auto'}")
diag_parts.append(
f"auto_local_bypass={'on' if auto_local_bypass else 'off'}"
)
if bypass_candidates > 0:
diag_parts.append(f"bypass_routes={bypass_candidates}")
diag_parts.append(f"overrides={overrides_applied}")
if cgroup_resolved_uids > 0:
diag_parts.append(f"cgroup_uids={cgroup_resolved_uids}")
if cgroup_warning:
diag_parts.append(f"cgroup_warning={cgroup_warning}")
if active_iface:
diag_parts.append(f"iface={active_iface}")
if iface_reason:
diag_parts.append(f"source={iface_reason}")
diag_parts.append(f"probe={'ok' if probe_ok else 'fail'}")
if probe_message:
diag_parts.append(probe_message)
if message:
diag_parts.append(message)
diag = " | ".join(diag_parts) if diag_parts else ""
self.lbl_state.setText(text)
self.lbl_state.setStyleSheet(f"color: {color};")
self.lbl_diag.setText(diag)
self.lbl_diag.setStyleSheet("color: gray;")
def refresh_state(self) -> None:
def work() -> None:
view = self.ctrl.traffic_mode_view()
mode = (view.desired_mode or "selective").strip().lower()
self.rad_selective.blockSignals(True)
self.rad_full.blockSignals(True)
self.rad_direct.blockSignals(True)
self.rad_selective.setChecked(mode == "selective")
self.rad_full.setChecked(mode == "full_tunnel")
self.rad_direct.setChecked(mode == "direct")
self.rad_selective.blockSignals(False)
self.rad_full.blockSignals(False)
self.rad_direct.blockSignals(False)
opts = self.ctrl.traffic_interfaces()
self._set_preferred_iface_options(opts, view.preferred_iface)
self.chk_auto_local.blockSignals(True)
self.chk_auto_local.setChecked(bool(view.auto_local_bypass))
self.chk_auto_local.blockSignals(False)
self._set_lines(self.ed_vpn_subnets, list(view.force_vpn_subnets or []))
self._set_lines(self.ed_vpn_uids, list(view.force_vpn_uids or []))
self._set_lines(self.ed_vpn_cgroups, list(view.force_vpn_cgroups or []))
self._set_lines(self.ed_direct_subnets, list(view.force_direct_subnets or []))
self._set_lines(self.ed_direct_uids, list(view.force_direct_uids or []))
self._set_lines(self.ed_direct_cgroups, list(view.force_direct_cgroups or []))
self._set_mode_state(
view.desired_mode,
view.applied_mode,
view.preferred_iface,
bool(view.auto_local_bypass),
int(view.bypass_candidates),
int(view.overrides_applied),
int(view.cgroup_resolved_uids),
view.cgroup_warning,
bool(view.healthy),
bool(view.probe_ok),
view.probe_message,
view.active_iface,
view.iface_reason,
view.message,
)
self._safe(work)
def on_mode_toggle(self, mode: str, checked: bool) -> None:
if not checked:
return
def work() -> None:
preferred = self._preferred_iface_value()
auto_local = self.chk_auto_local.isChecked()
view = self.ctrl.traffic_mode_set(mode, preferred, auto_local)
msg = (
f"Traffic mode set: desired={view.desired_mode}, "
f"applied={view.applied_mode}, iface={view.active_iface or '-'}, "
f"preferred={preferred or 'auto'}, probe_ok={view.probe_ok}, "
f"healthy={view.healthy}, auto_local_bypass={view.auto_local_bypass}, "
f"bypass_routes={view.bypass_candidates}, overrides={view.overrides_applied}, "
f"cgroup_uids={view.cgroup_resolved_uids}, message={view.message}"
)
self._emit_log(msg)
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
self._set_action_status(
f"Traffic mode set: desired={view.desired_mode} applied={view.applied_mode} message={view.message}",
ok=op_ok,
)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work)
def on_refresh_ifaces(self) -> None:
def work() -> None:
view = self.ctrl.traffic_mode_view()
opts = self.ctrl.traffic_interfaces()
self._set_preferred_iface_options(opts, view.preferred_iface)
self._emit_log(
"Traffic ifaces refreshed: "
f"preferred={view.preferred_iface or 'auto'} "
f"active={view.active_iface or '-'}"
)
self._set_action_status("Traffic ifaces refreshed", ok=True)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Traffic iface detect error")
def _selected_mode(self) -> str:
if self.rad_full.isChecked():
return "full_tunnel"
if self.rad_direct.isChecked():
return "direct"
return "selective"
def on_auto_local_toggle(self) -> None:
def work() -> None:
mode = self._selected_mode()
preferred = self._preferred_iface_value()
auto_local = self.chk_auto_local.isChecked()
view = self.ctrl.traffic_mode_set(mode, preferred, auto_local)
msg = (
f"Traffic auto-local set: mode={view.desired_mode}, "
f"auto_local_bypass={view.auto_local_bypass}, "
f"bypass_routes={view.bypass_candidates}, overrides={view.overrides_applied}, "
f"cgroup_uids={view.cgroup_resolved_uids}, message={view.message}"
)
self._emit_log(msg)
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
self._set_action_status(
f"Auto-local bypass set: {'on' if view.auto_local_bypass else 'off'} ({view.message})",
ok=op_ok,
)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Auto-local bypass error")
def on_apply_overrides(self) -> None:
def work() -> None:
mode = self._selected_mode()
preferred = self._preferred_iface_value()
auto_local = self.chk_auto_local.isChecked()
vpn_subnets = self._lines_from_text(self.ed_vpn_subnets.toPlainText())
vpn_uids = self._lines_from_text(self.ed_vpn_uids.toPlainText())
vpn_cgroups = self._lines_from_text(self.ed_vpn_cgroups.toPlainText())
direct_subnets = self._lines_from_text(self.ed_direct_subnets.toPlainText())
direct_uids = self._lines_from_text(self.ed_direct_uids.toPlainText())
direct_cgroups = self._lines_from_text(self.ed_direct_cgroups.toPlainText())
view = self.ctrl.traffic_mode_set(
mode,
preferred,
auto_local,
vpn_subnets,
vpn_uids,
vpn_cgroups,
direct_subnets,
direct_uids,
direct_cgroups,
)
msg = (
f"Traffic overrides applied: mode={view.desired_mode}, "
f"vpn_subnets={len(view.force_vpn_subnets)}, vpn_uids={len(view.force_vpn_uids)}, vpn_cgroups={len(view.force_vpn_cgroups)}, "
f"direct_subnets={len(view.force_direct_subnets)}, direct_uids={len(view.force_direct_uids)}, direct_cgroups={len(view.force_direct_cgroups)}, "
f"overrides={view.overrides_applied}, cgroup_uids={view.cgroup_resolved_uids}, "
f"healthy={view.healthy}, message={view.message}"
)
self._emit_log(msg)
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
self._set_action_status(
f"Overrides applied: overrides={view.overrides_applied} message={view.message}",
ok=op_ok,
)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Apply overrides error")
# -----------------------------------------------------------------
# Apps (runtime) tab
# -----------------------------------------------------------------
def _append_app_log(self, msg: str) -> None:
line = (msg or "").rstrip()
if not line:
return
try:
self.txt_app.appendPlainText(line)
except Exception:
pass
self._emit_log(line)
def refresh_appmarks_counts(self) -> None:
try:
st = self.ctrl.traffic_appmarks_status()
self.lbl_app_counts.setText(
f"Marks: VPN={st.vpn_count}, Direct={st.direct_count}"
)
except Exception as e:
self.lbl_app_counts.setText(f"Marks: error: {e}")
def _refresh_last_scope_ui(self) -> None:
unit = (self._last_app_unit or "").strip()
target = (self._last_app_target or "").strip().lower()
cg_id = int(self._last_app_cgroup_id or 0)
parts = []
if unit:
parts.append(f"unit={unit}")
if target in ("vpn", "direct"):
parts.append(f"target={target}")
if cg_id > 0:
parts.append(f"cgroup_id={cg_id}")
if parts:
self.lbl_app_last.setText("Last scope: " + " | ".join(parts))
else:
self.lbl_app_last.setText("Last scope: —")
self.btn_app_stop_last.setEnabled(bool(unit))
self.btn_app_unmark_last.setEnabled(bool(unit) and target in ("vpn", "direct") and cg_id > 0)
def _set_last_scope(self, *, unit: str = "", target: str = "", cgroup_id: int = 0) -> None:
self._last_app_unit = str(unit or "").strip()
self._last_app_target = str(target or "").strip().lower()
try:
self._last_app_cgroup_id = int(cgroup_id or 0)
except Exception:
self._last_app_cgroup_id = 0
self._settings.setValue("traffic_app_last_unit", self._last_app_unit)
self._settings.setValue("traffic_app_last_target", self._last_app_target)
self._settings.setValue("traffic_app_last_cgroup_id", int(self._last_app_cgroup_id or 0))
self._refresh_last_scope_ui()
def on_app_pick(self) -> None:
def work() -> None:
dlg = AppPickerDialog(parent=self)
if dlg.exec() != QDialog.Accepted:
return
cmd = (dlg.selected_command() or "").strip()
if not cmd:
return
self.ed_app_cmd.setText(cmd)
self._append_app_log(f"[picker] command filled: {cmd}")
self._safe(work, title="App picker error")
def _run_systemd_scope(self, cmdline: str, *, unit: str) -> tuple[str, str]:
args = shlex.split(cmdline or "")
if not args:
raise ValueError("empty command")
run_cmd = [
"systemd-run",
"--user",
"--scope",
"--unit",
unit,
"--collect",
"--same-dir",
] + args
p = subprocess.run(run_cmd, capture_output=True, text=True, check=False)
out = ((p.stdout or "") + (p.stderr or "")).strip()
if p.returncode != 0:
raise RuntimeError(f"systemd-run failed: {p.returncode}\n{out}".strip())
p2 = subprocess.run(
["systemctl", "--user", "show", "-p", "ControlGroup", "--value", unit],
capture_output=True,
text=True,
check=False,
)
out2 = ((p2.stdout or "") + (p2.stderr or "")).strip()
cg = (p2.stdout or "").strip()
if p2.returncode != 0 or not cg:
raise RuntimeError(f"failed to query ControlGroup\n{out2}".strip())
return cg, out
def on_app_run(self) -> None:
def work() -> None:
cmdline = (self.ed_app_cmd.text() or "").strip()
if not cmdline:
QMessageBox.warning(self, "Missing command", "Please enter a command to run.")
return
target = "vpn" if self.rad_app_vpn.isChecked() else "direct"
ttl_sec = int(self.spn_app_ttl.value()) * 3600
unit = f"svpn-{target}-{int(time.time())}.scope"
self._append_app_log(
f"[app] launching: target={target} ttl={ttl_sec}s unit={unit}"
)
cg, out = self._run_systemd_scope(cmdline, unit=unit)
if out:
self._append_app_log(f"[app] systemd-run:\n{out}")
self._append_app_log(f"[app] ControlGroup: {cg}")
self._set_last_scope(unit=unit, target=target, cgroup_id=0)
res = self.ctrl.traffic_appmarks_apply(
op="add",
target=target,
cgroup=cg,
timeout_sec=ttl_sec,
)
if res.ok:
self._append_app_log(
f"[appmarks] OK: {res.message} cgroup_id={res.cgroup_id} timeout={res.timeout_sec}s"
)
self._set_action_status(
f"App mark added: target={target} cgroup_id={res.cgroup_id}",
ok=True,
)
self._set_last_scope(unit=unit, target=target, cgroup_id=int(res.cgroup_id or 0))
else:
self._append_app_log(f"[appmarks] ERROR: {res.message}")
self._set_action_status(
f"App mark failed: target={target} ({res.message})",
ok=False,
)
QMessageBox.critical(
self,
"App mark error",
res.message or "unknown error",
)
self.refresh_appmarks_counts()
self._safe(work, title="Run app error")
def on_app_stop_last_scope(self) -> None:
unit = (self._last_app_unit or "").strip()
if not unit:
return
def work() -> None:
self._append_app_log(f"[app] stop last scope: unit={unit}")
p = subprocess.run(
["systemctl", "--user", "stop", unit],
capture_output=True,
text=True,
check=False,
)
out = ((p.stdout or "") + (p.stderr or "")).strip()
if p.returncode == 0:
self._append_app_log("[app] stopped OK")
if out:
self._append_app_log(out)
self._set_action_status(f"Stopped scope: {unit}", ok=True)
return
self._append_app_log(f"[app] stop failed: rc={p.returncode}")
if out:
self._append_app_log(out)
# fallback: kill
p2 = subprocess.run(
["systemctl", "--user", "kill", unit],
capture_output=True,
text=True,
check=False,
)
out2 = ((p2.stdout or "") + (p2.stderr or "")).strip()
if p2.returncode == 0:
self._append_app_log("[app] kill OK (fallback)")
if out2:
self._append_app_log(out2)
self._set_action_status(f"Killed scope: {unit}", ok=True)
return
self._append_app_log(f"[app] kill failed: rc={p2.returncode}")
if out2:
self._append_app_log(out2)
self._set_action_status(f"Stop scope failed: {unit}", ok=False)
QMessageBox.critical(self, "Stop scope error", out2 or out or "stop failed")
self._safe(work, title="Stop scope error")
def on_app_unmark_last(self) -> None:
unit = (self._last_app_unit or "").strip()
target = (self._last_app_target or "").strip().lower()
cg_id = int(self._last_app_cgroup_id or 0)
if not unit or target not in ("vpn", "direct") or cg_id <= 0:
return
def work() -> None:
if QMessageBox.question(
self,
"Unmark last",
f"Remove routing mark for last scope?\n\nunit={unit}\ntarget={target}\ncgroup_id={cg_id}",
) != QMessageBox.StandardButton.Yes:
return
res = self.ctrl.traffic_appmarks_apply(
op="del",
target=target,
cgroup=str(cg_id),
)
if res.ok:
self._append_app_log(f"[appmarks] unmarked: target={target} cgroup_id={cg_id}")
self._set_action_status(f"Unmarked last: target={target} cgroup_id={cg_id}", ok=True)
else:
self._append_app_log(f"[appmarks] unmark error: {res.message}")
self._set_action_status(f"Unmark last failed: {res.message}", ok=False)
QMessageBox.critical(self, "Unmark error", res.message or "unmark failed")
self.refresh_appmarks_counts()
self._safe(work, title="Unmark error")
def on_appmarks_clear(self, target: str) -> None:
tgt = (target or "").strip().lower()
if tgt not in ("vpn", "direct"):
return
def work() -> None:
if QMessageBox.question(
self,
"Clear marks",
f"Clear ALL runtime app marks for target={tgt}?",
) != QMessageBox.StandardButton.Yes:
return
res = self.ctrl.traffic_appmarks_apply(op="clear", target=tgt)
if res.ok:
self._append_app_log(f"[appmarks] cleared: target={tgt}")
self._set_action_status(f"App marks cleared: target={tgt}", ok=True)
else:
self._append_app_log(f"[appmarks] clear error: {res.message}")
self._set_action_status(
f"App marks clear failed: target={tgt} ({res.message})",
ok=False,
)
QMessageBox.critical(
self, "Clear marks error", res.message or "unknown error"
)
self.refresh_appmarks_counts()
self._safe(work, title="Clear app marks error")
def on_rollback(self) -> None:
def work() -> None:
res = self.ctrl.routes_clear()
self._emit_log(res.pretty_text or "rollback done")
self._set_action_status(res.pretty_text or "routes cleared (cache saved)", ok=bool(res.ok))
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Rollback error")
def on_restore_cache(self) -> None:
def work() -> None:
res = self.ctrl.routes_cache_restore()
self._emit_log(res.pretty_text or "cache restore done")
self._set_action_status(res.pretty_text or "routes restored from cache", ok=bool(res.ok))
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Restore cache error")
class TrafficCandidatesDialog(QDialog):
def __init__(
self,
candidates,
*,
existing: dict[str, dict[str, set[str]]] | None = None,
add_cb: Callable[[str, str, list[str]], None],
parent=None,
) -> None:
super().__init__(parent)
self.cands = candidates
self.add_cb = add_cb
self.existing = existing or {"vpn": {}, "direct": {}}
self.setWindowTitle("Add detected overrides")
self.resize(820, 680)
root = QVBoxLayout(self)
note = QLabel(
"Tip: hover list items for details. Подсказка: наведи на элементы списка.\n"
"Detect results from backend. Nothing is applied until you click Apply overrides."
)
note.setWordWrap(True)
note.setStyleSheet("color: gray;")
root.addWidget(note)
self.chk_hide_existing = QCheckBox("Hide already added")
self.chk_hide_existing.setToolTip(
"""EN: Hides items that are already present in Force VPN/Force Direct fields.
RU: Скрывает элементы, которые уже есть в Force VPN/Force Direct."""
)
self.chk_hide_existing.stateChanged.connect(lambda _s: self._refilter_current())
root.addWidget(self.chk_hide_existing)
self.tabs = QTabWidget()
root.addWidget(self.tabs, stretch=1)
self._tab_kind: dict[QWidget, str] = {}
self._tab_list: dict[QWidget, QListWidget] = {}
self._tab_filter: dict[QWidget, QLineEdit] = {}
self._list_kind: dict[QListWidget, str] = {}
self._list_title: dict[QListWidget, str] = {}
self.tabs.currentChanged.connect(lambda _idx: self._refilter_current())
self._build_subnets_tab()
self._build_services_tab()
self._build_uids_tab()
row = QHBoxLayout()
btn_vpn = QPushButton("Add to Force VPN")
btn_vpn.clicked.connect(lambda: self._add_selected("vpn"))
row.addWidget(btn_vpn)
btn_direct = QPushButton("Add to Force Direct")
btn_direct.clicked.connect(lambda: self._add_selected("direct"))
row.addWidget(btn_direct)
row.addStretch(1)
btn_close = QPushButton("Close")
btn_close.clicked.connect(self.accept)
row.addWidget(btn_close)
root.addLayout(row)
self.lbl_status = QLabel("")
self.lbl_status.setWordWrap(True)
self.lbl_status.setStyleSheet("color: gray;")
root.addWidget(self.lbl_status)
def _mark_state(self, kind: str, value: str) -> tuple[bool, bool]:
k = (kind or "").strip().lower()
v = (value or "").strip()
if not k or not v:
return False, False
in_vpn = v in (self.existing.get("vpn", {}).get(k, set()) or set())
in_direct = v in (self.existing.get("direct", {}).get(k, set()) or set())
return bool(in_vpn), bool(in_direct)
def _set_status(self, msg: str, ok: bool | None = None) -> None:
text = (msg or "").strip() or ""
self.lbl_status.setText(text)
if ok is True:
self.lbl_status.setStyleSheet("color: green;")
elif ok is False:
self.lbl_status.setStyleSheet("color: red;")
else:
self.lbl_status.setStyleSheet("color: gray;")
def _apply_filter(self, lst: QListWidget, query: str) -> None:
q = (query or "").strip().lower()
hide_existing = bool(self.chk_hide_existing.isChecked())
kind = (self._list_kind.get(lst) or "").strip().lower()
# Subnets-only quick filters.
allow_lan = True
allow_docker = True
allow_link = True
allow_linkdown = True
if kind == "subnet":
allow_lan = bool(getattr(self, "chk_sub_show_lan", None) and self.chk_sub_show_lan.isChecked())
allow_docker = bool(getattr(self, "chk_sub_show_docker", None) and self.chk_sub_show_docker.isChecked())
allow_link = bool(getattr(self, "chk_sub_show_link", None) and self.chk_sub_show_link.isChecked())
allow_linkdown = not bool(getattr(self, "chk_sub_hide_linkdown", None) and self.chk_sub_hide_linkdown.isChecked())
for i in range(lst.count()):
it = lst.item(i)
if not it:
continue
if hide_existing and bool(it.data(QtCore.Qt.UserRole + 1) or False):
it.setHidden(True)
continue
if kind == "subnet":
it_kind = str(it.data(QtCore.Qt.UserRole + 4) or "").strip().lower()
it_linkdown = bool(it.data(QtCore.Qt.UserRole + 5) or False)
if it_linkdown and not allow_linkdown:
it.setHidden(True)
continue
if it_kind == "docker" and not allow_docker:
it.setHidden(True)
continue
if it_kind == "lan" and not allow_lan:
it.setHidden(True)
continue
if it_kind == "link" and not allow_link:
it.setHidden(True)
continue
if not q:
it.setHidden(False)
continue
it.setHidden(q not in it.text().lower())
def _refilter_current(self) -> None:
tab = self.tabs.currentWidget()
if tab is None:
return
lst = self._tab_list.get(tab)
filt = self._tab_filter.get(tab)
if lst is None or filt is None:
return
self._apply_filter(lst, filt.text())
def _filter_for_title(self, title: str) -> QLineEdit | None:
for i in range(self.tabs.count()):
if self.tabs.tabText(i) == title:
tab = self.tabs.widget(i)
return self._tab_filter.get(tab)
return None
def _preset_set_filter(self, title: str, text: str) -> None:
filt = self._filter_for_title(title)
if filt is not None:
filt.setText(text)
def _preset_filter_subnets(self, *, lan: bool, docker: bool, link: bool) -> None:
# EN: "Filter LAN/Docker" buttons should not rely on text search; they should
# EN: toggle the kind checkboxes and clear the text filter.
# RU: Кнопки "Filter LAN/Docker" не должны полагаться на текстовый поиск;
# RU: они должны переключать чекбоксы видов и очищать текстовый фильтр.
chk_lan = getattr(self, "chk_sub_show_lan", None)
chk_docker = getattr(self, "chk_sub_show_docker", None)
chk_link = getattr(self, "chk_sub_show_link", None)
if chk_lan is None or chk_docker is None or chk_link is None:
self._preset_set_filter("Subnets", "")
self._refilter_current()
return
for chk, val in (
(chk_lan, lan),
(chk_docker, docker),
(chk_link, link),
):
try:
chk.blockSignals(True)
chk.setChecked(bool(val))
finally:
chk.blockSignals(False)
self._preset_set_filter("Subnets", "")
self._refilter_current()
def _update_item_render(self, it: QListWidgetItem, kind: str) -> None:
value = str(it.data(QtCore.Qt.UserRole) or "").strip()
base_label = str(it.data(QtCore.Qt.UserRole + 2) or it.text() or "")
base_tip = str(it.data(QtCore.Qt.UserRole + 3) or it.toolTip() or "")
in_vpn, in_direct = self._mark_state(kind, value)
flags = []
if in_vpn:
flags.append("VPN")
if in_direct:
flags.append("DIRECT")
label = base_label
if flags:
label = f"{base_label} [{' + '.join(flags)}]"
it.setText(label)
it.setData(QtCore.Qt.UserRole + 1, bool(in_vpn or in_direct))
if base_tip.strip():
extra_tip = (
f"\n\nAlready in Force VPN: {'yes' if in_vpn else 'no'}\n"
f"Already in Force Direct: {'yes' if in_direct else 'no'}"
)
it.setToolTip(base_tip + extra_tip)
if in_vpn or in_direct:
it.setForeground(QtGui.QBrush(QtGui.QColor("gray")))
def _add_tab(self, title: str, kind: str, items: list[tuple[str, str, str]], *, extra=None) -> None:
tab = QWidget()
layout = QVBoxLayout(tab)
if extra is not None:
extra(layout)
filt = QLineEdit()
filt.setPlaceholderText("Filter...")
layout.addWidget(filt)
lst = QListWidget()
lst.setSelectionMode(QAbstractItemView.ExtendedSelection)
for entry in items:
label = str(entry[0]) if len(entry) > 0 else ""
value = str(entry[1]) if len(entry) > 1 else ""
tip = str(entry[2]) if len(entry) > 2 else ""
meta_kind = str(entry[3]) if len(entry) > 3 else ""
meta_linkdown = bool(entry[4]) if len(entry) > 4 else False
it = QListWidgetItem(label)
it.setData(QtCore.Qt.UserRole, value)
it.setData(QtCore.Qt.UserRole + 2, label) # base label (without [VPN]/[DIRECT])
it.setData(QtCore.Qt.UserRole + 3, tip) # base tooltip (without existing-state)
it.setData(QtCore.Qt.UserRole + 4, meta_kind)
it.setData(QtCore.Qt.UserRole + 5, meta_linkdown)
lst.addItem(it)
self._update_item_render(it, kind)
layout.addWidget(lst, stretch=1)
filt.textChanged.connect(lambda txt, l=lst: self._apply_filter(l, txt))
self.tabs.addTab(tab, title)
self._tab_kind[tab] = kind
self._tab_list[tab] = lst
self._tab_filter[tab] = filt
self._list_kind[lst] = kind
self._list_title[lst] = title
def _current_kind_and_list(self) -> tuple[str, QListWidget | None]:
tab = self.tabs.currentWidget()
if tab is None:
return "", None
return self._tab_kind.get(tab, ""), self._tab_list.get(tab)
def _add_selected(self, target: str) -> None:
kind, lst = self._current_kind_and_list()
if not kind or lst is None:
return
vals: list[str] = []
for it in lst.selectedItems():
v = it.data(QtCore.Qt.UserRole)
vv = str(v or "").strip()
if vv:
vals.append(vv)
# stable de-dupe
out: list[str] = []
seen: set[str] = set()
for v in vals:
if v in seen:
continue
seen.add(v)
out.append(v)
if not out:
self._set_status("Nothing selected", ok=None)
return
tgt = (target or "").strip().lower()
k = (kind or "").strip().lower()
other = "direct" if tgt == "vpn" else "vpn"
have_tgt = self.existing.get(tgt, {}).get(k, set()) or set()
have_other = self.existing.get(other, {}).get(k, set()) or set()
to_add: list[str] = []
skipped = 0
conflicts = 0
for v in out:
if v in have_tgt:
skipped += 1
continue
if v in have_other:
conflicts += 1
to_add.append(v)
if not to_add:
self._set_status(f"Nothing new to add (skipped={skipped}, conflicts={conflicts})", ok=None)
return
self.add_cb(target, kind, to_add)
# Update local state so UI marks newly added items immediately.
if tgt not in self.existing:
self.existing[tgt] = {}
if k not in self.existing[tgt]:
self.existing[tgt][k] = set()
for v in to_add:
self.existing[tgt][k].add(v)
for i in range(lst.count()):
it = lst.item(i)
if it is None:
continue
self._update_item_render(it, kind)
self._refilter_current()
msg = f"Added {len(to_add)} item(s) to Force {tgt.upper()} ({k})."
if skipped or conflicts:
msg += f" skipped={skipped} conflicts={conflicts}"
self._set_status(msg, ok=True)
def _list_for_title(self, title: str) -> QListWidget | None:
for i in range(self.tabs.count()):
if self.tabs.tabText(i) == title:
tab = self.tabs.widget(i)
return self._tab_list.get(tab)
return None
def _preset_clear_selection(self, title: str) -> None:
lst = self._list_for_title(title)
if lst is not None:
lst.clearSelection()
def _preset_select_services(self, keywords: list[str]) -> None:
lst = self._list_for_title("Services")
if lst is None:
return
keys = [str(k).strip().lower() for k in (keywords or []) if str(k).strip()]
if not keys:
return
lst.clearSelection()
for i in range(lst.count()):
it = lst.item(i)
if it is None:
continue
txt = (it.text() or "").lower()
val = str(it.data(QtCore.Qt.UserRole) or "").lower()
if any(k in txt or k in val for k in keys):
it.setSelected(True)
def _preset_select_uids(self, uids: list[int]) -> None:
lst = self._list_for_title("UIDs")
if lst is None:
return
want = {f"{int(u)}-{int(u)}" for u in (uids or [])}
if not want:
return
lst.clearSelection()
for i in range(lst.count()):
it = lst.item(i)
if it is None:
continue
token = str(it.data(QtCore.Qt.UserRole) or "").strip()
if token in want:
it.setSelected(True)
def _build_subnets_tab(self) -> None:
subs = list(getattr(self.cands, "subnets", []) or [])
items: list[tuple[str, str, str, str, bool]] = []
for s in subs:
cidr = str(getattr(s, "cidr", "") or "").strip()
if not cidr:
continue
dev = str(getattr(s, "dev", "") or "").strip()
kind = str(getattr(s, "kind", "") or "").strip()
linkdown = bool(getattr(s, "linkdown", False))
tags = []
if kind:
tags.append(kind)
if dev:
tags.append(dev)
if linkdown:
tags.append("linkdown")
tag_txt = " " + "[" + ", ".join(tags) + "]" if tags else ""
tip = (
f"CIDR: {cidr}\n"
f"kind={kind or '-'} dev={dev or '-'} linkdown={linkdown}\n\n"
"EN: Source subnet overrides affect forwarded traffic (Docker).\n"
"RU: Source subnet влияет на forwarded трафик (Docker)."
)
items.append((f"{cidr}{tag_txt}", cidr, tip, kind, linkdown))
def extra(layout: QVBoxLayout) -> None:
row = QHBoxLayout()
btn_lan = QPushButton("Keep LAN direct")
btn_lan.clicked.connect(lambda: self._preset_add_lan_direct())
row.addWidget(btn_lan)
btn_docker = QPushButton("Keep Docker direct")
btn_docker.clicked.connect(lambda: self._preset_add_docker_direct())
row.addWidget(btn_docker)
row.addStretch(1)
layout.addLayout(row)
row2 = QHBoxLayout()
btn_f_lan = QPushButton("Filter LAN")
btn_f_lan.clicked.connect(lambda: self._preset_filter_subnets(lan=True, docker=False, link=True))
row2.addWidget(btn_f_lan)
btn_f_docker = QPushButton("Filter Docker")
btn_f_docker.clicked.connect(lambda: self._preset_filter_subnets(lan=False, docker=True, link=False))
row2.addWidget(btn_f_docker)
btn_f_clear = QPushButton("Clear filter")
btn_f_clear.clicked.connect(lambda: self._preset_filter_subnets(lan=True, docker=True, link=True))
row2.addWidget(btn_f_clear)
row2.addStretch(1)
layout.addLayout(row2)
row3 = QHBoxLayout()
self.chk_sub_show_lan = QCheckBox("LAN")
self.chk_sub_show_lan.setChecked(True)
self.chk_sub_show_lan.setToolTip("EN: Show LAN subnets.\nRU: Показать LAN подсети.")
self.chk_sub_show_lan.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_show_lan)
self.chk_sub_show_docker = QCheckBox("Docker")
self.chk_sub_show_docker.setChecked(True)
self.chk_sub_show_docker.setToolTip("EN: Show Docker/container subnets.\nRU: Показать Docker/контейнерные подсети.")
self.chk_sub_show_docker.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_show_docker)
self.chk_sub_show_link = QCheckBox("Link (scope link)")
self.chk_sub_show_link.setChecked(True)
self.chk_sub_show_link.setToolTip("EN: Show link-scope routes.\nRU: Показать маршруты scope link.")
self.chk_sub_show_link.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_show_link)
self.chk_sub_hide_linkdown = QCheckBox("Hide linkdown")
self.chk_sub_hide_linkdown.setChecked(True)
self.chk_sub_hide_linkdown.setToolTip("EN: Hide routes marked as linkdown.\nRU: Скрыть маршруты с меткой linkdown.")
self.chk_sub_hide_linkdown.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_hide_linkdown)
row3.addStretch(1)
layout.addLayout(row3)
self._add_tab("Subnets", "subnet", items, extra=extra)
def _preset_add_lan_direct(self) -> None:
subs = list(getattr(self.cands, "subnets", []) or [])
vals: list[str] = []
for s in subs:
kind = str(getattr(s, "kind", "") or "").strip()
cidr = str(getattr(s, "cidr", "") or "").strip()
if not cidr:
continue
if kind in ("lan", "link"):
vals.append(cidr)
if vals:
self.add_cb("direct", "subnet", vals)
def _preset_add_docker_direct(self) -> None:
subs = list(getattr(self.cands, "subnets", []) or [])
vals: list[str] = []
for s in subs:
kind = str(getattr(s, "kind", "") or "").strip()
cidr = str(getattr(s, "cidr", "") or "").strip()
if not cidr:
continue
if kind == "docker":
vals.append(cidr)
if vals:
self.add_cb("direct", "subnet", vals)
def _build_services_tab(self) -> None:
units = list(getattr(self.cands, "units", []) or [])
items: list[tuple[str, str, str]] = []
for u in units:
unit = str(getattr(u, "unit", "") or "").strip()
if not unit:
continue
desc = str(getattr(u, "description", "") or "").strip()
cgroup = str(getattr(u, "cgroup", "") or "").strip() or unit
label = unit
if desc:
label += " - " + desc
tip = (
f"Unit: {unit}\n"
f"Cgroup token: {cgroup}\n\n"
"EN: Adds a cgroup override; backend resolves it to UID rules at apply time.\n"
"RU: Добавляет cgroup override; backend резолвит его в UID правила при применении."
)
items.append((label, cgroup, tip))
def extra(layout: QVBoxLayout) -> None:
row = QHBoxLayout()
btn_docker = QPushButton("Select docker/container")
btn_docker.clicked.connect(lambda: self._preset_select_services(["docker", "containerd", "podman"]))
row.addWidget(btn_docker)
btn_media = QPushButton("Select media (jellyfin/plex)")
btn_media.clicked.connect(lambda: self._preset_select_services(["jellyfin", "plex", "emby"]))
row.addWidget(btn_media)
btn_clear = QPushButton("Clear selection")
btn_clear.clicked.connect(lambda: self._preset_clear_selection("Services"))
row.addWidget(btn_clear)
row.addStretch(1)
layout.addLayout(row)
row2 = QHBoxLayout()
btn_f_docker = QPushButton("Filter docker")
btn_f_docker.clicked.connect(lambda: self._preset_set_filter("Services", "docker"))
row2.addWidget(btn_f_docker)
btn_f_media = QPushButton("Filter media")
btn_f_media.clicked.connect(lambda: self._preset_set_filter("Services", "jellyfin"))
row2.addWidget(btn_f_media)
btn_f_clear = QPushButton("Clear filter")
btn_f_clear.clicked.connect(lambda: self._preset_set_filter("Services", ""))
row2.addWidget(btn_f_clear)
row2.addStretch(1)
layout.addLayout(row2)
self._add_tab("Services", "cgroup", items, extra=extra)
def _build_uids_tab(self) -> None:
uids = list(getattr(self.cands, "uids", []) or [])
items: list[tuple[str, str, str]] = []
for u in uids:
try:
uid = int(getattr(u, "uid", 0) or 0)
except Exception:
continue
user = str(getattr(u, "user", "") or "").strip()
examples = list(getattr(u, "examples", []) or [])
ex_txt = ", ".join([str(x) for x in examples if str(x).strip()])
label = str(uid)
if user:
label += f" ({user})"
if ex_txt:
label += " - " + ex_txt
token = f"{uid}-{uid}"
tip = (
f"UID: {uid}\n"
f"User: {user or '-'}\n"
f"Examples: {ex_txt or '-'}\n\n"
"EN: UID rules affect host-local processes (OUTPUT).\n"
"RU: UID правила влияют на процессы хоста (OUTPUT)."
)
items.append((label, token, tip))
def extra(layout: QVBoxLayout) -> None:
row = QHBoxLayout()
btn_me = QPushButton("Select my UID")
btn_me.clicked.connect(lambda: self._preset_select_uids([os.getuid()]))
row.addWidget(btn_me)
btn_root = QPushButton("Select root UID")
btn_root.clicked.connect(lambda: self._preset_select_uids([0]))
row.addWidget(btn_root)
btn_clear = QPushButton("Clear selection")
btn_clear.clicked.connect(lambda: self._preset_clear_selection("UIDs"))
row.addWidget(btn_clear)
row.addStretch(1)
layout.addLayout(row)
self._add_tab("UIDs", "uid", items, extra=extra)
# ---------------------------------------------------------------------
# App picker (.desktop entries)
# ---------------------------------------------------------------------
def _desktop_bool(v: str) -> bool:
return str(v or "").strip().lower() in _DESKTOP_BOOL_TRUE
def _desktop_name_from_section(sec: configparser.SectionProxy) -> str:
name = str(sec.get("Name", "") or "").strip()
if name:
return name
# Prefer Russian if present, then English, then any localized variant.
for key in ("Name[ru]", "Name[en_US]", "Name[en]"):
v = str(sec.get(key, "") or "").strip()
if v:
return v
for k, v in sec.items():
kk = str(k or "")
if kk.startswith("Name[") and str(v or "").strip():
return str(v).strip()
return ""
def _sanitize_desktop_exec(exec_raw: str, *, name: str = "", desktop_path: str = "") -> str:
raw = str(exec_raw or "").strip()
if not raw:
return ""
# Desktop spec: "%%" -> literal "%".
raw = raw.replace("%%", "%")
# Best-effort expansion for a couple of common fields.
if name:
raw = raw.replace("%c", name)
if desktop_path:
raw = raw.replace("%k", desktop_path)
try:
tokens = shlex.split(raw)
except Exception:
tokens = raw.split()
out: list[str] = []
for t in tokens:
tok = str(t or "").strip()
if not tok:
continue
# Drop standalone field codes (%u, %U, %f, ...).
if len(tok) == 2 and tok.startswith("%"):
continue
# Remove field codes inside tokens (e.g. --name=%c).
tok = _DESKTOP_EXEC_FIELD_RE.sub("", tok).strip()
if tok:
out.append(tok)
if not out:
return ""
return " ".join(shlex.quote(x) for x in out)
def _desktop_entry_from_file(path: str, *, source: str) -> Optional[DesktopAppEntry]:
p = str(path or "").strip()
if not p or not p.endswith(".desktop"):
return None
cp = configparser.ConfigParser(interpolation=None, strict=False)
cp.optionxform = str # keep case
try:
cp.read(p, encoding="utf-8")
except Exception:
try:
data = pathlib.Path(p).read_bytes()
cp.read_string(data.decode("utf-8", errors="replace"))
except Exception:
return None
if "Desktop Entry" not in cp:
return None
sec = cp["Desktop Entry"]
if _desktop_bool(sec.get("Hidden", "")) or _desktop_bool(sec.get("NoDisplay", "")):
return None
typ = str(sec.get("Type", "") or "").strip().lower()
if typ and typ != "application":
return None
exec_raw = str(sec.get("Exec", "") or "").strip()
if not exec_raw:
return None
name = _desktop_name_from_section(sec)
desktop_id = pathlib.Path(p).name
if not name:
name = desktop_id.replace(".desktop", "")
src = str(source or "").strip().lower() or "system"
return DesktopAppEntry(
desktop_id=desktop_id,
name=name,
exec_raw=exec_raw,
path=p,
source=src,
)
def _scan_desktop_entries() -> list[DesktopAppEntry]:
home = os.path.expanduser("~")
dirs: list[tuple[str, str]] = [
(os.path.join(home, ".local/share/applications"), "user"),
("/usr/local/share/applications", "system"),
("/usr/share/applications", "system"),
(os.path.join(home, ".local/share/flatpak/exports/share/applications"), "flatpak"),
("/var/lib/flatpak/exports/share/applications", "flatpak"),
]
seen: set[tuple[str, str]] = set()
out: list[DesktopAppEntry] = []
for d, src in dirs:
if not d or not os.path.isdir(d):
continue
try:
paths = sorted(pathlib.Path(d).glob("*.desktop"))
except Exception:
continue
for fp in paths:
ent = _desktop_entry_from_file(str(fp), source=src)
if ent is None:
continue
key = (ent.desktop_id, ent.source)
if key in seen:
continue
seen.add(key)
out.append(ent)
out.sort(key=lambda e: (e.name.lower(), e.source, e.desktop_id.lower()))
return out
class AppPickerDialog(QDialog):
def __init__(self, *, parent=None) -> None:
super().__init__(parent)
self.setWindowTitle("Pick app (.desktop)")
self.resize(860, 640)
self._selected_cmd: str = ""
self._entries: list[DesktopAppEntry] = _scan_desktop_entries()
root = QVBoxLayout(self)
note = QLabel(
"EN: Pick an installed GUI app from .desktop entries (system + flatpak). "
"The command will be filled without %u/%U/%f placeholders.\n"
"RU: Выбери приложение из .desktop (system + flatpak). "
"Команда будет заполнена без плейсхолдеров %u/%U/%f."
)
note.setWordWrap(True)
note.setStyleSheet("color: gray;")
root.addWidget(note)
row = QHBoxLayout()
row.addWidget(QLabel("Search"))
self.ed_search = QLineEdit()
self.ed_search.setPlaceholderText("Type to filter (name / id / exec)...")
row.addWidget(self.ed_search, stretch=1)
self.lbl_count = QLabel("")
self.lbl_count.setStyleSheet("color: gray;")
row.addWidget(self.lbl_count)
root.addLayout(row)
self.lst = QListWidget()
self.lst.setSelectionMode(QAbstractItemView.SingleSelection)
root.addWidget(self.lst, stretch=1)
self.preview = QPlainTextEdit()
self.preview.setReadOnly(True)
self.preview.setFixedHeight(180)
root.addWidget(self.preview)
row_btn = QHBoxLayout()
self.btn_use = QPushButton("Use selected")
self.btn_use.clicked.connect(self.on_use_selected)
row_btn.addWidget(self.btn_use)
row_btn.addStretch(1)
btn_close = QPushButton("Close")
btn_close.clicked.connect(self.reject)
row_btn.addWidget(btn_close)
root.addLayout(row_btn)
self._populate()
self.ed_search.textChanged.connect(lambda _t: self._apply_filter())
self.lst.currentItemChanged.connect(lambda _a, _b: self._update_preview())
self.lst.itemDoubleClicked.connect(lambda _it: self.on_use_selected())
QtCore.QTimer.singleShot(0, self._apply_filter)
def selected_command(self) -> str:
return str(self._selected_cmd or "")
def _populate(self) -> None:
self.lst.clear()
for ent in self._entries:
src = ent.source
label = f"{ent.name} [{src}] ({ent.desktop_id})"
tip = (
f"Name: {ent.name}\n"
f"ID: {ent.desktop_id}\n"
f"Source: {src}\n"
f"Path: {ent.path}\n\n"
f"Exec: {ent.exec_raw}"
)
it = QListWidgetItem(label)
it.setToolTip(tip)
it.setData(QtCore.Qt.UserRole, ent)
# precomputed search string
it.setData(
QtCore.Qt.UserRole + 1,
(label + "\n" + ent.exec_raw + "\n" + ent.path).lower(),
)
self.lst.addItem(it)
if self.lst.count() > 0:
self.lst.setCurrentRow(0)
self._update_preview()
def _apply_filter(self) -> None:
q = (self.ed_search.text() or "").strip().lower()
shown = 0
for i in range(self.lst.count()):
it = self.lst.item(i)
if not it:
continue
hay = str(it.data(QtCore.Qt.UserRole + 1) or "")
hide = bool(q) and q not in hay
it.setHidden(hide)
if not hide:
shown += 1
self.lbl_count.setText(f"Apps: {shown}/{self.lst.count()}")
def _current_entry(self) -> Optional[DesktopAppEntry]:
it = self.lst.currentItem()
if not it:
return None
ent = it.data(QtCore.Qt.UserRole)
if isinstance(ent, DesktopAppEntry):
return ent
return None
def _update_preview(self) -> None:
ent = self._current_entry()
if ent is None:
self.preview.setPlainText("")
self.btn_use.setEnabled(False)
return
cmd = _sanitize_desktop_exec(ent.exec_raw, name=ent.name, desktop_path=ent.path)
text = (
f"Name: {ent.name}\n"
f"ID: {ent.desktop_id}\n"
f"Source: {ent.source}\n"
f"Path: {ent.path}\n\n"
f"Exec (raw):\n{ent.exec_raw}\n\n"
f"Command (sanitized):\n{cmd}"
)
self.preview.setPlainText(text)
self.btn_use.setEnabled(bool(cmd.strip()))
def on_use_selected(self) -> None:
ent = self._current_entry()
if ent is None:
return
cmd = _sanitize_desktop_exec(ent.exec_raw, name=ent.name, desktop_path=ent.path).strip()
if not cmd:
QMessageBox.warning(self, "No command", "Selected app has no usable Exec command.")
return
self._selected_cmd = cmd
self.accept()