network/pkg/gateway/pubsub_handlers.go
anonpenguin23 b983066016 refactor: rename DeBros to Orama and update configuration paths
- Replaced all instances of DeBros with Orama throughout the codebase, including CLI commands and configuration paths.
- Updated documentation to reflect the new naming convention and paths for configuration files.
- Removed the outdated PRODUCTION_INSTALL.md file and added new scripts for local domain setup and testing.
- Introduced a new interactive TUI installer for Orama Network, enhancing the installation experience.
- Improved logging and error handling across various components to provide clearer feedback during operations.
2025-11-26 15:36:11 +02:00

352 lines
11 KiB
Go

package gateway
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/pubsub"
"go.uber.org/zap"
"github.com/gorilla/websocket"
)
var wsUpgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// For early development we accept any origin; tighten later.
CheckOrigin: func(r *http.Request) bool { return true },
}
// pubsubWebsocketHandler upgrades to WS, subscribes to a namespaced topic, and
// forwards received PubSub messages to the client. Messages sent by the client
// are published to the same namespaced topic.
func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
g.logger.ComponentWarn("gateway", "pubsub ws: client not initialized")
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodGet {
g.logger.ComponentWarn("gateway", "pubsub ws: method not allowed")
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
// Resolve namespace from auth context
ns := resolveNamespaceFromRequest(r)
if ns == "" {
g.logger.ComponentWarn("gateway", "pubsub ws: namespace not resolved")
writeError(w, http.StatusForbidden, "namespace not resolved")
return
}
topic := r.URL.Query().Get("topic")
if topic == "" {
g.logger.ComponentWarn("gateway", "pubsub ws: missing topic")
writeError(w, http.StatusBadRequest, "missing 'topic'")
return
}
conn, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
g.logger.ComponentWarn("gateway", "pubsub ws: upgrade failed")
return
}
defer conn.Close()
// Channel to deliver PubSub messages to WS writer
msgs := make(chan []byte, 128)
// NEW: Register as local subscriber for direct message delivery
localSub := &localSubscriber{
msgChan: msgs,
namespace: ns,
}
topicKey := fmt.Sprintf("%s.%s", ns, topic)
g.mu.Lock()
g.localSubscribers[topicKey] = append(g.localSubscribers[topicKey], localSub)
subscriberCount := len(g.localSubscribers[topicKey])
g.mu.Unlock()
g.logger.ComponentInfo("gateway", "pubsub ws: registered local subscriber",
zap.String("topic", topic),
zap.String("namespace", ns),
zap.Int("total_subscribers", subscriberCount))
// Unregister on close
defer func() {
g.mu.Lock()
subs := g.localSubscribers[topicKey]
for i, sub := range subs {
if sub == localSub {
g.localSubscribers[topicKey] = append(subs[:i], subs[i+1:]...)
break
}
}
remainingCount := len(g.localSubscribers[topicKey])
if remainingCount == 0 {
delete(g.localSubscribers, topicKey)
}
g.mu.Unlock()
g.logger.ComponentInfo("gateway", "pubsub ws: unregistered local subscriber",
zap.String("topic", topic),
zap.Int("remaining_subscribers", remainingCount))
}()
// Use internal auth context when interacting with client to avoid circular auth requirements
ctx := client.WithInternalAuth(r.Context())
// Apply namespace isolation
ctx = pubsub.WithNamespace(ctx, ns)
// Writer loop - START THIS FIRST before libp2p subscription
done := make(chan struct{})
go func() {
g.logger.ComponentInfo("gateway", "pubsub ws: writer goroutine started",
zap.String("topic", topic))
defer g.logger.ComponentInfo("gateway", "pubsub ws: writer goroutine exiting",
zap.String("topic", topic))
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case b, ok := <-msgs:
if !ok {
g.logger.ComponentWarn("gateway", "pubsub ws: message channel closed",
zap.String("topic", topic))
_ = conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(5*time.Second))
close(done)
return
}
g.logger.ComponentInfo("gateway", "pubsub ws: sending message to client",
zap.String("topic", topic),
zap.Int("data_len", len(b)))
// Format message as JSON envelope with data (base64 encoded), timestamp, and topic
// This matches the SDK's Message interface: {data: string, timestamp: number, topic: string}
envelope := map[string]interface{}{
"data": base64.StdEncoding.EncodeToString(b),
"timestamp": time.Now().UnixMilli(),
"topic": topic,
}
envelopeJSON, err := json.Marshal(envelope)
if err != nil {
g.logger.ComponentWarn("gateway", "pubsub ws: failed to marshal envelope",
zap.String("topic", topic),
zap.Error(err))
continue
}
g.logger.ComponentDebug("gateway", "pubsub ws: envelope created",
zap.String("topic", topic),
zap.Int("envelope_len", len(envelopeJSON)))
conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
if err := conn.WriteMessage(websocket.TextMessage, envelopeJSON); err != nil {
g.logger.ComponentWarn("gateway", "pubsub ws: failed to write to websocket",
zap.String("topic", topic),
zap.Error(err))
close(done)
return
}
g.logger.ComponentInfo("gateway", "pubsub ws: message sent successfully",
zap.String("topic", topic))
case <-ticker.C:
// Ping keepalive
_ = conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(5*time.Second))
case <-ctx.Done():
close(done)
return
}
}
}()
// Subscribe to libp2p for cross-node messages (in background, non-blocking)
go func() {
h := func(_ string, data []byte) error {
g.logger.ComponentInfo("gateway", "pubsub ws: received message from libp2p",
zap.String("topic", topic),
zap.Int("data_len", len(data)))
select {
case msgs <- data:
g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client",
zap.String("topic", topic),
zap.String("source", "libp2p"))
return nil
default:
// Drop if client is slow to avoid blocking network
g.logger.ComponentWarn("gateway", "pubsub ws: client slow, dropping message",
zap.String("topic", topic))
return nil
}
}
if err := g.client.PubSub().Subscribe(ctx, topic, h); err != nil {
g.logger.ComponentWarn("gateway", "pubsub ws: libp2p subscribe failed (will use local-only)",
zap.String("topic", topic),
zap.Error(err))
return
}
g.logger.ComponentInfo("gateway", "pubsub ws: libp2p subscription established",
zap.String("topic", topic))
// Keep subscription alive until done
<-done
_ = g.client.PubSub().Unsubscribe(ctx, topic)
g.logger.ComponentInfo("gateway", "pubsub ws: libp2p subscription closed",
zap.String("topic", topic))
}()
// Reader loop: treat any client message as publish to the same topic
for {
mt, data, err := conn.ReadMessage()
if err != nil {
break
}
if mt != websocket.TextMessage && mt != websocket.BinaryMessage {
continue
}
// Filter out WebSocket heartbeat messages
// Don't publish them to the topic
var msg map[string]interface{}
if err := json.Unmarshal(data, &msg); err == nil {
if msgType, ok := msg["type"].(string); ok && msgType == "ping" {
g.logger.ComponentInfo("gateway", "pubsub ws: filtering out heartbeat ping")
continue
}
}
if err := g.client.PubSub().Publish(ctx, topic, data); err != nil {
// Best-effort notify client
_ = conn.WriteMessage(websocket.TextMessage, []byte("publish_error"))
}
}
<-done
}
// pubsubPublishHandler handles POST /v1/pubsub/publish {topic, data_base64}
func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
ns := resolveNamespaceFromRequest(r)
if ns == "" {
writeError(w, http.StatusForbidden, "namespace not resolved")
return
}
var body struct {
Topic string `json:"topic"`
DataB64 string `json:"data_base64"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Topic == "" || body.DataB64 == "" {
writeError(w, http.StatusBadRequest, "invalid body: expected {topic,data_base64}")
return
}
data, err := base64.StdEncoding.DecodeString(body.DataB64)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid base64 data")
return
}
// NEW: Check for local websocket subscribers FIRST and deliver directly
g.mu.RLock()
localSubs := g.getLocalSubscribers(body.Topic, ns)
g.mu.RUnlock()
localDeliveryCount := 0
if len(localSubs) > 0 {
for _, sub := range localSubs {
select {
case sub.msgChan <- data:
localDeliveryCount++
g.logger.ComponentDebug("gateway", "delivered to local subscriber",
zap.String("topic", body.Topic))
default:
// Drop if buffer full
g.logger.ComponentWarn("gateway", "local subscriber buffer full, dropping message",
zap.String("topic", body.Topic))
}
}
}
g.logger.ComponentInfo("gateway", "pubsub publish: processing message",
zap.String("topic", body.Topic),
zap.String("namespace", ns),
zap.Int("data_len", len(data)),
zap.Int("local_subscribers", len(localSubs)),
zap.Int("local_delivered", localDeliveryCount))
// Publish to libp2p asynchronously for cross-node delivery
// This prevents blocking the HTTP response if libp2p network is slow
go func() {
publishCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ctx := pubsub.WithNamespace(client.WithInternalAuth(publishCtx), ns)
if err := g.client.PubSub().Publish(ctx, body.Topic, data); err != nil {
g.logger.ComponentWarn("gateway", "async libp2p publish failed",
zap.String("topic", body.Topic),
zap.Error(err))
} else {
g.logger.ComponentDebug("gateway", "async libp2p publish succeeded",
zap.String("topic", body.Topic))
}
}()
// Return immediately after local delivery
// Local WebSocket subscribers already received the message
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}
// pubsubTopicsHandler lists topics within the caller's namespace
func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ns := resolveNamespaceFromRequest(r)
if ns == "" {
writeError(w, http.StatusForbidden, "namespace not resolved")
return
}
// Apply namespace isolation
ctx := pubsub.WithNamespace(client.WithInternalAuth(r.Context()), ns)
all, err := g.client.PubSub().ListTopics(ctx)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
// Client returns topics already trimmed to its namespace; return as-is
writeJSON(w, http.StatusOK, map[string]any{"topics": all})
}
// resolveNamespaceFromRequest gets namespace from context set by auth middleware
func resolveNamespaceFromRequest(r *http.Request) string {
if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok {
return s
}
}
return ""
}
func namespacePrefix(ns string) string {
return "ns::" + ns + "::"
}
func namespacedTopic(ns, topic string) string {
return namespacePrefix(ns) + topic
}