package gateway import ( "encoding/base64" "encoding/json" "net/http" "time" "git.debros.io/DeBros/network/pkg/storage" "github.com/gorilla/websocket" ) var wsUpgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, // For early development we accept any origin; tighten later. CheckOrigin: func(r *http.Request) bool { return true }, } // pubsubWebsocketHandler upgrades to WS, subscribes to a namespaced topic, and // forwards received PubSub messages to the client. Messages sent by the client // are published to the same namespaced topic. func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) { if g.client == nil { writeError(w, http.StatusServiceUnavailable, "client not initialized") return } if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } // Resolve namespace from auth context ns := resolveNamespaceFromRequest(r) if ns == "" { writeError(w, http.StatusForbidden, "namespace not resolved") return } topic := r.URL.Query().Get("topic") if topic == "" { writeError(w, http.StatusBadRequest, "missing 'topic'") return } fullTopic := namespacedTopic(ns, topic) conn, err := wsUpgrader.Upgrade(w, r, nil) if err != nil { return } defer conn.Close() // Channel to deliver PubSub messages to WS writer msgs := make(chan []byte, 128) ctx := r.Context() // Subscribe to the topic; push data into msgs h := func(_ string, data []byte) error { select { case msgs <- data: return nil default: // Drop if client is slow to avoid blocking network return nil } } if err := g.client.PubSub().Subscribe(ctx, fullTopic, h); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } defer func() { _ = g.client.PubSub().Unsubscribe(ctx, fullTopic) }() // Writer loop done := make(chan struct{}) go func() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case b, ok := <-msgs: if !ok { _ = conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(5*time.Second)) close(done) return } conn.SetWriteDeadline(time.Now().Add(30 * time.Second)) if err := conn.WriteMessage(websocket.BinaryMessage, b); err != nil { close(done) return } case <-ticker.C: // Ping keepalive _ = conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(5*time.Second)) case <-ctx.Done(): close(done) return } } }() // Reader loop: treat any client message as publish to the same topic for { mt, data, err := conn.ReadMessage() if err != nil { break } if mt != websocket.TextMessage && mt != websocket.BinaryMessage { continue } if err := g.client.PubSub().Publish(ctx, fullTopic, data); err != nil { // Best-effort notify client _ = conn.WriteMessage(websocket.TextMessage, []byte("publish_error")) } } <-done } // pubsubPublishHandler handles POST /v1/pubsub/publish {topic, data_base64} func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { if g.client == nil { writeError(w, http.StatusServiceUnavailable, "client not initialized") return } if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } ns := resolveNamespaceFromRequest(r) if ns == "" { writeError(w, http.StatusForbidden, "namespace not resolved") return } var body struct { Topic string `json:"topic"` DataB64 string `json:"data_base64"` } if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Topic == "" || body.DataB64 == "" { writeError(w, http.StatusBadRequest, "invalid body: expected {topic,data_base64}") return } data, err := base64.StdEncoding.DecodeString(body.DataB64) if err != nil { writeError(w, http.StatusBadRequest, "invalid base64 data") return } if err := g.client.PubSub().Publish(r.Context(), namespacedTopic(ns, body.Topic), data); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) } // pubsubTopicsHandler lists topics within the caller's namespace func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) { if g.client == nil { writeError(w, http.StatusServiceUnavailable, "client not initialized") return } ns := resolveNamespaceFromRequest(r) if ns == "" { writeError(w, http.StatusForbidden, "namespace not resolved") return } all, err := g.client.PubSub().ListTopics(r.Context()) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } prefix := namespacePrefix(ns) var filtered []string for _, t := range all { if len(t) >= len(prefix) && t[:len(prefix)] == prefix { filtered = append(filtered, t[len(prefix):]) } } writeJSON(w, http.StatusOK, map[string]any{"topics": filtered}) } // resolveNamespaceFromRequest gets namespace from context set by auth middleware func resolveNamespaceFromRequest(r *http.Request) string { if v := r.Context().Value(storage.CtxKeyNamespaceOverride); v != nil { if s, ok := v.(string); ok { return s } } return "" } func namespacePrefix(ns string) string { return "ns::" + ns + "::" } func namespacedTopic(ns, topic string) string { return namespacePrefix(ns) + topic }