Files
elmprodvpn/selective-vpn-gui/traffic_mode_dialog.py
2026-02-14 17:31:32 +03:00

1453 lines
59 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
from __future__ import annotations
import os
import shlex
import subprocess
import time
from typing import Callable
from PySide6 import QtCore, QtGui
from PySide6.QtWidgets import (
QButtonGroup,
QCheckBox,
QComboBox,
QDialog,
QGroupBox,
QHBoxLayout,
QLabel,
QLineEdit,
QListWidget,
QListWidgetItem,
QAbstractItemView,
QMessageBox,
QPlainTextEdit,
QPushButton,
QRadioButton,
QSpinBox,
QTabWidget,
QVBoxLayout,
QWidget,
)
from dashboard_controller import DashboardController
class TrafficModeDialog(QDialog):
def __init__(
self,
controller: DashboardController,
*,
log_cb: Callable[[str], None] | None = None,
refresh_cb: Callable[[], None] | None = None,
parent=None,
) -> None:
super().__init__(parent)
self.ctrl = controller
self.log_cb = log_cb
self.refresh_cb = refresh_cb
self.setWindowTitle("Traffic mode settings")
self.resize(780, 760)
root = QVBoxLayout(self)
hint_group = QGroupBox("Mode behavior")
hint_layout = QVBoxLayout(hint_group)
hint_layout.addWidget(QLabel("Selective: only marked traffic goes via VPN."))
hint_layout.addWidget(QLabel("Full tunnel: all traffic goes via VPN."))
hint_layout.addWidget(QLabel("Direct: VPN routing rules are disabled."))
warn = QLabel(
"Warning: Full tunnel can break local/LAN access depending on your host routes."
)
warn.setStyleSheet("color: red;")
hint_layout.addWidget(warn)
root.addWidget(hint_group)
tip = QLabel("Tip: hover any control for help. Подсказка: наведи на элемент для описания.")
tip.setWordWrap(True)
tip.setStyleSheet("color: gray;")
root.addWidget(tip)
self.tabs = QTabWidget()
root.addWidget(self.tabs, stretch=1)
tab_basic = QWidget()
tab_basic_layout = QVBoxLayout(tab_basic)
mode_group = QGroupBox("Traffic mode relay")
mode_layout = QVBoxLayout(mode_group)
row_mode = QHBoxLayout()
self.rad_selective = QRadioButton("Selective")
self.rad_selective.setToolTip("""EN: Only marked traffic (fwmark 0x66) uses VPN policy table (agvpn).
RU: Только помеченный трафик (fwmark 0x66) идет через policy-table (agvpn).""")
self.rad_selective.toggled.connect(
lambda checked: self.on_mode_toggle("selective", checked)
)
row_mode.addWidget(self.rad_selective)
self.rad_full = QRadioButton("Full tunnel")
self.rad_full.setToolTip("""EN: All traffic uses VPN policy table (agvpn). Use with auto-local bypass for LAN/docker.
RU: Весь трафик идет через policy-table (agvpn). Для LAN/docker включай auto-local bypass.""")
self.rad_full.toggled.connect(
lambda checked: self.on_mode_toggle("full_tunnel", checked)
)
row_mode.addWidget(self.rad_full)
self.rad_direct = QRadioButton("Direct")
self.rad_direct.setToolTip("""EN: Disables base VPN routing rules (no full/selective rule).
RU: Отключает базовые VPN policy-rules (нет full/selective правила).""")
self.rad_direct.toggled.connect(
lambda checked: self.on_mode_toggle("direct", checked)
)
row_mode.addWidget(self.rad_direct)
row_mode.addStretch(1)
mode_layout.addLayout(row_mode)
row_iface = QHBoxLayout()
row_iface.addWidget(QLabel("Preferred iface"))
self.cmb_iface = QComboBox()
self.cmb_iface.setToolTip("""EN: VPN interface for policy routing. Use auto unless you know the exact iface.
RU: Интерфейс VPN для policy routing. Оставь auto, если не уверен.""")
self.cmb_iface.setEditable(True)
self.cmb_iface.setInsertPolicy(QComboBox.NoInsert)
self.cmb_iface.setMinimumWidth(180)
row_iface.addWidget(self.cmb_iface)
self.btn_refresh_ifaces = QPushButton("Detect ifaces")
self.btn_refresh_ifaces.setToolTip("""EN: Refresh list of available interfaces (UP).
RU: Обновить список доступных интерфейсов (UP).""")
self.btn_refresh_ifaces.clicked.connect(self.on_refresh_ifaces)
row_iface.addWidget(self.btn_refresh_ifaces)
row_iface.addStretch(1)
mode_layout.addLayout(row_iface)
self.chk_auto_local = QCheckBox("Auto-local bypass (LAN/container subnets)")
self.chk_auto_local.setToolTip("""EN: Mirrors local/LAN/docker routes from main into agvpn table to prevent breakage in full tunnel.
EN: This does NOT force containers to use direct internet; use Force Direct subnets for that.
RU: Копирует локальные/LAN/docker маршруты из main в agvpn, чтобы не ломалась локалка в full tunnel.
RU: Это НЕ делает контейнеры direct в интернет; для этого используй Force Direct subnets.""")
self.chk_auto_local.stateChanged.connect(lambda _state: self.on_auto_local_toggle())
mode_layout.addWidget(self.chk_auto_local)
self.lbl_state = QLabel("Traffic mode: —")
self.lbl_state.setStyleSheet("color: gray;")
mode_layout.addWidget(self.lbl_state)
self.lbl_diag = QLabel("")
self.lbl_diag.setStyleSheet("color: gray;")
mode_layout.addWidget(self.lbl_diag)
tab_basic_layout.addWidget(mode_group)
maint_group = QGroupBox("Rollback / cache")
maint_layout = QHBoxLayout(maint_group)
self.btn_rollback = QPushButton("Clear routes (save cache)")
self.btn_rollback.setToolTip("""EN: Clears VPN routes and nft sets, but saves a cache snapshot for restore.
RU: Очищает VPN маршруты и nft-сеты, но сохраняет снапшот для восстановления.""")
self.btn_rollback.clicked.connect(self.on_rollback)
maint_layout.addWidget(self.btn_rollback)
self.btn_restore_cache = QPushButton("Restore cached routes")
self.btn_restore_cache.setToolTip("""EN: Restores routes/nft from the last clear snapshot. Skips non-critical route restore errors.
RU: Восстанавливает маршруты/nft из последнего снапшота clear. Некритичные ошибки восстановления пропускаются.""")
self.btn_restore_cache.clicked.connect(self.on_restore_cache)
maint_layout.addWidget(self.btn_restore_cache)
maint_layout.addStretch(1)
tab_basic_layout.addWidget(maint_group)
tab_basic_layout.addStretch(1)
self.tabs.addTab(tab_basic, "Traffic basics")
# -----------------------------------------------------------------
# Apps (runtime): systemd --user scope + backend appmarks
# -----------------------------------------------------------------
tab_apps = QWidget()
tab_apps_layout = QVBoxLayout(tab_apps)
apps_hint = QLabel(
"Runtime per-app routing (Wayland-friendly):\n"
"- Launch uses systemd-run --user --scope.\n"
"- Backend adds the scope cgroup into nftset -> fwmark rules.\n"
"- Marks are temporary (TTL). Use Policy overrides for persistent policy."
)
apps_hint.setWordWrap(True)
apps_hint.setStyleSheet("color: gray;")
tab_apps_layout.addWidget(apps_hint)
run_group = QGroupBox("Run app in scope + apply mark")
run_layout = QVBoxLayout(run_group)
row_cmd = QHBoxLayout()
row_cmd.addWidget(QLabel("Command"))
self.ed_app_cmd = QLineEdit()
self.ed_app_cmd.setPlaceholderText(
"e.g. firefox --private-window https://example.com"
)
self.ed_app_cmd.setToolTip(
"EN: Command line to run. This runs as current user in a systemd --user scope.\n"
"RU: Команда запуска. Запускается от текущего пользователя в systemd --user scope."
)
row_cmd.addWidget(self.ed_app_cmd, stretch=1)
run_layout.addLayout(row_cmd)
row_target = QHBoxLayout()
row_target.addWidget(QLabel("Route via"))
self.rad_app_vpn = QRadioButton("VPN")
self.rad_app_vpn.setToolTip(
"EN: Force this app traffic via VPN policy table (agvpn).\n"
"RU: Форсировать трафик приложения через VPN policy-table (agvpn)."
)
self.rad_app_direct = QRadioButton("Direct")
self.rad_app_direct.setToolTip(
"EN: Force this app traffic to bypass VPN (lookup main), even in full tunnel.\n"
"RU: Форсировать трафик приложения мимо VPN (lookup main), даже в full tunnel."
)
bg_app = QButtonGroup(self)
bg_app.addButton(self.rad_app_vpn)
bg_app.addButton(self.rad_app_direct)
self.rad_app_vpn.setChecked(True)
row_target.addWidget(self.rad_app_vpn)
row_target.addWidget(self.rad_app_direct)
row_target.addStretch(1)
run_layout.addLayout(row_target)
row_ttl = QHBoxLayout()
row_ttl.addWidget(QLabel("TTL (hours)"))
self.spn_app_ttl = QSpinBox()
self.spn_app_ttl.setRange(1, 24 * 30) # up to ~30 days
self.spn_app_ttl.setValue(24)
self.spn_app_ttl.setToolTip(
"EN: How long the runtime mark stays active (backend nftset element timeout).\n"
"RU: Сколько живет runtime-метка (timeout элемента в nftset)."
)
row_ttl.addWidget(self.spn_app_ttl)
row_ttl.addStretch(1)
run_layout.addLayout(row_ttl)
row_btn = QHBoxLayout()
self.btn_app_run = QPushButton("Run + apply mark")
self.btn_app_run.clicked.connect(self.on_app_run)
row_btn.addWidget(self.btn_app_run)
self.btn_app_refresh = QPushButton("Refresh counts")
self.btn_app_refresh.clicked.connect(self.refresh_appmarks_counts)
row_btn.addWidget(self.btn_app_refresh)
self.btn_app_clear_vpn = QPushButton("Clear VPN marks")
self.btn_app_clear_vpn.clicked.connect(lambda: self.on_appmarks_clear("vpn"))
row_btn.addWidget(self.btn_app_clear_vpn)
self.btn_app_clear_direct = QPushButton("Clear Direct marks")
self.btn_app_clear_direct.clicked.connect(
lambda: self.on_appmarks_clear("direct")
)
row_btn.addWidget(self.btn_app_clear_direct)
row_btn.addStretch(1)
run_layout.addLayout(row_btn)
self.lbl_app_counts = QLabel("Marks: —")
self.lbl_app_counts.setStyleSheet("color: gray;")
run_layout.addWidget(self.lbl_app_counts)
tab_apps_layout.addWidget(run_group)
self.txt_app = QPlainTextEdit()
self.txt_app.setReadOnly(True)
tab_apps_layout.addWidget(self.txt_app, stretch=1)
tab_apps_layout.addStretch(1)
self.tabs.addTab(tab_apps, "Apps (runtime)")
tab_adv = QWidget()
tab_adv_layout = QVBoxLayout(tab_adv)
self.ed_vpn_subnets = QPlainTextEdit()
self.ed_vpn_subnets.setToolTip("""EN: Force VPN by source subnet. Useful for docker subnets when you want containers via VPN.
RU: Принудительно через VPN по source subnet. Полезно для docker-подсетей, если хочешь контейнеры через VPN.""")
self.ed_vpn_subnets.setPlaceholderText("Force VPN by source subnet, one per line (e.g. 172.18.0.0/16)")
self.ed_vpn_subnets.setFixedHeight(72)
self.ed_vpn_uids = QPlainTextEdit()
self.ed_vpn_uids.setToolTip("""EN: Force VPN by UID/uidrange (host OUTPUT only). Does not affect forwarded docker traffic.
RU: Принудительно через VPN по UID (только процессы хоста). На forwarded docker-трафик не влияет.""")
self.ed_vpn_uids.setPlaceholderText("Force VPN by UID/UID range, one per line (e.g. 1000 or 1000-1010)")
self.ed_vpn_uids.setFixedHeight(60)
self.ed_vpn_cgroups = QPlainTextEdit()
self.ed_vpn_cgroups.setToolTip("""EN: Force VPN by systemd cgroup. Backend resolves cgroup -> PIDs -> UID rules at apply time.
RU: Принудительно через VPN по cgroup (systemd). Backend резолвит cgroup -> PID -> UID при применении.""")
self.ed_vpn_cgroups.setPlaceholderText("Force VPN by cgroup path/name, one per line")
self.ed_vpn_cgroups.setFixedHeight(60)
self.ed_direct_subnets = QPlainTextEdit()
self.ed_direct_subnets.setToolTip("""EN: Force Direct by source subnet. Useful to keep docker subnets direct in full tunnel.
RU: Принудительно direct по source subnet. Полезно, чтобы docker-подсети были direct в full tunnel.""")
self.ed_direct_subnets.setPlaceholderText("Force Direct by source subnet, one per line")
self.ed_direct_subnets.setFixedHeight(72)
self.ed_direct_uids = QPlainTextEdit()
self.ed_direct_uids.setToolTip("""EN: Force Direct by UID/uidrange (host OUTPUT only).
RU: Принудительно direct по UID (только процессы хоста).""")
self.ed_direct_uids.setPlaceholderText("Force Direct by UID/UID range, one per line")
self.ed_direct_uids.setFixedHeight(60)
self.ed_direct_cgroups = QPlainTextEdit()
self.ed_direct_cgroups.setToolTip("""EN: Force Direct by systemd cgroup (resolved to UID rules at apply time).
RU: Принудительно direct по cgroup (резолвится в UID правила при применении).""")
self.ed_direct_cgroups.setPlaceholderText("Force Direct by cgroup path/name, one per line")
self.ed_direct_cgroups.setFixedHeight(60)
cols = QHBoxLayout()
vpn_group = QGroupBox("Force VPN")
vpn_layout = QVBoxLayout(vpn_group)
vpn_layout.addWidget(QLabel("Source subnets"))
vpn_layout.addWidget(self.ed_vpn_subnets)
vpn_layout.addWidget(QLabel("UIDs"))
vpn_layout.addWidget(self.ed_vpn_uids)
vpn_layout.addWidget(QLabel("Cgroups / services"))
vpn_layout.addWidget(self.ed_vpn_cgroups)
cols.addWidget(vpn_group, stretch=1)
direct_group = QGroupBox("Force Direct")
direct_layout = QVBoxLayout(direct_group)
direct_layout.addWidget(QLabel("Source subnets"))
direct_layout.addWidget(self.ed_direct_subnets)
direct_layout.addWidget(QLabel("UIDs"))
direct_layout.addWidget(self.ed_direct_uids)
direct_layout.addWidget(QLabel("Cgroups / services"))
direct_layout.addWidget(self.ed_direct_cgroups)
cols.addWidget(direct_group, stretch=1)
tab_adv_layout.addLayout(cols, stretch=1)
row_adv = QHBoxLayout()
self.btn_pick_detected = QPushButton("Add detected...")
self.btn_pick_detected.setToolTip("""EN: Opens a selector with detected subnets/services/UIDs. Only fills fields; nothing is applied automatically.
RU: Открывает список обнаруженных subnet/service/UID. Только заполняет поля; ничего не применяется автоматически.""")
self.btn_pick_detected.clicked.connect(self.on_pick_detected)
row_adv.addWidget(self.btn_pick_detected)
self.btn_apply_overrides = QPushButton("Apply overrides")
self.btn_apply_overrides.setToolTip("""EN: Applies policy rules and verifies health. On failure backend rolls back.
RU: Применяет policy-rules и проверяет health. При ошибке backend делает откат.""")
self.btn_apply_overrides.clicked.connect(self.on_apply_overrides)
row_adv.addWidget(self.btn_apply_overrides)
self.btn_reload_overrides = QPushButton("Reload overrides")
self.btn_reload_overrides.clicked.connect(self.refresh_state)
row_adv.addWidget(self.btn_reload_overrides)
row_adv.addStretch(1)
tab_adv_layout.addLayout(row_adv)
self.tabs.addTab(tab_adv, "Policy overrides (Advanced)")
# EN: Small status line for last action performed in this dialog.
# RU: Строка статуса последнего действия в этом окне.
self.lbl_action = QLabel("")
self.lbl_action.setWordWrap(True)
self.lbl_action.setStyleSheet("color: gray;")
root.addWidget(self.lbl_action)
row_bottom = QHBoxLayout()
row_bottom.addStretch(1)
btn_close = QPushButton("Close")
btn_close.clicked.connect(self.accept)
row_bottom.addWidget(btn_close)
root.addLayout(row_bottom)
QtCore.QTimer.singleShot(0, self.refresh_state)
QtCore.QTimer.singleShot(0, self.refresh_appmarks_counts)
def _is_operation_error(self, message: str) -> bool:
low = (message or "").strip().lower()
return ("rolled back" in low) or ("apply failed" in low) or ("verification failed" in low)
def _set_action_status(self, msg: str, ok: bool | None = None) -> None:
text = (msg or "").strip() or ""
self.lbl_action.setText(text)
if ok is True:
self.lbl_action.setStyleSheet("color: green;")
elif ok is False:
self.lbl_action.setStyleSheet("color: red;")
else:
self.lbl_action.setStyleSheet("color: gray;")
def _safe(self, fn, *, title: str = "Traffic mode error") -> None:
try:
fn()
except Exception as e:
msg = f"[ui-error] {title}: {e}"
self._set_action_status(msg, ok=False)
self._emit_log(msg)
QMessageBox.critical(self, title, str(e))
def _emit_log(self, msg: str) -> None:
text = (msg or "").strip()
if not text:
return
if self.log_cb:
self.log_cb(text)
else:
try:
self.ctrl.log_gui(text)
except Exception:
pass
def _preferred_iface_value(self) -> str:
raw = self.cmb_iface.currentText().strip()
if raw.lower() in ("", "auto", "-", "default"):
return ""
return raw
def _set_preferred_iface_options(self, ifaces: list[str], selected: str) -> None:
vals = ["auto"] + [x for x in ifaces if x]
sel = selected.strip() if selected else "auto"
if not sel:
sel = "auto"
if sel not in vals:
vals.append(sel)
self.cmb_iface.blockSignals(True)
self.cmb_iface.clear()
self.cmb_iface.addItems(vals)
idx = self.cmb_iface.findText(sel)
if idx < 0:
idx = self.cmb_iface.findText("auto")
if idx >= 0:
self.cmb_iface.setCurrentIndex(idx)
else:
self.cmb_iface.setEditText(sel)
self.cmb_iface.blockSignals(False)
def _lines_from_text(self, txt: str) -> list[str]:
out: list[str] = []
for raw in (txt or "").replace("\r", "\n").split("\n"):
line = raw.strip()
if line:
out.append(line)
return out
def _set_lines(self, widget: QPlainTextEdit, vals: list[str]) -> None:
widget.blockSignals(True)
widget.setPlainText("\n".join([x for x in vals if str(x).strip()]))
widget.blockSignals(False)
def _merge_lines(self, widget: QPlainTextEdit, vals: list[str]) -> int:
cur = self._lines_from_text(widget.toPlainText())
seen = {x.strip() for x in cur}
added = 0
for v in (vals or []):
vv = str(v).strip()
if not vv or vv in seen:
continue
cur.append(vv)
seen.add(vv)
added += 1
if added > 0:
self._set_lines(widget, cur)
return added
def _candidates_add(self, target: str, kind: str, values: list[str]) -> None:
tgt = (target or "").strip().lower()
k = (kind or "").strip().lower()
if tgt not in ("vpn", "direct"):
return
widget: QPlainTextEdit | None = None
if tgt == "vpn":
if k == "subnet":
widget = self.ed_vpn_subnets
elif k == "uid":
widget = self.ed_vpn_uids
elif k == "cgroup":
widget = self.ed_vpn_cgroups
else:
if k == "subnet":
widget = self.ed_direct_subnets
elif k == "uid":
widget = self.ed_direct_uids
elif k == "cgroup":
widget = self.ed_direct_cgroups
if widget is None:
return
added = self._merge_lines(widget, values or [])
if added > 0:
msg = f"Traffic candidates added: target={tgt} kind={k} added={added}"
self._set_action_status(msg, ok=True)
self._emit_log(msg)
else:
msg = f"Traffic candidates add: nothing new (target={tgt} kind={k})"
self._set_action_status(msg, ok=None)
self._emit_log(msg)
def on_pick_detected(self) -> None:
def work() -> None:
cands = self.ctrl.traffic_candidates()
existing = {
"vpn": {
"subnet": set(self._lines_from_text(self.ed_vpn_subnets.toPlainText())),
"uid": set(self._lines_from_text(self.ed_vpn_uids.toPlainText())),
"cgroup": set(self._lines_from_text(self.ed_vpn_cgroups.toPlainText())),
},
"direct": {
"subnet": set(self._lines_from_text(self.ed_direct_subnets.toPlainText())),
"uid": set(self._lines_from_text(self.ed_direct_uids.toPlainText())),
"cgroup": set(self._lines_from_text(self.ed_direct_cgroups.toPlainText())),
},
}
dlg = TrafficCandidatesDialog(
cands,
existing=existing,
add_cb=self._candidates_add,
parent=self,
)
dlg.exec()
self._safe(work, title="Traffic candidates error")
def _set_mode_state(
self,
desired_mode: str,
applied_mode: str,
preferred_iface: str,
auto_local_bypass: bool,
bypass_candidates: int,
overrides_applied: int,
cgroup_resolved_uids: int,
cgroup_warning: str,
healthy: bool,
probe_ok: bool,
probe_message: str,
active_iface: str,
iface_reason: str,
message: str,
) -> None:
desired = (desired_mode or "").strip().lower() or "selective"
applied = (applied_mode or "").strip().lower() or "direct"
if healthy:
color = "green"
health_txt = "OK"
else:
color = "red"
health_txt = "MISMATCH"
text = f"Traffic mode: {desired} (applied: {applied}) [{health_txt}]"
diag_parts = []
diag_parts.append(f"preferred={preferred_iface or 'auto'}")
diag_parts.append(
f"auto_local_bypass={'on' if auto_local_bypass else 'off'}"
)
if bypass_candidates > 0:
diag_parts.append(f"bypass_routes={bypass_candidates}")
diag_parts.append(f"overrides={overrides_applied}")
if cgroup_resolved_uids > 0:
diag_parts.append(f"cgroup_uids={cgroup_resolved_uids}")
if cgroup_warning:
diag_parts.append(f"cgroup_warning={cgroup_warning}")
if active_iface:
diag_parts.append(f"iface={active_iface}")
if iface_reason:
diag_parts.append(f"source={iface_reason}")
diag_parts.append(f"probe={'ok' if probe_ok else 'fail'}")
if probe_message:
diag_parts.append(probe_message)
if message:
diag_parts.append(message)
diag = " | ".join(diag_parts) if diag_parts else ""
self.lbl_state.setText(text)
self.lbl_state.setStyleSheet(f"color: {color};")
self.lbl_diag.setText(diag)
self.lbl_diag.setStyleSheet("color: gray;")
def refresh_state(self) -> None:
def work() -> None:
view = self.ctrl.traffic_mode_view()
mode = (view.desired_mode or "selective").strip().lower()
self.rad_selective.blockSignals(True)
self.rad_full.blockSignals(True)
self.rad_direct.blockSignals(True)
self.rad_selective.setChecked(mode == "selective")
self.rad_full.setChecked(mode == "full_tunnel")
self.rad_direct.setChecked(mode == "direct")
self.rad_selective.blockSignals(False)
self.rad_full.blockSignals(False)
self.rad_direct.blockSignals(False)
opts = self.ctrl.traffic_interfaces()
self._set_preferred_iface_options(opts, view.preferred_iface)
self.chk_auto_local.blockSignals(True)
self.chk_auto_local.setChecked(bool(view.auto_local_bypass))
self.chk_auto_local.blockSignals(False)
self._set_lines(self.ed_vpn_subnets, list(view.force_vpn_subnets or []))
self._set_lines(self.ed_vpn_uids, list(view.force_vpn_uids or []))
self._set_lines(self.ed_vpn_cgroups, list(view.force_vpn_cgroups or []))
self._set_lines(self.ed_direct_subnets, list(view.force_direct_subnets or []))
self._set_lines(self.ed_direct_uids, list(view.force_direct_uids or []))
self._set_lines(self.ed_direct_cgroups, list(view.force_direct_cgroups or []))
self._set_mode_state(
view.desired_mode,
view.applied_mode,
view.preferred_iface,
bool(view.auto_local_bypass),
int(view.bypass_candidates),
int(view.overrides_applied),
int(view.cgroup_resolved_uids),
view.cgroup_warning,
bool(view.healthy),
bool(view.probe_ok),
view.probe_message,
view.active_iface,
view.iface_reason,
view.message,
)
self._safe(work)
def on_mode_toggle(self, mode: str, checked: bool) -> None:
if not checked:
return
def work() -> None:
preferred = self._preferred_iface_value()
auto_local = self.chk_auto_local.isChecked()
view = self.ctrl.traffic_mode_set(mode, preferred, auto_local)
msg = (
f"Traffic mode set: desired={view.desired_mode}, "
f"applied={view.applied_mode}, iface={view.active_iface or '-'}, "
f"preferred={preferred or 'auto'}, probe_ok={view.probe_ok}, "
f"healthy={view.healthy}, auto_local_bypass={view.auto_local_bypass}, "
f"bypass_routes={view.bypass_candidates}, overrides={view.overrides_applied}, "
f"cgroup_uids={view.cgroup_resolved_uids}, message={view.message}"
)
self._emit_log(msg)
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
self._set_action_status(
f"Traffic mode set: desired={view.desired_mode} applied={view.applied_mode} message={view.message}",
ok=op_ok,
)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work)
def on_refresh_ifaces(self) -> None:
def work() -> None:
view = self.ctrl.traffic_mode_view()
opts = self.ctrl.traffic_interfaces()
self._set_preferred_iface_options(opts, view.preferred_iface)
self._emit_log(
"Traffic ifaces refreshed: "
f"preferred={view.preferred_iface or 'auto'} "
f"active={view.active_iface or '-'}"
)
self._set_action_status("Traffic ifaces refreshed", ok=True)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Traffic iface detect error")
def _selected_mode(self) -> str:
if self.rad_full.isChecked():
return "full_tunnel"
if self.rad_direct.isChecked():
return "direct"
return "selective"
def on_auto_local_toggle(self) -> None:
def work() -> None:
mode = self._selected_mode()
preferred = self._preferred_iface_value()
auto_local = self.chk_auto_local.isChecked()
view = self.ctrl.traffic_mode_set(mode, preferred, auto_local)
msg = (
f"Traffic auto-local set: mode={view.desired_mode}, "
f"auto_local_bypass={view.auto_local_bypass}, "
f"bypass_routes={view.bypass_candidates}, overrides={view.overrides_applied}, "
f"cgroup_uids={view.cgroup_resolved_uids}, message={view.message}"
)
self._emit_log(msg)
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
self._set_action_status(
f"Auto-local bypass set: {'on' if view.auto_local_bypass else 'off'} ({view.message})",
ok=op_ok,
)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Auto-local bypass error")
def on_apply_overrides(self) -> None:
def work() -> None:
mode = self._selected_mode()
preferred = self._preferred_iface_value()
auto_local = self.chk_auto_local.isChecked()
vpn_subnets = self._lines_from_text(self.ed_vpn_subnets.toPlainText())
vpn_uids = self._lines_from_text(self.ed_vpn_uids.toPlainText())
vpn_cgroups = self._lines_from_text(self.ed_vpn_cgroups.toPlainText())
direct_subnets = self._lines_from_text(self.ed_direct_subnets.toPlainText())
direct_uids = self._lines_from_text(self.ed_direct_uids.toPlainText())
direct_cgroups = self._lines_from_text(self.ed_direct_cgroups.toPlainText())
view = self.ctrl.traffic_mode_set(
mode,
preferred,
auto_local,
vpn_subnets,
vpn_uids,
vpn_cgroups,
direct_subnets,
direct_uids,
direct_cgroups,
)
msg = (
f"Traffic overrides applied: mode={view.desired_mode}, "
f"vpn_subnets={len(view.force_vpn_subnets)}, vpn_uids={len(view.force_vpn_uids)}, vpn_cgroups={len(view.force_vpn_cgroups)}, "
f"direct_subnets={len(view.force_direct_subnets)}, direct_uids={len(view.force_direct_uids)}, direct_cgroups={len(view.force_direct_cgroups)}, "
f"overrides={view.overrides_applied}, cgroup_uids={view.cgroup_resolved_uids}, "
f"healthy={view.healthy}, message={view.message}"
)
self._emit_log(msg)
op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
self._set_action_status(
f"Overrides applied: overrides={view.overrides_applied} message={view.message}",
ok=op_ok,
)
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Apply overrides error")
# -----------------------------------------------------------------
# Apps (runtime) tab
# -----------------------------------------------------------------
def _append_app_log(self, msg: str) -> None:
line = (msg or "").rstrip()
if not line:
return
try:
self.txt_app.appendPlainText(line)
except Exception:
pass
self._emit_log(line)
def refresh_appmarks_counts(self) -> None:
try:
st = self.ctrl.traffic_appmarks_status()
self.lbl_app_counts.setText(
f"Marks: VPN={st.vpn_count}, Direct={st.direct_count}"
)
except Exception as e:
self.lbl_app_counts.setText(f"Marks: error: {e}")
def _run_systemd_scope(self, cmdline: str, *, unit: str) -> tuple[str, str]:
args = shlex.split(cmdline or "")
if not args:
raise ValueError("empty command")
run_cmd = [
"systemd-run",
"--user",
"--scope",
"--unit",
unit,
"--collect",
"--same-dir",
] + args
p = subprocess.run(run_cmd, capture_output=True, text=True, check=False)
out = ((p.stdout or "") + (p.stderr or "")).strip()
if p.returncode != 0:
raise RuntimeError(f"systemd-run failed: {p.returncode}\n{out}".strip())
p2 = subprocess.run(
["systemctl", "--user", "show", "-p", "ControlGroup", "--value", unit],
capture_output=True,
text=True,
check=False,
)
out2 = ((p2.stdout or "") + (p2.stderr or "")).strip()
cg = (p2.stdout or "").strip()
if p2.returncode != 0 or not cg:
raise RuntimeError(f"failed to query ControlGroup\n{out2}".strip())
return cg, out
def on_app_run(self) -> None:
def work() -> None:
cmdline = (self.ed_app_cmd.text() or "").strip()
if not cmdline:
QMessageBox.warning(self, "Missing command", "Please enter a command to run.")
return
target = "vpn" if self.rad_app_vpn.isChecked() else "direct"
ttl_sec = int(self.spn_app_ttl.value()) * 3600
unit = f"svpn-{target}-{int(time.time())}.scope"
self._append_app_log(
f"[app] launching: target={target} ttl={ttl_sec}s unit={unit}"
)
cg, out = self._run_systemd_scope(cmdline, unit=unit)
if out:
self._append_app_log(f"[app] systemd-run:\n{out}")
self._append_app_log(f"[app] ControlGroup: {cg}")
res = self.ctrl.traffic_appmarks_apply(
op="add",
target=target,
cgroup=cg,
timeout_sec=ttl_sec,
)
if res.ok:
self._append_app_log(
f"[appmarks] OK: {res.message} cgroup_id={res.cgroup_id} timeout={res.timeout_sec}s"
)
self._set_action_status(
f"App mark added: target={target} cgroup_id={res.cgroup_id}",
ok=True,
)
else:
self._append_app_log(f"[appmarks] ERROR: {res.message}")
self._set_action_status(
f"App mark failed: target={target} ({res.message})",
ok=False,
)
QMessageBox.critical(
self,
"App mark error",
res.message or "unknown error",
)
self.refresh_appmarks_counts()
self._safe(work, title="Run app error")
def on_appmarks_clear(self, target: str) -> None:
tgt = (target or "").strip().lower()
if tgt not in ("vpn", "direct"):
return
def work() -> None:
if QMessageBox.question(
self,
"Clear marks",
f"Clear ALL runtime app marks for target={tgt}?",
) != QMessageBox.StandardButton.Yes:
return
res = self.ctrl.traffic_appmarks_apply(op="clear", target=tgt)
if res.ok:
self._append_app_log(f"[appmarks] cleared: target={tgt}")
self._set_action_status(f"App marks cleared: target={tgt}", ok=True)
else:
self._append_app_log(f"[appmarks] clear error: {res.message}")
self._set_action_status(
f"App marks clear failed: target={tgt} ({res.message})",
ok=False,
)
QMessageBox.critical(
self, "Clear marks error", res.message or "unknown error"
)
self.refresh_appmarks_counts()
self._safe(work, title="Clear app marks error")
def on_rollback(self) -> None:
def work() -> None:
res = self.ctrl.routes_clear()
self._emit_log(res.pretty_text or "rollback done")
self._set_action_status(res.pretty_text or "routes cleared (cache saved)", ok=bool(res.ok))
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Rollback error")
def on_restore_cache(self) -> None:
def work() -> None:
res = self.ctrl.routes_cache_restore()
self._emit_log(res.pretty_text or "cache restore done")
self._set_action_status(res.pretty_text or "routes restored from cache", ok=bool(res.ok))
self.refresh_state()
if self.refresh_cb:
self.refresh_cb()
self._safe(work, title="Restore cache error")
class TrafficCandidatesDialog(QDialog):
def __init__(
self,
candidates,
*,
existing: dict[str, dict[str, set[str]]] | None = None,
add_cb: Callable[[str, str, list[str]], None],
parent=None,
) -> None:
super().__init__(parent)
self.cands = candidates
self.add_cb = add_cb
self.existing = existing or {"vpn": {}, "direct": {}}
self.setWindowTitle("Add detected overrides")
self.resize(820, 680)
root = QVBoxLayout(self)
note = QLabel(
"Tip: hover list items for details. Подсказка: наведи на элементы списка.\n"
"Detect results from backend. Nothing is applied until you click Apply overrides."
)
note.setWordWrap(True)
note.setStyleSheet("color: gray;")
root.addWidget(note)
self.chk_hide_existing = QCheckBox("Hide already added")
self.chk_hide_existing.setToolTip(
"""EN: Hides items that are already present in Force VPN/Force Direct fields.
RU: Скрывает элементы, которые уже есть в Force VPN/Force Direct."""
)
self.chk_hide_existing.stateChanged.connect(lambda _s: self._refilter_current())
root.addWidget(self.chk_hide_existing)
self.tabs = QTabWidget()
root.addWidget(self.tabs, stretch=1)
self._tab_kind: dict[QWidget, str] = {}
self._tab_list: dict[QWidget, QListWidget] = {}
self._tab_filter: dict[QWidget, QLineEdit] = {}
self._list_kind: dict[QListWidget, str] = {}
self._list_title: dict[QListWidget, str] = {}
self.tabs.currentChanged.connect(lambda _idx: self._refilter_current())
self._build_subnets_tab()
self._build_services_tab()
self._build_uids_tab()
row = QHBoxLayout()
btn_vpn = QPushButton("Add to Force VPN")
btn_vpn.clicked.connect(lambda: self._add_selected("vpn"))
row.addWidget(btn_vpn)
btn_direct = QPushButton("Add to Force Direct")
btn_direct.clicked.connect(lambda: self._add_selected("direct"))
row.addWidget(btn_direct)
row.addStretch(1)
btn_close = QPushButton("Close")
btn_close.clicked.connect(self.accept)
row.addWidget(btn_close)
root.addLayout(row)
self.lbl_status = QLabel("")
self.lbl_status.setWordWrap(True)
self.lbl_status.setStyleSheet("color: gray;")
root.addWidget(self.lbl_status)
def _mark_state(self, kind: str, value: str) -> tuple[bool, bool]:
k = (kind or "").strip().lower()
v = (value or "").strip()
if not k or not v:
return False, False
in_vpn = v in (self.existing.get("vpn", {}).get(k, set()) or set())
in_direct = v in (self.existing.get("direct", {}).get(k, set()) or set())
return bool(in_vpn), bool(in_direct)
def _set_status(self, msg: str, ok: bool | None = None) -> None:
text = (msg or "").strip() or ""
self.lbl_status.setText(text)
if ok is True:
self.lbl_status.setStyleSheet("color: green;")
elif ok is False:
self.lbl_status.setStyleSheet("color: red;")
else:
self.lbl_status.setStyleSheet("color: gray;")
def _apply_filter(self, lst: QListWidget, query: str) -> None:
q = (query or "").strip().lower()
hide_existing = bool(self.chk_hide_existing.isChecked())
kind = (self._list_kind.get(lst) or "").strip().lower()
# Subnets-only quick filters.
allow_lan = True
allow_docker = True
allow_link = True
allow_linkdown = True
if kind == "subnet":
allow_lan = bool(getattr(self, "chk_sub_show_lan", None) and self.chk_sub_show_lan.isChecked())
allow_docker = bool(getattr(self, "chk_sub_show_docker", None) and self.chk_sub_show_docker.isChecked())
allow_link = bool(getattr(self, "chk_sub_show_link", None) and self.chk_sub_show_link.isChecked())
allow_linkdown = not bool(getattr(self, "chk_sub_hide_linkdown", None) and self.chk_sub_hide_linkdown.isChecked())
for i in range(lst.count()):
it = lst.item(i)
if not it:
continue
if hide_existing and bool(it.data(QtCore.Qt.UserRole + 1) or False):
it.setHidden(True)
continue
if kind == "subnet":
it_kind = str(it.data(QtCore.Qt.UserRole + 4) or "").strip().lower()
it_linkdown = bool(it.data(QtCore.Qt.UserRole + 5) or False)
if it_linkdown and not allow_linkdown:
it.setHidden(True)
continue
if it_kind == "docker" and not allow_docker:
it.setHidden(True)
continue
if it_kind == "lan" and not allow_lan:
it.setHidden(True)
continue
if it_kind == "link" and not allow_link:
it.setHidden(True)
continue
if not q:
it.setHidden(False)
continue
it.setHidden(q not in it.text().lower())
def _refilter_current(self) -> None:
tab = self.tabs.currentWidget()
if tab is None:
return
lst = self._tab_list.get(tab)
filt = self._tab_filter.get(tab)
if lst is None or filt is None:
return
self._apply_filter(lst, filt.text())
def _filter_for_title(self, title: str) -> QLineEdit | None:
for i in range(self.tabs.count()):
if self.tabs.tabText(i) == title:
tab = self.tabs.widget(i)
return self._tab_filter.get(tab)
return None
def _preset_set_filter(self, title: str, text: str) -> None:
filt = self._filter_for_title(title)
if filt is not None:
filt.setText(text)
def _preset_filter_subnets(self, *, lan: bool, docker: bool, link: bool) -> None:
# EN: "Filter LAN/Docker" buttons should not rely on text search; they should
# EN: toggle the kind checkboxes and clear the text filter.
# RU: Кнопки "Filter LAN/Docker" не должны полагаться на текстовый поиск;
# RU: они должны переключать чекбоксы видов и очищать текстовый фильтр.
chk_lan = getattr(self, "chk_sub_show_lan", None)
chk_docker = getattr(self, "chk_sub_show_docker", None)
chk_link = getattr(self, "chk_sub_show_link", None)
if chk_lan is None or chk_docker is None or chk_link is None:
self._preset_set_filter("Subnets", "")
self._refilter_current()
return
for chk, val in (
(chk_lan, lan),
(chk_docker, docker),
(chk_link, link),
):
try:
chk.blockSignals(True)
chk.setChecked(bool(val))
finally:
chk.blockSignals(False)
self._preset_set_filter("Subnets", "")
self._refilter_current()
def _update_item_render(self, it: QListWidgetItem, kind: str) -> None:
value = str(it.data(QtCore.Qt.UserRole) or "").strip()
base_label = str(it.data(QtCore.Qt.UserRole + 2) or it.text() or "")
base_tip = str(it.data(QtCore.Qt.UserRole + 3) or it.toolTip() or "")
in_vpn, in_direct = self._mark_state(kind, value)
flags = []
if in_vpn:
flags.append("VPN")
if in_direct:
flags.append("DIRECT")
label = base_label
if flags:
label = f"{base_label} [{' + '.join(flags)}]"
it.setText(label)
it.setData(QtCore.Qt.UserRole + 1, bool(in_vpn or in_direct))
if base_tip.strip():
extra_tip = (
f"\n\nAlready in Force VPN: {'yes' if in_vpn else 'no'}\n"
f"Already in Force Direct: {'yes' if in_direct else 'no'}"
)
it.setToolTip(base_tip + extra_tip)
if in_vpn or in_direct:
it.setForeground(QtGui.QBrush(QtGui.QColor("gray")))
def _add_tab(self, title: str, kind: str, items: list[tuple[str, str, str]], *, extra=None) -> None:
tab = QWidget()
layout = QVBoxLayout(tab)
if extra is not None:
extra(layout)
filt = QLineEdit()
filt.setPlaceholderText("Filter...")
layout.addWidget(filt)
lst = QListWidget()
lst.setSelectionMode(QAbstractItemView.ExtendedSelection)
for entry in items:
label = str(entry[0]) if len(entry) > 0 else ""
value = str(entry[1]) if len(entry) > 1 else ""
tip = str(entry[2]) if len(entry) > 2 else ""
meta_kind = str(entry[3]) if len(entry) > 3 else ""
meta_linkdown = bool(entry[4]) if len(entry) > 4 else False
it = QListWidgetItem(label)
it.setData(QtCore.Qt.UserRole, value)
it.setData(QtCore.Qt.UserRole + 2, label) # base label (without [VPN]/[DIRECT])
it.setData(QtCore.Qt.UserRole + 3, tip) # base tooltip (without existing-state)
it.setData(QtCore.Qt.UserRole + 4, meta_kind)
it.setData(QtCore.Qt.UserRole + 5, meta_linkdown)
lst.addItem(it)
self._update_item_render(it, kind)
layout.addWidget(lst, stretch=1)
filt.textChanged.connect(lambda txt, l=lst: self._apply_filter(l, txt))
self.tabs.addTab(tab, title)
self._tab_kind[tab] = kind
self._tab_list[tab] = lst
self._tab_filter[tab] = filt
self._list_kind[lst] = kind
self._list_title[lst] = title
def _current_kind_and_list(self) -> tuple[str, QListWidget | None]:
tab = self.tabs.currentWidget()
if tab is None:
return "", None
return self._tab_kind.get(tab, ""), self._tab_list.get(tab)
def _add_selected(self, target: str) -> None:
kind, lst = self._current_kind_and_list()
if not kind or lst is None:
return
vals: list[str] = []
for it in lst.selectedItems():
v = it.data(QtCore.Qt.UserRole)
vv = str(v or "").strip()
if vv:
vals.append(vv)
# stable de-dupe
out: list[str] = []
seen: set[str] = set()
for v in vals:
if v in seen:
continue
seen.add(v)
out.append(v)
if not out:
self._set_status("Nothing selected", ok=None)
return
tgt = (target or "").strip().lower()
k = (kind or "").strip().lower()
other = "direct" if tgt == "vpn" else "vpn"
have_tgt = self.existing.get(tgt, {}).get(k, set()) or set()
have_other = self.existing.get(other, {}).get(k, set()) or set()
to_add: list[str] = []
skipped = 0
conflicts = 0
for v in out:
if v in have_tgt:
skipped += 1
continue
if v in have_other:
conflicts += 1
to_add.append(v)
if not to_add:
self._set_status(f"Nothing new to add (skipped={skipped}, conflicts={conflicts})", ok=None)
return
self.add_cb(target, kind, to_add)
# Update local state so UI marks newly added items immediately.
if tgt not in self.existing:
self.existing[tgt] = {}
if k not in self.existing[tgt]:
self.existing[tgt][k] = set()
for v in to_add:
self.existing[tgt][k].add(v)
for i in range(lst.count()):
it = lst.item(i)
if it is None:
continue
self._update_item_render(it, kind)
self._refilter_current()
msg = f"Added {len(to_add)} item(s) to Force {tgt.upper()} ({k})."
if skipped or conflicts:
msg += f" skipped={skipped} conflicts={conflicts}"
self._set_status(msg, ok=True)
def _list_for_title(self, title: str) -> QListWidget | None:
for i in range(self.tabs.count()):
if self.tabs.tabText(i) == title:
tab = self.tabs.widget(i)
return self._tab_list.get(tab)
return None
def _preset_clear_selection(self, title: str) -> None:
lst = self._list_for_title(title)
if lst is not None:
lst.clearSelection()
def _preset_select_services(self, keywords: list[str]) -> None:
lst = self._list_for_title("Services")
if lst is None:
return
keys = [str(k).strip().lower() for k in (keywords or []) if str(k).strip()]
if not keys:
return
lst.clearSelection()
for i in range(lst.count()):
it = lst.item(i)
if it is None:
continue
txt = (it.text() or "").lower()
val = str(it.data(QtCore.Qt.UserRole) or "").lower()
if any(k in txt or k in val for k in keys):
it.setSelected(True)
def _preset_select_uids(self, uids: list[int]) -> None:
lst = self._list_for_title("UIDs")
if lst is None:
return
want = {f"{int(u)}-{int(u)}" for u in (uids or [])}
if not want:
return
lst.clearSelection()
for i in range(lst.count()):
it = lst.item(i)
if it is None:
continue
token = str(it.data(QtCore.Qt.UserRole) or "").strip()
if token in want:
it.setSelected(True)
def _build_subnets_tab(self) -> None:
subs = list(getattr(self.cands, "subnets", []) or [])
items: list[tuple[str, str, str, str, bool]] = []
for s in subs:
cidr = str(getattr(s, "cidr", "") or "").strip()
if not cidr:
continue
dev = str(getattr(s, "dev", "") or "").strip()
kind = str(getattr(s, "kind", "") or "").strip()
linkdown = bool(getattr(s, "linkdown", False))
tags = []
if kind:
tags.append(kind)
if dev:
tags.append(dev)
if linkdown:
tags.append("linkdown")
tag_txt = " " + "[" + ", ".join(tags) + "]" if tags else ""
tip = (
f"CIDR: {cidr}\n"
f"kind={kind or '-'} dev={dev or '-'} linkdown={linkdown}\n\n"
"EN: Source subnet overrides affect forwarded traffic (Docker).\n"
"RU: Source subnet влияет на forwarded трафик (Docker)."
)
items.append((f"{cidr}{tag_txt}", cidr, tip, kind, linkdown))
def extra(layout: QVBoxLayout) -> None:
row = QHBoxLayout()
btn_lan = QPushButton("Keep LAN direct")
btn_lan.clicked.connect(lambda: self._preset_add_lan_direct())
row.addWidget(btn_lan)
btn_docker = QPushButton("Keep Docker direct")
btn_docker.clicked.connect(lambda: self._preset_add_docker_direct())
row.addWidget(btn_docker)
row.addStretch(1)
layout.addLayout(row)
row2 = QHBoxLayout()
btn_f_lan = QPushButton("Filter LAN")
btn_f_lan.clicked.connect(lambda: self._preset_filter_subnets(lan=True, docker=False, link=True))
row2.addWidget(btn_f_lan)
btn_f_docker = QPushButton("Filter Docker")
btn_f_docker.clicked.connect(lambda: self._preset_filter_subnets(lan=False, docker=True, link=False))
row2.addWidget(btn_f_docker)
btn_f_clear = QPushButton("Clear filter")
btn_f_clear.clicked.connect(lambda: self._preset_filter_subnets(lan=True, docker=True, link=True))
row2.addWidget(btn_f_clear)
row2.addStretch(1)
layout.addLayout(row2)
row3 = QHBoxLayout()
self.chk_sub_show_lan = QCheckBox("LAN")
self.chk_sub_show_lan.setChecked(True)
self.chk_sub_show_lan.setToolTip("EN: Show LAN subnets.\nRU: Показать LAN подсети.")
self.chk_sub_show_lan.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_show_lan)
self.chk_sub_show_docker = QCheckBox("Docker")
self.chk_sub_show_docker.setChecked(True)
self.chk_sub_show_docker.setToolTip("EN: Show Docker/container subnets.\nRU: Показать Docker/контейнерные подсети.")
self.chk_sub_show_docker.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_show_docker)
self.chk_sub_show_link = QCheckBox("Link (scope link)")
self.chk_sub_show_link.setChecked(True)
self.chk_sub_show_link.setToolTip("EN: Show link-scope routes.\nRU: Показать маршруты scope link.")
self.chk_sub_show_link.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_show_link)
self.chk_sub_hide_linkdown = QCheckBox("Hide linkdown")
self.chk_sub_hide_linkdown.setChecked(True)
self.chk_sub_hide_linkdown.setToolTip("EN: Hide routes marked as linkdown.\nRU: Скрыть маршруты с меткой linkdown.")
self.chk_sub_hide_linkdown.stateChanged.connect(lambda _s: self._refilter_current())
row3.addWidget(self.chk_sub_hide_linkdown)
row3.addStretch(1)
layout.addLayout(row3)
self._add_tab("Subnets", "subnet", items, extra=extra)
def _preset_add_lan_direct(self) -> None:
subs = list(getattr(self.cands, "subnets", []) or [])
vals: list[str] = []
for s in subs:
kind = str(getattr(s, "kind", "") or "").strip()
cidr = str(getattr(s, "cidr", "") or "").strip()
if not cidr:
continue
if kind in ("lan", "link"):
vals.append(cidr)
if vals:
self.add_cb("direct", "subnet", vals)
def _preset_add_docker_direct(self) -> None:
subs = list(getattr(self.cands, "subnets", []) or [])
vals: list[str] = []
for s in subs:
kind = str(getattr(s, "kind", "") or "").strip()
cidr = str(getattr(s, "cidr", "") or "").strip()
if not cidr:
continue
if kind == "docker":
vals.append(cidr)
if vals:
self.add_cb("direct", "subnet", vals)
def _build_services_tab(self) -> None:
units = list(getattr(self.cands, "units", []) or [])
items: list[tuple[str, str, str]] = []
for u in units:
unit = str(getattr(u, "unit", "") or "").strip()
if not unit:
continue
desc = str(getattr(u, "description", "") or "").strip()
cgroup = str(getattr(u, "cgroup", "") or "").strip() or unit
label = unit
if desc:
label += " - " + desc
tip = (
f"Unit: {unit}\n"
f"Cgroup token: {cgroup}\n\n"
"EN: Adds a cgroup override; backend resolves it to UID rules at apply time.\n"
"RU: Добавляет cgroup override; backend резолвит его в UID правила при применении."
)
items.append((label, cgroup, tip))
def extra(layout: QVBoxLayout) -> None:
row = QHBoxLayout()
btn_docker = QPushButton("Select docker/container")
btn_docker.clicked.connect(lambda: self._preset_select_services(["docker", "containerd", "podman"]))
row.addWidget(btn_docker)
btn_media = QPushButton("Select media (jellyfin/plex)")
btn_media.clicked.connect(lambda: self._preset_select_services(["jellyfin", "plex", "emby"]))
row.addWidget(btn_media)
btn_clear = QPushButton("Clear selection")
btn_clear.clicked.connect(lambda: self._preset_clear_selection("Services"))
row.addWidget(btn_clear)
row.addStretch(1)
layout.addLayout(row)
row2 = QHBoxLayout()
btn_f_docker = QPushButton("Filter docker")
btn_f_docker.clicked.connect(lambda: self._preset_set_filter("Services", "docker"))
row2.addWidget(btn_f_docker)
btn_f_media = QPushButton("Filter media")
btn_f_media.clicked.connect(lambda: self._preset_set_filter("Services", "jellyfin"))
row2.addWidget(btn_f_media)
btn_f_clear = QPushButton("Clear filter")
btn_f_clear.clicked.connect(lambda: self._preset_set_filter("Services", ""))
row2.addWidget(btn_f_clear)
row2.addStretch(1)
layout.addLayout(row2)
self._add_tab("Services", "cgroup", items, extra=extra)
def _build_uids_tab(self) -> None:
uids = list(getattr(self.cands, "uids", []) or [])
items: list[tuple[str, str, str]] = []
for u in uids:
try:
uid = int(getattr(u, "uid", 0) or 0)
except Exception:
continue
user = str(getattr(u, "user", "") or "").strip()
examples = list(getattr(u, "examples", []) or [])
ex_txt = ", ".join([str(x) for x in examples if str(x).strip()])
label = str(uid)
if user:
label += f" ({user})"
if ex_txt:
label += " - " + ex_txt
token = f"{uid}-{uid}"
tip = (
f"UID: {uid}\n"
f"User: {user or '-'}\n"
f"Examples: {ex_txt or '-'}\n\n"
"EN: UID rules affect host-local processes (OUTPUT).\n"
"RU: UID правила влияют на процессы хоста (OUTPUT)."
)
items.append((label, token, tip))
def extra(layout: QVBoxLayout) -> None:
row = QHBoxLayout()
btn_me = QPushButton("Select my UID")
btn_me.clicked.connect(lambda: self._preset_select_uids([os.getuid()]))
row.addWidget(btn_me)
btn_root = QPushButton("Select root UID")
btn_root.clicked.connect(lambda: self._preset_select_uids([0]))
row.addWidget(btn_root)
btn_clear = QPushButton("Clear selection")
btn_clear.clicked.connect(lambda: self._preset_clear_selection("UIDs"))
row.addWidget(btn_clear)
row.addStretch(1)
layout.addLayout(row)
self._add_tab("UIDs", "uid", items, extra=extra)