From 859c30fcd9cdf819b52b53a08d43c61321d91292 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Mon, 2 Feb 2026 14:55:29 +0200 Subject: [PATCH] Bug fixing --- pkg/gateway/middleware.go | 17 +- pkg/namespace/cluster_manager.go | 281 ++++++++++++++++++++++++++++++- pkg/node/gateway.go | 10 ++ pkg/rqlite/instance_spawner.go | 51 +++++- 4 files changed, 349 insertions(+), 10 deletions(-) diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index a71846a..ddcf142 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -534,24 +534,25 @@ func (g *Gateway) domainRoutingMiddleware(next http.Handler) http.Handler { return } - // Skip API paths (they should use JWT/API key auth) - if strings.HasPrefix(r.URL.Path, "/v1/") || strings.HasPrefix(r.URL.Path, "/.well-known/") { - next.ServeHTTP(w, r) - return - } - - // Check for namespace gateway domain: ns-{namespace}.{baseDomain} + // Check for namespace gateway domain FIRST (before API path skip) + // Namespace subdomains (ns-{name}.{baseDomain}) must be proxied to namespace gateways + // regardless of path — including /v1/ paths suffix := "." 
+ baseDomain if strings.HasSuffix(host, suffix) { subdomain := strings.TrimSuffix(host, suffix) if strings.HasPrefix(subdomain, "ns-") { - // This is a namespace gateway request namespaceName := strings.TrimPrefix(subdomain, "ns-") g.handleNamespaceGatewayRequest(w, r, namespaceName) return } } + // Skip API paths (they should use JWT/API key auth on the main gateway) + if strings.HasPrefix(r.URL.Path, "/v1/") || strings.HasPrefix(r.URL.Path, "/.well-known/") { + next.ServeHTTP(w, r) + return + } + // Check if deployment handlers are available if g.deploymentService == nil || g.staticHandler == nil { next.ServeHTTP(w, r) diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index 03d90a8..3778155 100644 --- a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -6,7 +6,9 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" + "sort" "strings" "sync" "time" @@ -575,7 +577,7 @@ func (cm *ClusterManager) sendStopRequest(ctx context.Context, nodeIP, action, n // createDNSRecords creates DNS records for the namespace gateway func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error { // Create A records for ns-{namespace}.{baseDomain} pointing to all 3 nodes - fqdn := fmt.Sprintf("ns-%s.%s", cluster.NamespaceName, cm.baseDomain) + fqdn := fmt.Sprintf("ns-%s.%s.", cluster.NamespaceName, cm.baseDomain) for i, node := range nodes { query := ` @@ -1023,6 +1025,283 @@ func (cm *ClusterManager) provisionClusterAsync(cluster *NamespaceCluster, names ) } +// RestoreLocalClusters restores namespace cluster processes that should be running on this node. +// Called on node startup to re-spawn RQLite, Olric, and Gateway processes for clusters +// that were previously provisioned and assigned to this node. 
+//
+// Failures to restore an individual cluster are logged and skipped. An error is
+// returned only when the local node ID is unset, the assignment query fails,
+// the local node's IP cannot be determined, or ctx is cancelled between clusters.
+func (cm *ClusterManager) RestoreLocalClusters(ctx context.Context) error {
+	if cm.localNodeID == "" {
+		return fmt.Errorf("local node ID not set")
+	}
+
+	cm.logger.Info("Checking for namespace clusters to restore", zap.String("local_node_id", cm.localNodeID))
+
+	// Find all ready clusters that have this node assigned. The SELECT includes
+	// the role column, so one cluster may appear in several rows; they are
+	// collapsed by cluster ID below.
+	type clusterNodeInfo struct {
+		ClusterID     string `db:"namespace_cluster_id"`
+		NamespaceName string `db:"namespace_name"`
+		NodeID        string `db:"node_id"`
+		Role          string `db:"role"`
+	}
+	var assignments []clusterNodeInfo
+	query := `
+		SELECT DISTINCT cn.namespace_cluster_id, c.namespace_name, cn.node_id, cn.role
+		FROM namespace_cluster_nodes cn
+		JOIN namespace_clusters c ON cn.namespace_cluster_id = c.id
+		WHERE cn.node_id = ? AND c.status = 'ready'
+	`
+	if err := cm.db.Query(ctx, &assignments, query, cm.localNodeID); err != nil {
+		return fmt.Errorf("failed to query local cluster assignments: %w", err)
+	}
+
+	if len(assignments) == 0 {
+		cm.logger.Info("No namespace clusters to restore on this node")
+		return nil
+	}
+
+	// Group by cluster
+	clusterNamespaces := make(map[string]string, len(assignments)) // clusterID -> namespaceName
+	for _, a := range assignments {
+		clusterNamespaces[a.ClusterID] = a.NamespaceName
+	}
+
+	cm.logger.Info("Found namespace clusters to restore",
+		zap.Int("count", len(clusterNamespaces)),
+		zap.String("local_node_id", cm.localNodeID),
+	)
+
+	// Get the local node's internal IP (falls back to ip_address when
+	// internal_ip is NULL). A query error and a missing row are handled
+	// separately: wrapping a nil error with %w would yield "%!w(<nil>)".
+	type nodeIPInfo struct {
+		InternalIP string `db:"internal_ip"`
+	}
+	var localNodeInfo []nodeIPInfo
+	ipQuery := `SELECT COALESCE(internal_ip, ip_address) as internal_ip FROM dns_nodes WHERE id = ? LIMIT 1`
+	if err := cm.db.Query(ctx, &localNodeInfo, ipQuery, cm.localNodeID); err != nil {
+		return fmt.Errorf("failed to get local node IP: %w", err)
+	}
+	if len(localNodeInfo) == 0 {
+		return fmt.Errorf("failed to get local node IP: node %s not found in dns_nodes", cm.localNodeID)
+	}
+	localIP := localNodeInfo[0].InternalIP
+
+	for clusterID, namespaceName := range clusterNamespaces {
+		// Stop early on shutdown instead of logging one failure per cluster.
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+		if err := cm.restoreClusterOnNode(ctx, clusterID, namespaceName, localIP); err != nil {
+			cm.logger.Error("Failed to restore namespace cluster",
+				zap.String("namespace", namespaceName),
+				zap.String("cluster_id", clusterID),
+				zap.Error(err),
+			)
+			// Continue restoring other clusters
+		}
+	}
+
+	return nil
+}
+
+// restoreClusterOnNode re-spawns the RQLite, Olric and Gateway processes of one
+// namespace cluster on this node, skipping any that already answer on their
+// allocated port.
+//
+// localIP is the address this node advertises to its peers. An error is
+// returned only when the cluster's port or node metadata cannot be loaded; a
+// failure to spawn an individual process is logged and the remaining
+// processes are still attempted.
+func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, namespaceName, localIP string) error {
+	cm.logger.Info("Restoring namespace cluster processes",
+		zap.String("namespace", namespaceName),
+		zap.String("cluster_id", clusterID),
+	)
+
+	// Get port allocation for this node. A query failure and a missing row are
+	// reported separately so a database error is not mistaken for missing data.
+	var portBlocks []PortBlock
+	portQuery := `SELECT * FROM namespace_port_allocations WHERE namespace_cluster_id = ? AND node_id = ?`
+	if err := cm.db.Query(ctx, &portBlocks, portQuery, clusterID, cm.localNodeID); err != nil {
+		return fmt.Errorf("failed to query port allocation for cluster %s on node %s: %w", clusterID, cm.localNodeID, err)
+	}
+	if len(portBlocks) == 0 {
+		return fmt.Errorf("no port allocation found for cluster %s on node %s", clusterID, cm.localNodeID)
+	}
+	pb := &portBlocks[0]
+
+	// Get all nodes in this cluster (used below to see whether a gateway role exists)
+	allNodes, err := cm.getClusterNodes(ctx, clusterID)
+	if err != nil {
+		return fmt.Errorf("failed to get cluster nodes: %w", err)
+	}
+
+	// Get all nodes' IPs and port allocations (for join addresses and peer addresses)
+	type nodePortInfo struct {
+		NodeID              string `db:"node_id"`
+		InternalIP          string `db:"internal_ip"`
+		RQLiteHTTPPort      int    `db:"rqlite_http_port"`
+		RQLiteRaftPort      int    `db:"rqlite_raft_port"`
+		OlricHTTPPort       int    `db:"olric_http_port"`
+		OlricMemberlistPort int    `db:"olric_memberlist_port"`
+	}
+	var allNodePorts []nodePortInfo
+	allPortsQuery := `
+		SELECT pa.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip,
+			pa.rqlite_http_port, pa.rqlite_raft_port, pa.olric_http_port, pa.olric_memberlist_port
+		FROM namespace_port_allocations pa
+		JOIN dns_nodes dn ON pa.node_id = dn.id
+		WHERE pa.namespace_cluster_id = ?
+	`
+	if err := cm.db.Query(ctx, &allNodePorts, allPortsQuery, clusterID); err != nil {
+		return fmt.Errorf("failed to get all node ports: %w", err)
+	}
+	if len(allNodePorts) == 0 {
+		// The leader selection below indexes the first sorted node ID and
+		// peers.json must list at least one voter, so an empty join result
+		// cannot be restored (and would otherwise panic).
+		return fmt.Errorf("no node port allocations found for cluster %s", clusterID)
+	}
+
+	// 1. Restore RQLite
+	if !cm.rqliteSpawner.IsInstanceRunning(pb.RQLiteHTTPPort) {
+		hasExistingData := cm.rqliteSpawner.HasExistingData(namespaceName, cm.localNodeID)
+
+		if hasExistingData {
+			// Write peers.json for Raft cluster recovery (official RQLite mechanism).
+			// When all nodes restart simultaneously, Raft can't form quorum from stale state.
+			// peers.json tells rqlited the correct voter list so it can hold a fresh election.
+			peers := make([]rqlite.RaftPeer, 0, len(allNodePorts))
+			for _, np := range allNodePorts {
+				raftAddr := fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort)
+				peers = append(peers, rqlite.RaftPeer{
+					ID:       raftAddr,
+					Address:  raftAddr,
+					NonVoter: false,
+				})
+			}
+			dataDir := cm.rqliteSpawner.GetDataDir(namespaceName, cm.localNodeID)
+			// Best-effort: a failed write is logged and the spawn is still attempted.
+			if err := cm.rqliteSpawner.WritePeersJSON(dataDir, peers); err != nil {
+				cm.logger.Error("Failed to write peers.json", zap.String("namespace", namespaceName), zap.Error(err))
+			}
+		}
+
+		// Build join addresses for first-time joins (no existing data)
+		var joinAddrs []string
+		isLeader := false
+		if !hasExistingData {
+			// Deterministic leader selection: sort all node IDs and pick the first one.
+			// Every node independently computes the same result — no coordination needed.
+			// The elected leader bootstraps the cluster; followers use -join with retries
+			// to wait for the leader to become ready (up to 5 minutes).
+			sortedNodeIDs := make([]string, 0, len(allNodePorts))
+			for _, np := range allNodePorts {
+				sortedNodeIDs = append(sortedNodeIDs, np.NodeID)
+			}
+			sort.Strings(sortedNodeIDs)
+			electedLeaderID := sortedNodeIDs[0] // safe: allNodePorts is non-empty (checked above)
+
+			if cm.localNodeID == electedLeaderID {
+				isLeader = true
+				cm.logger.Info("Deterministic leader election: this node is the leader",
+					zap.String("namespace", namespaceName),
+					zap.String("node_id", cm.localNodeID))
+			} else {
+				// Follower: join the elected leader's raft address
+				for _, np := range allNodePorts {
+					if np.NodeID == electedLeaderID {
+						joinAddrs = append(joinAddrs, fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort))
+						break
+					}
+				}
+				cm.logger.Info("Deterministic leader election: this node is a follower",
+					zap.String("namespace", namespaceName),
+					zap.String("node_id", cm.localNodeID),
+					zap.String("leader_id", electedLeaderID),
+					zap.Strings("join_addrs", joinAddrs))
+			}
+		}
+
+		rqliteCfg := rqlite.InstanceConfig{
+			Namespace:      namespaceName,
+			NodeID:         cm.localNodeID,
+			HTTPPort:       pb.RQLiteHTTPPort,
+			RaftPort:       pb.RQLiteRaftPort,
+			HTTPAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteHTTPPort),
+			RaftAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteRaftPort),
+			JoinAddresses:  joinAddrs,
+			IsLeader:       isLeader,
+		}
+
+		if _, err := cm.rqliteSpawner.SpawnInstance(ctx, rqliteCfg); err != nil {
+			cm.logger.Error("Failed to restore RQLite", zap.String("namespace", namespaceName), zap.Error(err))
+		} else {
+			cm.logger.Info("Restored RQLite instance", zap.String("namespace", namespaceName), zap.Int("port", pb.RQLiteHTTPPort))
+		}
+	} else {
+		cm.logger.Info("RQLite already running", zap.String("namespace", namespaceName), zap.Int("port", pb.RQLiteHTTPPort))
+	}
+
+	// 2. Restore Olric
+	// NOTE(review): the probe dials localhost while the instance is configured
+	// with BindAddr localIP — confirm the memberlist port is reachable on
+	// loopback, otherwise this check always reports "not running".
+	olricRunning := false
+	conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", pb.OlricMemberlistPort), 2*time.Second)
+	if err == nil {
+		conn.Close()
+		olricRunning = true
+	}
+
+	if !olricRunning {
+		// Every other node's memberlist address is a peer of this one.
+		var peers []string
+		for _, np := range allNodePorts {
+			if np.NodeID != cm.localNodeID {
+				peers = append(peers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricMemberlistPort))
+			}
+		}
+
+		olricCfg := olric.InstanceConfig{
+			Namespace:      namespaceName,
+			NodeID:         cm.localNodeID,
+			HTTPPort:       pb.OlricHTTPPort,
+			MemberlistPort: pb.OlricMemberlistPort,
+			BindAddr:       localIP,
+			AdvertiseAddr:  localIP,
+			PeerAddresses:  peers,
+		}
+
+		if _, err := cm.olricSpawner.SpawnInstance(ctx, olricCfg); err != nil {
+			cm.logger.Error("Failed to restore Olric", zap.String("namespace", namespaceName), zap.Error(err))
+		} else {
+			cm.logger.Info("Restored Olric instance", zap.String("namespace", namespaceName), zap.Int("port", pb.OlricHTTPPort))
+		}
+	} else {
+		cm.logger.Info("Olric already running", zap.String("namespace", namespaceName), zap.Int("port", pb.OlricMemberlistPort))
+	}
+
+	// 3. Restore Gateway
+	// Check if any cluster node has the gateway role (gateway may have been skipped during provisioning)
+	hasGateway := false
+	for _, node := range allNodes {
+		if node.Role == NodeRoleGateway {
+			hasGateway = true
+			break
+		}
+	}
+
+	if hasGateway {
+		// Probe the health endpoint with a bounded, cancellable request.
+		// http.Get uses the default client, which has no timeout and ignores
+		// ctx, so a port that accepts but never answers would hang the restore.
+		// Any HTTP response (whatever its status) counts as "running".
+		gwRunning := false
+		healthURL := fmt.Sprintf("http://localhost:%d/v1/health", pb.GatewayHTTPPort)
+		healthReq, err := http.NewRequestWithContext(ctx, http.MethodGet, healthURL, nil)
+		if err == nil {
+			healthClient := &http.Client{Timeout: 2 * time.Second}
+			if resp, err := healthClient.Do(healthReq); err == nil {
+				resp.Body.Close()
+				gwRunning = true
+			}
+		}
+
+		if !gwRunning {
+			// Build olric server addresses (the local instance is addressed via localhost)
+			olricServers := make([]string, 0, len(allNodePorts))
+			for _, np := range allNodePorts {
+				if np.NodeID == cm.localNodeID {
+					olricServers = append(olricServers, fmt.Sprintf("localhost:%d", np.OlricHTTPPort))
+				} else {
+					olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort))
+				}
+			}
+
+			gwCfg := gateway.InstanceConfig{
+				Namespace:    namespaceName,
+				NodeID:       cm.localNodeID,
+				HTTPPort:     pb.GatewayHTTPPort,
+				BaseDomain:   cm.baseDomain,
+				RQLiteDSN:    fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
+				OlricServers: olricServers,
+			}
+
+			if _, err := cm.gatewaySpawner.SpawnInstance(ctx, gwCfg); err != nil {
+				cm.logger.Error("Failed to restore Gateway", zap.String("namespace", namespaceName), zap.Error(err))
+			} else {
+				cm.logger.Info("Restored Gateway instance", zap.String("namespace", namespaceName), zap.Int("port", pb.GatewayHTTPPort))
+			}
+		} else {
+			cm.logger.Info("Gateway already running", zap.String("namespace", namespaceName), zap.Int("port", pb.GatewayHTTPPort))
+		}
+	}
+
+	return nil
+}
+
 // GetClusterStatusByID returns the full status of a cluster by ID.
 // This method is part of the ClusterProvisioner interface used by the gateway.
 // It returns a generic struct that matches the interface definition in auth/handlers.go.
diff --git a/pkg/node/gateway.go b/pkg/node/gateway.go index d4abe1a..0cb3b31 100644 --- a/pkg/node/gateway.go +++ b/pkg/node/gateway.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "path/filepath" + "time" "github.com/DeBrosOfficial/network/pkg/gateway" namespacehandlers "github.com/DeBrosOfficial/network/pkg/gateway/handlers/namespace" @@ -91,6 +92,15 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { n.logger.ComponentInfo(logging.ComponentNode, "Namespace cluster provisioning enabled", zap.String("base_domain", clusterCfg.BaseDomain), zap.String("base_data_dir", baseDataDir)) + + // Restore previously-running namespace cluster processes in background + go func() { + // Wait for main RQLite to be ready before querying cluster assignments + time.Sleep(10 * time.Second) + if err := clusterManager.RestoreLocalClusters(ctx); err != nil { + n.logger.ComponentError(logging.ComponentNode, "Failed to restore namespace clusters", zap.Error(err)) + } + }() } go func() { diff --git a/pkg/rqlite/instance_spawner.go b/pkg/rqlite/instance_spawner.go index 1f63547..f34348e 100644 --- a/pkg/rqlite/instance_spawner.go +++ b/pkg/rqlite/instance_spawner.go @@ -2,6 +2,7 @@ package rqlite import ( "context" + "encoding/json" "fmt" "net/http" "os" @@ -12,6 +13,13 @@ import ( "go.uber.org/zap" ) +// RaftPeer represents a peer entry in RQLite's peers.json recovery file +type RaftPeer struct { + ID string `json:"id"` + Address string `json:"address"` + NonVoter bool `json:"non_voter"` +} + // InstanceConfig contains configuration for spawning a RQLite instance type InstanceConfig struct { Namespace string // Namespace this instance belongs to @@ -80,6 +88,9 @@ func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig for _, addr := range cfg.JoinAddresses { args = append(args, "-join", addr) } + // Retry joining for up to 5 minutes (default is 5 attempts / 3s = 15s which is too short + // when all namespace nodes restart simultaneously and the leader 
isn't ready yet) + args = append(args, "-join-attempts", "30", "-join-interval", "10s") } // Data directory must be the last argument @@ -139,7 +150,9 @@ func (is *InstanceSpawner) waitForReady(ctx context.Context, httpPort int) error url := fmt.Sprintf("http://localhost:%d/status", httpPort) client := &http.Client{Timeout: 2 * time.Second} - deadline := time.Now().Add(30 * time.Second) + // 6 minutes: must exceed the join retry window (30 attempts * 10s = 5min) + // so we don't kill followers that are still waiting for the leader + deadline := time.Now().Add(6 * time.Minute) for time.Now().Before(deadline) { select { case <-ctx.Done(): @@ -237,6 +250,42 @@ func (is *InstanceSpawner) IsInstanceRunning(httpPort int) bool { return resp.StatusCode == http.StatusOK } +// HasExistingData checks if a RQLite instance has existing data (raft.db indicates prior startup) +func (is *InstanceSpawner) HasExistingData(namespace, nodeID string) bool { + dataDir := is.GetDataDir(namespace, nodeID) + if _, err := os.Stat(filepath.Join(dataDir, "raft.db")); err == nil { + return true + } + return false +} + +// WritePeersJSON writes a peers.json recovery file into the Raft directory. +// This is RQLite's official mechanism for recovering a cluster when all nodes are down. +// On startup, rqlited reads this file, overwrites the Raft peer configuration, +// and renames it to peers.info after recovery. 
+//
+// The file is written under a temporary name and renamed into place so that
+// rqlited never observes a partially written peers.json. An empty peer list is
+// rejected, because it would be marshalled as "null" rather than a voter list.
+func (is *InstanceSpawner) WritePeersJSON(dataDir string, peers []RaftPeer) error {
+	if len(peers) == 0 {
+		return fmt.Errorf("failed to write peers.json: no peers provided")
+	}
+
+	raftDir := filepath.Join(dataDir, "raft")
+	if err := os.MkdirAll(raftDir, 0755); err != nil {
+		return fmt.Errorf("failed to create raft directory: %w", err)
+	}
+
+	data, err := json.MarshalIndent(peers, "", " ")
+	if err != nil {
+		return fmt.Errorf("failed to marshal peers.json: %w", err)
+	}
+
+	// Write to a sibling temp file first; the rename below replaces the
+	// destination in one step, so a crash mid-write cannot leave a truncated file.
+	peersPath := filepath.Join(raftDir, "peers.json")
+	tmpPath := peersPath + ".tmp"
+	if err := os.WriteFile(tmpPath, data, 0644); err != nil {
+		return fmt.Errorf("failed to write peers.json: %w", err)
+	}
+	if err := os.Rename(tmpPath, peersPath); err != nil {
+		// Best-effort cleanup; the rename error is the one worth reporting.
+		_ = os.Remove(tmpPath)
+		return fmt.Errorf("failed to write peers.json: %w", err)
+	}
+
+	is.logger.Info("Wrote peers.json for cluster recovery",
+		zap.String("path", peersPath),
+		zap.Int("peer_count", len(peers)),
+	)
+	return nil
+}
+
 // GetDataDir returns the data directory path for a namespace RQLite instance
 func (is *InstanceSpawner) GetDataDir(namespace, nodeID string) string {
 	return filepath.Join(is.baseDataDir, namespace, "rqlite", nodeID)