resolver: add upstream cooldown + expose live vs suppressed unresolved

This commit is contained in:
beckline
2026-02-25 10:11:28 +03:00
parent e8fb361b4c
commit 29dde73f04
3 changed files with 204 additions and 13 deletions

View File

@@ -6,7 +6,7 @@
- [x] ~~3. Add stale-keep policy.~~
- [x] ~~4. Wire 24h precheck cycle (soft pruning only).~~
- [x] ~~5. Expose metrics/log clarity in API + GUI (API/trace done; DNS benchmark load-profile UI done; route badges done).~~
- [ ] 6. Tune thresholds with production data (pass-1 done: timeout-only stays suspect; pass-2 done: NX hard-quarantine disabled by default).
- [ ] 6. Tune thresholds with production data (pass-1 timeout-suspect; pass-2 NX hard-quarantine off; pass-3 DNS upstream cooldown in-run).
## 1) Goal
- Stabilize resolver behavior under high domain volume.

View File

@@ -13,6 +13,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
)
@@ -43,6 +44,7 @@ type dnsUpstreamMetrics struct {
Timeout int
Temporary int
Other int
Skipped int
}
type dnsMetrics struct {
@@ -52,6 +54,7 @@ type dnsMetrics struct {
Timeout int
Temporary int
Other int
Skipped int
PerUpstream map[string]*dnsUpstreamMetrics
}
@@ -96,6 +99,12 @@ func (m *dnsMetrics) addError(upstream string, kind dnsErrorKind) {
}
}
func (m *dnsMetrics) addCooldownSkip(upstream string) {
m.Skipped++
us := m.ensureUpstream(upstream)
us.Skipped++
}
func (m *dnsMetrics) merge(other dnsMetrics) {
m.Attempts += other.Attempts
m.OK += other.OK
@@ -131,7 +140,7 @@ func (m dnsMetrics) formatPerUpstream() string {
parts := make([]string, 0, len(keys))
for _, k := range keys {
v := m.PerUpstream[k]
parts = append(parts, fmt.Sprintf("%s{attempts=%d ok=%d nxdomain=%d timeout=%d temporary=%d other=%d}", k, v.Attempts, v.OK, v.NXDomain, v.Timeout, v.Temporary, v.Other))
parts = append(parts, fmt.Sprintf("%s{attempts=%d ok=%d nxdomain=%d timeout=%d temporary=%d other=%d skipped=%d}", k, v.Attempts, v.OK, v.NXDomain, v.Timeout, v.Temporary, v.Other, v.Skipped))
}
return strings.Join(parts, "; ")
}
@@ -163,7 +172,7 @@ func (m dnsMetrics) formatResolverHealth() string {
default:
state = "bad"
}
parts = append(parts, fmt.Sprintf("%s{score=%.1f state=%s attempts=%d ok=%d timeout=%d nxdomain=%d temporary=%d other=%d}", k, score, state, v.Attempts, v.OK, v.Timeout, v.NXDomain, v.Temporary, v.Other))
parts = append(parts, fmt.Sprintf("%s{score=%.1f state=%s attempts=%d ok=%d timeout=%d nxdomain=%d temporary=%d other=%d skipped=%d}", k, score, state, v.Attempts, v.OK, v.Timeout, v.NXDomain, v.Temporary, v.Other, v.Skipped))
}
return strings.Join(parts, "; ")
}
@@ -202,6 +211,26 @@ type resolverTimeoutRecheckStats struct {
NoSignal int
}
type dnsCooldownState struct {
Attempts int
TimeoutLike int
FailStreak int
BanUntil int64
BanLevel int
}
type dnsRunCooldown struct {
mu sync.Mutex
enabled bool
minAttempts int
timeoutRatePct int
failStreak int
banSec int
maxBanSec int
temporaryAsError bool
byUpstream map[string]*dnsCooldownState
}
// Empty by default: primary resolver pool comes from DNS upstream pool state.
// Optional fallback list can still be provided via RESOLVE_DNS_FALLBACKS env.
var resolverFallbackDNS []string
@@ -386,6 +415,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
}
return domainCacheSourceDirect
}
cooldown := newDNSRunCooldown()
timeoutRecheck := resolverTimeoutRecheckStats{}
if precheckDue && timeoutRecheckMax > 0 {
@@ -407,14 +437,21 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
logf("resolver start: domains=%d ttl=%ds workers=%d dns_timeout_ms=%d", len(domains), ttl, workers, dnsTimeoutMs)
directPolicy := directDNSAttemptPolicy(len(cfg.Default))
wildcardPolicy := wildcardDNSAttemptPolicy(1)
cEnabled, cMin, cRate, cStreak, cBan, cMaxBan := cooldown.configSnapshot()
logf(
"resolver policy: direct_try=%d direct_budget_ms=%d wildcard_try=%d wildcard_budget_ms=%d nx_early_stop=%t nx_hard_quarantine=%t stale_keep_sec=%d precheck_every_sec=%d precheck_max=%d precheck_forced_env=%t precheck_forced_file=%t",
"resolver policy: direct_try=%d direct_budget_ms=%d wildcard_try=%d wildcard_budget_ms=%d nx_early_stop=%t nx_hard_quarantine=%t cooldown_enabled=%t cooldown_min_attempts=%d cooldown_timeout_rate=%d cooldown_fail_streak=%d cooldown_ban_sec=%d cooldown_max_ban_sec=%d stale_keep_sec=%d precheck_every_sec=%d precheck_max=%d precheck_forced_env=%t precheck_forced_file=%t",
directPolicy.TryLimit,
directPolicy.DomainBudget.Milliseconds(),
wildcardPolicy.TryLimit,
wildcardPolicy.DomainBudget.Milliseconds(),
resolveNXEarlyStopEnabled(),
resolveNXHardQuarantineEnabled(),
cEnabled,
cMin,
cRate,
cStreak,
cBan,
cMaxBan,
staleKeepSec,
precheckEverySec,
precheckMaxDomains,
@@ -531,7 +568,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
for i := 0; i < workers; i++ {
go func() {
for j := range jobs {
ips, stats := resolveHostGo(j.host, cfg, metaSpecial, wildcards, dnsTimeout, logf)
ips, stats := resolveHostGo(j.host, cfg, metaSpecial, wildcards, dnsTimeout, cooldown, logf)
results <- struct {
host string
ips []string
@@ -676,7 +713,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
if logf != nil {
dnsErrors := dnsStats.totalErrors()
logf(
"resolve summary: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d resolved_now=%d unresolved=%d unresolved_live=%d unresolved_suppressed=%d static_entries=%d static_skipped=%d unique_ips=%d direct_ips=%d wildcard_ips=%d ptr_lookups=%d ptr_errors=%d dns_attempts=%d dns_ok=%d dns_nxdomain=%d dns_timeout=%d dns_temporary=%d dns_other=%d dns_errors=%d timeout_recheck_checked=%d timeout_recheck_recovered=%d timeout_recheck_recovered_ips=%d timeout_recheck_still_timeout=%d timeout_recheck_now_nxdomain=%d timeout_recheck_now_temporary=%d timeout_recheck_now_other=%d timeout_recheck_no_signal=%d duration_ms=%d",
"resolve summary: domains=%d cache_hits=%d cache_neg_hits=%d quarantine_hits=%d stale_hits=%d resolved_now=%d unresolved=%d unresolved_live=%d unresolved_suppressed=%d static_entries=%d static_skipped=%d unique_ips=%d direct_ips=%d wildcard_ips=%d ptr_lookups=%d ptr_errors=%d dns_attempts=%d dns_ok=%d dns_nxdomain=%d dns_timeout=%d dns_temporary=%d dns_other=%d dns_cooldown_skips=%d dns_errors=%d timeout_recheck_checked=%d timeout_recheck_recovered=%d timeout_recheck_recovered_ips=%d timeout_recheck_still_timeout=%d timeout_recheck_now_nxdomain=%d timeout_recheck_now_temporary=%d timeout_recheck_now_other=%d timeout_recheck_no_signal=%d duration_ms=%d",
len(domains),
len(fresh),
cacheNegativeHits,
@@ -699,6 +736,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
dnsStats.Timeout,
dnsStats.Temporary,
dnsStats.Other,
dnsStats.Skipped,
dnsErrors,
timeoutRecheck.Checked,
timeoutRecheck.Recovered,
@@ -748,7 +786,7 @@ func runResolverJob(opts ResolverOpts, logf func(string, ...any)) (resolverResul
// DNS resolve helpers
// ---------------------------------------------------------------------
func resolveHostGo(host string, cfg dnsConfig, metaSpecial []string, wildcards wildcardMatcher, timeout time.Duration, logf func(string, ...any)) ([]string, dnsMetrics) {
func resolveHostGo(host string, cfg dnsConfig, metaSpecial []string, wildcards wildcardMatcher, timeout time.Duration, cooldown *dnsRunCooldown, logf func(string, ...any)) ([]string, dnsMetrics) {
useMeta := false
for _, m := range metaSpecial {
if host == m {
@@ -777,7 +815,7 @@ func resolveHostGo(host string, cfg dnsConfig, metaSpecial []string, wildcards w
if primaryViaSmartDNS {
policy = wildcardDNSAttemptPolicy(len(dnsList))
}
ips, stats := digAWithPolicy(host, dnsList, timeout, logf, policy)
ips, stats := digAWithPolicy(host, dnsList, timeout, logf, policy, cooldown)
if len(ips) == 0 &&
!primaryViaSmartDNS &&
cfg.SmartDNS != "" &&
@@ -795,7 +833,7 @@ func resolveHostGo(host string, cfg dnsConfig, metaSpecial []string, wildcards w
)
}
fallbackPolicy := wildcardDNSAttemptPolicy(1)
fallbackIPs, fallbackStats := digAWithPolicy(host, []string{cfg.SmartDNS}, timeout, logf, fallbackPolicy)
fallbackIPs, fallbackStats := digAWithPolicy(host, []string{cfg.SmartDNS}, timeout, logf, fallbackPolicy, cooldown)
stats.merge(fallbackStats)
if len(fallbackIPs) > 0 {
ips = fallbackIPs
@@ -938,7 +976,7 @@ func runTimeoutQuarantineRecheck(
go func() {
for host := range jobs {
src := cacheSourceForHost(host)
ips, dnsStats := resolveHostGo(host, cfg, metaSpecial, wildcards, timeout, nil)
ips, dnsStats := resolveHostGo(host, cfg, metaSpecial, wildcards, timeout, nil, nil)
results <- result{host: host, source: src, ips: ips, dns: dnsStats}
}
}()
@@ -992,10 +1030,10 @@ func runTimeoutQuarantineRecheck(
// RU: `digA` - содержит основную логику для dig a.
// ---------------------------------------------------------------------
func digA(host string, dnsList []string, timeout time.Duration, logf func(string, ...any)) ([]string, dnsMetrics) {
return digAWithPolicy(host, dnsList, timeout, logf, defaultDNSAttemptPolicy(len(dnsList)))
return digAWithPolicy(host, dnsList, timeout, logf, defaultDNSAttemptPolicy(len(dnsList)), nil)
}
func digAWithPolicy(host string, dnsList []string, timeout time.Duration, logf func(string, ...any), policy dnsAttemptPolicy) ([]string, dnsMetrics) {
func digAWithPolicy(host string, dnsList []string, timeout time.Duration, logf func(string, ...any), policy dnsAttemptPolicy, cooldown *dnsRunCooldown) ([]string, dnsMetrics) {
stats := dnsMetrics{}
if len(dnsList) == 0 {
return nil, stats
@@ -1035,6 +1073,10 @@ func digAWithPolicy(host string, dnsList []string, timeout time.Duration, logf f
port = "53"
}
addr := net.JoinHostPort(server, port)
if cooldown != nil && cooldown.shouldSkip(addr, time.Now().Unix()) {
stats.addCooldownSkip(addr)
continue
}
r := &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
@@ -1055,6 +1097,11 @@ func digAWithPolicy(host string, dnsList []string, timeout time.Duration, logf f
if err != nil {
kind := classifyDNSError(err)
stats.addError(addr, kind)
if cooldown != nil {
if banned, banSec := cooldown.observeError(addr, kind, time.Now().Unix()); banned && logf != nil {
logf("dns cooldown ban %s: timeout-like failures; ban_sec=%d", addr, banSec)
}
}
if logf != nil {
logf("dns warn %s via %s: kind=%s attempt=%d/%d err=%v", host, addr, kind, attempt+1, tryLimit, err)
}
@@ -1075,12 +1122,18 @@ func digAWithPolicy(host string, dnsList []string, timeout time.Duration, logf f
}
if len(ips) == 0 {
stats.addError(addr, dnsErrorOther)
if cooldown != nil {
_, _ = cooldown.observeError(addr, dnsErrorOther, time.Now().Unix())
}
if logf != nil {
logf("dns warn %s via %s: kind=other err=no_public_ips", host, addr)
}
continue
}
stats.addSuccess(addr)
if cooldown != nil {
cooldown.observeSuccess(addr)
}
return uniqueStrings(ips), stats
}
return nil, stats
@@ -1176,6 +1229,142 @@ func resolveNXHardQuarantineEnabled() bool {
}
}
func newDNSRunCooldown() *dnsRunCooldown {
enabled := true
switch strings.ToLower(strings.TrimSpace(os.Getenv("RESOLVE_DNS_COOLDOWN_ENABLED"))) {
case "0", "false", "no", "off":
enabled = false
}
c := &dnsRunCooldown{
enabled: enabled,
minAttempts: envInt("RESOLVE_DNS_COOLDOWN_MIN_ATTEMPTS", 300),
timeoutRatePct: envInt("RESOLVE_DNS_COOLDOWN_TIMEOUT_RATE_PCT", 70),
failStreak: envInt("RESOLVE_DNS_COOLDOWN_FAIL_STREAK", 25),
banSec: envInt("RESOLVE_DNS_COOLDOWN_BAN_SEC", 60),
maxBanSec: envInt("RESOLVE_DNS_COOLDOWN_MAX_BAN_SEC", 300),
temporaryAsError: true,
byUpstream: map[string]*dnsCooldownState{},
}
if c.minAttempts < 50 {
c.minAttempts = 50
}
if c.minAttempts > 2000 {
c.minAttempts = 2000
}
if c.timeoutRatePct < 40 {
c.timeoutRatePct = 40
}
if c.timeoutRatePct > 95 {
c.timeoutRatePct = 95
}
if c.failStreak < 8 {
c.failStreak = 8
}
if c.failStreak > 200 {
c.failStreak = 200
}
if c.banSec < 10 {
c.banSec = 10
}
if c.banSec > 3600 {
c.banSec = 3600
}
if c.maxBanSec < c.banSec {
c.maxBanSec = c.banSec
}
if c.maxBanSec > 3600 {
c.maxBanSec = 3600
}
return c
}
func (c *dnsRunCooldown) configSnapshot() (enabled bool, minAttempts, timeoutRatePct, failStreak, banSec, maxBanSec int) {
if c == nil {
return false, 0, 0, 0, 0, 0
}
return c.enabled, c.minAttempts, c.timeoutRatePct, c.failStreak, c.banSec, c.maxBanSec
}
func (c *dnsRunCooldown) stateFor(upstream string) *dnsCooldownState {
if c.byUpstream == nil {
c.byUpstream = map[string]*dnsCooldownState{}
}
st, ok := c.byUpstream[upstream]
if ok {
return st
}
st = &dnsCooldownState{}
c.byUpstream[upstream] = st
return st
}
func (c *dnsRunCooldown) shouldSkip(upstream string, now int64) bool {
if c == nil || !c.enabled {
return false
}
c.mu.Lock()
defer c.mu.Unlock()
st := c.stateFor(upstream)
return st.BanUntil > now
}
func (c *dnsRunCooldown) observeSuccess(upstream string) {
if c == nil || !c.enabled {
return
}
c.mu.Lock()
defer c.mu.Unlock()
st := c.stateFor(upstream)
st.Attempts++
st.FailStreak = 0
}
func (c *dnsRunCooldown) observeError(upstream string, kind dnsErrorKind, now int64) (bool, int) {
if c == nil || !c.enabled {
return false, 0
}
c.mu.Lock()
defer c.mu.Unlock()
st := c.stateFor(upstream)
st.Attempts++
timeoutLike := kind == dnsErrorTimeout || (c.temporaryAsError && kind == dnsErrorTemporary)
if timeoutLike {
st.TimeoutLike++
st.FailStreak++
} else {
st.FailStreak = 0
return false, 0
}
if st.BanUntil > now {
return false, 0
}
rateBan := st.Attempts >= c.minAttempts && (st.TimeoutLike*100 >= c.timeoutRatePct*st.Attempts)
streakBan := st.FailStreak >= c.failStreak
if !rateBan && !streakBan {
return false, 0
}
st.BanLevel++
dur := c.banSec
if st.BanLevel > 1 {
for i := 1; i < st.BanLevel; i++ {
dur *= 2
if dur >= c.maxBanSec {
dur = c.maxBanSec
break
}
}
}
if dur > c.maxBanSec {
dur = c.maxBanSec
}
st.BanUntil = now + int64(dur)
st.FailStreak = 0
return true, dur
}
func resolvePrecheckForceEnvEnabled() bool {
switch strings.ToLower(strings.TrimSpace(os.Getenv("RESOLVE_PRECHECK_FORCE"))) {
case "1", "true", "yes", "on":

View File

@@ -666,6 +666,7 @@ class DashboardController:
q_hits = int(pairs.get("quarantine_hits", 0))
dns_attempts = int(pairs.get("dns_attempts", 0))
dns_timeout = int(pairs.get("dns_timeout", 0))
dns_cooldown_skips = int(pairs.get("dns_cooldown_skips", 0))
r_checked = int(pairs.get("timeout_recheck_checked", 0))
r_recovered = int(pairs.get("timeout_recheck_recovered", 0))
@@ -678,7 +679,8 @@ class DashboardController:
f"Resolve: ips={unique_ips} (direct={direct_ips}, wildcard={wildcard_ips}, "
f"+recheck_ips={r_recovered_ips}) | unresolved={unresolved} "
f"(live={unresolved_live}, suppressed={unresolved_suppressed}) | "
f"quarantine_hits={q_hits} | dns_timeout={dns_timeout} | attempts={dns_attempts}"
f"quarantine_hits={q_hits} | dns_timeout={dns_timeout} "
f"| cooldown_skips={dns_cooldown_skips} | attempts={dns_attempts}"
)
recheck_text = (
f"Timeout recheck: checked={r_checked} recovered={r_recovered} "