resolver: prioritize live batch and cap NX-heavy share

This commit is contained in:
beckline
2026-02-25 10:53:41 +03:00
parent 4b1a189152
commit 74f22d2d41

View File

@@ -215,6 +215,11 @@ type resolverLiveBatchStats struct {
Target int Target int
Total int Total int
Deferred int Deferred int
P1 int
P2 int
P3 int
NXHeavyTotal int
NXHeavySkip int
NextTarget int NextTarget int
NextReason string NextReason string
DNSAttempts int DNSAttempts int
@@ -386,9 +391,9 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
} }
precheckStatePath := opts.CachePath + ".precheck.json" precheckStatePath := opts.CachePath + ".precheck.json"
precheckLastRun := loadResolverPrecheckLastRun(precheckStatePath) precheckLastRun := loadResolverPrecheckLastRun(precheckStatePath)
liveBatchMin := envInt("RESOLVE_LIVE_BATCH_MIN", 1200) liveBatchMin := envInt("RESOLVE_LIVE_BATCH_MIN", 800)
liveBatchMax := envInt("RESOLVE_LIVE_BATCH_MAX", 5000) liveBatchMax := envInt("RESOLVE_LIVE_BATCH_MAX", 3000)
liveBatchDefault := envInt("RESOLVE_LIVE_BATCH_DEFAULT", 3000) liveBatchDefault := envInt("RESOLVE_LIVE_BATCH_DEFAULT", 1800)
if liveBatchMin < 200 { if liveBatchMin < 200 {
liveBatchMin = 200 liveBatchMin = 200
} }
@@ -408,6 +413,13 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
liveBatchDefault = liveBatchMax liveBatchDefault = liveBatchMax
} }
liveBatchTarget := loadResolverLiveBatchTarget(precheckStatePath, liveBatchDefault, liveBatchMin, liveBatchMax) liveBatchTarget := loadResolverLiveBatchTarget(precheckStatePath, liveBatchDefault, liveBatchMin, liveBatchMax)
liveBatchNXHeavyPct := envInt("RESOLVE_LIVE_BATCH_NX_HEAVY_PCT", 10)
if liveBatchNXHeavyPct < 0 {
liveBatchNXHeavyPct = 0
}
if liveBatchNXHeavyPct > 100 {
liveBatchNXHeavyPct = 100
}
precheckEnvForced := resolvePrecheckForceEnvEnabled() precheckEnvForced := resolvePrecheckForceEnvEnabled()
precheckFileForced := resolvePrecheckForceFileEnabled(precheckForcePath) precheckFileForced := resolvePrecheckForceFileEnabled(precheckForcePath)
precheckDue := precheckEnvForced || precheckFileForced || (precheckEverySec > 0 && (precheckLastRun <= 0 || now-precheckLastRun >= precheckEverySec)) precheckDue := precheckEnvForced || precheckFileForced || (precheckEverySec > 0 && (precheckLastRun <= 0 || now-precheckLastRun >= precheckEverySec))
@@ -472,7 +484,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
wildcardPolicy := wildcardDNSAttemptPolicy(1) wildcardPolicy := wildcardDNSAttemptPolicy(1)
cEnabled, cMin, cRate, cStreak, cBan, cMaxBan := cooldown.configSnapshot() cEnabled, cMin, cRate, cStreak, cBan, cMaxBan := cooldown.configSnapshot()
logf( logf(
"resolver policy: direct_try=%d direct_budget_ms=%d wildcard_try=%d wildcard_budget_ms=%d nx_early_stop=%t nx_hard_quarantine=%t cooldown_enabled=%t cooldown_min_attempts=%d cooldown_timeout_rate=%d cooldown_fail_streak=%d cooldown_ban_sec=%d cooldown_max_ban_sec=%d live_batch_target=%d live_batch_min=%d live_batch_max=%d stale_keep_sec=%d precheck_every_sec=%d precheck_max=%d precheck_forced_env=%t precheck_forced_file=%t", "resolver policy: direct_try=%d direct_budget_ms=%d wildcard_try=%d wildcard_budget_ms=%d nx_early_stop=%t nx_hard_quarantine=%t cooldown_enabled=%t cooldown_min_attempts=%d cooldown_timeout_rate=%d cooldown_fail_streak=%d cooldown_ban_sec=%d cooldown_max_ban_sec=%d live_batch_target=%d live_batch_min=%d live_batch_max=%d live_batch_nx_heavy_pct=%d stale_keep_sec=%d precheck_every_sec=%d precheck_max=%d precheck_forced_env=%t precheck_forced_file=%t",
directPolicy.TryLimit, directPolicy.TryLimit,
directPolicy.DomainBudget.Milliseconds(), directPolicy.DomainBudget.Milliseconds(),
wildcardPolicy.TryLimit, wildcardPolicy.TryLimit,
@@ -488,6 +500,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
liveBatchTarget, liveBatchTarget,
liveBatchMin, liveBatchMin,
liveBatchMax, liveBatchMax,
liveBatchNXHeavyPct,
staleKeepSec, staleKeepSec,
precheckEverySec, precheckEverySec,
precheckMaxDomains, precheckMaxDomains,
@@ -582,25 +595,27 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
} }
toResolveTotal := len(toResolve) toResolveTotal := len(toResolve)
liveDeferred := 0 liveDeferred := 0
if liveBatchTarget > 0 && len(toResolve) > liveBatchTarget { liveP1 := 0
startIdx := 0 liveP2 := 0
if len(toResolve) > 0 { liveP3 := 0
startIdx = now % len(toResolve) liveNXHeavyTotal := 0
if startIdx < 0 { liveNXHeavySkip := 0
startIdx = 0 toResolve, liveP1, liveP2, liveP3, liveNXHeavyTotal, liveNXHeavySkip = pickAdaptiveLiveBatch(
} toResolve,
} liveBatchTarget,
limited := make([]string, 0, liveBatchTarget) now,
for i := 0; i < liveBatchTarget; i++ { liveBatchNXHeavyPct,
idx := (startIdx + i) % len(toResolve) domainCache,
limited = append(limited, toResolve[idx]) cacheSourceForHost,
} wildcards,
liveDeferred = len(toResolve) - len(limited) )
toResolve = limited liveDeferred = toResolveTotal - len(toResolve)
if liveDeferred < 0 {
liveDeferred = 0
} }
if logf != nil { if logf != nil {
logf("resolve: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d precheck_due=%t precheck_scheduled=%d to_resolve=%d to_resolve_total=%d deferred_by_live_batch=%d", len(domains), len(fresh), cacheNegativeHits, quarantineHits, staleHits, precheckDue, precheckScheduled, len(toResolve), toResolveTotal, liveDeferred) logf("resolve: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d precheck_due=%t precheck_scheduled=%d to_resolve=%d to_resolve_total=%d deferred_by_live_batch=%d live_p1=%d live_p2=%d live_p3=%d live_nxheavy_total=%d live_nxheavy_skip=%d", len(domains), len(fresh), cacheNegativeHits, quarantineHits, staleHits, precheckDue, precheckScheduled, len(toResolve), toResolveTotal, liveDeferred, liveP1, liveP2, liveP3, liveNXHeavyTotal, liveNXHeavySkip)
} }
dnsStats := dnsMetrics{} dnsStats := dnsMetrics{}
@@ -768,7 +783,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
dnsErrors := dnsStats.totalErrors() dnsErrors := dnsStats.totalErrors()
unresolvedSuppressed := cacheNegativeHits + quarantineHits + liveDeferred unresolvedSuppressed := cacheNegativeHits + quarantineHits + liveDeferred
logf( logf(
"resolve summary: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d resolved_now=%d unresolved=%d unresolved_live=%d unresolved_suppressed=%d live_batch_target=%d live_batch_deferred=%d static_entries=%d static_skipped=%d unique_ips=%d direct_ips=%d wildcard_ips=%d ptr_lookups=%d ptr_errors=%d dns_attempts=%d dns_ok=%d dns_nxdomain=%d dns_timeout=%d dns_temporary=%d dns_other=%d dns_cooldown_skips=%d dns_errors=%d timeout_recheck_checked=%d timeout_recheck_recovered=%d timeout_recheck_recovered_ips=%d timeout_recheck_still_timeout=%d timeout_recheck_now_nxdomain=%d timeout_recheck_now_temporary=%d timeout_recheck_now_other=%d timeout_recheck_no_signal=%d duration_ms=%d", "resolve summary: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d resolved_now=%d unresolved=%d unresolved_live=%d unresolved_suppressed=%d live_batch_target=%d live_batch_deferred=%d live_batch_p1=%d live_batch_p2=%d live_batch_p3=%d live_batch_nxheavy_total=%d live_batch_nxheavy_skip=%d static_entries=%d static_skipped=%d unique_ips=%d direct_ips=%d wildcard_ips=%d ptr_lookups=%d ptr_errors=%d dns_attempts=%d dns_ok=%d dns_nxdomain=%d dns_timeout=%d dns_temporary=%d dns_other=%d dns_cooldown_skips=%d dns_errors=%d timeout_recheck_checked=%d timeout_recheck_recovered=%d timeout_recheck_recovered_ips=%d timeout_recheck_still_timeout=%d timeout_recheck_now_nxdomain=%d timeout_recheck_now_temporary=%d timeout_recheck_now_other=%d timeout_recheck_no_signal=%d duration_ms=%d",
len(domains), len(domains),
len(fresh), len(fresh),
cacheNegativeHits, cacheNegativeHits,
@@ -780,6 +795,11 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
unresolvedSuppressed, unresolvedSuppressed,
liveBatchTarget, liveBatchTarget,
liveDeferred, liveDeferred,
liveP1,
liveP2,
liveP3,
liveNXHeavyTotal,
liveNXHeavySkip,
len(staticEntries), len(staticEntries),
staticSkipped, staticSkipped,
len(res.IPs), len(res.IPs),
@@ -838,6 +858,11 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
Target: liveBatchTarget, Target: liveBatchTarget,
Total: toResolveTotal, Total: toResolveTotal,
Deferred: liveDeferred, Deferred: liveDeferred,
P1: liveP1,
P2: liveP2,
P3: liveP3,
NXHeavyTotal: liveNXHeavyTotal,
NXHeavySkip: liveNXHeavySkip,
NextTarget: nextTarget, NextTarget: nextTarget,
NextReason: nextReason, NextReason: nextReason,
DNSAttempts: dnsStats.Attempts, DNSAttempts: dnsStats.Attempts,
@@ -2492,6 +2517,165 @@ func computeNextLiveBatchTarget(current, minV, maxV int, dnsStats dnsMetrics, de
return next, reason return next, reason
} }
// classifyLiveBatchHost assigns a live-batch priority to host and reports
// whether the host counts as "NX-heavy".
//
// The host is lower-cased and trimmed before any lookup. Priorities:
//
//	1 - the host is an exact wildcard entry, has cached IPs, or its cached
//	    state is active, stable or suspect;
//	3 - the host is NX-heavy: its last recorded error is NXDOMAIN and it is
//	    quarantined, hard-quarantined, or has a score of -10 or lower;
//	2 - everything else, including empty hosts and hosts without a cache
//	    entry for their source.
//
// nxHeavy is true only when priority 3 is returned.
func classifyLiveBatchHost(
	host string,
	cache domainCacheState,
	cacheSourceForHost func(string) domainCacheSource,
	wildcards wildcardMatcher,
) (priority int, nxHeavy bool) {
	const (
		prioPreferred = 1
		prioNormal    = 2
		prioNXHeavy   = 3
	)

	name := strings.ToLower(host)
	name = strings.TrimSpace(name)
	if len(name) == 0 {
		return prioNormal, false
	}
	if _, listed := wildcards.exact[name]; listed {
		return prioPreferred, false
	}

	src := cacheSourceForHost(name)
	record, found := cache.Domains[name]
	if !found {
		return prioNormal, false
	}
	cached := getCacheEntryBySource(record, src)
	if cached == nil {
		return prioNormal, false
	}

	ips := normalizeCacheIPs(cached.IPs)
	st := normalizeDomainState(cached.State, cached.Score)
	kind, known := normalizeCacheErrorKind(cached.LastErrorKind)

	penalized := st == domainStateQuarantine || st == domainStateHardQuar || cached.Score <= -10
	nxHeavy = known && kind == dnsErrorNXDomain && penalized

	// Evidence that the host is healthy wins over the NX-heavy signal.
	if len(ips) > 0 {
		return prioPreferred, false
	}
	if st == domainStateActive || st == domainStateStable || st == domainStateSuspect {
		return prioPreferred, false
	}
	if nxHeavy {
		return prioNXHeavy, true
	}
	return prioNormal, false
}
// splitLiveBatchCandidates partitions candidates into three priority buckets
// using classifyLiveBatchHost, preserving the input order inside each bucket.
//
// Hosts are lower-cased and trimmed before classification, and the normalized
// form is what ends up in the buckets; hosts that are empty after
// normalization are dropped. p1 receives priority-1 hosts, p3 receives
// priority-3 hosts (plus any host with an unexpected priority that is flagged
// NX-heavy), and p2 receives the rest. nxHeavyTotal counts the hosts placed
// in p3.
func splitLiveBatchCandidates(
	candidates []string,
	cache domainCacheState,
	cacheSourceForHost func(string) domainCacheSource,
	wildcards wildcardMatcher,
) (p1, p2, p3 []string, nxHeavyTotal int) {
	for i := range candidates {
		name := strings.TrimSpace(strings.ToLower(candidates[i]))
		if name == "" {
			continue
		}
		level, heavy := classifyLiveBatchHost(name, cache, cacheSourceForHost, wildcards)
		switch {
		case level == 1:
			p1 = append(p1, name)
		case level == 3 || (level != 2 && heavy):
			// Unknown priorities fall back to the NX-heavy flag.
			p3 = append(p3, name)
			nxHeavyTotal++
		default:
			p2 = append(p2, name)
		}
	}
	return p1, p2, p3, nxHeavyTotal
}
// pickAdaptiveLiveBatch chooses which candidates are resolved in this run.
//
// Parameters:
//   - candidates: hosts waiting to be resolved.
//   - target: maximum batch size; values <= 0 disable limiting.
//   - now: used only to derive the rotation offset (now % len(candidates));
//     a negative remainder is treated as offset 0.
//   - nxHeavyPct: share of target (0..100, clamped) that NX-heavy hosts may
//     occupy.
//   - cache, cacheSourceForHost, wildcards: forwarded to
//     splitLiveBatchCandidates for classification.
//
// Returns, in order: the selected hosts, how many were taken from the
// priority-1, priority-2 and priority-3 (NX-heavy) buckets, the total number
// of NX-heavy candidates, and how many NX-heavy candidates were left out.
//
// With limiting disabled, a copy of candidates is returned unchanged and the
// three counts are the bucket sizes. Otherwise the candidates are rotated,
// split into buckets, and the batch is filled with priority 1 first, then
// priority 2, then NX-heavy hosts up to target*nxHeavyPct/100.
//
// NX-heavy cap rules:
//   - a non-zero percentage always allows at least one NX-heavy host;
//   - if nothing was selected from the other buckets, at least one NX-heavy
//     host is allowed even when the percentage is zero, so a run with
//     NX-heavy candidates never returns an empty batch;
//   - the cap never exceeds the room left in the batch.
//
// An all-NX-heavy candidate set is still limited by the cap; it is not
// filled up to target.
//
// NOTE(review): in the limited path the returned hosts are the normalized
// (lower-cased, trimmed) forms produced by splitLiveBatchCandidates, while
// the unlimited path returns the caller's strings as-is — confirm callers do
// not depend on the exact spelling.
func pickAdaptiveLiveBatch(
	candidates []string,
	target int,
	now int,
	nxHeavyPct int,
	cache domainCacheState,
	cacheSourceForHost func(string) domainCacheSource,
	wildcards wildcardMatcher,
) ([]string, int, int, int, int, int) {
	if len(candidates) == 0 {
		return nil, 0, 0, 0, 0, 0
	}
	if target <= 0 {
		p1, p2, p3, nxTotal := splitLiveBatchCandidates(candidates, cache, cacheSourceForHost, wildcards)
		return append([]string(nil), candidates...), len(p1), len(p2), len(p3), nxTotal, 0
	}
	if target > len(candidates) {
		target = len(candidates)
	}
	if nxHeavyPct < 0 {
		nxHeavyPct = 0
	}
	if nxHeavyPct > 100 {
		nxHeavyPct = 100
	}

	// Go's % keeps the sign of the dividend, so guard against a negative now.
	start := now % len(candidates)
	if start < 0 {
		start = 0
	}
	// Rotate into a fresh slice; the caller's slice is never modified.
	rotated := make([]string, 0, len(candidates))
	rotated = append(rotated, candidates[start:]...)
	rotated = append(rotated, candidates[:start]...)

	p1, p2, p3, nxTotal := splitLiveBatchCandidates(rotated, cache, cacheSourceForHost, wildcards)

	out := make([]string, 0, target)
	// take appends up to limit hosts from src to out and reports how many
	// were appended.
	take := func(src []string, limit int) int {
		if limit > len(src) {
			limit = len(src)
		}
		if limit <= 0 {
			return 0
		}
		out = append(out, src[:limit]...)
		return limit
	}

	selectedP1 := take(p1, target)
	selectedP2 := take(p2, target-len(out))
	remain := target - len(out)

	p3Cap := (target * nxHeavyPct) / 100
	if p3Cap == 0 && (nxHeavyPct > 0 || (len(out) == 0 && len(p3) > 0)) {
		// Floor of one: either the percentage rounded down to zero, or the
		// batch would otherwise be empty although NX-heavy hosts are waiting.
		p3Cap = 1
	}
	if p3Cap > remain {
		p3Cap = remain
	}
	selectedP3 := take(p3, p3Cap)

	nxSkipped := nxTotal - selectedP3
	if nxSkipped < 0 {
		nxSkipped = 0
	}
	return out, selectedP1, selectedP2, selectedP3, nxTotal, nxSkipped
}
func saveResolverPrecheckState(path string, ts int, timeoutStats resolverTimeoutRecheckStats, live resolverLiveBatchStats) { func saveResolverPrecheckState(path string, ts int, timeoutStats resolverTimeoutRecheckStats, live resolverLiveBatchStats) {
if path == "" || ts <= 0 { if path == "" || ts <= 0 {
return return
@@ -2514,6 +2698,11 @@ func saveResolverPrecheckState(path string, ts int, timeoutStats resolverTimeout
state["live_batch_target"] = live.Target state["live_batch_target"] = live.Target
state["live_batch_total"] = live.Total state["live_batch_total"] = live.Total
state["live_batch_deferred"] = live.Deferred state["live_batch_deferred"] = live.Deferred
state["live_batch_p1"] = live.P1
state["live_batch_p2"] = live.P2
state["live_batch_p3"] = live.P3
state["live_batch_nxheavy_total"] = live.NXHeavyTotal
state["live_batch_nxheavy_skip"] = live.NXHeavySkip
state["live_batch_next_target"] = live.NextTarget state["live_batch_next_target"] = live.NextTarget
state["live_batch_next_reason"] = live.NextReason state["live_batch_next_reason"] = live.NextReason
state["live_batch_dns_attempts"] = live.DNSAttempts state["live_batch_dns_attempts"] = live.DNSAttempts