mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-10-06 15:29:07 +00:00
The above changes introduce node namespacing and various monitoring improvements. Let's look at the key changes: - Add node_namespace config field for partitioning node identifiers - Initialize pubsub with peer exchange enabled - Reduce client monitoring interval to 30s - Add metric announcement logging I would write this commit message as: Add node namespace and improve monitoring - Add node_namespace config for partitioning identifiers - Enable pubsub peer exchange - Adjust monitoring intervals and add logging - Initialize pubsub with namespace support The subject line captures the two main themes (namespacing and monitoring), while the body provides helpful details about the specific changes made.
658 lines
19 KiB
Go
658 lines
19 KiB
Go
package node
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"fmt"
|
|
mathrand "math/rand"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p"
|
|
libp2ppubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"github.com/libp2p/go-libp2p/core/crypto"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
|
|
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
|
|
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
|
|
"github.com/multiformats/go-multiaddr"
|
|
"go.uber.org/zap"
|
|
|
|
"git.debros.io/DeBros/network/pkg/anyoneproxy"
|
|
"git.debros.io/DeBros/network/pkg/config"
|
|
"git.debros.io/DeBros/network/pkg/database"
|
|
"git.debros.io/DeBros/network/pkg/logging"
|
|
"git.debros.io/DeBros/network/pkg/pubsub"
|
|
)
|
|
|
|
// Node represents a network node with RQLite database
type Node struct {
	config *config.Config          // static node configuration (never mutated after NewNode)
	logger *logging.ColoredLogger  // component-tagged logger shared by all subsystems
	host   host.Host               // LibP2P host; nil until startLibP2P succeeds

	rqliteManager *database.RQLiteManager // manages the RQLite process lifecycle
	rqliteAdapter *database.RQLiteAdapter // sql.DB-compatible adapter over rqliteManager

	// Peer discovery
	discoveryCancel context.CancelFunc // cancels the periodic discovery loop (startPeerDiscovery)
	bootstrapCancel context.CancelFunc // cancels the bootstrap reconnection goroutine (startLibP2P)

	// PubSub
	pubsub *pubsub.ClientAdapter // gossipsub adapter namespaced by Discovery.NodeNamespace
}
|
|
|
|
// NewNode creates a new network node
|
|
func NewNode(cfg *config.Config) (*Node, error) {
|
|
// Create colored logger
|
|
logger, err := logging.NewColoredLogger(logging.ComponentNode, true)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create logger: %w", err)
|
|
}
|
|
|
|
return &Node{
|
|
config: cfg,
|
|
logger: logger,
|
|
}, nil
|
|
}
|
|
|
|
// startRQLite initializes and starts the RQLite database
|
|
func (n *Node) startRQLite(ctx context.Context) error {
|
|
n.logger.Info("Starting RQLite database")
|
|
|
|
// Create RQLite manager
|
|
n.rqliteManager = database.NewRQLiteManager(&n.config.Database, &n.config.Discovery, n.config.Node.DataDir, n.logger.Logger)
|
|
|
|
// Start RQLite
|
|
if err := n.rqliteManager.Start(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create adapter for sql.DB compatibility
|
|
adapter, err := database.NewRQLiteAdapter(n.rqliteManager)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create RQLite adapter: %w", err)
|
|
}
|
|
n.rqliteAdapter = adapter
|
|
|
|
return nil
|
|
}
|
|
|
|
// hasBootstrapConnections checks if we're connected to any bootstrap peers
|
|
func (n *Node) hasBootstrapConnections() bool {
|
|
if n.host == nil || len(n.config.Discovery.BootstrapPeers) == 0 {
|
|
return false
|
|
}
|
|
|
|
connectedPeers := n.host.Network().Peers()
|
|
if len(connectedPeers) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Parse bootstrap peer IDs
|
|
bootstrapPeerIDs := make(map[peer.ID]bool)
|
|
for _, bootstrapAddr := range n.config.Discovery.BootstrapPeers {
|
|
ma, err := multiaddr.NewMultiaddr(bootstrapAddr)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
bootstrapPeerIDs[peerInfo.ID] = true
|
|
}
|
|
|
|
// Check if any connected peer is a bootstrap peer
|
|
for _, peerID := range connectedPeers {
|
|
if bootstrapPeerIDs[peerID] {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// calculateNextBackoff calculates the next backoff interval with exponential growth
|
|
func calculateNextBackoff(current time.Duration) time.Duration {
|
|
// Multiply by 1.5 for gentler exponential growth
|
|
next := time.Duration(float64(current) * 1.5)
|
|
// Cap at 10 minutes
|
|
maxInterval := 10 * time.Minute
|
|
if next > maxInterval {
|
|
next = maxInterval
|
|
}
|
|
return next
|
|
}
|
|
|
|
// addJitter adds random jitter to prevent thundering herd
|
|
func addJitter(interval time.Duration) time.Duration {
|
|
// Add ±20% jitter
|
|
jitterPercent := 0.2
|
|
jitterRange := float64(interval) * jitterPercent
|
|
jitter := (mathrand.Float64() - 0.5) * 2 * jitterRange // -jitterRange to +jitterRange
|
|
|
|
result := time.Duration(float64(interval) + jitter)
|
|
// Ensure we don't go below 1 second
|
|
if result < time.Second {
|
|
result = time.Second
|
|
}
|
|
return result
|
|
}
|
|
|
|
// connectToBootstrapPeer connects to a single bootstrap peer
|
|
func (n *Node) connectToBootstrapPeer(ctx context.Context, addr string) error {
|
|
ma, err := multiaddr.NewMultiaddr(addr)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid multiaddr: %w", err)
|
|
}
|
|
|
|
// Extract peer info from multiaddr
|
|
peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to extract peer info: %w", err)
|
|
}
|
|
|
|
// Avoid dialing ourselves: if the bootstrap address resolves to our own peer ID, skip.
|
|
if n.host != nil && peerInfo.ID == n.host.ID() {
|
|
n.logger.ComponentDebug(logging.ComponentNode, "Skipping bootstrap address because it resolves to self",
|
|
zap.String("addr", addr),
|
|
zap.String("peer_id", peerInfo.ID.String()))
|
|
return nil
|
|
}
|
|
|
|
// Log resolved peer info prior to connect
|
|
n.logger.ComponentDebug(logging.ComponentNode, "Resolved bootstrap peer",
|
|
zap.String("peer_id", peerInfo.ID.String()),
|
|
zap.String("addr", addr),
|
|
zap.Int("addr_count", len(peerInfo.Addrs)),
|
|
)
|
|
|
|
// Connect to the peer
|
|
if err := n.host.Connect(ctx, *peerInfo); err != nil {
|
|
return fmt.Errorf("failed to connect to peer: %w", err)
|
|
}
|
|
|
|
n.logger.Info("Connected to bootstrap peer",
|
|
zap.String("peer", peerInfo.ID.String()),
|
|
zap.String("addr", addr))
|
|
|
|
return nil
|
|
}
|
|
|
|
// connectToBootstrapPeers connects to configured LibP2P bootstrap peers
|
|
func (n *Node) connectToBootstrapPeers(ctx context.Context) error {
|
|
if len(n.config.Discovery.BootstrapPeers) == 0 {
|
|
n.logger.ComponentDebug(logging.ComponentNode, "No bootstrap peers configured")
|
|
return nil
|
|
}
|
|
|
|
// Use passed context with a reasonable timeout for bootstrap connections
|
|
connectCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
for _, bootstrapAddr := range n.config.Discovery.BootstrapPeers {
|
|
if err := n.connectToBootstrapPeer(connectCtx, bootstrapAddr); err != nil {
|
|
n.logger.ComponentWarn(logging.ComponentNode, "Failed to connect to bootstrap peer",
|
|
zap.String("addr", bootstrapAddr),
|
|
zap.Error(err))
|
|
continue
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// startLibP2P initializes the LibP2P host.
//
// Sequence: parse listen addresses, load (or create) the persistent identity,
// build the host (routing TCP through the Anyone SOCKS5 proxy when enabled),
// attach a GossipSub pubsub adapter, dial the configured bootstrap peers, and
// launch two background activities: an exponential-backoff bootstrap
// reconnection goroutine (stopped via n.bootstrapCancel) and periodic peer
// discovery (stopped via n.discoveryCancel, see startPeerDiscovery).
func (n *Node) startLibP2P() error {
	n.logger.ComponentInfo(logging.ComponentLibP2P, "Starting LibP2P host")

	// Get listen addresses
	listenAddrs, err := n.config.ParseMultiaddrs()
	if err != nil {
		return fmt.Errorf("failed to parse listen addresses: %w", err)
	}

	// Load or create persistent identity
	identity, err := n.loadOrCreateIdentity()
	if err != nil {
		return fmt.Errorf("failed to load identity: %w", err)
	}

	// Log Anyone proxy status before constructing host
	n.logger.ComponentInfo(logging.ComponentLibP2P, "Anyone proxy status",
		zap.Bool("proxy_enabled", anyoneproxy.Enabled()),
		zap.String("proxy_addr", anyoneproxy.Address()),
		zap.Bool("proxy_running", anyoneproxy.Running()),
	)

	// A configured-but-unreachable proxy is only warned about; dials will
	// fail later if it stays down.
	if anyoneproxy.Enabled() && !anyoneproxy.Running() {
		n.logger.Warn("Anyone proxy is enabled but not reachable",
			zap.String("addr", anyoneproxy.Address()))
	}

	// Create LibP2P host with persistent identity
	// Build options allowing conditional proxying via Anyone SOCKS5
	var opts []libp2p.Option
	opts = append(opts,
		libp2p.Identity(identity),
		libp2p.ListenAddrs(listenAddrs...),
		libp2p.Security(noise.ID, noise.New),
		libp2p.DefaultMuxers,
	)

	// TCP transport with optional SOCKS5 dialer override
	if anyoneproxy.Enabled() {
		opts = append(opts, libp2p.Transport(tcp.NewTCPTransport, tcp.WithDialerForAddr(anyoneproxy.DialerForAddr())))
	} else {
		opts = append(opts, libp2p.Transport(tcp.NewTCPTransport))
	}

	h, err := libp2p.New(opts...)
	if err != nil {
		return err
	}

	n.host = h

	// Initialize pubsub. Peer exchange lets gossipsub propagate peer info
	// beyond the bootstrap set.
	ps, err := libp2ppubsub.NewGossipSub(context.Background(), h,
		libp2ppubsub.WithPeerExchange(true),
	)
	if err != nil {
		return fmt.Errorf("failed to create pubsub: %w", err)
	}

	// Create pubsub adapter with "node" namespace
	n.pubsub = pubsub.NewClientAdapter(ps, n.config.Discovery.NodeNamespace)
	n.logger.Info("Initialized pubsub adapter on namespace", zap.String("namespace", n.config.Discovery.NodeNamespace))

	// Log configured bootstrap peers
	if len(n.config.Discovery.BootstrapPeers) > 0 {
		n.logger.ComponentInfo(logging.ComponentNode, "Configured bootstrap peers",
			zap.Strings("peers", n.config.Discovery.BootstrapPeers))
	} else {
		n.logger.ComponentDebug(logging.ComponentNode, "No bootstrap peers configured")
	}

	// Connect to LibP2P bootstrap peers if configured
	if err := n.connectToBootstrapPeers(context.Background()); err != nil {
		n.logger.ComponentWarn(logging.ComponentNode, "Failed to connect to bootstrap peers", zap.Error(err))
		// Don't fail - continue without bootstrap connections
	}

	// Start exponential backoff reconnection for bootstrap peers.
	// The goroutine runs until bootstrapCancel is called (see Stop).
	if len(n.config.Discovery.BootstrapPeers) > 0 {
		bootstrapCtx, cancel := context.WithCancel(context.Background())
		n.bootstrapCancel = cancel

		go func() {
			interval := 5 * time.Second
			consecutiveFailures := 0

			n.logger.ComponentInfo(logging.ComponentNode, "Starting bootstrap peer reconnection with exponential backoff",
				zap.Duration("initial_interval", interval),
				zap.Duration("max_interval", 10*time.Minute))

			for {
				// Non-blocking cancellation check at the top of each pass.
				select {
				case <-bootstrapCtx.Done():
					n.logger.ComponentDebug(logging.ComponentNode, "Bootstrap reconnection loop stopped")
					return
				default:
				}

				// Check if we need to attempt connection
				if !n.hasBootstrapConnections() {
					n.logger.ComponentDebug(logging.ComponentNode, "Attempting bootstrap peer connection",
						zap.Duration("current_interval", interval),
						zap.Int("consecutive_failures", consecutiveFailures))

					if err := n.connectToBootstrapPeers(context.Background()); err != nil {
						consecutiveFailures++
						// Calculate next backoff interval
						jitteredInterval := addJitter(interval)
						n.logger.ComponentDebug(logging.ComponentNode, "Bootstrap connection failed, backing off",
							zap.Error(err),
							zap.Duration("next_attempt_in", jitteredInterval),
							zap.Int("consecutive_failures", consecutiveFailures))

						// Sleep with jitter (cancellable).
						select {
						case <-bootstrapCtx.Done():
							return
						case <-time.After(jitteredInterval):
						}

						// Increase interval for next attempt
						interval = calculateNextBackoff(interval)

						// Log interval increases occasionally to show progress
						if consecutiveFailures%5 == 0 {
							n.logger.ComponentInfo(logging.ComponentNode, "Bootstrap connection still failing",
								zap.Int("consecutive_failures", consecutiveFailures),
								zap.Duration("current_interval", interval))
						}
					} else {
						// Success! Reset interval and counters
						if consecutiveFailures > 0 {
							n.logger.ComponentInfo(logging.ComponentNode, "Successfully connected to bootstrap peers",
								zap.Int("failures_overcome", consecutiveFailures))
						}
						interval = 5 * time.Second
						consecutiveFailures = 0

						// Wait 30 seconds before checking connection again
						select {
						case <-bootstrapCtx.Done():
							return
						case <-time.After(30 * time.Second):
						}
					}
				} else {
					// We have bootstrap connections, just wait and check periodically
					select {
					case <-bootstrapCtx.Done():
						return
					case <-time.After(30 * time.Second):
					}
				}
			}
		}()
	}

	// Add bootstrap peers to peerstore for peer exchange
	if len(n.config.Discovery.BootstrapPeers) > 0 {
		n.logger.ComponentInfo(logging.ComponentNode, "Adding bootstrap peers to peerstore")
		for _, bootstrapAddr := range n.config.Discovery.BootstrapPeers {
			if ma, err := multiaddr.NewMultiaddr(bootstrapAddr); err == nil {
				if peerInfo, err := peer.AddrInfoFromP2pAddr(ma); err == nil {
					// Add to peerstore with longer TTL for peer exchange
					n.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour*24)
					n.logger.ComponentDebug(logging.ComponentNode, "Added bootstrap peer to peerstore",
						zap.String("peer", peerInfo.ID.String()))
				}
			}
		}
	}

	n.logger.ComponentInfo(logging.ComponentNode, "LibP2P host started successfully - using bootstrap + peer exchange discovery")

	// Start peer discovery and monitoring
	n.startPeerDiscovery()

	n.logger.ComponentInfo(logging.ComponentLibP2P, "LibP2P host started",
		zap.String("peer_id", h.ID().String()))

	return nil
}
|
|
|
|
// loadOrCreateIdentity loads an existing identity or creates a new one
|
|
func (n *Node) loadOrCreateIdentity() (crypto.PrivKey, error) {
|
|
identityFile := filepath.Join(n.config.Node.DataDir, "identity.key")
|
|
|
|
// Try to load existing identity
|
|
if _, err := os.Stat(identityFile); err == nil {
|
|
data, err := os.ReadFile(identityFile)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read identity file: %w", err)
|
|
}
|
|
|
|
priv, err := crypto.UnmarshalPrivateKey(data)
|
|
if err != nil {
|
|
n.logger.Warn("Failed to unmarshal existing identity, creating new one", zap.Error(err))
|
|
} else {
|
|
// Extract peer ID from private key for logging
|
|
peerID, err := peer.IDFromPrivateKey(priv)
|
|
if err != nil {
|
|
n.logger.ComponentInfo(logging.ComponentNode, "Loaded existing identity",
|
|
zap.String("file", identityFile),
|
|
zap.String("peer_id", "unable_to_extract"))
|
|
} else {
|
|
n.logger.ComponentInfo(logging.ComponentNode, "Loaded existing identity",
|
|
zap.String("file", identityFile),
|
|
zap.String("peer_id", peerID.String()))
|
|
}
|
|
return priv, nil
|
|
}
|
|
}
|
|
|
|
// Create new identity
|
|
n.logger.Info("Creating new identity", zap.String("file", identityFile))
|
|
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, rand.Reader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to generate key pair: %w", err)
|
|
}
|
|
|
|
// Extract peer ID from private key for logging
|
|
peerID, err := peer.IDFromPrivateKey(priv)
|
|
if err != nil {
|
|
n.logger.Info("Identity created",
|
|
zap.String("peer_id", "unable_to_extract"))
|
|
} else {
|
|
n.logger.Info("Identity created",
|
|
zap.String("peer_id", peerID.String()))
|
|
}
|
|
|
|
// Save identity
|
|
data, err := crypto.MarshalPrivateKey(priv)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal private key: %w", err)
|
|
}
|
|
|
|
if err := os.WriteFile(identityFile, data, 0600); err != nil {
|
|
return nil, fmt.Errorf("failed to save identity: %w", err)
|
|
}
|
|
|
|
n.logger.Info("Identity saved", zap.String("file", identityFile))
|
|
return priv, nil
|
|
}
|
|
|
|
// GetPeerID returns the peer ID of this node
|
|
func (n *Node) GetPeerID() string {
|
|
if n.host == nil {
|
|
return ""
|
|
}
|
|
return n.host.ID().String()
|
|
}
|
|
|
|
// startPeerDiscovery starts periodic peer discovery for the node
|
|
func (n *Node) startPeerDiscovery() {
|
|
// Create a cancellation context for discovery
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
n.discoveryCancel = cancel
|
|
|
|
// Start bootstrap peer connections immediately
|
|
go func() {
|
|
n.connectToBootstrapPeers(ctx)
|
|
|
|
// Periodic peer discovery using interval from config
|
|
ticker := time.NewTicker(n.config.Discovery.DiscoveryInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
n.discoverPeers(ctx)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// discoverPeers discovers and connects to new peers using peer exchange
|
|
func (n *Node) discoverPeers(ctx context.Context) {
|
|
if n.host == nil {
|
|
return
|
|
}
|
|
|
|
connectedPeers := n.host.Network().Peers()
|
|
initialCount := len(connectedPeers)
|
|
|
|
if initialCount == 0 {
|
|
// No peers connected - exponential backoff system handles bootstrap reconnection
|
|
n.logger.ComponentDebug(logging.ComponentNode, "No peers connected, relying on exponential backoff for bootstrap")
|
|
return
|
|
}
|
|
|
|
n.logger.ComponentDebug(logging.ComponentNode, "Discovering peers via peer exchange",
|
|
zap.Int("current_peers", initialCount))
|
|
|
|
// Strategy: Use peer exchange through libp2p's identify protocol
|
|
// LibP2P automatically exchanges peer information when peers connect
|
|
// We just need to try connecting to peers in our peerstore
|
|
|
|
newConnections := n.discoverViaPeerExchange(ctx)
|
|
|
|
finalPeerCount := len(n.host.Network().Peers())
|
|
|
|
if newConnections > 0 {
|
|
n.logger.ComponentInfo(logging.ComponentNode, "Peer discovery completed",
|
|
zap.Int("new_connections", newConnections),
|
|
zap.Int("initial_peers", initialCount),
|
|
zap.Int("final_peers", finalPeerCount))
|
|
}
|
|
}
|
|
|
|
// discoverViaPeerExchange discovers new peers using peer exchange (identify protocol)
|
|
func (n *Node) discoverViaPeerExchange(ctx context.Context) int {
|
|
connected := 0
|
|
maxConnections := 3 // Conservative limit to avoid overwhelming proxy
|
|
|
|
// Get all peers from peerstore (includes peers discovered through identify protocol)
|
|
allKnownPeers := n.host.Peerstore().Peers()
|
|
|
|
for _, knownPeer := range allKnownPeers {
|
|
if knownPeer == n.host.ID() {
|
|
continue
|
|
}
|
|
|
|
// Skip if already connected
|
|
if n.host.Network().Connectedness(knownPeer) == network.Connected {
|
|
continue
|
|
}
|
|
|
|
// Get addresses for this peer
|
|
addrs := n.host.Peerstore().Addrs(knownPeer)
|
|
if len(addrs) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Filter to only standard P2P ports (avoid ephemeral client ports)
|
|
var validAddrs []multiaddr.Multiaddr
|
|
for _, addr := range addrs {
|
|
addrStr := addr.String()
|
|
// Keep addresses with standard P2P ports (4000-4999 range)
|
|
if strings.Contains(addrStr, ":400") {
|
|
validAddrs = append(validAddrs, addr)
|
|
}
|
|
}
|
|
|
|
if len(validAddrs) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Try to connect with shorter timeout (proxy connections are slower)
|
|
connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|
peerInfo := peer.AddrInfo{ID: knownPeer, Addrs: validAddrs}
|
|
|
|
if err := n.host.Connect(connectCtx, peerInfo); err != nil {
|
|
cancel()
|
|
n.logger.ComponentDebug(logging.ComponentNode, "Failed to connect to peer via exchange",
|
|
zap.String("peer", knownPeer.String()),
|
|
zap.Error(err))
|
|
continue
|
|
}
|
|
cancel()
|
|
|
|
n.logger.ComponentInfo(logging.ComponentNode, "Connected to new peer via peer exchange",
|
|
zap.String("peer", knownPeer.String()))
|
|
connected++
|
|
|
|
if connected >= maxConnections {
|
|
break
|
|
}
|
|
}
|
|
|
|
return connected
|
|
}
|
|
|
|
// stopPeerDiscovery stops peer discovery
|
|
func (n *Node) stopPeerDiscovery() {
|
|
if n.discoveryCancel != nil {
|
|
n.discoveryCancel()
|
|
n.discoveryCancel = nil
|
|
}
|
|
n.logger.ComponentInfo(logging.ComponentNode, "Peer discovery stopped")
|
|
}
|
|
|
|
// getListenAddresses returns the current listen addresses as strings
|
|
// Stop stops the node and all its services
|
|
func (n *Node) Stop() error {
|
|
n.logger.ComponentInfo(logging.ComponentNode, "Stopping network node")
|
|
|
|
// Stop bootstrap reconnection loop
|
|
if n.bootstrapCancel != nil {
|
|
n.bootstrapCancel()
|
|
}
|
|
|
|
// Stop peer discovery
|
|
n.stopPeerDiscovery()
|
|
|
|
// Stop LibP2P host
|
|
if n.host != nil {
|
|
n.host.Close()
|
|
}
|
|
|
|
// Stop RQLite
|
|
if n.rqliteAdapter != nil {
|
|
n.rqliteAdapter.Close()
|
|
}
|
|
if n.rqliteManager != nil {
|
|
_ = n.rqliteManager.Stop()
|
|
}
|
|
|
|
n.logger.ComponentInfo(logging.ComponentNode, "Network node stopped")
|
|
return nil
|
|
}
|
|
|
|
// Starts the network node
|
|
func (n *Node) Start(ctx context.Context) error {
|
|
n.logger.Info("Starting network node", zap.String("data_dir", n.config.Node.DataDir))
|
|
|
|
// Create data directory
|
|
if err := os.MkdirAll(n.config.Node.DataDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create data directory: %w", err)
|
|
}
|
|
|
|
// Start RQLite
|
|
if err := n.startRQLite(ctx); err != nil {
|
|
return fmt.Errorf("failed to start RQLite: %w", err)
|
|
}
|
|
|
|
// Start LibP2P host
|
|
if err := n.startLibP2P(); err != nil {
|
|
return fmt.Errorf("failed to start LibP2P: %w", err)
|
|
}
|
|
|
|
// Get listen addresses for logging
|
|
var listenAddrs []string
|
|
for _, addr := range n.host.Addrs() {
|
|
listenAddrs = append(listenAddrs, addr.String())
|
|
}
|
|
|
|
n.logger.ComponentInfo(logging.ComponentNode, "Network node started successfully",
|
|
zap.String("peer_id", n.host.ID().String()),
|
|
zap.Strings("listen_addrs", listenAddrs),
|
|
)
|
|
|
|
n.startConnectionMonitoring()
|
|
|
|
return nil
|
|
}
|