From 0c41a938af98c8726a53fa6c31bb310a1660662e Mon Sep 17 00:00:00 2001 From: beckline Date: Sun, 15 Feb 2026 23:51:41 +0300 Subject: [PATCH] ui: add process picker for PID appmarks --- selective-vpn-gui/traffic_mode_dialog.py | 331 +++++++++++++++++++++++ 1 file changed, 331 insertions(+) diff --git a/selective-vpn-gui/traffic_mode_dialog.py b/selective-vpn-gui/traffic_mode_dialog.py index 043bd5c..dd00553 100644 --- a/selective-vpn-gui/traffic_mode_dialog.py +++ b/selective-vpn-gui/traffic_mode_dialog.py @@ -4,6 +4,7 @@ from __future__ import annotations import configparser import os import pathlib +import pwd import re import shlex import subprocess @@ -58,6 +59,17 @@ class RuntimeScopeInfo: cgroup_id: int +@dataclass(frozen=True) +class RunningProcessEntry: + pid: int + uid: int + user: str + comm: str + exe: str + cmdline: str + cgroup: str + + class TrafficModeDialog(QDialog): def __init__( self, @@ -405,6 +417,13 @@ RU: Восстанавливает маршруты/nft из последнег "RU: Читает /proc//cgroup чтобы получить cgroupv2 path." ) pid_layout.addWidget(self.ed_app_pid, stretch=1) + self.btn_app_pick_pid = QPushButton("Pick process...") + self.btn_app_pick_pid.setToolTip( + "EN: Pick a running process and fill PID.\n" + "RU: Выбрать запущенный процесс и заполнить PID." + ) + self.btn_app_pick_pid.clicked.connect(self.on_app_pick_pid) + pid_layout.addWidget(self.btn_app_pick_pid) self.btn_app_mark_pid = QPushButton("Apply mark") self.btn_app_mark_pid.setToolTip( "EN: Apply routing mark to the PID (does not launch/stop the app).\n" @@ -1836,6 +1855,21 @@ RU: Применяет policy-rules и проверяет health. При оши except Exception: return "" + def on_app_pick_pid(self) -> None: + def work() -> None: + dlg = ProcessPickerDialog(parent=self) + if dlg.exec() != QDialog.Accepted: + return + ent = dlg.selected_entry() + if ent is None: + return + self.ed_app_pid.setText(str(int(ent.pid or 0))) + self._append_app_log( + f"[picker] pid selected: pid={ent.pid} user={ent.user or ent.uid} comm={ent.comm or '-'} exe={ent.exe or '-'}" + ) + + self._safe(work, title="Process picker error") + def _control_group_for_unit_retry(self, unit: str, *, timeout_sec: float = 2.0) -> str: u = (unit or "").strip() if not u: @@ -3336,3 +3370,300 @@ class AppPickerDialog(QDialog): return self._selected_cmd = cmd self.accept() + + +def _proc_read_text(path: str) -> str: + try: + with open(path, "r", encoding="utf-8", errors="replace") as f: + return (f.read() or "").strip() + except Exception: + return "" + + +def _proc_read_cmdline(pid: int) -> str: + p = int(pid or 0) + if p <= 0: + return "" + try: + with open(f"/proc/{p}/cmdline", "rb") as f: + raw = f.read() or b"" + parts = [x for x in raw.split(b"\x00") if x] + out = " ".join([x.decode("utf-8", errors="replace") for x in parts]) + return out.strip() + except Exception: + return "" + + +def _proc_read_uid(pid: int) -> int: + p = int(pid or 0) + if p <= 0: + return -1 + try: + with open(f"/proc/{p}/status", "r", encoding="utf-8", errors="replace") as f: + for raw in f: + line = (raw or "").strip() + if not line.startswith("Uid:"): + continue + fields = line.split() + if len(fields) >= 2: + return int(fields[1]) + except Exception: + return -1 + return -1 + + +def _proc_read_exe(pid: int) -> str: + p = int(pid or 0) + if p <= 0: + return "" + try: + return os.readlink(f"/proc/{p}/exe").strip() + except Exception: + return "" + + +def _proc_read_cgroup(pid: int) -> str: + p = int(pid or 0) + if p <= 0: + return "" + try: + with open(f"/proc/{p}/cgroup", "r", encoding="utf-8", errors="replace") as f: + for raw in f: + line = (raw or "").strip() + if line.startswith("0::"): + return line[len("0::") :].strip() + except Exception: + return "" + return "" + + +def _username_for_uid(uid: int) -> str: + try: + return pwd.getpwuid(int(uid)).pw_name + except Exception: + return str(int(uid)) + + +class ProcessPickerDialog(QDialog): + def __init__(self, *, parent=None) -> None: + super().__init__(parent) + self.setWindowTitle("Pick process (PID)") + self.resize(900, 660) + + self._selected: RunningProcessEntry | None = None + + root = QVBoxLayout(self) + + note = QLabel( + "EN: Pick a running process to apply a runtime mark by PID (no launch).\n" + "RU: Выбери запущенный процесс, чтобы применить runtime-метку по PID (без запуска)." + ) + note.setWordWrap(True) + note.setStyleSheet("color: gray;") + root.addWidget(note) + + row = QHBoxLayout() + row.addWidget(QLabel("Search")) + self.ed_search = QLineEdit() + self.ed_search.setPlaceholderText("Filter (pid / user / name / exe / cmdline / cgroup)...") + row.addWidget(self.ed_search, stretch=1) + + self.chk_user_only = QCheckBox("Only my user") + self.chk_user_only.setChecked(True) + self.chk_user_only.setToolTip( + "EN: Show only processes owned by current user.\n" + "RU: Показать только процессы текущего пользователя." + ) + row.addWidget(self.chk_user_only) + + self.chk_hide_empty = QCheckBox("Hide empty cmdline") + self.chk_hide_empty.setChecked(True) + self.chk_hide_empty.setToolTip( + "EN: Hide kernel threads / processes without cmdline.\n" + "RU: Скрыть kernel threads / процессы без cmdline." + ) + row.addWidget(self.chk_hide_empty) + + self.btn_refresh = QPushButton("Refresh") + self.btn_refresh.clicked.connect(self.refresh) + row.addWidget(self.btn_refresh) + + self.lbl_count = QLabel("—") + self.lbl_count.setStyleSheet("color: gray;") + row.addWidget(self.lbl_count) + root.addLayout(row) + + self.lst = QListWidget() + self.lst.setSelectionMode(QAbstractItemView.SingleSelection) + root.addWidget(self.lst, stretch=1) + + self.preview = QPlainTextEdit() + self.preview.setReadOnly(True) + self.preview.setFixedHeight(200) + root.addWidget(self.preview) + + row_btn = QHBoxLayout() + self.btn_use = QPushButton("Use selected") + self.btn_use.clicked.connect(self.on_use_selected) + row_btn.addWidget(self.btn_use) + row_btn.addStretch(1) + btn_close = QPushButton("Close") + btn_close.clicked.connect(self.reject) + row_btn.addWidget(btn_close) + root.addLayout(row_btn) + + self.ed_search.textChanged.connect(lambda _t: self._apply_filter()) + self.chk_user_only.stateChanged.connect(lambda _s: self.refresh()) + self.chk_hide_empty.stateChanged.connect(lambda _s: self.refresh()) + self.lst.currentItemChanged.connect(lambda _a, _b: self._update_preview()) + self.lst.itemDoubleClicked.connect(lambda _it: self.on_use_selected()) + + self._set_loading() + QtCore.QTimer.singleShot(0, self.refresh) + + def selected_entry(self) -> RunningProcessEntry | None: + return self._selected + + def _set_loading(self) -> None: + self.lbl_count.setText("Loading...") + self.lst.clear() + self.preview.setPlainText("—") + self.btn_use.setEnabled(False) + + def _scan(self) -> list[RunningProcessEntry]: + only_uid = os.getuid() if self.chk_user_only.isChecked() else None + hide_empty = bool(self.chk_hide_empty.isChecked()) + + out: list[RunningProcessEntry] = [] + try: + names = os.listdir("/proc") + except Exception: + names = [] + + pids: list[int] = [] + for n in names: + if n.isdigit(): + try: + pids.append(int(n)) + except Exception: + pass + pids.sort() + + for i, pid in enumerate(pids): + uid = _proc_read_uid(pid) + if uid < 0: + continue + if only_uid is not None and uid != int(only_uid): + continue + + comm = _proc_read_text(f"/proc/{pid}/comm") + cmdline = _proc_read_cmdline(pid) + if hide_empty and not cmdline: + continue + + exe = _proc_read_exe(pid) + cg = _proc_read_cgroup(pid) + user = _username_for_uid(uid) + + out.append( + RunningProcessEntry( + pid=int(pid), + uid=int(uid), + user=str(user or uid), + comm=str(comm or "").strip(), + exe=str(exe or "").strip(), + cmdline=str(cmdline or "").strip(), + cgroup=str(cg or "").strip(), + ) + ) + + # Keep UI responsive for large /proc. + if (i % 200) == 0: + try: + QtCore.QCoreApplication.processEvents() + except Exception: + pass + + out.sort(key=lambda e: (e.user.lower(), e.comm.lower(), e.pid)) + return out + + def refresh(self) -> None: + self._set_loading() + entries = self._scan() + + self.lst.clear() + for ent in entries: + comm = ent.comm or "(no comm)" + exe_base = os.path.basename(ent.exe) if ent.exe else "" + label = f"{ent.pid} {ent.user} {comm}" + if exe_base and exe_base.lower() not in comm.lower(): + label += f" [{exe_base}]" + tip = ( + f"PID: {ent.pid}\n" + f"User: {ent.user} (uid={ent.uid})\n" + f"Comm: {ent.comm or '-'}\n" + f"Exe: {ent.exe or '-'}\n" + f"Cgroup: {ent.cgroup or '-'}\n\n" + f"Cmdline:\n{ent.cmdline or '-'}" + ) + it = QListWidgetItem(label) + it.setToolTip(tip) + it.setData(QtCore.Qt.UserRole, ent) + it.setData( + QtCore.Qt.UserRole + 1, + (label + "\n" + tip).lower(), + ) + self.lst.addItem(it) + + if self.lst.count() > 0: + self.lst.setCurrentRow(0) + self._apply_filter() + self._update_preview() + + def _apply_filter(self) -> None: + q = (self.ed_search.text() or "").strip().lower() + shown = 0 + for i in range(self.lst.count()): + it = self.lst.item(i) + if not it: + continue + hay = str(it.data(QtCore.Qt.UserRole + 1) or "") + hide = bool(q) and q not in hay + it.setHidden(hide) + if not hide: + shown += 1 + self.lbl_count.setText(f"Processes: {shown}/{self.lst.count()}") + self.btn_use.setEnabled(shown > 0 and self._current_entry() is not None) + + def _current_entry(self) -> RunningProcessEntry | None: + it = self.lst.currentItem() + if not it: + return None + ent = it.data(QtCore.Qt.UserRole) + if isinstance(ent, RunningProcessEntry): + return ent + return None + + def _update_preview(self) -> None: + ent = self._current_entry() + if ent is None: + self.preview.setPlainText("—") + self.btn_use.setEnabled(False) + return + text = ( + f"PID: {ent.pid}\n" + f"User: {ent.user} (uid={ent.uid})\n" + f"Comm: {ent.comm or '-'}\n" + f"Exe: {ent.exe or '-'}\n" + f"Cgroup: {ent.cgroup or '-'}\n\n" + f"Cmdline:\n{ent.cmdline or '-'}" + ) + self.preview.setPlainText(text) + self.btn_use.setEnabled(True) + + def on_use_selected(self) -> None: + ent = self._current_entry() + if ent is None: + return + self._selected = ent + self.accept()