From 8f82dc7ca3d530e2870aca6b4d774c2872c95e99 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 11:41:20 +0200 Subject: [PATCH] feat: enhance RQLite cluster discovery and recovery mechanisms - Introduced TriggerPeerExchange method to facilitate manual peer exchange for RQLite metadata. - Implemented performPreStartClusterDiscovery to ensure coordinated recovery by building peers.json before RQLite startup. - Added validation for node ID consistency with raft address during RQLite startup. - Enhanced logging for cluster recovery processes and peer discovery progress. - Updated cluster synchronization logic to improve reliability during node recovery scenarios. --- pkg/discovery/discovery.go | 36 +++- pkg/node/node.go | 19 +- pkg/rqlite/cluster_discovery.go | 62 +++++-- pkg/rqlite/rqlite.go | 305 ++++++++++++++++++++++++++++++-- 4 files changed, 383 insertions(+), 39 deletions(-) diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 19b5887..6d9470b 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -24,8 +24,8 @@ type PeerExchangeRequest struct { // PeerExchangeResponse represents a list of peers to exchange type PeerExchangeResponse struct { - Peers []PeerInfo `json:"peers"` - RQLiteMetadata *RQLiteNodeMetadata `json:"rqlite_metadata,omitempty"` + Peers []PeerInfo `json:"peers"` + RQLiteMetadata *RQLiteNodeMetadata `json:"rqlite_metadata,omitempty"` } // PeerInfo contains peer identity and addresses @@ -389,6 +389,38 @@ func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi return resp.Peers } +// TriggerPeerExchange manually triggers peer exchange with all connected peers +// This is useful for pre-startup cluster discovery to populate the peerstore with RQLite metadata +func (d *Manager) TriggerPeerExchange(ctx context.Context) int { + connectedPeers := d.host.Network().Peers() + if len(connectedPeers) == 0 { + d.logger.Debug("No connected peers for peer exchange") + return 0 + } + + d.logger.Info("Manually triggering peer exchange", + zap.Int("connected_peers", len(connectedPeers))) + + metadataCollected := 0 + for _, peerID := range connectedPeers { + // Request peer list from this peer (which includes their RQLite metadata) + _ = d.requestPeersFromPeer(ctx, peerID, 50) // Request up to 50 peers + + // Check if we got RQLite metadata from this peer + if val, err := d.host.Peerstore().Get(peerID, "rqlite_metadata"); err == nil { + if _, ok := val.([]byte); ok { + metadataCollected++ + } + } + } + + d.logger.Info("Peer exchange completed", + zap.Int("peers_with_metadata", metadataCollected), + zap.Int("total_peers", len(connectedPeers))) + + return metadataCollected +} + // connectToPeer attempts to connect to a specific peer using its peerstore info. 
func (d *Manager) connectToPeer(ctx context.Context, peerID peer.ID) error { peerInfo := d.host.Peerstore().PeerInfo(peerID) diff --git a/pkg/node/node.go b/pkg/node/node.go index 852d768..7aa7824 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -89,25 +89,34 @@ func (n *Node) startRQLite(ctx context.Context) error { n.logger.Logger, ) - // Set discovery service on RQLite manager + // Set discovery service on RQLite manager BEFORE starting RQLite + // This is critical for pre-start cluster discovery during recovery n.rqliteManager.SetDiscoveryService(n.clusterDiscovery) - // Start cluster discovery + // Start cluster discovery (but don't trigger initial sync yet) if err := n.clusterDiscovery.Start(ctx); err != nil { return fmt.Errorf("failed to start cluster discovery: %w", err) } - // Update our own metadata + // Publish initial metadata (with log_index=0) so peers can discover us during recovery + // The metadata will be updated with actual log index after RQLite starts n.clusterDiscovery.UpdateOwnMetadata() - n.logger.Info("Cluster discovery service started") + n.logger.Info("Cluster discovery service started (waiting for RQLite)") } - // Start RQLite + // Start RQLite FIRST before updating metadata if err := n.rqliteManager.Start(ctx); err != nil { return err } + // NOW update metadata after RQLite is running + if n.clusterDiscovery != nil { + n.clusterDiscovery.UpdateOwnMetadata() + n.clusterDiscovery.TriggerSync() // Do initial cluster sync now that RQLite is ready + n.logger.Info("RQLite metadata published and cluster synced") + } + // Create adapter for sql.DB compatibility adapter, err := database.NewRQLiteAdapter(n.rqliteManager) if err != nil { diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go index e386beb..7585cbf 100644 --- a/pkg/rqlite/cluster_discovery.go +++ b/pkg/rqlite/cluster_discovery.go @@ -120,16 +120,12 @@ func (c *ClusterDiscoveryService) Stop() { // periodicSync runs periodic cluster membership synchronization func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) { - c.logger.Info("periodicSync goroutine started, doing initial sync immediately") + c.logger.Info("periodicSync goroutine started, waiting for RQLite readiness") ticker := time.NewTicker(c.updateInterval) defer ticker.Stop() - // Do initial sync immediately - c.logger.Info("Running initial cluster membership sync") - c.updateClusterMembership() - c.logger.Info("Initial cluster membership sync completed") - + // Wait for first ticker interval before syncing (RQLite needs time to start) for { select { case <-ctx.Done(): @@ -167,7 +163,7 @@ func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeM // Add ourselves ourMetadata := &discovery.RQLiteNodeMetadata{ - NodeID: c.nodeID, + NodeID: c.raftAddress, // RQLite uses raft address as node ID RaftAddress: c.raftAddress, HTTPAddress: c.httpAddress, NodeType: c.nodeType, @@ -332,7 +328,7 @@ func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{ for _, peer := range c.knownPeers { peerEntry := map[string]interface{}{ - "id": peer.NodeID, + "id": peer.RaftAddress, // RQLite uses raft address as node ID "address": peer.RaftAddress, "non_voter": false, } @@ -446,8 +442,8 @@ func (c *ClusterDiscoveryService) GetActivePeers() []*discovery.RQLiteNodeMetada peers := make([]*discovery.RQLiteNodeMetadata, 0, len(c.knownPeers)) for _, peer := range c.knownPeers { - // Skip self - if peer.NodeID == c.nodeID { + // Skip self (compare by raft address since that's the NodeID 
now) + if peer.NodeID == c.raftAddress { continue } peers = append(peers, peer) @@ -456,6 +452,19 @@ func (c *ClusterDiscoveryService) GetActivePeers() []*discovery.RQLiteNodeMetada return peers } +// GetAllPeers returns a list of all known peers (including self) +func (c *ClusterDiscoveryService) GetAllPeers() []*discovery.RQLiteNodeMetadata { + c.mu.RLock() + defer c.mu.RUnlock() + + peers := make([]*discovery.RQLiteNodeMetadata, 0, len(c.knownPeers)) + for _, peer := range c.knownPeers { + peers = append(peers, peer) + } + + return peers +} + // GetNodeWithHighestLogIndex returns the node with the highest Raft log index func (c *ClusterDiscoveryService) GetNodeWithHighestLogIndex() *discovery.RQLiteNodeMetadata { c.mu.RLock() @@ -465,8 +474,8 @@ func (c *ClusterDiscoveryService) GetNodeWithHighestLogIndex() *discovery.RQLite var maxIndex uint64 = 0 for _, peer := range c.knownPeers { - // Skip self - if peer.NodeID == c.nodeID { + // Skip self (compare by raft address since that's the NodeID now) + if peer.NodeID == c.raftAddress { continue } @@ -535,13 +544,40 @@ func (c *ClusterDiscoveryService) WaitForDiscoverySettling(ctx context.Context) zap.Int("peer_count", peerCount)) } +// TriggerSync manually triggers a cluster membership sync +func (c *ClusterDiscoveryService) TriggerSync() { + c.logger.Info("Manually triggering cluster membership sync") + + // For bootstrap nodes, wait a bit for peer discovery to stabilize + if c.nodeType == "bootstrap" { + c.logger.Info("Bootstrap node: waiting for peer discovery to complete") + time.Sleep(5 * time.Second) + } + + c.updateClusterMembership() +} + +// TriggerPeerExchange actively exchanges peer information with connected peers +// This populates the peerstore with RQLite metadata from other nodes +func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error { + if c.discoveryMgr == nil { + return fmt.Errorf("discovery manager not available") + } + + c.logger.Info("Triggering peer exchange via discovery manager") + collected := c.discoveryMgr.TriggerPeerExchange(ctx) + c.logger.Info("Peer exchange completed", zap.Int("peers_with_metadata", collected)) + + return nil +} + // UpdateOwnMetadata updates our own RQLite metadata in the peerstore func (c *ClusterDiscoveryService) UpdateOwnMetadata() { c.logger.Info("Updating own RQLite metadata for peer exchange", zap.String("node_id", c.nodeID)) metadata := &discovery.RQLiteNodeMetadata{ - NodeID: c.nodeID, + NodeID: c.raftAddress, // RQLite uses raft address as node ID RaftAddress: c.raftAddress, HTTPAddress: c.httpAddress, NodeType: c.nodeType, diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index e179ccd..d491fa0 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -95,6 +95,17 @@ func (r *RQLiteManager) Start(ctx context.Context) error { return fmt.Errorf("discovery config HttpAdvAddress is empty") } + // CRITICAL: Check if we need to do pre-start cluster discovery to build peers.json + // This handles the case where nodes have old cluster state and need coordinated recovery + if needsClusterRecovery, err := r.checkNeedsClusterRecovery(rqliteDataDir); err != nil { + return fmt.Errorf("failed to check cluster recovery status: %w", err) + } else if needsClusterRecovery { + r.logger.Info("Detected old cluster state requiring coordinated recovery") + if err := r.performPreStartClusterDiscovery(ctx, rqliteDataDir); err != nil { + return fmt.Errorf("pre-start cluster discovery failed: %w", err) + } + } + // Build RQLite command args := []string{ "-http-addr", 
fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort), @@ -123,7 +134,8 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } // Always add the join parameter in host:port form - let rqlited handle the rest - args = append(args, "-join", joinArg) + // Add retry parameters to handle slow cluster startup (e.g., during recovery) + args = append(args, "-join", joinArg, "-join-attempts", "30", "-join-interval", "10s") } else { r.logger.Info("No join address specified - starting as new cluster") } @@ -168,17 +180,23 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } r.connection = conn + // Sanity check: verify rqlite's node ID matches our configured raft address + if err := r.validateNodeID(); err != nil { + r.logger.Warn("Node ID validation failed", zap.Error(err)) + // Don't fail startup, but log the mismatch for debugging + } + // Leadership/SQL readiness gating with dynamic discovery support if r.config.RQLiteJoinAddress == "" { // Bootstrap node logic with data safety checks r.logger.Info("Bootstrap node: checking if safe to lead") - + // SAFETY: Check if we can safely become leader canLead, err := r.canSafelyBecomeLeader() if !canLead && err != nil { r.logger.Warn("Not safe to become leader, attempting to join existing cluster", zap.Error(err)) - + // Find node with highest log index and join it if r.discoveryService != nil { targetNode := r.discoveryService.GetNodeWithHighestLogIndex() @@ -191,18 +209,34 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } } } - + // Safe to lead - attempt leadership leadershipErr := r.waitForLeadership(ctx) if leadershipErr == nil { r.logger.Info("Bootstrap node successfully established leadership") } else { - // Leadership failed - check if peers.json from discovery exists - if r.discoveryService != nil && r.discoveryService.HasRecentPeersJSON() { - r.logger.Info("Retrying leadership after discovery update") - leadershipErr = r.waitForLeadership(ctx) + r.logger.Warn("Initial leadership attempt failed, may need cluster recovery", + zap.Error(leadershipErr)) + + // Try recovery if we have peers.json from discovery + if r.discoveryService != nil { + peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json") + if _, err := os.Stat(peersPath); err == nil { + r.logger.Info("Attempting cluster recovery using peers.json", + zap.String("peers_file", peersPath)) + + if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil { + r.logger.Info("Cluster recovery successful, retrying leadership") + leadershipErr = r.waitForLeadership(ctx) + if leadershipErr == nil { + r.logger.Info("Bootstrap node established leadership after recovery") + } + } else { + r.logger.Warn("Cluster recovery failed", zap.Error(recoveryErr)) + } + } } - + // Final fallback: SQL availability if leadershipErr != nil { r.logger.Warn("Leadership failed, trying SQL availability") @@ -430,23 +464,23 @@ func (r *RQLiteManager) testJoinAddress(joinAddress string) error { func (r *RQLiteManager) canSafelyBecomeLeader() (bool, error) { // Get our current Raft log index ourLogIndex := r.getRaftLogIndex() - + // If no discovery service, assume it's safe (backward compatibility) if r.discoveryService == nil { r.logger.Debug("No discovery service, assuming safe to lead") return true, nil } - + // Query discovery service for other nodes otherNodes := r.discoveryService.GetActivePeers() - + if len(otherNodes) == 0 { // No other nodes - safe to bootstrap r.logger.Debug("No other nodes discovered, safe to lead", zap.Uint64("our_log_index", ourLogIndex)) return true, nil 
} - + // Check if any other node has higher log index for _, peer := range otherNodes { if peer.RaftLogIndex > ourLogIndex { @@ -456,7 +490,7 @@ func (r *RQLiteManager) canSafelyBecomeLeader() (bool, error) { peer.NodeID, peer.RaftLogIndex, ourLogIndex) } } - + // We have most recent data or equal - safe to lead r.logger.Info("Safe to lead - we have most recent data", zap.Uint64("our_log_index", ourLogIndex), @@ -468,12 +502,12 @@ func (r *RQLiteManager) canSafelyBecomeLeader() (bool, error) { func (r *RQLiteManager) joinExistingCluster(ctx context.Context, raftAddress string) error { r.logger.Info("Attempting to join existing cluster", zap.String("target_raft_address", raftAddress)) - + // Wait for the target to be reachable if err := r.waitForJoinTarget(ctx, raftAddress, 2*time.Minute); err != nil { return fmt.Errorf("join target not reachable: %w", err) } - + // Wait for SQL availability (the target should have a leader) sqlCtx := ctx if _, hasDeadline := ctx.Deadline(); !hasDeadline { @@ -481,11 +515,11 @@ func (r *RQLiteManager) joinExistingCluster(ctx context.Context, raftAddress str sqlCtx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() } - + if err := r.waitForSQLAvailable(sqlCtx); err != nil { return fmt.Errorf("failed to join cluster - SQL not available: %w", err) } - + r.logger.Info("Successfully joined existing cluster") return nil } @@ -497,8 +531,241 @@ func (r *RQLiteManager) exponentialBackoff(attempt int, baseDelay time.Duration, if delay > maxDelay { delay = maxDelay } - + // Add jitter (±20%) jitter := time.Duration(float64(delay) * 0.2 * (2.0*float64(time.Now().UnixNano()%100)/100.0 - 1.0)) return delay + jitter } + +// recoverCluster restarts RQLite using the recovery.db created from peers.json +func (r *RQLiteManager) recoverCluster(peersJSONPath string) error { + r.logger.Info("Initiating cluster recovery by restarting RQLite", + zap.String("peers_file", peersJSONPath)) + + // Stop the current RQLite process + r.logger.Info("Stopping RQLite for recovery") + if err := r.Stop(); err != nil { + r.logger.Warn("Error stopping RQLite", zap.Error(err)) + } + + // Wait for process to fully stop + time.Sleep(2 * time.Second) + + // Restart RQLite - it will automatically detect peers.json and perform recovery + r.logger.Info("Restarting RQLite (will auto-recover using peers.json)") + + // Build the same args as original Start() - expand ~ in data directory + dataDir := os.ExpandEnv(r.dataDir) + if strings.HasPrefix(dataDir, "~") { + home, err := os.UserHomeDir() + if err != nil { + return fmt.Errorf("failed to determine home directory: %w", err) + } + dataDir = filepath.Join(home, dataDir[1:]) + } + rqliteDataDir := filepath.Join(dataDir, "rqlite") + args := []string{ + "-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort), + "-http-adv-addr", r.discoverConfig.HttpAdvAddress, + "-raft-adv-addr", r.discoverConfig.RaftAdvAddress, + "-raft-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLiteRaftPort), + rqliteDataDir, + } + + // Restart RQLite + r.cmd = exec.Command("rqlited", args...) 
+ r.cmd.Stdout = os.Stdout + r.cmd.Stderr = os.Stderr + + if err := r.cmd.Start(); err != nil { + return fmt.Errorf("failed to restart RQLite: %w", err) + } + + r.logger.Info("RQLite restarted, waiting for it to become ready") + time.Sleep(3 * time.Second) + + // Recreate connection + conn, err := gorqlite.Open(fmt.Sprintf("http://localhost:%d", r.config.RQLitePort)) + if err != nil { + return fmt.Errorf("failed to reconnect to RQLite: %w", err) + } + r.connection = conn + + r.logger.Info("Cluster recovery completed, RQLite restarted with new configuration") + return nil +} + +// checkNeedsClusterRecovery checks if the node has old cluster state that requires coordinated recovery +// Returns true if there are snapshots but the raft log is empty (typical after a crash/restart) +func (r *RQLiteManager) checkNeedsClusterRecovery(rqliteDataDir string) (bool, error) { + // Check for snapshots directory + snapshotsDir := filepath.Join(rqliteDataDir, "rsnapshots") + if _, err := os.Stat(snapshotsDir); os.IsNotExist(err) { + // No snapshots = fresh start, no recovery needed + return false, nil + } + + // Check if snapshots directory has any snapshots + entries, err := os.ReadDir(snapshotsDir) + if err != nil { + return false, fmt.Errorf("failed to read snapshots directory: %w", err) + } + + hasSnapshots := false + for _, entry := range entries { + if entry.IsDir() || strings.HasSuffix(entry.Name(), ".db") { + hasSnapshots = true + break + } + } + + if !hasSnapshots { + // No snapshots = fresh start + return false, nil + } + + // Check raft log size - if it's the default empty size, we need recovery + raftLogPath := filepath.Join(rqliteDataDir, "raft.db") + if info, err := os.Stat(raftLogPath); err == nil { + // Empty or default-sized log with snapshots means we need coordinated recovery + if info.Size() <= 8*1024*1024 { // <= 8MB (default empty log size) + r.logger.Info("Detected cluster recovery situation: snapshots exist but raft log is empty/default size", + zap.String("snapshots_dir", snapshotsDir), + zap.Int64("raft_log_size", info.Size())) + return true, nil + } + } + + return false, nil +} + +// performPreStartClusterDiscovery waits for peer discovery and builds a complete peers.json +// before starting RQLite. This ensures all nodes use the same cluster membership for recovery. 
+func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rqliteDataDir string) error { + if r.discoveryService == nil { + r.logger.Warn("No discovery service available, cannot perform pre-start cluster discovery") + return fmt.Errorf("discovery service not available") + } + + r.logger.Info("Waiting for peer discovery to find other cluster members...") + + // CRITICAL: First, actively trigger peer exchange to populate peerstore with RQLite metadata + // The peerstore needs RQLite metadata from other nodes BEFORE we can collect it + r.logger.Info("Triggering peer exchange to collect RQLite metadata from connected peers") + if err := r.discoveryService.TriggerPeerExchange(ctx); err != nil { + r.logger.Warn("Peer exchange failed, continuing anyway", zap.Error(err)) + } + + // Give peer exchange a moment to complete + time.Sleep(1 * time.Second) + + // Now trigger cluster membership sync to populate knownPeers map from the peerstore + r.logger.Info("Triggering initial cluster membership sync to populate peer list") + r.discoveryService.TriggerSync() + + // Give the sync a moment to complete + time.Sleep(2 * time.Second) + + // Wait for peer discovery - give it time to find peers (30 seconds should be enough) + discoveryDeadline := time.Now().Add(30 * time.Second) + var discoveredPeers int + + for time.Now().Before(discoveryDeadline) { + // Check how many peers with RQLite metadata we've discovered + allPeers := r.discoveryService.GetAllPeers() + discoveredPeers = len(allPeers) + + r.logger.Info("Peer discovery progress", + zap.Int("discovered_peers", discoveredPeers), + zap.Duration("time_remaining", time.Until(discoveryDeadline))) + + // If we have at least our minimum cluster size, proceed + if discoveredPeers >= r.config.MinClusterSize { + r.logger.Info("Found minimum cluster size peers, proceeding with recovery", + zap.Int("discovered_peers", discoveredPeers), + zap.Int("min_cluster_size", r.config.MinClusterSize)) + break + } + + // Wait a bit before checking again + time.Sleep(2 * time.Second) + } + + if discoveredPeers == 0 { + r.logger.Warn("No peers discovered during pre-start discovery window, will attempt solo recovery") + // Continue anyway - might be the only node left + } + + // Trigger final sync to ensure peers.json is up to date with latest discovered peers + r.logger.Info("Triggering final cluster membership sync to build complete peers.json") + r.discoveryService.TriggerSync() + + // Wait a moment for the sync to complete + time.Sleep(2 * time.Second) + + // Verify peers.json was created + peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json") + if _, err := os.Stat(peersPath); err != nil { + return fmt.Errorf("peers.json was not created after discovery: %w", err) + } + + r.logger.Info("Pre-start cluster discovery completed successfully", + zap.String("peers_file", peersPath), + zap.Int("peer_count", discoveredPeers)) + + return nil +} + +// validateNodeID checks that rqlite's reported node ID matches our configured raft address +func (r *RQLiteManager) validateNodeID() error { + // Query /nodes endpoint to get our node ID + // Retry a few times as the endpoint might not be ready immediately + for i := 0; i < 5; i++ { + nodes, err := r.getRQLiteNodes() + if err != nil { + // If endpoint is not ready yet, wait and retry + if i < 4 { + time.Sleep(500 * time.Millisecond) + continue + } + return fmt.Errorf("failed to query nodes endpoint after retries: %w", err) + } + + expectedID := r.discoverConfig.RaftAdvAddress + if expectedID == "" { + return 
fmt.Errorf("raft_adv_address not configured") + } + + // Find our node in the cluster (match by address) + for _, node := range nodes { + if node.Address == expectedID { + if node.ID != expectedID { + r.logger.Error("CRITICAL: RQLite node ID mismatch", + zap.String("configured_raft_address", expectedID), + zap.String("rqlite_node_id", node.ID), + zap.String("rqlite_node_address", node.Address), + zap.String("explanation", "peers.json id field must match rqlite's node ID (raft address)")) + return fmt.Errorf("node ID mismatch: configured %s but rqlite reports %s", expectedID, node.ID) + } + r.logger.Info("Node ID validation passed", + zap.String("node_id", node.ID), + zap.String("address", node.Address)) + return nil + } + } + + // If cluster is still forming, nodes list might be empty - that's okay + if len(nodes) == 0 { + r.logger.Debug("Cluster membership not yet available, skipping validation") + return nil + } + + // If we can't find ourselves but other nodes exist, log a warning + r.logger.Warn("Could not find our node in cluster membership", + zap.String("expected_address", expectedID), + zap.Int("nodes_in_cluster", len(nodes))) + return nil + } + + return nil +}
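
Note: below is a minimal, illustrative sketch of the peers.json that the pre-start discovery path builds (via the discovery service's membership sync) at <rqlite-data-dir>/raft/peers.json, the path checked by performPreStartClusterDiscovery and by the bootstrap recovery branch. The addresses are hypothetical; consistent with getPeersJSONUnlocked and validateNodeID in this patch, each entry's "id" equals that node's raft advertise address, and every discovered peer is written as a voter.

    [
      { "id": "10.0.0.1:4002", "address": "10.0.0.1:4002", "non_voter": false },
      { "id": "10.0.0.2:4002", "address": "10.0.0.2:4002", "non_voter": false },
      { "id": "10.0.0.3:4002", "address": "10.0.0.3:4002", "non_voter": false }
    ]

If the "id" fields here did not match the raft advertise addresses, validateNodeID would report the mismatch after startup, which is exactly the failure mode this patch is guarding against.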