package rqlite

import (
	"encoding/json"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/DeBrosOfficial/network/pkg/discovery"
	"go.uber.org/zap"
)

// MaxDefaultVoters is the maximum number of voter nodes in the default cluster.
// Additional nodes join as non-voters (read replicas). Voter election is
// deterministic: all peers are sorted by the IP component of their raft
// address, and the first MaxDefaultVoters are voters.
const MaxDefaultVoters = 5
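
// Illustrative example (all addresses hypothetical): with seven peers at
// 10.0.0.1:7001 through 10.0.0.7:7001, every node sorts numerically and
// selects 10.0.0.1-10.0.0.5 as voters, while 10.0.0.6 and 10.0.0.7 join as
// non-voter read replicas.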

// collectPeerMetadata collects RQLite metadata from LibP2P peers.
func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeMetadata {
	connectedPeers := c.host.Network().Peers()
	var metadata []*discovery.RQLiteNodeMetadata

	c.mu.RLock()
	currentRaftAddr := c.raftAddress
	currentHTTPAddr := c.httpAddress
	c.mu.RUnlock()

	// Add ourselves.
	ourMetadata := &discovery.RQLiteNodeMetadata{
		NodeID:         currentRaftAddr, // RQLite uses the raft address as the node ID
		RaftAddress:    currentRaftAddr,
		HTTPAddress:    currentHTTPAddr,
		NodeType:       c.nodeType,
		RaftLogIndex:   c.rqliteManager.getRaftLogIndex(),
		LastSeen:       time.Now(),
		ClusterVersion: "1.0",
		PeerID:         c.host.ID().String(),
		WireGuardIP:    c.wireGuardIP,
	}

	// Populate lifecycle state.
	if c.lifecycle != nil {
		state, ttl := c.lifecycle.Snapshot()
		ourMetadata.LifecycleState = string(state)
		if state == "maintenance" {
			ourMetadata.MaintenanceTTL = ttl
		}
	}

	if c.adjustSelfAdvertisedAddresses(ourMetadata) {
		c.logger.Debug("Adjusted self-advertised RQLite addresses",
			zap.String("raft_address", ourMetadata.RaftAddress),
			zap.String("http_address", ourMetadata.HTTPAddress))
	}

	metadata = append(metadata, ourMetadata)

	staleNodeIDs := make([]string, 0)
	for _, peerID := range connectedPeers {
		if val, err := c.host.Peerstore().Get(peerID, "rqlite_metadata"); err == nil {
			if jsonData, ok := val.([]byte); ok {
				var peerMeta discovery.RQLiteNodeMetadata
				if err := json.Unmarshal(jsonData, &peerMeta); err == nil {
					if updated, stale := c.adjustPeerAdvertisedAddresses(peerID, &peerMeta); updated && stale != "" {
						staleNodeIDs = append(staleNodeIDs, stale)
					}
					peerMeta.LastSeen = time.Now()
					metadata = append(metadata, &peerMeta)
				}
			}
		}
	}

	if len(staleNodeIDs) > 0 {
		c.mu.Lock()
		for _, id := range staleNodeIDs {
			delete(c.knownPeers, id)
			delete(c.peerHealth, id)
		}
		c.mu.Unlock()
	}

	return metadata
}

type membershipUpdateResult struct {
	peersJSON []map[string]interface{}
	added     []string
	updated   []string
	changed   bool
}

func (c *ClusterDiscoveryService) updateClusterMembership() {
	metadata := c.collectPeerMetadata()

	c.mu.Lock()
	result := c.computeMembershipChangesLocked(metadata)
	c.mu.Unlock()

	if !result.changed {
		return
	}

	if len(result.added) > 0 || len(result.updated) > 0 {
		c.logger.Info("Membership changed",
			zap.Int("added_count", len(result.added)),
			zap.Int("updated_count", len(result.updated)),
			zap.Strings("added", result.added),
			zap.Strings("updated", result.updated))
	}

	if err := c.writePeersJSONWithData(result.peersJSON); err != nil {
		c.logger.Error("Failed to write peers.json",
			zap.Error(err),
			zap.String("data_dir", c.dataDir),
			zap.Int("peers", len(result.peersJSON)))
	} else {
		c.logger.Debug("peers.json updated", zap.Int("peers", len(result.peersJSON)))
	}

	c.mu.Lock()
	c.lastUpdate = time.Now()
	c.mu.Unlock()
}

func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*discovery.RQLiteNodeMetadata) membershipUpdateResult {
	added := []string{}
	updated := []string{}

	for _, meta := range metadata {
		isSelf := meta.NodeID == c.raftAddress

		if existing, ok := c.knownPeers[meta.NodeID]; ok {
			if existing.RaftLogIndex != meta.RaftLogIndex ||
				existing.HTTPAddress != meta.HTTPAddress ||
				existing.RaftAddress != meta.RaftAddress {
				updated = append(updated, meta.NodeID)
			}
		} else {
			added = append(added, meta.NodeID)
			c.logger.Info("Node added",
				zap.String("node", meta.NodeID),
				zap.String("raft", meta.RaftAddress),
				zap.String("type", meta.NodeType),
				zap.Uint64("log_index", meta.RaftLogIndex))
		}

		c.knownPeers[meta.NodeID] = meta

		if !isSelf {
			if _, ok := c.peerHealth[meta.NodeID]; !ok {
				c.peerHealth[meta.NodeID] = &PeerHealth{
					LastSeen:       time.Now(),
					LastSuccessful: time.Now(),
					Status:         "active",
				}
			} else {
				c.peerHealth[meta.NodeID].LastSeen = time.Now()
				c.peerHealth[meta.NodeID].Status = "active"
				c.peerHealth[meta.NodeID].FailureCount = 0
			}
		}
	}

	remotePeerCount := 0
	for _, peer := range c.knownPeers {
		if peer.NodeID != c.raftAddress {
			remotePeerCount++
		}
	}

	peers := c.getPeersJSONUnlocked()

	shouldWrite := len(added) > 0 || len(updated) > 0 || c.lastUpdate.IsZero()
	if !shouldWrite {
		return membershipUpdateResult{changed: false}
	}

	if c.lastUpdate.IsZero() {
		requiredRemotePeers := c.minClusterSize - 1
		if remotePeerCount < requiredRemotePeers {
			c.logger.Info("Waiting for peers",
				zap.Int("have", remotePeerCount),
				zap.Int("need", requiredRemotePeers),
				zap.Int("min_size", c.minClusterSize))
			return membershipUpdateResult{changed: false}
		}
	}

	if len(peers) == 0 && c.lastUpdate.IsZero() {
		c.logger.Info("No remote peers - waiting")
		return membershipUpdateResult{changed: false}
	}

	if c.lastUpdate.IsZero() {
		c.logger.Info("Initial sync",
			zap.Int("total", len(c.knownPeers)),
			zap.Int("remote", remotePeerCount),
			zap.Int("in_json", len(peers)))
	}

	return membershipUpdateResult{
		peersJSON: peers,
		added:     added,
		updated:   updated,
		changed:   true,
	}
}

func (c *ClusterDiscoveryService) removeInactivePeers() {
	c.mu.Lock()
	defer c.mu.Unlock()

	now := time.Now()
	removed := []string{}

	for nodeID, health := range c.peerHealth {
		inactiveDuration := now.Sub(health.LastSeen)
		if inactiveDuration > c.inactivityLimit {
			c.logger.Warn("Node removed",
				zap.String("node", nodeID),
				zap.String("reason", "inactive"),
				zap.Duration("inactive_duration", inactiveDuration))
			delete(c.knownPeers, nodeID)
			delete(c.peerHealth, nodeID)
			removed = append(removed, nodeID)
		}
	}

	if len(removed) > 0 {
		c.logger.Info("Removed inactive",
			zap.Int("count", len(removed)),
			zap.Strings("nodes", removed))
		// c.mu is already held here, so calling writePeersJSON (which takes
		// the read lock) would self-deadlock; build the snapshot with the
		// unlocked variant and write it directly.
		if err := c.writePeersJSONWithData(c.getPeersJSONUnlocked()); err != nil {
			c.logger.Error("Failed to write peers.json after cleanup", zap.Error(err))
		}
	}
}

func (c *ClusterDiscoveryService) getPeersJSON() []map[string]interface{} {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.getPeersJSONUnlocked()
}

func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{} {
	// Collect all raft addresses.
	raftAddrs := make([]string, 0, len(c.knownPeers))
	for _, peer := range c.knownPeers {
		raftAddrs = append(raftAddrs, peer.RaftAddress)
	}

	// Determine the voter set.
	voterSet := computeVoterSet(raftAddrs, MaxDefaultVoters)

	peers := make([]map[string]interface{}, 0, len(c.knownPeers))
	for _, peer := range c.knownPeers {
		_, isVoter := voterSet[peer.RaftAddress]
		peerEntry := map[string]interface{}{
			"id":        peer.RaftAddress,
			"address":   peer.RaftAddress,
			"non_voter": !isVoter,
		}
		peers = append(peers, peerEntry)
	}

	return peers
}

// computeVoterSet returns the set of raft addresses that should be voters.
// It sorts addresses by their numeric IP and selects the first maxVoters.
// This is deterministic: all nodes compute the same voter set from the same
// peer list.
func computeVoterSet(raftAddrs []string, maxVoters int) map[string]struct{} {
	sorted := make([]string, len(raftAddrs))
	copy(sorted, raftAddrs)
	sort.Slice(sorted, func(i, j int) bool {
		ipI := extractIPForSort(sorted[i])
		ipJ := extractIPForSort(sorted[j])
		return compareIPs(ipI, ipJ)
	})

	voters := make(map[string]struct{})
	for i, addr := range sorted {
		if i >= maxVoters {
			break
		}
		voters[addr] = struct{}{}
	}
	return voters
}
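
// voterSetExample is an illustrative sketch (unused; addresses hypothetical)
// of why the voter sort must compare IPs numerically: "10.0.0.10" sorts
// before "10.0.0.2" as a string, but 10.0.0.2 is the lower address and must
// win the second voter slot.
func voterSetExample() {
	addrs := []string{"10.0.0.10:7001", "10.0.0.2:7001", "10.0.0.1:7001"}
	voters := computeVoterSet(addrs, 2)

	// Prints voter=true for 10.0.0.1:7001 and 10.0.0.2:7001; 10.0.0.10:7001
	// stays a non-voter even though it would beat 10.0.0.2 alphabetically.
	for _, addr := range addrs {
		_, isVoter := voters[addr]
		fmt.Printf("%s voter=%v\n", addr, isVoter)
	}
}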

// extractIPForSort extracts the host (IP) component from a raft address
// (host:port) for sorting.
func extractIPForSort(raftAddr string) string {
	host, _, err := net.SplitHostPort(raftAddr)
	if err != nil {
		return raftAddr
	}
	return host
}

// compareIPs compares two IP strings numerically (not alphabetically).
// Alphabetical sort gives wrong results: "10.0.0.10" < "10.0.0.2"
// alphabetically, but numerically 10.0.0.2 < 10.0.0.10. This was causing the
// wrong nodes to be selected as voters (e.g., 10.0.0.1, 10.0.0.10, 10.0.0.11
// instead of 10.0.0.1-10.0.0.5).
func compareIPs(a, b string) bool {
	ipA := net.ParseIP(a)
	ipB := net.ParseIP(b)

	// Fall back to string comparison if parsing fails.
	if ipA == nil || ipB == nil {
		return a < b
	}

	// Normalize to the 16-byte representation for consistent comparison.
	ipA = ipA.To16()
	ipB = ipB.To16()

	for i := range ipA {
		if ipA[i] != ipB[i] {
			return ipA[i] < ipB[i]
		}
	}
	return false
}

// IsVoterLocked reports whether the given raft address is in the voter set
// based on the current known peers. Must be called with c.mu held.
func (c *ClusterDiscoveryService) IsVoterLocked(raftAddress string) bool {
	// If we don't know enough peers yet, default to voter. Non-voter
	// demotion only kicks in once we see more than MaxDefaultVoters peers.
	if len(c.knownPeers) <= MaxDefaultVoters {
		return true
	}

	raftAddrs := make([]string, 0, len(c.knownPeers))
	for _, peer := range c.knownPeers {
		raftAddrs = append(raftAddrs, peer.RaftAddress)
	}

	voterSet := computeVoterSet(raftAddrs, MaxDefaultVoters)
	_, isVoter := voterSet[raftAddress]
	return isVoter
}

func (c *ClusterDiscoveryService) writePeersJSON() error {
	c.mu.RLock()
	peers := c.getPeersJSONUnlocked()
	c.mu.RUnlock()
	return c.writePeersJSONWithData(peers)
}
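
// Both writers below emit the same JSON shape, an array of peer entries
// built by getPeersJSONUnlocked (addresses illustrative):
//
//	[
//	  {"id": "10.0.0.1:7001", "address": "10.0.0.1:7001", "non_voter": false},
//	  {"id": "10.0.0.6:7001", "address": "10.0.0.6:7001", "non_voter": true}
//	]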

// writePeersJSONWithData writes the discovery peers file to a SAFE location
// outside the raft directory. This is critical: rqlite v8 treats any
// peers.json inside the raft/ directory as a recovery signal and RESETS the
// Raft configuration on startup. Writing there on every periodic sync caused
// split-brain on every node restart.
//
// Safe location:      <dataDir>/rqlite/discovery-peers.json
// Dangerous location: <dataDir>/rqlite/raft/peers.json (only for explicit recovery)
func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]interface{}) error {
	dataDir := os.ExpandEnv(c.dataDir)
	if strings.HasPrefix(dataDir, "~") {
		home, err := os.UserHomeDir()
		if err != nil {
			return fmt.Errorf("failed to determine home directory: %w", err)
		}
		dataDir = filepath.Join(home, dataDir[1:])
	}

	// Write to rqlite/, NOT inside the raft/ subdirectory: rqlite v8
	// auto-recovers from raft/peers.json on every startup, which resets the
	// Raft config and causes split-brain.
	rqliteDir := filepath.Join(dataDir, "rqlite")
	if err := os.MkdirAll(rqliteDir, 0755); err != nil {
		return fmt.Errorf("failed to create rqlite directory %s: %w", rqliteDir, err)
	}

	peersFile := filepath.Join(rqliteDir, "discovery-peers.json")
	data, err := json.MarshalIndent(peers, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal discovery-peers.json: %w", err)
	}

	// Write atomically: write a temp file, then rename it over the target.
	tempFile := peersFile + ".tmp"
	if err := os.WriteFile(tempFile, data, 0644); err != nil {
		return fmt.Errorf("failed to write temp discovery-peers.json %s: %w", tempFile, err)
	}
	if err := os.Rename(tempFile, peersFile); err != nil {
		return fmt.Errorf("failed to rename %s to %s: %w", tempFile, peersFile, err)
	}

	nodeIDs := make([]string, 0, len(peers))
	for _, p := range peers {
		if id, ok := p["id"].(string); ok {
			nodeIDs = append(nodeIDs, id)
		}
	}
	c.logger.Debug("discovery-peers.json written",
		zap.Int("peers", len(peers)),
		zap.Strings("nodes", nodeIDs))
	return nil
}

// writeRecoveryPeersJSON writes peers.json to the raft directory for
// INTENTIONAL cluster recovery only. rqlite v8 will read this file on
// startup and reset the Raft configuration accordingly. Only call this
// when you explicitly want to trigger Raft recovery.
func (c *ClusterDiscoveryService) writeRecoveryPeersJSON(peers []map[string]interface{}) error {
	dataDir := os.ExpandEnv(c.dataDir)
	if strings.HasPrefix(dataDir, "~") {
		home, err := os.UserHomeDir()
		if err != nil {
			return fmt.Errorf("failed to determine home directory: %w", err)
		}
		dataDir = filepath.Join(home, dataDir[1:])
	}

	raftDir := filepath.Join(dataDir, "rqlite", "raft")
	if err := os.MkdirAll(raftDir, 0755); err != nil {
		return fmt.Errorf("failed to create raft directory %s: %w", raftDir, err)
	}

	peersFile := filepath.Join(raftDir, "peers.json")
	data, err := json.MarshalIndent(peers, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal recovery peers.json: %w", err)
	}

	tempFile := peersFile + ".tmp"
	if err := os.WriteFile(tempFile, data, 0644); err != nil {
		return fmt.Errorf("failed to write temp recovery peers.json %s: %w", tempFile, err)
	}
	if err := os.Rename(tempFile, peersFile); err != nil {
		return fmt.Errorf("failed to rename %s to %s: %w", tempFile, peersFile, err)
	}

	nodeIDs := make([]string, 0, len(peers))
	for _, p := range peers {
		if id, ok := p["id"].(string); ok {
			nodeIDs = append(nodeIDs, id)
		}
	}
	c.logger.Warn("RECOVERY peers.json written to raft directory; rqlited will reset its Raft config on next startup",
		zap.Int("peers", len(peers)),
		zap.Strings("nodes", nodeIDs))
	return nil
}
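
// recoverClusterExample is an illustrative sketch (not called anywhere in
// this file) of how an explicit, operator-driven recovery could combine the
// two paths: snapshot the current membership, then write it into the raft
// directory so rqlited resets its Raft configuration on the next startup.
func (c *ClusterDiscoveryService) recoverClusterExample() error {
	peers := c.getPeersJSON() // takes c.mu.RLock internally
	if len(peers) == 0 {
		return fmt.Errorf("refusing recovery with an empty peer set")
	}
	return c.writeRecoveryPeersJSON(peers)
}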