feat: implement local pub/sub delivery for WebSocket subscribers

- Added local subscriber management to the Gateway for direct message delivery to WebSocket clients.
- Introduced synchronization mechanisms to handle concurrent access to local subscribers.
- Enhanced pubsub handlers to register and unregister local subscribers, improving message delivery efficiency.
- Updated message publishing logic to prioritize local delivery before forwarding to libp2p, ensuring faster response times for local clients.
This commit is contained in:
anonpenguin 2025-10-27 17:30:40 +02:00
parent 5930c9d832
commit 8dc71e7920
2 changed files with 163 additions and 38 deletions

View File

@ -6,6 +6,7 @@ import (
"crypto/rsa"
"database/sql"
"strconv"
"sync"
"time"
"github.com/DeBrosOfficial/network/pkg/client"
@ -39,6 +40,16 @@ type Gateway struct {
sqlDB *sql.DB
ormClient rqlite.Client
ormHTTP *rqlite.HTTPGateway
// Local pub/sub bypass for same-gateway subscribers
localSubscribers map[string][]*localSubscriber // topic+namespace -> subscribers
mu sync.RWMutex
}
// localSubscriber represents a WebSocket subscriber for local message delivery
type localSubscriber struct {
msgChan chan []byte
namespace string
}
// New creates and initializes a new Gateway instance
@ -75,6 +86,7 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
cfg: cfg,
client: c,
startedAt: time.Now(),
localSubscribers: make(map[string][]*localSubscriber),
}
logger.ComponentInfo(logging.ComponentGeneral, "Generating RSA signing key...")
@ -126,3 +138,12 @@ func (g *Gateway) Close() {
_ = g.sqlDB.Close()
}
}
// getLocalSubscribers returns all local subscribers for a given topic and namespace
func (g *Gateway) getLocalSubscribers(topic, namespace string) []*localSubscriber {
topicKey := namespace + "." + topic
if subs, ok := g.localSubscribers[topicKey]; ok {
return subs
}
return nil
}

View File

@ -3,6 +3,7 @@ package gateway
import (
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"time"
@ -58,50 +59,73 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request)
// Channel to deliver PubSub messages to WS writer
msgs := make(chan []byte, 128)
// NEW: Register as local subscriber for direct message delivery
localSub := &localSubscriber{
msgChan: msgs,
namespace: ns,
}
topicKey := fmt.Sprintf("%s.%s", ns, topic)
g.mu.Lock()
g.localSubscribers[topicKey] = append(g.localSubscribers[topicKey], localSub)
subscriberCount := len(g.localSubscribers[topicKey])
g.mu.Unlock()
g.logger.ComponentInfo("gateway", "pubsub ws: registered local subscriber",
zap.String("topic", topic),
zap.String("namespace", ns),
zap.Int("total_subscribers", subscriberCount))
// Unregister on close
defer func() {
g.mu.Lock()
subs := g.localSubscribers[topicKey]
for i, sub := range subs {
if sub == localSub {
g.localSubscribers[topicKey] = append(subs[:i], subs[i+1:]...)
break
}
}
remainingCount := len(g.localSubscribers[topicKey])
if remainingCount == 0 {
delete(g.localSubscribers, topicKey)
}
g.mu.Unlock()
g.logger.ComponentInfo("gateway", "pubsub ws: unregistered local subscriber",
zap.String("topic", topic),
zap.Int("remaining_subscribers", remainingCount))
}()
// Use internal auth context when interacting with client to avoid circular auth requirements
ctx := client.WithInternalAuth(r.Context())
// Apply namespace isolation
ctx = pubsub.WithNamespace(ctx, ns)
// Subscribe to the topic and forward messages to WS client
h := func(_ string, data []byte) error {
g.logger.ComponentInfo("gateway", "pubsub ws: received message",
zap.String("topic", topic),
zap.Int("data_len", len(data)))
select {
case msgs <- data:
g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client",
zap.String("topic", topic))
return nil
default:
// Drop if client is slow to avoid blocking network
g.logger.ComponentWarn("gateway", "pubsub ws: client slow, dropping message",
zap.String("topic", topic))
return nil
}
}
if err := g.client.PubSub().Subscribe(ctx, topic, h); err != nil {
g.logger.ComponentWarn("gateway", "pubsub ws: subscribe failed")
writeError(w, http.StatusInternalServerError, err.Error())
return
}
defer func() { _ = g.client.PubSub().Unsubscribe(ctx, topic) }()
// no extra fan-out; rely on libp2p subscription
// Writer loop
// Writer loop - START THIS FIRST before libp2p subscription
done := make(chan struct{})
go func() {
g.logger.ComponentInfo("gateway", "pubsub ws: writer goroutine started",
zap.String("topic", topic))
defer g.logger.ComponentInfo("gateway", "pubsub ws: writer goroutine exiting",
zap.String("topic", topic))
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case b, ok := <-msgs:
if !ok {
g.logger.ComponentWarn("gateway", "pubsub ws: message channel closed",
zap.String("topic", topic))
_ = conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(5*time.Second))
close(done)
return
}
g.logger.ComponentInfo("gateway", "pubsub ws: sending message to client",
zap.String("topic", topic),
zap.Int("data_len", len(b)))
// Format message as JSON envelope with data (base64 encoded), timestamp, and topic
// This matches the SDK's Message interface: {data: string, timestamp: number, topic: string}
envelope := map[string]interface{}{
@ -111,13 +135,27 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request)
}
envelopeJSON, err := json.Marshal(envelope)
if err != nil {
g.logger.ComponentWarn("gateway", "pubsub ws: failed to marshal envelope",
zap.String("topic", topic),
zap.Error(err))
continue
}
g.logger.ComponentDebug("gateway", "pubsub ws: envelope created",
zap.String("topic", topic),
zap.Int("envelope_len", len(envelopeJSON)))
conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
if err := conn.WriteMessage(websocket.TextMessage, envelopeJSON); err != nil {
g.logger.ComponentWarn("gateway", "pubsub ws: failed to write to websocket",
zap.String("topic", topic),
zap.Error(err))
close(done)
return
}
g.logger.ComponentInfo("gateway", "pubsub ws: message sent successfully",
zap.String("topic", topic))
case <-ticker.C:
// Ping keepalive
_ = conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(5*time.Second))
@ -128,6 +166,42 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request)
}
}()
// Subscribe to libp2p for cross-node messages (in background, non-blocking)
go func() {
h := func(_ string, data []byte) error {
g.logger.ComponentInfo("gateway", "pubsub ws: received message from libp2p",
zap.String("topic", topic),
zap.Int("data_len", len(data)))
select {
case msgs <- data:
g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client",
zap.String("topic", topic),
zap.String("source", "libp2p"))
return nil
default:
// Drop if client is slow to avoid blocking network
g.logger.ComponentWarn("gateway", "pubsub ws: client slow, dropping message",
zap.String("topic", topic))
return nil
}
}
if err := g.client.PubSub().Subscribe(ctx, topic, h); err != nil {
g.logger.ComponentWarn("gateway", "pubsub ws: libp2p subscribe failed (will use local-only)",
zap.String("topic", topic),
zap.Error(err))
return
}
g.logger.ComponentInfo("gateway", "pubsub ws: libp2p subscription established",
zap.String("topic", topic))
// Keep subscription alive until done
<-done
_ = g.client.PubSub().Unsubscribe(ctx, topic)
g.logger.ComponentInfo("gateway", "pubsub ws: libp2p subscription closed",
zap.String("topic", topic))
}()
// Reader loop: treat any client message as publish to the same topic
for {
mt, data, err := conn.ReadMessage()
@ -185,25 +259,55 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) {
return
}
g.logger.ComponentInfo("gateway", "pubsub publish: publishing message",
// NEW: Check for local websocket subscribers FIRST and deliver directly
g.mu.RLock()
localSubs := g.getLocalSubscribers(body.Topic, ns)
g.mu.RUnlock()
localDeliveryCount := 0
if len(localSubs) > 0 {
for _, sub := range localSubs {
select {
case sub.msgChan <- data:
localDeliveryCount++
g.logger.ComponentDebug("gateway", "delivered to local subscriber",
zap.String("topic", body.Topic))
default:
// Drop if buffer full
g.logger.ComponentWarn("gateway", "local subscriber buffer full, dropping message",
zap.String("topic", body.Topic))
}
}
}
g.logger.ComponentInfo("gateway", "pubsub publish: processing message",
zap.String("topic", body.Topic),
zap.String("namespace", ns),
zap.Int("data_len", len(data)))
zap.Int("data_len", len(data)),
zap.Int("local_subscribers", len(localSubs)),
zap.Int("local_delivered", localDeliveryCount))
// Apply namespace isolation
// Still publish to libp2p for cross-node delivery
ctx := pubsub.WithNamespace(client.WithInternalAuth(r.Context()), ns)
if err := g.client.PubSub().Publish(ctx, body.Topic, data); err != nil {
g.logger.ComponentWarn("gateway", "pubsub publish: failed",
// Log but don't fail - local delivery succeeded
g.logger.ComponentWarn("gateway", "libp2p publish failed (local delivery succeeded)",
zap.String("topic", body.Topic),
zap.Error(err))
// Still return OK since local delivery worked
if localDeliveryCount > 0 {
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "local_only": true})
return
}
// If no local delivery, return error
writeError(w, http.StatusInternalServerError, err.Error())
return
}
g.logger.ComponentInfo("gateway", "pubsub publish: message published successfully",
zap.String("topic", body.Topic))
zap.String("topic", body.Topic),
zap.String("delivery", "local+libp2p"))
// rely on libp2p to deliver to WS subscribers
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}