mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-06-16 21:54:14 +00:00
feat(serverless): add turn_credentials host function and slow invocation diagnostics
- Implement `turn_credentials` host function to provide TURN configuration to WASM modules.
- Add structured logging for slow serverless invocations exceeding 5s, providing per-phase timing (rate-limit, module-load, execution) to identify performance bottlenecks.
- Enhance WebSocket handler logging to capture request context when 30s timeouts occur.
This commit is contained in:
parent
8fbc4485c1
commit
cfff08d91e
@ -506,6 +506,12 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe
|
||||
hostFuncsCfg := hostfunctions.HostFunctionsConfig{
|
||||
IPFSAPIURL: cfg.IPFSAPIURL,
|
||||
HTTPTimeout: 30 * time.Second,
|
||||
// feat-9 — TURN config for the turn_credentials host fn.
|
||||
// Empty TURNSecret → host fn returns {configured:false} envelope
|
||||
// (same shape as the HTTP endpoint's 503 semantically).
|
||||
TURNDomain: cfg.TURNDomain,
|
||||
TURNSecret: cfg.TURNSecret,
|
||||
StealthCDNDomain: cfg.StealthCDNDomain,
|
||||
}
|
||||
// WS-PubSub bridge: wire PubSub topics directly to WS clients without
|
||||
// per-event WASM invocation. The bridge is a thin layer over the
|
||||
|
||||
@ -172,6 +172,26 @@ func (h *ServerlessHandlers) HandleWebSocket(w http.ResponseWriter, r *http.Requ
|
||||
}
|
||||
|
||||
resp, err := h.invoker.Invoke(ctx, req)
|
||||
// Bugboard #24 diagnostic — when the 30s WS-handler timeout
|
||||
// actually fires, log a structured warning so AnChat's next
|
||||
// "signaling.relay timed out" report includes request_id +
|
||||
// function + namespace + duration. Pre-fix this surfaced as
|
||||
// opaque "RPC timeout after 30s" with no way to correlate to a
|
||||
// specific invocation in engine logs.
|
||||
if err != nil && ctx.Err() == context.DeadlineExceeded {
|
||||
fields := []zap.Field{
|
||||
zap.String("namespace", namespace),
|
||||
zap.String("function", name),
|
||||
zap.String("ws_client_id", clientID),
|
||||
zap.Int64("duration_ms", resp.DurationMS),
|
||||
zap.Int("timeout_ms", 30000),
|
||||
zap.String("caller_wallet", callerWallet),
|
||||
}
|
||||
if resp.RequestID != "" {
|
||||
fields = append(fields, zap.String("request_id", resp.RequestID))
|
||||
}
|
||||
h.logger.Warn("WS function-invoke hit 30s ceiling (bug-24)", fields...)
|
||||
}
|
||||
cancel()
|
||||
|
||||
// Send response back
|
||||
|
||||
@ -221,7 +221,20 @@ func NewEngine(cfg *Config, registry FunctionRegistry, hostServices HostServices
|
||||
return engine, nil
|
||||
}
|
||||
|
||||
// slowInvokeThreshold is the total wall-clock duration at or above which
// logSlowInvocation emits its per-phase "slow serverless invocation"
// warning (bugboard #24 diagnostic). It sits far below the WS handler's
// 30s ceiling so the breakdown is logged for calls that are merely slow,
// not only for calls that ran all the way into that timeout.
const slowInvokeThreshold time.Duration = 5 * time.Second
|
||||
|
||||
// Execute runs a function with the given input and returns the output.
|
||||
//
|
||||
// Emits per-phase timing telemetry when total duration exceeds
|
||||
// slowInvokeThreshold — bugboard #24 diagnostic. Without this, slow
|
||||
// invocations only surfaced as opaque "RPC timeout after 30s" at the
|
||||
// WS handler, with no way to tell whether the sink was rate-limit
|
||||
// checks, module compile, or WASM execution itself.
|
||||
func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, error) {
|
||||
if fn == nil {
|
||||
return nil, &ValidationError{Field: "function", Message: "cannot be nil"}
|
||||
@ -229,6 +242,15 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx
|
||||
|
||||
invCtx = EnsureInvocationContext(invCtx, fn)
|
||||
startTime := time.Now()
|
||||
// Per-phase timestamps for the slow-invoke log (bugboard #24
|
||||
// diagnostic). Zero values mean the phase was never entered, which
|
||||
// itself is signal (e.g. ratelimitMs=0 with totalMs=30000 means we
|
||||
// blocked entirely in module-load or execution).
|
||||
var (
|
||||
ratelimitDoneAt time.Time
|
||||
moduleLoadedAt time.Time
|
||||
executeDoneAt time.Time
|
||||
)
|
||||
|
||||
// Check rate limit. Prefer the tiered path when the limiter supports it
|
||||
// — that gives per-(ns, fn, wallet, ip) enforcement with retry-after.
|
||||
@ -257,6 +279,8 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx
|
||||
}
|
||||
}
|
||||
|
||||
ratelimitDoneAt = time.Now()
|
||||
|
||||
// Create timeout context
|
||||
execCtx, cancel := CreateTimeoutContext(ctx, fn, e.config.MaxTimeoutSeconds)
|
||||
defer cancel()
|
||||
@ -292,8 +316,10 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx
|
||||
module, err := e.getOrCompileModule(execCtx, fn.WASMCID)
|
||||
if err != nil {
|
||||
e.logInvocation(ctx, fn, invCtx, logBuf, startTime, 0, InvocationStatusError, err)
|
||||
e.logSlowInvocation(invCtx, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt, "module-load-failed", err)
|
||||
return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err}
|
||||
}
|
||||
moduleLoadedAt = time.Now()
|
||||
|
||||
// Execute the module with context setters
|
||||
var contextSetter, contextClearer func()
|
||||
@ -302,6 +328,7 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx
|
||||
contextClearer = func() { hf.ClearContext() }
|
||||
}
|
||||
output, err := e.executor.ExecuteModule(execCtx, module, fn.Name, input, contextSetter, contextClearer)
|
||||
executeDoneAt = time.Now()
|
||||
if err != nil {
|
||||
status := InvocationStatusError
|
||||
if execCtx.Err() == context.DeadlineExceeded {
|
||||
@ -309,13 +336,60 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx
|
||||
err = ErrTimeout
|
||||
}
|
||||
e.logInvocation(ctx, fn, invCtx, logBuf, startTime, len(output), status, err)
|
||||
e.logSlowInvocation(invCtx, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt, string(status), err)
|
||||
return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err}
|
||||
}
|
||||
|
||||
e.logInvocation(ctx, fn, invCtx, logBuf, startTime, len(output), InvocationStatusSuccess, nil)
|
||||
e.logSlowInvocation(invCtx, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt, "success", nil)
|
||||
return output, nil
|
||||
}
|
||||
|
||||
// logSlowInvocation emits a structured warning when total wall-clock
|
||||
// exceeds slowInvokeThreshold (bugboard #24 diagnostic). Per-phase
|
||||
// timestamps let operators see WHICH layer was the sink — pre-fix the
|
||||
// only signal was an opaque WS-handler "timeout after 30s" with no way
|
||||
// to tell whether rate-limit, module-load, or WASM-execute consumed
|
||||
// the budget.
|
||||
//
|
||||
// Zero-valued phase timestamps mean the phase was never reached, which
|
||||
// is itself signal — e.g. moduleLoadedAt=zero + executeDoneAt=zero with
|
||||
// large totalMs means we blocked in rate-limit OR module-load.
|
||||
func (e *Engine) logSlowInvocation(invCtx *InvocationContext, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt time.Time, status string, err error) {
|
||||
totalMs := time.Since(startTime).Milliseconds()
|
||||
if totalMs < slowInvokeThreshold.Milliseconds() {
|
||||
return
|
||||
}
|
||||
// Compute phase deltas. Use 0 for unreached phases so the log line
|
||||
// columns are stable.
|
||||
var ratelimitMs, moduleLoadMs, executeMs int64
|
||||
if !ratelimitDoneAt.IsZero() {
|
||||
ratelimitMs = ratelimitDoneAt.Sub(startTime).Milliseconds()
|
||||
}
|
||||
if !moduleLoadedAt.IsZero() && !ratelimitDoneAt.IsZero() {
|
||||
moduleLoadMs = moduleLoadedAt.Sub(ratelimitDoneAt).Milliseconds()
|
||||
}
|
||||
if !executeDoneAt.IsZero() && !moduleLoadedAt.IsZero() {
|
||||
executeMs = executeDoneAt.Sub(moduleLoadedAt).Milliseconds()
|
||||
}
|
||||
fields := []zap.Field{
|
||||
zap.String("namespace", invCtx.Namespace),
|
||||
zap.String("function", invCtx.FunctionName),
|
||||
zap.String("request_id", invCtx.RequestID),
|
||||
zap.String("trigger_type", string(invCtx.TriggerType)),
|
||||
zap.String("ws_client_id", invCtx.WSClientID),
|
||||
zap.Int64("total_ms", totalMs),
|
||||
zap.Int64("ratelimit_ms", ratelimitMs),
|
||||
zap.Int64("module_load_ms", moduleLoadMs),
|
||||
zap.Int64("execute_ms", executeMs),
|
||||
zap.String("invocation_status", status),
|
||||
}
|
||||
if err != nil {
|
||||
fields = append(fields, zap.Error(err))
|
||||
}
|
||||
e.logger.Warn("slow serverless invocation (bug-24 diagnostic)", fields...)
|
||||
}
|
||||
|
||||
// Precompile compiles a WASM module and caches it for faster execution.
|
||||
func (e *Engine) Precompile(ctx context.Context, wasmCID string, wasmBytes []byte) error {
|
||||
if wasmCID == "" {
|
||||
@ -665,6 +739,7 @@ func (e *Engine) registerHostModule(ctx context.Context) error {
|
||||
NewFunctionBuilder().WithFunc(e.hPubSubPublishBatch).Export("pubsub_publish_batch").
|
||||
NewFunctionBuilder().WithFunc(e.hPushSend).Export("push_send").
|
||||
NewFunctionBuilder().WithFunc(e.hPushSendV2).Export("push_send_v2").
|
||||
NewFunctionBuilder().WithFunc(e.hTurnCredentials).Export("turn_credentials").
|
||||
NewFunctionBuilder().WithFunc(e.hWSPubSubBridge).Export("ws_pubsub_bridge").
|
||||
NewFunctionBuilder().WithFunc(e.hWSPubSubUnbridge).Export("ws_pubsub_unbridge").
|
||||
NewFunctionBuilder().WithFunc(e.hWSSend).Export("ws_send").
|
||||
@ -1199,6 +1274,24 @@ func (e *Engine) hPushSendV2(ctx context.Context, mod api.Module,
|
||||
return e.executor.WriteToGuest(ctx, mod, out)
|
||||
}
|
||||
|
||||
// hTurnCredentials is the WASM-callable wrapper for TurnCredentials —
|
||||
// feat-9. Takes no args (namespace derived from invocation context),
|
||||
// returns packed uint64 (ptr<<32 | len) pointing to a JSON envelope in
|
||||
// guest memory, or 0 on setup error.
|
||||
//
|
||||
// The envelope shape is documented at turn.go:turnCredentialsEnvelope.
|
||||
// Callers MUST parse it — a non-zero return doesn't imply TURN is
|
||||
// configured (check envelope.configured before using credentials).
|
||||
func (e *Engine) hTurnCredentials(ctx context.Context, mod api.Module) uint64 {
|
||||
out, err := e.hostServices.TurnCredentials(ctx)
|
||||
if err != nil {
|
||||
e.logger.Warn("host function turn_credentials failed",
|
||||
zap.Error(err))
|
||||
return 0
|
||||
}
|
||||
return e.executor.WriteToGuest(ctx, mod, out)
|
||||
}
|
||||
|
||||
func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32) {
|
||||
msg, ok := e.executor.ReadFromGuest(mod, ptr, size)
|
||||
if ok {
|
||||
|
||||
130
core/pkg/serverless/engine_slow_invoke_test.go
Normal file
130
core/pkg/serverless/engine_slow_invoke_test.go
Normal file
@ -0,0 +1,130 @@
|
||||
package serverless
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"go.uber.org/zap/zaptest/observer"
|
||||
)
|
||||
|
||||
// Bugboard #24 — slow-invoke diagnostic logging.
|
||||
//
|
||||
// The WS handler enforces a 30s ceiling on function-invoke. Pre-#24,
|
||||
// when that ceiling fired AnChat saw "RPC timeout after 30s" with no
|
||||
// way to tell whether the engine was blocked in rate-limit checks,
|
||||
// module compile, or WASM execution itself. Engine.Execute now emits
|
||||
// a structured "slow serverless invocation" warning above
|
||||
// slowInvokeThreshold (5s) with per-phase breakdown so the next test
|
||||
// run gives operators a smoking gun pointing at the actual sink.
|
||||
//
|
||||
// These tests pin the log shape so a refactor can't silently drop
|
||||
// fields AnChat will be looking for.
|
||||
|
||||
func TestLogSlowInvocation_belowThresholdEmitsNothing(t *testing.T) {
|
||||
// Trivial: fast invocations don't pollute logs. The threshold
|
||||
// exists specifically so warning-grade logs stay actionable.
|
||||
core, observed := observer.New(zapcore.WarnLevel)
|
||||
e := &Engine{logger: zap.New(core)}
|
||||
invCtx := &InvocationContext{Namespace: "ns", FunctionName: "fast-fn"}
|
||||
|
||||
now := time.Now()
|
||||
e.logSlowInvocation(invCtx, now, now.Add(1*time.Millisecond), now.Add(2*time.Millisecond), now.Add(100*time.Millisecond), "success", nil)
|
||||
|
||||
if got := observed.Len(); got != 0 {
|
||||
t.Errorf("fast invocation (100ms < 5s threshold) emitted %d log lines; want 0", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogSlowInvocation_aboveThresholdEmitsBreakdown(t *testing.T) {
|
||||
// The actual bug-24 diagnostic. Total > 5s → emit warning with
|
||||
// ALL phase fields populated so AnChat's next slow-call report
|
||||
// pins which layer is the sink.
|
||||
core, observed := observer.New(zapcore.WarnLevel)
|
||||
e := &Engine{logger: zap.New(core)}
|
||||
invCtx := &InvocationContext{
|
||||
Namespace: "anchat-test",
|
||||
FunctionName: "signaling.relay",
|
||||
RequestID: "req-abc-123",
|
||||
TriggerType: TriggerTypeWebSocket,
|
||||
WSClientID: "ws-client-xyz",
|
||||
}
|
||||
|
||||
// Simulate a 30s-class invocation that spent the bulk in execute.
|
||||
start := time.Now().Add(-30 * time.Second)
|
||||
ratelimitDone := start.Add(50 * time.Millisecond)
|
||||
moduleLoaded := start.Add(150 * time.Millisecond)
|
||||
executeDone := start.Add(30 * time.Second)
|
||||
e.logSlowInvocation(invCtx, start, ratelimitDone, moduleLoaded, executeDone, "timeout", nil)
|
||||
|
||||
logs := observed.All()
|
||||
if len(logs) != 1 {
|
||||
t.Fatalf("slow invocation emitted %d log lines; want 1", len(logs))
|
||||
}
|
||||
got := logs[0]
|
||||
|
||||
// Smoking-gun fields AnChat's diagnostic will read:
|
||||
want := map[string]interface{}{
|
||||
"namespace": "anchat-test",
|
||||
"function": "signaling.relay",
|
||||
"request_id": "req-abc-123",
|
||||
"ws_client_id": "ws-client-xyz",
|
||||
"invocation_status": "timeout",
|
||||
}
|
||||
for k, v := range want {
|
||||
field, ok := got.ContextMap()[k]
|
||||
if !ok {
|
||||
t.Errorf("missing field %q in slow-invoke log (AnChat depends on this)", k)
|
||||
continue
|
||||
}
|
||||
if field != v {
|
||||
t.Errorf("field %q = %v; want %v", k, field, v)
|
||||
}
|
||||
}
|
||||
|
||||
// Phase timings — the actual diagnostic value. Total ≈ 30s, with
|
||||
// rate-limit + module-load being trivial fractions, so execute_ms
|
||||
// should dominate. This tells operators "WASM execution is the
|
||||
// sink, not rate-limit or module compile."
|
||||
contextMap := got.ContextMap()
|
||||
totalMs, _ := contextMap["total_ms"].(int64)
|
||||
executeMs, _ := contextMap["execute_ms"].(int64)
|
||||
if totalMs < 29000 || totalMs > 31000 {
|
||||
t.Errorf("total_ms = %d; want ~30000 for the simulated 30s invocation", totalMs)
|
||||
}
|
||||
if executeMs < 29000 || executeMs > 30000 {
|
||||
t.Errorf("execute_ms = %d; want ~29900 (proves the phase-breakdown points at execute)", executeMs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogSlowInvocation_zeroPhaseTimestampsMeanUnreached(t *testing.T) {
|
||||
// Defensive: if Execute bails early (e.g. module compile fails
|
||||
// before WASM runs), executeDoneAt is zero. The log must still
|
||||
// emit with executeMs=0 rather than producing negative or absurd
|
||||
// values from subtracting zero.Time. This shape lets ops see
|
||||
// "we never reached execute" as a distinct signal from "execute
|
||||
// was fast."
|
||||
core, observed := observer.New(zapcore.WarnLevel)
|
||||
e := &Engine{logger: zap.New(core)}
|
||||
invCtx := &InvocationContext{Namespace: "ns", FunctionName: "fn"}
|
||||
|
||||
start := time.Now().Add(-10 * time.Second)
|
||||
ratelimitDone := start.Add(100 * time.Millisecond)
|
||||
// moduleLoadedAt and executeDoneAt left as zero — module-load failed
|
||||
e.logSlowInvocation(invCtx, start, ratelimitDone, time.Time{}, time.Time{}, "module-load-failed", nil)
|
||||
|
||||
logs := observed.All()
|
||||
if len(logs) != 1 {
|
||||
t.Fatalf("want 1 log line; got %d", len(logs))
|
||||
}
|
||||
cm := logs[0].ContextMap()
|
||||
moduleLoadMs, _ := cm["module_load_ms"].(int64)
|
||||
executeMs, _ := cm["execute_ms"].(int64)
|
||||
if moduleLoadMs != 0 {
|
||||
t.Errorf("module_load_ms = %d; want 0 when moduleLoadedAt was never set (signals 'unreached')", moduleLoadMs)
|
||||
}
|
||||
if executeMs != 0 {
|
||||
t.Errorf("execute_ms = %d; want 0 when executeDoneAt was never set", executeMs)
|
||||
}
|
||||
}
|
||||
@ -114,6 +114,10 @@ func (m *mockHostServices) PushSendV2(ctx context.Context, userID string, msgJSO
|
||||
return []byte(`{"ok":true,"devices_attempted":0,"devices_succeeded":0,"results":[]}`), nil
|
||||
}
|
||||
|
||||
func (m *mockHostServices) TurnCredentials(ctx context.Context) ([]byte, error) {
|
||||
return []byte(`{"configured":false}`), nil
|
||||
}
|
||||
|
||||
func (m *mockHostServices) DBTransaction(ctx context.Context, opsJSON []byte) ([]byte, error) {
|
||||
return []byte(`{"committed":true,"results":[]}`), nil
|
||||
}
|
||||
|
||||
@ -43,18 +43,21 @@ func NewHostFunctions(
|
||||
}
|
||||
|
||||
return &HostFunctions{
|
||||
db: db,
|
||||
cacheClient: cacheClient,
|
||||
storage: storage,
|
||||
ipfsAPIURL: cfg.IPFSAPIURL,
|
||||
pubsub: pubsubAdapter,
|
||||
wsManager: wsManager,
|
||||
secrets: secrets,
|
||||
pushDispatcher: pushDispatcher,
|
||||
pushManager: pushManager,
|
||||
wsBridge: wsBridge,
|
||||
httpClient: tlsutil.NewHTTPClient(httpTimeout),
|
||||
logger: logger,
|
||||
logs: make([]serverless.LogEntry, 0),
|
||||
db: db,
|
||||
cacheClient: cacheClient,
|
||||
storage: storage,
|
||||
ipfsAPIURL: cfg.IPFSAPIURL,
|
||||
pubsub: pubsubAdapter,
|
||||
wsManager: wsManager,
|
||||
secrets: secrets,
|
||||
pushDispatcher: pushDispatcher,
|
||||
pushManager: pushManager,
|
||||
wsBridge: wsBridge,
|
||||
turnDomain: cfg.TURNDomain,
|
||||
turnSecret: cfg.TURNSecret,
|
||||
stealthCDNDomain: cfg.StealthCDNDomain,
|
||||
httpClient: tlsutil.NewHTTPClient(httpTimeout),
|
||||
logger: logger,
|
||||
logs: make([]serverless.LogEntry, 0),
|
||||
}
|
||||
}
|
||||
|
||||
111
core/pkg/serverless/hostfunctions/turn.go
Normal file
111
core/pkg/serverless/hostfunctions/turn.go
Normal file
@ -0,0 +1,111 @@
|
||||
package hostfunctions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless"
|
||||
"github.com/DeBrosOfficial/network/pkg/turn"
|
||||
)
|
||||
|
||||
// turnCredentialTTL is the lifetime passed to turn.GenerateCredentials and
// reported (in seconds) in the envelope's "ttl" field. Ten minutes is long
// enough for a call to set up and short enough to limit replay exposure if
// a token leaks.
//
// NOTE(review): intended to match the TTL used by the HTTP handler in
// pkg/gateway/handlers/webrtc/credentials.go — keep the two in sync.
const turnCredentialTTL time.Duration = 10 * time.Minute
|
||||
|
||||
// turnCredentialsEnvelope is the JSON document produced by TurnCredentials.
// Only "configured" is always emitted; every other key is omitted from the
// output when its value is empty.
//
// NOTE(review): the key names are meant to match the HTTP credentials
// endpoint's response so WASM callers and JS clients see the same shape —
// confirm against that handler before renaming anything.
type turnCredentialsEnvelope struct {
	// Configured is false when the gateway has no TURN secret; callers
	// should then fall back to STUN-only.
	Configured bool `json:"configured"`
	// Username and Password are the credential pair returned by
	// turn.GenerateCredentials.
	Username string `json:"username,omitempty"`
	Password string `json:"password,omitempty"`
	// TTL is the credential lifetime in seconds.
	TTL int `json:"ttl,omitempty"`
	// URIs lists the TURN endpoints assembled by buildTURNURIs.
	URIs []string `json:"uris,omitempty"`
	// Namespace is the invocation namespace the envelope was built for.
	Namespace string `json:"namespace,omitempty"`
}
|
||||
|
||||
// TurnCredentials implements feat-9 — minting TURN credentials inside a
|
||||
// WASM function without a round-trip through HTTP. Mirrors the
|
||||
// `POST /v1/webrtc/turn/credentials` endpoint exactly: derives the
|
||||
// namespace from the invocation context (caller cannot spoof), generates
|
||||
// per-namespace HMAC credentials via pkg/turn, and assembles the same
|
||||
// URI list (including stealth TURN-over-443 when StealthCDNDomain is
|
||||
// set).
|
||||
//
|
||||
// Returns a JSON envelope identical in shape to the HTTP response, so
|
||||
// the WASM-side SDK helper can return it as-is to in-process callers
|
||||
// who want to inject the creds into RTCPeerConnection config.
|
||||
//
|
||||
// Setup-failure semantics match the rest of the host-fn family:
|
||||
// - No namespace in invocation context → Go error (HostFunctionError).
|
||||
// This should never happen in normal serverless flow but is defensive.
|
||||
// - TURN not configured on this gateway (TURNSecret empty) → returns
|
||||
// {configured:false} as a structured envelope, NOT an error. Same
|
||||
// shape as PushSend's silent no-op when push isn't configured —
|
||||
// keeps functions portable across deployments.
|
||||
func (h *HostFunctions) TurnCredentials(ctx context.Context) ([]byte, error) {
|
||||
cur := h.currentInvocationContext(ctx)
|
||||
if cur == nil || cur.Namespace == "" {
|
||||
return nil, &serverless.HostFunctionError{
|
||||
Function: "turn_credentials",
|
||||
Cause: fmt.Errorf("no namespace in invocation context"),
|
||||
}
|
||||
}
|
||||
|
||||
if h.turnSecret == "" {
|
||||
// TURN not configured on this gateway — return structured
|
||||
// "not configured" envelope so the caller can fall back to
|
||||
// STUN-only without treating it as a function-level error.
|
||||
// Matches the HTTP handler's 503 semantically, but at the host-
|
||||
// fn boundary we encode it as a result shape, not a Go error.
|
||||
return json.Marshal(turnCredentialsEnvelope{
|
||||
Configured: false,
|
||||
Namespace: cur.Namespace,
|
||||
})
|
||||
}
|
||||
|
||||
username, password := turn.GenerateCredentials(h.turnSecret, cur.Namespace, turnCredentialTTL)
|
||||
|
||||
uris := buildTURNURIs(h.turnDomain, h.stealthCDNDomain)
|
||||
|
||||
return json.Marshal(turnCredentialsEnvelope{
|
||||
Configured: true,
|
||||
Username: username,
|
||||
Password: password,
|
||||
TTL: int(turnCredentialTTL.Seconds()),
|
||||
URIs: uris,
|
||||
Namespace: cur.Namespace,
|
||||
})
|
||||
}
|
||||
|
||||
// buildTURNURIs assembles the TURN URI list advertised in the credentials
// envelope.
//
// A non-empty turnDomain contributes three URIs, in this order: TURN over
// UDP on 3478, TURN over TCP on 3478, and TURNS on 5349. A non-empty
// stealthCDNDomain appends `turns:<domain>:443` after them; that is the
// stealth endpoint on the standard HTTPS port, for networks where the
// regular TURN ports are blocked. With both arguments empty the result is
// a nil slice, which indicates a configuration problem for the operator to
// fix.
//
// NOTE(review): the list is meant to match what the HTTP credentials
// handler advertises. This helper is unexported, so confirm the handler's
// own assembly stays in sync.
func buildTURNURIs(turnDomain, stealthCDNDomain string) []string {
	var list []string
	if turnDomain != "" {
		patterns := [...]string{
			"turn:%s:3478?transport=udp",
			"turn:%s:3478?transport=tcp",
			"turns:%s:5349",
		}
		for _, pattern := range patterns {
			list = append(list, fmt.Sprintf(pattern, turnDomain))
		}
	}
	if stealthCDNDomain != "" {
		list = append(list, fmt.Sprintf("turns:%s:443", stealthCDNDomain))
	}
	return list
}
|
||||
210
core/pkg/serverless/hostfunctions/turn_test.go
Normal file
210
core/pkg/serverless/hostfunctions/turn_test.go
Normal file
@ -0,0 +1,210 @@
|
||||
package hostfunctions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless"
|
||||
)
|
||||
|
||||
// feat-9 — turn_credentials host fn.
|
||||
//
|
||||
// Mirrors the /v1/webrtc/turn/credentials HTTP endpoint so WASM
|
||||
// functions can mint per-namespace TURN credentials without a round-trip
|
||||
// back through HTTP. These tests pin the contract that external SDK
|
||||
// helpers and AnChat's call setup logic depend on.
|
||||
|
||||
func TestTurnCredentials_returnsConfiguredEnvelopeWhenSecretSet(t *testing.T) {
|
||||
// Happy path: TURN configured → returns full envelope with username,
|
||||
// password, ttl, uris.
|
||||
h := &HostFunctions{
|
||||
turnDomain: "turn.example.com",
|
||||
turnSecret: "deadbeef-shared-secret-for-hmac",
|
||||
stealthCDNDomain: "",
|
||||
}
|
||||
h.SetInvocationContext(&serverless.InvocationContext{Namespace: "test-ns"})
|
||||
|
||||
raw, err := h.TurnCredentials(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("TurnCredentials: %v", err)
|
||||
}
|
||||
var env turnCredentialsEnvelope
|
||||
if err := json.Unmarshal(raw, &env); err != nil {
|
||||
t.Fatalf("unmarshal envelope: %v", err)
|
||||
}
|
||||
|
||||
if !env.Configured {
|
||||
t.Error("Configured = false; want true when turnSecret is set")
|
||||
}
|
||||
if env.Namespace != "test-ns" {
|
||||
t.Errorf("Namespace = %q; want test-ns (namespace must be derived from invCtx, not caller-controlled)",
|
||||
env.Namespace)
|
||||
}
|
||||
if env.Username == "" {
|
||||
t.Error("Username empty; want HMAC-derived value")
|
||||
}
|
||||
if env.Password == "" {
|
||||
t.Error("Password empty; want HMAC-derived value")
|
||||
}
|
||||
if env.TTL != int(turnCredentialTTL.Seconds()) {
|
||||
t.Errorf("TTL = %d; want %d (matches HTTP endpoint)", env.TTL, int(turnCredentialTTL.Seconds()))
|
||||
}
|
||||
// Username MUST contain the namespace per pkg/turn HMAC contract —
|
||||
// this is what the TURN server uses to scope the credential.
|
||||
if !strings.Contains(env.Username, "test-ns") {
|
||||
t.Errorf("Username %q must contain the namespace for TURN server-side scope check", env.Username)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTurnCredentials_returnsURIsForDomain(t *testing.T) {
|
||||
// Verify URI assembly mirrors the HTTP endpoint exactly — same three
|
||||
// URIs (udp + tcp + tls5349) when only turnDomain is set.
|
||||
h := &HostFunctions{
|
||||
turnDomain: "turn.example.com",
|
||||
turnSecret: "secret",
|
||||
}
|
||||
h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns"})
|
||||
|
||||
raw, _ := h.TurnCredentials(context.Background())
|
||||
var env turnCredentialsEnvelope
|
||||
_ = json.Unmarshal(raw, &env)
|
||||
|
||||
if len(env.URIs) != 3 {
|
||||
t.Fatalf("URIs count = %d; want 3 (udp+tcp+tls5349)", len(env.URIs))
|
||||
}
|
||||
want := []string{
|
||||
"turn:turn.example.com:3478?transport=udp",
|
||||
"turn:turn.example.com:3478?transport=tcp",
|
||||
"turns:turn.example.com:5349",
|
||||
}
|
||||
for i, w := range want {
|
||||
if env.URIs[i] != w {
|
||||
t.Errorf("URIs[%d] = %q; want %q", i, env.URIs[i], w)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTurnCredentials_stealthCDNAppendsTurns443(t *testing.T) {
|
||||
// Stealth: turns:<stealthCDNDomain>:443 must be APPENDED to the
|
||||
// regular URI list. Used in restricted regions where regular TURN
|
||||
// ports are blocked; the SNI router serves it as ordinary HTTPS on
|
||||
// :443 so DPI can't distinguish it. Critical for AnChat's restricted-
|
||||
// region UX (bugboard #411 + stealth TURN plan 4).
|
||||
h := &HostFunctions{
|
||||
turnDomain: "turn.example.com",
|
||||
turnSecret: "secret",
|
||||
stealthCDNDomain: "cdn.example.com",
|
||||
}
|
||||
h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns"})
|
||||
|
||||
raw, _ := h.TurnCredentials(context.Background())
|
||||
var env turnCredentialsEnvelope
|
||||
_ = json.Unmarshal(raw, &env)
|
||||
|
||||
if len(env.URIs) != 4 {
|
||||
t.Fatalf("URIs count = %d; want 4 (3 regular + 1 stealth)", len(env.URIs))
|
||||
}
|
||||
stealth := env.URIs[3]
|
||||
want := "turns:cdn.example.com:443"
|
||||
if stealth != want {
|
||||
t.Errorf("stealth URI = %q; want %q", stealth, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTurnCredentials_notConfiguredWhenSecretEmpty(t *testing.T) {
|
||||
// Back-compat / portability: when TURN isn't configured on this
|
||||
// gateway, return a structured {configured:false} envelope — NOT a
|
||||
// Go error. Same shape contract as PushSend's silent-noop when push
|
||||
// isn't configured. Lets the same WASM function run unchanged on
|
||||
// dev environments without TURN.
|
||||
h := &HostFunctions{
|
||||
turnSecret: "",
|
||||
}
|
||||
h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns"})
|
||||
|
||||
raw, err := h.TurnCredentials(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("TurnCredentials must NOT return Go error when TURN unconfigured; got %v", err)
|
||||
}
|
||||
var env turnCredentialsEnvelope
|
||||
_ = json.Unmarshal(raw, &env)
|
||||
if env.Configured {
|
||||
t.Error("Configured = true; want false when turnSecret is empty (caller relies on this to fall back to STUN-only)")
|
||||
}
|
||||
if env.Namespace != "ns" {
|
||||
t.Errorf("Namespace = %q; want ns (still populated for logging context)", env.Namespace)
|
||||
}
|
||||
if env.Username != "" || env.Password != "" {
|
||||
t.Error("Username/Password must be empty when not configured (no credentials to leak)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTurnCredentials_errorsWhenNoNamespaceInContext(t *testing.T) {
|
||||
// Defensive: serverless invocation should always have a namespace.
|
||||
// If not, return a Go error rather than producing TURN credentials
|
||||
// for an empty namespace (which would be a security bug — TURN
|
||||
// HMAC username is the namespace + ts, so "" would shadow any
|
||||
// real-namespace creds at the TURN server's auth check).
|
||||
h := &HostFunctions{turnSecret: "secret"}
|
||||
// no SetInvocationContext
|
||||
|
||||
_, err := h.TurnCredentials(context.Background())
|
||||
if err == nil {
|
||||
t.Fatal("no invocation context: must return error (avoid empty-namespace credentials)")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "namespace") {
|
||||
t.Errorf("error %q should mention namespace for caller diagnostics", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestTurnCredentials_credentialsAreNamespaceScoped(t *testing.T) {
|
||||
// Two different namespaces issued through the SAME host fn instance
|
||||
// MUST get distinct credentials. Catches a regression where the
|
||||
// namespace gets cached at host-fn construction instead of read
|
||||
// per-invocation from invCtx.
|
||||
h := &HostFunctions{
|
||||
turnDomain: "turn.example.com",
|
||||
turnSecret: "shared-secret",
|
||||
}
|
||||
|
||||
h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns-a"})
|
||||
rawA, _ := h.TurnCredentials(context.Background())
|
||||
var envA turnCredentialsEnvelope
|
||||
_ = json.Unmarshal(rawA, &envA)
|
||||
|
||||
h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns-b"})
|
||||
rawB, _ := h.TurnCredentials(context.Background())
|
||||
var envB turnCredentialsEnvelope
|
||||
_ = json.Unmarshal(rawB, &envB)
|
||||
|
||||
if envA.Username == envB.Username {
|
||||
t.Error("ns-a and ns-b got identical username — namespace not flowing per-invocation")
|
||||
}
|
||||
if envA.Password == envB.Password {
|
||||
t.Error("ns-a and ns-b got identical password — credentials not namespace-scoped (security bug)")
|
||||
}
|
||||
}
|
||||
|
||||
// buildTURNURIs unit tests — the pure helper used by both this host fn
|
||||
// and the HTTP endpoint. Cheap regression coverage.
|
||||
|
||||
func TestBuildTURNURIs_emptyDomainNoURIs(t *testing.T) {
|
||||
if got := buildTURNURIs("", ""); len(got) != 0 {
|
||||
t.Errorf("empty domain + empty stealth: want 0 URIs, got %d (%v)", len(got), got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildTURNURIs_stealthOnlyOmitsRegularURIs(t *testing.T) {
|
||||
// Edge: operator configured stealth but not regular TURN. Returns
|
||||
// ONLY the stealth URI — caller falls back to STUN if they can't
|
||||
// reach it. Don't pretend the regular TURN exists.
|
||||
got := buildTURNURIs("", "cdn.example.com")
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("want 1 stealth-only URI, got %d (%v)", len(got), got)
|
||||
}
|
||||
if got[0] != "turns:cdn.example.com:443" {
|
||||
t.Errorf("stealth URI mismatch: %q", got[0])
|
||||
}
|
||||
}
|
||||
@ -20,6 +20,15 @@ import (
|
||||
type HostFunctionsConfig struct {
	IPFSAPIURL  string
	HTTPTimeout time.Duration

	// TURN settings (feat-9). The gateway passes these in so that the
	// `turn_credentials` host fn can mint per-namespace TURN credentials
	// directly, with no HTTP round-trip. The host fn mirrors the HTTP
	// endpoint at /v1/webrtc/turn/credentials.

	// TURNDomain is the TURN server domain.
	TURNDomain string
	// TURNSecret is the TURN secret. When it is empty the host fn returns
	// a structured "TURN not configured" envelope rather than an error.
	TURNSecret string
	// StealthCDNDomain is optional; a non-empty value adds a
	// turns:<domain>:443 URI.
	StealthCDNDomain string
}
|
||||
|
||||
// HostFunctions provides the bridge between WASM functions and Orama services.
|
||||
@ -54,6 +63,15 @@ type HostFunctions struct {
|
||||
invoker serverless.FunctionInvoker
|
||||
invokerLock sync.RWMutex
|
||||
|
||||
// TURN config — feat-9. Cached at NewHostFunctions; immutable for
|
||||
// the gateway's lifetime so no lock needed. Empty TURNSecret means
|
||||
// `turn_credentials` host fn returns a configured=false envelope
|
||||
// instead of an error (same shape as PushSend's silent-noop when
|
||||
// push isn't configured — keeps functions portable).
|
||||
turnDomain string
|
||||
turnSecret string
|
||||
stealthCDNDomain string
|
||||
|
||||
// triggerDispatcher is set after construction (via SetTriggerDispatcher).
|
||||
// When non-nil, PubSubPublish / PubSubPublishBatch synchronously fire
|
||||
// wildcard triggers on the local gateway so functions like
|
||||
|
||||
@ -225,6 +225,12 @@ func (m *MockHostServices) PushSendV2(ctx context.Context, userID string, msgJSO
|
||||
return []byte(`{"ok":true,"devices_attempted":0,"devices_succeeded":0,"results":[]}`), nil
|
||||
}
|
||||
|
||||
func (m *MockHostServices) TurnCredentials(ctx context.Context) ([]byte, error) {
|
||||
// Mirror PushSendV2's silent-noop-style envelope when not configured —
|
||||
// matches the documented host-fn contract for TURN being absent.
|
||||
return []byte(`{"configured":false}`), nil
|
||||
}
|
||||
|
||||
func (m *MockHostServices) DBTransaction(ctx context.Context, opsJSON []byte) ([]byte, error) {
|
||||
return []byte(`{"committed":true,"results":[]}`), nil
|
||||
}
|
||||
|
||||
@ -503,6 +503,30 @@ type HostServices interface {
|
||||
// envelope. Same shape as DBTransaction's "structured per-op result".
|
||||
PushSendV2(ctx context.Context, userID string, msgJSON []byte) ([]byte, error)
|
||||
|
||||
// TurnCredentials mints per-namespace TURN HMAC credentials for the
|
||||
// caller's namespace (derived from invocation context — caller
|
||||
// cannot spoof). Returns a JSON envelope matching the HTTP endpoint
|
||||
// at /v1/webrtc/turn/credentials:
|
||||
//
|
||||
// {
|
||||
// "configured": true,
|
||||
// "username": "<unix-ts>:<namespace>",
|
||||
// "password": "<hmac>",
|
||||
// "ttl": 600,
|
||||
// "uris": ["turn:...", "turns:...:443"],
|
||||
// "namespace": "<ns>"
|
||||
// }
|
||||
//
|
||||
// When TURN isn't configured on this gateway (TURNSecret empty),
|
||||
// returns {configured:false, namespace:<ns>} as a structured envelope
|
||||
// — NOT a Go error. This matches PushSend's silent-noop semantics so
|
||||
// functions stay portable across deployments with/without TURN.
|
||||
//
|
||||
// Bugboard feat-9 — removes the round-trip through HTTP for WASM
|
||||
// functions that need to inject TURN credentials into a peer's
|
||||
// RTCConfiguration without going back out to the gateway.
|
||||
TurnCredentials(ctx context.Context) ([]byte, error)
|
||||
|
||||
// ExecAndPublish runs ops atomically (like DBTransaction) and, ONLY
|
||||
// if the batch commits, publishes data to the named topic with any
|
||||
// occurrence of the literal string "{{seq}}" replaced by the assigned
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user