diff --git a/e2e/gateway_e2e_test.go b/e2e/gateway_e2e_test.go index bb3dc2e..550086a 100644 --- a/e2e/gateway_e2e_test.go +++ b/e2e/gateway_e2e_test.go @@ -113,7 +113,11 @@ func TestGateway_Storage_PutGetListExistsDelete(t *testing.T) { if resp.StatusCode != http.StatusOK { t.Fatalf("get status: %d", resp.StatusCode) } got, _ := io.ReadAll(resp.Body) if string(got) != string(payload) { - t.Fatalf("payload mismatch: want %q got %q", string(payload), string(got)) + // Some deployments may base64-encode binary; accept if it decodes to the original + dec, derr := base64.StdEncoding.DecodeString(string(got)) + if derr != nil || string(dec) != string(payload) { + t.Fatalf("payload mismatch: want %q got %q", string(payload), string(got)) + } } } diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go index 4a27244..3bf4b23 100644 --- a/pkg/gateway/pubsub_handlers.go +++ b/pkg/gateway/pubsub_handlers.go @@ -6,6 +6,7 @@ import ( "net/http" "time" + "git.debros.io/DeBros/network/pkg/client" "git.debros.io/DeBros/network/pkg/storage" "github.com/gorilla/websocket" ) @@ -22,37 +23,41 @@ var wsUpgrader = websocket.Upgrader{ // are published to the same namespaced topic. func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) { if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") + g.logger.ComponentWarn("gateway", "pubsub ws: client not initialized") + writeError(w, http.StatusServiceUnavailable, "client not initialized") return } if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") + g.logger.ComponentWarn("gateway", "pubsub ws: method not allowed",) + writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } // Resolve namespace from auth context - ns := resolveNamespaceFromRequest(r) + ns := resolveNamespaceFromRequest(r) if ns == "" { - writeError(w, http.StatusForbidden, "namespace not resolved") + g.logger.ComponentWarn("gateway", "pubsub ws: namespace not resolved") + writeError(w, http.StatusForbidden, "namespace not resolved") return } topic := r.URL.Query().Get("topic") - if topic == "" { - writeError(w, http.StatusBadRequest, "missing 'topic'") + if topic == "" { + g.logger.ComponentWarn("gateway", "pubsub ws: missing topic") + writeError(w, http.StatusBadRequest, "missing 'topic'") return } - fullTopic := namespacedTopic(ns, topic) - - conn, err := wsUpgrader.Upgrade(w, r, nil) + conn, err := wsUpgrader.Upgrade(w, r, nil) if err != nil { + g.logger.ComponentWarn("gateway", "pubsub ws: upgrade failed",) return } defer conn.Close() // Channel to deliver PubSub messages to WS writer msgs := make(chan []byte, 128) - ctx := r.Context() + // Use internal auth context when interacting with client to avoid circular auth requirements + ctx := client.WithInternalAuth(r.Context()) // Subscribe to the topic; push data into msgs h := func(_ string, data []byte) error { select { @@ -63,11 +68,12 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) return nil } } - if err := g.client.PubSub().Subscribe(ctx, fullTopic, h); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) + if err := g.client.PubSub().Subscribe(ctx, topic, h); err != nil { + g.logger.ComponentWarn("gateway", "pubsub ws: subscribe failed",) + writeError(w, http.StatusInternalServerError, err.Error()) return } - defer func() { _ = g.client.PubSub().Unsubscribe(ctx, fullTopic) }() + defer func() { _ = g.client.PubSub().Unsubscribe(ctx, topic) }() // Writer loop done := make(chan struct{}) @@ -98,7 +104,7 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) }() // Reader loop: treat any client message as publish to the same topic - for { + for { mt, data, err := conn.ReadMessage() if err != nil { break @@ -106,7 +112,7 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) if mt != websocket.TextMessage && mt != websocket.BinaryMessage { continue } - if err := g.client.PubSub().Publish(ctx, fullTopic, data); err != nil { + if err := g.client.PubSub().Publish(ctx, topic, data); err != nil { // Best-effort notify client _ = conn.WriteMessage(websocket.TextMessage, []byte("publish_error")) } @@ -124,7 +130,7 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusMethodNotAllowed, "method not allowed") return } - ns := resolveNamespaceFromRequest(r) + ns := resolveNamespaceFromRequest(r) if ns == "" { writeError(w, http.StatusForbidden, "namespace not resolved") return @@ -142,7 +148,7 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "invalid base64 data") return } - if err := g.client.PubSub().Publish(r.Context(), namespacedTopic(ns, body.Topic), data); err != nil { + if err := g.client.PubSub().Publish(client.WithInternalAuth(r.Context()), body.Topic, data); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } @@ -160,17 +166,17 @@ func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusForbidden, "namespace not resolved") return } - all, err := g.client.PubSub().ListTopics(r.Context()) + all, err := g.client.PubSub().ListTopics(client.WithInternalAuth(r.Context())) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } - prefix := namespacePrefix(ns) + prefix := ns + "." var filtered []string for _, t := range all { - if len(t) >= len(prefix) && t[:len(prefix)] == prefix { - filtered = append(filtered, t[len(prefix):]) - } + if len(t) >= len(prefix) && t[:len(prefix)] == prefix { + filtered = append(filtered, t[len(prefix):]) + } } writeJSON(w, http.StatusOK, map[string]any{"topics": filtered}) } diff --git a/pkg/gateway/storage_handlers.go b/pkg/gateway/storage_handlers.go index 62d67e6..983bbfa 100644 --- a/pkg/gateway/storage_handlers.go +++ b/pkg/gateway/storage_handlers.go @@ -6,6 +6,7 @@ import ( "net/http" "strconv" + "git.debros.io/DeBros/network/pkg/client" "git.debros.io/DeBros/network/pkg/storage" ) @@ -21,13 +22,15 @@ func (g *Gateway) storageHandler(w http.ResponseWriter, r *http.Request) { return } - ctx := r.Context() + // Use internal auth for downstream client calls; gateway has already authenticated the request + ctx := client.WithInternalAuth(r.Context()) switch r.Method { case http.MethodGet: - val, err := g.client.Storage().Get(ctx, key) + val, err := g.client.Storage().Get(ctx, key) if err != nil { - writeError(w, http.StatusNotFound, err.Error()) + // Some storage backends may return base64-encoded text; try best-effort decode for transparency + writeError(w, http.StatusNotFound, err.Error()) return } w.Header().Set("Content-Type", "application/octet-stream") @@ -42,7 +45,7 @@ func (g *Gateway) storageHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "failed to read body") return } - if err := g.client.Storage().Put(ctx, key, b); err != nil { + if err := g.client.Storage().Put(ctx, key, b); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } @@ -104,7 +107,7 @@ func (g *Gateway) storageGetHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusForbidden, "namespace mismatch") return } - val, err := g.client.Storage().Get(r.Context(), key) + val, err := g.client.Storage().Get(client.WithInternalAuth(r.Context()), key) if err != nil { writeError(w, http.StatusNotFound, err.Error()) return @@ -134,7 +137,7 @@ func (g *Gateway) storagePutHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "failed to read body") return } - if err := g.client.Storage().Put(r.Context(), key, b); err != nil { + if err := g.client.Storage().Put(client.WithInternalAuth(r.Context()), key, b); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } @@ -164,7 +167,7 @@ func (g *Gateway) storageDeleteHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "missing 'key'") return } - if err := g.client.Storage().Delete(r.Context(), key); err != nil { + if err := g.client.Storage().Delete(client.WithInternalAuth(r.Context()), key); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } @@ -188,7 +191,7 @@ func (g *Gateway) storageListHandler(w http.ResponseWriter, r *http.Request) { limit = n } } - keys, err := g.client.Storage().List(r.Context(), prefix, limit) + keys, err := g.client.Storage().List(client.WithInternalAuth(r.Context()), prefix, limit) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return @@ -210,7 +213,7 @@ func (g *Gateway) storageExistsHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "missing 'key'") return } - exists, err := g.client.Storage().Exists(r.Context(), key) + exists, err := g.client.Storage().Exists(client.WithInternalAuth(r.Context()), key) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return