traffic: add per-app runtime app routing via cgroup marks

This commit is contained in:
beckline
2026-02-14 16:58:30 +03:00
parent 1fec4a51da
commit 90907219dc
10 changed files with 819 additions and 7 deletions

View File

@@ -0,0 +1,261 @@
#!/usr/bin/env python3
from __future__ import annotations
import shlex
import subprocess
import time
from dataclasses import dataclass
from typing import Callable, Optional
from PySide6 import QtCore
from PySide6.QtWidgets import (
QButtonGroup,
QDialog,
QGroupBox,
QHBoxLayout,
QLabel,
QLineEdit,
QMessageBox,
QPlainTextEdit,
QPushButton,
QRadioButton,
QSpinBox,
QVBoxLayout,
)
from dashboard_controller import DashboardController
@dataclass(frozen=True)
class RunScopeResult:
ok: bool
unit: str = ""
cgroup: str = ""
message: str = ""
stdout: str = ""
class AppRouteDialog(QDialog):
"""
EN: Launch an app inside a systemd --user scope and register its cgroup id in backend nftsets.
RU: Запускает приложение в systemd --user scope и регистрирует его cgroup id в nftset'ах backend-а.
"""
def __init__(
self,
controller: DashboardController,
*,
log_cb: Callable[[str], None] | None = None,
parent=None,
) -> None:
super().__init__(parent)
self.ctrl = controller
self.log_cb = log_cb
self.setWindowTitle("Run app via VPN / Direct (runtime)")
self.resize(720, 520)
root = QVBoxLayout(self)
hint = QLabel(
"Runtime per-app routing (Wayland-friendly):\n"
"- Launch uses systemd-run --user --scope.\n"
"- Backend adds the scope cgroup into nftset -> fwmark rules.\n"
"- Marks are temporary (TTL). Use Traffic overrides for persistent policy."
)
hint.setWordWrap(True)
hint.setStyleSheet("color: gray;")
root.addWidget(hint)
grp = QGroupBox("Run")
gl = QVBoxLayout(grp)
row_cmd = QHBoxLayout()
row_cmd.addWidget(QLabel("Command"))
self.ed_cmd = QLineEdit()
self.ed_cmd.setPlaceholderText("e.g. firefox --private-window https://example.com")
self.ed_cmd.setToolTip(
"EN: Command line to run. This runs as current user in a systemd --user scope.\n"
"RU: Команда запуска. Запускается от текущего пользователя в systemd --user scope."
)
row_cmd.addWidget(self.ed_cmd, stretch=1)
gl.addLayout(row_cmd)
row_target = QHBoxLayout()
row_target.addWidget(QLabel("Route via"))
self.rad_vpn = QRadioButton("VPN")
self.rad_vpn.setToolTip(
"EN: Force this app traffic via VPN policy table (agvpn).\n"
"RU: Форсировать трафик приложения через VPN policy-table (agvpn)."
)
self.rad_direct = QRadioButton("Direct")
self.rad_direct.setToolTip(
"EN: Force this app traffic to bypass VPN (lookup main), even in full tunnel.\n"
"RU: Форсировать трафик приложения мимо VPN (lookup main), даже в full tunnel."
)
bg = QButtonGroup(self)
bg.addButton(self.rad_vpn)
bg.addButton(self.rad_direct)
self.rad_vpn.setChecked(True)
row_target.addWidget(self.rad_vpn)
row_target.addWidget(self.rad_direct)
row_target.addStretch(1)
row_ttl = QHBoxLayout()
row_ttl.addWidget(QLabel("TTL (hours)"))
self.spn_ttl = QSpinBox()
self.spn_ttl.setRange(1, 24 * 30) # up to ~30 days
self.spn_ttl.setValue(24)
self.spn_ttl.setToolTip(
"EN: How long the runtime mark stays active (backend nftset element timeout).\n"
"RU: Сколько живет runtime-метка (timeout элемента в nftset)."
)
row_ttl.addWidget(self.spn_ttl)
row_ttl.addStretch(1)
gl.addLayout(row_target)
gl.addLayout(row_ttl)
row_btn = QHBoxLayout()
self.btn_run = QPushButton("Run in scope + apply mark")
self.btn_run.clicked.connect(self.on_run_clicked)
row_btn.addWidget(self.btn_run)
self.btn_refresh = QPushButton("Refresh counts")
self.btn_refresh.clicked.connect(self.on_refresh_counts)
row_btn.addWidget(self.btn_refresh)
row_btn.addStretch(1)
gl.addLayout(row_btn)
self.lbl_counts = QLabel("Marks: —")
self.lbl_counts.setStyleSheet("color: gray;")
gl.addWidget(self.lbl_counts)
root.addWidget(grp)
self.txt = QPlainTextEdit()
self.txt.setReadOnly(True)
root.addWidget(self.txt, stretch=1)
row_bottom = QHBoxLayout()
row_bottom.addStretch(1)
btn_close = QPushButton("Close")
btn_close.clicked.connect(self.accept)
row_bottom.addWidget(btn_close)
root.addLayout(row_bottom)
QtCore.QTimer.singleShot(0, self.on_refresh_counts)
def _emit_log(self, msg: str) -> None:
text = (msg or "").strip()
if not text:
return
if self.log_cb:
self.log_cb(text)
return
try:
self.ctrl.log_gui(text)
except Exception:
pass
def _append(self, msg: str) -> None:
text = (msg or "").rstrip()
if not text:
return
self.txt.appendPlainText(text)
self._emit_log(text)
def on_refresh_counts(self) -> None:
try:
st = self.ctrl.traffic_appmarks_status()
self.lbl_counts.setText(f"Marks: VPN={st.vpn_count}, Direct={st.direct_count}")
except Exception as e:
self.lbl_counts.setText(f"Marks: error: {e}")
def _run_scope(self, cmdline: str, *, unit: str) -> RunScopeResult:
args = shlex.split(cmdline)
if not args:
return RunScopeResult(ok=False, message="empty command")
# Launch the scope.
run_cmd = [
"systemd-run",
"--user",
"--scope",
"--unit",
unit,
"--collect",
"--same-dir",
] + args
p = subprocess.run(
run_cmd,
capture_output=True,
text=True,
check=False,
)
out = (p.stdout or "") + (p.stderr or "")
if p.returncode != 0:
return RunScopeResult(ok=False, unit=unit, message=f"systemd-run failed: {p.returncode}", stdout=out.strip())
# Get cgroup path for this unit.
p2 = subprocess.run(
["systemctl", "--user", "show", "-p", "ControlGroup", "--value", unit],
capture_output=True,
text=True,
check=False,
)
cg = (p2.stdout or "").strip()
out2 = (p2.stdout or "") + (p2.stderr or "")
if p2.returncode != 0 or not cg:
return RunScopeResult(ok=False, unit=unit, message="failed to query ControlGroup", stdout=(out + "\n" + out2).strip())
return RunScopeResult(ok=True, unit=unit, cgroup=cg, message="ok", stdout=out.strip())
def on_run_clicked(self) -> None:
cmdline = self.ed_cmd.text().strip()
if not cmdline:
QMessageBox.warning(self, "Missing command", "Please enter a command to run.")
return
target = "vpn" if self.rad_vpn.isChecked() else "direct"
ttl_sec = int(self.spn_ttl.value()) * 3600
unit = f"svpn-{target}-{int(time.time())}.scope"
self._append(f"[app] launching: target={target} ttl={ttl_sec}s unit={unit}")
try:
rr = self._run_scope(cmdline, unit=unit)
except Exception as e:
QMessageBox.critical(self, "Run failed", str(e))
return
if not rr.ok:
self._append(f"[app] ERROR: {rr.message}\n{rr.stdout}".rstrip())
QMessageBox.critical(self, "Run failed", rr.message)
return
self._append(f"[app] scope started: unit={rr.unit}")
self._append(f"[app] ControlGroup: {rr.cgroup}")
try:
res = self.ctrl.traffic_appmarks_apply(
op="add",
target=target,
cgroup=rr.cgroup,
timeout_sec=ttl_sec,
)
except Exception as e:
self._append(f"[appmarks] ERROR calling API: {e}")
QMessageBox.critical(self, "API error", str(e))
return
if res.ok:
self._append(f"[appmarks] OK: {res.message} cgroup_id={res.cgroup_id} timeout={res.timeout_sec}s")
else:
self._append(f"[appmarks] ERROR: {res.message}")
QMessageBox.critical(self, "App mark error", res.message or "unknown error")
self.on_refresh_counts()