network/pkg/discovery/discovery.go
anonpenguin23 3f63194c22
Update node configuration and enhance peer discovery protocol
- Changed the configuration file for run-node3 to use node3.yaml.
- Modified select_data_dir function to require a hasConfigFile parameter and added error handling for missing configuration.
- Updated main function to pass the config path to select_data_dir.
- Introduced a peer exchange protocol in the discovery package, allowing nodes to request and exchange peer information.
- Refactored peer discovery logic in the node package to utilize the new discovery manager for active peer exchange.
- Cleaned up unused code related to previous peer discovery methods.
2025-10-22 07:13:47 +03:00

389 lines
9.9 KiB
Go

package discovery
import (
"context"
"encoding/json"
"errors"
"io"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
)
// PeerExchangeProtocol is the libp2p protocol ID under which the peer
// exchange stream handler is registered and peer exchange streams are opened.
const PeerExchangeProtocol = "/debros/peer-exchange/1.0.0"
// PeerExchangeRequest is the JSON message sent over a peer exchange stream to
// ask the remote side for peers it knows about.
type PeerExchangeRequest struct {
	// Limit is the maximum number of peers the requester wants back. A value
	// <= 0 leaves the choice to the responder, which applies its own default.
	Limit int `json:"limit"`
}
// PeerExchangeResponse is the JSON reply to a PeerExchangeRequest.
type PeerExchangeResponse struct {
	// Peers holds the peers the responder is sharing with the requester.
	Peers []PeerInfo `json:"peers"`
}
// PeerInfo is the wire representation of a single peer: its identity plus the
// addresses it may be reached at, all encoded as strings.
type PeerInfo struct {
	// ID is the string form of the peer ID.
	ID string `json:"id"`
	// Addrs are the peer's multiaddrs in their string form.
	Addrs []string `json:"addrs"`
}
// Manager runs peer discovery for a libp2p host without depending on a DHT.
//
// Note: the NewManager constructor intentionally accepts a second parameter
// of type interface{} to remain source-compatible with previous call sites
// that passed a DHT instance. The value is ignored.
type Manager struct {
	// host is the libp2p host whose peerstore and connections are used.
	host host.Host
	// logger receives all discovery log output.
	logger *zap.Logger
	// cancel stops the background discovery loop; it is set by Start and
	// invoked by Stop, and is nil until Start has been called.
	cancel context.CancelFunc
}
// Config holds the tunables for the periodic discovery loop.
type Config struct {
	// DiscoveryInterval is the period between discovery rounds.
	DiscoveryInterval time.Duration
	// MaxConnections is the budget of new connections a single discovery
	// round may establish.
	MaxConnections int
}
// NewManager creates a new discovery manager bound to host h.
//
// The second parameter is intentionally typed as interface{} so callers that
// previously passed a DHT instance can continue to do so; the value is ignored.
//
// A nil logger is replaced with a no-op logger so that the manager's logging
// calls never dereference a nil *zap.Logger.
func NewManager(h host.Host, _ interface{}, logger *zap.Logger) *Manager {
	if logger == nil {
		logger = zap.NewNop()
	}
	return &Manager{
		host:   h,
		logger: logger,
	}
}
// NewManagerSimple is the two-argument form of NewManager: it builds a Manager
// from a host and a logger, passing nil for the ignored second parameter.
func NewManagerSimple(h host.Host, logger *zap.Logger) *Manager {
	manager := NewManager(h, nil, logger)
	return manager
}
// StartProtocolHandler installs handlePeerExchangeStream on the host as the
// stream handler for PeerExchangeProtocol.
func (d *Manager) StartProtocolHandler() {
	handler := d.handlePeerExchangeStream
	d.host.SetStreamHandler(PeerExchangeProtocol, handler)
	d.logger.Debug("Registered peer exchange protocol handler")
}
// handlePeerExchangeStream serves one incoming peer exchange request.
//
// It decodes a JSON PeerExchangeRequest from the stream, collects up to
// req.Limit peers from the local peerstore (skipping this host and peers with
// no known addresses; a limit <= 0 defaults to 10) and writes them back as a
// JSON PeerExchangeResponse.
//
// The remote side is untrusted, so the exchange is bounded: a deadline is
// placed on the stream so a silent peer cannot pin this goroutine, and the
// request is read through a size limit. When the exchange fails the stream is
// reset so the remote side is told to give up as well.
func (d *Manager) handlePeerExchangeStream(s network.Stream) {
	const (
		exchangeTimeout = 10 * time.Second
		maxRequestBytes = 4 << 10 // a request is a tiny JSON object
		defaultLimit    = 10
	)

	defer s.Close()

	// Best effort: some stream implementations do not support deadlines, and
	// each handler runs on its own stream, so carry on without one.
	if err := s.SetDeadline(time.Now().Add(exchangeTimeout)); err != nil {
		d.logger.Debug("Failed to set peer exchange stream deadline", zap.Error(err))
	}

	// Read request
	var req PeerExchangeRequest
	if err := json.NewDecoder(io.LimitReader(s, maxRequestBytes)).Decode(&req); err != nil {
		d.logger.Debug("Failed to decode peer exchange request", zap.Error(err))
		_ = s.Reset()
		return
	}

	// Clamp the requested limit to [0, number of known peers].
	peers := d.host.Peerstore().Peers()
	limit := req.Limit
	if limit <= 0 {
		limit = defaultLimit
	}
	if limit > len(peers) {
		limit = len(peers)
	}

	// Build response with peer information. The slice is always non-nil so
	// an empty result is encoded as [] rather than null.
	self := d.host.ID()
	resp := PeerExchangeResponse{Peers: make([]PeerInfo, 0, limit)}
	for _, pid := range peers {
		if len(resp.Peers) >= limit {
			break
		}
		// Skip self
		if pid == self {
			continue
		}
		addrs := d.host.Peerstore().Addrs(pid)
		if len(addrs) == 0 {
			continue
		}
		// Convert addresses to strings
		addrStrs := make([]string, len(addrs))
		for i, addr := range addrs {
			addrStrs[i] = addr.String()
		}
		resp.Peers = append(resp.Peers, PeerInfo{
			ID:    pid.String(),
			Addrs: addrStrs,
		})
	}

	// Send response
	if err := json.NewEncoder(s).Encode(&resp); err != nil {
		d.logger.Debug("Failed to encode peer exchange response", zap.Error(err))
		_ = s.Reset()
		return
	}
	d.logger.Debug("Sent peer exchange response",
		zap.Int("peer_count", len(resp.Peers)))
}
// Start launches the background discovery loop.
//
// One discovery round runs immediately and then one more every
// config.DiscoveryInterval until Stop is called.
//
// It returns an error, without starting anything, when
// config.DiscoveryInterval is not positive: time.NewTicker panics on such an
// interval, and that panic would otherwise occur inside the background
// goroutine and take the whole process down.
//
// If a loop from an earlier Start call is still registered it is cancelled
// first, so at most one discovery loop runs per Manager.
func (d *Manager) Start(config Config) error {
	if config.DiscoveryInterval <= 0 {
		return errors.New("discovery interval must be positive")
	}

	// Replace, rather than leak, a loop started by a previous call.
	if d.cancel != nil {
		d.cancel()
	}

	ctx, cancel := context.WithCancel(context.Background())
	d.cancel = cancel

	go func() {
		// Do initial discovery immediately
		d.discoverPeers(ctx, config)

		// Continue with periodic discovery
		ticker := time.NewTicker(config.DiscoveryInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				d.discoverPeers(ctx, config)
			}
		}
	}()
	return nil
}
// Stop cancels the context handed to the discovery loop by Start. It does
// nothing when Start has not been called.
func (d *Manager) Stop() {
	stop := d.cancel
	if stop == nil {
		return
	}
	stop()
}
// discoverPeers runs a single discovery round using two non-DHT strategies,
// sharing one budget of config.MaxConnections new connections:
//   - Peerstore entries (bootstrap peers added to the peerstore by the caller)
//   - Peer exchange: ask currently connected peers for the peers they know
//
// Peer exchange only runs when the peerstore pass left budget unused. A
// summary is logged when anything changed during the round.
func (d *Manager) discoverPeers(ctx context.Context, config Config) {
	before := len(d.host.Network().Peers())
	d.logger.Debug("Starting peer discovery",
		zap.Int("current_peers", before))

	// Strategy 1: peers already present in the host's peerstore.
	added := d.discoverViaPeerstore(ctx, config.MaxConnections)

	// Strategy 2: peer exchange, limited to whatever budget is left.
	if remaining := config.MaxConnections - added; remaining > 0 {
		added += d.discoverViaPeerExchange(ctx, remaining)
	}

	after := len(d.host.Network().Peers())
	if added == 0 && after == before {
		return
	}
	d.logger.Debug("Peer discovery completed",
		zap.Int("new_connections", added),
		zap.Int("initial_peers", before),
		zap.Int("final_peers", after))
}
// discoverViaPeerstore attempts to connect to peers found in the host's
// peerstore. This is useful for bootstrap peers that have been pre-populated
// into the peerstore.
//
// This host itself and peers whose connectedness is anything other than
// NotConnected are skipped. The function returns the number of connections
// established, which never exceeds maxConnections; a maxConnections <= 0
// returns 0 without touching the peerstore. Iteration stops as soon as ctx is
// cancelled so that Stop does not have to wait for every remaining peerstore
// entry to be attempted.
func (d *Manager) discoverViaPeerstore(ctx context.Context, maxConnections int) int {
	if maxConnections <= 0 {
		return 0
	}

	self := d.host.ID()
	peers := d.host.Peerstore().Peers()
	d.logger.Debug("Peerstore contains peers", zap.Int("count", len(peers)))

	connected := 0
	for _, pid := range peers {
		if connected >= maxConnections || ctx.Err() != nil {
			break
		}
		// Skip self
		if pid == self {
			continue
		}
		// Skip already connected peers
		if d.host.Network().Connectedness(pid) != network.NotConnected {
			continue
		}
		// Try to connect; failures are logged by connectToPeer.
		if err := d.connectToPeer(ctx, pid); err == nil {
			connected++
		}
	}
	return connected
}
// discoverViaPeerExchange asks each currently connected peer for peers it
// knows about (via requestPeersFromPeer) and tries to connect to the ones this
// host is not yet connected to.
//
// For every advertised peer the ID and multiaddrs are parsed (entries that
// fail to parse are logged and skipped), the usable addresses are stored in
// the peerstore with a 24h TTL, and a connection is attempted with a 15s
// timeout. The function returns the number of connections established, which
// never exceeds maxConnections.
//
// Peer IDs in a response are supplied by the remote side and can be shorter
// than a regular ID, so IDs are shortened for logging with a length-checked
// helper; slicing with a fixed [:8] would panic on such an ID. Both loops
// also stop once ctx is cancelled.
func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections int) int {
	if maxConnections <= 0 {
		return 0
	}

	connectedPeers := d.host.Network().Peers()
	if len(connectedPeers) == 0 {
		return 0
	}
	d.logger.Debug("Starting peer exchange with connected peers",
		zap.Int("num_peers", len(connectedPeers)))

	// shortID abbreviates a peer ID for logs without assuming a minimum length.
	shortID := func(id peer.ID) string {
		s := id.String()
		if len(s) > 8 {
			s = s[:8]
		}
		return s + "..."
	}

	self := d.host.ID()
	connected := 0
	for _, peerID := range connectedPeers {
		if connected >= maxConnections || ctx.Err() != nil {
			break
		}

		// Request peer list from this peer
		peers := d.requestPeersFromPeer(ctx, peerID, maxConnections-connected)
		if len(peers) == 0 {
			continue
		}
		d.logger.Debug("Received peer list from peer",
			zap.String("from_peer", shortID(peerID)),
			zap.Int("peer_count", len(peers)))

		// Try to connect to discovered peers
		for _, peerInfo := range peers {
			if connected >= maxConnections || ctx.Err() != nil {
				break
			}

			// Parse peer ID
			parsedID, err := peer.Decode(peerInfo.ID)
			if err != nil {
				d.logger.Debug("Failed to parse peer ID", zap.Error(err))
				continue
			}
			// Skip self
			if parsedID == self {
				continue
			}
			// Skip if already connected
			if d.host.Network().Connectedness(parsedID) != network.NotConnected {
				continue
			}

			// Parse addresses, keeping only the ones that are valid.
			addrs := make([]multiaddr.Multiaddr, 0, len(peerInfo.Addrs))
			for _, addrStr := range peerInfo.Addrs {
				ma, err := multiaddr.NewMultiaddr(addrStr)
				if err != nil {
					d.logger.Debug("Failed to parse multiaddr", zap.Error(err))
					continue
				}
				addrs = append(addrs, ma)
			}
			if len(addrs) == 0 {
				continue
			}

			// Add to peerstore
			d.host.Peerstore().AddAddrs(parsedID, addrs, time.Hour*24)

			// Try to connect; release the timeout context right after the
			// attempt regardless of its outcome.
			connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
			err = d.host.Connect(connectCtx, peer.AddrInfo{ID: parsedID, Addrs: addrs})
			cancel()
			if err != nil {
				d.logger.Debug("Failed to connect to discovered peer",
					zap.String("peer_id", shortID(parsedID)),
					zap.Error(err))
				continue
			}

			d.logger.Info("Successfully connected to discovered peer",
				zap.String("peer_id", shortID(parsedID)),
				zap.String("discovered_from", shortID(peerID)))
			connected++
		}
	}
	return connected
}
// requestPeersFromPeer asks a specific peer for its peer list over a new
// PeerExchangeProtocol stream and returns the peers it reports, or nil when
// the exchange fails for any reason (failures are logged at debug level).
//
// The remote side is untrusted, so the exchange is bounded in three ways:
//   - a 10s deadline covers both sending the request and reading the reply;
//     if no deadline can be set the exchange is abandoned, because an
//     unbounded read here would stall the caller's discovery round;
//   - the reply is read through a size limit;
//   - when limit > 0, at most limit entries of the reply are returned, even
//     if the remote side sent more than it was asked for.
//
// On failure the stream is reset so the remote side stops as well.
func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limit int) []PeerInfo {
	const (
		exchangeTimeout  = 10 * time.Second
		maxResponseBytes = 1 << 20
	)

	// Open a stream to the peer
	stream, err := d.host.NewStream(ctx, peerID, PeerExchangeProtocol)
	if err != nil {
		// Abbreviate the ID without assuming it is at least 8 bytes long.
		shortID := peerID.String()
		if len(shortID) > 8 {
			shortID = shortID[:8]
		}
		d.logger.Debug("Failed to open peer exchange stream",
			zap.String("peer_id", shortID+"..."),
			zap.Error(err))
		return nil
	}
	defer stream.Close()

	// Bound the whole exchange (write and read) before doing any I/O.
	if err := stream.SetDeadline(time.Now().Add(exchangeTimeout)); err != nil {
		d.logger.Debug("Failed to set stream deadline", zap.Error(err))
		_ = stream.Reset()
		return nil
	}

	// Send request
	req := PeerExchangeRequest{Limit: limit}
	if err := json.NewEncoder(stream).Encode(&req); err != nil {
		d.logger.Debug("Failed to send peer exchange request", zap.Error(err))
		_ = stream.Reset()
		return nil
	}

	// Read response
	var resp PeerExchangeResponse
	if err := json.NewDecoder(io.LimitReader(stream, maxResponseBytes)).Decode(&resp); err != nil {
		// A bare EOF means the peer closed without answering; stay quiet.
		if !errors.Is(err, io.EOF) {
			d.logger.Debug("Failed to read peer exchange response", zap.Error(err))
		}
		_ = stream.Reset()
		return nil
	}

	// Never hand back more entries than were requested.
	if limit > 0 && len(resp.Peers) > limit {
		resp.Peers = resp.Peers[:limit]
	}
	return resp.Peers
}
// connectToPeer attempts to connect to a specific peer using the addresses
// recorded for it in the host's peerstore.
//
// It returns an error when the peerstore holds no addresses for the peer or
// when the connection attempt fails; the outcome is also logged at debug
// level. The attempt is limited to 15s, the same bound used for peers learned
// through peer exchange, so one unreachable peer cannot stall a discovery
// round for the full default dial time.
//
// The peer ID is abbreviated for logging with a length check: the peerstore
// can contain IDs learned from remote peers that are shorter than 8 bytes,
// for which a fixed [:8] slice would panic.
func (d *Manager) connectToPeer(ctx context.Context, peerID peer.ID) error {
	peerInfo := d.host.Peerstore().PeerInfo(peerID)
	if len(peerInfo.Addrs) == 0 {
		return errors.New("no addresses for peer")
	}

	shortID := peerID.String()
	if len(shortID) > 8 {
		shortID = shortID[:8]
	}
	shortID += "..."

	// Attempt connection
	connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
	defer cancel()
	if err := d.host.Connect(connectCtx, peerInfo); err != nil {
		d.logger.Debug("Failed to connect to peer",
			zap.String("peer_id", shortID),
			zap.Error(err))
		return err
	}

	d.logger.Debug("Successfully connected to peer",
		zap.String("peer_id", shortID))
	return nil
}