From 156de7eb193f9f9229fc063bfd46bcc332cfa049 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Tue, 3 Feb 2026 13:59:03 +0200 Subject: [PATCH] Bug fixing --- pkg/cli/production/upgrade/orchestrator.go | 240 +++++++++++++++++- pkg/environments/production/orchestrator.go | 69 ++--- pkg/gateway/instance_spawner.go | 13 + pkg/gateway/middleware.go | 264 +++++++++++++++++++- pkg/namespace/cluster_manager.go | 187 ++++++++------ pkg/rqlite/cluster.go | 17 +- 6 files changed, 667 insertions(+), 123 deletions(-) diff --git a/pkg/cli/production/upgrade/orchestrator.go b/pkg/cli/production/upgrade/orchestrator.go index b700a22..b3bfea1 100644 --- a/pkg/cli/production/upgrade/orchestrator.go +++ b/pkg/cli/production/upgrade/orchestrator.go @@ -1,8 +1,11 @@ package upgrade import ( + "encoding/json" "fmt" + "io" "net" + "net/http" "os" "os/exec" "path/filepath" @@ -206,7 +209,128 @@ func (o *Orchestrator) handleBranchPreferences() error { return nil } +// ClusterState represents the saved state of the RQLite cluster before shutdown +type ClusterState struct { + Nodes []ClusterNode `json:"nodes"` + CapturedAt time.Time `json:"captured_at"` +} + +// ClusterNode represents a node in the cluster +type ClusterNode struct { + ID string `json:"id"` + Address string `json:"address"` + Voter bool `json:"voter"` + Reachable bool `json:"reachable"` +} + +// captureClusterState saves the current RQLite cluster state before stopping services +// This allows nodes to recover cluster membership faster after restart +func (o *Orchestrator) captureClusterState() error { + fmt.Printf("\n📸 Capturing cluster state before shutdown...\n") + + // Query RQLite /nodes endpoint to get current cluster membership + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Get("http://localhost:5001/nodes?timeout=3s") + if err != nil { + fmt.Printf(" âš ī¸ Could not query cluster state: %v\n", err) + return nil // Non-fatal - continue with upgrade + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + fmt.Printf(" âš ī¸ RQLite returned status %d\n", resp.StatusCode) + return nil + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Printf(" âš ī¸ Could not read cluster state: %v\n", err) + return nil + } + + // Parse the nodes response + var nodes map[string]struct { + Addr string `json:"addr"` + Voter bool `json:"voter"` + Reachable bool `json:"reachable"` + } + if err := json.Unmarshal(body, &nodes); err != nil { + fmt.Printf(" âš ī¸ Could not parse cluster state: %v\n", err) + return nil + } + + // Build cluster state + state := ClusterState{ + Nodes: make([]ClusterNode, 0, len(nodes)), + CapturedAt: time.Now(), + } + + for id, node := range nodes { + state.Nodes = append(state.Nodes, ClusterNode{ + ID: id, + Address: node.Addr, + Voter: node.Voter, + Reachable: node.Reachable, + }) + fmt.Printf(" Found node: %s (voter=%v, reachable=%v)\n", id, node.Voter, node.Reachable) + } + + // Save to file + stateFile := filepath.Join(o.oramaDir, "cluster-state.json") + data, err := json.MarshalIndent(state, "", " ") + if err != nil { + fmt.Printf(" âš ī¸ Could not marshal cluster state: %v\n", err) + return nil + } + + if err := os.WriteFile(stateFile, data, 0644); err != nil { + fmt.Printf(" âš ī¸ Could not save cluster state: %v\n", err) + return nil + } + + fmt.Printf(" ✓ Cluster state saved (%d nodes) to %s\n", len(state.Nodes), stateFile) + + // Also write peers.json directly for RQLite recovery + if err := o.writePeersJSONFromState(state); err != nil { + fmt.Printf(" âš ī¸ Could not write peers.json: %v\n", err) + } else { + fmt.Printf(" ✓ peers.json written for cluster recovery\n") + } + + return nil +} + +// writePeersJSONFromState writes RQLite's peers.json file from captured cluster state +func (o *Orchestrator) writePeersJSONFromState(state ClusterState) error { + // Build peers.json format + peers := make([]map[string]interface{}, 0, len(state.Nodes)) + for _, node := range state.Nodes { + peers = append(peers, map[string]interface{}{ + "id": node.ID, + "address": node.ID, // RQLite uses raft address as both id and address + "non_voter": !node.Voter, + }) + } + + data, err := json.MarshalIndent(peers, "", " ") + if err != nil { + return err + } + + // Write to RQLite's raft directory + raftDir := filepath.Join(o.oramaHome, ".orama", "data", "rqlite", "raft") + if err := os.MkdirAll(raftDir, 0755); err != nil { + return err + } + + peersFile := filepath.Join(raftDir, "peers.json") + return os.WriteFile(peersFile, data, 0644) +} + func (o *Orchestrator) stopServices() error { + // Capture cluster state BEFORE stopping services + _ = o.captureClusterState() + fmt.Printf("\nâšī¸ Stopping all services before upgrade...\n") serviceController := production.NewSystemdController() // Stop services in reverse dependency order @@ -395,13 +519,14 @@ func (o *Orchestrator) regenerateConfigs() error { } func (o *Orchestrator) restartServices() error { - fmt.Printf(" Restarting services...\n") + fmt.Printf("\n🔄 Restarting services with rolling restart...\n") + // Reload systemd daemon if err := exec.Command("systemctl", "daemon-reload").Run(); err != nil { fmt.Fprintf(os.Stderr, " âš ī¸ Warning: Failed to reload systemd daemon: %v\n", err) } - // Restart services to apply changes - use getProductionServices to only restart existing services + // Get services to restart services := utils.GetProductionServices() // If this is a nameserver, also restart CoreDNS and Caddy @@ -417,23 +542,71 @@ func (o *Orchestrator) restartServices() error { if len(services) == 0 { fmt.Printf(" âš ī¸ No services found to restart\n") - } else { + return nil + } + + // Define the order for rolling restart - node service first (contains RQLite) + // This ensures the cluster can reform before other services start + priorityOrder := []string{ + "debros-node", // Start node first - contains RQLite cluster + "debros-olric", // Distributed cache + "debros-ipfs", // IPFS daemon + "debros-ipfs-cluster", // IPFS cluster + "debros-gateway", // Gateway (legacy) + "coredns", // DNS server + "caddy", // Reverse proxy + } + + // Restart services in priority order with health checks + for _, priority := range priorityOrder { for _, svc := range services { + if svc == priority { + fmt.Printf(" Starting %s...\n", svc) + if err := exec.Command("systemctl", "restart", svc).Run(); err != nil { + fmt.Printf(" âš ī¸ Failed to restart %s: %v\n", svc, err) + continue + } + fmt.Printf(" ✓ Started %s\n", svc) + + // For the node service, wait for RQLite cluster health + if svc == "debros-node" { + fmt.Printf(" Waiting for RQLite cluster to become healthy...\n") + if err := o.waitForClusterHealth(2 * time.Minute); err != nil { + fmt.Printf(" âš ī¸ Cluster health check warning: %v\n", err) + fmt.Printf(" Continuing with restart (cluster may recover)...\n") + } else { + fmt.Printf(" ✓ RQLite cluster is healthy\n") + } + } + break + } + } + } + + // Start any remaining services not in priority list + for _, svc := range services { + found := false + for _, priority := range priorityOrder { + if svc == priority { + found = true + break + } + } + if !found { + fmt.Printf(" Starting %s...\n", svc) if err := exec.Command("systemctl", "restart", svc).Run(); err != nil { fmt.Printf(" âš ī¸ Failed to restart %s: %v\n", svc, err) } else { - fmt.Printf(" ✓ Restarted %s\n", svc) + fmt.Printf(" ✓ Started %s\n", svc) } } - fmt.Printf(" ✓ All services restarted\n") } + fmt.Printf(" ✓ All services restarted\n") + // Seed DNS records after services are running (RQLite must be up) if o.setup.IsNameserver() { fmt.Printf(" Seeding DNS records...\n") - // Wait for RQLite to fully start - it takes about 10 seconds to initialize - fmt.Printf(" Waiting for RQLite to start (10s)...\n") - time.Sleep(10 * time.Second) _, _, baseDomain := o.extractGatewayConfig() peers := o.extractPeers() @@ -448,3 +621,54 @@ func (o *Orchestrator) restartServices() error { return nil } + +// waitForClusterHealth waits for the RQLite cluster to become healthy +func (o *Orchestrator) waitForClusterHealth(timeout time.Duration) error { + client := &http.Client{Timeout: 5 * time.Second} + deadline := time.Now().Add(timeout) + + for time.Now().Before(deadline) { + // Query RQLite status + resp, err := client.Get("http://localhost:5001/status") + if err != nil { + time.Sleep(2 * time.Second) + continue + } + + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + time.Sleep(2 * time.Second) + continue + } + + // Parse status response + var status struct { + Store struct { + Raft struct { + State string `json:"state"` + NumPeers int `json:"num_peers"` + } `json:"raft"` + } `json:"store"` + } + + if err := json.Unmarshal(body, &status); err != nil { + time.Sleep(2 * time.Second) + continue + } + + raftState := status.Store.Raft.State + numPeers := status.Store.Raft.NumPeers + + // Cluster is healthy if we're a Leader or Follower (not Candidate) + if raftState == "Leader" || raftState == "Follower" { + fmt.Printf(" RQLite state: %s (peers: %d)\n", raftState, numPeers) + return nil + } + + fmt.Printf(" RQLite state: %s (waiting for Leader/Follower)...\n", raftState) + time.Sleep(3 * time.Second) + } + + return fmt.Errorf("timeout waiting for cluster to become healthy") +} diff --git a/pkg/environments/production/orchestrator.go b/pkg/environments/production/orchestrator.go index 60e6850..3b92c61 100644 --- a/pkg/environments/production/orchestrator.go +++ b/pkg/environments/production/orchestrator.go @@ -356,16 +356,16 @@ func (ps *ProductionSetup) Phase2bInstallBinaries() error { return fmt.Errorf("failed to install DeBros binaries: %w", err) } - // Install CoreDNS and Caddy only if this is a nameserver node + // Install CoreDNS only for nameserver nodes if ps.isNameserver { if err := ps.binaryInstaller.InstallCoreDNS(); err != nil { ps.logf(" âš ī¸ CoreDNS install warning: %v", err) } - if err := ps.binaryInstaller.InstallCaddy(); err != nil { - ps.logf(" âš ī¸ Caddy install warning: %v", err) - } - } else { - ps.logf(" â„šī¸ Skipping CoreDNS/Caddy (not a nameserver node)") + } + + // Install Caddy on ALL nodes (any node may host namespaces and need TLS) + if err := ps.binaryInstaller.InstallCaddy(); err != nil { + ps.logf(" âš ī¸ Caddy install warning: %v", err) } } @@ -687,9 +687,8 @@ func (ps *ProductionSetup) Phase5CreateSystemdServices(enableHTTPS bool) error { ps.logf(" ✓ Anyone Relay service created (operator mode, ORPort: %d)", ps.anyoneRelayConfig.ORPort) } - // CoreDNS and Caddy services (only for nameserver nodes) + // CoreDNS service (only for nameserver nodes) if ps.isNameserver { - // CoreDNS service (for dynamic DNS with RQLite) if _, err := os.Stat("/usr/local/bin/coredns"); err == nil { corednsUnit := ps.serviceGenerator.GenerateCoreDNSService() if err := ps.serviceController.WriteServiceUnit("coredns.service", corednsUnit); err != nil { @@ -698,22 +697,22 @@ func (ps *ProductionSetup) Phase5CreateSystemdServices(enableHTTPS bool) error { ps.logf(" ✓ CoreDNS service created") } } + } - // Caddy service (for SSL/TLS with DNS-01 ACME challenges) - if _, err := os.Stat("/usr/bin/caddy"); err == nil { - // Create caddy user if it doesn't exist - exec.Command("useradd", "-r", "-m", "-d", "/home/caddy", "-s", "/sbin/nologin", "caddy").Run() - exec.Command("mkdir", "-p", "/var/lib/caddy").Run() - exec.Command("chown", "caddy:caddy", "/var/lib/caddy").Run() - exec.Command("mkdir", "-p", "/home/caddy").Run() - exec.Command("chown", "caddy:caddy", "/home/caddy").Run() + // Caddy service on ALL nodes (any node may host namespaces and need TLS) + if _, err := os.Stat("/usr/bin/caddy"); err == nil { + // Create caddy user if it doesn't exist + exec.Command("useradd", "-r", "-m", "-d", "/home/caddy", "-s", "/sbin/nologin", "caddy").Run() + exec.Command("mkdir", "-p", "/var/lib/caddy").Run() + exec.Command("chown", "caddy:caddy", "/var/lib/caddy").Run() + exec.Command("mkdir", "-p", "/home/caddy").Run() + exec.Command("chown", "caddy:caddy", "/home/caddy").Run() - caddyUnit := ps.serviceGenerator.GenerateCaddyService() - if err := ps.serviceController.WriteServiceUnit("caddy.service", caddyUnit); err != nil { - ps.logf(" âš ī¸ Failed to write Caddy service: %v", err) - } else { - ps.logf(" ✓ Caddy service created") - } + caddyUnit := ps.serviceGenerator.GenerateCaddyService() + if err := ps.serviceController.WriteServiceUnit("caddy.service", caddyUnit); err != nil { + ps.logf(" âš ī¸ Failed to write Caddy service: %v", err) + } else { + ps.logf(" ✓ Caddy service created") } } @@ -733,14 +732,15 @@ func (ps *ProductionSetup) Phase5CreateSystemdServices(enableHTTPS bool) error { services = append(services, "debros-anyone-relay.service") } - // Add CoreDNS and Caddy only for nameserver nodes + // Add CoreDNS only for nameserver nodes if ps.isNameserver { if _, err := os.Stat("/usr/local/bin/coredns"); err == nil { services = append(services, "coredns.service") } - if _, err := os.Stat("/usr/bin/caddy"); err == nil { - services = append(services, "caddy.service") - } + } + // Add Caddy on ALL nodes (any node may host namespaces and need TLS) + if _, err := os.Stat("/usr/bin/caddy"); err == nil { + services = append(services, "caddy.service") } for _, svc := range services { if err := ps.serviceController.EnableService(svc); err != nil { @@ -796,8 +796,7 @@ func (ps *ProductionSetup) Phase5CreateSystemdServices(enableHTTPS bool) error { ps.logf(" - debros-node.service started (with embedded gateway)") } - // Start CoreDNS and Caddy (nameserver nodes only) - // Caddy depends on debros-node.service (gateway on :6001), so start after node + // Start CoreDNS (nameserver nodes only) if ps.isNameserver { if _, err := os.Stat("/usr/local/bin/coredns"); err == nil { if err := ps.serviceController.RestartService("coredns.service"); err != nil { @@ -806,12 +805,14 @@ func (ps *ProductionSetup) Phase5CreateSystemdServices(enableHTTPS bool) error { ps.logf(" - coredns.service started") } } - if _, err := os.Stat("/usr/bin/caddy"); err == nil { - if err := ps.serviceController.RestartService("caddy.service"); err != nil { - ps.logf(" âš ī¸ Failed to start caddy.service: %v", err) - } else { - ps.logf(" - caddy.service started") - } + } + // Start Caddy on ALL nodes (any node may host namespaces and need TLS) + // Caddy depends on debros-node.service (gateway on :6001), so start after node + if _, err := os.Stat("/usr/bin/caddy"); err == nil { + if err := ps.serviceController.RestartService("caddy.service"); err != nil { + ps.logf(" âš ī¸ Failed to start caddy.service: %v", err) + } else { + ps.logf(" - caddy.service started") } } diff --git a/pkg/gateway/instance_spawner.go b/pkg/gateway/instance_spawner.go index 1dce319..f208978 100644 --- a/pkg/gateway/instance_spawner.go +++ b/pkg/gateway/instance_spawner.go @@ -80,6 +80,11 @@ type InstanceConfig struct { OlricServers []string // Olric server addresses NodePeerID string // Physical node's peer ID for home node management DataDir string // Data directory for deployments, SQLite, etc. + // IPFS configuration for storage endpoints + IPFSClusterAPIURL string // IPFS Cluster API URL (e.g., "http://localhost:9094") + IPFSAPIURL string // IPFS API URL (e.g., "http://localhost:5001") + IPFSTimeout time.Duration // Timeout for IPFS operations + IPFSReplicationFactor int // IPFS replication factor } // GatewayYAMLConfig represents the gateway YAML configuration structure @@ -275,6 +280,14 @@ func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig, OlricServers: cfg.OlricServers, // Note: DomainName is used for HTTPS/TLS, not needed for namespace gateways in dev mode DomainName: cfg.BaseDomain, + // IPFS configuration for storage endpoints + IPFSClusterAPIURL: cfg.IPFSClusterAPIURL, + IPFSAPIURL: cfg.IPFSAPIURL, + IPFSReplicationFactor: cfg.IPFSReplicationFactor, + } + // Set IPFS timeout if provided + if cfg.IPFSTimeout > 0 { + gatewayCfg.IPFSTimeout = cfg.IPFSTimeout.String() } data, err := yaml.Marshal(gatewayCfg) diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index c256c61..8da98d9 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -19,6 +19,150 @@ import ( // Note: context keys (ctxKeyAPIKey, ctxKeyJWT, CtxKeyNamespaceOverride) are now defined in context.go +// Internal auth headers for trusted inter-gateway communication. +// When the main gateway proxies to a namespace gateway, it validates auth first +// and passes the validated namespace via these headers. The namespace gateway +// trusts these headers when they come from internal IPs (WireGuard 10.0.0.x). +const ( + // HeaderInternalAuthNamespace contains the validated namespace name + HeaderInternalAuthNamespace = "X-Internal-Auth-Namespace" + // HeaderInternalAuthValidated indicates the request was pre-authenticated by main gateway + HeaderInternalAuthValidated = "X-Internal-Auth-Validated" +) + +// validateAuthForNamespaceProxy validates the request's auth credentials against the MAIN +// cluster RQLite and returns the namespace the credentials belong to. +// This is used by handleNamespaceGatewayRequest to pre-authenticate before proxying to +// namespace gateways (which have isolated RQLites without API keys). +// +// Returns: +// - (namespace, "") if auth is valid +// - ("", errorMessage) if auth is invalid +// - ("", "") if no auth credentials provided (for public paths) +func (g *Gateway) validateAuthForNamespaceProxy(r *http.Request) (namespace string, errMsg string) { + // 1) Try JWT Bearer first + if auth := r.Header.Get("Authorization"); auth != "" { + lower := strings.ToLower(auth) + if strings.HasPrefix(lower, "bearer ") { + tok := strings.TrimSpace(auth[len("Bearer "):]) + if strings.Count(tok, ".") == 2 { + if claims, err := g.authService.ParseAndVerifyJWT(tok); err == nil { + if ns := strings.TrimSpace(claims.Namespace); ns != "" { + return ns, "" + } + } + // JWT verification failed - fall through to API key check + } + } + } + + // 2) Try API key + key := extractAPIKey(r) + if key == "" { + return "", "" // No credentials provided + } + + // Look up API key in main cluster RQLite + db := g.client.Database() + internalCtx := client.WithInternalAuth(r.Context()) + q := "SELECT namespaces.name FROM api_keys JOIN namespaces ON api_keys.namespace_id = namespaces.id WHERE api_keys.key = ? LIMIT 1" + res, err := db.Query(internalCtx, q, key) + if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { + return "", "invalid API key" + } + + // Extract namespace name + var ns string + if s, ok := res.Rows[0][0].(string); ok { + ns = strings.TrimSpace(s) + } else { + b, _ := json.Marshal(res.Rows[0][0]) + _ = json.Unmarshal(b, &ns) + ns = strings.TrimSpace(ns) + } + if ns == "" { + return "", "invalid API key" + } + + return ns, "" +} + +// isWebSocketUpgrade checks if the request is a WebSocket upgrade request +func isWebSocketUpgrade(r *http.Request) bool { + connection := strings.ToLower(r.Header.Get("Connection")) + upgrade := strings.ToLower(r.Header.Get("Upgrade")) + return strings.Contains(connection, "upgrade") && upgrade == "websocket" +} + +// proxyWebSocket proxies a WebSocket connection by hijacking the client connection +// and tunneling bidirectionally to the backend +func (g *Gateway) proxyWebSocket(w http.ResponseWriter, r *http.Request, targetHost string) bool { + hijacker, ok := w.(http.Hijacker) + if !ok { + http.Error(w, "WebSocket proxy not supported", http.StatusInternalServerError) + return false + } + + // Connect to backend + backendConn, err := net.DialTimeout("tcp", targetHost, 10*time.Second) + if err != nil { + g.logger.ComponentError(logging.ComponentGeneral, "WebSocket backend dial failed", + zap.String("target", targetHost), + zap.Error(err), + ) + http.Error(w, "Backend unavailable", http.StatusServiceUnavailable) + return false + } + + // Write the original request to backend (this initiates the WebSocket handshake) + if err := r.Write(backendConn); err != nil { + backendConn.Close() + g.logger.ComponentError(logging.ComponentGeneral, "WebSocket handshake write failed", + zap.Error(err), + ) + http.Error(w, "Failed to initiate WebSocket", http.StatusBadGateway) + return false + } + + // Hijack client connection + clientConn, clientBuf, err := hijacker.Hijack() + if err != nil { + backendConn.Close() + g.logger.ComponentError(logging.ComponentGeneral, "WebSocket hijack failed", + zap.Error(err), + ) + return false + } + + // Flush any buffered data from the client + if clientBuf.Reader.Buffered() > 0 { + buffered := make([]byte, clientBuf.Reader.Buffered()) + clientBuf.Read(buffered) + backendConn.Write(buffered) + } + + // Bidirectional copy between client and backend + done := make(chan struct{}, 2) + go func() { + defer func() { done <- struct{}{} }() + io.Copy(clientConn, backendConn) + clientConn.Close() + }() + go func() { + defer func() { done <- struct{}{} }() + io.Copy(backendConn, clientConn) + backendConn.Close() + }() + + // Wait for one side to close + <-done + clientConn.Close() + backendConn.Close() + <-done + + return true +} + // withMiddleware adds CORS, security headers, rate limiting, and logging middleware func (g *Gateway) withMiddleware(next http.Handler) http.Handler { // Order: logging -> security headers -> rate limit -> CORS -> domain routing -> auth -> handler @@ -72,6 +216,7 @@ func (g *Gateway) loggingMiddleware(next http.Handler) http.Handler { // - Authorization: Bearer (RS256 issued by this gateway) // - Authorization: Bearer or ApiKey // - X-API-Key: +// - X-Internal-Auth-Validated: true (from internal IPs only - pre-authenticated by main gateway) func (g *Gateway) authMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Allow preflight without auth @@ -82,6 +227,23 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler { isPublic := isPublicPath(r.URL.Path) + // 0) Trust internal auth headers from internal IPs (WireGuard network or localhost) + // This allows the main gateway to pre-authenticate requests before proxying to namespace gateways + if r.Header.Get(HeaderInternalAuthValidated) == "true" { + clientIP := getClientIP(r) + if isInternalIP(clientIP) { + ns := strings.TrimSpace(r.Header.Get(HeaderInternalAuthNamespace)) + if ns != "" { + // Pre-authenticated by main gateway - trust the namespace + reqCtx := context.WithValue(r.Context(), CtxKeyNamespaceOverride, ns) + next.ServeHTTP(w, r.WithContext(reqCtx)) + return + } + } + // If internal auth header is present but invalid (wrong IP or missing namespace), + // fall through to normal auth flow + } + // 1) Try JWT Bearer first if Authorization looks like one if auth := r.Header.Get("Authorization"); auth != "" { lower := strings.ToLower(auth) @@ -588,7 +750,27 @@ func (g *Gateway) domainRoutingMiddleware(next http.Handler) http.Handler { // handleNamespaceGatewayRequest proxies requests to a namespace's dedicated gateway cluster // This enables physical isolation where each namespace has its own RQLite, Olric, and Gateway +// +// IMPORTANT: This function validates auth against the MAIN cluster RQLite before proxying. +// The validated namespace is passed to the namespace gateway via X-Internal-Auth-* headers. +// This is necessary because namespace gateways have their own isolated RQLite that doesn't +// contain API keys (API keys are stored in the main cluster RQLite only). func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.Request, namespaceName string) { + // Validate auth against main cluster RQLite BEFORE proxying + // This ensures API keys work even though they're not in the namespace's RQLite + validatedNamespace, authErr := g.validateAuthForNamespaceProxy(r) + if authErr != "" && !isPublicPath(r.URL.Path) { + w.Header().Set("WWW-Authenticate", "Bearer error=\"invalid_token\"") + writeError(w, http.StatusUnauthorized, authErr) + return + } + + // If auth succeeded, ensure the API key belongs to the target namespace + if validatedNamespace != "" && validatedNamespace != namespaceName { + writeError(w, http.StatusForbidden, "API key does not belong to this namespace") + return + } + // Look up namespace cluster gateway using internal (WireGuard) IPs for inter-node proxying db := g.client.Database() internalCtx := client.WithInternalAuth(r.Context()) @@ -621,8 +803,31 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R gatewayPort = p } - // Proxy request to the namespace gateway - targetURL := "http://" + gatewayIP + ":" + strconv.Itoa(gatewayPort) + r.URL.Path + targetHost := gatewayIP + ":" + strconv.Itoa(gatewayPort) + + // Handle WebSocket upgrade requests specially (http.Client can't handle 101 Switching Protocols) + if isWebSocketUpgrade(r) { + // Set forwarding headers on the original request + r.Header.Set("X-Forwarded-For", getClientIP(r)) + r.Header.Set("X-Forwarded-Proto", "https") + r.Header.Set("X-Forwarded-Host", r.Host) + // Set internal auth headers if auth was validated + if validatedNamespace != "" { + r.Header.Set(HeaderInternalAuthValidated, "true") + r.Header.Set(HeaderInternalAuthNamespace, validatedNamespace) + } + r.URL.Scheme = "http" + r.URL.Host = targetHost + r.Host = targetHost + if g.proxyWebSocket(w, r, targetHost) { + return + } + // If WebSocket proxy failed and already wrote error, return + return + } + + // Proxy regular HTTP request to the namespace gateway + targetURL := "http://" + targetHost + r.URL.Path if r.URL.RawQuery != "" { targetURL += "?" + r.URL.RawQuery } @@ -648,6 +853,13 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R proxyReq.Header.Set("X-Forwarded-Host", r.Host) proxyReq.Header.Set("X-Original-Host", r.Host) + // Set internal auth headers if auth was validated by main gateway + // This allows the namespace gateway to trust the authentication + if validatedNamespace != "" { + proxyReq.Header.Set(HeaderInternalAuthValidated, "true") + proxyReq.Header.Set(HeaderInternalAuthNamespace, validatedNamespace) + } + // Execute proxy request httpClient := &http.Client{Timeout: 30 * time.Second} resp, err := httpClient.Do(proxyReq) @@ -863,13 +1075,32 @@ func (g *Gateway) proxyToDynamicDeployment(w http.ResponseWriter, r *http.Reques serveLocal: // Create a simple reverse proxy to localhost - target := "http://localhost:" + strconv.Itoa(deployment.Port) + targetHost := "localhost:" + strconv.Itoa(deployment.Port) + target := "http://" + targetHost // Set proxy headers r.Header.Set("X-Forwarded-For", getClientIP(r)) r.Header.Set("X-Forwarded-Proto", "https") r.Header.Set("X-Forwarded-Host", r.Host) + // Handle WebSocket upgrade requests specially + if isWebSocketUpgrade(r) { + r.URL.Scheme = "http" + r.URL.Host = targetHost + r.Host = targetHost + if g.proxyWebSocket(w, r, targetHost) { + return + } + // WebSocket proxy failed - try cross-node replicas as fallback + if g.replicaManager != nil { + if g.proxyCrossNodeWithReplicas(w, r, deployment) { + return + } + } + http.Error(w, "WebSocket connection failed", http.StatusServiceUnavailable) + return + } + // Create a new request to the backend backendURL := target + r.URL.Path if r.URL.RawQuery != "" { @@ -955,7 +1186,19 @@ func (g *Gateway) proxyCrossNode(w http.ResponseWriter, r *http.Request, deploym // Proxy to home node via internal HTTP port (6001) // This is node-to-node internal communication - no TLS needed - targetURL := "http://" + homeIP + ":6001" + r.URL.Path + targetHost := homeIP + ":6001" + + // Handle WebSocket upgrade requests specially + if isWebSocketUpgrade(r) { + r.Header.Set("X-Forwarded-For", getClientIP(r)) + r.Header.Set("X-Orama-Proxy-Node", g.nodePeerID) + r.URL.Scheme = "http" + r.URL.Host = targetHost + // Keep original Host header for domain routing + return g.proxyWebSocket(w, r, targetHost) + } + + targetURL := "http://" + targetHost + r.URL.Path if r.URL.RawQuery != "" { targetURL += "?" + r.URL.RawQuery } @@ -1056,7 +1299,18 @@ func (g *Gateway) proxyCrossNodeToIP(w http.ResponseWriter, r *http.Request, dep zap.String("node_ip", nodeIP), ) - targetURL := "http://" + nodeIP + ":6001" + r.URL.Path + targetHost := nodeIP + ":6001" + + // Handle WebSocket upgrade requests specially + if isWebSocketUpgrade(r) { + r.Header.Set("X-Forwarded-For", getClientIP(r)) + r.Header.Set("X-Orama-Proxy-Node", g.nodePeerID) + r.URL.Scheme = "http" + r.URL.Host = targetHost + return g.proxyWebSocket(w, r, targetHost) + } + + targetURL := "http://" + targetHost + r.URL.Path if r.URL.RawQuery != "" { targetURL += "?" + r.URL.RawQuery } diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index cc6903a..a6a4831 100644 --- a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -26,6 +26,11 @@ import ( type ClusterManagerConfig struct { BaseDomain string // Base domain for namespace gateways (e.g., "orama-devnet.network") BaseDataDir string // Base directory for namespace data (e.g., "~/.orama/data/namespaces") + // IPFS configuration for namespace gateways (defaults used if not set) + IPFSClusterAPIURL string // IPFS Cluster API URL (default: "http://localhost:9094") + IPFSAPIURL string // IPFS API URL (default: "http://localhost:5001") + IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s) + IPFSReplicationFactor int // IPFS replication factor (default: 3) } // ClusterManager orchestrates namespace cluster provisioning and lifecycle @@ -40,6 +45,12 @@ type ClusterManager struct { baseDomain string baseDataDir string + // IPFS configuration for namespace gateways + ipfsClusterAPIURL string + ipfsAPIURL string + ipfsTimeout time.Duration + ipfsReplicationFactor int + // Local node identity for distributed spawning localNodeID string @@ -61,17 +72,39 @@ func NewClusterManager( olricSpawner := olric.NewInstanceSpawner(cfg.BaseDataDir, logger) gatewaySpawner := gateway.NewInstanceSpawner(cfg.BaseDataDir, logger) + // Set IPFS defaults + ipfsClusterAPIURL := cfg.IPFSClusterAPIURL + if ipfsClusterAPIURL == "" { + ipfsClusterAPIURL = "http://localhost:9094" + } + ipfsAPIURL := cfg.IPFSAPIURL + if ipfsAPIURL == "" { + ipfsAPIURL = "http://localhost:5001" + } + ipfsTimeout := cfg.IPFSTimeout + if ipfsTimeout == 0 { + ipfsTimeout = 60 * time.Second + } + ipfsReplicationFactor := cfg.IPFSReplicationFactor + if ipfsReplicationFactor == 0 { + ipfsReplicationFactor = 3 + } + return &ClusterManager{ - db: db, - portAllocator: portAllocator, - nodeSelector: nodeSelector, - rqliteSpawner: rqliteSpawner, - olricSpawner: olricSpawner, - gatewaySpawner: gatewaySpawner, - baseDomain: cfg.BaseDomain, - baseDataDir: cfg.BaseDataDir, - logger: logger.With(zap.String("component", "cluster-manager")), - provisioning: make(map[string]bool), + db: db, + portAllocator: portAllocator, + nodeSelector: nodeSelector, + rqliteSpawner: rqliteSpawner, + olricSpawner: olricSpawner, + gatewaySpawner: gatewaySpawner, + baseDomain: cfg.BaseDomain, + baseDataDir: cfg.BaseDataDir, + ipfsClusterAPIURL: ipfsClusterAPIURL, + ipfsAPIURL: ipfsAPIURL, + ipfsTimeout: ipfsTimeout, + ipfsReplicationFactor: ipfsReplicationFactor, + logger: logger.With(zap.String("component", "cluster-manager")), + provisioning: make(map[string]bool), } } @@ -86,17 +119,39 @@ func NewClusterManagerWithComponents( cfg ClusterManagerConfig, logger *zap.Logger, ) *ClusterManager { + // Set IPFS defaults (same as NewClusterManager) + ipfsClusterAPIURL := cfg.IPFSClusterAPIURL + if ipfsClusterAPIURL == "" { + ipfsClusterAPIURL = "http://localhost:9094" + } + ipfsAPIURL := cfg.IPFSAPIURL + if ipfsAPIURL == "" { + ipfsAPIURL = "http://localhost:5001" + } + ipfsTimeout := cfg.IPFSTimeout + if ipfsTimeout == 0 { + ipfsTimeout = 60 * time.Second + } + ipfsReplicationFactor := cfg.IPFSReplicationFactor + if ipfsReplicationFactor == 0 { + ipfsReplicationFactor = 3 + } + return &ClusterManager{ - db: db, - portAllocator: portAllocator, - nodeSelector: nodeSelector, - rqliteSpawner: rqliteSpawner, - olricSpawner: olricSpawner, - gatewaySpawner: gatewaySpawner, - baseDomain: cfg.BaseDomain, - baseDataDir: cfg.BaseDataDir, - logger: logger.With(zap.String("component", "cluster-manager")), - provisioning: make(map[string]bool), + db: db, + portAllocator: portAllocator, + nodeSelector: nodeSelector, + rqliteSpawner: rqliteSpawner, + olricSpawner: olricSpawner, + gatewaySpawner: gatewaySpawner, + baseDomain: cfg.BaseDomain, + baseDataDir: cfg.BaseDataDir, + ipfsClusterAPIURL: ipfsClusterAPIURL, + ipfsAPIURL: ipfsAPIURL, + ipfsTimeout: ipfsTimeout, + ipfsReplicationFactor: ipfsReplicationFactor, + logger: logger.With(zap.String("component", "cluster-manager")), + provisioning: make(map[string]bool), } } @@ -407,14 +462,10 @@ func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *Namesp func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock, rqliteInstances []*rqlite.Instance, olricInstances []*olric.OlricInstance) ([]*gateway.GatewayInstance, error) { instances := make([]*gateway.GatewayInstance, len(nodes)) - // Build Olric server addresses — use WireGuard IPs for remote instances + // Build Olric server addresses — always use WireGuard IPs (Olric binds to WireGuard interface) olricServers := make([]string, len(olricInstances)) for i, inst := range olricInstances { - if nodes[i].NodeID == cm.localNodeID { - olricServers[i] = inst.DSN() // localhost for local - } else { - olricServers[i] = inst.AdvertisedDSN() // WireGuard IP for remote - } + olricServers[i] = inst.AdvertisedDSN() // Always use WireGuard IP } // Start all Gateway instances @@ -423,12 +474,16 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name rqliteDSN := fmt.Sprintf("http://localhost:%d", portBlocks[i].RQLiteHTTPPort) cfg := gateway.InstanceConfig{ - Namespace: cluster.NamespaceName, - NodeID: node.NodeID, - HTTPPort: portBlocks[i].GatewayHTTPPort, - BaseDomain: cm.baseDomain, - RQLiteDSN: rqliteDSN, - OlricServers: olricServers, + Namespace: cluster.NamespaceName, + NodeID: node.NodeID, + HTTPPort: portBlocks[i].GatewayHTTPPort, + BaseDomain: cm.baseDomain, + RQLiteDSN: rqliteDSN, + OlricServers: olricServers, + IPFSClusterAPIURL: cm.ipfsClusterAPIURL, + IPFSAPIURL: cm.ipfsAPIURL, + IPFSTimeout: cm.ipfsTimeout, + IPFSReplicationFactor: cm.ipfsReplicationFactor, } instance, err := cm.gatewaySpawner.SpawnInstance(ctx, cfg) @@ -577,32 +632,13 @@ func (cm *ClusterManager) sendStopRequest(ctx context.Context, nodeIP, action, n } // createDNSRecords creates DNS records for the namespace gateway. -// Only nameserver nodes get DNS A records, because only they run Caddy +// All namespace nodes get DNS A records since all nodes now run Caddy // and can serve TLS for ns-{namespace}.{baseDomain} subdomains. func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error { fqdn := fmt.Sprintf("ns-%s.%s.", cluster.NamespaceName, cm.baseDomain) - // Query nameserver node IDs so we only add DNS records for nodes that can serve TLS - type nsRow struct { - NodeID string `db:"node_id"` - } - var nameservers []nsRow - _ = cm.db.Query(ctx, &nameservers, `SELECT node_id FROM dns_nameservers`) - nsSet := make(map[string]bool, len(nameservers)) - for _, ns := range nameservers { - nsSet[ns.NodeID] = true - } - recordCount := 0 for i, node := range nodes { - if len(nsSet) > 0 && !nsSet[node.NodeID] { - cm.logger.Info("Skipping DNS record for non-nameserver node", - zap.String("node_id", node.NodeID), - zap.String("ip", node.IPAddress), - ) - continue - } - query := ` INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by) VALUES (?, 'A', ?, 300, ?, 'system') @@ -1294,23 +1330,23 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n } if !gwRunning { - // Build olric server addresses + // Build olric server addresses — always use WireGuard IPs (Olric binds to WireGuard interface) var olricServers []string for _, np := range allNodePorts { - if np.NodeID == cm.localNodeID { - olricServers = append(olricServers, fmt.Sprintf("localhost:%d", np.OlricHTTPPort)) - } else { - olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort)) - } + olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort)) } gwCfg := gateway.InstanceConfig{ - Namespace: namespaceName, - NodeID: cm.localNodeID, - HTTPPort: pb.GatewayHTTPPort, - BaseDomain: cm.baseDomain, - RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort), - OlricServers: olricServers, + Namespace: namespaceName, + NodeID: cm.localNodeID, + HTTPPort: pb.GatewayHTTPPort, + BaseDomain: cm.baseDomain, + RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort), + OlricServers: olricServers, + IPFSClusterAPIURL: cm.ipfsClusterAPIURL, + IPFSAPIURL: cm.ipfsAPIURL, + IPFSTimeout: cm.ipfsTimeout, + IPFSReplicationFactor: cm.ipfsReplicationFactor, } if _, err := cm.gatewaySpawner.SpawnInstance(ctx, gwCfg); err != nil { @@ -1550,21 +1586,22 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl if err == nil { resp.Body.Close() } else { + // Build olric server addresses — always use WireGuard IPs (Olric binds to WireGuard interface) var olricServers []string for _, np := range state.AllNodes { - if np.NodeID == cm.localNodeID { - olricServers = append(olricServers, fmt.Sprintf("localhost:%d", np.OlricHTTPPort)) - } else { - olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort)) - } + olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort)) } gwCfg := gateway.InstanceConfig{ - Namespace: state.NamespaceName, - NodeID: cm.localNodeID, - HTTPPort: pb.GatewayHTTPPort, - BaseDomain: state.BaseDomain, - RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort), - OlricServers: olricServers, + Namespace: state.NamespaceName, + NodeID: cm.localNodeID, + HTTPPort: pb.GatewayHTTPPort, + BaseDomain: state.BaseDomain, + RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort), + OlricServers: olricServers, + IPFSClusterAPIURL: cm.ipfsClusterAPIURL, + IPFSAPIURL: cm.ipfsAPIURL, + IPFSTimeout: cm.ipfsTimeout, + IPFSReplicationFactor: cm.ipfsReplicationFactor, } if _, err := cm.gatewaySpawner.SpawnInstance(ctx, gwCfg); err != nil { cm.logger.Error("Failed to restore Gateway from state", zap.String("namespace", state.NamespaceName), zap.Error(err)) diff --git a/pkg/rqlite/cluster.go b/pkg/rqlite/cluster.go index dfb89d1..61228fc 100644 --- a/pkg/rqlite/cluster.go +++ b/pkg/rqlite/cluster.go @@ -9,6 +9,8 @@ import ( "path/filepath" "strings" "time" + + "go.uber.org/zap" ) // establishLeadershipOrJoin handles post-startup cluster establishment @@ -95,7 +97,9 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql r.discoveryService.TriggerSync() time.Sleep(2 * time.Second) - discoveryDeadline := time.Now().Add(30 * time.Second) + // Wait up to 2 minutes for peer discovery - LibP2P DHT can take 60+ seconds + // to re-establish connections after simultaneous restart + discoveryDeadline := time.Now().Add(2 * time.Minute) var discoveredPeers int for time.Now().Before(discoveryDeadline) { @@ -103,12 +107,23 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql discoveredPeers = len(allPeers) if discoveredPeers >= r.config.MinClusterSize { + r.logger.Info("Discovered required peers for cluster", + zap.Int("discovered", discoveredPeers), + zap.Int("required", r.config.MinClusterSize)) break } time.Sleep(2 * time.Second) } + // Even if we only discovered ourselves, write peers.json as a fallback + // This ensures RQLite has consistent state and can potentially recover + // when other nodes come online if discoveredPeers <= 1 { + r.logger.Warn("Only discovered self during pre-start discovery, writing single-node peers.json as fallback", + zap.Int("discovered_peers", discoveredPeers), + zap.Int("min_cluster_size", r.config.MinClusterSize)) + // Still write peers.json with just ourselves - better than nothing + _ = r.discoveryService.ForceWritePeersJSON() return nil }