Mirror of https://github.com/DeBrosOfficial/network.git, synced 2025-12-11 08:18:49 +00:00
feat: add service enable/disable functionality to production commands
- Introduced new functions to check whether a service is enabled and to enable or disable services as needed during production command execution.
- Enhanced the `handleProdStart` and `handleProdStop` functions to manage service state more effectively, ensuring services are re-enabled on start and disabled on stop.
- Improved logging to provide clear feedback on service status changes during service management.
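
The enable/disable helpers themselves are not part of the hunks shown below, so here is a minimal sketch of what such functions typically look like when the services are systemd units. The function names, the unit name, and the assumption that systemd is the service manager are illustrative, not taken from this commit.

```go
package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// isServiceEnabled reports whether a systemd unit is enabled.
// `systemctl is-enabled` exits non-zero for disabled or masked units,
// so the printed state is checked rather than the exit code alone.
func isServiceEnabled(unit string) bool {
	out, _ := exec.Command("systemctl", "is-enabled", unit).Output()
	return strings.TrimSpace(string(out)) == "enabled"
}

// setServiceEnabled enables or disables a systemd unit.
func setServiceEnabled(unit string, enabled bool) error {
	action := "disable"
	if enabled {
		action = "enable"
	}
	if out, err := exec.Command("systemctl", action, unit).CombinedOutput(); err != nil {
		return fmt.Errorf("systemctl %s %s: %v: %s", action, unit, err, out)
	}
	return nil
}

func main() {
	const unit = "debros-node.service" // placeholder unit name
	fmt.Println(unit, "enabled:", isServiceEnabled(unit))
}
```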
parent 7f77836d73
commit 47ffe817b4
CHANGELOG.md: 17 changed lines
@@ -13,6 +13,23 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant

### Deprecated


### Fixed

## [0.69.10] - 2025-11-13


### Added

- Automatic health monitoring and recovery for RQLite cluster split-brain scenarios.

- RQLite now waits indefinitely for the minimum cluster size to be met before starting, preventing single-node cluster formation.


### Changed

- Updated default IPFS swarm port from 4001 to 4101 to avoid conflicts with LibP2P.


### Deprecated


### Removed


### Fixed

- Resolved an issue where RQLite could start as a single-node cluster if peer discovery was slow, by enforcing minimum cluster size before startup.

- Improved cluster recovery logic to correctly use `bootstrap-expect` for new clusters and ensure proper process restart during recovery.


## [0.69.9] - 2025-11-12


### Added

Makefile: 2 changed lines

@@ -19,7 +19,7 @@ test-e2e:

.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill

VERSION := 0.69.9
VERSION := 0.69.10
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
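
For reference, the `-X` flags in `LDFLAGS` overwrite package-level string variables in `main` at link time. A minimal sketch of the variables they target; the default values below are assumptions, only the names `main.version`, `main.commit`, and `main.date` come from the Makefile:

```go
package main

import "fmt"

// Placeholders; `go build -ldflags "$(LDFLAGS)"` replaces them with the
// version, commit, and date computed in the Makefile.
var (
	version = "dev"
	commit  = "unknown"
	date    = "unknown"
)

func main() {
	fmt.Println("network", version, commit, date)
}
```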
@@ -277,9 +277,9 @@ func (ps *ProductionSetup) Phase2cInitializeServices(nodeType string) error {
	dataDir := filepath.Join(ps.debrosDir, "data", nodeType)

	// Initialize IPFS repo with correct path structure
	// Use port 4501 for API (to avoid conflict with RQLite on 5001), 8080 for gateway (standard), 4001 for swarm
	// Use port 4501 for API (to avoid conflict with RQLite on 5001), 8080 for gateway (standard), 4101 for swarm (to avoid conflict with LibP2P on 4001)
	ipfsRepoPath := filepath.Join(dataDir, "ipfs", "repo")
	if err := ps.binaryInstaller.InitializeIPFSRepo(nodeType, ipfsRepoPath, filepath.Join(ps.debrosDir, "secrets", "swarm.key"), 4501, 8080, 4001); err != nil {
	if err := ps.binaryInstaller.InitializeIPFSRepo(nodeType, ipfsRepoPath, filepath.Join(ps.debrosDir, "secrets", "swarm.key"), 4501, 8080, 4101); err != nil {
		return fmt.Errorf("failed to initialize IPFS repo: %w", err)
	}
@@ -21,14 +21,15 @@ import (

// ClusterDiscoveryService bridges LibP2P discovery with RQLite cluster management
type ClusterDiscoveryService struct {
	host          host.Host
	discoveryMgr  *discovery.Manager
	rqliteManager *RQLiteManager
	nodeID        string
	nodeType      string
	raftAddress   string
	httpAddress   string
	dataDir       string
	host           host.Host
	discoveryMgr   *discovery.Manager
	rqliteManager  *RQLiteManager
	nodeID         string
	nodeType       string
	raftAddress    string
	httpAddress    string
	dataDir        string
	minClusterSize int // Minimum cluster size required

	knownPeers map[string]*discovery.RQLiteNodeMetadata // NodeID -> Metadata
	peerHealth map[string]*PeerHealth                    // NodeID -> Health

@@ -54,6 +55,11 @@ func NewClusterDiscoveryService(
	dataDir string,
	logger *zap.Logger,
) *ClusterDiscoveryService {
	minClusterSize := 1
	if rqliteManager != nil && rqliteManager.config != nil {
		minClusterSize = rqliteManager.config.MinClusterSize
	}

	return &ClusterDiscoveryService{
		host:         h,
		discoveryMgr: discoveryMgr,

@@ -63,6 +69,7 @@ func NewClusterDiscoveryService(
		raftAddress:    raftAddress,
		httpAddress:    httpAddress,
		dataDir:        dataDir,
		minClusterSize: minClusterSize,
		knownPeers:     make(map[string]*discovery.RQLiteNodeMetadata),
		peerHealth:     make(map[string]*PeerHealth),
		updateInterval: 30 * time.Second,
@@ -323,18 +330,58 @@ func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*dis
		}
	}

	// CRITICAL FIX: Count remote peers (excluding self)
	remotePeerCount := 0
	for _, peer := range c.knownPeers {
		if peer.NodeID != c.raftAddress {
			remotePeerCount++
		}
	}

	// Get peers JSON snapshot (for checking if it would be empty)
	peers := c.getPeersJSONUnlocked()

	// Determine if we should write peers.json
	shouldWrite := len(added) > 0 || len(updated) > 0 || c.lastUpdate.IsZero()

	// CRITICAL FIX: Don't write peers.json until we have minimum cluster size
	// This prevents RQLite from starting as a single-node cluster
	// For min_cluster_size=3, we need at least 2 remote peers (plus self = 3 total)
	if shouldWrite {
		// For initial sync, wait until we have at least (MinClusterSize - 1) remote peers
		// This ensures peers.json contains enough peers for proper cluster formation
		if c.lastUpdate.IsZero() {
			requiredRemotePeers := c.minClusterSize - 1

			if remotePeerCount < requiredRemotePeers {
				c.logger.Info("Skipping initial peers.json write - not enough remote peers discovered",
					zap.Int("remote_peers", remotePeerCount),
					zap.Int("required_remote_peers", requiredRemotePeers),
					zap.Int("min_cluster_size", c.minClusterSize),
					zap.String("action", "will write when enough peers are discovered"))
				return membershipUpdateResult{
					changed: false,
				}
			}
		}

		// Additional safety check: don't write empty peers.json (would cause single-node cluster)
		if len(peers) == 0 && c.lastUpdate.IsZero() {
			c.logger.Info("Skipping peers.json write - no remote peers to include",
				zap.String("action", "will write when peers are discovered"))
			return membershipUpdateResult{
				changed: false,
			}
		}
	}

	// Log initial sync if this is the first time
	if c.lastUpdate.IsZero() {
		c.logger.Info("Initial cluster membership sync",
			zap.Int("total_peers", len(c.knownPeers)))
			zap.Int("total_peers", len(c.knownPeers)),
			zap.Int("remote_peers", remotePeerCount),
			zap.Int("peers_in_json", len(peers)))
	}

	// Get peers JSON snapshot
	peers := c.getPeersJSONUnlocked()
	return membershipUpdateResult{
		peersJSON: peers,
		added:     added,
@@ -10,6 +10,7 @@ import (
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"syscall"
	"time"
@@ -86,6 +87,25 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
		return fmt.Errorf("discovery config HttpAdvAddress is empty")
	}

	// CRITICAL FIX: Ensure peers.json exists with minimum cluster size BEFORE starting RQLite
	// This prevents split-brain where each node starts as a single-node cluster
	// We NEVER start as a single-node cluster - we wait indefinitely until minimum cluster size is met
	// This applies to ALL nodes (bootstrap AND regular nodes with join addresses)
	if r.discoveryService != nil {
		r.logger.Info("Ensuring peers.json exists with minimum cluster size before RQLite startup",
			zap.String("policy", "will wait indefinitely - never start as single-node cluster"),
			zap.Bool("has_join_address", r.config.RQLiteJoinAddress != ""))

		// Wait for peer discovery to find minimum cluster size - NO TIMEOUT
		// This ensures we never start as a single-node cluster, regardless of join address
		if err := r.waitForMinClusterSizeBeforeStart(ctx, rqliteDataDir); err != nil {
			r.logger.Error("Failed to ensure minimum cluster size before start",
				zap.Error(err),
				zap.String("action", "startup aborted - will not start as single-node cluster"))
			return fmt.Errorf("cannot start RQLite: minimum cluster size not met: %w", err)
		}
	}

	// CRITICAL: Check if we need to do pre-start cluster discovery to build peers.json
	// This handles the case where nodes have old cluster state and need coordinated recovery
	if needsClusterRecovery, err := r.checkNeedsClusterRecovery(rqliteDataDir); err != nil {
@@ -107,6 +127,11 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
		return err
	}

	// Start periodic health monitoring for automatic recovery
	if r.discoveryService != nil {
		go r.startHealthMonitoring(ctx)
	}

	// Establish leadership/SQL availability
	if err := r.establishLeadershipOrJoin(ctx, rqliteDataDir); err != nil {
		return err
@@ -212,6 +237,27 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string)
		args = append(args, "-join", joinArg, "-join-attempts", "30", "-join-interval", "10s")
	} else {
		r.logger.Info("No join address specified - starting as new cluster")

		// For bootstrap nodes, use bootstrap-expect if we know about other peers
		if r.discoveryService != nil {
			allPeers := r.discoveryService.GetAllPeers()
			remotePeerCount := 0
			for _, peer := range allPeers {
				if peer.NodeID != r.discoverConfig.RaftAdvAddress {
					remotePeerCount++
				}
			}

			// Use bootstrap-expect if we have discovered enough peers
			// This tells RQLite to wait for the expected number of nodes before forming cluster
			if remotePeerCount >= (r.config.MinClusterSize - 1) {
				expectedPeers := r.config.MinClusterSize
				args = append(args, "-bootstrap-expect", strconv.Itoa(expectedPeers))
				r.logger.Info("Using bootstrap-expect to wait for cluster formation",
					zap.Int("expected_peers", expectedPeers),
					zap.Int("remote_peers_discovered", remotePeerCount))
			}
		}
	}

	// Add data directory as positional argument
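
For context, `-bootstrap-expect` tells rqlited to form a new cluster only once the expected number of nodes have announced themselves. A rough illustration of the argument list the branch above could produce for a bootstrap node with `MinClusterSize = 3`; the addresses, ports, and data directory are placeholders, not values taken from this diff:

```go
package main

import "fmt"

func main() {
	// Illustrative only: flag names match those used elsewhere in the manager,
	// but every value below is a placeholder.
	args := []string{
		"-http-addr", "0.0.0.0:5001",
		"-http-adv-addr", "10.0.0.1:5001",
		"-raft-addr", "0.0.0.0:7001",
		"-raft-adv-addr", "10.0.0.1:7001",
		"-bootstrap-expect", "3", // wait for 3 nodes before forming a new cluster
		"/home/debros/.debros/data/node/rqlite", // data directory as positional argument
	}
	fmt.Println("rqlited", args)
}
```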
@@ -349,7 +395,7 @@ func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDat
		r.logger.Info("Attempting cluster recovery using peers.json",
			zap.String("peers_file", peersPath))

		if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil {
		if recoveryErr := r.recoverCluster(ctx, peersPath); recoveryErr == nil {
			r.logger.Info("Cluster recovery successful, retrying leadership")
			leadershipErr = r.waitForLeadership(ctx)
			if leadershipErr == nil {

@@ -373,7 +419,7 @@ func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDat
		} else {
			// Restart RQLite with clean state
			r.logger.Info("Raft state cleared, restarting RQLite for clean rejoin")
			if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil {
			if recoveryErr := r.recoverCluster(ctx, peersPath); recoveryErr == nil {
				// Retry leadership after state clear
				leadershipErr = r.waitForLeadership(ctx)
				if leadershipErr == nil {
@@ -624,6 +670,90 @@ func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress strin
	return lastErr
}

// waitForMinClusterSizeBeforeStart waits for minimum cluster size to be discovered
// and ensures peers.json exists before RQLite starts
// CRITICAL: This function waits INDEFINITELY - it will NEVER timeout
// We never start as a single-node cluster, regardless of how long we wait
func (r *RQLiteManager) waitForMinClusterSizeBeforeStart(ctx context.Context, rqliteDataDir string) error {
	if r.discoveryService == nil {
		return fmt.Errorf("discovery service not available")
	}

	requiredRemotePeers := r.config.MinClusterSize - 1
	r.logger.Info("Waiting for minimum cluster size before RQLite startup",
		zap.Int("min_cluster_size", r.config.MinClusterSize),
		zap.Int("required_remote_peers", requiredRemotePeers),
		zap.String("policy", "waiting indefinitely - will never start as single-node cluster"))

	// Trigger peer exchange to collect metadata
	if err := r.discoveryService.TriggerPeerExchange(ctx); err != nil {
		r.logger.Warn("Peer exchange failed", zap.Error(err))
	}

	// NO TIMEOUT - wait indefinitely until minimum cluster size is met
	// Only exit on context cancellation or when minimum cluster size is achieved
	checkInterval := 2 * time.Second
	lastLogTime := time.Now()

	for {
		// Check context cancellation first
		select {
		case <-ctx.Done():
			return fmt.Errorf("context cancelled while waiting for minimum cluster size: %w", ctx.Err())
		default:
		}

		// Trigger sync to update knownPeers
		r.discoveryService.TriggerSync()
		time.Sleep(checkInterval)

		// Check if we have enough remote peers
		allPeers := r.discoveryService.GetAllPeers()
		remotePeerCount := 0
		for _, peer := range allPeers {
			if peer.NodeID != r.discoverConfig.RaftAdvAddress {
				remotePeerCount++
			}
		}

		if remotePeerCount >= requiredRemotePeers {
			// Found enough peers - verify peers.json exists and contains them
			peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json")

			// Trigger one more sync to ensure peers.json is written
			r.discoveryService.TriggerSync()
			time.Sleep(2 * time.Second)

			// Verify peers.json exists and contains enough peers
			if info, err := os.Stat(peersPath); err == nil && info.Size() > 10 {
				// Read and verify it contains enough peers
				data, err := os.ReadFile(peersPath)
				if err == nil {
					var peers []map[string]interface{}
					if err := json.Unmarshal(data, &peers); err == nil && len(peers) >= requiredRemotePeers {
						r.logger.Info("peers.json exists with minimum cluster size, safe to start RQLite",
							zap.String("peers_file", peersPath),
							zap.Int("remote_peers_discovered", remotePeerCount),
							zap.Int("peers_in_json", len(peers)),
							zap.Int("min_cluster_size", r.config.MinClusterSize))
						return nil
					}
				}
			}
		}

		// Log progress every 10 seconds
		if time.Since(lastLogTime) >= 10*time.Second {
			r.logger.Info("Waiting for minimum cluster size (indefinitely)...",
				zap.Int("discovered_peers", len(allPeers)),
				zap.Int("remote_peers", remotePeerCount),
				zap.Int("required_remote_peers", requiredRemotePeers),
				zap.String("status", "will continue waiting until minimum cluster size is met"))
			lastLogTime = time.Now()
		}
	}
}
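
For reference, the verification above only checks that peers.json parses as a JSON array with enough entries; the exact contents written by the discovery service are not shown in this diff. A sketch of such a file for a three-node cluster, using the node-entry shape RQLite's recovery mechanism expects, with invented IDs and addresses:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Placeholder peers.json contents; the id and address values are made up.
	sample := []byte(`[
	  {"id": "10.0.0.1:7001", "address": "10.0.0.1:7001", "non_voter": false},
	  {"id": "10.0.0.2:7001", "address": "10.0.0.2:7001", "non_voter": false},
	  {"id": "10.0.0.3:7001", "address": "10.0.0.3:7001", "non_voter": false}
	]`)

	var peers []map[string]interface{}
	if err := json.Unmarshal(sample, &peers); err != nil {
		panic(err)
	}
	// Same length check used by waitForMinClusterSizeBeforeStart.
	fmt.Println("peers in json:", len(peers))
}
```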
// testJoinAddress tests if a join address is reachable
func (r *RQLiteManager) testJoinAddress(joinAddress string) error {
	// Determine the HTTP status URL to probe.

@@ -735,7 +865,9 @@ func (r *RQLiteManager) exponentialBackoff(attempt int, baseDelay time.Duration,
}

// recoverCluster restarts RQLite using the recovery.db created from peers.json
func (r *RQLiteManager) recoverCluster(peersJSONPath string) error {
// It reuses launchProcess and waitForReadyAndConnect to ensure all join/backoff logic
// and proper readiness checks are applied during recovery.
func (r *RQLiteManager) recoverCluster(ctx context.Context, peersJSONPath string) error {
	r.logger.Info("Initiating cluster recovery by restarting RQLite",
		zap.String("peers_file", peersJSONPath))

@@ -748,41 +880,29 @@ func (r *RQLiteManager) recoverCluster(peersJSONPath string) error {
	// Wait for process to fully stop
	time.Sleep(2 * time.Second)

	// Restart RQLite - it will automatically detect peers.json and perform recovery
	r.logger.Info("Restarting RQLite (will auto-recover using peers.json)")

	// Rebuild the launch arguments using the centralized path helper
	// Get the data directory path
	rqliteDataDir, err := r.rqliteDataDirPath()
	if err != nil {
		return fmt.Errorf("failed to resolve RQLite data directory: %w", err)
	}
	args := []string{
		"-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort),
		"-http-adv-addr", r.discoverConfig.HttpAdvAddress,
		"-raft-adv-addr", r.discoverConfig.RaftAdvAddress,
		"-raft-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLiteRaftPort),
		rqliteDataDir,

	// Restart RQLite using launchProcess to ensure all join/backoff logic is applied
	// This includes: join address handling, join retries, bootstrap-expect, etc.
	r.logger.Info("Restarting RQLite (will auto-recover using peers.json)")
	if err := r.launchProcess(ctx, rqliteDataDir); err != nil {
		return fmt.Errorf("failed to restart RQLite process: %w", err)
	}

	// Restart RQLite
	r.cmd = exec.Command("rqlited", args...)
	r.cmd.Stdout = os.Stdout
	r.cmd.Stderr = os.Stderr

	if err := r.cmd.Start(); err != nil {
		return fmt.Errorf("failed to restart RQLite: %w", err)
	// Wait for RQLite to be ready and establish connection using proper readiness checks
	// This includes retries for "store is not open" errors during recovery
	if err := r.waitForReadyAndConnect(ctx); err != nil {
		// Clean up the process if connection failed
		if r.cmd != nil && r.cmd.Process != nil {
			_ = r.cmd.Process.Kill()
		}
		return fmt.Errorf("failed to wait for RQLite readiness after recovery: %w", err)
	}

	r.logger.Info("RQLite restarted, waiting for it to become ready")
	time.Sleep(3 * time.Second)

	// Recreate connection
	conn, err := gorqlite.Open(fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
	if err != nil {
		return fmt.Errorf("failed to reconnect to RQLite: %w", err)
	}
	r.connection = conn

	r.logger.Info("Cluster recovery completed, RQLite restarted with new configuration")
	return nil
}
@@ -887,26 +1007,25 @@ func (r *RQLiteManager) clearRaftState(rqliteDataDir string) error {
}

// isStuckInConfigurationMismatch checks if we're stuck due to configuration mismatch
// This detects the "not part of stable configuration" scenario where Raft can't elect a leader
// This detects both configuration mismatch AND split-brain scenarios
func (r *RQLiteManager) isStuckInConfigurationMismatch() bool {
	// Check Raft state via status endpoint
	// First check for split-brain (all followers, term 0, no peers)
	if r.isInSplitBrainState() {
		return true
	}

	// Then check for traditional configuration mismatch
	status, err := r.getRQLiteStatus()
	if err != nil {
		r.logger.Debug("Cannot check Raft status for configuration mismatch", zap.Error(err))
		return false // Can't determine, don't clear
	}

	// Get Raft state and leader information
	raftState := strings.ToLower(status.Store.Raft.State)
	hasLeader := status.Store.Raft.LeaderAddr != ""

	// Stuck if: no leader AND state is not "leader" or "follower"
	// (likely stuck in "candidate" or configuration mismatch)
	if !hasLeader && raftState != "leader" && raftState != "follower" {
		r.logger.Debug("Detected potential configuration mismatch",
			zap.String("raft_state", raftState),
			zap.Bool("has_leader", hasLeader))
		// Verify all peers are also stuck
		if r.allPeersAreStuck() {
			return true
		}
@@ -993,6 +1112,204 @@ func (r *RQLiteManager) isPeerReachable(httpAddr string) bool {
	return resp.StatusCode == http.StatusOK
}

// isInSplitBrainState detects if we're in a split-brain scenario where all nodes
// are followers with no peers (each node thinks it's alone)
func (r *RQLiteManager) isInSplitBrainState() bool {
	status, err := r.getRQLiteStatus()
	if err != nil {
		return false
	}

	raft := status.Store.Raft

	// Split-brain indicators:
	// - State is Follower (not Leader)
	// - Term is 0 (no leader election has occurred)
	// - num_peers is 0 (node thinks it's alone)
	// - voter is false (node not configured as voter)
	isSplitBrain := raft.State == "Follower" &&
		raft.Term == 0 &&
		raft.NumPeers == 0 &&
		!raft.Voter &&
		raft.LeaderAddr == ""

	if !isSplitBrain {
		return false
	}

	// Verify all discovered peers are also in split-brain state
	if r.discoveryService == nil {
		r.logger.Debug("No discovery service to verify split-brain across peers")
		return false
	}

	peers := r.discoveryService.GetActivePeers()
	if len(peers) == 0 {
		// No peers discovered yet - might be network issue, not split-brain
		return false
	}

	// Check if all reachable peers are also in split-brain
	splitBrainCount := 0
	reachableCount := 0
	for _, peer := range peers {
		if !r.isPeerReachable(peer.HTTPAddress) {
			continue
		}
		reachableCount++

		peerStatus, err := r.getPeerRQLiteStatus(peer.HTTPAddress)
		if err != nil {
			continue
		}

		peerRaft := peerStatus.Store.Raft
		if peerRaft.State == "Follower" &&
			peerRaft.Term == 0 &&
			peerRaft.NumPeers == 0 &&
			!peerRaft.Voter {
			splitBrainCount++
		}
	}

	// If all reachable peers are in split-brain, we have cluster-wide split-brain
	if reachableCount > 0 && splitBrainCount == reachableCount {
		r.logger.Warn("Detected cluster-wide split-brain state",
			zap.Int("reachable_peers", reachableCount),
			zap.Int("split_brain_peers", splitBrainCount))
		return true
	}

	return false
}

// getPeerRQLiteStatus queries a peer's status endpoint
func (r *RQLiteManager) getPeerRQLiteStatus(httpAddr string) (*RQLiteStatus, error) {
	url := fmt.Sprintf("http://%s/status", httpAddr)
	client := &http.Client{Timeout: 3 * time.Second}

	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("peer returned status %d", resp.StatusCode)
	}

	var status RQLiteStatus
	if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
		return nil, err
	}

	return &status, nil
}

// startHealthMonitoring runs periodic health checks and automatically recovers from split-brain
func (r *RQLiteManager) startHealthMonitoring(ctx context.Context) {
	// Wait a bit after startup before starting health checks
	time.Sleep(30 * time.Second)

	ticker := time.NewTicker(60 * time.Second) // Check every minute
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Check for split-brain state
			if r.isInSplitBrainState() {
				r.logger.Warn("Split-brain detected during health check, initiating automatic recovery")

				// Attempt automatic recovery
				if err := r.recoverFromSplitBrain(ctx); err != nil {
					r.logger.Error("Automatic split-brain recovery failed",
						zap.Error(err),
						zap.String("action", "will retry on next health check"))
				} else {
					r.logger.Info("Successfully recovered from split-brain")
				}
			}
		}
	}
}

// recoverFromSplitBrain automatically recovers from split-brain state
func (r *RQLiteManager) recoverFromSplitBrain(ctx context.Context) error {
	if r.discoveryService == nil {
		return fmt.Errorf("discovery service not available for recovery")
	}

	r.logger.Info("Starting automatic split-brain recovery")

	// Step 1: Ensure we have latest peer information
	r.discoveryService.TriggerPeerExchange(ctx)
	time.Sleep(2 * time.Second)
	r.discoveryService.TriggerSync()
	time.Sleep(2 * time.Second)

	// Step 2: Get data directory
	rqliteDataDir, err := r.rqliteDataDirPath()
	if err != nil {
		return fmt.Errorf("failed to get data directory: %w", err)
	}

	// Step 3: Check if peers have more recent data
	allPeers := r.discoveryService.GetAllPeers()
	maxPeerIndex := uint64(0)
	for _, peer := range allPeers {
		if peer.NodeID == r.discoverConfig.RaftAdvAddress {
			continue // Skip self
		}
		if peer.RaftLogIndex > maxPeerIndex {
			maxPeerIndex = peer.RaftLogIndex
		}
	}

	// Step 4: Clear our Raft state if peers have more recent data
	ourIndex := r.getRaftLogIndex()
	if maxPeerIndex > ourIndex || (maxPeerIndex == 0 && ourIndex == 0) {
		r.logger.Info("Clearing Raft state to allow clean cluster join",
			zap.Uint64("our_index", ourIndex),
			zap.Uint64("peer_max_index", maxPeerIndex))

		if err := r.clearRaftState(rqliteDataDir); err != nil {
			return fmt.Errorf("failed to clear Raft state: %w", err)
		}

		// Step 5: Ensure peers.json exists with all discovered peers
		r.discoveryService.TriggerSync()
		time.Sleep(2 * time.Second)

		peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json")
		if _, err := os.Stat(peersPath); err != nil {
			return fmt.Errorf("peers.json not created: %w", err)
		}

		// Step 6: Restart RQLite to pick up new peers.json
		r.logger.Info("Restarting RQLite to apply new cluster configuration")
		if err := r.recoverCluster(ctx, peersPath); err != nil {
			return fmt.Errorf("failed to restart RQLite: %w", err)
		}

		// Step 7: Wait for cluster to form (waitForReadyAndConnect already handled readiness)
		r.logger.Info("Waiting for cluster to stabilize after recovery...")
		time.Sleep(5 * time.Second)

		// Verify recovery succeeded
		if r.isInSplitBrainState() {
			return fmt.Errorf("still in split-brain after recovery attempt")
		}

		r.logger.Info("Split-brain recovery completed successfully")
		return nil
	}

	return fmt.Errorf("cannot recover: we have more recent data than peers")
}

// isSafeToClearState verifies we can safely clear Raft state
// Returns true only if peers have higher log indexes (they have more recent data)
// or if we have no meaningful state (index == 0)
@@ -6,13 +6,16 @@ import "time"

type RQLiteStatus struct {
	Store struct {
		Raft struct {
			AppliedIndex uint64 `json:"applied_index"`
			CommitIndex uint64 `json:"commit_index"`
			LastLogIndex uint64 `json:"last_log_index"`
			AppliedIndex      uint64 `json:"applied_index"`
			CommitIndex       uint64 `json:"commit_index"`
			LastLogIndex      uint64 `json:"last_log_index"`
			LastSnapshotIndex uint64 `json:"last_snapshot_index"`
			State string `json:"state"`
			LeaderID string `json:"leader_id"`
			LeaderAddr string `json:"leader_addr"`
			State             string `json:"state"`
			LeaderID          string `json:"leader_id"`
			LeaderAddr        string `json:"leader_addr"`
			Term              uint64 `json:"term"`
			NumPeers          int    `json:"num_peers"`
			Voter             bool   `json:"voter"`
		} `json:"raft"`
		DBConf struct {
			DSN string `json:"dsn"`

@@ -20,30 +23,30 @@ type RQLiteStatus struct {
		} `json:"db_conf"`
	} `json:"store"`
	Runtime struct {
		GOARCH string `json:"GOARCH"`
		GOOS string `json:"GOOS"`
		GOMAXPROCS int `json:"GOMAXPROCS"`
		NumCPU int `json:"num_cpu"`
		NumGoroutine int `json:"num_goroutine"`
		Version string `json:"version"`
		GOARCH       string `json:"GOARCH"`
		GOOS         string `json:"GOOS"`
		GOMAXPROCS   int    `json:"GOMAXPROCS"`
		NumCPU       int    `json:"num_cpu"`
		NumGoroutine int    `json:"num_goroutine"`
		Version      string `json:"version"`
	} `json:"runtime"`
	HTTP struct {
		Addr string `json:"addr"`
		Auth string `json:"auth"`
		Addr string `json:"addr"`
		Auth string `json:"auth"`
	} `json:"http"`
	Node struct {
		Uptime string `json:"uptime"`
		StartTime string `json:"start_time"`
		Uptime    string `json:"uptime"`
		StartTime string `json:"start_time"`
	} `json:"node"`
}
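
The struct above mirrors rqlite's `/status` payload. A trimmed, self-contained sketch showing how the fields added in this commit drive the split-brain check; the JSON values are made up, and only the field names come from the struct tags above:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Illustrative /status fragment limited to the raft fields the check reads.
const sample = `{"store":{"raft":{"state":"Follower","leader_addr":"","term":0,"num_peers":0,"voter":false}}}`

type statusDoc struct {
	Store struct {
		Raft struct {
			State      string `json:"state"`
			LeaderAddr string `json:"leader_addr"`
			Term       uint64 `json:"term"`
			NumPeers   int    `json:"num_peers"`
			Voter      bool   `json:"voter"`
		} `json:"raft"`
	} `json:"store"`
}

func main() {
	var s statusDoc
	if err := json.Unmarshal([]byte(sample), &s); err != nil {
		panic(err)
	}
	raft := s.Store.Raft
	// Same indicators isInSplitBrainState looks for.
	splitBrain := raft.State == "Follower" && raft.Term == 0 &&
		raft.NumPeers == 0 && !raft.Voter && raft.LeaderAddr == ""
	fmt.Println("split-brain indicators present:", splitBrain)
}
```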
// RQLiteNode represents a node in the RQLite cluster
type RQLiteNode struct {
	ID string `json:"id"`
	Address string `json:"address"`
	Leader bool `json:"leader"`
	Voter bool `json:"voter"`
	Reachable bool `json:"reachable"`
	ID        string `json:"id"`
	Address   string `json:"address"`
	Leader    bool   `json:"leader"`
	Voter     bool   `json:"voter"`
	Reachable bool   `json:"reachable"`
}

// RQLiteNodes represents the response from RQLite's /nodes endpoint

@@ -68,4 +71,3 @@ type ClusterMetrics struct {
	CurrentLeader     string
	AveragePeerHealth float64
}