package discovery import ( "context" "encoding/json" "errors" "io" "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "go.uber.org/zap" ) // Protocol ID for peer exchange const PeerExchangeProtocol = "/debros/peer-exchange/1.0.0" // PeerExchangeRequest represents a request for peer information type PeerExchangeRequest struct { Limit int `json:"limit"` } // PeerExchangeResponse represents a list of peers to exchange type PeerExchangeResponse struct { Peers []PeerInfo `json:"peers"` } // PeerInfo contains peer identity and addresses type PeerInfo struct { ID string `json:"id"` Addrs []string `json:"addrs"` } // Manager handles peer discovery operations without a DHT dependency. // Note: The constructor intentionally accepts a second parameter of type // interface{} to remain source-compatible with previous call sites that // passed a DHT instance. The value is ignored. type Manager struct { host host.Host logger *zap.Logger cancel context.CancelFunc } // Config contains discovery configuration type Config struct { DiscoveryInterval time.Duration MaxConnections int } // NewManager creates a new discovery manager. // // The second parameter is intentionally typed as interface{} so callers that // previously passed a DHT instance can continue to do so; the value is ignored. func NewManager(h host.Host, _ interface{}, logger *zap.Logger) *Manager { return &Manager{ host: h, logger: logger, } } // NewManagerSimple creates a manager with a cleaner signature (host + logger). func NewManagerSimple(h host.Host, logger *zap.Logger) *Manager { return NewManager(h, nil, logger) } // StartProtocolHandler registers the peer exchange protocol handler on the host func (d *Manager) StartProtocolHandler() { d.host.SetStreamHandler(PeerExchangeProtocol, d.handlePeerExchangeStream) d.logger.Debug("Registered peer exchange protocol handler") } // handlePeerExchangeStream handles incoming peer exchange requests func (d *Manager) handlePeerExchangeStream(s network.Stream) { defer s.Close() // Read request var req PeerExchangeRequest decoder := json.NewDecoder(s) if err := decoder.Decode(&req); err != nil { d.logger.Debug("Failed to decode peer exchange request", zap.Error(err)) return } // Get local peer list peers := d.host.Peerstore().Peers() if req.Limit <= 0 { req.Limit = 10 // Default limit } if req.Limit > len(peers) { req.Limit = len(peers) } // Build response with peer information resp := PeerExchangeResponse{Peers: make([]PeerInfo, 0, req.Limit)} added := 0 for _, pid := range peers { if added >= req.Limit { break } // Skip self if pid == d.host.ID() { continue } addrs := d.host.Peerstore().Addrs(pid) if len(addrs) == 0 { continue } // Convert addresses to strings addrStrs := make([]string, len(addrs)) for i, addr := range addrs { addrStrs[i] = addr.String() } resp.Peers = append(resp.Peers, PeerInfo{ ID: pid.String(), Addrs: addrStrs, }) added++ } // Send response encoder := json.NewEncoder(s) if err := encoder.Encode(&resp); err != nil { d.logger.Debug("Failed to encode peer exchange response", zap.Error(err)) return } d.logger.Debug("Sent peer exchange response", zap.Int("peer_count", len(resp.Peers))) } // Start begins periodic peer discovery func (d *Manager) Start(config Config) error { ctx, cancel := context.WithCancel(context.Background()) d.cancel = cancel go func() { // Do initial discovery immediately d.discoverPeers(ctx, config) // Continue with periodic discovery ticker := time.NewTicker(config.DiscoveryInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: d.discoverPeers(ctx, config) } } }() return nil } // Stop stops peer discovery func (d *Manager) Stop() { if d.cancel != nil { d.cancel() } } // discoverPeers discovers and connects to new peers using non-DHT strategies: // - Peerstore entries (bootstrap peers added to peerstore by the caller) // - Peer exchange: query currently connected peers' peerstore entries func (d *Manager) discoverPeers(ctx context.Context, config Config) { connectedPeers := d.host.Network().Peers() initialCount := len(connectedPeers) d.logger.Debug("Starting peer discovery", zap.Int("current_peers", initialCount)) newConnections := 0 // Strategy 1: Try to connect to peers learned from the host's peerstore newConnections += d.discoverViaPeerstore(ctx, config.MaxConnections-newConnections) // Strategy 2: Ask connected peers about their connections (peer exchange) if newConnections < config.MaxConnections { newConnections += d.discoverViaPeerExchange(ctx, config.MaxConnections-newConnections) } finalPeerCount := len(d.host.Network().Peers()) if newConnections > 0 || finalPeerCount != initialCount { d.logger.Debug("Peer discovery completed", zap.Int("new_connections", newConnections), zap.Int("initial_peers", initialCount), zap.Int("final_peers", finalPeerCount)) } } // discoverViaPeerstore attempts to connect to peers found in the host's peerstore. // This is useful for bootstrap peers that have been pre-populated into the peerstore. func (d *Manager) discoverViaPeerstore(ctx context.Context, maxConnections int) int { if maxConnections <= 0 { return 0 } connected := 0 // Iterate over peerstore known peers peers := d.host.Peerstore().Peers() d.logger.Debug("Peerstore contains peers", zap.Int("count", len(peers))) for _, pid := range peers { if connected >= maxConnections { break } // Skip self if pid == d.host.ID() { continue } // Skip already connected peers if d.host.Network().Connectedness(pid) != network.NotConnected { continue } // Try to connect if err := d.connectToPeer(ctx, pid); err == nil { connected++ } } return connected } // discoverViaPeerExchange asks currently connected peers for addresses of other peers // by using an active peer exchange protocol. func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections int) int { if maxConnections <= 0 { return 0 } connected := 0 connectedPeers := d.host.Network().Peers() if len(connectedPeers) == 0 { return 0 } d.logger.Debug("Starting peer exchange with connected peers", zap.Int("num_peers", len(connectedPeers))) for _, peerID := range connectedPeers { if connected >= maxConnections { break } // Request peer list from this peer peers := d.requestPeersFromPeer(ctx, peerID, maxConnections-connected) if len(peers) == 0 { continue } d.logger.Debug("Received peer list from peer", zap.String("from_peer", peerID.String()[:8]+"..."), zap.Int("peer_count", len(peers))) // Try to connect to discovered peers for _, peerInfo := range peers { if connected >= maxConnections { break } // Parse peer ID and addresses parsedID, err := peer.Decode(peerInfo.ID) if err != nil { d.logger.Debug("Failed to parse peer ID", zap.Error(err)) continue } // Skip self if parsedID == d.host.ID() { continue } // Skip if already connected if d.host.Network().Connectedness(parsedID) != network.NotConnected { continue } // Parse addresses addrs := make([]multiaddr.Multiaddr, 0, len(peerInfo.Addrs)) for _, addrStr := range peerInfo.Addrs { ma, err := multiaddr.NewMultiaddr(addrStr) if err != nil { d.logger.Debug("Failed to parse multiaddr", zap.Error(err)) continue } addrs = append(addrs, ma) } if len(addrs) == 0 { continue } // Add to peerstore d.host.Peerstore().AddAddrs(parsedID, addrs, time.Hour*24) // Try to connect connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) peerAddrInfo := peer.AddrInfo{ID: parsedID, Addrs: addrs} if err := d.host.Connect(connectCtx, peerAddrInfo); err != nil { cancel() d.logger.Debug("Failed to connect to discovered peer", zap.String("peer_id", parsedID.String()[:8]+"..."), zap.Error(err)) continue } cancel() d.logger.Info("Successfully connected to discovered peer", zap.String("peer_id", parsedID.String()[:8]+"..."), zap.String("discovered_from", peerID.String()[:8]+"...")) connected++ } } return connected } // requestPeersFromPeer asks a specific peer for its peer list func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limit int) []PeerInfo { // Open a stream to the peer stream, err := d.host.NewStream(ctx, peerID, PeerExchangeProtocol) if err != nil { d.logger.Debug("Failed to open peer exchange stream", zap.String("peer_id", peerID.String()[:8]+"..."), zap.Error(err)) return nil } defer stream.Close() // Send request req := PeerExchangeRequest{Limit: limit} encoder := json.NewEncoder(stream) if err := encoder.Encode(&req); err != nil { d.logger.Debug("Failed to send peer exchange request", zap.Error(err)) return nil } // Set read deadline if err := stream.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil { d.logger.Debug("Failed to set read deadline", zap.Error(err)) return nil } // Read response var resp PeerExchangeResponse decoder := json.NewDecoder(stream) if err := decoder.Decode(&resp); err != nil { if err != io.EOF { d.logger.Debug("Failed to read peer exchange response", zap.Error(err)) } return nil } return resp.Peers } // connectToPeer attempts to connect to a specific peer using its peerstore info. func (d *Manager) connectToPeer(ctx context.Context, peerID peer.ID) error { peerInfo := d.host.Peerstore().PeerInfo(peerID) if len(peerInfo.Addrs) == 0 { return errors.New("no addresses for peer") } // Attempt connection if err := d.host.Connect(ctx, peerInfo); err != nil { d.logger.Debug("Failed to connect to peer", zap.String("peer_id", peerID.String()[:8]+"..."), zap.Error(err)) return err } d.logger.Debug("Successfully connected to peer", zap.String("peer_id", peerID.String()[:8]+"...")) return nil }