diff --git a/Makefile b/Makefile index 99068ad..0aca838 100644 --- a/Makefile +++ b/Makefile @@ -86,7 +86,7 @@ test-e2e-quick: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill redeploy-devnet redeploy-testnet release health -VERSION := 0.102.5 +VERSION := 0.103.0 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 172fd78..87bab66 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -175,7 +175,8 @@ func showHelp() { fmt.Printf(" db backups - List database backups\n\n") fmt.Printf("šŸ¢ Namespaces:\n") - fmt.Printf(" namespace delete - Delete current namespace and all resources\n\n") + fmt.Printf(" namespace delete - Delete current namespace and all resources\n") + fmt.Printf(" namespace repair - Repair under-provisioned cluster (add missing nodes)\n\n") fmt.Printf("šŸ” Cluster Inspection:\n") fmt.Printf(" inspect - Inspect cluster health via SSH\n") diff --git a/cmd/gateway/config.go b/cmd/gateway/config.go index 331233b..3983f2c 100644 --- a/cmd/gateway/config.go +++ b/cmd/gateway/config.go @@ -192,17 +192,6 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { cfg.IPFSReplicationFactor = y.IPFSReplicationFactor } - // Phantom Solana auth (from env vars) - if v := os.Getenv("PHANTOM_AUTH_URL"); v != "" { - cfg.PhantomAuthURL = v - } - if v := os.Getenv("SOLANA_RPC_URL"); v != "" { - cfg.SolanaRPCURL = v - } - if v := os.Getenv("NFT_COLLECTION_ADDRESS"); v != "" { - cfg.NFTCollectionAddress = v - } - // Validate configuration if errs := cfg.ValidateConfig(); len(errs) > 0 { fmt.Fprintf(os.Stderr, "\nGateway configuration errors (%d):\n", len(errs)) diff --git a/pkg/auth/phantom.go b/pkg/auth/phantom.go index 62d073b..6e6a1e3 100644 --- a/pkg/auth/phantom.go +++ 
b/pkg/auth/phantom.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "net/url" "os" "strings" "time" @@ -15,10 +16,12 @@ import ( qrterminal "github.com/mdp/qrterminal/v3" ) +// Hardcoded Phantom auth React app URL (deployed on Orama devnet) +const phantomAuthURL = "https://phantom-auth-y0w9aa.orama-devnet.network" + // PhantomSession represents a phantom auth session from the gateway. type PhantomSession struct { SessionID string `json:"session_id"` - AuthURL string `json:"auth_url"` ExpiresAt string `json:"expires_at"` } @@ -71,10 +74,13 @@ func PerformPhantomAuthentication(gatewayURL, namespace string) (*Credentials, e return nil, fmt.Errorf("failed to create session: %w", err) } - // 2. Display QR code + // 2. Build auth URL and display QR code + authURL := fmt.Sprintf("%s/?session=%s&gateway=%s&namespace=%s", + phantomAuthURL, session.SessionID, url.QueryEscape(gatewayURL), url.QueryEscape(namespace)) + fmt.Println("\nScan this QR code with your phone to authenticate:") fmt.Println() - qrterminal.GenerateWithConfig(session.AuthURL, qrterminal.Config{ + qrterminal.GenerateWithConfig(authURL, qrterminal.Config{ Level: qrterminal.M, Writer: os.Stdout, BlackChar: qrterminal.BLACK, @@ -82,7 +88,7 @@ func PerformPhantomAuthentication(gatewayURL, namespace string) (*Credentials, e QuietZone: 1, }) fmt.Println() - fmt.Printf("Or open this URL on your phone:\n%s\n\n", session.AuthURL) + fmt.Printf("Or open this URL on your phone:\n%s\n\n", authURL) fmt.Println("Waiting for authentication... (timeout: 5 minutes)") // 3. 
Poll for completion @@ -94,7 +100,11 @@ func PerformPhantomAuthentication(gatewayURL, namespace string) (*Credentials, e // Set namespace and build namespace URL creds.Namespace = namespace if domain := extractDomainFromURL(gatewayURL); domain != "" { - creds.NamespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, domain) + if namespace == "default" { + creds.NamespaceURL = fmt.Sprintf("https://%s", domain) + } else { + creds.NamespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, domain) + } } fmt.Printf("\nšŸŽ‰ Authentication successful!\n") diff --git a/pkg/auth/rootwallet.go b/pkg/auth/rootwallet.go index 1518b70..8ab505f 100644 --- a/pkg/auth/rootwallet.go +++ b/pkg/auth/rootwallet.go @@ -208,7 +208,11 @@ func verifySignature(client *http.Client, gatewayURL, wallet, nonce, signature, // Build namespace gateway URL namespaceURL := "" if d := extractDomainFromURL(gatewayURL); d != "" { - namespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, d) + if namespace == "default" { + namespaceURL = fmt.Sprintf("https://%s", d) + } else { + namespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, d) + } } creds := &Credentials{ @@ -221,9 +225,8 @@ func verifySignature(client *http.Client, gatewayURL, wallet, nonce, signature, NamespaceURL: namespaceURL, } - if result.ExpiresIn > 0 { - creds.ExpiresAt = time.Now().Add(time.Duration(result.ExpiresIn) * time.Second) - } + // Note: result.ExpiresIn is the JWT access token lifetime (15min), + // NOT the API key lifetime. Don't set ExpiresAt — the API key is permanent. 
return creds, nil } diff --git a/pkg/auth/simple_auth.go b/pkg/auth/simple_auth.go index 058ee5f..3b5f7b5 100644 --- a/pkg/auth/simple_auth.go +++ b/pkg/auth/simple_auth.go @@ -76,7 +76,11 @@ func PerformSimpleAuthentication(gatewayURL, wallet, namespace, existingAPIKey s // Build namespace gateway URL from the gateway URL namespaceURL := "" if domain := extractDomainFromURL(gatewayURL); domain != "" { - namespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, domain) + if namespace == "default" { + namespaceURL = fmt.Sprintf("https://%s", domain) + } else { + namespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, domain) + } } // Create credentials diff --git a/pkg/cli/namespace_commands.go b/pkg/cli/namespace_commands.go index 137942e..73d3680 100644 --- a/pkg/cli/namespace_commands.go +++ b/pkg/cli/namespace_commands.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/DeBrosOfficial/network/pkg/auth" + "github.com/DeBrosOfficial/network/pkg/constants" ) // HandleNamespaceCommand handles namespace management commands @@ -28,6 +29,12 @@ func HandleNamespaceCommand(args []string) { fs.BoolVar(&force, "force", false, "Skip confirmation prompt") _ = fs.Parse(args[1:]) handleNamespaceDelete(force) + case "repair": + if len(args) < 2 { + fmt.Fprintf(os.Stderr, "Usage: orama namespace repair <namespace>\n") + os.Exit(1) + } + handleNamespaceRepair(args[1]) case "help": showNamespaceHelp() default: @@ -41,13 +48,53 @@ func showNamespaceHelp() { fmt.Printf("Namespace Management Commands\n\n") fmt.Printf("Usage: orama namespace <subcommand>\n\n") fmt.Printf("Subcommands:\n") - fmt.Printf(" delete - Delete the current namespace and all its resources\n") - fmt.Printf(" help - Show this help message\n\n") + fmt.Printf(" delete - Delete the current namespace and all its resources\n") + fmt.Printf(" repair - Repair an under-provisioned namespace cluster (add missing nodes)\n") + fmt.Printf(" help - Show this help message\n\n") fmt.Printf("Flags:\n") - fmt.Printf(" --force - Skip confirmation
prompt\n\n") + fmt.Printf(" --force - Skip confirmation prompt (delete only)\n\n") fmt.Printf("Examples:\n") fmt.Printf(" orama namespace delete\n") fmt.Printf(" orama namespace delete --force\n") + fmt.Printf(" orama namespace repair anchat\n") +} + +func handleNamespaceRepair(namespaceName string) { + fmt.Printf("Repairing namespace cluster '%s'...\n", namespaceName) + + // Call the internal repair endpoint on the local gateway + url := fmt.Sprintf("http://localhost:%d/v1/internal/namespace/repair?namespace=%s", constants.GatewayAPIPort, namespaceName) + req, err := http.NewRequest(http.MethodPost, url, nil) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to create request: %v\n", err) + os.Exit(1) + } + req.Header.Set("X-Orama-Internal-Auth", "namespace-coordination") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to connect to local gateway (is the node running?): %v\n", err) + os.Exit(1) + } + defer resp.Body.Close() + + var result map[string]interface{} + json.NewDecoder(resp.Body).Decode(&result) + + if resp.StatusCode != http.StatusOK { + errMsg := "unknown error" + if e, ok := result["error"].(string); ok { + errMsg = e + } + fmt.Fprintf(os.Stderr, "Repair failed: %s\n", errMsg) + os.Exit(1) + } + + fmt.Printf("Namespace '%s' cluster repaired successfully.\n", namespaceName) + if msg, ok := result["message"].(string); ok { + fmt.Printf(" %s\n", msg) + } } func handleNamespaceDelete(force bool) { diff --git a/pkg/environments/production/config.go b/pkg/environments/production/config.go index ab74e9e..d700b11 100644 --- a/pkg/environments/production/config.go +++ b/pkg/environments/production/config.go @@ -161,8 +161,8 @@ func (cg *ConfigGenerator) GenerateNodeConfig(peerAddresses []string, vpsIP stri P2PPort: 4001, DataDir: filepath.Join(cg.oramaDir, "data"), RQLiteHTTPPort: 5001, - RQLiteRaftPort: 7001, // External SNI port - RQLiteRaftInternalPort: raftInternalPort, // Internal RQLite 
binding port + RQLiteRaftPort: 7001, // External SNI port + RQLiteRaftInternalPort: raftInternalPort, // Internal RQLite binding port RQLiteJoinAddress: rqliteJoinAddr, BootstrapPeers: peerAddresses, ClusterAPIPort: 9094, diff --git a/pkg/gateway/auth/solana_nft.go b/pkg/gateway/auth/solana_nft.go index f253b98..0ad02f8 100644 --- a/pkg/gateway/auth/solana_nft.go +++ b/pkg/gateway/auth/solana_nft.go @@ -20,6 +20,11 @@ const ( tokenProgramID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" // Metaplex Token Metadata Program ID metaplexProgramID = "metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s" + + // Hardcoded Solana RPC endpoint (mainnet-beta) + defaultSolanaRPCURL = "https://api.mainnet-beta.solana.com" + // Required NFT collection address for Phantom auth + defaultNFTCollectionAddress = "GtsCViqB9fWriKeDMQdveDvYmqqvBCEoxRfu1gzE48uh" ) // SolanaNFTVerifier verifies NFT ownership on Solana via JSON-RPC. @@ -29,11 +34,11 @@ type SolanaNFTVerifier struct { httpClient *http.Client } -// NewSolanaNFTVerifier creates a new verifier for the given collection. -func NewSolanaNFTVerifier(rpcURL, collectionAddress string) *SolanaNFTVerifier { +// NewDefaultSolanaNFTVerifier creates a verifier with the hardcoded collection and RPC endpoint. 
+func NewDefaultSolanaNFTVerifier() *SolanaNFTVerifier { return &SolanaNFTVerifier{ - rpcURL: rpcURL, - collectionAddress: collectionAddress, + rpcURL: defaultSolanaRPCURL, + collectionAddress: defaultNFTCollectionAddress, httpClient: &http.Client{ Timeout: 30 * time.Second, }, diff --git a/pkg/gateway/config.go b/pkg/gateway/config.go index 6a7cbcf..9384513 100644 --- a/pkg/gateway/config.go +++ b/pkg/gateway/config.go @@ -41,9 +41,4 @@ type Config struct { // WireGuard mesh configuration ClusterSecret string // Cluster secret for authenticating internal WireGuard peer exchange - - // Phantom Solana auth configuration - PhantomAuthURL string // URL of the deployed Phantom auth React app - SolanaRPCURL string // Solana RPC endpoint for NFT verification - NFTCollectionAddress string // Required NFT collection address for Phantom auth } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index a4b52bc..1533256 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -140,6 +140,9 @@ type Gateway struct { // Node health monitor (ring-based peer failure detection) healthMonitor *nodehealth.Monitor + + // Node recovery handler (called when health monitor confirms a node dead or recovered) + nodeRecoverer authhandlers.NodeRecoverer } // localSubscriber represents a WebSocket subscriber for local message delivery @@ -322,18 +325,10 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { gw.withInternalAuth, ) - // Configure Phantom Solana auth if env vars are set - if cfg.PhantomAuthURL != "" { - var solanaVerifier *auth.SolanaNFTVerifier - if cfg.SolanaRPCURL != "" && cfg.NFTCollectionAddress != "" { - solanaVerifier = auth.NewSolanaNFTVerifier(cfg.SolanaRPCURL, cfg.NFTCollectionAddress) - logger.ComponentInfo(logging.ComponentGeneral, "Solana NFT verifier configured", - zap.String("collection", cfg.NFTCollectionAddress)) - } - gw.authHandlers.SetPhantomConfig(cfg.PhantomAuthURL, solanaVerifier) - 
logger.ComponentInfo(logging.ComponentGeneral, "Phantom auth configured", - zap.String("auth_url", cfg.PhantomAuthURL)) - } + // Configure Solana NFT verifier for Phantom auth (hardcoded collection + RPC) + solanaVerifier := auth.NewDefaultSolanaNFTVerifier() + gw.authHandlers.SetSolanaVerifier(solanaVerifier) + logger.ComponentInfo(logging.ComponentGeneral, "Solana NFT verifier configured") } // Initialize middleware cache (60s TTL for auth/routing lookups) @@ -554,8 +549,18 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { Neighbors: 3, }) gw.healthMonitor.OnNodeDead(func(nodeID string) { - logger.ComponentError(logging.ComponentGeneral, "Node confirmed dead by quorum — recovery not yet implemented", + logger.ComponentError(logging.ComponentGeneral, "Node confirmed dead by quorum — starting recovery", zap.String("dead_node", nodeID)) + if gw.nodeRecoverer != nil { + go gw.nodeRecoverer.HandleDeadNode(context.Background(), nodeID) + } + }) + gw.healthMonitor.OnNodeRecovered(func(nodeID string) { + logger.ComponentInfo(logging.ComponentGeneral, "Previously dead node recovered — checking for orphaned services", + zap.String("node_id", nodeID)) + if gw.nodeRecoverer != nil { + go gw.nodeRecoverer.HandleRecoveredNode(context.Background(), nodeID) + } }) go gw.healthMonitor.Start(context.Background()) logger.ComponentInfo(logging.ComponentGeneral, "Node health monitor started", @@ -584,6 +589,11 @@ func (g *Gateway) SetClusterProvisioner(cp authhandlers.ClusterProvisioner) { } } +// SetNodeRecoverer sets the handler for dead node recovery and revived node cleanup. +func (g *Gateway) SetNodeRecoverer(nr authhandlers.NodeRecoverer) { + g.nodeRecoverer = nr +} + // SetSpawnHandler sets the handler for internal namespace spawn/stop requests. 
func (g *Gateway) SetSpawnHandler(h http.Handler) { g.spawnHandler = h @@ -748,3 +758,43 @@ func (g *Gateway) namespaceClusterStatusHandler(w http.ResponseWriter, r *http.R json.NewEncoder(w).Encode(status) } +// namespaceClusterRepairHandler handles POST /v1/internal/namespace/repair?namespace={name} +// This endpoint repairs under-provisioned namespace clusters by adding missing nodes. +// Internal-only: authenticated by X-Orama-Internal-Auth header. +func (g *Gateway) namespaceClusterRepairHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + // Internal auth check + if r.Header.Get("X-Orama-Internal-Auth") != "namespace-coordination" { + writeError(w, http.StatusUnauthorized, "unauthorized") + return + } + + namespaceName := r.URL.Query().Get("namespace") + if namespaceName == "" { + writeError(w, http.StatusBadRequest, "namespace parameter required") + return + } + + if g.nodeRecoverer == nil { + writeError(w, http.StatusServiceUnavailable, "cluster recovery not enabled") + return + } + + if err := g.nodeRecoverer.RepairCluster(r.Context(), namespaceName); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "status": "ok", + "namespace": namespaceName, + "message": "cluster repair completed", + }) +} + diff --git a/pkg/gateway/handlers/auth/handlers.go b/pkg/gateway/handlers/auth/handlers.go index 94b61dc..4a3f4d6 100644 --- a/pkg/gateway/handlers/auth/handlers.go +++ b/pkg/gateway/handlers/auth/handlers.go @@ -48,6 +48,14 @@ type ClusterProvisioner interface { GetClusterStatusByID(ctx context.Context, clusterID string) (interface{}, error) } +// NodeRecoverer handles automatic recovery when nodes die or come back online, +// and manual cluster repair for under-provisioned 
clusters. +type NodeRecoverer interface { + HandleDeadNode(ctx context.Context, deadNodeID string) + HandleRecoveredNode(ctx context.Context, nodeID string) + RepairCluster(ctx context.Context, namespaceName string) error +} + // Handlers holds dependencies for authentication HTTP handlers type Handlers struct { logger *logging.ColoredLogger @@ -55,8 +63,7 @@ type Handlers struct { netClient NetworkClient defaultNS string internalAuthFn func(context.Context) context.Context - clusterProvisioner ClusterProvisioner // Optional: for namespace cluster provisioning - phantomAuthURL string // URL of the Phantom auth React app + clusterProvisioner ClusterProvisioner // Optional: for namespace cluster provisioning solanaVerifier *authsvc.SolanaNFTVerifier // Server-side NFT ownership verifier } @@ -82,9 +89,8 @@ func (h *Handlers) SetClusterProvisioner(cp ClusterProvisioner) { h.clusterProvisioner = cp } -// SetPhantomConfig sets the Phantom auth app URL and Solana NFT verifier -func (h *Handlers) SetPhantomConfig(authURL string, verifier *authsvc.SolanaNFTVerifier) { - h.phantomAuthURL = authURL +// SetSolanaVerifier sets the server-side NFT ownership verifier for Phantom auth +func (h *Handlers) SetSolanaVerifier(verifier *authsvc.SolanaNFTVerifier) { h.solanaVerifier = verifier } diff --git a/pkg/gateway/handlers/auth/phantom_handler.go b/pkg/gateway/handlers/auth/phantom_handler.go index 942b7bd..95e8359 100644 --- a/pkg/gateway/handlers/auth/phantom_handler.go +++ b/pkg/gateway/handlers/auth/phantom_handler.go @@ -5,9 +5,7 @@ import ( "crypto/rand" "encoding/hex" "encoding/json" - "fmt" "net/http" - "net/url" "regexp" "strings" "time" @@ -29,10 +27,6 @@ func (h *Handlers) PhantomSessionHandler(w http.ResponseWriter, r *http.Request) writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } - if h.phantomAuthURL == "" { - writeError(w, http.StatusServiceUnavailable, "phantom auth not configured") - return - } var req struct { Namespace string 
`json:"namespace"` @@ -78,23 +72,8 @@ func (h *Handlers) PhantomSessionHandler(w http.ResponseWriter, r *http.Request) return } - // Build the auth URL that the phone will open - gatewayURL := r.Header.Get("X-Forwarded-Proto") + "://" + r.Header.Get("X-Forwarded-Host") - if gatewayURL == "://" { - // Fallback: construct from request - scheme := "https" - if r.TLS == nil && r.Header.Get("X-Forwarded-Proto") == "" { - scheme = "http" - } - gatewayURL = scheme + "://" + r.Host - } - - authURL := fmt.Sprintf("%s/?session=%s&gateway=%s&namespace=%s", - h.phantomAuthURL, sessionID, url.QueryEscape(gatewayURL), url.QueryEscape(namespace)) - writeJSON(w, http.StatusOK, map[string]any{ "session_id": sessionID, - "auth_url": authURL, "expires_at": expiresAt.UTC().Format(time.RFC3339), }) } diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index cf9d45c..fa586d0 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -449,6 +449,11 @@ func isPublicPath(p string) bool { return true } + // Namespace cluster repair endpoint (auth handled by internal auth header) + if p == "/v1/internal/namespace/repair" { + return true + } + // Phantom auth endpoints are public (session creation, status polling, completion) if strings.HasPrefix(p, "/v1/auth/phantom/") { return true diff --git a/pkg/gateway/routes.go b/pkg/gateway/routes.go index 3835039..bc9f805 100644 --- a/pkg/gateway/routes.go +++ b/pkg/gateway/routes.go @@ -44,6 +44,9 @@ func (g *Gateway) Routes() http.Handler { mux.Handle("/v1/internal/namespace/spawn", g.spawnHandler) } + // Namespace cluster repair (internal, handler does its own auth) + mux.HandleFunc("/v1/internal/namespace/repair", g.namespaceClusterRepairHandler) + // auth endpoints mux.HandleFunc("/v1/auth/jwks", g.authService.JWKSHandler) mux.HandleFunc("/.well-known/jwks.json", g.authService.JWKSHandler) diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index e7edbe5..8c683a1 100644 --- 
a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -1722,6 +1722,28 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl zap.String("cluster_id", state.ClusterID), ) + // Self-check: verify this node is still assigned to this cluster in the DB. + // If we were replaced during downtime, do NOT restore — stop services instead. + if cm.db != nil { + type countResult struct { + Count int `db:"count"` + } + var results []countResult + verifyQuery := `SELECT COUNT(*) as count FROM namespace_cluster_nodes WHERE namespace_cluster_id = ? AND node_id = ?` + if err := cm.db.Query(ctx, &results, verifyQuery, state.ClusterID, cm.localNodeID); err == nil && len(results) > 0 { + if results[0].Count == 0 { + cm.logger.Warn("Node was replaced during downtime, stopping orphaned services instead of restoring", + zap.String("namespace", state.NamespaceName), + zap.String("cluster_id", state.ClusterID)) + cm.systemdSpawner.StopAll(ctx, state.NamespaceName) + // Delete the stale cluster-state.json + stateFilePath := filepath.Join(cm.baseDataDir, state.NamespaceName, "cluster-state.json") + os.Remove(stateFilePath) + return nil + } + } + } + pb := &state.LocalPorts localIP := state.LocalIP diff --git a/pkg/namespace/cluster_recovery.go b/pkg/namespace/cluster_recovery.go new file mode 100644 index 0000000..e934ac9 --- /dev/null +++ b/pkg/namespace/cluster_recovery.go @@ -0,0 +1,913 @@ +package namespace + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/DeBrosOfficial/network/pkg/client" + "github.com/DeBrosOfficial/network/pkg/gateway" + "github.com/DeBrosOfficial/network/pkg/olric" + "github.com/DeBrosOfficial/network/pkg/rqlite" + "go.uber.org/zap" +) + +// nodeIPInfo holds both internal (WireGuard) and public IPs for a node. 
+type nodeIPInfo struct { + InternalIP string `db:"internal_ip"` + IPAddress string `db:"ip_address"` +} + +// survivingNodePorts holds port and IP info for surviving cluster nodes. +type survivingNodePorts struct { + NodeID string `db:"node_id"` + InternalIP string `db:"internal_ip"` + IPAddress string `db:"ip_address"` + RQLiteHTTPPort int `db:"rqlite_http_port"` + RQLiteRaftPort int `db:"rqlite_raft_port"` + OlricHTTPPort int `db:"olric_http_port"` + OlricMemberlistPort int `db:"olric_memberlist_port"` + GatewayHTTPPort int `db:"gateway_http_port"` +} + +// HandleDeadNode processes the death of a network node by recovering all affected +// namespace clusters. It finds all clusters with assignments to the dead node and +// replaces it with a healthy node in each cluster. +func (cm *ClusterManager) HandleDeadNode(ctx context.Context, deadNodeID string) { + cm.logger.Error("Handling dead node — starting recovery", + zap.String("dead_node", deadNodeID), + ) + + // Mark node as offline in dns_nodes + if err := cm.markNodeOffline(ctx, deadNodeID); err != nil { + cm.logger.Warn("Failed to mark node offline", zap.Error(err)) + } + + // Find all affected clusters + clusters, err := cm.getClustersByNodeID(ctx, deadNodeID) + if err != nil { + cm.logger.Error("Failed to find affected clusters for dead node", + zap.String("dead_node", deadNodeID), zap.Error(err)) + return + } + + if len(clusters) == 0 { + cm.logger.Info("Dead node had no namespace cluster assignments", + zap.String("dead_node", deadNodeID)) + return + } + + cm.logger.Info("Found affected namespace clusters", + zap.String("dead_node", deadNodeID), + zap.Int("cluster_count", len(clusters)), + ) + + // Recover each cluster sequentially (avoid overloading replacement nodes) + successCount := 0 + for _, cluster := range clusters { + recoveryKey := "recovery:" + cluster.ID + cm.provisioningMu.Lock() + if cm.provisioning[recoveryKey] { + cm.provisioningMu.Unlock() + cm.logger.Info("Recovery already in progress for 
cluster, skipping", + zap.String("cluster_id", cluster.ID), + zap.String("namespace", cluster.NamespaceName)) + continue + } + cm.provisioning[recoveryKey] = true + cm.provisioningMu.Unlock() + + clusterCopy := cluster + err := func() error { + defer func() { + cm.provisioningMu.Lock() + delete(cm.provisioning, recoveryKey) + cm.provisioningMu.Unlock() + }() + return cm.ReplaceClusterNode(ctx, &clusterCopy, deadNodeID) + }() + + if err != nil { + cm.logger.Error("Failed to recover cluster", + zap.String("cluster_id", clusterCopy.ID), + zap.String("namespace", clusterCopy.NamespaceName), + zap.String("dead_node", deadNodeID), + zap.Error(err), + ) + cm.logEvent(ctx, clusterCopy.ID, EventRecoveryFailed, deadNodeID, + fmt.Sprintf("Recovery failed: %s", err), nil) + } else { + successCount++ + } + } + + cm.logger.Info("Dead node recovery completed", + zap.String("dead_node", deadNodeID), + zap.Int("clusters_total", len(clusters)), + zap.Int("clusters_recovered", successCount), + ) +} + +// HandleRecoveredNode handles a previously-dead node coming back online. +// It checks if the node was replaced during downtime and cleans up orphaned services. 
+func (cm *ClusterManager) HandleRecoveredNode(ctx context.Context, nodeID string) { + cm.logger.Info("Handling recovered node — checking for orphaned services", + zap.String("node_id", nodeID), + ) + + // Check if the node still has any cluster assignments + type assignmentCheck struct { + Count int `db:"count"` + } + var results []assignmentCheck + query := `SELECT COUNT(*) as count FROM namespace_cluster_nodes WHERE node_id = ?` + if err := cm.db.Query(ctx, &results, query, nodeID); err != nil { + cm.logger.Warn("Failed to check node assignments", zap.Error(err)) + return + } + + if len(results) > 0 && results[0].Count > 0 { + // Node still has legitimate assignments — just mark active and return + cm.logger.Info("Recovered node still has cluster assignments, marking active", + zap.String("node_id", nodeID), + zap.Int("assignments", results[0].Count)) + cm.markNodeActive(ctx, nodeID) + return + } + + // Node has no assignments — it was replaced. Clean up orphaned services. + cm.logger.Warn("Recovered node was replaced during downtime, cleaning up orphaned services", + zap.String("node_id", nodeID)) + + // Get the node's internal IP to send stop requests + ips, err := cm.getNodeIPs(ctx, nodeID) + if err != nil { + cm.logger.Warn("Failed to get recovered node IPs for cleanup", zap.Error(err)) + cm.markNodeActive(ctx, nodeID) + return + } + + // Find which namespaces were moved away by querying recovery events + type eventInfo struct { + NamespaceName string `db:"namespace_name"` + } + var events []eventInfo + cutoff := time.Now().Add(-24 * time.Hour).Format("2006-01-02 15:04:05") + eventsQuery := ` + SELECT DISTINCT c.namespace_name + FROM namespace_cluster_events e + JOIN namespace_clusters c ON e.namespace_cluster_id = c.id + WHERE e.node_id = ? AND e.event_type = ? AND e.created_at > ? 
+ ` + if err := cm.db.Query(ctx, &events, eventsQuery, nodeID, EventRecoveryStarted, cutoff); err != nil { + cm.logger.Warn("Failed to query recovery events for cleanup", zap.Error(err)) + } + + // Send stop requests for each orphaned namespace + for _, evt := range events { + cm.logger.Info("Stopping orphaned namespace services on recovered node", + zap.String("node_id", nodeID), + zap.String("namespace", evt.NamespaceName)) + cm.sendStopRequest(ctx, ips.InternalIP, "stop-all", evt.NamespaceName, nodeID) + // Also delete the stale cluster-state.json + cm.sendSpawnRequest(ctx, ips.InternalIP, map[string]interface{}{ + "action": "delete-cluster-state", + "namespace": evt.NamespaceName, + "node_id": nodeID, + }) + } + + // Mark node as active again — it's available for future use + cm.markNodeActive(ctx, nodeID) + + cm.logger.Info("Recovered node cleanup completed", + zap.String("node_id", nodeID), + zap.Int("namespaces_cleaned", len(events))) +} + +// ReplaceClusterNode replaces a dead node in a specific namespace cluster. +// It selects a new node, allocates ports, spawns services, updates DNS, and cleans up. +func (cm *ClusterManager) ReplaceClusterNode(ctx context.Context, cluster *NamespaceCluster, deadNodeID string) error { + cm.logger.Info("Starting node replacement in cluster", + zap.String("cluster_id", cluster.ID), + zap.String("namespace", cluster.NamespaceName), + zap.String("dead_node", deadNodeID), + ) + cm.logEvent(ctx, cluster.ID, EventRecoveryStarted, deadNodeID, + fmt.Sprintf("Recovery started: replacing dead node %s", deadNodeID), nil) + + // 1. Mark dead node's assignments as failed + if err := cm.updateClusterNodeStatus(ctx, cluster.ID, deadNodeID, NodeStatusFailed); err != nil { + cm.logger.Warn("Failed to mark node as failed in cluster", zap.Error(err)) + } + + // 2. 
Mark cluster as degraded + cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusDegraded, + fmt.Sprintf("Node %s is dead, recovery in progress", deadNodeID)) + cm.logEvent(ctx, cluster.ID, EventClusterDegraded, deadNodeID, "Cluster degraded due to dead node", nil) + + // 3. Get all current cluster nodes and their info + clusterNodes, err := cm.getClusterNodes(ctx, cluster.ID) + if err != nil { + return fmt.Errorf("failed to get cluster nodes: %w", err) + } + + // Build exclude list (all current cluster members) + excludeIDs := make([]string, 0, len(clusterNodes)) + for _, cn := range clusterNodes { + excludeIDs = append(excludeIDs, cn.NodeID) + } + + // 4. Select replacement node + replacement, err := cm.nodeSelector.SelectReplacementNode(ctx, excludeIDs) + if err != nil { + return fmt.Errorf("failed to select replacement node: %w", err) + } + + cm.logger.Info("Selected replacement node", + zap.String("namespace", cluster.NamespaceName), + zap.String("replacement_node", replacement.NodeID), + zap.String("replacement_ip", replacement.InternalIP), + ) + + // 5. Allocate ports on replacement node + portBlock, err := cm.portAllocator.AllocatePortBlock(ctx, replacement.NodeID, cluster.ID) + if err != nil { + return fmt.Errorf("failed to allocate ports on replacement node: %w", err) + } + + // 6. Get surviving nodes' port info + var surviving []survivingNodePorts + portsQuery := ` + SELECT pa.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip, dn.ip_address, + pa.rqlite_http_port, pa.rqlite_raft_port, pa.olric_http_port, + pa.olric_memberlist_port, pa.gateway_http_port + FROM namespace_port_allocations pa + JOIN dns_nodes dn ON pa.node_id = dn.id + WHERE pa.namespace_cluster_id = ? AND pa.node_id != ? 
+ ` + if err := cm.db.Query(ctx, &surviving, portsQuery, cluster.ID, deadNodeID); err != nil { + // Rollback port allocation + cm.portAllocator.DeallocatePortBlock(ctx, cluster.ID, replacement.NodeID) + return fmt.Errorf("failed to query surviving node ports: %w", err) + } + + // 7. Determine dead node's roles + deadNodeRoles := make(map[NodeRole]bool) + var deadNodeRaftPort int + for _, cn := range clusterNodes { + if cn.NodeID == deadNodeID { + deadNodeRoles[cn.Role] = true + if cn.Role == NodeRoleRQLiteLeader || cn.Role == NodeRoleRQLiteFollower { + deadNodeRaftPort = cn.RQLiteRaftPort + } + } + } + + // 8. Remove dead node from RQLite Raft cluster (before joining replacement) + if deadNodeRoles[NodeRoleRQLiteLeader] || deadNodeRoles[NodeRoleRQLiteFollower] { + deadIPs, err := cm.getNodeIPs(ctx, deadNodeID) + if err == nil && deadNodeRaftPort > 0 { + deadRaftAddr := fmt.Sprintf("%s:%d", deadIPs.InternalIP, deadNodeRaftPort) + cm.removeDeadNodeFromRaft(ctx, deadRaftAddr, surviving) + } + } + + spawnErrors := 0 + + // 9. 
Spawn RQLite follower on replacement + if deadNodeRoles[NodeRoleRQLiteLeader] || deadNodeRoles[NodeRoleRQLiteFollower] { + var joinAddr string + for _, s := range surviving { + if s.RQLiteRaftPort > 0 { + joinAddr = fmt.Sprintf("%s:%d", s.InternalIP, s.RQLiteRaftPort) + break + } + } + + rqliteCfg := rqlite.InstanceConfig{ + Namespace: cluster.NamespaceName, + NodeID: replacement.NodeID, + HTTPPort: portBlock.RQLiteHTTPPort, + RaftPort: portBlock.RQLiteRaftPort, + HTTPAdvAddress: fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.RQLiteHTTPPort), + RaftAdvAddress: fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.RQLiteRaftPort), + JoinAddresses: []string{joinAddr}, + IsLeader: false, + } + + var spawnErr error + if replacement.NodeID == cm.localNodeID { + spawnErr = cm.spawnRQLiteWithSystemd(ctx, rqliteCfg) + } else { + _, spawnErr = cm.spawnRQLiteRemote(ctx, replacement.InternalIP, rqliteCfg) + } + if spawnErr != nil { + cm.logger.Error("Failed to spawn RQLite follower on replacement", + zap.String("node", replacement.NodeID), zap.Error(spawnErr)) + spawnErrors++ + } else { + cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleRQLiteFollower, portBlock) + cm.logEvent(ctx, cluster.ID, EventRQLiteStarted, replacement.NodeID, + "RQLite follower started on replacement node", nil) + } + } + + // 10. 
Spawn Olric on replacement + if deadNodeRoles[NodeRoleOlric] { + var olricPeers []string + for _, s := range surviving { + if s.OlricMemberlistPort > 0 { + olricPeers = append(olricPeers, fmt.Sprintf("%s:%d", s.InternalIP, s.OlricMemberlistPort)) + } + } + + olricCfg := olric.InstanceConfig{ + Namespace: cluster.NamespaceName, + NodeID: replacement.NodeID, + HTTPPort: portBlock.OlricHTTPPort, + MemberlistPort: portBlock.OlricMemberlistPort, + BindAddr: replacement.InternalIP, + AdvertiseAddr: replacement.InternalIP, + PeerAddresses: olricPeers, + } + + var spawnErr error + if replacement.NodeID == cm.localNodeID { + spawnErr = cm.spawnOlricWithSystemd(ctx, olricCfg) + } else { + _, spawnErr = cm.spawnOlricRemote(ctx, replacement.InternalIP, olricCfg) + } + if spawnErr != nil { + cm.logger.Error("Failed to spawn Olric on replacement", + zap.String("node", replacement.NodeID), zap.Error(spawnErr)) + spawnErrors++ + } else { + cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleOlric, portBlock) + cm.logEvent(ctx, cluster.ID, EventOlricStarted, replacement.NodeID, + "Olric started on replacement node", nil) + } + } + + // 11. 
Spawn Gateway on replacement + if deadNodeRoles[NodeRoleGateway] { + // Build Olric server addresses — all nodes including replacement + var olricServers []string + for _, s := range surviving { + if s.OlricHTTPPort > 0 { + olricServers = append(olricServers, fmt.Sprintf("%s:%d", s.InternalIP, s.OlricHTTPPort)) + } + } + olricServers = append(olricServers, fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.OlricHTTPPort)) + + gwCfg := gateway.InstanceConfig{ + Namespace: cluster.NamespaceName, + NodeID: replacement.NodeID, + HTTPPort: portBlock.GatewayHTTPPort, + BaseDomain: cm.baseDomain, + RQLiteDSN: fmt.Sprintf("http://localhost:%d", portBlock.RQLiteHTTPPort), + GlobalRQLiteDSN: cm.globalRQLiteDSN, + OlricServers: olricServers, + OlricTimeout: 30 * time.Second, + IPFSClusterAPIURL: cm.ipfsClusterAPIURL, + IPFSAPIURL: cm.ipfsAPIURL, + IPFSTimeout: cm.ipfsTimeout, + IPFSReplicationFactor: cm.ipfsReplicationFactor, + } + + var spawnErr error + if replacement.NodeID == cm.localNodeID { + spawnErr = cm.spawnGatewayWithSystemd(ctx, gwCfg) + } else { + _, spawnErr = cm.spawnGatewayRemote(ctx, replacement.InternalIP, gwCfg) + } + if spawnErr != nil { + cm.logger.Error("Failed to spawn Gateway on replacement", + zap.String("node", replacement.NodeID), zap.Error(spawnErr)) + spawnErrors++ + } else { + cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleGateway, portBlock) + cm.logEvent(ctx, cluster.ID, EventGatewayStarted, replacement.NodeID, + "Gateway started on replacement node", nil) + } + } + + // 12. 
Update DNS: swap dead node's PUBLIC IP for replacement's PUBLIC IP + deadIPs, err := cm.getNodeIPs(ctx, deadNodeID) + if err == nil && deadIPs.IPAddress != "" { + dnsManager := NewDNSRecordManager(cm.db, cm.baseDomain, cm.logger) + if err := dnsManager.UpdateNamespaceRecord(ctx, cluster.NamespaceName, deadIPs.IPAddress, replacement.IPAddress); err != nil { + cm.logger.Error("Failed to update DNS records", + zap.String("namespace", cluster.NamespaceName), + zap.String("old_ip", deadIPs.IPAddress), + zap.String("new_ip", replacement.IPAddress), + zap.Error(err)) + } else { + cm.logger.Info("DNS records updated", + zap.String("namespace", cluster.NamespaceName), + zap.String("old_ip", deadIPs.IPAddress), + zap.String("new_ip", replacement.IPAddress)) + cm.logEvent(ctx, cluster.ID, EventDNSCreated, replacement.NodeID, + fmt.Sprintf("DNS updated: %s → %s", deadIPs.IPAddress, replacement.IPAddress), nil) + } + } + + // 13. Clean up dead node's port allocations and cluster assignments + cm.portAllocator.DeallocatePortBlock(ctx, cluster.ID, deadNodeID) + cm.removeClusterNodeAssignment(ctx, cluster.ID, deadNodeID) + + // 14. Update cluster-state.json on all nodes + cm.updateClusterStateAfterRecovery(ctx, cluster) + + // 15. 
Update cluster status + if spawnErrors == 0 { + cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusReady, "") + } + // If there were spawn errors, cluster stays degraded + + cm.logEvent(ctx, cluster.ID, EventNodeReplaced, replacement.NodeID, + fmt.Sprintf("Dead node %s replaced by %s", deadNodeID, replacement.NodeID), + map[string]interface{}{ + "dead_node": deadNodeID, + "replacement_node": replacement.NodeID, + "spawn_errors": spawnErrors, + }) + cm.logEvent(ctx, cluster.ID, EventRecoveryComplete, "", "Recovery completed", nil) + + cm.logger.Info("Node replacement completed", + zap.String("cluster_id", cluster.ID), + zap.String("namespace", cluster.NamespaceName), + zap.String("dead_node", deadNodeID), + zap.String("replacement", replacement.NodeID), + zap.Int("spawn_errors", spawnErrors), + ) + + return nil +} + +// --- Helper methods --- + +// getClustersByNodeID returns all ready/degraded clusters that have the given node assigned. +func (cm *ClusterManager) getClustersByNodeID(ctx context.Context, nodeID string) ([]NamespaceCluster, error) { + internalCtx := client.WithInternalAuth(ctx) + + type clusterRef struct { + ClusterID string `db:"namespace_cluster_id"` + } + var refs []clusterRef + query := ` + SELECT DISTINCT cn.namespace_cluster_id + FROM namespace_cluster_nodes cn + JOIN namespace_clusters c ON cn.namespace_cluster_id = c.id + WHERE cn.node_id = ? AND c.status IN ('ready', 'degraded') + ` + if err := cm.db.Query(internalCtx, &refs, query, nodeID); err != nil { + return nil, fmt.Errorf("failed to query clusters by node: %w", err) + } + + var clusters []NamespaceCluster + for _, ref := range refs { + cluster, err := cm.GetCluster(internalCtx, ref.ClusterID) + if err != nil || cluster == nil { + continue + } + clusters = append(clusters, *cluster) + } + return clusters, nil +} + +// updateClusterNodeStatus marks a specific cluster node assignment with a new status. 
+func (cm *ClusterManager) updateClusterNodeStatus(ctx context.Context, clusterID, nodeID string, status NodeStatus) error { + query := `UPDATE namespace_cluster_nodes SET status = ?, updated_at = ? WHERE namespace_cluster_id = ? AND node_id = ?` + _, err := cm.db.Exec(ctx, query, status, time.Now().Format("2006-01-02 15:04:05"), clusterID, nodeID) + return err +} + +// removeClusterNodeAssignment deletes all node assignments for a node in a cluster. +func (cm *ClusterManager) removeClusterNodeAssignment(ctx context.Context, clusterID, nodeID string) { + query := `DELETE FROM namespace_cluster_nodes WHERE namespace_cluster_id = ? AND node_id = ?` + if _, err := cm.db.Exec(ctx, query, clusterID, nodeID); err != nil { + cm.logger.Warn("Failed to remove cluster node assignment", + zap.String("cluster_id", clusterID), + zap.String("node_id", nodeID), + zap.Error(err)) + } +} + +// getNodeIPs returns both the internal (WireGuard) and public IP for a node. +func (cm *ClusterManager) getNodeIPs(ctx context.Context, nodeID string) (*nodeIPInfo, error) { + var results []nodeIPInfo + query := `SELECT COALESCE(internal_ip, ip_address) as internal_ip, ip_address FROM dns_nodes WHERE id = ? LIMIT 1` + if err := cm.db.Query(ctx, &results, query, nodeID); err != nil || len(results) == 0 { + return nil, fmt.Errorf("node %s not found in dns_nodes", nodeID) + } + return &results[0], nil +} + +// markNodeOffline sets a node's status to 'offline' in dns_nodes. +func (cm *ClusterManager) markNodeOffline(ctx context.Context, nodeID string) error { + query := `UPDATE dns_nodes SET status = 'offline', updated_at = ? WHERE id = ?` + _, err := cm.db.Exec(ctx, query, time.Now().Format("2006-01-02 15:04:05"), nodeID) + return err +} + +// markNodeActive sets a node's status to 'active' in dns_nodes. +func (cm *ClusterManager) markNodeActive(ctx context.Context, nodeID string) { + query := `UPDATE dns_nodes SET status = 'active', updated_at = ? 
WHERE id = ?` + if _, err := cm.db.Exec(ctx, query, time.Now().Format("2006-01-02 15:04:05"), nodeID); err != nil { + cm.logger.Warn("Failed to mark node active", zap.String("node_id", nodeID), zap.Error(err)) + } +} + +// removeDeadNodeFromRaft sends a DELETE request to a surviving RQLite node +// to remove the dead node from the Raft voter set. +func (cm *ClusterManager) removeDeadNodeFromRaft(ctx context.Context, deadRaftAddr string, survivingNodes []survivingNodePorts) { + if deadRaftAddr == "" { + return + } + + payload, _ := json.Marshal(map[string]string{"id": deadRaftAddr}) + + for _, s := range survivingNodes { + if s.RQLiteHTTPPort == 0 { + continue + } + url := fmt.Sprintf("http://%s:%d/remove", s.InternalIP, s.RQLiteHTTPPort) + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, bytes.NewReader(payload)) + if err != nil { + continue + } + req.Header.Set("Content-Type", "application/json") + + httpClient := &http.Client{Timeout: 10 * time.Second} + resp, err := httpClient.Do(req) + if err != nil { + cm.logger.Warn("Failed to remove dead node from Raft via this node", + zap.String("target", s.NodeID), zap.Error(err)) + continue + } + resp.Body.Close() + + if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNoContent { + cm.logger.Info("Removed dead node from Raft cluster", + zap.String("dead_raft_addr", deadRaftAddr), + zap.String("via_node", s.NodeID)) + return + } + cm.logger.Warn("Raft removal returned unexpected status", + zap.String("via_node", s.NodeID), + zap.Int("status", resp.StatusCode)) + } + cm.logger.Warn("Could not remove dead node from Raft cluster (best-effort)", + zap.String("dead_raft_addr", deadRaftAddr)) +} + +// updateClusterStateAfterRecovery rebuilds and distributes cluster-state.json +// to all current nodes in the cluster (surviving + replacement). 
+func (cm *ClusterManager) updateClusterStateAfterRecovery(ctx context.Context, cluster *NamespaceCluster) { + // Re-query all current nodes and ports + var allPorts []survivingNodePorts + query := ` + SELECT pa.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip, dn.ip_address, + pa.rqlite_http_port, pa.rqlite_raft_port, pa.olric_http_port, + pa.olric_memberlist_port, pa.gateway_http_port + FROM namespace_port_allocations pa + JOIN dns_nodes dn ON pa.node_id = dn.id + WHERE pa.namespace_cluster_id = ? + ` + if err := cm.db.Query(ctx, &allPorts, query, cluster.ID); err != nil { + cm.logger.Warn("Failed to query ports for state update", zap.Error(err)) + return + } + + // Convert to the format expected by saveClusterStateToAllNodes + nodes := make([]NodeCapacity, len(allPorts)) + portBlocks := make([]*PortBlock, len(allPorts)) + for i, np := range allPorts { + nodes[i] = NodeCapacity{ + NodeID: np.NodeID, + InternalIP: np.InternalIP, + IPAddress: np.IPAddress, + } + portBlocks[i] = &PortBlock{ + RQLiteHTTPPort: np.RQLiteHTTPPort, + RQLiteRaftPort: np.RQLiteRaftPort, + OlricHTTPPort: np.OlricHTTPPort, + OlricMemberlistPort: np.OlricMemberlistPort, + GatewayHTTPPort: np.GatewayHTTPPort, + } + } + + cm.saveClusterStateToAllNodes(ctx, cluster, nodes, portBlocks) +} + +// RepairCluster checks a namespace cluster for missing nodes and adds replacements +// without touching surviving nodes. This is used to repair under-provisioned clusters +// (e.g., after manual node removal) without data loss or downtime. +func (cm *ClusterManager) RepairCluster(ctx context.Context, namespaceName string) error { + cm.logger.Info("Starting cluster repair", + zap.String("namespace", namespaceName), + ) + + // 1. 
Look up the cluster + cluster, err := cm.GetClusterByNamespace(ctx, namespaceName) + if err != nil { + return fmt.Errorf("failed to look up cluster: %w", err) + } + if cluster == nil { + return ErrClusterNotFound + } + + if cluster.Status != ClusterStatusReady && cluster.Status != ClusterStatusDegraded { + return fmt.Errorf("cluster status is %s, can only repair ready or degraded clusters", cluster.Status) + } + + // 2. Acquire per-cluster lock + repairKey := "repair:" + cluster.ID + cm.provisioningMu.Lock() + if cm.provisioning[repairKey] { + cm.provisioningMu.Unlock() + return ErrRecoveryInProgress + } + cm.provisioning[repairKey] = true + cm.provisioningMu.Unlock() + defer func() { + cm.provisioningMu.Lock() + delete(cm.provisioning, repairKey) + cm.provisioningMu.Unlock() + }() + + // 3. Get current cluster nodes + clusterNodes, err := cm.getClusterNodes(ctx, cluster.ID) + if err != nil { + return fmt.Errorf("failed to get cluster nodes: %w", err) + } + + // Count unique physical nodes with active services + activeNodes := make(map[string]bool) + for _, cn := range clusterNodes { + if cn.Status == NodeStatusRunning || cn.Status == NodeStatusStarting { + activeNodes[cn.NodeID] = true + } + } + + // Expected node count is the cluster's configured RQLite count (each physical node + // runs all 3 services: rqlite + olric + gateway) + expectedCount := cluster.RQLiteNodeCount + activeCount := len(activeNodes) + missingCount := expectedCount - activeCount + + if missingCount <= 0 { + cm.logger.Info("Cluster has expected number of active nodes, no repair needed", + zap.String("namespace", namespaceName), + zap.Int("active_nodes", activeCount), + zap.Int("expected", expectedCount), + ) + return nil + } + + cm.logger.Info("Cluster needs repair — adding missing nodes", + zap.String("namespace", namespaceName), + zap.Int("active_nodes", activeCount), + zap.Int("expected", expectedCount), + zap.Int("missing", missingCount), + ) + + cm.logEvent(ctx, cluster.ID, 
EventRecoveryStarted, "", + fmt.Sprintf("Cluster repair started: %d of %d nodes active, adding %d", activeCount, expectedCount, missingCount), nil) + + // 4. Build the current node exclude list (all physical node IDs in the cluster) + excludeIDs := make([]string, 0) + nodeIDSet := make(map[string]bool) + for _, cn := range clusterNodes { + if !nodeIDSet[cn.NodeID] { + nodeIDSet[cn.NodeID] = true + excludeIDs = append(excludeIDs, cn.NodeID) + } + } + + // 5. Get surviving nodes' port info for joining + var surviving []survivingNodePorts + portsQuery := ` + SELECT pa.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip, dn.ip_address, + pa.rqlite_http_port, pa.rqlite_raft_port, pa.olric_http_port, + pa.olric_memberlist_port, pa.gateway_http_port + FROM namespace_port_allocations pa + JOIN dns_nodes dn ON pa.node_id = dn.id + WHERE pa.namespace_cluster_id = ? + ` + if err := cm.db.Query(ctx, &surviving, portsQuery, cluster.ID); err != nil { + return fmt.Errorf("failed to query surviving node ports: %w", err) + } + + if len(surviving) == 0 { + return fmt.Errorf("no surviving nodes found with port allocations") + } + + // 6. 
Add missing nodes one at a time + addedCount := 0 + for i := 0; i < missingCount; i++ { + replacement, portBlock, err := cm.addNodeToCluster(ctx, cluster, excludeIDs, surviving) + if err != nil { + cm.logger.Error("Failed to add node during cluster repair", + zap.String("namespace", namespaceName), + zap.Int("node_index", i+1), + zap.Int("missing", missingCount), + zap.Error(err), + ) + cm.logEvent(ctx, cluster.ID, EventRecoveryFailed, "", + fmt.Sprintf("Repair failed on node %d of %d: %s", i+1, missingCount, err), nil) + break + } + + addedCount++ + + // Update exclude list and surviving list for next iteration + excludeIDs = append(excludeIDs, replacement.NodeID) + surviving = append(surviving, survivingNodePorts{ + NodeID: replacement.NodeID, + InternalIP: replacement.InternalIP, + IPAddress: replacement.IPAddress, + RQLiteHTTPPort: portBlock.RQLiteHTTPPort, + RQLiteRaftPort: portBlock.RQLiteRaftPort, + OlricHTTPPort: portBlock.OlricHTTPPort, + OlricMemberlistPort: portBlock.OlricMemberlistPort, + GatewayHTTPPort: portBlock.GatewayHTTPPort, + }) + } + + if addedCount == 0 { + return fmt.Errorf("failed to add any replacement nodes") + } + + // 7. Update cluster-state.json on all nodes + cm.updateClusterStateAfterRecovery(ctx, cluster) + + // 8. Mark cluster ready + cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusReady, "") + + cm.logEvent(ctx, cluster.ID, EventRecoveryComplete, "", + fmt.Sprintf("Cluster repair completed: added %d of %d missing nodes", addedCount, missingCount), + map[string]interface{}{"added_nodes": addedCount, "missing_nodes": missingCount}) + + cm.logger.Info("Cluster repair completed", + zap.String("namespace", namespaceName), + zap.Int("added_nodes", addedCount), + zap.Int("missing_nodes", missingCount), + ) + + return nil +} + +// addNodeToCluster selects a new node and spawns all services (RQLite follower, Olric, Gateway) +// on it, joining the existing cluster. Returns the replacement node info and allocated port block. 
+func (cm *ClusterManager) addNodeToCluster( + ctx context.Context, + cluster *NamespaceCluster, + excludeIDs []string, + surviving []survivingNodePorts, +) (*NodeCapacity, *PortBlock, error) { + + // 1. Select replacement node + replacement, err := cm.nodeSelector.SelectReplacementNode(ctx, excludeIDs) + if err != nil { + return nil, nil, fmt.Errorf("failed to select replacement node: %w", err) + } + + cm.logger.Info("Selected node for cluster repair", + zap.String("namespace", cluster.NamespaceName), + zap.String("new_node", replacement.NodeID), + zap.String("new_ip", replacement.InternalIP), + ) + + // 2. Allocate ports on the new node + portBlock, err := cm.portAllocator.AllocatePortBlock(ctx, replacement.NodeID, cluster.ID) + if err != nil { + return nil, nil, fmt.Errorf("failed to allocate ports on new node: %w", err) + } + + // 3. Spawn RQLite follower + var joinAddr string + for _, s := range surviving { + if s.RQLiteRaftPort > 0 { + joinAddr = fmt.Sprintf("%s:%d", s.InternalIP, s.RQLiteRaftPort) + break + } + } + + rqliteCfg := rqlite.InstanceConfig{ + Namespace: cluster.NamespaceName, + NodeID: replacement.NodeID, + HTTPPort: portBlock.RQLiteHTTPPort, + RaftPort: portBlock.RQLiteRaftPort, + HTTPAdvAddress: fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.RQLiteHTTPPort), + RaftAdvAddress: fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.RQLiteRaftPort), + JoinAddresses: []string{joinAddr}, + IsLeader: false, + } + + var spawnErr error + if replacement.NodeID == cm.localNodeID { + spawnErr = cm.spawnRQLiteWithSystemd(ctx, rqliteCfg) + } else { + _, spawnErr = cm.spawnRQLiteRemote(ctx, replacement.InternalIP, rqliteCfg) + } + if spawnErr != nil { + cm.portAllocator.DeallocatePortBlock(ctx, cluster.ID, replacement.NodeID) + return nil, nil, fmt.Errorf("failed to spawn RQLite follower: %w", spawnErr) + } + cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleRQLiteFollower, portBlock) + cm.logEvent(ctx, cluster.ID, EventRQLiteStarted, 
replacement.NodeID, + "RQLite follower started on new node (repair)", nil) + + // 4. Spawn Olric + var olricPeers []string + for _, s := range surviving { + if s.OlricMemberlistPort > 0 { + olricPeers = append(olricPeers, fmt.Sprintf("%s:%d", s.InternalIP, s.OlricMemberlistPort)) + } + } + + olricCfg := olric.InstanceConfig{ + Namespace: cluster.NamespaceName, + NodeID: replacement.NodeID, + HTTPPort: portBlock.OlricHTTPPort, + MemberlistPort: portBlock.OlricMemberlistPort, + BindAddr: replacement.InternalIP, + AdvertiseAddr: replacement.InternalIP, + PeerAddresses: olricPeers, + } + + if replacement.NodeID == cm.localNodeID { + spawnErr = cm.spawnOlricWithSystemd(ctx, olricCfg) + } else { + _, spawnErr = cm.spawnOlricRemote(ctx, replacement.InternalIP, olricCfg) + } + if spawnErr != nil { + cm.logger.Error("Failed to spawn Olric on new node (repair continues)", + zap.String("node", replacement.NodeID), zap.Error(spawnErr)) + } else { + cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleOlric, portBlock) + cm.logEvent(ctx, cluster.ID, EventOlricStarted, replacement.NodeID, + "Olric started on new node (repair)", nil) + } + + // 5. 
Spawn Gateway + var olricServers []string + for _, s := range surviving { + if s.OlricHTTPPort > 0 { + olricServers = append(olricServers, fmt.Sprintf("%s:%d", s.InternalIP, s.OlricHTTPPort)) + } + } + olricServers = append(olricServers, fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.OlricHTTPPort)) + + gwCfg := gateway.InstanceConfig{ + Namespace: cluster.NamespaceName, + NodeID: replacement.NodeID, + HTTPPort: portBlock.GatewayHTTPPort, + BaseDomain: cm.baseDomain, + RQLiteDSN: fmt.Sprintf("http://localhost:%d", portBlock.RQLiteHTTPPort), + GlobalRQLiteDSN: cm.globalRQLiteDSN, + OlricServers: olricServers, + OlricTimeout: 30 * time.Second, + IPFSClusterAPIURL: cm.ipfsClusterAPIURL, + IPFSAPIURL: cm.ipfsAPIURL, + IPFSTimeout: cm.ipfsTimeout, + IPFSReplicationFactor: cm.ipfsReplicationFactor, + } + + if replacement.NodeID == cm.localNodeID { + spawnErr = cm.spawnGatewayWithSystemd(ctx, gwCfg) + } else { + _, spawnErr = cm.spawnGatewayRemote(ctx, replacement.InternalIP, gwCfg) + } + if spawnErr != nil { + cm.logger.Error("Failed to spawn Gateway on new node (repair continues)", + zap.String("node", replacement.NodeID), zap.Error(spawnErr)) + } else { + cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleGateway, portBlock) + cm.logEvent(ctx, cluster.ID, EventGatewayStarted, replacement.NodeID, + "Gateway started on new node (repair)", nil) + } + + // 6. 
Add DNS records for the new node's public IP + dnsManager := NewDNSRecordManager(cm.db, cm.baseDomain, cm.logger) + if err := dnsManager.AddNamespaceRecord(ctx, cluster.NamespaceName, replacement.IPAddress); err != nil { + cm.logger.Error("Failed to add DNS record for new node", + zap.String("namespace", cluster.NamespaceName), + zap.String("ip", replacement.IPAddress), + zap.Error(err)) + } else { + cm.logEvent(ctx, cluster.ID, EventDNSCreated, replacement.NodeID, + fmt.Sprintf("DNS record added for new node %s", replacement.IPAddress), nil) + } + + cm.logEvent(ctx, cluster.ID, EventNodeReplaced, replacement.NodeID, + fmt.Sprintf("New node %s added to cluster (repair)", replacement.NodeID), + map[string]interface{}{"new_node": replacement.NodeID}) + + return replacement, portBlock, nil +} diff --git a/pkg/namespace/dns_manager.go b/pkg/namespace/dns_manager.go index 8febcf4..43f0f9c 100644 --- a/pkg/namespace/dns_manager.go +++ b/pkg/namespace/dns_manager.go @@ -182,6 +182,48 @@ func (drm *DNSRecordManager) GetNamespaceGatewayIPs(ctx context.Context, namespa return ips, nil } +// AddNamespaceRecord adds DNS A records for a single IP to an existing namespace. +// Unlike CreateNamespaceRecords, this does NOT delete existing records — it's purely additive. +// Used when adding a new node to an under-provisioned cluster (repair). 
+func (drm *DNSRecordManager) AddNamespaceRecord(ctx context.Context, namespaceName, ip string) error { + internalCtx := client.WithInternalAuth(ctx) + + fqdn := fmt.Sprintf("ns-%s.%s.", namespaceName, drm.baseDomain) + wildcardFqdn := fmt.Sprintf("*.ns-%s.%s.", namespaceName, drm.baseDomain) + + drm.logger.Info("Adding DNS record for namespace", + zap.String("namespace", namespaceName), + zap.String("ip", ip), + ) + + now := time.Now() + for _, f := range []string{fqdn, wildcardFqdn} { + recordID := uuid.New().String() + insertQuery := ` + INSERT INTO dns_records ( + id, fqdn, record_type, value, ttl, namespace, created_by, is_active, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ` + _, err := drm.db.Exec(internalCtx, insertQuery, + recordID, f, "A", ip, 60, + "namespace:"+namespaceName, "cluster-manager", true, now, now, + ) + if err != nil { + return &ClusterError{ + Message: fmt.Sprintf("failed to add DNS record %s -> %s", f, ip), + Cause: err, + } + } + } + + drm.logger.Info("DNS records added for namespace", + zap.String("namespace", namespaceName), + zap.String("ip", ip), + ) + + return nil +} + // UpdateNamespaceRecord updates a specific node's DNS record (for failover) func (drm *DNSRecordManager) UpdateNamespaceRecord(ctx context.Context, namespaceName, oldIP, newIP string) error { internalCtx := client.WithInternalAuth(ctx) diff --git a/pkg/namespace/node_selector.go b/pkg/namespace/node_selector.go index 929e645..242b9d2 100644 --- a/pkg/namespace/node_selector.go +++ b/pkg/namespace/node_selector.go @@ -117,6 +117,56 @@ func (cns *ClusterNodeSelector) SelectNodesForCluster(ctx context.Context, nodeC return selectedNodes, nil } +// SelectReplacementNode selects a single optimal node for replacing a dead node +// in an existing cluster. excludeNodeIDs contains nodes that should not be +// selected (dead node + existing cluster members). 
+func (cns *ClusterNodeSelector) SelectReplacementNode(ctx context.Context, excludeNodeIDs []string) (*NodeCapacity, error) { + internalCtx := client.WithInternalAuth(ctx) + + activeNodes, err := cns.getActiveNodes(internalCtx) + if err != nil { + return nil, err + } + + exclude := make(map[string]bool, len(excludeNodeIDs)) + for _, id := range excludeNodeIDs { + exclude[id] = true + } + + var eligible []NodeCapacity + for _, node := range activeNodes { + if exclude[node.NodeID] { + continue + } + capacity, err := cns.getNodeCapacity(internalCtx, node.NodeID, node.IPAddress, node.InternalIP) + if err != nil { + cns.logger.Warn("Failed to get node capacity for replacement, skipping", + zap.String("node_id", node.NodeID), zap.Error(err)) + continue + } + if capacity.AvailableNamespaceSlots > 0 { + eligible = append(eligible, *capacity) + } + } + + if len(eligible) == 0 { + return nil, ErrInsufficientNodes + } + + sort.Slice(eligible, func(i, j int) bool { + return eligible[i].Score > eligible[j].Score + }) + + selected := &eligible[0] + cns.logger.Info("Selected replacement node", + zap.String("node_id", selected.NodeID), + zap.Float64("score", selected.Score), + zap.Int("available_slots", selected.AvailableNamespaceSlots), + ) + + return selected, nil +} + // nodeInfo is used for querying active nodes type nodeInfo struct { NodeID string `db:"id"` diff --git a/pkg/namespace/types.go b/pkg/namespace/types.go index 2b12afd..b3146f2 100644 --- a/pkg/namespace/types.go +++ b/pkg/namespace/types.go @@ -58,6 +58,10 @@ const ( EventNodeRecovered EventType = "node_recovered" EventDeprovisionStarted EventType = "deprovisioning_started" EventDeprovisioned EventType = "deprovisioned" + EventRecoveryStarted EventType = "recovery_started" + EventNodeReplaced EventType = "node_replaced" + EventRecoveryComplete EventType = "recovery_complete" + EventRecoveryFailed EventType = "recovery_failed" ) // Port allocation constants @@ -201,4 +205,5 @@ var ( ErrProvisioningFailed = 
&ClusterError{Message: "cluster provisioning failed"} ErrNamespaceNotFound = &ClusterError{Message: "namespace not found"} ErrInvalidClusterStatus = &ClusterError{Message: "invalid cluster status for operation"} + ErrRecoveryInProgress = &ClusterError{Message: "recovery already in progress for this cluster"} ) diff --git a/pkg/node/gateway.go b/pkg/node/gateway.go index 136be43..6e147d4 100644 --- a/pkg/node/gateway.go +++ b/pkg/node/gateway.go @@ -57,10 +57,7 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { IPFSTimeout: n.config.HTTPGateway.IPFSTimeout, BaseDomain: n.config.HTTPGateway.BaseDomain, DataDir: oramaDir, - ClusterSecret: clusterSecret, - PhantomAuthURL: os.Getenv("PHANTOM_AUTH_URL"), - SolanaRPCURL: os.Getenv("SOLANA_RPC_URL"), - NFTCollectionAddress: os.Getenv("NFT_COLLECTION_ADDRESS"), + ClusterSecret: clusterSecret, } apiGateway, err := gateway.New(gatewayLogger, gwCfg) @@ -84,6 +81,7 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { clusterManager := namespace.NewClusterManager(ormClient, clusterCfg, n.logger.Logger) clusterManager.SetLocalNodeID(gwCfg.NodePeerID) apiGateway.SetClusterProvisioner(clusterManager) + apiGateway.SetNodeRecoverer(clusterManager) // Wire spawn handler for distributed namespace instance spawning systemdSpawner := namespace.NewSystemdSpawner(baseDataDir, n.logger.Logger) diff --git a/pkg/node/health/monitor.go b/pkg/node/health/monitor.go index af91beb..ee7ea53 100644 --- a/pkg/node/health/monitor.go +++ b/pkg/node/health/monitor.go @@ -60,7 +60,8 @@ type Monitor struct { mu sync.Mutex peers map[string]*peerState // nodeID → state - onDeadFn func(nodeID string) // callback when quorum confirms death + onDeadFn func(nodeID string) // callback when quorum confirms death + onRecoveredFn func(nodeID string) // callback when node transitions from dead → healthy } // NewMonitor creates a new health monitor. 
@@ -94,6 +95,12 @@ func (m *Monitor) OnNodeDead(fn func(nodeID string)) { m.onDeadFn = fn } +// OnNodeRecovered registers a callback invoked when a previously dead node +// transitions back to healthy. Used to clean up orphaned services. +func (m *Monitor) OnNodeRecovered(fn func(nodeID string)) { + m.onRecoveredFn = fn +} + // Start runs the monitor loop until ctx is cancelled. func (m *Monitor) Start(ctx context.Context) { m.logger.Info("Starting node health monitor", @@ -173,8 +180,17 @@ func (m *Monitor) updateState(ctx context.Context, nodeID string, healthy bool) if healthy { // Recovered if ps.status != "healthy" { - m.logger.Info("Node recovered", zap.String("target", nodeID)) + wasDead := ps.status == "dead" + m.logger.Info("Node recovered", zap.String("target", nodeID), + zap.String("previous_status", ps.status)) m.writeEvent(ctx, nodeID, "recovered") + + // Fire recovery callback for nodes that were confirmed dead + if wasDead && m.onRecoveredFn != nil { + m.mu.Unlock() + m.onRecoveredFn(nodeID) + m.mu.Lock() + } } ps.missCount = 0 ps.status = "healthy"