306 lines
9.7 KiB
Python
306 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
from typing import List, Optional, cast
|
|
|
|
from api_client import (
|
|
CmdResult,
|
|
EgressIdentity,
|
|
EgressIdentityRefreshResult,
|
|
LoginSessionAction,
|
|
LoginSessionStart,
|
|
LoginSessionState,
|
|
LoginState,
|
|
VpnLocation,
|
|
VpnLocationsState,
|
|
VpnStatus,
|
|
)
|
|
|
|
from .views import ActionView, LoginAction, LoginFlowView, VpnAutoconnectView, VpnStatusView
|
|
|
|
|
|
class VpnControllerMixin:
|
|
def vpn_locations_view(self) -> List[VpnLocation]:
    """Return whatever ``self.client.vpn_locations()`` reports."""
    locations = self.client.vpn_locations()
    return locations
|
|
|
|
def vpn_locations_state_view(self) -> VpnLocationsState:
    """Return the result of ``self.client.vpn_locations_state()`` unchanged."""
    locations_state = self.client.vpn_locations_state()
    return locations_state
|
|
|
|
def vpn_locations_refresh_trigger(self) -> None:
    """Ask the client to trigger a refresh of the VPN locations.

    The client's return value is discarded; this method always returns None.
    """
    trigger = self.client.vpn_locations_refresh_trigger
    trigger()
|
|
|
|
def vpn_status_view(self) -> VpnStatusView:
    """Fetch the current VPN status and wrap it for display.

    Returns:
        A ``VpnStatusView`` carrying the status' ``desired_location`` and the
        text produced by ``self._pretty_vpn_status`` for that same status.
    """
    status = self.client.vpn_status()
    return VpnStatusView(
        pretty_text=self._pretty_vpn_status(status),
        desired_location=status.desired_location,
    )
|
|
|
|
def vpn_status_model(self) -> VpnStatus:
    """Return the raw status object from ``self.client.vpn_status()``."""
    status = self.client.vpn_status()
    return status
|
|
|
|
# --- autoconnect / autoloop ---
|
|
|
|
def _autoconnect_from_auto(self, auto) -> bool:
    """Derive True/False from the /vpn/autoloop/status response.

    Priority:
        1) an explicit boolean ``auto.enabled`` field;
        2) a heuristic over ``status_word``, then over ``raw_text``.

    Missing or empty attributes are treated as empty strings; when nothing
    matches, the result is False.
    """
    explicit = getattr(auto, "enabled", None)
    if isinstance(explicit, bool):
        return explicit

    status = (getattr(auto, "status_word", "") or "").strip().lower()
    text = (getattr(auto, "raw_text", "") or "").lower()

    # Exact status words take priority over the free-form text.
    on_words = {
        "active",
        "running",
        "enabled",
        "on",
        "up",
        "started",
        "ok",
        "true",
        "yes",
    }
    off_words = {"inactive", "stopped", "disabled", "off", "down", "false", "no"}
    if status in on_words:
        return True
    if status in off_words:
        return False

    # Fallback on raw_text: negative markers are tested first because
    # "inactive" contains "active".
    if any(marker in text for marker in ("inactive", "disabled", "failed")):
        return False
    return any(marker in text for marker in ("active", "running", "enabled"))
|
|
|
|
def vpn_autoconnect_view(self) -> VpnAutoconnectView:
    """Describe the autoconnect (autoloop) unit for display.

    Queries ``self.client.vpn_autoloop_status()``. If that call raises, a red
    view with ``enabled=False`` and the error text in the label is returned
    instead of propagating the exception.

    Returns:
        A ``VpnAutoconnectView`` whose ``enabled`` flag comes from
        ``self._autoconnect_from_auto``, whose label is ``"unit: <state>"``
        and whose colour is red for failed/error/unknown/inactive/dead,
        green for active/running/enabled, and orange otherwise.
    """
    try:
        auto = self.client.vpn_autoloop_status()
    except Exception as e:
        # Best-effort view: surface the failure in the label rather than raise.
        return VpnAutoconnectView(
            enabled=False,
            unit_text=f"unit: ERROR ({e})",
            color="red",
        )

    enabled = self._autoconnect_from_auto(auto)

    # Read every field with getattr, as _autoconnect_from_auto does: the
    # status object may not carry these attributes, and a direct attribute
    # access here would raise AttributeError outside the try block above.
    status_word = getattr(auto, "status_word", "") or ""
    raw_text = getattr(auto, "raw_text", "") or ""
    unit_state = getattr(auto, "unit_state", "") or status_word or "unknown"

    text = f"unit: {unit_state}"

    low = f"{unit_state} {raw_text}".lower()
    # Red markers are tested first so "inactive" is not taken for "active".
    if any(x in low for x in ("failed", "error", "unknown", "inactive", "dead")):
        color = "red"
    elif any(x in low for x in ("active", "running", "enabled")):
        color = "green"
    else:
        color = "orange"

    return VpnAutoconnectView(enabled=enabled, unit_text=text, color=color)
|
|
|
|
def vpn_autoconnect_enabled(self) -> bool:
    """Legacy interface, kept for the toggle button.

    Returns the ``enabled`` attribute of ``self.vpn_autoconnect_view()``.
    """
    view = self.vpn_autoconnect_view()
    return view.enabled
|
|
|
|
def vpn_set_autoconnect(self, enable: bool) -> VpnStatusView:
    """Send the autoconnect command, then report the VPN status.

    Args:
        enable: Passed straight to ``self.client.vpn_autoconnect``.

    Returns:
        A ``VpnStatusView`` built from a status fetched after the command;
        its text comes from ``self._pretty_cmd_then_status`` applied to the
        command result and that status.
    """
    cmd_result = self.client.vpn_autoconnect(enable)
    status = self.client.vpn_status()
    return VpnStatusView(
        pretty_text=self._pretty_cmd_then_status(cmd_result, status),
        desired_location=status.desired_location,
    )
|
|
|
|
def vpn_set_location(self, target: str, iso: str = "", label: str = "") -> VpnStatusView:
    """Request a location change, then report the VPN status.

    Args:
        target: Forwarded to ``self.client.vpn_set_location`` as ``target``.
        iso: Forwarded as ``iso``.
        label: Forwarded as ``label``.

    Returns:
        A ``VpnStatusView`` built from a status fetched after the request.
        The return value of the set-location call itself is not used.
    """
    request = {"target": target, "iso": iso, "label": label}
    self.client.vpn_set_location(**request)

    status = self.client.vpn_status()
    return VpnStatusView(
        pretty_text=self._pretty_vpn_status(status),
        desired_location=status.desired_location,
    )
|
|
|
|
def egress_identity(self, scope: str, *, refresh: bool = False) -> EgressIdentity:
    """Return ``self.client.egress_identity_get(scope, refresh=refresh)``.

    Args:
        scope: Passed positionally to the client call.
        refresh: Passed to the client as the ``refresh`` keyword.
    """
    identity = self.client.egress_identity_get(scope, refresh=refresh)
    return identity
|
|
|
|
def egress_identity_refresh(
    self,
    *,
    scopes: Optional[List[str]] = None,
    force: bool = False,
) -> EgressIdentityRefreshResult:
    """Return ``self.client.egress_identity_refresh(scopes=..., force=...)``.

    Args:
        scopes: Passed to the client unchanged (``None`` by default).
        force: Passed to the client unchanged.
    """
    options = {"scopes": scopes, "force": force}
    return self.client.egress_identity_refresh(**options)
|
|
|
|
def _pretty_vpn_status(self, st: VpnStatus) -> str:
    """Render a VPN status as a small multi-line text block.

    The block has three ``key: value`` lines (unit_state, desired_location,
    status); a falsy desired location is shown as an em dash. When
    ``st.raw_text`` is truthy it is appended, stripped, after a blank line.

    Returns:
        The text with surrounding whitespace removed and exactly one
        trailing newline.
    """
    parts = [
        f"unit_state: {st.unit_state}",
        f"desired_location: {st.desired_location or '—'}",
        f"status: {st.status_word}",
    ]
    raw = st.raw_text
    if raw:
        parts += ["", raw.strip()]
    body = "\n".join(parts)
    return f"{body.strip()}\n"
|
|
|
|
# -------- Login Flow (interactive) --------
|
|
|
|
def login_flow_start(self) -> LoginFlowView:
    """Start an interactive login session and describe its initial state.

    Three outcomes are distinguished from the client's start response:

    * ``ok`` is falsy: a red, non-alive view whose text is the reported
      error (or a generic failure message);
    * the phase is ``already_logged`` (case-insensitive): a green,
      non-alive view naming the e-mail when one is present;
    * otherwise: an alive view with the open/check/cancel controls enabled.

    Returns:
        The ``LoginFlowView`` for the matching outcome. The cursor is always
        0 and the URL is always empty at this stage.
    """
    session: LoginSessionStart = self.client.vpn_login_session_start()
    indicator = self._level_to_color(session.level)

    def finished(phase, level, dot_color, message, email=""):
        # A session that is not running: no URL, no controls, and the
        # status message doubles as the only log line.
        return LoginFlowView(
            phase=phase,
            level=level,
            dot_color=dot_color,
            status_text=message,
            url="",
            email=email,
            alive=False,
            cursor=0,
            lines=[message],
            can_open=False,
            can_check=False,
            can_cancel=False,
        )

    if not session.ok:
        failure = session.error or "Failed to start login session"
        return finished(
            session.phase or "failed",
            session.level or "red",
            "red",
            failure,
        )

    if (session.phase or "").lower() == "already_logged":
        if session.email:
            notice = f"Already logged in as {session.email}"
        else:
            notice = "Already logged in"
        return finished(
            "already_logged",
            "green",
            "green",
            notice,
            email=session.email or "",
        )

    if session.pid:
        started = f"Login started (pid={session.pid})"
    else:
        started = "Login started"
    return LoginFlowView(
        phase=session.phase or "starting",
        level=session.level or "yellow",
        dot_color=indicator,
        status_text=started,
        url="",
        email="",
        alive=True,
        cursor=0,
        lines=[],
        can_open=True,
        can_check=True,
        can_cancel=True,
    )
|
|
|
|
def login_flow_poll(self, since: int) -> LoginFlowView:
    """Poll the interactive login session and describe its current state.

    Args:
        since: Passed to ``self.client.vpn_login_session_state`` as
            ``since``.

    Returns:
        A ``LoginFlowView`` mirroring the session state, with a
        human-readable status text chosen from the (lower-cased) phase, the
        dot colour from ``self._level_to_color`` and the log lines passed
        through ``self._clean_login_lines``.
    """
    st: LoginSessionState = self.client.vpn_login_session_state(since=since)

    dot = self._level_to_color(st.level)

    # Phases with a fixed caption; anything else falls back to the raw phase.
    fixed_status = {
        "waiting_browser": "Waiting for browser authorization…",
        "checking": "Checking…",
        "success": "✅ Logged in",
        "failed": "❌ Login failed",
        "cancelled": "Cancelled",
    }
    phase = (st.phase or "").lower()
    if phase == "already_logged":
        status_txt = (
            f"Already logged in as {st.email}"
            if st.email
            else "Already logged in"
        )
    else:
        status_txt = fixed_status.get(phase) or st.phase or "…"

    clean_lines = self._clean_login_lines(st.lines)

    return LoginFlowView(
        phase=st.phase,
        level=st.level,
        dot_color=dot,
        status_text=status_txt,
        url=st.url,
        email=st.email,
        alive=st.alive,
        cursor=st.cursor,
        can_open=st.can_open,
        # Fix: this used to copy can_cancel into can_check. Use the session's
        # own can_check flag; if the state object has no such field, keep the
        # previous value (can_cancel) rather than raising.
        can_check=getattr(st, "can_check", st.can_cancel),
        can_cancel=st.can_cancel,
        lines=clean_lines,
    )
|
|
|
|
def login_flow_action(self, action: str) -> ActionView:
    """Send an ``open``/``check``/``cancel`` action to the login session.

    Args:
        action: Action name; surrounding whitespace and letter case are
            ignored.

    Returns:
        An ``ActionView`` whose ``ok`` reflects the client's result. On
        failure the text is the reported error (or a generic message); on
        success it summarises the action with the resulting phase and level.
        The text always ends with a newline.

    Raises:
        ValueError: If the normalised action is not one of the three names.
    """
    normalized = action.strip().lower()
    if normalized not in {"open", "check", "cancel"}:
        raise ValueError(f"Invalid login action: {action}")

    outcome: LoginSessionAction = self.client.vpn_login_session_action(
        cast(LoginAction, normalized)
    )

    succeeded = bool(outcome.ok)
    if succeeded:
        message = f"OK: {normalized} → phase={outcome.phase} level={outcome.level}"
    else:
        message = outcome.error or "Login action failed"
    return ActionView(ok=succeeded, pretty_text=message + "\n")
|
|
|
|
def login_flow_stop(self) -> ActionView:
    """Stop the login session via the client and wrap the result.

    Returns:
        An ``ActionView`` with the result's ``ok`` flag and the text
        produced by ``self._pretty_cmd`` for that result.
    """
    outcome = self.client.vpn_login_session_stop()
    rendered = self._pretty_cmd(outcome)
    return ActionView(pretty_text=rendered, ok=outcome.ok)
|
|
|
|
def vpn_logout(self) -> ActionView:
    """Log out via the client and wrap the result.

    Returns:
        An ``ActionView`` with the result's ``ok`` flag and the text
        produced by ``self._pretty_cmd`` for that result.
    """
    outcome = self.client.vpn_logout()
    rendered = self._pretty_cmd(outcome)
    return ActionView(pretty_text=rendered, ok=outcome.ok)
|
|
|
|
# Баннер "AdGuard VPN: logged in as ...", по клику показываем инфу как в CLI
|
|
def login_banner_cli_text(self) -> str:
    """Build the CLI-style text shown when the login banner is clicked.

    The first applicable message wins, in this order: a failure to query the
    login state, an ``error``/``message`` reported on the state, the
    logged-in e-mail, the bare state, and finally a generic fallback.
    """
    try:
        login: LoginState = self.client.get_login_state()
    except Exception as exc:
        return f"Failed to query login state: {exc}"

    # The backend model may not have an ``error`` field, hence getattr.
    problem = getattr(login, "error", None)
    if not problem:
        problem = getattr(login, "message", None)
    if problem:
        return str(problem)

    if login.email:
        return f"You are already logged in.\nCurrent user is {login.email}"

    return f"Login state: {login.state}" if login.state else "No login information available."
|
|
|
|
# -------- Routes --------
|
|
|