diff --git a/selective-vpn-api/app/resolver.go b/selective-vpn-api/app/resolver.go
index a75de5e..3a0d3b3 100644
--- a/selective-vpn-api/app/resolver.go
+++ b/selective-vpn-api/app/resolver.go
@@ -215,6 +215,11 @@ type resolverLiveBatchStats struct {
 	Target int
 	Total int
 	Deferred int
+	P1 int
+	P2 int
+	P3 int
+	NXHeavyTotal int
+	NXHeavySkip int
 	NextTarget int
 	NextReason string
 	DNSAttempts int
@@ -386,9 +391,9 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
 	}
 	precheckStatePath := opts.CachePath + ".precheck.json"
 	precheckLastRun := loadResolverPrecheckLastRun(precheckStatePath)
-	liveBatchMin := envInt("RESOLVE_LIVE_BATCH_MIN", 1200)
-	liveBatchMax := envInt("RESOLVE_LIVE_BATCH_MAX", 5000)
-	liveBatchDefault := envInt("RESOLVE_LIVE_BATCH_DEFAULT", 3000)
+	liveBatchMin := envInt("RESOLVE_LIVE_BATCH_MIN", 800)
+	liveBatchMax := envInt("RESOLVE_LIVE_BATCH_MAX", 3000)
+	liveBatchDefault := envInt("RESOLVE_LIVE_BATCH_DEFAULT", 1800)
 	if liveBatchMin < 200 {
 		liveBatchMin = 200
 	}
@@ -408,6 +413,13 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
 		liveBatchDefault = liveBatchMax
 	}
 	liveBatchTarget := loadResolverLiveBatchTarget(precheckStatePath, liveBatchDefault, liveBatchMin, liveBatchMax)
+	liveBatchNXHeavyPct := envInt("RESOLVE_LIVE_BATCH_NX_HEAVY_PCT", 10)
+	if liveBatchNXHeavyPct < 0 {
+		liveBatchNXHeavyPct = 0
+	}
+	if liveBatchNXHeavyPct > 100 {
+		liveBatchNXHeavyPct = 100
+	}
 	precheckEnvForced := resolvePrecheckForceEnvEnabled()
 	precheckFileForced := resolvePrecheckForceFileEnabled(precheckForcePath)
 	precheckDue := precheckEnvForced || precheckFileForced || (precheckEverySec > 0 && (precheckLastRun <= 0 || now-precheckLastRun >= precheckEverySec))
@@ -472,7 +484,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
 	wildcardPolicy := wildcardDNSAttemptPolicy(1)
 	cEnabled, cMin, cRate, cStreak, cBan, cMaxBan := cooldown.configSnapshot()
 	logf(
-		"resolver policy: direct_try=%d direct_budget_ms=%d wildcard_try=%d wildcard_budget_ms=%d nx_early_stop=%t nx_hard_quarantine=%t cooldown_enabled=%t cooldown_min_attempts=%d cooldown_timeout_rate=%d cooldown_fail_streak=%d cooldown_ban_sec=%d cooldown_max_ban_sec=%d live_batch_target=%d live_batch_min=%d live_batch_max=%d stale_keep_sec=%d precheck_every_sec=%d precheck_max=%d precheck_forced_env=%t precheck_forced_file=%t",
+		"resolver policy: direct_try=%d direct_budget_ms=%d wildcard_try=%d wildcard_budget_ms=%d nx_early_stop=%t nx_hard_quarantine=%t cooldown_enabled=%t cooldown_min_attempts=%d cooldown_timeout_rate=%d cooldown_fail_streak=%d cooldown_ban_sec=%d cooldown_max_ban_sec=%d live_batch_target=%d live_batch_min=%d live_batch_max=%d live_batch_nx_heavy_pct=%d stale_keep_sec=%d precheck_every_sec=%d precheck_max=%d precheck_forced_env=%t precheck_forced_file=%t",
 		directPolicy.TryLimit,
 		directPolicy.DomainBudget.Milliseconds(),
 		wildcardPolicy.TryLimit,
@@ -488,6 +500,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
 		liveBatchTarget,
 		liveBatchMin,
 		liveBatchMax,
+		liveBatchNXHeavyPct,
 		staleKeepSec,
 		precheckEverySec,
 		precheckMaxDomains,
@@ -582,25 +595,27 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
 	}
 	toResolveTotal := len(toResolve)
 	liveDeferred := 0
-	if liveBatchTarget > 0 && len(toResolve) > liveBatchTarget {
-		startIdx := 0
-		if len(toResolve) > 0 {
-			startIdx = now % len(toResolve)
-			if startIdx < 0 {
-				startIdx = 0
-			}
-		}
-		limited := make([]string, 0, liveBatchTarget)
-		for i := 0; i < liveBatchTarget; i++ {
-			idx := (startIdx + i) % len(toResolve)
-			limited = append(limited, toResolve[idx])
-		}
-		liveDeferred = len(toResolve) - len(limited)
-		toResolve = limited
+	liveP1 := 0
+	liveP2 := 0
+	liveP3 := 0
+	liveNXHeavyTotal := 0
+	liveNXHeavySkip := 0
+	toResolve, liveP1, liveP2, liveP3, liveNXHeavyTotal, liveNXHeavySkip = pickAdaptiveLiveBatch(
+		toResolve,
+		liveBatchTarget,
+		now,
+		liveBatchNXHeavyPct,
+		domainCache,
+		cacheSourceForHost,
+		wildcards,
+	)
+	liveDeferred = toResolveTotal - len(toResolve)
+	if liveDeferred < 0 {
+		liveDeferred = 0
 	}
 
 	if logf != nil {
-		logf("resolve: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d precheck_due=%t precheck_scheduled=%d to_resolve=%d to_resolve_total=%d deferred_by_live_batch=%d", len(domains), len(fresh), cacheNegativeHits, quarantineHits, staleHits, precheckDue, precheckScheduled, len(toResolve), toResolveTotal, liveDeferred)
+		logf("resolve: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d precheck_due=%t precheck_scheduled=%d to_resolve=%d to_resolve_total=%d deferred_by_live_batch=%d live_p1=%d live_p2=%d live_p3=%d live_nxheavy_total=%d live_nxheavy_skip=%d", len(domains), len(fresh), cacheNegativeHits, quarantineHits, staleHits, precheckDue, precheckScheduled, len(toResolve), toResolveTotal, liveDeferred, liveP1, liveP2, liveP3, liveNXHeavyTotal, liveNXHeavySkip)
 	}
 
 	dnsStats := dnsMetrics{}
@@ -768,7 +783,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
 	dnsErrors := dnsStats.totalErrors()
 	unresolvedSuppressed := cacheNegativeHits + quarantineHits + liveDeferred
 	logf(
-		"resolve summary: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d resolved_now=%d unresolved=%d unresolved_live=%d unresolved_suppressed=%d live_batch_target=%d live_batch_deferred=%d static_entries=%d static_skipped=%d unique_ips=%d direct_ips=%d wildcard_ips=%d ptr_lookups=%d ptr_errors=%d dns_attempts=%d dns_ok=%d dns_nxdomain=%d dns_timeout=%d dns_temporary=%d dns_other=%d dns_cooldown_skips=%d dns_errors=%d timeout_recheck_checked=%d timeout_recheck_recovered=%d timeout_recheck_recovered_ips=%d timeout_recheck_still_timeout=%d timeout_recheck_now_nxdomain=%d timeout_recheck_now_temporary=%d timeout_recheck_now_other=%d timeout_recheck_no_signal=%d duration_ms=%d",
+		"resolve summary: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d resolved_now=%d unresolved=%d unresolved_live=%d unresolved_suppressed=%d live_batch_target=%d live_batch_deferred=%d live_batch_p1=%d live_batch_p2=%d live_batch_p3=%d live_batch_nxheavy_total=%d live_batch_nxheavy_skip=%d static_entries=%d static_skipped=%d unique_ips=%d direct_ips=%d wildcard_ips=%d ptr_lookups=%d ptr_errors=%d dns_attempts=%d dns_ok=%d dns_nxdomain=%d dns_timeout=%d dns_temporary=%d dns_other=%d dns_cooldown_skips=%d dns_errors=%d timeout_recheck_checked=%d timeout_recheck_recovered=%d timeout_recheck_recovered_ips=%d timeout_recheck_still_timeout=%d timeout_recheck_now_nxdomain=%d timeout_recheck_now_temporary=%d timeout_recheck_now_other=%d timeout_recheck_no_signal=%d duration_ms=%d",
 		len(domains),
 		len(fresh),
 		cacheNegativeHits,
@@ -780,6 +795,11 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
 		unresolvedSuppressed,
 		liveBatchTarget,
 		liveDeferred,
+		liveP1,
+		liveP2,
+		liveP3,
+		liveNXHeavyTotal,
+		liveNXHeavySkip,
 		len(staticEntries),
 		staticSkipped,
 		len(res.IPs),
@@ -838,6 +858,11 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
 		Target: liveBatchTarget,
 		Total: toResolveTotal,
 		Deferred: liveDeferred,
+		P1: liveP1,
+		P2: liveP2,
+		P3: liveP3,
+		NXHeavyTotal: liveNXHeavyTotal,
+		NXHeavySkip: liveNXHeavySkip,
 		NextTarget: nextTarget,
 		NextReason: nextReason,
 		DNSAttempts: dnsStats.Attempts,
@@ -2492,6 +2517,183 @@ func computeNextLiveBatchTarget(current, minV, maxV int, dnsStats dnsMetrics, de
 	return next, reason
 }
 
+// classifyLiveBatchHost returns the live-batch priority of host and whether
+// it counts as NX-heavy.
+//
+//   - 1: host is in wildcards.exact, has stored IPs, or its normalized state
+//     is active, stable or suspect.
+//   - 3 (nxHeavy == true): none of the above, the last error kind is NXDOMAIN
+//     and the entry is quarantined, hard-quarantined or has Score <= -10.
+//   - 2: everything else, including an empty host and a host without a cache
+//     entry for its source.
+func classifyLiveBatchHost(
+	host string,
+	cache domainCacheState,
+	cacheSourceForHost func(string) domainCacheSource,
+	wildcards wildcardMatcher,
+) (priority int, nxHeavy bool) {
+	h := strings.TrimSpace(strings.ToLower(host))
+	if h == "" {
+		return 2, false
+	}
+	if _, ok := wildcards.exact[h]; ok {
+		return 1, false
+	}
+	source := cacheSourceForHost(h)
+	rec, ok := cache.Domains[h]
+	if !ok {
+		return 2, false
+	}
+	entry := getCacheEntryBySource(rec, source)
+	if entry == nil {
+		return 2, false
+	}
+	stored := normalizeCacheIPs(entry.IPs)
+	state := normalizeDomainState(entry.State, entry.Score)
+	errKind, hasErr := normalizeCacheErrorKind(entry.LastErrorKind)
+	nxHeavy = hasErr && errKind == dnsErrorNXDomain && (state == domainStateQuarantine || state == domainStateHardQuar || entry.Score <= -10)
+
+	switch {
+	case len(stored) > 0:
+		return 1, false
+	case state == domainStateActive || state == domainStateStable || state == domainStateSuspect:
+		return 1, false
+	case nxHeavy:
+		return 3, true
+	default:
+		return 2, false
+	}
+}
+
+// splitLiveBatchCandidates partitions candidates into the three priority
+// buckets reported by classifyLiveBatchHost, keeping input order inside each
+// bucket. Hosts are trimmed and lower-cased; empty hosts are dropped.
+// nxHeavyTotal counts the hosts placed in p3.
+func splitLiveBatchCandidates(
+	candidates []string,
+	cache domainCacheState,
+	cacheSourceForHost func(string) domainCacheSource,
+	wildcards wildcardMatcher,
+) (p1, p2, p3 []string, nxHeavyTotal int) {
+	for _, host := range candidates {
+		h := strings.TrimSpace(strings.ToLower(host))
+		if h == "" {
+			continue
+		}
+		prio, nxHeavy := classifyLiveBatchHost(h, cache, cacheSourceForHost, wildcards)
+		switch prio {
+		case 1:
+			p1 = append(p1, h)
+		case 3:
+			nxHeavyTotal++
+			p3 = append(p3, h)
+		case 2:
+			p2 = append(p2, h)
+		default:
+			// Unknown priority: fall back on the nxHeavy flag alone.
+			if nxHeavy {
+				nxHeavyTotal++
+				p3 = append(p3, h)
+			} else {
+				p2 = append(p2, h)
+			}
+		}
+	}
+	return p1, p2, p3, nxHeavyTotal
+}
+
+// pickAdaptiveLiveBatch selects at most target hosts from candidates for live
+// resolution.
+//
+// Candidates are rotated to start at index now % len(candidates) and split by
+// priority. P1 hosts are taken first, then P2. NX-heavy P3 hosts only get what
+// is left, capped at nxHeavyPct percent of target (nxHeavyPct is clamped to
+// 0..100). The cap is raised to one host when nxHeavyPct is positive, or when
+// no P1/P2 host was selected, so a queue made up solely of NX-heavy hosts
+// still advances. Selected hosts are returned trimmed and lower-cased.
+//
+// A target <= 0 disables limiting: a copy of candidates is returned unchanged.
+//
+// Returns the selected hosts, the number taken from P1, P2 and P3, the total
+// number of NX-heavy candidates, and how many of those were left out. With
+// target <= 0 the three counts are the bucket sizes and the skip count is 0.
+func pickAdaptiveLiveBatch(
+	candidates []string,
+	target int,
+	now int,
+	nxHeavyPct int,
+	cache domainCacheState,
+	cacheSourceForHost func(string) domainCacheSource,
+	wildcards wildcardMatcher,
+) ([]string, int, int, int, int, int) {
+	if len(candidates) == 0 {
+		return nil, 0, 0, 0, 0, 0
+	}
+	if target <= 0 {
+		p1, p2, p3, nxTotal := splitLiveBatchCandidates(candidates, cache, cacheSourceForHost, wildcards)
+		return append([]string(nil), candidates...), len(p1), len(p2), len(p3), nxTotal, 0
+	}
+	if target > len(candidates) {
+		target = len(candidates)
+	}
+	if nxHeavyPct < 0 {
+		nxHeavyPct = 0
+	}
+	if nxHeavyPct > 100 {
+		nxHeavyPct = 100
+	}
+
+	start := now % len(candidates)
+	if start < 0 {
+		start = 0
+	}
+	rotated := make([]string, 0, len(candidates))
+	for i := 0; i < len(candidates); i++ {
+		idx := (start + i) % len(candidates)
+		rotated = append(rotated, candidates[idx])
+	}
+	p1, p2, p3, nxTotal := splitLiveBatchCandidates(rotated, cache, cacheSourceForHost, wildcards)
+	out := make([]string, 0, target)
+
+	// take appends up to n hosts from the front of src to out and returns
+	// how many were appended.
+	take := func(src []string, n int) int {
+		if n <= 0 || len(src) == 0 {
+			return 0
+		}
+		if n > len(src) {
+			n = len(src)
+		}
+		out = append(out, src[:n]...)
+		return n
+	}
+
+	selectedP1 := take(p1, target)
+	selectedP2 := take(p2, target-len(out))
+	remain := target - len(out)
+
+	p3Cap := (target * nxHeavyPct) / 100
+	if nxHeavyPct > 0 && p3Cap == 0 {
+		// A non-zero percentage always admits at least one NX-heavy host.
+		p3Cap = 1
+	}
+	if len(out) == 0 && len(p3) > 0 && p3Cap == 0 {
+		// Nothing but NX-heavy hosts available: take one to keep forward
+		// progress even when nxHeavyPct is 0.
+		p3Cap = 1
+	}
+	if p3Cap > remain {
+		p3Cap = remain
+	}
+	selectedP3 := take(p3, p3Cap)
+
+	nxSkipped := nxTotal - selectedP3
+	if nxSkipped < 0 {
+		nxSkipped = 0
+	}
+	return out, selectedP1, selectedP2, selectedP3, nxTotal, nxSkipped
+}
+
 func saveResolverPrecheckState(path string, ts int, timeoutStats resolverTimeoutRecheckStats, live resolverLiveBatchStats) {
 	if path == "" || ts <= 0 {
 		return
@@ -2514,6 +2716,11 @@ func saveResolverPrecheckState(path string, ts int, timeoutStats resolverTimeout
 	state["live_batch_target"] = live.Target
 	state["live_batch_total"] = live.Total
 	state["live_batch_deferred"] = live.Deferred
+	state["live_batch_p1"] = live.P1
+	state["live_batch_p2"] = live.P2
+	state["live_batch_p3"] = live.P3
+	state["live_batch_nxheavy_total"] = live.NXHeavyTotal
+	state["live_batch_nxheavy_skip"] = live.NXHeavySkip
 	state["live_batch_next_target"] = live.NextTarget
 	state["live_batch_next_reason"] = live.NextReason
 	state["live_batch_dns_attempts"] = live.DNSAttempts