From 11d5c1b19abdbd3280b7d350080bb10e90214f1e Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Mon, 2 Feb 2026 16:18:13 +0200 Subject: [PATCH] Bug fixing --- pkg/gateway/middleware.go | 42 ++--- pkg/namespace/cluster_manager.go | 253 +++++++++++++++++++++++++++++++ pkg/node/gateway.go | 32 +++- 3 files changed, 294 insertions(+), 33 deletions(-) diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index ddcf142..c256c61 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -589,24 +589,23 @@ func (g *Gateway) domainRoutingMiddleware(next http.Handler) http.Handler { // handleNamespaceGatewayRequest proxies requests to a namespace's dedicated gateway cluster // This enables physical isolation where each namespace has its own RQLite, Olric, and Gateway func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.Request, namespaceName string) { - // Look up namespace cluster gateway IPs from DNS records + // Look up namespace cluster gateway using internal (WireGuard) IPs for inter-node proxying db := g.client.Database() internalCtx := client.WithInternalAuth(r.Context()) - baseDomain := "dbrs.space" - if g.cfg != nil && g.cfg.BaseDomain != "" { - baseDomain = g.cfg.BaseDomain - } - - // Query DNS records for the namespace gateway - fqdn := "ns-" + namespaceName + "." + baseDomain + "." - query := `SELECT value FROM dns_records WHERE fqdn = ? AND record_type = 'A' AND is_active = TRUE ORDER BY RANDOM() LIMIT 1` - result, err := db.Query(internalCtx, query, fqdn) + // Single query: get internal IP + gateway port from cluster tables + query := ` + SELECT COALESCE(dn.internal_ip, dn.ip_address), npa.gateway_http_port + FROM namespace_port_allocations npa + JOIN namespace_clusters nc ON npa.namespace_cluster_id = nc.id + JOIN dns_nodes dn ON npa.node_id = dn.id + WHERE nc.namespace_name = ? AND nc.status = 'ready' + ORDER BY RANDOM() LIMIT 1 + ` + result, err := db.Query(internalCtx, query, namespaceName) if err != nil || result == nil || len(result.Rows) == 0 { - // No gateway found for this namespace g.logger.ComponentWarn(logging.ComponentGeneral, "namespace gateway not found", zap.String("namespace", namespaceName), - zap.String("fqdn", fqdn), ) http.Error(w, "Namespace gateway not found", http.StatusNotFound) return @@ -617,22 +616,9 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R http.Error(w, "Namespace gateway not available", http.StatusServiceUnavailable) return } - - // Get the gateway port from namespace_port_allocations - // Gateway HTTP port is port_start + 4 - portQuery := ` - SELECT npa.gateway_http_port - FROM namespace_port_allocations npa - JOIN namespace_clusters nc ON npa.namespace_cluster_id = nc.id - WHERE nc.namespace_name = ? - LIMIT 1 - ` - portResult, err := db.Query(internalCtx, portQuery, namespaceName) - gatewayPort := 10004 // Default to first namespace's gateway port - if err == nil && portResult != nil && len(portResult.Rows) > 0 { - if p := getInt(portResult.Rows[0][0]); p > 0 { - gatewayPort = p - } + gatewayPort := 10004 + if p := getInt(result.Rows[0][1]); p > 0 { + gatewayPort = p } // Proxy request to the namespace gateway diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index 3778155..0c0e15a 100644 --- a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -8,6 +8,8 @@ import ( "io" "net" "net/http" + "os" + "path/filepath" "sort" "strings" "sync" @@ -1299,6 +1301,257 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n } } + // Save local state to disk for future restarts without DB dependency + var stateNodes []ClusterLocalStateNode + for _, np := range allNodePorts { + stateNodes = append(stateNodes, ClusterLocalStateNode{ + NodeID: np.NodeID, + InternalIP: np.InternalIP, + RQLiteHTTPPort: np.RQLiteHTTPPort, + RQLiteRaftPort: np.RQLiteRaftPort, + OlricHTTPPort: np.OlricHTTPPort, + OlricMemberlistPort: np.OlricMemberlistPort, + }) + } + localState := &ClusterLocalState{ + ClusterID: clusterID, + NamespaceName: namespaceName, + LocalNodeID: cm.localNodeID, + LocalIP: localIP, + LocalPorts: ClusterLocalStatePorts{ + RQLiteHTTPPort: pb.RQLiteHTTPPort, + RQLiteRaftPort: pb.RQLiteRaftPort, + OlricHTTPPort: pb.OlricHTTPPort, + OlricMemberlistPort: pb.OlricMemberlistPort, + GatewayHTTPPort: pb.GatewayHTTPPort, + }, + AllNodes: stateNodes, + HasGateway: hasGateway, + BaseDomain: cm.baseDomain, + SavedAt: time.Now(), + } + if err := cm.saveLocalState(localState); err != nil { + cm.logger.Warn("Failed to save cluster local state", zap.String("namespace", namespaceName), zap.Error(err)) + } + + return nil +} + +// ClusterLocalState is persisted to disk so namespace processes can be restored +// without querying the main RQLite cluster (which may not have a leader yet on cold start). +type ClusterLocalState struct { + ClusterID string `json:"cluster_id"` + NamespaceName string `json:"namespace_name"` + LocalNodeID string `json:"local_node_id"` + LocalIP string `json:"local_ip"` + LocalPorts ClusterLocalStatePorts `json:"local_ports"` + AllNodes []ClusterLocalStateNode `json:"all_nodes"` + HasGateway bool `json:"has_gateway"` + BaseDomain string `json:"base_domain"` + SavedAt time.Time `json:"saved_at"` +} + +type ClusterLocalStatePorts struct { + RQLiteHTTPPort int `json:"rqlite_http_port"` + RQLiteRaftPort int `json:"rqlite_raft_port"` + OlricHTTPPort int `json:"olric_http_port"` + OlricMemberlistPort int `json:"olric_memberlist_port"` + GatewayHTTPPort int `json:"gateway_http_port"` +} + +type ClusterLocalStateNode struct { + NodeID string `json:"node_id"` + InternalIP string `json:"internal_ip"` + RQLiteHTTPPort int `json:"rqlite_http_port"` + RQLiteRaftPort int `json:"rqlite_raft_port"` + OlricHTTPPort int `json:"olric_http_port"` + OlricMemberlistPort int `json:"olric_memberlist_port"` +} + +// saveLocalState writes cluster state to disk for fast recovery without DB queries. +func (cm *ClusterManager) saveLocalState(state *ClusterLocalState) error { + dir := filepath.Join(cm.baseDataDir, state.NamespaceName) + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("failed to create state dir: %w", err) + } + data, err := json.MarshalIndent(state, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal state: %w", err) + } + path := filepath.Join(dir, "cluster-state.json") + if err := os.WriteFile(path, data, 0644); err != nil { + return fmt.Errorf("failed to write state file: %w", err) + } + cm.logger.Info("Saved cluster local state", zap.String("namespace", state.NamespaceName), zap.String("path", path)) + return nil +} + +// loadLocalState reads cluster state from disk. +func loadLocalState(path string) (*ClusterLocalState, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + var state ClusterLocalState + if err := json.Unmarshal(data, &state); err != nil { + return nil, fmt.Errorf("failed to parse state file: %w", err) + } + return &state, nil +} + +// RestoreLocalClustersFromDisk restores namespace processes using local state files, +// avoiding any dependency on the main RQLite cluster being available. +// Returns the number of namespaces restored, or -1 if no state files were found. +func (cm *ClusterManager) RestoreLocalClustersFromDisk(ctx context.Context) (int, error) { + pattern := filepath.Join(cm.baseDataDir, "*", "cluster-state.json") + matches, err := filepath.Glob(pattern) + if err != nil { + return -1, fmt.Errorf("failed to glob state files: %w", err) + } + if len(matches) == 0 { + return -1, nil + } + + cm.logger.Info("Found local cluster state files", zap.Int("count", len(matches))) + + restored := 0 + for _, path := range matches { + state, err := loadLocalState(path) + if err != nil { + cm.logger.Error("Failed to load cluster state file", zap.String("path", path), zap.Error(err)) + continue + } + if err := cm.restoreClusterFromState(ctx, state); err != nil { + cm.logger.Error("Failed to restore cluster from state", zap.String("namespace", state.NamespaceName), zap.Error(err)) + continue + } + restored++ + } + return restored, nil +} + +// restoreClusterFromState restores all processes for a cluster using local state (no DB queries). +func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *ClusterLocalState) error { + cm.logger.Info("Restoring namespace cluster from local state", + zap.String("namespace", state.NamespaceName), + zap.String("cluster_id", state.ClusterID), + ) + + pb := &state.LocalPorts + localIP := state.LocalIP + + // 1. Restore RQLite + if !cm.rqliteSpawner.IsInstanceRunning(pb.RQLiteHTTPPort) { + hasExistingData := cm.rqliteSpawner.HasExistingData(state.NamespaceName, cm.localNodeID) + + if hasExistingData { + var peers []rqlite.RaftPeer + for _, np := range state.AllNodes { + raftAddr := fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort) + peers = append(peers, rqlite.RaftPeer{ID: raftAddr, Address: raftAddr, NonVoter: false}) + } + dataDir := cm.rqliteSpawner.GetDataDir(state.NamespaceName, cm.localNodeID) + if err := cm.rqliteSpawner.WritePeersJSON(dataDir, peers); err != nil { + cm.logger.Error("Failed to write peers.json", zap.String("namespace", state.NamespaceName), zap.Error(err)) + } + } + + var joinAddrs []string + isLeader := false + if !hasExistingData { + sortedNodeIDs := make([]string, 0, len(state.AllNodes)) + for _, np := range state.AllNodes { + sortedNodeIDs = append(sortedNodeIDs, np.NodeID) + } + sort.Strings(sortedNodeIDs) + electedLeaderID := sortedNodeIDs[0] + + if cm.localNodeID == electedLeaderID { + isLeader = true + } else { + for _, np := range state.AllNodes { + if np.NodeID == electedLeaderID { + joinAddrs = append(joinAddrs, fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort)) + break + } + } + } + } + + rqliteCfg := rqlite.InstanceConfig{ + Namespace: state.NamespaceName, + NodeID: cm.localNodeID, + HTTPPort: pb.RQLiteHTTPPort, + RaftPort: pb.RQLiteRaftPort, + HTTPAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteHTTPPort), + RaftAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteRaftPort), + JoinAddresses: joinAddrs, + IsLeader: isLeader, + } + if _, err := cm.rqliteSpawner.SpawnInstance(ctx, rqliteCfg); err != nil { + cm.logger.Error("Failed to restore RQLite from state", zap.String("namespace", state.NamespaceName), zap.Error(err)) + } else { + cm.logger.Info("Restored RQLite instance from state", zap.String("namespace", state.NamespaceName)) + } + } + + // 2. Restore Olric + conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", pb.OlricMemberlistPort), 2*time.Second) + if err == nil { + conn.Close() + } else { + var peers []string + for _, np := range state.AllNodes { + if np.NodeID != cm.localNodeID { + peers = append(peers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricMemberlistPort)) + } + } + olricCfg := olric.InstanceConfig{ + Namespace: state.NamespaceName, + NodeID: cm.localNodeID, + HTTPPort: pb.OlricHTTPPort, + MemberlistPort: pb.OlricMemberlistPort, + BindAddr: localIP, + AdvertiseAddr: localIP, + PeerAddresses: peers, + } + if _, err := cm.olricSpawner.SpawnInstance(ctx, olricCfg); err != nil { + cm.logger.Error("Failed to restore Olric from state", zap.String("namespace", state.NamespaceName), zap.Error(err)) + } else { + cm.logger.Info("Restored Olric instance from state", zap.String("namespace", state.NamespaceName)) + } + } + + // 3. Restore Gateway + if state.HasGateway { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/health", pb.GatewayHTTPPort)) + if err == nil { + resp.Body.Close() + } else { + var olricServers []string + for _, np := range state.AllNodes { + if np.NodeID == cm.localNodeID { + olricServers = append(olricServers, fmt.Sprintf("localhost:%d", np.OlricHTTPPort)) + } else { + olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort)) + } + } + gwCfg := gateway.InstanceConfig{ + Namespace: state.NamespaceName, + NodeID: cm.localNodeID, + HTTPPort: pb.GatewayHTTPPort, + BaseDomain: state.BaseDomain, + RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort), + OlricServers: olricServers, + } + if _, err := cm.gatewaySpawner.SpawnInstance(ctx, gwCfg); err != nil { + cm.logger.Error("Failed to restore Gateway from state", zap.String("namespace", state.NamespaceName), zap.Error(err)) + } else { + cm.logger.Info("Restored Gateway instance from state", zap.String("namespace", state.NamespaceName)) + } + } + } + return nil } diff --git a/pkg/node/gateway.go b/pkg/node/gateway.go index 0cb3b31..18d18f3 100644 --- a/pkg/node/gateway.go +++ b/pkg/node/gateway.go @@ -93,13 +93,35 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { zap.String("base_domain", clusterCfg.BaseDomain), zap.String("base_data_dir", baseDataDir)) - // Restore previously-running namespace cluster processes in background + // Restore previously-running namespace cluster processes in background. + // First try local state files (no DB dependency), then fall back to DB query with retries. go func() { - // Wait for main RQLite to be ready before querying cluster assignments - time.Sleep(10 * time.Second) - if err := clusterManager.RestoreLocalClusters(ctx); err != nil { - n.logger.ComponentError(logging.ComponentNode, "Failed to restore namespace clusters", zap.Error(err)) + time.Sleep(5 * time.Second) + + // Try disk-based restore first (instant, no DB needed) + restored, err := clusterManager.RestoreLocalClustersFromDisk(ctx) + if err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Disk-based namespace restore failed", zap.Error(err)) } + if restored > 0 { + n.logger.ComponentInfo(logging.ComponentNode, "Restored namespace clusters from local state", + zap.Int("count", restored)) + return + } + + // No state files found — fall back to DB query with retries + n.logger.ComponentInfo(logging.ComponentNode, "No local state files, falling back to DB restore") + time.Sleep(5 * time.Second) + for attempt := 1; attempt <= 12; attempt++ { + if err := clusterManager.RestoreLocalClusters(ctx); err == nil { + return + } else { + n.logger.ComponentWarn(logging.ComponentNode, "Namespace cluster restore failed, retrying", + zap.Int("attempt", attempt), zap.Error(err)) + } + time.Sleep(10 * time.Second) + } + n.logger.ComponentError(logging.ComponentNode, "Failed to restore namespace clusters after all retries") }() }