package namespace

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/DeBrosOfficial/network/pkg/gateway"
	"github.com/DeBrosOfficial/network/pkg/olric"
	"github.com/DeBrosOfficial/network/pkg/rqlite"
	"github.com/DeBrosOfficial/network/pkg/systemd"
	"github.com/google/uuid"
	"go.uber.org/zap"
)

// ClusterManagerConfig contains configuration for the cluster manager
type ClusterManagerConfig struct {
	BaseDomain      string // Base domain for namespace gateways (e.g., "orama-devnet.network")
	BaseDataDir     string // Base directory for namespace data (e.g., "~/.orama/data/namespaces")
	GlobalRQLiteDSN string // Global RQLite DSN for API key validation (e.g., "http://localhost:4001")

	// IPFS configuration for namespace gateways (defaults used if not set)
	IPFSClusterAPIURL     string        // IPFS Cluster API URL (default: "http://localhost:9094")
	IPFSAPIURL            string        // IPFS API URL (default: "http://localhost:4501")
	IPFSTimeout           time.Duration // Timeout for IPFS operations (default: 60s)
	IPFSReplicationFactor int           // IPFS replication factor (default: 3)
}

// ClusterManager orchestrates namespace cluster provisioning and lifecycle
type ClusterManager struct {
	db             rqlite.Client
	portAllocator  *NamespacePortAllocator
	nodeSelector   *ClusterNodeSelector
	systemdSpawner *SystemdSpawner // NEW: Systemd-based spawner replaces old spawners
	logger         *zap.Logger

	baseDomain      string
	baseDataDir     string
	globalRQLiteDSN string // Global RQLite DSN for namespace gateway auth

	// IPFS configuration for namespace gateways
	ipfsClusterAPIURL     string
	ipfsAPIURL            string
	ipfsTimeout           time.Duration
	ipfsReplicationFactor int

	// Local node identity for distributed spawning
	localNodeID string

	// Track provisioning operations
	provisioningMu sync.RWMutex
	provisioning   map[string]bool // namespace -> in progress
}

// resolveIPFSDefaults returns cfg's IPFS settings with documented defaults
// substituted for any zero values. Shared by both constructors so the
// defaulting logic cannot drift between them.
func resolveIPFSDefaults(cfg ClusterManagerConfig) (clusterAPIURL, apiURL string, timeout time.Duration, replicationFactor int) {
	clusterAPIURL = cfg.IPFSClusterAPIURL
	if clusterAPIURL == "" {
		clusterAPIURL = "http://localhost:9094"
	}
	apiURL = cfg.IPFSAPIURL
	if apiURL == "" {
		apiURL = "http://localhost:4501"
	}
	timeout = cfg.IPFSTimeout
	if timeout == 0 {
		timeout = 60 * time.Second
	}
	replicationFactor = cfg.IPFSReplicationFactor
	if replicationFactor == 0 {
		replicationFactor = 3
	}
	return clusterAPIURL, apiURL, timeout, replicationFactor
}

// NewClusterManager creates a new cluster manager
func NewClusterManager(
	db rqlite.Client,
	cfg ClusterManagerConfig,
	logger *zap.Logger,
) *ClusterManager {
	// Create internal components
	portAllocator := NewNamespacePortAllocator(db, logger)
	nodeSelector := NewClusterNodeSelector(db, portAllocator, logger)
	systemdSpawner := NewSystemdSpawner(cfg.BaseDataDir, logger)

	// Set IPFS defaults
	ipfsClusterAPIURL, ipfsAPIURL, ipfsTimeout, ipfsReplicationFactor := resolveIPFSDefaults(cfg)

	return &ClusterManager{
		db:                    db,
		portAllocator:         portAllocator,
		nodeSelector:          nodeSelector,
		systemdSpawner:        systemdSpawner,
		baseDomain:            cfg.BaseDomain,
		baseDataDir:           cfg.BaseDataDir,
		globalRQLiteDSN:       cfg.GlobalRQLiteDSN,
		ipfsClusterAPIURL:     ipfsClusterAPIURL,
		ipfsAPIURL:            ipfsAPIURL,
		ipfsTimeout:           ipfsTimeout,
		ipfsReplicationFactor: ipfsReplicationFactor,
		logger:                logger.With(zap.String("component", "cluster-manager")),
		provisioning:          make(map[string]bool),
	}
}

// NewClusterManagerWithComponents creates a cluster manager with custom components (useful for testing)
func NewClusterManagerWithComponents(
	db rqlite.Client,
	portAllocator *NamespacePortAllocator,
	nodeSelector *ClusterNodeSelector,
	systemdSpawner *SystemdSpawner,
	cfg ClusterManagerConfig,
	logger *zap.Logger,
) *ClusterManager {
	// Set IPFS defaults (same as NewClusterManager)
	ipfsClusterAPIURL, ipfsAPIURL, ipfsTimeout, ipfsReplicationFactor := resolveIPFSDefaults(cfg)

	return &ClusterManager{
		db:                    db,
		portAllocator:         portAllocator,
		nodeSelector:          nodeSelector,
		systemdSpawner:        systemdSpawner,
		baseDomain:            cfg.BaseDomain,
		baseDataDir:           cfg.BaseDataDir,
		globalRQLiteDSN:       cfg.GlobalRQLiteDSN,
		ipfsClusterAPIURL:     ipfsClusterAPIURL,
		ipfsAPIURL:            ipfsAPIURL,
		ipfsTimeout:           ipfsTimeout,
		ipfsReplicationFactor: ipfsReplicationFactor,
		logger:                logger.With(zap.String("component", "cluster-manager")),
		provisioning:          make(map[string]bool),
	}
}

// SetLocalNodeID sets this node's peer ID for local/remote dispatch during provisioning
func (cm *ClusterManager) SetLocalNodeID(id string) {
	cm.localNodeID = id
	cm.logger.Info("Local node ID set for distributed provisioning", zap.String("local_node_id", id))
}

// spawnRQLiteWithSystemd generates config and spawns RQLite via systemd
func (cm *ClusterManager) spawnRQLiteWithSystemd(ctx context.Context, cfg rqlite.InstanceConfig) error {
	// RQLite uses command-line args, no config file needed
	// Just call systemd spawner which will generate env file and start service
	return cm.systemdSpawner.SpawnRQLite(ctx, cfg.Namespace, cfg.NodeID, cfg)
}

// spawnOlricWithSystemd spawns Olric via systemd (config creation now handled by spawner)
func (cm *ClusterManager) spawnOlricWithSystemd(ctx context.Context, cfg olric.InstanceConfig) error {
	// SystemdSpawner now handles config file creation
	return cm.systemdSpawner.SpawnOlric(ctx, cfg.Namespace, cfg.NodeID, cfg)
}

// writePeersJSON writes RQLite peers.json file for Raft cluster recovery
func (cm *ClusterManager) writePeersJSON(dataDir string, peers []rqlite.RaftPeer) error {
	raftDir := filepath.Join(dataDir, "raft")
	if err := os.MkdirAll(raftDir, 0755); err != nil {
		return fmt.Errorf("failed to create raft directory: %w", err)
	}
	peersFile := filepath.Join(raftDir, "peers.json")
	data, err := json.Marshal(peers)
	if err != nil {
		return fmt.Errorf("failed to marshal peers: %w", err)
	}
	return os.WriteFile(peersFile, data, 0644)
}

// spawnGatewayWithSystemd spawns Gateway via systemd (config creation now handled by spawner)
func (cm *ClusterManager) spawnGatewayWithSystemd(ctx context.Context, cfg gateway.InstanceConfig) error {
	// SystemdSpawner now handles config file creation
	return cm.systemdSpawner.SpawnGateway(ctx, cfg.Namespace, cfg.NodeID, cfg)
}

// ProvisionCluster provisions a new 3-node cluster for a namespace
// This is an async operation - returns immediately with cluster ID for polling
func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, namespaceName, provisionedBy string) (*NamespaceCluster, error) {
	// Check if already provisioning
	cm.provisioningMu.Lock()
	if cm.provisioning[namespaceName] {
		cm.provisioningMu.Unlock()
		return nil, fmt.Errorf("namespace %s is already being provisioned", namespaceName)
	}
	cm.provisioning[namespaceName] = true
	cm.provisioningMu.Unlock()
	defer func() {
		cm.provisioningMu.Lock()
		delete(cm.provisioning, namespaceName)
		cm.provisioningMu.Unlock()
	}()

	cm.logger.Info("Starting cluster provisioning",
		zap.String("namespace", namespaceName),
		zap.Int("namespace_id", namespaceID),
		zap.String("provisioned_by", provisionedBy),
	)

	// Create cluster record
	cluster := &NamespaceCluster{
		ID:               uuid.New().String(),
		NamespaceID:      namespaceID,
		NamespaceName:    namespaceName,
		Status:           ClusterStatusProvisioning,
		RQLiteNodeCount:  3,
		OlricNodeCount:   3,
		GatewayNodeCount: 3,
		ProvisionedBy:    provisionedBy,
		ProvisionedAt:    time.Now(),
	}

	// Insert cluster record
	if err := cm.insertCluster(ctx, cluster); err != nil {
		return nil, fmt.Errorf("failed to insert cluster record: %w", err)
	}

	// Log event
	cm.logEvent(ctx, cluster.ID, EventProvisioningStarted, "", "Cluster provisioning started", nil)

	// Select 3 nodes for the cluster
	nodes, err := cm.nodeSelector.SelectNodesForCluster(ctx, 3)
	if err != nil {
		cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusFailed, err.Error())
		return nil, fmt.Errorf("failed to select nodes: %w", err)
	}
	nodeIDs := make([]string, len(nodes))
	for i, n := range nodes {
		nodeIDs[i] = n.NodeID
	}
	cm.logEvent(ctx, cluster.ID, EventNodesSelected, "",
		"Selected nodes for cluster", map[string]interface{}{"nodes": nodeIDs})

	// Allocate ports on each node; on any failure, release the blocks already
	// claimed so they are not leaked for future clusters.
	portBlocks := make([]*PortBlock, len(nodes))
	for i, node := range nodes {
		block, err := cm.portAllocator.AllocatePortBlock(ctx, node.NodeID, cluster.ID)
		if err != nil {
			// Rollback previous allocations
			for j := 0; j < i; j++ {
				cm.portAllocator.DeallocatePortBlock(ctx, cluster.ID, nodes[j].NodeID)
			}
			cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusFailed, err.Error())
			return nil, fmt.Errorf("failed to allocate ports on node %s: %w", node.NodeID, err)
		}
		portBlocks[i] = block
		cm.logEvent(ctx, cluster.ID, EventPortsAllocated, node.NodeID,
			fmt.Sprintf("Allocated ports %d-%d", block.PortStart, block.PortEnd), nil)
	}

	// Start RQLite instances (leader first, then followers)
	rqliteInstances, err := cm.startRQLiteCluster(ctx, cluster, nodes, portBlocks)
	if err != nil {
		cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, nil, nil)
		return nil, fmt.Errorf("failed to start RQLite cluster: %w", err)
	}

	// Start Olric instances
	olricInstances, err := cm.startOlricCluster(ctx, cluster, nodes, portBlocks)
	if err != nil {
		cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, rqliteInstances, nil)
		return nil, fmt.Errorf("failed to start Olric cluster: %w", err)
	}

	// Start Gateway instances (optional - may not be available in dev mode)
	_, err = cm.startGatewayCluster(ctx, cluster, nodes, portBlocks, rqliteInstances, olricInstances)
	if err != nil {
		// Check if this is a "binary not found" error - if so, continue without gateways
		if strings.Contains(err.Error(), "gateway binary not found") {
			cm.logger.Warn("Skipping namespace gateway spawning (binary not available)",
				zap.String("namespace", cluster.NamespaceName),
				zap.Error(err),
			)
			cm.logEvent(ctx, cluster.ID, "gateway_skipped", "",
				"Gateway binary not available, cluster will use main gateway", nil)
		} else {
			cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, rqliteInstances, olricInstances)
			return nil, fmt.Errorf("failed to start Gateway cluster: %w", err)
		}
	}

	// Create DNS records for namespace gateway
	if err := cm.createDNSRecords(ctx, cluster, nodes, portBlocks); err != nil {
		cm.logger.Warn("Failed to create DNS records", zap.Error(err))
		// Don't fail provisioning for DNS errors
	}

	// Update cluster status to ready
	now := time.Now()
	cluster.Status = ClusterStatusReady
	cluster.ReadyAt = &now
	cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusReady, "")
	cm.logEvent(ctx, cluster.ID, EventClusterReady, "", "Cluster is ready", nil)

	cm.logger.Info("Cluster provisioning completed",
		zap.String("cluster_id", cluster.ID),
		zap.String("namespace", namespaceName),
	)
	return cluster, nil
}

// startRQLiteCluster starts RQLite instances on all nodes (locally or remotely).
// The leader (nodes[0]) is started first; followers join via the leader's Raft
// advertise address. Each spawn is dispatched locally (systemd) when the node is
// this process's node, otherwise remotely over the spawn endpoint.
func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) ([]*rqlite.Instance, error) {
	instances := make([]*rqlite.Instance, len(nodes))

	// Start leader first (node 0)
	leaderCfg := rqlite.InstanceConfig{
		Namespace:      cluster.NamespaceName,
		NodeID:         nodes[0].NodeID,
		HTTPPort:       portBlocks[0].RQLiteHTTPPort,
		RaftPort:       portBlocks[0].RQLiteRaftPort,
		HTTPAdvAddress: fmt.Sprintf("%s:%d", nodes[0].InternalIP, portBlocks[0].RQLiteHTTPPort),
		RaftAdvAddress: fmt.Sprintf("%s:%d", nodes[0].InternalIP, portBlocks[0].RQLiteRaftPort),
		IsLeader:       true,
	}
	var err error
	if nodes[0].NodeID == cm.localNodeID {
		cm.logger.Info("Spawning RQLite leader locally", zap.String("node", nodes[0].NodeID))
		err = cm.spawnRQLiteWithSystemd(ctx, leaderCfg)
		if err == nil {
			// Create Instance object for consistency with existing code
			instances[0] = &rqlite.Instance{
				Config: leaderCfg,
			}
		}
	} else {
		cm.logger.Info("Spawning RQLite leader remotely", zap.String("node", nodes[0].NodeID), zap.String("ip", nodes[0].InternalIP))
		instances[0], err = cm.spawnRQLiteRemote(ctx, nodes[0].InternalIP, leaderCfg)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to start RQLite leader: %w", err)
	}
	cm.logEvent(ctx, cluster.ID, EventRQLiteStarted, nodes[0].NodeID, "RQLite leader started", nil)
	cm.logEvent(ctx, cluster.ID, EventRQLiteLeaderElected, nodes[0].NodeID, "RQLite leader elected", nil)
	if err := cm.insertClusterNode(ctx, cluster.ID, nodes[0].NodeID, NodeRoleRQLiteLeader, portBlocks[0]); err != nil {
		cm.logger.Warn("Failed to record cluster node", zap.Error(err))
	}

	// Start followers
	leaderRaftAddr := leaderCfg.RaftAdvAddress
	for i := 1; i < len(nodes); i++ {
		followerCfg := rqlite.InstanceConfig{
			Namespace:      cluster.NamespaceName,
			NodeID:         nodes[i].NodeID,
			HTTPPort:       portBlocks[i].RQLiteHTTPPort,
			RaftPort:       portBlocks[i].RQLiteRaftPort,
			HTTPAdvAddress: fmt.Sprintf("%s:%d", nodes[i].InternalIP, portBlocks[i].RQLiteHTTPPort),
			RaftAdvAddress: fmt.Sprintf("%s:%d", nodes[i].InternalIP, portBlocks[i].RQLiteRaftPort),
			JoinAddresses:  []string{leaderRaftAddr},
			IsLeader:       false,
		}
		var followerInstance *rqlite.Instance
		if nodes[i].NodeID == cm.localNodeID {
			cm.logger.Info("Spawning RQLite follower locally", zap.String("node", nodes[i].NodeID))
			err = cm.spawnRQLiteWithSystemd(ctx, followerCfg)
			if err == nil {
				followerInstance = &rqlite.Instance{
					Config: followerCfg,
				}
			}
		} else {
			cm.logger.Info("Spawning RQLite follower remotely", zap.String("node", nodes[i].NodeID), zap.String("ip", nodes[i].InternalIP))
			followerInstance, err = cm.spawnRQLiteRemote(ctx, nodes[i].InternalIP, followerCfg)
		}
		if err != nil {
			// Stop previously started instances
			for j := 0; j < i; j++ {
				cm.stopRQLiteOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName, instances[j])
			}
			return nil, fmt.Errorf("failed to start RQLite follower on node %s: %w", nodes[i].NodeID, err)
		}
		instances[i] = followerInstance
		cm.logEvent(ctx, cluster.ID, EventRQLiteStarted, nodes[i].NodeID, "RQLite follower started", nil)
		cm.logEvent(ctx, cluster.ID, EventRQLiteJoined, nodes[i].NodeID, "RQLite follower joined cluster", nil)
		if err := cm.insertClusterNode(ctx, cluster.ID, nodes[i].NodeID, NodeRoleRQLiteFollower, portBlocks[i]); err != nil {
			cm.logger.Warn("Failed to record cluster node", zap.Error(err))
		}
	}
	return instances, nil
}

// startOlricCluster starts Olric instances on all nodes concurrently.
// Olric uses memberlist for peer discovery — all peers must be reachable at roughly
// the same time. Sequential spawning fails because early instances exhaust their
// retry budget before later instances start. By spawning all concurrently, all
// memberlist ports open within seconds of each other, allowing discovery to succeed.
func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) ([]*olric.OlricInstance, error) {
	instances := make([]*olric.OlricInstance, len(nodes))
	errs := make([]error, len(nodes))

	// Build configs for all nodes upfront
	configs := make([]olric.InstanceConfig, len(nodes))
	for i, node := range nodes {
		// Each node's peer list is every other node's memberlist address.
		var peers []string
		for j, peerNode := range nodes {
			if j != i {
				peers = append(peers, fmt.Sprintf("%s:%d", peerNode.InternalIP, portBlocks[j].OlricMemberlistPort))
			}
		}
		configs[i] = olric.InstanceConfig{
			Namespace:      cluster.NamespaceName,
			NodeID:         node.NodeID,
			HTTPPort:       portBlocks[i].OlricHTTPPort,
			MemberlistPort: portBlocks[i].OlricMemberlistPort,
			BindAddr:       node.InternalIP, // Bind to WG IP directly (0.0.0.0 resolves to IPv6 on some hosts)
			AdvertiseAddr:  node.InternalIP, // Advertise WG IP to peers
			PeerAddresses:  peers,
		}
	}

	// Spawn all instances concurrently.
	// Each goroutine writes only its own index of instances/errs, so no
	// additional locking is needed before wg.Wait().
	var wg sync.WaitGroup
	for i, node := range nodes {
		wg.Add(1)
		go func(idx int, n NodeCapacity) {
			defer wg.Done()
			if n.NodeID == cm.localNodeID {
				cm.logger.Info("Spawning Olric locally", zap.String("node", n.NodeID))
				errs[idx] = cm.spawnOlricWithSystemd(ctx, configs[idx])
				if errs[idx] == nil {
					instances[idx] = &olric.OlricInstance{
						Namespace:      configs[idx].Namespace,
						NodeID:         configs[idx].NodeID,
						HTTPPort:       configs[idx].HTTPPort,
						MemberlistPort: configs[idx].MemberlistPort,
						BindAddr:       configs[idx].BindAddr,
						AdvertiseAddr:  configs[idx].AdvertiseAddr,
						PeerAddresses:  configs[idx].PeerAddresses,
						Status:         olric.InstanceStatusRunning,
						StartedAt:      time.Now(),
					}
				}
			} else {
				cm.logger.Info("Spawning Olric remotely", zap.String("node", n.NodeID), zap.String("ip", n.InternalIP))
				instances[idx], errs[idx] = cm.spawnOlricRemote(ctx, n.InternalIP, configs[idx])
			}
		}(i, node)
	}
	wg.Wait()

	// Check for errors — if any failed, stop all and return
	for i, err := range errs {
		if err != nil {
			cm.logger.Error("Olric spawn failed", zap.String("node", nodes[i].NodeID), zap.Error(err))
			// Stop any that succeeded
			for j := range nodes {
				if errs[j] == nil {
					cm.stopOlricOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName)
				}
			}
			return nil, fmt.Errorf("failed to start Olric on node %s: %w", nodes[i].NodeID, err)
		}
	}

	// All instances started — give memberlist time to converge.
	// Olric's memberlist retries peer joins every ~1s for ~10 attempts.
	// Since all instances are now up, they should discover each other quickly.
cm.logger.Info("All Olric instances started, waiting for memberlist convergence", zap.Int("node_count", len(nodes)), ) time.Sleep(5 * time.Second) // Log events and record cluster nodes for i, node := range nodes { cm.logEvent(ctx, cluster.ID, EventOlricStarted, node.NodeID, "Olric instance started", nil) cm.logEvent(ctx, cluster.ID, EventOlricJoined, node.NodeID, "Olric instance joined memberlist", nil) if err := cm.insertClusterNode(ctx, cluster.ID, node.NodeID, NodeRoleOlric, portBlocks[i]); err != nil { cm.logger.Warn("Failed to record cluster node", zap.Error(err)) } } // Verify at least the local instance is still healthy after convergence for i, node := range nodes { if node.NodeID == cm.localNodeID && instances[i] != nil { healthy, err := instances[i].IsHealthy(ctx) if !healthy { cm.logger.Warn("Local Olric instance unhealthy after convergence wait", zap.Error(err)) } else { cm.logger.Info("Local Olric instance healthy after convergence") } } } return instances, nil } // startGatewayCluster starts Gateway instances on all nodes (locally or remotely) func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock, rqliteInstances []*rqlite.Instance, olricInstances []*olric.OlricInstance) ([]*gateway.GatewayInstance, error) { instances := make([]*gateway.GatewayInstance, len(nodes)) // Build Olric server addresses — always use WireGuard IPs (Olric binds to WireGuard interface) olricServers := make([]string, len(olricInstances)) for i, inst := range olricInstances { olricServers[i] = inst.AdvertisedDSN() // Always use WireGuard IP } // Start all Gateway instances for i, node := range nodes { // Connect to local RQLite instance on each node rqliteDSN := fmt.Sprintf("http://localhost:%d", portBlocks[i].RQLiteHTTPPort) cfg := gateway.InstanceConfig{ Namespace: cluster.NamespaceName, NodeID: node.NodeID, HTTPPort: portBlocks[i].GatewayHTTPPort, BaseDomain: cm.baseDomain, RQLiteDSN: 
rqliteDSN, GlobalRQLiteDSN: cm.globalRQLiteDSN, OlricServers: olricServers, OlricTimeout: 30 * time.Second, IPFSClusterAPIURL: cm.ipfsClusterAPIURL, IPFSAPIURL: cm.ipfsAPIURL, IPFSTimeout: cm.ipfsTimeout, IPFSReplicationFactor: cm.ipfsReplicationFactor, } var instance *gateway.GatewayInstance var err error if node.NodeID == cm.localNodeID { cm.logger.Info("Spawning Gateway locally", zap.String("node", node.NodeID)) err = cm.spawnGatewayWithSystemd(ctx, cfg) if err == nil { instance = &gateway.GatewayInstance{ Namespace: cfg.Namespace, NodeID: cfg.NodeID, HTTPPort: cfg.HTTPPort, BaseDomain: cfg.BaseDomain, RQLiteDSN: cfg.RQLiteDSN, OlricServers: cfg.OlricServers, Status: gateway.InstanceStatusRunning, StartedAt: time.Now(), } } } else { cm.logger.Info("Spawning Gateway remotely", zap.String("node", node.NodeID), zap.String("ip", node.InternalIP)) instance, err = cm.spawnGatewayRemote(ctx, node.InternalIP, cfg) } if err != nil { // Stop previously started instances for j := 0; j < i; j++ { cm.stopGatewayOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName) } return nil, fmt.Errorf("failed to start Gateway on node %s: %w", node.NodeID, err) } instances[i] = instance cm.logEvent(ctx, cluster.ID, EventGatewayStarted, node.NodeID, "Gateway instance started", nil) if err := cm.insertClusterNode(ctx, cluster.ID, node.NodeID, NodeRoleGateway, portBlocks[i]); err != nil { cm.logger.Warn("Failed to record cluster node", zap.Error(err)) } } return instances, nil } // spawnRQLiteRemote sends a spawn-rqlite request to a remote node func (cm *ClusterManager) spawnRQLiteRemote(ctx context.Context, nodeIP string, cfg rqlite.InstanceConfig) (*rqlite.Instance, error) { resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ "action": "spawn-rqlite", "namespace": cfg.Namespace, "node_id": cfg.NodeID, "rqlite_http_port": cfg.HTTPPort, "rqlite_raft_port": cfg.RaftPort, "rqlite_http_adv_addr": cfg.HTTPAdvAddress, "rqlite_raft_adv_addr": 
cfg.RaftAdvAddress, "rqlite_join_addrs": cfg.JoinAddresses, "rqlite_is_leader": cfg.IsLeader, }) if err != nil { return nil, err } return &rqlite.Instance{PID: resp.PID}, nil } // spawnOlricRemote sends a spawn-olric request to a remote node func (cm *ClusterManager) spawnOlricRemote(ctx context.Context, nodeIP string, cfg olric.InstanceConfig) (*olric.OlricInstance, error) { resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ "action": "spawn-olric", "namespace": cfg.Namespace, "node_id": cfg.NodeID, "olric_http_port": cfg.HTTPPort, "olric_memberlist_port": cfg.MemberlistPort, "olric_bind_addr": cfg.BindAddr, "olric_advertise_addr": cfg.AdvertiseAddr, "olric_peer_addresses": cfg.PeerAddresses, }) if err != nil { return nil, err } return &olric.OlricInstance{ PID: resp.PID, HTTPPort: cfg.HTTPPort, MemberlistPort: cfg.MemberlistPort, BindAddr: cfg.BindAddr, AdvertiseAddr: cfg.AdvertiseAddr, }, nil } // spawnGatewayRemote sends a spawn-gateway request to a remote node func (cm *ClusterManager) spawnGatewayRemote(ctx context.Context, nodeIP string, cfg gateway.InstanceConfig) (*gateway.GatewayInstance, error) { ipfsTimeout := "" if cfg.IPFSTimeout > 0 { ipfsTimeout = cfg.IPFSTimeout.String() } resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ "action": "spawn-gateway", "namespace": cfg.Namespace, "node_id": cfg.NodeID, "gateway_http_port": cfg.HTTPPort, "gateway_base_domain": cfg.BaseDomain, "gateway_rqlite_dsn": cfg.RQLiteDSN, "gateway_olric_servers": cfg.OlricServers, "ipfs_cluster_api_url": cfg.IPFSClusterAPIURL, "ipfs_api_url": cfg.IPFSAPIURL, "ipfs_timeout": ipfsTimeout, "ipfs_replication_factor": cfg.IPFSReplicationFactor, }) if err != nil { return nil, err } return &gateway.GatewayInstance{ Namespace: cfg.Namespace, NodeID: cfg.NodeID, HTTPPort: cfg.HTTPPort, BaseDomain: cfg.BaseDomain, RQLiteDSN: cfg.RQLiteDSN, OlricServers: cfg.OlricServers, PID: resp.PID, }, nil } // spawnResponse represents the JSON response from a 
spawn request type spawnResponse struct { Success bool `json:"success"` Error string `json:"error,omitempty"` PID int `json:"pid,omitempty"` } // sendSpawnRequest sends a spawn/stop request to a remote node's spawn endpoint func (cm *ClusterManager) sendSpawnRequest(ctx context.Context, nodeIP string, req map[string]interface{}) (*spawnResponse, error) { url := fmt.Sprintf("http://%s:6001/v1/internal/namespace/spawn", nodeIP) body, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("failed to marshal spawn request: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("X-Orama-Internal-Auth", "namespace-coordination") client := &http.Client{Timeout: 60 * time.Second} resp, err := client.Do(httpReq) if err != nil { return nil, fmt.Errorf("failed to send spawn request to %s: %w", nodeIP, err) } defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("failed to read response from %s: %w", nodeIP, err) } var spawnResp spawnResponse if err := json.Unmarshal(respBody, &spawnResp); err != nil { return nil, fmt.Errorf("failed to decode response from %s: %w", nodeIP, err) } if !spawnResp.Success { return nil, fmt.Errorf("spawn request failed on %s: %s", nodeIP, spawnResp.Error) } return &spawnResp, nil } // stopRQLiteOnNode stops a RQLite instance on a node (local or remote) func (cm *ClusterManager) stopRQLiteOnNode(ctx context.Context, nodeID, nodeIP, namespace string, inst *rqlite.Instance) { if nodeID == cm.localNodeID { cm.systemdSpawner.StopRQLite(ctx, namespace, nodeID) } else { cm.sendStopRequest(ctx, nodeIP, "stop-rqlite", namespace, nodeID) } } // stopOlricOnNode stops an Olric instance on a node (local or remote) func (cm *ClusterManager) stopOlricOnNode(ctx context.Context, nodeID, nodeIP, namespace string) { if 
nodeID == cm.localNodeID { cm.systemdSpawner.StopOlric(ctx, namespace, nodeID) } else { cm.sendStopRequest(ctx, nodeIP, "stop-olric", namespace, nodeID) } } // stopGatewayOnNode stops a Gateway instance on a node (local or remote) func (cm *ClusterManager) stopGatewayOnNode(ctx context.Context, nodeID, nodeIP, namespace string) { if nodeID == cm.localNodeID { cm.systemdSpawner.StopGateway(ctx, namespace, nodeID) } else { cm.sendStopRequest(ctx, nodeIP, "stop-gateway", namespace, nodeID) } } // sendStopRequest sends a stop request to a remote node func (cm *ClusterManager) sendStopRequest(ctx context.Context, nodeIP, action, namespace, nodeID string) { _, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ "action": action, "namespace": namespace, "node_id": nodeID, }) if err != nil { cm.logger.Warn("Failed to send stop request to remote node", zap.String("node_ip", nodeIP), zap.String("action", action), zap.Error(err), ) } } // createDNSRecords creates DNS records for the namespace gateway. // Creates A records pointing to the public IPs of nodes running the namespace gateway cluster. 
func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error { fqdn := fmt.Sprintf("ns-%s.%s.", cluster.NamespaceName, cm.baseDomain) // Collect public IPs from the selected cluster nodes var gatewayIPs []string for _, node := range nodes { if node.IPAddress != "" { gatewayIPs = append(gatewayIPs, node.IPAddress) } } if len(gatewayIPs) == 0 { cm.logger.Error("No valid node IPs found for DNS records", zap.String("namespace", cluster.NamespaceName), zap.Int("node_count", len(nodes)), ) return fmt.Errorf("no valid node IPs found for DNS records") } cm.logger.Info("Creating DNS records for namespace gateway", zap.String("namespace", cluster.NamespaceName), zap.Strings("ips", gatewayIPs), ) recordCount := 0 for _, ip := range gatewayIPs { query := ` INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by) VALUES (?, 'A', ?, 300, ?, 'system') ` _, err := cm.db.Exec(ctx, query, fqdn, ip, cluster.NamespaceName) if err != nil { cm.logger.Warn("Failed to create DNS record", zap.String("fqdn", fqdn), zap.String("ip", ip), zap.Error(err), ) } else { cm.logger.Info("Created DNS A record for gateway node", zap.String("fqdn", fqdn), zap.String("ip", ip), ) recordCount++ } } cm.logEvent(ctx, cluster.ID, EventDNSCreated, "", fmt.Sprintf("DNS records created for %s (%d gateway node records)", fqdn, recordCount), nil) return nil } // rollbackProvisioning cleans up a failed provisioning attempt func (cm *ClusterManager) rollbackProvisioning(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock, rqliteInstances []*rqlite.Instance, olricInstances []*olric.OlricInstance) { cm.logger.Info("Rolling back failed provisioning", zap.String("cluster_id", cluster.ID)) // Stop all namespace services (Gateway, Olric, RQLite) using systemd cm.systemdSpawner.StopAll(ctx, cluster.NamespaceName) // Stop Olric instances on each node if olricInstances != nil && 
nodes != nil { for _, node := range nodes { cm.stopOlricOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName) } } // Stop RQLite instances on each node if rqliteInstances != nil && nodes != nil { for i, inst := range rqliteInstances { if inst != nil && i < len(nodes) { cm.stopRQLiteOnNode(ctx, nodes[i].NodeID, nodes[i].InternalIP, cluster.NamespaceName, inst) } } } // Deallocate ports cm.portAllocator.DeallocateAllPortBlocks(ctx, cluster.ID) // Update cluster status cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusFailed, "Provisioning failed and rolled back") } // DeprovisionCluster tears down a namespace cluster func (cm *ClusterManager) DeprovisionCluster(ctx context.Context, namespaceID int64) error { cluster, err := cm.GetClusterByNamespaceID(ctx, namespaceID) if err != nil { return fmt.Errorf("failed to get cluster: %w", err) } if cluster == nil { return nil // No cluster to deprovision } cm.logger.Info("Starting cluster deprovisioning", zap.String("cluster_id", cluster.ID), zap.String("namespace", cluster.NamespaceName), ) cm.logEvent(ctx, cluster.ID, EventDeprovisionStarted, "", "Cluster deprovisioning started", nil) cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusDeprovisioning, "") // Stop all services using systemd cm.systemdSpawner.StopAll(ctx, cluster.NamespaceName) // Deallocate all ports cm.portAllocator.DeallocateAllPortBlocks(ctx, cluster.ID) // Delete DNS records query := `DELETE FROM dns_records WHERE namespace = ?` cm.db.Exec(ctx, query, cluster.NamespaceName) // Delete cluster record query = `DELETE FROM namespace_clusters WHERE id = ?` cm.db.Exec(ctx, query, cluster.ID) cm.logEvent(ctx, cluster.ID, EventDeprovisioned, "", "Cluster deprovisioned", nil) cm.logger.Info("Cluster deprovisioning completed", zap.String("cluster_id", cluster.ID)) return nil } // GetClusterStatus returns the current status of a namespace cluster func (cm *ClusterManager) GetClusterStatus(ctx context.Context, clusterID string) 
(*ClusterProvisioningStatus, error) { cluster, err := cm.GetCluster(ctx, clusterID) if err != nil { return nil, err } if cluster == nil { return nil, fmt.Errorf("cluster not found") } status := &ClusterProvisioningStatus{ Status: cluster.Status, ClusterID: cluster.ID, } // Check individual service status // TODO: Actually check each service's health if cluster.Status == ClusterStatusReady { status.RQLiteReady = true status.OlricReady = true status.GatewayReady = true status.DNSReady = true } // Get node list nodes, err := cm.getClusterNodes(ctx, clusterID) if err == nil { for _, node := range nodes { status.Nodes = append(status.Nodes, node.NodeID) } } if cluster.ErrorMessage != "" { status.Error = cluster.ErrorMessage } return status, nil } // GetCluster retrieves a cluster by ID func (cm *ClusterManager) GetCluster(ctx context.Context, clusterID string) (*NamespaceCluster, error) { var clusters []NamespaceCluster query := `SELECT * FROM namespace_clusters WHERE id = ?` if err := cm.db.Query(ctx, &clusters, query, clusterID); err != nil { return nil, err } if len(clusters) == 0 { return nil, nil } return &clusters[0], nil } // GetClusterByNamespaceID retrieves a cluster by namespace ID func (cm *ClusterManager) GetClusterByNamespaceID(ctx context.Context, namespaceID int64) (*NamespaceCluster, error) { var clusters []NamespaceCluster query := `SELECT * FROM namespace_clusters WHERE namespace_id = ?` if err := cm.db.Query(ctx, &clusters, query, namespaceID); err != nil { return nil, err } if len(clusters) == 0 { return nil, nil } return &clusters[0], nil } // GetClusterByNamespace retrieves a cluster by namespace name func (cm *ClusterManager) GetClusterByNamespace(ctx context.Context, namespaceName string) (*NamespaceCluster, error) { var clusters []NamespaceCluster query := `SELECT * FROM namespace_clusters WHERE namespace_name = ?` if err := cm.db.Query(ctx, &clusters, query, namespaceName); err != nil { return nil, err } if len(clusters) == 0 { return nil, nil 
} return &clusters[0], nil } // Database helper methods func (cm *ClusterManager) insertCluster(ctx context.Context, cluster *NamespaceCluster) error { query := ` INSERT INTO namespace_clusters ( id, namespace_id, namespace_name, status, rqlite_node_count, olric_node_count, gateway_node_count, provisioned_by, provisioned_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ` _, err := cm.db.Exec(ctx, query, cluster.ID, cluster.NamespaceID, cluster.NamespaceName, cluster.Status, cluster.RQLiteNodeCount, cluster.OlricNodeCount, cluster.GatewayNodeCount, cluster.ProvisionedBy, cluster.ProvisionedAt, ) return err } func (cm *ClusterManager) updateClusterStatus(ctx context.Context, clusterID string, status ClusterStatus, errorMsg string) error { var query string var args []interface{} if status == ClusterStatusReady { query = `UPDATE namespace_clusters SET status = ?, ready_at = ?, error_message = '' WHERE id = ?` args = []interface{}{status, time.Now(), clusterID} } else { query = `UPDATE namespace_clusters SET status = ?, error_message = ? WHERE id = ?` args = []interface{}{status, errorMsg, clusterID} } _, err := cm.db.Exec(ctx, query, args...) return err } func (cm *ClusterManager) insertClusterNode(ctx context.Context, clusterID, nodeID string, role NodeRole, portBlock *PortBlock) error { query := ` INSERT INTO namespace_cluster_nodes ( id, namespace_cluster_id, node_id, role, status, rqlite_http_port, rqlite_raft_port, olric_http_port, olric_memberlist_port, gateway_http_port, created_at, updated_at ) VALUES (?, ?, ?, ?, 'running', ?, ?, ?, ?, ?, ?, ?) 
` now := time.Now() _, err := cm.db.Exec(ctx, query, uuid.New().String(), clusterID, nodeID, role, portBlock.RQLiteHTTPPort, portBlock.RQLiteRaftPort, portBlock.OlricHTTPPort, portBlock.OlricMemberlistPort, portBlock.GatewayHTTPPort, now, now, ) return err } func (cm *ClusterManager) getClusterNodes(ctx context.Context, clusterID string) ([]ClusterNode, error) { var nodes []ClusterNode query := `SELECT * FROM namespace_cluster_nodes WHERE namespace_cluster_id = ?` if err := cm.db.Query(ctx, &nodes, query, clusterID); err != nil { return nil, err } return nodes, nil } func (cm *ClusterManager) logEvent(ctx context.Context, clusterID string, eventType EventType, nodeID, message string, metadata map[string]interface{}) { metadataJSON := "" if metadata != nil { if data, err := json.Marshal(metadata); err == nil { metadataJSON = string(data) } } query := ` INSERT INTO namespace_cluster_events (id, namespace_cluster_id, event_type, node_id, message, metadata, created_at) VALUES (?, ?, ?, ?, ?, ?, ?) ` _, err := cm.db.Exec(ctx, query, uuid.New().String(), clusterID, eventType, nodeID, message, metadataJSON, time.Now()) if err != nil { cm.logger.Warn("Failed to log cluster event", zap.Error(err)) } } // ClusterProvisioner interface implementation // CheckNamespaceCluster checks if a namespace has a cluster and returns its status. 
// Returns: (clusterID, status, needsProvisioning, error) // - If the namespace is "default", returns ("", "default", false, nil) as it uses the global cluster // - If a cluster exists and is ready/provisioning, returns (clusterID, status, false, nil) // - If no cluster exists or cluster failed, returns ("", "", true, nil) to indicate provisioning is needed func (cm *ClusterManager) CheckNamespaceCluster(ctx context.Context, namespaceName string) (string, string, bool, error) { // Default namespace uses the global cluster, no per-namespace cluster needed if namespaceName == "default" || namespaceName == "" { return "", "default", false, nil } cluster, err := cm.GetClusterByNamespace(ctx, namespaceName) if err != nil { return "", "", false, err } if cluster == nil { // No cluster exists, provisioning is needed return "", "", true, nil } // If the cluster failed, delete the old record and trigger re-provisioning if cluster.Status == ClusterStatusFailed { cm.logger.Info("Found failed cluster, will re-provision", zap.String("namespace", namespaceName), zap.String("cluster_id", cluster.ID), ) // Delete the failed cluster record query := `DELETE FROM namespace_clusters WHERE id = ?` cm.db.Exec(ctx, query, cluster.ID) // Also clean up any port allocations cm.portAllocator.DeallocateAllPortBlocks(ctx, cluster.ID) return "", "", true, nil } // Return current status return cluster.ID, string(cluster.Status), false, nil } // ProvisionNamespaceCluster triggers provisioning for a new namespace cluster. // Returns: (clusterID, pollURL, error) // This starts an async provisioning process and returns immediately with the cluster ID // and a URL to poll for status updates. 
// ProvisionNamespaceCluster is idempotent per namespace: a second call while
// provisioning is in flight returns the in-progress cluster's ID (when it can
// be looked up) instead of starting a duplicate run. The cluster row is created
// synchronously so the caller immediately has an ID to poll; the heavy lifting
// happens in a background goroutine.
func (cm *ClusterManager) ProvisionNamespaceCluster(ctx context.Context, namespaceID int, namespaceName, wallet string) (string, string, error) {
	// Check if already provisioning
	cm.provisioningMu.Lock()
	if cm.provisioning[namespaceName] {
		cm.provisioningMu.Unlock()
		// Return existing cluster ID if found
		// NOTE(review): the lookup error is deliberately discarded here — on
		// failure we fall through to the "already being provisioned" error.
		cluster, _ := cm.GetClusterByNamespace(ctx, namespaceName)
		if cluster != nil {
			return cluster.ID, "/v1/namespace/status?id=" + cluster.ID, nil
		}
		return "", "", fmt.Errorf("namespace %s is already being provisioned", namespaceName)
	}
	// Mark as in-flight before releasing the lock so concurrent callers see it.
	cm.provisioning[namespaceName] = true
	cm.provisioningMu.Unlock()

	// Create cluster record synchronously to get the ID
	cluster := &NamespaceCluster{
		ID:               uuid.New().String(),
		NamespaceID:      namespaceID,
		NamespaceName:    namespaceName,
		Status:           ClusterStatusProvisioning,
		RQLiteNodeCount:  3,
		OlricNodeCount:   3,
		GatewayNodeCount: 3,
		ProvisionedBy:    wallet,
		ProvisionedAt:    time.Now(),
	}

	// Insert cluster record; on failure, release the in-flight marker so a
	// retry is possible.
	if err := cm.insertCluster(ctx, cluster); err != nil {
		cm.provisioningMu.Lock()
		delete(cm.provisioning, namespaceName)
		cm.provisioningMu.Unlock()
		return "", "", fmt.Errorf("failed to insert cluster record: %w", err)
	}

	cm.logEvent(ctx, cluster.ID, EventProvisioningStarted, "", "Cluster provisioning started", nil)

	// Start actual provisioning in background goroutine
	go cm.provisionClusterAsync(cluster, namespaceID, namespaceName, wallet)

	pollURL := "/v1/namespace/status?id=" + cluster.ID
	return cluster.ID, pollURL, nil
}

// provisionClusterAsync performs the actual cluster provisioning in the background.
// Pipeline: select nodes -> allocate ports -> start RQLite -> start Olric ->
// start gateways (optional) -> create DNS (best-effort) -> mark ready.
// Each failing stage rolls back everything started before it and marks the
// cluster failed. Runs detached from the caller's context on purpose — the
// HTTP request that triggered provisioning returns immediately.
func (cm *ClusterManager) provisionClusterAsync(cluster *NamespaceCluster, namespaceID int, namespaceName, provisionedBy string) {
	// Always clear the in-flight marker, whatever the outcome.
	defer func() {
		cm.provisioningMu.Lock()
		delete(cm.provisioning, namespaceName)
		cm.provisioningMu.Unlock()
	}()

	ctx := context.Background()

	cm.logger.Info("Starting async cluster provisioning",
		zap.String("cluster_id", cluster.ID),
		zap.String("namespace", namespaceName),
		zap.Int("namespace_id", namespaceID),
		zap.String("provisioned_by", provisionedBy),
	)

	// Select 3 nodes for the cluster
	nodes, err := cm.nodeSelector.SelectNodesForCluster(ctx, 3)
	if err != nil {
		cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusFailed, err.Error())
		cm.logger.Error("Failed to select nodes for cluster", zap.Error(err))
		return
	}
	nodeIDs := make([]string, len(nodes))
	for i, n := range nodes {
		nodeIDs[i] = n.NodeID
	}
	cm.logEvent(ctx, cluster.ID, EventNodesSelected, "", "Selected nodes for cluster", map[string]interface{}{"nodes": nodeIDs})

	// Allocate ports on each node
	portBlocks := make([]*PortBlock, len(nodes))
	for i, node := range nodes {
		block, err := cm.portAllocator.AllocatePortBlock(ctx, node.NodeID, cluster.ID)
		if err != nil {
			// Rollback previous allocations (only indices < i were allocated).
			for j := 0; j < i; j++ {
				cm.portAllocator.DeallocatePortBlock(ctx, cluster.ID, nodes[j].NodeID)
			}
			cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusFailed, err.Error())
			cm.logger.Error("Failed to allocate ports", zap.Error(err))
			return
		}
		portBlocks[i] = block
		cm.logEvent(ctx, cluster.ID, EventPortsAllocated, node.NodeID, fmt.Sprintf("Allocated ports %d-%d", block.PortStart, block.PortEnd), nil)
	}

	// Start RQLite instances (leader first, then followers)
	rqliteInstances, err := cm.startRQLiteCluster(ctx, cluster, nodes, portBlocks)
	if err != nil {
		cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, nil, nil)
		cm.logger.Error("Failed to start RQLite cluster", zap.Error(err))
		return
	}

	// Start Olric instances
	olricInstances, err := cm.startOlricCluster(ctx, cluster, nodes, portBlocks)
	if err != nil {
		cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, rqliteInstances, nil)
		cm.logger.Error("Failed to start Olric cluster", zap.Error(err))
		return
	}

	// Start Gateway instances (optional - may not be available in dev mode)
	_, err = cm.startGatewayCluster(ctx, cluster, nodes, portBlocks, rqliteInstances, olricInstances)
	if err != nil {
		// Check if this is a "binary not found" error - if so, continue without gateways.
		// NOTE(review): this relies on substring-matching the error text; a typed
		// sentinel error from startGatewayCluster would be more robust.
		if strings.Contains(err.Error(), "gateway binary not found") {
			cm.logger.Warn("Skipping namespace gateway spawning (binary not available)",
				zap.String("namespace", cluster.NamespaceName),
				zap.Error(err),
			)
			// NOTE(review): "gateway_skipped" is a raw string while other events use
			// EventType constants — consider adding a constant for consistency.
			cm.logEvent(ctx, cluster.ID, "gateway_skipped", "", "Gateway binary not available, cluster will use main gateway", nil)
		} else {
			cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, rqliteInstances, olricInstances)
			cm.logger.Error("Failed to start Gateway cluster", zap.Error(err))
			return
		}
	}

	// Create DNS records for namespace gateway
	if err := cm.createDNSRecords(ctx, cluster, nodes, portBlocks); err != nil {
		cm.logger.Warn("Failed to create DNS records", zap.Error(err))
		// Don't fail provisioning for DNS errors
	}

	// Update cluster status to ready (in-memory copy is updated too so the
	// caller-held pointer reflects the final state).
	now := time.Now()
	cluster.Status = ClusterStatusReady
	cluster.ReadyAt = &now
	cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusReady, "")
	cm.logEvent(ctx, cluster.ID, EventClusterReady, "", "Cluster is ready", nil)

	cm.logger.Info("Cluster provisioning completed",
		zap.String("cluster_id", cluster.ID),
		zap.String("namespace", namespaceName),
	)
}

// RestoreLocalClusters restores namespace cluster processes that should be running on this node.
// Called on node startup to re-spawn RQLite, Olric, and Gateway processes for clusters
// that were previously provisioned and assigned to this node.
// RestoreLocalClusters queries the main database for every ready cluster that
// has this node assigned and restores its processes. Requires cm.localNodeID
// to be set; requires the main RQLite cluster to be reachable (contrast with
// RestoreLocalClustersFromDisk, which does not).
func (cm *ClusterManager) RestoreLocalClusters(ctx context.Context) error {
	if cm.localNodeID == "" {
		return fmt.Errorf("local node ID not set")
	}

	cm.logger.Info("Checking for namespace clusters to restore", zap.String("local_node_id", cm.localNodeID))

	// Find all ready clusters that have this node assigned
	type clusterNodeInfo struct {
		ClusterID     string `db:"namespace_cluster_id"`
		NamespaceName string `db:"namespace_name"`
		NodeID        string `db:"node_id"`
		Role          string `db:"role"`
	}
	var assignments []clusterNodeInfo
	query := `
		SELECT DISTINCT cn.namespace_cluster_id, c.namespace_name, cn.node_id, cn.role
		FROM namespace_cluster_nodes cn
		JOIN namespace_clusters c ON cn.namespace_cluster_id = c.id
		WHERE cn.node_id = ? AND c.status = 'ready'
	`
	if err := cm.db.Query(ctx, &assignments, query, cm.localNodeID); err != nil {
		return fmt.Errorf("failed to query local cluster assignments: %w", err)
	}

	if len(assignments) == 0 {
		cm.logger.Info("No namespace clusters to restore on this node")
		return nil
	}

	// Group by cluster: a node may hold several roles in the same cluster, but
	// each cluster is restored exactly once.
	clusterNamespaces := make(map[string]string) // clusterID -> namespaceName
	for _, a := range assignments {
		clusterNamespaces[a.ClusterID] = a.NamespaceName
	}

	cm.logger.Info("Found namespace clusters to restore",
		zap.Int("count", len(clusterNamespaces)),
		zap.String("local_node_id", cm.localNodeID),
	)

	// Get local node's WireGuard IP (falls back to the public ip_address column
	// when internal_ip is NULL).
	type nodeIPInfo struct {
		InternalIP string `db:"internal_ip"`
	}
	var localNodeInfo []nodeIPInfo
	ipQuery := `SELECT COALESCE(internal_ip, ip_address) as internal_ip
		FROM dns_nodes
		WHERE id = ?
		LIMIT 1`
	// NOTE(review): when the query succeeds but returns zero rows, err is nil and
	// %w wraps nil — the returned error text will be misleading. Consider a
	// dedicated message for the empty-result case.
	if err := cm.db.Query(ctx, &localNodeInfo, ipQuery, cm.localNodeID); err != nil || len(localNodeInfo) == 0 {
		cm.logger.Warn("Could not determine local node IP, skipping restore", zap.Error(err))
		return fmt.Errorf("failed to get local node IP: %w", err)
	}
	localIP := localNodeInfo[0].InternalIP

	for clusterID, namespaceName := range clusterNamespaces {
		if err := cm.restoreClusterOnNode(ctx, clusterID, namespaceName, localIP); err != nil {
			cm.logger.Error("Failed to restore namespace cluster",
				zap.String("namespace", namespaceName),
				zap.String("cluster_id", clusterID),
				zap.Error(err),
			)
			// Continue restoring other clusters
		}
	}

	return nil
}

// restoreClusterOnNode restores all processes for a single cluster on this node.
// For each of RQLite, Olric, and Gateway it first checks whether the process is
// already up (systemd unit state, TCP dial, HTTP health check respectively) and
// only spawns it when it is not. Finally it snapshots the cluster topology to a
// local state file so future restarts can recover without the main database.
func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, namespaceName, localIP string) error {
	cm.logger.Info("Restoring namespace cluster processes",
		zap.String("namespace", namespaceName),
		zap.String("cluster_id", clusterID),
	)

	// Get port allocation for this node
	var portBlocks []PortBlock
	portQuery := `SELECT * FROM namespace_port_allocations
		WHERE namespace_cluster_id = ?
		AND node_id = ?`
	if err := cm.db.Query(ctx, &portBlocks, portQuery, clusterID, cm.localNodeID); err != nil || len(portBlocks) == 0 {
		return fmt.Errorf("no port allocation found for cluster %s on node %s", clusterID, cm.localNodeID)
	}
	pb := &portBlocks[0]

	// Get all nodes in this cluster (for join addresses and peer addresses)
	allNodes, err := cm.getClusterNodes(ctx, clusterID)
	if err != nil {
		return fmt.Errorf("failed to get cluster nodes: %w", err)
	}

	// Get all nodes' IPs and port allocations
	type nodePortInfo struct {
		NodeID              string `db:"node_id"`
		InternalIP          string `db:"internal_ip"`
		RQLiteHTTPPort      int    `db:"rqlite_http_port"`
		RQLiteRaftPort      int    `db:"rqlite_raft_port"`
		OlricHTTPPort       int    `db:"olric_http_port"`
		OlricMemberlistPort int    `db:"olric_memberlist_port"`
	}
	var allNodePorts []nodePortInfo
	allPortsQuery := `
		SELECT pa.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip,
			pa.rqlite_http_port, pa.rqlite_raft_port, pa.olric_http_port, pa.olric_memberlist_port
		FROM namespace_port_allocations pa
		JOIN dns_nodes dn ON pa.node_id = dn.id
		WHERE pa.namespace_cluster_id = ?
	`
	if err := cm.db.Query(ctx, &allNodePorts, allPortsQuery, clusterID); err != nil {
		return fmt.Errorf("failed to get all node ports: %w", err)
	}

	// 1. Restore RQLite
	// Check if RQLite systemd service is already running
	rqliteRunning, _ := cm.systemdSpawner.systemdMgr.IsServiceActive(namespaceName, systemd.ServiceTypeRQLite)
	if !rqliteRunning {
		// Check if RQLite data directory exists (has existing data)
		dataDir := filepath.Join(cm.baseDataDir, namespaceName, "rqlite", cm.localNodeID)
		hasExistingData := false
		if _, err := os.Stat(filepath.Join(dataDir, "raft")); err == nil {
			hasExistingData = true
		}

		if hasExistingData {
			// Write peers.json for Raft cluster recovery (official RQLite mechanism).
			// When all nodes restart simultaneously, Raft can't form quorum from stale state.
			// peers.json tells rqlited the correct voter list so it can hold a fresh election.
			var peers []rqlite.RaftPeer
			for _, np := range allNodePorts {
				// The raft address doubles as the node ID in the voter list.
				raftAddr := fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort)
				peers = append(peers, rqlite.RaftPeer{
					ID:       raftAddr,
					Address:  raftAddr,
					NonVoter: false,
				})
			}
			if err := cm.writePeersJSON(dataDir, peers); err != nil {
				cm.logger.Error("Failed to write peers.json", zap.String("namespace", namespaceName), zap.Error(err))
			}
		}

		// Build join addresses for first-time joins (no existing data)
		var joinAddrs []string
		isLeader := false
		if !hasExistingData {
			// Deterministic leader selection: sort all node IDs and pick the first one.
			// Every node independently computes the same result — no coordination needed.
			// The elected leader bootstraps the cluster; followers use -join with retries
			// to wait for the leader to become ready (up to 5 minutes).
			sortedNodeIDs := make([]string, 0, len(allNodePorts))
			for _, np := range allNodePorts {
				sortedNodeIDs = append(sortedNodeIDs, np.NodeID)
			}
			sort.Strings(sortedNodeIDs)
			electedLeaderID := sortedNodeIDs[0]

			if cm.localNodeID == electedLeaderID {
				isLeader = true
				cm.logger.Info("Deterministic leader election: this node is the leader",
					zap.String("namespace", namespaceName),
					zap.String("node_id", cm.localNodeID))
			} else {
				// Follower: join the elected leader's raft address
				for _, np := range allNodePorts {
					if np.NodeID == electedLeaderID {
						joinAddrs = append(joinAddrs, fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort))
						break
					}
				}
				cm.logger.Info("Deterministic leader election: this node is a follower",
					zap.String("namespace", namespaceName),
					zap.String("node_id", cm.localNodeID),
					zap.String("leader_id", electedLeaderID),
					zap.Strings("join_addrs", joinAddrs))
			}
		}

		rqliteCfg := rqlite.InstanceConfig{
			Namespace:      namespaceName,
			NodeID:         cm.localNodeID,
			HTTPPort:       pb.RQLiteHTTPPort,
			RaftPort:       pb.RQLiteRaftPort,
			HTTPAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteHTTPPort),
			RaftAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteRaftPort),
			JoinAddresses:  joinAddrs,
			IsLeader:       isLeader,
		}
		// Spawn failures are logged but do not abort: Olric/Gateway restore below
		// still gets a chance to run.
		if err := cm.spawnRQLiteWithSystemd(ctx, rqliteCfg); err != nil {
			cm.logger.Error("Failed to restore RQLite", zap.String("namespace", namespaceName), zap.Error(err))
		} else {
			cm.logger.Info("Restored RQLite instance", zap.String("namespace", namespaceName), zap.Int("port", pb.RQLiteHTTPPort))
		}
	} else {
		cm.logger.Info("RQLite already running", zap.String("namespace", namespaceName), zap.Int("port", pb.RQLiteHTTPPort))
	}

	// 2. Restore Olric
	// Liveness probe: a successful TCP dial to the memberlist port means Olric is up.
	olricRunning := false
	conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", pb.OlricMemberlistPort), 2*time.Second)
	if err == nil {
		conn.Close()
		olricRunning = true
	}

	if !olricRunning {
		// Peers are every other cluster node's memberlist address.
		var peers []string
		for _, np := range allNodePorts {
			if np.NodeID != cm.localNodeID {
				peers = append(peers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricMemberlistPort))
			}
		}

		olricCfg := olric.InstanceConfig{
			Namespace:      namespaceName,
			NodeID:         cm.localNodeID,
			HTTPPort:       pb.OlricHTTPPort,
			MemberlistPort: pb.OlricMemberlistPort,
			BindAddr:       localIP,
			AdvertiseAddr:  localIP,
			PeerAddresses:  peers,
		}
		if err := cm.spawnOlricWithSystemd(ctx, olricCfg); err != nil {
			cm.logger.Error("Failed to restore Olric", zap.String("namespace", namespaceName), zap.Error(err))
		} else {
			cm.logger.Info("Restored Olric instance", zap.String("namespace", namespaceName), zap.Int("port", pb.OlricHTTPPort))
		}
	} else {
		cm.logger.Info("Olric already running", zap.String("namespace", namespaceName), zap.Int("port", pb.OlricMemberlistPort))
	}

	// 3. Restore Gateway
	// Check if any cluster node has the gateway role (gateway may have been skipped during provisioning)
	hasGateway := false
	for _, node := range allNodes {
		if node.Role == NodeRoleGateway {
			hasGateway = true
			break
		}
	}

	if hasGateway {
		gwRunning := false
		// NOTE(review): http.Get uses http.DefaultClient, which has no timeout —
		// a hung health endpoint would block restore. Consider a client with a
		// short timeout, matching the 2s TCP probe above.
		resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/health", pb.GatewayHTTPPort))
		if err == nil {
			resp.Body.Close()
			gwRunning = true
		}

		if !gwRunning {
			// Build olric server addresses — always use WireGuard IPs (Olric binds to WireGuard interface)
			var olricServers []string
			for _, np := range allNodePorts {
				olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort))
			}

			gwCfg := gateway.InstanceConfig{
				Namespace:             namespaceName,
				NodeID:                cm.localNodeID,
				HTTPPort:              pb.GatewayHTTPPort,
				BaseDomain:            cm.baseDomain,
				RQLiteDSN:             fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
				GlobalRQLiteDSN:       cm.globalRQLiteDSN,
				OlricServers:          olricServers,
				OlricTimeout:          30 * time.Second,
				IPFSClusterAPIURL:     cm.ipfsClusterAPIURL,
				IPFSAPIURL:            cm.ipfsAPIURL,
				IPFSTimeout:           cm.ipfsTimeout,
				IPFSReplicationFactor: cm.ipfsReplicationFactor,
			}
			if err := cm.spawnGatewayWithSystemd(ctx, gwCfg); err != nil {
				cm.logger.Error("Failed to restore Gateway", zap.String("namespace", namespaceName), zap.Error(err))
			} else {
				cm.logger.Info("Restored Gateway instance", zap.String("namespace", namespaceName), zap.Int("port", pb.GatewayHTTPPort))
			}
		} else {
			cm.logger.Info("Gateway already running", zap.String("namespace", namespaceName), zap.Int("port", pb.GatewayHTTPPort))
		}
	}

	// Save local state to disk for future restarts without DB dependency
	var stateNodes []ClusterLocalStateNode
	for _, np := range allNodePorts {
		stateNodes = append(stateNodes, ClusterLocalStateNode{
			NodeID:              np.NodeID,
			InternalIP:          np.InternalIP,
			RQLiteHTTPPort:      np.RQLiteHTTPPort,
			RQLiteRaftPort:      np.RQLiteRaftPort,
			OlricHTTPPort:       np.OlricHTTPPort,
			OlricMemberlistPort: np.OlricMemberlistPort,
		})
	}
	localState := &ClusterLocalState{
		ClusterID:     clusterID,
		NamespaceName: namespaceName,
		LocalNodeID:   cm.localNodeID,
		LocalIP:       localIP,
		LocalPorts: ClusterLocalStatePorts{
			RQLiteHTTPPort:      pb.RQLiteHTTPPort,
			RQLiteRaftPort:      pb.RQLiteRaftPort,
			OlricHTTPPort:       pb.OlricHTTPPort,
			OlricMemberlistPort: pb.OlricMemberlistPort,
			GatewayHTTPPort:     pb.GatewayHTTPPort,
		},
		AllNodes:   stateNodes,
		HasGateway: hasGateway,
		BaseDomain: cm.baseDomain,
		SavedAt:    time.Now(),
	}
	// Best-effort: a failed snapshot only degrades future cold starts.
	if err := cm.saveLocalState(localState); err != nil {
		cm.logger.Warn("Failed to save cluster local state", zap.String("namespace", namespaceName), zap.Error(err))
	}

	return nil
}

// ClusterLocalState is persisted to disk so namespace processes can be restored
// without querying the main RQLite cluster (which may not have a leader yet on cold start).
type ClusterLocalState struct {
	ClusterID     string                  `json:"cluster_id"`
	NamespaceName string                  `json:"namespace_name"`
	LocalNodeID   string                  `json:"local_node_id"`
	LocalIP       string                  `json:"local_ip"`
	LocalPorts    ClusterLocalStatePorts  `json:"local_ports"`
	AllNodes      []ClusterLocalStateNode `json:"all_nodes"`
	HasGateway    bool                    `json:"has_gateway"`
	BaseDomain    string                  `json:"base_domain"`
	SavedAt       time.Time               `json:"saved_at"`
}

// ClusterLocalStatePorts holds this node's allocated ports for each service.
type ClusterLocalStatePorts struct {
	RQLiteHTTPPort      int `json:"rqlite_http_port"`
	RQLiteRaftPort      int `json:"rqlite_raft_port"`
	OlricHTTPPort       int `json:"olric_http_port"`
	OlricMemberlistPort int `json:"olric_memberlist_port"`
	GatewayHTTPPort     int `json:"gateway_http_port"`
}

// ClusterLocalStateNode describes one cluster member (including this node) as
// seen at snapshot time — enough to rebuild join/peer address lists.
type ClusterLocalStateNode struct {
	NodeID              string `json:"node_id"`
	InternalIP          string `json:"internal_ip"`
	RQLiteHTTPPort      int    `json:"rqlite_http_port"`
	RQLiteRaftPort      int    `json:"rqlite_raft_port"`
	OlricHTTPPort       int    `json:"olric_http_port"`
	OlricMemberlistPort int    `json:"olric_memberlist_port"`
}

// saveLocalState writes cluster state to disk for fast recovery without DB queries.
func (cm *ClusterManager) saveLocalState(state *ClusterLocalState) error { dir := filepath.Join(cm.baseDataDir, state.NamespaceName) if err := os.MkdirAll(dir, 0755); err != nil { return fmt.Errorf("failed to create state dir: %w", err) } data, err := json.MarshalIndent(state, "", " ") if err != nil { return fmt.Errorf("failed to marshal state: %w", err) } path := filepath.Join(dir, "cluster-state.json") if err := os.WriteFile(path, data, 0644); err != nil { return fmt.Errorf("failed to write state file: %w", err) } cm.logger.Info("Saved cluster local state", zap.String("namespace", state.NamespaceName), zap.String("path", path)) return nil } // loadLocalState reads cluster state from disk. func loadLocalState(path string) (*ClusterLocalState, error) { data, err := os.ReadFile(path) if err != nil { return nil, err } var state ClusterLocalState if err := json.Unmarshal(data, &state); err != nil { return nil, fmt.Errorf("failed to parse state file: %w", err) } return &state, nil } // RestoreLocalClustersFromDisk restores namespace processes using local state files, // avoiding any dependency on the main RQLite cluster being available. // Returns the number of namespaces restored, or -1 if no state files were found. 
// RestoreLocalClustersFromDisk scans baseDataDir for per-namespace
// cluster-state.json files and restores each one via restoreClusterFromState.
// A bad state file or a failed restore is logged and skipped; the method keeps
// going so one broken namespace cannot block the others. The -1 sentinel lets
// callers distinguish "no state files at all" from "zero restored".
func (cm *ClusterManager) RestoreLocalClustersFromDisk(ctx context.Context) (int, error) {
	pattern := filepath.Join(cm.baseDataDir, "*", "cluster-state.json")
	matches, err := filepath.Glob(pattern)
	if err != nil {
		return -1, fmt.Errorf("failed to glob state files: %w", err)
	}

	if len(matches) == 0 {
		return -1, nil
	}

	cm.logger.Info("Found local cluster state files", zap.Int("count", len(matches)))

	restored := 0
	for _, path := range matches {
		state, err := loadLocalState(path)
		if err != nil {
			cm.logger.Error("Failed to load cluster state file", zap.String("path", path), zap.Error(err))
			continue
		}
		if err := cm.restoreClusterFromState(ctx, state); err != nil {
			cm.logger.Error("Failed to restore cluster from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
			continue
		}
		restored++
	}

	return restored, nil
}

// restoreClusterFromState restores all processes for a cluster using local state (no DB queries).
// Mirrors restoreClusterOnNode, but reads the topology (peer IPs, ports, gateway
// presence) from the on-disk snapshot instead of the main database, so it works
// during cold starts when the global RQLite cluster has no leader yet.
func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *ClusterLocalState) error {
	cm.logger.Info("Restoring namespace cluster from local state",
		zap.String("namespace", state.NamespaceName),
		zap.String("cluster_id", state.ClusterID),
	)

	pb := &state.LocalPorts
	localIP := state.LocalIP

	// 1. Restore RQLite
	// Check if RQLite systemd service is already running
	rqliteRunning, _ := cm.systemdSpawner.systemdMgr.IsServiceActive(state.NamespaceName, systemd.ServiceTypeRQLite)
	if !rqliteRunning {
		// Check if RQLite data directory exists (has existing data)
		dataDir := filepath.Join(cm.baseDataDir, state.NamespaceName, "rqlite", cm.localNodeID)
		hasExistingData := false
		if _, err := os.Stat(filepath.Join(dataDir, "raft")); err == nil {
			hasExistingData = true
		}

		if hasExistingData {
			// Write peers.json so Raft can recover quorum after a simultaneous
			// restart (same mechanism as restoreClusterOnNode).
			var peers []rqlite.RaftPeer
			for _, np := range state.AllNodes {
				raftAddr := fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort)
				peers = append(peers, rqlite.RaftPeer{ID: raftAddr, Address: raftAddr, NonVoter: false})
			}
			if err := cm.writePeersJSON(dataDir, peers); err != nil {
				cm.logger.Error("Failed to write peers.json", zap.String("namespace", state.NamespaceName), zap.Error(err))
			}
		}

		// First-time start: deterministic leader election by sorting node IDs;
		// the lexicographically-first node bootstraps, the rest join it.
		var joinAddrs []string
		isLeader := false
		if !hasExistingData {
			sortedNodeIDs := make([]string, 0, len(state.AllNodes))
			for _, np := range state.AllNodes {
				sortedNodeIDs = append(sortedNodeIDs, np.NodeID)
			}
			sort.Strings(sortedNodeIDs)
			electedLeaderID := sortedNodeIDs[0]

			if cm.localNodeID == electedLeaderID {
				isLeader = true
			} else {
				for _, np := range state.AllNodes {
					if np.NodeID == electedLeaderID {
						joinAddrs = append(joinAddrs, fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort))
						break
					}
				}
			}
		}

		rqliteCfg := rqlite.InstanceConfig{
			Namespace:      state.NamespaceName,
			NodeID:         cm.localNodeID,
			HTTPPort:       pb.RQLiteHTTPPort,
			RaftPort:       pb.RQLiteRaftPort,
			HTTPAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteHTTPPort),
			RaftAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteRaftPort),
			JoinAddresses:  joinAddrs,
			IsLeader:       isLeader,
		}
		// Spawn failures are logged but not returned — Olric/Gateway below still run.
		if err := cm.spawnRQLiteWithSystemd(ctx, rqliteCfg); err != nil {
			cm.logger.Error("Failed to restore RQLite from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
		} else {
			cm.logger.Info("Restored RQLite instance from state", zap.String("namespace", state.NamespaceName))
		}
	}

	// 2. Restore Olric
	// Liveness probe: a successful TCP dial to the memberlist port means Olric is up.
	conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", pb.OlricMemberlistPort), 2*time.Second)
	if err == nil {
		conn.Close()
	} else {
		var peers []string
		for _, np := range state.AllNodes {
			if np.NodeID != cm.localNodeID {
				peers = append(peers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricMemberlistPort))
			}
		}
		olricCfg := olric.InstanceConfig{
			Namespace:      state.NamespaceName,
			NodeID:         cm.localNodeID,
			HTTPPort:       pb.OlricHTTPPort,
			MemberlistPort: pb.OlricMemberlistPort,
			BindAddr:       localIP,
			AdvertiseAddr:  localIP,
			PeerAddresses:  peers,
		}
		if err := cm.spawnOlricWithSystemd(ctx, olricCfg); err != nil {
			cm.logger.Error("Failed to restore Olric from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
		} else {
			cm.logger.Info("Restored Olric instance from state", zap.String("namespace", state.NamespaceName))
		}
	}

	// 3. Restore Gateway
	if state.HasGateway {
		// NOTE(review): http.Get uses http.DefaultClient, which has no timeout —
		// a hung health endpoint would block restore. Consider a client with a
		// short timeout, matching the 2s TCP probe above.
		resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/health", pb.GatewayHTTPPort))
		if err == nil {
			resp.Body.Close()
		} else {
			// Build olric server addresses — always use WireGuard IPs (Olric binds to WireGuard interface)
			var olricServers []string
			for _, np := range state.AllNodes {
				olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort))
			}
			gwCfg := gateway.InstanceConfig{
				Namespace:             state.NamespaceName,
				NodeID:                cm.localNodeID,
				HTTPPort:              pb.GatewayHTTPPort,
				BaseDomain:            state.BaseDomain,
				RQLiteDSN:             fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
				GlobalRQLiteDSN:       cm.globalRQLiteDSN,
				OlricServers:          olricServers,
				OlricTimeout:          30 * time.Second,
				IPFSClusterAPIURL:     cm.ipfsClusterAPIURL,
				IPFSAPIURL:            cm.ipfsAPIURL,
				IPFSTimeout:           cm.ipfsTimeout,
				IPFSReplicationFactor: cm.ipfsReplicationFactor,
			}
			if err := cm.spawnGatewayWithSystemd(ctx, gwCfg); err != nil {
				cm.logger.Error("Failed to restore Gateway from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
			} else {
				cm.logger.Info("Restored Gateway instance from state", zap.String("namespace", state.NamespaceName))
			}
		}
	}

	return nil
}

// GetClusterStatusByID returns the full status of a cluster by ID.
// This method is part of the ClusterProvisioner interface used by the gateway.
// It returns a generic struct that matches the interface definition in auth/handlers.go.
func (cm *ClusterManager) GetClusterStatusByID(ctx context.Context, clusterID string) (interface{}, error) {
	status, err := cm.GetClusterStatus(ctx, clusterID)
	if err != nil {
		return nil, err
	}

	// Return as a map to avoid import cycles with the interface type
	return map[string]interface{}{
		"cluster_id":    status.ClusterID,
		"namespace":     status.Namespace,
		"status":        string(status.Status),
		"nodes":         status.Nodes,
		"rqlite_ready":  status.RQLiteReady,
		"olric_ready":   status.OlricReady,
		"gateway_ready": status.GatewayReady,
		"dns_ready":     status.DNSReady,
		"error":         status.Error,
	}, nil
}