network/pkg/database/rqlite.go
johnysigma 56f0a01b79 Add RQLite join address connectivity testing
- Test join address reachability before attempting to join cluster
- Fall back to starting new cluster if join address is unreachable
- Add comprehensive logging for join address testing
- Prevent RQLite fatal errors when bootstrap node is down

This fixes the issue where secondary nodes fail with 'invalid join address'
when the primary bootstrap node is not accessible on port 4001.
2025-08-06 13:05:58 +03:00

323 lines
8.6 KiB
Go

package database
import (
"context"
"fmt"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/rqlite/gorqlite"
"go.uber.org/zap"
"git.debros.io/DeBros/network/pkg/config"
)
// RQLiteManager manages an RQLite node instance: it spawns an external
// `rqlited` process and holds a gorqlite client connection to it.
type RQLiteManager struct {
	config     *config.DatabaseConfig // source of RQLitePort, RQLiteRaftPort and RQLiteJoinAddress
	dataDir    string                 // base directory; rqlited's data goes in <dataDir>/rqlite
	logger     *zap.Logger
	cmd        *exec.Cmd            // the rqlited process; nil until Start builds it
	connection *gorqlite.Connection // client connection, set by Start after the node is ready
}
// NewRQLiteManager returns a manager bound to the given database config,
// base data directory and logger. It only records these values; nothing is
// created on disk and no process is started until Start is called.
func NewRQLiteManager(cfg *config.DatabaseConfig, dataDir string, logger *zap.Logger) *RQLiteManager {
	m := new(RQLiteManager)
	m.config = cfg
	m.dataDir = dataDir
	m.logger = logger
	return m
}
// Start launches an rqlited child process for this node and blocks until it
// is usable.
//
// It creates <dataDir>/rqlite and advertises the detected external IP (when
// one is found). If RQLiteJoinAddress is configured, it must start with
// "http://". When that address answers its /status endpoint, it is passed to
// rqlited via -join. When it does not answer, the node starts its own
// cluster instead.
//
// Once the HTTP API reports ready, a gorqlite connection is opened. If the
// node did not actually pass -join (no join address, or an unreachable one),
// Start additionally waits until the node can serve queries. If any step fails
// after the process has been spawned, the process is killed and reaped, any
// opened connection is closed, and the manager is left as if Start never ran.
func (r *RQLiteManager) Start(ctx context.Context) error {
	rqliteDataDir := filepath.Join(r.dataDir, "rqlite")
	if err := os.MkdirAll(rqliteDataDir, 0755); err != nil {
		return fmt.Errorf("failed to create RQLite data directory: %w", err)
	}

	externalIP, err := r.getExternalIP()
	if err != nil {
		r.logger.Warn("Failed to get external IP, using localhost", zap.Error(err))
		externalIP = "localhost"
	}
	r.logger.Info("Using external IP for RQLite advertising", zap.String("ip", externalIP))

	args := []string{
		"-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort),
		"-raft-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLiteRaftPort),
	}
	// Only advertise explicit addresses when a real IP was detected.
	if externalIP != "localhost" {
		args = append(args,
			"-http-adv-addr", fmt.Sprintf("%s:%d", externalIP, r.config.RQLitePort),
			"-raft-adv-addr", fmt.Sprintf("%s:%d", externalIP, r.config.RQLiteRaftPort),
		)
	}

	// joining records whether -join was actually passed. It can differ from
	// "a join address is configured": an unreachable join address falls back
	// to starting a new cluster, and that node must then wait for leadership.
	joining := false
	joinAddr := r.config.RQLiteJoinAddress
	switch {
	case joinAddr == "":
		r.logger.Info("No join address specified - starting as new cluster")
	case !strings.HasPrefix(joinAddr, "http://"):
		return fmt.Errorf("invalid RQLite join address format: %s (must start with http://)", joinAddr)
	default:
		r.logger.Info("Joining RQLite cluster", zap.String("join_address", joinAddr))
		// Probe first so an unreachable bootstrap node does not make rqlited fail.
		if err := r.testJoinAddress(joinAddr); err != nil {
			r.logger.Warn("Join address is not reachable, starting as new cluster instead",
				zap.String("join_address", joinAddr),
				zap.Error(err))
		} else {
			args = append(args, "-join", joinAddr)
			joining = true
		}
	}

	// The data directory is passed as the trailing positional argument.
	args = append(args, rqliteDataDir)

	r.logger.Info("Starting RQLite node",
		zap.String("data_dir", rqliteDataDir),
		zap.Int("http_port", r.config.RQLitePort),
		zap.Int("raft_port", r.config.RQLiteRaftPort),
		zap.String("join_address", joinAddr),
		zap.Bool("joining", joining),
		zap.String("external_ip", externalIP),
		zap.Strings("full_args", args),
	)

	r.cmd = exec.CommandContext(ctx, "rqlited", args...)
	r.cmd.Stdout = os.Stdout
	r.cmd.Stderr = os.Stderr
	if err := r.cmd.Start(); err != nil {
		r.cmd = nil
		return fmt.Errorf("failed to start RQLite: %w", err)
	}

	// abort tears down everything set up after the process was spawned.
	// Wait reaps the killed child so it does not linger as a zombie. Errors
	// are ignored because the process may already have exited (e.g. ctx was
	// cancelled), and Wait reports the kill itself as an error.
	abort := func() {
		if r.connection != nil {
			r.connection.Close()
			r.connection = nil
		}
		_ = r.cmd.Process.Kill()
		_ = r.cmd.Wait()
		r.cmd = nil
	}

	if err := r.waitForReady(ctx); err != nil {
		abort()
		return fmt.Errorf("RQLite failed to become ready: %w", err)
	}

	conn, err := gorqlite.Open(fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
	if err != nil {
		abort()
		return fmt.Errorf("failed to connect to RQLite: %w", err)
	}
	r.connection = conn

	// A node that did not join an existing cluster must elect itself leader
	// before it can serve queries.
	if !joining {
		if err := r.waitForLeadership(ctx); err != nil {
			abort()
			return fmt.Errorf("RQLite failed to establish leadership: %w", err)
		}
	}

	r.logger.Info("RQLite node started successfully")
	return nil
}
// waitForReady polls the local rqlited HTTP /status endpoint until it answers
// 200 OK. It makes up to 30 attempts, one second apart, and each request has a
// 2s timeout.
//
// It returns ctx.Err() promptly when ctx is cancelled, including while it is
// sleeping between attempts. If every attempt fails, it returns a timeout
// error that wraps the last observed failure.
func (r *RQLiteManager) waitForReady(ctx context.Context) error {
	const attempts = 30
	statusURL := fmt.Sprintf("http://localhost:%d/status", r.config.RQLitePort)
	client := &http.Client{Timeout: 2 * time.Second}

	var lastErr error
	for attempt := 1; attempt <= attempts; attempt++ {
		if err := ctx.Err(); err != nil {
			return err
		}

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, statusURL, nil)
		if err != nil {
			return fmt.Errorf("building RQLite readiness request: %w", err)
		}
		resp, err := client.Do(req)
		if err != nil {
			lastErr = err
		} else {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil
			}
			lastErr = fmt.Errorf("status endpoint returned %d", resp.StatusCode)
		}

		// Sleep between attempts, but wake immediately on cancellation.
		if attempt < attempts {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
		}
	}
	return fmt.Errorf("RQLite did not become ready within timeout: %w", lastErr)
}
// waitForLeadership waits until the node can answer a trivial query
// ("SELECT 1") through r.connection. A successful query is used as the
// signal that leadership is established. It makes up to 30 attempts, one
// second apart.
//
// It fails immediately if no connection is open. It returns ctx.Err()
// promptly on cancellation. If every attempt fails, it returns a timeout
// error that wraps the last query error.
func (r *RQLiteManager) waitForLeadership(ctx context.Context) error {
	r.logger.Info("Waiting for RQLite to establish leadership...")
	if r.connection == nil {
		return fmt.Errorf("RQLite connection is not open")
	}

	const attempts = 30
	var lastErr error
	for attempt := 1; attempt <= attempts; attempt++ {
		if err := ctx.Err(); err != nil {
			return err
		}

		_, err := r.connection.QueryOne("SELECT 1")
		if err == nil {
			r.logger.Info("RQLite leadership established")
			return nil
		}
		lastErr = err
		r.logger.Debug("Waiting for leadership", zap.Error(err))

		// Sleep between attempts, but wake immediately on cancellation.
		if attempt < attempts {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Second):
			}
		}
	}
	return fmt.Errorf("RQLite failed to establish leadership within timeout: %w", lastErr)
}
// GetConnection exposes the gorqlite client that Start opens once the node is
// ready. It is nil if Start has not reached that point.
func (r *RQLiteManager) GetConnection() *gorqlite.Connection {
	conn := r.connection
	return conn
}
// Stop closes the gorqlite connection (if any) and terminates the rqlited
// process (if one was started). It then waits for the process to exit so
// that no zombie is left behind.
//
// A process that has already exited, for example because the context passed
// to Start was cancelled, is not an error. Stop clears its references after a
// successful shutdown, so calling it again is a no-op.
func (r *RQLiteManager) Stop() error {
	if r.connection != nil {
		r.connection.Close()
		r.connection = nil
	}

	if r.cmd == nil || r.cmd.Process == nil {
		return nil
	}

	r.logger.Info("Stopping RQLite node")
	if err := r.cmd.Process.Kill(); err != nil && err != os.ErrProcessDone {
		return err
	}
	// Reap the child. Wait reports the kill itself (e.g. "signal: killed"),
	// which is expected here, so its error is ignored.
	_ = r.cmd.Wait()
	r.cmd = nil
	return nil
}
// getExternalIP picks the IPv4 address this node should advertise to peers.
//
// Strategy, in order:
//  1. Run `ip route get 8.8.8.8` (needs the iproute2 `ip` tool) and use the
//     address following "src" in its output. This step is skipped if the
//     command fails or prints no parsable address.
//  2. Scan up, non-loopback interfaces for the first globally routable IPv4
//     address, i.e. one that is neither private nor link-local (169.254/16).
//  3. Otherwise return the first IPv4 address seen during that scan.
//
// It returns an error if the interfaces cannot be listed or no IPv4 address
// is found at all.
func (r *RQLiteManager) getExternalIP() (string, error) {
	// Method 1: ask the kernel which source address it would use for an
	// outbound route.
	if output, err := exec.Command("ip", "route", "get", "8.8.8.8").Output(); err == nil {
		for _, line := range strings.Split(string(output), "\n") {
			fields := strings.Fields(line)
			for i := 0; i+1 < len(fields); i++ {
				if fields[i] == "src" && net.ParseIP(fields[i+1]) != nil {
					r.logger.Debug("Found external IP via ip route", zap.String("ip", fields[i+1]))
					return fields[i+1], nil
				}
			}
		}
	}

	// Methods 2 and 3 in a single pass: return the first routable IPv4
	// immediately and remember the first IPv4 of any kind as a fallback.
	interfaces, err := net.Interfaces()
	if err != nil {
		return "", err
	}

	var fallback net.IP
	for _, iface := range interfaces {
		if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 {
			continue
		}
		addrs, err := iface.Addrs()
		if err != nil {
			continue
		}
		for _, addr := range addrs {
			var ip net.IP
			switch v := addr.(type) {
			case *net.IPNet:
				ip = v.IP
			case *net.IPAddr:
				ip = v.IP
			}
			if ip == nil || ip.IsLoopback() || ip.To4() == nil {
				continue
			}
			// IsPrivate does not cover 169.254/16, so link-local addresses
			// must be excluded explicitly from the "public" tier.
			if !ip.IsPrivate() && !ip.IsLinkLocalUnicast() {
				r.logger.Debug("Found public IP", zap.String("ip", ip.String()))
				return ip.String(), nil
			}
			if fallback == nil {
				fallback = ip
			}
		}
	}

	if fallback != nil {
		r.logger.Debug("Found private IP", zap.String("ip", fallback.String()))
		return fallback.String(), nil
	}
	return "", fmt.Errorf("no suitable IP address found")
}
// testJoinAddress checks that joinAddress answers GET <joinAddress>/status
// with 200 OK within 5 seconds.
//
// It returns an error describing the connection failure or the unexpected
// status code. A trailing slash on joinAddress is tolerated.
func (r *RQLiteManager) testJoinAddress(joinAddress string) error {
	client := &http.Client{Timeout: 5 * time.Second}

	// Trim a trailing slash so "http://host:4001/" does not yield "//status".
	statusURL := strings.TrimSuffix(joinAddress, "/") + "/status"
	r.logger.Debug("Testing join address", zap.String("url", statusURL))

	resp, err := client.Get(statusURL)
	if err != nil {
		return fmt.Errorf("failed to connect to join address %s: %w", joinAddress, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("join address %s returned status %d", joinAddress, resp.StatusCode)
	}

	r.logger.Info("Join address is reachable", zap.String("address", joinAddress))
	return nil
}