chore: prune GUI legacy scripts

Keep only active GUI modules in selective-vpn-gui; archive old copies under _legacy (ignored).
This commit is contained in:
beckline
2026-02-14 16:01:49 +03:00
parent b39b666946
commit dc0cb88832
3 changed files with 2 additions and 1546 deletions

View File

@@ -1,645 +0,0 @@
#!/usr/bin/env python3
import argparse
import concurrent.futures
import json
import os
import sys
import time
from collections import defaultdict
# --- dnspython --------------------------------------------------------
try:
import dns.resolver
import dns.reversename
import dns.exception
except ImportError as e:
print(f"[resolver] dnspython is required: {e}", file=sys.stderr)
sys.exit(2)
# --------------------------------------------------------------------
# Общий DNS-конфиг
# --------------------------------------------------------------------
DNS_CONFIG_PATH = "/etc/selective-vpn/dns-upstreams.conf"
DEFAULT_DNS_DEFAULT = ["94.140.14.14", "94.140.15.15"]
DEFAULT_DNS_META = ["46.243.231.30", "46.243.231.41"]
DNS_DEFAULT = DEFAULT_DNS_DEFAULT.copy()
DNS_META = DEFAULT_DNS_META.copy()
# --------------------------------------------------------------------
# helpers
# --------------------------------------------------------------------
def log(msg, trace_log=None):
line = f"[resolver] {msg}"
print(line, file=sys.stderr)
if trace_log:
try:
with open(trace_log, "a") as f:
f.write(line + "\n")
except Exception:
pass
def is_private_ipv4(ip: str) -> bool:
"""
ip может быть "A.B.C.D" или "A.B.C.D/nn".
Возвращаем True, если адрес из приватных диапазонов.
"""
parts = ip.split("/")
base = parts[0]
try:
o1, o2, o3, o4 = map(int, base.split("."))
except ValueError:
return True
if o1 == 10:
return True
if o1 == 127:
return True
if o1 == 0:
return True
if o1 == 192 and o2 == 168:
return True
if o1 == 172 and 16 <= o2 <= 31:
return True
return False
def load_list(path):
if not os.path.exists(path):
return []
out = []
with open(path, "r") as f:
for line in f:
s = line.strip()
if not s or s.startswith("#"):
continue
out.append(s)
return out
def load_cache(path):
if not os.path.exists(path):
return {}
try:
with open(path, "r") as f:
return json.load(f)
except Exception:
return {}
def save_cache(path, data):
tmp = path + ".tmp"
try:
with open(tmp, "w") as f:
json.dump(data, f, indent=2, sort_keys=True)
os.replace(tmp, path)
except Exception:
pass
def split_dns(dns: str):
"""
Разбор записи вида:
"1.2.3.4" -> ("1.2.3.4", None)
"1.2.3.4#6053" -> ("1.2.3.4", "6053")
"""
if "#" in dns:
host, port = dns.split("#", 1)
host = host.strip()
port = port.strip()
if not host:
host = "127.0.0.1"
if not port:
port = "53"
return host, port
return dns, None
# --------------------------------------------------------------------
# dnspython-резолвы
# --------------------------------------------------------------------
def dig_a(host, dns_list, timeout=3):
"""
A-резолв через dnspython.
dns_list: либо строка "IP[#PORT]", либо список таких строк.
"""
if isinstance(dns_list, str):
dns_list = [dns_list]
ips = []
for entry in dns_list:
server, port = split_dns(entry)
if not server:
continue
r = dns.resolver.Resolver(configure=False)
r.nameservers = [server]
if port:
try:
r.port = int(port)
except ValueError:
r.port = 53
r.timeout = timeout
r.lifetime = timeout
try:
answer = r.resolve(host, "A")
except dns.exception.DNSException:
continue
except Exception:
continue
for rr in answer:
s = rr.to_text().strip()
parts = s.split(".")
if len(parts) != 4:
continue
if all(p.isdigit() and 0 <= int(p) <= 255 for p in parts):
if not is_private_ipv4(s) and s not in ips:
ips.append(s)
return ips
def dig_ptr(ip, upstream, timeout=3):
"""
PTR-резолв: ip -> список имён.
dns может быть "IP" или "IP#PORT".
"""
server, port = split_dns(upstream)
if not server:
return []
r = dns.resolver.Resolver(configure=False)
r.nameservers = [server]
if port:
try:
r.port = int(port)
except ValueError:
r.port = 53
r.timeout = timeout
r.lifetime = timeout
try:
rev = dns.reversename.from_address(ip)
except Exception:
return []
try:
answer = r.resolve(rev, "PTR")
except dns.exception.DNSException:
return []
except Exception:
return []
names = []
for rr in answer:
s = rr.to_text().strip()
if s.endswith("."):
s = s[:-1]
if s:
names.append(s.lower())
return names
# --------------------------------------------------------------------
# Загрузка DNS-конфига
# --------------------------------------------------------------------
def load_dns_config(path=DNS_CONFIG_PATH, trace_log=None):
"""
Читает /etc/selective-vpn/dns-upstreams.conf и обновляет
глобальные DNS_DEFAULT / DNS_META.
Формат строк:
default 1.2.3.4 5.6.7.8
meta 9.9.9.9 8.8.8.8
Можно использовать "ip#port", например 127.0.0.1#6053.
"""
global DNS_DEFAULT, DNS_META
if not os.path.exists(path):
DNS_DEFAULT = DEFAULT_DNS_DEFAULT.copy()
DNS_META = DEFAULT_DNS_META.copy()
log(
f"dns-config: {path} not found, fallback to built-in defaults "
f"(default={DNS_DEFAULT}, meta={DNS_META})",
trace_log,
)
return
dflt = []
meta = []
try:
with open(path, "r") as f:
for line in f:
s = line.strip()
if not s or s.startswith("#"):
continue
parts = s.split()
if len(parts) < 2:
continue
key = parts[0].lower()
addrs = parts[1:]
if key == "default":
dflt.extend(addrs)
elif key == "meta":
meta.extend(addrs)
except Exception as e:
DNS_DEFAULT = DEFAULT_DNS_DEFAULT.copy()
DNS_META = DEFAULT_DNS_META.copy()
log(
f"dns-config: failed to read {path}: {e}, fallback to built-in defaults "
f"(default={DNS_DEFAULT}, meta={DNS_META})",
trace_log,
)
return
if not dflt:
dflt = DEFAULT_DNS_DEFAULT.copy()
log(
"dns-config: no 'default' section, fallback to built-in for default",
trace_log,
)
if not meta:
meta = DEFAULT_DNS_META.copy()
log("dns-config: no 'meta' section, fallback to built-in for meta", trace_log)
DNS_DEFAULT = dflt
DNS_META = meta
log(
f"dns-config: accept {path}: "
f"default={', '.join(DNS_DEFAULT)}; meta={', '.join(DNS_META)}",
trace_log,
)
def resolve_host(host, meta_special, trace_log=None):
"""
Forward-резолв одного домена (A-записи).
DNS берём из DNS_DEFAULT / DNS_META, которые загрузил load_dns_config().
"""
if host in meta_special:
dns_list = DNS_META
else:
dns_list = DNS_DEFAULT
ips = dig_a(host, dns_list)
uniq = []
for ip in ips:
if ip not in uniq:
uniq.append(ip)
if uniq:
log(f"{host}: {', '.join(uniq)}", trace_log)
else:
log(f"{host}: no IPs", trace_log)
return uniq
def parse_static_entries(static_lines):
"""
static_lines — строки из static-ips.txt.
Возвращаем список кортежей (ip_entry, base_ip, comment).
"""
entries = []
for line in static_lines:
s = line.strip()
if not s or s.startswith("#"):
continue
if "#" in s:
ip_part, comment = s.split("#", 1)
ip_part = ip_part.strip()
comment = comment.strip()
else:
ip_part = s
comment = ""
if not ip_part:
continue
if is_private_ipv4(ip_part):
continue
base_ip = ip_part.split("/", 1)[0]
entries.append((ip_part, base_ip, comment))
return entries
def resolve_static_entries(static_entries, ptr_cache, ttl_sec, trace_log=None):
"""
static_entries: список кортежей (ip_entry, base_ip, comment).
ip_entry — как в static-ips.txt (может быть с /mask)
base_ip — A.B.C.D (без маски)
comment — текст после # или "".
Возвращаем dict: ip_entry -> список меток,
уже с префиксом '*' (чтобы можно было искать).
"""
now = int(time.time())
result = {}
for ip_entry, base_ip, comment in static_entries:
labels = []
# 1) если есть комментарий — он главнее всего
if comment:
labels.append(f"*{comment}")
# 2) если комментария нет, пробуем PTR (с кэшем)
if not comment:
cache_entry = ptr_cache.get(base_ip)
names = []
if (
cache_entry
and isinstance(cache_entry, dict)
and isinstance(cache_entry.get("last_resolved"), (int, float))
):
age = now - cache_entry["last_resolved"]
cached_names = cache_entry.get("names") or []
if age <= ttl_sec and cached_names:
names = cached_names
if not names:
# PTR через те же DNS, что и обычный трафик (используем первый из default)
dns_for_ptr = DNS_DEFAULT[0] if DNS_DEFAULT else DEFAULT_DNS_DEFAULT[0]
try:
names = dig_ptr(base_ip, dns_for_ptr) or []
except Exception as e:
log(
f"PTR failed for {base_ip} (using {dns_for_ptr}): "
f"{type(e).__name__}: {e}",
trace_log,
)
names = []
uniq_names = []
for n in names:
if n not in uniq_names:
uniq_names.append(n)
names = uniq_names
ptr_cache[base_ip] = {
"names": names,
"last_resolved": now,
}
for n in names:
labels.append(f"*{n}")
# 3) если вообще ничего нет — ставим общий тег
if not labels:
labels = ["*[STATIC-IP]"]
result[ip_entry] = labels
log(f"static {ip_entry}: labels={', '.join(labels)}", trace_log)
return result
# --------------------------------------------------------------------
# API-слой: одна чистая функция, которую легко вызвать откуда угодно
# --------------------------------------------------------------------
def run_resolver_job(
*,
domains,
meta_special,
static_lines,
cache_path,
ptr_cache_path,
ttl_sec,
workers,
trace_log=None,
):
"""
Главный API резолвера.
Вход:
domains — список доменов
meta_special — set() доменов из meta-special.txt
static_lines — строки из static-ips.txt
cache_path — путь к domain-cache.json
ptr_cache_path— путь к ptr-cache.json
ttl_sec — TTL кэша доменов / PTR
workers — число потоков
trace_log — путь к trace.log (или None)
Выход: dict с ключами:
ips — отсортированный список IP/подсетей
ip_map — список (ip, label) пар (домен или *LABEL)
domain_cache — обновлённый кэш доменов
ptr_cache — обновлённый PTR-кэш
summary — статистика (dict)
"""
# --- подгружаем DNS-конфиг ---
load_dns_config(DNS_CONFIG_PATH, trace_log)
meta_special = set(meta_special or [])
log(f"domains to resolve: {len(domains)}", trace_log)
# --- кэши ---
domain_cache = load_cache(cache_path)
ptr_cache = load_cache(ptr_cache_path)
now = int(time.time())
# --- разруливаем: что берём из domain_cache, что резолвим ---
fresh_from_cache = {}
to_resolve = []
for d in domains:
entry = domain_cache.get(d)
if entry and isinstance(entry, dict):
ts = entry.get("last_resolved") or 0
ips = entry.get("ips") or []
if isinstance(ts, (int, float)) and isinstance(ips, list) and ips:
if now - ts <= ttl_sec:
valid_ips = [ip for ip in ips if not is_private_ipv4(ip)]
if valid_ips:
fresh_from_cache[d] = valid_ips
continue
to_resolve.append(d)
log(
f"from cache: {len(fresh_from_cache)}, to resolve: {len(to_resolve)}",
trace_log,
)
resolved = dict(fresh_from_cache)
total_domains = len(domains)
cache_hits = len(fresh_from_cache)
resolved_now = 0
unresolved = 0
# --- параллельный резолв доменов ---
if to_resolve:
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as ex:
fut2host = {
ex.submit(resolve_host, d, meta_special, trace_log): d
for d in to_resolve
}
for fut in concurrent.futures.as_completed(fut2host):
d = fut2host[fut]
try:
ips = fut.result()
except Exception as e:
log(f"{d}: resolver exception: {e}", trace_log)
ips = []
if ips:
resolved[d] = ips
domain_cache[d] = {
"ips": ips,
"last_resolved": now,
}
resolved_now += 1
else:
unresolved += 1
# --- читаем static-ips и готовим список для PTR ---
static_entries = parse_static_entries(static_lines)
log(f"static entries: {len(static_entries)}", trace_log)
# --- PTR/labels для static-ips ---
static_label_map = resolve_static_entries(
static_entries, ptr_cache, ttl_sec, trace_log
)
# --- собираем общий список IP и map ---
ip_set = set()
ip_to_domains = defaultdict(set)
# доменные IP
for d, ips in resolved.items():
for ip in ips:
ip_set.add(ip)
ip_to_domains[ip].add(d)
# статические IP / сети
for ip_entry, _, _ in static_entries:
ip_set.add(ip_entry)
for label in static_label_map.get(ip_entry, []):
ip_to_domains[ip_entry].add(label)
unique_ip_count = len(ip_set)
if unique_ip_count == 0:
log("no IPs resolved at all", trace_log)
else:
log(f"resolver done: {unique_ip_count} unique IPs", trace_log)
ips_sorted = sorted(ip_set)
# flatten ip_map
ip_map_pairs = []
for ip in ips_sorted:
for dom in sorted(ip_to_domains[ip]):
ip_map_pairs.append((ip, dom))
summary = {
"domains_total": total_domains,
"from_cache": cache_hits,
"resolved_now": resolved_now,
"unresolved": unresolved,
"static_entries": len(static_entries),
"unique_ips": unique_ip_count,
}
log(
"summary: domains=%d, from_cache=%d, resolved_now=%d, "
"unresolved=%d, static_entries=%d, unique_ips=%d"
% (
summary["domains_total"],
summary["from_cache"],
summary["resolved_now"],
summary["unresolved"],
summary["static_entries"],
summary["unique_ips"],
),
trace_log,
)
return {
"ips": ips_sorted,
"ip_map": ip_map_pairs,
"domain_cache": domain_cache,
"ptr_cache": ptr_cache,
"summary": summary,
}
# --------------------------------------------------------------------
# CLI-обёртка вокруг API-функции (для bash-скрипта)
# --------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--domains", required=True, help="file with domains (one per line)")
ap.add_argument("--output-ips", required=True, help="file to write unique IPs")
ap.add_argument(
"--output-map",
required=True,
help="file to write IP<TAB>domain map",
)
ap.add_argument("--meta-file", required=True, help="meta-special.txt path")
ap.add_argument("--static-ips", required=True, help="static-ips.txt path")
ap.add_argument("--cache", required=True, help="domain-cache.json path")
ap.add_argument("--ptr-cache", required=True, help="ptr-cache.json path")
ap.add_argument("--trace-log", default=None)
ap.add_argument("--workers", type=int, default=40)
ap.add_argument("--ttl-sec", type=int, default=24 * 3600)
args = ap.parse_args()
trace_log = args.trace_log
try:
# входные данные для API-функции
domains = load_list(args.domains)
meta_special = load_list(args.meta_file)
static_lines = []
if os.path.exists(args.static_ips):
with open(args.static_ips, "r") as f:
static_lines = f.read().splitlines()
job_result = run_resolver_job(
domains=domains,
meta_special=meta_special,
static_lines=static_lines,
cache_path=args.cache,
ptr_cache_path=args.ptr_cache,
ttl_sec=args.ttl_sec,
workers=args.workers,
trace_log=trace_log,
)
ips_sorted = job_result["ips"]
ip_map_pairs = job_result["ip_map"]
domain_cache = job_result["domain_cache"]
ptr_cache = job_result["ptr_cache"]
# output-ips: по одному IP/подсети
with open(args.output_ips, "w") as f:
for ip in ips_sorted:
f.write(ip + "\n")
# output-map: IP<TAB>домен/метка
with open(args.output_map, "w") as f:
for ip, dom in ip_map_pairs:
f.write(f"{ip}\t{dom}\n")
# сохраняем кэши
save_cache(args.cache, domain_cache)
save_cache(args.ptr_cache, ptr_cache)
return 0
except Exception as e:
# настоящий фатал
log(f"FATAL resolver error: {e}", trace_log)
import traceback
traceback.print_exc(file=sys.stderr)
return 2
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,901 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Selective-VPN Dashboard (UI only)
RULES:
- This file must NOT know anything about REST paths, HTTP methods, or JSON keys.
- It talks ONLY to DashboardController (which uses ApiClient).
"""
from __future__ import annotations
import re
import subprocess
import sys
import tkinter as tk
from tkinter import messagebox
from tkinter import ttk
from typing import Literal, Optional, cast, Tuple
from api_client import ApiClient, DnsUpstreams
from dashboard_controller import DashboardController
TraceMode = Literal["full", "gui", "smartdns"]
# убираем спам автопроверки из логов UI (на всякий случай, даже если почистил controller)
_NEXT_CHECK_RE = re.compile(r"(?:\b\d+s\.)?\s*Next check in\s+\d+s\.?", re.IGNORECASE)
class App(ttk.Frame):
def __init__(self, master: tk.Tk, ctrl: DashboardController) -> None:
super().__init__(master)
self.master = master
self.ctrl = ctrl
# login-flow runtime
self._login_flow_active: bool = False
self._login_cursor: int = 0
self._login_url_opened: bool = False
self._login_poll_after_id: Optional[str] = None
self._build_ui()
self._wire_events()
self.after(50, self.refresh_everything)
self.master.protocol("WM_DELETE_WINDOW", self._on_close)
# ---------------- UI BUILD ----------------
def _build_ui(self) -> None:
self.master.title("Selective-VPN Dashboard")
self.pack(fill="both", expand=True)
# Top bar
top = ttk.Frame(self)
top.pack(fill="x", padx=10, pady=(10, 6))
self.btn_refresh = ttk.Button(top, text="Refresh all", command=self.refresh_everything)
self.btn_refresh.pack(side="left")
# Login indicator (dot + text)
self.login_dot = tk.Canvas(top, width=12, height=12, highlightthickness=0)
self.login_dot.pack(side="left", padx=(12, 4))
self._login_dot_id = self.login_dot.create_oval(2, 2, 10, 10, fill="gray", outline="")
self.lbl_login = ttk.Label(top, text="AdGuard VPN: ...", font=("TkDefaultFont", 10, "bold"))
self.lbl_login.pack(side="left", padx=(0, 10))
# Single auth button (Login/Logout)
self.btn_auth = ttk.Button(top, text="Login", command=self.on_auth_button)
self.btn_auth.pack(side="left")
self.lbl_hint = ttk.Label(top, text="(GUI contains no API logic)", foreground="gray")
self.lbl_hint.pack(side="right")
# Notebook
self.nb = ttk.Notebook(self)
self.nb.pack(fill="both", expand=True, padx=10, pady=(0, 10))
self._build_tab_status()
self._build_tab_vpn()
self._build_tab_routes()
self._build_tab_dns()
self._build_tab_domains()
self._build_tab_trace()
def _build_tab_status(self) -> None:
tab = ttk.Frame(self.nb)
self.nb.add(tab, text="Status")
frm = ttk.Frame(tab)
frm.pack(fill="both", expand=True, padx=10, pady=10)
grid = ttk.Frame(frm)
grid.pack(fill="x")
def row(r: int, label: str) -> ttk.Label:
ttk.Label(grid, text=label).grid(row=r, column=0, sticky="w", pady=2)
v = ttk.Label(grid, text="")
v.grid(row=r, column=1, sticky="w", pady=2, padx=(10, 0))
return v
self.st_timestamp = row(0, "Timestamp")
self.st_counts = row(1, "Counts")
self.st_iface = row(2, "Iface/Table/Mark")
self.st_route = row(3, "Policy route")
self.st_routesvc = row(4, "Routes service")
self.st_smartdns = row(5, "SmartDNS service")
self.st_vpnsvc = row(6, "VPN service")
btns = ttk.Frame(frm)
btns.pack(fill="x", pady=(10, 0))
ttk.Button(btns, text="Refresh status", command=self.refresh_status_tab).pack(side="left")
def _build_tab_vpn(self) -> None:
tab = ttk.Frame(self.nb)
self.nb.add(tab, text="AdGuardVPN")
# Pages container
self.vpn_pages = ttk.Frame(tab)
self.vpn_pages.pack(fill="both", expand=True, padx=10, pady=10)
self.vpn_page_main = ttk.Frame(self.vpn_pages)
self.vpn_page_login = ttk.Frame(self.vpn_pages)
for p in (self.vpn_page_main, self.vpn_page_login):
p.grid(row=0, column=0, sticky="nsew")
self.vpn_pages.rowconfigure(0, weight=1)
self.vpn_pages.columnconfigure(0, weight=1)
# -------- Page 1: main VPN controls (Enter Login removed) --------
frm = self.vpn_page_main
top_actions = ttk.Frame(frm)
top_actions.pack(fill="x", pady=(0, 10))
ttk.Button(top_actions, text="Refresh", command=self.refresh_vpn_tab).pack(side="right")
# Autoconnect toggle
ac = ttk.LabelFrame(frm, text="Auto-connect")
ac.pack(fill="x")
self.var_autoconnect = tk.BooleanVar(value=False)
self.chk_autoconnect = ttk.Checkbutton(
ac,
text="Enable auto-connect",
variable=self.var_autoconnect,
command=self.on_toggle_autoconnect,
)
self.chk_autoconnect.pack(side="left", padx=10, pady=8)
# Location picker
loc = ttk.LabelFrame(frm, text="Location")
loc.pack(fill="x", pady=(10, 0))
self.cmb_location = ttk.Combobox(loc, state="readonly", width=40)
self.cmb_location.pack(side="left", padx=10, pady=8)
self.btn_set_location = ttk.Button(loc, text="Set location", command=self.on_set_location)
self.btn_set_location.pack(side="left", padx=6, pady=8)
self.lbl_vpn_desired = ttk.Label(loc, text="Desired: —", foreground="gray")
self.lbl_vpn_desired.pack(side="left", padx=12)
# Status output
st = ttk.LabelFrame(frm, text="VPN Status")
st.pack(fill="both", expand=True, pady=(10, 0))
self.txt_vpn = tk.Text(st, height=12, wrap="none")
self.txt_vpn.pack(fill="both", expand=True, padx=10, pady=10)
# -------- Page 2: Login flow --------
lf = self.vpn_page_login
lf_top = ttk.Frame(lf)
lf_top.pack(fill="x", pady=(0, 10))
ttk.Button(lf_top, text="← Back", command=self.on_login_back).pack(side="left")
self.login_flow_dot = tk.Canvas(lf_top, width=14, height=14, highlightthickness=0)
self.login_flow_dot.pack(side="left", padx=(10, 4))
self._login_flow_dot_id = self.login_flow_dot.create_oval(2, 2, 12, 12, fill="orange", outline="")
self.lbl_login_flow_status = ttk.Label(lf_top, text="Status: —", font=("TkDefaultFont", 10, "bold"))
self.lbl_login_flow_status.pack(side="left", padx=(0, 10))
self.lbl_login_flow_email = ttk.Label(lf_top, text="", foreground="gray")
self.lbl_login_flow_email.pack(side="left")
url_row = ttk.Frame(lf)
url_row.pack(fill="x", pady=(0, 10))
ttk.Label(url_row, text="URL:").pack(side="left")
self.var_login_url = tk.StringVar(value="")
self.ent_login_url = ttk.Entry(url_row, textvariable=self.var_login_url, state="readonly")
self.ent_login_url.pack(side="left", fill="x", expand=True, padx=8)
self.btn_login_copy = ttk.Button(url_row, text="Copy", command=self.on_login_copy)
self.btn_login_copy.pack(side="left", padx=(0, 6))
self.btn_login_open = ttk.Button(url_row, text="Open", command=self.on_login_open)
self.btn_login_open.pack(side="left")
ctrl_row = ttk.Frame(lf)
ctrl_row.pack(fill="x", pady=(0, 10))
self.btn_login_check = ttk.Button(ctrl_row, text="Check", command=self.on_login_check)
self.btn_login_check.pack(side="left")
self.btn_login_close = ttk.Button(ctrl_row, text="Close (cancel)", command=self.on_login_cancel)
self.btn_login_close.pack(side="left", padx=6)
self.btn_login_stop = ttk.Button(ctrl_row, text="Stop (force)", command=self.on_login_stop)
self.btn_login_stop.pack(side="left", padx=6)
# Log output
out = ttk.LabelFrame(lf, text="Login output")
out.pack(fill="both", expand=True)
self.txt_login_flow = tk.Text(out, wrap="word", height=16)
self.txt_login_flow.pack(fill="both", expand=True, padx=10, pady=10)
self._show_vpn_page("main")
def _build_tab_routes(self) -> None:
tab = ttk.Frame(self.nb)
self.nb.add(tab, text="Routes")
frm = ttk.Frame(tab)
frm.pack(fill="both", expand=True, padx=10, pady=10)
svc = ttk.LabelFrame(frm, text="Routes service")
svc.pack(fill="x")
ttk.Button(svc, text="Start", command=lambda: self.on_routes_action("start")).pack(side="left", padx=10, pady=8)
ttk.Button(svc, text="Stop", command=lambda: self.on_routes_action("stop")).pack(side="left", padx=6, pady=8)
ttk.Button(svc, text="Restart", command=lambda: self.on_routes_action("restart")).pack(side="left", padx=6, pady=8)
ttk.Button(svc, text="Clear routes", command=self.on_routes_clear).pack(side="right", padx=10, pady=8)
timer = ttk.LabelFrame(frm, text="Timer")
timer.pack(fill="x", pady=(10, 0))
self.var_timer = tk.BooleanVar(value=False)
self.chk_timer = ttk.Checkbutton(timer, text="Enable timer", variable=self.var_timer, command=self.on_toggle_timer)
self.chk_timer.pack(side="left", padx=10, pady=8)
ttk.Button(timer, text="Fix policy route", command=self.on_fix_policy_route).pack(side="right", padx=10, pady=8)
out = ttk.LabelFrame(frm, text="Output")
out.pack(fill="both", expand=True, pady=(10, 0))
self.txt_routes = tk.Text(out, height=12, wrap="none")
self.txt_routes.pack(fill="both", expand=True, padx=10, pady=10)
def _build_tab_dns(self) -> None:
tab = ttk.Frame(self.nb)
self.nb.add(tab, text="DNS")
frm = ttk.Frame(tab)
frm.pack(fill="both", expand=True, padx=10, pady=10)
ups = ttk.LabelFrame(frm, text="Upstreams")
ups.pack(fill="x")
def add_field(r: int, label: str) -> ttk.Entry:
ttk.Label(ups, text=label).grid(row=r, column=0, sticky="w", padx=10, pady=4)
e = ttk.Entry(ups, width=60)
e.grid(row=r, column=1, sticky="we", padx=10, pady=4)
return e
ups.columnconfigure(1, weight=1)
self.ent_def1 = add_field(0, "default1")
self.ent_def2 = add_field(1, "default2")
self.ent_meta1 = add_field(2, "meta1")
self.ent_meta2 = add_field(3, "meta2")
btns = ttk.Frame(frm)
btns.pack(fill="x", pady=(10, 0))
ttk.Button(btns, text="Refresh", command=self.refresh_dns_tab).pack(side="left")
ttk.Button(btns, text="Save", command=self.on_save_upstreams).pack(side="left", padx=6)
sm = ttk.LabelFrame(frm, text="SmartDNS")
sm.pack(fill="both", expand=True, pady=(10, 0))
top = ttk.Frame(sm)
top.pack(fill="x", padx=10, pady=(10, 6))
self.lbl_smartdns_state = ttk.Label(top, text="Service: —")
self.lbl_smartdns_state.pack(side="left")
ttk.Button(top, text="Start", command=lambda: self.on_smartdns_action("start")).pack(side="right", padx=6)
ttk.Button(top, text="Stop", command=lambda: self.on_smartdns_action("stop")).pack(side="right")
mid = ttk.Frame(sm)
mid.pack(fill="both", expand=True, padx=10, pady=(0, 10))
ttk.Label(mid, text="Wildcards (one per line):").pack(anchor="w")
self.txt_wildcards = tk.Text(mid, height=10, wrap="none")
self.txt_wildcards.pack(fill="both", expand=True, pady=(4, 6))
btns2 = ttk.Frame(mid)
btns2.pack(fill="x")
ttk.Button(btns2, text="Refresh", command=self.refresh_dns_tab).pack(side="left")
ttk.Button(btns2, text="Save", command=self.on_save_wildcards).pack(side="left", padx=6)
def _build_tab_domains(self) -> None:
tab = ttk.Frame(self.nb)
self.nb.add(tab, text="Domains")
frm = ttk.Frame(tab)
frm.pack(fill="both", expand=True, padx=10, pady=10)
left = ttk.Frame(frm)
left.pack(side="left", fill="y")
right = ttk.Frame(frm)
right.pack(side="left", fill="both", expand=True, padx=(10, 0))
ttk.Label(left, text="Files:").pack(anchor="w")
self.lst_files = tk.Listbox(left, height=6, exportselection=False)
for name in ("bases", "meta", "subs", "static"):
self.lst_files.insert("end", name)
self.lst_files.selection_set(0)
self.lst_files.pack(fill="y", pady=(4, 8))
ttk.Button(left, text="Refresh table", command=self.refresh_domains_tab).pack(fill="x")
ttk.Button(left, text="Load file", command=self.on_domains_load).pack(fill="x", pady=(6, 0))
ttk.Button(left, text="Save file", command=self.on_domains_save).pack(fill="x", pady=(6, 0))
ttk.Button(left, text="Load AGVPN table", command=self.on_load_agvpn_table).pack(fill="x", pady=(10, 0))
ttk.Button(left, text="Load SmartDNS table", command=self.on_load_smartdns_table).pack(fill="x", pady=(6, 0))
top = ttk.Frame(right)
top.pack(fill="x")
self.lbl_domains_info = ttk.Label(top, text="", foreground="gray")
self.lbl_domains_info.pack(side="left")
self.txt_domains = tk.Text(right, wrap="none")
self.txt_domains.pack(fill="both", expand=True, pady=(6, 0))
def _build_tab_trace(self) -> None:
tab = ttk.Frame(self.nb)
self.nb.add(tab, text="Trace")
frm = ttk.Frame(tab)
frm.pack(fill="both", expand=True, padx=10, pady=10)
top = ttk.Frame(frm)
top.pack(fill="x")
self.var_trace_mode = tk.StringVar(value="full")
for m, title in (("full", "Full"), ("gui", "GUI"), ("smartdns", "SmartDNS")):
ttk.Radiobutton(top, text=title, value=m, variable=self.var_trace_mode, command=self.refresh_trace_tab).pack(
side="left", padx=(0, 10)
)
ttk.Button(top, text="Refresh", command=self.refresh_trace_tab).pack(side="right")
self.txt_trace = tk.Text(frm, wrap="none")
self.txt_trace.pack(fill="both", expand=True, pady=(10, 0))
def _wire_events(self) -> None:
self.lst_files.bind("<<ListboxSelect>>", lambda _e: self.on_domains_load())
# ---------------- UI HELPERS ----------------
def _set_text(self, widget: tk.Text, text: str) -> None:
widget.config(state="normal")
widget.delete("1.0", "end")
widget.insert("1.0", text)
widget.config(state="normal")
def _append_text(self, widget: tk.Text, text: str) -> None:
widget.config(state="normal")
widget.insert("end", text)
widget.see("end")
widget.config(state="normal")
def _clean_ui_lines(self, lines) -> str:
# финальная страховка: убираем "Next check" и нормализуем \r
buf = "\n".join([str(x) for x in (lines or [])]).replace("\r", "\n")
out_lines = []
for ln in buf.splitlines():
t = ln.strip()
if not t:
continue
t2 = _NEXT_CHECK_RE.sub("", t).strip()
if not t2:
continue
out_lines.append(t2)
return "\n".join(out_lines).rstrip()
def _get_selected_domains_file(self) -> str:
sel = self.lst_files.curselection()
if not sel:
return "bases"
return str(self.lst_files.get(sel[0]))
def _read_local_file(self, path: str) -> str:
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
return f.read()
except Exception:
return ""
def _safe(self, fn, *, title: str = "Error"):
try:
return fn()
except Exception as e:
messagebox.showerror(title, str(e))
return None
def _set_dot(self, canvas: tk.Canvas, dot_id: int, color: str) -> None:
c = (color or "").strip().lower()
if c in ("green", "ok", "true"):
fill = "green"
elif c in ("red", "error", "false"):
fill = "red"
elif c in ("orange", "yellow", "try", "unknown", "pending", "wait"):
fill = "orange"
else:
fill = "gray"
try:
canvas.itemconfigure(dot_id, fill=fill)
except Exception:
pass
def _show_vpn_page(self, which: Literal["main", "login"]) -> None:
if which == "login":
self.vpn_page_login.tkraise()
else:
self.vpn_page_main.tkraise()
def _parse_login_banner(self, text: str, color: str) -> Tuple[bool, str]:
# считаем "logged" если зеленый
is_logged = (color or "").strip().lower() == "green"
email = ""
t = (text or "")
# пытаемся вытащить email из строки
m = re.search(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})", t)
if m:
email = m.group(1)
return is_logged, email
def _set_auth_button(self, logged: bool) -> None:
self.btn_auth.config(text=("Logout" if logged else "Login"))
# ---------------- REFRESH ----------------
def refresh_everything(self) -> None:
self.refresh_status_tab()
self.refresh_vpn_tab()
self.refresh_routes_tab()
self.refresh_dns_tab()
self.refresh_domains_tab()
self.refresh_trace_tab()
self.refresh_login_banner()
def refresh_login_banner(self) -> None:
def work():
view = self.ctrl.get_login_view()
self.lbl_login.config(text=view.text)
self._set_dot(self.login_dot, self._login_dot_id, view.color)
# НЕ гадаем по цвету: используем нормализованную логику controller-а
self._set_auth_button(bool(view.logged_in))
try:
self.lbl_login.config(foreground=view.color)
except tk.TclError:
pass
self._safe(work, title="Login state error")
def refresh_status_tab(self) -> None:
def work():
view = self.ctrl.get_status_overview()
self.st_timestamp.config(text=view.timestamp)
self.st_counts.config(text=view.counts)
self.st_iface.config(text=view.iface_table_mark)
self.st_route.config(text=view.policy_route)
self.st_routesvc.config(text=view.routes_service)
self.st_smartdns.config(text=view.smartdns_service)
self.st_vpnsvc.config(text=view.vpn_service)
self._safe(work, title="Status error")
def refresh_vpn_tab(self) -> None:
def work():
locs = self.ctrl.vpn_locations_view()
self.cmb_location["values"] = [f"{x.iso}{x.label}" for x in locs]
st = self.ctrl.vpn_status_view()
self.lbl_vpn_desired.config(text=f"Desired: {st.desired_location or ''}")
self._set_text(self.txt_vpn, st.pretty_text)
self.var_autoconnect.set(self.ctrl.vpn_autoconnect_enabled())
self._safe(work, title="VPN error")
def refresh_routes_tab(self) -> None:
def work():
self.var_timer.set(self.ctrl.routes_timer_enabled())
self._safe(work, title="Routes error")
def refresh_dns_tab(self) -> None:
def work():
cfg = self.ctrl.dns_upstreams_view()
self.ent_def1.delete(0, "end"); self.ent_def1.insert(0, cfg.default1)
self.ent_def2.delete(0, "end"); self.ent_def2.insert(0, cfg.default2)
self.ent_meta1.delete(0, "end"); self.ent_meta1.insert(0, cfg.meta1)
self.ent_meta2.delete(0, "end"); self.ent_meta2.insert(0, cfg.meta2)
sd = self.ctrl.smartdns_service_view()
self.lbl_smartdns_state.config(text=f"Service: {sd.state}")
wc = self.ctrl.smartdns_wildcards_view()
self._set_text(self.txt_wildcards, "\n".join(wc.domains).strip() + ("\n" if wc.domains else ""))
self._safe(work, title="DNS error")
def refresh_domains_tab(self) -> None:
def work():
table = self.ctrl.domains_table_view()
self.lbl_domains_info.config(text=f"Table lines: {len(table.lines)}")
self._safe(work, title="Domains error")
def refresh_trace_tab(self) -> None:
def work():
mode = cast(TraceMode, self.var_trace_mode.get())
dump = self.ctrl.trace_view(mode)
self._set_text(self.txt_trace, "\n".join(dump.lines).strip() + ("\n" if dump.lines else ""))
self._safe(work, title="Trace error")
# ---------------- LOGIN FLOW (UI) ----------------
def _login_flow_reset_ui(self) -> None:
self._login_cursor = 0
self._login_url_opened = False
self.var_login_url.set("")
self.lbl_login_flow_status.config(text="Status: —")
self.lbl_login_flow_email.config(text="")
self._set_dot(self.login_flow_dot, self._login_flow_dot_id, "orange")
self._set_text(self.txt_login_flow, "")
def _login_flow_set_buttons(self, *, can_open: bool, can_check: bool, can_cancel: bool) -> None:
def set_state(btn: ttk.Button, enabled: bool) -> None:
try:
btn.config(state=("normal" if enabled else "disabled"))
except Exception:
pass
set_state(self.btn_login_open, can_open)
set_state(self.btn_login_copy, bool(self.var_login_url.get().strip()))
set_state(self.btn_login_check, can_check)
set_state(self.btn_login_close, can_cancel)
# stop — страховка, но если уже success/already_logged, можно тоже выключить не обязательно
try:
self.btn_login_stop.config(state="normal")
except Exception:
pass
def _login_flow_autopoll_start(self) -> None:
self._login_flow_active = True
self._login_poll_tick()
def _login_flow_autopoll_stop(self) -> None:
self._login_flow_active = False
if self._login_poll_after_id is not None:
try:
self.after_cancel(self._login_poll_after_id)
except Exception:
pass
self._login_poll_after_id = None
def _login_poll_tick(self) -> None:
if not self._login_flow_active:
return
def work():
view = self.ctrl.login_flow_poll(self._login_cursor)
self._login_cursor = int(view.cursor)
# indicator + status
self._set_dot(self.login_flow_dot, self._login_flow_dot_id, view.dot_color)
self.lbl_login_flow_status.config(text=f"Status: {view.status_text or ''}")
self.lbl_login_flow_email.config(text=(f"User: {view.email}" if view.email else ""))
if view.url:
self.var_login_url.set(view.url)
# buttons
self._login_flow_set_buttons(can_open=view.can_open, can_check=view.can_check, can_cancel=view.can_cancel)
# append cleaned lines
cleaned = self._clean_ui_lines(view.lines)
if cleaned:
self._append_text(self.txt_login_flow, cleaned + "\n")
# auto-open browser once when url appears
if (not self._login_url_opened) and view.url:
self._login_url_opened = True
try:
subprocess.Popen(["xdg-open", view.url])
except Exception:
pass
phase = (view.phase or "").strip().lower()
if (not view.alive) or phase in ("success", "failed", "cancelled", "already_logged"):
# Авто-обновляем баннер при успехе/уже залогинен
if phase in ("success", "already_logged"):
self.after(250, self.refresh_login_banner)
# и возвращаемся на main страницу VPN, чтобы UX был как у тебя на примере
self.after(500, lambda: self._show_vpn_page("main"))
# на терминале — стопаем polling
self._login_flow_autopoll_stop()
# в терминале делаем кнопки логина неактивными (как в твоём "идеальном" окне)
self._login_flow_set_buttons(can_open=False, can_check=False, can_cancel=False)
try:
self.btn_login_stop.config(state="disabled")
except Exception:
pass
self._safe(work, title="Login flow error")
if self._login_flow_active:
self._login_poll_after_id = self.after(250, self._login_poll_tick)
# ---------------- TOP AUTH BUTTON ----------------
def on_auth_button(self) -> None:
# decide based on current banner
def work():
view = self.ctrl.get_login_view()
if bool(view.logged_in):
self.on_logout()
else:
self.on_start_login()
self._safe(work, title="Auth error")
# ---------------- ACTIONS ----------------
def on_start_login(self) -> None:
def work():
self.ctrl.log_gui("Top Login clicked")
self._login_flow_reset_ui()
start = self.ctrl.login_flow_start()
# reflect start info
self._set_dot(self.login_flow_dot, self._login_flow_dot_id, start.dot_color)
self.lbl_login_flow_status.config(text=f"Status: {start.status_text or ''}")
self.lbl_login_flow_email.config(text=(f"User: {start.email}" if start.email else ""))
if start.url:
self.var_login_url.set(start.url)
cleaned = self._clean_ui_lines(start.lines)
if cleaned:
self._append_text(self.txt_login_flow, cleaned + "\n")
# already logged: banner update and stop
phase = (start.phase or "").strip().lower()
if phase == "already_logged":
self.refresh_login_banner()
messagebox.showinfo("Login", f"Already logged in{f' as {start.email}' if start.email else ''}.")
return
# show login page and start polling
self._show_vpn_page("login")
self._login_cursor = int(start.cursor or 0)
self._login_flow_set_buttons(can_open=start.can_open, can_check=start.can_check, can_cancel=start.can_cancel)
self._login_flow_autopoll_start()
self._safe(work, title="Login start error")
def on_login_back(self) -> None:
self._login_flow_autopoll_stop()
self._show_vpn_page("main")
self.refresh_login_banner()
def on_login_copy(self) -> None:
u = self.var_login_url.get().strip()
if not u:
return
try:
self.master.clipboard_clear()
self.master.clipboard_append(u)
except Exception:
pass
def on_login_open(self) -> None:
def work():
self.ctrl.login_flow_action("open")
u = self.var_login_url.get().strip()
if u:
try:
subprocess.Popen(["xdg-open", u])
except Exception:
pass
self.ctrl.log_gui("Login flow: open")
self._safe(work, title="Login open error")
def on_login_check(self) -> None:
def work():
self.ctrl.login_flow_action("check")
self.ctrl.log_gui("Login flow: check")
self._safe(work, title="Login check error")
def on_login_cancel(self) -> None:
def work():
self.ctrl.login_flow_action("cancel")
self.ctrl.log_gui("Login flow: cancel")
self._safe(work, title="Login cancel error")
def on_login_stop(self) -> None:
def work():
self.ctrl.login_flow_stop()
self.ctrl.log_gui("Login flow: stop")
self._login_flow_autopoll_stop()
self.after(250, self.refresh_login_banner)
self._safe(work, title="Login stop error")
def on_logout(self) -> None:
def work():
if not messagebox.askyesno("Logout", "Logout from AdGuard VPN account?"):
return
res = self.ctrl.vpn_logout()
self.ctrl.log_gui("VPN logout executed")
messagebox.showinfo("Logout", res.pretty_text.strip() or "Done.")
self.refresh_login_banner()
self.refresh_vpn_tab()
self._safe(work, title="Logout error")
def on_toggle_autoconnect(self) -> None:
def work():
enable = bool(self.var_autoconnect.get())
res = self.ctrl.vpn_set_autoconnect(enable)
self._set_text(self.txt_vpn, res.pretty_text)
self.ctrl.log_gui(f"Auto-connect set to {enable}")
self._safe(work, title="Auto-connect error")
def on_set_location(self) -> None:
def work():
val = self.cmb_location.get().strip()
if not val:
messagebox.showinfo("Location", "Choose a location first.")
return
iso = val.split("", 1)[0].strip()
res = self.ctrl.vpn_set_location(iso)
self._set_text(self.txt_vpn, res.pretty_text)
self.ctrl.log_gui(f"Location set to {iso}")
self.refresh_vpn_tab()
self._safe(work, title="Set location error")
def on_routes_action(self, action: str) -> None:
def work():
res = self.ctrl.routes_service_action(action)
self._set_text(self.txt_routes, res.pretty_text)
self.ctrl.log_gui(f"Routes service: {action}")
self.refresh_status_tab()
self._safe(work, title="Routes service error")
def on_routes_clear(self) -> None:
def work():
res = self.ctrl.routes_clear()
self._set_text(self.txt_routes, res.pretty_text)
self.ctrl.log_gui("Routes cleared")
self.refresh_status_tab()
self._safe(work, title="Clear routes error")
def on_toggle_timer(self) -> None:
def work():
enabled = bool(self.var_timer.get())
res = self.ctrl.routes_timer_set(enabled)
self._set_text(self.txt_routes, res.pretty_text)
self.ctrl.log_gui(f"Routes timer set to {enabled}")
self.refresh_status_tab()
self._safe(work, title="Timer error")
def on_fix_policy_route(self) -> None:
def work():
res = self.ctrl.routes_fix_policy_route()
self._set_text(self.txt_routes, res.pretty_text)
self.ctrl.log_gui("Policy route fix executed")
self.refresh_status_tab()
self._safe(work, title="Fix policy route error")
def on_save_upstreams(self) -> None:
def work():
cfg = DnsUpstreams(
default1=self.ent_def1.get().strip(),
default2=self.ent_def2.get().strip(),
meta1=self.ent_meta1.get().strip(),
meta2=self.ent_meta2.get().strip(),
)
self.ctrl.dns_upstreams_save(cfg)
self.ctrl.log_gui("DNS upstreams saved")
messagebox.showinfo("DNS", "Saved.")
self.refresh_dns_tab()
self._safe(work, title="Save upstreams error")
def on_smartdns_action(self, action: str) -> None:
def work():
_res = self.ctrl.smartdns_service_action(action)
self.ctrl.log_gui(f"SmartDNS action: {action}")
self.refresh_dns_tab()
self.refresh_trace_tab()
self._safe(work, title="SmartDNS error")
def on_save_wildcards(self) -> None:
def work():
raw = self.txt_wildcards.get("1.0", "end")
domains = [x.strip() for x in raw.splitlines() if x.strip()]
self.ctrl.smartdns_wildcards_save(domains)
self.ctrl.log_gui(f"Wildcards saved: {len(domains)}")
messagebox.showinfo("SmartDNS", "Wildcards saved.")
self.refresh_dns_tab()
self._safe(work, title="Save wildcards error")
def on_domains_load(self) -> None:
def work():
name = self._get_selected_domains_file()
f = self.ctrl.domains_file_load(name)
content = f.content or ""
source = getattr(f, "source", "") or "file"
if not content:
path = f"/etc/selective-vpn/domains/{name}.txt"
content = self._read_local_file(path)
if content:
source = f"{source}+fallback" if source else "fallback"
self._set_text(self.txt_domains, content)
self.lbl_domains_info.config(text=f"{name} (source: {source})")
self.ctrl.log_gui(f"Domains file loaded: {name} source={source}")
self._safe(work, title="Load domains file error")
def on_domains_save(self) -> None:
def work():
name = self._get_selected_domains_file()
content = self.txt_domains.get("1.0", "end")
self.ctrl.domains_file_save(name, content)
self.ctrl.log_gui(f"Domains file saved: {name}")
messagebox.showinfo("Domains", "Saved.")
self.refresh_status_tab()
self._safe(work, title="Save domains file error")
def on_load_agvpn_table(self) -> None:
path = "/var/lib/selective-vpn/last-ips-map.txt"
data = self._read_local_file(path)
self._set_text(self.txt_domains, data or "(empty)")
self.lbl_domains_info.config(text=f"AGVPN table: {path}")
def on_load_smartdns_table(self) -> None:
path = "/etc/selective-vpn/smartdns.conf"
data = self._read_local_file(path)
self._set_text(self.txt_domains, data or "(empty)")
self.lbl_domains_info.config(text=f"SmartDNS table: {path}")
# ---------------- CLOSE HANDLER ----------------
def _on_close(self) -> None:
def work():
if self._login_flow_active:
try:
self.ctrl.login_flow_action("cancel")
except Exception:
pass
try:
self.ctrl.login_flow_stop()
except Exception:
pass
self._login_flow_autopoll_stop()
try:
work()
finally:
self.master.destroy()
def main() -> int:
client = ApiClient.from_env()
ctrl = DashboardController(client)
root = tk.Tk()
try:
root.minsize(900, 650)
except Exception:
pass
try:
style = ttk.Style()
if "clam" in style.theme_names():
style.theme_use("clam")
except Exception:
pass
_app = App(root, ctrl)
root.mainloop()
return 0
if __name__ == "__main__":
raise SystemExit(main())