mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-10-06 13:49:07 +00:00
feat: implement graceful shutdown and improve cluster join reliability
This commit is contained in:
parent
a38cc08809
commit
27f2460bf2
@ -197,10 +197,12 @@ func main() {
|
|||||||
|
|
||||||
// Start node in a goroutine
|
// Start node in a goroutine
|
||||||
errChan := make(chan error, 1)
|
errChan := make(chan error, 1)
|
||||||
|
doneChan := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
if err := startNode(ctx, cfg, port, isBootstrap, logger); err != nil {
|
if err := startNode(ctx, cfg, port, isBootstrap, logger); err != nil {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
}
|
}
|
||||||
|
close(doneChan)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Wait for interrupt signal or error
|
// Wait for interrupt signal or error
|
||||||
@ -214,6 +216,9 @@ func main() {
|
|||||||
case <-c:
|
case <-c:
|
||||||
logger.Printf("Shutting down node...")
|
logger.Printf("Shutting down node...")
|
||||||
cancel()
|
cancel()
|
||||||
|
// Wait for node goroutine to finish cleanly
|
||||||
|
<-doneChan
|
||||||
|
logger.Printf("Node shutdown complete")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ package database
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -9,6 +10,7 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/rqlite/gorqlite"
|
"github.com/rqlite/gorqlite"
|
||||||
@ -26,6 +28,34 @@ type RQLiteManager struct {
|
|||||||
connection *gorqlite.Connection
|
connection *gorqlite.Connection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForSQLAvailable waits until a simple query succeeds, indicating a leader is known and queries can be served.
|
||||||
|
func (r *RQLiteManager) waitForSQLAvailable(ctx context.Context) error {
|
||||||
|
if r.connection == nil {
|
||||||
|
return fmt.Errorf("no rqlite connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
attempts := 0
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-ticker.C:
|
||||||
|
attempts++
|
||||||
|
_, err := r.connection.QueryOne("SELECT 1")
|
||||||
|
if err == nil {
|
||||||
|
r.logger.Info("RQLite SQL is available")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if attempts%5 == 0 { // log every ~5s to reduce noise
|
||||||
|
r.logger.Debug("Waiting for RQLite SQL availability", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewRQLiteManager creates a new RQLite manager
|
// NewRQLiteManager creates a new RQLite manager
|
||||||
func NewRQLiteManager(cfg *config.DatabaseConfig, dataDir string, logger *zap.Logger) *RQLiteManager {
|
func NewRQLiteManager(cfg *config.DatabaseConfig, dataDir string, logger *zap.Logger) *RQLiteManager {
|
||||||
return &RQLiteManager{
|
return &RQLiteManager{
|
||||||
@ -89,13 +119,11 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
|
|||||||
joinArg = strings.TrimPrefix(joinArg, "https://")
|
joinArg = strings.TrimPrefix(joinArg, "https://")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test connectivity (HTTP status) on the leader's HTTP port derived from join host
|
// Wait for join target to become reachable to avoid forming a separate cluster (wait indefinitely)
|
||||||
if err := r.testJoinAddress(joinArg); err != nil {
|
if err := r.waitForJoinTarget(ctx, joinArg, 0); err != nil {
|
||||||
r.logger.Warn("Join target connectivity test failed, but will still attempt to join",
|
r.logger.Warn("Join target did not become reachable within timeout; will still attempt to join",
|
||||||
zap.String("join_address", r.config.RQLiteJoinAddress),
|
zap.String("join_address", r.config.RQLiteJoinAddress),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
} else {
|
|
||||||
r.logger.Info("Join target is reachable, proceeding with cluster join")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Always add the join parameter in host:port form - let rqlited handle the rest
|
// Always add the join parameter in host:port form - let rqlited handle the rest
|
||||||
@ -116,8 +144,8 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
|
|||||||
zap.Strings("full_args", args),
|
zap.Strings("full_args", args),
|
||||||
)
|
)
|
||||||
|
|
||||||
// Start RQLite process
|
// Start RQLite process (not bound to ctx for graceful Stop handling)
|
||||||
r.cmd = exec.CommandContext(ctx, "rqlited", args...)
|
r.cmd = exec.Command("rqlited", args...)
|
||||||
r.cmd.Stdout = os.Stdout
|
r.cmd.Stdout = os.Stdout
|
||||||
r.cmd.Stderr = os.Stderr
|
r.cmd.Stderr = os.Stderr
|
||||||
|
|
||||||
@ -127,29 +155,41 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
|
|||||||
|
|
||||||
// Wait for RQLite to be ready
|
// Wait for RQLite to be ready
|
||||||
if err := r.waitForReady(ctx); err != nil {
|
if err := r.waitForReady(ctx); err != nil {
|
||||||
r.cmd.Process.Kill()
|
if r.cmd != nil && r.cmd.Process != nil {
|
||||||
|
_ = r.cmd.Process.Kill()
|
||||||
|
}
|
||||||
return fmt.Errorf("RQLite failed to become ready: %w", err)
|
return fmt.Errorf("RQLite failed to become ready: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create connection
|
// Create connection
|
||||||
conn, err := gorqlite.Open(fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
|
conn, err := gorqlite.Open(fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.cmd.Process.Kill()
|
if r.cmd != nil && r.cmd.Process != nil {
|
||||||
|
_ = r.cmd.Process.Kill()
|
||||||
|
}
|
||||||
return fmt.Errorf("failed to connect to RQLite: %w", err)
|
return fmt.Errorf("failed to connect to RQLite: %w", err)
|
||||||
}
|
}
|
||||||
r.connection = conn
|
r.connection = conn
|
||||||
|
|
||||||
// Wait for RQLite to establish leadership (only on fresh bootstrap)
|
// Leadership/SQL readiness gating
|
||||||
if r.config.RQLiteJoinAddress == "" {
|
//
|
||||||
if !r.hasExistingState(rqliteDataDir) {
|
// Fresh bootstrap (no join, no prior state): wait for leadership so queries will work.
|
||||||
|
// Existing state or joiners: wait for SQL availability (leader known) before proceeding,
|
||||||
|
// so higher layers (storage) don't fail with 500 leader-not-found.
|
||||||
|
if r.config.RQLiteJoinAddress == "" && !r.hasExistingState(rqliteDataDir) {
|
||||||
if err := r.waitForLeadership(ctx); err != nil {
|
if err := r.waitForLeadership(ctx); err != nil {
|
||||||
r.cmd.Process.Kill()
|
if r.cmd != nil && r.cmd.Process != nil {
|
||||||
|
_ = r.cmd.Process.Kill()
|
||||||
|
}
|
||||||
return fmt.Errorf("RQLite failed to establish leadership: %w", err)
|
return fmt.Errorf("RQLite failed to establish leadership: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Existing state implies this node may be part of a multi-voter cluster; quorum may be required.
|
r.logger.Info("Waiting for RQLite SQL availability (leader discovery)")
|
||||||
// Do not fail startup if leadership is not immediately available.
|
if err := r.waitForSQLAvailable(ctx); err != nil {
|
||||||
r.logger.Info("Existing Raft state detected; skipping leadership wait (cluster may require quorum)")
|
if r.cmd != nil && r.cmd.Process != nil {
|
||||||
|
_ = r.cmd.Process.Kill()
|
||||||
|
}
|
||||||
|
return fmt.Errorf("RQLite SQL not available: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -235,16 +275,73 @@ func (r *RQLiteManager) GetConnection() *gorqlite.Connection {
|
|||||||
func (r *RQLiteManager) Stop() error {
|
func (r *RQLiteManager) Stop() error {
|
||||||
if r.connection != nil {
|
if r.connection != nil {
|
||||||
r.connection.Close()
|
r.connection.Close()
|
||||||
|
r.connection = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.cmd != nil && r.cmd.Process != nil {
|
if r.cmd == nil || r.cmd.Process == nil {
|
||||||
r.logger.Info("Stopping RQLite node")
|
return nil
|
||||||
return r.cmd.Process.Kill()
|
}
|
||||||
|
|
||||||
|
r.logger.Info("Stopping RQLite node (graceful)")
|
||||||
|
// Try SIGTERM first
|
||||||
|
if err := r.cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
||||||
|
// Fallback to Kill if signaling fails
|
||||||
|
_ = r.cmd.Process.Kill()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait up to 5 seconds for graceful shutdown
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() { done <- r.cmd.Wait() }()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-done:
|
||||||
|
if err != nil && !errors.Is(err, os.ErrClosed) {
|
||||||
|
r.logger.Warn("RQLite process exited with error", zap.Error(err))
|
||||||
|
}
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
r.logger.Warn("RQLite did not exit in time; killing")
|
||||||
|
_ = r.cmd.Process.Kill()
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForJoinTarget waits until the join target's HTTP status becomes reachable, or until timeout
|
||||||
|
func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress string, timeout time.Duration) error {
|
||||||
|
var deadline time.Time
|
||||||
|
if timeout > 0 {
|
||||||
|
deadline = time.Now().Add(timeout)
|
||||||
|
}
|
||||||
|
var lastErr error
|
||||||
|
|
||||||
|
for {
|
||||||
|
if err := r.testJoinAddress(joinAddress); err == nil {
|
||||||
|
r.logger.Info("Join target is reachable, proceeding with cluster join")
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
lastErr = err
|
||||||
|
r.logger.Debug("Join target not yet reachable; waiting...", zap.String("join_address", joinAddress), zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check context
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
}
|
||||||
|
|
||||||
|
if !deadline.IsZero() && time.Now().After(deadline) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastErr == nil {
|
||||||
|
lastErr = fmt.Errorf("join target not reachable within %s", timeout)
|
||||||
|
}
|
||||||
|
return lastErr
|
||||||
|
}
|
||||||
|
|
||||||
// getExternalIP attempts to get the external IP address of this machine
|
// getExternalIP attempts to get the external IP address of this machine
|
||||||
func (r *RQLiteManager) getExternalIP() (string, error) {
|
func (r *RQLiteManager) getExternalIP() (string, error) {
|
||||||
// Method 1: Try using `ip route get` to find the IP used to reach the internet
|
// Method 1: Try using `ip route get` to find the IP used to reach the internet
|
||||||
|
@ -160,6 +160,26 @@ func (n *Node) startLibP2P() error {
|
|||||||
// Don't fail - continue without bootstrap connections
|
// Don't fail - continue without bootstrap connections
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Background reconnect loop: keep trying to connect to bootstrap peers for a short window
|
||||||
|
// This helps when nodes are started slightly out-of-order in dev.
|
||||||
|
if len(n.config.Discovery.BootstrapPeers) > 0 {
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < 12; i++ { // ~60s total
|
||||||
|
if n.host == nil || n.dht == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// If we already have peers or DHT table entries, stop retrying
|
||||||
|
if len(n.host.Network().Peers()) > 0 || len(n.dht.RoutingTable().ListPeers()) > 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := n.connectToBootstrapPeers(); err == nil {
|
||||||
|
n.logger.Debug("Bootstrap reconnect attempt completed")
|
||||||
|
}
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// Add bootstrap peers to DHT routing table BEFORE bootstrapping
|
// Add bootstrap peers to DHT routing table BEFORE bootstrapping
|
||||||
if len(n.config.Discovery.BootstrapPeers) > 0 {
|
if len(n.config.Discovery.BootstrapPeers) > 0 {
|
||||||
n.logger.Info("Adding bootstrap peers to DHT routing table")
|
n.logger.Info("Adding bootstrap peers to DHT routing table")
|
||||||
@ -386,6 +406,9 @@ func (n *Node) Stop() error {
|
|||||||
if n.rqliteAdapter != nil {
|
if n.rqliteAdapter != nil {
|
||||||
n.rqliteAdapter.Close()
|
n.rqliteAdapter.Close()
|
||||||
}
|
}
|
||||||
|
if n.rqliteManager != nil {
|
||||||
|
_ = n.rqliteManager.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
n.logger.ComponentInfo(logging.ComponentNode, "Network node stopped")
|
n.logger.ComponentInfo(logging.ComponentNode, "Network node stopped")
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user