From 1d060490a8e237ec0afd9ef63449798cf8a17e47 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Wed, 12 Nov 2025 11:18:50 +0200 Subject: [PATCH] feat: add service enable/disable functionality to production commands - Introduced new functions to check if a service is enabled and to enable or disable services as needed during production command execution. - Enhanced the `handleProdStart` and `handleProdStop` functions to manage service states more effectively, ensuring services are re-enabled after being stopped and disabled when stopped. - Improved logging to provide clear feedback on service status changes, enhancing user experience during service management. --- CHANGELOG.md | 17 +++ Makefile | 2 +- pkg/cli/prod_commands.go | 180 +++++++++++++++++++++---- pkg/discovery/discovery.go | 48 +++++-- pkg/ipfs/cluster.go | 270 +++++++++++++++++++++++++++++++++++-- pkg/node/monitoring.go | 39 +++++- pkg/node/node.go | 24 ++-- 7 files changed, 511 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 306a1f8..5b93536 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,23 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant ### Deprecated ### Fixed +## [0.69.8] - 2025-11-12 + +### Added +- Improved `dbn prod start` to automatically unmask and re-enable services if they were previously masked or disabled. +- Added automatic discovery and configuration of all IPFS Cluster peers during runtime to improve cluster connectivity. + +### Changed +- Enhanced `dbn prod start` and `dbn prod stop` reliability by adding service state resets, retries, and ensuring services are disabled when stopped. +- Filtered peer exchange addresses in LibP2P discovery to only include the standard LibP2P port (4001), preventing exposure of internal service ports. + +### Deprecated + +### Removed + +### Fixed +- Improved IPFS Cluster bootstrap configuration repair logic to automatically infer and update bootstrap peer addresses if the bootstrap node is available. 
+ ## [0.69.7] - 2025-11-12 ### Added diff --git a/Makefile b/Makefile index c1580ce..b2204b5 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill -VERSION := 0.69.7 +VERSION := 0.69.8 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/cli/prod_commands.go b/pkg/cli/prod_commands.go index 921ca98..9d68483 100644 --- a/pkg/cli/prod_commands.go +++ b/pkg/cli/prod_commands.go @@ -862,6 +862,22 @@ func isServiceActive(service string) (bool, error) { return true, nil } +func isServiceEnabled(service string) (bool, error) { + cmd := exec.Command("systemctl", "is-enabled", "--quiet", service) + if err := cmd.Run(); err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + switch exitErr.ExitCode() { + case 1: + return false, nil // Service is disabled + case 4: + return false, errServiceNotFound + } + } + return false, err + } + return true, nil +} + func collectPortsForServices(services []string, skipActive bool) ([]portSpec, error) { seen := make(map[int]portSpec) for _, svc := range services { @@ -998,6 +1014,19 @@ func getProductionServices() []string { return existing } +func isServiceMasked(service string) (bool, error) { + cmd := exec.Command("systemctl", "is-enabled", service) + output, err := cmd.CombinedOutput() + if err != nil { + outputStr := string(output) + if strings.Contains(outputStr, "masked") { + return true, nil + } + return false, err + } + return false, nil +} + func handleProdStart() { if os.Geteuid() != 0 { fmt.Fprintf(os.Stderr, "❌ Production commands must be run as root (use sudo)\n") @@ -1012,9 +1041,26 @@ func handleProdStart() { return } + // Reset failed state for all services before starting + // This helps with services that were previously in failed state + resetArgs := []string{"reset-failed"} + resetArgs = append(resetArgs, services...) 
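+	// Equivalent to a single `systemctl reset-failed <service> ...` invocation covering
+	// every managed unit; the returned error is deliberately ignored because clearing
+	// a stale failed state is best-effort cleanup.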
+ exec.Command("systemctl", resetArgs...).Run() + // Check which services are inactive and need to be started inactive := make([]string, 0, len(services)) for _, svc := range services { + // Check if service is masked and unmask it + masked, err := isServiceMasked(svc) + if err == nil && masked { + fmt.Printf(" ⚠️ %s is masked, unmasking...\n", svc) + if err := exec.Command("systemctl", "unmask", svc).Run(); err != nil { + fmt.Printf(" ⚠️ Failed to unmask %s: %v\n", svc, err) + } else { + fmt.Printf(" ✓ Unmasked %s\n", svc) + } + } + active, err := isServiceActive(svc) if err != nil { fmt.Printf(" ⚠️ Unable to check %s: %v\n", svc, err) @@ -1022,6 +1068,15 @@ func handleProdStart() { } if active { fmt.Printf(" ℹ️ %s already running\n", svc) + // Re-enable if disabled (in case it was stopped with 'dbn prod stop') + enabled, err := isServiceEnabled(svc) + if err == nil && !enabled { + if err := exec.Command("systemctl", "enable", svc).Run(); err != nil { + fmt.Printf(" ⚠️ Failed to re-enable %s: %v\n", svc, err) + } else { + fmt.Printf(" ✓ Re-enabled %s (will auto-start on boot)\n", svc) + } + } continue } inactive = append(inactive, svc) @@ -1043,8 +1098,19 @@ func handleProdStart() { os.Exit(1) } - // Start inactive services + // Enable and start inactive services for _, svc := range inactive { + // Re-enable the service first (in case it was disabled by 'dbn prod stop') + enabled, err := isServiceEnabled(svc) + if err == nil && !enabled { + if err := exec.Command("systemctl", "enable", svc).Run(); err != nil { + fmt.Printf(" ⚠️ Failed to enable %s: %v\n", svc, err) + } else { + fmt.Printf(" ✓ Enabled %s (will auto-start on boot)\n", svc) + } + } + + // Start the service if err := exec.Command("systemctl", "start", svc).Run(); err != nil { fmt.Printf(" ⚠️ Failed to start %s: %v\n", svc, err) } else { @@ -1052,13 +1118,37 @@ func handleProdStart() { } } - // Give services a moment to fully initialize before verification + // Give services more time to fully initialize before verification + // Some services may need more time to start up, especially if they're + // waiting for dependencies or initializing databases fmt.Printf(" ⏳ Waiting for services to initialize...\n") - time.Sleep(3 * time.Second) + time.Sleep(5 * time.Second) + + // Wait for services to actually become active (with retries) + maxRetries := 6 + for i := 0; i < maxRetries; i++ { + allActive := true + for _, svc := range inactive { + active, err := isServiceActive(svc) + if err != nil || !active { + allActive = false + break + } + } + if allActive { + break + } + if i < maxRetries-1 { + time.Sleep(2 * time.Second) + } + } // Verify all services are healthy if err := verifyProductionRuntime("prod start"); err != nil { fmt.Fprintf(os.Stderr, "❌ %v\n", err) + fmt.Fprintf(os.Stderr, "\n Services may still be starting. Check status with:\n") + fmt.Fprintf(os.Stderr, " systemctl status debros-*\n") + fmt.Fprintf(os.Stderr, " dbn prod logs \n") os.Exit(1) } @@ -1079,6 +1169,31 @@ func handleProdStop() { return } + // Stop all services at once using a single systemctl command + // This is more efficient and ensures they all stop together + stopArgs := []string{"stop"} + stopArgs = append(stopArgs, services...) 
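+	// Equivalent to a single `systemctl stop <service> ...` invocation; any service that
+	// remains active is verified and retried individually further below.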
+ if err := exec.Command("systemctl", stopArgs...).Run(); err != nil { + fmt.Printf(" ⚠️ Warning: Some services may have failed to stop: %v\n", err) + // Continue anyway - we'll verify and handle individually below + } + + // Wait a moment for services to fully stop + time.Sleep(2 * time.Second) + + // Reset failed state for any services that might be in failed state + // This helps with services stuck in "activating auto-restart" + resetArgs := []string{"reset-failed"} + resetArgs = append(resetArgs, services...) + exec.Command("systemctl", resetArgs...).Run() + + // Wait again after reset-failed + time.Sleep(1 * time.Second) + + // Stop again to ensure they're stopped (in case reset-failed caused a restart) + exec.Command("systemctl", stopArgs...).Run() + time.Sleep(1 * time.Second) + hadError := false for _, svc := range services { active, err := isServiceActive(svc) @@ -1088,32 +1203,51 @@ func handleProdStop() { continue } if !active { - fmt.Printf(" ℹ️ %s already stopped\n", svc) - continue - } - if err := exec.Command("systemctl", "stop", svc).Run(); err != nil { - fmt.Printf(" ⚠️ Failed to stop %s: %v\n", svc, err) - hadError = true - continue - } - // Verify the service actually stopped and didn't restart itself - if stillActive, err := isServiceActive(svc); err != nil { - fmt.Printf(" ⚠️ Unable to verify %s stop: %v\n", svc, err) - hadError = true - } else if stillActive { - fmt.Printf(" ❌ %s restarted itself immediately\n", svc) - hadError = true - } else { fmt.Printf(" ✓ Stopped %s\n", svc) + } else { + // Service is still active, try stopping it individually + fmt.Printf(" ⚠️ %s still active, attempting individual stop...\n", svc) + if err := exec.Command("systemctl", "stop", svc).Run(); err != nil { + fmt.Printf(" ❌ Failed to stop %s: %v\n", svc, err) + hadError = true + } else { + // Wait and verify again + time.Sleep(1 * time.Second) + if stillActive, _ := isServiceActive(svc); stillActive { + fmt.Printf(" ❌ %s restarted itself (Restart=always)\n", svc) + hadError = true + } else { + fmt.Printf(" ✓ Stopped %s\n", svc) + } + } + } + + // Disable the service to prevent it from auto-starting on boot + enabled, err := isServiceEnabled(svc) + if err != nil { + fmt.Printf(" ⚠️ Unable to check if %s is enabled: %v\n", svc, err) + // Continue anyway - try to disable + } + if enabled { + if err := exec.Command("systemctl", "disable", svc).Run(); err != nil { + fmt.Printf(" ⚠️ Failed to disable %s: %v\n", svc, err) + hadError = true + } else { + fmt.Printf(" ✓ Disabled %s (will not auto-start on boot)\n", svc) + } + } else { + fmt.Printf(" ℹ️ %s already disabled\n", svc) } } if hadError { - fmt.Fprintf(os.Stderr, "\n❌ One or more services failed to stop cleanly\n") - os.Exit(1) + fmt.Fprintf(os.Stderr, "\n⚠️ Some services may still be restarting due to Restart=always\n") + fmt.Fprintf(os.Stderr, " Check status with: systemctl list-units 'debros-*'\n") + fmt.Fprintf(os.Stderr, " If services are still restarting, they may need manual intervention\n") + } else { + fmt.Printf("\n✅ All services stopped and disabled (will not auto-start on boot)\n") + fmt.Printf(" Use 'dbn prod start' to start and re-enable services\n") } - - fmt.Printf("\n✅ All services stopped and remain inactive\n") } func handleProdRestart() { diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 45a6555..e9481af 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -115,30 +115,39 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) { continue } - // Include all 
addresses with valid TCP ports - // This allows test clients and dynamic allocations to participate in peer discovery + // Filter addresses to only include port 4001 (standard libp2p port) + // This prevents including non-libp2p service ports (like RQLite ports) in peer exchange + const libp2pPort = 4001 filteredAddrs := make([]multiaddr.Multiaddr, 0) + filteredCount := 0 for _, addr := range addrs { // Extract TCP port from multiaddr port, err := addr.ValueForProtocol(multiaddr.P_TCP) if err == nil { portNum, err := strconv.Atoi(port) if err == nil { - // Accept all valid TCP ports > 0, including ephemeral ports - // Test clients and dynamic allocations may use high ports (> 32768) - if portNum > 0 { + // Only include addresses with port 4001 + if portNum == libp2pPort { filteredAddrs = append(filteredAddrs, addr) + } else { + filteredCount++ } - } else { - // If we can't parse port, include it anyway (might be non-TCP) - filteredAddrs = append(filteredAddrs, addr) } + // Skip addresses with unparseable ports } else { - // If no TCP port found, include it anyway (might be non-TCP) - filteredAddrs = append(filteredAddrs, addr) + // Skip non-TCP addresses (libp2p uses TCP) + filteredCount++ } } + // Log if addresses were filtered out + if filteredCount > 0 { + d.logger.Debug("Filtered out non-libp2p addresses", + zap.String("peer_id", pid.String()[:8]+"..."), + zap.Int("filtered_count", filteredCount), + zap.Int("valid_count", len(filteredAddrs))) + } + // If no addresses remain after filtering, skip this peer if len(filteredAddrs) == 0 { d.logger.Debug("No valid addresses after filtering", @@ -334,7 +343,8 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in continue } - // Parse addresses + // Parse and filter addresses to only include port 4001 (standard libp2p port) + const libp2pPort = 4001 addrs := make([]multiaddr.Multiaddr, 0, len(peerInfo.Addrs)) for _, addrStr := range peerInfo.Addrs { ma, err := multiaddr.NewMultiaddr(addrStr) @@ -342,14 +352,26 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in d.logger.Debug("Failed to parse multiaddr", zap.Error(err)) continue } - addrs = append(addrs, ma) + // Only include addresses with port 4001 + port, err := ma.ValueForProtocol(multiaddr.P_TCP) + if err == nil { + portNum, err := strconv.Atoi(port) + if err == nil && portNum == libp2pPort { + addrs = append(addrs, ma) + } + // Skip addresses with wrong ports + } + // Skip non-TCP addresses } if len(addrs) == 0 { + d.logger.Debug("No valid libp2p addresses (port 4001) for peer", + zap.String("peer_id", parsedID.String()[:8]+"..."), + zap.Int("total_addresses", len(peerInfo.Addrs))) continue } - // Add to peerstore + // Add to peerstore (only valid addresses with port 4001) d.host.Peerstore().AddAddrs(parsedID, addrs, time.Hour*24) // Try to connect diff --git a/pkg/ipfs/cluster.go b/pkg/ipfs/cluster.go index effbee6..3228333 100644 --- a/pkg/ipfs/cluster.go +++ b/pkg/ipfs/cluster.go @@ -19,6 +19,7 @@ import ( "go.uber.org/zap" "github.com/DeBrosOfficial/network/pkg/config" + "github.com/multiformats/go-multiaddr" ) // ClusterConfigManager manages IPFS Cluster configuration files @@ -212,31 +213,36 @@ func (cm *ClusterConfigManager) EnsureConfig() error { } // UpdateBootstrapPeers updates peer_addresses and peerstore with bootstrap peer information -func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) error { +// Returns true if update was successful, false if bootstrap is not available yet (non-fatal) 
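+// A (false, nil) result is non-fatal (bootstrap unreachable, IPFS not configured, or this
+// node is the bootstrap itself); callers such as the periodic monitoring loop simply retry later.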
+func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) (bool, error) { if cm.cfg.Database.IPFS.ClusterAPIURL == "" { - return nil // IPFS not configured + return false, nil // IPFS not configured } // Skip if this is the bootstrap node itself if cm.cfg.Node.Type == "bootstrap" { - return nil + return false, nil } // Query bootstrap cluster API to get peer ID peerID, err := getBootstrapPeerID(bootstrapAPIURL) if err != nil { - return fmt.Errorf("failed to get bootstrap peer ID: %w", err) + // Non-fatal: bootstrap might not be available yet + cm.logger.Debug("Bootstrap peer not available yet, will retry", + zap.String("bootstrap_api", bootstrapAPIURL), + zap.Error(err)) + return false, nil } if peerID == "" { - cm.logger.Warn("Bootstrap peer ID not available yet") - return nil + cm.logger.Debug("Bootstrap peer ID not available yet") + return false, nil } // Extract bootstrap host and cluster port from URL bootstrapHost, clusterPort, err := parseBootstrapHostAndPort(bootstrapAPIURL) if err != nil { - return fmt.Errorf("failed to parse bootstrap cluster API URL: %w", err) + return false, fmt.Errorf("failed to parse bootstrap cluster API URL: %w", err) } // Bootstrap listens on clusterPort + 2 (same pattern) @@ -256,7 +262,13 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) err serviceJSONPath := filepath.Join(cm.clusterPath, "service.json") cfg, err := cm.loadOrCreateConfig(serviceJSONPath) if err != nil { - return fmt.Errorf("failed to load config: %w", err) + return false, fmt.Errorf("failed to load config: %w", err) + } + + // Check if we already have the correct address configured + if len(cfg.Cluster.PeerAddresses) > 0 && cfg.Cluster.PeerAddresses[0] == bootstrapPeerAddr { + cm.logger.Debug("Bootstrap peer address already correct", zap.String("addr", bootstrapPeerAddr)) + return true, nil } // Update peer_addresses @@ -264,20 +276,234 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) err // Save config if err := cm.saveConfig(serviceJSONPath, cfg); err != nil { - return fmt.Errorf("failed to save config: %w", err) + return false, fmt.Errorf("failed to save config: %w", err) } // Write to peerstore file peerstorePath := filepath.Join(cm.clusterPath, "peerstore") if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil { - return fmt.Errorf("failed to write peerstore: %w", err) + return false, fmt.Errorf("failed to write peerstore: %w", err) } cm.logger.Info("Updated bootstrap peer configuration", zap.String("bootstrap_peer_addr", bootstrapPeerAddr), zap.String("peerstore_path", peerstorePath)) - return nil + return true, nil +} + +// UpdateAllClusterPeers discovers all cluster peers from the local cluster API +// and updates peer_addresses in service.json. This allows IPFS Cluster to automatically +// connect to all discovered peers in the cluster. 
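+// Peer information comes from the local cluster REST API /peers endpoint, which streams
+// newline-delimited JSON; the decode loop below reads one object per peer until io.EOF.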
+// Returns true if update was successful, false if cluster is not available yet (non-fatal) +func (cm *ClusterConfigManager) UpdateAllClusterPeers() (bool, error) { + if cm.cfg.Database.IPFS.ClusterAPIURL == "" { + return false, nil // IPFS not configured + } + + // Query local cluster API to get all peers + client := &standardHTTPClient{} + peersURL := fmt.Sprintf("%s/peers", cm.cfg.Database.IPFS.ClusterAPIURL) + resp, err := client.Get(peersURL) + if err != nil { + // Non-fatal: cluster might not be available yet + cm.logger.Debug("Cluster API not available yet, will retry", + zap.String("peers_url", peersURL), + zap.Error(err)) + return false, nil + } + + // Parse NDJSON response + dec := json.NewDecoder(bytes.NewReader(resp)) + var allPeerAddresses []string + seenPeers := make(map[string]bool) + peerIDToAddresses := make(map[string][]string) + + // First pass: collect all peer IDs and their addresses + for { + var peerInfo struct { + ID string `json:"id"` + Addresses []string `json:"addresses"` + ClusterPeers []string `json:"cluster_peers"` + ClusterPeersAddresses []string `json:"cluster_peers_addresses"` + } + + err := dec.Decode(&peerInfo) + if err != nil { + if err == io.EOF { + break + } + cm.logger.Debug("Failed to decode peer info", zap.Error(err)) + continue + } + + // Store this peer's addresses + if peerInfo.ID != "" { + peerIDToAddresses[peerInfo.ID] = peerInfo.Addresses + } + + // Also collect cluster peers addresses if available + // These are addresses of all peers in the cluster + for _, addr := range peerInfo.ClusterPeersAddresses { + if ma, err := multiaddr.NewMultiaddr(addr); err == nil { + // Validate it has p2p component (peer ID) + if _, err := ma.ValueForProtocol(multiaddr.P_P2P); err == nil { + addrStr := ma.String() + if !seenPeers[addrStr] { + allPeerAddresses = append(allPeerAddresses, addrStr) + seenPeers[addrStr] = true + } + } + } + } + } + + // If we didn't get cluster_peers_addresses, try to construct them from peer IDs and addresses + if len(allPeerAddresses) == 0 && len(peerIDToAddresses) > 0 { + // Get cluster listen port from config + serviceJSONPath := filepath.Join(cm.clusterPath, "service.json") + cfg, err := cm.loadOrCreateConfig(serviceJSONPath) + if err == nil && len(cfg.Cluster.ListenMultiaddress) > 0 { + // Extract port from listen_multiaddress (e.g., "/ip4/0.0.0.0/tcp/9098") + listenAddr := cfg.Cluster.ListenMultiaddress[0] + if ma, err := multiaddr.NewMultiaddr(listenAddr); err == nil { + if port, err := ma.ValueForProtocol(multiaddr.P_TCP); err == nil { + // For each peer ID, try to find its IP address and construct cluster multiaddr + for peerID, addresses := range peerIDToAddresses { + // Try to find an IP address in the peer's addresses + for _, addrStr := range addresses { + if ma, err := multiaddr.NewMultiaddr(addrStr); err == nil { + // Extract IP address (IPv4 or IPv6) + if ip, err := ma.ValueForProtocol(multiaddr.P_IP4); err == nil && ip != "" { + clusterAddr := fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", ip, port, peerID) + if !seenPeers[clusterAddr] { + allPeerAddresses = append(allPeerAddresses, clusterAddr) + seenPeers[clusterAddr] = true + } + break + } else if ip, err := ma.ValueForProtocol(multiaddr.P_IP6); err == nil && ip != "" { + clusterAddr := fmt.Sprintf("/ip6/%s/tcp/%s/p2p/%s", ip, port, peerID) + if !seenPeers[clusterAddr] { + allPeerAddresses = append(allPeerAddresses, clusterAddr) + seenPeers[clusterAddr] = true + } + break + } + } + } + } + } + } + } + } + + if len(allPeerAddresses) == 0 { + cm.logger.Debug("No cluster 
peer addresses found in API response") + return false, nil + } + + // Load current config + serviceJSONPath := filepath.Join(cm.clusterPath, "service.json") + cfg, err := cm.loadOrCreateConfig(serviceJSONPath) + if err != nil { + return false, fmt.Errorf("failed to load config: %w", err) + } + + // Check if peer addresses have changed + addressesChanged := false + if len(cfg.Cluster.PeerAddresses) != len(allPeerAddresses) { + addressesChanged = true + } else { + // Check if addresses are different + currentAddrs := make(map[string]bool) + for _, addr := range cfg.Cluster.PeerAddresses { + currentAddrs[addr] = true + } + for _, addr := range allPeerAddresses { + if !currentAddrs[addr] { + addressesChanged = true + break + } + } + } + + if !addressesChanged { + cm.logger.Debug("Cluster peer addresses already up to date", + zap.Int("peer_count", len(allPeerAddresses))) + return true, nil + } + + // Update peer_addresses + cfg.Cluster.PeerAddresses = allPeerAddresses + + // Save config + if err := cm.saveConfig(serviceJSONPath, cfg); err != nil { + return false, fmt.Errorf("failed to save config: %w", err) + } + + // Also update peerstore file + peerstorePath := filepath.Join(cm.clusterPath, "peerstore") + peerstoreContent := strings.Join(allPeerAddresses, "\n") + "\n" + if err := os.WriteFile(peerstorePath, []byte(peerstoreContent), 0644); err != nil { + cm.logger.Warn("Failed to update peerstore file", zap.Error(err)) + // Non-fatal, continue + } + + cm.logger.Info("Updated cluster peer addresses", + zap.Int("peer_count", len(allPeerAddresses)), + zap.Strings("peer_addresses", allPeerAddresses)) + + return true, nil +} + +// RepairBootstrapPeers automatically discovers and repairs bootstrap peer configuration +// Tries multiple methods: config-based discovery, bootstrap peer multiaddr, or discovery service +func (cm *ClusterConfigManager) RepairBootstrapPeers() (bool, error) { + if cm.cfg.Database.IPFS.ClusterAPIURL == "" { + return false, nil // IPFS not configured + } + + // Skip if this is the bootstrap node itself + if cm.cfg.Node.Type == "bootstrap" { + return false, nil + } + + // Method 1: Try to use bootstrap API URL from config if available + // Check if we have a bootstrap node's cluster API URL in discovery metadata + // For now, we'll infer from bootstrap peers multiaddr + + var bootstrapAPIURL string + + // Try to extract from bootstrap peers multiaddr + if len(cm.cfg.Discovery.BootstrapPeers) > 0 { + if ip := extractIPFromMultiaddrForCluster(cm.cfg.Discovery.BootstrapPeers[0]); ip != "" { + // Default cluster API port is 9094 + bootstrapAPIURL = fmt.Sprintf("http://%s:9094", ip) + cm.logger.Debug("Inferred bootstrap cluster API from bootstrap peer", + zap.String("bootstrap_api", bootstrapAPIURL)) + } + } + + // Fallback to localhost if nothing found (for local development) + if bootstrapAPIURL == "" { + bootstrapAPIURL = "http://localhost:9094" + cm.logger.Debug("Using localhost fallback for bootstrap cluster API") + } + + // Try to update bootstrap peers + success, err := cm.UpdateBootstrapPeers(bootstrapAPIURL) + if err != nil { + return false, err + } + + if success { + cm.logger.Info("Successfully repaired bootstrap peer configuration") + return true, nil + } + + // If update failed (bootstrap not available), return false but no error + // This allows retries later + return false, nil } // loadOrCreateConfig loads existing service.json or creates a template @@ -644,6 +870,28 @@ func (c *standardHTTPClient) Get(url string) ([]byte, error) { return data, nil } +// 
extractIPFromMultiaddrForCluster extracts IP address from a LibP2P multiaddr string +// Used for inferring bootstrap cluster API URL +func extractIPFromMultiaddrForCluster(multiaddrStr string) string { + // Parse multiaddr + ma, err := multiaddr.NewMultiaddr(multiaddrStr) + if err != nil { + return "" + } + + // Try to extract IPv4 address + if ipv4, err := ma.ValueForProtocol(multiaddr.P_IP4); err == nil && ipv4 != "" { + return ipv4 + } + + // Try to extract IPv6 address + if ipv6, err := ma.ValueForProtocol(multiaddr.P_IP6); err == nil && ipv6 != "" { + return ipv6 + } + + return "" +} + // FixIPFSConfigAddresses fixes localhost addresses in IPFS config to use 127.0.0.1 // This is necessary because IPFS doesn't accept "localhost" as a valid IP address in multiaddrs // This function always ensures the config is correct, regardless of current state diff --git a/pkg/node/monitoring.go b/pkg/node/monitoring.go index bf9931f..bc374d2 100644 --- a/pkg/node/monitoring.go +++ b/pkg/node/monitoring.go @@ -10,6 +10,8 @@ import ( "github.com/mackerelio/go-osstat/cpu" "github.com/mackerelio/go-osstat/memory" "go.uber.org/zap" + + "github.com/DeBrosOfficial/network/pkg/logging" ) func logPeerStatus(n *Node, currentPeerCount int, lastPeerCount int, firstCheck bool) (int, bool) { @@ -91,13 +93,13 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory } msg := struct { - PeerID string `json:"peer_id"` - PeerCount int `json:"peer_count"` - PeerIDs []string `json:"peer_ids,omitempty"` - CPU uint64 `json:"cpu_usage"` - Memory uint64 `json:"memory_usage"` - Timestamp int64 `json:"timestamp"` - ClusterHealth map[string]interface{} `json:"cluster_health,omitempty"` + PeerID string `json:"peer_id"` + PeerCount int `json:"peer_count"` + PeerIDs []string `json:"peer_ids,omitempty"` + CPU uint64 `json:"cpu_usage"` + Memory uint64 `json:"memory_usage"` + Timestamp int64 `json:"timestamp"` + ClusterHealth map[string]interface{} `json:"cluster_health,omitempty"` }{ PeerID: n.host.ID().String(), PeerCount: len(peers), @@ -210,6 +212,29 @@ func (n *Node) startConnectionMonitoring() { if err := announceMetrics(n, peers, cpuUsage, mem); err != nil { n.logger.Error("Failed to announce metrics", zap.Error(err)) } + + // Periodically update IPFS Cluster peer addresses + // This discovers all cluster peers and updates peer_addresses in service.json + // so IPFS Cluster can automatically connect to all discovered peers + if n.clusterConfigManager != nil { + // Update all cluster peers every 2 minutes to discover new peers + if time.Now().Unix()%120 == 0 { + if success, err := n.clusterConfigManager.UpdateAllClusterPeers(); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to update cluster peers during monitoring", zap.Error(err)) + } else if success { + n.logger.ComponentInfo(logging.ComponentNode, "Cluster peer addresses updated during monitoring") + } + + // Also try to repair bootstrap peers if this is not a bootstrap node + if n.config.Node.Type != "bootstrap" { + if success, err := n.clusterConfigManager.RepairBootstrapPeers(); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to repair bootstrap peers during monitoring", zap.Error(err)) + } else if success { + n.logger.ComponentInfo(logging.ComponentNode, "Bootstrap peer configuration repaired during monitoring") + } + } + } + } } }() diff --git a/pkg/node/node.go b/pkg/node/node.go index bdded3c..aa7edd8 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -372,7 +372,7 @@ func (n *Node) 
startLibP2P() error { // For production, these would be enabled isLocalhost := len(n.config.Node.ListenAddresses) > 0 && (strings.Contains(n.config.Node.ListenAddresses[0], "localhost") || - strings.Contains(n.config.Node.ListenAddresses[0], "localhost")) + strings.Contains(n.config.Node.ListenAddresses[0], "127.0.0.1")) if isLocalhost { n.logger.ComponentInfo(logging.ComponentLibP2P, "Localhost detected - disabling NAT services for local development") @@ -732,19 +732,15 @@ func (n *Node) startIPFSClusterConfig() error { return fmt.Errorf("failed to ensure cluster config: %w", err) } - // If this is not the bootstrap node, try to update bootstrap peer info - if n.config.Node.Type != "bootstrap" && len(n.config.Discovery.BootstrapPeers) > 0 { - // Infer bootstrap cluster API URL from first bootstrap peer multiaddr - bootstrapClusterAPI := "http://localhost:9094" // Default fallback - if len(n.config.Discovery.BootstrapPeers) > 0 { - // Extract IP from first bootstrap peer multiaddr - if ip := extractIPFromMultiaddr(n.config.Discovery.BootstrapPeers[0]); ip != "" { - bootstrapClusterAPI = fmt.Sprintf("http://%s:9094", ip) - } - } - if err := cm.UpdateBootstrapPeers(bootstrapClusterAPI); err != nil { - n.logger.ComponentWarn(logging.ComponentNode, "Failed to update bootstrap peers, will retry later", zap.Error(err)) - // Don't fail - peers can connect later via mDNS or manual config + // Try to repair bootstrap peer configuration automatically + // This will be retried periodically if bootstrap is not available yet + if n.config.Node.Type != "bootstrap" { + if success, err := cm.RepairBootstrapPeers(); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to repair bootstrap peers, will retry later", zap.Error(err)) + } else if success { + n.logger.ComponentInfo(logging.ComponentNode, "Bootstrap peer configuration repaired successfully") + } else { + n.logger.ComponentDebug(logging.ComponentNode, "Bootstrap peer not available yet, will retry periodically") } }
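
For reference, a minimal self-contained sketch of the port-4001 filtering rule that the discovery changes above apply to peer-exchange addresses. It is not code from this patch: the helper name keepLibp2pAddrs and the sample addresses are illustrative only, while the go-multiaddr calls (NewMultiaddr, ValueForProtocol, P_TCP) are the same ones the patch itself uses.

package main

import (
	"fmt"
	"strconv"

	"github.com/multiformats/go-multiaddr"
)

// libp2pPort is the standard LibP2P TCP port that peer exchange keeps.
const libp2pPort = 4001

// keepLibp2pAddrs returns only the TCP multiaddrs whose port is 4001,
// dropping non-TCP addresses and internal service ports.
func keepLibp2pAddrs(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr {
	kept := make([]multiaddr.Multiaddr, 0, len(addrs))
	for _, addr := range addrs {
		port, err := addr.ValueForProtocol(multiaddr.P_TCP)
		if err != nil {
			continue // no TCP component, skip
		}
		if n, convErr := strconv.Atoi(port); convErr == nil && n == libp2pPort {
			kept = append(kept, addr)
		}
	}
	return kept
}

func main() {
	raw := []string{
		"/ip4/192.0.2.10/tcp/4001", // kept: standard LibP2P port
		"/ip4/192.0.2.10/tcp/9094", // dropped: internal service port
		"/ip4/192.0.2.10/udp/4001", // dropped: no TCP component
	}
	addrs := make([]multiaddr.Multiaddr, 0, len(raw))
	for _, s := range raw {
		if ma, err := multiaddr.NewMultiaddr(s); err == nil {
			addrs = append(addrs, ma)
		}
	}
	for _, a := range keepLibp2pAddrs(addrs) {
		fmt.Println(a) // prints only /ip4/192.0.2.10/tcp/4001
	}
}

The same filter appears in both handlePeerExchangeStream and discoverViaPeerExchange in pkg/discovery/discovery.go; a shared helper along these lines would keep the two code paths consistent.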