diff --git a/cmd/node/main.go b/cmd/node/main.go
index dcdcd16..8578fc9 100644
--- a/cmd/node/main.go
+++ b/cmd/node/main.go
@@ -54,11 +54,15 @@ func parse_and_return_network_flags() (configPath *string, dataDir, nodeID *stri
 
 	// Return config values but preserve command line flag values for overrides
 	// The command line flags will be applied later in load_args_into_config
+	// Create separate variables to avoid modifying config directly
+	configDataDir := cfg.Node.DataDir
+	configAdvAddr := cfg.Discovery.HttpAdvAddress
+
 	return configPath,
-		&cfg.Node.DataDir,
-		&cfg.Node.ID,
+		&configDataDir,
+		nodeID, // Keep the command line flag value, not config value
 		p2pPort, // Keep the command line flag value
-		&cfg.Discovery.HttpAdvAddress,
+		&configAdvAddr,
 		help, cfg // Return the loaded config
 }
 
@@ -91,13 +95,12 @@ func check_if_should_open_help(help *bool) {
 func select_data_dir(dataDir *string, nodeID *string) {
 	logger := setup_logger(logging.ComponentNode)
 
-	// If dataDir is not set from config, set it based on nodeID
-	if *dataDir == "" {
-		if *nodeID == "" {
-			*dataDir = "./data/node"
-		} else {
-			*dataDir = fmt.Sprintf("./data/%s", *nodeID)
-		}
+	// If nodeID is provided via command line, use it to override the data directory
+	if *nodeID != "" {
+		*dataDir = fmt.Sprintf("./data/%s", *nodeID)
+	} else if *dataDir == "" {
+		// Fallback to default if neither nodeID nor dataDir is set
+		*dataDir = "./data/node"
 	}
 
 	logger.Info("Successfully selected Data Directory of: %s", zap.String("dataDir", *dataDir))
@@ -175,7 +178,6 @@ func main() {
 
 	_, dataDir, nodeID, p2pPort, advAddr, help, loadedConfig := parse_and_return_network_flags()
 	check_if_should_open_help(help)
-	select_data_dir(dataDir, nodeID)
 
 	// Load Node Configuration - use loaded config if available, otherwise use default
 	var cfg *config.Config
@@ -187,6 +189,9 @@ func main() {
 		logger.ComponentInfo(logging.ComponentNode, "Using default configuration")
 	}
 
+	// Select data directory based on node ID (this overrides config)
+	select_data_dir(dataDir, nodeID)
+
 	// Apply command line argument overrides
 	load_args_into_config(cfg, p2pPort, advAddr, dataDir)
 	logger.ComponentInfo(logging.ComponentNode, "Command line arguments applied to configuration")
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 692f9d0..5c90a5a 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -114,7 +114,7 @@ func DefaultConfig() *Config {
 		},
 		Discovery: DiscoveryConfig{
 			BootstrapPeers: []string{
-				"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWKdj4B3LdZ8whYGaa97giwWCoSELciRp6qsFrDvz2Etah",
+				"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWJSkwRXX1PYA5g2bkAnKani7frhygZp5iY6U853JJPb7K",
 				// "/ip4/217.76.54.168/tcp/4001/p2p/12D3KooWDp7xeShVY9uHfqNVPSsJeCKUatAviFZV8Y1joox5nUvx",
 				// "/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1",
 				// "/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR",
diff --git a/pkg/node/node.go b/pkg/node/node.go
index 3338e24..c3df3e9 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -38,8 +38,9 @@ type Node struct {
 	clusterManager *database.ClusterManager
 
 	// Peer discovery
-	discoveryCancel context.CancelFunc
-	bootstrapCancel context.CancelFunc
+	discoveryCancel      context.CancelFunc
+	bootstrapCancel      context.CancelFunc
+	peerDiscoveryService *pubsub.PeerDiscoveryService
 
 	// PubSub
 	pubsub *pubsub.ClientAdapter
@@ -255,6 +256,15 @@ func (n *Node) startLibP2P() error {
 	n.pubsub = pubsub.NewClientAdapter(ps, n.config.Discovery.NodeNamespace)
 	n.logger.Info("Initialized pubsub adapter on namespace", zap.String("namespace", n.config.Discovery.NodeNamespace))
 
+	// Initialize peer discovery service
+	// Note: We need to access the internal manager from the adapter
+	peerDiscoveryManager := pubsub.NewManager(ps, n.config.Discovery.NodeNamespace)
+	n.peerDiscoveryService = pubsub.NewPeerDiscoveryService(h, peerDiscoveryManager, n.logger.Logger)
+	if err := n.peerDiscoveryService.Start(); err != nil {
+		return fmt.Errorf("failed to start peer discovery service: %w", err)
+	}
+	n.logger.ComponentInfo(logging.ComponentNode, "Started active peer discovery service")
+
 	// Log configured bootstrap peers
 	if len(n.config.Discovery.BootstrapPeers) > 0 {
 		n.logger.ComponentInfo(logging.ComponentNode, "Configured bootstrap peers",
@@ -564,6 +574,13 @@ func (n *Node) Stop() error {
 	// Stop peer discovery
 	n.stopPeerDiscovery()
 
+	// Stop peer discovery service
+	if n.peerDiscoveryService != nil {
+		if err := n.peerDiscoveryService.Stop(); err != nil {
+			n.logger.ComponentWarn(logging.ComponentNode, "Error stopping peer discovery service", zap.Error(err))
+		}
+	}
+
 	// Stop cluster manager
 	if n.clusterManager != nil {
 		if err := n.clusterManager.Stop(); err != nil {
diff --git a/pkg/pubsub/adapter.go b/pkg/pubsub/adapter.go
index 51e0893..9ad0d07 100644
--- a/pkg/pubsub/adapter.go
+++ b/pkg/pubsub/adapter.go
@@ -41,4 +41,4 @@ func (a *ClientAdapter) ListTopics(ctx context.Context) ([]string, error) {
 // Close closes all subscriptions and topics
 func (a *ClientAdapter) Close() error {
 	return a.manager.Close()
-}
\ No newline at end of file
+}
diff --git a/pkg/pubsub/peer_discovery.go b/pkg/pubsub/peer_discovery.go
new file mode 100644
index 0000000..d029b63
--- /dev/null
+++ b/pkg/pubsub/peer_discovery.go
@@ -0,0 +1,194 @@
+package pubsub
+
+import (
+	"context"
+	"encoding/json"
+	"time"
+
+	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/multiformats/go-multiaddr"
+	"go.uber.org/zap"
+)
+
+// PeerAnnouncement represents a peer announcing its addresses
+type PeerAnnouncement struct {
+	PeerID    string   `json:"peer_id"`
+	Addresses []string `json:"addresses"`
+	Timestamp int64    `json:"timestamp"`
+}
+
+// PeerDiscoveryService handles active peer discovery via pubsub
+type PeerDiscoveryService struct {
+	host      host.Host
+	manager   *Manager
+	logger    *zap.Logger
+	ctx       context.Context
+	cancel    context.CancelFunc
+	topicName string
+}
+
+// NewPeerDiscoveryService creates a new peer discovery service
+func NewPeerDiscoveryService(host host.Host, manager *Manager, logger *zap.Logger) *PeerDiscoveryService {
+	ctx, cancel := context.WithCancel(context.Background())
+	return &PeerDiscoveryService{
+		host:      host,
+		manager:   manager,
+		logger:    logger,
+		ctx:       ctx,
+		cancel:    cancel,
+		topicName: "/debros/peer-discovery/v1",
+	}
+}
+
+// Start begins the peer discovery service
+func (pds *PeerDiscoveryService) Start() error {
+	// Subscribe to peer discovery topic
+	if err := pds.manager.Subscribe(pds.ctx, pds.topicName, pds.handlePeerAnnouncement); err != nil {
+		return err
+	}
+
+	// Announce our own presence periodically
+	go pds.announcePeriodically()
+
+	return nil
+}
+
+// Stop stops the peer discovery service
+func (pds *PeerDiscoveryService) Stop() error {
+	pds.cancel()
+	return pds.manager.Unsubscribe(pds.ctx, pds.topicName)
+}
+
+// announcePeriodically announces this peer's addresses to the network
+func (pds *PeerDiscoveryService) announcePeriodically() {
+	// Initial announcement after a short delay to ensure subscriptions are ready
+	time.Sleep(2 * time.Second)
+	pds.announceOurselves()
+
+	// Then announce periodically
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-pds.ctx.Done():
+			return
+		case <-ticker.C:
+			pds.announceOurselves()
+		}
+	}
+}
+
+// announceOurselves publishes our peer info to the discovery topic
+func (pds *PeerDiscoveryService) announceOurselves() {
+	// Get our listen addresses
+	addrs := pds.host.Addrs()
+	addrStrs := make([]string, 0, len(addrs))
+
+	for _, addr := range addrs {
+		// Create full multiaddr with peer ID
+		fullAddr := addr.Encapsulate(multiaddr.StringCast("/p2p/" + pds.host.ID().String()))
+		addrStrs = append(addrStrs, fullAddr.String())
+	}
+
+	announcement := PeerAnnouncement{
+		PeerID:    pds.host.ID().String(),
+		Addresses: addrStrs,
+		Timestamp: time.Now().Unix(),
+	}
+
+	data, err := json.Marshal(announcement)
+	if err != nil {
+		pds.logger.Debug("Failed to marshal peer announcement", zap.Error(err))
+		return
+	}
+
+	if err := pds.manager.Publish(pds.ctx, pds.topicName, data); err != nil {
+		pds.logger.Debug("Failed to publish peer announcement", zap.Error(err))
+	} else {
+		pds.logger.Debug("Announced peer presence",
+			zap.String("peer_id", pds.host.ID().String()[:16]+"..."),
+			zap.Int("addresses", len(addrStrs)))
+	}
+}
+
+// handlePeerAnnouncement processes incoming peer announcements
+func (pds *PeerDiscoveryService) handlePeerAnnouncement(topic string, data []byte) error {
+	var announcement PeerAnnouncement
+	if err := json.Unmarshal(data, &announcement); err != nil {
+		pds.logger.Debug("Failed to unmarshal peer announcement", zap.Error(err))
+		return nil // Don't return error, just skip invalid messages
+	}
+
+	// Skip our own announcements
+	if announcement.PeerID == pds.host.ID().String() {
+		return nil
+	}
+
+	// Validate the announcement is recent (within last 5 minutes)
+	if time.Now().Unix()-announcement.Timestamp > 300 {
+		return nil // Ignore stale announcements
+	}
+
+	// Parse peer ID
+	peerID, err := peer.Decode(announcement.PeerID)
+	if err != nil {
+		pds.logger.Debug("Invalid peer ID in announcement", zap.Error(err))
+		return nil
+	}
+
+	// Skip if it's our own ID (redundant check)
+	if peerID == pds.host.ID() {
+		return nil
+	}
+
+	// Parse and add addresses to peerstore
+	var validAddrs []multiaddr.Multiaddr
+	for _, addrStr := range announcement.Addresses {
+		addr, err := multiaddr.NewMultiaddr(addrStr)
+		if err != nil {
+			continue
+		}
+		validAddrs = append(validAddrs, addr)
+	}
+
+	if len(validAddrs) == 0 {
+		return nil
+	}
+
+	// Add peer info to peerstore with a reasonable TTL
+	pds.host.Peerstore().AddAddrs(peerID, validAddrs, time.Hour*24)
+
+	pds.logger.Debug("Discovered peer via announcement",
+		zap.String("peer_id", peerID.String()[:16]+"..."),
+		zap.Int("addresses", len(validAddrs)))
+
+	// Try to connect to the peer if we're not already connected
+	if pds.host.Network().Connectedness(peerID) != 1 { // 1 = Connected
+		go pds.tryConnectToPeer(peerID, validAddrs)
+	}
+
+	return nil
+}
+
+// tryConnectToPeer attempts to connect to a discovered peer
+func (pds *PeerDiscoveryService) tryConnectToPeer(peerID peer.ID, addrs []multiaddr.Multiaddr) {
+	ctx, cancel := context.WithTimeout(pds.ctx, 15*time.Second)
+	defer cancel()
+
+	peerInfo := peer.AddrInfo{
+		ID:    peerID,
+		Addrs: addrs,
+	}
+
+	if err := pds.host.Connect(ctx, peerInfo); err != nil {
+		pds.logger.Debug("Failed to connect to discovered peer",
+			zap.String("peer_id", peerID.String()[:16]+"..."),
+			zap.Error(err))
+		return
+	}
+
+	pds.logger.Info("Successfully connected to discovered peer",
+		zap.String("peer_id", peerID.String()[:16]+"..."))
+}
diff --git a/pkg/rqlite/cluster_manager.go b/pkg/rqlite/cluster_manager.go
index 2feafec..cf928e0 100644
--- a/pkg/rqlite/cluster_manager.go
+++ b/pkg/rqlite/cluster_manager.go
@@ -363,7 +363,16 @@ func (cm *ClusterManager) announceCapacity() {
 
 // monitorHealth monitors the health of active databases
 func (cm *ClusterManager) monitorHealth() {
-	ticker := time.NewTicker(cm.discoveryConfig.HealthCheckInterval)
+	// Use a default interval if the configured one is invalid
+	interval := cm.discoveryConfig.HealthCheckInterval
+	if interval <= 0 {
+		interval = 10 * time.Second
+		cm.logger.Warn("Invalid health check interval, using default",
+			zap.Duration("configured", cm.discoveryConfig.HealthCheckInterval),
+			zap.Duration("default", interval))
+	}
+
+	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
 
 	for {