From 56dc6892de82789be3af5b75ac8b77b28297e5db Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 07:34:27 +0200 Subject: [PATCH 01/13] chore: update version and enhance database connection configuration - Bumped version from 0.52.15 to 0.52.16 in the Makefile. - Added connection pool configuration with timeouts and limits in the RQLite adapter and gateway, improving database connection management. --- Makefile | 2 +- pkg/gateway/gateway.go | 9 +++++++++ pkg/rqlite/adapter.go | 7 +++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index fc86ede..ac54302 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.52.15 +VERSION := 0.52.16 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index ef03844..6057a34 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -108,13 +108,22 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { if dbErr != nil { logger.ComponentWarn(logging.ComponentGeneral, "failed to open rqlite sql db; http orm gateway disabled", zap.Error(dbErr)) } else { + // Configure connection pool with proper timeouts and limits + db.SetMaxOpenConns(25) // Maximum number of open connections + db.SetMaxIdleConns(5) // Maximum number of idle connections + db.SetConnMaxLifetime(5 * time.Minute) // Maximum lifetime of a connection + db.SetConnMaxIdleTime(2 * time.Minute) // Maximum idle time before closing + gw.sqlDB = db orm := rqlite.NewClient(db) gw.ormClient = orm gw.ormHTTP = rqlite.NewHTTPGateway(orm, "/v1/db") + // Set a reasonable timeout for HTTP requests (30 seconds) + gw.ormHTTP.Timeout = 30 * time.Second logger.ComponentInfo(logging.ComponentGeneral, "RQLite ORM HTTP gateway ready", zap.String("dsn", dsn), zap.String("base_path", "/v1/db"), + zap.Duration("timeout", gw.ormHTTP.Timeout), ) } diff --git a/pkg/rqlite/adapter.go b/pkg/rqlite/adapter.go index 81205bf..ec456d3 100644 --- a/pkg/rqlite/adapter.go +++ b/pkg/rqlite/adapter.go @@ -3,6 +3,7 @@ package rqlite import ( "database/sql" "fmt" + "time" _ "github.com/rqlite/gorqlite/stdlib" // Import the database/sql driver ) @@ -21,6 +22,12 @@ func NewRQLiteAdapter(manager *RQLiteManager) (*RQLiteAdapter, error) { return nil, fmt.Errorf("failed to open RQLite SQL connection: %w", err) } + // Configure connection pool with proper timeouts and limits + db.SetMaxOpenConns(25) // Maximum number of open connections + db.SetMaxIdleConns(5) // Maximum number of idle connections + db.SetConnMaxLifetime(5 * time.Minute) // Maximum lifetime of a connection + db.SetConnMaxIdleTime(2 * time.Minute) // Maximum idle time before closing + return &RQLiteAdapter{ manager: manager, db: db, From d672f01b30cd8cc11bb03763b20486abc39e0250 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 07:36:06 +0200 Subject: [PATCH 02/13] chore: bump version to 0.52.17 in the Makefile --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index ac54302..6eb64e7 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.52.16 +VERSION := 0.52.17 COMMIT ?= 
$(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' From 45dde8917549e7f801b9bc597e5eac82dfd150e3 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 07:38:30 +0200 Subject: [PATCH 03/13] feat: add branch selection prompt in setup process - Introduced a new promptBranch function to allow users to select between 'main' and 'nightly' branches during the setup. - Updated cloneAndBuild function to use the selected branch for cloning and pulling updates, enhancing flexibility in repository management. - Implemented logic to switch branches if the current branch differs from the selected one, ensuring the correct branch is used. --- Makefile | 2 +- pkg/cli/setup.go | 39 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 6eb64e7..6e26c2a 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.52.17 +VERSION := 0.52.18 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/cli/setup.go b/pkg/cli/setup.go index 213c537..9b59ab1 100644 --- a/pkg/cli/setup.go +++ b/pkg/cli/setup.go @@ -166,6 +166,19 @@ func promptYesNo() bool { return response == "yes" || response == "y" } +func promptBranch() string { + reader := bufio.NewReader(os.Stdin) + fmt.Printf(" Select branch (main/nightly) [default: main]: ") + response, _ := reader.ReadString('\n') + response = strings.ToLower(strings.TrimSpace(response)) + + if response == "nightly" { + return "nightly" + } + // Default to main for anything else (including empty) + return "main" +} + // isValidMultiaddr validates bootstrap peer multiaddr format func isValidMultiaddr(s string) bool { s = strings.TrimSpace(s) @@ -616,16 +629,38 @@ func setupDirectories() { func cloneAndBuild() { fmt.Printf("🔨 Cloning and building DeBros Network...\n") + // Prompt for branch selection + branch := promptBranch() + fmt.Printf(" Using branch: %s\n", branch) + // Check if already cloned if _, err := os.Stat("/home/debros/src/.git"); err == nil { fmt.Printf(" Updating repository...\n") - cmd := exec.Command("sudo", "-u", "debros", "git", "-C", "/home/debros/src", "pull", "origin", "nightly") + + // Check current branch and switch if needed + currentBranchCmd := exec.Command("sudo", "-u", "debros", "git", "-C", "/home/debros/src", "rev-parse", "--abbrev-ref", "HEAD") + if output, err := currentBranchCmd.Output(); err == nil { + currentBranch := strings.TrimSpace(string(output)) + if currentBranch != branch { + fmt.Printf(" Switching from %s to %s...\n", currentBranch, branch) + // Fetch the target branch first (needed for shallow clones) + exec.Command("sudo", "-u", "debros", "git", "-C", "/home/debros/src", "fetch", "origin", branch).Run() + // Checkout the selected branch + checkoutCmd := exec.Command("sudo", "-u", "debros", "git", "-C", "/home/debros/src", "checkout", branch) + if err := checkoutCmd.Run(); err != nil { + fmt.Fprintf(os.Stderr, "⚠️ Failed to switch branch: %v\n", err) + } + } + } + + // Pull latest changes + cmd := exec.Command("sudo", "-u", "debros", "git", "-C", "/home/debros/src", "pull", "origin", branch) if err := cmd.Run(); err != 
nil { fmt.Fprintf(os.Stderr, "⚠️ Failed to update repo: %v\n", err) } } else { fmt.Printf(" Cloning repository...\n") - cmd := exec.Command("sudo", "-u", "debros", "git", "clone", "--branch", "nightly", "--depth", "1", "https://github.com/DeBrosOfficial/network.git", "/home/debros/src") + cmd := exec.Command("sudo", "-u", "debros", "git", "clone", "--branch", branch, "--depth", "1", "https://github.com/DeBrosOfficial/network.git", "/home/debros/src") if err := cmd.Run(); err != nil { fmt.Fprintf(os.Stderr, "❌ Failed to clone repo: %v\n", err) os.Exit(1) From ede253afae4d7790d7a5b2c143a5eabf80fb47b6 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 08:05:59 +0200 Subject: [PATCH 04/13] feat: enhance Anon installation process with Ubuntu version check and improved error handling - Added a check for unsupported Ubuntu version 25.04, providing informative messages if installation is skipped. - Updated the installation method to use the official script from GitHub, ensuring a cleaner installation process. - Implemented cleanup of old APT repository files and GPG keys before installation. - Enhanced error handling for enabling and starting the Anon service, including output of command errors for better debugging. --- Makefile | 2 +- pkg/cli/setup.go | 83 ++++++++++++++++++++++++++++++++---------------- 2 files changed, 56 insertions(+), 29 deletions(-) diff --git a/Makefile b/Makefile index 6e26c2a..fc01177 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.52.18 +VERSION := 0.52.19 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/cli/setup.go b/pkg/cli/setup.go index 9b59ab1..1e37697 100644 --- a/pkg/cli/setup.go +++ b/pkg/cli/setup.go @@ -409,43 +409,53 @@ func installAnon() { return } - // Install via APT (official method from docs.anyone.io) - fmt.Printf(" Adding Anyone APT repository...\n") - - // Add GPG key - cmd := exec.Command("sh", "-c", "curl -fsSL https://deb.anyone.io/gpg.key | gpg --dearmor -o /usr/share/keyrings/anyone-archive-keyring.gpg") - if err := cmd.Run(); err != nil { - fmt.Fprintf(os.Stderr, "⚠️ Failed to add Anyone GPG key: %v\n", err) - fmt.Fprintf(os.Stderr, " You can manually install with:\n") - fmt.Fprintf(os.Stderr, " curl -fsSL https://deb.anyone.io/gpg.key | sudo gpg --dearmor -o /usr/share/keyrings/anyone-archive-keyring.gpg\n") - fmt.Fprintf(os.Stderr, " echo 'deb [signed-by=/usr/share/keyrings/anyone-archive-keyring.gpg] https://deb.anyone.io/ anyone main' | sudo tee /etc/apt/sources.list.d/anyone.list\n") - fmt.Fprintf(os.Stderr, " sudo apt update && sudo apt install -y anon\n") + // Check Ubuntu version - Ubuntu 25.04 is not yet supported by Anon repository + osInfo := detectLinuxDistro() + if strings.Contains(strings.ToLower(osInfo), "ubuntu 25.04") || strings.Contains(strings.ToLower(osInfo), "plucky") { + fmt.Fprintf(os.Stderr, "⚠️ Ubuntu 25.04 (Plucky) is not yet supported by Anon repository\n") + fmt.Fprintf(os.Stderr, " Anon installation will be skipped. 
The gateway will work without it,\n") + fmt.Fprintf(os.Stderr, " but anonymous proxy functionality will not be available.\n") + fmt.Fprintf(os.Stderr, " You can manually install Anon later when support is added:\n") + fmt.Fprintf(os.Stderr, " sudo /bin/bash -c \"$(curl -fsSL https://raw.githubusercontent.com/anyone-protocol/anon-install/refs/heads/main/install.sh)\"\n\n") return } - // Add repository - repoLine := "deb [signed-by=/usr/share/keyrings/anyone-archive-keyring.gpg] https://deb.anyone.io/ anyone main" - if err := os.WriteFile("/etc/apt/sources.list.d/anyone.list", []byte(repoLine+"\n"), 0644); err != nil { - fmt.Fprintf(os.Stderr, "⚠️ Failed to add Anyone repository: %v\n", err) - return + // Install via official installation script (from GitHub) + fmt.Printf(" Installing Anon using official installation script...\n") + fmt.Printf(" Note: The installation script may prompt for configuration\n") + + // Clean up any old APT repository files from previous installation attempts + gpgKeyPath := "/usr/share/keyrings/anyone-archive-keyring.gpg" + repoPath := "/etc/apt/sources.list.d/anyone.list" + if _, err := os.Stat(gpgKeyPath); err == nil { + fmt.Printf(" Removing old GPG key file...\n") + os.Remove(gpgKeyPath) + } + if _, err := os.Stat(repoPath); err == nil { + fmt.Printf(" Removing old repository file...\n") + os.Remove(repoPath) } - // Update package list - fmt.Printf(" Updating package list...\n") - exec.Command("apt", "update", "-qq").Run() + // Use the official installation script from GitHub + installScriptURL := "https://raw.githubusercontent.com/anyone-protocol/anon-install/refs/heads/main/install.sh" + cmd := exec.Command("sh", "-c", fmt.Sprintf("curl -fsSL %s | bash", installScriptURL)) + cmd.Stdin = os.Stdin // Allow interactive prompts if needed + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr - // Install anon - fmt.Printf(" Installing Anon package...\n") - cmd = exec.Command("apt", "install", "-y", "anon") if err := cmd.Run(); err != nil { fmt.Fprintf(os.Stderr, "⚠️ Anon installation failed: %v\n", err) - return + fmt.Fprintf(os.Stderr, " The gateway will work without Anon, but anonymous proxy functionality will not be available.\n") + fmt.Fprintf(os.Stderr, " You can manually install Anon later:\n") + fmt.Fprintf(os.Stderr, " sudo /bin/bash -c \"$(curl -fsSL https://raw.githubusercontent.com/anyone-protocol/anon-install/refs/heads/main/install.sh)\"\n") + return // Continue setup without Anon } // Verify installation if _, err := exec.LookPath("anon"); err != nil { - fmt.Fprintf(os.Stderr, "⚠️ Anon installation may have failed\n") - return + fmt.Fprintf(os.Stderr, "⚠️ Anon installation verification failed: binary not found in PATH\n") + fmt.Fprintf(os.Stderr, " Continuing setup without Anon...\n") + return // Continue setup without Anon } fmt.Printf(" ✓ Anon installed\n") @@ -461,11 +471,28 @@ func installAnon() { // Enable and start service fmt.Printf(" Enabling Anon service...\n") - exec.Command("systemctl", "enable", "anon").Run() - exec.Command("systemctl", "start", "anon").Run() + enableCmd := exec.Command("systemctl", "enable", "anon") + if output, err := enableCmd.CombinedOutput(); err != nil { + fmt.Fprintf(os.Stderr, "⚠️ Failed to enable Anon service: %v\n", err) + if len(output) > 0 { + fmt.Fprintf(os.Stderr, " Output: %s\n", string(output)) + } + } + startCmd := exec.Command("systemctl", "start", "anon") + if output, err := startCmd.CombinedOutput(); err != nil { + fmt.Fprintf(os.Stderr, "⚠️ Failed to start Anon service: %v\n", err) + if len(output) > 0 
{ + fmt.Fprintf(os.Stderr, " Output: %s\n", string(output)) + } + fmt.Fprintf(os.Stderr, " Check service status: systemctl status anon\n") + } else { + fmt.Printf(" ✓ Anon service started\n") + } + + // Verify service is running if exec.Command("systemctl", "is-active", "--quiet", "anon").Run() == nil { - fmt.Printf(" ✓ Anon service is running\n") + fmt.Printf(" ✓ Anon service is active\n") } else { fmt.Fprintf(os.Stderr, "⚠️ Anon service may not be running. Check: systemctl status anon\n") } From 472b7c10bb186601d8ad1a00a89ba5c4bece150f Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 08:24:32 +0200 Subject: [PATCH 05/13] feat: automate terms acceptance in Anon configuration and installation - Added automatic acceptance of terms in the Anon configuration file to streamline the setup process. - Updated the installation script to pre-accept terms, eliminating interactive prompts during installation. - Enhanced output messages to reflect the auto-accepted terms for better user awareness. --- pkg/cli/setup.go | 5 +++++ scripts/install-debros-network.sh | 10 ++++++++++ 2 files changed, 15 insertions(+) diff --git a/pkg/cli/setup.go b/pkg/cli/setup.go index 1e37697..be554c5 100644 --- a/pkg/cli/setup.go +++ b/pkg/cli/setup.go @@ -528,6 +528,10 @@ func configureAnonDefaults() { if !strings.Contains(config, "SocksPort") { config += "SocksPort 9050\n" } + // Auto-accept terms to avoid interactive prompts + if !strings.Contains(config, "AgreeToTerms") { + config += "AgreeToTerms 1\n" + } // Write back os.WriteFile(anonrcPath, []byte(config), 0644) @@ -536,6 +540,7 @@ func configureAnonDefaults() { fmt.Printf(" ORPort: 9001 (default)\n") fmt.Printf(" ControlPort: 9051\n") fmt.Printf(" SOCKSPort: 9050\n") + fmt.Printf(" AgreeToTerms: 1 (auto-accepted)\n") } } diff --git a/scripts/install-debros-network.sh b/scripts/install-debros-network.sh index 31d86ae..235ee76 100755 --- a/scripts/install-debros-network.sh +++ b/scripts/install-debros-network.sh @@ -223,6 +223,10 @@ install_anon() { # Add repository echo "deb [signed-by=/usr/share/keyrings/anyone-archive-keyring.gpg] https://deb.anyone.io/ anyone main" | sudo tee /etc/apt/sources.list.d/anyone.list >/dev/null + # Preseed terms acceptance to avoid interactive prompt + log "Pre-accepting Anon terms and conditions..." + echo "anon anon/terms boolean true" | sudo debconf-set-selections + # Update and install log "Installing Anon package..." sudo apt update -qq @@ -285,10 +289,16 @@ configure_anon_defaults() { echo "SocksPort 9050" | sudo tee -a /etc/anon/anonrc >/dev/null fi + # Auto-accept terms in config file + if ! grep -q "^AgreeToTerms" /etc/anon/anonrc; then + echo "AgreeToTerms 1" | sudo tee -a /etc/anon/anonrc >/dev/null + fi + log " Nickname: ${HOSTNAME}" log " ORPort: 9001 (default)" log " ControlPort: 9051" log " SOCKSPort: 9050" + log " AgreeToTerms: 1 (auto-accepted)" fi } From 8538e2eb3f703276d80c5f9f41af8f1b8af6b947 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 08:36:59 +0200 Subject: [PATCH 06/13] feat: streamline Anon installation with automated terms acceptance - Implemented pre-acceptance of Anon terms in both the setup and installation scripts to eliminate interactive prompts. - Enhanced the installation process by creating necessary directories and files to ensure terms are accepted before installation. - Updated the installation command to use a non-interactive frontend, improving reliability during package installation. 
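
In essence, the non-interactive path added by this patch is: answer the debconf terms question up front, pre-create /etc/anon/anonrc with AgreeToTerms, then install with DEBIAN_FRONTEND=noninteractive. A minimal Go sketch of that sequence follows; it reuses the debconf question name and file paths from this patch, and whether Anon's post-install script actually consults them is an assumption rather than verified behaviour.

    package anonsetup // illustrative package name, not part of the repo

    import (
        "fmt"
        "os"
        "os/exec"
    )

    // preseedAnonTerms condenses the flow this patch adds before installing
    // the anon package: debconf preseed, config-file flag, non-interactive apt.
    func preseedAnonTerms() error {
        // 1. Answer the terms question via debconf before the package asks it.
        pre := exec.Command("sh", "-c",
            `echo "anon anon/terms boolean true" | debconf-set-selections`)
        if err := pre.Run(); err != nil {
            return fmt.Errorf("debconf preseed failed: %w", err)
        }

        // 2. Pre-create the config file in case the post-install script
        //    checks AgreeToTerms there instead of (or as well as) debconf.
        if err := os.MkdirAll("/etc/anon", 0o755); err != nil {
            return err
        }
        if _, err := os.Stat("/etc/anon/anonrc"); os.IsNotExist(err) {
            if err := os.WriteFile("/etc/anon/anonrc", []byte("AgreeToTerms 1\n"), 0o644); err != nil {
                return err
            }
        }

        // 3. Install with no interactive frontend and conservative dpkg options.
        apt := exec.Command("apt-get", "install", "-y",
            "-o", "Dpkg::Options::=--force-confdef",
            "-o", "Dpkg::Options::=--force-confold",
            "anon")
        apt.Env = append(os.Environ(), "DEBIAN_FRONTEND=noninteractive")
        apt.Stdout, apt.Stderr = os.Stdout, os.Stderr
        return apt.Run()
    }

The full change in pkg/cli/setup.go additionally removes stale APT repository files from earlier attempts and drops terms-agreement marker files in several candidate locations, since it is not documented which one the installer checks.
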
--- pkg/cli/setup.go | 38 +++++++++++++++++++++++++++++-- scripts/install-debros-network.sh | 38 +++++++++++++++++++++++++++++-- 2 files changed, 72 insertions(+), 4 deletions(-) diff --git a/pkg/cli/setup.go b/pkg/cli/setup.go index be554c5..5ea8175 100644 --- a/pkg/cli/setup.go +++ b/pkg/cli/setup.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "os/exec" + "path/filepath" "runtime" "strconv" "strings" @@ -436,10 +437,43 @@ func installAnon() { os.Remove(repoPath) } + // Preseed debconf before installation + fmt.Printf(" Pre-accepting Anon terms and conditions...\n") + preseedCmd := exec.Command("sh", "-c", `echo "anon anon/terms boolean true" | debconf-set-selections`) + preseedCmd.Run() // Ignore errors, preseed might not be critical + + // Create anonrc directory and file with AgreeToTerms before installation + // This ensures terms are accepted even if the post-install script checks the file + anonrcDir := "/etc/anon" + anonrcPath := "/etc/anon/anonrc" + if err := os.MkdirAll(anonrcDir, 0755); err == nil { + if _, err := os.Stat(anonrcPath); os.IsNotExist(err) { + // Create file with AgreeToTerms already set + os.WriteFile(anonrcPath, []byte("AgreeToTerms 1\n"), 0644) + } + } + + // Create terms-agreement files in multiple possible locations + // Anon might check for these files to verify terms acceptance + termsLocations := []string{ + "/var/lib/anon/terms-agreement", + "/usr/share/anon/terms-agreement", + "/usr/share/keyrings/anon/terms-agreement", + "/usr/share/keyrings/anyone-terms-agreed", + } + for _, loc := range termsLocations { + dir := filepath.Dir(loc) + if err := os.MkdirAll(dir, 0755); err == nil { + os.WriteFile(loc, []byte("agreed\n"), 0644) + } + } + // Use the official installation script from GitHub + // Pipe "yes" repeatedly to automatically accept terms prompt installScriptURL := "https://raw.githubusercontent.com/anyone-protocol/anon-install/refs/heads/main/install.sh" - cmd := exec.Command("sh", "-c", fmt.Sprintf("curl -fsSL %s | bash", installScriptURL)) - cmd.Stdin = os.Stdin // Allow interactive prompts if needed + // Use yes command to pipe multiple "yes" responses if needed + cmd := exec.Command("sh", "-c", fmt.Sprintf("curl -fsSL %s | (yes yes | bash)", installScriptURL)) + cmd.Env = append(os.Environ(), "DEBIAN_FRONTEND=noninteractive") cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr diff --git a/scripts/install-debros-network.sh b/scripts/install-debros-network.sh index 235ee76..efbc1ba 100755 --- a/scripts/install-debros-network.sh +++ b/scripts/install-debros-network.sh @@ -225,12 +225,46 @@ install_anon() { # Preseed terms acceptance to avoid interactive prompt log "Pre-accepting Anon terms and conditions..." + # Try multiple debconf question formats echo "anon anon/terms boolean true" | sudo debconf-set-selections + echo "anon anon/terms seen true" | sudo debconf-set-selections + # Also try with select/string format + echo "anon anon/terms select true" | sudo debconf-set-selections || true - # Update and install + # Query debconf to verify the question exists and set it properly + # Some packages use different question formats + sudo debconf-get-selections | grep -i anon || true + + # Create anonrc directory and file with AgreeToTerms before installation + # This ensures terms are accepted even if the post-install script checks the file + sudo mkdir -p /etc/anon + if [ ! 
-f /etc/anon/anonrc ]; then + echo "AgreeToTerms 1" | sudo tee /etc/anon/anonrc >/dev/null + fi + + # Also create a terms-agreement file if Anon checks for it + # Check multiple possible locations where Anon might look for terms acceptance + sudo mkdir -p /var/lib/anon + echo "agreed" | sudo tee /var/lib/anon/terms-agreement >/dev/null 2>&1 || true + sudo mkdir -p /usr/share/anon + echo "agreed" | sudo tee /usr/share/anon/terms-agreement >/dev/null 2>&1 || true + # Also create near the GPG keyring directory (as the user suggested) + sudo mkdir -p /usr/share/keyrings/anon + echo "agreed" | sudo tee /usr/share/keyrings/anon/terms-agreement >/dev/null 2>&1 || true + # Create in the keyring directory itself as a marker file + echo "agreed" | sudo tee /usr/share/keyrings/anyone-terms-agreed >/dev/null 2>&1 || true + + # Update and install with non-interactive frontend log "Installing Anon package..." sudo apt update -qq - if ! sudo apt install -y anon; then + + # Use DEBIAN_FRONTEND=noninteractive and set debconf values directly via apt-get options + # This is more reliable than just debconf-set-selections + if ! sudo DEBIAN_FRONTEND=noninteractive \ + apt-get install -y \ + -o Dpkg::Options::="--force-confdef" \ + -o Dpkg::Options::="--force-confold" \ + anon; then warning "Anon installation failed" return 1 fi From 624f92bf11bcf37dfa4d5646f389f98fc24eaa38 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 08:37:12 +0200 Subject: [PATCH 07/13] updated version --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index fc01177..e36b820 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.52.19 +VERSION := 0.52.20 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' From f561bc5311fed299ff3c3de8c290bf57f08ed5b2 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 08:42:07 +0200 Subject: [PATCH 08/13] refactor: improve Anon installation script for terms acceptance - Updated the installation process to rely on debconf preseed and file-based acceptance methods, reducing the need for interactive prompts. - Limited the number of "yes" responses piped during installation to enhance reliability and prevent infinite loops. - Streamlined the command execution for fetching and running the installation script from GitHub. 
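
The earlier "curl ... | (yes yes | bash)" form could not work as intended: in that pipeline bash's standard input is the output of yes, not the downloaded script, and an unbounded yes pipe can also leave the step hanging. The rework keeps the debconf and file-based acceptance as the primary mechanism and only supplies a fixed number of affirmative answers on stdin as a fallback. A minimal sketch of that bounded-stdin pattern, assuming the same install-script URL as the patch:

    package anonsetup // illustrative, not part of the repo

    import (
        "os"
        "os/exec"
        "strings"
    )

    // runInstallScript shows the bounded-stdin pattern this patch switches to:
    // a finite set of "yes" answers is available on the parent shell's stdin,
    // so a stray prompt can be answered but nothing can block forever waiting
    // on (or feeding) an endless yes pipe. Whether install.sh ever reads these
    // answers depends on how it prompts, which is not verified here.
    func runInstallScript() error {
        const url = "https://raw.githubusercontent.com/anyone-protocol/anon-install/refs/heads/main/install.sh"

        cmd := exec.Command("sh", "-c", "curl -fsSL "+url+" | bash")
        cmd.Env = append(os.Environ(), "DEBIAN_FRONTEND=noninteractive")
        cmd.Stdin = strings.NewReader(strings.Repeat("yes\n", 10)) // bounded, then EOF
        cmd.Stdout = os.Stdout
        cmd.Stderr = os.Stderr
        return cmd.Run()
    }
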
--- Makefile | 2 +- pkg/cli/setup.go | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index e36b820..5240288 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.52.20 +VERSION := 0.52.21 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/cli/setup.go b/pkg/cli/setup.go index 5ea8175..e1a2cc8 100644 --- a/pkg/cli/setup.go +++ b/pkg/cli/setup.go @@ -469,11 +469,14 @@ func installAnon() { } // Use the official installation script from GitHub - // Pipe "yes" repeatedly to automatically accept terms prompt + // Rely on debconf preseed and file-based acceptance methods + // If prompts still appear, pipe a few "yes" responses (not infinite) installScriptURL := "https://raw.githubusercontent.com/anyone-protocol/anon-install/refs/heads/main/install.sh" - // Use yes command to pipe multiple "yes" responses if needed - cmd := exec.Command("sh", "-c", fmt.Sprintf("curl -fsSL %s | (yes yes | bash)", installScriptURL)) + // Pipe multiple "yes" responses (but limited) in case of multiple prompts + yesResponses := strings.Repeat("yes\n", 10) // 10 "yes" responses should be enough + cmd := exec.Command("sh", "-c", fmt.Sprintf("curl -fsSL %s | bash", installScriptURL)) cmd.Env = append(os.Environ(), "DEBIAN_FRONTEND=noninteractive") + cmd.Stdin = strings.NewReader(yesResponses) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr From ea5ef6bc1a76edf0bc2c9ab6fe89c5bc2b0a891a Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 10:46:52 +0200 Subject: [PATCH 09/13] feat: implement dynamic cluster discovery and validation for RQLite nodes - Added ClusterDiscoveryService to manage peer discovery and synchronization for RQLite nodes. - Introduced new configuration options for cluster synchronization intervals, peer inactivity limits, and minimum cluster size. - Enhanced validation logic to ensure proper configuration of cluster parameters. - Implemented metrics collection for cluster health and peer status, improving monitoring capabilities. - Updated RQLiteManager to integrate with the new discovery service, allowing for dynamic leadership and cluster joining logic. 
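
Operationally, the key artifact the new ClusterDiscoveryService maintains is RQLite's peers.json. On each sync interval (database.cluster_sync_interval, default 30s) it merges its own RQLite metadata with metadata learned via LibP2P peer exchange, drops peers unseen for longer than database.peer_inactivity_limit (default 24h), and atomically rewrites <data_dir>/rqlite/raft/peers.json; this is also why discovery.http_adv_address and discovery.raft_adv_address are now required. The sketch below condenses the file's shape and the temp-file-plus-rename write used in the patch; the node IDs and addresses are placeholders.

    package clustersketch // illustrative, not part of the repo

    import (
        "encoding/json"
        "os"
        "path/filepath"
    )

    // peerEntry mirrors the fields written for each cluster member:
    // the RQLite node ID, its Raft advertise address, and a non_voter flag.
    type peerEntry struct {
        ID       string `json:"id"`
        Address  string `json:"address"`
        NonVoter bool   `json:"non_voter"`
    }

    // writePeersJSON writes peers.json atomically (temp file + rename), the
    // same pattern the discovery service uses so RQLite never observes a
    // half-written membership file. Entries below are placeholders.
    func writePeersJSON(raftDir string) error {
        peers := []peerEntry{
            {ID: "bootstrap-node-1", Address: "51.83.128.181:7001"},
            {ID: "worker-node-2", Address: "203.0.113.7:7001"},
        }

        if err := os.MkdirAll(raftDir, 0o755); err != nil {
            return err
        }
        data, err := json.MarshalIndent(peers, "", "  ")
        if err != nil {
            return err
        }

        target := filepath.Join(raftDir, "peers.json")
        tmp := target + ".tmp"
        if err := os.WriteFile(tmp, data, 0o644); err != nil {
            return err
        }
        return os.Rename(tmp, target) // atomic replace on the same filesystem
    }

In the service itself the equivalent call targets filepath.Join(dataDir, "rqlite", "raft") after expanding ~ in node.data_dir, so RQLite picks the file up from its own raft directory.
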
--- pkg/config/config.go | 10 + pkg/config/validate.go | 64 +++- pkg/discovery/discovery.go | 27 +- pkg/discovery/rqlite_metadata.go | 22 ++ pkg/node/monitoring.go | 71 +++- pkg/node/node.go | 57 ++- pkg/rqlite/cluster_discovery.go | 592 +++++++++++++++++++++++++++++++ pkg/rqlite/data_safety.go | 109 ++++++ pkg/rqlite/metrics.go | 74 ++++ pkg/rqlite/rqlite.go | 160 ++++++++- pkg/rqlite/types.go | 71 ++++ 11 files changed, 1218 insertions(+), 39 deletions(-) create mode 100644 pkg/discovery/rqlite_metadata.go create mode 100644 pkg/rqlite/cluster_discovery.go create mode 100644 pkg/rqlite/data_safety.go create mode 100644 pkg/rqlite/metrics.go create mode 100644 pkg/rqlite/types.go diff --git a/pkg/config/config.go b/pkg/config/config.go index 9ca73d9..5784597 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -36,6 +36,11 @@ type DatabaseConfig struct { RQLitePort int `yaml:"rqlite_port"` // RQLite HTTP API port RQLiteRaftPort int `yaml:"rqlite_raft_port"` // RQLite Raft consensus port RQLiteJoinAddress string `yaml:"rqlite_join_address"` // Address to join RQLite cluster + + // Dynamic discovery configuration (always enabled) + ClusterSyncInterval time.Duration `yaml:"cluster_sync_interval"` // default: 30s + PeerInactivityLimit time.Duration `yaml:"peer_inactivity_limit"` // default: 24h + MinClusterSize int `yaml:"min_cluster_size"` // default: 1 } // DiscoveryConfig contains peer discovery configuration @@ -106,6 +111,11 @@ func DefaultConfig() *Config { RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: "", // Empty for bootstrap node + + // Dynamic discovery (always enabled) + ClusterSyncInterval: 30 * time.Second, + PeerInactivityLimit: 24 * time.Hour, + MinClusterSize: 1, }, Discovery: DiscoveryConfig{ BootstrapPeers: []string{}, diff --git a/pkg/config/validate.go b/pkg/config/validate.go index 045d784..b50c7d2 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" @@ -51,6 +52,15 @@ func (c *Config) validateNode() []error { var errs []error nc := c.Node + // Validate node ID (required for RQLite cluster membership) + if nc.ID == "" { + errs = append(errs, ValidationError{ + Path: "node.id", + Message: "must not be empty (required for cluster membership)", + Hint: "will be auto-generated if empty, but explicit ID recommended", + }) + } + // Validate type if nc.Type != "bootstrap" && nc.Type != "node" { errs = append(errs, ValidationError{ @@ -233,6 +243,40 @@ func (c *Config) validateDatabase() []error { } } + // Validate cluster_sync_interval + if dc.ClusterSyncInterval != 0 && dc.ClusterSyncInterval < 10*time.Second { + errs = append(errs, ValidationError{ + Path: "database.cluster_sync_interval", + Message: fmt.Sprintf("must be >= 10s or 0 (for default); got %v", dc.ClusterSyncInterval), + Hint: "recommended: 30s", + }) + } + + // Validate peer_inactivity_limit + if dc.PeerInactivityLimit != 0 { + if dc.PeerInactivityLimit < time.Hour { + errs = append(errs, ValidationError{ + Path: "database.peer_inactivity_limit", + Message: fmt.Sprintf("must be >= 1h or 0 (for default); got %v", dc.PeerInactivityLimit), + Hint: "recommended: 24h", + }) + } else if dc.PeerInactivityLimit > 7*24*time.Hour { + errs = append(errs, ValidationError{ + Path: "database.peer_inactivity_limit", + Message: fmt.Sprintf("must be <= 7d; got %v", dc.PeerInactivityLimit), + Hint: "recommended: 24h", + }) + } + } + + // 
Validate min_cluster_size + if dc.MinClusterSize < 1 { + errs = append(errs, ValidationError{ + Path: "database.min_cluster_size", + Message: fmt.Sprintf("must be >= 1; got %d", dc.MinClusterSize), + }) + } + return errs } @@ -320,8 +364,14 @@ func (c *Config) validateDiscovery() []error { seenPeers[peer] = true } - // Validate http_adv_address - if disc.HttpAdvAddress != "" { + // Validate http_adv_address (required for cluster discovery) + if disc.HttpAdvAddress == "" { + errs = append(errs, ValidationError{ + Path: "discovery.http_adv_address", + Message: "required for RQLite cluster discovery", + Hint: "set to your public HTTP address (e.g., 51.83.128.181:5001)", + }) + } else { if err := validateHostOrHostPort(disc.HttpAdvAddress); err != nil { errs = append(errs, ValidationError{ Path: "discovery.http_adv_address", @@ -331,8 +381,14 @@ func (c *Config) validateDiscovery() []error { } } - // Validate raft_adv_address - if disc.RaftAdvAddress != "" { + // Validate raft_adv_address (required for cluster discovery) + if disc.RaftAdvAddress == "" { + errs = append(errs, ValidationError{ + Path: "discovery.raft_adv_address", + Message: "required for RQLite cluster discovery", + Hint: "set to your public Raft address (e.g., 51.83.128.181:7001)", + }) + } else { if err := validateHostOrHostPort(disc.RaftAdvAddress); err != nil { errs = append(errs, ValidationError{ Path: "discovery.raft_adv_address", diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 442ebbf..19b5887 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -24,7 +24,8 @@ type PeerExchangeRequest struct { // PeerExchangeResponse represents a list of peers to exchange type PeerExchangeResponse struct { - Peers []PeerInfo `json:"peers"` + Peers []PeerInfo `json:"peers"` + RQLiteMetadata *RQLiteNodeMetadata `json:"rqlite_metadata,omitempty"` } // PeerInfo contains peer identity and addresses @@ -123,6 +124,16 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) { added++ } + // Add RQLite metadata if available + if val, err := d.host.Peerstore().Get(d.host.ID(), "rqlite_metadata"); err == nil { + if jsonData, ok := val.([]byte); ok { + var metadata RQLiteNodeMetadata + if err := json.Unmarshal(jsonData, &metadata); err == nil { + resp.RQLiteMetadata = &metadata + } + } + } + // Send response encoder := json.NewEncoder(s) if err := encoder.Encode(&resp); err != nil { @@ -131,7 +142,8 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) { } d.logger.Debug("Sent peer exchange response", - zap.Int("peer_count", len(resp.Peers))) + zap.Int("peer_count", len(resp.Peers)), + zap.Bool("has_rqlite_metadata", resp.RQLiteMetadata != nil)) } // Start begins periodic peer discovery @@ -363,6 +375,17 @@ func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi return nil } + // Store remote peer's RQLite metadata if available + if resp.RQLiteMetadata != nil { + metadataJSON, err := json.Marshal(resp.RQLiteMetadata) + if err == nil { + _ = d.host.Peerstore().Put(peerID, "rqlite_metadata", metadataJSON) + d.logger.Debug("Stored RQLite metadata from peer", + zap.String("peer_id", peerID.String()[:8]+"..."), + zap.String("node_id", resp.RQLiteMetadata.NodeID)) + } + } + return resp.Peers } diff --git a/pkg/discovery/rqlite_metadata.go b/pkg/discovery/rqlite_metadata.go new file mode 100644 index 0000000..65a7048 --- /dev/null +++ b/pkg/discovery/rqlite_metadata.go @@ -0,0 +1,22 @@ +package discovery + +import ( + "time" +) + +// RQLiteNodeMetadata contains 
RQLite-specific information announced via LibP2P +type RQLiteNodeMetadata struct { + NodeID string `json:"node_id"` // RQLite node ID (from config) + RaftAddress string `json:"raft_address"` // Raft port address (e.g., "51.83.128.181:7001") + HTTPAddress string `json:"http_address"` // HTTP API address (e.g., "51.83.128.181:5001") + NodeType string `json:"node_type"` // "bootstrap" or "node" + RaftLogIndex uint64 `json:"raft_log_index"` // Current Raft log index (for data comparison) + LastSeen time.Time `json:"last_seen"` // Updated on every announcement + ClusterVersion string `json:"cluster_version"` // For compatibility checking +} + +// PeerExchangeResponseV2 extends the original response with RQLite metadata +type PeerExchangeResponseV2 struct { + Peers []PeerInfo `json:"peers"` + RQLiteMetadata *RQLiteNodeMetadata `json:"rqlite_metadata,omitempty"` +} diff --git a/pkg/node/monitoring.go b/pkg/node/monitoring.go index 866a4fd..bf9931f 100644 --- a/pkg/node/monitoring.go +++ b/pkg/node/monitoring.go @@ -91,12 +91,13 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory } msg := struct { - PeerID string `json:"peer_id"` - PeerCount int `json:"peer_count"` - PeerIDs []string `json:"peer_ids,omitempty"` - CPU uint64 `json:"cpu_usage"` - Memory uint64 `json:"memory_usage"` - Timestamp int64 `json:"timestamp"` + PeerID string `json:"peer_id"` + PeerCount int `json:"peer_count"` + PeerIDs []string `json:"peer_ids,omitempty"` + CPU uint64 `json:"cpu_usage"` + Memory uint64 `json:"memory_usage"` + Timestamp int64 `json:"timestamp"` + ClusterHealth map[string]interface{} `json:"cluster_health,omitempty"` }{ PeerID: n.host.ID().String(), PeerCount: len(peers), @@ -106,6 +107,20 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory Timestamp: time.Now().Unix(), } + // Add cluster health metrics if available + if n.clusterDiscovery != nil { + metrics := n.clusterDiscovery.GetMetrics() + msg.ClusterHealth = map[string]interface{}{ + "cluster_size": metrics.ClusterSize, + "active_nodes": metrics.ActiveNodes, + "inactive_nodes": metrics.InactiveNodes, + "discovery_status": metrics.DiscoveryStatus, + "current_leader": metrics.CurrentLeader, + "average_peer_health": metrics.AveragePeerHealth, + "last_update": metrics.LastUpdate.Format(time.RFC3339), + } + } + data, err := json.Marshal(msg) if err != nil { return err @@ -119,6 +134,50 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory return nil } +// GetClusterHealth returns cluster health information +func (n *Node) GetClusterHealth() map[string]interface{} { + if n.clusterDiscovery == nil { + return map[string]interface{}{ + "status": "not_initialized", + } + } + + metrics := n.clusterDiscovery.GetMetrics() + return map[string]interface{}{ + "cluster_size": metrics.ClusterSize, + "active_nodes": metrics.ActiveNodes, + "inactive_nodes": metrics.InactiveNodes, + "discovery_status": metrics.DiscoveryStatus, + "current_leader": metrics.CurrentLeader, + "average_peer_health": metrics.AveragePeerHealth, + "last_update": metrics.LastUpdate, + } +} + +// GetDiscoveryStatus returns discovery service status +func (n *Node) GetDiscoveryStatus() map[string]interface{} { + if n.clusterDiscovery == nil { + return map[string]interface{}{ + "status": "disabled", + "message": "cluster discovery not initialized", + } + } + + metrics := n.clusterDiscovery.GetMetrics() + status := "healthy" + if metrics.DiscoveryStatus == "no_peers" { + status = "warning" + } else if 
metrics.DiscoveryStatus == "degraded" { + status = "degraded" + } + + return map[string]interface{}{ + "status": status, + "cluster_size": metrics.ClusterSize, + "last_update": metrics.LastUpdate, + } +} + // startConnectionMonitoring starts minimal connection monitoring for the lightweight client. // Unlike nodes which need extensive monitoring, clients only need basic health checks. func (n *Node) startConnectionMonitoring() { diff --git a/pkg/node/node.go b/pkg/node/node.go index 988ab1d..852d768 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -33,8 +33,9 @@ type Node struct { logger *logging.ColoredLogger host host.Host - rqliteManager *database.RQLiteManager - rqliteAdapter *database.RQLiteAdapter + rqliteManager *database.RQLiteManager + rqliteAdapter *database.RQLiteAdapter + clusterDiscovery *database.ClusterDiscoveryService // Peer discovery bootstrapCancel context.CancelFunc @@ -67,6 +68,41 @@ func (n *Node) startRQLite(ctx context.Context) error { // Create RQLite manager n.rqliteManager = database.NewRQLiteManager(&n.config.Database, &n.config.Discovery, n.config.Node.DataDir, n.logger.Logger) + // Initialize cluster discovery service if LibP2P host is available + if n.host != nil && n.discoveryManager != nil { + // Determine node type + nodeType := "node" + if n.config.Node.Type == "bootstrap" { + nodeType = "bootstrap" + } + + // Create cluster discovery service + n.clusterDiscovery = database.NewClusterDiscoveryService( + n.host, + n.discoveryManager, + n.rqliteManager, + n.config.Node.ID, + nodeType, + n.config.Discovery.RaftAdvAddress, + n.config.Discovery.HttpAdvAddress, + n.config.Node.DataDir, + n.logger.Logger, + ) + + // Set discovery service on RQLite manager + n.rqliteManager.SetDiscoveryService(n.clusterDiscovery) + + // Start cluster discovery + if err := n.clusterDiscovery.Start(ctx); err != nil { + return fmt.Errorf("failed to start cluster discovery: %w", err) + } + + // Update our own metadata + n.clusterDiscovery.UpdateOwnMetadata() + + n.logger.Info("Cluster discovery service started") + } + // Start RQLite if err := n.rqliteManager.Start(ctx); err != nil { return err @@ -532,6 +568,11 @@ func (n *Node) stopPeerDiscovery() { func (n *Node) Stop() error { n.logger.ComponentInfo(logging.ComponentNode, "Stopping network node") + // Stop cluster discovery + if n.clusterDiscovery != nil { + n.clusterDiscovery.Stop() + } + // Stop bootstrap reconnection loop if n.bootstrapCancel != nil { n.bootstrapCancel() @@ -577,16 +618,16 @@ func (n *Node) Start(ctx context.Context) error { return fmt.Errorf("failed to create data directory: %w", err) } - // Start RQLite - if err := n.startRQLite(ctx); err != nil { - return fmt.Errorf("failed to start RQLite: %w", err) - } - - // Start LibP2P host + // Start LibP2P host first (needed for cluster discovery) if err := n.startLibP2P(); err != nil { return fmt.Errorf("failed to start LibP2P: %w", err) } + // Start RQLite with cluster discovery + if err := n.startRQLite(ctx); err != nil { + return fmt.Errorf("failed to start RQLite: %w", err) + } + // Get listen addresses for logging var listenAddrs []string for _, addr := range n.host.Addrs() { diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go new file mode 100644 index 0000000..e386beb --- /dev/null +++ b/pkg/rqlite/cluster_discovery.go @@ -0,0 +1,592 @@ +package rqlite + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/DeBrosOfficial/network/pkg/discovery" + 
"github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/zap" +) + +// ClusterDiscoveryService bridges LibP2P discovery with RQLite cluster management +type ClusterDiscoveryService struct { + host host.Host + discoveryMgr *discovery.Manager + rqliteManager *RQLiteManager + nodeID string + nodeType string + raftAddress string + httpAddress string + dataDir string + + knownPeers map[string]*discovery.RQLiteNodeMetadata // NodeID -> Metadata + peerHealth map[string]*PeerHealth // NodeID -> Health + lastUpdate time.Time + updateInterval time.Duration // 30 seconds + inactivityLimit time.Duration // 24 hours + + logger *zap.Logger + mu sync.RWMutex + cancel context.CancelFunc + started bool +} + +// NewClusterDiscoveryService creates a new cluster discovery service +func NewClusterDiscoveryService( + h host.Host, + discoveryMgr *discovery.Manager, + rqliteManager *RQLiteManager, + nodeID string, + nodeType string, + raftAddress string, + httpAddress string, + dataDir string, + logger *zap.Logger, +) *ClusterDiscoveryService { + return &ClusterDiscoveryService{ + host: h, + discoveryMgr: discoveryMgr, + rqliteManager: rqliteManager, + nodeID: nodeID, + nodeType: nodeType, + raftAddress: raftAddress, + httpAddress: httpAddress, + dataDir: dataDir, + knownPeers: make(map[string]*discovery.RQLiteNodeMetadata), + peerHealth: make(map[string]*PeerHealth), + updateInterval: 30 * time.Second, + inactivityLimit: 24 * time.Hour, + logger: logger, + } +} + +// Start begins the cluster discovery service +func (c *ClusterDiscoveryService) Start(ctx context.Context) error { + c.mu.Lock() + if c.started { + c.mu.Unlock() + return fmt.Errorf("cluster discovery already started") + } + c.started = true + c.mu.Unlock() + + ctx, cancel := context.WithCancel(ctx) + c.cancel = cancel + + c.logger.Info("Starting cluster discovery service", + zap.String("node_id", c.nodeID), + zap.String("node_type", c.nodeType), + zap.String("raft_address", c.raftAddress), + zap.String("http_address", c.httpAddress), + zap.String("data_dir", c.dataDir), + zap.Duration("update_interval", c.updateInterval), + zap.Duration("inactivity_limit", c.inactivityLimit)) + + // Start periodic sync in background + go c.periodicSync(ctx) + + // Start periodic cleanup in background + go c.periodicCleanup(ctx) + + c.logger.Info("Cluster discovery goroutines started") + + return nil +} + +// Stop stops the cluster discovery service +func (c *ClusterDiscoveryService) Stop() { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.started { + return + } + + if c.cancel != nil { + c.cancel() + } + c.started = false + + c.logger.Info("Cluster discovery service stopped") +} + +// periodicSync runs periodic cluster membership synchronization +func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) { + c.logger.Info("periodicSync goroutine started, doing initial sync immediately") + + ticker := time.NewTicker(c.updateInterval) + defer ticker.Stop() + + // Do initial sync immediately + c.logger.Info("Running initial cluster membership sync") + c.updateClusterMembership() + c.logger.Info("Initial cluster membership sync completed") + + for { + select { + case <-ctx.Done(): + c.logger.Info("periodicSync goroutine stopping") + return + case <-ticker.C: + c.logger.Debug("Running periodic cluster membership sync") + c.updateClusterMembership() + } + } +} + +// periodicCleanup runs periodic cleanup of inactive nodes +func (c *ClusterDiscoveryService) periodicCleanup(ctx context.Context) { + ticker := time.NewTicker(5 * 
time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.removeInactivePeers() + } + } +} + +// collectPeerMetadata collects RQLite metadata from LibP2P peers +func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeMetadata { + connectedPeers := c.host.Network().Peers() + var metadata []*discovery.RQLiteNodeMetadata + + c.logger.Debug("Collecting peer metadata from LibP2P", + zap.Int("connected_libp2p_peers", len(connectedPeers))) + + // Add ourselves + ourMetadata := &discovery.RQLiteNodeMetadata{ + NodeID: c.nodeID, + RaftAddress: c.raftAddress, + HTTPAddress: c.httpAddress, + NodeType: c.nodeType, + RaftLogIndex: c.rqliteManager.getRaftLogIndex(), + LastSeen: time.Now(), + ClusterVersion: "1.0", + } + metadata = append(metadata, ourMetadata) + + // Query connected peers for their RQLite metadata + // For now, we'll use a simple approach - store metadata in peer metadata store + // In a full implementation, this would use a custom protocol to exchange RQLite metadata + for _, peerID := range connectedPeers { + // Try to get stored metadata from peerstore + // This would be populated by a peer exchange protocol + if val, err := c.host.Peerstore().Get(peerID, "rqlite_metadata"); err == nil { + if jsonData, ok := val.([]byte); ok { + var peerMeta discovery.RQLiteNodeMetadata + if err := json.Unmarshal(jsonData, &peerMeta); err == nil { + peerMeta.LastSeen = time.Now() + metadata = append(metadata, &peerMeta) + } + } + } + } + + return metadata +} + +// updateClusterMembership updates the cluster membership based on discovered peers +func (c *ClusterDiscoveryService) updateClusterMembership() { + metadata := c.collectPeerMetadata() + + c.logger.Debug("Collected peer metadata", + zap.Int("metadata_count", len(metadata))) + + c.mu.Lock() + defer c.mu.Unlock() + + // Track changes + added := []string{} + updated := []string{} + + // Update known peers + for _, meta := range metadata { + if existing, ok := c.knownPeers[meta.NodeID]; ok { + // Update existing peer + if existing.RaftLogIndex != meta.RaftLogIndex || + existing.HTTPAddress != meta.HTTPAddress || + existing.RaftAddress != meta.RaftAddress { + updated = append(updated, meta.NodeID) + } + } else { + // New peer discovered + added = append(added, meta.NodeID) + c.logger.Info("Node added to cluster", + zap.String("node_id", meta.NodeID), + zap.String("raft_address", meta.RaftAddress), + zap.String("node_type", meta.NodeType), + zap.Uint64("log_index", meta.RaftLogIndex)) + } + + c.knownPeers[meta.NodeID] = meta + + // Update health tracking + if _, ok := c.peerHealth[meta.NodeID]; !ok { + c.peerHealth[meta.NodeID] = &PeerHealth{ + LastSeen: time.Now(), + LastSuccessful: time.Now(), + Status: "active", + } + } else { + c.peerHealth[meta.NodeID].LastSeen = time.Now() + c.peerHealth[meta.NodeID].Status = "active" + c.peerHealth[meta.NodeID].FailureCount = 0 + } + } + + // Generate and write peers.json if there are changes OR if this is the first time + shouldWrite := len(added) > 0 || len(updated) > 0 || c.lastUpdate.IsZero() + + if shouldWrite { + c.logger.Info("Updating peers.json", + zap.Int("added", len(added)), + zap.Int("updated", len(updated)), + zap.Int("total_peers", len(c.knownPeers)), + zap.Bool("first_run", c.lastUpdate.IsZero())) + + // Get peers JSON while holding the lock + peers := c.getPeersJSONUnlocked() + + // Release lock before file I/O + c.mu.Unlock() + + // Write without holding lock + if err := c.writePeersJSONWithData(peers); err != nil { + 
c.logger.Error("CRITICAL: Failed to write peers.json", + zap.Error(err), + zap.String("data_dir", c.dataDir), + zap.Int("peer_count", len(peers))) + } else { + c.logger.Info("Successfully wrote peers.json", + zap.Int("peer_count", len(peers))) + } + + // Re-acquire lock to update lastUpdate + c.mu.Lock() + } else { + c.logger.Debug("No changes to cluster membership", + zap.Int("total_peers", len(c.knownPeers))) + } + + c.lastUpdate = time.Now() +} + +// removeInactivePeers removes peers that haven't been seen for longer than the inactivity limit +func (c *ClusterDiscoveryService) removeInactivePeers() { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + removed := []string{} + + for nodeID, health := range c.peerHealth { + inactiveDuration := now.Sub(health.LastSeen) + + if inactiveDuration > c.inactivityLimit { + // Mark as inactive and remove + c.logger.Warn("Node removed from cluster", + zap.String("node_id", nodeID), + zap.String("reason", "inactive"), + zap.Duration("inactive_duration", inactiveDuration)) + + delete(c.knownPeers, nodeID) + delete(c.peerHealth, nodeID) + removed = append(removed, nodeID) + } + } + + // Regenerate peers.json if any peers were removed + if len(removed) > 0 { + c.logger.Info("Removed inactive nodes, regenerating peers.json", + zap.Int("removed", len(removed)), + zap.Strings("node_ids", removed)) + + if err := c.writePeersJSON(); err != nil { + c.logger.Error("Failed to write peers.json after cleanup", zap.Error(err)) + } + } +} + +// getPeersJSON generates the peers.json structure from active peers (acquires lock) +func (c *ClusterDiscoveryService) getPeersJSON() []map[string]interface{} { + c.mu.RLock() + defer c.mu.RUnlock() + return c.getPeersJSONUnlocked() +} + +// getPeersJSONUnlocked generates the peers.json structure (must be called with lock held) +func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{} { + peers := make([]map[string]interface{}, 0, len(c.knownPeers)) + + for _, peer := range c.knownPeers { + peerEntry := map[string]interface{}{ + "id": peer.NodeID, + "address": peer.RaftAddress, + "non_voter": false, + } + peers = append(peers, peerEntry) + } + + return peers +} + +// writePeersJSON atomically writes the peers.json file (acquires lock) +func (c *ClusterDiscoveryService) writePeersJSON() error { + c.mu.RLock() + peers := c.getPeersJSONUnlocked() + c.mu.RUnlock() + + return c.writePeersJSONWithData(peers) +} + +// writePeersJSONWithData writes the peers.json file with provided data (no lock needed) +func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]interface{}) error { + c.logger.Info("writePeersJSON: Starting", + zap.String("data_dir", c.dataDir)) + + c.logger.Info("writePeersJSON: Got peers JSON", + zap.Int("peer_count", len(peers))) + + // Expand ~ in data directory path + dataDir := os.ExpandEnv(c.dataDir) + if strings.HasPrefix(dataDir, "~") { + home, err := os.UserHomeDir() + if err != nil { + return fmt.Errorf("failed to determine home directory: %w", err) + } + dataDir = filepath.Join(home, dataDir[1:]) + } + + c.logger.Info("writePeersJSON: Expanded data dir", + zap.String("expanded_path", dataDir)) + + // Get the RQLite raft directory + rqliteDir := filepath.Join(dataDir, "rqlite", "raft") + + c.logger.Info("writePeersJSON: Creating raft directory", + zap.String("raft_dir", rqliteDir)) + + if err := os.MkdirAll(rqliteDir, 0755); err != nil { + return fmt.Errorf("failed to create raft directory %s: %w", rqliteDir, err) + } + + peersFile := 
filepath.Join(rqliteDir, "peers.json") + backupFile := filepath.Join(rqliteDir, "peers.json.backup") + + c.logger.Info("writePeersJSON: File paths", + zap.String("peers_file", peersFile), + zap.String("backup_file", backupFile)) + + // Backup existing peers.json if it exists + if _, err := os.Stat(peersFile); err == nil { + c.logger.Info("writePeersJSON: Backing up existing peers.json") + data, err := os.ReadFile(peersFile) + if err == nil { + _ = os.WriteFile(backupFile, data, 0644) + } + } + + // Marshal to JSON + c.logger.Info("writePeersJSON: Marshaling to JSON") + data, err := json.MarshalIndent(peers, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal peers.json: %w", err) + } + + c.logger.Info("writePeersJSON: JSON marshaled", + zap.Int("data_size", len(data))) + + // Write atomically using temp file + rename + tempFile := peersFile + ".tmp" + + c.logger.Info("writePeersJSON: Writing temp file", + zap.String("temp_file", tempFile)) + + if err := os.WriteFile(tempFile, data, 0644); err != nil { + return fmt.Errorf("failed to write temp peers.json %s: %w", tempFile, err) + } + + c.logger.Info("writePeersJSON: Renaming temp file to final") + + if err := os.Rename(tempFile, peersFile); err != nil { + return fmt.Errorf("failed to rename %s to %s: %w", tempFile, peersFile, err) + } + + nodeIDs := make([]string, 0, len(peers)) + for _, p := range peers { + if id, ok := p["id"].(string); ok { + nodeIDs = append(nodeIDs, id) + } + } + + c.logger.Info("peers.json successfully written!", + zap.String("file", peersFile), + zap.Int("node_count", len(peers)), + zap.Strings("node_ids", nodeIDs)) + + return nil +} + +// GetActivePeers returns a list of active peers (not including self) +func (c *ClusterDiscoveryService) GetActivePeers() []*discovery.RQLiteNodeMetadata { + c.mu.RLock() + defer c.mu.RUnlock() + + peers := make([]*discovery.RQLiteNodeMetadata, 0, len(c.knownPeers)) + for _, peer := range c.knownPeers { + // Skip self + if peer.NodeID == c.nodeID { + continue + } + peers = append(peers, peer) + } + + return peers +} + +// GetNodeWithHighestLogIndex returns the node with the highest Raft log index +func (c *ClusterDiscoveryService) GetNodeWithHighestLogIndex() *discovery.RQLiteNodeMetadata { + c.mu.RLock() + defer c.mu.RUnlock() + + var highest *discovery.RQLiteNodeMetadata + var maxIndex uint64 = 0 + + for _, peer := range c.knownPeers { + // Skip self + if peer.NodeID == c.nodeID { + continue + } + + if peer.RaftLogIndex > maxIndex { + maxIndex = peer.RaftLogIndex + highest = peer + } + } + + return highest +} + +// HasRecentPeersJSON checks if peers.json was recently updated +func (c *ClusterDiscoveryService) HasRecentPeersJSON() bool { + c.mu.RLock() + defer c.mu.RUnlock() + + // Consider recent if updated in last 5 minutes + return time.Since(c.lastUpdate) < 5*time.Minute +} + +// FindJoinTargets discovers join targets via LibP2P, prioritizing bootstrap nodes +func (c *ClusterDiscoveryService) FindJoinTargets() []string { + c.mu.RLock() + defer c.mu.RUnlock() + + targets := []string{} + + // Prioritize bootstrap nodes + for _, peer := range c.knownPeers { + if peer.NodeType == "bootstrap" { + targets = append(targets, peer.RaftAddress) + } + } + + // Add other nodes as fallback + for _, peer := range c.knownPeers { + if peer.NodeType != "bootstrap" { + targets = append(targets, peer.RaftAddress) + } + } + + return targets +} + +// WaitForDiscoverySettling waits for LibP2P discovery to settle (used on concurrent startup) +func (c *ClusterDiscoveryService) 
WaitForDiscoverySettling(ctx context.Context) { + settleDuration := 60 * time.Second + c.logger.Info("Waiting for discovery to settle", + zap.Duration("duration", settleDuration)) + + select { + case <-ctx.Done(): + return + case <-time.After(settleDuration): + } + + // Collect final peer list + c.updateClusterMembership() + + c.mu.RLock() + peerCount := len(c.knownPeers) + c.mu.RUnlock() + + c.logger.Info("Discovery settled", + zap.Int("peer_count", peerCount)) +} + +// UpdateOwnMetadata updates our own RQLite metadata in the peerstore +func (c *ClusterDiscoveryService) UpdateOwnMetadata() { + c.logger.Info("Updating own RQLite metadata for peer exchange", + zap.String("node_id", c.nodeID)) + + metadata := &discovery.RQLiteNodeMetadata{ + NodeID: c.nodeID, + RaftAddress: c.raftAddress, + HTTPAddress: c.httpAddress, + NodeType: c.nodeType, + RaftLogIndex: c.rqliteManager.getRaftLogIndex(), + LastSeen: time.Now(), + ClusterVersion: "1.0", + } + + c.logger.Info("Created metadata struct", + zap.String("node_id", metadata.NodeID), + zap.String("raft_address", metadata.RaftAddress), + zap.String("http_address", metadata.HTTPAddress), + zap.Uint64("log_index", metadata.RaftLogIndex)) + + // Store in our own peerstore for peer exchange + data, err := json.Marshal(metadata) + if err != nil { + c.logger.Error("Failed to marshal own metadata", zap.Error(err)) + return + } + + if err := c.host.Peerstore().Put(c.host.ID(), "rqlite_metadata", data); err != nil { + c.logger.Error("Failed to store own metadata", zap.Error(err)) + return + } + + c.logger.Info("Successfully stored own RQLite metadata in peerstore", + zap.String("node_id", c.nodeID), + zap.Uint64("log_index", metadata.RaftLogIndex)) +} + +// StoreRemotePeerMetadata stores metadata received from a remote peer +func (c *ClusterDiscoveryService) StoreRemotePeerMetadata(peerID peer.ID, metadata *discovery.RQLiteNodeMetadata) error { + data, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to marshal metadata: %w", err) + } + + if err := c.host.Peerstore().Put(peerID, "rqlite_metadata", data); err != nil { + return fmt.Errorf("failed to store metadata: %w", err) + } + + c.logger.Debug("Stored remote peer metadata", + zap.String("peer_id", peerID.String()[:8]+"..."), + zap.String("node_id", metadata.NodeID)) + + return nil +} diff --git a/pkg/rqlite/data_safety.go b/pkg/rqlite/data_safety.go new file mode 100644 index 0000000..de2ad54 --- /dev/null +++ b/pkg/rqlite/data_safety.go @@ -0,0 +1,109 @@ +package rqlite + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "go.uber.org/zap" +) + +// getRaftLogIndex returns the current Raft log index for this node +func (r *RQLiteManager) getRaftLogIndex() uint64 { + status, err := r.getRQLiteStatus() + if err != nil { + r.logger.Debug("Failed to get Raft log index", zap.Error(err)) + return 0 + } + + // Return the highest index we have + maxIndex := status.Store.Raft.LastLogIndex + if status.Store.Raft.AppliedIndex > maxIndex { + maxIndex = status.Store.Raft.AppliedIndex + } + if status.Store.Raft.CommitIndex > maxIndex { + maxIndex = status.Store.Raft.CommitIndex + } + + return maxIndex +} + +// getRQLiteStatus queries the /status endpoint for cluster information +func (r *RQLiteManager) getRQLiteStatus() (*RQLiteStatus, error) { + url := fmt.Sprintf("http://localhost:%d/status", r.config.RQLitePort) + client := &http.Client{Timeout: 5 * time.Second} + + resp, err := client.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to query status: %w", err) 
+ } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("status endpoint returned %d: %s", resp.StatusCode, string(body)) + } + + var status RQLiteStatus + if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { + return nil, fmt.Errorf("failed to decode status: %w", err) + } + + return &status, nil +} + +// getRQLiteNodes queries the /nodes endpoint for cluster membership +func (r *RQLiteManager) getRQLiteNodes() (RQLiteNodes, error) { + url := fmt.Sprintf("http://localhost:%d/nodes?ver=2", r.config.RQLitePort) + client := &http.Client{Timeout: 5 * time.Second} + + resp, err := client.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to query nodes: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("nodes endpoint returned %d: %s", resp.StatusCode, string(body)) + } + + var nodes RQLiteNodes + if err := json.NewDecoder(resp.Body).Decode(&nodes); err != nil { + return nil, fmt.Errorf("failed to decode nodes: %w", err) + } + + return nodes, nil +} + +// getRQLiteLeader returns the current leader address +func (r *RQLiteManager) getRQLiteLeader() (string, error) { + status, err := r.getRQLiteStatus() + if err != nil { + return "", err + } + + leaderAddr := status.Store.Raft.LeaderAddr + if leaderAddr == "" { + return "", fmt.Errorf("no leader found") + } + + return leaderAddr, nil +} + +// isNodeReachable tests if a specific node is responding +func (r *RQLiteManager) isNodeReachable(httpAddress string) bool { + url := fmt.Sprintf("http://%s/status", httpAddress) + client := &http.Client{Timeout: 3 * time.Second} + + resp, err := client.Get(url) + if err != nil { + return false + } + defer resp.Body.Close() + + return resp.StatusCode == http.StatusOK +} + diff --git a/pkg/rqlite/metrics.go b/pkg/rqlite/metrics.go new file mode 100644 index 0000000..de21c5f --- /dev/null +++ b/pkg/rqlite/metrics.go @@ -0,0 +1,74 @@ +package rqlite + +import ( + "time" +) + +// GetMetrics returns current cluster metrics +func (c *ClusterDiscoveryService) GetMetrics() *ClusterMetrics { + c.mu.RLock() + defer c.mu.RUnlock() + + activeCount := 0 + inactiveCount := 0 + totalHealth := 0.0 + currentLeader := "" + + now := time.Now() + + for nodeID, health := range c.peerHealth { + if health.Status == "active" { + activeCount++ + + // Calculate health score (0-100) based on last seen + timeSinceLastSeen := now.Sub(health.LastSeen) + healthScore := 100.0 + if timeSinceLastSeen > time.Minute { + // Degrade health score based on time since last seen + healthScore = 100.0 - (float64(timeSinceLastSeen.Seconds()) / float64(c.inactivityLimit.Seconds()) * 100.0) + if healthScore < 0 { + healthScore = 0 + } + } + totalHealth += healthScore + } else { + inactiveCount++ + } + + // Try to determine leader + if peer, ok := c.knownPeers[nodeID]; ok { + // We'd need to check the actual leader status from RQLite + // For now, bootstrap nodes are more likely to be leader + if peer.NodeType == "bootstrap" && currentLeader == "" { + currentLeader = nodeID + } + } + } + + averageHealth := 0.0 + if activeCount > 0 { + averageHealth = totalHealth / float64(activeCount) + } + + // Determine discovery status + discoveryStatus := "healthy" + if len(c.knownPeers) == 0 { + discoveryStatus = "no_peers" + } else if len(c.knownPeers) == 1 { + discoveryStatus = "single_node" + } else if averageHealth < 50 { + discoveryStatus = "degraded" + } + + return 
&ClusterMetrics{ + ClusterSize: len(c.knownPeers), + ActiveNodes: activeCount, + InactiveNodes: inactiveCount, + RemovedNodes: 0, // Could track this with a counter + LastUpdate: c.lastUpdate, + DiscoveryStatus: discoveryStatus, + CurrentLeader: currentLeader, + AveragePeerHealth: averageHealth, + } +} + diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index 3498bc2..e179ccd 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -20,12 +20,13 @@ import ( // RQLiteManager manages an RQLite node instance type RQLiteManager struct { - config *config.DatabaseConfig - discoverConfig *config.DiscoveryConfig - dataDir string - logger *zap.Logger - cmd *exec.Cmd - connection *gorqlite.Connection + config *config.DatabaseConfig + discoverConfig *config.DiscoveryConfig + dataDir string + logger *zap.Logger + cmd *exec.Cmd + connection *gorqlite.Connection + discoveryService *ClusterDiscoveryService } // waitForSQLAvailable waits until a simple query succeeds, indicating a leader is known and queries can be served. @@ -67,6 +68,11 @@ func NewRQLiteManager(cfg *config.DatabaseConfig, discoveryCfg *config.Discovery } } +// SetDiscoveryService sets the cluster discovery service for this RQLite manager +func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService) { + r.discoveryService = service +} + // Start starts the RQLite node func (r *RQLiteManager) Start(ctx context.Context) error { // Expand ~ in data directory path @@ -162,26 +168,65 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } r.connection = conn - // Leadership/SQL readiness gating - // - // Fresh bootstrap (no join, no prior state): wait for leadership so queries will work. - // Existing state or joiners: wait for SQL availability (leader known) before proceeding, - // so higher layers (storage) don't fail with 500 leader-not-found. 
- if r.config.RQLiteJoinAddress == "" && !r.hasExistingState(rqliteDataDir) { - if err := r.waitForLeadership(ctx); err != nil { - if r.cmd != nil && r.cmd.Process != nil { - _ = r.cmd.Process.Kill() + // Leadership/SQL readiness gating with dynamic discovery support + if r.config.RQLiteJoinAddress == "" { + // Bootstrap node logic with data safety checks + r.logger.Info("Bootstrap node: checking if safe to lead") + + // SAFETY: Check if we can safely become leader + canLead, err := r.canSafelyBecomeLeader() + if !canLead && err != nil { + r.logger.Warn("Not safe to become leader, attempting to join existing cluster", + zap.Error(err)) + + // Find node with highest log index and join it + if r.discoveryService != nil { + targetNode := r.discoveryService.GetNodeWithHighestLogIndex() + if targetNode != nil { + r.logger.Info("Joining node with higher data", + zap.String("target_node", targetNode.NodeID), + zap.String("raft_address", targetNode.RaftAddress), + zap.Uint64("their_index", targetNode.RaftLogIndex)) + return r.joinExistingCluster(ctx, targetNode.RaftAddress) + } + } + } + + // Safe to lead - attempt leadership + leadershipErr := r.waitForLeadership(ctx) + if leadershipErr == nil { + r.logger.Info("Bootstrap node successfully established leadership") + } else { + // Leadership failed - check if peers.json from discovery exists + if r.discoveryService != nil && r.discoveryService.HasRecentPeersJSON() { + r.logger.Info("Retrying leadership after discovery update") + leadershipErr = r.waitForLeadership(ctx) + } + + // Final fallback: SQL availability + if leadershipErr != nil { + r.logger.Warn("Leadership failed, trying SQL availability") + sqlCtx := ctx + if _, hasDeadline := ctx.Deadline(); !hasDeadline { + var cancel context.CancelFunc + sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + } + if err := r.waitForSQLAvailable(sqlCtx); err != nil { + if r.cmd != nil && r.cmd.Process != nil { + _ = r.cmd.Process.Kill() + } + return fmt.Errorf("RQLite SQL not available: %w", err) + } } - return fmt.Errorf("RQLite failed to establish leadership: %w", err) } } else { + // Joining node logic r.logger.Info("Waiting for RQLite SQL availability (leader discovery)") - // For joining nodes, wait longer for SQL availability sqlCtx := ctx if _, hasDeadline := ctx.Deadline(); !hasDeadline { - // If no deadline in context, create one for SQL availability check var cancel context.CancelFunc - sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) + sqlCtx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() } if err := r.waitForSQLAvailable(sqlCtx); err != nil { @@ -380,3 +425,80 @@ func (r *RQLiteManager) testJoinAddress(joinAddress string) error { r.logger.Info("Leader HTTP reachable", zap.String("status_url", statusURL)) return nil } + +// canSafelyBecomeLeader checks if this node can safely become leader without causing data loss +func (r *RQLiteManager) canSafelyBecomeLeader() (bool, error) { + // Get our current Raft log index + ourLogIndex := r.getRaftLogIndex() + + // If no discovery service, assume it's safe (backward compatibility) + if r.discoveryService == nil { + r.logger.Debug("No discovery service, assuming safe to lead") + return true, nil + } + + // Query discovery service for other nodes + otherNodes := r.discoveryService.GetActivePeers() + + if len(otherNodes) == 0 { + // No other nodes - safe to bootstrap + r.logger.Debug("No other nodes discovered, safe to lead", + zap.Uint64("our_log_index", 
ourLogIndex))
+		return true, nil
+	}
+
+	// Check if any other node has higher log index
+	for _, peer := range otherNodes {
+		if peer.RaftLogIndex > ourLogIndex {
+			// Other node has more data - we should join them
+			return false, fmt.Errorf(
+				"node %s has higher log index (%d > %d), should join as follower",
+				peer.NodeID, peer.RaftLogIndex, ourLogIndex)
+		}
+	}
+
+	// We have most recent data or equal - safe to lead
+	r.logger.Info("Safe to lead - we have most recent data",
+		zap.Uint64("our_log_index", ourLogIndex),
+		zap.Int("other_nodes_checked", len(otherNodes)))
+	return true, nil
+}
+
+// joinExistingCluster attempts to join an existing cluster as a follower
+func (r *RQLiteManager) joinExistingCluster(ctx context.Context, raftAddress string) error {
+	r.logger.Info("Attempting to join existing cluster",
+		zap.String("target_raft_address", raftAddress))
+
+	// Wait for the target to be reachable
+	if err := r.waitForJoinTarget(ctx, raftAddress, 2*time.Minute); err != nil {
+		return fmt.Errorf("join target not reachable: %w", err)
+	}
+
+	// Wait for SQL availability (the target should have a leader)
+	sqlCtx := ctx
+	if _, hasDeadline := ctx.Deadline(); !hasDeadline {
+		var cancel context.CancelFunc
+		sqlCtx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
+		defer cancel()
+	}
+
+	if err := r.waitForSQLAvailable(sqlCtx); err != nil {
+		return fmt.Errorf("failed to join cluster - SQL not available: %w", err)
+	}
+
+	r.logger.Info("Successfully joined existing cluster")
+	return nil
+}
+
+// exponentialBackoff calculates exponential backoff duration with jitter
+func (r *RQLiteManager) exponentialBackoff(attempt int, baseDelay time.Duration, maxDelay time.Duration) time.Duration {
+	// Calculate exponential backoff: baseDelay * 2^attempt
+	delay := baseDelay * time.Duration(1<<uint(attempt))
+	if delay > maxDelay {
+		delay = maxDelay
+	}
+
+	// Add jitter (±20%)
+	jitter := time.Duration(float64(delay) * 0.2 * (2.0*float64(time.Now().UnixNano()%100)/100.0 - 1.0))
+	return delay + jitter
+}
diff --git a/pkg/rqlite/types.go b/pkg/rqlite/types.go
new file mode 100644
index 0000000..d817e74
--- /dev/null
+++ b/pkg/rqlite/types.go
@@ -0,0 +1,71 @@
+package rqlite
+
+import "time"
+
+// RQLiteStatus represents the response from RQLite's /status endpoint
+type RQLiteStatus struct {
+	Store struct {
+		Raft struct {
+			AppliedIndex      uint64 `json:"applied_index"`
+			CommitIndex       uint64 `json:"commit_index"`
+			LastLogIndex      uint64 `json:"last_log_index"`
+			LastSnapshotIndex uint64 `json:"last_snapshot_index"`
+			State             string `json:"state"`
+			LeaderID          string `json:"leader_id"`
+			LeaderAddr        string `json:"leader_addr"`
+		} `json:"raft"`
+		DBConf struct {
+			DSN    string `json:"dsn"`
+			Memory bool   `json:"memory"`
+		} `json:"db_conf"`
+	} `json:"store"`
+	Runtime struct {
+		GOARCH       string `json:"GOARCH"`
+		GOOS         string `json:"GOOS"`
+		GOMAXPROCS   int    `json:"GOMAXPROCS"`
+		NumCPU       int    `json:"num_cpu"`
+		NumGoroutine int    `json:"num_goroutine"`
+		Version      string `json:"version"`
+	} `json:"runtime"`
+	HTTP struct {
+		Addr string `json:"addr"`
+		Auth string `json:"auth"`
+	} `json:"http"`
+	Node struct {
+		Uptime    string `json:"uptime"`
+		StartTime string `json:"start_time"`
+	} `json:"node"`
+}
+
+// RQLiteNode represents a node in the RQLite cluster
+type RQLiteNode struct {
+	ID        string `json:"id"`
+	Address   string `json:"address"`
+	Leader    bool   `json:"leader"`
+	Voter     bool   `json:"voter"`
+	Reachable bool   `json:"reachable"`
+}
+
+// RQLiteNodes represents the response from RQLite's /nodes endpoint
+type RQLiteNodes 
[]RQLiteNode + +// PeerHealth tracks the health status of a peer +type PeerHealth struct { + LastSeen time.Time + LastSuccessful time.Time + FailureCount int + Status string // "active", "degraded", "inactive" +} + +// ClusterMetrics contains cluster-wide metrics +type ClusterMetrics struct { + ClusterSize int + ActiveNodes int + InactiveNodes int + RemovedNodes int + LastUpdate time.Time + DiscoveryStatus string + CurrentLeader string + AveragePeerHealth float64 +} + From 8f82dc7ca3d530e2870aca6b4d774c2872c95e99 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 11:41:20 +0200 Subject: [PATCH 10/13] feat: enhance RQLite cluster discovery and recovery mechanisms - Introduced TriggerPeerExchange method to facilitate manual peer exchange for RQLite metadata. - Implemented performPreStartClusterDiscovery to ensure coordinated recovery by building peers.json before RQLite startup. - Added validation for node ID consistency with raft address during RQLite startup. - Enhanced logging for cluster recovery processes and peer discovery progress. - Updated cluster synchronization logic to improve reliability during node recovery scenarios. --- pkg/discovery/discovery.go | 36 +++- pkg/node/node.go | 19 +- pkg/rqlite/cluster_discovery.go | 62 +++++-- pkg/rqlite/rqlite.go | 305 ++++++++++++++++++++++++++++++-- 4 files changed, 383 insertions(+), 39 deletions(-) diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 19b5887..6d9470b 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -24,8 +24,8 @@ type PeerExchangeRequest struct { // PeerExchangeResponse represents a list of peers to exchange type PeerExchangeResponse struct { - Peers []PeerInfo `json:"peers"` - RQLiteMetadata *RQLiteNodeMetadata `json:"rqlite_metadata,omitempty"` + Peers []PeerInfo `json:"peers"` + RQLiteMetadata *RQLiteNodeMetadata `json:"rqlite_metadata,omitempty"` } // PeerInfo contains peer identity and addresses @@ -389,6 +389,38 @@ func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi return resp.Peers } +// TriggerPeerExchange manually triggers peer exchange with all connected peers +// This is useful for pre-startup cluster discovery to populate the peerstore with RQLite metadata +func (d *Manager) TriggerPeerExchange(ctx context.Context) int { + connectedPeers := d.host.Network().Peers() + if len(connectedPeers) == 0 { + d.logger.Debug("No connected peers for peer exchange") + return 0 + } + + d.logger.Info("Manually triggering peer exchange", + zap.Int("connected_peers", len(connectedPeers))) + + metadataCollected := 0 + for _, peerID := range connectedPeers { + // Request peer list from this peer (which includes their RQLite metadata) + _ = d.requestPeersFromPeer(ctx, peerID, 50) // Request up to 50 peers + + // Check if we got RQLite metadata from this peer + if val, err := d.host.Peerstore().Get(peerID, "rqlite_metadata"); err == nil { + if _, ok := val.([]byte); ok { + metadataCollected++ + } + } + } + + d.logger.Info("Peer exchange completed", + zap.Int("peers_with_metadata", metadataCollected), + zap.Int("total_peers", len(connectedPeers))) + + return metadataCollected +} + // connectToPeer attempts to connect to a specific peer using its peerstore info. 
func (d *Manager) connectToPeer(ctx context.Context, peerID peer.ID) error { peerInfo := d.host.Peerstore().PeerInfo(peerID) diff --git a/pkg/node/node.go b/pkg/node/node.go index 852d768..7aa7824 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -89,25 +89,34 @@ func (n *Node) startRQLite(ctx context.Context) error { n.logger.Logger, ) - // Set discovery service on RQLite manager + // Set discovery service on RQLite manager BEFORE starting RQLite + // This is critical for pre-start cluster discovery during recovery n.rqliteManager.SetDiscoveryService(n.clusterDiscovery) - // Start cluster discovery + // Start cluster discovery (but don't trigger initial sync yet) if err := n.clusterDiscovery.Start(ctx); err != nil { return fmt.Errorf("failed to start cluster discovery: %w", err) } - // Update our own metadata + // Publish initial metadata (with log_index=0) so peers can discover us during recovery + // The metadata will be updated with actual log index after RQLite starts n.clusterDiscovery.UpdateOwnMetadata() - n.logger.Info("Cluster discovery service started") + n.logger.Info("Cluster discovery service started (waiting for RQLite)") } - // Start RQLite + // Start RQLite FIRST before updating metadata if err := n.rqliteManager.Start(ctx); err != nil { return err } + // NOW update metadata after RQLite is running + if n.clusterDiscovery != nil { + n.clusterDiscovery.UpdateOwnMetadata() + n.clusterDiscovery.TriggerSync() // Do initial cluster sync now that RQLite is ready + n.logger.Info("RQLite metadata published and cluster synced") + } + // Create adapter for sql.DB compatibility adapter, err := database.NewRQLiteAdapter(n.rqliteManager) if err != nil { diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go index e386beb..7585cbf 100644 --- a/pkg/rqlite/cluster_discovery.go +++ b/pkg/rqlite/cluster_discovery.go @@ -120,16 +120,12 @@ func (c *ClusterDiscoveryService) Stop() { // periodicSync runs periodic cluster membership synchronization func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) { - c.logger.Info("periodicSync goroutine started, doing initial sync immediately") + c.logger.Info("periodicSync goroutine started, waiting for RQLite readiness") ticker := time.NewTicker(c.updateInterval) defer ticker.Stop() - // Do initial sync immediately - c.logger.Info("Running initial cluster membership sync") - c.updateClusterMembership() - c.logger.Info("Initial cluster membership sync completed") - + // Wait for first ticker interval before syncing (RQLite needs time to start) for { select { case <-ctx.Done(): @@ -167,7 +163,7 @@ func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeM // Add ourselves ourMetadata := &discovery.RQLiteNodeMetadata{ - NodeID: c.nodeID, + NodeID: c.raftAddress, // RQLite uses raft address as node ID RaftAddress: c.raftAddress, HTTPAddress: c.httpAddress, NodeType: c.nodeType, @@ -332,7 +328,7 @@ func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{ for _, peer := range c.knownPeers { peerEntry := map[string]interface{}{ - "id": peer.NodeID, + "id": peer.RaftAddress, // RQLite uses raft address as node ID "address": peer.RaftAddress, "non_voter": false, } @@ -446,8 +442,8 @@ func (c *ClusterDiscoveryService) GetActivePeers() []*discovery.RQLiteNodeMetada peers := make([]*discovery.RQLiteNodeMetadata, 0, len(c.knownPeers)) for _, peer := range c.knownPeers { - // Skip self - if peer.NodeID == c.nodeID { + // Skip self (compare by raft address since that's the NodeID 
now) + if peer.NodeID == c.raftAddress { continue } peers = append(peers, peer) @@ -456,6 +452,19 @@ func (c *ClusterDiscoveryService) GetActivePeers() []*discovery.RQLiteNodeMetada return peers } +// GetAllPeers returns a list of all known peers (including self) +func (c *ClusterDiscoveryService) GetAllPeers() []*discovery.RQLiteNodeMetadata { + c.mu.RLock() + defer c.mu.RUnlock() + + peers := make([]*discovery.RQLiteNodeMetadata, 0, len(c.knownPeers)) + for _, peer := range c.knownPeers { + peers = append(peers, peer) + } + + return peers +} + // GetNodeWithHighestLogIndex returns the node with the highest Raft log index func (c *ClusterDiscoveryService) GetNodeWithHighestLogIndex() *discovery.RQLiteNodeMetadata { c.mu.RLock() @@ -465,8 +474,8 @@ func (c *ClusterDiscoveryService) GetNodeWithHighestLogIndex() *discovery.RQLite var maxIndex uint64 = 0 for _, peer := range c.knownPeers { - // Skip self - if peer.NodeID == c.nodeID { + // Skip self (compare by raft address since that's the NodeID now) + if peer.NodeID == c.raftAddress { continue } @@ -535,13 +544,40 @@ func (c *ClusterDiscoveryService) WaitForDiscoverySettling(ctx context.Context) zap.Int("peer_count", peerCount)) } +// TriggerSync manually triggers a cluster membership sync +func (c *ClusterDiscoveryService) TriggerSync() { + c.logger.Info("Manually triggering cluster membership sync") + + // For bootstrap nodes, wait a bit for peer discovery to stabilize + if c.nodeType == "bootstrap" { + c.logger.Info("Bootstrap node: waiting for peer discovery to complete") + time.Sleep(5 * time.Second) + } + + c.updateClusterMembership() +} + +// TriggerPeerExchange actively exchanges peer information with connected peers +// This populates the peerstore with RQLite metadata from other nodes +func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error { + if c.discoveryMgr == nil { + return fmt.Errorf("discovery manager not available") + } + + c.logger.Info("Triggering peer exchange via discovery manager") + collected := c.discoveryMgr.TriggerPeerExchange(ctx) + c.logger.Info("Peer exchange completed", zap.Int("peers_with_metadata", collected)) + + return nil +} + // UpdateOwnMetadata updates our own RQLite metadata in the peerstore func (c *ClusterDiscoveryService) UpdateOwnMetadata() { c.logger.Info("Updating own RQLite metadata for peer exchange", zap.String("node_id", c.nodeID)) metadata := &discovery.RQLiteNodeMetadata{ - NodeID: c.nodeID, + NodeID: c.raftAddress, // RQLite uses raft address as node ID RaftAddress: c.raftAddress, HTTPAddress: c.httpAddress, NodeType: c.nodeType, diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index e179ccd..d491fa0 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -95,6 +95,17 @@ func (r *RQLiteManager) Start(ctx context.Context) error { return fmt.Errorf("discovery config HttpAdvAddress is empty") } + // CRITICAL: Check if we need to do pre-start cluster discovery to build peers.json + // This handles the case where nodes have old cluster state and need coordinated recovery + if needsClusterRecovery, err := r.checkNeedsClusterRecovery(rqliteDataDir); err != nil { + return fmt.Errorf("failed to check cluster recovery status: %w", err) + } else if needsClusterRecovery { + r.logger.Info("Detected old cluster state requiring coordinated recovery") + if err := r.performPreStartClusterDiscovery(ctx, rqliteDataDir); err != nil { + return fmt.Errorf("pre-start cluster discovery failed: %w", err) + } + } + // Build RQLite command args := []string{ "-http-addr", 
fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort), @@ -123,7 +134,8 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } // Always add the join parameter in host:port form - let rqlited handle the rest - args = append(args, "-join", joinArg) + // Add retry parameters to handle slow cluster startup (e.g., during recovery) + args = append(args, "-join", joinArg, "-join-attempts", "30", "-join-interval", "10s") } else { r.logger.Info("No join address specified - starting as new cluster") } @@ -168,17 +180,23 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } r.connection = conn + // Sanity check: verify rqlite's node ID matches our configured raft address + if err := r.validateNodeID(); err != nil { + r.logger.Warn("Node ID validation failed", zap.Error(err)) + // Don't fail startup, but log the mismatch for debugging + } + // Leadership/SQL readiness gating with dynamic discovery support if r.config.RQLiteJoinAddress == "" { // Bootstrap node logic with data safety checks r.logger.Info("Bootstrap node: checking if safe to lead") - + // SAFETY: Check if we can safely become leader canLead, err := r.canSafelyBecomeLeader() if !canLead && err != nil { r.logger.Warn("Not safe to become leader, attempting to join existing cluster", zap.Error(err)) - + // Find node with highest log index and join it if r.discoveryService != nil { targetNode := r.discoveryService.GetNodeWithHighestLogIndex() @@ -191,18 +209,34 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } } } - + // Safe to lead - attempt leadership leadershipErr := r.waitForLeadership(ctx) if leadershipErr == nil { r.logger.Info("Bootstrap node successfully established leadership") } else { - // Leadership failed - check if peers.json from discovery exists - if r.discoveryService != nil && r.discoveryService.HasRecentPeersJSON() { - r.logger.Info("Retrying leadership after discovery update") - leadershipErr = r.waitForLeadership(ctx) + r.logger.Warn("Initial leadership attempt failed, may need cluster recovery", + zap.Error(leadershipErr)) + + // Try recovery if we have peers.json from discovery + if r.discoveryService != nil { + peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json") + if _, err := os.Stat(peersPath); err == nil { + r.logger.Info("Attempting cluster recovery using peers.json", + zap.String("peers_file", peersPath)) + + if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil { + r.logger.Info("Cluster recovery successful, retrying leadership") + leadershipErr = r.waitForLeadership(ctx) + if leadershipErr == nil { + r.logger.Info("Bootstrap node established leadership after recovery") + } + } else { + r.logger.Warn("Cluster recovery failed", zap.Error(recoveryErr)) + } + } } - + // Final fallback: SQL availability if leadershipErr != nil { r.logger.Warn("Leadership failed, trying SQL availability") @@ -430,23 +464,23 @@ func (r *RQLiteManager) testJoinAddress(joinAddress string) error { func (r *RQLiteManager) canSafelyBecomeLeader() (bool, error) { // Get our current Raft log index ourLogIndex := r.getRaftLogIndex() - + // If no discovery service, assume it's safe (backward compatibility) if r.discoveryService == nil { r.logger.Debug("No discovery service, assuming safe to lead") return true, nil } - + // Query discovery service for other nodes otherNodes := r.discoveryService.GetActivePeers() - + if len(otherNodes) == 0 { // No other nodes - safe to bootstrap r.logger.Debug("No other nodes discovered, safe to lead", zap.Uint64("our_log_index", ourLogIndex)) return true, nil 
} - + // Check if any other node has higher log index for _, peer := range otherNodes { if peer.RaftLogIndex > ourLogIndex { @@ -456,7 +490,7 @@ func (r *RQLiteManager) canSafelyBecomeLeader() (bool, error) { peer.NodeID, peer.RaftLogIndex, ourLogIndex) } } - + // We have most recent data or equal - safe to lead r.logger.Info("Safe to lead - we have most recent data", zap.Uint64("our_log_index", ourLogIndex), @@ -468,12 +502,12 @@ func (r *RQLiteManager) canSafelyBecomeLeader() (bool, error) { func (r *RQLiteManager) joinExistingCluster(ctx context.Context, raftAddress string) error { r.logger.Info("Attempting to join existing cluster", zap.String("target_raft_address", raftAddress)) - + // Wait for the target to be reachable if err := r.waitForJoinTarget(ctx, raftAddress, 2*time.Minute); err != nil { return fmt.Errorf("join target not reachable: %w", err) } - + // Wait for SQL availability (the target should have a leader) sqlCtx := ctx if _, hasDeadline := ctx.Deadline(); !hasDeadline { @@ -481,11 +515,11 @@ func (r *RQLiteManager) joinExistingCluster(ctx context.Context, raftAddress str sqlCtx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() } - + if err := r.waitForSQLAvailable(sqlCtx); err != nil { return fmt.Errorf("failed to join cluster - SQL not available: %w", err) } - + r.logger.Info("Successfully joined existing cluster") return nil } @@ -497,8 +531,241 @@ func (r *RQLiteManager) exponentialBackoff(attempt int, baseDelay time.Duration, if delay > maxDelay { delay = maxDelay } - + // Add jitter (±20%) jitter := time.Duration(float64(delay) * 0.2 * (2.0*float64(time.Now().UnixNano()%100)/100.0 - 1.0)) return delay + jitter } + +// recoverCluster restarts RQLite using the recovery.db created from peers.json +func (r *RQLiteManager) recoverCluster(peersJSONPath string) error { + r.logger.Info("Initiating cluster recovery by restarting RQLite", + zap.String("peers_file", peersJSONPath)) + + // Stop the current RQLite process + r.logger.Info("Stopping RQLite for recovery") + if err := r.Stop(); err != nil { + r.logger.Warn("Error stopping RQLite", zap.Error(err)) + } + + // Wait for process to fully stop + time.Sleep(2 * time.Second) + + // Restart RQLite - it will automatically detect peers.json and perform recovery + r.logger.Info("Restarting RQLite (will auto-recover using peers.json)") + + // Build the same args as original Start() - expand ~ in data directory + dataDir := os.ExpandEnv(r.dataDir) + if strings.HasPrefix(dataDir, "~") { + home, err := os.UserHomeDir() + if err != nil { + return fmt.Errorf("failed to determine home directory: %w", err) + } + dataDir = filepath.Join(home, dataDir[1:]) + } + rqliteDataDir := filepath.Join(dataDir, "rqlite") + args := []string{ + "-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort), + "-http-adv-addr", r.discoverConfig.HttpAdvAddress, + "-raft-adv-addr", r.discoverConfig.RaftAdvAddress, + "-raft-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLiteRaftPort), + rqliteDataDir, + } + + // Restart RQLite + r.cmd = exec.Command("rqlited", args...) 
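+	// Inherit stdout/stderr so rqlited's own recovery output (it detects
+	// raft/peers.json on startup and rebuilds cluster membership from it)
+	// lands in the node logs alongside ours.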
+ r.cmd.Stdout = os.Stdout + r.cmd.Stderr = os.Stderr + + if err := r.cmd.Start(); err != nil { + return fmt.Errorf("failed to restart RQLite: %w", err) + } + + r.logger.Info("RQLite restarted, waiting for it to become ready") + time.Sleep(3 * time.Second) + + // Recreate connection + conn, err := gorqlite.Open(fmt.Sprintf("http://localhost:%d", r.config.RQLitePort)) + if err != nil { + return fmt.Errorf("failed to reconnect to RQLite: %w", err) + } + r.connection = conn + + r.logger.Info("Cluster recovery completed, RQLite restarted with new configuration") + return nil +} + +// checkNeedsClusterRecovery checks if the node has old cluster state that requires coordinated recovery +// Returns true if there are snapshots but the raft log is empty (typical after a crash/restart) +func (r *RQLiteManager) checkNeedsClusterRecovery(rqliteDataDir string) (bool, error) { + // Check for snapshots directory + snapshotsDir := filepath.Join(rqliteDataDir, "rsnapshots") + if _, err := os.Stat(snapshotsDir); os.IsNotExist(err) { + // No snapshots = fresh start, no recovery needed + return false, nil + } + + // Check if snapshots directory has any snapshots + entries, err := os.ReadDir(snapshotsDir) + if err != nil { + return false, fmt.Errorf("failed to read snapshots directory: %w", err) + } + + hasSnapshots := false + for _, entry := range entries { + if entry.IsDir() || strings.HasSuffix(entry.Name(), ".db") { + hasSnapshots = true + break + } + } + + if !hasSnapshots { + // No snapshots = fresh start + return false, nil + } + + // Check raft log size - if it's the default empty size, we need recovery + raftLogPath := filepath.Join(rqliteDataDir, "raft.db") + if info, err := os.Stat(raftLogPath); err == nil { + // Empty or default-sized log with snapshots means we need coordinated recovery + if info.Size() <= 8*1024*1024 { // <= 8MB (default empty log size) + r.logger.Info("Detected cluster recovery situation: snapshots exist but raft log is empty/default size", + zap.String("snapshots_dir", snapshotsDir), + zap.Int64("raft_log_size", info.Size())) + return true, nil + } + } + + return false, nil +} + +// performPreStartClusterDiscovery waits for peer discovery and builds a complete peers.json +// before starting RQLite. This ensures all nodes use the same cluster membership for recovery. 
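+//
+// For illustration only, the peers.json this flow relies on has the shape below;
+// ids equal the raft advertise addresses, and the addresses shown are placeholders:
+//
+//	[
+//	  {"id": "10.0.0.1:7001", "address": "10.0.0.1:7001", "non_voter": false},
+//	  {"id": "10.0.0.2:7001", "address": "10.0.0.2:7001", "non_voter": false}
+//	]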
+func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rqliteDataDir string) error { + if r.discoveryService == nil { + r.logger.Warn("No discovery service available, cannot perform pre-start cluster discovery") + return fmt.Errorf("discovery service not available") + } + + r.logger.Info("Waiting for peer discovery to find other cluster members...") + + // CRITICAL: First, actively trigger peer exchange to populate peerstore with RQLite metadata + // The peerstore needs RQLite metadata from other nodes BEFORE we can collect it + r.logger.Info("Triggering peer exchange to collect RQLite metadata from connected peers") + if err := r.discoveryService.TriggerPeerExchange(ctx); err != nil { + r.logger.Warn("Peer exchange failed, continuing anyway", zap.Error(err)) + } + + // Give peer exchange a moment to complete + time.Sleep(1 * time.Second) + + // Now trigger cluster membership sync to populate knownPeers map from the peerstore + r.logger.Info("Triggering initial cluster membership sync to populate peer list") + r.discoveryService.TriggerSync() + + // Give the sync a moment to complete + time.Sleep(2 * time.Second) + + // Wait for peer discovery - give it time to find peers (30 seconds should be enough) + discoveryDeadline := time.Now().Add(30 * time.Second) + var discoveredPeers int + + for time.Now().Before(discoveryDeadline) { + // Check how many peers with RQLite metadata we've discovered + allPeers := r.discoveryService.GetAllPeers() + discoveredPeers = len(allPeers) + + r.logger.Info("Peer discovery progress", + zap.Int("discovered_peers", discoveredPeers), + zap.Duration("time_remaining", time.Until(discoveryDeadline))) + + // If we have at least our minimum cluster size, proceed + if discoveredPeers >= r.config.MinClusterSize { + r.logger.Info("Found minimum cluster size peers, proceeding with recovery", + zap.Int("discovered_peers", discoveredPeers), + zap.Int("min_cluster_size", r.config.MinClusterSize)) + break + } + + // Wait a bit before checking again + time.Sleep(2 * time.Second) + } + + if discoveredPeers == 0 { + r.logger.Warn("No peers discovered during pre-start discovery window, will attempt solo recovery") + // Continue anyway - might be the only node left + } + + // Trigger final sync to ensure peers.json is up to date with latest discovered peers + r.logger.Info("Triggering final cluster membership sync to build complete peers.json") + r.discoveryService.TriggerSync() + + // Wait a moment for the sync to complete + time.Sleep(2 * time.Second) + + // Verify peers.json was created + peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json") + if _, err := os.Stat(peersPath); err != nil { + return fmt.Errorf("peers.json was not created after discovery: %w", err) + } + + r.logger.Info("Pre-start cluster discovery completed successfully", + zap.String("peers_file", peersPath), + zap.Int("peer_count", discoveredPeers)) + + return nil +} + +// validateNodeID checks that rqlite's reported node ID matches our configured raft address +func (r *RQLiteManager) validateNodeID() error { + // Query /nodes endpoint to get our node ID + // Retry a few times as the endpoint might not be ready immediately + for i := 0; i < 5; i++ { + nodes, err := r.getRQLiteNodes() + if err != nil { + // If endpoint is not ready yet, wait and retry + if i < 4 { + time.Sleep(500 * time.Millisecond) + continue + } + return fmt.Errorf("failed to query nodes endpoint after retries: %w", err) + } + + expectedID := r.discoverConfig.RaftAdvAddress + if expectedID == "" { + return 
fmt.Errorf("raft_adv_address not configured") + } + + // Find our node in the cluster (match by address) + for _, node := range nodes { + if node.Address == expectedID { + if node.ID != expectedID { + r.logger.Error("CRITICAL: RQLite node ID mismatch", + zap.String("configured_raft_address", expectedID), + zap.String("rqlite_node_id", node.ID), + zap.String("rqlite_node_address", node.Address), + zap.String("explanation", "peers.json id field must match rqlite's node ID (raft address)")) + return fmt.Errorf("node ID mismatch: configured %s but rqlite reports %s", expectedID, node.ID) + } + r.logger.Info("Node ID validation passed", + zap.String("node_id", node.ID), + zap.String("address", node.Address)) + return nil + } + } + + // If cluster is still forming, nodes list might be empty - that's okay + if len(nodes) == 0 { + r.logger.Debug("Cluster membership not yet available, skipping validation") + return nil + } + + // If we can't find ourselves but other nodes exist, log a warning + r.logger.Warn("Could not find our node in cluster membership", + zap.String("expected_address", expectedID), + zap.Int("nodes_in_cluster", len(nodes))) + return nil + } + + return nil +} From 2aead480459aad3ff9b3feed38e8352e4dec0334 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 31 Oct 2025 11:53:50 +0200 Subject: [PATCH 11/13] feat: enhance RQLite manager with improved logging and data directory management - Added structured logging for RQLite components, including cluster discovery and leadership processes. - Introduced methods for preparing the data directory and launching the RQLite process, improving code organization. - Implemented exponential backoff for leadership checks to reduce log noise and improve reliability. - Enhanced peer health tracking and membership update logic to streamline cluster synchronization and recovery. --- CHANGELOG.md | 17 +++ Makefile | 2 +- pkg/discovery/discovery.go | 27 ++-- pkg/rqlite/cluster_discovery.go | 178 ++++++++++++++------------- pkg/rqlite/data_safety.go | 45 ++++--- pkg/rqlite/rqlite.go | 211 ++++++++++++++++++++------------ 6 files changed, 293 insertions(+), 187 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index da5b7ae..bc44467 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,23 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant ### Fixed +## [0.53.0] - 2025-10-31 + +### Added + +- Discovery manager now tracks failed peer-exchange attempts to suppress repeated warnings while peers negotiate supported protocols. + +### Changed + +- Scoped logging throughout `cluster_discovery`, `rqlite`, and `discovery` packages so logs carry component tags and keep verbose output at debug level. +- Refactored `ClusterDiscoveryService` membership handling: metadata updates happen under lock, `peers.json` is written outside the lock, self-health is skipped, and change detection is centralized in `computeMembershipChangesLocked`. +- Reworked `RQLiteManager.Start` into helper functions (`prepareDataDir`, `launchProcess`, `waitForReadyAndConnect`, `establishLeadershipOrJoin`) with clearer logging, better error handling, and exponential backoff while waiting for leadership. +- `validateNodeID` now treats empty membership results as transitional states, logging at debug level instead of warning to avoid noisy startups. + +### Fixed + +- Eliminated spurious `peers.json` churn and node-ID mismatch warnings during cluster formation by aligning IDs with raft addresses and tightening discovery logging. 
+ ## [0.52.15] ### Added diff --git a/Makefile b/Makefile index 5240288..9e9bf5d 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.52.21 +VERSION := 0.53.0 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 6d9470b..d022828 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -39,9 +39,10 @@ type PeerInfo struct { // interface{} to remain source-compatible with previous call sites that // passed a DHT instance. The value is ignored. type Manager struct { - host host.Host - logger *zap.Logger - cancel context.CancelFunc + host host.Host + logger *zap.Logger + cancel context.CancelFunc + failedPeerExchanges map[peer.ID]time.Time // Track failed peer exchange attempts to suppress repeated warnings } // Config contains discovery configuration @@ -56,8 +57,10 @@ type Config struct { // previously passed a DHT instance can continue to do so; the value is ignored. func NewManager(h host.Host, _ interface{}, logger *zap.Logger) *Manager { return &Manager{ - host: h, - logger: logger, + host: h, + logger: logger.With(zap.String("component", "peer-discovery")), + cancel: nil, + failedPeerExchanges: make(map[peer.ID]time.Time), } } @@ -344,13 +347,21 @@ func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi // Open a stream to the peer stream, err := d.host.NewStream(ctx, peerID, PeerExchangeProtocol) if err != nil { - d.logger.Debug("Failed to open peer exchange stream", - zap.String("peer_id", peerID.String()[:8]+"..."), - zap.Error(err)) + // Suppress repeated warnings for the same peer (log once per minute max) + lastFailure, seen := d.failedPeerExchanges[peerID] + if !seen || time.Since(lastFailure) > time.Minute { + d.logger.Debug("Failed to open peer exchange stream", + zap.String("peer_id", peerID.String()[:8]+"..."), + zap.Error(err)) + d.failedPeerExchanges[peerID] = time.Now() + } return nil } defer stream.Close() + // Clear failure tracking on success + delete(d.failedPeerExchanges, peerID) + // Send request req := PeerExchangeRequest{Limit: limit} encoder := json.NewEncoder(stream) diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go index 7585cbf..3e1b46b 100644 --- a/pkg/rqlite/cluster_discovery.go +++ b/pkg/rqlite/cluster_discovery.go @@ -64,7 +64,7 @@ func NewClusterDiscoveryService( peerHealth: make(map[string]*PeerHealth), updateInterval: 30 * time.Second, inactivityLimit: 24 * time.Hour, - logger: logger, + logger: logger.With(zap.String("component", "cluster-discovery")), } } @@ -82,9 +82,8 @@ func (c *ClusterDiscoveryService) Start(ctx context.Context) error { c.cancel = cancel c.logger.Info("Starting cluster discovery service", - zap.String("node_id", c.nodeID), - zap.String("node_type", c.nodeType), zap.String("raft_address", c.raftAddress), + zap.String("node_type", c.nodeType), zap.String("http_address", c.httpAddress), zap.String("data_dir", c.dataDir), zap.Duration("update_interval", c.updateInterval), @@ -120,7 +119,7 @@ func (c *ClusterDiscoveryService) Stop() { // periodicSync runs periodic cluster membership synchronization func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) { - c.logger.Info("periodicSync goroutine 
started, waiting for RQLite readiness") + c.logger.Debug("periodicSync goroutine started, waiting for RQLite readiness") ticker := time.NewTicker(c.updateInterval) defer ticker.Stop() @@ -129,10 +128,9 @@ func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) { for { select { case <-ctx.Done(): - c.logger.Info("periodicSync goroutine stopping") + c.logger.Debug("periodicSync goroutine stopping") return case <-ticker.C: - c.logger.Debug("Running periodic cluster membership sync") c.updateClusterMembership() } } @@ -193,6 +191,14 @@ func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeM return metadata } +// membershipUpdateResult contains the result of a membership update operation +type membershipUpdateResult struct { + peersJSON []map[string]interface{} + added []string + updated []string + changed bool +} + // updateClusterMembership updates the cluster membership based on discovered peers func (c *ClusterDiscoveryService) updateClusterMembership() { metadata := c.collectPeerMetadata() @@ -200,15 +206,58 @@ func (c *ClusterDiscoveryService) updateClusterMembership() { c.logger.Debug("Collected peer metadata", zap.Int("metadata_count", len(metadata))) + // Compute membership changes while holding lock c.mu.Lock() - defer c.mu.Unlock() + result := c.computeMembershipChangesLocked(metadata) + c.mu.Unlock() + // Perform file I/O outside the lock + if result.changed { + // Log state changes (peer added/removed) at Info level + if len(result.added) > 0 || len(result.updated) > 0 { + c.logger.Info("Cluster membership changed", + zap.Int("added", len(result.added)), + zap.Int("updated", len(result.updated)), + zap.Strings("added_ids", result.added), + zap.Strings("updated_ids", result.updated)) + } + + // Write peers.json without holding lock + if err := c.writePeersJSONWithData(result.peersJSON); err != nil { + c.logger.Error("CRITICAL: Failed to write peers.json", + zap.Error(err), + zap.String("data_dir", c.dataDir), + zap.Int("peer_count", len(result.peersJSON))) + } else { + c.logger.Debug("peers.json updated", + zap.Int("peer_count", len(result.peersJSON))) + } + + // Update lastUpdate timestamp + c.mu.Lock() + c.lastUpdate = time.Now() + c.mu.Unlock() + } else { + c.mu.RLock() + totalPeers := len(c.knownPeers) + c.mu.RUnlock() + c.logger.Debug("No changes to cluster membership", + zap.Int("total_peers", totalPeers)) + } +} + +// computeMembershipChangesLocked computes membership changes and returns snapshot data +// Must be called with lock held +func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*discovery.RQLiteNodeMetadata) membershipUpdateResult { // Track changes added := []string{} updated := []string{} - // Update known peers + // Update known peers, but skip self for health tracking for _, meta := range metadata { + // Skip self-metadata for health tracking (we only track remote peers) + isSelf := meta.NodeID == c.raftAddress + if existing, ok := c.knownPeers[meta.NodeID]; ok { // Update existing peer if existing.RaftLogIndex != meta.RaftLogIndex || @@ -228,55 +277,45 @@ func (c *ClusterDiscoveryService) updateClusterMembership() { c.knownPeers[meta.NodeID] = meta - // Update health tracking - if _, ok := c.peerHealth[meta.NodeID]; !ok { - c.peerHealth[meta.NodeID] = &PeerHealth{ - LastSeen: time.Now(), - LastSuccessful: time.Now(), - Status: "active", + // Update health tracking only for remote peers + if !isSelf { + if _, ok := c.peerHealth[meta.NodeID]; !ok { + c.peerHealth[meta.NodeID] = &PeerHealth{ + LastSeen: 
time.Now(), + LastSuccessful: time.Now(), + Status: "active", + } + } else { + c.peerHealth[meta.NodeID].LastSeen = time.Now() + c.peerHealth[meta.NodeID].Status = "active" + c.peerHealth[meta.NodeID].FailureCount = 0 } - } else { - c.peerHealth[meta.NodeID].LastSeen = time.Now() - c.peerHealth[meta.NodeID].Status = "active" - c.peerHealth[meta.NodeID].FailureCount = 0 } } - // Generate and write peers.json if there are changes OR if this is the first time + // Determine if we should write peers.json shouldWrite := len(added) > 0 || len(updated) > 0 || c.lastUpdate.IsZero() if shouldWrite { - c.logger.Info("Updating peers.json", - zap.Int("added", len(added)), - zap.Int("updated", len(updated)), - zap.Int("total_peers", len(c.knownPeers)), - zap.Bool("first_run", c.lastUpdate.IsZero())) - - // Get peers JSON while holding the lock - peers := c.getPeersJSONUnlocked() - - // Release lock before file I/O - c.mu.Unlock() - - // Write without holding lock - if err := c.writePeersJSONWithData(peers); err != nil { - c.logger.Error("CRITICAL: Failed to write peers.json", - zap.Error(err), - zap.String("data_dir", c.dataDir), - zap.Int("peer_count", len(peers))) - } else { - c.logger.Info("Successfully wrote peers.json", - zap.Int("peer_count", len(peers))) + // Log initial sync if this is the first time + if c.lastUpdate.IsZero() { + c.logger.Info("Initial cluster membership sync", + zap.Int("total_peers", len(c.knownPeers))) } - // Re-acquire lock to update lastUpdate - c.mu.Lock() - } else { - c.logger.Debug("No changes to cluster membership", - zap.Int("total_peers", len(c.knownPeers))) + // Get peers JSON snapshot + peers := c.getPeersJSONUnlocked() + return membershipUpdateResult{ + peersJSON: peers, + added: added, + updated: updated, + changed: true, + } } - c.lastUpdate = time.Now() + return membershipUpdateResult{ + changed: false, + } } // removeInactivePeers removes peers that haven't been seen for longer than the inactivity limit @@ -349,12 +388,6 @@ func (c *ClusterDiscoveryService) writePeersJSON() error { // writePeersJSONWithData writes the peers.json file with provided data (no lock needed) func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]interface{}) error { - c.logger.Info("writePeersJSON: Starting", - zap.String("data_dir", c.dataDir)) - - c.logger.Info("writePeersJSON: Got peers JSON", - zap.Int("peer_count", len(peers))) - // Expand ~ in data directory path dataDir := os.ExpandEnv(c.dataDir) if strings.HasPrefix(dataDir, "~") { @@ -365,14 +398,14 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte dataDir = filepath.Join(home, dataDir[1:]) } - c.logger.Info("writePeersJSON: Expanded data dir", - zap.String("expanded_path", dataDir)) - // Get the RQLite raft directory rqliteDir := filepath.Join(dataDir, "rqlite", "raft") - c.logger.Info("writePeersJSON: Creating raft directory", - zap.String("raft_dir", rqliteDir)) + c.logger.Debug("Writing peers.json", + zap.String("data_dir", c.dataDir), + zap.String("expanded_path", dataDir), + zap.String("raft_dir", rqliteDir), + zap.Int("peer_count", len(peers))) if err := os.MkdirAll(rqliteDir, 0755); err != nil { return fmt.Errorf("failed to create raft directory %s: %w", rqliteDir, err) @@ -381,13 +414,9 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte peersFile := filepath.Join(rqliteDir, "peers.json") backupFile := filepath.Join(rqliteDir, "peers.json.backup") - c.logger.Info("writePeersJSON: File paths", - zap.String("peers_file", 
peersFile), - zap.String("backup_file", backupFile)) - // Backup existing peers.json if it exists if _, err := os.Stat(peersFile); err == nil { - c.logger.Info("writePeersJSON: Backing up existing peers.json") + c.logger.Debug("Backing up existing peers.json", zap.String("backup_file", backupFile)) data, err := os.ReadFile(peersFile) if err == nil { _ = os.WriteFile(backupFile, data, 0644) @@ -395,27 +424,19 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte } // Marshal to JSON - c.logger.Info("writePeersJSON: Marshaling to JSON") data, err := json.MarshalIndent(peers, "", " ") if err != nil { return fmt.Errorf("failed to marshal peers.json: %w", err) } - c.logger.Info("writePeersJSON: JSON marshaled", - zap.Int("data_size", len(data))) + c.logger.Debug("Marshaled peers.json", zap.Int("data_size", len(data))) // Write atomically using temp file + rename tempFile := peersFile + ".tmp" - - c.logger.Info("writePeersJSON: Writing temp file", - zap.String("temp_file", tempFile)) - if err := os.WriteFile(tempFile, data, 0644); err != nil { return fmt.Errorf("failed to write temp peers.json %s: %w", tempFile, err) } - c.logger.Info("writePeersJSON: Renaming temp file to final") - if err := os.Rename(tempFile, peersFile); err != nil { return fmt.Errorf("failed to rename %s to %s: %w", tempFile, peersFile, err) } @@ -427,7 +448,7 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte } } - c.logger.Info("peers.json successfully written!", + c.logger.Info("peers.json written", zap.String("file", peersFile), zap.Int("node_count", len(peers)), zap.Strings("node_ids", nodeIDs)) @@ -573,9 +594,6 @@ func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error // UpdateOwnMetadata updates our own RQLite metadata in the peerstore func (c *ClusterDiscoveryService) UpdateOwnMetadata() { - c.logger.Info("Updating own RQLite metadata for peer exchange", - zap.String("node_id", c.nodeID)) - metadata := &discovery.RQLiteNodeMetadata{ NodeID: c.raftAddress, // RQLite uses raft address as node ID RaftAddress: c.raftAddress, @@ -586,12 +604,6 @@ func (c *ClusterDiscoveryService) UpdateOwnMetadata() { ClusterVersion: "1.0", } - c.logger.Info("Created metadata struct", - zap.String("node_id", metadata.NodeID), - zap.String("raft_address", metadata.RaftAddress), - zap.String("http_address", metadata.HTTPAddress), - zap.Uint64("log_index", metadata.RaftLogIndex)) - // Store in our own peerstore for peer exchange data, err := json.Marshal(metadata) if err != nil { @@ -604,8 +616,8 @@ func (c *ClusterDiscoveryService) UpdateOwnMetadata() { return } - c.logger.Info("Successfully stored own RQLite metadata in peerstore", - zap.String("node_id", c.nodeID), + c.logger.Debug("Updated own RQLite metadata", + zap.String("node_id", metadata.NodeID), zap.Uint64("log_index", metadata.RaftLogIndex)) } diff --git a/pkg/rqlite/data_safety.go b/pkg/rqlite/data_safety.go index de2ad54..7abb3ed 100644 --- a/pkg/rqlite/data_safety.go +++ b/pkg/rqlite/data_safety.go @@ -17,7 +17,7 @@ func (r *RQLiteManager) getRaftLogIndex() uint64 { r.logger.Debug("Failed to get Raft log index", zap.Error(err)) return 0 } - + // Return the highest index we have maxIndex := status.Store.Raft.LastLogIndex if status.Store.Raft.AppliedIndex > maxIndex { @@ -26,7 +26,7 @@ func (r *RQLiteManager) getRaftLogIndex() uint64 { if status.Store.Raft.CommitIndex > maxIndex { maxIndex = status.Store.Raft.CommitIndex } - + return maxIndex } @@ -34,23 +34,23 @@ func (r *RQLiteManager) 
getRaftLogIndex() uint64 { func (r *RQLiteManager) getRQLiteStatus() (*RQLiteStatus, error) { url := fmt.Sprintf("http://localhost:%d/status", r.config.RQLitePort) client := &http.Client{Timeout: 5 * time.Second} - + resp, err := client.Get(url) if err != nil { return nil, fmt.Errorf("failed to query status: %w", err) } defer resp.Body.Close() - + if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("status endpoint returned %d: %s", resp.StatusCode, string(body)) } - + var status RQLiteStatus if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { return nil, fmt.Errorf("failed to decode status: %w", err) } - + return &status, nil } @@ -58,23 +58,37 @@ func (r *RQLiteManager) getRQLiteStatus() (*RQLiteStatus, error) { func (r *RQLiteManager) getRQLiteNodes() (RQLiteNodes, error) { url := fmt.Sprintf("http://localhost:%d/nodes?ver=2", r.config.RQLitePort) client := &http.Client{Timeout: 5 * time.Second} - + resp, err := client.Get(url) if err != nil { return nil, fmt.Errorf("failed to query nodes: %w", err) } defer resp.Body.Close() - + if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("nodes endpoint returned %d: %s", resp.StatusCode, string(body)) } - + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read nodes response: %w", err) + } + + // rqlite v8 wraps nodes in a top-level object; fall back to a raw array for older versions. + var wrapped struct { + Nodes RQLiteNodes `json:"nodes"` + } + if err := json.Unmarshal(body, &wrapped); err == nil && wrapped.Nodes != nil { + return wrapped.Nodes, nil + } + + // Try legacy format (plain array) var nodes RQLiteNodes - if err := json.NewDecoder(resp.Body).Decode(&nodes); err != nil { + if err := json.Unmarshal(body, &nodes); err != nil { return nil, fmt.Errorf("failed to decode nodes: %w", err) } - + return nodes, nil } @@ -84,12 +98,12 @@ func (r *RQLiteManager) getRQLiteLeader() (string, error) { if err != nil { return "", err } - + leaderAddr := status.Store.Raft.LeaderAddr if leaderAddr == "" { return "", fmt.Errorf("no leader found") } - + return leaderAddr, nil } @@ -97,13 +111,12 @@ func (r *RQLiteManager) getRQLiteLeader() (string, error) { func (r *RQLiteManager) isNodeReachable(httpAddress string) bool { url := fmt.Sprintf("http://%s/status", httpAddress) client := &http.Client{Timeout: 3 * time.Second} - + resp, err := client.Get(url) if err != nil { return false } defer resp.Body.Close() - + return resp.StatusCode == http.StatusOK } - diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index d491fa0..dc84881 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -64,7 +64,7 @@ func NewRQLiteManager(cfg *config.DatabaseConfig, discoveryCfg *config.Discovery config: cfg, discoverConfig: discoveryCfg, dataDir: dataDir, - logger: logger, + logger: logger.With(zap.String("component", "rqlite-manager")), } } @@ -75,20 +75,9 @@ func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService) { // Start starts the RQLite node func (r *RQLiteManager) Start(ctx context.Context) error { - // Expand ~ in data directory path - dataDir := os.ExpandEnv(r.dataDir) - if strings.HasPrefix(dataDir, "~") { - home, err := os.UserHomeDir() - if err != nil { - return fmt.Errorf("failed to determine home directory: %w", err) - } - dataDir = filepath.Join(home, dataDir[1:]) - } - - // Create data directory - rqliteDataDir := filepath.Join(dataDir, "rqlite") - if err := 
os.MkdirAll(rqliteDataDir, 0755); err != nil { - return fmt.Errorf("failed to create RQLite data directory: %w", err) + rqliteDataDir, err := r.prepareDataDir() + if err != nil { + return err } if r.discoverConfig.HttpAdvAddress == "" { @@ -106,6 +95,55 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } } + // Launch RQLite process + if err := r.launchProcess(ctx, rqliteDataDir); err != nil { + return err + } + + // Wait for RQLite to be ready and establish connection + if err := r.waitForReadyAndConnect(ctx); err != nil { + return err + } + + // Establish leadership/SQL availability + if err := r.establishLeadershipOrJoin(ctx, rqliteDataDir); err != nil { + return err + } + + // Apply migrations + migrationsDir := "migrations" + if err := r.ApplyMigrations(ctx, migrationsDir); err != nil { + r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir)) + return fmt.Errorf("apply migrations: %w", err) + } + + r.logger.Info("RQLite node started successfully") + return nil +} + +// prepareDataDir expands and creates the RQLite data directory +func (r *RQLiteManager) prepareDataDir() (string, error) { + // Expand ~ in data directory path + dataDir := os.ExpandEnv(r.dataDir) + if strings.HasPrefix(dataDir, "~") { + home, err := os.UserHomeDir() + if err != nil { + return "", fmt.Errorf("failed to determine home directory: %w", err) + } + dataDir = filepath.Join(home, dataDir[1:]) + } + + // Create data directory + rqliteDataDir := filepath.Join(dataDir, "rqlite") + if err := os.MkdirAll(rqliteDataDir, 0755); err != nil { + return "", fmt.Errorf("failed to create RQLite data directory: %w", err) + } + + return rqliteDataDir, nil +} + +// launchProcess starts the RQLite process with appropriate arguments +func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) error { // Build RQLite command args := []string{ "-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort), @@ -147,9 +185,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error { zap.String("data_dir", rqliteDataDir), zap.Int("http_port", r.config.RQLitePort), zap.Int("raft_port", r.config.RQLiteRaftPort), - zap.String("join_address", r.config.RQLiteJoinAddress), - zap.Strings("full_args", args), - ) + zap.String("join_address", r.config.RQLiteJoinAddress)) // Start RQLite process (not bound to ctx for graceful Stop handling) r.cmd = exec.Command("rqlited", args...) 
@@ -162,6 +198,11 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 		return fmt.Errorf("failed to start RQLite: %w", err)
 	}
 
+	return nil
+}
+
+// waitForReadyAndConnect waits for RQLite to be ready and establishes connection
+func (r *RQLiteManager) waitForReadyAndConnect(ctx context.Context) error {
 	// Wait for RQLite to be ready
 	if err := r.waitForReady(ctx); err != nil {
 		if r.cmd != nil && r.cmd.Process != nil {
@@ -182,11 +223,15 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 
 	// Sanity check: verify rqlite's node ID matches our configured raft address
 	if err := r.validateNodeID(); err != nil {
-		r.logger.Warn("Node ID validation failed", zap.Error(err))
-		// Don't fail startup, but log the mismatch for debugging
+		r.logger.Debug("Node ID validation skipped", zap.Error(err))
+		// Don't fail startup, but log at debug level
 	}
 
-	// Leadership/SQL readiness gating with dynamic discovery support
+	return nil
+}
+
+// establishLeadershipOrJoin establishes leadership (bootstrap) or waits for SQL availability (joining)
+func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDataDir string) error {
 	if r.config.RQLiteJoinAddress == "" {
 		// Bootstrap node logic with data safety checks
 		r.logger.Info("Bootstrap node: checking if safe to lead")
@@ -214,46 +259,47 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 		leadershipErr := r.waitForLeadership(ctx)
 		if leadershipErr == nil {
 			r.logger.Info("Bootstrap node successfully established leadership")
-		} else {
-			r.logger.Warn("Initial leadership attempt failed, may need cluster recovery",
-				zap.Error(leadershipErr))
+			return nil
+		}
 
-			// Try recovery if we have peers.json from discovery
-			if r.discoveryService != nil {
-				peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json")
-				if _, err := os.Stat(peersPath); err == nil {
-					r.logger.Info("Attempting cluster recovery using peers.json",
-						zap.String("peers_file", peersPath))
+		r.logger.Warn("Initial leadership attempt failed, may need cluster recovery",
+			zap.Error(leadershipErr))
 
-					if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil {
-						r.logger.Info("Cluster recovery successful, retrying leadership")
-						leadershipErr = r.waitForLeadership(ctx)
-						if leadershipErr == nil {
-							r.logger.Info("Bootstrap node established leadership after recovery")
-						}
-					} else {
-						r.logger.Warn("Cluster recovery failed", zap.Error(recoveryErr))
+		// Try recovery if we have peers.json from discovery
+		if r.discoveryService != nil {
+			peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json")
+			if _, err := os.Stat(peersPath); err == nil {
+				r.logger.Info("Attempting cluster recovery using peers.json",
+					zap.String("peers_file", peersPath))
+
+				if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil {
+					r.logger.Info("Cluster recovery successful, retrying leadership")
+					leadershipErr = r.waitForLeadership(ctx)
+					if leadershipErr == nil {
+						r.logger.Info("Bootstrap node established leadership after recovery")
+						return nil
 					}
-				}
-			}
-
-			// Final fallback: SQL availability
-			if leadershipErr != nil {
-				r.logger.Warn("Leadership failed, trying SQL availability")
-				sqlCtx := ctx
-				if _, hasDeadline := ctx.Deadline(); !hasDeadline {
-					var cancel context.CancelFunc
-					sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
-					defer cancel()
-				}
-				if err := r.waitForSQLAvailable(sqlCtx); err != nil {
-					if r.cmd != nil && r.cmd.Process != nil {
-						_ = r.cmd.Process.Kill()
-					}
-					return fmt.Errorf("RQLite SQL not available: %w", err)
+				} else {
+					r.logger.Warn("Cluster recovery failed", zap.Error(recoveryErr))
 				}
 			}
 		}
+
+		// Final fallback: SQL availability
+		r.logger.Warn("Leadership failed, trying SQL availability")
+		sqlCtx := ctx
+		if _, hasDeadline := ctx.Deadline(); !hasDeadline {
+			var cancel context.CancelFunc
+			sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
+			defer cancel()
+		}
+		if err := r.waitForSQLAvailable(sqlCtx); err != nil {
+			if r.cmd != nil && r.cmd.Process != nil {
+				_ = r.cmd.Process.Kill()
+			}
+			return fmt.Errorf("RQLite SQL not available: %w", err)
+		}
+		return nil
 	} else {
 		// Joining node logic
 		r.logger.Info("Waiting for RQLite SQL availability (leader discovery)")
@@ -269,18 +315,8 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 			}
 			return fmt.Errorf("RQLite SQL not available: %w", err)
 		}
+		return nil
 	}
-
-	// After waitForLeadership / waitForSQLAvailable succeeds, before returning:
-	migrationsDir := "migrations"
-
-	if err := r.ApplyMigrations(ctx, migrationsDir); err != nil {
-		r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir))
-		return fmt.Errorf("apply migrations: %w", err)
-	}
-
-	r.logger.Info("RQLite node started successfully")
-	return nil
 }
 
 // hasExistingState returns true if the rqlite data directory already contains files or subdirectories.
@@ -331,7 +367,12 @@ func (r *RQLiteManager) waitForReady(ctx context.Context) error {
 func (r *RQLiteManager) waitForLeadership(ctx context.Context) error {
 	r.logger.Info("Waiting for RQLite to establish leadership...")
 
-	for i := 0; i < 30; i++ {
+	maxAttempts := 30
+	attempt := 0
+	backoffDelay := 500 * time.Millisecond
+	maxBackoff := 5 * time.Second
+
+	for attempt < maxAttempts {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
@@ -345,10 +386,19 @@ func (r *RQLiteManager) waitForLeadership(ctx context.Context) error {
 				r.logger.Info("RQLite leadership established")
 				return nil
 			}
-			r.logger.Debug("Waiting for leadership", zap.Error(err))
+			// Log every 5th attempt or on first attempt to reduce noise
+			if attempt%5 == 0 || attempt == 0 {
+				r.logger.Debug("Waiting for leadership", zap.Int("attempt", attempt+1), zap.Error(err))
+			}
 		}
 
-		time.Sleep(1 * time.Second)
+		// Exponential backoff with jitter
+		time.Sleep(backoffDelay)
+		backoffDelay = time.Duration(float64(backoffDelay) * 1.5)
+		if backoffDelay > maxBackoff {
+			backoffDelay = maxBackoff
+		}
+		attempt++
 	}
 
 	return fmt.Errorf("RQLite failed to establish leadership within timeout")
@@ -728,7 +778,9 @@ func (r *RQLiteManager) validateNodeID() error {
 			time.Sleep(500 * time.Millisecond)
 			continue
 		}
-		return fmt.Errorf("failed to query nodes endpoint after retries: %w", err)
+		// Log at debug level if validation fails - not critical
+		r.logger.Debug("Node ID validation skipped (endpoint unavailable)", zap.Error(err))
+		return nil
 	}
 
 	expectedID := r.discoverConfig.RaftAdvAddress
@@ -736,6 +788,12 @@ func (r *RQLiteManager) validateNodeID() error {
 		return fmt.Errorf("raft_adv_address not configured")
 	}
 
+	// If cluster is still forming, nodes list might be empty - that's okay
+	if len(nodes) == 0 {
+		r.logger.Debug("Node ID validation skipped (cluster not yet formed)")
+		return nil
+	}
+
 	// Find our node in the cluster (match by address)
 	for _, node := range nodes {
 		if node.Address == expectedID {
@@ -747,21 +805,16 @@ func (r *RQLiteManager) validateNodeID() error {
 					zap.String("explanation", "peers.json id field must match rqlite's node ID (raft address)"))
 				return fmt.Errorf("node ID mismatch: configured %s but rqlite reports %s", expectedID, node.ID)
 			}
-			r.logger.Info("Node ID validation passed",
+			r.logger.Debug("Node ID validation passed",
 				zap.String("node_id", node.ID),
 				zap.String("address", node.Address))
 			return nil
 		}
 	}
 
-	// If cluster is still forming, nodes list might be empty - that's okay
-	if len(nodes) == 0 {
-		r.logger.Debug("Cluster membership not yet available, skipping validation")
-		return nil
-	}
-
-	// If we can't find ourselves but other nodes exist, log a warning
-	r.logger.Warn("Could not find our node in cluster membership",
+	// If we can't find ourselves but other nodes exist, cluster might still be forming
+	// This is fine - don't log a warning
+	r.logger.Debug("Node ID validation skipped (node not yet in cluster membership)",
 		zap.String("expected_address", expectedID),
 		zap.Int("nodes_in_cluster", len(nodes)))
 	return nil

From c5d3dd1f6df5235572f9f10258d37471d93c0c82 Mon Sep 17 00:00:00 2001
From: anonpenguin23
Date: Fri, 31 Oct 2025 12:12:27 +0200
Subject: [PATCH 12/13] feat: enhance Go module cache setup in CLI

- Added Go module cache directory to the setupDirectories function for improved Go environment configuration.
- Updated cloneAndBuild function to set HOME environment variable, ensuring proper module cache creation during build process.
---
 pkg/cli/setup.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/pkg/cli/setup.go b/pkg/cli/setup.go
index e1a2cc8..694f1e0 100644
--- a/pkg/cli/setup.go
+++ b/pkg/cli/setup.go
@@ -680,6 +680,7 @@ func setupDirectories() {
 		"/home/debros/bin",
 		"/home/debros/src",
 		"/home/debros/.debros",
+		"/home/debros/go", // Go module cache directory
 	}
 
 	for _, dir := range dirs {
@@ -743,8 +744,10 @@ func cloneAndBuild() {
 	os.Setenv("PATH", os.Getenv("PATH")+":/usr/local/go/bin")
 
 	// Use sudo with --preserve-env=PATH to pass Go path to debros user
+	// Set HOME so Go knows where to create module cache
 	cmd := exec.Command("sudo", "--preserve-env=PATH", "-u", "debros", "make", "build")
 	cmd.Dir = "/home/debros/src"
+	cmd.Env = append(os.Environ(), "HOME=/home/debros", "PATH="+os.Getenv("PATH")+":/usr/local/go/bin")
 	if output, err := cmd.CombinedOutput(); err != nil {
 		fmt.Fprintf(os.Stderr, "❌ Failed to build: %v\n%s\n", err, output)
 		os.Exit(1)

From 7b7087e5eb905592095285ea36f47f4fe95ae34b Mon Sep 17 00:00:00 2001
From: anonpenguin23
Date: Fri, 31 Oct 2025 12:12:40 +0200
Subject: [PATCH 13/13] chore: update version to 0.53.1 in Makefile

---
 Makefile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Makefile b/Makefile
index 9e9bf5d..d4d9195 100644
--- a/Makefile
+++ b/Makefile
@@ -21,7 +21,7 @@ test-e2e:
 
 .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
 
-VERSION := 0.53.0
+VERSION := 0.53.1
 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
 DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
 LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'