From 910dbc5bf6313d194730f9c1849165ccb6865582 Mon Sep 17 00:00:00 2001 From: anonpenguin Date: Sat, 16 Aug 2025 16:29:54 +0300 Subject: [PATCH] feat: add namespaced pubsub API with websocket and HTTP endpoints --- Makefile | 3 +- pkg/gateway/middleware.go | 17 +-- pkg/gateway/pubsub_handlers.go | 194 +++++++++++++++++++++++++++++++++ pkg/gateway/routes.go | 5 + 4 files changed, 211 insertions(+), 8 deletions(-) create mode 100644 pkg/gateway/pubsub_handlers.go diff --git a/Makefile b/Makefile index b271031..6a418ce 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.19.0-beta +VERSION := 0.34.0-beta COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' @@ -14,6 +14,7 @@ build: deps @mkdir -p bin go build -ldflags "$(LDFLAGS)" -o bin/node ./cmd/node go build -ldflags "$(LDFLAGS)" -o bin/network-cli cmd/cli/main.go + go build -ldflags "$(LDFLAGS)" -o bin/gateway ./cmd/gateway @echo "Build complete! Run ./bin/network-cli version" # Clean build artifacts diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index e9ac542..53d356f 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -241,13 +241,16 @@ func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler { // requiresNamespaceOwnership returns true if the path should be guarded by // namespace ownership checks. func requiresNamespaceOwnership(p string) bool { - if p == "/storage" || p == "/v1/storage" || strings.HasPrefix(p, "/v1/storage/") { - return true - } - if p == "/v1/apps" || strings.HasPrefix(p, "/v1/apps/") { - return true - } - return false + if p == "/storage" || p == "/v1/storage" || strings.HasPrefix(p, "/v1/storage/") { + return true + } + if p == "/v1/apps" || strings.HasPrefix(p, "/v1/apps/") { + return true + } + if strings.HasPrefix(p, "/v1/pubsub") { + return true + } + return false } // corsMiddleware applies permissive CORS headers suitable for early development diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go new file mode 100644 index 0000000..4a27244 --- /dev/null +++ b/pkg/gateway/pubsub_handlers.go @@ -0,0 +1,194 @@ +package gateway + +import ( + "encoding/base64" + "encoding/json" + "net/http" + "time" + + "git.debros.io/DeBros/network/pkg/storage" + "github.com/gorilla/websocket" +) + +var wsUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + // For early development we accept any origin; tighten later. + CheckOrigin: func(r *http.Request) bool { return true }, +} + +// pubsubWebsocketHandler upgrades to WS, subscribes to a namespaced topic, and +// forwards received PubSub messages to the client. Messages sent by the client +// are published to the same namespaced topic. +func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) { + if g.client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + // Resolve namespace from auth context + ns := resolveNamespaceFromRequest(r) + if ns == "" { + writeError(w, http.StatusForbidden, "namespace not resolved") + return + } + + topic := r.URL.Query().Get("topic") + if topic == "" { + writeError(w, http.StatusBadRequest, "missing 'topic'") + return + } + fullTopic := namespacedTopic(ns, topic) + + conn, err := wsUpgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer conn.Close() + + // Channel to deliver PubSub messages to WS writer + msgs := make(chan []byte, 128) + ctx := r.Context() + // Subscribe to the topic; push data into msgs + h := func(_ string, data []byte) error { + select { + case msgs <- data: + return nil + default: + // Drop if client is slow to avoid blocking network + return nil + } + } + if err := g.client.PubSub().Subscribe(ctx, fullTopic, h); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + defer func() { _ = g.client.PubSub().Unsubscribe(ctx, fullTopic) }() + + // Writer loop + done := make(chan struct{}) + go func() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case b, ok := <-msgs: + if !ok { + _ = conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(5*time.Second)) + close(done) + return + } + conn.SetWriteDeadline(time.Now().Add(30 * time.Second)) + if err := conn.WriteMessage(websocket.BinaryMessage, b); err != nil { + close(done) + return + } + case <-ticker.C: + // Ping keepalive + _ = conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(5*time.Second)) + case <-ctx.Done(): + close(done) + return + } + } + }() + + // Reader loop: treat any client message as publish to the same topic + for { + mt, data, err := conn.ReadMessage() + if err != nil { + break + } + if mt != websocket.TextMessage && mt != websocket.BinaryMessage { + continue + } + if err := g.client.PubSub().Publish(ctx, fullTopic, data); err != nil { + // Best-effort notify client + _ = conn.WriteMessage(websocket.TextMessage, []byte("publish_error")) + } + } + <-done +} + +// pubsubPublishHandler handles POST /v1/pubsub/publish {topic, data_base64} +func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { + if g.client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + ns := resolveNamespaceFromRequest(r) + if ns == "" { + writeError(w, http.StatusForbidden, "namespace not resolved") + return + } + var body struct { + Topic string `json:"topic"` + DataB64 string `json:"data_base64"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Topic == "" || body.DataB64 == "" { + writeError(w, http.StatusBadRequest, "invalid body: expected {topic,data_base64}") + return + } + data, err := base64.StdEncoding.DecodeString(body.DataB64) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid base64 data") + return + } + if err := g.client.PubSub().Publish(r.Context(), namespacedTopic(ns, body.Topic), data); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) +} + +// pubsubTopicsHandler lists topics within the caller's namespace +func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) { + if g.client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + ns := resolveNamespaceFromRequest(r) + if ns == "" { + writeError(w, http.StatusForbidden, "namespace not resolved") + return + } + all, err := g.client.PubSub().ListTopics(r.Context()) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + prefix := namespacePrefix(ns) + var filtered []string + for _, t := range all { + if len(t) >= len(prefix) && t[:len(prefix)] == prefix { + filtered = append(filtered, t[len(prefix):]) + } + } + writeJSON(w, http.StatusOK, map[string]any{"topics": filtered}) +} + +// resolveNamespaceFromRequest gets namespace from context set by auth middleware +func resolveNamespaceFromRequest(r *http.Request) string { + if v := r.Context().Value(storage.CtxKeyNamespaceOverride); v != nil { + if s, ok := v.(string); ok { + return s + } + } + return "" +} + +func namespacePrefix(ns string) string { + return "ns::" + ns + "::" +} + +func namespacedTopic(ns, topic string) string { + return namespacePrefix(ns) + topic +} diff --git a/pkg/gateway/routes.go b/pkg/gateway/routes.go index abea1df..12227b0 100644 --- a/pkg/gateway/routes.go +++ b/pkg/gateway/routes.go @@ -41,5 +41,10 @@ func (g *Gateway) Routes() http.Handler { mux.HandleFunc("/v1/network/connect", g.networkConnectHandler) mux.HandleFunc("/v1/network/disconnect", g.networkDisconnectHandler) + // pubsub + mux.HandleFunc("/v1/pubsub/ws", g.pubsubWebsocketHandler) + mux.HandleFunc("/v1/pubsub/publish", g.pubsubPublishHandler) + mux.HandleFunc("/v1/pubsub/topics", g.pubsubTopicsHandler) + return g.withMiddleware(mux) }