// orama/pkg/rqlite/cluster_discovery_membership.go
package rqlite
import (
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/DeBrosOfficial/network/pkg/discovery"
"go.uber.org/zap"
)
// MaxDefaultVoters is the maximum number of voter nodes in the default cluster.
// Additional nodes join as non-voters (read replicas). Voter selection is
// deterministic: all peers are sorted by the IP component of their raft
// address, and the first MaxDefaultVoters become voters.
const MaxDefaultVoters = 5
// collectPeerMetadata gathers RQLite node metadata for this node and for all
// connected LibP2P peers that have published metadata to the peerstore.
func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeMetadata {
connectedPeers := c.host.Network().Peers()
var metadata []*discovery.RQLiteNodeMetadata
c.mu.RLock()
currentRaftAddr := c.raftAddress
currentHTTPAddr := c.httpAddress
c.mu.RUnlock()
// Add ourselves
ourMetadata := &discovery.RQLiteNodeMetadata{
NodeID: currentRaftAddr, // RQLite uses raft address as node ID
RaftAddress: currentRaftAddr,
HTTPAddress: currentHTTPAddr,
NodeType: c.nodeType,
RaftLogIndex: c.rqliteManager.getRaftLogIndex(),
LastSeen: time.Now(),
ClusterVersion: "1.0",
}
if c.adjustSelfAdvertisedAddresses(ourMetadata) {
c.logger.Debug("Adjusted self-advertised RQLite addresses",
zap.String("raft_address", ourMetadata.RaftAddress),
zap.String("http_address", ourMetadata.HTTPAddress))
}
metadata = append(metadata, ourMetadata)
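	// Gather metadata that connected peers have published to the peerstore under
	// the "rqlite_metadata" key, noting any node IDs whose advertised addresses
	// turned out to be stale.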
staleNodeIDs := make([]string, 0)
for _, peerID := range connectedPeers {
if val, err := c.host.Peerstore().Get(peerID, "rqlite_metadata"); err == nil {
if jsonData, ok := val.([]byte); ok {
var peerMeta discovery.RQLiteNodeMetadata
if err := json.Unmarshal(jsonData, &peerMeta); err == nil {
if updated, stale := c.adjustPeerAdvertisedAddresses(peerID, &peerMeta); updated && stale != "" {
staleNodeIDs = append(staleNodeIDs, stale)
}
peerMeta.LastSeen = time.Now()
metadata = append(metadata, &peerMeta)
}
}
}
}
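	// Purge peers whose advertised addresses were detected as stale.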
if len(staleNodeIDs) > 0 {
c.mu.Lock()
for _, id := range staleNodeIDs {
delete(c.knownPeers, id)
delete(c.peerHealth, id)
}
c.mu.Unlock()
}
return metadata
}
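
// membershipUpdateResult describes the outcome of a membership computation:
// the peers.json entries to write, the node IDs that were added or updated,
// and whether anything changed at all.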
type membershipUpdateResult struct {
peersJSON []map[string]interface{}
added []string
updated []string
changed bool
}
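
// updateClusterMembership collects fresh peer metadata, folds it into the
// known-peer state, and rewrites peers.json whenever membership has changed.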
func (c *ClusterDiscoveryService) updateClusterMembership() {
metadata := c.collectPeerMetadata()
c.mu.Lock()
result := c.computeMembershipChangesLocked(metadata)
c.mu.Unlock()
if result.changed {
if len(result.added) > 0 || len(result.updated) > 0 {
c.logger.Info("Membership changed",
zap.Int("added", len(result.added)),
zap.Int("updated", len(result.updated)),
zap.Strings("added", result.added),
zap.Strings("updated", result.updated))
}
if err := c.writePeersJSONWithData(result.peersJSON); err != nil {
c.logger.Error("Failed to write peers.json",
zap.Error(err),
zap.String("data_dir", c.dataDir),
zap.Int("peers", len(result.peersJSON)))
} else {
c.logger.Debug("peers.json updated",
zap.Int("peers", len(result.peersJSON)))
}
c.mu.Lock()
c.lastUpdate = time.Now()
c.mu.Unlock()
}
}
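
// computeMembershipChangesLocked merges the collected metadata into
// c.knownPeers and c.peerHealth and decides whether peers.json needs to be
// rewritten. Must be called with c.mu held.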
func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*discovery.RQLiteNodeMetadata) membershipUpdateResult {
added := []string{}
updated := []string{}
for _, meta := range metadata {
isSelf := meta.NodeID == c.raftAddress
if existing, ok := c.knownPeers[meta.NodeID]; ok {
if existing.RaftLogIndex != meta.RaftLogIndex ||
existing.HTTPAddress != meta.HTTPAddress ||
existing.RaftAddress != meta.RaftAddress {
updated = append(updated, meta.NodeID)
}
} else {
added = append(added, meta.NodeID)
c.logger.Info("Node added",
zap.String("node", meta.NodeID),
zap.String("raft", meta.RaftAddress),
zap.String("type", meta.NodeType),
zap.Uint64("log_index", meta.RaftLogIndex))
}
c.knownPeers[meta.NodeID] = meta
if !isSelf {
if _, ok := c.peerHealth[meta.NodeID]; !ok {
c.peerHealth[meta.NodeID] = &PeerHealth{
LastSeen: time.Now(),
LastSuccessful: time.Now(),
Status: "active",
}
} else {
c.peerHealth[meta.NodeID].LastSeen = time.Now()
c.peerHealth[meta.NodeID].Status = "active"
c.peerHealth[meta.NodeID].FailureCount = 0
}
}
}
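	// Count remote peers (everyone except ourselves); the initial peers.json
	// write is held back until the cluster reaches its configured minimum size.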
remotePeerCount := 0
for _, peer := range c.knownPeers {
if peer.NodeID != c.raftAddress {
remotePeerCount++
}
}
peers := c.getPeersJSONUnlocked()
shouldWrite := len(added) > 0 || len(updated) > 0 || c.lastUpdate.IsZero()
if shouldWrite {
if c.lastUpdate.IsZero() {
requiredRemotePeers := c.minClusterSize - 1
if remotePeerCount < requiredRemotePeers {
c.logger.Info("Waiting for peers",
zap.Int("have", remotePeerCount),
zap.Int("need", requiredRemotePeers),
zap.Int("min_size", c.minClusterSize))
return membershipUpdateResult{
changed: false,
}
}
}
if len(peers) == 0 && c.lastUpdate.IsZero() {
c.logger.Info("No remote peers - waiting")
return membershipUpdateResult{
changed: false,
}
}
if c.lastUpdate.IsZero() {
c.logger.Info("Initial sync",
zap.Int("total", len(c.knownPeers)),
zap.Int("remote", remotePeerCount),
zap.Int("in_json", len(peers)))
}
return membershipUpdateResult{
peersJSON: peers,
added: added,
updated: updated,
changed: true,
}
}
return membershipUpdateResult{
changed: false,
}
}
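
// removeInactivePeers drops peers that have not been seen within the
// inactivity limit and rewrites peers.json if anything was removed.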
func (c *ClusterDiscoveryService) removeInactivePeers() {
c.mu.Lock()
defer c.mu.Unlock()
now := time.Now()
removed := []string{}
for nodeID, health := range c.peerHealth {
inactiveDuration := now.Sub(health.LastSeen)
if inactiveDuration > c.inactivityLimit {
c.logger.Warn("Node removed",
zap.String("node", nodeID),
zap.String("reason", "inactive"),
zap.Duration("inactive_duration", inactiveDuration))
delete(c.knownPeers, nodeID)
delete(c.peerHealth, nodeID)
removed = append(removed, nodeID)
}
}
if len(removed) > 0 {
c.logger.Info("Removed inactive",
zap.Int("count", len(removed)),
zap.Strings("nodes", removed))
if err := c.writePeersJSON(); err != nil {
c.logger.Error("Failed to write peers.json after cleanup", zap.Error(err))
}
}
}
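
// getPeersJSON returns the peers.json entries under a read lock.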
func (c *ClusterDiscoveryService) getPeersJSON() []map[string]interface{} {
c.mu.RLock()
defer c.mu.RUnlock()
return c.getPeersJSONUnlocked()
}
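
// getPeersJSONUnlocked builds one peers.json entry per known peer, marking
// every peer outside the computed voter set as a non-voter. Callers must hold
// c.mu (read or write). A single entry marshals to, e.g. (hypothetical address):
//
//	{"id": "10.0.0.1:7001", "address": "10.0.0.1:7001", "non_voter": false}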
func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{} {
// Collect all raft addresses
raftAddrs := make([]string, 0, len(c.knownPeers))
for _, peer := range c.knownPeers {
raftAddrs = append(raftAddrs, peer.RaftAddress)
}
// Determine voter set
voterSet := computeVoterSet(raftAddrs, MaxDefaultVoters)
peers := make([]map[string]interface{}, 0, len(c.knownPeers))
for _, peer := range c.knownPeers {
_, isVoter := voterSet[peer.RaftAddress]
peerEntry := map[string]interface{}{
"id": peer.RaftAddress,
"address": peer.RaftAddress,
"non_voter": !isVoter,
}
peers = append(peers, peerEntry)
}
return peers
}
// computeVoterSet returns the set of raft addresses that should be voters.
// It sorts addresses lexicographically by their IP component and selects the
// first maxVoters. The result is deterministic: all nodes compute the same
// voter set from the same peer list.
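//
// For example, with hypothetical addresses and maxVoters = 2:
//
//	computeVoterSet([]string{"10.0.0.2:7001", "10.0.0.1:7001", "10.0.0.3:7001"}, 2)
//	// -> voters: 10.0.0.1:7001 and 10.0.0.2:7001
//
// Note that the comparison is on strings, so "10.0.0.10" would sort before
// "10.0.0.2".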
func computeVoterSet(raftAddrs []string, maxVoters int) map[string]struct{} {
sorted := make([]string, len(raftAddrs))
copy(sorted, raftAddrs)
sort.Slice(sorted, func(i, j int) bool {
ipI := extractIPForSort(sorted[i])
ipJ := extractIPForSort(sorted[j])
return ipI < ipJ
})
voters := make(map[string]struct{})
for i, addr := range sorted {
if i >= maxVoters {
break
}
voters[addr] = struct{}{}
}
return voters
}
// extractIPForSort extracts the IP string from a raft address (host:port) for sorting.
func extractIPForSort(raftAddr string) string {
host, _, err := net.SplitHostPort(raftAddr)
if err != nil {
return raftAddr
}
return host
}
// IsVoterLocked reports whether the given raft address is in the voter set
// based on the current known peers. Must be called with c.mu held.
func (c *ClusterDiscoveryService) IsVoterLocked(raftAddress string) bool {
// If we don't know enough peers yet, default to voter.
// Non-voter demotion only kicks in once we see more than MaxDefaultVoters peers.
if len(c.knownPeers) <= MaxDefaultVoters {
return true
}
raftAddrs := make([]string, 0, len(c.knownPeers))
for _, peer := range c.knownPeers {
raftAddrs = append(raftAddrs, peer.RaftAddress)
}
voterSet := computeVoterSet(raftAddrs, MaxDefaultVoters)
_, isVoter := voterSet[raftAddress]
return isVoter
}
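
// writePeersJSON snapshots the current peer entries under a read lock and
// writes them to disk.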
func (c *ClusterDiscoveryService) writePeersJSON() error {
c.mu.RLock()
peers := c.getPeersJSONUnlocked()
c.mu.RUnlock()
return c.writePeersJSONWithData(peers)
}
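
// writePeersJSONWithData writes the given entries to
// <dataDir>/rqlite/raft/peers.json, expanding environment variables and a
// leading "~" in the data directory, backing up any existing file, and
// replacing it atomically via a temp-file rename.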
func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]interface{}) error {
dataDir := os.ExpandEnv(c.dataDir)
if strings.HasPrefix(dataDir, "~") {
home, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("failed to determine home directory: %w", err)
}
dataDir = filepath.Join(home, dataDir[1:])
}
rqliteDir := filepath.Join(dataDir, "rqlite", "raft")
if err := os.MkdirAll(rqliteDir, 0755); err != nil {
return fmt.Errorf("failed to create raft directory %s: %w", rqliteDir, err)
}
peersFile := filepath.Join(rqliteDir, "peers.json")
backupFile := filepath.Join(rqliteDir, "peers.json.backup")
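	// Keep a best-effort backup of the existing peers.json; a failed backup is
	// not treated as fatal.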
if _, err := os.Stat(peersFile); err == nil {
data, err := os.ReadFile(peersFile)
if err == nil {
_ = os.WriteFile(backupFile, data, 0644)
}
}
data, err := json.MarshalIndent(peers, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal peers.json: %w", err)
}
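	// Write to a temp file and rename it into place so a reader never observes
	// a partially written peers.json.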
tempFile := peersFile + ".tmp"
if err := os.WriteFile(tempFile, data, 0644); err != nil {
return fmt.Errorf("failed to write temp peers.json %s: %w", tempFile, err)
}
if err := os.Rename(tempFile, peersFile); err != nil {
return fmt.Errorf("failed to rename %s to %s: %w", tempFile, peersFile, err)
}
nodeIDs := make([]string, 0, len(peers))
for _, p := range peers {
if id, ok := p["id"].(string); ok {
nodeIDs = append(nodeIDs, id)
}
}
c.logger.Info("peers.json written",
zap.Int("peers", len(peers)),
zap.Strings("nodes", nodeIDs))
return nil
}