From 2aead480459aad3ff9b3feed38e8352e4dec0334 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 11:53:50 +0200 Subject: [PATCH] feat: enhance RQLite manager with improved logging and data directory management - Added structured logging for RQLite components, including cluster discovery and leadership processes. - Introduced methods for preparing the data directory and launching the RQLite process, improving code organization. - Implemented exponential backoff for leadership checks to reduce log noise and improve reliability. - Enhanced peer health tracking and membership update logic to streamline cluster synchronization and recovery. --- CHANGELOG.md | 17 +++ Makefile | 2 +- pkg/discovery/discovery.go | 27 ++-- pkg/rqlite/cluster_discovery.go | 178 ++++++++++++++------------- pkg/rqlite/data_safety.go | 45 ++++--- pkg/rqlite/rqlite.go | 211 ++++++++++++++++++++------------ 6 files changed, 293 insertions(+), 187 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index da5b7ae..bc44467 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,23 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant ### Fixed +## [0.53.0] - 2025-10-31 + +### Added + +- Discovery manager now tracks failed peer-exchange attempts to suppress repeated warnings while peers negotiate supported protocols. + +### Changed + +- Scoped logging throughout `cluster_discovery`, `rqlite`, and `discovery` packages so logs carry component tags and keep verbose output at debug level. +- Refactored `ClusterDiscoveryService` membership handling: metadata updates happen under lock, `peers.json` is written outside the lock, self-health is skipped, and change detection is centralized in `computeMembershipChangesLocked`. 
+- Reworked `RQLiteManager.Start` into helper functions (`prepareDataDir`, `launchProcess`, `waitForReadyAndConnect`, `establishLeadershipOrJoin`) with clearer logging, better error handling, and exponential backoff while waiting for leadership. +- `validateNodeID` now treats empty membership results as transitional states, logging at debug level instead of warning to avoid noisy startups. + +### Fixed + +- Eliminated spurious `peers.json` churn and node-ID mismatch warnings during cluster formation by aligning IDs with raft addresses and tightening discovery logging. + ## [0.52.15] ### Added diff --git a/Makefile b/Makefile index 5240288..9e9bf5d 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.52.21 +VERSION := 0.53.0 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 6d9470b..d022828 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -39,9 +39,10 @@ type PeerInfo struct { // interface{} to remain source-compatible with previous call sites that // passed a DHT instance. The value is ignored. type Manager struct { - host host.Host - logger *zap.Logger - cancel context.CancelFunc + host host.Host + logger *zap.Logger + cancel context.CancelFunc + failedPeerExchanges map[peer.ID]time.Time // Track failed peer exchange attempts to suppress repeated warnings } // Config contains discovery configuration @@ -56,8 +57,10 @@ type Config struct { // previously passed a DHT instance can continue to do so; the value is ignored. 
func NewManager(h host.Host, _ interface{}, logger *zap.Logger) *Manager { return &Manager{ - host: h, - logger: logger, + host: h, + logger: logger.With(zap.String("component", "peer-discovery")), + cancel: nil, + failedPeerExchanges: make(map[peer.ID]time.Time), } } @@ -344,13 +347,21 @@ func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi // Open a stream to the peer stream, err := d.host.NewStream(ctx, peerID, PeerExchangeProtocol) if err != nil { - d.logger.Debug("Failed to open peer exchange stream", - zap.String("peer_id", peerID.String()[:8]+"..."), - zap.Error(err)) + // Suppress repeated warnings for the same peer (log once per minute max) + lastFailure, seen := d.failedPeerExchanges[peerID] + if !seen || time.Since(lastFailure) > time.Minute { + d.logger.Debug("Failed to open peer exchange stream", + zap.String("peer_id", peerID.String()[:8]+"..."), + zap.Error(err)) + d.failedPeerExchanges[peerID] = time.Now() + } return nil } defer stream.Close() + // Clear failure tracking on success + delete(d.failedPeerExchanges, peerID) + // Send request req := PeerExchangeRequest{Limit: limit} encoder := json.NewEncoder(stream) diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go index 7585cbf..3e1b46b 100644 --- a/pkg/rqlite/cluster_discovery.go +++ b/pkg/rqlite/cluster_discovery.go @@ -64,7 +64,7 @@ func NewClusterDiscoveryService( peerHealth: make(map[string]*PeerHealth), updateInterval: 30 * time.Second, inactivityLimit: 24 * time.Hour, - logger: logger, + logger: logger.With(zap.String("component", "cluster-discovery")), } } @@ -82,9 +82,8 @@ func (c *ClusterDiscoveryService) Start(ctx context.Context) error { c.cancel = cancel c.logger.Info("Starting cluster discovery service", - zap.String("node_id", c.nodeID), - zap.String("node_type", c.nodeType), zap.String("raft_address", c.raftAddress), + zap.String("node_type", c.nodeType), zap.String("http_address", c.httpAddress), zap.String("data_dir", 
c.dataDir), zap.Duration("update_interval", c.updateInterval), @@ -120,7 +119,7 @@ func (c *ClusterDiscoveryService) Stop() { // periodicSync runs periodic cluster membership synchronization func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) { - c.logger.Info("periodicSync goroutine started, waiting for RQLite readiness") + c.logger.Debug("periodicSync goroutine started, waiting for RQLite readiness") ticker := time.NewTicker(c.updateInterval) defer ticker.Stop() @@ -129,10 +128,9 @@ func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) { for { select { case <-ctx.Done(): - c.logger.Info("periodicSync goroutine stopping") + c.logger.Debug("periodicSync goroutine stopping") return case <-ticker.C: - c.logger.Debug("Running periodic cluster membership sync") c.updateClusterMembership() } } @@ -193,6 +191,14 @@ func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeM return metadata } +// membershipUpdateResult contains the result of a membership update operation +type membershipUpdateResult struct { + peersJSON []map[string]interface{} + added []string + updated []string + changed bool +} + // updateClusterMembership updates the cluster membership based on discovered peers func (c *ClusterDiscoveryService) updateClusterMembership() { metadata := c.collectPeerMetadata() @@ -200,15 +206,58 @@ func (c *ClusterDiscoveryService) updateClusterMembership() { c.logger.Debug("Collected peer metadata", zap.Int("metadata_count", len(metadata))) + // Compute membership changes while holding lock c.mu.Lock() - defer c.mu.Unlock() + result := c.computeMembershipChangesLocked(metadata) + c.mu.Unlock() + // Perform file I/O outside the lock + if result.changed { + // Log state changes (peer added/removed) at Info level + if len(result.added) > 0 || len(result.updated) > 0 { + c.logger.Info("Cluster membership changed", + zap.Int("added", len(result.added)), + zap.Int("updated", len(result.updated)), + zap.Strings("added_ids", 
result.added), + zap.Strings("updated_ids", result.updated)) + } + + // Write peers.json without holding lock + if err := c.writePeersJSONWithData(result.peersJSON); err != nil { + c.logger.Error("CRITICAL: Failed to write peers.json", + zap.Error(err), + zap.String("data_dir", c.dataDir), + zap.Int("peer_count", len(result.peersJSON))) + } else { + c.logger.Debug("peers.json updated", + zap.Int("peer_count", len(result.peersJSON))) + } + + // Update lastUpdate timestamp + c.mu.Lock() + c.lastUpdate = time.Now() + c.mu.Unlock() + } else { + c.mu.RLock() + totalPeers := len(c.knownPeers) + c.mu.RUnlock() + c.logger.Debug("No changes to cluster membership", + zap.Int("total_peers", totalPeers)) + } +} + +// computeMembershipChangesLocked computes membership changes and returns snapshot data +// Must be called with lock held +func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*discovery.RQLiteNodeMetadata) membershipUpdateResult { // Track changes added := []string{} updated := []string{} - // Update known peers + // Update known peers, but skip self for health tracking for _, meta := range metadata { + // Skip self-metadata for health tracking (we only track remote peers) + isSelf := meta.NodeID == c.raftAddress + if existing, ok := c.knownPeers[meta.NodeID]; ok { // Update existing peer if existing.RaftLogIndex != meta.RaftLogIndex || @@ -228,55 +277,45 @@ func (c *ClusterDiscoveryService) updateClusterMembership() { c.knownPeers[meta.NodeID] = meta - // Update health tracking - if _, ok := c.peerHealth[meta.NodeID]; !ok { - c.peerHealth[meta.NodeID] = &PeerHealth{ - LastSeen: time.Now(), - LastSuccessful: time.Now(), - Status: "active", + // Update health tracking only for remote peers + if !isSelf { + if _, ok := c.peerHealth[meta.NodeID]; !ok { + c.peerHealth[meta.NodeID] = &PeerHealth{ + LastSeen: time.Now(), + LastSuccessful: time.Now(), + Status: "active", + } + } else { + c.peerHealth[meta.NodeID].LastSeen = time.Now() + 
c.peerHealth[meta.NodeID].Status = "active" + c.peerHealth[meta.NodeID].FailureCount = 0 } - } else { - c.peerHealth[meta.NodeID].LastSeen = time.Now() - c.peerHealth[meta.NodeID].Status = "active" - c.peerHealth[meta.NodeID].FailureCount = 0 } } - // Generate and write peers.json if there are changes OR if this is the first time + // Determine if we should write peers.json shouldWrite := len(added) > 0 || len(updated) > 0 || c.lastUpdate.IsZero() if shouldWrite { - c.logger.Info("Updating peers.json", - zap.Int("added", len(added)), - zap.Int("updated", len(updated)), - zap.Int("total_peers", len(c.knownPeers)), - zap.Bool("first_run", c.lastUpdate.IsZero())) - - // Get peers JSON while holding the lock - peers := c.getPeersJSONUnlocked() - - // Release lock before file I/O - c.mu.Unlock() - - // Write without holding lock - if err := c.writePeersJSONWithData(peers); err != nil { - c.logger.Error("CRITICAL: Failed to write peers.json", - zap.Error(err), - zap.String("data_dir", c.dataDir), - zap.Int("peer_count", len(peers))) - } else { - c.logger.Info("Successfully wrote peers.json", - zap.Int("peer_count", len(peers))) + // Log initial sync if this is the first time + if c.lastUpdate.IsZero() { + c.logger.Info("Initial cluster membership sync", + zap.Int("total_peers", len(c.knownPeers))) } - // Re-acquire lock to update lastUpdate - c.mu.Lock() - } else { - c.logger.Debug("No changes to cluster membership", - zap.Int("total_peers", len(c.knownPeers))) + // Get peers JSON snapshot + peers := c.getPeersJSONUnlocked() + return membershipUpdateResult{ + peersJSON: peers, + added: added, + updated: updated, + changed: true, + } } - c.lastUpdate = time.Now() + return membershipUpdateResult{ + changed: false, + } } // removeInactivePeers removes peers that haven't been seen for longer than the inactivity limit @@ -349,12 +388,6 @@ func (c *ClusterDiscoveryService) writePeersJSON() error { // writePeersJSONWithData writes the peers.json file with provided data (no lock 
needed) func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]interface{}) error { - c.logger.Info("writePeersJSON: Starting", - zap.String("data_dir", c.dataDir)) - - c.logger.Info("writePeersJSON: Got peers JSON", - zap.Int("peer_count", len(peers))) - // Expand ~ in data directory path dataDir := os.ExpandEnv(c.dataDir) if strings.HasPrefix(dataDir, "~") { @@ -365,14 +398,14 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte dataDir = filepath.Join(home, dataDir[1:]) } - c.logger.Info("writePeersJSON: Expanded data dir", - zap.String("expanded_path", dataDir)) - // Get the RQLite raft directory rqliteDir := filepath.Join(dataDir, "rqlite", "raft") - c.logger.Info("writePeersJSON: Creating raft directory", - zap.String("raft_dir", rqliteDir)) + c.logger.Debug("Writing peers.json", + zap.String("data_dir", c.dataDir), + zap.String("expanded_path", dataDir), + zap.String("raft_dir", rqliteDir), + zap.Int("peer_count", len(peers))) if err := os.MkdirAll(rqliteDir, 0755); err != nil { return fmt.Errorf("failed to create raft directory %s: %w", rqliteDir, err) @@ -381,13 +414,9 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte peersFile := filepath.Join(rqliteDir, "peers.json") backupFile := filepath.Join(rqliteDir, "peers.json.backup") - c.logger.Info("writePeersJSON: File paths", - zap.String("peers_file", peersFile), - zap.String("backup_file", backupFile)) - // Backup existing peers.json if it exists if _, err := os.Stat(peersFile); err == nil { - c.logger.Info("writePeersJSON: Backing up existing peers.json") + c.logger.Debug("Backing up existing peers.json", zap.String("backup_file", backupFile)) data, err := os.ReadFile(peersFile) if err == nil { _ = os.WriteFile(backupFile, data, 0644) @@ -395,27 +424,19 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte } // Marshal to JSON - c.logger.Info("writePeersJSON: Marshaling to JSON") data, err := 
json.MarshalIndent(peers, "", " ") if err != nil { return fmt.Errorf("failed to marshal peers.json: %w", err) } - c.logger.Info("writePeersJSON: JSON marshaled", - zap.Int("data_size", len(data))) + c.logger.Debug("Marshaled peers.json", zap.Int("data_size", len(data))) // Write atomically using temp file + rename tempFile := peersFile + ".tmp" - - c.logger.Info("writePeersJSON: Writing temp file", - zap.String("temp_file", tempFile)) - if err := os.WriteFile(tempFile, data, 0644); err != nil { return fmt.Errorf("failed to write temp peers.json %s: %w", tempFile, err) } - c.logger.Info("writePeersJSON: Renaming temp file to final") - if err := os.Rename(tempFile, peersFile); err != nil { return fmt.Errorf("failed to rename %s to %s: %w", tempFile, peersFile, err) } @@ -427,7 +448,7 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte } } - c.logger.Info("peers.json successfully written!", + c.logger.Info("peers.json written", zap.String("file", peersFile), zap.Int("node_count", len(peers)), zap.Strings("node_ids", nodeIDs)) @@ -573,9 +594,6 @@ func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error // UpdateOwnMetadata updates our own RQLite metadata in the peerstore func (c *ClusterDiscoveryService) UpdateOwnMetadata() { - c.logger.Info("Updating own RQLite metadata for peer exchange", - zap.String("node_id", c.nodeID)) - metadata := &discovery.RQLiteNodeMetadata{ NodeID: c.raftAddress, // RQLite uses raft address as node ID RaftAddress: c.raftAddress, @@ -586,12 +604,6 @@ func (c *ClusterDiscoveryService) UpdateOwnMetadata() { ClusterVersion: "1.0", } - c.logger.Info("Created metadata struct", - zap.String("node_id", metadata.NodeID), - zap.String("raft_address", metadata.RaftAddress), - zap.String("http_address", metadata.HTTPAddress), - zap.Uint64("log_index", metadata.RaftLogIndex)) - // Store in our own peerstore for peer exchange data, err := json.Marshal(metadata) if err != nil { @@ -604,8 +616,8 @@ 
func (c *ClusterDiscoveryService) UpdateOwnMetadata() { return } - c.logger.Info("Successfully stored own RQLite metadata in peerstore", - zap.String("node_id", c.nodeID), + c.logger.Debug("Updated own RQLite metadata", + zap.String("node_id", metadata.NodeID), zap.Uint64("log_index", metadata.RaftLogIndex)) } diff --git a/pkg/rqlite/data_safety.go b/pkg/rqlite/data_safety.go index de2ad54..7abb3ed 100644 --- a/pkg/rqlite/data_safety.go +++ b/pkg/rqlite/data_safety.go @@ -17,7 +17,7 @@ func (r *RQLiteManager) getRaftLogIndex() uint64 { r.logger.Debug("Failed to get Raft log index", zap.Error(err)) return 0 } - + // Return the highest index we have maxIndex := status.Store.Raft.LastLogIndex if status.Store.Raft.AppliedIndex > maxIndex { @@ -26,7 +26,7 @@ func (r *RQLiteManager) getRaftLogIndex() uint64 { if status.Store.Raft.CommitIndex > maxIndex { maxIndex = status.Store.Raft.CommitIndex } - + return maxIndex } @@ -34,23 +34,23 @@ func (r *RQLiteManager) getRaftLogIndex() uint64 { func (r *RQLiteManager) getRQLiteStatus() (*RQLiteStatus, error) { url := fmt.Sprintf("http://localhost:%d/status", r.config.RQLitePort) client := &http.Client{Timeout: 5 * time.Second} - + resp, err := client.Get(url) if err != nil { return nil, fmt.Errorf("failed to query status: %w", err) } defer resp.Body.Close() - + if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("status endpoint returned %d: %s", resp.StatusCode, string(body)) } - + var status RQLiteStatus if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { return nil, fmt.Errorf("failed to decode status: %w", err) } - + return &status, nil } @@ -58,23 +58,37 @@ func (r *RQLiteManager) getRQLiteStatus() (*RQLiteStatus, error) { func (r *RQLiteManager) getRQLiteNodes() (RQLiteNodes, error) { url := fmt.Sprintf("http://localhost:%d/nodes?ver=2", r.config.RQLitePort) client := &http.Client{Timeout: 5 * time.Second} - + resp, err := client.Get(url) if err != nil { return nil, 
fmt.Errorf("failed to query nodes: %w", err) } defer resp.Body.Close() - + if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("nodes endpoint returned %d: %s", resp.StatusCode, string(body)) } - + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read nodes response: %w", err) + } + + // rqlite v8 wraps nodes in a top-level object; fall back to a raw array for older versions. + var wrapped struct { + Nodes RQLiteNodes `json:"nodes"` + } + if err := json.Unmarshal(body, &wrapped); err == nil && wrapped.Nodes != nil { + return wrapped.Nodes, nil + } + + // Try legacy format (plain array) var nodes RQLiteNodes - if err := json.NewDecoder(resp.Body).Decode(&nodes); err != nil { + if err := json.Unmarshal(body, &nodes); err != nil { return nil, fmt.Errorf("failed to decode nodes: %w", err) } - + return nodes, nil } @@ -84,12 +98,12 @@ func (r *RQLiteManager) getRQLiteLeader() (string, error) { if err != nil { return "", err } - + leaderAddr := status.Store.Raft.LeaderAddr if leaderAddr == "" { return "", fmt.Errorf("no leader found") } - + return leaderAddr, nil } @@ -97,13 +111,12 @@ func (r *RQLiteManager) getRQLiteLeader() (string, error) { func (r *RQLiteManager) isNodeReachable(httpAddress string) bool { url := fmt.Sprintf("http://%s/status", httpAddress) client := &http.Client{Timeout: 3 * time.Second} - + resp, err := client.Get(url) if err != nil { return false } defer resp.Body.Close() - + return resp.StatusCode == http.StatusOK } - diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index d491fa0..dc84881 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -64,7 +64,7 @@ func NewRQLiteManager(cfg *config.DatabaseConfig, discoveryCfg *config.Discovery config: cfg, discoverConfig: discoveryCfg, dataDir: dataDir, - logger: logger, + logger: logger.With(zap.String("component", "rqlite-manager")), } } @@ -75,20 +75,9 @@ func (r *RQLiteManager) 
SetDiscoveryService(service *ClusterDiscoveryService) { // Start starts the RQLite node func (r *RQLiteManager) Start(ctx context.Context) error { - // Expand ~ in data directory path - dataDir := os.ExpandEnv(r.dataDir) - if strings.HasPrefix(dataDir, "~") { - home, err := os.UserHomeDir() - if err != nil { - return fmt.Errorf("failed to determine home directory: %w", err) - } - dataDir = filepath.Join(home, dataDir[1:]) - } - - // Create data directory - rqliteDataDir := filepath.Join(dataDir, "rqlite") - if err := os.MkdirAll(rqliteDataDir, 0755); err != nil { - return fmt.Errorf("failed to create RQLite data directory: %w", err) + rqliteDataDir, err := r.prepareDataDir() + if err != nil { + return err } if r.discoverConfig.HttpAdvAddress == "" { @@ -106,6 +95,55 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } } + // Launch RQLite process + if err := r.launchProcess(ctx, rqliteDataDir); err != nil { + return err + } + + // Wait for RQLite to be ready and establish connection + if err := r.waitForReadyAndConnect(ctx); err != nil { + return err + } + + // Establish leadership/SQL availability + if err := r.establishLeadershipOrJoin(ctx, rqliteDataDir); err != nil { + return err + } + + // Apply migrations + migrationsDir := "migrations" + if err := r.ApplyMigrations(ctx, migrationsDir); err != nil { + r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir)) + return fmt.Errorf("apply migrations: %w", err) + } + + r.logger.Info("RQLite node started successfully") + return nil +} + +// prepareDataDir expands and creates the RQLite data directory +func (r *RQLiteManager) prepareDataDir() (string, error) { + // Expand ~ in data directory path + dataDir := os.ExpandEnv(r.dataDir) + if strings.HasPrefix(dataDir, "~") { + home, err := os.UserHomeDir() + if err != nil { + return "", fmt.Errorf("failed to determine home directory: %w", err) + } + dataDir = filepath.Join(home, dataDir[1:]) + } + + // Create data directory + 
rqliteDataDir := filepath.Join(dataDir, "rqlite") + if err := os.MkdirAll(rqliteDataDir, 0755); err != nil { + return "", fmt.Errorf("failed to create RQLite data directory: %w", err) + } + + return rqliteDataDir, nil +} + +// launchProcess starts the RQLite process with appropriate arguments +func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) error { // Build RQLite command args := []string{ "-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort), @@ -147,9 +185,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error { zap.String("data_dir", rqliteDataDir), zap.Int("http_port", r.config.RQLitePort), zap.Int("raft_port", r.config.RQLiteRaftPort), - zap.String("join_address", r.config.RQLiteJoinAddress), - zap.Strings("full_args", args), - ) + zap.String("join_address", r.config.RQLiteJoinAddress)) // Start RQLite process (not bound to ctx for graceful Stop handling) r.cmd = exec.Command("rqlited", args...) @@ -162,6 +198,11 @@ func (r *RQLiteManager) Start(ctx context.Context) error { return fmt.Errorf("failed to start RQLite: %w", err) } + return nil +} + +// waitForReadyAndConnect waits for RQLite to be ready and establishes connection +func (r *RQLiteManager) waitForReadyAndConnect(ctx context.Context) error { // Wait for RQLite to be ready if err := r.waitForReady(ctx); err != nil { if r.cmd != nil && r.cmd.Process != nil { @@ -182,11 +223,15 @@ func (r *RQLiteManager) Start(ctx context.Context) error { // Sanity check: verify rqlite's node ID matches our configured raft address if err := r.validateNodeID(); err != nil { - r.logger.Warn("Node ID validation failed", zap.Error(err)) - // Don't fail startup, but log the mismatch for debugging + r.logger.Debug("Node ID validation skipped", zap.Error(err)) + // Don't fail startup, but log at debug level } - // Leadership/SQL readiness gating with dynamic discovery support + return nil +} + +// establishLeadershipOrJoin establishes leadership (bootstrap) or waits for SQL 
availability (joining) +func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDataDir string) error { if r.config.RQLiteJoinAddress == "" { // Bootstrap node logic with data safety checks r.logger.Info("Bootstrap node: checking if safe to lead") @@ -214,46 +259,47 @@ func (r *RQLiteManager) Start(ctx context.Context) error { leadershipErr := r.waitForLeadership(ctx) if leadershipErr == nil { r.logger.Info("Bootstrap node successfully established leadership") - } else { - r.logger.Warn("Initial leadership attempt failed, may need cluster recovery", - zap.Error(leadershipErr)) + return nil + } - // Try recovery if we have peers.json from discovery - if r.discoveryService != nil { - peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json") - if _, err := os.Stat(peersPath); err == nil { - r.logger.Info("Attempting cluster recovery using peers.json", - zap.String("peers_file", peersPath)) + r.logger.Warn("Initial leadership attempt failed, may need cluster recovery", + zap.Error(leadershipErr)) - if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil { - r.logger.Info("Cluster recovery successful, retrying leadership") - leadershipErr = r.waitForLeadership(ctx) - if leadershipErr == nil { - r.logger.Info("Bootstrap node established leadership after recovery") - } - } else { - r.logger.Warn("Cluster recovery failed", zap.Error(recoveryErr)) + // Try recovery if we have peers.json from discovery + if r.discoveryService != nil { + peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json") + if _, err := os.Stat(peersPath); err == nil { + r.logger.Info("Attempting cluster recovery using peers.json", + zap.String("peers_file", peersPath)) + + if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil { + r.logger.Info("Cluster recovery successful, retrying leadership") + leadershipErr = r.waitForLeadership(ctx) + if leadershipErr == nil { + r.logger.Info("Bootstrap node established leadership after recovery") + return nil 
} - } - } - - // Final fallback: SQL availability - if leadershipErr != nil { - r.logger.Warn("Leadership failed, trying SQL availability") - sqlCtx := ctx - if _, hasDeadline := ctx.Deadline(); !hasDeadline { - var cancel context.CancelFunc - sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - defer cancel() - } - if err := r.waitForSQLAvailable(sqlCtx); err != nil { - if r.cmd != nil && r.cmd.Process != nil { - _ = r.cmd.Process.Kill() - } - return fmt.Errorf("RQLite SQL not available: %w", err) + } else { + r.logger.Warn("Cluster recovery failed", zap.Error(recoveryErr)) } } } + + // Final fallback: SQL availability + r.logger.Warn("Leadership failed, trying SQL availability") + sqlCtx := ctx + if _, hasDeadline := ctx.Deadline(); !hasDeadline { + var cancel context.CancelFunc + sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + } + if err := r.waitForSQLAvailable(sqlCtx); err != nil { + if r.cmd != nil && r.cmd.Process != nil { + _ = r.cmd.Process.Kill() + } + return fmt.Errorf("RQLite SQL not available: %w", err) + } + return nil } else { // Joining node logic r.logger.Info("Waiting for RQLite SQL availability (leader discovery)") @@ -269,18 +315,8 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } return fmt.Errorf("RQLite SQL not available: %w", err) } + return nil } - - // After waitForLeadership / waitForSQLAvailable succeeds, before returning: - migrationsDir := "migrations" - - if err := r.ApplyMigrations(ctx, migrationsDir); err != nil { - r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir)) - return fmt.Errorf("apply migrations: %w", err) - } - - r.logger.Info("RQLite node started successfully") - return nil } // hasExistingState returns true if the rqlite data directory already contains files or subdirectories. 
@@ -331,7 +367,12 @@ func (r *RQLiteManager) waitForReady(ctx context.Context) error { func (r *RQLiteManager) waitForLeadership(ctx context.Context) error { r.logger.Info("Waiting for RQLite to establish leadership...") - for i := 0; i < 30; i++ { + maxAttempts := 30 + attempt := 0 + backoffDelay := 500 * time.Millisecond + maxBackoff := 5 * time.Second + + for attempt < maxAttempts { select { case <-ctx.Done(): return ctx.Err() @@ -345,10 +386,19 @@ func (r *RQLiteManager) waitForLeadership(ctx context.Context) error { r.logger.Info("RQLite leadership established") return nil } - r.logger.Debug("Waiting for leadership", zap.Error(err)) + // Log every 5th attempt or on first attempt to reduce noise + if attempt%5 == 0 || attempt == 0 { + r.logger.Debug("Waiting for leadership", zap.Int("attempt", attempt+1), zap.Error(err)) + } } - time.Sleep(1 * time.Second) + // Exponential backoff (delay grows 1.5x per attempt, capped at maxBackoff); no jitter is applied + time.Sleep(backoffDelay) + backoffDelay = time.Duration(float64(backoffDelay) * 1.5) + if backoffDelay > maxBackoff { + backoffDelay = maxBackoff + } + attempt++ } return fmt.Errorf("RQLite failed to establish leadership within timeout") @@ -728,7 +778,9 @@ func (r *RQLiteManager) validateNodeID() error { time.Sleep(500 * time.Millisecond) continue } - return fmt.Errorf("failed to query nodes endpoint after retries: %w", err) + // Log at debug level if validation fails - not critical + r.logger.Debug("Node ID validation skipped (endpoint unavailable)", zap.Error(err)) + return nil } expectedID := r.discoverConfig.RaftAdvAddress @@ -736,6 +788,12 @@ func (r *RQLiteManager) validateNodeID() error { return fmt.Errorf("raft_adv_address not configured") } + // If cluster is still forming, nodes list might be empty - that's okay + if len(nodes) == 0 { + r.logger.Debug("Node ID validation skipped (cluster not yet formed)") + return nil + } + // Find our node in the cluster (match by address) for _, node := range nodes { if node.Address == expectedID { @@ -747,21 +805,16 @@ func 
(r *RQLiteManager) validateNodeID() error { zap.String("explanation", "peers.json id field must match rqlite's node ID (raft address)")) return fmt.Errorf("node ID mismatch: configured %s but rqlite reports %s", expectedID, node.ID) } - r.logger.Info("Node ID validation passed", + r.logger.Debug("Node ID validation passed", zap.String("node_id", node.ID), zap.String("address", node.Address)) return nil } } - // If cluster is still forming, nodes list might be empty - that's okay - if len(nodes) == 0 { - r.logger.Debug("Cluster membership not yet available, skipping validation") - return nil - } - - // If we can't find ourselves but other nodes exist, log a warning - r.logger.Warn("Could not find our node in cluster membership", + // If we can't find ourselves but other nodes exist, cluster might still be forming + // This is fine - don't log a warning + r.logger.Debug("Node ID validation skipped (node not yet in cluster membership)", zap.String("expected_address", expectedID), zap.Int("nodes_in_cluster", len(nodes))) return nil