From 6301ed9182b9821bd9cd67d74a4e3413dc3587ca Mon Sep 17 00:00:00 2001 From: anonpenguin Date: Sat, 9 Aug 2025 12:00:35 +0300 Subject: [PATCH] refactor: split network client code into focused modules and extract config mapping The changes reorganize the network client code by splitting it into focused modules for better maintainability, including --- Makefile | 2 +- cmd/node/configmap.go | 134 ++++++++++++ cmd/node/main.go | 115 +--------- pkg/client/anchat_connectivity.go | 121 ++++++++++ pkg/client/client.go | 340 +---------------------------- pkg/client/connect_bootstrap.go | 48 ++++ pkg/client/discovery_aggressive.go | 42 ++++ pkg/client/mdns_discovery.go | 38 ++++ pkg/client/monitoring.go | 20 ++ pkg/client/pubsub_bridge.go | 32 +++ 10 files changed, 450 insertions(+), 442 deletions(-) create mode 100644 cmd/node/configmap.go create mode 100644 pkg/client/anchat_connectivity.go create mode 100644 pkg/client/connect_bootstrap.go create mode 100644 pkg/client/discovery_aggressive.go create mode 100644 pkg/client/mdns_discovery.go create mode 100644 pkg/client/monitoring.go create mode 100644 pkg/client/pubsub_bridge.go diff --git a/Makefile b/Makefile index ca428c9..294a9d8 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ build: deps @echo "Building network executables..." @mkdir -p bin - go build -o bin/node cmd/node/main.go + go build -o bin/node ./cmd/node go build -o bin/network-cli cmd/cli/main.go @echo "Build complete!" diff --git a/cmd/node/configmap.go b/cmd/node/configmap.go new file mode 100644 index 0000000..2f2b454 --- /dev/null +++ b/cmd/node/configmap.go @@ -0,0 +1,134 @@ +package main + +import ( + "fmt" + "strings" + + "git.debros.io/DeBros/network/pkg/config" + "git.debros.io/DeBros/network/pkg/constants" + "git.debros.io/DeBros/network/pkg/logging" +) + +// NodeFlagValues holds parsed CLI flag values in a structured form. +type NodeFlagValues struct { + DataDir string + NodeID string + Bootstrap string + Role string + P2PPort int + RqlHTTP int + RqlRaft int + Advertise string +} + +// MapFlagsAndEnvToConfig applies environment overrides and CLI flags to cfg. +// Precedence: flags > env > defaults. Behavior mirrors previous inline logic in main.go. +// Returns the derived RQLite Raft join address for non-bootstrap nodes (empty for bootstrap nodes). +func MapFlagsAndEnvToConfig(cfg *config.Config, fv NodeFlagValues, isBootstrap bool, logger *logging.StandardLogger) string { + // Apply environment variable overrides first so that flags can override them after + config.ApplyEnvOverrides(cfg) + + // Determine data directory if not provided + if fv.DataDir == "" { + if isBootstrap { + fv.DataDir = "./data/bootstrap" + } else { + if fv.NodeID != "" { + fv.DataDir = fmt.Sprintf("./data/node-%s", fv.NodeID) + } else { + fv.DataDir = "./data/node" + } + } + } + + // Node basics + cfg.Node.DataDir = fv.DataDir + cfg.Node.ListenAddresses = []string{ + fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", fv.P2PPort), + fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", fv.P2PPort), + } + + // Database port settings + cfg.Database.RQLitePort = fv.RqlHTTP + cfg.Database.RQLiteRaftPort = fv.RqlRaft + cfg.Database.AdvertiseMode = strings.ToLower(fv.Advertise) + logger.Printf("RQLite advertise mode: %s", cfg.Database.AdvertiseMode) + + // Bootstrap-specific vs regular-node logic + if isBootstrap { + bootstrapPeers := constants.GetBootstrapPeers() + isSecondaryBootstrap := false + if len(bootstrapPeers) > 1 { + for i := 1; i < len(bootstrapPeers); i++ { + host := parseHostFromMultiaddr(bootstrapPeers[i]) + if host != "" && isLocalIP(host) { + isSecondaryBootstrap = true + break + } + } + } + + if isSecondaryBootstrap { + primaryBootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0]) + cfg.Database.RQLiteJoinAddress = fmt.Sprintf("%s:%d", primaryBootstrapHost, 7001) + logger.Printf("Secondary bootstrap node - joining primary bootstrap (raft) at: %s", cfg.Database.RQLiteJoinAddress) + } else { + cfg.Database.RQLiteJoinAddress = "" + logger.Printf("Primary bootstrap node - starting new RQLite cluster") + } + + return "" + } + + // Regular node: compute bootstrap peers and join address + var rqliteJoinAddr string + if fv.Bootstrap != "" { + cfg.Discovery.BootstrapPeers = []string{fv.Bootstrap} + bootstrapHost := parseHostFromMultiaddr(fv.Bootstrap) + if bootstrapHost != "" { + if (bootstrapHost == "127.0.0.1" || strings.EqualFold(bootstrapHost, "localhost")) && cfg.Database.AdvertiseMode != "localhost" { + if extIP, err := getPreferredLocalIP(); err == nil && extIP != "" { + logger.Printf("Translating localhost bootstrap to external IP %s for RQLite join", extIP) + bootstrapHost = extIP + } else { + logger.Printf("Warning: Failed to resolve external IP, keeping localhost for RQLite join") + } + } + rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001) + logger.Printf("Using extracted bootstrap host %s for RQLite Raft join (port 7001)", bootstrapHost) + } else { + logger.Printf("Warning: Could not extract host from bootstrap peer %s, using localhost fallback", fv.Bootstrap) + rqliteJoinAddr = fmt.Sprintf("localhost:%d", 7001) + } + logger.Printf("Using command line bootstrap peer: %s", fv.Bootstrap) + } else { + bootstrapPeers := cfg.Discovery.BootstrapPeers + if len(bootstrapPeers) == 0 { + bootstrapPeers = constants.GetBootstrapPeers() + } + if len(bootstrapPeers) > 0 { + cfg.Discovery.BootstrapPeers = bootstrapPeers + bootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0]) + if bootstrapHost != "" { + rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001) + logger.Printf("Using extracted bootstrap host %s for RQLite Raft join", bootstrapHost) + } else { + logger.Printf("Warning: Could not extract host from bootstrap peer %s", bootstrapPeers[0]) + rqliteJoinAddr = "localhost:7001" + } + logger.Printf("Using environment bootstrap peers: %v", bootstrapPeers) + } else { + logger.Printf("Warning: No bootstrap peers configured") + rqliteJoinAddr = "localhost:7001" + logger.Printf("Using localhost fallback for RQLite Raft join") + } + + logger.Printf("=== NETWORK DIAGNOSTICS ===") + logger.Printf("Target RQLite Raft join address: %s", rqliteJoinAddr) + runNetworkDiagnostics(rqliteJoinAddr, logger) + } + + cfg.Database.RQLiteJoinAddress = rqliteJoinAddr + logger.Printf("Regular node - joining RQLite cluster (raft) at: %s", cfg.Database.RQLiteJoinAddress) + return rqliteJoinAddr +} diff --git a/cmd/node/main.go b/cmd/node/main.go index 46faa39..e5bd9f4 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -88,110 +88,17 @@ func main() { logger.Printf("Starting regular node...") } - // Apply environment variable overrides before applying CLI flags so that - // precedence is: flags > env > defaults - config.ApplyEnvOverrides(cfg) - - // Set basic configuration - cfg.Node.DataDir = *dataDir - cfg.Node.ListenAddresses = []string{ - fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port), - fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port), - } - - // RQLite ports (overridable for local multi-node on one host) - cfg.Database.RQLitePort = *rqlHTTP - cfg.Database.RQLiteRaftPort = *rqlRaft - cfg.Database.AdvertiseMode = strings.ToLower(*advertise) - logger.Printf("RQLite advertise mode: %s", cfg.Database.AdvertiseMode) - - if isBootstrap { - // Check if this is the primary bootstrap node (first in list) or secondary - bootstrapPeers := constants.GetBootstrapPeers() - isSecondaryBootstrap := false - if len(bootstrapPeers) > 1 { - // Check if this machine matches any bootstrap peer other than the first - for i := 1; i < len(bootstrapPeers); i++ { - host := parseHostFromMultiaddr(bootstrapPeers[i]) - if host != "" && isLocalIP(host) { - isSecondaryBootstrap = true - break - } - } - } - - if isSecondaryBootstrap { - // Secondary bootstrap nodes join the primary bootstrap Raft address (standardized to 7001) - primaryBootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0]) - cfg.Database.RQLiteJoinAddress = fmt.Sprintf("%s:%d", primaryBootstrapHost, 7001) - logger.Printf("Secondary bootstrap node - joining primary bootstrap (raft) at: %s", cfg.Database.RQLiteJoinAddress) - } else { - // Primary bootstrap node doesn't join anyone - it starts the cluster - cfg.Database.RQLiteJoinAddress = "" - logger.Printf("Primary bootstrap node - starting new RQLite cluster") - } - } else { - // Configure bootstrap peers for P2P discovery - var rqliteJoinAddr string // host:port for Raft join - if *bootstrap != "" { - // Use command line bootstrap if provided - cfg.Discovery.BootstrapPeers = []string{*bootstrap} - // Extract IP from bootstrap peer for RQLite - bootstrapHost := parseHostFromMultiaddr(*bootstrap) - if bootstrapHost != "" { - // Only translate localhost to external IP when not explicitly in localhost advertise mode - if (bootstrapHost == "127.0.0.1" || strings.EqualFold(bootstrapHost, "localhost")) && cfg.Database.AdvertiseMode != "localhost" { - if extIP, err := getPreferredLocalIP(); err == nil && extIP != "" { - logger.Printf("Translating localhost bootstrap to external IP %s for RQLite join", extIP) - bootstrapHost = extIP - } else { - logger.Printf("Warning: Failed to resolve external IP, keeping localhost for RQLite join") - } - } - // Regular nodes should join the bootstrap's RQLite Raft port (standardized to 7001) - rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001) - logger.Printf("Using extracted bootstrap host %s for RQLite Raft join (port 7001)", bootstrapHost) - } else { - logger.Printf("Warning: Could not extract host from bootstrap peer %s, using localhost fallback", *bootstrap) - rqliteJoinAddr = fmt.Sprintf("localhost:%d", 7001) // Use localhost raft fallback instead - } - logger.Printf("Using command line bootstrap peer: %s", *bootstrap) - } else { - // Use environment-configured bootstrap peers if provided; otherwise fallback to constants - bootstrapPeers := cfg.Discovery.BootstrapPeers - if len(bootstrapPeers) == 0 { - bootstrapPeers = constants.GetBootstrapPeers() - } - if len(bootstrapPeers) > 0 { - cfg.Discovery.BootstrapPeers = bootstrapPeers - // Use the first bootstrap peer for RQLite join - bootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0]) - if bootstrapHost != "" { - rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001) - logger.Printf("Using extracted bootstrap host %s for RQLite Raft join", bootstrapHost) - } else { - logger.Printf("Warning: Could not extract host from bootstrap peer %s", bootstrapPeers[0]) - // Try primary production server as fallback - rqliteJoinAddr = "localhost:7001" - } - logger.Printf("Using environment bootstrap peers: %v", bootstrapPeers) - } else { - logger.Printf("Warning: No bootstrap peers configured") - // Default to localhost when no peers configured - rqliteJoinAddr = "localhost:7001" - logger.Printf("Using localhost fallback for RQLite Raft join") - } - - // Log network connectivity diagnostics - logger.Printf("=== NETWORK DIAGNOSTICS ===") - logger.Printf("Target RQLite Raft join address: %s", rqliteJoinAddr) - runNetworkDiagnostics(rqliteJoinAddr, logger) - } - - // Regular nodes join the bootstrap node's RQLite cluster - cfg.Database.RQLiteJoinAddress = rqliteJoinAddr - logger.Printf("Regular node - joining RQLite cluster (raft) at: %s", cfg.Database.RQLiteJoinAddress) - } + // Centralized mapping from flags/env to config (flags > env > defaults) + _ = MapFlagsAndEnvToConfig(cfg, NodeFlagValues{ + DataDir: *dataDir, + NodeID: *nodeID, + Bootstrap: *bootstrap, + Role: *role, + P2PPort: port, + RqlHTTP: *rqlHTTP, + RqlRaft: *rqlRaft, + Advertise: *advertise, + }, isBootstrap, logger) logger.Printf("Data directory: %s", cfg.Node.DataDir) logger.Printf("Listen addresses: %v", cfg.Node.ListenAddresses) diff --git a/pkg/client/anchat_connectivity.go b/pkg/client/anchat_connectivity.go new file mode 100644 index 0000000..cd5658c --- /dev/null +++ b/pkg/client/anchat_connectivity.go @@ -0,0 +1,121 @@ +package client + +import ( + "context" + "time" + + "go.uber.org/zap" +) + +// ensureAnchatPeerConnectivity ensures Anchat clients can discover each other through bootstrap +func (c *Client) ensureAnchatPeerConnectivity() { + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for i := 0; i < 30; i++ { // Run for ~1 minute + <-ticker.C + + if !c.isConnected() { + return + } + + connectedPeers := c.host.Network().Peers() + + if c.dht != nil { + // Try to find peers through DHT routing table + routingPeers := c.dht.RoutingTable().ListPeers() + + for _, peerID := range routingPeers { + if peerID == c.host.ID() { + continue + } + + // Check already connected + alreadyConnected := false + for _, p := range connectedPeers { + if p == peerID { + alreadyConnected = true + break + } + } + + if !alreadyConnected { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + peerInfo := c.host.Peerstore().PeerInfo(peerID) + + if len(peerInfo.Addrs) == 0 { + if found, err := c.dht.FindPeer(ctx, peerID); err == nil { + peerInfo = found + c.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour*24) + } + } + + if len(peerInfo.Addrs) > 0 { + if err := c.host.Connect(ctx, peerInfo); err == nil { + c.logger.Info("Anchat discovered and connected to peer", + zap.String("peer", peerID.String()[:8]+"...")) + + if added, err := c.dht.RoutingTable().TryAddPeer(peerID, true, true); err == nil && added { + c.logger.Debug("Added new peer to DHT routing table", + zap.String("peer", peerID.String()[:8]+"...")) + } + + if c.libp2pPS != nil { + time.Sleep(100 * time.Millisecond) + _ = c.libp2pPS.ListPeers("") + } + } else { + c.logger.Debug("Failed to connect to discovered peer", + zap.String("peer", peerID.String()[:8]+"..."), + zap.Error(err)) + } + } + cancel() + } + } + + if len(routingPeers) == 0 { + for _, id := range connectedPeers { + if id != c.host.ID() { + if added, err := c.dht.RoutingTable().TryAddPeer(id, true, true); err == nil && added { + c.logger.Info("Force-added connected peer to DHT routing table", + zap.String("peer", id.String()[:8]+"...")) + } + } + } + c.dht.RefreshRoutingTable() + } + } + + // Reconnect to known peers not currently connected + allKnownPeers := c.host.Peerstore().Peers() + for _, id := range allKnownPeers { + if id == c.host.ID() { + continue + } + already := false + for _, p := range connectedPeers { + if p == id { already = true; break } + } + if !already { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + pi := c.host.Peerstore().PeerInfo(id) + if len(pi.Addrs) > 0 { + if err := c.host.Connect(ctx, pi); err == nil { + c.logger.Info("Anchat reconnected to known peer", + zap.String("peer", id.String()[:8]+"...")) + if c.libp2pPS != nil { time.Sleep(100 * time.Millisecond); _ = c.libp2pPS.ListPeers("") } + } + } + cancel() + } + } + + if i%5 == 0 && len(connectedPeers) > 0 { + c.logger.Info("Anchat peer discovery progress", + zap.Int("iteration", i+1), + zap.Int("connected_peers", len(connectedPeers)), + zap.Int("known_peers", len(allKnownPeers))) + } + } +} diff --git a/pkg/client/client.go b/pkg/client/client.go index cba6930..4e877ff 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -17,7 +17,6 @@ import ( dht "github.com/libp2p/go-libp2p-kad-dht" libp2ppubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/p2p/discovery/mdns" "git.debros.io/DeBros/network/pkg/discovery" "git.debros.io/DeBros/network/pkg/pubsub" @@ -49,31 +48,6 @@ type Client struct { mu sync.RWMutex } -// pubSubBridge bridges between our PubSubClient interface and the pubsub package -type pubSubBridge struct { - adapter *pubsub.ClientAdapter -} - -func (p *pubSubBridge) Subscribe(ctx context.Context, topic string, handler MessageHandler) error { - // Convert our MessageHandler to the pubsub package MessageHandler - pubsubHandler := func(topic string, data []byte) error { - return handler(topic, data) - } - return p.adapter.Subscribe(ctx, topic, pubsubHandler) -} - -func (p *pubSubBridge) Publish(ctx context.Context, topic string, data []byte) error { - return p.adapter.Publish(ctx, topic, data) -} - -func (p *pubSubBridge) Unsubscribe(ctx context.Context, topic string) error { - return p.adapter.Unsubscribe(ctx, topic) -} - -func (p *pubSubBridge) ListTopics(ctx context.Context) ([]string, error) { - return p.adapter.ListTopics(ctx) -} - // NewClient creates a new network client func NewClient(config *ClientConfig) (NetworkClient, error) { if config == nil { @@ -237,15 +211,8 @@ func (c *Client) Connect() error { c.logger.Warn("Failed to start peer discovery", zap.Error(err)) } - // For Anchat clients, ensure we connect to all other clients through bootstrap - if c.config.AppName == "anchat" { - // Start mDNS discovery for local network peer discovery - go c.startMDNSDiscovery() - go c.ensureAnchatPeerConnectivity() - } else { - // Start aggressive peer discovery for other apps - go c.startAggressivePeerDiscovery() - } + // Start generic aggressive peer discovery for all apps + go c.startAggressivePeerDiscovery() // Start connection monitoring c.startConnectionMonitoring() @@ -255,43 +222,7 @@ func (c *Client) Connect() error { return nil } -// connectToBootstrap connects to a bootstrap peer -func (c *Client) connectToBootstrap(ctx context.Context, addr string) error { - ma, err := multiaddr.NewMultiaddr(addr) - if err != nil { - return fmt.Errorf("invalid multiaddr: %w", err) - } - - // Try to extract peer info if it's a full multiaddr with peer ID - peerInfo, err := peer.AddrInfoFromP2pAddr(ma) - if err != nil { - // If there's no peer ID, try to discover the peer at this address - return c.connectToAddress(ctx, ma) - } - - if err := c.host.Connect(ctx, *peerInfo); err != nil { - return fmt.Errorf("failed to connect to peer: %w", err) - } - - c.logger.Debug("Connected to bootstrap peer", - zap.String("peer", peerInfo.ID.String()), - zap.String("addr", addr)) - - return nil -} - -// connectToAddress attempts to discover and connect to a peer at the given address -func (c *Client) connectToAddress(ctx context.Context, ma multiaddr.Multiaddr) error { - // For the simple case, we'll just warn and continue - // In a production environment, you'd implement proper peer discovery - // using mDNS, DHT, or other mechanisms - - c.logger.Warn("No peer ID provided in address, skipping bootstrap connection", - zap.String("addr", ma.String()), - zap.String("suggestion", "Use full multiaddr with peer ID like: /ip4/127.0.0.1/tcp/4001/p2p/")) - - return nil // Don't fail - let the client continue without bootstrap -} // Disconnect closes the connection to the network +// Disconnect closes the connection to the network func (c *Client) Disconnect() error { c.mu.Lock() defer c.mu.Unlock() @@ -372,268 +303,3 @@ func (c *Client) isConnected() bool { func (c *Client) getAppNamespace() string { return c.config.AppName } - -// startConnectionMonitoring monitors connection health and logs status -func (c *Client) startConnectionMonitoring() { - go func() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for range ticker.C { - if !c.isConnected() { - return - } - - // Remove connection status logging for cleaner output - // connectedPeers := c.host.Network().Peers() - // Only log if there are connection issues - _ = c.host.Network().Peers() - } - }() -} - -// ensureAnchatPeerConnectivity ensures Anchat clients can discover each other through bootstrap -func (c *Client) ensureAnchatPeerConnectivity() { - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - - for i := 0; i < 30; i++ { // Run for 1 minute - <-ticker.C - - if !c.isConnected() { - return - } - - // Get current connected peers - connectedPeers := c.host.Network().Peers() - - // For Anchat, we need to be very aggressive about finding other clients - // The key insight: we need to ask our connected peers (like bootstrap) for their peers - - if c.dht != nil { - // Try to find peers through DHT routing table - routingPeers := c.dht.RoutingTable().ListPeers() - - for _, peerID := range routingPeers { - if peerID == c.host.ID() { - continue - } - - // Check if we're already connected to this peer - alreadyConnected := false - for _, alreadyConnectedPeer := range connectedPeers { - if alreadyConnectedPeer == peerID { - alreadyConnected = true - break - } - } - - if !alreadyConnected { - // Try to connect to this peer - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - peerInfo := c.host.Peerstore().PeerInfo(peerID) - - // If we don't have addresses, try to find them through the DHT - if len(peerInfo.Addrs) == 0 { - if foundPeerInfo, err := c.dht.FindPeer(ctx, peerID); err == nil { - peerInfo = foundPeerInfo - // Add to peerstore for future use - c.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour*24) - } - } - - if len(peerInfo.Addrs) > 0 { - err := c.host.Connect(ctx, peerInfo) - if err == nil { - c.logger.Info("Anchat discovered and connected to peer", - zap.String("peer", peerID.String()[:8]+"...")) - - // Add newly connected peer to DHT routing table - if added, addErr := c.dht.RoutingTable().TryAddPeer(peerID, true, true); addErr == nil && added { - c.logger.Debug("Added new peer to DHT routing table", - zap.String("peer", peerID.String()[:8]+"...")) - } - - // Force pubsub to recognize the new peer and form mesh connections - if c.libp2pPS != nil { - // Wait a moment for connection to stabilize - time.Sleep(100 * time.Millisecond) - // List peers to trigger mesh formation - _ = c.libp2pPS.ListPeers("") - } - } else { - c.logger.Debug("Failed to connect to discovered peer", - zap.String("peer", peerID.String()[:8]+"..."), - zap.Error(err)) - } - } - cancel() - } - } - - // If DHT routing table is still empty, try to force populate it - if len(routingPeers) == 0 { - // Try to add all connected peers to DHT routing table - for _, connectedPeerID := range connectedPeers { - if connectedPeerID != c.host.ID() { - if added, err := c.dht.RoutingTable().TryAddPeer(connectedPeerID, true, true); err == nil && added { - c.logger.Info("Force-added connected peer to DHT routing table", - zap.String("peer", connectedPeerID.String()[:8]+"...")) - } - } - } - - // Force DHT refresh - c.dht.RefreshRoutingTable() - } - } - - // Also try to connect to any peers we might have in our peerstore but aren't connected to - allKnownPeers := c.host.Peerstore().Peers() - for _, knownPeerID := range allKnownPeers { - if knownPeerID == c.host.ID() { - continue - } - - // Check if we're already connected - alreadyConnected := false - for _, connectedPeer := range connectedPeers { - if connectedPeer == knownPeerID { - alreadyConnected = true - break - } - } - - if !alreadyConnected { - // Try to connect to this known peer - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - peerInfo := c.host.Peerstore().PeerInfo(knownPeerID) - if len(peerInfo.Addrs) > 0 { - err := c.host.Connect(ctx, peerInfo) - if err == nil { - c.logger.Info("Anchat reconnected to known peer", - zap.String("peer", knownPeerID.String()[:8]+"...")) - - // Force pubsub mesh formation - if c.libp2pPS != nil { - time.Sleep(100 * time.Millisecond) - _ = c.libp2pPS.ListPeers("") - } - } - } - cancel() - } - } - - // Log status every 5 iterations (10 seconds) - if i%5 == 0 && len(connectedPeers) > 0 { - c.logger.Info("Anchat peer discovery progress", - zap.Int("iteration", i+1), - zap.Int("connected_peers", len(connectedPeers)), - zap.Int("known_peers", len(allKnownPeers))) - } - } -} // startAggressivePeerDiscovery implements aggressive peer discovery for non-Anchat apps -func (c *Client) startAggressivePeerDiscovery() { - ticker := time.NewTicker(3 * time.Second) - defer ticker.Stop() - - for i := 0; i < 20; i++ { // Run for 1 minute - <-ticker.C - - if !c.isConnected() { - return - } - - // Get current connected peers - connectedPeers := c.host.Network().Peers() - - // Try to discover more peers through the DHT - if c.dht != nil { - // Get peers from the DHT routing table - routingPeers := c.dht.RoutingTable().ListPeers() - - for _, peerID := range routingPeers { - if peerID == c.host.ID() { - continue - } - - // Check if we're already connected - alreadyConnected := false - for _, connectedPeer := range connectedPeers { - if connectedPeer == peerID { - alreadyConnected = true - break - } - } - - if !alreadyConnected { - // Try to connect - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - peerInfo := c.host.Peerstore().PeerInfo(peerID) - if len(peerInfo.Addrs) > 0 { - err := c.host.Connect(ctx, peerInfo) - if err == nil { - c.logger.Debug("Connected to discovered peer", - zap.String("peer", peerID.String()[:8]+"...")) - } - } - cancel() - } - } - } - - // Log current status every 10 iterations (30 seconds) - if i%10 == 0 { - c.logger.Debug("Peer discovery status", - zap.Int("iteration", i+1), - zap.Int("connected_peers", len(connectedPeers))) - } - } -} - -// startMDNSDiscovery enables mDNS peer discovery for local network -func (c *Client) startMDNSDiscovery() { - // Setup mDNS discovery service for Anchat - mdnsService := mdns.NewMdnsService(c.host, "anchat-p2p", &discoveryNotifee{ - client: c, - logger: c.logger, - }) - - if err := mdnsService.Start(); err != nil { - c.logger.Warn("Failed to start mDNS discovery", zap.Error(err)) - return - } - - c.logger.Info("Started mDNS discovery for Anchat") -} - -// discoveryNotifee handles mDNS peer discovery notifications -type discoveryNotifee struct { - client *Client - logger *zap.Logger -} - -func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) { - n.logger.Info("mDNS discovered Anchat peer", - zap.String("peer", pi.ID.String()[:8]+"..."), - zap.Int("addrs", len(pi.Addrs))) - - // Connect to the discovered peer - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - if err := n.client.host.Connect(ctx, pi); err != nil { - n.logger.Debug("Failed to connect to mDNS discovered peer", - zap.String("peer", pi.ID.String()[:8]+"..."), - zap.Error(err)) - } else { - n.logger.Info("Successfully connected to mDNS discovered peer", - zap.String("peer", pi.ID.String()[:8]+"...")) - - // Force pubsub to recognize the new peer - if n.client.libp2pPS != nil { - _ = n.client.libp2pPS.ListPeers("") - } - } -} diff --git a/pkg/client/connect_bootstrap.go b/pkg/client/connect_bootstrap.go new file mode 100644 index 0000000..aae635d --- /dev/null +++ b/pkg/client/connect_bootstrap.go @@ -0,0 +1,48 @@ +package client + +import ( + "context" + "fmt" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "go.uber.org/zap" +) + +// connectToBootstrap connects to a bootstrap peer +func (c *Client) connectToBootstrap(ctx context.Context, addr string) error { + ma, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return fmt.Errorf("invalid multiaddr: %w", err) + } + + // Try to extract peer info if it's a full multiaddr with peer ID + peerInfo, err := peer.AddrInfoFromP2pAddr(ma) + if err != nil { + // If there's no peer ID, try to discover the peer at this address + return c.connectToAddress(ctx, ma) + } + + if err := c.host.Connect(ctx, *peerInfo); err != nil { + return fmt.Errorf("failed to connect to peer: %w", err) + } + + c.logger.Debug("Connected to bootstrap peer", + zap.String("peer", peerInfo.ID.String()), + zap.String("addr", addr)) + + return nil +} + +// connectToAddress attempts to discover and connect to a peer at the given address +func (c *Client) connectToAddress(ctx context.Context, ma multiaddr.Multiaddr) error { + // For the simple case, we'll just warn and continue + // In a production environment, you'd implement proper peer discovery + // using mDNS, DHT, or other mechanisms + + c.logger.Warn("No peer ID provided in address, skipping bootstrap connection", + zap.String("addr", ma.String()), + zap.String("suggestion", "Use full multiaddr with peer ID like: /ip4/127.0.0.1/tcp/4001/p2p/")) + + return nil // Don't fail - let the client continue without bootstrap +} diff --git a/pkg/client/discovery_aggressive.go b/pkg/client/discovery_aggressive.go new file mode 100644 index 0000000..23648b6 --- /dev/null +++ b/pkg/client/discovery_aggressive.go @@ -0,0 +1,42 @@ +package client + +import ( + "context" + "time" + + "go.uber.org/zap" +) + +// startAggressivePeerDiscovery implements aggressive peer discovery for non-Anchat apps +func (c *Client) startAggressivePeerDiscovery() { + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + + for i := 0; i < 20; i++ { // ~1 minute + <-ticker.C + if !c.isConnected() { return } + + connectedPeers := c.host.Network().Peers() + if c.dht != nil { + routingPeers := c.dht.RoutingTable().ListPeers() + for _, pid := range routingPeers { + if pid == c.host.ID() { continue } + already := false + for _, cp := range connectedPeers { if cp == pid { already = true; break } } + if !already { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + pi := c.host.Peerstore().PeerInfo(pid) + if len(pi.Addrs) > 0 { + if err := c.host.Connect(ctx, pi); err == nil { + c.logger.Debug("Connected to discovered peer", zap.String("peer", pid.String()[:8]+"...")) + } + } + cancel() + } + } + } + if i%10 == 0 { + c.logger.Debug("Peer discovery status", zap.Int("iteration", i+1), zap.Int("connected_peers", len(connectedPeers))) + } + } +} diff --git a/pkg/client/mdns_discovery.go b/pkg/client/mdns_discovery.go new file mode 100644 index 0000000..bf164f5 --- /dev/null +++ b/pkg/client/mdns_discovery.go @@ -0,0 +1,38 @@ +package client + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/discovery/mdns" + "go.uber.org/zap" +) + +// startMDNSDiscovery enables mDNS peer discovery for local network +func (c *Client) startMDNSDiscovery() { + mdnsService := mdns.NewMdnsService(c.host, "anchat-p2p", &discoveryNotifee{ client: c, logger: c.logger }) + if err := mdnsService.Start(); err != nil { + c.logger.Warn("Failed to start mDNS discovery", zap.Error(err)) + return + } + c.logger.Info("Started mDNS discovery for Anchat") +} + +// discoveryNotifee handles mDNS peer discovery notifications +type discoveryNotifee struct { + client *Client + logger *zap.Logger +} + +func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) { + n.logger.Info("mDNS discovered Anchat peer", zap.String("peer", pi.ID.String()[:8]+"..."), zap.Int("addrs", len(pi.Addrs))) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := n.client.host.Connect(ctx, pi); err != nil { + n.logger.Debug("Failed to connect to mDNS discovered peer", zap.String("peer", pi.ID.String()[:8]+"..."), zap.Error(err)) + } else { + n.logger.Info("Successfully connected to mDNS discovered peer", zap.String("peer", pi.ID.String()[:8]+"...")) + if n.client.libp2pPS != nil { _ = n.client.libp2pPS.ListPeers("") } + } +} diff --git a/pkg/client/monitoring.go b/pkg/client/monitoring.go new file mode 100644 index 0000000..46f749b --- /dev/null +++ b/pkg/client/monitoring.go @@ -0,0 +1,20 @@ +package client + +import "time" + +// startConnectionMonitoring monitors connection health and logs status +func (c *Client) startConnectionMonitoring() { + go func() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for range ticker.C { + if !c.isConnected() { + return + } + + // Only touch network to detect issues; avoid noisy logs + _ = c.host.Network().Peers() + } + }() +} diff --git a/pkg/client/pubsub_bridge.go b/pkg/client/pubsub_bridge.go new file mode 100644 index 0000000..095ddaa --- /dev/null +++ b/pkg/client/pubsub_bridge.go @@ -0,0 +1,32 @@ +package client + +import ( + "context" + + "git.debros.io/DeBros/network/pkg/pubsub" +) + +// pubSubBridge bridges between our PubSubClient interface and the pubsub package +type pubSubBridge struct { + adapter *pubsub.ClientAdapter +} + +func (p *pubSubBridge) Subscribe(ctx context.Context, topic string, handler MessageHandler) error { + // Convert our MessageHandler to the pubsub package MessageHandler + pubsubHandler := func(topic string, data []byte) error { + return handler(topic, data) + } + return p.adapter.Subscribe(ctx, topic, pubsubHandler) +} + +func (p *pubSubBridge) Publish(ctx context.Context, topic string, data []byte) error { + return p.adapter.Publish(ctx, topic, data) +} + +func (p *pubSubBridge) Unsubscribe(ctx context.Context, topic string) error { + return p.adapter.Unsubscribe(ctx, topic) +} + +func (p *pubSubBridge) ListTopics(ctx context.Context) ([]string, error) { + return p.adapter.ListTopics(ctx) +}