From 251630a5c76f3ce9e96f43189de9cf03278586c3 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 15 May 2026 13:36:35 +0300 Subject: [PATCH] fix(serverless): per-call invCtx propagation prevents cross-tenant identity leak in persistent WS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit HostFunctions is a process-wide singleton (one per gateway engine). Its `invCtx` field is shared across all WASM instances. For STATELESS execution the executor sets/clears it per-call but the lock is released before WASM runs — two concurrent invocations can race on the field and one's host call can read the other's identity. Window is microseconds. For PERSISTENT WS the bug was much worse: invCtx used to be bound ONCE at instantiation and reused for the connection's lifetime. Two simultaneous persistent WS connections from different namespaces / wallets overwrote each other's invCtx, and EVERY subsequent function_invoke / GetCallerJWTSubject / GetCallerWallet / GetSecret call from inside the WASM read whatever was bound LAST. Result: silent identity leak across tenants for as long as the connections overlapped. Fix: per-call invCtx propagation through Go's context.Context. wazero passes the ctx given to api.Function.Call through to host function callbacks, so every WASM-host hop carries its own invCtx. - pkg/serverless/invocation_context.go (new): WithInvocationContext + InvocationContextFromCtx helpers using an unexported invCtxKey. - pkg/serverless/hostfunctions/invocation_context.go (new): currentInvocationContext(ctx) — ctx-attached invCtx wins over the singleton field. - All host accessors (FunctionInvoke, GetEnv, GetSecret, GetRequestID, GetCallerWallet, GetWSClientID, GetCallerClaim, GetCallerJWTSubject) now route through currentInvocationContext(ctx). - pkg/serverless/persistent/instance.go: every export call's ctx is wrapped with the per-instance invCtx before being passed to wazero. - pkg/gateway/handlers/serverless/ws_persistent_handler.go: invCtx is built per-frame and attached to ctx, not stored on a shared field. - pkg/serverless/engine.go: removed the SetInvocationContext call at InstantiatePersistent (no longer needed; ctx carries it). Stateless still uses the singleton field — its race is latent since the host-functions split and migrating it is a separate scoped change. Tests: - hostfunctions/invocation_context_test.go covers ctx-wins-over-singleton. - gateway/handlers/serverless/ws_persistent_handler_test.go covers the per-frame ctx wiring. - cli/functions/build_test.go is new coverage for the build path touched in this change. VERSION bumped to 0.122.24. --- VERSION | 2 +- core/pkg/cli/functions/build.go | 48 ++++- core/pkg/cli/functions/build_test.go | 83 ++++++++ .../serverless/ws_persistent_handler.go | 54 +++-- .../serverless/ws_persistent_handler_test.go | 157 ++++++++++++++ core/pkg/serverless/engine.go | 26 ++- core/pkg/serverless/hostfunctions/context.go | 74 +++---- core/pkg/serverless/hostfunctions/database.go | 7 +- .../hostfunctions/invocation_context.go | 24 +++ .../hostfunctions/invocation_context_test.go | 195 ++++++++++++++++++ core/pkg/serverless/hostfunctions/pubsub.go | 10 +- core/pkg/serverless/hostfunctions/push.go | 7 +- core/pkg/serverless/hostfunctions/wsbridge.go | 16 +- core/pkg/serverless/invocation_context.go | 56 +++++ core/pkg/serverless/persistent/instance.go | 68 ++++-- sdk/package.json | 2 +- 16 files changed, 722 insertions(+), 107 deletions(-) create mode 100644 core/pkg/cli/functions/build_test.go create mode 100644 core/pkg/gateway/handlers/serverless/ws_persistent_handler_test.go create mode 100644 core/pkg/serverless/hostfunctions/invocation_context.go create mode 100644 core/pkg/serverless/hostfunctions/invocation_context_test.go create mode 100644 core/pkg/serverless/invocation_context.go diff --git a/VERSION b/VERSION index c7a718d..6e5f3bd 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.122.23 +0.122.24 diff --git a/core/pkg/cli/functions/build.go b/core/pkg/cli/functions/build.go index d9b44c9..5e5d8a8 100644 --- a/core/pkg/cli/functions/build.go +++ b/core/pkg/cli/functions/build.go @@ -9,6 +9,24 @@ import ( "github.com/spf13/cobra" ) +// tinygoBuildArgs returns the argv (without the leading `tinygo`) used +// to compile a function. Pure function — extracted from buildFunction +// so the WS-persistent → `-buildmode=c-shared` policy can be unit +// tested without invoking TinyGo. +// +// Persistent WS functions need the WASI-reactor variant (exports +// `_initialize`, no `_start`) — see the comment on cfg loading in +// buildFunction for the full rationale. Stateless (default) functions +// stay on command mode for back-compat. +func tinygoBuildArgs(outputPath string, wsPersistent bool) []string { + args := []string{"build", "-o", outputPath, "-target", "wasi"} + if wsPersistent { + args = append(args, "-buildmode=c-shared") + } + args = append(args, ".") + return args +} + // BuildCmd compiles a function to WASM using TinyGo. var BuildCmd = &cobra.Command{ Use: "build [directory]", @@ -46,6 +64,25 @@ func buildFunction(dir string) (string, error) { return "", fmt.Errorf("function.yaml not found in %s", absDir) } + // Load config so we can pick the right TinyGo build mode based on + // ws_persistent. Persistent functions need WASI-reactor semantics + // (`_initialize` export, no `_start`); command-mode functions stay + // on the default. See bug #240/#249 follow-up #6 for the full + // rationale — TL;DR: TinyGo command-mode `_start` doesn't set the + // runtime guard `wasmExportCheckRun` checks, so any export call + // from the host (e.g. orama_alloc → ws_open payload) traps with + // "wasm error: unreachable" inside the runtime hashmap path. + // + // `-buildmode=c-shared` flips TinyGo to reactor mode: the wasm + // exports `_initialize` instead of `_start`. The gateway's + // persistent-instance bootstrap (pkg/serverless/engine.go) calls + // `_initialize` first if exported, which sets the guard cleanly, + // and the function's exports become callable from the host loop. + cfg, cfgErr := LoadConfig(absDir) + if cfgErr != nil { + return "", fmt.Errorf("failed to load function.yaml: %w", cfgErr) + } + // Check TinyGo is installed tinygoPath, err := exec.LookPath("tinygo") if err != nil { @@ -56,8 +93,15 @@ func buildFunction(dir string) (string, error) { fmt.Printf("Building %s...\n", absDir) - // Run tinygo build - buildCmd := exec.Command(tinygoPath, "build", "-o", outputPath, "-target", "wasi", ".") + // Build args. Default = command mode. Persistent WS functions get + // reactor mode via `-buildmode=c-shared` so TinyGo emits + // `_initialize` and the runtime guard activates. + tinygoArgs := tinygoBuildArgs(outputPath, cfg.WSPersistent) + if cfg.WSPersistent { + fmt.Printf(" (ws_persistent=true → using -buildmode=c-shared for WASI-reactor semantics)\n") + } + + buildCmd := exec.Command(tinygoPath, tinygoArgs...) buildCmd.Dir = absDir buildCmd.Stdout = os.Stdout buildCmd.Stderr = os.Stderr diff --git a/core/pkg/cli/functions/build_test.go b/core/pkg/cli/functions/build_test.go new file mode 100644 index 0000000..a1548e4 --- /dev/null +++ b/core/pkg/cli/functions/build_test.go @@ -0,0 +1,83 @@ +package functions + +import ( + "strings" + "testing" +) + +// TestTinygoBuildArgs_PersistentGetsCSharedBuildmode is the regression +// guard for bug #240/#249 follow-up #6: TinyGo command-mode `_start` +// doesn't set the reactor-mode runtime guard, so any export call from +// the host (e.g. orama_alloc → ws_open payload) traps with +// "wasm error: unreachable" inside the runtime hashmap path. +// +// Fix: persistent functions get `-buildmode=c-shared` which flips +// TinyGo to reactor mode (exports `_initialize`, no `_start`). The +// gateway's persistent-instance bootstrap already calls `_initialize` +// first if exported (pkg/serverless/engine.go::InstantiatePersistent), +// so reactor-built wasms cleanly initialize the TinyGo runtime and +// every subsequent host-driven export call works. +// +// Empirically confirmed against TinyGo 0.40.1: the same source +// compiled with vs. without `-buildmode=c-shared` produces wasms with +// `_start` only vs. `_initialize` only respectively. +// +// If a future refactor drops the flag (or adds it for stateless), this +// test fails loud — the AnChat WS chain went down for ~1 day chasing +// this exact behavior. +func TestTinygoBuildArgs_PersistentGetsCSharedBuildmode(t *testing.T) { + tests := []struct { + name string + wsPersistent bool + wantContains string // substring that must appear in the joined args + wantAbsent string // substring that must NOT appear + }{ + { + name: "stateless function stays in command mode (default)", + wsPersistent: false, + wantContains: "-target wasi", + wantAbsent: "-buildmode=c-shared", + }, + { + name: "persistent function gets reactor mode (c-shared)", + wsPersistent: true, + wantContains: "-buildmode=c-shared", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tinygoBuildArgs("/tmp/out.wasm", tt.wsPersistent) + joined := strings.Join(got, " ") + + if !strings.Contains(joined, tt.wantContains) { + t.Errorf("missing %q in args: %q", tt.wantContains, joined) + } + if tt.wantAbsent != "" && strings.Contains(joined, tt.wantAbsent) { + t.Errorf("unexpected %q in args (only persistent should get this): %q", + tt.wantAbsent, joined) + } + + // Invariants for both: build action, output path, source dir. + for _, want := range []string{"build", "-o", "/tmp/out.wasm", "-target", "wasi", "."} { + found := false + for _, a := range got { + if a == want { + found = true + break + } + } + if !found { + t.Errorf("missing required arg %q in: %v", want, got) + } + } + + // Invariant: the source directory `.` must be the LAST arg + // (TinyGo's positional). If we accidentally reorder the + // builder so the flag goes after `.`, TinyGo will treat the + // flag as a build target and fail with a confusing error. + if got[len(got)-1] != "." { + t.Errorf("last arg should be `.`, got %q (full args: %v)", got[len(got)-1], got) + } + }) + } +} diff --git a/core/pkg/gateway/handlers/serverless/ws_persistent_handler.go b/core/pkg/gateway/handlers/serverless/ws_persistent_handler.go index c098c8d..d3342c1 100644 --- a/core/pkg/gateway/handlers/serverless/ws_persistent_handler.go +++ b/core/pkg/gateway/handlers/serverless/ws_persistent_handler.go @@ -58,20 +58,8 @@ func (h *ServerlessHandlers) handlePersistentWebSocket( defer h.wsBridge.RemoveClient(context.Background(), clientID) } - callerWallet := h.getWalletFromRequest(r) - callerIP := extractRemoteIP(r) - callerClaims := h.getCallerClaimsFromRequest(r) - - invCtx := &serverless.InvocationContext{ - FunctionID: fn.ID, - FunctionName: fn.Name, - Namespace: fn.Namespace, - CallerWallet: callerWallet, - CallerIP: callerIP, - CallerClaims: callerClaims, - WSClientID: clientID, - TriggerType: serverless.TriggerTypeWebSocket, - } + invCtx := h.buildPersistentInvocationContext(r, fn, clientID) + callerWallet := invCtx.CallerWallet // Instantiate the persistent module. This compiles once (cached) and // creates one wazero instance bound to this connection. @@ -91,6 +79,13 @@ func (h *ServerlessHandlers) handlePersistentWebSocket( Namespace: fn.Namespace, FrameTimeoutSec: fn.TimeoutSeconds, MaxInflightFrames: fn.WSMaxInflightPerConn, + // Per-instance identity binding. The persistent.Instance attaches + // this to the ctx of every WASM-host call (ws_open / ws_frame / + // ws_close + nested function_invoke), so caller identity is + // race-free across concurrent persistent WS connections — fixes + // the cross-tenant identity-leak on the shared HostFunctions + // singleton (security audit follow-up to Layer 7 of Feature #73). + InvocationContext: invCtx, }, h.logger) if err != nil { h.logger.Warn("persistent WS NewInstance failed", @@ -175,3 +170,34 @@ func (h *ServerlessHandlers) handlePersistentWebSocket( inst.Close(context.Background(), persistent.CloseReasonClientDisconnect) _ = conn.Close() } + +// buildPersistentInvocationContext constructs the per-connection InvocationContext +// for a persistent WS instance. Extracted from handlePersistentWebSocket so the +// auth-field plumbing can be unit-tested without doing a real WS upgrade. +// +// IMPORTANT: this context is sticky for the lifetime of the connection — it is +// bound once at instantiation (pkg/serverless/engine.go InstantiatePersistent) +// and reused for every ws_open / ws_frame / ws_close call, as well as for any +// nested function_invoke call originating inside the WASM instance. Missing a +// field here (notably CallerJWTSubject) means every sub-function invoked via +// `oh.FunctionInvoke` sees an empty value for the missing field — Layer 7 of +// the WS bug chain (Feature #73 on bugboard; AnChat sync-deltas was returning +// AUTH_REQUIRED because oh.JwtSubjectUserID() was "" inside the sub-function). +// +// Keep this in sync with the stateless WS handler's InvokeRequest construction +// in ws_handler.go — they must populate the same auth-identity fields. +func (h *ServerlessHandlers) buildPersistentInvocationContext( + r *http.Request, fn *serverless.Function, clientID string, +) *serverless.InvocationContext { + return &serverless.InvocationContext{ + FunctionID: fn.ID, + FunctionName: fn.Name, + Namespace: fn.Namespace, + CallerWallet: h.getWalletFromRequest(r), + CallerIP: extractRemoteIP(r), + CallerClaims: h.getCallerClaimsFromRequest(r), + CallerJWTSubject: h.getJWTSubjectFromRequest(r), + WSClientID: clientID, + TriggerType: serverless.TriggerTypeWebSocket, + } +} diff --git a/core/pkg/gateway/handlers/serverless/ws_persistent_handler_test.go b/core/pkg/gateway/handlers/serverless/ws_persistent_handler_test.go new file mode 100644 index 0000000..52a169d --- /dev/null +++ b/core/pkg/gateway/handlers/serverless/ws_persistent_handler_test.go @@ -0,0 +1,157 @@ +package serverless + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/DeBrosOfficial/network/pkg/gateway/auth" + "github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys" + "github.com/DeBrosOfficial/network/pkg/serverless" +) + +// TestBuildPersistentInvocationContext_PropagatesJWTSubject is the regression +// guard for Layer 7 of the WS bug chain (Feature #73 on bugboard). +// +// Symptom: AnChat's persistent rpc-router function called function_invoke into +// a sub-function. Inside the sub-function, oh.JwtSubjectUserID() returned "" +// and the sub-function bailed with AUTH_REQUIRED — even though the WS upgrade +// itself was JWT-authenticated and the calling user was identified. +// +// Root cause: handlePersistentWebSocket built the per-connection +// InvocationContext WITHOUT calling getJWTSubjectFromRequest, so +// CallerJWTSubject was always "". HostFunctions.FunctionInvoke correctly +// propagated cur.CallerJWTSubject — but cur.CallerJWTSubject was empty to +// begin with. The stateless WS handler (ws_handler.go) had always done this +// correctly; the persistent handler diverged silently. +// +// If a future refactor drops the field again, this test fails loud — the +// AnChat sync flow would break end-to-end one more time. +func TestBuildPersistentInvocationContext_PropagatesJWTSubject(t *testing.T) { + h := newTestHandlers(nil) + + // Simulate a JWT-authenticated request: middleware would have stashed + // the *auth.JWTClaims on the request context under ctxkeys.JWT. + claims := &auth.JWTClaims{ + Sub: "wallet-from-jwt-subject", + Custom: map[string]string{"role": "admin"}, + } + req := httptest.NewRequest(http.MethodGet, "/", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxkeys.JWT, claims)) + + fn := &serverless.Function{ + ID: "fn-id", + Name: "rpc-router", + Namespace: "anchat", + } + clientID := "ws-client-uuid" + + got := h.buildPersistentInvocationContext(req, fn, clientID) + + if got == nil { + t.Fatal("buildPersistentInvocationContext returned nil") + } + + // Layer 7 invariant: CallerJWTSubject must be populated. Without this + // field, every function_invoke from inside a persistent WS instance + // loses the caller identity — see comment on the helper for the full + // story. + if got.CallerJWTSubject != "wallet-from-jwt-subject" { + t.Errorf("CallerJWTSubject = %q; want %q (Layer 7 regression — see Feature #73)", + got.CallerJWTSubject, "wallet-from-jwt-subject") + } + + // Other identity fields the persistent invCtx is responsible for. These + // exercise a smaller surface than the full handler but cover the same + // wiring contract. + if got.CallerWallet == "" { + t.Error("CallerWallet should be populated from JWT (got empty)") + } + if got.WSClientID != clientID { + t.Errorf("WSClientID = %q; want %q", got.WSClientID, clientID) + } + if got.FunctionID != fn.ID { + t.Errorf("FunctionID = %q; want %q", got.FunctionID, fn.ID) + } + if got.FunctionName != fn.Name { + t.Errorf("FunctionName = %q; want %q", got.FunctionName, fn.Name) + } + if got.Namespace != fn.Namespace { + t.Errorf("Namespace = %q; want %q", got.Namespace, fn.Namespace) + } + if got.TriggerType != serverless.TriggerTypeWebSocket { + t.Errorf("TriggerType = %q; want %q", got.TriggerType, serverless.TriggerTypeWebSocket) + } + if got.CallerClaims["role"] != "admin" { + t.Errorf("CallerClaims[role] = %q; want %q", got.CallerClaims["role"], "admin") + } +} + +// TestBuildPersistentInvocationContext_NoJWT covers the non-authenticated +// path — namespace-key auth or unauthenticated. CallerJWTSubject must be "" +// (NOT crash, NOT panic). Everything else is whatever the helpers return for +// a bare request. +func TestBuildPersistentInvocationContext_NoJWT(t *testing.T) { + h := newTestHandlers(nil) + + req := httptest.NewRequest(http.MethodGet, "/", nil) + fn := &serverless.Function{ + ID: "fn-id", + Name: "f", + Namespace: "ns", + } + + got := h.buildPersistentInvocationContext(req, fn, "client-id") + + if got == nil { + t.Fatal("buildPersistentInvocationContext returned nil") + } + if got.CallerJWTSubject != "" { + t.Errorf("CallerJWTSubject should be empty without JWT, got %q", got.CallerJWTSubject) + } + if got.WSClientID != "client-id" { + t.Errorf("WSClientID = %q; want %q", got.WSClientID, "client-id") + } + if got.TriggerType != serverless.TriggerTypeWebSocket { + t.Errorf("TriggerType = %q; want %q", got.TriggerType, serverless.TriggerTypeWebSocket) + } +} + +// TestBuildPersistentInvocationContext_MatchesStatelessHandler is a structural +// guard: the persistent and stateless WS paths must populate the same +// auth-identity fields. The two paths diverged silently for ~6 months; this +// test makes any future divergence loud. +// +// We compare the field set (not values — values come from the same request +// helpers and are exercised in the cases above). +func TestBuildPersistentInvocationContext_MatchesStatelessHandler(t *testing.T) { + h := newTestHandlers(nil) + + claims := &auth.JWTClaims{Sub: "test-subject"} + req := httptest.NewRequest(http.MethodGet, "/", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxkeys.JWT, claims)) + + fn := &serverless.Function{ID: "id", Name: "n", Namespace: "ns"} + got := h.buildPersistentInvocationContext(req, fn, "cid") + + // Compare against the helpers the stateless path uses on every frame + // (ws_handler.go:140-145). If any of these returns a value but doesn't + // land in the persistent invCtx, that's the same class of bug as + // Layer 7. + if got.CallerWallet != h.getWalletFromRequest(req) { + t.Errorf("CallerWallet drift: persistent=%q, helper=%q", + got.CallerWallet, h.getWalletFromRequest(req)) + } + if got.CallerJWTSubject != h.getJWTSubjectFromRequest(req) { + t.Errorf("CallerJWTSubject drift: persistent=%q, helper=%q", + got.CallerJWTSubject, h.getJWTSubjectFromRequest(req)) + } + // Claims comparison: deep-equal isn't worth the ceremony for nil-vs-nil; + // just check both branches produce the same nilness. + statelessClaims := h.getCallerClaimsFromRequest(req) + if (got.CallerClaims == nil) != (statelessClaims == nil) { + t.Errorf("CallerClaims nilness drift: persistent=%v, helper=%v", + got.CallerClaims, statelessClaims) + } +} diff --git a/core/pkg/serverless/engine.go b/core/pkg/serverless/engine.go index b8fab01..18dacac 100644 --- a/core/pkg/serverless/engine.go +++ b/core/pkg/serverless/engine.go @@ -376,13 +376,13 @@ func (e *Engine) InstantiatePersistent(ctx context.Context, fn *Function, invCtx return nil, fmt.Errorf("InstantiatePersistent: compile: %w", err) } - // Bind invocation context once at instantiation. Subsequent ws_open / - // ws_frame calls will see this same context (host services read from - // the bound invCtx). For multi-call lifecycles this is a sticky - // per-instance context, NOT a per-call context. - if hf, ok := e.hostServices.(contextAwareHostServices); ok { - hf.SetInvocationContext(invCtx) - } + // Persistent WS uses per-call invCtx propagation through ctx — + // see pkg/serverless/invocation_context.go for the cross-tenant + // race rationale. The persistent.Instance wrapper attaches invCtx + // to every WASM-host call's ctx via WithInvocationContext, so we + // do NOT touch the HostFunctions singleton here. Two simultaneous + // persistent connections from different users now keep their + // caller identity isolated. // Persistent-instance runtime-init policy. TinyGo emits one of two // start hooks depending on the build target: @@ -435,9 +435,6 @@ func (e *Engine) InstantiatePersistent(ctx context.Context, fn *Function, invCtx instance, err := e.runtime.InstantiateModule(ctx, compiled, moduleConfig) if err != nil { - if hf, ok := e.hostServices.(contextAwareHostServices); ok { - hf.ClearContext() - } return nil, fmt.Errorf("InstantiatePersistent: instantiate: %w", err) } @@ -445,8 +442,12 @@ func (e *Engine) InstantiatePersistent(ctx context.Context, fn *Function, invCtx // then command hook (assumes main() is an empty stub per // persistent-function convention). Bounded by a short timeout so // a buggy main() can't hang every connection. + // + // Wrap initCtx with invCtx so any host functions called from a TinyGo + // init() (e.g. early GetEnv / GetSecret reads) see this connection's + // caller identity, not whatever happens to be on the singleton. const initTimeout = 5 * time.Second - initCtx, initCancel := context.WithTimeout(ctx, initTimeout) + initCtx, initCancel := context.WithTimeout(WithInvocationContext(ctx, invCtx), initTimeout) defer initCancel() var initName string @@ -479,9 +480,6 @@ func (e *Engine) InstantiatePersistent(ctx context.Context, fn *Function, invCtx zap.String("init_hook", initName)) } else { _ = instance.Close(ctx) - if hf, ok := e.hostServices.(contextAwareHostServices); ok { - hf.ClearContext() - } return nil, fmt.Errorf("InstantiatePersistent: %s: %w", initName, callErr) } } else { diff --git a/core/pkg/serverless/hostfunctions/context.go b/core/pkg/serverless/hostfunctions/context.go index d8f7b1f..12e09e7 100644 --- a/core/pkg/serverless/hostfunctions/context.go +++ b/core/pkg/serverless/hostfunctions/context.go @@ -6,8 +6,11 @@ import ( "github.com/DeBrosOfficial/network/pkg/serverless" ) -// SetInvocationContext sets the current invocation context. -// Must be called before executing a function. +// SetInvocationContext sets the current invocation context on the +// singleton field. STATELESS execution path uses this (paired with +// ClearContext) for per-call binding via the executor's setter/clearer +// hook. PERSISTENT WS uses ctx-propagation instead — see +// invocation_context.go for the cross-tenant race rationale. func (h *HostFunctions) SetInvocationContext(invCtx *serverless.InvocationContext) { h.invCtxLock.Lock() defer h.invCtxLock.Unlock() @@ -24,7 +27,9 @@ func (h *HostFunctions) GetLogs() []serverless.LogEntry { return logsCopy } -// ClearContext clears the invocation context after execution. +// ClearContext clears the singleton invocation context after stateless +// execution. No-op effect for persistent WS (which never uses the +// singleton field). func (h *HostFunctions) ClearContext() { h.invCtxLock.Lock() defer h.invCtxLock.Unlock() @@ -46,6 +51,10 @@ func (h *HostFunctions) SetInvoker(inv serverless.FunctionInvoker) { // ID are inherited from the current invocation so the inner function sees // the same authenticated identity. Returns ErrFunctionInvokeNotAvailable // when no invoker has been wired (e.g. tests). +// +// Identity propagation: ctx-attached invCtx wins over the singleton — +// this is what makes persistent WS function_invoke calls race-free across +// concurrent connections (see invocation_context.go). func (h *HostFunctions) FunctionInvoke(ctx context.Context, name string, payload []byte) ([]byte, error) { h.invokerLock.RLock() inv := h.invoker @@ -57,9 +66,7 @@ func (h *HostFunctions) FunctionInvoke(ctx context.Context, name string, payload } } - h.invCtxLock.RLock() - cur := h.invCtx - h.invCtxLock.RUnlock() + cur := h.currentInvocationContext(ctx) if cur == nil { return nil, &serverless.HostFunctionError{ Function: "function_invoke", @@ -87,14 +94,11 @@ func (h *HostFunctions) FunctionInvoke(ctx context.Context, name string, payload // GetEnv retrieves an environment variable for the function. func (h *HostFunctions) GetEnv(ctx context.Context, key string) (string, error) { - h.invCtxLock.RLock() - defer h.invCtxLock.RUnlock() - - if h.invCtx == nil || h.invCtx.EnvVars == nil { + cur := h.currentInvocationContext(ctx) + if cur == nil || cur.EnvVars == nil { return "", nil } - - return h.invCtx.EnvVars[key], nil + return cur.EnvVars[key], nil } // GetSecret retrieves a decrypted secret. @@ -103,12 +107,10 @@ func (h *HostFunctions) GetSecret(ctx context.Context, name string) (string, err return "", &serverless.HostFunctionError{Function: "get_secret", Cause: serverless.ErrDatabaseUnavailable} } - h.invCtxLock.RLock() namespace := "" - if h.invCtx != nil { - namespace = h.invCtx.Namespace + if cur := h.currentInvocationContext(ctx); cur != nil { + namespace = cur.Namespace } - h.invCtxLock.RUnlock() value, err := h.secrets.Get(ctx, namespace, name) if err != nil { @@ -120,36 +122,30 @@ func (h *HostFunctions) GetSecret(ctx context.Context, name string) (string, err // GetRequestID returns the current request ID. func (h *HostFunctions) GetRequestID(ctx context.Context) string { - h.invCtxLock.RLock() - defer h.invCtxLock.RUnlock() - - if h.invCtx == nil { + cur := h.currentInvocationContext(ctx) + if cur == nil { return "" } - return h.invCtx.RequestID + return cur.RequestID } // GetCallerWallet returns the wallet address of the caller. func (h *HostFunctions) GetCallerWallet(ctx context.Context) string { - h.invCtxLock.RLock() - defer h.invCtxLock.RUnlock() - - if h.invCtx == nil { + cur := h.currentInvocationContext(ctx) + if cur == nil { return "" } - return h.invCtx.CallerWallet + return cur.CallerWallet } // GetWSClientID returns the WebSocket client ID for the current invocation, // or empty string if the function wasn't invoked via a WS connection. func (h *HostFunctions) GetWSClientID(ctx context.Context) string { - h.invCtxLock.RLock() - defer h.invCtxLock.RUnlock() - - if h.invCtx == nil { + cur := h.currentInvocationContext(ctx) + if cur == nil { return "" } - return h.invCtx.WSClientID + return cur.WSClientID } // GetCallerClaim returns the value of a custom JWT claim for the caller, or @@ -158,13 +154,11 @@ func (h *HostFunctions) GetWSClientID(ctx context.Context) string { // "Custom" here means claims set on JWTClaims.Custom by the auth service — // standard claims (sub, namespace, etc.) have dedicated accessors. func (h *HostFunctions) GetCallerClaim(ctx context.Context, name string) string { - h.invCtxLock.RLock() - defer h.invCtxLock.RUnlock() - - if h.invCtx == nil || h.invCtx.CallerClaims == nil { + cur := h.currentInvocationContext(ctx) + if cur == nil || cur.CallerClaims == nil { return "" } - return h.invCtx.CallerClaims[name] + return cur.CallerClaims[name] } // GetCallerJWTSubject returns the JWT `sub` claim explicitly, independent @@ -176,11 +170,9 @@ func (h *HostFunctions) GetCallerClaim(ctx context.Context, name string) string // the wallet that signed the auth challenge). GetCallerWallet may return // the namespace pseudo-identifier if the caller also presents an API key. func (h *HostFunctions) GetCallerJWTSubject(ctx context.Context) string { - h.invCtxLock.RLock() - defer h.invCtxLock.RUnlock() - - if h.invCtx == nil { + cur := h.currentInvocationContext(ctx) + if cur == nil { return "" } - return h.invCtx.CallerJWTSubject + return cur.CallerJWTSubject } diff --git a/core/pkg/serverless/hostfunctions/database.go b/core/pkg/serverless/hostfunctions/database.go index 25fa8bd..9b750ce 100644 --- a/core/pkg/serverless/hostfunctions/database.go +++ b/core/pkg/serverless/hostfunctions/database.go @@ -220,12 +220,11 @@ func (h *HostFunctions) ExecAndPublish( } // Resolve namespace from invocation context — server-trusted. - h.invCtxLock.RLock() + // ctx-attached invCtx wins over singleton; see invocation_context.go. ns := "" - if h.invCtx != nil { - ns = h.invCtx.Namespace + if cur := h.currentInvocationContext(ctx); cur != nil { + ns = cur.Namespace } - h.invCtxLock.RUnlock() if ns == "" { return nil, &serverless.HostFunctionError{ Function: "exec_and_publish", diff --git a/core/pkg/serverless/hostfunctions/invocation_context.go b/core/pkg/serverless/hostfunctions/invocation_context.go new file mode 100644 index 0000000..bec2ce7 --- /dev/null +++ b/core/pkg/serverless/hostfunctions/invocation_context.go @@ -0,0 +1,24 @@ +package hostfunctions + +import ( + "context" + + "github.com/DeBrosOfficial/network/pkg/serverless" +) + +// currentInvocationContext returns the active InvocationContext for a host +// call. ctx-attached values (via serverless.WithInvocationContext) take +// precedence over the singleton field — see the comment on +// serverless.WithInvocationContext for the cross-tenant identity-leak +// rationale. +// +// Returns nil if neither source has a context (e.g. a host call made +// outside any invocation, which generally indicates a bug in wiring). +func (h *HostFunctions) currentInvocationContext(ctx context.Context) *serverless.InvocationContext { + if c := serverless.InvocationContextFromCtx(ctx); c != nil { + return c + } + h.invCtxLock.RLock() + defer h.invCtxLock.RUnlock() + return h.invCtx +} diff --git a/core/pkg/serverless/hostfunctions/invocation_context_test.go b/core/pkg/serverless/hostfunctions/invocation_context_test.go new file mode 100644 index 0000000..a1d24f0 --- /dev/null +++ b/core/pkg/serverless/hostfunctions/invocation_context_test.go @@ -0,0 +1,195 @@ +package hostfunctions + +import ( + "context" + "sync" + "sync/atomic" + "testing" + + "github.com/DeBrosOfficial/network/pkg/serverless" +) + +// TestCurrentInvocationContext_CtxOverridesSingleton verifies the basic +// precedence rule: when a ctx carries an invCtx via +// serverless.WithInvocationContext, host accessors must read from the +// ctx and ignore the singleton field. +// +// Without this precedence, the cross-tenant identity-leak fix is moot — +// every accessor would still read whatever the LAST persistent WS +// connection wrote to the singleton. +func TestCurrentInvocationContext_CtxOverridesSingleton(t *testing.T) { + h := &HostFunctions{} + + // Singleton has identity for "userA". + h.SetInvocationContext(&serverless.InvocationContext{ + CallerJWTSubject: "userA", + WSClientID: "clientA", + Namespace: "nsA", + }) + + // ctx carries identity for "userB" — what a per-instance persistent + // WS connection's ctx would carry. + ctxB := serverless.WithInvocationContext(context.Background(), &serverless.InvocationContext{ + CallerJWTSubject: "userB", + WSClientID: "clientB", + Namespace: "nsB", + }) + + if got := h.GetCallerJWTSubject(ctxB); got != "userB" { + t.Errorf("ctx-attached invCtx must win over singleton: got %q, want %q (cross-tenant leak)", got, "userB") + } + if got := h.GetWSClientID(ctxB); got != "clientB" { + t.Errorf("ctx-attached invCtx must win over singleton: got %q, want %q", got, "clientB") + } + + // Sanity: singleton path still works for callers that don't propagate ctx. + if got := h.GetCallerJWTSubject(context.Background()); got != "userA" { + t.Errorf("singleton fallback broke: got %q, want %q", got, "userA") + } +} + +// TestCurrentInvocationContext_NilInvCtxReturnsCtxUnchanged verifies the +// guard inside WithInvocationContext: passing nil must not panic and must +// not attach a typed-nil to the ctx (which would defeat the +// InvocationContextFromCtx nil check). +func TestCurrentInvocationContext_NilInvCtxReturnsCtxUnchanged(t *testing.T) { + h := &HostFunctions{} + h.SetInvocationContext(&serverless.InvocationContext{CallerJWTSubject: "fallback"}) + + // nil invCtx → ctx unchanged → falls back to singleton. + ctx := serverless.WithInvocationContext(context.Background(), nil) + if got := h.GetCallerJWTSubject(ctx); got != "fallback" { + t.Errorf("nil invCtx should fall through to singleton: got %q, want %q", got, "fallback") + } +} + +// TestCurrentInvocationContext_NoCtxNoSingletonReturnsEmpty verifies the +// "no caller context anywhere" path returns clean zero values rather than +// panicking on nil dereference. +func TestCurrentInvocationContext_NoCtxNoSingletonReturnsEmpty(t *testing.T) { + h := &HostFunctions{} + if got := h.GetCallerJWTSubject(context.Background()); got != "" { + t.Errorf("no invCtx should return empty: got %q", got) + } + if got := h.GetCallerWallet(context.Background()); got != "" { + t.Errorf("no invCtx should return empty: got %q", got) + } +} + +// TestCurrentInvocationContext_NoCrossTenantLeak_Concurrent is the actual +// regression test for the cross-tenant identity-leak race. Without the +// per-call ctx propagation, two concurrent goroutines reading from a +// shared HostFunctions would observe each other's invCtx whenever +// SetInvocationContext was called between their reads. +// +// With the fix in place, each goroutine carries its own invCtx in its ctx +// and the singleton-field race is bypassed entirely. We assert that NO +// goroutine ever reads any other goroutine's identity. +// +// Run with -race for stronger signal — the race detector will also flag +// the underlying singleton field if anyone mutates it concurrently. +func TestCurrentInvocationContext_NoCrossTenantLeak_Concurrent(t *testing.T) { + h := &HostFunctions{} + + const ( + numGoroutines = 32 + opsPerRoutine = 200 + ) + + var leaks int64 + var wg sync.WaitGroup + for g := 0; g < numGoroutines; g++ { + wg.Add(1) + go func(gid int) { + defer wg.Done() + + myInvCtx := &serverless.InvocationContext{ + CallerJWTSubject: subjectForGoroutine(gid), + WSClientID: clientForGoroutine(gid), + Namespace: "ns-" + clientForGoroutine(gid), + CallerWallet: "wallet-" + itoa(gid), + RequestID: "req-" + itoa(gid), + CallerClaims: map[string]string{"tier": "tier-" + itoa(gid)}, + EnvVars: map[string]string{"ENV_KEY": "env-" + itoa(gid)}, + } + ctx := serverless.WithInvocationContext(context.Background(), myInvCtx) + + // Cover every accessor that previously read h.invCtx + // directly. If any future regression special-cases ONE + // accessor to bypass currentInvocationContext, this test + // will catch it. (Earlier versions only checked 3 + // accessors — security audit follow-up.) + for op := 0; op < opsPerRoutine; op++ { + checks := map[string]string{ + "GetCallerJWTSubject": h.GetCallerJWTSubject(ctx), + "GetWSClientID": h.GetWSClientID(ctx), + "GetCallerWallet": h.GetCallerWallet(ctx), + "GetCallerClaim": h.GetCallerClaim(ctx, "tier"), + "GetRequestID": h.GetRequestID(ctx), + "namespaceFromCtx": h.namespaceFromCtx(ctx), + } + expected := map[string]string{ + "GetCallerJWTSubject": myInvCtx.CallerJWTSubject, + "GetWSClientID": myInvCtx.WSClientID, + "GetCallerWallet": myInvCtx.CallerWallet, + "GetCallerClaim": myInvCtx.CallerClaims["tier"], + "GetRequestID": myInvCtx.RequestID, + "namespaceFromCtx": myInvCtx.Namespace, + } + for name, got := range checks { + if got != expected[name] { + atomic.AddInt64(&leaks, 1) + t.Errorf("goroutine %d %s leaked: got=%q want=%q", gid, name, got, expected[name]) + return + } + } + envVal, _ := h.GetEnv(ctx, "ENV_KEY") + if envVal != myInvCtx.EnvVars["ENV_KEY"] { + atomic.AddInt64(&leaks, 1) + t.Errorf("goroutine %d GetEnv leaked: got=%q want=%q", gid, envVal, myInvCtx.EnvVars["ENV_KEY"]) + return + } + } + }(g) + } + + // Concurrently churn the singleton field so any accessor that + // accidentally falls back to it would see whatever was set last. + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < numGoroutines*opsPerRoutine; i++ { + h.SetInvocationContext(&serverless.InvocationContext{ + CallerJWTSubject: "intruder", + WSClientID: "intruder", + Namespace: "intruder", + }) + } + }() + + wg.Wait() + if atomic.LoadInt64(&leaks) != 0 { + t.Fatalf("cross-tenant leak detected in %d operations", atomic.LoadInt64(&leaks)) + } +} + +func subjectForGoroutine(g int) string { + return "subject-" + itoa(g) +} + +func clientForGoroutine(g int) string { + return "client-" + itoa(g) +} + +// itoa avoids strconv to keep the test file's deps minimal — small ints only. +func itoa(n int) string { + if n == 0 { + return "0" + } + digits := []byte{} + for n > 0 { + digits = append([]byte{byte('0' + n%10)}, digits...) + n /= 10 + } + return string(digits) +} diff --git a/core/pkg/serverless/hostfunctions/pubsub.go b/core/pkg/serverless/hostfunctions/pubsub.go index 7e9b570..4ab82cf 100644 --- a/core/pkg/serverless/hostfunctions/pubsub.go +++ b/core/pkg/serverless/hostfunctions/pubsub.go @@ -88,13 +88,13 @@ func (h *HostFunctions) WSSend(ctx context.Context, clientID string, data []byte return &serverless.HostFunctionError{Function: "ws_send", Cause: serverless.ErrWSNotAvailable} } - // If no clientID provided, use the current invocation's client + // If no clientID provided, use the current invocation's client. + // Reads ctx-attached invCtx first (per-call, race-free for persistent + // WS) then falls back to the singleton — see invocation_context.go. if clientID == "" { - h.invCtxLock.RLock() - if h.invCtx != nil && h.invCtx.WSClientID != "" { - clientID = h.invCtx.WSClientID + if cur := h.currentInvocationContext(ctx); cur != nil && cur.WSClientID != "" { + clientID = cur.WSClientID } - h.invCtxLock.RUnlock() } if clientID == "" { diff --git a/core/pkg/serverless/hostfunctions/push.go b/core/pkg/serverless/hostfunctions/push.go index 0f87617..06bfc42 100644 --- a/core/pkg/serverless/hostfunctions/push.go +++ b/core/pkg/serverless/hostfunctions/push.go @@ -70,12 +70,11 @@ func (h *HostFunctions) PushSend(ctx context.Context, userID string, msgJSON []b // Resolve namespace from the current invocation context. A function // can NEVER push to another namespace's users — the namespace is // trusted server-side, not from the WASM input. - h.invCtxLock.RLock() + // ctx-attached invCtx wins over singleton; see invocation_context.go. var namespace string - if h.invCtx != nil { - namespace = h.invCtx.Namespace + if cur := h.currentInvocationContext(ctx); cur != nil { + namespace = cur.Namespace } - h.invCtxLock.RUnlock() if namespace == "" { return &serverless.HostFunctionError{ diff --git a/core/pkg/serverless/hostfunctions/wsbridge.go b/core/pkg/serverless/hostfunctions/wsbridge.go index 93c50e3..18b2090 100644 --- a/core/pkg/serverless/hostfunctions/wsbridge.go +++ b/core/pkg/serverless/hostfunctions/wsbridge.go @@ -23,7 +23,7 @@ func (h *HostFunctions) WSPubSubBridge(ctx context.Context, clientID, topic stri Cause: fmt.Errorf("bridge not configured on this gateway"), } } - fnNS := h.namespaceFromCtx() + fnNS := h.namespaceFromCtx(ctx) if fnNS == "" { return &serverless.HostFunctionError{ Function: "ws_pubsub_bridge", @@ -57,7 +57,7 @@ func (h *HostFunctions) WSPubSubUnbridge(ctx context.Context, clientID, topic st Cause: fmt.Errorf("bridge not configured on this gateway"), } } - fnNS := h.namespaceFromCtx() + fnNS := h.namespaceFromCtx(ctx) if fnNS == "" { return &serverless.HostFunctionError{ Function: "ws_pubsub_unbridge", @@ -71,12 +71,12 @@ func (h *HostFunctions) WSPubSubUnbridge(ctx context.Context, clientID, topic st } // namespaceFromCtx returns the current invocation's namespace, or "" if -// no context is set. -func (h *HostFunctions) namespaceFromCtx() string { - h.invCtxLock.RLock() - defer h.invCtxLock.RUnlock() - if h.invCtx == nil { +// no context is set. ctx-attached invCtx wins over the singleton (see +// invocation_context.go). +func (h *HostFunctions) namespaceFromCtx(ctx context.Context) string { + cur := h.currentInvocationContext(ctx) + if cur == nil { return "" } - return h.invCtx.Namespace + return cur.Namespace } diff --git a/core/pkg/serverless/invocation_context.go b/core/pkg/serverless/invocation_context.go new file mode 100644 index 0000000..19a53ba --- /dev/null +++ b/core/pkg/serverless/invocation_context.go @@ -0,0 +1,56 @@ +package serverless + +import "context" + +// invCtxKey is the unexported context-value key used to attach an +// InvocationContext to a Go context. The empty struct is the standard +// Go pattern for context keys (avoids string-collision risk). +type invCtxKey struct{} + +// WithInvocationContext returns a derived ctx that carries invCtx. Host +// function accessors check the ctx FIRST and only fall back to the +// HostFunctions singleton field when nothing is carried on ctx. +// +// Why this exists: HostFunctions is a process-wide singleton (one per +// gateway engine). Its `invCtx` field is shared across all WASM instances. +// For STATELESS functions the gateway sets/clears that field per-call +// (executor contextSetter/contextClearer), but the lock is released +// before WASM runs — two concurrent invocations CAN race on the field, +// and one's host call CAN read the other's identity. +// +// For PERSISTENT WS functions the race is far worse: the field used to be +// bound ONCE at instantiation and reused for the connection's lifetime. +// Two simultaneous persistent WS connections from different users +// overwrote each other's invCtx, and every subsequent function_invoke / +// GetCallerJWTSubject / GetSecret call from inside the WASM read whatever +// was bound LAST — silently leaking identity across tenants. +// +// The fix is per-call invCtx propagation through Go's context.Context. +// wazero passes the ctx given to api.Function.Call all the way through +// to host function callbacks (engine.go's host-function wrappers receive +// it), so every WASM-host hop carries its own invCtx and never reads the +// shared field. +// +// Persistent WS uses this exclusively (see persistent.Instance, which +// wraps every export call's ctx with the per-instance invCtx). Stateless +// continues to use the singleton-field path for now — its race window +// is microseconds, has been latent since the host-functions split, and +// migrating it is a separate scoped change. +func WithInvocationContext(ctx context.Context, invCtx *InvocationContext) context.Context { + if invCtx == nil { + return ctx + } + return context.WithValue(ctx, invCtxKey{}, invCtx) +} + +// InvocationContextFromCtx extracts the invCtx attached via +// WithInvocationContext, or nil if none is present. Exported so the +// hostfunctions package and any other consumer can read it without +// duplicating the key type. +func InvocationContextFromCtx(ctx context.Context) *InvocationContext { + if ctx == nil { + return nil + } + v, _ := ctx.Value(invCtxKey{}).(*InvocationContext) + return v +} diff --git a/core/pkg/serverless/persistent/instance.go b/core/pkg/serverless/persistent/instance.go index 01cb283..c668536 100644 --- a/core/pkg/serverless/persistent/instance.go +++ b/core/pkg/serverless/persistent/instance.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/DeBrosOfficial/network/pkg/serverless" "github.com/tetratelabs/wazero/api" "go.uber.org/zap" ) @@ -52,12 +53,21 @@ type Instance struct { functionName string namespace string - module api.Module // wazero instance, owned by this struct - openFn api.Function // exported ws_open - frameFn api.Function // exported ws_frame - closeFn api.Function // exported ws_close - allocFn api.Function // orama_alloc / malloc — for input bytes - memory api.Memory + module api.Module // wazero instance, owned by this struct + openFn api.Function // exported ws_open + frameFn api.Function // exported ws_frame + closeFn api.Function // exported ws_close + allocFn api.Function // orama_alloc / malloc — for input bytes + memory api.Memory + + // Per-instance invocation context. Bound at NewInstance time and + // attached to every WASM-host call's ctx via + // hostfunctions.WithInvocationContext. This is what makes persistent + // WS function_invoke / GetCallerJWTSubject / GetSecret race-free + // across concurrent connections — each instance carries its own + // caller identity in the ctx, never reading the HostFunctions + // singleton field. See pkg/serverless/hostfunctions/invocation_context.go. + invCtx *serverless.InvocationContext inbound chan []byte logger *zap.Logger @@ -73,11 +83,21 @@ type Instance struct { // Config holds knobs for a persistent instance. Zero values use sensible // defaults; the gateway populates these from the function's metadata. type Config struct { - ClientID string - FunctionName string - Namespace string - FrameTimeoutSec int // 0 = 30s default + ClientID string + FunctionName string + Namespace string + FrameTimeoutSec int // 0 = 30s default MaxInflightFrames int // 0 = 64 default + + // InvocationContext is attached to every WASM-host call's ctx so the + // instance's caller identity (JWT subject, wallet, claims, ws client + // ID) is race-free across concurrent persistent WS connections. + // + // REQUIRED. NewInstance returns an error if nil — without it, host + // functions would fall back to the shared HostFunctions singleton + // field and re-open the cross-tenant identity leak this whole + // machinery exists to fix (see pkg/serverless/invocation_context.go). + InvocationContext *serverless.InvocationContext } // NewInstance wraps an already-instantiated wazero module as a persistent @@ -87,6 +107,14 @@ type Config struct { // The caller retains ownership of the module's lifecycle outside of Close — // that is, when Close is invoked here, the wazero instance is closed. func NewInstance(module api.Module, cfg Config, logger *zap.Logger) (*Instance, error) { + // Reject nil invCtx loud and early. A persistent instance without + // per-call invCtx propagation falls back to the singleton field on + // every host call, which races across concurrent connections — the + // exact bug this design exists to prevent. Caller MUST populate. + if cfg.InvocationContext == nil { + return nil, fmt.Errorf("persistent: Config.InvocationContext is required (nil would re-open the cross-tenant identity-leak race; see pkg/serverless/invocation_context.go)") + } + openFn := module.ExportedFunction("ws_open") if openFn == nil { return nil, fmt.Errorf("persistent: module missing ws_open export") @@ -130,12 +158,26 @@ func NewInstance(module api.Module, cfg Config, logger *zap.Logger) (*Instance, closeFn: closeFn, allocFn: allocFn, memory: memory, + invCtx: cfg.InvocationContext, inbound: make(chan []byte, maxInflight), logger: logger, frameTimeout: frameTimeout, }, nil } +// withInvCtx returns a derived ctx carrying this instance's invocation +// context. Used by every export call so host functions read identity from +// the per-instance ctx instead of the shared HostFunctions singleton. +// +// Returns ctx unchanged when invCtx is nil — preserves backwards-compat +// for callers that didn't populate Config.InvocationContext. +func (i *Instance) withInvCtx(ctx context.Context) context.Context { + if i.invCtx == nil { + return ctx + } + return serverless.WithInvocationContext(ctx, i.invCtx) +} + // ClientID returns the WebSocket client ID this instance serves. func (i *Instance) ClientID() string { return i.clientID } @@ -146,7 +188,7 @@ func (i *Instance) Open(ctx context.Context, input WSOpenInput) error { if err != nil { return fmt.Errorf("persistent.Open: marshal input: %w", err) } - ctx, cancel := context.WithTimeout(ctx, i.frameTimeout) + ctx, cancel := context.WithTimeout(i.withInvCtx(ctx), i.frameTimeout) defer cancel() rc, err := i.callExport(ctx, i.openFn, payload) @@ -200,7 +242,7 @@ func (i *Instance) Run(ctx context.Context) { } func (i *Instance) handleFrame(ctx context.Context, frame []byte) error { - frameCtx, cancel := context.WithTimeout(ctx, i.frameTimeout) + frameCtx, cancel := context.WithTimeout(i.withInvCtx(ctx), i.frameTimeout) defer cancel() rc, err := i.callExport(frameCtx, i.frameFn, frame) @@ -224,7 +266,7 @@ func (i *Instance) Close(ctx context.Context, reason CloseReason) { } }() // Best-effort ws_close — don't propagate errors; we're shutting down. - closeCtx, cancel := context.WithTimeout(ctx, i.frameTimeout) + closeCtx, cancel := context.WithTimeout(i.withInvCtx(ctx), i.frameTimeout) defer cancel() if _, err := i.callExport(closeCtx, i.closeFn, []byte(reason)); err != nil { i.logger.Debug("persistent ws_close ignored error", diff --git a/sdk/package.json b/sdk/package.json index 08d354a..dced30c 100644 --- a/sdk/package.json +++ b/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@debros/orama", - "version": "0.122.23", + "version": "0.122.24", "description": "TypeScript SDK for Orama Network - Database, PubSub, Cache, Storage, Vault, and more", "type": "module", "main": "./dist/index.js",