Files
elmprodvpn/selective-vpn-api/app/transport_runtime_observability.go

483 lines
14 KiB
Go

package app
import (
"sort"
"strings"
"time"
)
// transportRuntimeObservabilityEgressLookup resolves the egress identity that
// is reported for a transport client ID. The item builders in this file take
// it as a parameter, and lookupTransportRuntimeObservabilityEgress treats a nil
// lookup as "no identity available".
type transportRuntimeObservabilityEgressLookup func(clientID string) EgressIdentity
// transportRuntimeObservabilitySnapshotResponse builds the runtime
// observability response from the currently stored transport state.
//
// Clients, interfaces, policy and the compiled plan are loaded, and their
// snapshots captured, while transportMu is held. Interface normalization, plan
// compilation and both conditional saves run after the lock is released.
//
// If the normalized interfaces differ and saving them fails, the function
// returns an OK=false response with code TRANSPORT_INTERFACES_SAVE_FAILED.
// Otherwise the response is built from the normalized interfaces, the policy
// targets (clients plus virtual targets) and the compiled plan.
func transportRuntimeObservabilitySnapshotResponse(now time.Time) TransportRuntimeObservabilityResponse {
	transportMu.Lock()
	clients := loadTransportClientsState()
	storedIfaces := loadTransportInterfacesState()
	policy := loadTransportPolicyState()
	storedPlan := loadTransportPolicyCompilePlan()
	ifacesSnap := captureTransportInterfacesStateSnapshot(clients, storedIfaces)
	planSnap := captureTransportPolicyPlanStateSnapshot(policy, storedPlan)
	transportMu.Unlock()

	ifaces, ifacesDirty := normalizeTransportInterfacesState(storedIfaces, clients.Items)
	if ifacesDirty {
		saveErr := saveTransportInterfacesIfSnapshotCurrent(ifacesSnap, ifaces)
		if saveErr != nil {
			return TransportRuntimeObservabilityResponse{
				OK:      false,
				Code:    "TRANSPORT_INTERFACES_SAVE_FAILED",
				Message: "interfaces state save failed: " + saveErr.Error(),
			}
		}
	}

	targets := transportPolicyClientsWithVirtualTargets(clients.Items)
	compiled, planDirty := compileTransportPolicyPlanForSnapshot(policy, targets, storedPlan)
	if planDirty {
		// NOTE(review): the save error is dropped on purpose here, unlike the
		// interfaces save above. The response below is built from the freshly
		// compiled plan either way, so persistence looks best-effort; confirm
		// that this is intended.
		_ = saveTransportPlanIfSnapshotCurrent(planSnap, compiled)
	}

	return buildTransportRuntimeObservabilityResponse(now, ifaces.Items, targets, compiled)
}
// buildTransportRuntimeObservabilityResponse wraps the observability items for
// the given interfaces, clients and plan in a successful response.
//
// Egress identities are resolved through
// transportRuntimeObservabilityEgressSnapshot. GeneratedAt is now formatted as
// RFC 3339 and Count is the number of items.
func buildTransportRuntimeObservabilityResponse(
	now time.Time,
	ifaces []TransportInterface,
	clients []TransportClient,
	plan TransportPolicyCompilePlan,
) TransportRuntimeObservabilityResponse {
	items := buildTransportRuntimeObservabilityItems(ifaces, clients, plan, transportRuntimeObservabilityEgressSnapshot, now)

	var resp TransportRuntimeObservabilityResponse
	resp.OK = true
	resp.Message = "ok"
	resp.GeneratedAt = now.Format(time.RFC3339)
	resp.Count = len(items)
	resp.Items = items
	return resp
}
// buildTransportRuntimeObservabilityItems produces one observability item per
// base interface item.
//
// Clients are split into real and virtual ones. Base items are built from the
// interfaces and the real clients, and virtual interface items are appended
// when virtual clients exist. The function returns nil when there are no base
// items.
//
// A base item whose sanitized ID matches a virtual client is rendered by
// buildTransportRuntimeObservabilityVirtualItem. Every other base item is
// aggregated from the real clients attached to its normalized interface ID:
// counters and engine counts cover all members, while identity, latency and
// egress come from the selected primary client and LastError from the selected
// error client.
func buildTransportRuntimeObservabilityItems(
	ifaces []TransportInterface,
	clients []TransportClient,
	plan TransportPolicyCompilePlan,
	egressLookup transportRuntimeObservabilityEgressLookup,
	now time.Time,
) []TransportRuntimeObservabilityItem {
	realClients, virtualClients := splitTransportRuntimeObservabilityClients(clients)
	bases := buildTransportInterfaceItems(ifaces, realClients)
	if len(virtualClients) != 0 {
		bases = appendTransportVirtualInterfaceItems(bases, virtualClients)
	}
	if len(bases) == 0 {
		return nil
	}

	// Real clients grouped by normalized interface ID, each group ordered by
	// client ID.
	membersByIface := map[string][]TransportClient{}
	for _, c := range realClients {
		key := normalizeTransportIfaceID(c.IfaceID)
		membersByIface[key] = append(membersByIface[key], c)
	}
	for _, group := range membersByIface {
		// group shares its backing array with the map entry, so sorting it
		// reorders the stored members in place.
		sort.Slice(group, func(a, b int) bool { return group[a].ID < group[b].ID })
	}

	// Virtual clients keyed by sanitized ID; entries without a usable ID are
	// dropped.
	virtualsByID := map[string]TransportClient{}
	for _, v := range virtualClients {
		if key := sanitizeID(v.ID); key != "" {
			virtualsByID[key] = v
		}
	}

	// Rule counts from the compiled plan: per interface as reported by the
	// plan, per client by counting the rules that name it.
	rulesPerIface := map[string]int{}
	rulesPerClient := map[string]int{}
	for _, planned := range plan.Interfaces {
		rulesPerIface[normalizeTransportIfaceID(planned.IfaceID)] = planned.RuleCount
		for _, rule := range planned.Rules {
			if key := sanitizeID(rule.ClientID); key != "" {
				rulesPerClient[key]++
			}
		}
	}

	items := make([]TransportRuntimeObservabilityItem, 0, len(bases))
	for _, base := range bases {
		if virtual, isVirtual := virtualsByID[sanitizeID(base.ID)]; isVirtual {
			items = append(items, buildTransportRuntimeObservabilityVirtualItem(base, virtual, rulesPerClient, egressLookup, now))
			continue
		}

		ifaceKey := normalizeTransportIfaceID(base.ID)
		members := membersByIface[ifaceKey]
		counters := buildTransportRuntimeObservabilityCounters(members)
		counters.RuleCount = rulesPerIface[ifaceKey]

		configuredIface := strings.TrimSpace(base.RuntimeIface)
		item := TransportRuntimeObservabilityItem{
			IfaceID:      base.ID,
			Name:         strings.TrimSpace(base.Name),
			Mode:         base.Mode,
			RuntimeIface: configuredIface,
			ActiveIface:  configuredIface,
			NetnsName:    strings.TrimSpace(base.NetnsName),
			RoutingTable: strings.TrimSpace(base.RoutingTable),
			ClientIDs:    append([]string(nil), base.ClientIDs...),
			Status:       string(aggregateTransportRuntimeObservabilityStatus(counters)),
			Counters:     counters,
			EngineCounts: buildTransportRuntimeObservabilityEngineCounts(members),
		}

		primary, hasPrimary := selectTransportRuntimeObservabilityPrimaryClient(members)
		if hasPrimary {
			primaryIface := strings.TrimSpace(primary.Iface)
			item.ClientID = primary.ID
			// The primary client only fills fields the interface left empty,
			// except ActiveIface, which it overrides whenever it has an iface.
			if item.RuntimeIface == "" {
				item.RuntimeIface = primaryIface
			}
			if primaryIface != "" {
				item.ActiveIface = primaryIface
			}
			if item.RoutingTable == "" {
				item.RoutingTable = strings.TrimSpace(primary.RoutingTable)
			}
			if item.NetnsName == "" && transportNetnsEnabled(primary) {
				item.NetnsName = transportNetnsName(primary)
			}
			item.LatencyMS = primary.Health.LatencyMS
			item.LastCheck = strings.TrimSpace(primary.Health.LastCheck)
			item.Egress = lookupTransportRuntimeObservabilityEgress(egressLookup, primary.ID)
		}

		failing, hasFailing := selectTransportRuntimeObservabilityErrorClient(members)
		if hasFailing {
			item.LastError = transportRuntimeObservabilityClientError(failing, now)
			if item.LastCheck == "" {
				item.LastCheck = transportRuntimeObservabilityClientLastCheck(failing, now)
			}
		}

		// Last resort for LastCheck: the primary client's fallback chain.
		if hasPrimary && item.LastCheck == "" {
			item.LastCheck = transportRuntimeObservabilityClientLastCheck(primary, now)
		}

		items = append(items, item)
	}
	return items
}
// splitTransportRuntimeObservabilityClients partitions clients into real
// clients (first result, input order preserved) and policy virtual clients
// (second result, sorted by ID). It returns nil, nil for an empty input.
func splitTransportRuntimeObservabilityClients(clients []TransportClient) ([]TransportClient, []TransportClient) {
	if len(clients) == 0 {
		return nil, nil
	}

	var (
		regular = make([]TransportClient, 0, len(clients))
		virtual = make([]TransportClient, 0, 1)
	)
	for i := range clients {
		if isTransportPolicyVirtualClient(clients[i]) {
			virtual = append(virtual, clients[i])
		} else {
			regular = append(regular, clients[i])
		}
	}

	sort.Slice(virtual, func(a, b int) bool {
		return virtual[a].ID < virtual[b].ID
	})
	return regular, virtual
}
// buildTransportRuntimeObservabilityVirtualItem renders the observability item
// for a virtual client, which is treated as the only member of its interface.
//
// RuntimeIface prefers the base item's runtime iface and falls back to the
// client's iface; ActiveIface prefers the client's iface and falls back to
// RuntimeIface. RoutingTable prefers the base item's value. Status is the
// client's normalized status, or the aggregate of the counters when that is
// empty. RuleCount comes from ruleCountByClient keyed by the sanitized client
// ID.
func buildTransportRuntimeObservabilityVirtualItem(
	base TransportInterfaceItem,
	virtual TransportClient,
	ruleCountByClient map[string]int,
	egressLookup transportRuntimeObservabilityEgressLookup,
	now time.Time,
) TransportRuntimeObservabilityItem {
	solo := []TransportClient{virtual}
	counters := buildTransportRuntimeObservabilityCounters(solo)
	counters.RuleCount = ruleCountByClient[sanitizeID(virtual.ID)]

	ownIface := strings.TrimSpace(virtual.Iface)
	runtimeIface := strings.TrimSpace(base.RuntimeIface)
	if runtimeIface == "" {
		runtimeIface = ownIface
	}
	activeIface := ownIface
	if activeIface == "" {
		activeIface = runtimeIface
	}

	routingTable := strings.TrimSpace(base.RoutingTable)
	if routingTable == "" {
		routingTable = strings.TrimSpace(virtual.RoutingTable)
	}

	status := string(normalizeTransportStatus(virtual.Status))
	if status == "" {
		status = string(aggregateTransportRuntimeObservabilityStatus(counters))
	}

	return TransportRuntimeObservabilityItem{
		IfaceID:      base.ID,
		Name:         strings.TrimSpace(base.Name),
		Mode:         base.Mode,
		RuntimeIface: runtimeIface,
		ActiveIface:  activeIface,
		NetnsName:    strings.TrimSpace(base.NetnsName),
		RoutingTable: routingTable,
		ClientID:     virtual.ID,
		ClientIDs:    []string{virtual.ID},
		Status:       status,
		LatencyMS:    virtual.Health.LatencyMS,
		LastError:    transportRuntimeObservabilityClientError(virtual, now),
		LastCheck:    transportRuntimeObservabilityClientLastCheck(virtual, now),
		Egress:       lookupTransportRuntimeObservabilityEgress(egressLookup, virtual.ID),
		Counters:     counters,
		EngineCounts: buildTransportRuntimeObservabilityEngineCounts(solo),
	}
}
// buildTransportRuntimeObservabilityCounters tallies the given clients: total
// count, enabled count, and one bucket per normalized status. Any status other
// than up, starting or degraded is counted as down. RuleCount is left at zero
// for the caller to fill in.
func buildTransportRuntimeObservabilityCounters(clients []TransportClient) TransportRuntimeObservabilityCounters {
	var tally TransportRuntimeObservabilityCounters
	tally.ClientCount = len(clients)

	for i := range clients {
		c := clients[i]
		if c.Enabled {
			tally.EnabledCount++
		}
		switch status := normalizeTransportStatus(c.Status); status {
		case TransportClientUp:
			tally.UpCount++
		case TransportClientStarting:
			tally.StartingCount++
		case TransportClientDegraded:
			tally.DegradedCount++
		default:
			tally.DownCount++
		}
	}
	return tally
}
// buildTransportRuntimeObservabilityEngineCounts groups clients by their
// trimmed Kind ("unknown" when empty) and reports, per kind, the total and the
// per-status counts. Any status other than up, starting or degraded is counted
// as down. The result is sorted by kind; it is nil for an empty input.
func buildTransportRuntimeObservabilityEngineCounts(clients []TransportClient) []TransportRuntimeObservabilityEngineCounter {
	if len(clients) == 0 {
		return nil
	}

	perKind := map[string]*TransportRuntimeObservabilityEngineCounter{}
	kinds := make([]string, 0, len(clients))
	for _, c := range clients {
		kind := strings.TrimSpace(string(c.Kind))
		if kind == "" {
			kind = "unknown"
		}

		entry, seen := perKind[kind]
		if !seen {
			entry = &TransportRuntimeObservabilityEngineCounter{Kind: kind}
			perKind[kind] = entry
			kinds = append(kinds, kind)
		}

		entry.Count++
		switch normalizeTransportStatus(c.Status) {
		case TransportClientUp:
			entry.UpCount++
		case TransportClientStarting:
			entry.StartingCount++
		case TransportClientDegraded:
			entry.DegradedCount++
		default:
			entry.DownCount++
		}
	}

	// Map iteration order is random; emit the kinds in sorted order instead.
	sort.Strings(kinds)
	result := make([]TransportRuntimeObservabilityEngineCounter, 0, len(kinds))
	for _, kind := range kinds {
		result = append(result, *perKind[kind])
	}
	return result
}
// aggregateTransportRuntimeObservabilityStatus collapses per-status counters
// into a single status. Precedence is degraded, then up, then starting; with
// none of those present the result is down.
func aggregateTransportRuntimeObservabilityStatus(counters TransportRuntimeObservabilityCounters) TransportClientStatus {
	if counters.DegradedCount > 0 {
		return TransportClientDegraded
	}
	if counters.UpCount > 0 {
		return TransportClientUp
	}
	if counters.StartingCount > 0 {
		return TransportClientStarting
	}
	return TransportClientDown
}
// selectTransportRuntimeObservabilityPrimaryClient returns the client that
// preferTransportRuntimeObservabilityPrimaryClient ranks best. The boolean is
// false (with a zero client) when clients is empty.
func selectTransportRuntimeObservabilityPrimaryClient(clients []TransportClient) (TransportClient, bool) {
	if len(clients) == 0 {
		return TransportClient{}, false
	}

	winner := clients[0]
	for _, candidate := range clients[1:] {
		if preferTransportRuntimeObservabilityPrimaryClient(candidate, winner) {
			winner = candidate
		}
	}
	return winner, true
}
// preferTransportRuntimeObservabilityPrimaryClient reports whether candidate
// should replace current as the primary client. A lower primary rank wins;
// equal ranks are decided by preferTransportClient in either direction, and
// finally by the smaller ID.
func preferTransportRuntimeObservabilityPrimaryClient(candidate, current TransportClient) bool {
	if cr, br := transportRuntimeObservabilityPrimaryRank(candidate), transportRuntimeObservabilityPrimaryRank(current); cr != br {
		return cr < br
	}

	switch {
	case preferTransportClient(candidate, current):
		return true
	case preferTransportClient(current, candidate):
		return false
	}
	return candidate.ID < current.ID
}
// transportRuntimeObservabilityPrimaryRank orders clients for primary
// selection; lower is better. Up is 0, degraded 1, starting 2; any other
// status is 3 when the client is enabled and 4 when it is not.
func transportRuntimeObservabilityPrimaryRank(client TransportClient) int {
	status := normalizeTransportStatus(client.Status)
	switch {
	case status == TransportClientUp:
		return 0
	case status == TransportClientDegraded:
		return 1
	case status == TransportClientStarting:
		return 2
	case client.Enabled:
		return 3
	}
	return 4
}
// selectTransportRuntimeObservabilityErrorClient returns the client whose
// error should be surfaced for an interface. Only clients with a non-empty
// error message are considered, and among those
// preferTransportRuntimeObservabilityErrorClient picks the winner. The boolean
// is false when no client reports an error.
//
// NOTE(review): errors are probed with a zero time, so the runtime snapshot is
// evaluated against the wall clock rather than the caller's `now`; confirm
// that this is intended.
func selectTransportRuntimeObservabilityErrorClient(clients []TransportClient) (TransportClient, bool) {
	var (
		chosen TransportClient
		have   bool
	)
	for i := range clients {
		candidate := clients[i]
		if transportRuntimeObservabilityClientError(candidate, time.Time{}) == "" {
			continue
		}
		if have && !preferTransportRuntimeObservabilityErrorClient(candidate, chosen) {
			continue
		}
		chosen, have = candidate, true
	}
	return chosen, have
}
// preferTransportRuntimeObservabilityErrorClient reports whether candidate
// should replace current as the client whose error is shown. A lower error
// rank wins; equal ranks are decided by preferTransportClient in either
// direction, and finally by the smaller ID.
func preferTransportRuntimeObservabilityErrorClient(candidate, current TransportClient) bool {
	if cr, br := transportRuntimeObservabilityErrorRank(candidate), transportRuntimeObservabilityErrorRank(current); cr != br {
		return cr < br
	}

	switch {
	case preferTransportClient(candidate, current):
		return true
	case preferTransportClient(current, candidate):
		return false
	}
	return candidate.ID < current.ID
}
// transportRuntimeObservabilityErrorRank orders clients for error selection;
// lower is better. Degraded is 0, starting 1, up 2, and every other status 3.
func transportRuntimeObservabilityErrorRank(client TransportClient) int {
	rank := 3
	switch normalizeTransportStatus(client.Status) {
	case TransportClientDegraded:
		rank = 0
	case TransportClientStarting:
		rank = 1
	case TransportClientUp:
		rank = 2
	}
	return rank
}
// transportRuntimeObservabilityClientError returns the client's error message:
// the trimmed health LastError when set, otherwise the trimmed LastError
// message of the client's runtime snapshot. A zero now is replaced by the
// current UTC time before the snapshot is taken.
func transportRuntimeObservabilityClientError(client TransportClient, now time.Time) string {
	recorded := strings.TrimSpace(client.Health.LastError)
	if recorded != "" {
		return recorded
	}

	snapshot := transportRuntimeSnapshot(client, fallbackTransportRuntimeObservabilityNow(now))
	return strings.TrimSpace(snapshot.LastError.Message)
}
// transportRuntimeObservabilityClientLastCheck returns the first non-empty
// value, trimmed, among the client's health LastCheck, the runtime snapshot's
// LastError.At and the client's UpdatedAt. It returns "" when all three are
// empty. A zero now is replaced by the current UTC time before the snapshot is
// taken.
func transportRuntimeObservabilityClientLastCheck(client TransportClient, now time.Time) string {
	if checked := strings.TrimSpace(client.Health.LastCheck); checked != "" {
		return checked
	}

	snapshot := transportRuntimeSnapshot(client, fallbackTransportRuntimeObservabilityNow(now))
	if failedAt := strings.TrimSpace(snapshot.LastError.At); failedAt != "" {
		return failedAt
	}

	// Last fallback; an empty UpdatedAt yields "".
	return strings.TrimSpace(client.UpdatedAt)
}
// fallbackTransportRuntimeObservabilityNow returns now unchanged unless it is
// the zero time, in which case the current time in UTC is returned.
func fallbackTransportRuntimeObservabilityNow(now time.Time) time.Time {
	if now.IsZero() {
		return time.Now().UTC()
	}
	return now
}
// lookupTransportRuntimeObservabilityEgress calls lookup with the sanitized
// client ID. It returns a zero EgressIdentity when lookup is nil or the
// sanitized ID is empty.
func lookupTransportRuntimeObservabilityEgress(lookup transportRuntimeObservabilityEgressLookup, clientID string) EgressIdentity {
	cleanID := sanitizeID(clientID)
	if lookup != nil && cleanID != "" {
		return lookup(cleanID)
	}
	return EgressIdentity{}
}
// transportRuntimeObservabilityScopeForClientID maps a client ID to its egress
// scope: "" when the sanitized ID is empty, transportPolicyTargetAdGuardID for
// a policy virtual client ID, and "transport:<id>" otherwise.
func transportRuntimeObservabilityScopeForClientID(id string) string {
	cid := sanitizeID(id)
	switch {
	case cid == "":
		return ""
	case isTransportPolicyVirtualClientID(cid):
		return transportPolicyTargetAdGuardID
	}
	return "transport:" + cid
}
// transportRuntimeObservabilityEgressSnapshot returns the egress identity
// snapshot for a client from egressIdentitySWR.
//
// It returns a zero EgressIdentity when the sanitized ID or the derived scope
// is empty. When the snapshot lookup fails, it returns a placeholder marked
// Stale that carries the scope, the lookup error text, and a source: the
// AdGuard policy target for that scope, otherwise "transport" with the client
// ID.
func transportRuntimeObservabilityEgressSnapshot(clientID string) EgressIdentity {
	cleanID := sanitizeID(clientID)
	if cleanID == "" {
		return EgressIdentity{}
	}
	scope := transportRuntimeObservabilityScopeForClientID(cleanID)
	if scope == "" {
		return EgressIdentity{}
	}

	snapshot, err := egressIdentitySWR.getSnapshot(scope, false)
	if err == nil {
		return snapshot
	}

	stale := EgressIdentity{
		Scope:     scope,
		Source:    "transport",
		SourceID:  cleanID,
		Stale:     true,
		LastError: err.Error(),
	}
	if scope == transportPolicyTargetAdGuardID {
		stale.Source = transportPolicyTargetAdGuardID
		stale.SourceID = transportPolicyTargetAdGuardID
	}
	return stale
}