resolver: adaptive live batch sizing and cooldown-aware summary

This commit is contained in:
beckline
2026-02-25 10:24:02 +03:00
parent 29dde73f04
commit 4b1a189152
3 changed files with 165 additions and 9 deletions

View File

@@ -6,7 +6,7 @@
- [x] ~~3. Add stale-keep policy.~~ - [x] ~~3. Add stale-keep policy.~~
- [x] ~~4. Wire 24h precheck cycle (soft pruning only).~~ - [x] ~~4. Wire 24h precheck cycle (soft pruning only).~~
- [x] ~~5. Expose metrics/log clarity in API + GUI (API/trace done; DNS benchmark load-profile UI done; route badges done).~~ - [x] ~~5. Expose metrics/log clarity in API + GUI (API/trace done; DNS benchmark load-profile UI done; route badges done).~~
- [ ] 6. Tune thresholds with production data (pass-1 timeout-suspect; pass-2 NX hard-quarantine off; pass-3 DNS upstream cooldown in-run). - [ ] 6. Tune thresholds with production data (pass-1 timeout-suspect; pass-2 NX hard-quarantine off; pass-3 DNS upstream cooldown in-run; pass-4 adaptive live-batch 1200..5000).
## 1) Goal ## 1) Goal
- Stabilize resolver behavior under high domain volume. - Stabilize resolver behavior under high domain volume.

View File

@@ -211,6 +211,17 @@ type resolverTimeoutRecheckStats struct {
NoSignal int NoSignal int
} }
// resolverLiveBatchStats is the live-batch snapshot of one resolver run that
// saveResolverPrecheckState writes into the precheck state file under the
// "live_batch_*" keys.
type resolverLiveBatchStats struct {
	// Sizing of the run that just finished: the batch cap that was in
	// effect, how many domains needed resolving before the cap was applied,
	// and how many of those the cap left out of this run.
	Target, Total, Deferred int

	// Cap proposed for the next run, with the name of the rule that
	// produced it (see computeNextLiveBatchTarget).
	NextTarget int
	NextReason string

	// DNS counters copied from the run's dnsMetrics: total attempts,
	// timeouts, and lookups skipped because of upstream cooldown.
	DNSAttempts, DNSTimeout, DNSCoolSkips int
}
type dnsCooldownState struct { type dnsCooldownState struct {
Attempts int Attempts int
TimeoutLike int TimeoutLike int
@@ -375,6 +386,28 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
} }
precheckStatePath := opts.CachePath + ".precheck.json" precheckStatePath := opts.CachePath + ".precheck.json"
precheckLastRun := loadResolverPrecheckLastRun(precheckStatePath) precheckLastRun := loadResolverPrecheckLastRun(precheckStatePath)
liveBatchMin := envInt("RESOLVE_LIVE_BATCH_MIN", 1200)
liveBatchMax := envInt("RESOLVE_LIVE_BATCH_MAX", 5000)
liveBatchDefault := envInt("RESOLVE_LIVE_BATCH_DEFAULT", 3000)
if liveBatchMin < 200 {
liveBatchMin = 200
}
if liveBatchMin > 50000 {
liveBatchMin = 50000
}
if liveBatchMax < liveBatchMin {
liveBatchMax = liveBatchMin
}
if liveBatchMax > 50000 {
liveBatchMax = 50000
}
if liveBatchDefault < liveBatchMin {
liveBatchDefault = liveBatchMin
}
if liveBatchDefault > liveBatchMax {
liveBatchDefault = liveBatchMax
}
liveBatchTarget := loadResolverLiveBatchTarget(precheckStatePath, liveBatchDefault, liveBatchMin, liveBatchMax)
precheckEnvForced := resolvePrecheckForceEnvEnabled() precheckEnvForced := resolvePrecheckForceEnvEnabled()
precheckFileForced := resolvePrecheckForceFileEnabled(precheckForcePath) precheckFileForced := resolvePrecheckForceFileEnabled(precheckForcePath)
precheckDue := precheckEnvForced || precheckFileForced || (precheckEverySec > 0 && (precheckLastRun <= 0 || now-precheckLastRun >= precheckEverySec)) precheckDue := precheckEnvForced || precheckFileForced || (precheckEverySec > 0 && (precheckLastRun <= 0 || now-precheckLastRun >= precheckEverySec))
@@ -439,7 +472,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
wildcardPolicy := wildcardDNSAttemptPolicy(1) wildcardPolicy := wildcardDNSAttemptPolicy(1)
cEnabled, cMin, cRate, cStreak, cBan, cMaxBan := cooldown.configSnapshot() cEnabled, cMin, cRate, cStreak, cBan, cMaxBan := cooldown.configSnapshot()
logf( logf(
"resolver policy: direct_try=%d direct_budget_ms=%d wildcard_try=%d wildcard_budget_ms=%d nx_early_stop=%t nx_hard_quarantine=%t cooldown_enabled=%t cooldown_min_attempts=%d cooldown_timeout_rate=%d cooldown_fail_streak=%d cooldown_ban_sec=%d cooldown_max_ban_sec=%d stale_keep_sec=%d precheck_every_sec=%d precheck_max=%d precheck_forced_env=%t precheck_forced_file=%t", "resolver policy: direct_try=%d direct_budget_ms=%d wildcard_try=%d wildcard_budget_ms=%d nx_early_stop=%t nx_hard_quarantine=%t cooldown_enabled=%t cooldown_min_attempts=%d cooldown_timeout_rate=%d cooldown_fail_streak=%d cooldown_ban_sec=%d cooldown_max_ban_sec=%d live_batch_target=%d live_batch_min=%d live_batch_max=%d stale_keep_sec=%d precheck_every_sec=%d precheck_max=%d precheck_forced_env=%t precheck_forced_file=%t",
directPolicy.TryLimit, directPolicy.TryLimit,
directPolicy.DomainBudget.Milliseconds(), directPolicy.DomainBudget.Milliseconds(),
wildcardPolicy.TryLimit, wildcardPolicy.TryLimit,
@@ -452,6 +485,9 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
cStreak, cStreak,
cBan, cBan,
cMaxBan, cMaxBan,
liveBatchTarget,
liveBatchMin,
liveBatchMax,
staleKeepSec, staleKeepSec,
precheckEverySec, precheckEverySec,
precheckMaxDomains, precheckMaxDomains,
@@ -544,9 +580,27 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
for k, v := range fresh { for k, v := range fresh {
resolved[k] = v resolved[k] = v
} }
toResolveTotal := len(toResolve)
liveDeferred := 0
if liveBatchTarget > 0 && len(toResolve) > liveBatchTarget {
startIdx := 0
if len(toResolve) > 0 {
startIdx = now % len(toResolve)
if startIdx < 0 {
startIdx = 0
}
}
limited := make([]string, 0, liveBatchTarget)
for i := 0; i < liveBatchTarget; i++ {
idx := (startIdx + i) % len(toResolve)
limited = append(limited, toResolve[idx])
}
liveDeferred = len(toResolve) - len(limited)
toResolve = limited
}
if logf != nil { if logf != nil {
logf("resolve: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d precheck_due=%t precheck_scheduled=%d to_resolve=%d", len(domains), len(fresh), cacheNegativeHits, quarantineHits, staleHits, precheckDue, precheckScheduled, len(toResolve)) logf("resolve: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d precheck_due=%t precheck_scheduled=%d to_resolve=%d to_resolve_total=%d deferred_by_live_batch=%d", len(domains), len(fresh), cacheNegativeHits, quarantineHits, staleHits, precheckDue, precheckScheduled, len(toResolve), toResolveTotal, liveDeferred)
} }
dnsStats := dnsMetrics{} dnsStats := dnsMetrics{}
@@ -712,8 +766,9 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
if logf != nil { if logf != nil {
dnsErrors := dnsStats.totalErrors() dnsErrors := dnsStats.totalErrors()
unresolvedSuppressed := cacheNegativeHits + quarantineHits + liveDeferred
logf( logf(
"resolve summary: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d resolved_now=%d unresolved=%d unresolved_live=%d unresolved_suppressed=%d static_entries=%d static_skipped=%d unique_ips=%d direct_ips=%d wildcard_ips=%d ptr_lookups=%d ptr_errors=%d dns_attempts=%d dns_ok=%d dns_nxdomain=%d dns_timeout=%d dns_temporary=%d dns_other=%d dns_cooldown_skips=%d dns_errors=%d timeout_recheck_checked=%d timeout_recheck_recovered=%d timeout_recheck_recovered_ips=%d timeout_recheck_still_timeout=%d timeout_recheck_now_nxdomain=%d timeout_recheck_now_temporary=%d timeout_recheck_now_other=%d timeout_recheck_no_signal=%d duration_ms=%d", "resolve summary: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d resolved_now=%d unresolved=%d unresolved_live=%d unresolved_suppressed=%d live_batch_target=%d live_batch_deferred=%d static_entries=%d static_skipped=%d unique_ips=%d direct_ips=%d wildcard_ips=%d ptr_lookups=%d ptr_errors=%d dns_attempts=%d dns_ok=%d dns_nxdomain=%d dns_timeout=%d dns_temporary=%d dns_other=%d dns_cooldown_skips=%d dns_errors=%d timeout_recheck_checked=%d timeout_recheck_recovered=%d timeout_recheck_recovered_ips=%d timeout_recheck_still_timeout=%d timeout_recheck_now_nxdomain=%d timeout_recheck_now_temporary=%d timeout_recheck_now_other=%d timeout_recheck_no_signal=%d duration_ms=%d",
len(domains), len(domains),
len(fresh), len(fresh),
cacheNegativeHits, cacheNegativeHits,
@@ -722,7 +777,9 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
len(resolved)-len(fresh), len(resolved)-len(fresh),
len(domains)-len(resolved), len(domains)-len(resolved),
unresolvedAfterAttempts, unresolvedAfterAttempts,
cacheNegativeHits+quarantineHits, unresolvedSuppressed,
liveBatchTarget,
liveDeferred,
len(staticEntries), len(staticEntries),
staticSkipped, staticSkipped,
len(res.IPs), len(res.IPs),
@@ -758,12 +815,13 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
logf("resolve domain states: %s", stateSummary) logf("resolve domain states: %s", stateSummary)
} }
logf( logf(
"resolve breakdown: resolved_now_total=%d resolved_now_dns=%d resolved_now_stale=%d skipped_neg=%d skipped_quarantine=%d unresolved_after_attempts=%d", "resolve breakdown: resolved_now_total=%d resolved_now_dns=%d resolved_now_stale=%d skipped_neg=%d skipped_quarantine=%d deferred_live_batch=%d unresolved_after_attempts=%d",
len(resolved)-len(fresh), len(resolved)-len(fresh),
resolvedNowDNS, resolvedNowDNS,
resolvedNowStale, resolvedNowStale,
cacheNegativeHits, cacheNegativeHits,
quarantineHits, quarantineHits,
liveDeferred,
unresolvedAfterAttempts, unresolvedAfterAttempts,
) )
if precheckDue { if precheckDue {
@@ -771,7 +829,22 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
} }
} }
if precheckDue { if precheckDue {
saveResolverPrecheckState(precheckStatePath, now, timeoutRecheck) nextTarget, nextReason := computeNextLiveBatchTarget(liveBatchTarget, liveBatchMin, liveBatchMax, dnsStats, liveDeferred)
saveResolverPrecheckState(
precheckStatePath,
now,
timeoutRecheck,
resolverLiveBatchStats{
Target: liveBatchTarget,
Total: toResolveTotal,
Deferred: liveDeferred,
NextTarget: nextTarget,
NextReason: nextReason,
DNSAttempts: dnsStats.Attempts,
DNSTimeout: dnsStats.Timeout,
DNSCoolSkips: dnsStats.Skipped,
},
)
} }
if precheckFileForced { if precheckFileForced {
_ = os.Remove(precheckForcePath) _ = os.Remove(precheckForcePath)
@@ -2347,7 +2420,79 @@ func loadResolverPrecheckLastRun(path string) int {
return v return v
} }
func saveResolverPrecheckState(path string, ts int, timeoutStats resolverTimeoutRecheckStats) { func loadResolverLiveBatchTarget(path string, fallback, minV, maxV int) int {
// loadResolverLiveBatchTarget returns the live batch size to use for this
// run, read from the JSON state file at path and limited to [minV, maxV].
// fallback is returned (after being limited to the same range) when the
// state file yields no usable value.
if fallback < minV {
fallback = minV
}
if fallback > maxV {
fallback = maxV
}
m := loadJSONMap(path)
if len(m) == 0 {
// Empty or nil map from loadJSONMap: nothing to read a target from.
return fallback
}
// Prefer the target proposed for the next run; fall back to the target
// that was recorded as used when the proposal is nil.
raw := m["live_batch_next_target"]
if raw == nil {
raw = m["live_batch_target"]
}
v, ok := parseAnyInt(raw)
if !ok || v <= 0 {
// Unparseable or non-positive stored value: ignore it.
return fallback
}
// Re-apply the current bounds to the stored value.
if v < minV {
v = minV
}
if v > maxV {
v = maxV
}
return v
}
// computeNextLiveBatchTarget derives the live batch size for the next run
// from the DNS counters of the run that just finished.
//
// current is clamped into [minV, maxV] before scaling, and the scaled value
// is clamped again. The returned string names the rule that fired:
//
//   - "no_dns_attempts": dnsStats.Attempts is zero; size unchanged.
//   - "timeout_high_or_cooldown": any cooldown skip, or timeout rate >= 0.15; scale by 0.75.
//   - "timeout_medium": timeout rate >= 0.08; scale by 0.90.
//   - "timeout_low_expand": timeout rate <= 0.03 and deferred > 0; scale by 1.15.
//   - "timeout_low": timeout rate <= 0.03; scale by 1.10.
//   - "stable": rate between the thresholds, or a "timeout_low" step that
//     clamping turned into no change.
func computeNextLiveBatchTarget(current, minV, maxV int, dnsStats dnsMetrics, deferred int) (int, string) {
	// Lower bound is applied first, then the upper bound, for both the
	// starting value and the scaled result.
	clamp := func(v int) int {
		if v < minV {
			v = minV
		}
		if v > maxV {
			v = maxV
		}
		return v
	}
	base := clamp(current)

	// Share of DNS attempts that timed out; stays 0 when nothing was attempted.
	var rate float64
	if dnsStats.Attempts > 0 {
		rate = float64(dnsStats.Timeout) / float64(dnsStats.Attempts)
	}

	proposed, why := base, "stable"
	rescale := func(factor float64, label string) {
		// Conversion to int truncates toward zero.
		proposed, why = int(float64(base)*factor), label
	}

	if dnsStats.Attempts == 0 {
		why = "no_dns_attempts"
	} else if dnsStats.Skipped > 0 || rate >= 0.15 {
		rescale(0.75, "timeout_high_or_cooldown")
	} else if rate >= 0.08 {
		rescale(0.90, "timeout_medium")
	} else if rate <= 0.03 {
		if deferred > 0 {
			rescale(1.15, "timeout_low_expand")
		} else {
			rescale(1.10, "timeout_low")
		}
	}

	proposed = clamp(proposed)
	// A plain low-timeout step that could not move the size is reported as stable.
	if why == "timeout_low" && proposed == base {
		why = "stable"
	}
	return proposed, why
}
func saveResolverPrecheckState(path string, ts int, timeoutStats resolverTimeoutRecheckStats, live resolverLiveBatchStats) {
if path == "" || ts <= 0 { if path == "" || ts <= 0 {
return return
} }
@@ -2366,6 +2511,14 @@ func saveResolverPrecheckState(path string, ts int, timeoutStats resolverTimeout
"now_other": timeoutStats.NowOther, "now_other": timeoutStats.NowOther,
"no_signal": timeoutStats.NoSignal, "no_signal": timeoutStats.NoSignal,
} }
state["live_batch_target"] = live.Target
state["live_batch_total"] = live.Total
state["live_batch_deferred"] = live.Deferred
state["live_batch_next_target"] = live.NextTarget
state["live_batch_next_reason"] = live.NextReason
state["live_batch_dns_attempts"] = live.DNSAttempts
state["live_batch_dns_timeout"] = live.DNSTimeout
state["live_batch_dns_cooldown_skips"] = live.DNSCoolSkips
saveJSON(state, path) saveJSON(state, path)
} }

View File

@@ -667,6 +667,8 @@ class DashboardController:
dns_attempts = int(pairs.get("dns_attempts", 0)) dns_attempts = int(pairs.get("dns_attempts", 0))
dns_timeout = int(pairs.get("dns_timeout", 0)) dns_timeout = int(pairs.get("dns_timeout", 0))
dns_cooldown_skips = int(pairs.get("dns_cooldown_skips", 0)) dns_cooldown_skips = int(pairs.get("dns_cooldown_skips", 0))
live_batch_target = int(pairs.get("live_batch_target", 0))
live_batch_deferred = int(pairs.get("live_batch_deferred", 0))
r_checked = int(pairs.get("timeout_recheck_checked", 0)) r_checked = int(pairs.get("timeout_recheck_checked", 0))
r_recovered = int(pairs.get("timeout_recheck_recovered", 0)) r_recovered = int(pairs.get("timeout_recheck_recovered", 0))
@@ -680,7 +682,8 @@ class DashboardController:
f"+recheck_ips={r_recovered_ips}) | unresolved={unresolved} " f"+recheck_ips={r_recovered_ips}) | unresolved={unresolved} "
f"(live={unresolved_live}, suppressed={unresolved_suppressed}) | " f"(live={unresolved_live}, suppressed={unresolved_suppressed}) | "
f"quarantine_hits={q_hits} | dns_timeout={dns_timeout} " f"quarantine_hits={q_hits} | dns_timeout={dns_timeout} "
f"| cooldown_skips={dns_cooldown_skips} | attempts={dns_attempts}" f"| cooldown_skips={dns_cooldown_skips} | attempts={dns_attempts} "
f"| live_batch={live_batch_target} deferred={live_batch_deferred}"
) )
recheck_text = ( recheck_text = (
f"Timeout recheck: checked={r_checked} recovered={r_recovered} " f"Timeout recheck: checked={r_checked} recovered={r_recovered} "