dns ui: compact tab + benchmark dialog and api endpoint

This commit is contained in:
beckline
2026-02-22 14:40:40 +03:00
parent 0b28586f31
commit a7ec4fe801
7 changed files with 1089 additions and 50 deletions

View File

@@ -90,7 +90,11 @@ class TrafficModeStatus:
desired_mode: str
applied_mode: str
preferred_iface: str
advanced_active: bool
auto_local_bypass: bool
auto_local_active: bool
ingress_reply_bypass: bool
ingress_reply_active: bool
bypass_candidates: int
force_vpn_subnets: List[str]
force_vpn_uids: List[str]
@@ -105,6 +109,8 @@ class TrafficModeStatus:
iface_reason: str
rule_mark: bool
rule_full: bool
ingress_rule_present: bool
ingress_nft_active: bool
table_default: bool
probe_ok: bool
probe_message: str
@@ -221,6 +227,38 @@ class DnsUpstreams:
meta2: str
@dataclass(frozen=True)
class DNSBenchmarkUpstream:
addr: str
enabled: bool = True
@dataclass(frozen=True)
class DNSBenchmarkResult:
upstream: str
attempts: int
ok: int
fail: int
nxdomain: int
timeout: int
temporary: int
other: int
avg_ms: int
p95_ms: int
score: float
color: str
@dataclass(frozen=True)
class DNSBenchmarkResponse:
results: List[DNSBenchmarkResult]
domains_used: List[str]
timeout_ms: int
attempts_per_domain: int
recommended_default: List[str]
recommended_meta: List[str]
@dataclass(frozen=True)
class SmartdnsServiceState:
state: str
@@ -654,7 +692,11 @@ class ApiClient:
desired_mode=str(data.get("desired_mode") or data.get("mode") or "selective"),
applied_mode=str(data.get("applied_mode") or "direct"),
preferred_iface=str(data.get("preferred_iface") or ""),
advanced_active=bool(data.get("advanced_active", False)),
auto_local_bypass=bool(data.get("auto_local_bypass", True)),
auto_local_active=bool(data.get("auto_local_active", False)),
ingress_reply_bypass=bool(data.get("ingress_reply_bypass", False)),
ingress_reply_active=bool(data.get("ingress_reply_active", False)),
bypass_candidates=int(data.get("bypass_candidates", 0) or 0),
force_vpn_subnets=[str(x) for x in (data.get("force_vpn_subnets") or []) if str(x).strip()],
force_vpn_uids=[str(x) for x in (data.get("force_vpn_uids") or []) if str(x).strip()],
@@ -669,6 +711,8 @@ class ApiClient:
iface_reason=str(data.get("iface_reason") or ""),
rule_mark=bool(data.get("rule_mark", False)),
rule_full=bool(data.get("rule_full", False)),
ingress_rule_present=bool(data.get("ingress_rule_present", False)),
ingress_nft_active=bool(data.get("ingress_nft_active", False)),
table_default=bool(data.get("table_default", False)),
probe_ok=bool(data.get("probe_ok", False)),
probe_message=str(data.get("probe_message") or ""),
@@ -681,6 +725,7 @@ class ApiClient:
mode: str,
preferred_iface: Optional[str] = None,
auto_local_bypass: Optional[bool] = None,
ingress_reply_bypass: Optional[bool] = None,
force_vpn_subnets: Optional[List[str]] = None,
force_vpn_uids: Optional[List[str]] = None,
force_vpn_cgroups: Optional[List[str]] = None,
@@ -696,6 +741,8 @@ class ApiClient:
payload["preferred_iface"] = str(preferred_iface).strip()
if auto_local_bypass is not None:
payload["auto_local_bypass"] = bool(auto_local_bypass)
if ingress_reply_bypass is not None:
payload["ingress_reply_bypass"] = bool(ingress_reply_bypass)
if force_vpn_subnets is not None:
payload["force_vpn_subnets"] = [str(x) for x in force_vpn_subnets]
if force_vpn_uids is not None:
@@ -724,7 +771,11 @@ class ApiClient:
desired_mode=str(data.get("desired_mode") or data.get("mode") or m),
applied_mode=str(data.get("applied_mode") or "direct"),
preferred_iface=str(data.get("preferred_iface") or ""),
advanced_active=bool(data.get("advanced_active", False)),
auto_local_bypass=bool(data.get("auto_local_bypass", True)),
auto_local_active=bool(data.get("auto_local_active", False)),
ingress_reply_bypass=bool(data.get("ingress_reply_bypass", False)),
ingress_reply_active=bool(data.get("ingress_reply_active", False)),
bypass_candidates=int(data.get("bypass_candidates", 0) or 0),
force_vpn_subnets=[str(x) for x in (data.get("force_vpn_subnets") or []) if str(x).strip()],
force_vpn_uids=[str(x) for x in (data.get("force_vpn_uids") or []) if str(x).strip()],
@@ -739,6 +790,8 @@ class ApiClient:
iface_reason=str(data.get("iface_reason") or ""),
rule_mark=bool(data.get("rule_mark", False)),
rule_full=bool(data.get("rule_full", False)),
ingress_rule_present=bool(data.get("ingress_rule_present", False)),
ingress_nft_active=bool(data.get("ingress_nft_active", False)),
table_default=bool(data.get("table_default", False)),
probe_ok=bool(data.get("probe_ok", False)),
probe_message=str(data.get("probe_message") or ""),
@@ -756,7 +809,11 @@ class ApiClient:
desired_mode=str(data.get("desired_mode") or data.get("mode") or "selective"),
applied_mode=str(data.get("applied_mode") or "direct"),
preferred_iface=str(data.get("preferred_iface") or ""),
advanced_active=bool(data.get("advanced_active", False)),
auto_local_bypass=bool(data.get("auto_local_bypass", True)),
auto_local_active=bool(data.get("auto_local_active", False)),
ingress_reply_bypass=bool(data.get("ingress_reply_bypass", False)),
ingress_reply_active=bool(data.get("ingress_reply_active", False)),
bypass_candidates=int(data.get("bypass_candidates", 0) or 0),
force_vpn_subnets=[str(x) for x in (data.get("force_vpn_subnets") or []) if str(x).strip()],
force_vpn_uids=[str(x) for x in (data.get("force_vpn_uids") or []) if str(x).strip()],
@@ -771,6 +828,46 @@ class ApiClient:
iface_reason=str(data.get("iface_reason") or ""),
rule_mark=bool(data.get("rule_mark", False)),
rule_full=bool(data.get("rule_full", False)),
ingress_rule_present=bool(data.get("ingress_rule_present", False)),
ingress_nft_active=bool(data.get("ingress_nft_active", False)),
table_default=bool(data.get("table_default", False)),
probe_ok=bool(data.get("probe_ok", False)),
probe_message=str(data.get("probe_message") or ""),
healthy=bool(data.get("healthy", False)),
message=str(data.get("message") or ""),
)
def traffic_advanced_reset(self) -> TrafficModeStatus:
data = cast(
Dict[str, Any],
self._json(self._request("POST", "/api/v1/traffic/advanced/reset")) or {},
)
return TrafficModeStatus(
mode=str(data.get("mode") or "selective"),
desired_mode=str(data.get("desired_mode") or data.get("mode") or "selective"),
applied_mode=str(data.get("applied_mode") or "direct"),
preferred_iface=str(data.get("preferred_iface") or ""),
advanced_active=bool(data.get("advanced_active", False)),
auto_local_bypass=bool(data.get("auto_local_bypass", True)),
auto_local_active=bool(data.get("auto_local_active", False)),
ingress_reply_bypass=bool(data.get("ingress_reply_bypass", False)),
ingress_reply_active=bool(data.get("ingress_reply_active", False)),
bypass_candidates=int(data.get("bypass_candidates", 0) or 0),
force_vpn_subnets=[str(x) for x in (data.get("force_vpn_subnets") or []) if str(x).strip()],
force_vpn_uids=[str(x) for x in (data.get("force_vpn_uids") or []) if str(x).strip()],
force_vpn_cgroups=[str(x) for x in (data.get("force_vpn_cgroups") or []) if str(x).strip()],
force_direct_subnets=[str(x) for x in (data.get("force_direct_subnets") or []) if str(x).strip()],
force_direct_uids=[str(x) for x in (data.get("force_direct_uids") or []) if str(x).strip()],
force_direct_cgroups=[str(x) for x in (data.get("force_direct_cgroups") or []) if str(x).strip()],
overrides_applied=int(data.get("overrides_applied", 0) or 0),
cgroup_resolved_uids=int(data.get("cgroup_resolved_uids", 0) or 0),
cgroup_warning=str(data.get("cgroup_warning") or ""),
active_iface=str(data.get("active_iface") or ""),
iface_reason=str(data.get("iface_reason") or ""),
rule_mark=bool(data.get("rule_mark", False)),
rule_full=bool(data.get("rule_full", False)),
ingress_rule_present=bool(data.get("ingress_rule_present", False)),
ingress_nft_active=bool(data.get("ingress_nft_active", False)),
table_default=bool(data.get("table_default", False)),
probe_ok=bool(data.get("probe_ok", False)),
probe_message=str(data.get("probe_message") or ""),
@@ -1088,6 +1185,63 @@ class ApiClient:
},
)
def dns_benchmark(
self,
upstreams: List[DNSBenchmarkUpstream],
domains: List[str],
timeout_ms: int = 1800,
attempts: int = 1,
concurrency: int = 6,
) -> DNSBenchmarkResponse:
data = cast(
Dict[str, Any],
self._json(
self._request(
"POST",
"/api/v1/dns/benchmark",
json_body={
"upstreams": [{"addr": u.addr, "enabled": bool(u.enabled)} for u in (upstreams or [])],
"domains": [str(d or "").strip() for d in (domains or []) if str(d or "").strip()],
"timeout_ms": int(timeout_ms),
"attempts": int(attempts),
"concurrency": int(concurrency),
},
)
)
or {},
)
raw_results = data.get("results") or []
if not isinstance(raw_results, list):
raw_results = []
results: List[DNSBenchmarkResult] = []
for row in raw_results:
if not isinstance(row, dict):
continue
results.append(
DNSBenchmarkResult(
upstream=str(row.get("upstream") or "").strip(),
attempts=int(row.get("attempts", 0) or 0),
ok=int(row.get("ok", 0) or 0),
fail=int(row.get("fail", 0) or 0),
nxdomain=int(row.get("nxdomain", 0) or 0),
timeout=int(row.get("timeout", 0) or 0),
temporary=int(row.get("temporary", 0) or 0),
other=int(row.get("other", 0) or 0),
avg_ms=int(row.get("avg_ms", 0) or 0),
p95_ms=int(row.get("p95_ms", 0) or 0),
score=float(row.get("score", 0.0) or 0.0),
color=str(row.get("color") or "").strip().lower(),
)
)
return DNSBenchmarkResponse(
results=results,
domains_used=[str(d or "").strip() for d in (data.get("domains_used") or []) if str(d or "").strip()],
timeout_ms=int(data.get("timeout_ms", 0) or 0),
attempts_per_domain=int(data.get("attempts_per_domain", 0) or 0),
recommended_default=[str(d or "").strip() for d in (data.get("recommended_default") or []) if str(d or "").strip()],
recommended_meta=[str(d or "").strip() for d in (data.get("recommended_meta") or []) if str(d or "").strip()],
)
def dns_status_get(self) -> DNSStatus:
data = cast(Dict[str, Any], self._json(self._request("GET", "/api/v1/dns/status")) or {})
return self._parse_dns_status(data)