449 lines
17 KiB
Python
449 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
import time
|
|
|
|
from PySide6 import QtCore
|
|
from PySide6.QtCore import QTimer, Qt
|
|
from PySide6.QtWidgets import QMessageBox
|
|
|
|
from api_client import ApiError
|
|
from main_window.constants import LOCATION_TARGET_ROLE
|
|
from netns_debug import singbox_clients_netns_state, singbox_netns_toggle_button
|
|
|
|
|
|
class UILocationRuntimeMixin:
|
|
def eventFilter(self, obj, event):  # pragma: no cover - GUI
    """Feed key presses on the locations combo (or its popup view) to type-ahead.

    Returns True when ``_handle_location_keypress`` consumed the key press;
    every other event is passed on to the next ``eventFilter`` in the MRO.
    """
    combo = getattr(self, "cmb_locations", None)
    try:
        popup_view = None if combo is None else combo.view()
    except RuntimeError:
        # combo.view() failed (presumably the wrapped Qt object is already
        # gone - TODO confirm); there is nothing to match against.
        return super().eventFilter(obj, event)

    watched = obj in (combo, popup_view)
    if (
        watched
        and event.type() == QtCore.QEvent.KeyPress
        and self._handle_location_keypress(event)
    ):
        return True
    return super().eventFilter(obj, event)
|
|
|
|
def _handle_location_keypress(self, event) -> bool:
# Type-ahead key handler for the locations combo: edits self._loc_typeahead_buf
# according to the pressed key and re-applies the location search filter.
# Returns True when the key press was handled here and False when the key is
# not a usable type-ahead character.
# NOTE(review): this copy of the file has lost its indentation, so the exact
# nesting of the statements below (in particular which `if` owns the
# timer/popup/`return True` lines of the Backspace branch) cannot be confirmed
# from here - verify against the properly formatted source before refactoring.
|
|
key = int(event.key())
|
|
# Backspace: a non-empty buffer loses its last character and the filter is re-applied.
if key == int(Qt.Key_Backspace):
|
|
if self._loc_typeahead_buf:
|
|
self._loc_typeahead_buf = self._loc_typeahead_buf[:-1]
|
|
self._apply_location_search_filter()
|
|
# Restart the type-ahead timer and keep the popup open.
self.loc_typeahead_timer.start()
|
|
self.cmb_locations.showPopup()
|
|
return True
|
|
|
|
# Escape: drop the whole type-ahead buffer.
if key == int(Qt.Key_Escape):
|
|
self._reset_location_typeahead()
|
|
return True
|
|
|
|
# Only a single printable, non-whitespace character counts as type-ahead input.
text = event.text() or ""
|
|
if len(text) != 1 or not text.isprintable() or text.isspace():
|
|
return False
|
|
|
|
# Extend the buffer with the lower-cased character and filter on it.
self._loc_typeahead_buf += text.lower()
|
|
count = self._apply_location_search_filter()
|
|
# Nothing matched the extended buffer: start a new buffer from this character.
if count == 0 and len(self._loc_typeahead_buf) > 1:
|
|
self._loc_typeahead_buf = text.lower()
|
|
self._apply_location_search_filter()
|
|
self.loc_typeahead_timer.start()
|
|
self.cmb_locations.showPopup()
|
|
return True
|
|
|
|
def _apply_location_search_filter(self) -> int:
|
|
source = list(self._all_locations or [])
|
|
query = (self._loc_typeahead_buf or "").strip().lower()
|
|
|
|
if not source:
|
|
self._set_locations_combo_items([])
|
|
return 0
|
|
|
|
items = source
|
|
if query:
|
|
items = [
|
|
row
|
|
for row in source
|
|
if self._location_matches(query, row[0], row[1], row[2], row[3])
|
|
]
|
|
|
|
items = self._sort_location_items(items)
|
|
self._set_locations_combo_items(items)
|
|
return len(items)
|
|
|
|
def _location_matches(
|
|
self,
|
|
query: str,
|
|
label: str,
|
|
iso: str,
|
|
target: str,
|
|
name: str,
|
|
) -> bool:
|
|
q = (query or "").strip().lower()
|
|
if not q:
|
|
return True
|
|
|
|
iso_l = (iso or "").strip().lower()
|
|
label_l = (label or "").strip().lower()
|
|
target_l = (target or "").strip().lower()
|
|
name_l = (name or "").strip().lower()
|
|
|
|
if iso_l.startswith(q):
|
|
return True
|
|
if target_l.startswith(q) or label_l.startswith(q) or name_l.startswith(q):
|
|
return True
|
|
|
|
tokens = [t for t in re.split(r"[^\w]+", f"{target_l} {name_l} {label_l}") if t]
|
|
if any(tok.startswith(q) for tok in tokens):
|
|
return True
|
|
return q in target_l or q in name_l or q in label_l
|
|
|
|
def _sort_location_items(
|
|
self,
|
|
items: list[tuple[str, str, str, str, int]],
|
|
) -> list[tuple[str, str, str, str, int]]:
|
|
mode = str(self.cmb_locations_sort.currentData() or "ping").strip().lower()
|
|
if mode == "ping_desc":
|
|
return sorted(items, key=lambda x: (-x[4], x[3].lower(), x[0].lower()))
|
|
if mode == "name":
|
|
return sorted(items, key=lambda x: (x[3].lower(), x[4], x[0].lower()))
|
|
if mode == "name_desc":
|
|
return sorted(items, key=lambda x: (x[3].lower(), x[4], x[0].lower()), reverse=True)
|
|
return sorted(items, key=lambda x: (x[4], x[3].lower(), x[0].lower()))
|
|
|
|
def _set_locations_combo_items(self, items: list[tuple[str, str, str, str, int]]) -> None:
    """Replace the rows of the locations combo with *items* and pick a selection.

    Each item is ``(label, iso, target, name, ping)``: ``label`` becomes the
    row text, ``iso`` the row's default user data and ``target`` is stored
    under ``LOCATION_TARGET_ROLE``.

    Selection priority:

    1. the row whose iso or target equals ``_vpn_desired_location``
       (case-insensitive; the last such row wins);
    2. the first row with the target that was selected before the rebuild;
    3. the first row with the ISO code that was selected before the rebuild;
    4. the first row.

    The combo's signals are blocked while it is rebuilt, and the previous
    blocked state is restored afterwards even if the rebuild raises.
    """
    combo = self.cmb_locations
    prev_target = str(combo.currentData(LOCATION_TARGET_ROLE) or "").strip()
    prev_iso = str(combo.currentData() or "").strip().upper()
    desired_l = (self._vpn_desired_location or "").strip().lower()

    # blockSignals() returns the previous state; restoring it (instead of
    # forcing False) keeps a caller's own signal blocking intact.
    was_blocked = bool(combo.blockSignals(True))
    try:
        combo.clear()

        by_desired = by_target = by_iso = -1
        for i, (label, iso, target, _name, _ping) in enumerate(items):
            combo.addItem(label, iso)
            combo.setItemData(i, target, LOCATION_TARGET_ROLE)

            iso_n = (iso or "").strip()
            target_n = (target or "").strip()
            if desired_l and desired_l in (iso_n.lower(), target_n.lower()):
                by_desired = i
            # Track target and ISO matches separately so that an earlier row
            # that merely shares the country cannot shadow the row that was
            # actually selected before the rebuild.
            if by_target < 0 and prev_target and prev_target == target_n:
                by_target = i
            if by_iso < 0 and prev_iso and prev_iso == iso_n.upper():
                by_iso = i

        if combo.count() > 0:
            pick = next(
                (idx for idx in (by_desired, by_target, by_iso) if idx >= 0),
                0,
            )
            combo.setCurrentIndex(pick)
            # Keep the popup view's cursor on the same row as the combo.
            combo.view().setCurrentIndex(combo.model().index(pick, 0))
    finally:
        combo.blockSignals(was_blocked)
|
|
|
|
def _reset_location_typeahead(self) -> None:
|
|
self._loc_typeahead_buf = ""
|
|
self._apply_location_search_filter()
|
|
|
|
def _location_name_ping(self, label: str, iso: str, target: str) -> tuple[str, int]:
|
|
text = (label or "").strip()
|
|
ping = 1_000_000
|
|
|
|
m = re.search(r"\((\d+)\s*ms\)\s*$", text, flags=re.IGNORECASE)
|
|
if m:
|
|
try:
|
|
ping = int(m.group(1))
|
|
except Exception:
|
|
ping = 1_000_000
|
|
text = text[:m.start()].strip()
|
|
|
|
iso_pref = (iso or "").strip().upper()
|
|
pref = iso_pref + " "
|
|
if iso_pref and text.upper().startswith(pref):
|
|
text = text[len(pref):].strip()
|
|
|
|
name = text or (target or iso_pref or "").strip()
|
|
return name, ping
|
|
|
|
def on_locations_sort_changed(self, _index: int = 0) -> None:
    """React to a sort-mode change: rebuild the location list, then save UI preferences.

    ``_index`` is accepted for the signal signature and is not used.
    """
    self._apply_location_search_filter()
    self._save_ui_preferences()
|
|
|
|
def on_locations_refresh_click(self) -> None:
    """Run ``_trigger_locations_refresh`` through ``self._safe`` with a refresh error title."""
    action = self._trigger_locations_refresh
    self._safe(action, title="Locations refresh error")
|
|
|
|
def _trigger_locations_refresh(self) -> None:
|
|
self.lbl_locations_meta.setText("Locations: refreshing...")
|
|
self.lbl_locations_meta.setStyleSheet("color: orange;")
|
|
self._refresh_locations_async(force_refresh=True)
|
|
|
|
def _append_transport_log(self, line: str) -> None:
|
|
msg = (line or "").strip()
|
|
if not msg:
|
|
return
|
|
self._append_text(self.txt_transport, msg + "\n")
|
|
|
|
def _singbox_clients_netns_state(self) -> tuple[bool, bool]:
    """Return ``singbox_clients_netns_state`` evaluated over a copy of the known transport clients."""
    clients = list(self._transport_clients or [])
    return singbox_clients_netns_state(clients)
|
|
|
|
def _refresh_transport_netns_toggle_button(self) -> None:
    """Update the netns toggle button's caption and text colour from the clients' netns state."""
    all_enabled, any_enabled = self._singbox_clients_netns_state()
    caption, colour = singbox_netns_toggle_button(all_enabled, any_enabled)
    button = self.btn_transport_netns_toggle
    button.setText(caption)
    button.setStyleSheet(f"color: {colour};")
|
|
|
|
def _selected_transport_engine_id(self) -> str:
|
|
return str(self.cmb_transport_engine.currentData() or "").strip()
|
|
|
|
def _selected_transport_client(self):
|
|
cid = self._selected_transport_engine_id()
|
|
if not cid:
|
|
return None
|
|
for client in self._transport_clients or []:
|
|
if str(getattr(client, "id", "") or "").strip() == cid:
|
|
return client
|
|
return None
|
|
|
|
def _transport_live_health_for_client(self, client) -> tuple[str, int, str, str]:
|
|
status = str(getattr(client, "status", "") or "").strip().lower() or "unknown"
|
|
latency = int(getattr(getattr(client, "health", None), "latency_ms", 0) or 0)
|
|
last_error = str(getattr(getattr(client, "health", None), "last_error", "") or "").strip()
|
|
last_check = str(getattr(getattr(client, "health", None), "last_check", "") or "").strip()
|
|
cid = str(getattr(client, "id", "") or "").strip()
|
|
if not cid:
|
|
return status, latency, last_error, last_check
|
|
snap = self._transport_health_live.get(cid)
|
|
if not isinstance(snap, dict):
|
|
return status, latency, last_error, last_check
|
|
snap_status = str(snap.get("status") or "").strip().lower()
|
|
if snap_status:
|
|
status = snap_status
|
|
try:
|
|
snap_latency = int(snap.get("latency_ms") or 0)
|
|
if snap_latency >= 0:
|
|
latency = snap_latency
|
|
except Exception:
|
|
pass
|
|
snap_err = str(snap.get("last_error") or "").strip()
|
|
if snap_err:
|
|
last_error = snap_err
|
|
snap_check = str(snap.get("last_check") or "").strip()
|
|
if snap_check:
|
|
last_check = snap_check
|
|
return status, latency, last_error, last_check
|
|
|
|
def _country_flag(self, country_code: str) -> str:
|
|
cc = str(country_code or "").strip().upper()
|
|
if len(cc) != 2 or not cc.isalpha():
|
|
return ""
|
|
try:
|
|
return "".join(chr(127397 + ord(ch)) for ch in cc)
|
|
except Exception:
|
|
return ""
|
|
|
|
def _refresh_egress_identity_scope(
|
|
self,
|
|
scope: str,
|
|
*,
|
|
force: bool = False,
|
|
trigger_refresh: bool = True,
|
|
min_interval_sec: float = 1.0,
|
|
silent: bool = True,
|
|
):
|
|
scope_key = str(scope or "").strip().lower()
|
|
if not scope_key:
|
|
return None
|
|
|
|
now = time.monotonic()
|
|
last = float(self._egress_identity_last_probe_ts.get(scope_key, 0.0) or 0.0)
|
|
if not force and (now - last) < max(0.2, float(min_interval_sec)):
|
|
return self._egress_identity_cache.get(scope_key)
|
|
|
|
self._egress_identity_last_probe_ts[scope_key] = now
|
|
try:
|
|
item = self.ctrl.egress_identity(scope_key, refresh=trigger_refresh)
|
|
self._egress_identity_cache[scope_key] = item
|
|
return item
|
|
except ApiError as e:
|
|
code = int(getattr(e, "status_code", 0) or 0)
|
|
if not silent and code != 404:
|
|
QMessageBox.warning(self, "Egress identity error", str(e))
|
|
return self._egress_identity_cache.get(scope_key)
|
|
except Exception as e:
|
|
if not silent:
|
|
QMessageBox.warning(self, "Egress identity error", str(e))
|
|
return self._egress_identity_cache.get(scope_key)
|
|
|
|
def _format_egress_identity_short(self, item) -> str:
|
|
if item is None:
|
|
return ""
|
|
ip = str(getattr(item, "ip", "") or "").strip()
|
|
if not ip:
|
|
return ""
|
|
code = str(getattr(item, "country_code", "") or "").strip().upper()
|
|
flag = self._country_flag(code)
|
|
if flag:
|
|
return f"{flag} {ip}"
|
|
return ip
|
|
|
|
def _render_vpn_egress_label(self, item) -> None:
|
|
if item is None:
|
|
self.lbl_vpn_egress.setText("Egress: n/a")
|
|
self.lbl_vpn_egress.setStyleSheet("color: gray;")
|
|
return
|
|
|
|
ip = str(getattr(item, "ip", "") or "").strip()
|
|
code = str(getattr(item, "country_code", "") or "").strip().upper()
|
|
name = str(getattr(item, "country_name", "") or "").strip()
|
|
stale = bool(getattr(item, "stale", False))
|
|
refreshing = bool(getattr(item, "refresh_in_progress", False))
|
|
last_error = str(getattr(item, "last_error", "") or "").strip()
|
|
|
|
if not ip:
|
|
if refreshing:
|
|
self.lbl_vpn_egress.setText("Egress: refreshing...")
|
|
self.lbl_vpn_egress.setStyleSheet("color: orange;")
|
|
return
|
|
if last_error:
|
|
cut = last_error if len(last_error) <= 120 else last_error[:117] + "..."
|
|
self.lbl_vpn_egress.setText(f"Egress: n/a ({cut})")
|
|
self.lbl_vpn_egress.setStyleSheet("color: red;")
|
|
return
|
|
self.lbl_vpn_egress.setText("Egress: n/a")
|
|
self.lbl_vpn_egress.setStyleSheet("color: gray;")
|
|
return
|
|
|
|
flag = self._country_flag(code)
|
|
prefix = f"{flag} {ip}" if flag else ip
|
|
tail = ""
|
|
if name:
|
|
tail = f" ({name})"
|
|
elif code:
|
|
tail = f" ({code})"
|
|
if stale:
|
|
tail += " · stale"
|
|
self.lbl_vpn_egress.setText(f"Egress: {prefix}{tail}")
|
|
self.lbl_vpn_egress.setStyleSheet("color: orange;" if stale else "color: #1f6b2f;")
|
|
|
|
def _poll_vpn_egress_after_switch(self, token: int, attempts_left: int) -> None:
|
|
if token != self._vpn_egress_refresh_token:
|
|
return
|
|
item = self._refresh_egress_identity_scope(
|
|
"adguardvpn",
|
|
force=True,
|
|
trigger_refresh=False,
|
|
min_interval_sec=0.0,
|
|
silent=True,
|
|
)
|
|
self._render_vpn_egress_label(item)
|
|
if token != self._vpn_egress_refresh_token:
|
|
return
|
|
refresh_in_progress = bool(getattr(item, "refresh_in_progress", False)) if item is not None else True
|
|
has_ip = bool(str(getattr(item, "ip", "") or "").strip()) if item is not None else False
|
|
has_country = bool(
|
|
str(getattr(item, "country_code", "") or "").strip()
|
|
or str(getattr(item, "country_name", "") or "").strip()
|
|
) if item is not None else False
|
|
if attempts_left <= 0:
|
|
return
|
|
if has_ip and has_country and not refresh_in_progress and not self._vpn_switching_active:
|
|
return
|
|
delay_ms = 450 if attempts_left > 3 else 900
|
|
QTimer.singleShot(
|
|
delay_ms,
|
|
lambda tok=token, left=attempts_left - 1: self._poll_vpn_egress_after_switch(tok, left),
|
|
)
|
|
|
|
def _trigger_vpn_egress_refresh(self, *, reason: str = "") -> None:
|
|
scope = "adguardvpn"
|
|
self._vpn_egress_refresh_token += 1
|
|
token = self._vpn_egress_refresh_token
|
|
self._egress_identity_last_probe_ts[scope] = 0.0
|
|
self._vpn_autoloop_refresh_pending = False
|
|
self._vpn_autoloop_last_force_refresh_ts = time.monotonic()
|
|
self.lbl_vpn_egress.setText("Egress: refreshing...")
|
|
self.lbl_vpn_egress.setStyleSheet("color: orange;")
|
|
try:
|
|
self.ctrl.egress_identity_refresh(scopes=[scope], force=True)
|
|
except Exception:
|
|
pass
|
|
if reason:
|
|
try:
|
|
self.ctrl.log_gui(f"[egress] force refresh: {reason}")
|
|
except Exception:
|
|
pass
|
|
self._poll_vpn_egress_after_switch(token, attempts_left=14)
|
|
|
|
def _normalize_vpn_autoloop_state(self, unit_text: str) -> str:
|
|
low = str(unit_text or "").strip().lower()
|
|
if ":" in low:
|
|
low = low.split(":", 1)[1].strip()
|
|
if "reconnect" in low:
|
|
return "reconnecting"
|
|
if "disconnected" in low or "inactive" in low:
|
|
return "down"
|
|
if "failed" in low or "error" in low or "dead" in low:
|
|
return "down"
|
|
if "connected" in low:
|
|
return "connected"
|
|
if "active" in low or "running" in low or "enabled" in low or "up" in low:
|
|
return "connected"
|
|
return "unknown"
|
|
|
|
def _maybe_trigger_vpn_egress_refresh_on_autoloop(self, unit_text: str) -> None:
|
|
state = self._normalize_vpn_autoloop_state(unit_text)
|
|
prev = str(self._vpn_autoloop_last_state or "").strip().lower()
|
|
now = time.monotonic()
|
|
|
|
if state in ("down", "reconnecting", "unknown"):
|
|
self._vpn_autoloop_refresh_pending = True
|
|
|
|
if (
|
|
state == "connected"
|
|
and self._vpn_autoloop_refresh_pending
|
|
and not self._vpn_switching_active
|
|
and (now - float(self._vpn_autoloop_last_force_refresh_ts or 0.0)) >= 1.0
|
|
):
|
|
self._trigger_vpn_egress_refresh(reason=f"autoloop {prev or 'unknown'} -> connected")
|
|
|
|
self._vpn_autoloop_last_state = state
|
|
|
|
def _refresh_selected_transport_health_live(
|
|
self,
|
|
*,
|
|
force: bool = False,
|
|
min_interval_sec: float = 0.8,
|
|
silent: bool = True,
|
|
) -> bool:
|
|
if not self._transport_api_supported:
|
|
return False
|
|
cid = self._selected_transport_engine_id()
|
|
if not cid:
|
|
return False
|
|
now = time.monotonic()
|
|
if not force and (now - self._transport_health_last_probe_ts) < max(0.2, float(min_interval_sec)):
|
|
return False
|
|
self._transport_health_last_probe_ts = now
|
|
try:
|
|
snap = self.ctrl.transport_client_health(cid)
|
|
except ApiError as e:
|
|
if not silent and int(getattr(e, "status_code", 0) or 0) != 404:
|
|
QMessageBox.warning(self, "Transport health error", str(e))
|
|
return False
|
|
except Exception as e:
|
|
if not silent:
|
|
QMessageBox.warning(self, "Transport health error", str(e))
|
|
return False
|
|
self._transport_health_live[cid] = {
|
|
"status": str(getattr(snap, "status", "") or "").strip().lower(),
|
|
"latency_ms": int(getattr(snap, "latency_ms", 0) or 0),
|
|
"last_error": str(getattr(snap, "last_error", "") or "").strip(),
|
|
"last_check": str(getattr(snap, "last_check", "") or "").strip(),
|
|
}
|
|
self._render_singbox_profile_cards()
|
|
self._sync_singbox_profile_card_selection(cid)
|
|
self._update_transport_engine_view()
|