feat: add service enable/disable functionality to production commands

- Introduced new functions to check whether a service is enabled and to enable or disable services as needed during production command execution.
- Enhanced the `handleProdStart` and `handleProdStop` functions to manage service states more reliably, disabling services when they are stopped and re-enabling them when they are started again (a minimal sketch of such helpers follows below).
- Improved logging to give clear feedback on service status changes during service management.
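The production-command changes themselves are not part of the hunks shown below, so as a rough, non-authoritative illustration only, here is a minimal Go sketch of what enable/disable helpers built on `systemctl` could look like. The function names (`isServiceEnabled`, `setServiceEnabled`), the unit name, and the systemd-based approach are assumptions for the sketch, not code from this commit.

```go
// Hypothetical sketch only: names and the systemd approach are assumptions,
// not taken from this commit's actual handleProdStart/handleProdStop code.
package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// isServiceEnabled reports whether a systemd unit is currently enabled.
// `systemctl is-enabled` prints the enablement state and exits non-zero for
// disabled units, so the captured output is inspected before the error.
func isServiceEnabled(unit string) (bool, error) {
	out, err := exec.Command("systemctl", "is-enabled", unit).Output()
	state := strings.TrimSpace(string(out))
	switch {
	case state == "enabled":
		return true, nil
	case state != "":
		return false, nil // e.g. "disabled", "static", "masked"
	default:
		return false, err // no output at all: report the underlying error
	}
}

// setServiceEnabled enables or disables a systemd unit and logs the change.
func setServiceEnabled(unit string, enable bool) error {
	action := "disable"
	if enable {
		action = "enable"
	}
	if out, err := exec.Command("systemctl", action, unit).CombinedOutput(); err != nil {
		return fmt.Errorf("systemctl %s %s: %v: %s", action, unit, err, strings.TrimSpace(string(out)))
	}
	fmt.Printf("service %s: %sd\n", unit, action)
	return nil
}

func main() {
	// Example flow mirroring the described stop path: if the unit is
	// currently enabled, disable it after stopping the service.
	const unit = "example-node.service" // placeholder unit name
	if enabled, _ := isServiceEnabled(unit); enabled {
		if err := setServiceEnabled(unit, false); err != nil {
			fmt.Println("warning:", err)
		}
	}
}
```

In this sketch, a stop handler would call `setServiceEnabled(unit, false)` after stopping the unit, and a start handler would call `setServiceEnabled(unit, true)` before starting it.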
anonpenguin23 2025-11-12 17:08:24 +02:00
parent 1d060490a8
commit 7f77836d73
5 changed files with 308 additions and 17 deletions

View File

@@ -13,6 +13,21 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
### Deprecated
### Fixed
## [0.69.9] - 2025-11-12
### Added
- Added automatic recovery logic for RQLite (database) nodes stuck in a configuration mismatch, which attempts to clear stale Raft state if peers have more recent data.
- Added logic to discover IPFS Cluster peers directly from the LibP2P host's peerstore, improving peer discovery before the Cluster API is fully operational.
### Changed
- Improved the IPFS Cluster configuration update process to prioritize writing to the `peerstore` file before updating `service.json`, ensuring the source of truth is updated first.
### Deprecated
### Removed
### Fixed
## [0.69.8] - 2025-11-12
### Added

View File

@@ -19,7 +19,7 @@ test-e2e:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
VERSION := 0.69.8
VERSION := 0.69.9
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

View File

@@ -19,6 +19,7 @@ import (
"go.uber.org/zap"
"github.com/DeBrosOfficial/network/pkg/config"
"github.com/libp2p/go-libp2p/core/host"
"github.com/multiformats/go-multiaddr"
)
@@ -271,7 +272,14 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) (bo
return true, nil
}
// Update peer_addresses
// Update peerstore file FIRST - this is what IPFS Cluster reads for bootstrapping
// Peerstore is the source of truth, service.json is just for our tracking
peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil {
return false, fmt.Errorf("failed to write peerstore: %w", err)
}
// Then sync service.json from peerstore to keep them in sync
cfg.Cluster.PeerAddresses = []string{bootstrapPeerAddr}
// Save config
@@ -279,12 +287,6 @@
return false, fmt.Errorf("failed to save config: %w", err)
}
// Write to peerstore file
peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil {
return false, fmt.Errorf("failed to write peerstore: %w", err)
}
cm.logger.Info("Updated bootstrap peer configuration",
zap.String("bootstrap_peer_addr", bootstrapPeerAddr),
zap.String("peerstore_path", peerstorePath))
@@ -433,15 +435,8 @@ func (cm *ClusterConfigManager) UpdateAllClusterPeers() (bool, error) {
return true, nil
}
// Update peer_addresses
cfg.Cluster.PeerAddresses = allPeerAddresses
// Save config
if err := cm.saveConfig(serviceJSONPath, cfg); err != nil {
return false, fmt.Errorf("failed to save config: %w", err)
}
// Also update peerstore file
// Update peerstore file FIRST - this is what IPFS Cluster reads for bootstrapping
// Peerstore is the source of truth, service.json is just for our tracking
peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
peerstoreContent := strings.Join(allPeerAddresses, "\n") + "\n"
if err := os.WriteFile(peerstorePath, []byte(peerstoreContent), 0644); err != nil {
@@ -449,6 +444,14 @@ func (cm *ClusterConfigManager) UpdateAllClusterPeers() (bool, error) {
// Non-fatal, continue
}
// Then sync service.json from peerstore to keep them in sync
cfg.Cluster.PeerAddresses = allPeerAddresses
// Save config
if err := cm.saveConfig(serviceJSONPath, cfg); err != nil {
return false, fmt.Errorf("failed to save config: %w", err)
}
cm.logger.Info("Updated cluster peer addresses",
zap.Int("peer_count", len(allPeerAddresses)),
zap.Strings("peer_addresses", allPeerAddresses))
@@ -506,6 +509,96 @@ func (cm *ClusterConfigManager) RepairBootstrapPeers() (bool, error) {
return false, nil
}
// DiscoverClusterPeersFromLibP2P loads IPFS cluster peer addresses from the peerstore file.
// If peerstore is empty, it means there are no peers to connect to.
// Returns true if peers were loaded and configured, false otherwise (non-fatal)
func (cm *ClusterConfigManager) DiscoverClusterPeersFromLibP2P(host host.Host) (bool, error) {
if cm.cfg.Database.IPFS.ClusterAPIURL == "" {
return false, nil // IPFS not configured
}
// Load peer addresses from peerstore file
peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
peerstoreData, err := os.ReadFile(peerstorePath)
if err != nil {
// Peerstore file doesn't exist or can't be read - no peers to connect to
cm.logger.Debug("Peerstore file not found or empty - no cluster peers to connect to",
zap.String("peerstore_path", peerstorePath))
return false, nil
}
var allPeerAddresses []string
seenPeers := make(map[string]bool)
// Parse peerstore file (one multiaddr per line)
lines := strings.Split(strings.TrimSpace(string(peerstoreData)), "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line != "" && strings.HasPrefix(line, "/") {
// Validate it's a proper multiaddr with p2p component
if ma, err := multiaddr.NewMultiaddr(line); err == nil {
if _, err := ma.ValueForProtocol(multiaddr.P_P2P); err == nil {
if !seenPeers[line] {
allPeerAddresses = append(allPeerAddresses, line)
seenPeers[line] = true
cm.logger.Debug("Loaded cluster peer address from peerstore",
zap.String("addr", line))
}
}
}
}
}
if len(allPeerAddresses) == 0 {
cm.logger.Debug("Peerstore file is empty - no cluster peers to connect to")
return false, nil
}
// Get config to update peer_addresses
serviceJSONPath := filepath.Join(cm.clusterPath, "service.json")
cfg, err := cm.loadOrCreateConfig(serviceJSONPath)
if err != nil {
return false, fmt.Errorf("failed to load config: %w", err)
}
// Check if peer addresses have changed
addressesChanged := false
if len(cfg.Cluster.PeerAddresses) != len(allPeerAddresses) {
addressesChanged = true
} else {
currentAddrs := make(map[string]bool)
for _, addr := range cfg.Cluster.PeerAddresses {
currentAddrs[addr] = true
}
for _, addr := range allPeerAddresses {
if !currentAddrs[addr] {
addressesChanged = true
break
}
}
}
if !addressesChanged {
cm.logger.Debug("Cluster peer addresses already up to date",
zap.Int("peer_count", len(allPeerAddresses)))
return true, nil
}
// Update peer_addresses
cfg.Cluster.PeerAddresses = allPeerAddresses
// Save config
if err := cm.saveConfig(serviceJSONPath, cfg); err != nil {
return false, fmt.Errorf("failed to save config: %w", err)
}
cm.logger.Info("Loaded cluster peer addresses from peerstore",
zap.Int("peer_count", len(allPeerAddresses)),
zap.Strings("peer_addresses", allPeerAddresses))
return true, nil
}
// loadOrCreateConfig loads existing service.json or creates a template
func (cm *ClusterConfigManager) loadOrCreateConfig(path string) (*ClusterServiceConfig, error) {
// Try to load existing config

View File

@@ -217,6 +217,17 @@ func (n *Node) startConnectionMonitoring() {
// This discovers all cluster peers and updates peer_addresses in service.json
// so IPFS Cluster can automatically connect to all discovered peers
if n.clusterConfigManager != nil {
// First try to discover from LibP2P connections (works even if cluster peers aren't connected yet)
// This runs every minute to discover peers automatically via LibP2P discovery
if time.Now().Unix()%60 == 0 {
if success, err := n.clusterConfigManager.DiscoverClusterPeersFromLibP2P(n.host); err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "Failed to discover cluster peers from LibP2P", zap.Error(err))
} else if success {
n.logger.ComponentInfo(logging.ComponentNode, "Cluster peer addresses discovered from LibP2P")
}
}
// Also try to update from cluster API (works once peers are connected)
// Update all cluster peers every 2 minutes to discover new peers
if time.Now().Unix()%120 == 0 {
if success, err := n.clusterConfigManager.UpdateAllClusterPeers(); err != nil {

View File

@@ -360,6 +360,34 @@ func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDat
r.logger.Warn("Cluster recovery failed", zap.Error(recoveryErr))
}
}
// Check if we're stuck in configuration mismatch after recovery failed
if leadershipErr != nil && r.isStuckInConfigurationMismatch() {
r.logger.Warn("Detected persistent configuration mismatch, attempting automatic recovery")
// Verify it's safe to clear state (peers have higher log indexes)
if r.isSafeToClearState(rqliteDataDir) {
r.logger.Info("Clearing stale Raft state to resolve configuration mismatch")
if err := r.clearRaftState(rqliteDataDir); err != nil {
r.logger.Error("Failed to clear Raft state", zap.Error(err))
} else {
// Restart RQLite with clean state
r.logger.Info("Raft state cleared, restarting RQLite for clean rejoin")
if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil {
// Retry leadership after state clear
leadershipErr = r.waitForLeadership(ctx)
if leadershipErr == nil {
r.logger.Info("Bootstrap node established leadership after state clear")
return nil
}
}
}
} else {
r.logger.Warn("Configuration mismatch detected but clearing state is unsafe",
zap.String("reason", "peers may not have more recent data"),
zap.String("action", "manual intervention may be required"))
}
}
}
// Final fallback: SQL availability
@@ -858,6 +886,150 @@ func (r *RQLiteManager) clearRaftState(rqliteDataDir string) error {
return nil
}
// isStuckInConfigurationMismatch checks if we're stuck due to configuration mismatch
// This detects the "not part of stable configuration" scenario where Raft can't elect a leader
func (r *RQLiteManager) isStuckInConfigurationMismatch() bool {
// Check Raft state via status endpoint
status, err := r.getRQLiteStatus()
if err != nil {
r.logger.Debug("Cannot check Raft status for configuration mismatch", zap.Error(err))
return false // Can't determine, don't clear
}
// Get Raft state and leader information
raftState := strings.ToLower(status.Store.Raft.State)
hasLeader := status.Store.Raft.LeaderAddr != ""
// Stuck if: no leader AND state is not "leader" or "follower"
// (likely stuck in "candidate" or configuration mismatch)
if !hasLeader && raftState != "leader" && raftState != "follower" {
r.logger.Debug("Detected potential configuration mismatch",
zap.String("raft_state", raftState),
zap.Bool("has_leader", hasLeader))
// Verify all peers are also stuck
if r.allPeersAreStuck() {
return true
}
}
return false
}
// allPeersAreStuck checks if all discovered peers also report no leader
// This helps confirm we're in a cluster-wide configuration mismatch, not just a local issue
func (r *RQLiteManager) allPeersAreStuck() bool {
if r.discoveryService == nil {
r.logger.Debug("No discovery service available to check peer status")
return false
}
peers := r.discoveryService.GetActivePeers()
if len(peers) == 0 {
r.logger.Debug("No peers discovered, might be network issue")
return false // No peers discovered, might be network issue
}
// Check if we can query peers and they all report no leader
stuckCount := 0
reachableCount := 0
for _, peer := range peers {
if r.peerHasLeader(peer.HTTPAddress) {
// Peer has a leader, so we're not in cluster-wide mismatch
return false
}
// Check if peer is at least reachable
if r.isPeerReachable(peer.HTTPAddress) {
reachableCount++
stuckCount++
}
}
// If we have reachable peers and they're all stuck, we're likely in cluster-wide config mismatch
if reachableCount > 0 && stuckCount == reachableCount {
r.logger.Debug("All reachable peers are also stuck",
zap.Int("reachable_peers", reachableCount),
zap.Int("total_peers", len(peers)))
return true
}
return false
}
// peerHasLeader checks if a peer has a leader by querying its status endpoint
func (r *RQLiteManager) peerHasLeader(httpAddr string) bool {
url := fmt.Sprintf("http://%s/status", httpAddr)
client := &http.Client{Timeout: 3 * time.Second}
resp, err := client.Get(url)
if err != nil {
return false // Can't reach peer
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return false
}
var status RQLiteStatus
if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
return false
}
// Peer has leader if leader address is set
return status.Store.Raft.LeaderAddr != ""
}
// isPeerReachable checks if a peer is at least responding to HTTP requests
func (r *RQLiteManager) isPeerReachable(httpAddr string) bool {
url := fmt.Sprintf("http://%s/status", httpAddr)
client := &http.Client{Timeout: 3 * time.Second}
resp, err := client.Get(url)
if err != nil {
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}
// isSafeToClearState verifies we can safely clear Raft state
// Returns true only if peers have higher log indexes (they have more recent data)
// or if we have no meaningful state (index == 0)
func (r *RQLiteManager) isSafeToClearState(rqliteDataDir string) bool {
if r.discoveryService == nil {
r.logger.Debug("No discovery service available, cannot verify safety")
return false // No discovery service, can't verify
}
ourIndex := r.getRaftLogIndex()
peers := r.discoveryService.GetActivePeers()
if len(peers) == 0 {
r.logger.Debug("No peers discovered, might be network issue")
return false // No peers, might be network issue
}
// Find max peer log index
maxPeerIndex := uint64(0)
for _, peer := range peers {
if peer.RaftLogIndex > maxPeerIndex {
maxPeerIndex = peer.RaftLogIndex
}
}
// Safe to clear if peers have higher log indexes (they have more recent data)
// OR if we have no meaningful state (index == 0)
safe := maxPeerIndex > ourIndex || ourIndex == 0
r.logger.Debug("Checking if safe to clear Raft state",
zap.Uint64("our_log_index", ourIndex),
zap.Uint64("peer_max_log_index", maxPeerIndex),
zap.Bool("safe_to_clear", safe))
return safe
}
// performPreStartClusterDiscovery waits for peer discovery and builds a complete peers.json
// before starting RQLite. This ensures all nodes use the same cluster membership for recovery.
func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rqliteDataDir string) error {