From cfff08d91e4608b0e095dae60356e8158097e1a5 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Thu, 28 May 2026 09:54:24 +0300 Subject: [PATCH] feat(serverless): add turn_credentials host function and slow invocation diagnostics - Implement `turn_credentials` host function to provide TURN configuration to WASM modules. - Add structured logging for slow serverless invocations exceeding 5s, providing per-phase timing (rate-limit, module-load, execution) to identify performance bottlenecks. - Enhance WebSocket handler logging to capture request context when 30s timeouts occur. --- core/pkg/gateway/dependencies.go | 6 + .../gateway/handlers/serverless/ws_handler.go | 20 ++ core/pkg/serverless/engine.go | 93 ++++++++ .../pkg/serverless/engine_slow_invoke_test.go | 130 +++++++++++ core/pkg/serverless/hostfuncs_test.go | 4 + .../serverless/hostfunctions/host_services.go | 29 +-- core/pkg/serverless/hostfunctions/turn.go | 111 +++++++++ .../pkg/serverless/hostfunctions/turn_test.go | 210 ++++++++++++++++++ core/pkg/serverless/hostfunctions/types.go | 18 ++ core/pkg/serverless/mocks_test.go | 6 + core/pkg/serverless/types.go | 24 ++ 11 files changed, 638 insertions(+), 13 deletions(-) create mode 100644 core/pkg/serverless/engine_slow_invoke_test.go create mode 100644 core/pkg/serverless/hostfunctions/turn.go create mode 100644 core/pkg/serverless/hostfunctions/turn_test.go diff --git a/core/pkg/gateway/dependencies.go b/core/pkg/gateway/dependencies.go index 6f30e7d..8c490f8 100644 --- a/core/pkg/gateway/dependencies.go +++ b/core/pkg/gateway/dependencies.go @@ -506,6 +506,12 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe hostFuncsCfg := hostfunctions.HostFunctionsConfig{ IPFSAPIURL: cfg.IPFSAPIURL, HTTPTimeout: 30 * time.Second, + // feat-9 — TURN config for the turn_credentials host fn. + // Empty TURNSecret → host fn returns {configured:false} envelope + // (same shape as the HTTP endpoint's 503 semantically). + TURNDomain: cfg.TURNDomain, + TURNSecret: cfg.TURNSecret, + StealthCDNDomain: cfg.StealthCDNDomain, } // WS-PubSub bridge: wire PubSub topics directly to WS clients without // per-event WASM invocation. The bridge is a thin layer over the diff --git a/core/pkg/gateway/handlers/serverless/ws_handler.go b/core/pkg/gateway/handlers/serverless/ws_handler.go index 8986bb6..88b2ca7 100644 --- a/core/pkg/gateway/handlers/serverless/ws_handler.go +++ b/core/pkg/gateway/handlers/serverless/ws_handler.go @@ -172,6 +172,26 @@ func (h *ServerlessHandlers) HandleWebSocket(w http.ResponseWriter, r *http.Requ } resp, err := h.invoker.Invoke(ctx, req) + // Bugboard #24 diagnostic — when the 30s WS-handler timeout + // actually fires, log a structured warning so AnChat's next + // "signaling.relay timed out" report includes request_id + + // function + namespace + duration. Pre-fix this surfaced as + // opaque "RPC timeout after 30s" with no way to correlate to a + // specific invocation in engine logs. + if err != nil && ctx.Err() == context.DeadlineExceeded { + fields := []zap.Field{ + zap.String("namespace", namespace), + zap.String("function", name), + zap.String("ws_client_id", clientID), + zap.Int64("duration_ms", resp.DurationMS), + zap.Int("timeout_ms", 30000), + zap.String("caller_wallet", callerWallet), + } + if resp.RequestID != "" { + fields = append(fields, zap.String("request_id", resp.RequestID)) + } + h.logger.Warn("WS function-invoke hit 30s ceiling (bug-24)", fields...) + } cancel() // Send response back diff --git a/core/pkg/serverless/engine.go b/core/pkg/serverless/engine.go index e20c19a..3bc373d 100644 --- a/core/pkg/serverless/engine.go +++ b/core/pkg/serverless/engine.go @@ -221,7 +221,20 @@ func NewEngine(cfg *Config, registry FunctionRegistry, hostServices HostServices return engine, nil } +// slowInvokeThreshold is the wall-clock duration above which Execute +// emits a structured "slow invocation" warning with per-phase +// breakdown. Picked to be well below the WS-handler's 30s ceiling +// (bugboard #24) so we get diagnostic logs BEFORE the timeout actually +// fires, surfacing which phase is the sink. +const slowInvokeThreshold = 5 * time.Second + // Execute runs a function with the given input and returns the output. +// +// Emits per-phase timing telemetry when total duration exceeds +// slowInvokeThreshold — bugboard #24 diagnostic. Without this, slow +// invocations only surfaced as opaque "RPC timeout after 30s" at the +// WS handler, with no way to tell whether the sink was rate-limit +// checks, module compile, or WASM execution itself. func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, error) { if fn == nil { return nil, &ValidationError{Field: "function", Message: "cannot be nil"} @@ -229,6 +242,15 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx invCtx = EnsureInvocationContext(invCtx, fn) startTime := time.Now() + // Per-phase timestamps for the slow-invoke log (bugboard #24 + // diagnostic). Zero values mean the phase was never entered, which + // itself is signal (e.g. ratelimitMs=0 with totalMs=30000 means we + // blocked entirely in module-load or execution). + var ( + ratelimitDoneAt time.Time + moduleLoadedAt time.Time + executeDoneAt time.Time + ) // Check rate limit. Prefer the tiered path when the limiter supports it // — that gives per-(ns, fn, wallet, ip) enforcement with retry-after. @@ -257,6 +279,8 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx } } + ratelimitDoneAt = time.Now() + // Create timeout context execCtx, cancel := CreateTimeoutContext(ctx, fn, e.config.MaxTimeoutSeconds) defer cancel() @@ -292,8 +316,10 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx module, err := e.getOrCompileModule(execCtx, fn.WASMCID) if err != nil { e.logInvocation(ctx, fn, invCtx, logBuf, startTime, 0, InvocationStatusError, err) + e.logSlowInvocation(invCtx, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt, "module-load-failed", err) return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err} } + moduleLoadedAt = time.Now() // Execute the module with context setters var contextSetter, contextClearer func() @@ -302,6 +328,7 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx contextClearer = func() { hf.ClearContext() } } output, err := e.executor.ExecuteModule(execCtx, module, fn.Name, input, contextSetter, contextClearer) + executeDoneAt = time.Now() if err != nil { status := InvocationStatusError if execCtx.Err() == context.DeadlineExceeded { @@ -309,13 +336,60 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx err = ErrTimeout } e.logInvocation(ctx, fn, invCtx, logBuf, startTime, len(output), status, err) + e.logSlowInvocation(invCtx, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt, string(status), err) return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err} } e.logInvocation(ctx, fn, invCtx, logBuf, startTime, len(output), InvocationStatusSuccess, nil) + e.logSlowInvocation(invCtx, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt, "success", nil) return output, nil } +// logSlowInvocation emits a structured warning when total wall-clock +// exceeds slowInvokeThreshold (bugboard #24 diagnostic). Per-phase +// timestamps let operators see WHICH layer was the sink — pre-fix the +// only signal was an opaque WS-handler "timeout after 30s" with no way +// to tell whether rate-limit, module-load, or WASM-execute consumed +// the budget. +// +// Zero-valued phase timestamps mean the phase was never reached, which +// is itself signal — e.g. moduleLoadedAt=zero + executeDoneAt=zero with +// large totalMs means we blocked in rate-limit OR module-load. +func (e *Engine) logSlowInvocation(invCtx *InvocationContext, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt time.Time, status string, err error) { + totalMs := time.Since(startTime).Milliseconds() + if totalMs < slowInvokeThreshold.Milliseconds() { + return + } + // Compute phase deltas. Use 0 for unreached phases so the log line + // columns are stable. + var ratelimitMs, moduleLoadMs, executeMs int64 + if !ratelimitDoneAt.IsZero() { + ratelimitMs = ratelimitDoneAt.Sub(startTime).Milliseconds() + } + if !moduleLoadedAt.IsZero() && !ratelimitDoneAt.IsZero() { + moduleLoadMs = moduleLoadedAt.Sub(ratelimitDoneAt).Milliseconds() + } + if !executeDoneAt.IsZero() && !moduleLoadedAt.IsZero() { + executeMs = executeDoneAt.Sub(moduleLoadedAt).Milliseconds() + } + fields := []zap.Field{ + zap.String("namespace", invCtx.Namespace), + zap.String("function", invCtx.FunctionName), + zap.String("request_id", invCtx.RequestID), + zap.String("trigger_type", string(invCtx.TriggerType)), + zap.String("ws_client_id", invCtx.WSClientID), + zap.Int64("total_ms", totalMs), + zap.Int64("ratelimit_ms", ratelimitMs), + zap.Int64("module_load_ms", moduleLoadMs), + zap.Int64("execute_ms", executeMs), + zap.String("invocation_status", status), + } + if err != nil { + fields = append(fields, zap.Error(err)) + } + e.logger.Warn("slow serverless invocation (bug-24 diagnostic)", fields...) +} + // Precompile compiles a WASM module and caches it for faster execution. func (e *Engine) Precompile(ctx context.Context, wasmCID string, wasmBytes []byte) error { if wasmCID == "" { @@ -665,6 +739,7 @@ func (e *Engine) registerHostModule(ctx context.Context) error { NewFunctionBuilder().WithFunc(e.hPubSubPublishBatch).Export("pubsub_publish_batch"). NewFunctionBuilder().WithFunc(e.hPushSend).Export("push_send"). NewFunctionBuilder().WithFunc(e.hPushSendV2).Export("push_send_v2"). + NewFunctionBuilder().WithFunc(e.hTurnCredentials).Export("turn_credentials"). NewFunctionBuilder().WithFunc(e.hWSPubSubBridge).Export("ws_pubsub_bridge"). NewFunctionBuilder().WithFunc(e.hWSPubSubUnbridge).Export("ws_pubsub_unbridge"). NewFunctionBuilder().WithFunc(e.hWSSend).Export("ws_send"). @@ -1199,6 +1274,24 @@ func (e *Engine) hPushSendV2(ctx context.Context, mod api.Module, return e.executor.WriteToGuest(ctx, mod, out) } +// hTurnCredentials is the WASM-callable wrapper for TurnCredentials — +// feat-9. Takes no args (namespace derived from invocation context), +// returns packed uint64 (ptr<<32 | len) pointing to a JSON envelope in +// guest memory, or 0 on setup error. +// +// The envelope shape is documented at turn.go:turnCredentialsEnvelope. +// Callers MUST parse it — a non-zero return doesn't imply TURN is +// configured (check envelope.configured before using credentials). +func (e *Engine) hTurnCredentials(ctx context.Context, mod api.Module) uint64 { + out, err := e.hostServices.TurnCredentials(ctx) + if err != nil { + e.logger.Warn("host function turn_credentials failed", + zap.Error(err)) + return 0 + } + return e.executor.WriteToGuest(ctx, mod, out) +} + func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32) { msg, ok := e.executor.ReadFromGuest(mod, ptr, size) if ok { diff --git a/core/pkg/serverless/engine_slow_invoke_test.go b/core/pkg/serverless/engine_slow_invoke_test.go new file mode 100644 index 0000000..3c78e1d --- /dev/null +++ b/core/pkg/serverless/engine_slow_invoke_test.go @@ -0,0 +1,130 @@ +package serverless + +import ( + "testing" + "time" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" +) + +// Bugboard #24 — slow-invoke diagnostic logging. +// +// The WS handler enforces a 30s ceiling on function-invoke. Pre-#24, +// when that ceiling fired AnChat saw "RPC timeout after 30s" with no +// way to tell whether the engine was blocked in rate-limit checks, +// module compile, or WASM execution itself. Engine.Execute now emits +// a structured "slow serverless invocation" warning above +// slowInvokeThreshold (5s) with per-phase breakdown so the next test +// run gives operators a smoking gun pointing at the actual sink. +// +// These tests pin the log shape so a refactor can't silently drop +// fields AnChat will be looking for. + +func TestLogSlowInvocation_belowThresholdEmitsNothing(t *testing.T) { + // Trivial: fast invocations don't pollute logs. The threshold + // exists specifically so warning-grade logs stay actionable. + core, observed := observer.New(zapcore.WarnLevel) + e := &Engine{logger: zap.New(core)} + invCtx := &InvocationContext{Namespace: "ns", FunctionName: "fast-fn"} + + now := time.Now() + e.logSlowInvocation(invCtx, now, now.Add(1*time.Millisecond), now.Add(2*time.Millisecond), now.Add(100*time.Millisecond), "success", nil) + + if got := observed.Len(); got != 0 { + t.Errorf("fast invocation (100ms < 5s threshold) emitted %d log lines; want 0", got) + } +} + +func TestLogSlowInvocation_aboveThresholdEmitsBreakdown(t *testing.T) { + // The actual bug-24 diagnostic. Total > 5s → emit warning with + // ALL phase fields populated so AnChat's next slow-call report + // pins which layer is the sink. + core, observed := observer.New(zapcore.WarnLevel) + e := &Engine{logger: zap.New(core)} + invCtx := &InvocationContext{ + Namespace: "anchat-test", + FunctionName: "signaling.relay", + RequestID: "req-abc-123", + TriggerType: TriggerTypeWebSocket, + WSClientID: "ws-client-xyz", + } + + // Simulate a 30s-class invocation that spent the bulk in execute. + start := time.Now().Add(-30 * time.Second) + ratelimitDone := start.Add(50 * time.Millisecond) + moduleLoaded := start.Add(150 * time.Millisecond) + executeDone := start.Add(30 * time.Second) + e.logSlowInvocation(invCtx, start, ratelimitDone, moduleLoaded, executeDone, "timeout", nil) + + logs := observed.All() + if len(logs) != 1 { + t.Fatalf("slow invocation emitted %d log lines; want 1", len(logs)) + } + got := logs[0] + + // Smoking-gun fields AnChat's diagnostic will read: + want := map[string]interface{}{ + "namespace": "anchat-test", + "function": "signaling.relay", + "request_id": "req-abc-123", + "ws_client_id": "ws-client-xyz", + "invocation_status": "timeout", + } + for k, v := range want { + field, ok := got.ContextMap()[k] + if !ok { + t.Errorf("missing field %q in slow-invoke log (AnChat depends on this)", k) + continue + } + if field != v { + t.Errorf("field %q = %v; want %v", k, field, v) + } + } + + // Phase timings — the actual diagnostic value. Total ≈ 30s, with + // rate-limit + module-load being trivial fractions, so execute_ms + // should dominate. This tells operators "WASM execution is the + // sink, not rate-limit or module compile." + contextMap := got.ContextMap() + totalMs, _ := contextMap["total_ms"].(int64) + executeMs, _ := contextMap["execute_ms"].(int64) + if totalMs < 29000 || totalMs > 31000 { + t.Errorf("total_ms = %d; want ~30000 for the simulated 30s invocation", totalMs) + } + if executeMs < 29000 || executeMs > 30000 { + t.Errorf("execute_ms = %d; want ~29900 (proves the phase-breakdown points at execute)", executeMs) + } +} + +func TestLogSlowInvocation_zeroPhaseTimestampsMeanUnreached(t *testing.T) { + // Defensive: if Execute bails early (e.g. module compile fails + // before WASM runs), executeDoneAt is zero. The log must still + // emit with executeMs=0 rather than producing negative or absurd + // values from subtracting zero.Time. This shape lets ops see + // "we never reached execute" as a distinct signal from "execute + // was fast." + core, observed := observer.New(zapcore.WarnLevel) + e := &Engine{logger: zap.New(core)} + invCtx := &InvocationContext{Namespace: "ns", FunctionName: "fn"} + + start := time.Now().Add(-10 * time.Second) + ratelimitDone := start.Add(100 * time.Millisecond) + // moduleLoadedAt and executeDoneAt left as zero — module-load failed + e.logSlowInvocation(invCtx, start, ratelimitDone, time.Time{}, time.Time{}, "module-load-failed", nil) + + logs := observed.All() + if len(logs) != 1 { + t.Fatalf("want 1 log line; got %d", len(logs)) + } + cm := logs[0].ContextMap() + moduleLoadMs, _ := cm["module_load_ms"].(int64) + executeMs, _ := cm["execute_ms"].(int64) + if moduleLoadMs != 0 { + t.Errorf("module_load_ms = %d; want 0 when moduleLoadedAt was never set (signals 'unreached')", moduleLoadMs) + } + if executeMs != 0 { + t.Errorf("execute_ms = %d; want 0 when executeDoneAt was never set", executeMs) + } +} diff --git a/core/pkg/serverless/hostfuncs_test.go b/core/pkg/serverless/hostfuncs_test.go index 9ba5719..858da3c 100644 --- a/core/pkg/serverless/hostfuncs_test.go +++ b/core/pkg/serverless/hostfuncs_test.go @@ -114,6 +114,10 @@ func (m *mockHostServices) PushSendV2(ctx context.Context, userID string, msgJSO return []byte(`{"ok":true,"devices_attempted":0,"devices_succeeded":0,"results":[]}`), nil } +func (m *mockHostServices) TurnCredentials(ctx context.Context) ([]byte, error) { + return []byte(`{"configured":false}`), nil +} + func (m *mockHostServices) DBTransaction(ctx context.Context, opsJSON []byte) ([]byte, error) { return []byte(`{"committed":true,"results":[]}`), nil } diff --git a/core/pkg/serverless/hostfunctions/host_services.go b/core/pkg/serverless/hostfunctions/host_services.go index 160be73..6ab4ddf 100644 --- a/core/pkg/serverless/hostfunctions/host_services.go +++ b/core/pkg/serverless/hostfunctions/host_services.go @@ -43,18 +43,21 @@ func NewHostFunctions( } return &HostFunctions{ - db: db, - cacheClient: cacheClient, - storage: storage, - ipfsAPIURL: cfg.IPFSAPIURL, - pubsub: pubsubAdapter, - wsManager: wsManager, - secrets: secrets, - pushDispatcher: pushDispatcher, - pushManager: pushManager, - wsBridge: wsBridge, - httpClient: tlsutil.NewHTTPClient(httpTimeout), - logger: logger, - logs: make([]serverless.LogEntry, 0), + db: db, + cacheClient: cacheClient, + storage: storage, + ipfsAPIURL: cfg.IPFSAPIURL, + pubsub: pubsubAdapter, + wsManager: wsManager, + secrets: secrets, + pushDispatcher: pushDispatcher, + pushManager: pushManager, + wsBridge: wsBridge, + turnDomain: cfg.TURNDomain, + turnSecret: cfg.TURNSecret, + stealthCDNDomain: cfg.StealthCDNDomain, + httpClient: tlsutil.NewHTTPClient(httpTimeout), + logger: logger, + logs: make([]serverless.LogEntry, 0), } } diff --git a/core/pkg/serverless/hostfunctions/turn.go b/core/pkg/serverless/hostfunctions/turn.go new file mode 100644 index 0000000..aed5f61 --- /dev/null +++ b/core/pkg/serverless/hostfunctions/turn.go @@ -0,0 +1,111 @@ +package hostfunctions + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/DeBrosOfficial/network/pkg/serverless" + "github.com/DeBrosOfficial/network/pkg/turn" +) + +// turnCredentialTTL mirrors the HTTP handler at +// pkg/gateway/handlers/webrtc/credentials.go — the credentials are +// time-bound HMAC tokens and 10min is the operational sweet spot +// (long enough for a call to set up, short enough to limit replay +// exposure if a token leaks). +const turnCredentialTTL = 10 * time.Minute + +// turnCredentialsEnvelope is the JSON shape returned by TurnCredentials. +// Mirrors what the HTTP credentials endpoint returns at the wire so +// WASM callers and JS clients see the same field names — keeps SDKs +// trivial. `configured=false` means TURN isn't set up on this gateway +// (TURNSecret empty); callers should fall back to STUN-only. +type turnCredentialsEnvelope struct { + Configured bool `json:"configured"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` + TTL int `json:"ttl,omitempty"` // seconds + URIs []string `json:"uris,omitempty"` + Namespace string `json:"namespace,omitempty"` +} + +// TurnCredentials implements feat-9 — minting TURN credentials inside a +// WASM function without a round-trip through HTTP. Mirrors the +// `POST /v1/webrtc/turn/credentials` endpoint exactly: derives the +// namespace from the invocation context (caller cannot spoof), generates +// per-namespace HMAC credentials via pkg/turn, and assembles the same +// URI list (including stealth TURN-over-443 when StealthCDNDomain is +// set). +// +// Returns a JSON envelope identical in shape to the HTTP response, so +// the WASM-side SDK helper can return it as-is to in-process callers +// who want to inject the creds into RTCPeerConnection config. +// +// Setup-failure semantics match the rest of the host-fn family: +// - No namespace in invocation context → Go error (HostFunctionError). +// This should never happen in normal serverless flow but is defensive. +// - TURN not configured on this gateway (TURNSecret empty) → returns +// {configured:false} as a structured envelope, NOT an error. Same +// shape as PushSend's silent no-op when push isn't configured — +// keeps functions portable across deployments. +func (h *HostFunctions) TurnCredentials(ctx context.Context) ([]byte, error) { + cur := h.currentInvocationContext(ctx) + if cur == nil || cur.Namespace == "" { + return nil, &serverless.HostFunctionError{ + Function: "turn_credentials", + Cause: fmt.Errorf("no namespace in invocation context"), + } + } + + if h.turnSecret == "" { + // TURN not configured on this gateway — return structured + // "not configured" envelope so the caller can fall back to + // STUN-only without treating it as a function-level error. + // Matches the HTTP handler's 503 semantically, but at the host- + // fn boundary we encode it as a result shape, not a Go error. + return json.Marshal(turnCredentialsEnvelope{ + Configured: false, + Namespace: cur.Namespace, + }) + } + + username, password := turn.GenerateCredentials(h.turnSecret, cur.Namespace, turnCredentialTTL) + + uris := buildTURNURIs(h.turnDomain, h.stealthCDNDomain) + + return json.Marshal(turnCredentialsEnvelope{ + Configured: true, + Username: username, + Password: password, + TTL: int(turnCredentialTTL.Seconds()), + URIs: uris, + Namespace: cur.Namespace, + }) +} + +// buildTURNURIs is the URI assembly shared between the host-fn path and +// the HTTP credentials handler. Returns an empty slice when neither +// turnDomain nor stealthCDNDomain is set — caller-side this means +// "TURN reachable but no public URI to advertise", which is a config +// problem the operator should fix. +// +// Stealth: when stealthCDNDomain is non-empty we append +// `turns::443` — that endpoint is served by the in-house SNI +// router on the standard HTTPS port and looks like ordinary TLS to a +// passive observer / DPI. Usable in restricted regions. +func buildTURNURIs(turnDomain, stealthCDNDomain string) []string { + var uris []string + if turnDomain != "" { + uris = append(uris, + fmt.Sprintf("turn:%s:3478?transport=udp", turnDomain), + fmt.Sprintf("turn:%s:3478?transport=tcp", turnDomain), + fmt.Sprintf("turns:%s:5349", turnDomain), + ) + } + if stealthCDNDomain != "" { + uris = append(uris, fmt.Sprintf("turns:%s:443", stealthCDNDomain)) + } + return uris +} diff --git a/core/pkg/serverless/hostfunctions/turn_test.go b/core/pkg/serverless/hostfunctions/turn_test.go new file mode 100644 index 0000000..8bc3e95 --- /dev/null +++ b/core/pkg/serverless/hostfunctions/turn_test.go @@ -0,0 +1,210 @@ +package hostfunctions + +import ( + "context" + "encoding/json" + "strings" + "testing" + + "github.com/DeBrosOfficial/network/pkg/serverless" +) + +// feat-9 — turn_credentials host fn. +// +// Mirrors the /v1/webrtc/turn/credentials HTTP endpoint so WASM +// functions can mint per-namespace TURN credentials without a round-trip +// back through HTTP. These tests pin the contract that external SDK +// helpers and AnChat's call setup logic depend on. + +func TestTurnCredentials_returnsConfiguredEnvelopeWhenSecretSet(t *testing.T) { + // Happy path: TURN configured → returns full envelope with username, + // password, ttl, uris. + h := &HostFunctions{ + turnDomain: "turn.example.com", + turnSecret: "deadbeef-shared-secret-for-hmac", + stealthCDNDomain: "", + } + h.SetInvocationContext(&serverless.InvocationContext{Namespace: "test-ns"}) + + raw, err := h.TurnCredentials(context.Background()) + if err != nil { + t.Fatalf("TurnCredentials: %v", err) + } + var env turnCredentialsEnvelope + if err := json.Unmarshal(raw, &env); err != nil { + t.Fatalf("unmarshal envelope: %v", err) + } + + if !env.Configured { + t.Error("Configured = false; want true when turnSecret is set") + } + if env.Namespace != "test-ns" { + t.Errorf("Namespace = %q; want test-ns (namespace must be derived from invCtx, not caller-controlled)", + env.Namespace) + } + if env.Username == "" { + t.Error("Username empty; want HMAC-derived value") + } + if env.Password == "" { + t.Error("Password empty; want HMAC-derived value") + } + if env.TTL != int(turnCredentialTTL.Seconds()) { + t.Errorf("TTL = %d; want %d (matches HTTP endpoint)", env.TTL, int(turnCredentialTTL.Seconds())) + } + // Username MUST contain the namespace per pkg/turn HMAC contract — + // this is what the TURN server uses to scope the credential. + if !strings.Contains(env.Username, "test-ns") { + t.Errorf("Username %q must contain the namespace for TURN server-side scope check", env.Username) + } +} + +func TestTurnCredentials_returnsURIsForDomain(t *testing.T) { + // Verify URI assembly mirrors the HTTP endpoint exactly — same three + // URIs (udp + tcp + tls5349) when only turnDomain is set. + h := &HostFunctions{ + turnDomain: "turn.example.com", + turnSecret: "secret", + } + h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns"}) + + raw, _ := h.TurnCredentials(context.Background()) + var env turnCredentialsEnvelope + _ = json.Unmarshal(raw, &env) + + if len(env.URIs) != 3 { + t.Fatalf("URIs count = %d; want 3 (udp+tcp+tls5349)", len(env.URIs)) + } + want := []string{ + "turn:turn.example.com:3478?transport=udp", + "turn:turn.example.com:3478?transport=tcp", + "turns:turn.example.com:5349", + } + for i, w := range want { + if env.URIs[i] != w { + t.Errorf("URIs[%d] = %q; want %q", i, env.URIs[i], w) + } + } +} + +func TestTurnCredentials_stealthCDNAppendsTurns443(t *testing.T) { + // Stealth: turns::443 must be APPENDED to the + // regular URI list. Used in restricted regions where regular TURN + // ports are blocked; the SNI router serves it as ordinary HTTPS on + // :443 so DPI can't distinguish it. Critical for AnChat's restricted- + // region UX (bugboard #411 + stealth TURN plan 4). + h := &HostFunctions{ + turnDomain: "turn.example.com", + turnSecret: "secret", + stealthCDNDomain: "cdn.example.com", + } + h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns"}) + + raw, _ := h.TurnCredentials(context.Background()) + var env turnCredentialsEnvelope + _ = json.Unmarshal(raw, &env) + + if len(env.URIs) != 4 { + t.Fatalf("URIs count = %d; want 4 (3 regular + 1 stealth)", len(env.URIs)) + } + stealth := env.URIs[3] + want := "turns:cdn.example.com:443" + if stealth != want { + t.Errorf("stealth URI = %q; want %q", stealth, want) + } +} + +func TestTurnCredentials_notConfiguredWhenSecretEmpty(t *testing.T) { + // Back-compat / portability: when TURN isn't configured on this + // gateway, return a structured {configured:false} envelope — NOT a + // Go error. Same shape contract as PushSend's silent-noop when push + // isn't configured. Lets the same WASM function run unchanged on + // dev environments without TURN. + h := &HostFunctions{ + turnSecret: "", + } + h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns"}) + + raw, err := h.TurnCredentials(context.Background()) + if err != nil { + t.Fatalf("TurnCredentials must NOT return Go error when TURN unconfigured; got %v", err) + } + var env turnCredentialsEnvelope + _ = json.Unmarshal(raw, &env) + if env.Configured { + t.Error("Configured = true; want false when turnSecret is empty (caller relies on this to fall back to STUN-only)") + } + if env.Namespace != "ns" { + t.Errorf("Namespace = %q; want ns (still populated for logging context)", env.Namespace) + } + if env.Username != "" || env.Password != "" { + t.Error("Username/Password must be empty when not configured (no credentials to leak)") + } +} + +func TestTurnCredentials_errorsWhenNoNamespaceInContext(t *testing.T) { + // Defensive: serverless invocation should always have a namespace. + // If not, return a Go error rather than producing TURN credentials + // for an empty namespace (which would be a security bug — TURN + // HMAC username is the namespace + ts, so "" would shadow any + // real-namespace creds at the TURN server's auth check). + h := &HostFunctions{turnSecret: "secret"} + // no SetInvocationContext + + _, err := h.TurnCredentials(context.Background()) + if err == nil { + t.Fatal("no invocation context: must return error (avoid empty-namespace credentials)") + } + if !strings.Contains(err.Error(), "namespace") { + t.Errorf("error %q should mention namespace for caller diagnostics", err.Error()) + } +} + +func TestTurnCredentials_credentialsAreNamespaceScoped(t *testing.T) { + // Two different namespaces issued through the SAME host fn instance + // MUST get distinct credentials. Catches a regression where the + // namespace gets cached at host-fn construction instead of read + // per-invocation from invCtx. + h := &HostFunctions{ + turnDomain: "turn.example.com", + turnSecret: "shared-secret", + } + + h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns-a"}) + rawA, _ := h.TurnCredentials(context.Background()) + var envA turnCredentialsEnvelope + _ = json.Unmarshal(rawA, &envA) + + h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns-b"}) + rawB, _ := h.TurnCredentials(context.Background()) + var envB turnCredentialsEnvelope + _ = json.Unmarshal(rawB, &envB) + + if envA.Username == envB.Username { + t.Error("ns-a and ns-b got identical username — namespace not flowing per-invocation") + } + if envA.Password == envB.Password { + t.Error("ns-a and ns-b got identical password — credentials not namespace-scoped (security bug)") + } +} + +// buildTURNURIs unit tests — the pure helper used by both this host fn +// and the HTTP endpoint. Cheap regression coverage. + +func TestBuildTURNURIs_emptyDomainNoURIs(t *testing.T) { + if got := buildTURNURIs("", ""); len(got) != 0 { + t.Errorf("empty domain + empty stealth: want 0 URIs, got %d (%v)", len(got), got) + } +} + +func TestBuildTURNURIs_stealthOnlyOmitsRegularURIs(t *testing.T) { + // Edge: operator configured stealth but not regular TURN. Returns + // ONLY the stealth URI — caller falls back to STUN if they can't + // reach it. Don't pretend the regular TURN exists. + got := buildTURNURIs("", "cdn.example.com") + if len(got) != 1 { + t.Fatalf("want 1 stealth-only URI, got %d (%v)", len(got), got) + } + if got[0] != "turns:cdn.example.com:443" { + t.Errorf("stealth URI mismatch: %q", got[0]) + } +} diff --git a/core/pkg/serverless/hostfunctions/types.go b/core/pkg/serverless/hostfunctions/types.go index 975041d..43773c8 100644 --- a/core/pkg/serverless/hostfunctions/types.go +++ b/core/pkg/serverless/hostfunctions/types.go @@ -20,6 +20,15 @@ import ( type HostFunctionsConfig struct { IPFSAPIURL string HTTPTimeout time.Duration + + // TURN configuration — feat-9. Plumbed in from the gateway so the + // `turn_credentials` host fn can mint per-namespace TURN credentials + // without a round-trip back through HTTP. Mirrors the HTTP endpoint + // at /v1/webrtc/turn/credentials. TURNSecret empty → host fn returns + // a structured "TURN not configured" envelope (no error). + TURNDomain string + TURNSecret string + StealthCDNDomain string // optional; non-empty adds turns::443 URI } // HostFunctions provides the bridge between WASM functions and Orama services. @@ -54,6 +63,15 @@ type HostFunctions struct { invoker serverless.FunctionInvoker invokerLock sync.RWMutex + // TURN config — feat-9. Cached at NewHostFunctions; immutable for + // the gateway's lifetime so no lock needed. Empty TURNSecret means + // `turn_credentials` host fn returns a configured=false envelope + // instead of an error (same shape as PushSend's silent-noop when + // push isn't configured — keeps functions portable). + turnDomain string + turnSecret string + stealthCDNDomain string + // triggerDispatcher is set after construction (via SetTriggerDispatcher). // When non-nil, PubSubPublish / PubSubPublishBatch synchronously fire // wildcard triggers on the local gateway so functions like diff --git a/core/pkg/serverless/mocks_test.go b/core/pkg/serverless/mocks_test.go index 3ee85a4..1091520 100644 --- a/core/pkg/serverless/mocks_test.go +++ b/core/pkg/serverless/mocks_test.go @@ -225,6 +225,12 @@ func (m *MockHostServices) PushSendV2(ctx context.Context, userID string, msgJSO return []byte(`{"ok":true,"devices_attempted":0,"devices_succeeded":0,"results":[]}`), nil } +func (m *MockHostServices) TurnCredentials(ctx context.Context) ([]byte, error) { + // Mirror PushSendV2's silent-noop-style envelope when not configured — + // matches the documented host-fn contract for TURN being absent. + return []byte(`{"configured":false}`), nil +} + func (m *MockHostServices) DBTransaction(ctx context.Context, opsJSON []byte) ([]byte, error) { return []byte(`{"committed":true,"results":[]}`), nil } diff --git a/core/pkg/serverless/types.go b/core/pkg/serverless/types.go index c14995e..896219a 100644 --- a/core/pkg/serverless/types.go +++ b/core/pkg/serverless/types.go @@ -503,6 +503,30 @@ type HostServices interface { // envelope. Same shape as DBTransaction's "structured per-op result". PushSendV2(ctx context.Context, userID string, msgJSON []byte) ([]byte, error) + // TurnCredentials mints per-namespace TURN HMAC credentials for the + // caller's namespace (derived from invocation context — caller + // cannot spoof). Returns a JSON envelope matching the HTTP endpoint + // at /v1/webrtc/turn/credentials: + // + // { + // "configured": true, + // "username": ":", + // "password": "", + // "ttl": 600, + // "uris": ["turn:...", "turns:...:443"], + // "namespace": "" + // } + // + // When TURN isn't configured on this gateway (TURNSecret empty), + // returns {configured:false, namespace:} as a structured envelope + // — NOT a Go error. This matches PushSend's silent-noop semantics so + // functions stay portable across deployments with/without TURN. + // + // Bugboard feat-9 — removes the round-trip through HTTP for WASM + // functions that need to inject TURN credentials into a peer's + // RTCConfiguration without going back out to the gateway. + TurnCredentials(ctx context.Context) ([]byte, error) + // ExecAndPublish runs ops atomically (like DBTransaction) and, ONLY // if the batch commits, publishes data to the named topic with any // occurrence of the literal string "{{seq}}" replaced by the assigned