mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-10-06 13:49:07 +00:00
Unify and simplify pubsub peer discovery logic
Enable PeerExchange globally and replace Anchat-specific discovery with a generic, application-agnostic approach that uses periodic monitoring and a single proactive announcement to find topic peers.
This commit is contained in:
parent
b7d8c42464
commit
0b60ac0791
@ -157,18 +157,13 @@ func (c *Client) Connect() error {
|
||||
zap.Strings("listen_addrs", addrStrs),
|
||||
)
|
||||
|
||||
// Create LibP2P PubSub with enhanced discovery for Anchat
|
||||
// Create LibP2P GossipSub with PeerExchange enabled (gossip-based peer exchange).
|
||||
// Peer exchange helps propagate peer addresses via pubsub gossip and is enabled
|
||||
// globally so discovery works without Anchat-specific branches.
|
||||
var ps *libp2ppubsub.PubSub
|
||||
if c.config.AppName == "anchat" {
|
||||
// For Anchat, use more aggressive GossipSub settings for better peer discovery
|
||||
ps, err = libp2ppubsub.NewGossipSub(context.Background(), h,
|
||||
libp2ppubsub.WithPeerExchange(true), // Enable peer exchange
|
||||
libp2ppubsub.WithFloodPublish(true), // Flood publish for small networks
|
||||
)
|
||||
} else {
|
||||
// Standard GossipSub for other applications
|
||||
ps, err = libp2ppubsub.NewGossipSub(context.Background(), h)
|
||||
}
|
||||
ps, err = libp2ppubsub.NewGossipSub(context.Background(), h,
|
||||
libp2ppubsub.WithPeerExchange(true),
|
||||
)
|
||||
if err != nil {
|
||||
h.Close()
|
||||
return fmt.Errorf("failed to create pubsub: %w", err)
|
||||
|
@ -7,7 +7,9 @@ import (
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
)
|
||||
|
||||
// announceTopicInterest helps with peer discovery by announcing interest in a topic
|
||||
// announceTopicInterest helps with peer discovery by announcing interest in a topic.
|
||||
// It starts lightweight monitoring and performs a single proactive announcement to
|
||||
// encourage peers to respond. This implementation is application-agnostic.
|
||||
func (m *Manager) announceTopicInterest(topicName string) {
|
||||
// Wait a bit for the subscription to be established
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
@ -21,108 +23,43 @@ func (m *Manager) announceTopicInterest(topicName string) {
|
||||
return
|
||||
}
|
||||
|
||||
// For Anchat specifically, be more aggressive about finding peers
|
||||
if len(m.namespace) > 6 && m.namespace[:6] == "anchat" {
|
||||
go m.aggressiveTopicPeerDiscovery(topicName, topic)
|
||||
} else {
|
||||
// Start a periodic check to monitor topic peer growth
|
||||
go m.monitorTopicPeers(topicName, topic)
|
||||
}
|
||||
// Start periodic monitoring for topic peers
|
||||
go m.monitorTopicPeers(topicName, topic)
|
||||
|
||||
// Perform a single proactive announcement to the topic to encourage peers to respond
|
||||
go m.forceTopicPeerDiscovery(topicName, topic)
|
||||
}
|
||||
|
||||
// aggressiveTopicPeerDiscovery for Anchat - actively seeks topic peers
|
||||
func (m *Manager) aggressiveTopicPeerDiscovery(topicName string, topic *pubsub.Topic) {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for i := 0; i < 30; i++ { // Monitor for 30 seconds
|
||||
<-ticker.C
|
||||
peers := topic.ListPeers()
|
||||
|
||||
// If we have peers, reduce frequency but keep monitoring
|
||||
if len(peers) > 0 {
|
||||
// Switch to normal monitoring once we have peers
|
||||
go m.monitorTopicPeers(topicName, topic)
|
||||
return
|
||||
}
|
||||
|
||||
// For Anchat, try to actively discover and connect to peers on this topic
|
||||
// This is critical because LibP2P pubsub requires direct connections for message propagation
|
||||
m.forceTopicPeerDiscovery(topicName, topic)
|
||||
}
|
||||
}
|
||||
|
||||
// enhancedAnchatTopicDiscovery implements enhanced peer discovery specifically for Anchat
|
||||
func (m *Manager) enhancedAnchatTopicDiscovery(topicName string, topic *pubsub.Topic) {
|
||||
// Wait for subscription to be fully established
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for i := 0; i < 20; i++ { // Monitor for 20 seconds
|
||||
<-ticker.C
|
||||
|
||||
peers := topic.ListPeers()
|
||||
if len(peers) > 0 {
|
||||
// Success! We found topic peers
|
||||
return
|
||||
}
|
||||
|
||||
// Try various discovery strategies
|
||||
if i%3 == 0 {
|
||||
// Strategy: Send discovery heartbeat
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||
discoveryMsg := []byte("ANCHAT_DISCOVERY_PING")
|
||||
topic.Publish(ctx, discoveryMsg)
|
||||
cancel()
|
||||
}
|
||||
|
||||
// Wait a bit and check again
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
peers = topic.ListPeers()
|
||||
if len(peers) > 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// forceTopicPeerDiscovery uses multiple strategies to find and connect to topic peers
|
||||
// forceTopicPeerDiscovery uses a simple strategy to announce presence on the topic.
|
||||
// It publishes a lightweight discovery ping and returns quickly.
|
||||
func (m *Manager) forceTopicPeerDiscovery(topicName string, topic *pubsub.Topic) {
|
||||
// Strategy 1: Check if pubsub knows about any peers for this topic
|
||||
// If pubsub already reports peers for this topic, do nothing.
|
||||
peers := topic.ListPeers()
|
||||
if len(peers) > 0 {
|
||||
return // We already have peers
|
||||
return
|
||||
}
|
||||
|
||||
// Strategy 2: Try to actively announce our presence and wait for responses
|
||||
// Send a ping/heartbeat to the topic to announce our presence
|
||||
// Send a short-lived discovery ping to the topic to announce presence.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Create a discovery message to announce our presence on this topic
|
||||
discoveryMsg := []byte("ANCHAT_PEER_DISCOVERY")
|
||||
topic.Publish(ctx, discoveryMsg)
|
||||
discoveryMsg := []byte("PEER_DISCOVERY_PING")
|
||||
_ = topic.Publish(ctx, discoveryMsg)
|
||||
|
||||
// Strategy 3: Wait briefly and check again
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
_ = topic.ListPeers() // Check again but we don't need to use the result
|
||||
|
||||
// Note: In LibP2P, topics don't automatically form connections between subscribers
|
||||
// The underlying network layer needs to ensure peers are connected first
|
||||
// This is why our enhanced client peer discovery is crucial
|
||||
// Wait briefly to allow peers to respond via pubsub peer exchange
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
}
|
||||
|
||||
// monitorTopicPeers periodically checks topic peer connectivity
|
||||
// monitorTopicPeers periodically checks topic peer connectivity and stops once peers are found.
|
||||
func (m *Manager) monitorTopicPeers(topicName string, topic *pubsub.Topic) {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for i := 0; i < 6; i++ { // Monitor for 30 seconds
|
||||
for i := 0; i < 6; i++ { // Monitor for ~30 seconds
|
||||
<-ticker.C
|
||||
peers := topic.ListPeers()
|
||||
|
||||
// If we have peers, we're good
|
||||
// If we have peers, stop monitoring
|
||||
if len(peers) > 0 {
|
||||
return
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package pubsub
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
)
|
||||
|
||||
@ -74,14 +75,9 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa
|
||||
}
|
||||
}()
|
||||
|
||||
// Force peer discovery for this topic
|
||||
// Force peer discovery for this topic (application-agnostic)
|
||||
go m.announceTopicInterest(namespacedTopic)
|
||||
|
||||
// For Anchat, also try to actively find topic peers through the libp2p pubsub system
|
||||
if len(m.namespace) > 6 && m.namespace[:6] == "anchat" {
|
||||
go m.enhancedAnchatTopicDiscovery(namespacedTopic, libp2pTopic)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user