mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-10-06 10:19:07 +00:00
feat: add namespaced pubsub API with websocket and HTTP endpoints
This commit is contained in:
parent
5b0a6864f9
commit
910dbc5bf6
3
Makefile
3
Makefile
@ -3,7 +3,7 @@
|
|||||||
|
|
||||||
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
|
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
|
||||||
|
|
||||||
VERSION := 0.19.0-beta
|
VERSION := 0.34.0-beta
|
||||||
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
|
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
|
||||||
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
|
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
|
||||||
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
|
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
|
||||||
@ -14,6 +14,7 @@ build: deps
|
|||||||
@mkdir -p bin
|
@mkdir -p bin
|
||||||
go build -ldflags "$(LDFLAGS)" -o bin/node ./cmd/node
|
go build -ldflags "$(LDFLAGS)" -o bin/node ./cmd/node
|
||||||
go build -ldflags "$(LDFLAGS)" -o bin/network-cli cmd/cli/main.go
|
go build -ldflags "$(LDFLAGS)" -o bin/network-cli cmd/cli/main.go
|
||||||
|
go build -ldflags "$(LDFLAGS)" -o bin/gateway ./cmd/gateway
|
||||||
@echo "Build complete! Run ./bin/network-cli version"
|
@echo "Build complete! Run ./bin/network-cli version"
|
||||||
|
|
||||||
# Clean build artifacts
|
# Clean build artifacts
|
||||||
|
@ -247,6 +247,9 @@ func requiresNamespaceOwnership(p string) bool {
|
|||||||
if p == "/v1/apps" || strings.HasPrefix(p, "/v1/apps/") {
|
if p == "/v1/apps" || strings.HasPrefix(p, "/v1/apps/") {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
if strings.HasPrefix(p, "/v1/pubsub") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
194
pkg/gateway/pubsub_handlers.go
Normal file
194
pkg/gateway/pubsub_handlers.go
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
package gateway
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.debros.io/DeBros/network/pkg/storage"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
var wsUpgrader = websocket.Upgrader{
|
||||||
|
ReadBufferSize: 1024,
|
||||||
|
WriteBufferSize: 1024,
|
||||||
|
// For early development we accept any origin; tighten later.
|
||||||
|
CheckOrigin: func(r *http.Request) bool { return true },
|
||||||
|
}
|
||||||
|
|
||||||
|
// pubsubWebsocketHandler upgrades to WS, subscribes to a namespaced topic, and
|
||||||
|
// forwards received PubSub messages to the client. Messages sent by the client
|
||||||
|
// are published to the same namespaced topic.
|
||||||
|
func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if g.client == nil {
|
||||||
|
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if r.Method != http.MethodGet {
|
||||||
|
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve namespace from auth context
|
||||||
|
ns := resolveNamespaceFromRequest(r)
|
||||||
|
if ns == "" {
|
||||||
|
writeError(w, http.StatusForbidden, "namespace not resolved")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
topic := r.URL.Query().Get("topic")
|
||||||
|
if topic == "" {
|
||||||
|
writeError(w, http.StatusBadRequest, "missing 'topic'")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fullTopic := namespacedTopic(ns, topic)
|
||||||
|
|
||||||
|
conn, err := wsUpgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
// Channel to deliver PubSub messages to WS writer
|
||||||
|
msgs := make(chan []byte, 128)
|
||||||
|
ctx := r.Context()
|
||||||
|
// Subscribe to the topic; push data into msgs
|
||||||
|
h := func(_ string, data []byte) error {
|
||||||
|
select {
|
||||||
|
case msgs <- data:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
// Drop if client is slow to avoid blocking network
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := g.client.PubSub().Subscribe(ctx, fullTopic, h); err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() { _ = g.client.PubSub().Unsubscribe(ctx, fullTopic) }()
|
||||||
|
|
||||||
|
// Writer loop
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(30 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case b, ok := <-msgs:
|
||||||
|
if !ok {
|
||||||
|
_ = conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(5*time.Second))
|
||||||
|
close(done)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
|
||||||
|
if err := conn.WriteMessage(websocket.BinaryMessage, b); err != nil {
|
||||||
|
close(done)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-ticker.C:
|
||||||
|
// Ping keepalive
|
||||||
|
_ = conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(5*time.Second))
|
||||||
|
case <-ctx.Done():
|
||||||
|
close(done)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Reader loop: treat any client message as publish to the same topic
|
||||||
|
for {
|
||||||
|
mt, data, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if mt != websocket.TextMessage && mt != websocket.BinaryMessage {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := g.client.PubSub().Publish(ctx, fullTopic, data); err != nil {
|
||||||
|
// Best-effort notify client
|
||||||
|
_ = conn.WriteMessage(websocket.TextMessage, []byte("publish_error"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
// pubsubPublishHandler handles POST /v1/pubsub/publish {topic, data_base64}
|
||||||
|
func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if g.client == nil {
|
||||||
|
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ns := resolveNamespaceFromRequest(r)
|
||||||
|
if ns == "" {
|
||||||
|
writeError(w, http.StatusForbidden, "namespace not resolved")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var body struct {
|
||||||
|
Topic string `json:"topic"`
|
||||||
|
DataB64 string `json:"data_base64"`
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Topic == "" || body.DataB64 == "" {
|
||||||
|
writeError(w, http.StatusBadRequest, "invalid body: expected {topic,data_base64}")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
data, err := base64.StdEncoding.DecodeString(body.DataB64)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, "invalid base64 data")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := g.client.PubSub().Publish(r.Context(), namespacedTopic(ns, body.Topic), data); err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
|
||||||
|
}
|
||||||
|
|
||||||
|
// pubsubTopicsHandler lists topics within the caller's namespace
|
||||||
|
func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if g.client == nil {
|
||||||
|
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ns := resolveNamespaceFromRequest(r)
|
||||||
|
if ns == "" {
|
||||||
|
writeError(w, http.StatusForbidden, "namespace not resolved")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
all, err := g.client.PubSub().ListTopics(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
prefix := namespacePrefix(ns)
|
||||||
|
var filtered []string
|
||||||
|
for _, t := range all {
|
||||||
|
if len(t) >= len(prefix) && t[:len(prefix)] == prefix {
|
||||||
|
filtered = append(filtered, t[len(prefix):])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"topics": filtered})
|
||||||
|
}
|
||||||
|
|
||||||
|
// resolveNamespaceFromRequest gets namespace from context set by auth middleware
|
||||||
|
func resolveNamespaceFromRequest(r *http.Request) string {
|
||||||
|
if v := r.Context().Value(storage.CtxKeyNamespaceOverride); v != nil {
|
||||||
|
if s, ok := v.(string); ok {
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func namespacePrefix(ns string) string {
|
||||||
|
return "ns::" + ns + "::"
|
||||||
|
}
|
||||||
|
|
||||||
|
func namespacedTopic(ns, topic string) string {
|
||||||
|
return namespacePrefix(ns) + topic
|
||||||
|
}
|
@ -41,5 +41,10 @@ func (g *Gateway) Routes() http.Handler {
|
|||||||
mux.HandleFunc("/v1/network/connect", g.networkConnectHandler)
|
mux.HandleFunc("/v1/network/connect", g.networkConnectHandler)
|
||||||
mux.HandleFunc("/v1/network/disconnect", g.networkDisconnectHandler)
|
mux.HandleFunc("/v1/network/disconnect", g.networkDisconnectHandler)
|
||||||
|
|
||||||
|
// pubsub
|
||||||
|
mux.HandleFunc("/v1/pubsub/ws", g.pubsubWebsocketHandler)
|
||||||
|
mux.HandleFunc("/v1/pubsub/publish", g.pubsubPublishHandler)
|
||||||
|
mux.HandleFunc("/v1/pubsub/topics", g.pubsubTopicsHandler)
|
||||||
|
|
||||||
return g.withMiddleware(mux)
|
return g.withMiddleware(mux)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user