428 lines
17 KiB
Python
428 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
|
|
from PySide6 import QtCore
|
|
|
|
from dashboard_controller import TraceMode
|
|
from main_window.workers import EventThread, LocationsThread
|
|
|
|
|
|
class RuntimeRefreshMixin:
    """Event-stream handling and per-tab refresh routines.

    This is a mixin: the controller (``self.ctrl``), every widget referenced
    below, the ``_vpn_switching_*`` / ``_locations_*`` state attributes and
    helpers such as ``_safe`` and ``_set_text`` are not defined here and must
    be provided by the host class.
    """

    # ---------------- HELPERS ----------------

    @staticmethod
    def _set_checked_silently(checkbox, checked: bool) -> None:
        """Set the checked state of *checkbox* without emitting its signals.

        ``blockSignals`` returns the previous blocking state; that state is
        restored in ``finally`` so an exception in ``setChecked`` cannot leave
        the widget muted, and a widget that was already blocked stays blocked.
        """
        was_blocked = checkbox.blockSignals(True)
        try:
            checkbox.setChecked(checked)
        finally:
            checkbox.blockSignals(was_blocked)

    def _log_gui_quietly(self, message: str) -> None:
        """Send *message* to ``self.ctrl.log_gui``; failures never propagate."""
        try:
            self.ctrl.log_gui(message)
        except Exception:
            # Logging is best-effort: a failing log call must not break the UI.
            pass

    # ---------------- EVENTS ----------------

    def _start_events_stream(self) -> None:
        """Create and start the event worker thread unless one already exists."""
        if self.events_thread:
            return
        self.events_thread = EventThread(self.ctrl, self)
        self.events_thread.eventReceived.connect(self._handle_event)
        self.events_thread.error.connect(self._handle_event_error)
        self.events_thread.start()

    @QtCore.Slot(object)
    def _handle_event(self, ev) -> None:
        """Refresh the parts of the UI that the incoming event *ev* affects.

        The controller classifies the event into a collection of kind names;
        each recognised name triggers the matching ``refresh_*`` routine.
        """
        try:
            # A None/empty classification is treated as "nothing to refresh"
            # so the membership tests below cannot raise.
            kinds = self.ctrl.classify_event(ev) or ()
        except Exception:
            kinds = ()

        # routes_nft_progress is handled separately to update the progress label.
        try:
            kind = (getattr(ev, "kind", "") or "").strip().lower()
        except Exception:
            kind = ""

        if kind == "routes_nft_progress":
            try:
                progress_view = self.ctrl.routes_nft_progress_from_event(ev)
                self._update_routes_progress_label(progress_view)
            except Exception:
                # Deliberately ignored: a bad progress event must not crash the UI.
                pass

        # Simple strategy: trigger the existing refresh functions.
        if "status" in kinds:
            self.refresh_status_tab()
        if "login" in kinds:
            self.refresh_login_banner()
        if "vpn" in kinds:
            self.refresh_vpn_tab()
        if "routes" in kinds:
            self.refresh_routes_tab()
        if "dns" in kinds:
            self.refresh_dns_tab()
        if "transport" in kinds:
            self.refresh_singbox_tab()
            self._refresh_selected_transport_health_live(silent=True)
        if "trace" in kinds:
            self.refresh_trace_tab()

    @QtCore.Slot(str)
    def _handle_event_error(self, msg: str) -> None:
        """Log an event-stream error to the GUI log without blocking the UI."""
        self._log_gui_quietly(f"[sse-error] {msg}")

    # ---------------- REFRESH ----------------

    def refresh_everything(self) -> None:
        """Run every refresh routine once: banner first, then each tab."""
        self.refresh_login_banner()
        self.refresh_status_tab()
        self.refresh_vpn_tab()
        self.refresh_singbox_tab()
        self.refresh_routes_tab()
        self.refresh_dns_tab()
        self.refresh_domains_tab()
        self.refresh_trace_tab()

    def refresh_login_banner(self) -> None:
        """Update the auth button and the login banner text and colour.

        While a location switch is active and not expired, the banner shows a
        "Switching location" message instead of the login text.
        """

        def work():
            view = self.ctrl.get_login_view()

            self._set_auth_button(view.logged_in)

            if self._vpn_switching_active:
                if self._is_vpn_switching_expired():
                    # Timed out: drop the switching state and show the normal banner.
                    self._stop_vpn_location_switching()
                else:
                    target = (self._vpn_switching_target or "").strip()
                    msg = "AdGuard VPN: Switching location..."
                    if target:
                        msg = f"AdGuard VPN: Switching location to {target}..."
                    self.btn_login_banner.setText(msg)
                    self.btn_login_banner.setStyleSheet(
                        "text-align: left; border: none; color: #d4a200;"
                    )
                    return

            self.btn_login_banner.setText(view.text)
            # Forced colour: green when logged in, gray otherwise.
            color = "green" if view.logged_in else "gray"
            base_style = "text-align: left; border: none;"
            self.btn_login_banner.setStyleSheet(
                f"{base_style} color: {color};"
            )

        self._safe(work, title="Login state error")

    def refresh_status_tab(self) -> None:
        """Fill the status tab labels from the controller's status overview."""

        def work():
            view = self.ctrl.get_status_overview()
            self.st_timestamp.setText(view.timestamp)
            self.st_counts.setText(view.counts)
            self.st_iface.setText(view.iface_table_mark)

            self._set_status_label_color(
                self.st_route, view.policy_route, kind="policy"
            )
            self._set_status_label_color(
                self.st_routes_service, view.routes_service, kind="service"
            )
            self._set_status_label_color(
                self.st_smartdns_service, view.smartdns_service, kind="service"
            )
            self._set_status_label_color(
                self.st_vpn_service, view.vpn_service, kind="service"
            )

        self._safe(work, title="Status error")

    def refresh_vpn_tab(self) -> None:
        """Refresh the VPN tab and advance the location-switching state.

        Updates the status text, autoconnect controls and egress label,
        requests an egress refresh when the desired location changed, ends an
        active location switch when it expired or completed, and finally
        starts an asynchronous reload of the locations list.
        """

        def work():
            view = self.ctrl.vpn_status_view()
            # Read the previously seen value before overwriting it below.
            prev_desired = (self._vpn_desired_location_last_seen or "").strip().lower()
            self._vpn_desired_location = (view.desired_location or "").strip()
            current_desired = (self._vpn_desired_location or "").strip().lower()
            self._vpn_desired_location_last_seen = self._vpn_desired_location

            lines = []
            if view.desired_location:
                lines.append(f"Desired location: {view.desired_location}")
            if view.pretty_text:
                lines.append(view.pretty_text.rstrip())
            self._set_text(self.txt_vpn, "\n".join(lines).strip() + "\n")

            auto_view = self.ctrl.vpn_autoconnect_view()
            self.btn_autoconnect_toggle.setText(
                "Disable autoconnect" if auto_view.enabled else "Enable autoconnect"
            )
            self.lbl_autoconnect_state.setText(auto_view.unit_text)
            self.lbl_autoconnect_state.setStyleSheet(
                f"color: {auto_view.color};"
            )

            vpn_egress = self._refresh_egress_identity_scope(
                "adguardvpn",
                trigger_refresh=True,
                silent=True,
            )
            self._render_vpn_egress_label(vpn_egress)
            self._maybe_trigger_vpn_egress_refresh_on_autoloop(auto_view.unit_text)
            if prev_desired and current_desired and prev_desired != current_desired:
                self._trigger_vpn_egress_refresh(
                    reason=f"desired location changed: {prev_desired} -> {current_desired}"
                )

            if self._vpn_switching_active:
                unit_low = (auto_view.unit_text or "").strip().lower()
                elapsed = self._vpn_switching_elapsed_sec()
                non_connected_markers = (
                    "disconnected",
                    "reconnecting",
                    "unknown",
                    "error",
                    "inactive",
                    "failed",
                    "dead",
                )
                if any(marker in unit_low for marker in non_connected_markers):
                    # Remember that the unit left the connected state at least once.
                    self._vpn_switching_seen_non_connected = True

                desired_now = (self._vpn_desired_location or "").strip().lower()
                target_now = (self._vpn_switching_target or "").strip().lower()
                desired_matches = bool(target_now and desired_now and target_now == desired_now)

                if self._is_vpn_switching_expired():
                    self._stop_vpn_location_switching()
                elif (
                    "connected" in unit_low
                    # "disconnected" contains "connected", so exclude it explicitly.
                    and "disconnected" not in unit_low
                    and elapsed >= float(self._vpn_switching_min_visible_sec)
                    and (self._vpn_switching_seen_non_connected or desired_matches)
                ):
                    # Capture the target before the stop call clears it.
                    switched_to = (self._vpn_switching_target or "").strip()
                    self._stop_vpn_location_switching()
                    if switched_to:
                        self._trigger_vpn_egress_refresh(
                            reason=f"location switch completed: {switched_to}"
                        )
                # NOTE(review): indentation was lost in the extracted source;
                # this call is assumed to run whenever a switch was active on
                # entry - confirm against the original file.
                self.refresh_login_banner()

            self._refresh_locations_async()

        self._safe(work, title="VPN error")

    def refresh_singbox_tab(self) -> None:
        """Refresh transport engines, policy locks and profile controls."""

        def work():
            self.refresh_transport_engines(silent=True)
            self.refresh_transport_policy_locks(silent=True)
            self._apply_singbox_profile_controls()

        self._safe(work, title="SingBox error")

    # ---------------- VPN LOCATION SWITCHING ----------------

    def _start_vpn_location_switching(self, target: str) -> None:
        """Mark a location switch to *target* as in progress, starting now."""
        self._vpn_switching_active = True
        self._vpn_switching_target = str(target or "").strip()
        self._vpn_switching_started_at = time.monotonic()
        self._vpn_switching_seen_non_connected = False

    def _stop_vpn_location_switching(self) -> None:
        """Reset all location-switching state to its idle values."""
        self._vpn_switching_active = False
        self._vpn_switching_target = ""
        self._vpn_switching_started_at = 0.0
        self._vpn_switching_seen_non_connected = False

    def _is_vpn_switching_expired(self) -> bool:
        """Return True when an active switch has run for the timeout or longer.

        Returns False when no switch is active or no start time is recorded.
        """
        if not self._vpn_switching_active:
            return False
        started = float(self._vpn_switching_started_at or 0.0)
        if started <= 0:
            # 0.0 is the "not started" value written by the stop routine.
            return False
        return (time.monotonic() - started) >= float(self._vpn_switching_timeout_sec)

    def _vpn_switching_elapsed_sec(self) -> float:
        """Return seconds since the active switch started, or 0.0 if idle."""
        if not self._vpn_switching_active:
            return 0.0
        started = float(self._vpn_switching_started_at or 0.0)
        if started <= 0:
            return 0.0
        return max(0.0, time.monotonic() - started)

    # ---------------- LOCATIONS ----------------

    def _refresh_locations_async(self, force_refresh: bool = False) -> None:
        """Load the locations list in a worker thread.

        If a worker is already running, the request is only recorded in the
        pending flags and served when that worker finishes. A pending forced
        refresh is merged into the next worker that is started.
        """
        if self.locations_thread and self.locations_thread.isRunning():
            self._locations_refresh_pending = True
            if force_refresh:
                self._locations_force_refresh_pending = True
            return

        run_force_refresh = bool(force_refresh or self._locations_force_refresh_pending)
        self._locations_refresh_pending = False
        self._locations_force_refresh_pending = False
        worker = LocationsThread(
            self.ctrl,
            force_refresh=run_force_refresh,
            parent=self,
        )
        worker.loaded.connect(self._on_locations_loaded)
        worker.error.connect(self._on_locations_error)
        worker.finished.connect(self._on_locations_finished)
        # The worker is parented to this object, so without an explicit delete
        # every finished thread object would stay alive until the parent is
        # destroyed. Connected last so it runs after the slot above.
        worker.finished.connect(worker.deleteLater)
        self.locations_thread = worker
        worker.start()

    @QtCore.Slot(object)
    def _on_locations_loaded(self, state) -> None:
        """Apply a loaded locations *state*; report failures as a load error."""
        try:
            self._apply_locations_state(state)
        except Exception as e:
            self._on_locations_error(str(e))

    @QtCore.Slot(str)
    def _on_locations_error(self, msg: str) -> None:
        """Show *msg* in red in the locations meta label and log it."""
        msg = (msg or "").strip()
        if not msg:
            msg = "failed to load locations"
        self.lbl_locations_meta.setText(f"Locations: {msg}")
        self.lbl_locations_meta.setStyleSheet("color: red;")
        self._log_gui_quietly(f"[vpn-locations] {msg}")

    @QtCore.Slot()
    def _on_locations_finished(self) -> None:
        """Release the finished worker and start a pending refresh, if any."""
        current = self.locations_thread
        if current is not None and current.isRunning():
            # Stale notification: a replacement worker was started after the
            # previous one stopped running but before its ``finished`` signal
            # was delivered. Keep tracking the running worker; its own
            # ``finished`` signal will serve any pending request.
            return

        self.locations_thread = None
        if self._locations_refresh_pending:
            force_refresh = self._locations_force_refresh_pending
            self._locations_refresh_pending = False
            self._locations_force_refresh_pending = False
            self._refresh_locations_async(force_refresh=force_refresh)

    def _apply_locations_state(self, state) -> None:
        """Rebuild the cached locations list from *state* and re-render it.

        Entries without an ISO code or label are skipped; an empty target
        falls back to the ISO code. Each stored item is the tuple
        ``(label, iso, target, name, ping)``.
        """
        all_items: list[tuple[str, str, str, str, int]] = []
        for loc in getattr(state, "locations", []) or []:
            iso = str(getattr(loc, "iso", "") or "").strip().upper()
            label = str(getattr(loc, "label", "") or "").strip()
            target = str(getattr(loc, "target", "") or "").strip()
            if not iso or not label:
                continue
            if not target:
                target = iso
            name, ping = self._location_name_ping(label, iso, target)
            all_items.append((label, iso, target, name, ping))

        self._all_locations = all_items
        self._apply_location_search_filter()
        self._render_locations_meta(state)

    def _render_locations_meta(self, state) -> None:
        """Summarise cache/refresh metadata of *state* in the meta label.

        Parts are joined with ``" | "``. The colour starts as gray, becomes
        orange while refreshing or when the cache is stale, and red when there
        is a last error and no refresh is in progress.
        """
        parts = []
        color = "gray"

        updated_at = str(getattr(state, "updated_at", "") or "").strip()
        stale = bool(getattr(state, "stale", False))
        refreshing = bool(getattr(state, "refresh_in_progress", False))
        last_error = str(getattr(state, "last_error", "") or "").strip()
        next_retry = str(getattr(state, "next_retry_at", "") or "").strip()

        if refreshing:
            parts.append("refreshing")
            color = "orange"
        if updated_at:
            parts.append(f"updated: {updated_at}")
        else:
            parts.append("updated: n/a")
        if stale:
            parts.append("stale cache")
            color = "orange"
        if last_error:
            # Keep the label short: at most 120 characters including "...".
            cut = last_error if len(last_error) <= 120 else last_error[:117] + "..."
            parts.append(f"last error: {cut}")
            color = "red" if not refreshing else "orange"
        # NOTE(review): indentation was lost in the extracted source; this
        # branch is assumed to be independent of ``last_error`` - confirm.
        if next_retry:
            parts.append(f"next retry: {next_retry}")

        self.lbl_locations_meta.setText(" | ".join(parts))
        self.lbl_locations_meta.setStyleSheet(f"color: {color};")

    # ---------------- ROUTES / DNS / DOMAINS / TRACE ----------------

    def refresh_routes_tab(self) -> None:
        """Refresh the timer checkbox, traffic-mode state and resolve summaries."""

        def work():
            timer_enabled = self.ctrl.routes_timer_enabled()
            # Update the checkbox without firing its change handlers.
            self._set_checked_silently(self.chk_timer, bool(timer_enabled))

            t = self.ctrl.traffic_mode_view()
            # Arguments are positional; the order must match _set_traffic_mode_state.
            self._set_traffic_mode_state(
                t.desired_mode,
                t.applied_mode,
                t.preferred_iface,
                bool(t.advanced_active),
                bool(t.auto_local_bypass),
                bool(t.auto_local_active),
                bool(t.ingress_reply_bypass),
                bool(t.ingress_reply_active),
                int(t.bypass_candidates),
                int(t.overrides_applied),
                int(t.cgroup_resolved_uids),
                t.cgroup_warning,
                bool(t.healthy),
                bool(t.ingress_rule_present),
                bool(t.ingress_nft_active),
                bool(t.probe_ok),
                t.probe_message,
                t.active_iface,
                t.iface_reason,
                t.message,
            )
            rs = self.ctrl.routes_resolve_summary_view()
            self.lbl_routes_resolve_summary.setText(rs.text)
            self.lbl_routes_resolve_summary.setStyleSheet(f"color: {rs.color};")
            self.lbl_routes_recheck_summary.setText(rs.recheck_text)
            self.lbl_routes_recheck_summary.setStyleSheet(f"color: {rs.recheck_color};")

        self._safe(work, title="Routes error")

    def refresh_dns_tab(self) -> None:
        """Refresh DNS widgets from the controller's pool and status views.

        ``self._dns_ui_refresh`` is True for the duration of the update and is
        always reset afterwards. Checkboxes are updated with signals blocked.
        """

        def work():
            self._dns_ui_refresh = True
            try:
                pool = self.ctrl.dns_upstream_pool_view()
                self._set_dns_resolver_summary(getattr(pool, "items", []))

                st = self.ctrl.dns_status_view()
                self.ent_smartdns_addr.setText(st.smartdns_addr or "")

                # Normalise the mode: "hybrid"/"hybrid_wildcard" -> hybrid,
                # anything else -> direct.
                mode = (getattr(st, "mode", "") or "").strip().lower()
                if mode in ("hybrid_wildcard", "hybrid"):
                    hybrid_enabled = True
                    mode = "hybrid_wildcard"
                else:
                    hybrid_enabled = False
                    mode = "direct"

                self._set_checked_silently(self.chk_dns_via_smartdns, hybrid_enabled)

                unit_state = (st.unit_state or "unknown").strip().lower()
                unit_active = unit_state == "active"
                self._set_checked_silently(self.chk_dns_unit_relay, unit_active)

                # Missing attribute defaults to True.
                runtime_nftset = bool(getattr(st, "runtime_nftset", True))
                self._set_checked_silently(self.chk_dns_runtime_nftset, runtime_nftset)
                self._set_dns_unit_relay_state(unit_active)
                self._set_dns_runtime_state(
                    runtime_nftset,
                    str(getattr(st, "wildcard_source", "") or ""),
                    str(getattr(st, "runtime_config_error", "") or ""),
                )
                self._set_dns_mode_state(mode)
            finally:
                self._dns_ui_refresh = False

        self._safe(work, title="DNS error")

    def refresh_domains_tab(self) -> None:
        """Reload the currently selected domains file."""

        def work():
            self.on_domains_load()

        self._safe(work, title="Domains error")

    def refresh_trace_tab(self) -> None:
        """Reload the trace view for the mode selected by the radio buttons."""

        def work():
            if self.radio_trace_gui.isChecked():
                mode: TraceMode = "gui"
            elif self.radio_trace_smartdns.isChecked():
                mode = "smartdns"
            else:
                mode = "full"
            dump = self.ctrl.trace_view(mode)
            text = "\n".join(dump.lines).rstrip()
            if dump.lines:
                # Non-empty output ends with exactly one newline.
                text += "\n"
            self._set_text(self.txt_trace, text, preserve_scroll=True)

        self._safe(work, title="Trace error")