feat: enhance pubsub functionality with namespace isolation and handler management

- Implemented namespace isolation in pubsub handlers to ensure message segregation.
- Refactored subscription management to support multiple handlers per topic, allowing independent lifecycles for each handler.
- Introduced a unique HandlerID for each subscription, enabling better management of handler registrations and unsubscriptions.
- Updated message handling to broadcast to all registered handlers, improving message delivery efficiency.
This commit is contained in:
anonpenguin 2025-10-27 16:37:25 +02:00
parent 3f3ef9d1ac
commit 5930c9d832
No known key found for this signature in database
GPG Key ID: 1CBB1FE35AFBEE30
5 changed files with 102 additions and 50 deletions

View File

@ -21,7 +21,7 @@ test-e2e:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
VERSION := 0.52.12 VERSION := 0.52.13
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

View File

@ -1,14 +1,13 @@
package gateway package gateway
import ( import (
"crypto/sha256"
"encoding/base64" "encoding/base64"
"encoding/hex"
"encoding/json" "encoding/json"
"net/http" "net/http"
"time" "time"
"github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/pubsub"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -61,22 +60,14 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request)
msgs := make(chan []byte, 128) msgs := make(chan []byte, 128)
// Use internal auth context when interacting with client to avoid circular auth requirements // Use internal auth context when interacting with client to avoid circular auth requirements
ctx := client.WithInternalAuth(r.Context()) ctx := client.WithInternalAuth(r.Context())
// Subscribe to the topic; push data into msgs with simple per-connection de-dup // Apply namespace isolation
recent := make(map[string]time.Time) ctx = pubsub.WithNamespace(ctx, ns)
// Subscribe to the topic and forward messages to WS client
h := func(_ string, data []byte) error { h := func(_ string, data []byte) error {
g.logger.ComponentInfo("gateway", "pubsub ws: received message", g.logger.ComponentInfo("gateway", "pubsub ws: received message",
zap.String("topic", topic), zap.String("topic", topic),
zap.Int("data_len", len(data))) zap.Int("data_len", len(data)))
// Drop duplicates seen in the last 2 seconds
sum := sha256.Sum256(data)
key := hex.EncodeToString(sum[:])
if t, ok := recent[key]; ok && time.Since(t) < 2*time.Second {
g.logger.ComponentInfo("gateway", "pubsub ws: dropping duplicate",
zap.String("topic", topic))
return nil
}
recent[key] = time.Now()
select { select {
case msgs <- data: case msgs <- data:
g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client", g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client",
@ -199,7 +190,9 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) {
zap.String("namespace", ns), zap.String("namespace", ns),
zap.Int("data_len", len(data))) zap.Int("data_len", len(data)))
if err := g.client.PubSub().Publish(client.WithInternalAuth(r.Context()), body.Topic, data); err != nil { // Apply namespace isolation
ctx := pubsub.WithNamespace(client.WithInternalAuth(r.Context()), ns)
if err := g.client.PubSub().Publish(ctx, body.Topic, data); err != nil {
g.logger.ComponentWarn("gateway", "pubsub publish: failed", g.logger.ComponentWarn("gateway", "pubsub publish: failed",
zap.String("topic", body.Topic), zap.String("topic", body.Topic),
zap.Error(err)) zap.Error(err))
@ -225,7 +218,9 @@ func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusForbidden, "namespace not resolved") writeError(w, http.StatusForbidden, "namespace not resolved")
return return
} }
all, err := g.client.PubSub().ListTopics(client.WithInternalAuth(r.Context())) // Apply namespace isolation
ctx := pubsub.WithNamespace(client.WithInternalAuth(r.Context()), ns)
all, err := g.client.PubSub().ListTopics(ctx)
if err != nil { if err != nil {
writeError(w, http.StatusInternalServerError, err.Error()) writeError(w, http.StatusInternalServerError, err.Error())
return return

View File

@ -1,24 +1,29 @@
package pubsub package pubsub
import ( import (
"sync" "crypto/rand"
"encoding/hex"
"sync"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
) )
// Manager handles pub/sub operations // Manager handles pub/sub operations
type Manager struct { type Manager struct {
pubsub *pubsub.PubSub pubsub *pubsub.PubSub
topics map[string]*pubsub.Topic topics map[string]*pubsub.Topic
subscriptions map[string]*subscription subscriptions map[string]*topicSubscription
namespace string namespace string
mu sync.RWMutex mu sync.RWMutex
} }
// subscription holds subscription data // topicSubscription holds multiple handlers for a single topic
type subscription struct { type topicSubscription struct {
sub *pubsub.Subscription sub *pubsub.Subscription
cancel func() cancel func()
handlers map[HandlerID]MessageHandler
refCount int // Number of active subscriptions
mu sync.RWMutex
} }
// NewManager creates a new pubsub manager // NewManager creates a new pubsub manager
@ -26,7 +31,14 @@ func NewManager(ps *pubsub.PubSub, namespace string) *Manager {
return &Manager { return &Manager {
pubsub: ps, pubsub: ps,
topics: make(map[string]*pubsub.Topic), topics: make(map[string]*pubsub.Topic),
subscriptions: make(map[string]*subscription), subscriptions: make(map[string]*topicSubscription),
namespace: namespace, namespace: namespace,
} }
} }
// generateHandlerID creates a unique handler ID
func generateHandlerID() HandlerID {
b := make([]byte, 8)
rand.Read(b)
return HandlerID(hex.EncodeToString(b))
}

View File

@ -7,7 +7,9 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
) )
// Subscribe subscribes to a topic // Subscribe subscribes to a topic with a handler.
// Returns a HandlerID that can be used to unsubscribe this specific handler.
// Multiple handlers can subscribe to the same topic.
func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHandler) error { func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
if m.pubsub == nil { if m.pubsub == nil {
return fmt.Errorf("pubsub not initialized") return fmt.Errorf("pubsub not initialized")
@ -22,15 +24,23 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa
} }
namespacedTopic := fmt.Sprintf("%s.%s", ns, topic) namespacedTopic := fmt.Sprintf("%s.%s", ns, topic)
// Check if already subscribed
m.mu.Lock() m.mu.Lock()
if _, exists := m.subscriptions[namespacedTopic]; exists { defer m.mu.Unlock()
m.mu.Unlock()
// Already subscribed - this is normal for LibP2P pubsub // Check if we already have a subscription for this topic
topicSub, exists := m.subscriptions[namespacedTopic]
if exists {
// Add handler to existing subscription
handlerID := generateHandlerID()
topicSub.mu.Lock()
topicSub.handlers[handlerID] = handler
topicSub.refCount++
topicSub.mu.Unlock()
return nil return nil
} }
m.mu.Unlock()
// Create new subscription
// Get or create topic // Get or create topic
libp2pTopic, err := m.getOrCreateTopic(namespacedTopic) libp2pTopic, err := m.getOrCreateTopic(namespacedTopic)
if err != nil { if err != nil {
@ -46,15 +56,17 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa
// Create cancellable context for this subscription // Create cancellable context for this subscription
subCtx, cancel := context.WithCancel(context.Background()) subCtx, cancel := context.WithCancel(context.Background())
// Store subscription // Create topic subscription with initial handler
m.mu.Lock() handlerID := generateHandlerID()
m.subscriptions[namespacedTopic] = &subscription{ topicSub = &topicSubscription{
sub: sub, sub: sub,
cancel: cancel, cancel: cancel,
handlers: map[HandlerID]MessageHandler{handlerID: handler},
refCount: 1,
} }
m.mu.Unlock() m.subscriptions[namespacedTopic] = topicSub
// Start message handler goroutine // Start message handler goroutine (fan-out to all handlers)
go func() { go func() {
defer func() { defer func() {
sub.Cancel() sub.Cancel()
@ -73,10 +85,20 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa
continue continue
} }
// Call the handler // Broadcast to all handlers
if err := handler(topic, msg.Data); err != nil { topicSub.mu.RLock()
// Log error but continue processing handlers := make([]MessageHandler, 0, len(topicSub.handlers))
continue for _, h := range topicSub.handlers {
handlers = append(handlers, h)
}
topicSub.mu.RUnlock()
// Call each handler (don't block on individual handler errors)
for _, h := range handlers {
if err := h(topic, msg.Data); err != nil {
// Log error but continue processing other handlers
continue
}
} }
} }
} }
@ -85,7 +107,9 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa
return nil return nil
} }
// Unsubscribe unsubscribes from a topic // Unsubscribe decrements the subscription refcount for a topic.
// The subscription is only truly cancelled when refcount reaches zero.
// This allows multiple subscribers to the same topic.
func (m *Manager) Unsubscribe(ctx context.Context, topic string) error { func (m *Manager) Unsubscribe(ctx context.Context, topic string) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
@ -99,9 +123,20 @@ func (m *Manager) Unsubscribe(ctx context.Context, topic string) error {
} }
namespacedTopic := fmt.Sprintf("%s.%s", ns, topic) namespacedTopic := fmt.Sprintf("%s.%s", ns, topic)
if subscription, exists := m.subscriptions[namespacedTopic]; exists { topicSub, exists := m.subscriptions[namespacedTopic]
// Cancel the subscription context to stop the message handler goroutine if !exists {
subscription.cancel() return nil // Already unsubscribed
}
// Decrement ref count
topicSub.mu.Lock()
topicSub.refCount--
shouldCancel := topicSub.refCount <= 0
topicSub.mu.Unlock()
// Only cancel and remove if no more subscribers
if shouldCancel {
topicSub.cancel()
delete(m.subscriptions, namespacedTopic) delete(m.subscriptions, namespacedTopic)
} }
@ -142,7 +177,7 @@ func (m *Manager) Close() error {
for _, sub := range m.subscriptions { for _, sub := range m.subscriptions {
sub.cancel() sub.cancel()
} }
m.subscriptions = make(map[string]*subscription) m.subscriptions = make(map[string]*topicSubscription)
// Close all topics // Close all topics
for _, topic := range m.topics { for _, topic := range m.topics {

View File

@ -1,5 +1,15 @@
package pubsub package pubsub
// MessageHandler represents a message handler function signature // MessageHandler represents a message handler function signature.
// This matches the client.MessageHandler type to avoid circular imports // Each handler is called when a message arrives on a subscribed topic.
// Multiple handlers can be registered for the same topic, and each will
// receive a copy of the message. Handlers should return an error only for
// critical failures; the error is logged but does not stop other handlers.
// This matches the client.MessageHandler type to avoid circular imports.
type MessageHandler func(topic string, data []byte) error type MessageHandler func(topic string, data []byte) error
// HandlerID uniquely identifies a handler registration.
// Each call to Subscribe generates a new HandlerID, allowing
// multiple subscribers to the same topic with independent lifecycles.
// Unsubscribe operations are ref-counted per topic.
type HandlerID string