Merge pull request #62 from DeBrosOfficial/nightly

chore: update version and enhance database connection configuration
anonpenguin 2025-10-31 13:17:08 +02:00 committed by GitHub
commit ca00561da1
17 changed files with 1902 additions and 106 deletions

View File

@ -14,6 +14,23 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
### Fixed
## [0.53.0] - 2025-10-31
### Added
- Discovery manager now tracks failed peer-exchange attempts to suppress repeated warnings while peers negotiate supported protocols.
### Changed
- Scoped logging throughout `cluster_discovery`, `rqlite`, and `discovery` packages so logs carry component tags and keep verbose output at debug level.
- Refactored `ClusterDiscoveryService` membership handling: metadata updates happen under lock, `peers.json` is written outside the lock, self-health is skipped, and change detection is centralized in `computeMembershipChangesLocked`.
- Reworked `RQLiteManager.Start` into helper functions (`prepareDataDir`, `launchProcess`, `waitForReadyAndConnect`, `establishLeadershipOrJoin`) with clearer logging, better error handling, and exponential backoff while waiting for leadership.
- `validateNodeID` now treats empty membership results as transitional states, logging at debug level instead of warning to avoid noisy startups.
### Fixed
- Eliminated spurious `peers.json` churn and node-ID mismatch warnings during cluster formation by aligning IDs with raft addresses and tightening discovery logging.
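The exponential backoff mentioned above can be pictured with a minimal sketch; the helper below is illustrative only and does not reproduce the actual `establishLeadershipOrJoin` code:

package example

import (
	"context"
	"time"
)

// waitWithBackoff polls check() until it succeeds, sleeping with
// exponential backoff (capped) between attempts.
func waitWithBackoff(ctx context.Context, check func() bool) error {
	delay := 500 * time.Millisecond
	const maxDelay = 10 * time.Second
	for !check() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
	return nil
}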
## [0.52.15]
### Added

View File

@ -21,7 +21,7 @@ test-e2e:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
VERSION := 0.53.1
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
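The `-X` flags inject the version metadata into `package main` at link time; a minimal sketch of the variables they target (the defaults here are assumptions, only the `main.version`, `main.commit`, and `main.date` names come from the LDFLAGS line):

package main

import "fmt"

// Defaults below are placeholders; the Makefile overrides them at link
// time via -ldflags "-X 'main.version=…' -X 'main.commit=…' -X 'main.date=…'".
var (
	version = "dev"
	commit  = "unknown"
	date    = "unknown"
)

func main() {
	fmt.Printf("network %s (commit %s, built %s)\n", version, commit, date)
}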

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
@ -166,6 +167,19 @@ func promptYesNo() bool {
return response == "yes" || response == "y"
}
func promptBranch() string {
reader := bufio.NewReader(os.Stdin)
fmt.Printf(" Select branch (main/nightly) [default: main]: ")
response, _ := reader.ReadString('\n')
response = strings.ToLower(strings.TrimSpace(response))
if response == "nightly" {
return "nightly"
}
// Default to main for anything else (including empty)
return "main"
}
// isValidMultiaddr validates bootstrap peer multiaddr format
func isValidMultiaddr(s string) bool {
s = strings.TrimSpace(s)
@ -396,43 +410,89 @@ func installAnon() {
return
}
// Check Ubuntu version - Ubuntu 25.04 is not yet supported by Anon repository
osInfo := detectLinuxDistro()
if strings.Contains(strings.ToLower(osInfo), "ubuntu 25.04") || strings.Contains(strings.ToLower(osInfo), "plucky") {
fmt.Fprintf(os.Stderr, "⚠️ Ubuntu 25.04 (Plucky) is not yet supported by Anon repository\n")
fmt.Fprintf(os.Stderr, " Anon installation will be skipped. The gateway will work without it,\n")
fmt.Fprintf(os.Stderr, " but anonymous proxy functionality will not be available.\n")
fmt.Fprintf(os.Stderr, " You can manually install Anon later when support is added:\n")
fmt.Fprintf(os.Stderr, " sudo /bin/bash -c \"$(curl -fsSL https://raw.githubusercontent.com/anyone-protocol/anon-install/refs/heads/main/install.sh)\"\n\n")
return
}
// Install via official installation script (from GitHub)
fmt.Printf(" Installing Anon using official installation script...\n")
fmt.Printf(" Note: The installation script may prompt for configuration\n")
// Clean up any old APT repository files from previous installation attempts
gpgKeyPath := "/usr/share/keyrings/anyone-archive-keyring.gpg"
repoPath := "/etc/apt/sources.list.d/anyone.list"
if _, err := os.Stat(gpgKeyPath); err == nil {
fmt.Printf(" Removing old GPG key file...\n")
os.Remove(gpgKeyPath)
}
if _, err := os.Stat(repoPath); err == nil {
fmt.Printf(" Removing old repository file...\n")
os.Remove(repoPath)
}
// Preseed debconf before installation
fmt.Printf(" Pre-accepting Anon terms and conditions...\n")
preseedCmd := exec.Command("sh", "-c", `echo "anon anon/terms boolean true" | debconf-set-selections`)
preseedCmd.Run() // Ignore errors, preseed might not be critical
// Create anonrc directory and file with AgreeToTerms before installation
// This ensures terms are accepted even if the post-install script checks the file
anonrcDir := "/etc/anon"
anonrcPath := "/etc/anon/anonrc"
if err := os.MkdirAll(anonrcDir, 0755); err == nil {
if _, err := os.Stat(anonrcPath); os.IsNotExist(err) {
// Create file with AgreeToTerms already set
os.WriteFile(anonrcPath, []byte("AgreeToTerms 1\n"), 0644)
}
}
// Create terms-agreement files in multiple possible locations
// Anon might check for these files to verify terms acceptance
termsLocations := []string{
"/var/lib/anon/terms-agreement",
"/usr/share/anon/terms-agreement",
"/usr/share/keyrings/anon/terms-agreement",
"/usr/share/keyrings/anyone-terms-agreed",
}
for _, loc := range termsLocations {
dir := filepath.Dir(loc)
if err := os.MkdirAll(dir, 0755); err == nil {
os.WriteFile(loc, []byte("agreed\n"), 0644)
}
}
// Use the official installation script from GitHub
// Rely on debconf preseed and file-based acceptance methods
// If prompts still appear, pipe a few "yes" responses (not infinite)
installScriptURL := "https://raw.githubusercontent.com/anyone-protocol/anon-install/refs/heads/main/install.sh"
// Pipe multiple "yes" responses (but limited) in case of multiple prompts
yesResponses := strings.Repeat("yes\n", 10) // 10 "yes" responses should be enough
cmd := exec.Command("sh", "-c", fmt.Sprintf("curl -fsSL %s | bash", installScriptURL))
cmd.Env = append(os.Environ(), "DEBIAN_FRONTEND=noninteractive")
cmd.Stdin = strings.NewReader(yesResponses)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
fmt.Fprintf(os.Stderr, "⚠️ Anon installation failed: %v\n", err)
fmt.Fprintf(os.Stderr, " The gateway will work without Anon, but anonymous proxy functionality will not be available.\n")
fmt.Fprintf(os.Stderr, " You can manually install Anon later:\n")
fmt.Fprintf(os.Stderr, " sudo /bin/bash -c \"$(curl -fsSL https://raw.githubusercontent.com/anyone-protocol/anon-install/refs/heads/main/install.sh)\"\n")
return // Continue setup without Anon
}
// Verify installation
if _, err := exec.LookPath("anon"); err != nil {
fmt.Fprintf(os.Stderr, "⚠️ Anon installation verification failed: binary not found in PATH\n")
fmt.Fprintf(os.Stderr, " Continuing setup without Anon...\n")
return // Continue setup without Anon
}
fmt.Printf(" ✓ Anon installed\n")
@ -448,11 +508,28 @@ func installAnon() {
// Enable and start service
fmt.Printf(" Enabling Anon service...\n")
enableCmd := exec.Command("systemctl", "enable", "anon")
if output, err := enableCmd.CombinedOutput(); err != nil {
fmt.Fprintf(os.Stderr, "⚠️ Failed to enable Anon service: %v\n", err)
if len(output) > 0 {
fmt.Fprintf(os.Stderr, " Output: %s\n", string(output))
}
}
startCmd := exec.Command("systemctl", "start", "anon")
if output, err := startCmd.CombinedOutput(); err != nil {
fmt.Fprintf(os.Stderr, "⚠️ Failed to start Anon service: %v\n", err)
if len(output) > 0 {
fmt.Fprintf(os.Stderr, " Output: %s\n", string(output))
}
fmt.Fprintf(os.Stderr, " Check service status: systemctl status anon\n")
} else {
fmt.Printf(" ✓ Anon service started\n")
}
// Verify service is running
if exec.Command("systemctl", "is-active", "--quiet", "anon").Run() == nil {
fmt.Printf(" ✓ Anon service is active\n")
} else {
fmt.Fprintf(os.Stderr, "⚠️ Anon service may not be running. Check: systemctl status anon\n")
}
@ -488,6 +565,10 @@ func configureAnonDefaults() {
if !strings.Contains(config, "SocksPort") {
config += "SocksPort 9050\n"
}
// Auto-accept terms to avoid interactive prompts
if !strings.Contains(config, "AgreeToTerms") {
config += "AgreeToTerms 1\n"
}
// Write back
os.WriteFile(anonrcPath, []byte(config), 0644)
@ -496,6 +577,7 @@ func configureAnonDefaults() {
fmt.Printf(" ORPort: 9001 (default)\n") fmt.Printf(" ORPort: 9001 (default)\n")
fmt.Printf(" ControlPort: 9051\n") fmt.Printf(" ControlPort: 9051\n")
fmt.Printf(" SOCKSPort: 9050\n") fmt.Printf(" SOCKSPort: 9050\n")
fmt.Printf(" AgreeToTerms: 1 (auto-accepted)\n")
} }
} }
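After configureAnonDefaults runs, /etc/anon/anonrc is guaranteed to contain at least the two directives appended above; any other lines (such as ORPort or ControlPort settings) depend on what was already in the file:

SocksPort 9050
AgreeToTerms 1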
@ -598,6 +680,7 @@ func setupDirectories() {
"/home/debros/bin", "/home/debros/bin",
"/home/debros/src", "/home/debros/src",
"/home/debros/.debros", "/home/debros/.debros",
"/home/debros/go", // Go module cache directory
} }
for _, dir := range dirs { for _, dir := range dirs {
@ -616,16 +699,38 @@ func setupDirectories() {
func cloneAndBuild() {
fmt.Printf("🔨 Cloning and building DeBros Network...\n")
// Prompt for branch selection
branch := promptBranch()
fmt.Printf(" Using branch: %s\n", branch)
// Check if already cloned
if _, err := os.Stat("/home/debros/src/.git"); err == nil {
fmt.Printf(" Updating repository...\n")
// Check current branch and switch if needed
currentBranchCmd := exec.Command("sudo", "-u", "debros", "git", "-C", "/home/debros/src", "rev-parse", "--abbrev-ref", "HEAD")
if output, err := currentBranchCmd.Output(); err == nil {
currentBranch := strings.TrimSpace(string(output))
if currentBranch != branch {
fmt.Printf(" Switching from %s to %s...\n", currentBranch, branch)
// Fetch the target branch first (needed for shallow clones)
exec.Command("sudo", "-u", "debros", "git", "-C", "/home/debros/src", "fetch", "origin", branch).Run()
// Checkout the selected branch
checkoutCmd := exec.Command("sudo", "-u", "debros", "git", "-C", "/home/debros/src", "checkout", branch)
if err := checkoutCmd.Run(); err != nil {
fmt.Fprintf(os.Stderr, "⚠️ Failed to switch branch: %v\n", err)
}
}
}
// Pull latest changes
cmd := exec.Command("sudo", "-u", "debros", "git", "-C", "/home/debros/src", "pull", "origin", branch)
if err := cmd.Run(); err != nil {
fmt.Fprintf(os.Stderr, "⚠️ Failed to update repo: %v\n", err)
}
} else {
fmt.Printf(" Cloning repository...\n")
cmd := exec.Command("sudo", "-u", "debros", "git", "clone", "--branch", branch, "--depth", "1", "https://github.com/DeBrosOfficial/network.git", "/home/debros/src")
if err := cmd.Run(); err != nil {
fmt.Fprintf(os.Stderr, "❌ Failed to clone repo: %v\n", err)
os.Exit(1)
@ -639,8 +744,10 @@ func cloneAndBuild() {
os.Setenv("PATH", os.Getenv("PATH")+":/usr/local/go/bin") os.Setenv("PATH", os.Getenv("PATH")+":/usr/local/go/bin")
// Use sudo with --preserve-env=PATH to pass Go path to debros user // Use sudo with --preserve-env=PATH to pass Go path to debros user
// Set HOME so Go knows where to create module cache
cmd := exec.Command("sudo", "--preserve-env=PATH", "-u", "debros", "make", "build") cmd := exec.Command("sudo", "--preserve-env=PATH", "-u", "debros", "make", "build")
cmd.Dir = "/home/debros/src" cmd.Dir = "/home/debros/src"
cmd.Env = append(os.Environ(), "HOME=/home/debros", "PATH="+os.Getenv("PATH")+":/usr/local/go/bin")
if output, err := cmd.CombinedOutput(); err != nil { if output, err := cmd.CombinedOutput(); err != nil {
fmt.Fprintf(os.Stderr, "❌ Failed to build: %v\n%s\n", err, output) fmt.Fprintf(os.Stderr, "❌ Failed to build: %v\n%s\n", err, output)
os.Exit(1) os.Exit(1)

View File

@ -36,6 +36,11 @@ type DatabaseConfig struct {
RQLitePort int `yaml:"rqlite_port"` // RQLite HTTP API port
RQLiteRaftPort int `yaml:"rqlite_raft_port"` // RQLite Raft consensus port
RQLiteJoinAddress string `yaml:"rqlite_join_address"` // Address to join RQLite cluster
// Dynamic discovery configuration (always enabled)
ClusterSyncInterval time.Duration `yaml:"cluster_sync_interval"` // default: 30s
PeerInactivityLimit time.Duration `yaml:"peer_inactivity_limit"` // default: 24h
MinClusterSize int `yaml:"min_cluster_size"` // default: 1
}
// DiscoveryConfig contains peer discovery configuration
@ -106,6 +111,11 @@ func DefaultConfig() *Config {
RQLitePort: 5001,
RQLiteRaftPort: 7001,
RQLiteJoinAddress: "", // Empty for bootstrap node
// Dynamic discovery (always enabled)
ClusterSyncInterval: 30 * time.Second,
PeerInactivityLimit: 24 * time.Hour,
MinClusterSize: 1,
},
Discovery: DiscoveryConfig{
BootstrapPeers: []string{},
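As a usage sketch (not taken from the repository; the `pkg/config` import path and the `Database` field name are assumptions based on the types shown above), a caller could override the new discovery-related defaults like this:

package example

import (
	"time"

	"github.com/DeBrosOfficial/network/pkg/config" // import path assumed
)

// exampleConfig overrides the new discovery-related defaults; the values
// chosen here satisfy the validation rules added in this PR.
func exampleConfig() *config.Config {
	cfg := config.DefaultConfig()
	cfg.Database.ClusterSyncInterval = 15 * time.Second // >= 10s floor
	cfg.Database.PeerInactivityLimit = 12 * time.Hour   // within [1h, 7d]
	cfg.Database.MinClusterSize = 3                     // >= 1
	return cfg
}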

View File

@ -7,6 +7,7 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net" manet "github.com/multiformats/go-multiaddr/net"
@ -51,6 +52,15 @@ func (c *Config) validateNode() []error {
var errs []error
nc := c.Node
// Validate node ID (required for RQLite cluster membership)
if nc.ID == "" {
errs = append(errs, ValidationError{
Path: "node.id",
Message: "must not be empty (required for cluster membership)",
Hint: "will be auto-generated if empty, but explicit ID recommended",
})
}
// Validate type
if nc.Type != "bootstrap" && nc.Type != "node" {
errs = append(errs, ValidationError{
@ -233,6 +243,40 @@ func (c *Config) validateDatabase() []error {
}
}
// Validate cluster_sync_interval
if dc.ClusterSyncInterval != 0 && dc.ClusterSyncInterval < 10*time.Second {
errs = append(errs, ValidationError{
Path: "database.cluster_sync_interval",
Message: fmt.Sprintf("must be >= 10s or 0 (for default); got %v", dc.ClusterSyncInterval),
Hint: "recommended: 30s",
})
}
// Validate peer_inactivity_limit
if dc.PeerInactivityLimit != 0 {
if dc.PeerInactivityLimit < time.Hour {
errs = append(errs, ValidationError{
Path: "database.peer_inactivity_limit",
Message: fmt.Sprintf("must be >= 1h or 0 (for default); got %v", dc.PeerInactivityLimit),
Hint: "recommended: 24h",
})
} else if dc.PeerInactivityLimit > 7*24*time.Hour {
errs = append(errs, ValidationError{
Path: "database.peer_inactivity_limit",
Message: fmt.Sprintf("must be <= 7d; got %v", dc.PeerInactivityLimit),
Hint: "recommended: 24h",
})
}
}
// Validate min_cluster_size
if dc.MinClusterSize < 1 {
errs = append(errs, ValidationError{
Path: "database.min_cluster_size",
Message: fmt.Sprintf("must be >= 1; got %d", dc.MinClusterSize),
})
}
return errs
}
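A table-driven test could pin down the `cluster_sync_interval` bounds described above; this is a sketch written against the documented behaviour, placed alongside the package, with the `Database` field name assumed:

package config

import (
	"testing"
	"time"
)

// Sketch: exercise the cluster_sync_interval bounds documented above.
func TestClusterSyncIntervalBounds(t *testing.T) {
	cases := []struct {
		interval time.Duration
		wantErr  bool
	}{
		{0, false},                // 0 falls back to the default
		{5 * time.Second, true},   // below the 10s floor
		{30 * time.Second, false}, // recommended value
	}
	for _, tc := range cases {
		cfg := DefaultConfig()
		cfg.Database.ClusterSyncInterval = tc.interval
		errs := cfg.validateDatabase()
		if gotErr := len(errs) > 0; gotErr != tc.wantErr {
			t.Errorf("interval %v: errors=%v, wantErr=%v", tc.interval, errs, gotErr)
		}
	}
}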
@ -320,8 +364,14 @@ func (c *Config) validateDiscovery() []error {
seenPeers[peer] = true
}
// Validate http_adv_address (required for cluster discovery)
if disc.HttpAdvAddress == "" {
errs = append(errs, ValidationError{
Path: "discovery.http_adv_address",
Message: "required for RQLite cluster discovery",
Hint: "set to your public HTTP address (e.g., 51.83.128.181:5001)",
})
} else {
if err := validateHostOrHostPort(disc.HttpAdvAddress); err != nil {
errs = append(errs, ValidationError{
Path: "discovery.http_adv_address",
@ -331,8 +381,14 @@ func (c *Config) validateDiscovery() []error {
}
}
// Validate raft_adv_address (required for cluster discovery)
if disc.RaftAdvAddress == "" {
errs = append(errs, ValidationError{
Path: "discovery.raft_adv_address",
Message: "required for RQLite cluster discovery",
Hint: "set to your public Raft address (e.g., 51.83.128.181:7001)",
})
} else {
if err := validateHostOrHostPort(disc.RaftAdvAddress); err != nil {
errs = append(errs, ValidationError{
Path: "discovery.raft_adv_address",

View File

@ -24,7 +24,8 @@ type PeerExchangeRequest struct {
// PeerExchangeResponse represents a list of peers to exchange
type PeerExchangeResponse struct {
Peers []PeerInfo `json:"peers"`
RQLiteMetadata *RQLiteNodeMetadata `json:"rqlite_metadata,omitempty"`
}
// PeerInfo contains peer identity and addresses
@ -38,9 +39,10 @@ type PeerInfo struct {
// interface{} to remain source-compatible with previous call sites that
// passed a DHT instance. The value is ignored.
type Manager struct {
host host.Host
logger *zap.Logger
cancel context.CancelFunc
failedPeerExchanges map[peer.ID]time.Time // Track failed peer exchange attempts to suppress repeated warnings
}
// Config contains discovery configuration
@ -55,8 +57,10 @@ type Config struct {
// previously passed a DHT instance can continue to do so; the value is ignored.
func NewManager(h host.Host, _ interface{}, logger *zap.Logger) *Manager {
return &Manager{
host: h,
logger: logger.With(zap.String("component", "peer-discovery")),
cancel: nil,
failedPeerExchanges: make(map[peer.ID]time.Time),
}
}
@ -123,6 +127,16 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) {
added++
}
// Add RQLite metadata if available
if val, err := d.host.Peerstore().Get(d.host.ID(), "rqlite_metadata"); err == nil {
if jsonData, ok := val.([]byte); ok {
var metadata RQLiteNodeMetadata
if err := json.Unmarshal(jsonData, &metadata); err == nil {
resp.RQLiteMetadata = &metadata
}
}
}
// Send response
encoder := json.NewEncoder(s)
if err := encoder.Encode(&resp); err != nil {
@ -131,7 +145,8 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) {
}
d.logger.Debug("Sent peer exchange response",
zap.Int("peer_count", len(resp.Peers)),
zap.Bool("has_rqlite_metadata", resp.RQLiteMetadata != nil))
}
// Start begins periodic peer discovery
@ -332,13 +347,21 @@ func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi
// Open a stream to the peer
stream, err := d.host.NewStream(ctx, peerID, PeerExchangeProtocol)
if err != nil {
// Suppress repeated warnings for the same peer (log once per minute max)
lastFailure, seen := d.failedPeerExchanges[peerID]
if !seen || time.Since(lastFailure) > time.Minute {
d.logger.Debug("Failed to open peer exchange stream",
zap.String("peer_id", peerID.String()[:8]+"..."),
zap.Error(err))
d.failedPeerExchanges[peerID] = time.Now()
}
return nil
}
defer stream.Close()
// Clear failure tracking on success
delete(d.failedPeerExchanges, peerID)
// Send request
req := PeerExchangeRequest{Limit: limit}
encoder := json.NewEncoder(stream)
@ -363,9 +386,52 @@ func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi
return nil
}
// Store remote peer's RQLite metadata if available
if resp.RQLiteMetadata != nil {
metadataJSON, err := json.Marshal(resp.RQLiteMetadata)
if err == nil {
_ = d.host.Peerstore().Put(peerID, "rqlite_metadata", metadataJSON)
d.logger.Debug("Stored RQLite metadata from peer",
zap.String("peer_id", peerID.String()[:8]+"..."),
zap.String("node_id", resp.RQLiteMetadata.NodeID))
}
}
return resp.Peers
}
// TriggerPeerExchange manually triggers peer exchange with all connected peers
// This is useful for pre-startup cluster discovery to populate the peerstore with RQLite metadata
func (d *Manager) TriggerPeerExchange(ctx context.Context) int {
connectedPeers := d.host.Network().Peers()
if len(connectedPeers) == 0 {
d.logger.Debug("No connected peers for peer exchange")
return 0
}
d.logger.Info("Manually triggering peer exchange",
zap.Int("connected_peers", len(connectedPeers)))
metadataCollected := 0
for _, peerID := range connectedPeers {
// Request peer list from this peer (which includes their RQLite metadata)
_ = d.requestPeersFromPeer(ctx, peerID, 50) // Request up to 50 peers
// Check if we got RQLite metadata from this peer
if val, err := d.host.Peerstore().Get(peerID, "rqlite_metadata"); err == nil {
if _, ok := val.([]byte); ok {
metadataCollected++
}
}
}
d.logger.Info("Peer exchange completed",
zap.Int("peers_with_metadata", metadataCollected),
zap.Int("total_peers", len(connectedPeers)))
return metadataCollected
}
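A usage sketch for the manual trigger (caller code assumed, not part of this change):

package example

import (
	"context"
	"time"

	"github.com/DeBrosOfficial/network/pkg/discovery"
)

// preStartExchange is an illustrative caller: it gives connected peers a
// short window to respond, then reports whether any RQLite metadata arrived.
func preStartExchange(ctx context.Context, mgr *discovery.Manager) bool {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	return mgr.TriggerPeerExchange(ctx) > 0
}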
// connectToPeer attempts to connect to a specific peer using its peerstore info.
func (d *Manager) connectToPeer(ctx context.Context, peerID peer.ID) error {
peerInfo := d.host.Peerstore().PeerInfo(peerID)

View File

@ -0,0 +1,22 @@
package discovery
import (
"time"
)
// RQLiteNodeMetadata contains RQLite-specific information announced via LibP2P
type RQLiteNodeMetadata struct {
NodeID string `json:"node_id"` // RQLite node ID (from config)
RaftAddress string `json:"raft_address"` // Raft port address (e.g., "51.83.128.181:7001")
HTTPAddress string `json:"http_address"` // HTTP API address (e.g., "51.83.128.181:5001")
NodeType string `json:"node_type"` // "bootstrap" or "node"
RaftLogIndex uint64 `json:"raft_log_index"` // Current Raft log index (for data comparison)
LastSeen time.Time `json:"last_seen"` // Updated on every announcement
ClusterVersion string `json:"cluster_version"` // For compatibility checking
}
// PeerExchangeResponseV2 extends the original response with RQLite metadata
type PeerExchangeResponseV2 struct {
Peers []PeerInfo `json:"peers"`
RQLiteMetadata *RQLiteNodeMetadata `json:"rqlite_metadata,omitempty"`
}
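For reference, the wire format produced by these struct tags can be inspected with a small program (field values are invented for illustration):

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/DeBrosOfficial/network/pkg/discovery"
)

func main() {
	meta := discovery.RQLiteNodeMetadata{
		NodeID:         "203.0.113.10:7001", // example values only
		RaftAddress:    "203.0.113.10:7001",
		HTTPAddress:    "203.0.113.10:5001",
		NodeType:       "bootstrap",
		RaftLogIndex:   42,
		LastSeen:       time.Now(),
		ClusterVersion: "1.0",
	}
	out, _ := json.MarshalIndent(meta, "", "  ")
	fmt.Println(string(out))
	// Keys follow the struct tags above: node_id, raft_address, http_address,
	// node_type, raft_log_index, last_seen, cluster_version.
}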

View File

@ -108,13 +108,22 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
if dbErr != nil {
logger.ComponentWarn(logging.ComponentGeneral, "failed to open rqlite sql db; http orm gateway disabled", zap.Error(dbErr))
} else {
// Configure connection pool with proper timeouts and limits
db.SetMaxOpenConns(25) // Maximum number of open connections
db.SetMaxIdleConns(5) // Maximum number of idle connections
db.SetConnMaxLifetime(5 * time.Minute) // Maximum lifetime of a connection
db.SetConnMaxIdleTime(2 * time.Minute) // Maximum idle time before closing
gw.sqlDB = db
orm := rqlite.NewClient(db)
gw.ormClient = orm
gw.ormHTTP = rqlite.NewHTTPGateway(orm, "/v1/db")
// Set a reasonable timeout for HTTP requests (30 seconds)
gw.ormHTTP.Timeout = 30 * time.Second
logger.ComponentInfo(logging.ComponentGeneral, "RQLite ORM HTTP gateway ready",
zap.String("dsn", dsn),
zap.String("base_path", "/v1/db"),
zap.Duration("timeout", gw.ormHTTP.Timeout),
)
}
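If the 25/5 pool limits ever need tuning, `database/sql` exposes live counters; a hedged sketch of logging them (helper and call site assumed):

package example

import (
	"database/sql"

	"go.uber.org/zap"
)

// logPoolStats logs pool utilisation so the 25/5 limits above can be
// tuned with real data; db is the *sql.DB configured by the gateway.
func logPoolStats(db *sql.DB, logger *zap.Logger) {
	s := db.Stats()
	logger.Debug("rqlite sql pool",
		zap.Int("open", s.OpenConnections),
		zap.Int("in_use", s.InUse),
		zap.Int("idle", s.Idle),
		zap.Int64("wait_count", s.WaitCount))
}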

View File

@ -91,12 +91,13 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory
}
msg := struct {
PeerID string `json:"peer_id"`
PeerCount int `json:"peer_count"`
PeerIDs []string `json:"peer_ids,omitempty"`
CPU uint64 `json:"cpu_usage"`
Memory uint64 `json:"memory_usage"`
Timestamp int64 `json:"timestamp"`
ClusterHealth map[string]interface{} `json:"cluster_health,omitempty"`
}{
PeerID: n.host.ID().String(),
PeerCount: len(peers),
@ -106,6 +107,20 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory
Timestamp: time.Now().Unix(),
}
// Add cluster health metrics if available
if n.clusterDiscovery != nil {
metrics := n.clusterDiscovery.GetMetrics()
msg.ClusterHealth = map[string]interface{}{
"cluster_size": metrics.ClusterSize,
"active_nodes": metrics.ActiveNodes,
"inactive_nodes": metrics.InactiveNodes,
"discovery_status": metrics.DiscoveryStatus,
"current_leader": metrics.CurrentLeader,
"average_peer_health": metrics.AveragePeerHealth,
"last_update": metrics.LastUpdate.Format(time.RFC3339),
}
}
data, err := json.Marshal(msg)
if err != nil {
return err
@ -119,6 +134,50 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory
return nil
}
// GetClusterHealth returns cluster health information
func (n *Node) GetClusterHealth() map[string]interface{} {
if n.clusterDiscovery == nil {
return map[string]interface{}{
"status": "not_initialized",
}
}
metrics := n.clusterDiscovery.GetMetrics()
return map[string]interface{}{
"cluster_size": metrics.ClusterSize,
"active_nodes": metrics.ActiveNodes,
"inactive_nodes": metrics.InactiveNodes,
"discovery_status": metrics.DiscoveryStatus,
"current_leader": metrics.CurrentLeader,
"average_peer_health": metrics.AveragePeerHealth,
"last_update": metrics.LastUpdate,
}
}
// GetDiscoveryStatus returns discovery service status
func (n *Node) GetDiscoveryStatus() map[string]interface{} {
if n.clusterDiscovery == nil {
return map[string]interface{}{
"status": "disabled",
"message": "cluster discovery not initialized",
}
}
metrics := n.clusterDiscovery.GetMetrics()
status := "healthy"
if metrics.DiscoveryStatus == "no_peers" {
status = "warning"
} else if metrics.DiscoveryStatus == "degraded" {
status = "degraded"
}
return map[string]interface{}{
"status": status,
"cluster_size": metrics.ClusterSize,
"last_update": metrics.LastUpdate,
}
}
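These accessors could be surfaced over HTTP roughly as follows; the handler is illustrative and not part of this PR:

package example

import (
	"encoding/json"
	"net/http"
)

// clusterHealthProvider is the subset of the node API used here; it is
// declared locally only so the sketch stands alone.
type clusterHealthProvider interface {
	GetClusterHealth() map[string]interface{}
	GetDiscoveryStatus() map[string]interface{}
}

// healthHandler serves both maps as a single JSON document.
func healthHandler(n clusterHealthProvider) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(map[string]interface{}{
			"cluster":   n.GetClusterHealth(),
			"discovery": n.GetDiscoveryStatus(),
		})
	})
}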
// startConnectionMonitoring starts minimal connection monitoring for the lightweight client.
// Unlike nodes which need extensive monitoring, clients only need basic health checks.
func (n *Node) startConnectionMonitoring() {

View File

@ -33,8 +33,9 @@ type Node struct {
logger *logging.ColoredLogger
host host.Host
rqliteManager *database.RQLiteManager
rqliteAdapter *database.RQLiteAdapter
clusterDiscovery *database.ClusterDiscoveryService
// Peer discovery
bootstrapCancel context.CancelFunc
@ -67,11 +68,55 @@ func (n *Node) startRQLite(ctx context.Context) error {
// Create RQLite manager
n.rqliteManager = database.NewRQLiteManager(&n.config.Database, &n.config.Discovery, n.config.Node.DataDir, n.logger.Logger)
// Initialize cluster discovery service if LibP2P host is available
if n.host != nil && n.discoveryManager != nil {
// Determine node type
nodeType := "node"
if n.config.Node.Type == "bootstrap" {
nodeType = "bootstrap"
}
// Create cluster discovery service
n.clusterDiscovery = database.NewClusterDiscoveryService(
n.host,
n.discoveryManager,
n.rqliteManager,
n.config.Node.ID,
nodeType,
n.config.Discovery.RaftAdvAddress,
n.config.Discovery.HttpAdvAddress,
n.config.Node.DataDir,
n.logger.Logger,
)
// Set discovery service on RQLite manager BEFORE starting RQLite
// This is critical for pre-start cluster discovery during recovery
n.rqliteManager.SetDiscoveryService(n.clusterDiscovery)
// Start cluster discovery (but don't trigger initial sync yet)
if err := n.clusterDiscovery.Start(ctx); err != nil {
return fmt.Errorf("failed to start cluster discovery: %w", err)
}
// Publish initial metadata (with log_index=0) so peers can discover us during recovery
// The metadata will be updated with actual log index after RQLite starts
n.clusterDiscovery.UpdateOwnMetadata()
n.logger.Info("Cluster discovery service started (waiting for RQLite)")
}
// Start RQLite FIRST before updating metadata
if err := n.rqliteManager.Start(ctx); err != nil {
return err
}
// NOW update metadata after RQLite is running
if n.clusterDiscovery != nil {
n.clusterDiscovery.UpdateOwnMetadata()
n.clusterDiscovery.TriggerSync() // Do initial cluster sync now that RQLite is ready
n.logger.Info("RQLite metadata published and cluster synced")
}
// Create adapter for sql.DB compatibility
adapter, err := database.NewRQLiteAdapter(n.rqliteManager)
if err != nil {
@ -532,6 +577,11 @@ func (n *Node) stopPeerDiscovery() {
func (n *Node) Stop() error {
n.logger.ComponentInfo(logging.ComponentNode, "Stopping network node")
// Stop cluster discovery
if n.clusterDiscovery != nil {
n.clusterDiscovery.Stop()
}
// Stop bootstrap reconnection loop
if n.bootstrapCancel != nil {
n.bootstrapCancel()
@ -577,16 +627,16 @@ func (n *Node) Start(ctx context.Context) error {
return fmt.Errorf("failed to create data directory: %w", err) return fmt.Errorf("failed to create data directory: %w", err)
} }
// Start RQLite // Start LibP2P host first (needed for cluster discovery)
if err := n.startRQLite(ctx); err != nil {
return fmt.Errorf("failed to start RQLite: %w", err)
}
// Start LibP2P host
if err := n.startLibP2P(); err != nil { if err := n.startLibP2P(); err != nil {
return fmt.Errorf("failed to start LibP2P: %w", err) return fmt.Errorf("failed to start LibP2P: %w", err)
} }
// Start RQLite with cluster discovery
if err := n.startRQLite(ctx); err != nil {
return fmt.Errorf("failed to start RQLite: %w", err)
}
// Get listen addresses for logging // Get listen addresses for logging
var listenAddrs []string var listenAddrs []string
for _, addr := range n.host.Addrs() { for _, addr := range n.host.Addrs() {

View File

@ -3,6 +3,7 @@ package rqlite
import (
"database/sql"
"fmt"
"time"
_ "github.com/rqlite/gorqlite/stdlib" // Import the database/sql driver
)
@ -21,6 +22,12 @@ func NewRQLiteAdapter(manager *RQLiteManager) (*RQLiteAdapter, error) {
return nil, fmt.Errorf("failed to open RQLite SQL connection: %w", err)
}
// Configure connection pool with proper timeouts and limits
db.SetMaxOpenConns(25) // Maximum number of open connections
db.SetMaxIdleConns(5) // Maximum number of idle connections
db.SetConnMaxLifetime(5 * time.Minute) // Maximum lifetime of a connection
db.SetConnMaxIdleTime(2 * time.Minute) // Maximum idle time before closing
return &RQLiteAdapter{
manager: manager,
db: db,

View File

@ -0,0 +1,640 @@
package rqlite
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/DeBrosOfficial/network/pkg/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap"
)
// ClusterDiscoveryService bridges LibP2P discovery with RQLite cluster management
type ClusterDiscoveryService struct {
host host.Host
discoveryMgr *discovery.Manager
rqliteManager *RQLiteManager
nodeID string
nodeType string
raftAddress string
httpAddress string
dataDir string
knownPeers map[string]*discovery.RQLiteNodeMetadata // NodeID -> Metadata
peerHealth map[string]*PeerHealth // NodeID -> Health
lastUpdate time.Time
updateInterval time.Duration // 30 seconds
inactivityLimit time.Duration // 24 hours
logger *zap.Logger
mu sync.RWMutex
cancel context.CancelFunc
started bool
}
// NewClusterDiscoveryService creates a new cluster discovery service
func NewClusterDiscoveryService(
h host.Host,
discoveryMgr *discovery.Manager,
rqliteManager *RQLiteManager,
nodeID string,
nodeType string,
raftAddress string,
httpAddress string,
dataDir string,
logger *zap.Logger,
) *ClusterDiscoveryService {
return &ClusterDiscoveryService{
host: h,
discoveryMgr: discoveryMgr,
rqliteManager: rqliteManager,
nodeID: nodeID,
nodeType: nodeType,
raftAddress: raftAddress,
httpAddress: httpAddress,
dataDir: dataDir,
knownPeers: make(map[string]*discovery.RQLiteNodeMetadata),
peerHealth: make(map[string]*PeerHealth),
updateInterval: 30 * time.Second,
inactivityLimit: 24 * time.Hour,
logger: logger.With(zap.String("component", "cluster-discovery")),
}
}
// Start begins the cluster discovery service
func (c *ClusterDiscoveryService) Start(ctx context.Context) error {
c.mu.Lock()
if c.started {
c.mu.Unlock()
return fmt.Errorf("cluster discovery already started")
}
c.started = true
c.mu.Unlock()
ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
c.logger.Info("Starting cluster discovery service",
zap.String("raft_address", c.raftAddress),
zap.String("node_type", c.nodeType),
zap.String("http_address", c.httpAddress),
zap.String("data_dir", c.dataDir),
zap.Duration("update_interval", c.updateInterval),
zap.Duration("inactivity_limit", c.inactivityLimit))
// Start periodic sync in background
go c.periodicSync(ctx)
// Start periodic cleanup in background
go c.periodicCleanup(ctx)
c.logger.Info("Cluster discovery goroutines started")
return nil
}
// Stop stops the cluster discovery service
func (c *ClusterDiscoveryService) Stop() {
c.mu.Lock()
defer c.mu.Unlock()
if !c.started {
return
}
if c.cancel != nil {
c.cancel()
}
c.started = false
c.logger.Info("Cluster discovery service stopped")
}
// periodicSync runs periodic cluster membership synchronization
func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) {
c.logger.Debug("periodicSync goroutine started, waiting for RQLite readiness")
ticker := time.NewTicker(c.updateInterval)
defer ticker.Stop()
// Wait for first ticker interval before syncing (RQLite needs time to start)
for {
select {
case <-ctx.Done():
c.logger.Debug("periodicSync goroutine stopping")
return
case <-ticker.C:
c.updateClusterMembership()
}
}
}
// periodicCleanup runs periodic cleanup of inactive nodes
func (c *ClusterDiscoveryService) periodicCleanup(ctx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.removeInactivePeers()
}
}
}
// collectPeerMetadata collects RQLite metadata from LibP2P peers
func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeMetadata {
connectedPeers := c.host.Network().Peers()
var metadata []*discovery.RQLiteNodeMetadata
c.logger.Debug("Collecting peer metadata from LibP2P",
zap.Int("connected_libp2p_peers", len(connectedPeers)))
// Add ourselves
ourMetadata := &discovery.RQLiteNodeMetadata{
NodeID: c.raftAddress, // RQLite uses raft address as node ID
RaftAddress: c.raftAddress,
HTTPAddress: c.httpAddress,
NodeType: c.nodeType,
RaftLogIndex: c.rqliteManager.getRaftLogIndex(),
LastSeen: time.Now(),
ClusterVersion: "1.0",
}
metadata = append(metadata, ourMetadata)
// Query connected peers for their RQLite metadata
// For now, we'll use a simple approach - store metadata in peer metadata store
// In a full implementation, this would use a custom protocol to exchange RQLite metadata
for _, peerID := range connectedPeers {
// Try to get stored metadata from peerstore
// This would be populated by a peer exchange protocol
if val, err := c.host.Peerstore().Get(peerID, "rqlite_metadata"); err == nil {
if jsonData, ok := val.([]byte); ok {
var peerMeta discovery.RQLiteNodeMetadata
if err := json.Unmarshal(jsonData, &peerMeta); err == nil {
peerMeta.LastSeen = time.Now()
metadata = append(metadata, &peerMeta)
}
}
}
}
return metadata
}
// membershipUpdateResult contains the result of a membership update operation
type membershipUpdateResult struct {
peersJSON []map[string]interface{}
added []string
updated []string
changed bool
}
// updateClusterMembership updates the cluster membership based on discovered peers
func (c *ClusterDiscoveryService) updateClusterMembership() {
metadata := c.collectPeerMetadata()
c.logger.Debug("Collected peer metadata",
zap.Int("metadata_count", len(metadata)))
// Compute membership changes while holding lock
c.mu.Lock()
result := c.computeMembershipChangesLocked(metadata)
c.mu.Unlock()
// Perform file I/O outside the lock
if result.changed {
// Log state changes (peer added/removed) at Info level
if len(result.added) > 0 || len(result.updated) > 0 {
c.logger.Info("Cluster membership changed",
zap.Int("added", len(result.added)),
zap.Int("updated", len(result.updated)),
zap.Strings("added_ids", result.added),
zap.Strings("updated_ids", result.updated))
}
// Write peers.json without holding lock
if err := c.writePeersJSONWithData(result.peersJSON); err != nil {
c.logger.Error("CRITICAL: Failed to write peers.json",
zap.Error(err),
zap.String("data_dir", c.dataDir),
zap.Int("peer_count", len(result.peersJSON)))
} else {
c.logger.Debug("peers.json updated",
zap.Int("peer_count", len(result.peersJSON)))
}
// Update lastUpdate timestamp
c.mu.Lock()
c.lastUpdate = time.Now()
c.mu.Unlock()
} else {
c.mu.RLock()
totalPeers := len(c.knownPeers)
c.mu.RUnlock()
c.logger.Debug("No changes to cluster membership",
zap.Int("total_peers", totalPeers))
}
}
// computeMembershipChangesLocked computes membership changes and returns snapshot data
// Must be called with lock held
func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*discovery.RQLiteNodeMetadata) membershipUpdateResult {
// Track changes
added := []string{}
updated := []string{}
// Update known peers, but skip self for health tracking
for _, meta := range metadata {
// Skip self-metadata for health tracking (we only track remote peers)
isSelf := meta.NodeID == c.raftAddress
if existing, ok := c.knownPeers[meta.NodeID]; ok {
// Update existing peer
if existing.RaftLogIndex != meta.RaftLogIndex ||
existing.HTTPAddress != meta.HTTPAddress ||
existing.RaftAddress != meta.RaftAddress {
updated = append(updated, meta.NodeID)
}
} else {
// New peer discovered
added = append(added, meta.NodeID)
c.logger.Info("Node added to cluster",
zap.String("node_id", meta.NodeID),
zap.String("raft_address", meta.RaftAddress),
zap.String("node_type", meta.NodeType),
zap.Uint64("log_index", meta.RaftLogIndex))
}
c.knownPeers[meta.NodeID] = meta
// Update health tracking only for remote peers
if !isSelf {
if _, ok := c.peerHealth[meta.NodeID]; !ok {
c.peerHealth[meta.NodeID] = &PeerHealth{
LastSeen: time.Now(),
LastSuccessful: time.Now(),
Status: "active",
}
} else {
c.peerHealth[meta.NodeID].LastSeen = time.Now()
c.peerHealth[meta.NodeID].Status = "active"
c.peerHealth[meta.NodeID].FailureCount = 0
}
}
}
// Determine if we should write peers.json
shouldWrite := len(added) > 0 || len(updated) > 0 || c.lastUpdate.IsZero()
if shouldWrite {
// Log initial sync if this is the first time
if c.lastUpdate.IsZero() {
c.logger.Info("Initial cluster membership sync",
zap.Int("total_peers", len(c.knownPeers)))
}
// Get peers JSON snapshot
peers := c.getPeersJSONUnlocked()
return membershipUpdateResult{
peersJSON: peers,
added: added,
updated: updated,
changed: true,
}
}
return membershipUpdateResult{
changed: false,
}
}
// removeInactivePeers removes peers that haven't been seen for longer than the inactivity limit
func (c *ClusterDiscoveryService) removeInactivePeers() {
c.mu.Lock()
defer c.mu.Unlock()
now := time.Now()
removed := []string{}
for nodeID, health := range c.peerHealth {
inactiveDuration := now.Sub(health.LastSeen)
if inactiveDuration > c.inactivityLimit {
// Mark as inactive and remove
c.logger.Warn("Node removed from cluster",
zap.String("node_id", nodeID),
zap.String("reason", "inactive"),
zap.Duration("inactive_duration", inactiveDuration))
delete(c.knownPeers, nodeID)
delete(c.peerHealth, nodeID)
removed = append(removed, nodeID)
}
}
// Regenerate peers.json if any peers were removed
if len(removed) > 0 {
c.logger.Info("Removed inactive nodes, regenerating peers.json",
zap.Int("removed", len(removed)),
zap.Strings("node_ids", removed))
if err := c.writePeersJSON(); err != nil {
c.logger.Error("Failed to write peers.json after cleanup", zap.Error(err))
}
}
}
// getPeersJSON generates the peers.json structure from active peers (acquires lock)
func (c *ClusterDiscoveryService) getPeersJSON() []map[string]interface{} {
c.mu.RLock()
defer c.mu.RUnlock()
return c.getPeersJSONUnlocked()
}
// getPeersJSONUnlocked generates the peers.json structure (must be called with lock held)
func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{} {
peers := make([]map[string]interface{}, 0, len(c.knownPeers))
for _, peer := range c.knownPeers {
peerEntry := map[string]interface{}{
"id": peer.RaftAddress, // RQLite uses raft address as node ID
"address": peer.RaftAddress,
"non_voter": false,
}
peers = append(peers, peerEntry)
}
return peers
}
// writePeersJSON atomically writes the peers.json file (acquires lock)
func (c *ClusterDiscoveryService) writePeersJSON() error {
c.mu.RLock()
peers := c.getPeersJSONUnlocked()
c.mu.RUnlock()
return c.writePeersJSONWithData(peers)
}
// writePeersJSONWithData writes the peers.json file with provided data (no lock needed)
func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]interface{}) error {
// Expand ~ in data directory path
dataDir := os.ExpandEnv(c.dataDir)
if strings.HasPrefix(dataDir, "~") {
home, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("failed to determine home directory: %w", err)
}
dataDir = filepath.Join(home, dataDir[1:])
}
// Get the RQLite raft directory
rqliteDir := filepath.Join(dataDir, "rqlite", "raft")
c.logger.Debug("Writing peers.json",
zap.String("data_dir", c.dataDir),
zap.String("expanded_path", dataDir),
zap.String("raft_dir", rqliteDir),
zap.Int("peer_count", len(peers)))
if err := os.MkdirAll(rqliteDir, 0755); err != nil {
return fmt.Errorf("failed to create raft directory %s: %w", rqliteDir, err)
}
peersFile := filepath.Join(rqliteDir, "peers.json")
backupFile := filepath.Join(rqliteDir, "peers.json.backup")
// Backup existing peers.json if it exists
if _, err := os.Stat(peersFile); err == nil {
c.logger.Debug("Backing up existing peers.json", zap.String("backup_file", backupFile))
data, err := os.ReadFile(peersFile)
if err == nil {
_ = os.WriteFile(backupFile, data, 0644)
}
}
// Marshal to JSON
data, err := json.MarshalIndent(peers, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal peers.json: %w", err)
}
c.logger.Debug("Marshaled peers.json", zap.Int("data_size", len(data)))
// Write atomically using temp file + rename
tempFile := peersFile + ".tmp"
if err := os.WriteFile(tempFile, data, 0644); err != nil {
return fmt.Errorf("failed to write temp peers.json %s: %w", tempFile, err)
}
if err := os.Rename(tempFile, peersFile); err != nil {
return fmt.Errorf("failed to rename %s to %s: %w", tempFile, peersFile, err)
}
nodeIDs := make([]string, 0, len(peers))
for _, p := range peers {
if id, ok := p["id"].(string); ok {
nodeIDs = append(nodeIDs, id)
}
}
c.logger.Info("peers.json written",
zap.String("file", peersFile),
zap.Int("node_count", len(peers)),
zap.Strings("node_ids", nodeIDs))
return nil
}
// GetActivePeers returns a list of active peers (not including self)
func (c *ClusterDiscoveryService) GetActivePeers() []*discovery.RQLiteNodeMetadata {
c.mu.RLock()
defer c.mu.RUnlock()
peers := make([]*discovery.RQLiteNodeMetadata, 0, len(c.knownPeers))
for _, peer := range c.knownPeers {
// Skip self (compare by raft address since that's the NodeID now)
if peer.NodeID == c.raftAddress {
continue
}
peers = append(peers, peer)
}
return peers
}
// GetAllPeers returns a list of all known peers (including self)
func (c *ClusterDiscoveryService) GetAllPeers() []*discovery.RQLiteNodeMetadata {
c.mu.RLock()
defer c.mu.RUnlock()
peers := make([]*discovery.RQLiteNodeMetadata, 0, len(c.knownPeers))
for _, peer := range c.knownPeers {
peers = append(peers, peer)
}
return peers
}
// GetNodeWithHighestLogIndex returns the node with the highest Raft log index
func (c *ClusterDiscoveryService) GetNodeWithHighestLogIndex() *discovery.RQLiteNodeMetadata {
c.mu.RLock()
defer c.mu.RUnlock()
var highest *discovery.RQLiteNodeMetadata
var maxIndex uint64 = 0
for _, peer := range c.knownPeers {
// Skip self (compare by raft address since that's the NodeID now)
if peer.NodeID == c.raftAddress {
continue
}
if peer.RaftLogIndex > maxIndex {
maxIndex = peer.RaftLogIndex
highest = peer
}
}
return highest
}
// HasRecentPeersJSON checks if peers.json was recently updated
func (c *ClusterDiscoveryService) HasRecentPeersJSON() bool {
c.mu.RLock()
defer c.mu.RUnlock()
// Consider recent if updated in last 5 minutes
return time.Since(c.lastUpdate) < 5*time.Minute
}
// FindJoinTargets discovers join targets via LibP2P, prioritizing bootstrap nodes
func (c *ClusterDiscoveryService) FindJoinTargets() []string {
c.mu.RLock()
defer c.mu.RUnlock()
targets := []string{}
// Prioritize bootstrap nodes
for _, peer := range c.knownPeers {
if peer.NodeType == "bootstrap" {
targets = append(targets, peer.RaftAddress)
}
}
// Add other nodes as fallback
for _, peer := range c.knownPeers {
if peer.NodeType != "bootstrap" {
targets = append(targets, peer.RaftAddress)
}
}
return targets
}
// WaitForDiscoverySettling waits for LibP2P discovery to settle (used on concurrent startup)
func (c *ClusterDiscoveryService) WaitForDiscoverySettling(ctx context.Context) {
settleDuration := 60 * time.Second
c.logger.Info("Waiting for discovery to settle",
zap.Duration("duration", settleDuration))
select {
case <-ctx.Done():
return
case <-time.After(settleDuration):
}
// Collect final peer list
c.updateClusterMembership()
c.mu.RLock()
peerCount := len(c.knownPeers)
c.mu.RUnlock()
c.logger.Info("Discovery settled",
zap.Int("peer_count", peerCount))
}
// TriggerSync manually triggers a cluster membership sync
func (c *ClusterDiscoveryService) TriggerSync() {
c.logger.Info("Manually triggering cluster membership sync")
// For bootstrap nodes, wait a bit for peer discovery to stabilize
if c.nodeType == "bootstrap" {
c.logger.Info("Bootstrap node: waiting for peer discovery to complete")
time.Sleep(5 * time.Second)
}
c.updateClusterMembership()
}
// TriggerPeerExchange actively exchanges peer information with connected peers
// This populates the peerstore with RQLite metadata from other nodes
func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error {
if c.discoveryMgr == nil {
return fmt.Errorf("discovery manager not available")
}
c.logger.Info("Triggering peer exchange via discovery manager")
collected := c.discoveryMgr.TriggerPeerExchange(ctx)
c.logger.Info("Peer exchange completed", zap.Int("peers_with_metadata", collected))
return nil
}
// UpdateOwnMetadata updates our own RQLite metadata in the peerstore
func (c *ClusterDiscoveryService) UpdateOwnMetadata() {
metadata := &discovery.RQLiteNodeMetadata{
NodeID: c.raftAddress, // RQLite uses raft address as node ID
RaftAddress: c.raftAddress,
HTTPAddress: c.httpAddress,
NodeType: c.nodeType,
RaftLogIndex: c.rqliteManager.getRaftLogIndex(),
LastSeen: time.Now(),
ClusterVersion: "1.0",
}
// Store in our own peerstore for peer exchange
data, err := json.Marshal(metadata)
if err != nil {
c.logger.Error("Failed to marshal own metadata", zap.Error(err))
return
}
if err := c.host.Peerstore().Put(c.host.ID(), "rqlite_metadata", data); err != nil {
c.logger.Error("Failed to store own metadata", zap.Error(err))
return
}
c.logger.Debug("Updated own RQLite metadata",
zap.String("node_id", metadata.NodeID),
zap.Uint64("log_index", metadata.RaftLogIndex))
}
// StoreRemotePeerMetadata stores metadata received from a remote peer
func (c *ClusterDiscoveryService) StoreRemotePeerMetadata(peerID peer.ID, metadata *discovery.RQLiteNodeMetadata) error {
data, err := json.Marshal(metadata)
if err != nil {
return fmt.Errorf("failed to marshal metadata: %w", err)
}
if err := c.host.Peerstore().Put(peerID, "rqlite_metadata", data); err != nil {
return fmt.Errorf("failed to store metadata: %w", err)
}
c.logger.Debug("Stored remote peer metadata",
zap.String("peer_id", peerID.String()[:8]+"..."),
zap.String("node_id", metadata.NodeID))
return nil
}
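The file produced by writePeersJSONWithData is a plain JSON array; a sketch that reproduces its shape (addresses invented):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Mirrors getPeersJSONUnlocked: one entry per known peer, with the
	// raft address doubling as the node ID (addresses invented).
	peers := []map[string]interface{}{
		{"id": "203.0.113.10:7001", "address": "203.0.113.10:7001", "non_voter": false},
		{"id": "203.0.113.11:7001", "address": "203.0.113.11:7001", "non_voter": false},
	}
	out, _ := json.MarshalIndent(peers, "", "  ")
	fmt.Println(string(out)) // contents of <data_dir>/rqlite/raft/peers.json
}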

pkg/rqlite/data_safety.go (new file, 122 lines)
View File

@ -0,0 +1,122 @@
package rqlite
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"go.uber.org/zap"
)
// getRaftLogIndex returns the current Raft log index for this node
func (r *RQLiteManager) getRaftLogIndex() uint64 {
status, err := r.getRQLiteStatus()
if err != nil {
r.logger.Debug("Failed to get Raft log index", zap.Error(err))
return 0
}
// Return the highest index we have
maxIndex := status.Store.Raft.LastLogIndex
if status.Store.Raft.AppliedIndex > maxIndex {
maxIndex = status.Store.Raft.AppliedIndex
}
if status.Store.Raft.CommitIndex > maxIndex {
maxIndex = status.Store.Raft.CommitIndex
}
return maxIndex
}
// getRQLiteStatus queries the /status endpoint for cluster information
func (r *RQLiteManager) getRQLiteStatus() (*RQLiteStatus, error) {
url := fmt.Sprintf("http://localhost:%d/status", r.config.RQLitePort)
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to query status: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("status endpoint returned %d: %s", resp.StatusCode, string(body))
}
var status RQLiteStatus
if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
return nil, fmt.Errorf("failed to decode status: %w", err)
}
return &status, nil
}
// getRQLiteNodes queries the /nodes endpoint for cluster membership
func (r *RQLiteManager) getRQLiteNodes() (RQLiteNodes, error) {
url := fmt.Sprintf("http://localhost:%d/nodes?ver=2", r.config.RQLitePort)
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to query nodes: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("nodes endpoint returned %d: %s", resp.StatusCode, string(body))
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read nodes response: %w", err)
}
// rqlite v8 wraps nodes in a top-level object; fall back to a raw array for older versions.
var wrapped struct {
Nodes RQLiteNodes `json:"nodes"`
}
if err := json.Unmarshal(body, &wrapped); err == nil && wrapped.Nodes != nil {
return wrapped.Nodes, nil
}
// Try legacy format (plain array)
var nodes RQLiteNodes
if err := json.Unmarshal(body, &nodes); err != nil {
return nil, fmt.Errorf("failed to decode nodes: %w", err)
}
return nodes, nil
}
// getRQLiteLeader returns the current leader address
func (r *RQLiteManager) getRQLiteLeader() (string, error) {
status, err := r.getRQLiteStatus()
if err != nil {
return "", err
}
leaderAddr := status.Store.Raft.LeaderAddr
if leaderAddr == "" {
return "", fmt.Errorf("no leader found")
}
return leaderAddr, nil
}
// isNodeReachable tests if a specific node is responding
func (r *RQLiteManager) isNodeReachable(httpAddress string) bool {
url := fmt.Sprintf("http://%s/status", httpAddress)
client := &http.Client{Timeout: 3 * time.Second}
resp, err := client.Get(url)
if err != nil {
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}
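The RQLiteStatus and RQLiteNodes types used here are defined elsewhere in the package; judging only from the fields referenced above, the status payload is consumed through roughly this shape (a sketch, not the real definition, with JSON tags omitted):

package rqlite

// rqliteStatusSketch approximates the fields of RQLiteStatus that the
// helpers above rely on; the real type (and its JSON tags) is defined
// elsewhere in this package.
type rqliteStatusSketch struct {
	Store struct {
		Raft struct {
			LastLogIndex uint64
			AppliedIndex uint64
			CommitIndex  uint64
			LeaderAddr   string
		}
	}
}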

pkg/rqlite/metrics.go (new file, 74 lines)
View File

@ -0,0 +1,74 @@
package rqlite
import (
"time"
)
// GetMetrics returns current cluster metrics
func (c *ClusterDiscoveryService) GetMetrics() *ClusterMetrics {
c.mu.RLock()
defer c.mu.RUnlock()
activeCount := 0
inactiveCount := 0
totalHealth := 0.0
currentLeader := ""
now := time.Now()
for nodeID, health := range c.peerHealth {
if health.Status == "active" {
activeCount++
// Calculate health score (0-100) based on last seen
timeSinceLastSeen := now.Sub(health.LastSeen)
healthScore := 100.0
if timeSinceLastSeen > time.Minute {
// Degrade health score based on time since last seen
healthScore = 100.0 - (float64(timeSinceLastSeen.Seconds()) / float64(c.inactivityLimit.Seconds()) * 100.0)
if healthScore < 0 {
healthScore = 0
}
}
totalHealth += healthScore
} else {
inactiveCount++
}
// Try to determine leader
if peer, ok := c.knownPeers[nodeID]; ok {
// Determining the real leader would require querying RQLite directly;
// as a heuristic, treat the first bootstrap node seen as the likely leader
if peer.NodeType == "bootstrap" && currentLeader == "" {
currentLeader = nodeID
}
}
}
averageHealth := 0.0
if activeCount > 0 {
averageHealth = totalHealth / float64(activeCount)
}
// Determine discovery status
discoveryStatus := "healthy"
if len(c.knownPeers) == 0 {
discoveryStatus = "no_peers"
} else if len(c.knownPeers) == 1 {
discoveryStatus = "single_node"
} else if averageHealth < 50 {
discoveryStatus = "degraded"
}
return &ClusterMetrics{
ClusterSize: len(c.knownPeers),
ActiveNodes: activeCount,
InactiveNodes: inactiveCount,
RemovedNodes: 0, // Could track this with a counter
LastUpdate: c.lastUpdate,
DiscoveryStatus: discoveryStatus,
CurrentLeader: currentLeader,
AveragePeerHealth: averageHealth,
}
}
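
For illustration only, a sketch of how a caller might surface this snapshot. reportClusterMetrics is a hypothetical helper, not part of the diff; it assumes it lives in the same package with go.uber.org/zap imported, as elsewhere in these files.

// Hypothetical helper, not part of the diff: log the cluster metrics snapshot.
func reportClusterMetrics(c *ClusterDiscoveryService, logger *zap.Logger) {
	m := c.GetMetrics()
	logger.Info("cluster metrics",
		zap.Int("cluster_size", m.ClusterSize),
		zap.Int("active_nodes", m.ActiveNodes),
		zap.Int("inactive_nodes", m.InactiveNodes),
		zap.String("status", m.DiscoveryStatus),
		zap.String("leader", m.CurrentLeader),
		zap.Float64("avg_peer_health", m.AveragePeerHealth))

	// GetMetrics reports "degraded" when average peer health drops below 50.
	if m.DiscoveryStatus == "degraded" {
		logger.Warn("average peer health below 50, check peer connectivity")
	}
}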

Modified file: RQLiteManager (pkg/rqlite)

@ -20,12 +20,13 @@ import (
// RQLiteManager manages an RQLite node instance // RQLiteManager manages an RQLite node instance
type RQLiteManager struct { type RQLiteManager struct {
config *config.DatabaseConfig config *config.DatabaseConfig
discoverConfig *config.DiscoveryConfig discoverConfig *config.DiscoveryConfig
dataDir string dataDir string
logger *zap.Logger logger *zap.Logger
cmd *exec.Cmd cmd *exec.Cmd
connection *gorqlite.Connection connection *gorqlite.Connection
discoveryService *ClusterDiscoveryService
} }
// waitForSQLAvailable waits until a simple query succeeds, indicating a leader is known and queries can be served. // waitForSQLAvailable waits until a simple query succeeds, indicating a leader is known and queries can be served.
@ -63,18 +64,71 @@ func NewRQLiteManager(cfg *config.DatabaseConfig, discoveryCfg *config.Discovery
config: cfg, config: cfg,
discoverConfig: discoveryCfg, discoverConfig: discoveryCfg,
dataDir: dataDir, dataDir: dataDir,
logger: logger, logger: logger.With(zap.String("component", "rqlite-manager")),
} }
} }
// SetDiscoveryService sets the cluster discovery service for this RQLite manager
func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService) {
r.discoveryService = service
}
// Start starts the RQLite node // Start starts the RQLite node
func (r *RQLiteManager) Start(ctx context.Context) error { func (r *RQLiteManager) Start(ctx context.Context) error {
rqliteDataDir, err := r.prepareDataDir()
if err != nil {
return err
}
if r.discoverConfig.HttpAdvAddress == "" {
return fmt.Errorf("discovery config HttpAdvAddress is empty")
}
// CRITICAL: Check if we need to do pre-start cluster discovery to build peers.json
// This handles the case where nodes have old cluster state and need coordinated recovery
if needsClusterRecovery, err := r.checkNeedsClusterRecovery(rqliteDataDir); err != nil {
return fmt.Errorf("failed to check cluster recovery status: %w", err)
} else if needsClusterRecovery {
r.logger.Info("Detected old cluster state requiring coordinated recovery")
if err := r.performPreStartClusterDiscovery(ctx, rqliteDataDir); err != nil {
return fmt.Errorf("pre-start cluster discovery failed: %w", err)
}
}
// Launch RQLite process
if err := r.launchProcess(ctx, rqliteDataDir); err != nil {
return err
}
// Wait for RQLite to be ready and establish connection
if err := r.waitForReadyAndConnect(ctx); err != nil {
return err
}
// Establish leadership/SQL availability
if err := r.establishLeadershipOrJoin(ctx, rqliteDataDir); err != nil {
return err
}
// Apply migrations
migrationsDir := "migrations"
if err := r.ApplyMigrations(ctx, migrationsDir); err != nil {
r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir))
return fmt.Errorf("apply migrations: %w", err)
}
r.logger.Info("RQLite node started successfully")
return nil
}
// prepareDataDir expands and creates the RQLite data directory
func (r *RQLiteManager) prepareDataDir() (string, error) {
// Expand ~ in data directory path // Expand ~ in data directory path
dataDir := os.ExpandEnv(r.dataDir) dataDir := os.ExpandEnv(r.dataDir)
if strings.HasPrefix(dataDir, "~") { if strings.HasPrefix(dataDir, "~") {
home, err := os.UserHomeDir() home, err := os.UserHomeDir()
if err != nil { if err != nil {
return fmt.Errorf("failed to determine home directory: %w", err) return "", fmt.Errorf("failed to determine home directory: %w", err)
} }
dataDir = filepath.Join(home, dataDir[1:]) dataDir = filepath.Join(home, dataDir[1:])
} }
@ -82,13 +136,14 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
// Create data directory // Create data directory
rqliteDataDir := filepath.Join(dataDir, "rqlite") rqliteDataDir := filepath.Join(dataDir, "rqlite")
if err := os.MkdirAll(rqliteDataDir, 0755); err != nil { if err := os.MkdirAll(rqliteDataDir, 0755); err != nil {
return fmt.Errorf("failed to create RQLite data directory: %w", err) return "", fmt.Errorf("failed to create RQLite data directory: %w", err)
} }
if r.discoverConfig.HttpAdvAddress == "" { return rqliteDataDir, nil
return fmt.Errorf("discovery config HttpAdvAddress is empty") }
}
// launchProcess starts the RQLite process with appropriate arguments
func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) error {
// Build RQLite command // Build RQLite command
args := []string{ args := []string{
"-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort), "-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort),
@ -117,7 +172,8 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
} }
// Always add the join parameter in host:port form - let rqlited handle the rest // Always add the join parameter in host:port form - let rqlited handle the rest
args = append(args, "-join", joinArg) // Add retry parameters to handle slow cluster startup (e.g., during recovery)
args = append(args, "-join", joinArg, "-join-attempts", "30", "-join-interval", "10s")
} else { } else {
r.logger.Info("No join address specified - starting as new cluster") r.logger.Info("No join address specified - starting as new cluster")
} }
@ -129,9 +185,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
zap.String("data_dir", rqliteDataDir), zap.String("data_dir", rqliteDataDir),
zap.Int("http_port", r.config.RQLitePort), zap.Int("http_port", r.config.RQLitePort),
zap.Int("raft_port", r.config.RQLiteRaftPort), zap.Int("raft_port", r.config.RQLiteRaftPort),
zap.String("join_address", r.config.RQLiteJoinAddress), zap.String("join_address", r.config.RQLiteJoinAddress))
zap.Strings("full_args", args),
)
// Start RQLite process (not bound to ctx for graceful Stop handling) // Start RQLite process (not bound to ctx for graceful Stop handling)
r.cmd = exec.Command("rqlited", args...) r.cmd = exec.Command("rqlited", args...)
@ -144,6 +198,11 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
return fmt.Errorf("failed to start RQLite: %w", err) return fmt.Errorf("failed to start RQLite: %w", err)
} }
return nil
}
// waitForReadyAndConnect waits for RQLite to be ready and establishes connection
func (r *RQLiteManager) waitForReadyAndConnect(ctx context.Context) error {
// Wait for RQLite to be ready // Wait for RQLite to be ready
if err := r.waitForReady(ctx); err != nil { if err := r.waitForReady(ctx); err != nil {
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
@ -162,24 +221,74 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
} }
r.connection = conn r.connection = conn
// Leadership/SQL readiness gating // Sanity check: verify rqlite's node ID matches our configured raft address
// if err := r.validateNodeID(); err != nil {
// Fresh bootstrap (no join, no prior state): wait for leadership so queries will work. r.logger.Debug("Node ID validation skipped", zap.Error(err))
// Existing state or joiners: wait for SQL availability (leader known) before proceeding, // Don't fail startup, but log at debug level
// so higher layers (storage) don't fail with 500 leader-not-found. }
if r.config.RQLiteJoinAddress == "" && !r.hasExistingState(rqliteDataDir) {
if err := r.waitForLeadership(ctx); err != nil { return nil
if r.cmd != nil && r.cmd.Process != nil { }
_ = r.cmd.Process.Kill()
// establishLeadershipOrJoin establishes leadership (bootstrap) or waits for SQL availability (joining)
func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDataDir string) error {
if r.config.RQLiteJoinAddress == "" {
// Bootstrap node logic with data safety checks
r.logger.Info("Bootstrap node: checking if safe to lead")
// SAFETY: Check if we can safely become leader
canLead, err := r.canSafelyBecomeLeader()
if !canLead && err != nil {
r.logger.Warn("Not safe to become leader, attempting to join existing cluster",
zap.Error(err))
// Find node with highest log index and join it
if r.discoveryService != nil {
targetNode := r.discoveryService.GetNodeWithHighestLogIndex()
if targetNode != nil {
r.logger.Info("Joining node with higher data",
zap.String("target_node", targetNode.NodeID),
zap.String("raft_address", targetNode.RaftAddress),
zap.Uint64("their_index", targetNode.RaftLogIndex))
return r.joinExistingCluster(ctx, targetNode.RaftAddress)
}
} }
return fmt.Errorf("RQLite failed to establish leadership: %w", err)
} }
} else {
r.logger.Info("Waiting for RQLite SQL availability (leader discovery)") // Safe to lead - attempt leadership
// For joining nodes, wait longer for SQL availability leadershipErr := r.waitForLeadership(ctx)
if leadershipErr == nil {
r.logger.Info("Bootstrap node successfully established leadership")
return nil
}
r.logger.Warn("Initial leadership attempt failed, may need cluster recovery",
zap.Error(leadershipErr))
// Try recovery if we have peers.json from discovery
if r.discoveryService != nil {
peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json")
if _, err := os.Stat(peersPath); err == nil {
r.logger.Info("Attempting cluster recovery using peers.json",
zap.String("peers_file", peersPath))
if recoveryErr := r.recoverCluster(peersPath); recoveryErr == nil {
r.logger.Info("Cluster recovery successful, retrying leadership")
leadershipErr = r.waitForLeadership(ctx)
if leadershipErr == nil {
r.logger.Info("Bootstrap node established leadership after recovery")
return nil
}
} else {
r.logger.Warn("Cluster recovery failed", zap.Error(recoveryErr))
}
}
}
// Final fallback: SQL availability
r.logger.Warn("Leadership failed, trying SQL availability")
sqlCtx := ctx sqlCtx := ctx
if _, hasDeadline := ctx.Deadline(); !hasDeadline { if _, hasDeadline := ctx.Deadline(); !hasDeadline {
// If no deadline in context, create one for SQL availability check
var cancel context.CancelFunc var cancel context.CancelFunc
sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel() defer cancel()
@ -190,18 +299,24 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
} }
return fmt.Errorf("RQLite SQL not available: %w", err) return fmt.Errorf("RQLite SQL not available: %w", err)
} }
return nil
} else {
// Joining node logic
r.logger.Info("Waiting for RQLite SQL availability (leader discovery)")
sqlCtx := ctx
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
var cancel context.CancelFunc
sqlCtx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
}
if err := r.waitForSQLAvailable(sqlCtx); err != nil {
if r.cmd != nil && r.cmd.Process != nil {
_ = r.cmd.Process.Kill()
}
return fmt.Errorf("RQLite SQL not available: %w", err)
}
return nil
} }
// After waitForLeadership / waitForSQLAvailable succeeds, before returning:
migrationsDir := "migrations"
if err := r.ApplyMigrations(ctx, migrationsDir); err != nil {
r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir))
return fmt.Errorf("apply migrations: %w", err)
}
r.logger.Info("RQLite node started successfully")
return nil
} }
// hasExistingState returns true if the rqlite data directory already contains files or subdirectories. // hasExistingState returns true if the rqlite data directory already contains files or subdirectories.
@ -252,7 +367,12 @@ func (r *RQLiteManager) waitForReady(ctx context.Context) error {
func (r *RQLiteManager) waitForLeadership(ctx context.Context) error { func (r *RQLiteManager) waitForLeadership(ctx context.Context) error {
r.logger.Info("Waiting for RQLite to establish leadership...") r.logger.Info("Waiting for RQLite to establish leadership...")
for i := 0; i < 30; i++ { maxAttempts := 30
attempt := 0
backoffDelay := 500 * time.Millisecond
maxBackoff := 5 * time.Second
for attempt < maxAttempts {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
@ -266,10 +386,19 @@ func (r *RQLiteManager) waitForLeadership(ctx context.Context) error {
r.logger.Info("RQLite leadership established") r.logger.Info("RQLite leadership established")
return nil return nil
} }
r.logger.Debug("Waiting for leadership", zap.Error(err)) // Log every 5th attempt or on first attempt to reduce noise
if attempt%5 == 0 || attempt == 0 {
r.logger.Debug("Waiting for leadership", zap.Int("attempt", attempt+1), zap.Error(err))
}
} }
time.Sleep(1 * time.Second) // Exponential backoff, capped at maxBackoff
time.Sleep(backoffDelay)
backoffDelay = time.Duration(float64(backoffDelay) * 1.5)
if backoffDelay > maxBackoff {
backoffDelay = maxBackoff
}
attempt++
} }
return fmt.Errorf("RQLite failed to establish leadership within timeout") return fmt.Errorf("RQLite failed to establish leadership within timeout")
@ -380,3 +509,316 @@ func (r *RQLiteManager) testJoinAddress(joinAddress string) error {
r.logger.Info("Leader HTTP reachable", zap.String("status_url", statusURL)) r.logger.Info("Leader HTTP reachable", zap.String("status_url", statusURL))
return nil return nil
} }
// canSafelyBecomeLeader checks if this node can safely become leader without causing data loss
func (r *RQLiteManager) canSafelyBecomeLeader() (bool, error) {
// Get our current Raft log index
ourLogIndex := r.getRaftLogIndex()
// If no discovery service, assume it's safe (backward compatibility)
if r.discoveryService == nil {
r.logger.Debug("No discovery service, assuming safe to lead")
return true, nil
}
// Query discovery service for other nodes
otherNodes := r.discoveryService.GetActivePeers()
if len(otherNodes) == 0 {
// No other nodes - safe to bootstrap
r.logger.Debug("No other nodes discovered, safe to lead",
zap.Uint64("our_log_index", ourLogIndex))
return true, nil
}
// Check if any other node has higher log index
for _, peer := range otherNodes {
if peer.RaftLogIndex > ourLogIndex {
// Other node has more data - we should join them
return false, fmt.Errorf(
"node %s has higher log index (%d > %d), should join as follower",
peer.NodeID, peer.RaftLogIndex, ourLogIndex)
}
}
// We have most recent data or equal - safe to lead
r.logger.Info("Safe to lead - we have most recent data",
zap.Uint64("our_log_index", ourLogIndex),
zap.Int("other_nodes_checked", len(otherNodes)))
return true, nil
}
// joinExistingCluster attempts to join an existing cluster as a follower
func (r *RQLiteManager) joinExistingCluster(ctx context.Context, raftAddress string) error {
r.logger.Info("Attempting to join existing cluster",
zap.String("target_raft_address", raftAddress))
// Wait for the target to be reachable
if err := r.waitForJoinTarget(ctx, raftAddress, 2*time.Minute); err != nil {
return fmt.Errorf("join target not reachable: %w", err)
}
// Wait for SQL availability (the target should have a leader)
sqlCtx := ctx
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
var cancel context.CancelFunc
sqlCtx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
}
if err := r.waitForSQLAvailable(sqlCtx); err != nil {
return fmt.Errorf("failed to join cluster - SQL not available: %w", err)
}
r.logger.Info("Successfully joined existing cluster")
return nil
}
// exponentialBackoff calculates exponential backoff duration with jitter
func (r *RQLiteManager) exponentialBackoff(attempt int, baseDelay time.Duration, maxDelay time.Duration) time.Duration {
// Calculate exponential backoff: baseDelay * 2^attempt
delay := baseDelay * time.Duration(1<<uint(attempt))
if delay > maxDelay {
delay = maxDelay
}
// Add jitter (±20%)
jitter := time.Duration(float64(delay) * 0.2 * (2.0*float64(time.Now().UnixNano()%100)/100.0 - 1.0))
return delay + jitter
}
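
A hedged usage sketch (not in the diff) of the helper above: a generic retry loop that a caller could drive with exponentialBackoff. retryWithBackoff, op, and maxAttempts are hypothetical names.

// Hypothetical sketch: retry op until it succeeds, sleeping with the
// exponentialBackoff helper between attempts and honoring ctx cancellation.
func (r *RQLiteManager) retryWithBackoff(ctx context.Context, maxAttempts int, op func() error) error {
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if lastErr = op(); lastErr == nil {
			return nil
		}
		delay := r.exponentialBackoff(attempt, 500*time.Millisecond, 5*time.Second)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return fmt.Errorf("all %d attempts failed: %w", maxAttempts, lastErr)
}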
// recoverCluster restarts RQLite using the recovery.db created from peers.json
func (r *RQLiteManager) recoverCluster(peersJSONPath string) error {
r.logger.Info("Initiating cluster recovery by restarting RQLite",
zap.String("peers_file", peersJSONPath))
// Stop the current RQLite process
r.logger.Info("Stopping RQLite for recovery")
if err := r.Stop(); err != nil {
r.logger.Warn("Error stopping RQLite", zap.Error(err))
}
// Wait for process to fully stop
time.Sleep(2 * time.Second)
// Restart RQLite - it will automatically detect peers.json and perform recovery
r.logger.Info("Restarting RQLite (will auto-recover using peers.json)")
// Build the same args as original Start() - expand ~ in data directory
dataDir := os.ExpandEnv(r.dataDir)
if strings.HasPrefix(dataDir, "~") {
home, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("failed to determine home directory: %w", err)
}
dataDir = filepath.Join(home, dataDir[1:])
}
rqliteDataDir := filepath.Join(dataDir, "rqlite")
args := []string{
"-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort),
"-http-adv-addr", r.discoverConfig.HttpAdvAddress,
"-raft-adv-addr", r.discoverConfig.RaftAdvAddress,
"-raft-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLiteRaftPort),
rqliteDataDir,
}
// Restart RQLite
r.cmd = exec.Command("rqlited", args...)
r.cmd.Stdout = os.Stdout
r.cmd.Stderr = os.Stderr
if err := r.cmd.Start(); err != nil {
return fmt.Errorf("failed to restart RQLite: %w", err)
}
r.logger.Info("RQLite restarted, waiting for it to become ready")
time.Sleep(3 * time.Second)
// Recreate connection
conn, err := gorqlite.Open(fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
if err != nil {
return fmt.Errorf("failed to reconnect to RQLite: %w", err)
}
r.connection = conn
r.logger.Info("Cluster recovery completed, RQLite restarted with new configuration")
return nil
}
// checkNeedsClusterRecovery checks if the node has old cluster state that requires coordinated recovery
// Returns true if there are snapshots but the raft log is empty (typical after a crash/restart)
func (r *RQLiteManager) checkNeedsClusterRecovery(rqliteDataDir string) (bool, error) {
// Check for snapshots directory
snapshotsDir := filepath.Join(rqliteDataDir, "rsnapshots")
if _, err := os.Stat(snapshotsDir); os.IsNotExist(err) {
// No snapshots = fresh start, no recovery needed
return false, nil
}
// Check if snapshots directory has any snapshots
entries, err := os.ReadDir(snapshotsDir)
if err != nil {
return false, fmt.Errorf("failed to read snapshots directory: %w", err)
}
hasSnapshots := false
for _, entry := range entries {
if entry.IsDir() || strings.HasSuffix(entry.Name(), ".db") {
hasSnapshots = true
break
}
}
if !hasSnapshots {
// No snapshots = fresh start
return false, nil
}
// Check raft log size - if it's the default empty size, we need recovery
raftLogPath := filepath.Join(rqliteDataDir, "raft.db")
if info, err := os.Stat(raftLogPath); err == nil {
// Empty or default-sized log with snapshots means we need coordinated recovery
if info.Size() <= 8*1024*1024 { // <= 8MB (default empty log size)
r.logger.Info("Detected cluster recovery situation: snapshots exist but raft log is empty/default size",
zap.String("snapshots_dir", snapshotsDir),
zap.Int64("raft_log_size", info.Size()))
return true, nil
}
}
return false, nil
}
// performPreStartClusterDiscovery waits for peer discovery and builds a complete peers.json
// before starting RQLite. This ensures all nodes use the same cluster membership for recovery.
func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rqliteDataDir string) error {
if r.discoveryService == nil {
r.logger.Warn("No discovery service available, cannot perform pre-start cluster discovery")
return fmt.Errorf("discovery service not available")
}
r.logger.Info("Waiting for peer discovery to find other cluster members...")
// CRITICAL: First, actively trigger peer exchange to populate peerstore with RQLite metadata
// The peerstore needs RQLite metadata from other nodes BEFORE we can collect it
r.logger.Info("Triggering peer exchange to collect RQLite metadata from connected peers")
if err := r.discoveryService.TriggerPeerExchange(ctx); err != nil {
r.logger.Warn("Peer exchange failed, continuing anyway", zap.Error(err))
}
// Give peer exchange a moment to complete
time.Sleep(1 * time.Second)
// Now trigger cluster membership sync to populate knownPeers map from the peerstore
r.logger.Info("Triggering initial cluster membership sync to populate peer list")
r.discoveryService.TriggerSync()
// Give the sync a moment to complete
time.Sleep(2 * time.Second)
// Wait for peer discovery within a fixed 30-second window
discoveryDeadline := time.Now().Add(30 * time.Second)
var discoveredPeers int
for time.Now().Before(discoveryDeadline) {
// Check how many peers with RQLite metadata we've discovered
allPeers := r.discoveryService.GetAllPeers()
discoveredPeers = len(allPeers)
r.logger.Info("Peer discovery progress",
zap.Int("discovered_peers", discoveredPeers),
zap.Duration("time_remaining", time.Until(discoveryDeadline)))
// If we have at least our minimum cluster size, proceed
if discoveredPeers >= r.config.MinClusterSize {
r.logger.Info("Found minimum cluster size peers, proceeding with recovery",
zap.Int("discovered_peers", discoveredPeers),
zap.Int("min_cluster_size", r.config.MinClusterSize))
break
}
// Wait a bit before checking again
time.Sleep(2 * time.Second)
}
if discoveredPeers == 0 {
r.logger.Warn("No peers discovered during pre-start discovery window, will attempt solo recovery")
// Continue anyway - might be the only node left
}
// Trigger final sync to ensure peers.json is up to date with latest discovered peers
r.logger.Info("Triggering final cluster membership sync to build complete peers.json")
r.discoveryService.TriggerSync()
// Wait a moment for the sync to complete
time.Sleep(2 * time.Second)
// Verify peers.json was created
peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json")
if _, err := os.Stat(peersPath); err != nil {
return fmt.Errorf("peers.json was not created after discovery: %w", err)
}
r.logger.Info("Pre-start cluster discovery completed successfully",
zap.String("peers_file", peersPath),
zap.Int("peer_count", discoveredPeers))
return nil
}
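
The exact peers.json written by the discovery service is not shown in this diff. As an assumption based on rqlite's documented recovery format (entries with id, address, and non_voter, with id matching the raft address as noted in the changelog), a minimal sketch of what such a file could look like follows; names and addresses are invented.

// Hypothetical sketch only: the assumed shape of a peers.json entry.
// Field names follow rqlite's recovery format; the addresses are made up.
type peerEntry struct {
	ID       string `json:"id"`
	Address  string `json:"address"`
	NonVoter bool   `json:"non_voter"`
}

func writeExamplePeersJSON(path string) error {
	peers := []peerEntry{
		{ID: "10.0.0.1:7001", Address: "10.0.0.1:7001"},
		{ID: "10.0.0.2:7001", Address: "10.0.0.2:7001"},
	}
	data, err := json.MarshalIndent(peers, "", "  ") // assumes encoding/json is imported
	if err != nil {
		return err
	}
	return os.WriteFile(path, data, 0o644)
}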
// validateNodeID checks that rqlite's reported node ID matches our configured raft address
func (r *RQLiteManager) validateNodeID() error {
// Query /nodes endpoint to get our node ID
// Retry a few times as the endpoint might not be ready immediately
for i := 0; i < 5; i++ {
nodes, err := r.getRQLiteNodes()
if err != nil {
// If endpoint is not ready yet, wait and retry
if i < 4 {
time.Sleep(500 * time.Millisecond)
continue
}
// Log at debug level if validation fails - not critical
r.logger.Debug("Node ID validation skipped (endpoint unavailable)", zap.Error(err))
return nil
}
expectedID := r.discoverConfig.RaftAdvAddress
if expectedID == "" {
return fmt.Errorf("raft_adv_address not configured")
}
// If cluster is still forming, nodes list might be empty - that's okay
if len(nodes) == 0 {
r.logger.Debug("Node ID validation skipped (cluster not yet formed)")
return nil
}
// Find our node in the cluster (match by address)
for _, node := range nodes {
if node.Address == expectedID {
if node.ID != expectedID {
r.logger.Error("CRITICAL: RQLite node ID mismatch",
zap.String("configured_raft_address", expectedID),
zap.String("rqlite_node_id", node.ID),
zap.String("rqlite_node_address", node.Address),
zap.String("explanation", "peers.json id field must match rqlite's node ID (raft address)"))
return fmt.Errorf("node ID mismatch: configured %s but rqlite reports %s", expectedID, node.ID)
}
r.logger.Debug("Node ID validation passed",
zap.String("node_id", node.ID),
zap.String("address", node.Address))
return nil
}
}
// If we can't find ourselves but other nodes exist, cluster might still be forming
// This is fine - don't log a warning
r.logger.Debug("Node ID validation skipped (node not yet in cluster membership)",
zap.String("expected_address", expectedID),
zap.Int("nodes_in_cluster", len(nodes)))
return nil
}
return nil
}

pkg/rqlite/types.go (new file, +71 lines)

@ -0,0 +1,71 @@
package rqlite
import "time"
// RQLiteStatus represents the response from RQLite's /status endpoint
type RQLiteStatus struct {
Store struct {
Raft struct {
AppliedIndex uint64 `json:"applied_index"`
CommitIndex uint64 `json:"commit_index"`
LastLogIndex uint64 `json:"last_log_index"`
LastSnapshotIndex uint64 `json:"last_snapshot_index"`
State string `json:"state"`
LeaderID string `json:"leader_id"`
LeaderAddr string `json:"leader_addr"`
} `json:"raft"`
DBConf struct {
DSN string `json:"dsn"`
Memory bool `json:"memory"`
} `json:"db_conf"`
} `json:"store"`
Runtime struct {
GOARCH string `json:"GOARCH"`
GOOS string `json:"GOOS"`
GOMAXPROCS int `json:"GOMAXPROCS"`
NumCPU int `json:"num_cpu"`
NumGoroutine int `json:"num_goroutine"`
Version string `json:"version"`
} `json:"runtime"`
HTTP struct {
Addr string `json:"addr"`
Auth string `json:"auth"`
} `json:"http"`
Node struct {
Uptime string `json:"uptime"`
StartTime string `json:"start_time"`
} `json:"node"`
}
// RQLiteNode represents a node in the RQLite cluster
type RQLiteNode struct {
ID string `json:"id"`
Address string `json:"address"`
Leader bool `json:"leader"`
Voter bool `json:"voter"`
Reachable bool `json:"reachable"`
}
// RQLiteNodes represents the response from RQLite's /nodes endpoint
type RQLiteNodes []RQLiteNode
// PeerHealth tracks the health status of a peer
type PeerHealth struct {
LastSeen time.Time
LastSuccessful time.Time
FailureCount int
Status string // "active", "degraded", "inactive"
}
// ClusterMetrics contains cluster-wide metrics
type ClusterMetrics struct {
ClusterSize int
ActiveNodes int
InactiveNodes int
RemovedNodes int
LastUpdate time.Time
DiscoveryStatus string
CurrentLeader string
AveragePeerHealth float64
}
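
A hypothetical unit test (not part of the diff) that pins down the two /nodes payload shapes getRQLiteNodes accepts: the rqlite v8 wrapped object and the legacy plain array. The sample JSON values are invented.

package rqlite

import (
	"encoding/json"
	"testing"
)

func TestDecodeNodesPayloads(t *testing.T) {
	wrapped := []byte(`{"nodes":[{"id":"10.0.0.1:7001","address":"10.0.0.1:7001","leader":true,"voter":true,"reachable":true}]}`)
	legacy := []byte(`[{"id":"10.0.0.2:7001","address":"10.0.0.2:7001"}]`)

	// rqlite v8 wrapped format.
	var w struct {
		Nodes RQLiteNodes `json:"nodes"`
	}
	if err := json.Unmarshal(wrapped, &w); err != nil || len(w.Nodes) != 1 {
		t.Fatalf("wrapped decode failed: %v", err)
	}

	// Legacy plain-array format.
	var n RQLiteNodes
	if err := json.Unmarshal(legacy, &n); err != nil || len(n) != 1 {
		t.Fatalf("legacy decode failed: %v", err)
	}
}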

Modified file: node installation script

@ -223,10 +223,48 @@ install_anon() {
# Add repository # Add repository
echo "deb [signed-by=/usr/share/keyrings/anyone-archive-keyring.gpg] https://deb.anyone.io/ anyone main" | sudo tee /etc/apt/sources.list.d/anyone.list >/dev/null echo "deb [signed-by=/usr/share/keyrings/anyone-archive-keyring.gpg] https://deb.anyone.io/ anyone main" | sudo tee /etc/apt/sources.list.d/anyone.list >/dev/null
# Update and install # Preseed terms acceptance to avoid interactive prompt
log "Pre-accepting Anon terms and conditions..."
# Try multiple debconf question formats
echo "anon anon/terms boolean true" | sudo debconf-set-selections
echo "anon anon/terms seen true" | sudo debconf-set-selections
# Also try with select/string format
echo "anon anon/terms select true" | sudo debconf-set-selections || true
# Dump any existing anon debconf selections for debugging; different
# packages register their terms question under different names
sudo debconf-get-selections | grep -i anon || true
# Create anonrc directory and file with AgreeToTerms before installation
# This ensures terms are accepted even if the post-install script checks the file
sudo mkdir -p /etc/anon
if [ ! -f /etc/anon/anonrc ]; then
echo "AgreeToTerms 1" | sudo tee /etc/anon/anonrc >/dev/null
fi
# Also create a terms-agreement file if Anon checks for it
# Check multiple possible locations where Anon might look for terms acceptance
sudo mkdir -p /var/lib/anon
echo "agreed" | sudo tee /var/lib/anon/terms-agreement >/dev/null 2>&1 || true
sudo mkdir -p /usr/share/anon
echo "agreed" | sudo tee /usr/share/anon/terms-agreement >/dev/null 2>&1 || true
# Also create a marker file near the GPG keyring directory as an extra fallback
sudo mkdir -p /usr/share/keyrings/anon
echo "agreed" | sudo tee /usr/share/keyrings/anon/terms-agreement >/dev/null 2>&1 || true
# Create in the keyring directory itself as a marker file
echo "agreed" | sudo tee /usr/share/keyrings/anyone-terms-agreed >/dev/null 2>&1 || true
# Update and install with non-interactive frontend
log "Installing Anon package..." log "Installing Anon package..."
sudo apt update -qq sudo apt update -qq
if ! sudo apt install -y anon; then
# Use DEBIAN_FRONTEND=noninteractive and set debconf values directly via apt-get options
# This is more reliable than just debconf-set-selections
if ! sudo DEBIAN_FRONTEND=noninteractive \
apt-get install -y \
-o Dpkg::Options::="--force-confdef" \
-o Dpkg::Options::="--force-confold" \
anon; then
warning "Anon installation failed" warning "Anon installation failed"
return 1 return 1
fi fi
@ -285,10 +323,16 @@ configure_anon_defaults() {
echo "SocksPort 9050" | sudo tee -a /etc/anon/anonrc >/dev/null echo "SocksPort 9050" | sudo tee -a /etc/anon/anonrc >/dev/null
fi fi
# Auto-accept terms in config file
if ! grep -q "^AgreeToTerms" /etc/anon/anonrc; then
echo "AgreeToTerms 1" | sudo tee -a /etc/anon/anonrc >/dev/null
fi
log " Nickname: ${HOSTNAME}" log " Nickname: ${HOSTNAME}"
log " ORPort: 9001 (default)" log " ORPort: 9001 (default)"
log " ControlPort: 9051" log " ControlPort: 9051"
log " SOCKSPort: 9050" log " SOCKSPort: 9050"
log " AgreeToTerms: 1 (auto-accepted)"
fi fi
} }