mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-12-12 22:38:50 +00:00
- Added new E2E tests for authentication, cache operations, and IPFS interactions to improve coverage and reliability. - Introduced concurrency tests for cache operations to validate performance under load. - Updated `go.mod` to include `github.com/mattn/go-sqlite3` as a dependency for database interactions. - Refined Makefile to simplify E2E test execution and configuration discovery. - Removed outdated client E2E tests and consolidated related functionality for better maintainability.
210 lines
5.1 KiB
Go
210 lines
5.1 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
)
|
|
|
|
// Subscribe subscribes to a topic with a handler.
|
|
// Returns a HandlerID that can be used to unsubscribe this specific handler.
|
|
// Multiple handlers can subscribe to the same topic.
|
|
func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
|
|
if m.pubsub == nil {
|
|
return fmt.Errorf("pubsub not initialized")
|
|
}
|
|
|
|
// Determine namespace (allow per-call override via context)
|
|
ns := m.namespace
|
|
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
|
|
if s, ok := v.(string); ok && s != "" {
|
|
ns = s
|
|
}
|
|
}
|
|
namespacedTopic := fmt.Sprintf("%s.%s", ns, topic)
|
|
|
|
// Fast path: we already have a subscription for this topic
|
|
m.mu.RLock()
|
|
if existing := m.subscriptions[namespacedTopic]; existing != nil {
|
|
m.mu.RUnlock()
|
|
handlerID := generateHandlerID()
|
|
existing.mu.Lock()
|
|
existing.handlers[handlerID] = handler
|
|
existing.refCount++
|
|
existing.mu.Unlock()
|
|
return nil
|
|
}
|
|
m.mu.RUnlock()
|
|
|
|
// Create the underlying libp2p subscription without holding the manager lock
|
|
// to avoid re-entrant lock attempts
|
|
libp2pTopic, err := m.getOrCreateTopic(namespacedTopic)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get topic: %w", err)
|
|
}
|
|
|
|
// Subscribe to topic
|
|
sub, err := libp2pTopic.Subscribe()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe to topic: %w", err)
|
|
}
|
|
|
|
// Create cancellable context for this subscription
|
|
subCtx, cancel := context.WithCancel(context.Background())
|
|
|
|
// Create topic subscription with initial handler
|
|
handlerID := generateHandlerID()
|
|
newSub := &topicSubscription{
|
|
sub: sub,
|
|
cancel: cancel,
|
|
handlers: map[HandlerID]MessageHandler{handlerID: handler},
|
|
refCount: 1,
|
|
}
|
|
|
|
// Install the subscription (or merge if another goroutine beat us)
|
|
m.mu.Lock()
|
|
if existing := m.subscriptions[namespacedTopic]; existing != nil {
|
|
m.mu.Unlock()
|
|
// Another goroutine already created a subscription while we were working
|
|
// Clean up our resources and add to theirs
|
|
cancel()
|
|
sub.Cancel()
|
|
handlerID := generateHandlerID()
|
|
existing.mu.Lock()
|
|
existing.handlers[handlerID] = handler
|
|
existing.refCount++
|
|
existing.mu.Unlock()
|
|
return nil
|
|
}
|
|
m.subscriptions[namespacedTopic] = newSub
|
|
m.mu.Unlock()
|
|
|
|
// Announce topic interest to help with peer discovery
|
|
go m.announceTopicInterest(namespacedTopic)
|
|
|
|
// Start message handler goroutine (fan-out to all handlers)
|
|
go func(ts *topicSubscription) {
|
|
defer ts.sub.Cancel()
|
|
|
|
for {
|
|
select {
|
|
case <-subCtx.Done():
|
|
return
|
|
default:
|
|
msg, err := ts.sub.Next(subCtx)
|
|
if err != nil {
|
|
if subCtx.Err() != nil {
|
|
return // Context cancelled
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Filter out internal discovery messages
|
|
if string(msg.Data) == "PEER_DISCOVERY_PING" {
|
|
continue
|
|
}
|
|
|
|
// Broadcast to all handlers
|
|
ts.mu.RLock()
|
|
handlers := make([]MessageHandler, 0, len(ts.handlers))
|
|
for _, h := range ts.handlers {
|
|
handlers = append(handlers, h)
|
|
}
|
|
ts.mu.RUnlock()
|
|
|
|
// Call each handler (don't block on individual handler errors)
|
|
for _, h := range handlers {
|
|
if err := h(topic, msg.Data); err != nil {
|
|
// Log error but continue processing other handlers
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}(newSub)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Unsubscribe decrements the subscription refcount for a topic.
|
|
// The subscription is only truly cancelled when refcount reaches zero.
|
|
// This allows multiple subscribers to the same topic.
|
|
func (m *Manager) Unsubscribe(ctx context.Context, topic string) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
// Determine namespace (allow per-call override via context)
|
|
ns := m.namespace
|
|
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
|
|
if s, ok := v.(string); ok && s != "" {
|
|
ns = s
|
|
}
|
|
}
|
|
namespacedTopic := fmt.Sprintf("%s.%s", ns, topic)
|
|
|
|
topicSub, exists := m.subscriptions[namespacedTopic]
|
|
if !exists {
|
|
return nil // Already unsubscribed
|
|
}
|
|
|
|
// Decrement ref count
|
|
topicSub.mu.Lock()
|
|
topicSub.refCount--
|
|
shouldCancel := topicSub.refCount <= 0
|
|
topicSub.mu.Unlock()
|
|
|
|
// Only cancel and remove if no more subscribers
|
|
if shouldCancel {
|
|
topicSub.cancel()
|
|
delete(m.subscriptions, namespacedTopic)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ListTopics returns all subscribed topics
|
|
func (m *Manager) ListTopics(ctx context.Context) ([]string, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
var topics []string
|
|
// Determine namespace (allow per-call override via context)
|
|
ns := m.namespace
|
|
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
|
|
if s, ok := v.(string); ok && s != "" {
|
|
ns = s
|
|
}
|
|
}
|
|
prefix := ns + "."
|
|
|
|
for topic := range m.subscriptions {
|
|
if len(topic) > len(prefix) && topic[:len(prefix)] == prefix {
|
|
originalTopic := topic[len(prefix):]
|
|
topics = append(topics, originalTopic)
|
|
}
|
|
}
|
|
|
|
return topics, nil
|
|
}
|
|
|
|
// Close closes all subscriptions and topics
|
|
func (m *Manager) Close() error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
// Cancel all subscriptions
|
|
for _, sub := range m.subscriptions {
|
|
sub.cancel()
|
|
}
|
|
m.subscriptions = make(map[string]*topicSubscription)
|
|
|
|
// Close all topics
|
|
for _, topic := range m.topics {
|
|
topic.Close()
|
|
}
|
|
m.topics = make(map[string]*pubsub.Topic)
|
|
|
|
return nil
|
|
}
|