mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-10-06 08:19:07 +00:00
Refine authorization to distinguish API keys
Add deduplication to pubsub websocket handler to drop recent duplicate messages within 2 seconds
This commit is contained in:
parent
3fe78ee62a
commit
df681be07f
@ -8,6 +8,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
||||||
"git.debros.io/DeBros/network/pkg/client"
|
"git.debros.io/DeBros/network/pkg/client"
|
||||||
"git.debros.io/DeBros/network/pkg/logging"
|
"git.debros.io/DeBros/network/pkg/logging"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -58,7 +59,7 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
|
|||||||
)
|
)
|
||||||
|
|
||||||
logger.ComponentInfo(logging.ComponentGeneral, "Creating gateway instance...")
|
logger.ComponentInfo(logging.ComponentGeneral, "Creating gateway instance...")
|
||||||
gw := &Gateway{
|
gw := &Gateway{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
client: c,
|
client: c,
|
||||||
|
@ -203,15 +203,25 @@ func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Identify actor from context
|
// Identify actor from context
|
||||||
ownerType := ""
|
ownerType := ""
|
||||||
ownerID := ""
|
ownerID := ""
|
||||||
if v := ctx.Value(ctxKeyJWT); v != nil {
|
if v := ctx.Value(ctxKeyJWT); v != nil {
|
||||||
if claims, ok := v.(*jwtClaims); ok && claims != nil && strings.TrimSpace(claims.Sub) != "" {
|
if claims, ok := v.(*jwtClaims); ok && claims != nil && strings.TrimSpace(claims.Sub) != "" {
|
||||||
ownerType = "wallet"
|
// Determine subject type.
|
||||||
ownerID = strings.TrimSpace(claims.Sub)
|
// If subject looks like an API key (e.g., ak_<random>:<namespace>),
|
||||||
}
|
// treat it as an API key owner; otherwise assume a wallet subject.
|
||||||
}
|
subj := strings.TrimSpace(claims.Sub)
|
||||||
|
lowerSubj := strings.ToLower(subj)
|
||||||
|
if strings.HasPrefix(lowerSubj, "ak_") || strings.Contains(subj, ":") {
|
||||||
|
ownerType = "api_key"
|
||||||
|
ownerID = subj
|
||||||
|
} else {
|
||||||
|
ownerType = "wallet"
|
||||||
|
ownerID = subj
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
if ownerType == "" && ownerID == "" {
|
if ownerType == "" && ownerID == "" {
|
||||||
if v := ctx.Value(ctxKeyAPIKey); v != nil {
|
if v := ctx.Value(ctxKeyAPIKey); v != nil {
|
||||||
if s, ok := v.(string); ok && strings.TrimSpace(s) != "" {
|
if s, ok := v.(string); ok && strings.TrimSpace(s) != "" {
|
||||||
|
@ -3,6 +3,8 @@ package gateway
|
|||||||
import (
|
import (
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -58,8 +60,16 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request)
|
|||||||
msgs := make(chan []byte, 128)
|
msgs := make(chan []byte, 128)
|
||||||
// Use internal auth context when interacting with client to avoid circular auth requirements
|
// Use internal auth context when interacting with client to avoid circular auth requirements
|
||||||
ctx := client.WithInternalAuth(r.Context())
|
ctx := client.WithInternalAuth(r.Context())
|
||||||
// Subscribe to the topic; push data into msgs
|
// Subscribe to the topic; push data into msgs with simple per-connection de-dup
|
||||||
h := func(_ string, data []byte) error {
|
recent := make(map[string]time.Time)
|
||||||
|
h := func(_ string, data []byte) error {
|
||||||
|
// Drop duplicates seen in the last 2 seconds
|
||||||
|
sum := sha256.Sum256(data)
|
||||||
|
key := hex.EncodeToString(sum[:])
|
||||||
|
if t, ok := recent[key]; ok && time.Since(t) < 2*time.Second {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
recent[key] = time.Now()
|
||||||
select {
|
select {
|
||||||
case msgs <- data:
|
case msgs <- data:
|
||||||
return nil
|
return nil
|
||||||
@ -75,6 +85,8 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request)
|
|||||||
}
|
}
|
||||||
defer func() { _ = g.client.PubSub().Unsubscribe(ctx, topic) }()
|
defer func() { _ = g.client.PubSub().Unsubscribe(ctx, topic) }()
|
||||||
|
|
||||||
|
// no extra fan-out; rely on libp2p subscription
|
||||||
|
|
||||||
// Writer loop
|
// Writer loop
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
@ -152,6 +164,7 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeError(w, http.StatusInternalServerError, err.Error())
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// rely on libp2p to deliver to WS subscribers
|
||||||
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
|
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user