fix: standardize rqlite join addresses to use host:port format without http scheme

This commit is contained in:
anonpenguin 2025-08-08 20:30:46 +03:00
parent b744f7f513
commit 05798471dd
3 changed files with 215 additions and 144 deletions

View File

@ -33,14 +33,14 @@ run-node:
run-node2:
@echo "Starting REGULAR node2 (role=node)..."
@if [ -z "$(BOOTSTRAP)" ]; then echo "ERROR: Provide BOOTSTRAP multiaddr: make run-node2 BOOTSTRAP=/ip4/127.0.0.1/tcp/4001/p2p/<ID> [HTTP=5002 RAFT=7002]"; exit 1; fi
go run cmd/node/main.go -role node -id node2 -data ./data/node2 -bootstrap $(BOOTSTRAP) -rqlite-http-port ${HTTP-5002} -rqlite-raft-port ${RAFT-7002}
go run cmd/node/main.go -role node -id node2 -data ./data/node2 -bootstrap $(BOOTSTRAP) -rqlite-http-port $${HTTP:-5002} -rqlite-raft-port $${RAFT:-7002}
# Run third node (regular) - requires BOOTSTRAP multiaddr
# Usage: make run-node3 BOOTSTRAP=/ip4/127.0.0.1/tcp/4001/p2p/<ID> HTTP=5003 RAFT=7003
run-node3:
@echo "Starting REGULAR node3 (role=node)..."
@if [ -z "$(BOOTSTRAP)" ]; then echo "ERROR: Provide BOOTSTRAP multiaddr: make run-node3 BOOTSTRAP=/ip4/127.0.0.1/tcp/4001/p2p/<ID> [HTTP=5003 RAFT=7003]"; exit 1; fi
go run cmd/node/main.go -role node -id node3 -data ./data/node3 -bootstrap $(BOOTSTRAP) -rqlite-http-port ${HTTP-5003} -rqlite-raft-port ${RAFT-7003}
go run cmd/node/main.go -role node -id node3 -data ./data/node3 -bootstrap $(BOOTSTRAP) -rqlite-http-port $${HTTP:-5003} -rqlite-raft-port $${RAFT:-7003}
# Show how to run with flags
show-bootstrap:

View File

@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"net"
"log"
"os"
"os/exec"
@ -112,10 +113,10 @@ func main() {
}
if isSecondaryBootstrap {
// Secondary bootstrap nodes join the primary bootstrap
// Secondary bootstrap nodes join the primary bootstrap Raft address (standardized to 7001)
primaryBootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0])
cfg.Database.RQLiteJoinAddress = fmt.Sprintf("http://%s:%d", primaryBootstrapHost, *rqlHTTP)
logger.Printf("Secondary bootstrap node - joining primary bootstrap at: %s", cfg.Database.RQLiteJoinAddress)
cfg.Database.RQLiteJoinAddress = fmt.Sprintf("%s:%d", primaryBootstrapHost, 7001)
logger.Printf("Secondary bootstrap node - joining primary bootstrap (raft) at: %s", cfg.Database.RQLiteJoinAddress)
} else {
// Primary bootstrap node doesn't join anyone - it starts the cluster
cfg.Database.RQLiteJoinAddress = ""
@ -123,18 +124,28 @@ func main() {
}
} else {
// Configure bootstrap peers for P2P discovery
var rqliteJoinAddr string
var rqliteJoinAddr string // host:port for Raft join
if *bootstrap != "" {
// Use command line bootstrap if provided
cfg.Discovery.BootstrapPeers = []string{*bootstrap}
// Extract IP from bootstrap peer for RQLite join
// Extract IP from bootstrap peer for RQLite
bootstrapHost := parseHostFromMultiaddr(*bootstrap)
if bootstrapHost != "" {
rqliteJoinAddr = fmt.Sprintf("http://%s:%d", bootstrapHost, *rqlHTTP)
logger.Printf("Using extracted bootstrap host %s for RQLite join", bootstrapHost)
// If user provided localhost for libp2p, translate to this host's primary IP so rqlite can join the correct process.
if bootstrapHost == "127.0.0.1" || strings.EqualFold(bootstrapHost, "localhost") {
if extIP, err := getPreferredLocalIP(); err == nil && extIP != "" {
logger.Printf("Translating localhost bootstrap to external IP %s for RQLite join", extIP)
bootstrapHost = extIP
} else {
logger.Printf("Warning: Failed to resolve external IP, keeping localhost for RQLite join")
}
}
// Regular nodes should join the bootstrap's RQLite Raft port (standardized to 7001)
rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001)
logger.Printf("Using extracted bootstrap host %s for RQLite Raft join (port 7001)", bootstrapHost)
} else {
logger.Printf("Warning: Could not extract host from bootstrap peer %s, using localhost fallback", *bootstrap)
rqliteJoinAddr = fmt.Sprintf("http://localhost:%d", *rqlHTTP) // Use localhost fallback instead
rqliteJoinAddr = fmt.Sprintf("localhost:%d", 7001) // Use localhost raft fallback instead
}
logger.Printf("Using command line bootstrap peer: %s", *bootstrap)
} else {
@ -145,30 +156,30 @@ func main() {
// Use the first bootstrap peer for RQLite join
bootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0])
if bootstrapHost != "" {
rqliteJoinAddr = fmt.Sprintf("http://%s:5001", bootstrapHost)
logger.Printf("Using extracted bootstrap host %s for RQLite join", bootstrapHost)
rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001)
logger.Printf("Using extracted bootstrap host %s for RQLite Raft join", bootstrapHost)
} else {
logger.Printf("Warning: Could not extract host from bootstrap peer %s", bootstrapPeers[0])
// No host could be extracted from the peer address; fall back to localhost
rqliteJoinAddr = "http://localhost:5001"
rqliteJoinAddr = "localhost:7001"
}
logger.Printf("Using environment bootstrap peers: %v", bootstrapPeers)
logger.Printf("Using environment bootstrap peers: %v", bootstrapPeers)
} else {
logger.Printf("Warning: No bootstrap peers configured")
// Default to localhost when no peers configured
rqliteJoinAddr = "http://localhost:5001"
logger.Printf("Using localhost fallback for RQLite join")
rqliteJoinAddr = "localhost:7001"
logger.Printf("Using localhost fallback for RQLite Raft join")
}
// Log network connectivity diagnostics
logger.Printf("=== NETWORK DIAGNOSTICS ===")
logger.Printf("Target RQLite join address: %s", rqliteJoinAddr)
logger.Printf("Target RQLite Raft join address: %s", rqliteJoinAddr)
runNetworkDiagnostics(rqliteJoinAddr, logger)
}
// Regular nodes join the bootstrap node's RQLite cluster
cfg.Database.RQLiteJoinAddress = rqliteJoinAddr
logger.Printf("Regular node - joining RQLite cluster at: %s", cfg.Database.RQLiteJoinAddress)
logger.Printf("Regular node - joining RQLite cluster (raft) at: %s", cfg.Database.RQLiteJoinAddress)
}
logger.Printf("Data directory: %s", cfg.Node.DataDir)
@ -236,40 +247,86 @@ func isBootstrapNode() bool {
return false
}
// parseHostFromMultiaddr extracts the host from a multiaddr
func parseHostFromMultiaddr(multiaddr string) string {
// Simple parsing for /ip4/host/tcp/port/p2p/peerid format
parts := strings.Split(multiaddr, "/")
// Look for ip4/ip6/dns host in the multiaddr
for i, part := range parts {
if (part == "ip4" || part == "ip6" || part == "dns" || part == "dns4" || part == "dns6") && i+1 < len(parts) {
return parts[i+1]
}
}
return ""
// getPreferredLocalIP walks the machine's network interfaces and returns the
// first IPv4 address it finds on an interface that is up and not a loopback
// device. Interfaces are visited in the order net.Interfaces reports them.
//
// It returns the error from net.Interfaces if enumeration fails, and a
// descriptive error when no interface carries a usable IPv4 address.
// Interfaces whose address list cannot be read are skipped rather than
// treated as fatal.
func getPreferredLocalIP() (string, error) {
	interfaces, err := net.Interfaces()
	if err != nil {
		return "", err
	}

	for _, nic := range interfaces {
		isUp := nic.Flags&net.FlagUp != 0
		isLoopback := nic.Flags&net.FlagLoopback != 0
		if !isUp || isLoopback {
			continue
		}

		nicAddrs, addrErr := nic.Addrs()
		if addrErr != nil {
			continue
		}

		for _, candidate := range nicAddrs {
			var raw net.IP
			if ipNet, ok := candidate.(*net.IPNet); ok {
				raw = ipNet.IP
			} else if ipAddr, ok := candidate.(*net.IPAddr); ok {
				raw = ipAddr.IP
			}

			if raw == nil || raw.IsLoopback() {
				continue
			}
			// To4 yields nil for addresses that have no IPv4 form.
			if v4 := raw.To4(); v4 != nil {
				return v4.String(), nil
			}
		}
	}

	return "", fmt.Errorf("no non-loopback IPv4 found")
}
// isLocalIP reports whether ip names an address that belongs to this machine.
//
// "127.0.0.1" and "localhost" (in any letter case) are accepted immediately.
// Any other input must parse as an IP literal and be equal to an address
// assigned to a network interface that is currently up.
//
// It returns false for empty or unparsable input, and when the interface
// list cannot be read. Interfaces whose addresses cannot be listed are
// skipped.
func isLocalIP(ip string) bool {
	if ip == "127.0.0.1" || strings.EqualFold(ip, "localhost") {
		return true
	}

	// Parse up front so that empty strings and partial matches can never be
	// mistaken for a local address, and so that the comparison below is made
	// on address values rather than on their textual spelling.
	want := net.ParseIP(ip)
	if want == nil {
		return false
	}

	ifaces, err := net.Interfaces()
	if err != nil {
		return false
	}
	for _, iface := range ifaces {
		if iface.Flags&net.FlagUp == 0 {
			continue
		}
		addrs, err := iface.Addrs()
		if err != nil {
			continue
		}
		for _, addr := range addrs {
			var got net.IP
			switch v := addr.(type) {
			case *net.IPNet:
				got = v.IP
			case *net.IPAddr:
				got = v.IP
			}
			if got != nil && got.Equal(want) {
				return true
			}
		}
	}
	return false
}
// Fallback: try hostname -I command
if output, err := exec.Command("hostname", "-I").Output(); err == nil {
ips := strings.Fields(strings.TrimSpace(string(output)))
for _, localIP := range ips {
if localIP == ip {
return true
}
}
}
// parseHostFromMultiaddr extracts the host component from a textual
// multiaddr such as /ip4/203.0.113.5/tcp/4001/p2p/<peer-id>.
//
// The address is split on "/" and the segment following the first ip4, ip6,
// dns, dns4 or dns6 protocol label is returned. It returns "" when no such
// label is present or when the label is the last segment.
func parseHostFromMultiaddr(multiaddr string) string {
	parts := strings.Split(multiaddr, "/")
	for i, part := range parts {
		switch part {
		case "ip4", "ip6", "dns", "dns4", "dns6":
			// The value belonging to a protocol label is the next segment.
			if i+1 < len(parts) {
				return parts[i+1]
			}
		}
	}
	return ""
}
func startNode(ctx context.Context, cfg *config.Config, port int, isBootstrap bool, logger *logging.StandardLogger) error {
@ -307,73 +364,74 @@ func startNode(ctx context.Context, cfg *config.Config, port int, isBootstrap bo
}
// runNetworkDiagnostics performs network connectivity tests
func runNetworkDiagnostics(rqliteJoinAddr string, logger *logging.StandardLogger) {
// Extract host and port from the join address
if !strings.HasPrefix(rqliteJoinAddr, "http://") {
logger.Printf("Invalid join address format: %s", rqliteJoinAddr)
return
}
func runNetworkDiagnostics(target string, logger *logging.StandardLogger) {
// If target has scheme, treat as HTTP URL. Otherwise treat as host:port raft.
var host, port string
if strings.HasPrefix(target, "http://") || strings.HasPrefix(target, "https://") {
url := strings.TrimPrefix(strings.TrimPrefix(target, "http://"), "https://")
parts := strings.Split(url, ":")
if len(parts) == 2 {
host, port = parts[0], parts[1]
}
} else {
parts := strings.Split(target, ":")
if len(parts) == 2 {
host, port = parts[0], parts[1]
}
}
if host == "" || port == "" {
logger.Printf("Cannot parse host:port from %s", target)
return
}
// Parse URL to extract host:port
url := strings.TrimPrefix(rqliteJoinAddr, "http://")
parts := strings.Split(url, ":")
if len(parts) != 2 {
logger.Printf("Cannot parse host:port from %s", rqliteJoinAddr)
return
}
logger.Printf("Testing TCP connectivity to %s:%s", host, port)
if output, err := exec.Command("timeout", "5", "nc", "-z", "-v", host, port).CombinedOutput(); err == nil {
logger.Printf("✅ Port %s:%s is reachable", host, port)
logger.Printf("netcat output: %s", strings.TrimSpace(string(output)))
} else {
logger.Printf("❌ Port %s:%s is NOT reachable", host, port)
logger.Printf("netcat error: %v", err)
logger.Printf("netcat output: %s", strings.TrimSpace(string(output)))
}
host := parts[0]
port := parts[1]
// Also probe HTTP status on port 5001 of the same host, which is the default HTTP API
httpURL := fmt.Sprintf("http://%s:%d/status", host, 5001)
if output, err := exec.Command("timeout", "5", "curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", httpURL).Output(); err == nil {
httpCode := strings.TrimSpace(string(output))
if httpCode == "200" {
logger.Printf("✅ HTTP service on %s is responding correctly (status: %s)", httpURL, httpCode)
} else {
logger.Printf("⚠️ HTTP service on %s responded with status: %s", httpURL, httpCode)
}
} else {
logger.Printf("❌ HTTP request to %s failed: %v", httpURL, err)
}
logger.Printf("Testing connectivity to %s:%s", host, port)
// Ping test
if output, err := exec.Command("ping", "-c", "3", "-W", "2", host).Output(); err == nil {
lines := strings.Split(string(output), "\n")
for _, line := range lines {
if strings.Contains(line, "packet loss") {
logger.Printf("🏓 Ping result: %s", strings.TrimSpace(line))
break
}
}
} else {
logger.Printf("❌ Ping test failed: %v", err)
}
// Test 1: Basic connectivity with netcat or telnet
if output, err := exec.Command("timeout", "5", "nc", "-z", "-v", host, port).CombinedOutput(); err == nil {
logger.Printf("✅ Port %s:%s is reachable", host, port)
logger.Printf("netcat output: %s", strings.TrimSpace(string(output)))
} else {
logger.Printf("❌ Port %s:%s is NOT reachable", host, port)
logger.Printf("netcat error: %v", err)
logger.Printf("netcat output: %s", strings.TrimSpace(string(output)))
}
// DNS resolution
if output, err := exec.Command("nslookup", host).Output(); err == nil {
logger.Printf("🔍 DNS resolution successful")
lines := strings.Split(string(output), "\n")
for _, line := range lines {
if strings.Contains(line, "Address:") && !strings.Contains(line, "#53") {
logger.Printf("DNS result: %s", strings.TrimSpace(line))
}
}
} else {
logger.Printf("❌ DNS resolution failed: %v", err)
}
// Test 2: HTTP connectivity test
if output, err := exec.Command("timeout", "5", "curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", rqliteJoinAddr+"/status").Output(); err == nil {
httpCode := strings.TrimSpace(string(output))
if httpCode == "200" {
logger.Printf("✅ HTTP service is responding correctly (status: %s)", httpCode)
} else {
logger.Printf("⚠️ HTTP service responded with status: %s", httpCode)
}
} else {
logger.Printf("❌ HTTP request failed: %v", err)
}
// Test 3: Ping test
if output, err := exec.Command("ping", "-c", "3", "-W", "2", host).Output(); err == nil {
lines := strings.Split(string(output), "\n")
for _, line := range lines {
if strings.Contains(line, "packet loss") {
logger.Printf("🏓 Ping result: %s", strings.TrimSpace(line))
break
}
}
} else {
logger.Printf("❌ Ping test failed: %v", err)
}
// Test 4: DNS resolution
if output, err := exec.Command("nslookup", host).Output(); err == nil {
logger.Printf("🔍 DNS resolution successful")
lines := strings.Split(string(output), "\n")
for _, line := range lines {
if strings.Contains(line, "Address:") && !strings.Contains(line, "#53") {
logger.Printf("DNS result: %s", strings.TrimSpace(line))
}
}
} else {
logger.Printf("❌ DNS resolution failed: %v", err)
}
logger.Printf("=== END DIAGNOSTICS ===")
logger.Printf("=== END DIAGNOSTICS ===")
}

View File

@ -68,22 +68,25 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
if r.config.RQLiteJoinAddress != "" {
r.logger.Info("Joining RQLite cluster", zap.String("join_address", r.config.RQLiteJoinAddress))
// Validate join address format before using it
if strings.HasPrefix(r.config.RQLiteJoinAddress, "http://") {
// Test connectivity and log the results, but always attempt to join
if err := r.testJoinAddress(r.config.RQLiteJoinAddress); err != nil {
r.logger.Warn("Join address connectivity test failed, but will still attempt to join",
zap.String("join_address", r.config.RQLiteJoinAddress),
zap.Error(err))
} else {
r.logger.Info("Join address is reachable, proceeding with cluster join")
}
// Always add the join parameter - let RQLite handle retries
args = append(args, "-join", r.config.RQLiteJoinAddress)
} else {
r.logger.Warn("Invalid join address format, skipping join", zap.String("address", r.config.RQLiteJoinAddress))
return fmt.Errorf("invalid RQLite join address format: %s (must start with http://)", r.config.RQLiteJoinAddress)
// Normalize join address to host:port for rqlited -join
joinArg := r.config.RQLiteJoinAddress
if strings.HasPrefix(joinArg, "http://") {
joinArg = strings.TrimPrefix(joinArg, "http://")
} else if strings.HasPrefix(joinArg, "https://") {
joinArg = strings.TrimPrefix(joinArg, "https://")
}
// Test connectivity (HTTP status) on the leader's HTTP port derived from join host
if err := r.testJoinAddress(joinArg); err != nil {
r.logger.Warn("Join target connectivity test failed, but will still attempt to join",
zap.String("join_address", r.config.RQLiteJoinAddress),
zap.Error(err))
} else {
r.logger.Info("Join target is reachable, proceeding with cluster join")
}
// Always add the join parameter in host:port form - let rqlited handle the rest
args = append(args, "-join", joinArg)
} else {
r.logger.Info("No join address specified - starting as new cluster")
}
@ -302,24 +305,34 @@ func (r *RQLiteManager) getExternalIP() (string, error) {
// testJoinAddress tests if a join address is reachable
func (r *RQLiteManager) testJoinAddress(joinAddress string) error {
// Test connection to the join address with a short timeout
// Determine the HTTP status URL to probe.
// If joinAddress contains a scheme, use it directly. Otherwise treat joinAddress
// as host:port (Raft) and probe the standard HTTP API port 5001 on that host.
client := &http.Client{Timeout: 5 * time.Second}
// Try to connect to the status endpoint
statusURL := joinAddress + "/status"
r.logger.Debug("Testing join address", zap.String("url", statusURL))
var statusURL string
if strings.HasPrefix(joinAddress, "http://") || strings.HasPrefix(joinAddress, "https://") {
statusURL = strings.TrimRight(joinAddress, "/") + "/status"
} else {
// Extract host from host:port
host := joinAddress
if idx := strings.Index(joinAddress, ":"); idx != -1 {
host = joinAddress[:idx]
}
statusURL = fmt.Sprintf("http://%s:%d/status", host, 5001)
}
r.logger.Debug("Testing join target via HTTP", zap.String("url", statusURL))
resp, err := client.Get(statusURL)
if err != nil {
return fmt.Errorf("failed to connect to join address %s: %w", joinAddress, err)
return fmt.Errorf("failed to connect to leader HTTP at %s: %w", statusURL, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("join address %s returned status %d", joinAddress, resp.StatusCode)
return fmt.Errorf("leader HTTP at %s returned status %d", statusURL, resp.StatusCode)
}
r.logger.Info("Join address is reachable", zap.String("address", joinAddress))
r.logger.Info("Leader HTTP reachable", zap.String("status_url", statusURL))
return nil
}