Fixed discoverability issues and bugs

anonpenguin23 2025-10-13 14:52:01 +03:00
parent f2d5a0790e
commit 85e0bc13f4
No known key found for this signature in database
GPG Key ID: 1CBB1FE35AFBEE30
6 changed files with 241 additions and 16 deletions

View File

@@ -54,11 +54,15 @@ func parse_and_return_network_flags() (configPath *string, dataDir, nodeID *string
 	// Return config values but preserve command line flag values for overrides
 	// The command line flags will be applied later in load_args_into_config
+	// Create separate variables to avoid modifying config directly
+	configDataDir := cfg.Node.DataDir
+	configAdvAddr := cfg.Discovery.HttpAdvAddress
 	return configPath,
-		&cfg.Node.DataDir,
-		&cfg.Node.ID,
+		&configDataDir,
+		nodeID, // Keep the command line flag value, not config value
 		p2pPort, // Keep the command line flag value
-		&cfg.Discovery.HttpAdvAddress,
+		&configAdvAddr,
 		help,
 		cfg // Return the loaded config
 }
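The new configDataDir/configAdvAddr copies matter because select_data_dir later writes through the returned dataDir pointer; with the old &cfg.Node.DataDir, that write would have mutated the loaded config itself. A minimal standalone sketch of that aliasing, using hypothetical names rather than code from this repo:

package main

import "fmt"

type nodeConfig struct{ DataDir string }

func main() {
	cfg := nodeConfig{DataDir: "/var/lib/node"}

	// Old approach: hand out a pointer into cfg itself.
	dataDir := &cfg.DataDir
	*dataDir = "./data/node-1" // a later override through the pointer...
	fmt.Println(cfg.DataDir)   // "./data/node-1" — ...rewrites the config too

	// New approach: copy first, then hand out a pointer to the copy.
	cfg.DataDir = "/var/lib/node"
	configDataDir := cfg.DataDir
	dataDir = &configDataDir
	*dataDir = "./data/node-1"
	fmt.Println(cfg.DataDir) // "/var/lib/node" — the config is untouched
}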
@@ -91,13 +95,12 @@ func check_if_should_open_help(help *bool) {
 func select_data_dir(dataDir *string, nodeID *string) {
 	logger := setup_logger(logging.ComponentNode)
-	// If dataDir is not set from config, set it based on nodeID
-	if *dataDir == "" {
-		if *nodeID == "" {
-			*dataDir = "./data/node"
-		} else {
-			*dataDir = fmt.Sprintf("./data/%s", *nodeID)
-		}
-	}
+	// If nodeID is provided via command line, use it to override the data directory
+	if *nodeID != "" {
+		*dataDir = fmt.Sprintf("./data/%s", *nodeID)
+	} else if *dataDir == "" {
+		// Fallback to default if neither nodeID nor dataDir is set
+		*dataDir = "./data/node"
+	}
 	logger.Info("Successfully selected Data Directory of: %s", zap.String("dataDir", *dataDir))
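The rewritten branch order means a node ID supplied on the command line now always wins, a data directory that is already set (for example from config) is left alone, and "./data/node" is only the last resort. A rough standalone sketch of that precedence, with a hypothetical helper rather than code from the repo:

package main

import "fmt"

// pick mirrors the new branch order: the nodeID flag wins, then an
// already-set dataDir, then the default.
func pick(dataDir, nodeID string) string {
	if nodeID != "" {
		return fmt.Sprintf("./data/%s", nodeID)
	} else if dataDir == "" {
		return "./data/node"
	}
	return dataDir
}

func main() {
	fmt.Println(pick("", "bootstrap"))     // ./data/bootstrap (flag wins)
	fmt.Println(pick("./data/custom", "")) // ./data/custom (existing value kept)
	fmt.Println(pick("", ""))              // ./data/node (default)
}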
@@ -175,7 +178,6 @@ func main() {
 	_, dataDir, nodeID, p2pPort, advAddr, help, loadedConfig := parse_and_return_network_flags()
 	check_if_should_open_help(help)
-	select_data_dir(dataDir, nodeID)
 	// Load Node Configuration - use loaded config if available, otherwise use default
 	var cfg *config.Config
@@ -187,6 +189,9 @@
 		logger.ComponentInfo(logging.ComponentNode, "Using default configuration")
 	}
+	// Select data directory based on node ID (this overrides config)
+	select_data_dir(dataDir, nodeID)
 	// Apply command line argument overrides
 	load_args_into_config(cfg, p2pPort, advAddr, dataDir)
 	logger.ComponentInfo(logging.ComponentNode, "Command line arguments applied to configuration")

View File

@@ -114,7 +114,7 @@ func DefaultConfig() *Config {
 		},
 		Discovery: DiscoveryConfig{
 			BootstrapPeers: []string{
-				"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWKdj4B3LdZ8whYGaa97giwWCoSELciRp6qsFrDvz2Etah",
+				"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWJSkwRXX1PYA5g2bkAnKani7frhygZp5iY6U853JJPb7K",
 				// "/ip4/217.76.54.168/tcp/4001/p2p/12D3KooWDp7xeShVY9uHfqNVPSsJeCKUatAviFZV8Y1joox5nUvx",
 				// "/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1",
 				// "/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR",

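Each bootstrap entry is a complete multiaddr whose trailing /p2p/ component is the bootstrap node's peer ID, so this change only swaps which local identity is dialed on 127.0.0.1:4001. For illustration (not code from this repo), go-libp2p can parse and dial such an entry directly:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	// Parse a bootstrap entry into a peer ID plus dialable addresses.
	info, err := peer.AddrInfoFromString(
		"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWJSkwRXX1PYA5g2bkAnKani7frhygZp5iY6U853JJPb7K")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("bootstrap peer:", info.ID, "addrs:", info.Addrs)

	// Connecting is then a single call on any libp2p host.
	h, err := libp2p.New()
	if err != nil {
		log.Fatal(err)
	}
	defer h.Close()
	if err := h.Connect(context.Background(), *info); err != nil {
		log.Println("connect failed (expected unless that node is running):", err)
	}
}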
View File

@@ -38,8 +38,9 @@ type Node struct {
 	clusterManager *database.ClusterManager
 	// Peer discovery
-	discoveryCancel context.CancelFunc
-	bootstrapCancel context.CancelFunc
+	discoveryCancel      context.CancelFunc
+	bootstrapCancel      context.CancelFunc
+	peerDiscoveryService *pubsub.PeerDiscoveryService
 	// PubSub
 	pubsub *pubsub.ClientAdapter
@@ -255,6 +256,15 @@ func (n *Node) startLibP2P() error {
 	n.pubsub = pubsub.NewClientAdapter(ps, n.config.Discovery.NodeNamespace)
 	n.logger.Info("Initialized pubsub adapter on namespace", zap.String("namespace", n.config.Discovery.NodeNamespace))
+	// Initialize peer discovery service
+	// Note: We need to access the internal manager from the adapter
+	peerDiscoveryManager := pubsub.NewManager(ps, n.config.Discovery.NodeNamespace)
+	n.peerDiscoveryService = pubsub.NewPeerDiscoveryService(h, peerDiscoveryManager, n.logger.Logger)
+	if err := n.peerDiscoveryService.Start(); err != nil {
+		return fmt.Errorf("failed to start peer discovery service: %w", err)
+	}
+	n.logger.ComponentInfo(logging.ComponentNode, "Started active peer discovery service")
 	// Log configured bootstrap peers
 	if len(n.config.Discovery.BootstrapPeers) > 0 {
 		n.logger.ComponentInfo(logging.ComponentNode, "Configured bootstrap peers",
@@ -564,6 +574,13 @@ func (n *Node) Stop() error {
 	// Stop peer discovery
 	n.stopPeerDiscovery()
+	// Stop peer discovery service
+	if n.peerDiscoveryService != nil {
+		if err := n.peerDiscoveryService.Stop(); err != nil {
+			n.logger.ComponentWarn(logging.ComponentNode, "Error stopping peer discovery service", zap.Error(err))
+		}
+	}
 	// Stop cluster manager
 	if n.clusterManager != nil {
 		if err := n.clusterManager.Stop(); err != nil {

View File

@@ -41,4 +41,4 @@ func (a *ClientAdapter) ListTopics(ctx context.Context) ([]string, error) {
 // Close closes all subscriptions and topics
 func (a *ClientAdapter) Close() error {
 	return a.manager.Close()
-}
\ No newline at end of file
+}

View File

@@ -0,0 +1,194 @@
package pubsub

import (
	"context"
	"encoding/json"
	"time"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
	"go.uber.org/zap"
)

// PeerAnnouncement represents a peer announcing its addresses
type PeerAnnouncement struct {
	PeerID    string   `json:"peer_id"`
	Addresses []string `json:"addresses"`
	Timestamp int64    `json:"timestamp"`
}

// PeerDiscoveryService handles active peer discovery via pubsub
type PeerDiscoveryService struct {
	host      host.Host
	manager   *Manager
	logger    *zap.Logger
	ctx       context.Context
	cancel    context.CancelFunc
	topicName string
}

// NewPeerDiscoveryService creates a new peer discovery service
func NewPeerDiscoveryService(host host.Host, manager *Manager, logger *zap.Logger) *PeerDiscoveryService {
	ctx, cancel := context.WithCancel(context.Background())
	return &PeerDiscoveryService{
		host:      host,
		manager:   manager,
		logger:    logger,
		ctx:       ctx,
		cancel:    cancel,
		topicName: "/debros/peer-discovery/v1",
	}
}

// Start begins the peer discovery service
func (pds *PeerDiscoveryService) Start() error {
	// Subscribe to peer discovery topic
	if err := pds.manager.Subscribe(pds.ctx, pds.topicName, pds.handlePeerAnnouncement); err != nil {
		return err
	}
	// Announce our own presence periodically
	go pds.announcePeriodically()
	return nil
}

// Stop stops the peer discovery service
func (pds *PeerDiscoveryService) Stop() error {
	pds.cancel()
	return pds.manager.Unsubscribe(pds.ctx, pds.topicName)
}

// announcePeriodically announces this peer's addresses to the network
func (pds *PeerDiscoveryService) announcePeriodically() {
	// Initial announcement after a short delay to ensure subscriptions are ready
	time.Sleep(2 * time.Second)
	pds.announceOurselves()
	// Then announce periodically
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-pds.ctx.Done():
			return
		case <-ticker.C:
			pds.announceOurselves()
		}
	}
}

// announceOurselves publishes our peer info to the discovery topic
func (pds *PeerDiscoveryService) announceOurselves() {
	// Get our listen addresses
	addrs := pds.host.Addrs()
	addrStrs := make([]string, 0, len(addrs))
	for _, addr := range addrs {
		// Create full multiaddr with peer ID
		fullAddr := addr.Encapsulate(multiaddr.StringCast("/p2p/" + pds.host.ID().String()))
		addrStrs = append(addrStrs, fullAddr.String())
	}
	announcement := PeerAnnouncement{
		PeerID:    pds.host.ID().String(),
		Addresses: addrStrs,
		Timestamp: time.Now().Unix(),
	}
	data, err := json.Marshal(announcement)
	if err != nil {
		pds.logger.Debug("Failed to marshal peer announcement", zap.Error(err))
		return
	}
	if err := pds.manager.Publish(pds.ctx, pds.topicName, data); err != nil {
		pds.logger.Debug("Failed to publish peer announcement", zap.Error(err))
	} else {
		pds.logger.Debug("Announced peer presence",
			zap.String("peer_id", pds.host.ID().String()[:16]+"..."),
			zap.Int("addresses", len(addrStrs)))
	}
}

// handlePeerAnnouncement processes incoming peer announcements
func (pds *PeerDiscoveryService) handlePeerAnnouncement(topic string, data []byte) error {
	var announcement PeerAnnouncement
	if err := json.Unmarshal(data, &announcement); err != nil {
		pds.logger.Debug("Failed to unmarshal peer announcement", zap.Error(err))
		return nil // Don't return error, just skip invalid messages
	}
	// Skip our own announcements
	if announcement.PeerID == pds.host.ID().String() {
		return nil
	}
	// Validate the announcement is recent (within last 5 minutes)
	if time.Now().Unix()-announcement.Timestamp > 300 {
		return nil // Ignore stale announcements
	}
	// Parse peer ID
	peerID, err := peer.Decode(announcement.PeerID)
	if err != nil {
		pds.logger.Debug("Invalid peer ID in announcement", zap.Error(err))
		return nil
	}
	// Skip if it's our own ID (redundant check)
	if peerID == pds.host.ID() {
		return nil
	}
	// Parse and add addresses to peerstore
	var validAddrs []multiaddr.Multiaddr
	for _, addrStr := range announcement.Addresses {
		addr, err := multiaddr.NewMultiaddr(addrStr)
		if err != nil {
			continue
		}
		validAddrs = append(validAddrs, addr)
	}
	if len(validAddrs) == 0 {
		return nil
	}
	// Add peer info to peerstore with a reasonable TTL
	pds.host.Peerstore().AddAddrs(peerID, validAddrs, time.Hour*24)
	pds.logger.Debug("Discovered peer via announcement",
		zap.String("peer_id", peerID.String()[:16]+"..."),
		zap.Int("addresses", len(validAddrs)))
	// Try to connect to the peer if we're not already connected
	if pds.host.Network().Connectedness(peerID) != 1 { // 1 = Connected
		go pds.tryConnectToPeer(peerID, validAddrs)
	}
	return nil
}

// tryConnectToPeer attempts to connect to a discovered peer
func (pds *PeerDiscoveryService) tryConnectToPeer(peerID peer.ID, addrs []multiaddr.Multiaddr) {
	ctx, cancel := context.WithTimeout(pds.ctx, 15*time.Second)
	defer cancel()
	peerInfo := peer.AddrInfo{
		ID:    peerID,
		Addrs: addrs,
	}
	if err := pds.host.Connect(ctx, peerInfo); err != nil {
		pds.logger.Debug("Failed to connect to discovered peer",
			zap.String("peer_id", peerID.String()[:16]+"..."),
			zap.Error(err))
		return
	}
	pds.logger.Info("Successfully connected to discovered peer",
		zap.String("peer_id", peerID.String()[:16]+"..."))
}
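Taken together with the node.go hunk above, wiring this service up needs only three inputs: the libp2p host, a pubsub Manager, and a zap logger. A condensed usage sketch, assuming ps is the go-libp2p-pubsub router that node.go already constructs and using a placeholder import path and namespace:

package main

import (
	"context"
	"log"

	"github.com/libp2p/go-libp2p"
	libp2ppubsub "github.com/libp2p/go-libp2p-pubsub"
	"go.uber.org/zap"

	"example.com/debros/pkg/pubsub" // placeholder path for the package shown above
)

func main() {
	ctx := context.Background()
	logger, _ := zap.NewDevelopment()

	h, err := libp2p.New()
	if err != nil {
		log.Fatal(err)
	}
	defer h.Close()

	ps, err := libp2ppubsub.NewGossipSub(ctx, h) // assumed: same router node.go passes in as ps
	if err != nil {
		log.Fatal(err)
	}

	mgr := pubsub.NewManager(ps, "example-namespace")
	svc := pubsub.NewPeerDiscoveryService(h, mgr, logger)
	if err := svc.Start(); err != nil {
		log.Fatal(err)
	}
	defer svc.Stop() // unsubscribes from /debros/peer-discovery/v1 and cancels the announce loop
}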

View File

@@ -363,7 +363,16 @@ func (cm *ClusterManager) announceCapacity() {
 // monitorHealth monitors the health of active databases
 func (cm *ClusterManager) monitorHealth() {
-	ticker := time.NewTicker(cm.discoveryConfig.HealthCheckInterval)
+	// Use a default interval if the configured one is invalid
+	interval := cm.discoveryConfig.HealthCheckInterval
+	if interval <= 0 {
+		interval = 10 * time.Second
+		cm.logger.Warn("Invalid health check interval, using default",
+			zap.Duration("configured", cm.discoveryConfig.HealthCheckInterval),
+			zap.Duration("default", interval))
+	}
+	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
 	for {
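The interval guard matters because time.NewTicker panics when the duration is not greater than zero, so an unset or malformed HealthCheckInterval would previously have crashed the monitor goroutine. The same defensive pattern in isolation, as a hypothetical helper not taken from this commit:

package main

import "time"

// safeTicker falls back to a default when the configured interval would make
// time.NewTicker panic (it rejects durations <= 0).
func safeTicker(configured, fallback time.Duration) *time.Ticker {
	if configured <= 0 {
		configured = fallback
	}
	return time.NewTicker(configured)
}

func main() {
	t := safeTicker(0, 10*time.Second) // a zero config value falls back to 10s
	defer t.Stop()
	<-t.C
}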