network/pkg/node/node.go
anonpenguin23 30d18aca02 feat: add RQLite command support and help documentation
- Introduced a new RQLite command in the CLI to handle RQLite-related operations.
- Implemented the 'fix' subcommand to automatically repair common RQLite cluster issues, including correcting misconfigured join addresses and cleaning stale raft state.
- Updated help documentation to include RQLite commands and their usage.
2025-11-03 07:10:25 +02:00

654 lines
20 KiB
Go

package node
import (
"context"
"fmt"
mathrand "math/rand"
"os"
"path/filepath"
"strings"
"time"
"github.com/libp2p/go-libp2p"
libp2ppubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
"github.com/DeBrosOfficial/network/pkg/config"
"github.com/DeBrosOfficial/network/pkg/discovery"
"github.com/DeBrosOfficial/network/pkg/encryption"
"github.com/DeBrosOfficial/network/pkg/logging"
"github.com/DeBrosOfficial/network/pkg/pubsub"
database "github.com/DeBrosOfficial/network/pkg/rqlite"
)
// Node represents a network node with RQLite database.
//
// NewNode fills in only config and logger. The remaining fields are assigned
// while the node starts (see startLibP2P and startRQLite) and are nil until then.
type Node struct {
// config is the configuration handed to NewNode.
config *config.Config
// logger is the component logger created in NewNode.
logger *logging.ColoredLogger
// host is the LibP2P host created by startLibP2P.
host host.Host
// rqliteManager is created and started by startRQLite.
rqliteManager *database.RQLiteManager
// rqliteAdapter wraps rqliteManager; it is set once RQLite has started.
rqliteAdapter *database.RQLiteAdapter
// clusterDiscovery is only created when both host and discoveryManager
// are available at the time startRQLite runs.
clusterDiscovery *database.ClusterDiscoveryService
// bootstrapCancel stops the background bootstrap reconnection loop that
// startLibP2P launches; Stop calls it when it is non-nil.
bootstrapCancel context.CancelFunc
// pubsub is the GossipSub-backed adapter created in startLibP2P.
pubsub *pubsub.ClientAdapter
// discoveryManager drives peer discovery; it is created in startLibP2P.
discoveryManager *discovery.Manager
}
// NewNode builds a Node around cfg.
//
// Only the configuration and a colored logger are set here; the host, the
// database and the discovery services are created later by Start. It returns
// an error if the logger cannot be created.
func NewNode(cfg *config.Config) (*Node, error) {
	nodeLogger, err := logging.NewColoredLogger(logging.ComponentNode, true)
	if err != nil {
		return nil, fmt.Errorf("failed to create logger: %w", err)
	}

	node := new(Node)
	node.config = cfg
	node.logger = nodeLogger
	return node, nil
}
// startRQLite creates the RQLite manager, wires cluster discovery into it when
// a LibP2P host and discovery manager exist, starts RQLite and finally builds
// the adapter stored in n.rqliteAdapter.
//
// Own metadata is published twice on purpose: once before RQLite starts, so
// peers can find this node during recovery, and once afterwards, followed by
// an initial cluster sync.
func (n *Node) startRQLite(ctx context.Context) error {
	n.logger.Info("Starting RQLite database")

	mgr := database.NewRQLiteManager(&n.config.Database, &n.config.Discovery, n.config.Node.DataDir, n.logger.Logger)
	n.rqliteManager = mgr

	// Cluster discovery needs both the LibP2P host and the discovery manager.
	if n.host != nil && n.discoveryManager != nil {
		// Every node type other than "bootstrap" is advertised as a plain node.
		role := "node"
		if n.config.Node.Type == "bootstrap" {
			role = "bootstrap"
		}

		disc := database.NewClusterDiscoveryService(
			n.host,
			n.discoveryManager,
			mgr,
			n.config.Node.ID,
			role,
			n.config.Discovery.RaftAdvAddress,
			n.config.Discovery.HttpAdvAddress,
			n.config.Node.DataDir,
			n.logger.Logger,
		)
		n.clusterDiscovery = disc

		// The manager must know about discovery BEFORE RQLite starts; this
		// is critical for pre-start cluster discovery during recovery.
		mgr.SetDiscoveryService(disc)

		// Start discovery without triggering the initial sync yet.
		if err := disc.Start(ctx); err != nil {
			return fmt.Errorf("failed to start cluster discovery: %w", err)
		}

		// Publish preliminary metadata so peers can discover this node during
		// recovery; it is refreshed again once RQLite is running.
		disc.UpdateOwnMetadata()
		n.logger.Info("Cluster discovery service started (waiting for RQLite)")
	}

	// Bring RQLite up; the metadata refresh below depends on it running.
	if err := mgr.Start(ctx); err != nil {
		return err
	}

	// Refresh metadata and run the initial cluster sync now that RQLite is ready.
	if disc := n.clusterDiscovery; disc != nil {
		disc.UpdateOwnMetadata()
		disc.TriggerSync()
		n.logger.Info("RQLite metadata published and cluster synced")
	}

	// Adapter for sql.DB compatibility; only stored on success.
	sqlAdapter, err := database.NewRQLiteAdapter(mgr)
	if err != nil {
		return fmt.Errorf("failed to create RQLite adapter: %w", err)
	}
	n.rqliteAdapter = sqlAdapter
	return nil
}
// bootstrapPeerSource returns a peer source that yields peers parsed from
// bootstrapAddrs.
//
// Each call of the returned function starts a goroutine that sends at most num
// peers on the returned channel and then closes it. Addresses that fail to
// parse are logged at debug level and skipped. The goroutine stops early when
// ctx is cancelled.
func bootstrapPeerSource(bootstrapAddrs []string, logger *zap.Logger) func(context.Context, int) <-chan peer.AddrInfo {
	return func(ctx context.Context, num int) <-chan peer.AddrInfo {
		peers := make(chan peer.AddrInfo, num)

		go func() {
			defer close(peers)

			// remaining counts how many more peers may still be delivered.
			remaining := num
			for _, raw := range bootstrapAddrs {
				if remaining <= 0 {
					return
				}

				addr, err := multiaddr.NewMultiaddr(raw)
				if err != nil {
					logger.Debug("invalid bootstrap multiaddr", zap.String("addr", raw), zap.Error(err))
					continue
				}

				info, err := peer.AddrInfoFromP2pAddr(addr)
				if err != nil {
					logger.Debug("failed to parse bootstrap peer", zap.String("addr", raw), zap.Error(err))
					continue
				}

				select {
				case <-ctx.Done():
					return
				case peers <- *info:
					remaining--
				}
			}
		}()

		return peers
	}
}
// hasBootstrapConnections reports whether at least one currently connected
// peer is one of the configured bootstrap peers.
//
// It returns false when the host is not set, when no bootstrap peers are
// configured, or when no peers are connected. Bootstrap addresses that cannot
// be parsed are ignored.
func (n *Node) hasBootstrapConnections() bool {
	if n.host == nil || len(n.config.Discovery.BootstrapPeers) == 0 {
		return false
	}

	connected := n.host.Network().Peers()
	if len(connected) == 0 {
		return false
	}

	// Collect the peer IDs of every parseable bootstrap address.
	wanted := make(map[peer.ID]struct{}, len(n.config.Discovery.BootstrapPeers))
	for _, raw := range n.config.Discovery.BootstrapPeers {
		if addr, err := multiaddr.NewMultiaddr(raw); err == nil {
			if info, err := peer.AddrInfoFromP2pAddr(addr); err == nil {
				wanted[info.ID] = struct{}{}
			}
		}
	}

	for _, id := range connected {
		if _, isBootstrap := wanted[id]; isBootstrap {
			return true
		}
	}
	return false
}
// calculateNextBackoff returns the backoff interval that follows current:
// current grown by a factor of 1.5, capped at 10 minutes.
func calculateNextBackoff(current time.Duration) time.Duration {
	const (
		growthFactor = 1.5
		ceiling      = 10 * time.Minute
	)

	if grown := time.Duration(float64(current) * growthFactor); grown <= ceiling {
		return grown
	}
	return ceiling
}
// addJitter spreads interval by a random offset of up to ±20% so that many
// nodes retrying at once do not line up (thundering herd). The result is never
// shorter than one second.
func addJitter(interval time.Duration) time.Duration {
	const jitterFraction = 0.2

	base := float64(interval)
	spread := base * jitterFraction

	// Map a uniform sample from [0, 1) onto [-spread, +spread).
	offset := (mathrand.Float64() - 0.5) * 2 * spread

	if jittered := time.Duration(base + offset); jittered >= time.Second {
		return jittered
	}
	return time.Second
}
// connectToBootstrapPeer dials the single bootstrap peer described by addr.
//
// addr must be a multiaddr that includes the peer ID. The call returns nil
// without dialing when the address resolves to this node's own peer ID. It
// returns an error when the host has not been created yet, when addr cannot be
// parsed, or when the connection attempt fails.
func (n *Node) connectToBootstrapPeer(ctx context.Context, addr string) error {
	// The host is dereferenced below, so a missing host is reported as an
	// error up front instead of panicking at the Connect call.
	if n.host == nil {
		return fmt.Errorf("failed to connect to peer: LibP2P host is not initialized")
	}

	ma, err := multiaddr.NewMultiaddr(addr)
	if err != nil {
		return fmt.Errorf("invalid multiaddr: %w", err)
	}

	// Extract peer info from multiaddr
	peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
	if err != nil {
		return fmt.Errorf("failed to extract peer info: %w", err)
	}

	// Avoid dialing ourselves: if the bootstrap address resolves to our own peer ID, skip.
	if peerInfo.ID == n.host.ID() {
		n.logger.ComponentDebug(logging.ComponentNode, "Skipping bootstrap address because it resolves to self",
			zap.String("addr", addr),
			zap.String("peer_id", peerInfo.ID.String()))
		return nil
	}

	// Log resolved peer info prior to connect
	n.logger.ComponentDebug(logging.ComponentNode, "Resolved bootstrap peer",
		zap.String("peer_id", peerInfo.ID.String()),
		zap.String("addr", addr),
		zap.Int("addr_count", len(peerInfo.Addrs)),
	)

	// Connect to the peer
	if err := n.host.Connect(ctx, *peerInfo); err != nil {
		return fmt.Errorf("failed to connect to peer: %w", err)
	}

	n.logger.Info("Connected to bootstrap peer",
		zap.String("peer", peerInfo.ID.String()),
		zap.String("addr", addr))
	return nil
}
// connectToBootstrapPeers tries to connect to every configured LibP2P
// bootstrap peer.
//
// All attempts share one 30 second timeout derived from ctx. Each failed
// attempt is logged as a warning and the remaining peers are still tried.
//
// It returns nil when no bootstrap peers are configured, when every attempt
// succeeded, or when at least one bootstrap peer is connected afterwards. It
// returns an error wrapping the last dial failure when at least one attempt
// failed and no bootstrap peer is connected, so callers can back off instead
// of treating the round as a success.
func (n *Node) connectToBootstrapPeers(ctx context.Context) error {
	peers := n.config.Discovery.BootstrapPeers
	if len(peers) == 0 {
		n.logger.ComponentDebug(logging.ComponentNode, "No bootstrap peers configured")
		return nil
	}

	// Use passed context with a reasonable timeout for bootstrap connections
	connectCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	var (
		failures int
		lastErr  error
	)
	for _, bootstrapAddr := range peers {
		if err := n.connectToBootstrapPeer(connectCtx, bootstrapAddr); err != nil {
			n.logger.ComponentWarn(logging.ComponentNode, "Failed to connect to bootstrap peer",
				zap.String("addr", bootstrapAddr),
				zap.Error(err))
			failures++
			lastErr = err
		}
	}

	// A nil result from connectToBootstrapPeer also covers addresses that were
	// skipped because they point at this node, so success is judged by the
	// actual connection state rather than by counting nil results.
	if failures > 0 && !n.hasBootstrapConnections() {
		return fmt.Errorf("failed to connect to any bootstrap peer (%d of %d attempts failed): %w",
			failures, len(peers), lastErr)
	}
	return nil
}
// startLibP2P creates the LibP2P host and the services layered on top of it.
//
// In order it: loads (or creates) the persistent identity; builds the host
// options from the configured listen addresses, adding NAT/relay services
// unless the first listen address contains "127.0.0.1" or "localhost"; creates
// the host; sets up GossipSub and the pubsub adapter; makes one attempt to
// dial the bootstrap peers; launches the background reconnection loop; seeds
// the peerstore with the bootstrap addresses; and starts the discovery
// manager.
//
// It returns an error when the identity cannot be loaded, a listen address is
// invalid, or the host or pubsub cannot be created. Bootstrap connection
// failures are logged and never abort startup.
func (n *Node) startLibP2P() error {
	n.logger.ComponentInfo(logging.ComponentLibP2P, "Starting LibP2P host")

	// Load or create persistent identity
	identity, err := n.loadOrCreateIdentity()
	if err != nil {
		return fmt.Errorf("failed to load identity: %w", err)
	}

	// Create LibP2P host with explicit listen addresses
	var opts []libp2p.Option
	opts = append(opts,
		libp2p.Identity(identity),
		libp2p.Security(noise.ID, noise.New),
		libp2p.DefaultMuxers,
	)

	// Add explicit listen addresses from config
	if len(n.config.Node.ListenAddresses) > 0 {
		listenAddrs := make([]multiaddr.Multiaddr, 0, len(n.config.Node.ListenAddresses))
		for _, addr := range n.config.Node.ListenAddresses {
			ma, err := multiaddr.NewMultiaddr(addr)
			if err != nil {
				return fmt.Errorf("invalid listen address %s: %w", addr, err)
			}
			listenAddrs = append(listenAddrs, ma)
		}
		opts = append(opts, libp2p.ListenAddrs(listenAddrs...))
		n.logger.ComponentInfo(logging.ComponentLibP2P, "Configured listen addresses",
			zap.Strings("addrs", n.config.Node.ListenAddresses))
	}

	// For localhost/development, disable NAT services.
	// Only the FIRST listen address is inspected for this decision.
	isLocalhost := len(n.config.Node.ListenAddresses) > 0 &&
		(strings.Contains(n.config.Node.ListenAddresses[0], "127.0.0.1") ||
			strings.Contains(n.config.Node.ListenAddresses[0], "localhost"))

	if isLocalhost {
		n.logger.ComponentInfo(logging.ComponentLibP2P, "Localhost detected - disabling NAT services for local development")
		// Don't add NAT/AutoRelay options for localhost
	} else {
		n.logger.ComponentInfo(logging.ComponentLibP2P, "Production mode - enabling NAT services")
		opts = append(opts,
			libp2p.EnableNATService(),
			libp2p.EnableAutoNATv2(),
			libp2p.EnableRelay(),
			libp2p.NATPortMap(),
			libp2p.EnableAutoRelayWithPeerSource(
				bootstrapPeerSource(n.config.Discovery.BootstrapPeers, n.logger.Logger),
			),
		)
	}

	h, err := libp2p.New(opts...)
	if err != nil {
		return err
	}
	n.host = h

	// Initialize pubsub.
	// NOTE(review): this uses context.Background(), so pubsub is not tied to
	// any cancellable context owned by the node; confirm that closing the host
	// is enough to shut it down.
	ps, err := libp2ppubsub.NewGossipSub(context.Background(), h,
		libp2ppubsub.WithPeerExchange(true),
		libp2ppubsub.WithFloodPublish(true), // Ensure messages reach all peers, not just mesh
		// NOTE(review): nil is passed here, so no direct peers are actually
		// configured; confirm whether a peer list was meant to be supplied.
		libp2ppubsub.WithDirectPeers(nil),
	)
	if err != nil {
		return fmt.Errorf("failed to create pubsub: %w", err)
	}

	// Create pubsub adapter on the configured node namespace
	n.pubsub = pubsub.NewClientAdapter(ps, n.config.Discovery.NodeNamespace)
	n.logger.Info("Initialized pubsub adapter on namespace", zap.String("namespace", n.config.Discovery.NodeNamespace))

	// Log configured bootstrap peers
	if len(n.config.Discovery.BootstrapPeers) > 0 {
		n.logger.ComponentInfo(logging.ComponentNode, "Configured bootstrap peers",
			zap.Strings("peers", n.config.Discovery.BootstrapPeers))
	} else {
		n.logger.ComponentDebug(logging.ComponentNode, "No bootstrap peers configured")
	}

	// Connect to LibP2P bootstrap peers if configured
	if err := n.connectToBootstrapPeers(context.Background()); err != nil {
		n.logger.ComponentWarn(logging.ComponentNode, "Failed to connect to bootstrap peers", zap.Error(err))
		// Don't fail - continue without bootstrap connections
	}

	// Start exponential backoff reconnection for bootstrap peers
	if len(n.config.Discovery.BootstrapPeers) > 0 {
		bootstrapCtx, cancel := context.WithCancel(context.Background())
		n.bootstrapCancel = cancel

		go func() {
			const (
				initialInterval = 5 * time.Second
				recheckInterval = 30 * time.Second
			)

			// wait blocks for d or until the loop is cancelled; it reports
			// whether the loop should keep running.
			wait := func(d time.Duration) bool {
				select {
				case <-bootstrapCtx.Done():
					return false
				case <-time.After(d):
					return true
				}
			}

			interval := initialInterval
			consecutiveFailures := 0

			n.logger.ComponentInfo(logging.ComponentNode, "Starting bootstrap peer reconnection with exponential backoff",
				zap.Duration("initial_interval", interval),
				zap.Duration("max_interval", 10*time.Minute))

			for {
				select {
				case <-bootstrapCtx.Done():
					n.logger.ComponentDebug(logging.ComponentNode, "Bootstrap reconnection loop stopped")
					return
				default:
				}

				// We have bootstrap connections, just wait and check periodically
				if n.hasBootstrapConnections() {
					if !wait(recheckInterval) {
						return
					}
					continue
				}

				n.logger.ComponentDebug(logging.ComponentNode, "Attempting bootstrap peer connection",
					zap.Duration("current_interval", interval),
					zap.Int("consecutive_failures", consecutiveFailures))

				// Dial with the loop's own context so that cancelling it (see
				// Stop) also aborts a dial round that is still in flight.
				err := n.connectToBootstrapPeers(bootstrapCtx)
				if err == nil {
					// Success! Reset interval and counters
					if consecutiveFailures > 0 {
						n.logger.ComponentInfo(logging.ComponentNode, "Successfully connected to bootstrap peers",
							zap.Int("failures_overcome", consecutiveFailures))
					}
					interval = initialInterval
					consecutiveFailures = 0

					// Wait 30 seconds before checking connection again
					if !wait(recheckInterval) {
						return
					}
					continue
				}

				consecutiveFailures++

				// Sleep for the current interval (with jitter), then grow it.
				jitteredInterval := addJitter(interval)
				n.logger.ComponentDebug(logging.ComponentNode, "Bootstrap connection failed, backing off",
					zap.Error(err),
					zap.Duration("next_attempt_in", jitteredInterval),
					zap.Int("consecutive_failures", consecutiveFailures))

				if !wait(jitteredInterval) {
					return
				}

				// Increase interval for next attempt
				interval = calculateNextBackoff(interval)

				// Log interval increases occasionally to show progress
				if consecutiveFailures%5 == 0 {
					n.logger.ComponentInfo(logging.ComponentNode, "Bootstrap connection still failing",
						zap.Int("consecutive_failures", consecutiveFailures),
						zap.Duration("current_interval", interval))
				}
			}
		}()
	}

	// Add bootstrap peers to peerstore for peer exchange
	if len(n.config.Discovery.BootstrapPeers) > 0 {
		n.logger.ComponentInfo(logging.ComponentNode, "Adding bootstrap peers to peerstore")
		for _, bootstrapAddr := range n.config.Discovery.BootstrapPeers {
			if ma, err := multiaddr.NewMultiaddr(bootstrapAddr); err == nil {
				if peerInfo, err := peer.AddrInfoFromP2pAddr(ma); err == nil {
					// Add to peerstore with longer TTL for peer exchange
					n.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour*24)
					n.logger.ComponentDebug(logging.ComponentNode, "Added bootstrap peer to peerstore",
						zap.String("peer", peerInfo.ID.String()))
				}
			}
		}
	}

	// Initialize discovery manager with peer exchange protocol
	n.discoveryManager = discovery.NewManager(h, nil, n.logger.Logger)
	n.discoveryManager.StartProtocolHandler()

	n.logger.ComponentInfo(logging.ComponentNode, "LibP2P host started successfully - using active peer exchange discovery")

	// Start peer discovery and monitoring
	n.startPeerDiscovery()

	n.logger.ComponentInfo(logging.ComponentLibP2P, "LibP2P host started",
		zap.String("peer_id", h.ID().String()))

	return nil
}
// loadOrCreateIdentity returns the node's private key, reading it from
// "identity.key" inside the configured data directory when that file exists
// and can be loaded, and otherwise generating and saving a new identity there.
//
// Environment variables and a leading "~" in the path are expanded first. A
// file that exists but fails to load is logged as a warning and replaced by a
// newly generated identity.
func (n *Node) loadOrCreateIdentity() (crypto.PrivKey, error) {
	keyPath := os.ExpandEnv(filepath.Join(n.config.Node.DataDir, "identity.key"))

	// Expand a leading ~ to the user's home directory.
	if strings.HasPrefix(keyPath, "~") {
		homeDir, err := os.UserHomeDir()
		if err != nil {
			return nil, fmt.Errorf("failed to determine home directory: %w", err)
		}
		keyPath = filepath.Join(homeDir, keyPath[1:])
	}

	// Prefer an identity that is already on disk.
	if _, statErr := os.Stat(keyPath); statErr == nil {
		existing, loadErr := encryption.LoadIdentity(keyPath)
		if loadErr == nil {
			n.logger.ComponentInfo(logging.ComponentNode, "Loaded existing identity",
				zap.String("file", keyPath),
				zap.String("peer_id", existing.PeerID.String()))
			return existing.PrivateKey, nil
		}
		n.logger.Warn("Failed to load existing identity, creating new one", zap.Error(loadErr))
	}

	// Nothing usable on disk: generate a fresh identity and persist it.
	n.logger.Info("Creating new identity", zap.String("file", keyPath))
	fresh, err := encryption.GenerateIdentity()
	if err != nil {
		return nil, fmt.Errorf("failed to generate identity: %w", err)
	}

	if err := encryption.SaveIdentity(fresh, keyPath); err != nil {
		return nil, fmt.Errorf("failed to save identity: %w", err)
	}

	n.logger.Info("Identity saved",
		zap.String("file", keyPath),
		zap.String("peer_id", fresh.PeerID.String()))
	return fresh.PrivateKey, nil
}
// GetPeerID returns this node's peer ID as a string, or the empty string when
// the LibP2P host has not been created yet.
func (n *Node) GetPeerID() string {
	if h := n.host; h != nil {
		return h.ID().String()
	}
	return ""
}
// startPeerDiscovery starts the discovery manager using the discovery interval
// and connection limit from the node configuration.
//
// A missing manager or a failed start is logged as a warning; nothing is
// returned to the caller.
func (n *Node) startPeerDiscovery() {
	mgr := n.discoveryManager
	if mgr == nil {
		n.logger.ComponentWarn(logging.ComponentNode, "Discovery manager not initialized")
		return
	}

	var settings discovery.Config
	settings.DiscoveryInterval = n.config.Discovery.DiscoveryInterval
	settings.MaxConnections = n.config.Node.MaxConnections

	if err := mgr.Start(settings); err != nil {
		n.logger.ComponentWarn(logging.ComponentNode, "Failed to start discovery manager", zap.Error(err))
		return
	}

	n.logger.ComponentInfo(logging.ComponentNode, "Peer discovery manager started",
		zap.Duration("interval", settings.DiscoveryInterval),
		zap.Int("max_connections", settings.MaxConnections))
}
// stopPeerDiscovery stops the discovery manager if one was created. The
// "stopped" message is logged either way.
func (n *Node) stopPeerDiscovery() {
	if mgr := n.discoveryManager; mgr != nil {
		mgr.Stop()
	}
	n.logger.ComponentInfo(logging.ComponentNode, "Peer discovery stopped")
}
// Stop stops the node and all its services.
//
// Services are shut down in this order: cluster discovery, the bootstrap
// reconnection loop, peer discovery, the LibP2P host, and finally the RQLite
// adapter and manager. Components that were never created are skipped, so
// Stop can be called on a partially started node.
//
// Shutdown is best-effort: a failure to close the host is logged and does not
// prevent the remaining services from being stopped. Stop always returns nil.
func (n *Node) Stop() error {
	n.logger.ComponentInfo(logging.ComponentNode, "Stopping network node")

	// Stop cluster discovery
	if n.clusterDiscovery != nil {
		n.clusterDiscovery.Stop()
	}

	// Stop bootstrap reconnection loop
	if n.bootstrapCancel != nil {
		n.bootstrapCancel()
	}

	// Stop peer discovery
	n.stopPeerDiscovery()

	// Stop LibP2P host; surface a close failure instead of dropping it.
	if n.host != nil {
		if err := n.host.Close(); err != nil {
			n.logger.ComponentWarn(logging.ComponentLibP2P, "Failed to close LibP2P host", zap.Error(err))
		}
	}

	// Stop RQLite
	if n.rqliteAdapter != nil {
		n.rqliteAdapter.Close()
	}
	if n.rqliteManager != nil {
		// The result is deliberately ignored so shutdown stays best-effort.
		_ = n.rqliteManager.Stop()
	}

	n.logger.ComponentInfo(logging.ComponentNode, "Network node stopped")
	return nil
}
// Start brings the node up: it creates the data directory, starts the LibP2P
// host, starts RQLite together with cluster discovery, and then starts
// connection monitoring.
//
// ctx is passed on to the RQLite startup. An error is returned when the home
// directory cannot be resolved, the data directory cannot be created, or
// LibP2P or RQLite fail to start.
func (n *Node) Start(ctx context.Context) error {
	n.logger.Info("Starting network node", zap.String("data_dir", n.config.Node.DataDir))

	// Resolve environment variables and a leading ~ in the data directory.
	dataDir := os.ExpandEnv(n.config.Node.DataDir)
	if strings.HasPrefix(dataDir, "~") {
		homeDir, err := os.UserHomeDir()
		if err != nil {
			return fmt.Errorf("failed to determine home directory: %w", err)
		}
		dataDir = filepath.Join(homeDir, dataDir[1:])
	}

	if err := os.MkdirAll(dataDir, 0755); err != nil {
		return fmt.Errorf("failed to create data directory: %w", err)
	}

	// LibP2P goes first because cluster discovery needs the host.
	if err := n.startLibP2P(); err != nil {
		return fmt.Errorf("failed to start LibP2P: %w", err)
	}

	if err := n.startRQLite(ctx); err != nil {
		return fmt.Errorf("failed to start RQLite: %w", err)
	}

	// Collect the host's addresses as strings for the startup log line.
	hostAddrs := n.host.Addrs()
	var listenAddrs []string
	for i := range hostAddrs {
		listenAddrs = append(listenAddrs, hostAddrs[i].String())
	}

	n.logger.ComponentInfo(logging.ComponentNode, "Network node started successfully",
		zap.String("peer_id", n.host.ID().String()),
		zap.Strings("listen_addrs", listenAddrs),
	)

	n.startConnectionMonitoring()

	return nil
}