diff --git a/pkg/client/client.go b/pkg/client/client.go index 1ce16e2..4f46c90 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -18,7 +18,6 @@ import ( libp2ppubsub "github.com/libp2p/go-libp2p-pubsub" "git.debros.io/DeBros/network/pkg/anyoneproxy" - "git.debros.io/DeBros/network/pkg/discovery" "git.debros.io/DeBros/network/pkg/pubsub" "git.debros.io/DeBros/network/pkg/storage" ) @@ -38,9 +37,6 @@ type Client struct { network *NetworkInfoImpl pubsub *pubSubBridge - // Managers - discoveryMgr *discovery.Manager - // State connected bool startTime time.Time @@ -211,20 +207,11 @@ func (c *Client) Connect() error { } } - // Initialize discovery manager (discovery.NewManager accepts a second parameter for compatibility; - // the value is ignored by the new implementation) - c.discoveryMgr = discovery.NewManager(c.host, nil, c.logger) + // Client is a lightweight P2P participant - no discovery needed + // We only connect to known bootstrap peers and let nodes handle discovery + c.logger.Debug("Client configured as lightweight P2P participant (no discovery)") - // Start peer discovery - discoveryConfig := discovery.Config{ - DiscoveryInterval: 5 * time.Second, // More frequent discovery - MaxConnections: 10, // Allow more connections - } - if err := c.discoveryMgr.Start(discoveryConfig); err != nil { - c.logger.Warn("Failed to start peer discovery", zap.Error(err)) - } - - // Start connection monitoring + // Start minimal connection monitoring c.startConnectionMonitoring() c.connected = true @@ -243,11 +230,6 @@ func (c *Client) Disconnect() error { return nil } - // Stop peer discovery - if c.discoveryMgr != nil { - c.discoveryMgr.Stop() - } - // Close pubsub adapter if c.pubsub != nil && c.pubsub.adapter != nil { if err := c.pubsub.adapter.Close(); err != nil { diff --git a/pkg/client/connect_bootstrap.go b/pkg/client/connect_bootstrap.go index 9f7d1f1..7307ad4 100644 --- a/pkg/client/connect_bootstrap.go +++ b/pkg/client/connect_bootstrap.go @@ -19,39 +19,28 @@ func (c *Client) connectToBootstrap(ctx context.Context, addr string) error { // Try to extract peer info if it's a full multiaddr with peer ID peerInfo, err := peer.AddrInfoFromP2pAddr(ma) if err != nil { - // If there's no peer ID, try to discover the peer at this address - return c.connectToAddress(ctx, ma) - } - - // Avoid dialing ourselves: if the bootstrap address resolves to our own peer ID, skip. - c.logger.Debug(string(peerInfo.ID)) - c.logger.Debug(string(c.host.ID())) - if c.host != nil && peerInfo.ID == c.host.ID() { - c.logger.Debug("Skipping bootstrap address because it resolves to self", - zap.String("addr", addr), - zap.String("peer", peerInfo.ID.String())) + // If there's no peer ID, we can't connect + c.logger.Warn("Bootstrap address missing peer ID, skipping", + zap.String("addr", addr)) return nil } + // Avoid dialing ourselves: if the bootstrap address resolves to our own peer ID, skip. + if c.host != nil && peerInfo.ID == c.host.ID() { + c.logger.Debug("Skipping bootstrap address because it resolves to self", + zap.String("addr", addr), + zap.String("peer_id", peerInfo.ID.String())) + return nil + } + + // Attempt connection if err := c.host.Connect(ctx, *peerInfo); err != nil { return fmt.Errorf("failed to connect to peer: %w", err) } c.logger.Debug("Connected to bootstrap peer", - zap.String("peer", peerInfo.ID.String()), + zap.String("peer_id", peerInfo.ID.String()), zap.String("addr", addr)) return nil } - -// connectToAddress attempts to discover and connect to a peer at the given address -func (c *Client) connectToAddress(ctx context.Context, ma multiaddr.Multiaddr) error { - // For the simple case, we'll just warn and continue - // In a production environment, you'd implement proper peer discovery - - c.logger.Warn("No peer ID provided in address, skipping bootstrap connection", - zap.String("addr", ma.String()), - zap.String("suggestion", "Use full multiaddr with peer ID like: /ip4/127.0.0.1/tcp/4001/p2p/")) - - return nil // Don't fail - let the client continue without bootstrap -} diff --git a/pkg/client/defaults.go b/pkg/client/defaults.go index c669064..05b990f 100644 --- a/pkg/client/defaults.go +++ b/pkg/client/defaults.go @@ -1,6 +1,7 @@ package client import ( + "os" "strconv" "strings" @@ -9,45 +10,51 @@ import ( ) // DefaultBootstrapPeers returns the library's default bootstrap peer multiaddrs. +// These can be overridden by environment variables or config. func DefaultBootstrapPeers() []string { - var cfg *config.Config - return cfg.Discovery.BootstrapPeers -} - -// truthy reports if s is a common truthy string. -func truthy(s string) bool { - switch s { - case "1", "true", "TRUE", "True", "yes", "YES", "on", "ON": - return true - default: - return false + // Check environment variable first + if envPeers := os.Getenv("DEBROS_BOOTSTRAP_PEERS"); envPeers != "" { + return splitCSVOrSpace(envPeers) } + + // Return defaults from config package + defaultCfg := config.DefaultConfig() + return defaultCfg.Discovery.BootstrapPeers } -// DefaultDatabaseEndpoints returns default DB HTTP endpoints derived from default bootstrap peers. -// Port defaults to RQLite HTTP 5001, or RQLITE_PORT if set. +// DefaultDatabaseEndpoints returns default DB HTTP endpoints. +// These can be overridden by environment variables or config. func DefaultDatabaseEndpoints() []string { - var cfg *config.Config - peers := DefaultBootstrapPeers() - port := cfg.Database.RQLitePort - if len(peers) == 0 { - return []string{"http://localhost:" + strconv.Itoa(cfg.Database.RQLitePort)} + // Check environment variable first + if envNodes := os.Getenv("RQLITE_NODES"); envNodes != "" { + return normalizeEndpoints(splitCSVOrSpace(envNodes)) } - endpoints := make([]string, 0, len(peers)) - for _, s := range peers { - ma, err := multiaddr.NewMultiaddr(s) - if err != nil { - continue + // Get default port from environment or use port from config + defaultCfg := config.DefaultConfig() + port := defaultCfg.Database.RQLitePort + if envPort := os.Getenv("RQLITE_PORT"); envPort != "" { + if p, err := strconv.Atoi(envPort); err == nil && p > 0 { + port = p } - endpoints = append(endpoints, endpointFromMultiaddr(ma, port)) } - out := dedupeStrings(endpoints) - if len(out) == 0 { - out = []string{"http://localhost:" + strconv.Itoa(port)} + // Try to derive from bootstrap peers if available + peers := DefaultBootstrapPeers() + if len(peers) > 0 { + endpoints := make([]string, 0, len(peers)) + for _, s := range peers { + ma, err := multiaddr.NewMultiaddr(s) + if err != nil { + continue + } + endpoints = append(endpoints, endpointFromMultiaddr(ma, port)) + } + return dedupeStrings(endpoints) } - return out + + // Fallback to localhost + return []string{"http://localhost:" + strconv.Itoa(port)} } // MapAddrsToDBEndpoints converts a set of peer multiaddrs to DB HTTP endpoints using dbPort. @@ -62,8 +69,10 @@ func MapAddrsToDBEndpoints(addrs []multiaddr.Multiaddr, dbPort int) []string { return dedupeStrings(eps) } +// endpointFromMultiaddr extracts host from multiaddr and creates HTTP endpoint func endpointFromMultiaddr(ma multiaddr.Multiaddr, port int) string { var host string + // Prefer DNS if present, then IP if v, err := ma.ValueForProtocol(multiaddr.P_DNS); err == nil && v != "" { host = v @@ -85,28 +94,88 @@ func endpointFromMultiaddr(ma multiaddr.Multiaddr, port int) string { } if host == "" { if v, err := ma.ValueForProtocol(multiaddr.P_IP6); err == nil && v != "" { - host = v + host = "[" + v + "]" // IPv6 needs brackets in URLs } } if host == "" { host = "localhost" } + return "http://" + host + ":" + strconv.Itoa(port) } -func dedupeStrings(in []string) []string { - m := make(map[string]struct{}, len(in)) +// normalizeEndpoints ensures each endpoint has an http scheme and a port (defaults to 5001) +func normalizeEndpoints(in []string) []string { out := make([]string, 0, len(in)) for _, s := range in { s = strings.TrimSpace(s) if s == "" { continue } - if _, ok := m[s]; ok { - continue + + // Prepend scheme if missing + if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") { + s = "http://" + s } - m[s] = struct{}{} + + // Simple check for port (doesn't handle all cases but good enough) + if !strings.Contains(s, ":5001") && !strings.Contains(s, ":500") && !strings.Contains(s, ":501") { + // Check if there's already a port after the host + parts := strings.Split(s, "://") + if len(parts) == 2 { + hostPart := parts[1] + // Count colons to detect port (simple heuristic) + colonCount := strings.Count(hostPart, ":") + if colonCount == 0 || (strings.Contains(hostPart, "[") && colonCount == 1) { + // No port found, add default + s = s + ":5001" + } + } + } + out = append(out, s) } return out } + +// dedupeStrings removes duplicate strings from slice +func dedupeStrings(in []string) []string { + if len(in) == 0 { + return in + } + + seen := make(map[string]struct{}, len(in)) + out := make([]string, 0, len(in)) + + for _, s := range in { + s = strings.TrimSpace(s) + if s == "" { + continue + } + if _, ok := seen[s]; ok { + continue + } + seen[s] = struct{}{} + out = append(out, s) + } + + return out +} + +// splitCSVOrSpace splits a string by commas or spaces +func splitCSVOrSpace(s string) []string { + // Replace commas with spaces, then split on spaces + s = strings.ReplaceAll(s, ",", " ") + fields := strings.Fields(s) + return fields +} + +// truthy reports if s is a common truthy string +func truthy(s string) bool { + switch strings.ToLower(strings.TrimSpace(s)) { + case "1", "true", "yes", "on": + return true + default: + return false + } +} diff --git a/pkg/client/implementations.go b/pkg/client/implementations.go index 2ee5e2b..f6a5763 100644 --- a/pkg/client/implementations.go +++ b/pkg/client/implementations.go @@ -3,7 +3,6 @@ package client import ( "context" "fmt" - "net/url" "strings" "sync" "time" @@ -187,30 +186,7 @@ func (d *DatabaseClientImpl) getRQLiteNodes() []string { return DefaultDatabaseEndpoints() } -// normalizeEndpoints ensures each endpoint has an http scheme and a port (defaults to 5001) -func normalizeEndpoints(in []string) []string { - out := make([]string, 0, len(in)) - for _, s := range in { - s = strings.TrimSpace(s) - if s == "" { - continue - } - // Prepend scheme if missing so url.Parse handles host:port - if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") { - s = "http://" + s - } - u, err := url.Parse(s) - if err != nil || u.Host == "" { - continue - } - // Ensure port present - if h := u.Host; !hasPort(h) { - u.Host = u.Host + ":5001" - } - out = append(out, u.String()) - } - return out -} +// normalizeEndpoints is now imported from defaults.go func hasPort(hostport string) bool { // cheap check for :port suffix (IPv6 with brackets handled by url.Parse earlier) @@ -226,13 +202,6 @@ func hasPort(hostport string) bool { return false } -func splitCSVOrSpace(s string) []string { - // replace commas with spaces, then split on spaces - s = strings.ReplaceAll(s, ",", " ") - fields := strings.Fields(s) - return fields -} - // connectToAvailableNode tries to connect to any available RQLite node func (d *DatabaseClientImpl) connectToAvailableNode() (*gorqlite.Connection, error) { // Get RQLite nodes from environment or use defaults diff --git a/pkg/client/monitoring.go b/pkg/client/monitoring.go index 209e168..f59c4bb 100644 --- a/pkg/client/monitoring.go +++ b/pkg/client/monitoring.go @@ -2,23 +2,65 @@ package client import ( "time" + + "go.uber.org/zap" ) -// startConnectionMonitoring monitors connection health and logs status +// startConnectionMonitoring starts minimal connection monitoring for the lightweight client. +// Unlike nodes which need extensive monitoring, clients only need basic health checks. func (c *Client) startConnectionMonitoring() { go func() { - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(60 * time.Second) // Less frequent than nodes (60s vs 30s) defer ticker.Stop() - c.logger.Debug("Connection monitoring started") + var lastPeerCount int + firstCheck := true + for range ticker.C { if !c.isConnected() { c.logger.Debug("Connection monitoring stopped: client disconnected") return } - // Only touch network to detect issues; avoid noisy logs - _ = c.host.Network().Peers() + if c.host == nil { + return + } + + // Get current peer count + peers := c.host.Network().Peers() + currentPeerCount := len(peers) + + // Only log if peer count changed or on first check + if firstCheck || currentPeerCount != lastPeerCount { + if currentPeerCount == 0 { + c.logger.Warn("Client has no connected peers", + zap.String("client_id", c.host.ID().String())) + } else if currentPeerCount < lastPeerCount { + c.logger.Info("Client lost peers", + zap.Int("current_peers", currentPeerCount), + zap.Int("previous_peers", lastPeerCount)) + } else if currentPeerCount > lastPeerCount && !firstCheck { + c.logger.Debug("Client gained peers", + zap.Int("current_peers", currentPeerCount), + zap.Int("previous_peers", lastPeerCount)) + } + + lastPeerCount = currentPeerCount + firstCheck = false + } + + // Log detailed peer info at debug level occasionally (every 5 minutes) + if time.Now().Unix()%300 == 0 && currentPeerCount > 0 { + peerIDs := make([]string, 0, currentPeerCount) + for _, p := range peers { + peerIDs = append(peerIDs, p.String()) + } + c.logger.Debug("Client peer status", + zap.Int("peer_count", currentPeerCount), + zap.Strings("peer_ids", peerIDs)) + } } }() + + c.logger.Debug("Lightweight connection monitoring started") }