feat: add service enable/disable functionality to production commands
- Introduced new functions to check whether a service is enabled, and to enable or disable services as needed during production command execution.
- Enhanced the `handleProdStart` and `handleProdStop` functions to manage service states more effectively: services are re-enabled when started and disabled when stopped.
- Improved logging to give clear feedback on service status changes during service management.
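For context, the enabled/disabled checks added here lean on the exit code of `systemctl is-enabled`: exit code 0 means the unit is enabled, and the new helper in this commit treats exit code 1 as disabled and 4 as unit-not-found. A minimal, self-contained sketch of that check, assuming an illustrative unit name (`debros-node.service` is not taken from the repository):

package main

import (
    "errors"
    "fmt"
    "os/exec"
)

func main() {
    // Ask systemd whether a unit is enabled for boot; the exit code carries the answer.
    unit := "debros-node.service" // illustrative unit name
    err := exec.Command("systemctl", "is-enabled", "--quiet", unit).Run()
    if err == nil {
        fmt.Println(unit, "is enabled")
        return
    }
    var exitErr *exec.ExitError
    if errors.As(err, &exitErr) && exitErr.ExitCode() == 1 {
        fmt.Println(unit, "is disabled")
        return
    }
    fmt.Println("could not determine state:", err) // e.g. unknown unit or systemctl not present
}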
This commit is contained in:
parent 0421155594
commit 1d060490a8

CHANGELOG.md (17 changed lines)
@@ -13,6 +13,23 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant

### Deprecated

### Fixed

## [0.69.8] - 2025-11-12

### Added

- Improved `dbn prod start` to automatically unmask and re-enable services if they were previously masked or disabled.
- Added automatic discovery and configuration of all IPFS Cluster peers during runtime to improve cluster connectivity.

### Changed

- Enhanced `dbn prod start` and `dbn prod stop` reliability by adding service state resets, retries, and ensuring services are disabled when stopped.
- Filtered peer exchange addresses in LibP2P discovery to only include the standard LibP2P port (4001), preventing exposure of internal service ports.

### Deprecated

### Removed

### Fixed

- Improved IPFS Cluster bootstrap configuration repair logic to automatically infer and update bootstrap peer addresses if the bootstrap node is available.

## [0.69.7] - 2025-11-12

### Added
Makefile (2 changed lines)

@@ -19,7 +19,7 @@ test-e2e:

.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill

VERSION := 0.69.7
VERSION := 0.69.8
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
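As a side note, the `-X 'main.version=$(VERSION)'` style flags in `LDFLAGS` only take effect if package main declares matching package-level string variables. A minimal sketch of what that side conventionally looks like (defaults and the example build command are illustrative, not taken from this repository):

package main

import "fmt"

// Overridden at link time via the Makefile's LDFLAGS, e.g.
// go build -ldflags "-X 'main.version=0.69.8' -X 'main.commit=abc1234' -X 'main.date=2025-11-12T00:00:00Z'"
var (
    version = "dev"     // set by -X 'main.version=$(VERSION)'
    commit  = "unknown" // set by -X 'main.commit=$(COMMIT)'
    date    = "unknown" // set by -X 'main.date=$(DATE)'
)

func main() {
    fmt.Printf("version %s (commit %s, built %s)\n", version, commit, date)
}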
@@ -862,6 +862,22 @@ func isServiceActive(service string) (bool, error) {
return true, nil
}

func isServiceEnabled(service string) (bool, error) {
cmd := exec.Command("systemctl", "is-enabled", "--quiet", service)
if err := cmd.Run(); err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
switch exitErr.ExitCode() {
case 1:
return false, nil // Service is disabled
case 4:
return false, errServiceNotFound
}
}
return false, err
}
return true, nil
}

func collectPortsForServices(services []string, skipActive bool) ([]portSpec, error) {
seen := make(map[int]portSpec)
for _, svc := range services {

@@ -998,6 +1014,19 @@ func getProductionServices() []string {
return existing
}

func isServiceMasked(service string) (bool, error) {
cmd := exec.Command("systemctl", "is-enabled", service)
output, err := cmd.CombinedOutput()
if err != nil {
outputStr := string(output)
if strings.Contains(outputStr, "masked") {
return true, nil
}
return false, err
}
return false, nil
}

func handleProdStart() {
if os.Geteuid() != 0 {
fmt.Fprintf(os.Stderr, "❌ Production commands must be run as root (use sudo)\n")
@@ -1012,9 +1041,26 @@ func handleProdStart() {
return
}

// Reset failed state for all services before starting
// This helps with services that were previously in failed state
resetArgs := []string{"reset-failed"}
resetArgs = append(resetArgs, services...)
exec.Command("systemctl", resetArgs...).Run()

// Check which services are inactive and need to be started
inactive := make([]string, 0, len(services))
for _, svc := range services {
// Check if service is masked and unmask it
masked, err := isServiceMasked(svc)
if err == nil && masked {
fmt.Printf(" ⚠️ %s is masked, unmasking...\n", svc)
if err := exec.Command("systemctl", "unmask", svc).Run(); err != nil {
fmt.Printf(" ⚠️ Failed to unmask %s: %v\n", svc, err)
} else {
fmt.Printf(" ✓ Unmasked %s\n", svc)
}
}

active, err := isServiceActive(svc)
if err != nil {
fmt.Printf(" ⚠️ Unable to check %s: %v\n", svc, err)

@@ -1022,6 +1068,15 @@ func handleProdStart() {
}
if active {
fmt.Printf(" ℹ️ %s already running\n", svc)
// Re-enable if disabled (in case it was stopped with 'dbn prod stop')
enabled, err := isServiceEnabled(svc)
if err == nil && !enabled {
if err := exec.Command("systemctl", "enable", svc).Run(); err != nil {
fmt.Printf(" ⚠️ Failed to re-enable %s: %v\n", svc, err)
} else {
fmt.Printf(" ✓ Re-enabled %s (will auto-start on boot)\n", svc)
}
}
continue
}
inactive = append(inactive, svc)
@@ -1043,8 +1098,19 @@ func handleProdStart() {
os.Exit(1)
}

// Start inactive services
// Enable and start inactive services
for _, svc := range inactive {
// Re-enable the service first (in case it was disabled by 'dbn prod stop')
enabled, err := isServiceEnabled(svc)
if err == nil && !enabled {
if err := exec.Command("systemctl", "enable", svc).Run(); err != nil {
fmt.Printf(" ⚠️ Failed to enable %s: %v\n", svc, err)
} else {
fmt.Printf(" ✓ Enabled %s (will auto-start on boot)\n", svc)
}
}

// Start the service
if err := exec.Command("systemctl", "start", svc).Run(); err != nil {
fmt.Printf(" ⚠️ Failed to start %s: %v\n", svc, err)
} else {

@@ -1052,13 +1118,37 @@ func handleProdStart() {
}
}

// Give services a moment to fully initialize before verification
// Give services more time to fully initialize before verification
// Some services may need more time to start up, especially if they're
// waiting for dependencies or initializing databases
fmt.Printf(" ⏳ Waiting for services to initialize...\n")
time.Sleep(3 * time.Second)
time.Sleep(5 * time.Second)

// Wait for services to actually become active (with retries)
maxRetries := 6
for i := 0; i < maxRetries; i++ {
allActive := true
for _, svc := range inactive {
active, err := isServiceActive(svc)
if err != nil || !active {
allActive = false
break
}
}
if allActive {
break
}
if i < maxRetries-1 {
time.Sleep(2 * time.Second)
}
}

// Verify all services are healthy
if err := verifyProductionRuntime("prod start"); err != nil {
fmt.Fprintf(os.Stderr, "❌ %v\n", err)
fmt.Fprintf(os.Stderr, "\n Services may still be starting. Check status with:\n")
fmt.Fprintf(os.Stderr, " systemctl status debros-*\n")
fmt.Fprintf(os.Stderr, " dbn prod logs <service>\n")
os.Exit(1)
}
@@ -1079,6 +1169,31 @@ func handleProdStop() {
return
}

// Stop all services at once using a single systemctl command
// This is more efficient and ensures they all stop together
stopArgs := []string{"stop"}
stopArgs = append(stopArgs, services...)
if err := exec.Command("systemctl", stopArgs...).Run(); err != nil {
fmt.Printf(" ⚠️ Warning: Some services may have failed to stop: %v\n", err)
// Continue anyway - we'll verify and handle individually below
}

// Wait a moment for services to fully stop
time.Sleep(2 * time.Second)

// Reset failed state for any services that might be in failed state
// This helps with services stuck in "activating auto-restart"
resetArgs := []string{"reset-failed"}
resetArgs = append(resetArgs, services...)
exec.Command("systemctl", resetArgs...).Run()

// Wait again after reset-failed
time.Sleep(1 * time.Second)

// Stop again to ensure they're stopped (in case reset-failed caused a restart)
exec.Command("systemctl", stopArgs...).Run()
time.Sleep(1 * time.Second)

hadError := false
for _, svc := range services {
active, err := isServiceActive(svc)

@@ -1088,32 +1203,51 @@ func handleProdStop() {
continue
}
if !active {
fmt.Printf(" ℹ️ %s already stopped\n", svc)
continue
}
if err := exec.Command("systemctl", "stop", svc).Run(); err != nil {
fmt.Printf(" ⚠️ Failed to stop %s: %v\n", svc, err)
hadError = true
continue
}
// Verify the service actually stopped and didn't restart itself
if stillActive, err := isServiceActive(svc); err != nil {
fmt.Printf(" ⚠️ Unable to verify %s stop: %v\n", svc, err)
hadError = true
} else if stillActive {
fmt.Printf(" ❌ %s restarted itself immediately\n", svc)
hadError = true
} else {
fmt.Printf(" ✓ Stopped %s\n", svc)
} else {
// Service is still active, try stopping it individually
fmt.Printf(" ⚠️ %s still active, attempting individual stop...\n", svc)
if err := exec.Command("systemctl", "stop", svc).Run(); err != nil {
fmt.Printf(" ❌ Failed to stop %s: %v\n", svc, err)
hadError = true
} else {
// Wait and verify again
time.Sleep(1 * time.Second)
if stillActive, _ := isServiceActive(svc); stillActive {
fmt.Printf(" ❌ %s restarted itself (Restart=always)\n", svc)
hadError = true
} else {
fmt.Printf(" ✓ Stopped %s\n", svc)
}
}
}

// Disable the service to prevent it from auto-starting on boot
enabled, err := isServiceEnabled(svc)
if err != nil {
fmt.Printf(" ⚠️ Unable to check if %s is enabled: %v\n", svc, err)
// Continue anyway - try to disable
}
if enabled {
if err := exec.Command("systemctl", "disable", svc).Run(); err != nil {
fmt.Printf(" ⚠️ Failed to disable %s: %v\n", svc, err)
hadError = true
} else {
fmt.Printf(" ✓ Disabled %s (will not auto-start on boot)\n", svc)
}
} else {
fmt.Printf(" ℹ️ %s already disabled\n", svc)
}
}

if hadError {
fmt.Fprintf(os.Stderr, "\n❌ One or more services failed to stop cleanly\n")
os.Exit(1)
fmt.Fprintf(os.Stderr, "\n⚠️ Some services may still be restarting due to Restart=always\n")
fmt.Fprintf(os.Stderr, " Check status with: systemctl list-units 'debros-*'\n")
fmt.Fprintf(os.Stderr, " If services are still restarting, they may need manual intervention\n")
} else {
fmt.Printf("\n✅ All services stopped and disabled (will not auto-start on boot)\n")
fmt.Printf(" Use 'dbn prod start' to start and re-enable services\n")
}

fmt.Printf("\n✅ All services stopped and remain inactive\n")
}

func handleProdRestart() {
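One design note on the stop path above: the commit stops a unit and then disables it as two separate `systemctl` calls so that each outcome can be reported on its own. systemd can also do both in a single call via `disable --now`; a minimal sketch of that alternative, with an illustrative unit name (this is not what the commit does):

package main

import (
    "fmt"
    "os/exec"
)

func main() {
    // Stop the unit and remove it from the boot target in one systemctl invocation.
    unit := "debros-node.service" // illustrative unit name
    out, err := exec.Command("systemctl", "disable", "--now", unit).CombinedOutput()
    if err != nil {
        fmt.Printf("disable --now failed for %s: %v\n%s", unit, err, out)
        return
    }
    fmt.Printf("%s stopped and disabled\n", unit)
}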
@@ -115,30 +115,39 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) {
continue
}

// Include all addresses with valid TCP ports
// This allows test clients and dynamic allocations to participate in peer discovery
// Filter addresses to only include port 4001 (standard libp2p port)
// This prevents including non-libp2p service ports (like RQLite ports) in peer exchange
const libp2pPort = 4001
filteredAddrs := make([]multiaddr.Multiaddr, 0)
filteredCount := 0
for _, addr := range addrs {
// Extract TCP port from multiaddr
port, err := addr.ValueForProtocol(multiaddr.P_TCP)
if err == nil {
portNum, err := strconv.Atoi(port)
if err == nil {
// Accept all valid TCP ports > 0, including ephemeral ports
// Test clients and dynamic allocations may use high ports (> 32768)
if portNum > 0 {
// Only include addresses with port 4001
if portNum == libp2pPort {
filteredAddrs = append(filteredAddrs, addr)
} else {
filteredCount++
}
} else {
// If we can't parse port, include it anyway (might be non-TCP)
filteredAddrs = append(filteredAddrs, addr)
}
// Skip addresses with unparseable ports
} else {
// If no TCP port found, include it anyway (might be non-TCP)
filteredAddrs = append(filteredAddrs, addr)
// Skip non-TCP addresses (libp2p uses TCP)
filteredCount++
}
}

// Log if addresses were filtered out
if filteredCount > 0 {
d.logger.Debug("Filtered out non-libp2p addresses",
zap.String("peer_id", pid.String()[:8]+"..."),
zap.Int("filtered_count", filteredCount),
zap.Int("valid_count", len(filteredAddrs)))
}

// If no addresses remain after filtering, skip this peer
if len(filteredAddrs) == 0 {
d.logger.Debug("No valid addresses after filtering",

@@ -334,7 +343,8 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in
continue
}

// Parse addresses
// Parse and filter addresses to only include port 4001 (standard libp2p port)
const libp2pPort = 4001
addrs := make([]multiaddr.Multiaddr, 0, len(peerInfo.Addrs))
for _, addrStr := range peerInfo.Addrs {
ma, err := multiaddr.NewMultiaddr(addrStr)

@@ -342,14 +352,26 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in
d.logger.Debug("Failed to parse multiaddr", zap.Error(err))
continue
}
addrs = append(addrs, ma)
// Only include addresses with port 4001
port, err := ma.ValueForProtocol(multiaddr.P_TCP)
if err == nil {
portNum, err := strconv.Atoi(port)
if err == nil && portNum == libp2pPort {
addrs = append(addrs, ma)
}
// Skip addresses with wrong ports
}
// Skip non-TCP addresses
}

if len(addrs) == 0 {
d.logger.Debug("No valid libp2p addresses (port 4001) for peer",
zap.String("peer_id", parsedID.String()[:8]+"..."),
zap.Int("total_addresses", len(peerInfo.Addrs)))
continue
}

// Add to peerstore
// Add to peerstore (only valid addresses with port 4001)
d.host.Peerstore().AddAddrs(parsedID, addrs, time.Hour*24)

// Try to connect
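The port-4001 rule above appears twice in this diff (inbound peer exchange and outbound discovery). A standalone helper along these lines captures the same rule; it is a sketch for illustration, not code from the repository:

package main

import (
    "fmt"
    "strconv"

    "github.com/multiformats/go-multiaddr"
)

const libp2pPort = 4001

// keepLibp2pAddrs keeps only multiaddrs whose TCP port is the standard libp2p
// port (4001); non-TCP addresses and other ports are dropped, matching the
// filtering introduced in this commit.
func keepLibp2pAddrs(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr {
    kept := make([]multiaddr.Multiaddr, 0, len(addrs))
    for _, a := range addrs {
        port, err := a.ValueForProtocol(multiaddr.P_TCP)
        if err != nil {
            continue // no TCP component
        }
        if n, err := strconv.Atoi(port); err == nil && n == libp2pPort {
            kept = append(kept, a)
        }
    }
    return kept
}

func main() {
    keep, _ := multiaddr.NewMultiaddr("/ip4/203.0.113.7/tcp/4001") // illustrative peer address
    drop, _ := multiaddr.NewMultiaddr("/ip4/203.0.113.7/tcp/5001") // e.g. an internal service port
    fmt.Println(keepLibp2pAddrs([]multiaddr.Multiaddr{keep, drop}))
}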
@@ -19,6 +19,7 @@ import (
"go.uber.org/zap"

"github.com/DeBrosOfficial/network/pkg/config"
"github.com/multiformats/go-multiaddr"
)

// ClusterConfigManager manages IPFS Cluster configuration files

@@ -212,31 +213,36 @@ func (cm *ClusterConfigManager) EnsureConfig() error {
}

// UpdateBootstrapPeers updates peer_addresses and peerstore with bootstrap peer information
func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) error {
// Returns true if update was successful, false if bootstrap is not available yet (non-fatal)
func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) (bool, error) {
if cm.cfg.Database.IPFS.ClusterAPIURL == "" {
return nil // IPFS not configured
return false, nil // IPFS not configured
}

// Skip if this is the bootstrap node itself
if cm.cfg.Node.Type == "bootstrap" {
return nil
return false, nil
}

// Query bootstrap cluster API to get peer ID
peerID, err := getBootstrapPeerID(bootstrapAPIURL)
if err != nil {
return fmt.Errorf("failed to get bootstrap peer ID: %w", err)
// Non-fatal: bootstrap might not be available yet
cm.logger.Debug("Bootstrap peer not available yet, will retry",
zap.String("bootstrap_api", bootstrapAPIURL),
zap.Error(err))
return false, nil
}

if peerID == "" {
cm.logger.Warn("Bootstrap peer ID not available yet")
return nil
cm.logger.Debug("Bootstrap peer ID not available yet")
return false, nil
}

// Extract bootstrap host and cluster port from URL
bootstrapHost, clusterPort, err := parseBootstrapHostAndPort(bootstrapAPIURL)
if err != nil {
return fmt.Errorf("failed to parse bootstrap cluster API URL: %w", err)
return false, fmt.Errorf("failed to parse bootstrap cluster API URL: %w", err)
}

// Bootstrap listens on clusterPort + 2 (same pattern)

@@ -256,7 +262,13 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) err
serviceJSONPath := filepath.Join(cm.clusterPath, "service.json")
cfg, err := cm.loadOrCreateConfig(serviceJSONPath)
if err != nil {
return fmt.Errorf("failed to load config: %w", err)
return false, fmt.Errorf("failed to load config: %w", err)
}

// Check if we already have the correct address configured
if len(cfg.Cluster.PeerAddresses) > 0 && cfg.Cluster.PeerAddresses[0] == bootstrapPeerAddr {
cm.logger.Debug("Bootstrap peer address already correct", zap.String("addr", bootstrapPeerAddr))
return true, nil
}

// Update peer_addresses

@@ -264,20 +276,234 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) err

// Save config
if err := cm.saveConfig(serviceJSONPath, cfg); err != nil {
return fmt.Errorf("failed to save config: %w", err)
return false, fmt.Errorf("failed to save config: %w", err)
}

// Write to peerstore file
peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil {
return fmt.Errorf("failed to write peerstore: %w", err)
return false, fmt.Errorf("failed to write peerstore: %w", err)
}

cm.logger.Info("Updated bootstrap peer configuration",
zap.String("bootstrap_peer_addr", bootstrapPeerAddr),
zap.String("peerstore_path", peerstorePath))

return nil
return true, nil
}

// UpdateAllClusterPeers discovers all cluster peers from the local cluster API
// and updates peer_addresses in service.json. This allows IPFS Cluster to automatically
// connect to all discovered peers in the cluster.
// Returns true if update was successful, false if cluster is not available yet (non-fatal)
func (cm *ClusterConfigManager) UpdateAllClusterPeers() (bool, error) {
if cm.cfg.Database.IPFS.ClusterAPIURL == "" {
return false, nil // IPFS not configured
}

// Query local cluster API to get all peers
client := &standardHTTPClient{}
peersURL := fmt.Sprintf("%s/peers", cm.cfg.Database.IPFS.ClusterAPIURL)
resp, err := client.Get(peersURL)
if err != nil {
// Non-fatal: cluster might not be available yet
cm.logger.Debug("Cluster API not available yet, will retry",
zap.String("peers_url", peersURL),
zap.Error(err))
return false, nil
}

// Parse NDJSON response
dec := json.NewDecoder(bytes.NewReader(resp))
var allPeerAddresses []string
seenPeers := make(map[string]bool)
peerIDToAddresses := make(map[string][]string)

// First pass: collect all peer IDs and their addresses
for {
var peerInfo struct {
ID string `json:"id"`
Addresses []string `json:"addresses"`
ClusterPeers []string `json:"cluster_peers"`
ClusterPeersAddresses []string `json:"cluster_peers_addresses"`
}

err := dec.Decode(&peerInfo)
if err != nil {
if err == io.EOF {
break
}
cm.logger.Debug("Failed to decode peer info", zap.Error(err))
continue
}

// Store this peer's addresses
if peerInfo.ID != "" {
peerIDToAddresses[peerInfo.ID] = peerInfo.Addresses
}

// Also collect cluster peers addresses if available
// These are addresses of all peers in the cluster
for _, addr := range peerInfo.ClusterPeersAddresses {
if ma, err := multiaddr.NewMultiaddr(addr); err == nil {
// Validate it has p2p component (peer ID)
if _, err := ma.ValueForProtocol(multiaddr.P_P2P); err == nil {
addrStr := ma.String()
if !seenPeers[addrStr] {
allPeerAddresses = append(allPeerAddresses, addrStr)
seenPeers[addrStr] = true
}
}
}
}
}

// If we didn't get cluster_peers_addresses, try to construct them from peer IDs and addresses
if len(allPeerAddresses) == 0 && len(peerIDToAddresses) > 0 {
// Get cluster listen port from config
serviceJSONPath := filepath.Join(cm.clusterPath, "service.json")
cfg, err := cm.loadOrCreateConfig(serviceJSONPath)
if err == nil && len(cfg.Cluster.ListenMultiaddress) > 0 {
// Extract port from listen_multiaddress (e.g., "/ip4/0.0.0.0/tcp/9098")
listenAddr := cfg.Cluster.ListenMultiaddress[0]
if ma, err := multiaddr.NewMultiaddr(listenAddr); err == nil {
if port, err := ma.ValueForProtocol(multiaddr.P_TCP); err == nil {
// For each peer ID, try to find its IP address and construct cluster multiaddr
for peerID, addresses := range peerIDToAddresses {
// Try to find an IP address in the peer's addresses
for _, addrStr := range addresses {
if ma, err := multiaddr.NewMultiaddr(addrStr); err == nil {
// Extract IP address (IPv4 or IPv6)
if ip, err := ma.ValueForProtocol(multiaddr.P_IP4); err == nil && ip != "" {
clusterAddr := fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", ip, port, peerID)
if !seenPeers[clusterAddr] {
allPeerAddresses = append(allPeerAddresses, clusterAddr)
seenPeers[clusterAddr] = true
}
break
} else if ip, err := ma.ValueForProtocol(multiaddr.P_IP6); err == nil && ip != "" {
clusterAddr := fmt.Sprintf("/ip6/%s/tcp/%s/p2p/%s", ip, port, peerID)
if !seenPeers[clusterAddr] {
allPeerAddresses = append(allPeerAddresses, clusterAddr)
seenPeers[clusterAddr] = true
}
break
}
}
}
}
}
}
}
}

if len(allPeerAddresses) == 0 {
cm.logger.Debug("No cluster peer addresses found in API response")
return false, nil
}

// Load current config
serviceJSONPath := filepath.Join(cm.clusterPath, "service.json")
cfg, err := cm.loadOrCreateConfig(serviceJSONPath)
if err != nil {
return false, fmt.Errorf("failed to load config: %w", err)
}

// Check if peer addresses have changed
addressesChanged := false
if len(cfg.Cluster.PeerAddresses) != len(allPeerAddresses) {
addressesChanged = true
} else {
// Check if addresses are different
currentAddrs := make(map[string]bool)
for _, addr := range cfg.Cluster.PeerAddresses {
currentAddrs[addr] = true
}
for _, addr := range allPeerAddresses {
if !currentAddrs[addr] {
addressesChanged = true
break
}
}
}

if !addressesChanged {
cm.logger.Debug("Cluster peer addresses already up to date",
zap.Int("peer_count", len(allPeerAddresses)))
return true, nil
}

// Update peer_addresses
cfg.Cluster.PeerAddresses = allPeerAddresses

// Save config
if err := cm.saveConfig(serviceJSONPath, cfg); err != nil {
return false, fmt.Errorf("failed to save config: %w", err)
}

// Also update peerstore file
peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
peerstoreContent := strings.Join(allPeerAddresses, "\n") + "\n"
if err := os.WriteFile(peerstorePath, []byte(peerstoreContent), 0644); err != nil {
cm.logger.Warn("Failed to update peerstore file", zap.Error(err))
// Non-fatal, continue
}

cm.logger.Info("Updated cluster peer addresses",
zap.Int("peer_count", len(allPeerAddresses)),
zap.Strings("peer_addresses", allPeerAddresses))

return true, nil
}

// RepairBootstrapPeers automatically discovers and repairs bootstrap peer configuration
// Tries multiple methods: config-based discovery, bootstrap peer multiaddr, or discovery service
func (cm *ClusterConfigManager) RepairBootstrapPeers() (bool, error) {
if cm.cfg.Database.IPFS.ClusterAPIURL == "" {
return false, nil // IPFS not configured
}

// Skip if this is the bootstrap node itself
if cm.cfg.Node.Type == "bootstrap" {
return false, nil
}

// Method 1: Try to use bootstrap API URL from config if available
// Check if we have a bootstrap node's cluster API URL in discovery metadata
// For now, we'll infer from bootstrap peers multiaddr

var bootstrapAPIURL string

// Try to extract from bootstrap peers multiaddr
if len(cm.cfg.Discovery.BootstrapPeers) > 0 {
if ip := extractIPFromMultiaddrForCluster(cm.cfg.Discovery.BootstrapPeers[0]); ip != "" {
// Default cluster API port is 9094
bootstrapAPIURL = fmt.Sprintf("http://%s:9094", ip)
cm.logger.Debug("Inferred bootstrap cluster API from bootstrap peer",
zap.String("bootstrap_api", bootstrapAPIURL))
}
}

// Fallback to localhost if nothing found (for local development)
if bootstrapAPIURL == "" {
bootstrapAPIURL = "http://localhost:9094"
cm.logger.Debug("Using localhost fallback for bootstrap cluster API")
}

// Try to update bootstrap peers
success, err := cm.UpdateBootstrapPeers(bootstrapAPIURL)
if err != nil {
return false, err
}

if success {
cm.logger.Info("Successfully repaired bootstrap peer configuration")
return true, nil
}

// If update failed (bootstrap not available), return false but no error
// This allows retries later
return false, nil
}

// loadOrCreateConfig loads existing service.json or creates a template

@@ -644,6 +870,28 @@ func (c *standardHTTPClient) Get(url string) ([]byte, error) {
return data, nil
}

// extractIPFromMultiaddrForCluster extracts IP address from a LibP2P multiaddr string
// Used for inferring bootstrap cluster API URL
func extractIPFromMultiaddrForCluster(multiaddrStr string) string {
// Parse multiaddr
ma, err := multiaddr.NewMultiaddr(multiaddrStr)
if err != nil {
return ""
}

// Try to extract IPv4 address
if ipv4, err := ma.ValueForProtocol(multiaddr.P_IP4); err == nil && ipv4 != "" {
return ipv4
}

// Try to extract IPv6 address
if ipv6, err := ma.ValueForProtocol(multiaddr.P_IP6); err == nil && ipv6 != "" {
return ipv6
}

return ""
}

// FixIPFSConfigAddresses fixes localhost addresses in IPFS config to use 127.0.0.1
// This is necessary because IPFS doesn't accept "localhost" as a valid IP address in multiaddrs
// This function always ensures the config is correct, regardless of current state
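The `(bool, error)` convention introduced above separates hard failures (non-nil error) from "not ready yet" (false with a nil error), which is what makes periodic retries possible. A small sketch of a caller built around that convention; the interface and fake implementation here are illustrative stand-ins for `ClusterConfigManager`, which in the actual node retries from its monitoring loop rather than blocking in a helper like this:

package main

import (
    "log"
    "time"
)

// bootstrapRepairer models the part of ClusterConfigManager used here:
// false with a nil error means "bootstrap not reachable yet, try again later".
type bootstrapRepairer interface {
    UpdateBootstrapPeers(bootstrapAPIURL string) (bool, error)
}

// waitForBootstrap polls until the bootstrap peer configuration has been written.
func waitForBootstrap(cm bootstrapRepairer, apiURL string, interval time.Duration) error {
    for {
        ok, err := cm.UpdateBootstrapPeers(apiURL)
        if err != nil {
            return err // hard failure (bad URL, unwritable service.json)
        }
        if ok {
            return nil // peer_addresses and peerstore updated
        }
        time.Sleep(interval) // bootstrap API not reachable yet
    }
}

// fakeRepairer pretends the bootstrap API becomes reachable on the third poll.
type fakeRepairer struct{ calls int }

func (f *fakeRepairer) UpdateBootstrapPeers(string) (bool, error) {
    f.calls++
    return f.calls >= 3, nil
}

func main() {
    if err := waitForBootstrap(&fakeRepairer{}, "http://203.0.113.10:9094", 10*time.Millisecond); err != nil {
        log.Fatal(err)
    }
    log.Println("bootstrap peer configuration repaired")
}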
@@ -10,6 +10,8 @@ import (
"github.com/mackerelio/go-osstat/cpu"
"github.com/mackerelio/go-osstat/memory"
"go.uber.org/zap"

"github.com/DeBrosOfficial/network/pkg/logging"
)

func logPeerStatus(n *Node, currentPeerCount int, lastPeerCount int, firstCheck bool) (int, bool) {

@@ -91,13 +93,13 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory
}

msg := struct {
PeerID string `json:"peer_id"`
PeerCount int `json:"peer_count"`
PeerIDs []string `json:"peer_ids,omitempty"`
CPU uint64 `json:"cpu_usage"`
Memory uint64 `json:"memory_usage"`
Timestamp int64 `json:"timestamp"`
ClusterHealth map[string]interface{} `json:"cluster_health,omitempty"`
PeerID string `json:"peer_id"`
PeerCount int `json:"peer_count"`
PeerIDs []string `json:"peer_ids,omitempty"`
CPU uint64 `json:"cpu_usage"`
Memory uint64 `json:"memory_usage"`
Timestamp int64 `json:"timestamp"`
ClusterHealth map[string]interface{} `json:"cluster_health,omitempty"`
}{
PeerID: n.host.ID().String(),
PeerCount: len(peers),

@@ -210,6 +212,29 @@ func (n *Node) startConnectionMonitoring() {
if err := announceMetrics(n, peers, cpuUsage, mem); err != nil {
n.logger.Error("Failed to announce metrics", zap.Error(err))
}

// Periodically update IPFS Cluster peer addresses
// This discovers all cluster peers and updates peer_addresses in service.json
// so IPFS Cluster can automatically connect to all discovered peers
if n.clusterConfigManager != nil {
// Update all cluster peers every 2 minutes to discover new peers
if time.Now().Unix()%120 == 0 {
if success, err := n.clusterConfigManager.UpdateAllClusterPeers(); err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "Failed to update cluster peers during monitoring", zap.Error(err))
} else if success {
n.logger.ComponentInfo(logging.ComponentNode, "Cluster peer addresses updated during monitoring")
}

// Also try to repair bootstrap peers if this is not a bootstrap node
if n.config.Node.Type != "bootstrap" {
if success, err := n.clusterConfigManager.RepairBootstrapPeers(); err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "Failed to repair bootstrap peers during monitoring", zap.Error(err))
} else if success {
n.logger.ComponentInfo(logging.ComponentNode, "Bootstrap peer configuration repaired during monitoring")
}
}
}
}
}
}()
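The two-minute cadence above is gated on `time.Now().Unix()%120 == 0` inside the existing monitoring tick. If a fixed cadence independent of the outer loop's interval were ever wanted, a dedicated ticker is the conventional alternative; a minimal sketch (the closure stands in for the `UpdateAllClusterPeers`/`RepairBootstrapPeers` calls above, and the short intervals are only so the demo finishes quickly):

package main

import (
    "context"
    "log"
    "time"
)

// runEvery invokes fn on a fixed cadence until ctx is cancelled.
func runEvery(ctx context.Context, every time.Duration, fn func()) {
    t := time.NewTicker(every)
    defer t.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-t.C:
            fn()
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    // In the node this would run alongside the monitoring loop with every = 2*time.Minute.
    runEvery(ctx, time.Second, func() { log.Println("refresh cluster peer addresses") })
}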
@@ -372,7 +372,7 @@ func (n *Node) startLibP2P() error {
// For production, these would be enabled
isLocalhost := len(n.config.Node.ListenAddresses) > 0 &&
(strings.Contains(n.config.Node.ListenAddresses[0], "localhost") ||
strings.Contains(n.config.Node.ListenAddresses[0], "localhost"))
strings.Contains(n.config.Node.ListenAddresses[0], "127.0.0.1"))

if isLocalhost {
n.logger.ComponentInfo(logging.ComponentLibP2P, "Localhost detected - disabling NAT services for local development")

@@ -732,19 +732,15 @@ func (n *Node) startIPFSClusterConfig() error {
return fmt.Errorf("failed to ensure cluster config: %w", err)
}

// If this is not the bootstrap node, try to update bootstrap peer info
if n.config.Node.Type != "bootstrap" && len(n.config.Discovery.BootstrapPeers) > 0 {
// Infer bootstrap cluster API URL from first bootstrap peer multiaddr
bootstrapClusterAPI := "http://localhost:9094" // Default fallback
if len(n.config.Discovery.BootstrapPeers) > 0 {
// Extract IP from first bootstrap peer multiaddr
if ip := extractIPFromMultiaddr(n.config.Discovery.BootstrapPeers[0]); ip != "" {
bootstrapClusterAPI = fmt.Sprintf("http://%s:9094", ip)
}
}
if err := cm.UpdateBootstrapPeers(bootstrapClusterAPI); err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "Failed to update bootstrap peers, will retry later", zap.Error(err))
// Don't fail - peers can connect later via mDNS or manual config
// Try to repair bootstrap peer configuration automatically
// This will be retried periodically if bootstrap is not available yet
if n.config.Node.Type != "bootstrap" {
if success, err := cm.RepairBootstrapPeers(); err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "Failed to repair bootstrap peers, will retry later", zap.Error(err))
} else if success {
n.logger.ComponentInfo(logging.ComponentNode, "Bootstrap peer configuration repaired successfully")
} else {
n.logger.ComponentDebug(logging.ComponentNode, "Bootstrap peer not available yet, will retry periodically")
}
}