From 61ccad952a8932f5a0e489b66ddddccf176bc4b6 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 13 Feb 2026 13:40:33 +0200 Subject: [PATCH] Improved health check , plus bug fixing --- pkg/gateway/gateway.go | 9 ++ pkg/gateway/namespace_health.go | 260 ++++++++++++++++++++++++++++++++ pkg/gateway/status_handlers.go | 5 + pkg/node/dns_registration.go | 15 ++ 4 files changed, 289 insertions(+) create mode 100644 pkg/gateway/namespace_health.go diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 712f86b..9df7478 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -150,6 +150,9 @@ type Gateway struct { // Shared HTTP transport for proxy connections (connection pooling) proxyTransport *http.Transport + + // Namespace health state (local service probes + hourly reconciliation) + nsHealth *namespaceHealthState } // localSubscriber represents a WebSocket subscriber for local message delivery @@ -583,6 +586,12 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { zap.String("node_id", cfg.NodePeerID)) } + // Start namespace health monitoring loop (local probes every 30s, reconciliation every 1h) + if cfg.NodePeerID != "" && deps.SQLDB != nil { + go gw.startNamespaceHealthLoop(context.Background()) + logger.ComponentInfo(logging.ComponentGeneral, "Namespace health monitor started") + } + logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed") return gw, nil } diff --git a/pkg/gateway/namespace_health.go b/pkg/gateway/namespace_health.go new file mode 100644 index 0000000..75b508f --- /dev/null +++ b/pkg/gateway/namespace_health.go @@ -0,0 +1,260 @@ +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "sync" + "time" + + "github.com/DeBrosOfficial/network/pkg/logging" + "go.uber.org/zap" +) + +// NamespaceServiceHealth represents the health of a single namespace service. 
+type NamespaceServiceHealth struct {
+	Status  string `json:"status"`            // "ok" or "error", as set by probeTCP
+	Port    int    `json:"port"`              // TCP port that was probed
+	Latency string `json:"latency,omitempty"` // duration of the dial attempt, formatted by time.Duration.String
+	Error   string `json:"error,omitempty"`   // set only when Status is "error"
+}
+
+// NamespaceHealth represents the health of a namespace on this node.
+type NamespaceHealth struct {
+	Status   string                            `json:"status"`   // "healthy", "degraded", "unhealthy"
+	Services map[string]NamespaceServiceHealth `json:"services"` // keyed by service name: "rqlite", "olric", "gateway"
+}
+
+// namespaceHealthState holds the cached namespace health data.
+// mu guards cache; the probe loop swaps in a whole new map on every pass.
+type namespaceHealthState struct {
+	mu    sync.RWMutex
+	cache map[string]*NamespaceHealth // namespace_name → health
+}
+
+// startNamespaceHealthLoop runs two periodic tasks until ctx is cancelled:
+//  1. Every 30s: probe local namespace services and cache health state
+//  2. Every 1h: (leader-only) check for under-provisioned namespaces and trigger repair
+//
+// The call blocks for the lifetime of the loop, so it is meant to be run in
+// its own goroutine.
+//
+// NOTE(review): g.nsHealth is assigned from inside this goroutine with no
+// synchronisation, while getNamespaceHealth reads the field from request
+// handlers. Initialising it before the goroutine is started would remove that
+// race; a state installed that way is kept rather than overwritten here.
+func (g *Gateway) startNamespaceHealthLoop(ctx context.Context) {
+	const (
+		startupDelay      = 5 * time.Second
+		probeInterval     = 30 * time.Second
+		reconcileInterval = 1 * time.Hour
+	)
+
+	if g.nsHealth == nil {
+		g.nsHealth = &namespaceHealthState{
+			cache: make(map[string]*NamespaceHealth),
+		}
+	}
+
+	probeTicker := time.NewTicker(probeInterval)
+	defer probeTicker.Stop()
+	reconcileTicker := time.NewTicker(reconcileInterval)
+	defer reconcileTicker.Stop()
+
+	// Initial probe after a short delay (let services start). A timer is used
+	// instead of time.Sleep so that cancelling ctx ends the wait immediately.
+	startup := time.NewTimer(startupDelay)
+	select {
+	case <-ctx.Done():
+		startup.Stop()
+		return
+	case <-startup.C:
+	}
+	g.probeLocalNamespaces(ctx)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-probeTicker.C:
+			g.probeLocalNamespaces(ctx)
+		case <-reconcileTicker.C:
+			g.reconcileNamespaces(ctx)
+		}
+	}
+}
+
+// getNamespaceHealth returns the cached namespace health for the /v1/health response.
+func (g *Gateway) getNamespaceHealth() map[string]*NamespaceHealth {
+	if g.nsHealth == nil {
+		return nil
+	}
+	g.nsHealth.mu.RLock()
+	defer g.nsHealth.mu.RUnlock()
+
+	if len(g.nsHealth.cache) == 0 {
+		return nil
+	}
+
+	// Copy the map so callers never iterate the cache itself. The
+	// *NamespaceHealth values are shared; probeLocalNamespaces builds fresh
+	// values on every pass and does not modify them after publishing.
+	result := make(map[string]*NamespaceHealth, len(g.nsHealth.cache))
+	for k, v := range g.nsHealth.cache {
+		result[k] = v
+	}
+	return result
+}
+
+// namespaceAllocation is one namespace hosted on this node together with the
+// local ports allocated to its services.
+type namespaceAllocation struct {
+	name        string
+	rqlitePort  int
+	olricPort   int
+	gatewayPort int
+}
+
+// loadLocalNamespaceAllocations reads the port allocations of every 'ready'
+// namespace assigned to this node. Rows that fail to scan are logged and
+// skipped; an error while iterating the result set is returned so the caller
+// does not mistake a truncated read for the full list.
+func (g *Gateway) loadLocalNamespaceAllocations(ctx context.Context) ([]namespaceAllocation, error) {
+	query := `
+		SELECT nc.namespace_name, npa.rqlite_http_port, npa.olric_memberlist_port, npa.gateway_http_port
+		FROM namespace_port_allocations npa
+		JOIN namespace_clusters nc ON npa.namespace_cluster_id = nc.id
+		WHERE npa.node_id = ? AND nc.status = 'ready'
+	`
+	rows, err := g.sqlDB.QueryContext(ctx, query, g.nodePeerID)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var allocs []namespaceAllocation
+	for rows.Next() {
+		var a namespaceAllocation
+		if err := rows.Scan(&a.name, &a.rqlitePort, &a.olricPort, &a.gatewayPort); err != nil {
+			g.logger.ComponentWarn(logging.ComponentGeneral, "Failed to scan local namespace allocation row",
+				zap.Error(err))
+			continue
+		}
+		allocs = append(allocs, a)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return allocs, nil
+}
+
+// probeLocalNamespaces discovers which namespaces this node hosts and checks their services.
+//
+// The allocations are read and the result set closed before any port is
+// dialled, so the database handle is not held during the TCP timeouts. The
+// cache is replaced only after a complete pass: a failed query or a cancelled
+// ctx leaves the previous snapshot in place.
+func (g *Gateway) probeLocalNamespaces(ctx context.Context) {
+	if g.sqlDB == nil || g.nodePeerID == "" || g.nsHealth == nil {
+		return
+	}
+
+	allocs, err := g.loadLocalNamespaceAllocations(ctx)
+	if err != nil {
+		g.logger.ComponentWarn(logging.ComponentGeneral, "Failed to query local namespace allocations",
+			zap.Error(err))
+		return
+	}
+
+	// Olric memberlist binds to the WireGuard IP; fall back to loopback when
+	// no WireGuard address is configured.
+	olricHost := g.localWireGuardIP
+	if olricHost == "" {
+		olricHost = "127.0.0.1"
+	}
+
+	health := make(map[string]*NamespaceHealth, len(allocs))
+	for _, a := range allocs {
+		if ctx.Err() != nil {
+			return
+		}
+
+		nsHealth := &NamespaceHealth{
+			Services: make(map[string]NamespaceServiceHealth, 3),
+		}
+
+		// Probe RQLite (HTTP on localhost)
+		nsHealth.Services["rqlite"] = probeTCPContext(ctx, "127.0.0.1", a.rqlitePort)
+
+		// Probe Olric memberlist
+		nsHealth.Services["olric"] = probeTCPContext(ctx, olricHost, a.olricPort)
+
+		// Probe Gateway (HTTP on all interfaces)
+		nsHealth.Services["gateway"] = probeTCPContext(ctx, "127.0.0.1", a.gatewayPort)
+
+		// Aggregate status: any failing service marks the namespace unhealthy.
+		nsHealth.Status = "healthy"
+		for _, svc := range nsHealth.Services {
+			if svc.Status == "error" {
+				nsHealth.Status = "unhealthy"
+				break
+			}
+		}
+
+		health[a.name] = nsHealth
+	}
+
+	g.nsHealth.mu.Lock()
+	g.nsHealth.cache = health
+	g.nsHealth.mu.Unlock()
+}
+
+// reconcileNamespaces checks all namespaces for under-provisioning and triggers repair.
+// Only runs on the RQLite leader to avoid duplicate repairs.
+//
+// Under-provisioned namespaces are collected first and the result set is
+// closed before any repair starts, so a repair never runs while the query's
+// rows are still open.
+func (g *Gateway) reconcileNamespaces(ctx context.Context) {
+	if g.sqlDB == nil || g.nodeRecoverer == nil {
+		return
+	}
+
+	// Only the leader should run reconciliation
+	if !g.isRQLiteLeader(ctx) {
+		return
+	}
+
+	g.logger.ComponentInfo(logging.ComponentGeneral, "Running namespace reconciliation check")
+
+	// Query all ready namespaces with their expected and actual node counts
+	query := `
+		SELECT nc.namespace_name,
+			nc.rqlite_node_count + nc.olric_node_count + nc.gateway_node_count AS expected_services,
+			(SELECT COUNT(*) FROM namespace_cluster_nodes ncn
+			 WHERE ncn.namespace_cluster_id = nc.id AND ncn.status = 'running') AS actual_services
+		FROM namespace_clusters nc
+		WHERE nc.status = 'ready' AND nc.namespace_name != 'default'
+	`
+	rows, err := g.sqlDB.QueryContext(ctx, query)
+	if err != nil {
+		g.logger.ComponentWarn(logging.ComponentGeneral, "Failed to query namespaces for reconciliation",
+			zap.Error(err))
+		return
+	}
+
+	type shortfall struct {
+		name             string
+		expected, actual int
+	}
+	var pending []shortfall
+	for rows.Next() {
+		var s shortfall
+		if err := rows.Scan(&s.name, &s.expected, &s.actual); err != nil {
+			g.logger.ComponentWarn(logging.ComponentGeneral, "Failed to scan namespace reconciliation row",
+				zap.Error(err))
+			continue
+		}
+		if s.actual < s.expected {
+			pending = append(pending, s)
+		}
+	}
+	iterErr := rows.Err()
+	rows.Close()
+	if iterErr != nil {
+		// Namespaces read before the failure are still repaired below.
+		g.logger.ComponentWarn(logging.ComponentGeneral, "Failed to read all namespaces for reconciliation",
+			zap.Error(iterErr))
+	}
+
+	for _, s := range pending {
+		if ctx.Err() != nil {
+			return
+		}
+
+		g.logger.ComponentWarn(logging.ComponentGeneral, "Namespace under-provisioned, triggering repair",
+			zap.String("namespace", s.name),
+			zap.Int("expected_services", s.expected),
+			zap.Int("actual_services", s.actual),
+		)
+		if err := g.nodeRecoverer.RepairCluster(ctx, s.name); err != nil {
+			g.logger.ComponentError(logging.ComponentGeneral, "Namespace repair failed",
+				zap.String("namespace", s.name),
+				zap.Error(err),
+			)
+			continue
+		}
+		g.logger.ComponentInfo(logging.ComponentGeneral, "Namespace repair completed",
+			zap.String("namespace", s.name),
+		)
+	}
+}
+
+// isRQLiteLeader checks whether this node is the current Raft leader.
+//
+// It requests <RQLiteDSN>/status (defaulting to http://localhost:5001) and
+// reports true only when the response is 200 OK and store.raft.state equals
+// "Leader". Every failure (bad request, transport error, non-200 status,
+// undecodable body) is reported as "not leader".
+func (g *Gateway) isRQLiteLeader(ctx context.Context) bool {
+	dsn := g.cfg.RQLiteDSN
+	if dsn == "" {
+		dsn = "http://localhost:5001"
+	}
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, dsn+"/status", nil)
+	if err != nil {
+		return false
+	}
+
+	client := &http.Client{Timeout: 5 * time.Second}
+	resp, err := client.Do(req)
+	if err != nil {
+		return false
+	}
+	defer resp.Body.Close()
+
+	// An error page must not be decoded as if it were a status document.
+	if resp.StatusCode != http.StatusOK {
+		return false
+	}
+
+	var status struct {
+		Store struct {
+			Raft struct {
+				State string `json:"state"`
+			} `json:"raft"`
+		} `json:"store"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
+		return false
+	}
+
+	return status.Store.Raft.State == "Leader"
+}
+
+// probeTCP checks if a port is listening by attempting a TCP connection.
+// It is probeTCPContext with a background context, i.e. bounded only by the
+// 2s dial timeout.
+func probeTCP(host string, port int) NamespaceServiceHealth {
+	return probeTCPContext(context.Background(), host, port)
+}
+
+// probeTCPContext dials host:port over TCP, giving up after 2s or when ctx is
+// cancelled, and reports the outcome. The connection is closed immediately;
+// only reachability and dial latency are recorded.
+func probeTCPContext(ctx context.Context, host string, port int) NamespaceServiceHealth {
+	// net.JoinHostPort brackets IPv6 literals, which a plain "%s:%d" does not.
+	addr := net.JoinHostPort(host, fmt.Sprint(port))
+	dialer := net.Dialer{Timeout: 2 * time.Second}
+
+	start := time.Now()
+	conn, err := dialer.DialContext(ctx, "tcp", addr)
+	latency := time.Since(start)
+
+	if err != nil {
+		return NamespaceServiceHealth{
+			Status:  "error",
+			Port:    port,
+			Latency: latency.String(),
+			Error:   "port not reachable",
+		}
+	}
+	_ = conn.Close() // probe only; a close error carries no health information
+
+	return NamespaceServiceHealth{
+		Status:  "ok",
+		Port:    port,
+		Latency: latency.String(),
+	}
+}
diff --git a/pkg/gateway/status_handlers.go b/pkg/gateway/status_handlers.go
index c869b38..dc0eced 100644
--- a/pkg/gateway/status_handlers.go
+++ b/pkg/gateway/status_handlers.go
@@ -176,6 +176,11 @@ func (g *Gateway) healthHandler(w http.ResponseWriter, r *http.Request) {
 		"checks": checks,
 	}
 
+	// Include namespace health if available (populated by namespace health loop)
+	if nsHealth := g.getNamespaceHealth(); nsHealth != nil {
+		resp["namespaces"] = nsHealth
+	}
+
 	// Cache
 	g.healthCacheMu.Lock()
 	g.healthCache = &cachedHealthResult{
diff --git a/pkg/node/dns_registration.go b/pkg/node/dns_registration.go
index d4cb76c..3779d36 100644
--- 
a/pkg/node/dns_registration.go +++ b/pkg/node/dns_registration.go @@ -379,6 +379,21 @@ func (n *Node) cleanupStaleNodeRecords(ctx context.Context) { zap.String("node_id", nodeID), zap.String("ip", ip), ) + + // Check if the dead node hosted any namespace services + var nsCount int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(DISTINCT nc.namespace_name) FROM namespace_cluster_nodes ncn + JOIN namespace_clusters nc ON ncn.namespace_cluster_id = nc.id + WHERE ncn.node_id = ? AND ncn.status = 'running'`, nodeID, + ).Scan(&nsCount); err == nil && nsCount > 0 { + n.logger.ComponentWarn(logging.ComponentNode, + "Dead node hosted namespace services — reconciliation loop will repair", + zap.String("node_id", nodeID), + zap.String("ip", ip), + zap.Int("affected_namespaces", nsCount), + ) + } } }