From ea5ef6bc1a76edf0bc2c9ab6fe89c5bc2b0a891a Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 10:46:52 +0200 Subject: [PATCH] feat: implement dynamic cluster discovery and validation for RQLite nodes - Added ClusterDiscoveryService to manage peer discovery and synchronization for RQLite nodes. - Introduced new configuration options for cluster synchronization intervals, peer inactivity limits, and minimum cluster size. - Enhanced validation logic to ensure proper configuration of cluster parameters. - Implemented metrics collection for cluster health and peer status, improving monitoring capabilities. - Updated RQLiteManager to integrate with the new discovery service, allowing for dynamic leadership and cluster joining logic. --- pkg/config/config.go | 10 + pkg/config/validate.go | 64 +++- pkg/discovery/discovery.go | 27 +- pkg/discovery/rqlite_metadata.go | 22 ++ pkg/node/monitoring.go | 71 +++- pkg/node/node.go | 57 ++- pkg/rqlite/cluster_discovery.go | 592 +++++++++++++++++++++++++++++++ pkg/rqlite/data_safety.go | 109 ++++++ pkg/rqlite/metrics.go | 74 ++++ pkg/rqlite/rqlite.go | 160 ++++++++- pkg/rqlite/types.go | 71 ++++ 11 files changed, 1218 insertions(+), 39 deletions(-) create mode 100644 pkg/discovery/rqlite_metadata.go create mode 100644 pkg/rqlite/cluster_discovery.go create mode 100644 pkg/rqlite/data_safety.go create mode 100644 pkg/rqlite/metrics.go create mode 100644 pkg/rqlite/types.go diff --git a/pkg/config/config.go b/pkg/config/config.go index 9ca73d9..5784597 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -36,6 +36,11 @@ type DatabaseConfig struct { RQLitePort int `yaml:"rqlite_port"` // RQLite HTTP API port RQLiteRaftPort int `yaml:"rqlite_raft_port"` // RQLite Raft consensus port RQLiteJoinAddress string `yaml:"rqlite_join_address"` // Address to join RQLite cluster + + // Dynamic discovery configuration (always enabled) + ClusterSyncInterval time.Duration `yaml:"cluster_sync_interval"` // default: 30s + PeerInactivityLimit time.Duration `yaml:"peer_inactivity_limit"` // default: 24h + MinClusterSize int `yaml:"min_cluster_size"` // default: 1 } // DiscoveryConfig contains peer discovery configuration @@ -106,6 +111,11 @@ func DefaultConfig() *Config { RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: "", // Empty for bootstrap node + + // Dynamic discovery (always enabled) + ClusterSyncInterval: 30 * time.Second, + PeerInactivityLimit: 24 * time.Hour, + MinClusterSize: 1, }, Discovery: DiscoveryConfig{ BootstrapPeers: []string{}, diff --git a/pkg/config/validate.go b/pkg/config/validate.go index 045d784..b50c7d2 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" @@ -51,6 +52,15 @@ func (c *Config) validateNode() []error { var errs []error nc := c.Node + // Validate node ID (required for RQLite cluster membership) + if nc.ID == "" { + errs = append(errs, ValidationError{ + Path: "node.id", + Message: "must not be empty (required for cluster membership)", + Hint: "will be auto-generated if empty, but explicit ID recommended", + }) + } + // Validate type if nc.Type != "bootstrap" && nc.Type != "node" { errs = append(errs, ValidationError{ @@ -233,6 +243,40 @@ func (c *Config) validateDatabase() []error { } } + // Validate cluster_sync_interval + if dc.ClusterSyncInterval != 0 && dc.ClusterSyncInterval < 10*time.Second { + errs = append(errs, 
ValidationError{ + Path: "database.cluster_sync_interval", + Message: fmt.Sprintf("must be >= 10s or 0 (for default); got %v", dc.ClusterSyncInterval), + Hint: "recommended: 30s", + }) + } + + // Validate peer_inactivity_limit + if dc.PeerInactivityLimit != 0 { + if dc.PeerInactivityLimit < time.Hour { + errs = append(errs, ValidationError{ + Path: "database.peer_inactivity_limit", + Message: fmt.Sprintf("must be >= 1h or 0 (for default); got %v", dc.PeerInactivityLimit), + Hint: "recommended: 24h", + }) + } else if dc.PeerInactivityLimit > 7*24*time.Hour { + errs = append(errs, ValidationError{ + Path: "database.peer_inactivity_limit", + Message: fmt.Sprintf("must be <= 7d; got %v", dc.PeerInactivityLimit), + Hint: "recommended: 24h", + }) + } + } + + // Validate min_cluster_size + if dc.MinClusterSize < 1 { + errs = append(errs, ValidationError{ + Path: "database.min_cluster_size", + Message: fmt.Sprintf("must be >= 1; got %d", dc.MinClusterSize), + }) + } + return errs } @@ -320,8 +364,14 @@ func (c *Config) validateDiscovery() []error { seenPeers[peer] = true } - // Validate http_adv_address - if disc.HttpAdvAddress != "" { + // Validate http_adv_address (required for cluster discovery) + if disc.HttpAdvAddress == "" { + errs = append(errs, ValidationError{ + Path: "discovery.http_adv_address", + Message: "required for RQLite cluster discovery", + Hint: "set to your public HTTP address (e.g., 51.83.128.181:5001)", + }) + } else { if err := validateHostOrHostPort(disc.HttpAdvAddress); err != nil { errs = append(errs, ValidationError{ Path: "discovery.http_adv_address", @@ -331,8 +381,14 @@ func (c *Config) validateDiscovery() []error { } } - // Validate raft_adv_address - if disc.RaftAdvAddress != "" { + // Validate raft_adv_address (required for cluster discovery) + if disc.RaftAdvAddress == "" { + errs = append(errs, ValidationError{ + Path: "discovery.raft_adv_address", + Message: "required for RQLite cluster discovery", + Hint: "set to your public Raft address (e.g., 51.83.128.181:7001)", + }) + } else { if err := validateHostOrHostPort(disc.RaftAdvAddress); err != nil { errs = append(errs, ValidationError{ Path: "discovery.raft_adv_address", diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 442ebbf..19b5887 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -24,7 +24,8 @@ type PeerExchangeRequest struct { // PeerExchangeResponse represents a list of peers to exchange type PeerExchangeResponse struct { - Peers []PeerInfo `json:"peers"` + Peers []PeerInfo `json:"peers"` + RQLiteMetadata *RQLiteNodeMetadata `json:"rqlite_metadata,omitempty"` } // PeerInfo contains peer identity and addresses @@ -123,6 +124,16 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) { added++ } + // Add RQLite metadata if available + if val, err := d.host.Peerstore().Get(d.host.ID(), "rqlite_metadata"); err == nil { + if jsonData, ok := val.([]byte); ok { + var metadata RQLiteNodeMetadata + if err := json.Unmarshal(jsonData, &metadata); err == nil { + resp.RQLiteMetadata = &metadata + } + } + } + // Send response encoder := json.NewEncoder(s) if err := encoder.Encode(&resp); err != nil { @@ -131,7 +142,8 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) { } d.logger.Debug("Sent peer exchange response", - zap.Int("peer_count", len(resp.Peers))) + zap.Int("peer_count", len(resp.Peers)), + zap.Bool("has_rqlite_metadata", resp.RQLiteMetadata != nil)) } // Start begins periodic peer discovery @@ -363,6 +375,17 @@ func (d *Manager) 
requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi return nil } + // Store remote peer's RQLite metadata if available + if resp.RQLiteMetadata != nil { + metadataJSON, err := json.Marshal(resp.RQLiteMetadata) + if err == nil { + _ = d.host.Peerstore().Put(peerID, "rqlite_metadata", metadataJSON) + d.logger.Debug("Stored RQLite metadata from peer", + zap.String("peer_id", peerID.String()[:8]+"..."), + zap.String("node_id", resp.RQLiteMetadata.NodeID)) + } + } + return resp.Peers } diff --git a/pkg/discovery/rqlite_metadata.go b/pkg/discovery/rqlite_metadata.go new file mode 100644 index 0000000..65a7048 --- /dev/null +++ b/pkg/discovery/rqlite_metadata.go @@ -0,0 +1,22 @@ +package discovery + +import ( + "time" +) + +// RQLiteNodeMetadata contains RQLite-specific information announced via LibP2P +type RQLiteNodeMetadata struct { + NodeID string `json:"node_id"` // RQLite node ID (from config) + RaftAddress string `json:"raft_address"` // Raft port address (e.g., "51.83.128.181:7001") + HTTPAddress string `json:"http_address"` // HTTP API address (e.g., "51.83.128.181:5001") + NodeType string `json:"node_type"` // "bootstrap" or "node" + RaftLogIndex uint64 `json:"raft_log_index"` // Current Raft log index (for data comparison) + LastSeen time.Time `json:"last_seen"` // Updated on every announcement + ClusterVersion string `json:"cluster_version"` // For compatibility checking +} + +// PeerExchangeResponseV2 extends the original response with RQLite metadata +type PeerExchangeResponseV2 struct { + Peers []PeerInfo `json:"peers"` + RQLiteMetadata *RQLiteNodeMetadata `json:"rqlite_metadata,omitempty"` +} diff --git a/pkg/node/monitoring.go b/pkg/node/monitoring.go index 866a4fd..bf9931f 100644 --- a/pkg/node/monitoring.go +++ b/pkg/node/monitoring.go @@ -91,12 +91,13 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory } msg := struct { - PeerID string `json:"peer_id"` - PeerCount int `json:"peer_count"` - PeerIDs []string `json:"peer_ids,omitempty"` - CPU uint64 `json:"cpu_usage"` - Memory uint64 `json:"memory_usage"` - Timestamp int64 `json:"timestamp"` + PeerID string `json:"peer_id"` + PeerCount int `json:"peer_count"` + PeerIDs []string `json:"peer_ids,omitempty"` + CPU uint64 `json:"cpu_usage"` + Memory uint64 `json:"memory_usage"` + Timestamp int64 `json:"timestamp"` + ClusterHealth map[string]interface{} `json:"cluster_health,omitempty"` }{ PeerID: n.host.ID().String(), PeerCount: len(peers), @@ -106,6 +107,20 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory Timestamp: time.Now().Unix(), } + // Add cluster health metrics if available + if n.clusterDiscovery != nil { + metrics := n.clusterDiscovery.GetMetrics() + msg.ClusterHealth = map[string]interface{}{ + "cluster_size": metrics.ClusterSize, + "active_nodes": metrics.ActiveNodes, + "inactive_nodes": metrics.InactiveNodes, + "discovery_status": metrics.DiscoveryStatus, + "current_leader": metrics.CurrentLeader, + "average_peer_health": metrics.AveragePeerHealth, + "last_update": metrics.LastUpdate.Format(time.RFC3339), + } + } + data, err := json.Marshal(msg) if err != nil { return err @@ -119,6 +134,50 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory return nil } +// GetClusterHealth returns cluster health information +func (n *Node) GetClusterHealth() map[string]interface{} { + if n.clusterDiscovery == nil { + return map[string]interface{}{ + "status": "not_initialized", + } + } + + metrics := 
n.clusterDiscovery.GetMetrics() + return map[string]interface{}{ + "cluster_size": metrics.ClusterSize, + "active_nodes": metrics.ActiveNodes, + "inactive_nodes": metrics.InactiveNodes, + "discovery_status": metrics.DiscoveryStatus, + "current_leader": metrics.CurrentLeader, + "average_peer_health": metrics.AveragePeerHealth, + "last_update": metrics.LastUpdate, + } +} + +// GetDiscoveryStatus returns discovery service status +func (n *Node) GetDiscoveryStatus() map[string]interface{} { + if n.clusterDiscovery == nil { + return map[string]interface{}{ + "status": "disabled", + "message": "cluster discovery not initialized", + } + } + + metrics := n.clusterDiscovery.GetMetrics() + status := "healthy" + if metrics.DiscoveryStatus == "no_peers" { + status = "warning" + } else if metrics.DiscoveryStatus == "degraded" { + status = "degraded" + } + + return map[string]interface{}{ + "status": status, + "cluster_size": metrics.ClusterSize, + "last_update": metrics.LastUpdate, + } +} + // startConnectionMonitoring starts minimal connection monitoring for the lightweight client. // Unlike nodes which need extensive monitoring, clients only need basic health checks. func (n *Node) startConnectionMonitoring() { diff --git a/pkg/node/node.go b/pkg/node/node.go index 988ab1d..852d768 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -33,8 +33,9 @@ type Node struct { logger *logging.ColoredLogger host host.Host - rqliteManager *database.RQLiteManager - rqliteAdapter *database.RQLiteAdapter + rqliteManager *database.RQLiteManager + rqliteAdapter *database.RQLiteAdapter + clusterDiscovery *database.ClusterDiscoveryService // Peer discovery bootstrapCancel context.CancelFunc @@ -67,6 +68,41 @@ func (n *Node) startRQLite(ctx context.Context) error { // Create RQLite manager n.rqliteManager = database.NewRQLiteManager(&n.config.Database, &n.config.Discovery, n.config.Node.DataDir, n.logger.Logger) + // Initialize cluster discovery service if LibP2P host is available + if n.host != nil && n.discoveryManager != nil { + // Determine node type + nodeType := "node" + if n.config.Node.Type == "bootstrap" { + nodeType = "bootstrap" + } + + // Create cluster discovery service + n.clusterDiscovery = database.NewClusterDiscoveryService( + n.host, + n.discoveryManager, + n.rqliteManager, + n.config.Node.ID, + nodeType, + n.config.Discovery.RaftAdvAddress, + n.config.Discovery.HttpAdvAddress, + n.config.Node.DataDir, + n.logger.Logger, + ) + + // Set discovery service on RQLite manager + n.rqliteManager.SetDiscoveryService(n.clusterDiscovery) + + // Start cluster discovery + if err := n.clusterDiscovery.Start(ctx); err != nil { + return fmt.Errorf("failed to start cluster discovery: %w", err) + } + + // Update our own metadata + n.clusterDiscovery.UpdateOwnMetadata() + + n.logger.Info("Cluster discovery service started") + } + // Start RQLite if err := n.rqliteManager.Start(ctx); err != nil { return err @@ -532,6 +568,11 @@ func (n *Node) stopPeerDiscovery() { func (n *Node) Stop() error { n.logger.ComponentInfo(logging.ComponentNode, "Stopping network node") + // Stop cluster discovery + if n.clusterDiscovery != nil { + n.clusterDiscovery.Stop() + } + // Stop bootstrap reconnection loop if n.bootstrapCancel != nil { n.bootstrapCancel() @@ -577,16 +618,16 @@ func (n *Node) Start(ctx context.Context) error { return fmt.Errorf("failed to create data directory: %w", err) } - // Start RQLite - if err := n.startRQLite(ctx); err != nil { - return fmt.Errorf("failed to start RQLite: %w", err) - } - - // Start LibP2P host 
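Illustration (not part of the diff): consuming the cluster_health block that announceMetrics in pkg/node/monitoring.go attaches to the monitoring announcement above. The struct below simply mirrors the json tags added in this patch; how the payload is published and subscribed is outside this diff, and the sample values are invented, so treat this as a sketch rather than an API.

package main

import (
	"encoding/json"
	"fmt"
)

type clusterHealth struct {
	ClusterSize       int     `json:"cluster_size"`
	ActiveNodes       int     `json:"active_nodes"`
	InactiveNodes     int     `json:"inactive_nodes"`
	DiscoveryStatus   string  `json:"discovery_status"`
	CurrentLeader     string  `json:"current_leader"`
	AveragePeerHealth float64 `json:"average_peer_health"`
	LastUpdate        string  `json:"last_update"` // RFC3339, as formatted in announceMetrics
}

type metricsAnnouncement struct {
	PeerID        string         `json:"peer_id"`
	PeerCount     int            `json:"peer_count"`
	PeerIDs       []string       `json:"peer_ids,omitempty"`
	CPU           uint64         `json:"cpu_usage"`
	Memory        uint64         `json:"memory_usage"`
	Timestamp     int64          `json:"timestamp"`
	ClusterHealth *clusterHealth `json:"cluster_health,omitempty"`
}

func main() {
	// Sample payload with the shape produced by announceMetrics (values invented).
	sample := []byte(`{"peer_id":"12D3KooWexample","peer_count":2,"cpu_usage":12,"memory_usage":2048,"timestamp":1730368012,"cluster_health":{"cluster_size":3,"active_nodes":3,"inactive_nodes":0,"discovery_status":"healthy","current_leader":"node-1","average_peer_health":97.5,"last_update":"2025-10-31T10:46:52Z"}}`)

	var m metricsAnnouncement
	if err := json.Unmarshal(sample, &m); err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("%s: %d/%d nodes active, status=%s\n",
		m.PeerID, m.ClusterHealth.ActiveNodes, m.ClusterHealth.ClusterSize, m.ClusterHealth.DiscoveryStatus)
}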
+ // Start LibP2P host first (needed for cluster discovery) if err := n.startLibP2P(); err != nil { return fmt.Errorf("failed to start LibP2P: %w", err) } + // Start RQLite with cluster discovery + if err := n.startRQLite(ctx); err != nil { + return fmt.Errorf("failed to start RQLite: %w", err) + } + // Get listen addresses for logging var listenAddrs []string for _, addr := range n.host.Addrs() { diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go new file mode 100644 index 0000000..e386beb --- /dev/null +++ b/pkg/rqlite/cluster_discovery.go @@ -0,0 +1,592 @@ +package rqlite + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/DeBrosOfficial/network/pkg/discovery" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/zap" +) + +// ClusterDiscoveryService bridges LibP2P discovery with RQLite cluster management +type ClusterDiscoveryService struct { + host host.Host + discoveryMgr *discovery.Manager + rqliteManager *RQLiteManager + nodeID string + nodeType string + raftAddress string + httpAddress string + dataDir string + + knownPeers map[string]*discovery.RQLiteNodeMetadata // NodeID -> Metadata + peerHealth map[string]*PeerHealth // NodeID -> Health + lastUpdate time.Time + updateInterval time.Duration // 30 seconds + inactivityLimit time.Duration // 24 hours + + logger *zap.Logger + mu sync.RWMutex + cancel context.CancelFunc + started bool +} + +// NewClusterDiscoveryService creates a new cluster discovery service +func NewClusterDiscoveryService( + h host.Host, + discoveryMgr *discovery.Manager, + rqliteManager *RQLiteManager, + nodeID string, + nodeType string, + raftAddress string, + httpAddress string, + dataDir string, + logger *zap.Logger, +) *ClusterDiscoveryService { + return &ClusterDiscoveryService{ + host: h, + discoveryMgr: discoveryMgr, + rqliteManager: rqliteManager, + nodeID: nodeID, + nodeType: nodeType, + raftAddress: raftAddress, + httpAddress: httpAddress, + dataDir: dataDir, + knownPeers: make(map[string]*discovery.RQLiteNodeMetadata), + peerHealth: make(map[string]*PeerHealth), + updateInterval: 30 * time.Second, + inactivityLimit: 24 * time.Hour, + logger: logger, + } +} + +// Start begins the cluster discovery service +func (c *ClusterDiscoveryService) Start(ctx context.Context) error { + c.mu.Lock() + if c.started { + c.mu.Unlock() + return fmt.Errorf("cluster discovery already started") + } + c.started = true + c.mu.Unlock() + + ctx, cancel := context.WithCancel(ctx) + c.cancel = cancel + + c.logger.Info("Starting cluster discovery service", + zap.String("node_id", c.nodeID), + zap.String("node_type", c.nodeType), + zap.String("raft_address", c.raftAddress), + zap.String("http_address", c.httpAddress), + zap.String("data_dir", c.dataDir), + zap.Duration("update_interval", c.updateInterval), + zap.Duration("inactivity_limit", c.inactivityLimit)) + + // Start periodic sync in background + go c.periodicSync(ctx) + + // Start periodic cleanup in background + go c.periodicCleanup(ctx) + + c.logger.Info("Cluster discovery goroutines started") + + return nil +} + +// Stop stops the cluster discovery service +func (c *ClusterDiscoveryService) Stop() { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.started { + return + } + + if c.cancel != nil { + c.cancel() + } + c.started = false + + c.logger.Info("Cluster discovery service stopped") +} + +// periodicSync runs periodic cluster membership synchronization +func (c 
*ClusterDiscoveryService) periodicSync(ctx context.Context) { + c.logger.Info("periodicSync goroutine started, doing initial sync immediately") + + ticker := time.NewTicker(c.updateInterval) + defer ticker.Stop() + + // Do initial sync immediately + c.logger.Info("Running initial cluster membership sync") + c.updateClusterMembership() + c.logger.Info("Initial cluster membership sync completed") + + for { + select { + case <-ctx.Done(): + c.logger.Info("periodicSync goroutine stopping") + return + case <-ticker.C: + c.logger.Debug("Running periodic cluster membership sync") + c.updateClusterMembership() + } + } +} + +// periodicCleanup runs periodic cleanup of inactive nodes +func (c *ClusterDiscoveryService) periodicCleanup(ctx context.Context) { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.removeInactivePeers() + } + } +} + +// collectPeerMetadata collects RQLite metadata from LibP2P peers +func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeMetadata { + connectedPeers := c.host.Network().Peers() + var metadata []*discovery.RQLiteNodeMetadata + + c.logger.Debug("Collecting peer metadata from LibP2P", + zap.Int("connected_libp2p_peers", len(connectedPeers))) + + // Add ourselves + ourMetadata := &discovery.RQLiteNodeMetadata{ + NodeID: c.nodeID, + RaftAddress: c.raftAddress, + HTTPAddress: c.httpAddress, + NodeType: c.nodeType, + RaftLogIndex: c.rqliteManager.getRaftLogIndex(), + LastSeen: time.Now(), + ClusterVersion: "1.0", + } + metadata = append(metadata, ourMetadata) + + // Query connected peers for their RQLite metadata + // For now, we'll use a simple approach - store metadata in peer metadata store + // In a full implementation, this would use a custom protocol to exchange RQLite metadata + for _, peerID := range connectedPeers { + // Try to get stored metadata from peerstore + // This would be populated by a peer exchange protocol + if val, err := c.host.Peerstore().Get(peerID, "rqlite_metadata"); err == nil { + if jsonData, ok := val.([]byte); ok { + var peerMeta discovery.RQLiteNodeMetadata + if err := json.Unmarshal(jsonData, &peerMeta); err == nil { + peerMeta.LastSeen = time.Now() + metadata = append(metadata, &peerMeta) + } + } + } + } + + return metadata +} + +// updateClusterMembership updates the cluster membership based on discovered peers +func (c *ClusterDiscoveryService) updateClusterMembership() { + metadata := c.collectPeerMetadata() + + c.logger.Debug("Collected peer metadata", + zap.Int("metadata_count", len(metadata))) + + c.mu.Lock() + defer c.mu.Unlock() + + // Track changes + added := []string{} + updated := []string{} + + // Update known peers + for _, meta := range metadata { + if existing, ok := c.knownPeers[meta.NodeID]; ok { + // Update existing peer + if existing.RaftLogIndex != meta.RaftLogIndex || + existing.HTTPAddress != meta.HTTPAddress || + existing.RaftAddress != meta.RaftAddress { + updated = append(updated, meta.NodeID) + } + } else { + // New peer discovered + added = append(added, meta.NodeID) + c.logger.Info("Node added to cluster", + zap.String("node_id", meta.NodeID), + zap.String("raft_address", meta.RaftAddress), + zap.String("node_type", meta.NodeType), + zap.Uint64("log_index", meta.RaftLogIndex)) + } + + c.knownPeers[meta.NodeID] = meta + + // Update health tracking + if _, ok := c.peerHealth[meta.NodeID]; !ok { + c.peerHealth[meta.NodeID] = &PeerHealth{ + LastSeen: time.Now(), + LastSuccessful: time.Now(), + 
Status:         "active",
+			}
+		} else {
+			c.peerHealth[meta.NodeID].LastSeen = time.Now()
+			c.peerHealth[meta.NodEID].Status = "active"
+			c.peerHealth[meta.NodeID].FailureCount = 0
+		}
+	}
+
+	// Generate and write peers.json if there are changes OR if this is the first time
+	shouldWrite := len(added) > 0 || len(updated) > 0 || c.lastUpdate.IsZero()
+
+	if shouldWrite {
+		c.logger.Info("Updating peers.json",
+			zap.Int("added", len(added)),
+			zap.Int("updated", len(updated)),
+			zap.Int("total_peers", len(c.knownPeers)),
+			zap.Bool("first_run", c.lastUpdate.IsZero()))
+
+		// Get peers JSON while holding the lock
+		peers := c.getPeersJSONUnlocked()
+
+		// Release lock before file I/O
+		c.mu.Unlock()
+
+		// Write without holding lock
+		if err := c.writePeersJSONWithData(peers); err != nil {
+			c.logger.Error("CRITICAL: Failed to write peers.json",
+				zap.Error(err),
+				zap.String("data_dir", c.dataDir),
+				zap.Int("peer_count", len(peers)))
+		} else {
+			c.logger.Info("Successfully wrote peers.json",
+				zap.Int("peer_count", len(peers)))
+		}
+
+		// Re-acquire lock to update lastUpdate
+		c.mu.Lock()
+	} else {
+		c.logger.Debug("No changes to cluster membership",
+			zap.Int("total_peers", len(c.knownPeers)))
+	}
+
+	c.lastUpdate = time.Now()
+}
+
+// removeInactivePeers removes peers that haven't been seen for longer than the inactivity limit
+func (c *ClusterDiscoveryService) removeInactivePeers() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	now := time.Now()
+	removed := []string{}
+
+	for nodeID, health := range c.peerHealth {
+		inactiveDuration := now.Sub(health.LastSeen)
+
+		if inactiveDuration > c.inactivityLimit {
+			// Mark as inactive and remove
+			c.logger.Warn("Node removed from cluster",
+				zap.String("node_id", nodeID),
+				zap.String("reason", "inactive"),
+				zap.Duration("inactive_duration", inactiveDuration))
+
+			delete(c.knownPeers, nodeID)
+			delete(c.peerHealth, nodeID)
+			removed = append(removed, nodeID)
+		}
+	}
+
+	// Regenerate peers.json if any peers were removed. We already hold the write
+	// lock here, so build the peer list and write via the lock-free helpers;
+	// calling writePeersJSON() would try to RLock the same mutex and deadlock.
+	if len(removed) > 0 {
+		c.logger.Info("Removed inactive nodes, regenerating peers.json",
+			zap.Int("removed", len(removed)),
+			zap.Strings("node_ids", removed))
+
+		if err := c.writePeersJSONWithData(c.getPeersJSONUnlocked()); err != nil {
+			c.logger.Error("Failed to write peers.json after cleanup", zap.Error(err))
+		}
+	}
+}
+
+// getPeersJSON generates the peers.json structure from active peers (acquires lock)
+func (c *ClusterDiscoveryService) getPeersJSON() []map[string]interface{} {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.getPeersJSONUnlocked()
+}
+
+// getPeersJSONUnlocked generates the peers.json structure (must be called with lock held).
+// Each entry has the shape {"id": "<node id>", "address": "<raft host:port>", "non_voter": false}.
+func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{} {
+	peers := make([]map[string]interface{}, 0, len(c.knownPeers))
+
+	for _, peer := range c.knownPeers {
+		peerEntry := map[string]interface{}{
+			"id":        peer.NodeID,
+			"address":   peer.RaftAddress,
+			"non_voter": false,
+		}
+		peers = append(peers, peerEntry)
+	}
+
+	return peers
+}
+
+// writePeersJSON atomically writes the peers.json file (acquires lock)
+func (c *ClusterDiscoveryService) writePeersJSON() error {
+	c.mu.RLock()
+	peers := c.getPeersJSONUnlocked()
+	c.mu.RUnlock()
+
+	return c.writePeersJSONWithData(peers)
+}
+
+// writePeersJSONWithData writes the peers.json file with provided data (no lock needed)
+func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]interface{}) error {
+	c.logger.Info("writePeersJSON: Starting",
+		zap.String("data_dir", c.dataDir))
+
+	c.logger.Info("writePeersJSON: Got peers 
JSON", + zap.Int("peer_count", len(peers))) + + // Expand ~ in data directory path + dataDir := os.ExpandEnv(c.dataDir) + if strings.HasPrefix(dataDir, "~") { + home, err := os.UserHomeDir() + if err != nil { + return fmt.Errorf("failed to determine home directory: %w", err) + } + dataDir = filepath.Join(home, dataDir[1:]) + } + + c.logger.Info("writePeersJSON: Expanded data dir", + zap.String("expanded_path", dataDir)) + + // Get the RQLite raft directory + rqliteDir := filepath.Join(dataDir, "rqlite", "raft") + + c.logger.Info("writePeersJSON: Creating raft directory", + zap.String("raft_dir", rqliteDir)) + + if err := os.MkdirAll(rqliteDir, 0755); err != nil { + return fmt.Errorf("failed to create raft directory %s: %w", rqliteDir, err) + } + + peersFile := filepath.Join(rqliteDir, "peers.json") + backupFile := filepath.Join(rqliteDir, "peers.json.backup") + + c.logger.Info("writePeersJSON: File paths", + zap.String("peers_file", peersFile), + zap.String("backup_file", backupFile)) + + // Backup existing peers.json if it exists + if _, err := os.Stat(peersFile); err == nil { + c.logger.Info("writePeersJSON: Backing up existing peers.json") + data, err := os.ReadFile(peersFile) + if err == nil { + _ = os.WriteFile(backupFile, data, 0644) + } + } + + // Marshal to JSON + c.logger.Info("writePeersJSON: Marshaling to JSON") + data, err := json.MarshalIndent(peers, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal peers.json: %w", err) + } + + c.logger.Info("writePeersJSON: JSON marshaled", + zap.Int("data_size", len(data))) + + // Write atomically using temp file + rename + tempFile := peersFile + ".tmp" + + c.logger.Info("writePeersJSON: Writing temp file", + zap.String("temp_file", tempFile)) + + if err := os.WriteFile(tempFile, data, 0644); err != nil { + return fmt.Errorf("failed to write temp peers.json %s: %w", tempFile, err) + } + + c.logger.Info("writePeersJSON: Renaming temp file to final") + + if err := os.Rename(tempFile, peersFile); err != nil { + return fmt.Errorf("failed to rename %s to %s: %w", tempFile, peersFile, err) + } + + nodeIDs := make([]string, 0, len(peers)) + for _, p := range peers { + if id, ok := p["id"].(string); ok { + nodeIDs = append(nodeIDs, id) + } + } + + c.logger.Info("peers.json successfully written!", + zap.String("file", peersFile), + zap.Int("node_count", len(peers)), + zap.Strings("node_ids", nodeIDs)) + + return nil +} + +// GetActivePeers returns a list of active peers (not including self) +func (c *ClusterDiscoveryService) GetActivePeers() []*discovery.RQLiteNodeMetadata { + c.mu.RLock() + defer c.mu.RUnlock() + + peers := make([]*discovery.RQLiteNodeMetadata, 0, len(c.knownPeers)) + for _, peer := range c.knownPeers { + // Skip self + if peer.NodeID == c.nodeID { + continue + } + peers = append(peers, peer) + } + + return peers +} + +// GetNodeWithHighestLogIndex returns the node with the highest Raft log index +func (c *ClusterDiscoveryService) GetNodeWithHighestLogIndex() *discovery.RQLiteNodeMetadata { + c.mu.RLock() + defer c.mu.RUnlock() + + var highest *discovery.RQLiteNodeMetadata + var maxIndex uint64 = 0 + + for _, peer := range c.knownPeers { + // Skip self + if peer.NodeID == c.nodeID { + continue + } + + if peer.RaftLogIndex > maxIndex { + maxIndex = peer.RaftLogIndex + highest = peer + } + } + + return highest +} + +// HasRecentPeersJSON checks if peers.json was recently updated +func (c *ClusterDiscoveryService) HasRecentPeersJSON() bool { + c.mu.RLock() + defer c.mu.RUnlock() + + // Consider recent if updated in 
last 5 minutes + return time.Since(c.lastUpdate) < 5*time.Minute +} + +// FindJoinTargets discovers join targets via LibP2P, prioritizing bootstrap nodes +func (c *ClusterDiscoveryService) FindJoinTargets() []string { + c.mu.RLock() + defer c.mu.RUnlock() + + targets := []string{} + + // Prioritize bootstrap nodes + for _, peer := range c.knownPeers { + if peer.NodeType == "bootstrap" { + targets = append(targets, peer.RaftAddress) + } + } + + // Add other nodes as fallback + for _, peer := range c.knownPeers { + if peer.NodeType != "bootstrap" { + targets = append(targets, peer.RaftAddress) + } + } + + return targets +} + +// WaitForDiscoverySettling waits for LibP2P discovery to settle (used on concurrent startup) +func (c *ClusterDiscoveryService) WaitForDiscoverySettling(ctx context.Context) { + settleDuration := 60 * time.Second + c.logger.Info("Waiting for discovery to settle", + zap.Duration("duration", settleDuration)) + + select { + case <-ctx.Done(): + return + case <-time.After(settleDuration): + } + + // Collect final peer list + c.updateClusterMembership() + + c.mu.RLock() + peerCount := len(c.knownPeers) + c.mu.RUnlock() + + c.logger.Info("Discovery settled", + zap.Int("peer_count", peerCount)) +} + +// UpdateOwnMetadata updates our own RQLite metadata in the peerstore +func (c *ClusterDiscoveryService) UpdateOwnMetadata() { + c.logger.Info("Updating own RQLite metadata for peer exchange", + zap.String("node_id", c.nodeID)) + + metadata := &discovery.RQLiteNodeMetadata{ + NodeID: c.nodeID, + RaftAddress: c.raftAddress, + HTTPAddress: c.httpAddress, + NodeType: c.nodeType, + RaftLogIndex: c.rqliteManager.getRaftLogIndex(), + LastSeen: time.Now(), + ClusterVersion: "1.0", + } + + c.logger.Info("Created metadata struct", + zap.String("node_id", metadata.NodeID), + zap.String("raft_address", metadata.RaftAddress), + zap.String("http_address", metadata.HTTPAddress), + zap.Uint64("log_index", metadata.RaftLogIndex)) + + // Store in our own peerstore for peer exchange + data, err := json.Marshal(metadata) + if err != nil { + c.logger.Error("Failed to marshal own metadata", zap.Error(err)) + return + } + + if err := c.host.Peerstore().Put(c.host.ID(), "rqlite_metadata", data); err != nil { + c.logger.Error("Failed to store own metadata", zap.Error(err)) + return + } + + c.logger.Info("Successfully stored own RQLite metadata in peerstore", + zap.String("node_id", c.nodeID), + zap.Uint64("log_index", metadata.RaftLogIndex)) +} + +// StoreRemotePeerMetadata stores metadata received from a remote peer +func (c *ClusterDiscoveryService) StoreRemotePeerMetadata(peerID peer.ID, metadata *discovery.RQLiteNodeMetadata) error { + data, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to marshal metadata: %w", err) + } + + if err := c.host.Peerstore().Put(peerID, "rqlite_metadata", data); err != nil { + return fmt.Errorf("failed to store metadata: %w", err) + } + + c.logger.Debug("Stored remote peer metadata", + zap.String("peer_id", peerID.String()[:8]+"..."), + zap.String("node_id", metadata.NodeID)) + + return nil +} diff --git a/pkg/rqlite/data_safety.go b/pkg/rqlite/data_safety.go new file mode 100644 index 0000000..de2ad54 --- /dev/null +++ b/pkg/rqlite/data_safety.go @@ -0,0 +1,109 @@ +package rqlite + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "go.uber.org/zap" +) + +// getRaftLogIndex returns the current Raft log index for this node +func (r *RQLiteManager) getRaftLogIndex() uint64 { + status, err := r.getRQLiteStatus() + if err != 
nil { + r.logger.Debug("Failed to get Raft log index", zap.Error(err)) + return 0 + } + + // Return the highest index we have + maxIndex := status.Store.Raft.LastLogIndex + if status.Store.Raft.AppliedIndex > maxIndex { + maxIndex = status.Store.Raft.AppliedIndex + } + if status.Store.Raft.CommitIndex > maxIndex { + maxIndex = status.Store.Raft.CommitIndex + } + + return maxIndex +} + +// getRQLiteStatus queries the /status endpoint for cluster information +func (r *RQLiteManager) getRQLiteStatus() (*RQLiteStatus, error) { + url := fmt.Sprintf("http://localhost:%d/status", r.config.RQLitePort) + client := &http.Client{Timeout: 5 * time.Second} + + resp, err := client.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to query status: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("status endpoint returned %d: %s", resp.StatusCode, string(body)) + } + + var status RQLiteStatus + if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { + return nil, fmt.Errorf("failed to decode status: %w", err) + } + + return &status, nil +} + +// getRQLiteNodes queries the /nodes endpoint for cluster membership +func (r *RQLiteManager) getRQLiteNodes() (RQLiteNodes, error) { + url := fmt.Sprintf("http://localhost:%d/nodes?ver=2", r.config.RQLitePort) + client := &http.Client{Timeout: 5 * time.Second} + + resp, err := client.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to query nodes: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("nodes endpoint returned %d: %s", resp.StatusCode, string(body)) + } + + var nodes RQLiteNodes + if err := json.NewDecoder(resp.Body).Decode(&nodes); err != nil { + return nil, fmt.Errorf("failed to decode nodes: %w", err) + } + + return nodes, nil +} + +// getRQLiteLeader returns the current leader address +func (r *RQLiteManager) getRQLiteLeader() (string, error) { + status, err := r.getRQLiteStatus() + if err != nil { + return "", err + } + + leaderAddr := status.Store.Raft.LeaderAddr + if leaderAddr == "" { + return "", fmt.Errorf("no leader found") + } + + return leaderAddr, nil +} + +// isNodeReachable tests if a specific node is responding +func (r *RQLiteManager) isNodeReachable(httpAddress string) bool { + url := fmt.Sprintf("http://%s/status", httpAddress) + client := &http.Client{Timeout: 3 * time.Second} + + resp, err := client.Get(url) + if err != nil { + return false + } + defer resp.Body.Close() + + return resp.StatusCode == http.StatusOK +} + diff --git a/pkg/rqlite/metrics.go b/pkg/rqlite/metrics.go new file mode 100644 index 0000000..de21c5f --- /dev/null +++ b/pkg/rqlite/metrics.go @@ -0,0 +1,74 @@ +package rqlite + +import ( + "time" +) + +// GetMetrics returns current cluster metrics +func (c *ClusterDiscoveryService) GetMetrics() *ClusterMetrics { + c.mu.RLock() + defer c.mu.RUnlock() + + activeCount := 0 + inactiveCount := 0 + totalHealth := 0.0 + currentLeader := "" + + now := time.Now() + + for nodeID, health := range c.peerHealth { + if health.Status == "active" { + activeCount++ + + // Calculate health score (0-100) based on last seen + timeSinceLastSeen := now.Sub(health.LastSeen) + healthScore := 100.0 + if timeSinceLastSeen > time.Minute { + // Degrade health score based on time since last seen + healthScore = 100.0 - (float64(timeSinceLastSeen.Seconds()) / float64(c.inactivityLimit.Seconds()) * 100.0) + if healthScore < 0 { + 
healthScore = 0 + } + } + totalHealth += healthScore + } else { + inactiveCount++ + } + + // Try to determine leader + if peer, ok := c.knownPeers[nodeID]; ok { + // We'd need to check the actual leader status from RQLite + // For now, bootstrap nodes are more likely to be leader + if peer.NodeType == "bootstrap" && currentLeader == "" { + currentLeader = nodeID + } + } + } + + averageHealth := 0.0 + if activeCount > 0 { + averageHealth = totalHealth / float64(activeCount) + } + + // Determine discovery status + discoveryStatus := "healthy" + if len(c.knownPeers) == 0 { + discoveryStatus = "no_peers" + } else if len(c.knownPeers) == 1 { + discoveryStatus = "single_node" + } else if averageHealth < 50 { + discoveryStatus = "degraded" + } + + return &ClusterMetrics{ + ClusterSize: len(c.knownPeers), + ActiveNodes: activeCount, + InactiveNodes: inactiveCount, + RemovedNodes: 0, // Could track this with a counter + LastUpdate: c.lastUpdate, + DiscoveryStatus: discoveryStatus, + CurrentLeader: currentLeader, + AveragePeerHealth: averageHealth, + } +} + diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index 3498bc2..e179ccd 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -20,12 +20,13 @@ import ( // RQLiteManager manages an RQLite node instance type RQLiteManager struct { - config *config.DatabaseConfig - discoverConfig *config.DiscoveryConfig - dataDir string - logger *zap.Logger - cmd *exec.Cmd - connection *gorqlite.Connection + config *config.DatabaseConfig + discoverConfig *config.DiscoveryConfig + dataDir string + logger *zap.Logger + cmd *exec.Cmd + connection *gorqlite.Connection + discoveryService *ClusterDiscoveryService } // waitForSQLAvailable waits until a simple query succeeds, indicating a leader is known and queries can be served. @@ -67,6 +68,11 @@ func NewRQLiteManager(cfg *config.DatabaseConfig, discoveryCfg *config.Discovery } } +// SetDiscoveryService sets the cluster discovery service for this RQLite manager +func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService) { + r.discoveryService = service +} + // Start starts the RQLite node func (r *RQLiteManager) Start(ctx context.Context) error { // Expand ~ in data directory path @@ -162,26 +168,65 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } r.connection = conn - // Leadership/SQL readiness gating - // - // Fresh bootstrap (no join, no prior state): wait for leadership so queries will work. - // Existing state or joiners: wait for SQL availability (leader known) before proceeding, - // so higher layers (storage) don't fail with 500 leader-not-found. 
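Illustration (not part of the diff): the data-safety rule the new bootstrap gating below enforces — a node only claims leadership if no discovered peer reports a higher Raft log index; otherwise it joins the peer with the most data. The helper and values here are invented for the example; the real logic lives in canSafelyBecomeLeader and GetNodeWithHighestLogIndex.

package main

import "fmt"

// peerIndex is a stand-in for the RaftLogIndex carried in RQLiteNodeMetadata.
type peerIndex struct {
	NodeID       string
	RaftLogIndex uint64
}

// safeToLead reports whether a node with log index ourIndex may bootstrap as
// leader, or names the peer with the most data that it should join instead.
func safeToLead(ourIndex uint64, peers []peerIndex) (bool, string) {
	joinTarget := ""
	best := ourIndex
	for _, p := range peers {
		if p.RaftLogIndex > best {
			best = p.RaftLogIndex
			joinTarget = p.NodeID
		}
	}
	return joinTarget == "", joinTarget
}

func main() {
	ok, target := safeToLead(100, []peerIndex{{"node-2", 180}, {"node-3", 90}})
	fmt.Println(ok, target) // false node-2: join node-2 rather than risk overwriting its newer data
}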
- if r.config.RQLiteJoinAddress == "" && !r.hasExistingState(rqliteDataDir) { - if err := r.waitForLeadership(ctx); err != nil { - if r.cmd != nil && r.cmd.Process != nil { - _ = r.cmd.Process.Kill() + // Leadership/SQL readiness gating with dynamic discovery support + if r.config.RQLiteJoinAddress == "" { + // Bootstrap node logic with data safety checks + r.logger.Info("Bootstrap node: checking if safe to lead") + + // SAFETY: Check if we can safely become leader + canLead, err := r.canSafelyBecomeLeader() + if !canLead && err != nil { + r.logger.Warn("Not safe to become leader, attempting to join existing cluster", + zap.Error(err)) + + // Find node with highest log index and join it + if r.discoveryService != nil { + targetNode := r.discoveryService.GetNodeWithHighestLogIndex() + if targetNode != nil { + r.logger.Info("Joining node with higher data", + zap.String("target_node", targetNode.NodeID), + zap.String("raft_address", targetNode.RaftAddress), + zap.Uint64("their_index", targetNode.RaftLogIndex)) + return r.joinExistingCluster(ctx, targetNode.RaftAddress) + } + } + } + + // Safe to lead - attempt leadership + leadershipErr := r.waitForLeadership(ctx) + if leadershipErr == nil { + r.logger.Info("Bootstrap node successfully established leadership") + } else { + // Leadership failed - check if peers.json from discovery exists + if r.discoveryService != nil && r.discoveryService.HasRecentPeersJSON() { + r.logger.Info("Retrying leadership after discovery update") + leadershipErr = r.waitForLeadership(ctx) + } + + // Final fallback: SQL availability + if leadershipErr != nil { + r.logger.Warn("Leadership failed, trying SQL availability") + sqlCtx := ctx + if _, hasDeadline := ctx.Deadline(); !hasDeadline { + var cancel context.CancelFunc + sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + } + if err := r.waitForSQLAvailable(sqlCtx); err != nil { + if r.cmd != nil && r.cmd.Process != nil { + _ = r.cmd.Process.Kill() + } + return fmt.Errorf("RQLite SQL not available: %w", err) + } } - return fmt.Errorf("RQLite failed to establish leadership: %w", err) } } else { + // Joining node logic r.logger.Info("Waiting for RQLite SQL availability (leader discovery)") - // For joining nodes, wait longer for SQL availability sqlCtx := ctx if _, hasDeadline := ctx.Deadline(); !hasDeadline { - // If no deadline in context, create one for SQL availability check var cancel context.CancelFunc - sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) + sqlCtx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() } if err := r.waitForSQLAvailable(sqlCtx); err != nil { @@ -380,3 +425,80 @@ func (r *RQLiteManager) testJoinAddress(joinAddress string) error { r.logger.Info("Leader HTTP reachable", zap.String("status_url", statusURL)) return nil } + +// canSafelyBecomeLeader checks if this node can safely become leader without causing data loss +func (r *RQLiteManager) canSafelyBecomeLeader() (bool, error) { + // Get our current Raft log index + ourLogIndex := r.getRaftLogIndex() + + // If no discovery service, assume it's safe (backward compatibility) + if r.discoveryService == nil { + r.logger.Debug("No discovery service, assuming safe to lead") + return true, nil + } + + // Query discovery service for other nodes + otherNodes := r.discoveryService.GetActivePeers() + + if len(otherNodes) == 0 { + // No other nodes - safe to bootstrap + r.logger.Debug("No other nodes discovered, safe to lead", + zap.Uint64("our_log_index", 
ourLogIndex))
+		return true, nil
+	}
+
+	// Check if any other node has higher log index
+	for _, peer := range otherNodes {
+		if peer.RaftLogIndex > ourLogIndex {
+			// Other node has more data - we should join them
+			return false, fmt.Errorf(
+				"node %s has higher log index (%d > %d), should join as follower",
+				peer.NodeID, peer.RaftLogIndex, ourLogIndex)
+		}
+	}
+
+	// We have most recent data or equal - safe to lead
+	r.logger.Info("Safe to lead - we have most recent data",
+		zap.Uint64("our_log_index", ourLogIndex),
+		zap.Int("other_nodes_checked", len(otherNodes)))
+	return true, nil
+}
+
+// joinExistingCluster attempts to join an existing cluster as a follower
+func (r *RQLiteManager) joinExistingCluster(ctx context.Context, raftAddress string) error {
+	r.logger.Info("Attempting to join existing cluster",
+		zap.String("target_raft_address", raftAddress))
+
+	// Wait for the target to be reachable
+	if err := r.waitForJoinTarget(ctx, raftAddress, 2*time.Minute); err != nil {
+		return fmt.Errorf("join target not reachable: %w", err)
+	}
+
+	// Wait for SQL availability (the target should have a leader)
+	sqlCtx := ctx
+	if _, hasDeadline := ctx.Deadline(); !hasDeadline {
+		var cancel context.CancelFunc
+		sqlCtx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
+		defer cancel()
+	}
+
+	if err := r.waitForSQLAvailable(sqlCtx); err != nil {
+		return fmt.Errorf("failed to join cluster - SQL not available: %w", err)
+	}
+
+	r.logger.Info("Successfully joined existing cluster")
+	return nil
+}
+
+// exponentialBackoff calculates exponential backoff duration with jitter
+func (r *RQLiteManager) exponentialBackoff(attempt int, baseDelay time.Duration, maxDelay time.Duration) time.Duration {
+	// Calculate exponential backoff: baseDelay * 2^attempt
+	delay := baseDelay * time.Duration(1<<uint(attempt))
+	if delay > maxDelay {
+		delay = maxDelay
+	}
+
+	// Add jitter (±20%)
+	jitter := time.Duration(float64(delay) * 0.2 * (2.0*float64(time.Now().UnixNano()%100)/100.0 - 1.0))
+	return delay + jitter
+}
diff --git a/pkg/rqlite/types.go b/pkg/rqlite/types.go
new file mode 100644
index 0000000..d817e74
--- /dev/null
+++ b/pkg/rqlite/types.go
@@ -0,0 +1,71 @@
+package rqlite
+
+import "time"
+
+// RQLiteStatus represents the response from RQLite's /status endpoint
+type RQLiteStatus struct {
+	Store struct {
+		Raft struct {
+			AppliedIndex      uint64 `json:"applied_index"`
+			CommitIndex       uint64 `json:"commit_index"`
+			LastLogIndex      uint64 `json:"last_log_index"`
+			LastSnapshotIndex uint64 `json:"last_snapshot_index"`
+			State             string `json:"state"`
+			LeaderID          string `json:"leader_id"`
+			LeaderAddr        string `json:"leader_addr"`
+		} `json:"raft"`
+		DBConf struct {
+			DSN    string `json:"dsn"`
+			Memory bool   `json:"memory"`
+		} `json:"db_conf"`
+	} `json:"store"`
+	Runtime struct {
+		GOARCH       string `json:"GOARCH"`
+		GOOS         string `json:"GOOS"`
+		GOMAXPROCS   int    `json:"GOMAXPROCS"`
+		NumCPU       int    `json:"num_cpu"`
+		NumGoroutine int    `json:"num_goroutine"`
+		Version      string `json:"version"`
+	} `json:"runtime"`
+	HTTP struct {
+		Addr string `json:"addr"`
+		Auth string `json:"auth"`
+	} `json:"http"`
+	Node struct {
+		Uptime    string `json:"uptime"`
+		StartTime string `json:"start_time"`
+	} `json:"node"`
+}
+
+// RQLiteNode represents a node in the RQLite cluster
+type RQLiteNode struct {
+	ID        string `json:"id"`
+	Address   string `json:"address"`
+	Leader    bool   `json:"leader"`
+	Voter     bool   `json:"voter"`
+	Reachable bool   `json:"reachable"`
+}
+
+// RQLiteNodes represents the response from RQLite's /nodes endpoint
+type RQLiteNodes 
[]RQLiteNode + +// PeerHealth tracks the health status of a peer +type PeerHealth struct { + LastSeen time.Time + LastSuccessful time.Time + FailureCount int + Status string // "active", "degraded", "inactive" +} + +// ClusterMetrics contains cluster-wide metrics +type ClusterMetrics struct { + ClusterSize int + ActiveNodes int + InactiveNodes int + RemovedNodes int + LastUpdate time.Time + DiscoveryStatus string + CurrentLeader string + AveragePeerHealth float64 +} +
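Usage note (not part of the diff): a minimal sketch of setting the new discovery knobs from code. Field names and validation bounds come from pkg/config above; the module import path is assumed from the pkg/discovery import used elsewhere in this patch, and the aggregate validation entry point is not shown in this diff, so only the field assignments are illustrated.

package main

import (
	"fmt"
	"time"

	"github.com/DeBrosOfficial/network/pkg/config" // import path assumed from this patch's module
)

func main() {
	cfg := config.DefaultConfig()

	// Dynamic discovery knobs added by this patch (validated in pkg/config/validate.go).
	cfg.Database.ClusterSyncInterval = 30 * time.Second // 0 for default, otherwise >= 10s
	cfg.Database.PeerInactivityLimit = 24 * time.Hour   // 0 for default, otherwise 1h..7d
	cfg.Database.MinClusterSize = 1                     // must be >= 1

	// Advertise addresses are now required for RQLite cluster discovery.
	cfg.Discovery.HttpAdvAddress = "51.83.128.181:5001"
	cfg.Discovery.RaftAdvAddress = "51.83.128.181:7001"

	// An explicit node ID is recommended; validation flags an empty one.
	cfg.Node.ID = "node-1"

	fmt.Printf("sync=%s inactivity=%s min_cluster=%d\n",
		cfg.Database.ClusterSyncInterval,
		cfg.Database.PeerInactivityLimit,
		cfg.Database.MinClusterSize)
}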