From 27f2460bf2bfbb31f79a30d55477fb49bed2dd79 Mon Sep 17 00:00:00 2001
From: anonpenguin
Date: Sat, 9 Aug 2025 08:57:11 +0300
Subject: [PATCH] feat: implement graceful shutdown and improve cluster join reliability

---
Review notes (not part of the commit message):
- RQLiteManager.Stop now reaps rqlited on every path and clears r.cmd, so a
  repeated Stop is a no-op and no zombie process is left behind.
- RQLiteManager.Start no longer launches rqlited when the context was cancelled
  while waiting for the join target.
- waitForJoinTarget documents that timeout <= 0 means "no deadline" and wraps
  the last probe error when the deadline expires.
- The index lines below still carry the blob ids of the original patch.

 cmd/node/main.go       |   5 ++
 pkg/database/rqlite.go | 155 ++++++++++++++++++++++++++++++++++-------
 pkg/node/node.go       |  23 +++++++
 3 files changed, 158 insertions(+), 25 deletions(-)

diff --git a/cmd/node/main.go b/cmd/node/main.go
index c930a8b..a4ed9eb 100644
--- a/cmd/node/main.go
+++ b/cmd/node/main.go
@@ -197,10 +197,12 @@ func main() {
 
 	// Start node in a goroutine
 	errChan := make(chan error, 1)
+	doneChan := make(chan struct{})
 	go func() {
 		if err := startNode(ctx, cfg, port, isBootstrap, logger); err != nil {
 			errChan <- err
 		}
+		close(doneChan)
 	}()
 
 	// Wait for interrupt signal or error
@@ -214,6 +216,9 @@ func main() {
 	case <-c:
 		logger.Printf("Shutting down node...")
 		cancel()
+		// Wait for node goroutine to finish cleanly
+		<-doneChan
+		logger.Printf("Node shutdown complete")
 	}
 }
 
diff --git a/pkg/database/rqlite.go b/pkg/database/rqlite.go
index f3fef4a..fc10031 100644
--- a/pkg/database/rqlite.go
+++ b/pkg/database/rqlite.go
@@ -2,6 +2,7 @@ package database
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net"
 	"net/http"
@@ -9,6 +10,7 @@ import (
 	"os/exec"
 	"path/filepath"
 	"strings"
+	"syscall"
 	"time"
 
 	"github.com/rqlite/gorqlite"
@@ -26,6 +28,34 @@ type RQLiteManager struct {
 	connection *gorqlite.Connection
 }
 
+// waitForSQLAvailable waits until a simple query succeeds, indicating a leader is known and queries can be served.
+func (r *RQLiteManager) waitForSQLAvailable(ctx context.Context) error {
+	if r.connection == nil {
+		return fmt.Errorf("no rqlite connection")
+	}
+
+	ticker := time.NewTicker(1 * time.Second)
+	defer ticker.Stop()
+
+	attempts := 0
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+			attempts++
+			_, err := r.connection.QueryOne("SELECT 1")
+			if err == nil {
+				r.logger.Info("RQLite SQL is available")
+				return nil
+			}
+			if attempts%5 == 0 { // log every ~5s to reduce noise
+				r.logger.Debug("Waiting for RQLite SQL availability", zap.Error(err))
+			}
+		}
+	}
+}
+
 // NewRQLiteManager creates a new RQLite manager
 func NewRQLiteManager(cfg *config.DatabaseConfig, dataDir string, logger *zap.Logger) *RQLiteManager {
 	return &RQLiteManager{
@@ -89,14 +119,17 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 			joinArg = strings.TrimPrefix(joinArg, "https://")
 		}
 
-		// Test connectivity (HTTP status) on the leader's HTTP port derived from join host
-		if err := r.testJoinAddress(joinArg); err != nil {
-			r.logger.Warn("Join target connectivity test failed, but will still attempt to join",
-				zap.String("join_address", r.config.RQLiteJoinAddress),
-				zap.Error(err))
-		} else {
-			r.logger.Info("Join target is reachable, proceeding with cluster join")
-		}
+		// Wait for join target to become reachable to avoid forming a separate cluster (wait indefinitely)
+		if err := r.waitForJoinTarget(ctx, joinArg, 0); err != nil {
+			if ctx.Err() != nil {
+				// Shutdown was requested while waiting. rqlited is not bound to
+				// ctx, so do not launch a process during shutdown.
+				return fmt.Errorf("waiting for join target %s: %w", joinArg, err)
+			}
+			r.logger.Warn("Join target did not become reachable within timeout; will still attempt to join",
+				zap.String("join_address", r.config.RQLiteJoinAddress),
+				zap.Error(err))
+		}
 
 		// Always add the join parameter in host:port form - let rqlited handle the rest
 		args = append(args, "-join", joinArg)
@@ -116,8 +149,8 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 		zap.Strings("full_args", args),
 	)
 
-	// Start RQLite process
-	r.cmd = exec.CommandContext(ctx, "rqlited", args...)
+	// Start RQLite process (not bound to ctx for graceful Stop handling)
+	r.cmd = exec.Command("rqlited", args...)
 	r.cmd.Stdout = os.Stdout
 	r.cmd.Stderr = os.Stderr
 
@@ -127,29 +160,41 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
 
 	// Wait for RQLite to be ready
 	if err := r.waitForReady(ctx); err != nil {
-		r.cmd.Process.Kill()
+		if r.cmd != nil && r.cmd.Process != nil {
+			_ = r.cmd.Process.Kill()
+		}
 		return fmt.Errorf("RQLite failed to become ready: %w", err)
 	}
 
 	// Create connection
 	conn, err := gorqlite.Open(fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
 	if err != nil {
-		r.cmd.Process.Kill()
+		if r.cmd != nil && r.cmd.Process != nil {
+			_ = r.cmd.Process.Kill()
+		}
 		return fmt.Errorf("failed to connect to RQLite: %w", err)
 	}
 	r.connection = conn
 
-	// Wait for RQLite to establish leadership (only on fresh bootstrap)
-	if r.config.RQLiteJoinAddress == "" {
-		if !r.hasExistingState(rqliteDataDir) {
-			if err := r.waitForLeadership(ctx); err != nil {
-				r.cmd.Process.Kill()
-				return fmt.Errorf("RQLite failed to establish leadership: %w", err)
+	// Leadership/SQL readiness gating
+	//
+	// Fresh bootstrap (no join, no prior state): wait for leadership so queries will work.
+	// Existing state or joiners: wait for SQL availability (leader known) before proceeding,
+	// so higher layers (storage) don't fail with 500 leader-not-found.
+	if r.config.RQLiteJoinAddress == "" && !r.hasExistingState(rqliteDataDir) {
+		if err := r.waitForLeadership(ctx); err != nil {
+			if r.cmd != nil && r.cmd.Process != nil {
+				_ = r.cmd.Process.Kill()
 			}
-		} else {
-			// Existing state implies this node may be part of a multi-voter cluster; quorum may be required.
-			// Do not fail startup if leadership is not immediately available.
-			r.logger.Info("Existing Raft state detected; skipping leadership wait (cluster may require quorum)")
+			return fmt.Errorf("RQLite failed to establish leadership: %w", err)
+		}
+	} else {
+		r.logger.Info("Waiting for RQLite SQL availability (leader discovery)")
+		if err := r.waitForSQLAvailable(ctx); err != nil {
+			if r.cmd != nil && r.cmd.Process != nil {
+				_ = r.cmd.Process.Kill()
+			}
+			return fmt.Errorf("RQLite SQL not available: %w", err)
 		}
 	}
 
@@ -235,16 +280,76 @@ func (r *RQLiteManager) GetConnection() *gorqlite.Connection {
 func (r *RQLiteManager) Stop() error {
 	if r.connection != nil {
 		r.connection.Close()
+		r.connection = nil
 	}
 
-	if r.cmd != nil && r.cmd.Process != nil {
-		r.logger.Info("Stopping RQLite node")
-		return r.cmd.Process.Kill()
+	if r.cmd == nil || r.cmd.Process == nil {
+		return nil
+	}
+
+	// Detach the command up front so that a repeated Stop call is a no-op.
+	cmd := r.cmd
+	r.cmd = nil
+
+	r.logger.Info("Stopping RQLite node (graceful)")
+
+	// Start reaping before signalling so the child is collected on every path
+	// below and never lingers as a zombie.
+	done := make(chan error, 1)
+	go func() { done <- cmd.Wait() }()
+
+	// Try SIGTERM first
+	if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
+		// Fallback to Kill if signaling fails
+		_ = cmd.Process.Kill()
+	}
+
+	// Wait up to 5 seconds for graceful shutdown
+	select {
+	case err := <-done:
+		if err != nil && !errors.Is(err, os.ErrClosed) {
+			r.logger.Warn("RQLite process exited with error", zap.Error(err))
+		}
+	case <-time.After(5 * time.Second):
+		r.logger.Warn("RQLite did not exit in time; killing")
+		_ = cmd.Process.Kill()
+		<-done // collect the exit status of the killed process
 	}
 
 	return nil
 }
 
+// waitForJoinTarget polls the join target until testJoinAddress succeeds.
+// A timeout <= 0 disables the deadline: the wait then ends only on success or
+// when ctx is cancelled, in which case ctx.Err() is returned.
+func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress string, timeout time.Duration) error {
+	var deadline time.Time
+	if timeout > 0 {
+		deadline = time.Now().Add(timeout)
+	}
+
+	for {
+		err := r.testJoinAddress(joinAddress)
+		if err == nil {
+			r.logger.Info("Join target is reachable, proceeding with cluster join")
+			return nil
+		}
+		r.logger.Debug("Join target not yet reachable; waiting...", zap.String("join_address", joinAddress), zap.Error(err))
+
+		// Pause between probes, but return promptly on cancellation.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(2 * time.Second):
+		}
+
+		// Give up once the deadline has passed, keeping the last probe error as the cause.
+		if !deadline.IsZero() && time.Now().After(deadline) {
+			return fmt.Errorf("join target not reachable within %s: %w", timeout, err)
+		}
+	}
+}
+
 // getExternalIP attempts to get the external IP address of this machine
 func (r *RQLiteManager) getExternalIP() (string, error) {
 	// Method 1: Try using `ip route get` to find the IP used to reach the internet
diff --git a/pkg/node/node.go b/pkg/node/node.go
index 1e23918..30989ef 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -160,6 +160,26 @@ func (n *Node) startLibP2P() error {
 		// Don't fail - continue without bootstrap connections
 	}
 
+	// Background reconnect loop: keep trying to connect to bootstrap peers for a short window
+	// This helps when nodes are started slightly out-of-order in dev.
+	if len(n.config.Discovery.BootstrapPeers) > 0 {
+		go func() {
+			for i := 0; i < 12; i++ { // ~60s total
+				if n.host == nil || n.dht == nil {
+					return
+				}
+				// If we already have peers or DHT table entries, stop retrying
+				if len(n.host.Network().Peers()) > 0 || len(n.dht.RoutingTable().ListPeers()) > 0 {
+					return
+				}
+				if err := n.connectToBootstrapPeers(); err == nil {
+					n.logger.Debug("Bootstrap reconnect attempt completed")
+				}
+				time.Sleep(5 * time.Second)
+			}
+		}()
+	}
+
 	// Add bootstrap peers to DHT routing table BEFORE bootstrapping
 	if len(n.config.Discovery.BootstrapPeers) > 0 {
 		n.logger.Info("Adding bootstrap peers to DHT routing table")
@@ -386,6 +406,9 @@ func (n *Node) Stop() error {
 	if n.rqliteAdapter != nil {
 		n.rqliteAdapter.Close()
 	}
+	if n.rqliteManager != nil {
+		_ = n.rqliteManager.Stop()
+	}
 
 	n.logger.ComponentInfo(logging.ComponentNode, "Network node stopped")
 	return nil