Compare commits

..

1 Commits

Author SHA1 Message Date
anonpenguin
8b8f0a4251
Merge pull request #93 from DeBrosDAO/nightly
release: 0.122.47 — nightly → main
2026-06-11 17:37:20 +03:00
25 changed files with 97 additions and 1225 deletions

View File

@ -1 +1 @@
0.122.51
0.122.47

View File

@ -1,15 +0,0 @@
-- =============================================================================
-- 031_refresh_token_custom_claims.sql
--
-- Carry the additive JWT custom claims (e.g. the namespace's account_id from
-- the auth-claims-provider hook, bugboard #548/#920) ALONGSIDE the refresh
-- token, so a rotated access token keeps the same claims without re-invoking
-- the namespace's claims-provider function on every 15-min refresh (the
-- refresh path is the latency-critical VoIP-wake path, bugboard #125).
--
-- Resolved once at /v1/auth/verify mint time, stored here, and replayed +
-- propagated across each rotation. NULL/absent = no custom claims (the default
-- for every namespace without a claims-provider) → fully backward compatible.
-- =============================================================================
ALTER TABLE refresh_tokens ADD COLUMN custom_claims TEXT;

View File

@ -43,10 +43,9 @@ type AuthService interface {
// Verifies signature, expiration, and issuer.
ParseAndVerifyJWT(token string) (*JWTClaims, error)
// GenerateJWT creates a new signed JWT with the specified subject, TTL, and
// optional additive custom claims (nil = none; bugboard #548).
// GenerateJWT creates a new signed JWT with the specified claims and TTL.
// Returns: token, expirationUnix, error.
GenerateJWT(namespace, subject string, ttl time.Duration, custom map[string]string) (string, int64, error)
GenerateJWT(namespace, subject string, ttl time.Duration) (string, int64, error)
// RegisterApp registers a new client application with the gateway.
// Returns an application ID that can be used for OAuth flows.

View File

@ -4,7 +4,6 @@ import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
)
// sha256Hex returns the lowercase hex-encoded SHA-256 hash of the input string.
@ -23,34 +22,3 @@ func HmacSHA256Hex(data, secret string) string {
mac.Write([]byte(data))
return hex.EncodeToString(mac.Sum(nil))
}
// marshalClaims encodes additive JWT custom claims as a JSON object string so
// they can be persisted next to a refresh token (bugboard #548).
//
// A nil or empty map — and, defensively, any encoding failure — produces ""
// so that "no claims" is stored as an empty value and later decodes to nil.
func marshalClaims(m map[string]string) string {
	if len(m) > 0 {
		if encoded, err := json.Marshal(m); err == nil {
			return string(encoded)
		}
	}
	return ""
}
// unmarshalClaims decodes a claims string produced by marshalClaims back into
// a map. It is deliberately fail-soft: an empty string, malformed JSON, or a
// JSON value that decodes to zero entries all yield nil, so a corrupt stored
// blob can never break token rotation — the token simply rotates without
// custom claims.
func unmarshalClaims(s string) map[string]string {
	var decoded map[string]string
	if s == "" || json.Unmarshal([]byte(s), &decoded) != nil || len(decoded) == 0 {
		return nil
	}
	return decoded
}

View File

@ -182,22 +182,15 @@ func (s *Service) ParseAndVerifyJWT(token string) (*JWTClaims, error) {
return &claims, nil
}
// GenerateJWT mints a signed access token. `custom` carries additive
// app-defined claims (e.g. the namespace's account_id from the claims-provider
// hook, bugboard #548) under the top-level "custom" object — read back via
// JWTClaims.Custom / oh.GetCallerClaim. Pass nil for none. Reserved claims
// (sub/iss/aud/iat/nbf/exp/namespace) are always gateway-controlled and cannot
// be overridden by `custom` (the caller is responsible for not putting
// reserved keys here; the claims-provider path sanitizes them out upstream).
func (s *Service) GenerateJWT(ns, subject string, ttl time.Duration, custom map[string]string) (string, int64, error) {
func (s *Service) GenerateJWT(ns, subject string, ttl time.Duration) (string, int64, error) {
// Prefer EdDSA when available
if s.preferEdDSA && s.edSigningKey != nil {
return s.generateEdDSAJWT(ns, subject, ttl, custom)
return s.generateEdDSAJWT(ns, subject, ttl)
}
return s.generateRSAJWT(ns, subject, ttl, custom)
return s.generateRSAJWT(ns, subject, ttl)
}
func (s *Service) generateEdDSAJWT(ns, subject string, ttl time.Duration, custom map[string]string) (string, int64, error) {
func (s *Service) generateEdDSAJWT(ns, subject string, ttl time.Duration) (string, int64, error) {
if s.edSigningKey == nil {
return "", 0, errors.New("EdDSA signing key unavailable")
}
@ -218,9 +211,6 @@ func (s *Service) generateEdDSAJWT(ns, subject string, ttl time.Duration, custom
"exp": exp.Unix(),
"namespace": ns,
}
if len(custom) > 0 {
payload["custom"] = custom
}
pb, _ := json.Marshal(payload)
hb64 := base64.RawURLEncoding.EncodeToString(hb)
pb64 := base64.RawURLEncoding.EncodeToString(pb)
@ -230,7 +220,7 @@ func (s *Service) generateEdDSAJWT(ns, subject string, ttl time.Duration, custom
return signingInput + "." + sb64, exp.Unix(), nil
}
func (s *Service) generateRSAJWT(ns, subject string, ttl time.Duration, custom map[string]string) (string, int64, error) {
func (s *Service) generateRSAJWT(ns, subject string, ttl time.Duration) (string, int64, error) {
if s.signingKey == nil {
return "", 0, errors.New("signing key unavailable")
}
@ -251,9 +241,6 @@ func (s *Service) generateRSAJWT(ns, subject string, ttl time.Duration, custom m
"exp": exp.Unix(),
"namespace": ns,
}
if len(custom) > 0 {
payload["custom"] = custom
}
pb, _ := json.Marshal(payload)
hb64 := base64.RawURLEncoding.EncodeToString(hb)
pb64 := base64.RawURLEncoding.EncodeToString(pb)

View File

@ -31,15 +31,8 @@ type rotationMockORMDB struct {
client.DatabaseClient
mu sync.Mutex
subjectByToken map[string]string // hashedToken -> subject (nil/missing = "invalid")
claimsByToken map[string]string // hashedToken -> custom_claims JSON (bugboard #548)
inserted int // count of INSERTs (new refresh-token rows)
subjects map[string]string // subject -> last hashed token inserted
// selectErrRemaining: number of upcoming "SELECT subject" calls that
// should return selectErr (simulates a transient rqlite leader outage).
// Decremented per matching call; 0 = serve normally (bugboard #125).
selectErr error
selectErrRemaining int
selectAttemptsTaken int
}
func (m *rotationMockORMDB) Query(_ context.Context, sql string, args ...interface{}) (*client.QueryResult, error) {
@ -52,23 +45,14 @@ func (m *rotationMockORMDB) Query(_ context.Context, sql string, args ...interfa
if containsCI(sql, "SELECT id FROM namespaces") {
return &client.QueryResult{Count: 1, Rows: [][]interface{}{{int64(1)}}}, nil
}
// SELECT subject (+ custom_claims, bugboard #548) for the lookup.
if containsCI(sql, "SELECT subject") && containsCI(sql, "FROM refresh_tokens") {
m.selectAttemptsTaken++
if m.selectErrRemaining > 0 {
m.selectErrRemaining--
return nil, m.selectErr
}
// SELECT subject for the refresh-token lookup.
if containsCI(sql, "SELECT subject FROM refresh_tokens") {
if len(args) < 2 {
return &client.QueryResult{Count: 0}, nil
}
hashedTok, _ := args[1].(string)
if subj, ok := m.subjectByToken[hashedTok]; ok && subj != "" {
claims := ""
if m.claimsByToken != nil {
claims = m.claimsByToken[hashedTok]
}
return &client.QueryResult{Count: 1, Rows: [][]interface{}{{subj, claims}}}, nil
return &client.QueryResult{Count: 1, Rows: [][]interface{}{{subj}}}, nil
}
return &client.QueryResult{Count: 0}, nil
}
@ -87,14 +71,6 @@ func (m *rotationMockORMDB) Query(_ context.Context, sql string, args ...interfa
m.subjectByToken = map[string]string{}
}
m.subjectByToken[hashedTok] = subj
// custom_claims is the LAST arg (bugboard #548) — capture it so
// rotation-propagation tests can assert it carries forward.
if m.claimsByToken == nil {
m.claimsByToken = map[string]string{}
}
if cc, ok := args[len(args)-1].(string); ok {
m.claimsByToken[hashedTok] = cc
}
}
return &client.QueryResult{Count: 1}, nil
}
@ -393,120 +369,3 @@ func TestRefreshToken_RotatedTokenReplayFails(t *testing.T) {
t.Fatal("expected error reusing rotated token, got nil")
}
}
// Bugboard #125: when the refresh-token lookup keeps failing with an rqlite
// error (leader briefly unavailable during a rolling restart), RefreshToken
// must report ErrRefreshTransient (→ 503, retryable) and never "invalid or
// expired" (→ 401, full SIWE re-auth — impossible on a locked device answering
// a VoIP-woken call).
func TestRefreshToken_transientSelectError_returnsTransient(t *testing.T) {
	const refresh = "valid-but-leader-down"

	svc, db, _ := newRotationTestService(t)
	db.subjectByToken[sha256Hex(refresh)] = "0xWALLET"
	// Queue up enough failures that every lookup attempt in the retry window errors.
	db.selectErrRemaining = 99
	db.selectErr = errors.New("rqlite: leadership lost")

	if _, _, _, _, err := svc.RefreshToken(context.Background(), refresh, "anchat-test"); !errors.Is(err, ErrRefreshTransient) {
		t.Fatalf("err = %v, want ErrRefreshTransient (a valid token must not 401 during a leader outage)", err)
	}
}
// Because the lookup is retried, a short blip that clears before the final
// attempt must be invisible to the client: the same RefreshToken call still
// succeeds and returns a complete result.
func TestRefreshToken_selectRecoversAfterRetry(t *testing.T) {
	const refresh = "valid-blips-then-ok"

	svc, db, _ := newRotationTestService(t)
	db.subjectByToken[sha256Hex(refresh)] = "0xWALLET"
	db.selectErrRemaining = refreshSelectRetries - 1 // fail all but the last attempt
	db.selectErr = errors.New("rqlite: leadership lost")

	access, rotated, subject, _, err := svc.RefreshToken(context.Background(), refresh, "anchat-test")
	if err != nil {
		t.Fatalf("RefreshToken should recover after transient blips: %v", err)
	}
	if complete := access != "" && rotated != "" && subject == "0xWALLET"; !complete {
		t.Errorf("recovered refresh incomplete: access=%q newRefresh=%q subj=%q", access, rotated, subject)
	}
}
// An error from the CAS write that revokes the old token is likewise a
// retryable condition (ErrRefreshTransient), not a 401.
func TestRefreshToken_transientUpdateError_returnsTransient(t *testing.T) {
	const refresh = "valid-cas-write-down"

	svc, db, rq := newRotationTestService(t)
	db.subjectByToken[sha256Hex(refresh)] = "0xWALLET"
	// Make the next exec (the revoke) fail once.
	rq.execErrNext = []error{errors.New("rqlite: write failed, no leader")}

	if _, _, _, _, err := svc.RefreshToken(context.Background(), refresh, "anchat-test"); !errors.Is(err, ErrRefreshTransient) {
		t.Fatalf("err = %v, want ErrRefreshTransient on a transient CAS write error", err)
	}
}
// A token that was never issued must stay a hard invalid (401) and must NOT be
// disguised as a transient failure — keeping those two outcomes apart is the
// whole point of the #125 fix.
func TestRefreshToken_unknownToken_isNotTransient(t *testing.T) {
	svc, _, _ := newRotationTestService(t)

	_, _, _, _, err := svc.RefreshToken(context.Background(), "never-existed", "anchat-test")
	switch {
	case err == nil:
		t.Fatal("expected error for unknown token")
	case errors.Is(err, ErrRefreshTransient):
		t.Errorf("unknown token must be a genuine invalid (401), not transient (503): %v", err)
	}
}
// mockClaimsResolver is a canned claims-provider used by the mint tests: it
// holds one preset claims map and hands it back on every call.
type mockClaimsResolver struct {
	claims map[string]string
}

// ResolveClaims ignores the context, wallet and namespace and returns the
// preset claims map unchanged (nil when none was configured).
func (r mockClaimsResolver) ResolveClaims(_ context.Context, _, _ string) map[string]string {
	return r.claims
}
// Bugboard #548: claims resolved at IssueTokens (login) must be stored with the
// refresh token AND replayed into the rotated access token — so account_id
// survives the 15-min refresh without re-invoking the provider.
//
// The test walks three steps: the login mint, a first rotation, and a second
// rotation, checking both the stored refresh rows and the minted access tokens.
func TestRefreshToken_propagatesCustomClaims(t *testing.T) {
	s, ormDB, _ := newRotationTestService(t)
	s.SetClaimsResolver(mockClaimsResolver{claims: map[string]string{"account_id": "u-999"}})

	// Login mint — IssueTokens resolves + stores the claims with the refresh row.
	_, refresh, _, err := s.IssueTokens(context.Background(), "0xWALLET", "anchat-test")
	if err != nil {
		t.Fatalf("IssueTokens: %v", err)
	}
	if got := ormDB.claimsByToken[sha256Hex(refresh)]; got != `{"account_id":"u-999"}` {
		t.Fatalf("claims not stored with refresh token; got %q", got)
	}

	// Refresh — the rotated access token must carry account_id, and the NEW
	// refresh row must propagate the stored claims.
	access, newRefresh, _, _, err := s.RefreshToken(context.Background(), refresh, "anchat-test")
	if err != nil {
		t.Fatalf("RefreshToken: %v", err)
	}
	claims, err := s.ParseAndVerifyJWT(access)
	if err != nil {
		// Label the failure with the function actually called so the log
		// points at the right place (and matches the second-hop message).
		t.Fatalf("ParseAndVerifyJWT: %v", err)
	}
	if claims.Custom["account_id"] != "u-999" {
		t.Errorf("rotated access token lost account_id; custom=%v", claims.Custom)
	}
	if got := ormDB.claimsByToken[sha256Hex(newRefresh)]; got != `{"account_id":"u-999"}` {
		t.Errorf("rotation did not propagate claims to the new row; got %q", got)
	}

	// Second rotation hop (N+1 → N+2): the claim must survive repeated
	// rotations, not just the first — the propagation is the whole point.
	access2, _, _, _, err := s.RefreshToken(context.Background(), newRefresh, "anchat-test")
	if err != nil {
		t.Fatalf("second RefreshToken: %v", err)
	}
	claims2, err := s.ParseAndVerifyJWT(access2)
	if err != nil {
		t.Fatalf("ParseAndVerifyJWT (2nd): %v", err)
	}
	if claims2.Custom["account_id"] != "u-999" {
		t.Errorf("account_id lost across the second rotation; custom=%v", claims2.Custom)
	}
}

View File

@ -35,8 +35,7 @@ type Service struct {
edKeyID string
preferEdDSA bool
defaultNS string
apiKeyHMACSecret string // HMAC secret for hashing API keys before storage
claimsResolver ClaimsResolver // namespace claims-provider hook (bugboard #548); nil = none
apiKeyHMACSecret string // HMAC secret for hashing API keys before storage
}
func NewService(logger *logging.ColoredLogger, orm client.NetworkClient, signingKeyPEM string, defaultNS string) (*Service, error) {
@ -85,28 +84,6 @@ func (s *Service) SetRqliteClient(db rqlite.Client) {
s.db = db
}
// ClaimsResolver resolves additive, namespace-defined JWT custom claims for an
// authenticated wallet at token-mint time (bugboard #548/#920). The concrete
// implementation invokes the namespace's reserved `auth-claims-provider`
// serverless function; it MUST be fail-open (return nil, never error) so a
// missing/slow/broken provider never breaks authentication. Injected via
// SetClaimsResolver; nil = no custom claims (every namespace's default).
type ClaimsResolver interface {
ResolveClaims(ctx context.Context, wallet, namespace string) map[string]string
}
// SetClaimsResolver installs the namespace claims-provider hook consulted at
// token-mint time. It simply stores r on the service, replacing any resolver
// set earlier.
func (s *Service) SetClaimsResolver(r ClaimsResolver) {
	s.claimsResolver = r
}
// resolveCustomClaims asks the configured resolver for the namespace's
// additive claims for this wallet. With no resolver installed it returns nil.
// The resolver interface has no error return, so this path cannot fail.
func (s *Service) resolveCustomClaims(ctx context.Context, wallet, namespace string) map[string]string {
	if resolver := s.claimsResolver; resolver != nil {
		return resolver.ResolveClaims(ctx, wallet, namespace)
	}
	return nil
}
// ErrRotationNotConfigured is returned by RefreshToken when the service
// wasn't given an rqlite client — refusing to rotate without atomicity
// guarantees is safer than rotating non-atomically.
@ -247,13 +224,8 @@ func (s *Service) IssueTokens(ctx context.Context, wallet, namespace string) (st
return "", "", 0, fmt.Errorf("signing key unavailable")
}
// Resolve namespace-defined additive claims (bugboard #548) ONCE at mint
// time. Stored with the refresh token below and replayed across rotations
// so the 15-min refresh path never re-invokes the provider.
custom := s.resolveCustomClaims(ctx, wallet, namespace)
// Issue access token (15m)
token, expUnix, err := s.GenerateJWT(namespace, wallet, 15*time.Minute, custom)
token, expUnix, err := s.GenerateJWT(namespace, wallet, 15*time.Minute)
if err != nil {
return "", "", 0, fmt.Errorf("failed to generate JWT: %w", err)
}
@ -274,8 +246,8 @@ func (s *Service) IssueTokens(ctx context.Context, wallet, namespace string) (st
db := s.orm.Database()
hashedRefresh := sha256Hex(refresh)
if _, err := db.Query(internalCtx,
"INSERT INTO refresh_tokens(namespace_id, subject, token, audience, expires_at, custom_claims) VALUES (?, ?, ?, ?, datetime('now', '+30 days'), ?)",
nsID, wallet, hashedRefresh, "gateway", marshalClaims(custom),
"INSERT INTO refresh_tokens(namespace_id, subject, token, audience, expires_at) VALUES (?, ?, ?, ?, datetime('now', '+30 days'))",
nsID, wallet, hashedRefresh, "gateway",
); err != nil {
return "", "", 0, fmt.Errorf("failed to store refresh token: %w", err)
}
@ -293,27 +265,6 @@ func (s *Service) IssueTokens(ctx context.Context, wallet, namespace string) (st
// This is the tripwire promised by RFC 9700 §4.12 (refresh-token rotation).
var ErrRefreshTokenReplay = fmt.Errorf("refresh token already rotated or invalid")
// ErrRefreshTransient is returned when refresh-token rotation fails for a
// RETRYABLE reason — an rqlite-layer error rather than a genuine bad/expired
// token. Bugboard #125: during a rolling gateway restart the rqlite leader is
// briefly unavailable (re-election window), so the lookup/rotation errors;
// collapsing that into "invalid token" forces a 401 → full SIWE re-auth, which
// is impossible on a locked device answering a VoIP-woken call. Callers MUST
// surface this as a retryable 503, NOT a 401, so the client retries within the
// ring window instead of tearing down the session.
var ErrRefreshTransient = fmt.Errorf("refresh token rotation temporarily unavailable")
const (
// refreshSelectRetries bounds how many times the refresh lookup is retried
// when the rqlite read errors (transient leader unavailability). The read
// is idempotent and happens BEFORE any write, so retrying is safe.
refreshSelectRetries = 3
// refreshSelectRetryDelay is the backoff between lookup retries. Three
// tries × 250ms rides out a brief leader re-election without adding
// meaningful latency to the common (healthy-leader) path.
refreshSelectRetryDelay = 250 * time.Millisecond
)
// RefreshToken validates the supplied refresh token, atomically rotates it
// (revokes the old, mints a new), and returns a fresh access token alongside
// the rotated refresh token.
@ -358,61 +309,22 @@ func (s *Service) RefreshToken(ctx context.Context, refreshToken, namespace stri
nsID, err := s.ResolveNamespaceID(ctx, namespace)
if err != nil {
// Bugboard #125: namespace resolution runs an rqlite query BEFORE the
// token lookup, so a leader re-election during a rolling restart fails
// here too. Treat it as retryable (→ 503), not a bad token (→ 401) —
// the refresh-path namespace comes from an already-authenticated
// session, so a resolution failure is a transient DB error, never the
// client's fault.
s.logger.ComponentWarn(logging.ComponentGeneral,
"refresh namespace resolution failed (transient, surfacing retryable)",
zap.String("namespace", namespace),
zap.Error(err))
return "", "", "", 0, ErrRefreshTransient
return "", "", "", 0, err
}
hashedRefresh := sha256Hex(refreshToken)
// Step 1: read the subject. Tells us who the token belongs to AND
// validates that it's currently usable (not revoked, not expired).
//
// Bugboard #125: distinguish a TRANSIENT rqlite error (leader briefly
// unavailable during a rolling restart) from a GENUINE token miss. The
// read is idempotent and pre-write, so we retry it a few times; only after
// exhausting retries do we surface ErrRefreshTransient (→ 503, client
// retries). An actual empty result (Count == 0) is a real bad/expired
// token → "invalid or expired" (→ 401). Collapsing the two used to 401 a
// valid session during every restart, defeating the VoIP-wake refresh.
selectQ := `SELECT subject, custom_claims FROM refresh_tokens
selectQ := `SELECT subject FROM refresh_tokens
WHERE namespace_id = ? AND token = ?
AND revoked_at IS NULL
AND (expires_at IS NULL OR expires_at > datetime('now'))
LIMIT 1`
var res *client.QueryResult
var selErr error
for attempt := 0; attempt < refreshSelectRetries; attempt++ {
res, selErr = ormDB.Query(internalCtx, selectQ, nsID, hashedRefresh)
if selErr == nil && res != nil {
break
}
if attempt < refreshSelectRetries-1 {
time.Sleep(refreshSelectRetryDelay)
}
}
if selErr != nil || res == nil {
// rqlite error persisted across retries — leader likely mid-election.
// Retryable, NOT an invalid token.
s.logger.ComponentWarn(logging.ComponentGeneral,
"refresh token lookup failed (transient rqlite error, surfacing retryable)",
zap.String("namespace", namespace),
zap.Error(selErr))
return "", "", "", 0, ErrRefreshTransient
}
if res.Count == 0 {
// Genuinely not found / revoked / expired — a real bad token.
res, err := ormDB.Query(internalCtx, selectQ, nsID, hashedRefresh)
if err != nil || res == nil || res.Count == 0 {
return "", "", "", 0, fmt.Errorf("invalid or expired refresh token")
}
var customClaimsJSON string
if len(res.Rows) > 0 && len(res.Rows[0]) > 0 {
if val, ok := res.Rows[0][0].(string); ok {
subject = val
@ -420,15 +332,7 @@ func (s *Service) RefreshToken(ctx context.Context, refreshToken, namespace stri
b, _ := json.Marshal(res.Rows[0][0])
_ = json.Unmarshal(b, &subject)
}
// custom_claims (bugboard #548) — resolved once at login, replayed on
// every rotation so the refresh path never re-invokes the provider.
if len(res.Rows[0]) > 1 {
if cc, ok := res.Rows[0][1].(string); ok {
customClaimsJSON = cc
}
}
}
custom := unmarshalClaims(customClaimsJSON)
// Step 2: atomic CAS — revoke the old row. RowsAffected is the lock.
// Two concurrent calls with the same refresh token: exactly one wins
@ -439,13 +343,7 @@ func (s *Service) RefreshToken(ctx context.Context, refreshToken, namespace stri
WHERE namespace_id = ? AND token = ? AND revoked_at IS NULL`,
nsID, hashedRefresh)
if err != nil {
// rqlite write error (leader unavailable) — retryable, not a bad
// token. No row was revoked, so a client retry is safe (bugboard #125).
s.logger.ComponentWarn(logging.ComponentGeneral,
"refresh token revoke failed (transient rqlite error, surfacing retryable)",
zap.String("namespace", namespace),
zap.Error(err))
return "", "", "", 0, ErrRefreshTransient
return "", "", "", 0, fmt.Errorf("revoke old refresh token: %w", err)
}
affected, _ := updRes.RowsAffected()
if affected == 0 {
@ -461,9 +359,8 @@ func (s *Service) RefreshToken(ctx context.Context, refreshToken, namespace stri
return "", "", "", 0, ErrRefreshTokenReplay
}
// Step 3: mint the new access JWT, carrying forward the stored custom
// claims so a rotated token keeps the same account_id etc. (bugboard #548).
accessToken, expUnix, err = s.GenerateJWT(namespace, subject, 15*time.Minute, custom)
// Step 3: mint the new access JWT.
accessToken, expUnix, err = s.GenerateJWT(namespace, subject, 15*time.Minute)
if err != nil {
return "", "", "", 0, fmt.Errorf("generate access token: %w", err)
}
@ -479,23 +376,10 @@ func (s *Service) RefreshToken(ctx context.Context, refreshToken, namespace stri
}
newRefreshToken = base64.RawURLEncoding.EncodeToString(rbuf)
hashedNew := sha256Hex(newRefreshToken)
// Re-marshal from the parsed map (not the raw stored string) so the new
// row and the freshly-minted access token are provably consistent and
// self-healing — a malformed stored blob converges to "" on both sides
// rather than being propagated forward verbatim. custom_claims is written
// ONLY here and in IssueTokens, both from a sanitized map (bugboard #548).
if _, err := ormDB.Query(internalCtx,
"INSERT INTO refresh_tokens(namespace_id, subject, token, audience, expires_at, custom_claims) VALUES (?, ?, ?, ?, datetime('now', '+30 days'), ?)",
nsID, subject, hashedNew, "gateway", marshalClaims(custom)); err != nil {
// The old token is already revoked (step 2). A retryable error here
// leaves the client to re-attempt — which will re-auth since the old
// token is gone — but that's strictly better than masking a transient
// failure as a permanent 401 (bugboard #125). Surface retryable.
s.logger.ComponentWarn(logging.ComponentGeneral,
"refresh token store failed after revoke (transient rqlite error)",
zap.String("namespace", namespace),
zap.Error(err))
return "", "", "", 0, ErrRefreshTransient
"INSERT INTO refresh_tokens(namespace_id, subject, token, audience, expires_at) VALUES (?, ?, ?, ?, datetime('now', '+30 days'))",
nsID, subject, hashedNew, "gateway"); err != nil {
return "", "", "", 0, fmt.Errorf("store rotated refresh token: %w", err)
}
return accessToken, newRefreshToken, subject, expUnix, nil

View File

@ -112,7 +112,7 @@ func TestJWTFlow(t *testing.T) {
sub := "0x1234567890abcdef1234567890abcdef12345678"
ttl := 15 * time.Minute
token, exp, err := s.GenerateJWT(ns, sub, ttl, nil)
token, exp, err := s.GenerateJWT(ns, sub, ttl)
if err != nil {
t.Fatalf("GenerateJWT failed: %v", err)
}
@ -192,7 +192,7 @@ func TestEdDSAJWTFlow(t *testing.T) {
ttl := 15 * time.Minute
// With EdDSA preferred, GenerateJWT should produce an EdDSA token
token, exp, err := s.GenerateJWT(ns, sub, ttl, nil)
token, exp, err := s.GenerateJWT(ns, sub, ttl)
if err != nil {
t.Fatalf("GenerateJWT (EdDSA) failed: %v", err)
}
@ -233,7 +233,7 @@ func TestRS256BackwardCompat(t *testing.T) {
// Generate an RS256 token directly (simulating a legacy token)
s.preferEdDSA = false
token, _, err := s.GenerateJWT("test-ns", "user1", 15*time.Minute, nil)
token, _, err := s.GenerateJWT("test-ns", "user1", 15*time.Minute)
if err != nil {
t.Fatalf("GenerateJWT (RS256) failed: %v", err)
}
@ -447,7 +447,7 @@ func TestEdDSACrossServiceVerify(t *testing.T) {
const wantSub = "BNbN2RNQTsYrrywZCLnhV9j3hd38jwcRqfxBecZX7hDE"
const wantNS = "anchat-test"
token, _, err := signer.GenerateJWT(wantNS, wantSub, 15*time.Minute, nil)
token, _, err := signer.GenerateJWT(wantNS, wantSub, 15*time.Minute)
if err != nil {
t.Fatalf("signer.GenerateJWT: %v", err)
}
@ -478,7 +478,7 @@ func TestEdDSACrossServiceVerify_differentKeysFail(t *testing.T) {
_, verKey, _ := ed25519.GenerateKey(rand.Reader)
verifier.SetEdDSAKey(verKey)
token, _, err := signer.GenerateJWT("ns", "sub", 15*time.Minute, nil)
token, _, err := signer.GenerateJWT("ns", "sub", 15*time.Minute)
if err != nil {
t.Fatalf("GenerateJWT: %v", err)
}

View File

@ -1,178 +0,0 @@
package gateway
import (
"context"
"encoding/json"
"errors"
"sort"
"sync"
"time"
"github.com/DeBrosOfficial/network/pkg/serverless"
"github.com/DeBrosOfficial/network/pkg/serverless/registry"
"go.uber.org/zap"
)
// Claims-provider hook (bugboard #548/#920).
//
// A namespace opts into additive, signed JWT claims by deploying a serverless
// function with the RESERVED name "auth-claims-provider". At /v1/auth/verify
// mint time the gateway invokes it (in the namespace's own context, so it can
// read the namespace's tables) with {"wallet","namespace"} and merges the
// string→string object it returns into the JWT's custom claims — e.g.
// {"account_id":"<users.user_id>"} so push devices key on the stable account
// identity rather than the authenticating wallet.
//
// Hard guarantees:
// - FAIL-OPEN: a missing / slow / erroring / malformed provider yields NO
// claims; authentication never breaks because a claims function is down.
// - Reserved claims (sub/iss/aud/iat/nbf/exp/namespace/custom) can never be
// set by the provider — the gateway controls those.
// - Bounded: timeout, max claim count, max total size.
const (
// claimsProviderFnName is the reserved function name a namespace deploys to
// inject additive JWT claims at mint time.
claimsProviderFnName = "auth-claims-provider"
// claimsProviderTimeout bounds the provider invocation so a slow/hung
// function never stalls the auth path past this budget (fail-open after).
claimsProviderTimeout = 2 * time.Second
// maxCustomClaims / maxCustomClaimsBytes cap what a provider may inject —
// JWTs ride in headers, and an unbounded claim blob is a DoS / cost vector.
maxCustomClaims = 16
maxCustomClaimsBytes = 4096
// claimsProviderWarnInterval rate-limits the fail-open WARN so a broken
// provider doesn't flood the log on every login.
claimsProviderWarnInterval = 30 * time.Second
)
// reservedClaimKeys can never be injected by a namespace claims provider; the
// gateway owns these. A provider that returns any of them has them dropped.
var reservedClaimKeys = map[string]struct{}{
"sub": {}, "iss": {}, "aud": {}, "iat": {},
"nbf": {}, "exp": {}, "namespace": {}, "custom": {},
}
// jwtClaimsProvider implements auth.ClaimsResolver by invoking the namespace's
// reserved auth-claims-provider function.
type jwtClaimsProvider struct {
invoker *serverless.Invoker
logger *zap.Logger
mu sync.Mutex
lastWarnUTC time.Time
}
// newJWTClaimsProvider constructs the claims resolver around the given
// serverless invoker. A nil invoker is accepted and disables the hook
// (ResolveClaims then returns nil). A nil logger is replaced with a no-op
// logger; the logger in use is scoped under the "claims-provider" name.
func newJWTClaimsProvider(invoker *serverless.Invoker, logger *zap.Logger) *jwtClaimsProvider {
	base := logger
	if base == nil {
		base = zap.NewNop()
	}
	provider := &jwtClaimsProvider{invoker: invoker}
	provider.logger = base.Named("claims-provider")
	return provider
}
// ResolveClaims calls the namespace's reserved auth-claims-provider function
// with {"wallet","namespace"} as input and returns the sanitized additive
// claims it produced, or nil. It never returns an error (fail-open contract):
// a disabled hook, empty wallet/namespace, invoke failure, or non-success
// status all result in nil. The invocation is bounded by claimsProviderTimeout.
func (p *jwtClaimsProvider) ResolveClaims(ctx context.Context, wallet, namespace string) map[string]string {
	if p.invoker == nil || wallet == "" || namespace == "" {
		return nil
	}

	payload, err := json.Marshal(map[string]string{"wallet": wallet, "namespace": namespace})
	if err != nil {
		return nil
	}

	boundedCtx, stop := context.WithTimeout(ctx, claimsProviderTimeout)
	defer stop()

	request := &serverless.InvokeRequest{
		Namespace:    namespace,
		FunctionName: claimsProviderFnName,
		Input:        payload,
		// Gateway-initiated, no end-user caller → system trigger skips the
		// per-caller authorization check.
		TriggerType: serverless.TriggerTypeInternal,
	}
	resp, err := p.invoker.Invoke(boundedCtx, request)

	switch {
	case err != nil || resp == nil:
		// A registry miss just means the namespace has not deployed the
		// function — the normal no-claims case for most namespaces — so stay
		// silent. Any other failure is a real problem worth a rate-limited WARN.
		if !errors.Is(err, registry.ErrFunctionNotFound) {
			p.warnRateLimited("claims provider invoke failed (minting without custom claims)",
				namespace, err)
		}
		return nil
	case resp.Status != serverless.InvocationStatusSuccess:
		p.warnRateLimited("claims provider returned non-success (minting without custom claims)",
			namespace, nil)
		return nil
	}

	return sanitizeProviderClaims(resp.Output)
}
// sanitizeProviderClaims turns the provider's RAW output — expected to be a
// bare JSON object of additive claims, NOT an {ok,result} Ack envelope (per
// the #976 contract) — into a safe string→string map.
//
// Only string values are kept, keys listed in reservedClaimKeys are dropped,
// at most maxCustomClaims entries are returned, and the summed key+value
// length is held to maxCustomClaimsBytes. Empty or oversized input, any parse
// failure, or an empty result yields nil (fail-open).
func sanitizeProviderClaims(raw []byte) map[string]string {
	if n := len(raw); n == 0 || n > maxCustomClaimsBytes {
		return nil
	}

	var decoded map[string]any
	if json.Unmarshal(raw, &decoded) != nil || len(decoded) == 0 {
		return nil
	}

	// Walk the keys in sorted order: Go map iteration is randomized, and an
	// over-budget payload must truncate to the SAME subset on every login.
	names := make([]string, 0, len(decoded))
	for name := range decoded {
		names = append(names, name)
	}
	sort.Strings(names)

	claims := make(map[string]string, len(decoded))
	usedBytes := 0
	for i := 0; i < len(names) && len(claims) < maxCustomClaims; i++ {
		name := names[i]
		_, isReserved := reservedClaimKeys[name]
		value, isString := decoded[name].(string) // string→string contract
		if isReserved || !isString {
			continue
		}
		// Stop at the first claim that would push the total past the budget.
		if usedBytes += len(name) + len(value); usedBytes > maxCustomClaimsBytes {
			break
		}
		claims[name] = value
	}

	if len(claims) == 0 {
		return nil
	}
	return claims
}
// warnRateLimited logs msg at WARN level with the namespace and provider
// function name (plus err when non-nil), but at most once per
// claimsProviderWarnInterval. Calls that land inside the interval are dropped.
// The mutex guards only the timestamp check/update; logging happens after it
// is released.
func (p *jwtClaimsProvider) warnRateLimited(msg, namespace string, err error) {
	p.mu.Lock()
	now := time.Now()
	throttled := now.Sub(p.lastWarnUTC) < claimsProviderWarnInterval
	if !throttled {
		p.lastWarnUTC = now
	}
	p.mu.Unlock()
	if throttled {
		return
	}

	fields := make([]zap.Field, 0, 3)
	fields = append(fields,
		zap.String("namespace", namespace),
		zap.String("function", claimsProviderFnName),
	)
	if err != nil {
		fields = append(fields, zap.Error(err))
	}
	p.logger.Warn(msg, fields...)
}

View File

@ -1,98 +0,0 @@
package gateway
import (
"testing"
)
// Bugboard #548: the claims-provider sanitizer is the security boundary —
// a namespace function must NOT be able to forge reserved claims, inject
// non-string values, or blow the size budget.

// A well-formed object of string claims passes through unchanged.
func TestSanitizeProviderClaims_happyPath(t *testing.T) {
	got := sanitizeProviderClaims([]byte(`{"account_id":"u-123","tier":"pro"}`))
	want := map[string]string{"account_id": "u-123", "tier": "pro"}
	for key, value := range want {
		if got[key] != value {
			t.Fatalf("expected additive claims, got %v", got)
		}
	}
}
// TestSanitizeProviderClaims_dropsReservedKeys feeds the sanitizer a payload
// in which a malicious provider tries to override sub/exp/namespace. Those
// keys must be absent from the result while the legitimate claim survives.
func TestSanitizeProviderClaims_dropsReservedKeys(t *testing.T) {
	payload := []byte(`{"sub":"0xATTACKER","exp":"9999999999","namespace":"evil","account_id":"u-1"}`)
	claims := sanitizeProviderClaims(payload)

	forbidden := [...]string{"sub", "exp", "namespace"}
	for i := range forbidden {
		key := forbidden[i]
		if _, leaked := claims[key]; leaked {
			t.Errorf("reserved key %q must be dropped, got %v", key, claims)
		}
	}

	if got := claims["account_id"]; got != "u-1" {
		t.Errorf("legitimate claim dropped: %v", claims)
	}
}
// TestSanitizeProviderClaims_nonStringValuesDropped verifies the
// string→string contract: numbers, objects and arrays are discarded and only
// the two string-valued claims remain.
func TestSanitizeProviderClaims_nonStringValuesDropped(t *testing.T) {
	payload := []byte(`{"account_id":"u-1","num":5,"obj":{"a":1},"arr":[1],"ok":"yes"}`)
	claims := sanitizeProviderClaims(payload)

	onlyStrings := len(claims) == 2 &&
		claims["account_id"] == "u-1" &&
		claims["ok"] == "yes"
	if !onlyStrings {
		t.Errorf("non-string values must be dropped; got %v", claims)
	}
}
// TestSanitizeProviderClaims_failOpenOnGarbage asserts that every malformed
// or wrongly-shaped provider output collapses to nil claims (fail-open)
// instead of producing partial or surprising results.
func TestSanitizeProviderClaims_failOpenOnGarbage(t *testing.T) {
	garbage := [][]byte{
		nil,
		[]byte(``),
		[]byte(`not json`),
		[]byte(`[1,2,3]`),         // array, not object
		[]byte(`"just a string"`), // scalar
		[]byte(`{}`),              // empty object
		[]byte(`{"ok":true,"result":{"account_id":"u"}}`), // Ack envelope (wrong shape) → no top-level string claims
	}

	for i := range garbage {
		input := garbage[i]
		got := sanitizeProviderClaims(input)
		if got != nil {
			t.Errorf("garbage %q must yield nil (fail-open), got %v", input, got)
		}
	}
}
// TestSanitizeProviderClaims_countAndSizeCapped exercises both budgets of the
// sanitizer: the number of returned claims never exceeds maxCustomClaims, and
// a raw payload longer than maxCustomClaimsBytes is rejected outright.
func TestSanitizeProviderClaims_countAndSizeCapped(t *testing.T) {
	// Build {"k0":"v","k1":"v",...} with way more than maxCustomClaims
	// string entries.
	entries := maxCustomClaims + 20
	payload := []byte{'{'}
	for i := 0; i < entries; i++ {
		if i != 0 {
			payload = append(payload, ',')
		}
		payload = append(payload, `"k`+itoa(i)+`":"v"`...)
	}
	payload = append(payload, '}')

	if claims := sanitizeProviderClaims(payload); len(claims) > maxCustomClaims {
		t.Errorf("claim count not capped: got %d, max %d", len(claims), maxCustomClaims)
	}

	// Oversized total payload → rejected outright.
	oversized := make([]byte, maxCustomClaimsBytes+10)
	for i := 0; i < len(oversized); i++ {
		oversized[i] = 'a'
	}
	if got := sanitizeProviderClaims(oversized); got != nil {
		t.Errorf("oversized payload must be rejected, got %v", got)
	}
}
// TestResolveClaims_nilInvokerOrEmptyArgs checks that a provider constructed
// without an invoker is inert: ResolveClaims must return nil claims.
func TestResolveClaims_nilInvokerOrEmptyArgs(t *testing.T) {
	// nil invoker disables the hook
	provider := newJWTClaimsProvider(nil, nil)

	claims := provider.ResolveClaims(nil, "0xW", "ns")
	if claims != nil {
		t.Errorf("nil invoker must yield nil claims, got %v", claims)
	}
}
// itoa returns the base-10 representation of n. It is a tiny dependency-free
// stand-in for strconv.Itoa used by the tests in this file.
//
// Negative values are rendered with a leading '-' (the previous version
// silently returned "" for them), and digits are written right-to-left into
// a fixed buffer instead of re-allocating a slice for every prepended digit.
func itoa(n int) string {
	if n == 0 {
		return "0"
	}

	negative := n < 0
	// Work on the unsigned magnitude so the most negative int does not
	// overflow when negated.
	magnitude := uint(n)
	if negative {
		magnitude = -magnitude
	}

	// 20 bytes fit the longest 64-bit value: sign plus 19 digits.
	var buf [20]byte
	pos := len(buf)
	for magnitude > 0 {
		pos--
		buf[pos] = byte('0' + magnitude%10)
		magnitude /= 10
	}
	if negative {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}

View File

@ -648,14 +648,6 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe
authService.SetRqliteClient(deps.ORMClient)
}
// Wire the namespace claims-provider hook (bugboard #548): at JWT mint time
// the auth service invokes the namespace's reserved `auth-claims-provider`
// function (if deployed) and merges its additive claims (e.g. account_id)
// into the token. Fail-open — a missing/slow provider never breaks auth.
if deps.ServerlessInvoker != nil {
authService.SetClaimsResolver(newJWTClaimsProvider(deps.ServerlessInvoker, logger.Logger))
}
// Load or create EdDSA key for new JWT tokens. Bug #215 fix: when
// cfg.ClusterSecret is set, the key is derived deterministically from
// it via HKDF, so every gateway in the cluster shares the same Ed25519

View File

@ -393,32 +393,6 @@ func TestRefreshHandler_NilAuthService(t *testing.T) {
}
}
// TestRefreshHandler_TransientError_returns503Retryable guards bugboard #125.
// A refresh failure that is NOT a bad token (here ErrRotationNotConfigured,
// produced by a service built without an rqlite client) has to be reported as
// a RETRYABLE 503 carrying a Retry-After header. A 401 would instead push a
// locked device into an impossible SIWE re-auth mid-call-ring.
func TestRefreshHandler_TransientError_returns503Retryable(t *testing.T) {
	svc, err := authsvc.NewService(testLogger(), nil, "", "default")
	if err != nil {
		t.Fatalf("failed to create auth service: %v", err)
	}
	handlers := NewHandlers(testLogger(), svc, nil, "default", noopInternalAuth)

	payload, _ := json.Marshal(RefreshRequest{RefreshToken: "some-valid-looking-token"})
	request := httptest.NewRequest(http.MethodPost, "/v1/auth/refresh", bytes.NewReader(payload))
	request.Header.Set("Content-Type", "application/json")
	recorder := httptest.NewRecorder()

	handlers.RefreshHandler(recorder, request)

	if status := recorder.Code; status != http.StatusServiceUnavailable {
		t.Fatalf("transient refresh failure must be 503, got %d", status)
	}
	if retryAfter := recorder.Header().Get("Retry-After"); retryAfter == "" {
		t.Error("503 refresh response should carry a Retry-After header")
	}
}
// --- APIKeyToJWTHandler tests ---------------------------------------------
func TestAPIKeyToJWTHandler_MissingKey(t *testing.T) {

View File

@ -2,7 +2,6 @@ package auth
import (
"encoding/json"
"errors"
"net/http"
"strings"
"time"
@ -58,7 +57,7 @@ func (h *Handlers) APIKeyToJWTHandler(w http.ResponseWriter, r *http.Request) {
return
}
token, expUnix, err := h.authService.GenerateJWT(ns, key, 15*time.Minute, nil)
token, expUnix, err := h.authService.GenerateJWT(ns, key, 15*time.Minute)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
@ -104,20 +103,11 @@ func (h *Handlers) RefreshHandler(w http.ResponseWriter, r *http.Request) {
// the SDK persists it (bug #239 fix) and uses it on the next refresh.
token, newRefreshToken, subject, expUnix, err := h.authService.RefreshToken(r.Context(), req.RefreshToken, req.Namespace)
if err != nil {
// Bugboard #125: a TRANSIENT rotation failure (rqlite leader briefly
// unavailable during a rolling restart) must surface as a retryable
// 503 — NOT a 401 — so the client retries within the call-ring window
// instead of tearing the session down to a full SIWE re-auth, which is
// impossible on a locked device answering a VoIP-woken call.
if errors.Is(err, authsvc.ErrRefreshTransient) || errors.Is(err, authsvc.ErrRotationNotConfigured) {
w.Header().Set("Retry-After", "1")
writeError(w, http.StatusServiceUnavailable, "refresh temporarily unavailable, retry")
return
}
// Genuine bad/expired/replayed token. The service emits a WARN log on
// replay (ErrRefreshTokenReplay) so the operator can investigate. We
// surface a generic 401 regardless — leaking "your token was already
// used" would help an attacker confirm a stolen token was rotated.
// The service emits a WARN log on replay (ErrRefreshTokenReplay)
// so the operator can investigate. We surface a generic 401 here
// regardless — leaking "your token was already used" to the
// caller would help an attacker confirm a stolen token has been
// rotated.
writeError(w, http.StatusUnauthorized, "invalid or expired refresh token")
return
}

View File

@ -157,24 +157,6 @@ func (h *ServerlessHandlers) getJWTSubjectFromRequest(r *http.Request) string {
return strings.TrimSpace(claims.Sub)
}
// getJWTExpiryFromRequest reads the `exp` claim (unix seconds) of the JWT
// stored on the request context under ctxkeys.JWT.
//
// It returns 0 when the context carries no value under that key, when the
// value is not a *auth.JWTClaims, or when the claims pointer is nil — e.g.
// API-key auth. A token whose Exp field is unset also yields 0.
//
// Persistent WS connections capture this at upgrade to enforce mid-session
// expiry: a long-lived socket must stop serving RPCs once its authorizing
// token expires, unless refreshed via the #321 auth.refresh control frame.
// Bugboard #868.
func (h *ServerlessHandlers) getJWTExpiryFromRequest(r *http.Request) int64 {
	// The comma-ok assertion also covers the "no value on the context" case:
	// asserting on a nil interface reports ok == false.
	if claims, ok := r.Context().Value(ctxkeys.JWT).(*auth.JWTClaims); ok && claims != nil {
		return claims.Exp
	}
	return 0
}
// getWalletFromRequest extracts wallet address from JWT.
func (h *ServerlessHandlers) getWalletFromRequest(r *http.Request) string {
// Import strings package functions inline to avoid circular dependencies

View File

@ -1,152 +0,0 @@
package serverless
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/DeBrosOfficial/network/pkg/gateway/auth"
"github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys"
)
// TestWSJWTExpired is the central security regression guard for bugboard
// #868. A persistent WS authenticates ONCE at upgrade, so the read loop must
// stop serving application frames once the authorizing JWT is past exp+grace.
//
// Should wsJWTExpired ever report false for a clearly-expired token (or true
// for a still-valid one), an expired token would regain full RPC access —
// including turn.credentials minting — for the socket's lifetime.
func TestWSJWTExpired(t *testing.T) {
	const grace = 120 * time.Second

	// Fixed reference instant keeps the table deterministic (production uses
	// time.Now() in the read loop; the pure function takes `now` for tests).
	ref := time.Unix(1_700_000_000, 0)
	// expAt yields an exp timestamp at the given offset from ref.
	expAt := func(offset time.Duration) int64 { return ref.Add(offset).Unix() }

	table := []struct {
		name    string
		expUnix int64
		want    bool
	}{
		{"no expiry to enforce (API-key auth, exp=0) never expires", 0, false},
		{"negative exp treated as no-expiry (defensive)", -5, false},
		{"token valid, well before exp", expAt(10 * time.Minute), false},
		{"token just past exp but inside grace window — still allowed", expAt(-30 * time.Second), false},
		{"token exactly at exp+grace boundary — not yet expired (After is strict)", expAt(-grace), false},
		{"token past exp+grace — expired, must reject", expAt(-(grace + time.Second)), true},
		{"token long expired — expired", expAt(-24 * time.Hour), true},
	}

	for _, row := range table {
		row := row
		t.Run(row.name, func(t *testing.T) {
			if got := wsJWTExpired(row.expUnix, ref, grace); got != row.want {
				t.Errorf("wsJWTExpired(exp=%d, now=%d, grace=%s) = %v; want %v",
					row.expUnix, ref.Unix(), grace, got, row.want)
			}
		})
	}
}
// TestGetJWTExpiryFromRequest checks that the gateway picks the authorizing
// JWT's exp off the request context at upgrade. The read loop enforces that
// value for the socket's lifetime (#868); a silent 0 for a JWT-authenticated
// request would switch expiry enforcement off and re-open the bug.
func TestGetJWTExpiryFromRequest(t *testing.T) {
	h := newTestHandlers(nil)

	// bareRequest builds a request with nothing stored under ctxkeys.JWT.
	bareRequest := func() *http.Request {
		return httptest.NewRequest(http.MethodGet, "/", nil)
	}
	// requestWithClaims attaches claims (possibly a nil pointer) to the
	// request context under ctxkeys.JWT.
	requestWithClaims := func(claims *auth.JWTClaims) *http.Request {
		req := bareRequest()
		return req.WithContext(context.WithValue(req.Context(), ctxkeys.JWT, claims))
	}

	t.Run("JWT with exp returns exp", func(t *testing.T) {
		req := requestWithClaims(&auth.JWTClaims{Sub: "alice", Exp: 1_700_000_123})
		got := h.getJWTExpiryFromRequest(req)
		if got != 1_700_000_123 {
			t.Errorf("getJWTExpiryFromRequest = %d; want 1700000123", got)
		}
	})

	t.Run("no JWT on context returns 0 (API-key / unauthenticated)", func(t *testing.T) {
		got := h.getJWTExpiryFromRequest(bareRequest())
		if got != 0 {
			t.Errorf("getJWTExpiryFromRequest = %d; want 0", got)
		}
	})

	t.Run("nil claims under key returns 0", func(t *testing.T) {
		var missing *auth.JWTClaims
		got := h.getJWTExpiryFromRequest(requestWithClaims(missing))
		if got != 0 {
			t.Errorf("getJWTExpiryFromRequest = %d; want 0", got)
		}
	})
}
// TestWSAuthState_refreshExtendsExpiry pins the auth.refresh contract the
// read loop depends on (#868 + #321): a successful auth.refresh advances the
// enforced expiry to the new token's exp, so a socket that refreshes before
// its grace window closes keeps serving RPCs without interruption.
//
// Only the state transition is asserted here (the full handler needs a live
// WS conn for the ack write; integration tests cover that path). Invariant:
// once refreshed, a `now` that WOULD have expired the old token no longer
// expires the socket.
func TestWSAuthState_refreshExtendsExpiry(t *testing.T) {
	const grace = 120 * time.Second
	ref := time.Unix(1_700_000_000, 0)

	// Start from a token that is already past exp+grace → expired.
	staleExp := ref.Add(-(grace + time.Minute)).Unix()
	socket := &wsAuthState{expUnix: staleExp}
	if expired := wsJWTExpired(socket.expUnix, ref, grace); !expired {
		t.Fatalf("precondition: old token should be expired at now")
	}

	// Mirror what handleAuthRefresh does on success: adopt the new token's
	// exp.
	socket.expUnix = ref.Add(15 * time.Minute).Unix()

	if expired := wsJWTExpired(socket.expUnix, ref, grace); expired {
		t.Errorf("after refresh the socket must NOT be expired (exp=%d, now=%d)",
			socket.expUnix, ref.Unix())
	}
}

View File

@ -22,51 +22,6 @@ import (
// application traffic that goes straight to WASM. Bugboard #321.
var oramaControlFramePrefix = []byte(`"__orama"`)
// Settings for JWT-expiry enforcement on persistent WebSocket connections
// (bugboard #868): how long past `exp` a socket may keep serving frames, and
// the close code used when it is torn down for expiry.
const (
// wsJWTExpiryGrace is the slack past a JWT's `exp` before the gateway
// stops serving application frames on a persistent WS. It covers clock
// skew between the gateway and the issuing path plus the client's
// refresh round-trip (the #321 auth.refresh control frame). Bugboard
// #868: without this, a socket authenticated ONCE at upgrade keeps full
// RPC access — including turn.credentials minting — for the socket's
// entire lifetime even after the token expires.
//
// Note: on the auth.refresh path ParseAndVerifyJWT independently allows
// its own ±60s exp skew, so worst-case service-past-exp is this grace
// plus that skew (~180s), not 120s flat. Both bounds are deliberate and
// the socket is force-closed once they elapse.
wsJWTExpiryGrace = 120 * time.Second
// wsCloseJWTExpired is the application-specific WS close code sent when a
// persistent socket is torn down for serving past its JWT expiry. It sits
// in the private-use range (4000-4999) and is distinct from protocol
// codes so clients can special-case it as "reconnect with a fresh token".
// Bugboard #868.
wsCloseJWTExpired = 4401
)
// wsAuthState carries the live JWT expiry for a persistent WS across the read
// loop and the auth.refresh control handler. Both run in the SAME goroutine —
// control frames are handled inline in the read loop before any frame reaches
// WASM — so the field needs no synchronization. Bugboard #868.
//
// The state is created at upgrade time from the request's JWT and is updated
// in place when an auth.refresh control frame succeeds.
type wsAuthState struct {
// expUnix is the `exp` (unix seconds) of the JWT currently authorizing
// this socket. 0 means "no expiry to enforce" (e.g. API-key auth or a
// token without exp) — such sockets are exempt from mid-session expiry.
expUnix int64
}
// wsJWTExpired tells whether a persistent WS whose authorizing JWT expires at
// expUnix (unix seconds) is past its enforcement deadline at time now. The
// deadline is exp plus grace, which leaves room for clock skew and the
// refresh round-trip; a now exactly equal to the deadline is NOT yet expired
// because the comparison is strict.
//
// An expUnix of zero or less means there is no expiry to enforce, and the
// function always reports false for it. Bugboard #868.
func wsJWTExpired(expUnix int64, now time.Time, grace time.Duration) bool {
	if expUnix > 0 {
		deadline := time.Unix(expUnix, 0).Add(grace)
		return now.After(deadline)
	}
	return false
}
// oramaControlFrame is the wire shape for gateway-handled control
// frames on a persistent WS. The single Type field discriminates;
// payload fields specific to each Type ride alongside.
@ -142,12 +97,6 @@ func (h *ServerlessHandlers) handlePersistentWebSocket(
invCtx := h.buildPersistentInvocationContext(r, fn, clientID)
callerWallet := invCtx.CallerWallet
// Capture the authorizing JWT's expiry so the read loop can enforce it
// for the socket's lifetime (bugboard #868). A successful auth.refresh
// control frame updates this in place; 0 (non-JWT auth) disables the
// check.
authState := &wsAuthState{expUnix: h.getJWTExpiryFromRequest(r)}
// Instantiate the persistent module. This compiles once (cached) and
// creates one wazero instance bound to this connection.
module, err := h.engine.InstantiatePersistent(r.Context(), fn, invCtx)
@ -247,7 +196,7 @@ func (h *ServerlessHandlers) handlePersistentWebSocket(
// avoids json.Unmarshal for every application frame. Only
// frames carrying the `"__orama"` key get parsed.
if bytes.Contains(frame, oramaControlFramePrefix) {
handled, ackErr := h.handleOramaControlFrame(frame, fn, inst, authState, namespace, clientID, conn)
handled, ackErr := h.handleOramaControlFrame(frame, fn, inst, namespace, clientID, conn)
if ackErr != nil {
h.logger.Warn("persistent WS: control-frame ack write failed",
zap.String("client_id", clientID),
@ -264,26 +213,6 @@ func (h *ServerlessHandlers) handlePersistentWebSocket(
// application frame.
}
// Bugboard #868: a persistent WS authenticates ONCE at upgrade.
// Before handing an application frame to WASM, reject it once the
// authorizing JWT is past exp+grace — otherwise an expired token
// keeps serving RPCs (incl. turn.credentials minting) indefinitely.
// The client keeps the socket alive by sending an
// {"__orama":"auth.refresh"} control frame (handled above, which
// bypasses this check) before the token expires. The check runs
// only on application frames so an expired client can still recover
// via auth.refresh rather than being locked out.
if wsJWTExpired(authState.expUnix, time.Now(), wsJWTExpiryGrace) {
h.logger.Info("persistent WS: closing — JWT expired without refresh",
zap.String("client_id", clientID),
zap.String("namespace", namespace),
zap.Int64("jwt_exp", authState.expUnix))
_ = conn.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(wsCloseJWTExpired, "jwt expired; reconnect with a fresh token"),
time.Now().Add(time.Second))
break
}
if err := inst.Submit(frame); err != nil {
h.logger.Warn("persistent WS submit failed (queue full?)",
zap.String("client_id", clientID),
@ -347,7 +276,6 @@ func (h *ServerlessHandlers) handleOramaControlFrame(
frame []byte,
fn *serverless.Function,
inst *persistent.Instance,
authState *wsAuthState,
namespace, clientID string,
conn *websocket.Conn,
) (handled bool, ackErr error) {
@ -363,7 +291,7 @@ func (h *ServerlessHandlers) handleOramaControlFrame(
switch ctrl.Type {
case "auth.refresh":
return true, h.handleAuthRefresh(ctrl, fn, inst, authState, namespace, clientID, conn)
return true, h.handleAuthRefresh(ctrl, fn, inst, namespace, clientID, conn)
default:
// Unknown control type — ack with an error so the client knows
// the frame was seen but ignored. Treat as handled (don't
@ -384,7 +312,6 @@ func (h *ServerlessHandlers) handleAuthRefresh(
ctrl oramaControlFrame,
fn *serverless.Function,
inst *persistent.Instance,
authState *wsAuthState,
namespace, clientID string,
conn *websocket.Conn,
) error {
@ -480,12 +407,6 @@ func (h *ServerlessHandlers) handleAuthRefresh(
})
}
// Extend the socket's expiry enforcement to the new token's exp so the
// read loop keeps serving RPCs past the old deadline (bugboard #868).
// authState and the read loop share this goroutine, so the write is
// race-free.
authState.expUnix = claims.Exp
h.logger.Info("persistent WS: auth.refresh applied",
zap.String("client_id", clientID),
zap.String("namespace", namespace),

View File

@ -23,7 +23,7 @@ func TestJWTGenerateAndParse(t *testing.T) {
t.Fatalf("failed to create service: %v", err)
}
tok, exp, err := svc.GenerateJWT("ns1", "subj", time.Minute, nil)
tok, exp, err := svc.GenerateJWT("ns1", "subj", time.Minute)
if err != nil || exp <= 0 {
t.Fatalf("gen err=%v exp=%d", err, exp)
}
@ -50,7 +50,7 @@ func TestJWTExpired(t *testing.T) {
}
// Use sufficiently negative TTL to bypass allowed clock skew
tok, _, err := svc.GenerateJWT("ns1", "subj", -2*time.Minute, nil)
tok, _, err := svc.GenerateJWT("ns1", "subj", -2*time.Minute)
if err != nil {
t.Fatalf("gen err=%v", err)
}

View File

@ -51,7 +51,7 @@ func newAuthServiceForTest(t *testing.T) *auth.Service {
func TestAuthMiddleware_WSJWTQuery_validToken(t *testing.T) {
svc := newAuthServiceForTest(t)
token, _, err := svc.GenerateJWT("anchat-test", "0xWALLET_SUBJECT", 15*time.Minute, nil)
token, _, err := svc.GenerateJWT("anchat-test", "0xWALLET_SUBJECT", 15*time.Minute)
if err != nil {
t.Fatalf("GenerateJWT: %v", err)
}
@ -125,7 +125,7 @@ func TestAuthMiddleware_WSJWTQuery_ignoredOnNonWSRequest(t *testing.T) {
// privacy issues of JWTs leaking via referrer headers, browser history,
// and access logs.
svc := newAuthServiceForTest(t)
token, _, err := svc.GenerateJWT("ns", "sub", 15*time.Minute, nil)
token, _, err := svc.GenerateJWT("ns", "sub", 15*time.Minute)
if err != nil {
t.Fatalf("GenerateJWT: %v", err)
}
@ -156,8 +156,8 @@ func TestAuthMiddleware_WSJWTQuery_headerWinsOverQuery(t *testing.T) {
// Header path runs FIRST and wins. Verifies the query fallback is a
// fallback, not an override.
svc := newAuthServiceForTest(t)
headerJWT, _, _ := svc.GenerateJWT("ns-header", "sub-header", 15*time.Minute, nil)
queryJWT, _, _ := svc.GenerateJWT("ns-query", "sub-query", 15*time.Minute, nil)
headerJWT, _, _ := svc.GenerateJWT("ns-header", "sub-header", 15*time.Minute)
queryJWT, _, _ := svc.GenerateJWT("ns-query", "sub-query", 15*time.Minute)
g := &Gateway{authService: svc}
@ -242,7 +242,7 @@ func TestAuthMiddleware_WSJWTQuery_malformedJWTFallsThrough(t *testing.T) {
func TestValidateAuthForNamespaceProxy_WSJWTQuery(t *testing.T) {
svc := newAuthServiceForTest(t)
token, _, err := svc.GenerateJWT("anchat-test", "0xWALLET", 15*time.Minute, nil)
token, _, err := svc.GenerateJWT("anchat-test", "0xWALLET", 15*time.Minute)
if err != nil {
t.Fatalf("GenerateJWT: %v", err)
}
@ -270,7 +270,7 @@ func TestValidateAuthForNamespaceProxy_WSJWTQuery(t *testing.T) {
func TestValidateAuthForNamespaceProxy_WSJWTQuery_ignoredOnNonWS(t *testing.T) {
svc := newAuthServiceForTest(t)
token, _, err := svc.GenerateJWT("anchat-test", "0xWALLET", 15*time.Minute, nil)
token, _, err := svc.GenerateJWT("anchat-test", "0xWALLET", 15*time.Minute)
if err != nil {
t.Fatalf("GenerateJWT: %v", err)
}
@ -295,7 +295,7 @@ func TestValidateAuthForNamespaceProxy_WSJWTQuery_ignoredOnNonWS(t *testing.T) {
// doesn't leak into proxy hops or downstream logs.
func TestAuthMiddleware_WSJWTQuery_strippedAfterVerify(t *testing.T) {
svc := newAuthServiceForTest(t)
token, _, err := svc.GenerateJWT("anchat-test", "0xWALLET", 15*time.Minute, nil)
token, _, err := svc.GenerateJWT("anchat-test", "0xWALLET", 15*time.Minute)
if err != nil {
t.Fatalf("GenerateJWT: %v", err)
}

View File

@ -1844,13 +1844,6 @@ type restoreWebRTC struct {
turnDomain string
turnSecret string
stealthDomain string // feat-124: empty when webrtc stealth is disabled
// unresolved is true when the state file had no TURN secret AND the DB
// fallback ERRORED (vs. resolved-but-not-enabled). The caller must NOT
// write a WebRTC-disabled gateway config off an unresolved lookup — that
// silently kills turn.credentials on a node that should serve TURN
// (bugboard #130: a decrypt failure after cluster-secret rotation was
// swallowed into "disabled"). enabled is always false when unresolved.
unresolved bool
}
// chooseRestoreWebRTC resolves a restored gateway's WebRTC config. TWO
@ -1876,7 +1869,7 @@ type restoreWebRTC struct {
// standing up the full restore path (systemd spawner + DB + port store).
func chooseRestoreWebRTC(
stateHasSFU bool, stateSFUPort int, stateTURNDomain, stateTURNSecret, stateStealthDomain string,
dbFetch func() (turnSecret, turnDomain, stealthDomain string, sfuPort int, resolved bool),
dbFetch func() (turnSecret, turnDomain, stealthDomain string, sfuPort int),
) restoreWebRTC {
turnSecret := stateTURNSecret
turnDomain := stateTURNDomain
@ -1892,18 +1885,8 @@ func chooseRestoreWebRTC(
// the state file was written reaches here with an empty secret.
// (Stealth toggles DO rewrite cluster state on every node, so the
// state-first read stays fresh for stealthDomain too.)
unresolved := false
if turnSecret == "" {
dbSecret, dbDomain, dbStealth, dbSFU, resolved := dbFetch()
switch {
case !resolved:
// The DB/decrypt lookup ERRORED — we do not know whether WebRTC
// is enabled. This is DISTINCT from resolved-but-empty (genuinely
// disabled). Signal unresolved so the caller preserves the
// running config instead of writing a TURN-disabled one
// (bugboard #130).
unresolved = true
case dbSecret != "":
if dbSecret, dbDomain, dbStealth, dbSFU := dbFetch(); dbSecret != "" {
turnSecret = dbSecret
if turnDomain == "" {
turnDomain = dbDomain
@ -1918,8 +1901,7 @@ func chooseRestoreWebRTC(
}
return restoreWebRTC{
enabled: !unresolved && (turnSecret != "" || sfuPort > 0),
unresolved: unresolved,
enabled: turnSecret != "" || sfuPort > 0,
sfuPort: sfuPort,
turnDomain: turnDomain,
turnSecret: turnSecret,
@ -2080,25 +2062,10 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
// file is incomplete.
wr := chooseRestoreWebRTC(
state.HasSFU, state.SFUSignalingPort, state.TURNDomain, state.TURNSharedSecret, state.TURNStealthDomain,
func() (turnSecret, turnDomain, stealthDomain string, sfuPort int, resolved bool) {
func() (turnSecret, turnDomain, stealthDomain string, sfuPort int) {
webrtcCfg, err := cm.GetWebRTCConfig(ctx, state.NamespaceName)
if err != nil {
// Do NOT swallow this into "disabled". A decrypt failure
// (stale cluster-secret-derived key after rotation) or a
// transient read error would otherwise silently disable
// TURN on this node — turn.credentials then returns
// namespace_not_configured (bugboard #130). Surface it
// loudly and signal unresolved so the caller preserves the
// running config.
cm.logger.Error("WebRTC TURN secret unresolvable on this node — refusing to silently disable TURN; preserving existing gateway config. Likely a cluster-secret rotation; regenerate with `orama namespace disable webrtc` then `orama namespace enable webrtc`.",
zap.String("namespace", state.NamespaceName),
zap.String("node_id", cm.localNodeID),
zap.Error(err))
return "", "", "", 0, false
}
if webrtcCfg == nil {
// Resolved cleanly: the namespace genuinely has no WebRTC.
return "", "", "", 0, true
if err != nil || webrtcCfg == nil {
return "", "", "", 0
}
// TURN is namespace-wide; SFU port is per-node and may be
// absent on a gateway-only (non-SFU) node — that's fine,
@ -2110,7 +2077,7 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
return webrtcCfg.TURNSharedSecret,
fmt.Sprintf("turn.ns-%s.%s", state.NamespaceName, cm.baseDomain),
cm.stealthDomainFor(state.NamespaceName, webrtcCfg),
sfu, true
sfu
},
)
if wr.enabled {
@ -2127,48 +2094,20 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/health", pb.GatewayHTTPPort))
if err == nil {
resp.Body.Close()
switch {
case wr.unresolved:
// Bugboard #130 guard: the WebRTC secret could not be resolved
// (DB/decrypt error, logged above). The gateway is already up
// and may be serving TURN from a valid on-disk secret — do NOT
// reconcile it to the empty/disabled block we'd otherwise
// build, which would kill turn.credentials on this node. Leave
// the running config untouched; the operator regenerates the
// secret.
//
// Note: this also defers ReconcileGateway's #837
// secrets-encryption-key reconcile for this one converge pass.
// That is acceptable — the operator action that fixes the
// unresolved TURN secret (regenerate + restart) re-runs the
// full reconcile, and pre-fix this path would have corrupted
// the WebRTC block anyway.
cm.logger.Error("Gateway up but WebRTC secret unresolved — skipping reconcile to avoid disabling TURN on the running config (bugboard #130)",
zap.String("namespace", state.NamespaceName))
default:
// Gateway is already up. Reconcile config drift (bugboard #25 —
// the WARM case): if the running gateway's on-disk config has a
// WebRTC block that differs from the desired (e.g. it lost the
// block on a prior restart where it stayed healthy and the
// cold-spawn path below never ran), rewrite the config +
// restart. ReconcileGateway is a no-op when the on-disk block
// already matches, so this does NOT cause a restart loop.
if rerr := cm.systemdSpawner.ReconcileGateway(ctx, state.NamespaceName, cm.localNodeID, gwCfg); rerr != nil {
cm.logger.Warn("Gateway WebRTC reconcile failed (leaving running config as-is)",
zap.String("namespace", state.NamespaceName), zap.Error(rerr))
}
// Gateway is already up. Reconcile config drift (bugboard #25 —
// the WARM case): if the running gateway's on-disk config has a
// WebRTC block that differs from the desired (e.g. it lost the
// block on a prior restart where it stayed healthy and the
// cold-spawn path below never ran), rewrite the config + restart.
// ReconcileGateway is a no-op when the on-disk block already
// matches, so this does NOT cause a restart loop on every boot.
if rerr := cm.systemdSpawner.ReconcileGateway(ctx, state.NamespaceName, cm.localNodeID, gwCfg); rerr != nil {
cm.logger.Warn("Gateway WebRTC reconcile failed (leaving running config as-is)",
zap.String("namespace", state.NamespaceName), zap.Error(rerr))
}
} else {
// Gateway is down → cold spawn. We must bring a gateway up
// regardless (the namespace needs one); but if the WebRTC secret
// was unresolved we can't write a working TURN block, so warn
// loudly that TURN is degraded on this node until the secret is
// regenerated (bugboard #130).
switch {
case wr.unresolved:
cm.logger.Error("Cold-spawning gateway with TURN UNAVAILABLE — WebRTC secret unresolved on this node; turn.credentials will return namespace_not_configured until it is regenerated (`orama namespace disable webrtc` then `orama namespace enable webrtc`)",
zap.String("namespace", state.NamespaceName))
case wr.enabled && !state.HasSFU:
// Gateway is down → cold spawn with the resolved config.
if wr.enabled && !state.HasSFU {
cm.logger.Info("Re-materialized WebRTC gateway config from DB (state file was stale)",
zap.String("namespace", state.NamespaceName),
zap.Int("sfu_port", wr.sfuPort))

View File

@ -11,16 +11,11 @@ import "testing"
// port is per-node (0 on a gateway-only node). Pins both the drift
// fallback and the non-SFU-gateway case.
// dbFetch signature: () -> (turnSecret, turnDomain, stealthDomain string, sfuPort int, resolved bool).
// resolved=true means the lookup completed (with or without a config);
// resolved=false means it ERRORED (e.g. decrypt failure) → unresolved.
func dbNone() (string, string, string, int, bool) { return "", "", "", 0, true }
// dbFetch signature: () -> (turnSecret, turnDomain, stealthDomain string, sfuPort int).
func dbNone() (string, string, string, int) { return "", "", "", 0 }
// dbError models a DB/decrypt failure: the lookup did not complete.
func dbError() (string, string, string, int, bool) { return "", "", "", 0, false }
func dbFull(secret, domain string, sfuPort int) func() (string, string, string, int, bool) {
return func() (string, string, string, int, bool) { return secret, domain, "", sfuPort, true }
func dbFull(secret, domain string, sfuPort int) func() (string, string, string, int) {
return func() (string, string, string, int) { return secret, domain, "", sfuPort }
}
func TestChooseRestoreWebRTC_stateFileCompleteWins(t *testing.T) {
@ -29,7 +24,7 @@ func TestChooseRestoreWebRTC_stateFileCompleteWins(t *testing.T) {
// restart path).
dbCalled := false
got := chooseRestoreWebRTC(true, 7800, "turn.ns-x.dbrs.space", "state-secret", "",
func() (string, string, string, int, bool) { dbCalled = true; return dbNone() })
func() (string, string, string, int) { dbCalled = true; return dbNone() })
if dbCalled {
t.Error("DB fetch was called even though the state file had the TURN secret (should short-circuit)")
@ -90,7 +85,7 @@ func TestChooseRestoreWebRTC_stateHasTURNButNoSFU(t *testing.T) {
// NOT consult the DB (TURN secret present = complete enough).
dbCalled := false
got := chooseRestoreWebRTC(false, 0, "turn.ns-x.dbrs.space", "state-secret", "",
func() (string, string, string, int, bool) { dbCalled = true; return dbNone() })
func() (string, string, string, int) { dbCalled = true; return dbNone() })
if dbCalled {
t.Error("DB fetch called even though state file had the TURN secret")
@ -115,7 +110,7 @@ func TestChooseRestoreWebRTC_dbNoSecretStaysDisabled(t *testing.T) {
// enablement marker; without it we treat it as not-configured-for-
// TURN, but an SFU port alone still enables SFU routes.
got := chooseRestoreWebRTC(false, 0, "", "", "",
func() (string, string, string, int, bool) { return "", "turn.db", "", 9000, true })
func() (string, string, string, int) { return "", "turn.db", "", 9000 })
// dbFetch only runs when state secret is empty; here it returns no
// secret, so the `if dbSecret != ""` guard means NOTHING is taken
// from the DB → disabled. (An SFU-only-no-TURN namespace is not a
@ -131,7 +126,7 @@ func TestChooseRestoreWebRTC_stealthFromStateFile(t *testing.T) {
// Stealth toggles rewrite cluster state, so a fresh state file carries
// the stealth domain and must win without a DB call.
got := chooseRestoreWebRTC(true, 7800, "turn.ns-x.dbrs.space", "state-secret", "cdn-abc123def456.dbrs.space",
func() (string, string, string, int, bool) {
func() (string, string, string, int) {
t.Error("DB fetch called even though state file was complete")
return dbNone()
})
@ -144,65 +139,14 @@ func TestChooseRestoreWebRTC_stealthFromDBOnStaleState(t *testing.T) {
// Stale state (no TURN secret) + DB has stealth enabled → stealth domain
// re-materializes from the DB alongside the rest of the WebRTC block.
got := chooseRestoreWebRTC(false, 0, "", "", "",
func() (string, string, string, int, bool) {
return "db-secret", "turn.ns-x.dbrs.space", "cdn-abc123def456.dbrs.space", 7801, true
func() (string, string, string, int) {
return "db-secret", "turn.ns-x.dbrs.space", "cdn-abc123def456.dbrs.space", 7801
})
if !got.enabled || got.stealthDomain != "cdn-abc123def456.dbrs.space" {
t.Errorf("want stealth domain from DB on stale state; got %+v", got)
}
}
// --- bugboard #130: distinguish "unresolved (DB/decrypt error)" from "disabled" ---
// TestChooseRestoreWebRTC_dbErrorMarksUnresolvedNotDisabled is the bugboard
// #130 regression guard. The state-file inputs carry no TURN secret and the
// DB fetch is the dbError stub (a lookup that failed, e.g. a stored TURN
// secret that cannot be decrypted after a cluster-secret rotation). The
// result must come back unresolved, not enabled, and without a secret, so the
// caller can keep the running config instead of writing a TURN-disabled one.
func TestChooseRestoreWebRTC_dbErrorMarksUnresolvedNotDisabled(t *testing.T) {
	res := chooseRestoreWebRTC(false, 0, "", "", "", dbError)

	// Fatal rather than Error: if the unresolved flag is missing, the
	// follow-up checks below say nothing useful.
	if unresolved := res.unresolved; !unresolved {
		t.Fatal("BUG #130 REGRESSION: a DB/decrypt error must mark the result unresolved")
	}
	if res.enabled {
		t.Errorf("unresolved result must never be enabled (would write a config off an errored lookup); got %+v", res)
	}
	if secret := res.turnSecret; secret != "" {
		t.Errorf("unresolved result must carry no secret; got %q", secret)
	}
}
// TestChooseRestoreWebRTC_resolvedEmptyIsDisabledNotUnresolved is the
// counterpart to the DB-error case: the fetch (the dbNone stub) finishes and
// reports no WebRTC settings. That is a genuinely disabled namespace, so the
// result must be neither unresolved nor enabled, and the caller may write the
// empty/disabled config.
func TestChooseRestoreWebRTC_resolvedEmptyIsDisabledNotUnresolved(t *testing.T) {
	outcome := chooseRestoreWebRTC(false, 0, "", "", "", dbNone)
	unresolved, enabled := outcome.unresolved, outcome.enabled

	if unresolved {
		t.Error("a clean resolved-but-empty lookup must NOT be marked unresolved")
	}
	if enabled {
		t.Errorf("genuinely-disabled namespace must be disabled; got %+v", outcome)
	}
}
func TestChooseRestoreWebRTC_stateSecretWinsOverDBError(t *testing.T) {
// A node that already holds the TURN secret in its state file must NOT be
// affected by a DB error — it short-circuits before dbFetch and stays
// enabled/resolved. Guards against the #130 fix accidentally disabling
// healthy nodes when the DB is flaky.
got := chooseRestoreWebRTC(true, 7800, "turn.ns-x.dbrs.space", "state-secret", "",
func() (string, string, string, int, bool) {
t.Error("DB fetch must not be called when the state file has the secret")
return dbError()
})
if got.unresolved || !got.enabled || got.turnSecret != "state-secret" {
t.Errorf("state-file secret must win and stay enabled/resolved; got %+v", got)
}
}
func TestChooseRestoreWebRTC_noStealthStaysEmpty(t *testing.T) {
// Stealth disabled everywhere → empty stealthDomain (gateway advertises
// the baseline 3-rung ladder only).

View File

@ -493,7 +493,7 @@ func (i *Invoker) BatchInvoke(ctx context.Context, req *BatchInvokeRequest) (*Ba
func isSystemTrigger(t TriggerType) bool {
switch t {
case TriggerTypeCron, TriggerTypePubSub, TriggerTypeDatabase,
TriggerTypeTimer, TriggerTypeJob, TriggerTypeInternal:
TriggerTypeTimer, TriggerTypeJob:
return true
}
return false

View File

@ -30,11 +30,6 @@ const (
TriggerTypePubSub TriggerType = "pubsub"
TriggerTypeTimer TriggerType = "timer"
TriggerTypeJob TriggerType = "job"
// TriggerTypeInternal marks a gateway-initiated invocation with no end-user
// caller (e.g. the auth claims-provider hook at JWT mint time, bugboard
// #548). Treated as a system trigger so the per-caller authorization check
// is skipped — the gateway is the trusted invoker.
TriggerTypeInternal TriggerType = "internal"
)
// JobStatus represents the current state of a background job.
@ -239,8 +234,8 @@ type FunctionDefinition struct {
// When WSPersistent is true, the function exports ws_open/ws_frame/ws_close
// instead of using the default per-frame stateless model.
WSPersistent bool `json:"ws_persistent,omitempty"`
WSIdleTimeoutSec int `json:"ws_idle_timeout_sec,omitempty"` // 0 = no idle timeout
WSMaxFrameBytes int `json:"ws_max_frame_bytes,omitempty"` // 0 = use default 256 KB
WSIdleTimeoutSec int `json:"ws_idle_timeout_sec,omitempty"` // 0 = no idle timeout
WSMaxFrameBytes int `json:"ws_max_frame_bytes,omitempty"` // 0 = use default 256 KB
WSMaxInflightPerConn int `json:"ws_max_inflight_per_conn,omitempty"` // 0 = use default 64
// RawHTTPResponse enables raw-HTTP-response mode (bugboard #835): the
@ -289,11 +284,11 @@ type Function struct {
// InvocationContext provides context for a function invocation.
type InvocationContext struct {
RequestID string `json:"request_id"`
FunctionID string `json:"function_id"`
FunctionName string `json:"function_name"`
Namespace string `json:"namespace"`
CallerWallet string `json:"caller_wallet,omitempty"`
RequestID string `json:"request_id"`
FunctionID string `json:"function_id"`
FunctionName string `json:"function_name"`
Namespace string `json:"namespace"`
CallerWallet string `json:"caller_wallet,omitempty"`
// CallerIP is the source IP of the request, populated by HTTP/WS handlers.
// Used by the multi-tier rate limiter as a fallback bucket for anonymous
// (no-wallet) callers.

View File

@ -1,6 +1,6 @@
{
"name": "@debros/orama",
"version": "0.122.51",
"version": "0.122.47",
"description": "TypeScript SDK for Orama Network - Database, PubSub, Cache, Storage, Vault, and more",
"type": "module",
"main": "./dist/index.js",

View File

@ -167,41 +167,6 @@ export class HttpClient {
return this.baseURL;
}
/**
 * Convert whatever was thrown into a typed SDKError, so callers can branch on
 * `.code`/`.httpStatus` rather than string-matching a bare platform
 * `TypeError: Network request failed` (bugboard #129).
 *
 * - An SDKError (an HTTP error response) is returned as-is.
 * - An error named "AbortError" (our own per-request timeout firing) maps to
 *   code "TIMEOUT", with a message that includes `timeoutMs`.
 * - Anything else (fetch rejects with a TypeError on DNS failure, connection
 *   refused, offline, or TLS error) maps to code "NETWORK_ERROR", keeping the
 *   original message when there is one.
 *
 * Both non-HTTP cases use httpStatus 0 (no HTTP response was received), which
 * is how the app tells "couldn't reach the gateway" from a real 4xx/5xx.
 *
 * @param error     the value caught from the request attempt
 * @param timeoutMs the per-request timeout, used only in the TIMEOUT message
 * @returns the original SDKError, or a new one with httpStatus 0
 */
private normalizeError(error: unknown, timeoutMs: number): SDKError {
  if (error instanceof SDKError) {
    return error;
  }

  const errorName = (error as { name?: string })?.name;
  const rawMessage = error instanceof Error ? error.message : String(error);
  const timedOut = errorName === "AbortError";

  const text = timedOut
    ? `request timed out after ${timeoutMs}ms`
    : rawMessage || "network request failed";
  const code = timedOut ? "TIMEOUT" : "NETWORK_ERROR";

  // httpStatus 0: no HTTP response was ever received.
  return new SDKError(text, 0, code, { cause: errorName });
}
async request<T = any>(
method: "GET" | "POST" | "PUT" | "DELETE",
path: string,
@ -333,14 +298,18 @@ export class HttpClient {
}
}
// Normalize native errors (TypeError, AbortError) into a typed SDKError
// so the app gets a stable `.code`/`.httpStatus` instead of a bare
// platform "Network request failed" (bugboard #129).
const sdkError = this.normalizeError(error, requestTimeout);
// Call the network error callback if configured. This allows the app to
// trigger gateway failover.
// Call the network error callback if configured
// This allows the app to trigger gateway failover
if (this.onNetworkError) {
// Convert native errors (TypeError, AbortError) to SDKError for the callback
const sdkError =
error instanceof SDKError
? error
: new SDKError(
error instanceof Error ? error.message : String(error),
0, // httpStatus 0 indicates network-level failure
"NETWORK_ERROR"
);
this.onNetworkError(sdkError, {
method,
path,
@ -349,7 +318,7 @@ export class HttpClient {
});
}
throw sdkError;
throw error;
} finally {
clearTimeout(timeoutId);
}

View File

@ -1,88 +0,0 @@
import { describe, it, expect, vi } from "vitest";
import { HttpClient } from "../../../src/core/http";
import { SDKError } from "../../../src/errors";
/**
* Bugboard #129 — typed network errors.
*
* Before this fix the HttpClient re-threw the raw platform error on a
* network-level failure, so a caller (e.g. AnChat's JwtSession) could only
* tell "couldn't reach the gateway" apart from a real HTTP error by
* string-matching `TypeError: Network request failed`. These guards lock in
* that every transport failure surfaces as a typed SDKError with httpStatus 0
* and a stable `.code`, while genuine HTTP errors keep their status/code.
*/
// Suite for bugboard #129: each case injects a stubbed `fetch` into HttpClient
// and inspects the error the request rejects with (or the one handed to the
// onNetworkError callback).
describe("Bug #129 — HttpClient surfaces typed network errors", () => {
// Builds an HttpClient around the supplied fetch stub. maxRetries is 0 and
// timeout is 5000 (the TIMEOUT case asserts on "5000ms" in the message).
function client(fetchImpl: typeof fetch, onNetworkError?: any) {
return new HttpClient({
baseURL: "https://gw.example",
maxRetries: 0,
timeout: 5000,
fetch: fetchImpl,
onNetworkError,
});
}
// fetch throws a TypeError -> expect SDKError with code NETWORK_ERROR, status 0.
it("maps a fetch TypeError (connection failure) to SDKError NETWORK_ERROR / status 0", async () => {
const fetchSpy = vi.fn(async () => {
throw new TypeError("Network request failed");
});
// .catch((e) => e) turns the rejection into a value so it can be asserted on.
const err = await client(fetchSpy as any)
.post("/v1/auth/refresh", { refresh_token: "x" })
.catch((e) => e);
expect(err).toBeInstanceOf(SDKError);
expect(err.code).toBe("NETWORK_ERROR");
expect(err.httpStatus).toBe(0);
// Original platform message is preserved for diagnostics.
expect(err.message).toContain("Network request failed");
});
// fetch throws an Error named "AbortError" -> expect SDKError with code TIMEOUT, status 0.
it("maps an AbortError (timeout) to SDKError TIMEOUT / status 0", async () => {
const fetchSpy = vi.fn(async () => {
// A plain Error renamed to "AbortError" stands in for the abort rejection.
const e = new Error("aborted");
e.name = "AbortError";
throw e;
});
const err = await client(fetchSpy as any)
.get("/v1/auth/challenge")
.catch((e) => e);
expect(err).toBeInstanceOf(SDKError);
expect(err.code).toBe("TIMEOUT");
expect(err.httpStatus).toBe(0);
// The message reports the timeout configured in client() above.
expect(err.message).toContain("5000ms");
});
// fetch resolves with a 401 JSON response -> the error keeps its own status and code.
it("passes a real HTTP error through unchanged (not masked as NETWORK_ERROR)", async () => {
const fetchSpy = vi.fn(
async () =>
new Response(JSON.stringify({ error: "nope", code: "BAD_TOKEN" }), {
status: 401,
headers: { "content-type": "application/json" },
})
);
const err = await client(fetchSpy as any)
.post("/v1/auth/refresh", { refresh_token: "x" })
.catch((e) => e);
expect(err).toBeInstanceOf(SDKError);
expect(err.httpStatus).toBe(401);
expect(err.code).toBe("BAD_TOKEN");
});
// The onNetworkError callback must receive the typed SDKError exactly once.
it("hands the typed error (not the raw TypeError) to the onNetworkError callback", async () => {
const seen: SDKError[] = [];
const fetchSpy = vi.fn(async () => {
throw new TypeError("Failed to fetch");
});
// The rejection itself is swallowed; only what the callback saw is asserted.
await client(fetchSpy as any, (e: SDKError) => seen.push(e))
.get("/v1/db/read")
.catch(() => {});
expect(seen).toHaveLength(1);
expect(seen[0]).toBeInstanceOf(SDKError);
expect(seen[0].code).toBe("NETWORK_ERROR");
expect(seen[0].httpStatus).toBe(0);
});
});