Add automatic node recovery and failover for dead nodes, Phantom and root wallet authentication, and namespace cluster repair logic

This commit is contained in:
anonpenguin23 2026-02-13 08:16:01 +02:00
parent 5fed8a6c88
commit 266507ef09
22 changed files with 1225 additions and 82 deletions

View File

@ -86,7 +86,7 @@ test-e2e-quick:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill redeploy-devnet redeploy-testnet release health
VERSION := 0.102.5
VERSION := 0.103.0
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

View File

@ -175,7 +175,8 @@ func showHelp() {
fmt.Printf(" db backups <name> - List database backups\n\n")
fmt.Printf("🏢 Namespaces:\n")
fmt.Printf(" namespace delete - Delete current namespace and all resources\n\n")
fmt.Printf(" namespace delete - Delete current namespace and all resources\n")
fmt.Printf(" namespace repair <name> - Repair under-provisioned cluster (add missing nodes)\n\n")
fmt.Printf("🔍 Cluster Inspection:\n")
fmt.Printf(" inspect - Inspect cluster health via SSH\n")

View File

@ -192,17 +192,6 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
cfg.IPFSReplicationFactor = y.IPFSReplicationFactor
}
// Phantom Solana auth (from env vars)
if v := os.Getenv("PHANTOM_AUTH_URL"); v != "" {
cfg.PhantomAuthURL = v
}
if v := os.Getenv("SOLANA_RPC_URL"); v != "" {
cfg.SolanaRPCURL = v
}
if v := os.Getenv("NFT_COLLECTION_ADDRESS"); v != "" {
cfg.NFTCollectionAddress = v
}
// Validate configuration
if errs := cfg.ValidateConfig(); len(errs) > 0 {
fmt.Fprintf(os.Stderr, "\nGateway configuration errors (%d):\n", len(errs))

View File

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
@ -15,10 +16,12 @@ import (
qrterminal "github.com/mdp/qrterminal/v3"
)
// Hardcoded Phantom auth React app URL (deployed on Orama devnet)
const phantomAuthURL = "https://phantom-auth-y0w9aa.orama-devnet.network"
// PhantomSession represents a phantom auth session from the gateway.
type PhantomSession struct {
SessionID string `json:"session_id"`
AuthURL string `json:"auth_url"`
ExpiresAt string `json:"expires_at"`
}
@ -71,10 +74,13 @@ func PerformPhantomAuthentication(gatewayURL, namespace string) (*Credentials, e
return nil, fmt.Errorf("failed to create session: %w", err)
}
// 2. Display QR code
// 2. Build auth URL and display QR code
authURL := fmt.Sprintf("%s/?session=%s&gateway=%s&namespace=%s",
phantomAuthURL, session.SessionID, url.QueryEscape(gatewayURL), url.QueryEscape(namespace))
fmt.Println("\nScan this QR code with your phone to authenticate:")
fmt.Println()
qrterminal.GenerateWithConfig(session.AuthURL, qrterminal.Config{
qrterminal.GenerateWithConfig(authURL, qrterminal.Config{
Level: qrterminal.M,
Writer: os.Stdout,
BlackChar: qrterminal.BLACK,
@ -82,7 +88,7 @@ func PerformPhantomAuthentication(gatewayURL, namespace string) (*Credentials, e
QuietZone: 1,
})
fmt.Println()
fmt.Printf("Or open this URL on your phone:\n%s\n\n", session.AuthURL)
fmt.Printf("Or open this URL on your phone:\n%s\n\n", authURL)
fmt.Println("Waiting for authentication... (timeout: 5 minutes)")
// 3. Poll for completion
@ -94,7 +100,11 @@ func PerformPhantomAuthentication(gatewayURL, namespace string) (*Credentials, e
// Set namespace and build namespace URL
creds.Namespace = namespace
if domain := extractDomainFromURL(gatewayURL); domain != "" {
creds.NamespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, domain)
if namespace == "default" {
creds.NamespaceURL = fmt.Sprintf("https://%s", domain)
} else {
creds.NamespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, domain)
}
}
fmt.Printf("\n🎉 Authentication successful!\n")

View File

@ -208,7 +208,11 @@ func verifySignature(client *http.Client, gatewayURL, wallet, nonce, signature,
// Build namespace gateway URL
namespaceURL := ""
if d := extractDomainFromURL(gatewayURL); d != "" {
namespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, d)
if namespace == "default" {
namespaceURL = fmt.Sprintf("https://%s", d)
} else {
namespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, d)
}
}
creds := &Credentials{
@ -221,9 +225,8 @@ func verifySignature(client *http.Client, gatewayURL, wallet, nonce, signature,
NamespaceURL: namespaceURL,
}
if result.ExpiresIn > 0 {
creds.ExpiresAt = time.Now().Add(time.Duration(result.ExpiresIn) * time.Second)
}
// Note: result.ExpiresIn is the JWT access token lifetime (15min),
// NOT the API key lifetime. Don't set ExpiresAt — the API key is permanent.
return creds, nil
}

View File

@ -76,7 +76,11 @@ func PerformSimpleAuthentication(gatewayURL, wallet, namespace, existingAPIKey s
// Build namespace gateway URL from the gateway URL
namespaceURL := ""
if domain := extractDomainFromURL(gatewayURL); domain != "" {
namespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, domain)
if namespace == "default" {
namespaceURL = fmt.Sprintf("https://%s", domain)
} else {
namespaceURL = fmt.Sprintf("https://ns-%s.%s", namespace, domain)
}
}
// Create credentials

View File

@ -11,6 +11,7 @@ import (
"strings"
"github.com/DeBrosOfficial/network/pkg/auth"
"github.com/DeBrosOfficial/network/pkg/constants"
)
// HandleNamespaceCommand handles namespace management commands
@ -28,6 +29,12 @@ func HandleNamespaceCommand(args []string) {
fs.BoolVar(&force, "force", false, "Skip confirmation prompt")
_ = fs.Parse(args[1:])
handleNamespaceDelete(force)
case "repair":
if len(args) < 2 {
fmt.Fprintf(os.Stderr, "Usage: orama namespace repair <namespace_name>\n")
os.Exit(1)
}
handleNamespaceRepair(args[1])
case "help":
showNamespaceHelp()
default:
@ -41,13 +48,53 @@ func showNamespaceHelp() {
fmt.Printf("Namespace Management Commands\n\n")
fmt.Printf("Usage: orama namespace <subcommand>\n\n")
fmt.Printf("Subcommands:\n")
fmt.Printf(" delete - Delete the current namespace and all its resources\n")
fmt.Printf(" help - Show this help message\n\n")
fmt.Printf(" delete - Delete the current namespace and all its resources\n")
fmt.Printf(" repair <namespace> - Repair an under-provisioned namespace cluster (add missing nodes)\n")
fmt.Printf(" help - Show this help message\n\n")
fmt.Printf("Flags:\n")
fmt.Printf(" --force - Skip confirmation prompt\n\n")
fmt.Printf(" --force - Skip confirmation prompt (delete only)\n\n")
fmt.Printf("Examples:\n")
fmt.Printf(" orama namespace delete\n")
fmt.Printf(" orama namespace delete --force\n")
fmt.Printf(" orama namespace repair anchat\n")
}
// handleNamespaceRepair asks the local gateway to repair an under-provisioned
// namespace cluster by POSTing to the internal repair endpoint on localhost.
// On any failure it prints a diagnostic to stderr and exits with status 1;
// on success it prints the gateway's confirmation message (if any).
func handleNamespaceRepair(namespaceName string) {
	fmt.Printf("Repairing namespace cluster '%s'...\n", namespaceName)

	// Call the internal repair endpoint on the local gateway.
	// NOTE(review): namespaceName is interpolated unescaped into the query
	// string — should be url.QueryEscape'd; confirm allowed namespace charset.
	endpoint := fmt.Sprintf("http://localhost:%d/v1/internal/namespace/repair?namespace=%s", constants.GatewayAPIPort, namespaceName)
	req, err := http.NewRequest(http.MethodPost, endpoint, nil)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create request: %v\n", err)
		os.Exit(1)
	}
	// Shared-secret header checked by the gateway's internal-auth guard.
	req.Header.Set("X-Orama-Internal-Auth", "namespace-coordination")

	// TODO(review): no Timeout is set — repair can legitimately take a while,
	// but an unresponsive gateway will hang this command forever.
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to connect to local gateway (is the node running?): %v\n", err)
		os.Exit(1)
	}
	defer resp.Body.Close()

	// Decode the JSON body if present; a non-JSON or empty body simply leaves
	// result nil and we fall back to the generic messages below.
	var result map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		result = nil
	}

	if resp.StatusCode != http.StatusOK {
		errMsg := "unknown error"
		if e, ok := result["error"].(string); ok {
			errMsg = e
		}
		fmt.Fprintf(os.Stderr, "Repair failed: %s\n", errMsg)
		os.Exit(1)
	}

	fmt.Printf("Namespace '%s' cluster repaired successfully.\n", namespaceName)
	if msg, ok := result["message"].(string); ok {
		fmt.Printf("  %s\n", msg)
	}
}
func handleNamespaceDelete(force bool) {

View File

@ -161,8 +161,8 @@ func (cg *ConfigGenerator) GenerateNodeConfig(peerAddresses []string, vpsIP stri
P2PPort: 4001,
DataDir: filepath.Join(cg.oramaDir, "data"),
RQLiteHTTPPort: 5001,
RQLiteRaftPort: 7001, // External SNI port
RQLiteRaftInternalPort: raftInternalPort, // Internal RQLite binding port
RQLiteRaftPort: 7001, // External SNI port
RQLiteRaftInternalPort: raftInternalPort, // Internal RQLite binding port
RQLiteJoinAddress: rqliteJoinAddr,
BootstrapPeers: peerAddresses,
ClusterAPIPort: 9094,

View File

@ -20,6 +20,11 @@ const (
tokenProgramID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
// Metaplex Token Metadata Program ID
metaplexProgramID = "metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s"
// Hardcoded Solana RPC endpoint (mainnet-beta)
defaultSolanaRPCURL = "https://api.mainnet-beta.solana.com"
// Required NFT collection address for Phantom auth
defaultNFTCollectionAddress = "GtsCViqB9fWriKeDMQdveDvYmqqvBCEoxRfu1gzE48uh"
)
// SolanaNFTVerifier verifies NFT ownership on Solana via JSON-RPC.
@ -29,11 +34,11 @@ type SolanaNFTVerifier struct {
httpClient *http.Client
}
// NewSolanaNFTVerifier creates a new verifier for the given collection.
func NewSolanaNFTVerifier(rpcURL, collectionAddress string) *SolanaNFTVerifier {
// NewDefaultSolanaNFTVerifier creates a verifier with the hardcoded collection and RPC endpoint.
func NewDefaultSolanaNFTVerifier() *SolanaNFTVerifier {
return &SolanaNFTVerifier{
rpcURL: rpcURL,
collectionAddress: collectionAddress,
rpcURL: defaultSolanaRPCURL,
collectionAddress: defaultNFTCollectionAddress,
httpClient: &http.Client{
Timeout: 30 * time.Second,
},

View File

@ -41,9 +41,4 @@ type Config struct {
// WireGuard mesh configuration
ClusterSecret string // Cluster secret for authenticating internal WireGuard peer exchange
// Phantom Solana auth configuration
PhantomAuthURL string // URL of the deployed Phantom auth React app
SolanaRPCURL string // Solana RPC endpoint for NFT verification
NFTCollectionAddress string // Required NFT collection address for Phantom auth
}

View File

@ -140,6 +140,9 @@ type Gateway struct {
// Node health monitor (ring-based peer failure detection)
healthMonitor *nodehealth.Monitor
// Node recovery handler (called when health monitor confirms a node dead or recovered)
nodeRecoverer authhandlers.NodeRecoverer
}
// localSubscriber represents a WebSocket subscriber for local message delivery
@ -322,18 +325,10 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
gw.withInternalAuth,
)
// Configure Phantom Solana auth if env vars are set
if cfg.PhantomAuthURL != "" {
var solanaVerifier *auth.SolanaNFTVerifier
if cfg.SolanaRPCURL != "" && cfg.NFTCollectionAddress != "" {
solanaVerifier = auth.NewSolanaNFTVerifier(cfg.SolanaRPCURL, cfg.NFTCollectionAddress)
logger.ComponentInfo(logging.ComponentGeneral, "Solana NFT verifier configured",
zap.String("collection", cfg.NFTCollectionAddress))
}
gw.authHandlers.SetPhantomConfig(cfg.PhantomAuthURL, solanaVerifier)
logger.ComponentInfo(logging.ComponentGeneral, "Phantom auth configured",
zap.String("auth_url", cfg.PhantomAuthURL))
}
// Configure Solana NFT verifier for Phantom auth (hardcoded collection + RPC)
solanaVerifier := auth.NewDefaultSolanaNFTVerifier()
gw.authHandlers.SetSolanaVerifier(solanaVerifier)
logger.ComponentInfo(logging.ComponentGeneral, "Solana NFT verifier configured")
}
// Initialize middleware cache (60s TTL for auth/routing lookups)
@ -554,8 +549,18 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
Neighbors: 3,
})
gw.healthMonitor.OnNodeDead(func(nodeID string) {
logger.ComponentError(logging.ComponentGeneral, "Node confirmed dead by quorum — recovery not yet implemented",
logger.ComponentError(logging.ComponentGeneral, "Node confirmed dead by quorum — starting recovery",
zap.String("dead_node", nodeID))
if gw.nodeRecoverer != nil {
go gw.nodeRecoverer.HandleDeadNode(context.Background(), nodeID)
}
})
gw.healthMonitor.OnNodeRecovered(func(nodeID string) {
logger.ComponentInfo(logging.ComponentGeneral, "Previously dead node recovered — checking for orphaned services",
zap.String("node_id", nodeID))
if gw.nodeRecoverer != nil {
go gw.nodeRecoverer.HandleRecoveredNode(context.Background(), nodeID)
}
})
go gw.healthMonitor.Start(context.Background())
logger.ComponentInfo(logging.ComponentGeneral, "Node health monitor started",
@ -584,6 +589,11 @@ func (g *Gateway) SetClusterProvisioner(cp authhandlers.ClusterProvisioner) {
}
}
// SetNodeRecoverer sets the handler for dead node recovery and revived node cleanup.
func (g *Gateway) SetNodeRecoverer(nr authhandlers.NodeRecoverer) {
g.nodeRecoverer = nr
}
// SetSpawnHandler sets the handler for internal namespace spawn/stop requests.
func (g *Gateway) SetSpawnHandler(h http.Handler) {
g.spawnHandler = h
@ -748,3 +758,43 @@ func (g *Gateway) namespaceClusterStatusHandler(w http.ResponseWriter, r *http.R
json.NewEncoder(w).Encode(status)
}
// namespaceClusterRepairHandler handles POST /v1/internal/namespace/repair?namespace={name}
// This endpoint repairs under-provisioned namespace clusters by adding missing nodes.
// Internal-only: authenticated by the X-Orama-Internal-Auth header; it is not
// exposed through the normal API-key/JWT auth path.
func (g *Gateway) namespaceClusterRepairHandler(w http.ResponseWriter, r *http.Request) {
	// Guard clauses: reject anything that is not an authenticated internal POST
	// carrying a namespace, before touching the recoverer.
	if r.Method != http.MethodPost {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	if r.Header.Get("X-Orama-Internal-Auth") != "namespace-coordination" {
		writeError(w, http.StatusUnauthorized, "unauthorized")
		return
	}
	ns := r.URL.Query().Get("namespace")
	if ns == "" {
		writeError(w, http.StatusBadRequest, "namespace parameter required")
		return
	}
	if g.nodeRecoverer == nil {
		// Recovery wiring is optional (SetNodeRecoverer); without it repair
		// cannot run.
		writeError(w, http.StatusServiceUnavailable, "cluster recovery not enabled")
		return
	}

	// Delegate the actual repair; surface any failure as a 500.
	if err := g.nodeRecoverer.RepairCluster(r.Context(), ns); err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}

	body := map[string]interface{}{
		"status":    "ok",
		"namespace": ns,
		"message":   "cluster repair completed",
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(body)
}

View File

@ -48,6 +48,14 @@ type ClusterProvisioner interface {
GetClusterStatusByID(ctx context.Context, clusterID string) (interface{}, error)
}
// NodeRecoverer handles automatic recovery when nodes die or come back online,
// and manual cluster repair for under-provisioned clusters.
type NodeRecoverer interface {
HandleDeadNode(ctx context.Context, deadNodeID string)
HandleRecoveredNode(ctx context.Context, nodeID string)
RepairCluster(ctx context.Context, namespaceName string) error
}
// Handlers holds dependencies for authentication HTTP handlers
type Handlers struct {
logger *logging.ColoredLogger
@ -55,8 +63,7 @@ type Handlers struct {
netClient NetworkClient
defaultNS string
internalAuthFn func(context.Context) context.Context
clusterProvisioner ClusterProvisioner // Optional: for namespace cluster provisioning
phantomAuthURL string // URL of the Phantom auth React app
clusterProvisioner ClusterProvisioner // Optional: for namespace cluster provisioning
solanaVerifier *authsvc.SolanaNFTVerifier // Server-side NFT ownership verifier
}
@ -82,9 +89,8 @@ func (h *Handlers) SetClusterProvisioner(cp ClusterProvisioner) {
h.clusterProvisioner = cp
}
// SetPhantomConfig sets the Phantom auth app URL and Solana NFT verifier
func (h *Handlers) SetPhantomConfig(authURL string, verifier *authsvc.SolanaNFTVerifier) {
h.phantomAuthURL = authURL
// SetSolanaVerifier sets the server-side NFT ownership verifier for Phantom auth
func (h *Handlers) SetSolanaVerifier(verifier *authsvc.SolanaNFTVerifier) {
h.solanaVerifier = verifier
}

View File

@ -5,9 +5,7 @@ import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"net/url"
"regexp"
"strings"
"time"
@ -29,10 +27,6 @@ func (h *Handlers) PhantomSessionHandler(w http.ResponseWriter, r *http.Request)
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
if h.phantomAuthURL == "" {
writeError(w, http.StatusServiceUnavailable, "phantom auth not configured")
return
}
var req struct {
Namespace string `json:"namespace"`
@ -78,23 +72,8 @@ func (h *Handlers) PhantomSessionHandler(w http.ResponseWriter, r *http.Request)
return
}
// Build the auth URL that the phone will open
gatewayURL := r.Header.Get("X-Forwarded-Proto") + "://" + r.Header.Get("X-Forwarded-Host")
if gatewayURL == "://" {
// Fallback: construct from request
scheme := "https"
if r.TLS == nil && r.Header.Get("X-Forwarded-Proto") == "" {
scheme = "http"
}
gatewayURL = scheme + "://" + r.Host
}
authURL := fmt.Sprintf("%s/?session=%s&gateway=%s&namespace=%s",
h.phantomAuthURL, sessionID, url.QueryEscape(gatewayURL), url.QueryEscape(namespace))
writeJSON(w, http.StatusOK, map[string]any{
"session_id": sessionID,
"auth_url": authURL,
"expires_at": expiresAt.UTC().Format(time.RFC3339),
})
}

View File

@ -449,6 +449,11 @@ func isPublicPath(p string) bool {
return true
}
// Namespace cluster repair endpoint (auth handled by internal auth header)
if p == "/v1/internal/namespace/repair" {
return true
}
// Phantom auth endpoints are public (session creation, status polling, completion)
if strings.HasPrefix(p, "/v1/auth/phantom/") {
return true

View File

@ -44,6 +44,9 @@ func (g *Gateway) Routes() http.Handler {
mux.Handle("/v1/internal/namespace/spawn", g.spawnHandler)
}
// Namespace cluster repair (internal, handler does its own auth)
mux.HandleFunc("/v1/internal/namespace/repair", g.namespaceClusterRepairHandler)
// auth endpoints
mux.HandleFunc("/v1/auth/jwks", g.authService.JWKSHandler)
mux.HandleFunc("/.well-known/jwks.json", g.authService.JWKSHandler)

View File

@ -1722,6 +1722,28 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
zap.String("cluster_id", state.ClusterID),
)
// Self-check: verify this node is still assigned to this cluster in the DB.
// If we were replaced during downtime, do NOT restore — stop services instead.
if cm.db != nil {
type countResult struct {
Count int `db:"count"`
}
var results []countResult
verifyQuery := `SELECT COUNT(*) as count FROM namespace_cluster_nodes WHERE namespace_cluster_id = ? AND node_id = ?`
if err := cm.db.Query(ctx, &results, verifyQuery, state.ClusterID, cm.localNodeID); err == nil && len(results) > 0 {
if results[0].Count == 0 {
cm.logger.Warn("Node was replaced during downtime, stopping orphaned services instead of restoring",
zap.String("namespace", state.NamespaceName),
zap.String("cluster_id", state.ClusterID))
cm.systemdSpawner.StopAll(ctx, state.NamespaceName)
// Delete the stale cluster-state.json
stateFilePath := filepath.Join(cm.baseDataDir, state.NamespaceName, "cluster-state.json")
os.Remove(stateFilePath)
return nil
}
}
}
pb := &state.LocalPorts
localIP := state.LocalIP

View File

@ -0,0 +1,913 @@
package namespace
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/gateway"
"github.com/DeBrosOfficial/network/pkg/olric"
"github.com/DeBrosOfficial/network/pkg/rqlite"
"go.uber.org/zap"
)
// nodeIPInfo holds both internal (WireGuard) and public IPs for a node,
// as loaded from the dns_nodes table.
type nodeIPInfo struct {
	InternalIP string `db:"internal_ip"` // WireGuard mesh IP, used for intra-cluster traffic (spawn/stop requests, raft joins)
	IPAddress  string `db:"ip_address"`  // public IP, used for DNS records
}
// survivingNodePorts holds port and IP info for surviving cluster nodes.
// Populated from a join of namespace_port_allocations and dns_nodes
// (see the portsQuery in ReplaceClusterNode).
type survivingNodePorts struct {
	NodeID              string `db:"node_id"`
	InternalIP          string `db:"internal_ip"` // COALESCE(internal_ip, ip_address): falls back to public IP when no WireGuard IP is recorded
	IPAddress           string `db:"ip_address"`
	RQLiteHTTPPort      int    `db:"rqlite_http_port"`
	RQLiteRaftPort      int    `db:"rqlite_raft_port"`
	OlricHTTPPort       int    `db:"olric_http_port"`
	OlricMemberlistPort int    `db:"olric_memberlist_port"`
	GatewayHTTPPort     int    `db:"gateway_http_port"`
}
// HandleDeadNode processes the death of a network node by recovering all affected
// namespace clusters. It finds all clusters with assignments to the dead node and
// replaces it with a healthy node in each cluster.
//
// Failures are logged (and recorded as cluster events) but never returned: this
// runs as a fire-and-forget handler triggered by the health monitor.
func (cm *ClusterManager) HandleDeadNode(ctx context.Context, deadNodeID string) {
	cm.logger.Error("Handling dead node — starting recovery",
		zap.String("dead_node", deadNodeID),
	)

	// Mark node as offline in dns_nodes. Best-effort: recovery proceeds even
	// if this update fails.
	if err := cm.markNodeOffline(ctx, deadNodeID); err != nil {
		cm.logger.Warn("Failed to mark node offline", zap.Error(err))
	}

	// Find all affected clusters
	clusters, err := cm.getClustersByNodeID(ctx, deadNodeID)
	if err != nil {
		cm.logger.Error("Failed to find affected clusters for dead node",
			zap.String("dead_node", deadNodeID), zap.Error(err))
		return
	}
	if len(clusters) == 0 {
		cm.logger.Info("Dead node had no namespace cluster assignments",
			zap.String("dead_node", deadNodeID))
		return
	}

	cm.logger.Info("Found affected namespace clusters",
		zap.String("dead_node", deadNodeID),
		zap.Int("cluster_count", len(clusters)),
	)

	// Recover each cluster sequentially (avoid overloading replacement nodes)
	successCount := 0
	for _, cluster := range clusters {
		// Use the provisioning map as an in-process lock so two concurrent
		// triggers (e.g. overlapping health-monitor callbacks) don't run
		// recovery on the same cluster at once.
		recoveryKey := "recovery:" + cluster.ID
		cm.provisioningMu.Lock()
		if cm.provisioning[recoveryKey] {
			cm.provisioningMu.Unlock()
			cm.logger.Info("Recovery already in progress for cluster, skipping",
				zap.String("cluster_id", cluster.ID),
				zap.String("namespace", cluster.NamespaceName))
			continue
		}
		cm.provisioning[recoveryKey] = true
		cm.provisioningMu.Unlock()

		// Copy the loop variable before taking its address (pre-Go-1.22 loop
		// variable semantics).
		clusterCopy := cluster
		// Wrap in a closure so the deferred release of the recovery lock runs
		// per iteration, not at function exit.
		err := func() error {
			defer func() {
				cm.provisioningMu.Lock()
				delete(cm.provisioning, recoveryKey)
				cm.provisioningMu.Unlock()
			}()
			return cm.ReplaceClusterNode(ctx, &clusterCopy, deadNodeID)
		}()
		if err != nil {
			cm.logger.Error("Failed to recover cluster",
				zap.String("cluster_id", clusterCopy.ID),
				zap.String("namespace", clusterCopy.NamespaceName),
				zap.String("dead_node", deadNodeID),
				zap.Error(err),
			)
			cm.logEvent(ctx, clusterCopy.ID, EventRecoveryFailed, deadNodeID,
				fmt.Sprintf("Recovery failed: %s", err), nil)
		} else {
			successCount++
		}
	}

	cm.logger.Info("Dead node recovery completed",
		zap.String("dead_node", deadNodeID),
		zap.Int("clusters_total", len(clusters)),
		zap.Int("clusters_recovered", successCount),
	)
}
// HandleRecoveredNode handles a previously-dead node coming back online.
// It checks if the node was replaced during downtime and cleans up orphaned services.
//
// Two outcomes:
//   - the node still has rows in namespace_cluster_nodes: it was not replaced,
//     so it is simply marked active again;
//   - it has no assignments: it was replaced while down, so any namespace
//     services it is still running are orphans and are stopped remotely.
func (cm *ClusterManager) HandleRecoveredNode(ctx context.Context, nodeID string) {
	cm.logger.Info("Handling recovered node — checking for orphaned services",
		zap.String("node_id", nodeID),
	)

	// Check if the node still has any cluster assignments
	type assignmentCheck struct {
		Count int `db:"count"`
	}
	var results []assignmentCheck
	query := `SELECT COUNT(*) as count FROM namespace_cluster_nodes WHERE node_id = ?`
	if err := cm.db.Query(ctx, &results, query, nodeID); err != nil {
		cm.logger.Warn("Failed to check node assignments", zap.Error(err))
		return
	}
	if len(results) > 0 && results[0].Count > 0 {
		// Node still has legitimate assignments — just mark active and return
		cm.logger.Info("Recovered node still has cluster assignments, marking active",
			zap.String("node_id", nodeID),
			zap.Int("assignments", results[0].Count))
		cm.markNodeActive(ctx, nodeID)
		return
	}

	// Node has no assignments — it was replaced. Clean up orphaned services.
	cm.logger.Warn("Recovered node was replaced during downtime, cleaning up orphaned services",
		zap.String("node_id", nodeID))

	// Get the node's internal IP to send stop requests
	ips, err := cm.getNodeIPs(ctx, nodeID)
	if err != nil {
		cm.logger.Warn("Failed to get recovered node IPs for cleanup", zap.Error(err))
		cm.markNodeActive(ctx, nodeID)
		return
	}

	// Find which namespaces were moved away by querying recovery events.
	// NOTE(review): only looks back 24h — a node that was down longer than
	// that may keep running orphaned services; confirm this window is enough.
	type eventInfo struct {
		NamespaceName string `db:"namespace_name"`
	}
	var events []eventInfo
	cutoff := time.Now().Add(-24 * time.Hour).Format("2006-01-02 15:04:05")
	eventsQuery := `
		SELECT DISTINCT c.namespace_name
		FROM namespace_cluster_events e
		JOIN namespace_clusters c ON e.namespace_cluster_id = c.id
		WHERE e.node_id = ? AND e.event_type = ? AND e.created_at > ?
	`
	if err := cm.db.Query(ctx, &events, eventsQuery, nodeID, EventRecoveryStarted, cutoff); err != nil {
		cm.logger.Warn("Failed to query recovery events for cleanup", zap.Error(err))
	}

	// Send stop requests for each orphaned namespace (best-effort; errors are
	// handled inside the send helpers).
	for _, evt := range events {
		cm.logger.Info("Stopping orphaned namespace services on recovered node",
			zap.String("node_id", nodeID),
			zap.String("namespace", evt.NamespaceName))
		cm.sendStopRequest(ctx, ips.InternalIP, "stop-all", evt.NamespaceName, nodeID)
		// Also delete the stale cluster-state.json so the node doesn't try to
		// restore these services on its next restart.
		cm.sendSpawnRequest(ctx, ips.InternalIP, map[string]interface{}{
			"action":    "delete-cluster-state",
			"namespace": evt.NamespaceName,
			"node_id":   nodeID,
		})
	}

	// Mark node as active again — it's available for future use
	cm.markNodeActive(ctx, nodeID)
	cm.logger.Info("Recovered node cleanup completed",
		zap.String("node_id", nodeID),
		zap.Int("namespaces_cleaned", len(events)))
}
// ReplaceClusterNode replaces a dead node in a specific namespace cluster.
// It selects a new node, allocates ports, spawns services, updates DNS, and cleans up.
//
// The replacement only re-creates the roles (RQLite/Olric/Gateway) that the
// dead node actually held in this cluster. Spawn failures are counted rather
// than aborting: the cluster is left degraded and the failure is visible in
// the event log. Returns an error only for failures that prevent replacement
// entirely (node selection, port allocation, required DB queries).
func (cm *ClusterManager) ReplaceClusterNode(ctx context.Context, cluster *NamespaceCluster, deadNodeID string) error {
	cm.logger.Info("Starting node replacement in cluster",
		zap.String("cluster_id", cluster.ID),
		zap.String("namespace", cluster.NamespaceName),
		zap.String("dead_node", deadNodeID),
	)
	cm.logEvent(ctx, cluster.ID, EventRecoveryStarted, deadNodeID,
		fmt.Sprintf("Recovery started: replacing dead node %s", deadNodeID), nil)

	// 1. Mark dead node's assignments as failed (best-effort)
	if err := cm.updateClusterNodeStatus(ctx, cluster.ID, deadNodeID, NodeStatusFailed); err != nil {
		cm.logger.Warn("Failed to mark node as failed in cluster", zap.Error(err))
	}

	// 2. Mark cluster as degraded
	cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusDegraded,
		fmt.Sprintf("Node %s is dead, recovery in progress", deadNodeID))
	cm.logEvent(ctx, cluster.ID, EventClusterDegraded, deadNodeID, "Cluster degraded due to dead node", nil)

	// 3. Get all current cluster nodes and their info
	clusterNodes, err := cm.getClusterNodes(ctx, cluster.ID)
	if err != nil {
		return fmt.Errorf("failed to get cluster nodes: %w", err)
	}

	// Build exclude list (all current cluster members) so the replacement is
	// a node not already in this cluster.
	excludeIDs := make([]string, 0, len(clusterNodes))
	for _, cn := range clusterNodes {
		excludeIDs = append(excludeIDs, cn.NodeID)
	}

	// 4. Select replacement node
	replacement, err := cm.nodeSelector.SelectReplacementNode(ctx, excludeIDs)
	if err != nil {
		return fmt.Errorf("failed to select replacement node: %w", err)
	}
	cm.logger.Info("Selected replacement node",
		zap.String("namespace", cluster.NamespaceName),
		zap.String("replacement_node", replacement.NodeID),
		zap.String("replacement_ip", replacement.InternalIP),
	)

	// 5. Allocate ports on replacement node
	portBlock, err := cm.portAllocator.AllocatePortBlock(ctx, replacement.NodeID, cluster.ID)
	if err != nil {
		return fmt.Errorf("failed to allocate ports on replacement node: %w", err)
	}

	// 6. Get surviving nodes' port info (everyone in the cluster except the
	// dead node) — used as join/peer targets for the new services.
	var surviving []survivingNodePorts
	portsQuery := `
		SELECT pa.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip, dn.ip_address,
		       pa.rqlite_http_port, pa.rqlite_raft_port, pa.olric_http_port,
		       pa.olric_memberlist_port, pa.gateway_http_port
		FROM namespace_port_allocations pa
		JOIN dns_nodes dn ON pa.node_id = dn.id
		WHERE pa.namespace_cluster_id = ? AND pa.node_id != ?
	`
	if err := cm.db.Query(ctx, &surviving, portsQuery, cluster.ID, deadNodeID); err != nil {
		// Rollback port allocation
		cm.portAllocator.DeallocatePortBlock(ctx, cluster.ID, replacement.NodeID)
		return fmt.Errorf("failed to query surviving node ports: %w", err)
	}

	// 7. Determine dead node's roles — the replacement only takes over these.
	deadNodeRoles := make(map[NodeRole]bool)
	var deadNodeRaftPort int
	for _, cn := range clusterNodes {
		if cn.NodeID == deadNodeID {
			deadNodeRoles[cn.Role] = true
			if cn.Role == NodeRoleRQLiteLeader || cn.Role == NodeRoleRQLiteFollower {
				deadNodeRaftPort = cn.RQLiteRaftPort
			}
		}
	}

	// 8. Remove dead node from RQLite Raft cluster (before joining replacement)
	if deadNodeRoles[NodeRoleRQLiteLeader] || deadNodeRoles[NodeRoleRQLiteFollower] {
		deadIPs, err := cm.getNodeIPs(ctx, deadNodeID)
		if err == nil && deadNodeRaftPort > 0 {
			deadRaftAddr := fmt.Sprintf("%s:%d", deadIPs.InternalIP, deadNodeRaftPort)
			cm.removeDeadNodeFromRaft(ctx, deadRaftAddr, surviving)
		}
	}

	// Spawn failures below are tallied rather than aborting; see step 15.
	spawnErrors := 0

	// 9. Spawn RQLite follower on replacement. The new node always joins as a
	// follower, even if the dead node was the leader (raft will elect).
	if deadNodeRoles[NodeRoleRQLiteLeader] || deadNodeRoles[NodeRoleRQLiteFollower] {
		// NOTE(review): if no surviving node has a raft port, joinAddr stays
		// empty and the join presumably fails — confirm handling downstream.
		var joinAddr string
		for _, s := range surviving {
			if s.RQLiteRaftPort > 0 {
				joinAddr = fmt.Sprintf("%s:%d", s.InternalIP, s.RQLiteRaftPort)
				break
			}
		}
		rqliteCfg := rqlite.InstanceConfig{
			Namespace:      cluster.NamespaceName,
			NodeID:         replacement.NodeID,
			HTTPPort:       portBlock.RQLiteHTTPPort,
			RaftPort:       portBlock.RQLiteRaftPort,
			HTTPAdvAddress: fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.RQLiteHTTPPort),
			RaftAdvAddress: fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.RQLiteRaftPort),
			JoinAddresses:  []string{joinAddr},
			IsLeader:       false,
		}
		// Spawn locally via systemd when the replacement is this node,
		// otherwise via the remote spawn endpoint.
		var spawnErr error
		if replacement.NodeID == cm.localNodeID {
			spawnErr = cm.spawnRQLiteWithSystemd(ctx, rqliteCfg)
		} else {
			_, spawnErr = cm.spawnRQLiteRemote(ctx, replacement.InternalIP, rqliteCfg)
		}
		if spawnErr != nil {
			cm.logger.Error("Failed to spawn RQLite follower on replacement",
				zap.String("node", replacement.NodeID), zap.Error(spawnErr))
			spawnErrors++
		} else {
			cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleRQLiteFollower, portBlock)
			cm.logEvent(ctx, cluster.ID, EventRQLiteStarted, replacement.NodeID,
				"RQLite follower started on replacement node", nil)
		}
	}

	// 10. Spawn Olric on replacement, peering with surviving memberlist ports.
	if deadNodeRoles[NodeRoleOlric] {
		var olricPeers []string
		for _, s := range surviving {
			if s.OlricMemberlistPort > 0 {
				olricPeers = append(olricPeers, fmt.Sprintf("%s:%d", s.InternalIP, s.OlricMemberlistPort))
			}
		}
		olricCfg := olric.InstanceConfig{
			Namespace:      cluster.NamespaceName,
			NodeID:         replacement.NodeID,
			HTTPPort:       portBlock.OlricHTTPPort,
			MemberlistPort: portBlock.OlricMemberlistPort,
			BindAddr:       replacement.InternalIP,
			AdvertiseAddr:  replacement.InternalIP,
			PeerAddresses:  olricPeers,
		}
		var spawnErr error
		if replacement.NodeID == cm.localNodeID {
			spawnErr = cm.spawnOlricWithSystemd(ctx, olricCfg)
		} else {
			_, spawnErr = cm.spawnOlricRemote(ctx, replacement.InternalIP, olricCfg)
		}
		if spawnErr != nil {
			cm.logger.Error("Failed to spawn Olric on replacement",
				zap.String("node", replacement.NodeID), zap.Error(spawnErr))
			spawnErrors++
		} else {
			cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleOlric, portBlock)
			cm.logEvent(ctx, cluster.ID, EventOlricStarted, replacement.NodeID,
				"Olric started on replacement node", nil)
		}
	}

	// 11. Spawn Gateway on replacement
	if deadNodeRoles[NodeRoleGateway] {
		// Build Olric server addresses — all nodes including replacement
		var olricServers []string
		for _, s := range surviving {
			if s.OlricHTTPPort > 0 {
				olricServers = append(olricServers, fmt.Sprintf("%s:%d", s.InternalIP, s.OlricHTTPPort))
			}
		}
		olricServers = append(olricServers, fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.OlricHTTPPort))
		gwCfg := gateway.InstanceConfig{
			Namespace:             cluster.NamespaceName,
			NodeID:                replacement.NodeID,
			HTTPPort:              portBlock.GatewayHTTPPort,
			BaseDomain:            cm.baseDomain,
			RQLiteDSN:             fmt.Sprintf("http://localhost:%d", portBlock.RQLiteHTTPPort),
			GlobalRQLiteDSN:       cm.globalRQLiteDSN,
			OlricServers:          olricServers,
			OlricTimeout:          30 * time.Second,
			IPFSClusterAPIURL:     cm.ipfsClusterAPIURL,
			IPFSAPIURL:            cm.ipfsAPIURL,
			IPFSTimeout:           cm.ipfsTimeout,
			IPFSReplicationFactor: cm.ipfsReplicationFactor,
		}
		var spawnErr error
		if replacement.NodeID == cm.localNodeID {
			spawnErr = cm.spawnGatewayWithSystemd(ctx, gwCfg)
		} else {
			_, spawnErr = cm.spawnGatewayRemote(ctx, replacement.InternalIP, gwCfg)
		}
		if spawnErr != nil {
			cm.logger.Error("Failed to spawn Gateway on replacement",
				zap.String("node", replacement.NodeID), zap.Error(spawnErr))
			spawnErrors++
		} else {
			cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleGateway, portBlock)
			cm.logEvent(ctx, cluster.ID, EventGatewayStarted, replacement.NodeID,
				"Gateway started on replacement node", nil)
		}
	}

	// 12. Update DNS: swap dead node's PUBLIC IP for replacement's PUBLIC IP
	// (external clients resolve public IPs; the internal IPs above are only
	// for the mesh).
	deadIPs, err := cm.getNodeIPs(ctx, deadNodeID)
	if err == nil && deadIPs.IPAddress != "" {
		dnsManager := NewDNSRecordManager(cm.db, cm.baseDomain, cm.logger)
		if err := dnsManager.UpdateNamespaceRecord(ctx, cluster.NamespaceName, deadIPs.IPAddress, replacement.IPAddress); err != nil {
			cm.logger.Error("Failed to update DNS records",
				zap.String("namespace", cluster.NamespaceName),
				zap.String("old_ip", deadIPs.IPAddress),
				zap.String("new_ip", replacement.IPAddress),
				zap.Error(err))
		} else {
			cm.logger.Info("DNS records updated",
				zap.String("namespace", cluster.NamespaceName),
				zap.String("old_ip", deadIPs.IPAddress),
				zap.String("new_ip", replacement.IPAddress))
			cm.logEvent(ctx, cluster.ID, EventDNSCreated, replacement.NodeID,
				fmt.Sprintf("DNS updated: %s → %s", deadIPs.IPAddress, replacement.IPAddress), nil)
		}
	}

	// 13. Clean up dead node's port allocations and cluster assignments
	cm.portAllocator.DeallocatePortBlock(ctx, cluster.ID, deadNodeID)
	cm.removeClusterNodeAssignment(ctx, cluster.ID, deadNodeID)

	// 14. Update cluster-state.json on all nodes
	cm.updateClusterStateAfterRecovery(ctx, cluster)

	// 15. Update cluster status
	if spawnErrors == 0 {
		cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusReady, "")
	}
	// If there were spawn errors, cluster stays degraded

	cm.logEvent(ctx, cluster.ID, EventNodeReplaced, replacement.NodeID,
		fmt.Sprintf("Dead node %s replaced by %s", deadNodeID, replacement.NodeID),
		map[string]interface{}{
			"dead_node":        deadNodeID,
			"replacement_node": replacement.NodeID,
			"spawn_errors":     spawnErrors,
		})
	cm.logEvent(ctx, cluster.ID, EventRecoveryComplete, "", "Recovery completed", nil)

	cm.logger.Info("Node replacement completed",
		zap.String("cluster_id", cluster.ID),
		zap.String("namespace", cluster.NamespaceName),
		zap.String("dead_node", deadNodeID),
		zap.String("replacement", replacement.NodeID),
		zap.Int("spawn_errors", spawnErrors),
	)
	return nil
}
// --- Helper methods ---
// getClustersByNodeID returns all ready/degraded clusters that have the given node assigned.
func (cm *ClusterManager) getClustersByNodeID(ctx context.Context, nodeID string) ([]NamespaceCluster, error) {
	internalCtx := client.WithInternalAuth(ctx)

	// Fetch the distinct IDs of every ready/degraded cluster this node belongs to.
	type clusterRef struct {
		ClusterID string `db:"namespace_cluster_id"`
	}
	query := `
	SELECT DISTINCT cn.namespace_cluster_id
	FROM namespace_cluster_nodes cn
	JOIN namespace_clusters c ON cn.namespace_cluster_id = c.id
	WHERE cn.node_id = ? AND c.status IN ('ready', 'degraded')
	`
	var refs []clusterRef
	if err := cm.db.Query(internalCtx, &refs, query, nodeID); err != nil {
		return nil, fmt.Errorf("failed to query clusters by node: %w", err)
	}

	// Resolve each reference to a full cluster record. Entries that fail to
	// load (or no longer exist) are skipped — this lookup is best-effort.
	var clusters []NamespaceCluster
	for _, ref := range refs {
		c, err := cm.GetCluster(internalCtx, ref.ClusterID)
		if err != nil || c == nil {
			continue
		}
		clusters = append(clusters, *c)
	}
	return clusters, nil
}
// updateClusterNodeStatus marks a specific cluster node assignment with a new status.
func (cm *ClusterManager) updateClusterNodeStatus(ctx context.Context, clusterID, nodeID string, status NodeStatus) error {
	const query = `UPDATE namespace_cluster_nodes SET status = ?, updated_at = ? WHERE namespace_cluster_id = ? AND node_id = ?`
	now := time.Now().Format("2006-01-02 15:04:05")
	_, err := cm.db.Exec(ctx, query, status, now, clusterID, nodeID)
	return err
}
// removeClusterNodeAssignment deletes all node assignments for a node in a cluster.
func (cm *ClusterManager) removeClusterNodeAssignment(ctx context.Context, clusterID, nodeID string) {
	const query = `DELETE FROM namespace_cluster_nodes WHERE namespace_cluster_id = ? AND node_id = ?`
	// Best-effort cleanup: a failure is logged but never propagated.
	_, err := cm.db.Exec(ctx, query, clusterID, nodeID)
	if err != nil {
		cm.logger.Warn("Failed to remove cluster node assignment",
			zap.String("cluster_id", clusterID),
			zap.String("node_id", nodeID),
			zap.Error(err))
	}
}
// getNodeIPs returns both the internal (WireGuard) and public IP for a node.
// The internal IP falls back to the public ip_address when dns_nodes has no
// dedicated internal_ip recorded.
func (cm *ClusterManager) getNodeIPs(ctx context.Context, nodeID string) (*nodeIPInfo, error) {
	var results []nodeIPInfo
	query := `SELECT COALESCE(internal_ip, ip_address) as internal_ip, ip_address FROM dns_nodes WHERE id = ? LIMIT 1`
	if err := cm.db.Query(ctx, &results, query, nodeID); err != nil {
		// A query failure is not the same as a missing node: preserve the
		// underlying cause instead of masking it as "not found".
		return nil, fmt.Errorf("failed to query dns_nodes for node %s: %w", nodeID, err)
	}
	if len(results) == 0 {
		return nil, fmt.Errorf("node %s not found in dns_nodes", nodeID)
	}
	return &results[0], nil
}
// markNodeOffline sets a node's status to 'offline' in dns_nodes.
func (cm *ClusterManager) markNodeOffline(ctx context.Context, nodeID string) error {
	const query = `UPDATE dns_nodes SET status = 'offline', updated_at = ? WHERE id = ?`
	now := time.Now().Format("2006-01-02 15:04:05")
	_, err := cm.db.Exec(ctx, query, now, nodeID)
	return err
}
// markNodeActive sets a node's status to 'active' in dns_nodes.
func (cm *ClusterManager) markNodeActive(ctx context.Context, nodeID string) {
	const query = `UPDATE dns_nodes SET status = 'active', updated_at = ? WHERE id = ?`
	now := time.Now().Format("2006-01-02 15:04:05")
	// Best-effort: log the failure rather than returning it.
	_, err := cm.db.Exec(ctx, query, now, nodeID)
	if err != nil {
		cm.logger.Warn("Failed to mark node active", zap.String("node_id", nodeID), zap.Error(err))
	}
}
// removeDeadNodeFromRaft sends a DELETE request to a surviving RQLite node
// to remove the dead node from the Raft voter set.
//
// Best-effort: the first surviving node that acknowledges the removal (200 or
// 204) ends the loop; all failures are logged rather than returned. A no-op
// when deadRaftAddr is empty.
func (cm *ClusterManager) removeDeadNodeFromRaft(ctx context.Context, deadRaftAddr string, survivingNodes []survivingNodePorts) {
	if deadRaftAddr == "" {
		return
	}
	// RQLite identifies Raft members by their raft advertise address.
	// Marshal of a map[string]string cannot fail, so the error is ignored.
	payload, _ := json.Marshal(map[string]string{"id": deadRaftAddr})
	// One client reused across all attempts (previously allocated per
	// iteration inside the loop).
	httpClient := &http.Client{Timeout: 10 * time.Second}
	for _, s := range survivingNodes {
		if s.RQLiteHTTPPort == 0 {
			continue
		}
		// Named endpoint (not "url") to avoid shadowing the net/url package.
		endpoint := fmt.Sprintf("http://%s:%d/remove", s.InternalIP, s.RQLiteHTTPPort)
		req, err := http.NewRequestWithContext(ctx, http.MethodDelete, endpoint, bytes.NewReader(payload))
		if err != nil {
			continue
		}
		req.Header.Set("Content-Type", "application/json")
		resp, err := httpClient.Do(req)
		if err != nil {
			cm.logger.Warn("Failed to remove dead node from Raft via this node",
				zap.String("target", s.NodeID), zap.Error(err))
			continue
		}
		resp.Body.Close()
		if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNoContent {
			cm.logger.Info("Removed dead node from Raft cluster",
				zap.String("dead_raft_addr", deadRaftAddr),
				zap.String("via_node", s.NodeID))
			return
		}
		cm.logger.Warn("Raft removal returned unexpected status",
			zap.String("via_node", s.NodeID),
			zap.Int("status", resp.StatusCode))
	}
	cm.logger.Warn("Could not remove dead node from Raft cluster (best-effort)",
		zap.String("dead_raft_addr", deadRaftAddr))
}
// updateClusterStateAfterRecovery rebuilds and distributes cluster-state.json
// to all current nodes in the cluster (surviving + replacement).
func (cm *ClusterManager) updateClusterStateAfterRecovery(ctx context.Context, cluster *NamespaceCluster) {
	// Re-query every node currently holding a port allocation for this cluster.
	query := `
	SELECT pa.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip, dn.ip_address,
	pa.rqlite_http_port, pa.rqlite_raft_port, pa.olric_http_port,
	pa.olric_memberlist_port, pa.gateway_http_port
	FROM namespace_port_allocations pa
	JOIN dns_nodes dn ON pa.node_id = dn.id
	WHERE pa.namespace_cluster_id = ?
	`
	var allPorts []survivingNodePorts
	if err := cm.db.Query(ctx, &allPorts, query, cluster.ID); err != nil {
		cm.logger.Warn("Failed to query ports for state update", zap.Error(err))
		return
	}

	// Translate the rows into the parallel node/port-block slices that
	// saveClusterStateToAllNodes expects.
	nodes := make([]NodeCapacity, len(allPorts))
	portBlocks := make([]*PortBlock, len(allPorts))
	for i := range allPorts {
		np := &allPorts[i]
		nodes[i] = NodeCapacity{
			NodeID:     np.NodeID,
			InternalIP: np.InternalIP,
			IPAddress:  np.IPAddress,
		}
		portBlocks[i] = &PortBlock{
			RQLiteHTTPPort:      np.RQLiteHTTPPort,
			RQLiteRaftPort:      np.RQLiteRaftPort,
			OlricHTTPPort:       np.OlricHTTPPort,
			OlricMemberlistPort: np.OlricMemberlistPort,
			GatewayHTTPPort:     np.GatewayHTTPPort,
		}
	}
	cm.saveClusterStateToAllNodes(ctx, cluster, nodes, portBlocks)
}
// RepairCluster checks a namespace cluster for missing nodes and adds replacements
// without touching surviving nodes. This is used to repair under-provisioned clusters
// (e.g., after manual node removal) without data loss or downtime.
//
// Only clusters in the ready or degraded state are repaired, at most one
// repair per cluster at a time. Missing nodes are added sequentially via
// addNodeToCluster; if any add fails the loop stops, but nodes added so far
// are kept. Returns ErrClusterNotFound, ErrRecoveryInProgress, or an error
// describing the first unrecoverable failure.
func (cm *ClusterManager) RepairCluster(ctx context.Context, namespaceName string) error {
	cm.logger.Info("Starting cluster repair",
		zap.String("namespace", namespaceName),
	)
	// 1. Look up the cluster
	cluster, err := cm.GetClusterByNamespace(ctx, namespaceName)
	if err != nil {
		return fmt.Errorf("failed to look up cluster: %w", err)
	}
	if cluster == nil {
		return ErrClusterNotFound
	}
	if cluster.Status != ClusterStatusReady && cluster.Status != ClusterStatusDegraded {
		return fmt.Errorf("cluster status is %s, can only repair ready or degraded clusters", cluster.Status)
	}
	// 2. Acquire per-cluster lock. The shared provisioning map serializes
	// repairs so two concurrent repair requests for the same cluster cannot
	// both add nodes.
	repairKey := "repair:" + cluster.ID
	cm.provisioningMu.Lock()
	if cm.provisioning[repairKey] {
		cm.provisioningMu.Unlock()
		return ErrRecoveryInProgress
	}
	cm.provisioning[repairKey] = true
	cm.provisioningMu.Unlock()
	defer func() {
		cm.provisioningMu.Lock()
		delete(cm.provisioning, repairKey)
		cm.provisioningMu.Unlock()
	}()
	// 3. Get current cluster nodes
	clusterNodes, err := cm.getClusterNodes(ctx, cluster.ID)
	if err != nil {
		return fmt.Errorf("failed to get cluster nodes: %w", err)
	}
	// Count unique physical nodes with active services. A physical node may
	// appear once per role, so deduplicate by node ID.
	activeNodes := make(map[string]bool)
	for _, cn := range clusterNodes {
		if cn.Status == NodeStatusRunning || cn.Status == NodeStatusStarting {
			activeNodes[cn.NodeID] = true
		}
	}
	// Expected node count is the cluster's configured RQLite count (each physical node
	// runs all 3 services: rqlite + olric + gateway)
	expectedCount := cluster.RQLiteNodeCount
	activeCount := len(activeNodes)
	missingCount := expectedCount - activeCount
	if missingCount <= 0 {
		cm.logger.Info("Cluster has expected number of active nodes, no repair needed",
			zap.String("namespace", namespaceName),
			zap.Int("active_nodes", activeCount),
			zap.Int("expected", expectedCount),
		)
		return nil
	}
	cm.logger.Info("Cluster needs repair — adding missing nodes",
		zap.String("namespace", namespaceName),
		zap.Int("active_nodes", activeCount),
		zap.Int("expected", expectedCount),
		zap.Int("missing", missingCount),
	)
	cm.logEvent(ctx, cluster.ID, EventRecoveryStarted, "",
		fmt.Sprintf("Cluster repair started: %d of %d nodes active, adding %d", activeCount, expectedCount, missingCount), nil)
	// 4. Build the current node exclude list (all physical node IDs in the
	// cluster, regardless of status) so a node already assigned — even a
	// failed one — is never selected again.
	excludeIDs := make([]string, 0)
	nodeIDSet := make(map[string]bool)
	for _, cn := range clusterNodes {
		if !nodeIDSet[cn.NodeID] {
			nodeIDSet[cn.NodeID] = true
			excludeIDs = append(excludeIDs, cn.NodeID)
		}
	}
	// 5. Get surviving nodes' port info for joining. New nodes need existing
	// raft/memberlist/olric addresses to join each service's cluster.
	var surviving []survivingNodePorts
	portsQuery := `
	SELECT pa.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip, dn.ip_address,
	pa.rqlite_http_port, pa.rqlite_raft_port, pa.olric_http_port,
	pa.olric_memberlist_port, pa.gateway_http_port
	FROM namespace_port_allocations pa
	JOIN dns_nodes dn ON pa.node_id = dn.id
	WHERE pa.namespace_cluster_id = ?
	`
	if err := cm.db.Query(ctx, &surviving, portsQuery, cluster.ID); err != nil {
		return fmt.Errorf("failed to query surviving node ports: %w", err)
	}
	if len(surviving) == 0 {
		return fmt.Errorf("no surviving nodes found with port allocations")
	}
	// 6. Add missing nodes one at a time
	addedCount := 0
	for i := 0; i < missingCount; i++ {
		replacement, portBlock, err := cm.addNodeToCluster(ctx, cluster, excludeIDs, surviving)
		if err != nil {
			cm.logger.Error("Failed to add node during cluster repair",
				zap.String("namespace", namespaceName),
				zap.Int("node_index", i+1),
				zap.Int("missing", missingCount),
				zap.Error(err),
			)
			cm.logEvent(ctx, cluster.ID, EventRecoveryFailed, "",
				fmt.Sprintf("Repair failed on node %d of %d: %s", i+1, missingCount, err), nil)
			break
		}
		addedCount++
		// Update exclude list and surviving list for next iteration so the
		// next node joins via (and never collides with) the one just added.
		excludeIDs = append(excludeIDs, replacement.NodeID)
		surviving = append(surviving, survivingNodePorts{
			NodeID:              replacement.NodeID,
			InternalIP:          replacement.InternalIP,
			IPAddress:           replacement.IPAddress,
			RQLiteHTTPPort:      portBlock.RQLiteHTTPPort,
			RQLiteRaftPort:      portBlock.RQLiteRaftPort,
			OlricHTTPPort:       portBlock.OlricHTTPPort,
			OlricMemberlistPort: portBlock.OlricMemberlistPort,
			GatewayHTTPPort:     portBlock.GatewayHTTPPort,
		})
	}
	if addedCount == 0 {
		return fmt.Errorf("failed to add any replacement nodes")
	}
	// 7. Update cluster-state.json on all nodes
	cm.updateClusterStateAfterRecovery(ctx, cluster)
	// 8. Mark cluster ready. NOTE(review): this marks the cluster ready even
	// when only some of the missing nodes were added — confirm a partially
	// repaired cluster should not stay degraded instead.
	cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusReady, "")
	cm.logEvent(ctx, cluster.ID, EventRecoveryComplete, "",
		fmt.Sprintf("Cluster repair completed: added %d of %d missing nodes", addedCount, missingCount),
		map[string]interface{}{"added_nodes": addedCount, "missing_nodes": missingCount})
	cm.logger.Info("Cluster repair completed",
		zap.String("namespace", namespaceName),
		zap.Int("added_nodes", addedCount),
		zap.Int("missing_nodes", missingCount),
	)
	return nil
}
// addNodeToCluster selects a new node and spawns all services (RQLite follower, Olric, Gateway)
// on it, joining the existing cluster. Returns the replacement node info and allocated port block.
//
// Failure semantics: a failed RQLite spawn aborts the add and releases the
// allocated ports; Olric or Gateway spawn failures are logged and the add
// continues, leaving the new node only partially provisioned.
func (cm *ClusterManager) addNodeToCluster(
	ctx context.Context,
	cluster *NamespaceCluster,
	excludeIDs []string,
	surviving []survivingNodePorts,
) (*NodeCapacity, *PortBlock, error) {
	// 1. Select replacement node (best-scored active node not in excludeIDs)
	replacement, err := cm.nodeSelector.SelectReplacementNode(ctx, excludeIDs)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to select replacement node: %w", err)
	}
	cm.logger.Info("Selected node for cluster repair",
		zap.String("namespace", cluster.NamespaceName),
		zap.String("new_node", replacement.NodeID),
		zap.String("new_ip", replacement.InternalIP),
	)
	// 2. Allocate ports on the new node
	portBlock, err := cm.portAllocator.AllocatePortBlock(ctx, replacement.NodeID, cluster.ID)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to allocate ports on new node: %w", err)
	}
	// 3. Spawn RQLite follower — join via the first surviving node that
	// exposes a raft port.
	var joinAddr string
	for _, s := range surviving {
		if s.RQLiteRaftPort > 0 {
			joinAddr = fmt.Sprintf("%s:%d", s.InternalIP, s.RQLiteRaftPort)
			break
		}
	}
	rqliteCfg := rqlite.InstanceConfig{
		Namespace:      cluster.NamespaceName,
		NodeID:         replacement.NodeID,
		HTTPPort:       portBlock.RQLiteHTTPPort,
		RaftPort:       portBlock.RQLiteRaftPort,
		HTTPAdvAddress: fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.RQLiteHTTPPort),
		RaftAdvAddress: fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.RQLiteRaftPort),
		JoinAddresses:  []string{joinAddr},
		IsLeader:       false,
	}
	// Spawn locally through systemd when the target is this node; otherwise
	// call the remote spawn API.
	var spawnErr error
	if replacement.NodeID == cm.localNodeID {
		spawnErr = cm.spawnRQLiteWithSystemd(ctx, rqliteCfg)
	} else {
		_, spawnErr = cm.spawnRQLiteRemote(ctx, replacement.InternalIP, rqliteCfg)
	}
	if spawnErr != nil {
		// RQLite is mandatory: release the ports and fail the whole add.
		cm.portAllocator.DeallocatePortBlock(ctx, cluster.ID, replacement.NodeID)
		return nil, nil, fmt.Errorf("failed to spawn RQLite follower: %w", spawnErr)
	}
	cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleRQLiteFollower, portBlock)
	cm.logEvent(ctx, cluster.ID, EventRQLiteStarted, replacement.NodeID,
		"RQLite follower started on new node (repair)", nil)
	// 4. Spawn Olric — peers are the memberlist addresses of surviving nodes
	var olricPeers []string
	for _, s := range surviving {
		if s.OlricMemberlistPort > 0 {
			olricPeers = append(olricPeers, fmt.Sprintf("%s:%d", s.InternalIP, s.OlricMemberlistPort))
		}
	}
	olricCfg := olric.InstanceConfig{
		Namespace:      cluster.NamespaceName,
		NodeID:         replacement.NodeID,
		HTTPPort:       portBlock.OlricHTTPPort,
		MemberlistPort: portBlock.OlricMemberlistPort,
		BindAddr:       replacement.InternalIP,
		AdvertiseAddr:  replacement.InternalIP,
		PeerAddresses:  olricPeers,
	}
	if replacement.NodeID == cm.localNodeID {
		spawnErr = cm.spawnOlricWithSystemd(ctx, olricCfg)
	} else {
		_, spawnErr = cm.spawnOlricRemote(ctx, replacement.InternalIP, olricCfg)
	}
	if spawnErr != nil {
		// Non-fatal: log and continue with the gateway.
		cm.logger.Error("Failed to spawn Olric on new node (repair continues)",
			zap.String("node", replacement.NodeID), zap.Error(spawnErr))
	} else {
		cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleOlric, portBlock)
		cm.logEvent(ctx, cluster.ID, EventOlricStarted, replacement.NodeID,
			"Olric started on new node (repair)", nil)
	}
	// 5. Spawn Gateway — given every Olric HTTP endpoint, including the one
	// just started on this new node.
	var olricServers []string
	for _, s := range surviving {
		if s.OlricHTTPPort > 0 {
			olricServers = append(olricServers, fmt.Sprintf("%s:%d", s.InternalIP, s.OlricHTTPPort))
		}
	}
	olricServers = append(olricServers, fmt.Sprintf("%s:%d", replacement.InternalIP, portBlock.OlricHTTPPort))
	gwCfg := gateway.InstanceConfig{
		Namespace:             cluster.NamespaceName,
		NodeID:                replacement.NodeID,
		HTTPPort:              portBlock.GatewayHTTPPort,
		BaseDomain:            cm.baseDomain,
		RQLiteDSN:             fmt.Sprintf("http://localhost:%d", portBlock.RQLiteHTTPPort),
		GlobalRQLiteDSN:       cm.globalRQLiteDSN,
		OlricServers:          olricServers,
		OlricTimeout:          30 * time.Second,
		IPFSClusterAPIURL:     cm.ipfsClusterAPIURL,
		IPFSAPIURL:            cm.ipfsAPIURL,
		IPFSTimeout:           cm.ipfsTimeout,
		IPFSReplicationFactor: cm.ipfsReplicationFactor,
	}
	if replacement.NodeID == cm.localNodeID {
		spawnErr = cm.spawnGatewayWithSystemd(ctx, gwCfg)
	} else {
		_, spawnErr = cm.spawnGatewayRemote(ctx, replacement.InternalIP, gwCfg)
	}
	if spawnErr != nil {
		// Non-fatal: log and continue with DNS.
		cm.logger.Error("Failed to spawn Gateway on new node (repair continues)",
			zap.String("node", replacement.NodeID), zap.Error(spawnErr))
	} else {
		cm.insertClusterNode(ctx, cluster.ID, replacement.NodeID, NodeRoleGateway, portBlock)
		cm.logEvent(ctx, cluster.ID, EventGatewayStarted, replacement.NodeID,
			"Gateway started on new node (repair)", nil)
	}
	// 6. Add DNS records for the new node's public IP (additive — existing
	// records for surviving nodes are untouched)
	dnsManager := NewDNSRecordManager(cm.db, cm.baseDomain, cm.logger)
	if err := dnsManager.AddNamespaceRecord(ctx, cluster.NamespaceName, replacement.IPAddress); err != nil {
		cm.logger.Error("Failed to add DNS record for new node",
			zap.String("namespace", cluster.NamespaceName),
			zap.String("ip", replacement.IPAddress),
			zap.Error(err))
	} else {
		cm.logEvent(ctx, cluster.ID, EventDNSCreated, replacement.NodeID,
			fmt.Sprintf("DNS record added for new node %s", replacement.IPAddress), nil)
	}
	cm.logEvent(ctx, cluster.ID, EventNodeReplaced, replacement.NodeID,
		fmt.Sprintf("New node %s added to cluster (repair)", replacement.NodeID),
		map[string]interface{}{"new_node": replacement.NodeID})
	return replacement, portBlock, nil
}

View File

@ -182,6 +182,48 @@ func (drm *DNSRecordManager) GetNamespaceGatewayIPs(ctx context.Context, namespa
return ips, nil
}
// AddNamespaceRecord adds DNS A records for a single IP to an existing namespace.
// Unlike CreateNamespaceRecords, this does NOT delete existing records — it's purely additive.
// Used when adding a new node to an under-provisioned cluster (repair).
//
// Two A records (TTL 60) are written: the namespace FQDN and its wildcard.
// NOTE(review): if the second insert fails, the first record is left in
// place — confirm partial state on failure is acceptable for repair.
func (drm *DNSRecordManager) AddNamespaceRecord(ctx context.Context, namespaceName, ip string) error {
	internalCtx := client.WithInternalAuth(ctx)
	fqdn := fmt.Sprintf("ns-%s.%s.", namespaceName, drm.baseDomain)
	wildcardFqdn := fmt.Sprintf("*.ns-%s.%s.", namespaceName, drm.baseDomain)
	drm.logger.Info("Adding DNS record for namespace",
		zap.String("namespace", namespaceName),
		zap.String("ip", ip),
	)
	now := time.Now()
	// Insert both the exact and wildcard records, tagged with the namespace
	// so they can be found (and cleaned up) by namespace later.
	for _, f := range []string{fqdn, wildcardFqdn} {
		recordID := uuid.New().String()
		insertQuery := `
		INSERT INTO dns_records (
			id, fqdn, record_type, value, ttl, namespace, created_by, is_active, created_at, updated_at
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
		`
		_, err := drm.db.Exec(internalCtx, insertQuery,
			recordID, f, "A", ip, 60,
			"namespace:"+namespaceName, "cluster-manager", true, now, now,
		)
		if err != nil {
			return &ClusterError{
				Message: fmt.Sprintf("failed to add DNS record %s -> %s", f, ip),
				Cause:   err,
			}
		}
	}
	drm.logger.Info("DNS records added for namespace",
		zap.String("namespace", namespaceName),
		zap.String("ip", ip),
	)
	return nil
}
// UpdateNamespaceRecord updates a specific node's DNS record (for failover)
func (drm *DNSRecordManager) UpdateNamespaceRecord(ctx context.Context, namespaceName, oldIP, newIP string) error {
internalCtx := client.WithInternalAuth(ctx)

View File

@ -117,6 +117,56 @@ func (cns *ClusterNodeSelector) SelectNodesForCluster(ctx context.Context, nodeC
return selectedNodes, nil
}
// SelectReplacementNode selects a single optimal node for replacing a dead node
// in an existing cluster. excludeNodeIDs contains nodes that should not be
// selected (dead node + existing cluster members).
func (cns *ClusterNodeSelector) SelectReplacementNode(ctx context.Context, excludeNodeIDs []string) (*NodeCapacity, error) {
	internalCtx := client.WithInternalAuth(ctx)

	activeNodes, err := cns.getActiveNodes(internalCtx)
	if err != nil {
		return nil, err
	}

	// Build a fast lookup of node IDs that must not be chosen.
	excluded := make(map[string]bool, len(excludeNodeIDs))
	for _, id := range excludeNodeIDs {
		excluded[id] = true
	}

	// Collect candidates that still have capacity for another namespace.
	// Nodes whose capacity cannot be determined are skipped with a warning.
	var candidates []NodeCapacity
	for _, node := range activeNodes {
		if excluded[node.NodeID] {
			continue
		}
		capacity, err := cns.getNodeCapacity(internalCtx, node.NodeID, node.IPAddress, node.InternalIP)
		if err != nil {
			cns.logger.Warn("Failed to get node capacity for replacement, skipping",
				zap.String("node_id", node.NodeID), zap.Error(err))
			continue
		}
		if capacity.AvailableNamespaceSlots > 0 {
			candidates = append(candidates, *capacity)
		}
	}
	if len(candidates) == 0 {
		return nil, ErrInsufficientNodes
	}

	// Highest score wins.
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].Score > candidates[j].Score
	})
	best := &candidates[0]
	cns.logger.Info("Selected replacement node",
		zap.String("node_id", best.NodeID),
		zap.Float64("score", best.Score),
		zap.Int("available_slots", best.AvailableNamespaceSlots),
	)
	return best, nil
}
// nodeInfo is used for querying active nodes
type nodeInfo struct {
NodeID string `db:"id"`

View File

@ -58,6 +58,10 @@ const (
EventNodeRecovered EventType = "node_recovered"
EventDeprovisionStarted EventType = "deprovisioning_started"
EventDeprovisioned EventType = "deprovisioned"
EventRecoveryStarted EventType = "recovery_started"
EventNodeReplaced EventType = "node_replaced"
EventRecoveryComplete EventType = "recovery_complete"
EventRecoveryFailed EventType = "recovery_failed"
)
// Port allocation constants
@ -201,4 +205,5 @@ var (
ErrProvisioningFailed = &ClusterError{Message: "cluster provisioning failed"}
ErrNamespaceNotFound = &ClusterError{Message: "namespace not found"}
ErrInvalidClusterStatus = &ClusterError{Message: "invalid cluster status for operation"}
ErrRecoveryInProgress = &ClusterError{Message: "recovery already in progress for this cluster"}
)

View File

@ -57,10 +57,7 @@ func (n *Node) startHTTPGateway(ctx context.Context) error {
IPFSTimeout: n.config.HTTPGateway.IPFSTimeout,
BaseDomain: n.config.HTTPGateway.BaseDomain,
DataDir: oramaDir,
ClusterSecret: clusterSecret,
PhantomAuthURL: os.Getenv("PHANTOM_AUTH_URL"),
SolanaRPCURL: os.Getenv("SOLANA_RPC_URL"),
NFTCollectionAddress: os.Getenv("NFT_COLLECTION_ADDRESS"),
ClusterSecret: clusterSecret,
}
apiGateway, err := gateway.New(gatewayLogger, gwCfg)
@ -84,6 +81,7 @@ func (n *Node) startHTTPGateway(ctx context.Context) error {
clusterManager := namespace.NewClusterManager(ormClient, clusterCfg, n.logger.Logger)
clusterManager.SetLocalNodeID(gwCfg.NodePeerID)
apiGateway.SetClusterProvisioner(clusterManager)
apiGateway.SetNodeRecoverer(clusterManager)
// Wire spawn handler for distributed namespace instance spawning
systemdSpawner := namespace.NewSystemdSpawner(baseDataDir, n.logger.Logger)

View File

@ -60,7 +60,8 @@ type Monitor struct {
mu sync.Mutex
peers map[string]*peerState // nodeID → state
onDeadFn func(nodeID string) // callback when quorum confirms death
onDeadFn func(nodeID string) // callback when quorum confirms death
onRecoveredFn func(nodeID string) // callback when node transitions from dead → healthy
}
// NewMonitor creates a new health monitor.
@ -94,6 +95,12 @@ func (m *Monitor) OnNodeDead(fn func(nodeID string)) {
m.onDeadFn = fn
}
// OnNodeRecovered registers a callback invoked when a previously dead node
// transitions back to healthy. Used to clean up orphaned services.
//
// NOTE(review): the field is assigned without holding m.mu, while the monitor
// loop reads it under the lock — register callbacks before Start; confirm no
// caller registers after the monitor is running.
func (m *Monitor) OnNodeRecovered(fn func(nodeID string)) {
	m.onRecoveredFn = fn
}
// Start runs the monitor loop until ctx is cancelled.
func (m *Monitor) Start(ctx context.Context) {
m.logger.Info("Starting node health monitor",
@ -173,8 +180,17 @@ func (m *Monitor) updateState(ctx context.Context, nodeID string, healthy bool)
if healthy {
// Recovered
if ps.status != "healthy" {
m.logger.Info("Node recovered", zap.String("target", nodeID))
wasDead := ps.status == "dead"
m.logger.Info("Node recovered", zap.String("target", nodeID),
zap.String("previous_status", ps.status))
m.writeEvent(ctx, nodeID, "recovered")
// Fire recovery callback for nodes that were confirmed dead
if wasDead && m.onRecoveredFn != nil {
m.mu.Unlock()
m.onRecoveredFn(nodeID)
m.mu.Lock()
}
}
ps.missCount = 0
ps.status = "healthy"