From 0b60ac0791eb9d04d37410017ed3ab3b126c92f4 Mon Sep 17 00:00:00 2001 From: anonpenguin Date: Thu, 14 Aug 2025 14:08:04 +0300 Subject: [PATCH] Unify and simplify pubsub peer discovery logic Enable PeerExchange globally and replace Anchat-specific discovery with a generic, application-agnostic approach that uses periodic monitoring and a single proactive announcement to find topic peers. --- pkg/client/client.go | 17 ++--- pkg/pubsub/discovery_integration.go | 103 ++++++---------------------- pkg/pubsub/subscriptions.go | 8 +-- 3 files changed, 28 insertions(+), 100 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index ecb2d40..1ce16e2 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -157,18 +157,13 @@ func (c *Client) Connect() error { zap.Strings("listen_addrs", addrStrs), ) - // Create LibP2P PubSub with enhanced discovery for Anchat + // Create LibP2P GossipSub with PeerExchange enabled (gossip-based peer exchange). + // Peer exchange helps propagate peer addresses via pubsub gossip and is enabled + // globally so discovery works without Anchat-specific branches. var ps *libp2ppubsub.PubSub - if c.config.AppName == "anchat" { - // For Anchat, use more aggressive GossipSub settings for better peer discovery - ps, err = libp2ppubsub.NewGossipSub(context.Background(), h, - libp2ppubsub.WithPeerExchange(true), // Enable peer exchange - libp2ppubsub.WithFloodPublish(true), // Flood publish for small networks - ) - } else { - // Standard GossipSub for other applications - ps, err = libp2ppubsub.NewGossipSub(context.Background(), h) - } + ps, err = libp2ppubsub.NewGossipSub(context.Background(), h, + libp2ppubsub.WithPeerExchange(true), + ) if err != nil { h.Close() return fmt.Errorf("failed to create pubsub: %w", err) diff --git a/pkg/pubsub/discovery_integration.go b/pkg/pubsub/discovery_integration.go index 02d3bd6..91b2521 100644 --- a/pkg/pubsub/discovery_integration.go +++ b/pkg/pubsub/discovery_integration.go @@ -7,7 +7,9 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" ) -// announceTopicInterest helps with peer discovery by announcing interest in a topic +// announceTopicInterest helps with peer discovery by announcing interest in a topic. +// It starts lightweight monitoring and performs a single proactive announcement to +// encourage peers to respond. This implementation is application-agnostic. func (m *Manager) announceTopicInterest(topicName string) { // Wait a bit for the subscription to be established time.Sleep(100 * time.Millisecond) @@ -21,108 +23,43 @@ func (m *Manager) announceTopicInterest(topicName string) { return } - // For Anchat specifically, be more aggressive about finding peers - if len(m.namespace) > 6 && m.namespace[:6] == "anchat" { - go m.aggressiveTopicPeerDiscovery(topicName, topic) - } else { - // Start a periodic check to monitor topic peer growth - go m.monitorTopicPeers(topicName, topic) - } + // Start periodic monitoring for topic peers + go m.monitorTopicPeers(topicName, topic) + + // Perform a single proactive announcement to the topic to encourage peers to respond + go m.forceTopicPeerDiscovery(topicName, topic) } -// aggressiveTopicPeerDiscovery for Anchat - actively seeks topic peers -func (m *Manager) aggressiveTopicPeerDiscovery(topicName string, topic *pubsub.Topic) { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for i := 0; i < 30; i++ { // Monitor for 30 seconds - <-ticker.C - peers := topic.ListPeers() - - // If we have peers, reduce frequency but keep monitoring - if len(peers) > 0 { - // Switch to normal monitoring once we have peers - go m.monitorTopicPeers(topicName, topic) - return - } - - // For Anchat, try to actively discover and connect to peers on this topic - // This is critical because LibP2P pubsub requires direct connections for message propagation - m.forceTopicPeerDiscovery(topicName, topic) - } -} - -// enhancedAnchatTopicDiscovery implements enhanced peer discovery specifically for Anchat -func (m *Manager) enhancedAnchatTopicDiscovery(topicName string, topic *pubsub.Topic) { - // Wait for subscription to be fully established - time.Sleep(200 * time.Millisecond) - - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for i := 0; i < 20; i++ { // Monitor for 20 seconds - <-ticker.C - - peers := topic.ListPeers() - if len(peers) > 0 { - // Success! We found topic peers - return - } - - // Try various discovery strategies - if i%3 == 0 { - // Strategy: Send discovery heartbeat - ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - discoveryMsg := []byte("ANCHAT_DISCOVERY_PING") - topic.Publish(ctx, discoveryMsg) - cancel() - } - - // Wait a bit and check again - time.Sleep(500 * time.Millisecond) - peers = topic.ListPeers() - if len(peers) > 0 { - return - } - } -} - -// forceTopicPeerDiscovery uses multiple strategies to find and connect to topic peers +// forceTopicPeerDiscovery uses a simple strategy to announce presence on the topic. +// It publishes a lightweight discovery ping and returns quickly. func (m *Manager) forceTopicPeerDiscovery(topicName string, topic *pubsub.Topic) { - // Strategy 1: Check if pubsub knows about any peers for this topic + // If pubsub already reports peers for this topic, do nothing. peers := topic.ListPeers() if len(peers) > 0 { - return // We already have peers + return } - // Strategy 2: Try to actively announce our presence and wait for responses - // Send a ping/heartbeat to the topic to announce our presence + // Send a short-lived discovery ping to the topic to announce presence. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - // Create a discovery message to announce our presence on this topic - discoveryMsg := []byte("ANCHAT_PEER_DISCOVERY") - topic.Publish(ctx, discoveryMsg) + discoveryMsg := []byte("PEER_DISCOVERY_PING") + _ = topic.Publish(ctx, discoveryMsg) - // Strategy 3: Wait briefly and check again - time.Sleep(500 * time.Millisecond) - _ = topic.ListPeers() // Check again but we don't need to use the result - - // Note: In LibP2P, topics don't automatically form connections between subscribers - // The underlying network layer needs to ensure peers are connected first - // This is why our enhanced client peer discovery is crucial + // Wait briefly to allow peers to respond via pubsub peer exchange + time.Sleep(300 * time.Millisecond) } -// monitorTopicPeers periodically checks topic peer connectivity +// monitorTopicPeers periodically checks topic peer connectivity and stops once peers are found. func (m *Manager) monitorTopicPeers(topicName string, topic *pubsub.Topic) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - for i := 0; i < 6; i++ { // Monitor for 30 seconds + for i := 0; i < 6; i++ { // Monitor for ~30 seconds <-ticker.C peers := topic.ListPeers() - // If we have peers, we're good + // If we have peers, stop monitoring if len(peers) > 0 { return } diff --git a/pkg/pubsub/subscriptions.go b/pkg/pubsub/subscriptions.go index fb30a18..f6c032a 100644 --- a/pkg/pubsub/subscriptions.go +++ b/pkg/pubsub/subscriptions.go @@ -3,6 +3,7 @@ package pubsub import ( "context" "fmt" + pubsub "github.com/libp2p/go-libp2p-pubsub" ) @@ -74,14 +75,9 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa } }() - // Force peer discovery for this topic + // Force peer discovery for this topic (application-agnostic) go m.announceTopicInterest(namespacedTopic) - // For Anchat, also try to actively find topic peers through the libp2p pubsub system - if len(m.namespace) > 6 && m.namespace[:6] == "anchat" { - go m.enhancedAnchatTopicDiscovery(namespacedTopic, libp2pTopic) - } - return nil }