ui: add process picker for PID appmarks

This commit is contained in:
beckline
2026-02-15 23:51:41 +03:00
parent 2b32427e5b
commit 0c41a938af

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import configparser import configparser
import os import os
import pathlib import pathlib
import pwd
import re import re
import shlex import shlex
import subprocess import subprocess
@@ -58,6 +59,17 @@ class RuntimeScopeInfo:
cgroup_id: int cgroup_id: int
@dataclass(frozen=True)
class RunningProcessEntry:
pid: int
uid: int
user: str
comm: str
exe: str
cmdline: str
cgroup: str
class TrafficModeDialog(QDialog): class TrafficModeDialog(QDialog):
def __init__( def __init__(
self, self,
@@ -405,6 +417,13 @@ RU: Восстанавливает маршруты/nft из последнег
"RU: Читает /proc/<pid>/cgroup чтобы получить cgroupv2 path." "RU: Читает /proc/<pid>/cgroup чтобы получить cgroupv2 path."
) )
pid_layout.addWidget(self.ed_app_pid, stretch=1) pid_layout.addWidget(self.ed_app_pid, stretch=1)
self.btn_app_pick_pid = QPushButton("Pick process...")
self.btn_app_pick_pid.setToolTip(
"EN: Pick a running process and fill PID.\n"
"RU: Выбрать запущенный процесс и заполнить PID."
)
self.btn_app_pick_pid.clicked.connect(self.on_app_pick_pid)
pid_layout.addWidget(self.btn_app_pick_pid)
self.btn_app_mark_pid = QPushButton("Apply mark") self.btn_app_mark_pid = QPushButton("Apply mark")
self.btn_app_mark_pid.setToolTip( self.btn_app_mark_pid.setToolTip(
"EN: Apply routing mark to the PID (does not launch/stop the app).\n" "EN: Apply routing mark to the PID (does not launch/stop the app).\n"
@@ -1836,6 +1855,21 @@ RU: Применяет policy-rules и проверяет health. При оши
except Exception: except Exception:
return "" return ""
def on_app_pick_pid(self) -> None:
def work() -> None:
dlg = ProcessPickerDialog(parent=self)
if dlg.exec() != QDialog.Accepted:
return
ent = dlg.selected_entry()
if ent is None:
return
self.ed_app_pid.setText(str(int(ent.pid or 0)))
self._append_app_log(
f"[picker] pid selected: pid={ent.pid} user={ent.user or ent.uid} comm={ent.comm or '-'} exe={ent.exe or '-'}"
)
self._safe(work, title="Process picker error")
def _control_group_for_unit_retry(self, unit: str, *, timeout_sec: float = 2.0) -> str: def _control_group_for_unit_retry(self, unit: str, *, timeout_sec: float = 2.0) -> str:
u = (unit or "").strip() u = (unit or "").strip()
if not u: if not u:
@@ -3336,3 +3370,300 @@ class AppPickerDialog(QDialog):
return return
self._selected_cmd = cmd self._selected_cmd = cmd
self.accept() self.accept()
def _proc_read_text(path: str) -> str:
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
return (f.read() or "").strip()
except Exception:
return ""
def _proc_read_cmdline(pid: int) -> str:
p = int(pid or 0)
if p <= 0:
return ""
try:
with open(f"/proc/{p}/cmdline", "rb") as f:
raw = f.read() or b""
parts = [x for x in raw.split(b"\x00") if x]
out = " ".join([x.decode("utf-8", errors="replace") for x in parts])
return out.strip()
except Exception:
return ""
def _proc_read_uid(pid: int) -> int:
p = int(pid or 0)
if p <= 0:
return -1
try:
with open(f"/proc/{p}/status", "r", encoding="utf-8", errors="replace") as f:
for raw in f:
line = (raw or "").strip()
if not line.startswith("Uid:"):
continue
fields = line.split()
if len(fields) >= 2:
return int(fields[1])
except Exception:
return -1
return -1
def _proc_read_exe(pid: int) -> str:
p = int(pid or 0)
if p <= 0:
return ""
try:
return os.readlink(f"/proc/{p}/exe").strip()
except Exception:
return ""
def _proc_read_cgroup(pid: int) -> str:
p = int(pid or 0)
if p <= 0:
return ""
try:
with open(f"/proc/{p}/cgroup", "r", encoding="utf-8", errors="replace") as f:
for raw in f:
line = (raw or "").strip()
if line.startswith("0::"):
return line[len("0::") :].strip()
except Exception:
return ""
return ""
def _username_for_uid(uid: int) -> str:
try:
return pwd.getpwuid(int(uid)).pw_name
except Exception:
return str(int(uid))
class ProcessPickerDialog(QDialog):
def __init__(self, *, parent=None) -> None:
super().__init__(parent)
self.setWindowTitle("Pick process (PID)")
self.resize(900, 660)
self._selected: RunningProcessEntry | None = None
root = QVBoxLayout(self)
note = QLabel(
"EN: Pick a running process to apply a runtime mark by PID (no launch).\n"
"RU: Выбери запущенный процесс, чтобы применить runtime-метку по PID (без запуска)."
)
note.setWordWrap(True)
note.setStyleSheet("color: gray;")
root.addWidget(note)
row = QHBoxLayout()
row.addWidget(QLabel("Search"))
self.ed_search = QLineEdit()
self.ed_search.setPlaceholderText("Filter (pid / user / name / exe / cmdline / cgroup)...")
row.addWidget(self.ed_search, stretch=1)
self.chk_user_only = QCheckBox("Only my user")
self.chk_user_only.setChecked(True)
self.chk_user_only.setToolTip(
"EN: Show only processes owned by current user.\n"
"RU: Показать только процессы текущего пользователя."
)
row.addWidget(self.chk_user_only)
self.chk_hide_empty = QCheckBox("Hide empty cmdline")
self.chk_hide_empty.setChecked(True)
self.chk_hide_empty.setToolTip(
"EN: Hide kernel threads / processes without cmdline.\n"
"RU: Скрыть kernel threads / процессы без cmdline."
)
row.addWidget(self.chk_hide_empty)
self.btn_refresh = QPushButton("Refresh")
self.btn_refresh.clicked.connect(self.refresh)
row.addWidget(self.btn_refresh)
self.lbl_count = QLabel("")
self.lbl_count.setStyleSheet("color: gray;")
row.addWidget(self.lbl_count)
root.addLayout(row)
self.lst = QListWidget()
self.lst.setSelectionMode(QAbstractItemView.SingleSelection)
root.addWidget(self.lst, stretch=1)
self.preview = QPlainTextEdit()
self.preview.setReadOnly(True)
self.preview.setFixedHeight(200)
root.addWidget(self.preview)
row_btn = QHBoxLayout()
self.btn_use = QPushButton("Use selected")
self.btn_use.clicked.connect(self.on_use_selected)
row_btn.addWidget(self.btn_use)
row_btn.addStretch(1)
btn_close = QPushButton("Close")
btn_close.clicked.connect(self.reject)
row_btn.addWidget(btn_close)
root.addLayout(row_btn)
self.ed_search.textChanged.connect(lambda _t: self._apply_filter())
self.chk_user_only.stateChanged.connect(lambda _s: self.refresh())
self.chk_hide_empty.stateChanged.connect(lambda _s: self.refresh())
self.lst.currentItemChanged.connect(lambda _a, _b: self._update_preview())
self.lst.itemDoubleClicked.connect(lambda _it: self.on_use_selected())
self._set_loading()
QtCore.QTimer.singleShot(0, self.refresh)
def selected_entry(self) -> RunningProcessEntry | None:
return self._selected
def _set_loading(self) -> None:
self.lbl_count.setText("Loading...")
self.lst.clear()
self.preview.setPlainText("")
self.btn_use.setEnabled(False)
def _scan(self) -> list[RunningProcessEntry]:
only_uid = os.getuid() if self.chk_user_only.isChecked() else None
hide_empty = bool(self.chk_hide_empty.isChecked())
out: list[RunningProcessEntry] = []
try:
names = os.listdir("/proc")
except Exception:
names = []
pids: list[int] = []
for n in names:
if n.isdigit():
try:
pids.append(int(n))
except Exception:
pass
pids.sort()
for i, pid in enumerate(pids):
uid = _proc_read_uid(pid)
if uid < 0:
continue
if only_uid is not None and uid != int(only_uid):
continue
comm = _proc_read_text(f"/proc/{pid}/comm")
cmdline = _proc_read_cmdline(pid)
if hide_empty and not cmdline:
continue
exe = _proc_read_exe(pid)
cg = _proc_read_cgroup(pid)
user = _username_for_uid(uid)
out.append(
RunningProcessEntry(
pid=int(pid),
uid=int(uid),
user=str(user or uid),
comm=str(comm or "").strip(),
exe=str(exe or "").strip(),
cmdline=str(cmdline or "").strip(),
cgroup=str(cg or "").strip(),
)
)
# Keep UI responsive for large /proc.
if (i % 200) == 0:
try:
QtCore.QCoreApplication.processEvents()
except Exception:
pass
out.sort(key=lambda e: (e.user.lower(), e.comm.lower(), e.pid))
return out
def refresh(self) -> None:
self._set_loading()
entries = self._scan()
self.lst.clear()
for ent in entries:
comm = ent.comm or "(no comm)"
exe_base = os.path.basename(ent.exe) if ent.exe else ""
label = f"{ent.pid} {ent.user} {comm}"
if exe_base and exe_base.lower() not in comm.lower():
label += f" [{exe_base}]"
tip = (
f"PID: {ent.pid}\n"
f"User: {ent.user} (uid={ent.uid})\n"
f"Comm: {ent.comm or '-'}\n"
f"Exe: {ent.exe or '-'}\n"
f"Cgroup: {ent.cgroup or '-'}\n\n"
f"Cmdline:\n{ent.cmdline or '-'}"
)
it = QListWidgetItem(label)
it.setToolTip(tip)
it.setData(QtCore.Qt.UserRole, ent)
it.setData(
QtCore.Qt.UserRole + 1,
(label + "\n" + tip).lower(),
)
self.lst.addItem(it)
if self.lst.count() > 0:
self.lst.setCurrentRow(0)
self._apply_filter()
self._update_preview()
def _apply_filter(self) -> None:
q = (self.ed_search.text() or "").strip().lower()
shown = 0
for i in range(self.lst.count()):
it = self.lst.item(i)
if not it:
continue
hay = str(it.data(QtCore.Qt.UserRole + 1) or "")
hide = bool(q) and q not in hay
it.setHidden(hide)
if not hide:
shown += 1
self.lbl_count.setText(f"Processes: {shown}/{self.lst.count()}")
self.btn_use.setEnabled(shown > 0 and self._current_entry() is not None)
def _current_entry(self) -> RunningProcessEntry | None:
it = self.lst.currentItem()
if not it:
return None
ent = it.data(QtCore.Qt.UserRole)
if isinstance(ent, RunningProcessEntry):
return ent
return None
def _update_preview(self) -> None:
ent = self._current_entry()
if ent is None:
self.preview.setPlainText("")
self.btn_use.setEnabled(False)
return
text = (
f"PID: {ent.pid}\n"
f"User: {ent.user} (uid={ent.uid})\n"
f"Comm: {ent.comm or '-'}\n"
f"Exe: {ent.exe or '-'}\n"
f"Cgroup: {ent.cgroup or '-'}\n\n"
f"Cmdline:\n{ent.cmdline or '-'}"
)
self.preview.setPlainText(text)
self.btn_use.setEnabled(True)
def on_use_selected(self) -> None:
ent = self._current_entry()
if ent is None:
return
self._selected = ent
self.accept()