221 lines
6.8 KiB
Go
221 lines
6.8 KiB
Go
package app
|
|
|
|
import (
|
|
"encoding/json"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
func handleTransportPoliciesRollbackExec(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
var body TransportPolicyRollbackRequest
|
|
if r.Body != nil {
|
|
defer r.Body.Close()
|
|
if err := json.NewDecoder(io.LimitReader(r.Body, 1<<20)).Decode(&body); err != nil && err != io.EOF {
|
|
http.Error(w, "bad json", http.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
idempotencyKey := normalizeTransportIdempotencyKey(r.Header.Get("Idempotency-Key"))
|
|
requestHash := hashTransportPolicyMutationRequest(body)
|
|
|
|
transportMu.Lock()
|
|
publishRuntimeSnapshot := false
|
|
defer func() {
|
|
transportMu.Unlock()
|
|
if publishRuntimeSnapshot {
|
|
publishTransportRuntimeObservabilitySnapshotChanged("transport_policy_rollback_applied", nil, nil)
|
|
}
|
|
}()
|
|
respond := func(resp TransportPolicyResponse) {
|
|
persistTransportPolicyIdempotencyLocked(transportPolicyIdempotencyRollbackScope, idempotencyKey, requestHash, resp)
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
if lookup := lookupTransportPolicyIdempotencyLocked(transportPolicyIdempotencyRollbackScope, idempotencyKey, requestHash); lookup.Replay || lookup.Conflict {
|
|
writeJSON(w, http.StatusOK, lookup.Response)
|
|
return
|
|
}
|
|
|
|
current := loadTransportPolicyState()
|
|
if body.BaseRevision > 0 && body.BaseRevision != current.Revision {
|
|
respond(TransportPolicyResponse{
|
|
OK: false,
|
|
Message: "stale policy revision",
|
|
Code: "POLICY_REVISION_MISMATCH",
|
|
CurrentRevision: current.Revision,
|
|
})
|
|
return
|
|
}
|
|
|
|
snapshot, ok := loadTransportPolicySnapshot()
|
|
if !ok {
|
|
respond(TransportPolicyResponse{
|
|
OK: false,
|
|
Message: "rollback snapshot not found",
|
|
Code: "ROLLBACK_SNAPSHOT_NOT_FOUND",
|
|
})
|
|
return
|
|
}
|
|
|
|
clientsState := loadTransportClientsState()
|
|
clients := transportPolicyClientsWithVirtualTargets(clientsState.Items)
|
|
result := validateTransportPolicy(snapshot.Intents, current.Intents, clients)
|
|
if result.Summary.BlockCount > 0 {
|
|
respond(TransportPolicyResponse{
|
|
OK: false,
|
|
Message: "rollback blocked by policy conflicts",
|
|
Code: "ROLLBACK_BLOCKED",
|
|
Conflicts: result.Conflicts,
|
|
})
|
|
return
|
|
}
|
|
plan, compileConflicts := compileTransportPolicyPlan(result.Normalized, clients, current.Revision+1)
|
|
mergedConflicts := append([]TransportConflictRecord{}, result.Conflicts...)
|
|
if len(compileConflicts) > 0 {
|
|
mergedConflicts = append(mergedConflicts, compileConflicts...)
|
|
mergedConflicts = dedupeTransportConflicts(mergedConflicts)
|
|
sum := summarizeTransportConflicts(mergedConflicts)
|
|
if sum.BlockCount > 0 {
|
|
respond(TransportPolicyResponse{
|
|
OK: false,
|
|
Message: "rollback compile blocked by allocator/interface conflicts",
|
|
Code: "ROLLBACK_COMPILE_BLOCKED",
|
|
Conflicts: mergedConflicts,
|
|
Plan: &plan,
|
|
})
|
|
return
|
|
}
|
|
}
|
|
conflictsSummary := summarizeTransportConflicts(mergedConflicts)
|
|
|
|
// Keep current state as the next rollback candidate.
|
|
if err := saveTransportPolicySnapshot(current); err != nil {
|
|
respond(TransportPolicyResponse{
|
|
OK: false,
|
|
Message: "snapshot save failed: " + err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
next := snapshot
|
|
next.Version = transportStateVersion
|
|
next.Revision = current.Revision + 1
|
|
next.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
|
next.Intents = append([]TransportPolicyIntent(nil), result.Normalized...)
|
|
plan.PolicyRevision = next.Revision
|
|
applyID := "rbk-" + newTransportToken(8)
|
|
appliedRuntime, err := applyTransportPolicyDataPlaneAtomicLocked(plan, applyID)
|
|
if err != nil {
|
|
respond(TransportPolicyResponse{
|
|
OK: false,
|
|
Message: "rollback runtime apply failed: " + err.Error(),
|
|
Code: "ROLLBACK_RUNTIME_APPLY_FAILED",
|
|
Plan: &plan,
|
|
})
|
|
return
|
|
}
|
|
healthCheck, updatedClients, clientsChanged := runTransportPolicyHealthCheck(clients, plan, time.Now().UTC())
|
|
if clientsChanged {
|
|
persistClients := transportPolicyPersistableClients(updatedClients)
|
|
if err := saveTransportClientsState(transportClientsState{
|
|
Version: transportStateVersion,
|
|
Items: persistClients,
|
|
}); err != nil {
|
|
_ = rollbackTransportPolicyRuntimeToSnapshot(appliedRuntime)
|
|
respond(TransportPolicyResponse{
|
|
OK: false,
|
|
Message: "rollback health-check client state save failed: " + err.Error(),
|
|
Code: "ROLLBACK_HEALTHCHECK_SAVE_FAILED",
|
|
Plan: &plan,
|
|
HealthCheck: &healthCheck,
|
|
})
|
|
return
|
|
}
|
|
clients = updatedClients
|
|
}
|
|
if !healthCheck.OK {
|
|
rollbackErr := rollbackTransportPolicyRuntimeToSnapshot(appliedRuntime)
|
|
msg := healthCheck.Message
|
|
if rollbackErr != nil {
|
|
msg = strings.TrimSpace(msg + "; runtime rollback failed: " + rollbackErr.Error())
|
|
}
|
|
events.push("transport_policy_healthcheck_failed", map[string]any{
|
|
"apply_id": applyID,
|
|
"policy_revision": next.Revision,
|
|
"rollback": true,
|
|
"failed_count": healthCheck.FailedCount,
|
|
})
|
|
respond(TransportPolicyResponse{
|
|
OK: false,
|
|
Message: msg,
|
|
Code: "ROLLBACK_HEALTHCHECK_FAILED",
|
|
Plan: &plan,
|
|
HealthCheck: &healthCheck,
|
|
})
|
|
return
|
|
}
|
|
events.push("transport_policy_healthcheck_passed", map[string]any{
|
|
"apply_id": applyID,
|
|
"policy_revision": next.Revision,
|
|
"rollback": true,
|
|
"checked_count": healthCheck.CheckedCount,
|
|
})
|
|
|
|
if err := saveTransportPolicyState(next); err != nil {
|
|
respond(TransportPolicyResponse{
|
|
OK: false,
|
|
Message: "policy save failed: " + err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if err := saveTransportPolicyCompilePlan(plan); err != nil {
|
|
respond(TransportPolicyResponse{
|
|
OK: false,
|
|
Message: "policy plan save failed: " + err.Error(),
|
|
Code: "POLICY_PLAN_SAVE_FAILED",
|
|
})
|
|
return
|
|
}
|
|
ownership := buildTransportOwnershipStateFromPlan(plan, next.Revision)
|
|
if err := saveTransportOwnershipState(ownership); err != nil {
|
|
respond(TransportPolicyResponse{
|
|
OK: false,
|
|
Message: "ownership save failed: " + err.Error(),
|
|
Code: "POLICY_OWNERSHIP_SAVE_FAILED",
|
|
})
|
|
return
|
|
}
|
|
|
|
conflicts := TransportConflictState{
|
|
Version: transportStateVersion,
|
|
UpdatedAt: time.Now().UTC().Format(time.RFC3339),
|
|
HasBlocking: conflictsSummary.BlockCount > 0,
|
|
Items: append([]TransportConflictRecord(nil), mergedConflicts...),
|
|
}
|
|
_ = saveTransportConflictsState(conflicts)
|
|
|
|
events.push("transport_policy_applied", map[string]any{
|
|
"apply_id": applyID,
|
|
"policy_revision": next.Revision,
|
|
"rollback": true,
|
|
"iface_count": plan.InterfaceCount,
|
|
"rule_count": plan.RuleCount,
|
|
})
|
|
publishRuntimeSnapshot = true
|
|
|
|
respond(TransportPolicyResponse{
|
|
OK: true,
|
|
Message: "policy rollback applied",
|
|
PolicyRevision: next.Revision,
|
|
ApplyID: applyID,
|
|
RollbackAvailable: true,
|
|
Plan: &plan,
|
|
HealthCheck: &healthCheck,
|
|
})
|
|
}
|