diff --git a/.gitignore b/.gitignore index f73d11e..549922e 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,5 @@ selective-vpn-gui/main.go *.tmp selective-vpn-api/works +# Local archive / old copies (kept out of repo root) +_legacy/ diff --git a/selective-vpn-gui/agvpn-resolver.py b/selective-vpn-gui/agvpn-resolver.py deleted file mode 100755 index 2d87496..0000000 --- a/selective-vpn-gui/agvpn-resolver.py +++ /dev/null @@ -1,645 +0,0 @@ -#!/usr/bin/env python3 -import argparse -import concurrent.futures -import json -import os -import sys -import time -from collections import defaultdict - -# --- dnspython -------------------------------------------------------- -try: - import dns.resolver - import dns.reversename - import dns.exception -except ImportError as e: - print(f"[resolver] dnspython is required: {e}", file=sys.stderr) - sys.exit(2) - -# -------------------------------------------------------------------- -# Общий DNS-конфиг -# -------------------------------------------------------------------- -DNS_CONFIG_PATH = "/etc/selective-vpn/dns-upstreams.conf" - -DEFAULT_DNS_DEFAULT = ["94.140.14.14", "94.140.15.15"] -DEFAULT_DNS_META = ["46.243.231.30", "46.243.231.41"] - -DNS_DEFAULT = DEFAULT_DNS_DEFAULT.copy() -DNS_META = DEFAULT_DNS_META.copy() - - -# -------------------------------------------------------------------- -# helpers -# -------------------------------------------------------------------- -def log(msg, trace_log=None): - line = f"[resolver] {msg}" - print(line, file=sys.stderr) - if trace_log: - try: - with open(trace_log, "a") as f: - f.write(line + "\n") - except Exception: - pass - - -def is_private_ipv4(ip: str) -> bool: - """ - ip может быть "A.B.C.D" или "A.B.C.D/nn". - Возвращаем True, если адрес из приватных диапазонов. - """ - parts = ip.split("/") - base = parts[0] - try: - o1, o2, o3, o4 = map(int, base.split(".")) - except ValueError: - return True - - if o1 == 10: - return True - if o1 == 127: - return True - if o1 == 0: - return True - if o1 == 192 and o2 == 168: - return True - if o1 == 172 and 16 <= o2 <= 31: - return True - return False - - -def load_list(path): - if not os.path.exists(path): - return [] - out = [] - with open(path, "r") as f: - for line in f: - s = line.strip() - if not s or s.startswith("#"): - continue - out.append(s) - return out - - -def load_cache(path): - if not os.path.exists(path): - return {} - try: - with open(path, "r") as f: - return json.load(f) - except Exception: - return {} - - -def save_cache(path, data): - tmp = path + ".tmp" - try: - with open(tmp, "w") as f: - json.dump(data, f, indent=2, sort_keys=True) - os.replace(tmp, path) - except Exception: - pass - - -def split_dns(dns: str): - """ - Разбор записи вида: - "1.2.3.4" -> ("1.2.3.4", None) - "1.2.3.4#6053" -> ("1.2.3.4", "6053") - """ - if "#" in dns: - host, port = dns.split("#", 1) - host = host.strip() - port = port.strip() - if not host: - host = "127.0.0.1" - if not port: - port = "53" - return host, port - return dns, None - - -# -------------------------------------------------------------------- -# dnspython-резолвы -# -------------------------------------------------------------------- -def dig_a(host, dns_list, timeout=3): - """ - A-резолв через dnspython. - dns_list: либо строка "IP[#PORT]", либо список таких строк. - """ - if isinstance(dns_list, str): - dns_list = [dns_list] - - ips = [] - - for entry in dns_list: - server, port = split_dns(entry) - if not server: - continue - - r = dns.resolver.Resolver(configure=False) - r.nameservers = [server] - if port: - try: - r.port = int(port) - except ValueError: - r.port = 53 - r.timeout = timeout - r.lifetime = timeout - - try: - answer = r.resolve(host, "A") - except dns.exception.DNSException: - continue - except Exception: - continue - - for rr in answer: - s = rr.to_text().strip() - parts = s.split(".") - if len(parts) != 4: - continue - if all(p.isdigit() and 0 <= int(p) <= 255 for p in parts): - if not is_private_ipv4(s) and s not in ips: - ips.append(s) - - return ips - - -def dig_ptr(ip, upstream, timeout=3): - """ - PTR-резолв: ip -> список имён. - dns может быть "IP" или "IP#PORT". - """ - server, port = split_dns(upstream) - if not server: - return [] - - r = dns.resolver.Resolver(configure=False) - r.nameservers = [server] - if port: - try: - r.port = int(port) - except ValueError: - r.port = 53 - r.timeout = timeout - r.lifetime = timeout - - try: - rev = dns.reversename.from_address(ip) - except Exception: - return [] - - try: - answer = r.resolve(rev, "PTR") - except dns.exception.DNSException: - return [] - except Exception: - return [] - - names = [] - for rr in answer: - s = rr.to_text().strip() - if s.endswith("."): - s = s[:-1] - if s: - names.append(s.lower()) - return names - - -# -------------------------------------------------------------------- -# Загрузка DNS-конфига -# -------------------------------------------------------------------- -def load_dns_config(path=DNS_CONFIG_PATH, trace_log=None): - """ - Читает /etc/selective-vpn/dns-upstreams.conf и обновляет - глобальные DNS_DEFAULT / DNS_META. - - Формат строк: - default 1.2.3.4 5.6.7.8 - meta 9.9.9.9 8.8.8.8 - Можно использовать "ip#port", например 127.0.0.1#6053. - """ - global DNS_DEFAULT, DNS_META - - if not os.path.exists(path): - DNS_DEFAULT = DEFAULT_DNS_DEFAULT.copy() - DNS_META = DEFAULT_DNS_META.copy() - log( - f"dns-config: {path} not found, fallback to built-in defaults " - f"(default={DNS_DEFAULT}, meta={DNS_META})", - trace_log, - ) - return - - dflt = [] - meta = [] - - try: - with open(path, "r") as f: - for line in f: - s = line.strip() - if not s or s.startswith("#"): - continue - parts = s.split() - if len(parts) < 2: - continue - key = parts[0].lower() - addrs = parts[1:] - if key == "default": - dflt.extend(addrs) - elif key == "meta": - meta.extend(addrs) - except Exception as e: - DNS_DEFAULT = DEFAULT_DNS_DEFAULT.copy() - DNS_META = DEFAULT_DNS_META.copy() - log( - f"dns-config: failed to read {path}: {e}, fallback to built-in defaults " - f"(default={DNS_DEFAULT}, meta={DNS_META})", - trace_log, - ) - return - - if not dflt: - dflt = DEFAULT_DNS_DEFAULT.copy() - log( - "dns-config: no 'default' section, fallback to built-in for default", - trace_log, - ) - if not meta: - meta = DEFAULT_DNS_META.copy() - log("dns-config: no 'meta' section, fallback to built-in for meta", trace_log) - - DNS_DEFAULT = dflt - DNS_META = meta - log( - f"dns-config: accept {path}: " - f"default={', '.join(DNS_DEFAULT)}; meta={', '.join(DNS_META)}", - trace_log, - ) - - -def resolve_host(host, meta_special, trace_log=None): - """ - Forward-резолв одного домена (A-записи). - DNS берём из DNS_DEFAULT / DNS_META, которые загрузил load_dns_config(). - """ - if host in meta_special: - dns_list = DNS_META - else: - dns_list = DNS_DEFAULT - - ips = dig_a(host, dns_list) - - uniq = [] - for ip in ips: - if ip not in uniq: - uniq.append(ip) - - if uniq: - log(f"{host}: {', '.join(uniq)}", trace_log) - else: - log(f"{host}: no IPs", trace_log) - return uniq - - -def parse_static_entries(static_lines): - """ - static_lines — строки из static-ips.txt. - Возвращаем список кортежей (ip_entry, base_ip, comment). - """ - entries = [] - for line in static_lines: - s = line.strip() - if not s or s.startswith("#"): - continue - - if "#" in s: - ip_part, comment = s.split("#", 1) - ip_part = ip_part.strip() - comment = comment.strip() - else: - ip_part = s - comment = "" - - if not ip_part: - continue - if is_private_ipv4(ip_part): - continue - - base_ip = ip_part.split("/", 1)[0] - entries.append((ip_part, base_ip, comment)) - return entries - - -def resolve_static_entries(static_entries, ptr_cache, ttl_sec, trace_log=None): - """ - static_entries: список кортежей (ip_entry, base_ip, comment). - ip_entry — как в static-ips.txt (может быть с /mask) - base_ip — A.B.C.D (без маски) - comment — текст после # или "". - - Возвращаем dict: ip_entry -> список меток, - уже с префиксом '*' (чтобы можно было искать). - """ - now = int(time.time()) - result = {} - for ip_entry, base_ip, comment in static_entries: - labels = [] - # 1) если есть комментарий — он главнее всего - if comment: - labels.append(f"*{comment}") - # 2) если комментария нет, пробуем PTR (с кэшем) - if not comment: - cache_entry = ptr_cache.get(base_ip) - names = [] - if ( - cache_entry - and isinstance(cache_entry, dict) - and isinstance(cache_entry.get("last_resolved"), (int, float)) - ): - age = now - cache_entry["last_resolved"] - cached_names = cache_entry.get("names") or [] - if age <= ttl_sec and cached_names: - names = cached_names - if not names: - # PTR через те же DNS, что и обычный трафик (используем первый из default) - dns_for_ptr = DNS_DEFAULT[0] if DNS_DEFAULT else DEFAULT_DNS_DEFAULT[0] - - try: - names = dig_ptr(base_ip, dns_for_ptr) or [] - except Exception as e: - log( - f"PTR failed for {base_ip} (using {dns_for_ptr}): " - f"{type(e).__name__}: {e}", - trace_log, - ) - names = [] - - uniq_names = [] - for n in names: - if n not in uniq_names: - uniq_names.append(n) - names = uniq_names - ptr_cache[base_ip] = { - "names": names, - "last_resolved": now, - } - for n in names: - labels.append(f"*{n}") - # 3) если вообще ничего нет — ставим общий тег - if not labels: - labels = ["*[STATIC-IP]"] - result[ip_entry] = labels - log(f"static {ip_entry}: labels={', '.join(labels)}", trace_log) - return result - - -# -------------------------------------------------------------------- -# API-слой: одна чистая функция, которую легко вызвать откуда угодно -# -------------------------------------------------------------------- -def run_resolver_job( - *, - domains, - meta_special, - static_lines, - cache_path, - ptr_cache_path, - ttl_sec, - workers, - trace_log=None, -): - """ - Главный API резолвера. - - Вход: - domains — список доменов - meta_special — set() доменов из meta-special.txt - static_lines — строки из static-ips.txt - cache_path — путь к domain-cache.json - ptr_cache_path— путь к ptr-cache.json - ttl_sec — TTL кэша доменов / PTR - workers — число потоков - trace_log — путь к trace.log (или None) - - Выход: dict с ключами: - ips — отсортированный список IP/подсетей - ip_map — список (ip, label) пар (домен или *LABEL) - domain_cache — обновлённый кэш доменов - ptr_cache — обновлённый PTR-кэш - summary — статистика (dict) - """ - # --- подгружаем DNS-конфиг --- - load_dns_config(DNS_CONFIG_PATH, trace_log) - - meta_special = set(meta_special or []) - - log(f"domains to resolve: {len(domains)}", trace_log) - - # --- кэши --- - domain_cache = load_cache(cache_path) - ptr_cache = load_cache(ptr_cache_path) - now = int(time.time()) - - # --- разруливаем: что берём из domain_cache, что резолвим --- - fresh_from_cache = {} - to_resolve = [] - - for d in domains: - entry = domain_cache.get(d) - if entry and isinstance(entry, dict): - ts = entry.get("last_resolved") or 0 - ips = entry.get("ips") or [] - if isinstance(ts, (int, float)) and isinstance(ips, list) and ips: - if now - ts <= ttl_sec: - valid_ips = [ip for ip in ips if not is_private_ipv4(ip)] - if valid_ips: - fresh_from_cache[d] = valid_ips - continue - - to_resolve.append(d) - - log( - f"from cache: {len(fresh_from_cache)}, to resolve: {len(to_resolve)}", - trace_log, - ) - - resolved = dict(fresh_from_cache) - - total_domains = len(domains) - cache_hits = len(fresh_from_cache) - resolved_now = 0 - unresolved = 0 - - # --- параллельный резолв доменов --- - if to_resolve: - with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as ex: - fut2host = { - ex.submit(resolve_host, d, meta_special, trace_log): d - for d in to_resolve - } - for fut in concurrent.futures.as_completed(fut2host): - d = fut2host[fut] - try: - ips = fut.result() - except Exception as e: - log(f"{d}: resolver exception: {e}", trace_log) - ips = [] - - if ips: - resolved[d] = ips - domain_cache[d] = { - "ips": ips, - "last_resolved": now, - } - resolved_now += 1 - else: - unresolved += 1 - - # --- читаем static-ips и готовим список для PTR --- - static_entries = parse_static_entries(static_lines) - log(f"static entries: {len(static_entries)}", trace_log) - - # --- PTR/labels для static-ips --- - static_label_map = resolve_static_entries( - static_entries, ptr_cache, ttl_sec, trace_log - ) - - # --- собираем общий список IP и map --- - ip_set = set() - ip_to_domains = defaultdict(set) - - # доменные IP - for d, ips in resolved.items(): - for ip in ips: - ip_set.add(ip) - ip_to_domains[ip].add(d) - - # статические IP / сети - for ip_entry, _, _ in static_entries: - ip_set.add(ip_entry) - for label in static_label_map.get(ip_entry, []): - ip_to_domains[ip_entry].add(label) - - unique_ip_count = len(ip_set) - if unique_ip_count == 0: - log("no IPs resolved at all", trace_log) - else: - log(f"resolver done: {unique_ip_count} unique IPs", trace_log) - - ips_sorted = sorted(ip_set) - - # flatten ip_map - ip_map_pairs = [] - for ip in ips_sorted: - for dom in sorted(ip_to_domains[ip]): - ip_map_pairs.append((ip, dom)) - - summary = { - "domains_total": total_domains, - "from_cache": cache_hits, - "resolved_now": resolved_now, - "unresolved": unresolved, - "static_entries": len(static_entries), - "unique_ips": unique_ip_count, - } - - log( - "summary: domains=%d, from_cache=%d, resolved_now=%d, " - "unresolved=%d, static_entries=%d, unique_ips=%d" - % ( - summary["domains_total"], - summary["from_cache"], - summary["resolved_now"], - summary["unresolved"], - summary["static_entries"], - summary["unique_ips"], - ), - trace_log, - ) - - return { - "ips": ips_sorted, - "ip_map": ip_map_pairs, - "domain_cache": domain_cache, - "ptr_cache": ptr_cache, - "summary": summary, - } - - -# -------------------------------------------------------------------- -# CLI-обёртка вокруг API-функции (для bash-скрипта) -# -------------------------------------------------------------------- -def main(): - ap = argparse.ArgumentParser() - ap.add_argument("--domains", required=True, help="file with domains (one per line)") - ap.add_argument("--output-ips", required=True, help="file to write unique IPs") - ap.add_argument( - "--output-map", - required=True, - help="file to write IPdomain map", - ) - ap.add_argument("--meta-file", required=True, help="meta-special.txt path") - ap.add_argument("--static-ips", required=True, help="static-ips.txt path") - ap.add_argument("--cache", required=True, help="domain-cache.json path") - ap.add_argument("--ptr-cache", required=True, help="ptr-cache.json path") - ap.add_argument("--trace-log", default=None) - ap.add_argument("--workers", type=int, default=40) - ap.add_argument("--ttl-sec", type=int, default=24 * 3600) - args = ap.parse_args() - - trace_log = args.trace_log - - try: - # входные данные для API-функции - domains = load_list(args.domains) - meta_special = load_list(args.meta_file) - - static_lines = [] - if os.path.exists(args.static_ips): - with open(args.static_ips, "r") as f: - static_lines = f.read().splitlines() - - job_result = run_resolver_job( - domains=domains, - meta_special=meta_special, - static_lines=static_lines, - cache_path=args.cache, - ptr_cache_path=args.ptr_cache, - ttl_sec=args.ttl_sec, - workers=args.workers, - trace_log=trace_log, - ) - - ips_sorted = job_result["ips"] - ip_map_pairs = job_result["ip_map"] - domain_cache = job_result["domain_cache"] - ptr_cache = job_result["ptr_cache"] - - # output-ips: по одному IP/подсети - with open(args.output_ips, "w") as f: - for ip in ips_sorted: - f.write(ip + "\n") - - # output-map: IPдомен/метка - with open(args.output_map, "w") as f: - for ip, dom in ip_map_pairs: - f.write(f"{ip}\t{dom}\n") - - # сохраняем кэши - save_cache(args.cache, domain_cache) - save_cache(args.ptr_cache, ptr_cache) - - return 0 - - except Exception as e: - # настоящий фатал - log(f"FATAL resolver error: {e}", trace_log) - import traceback - - traceback.print_exc(file=sys.stderr) - return 2 - - -if __name__ == "__main__": - raise SystemExit(main()) - diff --git a/selective-vpn-gui/vpn-dashboard.py b/selective-vpn-gui/vpn-dashboard.py deleted file mode 100755 index ebf88ab..0000000 --- a/selective-vpn-gui/vpn-dashboard.py +++ /dev/null @@ -1,901 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -Selective-VPN Dashboard (UI only) - -RULES: -- This file must NOT know anything about REST paths, HTTP methods, or JSON keys. -- It talks ONLY to DashboardController (which uses ApiClient). -""" - -from __future__ import annotations - -import re -import subprocess -import sys -import tkinter as tk -from tkinter import messagebox -from tkinter import ttk -from typing import Literal, Optional, cast, Tuple - -from api_client import ApiClient, DnsUpstreams -from dashboard_controller import DashboardController - -TraceMode = Literal["full", "gui", "smartdns"] - -# убираем спам автопроверки из логов UI (на всякий случай, даже если почистил controller) -_NEXT_CHECK_RE = re.compile(r"(?:\b\d+s\.)?\s*Next check in\s+\d+s\.?", re.IGNORECASE) - - -class App(ttk.Frame): - def __init__(self, master: tk.Tk, ctrl: DashboardController) -> None: - super().__init__(master) - self.master = master - self.ctrl = ctrl - - # login-flow runtime - self._login_flow_active: bool = False - self._login_cursor: int = 0 - self._login_url_opened: bool = False - self._login_poll_after_id: Optional[str] = None - - self._build_ui() - self._wire_events() - - self.after(50, self.refresh_everything) - self.master.protocol("WM_DELETE_WINDOW", self._on_close) - - # ---------------- UI BUILD ---------------- - - def _build_ui(self) -> None: - self.master.title("Selective-VPN Dashboard") - self.pack(fill="both", expand=True) - - # Top bar - top = ttk.Frame(self) - top.pack(fill="x", padx=10, pady=(10, 6)) - - self.btn_refresh = ttk.Button(top, text="Refresh all", command=self.refresh_everything) - self.btn_refresh.pack(side="left") - - # Login indicator (dot + text) - self.login_dot = tk.Canvas(top, width=12, height=12, highlightthickness=0) - self.login_dot.pack(side="left", padx=(12, 4)) - self._login_dot_id = self.login_dot.create_oval(2, 2, 10, 10, fill="gray", outline="") - - self.lbl_login = ttk.Label(top, text="AdGuard VPN: ...", font=("TkDefaultFont", 10, "bold")) - self.lbl_login.pack(side="left", padx=(0, 10)) - - # Single auth button (Login/Logout) - self.btn_auth = ttk.Button(top, text="Login", command=self.on_auth_button) - self.btn_auth.pack(side="left") - - self.lbl_hint = ttk.Label(top, text="(GUI contains no API logic)", foreground="gray") - self.lbl_hint.pack(side="right") - - # Notebook - self.nb = ttk.Notebook(self) - self.nb.pack(fill="both", expand=True, padx=10, pady=(0, 10)) - - self._build_tab_status() - self._build_tab_vpn() - self._build_tab_routes() - self._build_tab_dns() - self._build_tab_domains() - self._build_tab_trace() - - def _build_tab_status(self) -> None: - tab = ttk.Frame(self.nb) - self.nb.add(tab, text="Status") - - frm = ttk.Frame(tab) - frm.pack(fill="both", expand=True, padx=10, pady=10) - - grid = ttk.Frame(frm) - grid.pack(fill="x") - - def row(r: int, label: str) -> ttk.Label: - ttk.Label(grid, text=label).grid(row=r, column=0, sticky="w", pady=2) - v = ttk.Label(grid, text="—") - v.grid(row=r, column=1, sticky="w", pady=2, padx=(10, 0)) - return v - - self.st_timestamp = row(0, "Timestamp") - self.st_counts = row(1, "Counts") - self.st_iface = row(2, "Iface/Table/Mark") - self.st_route = row(3, "Policy route") - self.st_routesvc = row(4, "Routes service") - self.st_smartdns = row(5, "SmartDNS service") - self.st_vpnsvc = row(6, "VPN service") - - btns = ttk.Frame(frm) - btns.pack(fill="x", pady=(10, 0)) - ttk.Button(btns, text="Refresh status", command=self.refresh_status_tab).pack(side="left") - - def _build_tab_vpn(self) -> None: - tab = ttk.Frame(self.nb) - self.nb.add(tab, text="AdGuardVPN") - - # Pages container - self.vpn_pages = ttk.Frame(tab) - self.vpn_pages.pack(fill="both", expand=True, padx=10, pady=10) - - self.vpn_page_main = ttk.Frame(self.vpn_pages) - self.vpn_page_login = ttk.Frame(self.vpn_pages) - - for p in (self.vpn_page_main, self.vpn_page_login): - p.grid(row=0, column=0, sticky="nsew") - self.vpn_pages.rowconfigure(0, weight=1) - self.vpn_pages.columnconfigure(0, weight=1) - - # -------- Page 1: main VPN controls (Enter Login removed) -------- - frm = self.vpn_page_main - - top_actions = ttk.Frame(frm) - top_actions.pack(fill="x", pady=(0, 10)) - ttk.Button(top_actions, text="Refresh", command=self.refresh_vpn_tab).pack(side="right") - - # Autoconnect toggle - ac = ttk.LabelFrame(frm, text="Auto-connect") - ac.pack(fill="x") - - self.var_autoconnect = tk.BooleanVar(value=False) - self.chk_autoconnect = ttk.Checkbutton( - ac, - text="Enable auto-connect", - variable=self.var_autoconnect, - command=self.on_toggle_autoconnect, - ) - self.chk_autoconnect.pack(side="left", padx=10, pady=8) - - # Location picker - loc = ttk.LabelFrame(frm, text="Location") - loc.pack(fill="x", pady=(10, 0)) - - self.cmb_location = ttk.Combobox(loc, state="readonly", width=40) - self.cmb_location.pack(side="left", padx=10, pady=8) - self.btn_set_location = ttk.Button(loc, text="Set location", command=self.on_set_location) - self.btn_set_location.pack(side="left", padx=6, pady=8) - - self.lbl_vpn_desired = ttk.Label(loc, text="Desired: —", foreground="gray") - self.lbl_vpn_desired.pack(side="left", padx=12) - - # Status output - st = ttk.LabelFrame(frm, text="VPN Status") - st.pack(fill="both", expand=True, pady=(10, 0)) - - self.txt_vpn = tk.Text(st, height=12, wrap="none") - self.txt_vpn.pack(fill="both", expand=True, padx=10, pady=10) - - # -------- Page 2: Login flow -------- - lf = self.vpn_page_login - - lf_top = ttk.Frame(lf) - lf_top.pack(fill="x", pady=(0, 10)) - - ttk.Button(lf_top, text="← Back", command=self.on_login_back).pack(side="left") - - self.login_flow_dot = tk.Canvas(lf_top, width=14, height=14, highlightthickness=0) - self.login_flow_dot.pack(side="left", padx=(10, 4)) - self._login_flow_dot_id = self.login_flow_dot.create_oval(2, 2, 12, 12, fill="orange", outline="") - - self.lbl_login_flow_status = ttk.Label(lf_top, text="Status: —", font=("TkDefaultFont", 10, "bold")) - self.lbl_login_flow_status.pack(side="left", padx=(0, 10)) - - self.lbl_login_flow_email = ttk.Label(lf_top, text="", foreground="gray") - self.lbl_login_flow_email.pack(side="left") - - url_row = ttk.Frame(lf) - url_row.pack(fill="x", pady=(0, 10)) - - ttk.Label(url_row, text="URL:").pack(side="left") - self.var_login_url = tk.StringVar(value="") - self.ent_login_url = ttk.Entry(url_row, textvariable=self.var_login_url, state="readonly") - self.ent_login_url.pack(side="left", fill="x", expand=True, padx=8) - - self.btn_login_copy = ttk.Button(url_row, text="Copy", command=self.on_login_copy) - self.btn_login_copy.pack(side="left", padx=(0, 6)) - - self.btn_login_open = ttk.Button(url_row, text="Open", command=self.on_login_open) - self.btn_login_open.pack(side="left") - - ctrl_row = ttk.Frame(lf) - ctrl_row.pack(fill="x", pady=(0, 10)) - - self.btn_login_check = ttk.Button(ctrl_row, text="Check", command=self.on_login_check) - self.btn_login_check.pack(side="left") - - self.btn_login_close = ttk.Button(ctrl_row, text="Close (cancel)", command=self.on_login_cancel) - self.btn_login_close.pack(side="left", padx=6) - - self.btn_login_stop = ttk.Button(ctrl_row, text="Stop (force)", command=self.on_login_stop) - self.btn_login_stop.pack(side="left", padx=6) - - # Log output - out = ttk.LabelFrame(lf, text="Login output") - out.pack(fill="both", expand=True) - - self.txt_login_flow = tk.Text(out, wrap="word", height=16) - self.txt_login_flow.pack(fill="both", expand=True, padx=10, pady=10) - - self._show_vpn_page("main") - - def _build_tab_routes(self) -> None: - tab = ttk.Frame(self.nb) - self.nb.add(tab, text="Routes") - - frm = ttk.Frame(tab) - frm.pack(fill="both", expand=True, padx=10, pady=10) - - svc = ttk.LabelFrame(frm, text="Routes service") - svc.pack(fill="x") - - ttk.Button(svc, text="Start", command=lambda: self.on_routes_action("start")).pack(side="left", padx=10, pady=8) - ttk.Button(svc, text="Stop", command=lambda: self.on_routes_action("stop")).pack(side="left", padx=6, pady=8) - ttk.Button(svc, text="Restart", command=lambda: self.on_routes_action("restart")).pack(side="left", padx=6, pady=8) - - ttk.Button(svc, text="Clear routes", command=self.on_routes_clear).pack(side="right", padx=10, pady=8) - - timer = ttk.LabelFrame(frm, text="Timer") - timer.pack(fill="x", pady=(10, 0)) - - self.var_timer = tk.BooleanVar(value=False) - self.chk_timer = ttk.Checkbutton(timer, text="Enable timer", variable=self.var_timer, command=self.on_toggle_timer) - self.chk_timer.pack(side="left", padx=10, pady=8) - - ttk.Button(timer, text="Fix policy route", command=self.on_fix_policy_route).pack(side="right", padx=10, pady=8) - - out = ttk.LabelFrame(frm, text="Output") - out.pack(fill="both", expand=True, pady=(10, 0)) - - self.txt_routes = tk.Text(out, height=12, wrap="none") - self.txt_routes.pack(fill="both", expand=True, padx=10, pady=10) - - def _build_tab_dns(self) -> None: - tab = ttk.Frame(self.nb) - self.nb.add(tab, text="DNS") - - frm = ttk.Frame(tab) - frm.pack(fill="both", expand=True, padx=10, pady=10) - - ups = ttk.LabelFrame(frm, text="Upstreams") - ups.pack(fill="x") - - def add_field(r: int, label: str) -> ttk.Entry: - ttk.Label(ups, text=label).grid(row=r, column=0, sticky="w", padx=10, pady=4) - e = ttk.Entry(ups, width=60) - e.grid(row=r, column=1, sticky="we", padx=10, pady=4) - return e - - ups.columnconfigure(1, weight=1) - self.ent_def1 = add_field(0, "default1") - self.ent_def2 = add_field(1, "default2") - self.ent_meta1 = add_field(2, "meta1") - self.ent_meta2 = add_field(3, "meta2") - - btns = ttk.Frame(frm) - btns.pack(fill="x", pady=(10, 0)) - ttk.Button(btns, text="Refresh", command=self.refresh_dns_tab).pack(side="left") - ttk.Button(btns, text="Save", command=self.on_save_upstreams).pack(side="left", padx=6) - - sm = ttk.LabelFrame(frm, text="SmartDNS") - sm.pack(fill="both", expand=True, pady=(10, 0)) - - top = ttk.Frame(sm) - top.pack(fill="x", padx=10, pady=(10, 6)) - - self.lbl_smartdns_state = ttk.Label(top, text="Service: —") - self.lbl_smartdns_state.pack(side="left") - - ttk.Button(top, text="Start", command=lambda: self.on_smartdns_action("start")).pack(side="right", padx=6) - ttk.Button(top, text="Stop", command=lambda: self.on_smartdns_action("stop")).pack(side="right") - - mid = ttk.Frame(sm) - mid.pack(fill="both", expand=True, padx=10, pady=(0, 10)) - - ttk.Label(mid, text="Wildcards (one per line):").pack(anchor="w") - self.txt_wildcards = tk.Text(mid, height=10, wrap="none") - self.txt_wildcards.pack(fill="both", expand=True, pady=(4, 6)) - - btns2 = ttk.Frame(mid) - btns2.pack(fill="x") - ttk.Button(btns2, text="Refresh", command=self.refresh_dns_tab).pack(side="left") - ttk.Button(btns2, text="Save", command=self.on_save_wildcards).pack(side="left", padx=6) - - def _build_tab_domains(self) -> None: - tab = ttk.Frame(self.nb) - self.nb.add(tab, text="Domains") - - frm = ttk.Frame(tab) - frm.pack(fill="both", expand=True, padx=10, pady=10) - - left = ttk.Frame(frm) - left.pack(side="left", fill="y") - - right = ttk.Frame(frm) - right.pack(side="left", fill="both", expand=True, padx=(10, 0)) - - ttk.Label(left, text="Files:").pack(anchor="w") - self.lst_files = tk.Listbox(left, height=6, exportselection=False) - for name in ("bases", "meta", "subs", "static"): - self.lst_files.insert("end", name) - self.lst_files.selection_set(0) - self.lst_files.pack(fill="y", pady=(4, 8)) - - ttk.Button(left, text="Refresh table", command=self.refresh_domains_tab).pack(fill="x") - ttk.Button(left, text="Load file", command=self.on_domains_load).pack(fill="x", pady=(6, 0)) - ttk.Button(left, text="Save file", command=self.on_domains_save).pack(fill="x", pady=(6, 0)) - ttk.Button(left, text="Load AGVPN table", command=self.on_load_agvpn_table).pack(fill="x", pady=(10, 0)) - ttk.Button(left, text="Load SmartDNS table", command=self.on_load_smartdns_table).pack(fill="x", pady=(6, 0)) - - top = ttk.Frame(right) - top.pack(fill="x") - self.lbl_domains_info = ttk.Label(top, text="—", foreground="gray") - self.lbl_domains_info.pack(side="left") - - self.txt_domains = tk.Text(right, wrap="none") - self.txt_domains.pack(fill="both", expand=True, pady=(6, 0)) - - def _build_tab_trace(self) -> None: - tab = ttk.Frame(self.nb) - self.nb.add(tab, text="Trace") - - frm = ttk.Frame(tab) - frm.pack(fill="both", expand=True, padx=10, pady=10) - - top = ttk.Frame(frm) - top.pack(fill="x") - - self.var_trace_mode = tk.StringVar(value="full") - for m, title in (("full", "Full"), ("gui", "GUI"), ("smartdns", "SmartDNS")): - ttk.Radiobutton(top, text=title, value=m, variable=self.var_trace_mode, command=self.refresh_trace_tab).pack( - side="left", padx=(0, 10) - ) - ttk.Button(top, text="Refresh", command=self.refresh_trace_tab).pack(side="right") - - self.txt_trace = tk.Text(frm, wrap="none") - self.txt_trace.pack(fill="both", expand=True, pady=(10, 0)) - - def _wire_events(self) -> None: - self.lst_files.bind("<>", lambda _e: self.on_domains_load()) - - # ---------------- UI HELPERS ---------------- - - def _set_text(self, widget: tk.Text, text: str) -> None: - widget.config(state="normal") - widget.delete("1.0", "end") - widget.insert("1.0", text) - widget.config(state="normal") - - def _append_text(self, widget: tk.Text, text: str) -> None: - widget.config(state="normal") - widget.insert("end", text) - widget.see("end") - widget.config(state="normal") - - def _clean_ui_lines(self, lines) -> str: - # финальная страховка: убираем "Next check" и нормализуем \r - buf = "\n".join([str(x) for x in (lines or [])]).replace("\r", "\n") - out_lines = [] - for ln in buf.splitlines(): - t = ln.strip() - if not t: - continue - t2 = _NEXT_CHECK_RE.sub("", t).strip() - if not t2: - continue - out_lines.append(t2) - return "\n".join(out_lines).rstrip() - - def _get_selected_domains_file(self) -> str: - sel = self.lst_files.curselection() - if not sel: - return "bases" - return str(self.lst_files.get(sel[0])) - - def _read_local_file(self, path: str) -> str: - try: - with open(path, "r", encoding="utf-8", errors="ignore") as f: - return f.read() - except Exception: - return "" - - def _safe(self, fn, *, title: str = "Error"): - try: - return fn() - except Exception as e: - messagebox.showerror(title, str(e)) - return None - - def _set_dot(self, canvas: tk.Canvas, dot_id: int, color: str) -> None: - c = (color or "").strip().lower() - if c in ("green", "ok", "true"): - fill = "green" - elif c in ("red", "error", "false"): - fill = "red" - elif c in ("orange", "yellow", "try", "unknown", "pending", "wait"): - fill = "orange" - else: - fill = "gray" - try: - canvas.itemconfigure(dot_id, fill=fill) - except Exception: - pass - - def _show_vpn_page(self, which: Literal["main", "login"]) -> None: - if which == "login": - self.vpn_page_login.tkraise() - else: - self.vpn_page_main.tkraise() - - def _parse_login_banner(self, text: str, color: str) -> Tuple[bool, str]: - # считаем "logged" если зеленый - is_logged = (color or "").strip().lower() == "green" - email = "" - t = (text or "") - # пытаемся вытащить email из строки - m = re.search(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})", t) - if m: - email = m.group(1) - return is_logged, email - - def _set_auth_button(self, logged: bool) -> None: - self.btn_auth.config(text=("Logout" if logged else "Login")) - - # ---------------- REFRESH ---------------- - - def refresh_everything(self) -> None: - self.refresh_status_tab() - self.refresh_vpn_tab() - self.refresh_routes_tab() - self.refresh_dns_tab() - self.refresh_domains_tab() - self.refresh_trace_tab() - self.refresh_login_banner() - - def refresh_login_banner(self) -> None: - def work(): - view = self.ctrl.get_login_view() - self.lbl_login.config(text=view.text) - self._set_dot(self.login_dot, self._login_dot_id, view.color) - - # НЕ гадаем по цвету: используем нормализованную логику controller-а - self._set_auth_button(bool(view.logged_in)) - - try: - self.lbl_login.config(foreground=view.color) - except tk.TclError: - pass - - self._safe(work, title="Login state error") - - def refresh_status_tab(self) -> None: - def work(): - view = self.ctrl.get_status_overview() - self.st_timestamp.config(text=view.timestamp) - self.st_counts.config(text=view.counts) - self.st_iface.config(text=view.iface_table_mark) - self.st_route.config(text=view.policy_route) - self.st_routesvc.config(text=view.routes_service) - self.st_smartdns.config(text=view.smartdns_service) - self.st_vpnsvc.config(text=view.vpn_service) - self._safe(work, title="Status error") - - def refresh_vpn_tab(self) -> None: - def work(): - locs = self.ctrl.vpn_locations_view() - self.cmb_location["values"] = [f"{x.iso} — {x.label}" for x in locs] - - st = self.ctrl.vpn_status_view() - self.lbl_vpn_desired.config(text=f"Desired: {st.desired_location or '—'}") - self._set_text(self.txt_vpn, st.pretty_text) - - self.var_autoconnect.set(self.ctrl.vpn_autoconnect_enabled()) - self._safe(work, title="VPN error") - - def refresh_routes_tab(self) -> None: - def work(): - self.var_timer.set(self.ctrl.routes_timer_enabled()) - self._safe(work, title="Routes error") - - def refresh_dns_tab(self) -> None: - def work(): - cfg = self.ctrl.dns_upstreams_view() - self.ent_def1.delete(0, "end"); self.ent_def1.insert(0, cfg.default1) - self.ent_def2.delete(0, "end"); self.ent_def2.insert(0, cfg.default2) - self.ent_meta1.delete(0, "end"); self.ent_meta1.insert(0, cfg.meta1) - self.ent_meta2.delete(0, "end"); self.ent_meta2.insert(0, cfg.meta2) - - sd = self.ctrl.smartdns_service_view() - self.lbl_smartdns_state.config(text=f"Service: {sd.state}") - - wc = self.ctrl.smartdns_wildcards_view() - self._set_text(self.txt_wildcards, "\n".join(wc.domains).strip() + ("\n" if wc.domains else "")) - self._safe(work, title="DNS error") - - def refresh_domains_tab(self) -> None: - def work(): - table = self.ctrl.domains_table_view() - self.lbl_domains_info.config(text=f"Table lines: {len(table.lines)}") - self._safe(work, title="Domains error") - - def refresh_trace_tab(self) -> None: - def work(): - mode = cast(TraceMode, self.var_trace_mode.get()) - dump = self.ctrl.trace_view(mode) - self._set_text(self.txt_trace, "\n".join(dump.lines).strip() + ("\n" if dump.lines else "")) - self._safe(work, title="Trace error") - - # ---------------- LOGIN FLOW (UI) ---------------- - - def _login_flow_reset_ui(self) -> None: - self._login_cursor = 0 - self._login_url_opened = False - self.var_login_url.set("") - self.lbl_login_flow_status.config(text="Status: —") - self.lbl_login_flow_email.config(text="") - self._set_dot(self.login_flow_dot, self._login_flow_dot_id, "orange") - self._set_text(self.txt_login_flow, "") - - def _login_flow_set_buttons(self, *, can_open: bool, can_check: bool, can_cancel: bool) -> None: - def set_state(btn: ttk.Button, enabled: bool) -> None: - try: - btn.config(state=("normal" if enabled else "disabled")) - except Exception: - pass - - set_state(self.btn_login_open, can_open) - set_state(self.btn_login_copy, bool(self.var_login_url.get().strip())) - set_state(self.btn_login_check, can_check) - set_state(self.btn_login_close, can_cancel) - - # stop — страховка, но если уже success/already_logged, можно тоже выключить не обязательно - try: - self.btn_login_stop.config(state="normal") - except Exception: - pass - - def _login_flow_autopoll_start(self) -> None: - self._login_flow_active = True - self._login_poll_tick() - - def _login_flow_autopoll_stop(self) -> None: - self._login_flow_active = False - if self._login_poll_after_id is not None: - try: - self.after_cancel(self._login_poll_after_id) - except Exception: - pass - self._login_poll_after_id = None - - def _login_poll_tick(self) -> None: - if not self._login_flow_active: - return - - def work(): - view = self.ctrl.login_flow_poll(self._login_cursor) - self._login_cursor = int(view.cursor) - - # indicator + status - self._set_dot(self.login_flow_dot, self._login_flow_dot_id, view.dot_color) - self.lbl_login_flow_status.config(text=f"Status: {view.status_text or '—'}") - self.lbl_login_flow_email.config(text=(f"User: {view.email}" if view.email else "")) - - if view.url: - self.var_login_url.set(view.url) - - # buttons - self._login_flow_set_buttons(can_open=view.can_open, can_check=view.can_check, can_cancel=view.can_cancel) - - # append cleaned lines - cleaned = self._clean_ui_lines(view.lines) - if cleaned: - self._append_text(self.txt_login_flow, cleaned + "\n") - - # auto-open browser once when url appears - if (not self._login_url_opened) and view.url: - self._login_url_opened = True - try: - subprocess.Popen(["xdg-open", view.url]) - except Exception: - pass - - phase = (view.phase or "").strip().lower() - if (not view.alive) or phase in ("success", "failed", "cancelled", "already_logged"): - # Авто-обновляем баннер при успехе/уже залогинен - if phase in ("success", "already_logged"): - self.after(250, self.refresh_login_banner) - # и возвращаемся на main страницу VPN, чтобы UX был как у тебя на примере - self.after(500, lambda: self._show_vpn_page("main")) - - # на терминале — стопаем polling - self._login_flow_autopoll_stop() - - # в терминале делаем кнопки логина неактивными (как в твоём "идеальном" окне) - self._login_flow_set_buttons(can_open=False, can_check=False, can_cancel=False) - try: - self.btn_login_stop.config(state="disabled") - except Exception: - pass - - self._safe(work, title="Login flow error") - - if self._login_flow_active: - self._login_poll_after_id = self.after(250, self._login_poll_tick) - - # ---------------- TOP AUTH BUTTON ---------------- - - def on_auth_button(self) -> None: - # decide based on current banner - def work(): - view = self.ctrl.get_login_view() - if bool(view.logged_in): - self.on_logout() - else: - self.on_start_login() - - self._safe(work, title="Auth error") - - # ---------------- ACTIONS ---------------- - - def on_start_login(self) -> None: - def work(): - self.ctrl.log_gui("Top Login clicked") - self._login_flow_reset_ui() - - start = self.ctrl.login_flow_start() - - # reflect start info - self._set_dot(self.login_flow_dot, self._login_flow_dot_id, start.dot_color) - self.lbl_login_flow_status.config(text=f"Status: {start.status_text or '—'}") - self.lbl_login_flow_email.config(text=(f"User: {start.email}" if start.email else "")) - - if start.url: - self.var_login_url.set(start.url) - - cleaned = self._clean_ui_lines(start.lines) - if cleaned: - self._append_text(self.txt_login_flow, cleaned + "\n") - - # already logged: banner update and stop - phase = (start.phase or "").strip().lower() - if phase == "already_logged": - self.refresh_login_banner() - messagebox.showinfo("Login", f"Already logged in{f' as {start.email}' if start.email else ''}.") - return - - # show login page and start polling - self._show_vpn_page("login") - self._login_cursor = int(start.cursor or 0) - self._login_flow_set_buttons(can_open=start.can_open, can_check=start.can_check, can_cancel=start.can_cancel) - self._login_flow_autopoll_start() - - self._safe(work, title="Login start error") - - def on_login_back(self) -> None: - self._login_flow_autopoll_stop() - self._show_vpn_page("main") - self.refresh_login_banner() - - def on_login_copy(self) -> None: - u = self.var_login_url.get().strip() - if not u: - return - try: - self.master.clipboard_clear() - self.master.clipboard_append(u) - except Exception: - pass - - def on_login_open(self) -> None: - def work(): - self.ctrl.login_flow_action("open") - u = self.var_login_url.get().strip() - if u: - try: - subprocess.Popen(["xdg-open", u]) - except Exception: - pass - self.ctrl.log_gui("Login flow: open") - self._safe(work, title="Login open error") - - def on_login_check(self) -> None: - def work(): - self.ctrl.login_flow_action("check") - self.ctrl.log_gui("Login flow: check") - self._safe(work, title="Login check error") - - def on_login_cancel(self) -> None: - def work(): - self.ctrl.login_flow_action("cancel") - self.ctrl.log_gui("Login flow: cancel") - self._safe(work, title="Login cancel error") - - def on_login_stop(self) -> None: - def work(): - self.ctrl.login_flow_stop() - self.ctrl.log_gui("Login flow: stop") - self._login_flow_autopoll_stop() - self.after(250, self.refresh_login_banner) - self._safe(work, title="Login stop error") - - def on_logout(self) -> None: - def work(): - if not messagebox.askyesno("Logout", "Logout from AdGuard VPN account?"): - return - res = self.ctrl.vpn_logout() - self.ctrl.log_gui("VPN logout executed") - messagebox.showinfo("Logout", res.pretty_text.strip() or "Done.") - self.refresh_login_banner() - self.refresh_vpn_tab() - self._safe(work, title="Logout error") - - def on_toggle_autoconnect(self) -> None: - def work(): - enable = bool(self.var_autoconnect.get()) - res = self.ctrl.vpn_set_autoconnect(enable) - self._set_text(self.txt_vpn, res.pretty_text) - self.ctrl.log_gui(f"Auto-connect set to {enable}") - self._safe(work, title="Auto-connect error") - - def on_set_location(self) -> None: - def work(): - val = self.cmb_location.get().strip() - if not val: - messagebox.showinfo("Location", "Choose a location first.") - return - iso = val.split("—", 1)[0].strip() - res = self.ctrl.vpn_set_location(iso) - self._set_text(self.txt_vpn, res.pretty_text) - self.ctrl.log_gui(f"Location set to {iso}") - self.refresh_vpn_tab() - self._safe(work, title="Set location error") - - def on_routes_action(self, action: str) -> None: - def work(): - res = self.ctrl.routes_service_action(action) - self._set_text(self.txt_routes, res.pretty_text) - self.ctrl.log_gui(f"Routes service: {action}") - self.refresh_status_tab() - self._safe(work, title="Routes service error") - - def on_routes_clear(self) -> None: - def work(): - res = self.ctrl.routes_clear() - self._set_text(self.txt_routes, res.pretty_text) - self.ctrl.log_gui("Routes cleared") - self.refresh_status_tab() - self._safe(work, title="Clear routes error") - - def on_toggle_timer(self) -> None: - def work(): - enabled = bool(self.var_timer.get()) - res = self.ctrl.routes_timer_set(enabled) - self._set_text(self.txt_routes, res.pretty_text) - self.ctrl.log_gui(f"Routes timer set to {enabled}") - self.refresh_status_tab() - self._safe(work, title="Timer error") - - def on_fix_policy_route(self) -> None: - def work(): - res = self.ctrl.routes_fix_policy_route() - self._set_text(self.txt_routes, res.pretty_text) - self.ctrl.log_gui("Policy route fix executed") - self.refresh_status_tab() - self._safe(work, title="Fix policy route error") - - def on_save_upstreams(self) -> None: - def work(): - cfg = DnsUpstreams( - default1=self.ent_def1.get().strip(), - default2=self.ent_def2.get().strip(), - meta1=self.ent_meta1.get().strip(), - meta2=self.ent_meta2.get().strip(), - ) - self.ctrl.dns_upstreams_save(cfg) - self.ctrl.log_gui("DNS upstreams saved") - messagebox.showinfo("DNS", "Saved.") - self.refresh_dns_tab() - self._safe(work, title="Save upstreams error") - - def on_smartdns_action(self, action: str) -> None: - def work(): - _res = self.ctrl.smartdns_service_action(action) - self.ctrl.log_gui(f"SmartDNS action: {action}") - self.refresh_dns_tab() - self.refresh_trace_tab() - self._safe(work, title="SmartDNS error") - - def on_save_wildcards(self) -> None: - def work(): - raw = self.txt_wildcards.get("1.0", "end") - domains = [x.strip() for x in raw.splitlines() if x.strip()] - self.ctrl.smartdns_wildcards_save(domains) - self.ctrl.log_gui(f"Wildcards saved: {len(domains)}") - messagebox.showinfo("SmartDNS", "Wildcards saved.") - self.refresh_dns_tab() - self._safe(work, title="Save wildcards error") - - def on_domains_load(self) -> None: - def work(): - name = self._get_selected_domains_file() - f = self.ctrl.domains_file_load(name) - content = f.content or "" - source = getattr(f, "source", "") or "file" - if not content: - path = f"/etc/selective-vpn/domains/{name}.txt" - content = self._read_local_file(path) - if content: - source = f"{source}+fallback" if source else "fallback" - self._set_text(self.txt_domains, content) - self.lbl_domains_info.config(text=f"{name} (source: {source})") - self.ctrl.log_gui(f"Domains file loaded: {name} source={source}") - self._safe(work, title="Load domains file error") - - def on_domains_save(self) -> None: - def work(): - name = self._get_selected_domains_file() - content = self.txt_domains.get("1.0", "end") - self.ctrl.domains_file_save(name, content) - self.ctrl.log_gui(f"Domains file saved: {name}") - messagebox.showinfo("Domains", "Saved.") - self.refresh_status_tab() - self._safe(work, title="Save domains file error") - - def on_load_agvpn_table(self) -> None: - path = "/var/lib/selective-vpn/last-ips-map.txt" - data = self._read_local_file(path) - self._set_text(self.txt_domains, data or "(empty)") - self.lbl_domains_info.config(text=f"AGVPN table: {path}") - - def on_load_smartdns_table(self) -> None: - path = "/etc/selective-vpn/smartdns.conf" - data = self._read_local_file(path) - self._set_text(self.txt_domains, data or "(empty)") - self.lbl_domains_info.config(text=f"SmartDNS table: {path}") - - # ---------------- CLOSE HANDLER ---------------- - - def _on_close(self) -> None: - def work(): - if self._login_flow_active: - try: - self.ctrl.login_flow_action("cancel") - except Exception: - pass - try: - self.ctrl.login_flow_stop() - except Exception: - pass - self._login_flow_autopoll_stop() - try: - work() - finally: - self.master.destroy() - - -def main() -> int: - client = ApiClient.from_env() - ctrl = DashboardController(client) - - root = tk.Tk() - try: - root.minsize(900, 650) - except Exception: - pass - - try: - style = ttk.Style() - if "clam" in style.theme_names(): - style.theme_use("clam") - except Exception: - pass - - _app = App(root, ctrl) - root.mainloop() - return 0 - - -if __name__ == "__main__": - raise SystemExit(main())