feat: enhance RQLite manager with improved logging and data directory management

- Added structured logging for RQLite components, including cluster discovery and leadership processes.
- Introduced methods for preparing the data directory and launching the RQLite process, improving code organization.
- Implemented exponential backoff for leadership checks to reduce log noise and improve reliability (pattern sketched below).
- Enhanced peer health tracking and membership update logic to streamline cluster synchronization and recovery.
anonpenguin23 2025-10-31 11:53:50 +02:00
parent 8f82dc7ca3
commit 2aead48045
6 changed files with 293 additions and 187 deletions
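For reference, the leadership wait in this commit follows the standard multiply-and-cap backoff shape. Below is a minimal, self-contained sketch of that pattern using the same constants as the diff (500 ms start, 1.5x growth, 5 s cap, 30 attempts); `tryOnce` is a hypothetical stand-in for a single leadership probe, not a function from this repository:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// tryOnce is a hypothetical stand-in for one leadership check.
func tryOnce(attempt int) error {
	if attempt < 4 {
		return errors.New("no leader yet")
	}
	return nil
}

func main() {
	const maxAttempts = 30
	backoff := 500 * time.Millisecond
	maxBackoff := 5 * time.Second

	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err := tryOnce(attempt); err == nil {
			fmt.Println("leadership established on attempt", attempt+1)
			return
		} else if attempt%5 == 0 {
			// Log sparsely, as the real loop does, to keep noise down.
			fmt.Printf("attempt %d failed, next wait %v\n", attempt+1, backoff)
		}
		time.Sleep(backoff)
		// Grow the delay by 1.5x, capped so waits stay bounded.
		backoff = time.Duration(float64(backoff) * 1.5)
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	fmt.Println("gave up waiting for leadership")
}
```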

View File

@@ -14,6 +14,23 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
 ### Fixed

+## [0.53.0] - 2025-10-31
+### Added
+- Discovery manager now tracks failed peer-exchange attempts to suppress repeated warnings while peers negotiate supported protocols.
+### Changed
+- Scoped logging throughout `cluster_discovery`, `rqlite`, and `discovery` packages so logs carry component tags and keep verbose output at debug level.
+- Refactored `ClusterDiscoveryService` membership handling: metadata updates happen under lock, `peers.json` is written outside the lock, self-health is skipped, and change detection is centralized in `computeMembershipChangesLocked`.
+- Reworked `RQLiteManager.Start` into helper functions (`prepareDataDir`, `launchProcess`, `waitForReadyAndConnect`, `establishLeadershipOrJoin`) with clearer logging, better error handling, and exponential backoff while waiting for leadership.
+- `validateNodeID` now treats empty membership results as transitional states, logging at debug level instead of warning to avoid noisy startups.
+### Fixed
+- Eliminated spurious `peers.json` churn and node-ID mismatch warnings during cluster formation by aligning node IDs with raft addresses and tightening discovery logging.
+
 ## [0.52.15]
 ### Added
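The membership refactor described above follows a common concurrency pattern: mutate shared state and take a snapshot while holding the mutex, then do the slow file I/O on the snapshot after releasing it, so readers are never blocked on disk. A minimal sketch of the shape, with hypothetical types standing in for the real service:

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"sync"
)

// snapshot is a hypothetical result computed while the lock is held.
type snapshot struct {
	peers   []map[string]interface{}
	changed bool
}

type svc struct {
	mu    sync.Mutex
	peers map[string]map[string]interface{}
}

// computeLocked touches only in-memory state; callers must hold mu.
func (s *svc) computeLocked() snapshot {
	out := make([]map[string]interface{}, 0, len(s.peers))
	for _, p := range s.peers {
		out = append(out, p)
	}
	return snapshot{peers: out, changed: len(out) > 0}
}

func (s *svc) sync(path string) error {
	// Phase 1: compute and snapshot under lock.
	s.mu.Lock()
	snap := s.computeLocked()
	s.mu.Unlock()

	if !snap.changed {
		return nil
	}

	// Phase 2: marshal and write the snapshot outside the lock.
	data, err := json.MarshalIndent(snap.peers, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(path, data, 0644)
}

func main() {
	s := &svc{peers: map[string]map[string]interface{}{
		"10.0.0.1:7001": {"id": "10.0.0.1:7001", "non_voter": false},
	}}
	if err := s.sync("peers.json"); err != nil {
		fmt.Println("sync failed:", err)
	}
}
```

The trade-off is that the file can briefly lag the in-memory state, which is why the real code re-acquires the lock afterwards just to stamp `lastUpdate`.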

View File

@@ -21,7 +21,7 @@ test-e2e:
 .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
-VERSION := 0.52.21
+VERSION := 0.53.0
 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
 DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
 LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

View File

@@ -42,6 +42,7 @@ type Manager struct {
 	host   host.Host
 	logger *zap.Logger
 	cancel context.CancelFunc
+	failedPeerExchanges map[peer.ID]time.Time // Track failed peer exchange attempts to suppress repeated warnings
 }

 // Config contains discovery configuration
@@ -57,7 +58,9 @@ type Config struct {
 func NewManager(h host.Host, _ interface{}, logger *zap.Logger) *Manager {
 	return &Manager{
 		host:   h,
-		logger: logger,
+		logger:              logger.With(zap.String("component", "peer-discovery")),
+		cancel:              nil,
+		failedPeerExchanges: make(map[peer.ID]time.Time),
 	}
 }
@@ -344,13 +347,21 @@ func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi
 	// Open a stream to the peer
 	stream, err := d.host.NewStream(ctx, peerID, PeerExchangeProtocol)
 	if err != nil {
+		// Suppress repeated warnings for the same peer (log once per minute max)
+		lastFailure, seen := d.failedPeerExchanges[peerID]
+		if !seen || time.Since(lastFailure) > time.Minute {
 			d.logger.Debug("Failed to open peer exchange stream",
 				zap.String("peer_id", peerID.String()[:8]+"..."),
 				zap.Error(err))
+			d.failedPeerExchanges[peerID] = time.Now()
+		}
 		return nil
 	}
 	defer stream.Close()

+	// Clear failure tracking on success
+	delete(d.failedPeerExchanges, peerID)
+
 	// Send request
 	req := PeerExchangeRequest{Limit: limit}
 	encoder := json.NewEncoder(stream)
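The suppression logic above amounts to a per-key log throttle. A standalone sketch of the same idea, with a hypothetical `throttle` helper (the real Manager keys by `peer.ID` and must provide its own synchronization around the map):

```go
package main

import (
	"fmt"
	"time"
)

// throttle remembers the last failure per key so a message is emitted
// at most once per interval per key. Not goroutine-safe on its own.
type throttle struct {
	last     map[string]time.Time
	interval time.Duration
}

func newThrottle(interval time.Duration) *throttle {
	return &throttle{last: make(map[string]time.Time), interval: interval}
}

func (t *throttle) shouldLog(key string) bool {
	if last, seen := t.last[key]; seen && time.Since(last) <= t.interval {
		return false
	}
	t.last[key] = time.Now()
	return true
}

// clear resets tracking once the operation succeeds again.
func (t *throttle) clear(key string) { delete(t.last, key) }

func main() {
	th := newThrottle(time.Minute)
	for i := 0; i < 3; i++ {
		if th.shouldLog("peer-abc123") {
			fmt.Println("failed to open peer exchange stream (logged)")
		} else {
			fmt.Println("repeat failure suppressed")
		}
	}
	th.clear("peer-abc123") // success: log again on the next failure
}
```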

View File

@@ -64,7 +64,7 @@ func NewClusterDiscoveryService(
 		peerHealth:      make(map[string]*PeerHealth),
 		updateInterval:  30 * time.Second,
 		inactivityLimit: 24 * time.Hour,
-		logger:          logger,
+		logger:          logger.With(zap.String("component", "cluster-discovery")),
 	}
 }
@@ -82,9 +82,8 @@ func (c *ClusterDiscoveryService) Start(ctx context.Context) error {
 	c.cancel = cancel

 	c.logger.Info("Starting cluster discovery service",
-		zap.String("node_id", c.nodeID),
-		zap.String("node_type", c.nodeType),
 		zap.String("raft_address", c.raftAddress),
+		zap.String("node_type", c.nodeType),
 		zap.String("http_address", c.httpAddress),
 		zap.String("data_dir", c.dataDir),
 		zap.Duration("update_interval", c.updateInterval),
@@ -120,7 +119,7 @@ func (c *ClusterDiscoveryService) Stop() {

 // periodicSync runs periodic cluster membership synchronization
 func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) {
-	c.logger.Info("periodicSync goroutine started, waiting for RQLite readiness")
+	c.logger.Debug("periodicSync goroutine started, waiting for RQLite readiness")

 	ticker := time.NewTicker(c.updateInterval)
 	defer ticker.Stop()
@@ -129,10 +128,9 @@ func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
-			c.logger.Info("periodicSync goroutine stopping")
+			c.logger.Debug("periodicSync goroutine stopping")
 			return
 		case <-ticker.C:
-			c.logger.Debug("Running periodic cluster membership sync")
 			c.updateClusterMembership()
 		}
 	}
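`periodicSync` is the standard context-cancellable ticker loop. Distilled, assuming the work function is injected by the caller:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runPeriodic invokes work on every tick until ctx is cancelled.
func runPeriodic(ctx context.Context, interval time.Duration, work func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker's resources when the loop exits

	for {
		select {
		case <-ctx.Done():
			return // clean shutdown on cancellation
		case <-ticker.C:
			work()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	runPeriodic(ctx, 100*time.Millisecond, func() { fmt.Println("membership sync") })
}
```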
@@ -193,6 +191,14 @@ func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeM
 	return metadata
 }

+// membershipUpdateResult contains the result of a membership update operation
+type membershipUpdateResult struct {
+	peersJSON []map[string]interface{}
+	added     []string
+	updated   []string
+	changed   bool
+}
+
 // updateClusterMembership updates the cluster membership based on discovered peers
 func (c *ClusterDiscoveryService) updateClusterMembership() {
 	metadata := c.collectPeerMetadata()
@@ -200,15 +206,58 @@ func (c *ClusterDiscoveryService) updateClusterMembership() {
 	c.logger.Debug("Collected peer metadata",
 		zap.Int("metadata_count", len(metadata)))

+	// Compute membership changes while holding lock
 	c.mu.Lock()
-	defer c.mu.Unlock()
+	result := c.computeMembershipChangesLocked(metadata)
+	c.mu.Unlock()
+
+	// Perform file I/O outside the lock
+	if result.changed {
+		// Log state changes (peer added/removed) at Info level
+		if len(result.added) > 0 || len(result.updated) > 0 {
+			c.logger.Info("Cluster membership changed",
+				zap.Int("added", len(result.added)),
+				zap.Int("updated", len(result.updated)),
+				zap.Strings("added_ids", result.added),
+				zap.Strings("updated_ids", result.updated))
+		}
+
+		// Write peers.json without holding lock
+		if err := c.writePeersJSONWithData(result.peersJSON); err != nil {
+			c.logger.Error("CRITICAL: Failed to write peers.json",
+				zap.Error(err),
+				zap.String("data_dir", c.dataDir),
+				zap.Int("peer_count", len(result.peersJSON)))
+		} else {
+			c.logger.Debug("peers.json updated",
+				zap.Int("peer_count", len(result.peersJSON)))
+		}
+
+		// Update lastUpdate timestamp
+		c.mu.Lock()
+		c.lastUpdate = time.Now()
+		c.mu.Unlock()
+	} else {
+		c.mu.RLock()
+		totalPeers := len(c.knownPeers)
+		c.mu.RUnlock()
+		c.logger.Debug("No changes to cluster membership",
+			zap.Int("total_peers", totalPeers))
+	}
+}
+
+// computeMembershipChangesLocked computes membership changes and returns snapshot data
+// Must be called with lock held
+func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*discovery.RQLiteNodeMetadata) membershipUpdateResult {
 	// Track changes
 	added := []string{}
 	updated := []string{}

-	// Update known peers
+	// Update known peers, but skip self for health tracking
 	for _, meta := range metadata {
+		// Skip self-metadata for health tracking (we only track remote peers)
+		isSelf := meta.NodeID == c.raftAddress
+
 		if existing, ok := c.knownPeers[meta.NodeID]; ok {
 			// Update existing peer
 			if existing.RaftLogIndex != meta.RaftLogIndex ||
@@ -228,7 +277,8 @@
 			c.knownPeers[meta.NodeID] = meta

-		// Update health tracking
+		// Update health tracking only for remote peers
+		if !isSelf {
 			if _, ok := c.peerHealth[meta.NodeID]; !ok {
 				c.peerHealth[meta.NodeID] = &PeerHealth{
 					LastSeen: time.Now(),
@@ -241,42 +291,31 @@
 				c.peerHealth[meta.NodeID].FailureCount = 0
 			}
 		}
+	}

-	// Generate and write peers.json if there are changes OR if this is the first time
+	// Determine if we should write peers.json
 	shouldWrite := len(added) > 0 || len(updated) > 0 || c.lastUpdate.IsZero()

 	if shouldWrite {
-		c.logger.Info("Updating peers.json",
-			zap.Int("added", len(added)),
-			zap.Int("updated", len(updated)),
-			zap.Int("total_peers", len(c.knownPeers)),
-			zap.Bool("first_run", c.lastUpdate.IsZero()))
-
-		// Get peers JSON while holding the lock
-		peers := c.getPeersJSONUnlocked()
-
-		// Release lock before file I/O
-		c.mu.Unlock()
-
-		// Write without holding lock
-		if err := c.writePeersJSONWithData(peers); err != nil {
-			c.logger.Error("CRITICAL: Failed to write peers.json",
-				zap.Error(err),
-				zap.String("data_dir", c.dataDir),
-				zap.Int("peer_count", len(peers)))
-		} else {
-			c.logger.Info("Successfully wrote peers.json",
-				zap.Int("peer_count", len(peers)))
-		}
-
-		// Re-acquire lock to update lastUpdate
-		c.mu.Lock()
-	} else {
-		c.logger.Debug("No changes to cluster membership",
-			zap.Int("total_peers", len(c.knownPeers)))
+		// Log initial sync if this is the first time
+		if c.lastUpdate.IsZero() {
+			c.logger.Info("Initial cluster membership sync",
+				zap.Int("total_peers", len(c.knownPeers)))
+		}
+
+		// Get peers JSON snapshot
+		peers := c.getPeersJSONUnlocked()
+
+		return membershipUpdateResult{
+			peersJSON: peers,
+			added:     added,
+			updated:   updated,
+			changed:   true,
+		}
 	}

-	c.lastUpdate = time.Now()
+	return membershipUpdateResult{
+		changed: false,
+	}
 }

 // removeInactivePeers removes peers that haven't been seen for longer than the inactivity limit
@@ -349,12 +388,6 @@ func (c *ClusterDiscoveryService) writePeersJSON() error {

 // writePeersJSONWithData writes the peers.json file with provided data (no lock needed)
 func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]interface{}) error {
-	c.logger.Info("writePeersJSON: Starting",
-		zap.String("data_dir", c.dataDir))
-
-	c.logger.Info("writePeersJSON: Got peers JSON",
-		zap.Int("peer_count", len(peers)))
-
 	// Expand ~ in data directory path
 	dataDir := os.ExpandEnv(c.dataDir)
 	if strings.HasPrefix(dataDir, "~") {
@@ -365,14 +398,14 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte
 		dataDir = filepath.Join(home, dataDir[1:])
 	}

-	c.logger.Info("writePeersJSON: Expanded data dir",
-		zap.String("expanded_path", dataDir))
-
 	// Get the RQLite raft directory
 	rqliteDir := filepath.Join(dataDir, "rqlite", "raft")
-	c.logger.Info("writePeersJSON: Creating raft directory",
-		zap.String("raft_dir", rqliteDir))
+	c.logger.Debug("Writing peers.json",
+		zap.String("data_dir", c.dataDir),
+		zap.String("expanded_path", dataDir),
+		zap.String("raft_dir", rqliteDir),
+		zap.Int("peer_count", len(peers)))

 	if err := os.MkdirAll(rqliteDir, 0755); err != nil {
 		return fmt.Errorf("failed to create raft directory %s: %w", rqliteDir, err)
@@ -381,13 +414,9 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte
 	peersFile := filepath.Join(rqliteDir, "peers.json")
 	backupFile := filepath.Join(rqliteDir, "peers.json.backup")

-	c.logger.Info("writePeersJSON: File paths",
-		zap.String("peers_file", peersFile),
-		zap.String("backup_file", backupFile))
-
 	// Backup existing peers.json if it exists
 	if _, err := os.Stat(peersFile); err == nil {
-		c.logger.Info("writePeersJSON: Backing up existing peers.json")
+		c.logger.Debug("Backing up existing peers.json", zap.String("backup_file", backupFile))
 		data, err := os.ReadFile(peersFile)
 		if err == nil {
 			_ = os.WriteFile(backupFile, data, 0644)
@@ -395,27 +424,19 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte
 	}

 	// Marshal to JSON
-	c.logger.Info("writePeersJSON: Marshaling to JSON")
 	data, err := json.MarshalIndent(peers, "", " ")
 	if err != nil {
 		return fmt.Errorf("failed to marshal peers.json: %w", err)
 	}

-	c.logger.Info("writePeersJSON: JSON marshaled",
-		zap.Int("data_size", len(data)))
+	c.logger.Debug("Marshaled peers.json", zap.Int("data_size", len(data)))

 	// Write atomically using temp file + rename
 	tempFile := peersFile + ".tmp"
-	c.logger.Info("writePeersJSON: Writing temp file",
-		zap.String("temp_file", tempFile))
 	if err := os.WriteFile(tempFile, data, 0644); err != nil {
 		return fmt.Errorf("failed to write temp peers.json %s: %w", tempFile, err)
 	}

-	c.logger.Info("writePeersJSON: Renaming temp file to final")
 	if err := os.Rename(tempFile, peersFile); err != nil {
 		return fmt.Errorf("failed to rename %s to %s: %w", tempFile, peersFile, err)
 	}
@@ -427,7 +448,7 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte
 		}
 	}

-	c.logger.Info("peers.json successfully written!",
+	c.logger.Info("peers.json written",
 		zap.String("file", peersFile),
 		zap.Int("node_count", len(peers)),
 		zap.Strings("node_ids", nodeIDs))
@@ -573,9 +594,6 @@ func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error

 // UpdateOwnMetadata updates our own RQLite metadata in the peerstore
 func (c *ClusterDiscoveryService) UpdateOwnMetadata() {
-	c.logger.Info("Updating own RQLite metadata for peer exchange",
-		zap.String("node_id", c.nodeID))
-
 	metadata := &discovery.RQLiteNodeMetadata{
 		NodeID:      c.raftAddress, // RQLite uses raft address as node ID
 		RaftAddress: c.raftAddress,
@@ -586,12 +604,6 @@ func (c *ClusterDiscoveryService) UpdateOwnMetadata() {
 		ClusterVersion: "1.0",
 	}

-	c.logger.Info("Created metadata struct",
-		zap.String("node_id", metadata.NodeID),
-		zap.String("raft_address", metadata.RaftAddress),
-		zap.String("http_address", metadata.HTTPAddress),
-		zap.Uint64("log_index", metadata.RaftLogIndex))
-
 	// Store in our own peerstore for peer exchange
 	data, err := json.Marshal(metadata)
 	if err != nil {
@@ -604,8 +616,8 @@ func (c *ClusterDiscoveryService) UpdateOwnMetadata() {
 		return
 	}

-	c.logger.Info("Successfully stored own RQLite metadata in peerstore",
-		zap.String("node_id", c.nodeID),
+	c.logger.Debug("Updated own RQLite metadata",
+		zap.String("node_id", metadata.NodeID),
 		zap.Uint64("log_index", metadata.RaftLogIndex))
 }

View File

@@ -70,8 +70,22 @@ func (r *RQLiteManager) getRQLiteNodes() (RQLiteNodes, error) {
 		return nil, fmt.Errorf("nodes endpoint returned %d: %s", resp.StatusCode, string(body))
 	}

+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read nodes response: %w", err)
+	}
+
+	// rqlite v8 wraps nodes in a top-level object; fall back to a raw array for older versions.
+	var wrapped struct {
+		Nodes RQLiteNodes `json:"nodes"`
+	}
+	if err := json.Unmarshal(body, &wrapped); err == nil && wrapped.Nodes != nil {
+		return wrapped.Nodes, nil
+	}
+
+	// Try legacy format (plain array)
 	var nodes RQLiteNodes
-	if err := json.NewDecoder(resp.Body).Decode(&nodes); err != nil {
+	if err := json.Unmarshal(body, &nodes); err != nil {
 		return nil, fmt.Errorf("failed to decode nodes: %w", err)
 	}
@@ -106,4 +120,3 @@ func (r *RQLiteManager) isNodeReachable(httpAddress string) bool {
 	return resp.StatusCode == http.StatusOK
 }
-
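The decode above accepts both response shapes for `/nodes`. Isolated, the fallback looks like this; `node` here is a hypothetical subset of the real entry type, not rqlite's actual schema:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// node is a hypothetical subset of a /nodes entry.
type node struct {
	ID      string `json:"id"`
	Address string `json:"addr"`
}

// decodeNodes accepts {"nodes": [...]} (newer rqlite) or a bare array
// (older rqlite). Unmarshal into the wrapper fails cleanly on an array,
// so the fallback path is only taken for the legacy shape.
func decodeNodes(body []byte) ([]node, error) {
	var wrapped struct {
		Nodes []node `json:"nodes"`
	}
	if err := json.Unmarshal(body, &wrapped); err == nil && wrapped.Nodes != nil {
		return wrapped.Nodes, nil
	}
	var plain []node
	if err := json.Unmarshal(body, &plain); err != nil {
		return nil, fmt.Errorf("failed to decode nodes: %w", err)
	}
	return plain, nil
}

func main() {
	a, _ := decodeNodes([]byte(`{"nodes":[{"id":"n1","addr":"10.0.0.1:7001"}]}`))
	b, _ := decodeNodes([]byte(`[{"id":"n1","addr":"10.0.0.1:7001"}]`))
	fmt.Println(len(a), len(b)) // prints: 1 1
}
```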

View File

@@ -64,7 +64,7 @@ func NewRQLiteManager(cfg *config.DatabaseConfig, discoveryCfg *config.Discovery
 		config:         cfg,
 		discoverConfig: discoveryCfg,
 		dataDir:        dataDir,
-		logger:         logger,
+		logger:         logger.With(zap.String("component", "rqlite-manager")),
 	}
 }
@@ -75,20 +75,9 @@ func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService) {

 // Start starts the RQLite node
 func (r *RQLiteManager) Start(ctx context.Context) error {
-	// Expand ~ in data directory path
-	dataDir := os.ExpandEnv(r.dataDir)
-	if strings.HasPrefix(dataDir, "~") {
-		home, err := os.UserHomeDir()
-		if err != nil {
-			return fmt.Errorf("failed to determine home directory: %w", err)
-		}
-		dataDir = filepath.Join(home, dataDir[1:])
-	}
-
-	// Create data directory
-	rqliteDataDir := filepath.Join(dataDir, "rqlite")
-	if err := os.MkdirAll(rqliteDataDir, 0755); err != nil {
-		return fmt.Errorf("failed to create RQLite data directory: %w", err)
+	rqliteDataDir, err := r.prepareDataDir()
+	if err != nil {
+		return err
 	}

 	if r.discoverConfig.HttpAdvAddress == "" {
@@ -106,6 +95,55 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 		}
 	}

+	// Launch RQLite process
+	if err := r.launchProcess(ctx, rqliteDataDir); err != nil {
+		return err
+	}
+
+	// Wait for RQLite to be ready and establish connection
+	if err := r.waitForReadyAndConnect(ctx); err != nil {
+		return err
+	}
+
+	// Establish leadership/SQL availability
+	if err := r.establishLeadershipOrJoin(ctx, rqliteDataDir); err != nil {
+		return err
+	}
+
+	// Apply migrations
+	migrationsDir := "migrations"
+	if err := r.ApplyMigrations(ctx, migrationsDir); err != nil {
+		r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir))
+		return fmt.Errorf("apply migrations: %w", err)
+	}
+
+	r.logger.Info("RQLite node started successfully")
+	return nil
+}
+
+// prepareDataDir expands and creates the RQLite data directory
+func (r *RQLiteManager) prepareDataDir() (string, error) {
+	// Expand ~ in data directory path
+	dataDir := os.ExpandEnv(r.dataDir)
+	if strings.HasPrefix(dataDir, "~") {
+		home, err := os.UserHomeDir()
+		if err != nil {
+			return "", fmt.Errorf("failed to determine home directory: %w", err)
+		}
+		dataDir = filepath.Join(home, dataDir[1:])
+	}
+
+	// Create data directory
+	rqliteDataDir := filepath.Join(dataDir, "rqlite")
+	if err := os.MkdirAll(rqliteDataDir, 0755); err != nil {
+		return "", fmt.Errorf("failed to create RQLite data directory: %w", err)
+	}
+
+	return rqliteDataDir, nil
+}
+
+// launchProcess starts the RQLite process with appropriate arguments
+func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) error {
 	// Build RQLite command
 	args := []string{
 		"-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort),
@@ -147,9 +185,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 		zap.String("data_dir", rqliteDataDir),
 		zap.Int("http_port", r.config.RQLitePort),
 		zap.Int("raft_port", r.config.RQLiteRaftPort),
-		zap.String("join_address", r.config.RQLiteJoinAddress),
-		zap.Strings("full_args", args),
-	)
+		zap.String("join_address", r.config.RQLiteJoinAddress))

 	// Start RQLite process (not bound to ctx for graceful Stop handling)
 	r.cmd = exec.Command("rqlited", args...)
@@ -162,6 +198,11 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 		return fmt.Errorf("failed to start RQLite: %w", err)
 	}

+	return nil
+}
+
+// waitForReadyAndConnect waits for RQLite to be ready and establishes connection
+func (r *RQLiteManager) waitForReadyAndConnect(ctx context.Context) error {
 	// Wait for RQLite to be ready
 	if err := r.waitForReady(ctx); err != nil {
 		if r.cmd != nil && r.cmd.Process != nil {
@@ -182,11 +223,15 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 	// Sanity check: verify rqlite's node ID matches our configured raft address
 	if err := r.validateNodeID(); err != nil {
-		r.logger.Warn("Node ID validation failed", zap.Error(err))
-		// Don't fail startup, but log the mismatch for debugging
+		r.logger.Debug("Node ID validation skipped", zap.Error(err))
+		// Don't fail startup, but log at debug level
 	}

-	// Leadership/SQL readiness gating with dynamic discovery support
+	return nil
+}
+
+// establishLeadershipOrJoin establishes leadership (bootstrap) or waits for SQL availability (joining)
+func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDataDir string) error {
 	if r.config.RQLiteJoinAddress == "" {
 		// Bootstrap node logic with data safety checks
 		r.logger.Info("Bootstrap node: checking if safe to lead")
@@ -214,7 +259,9 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 		leadershipErr := r.waitForLeadership(ctx)
 		if leadershipErr == nil {
 			r.logger.Info("Bootstrap node successfully established leadership")
-		} else {
+			return nil
+		}
+
 		r.logger.Warn("Initial leadership attempt failed, may need cluster recovery",
 			zap.Error(leadershipErr))
@@ -230,6 +277,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 				leadershipErr = r.waitForLeadership(ctx)
 				if leadershipErr == nil {
 					r.logger.Info("Bootstrap node established leadership after recovery")
+					return nil
 				}
 			} else {
 				r.logger.Warn("Cluster recovery failed", zap.Error(recoveryErr))
@@ -238,7 +286,6 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 		}

 		// Final fallback: SQL availability
-		if leadershipErr != nil {
 		r.logger.Warn("Leadership failed, trying SQL availability")
 		sqlCtx := ctx
 		if _, hasDeadline := ctx.Deadline(); !hasDeadline {
@@ -252,8 +299,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 			}
 			return fmt.Errorf("RQLite SQL not available: %w", err)
 		}
-		}
-	}
+		return nil
 	} else {
 		// Joining node logic
 		r.logger.Info("Waiting for RQLite SQL availability (leader discovery)")
@@ -269,18 +315,8 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 			}
 			return fmt.Errorf("RQLite SQL not available: %w", err)
 		}
-	}
-
-	// After waitForLeadership / waitForSQLAvailable succeeds, before returning:
-	migrationsDir := "migrations"
-	if err := r.ApplyMigrations(ctx, migrationsDir); err != nil {
-		r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir))
-		return fmt.Errorf("apply migrations: %w", err)
-	}
-
-	r.logger.Info("RQLite node started successfully")
-	return nil
+		return nil
+	}
 }

 // hasExistingState returns true if the rqlite data directory already contains files or subdirectories.
@@ -331,7 +367,12 @@ func (r *RQLiteManager) waitForReady(ctx context.Context) error {

 func (r *RQLiteManager) waitForLeadership(ctx context.Context) error {
 	r.logger.Info("Waiting for RQLite to establish leadership...")

-	for i := 0; i < 30; i++ {
+	maxAttempts := 30
+	attempt := 0
+	backoffDelay := 500 * time.Millisecond
+	maxBackoff := 5 * time.Second
+
+	for attempt < maxAttempts {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
@@ -345,10 +386,19 @@ func (r *RQLiteManager) waitForLeadership(ctx context.Context) error {
 				r.logger.Info("RQLite leadership established")
 				return nil
 			}
-			r.logger.Debug("Waiting for leadership", zap.Error(err))
+			// Log every 5th attempt or on first attempt to reduce noise
+			if attempt%5 == 0 || attempt == 0 {
+				r.logger.Debug("Waiting for leadership", zap.Int("attempt", attempt+1), zap.Error(err))
+			}
 		}

-		time.Sleep(1 * time.Second)
+		// Exponential backoff, capped at maxBackoff
+		time.Sleep(backoffDelay)
+		backoffDelay = time.Duration(float64(backoffDelay) * 1.5)
+		if backoffDelay > maxBackoff {
+			backoffDelay = maxBackoff
+		}
+		attempt++
 	}

 	return fmt.Errorf("RQLite failed to establish leadership within timeout")
@@ -728,7 +778,9 @@ func (r *RQLiteManager) validateNodeID() error {
 			time.Sleep(500 * time.Millisecond)
 			continue
 		}
-		return fmt.Errorf("failed to query nodes endpoint after retries: %w", err)
+		// Log at debug level if validation fails - not critical
+		r.logger.Debug("Node ID validation skipped (endpoint unavailable)", zap.Error(err))
+		return nil
 	}

 	expectedID := r.discoverConfig.RaftAdvAddress
@@ -736,6 +788,12 @@ func (r *RQLiteManager) validateNodeID() error {
 	if expectedID == "" {
 		return fmt.Errorf("raft_adv_address not configured")
 	}

+	// If cluster is still forming, nodes list might be empty - that's okay
+	if len(nodes) == 0 {
+		r.logger.Debug("Node ID validation skipped (cluster not yet formed)")
+		return nil
+	}
+
 	// Find our node in the cluster (match by address)
 	for _, node := range nodes {
 		if node.Address == expectedID {
@@ -747,21 +805,16 @@ func (r *RQLiteManager) validateNodeID() error {
 					zap.String("explanation", "peers.json id field must match rqlite's node ID (raft address)"))
 				return fmt.Errorf("node ID mismatch: configured %s but rqlite reports %s", expectedID, node.ID)
 			}
-			r.logger.Info("Node ID validation passed",
+			r.logger.Debug("Node ID validation passed",
 				zap.String("node_id", node.ID),
 				zap.String("address", node.Address))
 			return nil
 		}
 	}

-	// If cluster is still forming, nodes list might be empty - that's okay
-	if len(nodes) == 0 {
-		r.logger.Debug("Cluster membership not yet available, skipping validation")
-		return nil
-	}
-
-	// If we can't find ourselves but other nodes exist, log a warning
-	r.logger.Warn("Could not find our node in cluster membership",
+	// If we can't find ourselves but other nodes exist, cluster might still be forming
+	// This is fine - don't log a warning
+	r.logger.Debug("Node ID validation skipped (node not yet in cluster membership)",
 		zap.String("expected_address", expectedID),
 		zap.Int("nodes_in_cluster", len(nodes)))
 	return nil