diff --git a/CHANGELOG.md b/CHANGELOG.md
index 68a2ce0..cdc29c3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,23 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
 ### Deprecated
 
 ### Fixed
+## [0.69.10] - 2025-11-13
+
+### Added
+- Automatic health monitoring and recovery for RQLite cluster split-brain scenarios.
+- RQLite now waits indefinitely for the minimum cluster size to be met before starting, preventing single-node cluster formation.
+
+### Changed
+- Updated default IPFS swarm port from 4001 to 4101 to avoid conflicts with LibP2P.
+
+### Deprecated
+
+### Removed
+
+### Fixed
+- Resolved an issue where RQLite could start as a single-node cluster if peer discovery was slow, by enforcing minimum cluster size before startup.
+- Improved cluster recovery logic to correctly use `bootstrap-expect` for new clusters and ensure proper process restart during recovery.
+
 ## [0.69.9] - 2025-11-12
 
 ### Added
diff --git a/Makefile b/Makefile
index 16c8ba7..8202e69 100644
--- a/Makefile
+++ b/Makefile
@@ -19,7 +19,7 @@ test-e2e:
 
 .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
 
-VERSION := 0.69.9
+VERSION := 0.69.10
 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
 DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
 LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
diff --git a/pkg/environments/production/orchestrator.go b/pkg/environments/production/orchestrator.go
index be730c7..e84615e 100644
--- a/pkg/environments/production/orchestrator.go
+++ b/pkg/environments/production/orchestrator.go
@@ -277,9 +277,9 @@ func (ps *ProductionSetup) Phase2cInitializeServices(nodeType string) error {
 	dataDir := filepath.Join(ps.debrosDir, "data", nodeType)
 
 	// Initialize IPFS repo with correct path structure
-	// Use port 4501 for API (to avoid conflict with RQLite on 5001), 8080 for gateway (standard), 4001 for swarm
+	// Use port 4501 for API (to avoid conflict with RQLite on 5001), 8080 for gateway (standard), 4101 for swarm (to avoid conflict with LibP2P on 4001)
 	ipfsRepoPath := filepath.Join(dataDir, "ipfs", "repo")
-	if err := ps.binaryInstaller.InitializeIPFSRepo(nodeType, ipfsRepoPath, filepath.Join(ps.debrosDir, "secrets", "swarm.key"), 4501, 8080, 4001); err != nil {
+	if err := ps.binaryInstaller.InitializeIPFSRepo(nodeType, ipfsRepoPath, filepath.Join(ps.debrosDir, "secrets", "swarm.key"), 4501, 8080, 4101); err != nil {
 		return fmt.Errorf("failed to initialize IPFS repo: %w", err)
 	}
 
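Note: the RQLite changes below all key off a minimum cluster size read from the manager's config (accessed as `rqliteManager.config.MinClusterSize` in this diff). A minimal sketch of the arithmetic they share — the local node counts toward the cluster, so it must discover `MinClusterSize - 1` remote peers — where the `Config` struct is a hypothetical stand-in, not the repository's real type:

```go
// Sketch only: Config is a stand-in for the real RQLite config type; the
// MinClusterSize field name matches the accesses visible in this diff.
package main

import "fmt"

type Config struct {
	MinClusterSize int // e.g. 3 for a three-node cluster
}

// requiredRemotePeers mirrors the calculation used below: the local node is
// part of the cluster, so MinClusterSize-1 other nodes must be discovered.
func requiredRemotePeers(cfg Config) int {
	return cfg.MinClusterSize - 1
}

func main() {
	cfg := Config{MinClusterSize: 3}
	fmt.Println(requiredRemotePeers(cfg)) // 2: two remote peers plus self = 3 nodes
}
```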
diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go
index c6b4e51..790e396 100644
--- a/pkg/rqlite/cluster_discovery.go
+++ b/pkg/rqlite/cluster_discovery.go
@@ -21,14 +21,15 @@ import (
 
 // ClusterDiscoveryService bridges LibP2P discovery with RQLite cluster management
 type ClusterDiscoveryService struct {
-	host          host.Host
-	discoveryMgr  *discovery.Manager
-	rqliteManager *RQLiteManager
-	nodeID        string
-	nodeType      string
-	raftAddress   string
-	httpAddress   string
-	dataDir       string
+	host           host.Host
+	discoveryMgr   *discovery.Manager
+	rqliteManager  *RQLiteManager
+	nodeID         string
+	nodeType       string
+	raftAddress    string
+	httpAddress    string
+	dataDir        string
+	minClusterSize int // Minimum cluster size required
 
 	knownPeers map[string]*discovery.RQLiteNodeMetadata // NodeID -> Metadata
 	peerHealth map[string]*PeerHealth                   // NodeID -> Health
@@ -54,6 +55,11 @@ func NewClusterDiscoveryService(
 	dataDir string,
 	logger *zap.Logger,
 ) *ClusterDiscoveryService {
+	minClusterSize := 1
+	if rqliteManager != nil && rqliteManager.config != nil {
+		minClusterSize = rqliteManager.config.MinClusterSize
+	}
+
 	return &ClusterDiscoveryService{
 		host:           h,
 		discoveryMgr:   discoveryMgr,
@@ -63,6 +69,7 @@ func NewClusterDiscoveryService(
 		raftAddress:    raftAddress,
 		httpAddress:    httpAddress,
 		dataDir:        dataDir,
+		minClusterSize: minClusterSize,
 		knownPeers:     make(map[string]*discovery.RQLiteNodeMetadata),
 		peerHealth:     make(map[string]*PeerHealth),
 		updateInterval: 30 * time.Second,
@@ -323,18 +330,58 @@ func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*dis
 		}
 	}
 
+	// CRITICAL FIX: Count remote peers (excluding self)
+	remotePeerCount := 0
+	for _, peer := range c.knownPeers {
+		if peer.NodeID != c.raftAddress {
+			remotePeerCount++
+		}
+	}
+
+	// Get peers JSON snapshot (for checking if it would be empty)
+	peers := c.getPeersJSONUnlocked()
+
 	// Determine if we should write peers.json
 	shouldWrite := len(added) > 0 || len(updated) > 0 || c.lastUpdate.IsZero()
 
+	// CRITICAL FIX: Don't write peers.json until we have minimum cluster size
+	// This prevents RQLite from starting as a single-node cluster
+	// For min_cluster_size=3, we need at least 2 remote peers (plus self = 3 total)
 	if shouldWrite {
+		// For initial sync, wait until we have at least (MinClusterSize - 1) remote peers
+		// This ensures peers.json contains enough peers for proper cluster formation
+		if c.lastUpdate.IsZero() {
+			requiredRemotePeers := c.minClusterSize - 1
+
+			if remotePeerCount < requiredRemotePeers {
+				c.logger.Info("Skipping initial peers.json write - not enough remote peers discovered",
+					zap.Int("remote_peers", remotePeerCount),
+					zap.Int("required_remote_peers", requiredRemotePeers),
+					zap.Int("min_cluster_size", c.minClusterSize),
+					zap.String("action", "will write when enough peers are discovered"))
+				return membershipUpdateResult{
+					changed: false,
+				}
+			}
+		}
+
+		// Additional safety check: don't write empty peers.json (would cause single-node cluster)
+		if len(peers) == 0 && c.lastUpdate.IsZero() {
+			c.logger.Info("Skipping peers.json write - no remote peers to include",
+				zap.String("action", "will write when peers are discovered"))
+			return membershipUpdateResult{
+				changed: false,
+			}
+		}
+
 		// Log initial sync if this is the first time
 		if c.lastUpdate.IsZero() {
 			c.logger.Info("Initial cluster membership sync",
-				zap.Int("total_peers", len(c.knownPeers)))
+				zap.Int("total_peers", len(c.knownPeers)),
+				zap.Int("remote_peers", remotePeerCount),
+				zap.Int("peers_in_json", len(peers)))
 		}
 
-		// Get peers JSON snapshot
-		peers := c.getPeersJSONUnlocked()
 		return membershipUpdateResult{
 			peersJSON: peers,
 			added:     added,
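The membership change above amounts to a gate on the first peers.json write. A small, self-contained sketch of that rule (the helper and its parameters are illustrative, not part of the package):

```go
// Illustrative restatement of the gating logic added to
// computeMembershipChangesLocked; names here are hypothetical.
package main

import "fmt"

// shouldWriteInitialPeersJSON reports whether the first peers.json write is
// safe: enough remote peers must be known and the snapshot must be non-empty,
// otherwise RQLite could bootstrap itself as a single-node cluster.
func shouldWriteInitialPeersJSON(remotePeers, peersInSnapshot, minClusterSize int) bool {
	if remotePeers < minClusterSize-1 {
		return false // not enough peers discovered yet
	}
	if peersInSnapshot == 0 {
		return false // an empty peers.json would describe a single-node cluster
	}
	return true
}

func main() {
	fmt.Println(shouldWriteInitialPeersJSON(1, 2, 3)) // false: only 1 of 2 required remote peers
	fmt.Println(shouldWriteInitialPeersJSON(2, 3, 3)) // true
}
```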
diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go
index 7d22376..fd83a83 100644
--- a/pkg/rqlite/rqlite.go
+++ b/pkg/rqlite/rqlite.go
@@ -10,6 +10,7 @@ import (
 	"os"
 	"os/exec"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"syscall"
 	"time"
@@ -86,6 +87,25 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 		return fmt.Errorf("discovery config HttpAdvAddress is empty")
 	}
 
+	// CRITICAL FIX: Ensure peers.json exists with minimum cluster size BEFORE starting RQLite
+	// This prevents split-brain where each node starts as a single-node cluster
+	// We NEVER start as a single-node cluster - we wait indefinitely until minimum cluster size is met
+	// This applies to ALL nodes (bootstrap AND regular nodes with join addresses)
+	if r.discoveryService != nil {
+		r.logger.Info("Ensuring peers.json exists with minimum cluster size before RQLite startup",
+			zap.String("policy", "will wait indefinitely - never start as single-node cluster"),
+			zap.Bool("has_join_address", r.config.RQLiteJoinAddress != ""))
+
+		// Wait for peer discovery to find minimum cluster size - NO TIMEOUT
+		// This ensures we never start as a single-node cluster, regardless of join address
+		if err := r.waitForMinClusterSizeBeforeStart(ctx, rqliteDataDir); err != nil {
+			r.logger.Error("Failed to ensure minimum cluster size before start",
+				zap.Error(err),
+				zap.String("action", "startup aborted - will not start as single-node cluster"))
+			return fmt.Errorf("cannot start RQLite: minimum cluster size not met: %w", err)
+		}
+	}
+
 	// CRITICAL: Check if we need to do pre-start cluster discovery to build peers.json
 	// This handles the case where nodes have old cluster state and need coordinated recovery
 	if needsClusterRecovery, err := r.checkNeedsClusterRecovery(rqliteDataDir); err != nil {
@@ -107,6 +127,11 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 		return err
 	}
 
+	// Start periodic health monitoring for automatic recovery
+	if r.discoveryService != nil {
+		go r.startHealthMonitoring(ctx)
+	}
+
 	// Establish leadership/SQL availability
 	if err := r.establishLeadershipOrJoin(ctx, rqliteDataDir); err != nil {
 		return err
@@ -212,6 +237,27 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string)
 		args = append(args, "-join", joinArg, "-join-attempts", "30", "-join-interval", "10s")
 	} else {
 		r.logger.Info("No join address specified - starting as new cluster")
+
+		// For bootstrap nodes, use bootstrap-expect if we know about other peers
+		if r.discoveryService != nil {
+			allPeers := r.discoveryService.GetAllPeers()
+			remotePeerCount := 0
+			for _, peer := range allPeers {
+				if peer.NodeID != r.discoverConfig.RaftAdvAddress {
+					remotePeerCount++
+				}
+			}
+
+			// Use bootstrap-expect if we have discovered enough peers
+			// This tells RQLite to wait for the expected number of nodes before forming cluster
+			if remotePeerCount >= (r.config.MinClusterSize - 1) {
+				expectedPeers := r.config.MinClusterSize
+				args = append(args, "-bootstrap-expect", strconv.Itoa(expectedPeers))
+				r.logger.Info("Using bootstrap-expect to wait for cluster formation",
+					zap.Int("expected_peers", expectedPeers),
+					zap.Int("remote_peers_discovered", remotePeerCount))
+			}
+		}
 	}
 
 	// Add data directory as positional argument
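For a bootstrap node with no join address, the branch above now appends `-bootstrap-expect`. A rough sketch of the resulting command line, assuming `MinClusterSize = 3`; the addresses, ports, and data directory are placeholders, and only flags already used by `launchProcess` in this diff appear:

```go
// Placeholder values throughout; this only illustrates the flag set a
// bootstrap node would pass to rqlited once enough peers are discovered.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	minClusterSize := 3
	args := []string{
		"-http-addr", "0.0.0.0:5001",
		"-http-adv-addr", "10.0.0.1:5001",
		"-raft-addr", "0.0.0.0:7001",
		"-raft-adv-addr", "10.0.0.1:7001",
		// Added by this change: wait for the expected node count before forming a cluster.
		"-bootstrap-expect", strconv.Itoa(minClusterSize),
		"/home/debros/data/node/rqlite", // data directory as positional argument
	}
	fmt.Println("rqlited " + strings.Join(args, " "))
}
```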
@@ -349,7 +395,7 @@ func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDat
 		r.logger.Info("Attempting cluster recovery using peers.json",
 			zap.String("peers_file", peersPath))
 
-		if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil {
+		if recoveryErr := r.recoverCluster(ctx, peersPath); recoveryErr == nil {
 			r.logger.Info("Cluster recovery successful, retrying leadership")
 			leadershipErr = r.waitForLeadership(ctx)
 			if leadershipErr == nil {
@@ -373,7 +419,7 @@ func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDat
 		} else {
 			// Restart RQLite with clean state
 			r.logger.Info("Raft state cleared, restarting RQLite for clean rejoin")
-			if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil {
+			if recoveryErr := r.recoverCluster(ctx, peersPath); recoveryErr == nil {
 				// Retry leadership after state clear
 				leadershipErr = r.waitForLeadership(ctx)
 				if leadershipErr == nil {
@@ -624,6 +670,90 @@ func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress strin
 	return lastErr
 }
 
+// waitForMinClusterSizeBeforeStart waits for minimum cluster size to be discovered
+// and ensures peers.json exists before RQLite starts
+// CRITICAL: This function waits INDEFINITELY - it will NEVER timeout
+// We never start as a single-node cluster, regardless of how long we wait
+func (r *RQLiteManager) waitForMinClusterSizeBeforeStart(ctx context.Context, rqliteDataDir string) error {
+	if r.discoveryService == nil {
+		return fmt.Errorf("discovery service not available")
+	}
+
+	requiredRemotePeers := r.config.MinClusterSize - 1
+	r.logger.Info("Waiting for minimum cluster size before RQLite startup",
+		zap.Int("min_cluster_size", r.config.MinClusterSize),
+		zap.Int("required_remote_peers", requiredRemotePeers),
+		zap.String("policy", "waiting indefinitely - will never start as single-node cluster"))
+
+	// Trigger peer exchange to collect metadata
+	if err := r.discoveryService.TriggerPeerExchange(ctx); err != nil {
+		r.logger.Warn("Peer exchange failed", zap.Error(err))
+	}
+
+	// NO TIMEOUT - wait indefinitely until minimum cluster size is met
+	// Only exit on context cancellation or when minimum cluster size is achieved
+	checkInterval := 2 * time.Second
+	lastLogTime := time.Now()
+
+	for {
+		// Check context cancellation first
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("context cancelled while waiting for minimum cluster size: %w", ctx.Err())
+		default:
+		}
+
+		// Trigger sync to update knownPeers
+		r.discoveryService.TriggerSync()
+		time.Sleep(checkInterval)
+
+		// Check if we have enough remote peers
+		allPeers := r.discoveryService.GetAllPeers()
+		remotePeerCount := 0
+		for _, peer := range allPeers {
+			if peer.NodeID != r.discoverConfig.RaftAdvAddress {
+				remotePeerCount++
+			}
+		}
+
+		if remotePeerCount >= requiredRemotePeers {
+			// Found enough peers - verify peers.json exists and contains them
+			peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json")
+
+			// Trigger one more sync to ensure peers.json is written
+			r.discoveryService.TriggerSync()
+			time.Sleep(2 * time.Second)
+
+			// Verify peers.json exists and contains enough peers
+			if info, err := os.Stat(peersPath); err == nil && info.Size() > 10 {
+				// Read and verify it contains enough peers
+				data, err := os.ReadFile(peersPath)
+				if err == nil {
+					var peers []map[string]interface{}
+					if err := json.Unmarshal(data, &peers); err == nil && len(peers) >= requiredRemotePeers {
+						r.logger.Info("peers.json exists with minimum cluster size, safe to start RQLite",
+							zap.String("peers_file", peersPath),
+							zap.Int("remote_peers_discovered", remotePeerCount),
+							zap.Int("peers_in_json", len(peers)),
+							zap.Int("min_cluster_size", r.config.MinClusterSize))
+						return nil
+					}
+				}
+			}
+		}
+
+		// Log progress every 10 seconds
+		if time.Since(lastLogTime) >= 10*time.Second {
+			r.logger.Info("Waiting for minimum cluster size (indefinitely)...",
+				zap.Int("discovered_peers", len(allPeers)),
+				zap.Int("remote_peers", remotePeerCount),
+				zap.Int("required_remote_peers", requiredRemotePeers),
+				zap.String("status", "will continue waiting until minimum cluster size is met"))
+			lastLogTime = time.Now()
+		}
+	}
+}
+
 // testJoinAddress tests if a join address is reachable
 func (r *RQLiteManager) testJoinAddress(joinAddress string) error {
 	// Determine the HTTP status URL to probe.
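The startup gate above accepts peers.json only once it parses as a JSON array with enough entries. A hedged sketch of that check against a hand-written file (the `id`/`address`/`non_voter` keys follow rqlite's documented Raft recovery format; the addresses are made up):

```go
// Sample peers.json content is invented; the validation mirrors the
// json.Unmarshal + length check in waitForMinClusterSizeBeforeStart.
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	data := []byte(`[
		{"id": "10.0.0.1:7001", "address": "10.0.0.1:7001", "non_voter": false},
		{"id": "10.0.0.2:7001", "address": "10.0.0.2:7001", "non_voter": false},
		{"id": "10.0.0.3:7001", "address": "10.0.0.3:7001", "non_voter": false}
	]`)

	var peers []map[string]interface{}
	if err := json.Unmarshal(data, &peers); err != nil {
		panic(err)
	}

	minClusterSize := 3
	// Same acceptance condition as the startup gate above.
	fmt.Println(len(peers) >= minClusterSize-1) // true
}
```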
@@ -735,7 +865,9 @@ func (r *RQLiteManager) exponentialBackoff(attempt int, baseDelay time.Duration,
 }
 
 // recoverCluster restarts RQLite using the recovery.db created from peers.json
-func (r *RQLiteManager) recoverCluster(peersJSONPath string) error {
+// It reuses launchProcess and waitForReadyAndConnect to ensure all join/backoff logic
+// and proper readiness checks are applied during recovery.
+func (r *RQLiteManager) recoverCluster(ctx context.Context, peersJSONPath string) error {
 	r.logger.Info("Initiating cluster recovery by restarting RQLite",
 		zap.String("peers_file", peersJSONPath))
 
@@ -748,41 +880,29 @@ func (r *RQLiteManager) recoverCluster(ctx context.Context, peersJSONPath string
 	// Wait for process to fully stop
 	time.Sleep(2 * time.Second)
 
-	// Restart RQLite - it will automatically detect peers.json and perform recovery
-	r.logger.Info("Restarting RQLite (will auto-recover using peers.json)")
-
-	// Rebuild the launch arguments using the centralized path helper
+	// Get the data directory path
 	rqliteDataDir, err := r.rqliteDataDirPath()
 	if err != nil {
 		return fmt.Errorf("failed to resolve RQLite data directory: %w", err)
 	}
-	args := []string{
-		"-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort),
-		"-http-adv-addr", r.discoverConfig.HttpAdvAddress,
-		"-raft-adv-addr", r.discoverConfig.RaftAdvAddress,
-		"-raft-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLiteRaftPort),
-		rqliteDataDir,
+
+	// Restart RQLite using launchProcess to ensure all join/backoff logic is applied
+	// This includes: join address handling, join retries, bootstrap-expect, etc.
+	r.logger.Info("Restarting RQLite (will auto-recover using peers.json)")
+	if err := r.launchProcess(ctx, rqliteDataDir); err != nil {
+		return fmt.Errorf("failed to restart RQLite process: %w", err)
 	}
 
-	// Restart RQLite
-	r.cmd = exec.Command("rqlited", args...)
-	r.cmd.Stdout = os.Stdout
-	r.cmd.Stderr = os.Stderr
-
-	if err := r.cmd.Start(); err != nil {
-		return fmt.Errorf("failed to restart RQLite: %w", err)
+	// Wait for RQLite to be ready and establish connection using proper readiness checks
+	// This includes retries for "store is not open" errors during recovery
+	if err := r.waitForReadyAndConnect(ctx); err != nil {
+		// Clean up the process if connection failed
+		if r.cmd != nil && r.cmd.Process != nil {
+			_ = r.cmd.Process.Kill()
+		}
+		return fmt.Errorf("failed to wait for RQLite readiness after recovery: %w", err)
 	}
 
-	r.logger.Info("RQLite restarted, waiting for it to become ready")
-	time.Sleep(3 * time.Second)
-
-	// Recreate connection
-	conn, err := gorqlite.Open(fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
-	if err != nil {
-		return fmt.Errorf("failed to reconnect to RQLite: %w", err)
-	}
-	r.connection = conn
-
 	r.logger.Info("Cluster recovery completed, RQLite restarted with new configuration")
 	return nil
 }
@@ -887,26 +1007,25 @@ func (r *RQLiteManager) clearRaftState(rqliteDataDir string) error {
 }
 
 // isStuckInConfigurationMismatch checks if we're stuck due to configuration mismatch
-// This detects the "not part of stable configuration" scenario where Raft can't elect a leader
+// This detects both configuration mismatch AND split-brain scenarios
 func (r *RQLiteManager) isStuckInConfigurationMismatch() bool {
-	// Check Raft state via status endpoint
+	// First check for split-brain (all followers, term 0, no peers)
+	if r.isInSplitBrainState() {
+		return true
+	}
+
+	// Then check for traditional configuration mismatch
 	status, err := r.getRQLiteStatus()
 	if err != nil {
 		r.logger.Debug("Cannot check Raft status for configuration mismatch", zap.Error(err))
 		return false // Can't determine, don't clear
 	}
 
-	// Get Raft state and leader information
 	raftState := strings.ToLower(status.Store.Raft.State)
 	hasLeader := status.Store.Raft.LeaderAddr != ""
 	// Stuck if: no leader AND state is not "leader" or "follower"
-	// (likely stuck in "candidate" or configuration mismatch)
 	if !hasLeader && raftState != "leader" && raftState != "follower" {
-		r.logger.Debug("Detected potential configuration mismatch",
-			zap.String("raft_state", raftState),
-			zap.Bool("has_leader", hasLeader))
-
 		// Verify all peers are also stuck
 		if r.allPeersAreStuck() {
 			return true
 		}
@@ -993,6 +1112,204 @@ func (r *RQLiteManager) isPeerReachable(httpAddr string) bool {
 	return resp.StatusCode == http.StatusOK
 }
 
+// isInSplitBrainState detects if we're in a split-brain scenario where all nodes
+// are followers with no peers (each node thinks it's alone)
+func (r *RQLiteManager) isInSplitBrainState() bool {
+	status, err := r.getRQLiteStatus()
+	if err != nil {
+		return false
+	}
+
+	raft := status.Store.Raft
+
+	// Split-brain indicators:
+	// - State is Follower (not Leader)
+	// - Term is 0 (no leader election has occurred)
+	// - num_peers is 0 (node thinks it's alone)
+	// - voter is false (node not configured as voter)
+	isSplitBrain := raft.State == "Follower" &&
+		raft.Term == 0 &&
+		raft.NumPeers == 0 &&
+		!raft.Voter &&
+		raft.LeaderAddr == ""
+
+	if !isSplitBrain {
+		return false
+	}
+
+	// Verify all discovered peers are also in split-brain state
+	if r.discoveryService == nil {
+		r.logger.Debug("No discovery service to verify split-brain across peers")
+		return false
+	}
+
+	peers := r.discoveryService.GetActivePeers()
+	if len(peers) == 0 {
+		// No peers discovered yet - might be network issue, not split-brain
+		return false
+	}
+
+	// Check if all reachable peers are also in split-brain
+	splitBrainCount := 0
+	reachableCount := 0
+	for _, peer := range peers {
+		if !r.isPeerReachable(peer.HTTPAddress) {
+			continue
+		}
+		reachableCount++
+
+		peerStatus, err := r.getPeerRQLiteStatus(peer.HTTPAddress)
+		if err != nil {
+			continue
+		}
+
+		peerRaft := peerStatus.Store.Raft
+		if peerRaft.State == "Follower" &&
+			peerRaft.Term == 0 &&
+			peerRaft.NumPeers == 0 &&
+			!peerRaft.Voter {
+			splitBrainCount++
+		}
+	}
+
+	// If all reachable peers are in split-brain, we have cluster-wide split-brain
+	if reachableCount > 0 && splitBrainCount == reachableCount {
+		r.logger.Warn("Detected cluster-wide split-brain state",
+			zap.Int("reachable_peers", reachableCount),
+			zap.Int("split_brain_peers", splitBrainCount))
+		return true
+	}
+
+	return false
+}
+
+// getPeerRQLiteStatus queries a peer's status endpoint
+func (r *RQLiteManager) getPeerRQLiteStatus(httpAddr string) (*RQLiteStatus, error) {
+	url := fmt.Sprintf("http://%s/status", httpAddr)
+	client := &http.Client{Timeout: 3 * time.Second}
+
+	resp, err := client.Get(url)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("peer returned status %d", resp.StatusCode)
+	}
+
+	var status RQLiteStatus
+	if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
+		return nil, err
+	}
+
+	return &status, nil
+}
+
+// startHealthMonitoring runs periodic health checks and automatically recovers from split-brain
+func (r *RQLiteManager) startHealthMonitoring(ctx context.Context) {
+	// Wait a bit after startup before starting health checks
+	time.Sleep(30 * time.Second)
+
+	ticker := time.NewTicker(60 * time.Second) // Check every minute
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			// Check for split-brain state
+			if r.isInSplitBrainState() {
+				r.logger.Warn("Split-brain detected during health check, initiating automatic recovery")
+
+				// Attempt automatic recovery
+				if err := r.recoverFromSplitBrain(ctx); err != nil {
+					r.logger.Error("Automatic split-brain recovery failed",
+						zap.Error(err),
+						zap.String("action", "will retry on next health check"))
+				} else {
+					r.logger.Info("Successfully recovered from split-brain")
+				}
+			}
+		}
+	}
+}
+
+// recoverFromSplitBrain automatically recovers from split-brain state
+func (r *RQLiteManager) recoverFromSplitBrain(ctx context.Context) error {
+	if r.discoveryService == nil {
+		return fmt.Errorf("discovery service not available for recovery")
+	}
+
+	r.logger.Info("Starting automatic split-brain recovery")
+
+	// Step 1: Ensure we have latest peer information
+	r.discoveryService.TriggerPeerExchange(ctx)
+	time.Sleep(2 * time.Second)
+	r.discoveryService.TriggerSync()
+	time.Sleep(2 * time.Second)
+
+	// Step 2: Get data directory
+	rqliteDataDir, err := r.rqliteDataDirPath()
+	if err != nil {
+		return fmt.Errorf("failed to get data directory: %w", err)
+	}
+
+	// Step 3: Check if peers have more recent data
+	allPeers := r.discoveryService.GetAllPeers()
+	maxPeerIndex := uint64(0)
+	for _, peer := range allPeers {
+		if peer.NodeID == r.discoverConfig.RaftAdvAddress {
+			continue // Skip self
+		}
+		if peer.RaftLogIndex > maxPeerIndex {
+			maxPeerIndex = peer.RaftLogIndex
+		}
+	}
+
+	// Step 4: Clear our Raft state if peers have more recent data
+	ourIndex := r.getRaftLogIndex()
+	if maxPeerIndex > ourIndex || (maxPeerIndex == 0 && ourIndex == 0) {
+		r.logger.Info("Clearing Raft state to allow clean cluster join",
zap.Uint64("our_index", ourIndex), + zap.Uint64("peer_max_index", maxPeerIndex)) + + if err := r.clearRaftState(rqliteDataDir); err != nil { + return fmt.Errorf("failed to clear Raft state: %w", err) + } + + // Step 5: Ensure peers.json exists with all discovered peers + r.discoveryService.TriggerSync() + time.Sleep(2 * time.Second) + + peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json") + if _, err := os.Stat(peersPath); err != nil { + return fmt.Errorf("peers.json not created: %w", err) + } + + // Step 6: Restart RQLite to pick up new peers.json + r.logger.Info("Restarting RQLite to apply new cluster configuration") + if err := r.recoverCluster(ctx, peersPath); err != nil { + return fmt.Errorf("failed to restart RQLite: %w", err) + } + + // Step 7: Wait for cluster to form (waitForReadyAndConnect already handled readiness) + r.logger.Info("Waiting for cluster to stabilize after recovery...") + time.Sleep(5 * time.Second) + + // Verify recovery succeeded + if r.isInSplitBrainState() { + return fmt.Errorf("still in split-brain after recovery attempt") + } + + r.logger.Info("Split-brain recovery completed successfully") + return nil + } + + return fmt.Errorf("cannot recover: we have more recent data than peers") +} + // isSafeToClearState verifies we can safely clear Raft state // Returns true only if peers have higher log indexes (they have more recent data) // or if we have no meaningful state (index == 0) diff --git a/pkg/rqlite/types.go b/pkg/rqlite/types.go index d817e74..4a62c08 100644 --- a/pkg/rqlite/types.go +++ b/pkg/rqlite/types.go @@ -6,13 +6,16 @@ import "time" type RQLiteStatus struct { Store struct { Raft struct { - AppliedIndex uint64 `json:"applied_index"` - CommitIndex uint64 `json:"commit_index"` - LastLogIndex uint64 `json:"last_log_index"` + AppliedIndex uint64 `json:"applied_index"` + CommitIndex uint64 `json:"commit_index"` + LastLogIndex uint64 `json:"last_log_index"` LastSnapshotIndex uint64 `json:"last_snapshot_index"` - State string `json:"state"` - LeaderID string `json:"leader_id"` - LeaderAddr string `json:"leader_addr"` + State string `json:"state"` + LeaderID string `json:"leader_id"` + LeaderAddr string `json:"leader_addr"` + Term uint64 `json:"term"` + NumPeers int `json:"num_peers"` + Voter bool `json:"voter"` } `json:"raft"` DBConf struct { DSN string `json:"dsn"` @@ -20,30 +23,30 @@ type RQLiteStatus struct { } `json:"db_conf"` } `json:"store"` Runtime struct { - GOARCH string `json:"GOARCH"` - GOOS string `json:"GOOS"` - GOMAXPROCS int `json:"GOMAXPROCS"` - NumCPU int `json:"num_cpu"` - NumGoroutine int `json:"num_goroutine"` - Version string `json:"version"` + GOARCH string `json:"GOARCH"` + GOOS string `json:"GOOS"` + GOMAXPROCS int `json:"GOMAXPROCS"` + NumCPU int `json:"num_cpu"` + NumGoroutine int `json:"num_goroutine"` + Version string `json:"version"` } `json:"runtime"` HTTP struct { - Addr string `json:"addr"` - Auth string `json:"auth"` + Addr string `json:"addr"` + Auth string `json:"auth"` } `json:"http"` Node struct { - Uptime string `json:"uptime"` - StartTime string `json:"start_time"` + Uptime string `json:"uptime"` + StartTime string `json:"start_time"` } `json:"node"` } // RQLiteNode represents a node in the RQLite cluster type RQLiteNode struct { - ID string `json:"id"` - Address string `json:"address"` - Leader bool `json:"leader"` - Voter bool `json:"voter"` - Reachable bool `json:"reachable"` + ID string `json:"id"` + Address string `json:"address"` + Leader bool `json:"leader"` + Voter bool `json:"voter"` + Reachable 
diff --git a/pkg/rqlite/types.go b/pkg/rqlite/types.go
index d817e74..4a62c08 100644
--- a/pkg/rqlite/types.go
+++ b/pkg/rqlite/types.go
@@ -6,13 +6,16 @@ import "time"
 type RQLiteStatus struct {
 	Store struct {
 		Raft struct {
-			AppliedIndex uint64 `json:"applied_index"`
-			CommitIndex uint64 `json:"commit_index"`
-			LastLogIndex uint64 `json:"last_log_index"`
+			AppliedIndex      uint64 `json:"applied_index"`
+			CommitIndex       uint64 `json:"commit_index"`
+			LastLogIndex      uint64 `json:"last_log_index"`
 			LastSnapshotIndex uint64 `json:"last_snapshot_index"`
-			State string `json:"state"`
-			LeaderID string `json:"leader_id"`
-			LeaderAddr string `json:"leader_addr"`
+			State             string `json:"state"`
+			LeaderID          string `json:"leader_id"`
+			LeaderAddr        string `json:"leader_addr"`
+			Term              uint64 `json:"term"`
+			NumPeers          int    `json:"num_peers"`
+			Voter             bool   `json:"voter"`
 		} `json:"raft"`
 		DBConf struct {
 			DSN    string `json:"dsn"`
@@ -20,30 +23,30 @@
 		} `json:"db_conf"`
 	} `json:"store"`
 	Runtime struct {
-		GOARCH string `json:"GOARCH"`
-		GOOS string `json:"GOOS"`
-		GOMAXPROCS int `json:"GOMAXPROCS"`
-		NumCPU int `json:"num_cpu"`
-		NumGoroutine int `json:"num_goroutine"`
-		Version string `json:"version"`
+		GOARCH       string `json:"GOARCH"`
+		GOOS         string `json:"GOOS"`
+		GOMAXPROCS   int    `json:"GOMAXPROCS"`
+		NumCPU       int    `json:"num_cpu"`
+		NumGoroutine int    `json:"num_goroutine"`
+		Version      string `json:"version"`
 	} `json:"runtime"`
 	HTTP struct {
-		Addr string `json:"addr"`
-		Auth string `json:"auth"`
+		Addr string `json:"addr"`
+		Auth string `json:"auth"`
 	} `json:"http"`
 	Node struct {
-		Uptime string `json:"uptime"`
-		StartTime string `json:"start_time"`
+		Uptime    string `json:"uptime"`
+		StartTime string `json:"start_time"`
 	} `json:"node"`
 }
 
 // RQLiteNode represents a node in the RQLite cluster
 type RQLiteNode struct {
-	ID string `json:"id"`
-	Address string `json:"address"`
-	Leader bool `json:"leader"`
-	Voter bool `json:"voter"`
-	Reachable bool `json:"reachable"`
+	ID        string `json:"id"`
+	Address   string `json:"address"`
+	Leader    bool   `json:"leader"`
+	Voter     bool   `json:"voter"`
+	Reachable bool   `json:"reachable"`
 }
 
 // RQLiteNodes represents the response from RQLite's /nodes endpoint
@@ -68,4 +71,3 @@ type ClusterMetrics struct {
 	CurrentLeader     string
 	AveragePeerHealth float64
 }
-
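The new `Term`, `NumPeers`, and `Voter` fields are what the split-brain predicate reads from `/status`. A self-contained sketch that decodes a hand-written status fragment (the JSON sample is illustrative, not captured from a real node) into a trimmed mirror of the struct above and applies the same predicate used by `isInSplitBrainState`:

```go
// The status type here is a local, trimmed mirror of RQLiteStatus; the JSON
// payload is invented for the example.
package main

import (
	"encoding/json"
	"fmt"
)

type status struct {
	Store struct {
		Raft struct {
			State      string `json:"state"`
			LeaderAddr string `json:"leader_addr"`
			Term       uint64 `json:"term"`
			NumPeers   int    `json:"num_peers"`
			Voter      bool   `json:"voter"`
		} `json:"raft"`
	} `json:"store"`
}

func main() {
	payload := []byte(`{"store":{"raft":{"state":"Follower","leader_addr":"","term":0,"num_peers":0,"voter":false}}}`)

	var s status
	if err := json.Unmarshal(payload, &s); err != nil {
		panic(err)
	}

	raft := s.Store.Raft
	splitBrain := raft.State == "Follower" &&
		raft.Term == 0 &&
		raft.NumPeers == 0 &&
		!raft.Voter &&
		raft.LeaderAddr == ""
	fmt.Println(splitBrain) // true: the node believes it is alone with no leader
}
```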