package app import ( "encoding/json" "io" "net/http" "strings" "time" ) func handleTransportPoliciesApplyExec(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } var body TransportPolicyApplyRequest if r.Body != nil { defer r.Body.Close() if err := json.NewDecoder(io.LimitReader(r.Body, 2<<20)).Decode(&body); err != nil && err != io.EOF { http.Error(w, "bad json", http.StatusBadRequest) return } } idempotencyKey := normalizeTransportIdempotencyKey(r.Header.Get("Idempotency-Key")) requestHash := hashTransportPolicyMutationRequest(body) transportMu.Lock() publishRuntimeSnapshot := false defer func() { transportMu.Unlock() if publishRuntimeSnapshot { publishTransportRuntimeObservabilitySnapshotChanged("transport_policy_applied", nil, nil) } }() respond := func(resp TransportPolicyResponse) { persistTransportPolicyIdempotencyLocked(transportPolicyIdempotencyApplyScope, idempotencyKey, requestHash, resp) writeJSON(w, http.StatusOK, resp) } if lookup := lookupTransportPolicyIdempotencyLocked(transportPolicyIdempotencyApplyScope, idempotencyKey, requestHash); lookup.Replay || lookup.Conflict { writeJSON(w, http.StatusOK, lookup.Response) return } clientsState := loadTransportClientsState() clients := transportPolicyClientsWithVirtualTargets(clientsState.Items) current := loadTransportPolicyState() locks := TransportOwnerLockState{} if transportPolicyKernelConntrackStickyEnabled() { locks = loadTransportOwnerLocksState() } if body.BaseRevision != current.Revision { respond(TransportPolicyResponse{ OK: false, Message: "stale policy revision", Code: "POLICY_REVISION_MISMATCH", CurrentRevision: current.Revision, }) return } result := validateTransportPolicy(body.Intents, current.Intents, clients) ownerSwitchConflicts := detectTransportOwnerSwitchConflicts(current.Intents, result.Normalized) if len(ownerSwitchConflicts) > 0 { result.Conflicts = append(result.Conflicts, ownerSwitchConflicts...) result.Conflicts = dedupeTransportConflicts(result.Conflicts) result.Summary = summarizeTransportConflicts(result.Conflicts) result.Valid = result.Summary.BlockCount == 0 } ownerLockConflicts := detectTransportOwnerLockConflicts(current.Intents, result.Normalized, clients) if len(ownerLockConflicts) > 0 { result.Conflicts = append(result.Conflicts, ownerLockConflicts...) result.Conflicts = dedupeTransportConflicts(result.Conflicts) result.Summary = summarizeTransportConflicts(result.Conflicts) result.Valid = result.Summary.BlockCount == 0 } destinationLockConflicts := detectTransportDestinationLockConflicts(current.Intents, result.Normalized, locks) if len(destinationLockConflicts) > 0 { result.Conflicts = append(result.Conflicts, destinationLockConflicts...) result.Conflicts = dedupeTransportConflicts(result.Conflicts) result.Summary = summarizeTransportConflicts(result.Conflicts) result.Valid = result.Summary.BlockCount == 0 } overridableBlocks, hardBlocks := splitTransportBlockingConflicts(result.Conflicts) if len(hardBlocks) > 0 { respond(TransportPolicyResponse{ OK: false, Message: "policy has non-overridable blocking conflicts", Code: "POLICY_CONFLICT_BLOCK", Conflicts: result.Conflicts, }) return } if len(overridableBlocks) > 0 && !body.Options.ForceOverride { respond(TransportPolicyResponse{ OK: false, Message: "policy requires explicit force override", Code: "POLICY_FORCE_OVERRIDE_REQUIRED", Conflicts: result.Conflicts, }) return } plan, compileConflicts := compileTransportPolicyPlan(result.Normalized, clients, current.Revision+1) mergedConflicts := append([]TransportConflictRecord{}, result.Conflicts...) if len(compileConflicts) > 0 { mergedConflicts = append(mergedConflicts, compileConflicts...) mergedConflicts = dedupeTransportConflicts(mergedConflicts) sum := summarizeTransportConflicts(mergedConflicts) if sum.BlockCount > 0 { respond(TransportPolicyResponse{ OK: false, Message: "policy compile blocked by allocator/interface conflicts", Code: "POLICY_COMPILE_BLOCK", Conflicts: mergedConflicts, Plan: &plan, }) return } } conflictsSummary := summarizeTransportConflicts(mergedConflicts) digest := digestTransportIntents(result.Normalized) if len(overridableBlocks) > 0 && body.Options.ForceOverride { if !consumeTransportConfirmToken(strings.TrimSpace(body.Options.ConfirmToken), current.Revision, digest) { respond(TransportPolicyResponse{ OK: false, Message: "invalid or expired confirm token", Code: "FORCE_OVERRIDE_CONFIRM_REQUIRED", }) return } } if err := saveTransportPolicySnapshot(current); err != nil { respond(TransportPolicyResponse{ OK: false, Message: "snapshot save failed: " + err.Error(), }) return } next := TransportPolicyState{ Version: transportStateVersion, Revision: current.Revision + 1, UpdatedAt: time.Now().UTC().Format(time.RFC3339), Intents: append([]TransportPolicyIntent(nil), result.Normalized...), } plan.PolicyRevision = next.Revision applyID := "apl-" + newTransportToken(8) appliedRuntime, err := applyTransportPolicyDataPlaneAtomicLocked(plan, applyID) if err != nil { respond(TransportPolicyResponse{ OK: false, Message: "policy runtime apply failed: " + err.Error(), Code: "POLICY_RUNTIME_APPLY_FAILED", Plan: &plan, }) return } healthCheck, updatedClients, clientsChanged := runTransportPolicyHealthCheck(clients, plan, time.Now().UTC()) if clientsChanged { persistClients := transportPolicyPersistableClients(updatedClients) if err := saveTransportClientsState(transportClientsState{ Version: transportStateVersion, Items: persistClients, }); err != nil { _ = rollbackTransportPolicyRuntimeToSnapshot(appliedRuntime) respond(TransportPolicyResponse{ OK: false, Message: "health-check client state save failed: " + err.Error(), Code: "POLICY_HEALTHCHECK_SAVE_FAILED", Plan: &plan, HealthCheck: &healthCheck, }) return } clients = updatedClients } if !healthCheck.OK { rollbackErr := rollbackTransportPolicyRuntimeToSnapshot(appliedRuntime) msg := healthCheck.Message if rollbackErr != nil { msg = strings.TrimSpace(msg + "; runtime rollback failed: " + rollbackErr.Error()) } events.push("transport_policy_healthcheck_failed", map[string]any{ "apply_id": applyID, "policy_revision": next.Revision, "failed_count": healthCheck.FailedCount, }) respond(TransportPolicyResponse{ OK: false, Message: msg, Code: "POLICY_HEALTHCHECK_FAILED", Plan: &plan, HealthCheck: &healthCheck, }) return } events.push("transport_policy_healthcheck_passed", map[string]any{ "apply_id": applyID, "policy_revision": next.Revision, "checked_count": healthCheck.CheckedCount, }) if err := saveTransportPolicyState(next); err != nil { respond(TransportPolicyResponse{ OK: false, Message: "policy save failed: " + err.Error(), }) return } if err := saveTransportPolicyCompilePlan(plan); err != nil { respond(TransportPolicyResponse{ OK: false, Message: "policy plan save failed: " + err.Error(), Code: "POLICY_PLAN_SAVE_FAILED", }) return } ownership := buildTransportOwnershipStateFromPlan(plan, next.Revision) if err := saveTransportOwnershipState(ownership); err != nil { respond(TransportPolicyResponse{ OK: false, Message: "ownership save failed: " + err.Error(), Code: "POLICY_OWNERSHIP_SAVE_FAILED", }) return } conflicts := TransportConflictState{ Version: transportStateVersion, UpdatedAt: time.Now().UTC().Format(time.RFC3339), HasBlocking: conflictsSummary.BlockCount > 0, Items: append([]TransportConflictRecord(nil), mergedConflicts...), } _ = saveTransportConflictsState(conflicts) events.push("transport_policy_applied", map[string]any{ "apply_id": applyID, "policy_revision": next.Revision, "iface_count": plan.InterfaceCount, "rule_count": plan.RuleCount, }) if conflictsSummary.BlockCount > 0 { events.push("transport_conflict_detected", map[string]any{ "count": conflictsSummary.BlockCount, }) } publishRuntimeSnapshot = true respond(TransportPolicyResponse{ OK: true, Message: "policy applied", PolicyRevision: next.Revision, ApplyID: applyID, RollbackAvailable: true, Plan: &plan, HealthCheck: &healthCheck, }) }