diff --git a/Makefile b/Makefile index 95e84de..a5ad0da 100644 --- a/Makefile +++ b/Makefile @@ -33,14 +33,14 @@ run-node: run-node2: @echo "Starting REGULAR node2 (role=node)..." @if [ -z "$(BOOTSTRAP)" ]; then echo "ERROR: Provide BOOTSTRAP multiaddr: make run-node2 BOOTSTRAP=/ip4/127.0.0.1/tcp/4001/p2p/ [HTTP=5002 RAFT=7002]"; exit 1; fi - go run cmd/node/main.go -role node -id node2 -data ./data/node2 -bootstrap $(BOOTSTRAP) -rqlite-http-port ${HTTP-5002} -rqlite-raft-port ${RAFT-7002} + go run cmd/node/main.go -role node -id node2 -data ./data/node2 -bootstrap $(BOOTSTRAP) -rqlite-http-port $${HTTP:-5002} -rqlite-raft-port $${RAFT:-7002} # Run third node (regular) - requires BOOTSTRAP multiaddr # Usage: make run-node3 BOOTSTRAP=/ip4/127.0.0.1/tcp/4001/p2p/ HTTP=5003 RAFT=7003 run-node3: @echo "Starting REGULAR node3 (role=node)..." @if [ -z "$(BOOTSTRAP)" ]; then echo "ERROR: Provide BOOTSTRAP multiaddr: make run-node3 BOOTSTRAP=/ip4/127.0.0.1/tcp/4001/p2p/ [HTTP=5003 RAFT=7003]"; exit 1; fi - go run cmd/node/main.go -role node -id node3 -data ./data/node3 -bootstrap $(BOOTSTRAP) -rqlite-http-port ${HTTP-5003} -rqlite-raft-port ${RAFT-7003} + go run cmd/node/main.go -role node -id node3 -data ./data/node3 -bootstrap $(BOOTSTRAP) -rqlite-http-port $${HTTP:-5003} -rqlite-raft-port $${RAFT:-7003} # Show how to run with flags show-bootstrap: diff --git a/cmd/node/main.go b/cmd/node/main.go index 8da0843..d9c6d28 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "net" "log" "os" "os/exec" @@ -112,10 +113,10 @@ func main() { } if isSecondaryBootstrap { - // Secondary bootstrap nodes join the primary bootstrap + // Secondary bootstrap nodes join the primary bootstrap Raft address (standardized to 7001) primaryBootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0]) - cfg.Database.RQLiteJoinAddress = fmt.Sprintf("http://%s:%d", primaryBootstrapHost, *rqlHTTP) - logger.Printf("Secondary bootstrap node - joining primary bootstrap at: %s", cfg.Database.RQLiteJoinAddress) + cfg.Database.RQLiteJoinAddress = fmt.Sprintf("%s:%d", primaryBootstrapHost, 7001) + logger.Printf("Secondary bootstrap node - joining primary bootstrap (raft) at: %s", cfg.Database.RQLiteJoinAddress) } else { // Primary bootstrap node doesn't join anyone - it starts the cluster cfg.Database.RQLiteJoinAddress = "" @@ -123,18 +124,28 @@ func main() { } } else { // Configure bootstrap peers for P2P discovery - var rqliteJoinAddr string + var rqliteJoinAddr string // host:port for Raft join if *bootstrap != "" { // Use command line bootstrap if provided cfg.Discovery.BootstrapPeers = []string{*bootstrap} - // Extract IP from bootstrap peer for RQLite join + // Extract IP from bootstrap peer for RQLite bootstrapHost := parseHostFromMultiaddr(*bootstrap) if bootstrapHost != "" { - rqliteJoinAddr = fmt.Sprintf("http://%s:%d", bootstrapHost, *rqlHTTP) - logger.Printf("Using extracted bootstrap host %s for RQLite join", bootstrapHost) + // If user provided localhost for libp2p, translate to this host's primary IP so rqlite can join the correct process. + if bootstrapHost == "127.0.0.1" || strings.EqualFold(bootstrapHost, "localhost") { + if extIP, err := getPreferredLocalIP(); err == nil && extIP != "" { + logger.Printf("Translating localhost bootstrap to external IP %s for RQLite join", extIP) + bootstrapHost = extIP + } else { + logger.Printf("Warning: Failed to resolve external IP, keeping localhost for RQLite join") + } + } + // Regular nodes should join the bootstrap's RQLite Raft port (standardized to 7001) + rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001) + logger.Printf("Using extracted bootstrap host %s for RQLite Raft join (port 7001)", bootstrapHost) } else { logger.Printf("Warning: Could not extract host from bootstrap peer %s, using localhost fallback", *bootstrap) - rqliteJoinAddr = fmt.Sprintf("http://localhost:%d", *rqlHTTP) // Use localhost fallback instead + rqliteJoinAddr = fmt.Sprintf("localhost:%d", 7001) // Use localhost raft fallback instead } logger.Printf("Using command line bootstrap peer: %s", *bootstrap) } else { @@ -145,30 +156,30 @@ func main() { // Use the first bootstrap peer for RQLite join bootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0]) if bootstrapHost != "" { - rqliteJoinAddr = fmt.Sprintf("http://%s:5001", bootstrapHost) - logger.Printf("Using extracted bootstrap host %s for RQLite join", bootstrapHost) + rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001) + logger.Printf("Using extracted bootstrap host %s for RQLite Raft join", bootstrapHost) } else { logger.Printf("Warning: Could not extract host from bootstrap peer %s", bootstrapPeers[0]) // Try primary production server as fallback - rqliteJoinAddr = "http://localhost:5001" + rqliteJoinAddr = "localhost:7001" } - logger.Printf("Using environment bootstrap peers: %v", bootstrapPeers) + logger.Printf("Using environment bootstrap peers: %v", bootstrapPeers) } else { logger.Printf("Warning: No bootstrap peers configured") // Default to localhost when no peers configured - rqliteJoinAddr = "http://localhost:5001" - logger.Printf("Using localhost fallback for RQLite join") + rqliteJoinAddr = "localhost:7001" + logger.Printf("Using localhost fallback for RQLite Raft join") } // Log network connectivity diagnostics logger.Printf("=== NETWORK DIAGNOSTICS ===") - logger.Printf("Target RQLite join address: %s", rqliteJoinAddr) + logger.Printf("Target RQLite Raft join address: %s", rqliteJoinAddr) runNetworkDiagnostics(rqliteJoinAddr, logger) } // Regular nodes join the bootstrap node's RQLite cluster cfg.Database.RQLiteJoinAddress = rqliteJoinAddr - logger.Printf("Regular node - joining RQLite cluster at: %s", cfg.Database.RQLiteJoinAddress) + logger.Printf("Regular node - joining RQLite cluster (raft) at: %s", cfg.Database.RQLiteJoinAddress) } logger.Printf("Data directory: %s", cfg.Node.DataDir) @@ -236,40 +247,86 @@ func isBootstrapNode() bool { return false } -// parseHostFromMultiaddr extracts the host from a multiaddr -func parseHostFromMultiaddr(multiaddr string) string { - // Simple parsing for /ip4/host/tcp/port/p2p/peerid format - parts := strings.Split(multiaddr, "/") - - // Look for ip4/ip6/dns host in the multiaddr - for i, part := range parts { - if (part == "ip4" || part == "ip6" || part == "dns" || part == "dns4" || part == "dns6") && i+1 < len(parts) { - return parts[i+1] - } - } - return "" +// getPreferredLocalIP returns a non-loopback IPv4 address of this machine +func getPreferredLocalIP() (string, error) { + ifaces, err := net.Interfaces() + if err != nil { + return "", err + } + for _, iface := range ifaces { + if (iface.Flags&net.FlagUp) == 0 || (iface.Flags&net.FlagLoopback) != 0 { + continue + } + addrs, err := iface.Addrs() + if err != nil { + continue + } + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + if ip == nil || ip.IsLoopback() { + continue + } + ip = ip.To4() + if ip == nil { + continue + } + return ip.String(), nil + } + } + return "", fmt.Errorf("no non-loopback IPv4 found") } // isLocalIP checks if the given IP address belongs to this machine func isLocalIP(ip string) bool { - // Try to run ip command to get local IPs - if output, err := exec.Command("ip", "addr", "show").Output(); err == nil { - if strings.Contains(string(output), ip) { - return true - } - } - - // Fallback: try hostname -I command - if output, err := exec.Command("hostname", "-I").Output(); err == nil { - ips := strings.Fields(strings.TrimSpace(string(output))) - for _, localIP := range ips { - if localIP == ip { - return true - } - } - } - - return false + if ip == "127.0.0.1" || strings.EqualFold(ip, "localhost") { + return true + } + ifaces, err := net.Interfaces() + if err != nil { + return false + } + for _, iface := range ifaces { + if (iface.Flags&net.FlagUp) == 0 { + continue + } + addrs, err := iface.Addrs() + if err != nil { + continue + } + for _, addr := range addrs { + var a net.IP + switch v := addr.(type) { + case *net.IPNet: + a = v.IP + case *net.IPAddr: + a = v.IP + } + if a != nil && a.String() == ip { + return true + } + } + } + return false +} + +// parseHostFromMultiaddr extracts the host from a multiaddr +func parseHostFromMultiaddr(multiaddr string) string { + // Simple parsing for /ip4/host/tcp/port/p2p/peerid format + parts := strings.Split(multiaddr, "/") + + // Look for ip4/ip6/dns host in the multiaddr + for i, part := range parts { + if (part == "ip4" || part == "ip6" || part == "dns" || part == "dns4" || part == "dns6") && i+1 < len(parts) { + return parts[i+1] + } + } + return "" } func startNode(ctx context.Context, cfg *config.Config, port int, isBootstrap bool, logger *logging.StandardLogger) error { @@ -307,73 +364,74 @@ func startNode(ctx context.Context, cfg *config.Config, port int, isBootstrap bo } // runNetworkDiagnostics performs network connectivity tests -func runNetworkDiagnostics(rqliteJoinAddr string, logger *logging.StandardLogger) { - // Extract host and port from the join address - if !strings.HasPrefix(rqliteJoinAddr, "http://") { - logger.Printf("Invalid join address format: %s", rqliteJoinAddr) - return - } - - // Parse URL to extract host:port - url := strings.TrimPrefix(rqliteJoinAddr, "http://") - parts := strings.Split(url, ":") - if len(parts) != 2 { - logger.Printf("Cannot parse host:port from %s", rqliteJoinAddr) - return - } - - host := parts[0] - port := parts[1] - - logger.Printf("Testing connectivity to %s:%s", host, port) - - // Test 1: Basic connectivity with netcat or telnet - if output, err := exec.Command("timeout", "5", "nc", "-z", "-v", host, port).CombinedOutput(); err == nil { - logger.Printf("✅ Port %s:%s is reachable", host, port) - logger.Printf("netcat output: %s", strings.TrimSpace(string(output))) - } else { - logger.Printf("❌ Port %s:%s is NOT reachable", host, port) - logger.Printf("netcat error: %v", err) - logger.Printf("netcat output: %s", strings.TrimSpace(string(output))) - } - - // Test 2: HTTP connectivity test - if output, err := exec.Command("timeout", "5", "curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", rqliteJoinAddr+"/status").Output(); err == nil { - httpCode := strings.TrimSpace(string(output)) - if httpCode == "200" { - logger.Printf("✅ HTTP service is responding correctly (status: %s)", httpCode) - } else { - logger.Printf("⚠️ HTTP service responded with status: %s", httpCode) - } - } else { - logger.Printf("❌ HTTP request failed: %v", err) - } - - // Test 3: Ping test - if output, err := exec.Command("ping", "-c", "3", "-W", "2", host).Output(); err == nil { - lines := strings.Split(string(output), "\n") - for _, line := range lines { - if strings.Contains(line, "packet loss") { - logger.Printf("🏓 Ping result: %s", strings.TrimSpace(line)) - break - } - } - } else { - logger.Printf("❌ Ping test failed: %v", err) - } - - // Test 4: DNS resolution - if output, err := exec.Command("nslookup", host).Output(); err == nil { - logger.Printf("🔍 DNS resolution successful") - lines := strings.Split(string(output), "\n") - for _, line := range lines { - if strings.Contains(line, "Address:") && !strings.Contains(line, "#53") { - logger.Printf("DNS result: %s", strings.TrimSpace(line)) - } - } - } else { - logger.Printf("❌ DNS resolution failed: %v", err) - } - - logger.Printf("=== END DIAGNOSTICS ===") +func runNetworkDiagnostics(target string, logger *logging.StandardLogger) { + // If target has scheme, treat as HTTP URL. Otherwise treat as host:port raft. + var host, port string + if strings.HasPrefix(target, "http://") || strings.HasPrefix(target, "https://") { + url := strings.TrimPrefix(strings.TrimPrefix(target, "http://"), "https://") + parts := strings.Split(url, ":") + if len(parts) == 2 { + host, port = parts[0], parts[1] + } + } else { + parts := strings.Split(target, ":") + if len(parts) == 2 { + host, port = parts[0], parts[1] + } + } + if host == "" || port == "" { + logger.Printf("Cannot parse host:port from %s", target) + return + } + + logger.Printf("Testing TCP connectivity to %s:%s", host, port) + if output, err := exec.Command("timeout", "5", "nc", "-z", "-v", host, port).CombinedOutput(); err == nil { + logger.Printf("✅ Port %s:%s is reachable", host, port) + logger.Printf("netcat output: %s", strings.TrimSpace(string(output))) + } else { + logger.Printf("❌ Port %s:%s is NOT reachable", host, port) + logger.Printf("netcat error: %v", err) + logger.Printf("netcat output: %s", strings.TrimSpace(string(output))) + } + + // Also probe HTTP status on port 5001 of the same host, which is the default HTTP API + httpURL := fmt.Sprintf("http://%s:%d/status", host, 5001) + if output, err := exec.Command("timeout", "5", "curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", httpURL).Output(); err == nil { + httpCode := strings.TrimSpace(string(output)) + if httpCode == "200" { + logger.Printf("✅ HTTP service on %s is responding correctly (status: %s)", httpURL, httpCode) + } else { + logger.Printf("⚠️ HTTP service on %s responded with status: %s", httpURL, httpCode) + } + } else { + logger.Printf("❌ HTTP request to %s failed: %v", httpURL, err) + } + + // Ping test + if output, err := exec.Command("ping", "-c", "3", "-W", "2", host).Output(); err == nil { + lines := strings.Split(string(output), "\n") + for _, line := range lines { + if strings.Contains(line, "packet loss") { + logger.Printf("🏓 Ping result: %s", strings.TrimSpace(line)) + break + } + } + } else { + logger.Printf("❌ Ping test failed: %v", err) + } + + // DNS resolution + if output, err := exec.Command("nslookup", host).Output(); err == nil { + logger.Printf("🔍 DNS resolution successful") + lines := strings.Split(string(output), "\n") + for _, line := range lines { + if strings.Contains(line, "Address:") && !strings.Contains(line, "#53") { + logger.Printf("DNS result: %s", strings.TrimSpace(line)) + } + } + } else { + logger.Printf("❌ DNS resolution failed: %v", err) + } + + logger.Printf("=== END DIAGNOSTICS ===") } diff --git a/pkg/database/rqlite.go b/pkg/database/rqlite.go index a8e5750..53458ee 100644 --- a/pkg/database/rqlite.go +++ b/pkg/database/rqlite.go @@ -67,23 +67,26 @@ func (r *RQLiteManager) Start(ctx context.Context) error { // Add join address if specified (for non-bootstrap or secondary bootstrap nodes) if r.config.RQLiteJoinAddress != "" { r.logger.Info("Joining RQLite cluster", zap.String("join_address", r.config.RQLiteJoinAddress)) - - // Validate join address format before using it - if strings.HasPrefix(r.config.RQLiteJoinAddress, "http://") { - // Test connectivity and log the results, but always attempt to join - if err := r.testJoinAddress(r.config.RQLiteJoinAddress); err != nil { - r.logger.Warn("Join address connectivity test failed, but will still attempt to join", - zap.String("join_address", r.config.RQLiteJoinAddress), - zap.Error(err)) - } else { - r.logger.Info("Join address is reachable, proceeding with cluster join") - } - // Always add the join parameter - let RQLite handle retries - args = append(args, "-join", r.config.RQLiteJoinAddress) - } else { - r.logger.Warn("Invalid join address format, skipping join", zap.String("address", r.config.RQLiteJoinAddress)) - return fmt.Errorf("invalid RQLite join address format: %s (must start with http://)", r.config.RQLiteJoinAddress) + + // Normalize join address to host:port for rqlited -join + joinArg := r.config.RQLiteJoinAddress + if strings.HasPrefix(joinArg, "http://") { + joinArg = strings.TrimPrefix(joinArg, "http://") + } else if strings.HasPrefix(joinArg, "https://") { + joinArg = strings.TrimPrefix(joinArg, "https://") } + + // Test connectivity (HTTP status) on the leader's HTTP port derived from join host + if err := r.testJoinAddress(joinArg); err != nil { + r.logger.Warn("Join target connectivity test failed, but will still attempt to join", + zap.String("join_address", r.config.RQLiteJoinAddress), + zap.Error(err)) + } else { + r.logger.Info("Join target is reachable, proceeding with cluster join") + } + + // Always add the join parameter in host:port form - let rqlited handle the rest + args = append(args, "-join", joinArg) } else { r.logger.Info("No join address specified - starting as new cluster") } @@ -302,24 +305,34 @@ func (r *RQLiteManager) getExternalIP() (string, error) { // testJoinAddress tests if a join address is reachable func (r *RQLiteManager) testJoinAddress(joinAddress string) error { - // Test connection to the join address with a short timeout + // Determine the HTTP status URL to probe. + // If joinAddress contains a scheme, use it directly. Otherwise treat joinAddress + // as host:port (Raft) and probe the standard HTTP API port 5001 on that host. client := &http.Client{Timeout: 5 * time.Second} - - // Try to connect to the status endpoint - statusURL := joinAddress + "/status" - r.logger.Debug("Testing join address", zap.String("url", statusURL)) - + + var statusURL string + if strings.HasPrefix(joinAddress, "http://") || strings.HasPrefix(joinAddress, "https://") { + statusURL = strings.TrimRight(joinAddress, "/") + "/status" + } else { + // Extract host from host:port + host := joinAddress + if idx := strings.Index(joinAddress, ":"); idx != -1 { + host = joinAddress[:idx] + } + statusURL = fmt.Sprintf("http://%s:%d/status", host, 5001) + } + + r.logger.Debug("Testing join target via HTTP", zap.String("url", statusURL)) resp, err := client.Get(statusURL) if err != nil { - return fmt.Errorf("failed to connect to join address %s: %w", joinAddress, err) + return fmt.Errorf("failed to connect to leader HTTP at %s: %w", statusURL, err) } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("join address %s returned status %d", joinAddress, resp.StatusCode) + return fmt.Errorf("leader HTTP at %s returned status %d", statusURL, resp.StatusCode) } - - r.logger.Info("Join address is reachable", zap.String("address", joinAddress)) + + r.logger.Info("Leader HTTP reachable", zap.String("status_url", statusURL)) return nil }