refactor: split network client code into focused modules and extract config mapping

The changes reorganize the network client code by splitting it into focused modules for better maintainability, including
This commit is contained in:
anonpenguin 2025-08-09 12:00:35 +03:00
parent e037773ece
commit 6301ed9182
10 changed files with 450 additions and 442 deletions

View File

@ -7,7 +7,7 @@
build: deps build: deps
@echo "Building network executables..." @echo "Building network executables..."
@mkdir -p bin @mkdir -p bin
go build -o bin/node cmd/node/main.go go build -o bin/node ./cmd/node
go build -o bin/network-cli cmd/cli/main.go go build -o bin/network-cli cmd/cli/main.go
@echo "Build complete!" @echo "Build complete!"

134
cmd/node/configmap.go Normal file
View File

@ -0,0 +1,134 @@
package main
import (
"fmt"
"strings"
"git.debros.io/DeBros/network/pkg/config"
"git.debros.io/DeBros/network/pkg/constants"
"git.debros.io/DeBros/network/pkg/logging"
)
// NodeFlagValues holds parsed CLI flag values in a structured form.
type NodeFlagValues struct {
DataDir string
NodeID string
Bootstrap string
Role string
P2PPort int
RqlHTTP int
RqlRaft int
Advertise string
}
// MapFlagsAndEnvToConfig applies environment overrides and CLI flags to cfg.
// Precedence: flags > env > defaults. Behavior mirrors previous inline logic in main.go.
// Returns the derived RQLite Raft join address for non-bootstrap nodes (empty for bootstrap nodes).
func MapFlagsAndEnvToConfig(cfg *config.Config, fv NodeFlagValues, isBootstrap bool, logger *logging.StandardLogger) string {
// Apply environment variable overrides first so that flags can override them after
config.ApplyEnvOverrides(cfg)
// Determine data directory if not provided
if fv.DataDir == "" {
if isBootstrap {
fv.DataDir = "./data/bootstrap"
} else {
if fv.NodeID != "" {
fv.DataDir = fmt.Sprintf("./data/node-%s", fv.NodeID)
} else {
fv.DataDir = "./data/node"
}
}
}
// Node basics
cfg.Node.DataDir = fv.DataDir
cfg.Node.ListenAddresses = []string{
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", fv.P2PPort),
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", fv.P2PPort),
}
// Database port settings
cfg.Database.RQLitePort = fv.RqlHTTP
cfg.Database.RQLiteRaftPort = fv.RqlRaft
cfg.Database.AdvertiseMode = strings.ToLower(fv.Advertise)
logger.Printf("RQLite advertise mode: %s", cfg.Database.AdvertiseMode)
// Bootstrap-specific vs regular-node logic
if isBootstrap {
bootstrapPeers := constants.GetBootstrapPeers()
isSecondaryBootstrap := false
if len(bootstrapPeers) > 1 {
for i := 1; i < len(bootstrapPeers); i++ {
host := parseHostFromMultiaddr(bootstrapPeers[i])
if host != "" && isLocalIP(host) {
isSecondaryBootstrap = true
break
}
}
}
if isSecondaryBootstrap {
primaryBootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0])
cfg.Database.RQLiteJoinAddress = fmt.Sprintf("%s:%d", primaryBootstrapHost, 7001)
logger.Printf("Secondary bootstrap node - joining primary bootstrap (raft) at: %s", cfg.Database.RQLiteJoinAddress)
} else {
cfg.Database.RQLiteJoinAddress = ""
logger.Printf("Primary bootstrap node - starting new RQLite cluster")
}
return ""
}
// Regular node: compute bootstrap peers and join address
var rqliteJoinAddr string
if fv.Bootstrap != "" {
cfg.Discovery.BootstrapPeers = []string{fv.Bootstrap}
bootstrapHost := parseHostFromMultiaddr(fv.Bootstrap)
if bootstrapHost != "" {
if (bootstrapHost == "127.0.0.1" || strings.EqualFold(bootstrapHost, "localhost")) && cfg.Database.AdvertiseMode != "localhost" {
if extIP, err := getPreferredLocalIP(); err == nil && extIP != "" {
logger.Printf("Translating localhost bootstrap to external IP %s for RQLite join", extIP)
bootstrapHost = extIP
} else {
logger.Printf("Warning: Failed to resolve external IP, keeping localhost for RQLite join")
}
}
rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001)
logger.Printf("Using extracted bootstrap host %s for RQLite Raft join (port 7001)", bootstrapHost)
} else {
logger.Printf("Warning: Could not extract host from bootstrap peer %s, using localhost fallback", fv.Bootstrap)
rqliteJoinAddr = fmt.Sprintf("localhost:%d", 7001)
}
logger.Printf("Using command line bootstrap peer: %s", fv.Bootstrap)
} else {
bootstrapPeers := cfg.Discovery.BootstrapPeers
if len(bootstrapPeers) == 0 {
bootstrapPeers = constants.GetBootstrapPeers()
}
if len(bootstrapPeers) > 0 {
cfg.Discovery.BootstrapPeers = bootstrapPeers
bootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0])
if bootstrapHost != "" {
rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001)
logger.Printf("Using extracted bootstrap host %s for RQLite Raft join", bootstrapHost)
} else {
logger.Printf("Warning: Could not extract host from bootstrap peer %s", bootstrapPeers[0])
rqliteJoinAddr = "localhost:7001"
}
logger.Printf("Using environment bootstrap peers: %v", bootstrapPeers)
} else {
logger.Printf("Warning: No bootstrap peers configured")
rqliteJoinAddr = "localhost:7001"
logger.Printf("Using localhost fallback for RQLite Raft join")
}
logger.Printf("=== NETWORK DIAGNOSTICS ===")
logger.Printf("Target RQLite Raft join address: %s", rqliteJoinAddr)
runNetworkDiagnostics(rqliteJoinAddr, logger)
}
cfg.Database.RQLiteJoinAddress = rqliteJoinAddr
logger.Printf("Regular node - joining RQLite cluster (raft) at: %s", cfg.Database.RQLiteJoinAddress)
return rqliteJoinAddr
}

View File

@ -88,110 +88,17 @@ func main() {
logger.Printf("Starting regular node...") logger.Printf("Starting regular node...")
} }
// Apply environment variable overrides before applying CLI flags so that // Centralized mapping from flags/env to config (flags > env > defaults)
// precedence is: flags > env > defaults _ = MapFlagsAndEnvToConfig(cfg, NodeFlagValues{
config.ApplyEnvOverrides(cfg) DataDir: *dataDir,
NodeID: *nodeID,
// Set basic configuration Bootstrap: *bootstrap,
cfg.Node.DataDir = *dataDir Role: *role,
cfg.Node.ListenAddresses = []string{ P2PPort: port,
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port), RqlHTTP: *rqlHTTP,
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port), RqlRaft: *rqlRaft,
} Advertise: *advertise,
}, isBootstrap, logger)
// RQLite ports (overridable for local multi-node on one host)
cfg.Database.RQLitePort = *rqlHTTP
cfg.Database.RQLiteRaftPort = *rqlRaft
cfg.Database.AdvertiseMode = strings.ToLower(*advertise)
logger.Printf("RQLite advertise mode: %s", cfg.Database.AdvertiseMode)
if isBootstrap {
// Check if this is the primary bootstrap node (first in list) or secondary
bootstrapPeers := constants.GetBootstrapPeers()
isSecondaryBootstrap := false
if len(bootstrapPeers) > 1 {
// Check if this machine matches any bootstrap peer other than the first
for i := 1; i < len(bootstrapPeers); i++ {
host := parseHostFromMultiaddr(bootstrapPeers[i])
if host != "" && isLocalIP(host) {
isSecondaryBootstrap = true
break
}
}
}
if isSecondaryBootstrap {
// Secondary bootstrap nodes join the primary bootstrap Raft address (standardized to 7001)
primaryBootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0])
cfg.Database.RQLiteJoinAddress = fmt.Sprintf("%s:%d", primaryBootstrapHost, 7001)
logger.Printf("Secondary bootstrap node - joining primary bootstrap (raft) at: %s", cfg.Database.RQLiteJoinAddress)
} else {
// Primary bootstrap node doesn't join anyone - it starts the cluster
cfg.Database.RQLiteJoinAddress = ""
logger.Printf("Primary bootstrap node - starting new RQLite cluster")
}
} else {
// Configure bootstrap peers for P2P discovery
var rqliteJoinAddr string // host:port for Raft join
if *bootstrap != "" {
// Use command line bootstrap if provided
cfg.Discovery.BootstrapPeers = []string{*bootstrap}
// Extract IP from bootstrap peer for RQLite
bootstrapHost := parseHostFromMultiaddr(*bootstrap)
if bootstrapHost != "" {
// Only translate localhost to external IP when not explicitly in localhost advertise mode
if (bootstrapHost == "127.0.0.1" || strings.EqualFold(bootstrapHost, "localhost")) && cfg.Database.AdvertiseMode != "localhost" {
if extIP, err := getPreferredLocalIP(); err == nil && extIP != "" {
logger.Printf("Translating localhost bootstrap to external IP %s for RQLite join", extIP)
bootstrapHost = extIP
} else {
logger.Printf("Warning: Failed to resolve external IP, keeping localhost for RQLite join")
}
}
// Regular nodes should join the bootstrap's RQLite Raft port (standardized to 7001)
rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001)
logger.Printf("Using extracted bootstrap host %s for RQLite Raft join (port 7001)", bootstrapHost)
} else {
logger.Printf("Warning: Could not extract host from bootstrap peer %s, using localhost fallback", *bootstrap)
rqliteJoinAddr = fmt.Sprintf("localhost:%d", 7001) // Use localhost raft fallback instead
}
logger.Printf("Using command line bootstrap peer: %s", *bootstrap)
} else {
// Use environment-configured bootstrap peers if provided; otherwise fallback to constants
bootstrapPeers := cfg.Discovery.BootstrapPeers
if len(bootstrapPeers) == 0 {
bootstrapPeers = constants.GetBootstrapPeers()
}
if len(bootstrapPeers) > 0 {
cfg.Discovery.BootstrapPeers = bootstrapPeers
// Use the first bootstrap peer for RQLite join
bootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0])
if bootstrapHost != "" {
rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001)
logger.Printf("Using extracted bootstrap host %s for RQLite Raft join", bootstrapHost)
} else {
logger.Printf("Warning: Could not extract host from bootstrap peer %s", bootstrapPeers[0])
// Try primary production server as fallback
rqliteJoinAddr = "localhost:7001"
}
logger.Printf("Using environment bootstrap peers: %v", bootstrapPeers)
} else {
logger.Printf("Warning: No bootstrap peers configured")
// Default to localhost when no peers configured
rqliteJoinAddr = "localhost:7001"
logger.Printf("Using localhost fallback for RQLite Raft join")
}
// Log network connectivity diagnostics
logger.Printf("=== NETWORK DIAGNOSTICS ===")
logger.Printf("Target RQLite Raft join address: %s", rqliteJoinAddr)
runNetworkDiagnostics(rqliteJoinAddr, logger)
}
// Regular nodes join the bootstrap node's RQLite cluster
cfg.Database.RQLiteJoinAddress = rqliteJoinAddr
logger.Printf("Regular node - joining RQLite cluster (raft) at: %s", cfg.Database.RQLiteJoinAddress)
}
logger.Printf("Data directory: %s", cfg.Node.DataDir) logger.Printf("Data directory: %s", cfg.Node.DataDir)
logger.Printf("Listen addresses: %v", cfg.Node.ListenAddresses) logger.Printf("Listen addresses: %v", cfg.Node.ListenAddresses)

View File

@ -0,0 +1,121 @@
package client
import (
"context"
"time"
"go.uber.org/zap"
)
// ensureAnchatPeerConnectivity ensures Anchat clients can discover each other through bootstrap
func (c *Client) ensureAnchatPeerConnectivity() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for i := 0; i < 30; i++ { // Run for ~1 minute
<-ticker.C
if !c.isConnected() {
return
}
connectedPeers := c.host.Network().Peers()
if c.dht != nil {
// Try to find peers through DHT routing table
routingPeers := c.dht.RoutingTable().ListPeers()
for _, peerID := range routingPeers {
if peerID == c.host.ID() {
continue
}
// Check already connected
alreadyConnected := false
for _, p := range connectedPeers {
if p == peerID {
alreadyConnected = true
break
}
}
if !alreadyConnected {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
peerInfo := c.host.Peerstore().PeerInfo(peerID)
if len(peerInfo.Addrs) == 0 {
if found, err := c.dht.FindPeer(ctx, peerID); err == nil {
peerInfo = found
c.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour*24)
}
}
if len(peerInfo.Addrs) > 0 {
if err := c.host.Connect(ctx, peerInfo); err == nil {
c.logger.Info("Anchat discovered and connected to peer",
zap.String("peer", peerID.String()[:8]+"..."))
if added, err := c.dht.RoutingTable().TryAddPeer(peerID, true, true); err == nil && added {
c.logger.Debug("Added new peer to DHT routing table",
zap.String("peer", peerID.String()[:8]+"..."))
}
if c.libp2pPS != nil {
time.Sleep(100 * time.Millisecond)
_ = c.libp2pPS.ListPeers("")
}
} else {
c.logger.Debug("Failed to connect to discovered peer",
zap.String("peer", peerID.String()[:8]+"..."),
zap.Error(err))
}
}
cancel()
}
}
if len(routingPeers) == 0 {
for _, id := range connectedPeers {
if id != c.host.ID() {
if added, err := c.dht.RoutingTable().TryAddPeer(id, true, true); err == nil && added {
c.logger.Info("Force-added connected peer to DHT routing table",
zap.String("peer", id.String()[:8]+"..."))
}
}
}
c.dht.RefreshRoutingTable()
}
}
// Reconnect to known peers not currently connected
allKnownPeers := c.host.Peerstore().Peers()
for _, id := range allKnownPeers {
if id == c.host.ID() {
continue
}
already := false
for _, p := range connectedPeers {
if p == id { already = true; break }
}
if !already {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
pi := c.host.Peerstore().PeerInfo(id)
if len(pi.Addrs) > 0 {
if err := c.host.Connect(ctx, pi); err == nil {
c.logger.Info("Anchat reconnected to known peer",
zap.String("peer", id.String()[:8]+"..."))
if c.libp2pPS != nil { time.Sleep(100 * time.Millisecond); _ = c.libp2pPS.ListPeers("") }
}
}
cancel()
}
}
if i%5 == 0 && len(connectedPeers) > 0 {
c.logger.Info("Anchat peer discovery progress",
zap.Int("iteration", i+1),
zap.Int("connected_peers", len(connectedPeers)),
zap.Int("known_peers", len(allKnownPeers)))
}
}
}

View File

@ -17,7 +17,6 @@ import (
dht "github.com/libp2p/go-libp2p-kad-dht" dht "github.com/libp2p/go-libp2p-kad-dht"
libp2ppubsub "github.com/libp2p/go-libp2p-pubsub" libp2ppubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
"git.debros.io/DeBros/network/pkg/discovery" "git.debros.io/DeBros/network/pkg/discovery"
"git.debros.io/DeBros/network/pkg/pubsub" "git.debros.io/DeBros/network/pkg/pubsub"
@ -49,31 +48,6 @@ type Client struct {
mu sync.RWMutex mu sync.RWMutex
} }
// pubSubBridge bridges between our PubSubClient interface and the pubsub package
type pubSubBridge struct {
adapter *pubsub.ClientAdapter
}
func (p *pubSubBridge) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
// Convert our MessageHandler to the pubsub package MessageHandler
pubsubHandler := func(topic string, data []byte) error {
return handler(topic, data)
}
return p.adapter.Subscribe(ctx, topic, pubsubHandler)
}
func (p *pubSubBridge) Publish(ctx context.Context, topic string, data []byte) error {
return p.adapter.Publish(ctx, topic, data)
}
func (p *pubSubBridge) Unsubscribe(ctx context.Context, topic string) error {
return p.adapter.Unsubscribe(ctx, topic)
}
func (p *pubSubBridge) ListTopics(ctx context.Context) ([]string, error) {
return p.adapter.ListTopics(ctx)
}
// NewClient creates a new network client // NewClient creates a new network client
func NewClient(config *ClientConfig) (NetworkClient, error) { func NewClient(config *ClientConfig) (NetworkClient, error) {
if config == nil { if config == nil {
@ -237,15 +211,8 @@ func (c *Client) Connect() error {
c.logger.Warn("Failed to start peer discovery", zap.Error(err)) c.logger.Warn("Failed to start peer discovery", zap.Error(err))
} }
// For Anchat clients, ensure we connect to all other clients through bootstrap // Start generic aggressive peer discovery for all apps
if c.config.AppName == "anchat" { go c.startAggressivePeerDiscovery()
// Start mDNS discovery for local network peer discovery
go c.startMDNSDiscovery()
go c.ensureAnchatPeerConnectivity()
} else {
// Start aggressive peer discovery for other apps
go c.startAggressivePeerDiscovery()
}
// Start connection monitoring // Start connection monitoring
c.startConnectionMonitoring() c.startConnectionMonitoring()
@ -255,43 +222,7 @@ func (c *Client) Connect() error {
return nil return nil
} }
// connectToBootstrap connects to a bootstrap peer // Disconnect closes the connection to the network
func (c *Client) connectToBootstrap(ctx context.Context, addr string) error {
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return fmt.Errorf("invalid multiaddr: %w", err)
}
// Try to extract peer info if it's a full multiaddr with peer ID
peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
// If there's no peer ID, try to discover the peer at this address
return c.connectToAddress(ctx, ma)
}
if err := c.host.Connect(ctx, *peerInfo); err != nil {
return fmt.Errorf("failed to connect to peer: %w", err)
}
c.logger.Debug("Connected to bootstrap peer",
zap.String("peer", peerInfo.ID.String()),
zap.String("addr", addr))
return nil
}
// connectToAddress attempts to discover and connect to a peer at the given address
func (c *Client) connectToAddress(ctx context.Context, ma multiaddr.Multiaddr) error {
// For the simple case, we'll just warn and continue
// In a production environment, you'd implement proper peer discovery
// using mDNS, DHT, or other mechanisms
c.logger.Warn("No peer ID provided in address, skipping bootstrap connection",
zap.String("addr", ma.String()),
zap.String("suggestion", "Use full multiaddr with peer ID like: /ip4/127.0.0.1/tcp/4001/p2p/<peer-id>"))
return nil // Don't fail - let the client continue without bootstrap
} // Disconnect closes the connection to the network
func (c *Client) Disconnect() error { func (c *Client) Disconnect() error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -372,268 +303,3 @@ func (c *Client) isConnected() bool {
func (c *Client) getAppNamespace() string { func (c *Client) getAppNamespace() string {
return c.config.AppName return c.config.AppName
} }
// startConnectionMonitoring monitors connection health and logs status
func (c *Client) startConnectionMonitoring() {
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
if !c.isConnected() {
return
}
// Remove connection status logging for cleaner output
// connectedPeers := c.host.Network().Peers()
// Only log if there are connection issues
_ = c.host.Network().Peers()
}
}()
}
// ensureAnchatPeerConnectivity ensures Anchat clients can discover each other through bootstrap
func (c *Client) ensureAnchatPeerConnectivity() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for i := 0; i < 30; i++ { // Run for 1 minute
<-ticker.C
if !c.isConnected() {
return
}
// Get current connected peers
connectedPeers := c.host.Network().Peers()
// For Anchat, we need to be very aggressive about finding other clients
// The key insight: we need to ask our connected peers (like bootstrap) for their peers
if c.dht != nil {
// Try to find peers through DHT routing table
routingPeers := c.dht.RoutingTable().ListPeers()
for _, peerID := range routingPeers {
if peerID == c.host.ID() {
continue
}
// Check if we're already connected to this peer
alreadyConnected := false
for _, alreadyConnectedPeer := range connectedPeers {
if alreadyConnectedPeer == peerID {
alreadyConnected = true
break
}
}
if !alreadyConnected {
// Try to connect to this peer
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
peerInfo := c.host.Peerstore().PeerInfo(peerID)
// If we don't have addresses, try to find them through the DHT
if len(peerInfo.Addrs) == 0 {
if foundPeerInfo, err := c.dht.FindPeer(ctx, peerID); err == nil {
peerInfo = foundPeerInfo
// Add to peerstore for future use
c.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour*24)
}
}
if len(peerInfo.Addrs) > 0 {
err := c.host.Connect(ctx, peerInfo)
if err == nil {
c.logger.Info("Anchat discovered and connected to peer",
zap.String("peer", peerID.String()[:8]+"..."))
// Add newly connected peer to DHT routing table
if added, addErr := c.dht.RoutingTable().TryAddPeer(peerID, true, true); addErr == nil && added {
c.logger.Debug("Added new peer to DHT routing table",
zap.String("peer", peerID.String()[:8]+"..."))
}
// Force pubsub to recognize the new peer and form mesh connections
if c.libp2pPS != nil {
// Wait a moment for connection to stabilize
time.Sleep(100 * time.Millisecond)
// List peers to trigger mesh formation
_ = c.libp2pPS.ListPeers("")
}
} else {
c.logger.Debug("Failed to connect to discovered peer",
zap.String("peer", peerID.String()[:8]+"..."),
zap.Error(err))
}
}
cancel()
}
}
// If DHT routing table is still empty, try to force populate it
if len(routingPeers) == 0 {
// Try to add all connected peers to DHT routing table
for _, connectedPeerID := range connectedPeers {
if connectedPeerID != c.host.ID() {
if added, err := c.dht.RoutingTable().TryAddPeer(connectedPeerID, true, true); err == nil && added {
c.logger.Info("Force-added connected peer to DHT routing table",
zap.String("peer", connectedPeerID.String()[:8]+"..."))
}
}
}
// Force DHT refresh
c.dht.RefreshRoutingTable()
}
}
// Also try to connect to any peers we might have in our peerstore but aren't connected to
allKnownPeers := c.host.Peerstore().Peers()
for _, knownPeerID := range allKnownPeers {
if knownPeerID == c.host.ID() {
continue
}
// Check if we're already connected
alreadyConnected := false
for _, connectedPeer := range connectedPeers {
if connectedPeer == knownPeerID {
alreadyConnected = true
break
}
}
if !alreadyConnected {
// Try to connect to this known peer
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
peerInfo := c.host.Peerstore().PeerInfo(knownPeerID)
if len(peerInfo.Addrs) > 0 {
err := c.host.Connect(ctx, peerInfo)
if err == nil {
c.logger.Info("Anchat reconnected to known peer",
zap.String("peer", knownPeerID.String()[:8]+"..."))
// Force pubsub mesh formation
if c.libp2pPS != nil {
time.Sleep(100 * time.Millisecond)
_ = c.libp2pPS.ListPeers("")
}
}
}
cancel()
}
}
// Log status every 5 iterations (10 seconds)
if i%5 == 0 && len(connectedPeers) > 0 {
c.logger.Info("Anchat peer discovery progress",
zap.Int("iteration", i+1),
zap.Int("connected_peers", len(connectedPeers)),
zap.Int("known_peers", len(allKnownPeers)))
}
}
} // startAggressivePeerDiscovery implements aggressive peer discovery for non-Anchat apps
func (c *Client) startAggressivePeerDiscovery() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for i := 0; i < 20; i++ { // Run for 1 minute
<-ticker.C
if !c.isConnected() {
return
}
// Get current connected peers
connectedPeers := c.host.Network().Peers()
// Try to discover more peers through the DHT
if c.dht != nil {
// Get peers from the DHT routing table
routingPeers := c.dht.RoutingTable().ListPeers()
for _, peerID := range routingPeers {
if peerID == c.host.ID() {
continue
}
// Check if we're already connected
alreadyConnected := false
for _, connectedPeer := range connectedPeers {
if connectedPeer == peerID {
alreadyConnected = true
break
}
}
if !alreadyConnected {
// Try to connect
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
peerInfo := c.host.Peerstore().PeerInfo(peerID)
if len(peerInfo.Addrs) > 0 {
err := c.host.Connect(ctx, peerInfo)
if err == nil {
c.logger.Debug("Connected to discovered peer",
zap.String("peer", peerID.String()[:8]+"..."))
}
}
cancel()
}
}
}
// Log current status every 10 iterations (30 seconds)
if i%10 == 0 {
c.logger.Debug("Peer discovery status",
zap.Int("iteration", i+1),
zap.Int("connected_peers", len(connectedPeers)))
}
}
}
// startMDNSDiscovery enables mDNS peer discovery for local network
func (c *Client) startMDNSDiscovery() {
// Setup mDNS discovery service for Anchat
mdnsService := mdns.NewMdnsService(c.host, "anchat-p2p", &discoveryNotifee{
client: c,
logger: c.logger,
})
if err := mdnsService.Start(); err != nil {
c.logger.Warn("Failed to start mDNS discovery", zap.Error(err))
return
}
c.logger.Info("Started mDNS discovery for Anchat")
}
// discoveryNotifee handles mDNS peer discovery notifications
type discoveryNotifee struct {
client *Client
logger *zap.Logger
}
func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
n.logger.Info("mDNS discovered Anchat peer",
zap.String("peer", pi.ID.String()[:8]+"..."),
zap.Int("addrs", len(pi.Addrs)))
// Connect to the discovered peer
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := n.client.host.Connect(ctx, pi); err != nil {
n.logger.Debug("Failed to connect to mDNS discovered peer",
zap.String("peer", pi.ID.String()[:8]+"..."),
zap.Error(err))
} else {
n.logger.Info("Successfully connected to mDNS discovered peer",
zap.String("peer", pi.ID.String()[:8]+"..."))
// Force pubsub to recognize the new peer
if n.client.libp2pPS != nil {
_ = n.client.libp2pPS.ListPeers("")
}
}
}

View File

@ -0,0 +1,48 @@
package client
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
)
// connectToBootstrap connects to a bootstrap peer
func (c *Client) connectToBootstrap(ctx context.Context, addr string) error {
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return fmt.Errorf("invalid multiaddr: %w", err)
}
// Try to extract peer info if it's a full multiaddr with peer ID
peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
// If there's no peer ID, try to discover the peer at this address
return c.connectToAddress(ctx, ma)
}
if err := c.host.Connect(ctx, *peerInfo); err != nil {
return fmt.Errorf("failed to connect to peer: %w", err)
}
c.logger.Debug("Connected to bootstrap peer",
zap.String("peer", peerInfo.ID.String()),
zap.String("addr", addr))
return nil
}
// connectToAddress attempts to discover and connect to a peer at the given address
func (c *Client) connectToAddress(ctx context.Context, ma multiaddr.Multiaddr) error {
// For the simple case, we'll just warn and continue
// In a production environment, you'd implement proper peer discovery
// using mDNS, DHT, or other mechanisms
c.logger.Warn("No peer ID provided in address, skipping bootstrap connection",
zap.String("addr", ma.String()),
zap.String("suggestion", "Use full multiaddr with peer ID like: /ip4/127.0.0.1/tcp/4001/p2p/<peer-id>"))
return nil // Don't fail - let the client continue without bootstrap
}

View File

@ -0,0 +1,42 @@
package client
import (
"context"
"time"
"go.uber.org/zap"
)
// startAggressivePeerDiscovery implements aggressive peer discovery for non-Anchat apps
func (c *Client) startAggressivePeerDiscovery() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for i := 0; i < 20; i++ { // ~1 minute
<-ticker.C
if !c.isConnected() { return }
connectedPeers := c.host.Network().Peers()
if c.dht != nil {
routingPeers := c.dht.RoutingTable().ListPeers()
for _, pid := range routingPeers {
if pid == c.host.ID() { continue }
already := false
for _, cp := range connectedPeers { if cp == pid { already = true; break } }
if !already {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
pi := c.host.Peerstore().PeerInfo(pid)
if len(pi.Addrs) > 0 {
if err := c.host.Connect(ctx, pi); err == nil {
c.logger.Debug("Connected to discovered peer", zap.String("peer", pid.String()[:8]+"..."))
}
}
cancel()
}
}
}
if i%10 == 0 {
c.logger.Debug("Peer discovery status", zap.Int("iteration", i+1), zap.Int("connected_peers", len(connectedPeers)))
}
}
}

View File

@ -0,0 +1,38 @@
package client
import (
"context"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
"go.uber.org/zap"
)
// startMDNSDiscovery enables mDNS peer discovery for local network
func (c *Client) startMDNSDiscovery() {
mdnsService := mdns.NewMdnsService(c.host, "anchat-p2p", &discoveryNotifee{ client: c, logger: c.logger })
if err := mdnsService.Start(); err != nil {
c.logger.Warn("Failed to start mDNS discovery", zap.Error(err))
return
}
c.logger.Info("Started mDNS discovery for Anchat")
}
// discoveryNotifee handles mDNS peer discovery notifications
type discoveryNotifee struct {
client *Client
logger *zap.Logger
}
func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
n.logger.Info("mDNS discovered Anchat peer", zap.String("peer", pi.ID.String()[:8]+"..."), zap.Int("addrs", len(pi.Addrs)))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := n.client.host.Connect(ctx, pi); err != nil {
n.logger.Debug("Failed to connect to mDNS discovered peer", zap.String("peer", pi.ID.String()[:8]+"..."), zap.Error(err))
} else {
n.logger.Info("Successfully connected to mDNS discovered peer", zap.String("peer", pi.ID.String()[:8]+"..."))
if n.client.libp2pPS != nil { _ = n.client.libp2pPS.ListPeers("") }
}
}

20
pkg/client/monitoring.go Normal file
View File

@ -0,0 +1,20 @@
package client
import "time"
// startConnectionMonitoring monitors connection health and logs status
func (c *Client) startConnectionMonitoring() {
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
if !c.isConnected() {
return
}
// Only touch network to detect issues; avoid noisy logs
_ = c.host.Network().Peers()
}
}()
}

View File

@ -0,0 +1,32 @@
package client
import (
"context"
"git.debros.io/DeBros/network/pkg/pubsub"
)
// pubSubBridge bridges between our PubSubClient interface and the pubsub package
type pubSubBridge struct {
adapter *pubsub.ClientAdapter
}
func (p *pubSubBridge) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
// Convert our MessageHandler to the pubsub package MessageHandler
pubsubHandler := func(topic string, data []byte) error {
return handler(topic, data)
}
return p.adapter.Subscribe(ctx, topic, pubsubHandler)
}
func (p *pubSubBridge) Publish(ctx context.Context, topic string, data []byte) error {
return p.adapter.Publish(ctx, topic, data)
}
func (p *pubSubBridge) Unsubscribe(ctx context.Context, topic string) error {
return p.adapter.Unsubscribe(ctx, topic)
}
func (p *pubSubBridge) ListTopics(ctx context.Context) ([]string, error) {
return p.adapter.ListTopics(ctx)
}