From df681be07fa8851e38a410e0eee79a08c5f47e5c Mon Sep 17 00:00:00 2001 From: anonpenguin Date: Sat, 23 Aug 2025 15:39:22 +0300 Subject: [PATCH] Refine authorization to distinguish API keys Add deduplication to pubsub websocket handler to drop recent duplicate messages within 2 seconds --- pkg/gateway/gateway.go | 3 ++- pkg/gateway/middleware.go | 28 +++++++++++++++++++--------- pkg/gateway/pubsub_handlers.go | 17 +++++++++++++++-- 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index cb1306e..134f12a 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -7,6 +7,7 @@ import ( "net/http" "strconv" "time" + "git.debros.io/DeBros/network/pkg/client" "git.debros.io/DeBros/network/pkg/logging" @@ -58,7 +59,7 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { ) logger.ComponentInfo(logging.ComponentGeneral, "Creating gateway instance...") - gw := &Gateway{ + gw := &Gateway{ logger: logger, cfg: cfg, client: c, diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index 6194d49..94fd78c 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -203,15 +203,25 @@ func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler { return } - // Identify actor from context - ownerType := "" - ownerID := "" - if v := ctx.Value(ctxKeyJWT); v != nil { - if claims, ok := v.(*jwtClaims); ok && claims != nil && strings.TrimSpace(claims.Sub) != "" { - ownerType = "wallet" - ownerID = strings.TrimSpace(claims.Sub) - } - } + // Identify actor from context + ownerType := "" + ownerID := "" + if v := ctx.Value(ctxKeyJWT); v != nil { + if claims, ok := v.(*jwtClaims); ok && claims != nil && strings.TrimSpace(claims.Sub) != "" { + // Determine subject type. + // If subject looks like an API key (e.g., ak_:), + // treat it as an API key owner; otherwise assume a wallet subject. + subj := strings.TrimSpace(claims.Sub) + lowerSubj := strings.ToLower(subj) + if strings.HasPrefix(lowerSubj, "ak_") || strings.Contains(subj, ":") { + ownerType = "api_key" + ownerID = subj + } else { + ownerType = "wallet" + ownerID = subj + } + } + } if ownerType == "" && ownerID == "" { if v := ctx.Value(ctxKeyAPIKey); v != nil { if s, ok := v.(string); ok && strings.TrimSpace(s) != "" { diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go index 2eb3682..7be5e8f 100644 --- a/pkg/gateway/pubsub_handlers.go +++ b/pkg/gateway/pubsub_handlers.go @@ -3,6 +3,8 @@ package gateway import ( "encoding/base64" "encoding/json" + "crypto/sha256" + "encoding/hex" "net/http" "time" @@ -58,8 +60,16 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) msgs := make(chan []byte, 128) // Use internal auth context when interacting with client to avoid circular auth requirements ctx := client.WithInternalAuth(r.Context()) - // Subscribe to the topic; push data into msgs - h := func(_ string, data []byte) error { + // Subscribe to the topic; push data into msgs with simple per-connection de-dup + recent := make(map[string]time.Time) + h := func(_ string, data []byte) error { + // Drop duplicates seen in the last 2 seconds + sum := sha256.Sum256(data) + key := hex.EncodeToString(sum[:]) + if t, ok := recent[key]; ok && time.Since(t) < 2*time.Second { + return nil + } + recent[key] = time.Now() select { case msgs <- data: return nil @@ -75,6 +85,8 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) } defer func() { _ = g.client.PubSub().Unsubscribe(ctx, topic) }() + // no extra fan-out; rely on libp2p subscription + // Writer loop done := make(chan struct{}) go func() { @@ -152,6 +164,7 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err.Error()) return } + // rely on libp2p to deliver to WS subscribers writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) }