From 8dc71e79208c0dbccd22d5b6b1406c445940bdfb Mon Sep 17 00:00:00 2001 From: anonpenguin Date: Mon, 27 Oct 2025 17:30:40 +0200 Subject: [PATCH] feat: implement local pub/sub delivery for WebSocket subscribers - Added local subscriber management to the Gateway for direct message delivery to WebSocket clients. - Introduced synchronization mechanisms to handle concurrent access to local subscribers. - Enhanced pubsub handlers to register and unregister local subscribers, improving message delivery efficiency. - Updated message publishing logic to prioritize local delivery before forwarding to libp2p, ensuring faster response times for local clients. --- pkg/gateway/gateway.go | 29 +++++- pkg/gateway/pubsub_handlers.go | 172 ++++++++++++++++++++++++++------- 2 files changed, 163 insertions(+), 38 deletions(-) diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 8887237..ef03844 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -6,6 +6,7 @@ import ( "crypto/rsa" "database/sql" "strconv" + "sync" "time" "github.com/DeBrosOfficial/network/pkg/client" @@ -39,6 +40,16 @@ type Gateway struct { sqlDB *sql.DB ormClient rqlite.Client ormHTTP *rqlite.HTTPGateway + + // Local pub/sub bypass for same-gateway subscribers + localSubscribers map[string][]*localSubscriber // topic+namespace -> subscribers + mu sync.RWMutex +} + +// localSubscriber represents a WebSocket subscriber for local message delivery +type localSubscriber struct { + msgChan chan []byte + namespace string } // New creates and initializes a new Gateway instance @@ -71,10 +82,11 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { logger.ComponentInfo(logging.ComponentGeneral, "Creating gateway instance...") gw := &Gateway{ - logger: logger, - cfg: cfg, - client: c, - startedAt: time.Now(), + logger: logger, + cfg: cfg, + client: c, + startedAt: time.Now(), + localSubscribers: make(map[string][]*localSubscriber), } logger.ComponentInfo(logging.ComponentGeneral, "Generating RSA signing key...") @@ -126,3 +138,12 @@ func (g *Gateway) Close() { _ = g.sqlDB.Close() } } + +// getLocalSubscribers returns all local subscribers for a given topic and namespace +func (g *Gateway) getLocalSubscribers(topic, namespace string) []*localSubscriber { + topicKey := namespace + "." + topic + if subs, ok := g.localSubscribers[topicKey]; ok { + return subs + } + return nil +} diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go index fb32389..0ac4f72 100644 --- a/pkg/gateway/pubsub_handlers.go +++ b/pkg/gateway/pubsub_handlers.go @@ -3,6 +3,7 @@ package gateway import ( "encoding/base64" "encoding/json" + "fmt" "net/http" "time" @@ -58,50 +59,73 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) // Channel to deliver PubSub messages to WS writer msgs := make(chan []byte, 128) + + // NEW: Register as local subscriber for direct message delivery + localSub := &localSubscriber{ + msgChan: msgs, + namespace: ns, + } + topicKey := fmt.Sprintf("%s.%s", ns, topic) + + g.mu.Lock() + g.localSubscribers[topicKey] = append(g.localSubscribers[topicKey], localSub) + subscriberCount := len(g.localSubscribers[topicKey]) + g.mu.Unlock() + + g.logger.ComponentInfo("gateway", "pubsub ws: registered local subscriber", + zap.String("topic", topic), + zap.String("namespace", ns), + zap.Int("total_subscribers", subscriberCount)) + + // Unregister on close + defer func() { + g.mu.Lock() + subs := g.localSubscribers[topicKey] + for i, sub := range subs { + if sub == localSub { + g.localSubscribers[topicKey] = append(subs[:i], subs[i+1:]...) + break + } + } + remainingCount := len(g.localSubscribers[topicKey]) + if remainingCount == 0 { + delete(g.localSubscribers, topicKey) + } + g.mu.Unlock() + g.logger.ComponentInfo("gateway", "pubsub ws: unregistered local subscriber", + zap.String("topic", topic), + zap.Int("remaining_subscribers", remainingCount)) + }() + // Use internal auth context when interacting with client to avoid circular auth requirements ctx := client.WithInternalAuth(r.Context()) // Apply namespace isolation ctx = pubsub.WithNamespace(ctx, ns) - // Subscribe to the topic and forward messages to WS client - h := func(_ string, data []byte) error { - g.logger.ComponentInfo("gateway", "pubsub ws: received message", - zap.String("topic", topic), - zap.Int("data_len", len(data))) - - select { - case msgs <- data: - g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client", - zap.String("topic", topic)) - return nil - default: - // Drop if client is slow to avoid blocking network - g.logger.ComponentWarn("gateway", "pubsub ws: client slow, dropping message", - zap.String("topic", topic)) - return nil - } - } - if err := g.client.PubSub().Subscribe(ctx, topic, h); err != nil { - g.logger.ComponentWarn("gateway", "pubsub ws: subscribe failed") - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - defer func() { _ = g.client.PubSub().Unsubscribe(ctx, topic) }() - - // no extra fan-out; rely on libp2p subscription - - // Writer loop + + // Writer loop - START THIS FIRST before libp2p subscription done := make(chan struct{}) go func() { + g.logger.ComponentInfo("gateway", "pubsub ws: writer goroutine started", + zap.String("topic", topic)) + defer g.logger.ComponentInfo("gateway", "pubsub ws: writer goroutine exiting", + zap.String("topic", topic)) ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case b, ok := <-msgs: if !ok { + g.logger.ComponentWarn("gateway", "pubsub ws: message channel closed", + zap.String("topic", topic)) _ = conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(5*time.Second)) close(done) return } + + g.logger.ComponentInfo("gateway", "pubsub ws: sending message to client", + zap.String("topic", topic), + zap.Int("data_len", len(b))) + // Format message as JSON envelope with data (base64 encoded), timestamp, and topic // This matches the SDK's Message interface: {data: string, timestamp: number, topic: string} envelope := map[string]interface{}{ @@ -111,13 +135,27 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) } envelopeJSON, err := json.Marshal(envelope) if err != nil { + g.logger.ComponentWarn("gateway", "pubsub ws: failed to marshal envelope", + zap.String("topic", topic), + zap.Error(err)) continue } + + g.logger.ComponentDebug("gateway", "pubsub ws: envelope created", + zap.String("topic", topic), + zap.Int("envelope_len", len(envelopeJSON))) + conn.SetWriteDeadline(time.Now().Add(30 * time.Second)) if err := conn.WriteMessage(websocket.TextMessage, envelopeJSON); err != nil { + g.logger.ComponentWarn("gateway", "pubsub ws: failed to write to websocket", + zap.String("topic", topic), + zap.Error(err)) close(done) return } + + g.logger.ComponentInfo("gateway", "pubsub ws: message sent successfully", + zap.String("topic", topic)) case <-ticker.C: // Ping keepalive _ = conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(5*time.Second)) @@ -128,6 +166,42 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) } }() + // Subscribe to libp2p for cross-node messages (in background, non-blocking) + go func() { + h := func(_ string, data []byte) error { + g.logger.ComponentInfo("gateway", "pubsub ws: received message from libp2p", + zap.String("topic", topic), + zap.Int("data_len", len(data))) + + select { + case msgs <- data: + g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client", + zap.String("topic", topic), + zap.String("source", "libp2p")) + return nil + default: + // Drop if client is slow to avoid blocking network + g.logger.ComponentWarn("gateway", "pubsub ws: client slow, dropping message", + zap.String("topic", topic)) + return nil + } + } + if err := g.client.PubSub().Subscribe(ctx, topic, h); err != nil { + g.logger.ComponentWarn("gateway", "pubsub ws: libp2p subscribe failed (will use local-only)", + zap.String("topic", topic), + zap.Error(err)) + return + } + g.logger.ComponentInfo("gateway", "pubsub ws: libp2p subscription established", + zap.String("topic", topic)) + + // Keep subscription alive until done + <-done + _ = g.client.PubSub().Unsubscribe(ctx, topic) + g.logger.ComponentInfo("gateway", "pubsub ws: libp2p subscription closed", + zap.String("topic", topic)) + }() + // Reader loop: treat any client message as publish to the same topic for { mt, data, err := conn.ReadMessage() @@ -185,25 +259,55 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { return } - g.logger.ComponentInfo("gateway", "pubsub publish: publishing message", + // NEW: Check for local websocket subscribers FIRST and deliver directly + g.mu.RLock() + localSubs := g.getLocalSubscribers(body.Topic, ns) + g.mu.RUnlock() + + localDeliveryCount := 0 + if len(localSubs) > 0 { + for _, sub := range localSubs { + select { + case sub.msgChan <- data: + localDeliveryCount++ + g.logger.ComponentDebug("gateway", "delivered to local subscriber", + zap.String("topic", body.Topic)) + default: + // Drop if buffer full + g.logger.ComponentWarn("gateway", "local subscriber buffer full, dropping message", + zap.String("topic", body.Topic)) + } + } + } + + g.logger.ComponentInfo("gateway", "pubsub publish: processing message", zap.String("topic", body.Topic), zap.String("namespace", ns), - zap.Int("data_len", len(data))) + zap.Int("data_len", len(data)), + zap.Int("local_subscribers", len(localSubs)), + zap.Int("local_delivered", localDeliveryCount)) - // Apply namespace isolation + // Still publish to libp2p for cross-node delivery ctx := pubsub.WithNamespace(client.WithInternalAuth(r.Context()), ns) if err := g.client.PubSub().Publish(ctx, body.Topic, data); err != nil { - g.logger.ComponentWarn("gateway", "pubsub publish: failed", + // Log but don't fail - local delivery succeeded + g.logger.ComponentWarn("gateway", "libp2p publish failed (local delivery succeeded)", zap.String("topic", body.Topic), zap.Error(err)) + // Still return OK since local delivery worked + if localDeliveryCount > 0 { + writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "local_only": true}) + return + } + // If no local delivery, return error writeError(w, http.StatusInternalServerError, err.Error()) return } g.logger.ComponentInfo("gateway", "pubsub publish: message published successfully", - zap.String("topic", body.Topic)) + zap.String("topic", body.Topic), + zap.String("delivery", "local+libp2p")) - // rely on libp2p to deliver to WS subscribers writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) }