from __future__ import annotations import re import time from PySide6 import QtCore from PySide6.QtCore import QTimer, Qt from PySide6.QtWidgets import QMessageBox from api_client import ApiError from main_window.constants import LOCATION_TARGET_ROLE from netns_debug import singbox_clients_netns_state, singbox_netns_toggle_button class UILocationRuntimeMixin: def eventFilter(self, obj, event): # pragma: no cover - GUI cmb = getattr(self, "cmb_locations", None) try: view = cmb.view() if cmb is not None else None except RuntimeError: return super().eventFilter(obj, event) if obj in (cmb, view): if event.type() == QtCore.QEvent.KeyPress: if self._handle_location_keypress(event): return True return super().eventFilter(obj, event) def _handle_location_keypress(self, event) -> bool: key = int(event.key()) if key == int(Qt.Key_Backspace): if self._loc_typeahead_buf: self._loc_typeahead_buf = self._loc_typeahead_buf[:-1] self._apply_location_search_filter() self.loc_typeahead_timer.start() self.cmb_locations.showPopup() return True if key == int(Qt.Key_Escape): self._reset_location_typeahead() return True text = event.text() or "" if len(text) != 1 or not text.isprintable() or text.isspace(): return False self._loc_typeahead_buf += text.lower() count = self._apply_location_search_filter() if count == 0 and len(self._loc_typeahead_buf) > 1: self._loc_typeahead_buf = text.lower() self._apply_location_search_filter() self.loc_typeahead_timer.start() self.cmb_locations.showPopup() return True def _apply_location_search_filter(self) -> int: source = list(self._all_locations or []) query = (self._loc_typeahead_buf or "").strip().lower() if not source: self._set_locations_combo_items([]) return 0 items = source if query: items = [ row for row in source if self._location_matches(query, row[0], row[1], row[2], row[3]) ] items = self._sort_location_items(items) self._set_locations_combo_items(items) return len(items) def _location_matches( self, query: str, label: str, iso: str, target: str, name: str, ) -> bool: q = (query or "").strip().lower() if not q: return True iso_l = (iso or "").strip().lower() label_l = (label or "").strip().lower() target_l = (target or "").strip().lower() name_l = (name or "").strip().lower() if iso_l.startswith(q): return True if target_l.startswith(q) or label_l.startswith(q) or name_l.startswith(q): return True tokens = [t for t in re.split(r"[^\w]+", f"{target_l} {name_l} {label_l}") if t] if any(tok.startswith(q) for tok in tokens): return True return q in target_l or q in name_l or q in label_l def _sort_location_items( self, items: list[tuple[str, str, str, str, int]], ) -> list[tuple[str, str, str, str, int]]: mode = str(self.cmb_locations_sort.currentData() or "ping").strip().lower() if mode == "ping_desc": return sorted(items, key=lambda x: (-x[4], x[3].lower(), x[0].lower())) if mode == "name": return sorted(items, key=lambda x: (x[3].lower(), x[4], x[0].lower())) if mode == "name_desc": return sorted(items, key=lambda x: (x[3].lower(), x[4], x[0].lower()), reverse=True) return sorted(items, key=lambda x: (x[4], x[3].lower(), x[0].lower())) def _set_locations_combo_items(self, items: list[tuple[str, str, str, str, int]]) -> None: prev_target = str(self.cmb_locations.currentData(LOCATION_TARGET_ROLE) or "").strip() prev_iso = str(self.cmb_locations.currentData() or "").strip().upper() desired = (self._vpn_desired_location or "").strip() desired_l = desired.lower() self.cmb_locations.blockSignals(True) self.cmb_locations.clear() pick = -1 for i, (label, iso, target, _name, _ping) in enumerate(items): self.cmb_locations.addItem(label, iso) self.cmb_locations.setItemData(i, target, LOCATION_TARGET_ROLE) iso_l = (iso or "").strip().lower() target_l = (target or "").strip().lower() if desired_l and desired_l in (iso_l, target_l): pick = i if pick < 0 and prev_target and prev_target == target: pick = i if pick < 0 and prev_iso and prev_iso == iso: pick = i if self.cmb_locations.count() > 0: if pick < 0: pick = 0 self.cmb_locations.setCurrentIndex(pick) model_index = self.cmb_locations.model().index(pick, 0) self.cmb_locations.view().setCurrentIndex(model_index) self.cmb_locations.blockSignals(False) def _reset_location_typeahead(self) -> None: self._loc_typeahead_buf = "" self._apply_location_search_filter() def _location_name_ping(self, label: str, iso: str, target: str) -> tuple[str, int]: text = (label or "").strip() ping = 1_000_000 m = re.search(r"\((\d+)\s*ms\)\s*$", text, flags=re.IGNORECASE) if m: try: ping = int(m.group(1)) except Exception: ping = 1_000_000 text = text[:m.start()].strip() iso_pref = (iso or "").strip().upper() pref = iso_pref + " " if iso_pref and text.upper().startswith(pref): text = text[len(pref):].strip() name = text or (target or iso_pref or "").strip() return name, ping def on_locations_sort_changed(self, _index: int = 0) -> None: self._apply_location_search_filter() self._save_ui_preferences() def on_locations_refresh_click(self) -> None: self._safe(self._trigger_locations_refresh, title="Locations refresh error") def _trigger_locations_refresh(self) -> None: self.lbl_locations_meta.setText("Locations: refreshing...") self.lbl_locations_meta.setStyleSheet("color: orange;") self._refresh_locations_async(force_refresh=True) def _append_transport_log(self, line: str) -> None: msg = (line or "").strip() if not msg: return self._append_text(self.txt_transport, msg + "\n") def _singbox_clients_netns_state(self) -> tuple[bool, bool]: return singbox_clients_netns_state(list(self._transport_clients or [])) def _refresh_transport_netns_toggle_button(self) -> None: all_enabled, any_enabled = self._singbox_clients_netns_state() text, color = singbox_netns_toggle_button(all_enabled, any_enabled) self.btn_transport_netns_toggle.setText(text) self.btn_transport_netns_toggle.setStyleSheet(f"color: {color};") def _selected_transport_engine_id(self) -> str: return str(self.cmb_transport_engine.currentData() or "").strip() def _selected_transport_client(self): cid = self._selected_transport_engine_id() if not cid: return None for client in self._transport_clients or []: if str(getattr(client, "id", "") or "").strip() == cid: return client return None def _transport_live_health_for_client(self, client) -> tuple[str, int, str, str]: status = str(getattr(client, "status", "") or "").strip().lower() or "unknown" latency = int(getattr(getattr(client, "health", None), "latency_ms", 0) or 0) last_error = str(getattr(getattr(client, "health", None), "last_error", "") or "").strip() last_check = str(getattr(getattr(client, "health", None), "last_check", "") or "").strip() cid = str(getattr(client, "id", "") or "").strip() if not cid: return status, latency, last_error, last_check snap = self._transport_health_live.get(cid) if not isinstance(snap, dict): return status, latency, last_error, last_check snap_status = str(snap.get("status") or "").strip().lower() if snap_status: status = snap_status try: snap_latency = int(snap.get("latency_ms") or 0) if snap_latency >= 0: latency = snap_latency except Exception: pass snap_err = str(snap.get("last_error") or "").strip() if snap_err: last_error = snap_err snap_check = str(snap.get("last_check") or "").strip() if snap_check: last_check = snap_check return status, latency, last_error, last_check def _country_flag(self, country_code: str) -> str: cc = str(country_code or "").strip().upper() if len(cc) != 2 or not cc.isalpha(): return "" try: return "".join(chr(127397 + ord(ch)) for ch in cc) except Exception: return "" def _refresh_egress_identity_scope( self, scope: str, *, force: bool = False, trigger_refresh: bool = True, min_interval_sec: float = 1.0, silent: bool = True, ): scope_key = str(scope or "").strip().lower() if not scope_key: return None now = time.monotonic() last = float(self._egress_identity_last_probe_ts.get(scope_key, 0.0) or 0.0) if not force and (now - last) < max(0.2, float(min_interval_sec)): return self._egress_identity_cache.get(scope_key) self._egress_identity_last_probe_ts[scope_key] = now try: item = self.ctrl.egress_identity(scope_key, refresh=trigger_refresh) self._egress_identity_cache[scope_key] = item return item except ApiError as e: code = int(getattr(e, "status_code", 0) or 0) if not silent and code != 404: QMessageBox.warning(self, "Egress identity error", str(e)) return self._egress_identity_cache.get(scope_key) except Exception as e: if not silent: QMessageBox.warning(self, "Egress identity error", str(e)) return self._egress_identity_cache.get(scope_key) def _format_egress_identity_short(self, item) -> str: if item is None: return "" ip = str(getattr(item, "ip", "") or "").strip() if not ip: return "" code = str(getattr(item, "country_code", "") or "").strip().upper() flag = self._country_flag(code) if flag: return f"{flag} {ip}" return ip def _render_vpn_egress_label(self, item) -> None: if item is None: self.lbl_vpn_egress.setText("Egress: n/a") self.lbl_vpn_egress.setStyleSheet("color: gray;") return ip = str(getattr(item, "ip", "") or "").strip() code = str(getattr(item, "country_code", "") or "").strip().upper() name = str(getattr(item, "country_name", "") or "").strip() stale = bool(getattr(item, "stale", False)) refreshing = bool(getattr(item, "refresh_in_progress", False)) last_error = str(getattr(item, "last_error", "") or "").strip() if not ip: if refreshing: self.lbl_vpn_egress.setText("Egress: refreshing...") self.lbl_vpn_egress.setStyleSheet("color: orange;") return if last_error: cut = last_error if len(last_error) <= 120 else last_error[:117] + "..." self.lbl_vpn_egress.setText(f"Egress: n/a ({cut})") self.lbl_vpn_egress.setStyleSheet("color: red;") return self.lbl_vpn_egress.setText("Egress: n/a") self.lbl_vpn_egress.setStyleSheet("color: gray;") return flag = self._country_flag(code) prefix = f"{flag} {ip}" if flag else ip tail = "" if name: tail = f" ({name})" elif code: tail = f" ({code})" if stale: tail += " ยท stale" self.lbl_vpn_egress.setText(f"Egress: {prefix}{tail}") self.lbl_vpn_egress.setStyleSheet("color: orange;" if stale else "color: #1f6b2f;") def _poll_vpn_egress_after_switch(self, token: int, attempts_left: int) -> None: if token != self._vpn_egress_refresh_token: return item = self._refresh_egress_identity_scope( "adguardvpn", force=True, trigger_refresh=False, min_interval_sec=0.0, silent=True, ) self._render_vpn_egress_label(item) if token != self._vpn_egress_refresh_token: return refresh_in_progress = bool(getattr(item, "refresh_in_progress", False)) if item is not None else True has_ip = bool(str(getattr(item, "ip", "") or "").strip()) if item is not None else False has_country = bool( str(getattr(item, "country_code", "") or "").strip() or str(getattr(item, "country_name", "") or "").strip() ) if item is not None else False if attempts_left <= 0: return if has_ip and has_country and not refresh_in_progress and not self._vpn_switching_active: return delay_ms = 450 if attempts_left > 3 else 900 QTimer.singleShot( delay_ms, lambda tok=token, left=attempts_left - 1: self._poll_vpn_egress_after_switch(tok, left), ) def _trigger_vpn_egress_refresh(self, *, reason: str = "") -> None: scope = "adguardvpn" self._vpn_egress_refresh_token += 1 token = self._vpn_egress_refresh_token self._egress_identity_last_probe_ts[scope] = 0.0 self._vpn_autoloop_refresh_pending = False self._vpn_autoloop_last_force_refresh_ts = time.monotonic() self.lbl_vpn_egress.setText("Egress: refreshing...") self.lbl_vpn_egress.setStyleSheet("color: orange;") try: self.ctrl.egress_identity_refresh(scopes=[scope], force=True) except Exception: pass if reason: try: self.ctrl.log_gui(f"[egress] force refresh: {reason}") except Exception: pass self._poll_vpn_egress_after_switch(token, attempts_left=14) def _normalize_vpn_autoloop_state(self, unit_text: str) -> str: low = str(unit_text or "").strip().lower() if ":" in low: low = low.split(":", 1)[1].strip() if "reconnect" in low: return "reconnecting" if "disconnected" in low or "inactive" in low: return "down" if "failed" in low or "error" in low or "dead" in low: return "down" if "connected" in low: return "connected" if "active" in low or "running" in low or "enabled" in low or "up" in low: return "connected" return "unknown" def _maybe_trigger_vpn_egress_refresh_on_autoloop(self, unit_text: str) -> None: state = self._normalize_vpn_autoloop_state(unit_text) prev = str(self._vpn_autoloop_last_state or "").strip().lower() now = time.monotonic() if state in ("down", "reconnecting", "unknown"): self._vpn_autoloop_refresh_pending = True if ( state == "connected" and self._vpn_autoloop_refresh_pending and not self._vpn_switching_active and (now - float(self._vpn_autoloop_last_force_refresh_ts or 0.0)) >= 1.0 ): self._trigger_vpn_egress_refresh(reason=f"autoloop {prev or 'unknown'} -> connected") self._vpn_autoloop_last_state = state def _refresh_selected_transport_health_live( self, *, force: bool = False, min_interval_sec: float = 0.8, silent: bool = True, ) -> bool: if not self._transport_api_supported: return False cid = self._selected_transport_engine_id() if not cid: return False now = time.monotonic() if not force and (now - self._transport_health_last_probe_ts) < max(0.2, float(min_interval_sec)): return False self._transport_health_last_probe_ts = now try: snap = self.ctrl.transport_client_health(cid) except ApiError as e: if not silent and int(getattr(e, "status_code", 0) or 0) != 404: QMessageBox.warning(self, "Transport health error", str(e)) return False except Exception as e: if not silent: QMessageBox.warning(self, "Transport health error", str(e)) return False self._transport_health_live[cid] = { "status": str(getattr(snap, "status", "") or "").strip().lower(), "latency_ms": int(getattr(snap, "latency_ms", 0) or 0), "last_error": str(getattr(snap, "last_error", "") or "").strip(), "last_check": str(getattr(snap, "last_check", "") or "").strip(), } self._render_singbox_profile_cards() self._sync_singbox_profile_card_selection(cid) self._update_transport_engine_view()