From 359fb5ae040c664c7a91af5878e42bcae67f8a1f Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Tue, 10 Feb 2026 16:40:01 +0200 Subject: [PATCH] Updated health check --- Makefile | 2 +- pkg/client/client.go | 30 ++++-- pkg/client/client_test.go | 15 ++- pkg/gateway/gateway.go | 19 +++- pkg/gateway/middleware.go | 38 +++++--- pkg/gateway/peer_discovery.go | 31 +++--- pkg/gateway/status_handlers.go | 170 +++++++++++++++++++++++++++------ 7 files changed, 241 insertions(+), 64 deletions(-) diff --git a/Makefile b/Makefile index 3d193e4..d57ff9b 100644 --- a/Makefile +++ b/Makefile @@ -86,7 +86,7 @@ test-e2e-quick: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill -VERSION := 0.101.4 +VERSION := 0.101.5 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/client/client.go b/pkg/client/client.go index d152962..6b75f80 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -296,26 +296,40 @@ func (c *Client) Health() (*HealthStatus, error) { c.mu.RLock() defer c.mu.RUnlock() + start := time.Now() status := "healthy" - if !c.connected { + checks := make(map[string]string) + + // Connection (real) + if c.connected { + checks["connection"] = "ok" + } else { + checks["connection"] = "disconnected" status = "unhealthy" } - checks := map[string]string{ - "connection": "ok", - "database": "ok", - "pubsub": "ok", + // LibP2P peers (real) + if c.host != nil { + checks["peers"] = fmt.Sprintf("%d", len(c.host.Network().Peers())) + } else { + checks["peers"] = "0" } - if !c.connected { - checks["connection"] = "disconnected" + // PubSub (real — check if adapter was initialized) + if c.pubsub != nil && c.pubsub.adapter != nil { + checks["pubsub"] = "ok" + } else { + checks["pubsub"] = "unavailable" + if status == "healthy" { + status = "degraded" + } } return &HealthStatus{ Status: status, Checks: checks, LastUpdated: time.Now(), - ResponseTime: time.Millisecond * 10, // Simulated + ResponseTime: time.Since(start), }, nil } diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index c990c7e..2599d8c 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -174,7 +174,7 @@ func TestHealth(t *testing.T) { cfg := &ClientConfig{AppName: "app"} c := &Client{config: cfg} - // default disconnected + // default disconnected → unhealthy h, err := c.Health() if err != nil { t.Fatalf("unexpected error: %v", err) @@ -183,10 +183,17 @@ func TestHealth(t *testing.T) { t.Fatalf("expected unhealthy when not connected, got %q", h.Status) } - // mark connected + // connected but no pubsub → degraded (pubsub not initialized) c.connected = true h2, _ := c.Health() - if h2.Status != "healthy" { - t.Fatalf("expected healthy when connected, got %q", h2.Status) + if h2.Status != "degraded" { + t.Fatalf("expected degraded when connected without pubsub, got %q", h2.Status) + } + + // connected with pubsub → healthy + c.pubsub = &pubSubBridge{client: c, adapter: &pubsub.ClientAdapter{}} + h3, _ := c.Health() + if h3.Status != "healthy" { + t.Fatalf("expected healthy when fully connected, got %q", h3.Status) } } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 8b59c4d..9ded1aa 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -46,8 +46,9 @@ type Gateway struct { logger *logging.ColoredLogger cfg *Config client client.NetworkClient - nodePeerID string // The node's actual peer ID from its identity file (overrides client's peer ID) - startedAt time.Time + nodePeerID string // The node's actual peer ID from its identity file (overrides client's peer ID) + localWireGuardIP string // WireGuard IP of this node, used to prefer local namespace gateways + startedAt time.Time // rqlite SQL connection and HTTP ORM gateway sqlDB *sql.DB @@ -62,6 +63,10 @@ type Gateway struct { olricMu sync.RWMutex cacheHandlers *cache.CacheHandlers + // Health check result cache (5s TTL) + healthCacheMu sync.RWMutex + healthCache *cachedHealthResult + // IPFS storage client ipfsClient ipfs.IPFSClient storageHandlers *storage.Handlers @@ -251,6 +256,16 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { presenceMembers: make(map[string][]PresenceMember), } + // Resolve local WireGuard IP for local namespace gateway preference + if wgIP, err := GetWireGuardIP(); err == nil { + gw.localWireGuardIP = wgIP + logger.ComponentInfo(logging.ComponentGeneral, "Detected local WireGuard IP for gateway routing", + zap.String("wireguard_ip", wgIP)) + } else { + logger.ComponentWarn(logging.ComponentGeneral, "Could not detect WireGuard IP, local gateway preference disabled", + zap.Error(err)) + } + // Create separate auth client for global RQLite if GlobalRQLiteDSN is provided // This allows namespace gateways to validate API keys against the global database if cfg.GlobalRQLiteDSN != "" && cfg.GlobalRQLiteDSN != cfg.RQLiteDSN { diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index c8d8423..73791ae 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -913,18 +913,34 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R return targets[i].ip < targets[j].ip }) - affinityKey := namespaceName + "|" + validatedNamespace - if apiKey := extractAPIKey(r); apiKey != "" { - affinityKey = namespaceName + "|" + apiKey - } else if authz := strings.TrimSpace(r.Header.Get("Authorization")); authz != "" { - affinityKey = namespaceName + "|" + authz - } else { - affinityKey = namespaceName + "|" + getClientIP(r) + // Prefer local gateway if this node is part of the namespace cluster. + // This avoids a WireGuard network hop and eliminates single-point-of-failure + // when a remote gateway node is down. + var selected namespaceGatewayTarget + if g.localWireGuardIP != "" { + for _, t := range targets { + if t.ip == g.localWireGuardIP { + selected = t + break + } + } + } + + // Fall back to consistent hashing for nodes not in the namespace cluster + if selected.ip == "" { + affinityKey := namespaceName + "|" + validatedNamespace + if apiKey := extractAPIKey(r); apiKey != "" { + affinityKey = namespaceName + "|" + apiKey + } else if authz := strings.TrimSpace(r.Header.Get("Authorization")); authz != "" { + affinityKey = namespaceName + "|" + authz + } else { + affinityKey = namespaceName + "|" + getClientIP(r) + } + hasher := fnv.New32a() + _, _ = hasher.Write([]byte(affinityKey)) + targetIdx := int(hasher.Sum32()) % len(targets) + selected = targets[targetIdx] } - hasher := fnv.New32a() - _, _ = hasher.Write([]byte(affinityKey)) - targetIdx := int(hasher.Sum32()) % len(targets) - selected := targets[targetIdx] gatewayIP := selected.ip gatewayPort := selected.port targetHost := gatewayIP + ":" + strconv.Itoa(gatewayPort) diff --git a/pkg/gateway/peer_discovery.go b/pkg/gateway/peer_discovery.go index e6f82d9..404bb3b 100644 --- a/pkg/gateway/peer_discovery.go +++ b/pkg/gateway/peer_discovery.go @@ -336,16 +336,15 @@ func (pd *PeerDiscovery) updateHeartbeat(ctx context.Context) error { return nil } -// getWireGuardIP extracts the WireGuard IP from the WireGuard interface -func (pd *PeerDiscovery) getWireGuardIP() (string, error) { +// GetWireGuardIP detects the local WireGuard IP address using the wg0 network +// interface or the WireGuard config file. It does not require a PeerDiscovery +// instance and can be called from anywhere in the gateway package. +func GetWireGuardIP() (string, error) { // Method 1: Use 'ip addr show wg0' command (works without root) - ip, err := pd.getWireGuardIPFromInterface() + ip, err := getWireGuardIPFromCommand() if err == nil { - pd.logger.Info("Found WireGuard IP from network interface", - zap.String("ip", ip)) return ip, nil } - pd.logger.Debug("Failed to get WireGuard IP from interface", zap.Error(err)) // Method 2: Try to read from WireGuard config file (requires root, may fail) configPath := "/etc/wireguard/wg0.conf" @@ -363,14 +362,24 @@ func (pd *PeerDiscovery) getWireGuardIP() (string, error) { // Remove /24 suffix ip := strings.Split(addrWithCIDR, "/")[0] ip = strings.TrimSpace(ip) - pd.logger.Info("Found WireGuard IP from config", - zap.String("ip", ip)) return ip, nil } } } } - pd.logger.Debug("Failed to read WireGuard config", zap.Error(err)) + + return "", fmt.Errorf("could not determine WireGuard IP") +} + +// getWireGuardIP extracts the WireGuard IP from the WireGuard interface +func (pd *PeerDiscovery) getWireGuardIP() (string, error) { + // Try the standalone methods first (interface + config file) + ip, err := GetWireGuardIP() + if err == nil { + pd.logger.Info("Found WireGuard IP", zap.String("ip", ip)) + return ip, nil + } + pd.logger.Debug("Failed to get WireGuard IP from interface/config", zap.Error(err)) // Method 3: Fallback - Try to get from libp2p host addresses for _, addr := range pd.host.Addrs() { @@ -400,8 +409,8 @@ func (pd *PeerDiscovery) getWireGuardIP() (string, error) { return "", fmt.Errorf("could not determine WireGuard IP") } -// getWireGuardIPFromInterface gets the WireGuard IP using 'ip addr show wg0' -func (pd *PeerDiscovery) getWireGuardIPFromInterface() (string, error) { +// getWireGuardIPFromCommand gets the WireGuard IP using 'ip addr show wg0' +func getWireGuardIPFromCommand() (string, error) { cmd := exec.Command("ip", "addr", "show", "wg0") output, err := cmd.Output() if err != nil { diff --git a/pkg/gateway/status_handlers.go b/pkg/gateway/status_handlers.go index d77779d..f415260 100644 --- a/pkg/gateway/status_handlers.go +++ b/pkg/gateway/status_handlers.go @@ -1,14 +1,12 @@ package gateway import ( - "encoding/json" + "context" "net/http" "strings" "time" "github.com/DeBrosOfficial/network/pkg/client" - "github.com/DeBrosOfficial/network/pkg/logging" - "go.uber.org/zap" ) // Build info (set via -ldflags at build time; defaults for dev) @@ -18,41 +16,159 @@ var ( BuildTime = "" ) -// healthResponse is the JSON structure used by healthHandler -type healthResponse struct { - Status string `json:"status"` - StartedAt time.Time `json:"started_at"` - Uptime string `json:"uptime"` +// checkResult holds the result of a single subsystem health check. +type checkResult struct { + Status string `json:"status"` // "ok", "error", "unavailable" + Latency string `json:"latency,omitempty"` // e.g. "1.2ms" + Error string `json:"error,omitempty"` // set when Status == "error" + Peers int `json:"peers,omitempty"` // libp2p peer count } +// cachedHealthResult caches the aggregate health response for 5 seconds. +type cachedHealthResult struct { + response any + httpStatus int + cachedAt time.Time +} + +const healthCacheTTL = 5 * time.Second + func (g *Gateway) healthHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - server := healthResponse{ - Status: "ok", - StartedAt: g.startedAt, - Uptime: time.Since(g.startedAt).String(), + // Serve from cache if fresh + g.healthCacheMu.RLock() + cached := g.healthCache + g.healthCacheMu.RUnlock() + if cached != nil && time.Since(cached.cachedAt) < healthCacheTTL { + writeJSON(w, cached.httpStatus, cached.response) + return } - var clientHealth *client.HealthStatus - if g.client != nil { - if h, err := g.client.Health(); err == nil { - clientHealth = h + // Run all checks in parallel with a shared 5s timeout + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + type namedResult struct { + name string + result checkResult + } + ch := make(chan namedResult, 4) + + // RQLite + go func() { + nr := namedResult{name: "rqlite"} + if g.sqlDB == nil { + nr.result = checkResult{Status: "unavailable"} } else { - g.logger.ComponentWarn(logging.ComponentClient, "failed to fetch client health", zap.Error(err)) + start := time.Now() + if err := g.sqlDB.PingContext(ctx); err != nil { + nr.result = checkResult{Status: "error", Latency: time.Since(start).String(), Error: err.Error()} + } else { + nr.result = checkResult{Status: "ok", Latency: time.Since(start).String()} + } + } + ch <- nr + }() + + // Olric (thread-safe: can be nil or reconnected in background) + go func() { + nr := namedResult{name: "olric"} + g.olricMu.RLock() + oc := g.olricClient + g.olricMu.RUnlock() + if oc == nil { + nr.result = checkResult{Status: "unavailable"} + } else { + start := time.Now() + if err := oc.Health(ctx); err != nil { + nr.result = checkResult{Status: "error", Latency: time.Since(start).String(), Error: err.Error()} + } else { + nr.result = checkResult{Status: "ok", Latency: time.Since(start).String()} + } + } + ch <- nr + }() + + // IPFS + go func() { + nr := namedResult{name: "ipfs"} + if g.ipfsClient == nil { + nr.result = checkResult{Status: "unavailable"} + } else { + start := time.Now() + if err := g.ipfsClient.Health(ctx); err != nil { + nr.result = checkResult{Status: "error", Latency: time.Since(start).String(), Error: err.Error()} + } else { + nr.result = checkResult{Status: "ok", Latency: time.Since(start).String()} + } + } + ch <- nr + }() + + // LibP2P + go func() { + nr := namedResult{name: "libp2p"} + if g.client == nil { + nr.result = checkResult{Status: "unavailable"} + } else if h := g.client.Host(); h == nil { + nr.result = checkResult{Status: "unavailable"} + } else { + peers := len(h.Network().Peers()) + nr.result = checkResult{Status: "ok", Peers: peers} + } + ch <- nr + }() + + // Collect + checks := make(map[string]checkResult, 4) + for i := 0; i < 4; i++ { + nr := <-ch + checks[nr.name] = nr.result + } + + // Aggregate status. + // Critical: rqlite down → "unhealthy" + // Non-critical (olric, ipfs, libp2p) error → "degraded" + // "unavailable" means the client was never configured — not an error. + overallStatus := "healthy" + if c := checks["rqlite"]; c.Status == "error" { + overallStatus = "unhealthy" + } + if overallStatus == "healthy" { + for name, c := range checks { + if name == "rqlite" { + continue + } + if c.Status == "error" { + overallStatus = "degraded" + break + } } } - resp := struct { - Status string `json:"status"` - Server healthResponse `json:"server"` - Client *client.HealthStatus `json:"client"` - }{ - Status: "ok", - Server: server, - Client: clientHealth, + httpStatus := http.StatusOK + if overallStatus != "healthy" { + httpStatus = http.StatusServiceUnavailable } - _ = json.NewEncoder(w).Encode(resp) + resp := map[string]any{ + "status": overallStatus, + "server": map[string]any{ + "started_at": g.startedAt, + "uptime": time.Since(g.startedAt).String(), + }, + "checks": checks, + } + + // Cache + g.healthCacheMu.Lock() + g.healthCache = &cachedHealthResult{ + response: resp, + httpStatus: httpStatus, + cachedAt: time.Now(), + } + g.healthCacheMu.Unlock() + + writeJSON(w, httpStatus, resp) } // statusHandler aggregates server uptime and network status