From 358de8a8ad7ff7243758e1748a415e0a7a228c48 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Thu, 13 Nov 2025 10:26:50 +0200 Subject: [PATCH] feat: enhance production service initialization and logging - Updated the `Phase2cInitializeServices` function to accept bootstrap peers and VPS IP, improving service configuration for non-bootstrap nodes. - Refactored the `handleProdInstall` and `handleProdUpgrade` functions to ensure proper initialization of services with the new parameters. - Improved logging to provide clearer feedback during service initialization and configuration, enhancing user experience and troubleshooting capabilities. --- CHANGELOG.md | 18 + Makefile | 2 +- pkg/cli/prod_commands.go | 17 +- pkg/discovery/discovery.go | 107 +++--- pkg/environments/production/installers.go | 55 ++- pkg/environments/production/orchestrator.go | 31 +- pkg/ipfs/cluster.go | 54 ++- pkg/logging/logger.go | 34 +- pkg/rqlite/cluster_discovery.go | 153 ++++---- pkg/rqlite/rqlite.go | 55 ++- scripts/test-cluster-health.sh | 379 ++++++++++++++++++++ 11 files changed, 745 insertions(+), 160 deletions(-) create mode 100755 scripts/test-cluster-health.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index cdc29c3..8cd24ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,24 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant ### Deprecated ### Fixed +## [0.69.11] - 2025-11-13 + +### Added +- Added a new comprehensive shell script (`scripts/test-cluster-health.sh`) for checking the health and replication status of RQLite, IPFS, and IPFS Cluster across production environments. + +### Changed +- Improved RQLite cluster discovery logic to ensure `peers.json` is correctly generated and includes the local node, which is crucial for reliable cluster recovery. +- Refactored logging across discovery and RQLite components for cleaner, more concise output, especially for routine operations. 
+- Updated the installation and upgrade process to correctly configure IPFS Cluster bootstrap peers using the node's public IP, improving cluster formation reliability. + +### Deprecated + +### Removed + +### Fixed +- Fixed an issue where RQLite recovery operations (like clearing Raft state) did not correctly force the regeneration of `peers.json`, preventing successful cluster rejoin. +- Corrected the port calculation logic for IPFS Cluster to ensure the correct LibP2P listen port (9098) is used for bootstrap peer addressing. + ## [0.69.10] - 2025-11-13 ### Added diff --git a/Makefile b/Makefile index 8202e69..cc82e39 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill -VERSION := 0.69.10 +VERSION := 0.69.11 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/cli/prod_commands.go b/pkg/cli/prod_commands.go index 9d68483..9e25f91 100644 --- a/pkg/cli/prod_commands.go +++ b/pkg/cli/prod_commands.go @@ -264,7 +264,7 @@ func handleProdInstall(args []string) { // Phase 2c: Initialize services (after secrets are in place) fmt.Printf("\nPhase 2c: Initializing services...\n") - if err := setup.Phase2cInitializeServices(nodeType); err != nil { + if err := setup.Phase2cInitializeServices(nodeType, bootstrapPeers, *vpsIP); err != nil { fmt.Fprintf(os.Stderr, "āŒ Service initialization failed: %v\n", err) os.Exit(1) } @@ -447,13 +447,6 @@ func handleProdUpgrade(args []string) { nodeType = "bootstrap" // Default for upgrade if nothing exists } - // Phase 2c: Ensure services are properly initialized (fixes existing repos) - fmt.Printf("\nPhase 2c: Ensuring services are properly initialized...\n") - if err := setup.Phase2cInitializeServices(nodeType); err != nil { - 
fmt.Fprintf(os.Stderr, "āŒ Service initialization failed: %v\n", err) - os.Exit(1) - } - // Phase 3: Ensure secrets exist (preserves existing secrets) fmt.Printf("\nšŸ” Phase 3: Ensuring secrets...\n") if err := setup.Phase3GenerateSecrets(nodeType == "bootstrap"); err != nil { @@ -584,6 +577,14 @@ func handleProdUpgrade(args []string) { fmt.Printf(" - Bootstrap join address: %s\n", bootstrapJoin) } + // Phase 2c: Ensure services are properly initialized (fixes existing repos) + // Now that we have bootstrap peers and VPS IP, we can properly configure IPFS Cluster + fmt.Printf("\nPhase 2c: Ensuring services are properly initialized...\n") + if err := setup.Phase2cInitializeServices(nodeType, bootstrapPeers, vpsIP); err != nil { + fmt.Fprintf(os.Stderr, "āŒ Service initialization failed: %v\n", err) + os.Exit(1) + } + if err := setup.Phase4GenerateConfigs(nodeType == "bootstrap", bootstrapPeers, vpsIP, enableHTTPS, domain, bootstrapJoin); err != nil { fmt.Fprintf(os.Stderr, "āš ļø Config generation warning: %v\n", err) fmt.Fprintf(os.Stderr, " Existing configs preserved\n") diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index e9481af..5f9a41c 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -140,19 +140,9 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) { } } - // Log if addresses were filtered out - if filteredCount > 0 { - d.logger.Debug("Filtered out non-libp2p addresses", - zap.String("peer_id", pid.String()[:8]+"..."), - zap.Int("filtered_count", filteredCount), - zap.Int("valid_count", len(filteredAddrs))) - } - // If no addresses remain after filtering, skip this peer + // (Filtering is routine - no need to log every occurrence) if len(filteredAddrs) == 0 { - d.logger.Debug("No valid addresses after filtering", - zap.String("peer_id", pid.String()[:8]+"..."), - zap.Int("original_count", len(addrs))) continue } @@ -186,9 +176,7 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) { 
return } - d.logger.Debug("Sent peer exchange response", - zap.Int("peer_count", len(resp.Peers)), - zap.Bool("has_rqlite_metadata", resp.RQLiteMetadata != nil)) + // Response sent - routine operation, no need to log } // Start begins periodic peer discovery @@ -231,9 +219,6 @@ func (d *Manager) discoverPeers(ctx context.Context, config Config) { connectedPeers := d.host.Network().Peers() initialCount := len(connectedPeers) - d.logger.Debug("Starting peer discovery", - zap.Int("current_peers", initialCount)) - newConnections := 0 // Strategy 1: Try to connect to peers learned from the host's peerstore @@ -246,11 +231,12 @@ func (d *Manager) discoverPeers(ctx context.Context, config Config) { finalPeerCount := len(d.host.Network().Peers()) + // Summary log: only log if there were changes or new connections if newConnections > 0 || finalPeerCount != initialCount { - d.logger.Debug("Peer discovery completed", - zap.Int("new_connections", newConnections), - zap.Int("initial_peers", initialCount), - zap.Int("final_peers", finalPeerCount)) + d.logger.Debug("Discovery summary", + zap.Int("connected", finalPeerCount), + zap.Int("new", newConnections), + zap.Int("was", initialCount)) } } @@ -265,7 +251,10 @@ func (d *Manager) discoverViaPeerstore(ctx context.Context, maxConnections int) // Iterate over peerstore known peers peers := d.host.Peerstore().Peers() - d.logger.Debug("Peerstore contains peers", zap.Int("count", len(peers))) + + // Only connect to peers on our standard LibP2P port to avoid cross-connecting + // with IPFS/IPFS Cluster instances that use different ports + const libp2pPort = 4001 for _, pid := range peers { if connected >= maxConnections { @@ -280,6 +269,24 @@ func (d *Manager) discoverViaPeerstore(ctx context.Context, maxConnections int) continue } + // Filter peers to only include those with addresses on our port (4001) + // This prevents attempting to connect to IPFS (port 4101) or IPFS Cluster (port 9096) + peerInfo := 
d.host.Peerstore().PeerInfo(pid) + hasValidPort := false + for _, addr := range peerInfo.Addrs { + if port, err := addr.ValueForProtocol(multiaddr.P_TCP); err == nil { + if portNum, err := strconv.Atoi(port); err == nil && portNum == libp2pPort { + hasValidPort = true + break + } + } + } + + // Skip peers without valid port 4001 addresses + if !hasValidPort { + continue + } + // Try to connect if err := d.connectToPeer(ctx, pid); err == nil { connected++ @@ -302,8 +309,8 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in return 0 } - d.logger.Debug("Starting peer exchange with connected peers", - zap.Int("num_peers", len(connectedPeers))) + exchangedPeers := 0 + metadataCollected := 0 for _, peerID := range connectedPeers { if connected >= maxConnections { @@ -316,9 +323,13 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in continue } - d.logger.Debug("Received peer list from peer", - zap.String("from_peer", peerID.String()[:8]+"..."), - zap.Int("peer_count", len(peers))) + exchangedPeers++ + // Check if we got RQLite metadata + if val, err := d.host.Peerstore().Get(peerID, "rqlite_metadata"); err == nil { + if _, ok := val.([]byte); ok { + metadataCollected++ + } + } // Try to connect to discovered peers for _, peerInfo := range peers { @@ -365,9 +376,7 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in } if len(addrs) == 0 { - d.logger.Debug("No valid libp2p addresses (port 4001) for peer", - zap.String("peer_id", parsedID.String()[:8]+"..."), - zap.Int("total_addresses", len(peerInfo.Addrs))) + // Skip peers without valid addresses - no need to log every occurrence continue } @@ -380,20 +389,29 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in if err := d.host.Connect(connectCtx, peerAddrInfo); err != nil { cancel() - d.logger.Debug("Failed to connect to discovered peer", - zap.String("peer_id", parsedID.String()[:8]+"..."), + // 
Only log connection failures for debugging - errors are still useful + d.logger.Debug("Connect failed", + zap.String("peer", parsedID.String()[:8]+"..."), zap.Error(err)) continue } cancel() - d.logger.Info("Successfully connected to discovered peer", - zap.String("peer_id", parsedID.String()[:8]+"..."), - zap.String("discovered_from", peerID.String()[:8]+"...")) + d.logger.Info("Connected", + zap.String("peer", parsedID.String()[:8]+"..."), + zap.String("from", peerID.String()[:8]+"...")) connected++ } } + // Summary log for peer exchange + if exchangedPeers > 0 { + d.logger.Debug("Exchange summary", + zap.Int("exchanged_with", exchangedPeers), + zap.Int("metadata_collected", metadataCollected), + zap.Int("new_connections", connected)) + } + return connected } @@ -446,9 +464,10 @@ func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi metadataJSON, err := json.Marshal(resp.RQLiteMetadata) if err == nil { _ = d.host.Peerstore().Put(peerID, "rqlite_metadata", metadataJSON) - d.logger.Debug("Stored RQLite metadata from peer", - zap.String("peer_id", peerID.String()[:8]+"..."), - zap.String("node_id", resp.RQLiteMetadata.NodeID)) + // Only log when new metadata is stored (useful for debugging) + d.logger.Debug("Metadata stored", + zap.String("peer", peerID.String()[:8]+"..."), + zap.String("node", resp.RQLiteMetadata.NodeID)) } } @@ -464,9 +483,6 @@ func (d *Manager) TriggerPeerExchange(ctx context.Context) int { return 0 } - d.logger.Info("Manually triggering peer exchange", - zap.Int("connected_peers", len(connectedPeers))) - metadataCollected := 0 for _, peerID := range connectedPeers { // Request peer list from this peer (which includes their RQLite metadata) @@ -480,9 +496,9 @@ func (d *Manager) TriggerPeerExchange(ctx context.Context) int { } } - d.logger.Info("Peer exchange completed", - zap.Int("peers_with_metadata", metadataCollected), - zap.Int("total_peers", len(connectedPeers))) + d.logger.Info("Exchange completed", + 
zap.Int("peers", len(connectedPeers)),
+		zap.Int("with_metadata", metadataCollected))
 
 	return metadataCollected
 }
@@ -502,8 +518,7 @@ func (d *Manager) connectToPeer(ctx context.Context, peerID peer.ID) error {
 		return err
 	}
 
-	d.logger.Debug("Successfully connected to peer",
-		zap.String("peer_id", peerID.String()[:8]+"..."))
+	// Connection success logged at higher level - no need for duplicate DEBUG log
 
 	return nil
 }
diff --git a/pkg/environments/production/installers.go b/pkg/environments/production/installers.go
index f4596ae..72a2a93 100644
--- a/pkg/environments/production/installers.go
+++ b/pkg/environments/production/installers.go
@@ -532,7 +532,8 @@ func (bi *BinaryInstaller) configureIPFSAddresses(ipfsRepoPath string, apiPort,
 // InitializeIPFSClusterConfig initializes IPFS Cluster configuration
 // This runs `ipfs-cluster-service init` to create the service.json configuration file.
 // For existing installations, it ensures the cluster secret is up to date.
-func (bi *BinaryInstaller) InitializeIPFSClusterConfig(nodeType, clusterPath, clusterSecret string, ipfsAPIPort int) error {
+// bootstrapClusterPeers should be in format: ["/ip4/<node-ip>/tcp/9098/p2p/<cluster-peer-id>"]
+func (bi *BinaryInstaller) InitializeIPFSClusterConfig(nodeType, clusterPath, clusterSecret string, ipfsAPIPort int, bootstrapClusterPeers []string) error {
 	serviceJSONPath := filepath.Join(clusterPath, "service.json")
 	configExists := false
 	if _, err := os.Stat(serviceJSONPath); err == nil {
@@ -567,11 +568,11 @@ func (bi *BinaryInstaller) InitializeIPFSClusterConfig(nodeType, clusterPath, cl
 		}
 	}
 
-	// Always update the cluster secret and IPFS port (for both new and existing configs)
+	// Always update the cluster secret, IPFS port, and peer addresses (for both new and existing configs)
 	// This ensures existing installations get the secret and port synchronized
 	if clusterSecret != "" {
-		fmt.Fprintf(bi.logWriter.(interface{ Write([]byte) (int, error) }), "  Updating cluster secret and IPFS port...\n")
-		if err := 
bi.updateClusterConfig(clusterPath, clusterSecret, ipfsAPIPort); err != nil { + fmt.Fprintf(bi.logWriter.(interface{ Write([]byte) (int, error) }), " Updating cluster secret, IPFS port, and peer addresses...\n") + if err := bi.updateClusterConfig(clusterPath, clusterSecret, ipfsAPIPort, bootstrapClusterPeers); err != nil { return fmt.Errorf("failed to update cluster config: %w", err) } } @@ -582,8 +583,8 @@ func (bi *BinaryInstaller) InitializeIPFSClusterConfig(nodeType, clusterPath, cl return nil } -// updateClusterConfig updates the secret and IPFS port in IPFS Cluster service.json -func (bi *BinaryInstaller) updateClusterConfig(clusterPath, secret string, ipfsAPIPort int) error { +// updateClusterConfig updates the secret, IPFS port, and peer addresses in IPFS Cluster service.json +func (bi *BinaryInstaller) updateClusterConfig(clusterPath, secret string, ipfsAPIPort int, bootstrapClusterPeers []string) error { serviceJSONPath := filepath.Join(clusterPath, "service.json") // Read existing config @@ -598,13 +599,22 @@ func (bi *BinaryInstaller) updateClusterConfig(clusterPath, secret string, ipfsA return fmt.Errorf("failed to parse service.json: %w", err) } - // Update cluster secret + // Update cluster secret and peer addresses if cluster, ok := config["cluster"].(map[string]interface{}); ok { cluster["secret"] = secret + // Configure peer addresses for cluster discovery + // This allows nodes to find and connect to each other + if len(bootstrapClusterPeers) > 0 { + cluster["peer_addresses"] = bootstrapClusterPeers + } } else { - config["cluster"] = map[string]interface{}{ + clusterConfig := map[string]interface{}{ "secret": secret, } + if len(bootstrapClusterPeers) > 0 { + clusterConfig["peer_addresses"] = bootstrapClusterPeers + } + config["cluster"] = clusterConfig } // Update IPFS port in IPFS Proxy configuration @@ -635,6 +645,35 @@ func (bi *BinaryInstaller) updateClusterConfig(clusterPath, secret string, ipfsA return nil } +// GetClusterPeerMultiaddr 
reads the IPFS Cluster peer ID and returns its multiaddress
+// Returns format: /ip4/<node-ip>/tcp/9098/p2p/<cluster-peer-id>
+func (bi *BinaryInstaller) GetClusterPeerMultiaddr(clusterPath string, nodeIP string) (string, error) {
+	identityPath := filepath.Join(clusterPath, "identity.json")
+
+	// Read identity file
+	data, err := os.ReadFile(identityPath)
+	if err != nil {
+		return "", fmt.Errorf("failed to read identity.json: %w", err)
+	}
+
+	// Parse JSON
+	var identity map[string]interface{}
+	if err := json.Unmarshal(data, &identity); err != nil {
+		return "", fmt.Errorf("failed to parse identity.json: %w", err)
+	}
+
+	// Get peer ID
+	peerID, ok := identity["id"].(string)
+	if !ok || peerID == "" {
+		return "", fmt.Errorf("peer ID not found in identity.json")
+	}
+
+	// Construct multiaddress: /ip4/<node-ip>/tcp/9098/p2p/<cluster-peer-id>
+	// Port 9098 is the default cluster listen port
+	multiaddr := fmt.Sprintf("/ip4/%s/tcp/9098/p2p/%s", nodeIP, peerID)
+	return multiaddr, nil
+}
+
 // InitializeRQLiteDataDir initializes RQLite data directory
 func (bi *BinaryInstaller) InitializeRQLiteDataDir(nodeType, dataDir string) error {
 	fmt.Fprintf(bi.logWriter.(interface{ Write([]byte) (int, error) }), "  Initializing RQLite data dir for %s...\n", nodeType)
diff --git a/pkg/environments/production/orchestrator.go b/pkg/environments/production/orchestrator.go
index e84615e..f05fd76 100644
--- a/pkg/environments/production/orchestrator.go
+++ b/pkg/environments/production/orchestrator.go
@@ -265,7 +265,7 @@ func (ps *ProductionSetup) Phase2bInstallBinaries() error {
 }
 
 // Phase2cInitializeServices initializes service repositories and configurations
-func (ps *ProductionSetup) Phase2cInitializeServices(nodeType string) error {
+func (ps *ProductionSetup) Phase2cInitializeServices(nodeType string, bootstrapPeers []string, vpsIP string) error {
 	ps.logf("Phase 2c: Initializing services...")
 
 	// Ensure node-specific directories exist
@@ -289,7 +289,34 @@ func (ps *ProductionSetup) Phase2cInitializeServices(nodeType string, bootstrapPeers []string, vpsIP string) 
error { if err != nil { return fmt.Errorf("failed to get cluster secret: %w", err) } - if err := ps.binaryInstaller.InitializeIPFSClusterConfig(nodeType, clusterPath, clusterSecret, 4501); err != nil { + + // Get bootstrap cluster peer addresses for non-bootstrap nodes + var bootstrapClusterPeers []string + if nodeType != "bootstrap" && len(bootstrapPeers) > 0 { + // Try to read bootstrap cluster peer ID and construct multiaddress + bootstrapClusterPath := filepath.Join(ps.debrosDir, "data", "bootstrap", "ipfs-cluster") + + // Infer bootstrap IP from bootstrap peers + bootstrapIP := inferBootstrapIP(bootstrapPeers, vpsIP) + if bootstrapIP != "" { + // Check if bootstrap cluster identity exists + if _, err := os.Stat(filepath.Join(bootstrapClusterPath, "identity.json")); err == nil { + // Bootstrap cluster is initialized, get its multiaddress + if clusterMultiaddr, err := ps.binaryInstaller.GetClusterPeerMultiaddr(bootstrapClusterPath, bootstrapIP); err == nil { + bootstrapClusterPeers = []string{clusterMultiaddr} + ps.logf(" ā„¹ļø Configured IPFS Cluster to connect to bootstrap: %s", clusterMultiaddr) + } else { + ps.logf(" āš ļø Could not read bootstrap cluster peer ID: %v", err) + ps.logf(" āš ļø IPFS Cluster will rely on mDNS discovery (may not work across internet)") + } + } else { + ps.logf(" ā„¹ļø Bootstrap cluster not yet initialized, peer_addresses will be empty") + ps.logf(" ā„¹ļø IPFS Cluster will rely on mDNS discovery (may not work across internet)") + } + } + } + + if err := ps.binaryInstaller.InitializeIPFSClusterConfig(nodeType, clusterPath, clusterSecret, 4501, bootstrapClusterPeers); err != nil { return fmt.Errorf("failed to initialize IPFS Cluster: %w", err) } diff --git a/pkg/ipfs/cluster.go b/pkg/ipfs/cluster.go index f0306ac..ca7cd93 100644 --- a/pkg/ipfs/cluster.go +++ b/pkg/ipfs/cluster.go @@ -161,9 +161,15 @@ func (cm *ClusterConfigManager) EnsureConfig() error { } // Calculate ports based on pattern - proxyPort := clusterPort - 1 - 
pinSvcPort := clusterPort + 1 - clusterListenPort := clusterPort + 2 + // REST API: 9094 + // Proxy: 9094 - 1 = 9093 (NOT USED - keeping for reference) + // PinSvc: 9094 + 1 = 9095 + // Proxy API: 9094 + 1 = 9095 (actual proxy port) + // PinSvc API: 9094 + 3 = 9097 + // Cluster LibP2P: 9094 + 4 = 9098 + proxyPort := clusterPort + 1 // 9095 (IPFSProxy API) + pinSvcPort := clusterPort + 3 // 9097 (PinSvc API) + clusterListenPort := clusterPort + 4 // 9098 (Cluster LibP2P) // If config doesn't exist, initialize it with ipfs-cluster-service init // This ensures we have all required sections (datastore, informer, etc.) @@ -246,8 +252,9 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) (bo return false, fmt.Errorf("failed to parse bootstrap cluster API URL: %w", err) } - // Bootstrap listens on clusterPort + 2 (same pattern) - bootstrapClusterPort := clusterPort + 2 + // Bootstrap cluster LibP2P listens on clusterPort + 4 + // (REST API is 9094, LibP2P is 9098 = 9094 + 4) + bootstrapClusterPort := clusterPort + 4 // Determine IP protocol (ip4 or ip6) based on the host var ipProtocol string @@ -266,17 +273,31 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) (bo return false, fmt.Errorf("failed to load config: %w", err) } - // Check if we already have the correct address configured - if len(cfg.Cluster.PeerAddresses) > 0 && cfg.Cluster.PeerAddresses[0] == bootstrapPeerAddr { - cm.logger.Debug("Bootstrap peer address already correct", zap.String("addr", bootstrapPeerAddr)) - return true, nil + // CRITICAL: Always update peerstore file to ensure no stale addresses remain + // Stale addresses (e.g., from old port configurations) cause LibP2P dial backoff, + // preventing cluster peers from connecting even if the correct address is present. + // We must clean and rewrite the peerstore on every update to avoid this. 
+ peerstorePath := filepath.Join(cm.clusterPath, "peerstore") + + // Check if peerstore needs updating (avoid unnecessary writes but always clean stale entries) + needsUpdate := true + if peerstoreData, err := os.ReadFile(peerstorePath); err == nil { + // Only skip update if peerstore contains EXACTLY the correct address and nothing else + existingAddrs := strings.Split(strings.TrimSpace(string(peerstoreData)), "\n") + if len(existingAddrs) == 1 && strings.TrimSpace(existingAddrs[0]) == bootstrapPeerAddr { + cm.logger.Debug("Bootstrap peer address already correct in peerstore", zap.String("addr", bootstrapPeerAddr)) + needsUpdate = false + } } - // Update peerstore file FIRST - this is what IPFS Cluster reads for bootstrapping - // Peerstore is the source of truth, service.json is just for our tracking - peerstorePath := filepath.Join(cm.clusterPath, "peerstore") - if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil { - return false, fmt.Errorf("failed to write peerstore: %w", err) + if needsUpdate { + // Write ONLY the correct bootstrap peer address, removing any stale entries + if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil { + return false, fmt.Errorf("failed to write peerstore: %w", err) + } + cm.logger.Info("Updated peerstore with bootstrap peer (cleaned stale entries)", + zap.String("addr", bootstrapPeerAddr), + zap.String("peerstore_path", peerstorePath)) } // Then sync service.json from peerstore to keep them in sync @@ -852,8 +873,9 @@ func parseClusterPorts(clusterAPIURL string) (clusterPort, restAPIPort int, err return 0, 0, fmt.Errorf("invalid port: %s", portStr) } - // Cluster listen port is typically REST API port + 2 - clusterPort = restAPIPort + 2 + // clusterPort is used as the base port for calculations + // The actual cluster LibP2P listen port is calculated as clusterPort + 4 + clusterPort = restAPIPort return clusterPort, restAPIPort, nil } diff --git 
a/pkg/logging/logger.go b/pkg/logging/logger.go index 8ee1d87..5b78c67 100644 --- a/pkg/logging/logger.go +++ b/pkg/logging/logger.go @@ -101,8 +101,10 @@ func getLevelColor(level zapcore.Level) string { // coloredConsoleEncoder creates a custom encoder with colors func coloredConsoleEncoder(enableColors bool) zapcore.Encoder { config := zap.NewDevelopmentEncoderConfig() + + // Ultra-short timestamp: HH:MM:SS (no milliseconds, no date, no timezone) config.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) { - timeStr := t.Format("2006-01-02T15:04:05.000Z0700") + timeStr := t.Format("15:04:05") if enableColors { enc.AppendString(fmt.Sprintf("%s%s%s", Dim, timeStr, Reset)) } else { @@ -110,21 +112,41 @@ func coloredConsoleEncoder(enableColors bool) zapcore.Encoder { } } + // Single letter level: D, I, W, E config.EncodeLevel = func(level zapcore.Level, enc zapcore.PrimitiveArrayEncoder) { - levelStr := strings.ToUpper(level.String()) + levelMap := map[zapcore.Level]string{ + zapcore.DebugLevel: "D", + zapcore.InfoLevel: "I", + zapcore.WarnLevel: "W", + zapcore.ErrorLevel: "E", + } + levelStr := levelMap[level] + if levelStr == "" { + levelStr = "?" 
+ } if enableColors { color := getLevelColor(level) - enc.AppendString(fmt.Sprintf("%s%s%-5s%s", color, Bold, levelStr, Reset)) + enc.AppendString(fmt.Sprintf("%s%s%s%s", color, Bold, levelStr, Reset)) } else { - enc.AppendString(fmt.Sprintf("%-5s", levelStr)) + enc.AppendString(levelStr) } } + // Just filename, no line number for cleaner output config.EncodeCaller = func(caller zapcore.EntryCaller, enc zapcore.PrimitiveArrayEncoder) { + file := caller.File + // Extract just the filename from the path + if idx := strings.LastIndex(file, "/"); idx >= 0 { + file = file[idx+1:] + } + // Remove .go extension for even more compact format + if strings.HasSuffix(file, ".go") { + file = file[:len(file)-3] + } if enableColors { - enc.AppendString(fmt.Sprintf("%s%s%s", Dim, caller.TrimmedPath(), Reset)) + enc.AppendString(fmt.Sprintf("%s%s%s", Dim, file, Reset)) } else { - enc.AppendString(caller.TrimmedPath()) + enc.AppendString(file) } } diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go index 790e396..a1f6e8b 100644 --- a/pkg/rqlite/cluster_discovery.go +++ b/pkg/rqlite/cluster_discovery.go @@ -166,8 +166,7 @@ func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeM connectedPeers := c.host.Network().Peers() var metadata []*discovery.RQLiteNodeMetadata - c.logger.Debug("Collecting peer metadata from LibP2P", - zap.Int("connected_libp2p_peers", len(connectedPeers))) + // Metadata collection is routine - no need to log every occurrence c.mu.RLock() currentRaftAddr := c.raftAddress @@ -240,9 +239,6 @@ type membershipUpdateResult struct { func (c *ClusterDiscoveryService) updateClusterMembership() { metadata := c.collectPeerMetadata() - c.logger.Debug("Collected peer metadata", - zap.Int("metadata_count", len(metadata))) - // Compute membership changes while holding lock c.mu.Lock() result := c.computeMembershipChangesLocked(metadata) @@ -252,35 +248,30 @@ func (c *ClusterDiscoveryService) updateClusterMembership() { if 
result.changed { // Log state changes (peer added/removed) at Info level if len(result.added) > 0 || len(result.updated) > 0 { - c.logger.Info("Cluster membership changed", + c.logger.Info("Membership changed", zap.Int("added", len(result.added)), zap.Int("updated", len(result.updated)), - zap.Strings("added_ids", result.added), - zap.Strings("updated_ids", result.updated)) + zap.Strings("added", result.added), + zap.Strings("updated", result.updated)) } // Write peers.json without holding lock if err := c.writePeersJSONWithData(result.peersJSON); err != nil { - c.logger.Error("CRITICAL: Failed to write peers.json", + c.logger.Error("Failed to write peers.json", zap.Error(err), zap.String("data_dir", c.dataDir), - zap.Int("peer_count", len(result.peersJSON))) + zap.Int("peers", len(result.peersJSON))) } else { c.logger.Debug("peers.json updated", - zap.Int("peer_count", len(result.peersJSON))) + zap.Int("peers", len(result.peersJSON))) } // Update lastUpdate timestamp c.mu.Lock() c.lastUpdate = time.Now() c.mu.Unlock() - } else { - c.mu.RLock() - totalPeers := len(c.knownPeers) - c.mu.RUnlock() - c.logger.Debug("No changes to cluster membership", - zap.Int("total_peers", totalPeers)) } + // No changes - don't log (reduces noise) } // computeMembershipChangesLocked computes membership changes and returns snapshot data @@ -305,10 +296,10 @@ func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*dis } else { // New peer discovered added = append(added, meta.NodeID) - c.logger.Info("Node added to cluster", - zap.String("node_id", meta.NodeID), - zap.String("raft_address", meta.RaftAddress), - zap.String("node_type", meta.NodeType), + c.logger.Info("Node added", + zap.String("node", meta.NodeID), + zap.String("raft", meta.RaftAddress), + zap.String("type", meta.NodeType), zap.Uint64("log_index", meta.RaftLogIndex)) } @@ -354,11 +345,10 @@ func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*dis requiredRemotePeers := 
c.minClusterSize - 1 if remotePeerCount < requiredRemotePeers { - c.logger.Info("Skipping initial peers.json write - not enough remote peers discovered", - zap.Int("remote_peers", remotePeerCount), - zap.Int("required_remote_peers", requiredRemotePeers), - zap.Int("min_cluster_size", c.minClusterSize), - zap.String("action", "will write when enough peers are discovered")) + c.logger.Info("Waiting for peers", + zap.Int("have", remotePeerCount), + zap.Int("need", requiredRemotePeers), + zap.Int("min_size", c.minClusterSize)) return membershipUpdateResult{ changed: false, } @@ -367,8 +357,7 @@ func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*dis // Additional safety check: don't write empty peers.json (would cause single-node cluster) if len(peers) == 0 && c.lastUpdate.IsZero() { - c.logger.Info("Skipping peers.json write - no remote peers to include", - zap.String("action", "will write when peers are discovered")) + c.logger.Info("No remote peers - waiting") return membershipUpdateResult{ changed: false, } @@ -376,10 +365,10 @@ func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*dis // Log initial sync if this is the first time if c.lastUpdate.IsZero() { - c.logger.Info("Initial cluster membership sync", - zap.Int("total_peers", len(c.knownPeers)), - zap.Int("remote_peers", remotePeerCount), - zap.Int("peers_in_json", len(peers))) + c.logger.Info("Initial sync", + zap.Int("total", len(c.knownPeers)), + zap.Int("remote", remotePeerCount), + zap.Int("in_json", len(peers))) } return membershipUpdateResult{ @@ -408,8 +397,8 @@ func (c *ClusterDiscoveryService) removeInactivePeers() { if inactiveDuration > c.inactivityLimit { // Mark as inactive and remove - c.logger.Warn("Node removed from cluster", - zap.String("node_id", nodeID), + c.logger.Warn("Node removed", + zap.String("node", nodeID), zap.String("reason", "inactive"), zap.Duration("inactive_duration", inactiveDuration)) @@ -421,9 +410,9 @@ func (c 
*ClusterDiscoveryService) removeInactivePeers() { // Regenerate peers.json if any peers were removed if len(removed) > 0 { - c.logger.Info("Removed inactive nodes, regenerating peers.json", - zap.Int("removed", len(removed)), - zap.Strings("node_ids", removed)) + c.logger.Info("Removed inactive", + zap.Int("count", len(removed)), + zap.Strings("nodes", removed)) if err := c.writePeersJSON(); err != nil { c.logger.Error("Failed to write peers.json after cleanup", zap.Error(err)) @@ -443,10 +432,11 @@ func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{ peers := make([]map[string]interface{}, 0, len(c.knownPeers)) for _, peer := range c.knownPeers { - // Skip self - RQLite knows about itself, shouldn't be in peers.json - if peer.NodeID == c.raftAddress { - continue - } + // CRITICAL FIX: Include ALL peers (including self) in peers.json + // When using bootstrap-expect with recovery, RQLite needs the complete + // expected cluster configuration to properly form consensus. + // The peers.json file is used by RQLite's recovery mechanism to know + // what the full cluster membership should be, including the local node. 
peerEntry := map[string]interface{}{ "id": peer.RaftAddress, // RQLite uses raft address as node ID "address": peer.RaftAddress, @@ -482,11 +472,7 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte // Get the RQLite raft directory rqliteDir := filepath.Join(dataDir, "rqlite", "raft") - c.logger.Debug("Writing peers.json", - zap.String("data_dir", c.dataDir), - zap.String("expanded_path", dataDir), - zap.String("raft_dir", rqliteDir), - zap.Int("peer_count", len(peers))) + // Writing peers.json - routine operation, no need to log details if err := os.MkdirAll(rqliteDir, 0755); err != nil { return fmt.Errorf("failed to create raft directory %s: %w", rqliteDir, err) @@ -497,7 +483,7 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte // Backup existing peers.json if it exists if _, err := os.Stat(peersFile); err == nil { - c.logger.Debug("Backing up existing peers.json", zap.String("backup_file", backupFile)) + // Backup existing peers.json if it exists - routine operation data, err := os.ReadFile(peersFile) if err == nil { _ = os.WriteFile(backupFile, data, 0644) @@ -510,7 +496,7 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte return fmt.Errorf("failed to marshal peers.json: %w", err) } - c.logger.Debug("Marshaled peers.json", zap.Int("data_size", len(data))) + // Marshaled peers.json - routine operation // Write atomically using temp file + rename tempFile := peersFile + ".tmp" @@ -530,9 +516,8 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte } c.logger.Info("peers.json written", - zap.String("file", peersFile), - zap.Int("node_count", len(peers)), - zap.Strings("node_ids", nodeIDs)) + zap.Int("peers", len(peers)), + zap.Strings("nodes", nodeIDs)) return nil } @@ -648,17 +633,58 @@ func (c *ClusterDiscoveryService) WaitForDiscoverySettling(ctx context.Context) // TriggerSync manually triggers a cluster membership sync func (c 
*ClusterDiscoveryService) TriggerSync() { - c.logger.Info("Manually triggering cluster membership sync") - // For bootstrap nodes, wait a bit for peer discovery to stabilize if c.nodeType == "bootstrap" { - c.logger.Info("Bootstrap node: waiting for peer discovery to complete") time.Sleep(5 * time.Second) } c.updateClusterMembership() } +// ForceWritePeersJSON forces writing peers.json regardless of membership changes +// This is useful after clearing raft state when we need to recreate peers.json +func (c *ClusterDiscoveryService) ForceWritePeersJSON() error { + c.logger.Info("Force writing peers.json") + + // First, collect latest peer metadata to ensure we have current information + metadata := c.collectPeerMetadata() + + // Update known peers with latest metadata (without writing file yet) + c.mu.Lock() + for _, meta := range metadata { + c.knownPeers[meta.NodeID] = meta + // Update health tracking for remote peers + if meta.NodeID != c.raftAddress { + if _, ok := c.peerHealth[meta.NodeID]; !ok { + c.peerHealth[meta.NodeID] = &PeerHealth{ + LastSeen: time.Now(), + LastSuccessful: time.Now(), + Status: "active", + } + } else { + c.peerHealth[meta.NodeID].LastSeen = time.Now() + c.peerHealth[meta.NodeID].Status = "active" + } + } + } + peers := c.getPeersJSONUnlocked() + c.mu.Unlock() + + // Now force write the file + if err := c.writePeersJSONWithData(peers); err != nil { + c.logger.Error("Failed to force write peers.json", + zap.Error(err), + zap.String("data_dir", c.dataDir), + zap.Int("peers", len(peers))) + return err + } + + c.logger.Info("peers.json written", + zap.Int("peers", len(peers))) + + return nil +} + // TriggerPeerExchange actively exchanges peer information with connected peers // This populates the peerstore with RQLite metadata from other nodes func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error { @@ -666,9 +692,8 @@ func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error return 
fmt.Errorf("discovery manager not available") } - c.logger.Info("Triggering peer exchange via discovery manager") collected := c.discoveryMgr.TriggerPeerExchange(ctx) - c.logger.Info("Peer exchange completed", zap.Int("peers_with_metadata", collected)) + c.logger.Debug("Exchange completed", zap.Int("with_metadata", collected)) return nil } @@ -709,8 +734,8 @@ func (c *ClusterDiscoveryService) UpdateOwnMetadata() { return } - c.logger.Debug("Updated own RQLite metadata", - zap.String("node_id", metadata.NodeID), + c.logger.Debug("Metadata updated", + zap.String("node", metadata.NodeID), zap.Uint64("log_index", metadata.RaftLogIndex)) } @@ -740,9 +765,9 @@ func (c *ClusterDiscoveryService) StoreRemotePeerMetadata(peerID peer.ID, metada return fmt.Errorf("failed to store metadata: %w", err) } - c.logger.Debug("Stored remote peer metadata", - zap.String("peer_id", shortPeerID(peerID)), - zap.String("node_id", metadata.NodeID)) + c.logger.Debug("Metadata stored", + zap.String("peer", shortPeerID(peerID)), + zap.String("node", metadata.NodeID)) return nil } @@ -758,9 +783,9 @@ func (c *ClusterDiscoveryService) adjustPeerAdvertisedAddresses(peerID peer.ID, changed, stale := rewriteAdvertisedAddresses(meta, ip, true) if changed { - c.logger.Debug("Normalized peer advertised RQLite addresses", - zap.String("peer_id", shortPeerID(peerID)), - zap.String("raft_address", meta.RaftAddress), + c.logger.Debug("Addresses normalized", + zap.String("peer", shortPeerID(peerID)), + zap.String("raft", meta.RaftAddress), zap.String("http_address", meta.HTTPAddress)) } return changed, stale diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index fd83a83..5af5d7c 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -34,11 +34,6 @@ type RQLiteManager struct { // waitForSQLAvailable waits until a simple query succeeds, indicating a leader is known and queries can be served. 
func (r *RQLiteManager) waitForSQLAvailable(ctx context.Context) error { - if r.connection == nil { - r.logger.Error("No rqlite connection") - return errors.New("no rqlite connection") - } - ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() @@ -48,6 +43,16 @@ func (r *RQLiteManager) waitForSQLAvailable(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-ticker.C: + // Check for nil connection inside the loop to handle cases where + // connection becomes nil during restart/recovery operations + if r.connection == nil { + attempts++ + if attempts%5 == 0 { // log every ~5s to reduce noise + r.logger.Debug("Waiting for RQLite connection to be established") + } + continue + } + attempts++ _, err := r.connection.QueryOne("SELECT 1") if err == nil { @@ -417,6 +422,15 @@ func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDat if err := r.clearRaftState(rqliteDataDir); err != nil { r.logger.Error("Failed to clear Raft state", zap.Error(err)) } else { + // Force write peers.json after clearing state + if r.discoveryService != nil { + r.logger.Info("Force writing peers.json after clearing state for configuration mismatch recovery") + if err := r.discoveryService.ForceWritePeersJSON(); err != nil { + r.logger.Error("Failed to force write peers.json", zap.Error(err)) + } + // Update peersPath after force write + peersPath = filepath.Join(rqliteDataDir, "raft", "peers.json") + } // Restart RQLite with clean state r.logger.Info("Raft state cleared, restarting RQLite for clean rejoin") if recoveryErr := r.recoverCluster(ctx, peersPath); recoveryErr == nil { @@ -1279,15 +1293,30 @@ func (r *RQLiteManager) recoverFromSplitBrain(ctx context.Context) error { return fmt.Errorf("failed to clear Raft state: %w", err) } - // Step 5: Ensure peers.json exists with all discovered peers - r.discoveryService.TriggerSync() - time.Sleep(2 * time.Second) + // Step 5: Refresh peer metadata and force write peers.json + // We trigger peer 
exchange again to ensure we have the absolute latest metadata + // after clearing state, then force write peers.json regardless of changes + r.logger.Info("Refreshing peer metadata after clearing raft state") + r.discoveryService.TriggerPeerExchange(ctx) + time.Sleep(1 * time.Second) // Brief wait for peer exchange to complete + r.logger.Info("Force writing peers.json with all discovered peers") + // We use ForceWritePeersJSON instead of TriggerSync because TriggerSync + // only writes if membership changed, but after clearing state we need + // to write regardless of changes + if err := r.discoveryService.ForceWritePeersJSON(); err != nil { + return fmt.Errorf("failed to force write peers.json: %w", err) + } + + // Verify peers.json was created peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json") if _, err := os.Stat(peersPath); err != nil { - return fmt.Errorf("peers.json not created: %w", err) + return fmt.Errorf("peers.json not created after force write: %w", err) } + r.logger.Info("peers.json verified after force write", + zap.String("peers_path", peersPath)) + // Step 6: Restart RQLite to pick up new peers.json r.logger.Info("Restarting RQLite to apply new cluster configuration") if err := r.recoverCluster(ctx, peersPath); err != nil { @@ -1435,6 +1464,14 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql if err := r.clearRaftState(rqliteDataDir); err != nil { r.logger.Error("Failed to clear Raft state", zap.Error(err)) // Continue anyway - rqlite might still be able to recover + } else { + // Force write peers.json after clearing stale state + if r.discoveryService != nil { + r.logger.Info("Force writing peers.json after clearing stale Raft state") + if err := r.discoveryService.ForceWritePeersJSON(); err != nil { + r.logger.Error("Failed to force write peers.json after clearing stale state", zap.Error(err)) + } + } } } } diff --git a/scripts/test-cluster-health.sh b/scripts/test-cluster-health.sh new file mode 
100755 index 0000000..9589ecd --- /dev/null +++ b/scripts/test-cluster-health.sh @@ -0,0 +1,379 @@ +#!/bin/bash + +# Production Cluster Health Check Script +# Tests RQLite, IPFS, and IPFS Cluster connectivity and replication + +# Note: We don't use 'set -e' here because we want to continue testing even if individual checks fail + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Node IPs - Update these if needed +BOOTSTRAP="${BOOTSTRAP:-51.83.128.181}" +NODE1="${NODE1:-57.128.223.92}" +NODE2="${NODE2:-185.185.83.89}" + +ALL_NODES=($BOOTSTRAP $NODE1 $NODE2) + +# Counters +PASSED=0 +FAILED=0 +WARNINGS=0 + +# Helper functions +print_header() { + echo "" + echo -e "${BLUE}========================================${NC}" + echo -e "${BLUE}$1${NC}" + echo -e "${BLUE}========================================${NC}" +} + +print_test() { + echo -e "${YELLOW}ā–¶ $1${NC}" +} + +print_pass() { + echo -e "${GREEN}āœ“ $1${NC}" + PASSED=$((PASSED + 1)) +} + +print_fail() { + echo -e "${RED}āœ— $1${NC}" + FAILED=$((FAILED + 1)) +} + +print_warn() { + echo -e "${YELLOW}⚠ $1${NC}" + WARNINGS=$((WARNINGS + 1)) +} + +print_info() { + echo -e " $1" +} + +# Test functions +test_rqlite_status() { + print_header "1. RQLITE CLUSTER STATUS" + + local leader_found=false + local follower_count=0 + local commit_indices=() + + for i in "${!ALL_NODES[@]}"; do + local node="${ALL_NODES[$i]}" + print_test "Testing RQLite on $node" + + if ! 
response=$(curl -s --max-time 5 http://$node:5001/status 2>/dev/null); then + print_fail "Cannot connect to RQLite on $node:5001" + continue + fi + + local state=$(echo "$response" | jq -r '.store.raft.state // "unknown"') + local num_peers=$(echo "$response" | jq -r '.store.raft.num_peers // 0') + local commit_index=$(echo "$response" | jq -r '.store.raft.commit_index // 0') + local last_contact=$(echo "$response" | jq -r '.store.raft.last_contact // "N/A"') + local config=$(echo "$response" | jq -r '.store.raft.latest_configuration // "[]"') + local node_count=$(echo "$config" | grep -o "Address" | wc -l | tr -d ' ') + + commit_indices+=($commit_index) + + print_info "State: $state | Peers: $num_peers | Commit Index: $commit_index | Cluster Nodes: $node_count" + + # Check state + if [ "$state" = "Leader" ]; then + leader_found=true + print_pass "Node $node is the Leader" + elif [ "$state" = "Follower" ]; then + follower_count=$((follower_count + 1)) + # Check last contact + if [ "$last_contact" != "N/A" ] && [ "$last_contact" != "0" ]; then + print_pass "Node $node is a Follower (last contact: $last_contact)" + else + print_warn "Node $node is Follower but last_contact is $last_contact" + fi + else + print_fail "Node $node has unexpected state: $state" + fi + + # Check peer count + if [ "$num_peers" = "2" ]; then + print_pass "Node $node has correct peer count: 2" + else + print_fail "Node $node has incorrect peer count: $num_peers (expected 2)" + fi + + # Check cluster configuration + if [ "$node_count" = "3" ]; then + print_pass "Node $node sees all 3 cluster members" + else + print_fail "Node $node only sees $node_count cluster members (expected 3)" + fi + + echo "" + done + + # Check for exactly 1 leader + if [ "$leader_found" = true ] && [ "$follower_count" = "2" ]; then + print_pass "Cluster has 1 Leader and 2 Followers āœ“" + else + print_fail "Invalid cluster state (Leader found: $leader_found, Followers: $follower_count)" + fi + + # Check commit index 
sync + if [ ${#commit_indices[@]} -eq 3 ]; then + local first="${commit_indices[0]}" + local all_same=true + for idx in "${commit_indices[@]}"; do + if [ "$idx" != "$first" ]; then + all_same=false + break + fi + done + + if [ "$all_same" = true ]; then + print_pass "All nodes have synced commit index: $first" + else + print_warn "Commit indices differ: ${commit_indices[*]} (might be normal if writes are happening)" + fi + fi +} + +test_rqlite_replication() { + print_header "2. RQLITE REPLICATION TEST" + + print_test "Creating test table and inserting data on leader ($BOOTSTRAP)" + + # Create table + if ! response=$(curl -s --max-time 5 -XPOST "http://$BOOTSTRAP:5001/db/execute" \ + -H "Content-Type: application/json" \ + -d '[["CREATE TABLE IF NOT EXISTS test_cluster_health (id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, node TEXT, value TEXT)"]]' 2>/dev/null); then + print_fail "Failed to create table" + return + fi + + if echo "$response" | jq -e '.results[0].error' >/dev/null 2>&1; then + local error=$(echo "$response" | jq -r '.results[0].error') + if [[ "$error" != "table test_cluster_health already exists" ]]; then + print_fail "Table creation error: $error" + return + fi + fi + print_pass "Table exists" + + # Insert test data + local test_value="test_$(date +%s)" + if ! response=$(curl -s --max-time 5 -XPOST "http://$BOOTSTRAP:5001/db/execute" \ + -H "Content-Type: application/json" \ + -d "[ + [\"INSERT INTO test_cluster_health (timestamp, node, value) VALUES (datetime('now'), 'bootstrap', '$test_value')\"] + ]" 2>/dev/null); then + print_fail "Failed to insert data" + return + fi + + if echo "$response" | jq -e '.results[0].error' >/dev/null 2>&1; then + local error=$(echo "$response" | jq -r '.results[0].error') + print_fail "Insert error: $error" + return + fi + print_pass "Data inserted: $test_value" + + # Wait for replication + print_info "Waiting 2 seconds for replication..." 
+ sleep 2 + + # Query from all nodes + for node in "${ALL_NODES[@]}"; do + print_test "Reading from $node" + + if ! response=$(curl -s --max-time 5 -XPOST "http://$node:5001/db/query?level=weak" \ + -H "Content-Type: application/json" \ + -d "[\"SELECT * FROM test_cluster_health WHERE value = '$test_value' LIMIT 1\"]" 2>/dev/null); then + print_fail "Failed to query from $node" + continue + fi + + if echo "$response" | jq -e '.results[0].error' >/dev/null 2>&1; then + local error=$(echo "$response" | jq -r '.results[0].error') + print_fail "Query error on $node: $error" + continue + fi + + local row_count=$(echo "$response" | jq -r '.results[0].values | length // 0') + if [ "$row_count" = "1" ]; then + local retrieved_value=$(echo "$response" | jq -r '.results[0].values[0][3] // ""') + if [ "$retrieved_value" = "$test_value" ]; then + print_pass "Data replicated correctly to $node" + else + print_fail "Data mismatch on $node (got: $retrieved_value, expected: $test_value)" + fi + else + print_fail "Expected 1 row from $node, got $row_count" + fi + done +} + +test_ipfs_status() { + print_header "3. IPFS DAEMON STATUS" + + for node in "${ALL_NODES[@]}"; do + print_test "Testing IPFS on $node" + + if ! response=$(curl -s --max-time 5 -X POST http://$node:4501/api/v0/id 2>/dev/null); then + print_fail "Cannot connect to IPFS on $node:4501" + continue + fi + + local peer_id=$(echo "$response" | jq -r '.ID // "unknown"') + local addr_count=$(echo "$response" | jq -r '.Addresses | length // 0') + local agent=$(echo "$response" | jq -r '.AgentVersion // "unknown"') + + if [ "$peer_id" != "unknown" ]; then + print_pass "IPFS running on $node (ID: ${peer_id:0:12}...)" + print_info "Agent: $agent | Addresses: $addr_count" + else + print_fail "IPFS not responding correctly on $node" + fi + done +} + +test_ipfs_swarm() { + print_header "4. IPFS SWARM CONNECTIVITY" + + for node in "${ALL_NODES[@]}"; do + print_test "Checking IPFS swarm peers on $node" + + if ! 
response=$(curl -s --max-time 5 -X POST http://$node:4501/api/v0/swarm/peers 2>/dev/null); then + print_fail "Failed to get swarm peers from $node" + continue + fi + + local peer_count=$(echo "$response" | jq -r '.Peers | length // 0') + + if [ "$peer_count" = "2" ]; then + print_pass "Node $node connected to 2 IPFS peers" + elif [ "$peer_count" -gt "0" ]; then + print_warn "Node $node connected to $peer_count IPFS peers (expected 2)" + else + print_fail "Node $node has no IPFS swarm peers" + fi + done +} + +test_ipfs_cluster_status() { + print_header "5. IPFS CLUSTER STATUS" + + for node in "${ALL_NODES[@]}"; do + print_test "Testing IPFS Cluster on $node" + + if ! response=$(curl -s --max-time 5 http://$node:9094/id 2>/dev/null); then + print_fail "Cannot connect to IPFS Cluster on $node:9094" + continue + fi + + local cluster_id=$(echo "$response" | jq -r '.id // "unknown"') + local cluster_peers=$(echo "$response" | jq -r '.cluster_peers | length // 0') + local version=$(echo "$response" | jq -r '.version // "unknown"') + + if [ "$cluster_id" != "unknown" ]; then + print_pass "IPFS Cluster running on $node (ID: ${cluster_id:0:12}...)" + print_info "Version: $version | Cluster Peers: $cluster_peers" + + if [ "$cluster_peers" = "3" ]; then + print_pass "Node $node sees all 3 cluster peers" + else + print_warn "Node $node sees $cluster_peers cluster peers (expected 3)" + fi + else + print_fail "IPFS Cluster not responding correctly on $node" + fi + done +} + +test_ipfs_cluster_pins() { + print_header "6. IPFS CLUSTER PIN CONSISTENCY" + + local pin_counts=() + + for node in "${ALL_NODES[@]}"; do + print_test "Checking pins on $node" + + if ! 
response=$(curl -s --max-time 5 http://$node:9094/pins 2>/dev/null); then + print_fail "Failed to get pins from $node" + pin_counts+=(0) + continue + fi + + local pin_count=$(echo "$response" | jq -r 'length // 0') + pin_counts+=($pin_count) + print_pass "Node $node has $pin_count pins" + done + + # Check if all nodes have same pin count + if [ ${#pin_counts[@]} -eq 3 ]; then + local first="${pin_counts[0]}" + local all_same=true + for count in "${pin_counts[@]}"; do + if [ "$count" != "$first" ]; then + all_same=false + break + fi + done + + if [ "$all_same" = true ]; then + print_pass "All nodes have consistent pin count: $first" + else + print_warn "Pin counts differ: ${pin_counts[*]} (might be syncing)" + fi + fi +} + +print_summary() { + print_header "TEST SUMMARY" + + echo "" + echo -e "${GREEN}Passed: $PASSED${NC}" + echo -e "${YELLOW}Warnings: $WARNINGS${NC}" + echo -e "${RED}Failed: $FAILED${NC}" + echo "" + + if [ $FAILED -eq 0 ]; then + echo -e "${GREEN}šŸŽ‰ All critical tests passed! Cluster is healthy.${NC}" + exit 0 + elif [ $FAILED -le 2 ]; then + echo -e "${YELLOW}āš ļø Some tests failed. Review the output above.${NC}" + exit 1 + else + echo -e "${RED}āŒ Multiple failures detected. Cluster needs attention.${NC}" + exit 2 + fi +} + +# Main execution +main() { + echo "" + echo -e "${BLUE}╔════════════════════════════════════════════╗${NC}" + echo -e "${BLUE}ā•‘ DEBROS Production Cluster Health Check ā•‘${NC}" + echo -e "${BLUE}ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•${NC}" + echo "" + echo "Testing cluster:" + echo " Bootstrap: $BOOTSTRAP" + echo " Node 1: $NODE1" + echo " Node 2: $NODE2" + + test_rqlite_status + test_rqlite_replication + test_ipfs_status + test_ipfs_swarm + test_ipfs_cluster_status + test_ipfs_cluster_pins + print_summary +} + +# Run main +main +