Remove peer discovery from client; improve bootstrap and monitoring

This commit is contained in:
anonpenguin 2025-08-14 14:51:58 +03:00
parent 138644aebd
commit 170b06b213
5 changed files with 168 additions and 117 deletions

View File

@ -18,7 +18,6 @@ import (
libp2ppubsub "github.com/libp2p/go-libp2p-pubsub" libp2ppubsub "github.com/libp2p/go-libp2p-pubsub"
"git.debros.io/DeBros/network/pkg/anyoneproxy" "git.debros.io/DeBros/network/pkg/anyoneproxy"
"git.debros.io/DeBros/network/pkg/discovery"
"git.debros.io/DeBros/network/pkg/pubsub" "git.debros.io/DeBros/network/pkg/pubsub"
"git.debros.io/DeBros/network/pkg/storage" "git.debros.io/DeBros/network/pkg/storage"
) )
@ -38,9 +37,6 @@ type Client struct {
network *NetworkInfoImpl network *NetworkInfoImpl
pubsub *pubSubBridge pubsub *pubSubBridge
// Managers
discoveryMgr *discovery.Manager
// State // State
connected bool connected bool
startTime time.Time startTime time.Time
@ -211,20 +207,11 @@ func (c *Client) Connect() error {
} }
} }
// Initialize discovery manager (discovery.NewManager accepts a second parameter for compatibility; // Client is a lightweight P2P participant - no discovery needed
// the value is ignored by the new implementation) // We only connect to known bootstrap peers and let nodes handle discovery
c.discoveryMgr = discovery.NewManager(c.host, nil, c.logger) c.logger.Debug("Client configured as lightweight P2P participant (no discovery)")
// Start peer discovery // Start minimal connection monitoring
discoveryConfig := discovery.Config{
DiscoveryInterval: 5 * time.Second, // More frequent discovery
MaxConnections: 10, // Allow more connections
}
if err := c.discoveryMgr.Start(discoveryConfig); err != nil {
c.logger.Warn("Failed to start peer discovery", zap.Error(err))
}
// Start connection monitoring
c.startConnectionMonitoring() c.startConnectionMonitoring()
c.connected = true c.connected = true
@ -243,11 +230,6 @@ func (c *Client) Disconnect() error {
return nil return nil
} }
// Stop peer discovery
if c.discoveryMgr != nil {
c.discoveryMgr.Stop()
}
// Close pubsub adapter // Close pubsub adapter
if c.pubsub != nil && c.pubsub.adapter != nil { if c.pubsub != nil && c.pubsub.adapter != nil {
if err := c.pubsub.adapter.Close(); err != nil { if err := c.pubsub.adapter.Close(); err != nil {

View File

@ -19,39 +19,28 @@ func (c *Client) connectToBootstrap(ctx context.Context, addr string) error {
// Try to extract peer info if it's a full multiaddr with peer ID // Try to extract peer info if it's a full multiaddr with peer ID
peerInfo, err := peer.AddrInfoFromP2pAddr(ma) peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil { if err != nil {
// If there's no peer ID, try to discover the peer at this address // If there's no peer ID, we can't connect
return c.connectToAddress(ctx, ma) c.logger.Warn("Bootstrap address missing peer ID, skipping",
} zap.String("addr", addr))
// Avoid dialing ourselves: if the bootstrap address resolves to our own peer ID, skip.
c.logger.Debug(string(peerInfo.ID))
c.logger.Debug(string(c.host.ID()))
if c.host != nil && peerInfo.ID == c.host.ID() {
c.logger.Debug("Skipping bootstrap address because it resolves to self",
zap.String("addr", addr),
zap.String("peer", peerInfo.ID.String()))
return nil return nil
} }
// Avoid dialing ourselves: if the bootstrap address resolves to our own peer ID, skip.
if c.host != nil && peerInfo.ID == c.host.ID() {
c.logger.Debug("Skipping bootstrap address because it resolves to self",
zap.String("addr", addr),
zap.String("peer_id", peerInfo.ID.String()))
return nil
}
// Attempt connection
if err := c.host.Connect(ctx, *peerInfo); err != nil { if err := c.host.Connect(ctx, *peerInfo); err != nil {
return fmt.Errorf("failed to connect to peer: %w", err) return fmt.Errorf("failed to connect to peer: %w", err)
} }
c.logger.Debug("Connected to bootstrap peer", c.logger.Debug("Connected to bootstrap peer",
zap.String("peer", peerInfo.ID.String()), zap.String("peer_id", peerInfo.ID.String()),
zap.String("addr", addr)) zap.String("addr", addr))
return nil return nil
} }
// connectToAddress attempts to discover and connect to a peer at the given address
func (c *Client) connectToAddress(ctx context.Context, ma multiaddr.Multiaddr) error {
// For the simple case, we'll just warn and continue
// In a production environment, you'd implement proper peer discovery
c.logger.Warn("No peer ID provided in address, skipping bootstrap connection",
zap.String("addr", ma.String()),
zap.String("suggestion", "Use full multiaddr with peer ID like: /ip4/127.0.0.1/tcp/4001/p2p/<peer-id>"))
return nil // Don't fail - let the client continue without bootstrap
}

View File

@ -1,6 +1,7 @@
package client package client
import ( import (
"os"
"strconv" "strconv"
"strings" "strings"
@ -9,45 +10,51 @@ import (
) )
// DefaultBootstrapPeers returns the library's default bootstrap peer multiaddrs. // DefaultBootstrapPeers returns the library's default bootstrap peer multiaddrs.
// These can be overridden by environment variables or config.
func DefaultBootstrapPeers() []string { func DefaultBootstrapPeers() []string {
var cfg *config.Config // Check environment variable first
return cfg.Discovery.BootstrapPeers if envPeers := os.Getenv("DEBROS_BOOTSTRAP_PEERS"); envPeers != "" {
} return splitCSVOrSpace(envPeers)
// truthy reports if s is a common truthy string.
func truthy(s string) bool {
switch s {
case "1", "true", "TRUE", "True", "yes", "YES", "on", "ON":
return true
default:
return false
} }
// Return defaults from config package
defaultCfg := config.DefaultConfig()
return defaultCfg.Discovery.BootstrapPeers
} }
// DefaultDatabaseEndpoints returns default DB HTTP endpoints derived from default bootstrap peers. // DefaultDatabaseEndpoints returns default DB HTTP endpoints.
// Port defaults to RQLite HTTP 5001, or RQLITE_PORT if set. // These can be overridden by environment variables or config.
func DefaultDatabaseEndpoints() []string { func DefaultDatabaseEndpoints() []string {
var cfg *config.Config // Check environment variable first
peers := DefaultBootstrapPeers() if envNodes := os.Getenv("RQLITE_NODES"); envNodes != "" {
port := cfg.Database.RQLitePort return normalizeEndpoints(splitCSVOrSpace(envNodes))
if len(peers) == 0 {
return []string{"http://localhost:" + strconv.Itoa(cfg.Database.RQLitePort)}
} }
endpoints := make([]string, 0, len(peers)) // Get default port from environment or use port from config
for _, s := range peers { defaultCfg := config.DefaultConfig()
ma, err := multiaddr.NewMultiaddr(s) port := defaultCfg.Database.RQLitePort
if err != nil { if envPort := os.Getenv("RQLITE_PORT"); envPort != "" {
continue if p, err := strconv.Atoi(envPort); err == nil && p > 0 {
port = p
} }
endpoints = append(endpoints, endpointFromMultiaddr(ma, port))
} }
out := dedupeStrings(endpoints) // Try to derive from bootstrap peers if available
if len(out) == 0 { peers := DefaultBootstrapPeers()
out = []string{"http://localhost:" + strconv.Itoa(port)} if len(peers) > 0 {
endpoints := make([]string, 0, len(peers))
for _, s := range peers {
ma, err := multiaddr.NewMultiaddr(s)
if err != nil {
continue
}
endpoints = append(endpoints, endpointFromMultiaddr(ma, port))
}
return dedupeStrings(endpoints)
} }
return out
// Fallback to localhost
return []string{"http://localhost:" + strconv.Itoa(port)}
} }
// MapAddrsToDBEndpoints converts a set of peer multiaddrs to DB HTTP endpoints using dbPort. // MapAddrsToDBEndpoints converts a set of peer multiaddrs to DB HTTP endpoints using dbPort.
@ -62,8 +69,10 @@ func MapAddrsToDBEndpoints(addrs []multiaddr.Multiaddr, dbPort int) []string {
return dedupeStrings(eps) return dedupeStrings(eps)
} }
// endpointFromMultiaddr extracts host from multiaddr and creates HTTP endpoint
func endpointFromMultiaddr(ma multiaddr.Multiaddr, port int) string { func endpointFromMultiaddr(ma multiaddr.Multiaddr, port int) string {
var host string var host string
// Prefer DNS if present, then IP // Prefer DNS if present, then IP
if v, err := ma.ValueForProtocol(multiaddr.P_DNS); err == nil && v != "" { if v, err := ma.ValueForProtocol(multiaddr.P_DNS); err == nil && v != "" {
host = v host = v
@ -85,28 +94,88 @@ func endpointFromMultiaddr(ma multiaddr.Multiaddr, port int) string {
} }
if host == "" { if host == "" {
if v, err := ma.ValueForProtocol(multiaddr.P_IP6); err == nil && v != "" { if v, err := ma.ValueForProtocol(multiaddr.P_IP6); err == nil && v != "" {
host = v host = "[" + v + "]" // IPv6 needs brackets in URLs
} }
} }
if host == "" { if host == "" {
host = "localhost" host = "localhost"
} }
return "http://" + host + ":" + strconv.Itoa(port) return "http://" + host + ":" + strconv.Itoa(port)
} }
func dedupeStrings(in []string) []string { // normalizeEndpoints ensures each endpoint has an http scheme and a port (defaults to 5001)
m := make(map[string]struct{}, len(in)) func normalizeEndpoints(in []string) []string {
out := make([]string, 0, len(in)) out := make([]string, 0, len(in))
for _, s := range in { for _, s := range in {
s = strings.TrimSpace(s) s = strings.TrimSpace(s)
if s == "" { if s == "" {
continue continue
} }
if _, ok := m[s]; ok {
continue // Prepend scheme if missing
if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") {
s = "http://" + s
} }
m[s] = struct{}{}
// Simple check for port (doesn't handle all cases but good enough)
if !strings.Contains(s, ":5001") && !strings.Contains(s, ":500") && !strings.Contains(s, ":501") {
// Check if there's already a port after the host
parts := strings.Split(s, "://")
if len(parts) == 2 {
hostPart := parts[1]
// Count colons to detect port (simple heuristic)
colonCount := strings.Count(hostPart, ":")
if colonCount == 0 || (strings.Contains(hostPart, "[") && colonCount == 1) {
// No port found, add default
s = s + ":5001"
}
}
}
out = append(out, s) out = append(out, s)
} }
return out return out
} }
// dedupeStrings removes duplicate strings from slice
func dedupeStrings(in []string) []string {
if len(in) == 0 {
return in
}
seen := make(map[string]struct{}, len(in))
out := make([]string, 0, len(in))
for _, s := range in {
s = strings.TrimSpace(s)
if s == "" {
continue
}
if _, ok := seen[s]; ok {
continue
}
seen[s] = struct{}{}
out = append(out, s)
}
return out
}
// splitCSVOrSpace splits a string by commas or spaces
func splitCSVOrSpace(s string) []string {
// Replace commas with spaces, then split on spaces
s = strings.ReplaceAll(s, ",", " ")
fields := strings.Fields(s)
return fields
}
// truthy reports if s is a common truthy string
func truthy(s string) bool {
switch strings.ToLower(strings.TrimSpace(s)) {
case "1", "true", "yes", "on":
return true
default:
return false
}
}

View File

@ -3,7 +3,6 @@ package client
import ( import (
"context" "context"
"fmt" "fmt"
"net/url"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -187,30 +186,7 @@ func (d *DatabaseClientImpl) getRQLiteNodes() []string {
return DefaultDatabaseEndpoints() return DefaultDatabaseEndpoints()
} }
// normalizeEndpoints ensures each endpoint has an http scheme and a port (defaults to 5001) // normalizeEndpoints is now imported from defaults.go
func normalizeEndpoints(in []string) []string {
out := make([]string, 0, len(in))
for _, s := range in {
s = strings.TrimSpace(s)
if s == "" {
continue
}
// Prepend scheme if missing so url.Parse handles host:port
if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") {
s = "http://" + s
}
u, err := url.Parse(s)
if err != nil || u.Host == "" {
continue
}
// Ensure port present
if h := u.Host; !hasPort(h) {
u.Host = u.Host + ":5001"
}
out = append(out, u.String())
}
return out
}
func hasPort(hostport string) bool { func hasPort(hostport string) bool {
// cheap check for :port suffix (IPv6 with brackets handled by url.Parse earlier) // cheap check for :port suffix (IPv6 with brackets handled by url.Parse earlier)
@ -226,13 +202,6 @@ func hasPort(hostport string) bool {
return false return false
} }
func splitCSVOrSpace(s string) []string {
// replace commas with spaces, then split on spaces
s = strings.ReplaceAll(s, ",", " ")
fields := strings.Fields(s)
return fields
}
// connectToAvailableNode tries to connect to any available RQLite node // connectToAvailableNode tries to connect to any available RQLite node
func (d *DatabaseClientImpl) connectToAvailableNode() (*gorqlite.Connection, error) { func (d *DatabaseClientImpl) connectToAvailableNode() (*gorqlite.Connection, error) {
// Get RQLite nodes from environment or use defaults // Get RQLite nodes from environment or use defaults

View File

@ -2,23 +2,65 @@ package client
import ( import (
"time" "time"
"go.uber.org/zap"
) )
// startConnectionMonitoring monitors connection health and logs status // startConnectionMonitoring starts minimal connection monitoring for the lightweight client.
// Unlike nodes which need extensive monitoring, clients only need basic health checks.
func (c *Client) startConnectionMonitoring() { func (c *Client) startConnectionMonitoring() {
go func() { go func() {
ticker := time.NewTicker(30 * time.Second) ticker := time.NewTicker(60 * time.Second) // Less frequent than nodes (60s vs 30s)
defer ticker.Stop() defer ticker.Stop()
c.logger.Debug("Connection monitoring started") var lastPeerCount int
firstCheck := true
for range ticker.C { for range ticker.C {
if !c.isConnected() { if !c.isConnected() {
c.logger.Debug("Connection monitoring stopped: client disconnected") c.logger.Debug("Connection monitoring stopped: client disconnected")
return return
} }
// Only touch network to detect issues; avoid noisy logs if c.host == nil {
_ = c.host.Network().Peers() return
}
// Get current peer count
peers := c.host.Network().Peers()
currentPeerCount := len(peers)
// Only log if peer count changed or on first check
if firstCheck || currentPeerCount != lastPeerCount {
if currentPeerCount == 0 {
c.logger.Warn("Client has no connected peers",
zap.String("client_id", c.host.ID().String()))
} else if currentPeerCount < lastPeerCount {
c.logger.Info("Client lost peers",
zap.Int("current_peers", currentPeerCount),
zap.Int("previous_peers", lastPeerCount))
} else if currentPeerCount > lastPeerCount && !firstCheck {
c.logger.Debug("Client gained peers",
zap.Int("current_peers", currentPeerCount),
zap.Int("previous_peers", lastPeerCount))
}
lastPeerCount = currentPeerCount
firstCheck = false
}
// Log detailed peer info at debug level occasionally (every 5 minutes)
if time.Now().Unix()%300 == 0 && currentPeerCount > 0 {
peerIDs := make([]string, 0, currentPeerCount)
for _, p := range peers {
peerIDs = append(peerIDs, p.String())
}
c.logger.Debug("Client peer status",
zap.Int("peer_count", currentPeerCount),
zap.Strings("peer_ids", peerIDs))
}
} }
}() }()
c.logger.Debug("Lightweight connection monitoring started")
} }