1013 lines
41 KiB
Python
1013 lines
41 KiB
Python
#!/usr/bin/env python3
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
from typing import Callable
|
||
|
||
from PySide6 import QtCore, QtGui
|
||
from PySide6.QtWidgets import (
|
||
QCheckBox,
|
||
QComboBox,
|
||
QDialog,
|
||
QGroupBox,
|
||
QHBoxLayout,
|
||
QLabel,
|
||
QLineEdit,
|
||
QListWidget,
|
||
QListWidgetItem,
|
||
QAbstractItemView,
|
||
QMessageBox,
|
||
QPlainTextEdit,
|
||
QPushButton,
|
||
QRadioButton,
|
||
QTabWidget,
|
||
QVBoxLayout,
|
||
QWidget,
|
||
)
|
||
|
||
from dashboard_controller import DashboardController
|
||
|
||
|
||
class TrafficModeDialog(QDialog):
|
||
def __init__(
    self,
    controller: DashboardController,
    *,
    log_cb: Callable[[str], None] | None = None,
    refresh_cb: Callable[[], None] | None = None,
    parent=None,
) -> None:
    """Build the "Traffic mode settings" dialog and schedule its first refresh.

    The dialog has two tabs: "Traffic basics" (mode radio buttons, preferred
    interface, auto-local bypass, rollback/restore buttons) and
    "Policy overrides (Advanced)" (Force VPN / Force Direct editors).

    Args:
        controller: Backend facade, stored as ``self.ctrl``.
        log_cb: Optional log sink, stored as ``self.log_cb``.
        refresh_cb: Optional callback, stored as ``self.refresh_cb``; the
            action handlers invoke it after they change state.
        parent: Optional parent widget forwarded to ``QDialog``.
    """
    super().__init__(parent)
    self.ctrl = controller
    self.log_cb = log_cb
    self.refresh_cb = refresh_cb

    self.setWindowTitle("Traffic mode settings")
    self.resize(780, 760)

    root = QVBoxLayout(self)

    # --- Static explanation of the three modes, shown above the tabs. ---
    hint_group = QGroupBox("Mode behavior")
    hint_layout = QVBoxLayout(hint_group)
    hint_layout.addWidget(QLabel("Selective: only marked traffic goes via VPN."))
    hint_layout.addWidget(QLabel("Full tunnel: all traffic goes via VPN."))
    hint_layout.addWidget(QLabel("Direct: VPN routing rules are disabled."))
    warn = QLabel(
        "Warning: Full tunnel can break local/LAN access depending on your host routes."
    )
    warn.setStyleSheet("color: red;")
    hint_layout.addWidget(warn)
    root.addWidget(hint_group)

    tip = QLabel("Tip: hover any control for help. Подсказка: наведи на элемент для описания.")
    tip.setWordWrap(True)
    tip.setStyleSheet("color: gray;")
    root.addWidget(tip)

    self.tabs = QTabWidget()
    root.addWidget(self.tabs, stretch=1)

    # --- Tab 1: traffic basics. ---
    tab_basic = QWidget()
    tab_basic_layout = QVBoxLayout(tab_basic)

    mode_group = QGroupBox("Traffic mode relay")
    mode_layout = QVBoxLayout(mode_group)

    # Mode radio buttons. Each one forwards its "toggled" signal to
    # on_mode_toggle together with the mode token it represents.
    # NOTE: continuation lines of the bilingual tooltips start at column 0
    # so that no source indentation ends up inside the tooltip text.
    row_mode = QHBoxLayout()
    self.rad_selective = QRadioButton("Selective")
    self.rad_selective.setToolTip("""EN: Only marked traffic (fwmark 0x66) uses VPN policy table (agvpn).
RU: Только помеченный трафик (fwmark 0x66) идет через policy-table (agvpn).""")
    self.rad_selective.toggled.connect(
        lambda checked: self.on_mode_toggle("selective", checked)
    )
    row_mode.addWidget(self.rad_selective)

    self.rad_full = QRadioButton("Full tunnel")
    self.rad_full.setToolTip("""EN: All traffic uses VPN policy table (agvpn). Use with auto-local bypass for LAN/docker.
RU: Весь трафик идет через policy-table (agvpn). Для LAN/docker включай auto-local bypass.""")
    self.rad_full.toggled.connect(
        lambda checked: self.on_mode_toggle("full_tunnel", checked)
    )
    row_mode.addWidget(self.rad_full)

    self.rad_direct = QRadioButton("Direct")
    self.rad_direct.setToolTip("""EN: Disables base VPN routing rules (no full/selective rule).
RU: Отключает базовые VPN policy-rules (нет full/selective правила).""")
    self.rad_direct.toggled.connect(
        lambda checked: self.on_mode_toggle("direct", checked)
    )
    row_mode.addWidget(self.rad_direct)
    row_mode.addStretch(1)
    mode_layout.addLayout(row_mode)

    # Preferred interface: editable combo box (free text allowed, but typed
    # text is never inserted as a new item) plus a "Detect ifaces" button.
    row_iface = QHBoxLayout()
    row_iface.addWidget(QLabel("Preferred iface"))
    self.cmb_iface = QComboBox()
    self.cmb_iface.setToolTip("""EN: VPN interface for policy routing. Use auto unless you know the exact iface.
RU: Интерфейс VPN для policy routing. Оставь auto, если не уверен.""")
    self.cmb_iface.setEditable(True)
    self.cmb_iface.setInsertPolicy(QComboBox.NoInsert)
    self.cmb_iface.setMinimumWidth(180)
    row_iface.addWidget(self.cmb_iface)

    self.btn_refresh_ifaces = QPushButton("Detect ifaces")
    self.btn_refresh_ifaces.setToolTip("""EN: Refresh list of available interfaces (UP).
RU: Обновить список доступных интерфейсов (UP).""")
    self.btn_refresh_ifaces.clicked.connect(self.on_refresh_ifaces)
    row_iface.addWidget(self.btn_refresh_ifaces)
    row_iface.addStretch(1)
    mode_layout.addLayout(row_iface)

    # Auto-local bypass checkbox; the new state value is ignored because the
    # handler reads the checkbox itself.
    self.chk_auto_local = QCheckBox("Auto-local bypass (LAN/container subnets)")
    self.chk_auto_local.setToolTip("""EN: Mirrors local/LAN/docker routes from main into agvpn table to prevent breakage in full tunnel.
EN: This does NOT force containers to use direct internet; use Force Direct subnets for that.
RU: Копирует локальные/LAN/docker маршруты из main в agvpn, чтобы не ломалась локалка в full tunnel.
RU: Это НЕ делает контейнеры direct в интернет; для этого используй Force Direct subnets.""")
    self.chk_auto_local.stateChanged.connect(lambda _state: self.on_auto_local_toggle())
    mode_layout.addWidget(self.chk_auto_local)

    # Status labels; both start with placeholder text in gray.
    self.lbl_state = QLabel("Traffic mode: —")
    self.lbl_state.setStyleSheet("color: gray;")
    mode_layout.addWidget(self.lbl_state)

    self.lbl_diag = QLabel("—")
    self.lbl_diag.setStyleSheet("color: gray;")
    mode_layout.addWidget(self.lbl_diag)

    tab_basic_layout.addWidget(mode_group)

    # Maintenance buttons: clear routes (keeping a snapshot) and restore.
    maint_group = QGroupBox("Rollback / cache")
    maint_layout = QHBoxLayout(maint_group)
    self.btn_rollback = QPushButton("Clear routes (save cache)")
    self.btn_rollback.setToolTip("""EN: Clears VPN routes and nft sets, but saves a cache snapshot for restore.
RU: Очищает VPN маршруты и nft-сеты, но сохраняет снапшот для восстановления.""")
    self.btn_rollback.clicked.connect(self.on_rollback)
    maint_layout.addWidget(self.btn_rollback)
    self.btn_restore_cache = QPushButton("Restore cached routes")
    self.btn_restore_cache.setToolTip("""EN: Restores routes/nft from the last clear snapshot. Skips non-critical route restore errors.
RU: Восстанавливает маршруты/nft из последнего снапшота clear. Некритичные ошибки восстановления пропускаются.""")
    self.btn_restore_cache.clicked.connect(self.on_restore_cache)
    maint_layout.addWidget(self.btn_restore_cache)
    maint_layout.addStretch(1)
    tab_basic_layout.addWidget(maint_group)

    tab_basic_layout.addStretch(1)
    self.tabs.addTab(tab_basic, "Traffic basics")

    # --- Tab 2: policy overrides (six one-entry-per-line editors). ---
    tab_adv = QWidget()
    tab_adv_layout = QVBoxLayout(tab_adv)

    self.ed_vpn_subnets = QPlainTextEdit()
    self.ed_vpn_subnets.setToolTip("""EN: Force VPN by source subnet. Useful for docker subnets when you want containers via VPN.
RU: Принудительно через VPN по source subnet. Полезно для docker-подсетей, если хочешь контейнеры через VPN.""")
    self.ed_vpn_subnets.setPlaceholderText("Force VPN by source subnet, one per line (e.g. 172.18.0.0/16)")
    self.ed_vpn_subnets.setFixedHeight(72)

    self.ed_vpn_uids = QPlainTextEdit()
    self.ed_vpn_uids.setToolTip("""EN: Force VPN by UID/uidrange (host OUTPUT only). Does not affect forwarded docker traffic.
RU: Принудительно через VPN по UID (только процессы хоста). На forwarded docker-трафик не влияет.""")
    self.ed_vpn_uids.setPlaceholderText("Force VPN by UID/UID range, one per line (e.g. 1000 or 1000-1010)")
    self.ed_vpn_uids.setFixedHeight(60)

    self.ed_vpn_cgroups = QPlainTextEdit()
    self.ed_vpn_cgroups.setToolTip("""EN: Force VPN by systemd cgroup. Backend resolves cgroup -> PIDs -> UID rules at apply time.
RU: Принудительно через VPN по cgroup (systemd). Backend резолвит cgroup -> PID -> UID при применении.""")
    self.ed_vpn_cgroups.setPlaceholderText("Force VPN by cgroup path/name, one per line")
    self.ed_vpn_cgroups.setFixedHeight(60)

    self.ed_direct_subnets = QPlainTextEdit()
    self.ed_direct_subnets.setToolTip("""EN: Force Direct by source subnet. Useful to keep docker subnets direct in full tunnel.
RU: Принудительно direct по source subnet. Полезно, чтобы docker-подсети были direct в full tunnel.""")
    self.ed_direct_subnets.setPlaceholderText("Force Direct by source subnet, one per line")
    self.ed_direct_subnets.setFixedHeight(72)

    self.ed_direct_uids = QPlainTextEdit()
    self.ed_direct_uids.setToolTip("""EN: Force Direct by UID/uidrange (host OUTPUT only).
RU: Принудительно direct по UID (только процессы хоста).""")
    self.ed_direct_uids.setPlaceholderText("Force Direct by UID/UID range, one per line")
    self.ed_direct_uids.setFixedHeight(60)

    self.ed_direct_cgroups = QPlainTextEdit()
    self.ed_direct_cgroups.setToolTip("""EN: Force Direct by systemd cgroup (resolved to UID rules at apply time).
RU: Принудительно direct по cgroup (резолвится в UID правила при применении).""")
    self.ed_direct_cgroups.setPlaceholderText("Force Direct by cgroup path/name, one per line")
    self.ed_direct_cgroups.setFixedHeight(60)

    # Two equally stretched columns: Force VPN on the left, Force Direct on the right.
    cols = QHBoxLayout()

    vpn_group = QGroupBox("Force VPN")
    vpn_layout = QVBoxLayout(vpn_group)
    vpn_layout.addWidget(QLabel("Source subnets"))
    vpn_layout.addWidget(self.ed_vpn_subnets)
    vpn_layout.addWidget(QLabel("UIDs"))
    vpn_layout.addWidget(self.ed_vpn_uids)
    vpn_layout.addWidget(QLabel("Cgroups / services"))
    vpn_layout.addWidget(self.ed_vpn_cgroups)
    cols.addWidget(vpn_group, stretch=1)

    direct_group = QGroupBox("Force Direct")
    direct_layout = QVBoxLayout(direct_group)
    direct_layout.addWidget(QLabel("Source subnets"))
    direct_layout.addWidget(self.ed_direct_subnets)
    direct_layout.addWidget(QLabel("UIDs"))
    direct_layout.addWidget(self.ed_direct_uids)
    direct_layout.addWidget(QLabel("Cgroups / services"))
    direct_layout.addWidget(self.ed_direct_cgroups)
    cols.addWidget(direct_group, stretch=1)

    tab_adv_layout.addLayout(cols, stretch=1)

    # Action row of the advanced tab. "Reload overrides" simply re-runs
    # refresh_state.
    row_adv = QHBoxLayout()
    self.btn_pick_detected = QPushButton("Add detected...")
    self.btn_pick_detected.setToolTip("""EN: Opens a selector with detected subnets/services/UIDs. Only fills fields; nothing is applied automatically.
RU: Открывает список обнаруженных subnet/service/UID. Только заполняет поля; ничего не применяется автоматически.""")
    self.btn_pick_detected.clicked.connect(self.on_pick_detected)
    row_adv.addWidget(self.btn_pick_detected)
    self.btn_apply_overrides = QPushButton("Apply overrides")
    self.btn_apply_overrides.setToolTip("""EN: Applies policy rules and verifies health. On failure backend rolls back.
RU: Применяет policy-rules и проверяет health. При ошибке backend делает откат.""")
    self.btn_apply_overrides.clicked.connect(self.on_apply_overrides)
    row_adv.addWidget(self.btn_apply_overrides)
    self.btn_reload_overrides = QPushButton("Reload overrides")
    self.btn_reload_overrides.clicked.connect(self.refresh_state)
    row_adv.addWidget(self.btn_reload_overrides)
    row_adv.addStretch(1)
    tab_adv_layout.addLayout(row_adv)

    self.tabs.addTab(tab_adv, "Policy overrides (Advanced)")

    # Small status line showing the last action performed in this dialog.
    self.lbl_action = QLabel("—")
    self.lbl_action.setWordWrap(True)
    self.lbl_action.setStyleSheet("color: gray;")
    root.addWidget(self.lbl_action)

    row_bottom = QHBoxLayout()
    row_bottom.addStretch(1)
    btn_close = QPushButton("Close")
    btn_close.clicked.connect(self.accept)
    row_bottom.addWidget(btn_close)
    root.addLayout(row_bottom)

    # Zero-delay single-shot timer: the first refresh_state call is queued
    # instead of being made directly from the constructor.
    QtCore.QTimer.singleShot(0, self.refresh_state)
|
||
|
||
def _is_operation_error(self, message: str) -> bool:
|
||
low = (message or "").strip().lower()
|
||
return ("rolled back" in low) or ("apply failed" in low) or ("verification failed" in low)
|
||
|
||
def _set_action_status(self, msg: str, ok: bool | None = None) -> None:
|
||
text = (msg or "").strip() or "—"
|
||
self.lbl_action.setText(text)
|
||
if ok is True:
|
||
self.lbl_action.setStyleSheet("color: green;")
|
||
elif ok is False:
|
||
self.lbl_action.setStyleSheet("color: red;")
|
||
else:
|
||
self.lbl_action.setStyleSheet("color: gray;")
|
||
|
||
def _safe(self, fn, *, title: str = "Traffic mode error") -> None:
|
||
try:
|
||
fn()
|
||
except Exception as e:
|
||
msg = f"[ui-error] {title}: {e}"
|
||
self._set_action_status(msg, ok=False)
|
||
self._emit_log(msg)
|
||
QMessageBox.critical(self, title, str(e))
|
||
|
||
def _emit_log(self, msg: str) -> None:
|
||
text = (msg or "").strip()
|
||
if not text:
|
||
return
|
||
if self.log_cb:
|
||
self.log_cb(text)
|
||
else:
|
||
try:
|
||
self.ctrl.log_gui(text)
|
||
except Exception:
|
||
pass
|
||
|
||
def _preferred_iface_value(self) -> str:
|
||
raw = self.cmb_iface.currentText().strip()
|
||
if raw.lower() in ("", "auto", "-", "default"):
|
||
return ""
|
||
return raw
|
||
|
||
def _set_preferred_iface_options(self, ifaces: list[str], selected: str) -> None:
|
||
vals = ["auto"] + [x for x in ifaces if x]
|
||
sel = selected.strip() if selected else "auto"
|
||
if not sel:
|
||
sel = "auto"
|
||
if sel not in vals:
|
||
vals.append(sel)
|
||
|
||
self.cmb_iface.blockSignals(True)
|
||
self.cmb_iface.clear()
|
||
self.cmb_iface.addItems(vals)
|
||
idx = self.cmb_iface.findText(sel)
|
||
if idx < 0:
|
||
idx = self.cmb_iface.findText("auto")
|
||
if idx >= 0:
|
||
self.cmb_iface.setCurrentIndex(idx)
|
||
else:
|
||
self.cmb_iface.setEditText(sel)
|
||
self.cmb_iface.blockSignals(False)
|
||
|
||
def _lines_from_text(self, txt: str) -> list[str]:
|
||
out: list[str] = []
|
||
for raw in (txt or "").replace("\r", "\n").split("\n"):
|
||
line = raw.strip()
|
||
if line:
|
||
out.append(line)
|
||
return out
|
||
|
||
def _set_lines(self, widget: QPlainTextEdit, vals: list[str]) -> None:
|
||
widget.blockSignals(True)
|
||
widget.setPlainText("\n".join([x for x in vals if str(x).strip()]))
|
||
widget.blockSignals(False)
|
||
|
||
def _merge_lines(self, widget: QPlainTextEdit, vals: list[str]) -> int:
|
||
cur = self._lines_from_text(widget.toPlainText())
|
||
seen = {x.strip() for x in cur}
|
||
added = 0
|
||
for v in (vals or []):
|
||
vv = str(v).strip()
|
||
if not vv or vv in seen:
|
||
continue
|
||
cur.append(vv)
|
||
seen.add(vv)
|
||
added += 1
|
||
if added > 0:
|
||
self._set_lines(widget, cur)
|
||
return added
|
||
|
||
def _candidates_add(self, target: str, kind: str, values: list[str]) -> None:
|
||
tgt = (target or "").strip().lower()
|
||
k = (kind or "").strip().lower()
|
||
if tgt not in ("vpn", "direct"):
|
||
return
|
||
|
||
widget: QPlainTextEdit | None = None
|
||
if tgt == "vpn":
|
||
if k == "subnet":
|
||
widget = self.ed_vpn_subnets
|
||
elif k == "uid":
|
||
widget = self.ed_vpn_uids
|
||
elif k == "cgroup":
|
||
widget = self.ed_vpn_cgroups
|
||
else:
|
||
if k == "subnet":
|
||
widget = self.ed_direct_subnets
|
||
elif k == "uid":
|
||
widget = self.ed_direct_uids
|
||
elif k == "cgroup":
|
||
widget = self.ed_direct_cgroups
|
||
|
||
if widget is None:
|
||
return
|
||
|
||
added = self._merge_lines(widget, values or [])
|
||
if added > 0:
|
||
msg = f"Traffic candidates added: target={tgt} kind={k} added={added}"
|
||
self._set_action_status(msg, ok=True)
|
||
self._emit_log(msg)
|
||
else:
|
||
msg = f"Traffic candidates add: nothing new (target={tgt} kind={k})"
|
||
self._set_action_status(msg, ok=None)
|
||
self._emit_log(msg)
|
||
|
||
def on_pick_detected(self) -> None:
    """Open the "Add detected overrides" picker.

    Candidates are fetched from the controller, and the current content of
    the six override editors is passed along as a snapshot. Picked entries
    come back through ``_candidates_add``. Errors are reported via ``_safe``.
    """

    def snapshot(editor) -> set[str]:
        # Current entries of one editor as a set of stripped lines.
        return set(self._lines_from_text(editor.toPlainText()))

    def work() -> None:
        cands = self.ctrl.traffic_candidates()
        existing = {
            "vpn": {
                "subnet": snapshot(self.ed_vpn_subnets),
                "uid": snapshot(self.ed_vpn_uids),
                "cgroup": snapshot(self.ed_vpn_cgroups),
            },
            "direct": {
                "subnet": snapshot(self.ed_direct_subnets),
                "uid": snapshot(self.ed_direct_uids),
                "cgroup": snapshot(self.ed_direct_cgroups),
            },
        }
        picker = TrafficCandidatesDialog(
            cands,
            existing=existing,
            add_cb=self._candidates_add,
            parent=self,
        )
        picker.exec()

    self._safe(work, title="Traffic candidates error")
|
||
|
||
|
||
def _set_mode_state(
|
||
self,
|
||
desired_mode: str,
|
||
applied_mode: str,
|
||
preferred_iface: str,
|
||
auto_local_bypass: bool,
|
||
bypass_candidates: int,
|
||
overrides_applied: int,
|
||
cgroup_resolved_uids: int,
|
||
cgroup_warning: str,
|
||
healthy: bool,
|
||
probe_ok: bool,
|
||
probe_message: str,
|
||
active_iface: str,
|
||
iface_reason: str,
|
||
message: str,
|
||
) -> None:
|
||
desired = (desired_mode or "").strip().lower() or "selective"
|
||
applied = (applied_mode or "").strip().lower() or "direct"
|
||
|
||
if healthy:
|
||
color = "green"
|
||
health_txt = "OK"
|
||
else:
|
||
color = "red"
|
||
health_txt = "MISMATCH"
|
||
|
||
text = f"Traffic mode: {desired} (applied: {applied}) [{health_txt}]"
|
||
diag_parts = []
|
||
diag_parts.append(f"preferred={preferred_iface or 'auto'}")
|
||
diag_parts.append(
|
||
f"auto_local_bypass={'on' if auto_local_bypass else 'off'}"
|
||
)
|
||
if bypass_candidates > 0:
|
||
diag_parts.append(f"bypass_routes={bypass_candidates}")
|
||
diag_parts.append(f"overrides={overrides_applied}")
|
||
if cgroup_resolved_uids > 0:
|
||
diag_parts.append(f"cgroup_uids={cgroup_resolved_uids}")
|
||
if cgroup_warning:
|
||
diag_parts.append(f"cgroup_warning={cgroup_warning}")
|
||
if active_iface:
|
||
diag_parts.append(f"iface={active_iface}")
|
||
if iface_reason:
|
||
diag_parts.append(f"source={iface_reason}")
|
||
diag_parts.append(f"probe={'ok' if probe_ok else 'fail'}")
|
||
if probe_message:
|
||
diag_parts.append(probe_message)
|
||
if message:
|
||
diag_parts.append(message)
|
||
diag = " | ".join(diag_parts) if diag_parts else "—"
|
||
|
||
self.lbl_state.setText(text)
|
||
self.lbl_state.setStyleSheet(f"color: {color};")
|
||
self.lbl_diag.setText(diag)
|
||
self.lbl_diag.setStyleSheet("color: gray;")
|
||
|
||
def refresh_state(self) -> None:
    """Reload the whole dialog from the controller's traffic-mode view.

    Radio buttons, the interface combo box, the auto-local checkbox, the
    six override editors and the status labels are all overwritten.
    Signals of the radio buttons and the checkbox are blocked while they
    are updated so that no handler fires. Errors are reported via ``_safe``.
    """

    def work() -> None:
        view = self.ctrl.traffic_mode_view()
        mode = (view.desired_mode or "selective").strip().lower()

        radios = (
            (self.rad_selective, "selective"),
            (self.rad_full, "full_tunnel"),
            (self.rad_direct, "direct"),
        )
        # Three separate passes: all radios are blocked before any of them
        # changes, and unblocked only after all of them are set.
        for radio, _name in radios:
            radio.blockSignals(True)
        for radio, name in radios:
            radio.setChecked(mode == name)
        for radio, _name in radios:
            radio.blockSignals(False)

        opts = self.ctrl.traffic_interfaces()
        self._set_preferred_iface_options(opts, view.preferred_iface)

        self.chk_auto_local.blockSignals(True)
        self.chk_auto_local.setChecked(bool(view.auto_local_bypass))
        self.chk_auto_local.blockSignals(False)

        editors = (
            (self.ed_vpn_subnets, "force_vpn_subnets"),
            (self.ed_vpn_uids, "force_vpn_uids"),
            (self.ed_vpn_cgroups, "force_vpn_cgroups"),
            (self.ed_direct_subnets, "force_direct_subnets"),
            (self.ed_direct_uids, "force_direct_uids"),
            (self.ed_direct_cgroups, "force_direct_cgroups"),
        )
        for editor, field in editors:
            self._set_lines(editor, list(getattr(view, field) or []))

        self._set_mode_state(
            view.desired_mode,
            view.applied_mode,
            view.preferred_iface,
            bool(view.auto_local_bypass),
            int(view.bypass_candidates),
            int(view.overrides_applied),
            int(view.cgroup_resolved_uids),
            view.cgroup_warning,
            bool(view.healthy),
            bool(view.probe_ok),
            view.probe_message,
            view.active_iface,
            view.iface_reason,
            view.message,
        )

    self._safe(work)
|
||
|
||
def on_mode_toggle(self, mode: str, checked: bool) -> None:
    """Apply ``mode`` when its radio button becomes checked.

    The "unchecked" half of a toggle is ignored. The mode is sent to the
    controller together with the current preferred interface and
    auto-local setting; the result is logged, shown in the status line and
    followed by a full refresh. Errors are reported via ``_safe``.
    """
    if not checked:
        return

    def work() -> None:
        preferred = self._preferred_iface_value()
        auto_local = self.chk_auto_local.isChecked()
        view = self.ctrl.traffic_mode_set(mode, preferred, auto_local)

        fields = (
            f"Traffic mode set: desired={view.desired_mode}",
            f"applied={view.applied_mode}",
            f"iface={view.active_iface or '-'}",
            f"preferred={preferred or 'auto'}",
            f"probe_ok={view.probe_ok}",
            f"healthy={view.healthy}",
            f"auto_local_bypass={view.auto_local_bypass}",
            f"bypass_routes={view.bypass_candidates}",
            f"overrides={view.overrides_applied}",
            f"cgroup_uids={view.cgroup_resolved_uids}",
            f"message={view.message}",
        )
        self._emit_log(", ".join(fields))

        # Success needs a healthy view and no failure marker in the message.
        op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
        summary = (
            f"Traffic mode set: desired={view.desired_mode} "
            f"applied={view.applied_mode} message={view.message}"
        )
        self._set_action_status(summary, ok=op_ok)
        self.refresh_state()
        if self.refresh_cb:
            self.refresh_cb()

    self._safe(work)
|
||
|
||
def on_refresh_ifaces(self) -> None:
    """Re-detect interfaces and repopulate the preferred-interface combo.

    The result is logged and shown in the status line, then the whole
    dialog is refreshed. Errors are reported via ``_safe``.
    """

    def work() -> None:
        view = self.ctrl.traffic_mode_view()
        detected = self.ctrl.traffic_interfaces()
        self._set_preferred_iface_options(detected, view.preferred_iface)

        preferred = view.preferred_iface or "auto"
        active = view.active_iface or "-"
        self._emit_log(f"Traffic ifaces refreshed: preferred={preferred} active={active}")
        self._set_action_status("Traffic ifaces refreshed", ok=True)

        self.refresh_state()
        if self.refresh_cb:
            self.refresh_cb()

    self._safe(work, title="Traffic iface detect error")
|
||
|
||
def _selected_mode(self) -> str:
|
||
if self.rad_full.isChecked():
|
||
return "full_tunnel"
|
||
if self.rad_direct.isChecked():
|
||
return "direct"
|
||
return "selective"
|
||
|
||
def on_auto_local_toggle(self) -> None:
    """Re-apply the current mode with the new auto-local bypass setting.

    The result is logged, shown in the status line and followed by a full
    refresh. Errors are reported via ``_safe``.
    """

    def work() -> None:
        view = self.ctrl.traffic_mode_set(
            self._selected_mode(),
            self._preferred_iface_value(),
            self.chk_auto_local.isChecked(),
        )

        fields = (
            f"Traffic auto-local set: mode={view.desired_mode}",
            f"auto_local_bypass={view.auto_local_bypass}",
            f"bypass_routes={view.bypass_candidates}",
            f"overrides={view.overrides_applied}",
            f"cgroup_uids={view.cgroup_resolved_uids}",
            f"message={view.message}",
        )
        self._emit_log(", ".join(fields))

        # Success needs a healthy view and no failure marker in the message.
        op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
        switch = "on" if view.auto_local_bypass else "off"
        self._set_action_status(
            f"Auto-local bypass set: {switch} ({view.message})",
            ok=op_ok,
        )
        self.refresh_state()
        if self.refresh_cb:
            self.refresh_cb()

    self._safe(work, title="Auto-local bypass error")
|
||
|
||
def on_apply_overrides(self) -> None:
    """Send the current mode plus all six override lists to the controller.

    The result is logged, shown in the status line and followed by a full
    refresh. Errors are reported via ``_safe``.
    """

    def entries(editor) -> list[str]:
        # Stripped, non-empty lines of one override editor.
        return self._lines_from_text(editor.toPlainText())

    def work() -> None:
        mode = self._selected_mode()
        preferred = self._preferred_iface_value()
        auto_local = self.chk_auto_local.isChecked()

        view = self.ctrl.traffic_mode_set(
            mode,
            preferred,
            auto_local,
            entries(self.ed_vpn_subnets),
            entries(self.ed_vpn_uids),
            entries(self.ed_vpn_cgroups),
            entries(self.ed_direct_subnets),
            entries(self.ed_direct_uids),
            entries(self.ed_direct_cgroups),
        )

        fields = (
            f"Traffic overrides applied: mode={view.desired_mode}",
            f"vpn_subnets={len(view.force_vpn_subnets)}",
            f"vpn_uids={len(view.force_vpn_uids)}",
            f"vpn_cgroups={len(view.force_vpn_cgroups)}",
            f"direct_subnets={len(view.force_direct_subnets)}",
            f"direct_uids={len(view.force_direct_uids)}",
            f"direct_cgroups={len(view.force_direct_cgroups)}",
            f"overrides={view.overrides_applied}",
            f"cgroup_uids={view.cgroup_resolved_uids}",
            f"healthy={view.healthy}",
            f"message={view.message}",
        )
        self._emit_log(", ".join(fields))

        # Success needs a healthy view and no failure marker in the message.
        op_ok = bool(view.healthy) and not self._is_operation_error(view.message)
        self._set_action_status(
            f"Overrides applied: overrides={view.overrides_applied} message={view.message}",
            ok=op_ok,
        )
        self.refresh_state()
        if self.refresh_cb:
            self.refresh_cb()

    self._safe(work, title="Apply overrides error")
|
||
|
||
def on_rollback(self) -> None:
    """Ask the controller to clear the routes, then refresh the dialog.

    The controller's ``pretty_text`` is logged and shown in the status
    line; fixed fallback texts are used when it is empty. Errors are
    reported via ``_safe``.
    """

    def work() -> None:
        outcome = self.ctrl.routes_clear()
        report = outcome.pretty_text
        self._emit_log(report or "rollback done")
        self._set_action_status(
            report or "routes cleared (cache saved)",
            ok=bool(outcome.ok),
        )
        self.refresh_state()
        if self.refresh_cb:
            self.refresh_cb()

    self._safe(work, title="Rollback error")
|
||
|
||
def on_restore_cache(self) -> None:
    """Ask the controller to restore the cached routes, then refresh.

    The controller's ``pretty_text`` is logged and shown in the status
    line; fixed fallback texts are used when it is empty. Errors are
    reported via ``_safe``.
    """

    def work() -> None:
        outcome = self.ctrl.routes_cache_restore()
        report = outcome.pretty_text
        self._emit_log(report or "cache restore done")
        self._set_action_status(
            report or "routes restored from cache",
            ok=bool(outcome.ok),
        )
        self.refresh_state()
        if self.refresh_cb:
            self.refresh_cb()

    self._safe(work, title="Restore cache error")
|
||
|
||
|
||
class TrafficCandidatesDialog(QDialog):
|
||
def __init__(
    self,
    candidates,
    *,
    existing: dict[str, dict[str, set[str]]] | None = None,
    add_cb: Callable[[str, str, list[str]], None],
    parent=None,
) -> None:
    """Build the "Add detected overrides" picker dialog.

    Args:
        candidates: Detection result from the backend, stored as
            ``self.cands``; the tab builders read it via ``getattr``.
        existing: Entries already present in the override editors, keyed
            by target ("vpn"/"direct") and then by kind. Defaults to empty
            maps for both targets.
        add_cb: Called as ``add_cb(target, kind, values)`` when the user
            adds entries.
        parent: Optional parent widget forwarded to ``QDialog``.
    """
    super().__init__(parent)
    self.cands = candidates
    self.add_cb = add_cb
    self.existing = existing or {"vpn": {}, "direct": {}}

    self.setWindowTitle("Add detected overrides")
    self.resize(820, 680)

    root = QVBoxLayout(self)

    note = QLabel(
        "Tip: hover list items for details. Подсказка: наведи на элементы списка.\n"
        "Detect results from backend. Nothing is applied until you click Apply overrides."
    )
    note.setWordWrap(True)
    note.setStyleSheet("color: gray;")
    root.addWidget(note)

    # Toggling this checkbox re-applies the filter of the visible tab only.
    self.chk_hide_existing = QCheckBox("Hide already added")
    self.chk_hide_existing.setToolTip(
        """EN: Hides items that are already present in Force VPN/Force Direct fields.
RU: Скрывает элементы, которые уже есть в Force VPN/Force Direct."""
    )
    self.chk_hide_existing.stateChanged.connect(lambda _s: self._refilter_current())
    root.addWidget(self.chk_hide_existing)

    self.tabs = QTabWidget()
    root.addWidget(self.tabs, stretch=1)

    # Per-tab bookkeeping, keyed by the tab page widget. The maps are
    # created before the tab builders run because _add_tab fills them.
    self._tab_kind: dict[QWidget, str] = {}
    self._tab_list: dict[QWidget, QListWidget] = {}
    self._tab_filter: dict[QWidget, QLineEdit] = {}
    # Switching tabs re-applies the filter on the newly shown tab.
    self.tabs.currentChanged.connect(lambda _idx: self._refilter_current())

    self._build_subnets_tab()
    self._build_services_tab()
    self._build_uids_tab()

    # Bottom row: add the selection of the current tab to either target.
    row = QHBoxLayout()
    btn_vpn = QPushButton("Add to Force VPN")
    btn_vpn.clicked.connect(lambda: self._add_selected("vpn"))
    row.addWidget(btn_vpn)

    btn_direct = QPushButton("Add to Force Direct")
    btn_direct.clicked.connect(lambda: self._add_selected("direct"))
    row.addWidget(btn_direct)

    row.addStretch(1)
    btn_close = QPushButton("Close")
    btn_close.clicked.connect(self.accept)
    row.addWidget(btn_close)
    root.addLayout(row)
|
||
|
||
def _mark_state(self, kind: str, value: str) -> tuple[bool, bool]:
|
||
k = (kind or "").strip().lower()
|
||
v = (value or "").strip()
|
||
if not k or not v:
|
||
return False, False
|
||
in_vpn = v in (self.existing.get("vpn", {}).get(k, set()) or set())
|
||
in_direct = v in (self.existing.get("direct", {}).get(k, set()) or set())
|
||
return bool(in_vpn), bool(in_direct)
|
||
|
||
def _apply_filter(self, lst: QListWidget, query: str) -> None:
|
||
q = (query or "").strip().lower()
|
||
hide_existing = bool(self.chk_hide_existing.isChecked())
|
||
for i in range(lst.count()):
|
||
it = lst.item(i)
|
||
if not it:
|
||
continue
|
||
if hide_existing and bool(it.data(QtCore.Qt.UserRole + 1) or False):
|
||
it.setHidden(True)
|
||
continue
|
||
if not q:
|
||
it.setHidden(False)
|
||
continue
|
||
it.setHidden(q not in it.text().lower())
|
||
|
||
def _refilter_current(self) -> None:
|
||
tab = self.tabs.currentWidget()
|
||
if tab is None:
|
||
return
|
||
lst = self._tab_list.get(tab)
|
||
filt = self._tab_filter.get(tab)
|
||
if lst is None or filt is None:
|
||
return
|
||
self._apply_filter(lst, filt.text())
|
||
|
||
def _add_tab(self, title: str, kind: str, items: list[tuple[str, str, str]], *, extra=None) -> None:
    """Create one candidate tab with a filter box and a multi-select list.

    Args:
        title: Tab caption.
        kind: Override kind the tab represents; recorded in ``_tab_kind``.
        items: ``(label, value, tooltip)`` entries; missing trailing
            fields default to "".
        extra: Optional callable that receives the tab layout before the
            filter box is added (used for preset buttons).
    """
    page = QWidget()
    page_layout = QVBoxLayout(page)
    if extra is not None:
        extra(page_layout)

    filter_edit = QLineEdit()
    filter_edit.setPlaceholderText("Filter...")
    page_layout.addWidget(filter_edit)

    listing = QListWidget()
    listing.setSelectionMode(QAbstractItemView.ExtendedSelection)
    for entry in items:
        # Pad short entries so label/value/tooltip can always be unpacked.
        fields = [str(part) for part in entry[:3]]
        fields.extend([""] * (3 - len(fields)))
        label, value, tip = fields

        in_vpn, in_direct = self._mark_state(kind, value)
        already_added = bool(in_vpn or in_direct)

        flags = [name for name, present in (("VPN", in_vpn), ("DIRECT", in_direct)) if present]
        if flags:
            label = f"{label} [{' + '.join(flags)}]"

        item = QListWidgetItem(label)
        # UserRole holds the raw value, UserRole + 1 the "already added" flag.
        item.setData(QtCore.Qt.UserRole, value)
        item.setData(QtCore.Qt.UserRole + 1, already_added)
        if tip.strip():
            item.setToolTip(
                tip
                + f"\n\nAlready in Force VPN: {'yes' if in_vpn else 'no'}\n"
                + f"Already in Force Direct: {'yes' if in_direct else 'no'}"
            )
        if already_added:
            item.setForeground(QtGui.QBrush(QtGui.QColor("gray")))
        listing.addItem(item)

    page_layout.addWidget(listing, stretch=1)
    # The default argument binds this tab's list to the handler.
    filter_edit.textChanged.connect(
        lambda text, target=listing: self._apply_filter(target, text)
    )

    # The page is registered in the lookup maps only after addTab.
    self.tabs.addTab(page, title)
    self._tab_kind[page] = kind
    self._tab_list[page] = listing
    self._tab_filter[page] = filter_edit
|
||
|
||
def _current_kind_and_list(self) -> tuple[str, QListWidget | None]:
|
||
tab = self.tabs.currentWidget()
|
||
if tab is None:
|
||
return "", None
|
||
return self._tab_kind.get(tab, ""), self._tab_list.get(tab)
|
||
|
||
def _add_selected(self, target: str) -> None:
|
||
kind, lst = self._current_kind_and_list()
|
||
if not kind or lst is None:
|
||
return
|
||
|
||
vals: list[str] = []
|
||
for it in lst.selectedItems():
|
||
v = it.data(QtCore.Qt.UserRole)
|
||
vv = str(v or "").strip()
|
||
if vv:
|
||
vals.append(vv)
|
||
|
||
# stable de-dupe
|
||
out: list[str] = []
|
||
seen: set[str] = set()
|
||
for v in vals:
|
||
if v in seen:
|
||
continue
|
||
seen.add(v)
|
||
out.append(v)
|
||
|
||
if out:
|
||
self.add_cb(target, kind, out)
|
||
|
||
def _list_for_title(self, title: str) -> QListWidget | None:
|
||
for i in range(self.tabs.count()):
|
||
if self.tabs.tabText(i) == title:
|
||
tab = self.tabs.widget(i)
|
||
return self._tab_list.get(tab)
|
||
return None
|
||
|
||
def _preset_clear_selection(self, title: str) -> None:
|
||
lst = self._list_for_title(title)
|
||
if lst is not None:
|
||
lst.clearSelection()
|
||
|
||
def _preset_select_services(self, keywords: list[str]) -> None:
|
||
lst = self._list_for_title("Services")
|
||
if lst is None:
|
||
return
|
||
keys = [str(k).strip().lower() for k in (keywords or []) if str(k).strip()]
|
||
if not keys:
|
||
return
|
||
lst.clearSelection()
|
||
for i in range(lst.count()):
|
||
it = lst.item(i)
|
||
if it is None:
|
||
continue
|
||
txt = (it.text() or "").lower()
|
||
val = str(it.data(QtCore.Qt.UserRole) or "").lower()
|
||
if any(k in txt or k in val for k in keys):
|
||
it.setSelected(True)
|
||
|
||
def _preset_select_uids(self, uids: list[int]) -> None:
|
||
lst = self._list_for_title("UIDs")
|
||
if lst is None:
|
||
return
|
||
want = {f"{int(u)}-{int(u)}" for u in (uids or [])}
|
||
if not want:
|
||
return
|
||
lst.clearSelection()
|
||
for i in range(lst.count()):
|
||
it = lst.item(i)
|
||
if it is None:
|
||
continue
|
||
token = str(it.data(QtCore.Qt.UserRole) or "").strip()
|
||
if token in want:
|
||
it.setSelected(True)
|
||
|
||
def _build_subnets_tab(self) -> None:
|
||
subs = list(getattr(self.cands, "subnets", []) or [])
|
||
|
||
items: list[tuple[str, str, str]] = []
|
||
for s in subs:
|
||
cidr = str(getattr(s, "cidr", "") or "").strip()
|
||
if not cidr:
|
||
continue
|
||
dev = str(getattr(s, "dev", "") or "").strip()
|
||
kind = str(getattr(s, "kind", "") or "").strip()
|
||
linkdown = bool(getattr(s, "linkdown", False))
|
||
tags = []
|
||
if kind:
|
||
tags.append(kind)
|
||
if dev:
|
||
tags.append(dev)
|
||
if linkdown:
|
||
tags.append("linkdown")
|
||
tag_txt = " " + "[" + ", ".join(tags) + "]" if tags else ""
|
||
tip = (
|
||
f"CIDR: {cidr}\n"
|
||
f"kind={kind or '-'} dev={dev or '-'} linkdown={linkdown}\n\n"
|
||
"EN: Source subnet overrides affect forwarded traffic (Docker).\n"
|
||
"RU: Source subnet влияет на forwarded трафик (Docker)."
|
||
)
|
||
items.append((f"{cidr}{tag_txt}", cidr, tip))
|
||
|
||
def extra(layout: QVBoxLayout) -> None:
|
||
row = QHBoxLayout()
|
||
btn_lan = QPushButton("Keep LAN direct")
|
||
btn_lan.clicked.connect(lambda: self._preset_add_lan_direct())
|
||
row.addWidget(btn_lan)
|
||
btn_docker = QPushButton("Keep Docker direct")
|
||
btn_docker.clicked.connect(lambda: self._preset_add_docker_direct())
|
||
row.addWidget(btn_docker)
|
||
row.addStretch(1)
|
||
layout.addLayout(row)
|
||
|
||
self._add_tab("Subnets", "subnet", items, extra=extra)
|
||
|
||
def _preset_add_lan_direct(self) -> None:
|
||
subs = list(getattr(self.cands, "subnets", []) or [])
|
||
vals: list[str] = []
|
||
for s in subs:
|
||
kind = str(getattr(s, "kind", "") or "").strip()
|
||
cidr = str(getattr(s, "cidr", "") or "").strip()
|
||
if not cidr:
|
||
continue
|
||
if kind in ("lan", "link"):
|
||
vals.append(cidr)
|
||
if vals:
|
||
self.add_cb("direct", "subnet", vals)
|
||
|
||
def _preset_add_docker_direct(self) -> None:
|
||
subs = list(getattr(self.cands, "subnets", []) or [])
|
||
vals: list[str] = []
|
||
for s in subs:
|
||
kind = str(getattr(s, "kind", "") or "").strip()
|
||
cidr = str(getattr(s, "cidr", "") or "").strip()
|
||
if not cidr:
|
||
continue
|
||
if kind == "docker":
|
||
vals.append(cidr)
|
||
if vals:
|
||
self.add_cb("direct", "subnet", vals)
|
||
|
||
def _build_services_tab(self) -> None:
|
||
units = list(getattr(self.cands, "units", []) or [])
|
||
|
||
items: list[tuple[str, str, str]] = []
|
||
for u in units:
|
||
unit = str(getattr(u, "unit", "") or "").strip()
|
||
if not unit:
|
||
continue
|
||
desc = str(getattr(u, "description", "") or "").strip()
|
||
cgroup = str(getattr(u, "cgroup", "") or "").strip() or unit
|
||
label = unit
|
||
if desc:
|
||
label += " - " + desc
|
||
tip = (
|
||
f"Unit: {unit}\n"
|
||
f"Cgroup token: {cgroup}\n\n"
|
||
"EN: Adds a cgroup override; backend resolves it to UID rules at apply time.\n"
|
||
"RU: Добавляет cgroup override; backend резолвит его в UID правила при применении."
|
||
)
|
||
items.append((label, cgroup, tip))
|
||
|
||
def extra(layout: QVBoxLayout) -> None:
|
||
row = QHBoxLayout()
|
||
btn_docker = QPushButton("Select docker/container")
|
||
btn_docker.clicked.connect(lambda: self._preset_select_services(["docker", "containerd", "podman"]))
|
||
row.addWidget(btn_docker)
|
||
btn_media = QPushButton("Select media (jellyfin/plex)")
|
||
btn_media.clicked.connect(lambda: self._preset_select_services(["jellyfin", "plex", "emby"]))
|
||
row.addWidget(btn_media)
|
||
btn_clear = QPushButton("Clear selection")
|
||
btn_clear.clicked.connect(lambda: self._preset_clear_selection("Services"))
|
||
row.addWidget(btn_clear)
|
||
row.addStretch(1)
|
||
layout.addLayout(row)
|
||
|
||
self._add_tab("Services", "cgroup", items, extra=extra)
|
||
|
||
def _build_uids_tab(self) -> None:
|
||
uids = list(getattr(self.cands, "uids", []) or [])
|
||
|
||
items: list[tuple[str, str, str]] = []
|
||
for u in uids:
|
||
try:
|
||
uid = int(getattr(u, "uid", 0) or 0)
|
||
except Exception:
|
||
continue
|
||
user = str(getattr(u, "user", "") or "").strip()
|
||
examples = list(getattr(u, "examples", []) or [])
|
||
ex_txt = ", ".join([str(x) for x in examples if str(x).strip()])
|
||
|
||
label = str(uid)
|
||
if user:
|
||
label += f" ({user})"
|
||
if ex_txt:
|
||
label += " - " + ex_txt
|
||
|
||
token = f"{uid}-{uid}"
|
||
tip = (
|
||
f"UID: {uid}\n"
|
||
f"User: {user or '-'}\n"
|
||
f"Examples: {ex_txt or '-'}\n\n"
|
||
"EN: UID rules affect host-local processes (OUTPUT).\n"
|
||
"RU: UID правила влияют на процессы хоста (OUTPUT)."
|
||
)
|
||
items.append((label, token, tip))
|
||
|
||
def extra(layout: QVBoxLayout) -> None:
|
||
row = QHBoxLayout()
|
||
btn_me = QPushButton("Select my UID")
|
||
btn_me.clicked.connect(lambda: self._preset_select_uids([os.getuid()]))
|
||
row.addWidget(btn_me)
|
||
btn_root = QPushButton("Select root UID")
|
||
btn_root.clicked.connect(lambda: self._preset_select_uids([0]))
|
||
row.addWidget(btn_root)
|
||
btn_clear = QPushButton("Clear selection")
|
||
btn_clear.clicked.connect(lambda: self._preset_clear_selection("UIDs"))
|
||
row.addWidget(btn_clear)
|
||
row.addStretch(1)
|
||
layout.addLayout(row)
|
||
|
||
self._add_tab("UIDs", "uid", items, extra=extra)
|