Files
elmprodvpn/selective-vpn-gui/main_window/ui_location_runtime_mixin.py

449 lines
17 KiB
Python

from __future__ import annotations
import re
import time
from PySide6 import QtCore
from PySide6.QtCore import QTimer, Qt
from PySide6.QtWidgets import QMessageBox
from api_client import ApiError
from main_window.constants import LOCATION_TARGET_ROLE
from netns_debug import singbox_clients_netns_state, singbox_netns_toggle_button
class UILocationRuntimeMixin:
def eventFilter(self, obj, event):  # pragma: no cover - GUI
    """Route key presses on the locations combo (or its popup view) to type-ahead.

    Returns True only when the event is a key press on one of those two
    objects and ``_handle_location_keypress`` reports it handled; every other
    case is delegated to the parent class's ``eventFilter``.
    """
    combo = getattr(self, "cmb_locations", None)
    popup_view = None
    if combo is not None:
        try:
            popup_view = combo.view()
        except RuntimeError:
            # NOTE(review): presumably the wrapped Qt object is already gone
            # (e.g. during teardown) - fall back to default handling.
            return super().eventFilter(obj, event)
    is_watched = obj in (combo, popup_view)
    if (
        is_watched
        and event.type() == QtCore.QEvent.KeyPress
        and self._handle_location_keypress(event)
    ):
        return True
    return super().eventFilter(obj, event)
def _handle_location_keypress(self, event) -> bool:
    """Update the location type-ahead buffer from a single key press.

    Backspace drops the last buffered character, Escape clears the buffer,
    and a single printable non-whitespace character is appended lower-cased.
    After a change the filter is re-applied, ``loc_typeahead_timer`` is
    (re)started and the combo popup is shown.

    Returns:
        True when the key was consumed here; False when the key produces no
        single printable character and should be handled elsewhere.
    """
    key = int(event.key())
    if key == int(Qt.Key_Backspace):
        if self._loc_typeahead_buf:
            self._loc_typeahead_buf = self._loc_typeahead_buf[:-1]
        # NOTE(review): indentation was lost in the reviewed copy; the nesting
        # of the three calls below relative to the `if` above is reconstructed
        # - confirm against the repository.
        self._apply_location_search_filter()
        self.loc_typeahead_timer.start()
        self.cmb_locations.showPopup()
        return True
    if key == int(Qt.Key_Escape):
        self._reset_location_typeahead()
        return True
    text = event.text() or ""
    # Only a single visible character extends the buffer; anything else
    # (navigation keys, modifiers, whitespace) is left to the caller.
    if len(text) != 1 or not text.isprintable() or text.isspace():
        return False
    self._loc_typeahead_buf += text.lower()
    count = self._apply_location_search_filter()
    # When the extended buffer matches nothing, restart the search from the
    # newly typed character alone.
    if count == 0 and len(self._loc_typeahead_buf) > 1:
        self._loc_typeahead_buf = text.lower()
        self._apply_location_search_filter()
    self.loc_typeahead_timer.start()
    self.cmb_locations.showPopup()
    return True
def _apply_location_search_filter(self) -> int:
source = list(self._all_locations or [])
query = (self._loc_typeahead_buf or "").strip().lower()
if not source:
self._set_locations_combo_items([])
return 0
items = source
if query:
items = [
row
for row in source
if self._location_matches(query, row[0], row[1], row[2], row[3])
]
items = self._sort_location_items(items)
self._set_locations_combo_items(items)
return len(items)
def _location_matches(
self,
query: str,
label: str,
iso: str,
target: str,
name: str,
) -> bool:
q = (query or "").strip().lower()
if not q:
return True
iso_l = (iso or "").strip().lower()
label_l = (label or "").strip().lower()
target_l = (target or "").strip().lower()
name_l = (name or "").strip().lower()
if iso_l.startswith(q):
return True
if target_l.startswith(q) or label_l.startswith(q) or name_l.startswith(q):
return True
tokens = [t for t in re.split(r"[^\w]+", f"{target_l} {name_l} {label_l}") if t]
if any(tok.startswith(q) for tok in tokens):
return True
return q in target_l or q in name_l or q in label_l
def _sort_location_items(
self,
items: list[tuple[str, str, str, str, int]],
) -> list[tuple[str, str, str, str, int]]:
mode = str(self.cmb_locations_sort.currentData() or "ping").strip().lower()
if mode == "ping_desc":
return sorted(items, key=lambda x: (-x[4], x[3].lower(), x[0].lower()))
if mode == "name":
return sorted(items, key=lambda x: (x[3].lower(), x[4], x[0].lower()))
if mode == "name_desc":
return sorted(items, key=lambda x: (x[3].lower(), x[4], x[0].lower()), reverse=True)
return sorted(items, key=lambda x: (x[4], x[3].lower(), x[0].lower()))
def _set_locations_combo_items(self, items: list[tuple[str, str, str, str, int]]) -> None:
    """Repopulate the locations combo box and re-select the most relevant row.

    Each row is (label, iso, target, name, ping); the label is shown, the ISO
    code is stored as the item's default user data and the target under
    ``LOCATION_TARGET_ROLE``.

    Selection priority, highest first:
      1. a row whose ISO code or target equals ``_vpn_desired_location``
         (case-insensitive; if several rows match, the last one wins, as
         before),
      2. the first row with the previously selected target,
      3. the first row with the previously selected ISO code,
      4. the first row.

    The priority is now resolved after the whole list has been scanned, so a
    weaker ISO-only match early in the list can no longer shadow an exact
    target match further down. The ISO comparison is case-insensitive on both
    sides, and signals are always unblocked again, even if populating raises.
    """
    combo = self.cmb_locations
    prev_target = str(combo.currentData(LOCATION_TARGET_ROLE) or "").strip()
    prev_iso = str(combo.currentData() or "").strip().upper()
    desired_l = (self._vpn_desired_location or "").strip().lower()

    desired_pick = -1
    prev_target_pick = -1
    prev_iso_pick = -1

    combo.blockSignals(True)
    try:
        combo.clear()
        for i, (label, iso, target, _name, _ping) in enumerate(items):
            combo.addItem(label, iso)
            combo.setItemData(i, target, LOCATION_TARGET_ROLE)
            iso_norm = (iso or "").strip()
            target_norm = (target or "").strip()
            if desired_l and desired_l in (iso_norm.lower(), target_norm.lower()):
                desired_pick = i
            if prev_target_pick < 0 and prev_target and prev_target == target_norm:
                prev_target_pick = i
            if prev_iso_pick < 0 and prev_iso and prev_iso == iso_norm.upper():
                prev_iso_pick = i
        if combo.count() > 0:
            candidates = (desired_pick, prev_target_pick, prev_iso_pick)
            pick = next((p for p in candidates if p >= 0), 0)
            combo.setCurrentIndex(pick)
            # Keep the popup list's highlighted row in step with the combo.
            combo.view().setCurrentIndex(combo.model().index(pick, 0))
    finally:
        combo.blockSignals(False)
def _reset_location_typeahead(self) -> None:
self._loc_typeahead_buf = ""
self._apply_location_search_filter()
def _location_name_ping(self, label: str, iso: str, target: str) -> tuple[str, int]:
text = (label or "").strip()
ping = 1_000_000
m = re.search(r"\((\d+)\s*ms\)\s*$", text, flags=re.IGNORECASE)
if m:
try:
ping = int(m.group(1))
except Exception:
ping = 1_000_000
text = text[:m.start()].strip()
iso_pref = (iso or "").strip().upper()
pref = iso_pref + " "
if iso_pref and text.upper().startswith(pref):
text = text[len(pref):].strip()
name = text or (target or iso_pref or "").strip()
return name, ping
def on_locations_sort_changed(self, _index: int = 0) -> None:
    """Re-filter (and thereby re-sort) the locations list, then save UI preferences.

    ``_index`` is accepted so the method can be used as a slot but is ignored.
    """
    self._apply_location_search_filter()
    self._save_ui_preferences()
def on_locations_refresh_click(self) -> None:
    """Run the locations refresh through ``_safe`` with a dedicated error title."""
    action = self._trigger_locations_refresh
    self._safe(action, title="Locations refresh error")
def _trigger_locations_refresh(self) -> None:
self.lbl_locations_meta.setText("Locations: refreshing...")
self.lbl_locations_meta.setStyleSheet("color: orange;")
self._refresh_locations_async(force_refresh=True)
def _append_transport_log(self, line: str) -> None:
msg = (line or "").strip()
if not msg:
return
self._append_text(self.txt_transport, msg + "\n")
def _singbox_clients_netns_state(self) -> tuple[bool, bool]:
    """Return the netns state pair that ``singbox_clients_netns_state`` computes for the known clients.

    A fresh list copy of ``_transport_clients`` (or an empty list) is passed in.
    """
    clients = list(self._transport_clients or [])
    return singbox_clients_netns_state(clients)
def _refresh_transport_netns_toggle_button(self) -> None:
    """Update the netns toggle button's caption and text colour from the current client state."""
    state = self._singbox_clients_netns_state()
    all_enabled, any_enabled = state
    caption, colour = singbox_netns_toggle_button(all_enabled, any_enabled)
    button = self.btn_transport_netns_toggle
    button.setText(caption)
    button.setStyleSheet(f"color: {colour};")
def _selected_transport_engine_id(self) -> str:
return str(self.cmb_transport_engine.currentData() or "").strip()
def _selected_transport_client(self):
cid = self._selected_transport_engine_id()
if not cid:
return None
for client in self._transport_clients or []:
if str(getattr(client, "id", "") or "").strip() == cid:
return client
return None
def _transport_live_health_for_client(self, client) -> tuple[str, int, str, str]:
status = str(getattr(client, "status", "") or "").strip().lower() or "unknown"
latency = int(getattr(getattr(client, "health", None), "latency_ms", 0) or 0)
last_error = str(getattr(getattr(client, "health", None), "last_error", "") or "").strip()
last_check = str(getattr(getattr(client, "health", None), "last_check", "") or "").strip()
cid = str(getattr(client, "id", "") or "").strip()
if not cid:
return status, latency, last_error, last_check
snap = self._transport_health_live.get(cid)
if not isinstance(snap, dict):
return status, latency, last_error, last_check
snap_status = str(snap.get("status") or "").strip().lower()
if snap_status:
status = snap_status
try:
snap_latency = int(snap.get("latency_ms") or 0)
if snap_latency >= 0:
latency = snap_latency
except Exception:
pass
snap_err = str(snap.get("last_error") or "").strip()
if snap_err:
last_error = snap_err
snap_check = str(snap.get("last_check") or "").strip()
if snap_check:
last_check = snap_check
return status, latency, last_error, last_check
def _country_flag(self, country_code: str) -> str:
cc = str(country_code or "").strip().upper()
if len(cc) != 2 or not cc.isalpha():
return ""
try:
return "".join(chr(127397 + ord(ch)) for ch in cc)
except Exception:
return ""
def _refresh_egress_identity_scope(
self,
scope: str,
*,
force: bool = False,
trigger_refresh: bool = True,
min_interval_sec: float = 1.0,
silent: bool = True,
):
scope_key = str(scope or "").strip().lower()
if not scope_key:
return None
now = time.monotonic()
last = float(self._egress_identity_last_probe_ts.get(scope_key, 0.0) or 0.0)
if not force and (now - last) < max(0.2, float(min_interval_sec)):
return self._egress_identity_cache.get(scope_key)
self._egress_identity_last_probe_ts[scope_key] = now
try:
item = self.ctrl.egress_identity(scope_key, refresh=trigger_refresh)
self._egress_identity_cache[scope_key] = item
return item
except ApiError as e:
code = int(getattr(e, "status_code", 0) or 0)
if not silent and code != 404:
QMessageBox.warning(self, "Egress identity error", str(e))
return self._egress_identity_cache.get(scope_key)
except Exception as e:
if not silent:
QMessageBox.warning(self, "Egress identity error", str(e))
return self._egress_identity_cache.get(scope_key)
def _format_egress_identity_short(self, item) -> str:
if item is None:
return ""
ip = str(getattr(item, "ip", "") or "").strip()
if not ip:
return ""
code = str(getattr(item, "country_code", "") or "").strip().upper()
flag = self._country_flag(code)
if flag:
return f"{flag} {ip}"
return ip
def _render_vpn_egress_label(self, item) -> None:
if item is None:
self.lbl_vpn_egress.setText("Egress: n/a")
self.lbl_vpn_egress.setStyleSheet("color: gray;")
return
ip = str(getattr(item, "ip", "") or "").strip()
code = str(getattr(item, "country_code", "") or "").strip().upper()
name = str(getattr(item, "country_name", "") or "").strip()
stale = bool(getattr(item, "stale", False))
refreshing = bool(getattr(item, "refresh_in_progress", False))
last_error = str(getattr(item, "last_error", "") or "").strip()
if not ip:
if refreshing:
self.lbl_vpn_egress.setText("Egress: refreshing...")
self.lbl_vpn_egress.setStyleSheet("color: orange;")
return
if last_error:
cut = last_error if len(last_error) <= 120 else last_error[:117] + "..."
self.lbl_vpn_egress.setText(f"Egress: n/a ({cut})")
self.lbl_vpn_egress.setStyleSheet("color: red;")
return
self.lbl_vpn_egress.setText("Egress: n/a")
self.lbl_vpn_egress.setStyleSheet("color: gray;")
return
flag = self._country_flag(code)
prefix = f"{flag} {ip}" if flag else ip
tail = ""
if name:
tail = f" ({name})"
elif code:
tail = f" ({code})"
if stale:
tail += " · stale"
self.lbl_vpn_egress.setText(f"Egress: {prefix}{tail}")
self.lbl_vpn_egress.setStyleSheet("color: orange;" if stale else "color: #1f6b2f;")
def _poll_vpn_egress_after_switch(self, token: int, attempts_left: int) -> None:
if token != self._vpn_egress_refresh_token:
return
item = self._refresh_egress_identity_scope(
"adguardvpn",
force=True,
trigger_refresh=False,
min_interval_sec=0.0,
silent=True,
)
self._render_vpn_egress_label(item)
if token != self._vpn_egress_refresh_token:
return
refresh_in_progress = bool(getattr(item, "refresh_in_progress", False)) if item is not None else True
has_ip = bool(str(getattr(item, "ip", "") or "").strip()) if item is not None else False
has_country = bool(
str(getattr(item, "country_code", "") or "").strip()
or str(getattr(item, "country_name", "") or "").strip()
) if item is not None else False
if attempts_left <= 0:
return
if has_ip and has_country and not refresh_in_progress and not self._vpn_switching_active:
return
delay_ms = 450 if attempts_left > 3 else 900
QTimer.singleShot(
delay_ms,
lambda tok=token, left=attempts_left - 1: self._poll_vpn_egress_after_switch(tok, left),
)
def _trigger_vpn_egress_refresh(self, *, reason: str = "") -> None:
scope = "adguardvpn"
self._vpn_egress_refresh_token += 1
token = self._vpn_egress_refresh_token
self._egress_identity_last_probe_ts[scope] = 0.0
self._vpn_autoloop_refresh_pending = False
self._vpn_autoloop_last_force_refresh_ts = time.monotonic()
self.lbl_vpn_egress.setText("Egress: refreshing...")
self.lbl_vpn_egress.setStyleSheet("color: orange;")
try:
self.ctrl.egress_identity_refresh(scopes=[scope], force=True)
except Exception:
pass
if reason:
try:
self.ctrl.log_gui(f"[egress] force refresh: {reason}")
except Exception:
pass
self._poll_vpn_egress_after_switch(token, attempts_left=14)
def _normalize_vpn_autoloop_state(self, unit_text: str) -> str:
low = str(unit_text or "").strip().lower()
if ":" in low:
low = low.split(":", 1)[1].strip()
if "reconnect" in low:
return "reconnecting"
if "disconnected" in low or "inactive" in low:
return "down"
if "failed" in low or "error" in low or "dead" in low:
return "down"
if "connected" in low:
return "connected"
if "active" in low or "running" in low or "enabled" in low or "up" in low:
return "connected"
return "unknown"
def _maybe_trigger_vpn_egress_refresh_on_autoloop(self, unit_text: str) -> None:
state = self._normalize_vpn_autoloop_state(unit_text)
prev = str(self._vpn_autoloop_last_state or "").strip().lower()
now = time.monotonic()
if state in ("down", "reconnecting", "unknown"):
self._vpn_autoloop_refresh_pending = True
if (
state == "connected"
and self._vpn_autoloop_refresh_pending
and not self._vpn_switching_active
and (now - float(self._vpn_autoloop_last_force_refresh_ts or 0.0)) >= 1.0
):
self._trigger_vpn_egress_refresh(reason=f"autoloop {prev or 'unknown'} -> connected")
self._vpn_autoloop_last_state = state
def _refresh_selected_transport_health_live(
self,
*,
force: bool = False,
min_interval_sec: float = 0.8,
silent: bool = True,
) -> bool:
if not self._transport_api_supported:
return False
cid = self._selected_transport_engine_id()
if not cid:
return False
now = time.monotonic()
if not force and (now - self._transport_health_last_probe_ts) < max(0.2, float(min_interval_sec)):
return False
self._transport_health_last_probe_ts = now
try:
snap = self.ctrl.transport_client_health(cid)
except ApiError as e:
if not silent and int(getattr(e, "status_code", 0) or 0) != 404:
QMessageBox.warning(self, "Transport health error", str(e))
return False
except Exception as e:
if not silent:
QMessageBox.warning(self, "Transport health error", str(e))
return False
self._transport_health_live[cid] = {
"status": str(getattr(snap, "status", "") or "").strip().lower(),
"latency_ms": int(getattr(snap, "latency_ms", 0) or 0),
"last_error": str(getattr(snap, "last_error", "") or "").strip(),
"last_check": str(getattr(snap, "last_check", "") or "").strip(),
}
self._render_singbox_profile_cards()
self._sync_singbox_profile_card_selection(cid)
self._update_transport_engine_view()