From 7f77836d7393f5921e80870f7135a64975a200d3 Mon Sep 17 00:00:00 2001
From: anonpenguin23
Date: Wed, 12 Nov 2025 17:08:24 +0200
Subject: [PATCH] feat: add RQLite config-mismatch recovery and improve IPFS Cluster peer discovery

- Added automatic recovery for RQLite nodes stuck in a Raft configuration
  mismatch: detect the cluster-wide stuck state, verify that peers hold more
  recent log data, then clear stale Raft state and rejoin.
- Added DiscoverClusterPeersFromLibP2P to load IPFS Cluster peer addresses
  from the peerstore file and wired it into the connection-monitoring loop,
  so peer_addresses can be populated before the Cluster API is reachable.
- Reordered the IPFS Cluster configuration updates to write the peerstore
  file first and then sync service.json, since the peerstore is what IPFS
  Cluster reads when bootstrapping.
- Bumped the version to 0.69.9 and updated the changelog.
---
 CHANGELOG.md           |  15 ++++
 Makefile               |   2 +-
 pkg/ipfs/cluster.go    | 125 ++++++++++++++++++++++++++----
 pkg/node/monitoring.go |  11 +++
 pkg/rqlite/rqlite.go   | 172 +++++++++++++++++++++++++++++++++++++++++
 5 files changed, 308 insertions(+), 17 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5b93536..68a2ce0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,21 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
 ### Deprecated
 
 ### Fixed
+## [0.69.9] - 2025-11-12
+
+### Added
+- Added automatic recovery logic for RQLite (database) nodes stuck in a configuration mismatch, which attempts to clear stale Raft state if peers have more recent data.
+- Added logic to discover IPFS Cluster peers directly from the LibP2P host's peerstore, improving peer discovery before the Cluster API is fully operational.
+
+### Changed
+- Improved the IPFS Cluster configuration update process to prioritize writing to the `peerstore` file before updating `service.json`, ensuring the source of truth is updated first.
+
+### Deprecated
+
+### Removed
+
+### Fixed
+
 ## [0.69.8] - 2025-11-12
 
 ### Added
diff --git a/Makefile b/Makefile
index b2204b5..16c8ba7 100644
--- a/Makefile
+++ b/Makefile
@@ -19,7 +19,7 @@ test-e2e:
 .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
 
-VERSION := 0.69.8
+VERSION := 0.69.9
 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
 DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
 LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
 
diff --git a/pkg/ipfs/cluster.go b/pkg/ipfs/cluster.go
index 3228333..f0306ac 100644
--- a/pkg/ipfs/cluster.go
+++ b/pkg/ipfs/cluster.go
@@ -19,6 +19,7 @@ import (
     "go.uber.org/zap"
 
     "github.com/DeBrosOfficial/network/pkg/config"
+    "github.com/libp2p/go-libp2p/core/host"
     "github.com/multiformats/go-multiaddr"
 )
 
@@ -271,7 +272,14 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) (bo
         return true, nil
     }
 
-    // Update peer_addresses
+    // Update peerstore file FIRST - this is what IPFS Cluster reads for bootstrapping
+    // Peerstore is the source of truth, service.json is just for our tracking
+    peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
+    if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil {
+        return false, fmt.Errorf("failed to write peerstore: %w", err)
+    }
+
+    // Then sync service.json from peerstore to keep them in sync
     cfg.Cluster.PeerAddresses = []string{bootstrapPeerAddr}
 
     // Save config
@@ -279,12 +287,6 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) (bo
         return false, fmt.Errorf("failed to save config: %w", err)
     }
 
-    // Write to peerstore file
-    peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
-    if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil {
-        return false, fmt.Errorf("failed to write peerstore: %w", err)
-    }
-
     cm.logger.Info("Updated bootstrap peer configuration",
         zap.String("bootstrap_peer_addr", bootstrapPeerAddr),
         zap.String("peerstore_path", peerstorePath))
@@ -433,15 +435,8 @@ func (cm *ClusterConfigManager) UpdateAllClusterPeers() (bool, error) {
         return true, nil
     }
 
-    // Update peer_addresses
-    cfg.Cluster.PeerAddresses = allPeerAddresses
-
-    // Save config
-    if err := cm.saveConfig(serviceJSONPath, cfg); err != nil {
-        return false, fmt.Errorf("failed to save config: %w", err)
-    }
-
-    // Also update peerstore file
+    // Update peerstore file FIRST - this is what IPFS Cluster reads for bootstrapping
+    // Peerstore is the source of truth, service.json is just for our tracking
     peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
     peerstoreContent := strings.Join(allPeerAddresses, "\n") + "\n"
     if err := os.WriteFile(peerstorePath, []byte(peerstoreContent), 0644); err != nil {
@@ -449,6 +444,14 @@ func (cm *ClusterConfigManager) UpdateAllClusterPeers() (bool, error) {
         // Non-fatal, continue
     }
 
+    // Then sync service.json from peerstore to keep them in sync
+    cfg.Cluster.PeerAddresses = allPeerAddresses
+
+    // Save config
+    if err := cm.saveConfig(serviceJSONPath, cfg); err != nil {
+        return false, fmt.Errorf("failed to save config: %w", err)
+    }
+
     cm.logger.Info("Updated cluster peer addresses",
         zap.Int("peer_count", len(allPeerAddresses)),
         zap.Strings("peer_addresses", allPeerAddresses))
@@ -506,6 +509,96 @@ func (cm *ClusterConfigManager) RepairBootstrapPeers() (bool, error) {
     return false, nil
 }
 
+// DiscoverClusterPeersFromLibP2P loads IPFS cluster peer addresses from the peerstore file.
+// If peerstore is empty, it means there are no peers to connect to.
+// Returns true if peers were loaded and configured, false otherwise (non-fatal)
+func (cm *ClusterConfigManager) DiscoverClusterPeersFromLibP2P(host host.Host) (bool, error) {
+    if cm.cfg.Database.IPFS.ClusterAPIURL == "" {
+        return false, nil // IPFS not configured
+    }
+
+    // Load peer addresses from peerstore file
+    peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
+    peerstoreData, err := os.ReadFile(peerstorePath)
+    if err != nil {
+        // Peerstore file doesn't exist or can't be read - no peers to connect to
+        cm.logger.Debug("Peerstore file not found or empty - no cluster peers to connect to",
+            zap.String("peerstore_path", peerstorePath))
+        return false, nil
+    }
+
+    var allPeerAddresses []string
+    seenPeers := make(map[string]bool)
+
+    // Parse peerstore file (one multiaddr per line)
+    lines := strings.Split(strings.TrimSpace(string(peerstoreData)), "\n")
+    for _, line := range lines {
+        line = strings.TrimSpace(line)
+        if line != "" && strings.HasPrefix(line, "/") {
+            // Validate it's a proper multiaddr with p2p component
+            if ma, err := multiaddr.NewMultiaddr(line); err == nil {
+                if _, err := ma.ValueForProtocol(multiaddr.P_P2P); err == nil {
+                    if !seenPeers[line] {
+                        allPeerAddresses = append(allPeerAddresses, line)
+                        seenPeers[line] = true
+                        cm.logger.Debug("Loaded cluster peer address from peerstore",
+                            zap.String("addr", line))
+                    }
+                }
+            }
+        }
+    }
+
+    if len(allPeerAddresses) == 0 {
+        cm.logger.Debug("Peerstore file is empty - no cluster peers to connect to")
+        return false, nil
+    }
+
+    // Get config to update peer_addresses
+    serviceJSONPath := filepath.Join(cm.clusterPath, "service.json")
+    cfg, err := cm.loadOrCreateConfig(serviceJSONPath)
+    if err != nil {
+        return false, fmt.Errorf("failed to load config: %w", err)
+    }
+
+    // Check if peer addresses have changed
+    addressesChanged := false
+    if len(cfg.Cluster.PeerAddresses) != len(allPeerAddresses) {
+        addressesChanged = true
+    } else {
+        currentAddrs := make(map[string]bool)
+        for _, addr := range cfg.Cluster.PeerAddresses {
+            currentAddrs[addr] = true
+        }
+        for _, addr := range allPeerAddresses {
+            if !currentAddrs[addr] {
+                addressesChanged = true
+                break
+            }
+        }
+    }
+
+    if !addressesChanged {
+        cm.logger.Debug("Cluster peer addresses already up to date",
+            zap.Int("peer_count", len(allPeerAddresses)))
+        return true, nil
+    }
+
+    // Update peer_addresses
+    cfg.Cluster.PeerAddresses = allPeerAddresses
+
+    // Save config
+    if err := cm.saveConfig(serviceJSONPath, cfg); err != nil {
+        return false, fmt.Errorf("failed to save config: %w", err)
+    }
+
+    cm.logger.Info("Loaded cluster peer addresses from peerstore",
+        zap.Int("peer_count", len(allPeerAddresses)),
+        zap.Strings("peer_addresses", allPeerAddresses))
+
+    return true, nil
+}
+
 // loadOrCreateConfig loads existing service.json or creates a template
 func (cm *ClusterConfigManager) loadOrCreateConfig(path string) (*ClusterServiceConfig, error) {
     // Try to load existing config
diff --git a/pkg/node/monitoring.go b/pkg/node/monitoring.go
index bc374d2..1b21b6d 100644
--- a/pkg/node/monitoring.go
+++ b/pkg/node/monitoring.go
@@ -217,6 +217,17 @@ func (n *Node) startConnectionMonitoring() {
     // This discovers all cluster peers and updates peer_addresses in service.json
     // so IPFS Cluster can automatically connect to all discovered peers
     if n.clusterConfigManager != nil {
+        // First try to discover from LibP2P connections (works even if cluster peers aren't connected yet)
+        // This runs every minute to discover peers automatically via LibP2P discovery
+        if time.Now().Unix()%60 == 0 {
+            if success, err := n.clusterConfigManager.DiscoverClusterPeersFromLibP2P(n.host); err != nil {
+                n.logger.ComponentWarn(logging.ComponentNode, "Failed to discover cluster peers from LibP2P", zap.Error(err))
+            } else if success {
+                n.logger.ComponentInfo(logging.ComponentNode, "Cluster peer addresses discovered from LibP2P")
+            }
+        }
+
+        // Also try to update from cluster API (works once peers are connected)
         // Update all cluster peers every 2 minutes to discover new peers
         if time.Now().Unix()%120 == 0 {
             if success, err := n.clusterConfigManager.UpdateAllClusterPeers(); err != nil {
diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go
index 5d3ca10..7d22376 100644
--- a/pkg/rqlite/rqlite.go
+++ b/pkg/rqlite/rqlite.go
@@ -360,6 +360,34 @@ func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDat
                 r.logger.Warn("Cluster recovery failed", zap.Error(recoveryErr))
             }
         }
+
+        // Check if we're stuck in configuration mismatch after recovery failed
+        if leadershipErr != nil && r.isStuckInConfigurationMismatch() {
+            r.logger.Warn("Detected persistent configuration mismatch, attempting automatic recovery")
+
+            // Verify it's safe to clear state (peers have higher log indexes)
+            if r.isSafeToClearState(rqliteDataDir) {
+                r.logger.Info("Clearing stale Raft state to resolve configuration mismatch")
+                if err := r.clearRaftState(rqliteDataDir); err != nil {
+                    r.logger.Error("Failed to clear Raft state", zap.Error(err))
+                } else {
+                    // Restart RQLite with clean state
+                    r.logger.Info("Raft state cleared, restarting RQLite for clean rejoin")
+                    if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil {
+                        // Retry leadership after state clear
+                        leadershipErr = r.waitForLeadership(ctx)
+                        if leadershipErr == nil {
+                            r.logger.Info("Bootstrap node established leadership after state clear")
+                            return nil
+                        }
+                    }
+                }
+            } else {
+                r.logger.Warn("Configuration mismatch detected but clearing state is unsafe",
+                    zap.String("reason", "peers may not have more recent data"),
+                    zap.String("action", "manual intervention may be required"))
+            }
+        }
     }
 
     // Final fallback: SQL availability
@@ -858,6 +886,150 @@ func (r *RQLiteManager) clearRaftState(rqliteDataDir string) error {
     return nil
 }
 
+// isStuckInConfigurationMismatch checks if we're stuck due to configuration mismatch
+// This detects the "not part of stable configuration" scenario where Raft can't elect a leader
+func (r *RQLiteManager) isStuckInConfigurationMismatch() bool {
+    // Check Raft state via status endpoint
+    status, err := r.getRQLiteStatus()
+    if err != nil {
+        r.logger.Debug("Cannot check Raft status for configuration mismatch", zap.Error(err))
+        return false // Can't determine, don't clear
+    }
+
+    // Get Raft state and leader information
+    raftState := strings.ToLower(status.Store.Raft.State)
+    hasLeader := status.Store.Raft.LeaderAddr != ""
+
+    // Stuck if: no leader AND state is not "leader" or "follower"
+    // (likely stuck in "candidate" or configuration mismatch)
+    if !hasLeader && raftState != "leader" && raftState != "follower" {
+        r.logger.Debug("Detected potential configuration mismatch",
+            zap.String("raft_state", raftState),
+            zap.Bool("has_leader", hasLeader))
+        // Verify all peers are also stuck
+        if r.allPeersAreStuck() {
+            return true
+        }
+    }
+
+    return false
+}
+
+// allPeersAreStuck checks if all discovered peers also report no leader
+// This helps confirm we're in a cluster-wide configuration mismatch, not just a local issue
+func (r *RQLiteManager) allPeersAreStuck() bool {
+    if r.discoveryService == nil {
+        r.logger.Debug("No discovery service available to check peer status")
+        return false
+    }
+
+    peers := r.discoveryService.GetActivePeers()
+    if len(peers) == 0 {
+        r.logger.Debug("No peers discovered, might be network issue")
+        return false // No peers discovered, might be network issue
+    }
+
+    // Check if we can query peers and they all report no leader
+    stuckCount := 0
+    reachableCount := 0
+    for _, peer := range peers {
+        if r.peerHasLeader(peer.HTTPAddress) {
+            // Peer has a leader, so we're not in cluster-wide mismatch
+            return false
+        }
+        // Check if peer is at least reachable
+        if r.isPeerReachable(peer.HTTPAddress) {
+            reachableCount++
+            stuckCount++
+        }
+    }
+
+    // If we have reachable peers and they're all stuck, we're likely in cluster-wide config mismatch
+    if reachableCount > 0 && stuckCount == reachableCount {
+        r.logger.Debug("All reachable peers are also stuck",
+            zap.Int("reachable_peers", reachableCount),
+            zap.Int("total_peers", len(peers)))
+        return true
+    }
+
+    return false
+}
+
+// peerHasLeader checks if a peer has a leader by querying its status endpoint
+func (r *RQLiteManager) peerHasLeader(httpAddr string) bool {
+    url := fmt.Sprintf("http://%s/status", httpAddr)
+    client := &http.Client{Timeout: 3 * time.Second}
+
+    resp, err := client.Get(url)
+    if err != nil {
+        return false // Can't reach peer
+    }
+    defer resp.Body.Close()
+
+    if resp.StatusCode != http.StatusOK {
+        return false
+    }
+
+    var status RQLiteStatus
+    if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
+        return false
+    }
+
+    // Peer has leader if leader address is set
+    return status.Store.Raft.LeaderAddr != ""
+}
+
+// isPeerReachable checks if a peer is at least responding to HTTP requests
+func (r *RQLiteManager) isPeerReachable(httpAddr string) bool {
+    url := fmt.Sprintf("http://%s/status", httpAddr)
+    client := &http.Client{Timeout: 3 * time.Second}
+
+    resp, err := client.Get(url)
+    if err != nil {
+        return false
+    }
+    defer resp.Body.Close()
+
+    return resp.StatusCode == http.StatusOK
+}
+
+// isSafeToClearState verifies we can safely clear Raft state
+// Returns true only if peers have higher log indexes (they have more recent data)
+// or if we have no meaningful state (index == 0)
+func (r *RQLiteManager) isSafeToClearState(rqliteDataDir string) bool {
+    if r.discoveryService == nil {
+        r.logger.Debug("No discovery service available, cannot verify safety")
+        return false // No discovery service, can't verify
+    }
+
+    ourIndex := r.getRaftLogIndex()
+    peers := r.discoveryService.GetActivePeers()
+
+    if len(peers) == 0 {
+        r.logger.Debug("No peers discovered, might be network issue")
+        return false // No peers, might be network issue
+    }
+
+    // Find max peer log index
+    maxPeerIndex := uint64(0)
+    for _, peer := range peers {
+        if peer.RaftLogIndex > maxPeerIndex {
+            maxPeerIndex = peer.RaftLogIndex
+        }
+    }
+
+    // Safe to clear if peers have higher log indexes (they have more recent data)
+    // OR if we have no meaningful state (index == 0)
+    safe := maxPeerIndex > ourIndex || ourIndex == 0
+
+    r.logger.Debug("Checking if safe to clear Raft state",
+        zap.Uint64("our_log_index", ourIndex),
+        zap.Uint64("peer_max_log_index", maxPeerIndex),
+        zap.Bool("safe_to_clear", safe))
+
+    return safe
+}
+
 // performPreStartClusterDiscovery waits for peer discovery and builds a complete peers.json
 // before starting RQLite. This ensures all nodes use the same cluster membership for recovery.
 func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rqliteDataDir string) error {
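
Note on the peerstore format the cluster.go changes rely on: both UpdateBootstrapPeers and DiscoverClusterPeersFromLibP2P treat the cluster peerstore as a plain-text file with one multiaddr per line, of the general shape /ip4/<host>/tcp/<cluster-swarm-port>/p2p/<peer-id>. A minimal sketch of the acceptance check the new discovery code applies to each line follows; the helper name and package are illustrative and not part of the patch, but the multiaddr calls are the same ones used above.

package peerstorecheck

import (
    "github.com/multiformats/go-multiaddr"
)

// isClusterPeerAddr reports whether a peerstore line would be kept by
// DiscoverClusterPeersFromLibP2P: it must parse as a multiaddr and carry
// a /p2p/<peer-id> component.
func isClusterPeerAddr(line string) bool {
    ma, err := multiaddr.NewMultiaddr(line)
    if err != nil {
        return false // not a multiaddr at all
    }
    _, err = ma.ValueForProtocol(multiaddr.P_P2P)
    return err == nil // accepted only when a peer ID is present
}

Lines that fail either check (blank lines, comment-like lines, or transport-only addresses without a /p2p/ component) are skipped with no error, so the peerstore file can be edited by hand without breaking discovery.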
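
Note on the rqlite.go helpers: the new code decodes GET /status responses into the existing RQLiteStatus type and reads peer metadata from r.discoveryService.GetActivePeers(); neither type is defined in this patch. A sketch of the minimal shapes the added code assumes is below. Field names follow the accesses in the patch, while the JSON tags and the peer type name are guesses rather than the real definitions in pkg/rqlite.

// Sketch only - the real definitions live elsewhere in pkg/rqlite.
// Only the fields read by isStuckInConfigurationMismatch, peerHasLeader
// and isSafeToClearState are shown.
type RQLiteStatus struct {
    Store struct {
        Raft struct {
            State      string `json:"state"`       // compared case-insensitively against "leader"/"follower"
            LeaderAddr string `json:"leader_addr"` // empty string means this node sees no leader
        } `json:"raft"`
    } `json:"store"`
}

type PeerInfo struct {
    HTTPAddress  string // host:port used for the http://<addr>/status probes
    RaftLogIndex uint64 // compared against our own index before clearing Raft state
}

The safety rule in isSafeToClearState then reduces to: clear local Raft state only when maxPeerIndex > ourIndex or ourIndex == 0, i.e. when peers provably hold more recent data or we hold none.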