mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-06-16 23:34:12 +00:00
v0.122.42 (f412425, secrets encryption) shipped the template emission, the per-cluster secret generator, and the gateway.Config consumer — but NOT the parse field on config.HTTPGatewayConfig. Phase 4 writes `secrets_encryption_key` into node.yaml under the http_gateway section, and pkg/config/yaml.go decodes with KnownFields(true) (strict). The unknown field made every node.yaml parse fail, so orama-node exited 1 on every start and systemd crash-looped it (restart counter hit 380+ on the first upgraded devnet node before the rolling controller halted). Root cause: a generated-config field with no matching struct field under strict unmarshal. Fix is the missing field. The runtime key itself is still consumed from ~/.orama/secrets/secrets-encryption-key (pkg/node/ gateway.go), which already worked — so this one-field addition fully restores boot AND the feature. The standalone gateway (cmd/gateway/config.go) uses lenient parsing and was unaffected. Regression test in pkg/config/decode_test.go decodes a node.yaml carrying secrets_encryption_key under strict mode.
1524 lines
57 KiB
Go
1524 lines
57 KiB
Go
package serverless
|
|
|
|
import (
|
|
"context"
|
|
cryptorand "crypto/rand"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/tetratelabs/wazero"
|
|
"github.com/tetratelabs/wazero/api"
|
|
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
|
|
"github.com/tetratelabs/wazero/sys"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/DeBrosOfficial/network/pkg/serverless/cache"
|
|
"github.com/DeBrosOfficial/network/pkg/serverless/execution"
|
|
)
|
|
|
|
// persistentFriendlyProcExit is our override of WASI's `proc_exit`.
|
|
//
|
|
// Standard wazero proc_exit:
|
|
//
|
|
// mod.CloseWithExitCode(ctx, exitCode) // ← invalidates the module
|
|
// panic(sys.NewExitError(exitCode))
|
|
//
|
|
// This breaks TinyGo command-mode (target=wasi) functions that we want
|
|
// to keep alive past `_start` for a persistent-instance lifecycle —
|
|
// `_start` ends with `proc_exit(0)`, which kills the module and makes
|
|
// the function's other exports (ws_open, ws_frame, ws_close,
|
|
// orama_alloc) uncallable.
|
|
//
|
|
// Override semantics:
|
|
//
|
|
// - exitCode == 0: panic with ExitError(0) but DO NOT close the
|
|
// module. This is TinyGo's "_start completed cleanly" signal; we
|
|
// want the module to stay live so the persistent instance can
|
|
// receive ws_open / ws_frame frames.
|
|
// - exitCode != 0: preserve standard WASI behavior — close + panic.
|
|
// A non-zero exit is a genuine application-signaled failure; we
|
|
// want it to behave exactly as upstream WASI does.
|
|
//
|
|
// The panic is mandatory in both cases — wasm code following proc_exit
|
|
// is conventionally `unreachable` (LLVM emits this after exit calls),
|
|
// and not panicking would let it execute. The CALLER (our
|
|
// `InstantiatePersistent`) catches the ExitError and special-cases
|
|
// code 0 as success.
|
|
//
|
|
// Affects ALL functions (stateless + persistent) on this runtime, but
|
|
// safe for stateless because the stateless path closes its own module
|
|
// after each invocation regardless.
|
|
func persistentFriendlyProcExit(ctx context.Context, mod api.Module, exitCode uint32) {
|
|
if exitCode != 0 {
|
|
_ = mod.CloseWithExitCode(ctx, exitCode)
|
|
}
|
|
panic(sys.NewExitError(exitCode))
|
|
}
|
|
|
|
// contextAwareHostServices is an internal interface for services that need to know about
|
|
// the current invocation context.
|
|
type contextAwareHostServices interface {
|
|
SetInvocationContext(invCtx *InvocationContext)
|
|
ClearContext()
|
|
}
|
|
|
|
// Ensure Engine implements FunctionExecutor interface.
|
|
var _ FunctionExecutor = (*Engine)(nil)
|
|
|
|
// Engine is the core WASM execution engine using wazero.
|
|
// It manages compiled module caching and function execution.
|
|
type Engine struct {
|
|
runtime wazero.Runtime
|
|
config *Config
|
|
registry FunctionRegistry
|
|
hostServices HostServices
|
|
logger *zap.Logger
|
|
|
|
// Module cache
|
|
moduleCache *cache.ModuleCache
|
|
|
|
// Execution components
|
|
executor *execution.Executor
|
|
lifecycle *execution.ModuleLifecycle
|
|
|
|
// Invocation logger for metrics/debugging
|
|
invocationLogger InvocationLogger
|
|
|
|
// Rate limiter
|
|
rateLimiter RateLimiter
|
|
}
|
|
|
|
// InvocationLogger logs function invocations (optional).
|
|
type InvocationLogger interface {
|
|
Log(ctx context.Context, inv *InvocationRecord) error
|
|
}
|
|
|
|
// InvocationRecord represents a logged invocation.
|
|
type InvocationRecord struct {
|
|
ID string `json:"id"`
|
|
FunctionID string `json:"function_id"`
|
|
RequestID string `json:"request_id"`
|
|
TriggerType TriggerType `json:"trigger_type"`
|
|
CallerWallet string `json:"caller_wallet,omitempty"`
|
|
InputSize int `json:"input_size"`
|
|
OutputSize int `json:"output_size"`
|
|
StartedAt time.Time `json:"started_at"`
|
|
CompletedAt time.Time `json:"completed_at"`
|
|
DurationMS int64 `json:"duration_ms"`
|
|
Status InvocationStatus `json:"status"`
|
|
ErrorMessage string `json:"error_message,omitempty"`
|
|
MemoryUsedMB float64 `json:"memory_used_mb"`
|
|
Logs []LogEntry `json:"logs,omitempty"`
|
|
}
|
|
|
|
// RateLimiter is the legacy single-bucket rate-limit interface, kept for
|
|
// backward compatibility with TokenBucketLimiter. New limiters should
|
|
// implement TieredRateLimiter as well — the engine prefers the richer path
|
|
// when available via type assertion.
|
|
type RateLimiter interface {
|
|
Allow(ctx context.Context, key string) (bool, error)
|
|
}
|
|
|
|
// TieredRateLimiter is the rich interface that lets the engine pass
|
|
// per-(namespace, function, wallet, ip) context for layered enforcement.
|
|
// MultiTierLimiter implements both this and the legacy RateLimiter.
|
|
type TieredRateLimiter interface {
|
|
AllowRequest(ctx context.Context, req RateLimitRequest) (Decision, error)
|
|
}
|
|
|
|
// EngineOption configures the Engine.
|
|
type EngineOption func(*Engine)
|
|
|
|
// WithInvocationLogger sets the invocation logger.
|
|
func WithInvocationLogger(logger InvocationLogger) EngineOption {
|
|
return func(e *Engine) {
|
|
e.invocationLogger = logger
|
|
}
|
|
}
|
|
|
|
// WithRateLimiter sets the rate limiter.
|
|
func WithRateLimiter(limiter RateLimiter) EngineOption {
|
|
return func(e *Engine) {
|
|
e.rateLimiter = limiter
|
|
}
|
|
}
|
|
|
|
// NewEngine creates a new WASM execution engine.
|
|
func NewEngine(cfg *Config, registry FunctionRegistry, hostServices HostServices, logger *zap.Logger, opts ...EngineOption) (*Engine, error) {
|
|
if cfg == nil {
|
|
cfg = DefaultConfig()
|
|
}
|
|
cfg.ApplyDefaults()
|
|
|
|
// Create wazero runtime with compilation cache
|
|
runtimeConfig := wazero.NewRuntimeConfig().
|
|
WithCloseOnContextDone(true)
|
|
|
|
runtime := wazero.NewRuntimeWithConfig(context.Background(), runtimeConfig)
|
|
|
|
// Instantiate WASI with a CUSTOM `proc_exit` that does NOT close the
|
|
// module on exit code 0 (#240/#249 follow-up #5).
|
|
//
|
|
// Background: TinyGo command-mode `_start` (target=wasi) runs the
|
|
// runtime init, calls `main()`, then calls `proc_exit(0)`. Wazero's
|
|
// stock proc_exit then calls `mod.CloseWithExitCode(0)` which
|
|
// invalidates the module — subsequent calls to `ws_open`, `ws_frame`,
|
|
// etc. return `ExitError(0)`. That breaks every TinyGo
|
|
// command-mode persistent function (anchat's rpc-router being the
|
|
// canary).
|
|
//
|
|
// Fix: override proc_exit. For exit code 0 (the "clean termination"
|
|
// case TinyGo emits at the end of `_start`), we panic with
|
|
// ExitError(0) but DO NOT close the module — letting the caller of
|
|
// `_start` see the ExitError as a "_start completed" signal while
|
|
// the module's exports stay live for ws_open/frame/close.
|
|
//
|
|
// For non-zero exit codes (genuine application-signaled errors), we
|
|
// preserve standard WASI behavior: close the module AND panic. This
|
|
// keeps `proc_exit(N != 0)` semantics intact.
|
|
//
|
|
// Override pattern documented in wazero v1.11+ at
|
|
// imports/wasi_snapshot_preview1/wasi.go:111-127:
|
|
//
|
|
// wasiBuilder := r.NewHostModuleBuilder(ModuleName)
|
|
// wasi_snapshot_preview1.NewFunctionExporter().ExportFunctions(wasiBuilder)
|
|
// // Subsequent calls to NewFunctionBuilder override built-in exports.
|
|
// wasiBuilder.NewFunctionBuilder().WithFunc(...).Export("proc_exit")
|
|
//
|
|
// This is the *only* way to bypass the close-on-exit behavior in
|
|
// wazero — there's no per-instance flag and no global toggle.
|
|
wasiBuilder := runtime.NewHostModuleBuilder(wasi_snapshot_preview1.ModuleName)
|
|
wasi_snapshot_preview1.NewFunctionExporter().ExportFunctions(wasiBuilder)
|
|
wasiBuilder.NewFunctionBuilder().
|
|
WithFunc(persistentFriendlyProcExit).
|
|
Export("proc_exit")
|
|
if _, err := wasiBuilder.Instantiate(context.Background()); err != nil {
|
|
panic("serverless: failed to instantiate WASI with custom proc_exit: " + err.Error())
|
|
}
|
|
|
|
engine := &Engine{
|
|
runtime: runtime,
|
|
config: cfg,
|
|
registry: registry,
|
|
hostServices: hostServices,
|
|
logger: logger,
|
|
moduleCache: cache.NewModuleCache(cfg.ModuleCacheSize, logger),
|
|
executor: execution.NewExecutor(runtime, logger, cfg.MaxConcurrentExecutions),
|
|
lifecycle: execution.NewModuleLifecycle(runtime, logger),
|
|
}
|
|
|
|
// Apply options
|
|
for _, opt := range opts {
|
|
opt(engine)
|
|
}
|
|
|
|
// Register host functions
|
|
if err := engine.registerHostModule(context.Background()); err != nil {
|
|
return nil, fmt.Errorf("failed to register host module: %w", err)
|
|
}
|
|
|
|
return engine, nil
|
|
}
|
|
|
|
// slowInvokeThreshold returns the wall-clock duration above which Execute
|
|
// emits a structured "slow invocation" warning with per-phase breakdown.
|
|
// Sourced from config (SlowInvokeThresholdMs) so a cluster under
|
|
// investigation can lower it to surface the sub-second cold-start floor that
|
|
// the 5s default hides (bugboard #27). Defaults to 5s when unset.
|
|
func (e *Engine) slowInvokeThreshold() time.Duration {
|
|
if e.config != nil && e.config.SlowInvokeThresholdMs > 0 {
|
|
return time.Duration(e.config.SlowInvokeThresholdMs) * time.Millisecond
|
|
}
|
|
return defaultSlowInvokeThresholdMs * time.Millisecond
|
|
}
|
|
|
|
// Execute runs a function with the given input and returns the output.
|
|
//
|
|
// Emits per-phase timing telemetry when total duration exceeds
|
|
// slowInvokeThreshold — bugboard #24 diagnostic. Without this, slow
|
|
// invocations only surfaced as opaque "RPC timeout after 30s" at the
|
|
// WS handler, with no way to tell whether the sink was rate-limit
|
|
// checks, module compile, or WASM execution itself.
|
|
func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, error) {
|
|
if fn == nil {
|
|
return nil, &ValidationError{Field: "function", Message: "cannot be nil"}
|
|
}
|
|
|
|
invCtx = EnsureInvocationContext(invCtx, fn)
|
|
startTime := time.Now()
|
|
// Per-phase timestamps for the slow-invoke log (bugboard #24
|
|
// diagnostic). Zero values mean the phase was never entered, which
|
|
// itself is signal (e.g. ratelimitMs=0 with totalMs=30000 means we
|
|
// blocked entirely in module-load or execution).
|
|
var (
|
|
ratelimitDoneAt time.Time
|
|
moduleLoadedAt time.Time
|
|
executeDoneAt time.Time
|
|
)
|
|
|
|
// Check rate limit. Prefer the tiered path when the limiter supports it
|
|
// — that gives per-(ns, fn, wallet, ip) enforcement with retry-after.
|
|
// Fall back to the legacy single-bucket interface otherwise.
|
|
if e.rateLimiter != nil {
|
|
if tl, ok := e.rateLimiter.(TieredRateLimiter); ok {
|
|
req := RateLimitRequest{
|
|
Namespace: invCtx.Namespace,
|
|
Function: invCtx.FunctionName,
|
|
Wallet: invCtx.CallerWallet,
|
|
IP: invCtx.CallerIP,
|
|
}
|
|
d, err := tl.AllowRequest(ctx, req)
|
|
if err != nil {
|
|
e.logger.Warn("Rate limiter error", zap.Error(err))
|
|
} else if !d.Allowed {
|
|
return nil, &RateLimitedError{Scope: d.Scope, RetryAfter: d.RetryAfter}
|
|
}
|
|
} else {
|
|
allowed, err := e.rateLimiter.Allow(ctx, "global")
|
|
if err != nil {
|
|
e.logger.Warn("Rate limiter error", zap.Error(err))
|
|
} else if !allowed {
|
|
return nil, ErrRateLimited
|
|
}
|
|
}
|
|
}
|
|
|
|
ratelimitDoneAt = time.Now()
|
|
|
|
// Create timeout context
|
|
execCtx, cancel := CreateTimeoutContext(ctx, fn, e.config.MaxTimeoutSeconds)
|
|
defer cancel()
|
|
|
|
// Attach a fresh per-invocation LogBuffer to the ctx that wazero
|
|
// passes through to host-fn callbacks. host.LogInfo / host.LogError
|
|
// extract this buffer and append to it instead of writing to the
|
|
// HostFunctions singleton slice — which would cross-contaminate
|
|
// concurrent invocations (bugboard #108: push-fanout's invocation
|
|
// record was capturing rpc-router and message-push-handler log
|
|
// lines because every WASM call shared one h.logs slice).
|
|
logBuf := NewLogBuffer()
|
|
execCtx = WithLogBuffer(execCtx, logBuf)
|
|
|
|
// Attach this invocation's InvocationContext to execCtx so host
|
|
// functions resolve identity/namespace from ctx instead of the
|
|
// process-wide HostFunctions singleton. Closes the stateless race
|
|
// that bugboard #348 surfaced via AnChat's message-push-handler:
|
|
// two concurrent pubsub-triggered invocations would overwrite each
|
|
// other's singleton invCtx, and the loser's push_send_v2 call would
|
|
// read either a cross-tenant namespace (silent identity leak) or a
|
|
// nil singleton ("no namespace in invocation context" error — the
|
|
// observable empty-envelope symptom AnChat reported).
|
|
//
|
|
// The singleton SetInvocationContext/ClearContext block below
|
|
// stays as defense-in-depth — host fns prefer ctx via
|
|
// currentInvocationContext (hostfunctions/invocation_context.go),
|
|
// so this is the live source; the singleton path serves any future
|
|
// caller that hasn't been migrated yet.
|
|
execCtx = WithInvocationContext(execCtx, invCtx)
|
|
|
|
// Fresh per-invocation pubsub publish counter so the pubsub host
|
|
// functions can cap how many messages one invocation floods onto the
|
|
// shared gossipsub router (no WASM fuel metering exists; the rate limiter
|
|
// gates invocation frequency, not per-invocation host-call volume).
|
|
execCtx = WithPublishCounter(execCtx)
|
|
|
|
// Raw-HTTP-response mode (bugboard #835). Only RawHTTPResponse functions
|
|
// get a collector attached — set_http_response is a validated no-op for
|
|
// every other function (no collector → host call returns an error). The
|
|
// collector rides execCtx so concurrent invocations never cross-write,
|
|
// matching the publish-counter / log-buffer per-call model.
|
|
if fn.RawHTTPResponse {
|
|
execCtx = WithRawHTTPCollector(execCtx)
|
|
}
|
|
|
|
// Get compiled module (from cache or compile)
|
|
module, err := e.getOrCompileModule(execCtx, fn.WASMCID)
|
|
if err != nil {
|
|
e.logInvocation(ctx, fn, invCtx, logBuf, startTime, 0, InvocationStatusError, err)
|
|
e.logSlowInvocation(invCtx, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt, 0, "module-load-failed", err)
|
|
return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err}
|
|
}
|
|
moduleLoadedAt = time.Now()
|
|
|
|
// Execute the module with context setters
|
|
var contextSetter, contextClearer func()
|
|
if hf, ok := e.hostServices.(contextAwareHostServices); ok {
|
|
contextSetter = func() { hf.SetInvocationContext(invCtx) }
|
|
contextClearer = func() { hf.ClearContext() }
|
|
}
|
|
// Attach a collector so ExecuteModule reports how long instantiate (TinyGo
|
|
// _start cold-start) took, letting the slow-invoke diagnostic split the
|
|
// execute phase into cold-start vs handler work (bugboard #27).
|
|
execCtx, instTiming := execution.WithInstantiateTiming(execCtx)
|
|
output, err := e.executor.ExecuteModule(execCtx, module, fn.Name, input, contextSetter, contextClearer)
|
|
executeDoneAt = time.Now()
|
|
if err != nil {
|
|
status := InvocationStatusError
|
|
if execCtx.Err() == context.DeadlineExceeded {
|
|
status = InvocationStatusTimeout
|
|
err = ErrTimeout
|
|
}
|
|
e.logInvocation(ctx, fn, invCtx, logBuf, startTime, len(output), status, err)
|
|
e.logSlowInvocation(invCtx, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt, instTiming.InstantiateNs, string(status), err)
|
|
return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err}
|
|
}
|
|
|
|
// Surface any verbatim HTTP response the function set (bugboard #835)
|
|
// onto invCtx so the Invoker → HTTP handler can replay it. Only
|
|
// RawHTTPResponse functions have a collector attached; TakeRawHTTPResponse
|
|
// returns (_, false) otherwise.
|
|
if res, ok := TakeRawHTTPResponse(execCtx); ok {
|
|
invCtx.RawHTTP = &res
|
|
}
|
|
|
|
e.logInvocation(ctx, fn, invCtx, logBuf, startTime, len(output), InvocationStatusSuccess, nil)
|
|
e.logSlowInvocation(invCtx, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt, instTiming.InstantiateNs, "success", nil)
|
|
return output, nil
|
|
}
|
|
|
|
// logSlowInvocation emits a structured warning when total wall-clock
|
|
// exceeds slowInvokeThreshold (bugboard #24 diagnostic). Per-phase
|
|
// timestamps let operators see WHICH layer was the sink — pre-fix the
|
|
// only signal was an opaque WS-handler "timeout after 30s" with no way
|
|
// to tell whether rate-limit, module-load, or WASM-execute consumed
|
|
// the budget.
|
|
//
|
|
// Zero-valued phase timestamps mean the phase was never reached, which
|
|
// is itself signal — e.g. moduleLoadedAt=zero + executeDoneAt=zero with
|
|
// large totalMs means we blocked in rate-limit OR module-load.
|
|
func (e *Engine) logSlowInvocation(invCtx *InvocationContext, startTime, ratelimitDoneAt, moduleLoadedAt, executeDoneAt time.Time, instantiateNs int64, status string, err error) {
|
|
totalMs := time.Since(startTime).Milliseconds()
|
|
if totalMs < e.slowInvokeThreshold().Milliseconds() {
|
|
return
|
|
}
|
|
// Compute phase deltas. Use 0 for unreached phases so the log line
|
|
// columns are stable.
|
|
var ratelimitMs, moduleLoadMs, executeMs int64
|
|
if !ratelimitDoneAt.IsZero() {
|
|
ratelimitMs = ratelimitDoneAt.Sub(startTime).Milliseconds()
|
|
}
|
|
if !moduleLoadedAt.IsZero() && !ratelimitDoneAt.IsZero() {
|
|
moduleLoadMs = moduleLoadedAt.Sub(ratelimitDoneAt).Milliseconds()
|
|
}
|
|
if !executeDoneAt.IsZero() && !moduleLoadedAt.IsZero() {
|
|
executeMs = executeDoneAt.Sub(moduleLoadedAt).Milliseconds()
|
|
}
|
|
// Split execute into instantiate (TinyGo _start cold-start) vs run
|
|
// (handler logic). A count=0 read with instantiate_ms ≈ execute_ms and
|
|
// run_ms ≈ 0 is the bugboard #27 cold-start floor — the per-call fresh
|
|
// instantiation, not the handler, is the sink.
|
|
instantiateMs := instantiateNs / int64(time.Millisecond)
|
|
runMs := executeMs - instantiateMs
|
|
if runMs < 0 {
|
|
runMs = 0
|
|
}
|
|
fields := []zap.Field{
|
|
zap.String("namespace", invCtx.Namespace),
|
|
zap.String("function", invCtx.FunctionName),
|
|
zap.String("request_id", invCtx.RequestID),
|
|
zap.String("trigger_type", string(invCtx.TriggerType)),
|
|
zap.String("ws_client_id", invCtx.WSClientID),
|
|
zap.Int64("total_ms", totalMs),
|
|
zap.Int64("ratelimit_ms", ratelimitMs),
|
|
zap.Int64("module_load_ms", moduleLoadMs),
|
|
zap.Int64("execute_ms", executeMs),
|
|
zap.Int64("instantiate_ms", instantiateMs),
|
|
zap.Int64("run_ms", runMs),
|
|
zap.String("invocation_status", status),
|
|
}
|
|
if err != nil {
|
|
fields = append(fields, zap.Error(err))
|
|
}
|
|
e.logger.Warn("slow serverless invocation (bug-24 diagnostic)", fields...)
|
|
}
|
|
|
|
// Precompile compiles a WASM module and caches it for faster execution.
|
|
func (e *Engine) Precompile(ctx context.Context, wasmCID string, wasmBytes []byte) error {
|
|
if wasmCID == "" {
|
|
return &ValidationError{Field: "wasmCID", Message: "cannot be empty"}
|
|
}
|
|
if len(wasmBytes) == 0 {
|
|
return &ValidationError{Field: "wasmBytes", Message: "cannot be empty"}
|
|
}
|
|
|
|
// Check if already cached
|
|
if e.moduleCache.Has(wasmCID) {
|
|
return nil
|
|
}
|
|
|
|
// Compile the module
|
|
compiled, err := e.lifecycle.CompileModule(ctx, wasmCID, wasmBytes)
|
|
if err != nil {
|
|
return &DeployError{FunctionName: wasmCID, Cause: err}
|
|
}
|
|
|
|
// Enforce memory limits
|
|
if err := e.checkMemoryLimits(compiled); err != nil {
|
|
compiled.Close(ctx)
|
|
return &DeployError{FunctionName: wasmCID, Cause: err}
|
|
}
|
|
|
|
// Cache the compiled module
|
|
e.moduleCache.Set(wasmCID, compiled)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Invalidate removes a compiled module from the cache.
|
|
func (e *Engine) Invalidate(wasmCID string) {
|
|
e.moduleCache.Delete(context.Background(), wasmCID)
|
|
}
|
|
|
|
// Close shuts down the engine and releases resources.
|
|
func (e *Engine) Close(ctx context.Context) error {
|
|
// Close all cached modules
|
|
e.moduleCache.Clear(ctx)
|
|
|
|
// Close the runtime
|
|
return e.runtime.Close(ctx)
|
|
}
|
|
|
|
// GetCacheStats returns cache statistics.
|
|
func (e *Engine) GetCacheStats() (size int, capacity int) {
|
|
return e.moduleCache.GetStats()
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Private methods
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// checkMemoryLimits validates that a compiled module's memory declarations
|
|
// don't exceed the configured maximum. Each WASM memory page is 64KB.
|
|
func (e *Engine) checkMemoryLimits(compiled wazero.CompiledModule) error {
|
|
maxPages := uint32(e.config.MaxMemoryLimitMB * 16) // 1 MB = 16 pages (64KB each)
|
|
for _, mem := range compiled.ExportedMemories() {
|
|
if max, hasMax := mem.Max(); hasMax && max > maxPages {
|
|
return fmt.Errorf("module declares %d MB max memory, exceeds limit of %d MB",
|
|
max/16, e.config.MaxMemoryLimitMB)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// getOrCompileModule retrieves a compiled module from cache or compiles it.
|
|
// InstantiatePersistent creates a long-lived module instance for a
|
|
// persistent WebSocket function. Unlike the per-frame stateless model,
|
|
// this instance:
|
|
//
|
|
// - is NOT closed after a single call
|
|
// - has its WASI _start hook DISABLED (the function's main() must be
|
|
// empty; the lifecycle exports ws_open/ws_frame/ws_close are called
|
|
// explicitly by the caller)
|
|
// - retains memory across frames
|
|
//
|
|
// Caller is responsible for calling Close() on the returned api.Module
|
|
// (typically wrapped in persistent.Instance which handles this).
|
|
func (e *Engine) InstantiatePersistent(ctx context.Context, fn *Function, invCtx *InvocationContext) (api.Module, error) {
|
|
compiled, err := e.getOrCompileModule(ctx, fn.WASMCID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("InstantiatePersistent: compile: %w", err)
|
|
}
|
|
|
|
// Persistent WS uses per-call invCtx propagation through ctx —
|
|
// see pkg/serverless/invocation_context.go for the cross-tenant
|
|
// race rationale. The persistent.Instance wrapper attaches invCtx
|
|
// to every WASM-host call's ctx via WithInvocationContext, so we
|
|
// do NOT touch the HostFunctions singleton here. Two simultaneous
|
|
// persistent connections from different users now keep their
|
|
// caller identity isolated.
|
|
|
|
// Persistent-instance runtime-init policy. TinyGo emits one of two
|
|
// start hooks depending on the build target:
|
|
//
|
|
// - wasi-reactor target → exports `_initialize` only
|
|
// - wasi (command) target → exports `_start` only
|
|
//
|
|
// Both hooks run the runtime's initAll (heap, GC, package init).
|
|
// `_start` additionally calls `main()` — fine when main is an
|
|
// empty stub (which is the convention for persistent WS functions
|
|
// since the gateway drives lifecycle via ws_open / ws_frame /
|
|
// ws_close, NOT main()).
|
|
//
|
|
// Without one of them being called, TinyGo's runtime stays in an
|
|
// uninitialized state and the very first export call traps via
|
|
// `wasmExportCheckRun` — managed-memory operations (allocs,
|
|
// hashmap ops) panic immediately.
|
|
//
|
|
// History of this code path (bugs #240/#249 follow-ups):
|
|
// - Original code: `WithStartFunctions()` with NO args
|
|
// (explicitly disable both). Intent was to skip main(); side
|
|
// effect was breaking TinyGo init. Persistent WS dead since
|
|
// plan #06 landed.
|
|
// - First fix: call `_initialize` manually. Worked for
|
|
// wasi-reactor builds. Still broken for wasi (command) builds
|
|
// like AnChat's rpc-router which only exports `_start`.
|
|
// - This fix: try `_initialize` first; fall back to `_start`
|
|
// if reactor hook isn't exported. Bounded by a 5s timeout so
|
|
// a runaway main() can't hang instantiation forever.
|
|
//
|
|
// AnChat's wasm-objdump output that pinned this:
|
|
// Export[15]:
|
|
// - func[127] <_start> → "_start"
|
|
// - func[414] <main.oramaAlloc#wasmexport> → "orama_alloc"
|
|
// - func[416] <main.wsOpen#wasmexport> → "ws_open"
|
|
// ...
|
|
// (no `_initialize`)
|
|
//
|
|
// We still pass `WithStartFunctions()` (no args) so wazero doesn't
|
|
// auto-call `_start` during InstantiateModule — we want full
|
|
// control over which hook runs and to bound it with our own
|
|
// timeout.
|
|
moduleConfig := wazero.NewModuleConfig().
|
|
WithName(fn.Name + "-" + invCtx.WSClientID).
|
|
WithStartFunctions().
|
|
WithStdin(emptyReader{}).
|
|
WithStdout(discardWriter{}).
|
|
WithStderr(discardWriter{}).
|
|
WithArgs(fn.Name).
|
|
// Bugboard #27 — wazero defaults to fake/sentinel clocks (deterministic
|
|
// fixtures for unit testing). TinyGo wasm calls WASI clock_time_get
|
|
// from time.Now() and gets a frozen ~2022-01-01T00:00:00.001Z back
|
|
// for every reading, silently poisoning any serverless function that
|
|
// embeds timestamps (receipts, audit rows, cursor cmp logic). Opt
|
|
// into real clocks via the documented wazero hook — same effect as
|
|
// the runtime would get on a normal Go process.
|
|
WithSysWalltime().
|
|
WithSysNanotime().
|
|
// Bugboard #120 — same class as #27. Without WithRandSource, wazero's
|
|
// default RNG is deterministic (zero seed), so TinyGo crypto/rand.Read
|
|
// returns identical bytes on every fresh instance — constant codes /
|
|
// nonces / tokens. Wire in the host CSPRNG. Same fix at
|
|
// execution/executor.go for the stateless path.
|
|
WithRandSource(cryptorand.Reader)
|
|
|
|
instance, err := e.runtime.InstantiateModule(ctx, compiled, moduleConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("InstantiatePersistent: instantiate: %w", err)
|
|
}
|
|
|
|
// Bootstrap the wasm runtime. Try reactor hook first (no main()),
|
|
// then command hook (assumes main() is an empty stub per
|
|
// persistent-function convention). Bounded by a short timeout so
|
|
// a buggy main() can't hang every connection.
|
|
//
|
|
// Wrap initCtx with invCtx so any host functions called from a TinyGo
|
|
// init() (e.g. early GetEnv / GetSecret reads) see this connection's
|
|
// caller identity, not whatever happens to be on the singleton.
|
|
const initTimeout = 5 * time.Second
|
|
initCtx, initCancel := context.WithTimeout(WithInvocationContext(ctx, invCtx), initTimeout)
|
|
defer initCancel()
|
|
|
|
var initName string
|
|
var initFn api.Function
|
|
if hook := instance.ExportedFunction("_initialize"); hook != nil {
|
|
initName, initFn = "_initialize", hook
|
|
} else if hook := instance.ExportedFunction("_start"); hook != nil {
|
|
initName, initFn = "_start", hook
|
|
}
|
|
if initFn != nil {
|
|
_, callErr := initFn.Call(initCtx)
|
|
if callErr != nil {
|
|
// ExitError(0) is the "command-mode _start completed cleanly"
|
|
// signal from TinyGo (target=wasi). Our custom proc_exit
|
|
// override (persistentFriendlyProcExit, registered at engine
|
|
// setup) keeps the module alive in this case — it just
|
|
// panics ExitError(0) without calling CloseWithExitCode.
|
|
// So the bootstrap is actually successful and the module's
|
|
// exports remain callable.
|
|
//
|
|
// Anything else is a real failure: ExitError(N != 0) means
|
|
// the function's main() returned non-zero (or proc_exit was
|
|
// called explicitly with non-zero), or the runtime trapped
|
|
// during init. Close + propagate.
|
|
var exitErr *sys.ExitError
|
|
if errors.As(callErr, &exitErr) && exitErr.ExitCode() == 0 {
|
|
e.logger.Debug("persistent instance bootstrapped via _start (command-mode normal exit)",
|
|
zap.String("function", fn.Name),
|
|
zap.String("client_id", invCtx.WSClientID),
|
|
zap.String("init_hook", initName))
|
|
} else {
|
|
_ = instance.Close(ctx)
|
|
return nil, fmt.Errorf("InstantiatePersistent: %s: %w", initName, callErr)
|
|
}
|
|
} else {
|
|
// _initialize-style clean return (no panic). wasi-reactor
|
|
// modules built with TinyGo `//go:wasmexport` go this path.
|
|
e.logger.Debug("persistent instance bootstrapped",
|
|
zap.String("function", fn.Name),
|
|
zap.String("client_id", invCtx.WSClientID),
|
|
zap.String("init_hook", initName))
|
|
}
|
|
} else {
|
|
// Neither hook exported. The module may still work if it has
|
|
// no managed-memory operations — but that's rare in TinyGo.
|
|
// Log a warning so a function author who hits this can
|
|
// diagnose without filing a ticket.
|
|
e.logger.Warn("persistent module exports no _initialize or _start; runtime may be uninitialized",
|
|
zap.String("function", fn.Name),
|
|
zap.String("client_id", invCtx.WSClientID))
|
|
}
|
|
|
|
return instance, nil
|
|
}
|
|
|
|
// emptyReader satisfies io.Reader for persistent WASM stdin.
|
|
type emptyReader struct{}
|
|
|
|
func (emptyReader) Read(p []byte) (int, error) { return 0, nil }
|
|
|
|
// discardWriter satisfies io.Writer for persistent WASM stdout/stderr.
|
|
// Unlike io.Discard which has special handling, this is a typed value
|
|
// suitable for the wazero ModuleConfig API.
|
|
type discardWriter struct{}
|
|
|
|
func (discardWriter) Write(p []byte) (int, error) { return len(p), nil }
|
|
|
|
func (e *Engine) getOrCompileModule(ctx context.Context, wasmCID string) (wazero.CompiledModule, error) {
|
|
return e.moduleCache.GetOrCompute(wasmCID, func() (wazero.CompiledModule, error) {
|
|
// Fetch WASM bytes from registry
|
|
wasmBytes, err := e.registry.GetWASMBytes(ctx, wasmCID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch WASM: %w", err)
|
|
}
|
|
|
|
// Compile the module
|
|
compiled, err := e.lifecycle.CompileModule(ctx, wasmCID, wasmBytes)
|
|
if err != nil {
|
|
return nil, ErrCompilationFailed
|
|
}
|
|
|
|
// Enforce memory limits
|
|
if err := e.checkMemoryLimits(compiled); err != nil {
|
|
compiled.Close(ctx)
|
|
return nil, err
|
|
}
|
|
|
|
return compiled, nil
|
|
})
|
|
}
|
|
|
|
// logInvocation logs an invocation record.
|
|
//
|
|
// `logBuf` is the per-invocation LogBuffer attached to ctx at Execute
|
|
// start (bugboard #108 fix). When non-nil, the record's Logs field is
|
|
// populated from the buffer's snapshot — invocation-local, no
|
|
// cross-contamination. When nil (legacy callers that haven't been
|
|
// updated), falls back to the HostFunctions singleton via the
|
|
// GetLogs() interface check — same behavior as pre-#108.
|
|
func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *InvocationContext, logBuf *LogBuffer, startTime time.Time, outputSize int, status InvocationStatus, err error) {
|
|
if e.invocationLogger == nil || !e.config.LogInvocations {
|
|
return
|
|
}
|
|
|
|
completedAt := time.Now()
|
|
record := &InvocationRecord{
|
|
ID: uuid.New().String(),
|
|
FunctionID: fn.ID,
|
|
RequestID: invCtx.RequestID,
|
|
TriggerType: invCtx.TriggerType,
|
|
CallerWallet: invCtx.CallerWallet,
|
|
OutputSize: outputSize,
|
|
StartedAt: startTime,
|
|
CompletedAt: completedAt,
|
|
DurationMS: completedAt.Sub(startTime).Milliseconds(),
|
|
Status: status,
|
|
}
|
|
|
|
if err != nil {
|
|
record.ErrorMessage = err.Error()
|
|
}
|
|
|
|
// Collect logs: prefer the per-invocation LogBuffer (bugboard #108),
|
|
// fall back to the legacy singleton for callers that haven't been
|
|
// migrated yet. The singleton path was the source of the
|
|
// cross-contamination bug; once every Execute path passes a real
|
|
// buffer here, the GetLogs() singleton read is dead code that
|
|
// can be removed in a future cleanup.
|
|
if logBuf != nil {
|
|
record.Logs = logBuf.Snapshot()
|
|
} else if hf, ok := e.hostServices.(interface{ GetLogs() []LogEntry }); ok {
|
|
record.Logs = hf.GetLogs()
|
|
}
|
|
|
|
if logErr := e.invocationLogger.Log(ctx, record); logErr != nil {
|
|
e.logger.Warn("Failed to log invocation", zap.Error(logErr))
|
|
}
|
|
}
|
|
|
|
// registerHostModule registers the Orama host functions with the wazero runtime.
|
|
//
|
|
// We expose the SAME export set under three module names:
|
|
//
|
|
// - "env" — canonical. Matches the WASI / TinyGo convention. The
|
|
// official SDK examples and docs use this name.
|
|
// - "host" — long-standing alias kept for backward compatibility.
|
|
// - "orama" — alias added 2026-05-06 after multiple apps intuited the
|
|
// brand name as the import target and hit cryptic
|
|
// "module[orama] not instantiated" errors. Cheap insurance:
|
|
// a few KB of runtime metadata per alias, zero behavioral
|
|
// cost. Apps SHOULD prefer `env` going forward; `orama` is
|
|
// supported indefinitely to avoid breaking deployed code.
|
|
//
|
|
// All three names resolve to identical function tables — a WASM module
|
|
// can mix imports across the three with no consequence.
|
|
func (e *Engine) registerHostModule(ctx context.Context) error {
|
|
for _, moduleName := range []string{"env", "host", "orama"} {
|
|
_, err := e.runtime.NewHostModuleBuilder(moduleName).
|
|
NewFunctionBuilder().WithFunc(e.hGetCallerWallet).Export("get_caller_wallet").
|
|
NewFunctionBuilder().WithFunc(e.hGetCallerJWTSubject).Export("get_caller_jwt_subject").
|
|
NewFunctionBuilder().WithFunc(e.hGetWSClientID).Export("get_ws_client_id").
|
|
NewFunctionBuilder().WithFunc(e.hGetCallerClaim).Export("get_caller_claim").
|
|
NewFunctionBuilder().WithFunc(e.hGetRequestID).Export("get_request_id").
|
|
NewFunctionBuilder().WithFunc(e.hGetEnv).Export("get_env").
|
|
NewFunctionBuilder().WithFunc(e.hGetSecret).Export("get_secret").
|
|
NewFunctionBuilder().WithFunc(e.hDBQuery).Export("db_query").
|
|
NewFunctionBuilder().WithFunc(e.hDBQueryV2).Export("db_query_v2").
|
|
NewFunctionBuilder().WithFunc(e.hDBExecute).Export("db_execute").
|
|
NewFunctionBuilder().WithFunc(e.hDBExecuteV2).Export("db_execute_v2").
|
|
NewFunctionBuilder().WithFunc(e.hDBTransaction).Export("db_transaction").
|
|
NewFunctionBuilder().WithFunc(e.hDBQueryBatch).Export("db_query_batch").
|
|
NewFunctionBuilder().WithFunc(e.hExecAndPublish).Export("exec_and_publish").
|
|
NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get").
|
|
NewFunctionBuilder().WithFunc(e.hCacheSet).Export("cache_set").
|
|
NewFunctionBuilder().WithFunc(e.hCacheIncr).Export("cache_incr").
|
|
NewFunctionBuilder().WithFunc(e.hCacheIncrBy).Export("cache_incr_by").
|
|
NewFunctionBuilder().WithFunc(e.hHTTPFetch).Export("http_fetch").
|
|
NewFunctionBuilder().WithFunc(e.hAnyoneFetch).Export("anyone_fetch").
|
|
NewFunctionBuilder().WithFunc(e.hSetHTTPResponse).Export("set_http_response").
|
|
NewFunctionBuilder().WithFunc(e.hPubSubPublish).Export("pubsub_publish").
|
|
NewFunctionBuilder().WithFunc(e.hPubSubPublishBatch).Export("pubsub_publish_batch").
|
|
NewFunctionBuilder().WithFunc(e.hPushSend).Export("push_send").
|
|
NewFunctionBuilder().WithFunc(e.hPushSendV2).Export("push_send_v2").
|
|
NewFunctionBuilder().WithFunc(e.hTurnCredentials).Export("turn_credentials").
|
|
NewFunctionBuilder().WithFunc(e.hWSPubSubBridge).Export("ws_pubsub_bridge").
|
|
NewFunctionBuilder().WithFunc(e.hWSPubSubUnbridge).Export("ws_pubsub_unbridge").
|
|
NewFunctionBuilder().WithFunc(e.hWSSend).Export("ws_send").
|
|
NewFunctionBuilder().WithFunc(e.hWSBroadcast).Export("ws_broadcast").
|
|
NewFunctionBuilder().WithFunc(e.hEphemeralStateSet).Export("ephemeral_state_set").
|
|
NewFunctionBuilder().WithFunc(e.hEphemeralStateClear).Export("ephemeral_state_clear").
|
|
NewFunctionBuilder().WithFunc(e.hFunctionInvoke).Export("function_invoke").
|
|
NewFunctionBuilder().WithFunc(e.hFunctionInvokeAsync).Export("function_invoke_async").
|
|
NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info").
|
|
NewFunctionBuilder().WithFunc(e.hLogError).Export("log_error").
|
|
Instantiate(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Host function implementations (delegate to executor for memory operations)
|
|
// -----------------------------------------------------------------------------
|
|
|
|
func (e *Engine) hGetCallerWallet(ctx context.Context, mod api.Module) uint64 {
|
|
wallet := e.hostServices.GetCallerWallet(ctx)
|
|
return e.executor.WriteToGuest(ctx, mod, []byte(wallet))
|
|
}
|
|
|
|
func (e *Engine) hGetRequestID(ctx context.Context, mod api.Module) uint64 {
|
|
rid := e.hostServices.GetRequestID(ctx)
|
|
return e.executor.WriteToGuest(ctx, mod, []byte(rid))
|
|
}
|
|
|
|
// hGetWSClientID returns the current invocation's WebSocket client ID, or
|
|
// empty string if the function wasn't invoked via WS.
|
|
func (e *Engine) hGetWSClientID(ctx context.Context, mod api.Module) uint64 {
|
|
cid := e.hostServices.GetWSClientID(ctx)
|
|
return e.executor.WriteToGuest(ctx, mod, []byte(cid))
|
|
}
|
|
|
|
// hGetCallerJWTSubject returns the JWT `sub` claim explicitly. Empty
|
|
// string if the request was not JWT-authenticated. See bug #215.
|
|
func (e *Engine) hGetCallerJWTSubject(ctx context.Context, mod api.Module) uint64 {
|
|
sub := e.hostServices.GetCallerJWTSubject(ctx)
|
|
return e.executor.WriteToGuest(ctx, mod, []byte(sub))
|
|
}
|
|
|
|
// hGetCallerClaim reads a claim name from guest memory, looks it up on the
|
|
// caller's JWT custom claims, and writes the value (or empty string) back.
|
|
func (e *Engine) hGetCallerClaim(ctx context.Context, mod api.Module, namePtr, nameLen uint32) uint64 {
|
|
name, ok := e.executor.ReadFromGuest(mod, namePtr, nameLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
val := e.hostServices.GetCallerClaim(ctx, string(name))
|
|
return e.executor.WriteToGuest(ctx, mod, []byte(val))
|
|
}
|
|
|
|
func (e *Engine) hGetEnv(ctx context.Context, mod api.Module, keyPtr, keyLen uint32) uint64 {
|
|
key, ok := e.executor.ReadFromGuest(mod, keyPtr, keyLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
val, _ := e.hostServices.GetEnv(ctx, string(key))
|
|
return e.executor.WriteToGuest(ctx, mod, []byte(val))
|
|
}
|
|
|
|
func (e *Engine) hGetSecret(ctx context.Context, mod api.Module, namePtr, nameLen uint32) uint64 {
|
|
name, ok := e.executor.ReadFromGuest(mod, namePtr, nameLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
val, err := e.hostServices.GetSecret(ctx, string(name))
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, []byte(val))
|
|
}
|
|
|
|
func (e *Engine) hDBQuery(ctx context.Context, mod api.Module, queryPtr, queryLen, argsPtr, argsLen uint32) uint64 {
|
|
query, ok := e.executor.ReadFromGuest(mod, queryPtr, queryLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
|
|
var args []interface{}
|
|
if argsLen > 0 {
|
|
if err := e.executor.UnmarshalJSONFromGuest(mod, argsPtr, argsLen, &args); err != nil {
|
|
e.logger.Error("failed to unmarshal db_query arguments", zap.Error(err))
|
|
return 0
|
|
}
|
|
}
|
|
|
|
results, err := e.hostServices.DBQuery(ctx, string(query), args)
|
|
if err != nil {
|
|
e.logger.Error("host function db_query failed", zap.Error(err), zap.String("query", string(query)))
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, results)
|
|
}
|
|
|
|
func (e *Engine) hDBExecute(ctx context.Context, mod api.Module, queryPtr, queryLen, argsPtr, argsLen uint32) uint32 {
|
|
query, ok := e.executor.ReadFromGuest(mod, queryPtr, queryLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
|
|
var args []interface{}
|
|
if argsLen > 0 {
|
|
if err := e.executor.UnmarshalJSONFromGuest(mod, argsPtr, argsLen, &args); err != nil {
|
|
e.logger.Error("failed to unmarshal db_execute arguments", zap.Error(err))
|
|
return 0
|
|
}
|
|
}
|
|
|
|
affected, err := e.hostServices.DBExecute(ctx, string(query), args)
|
|
if err != nil {
|
|
e.logger.Error("host function db_execute failed", zap.Error(err), zap.String("query", string(query)))
|
|
return 0
|
|
}
|
|
return uint32(affected)
|
|
}
|
|
|
|
func (e *Engine) hCacheGet(ctx context.Context, mod api.Module, keyPtr, keyLen uint32) uint64 {
|
|
key, ok := e.executor.ReadFromGuest(mod, keyPtr, keyLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
val, err := e.hostServices.CacheGet(ctx, string(key))
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, val)
|
|
}
|
|
|
|
func (e *Engine) hCacheSet(ctx context.Context, mod api.Module, keyPtr, keyLen, valPtr, valLen uint32, ttl int64) {
|
|
key, ok := e.executor.ReadFromGuest(mod, keyPtr, keyLen)
|
|
if !ok {
|
|
return
|
|
}
|
|
val, ok := e.executor.ReadFromGuest(mod, valPtr, valLen)
|
|
if !ok {
|
|
return
|
|
}
|
|
_ = e.hostServices.CacheSet(ctx, string(key), val, ttl)
|
|
}
|
|
|
|
func (e *Engine) hCacheIncr(ctx context.Context, mod api.Module, keyPtr, keyLen uint32) int64 {
|
|
key, ok := e.executor.ReadFromGuest(mod, keyPtr, keyLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
val, err := e.hostServices.CacheIncr(ctx, string(key))
|
|
if err != nil {
|
|
e.logger.Error("host function cache_incr failed", zap.Error(err), zap.String("key", string(key)))
|
|
return 0
|
|
}
|
|
return val
|
|
}
|
|
|
|
func (e *Engine) hCacheIncrBy(ctx context.Context, mod api.Module, keyPtr, keyLen uint32, delta int64) int64 {
|
|
key, ok := e.executor.ReadFromGuest(mod, keyPtr, keyLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
val, err := e.hostServices.CacheIncrBy(ctx, string(key), delta)
|
|
if err != nil {
|
|
e.logger.Error("host function cache_incr_by failed", zap.Error(err), zap.String("key", string(key)), zap.Int64("delta", delta))
|
|
return 0
|
|
}
|
|
return val
|
|
}
|
|
|
|
func (e *Engine) hHTTPFetch(ctx context.Context, mod api.Module, methodPtr, methodLen, urlPtr, urlLen, headersPtr, headersLen, bodyPtr, bodyLen uint32) uint64 {
|
|
method, ok := e.executor.ReadFromGuest(mod, methodPtr, methodLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
u, ok := e.executor.ReadFromGuest(mod, urlPtr, urlLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
|
|
var headers map[string]string
|
|
if headersLen > 0 {
|
|
if err := e.executor.UnmarshalJSONFromGuest(mod, headersPtr, headersLen, &headers); err != nil {
|
|
e.logger.Error("failed to unmarshal http_fetch headers", zap.Error(err))
|
|
return 0
|
|
}
|
|
}
|
|
|
|
body, ok := e.executor.ReadFromGuest(mod, bodyPtr, bodyLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
|
|
resp, err := e.hostServices.HTTPFetch(ctx, string(method), string(u), headers, body)
|
|
if err != nil {
|
|
e.logger.Error("host function http_fetch failed", zap.Error(err), zap.String("url", string(u)))
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, resp)
|
|
}
|
|
|
|
// hSetHTTPResponse is the WASM-callable wrapper for SetHTTPResponse —
|
|
// bugboard #835 raw-HTTP-response mode.
|
|
//
|
|
// ABI: set_http_response(status i32, headersJSONPtr, headersJSONLen,
|
|
// bodyPtr, bodyLen uint32) -> uint32. headersJSON (when non-empty) is a JSON
|
|
// object of string→string. Returns 1 on success, 0 on failure (function not
|
|
// deployed with raw_http_response, bad status, oversized headers/body, or a
|
|
// guest-memory read error).
|
|
func (e *Engine) hSetHTTPResponse(ctx context.Context, mod api.Module,
|
|
status, headersPtr, headersLen, bodyPtr, bodyLen uint32) uint32 {
|
|
var headers map[string]string
|
|
if headersLen > 0 {
|
|
if err := e.executor.UnmarshalJSONFromGuest(mod, headersPtr, headersLen, &headers); err != nil {
|
|
e.logger.Warn("set_http_response: failed to unmarshal headers", zap.Error(err))
|
|
return 0
|
|
}
|
|
}
|
|
|
|
var body []byte
|
|
if bodyLen > 0 {
|
|
b, ok := e.executor.ReadFromGuest(mod, bodyPtr, bodyLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
body = b
|
|
}
|
|
|
|
if err := e.hostServices.SetHTTPResponse(ctx, int(status), headers, body); err != nil {
|
|
e.logger.Warn("host function set_http_response failed", zap.Error(err))
|
|
return 0
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// hAnyoneFetch is the WASM-callable wrapper for AnyoneFetch — feat-11.
|
|
// Identical ABI to hHTTPFetch (method, url, headers JSON, body), routes
|
|
// through the Anyone SOCKS5 proxy. Returns packed (ptr<<32 | len) to the
|
|
// JSON response envelope, or 0 on a setup error (the typed
|
|
// proxy-unavailable / transport-error cases come back inside the
|
|
// envelope with status 0, NOT as a 0 return).
|
|
func (e *Engine) hAnyoneFetch(ctx context.Context, mod api.Module, methodPtr, methodLen, urlPtr, urlLen, headersPtr, headersLen, bodyPtr, bodyLen uint32) uint64 {
|
|
method, ok := e.executor.ReadFromGuest(mod, methodPtr, methodLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
u, ok := e.executor.ReadFromGuest(mod, urlPtr, urlLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
|
|
var headers map[string]string
|
|
if headersLen > 0 {
|
|
if err := e.executor.UnmarshalJSONFromGuest(mod, headersPtr, headersLen, &headers); err != nil {
|
|
e.logger.Error("failed to unmarshal anyone_fetch headers", zap.Error(err))
|
|
return 0
|
|
}
|
|
}
|
|
|
|
body, ok := e.executor.ReadFromGuest(mod, bodyPtr, bodyLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
|
|
resp, err := e.hostServices.AnyoneFetch(ctx, string(method), string(u), headers, body)
|
|
if err != nil {
|
|
e.logger.Error("host function anyone_fetch failed", zap.Error(err), zap.String("url", string(u)))
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, resp)
|
|
}
|
|
|
|
func (e *Engine) hPubSubPublish(ctx context.Context, mod api.Module, topicPtr, topicLen, dataPtr, dataLen uint32) uint32 {
|
|
topic, ok := e.executor.ReadFromGuest(mod, topicPtr, topicLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
|
|
data, ok := e.executor.ReadFromGuest(mod, dataPtr, dataLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
|
|
err := e.hostServices.PubSubPublish(ctx, string(topic), data)
|
|
if err != nil {
|
|
e.logger.Error("host function pubsub_publish failed", zap.Error(err), zap.String("topic", string(topic)))
|
|
return 0
|
|
}
|
|
return 1 // Success
|
|
}
|
|
|
|
// hPubSubPublishBatch is the WASM-callable wrapper for PubSubPublishBatch.
|
|
// Input: pointer/length of a JSON array of {topic, data_base64}.
|
|
// Returns 1 on success, 0 on error.
|
|
func (e *Engine) hPubSubPublishBatch(ctx context.Context, mod api.Module, msgsPtr, msgsLen uint32) uint32 {
|
|
msgsJSON, ok := e.executor.ReadFromGuest(mod, msgsPtr, msgsLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
if err := e.hostServices.PubSubPublishBatch(ctx, msgsJSON); err != nil {
|
|
e.logger.Error("host function pubsub_publish_batch failed", zap.Error(err))
|
|
return 0
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// hDBExecuteV2 is the WASM-callable wrapper for DBExecuteV2 (bug #218).
|
|
// Inputs: pointer/length of SQL + JSON args. Returns a packed uint64
|
|
// (ptr<<32 | len) pointing to the JSON envelope in guest memory, or 0 on
|
|
// host-side failure. The JSON envelope's "error" field carries SQL errors.
|
|
func (e *Engine) hDBExecuteV2(ctx context.Context, mod api.Module, queryPtr, queryLen, argsPtr, argsLen uint32) uint64 {
|
|
query, ok := e.executor.ReadFromGuest(mod, queryPtr, queryLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
var args []interface{}
|
|
if argsLen > 0 {
|
|
if err := e.executor.UnmarshalJSONFromGuest(mod, argsPtr, argsLen, &args); err != nil {
|
|
e.logger.Warn("db_execute_v2: failed to unmarshal args", zap.Error(err))
|
|
return 0
|
|
}
|
|
}
|
|
out, err := e.hostServices.DBExecuteV2(ctx, string(query), args)
|
|
if err != nil {
|
|
e.logger.Warn("host function db_execute_v2 failed", zap.Error(err))
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, out)
|
|
}
|
|
|
|
// hDBQueryV2 is the WASM-callable wrapper for DBQueryV2 (bug #218).
|
|
func (e *Engine) hDBQueryV2(ctx context.Context, mod api.Module, queryPtr, queryLen, argsPtr, argsLen uint32) uint64 {
|
|
query, ok := e.executor.ReadFromGuest(mod, queryPtr, queryLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
var args []interface{}
|
|
if argsLen > 0 {
|
|
if err := e.executor.UnmarshalJSONFromGuest(mod, argsPtr, argsLen, &args); err != nil {
|
|
e.logger.Warn("db_query_v2: failed to unmarshal args", zap.Error(err))
|
|
return 0
|
|
}
|
|
}
|
|
out, err := e.hostServices.DBQueryV2(ctx, string(query), args)
|
|
if err != nil {
|
|
e.logger.Warn("host function db_query_v2 failed", zap.Error(err))
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, out)
|
|
}
|
|
|
|
// hDBTransaction is the WASM-callable wrapper for DBTransaction.
|
|
// Input: pointer/length of opsJSON ({"ops":[{kind,sql,args},...]}).
|
|
// Returns a packed uint64 (ptr<<32 | len) pointing to JSON BatchResult in
|
|
// guest memory, or 0 on setup error.
|
|
//
|
|
// Note the result JSON's `committed` field tells the caller whether the
|
|
// writes landed — a return of non-zero does NOT imply commit.
|
|
func (e *Engine) hDBTransaction(ctx context.Context, mod api.Module, opsPtr, opsLen uint32) uint64 {
|
|
opsJSON, ok := e.executor.ReadFromGuest(mod, opsPtr, opsLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
out, err := e.hostServices.DBTransaction(ctx, opsJSON)
|
|
if err != nil {
|
|
e.logger.Warn("host function db_transaction failed", zap.Error(err))
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, out)
|
|
}
|
|
|
|
// hDBQueryBatch is the WASM-callable wrapper for DBQueryBatch.
|
|
// Input: pointer/length of opsJSON ({"ops":[{"sql":"...","args":[...]}, ...]}).
|
|
// Returns a packed uint64 (ptr<<32 | len) pointing to JSON result in guest
|
|
// memory, or 0 on setup/transport error.
|
|
//
|
|
// Per-query errors are surfaced inside the JSON result (one entry per op
|
|
// has its own `error` field). A return of 0 means the whole call failed
|
|
// before per-op results could be built.
|
|
func (e *Engine) hDBQueryBatch(ctx context.Context, mod api.Module, opsPtr, opsLen uint32) uint64 {
|
|
opsJSON, ok := e.executor.ReadFromGuest(mod, opsPtr, opsLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
out, err := e.hostServices.DBQueryBatch(ctx, opsJSON)
|
|
if err != nil {
|
|
e.logger.Warn("host function db_query_batch failed", zap.Error(err))
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, out)
|
|
}
|
|
|
|
// hExecAndPublish is the WASM-callable wrapper for ExecAndPublish.
|
|
// Inputs:
|
|
//
|
|
// opsPtr/opsLen — JSON {"ops":[{kind,sql,args},...]}
|
|
// topicPtr/topicLen — UTF-8 PubSub topic for the wake-up
|
|
// dataPtr/dataLen — wake-up payload bytes; "{{seq}}" will be substituted
|
|
//
|
|
// Returns a packed uint64 (ptr<<32 | len) pointing to the JSON result in
|
|
// guest memory, or 0 on setup error. The result JSON has fields
|
|
// committed/seq/published/publish_error that the caller inspects.
|
|
func (e *Engine) hExecAndPublish(ctx context.Context, mod api.Module,
|
|
opsPtr, opsLen, topicPtr, topicLen, dataPtr, dataLen uint32) uint64 {
|
|
|
|
opsJSON, ok := e.executor.ReadFromGuest(mod, opsPtr, opsLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
topic, ok := e.executor.ReadFromGuest(mod, topicPtr, topicLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
data, ok := e.executor.ReadFromGuest(mod, dataPtr, dataLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
out, err := e.hostServices.ExecAndPublish(ctx, opsJSON, string(topic), data)
|
|
if err != nil {
|
|
e.logger.Warn("host function exec_and_publish failed",
|
|
zap.String("topic", string(topic)),
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, out)
|
|
}
|
|
|
|
// hWSPubSubBridge is the WASM-callable wrapper for WSPubSubBridge.
|
|
// Inputs: clientID + topic strings. Returns 1 on success, 0 on error.
|
|
func (e *Engine) hWSPubSubBridge(ctx context.Context, mod api.Module,
|
|
cidPtr, cidLen, topicPtr, topicLen uint32) uint32 {
|
|
cid, ok := e.executor.ReadFromGuest(mod, cidPtr, cidLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
topic, ok := e.executor.ReadFromGuest(mod, topicPtr, topicLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
if err := e.hostServices.WSPubSubBridge(ctx, string(cid), string(topic)); err != nil {
|
|
e.logger.Warn("ws_pubsub_bridge failed",
|
|
zap.String("client_id", string(cid)),
|
|
zap.String("topic", string(topic)),
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// hWSPubSubUnbridge is the WASM-callable wrapper for WSPubSubUnbridge.
|
|
func (e *Engine) hWSPubSubUnbridge(ctx context.Context, mod api.Module,
|
|
cidPtr, cidLen, topicPtr, topicLen uint32) uint32 {
|
|
cid, ok := e.executor.ReadFromGuest(mod, cidPtr, cidLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
topic, ok := e.executor.ReadFromGuest(mod, topicPtr, topicLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
if err := e.hostServices.WSPubSubUnbridge(ctx, string(cid), string(topic)); err != nil {
|
|
e.logger.Warn("ws_pubsub_unbridge failed",
|
|
zap.String("client_id", string(cid)),
|
|
zap.String("topic", string(topic)),
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// hFunctionInvoke is the WASM-callable wrapper for FunctionInvoke. Used by
|
|
// the rpc-router persistent function (and any future dispatcher) to run
|
|
// another function in the same namespace synchronously and forward its
|
|
// output back to its caller.
|
|
//
|
|
// Inputs:
|
|
//
|
|
// namePtr/nameLen — UTF-8 target function name
|
|
// payloadPtr/payloadLen — raw input bytes for the target function
|
|
//
|
|
// Returns a packed uint64 (ptr<<32 | len) pointing to the target's output
|
|
// bytes in guest memory, or 0 on error. The caller is expected to JSON-
|
|
// decode the output (target functions ack with JSON envelopes).
|
|
func (e *Engine) hFunctionInvoke(ctx context.Context, mod api.Module,
|
|
namePtr, nameLen, payloadPtr, payloadLen uint32) uint64 {
|
|
name, ok := e.executor.ReadFromGuest(mod, namePtr, nameLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
payload, ok := e.executor.ReadFromGuest(mod, payloadPtr, payloadLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
out, err := e.hostServices.FunctionInvoke(ctx, string(name), payload)
|
|
if err != nil {
|
|
e.logger.Warn("function_invoke failed",
|
|
zap.String("name", string(name)),
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, out)
|
|
}
|
|
|
|
// hFunctionInvokeAsync is the WASM-callable wrapper for FunctionInvokeAsync.
|
|
// Fire-and-forget: it dispatches the target function to run concurrently and
|
|
// returns immediately so the caller's frame loop isn't blocked on the target's
|
|
// I/O. The target inherits the caller's identity (incl. WS client ID) and is
|
|
// expected to deliver its own result to the client via ws_send.
|
|
//
|
|
// Inputs mirror hFunctionInvoke (name + payload pointers). Returns 1 when the
|
|
// invocation was ACCEPTED (queued), 0 on a read failure or backpressure
|
|
// rejection — the guest can fall back to a synchronous function_invoke or
|
|
// surface "busy" to the client.
|
|
func (e *Engine) hFunctionInvokeAsync(ctx context.Context, mod api.Module,
|
|
namePtr, nameLen, payloadPtr, payloadLen uint32) uint32 {
|
|
name, ok := e.executor.ReadFromGuest(mod, namePtr, nameLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
payload, ok := e.executor.ReadFromGuest(mod, payloadPtr, payloadLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
if err := e.hostServices.FunctionInvokeAsync(ctx, string(name), payload); err != nil {
|
|
e.logger.Warn("function_invoke_async rejected",
|
|
zap.String("name", string(name)),
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// hWSSend is the WASM-callable wrapper for WSSend.
|
|
// Inputs: clientID + raw frame bytes. clientID may be empty — in that case
|
|
// the host falls back to the current invocation's WS client (if any).
|
|
// Returns 1 on success, 0 on error.
|
|
func (e *Engine) hWSSend(ctx context.Context, mod api.Module,
|
|
cidPtr, cidLen, dataPtr, dataLen uint32) uint32 {
|
|
cid, ok := e.executor.ReadFromGuest(mod, cidPtr, cidLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
data, ok := e.executor.ReadFromGuest(mod, dataPtr, dataLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
if err := e.hostServices.WSSend(ctx, string(cid), data); err != nil {
|
|
e.logger.Warn("ws_send failed",
|
|
zap.String("client_id", string(cid)),
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// hWSBroadcast is the WASM-callable wrapper for WSBroadcast.
|
|
// Inputs: topic + raw frame bytes. Sends `data` to every WS client currently
|
|
// subscribed to `topic` in the function's namespace. Returns 1 on success,
|
|
// 0 on error.
|
|
func (e *Engine) hWSBroadcast(ctx context.Context, mod api.Module,
|
|
topicPtr, topicLen, dataPtr, dataLen uint32) uint32 {
|
|
topic, ok := e.executor.ReadFromGuest(mod, topicPtr, topicLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
data, ok := e.executor.ReadFromGuest(mod, dataPtr, dataLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
if err := e.hostServices.WSBroadcast(ctx, string(topic), data); err != nil {
|
|
e.logger.Warn("ws_broadcast failed",
|
|
zap.String("topic", string(topic)),
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// hEphemeralStateSet is the WASM-callable wrapper for EphemeralStateSet —
|
|
// bugboard #710 WS-subscribe-tracked ephemeral state.
|
|
//
|
|
// ABI: ephemeral_state_set(topicPtr, topicLen, keyPtr, keyLen, payloadPtr,
|
|
// payloadLen uint32, ttlMs int64) -> uint32. Returns 1 on success, 0 on
|
|
// failure (no WS client in context, empty topic/key, oversized payload,
|
|
// per-client key cap, or a guest-memory read error).
|
|
func (e *Engine) hEphemeralStateSet(ctx context.Context, mod api.Module,
|
|
topicPtr, topicLen, keyPtr, keyLen, payloadPtr, payloadLen uint32, ttlMs int64) uint32 {
|
|
topic, ok := e.executor.ReadFromGuest(mod, topicPtr, topicLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
key, ok := e.executor.ReadFromGuest(mod, keyPtr, keyLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
var payload []byte
|
|
if payloadLen > 0 {
|
|
p, ok := e.executor.ReadFromGuest(mod, payloadPtr, payloadLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
payload = p
|
|
}
|
|
if err := e.hostServices.EphemeralStateSet(ctx, string(topic), string(key), payload, ttlMs); err != nil {
|
|
e.logger.Warn("host function ephemeral_state_set failed",
|
|
zap.String("topic", string(topic)),
|
|
zap.String("key", string(key)),
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// hEphemeralStateClear is the WASM-callable wrapper for EphemeralStateClear.
|
|
//
|
|
// ABI: ephemeral_state_clear(topicPtr, topicLen, keyPtr, keyLen uint32) ->
|
|
// uint32. Returns 1 on success (including idempotent clears of a missing key),
|
|
// 0 on failure (no WS client in context, empty topic/key, or a guest-memory
|
|
// read error).
|
|
func (e *Engine) hEphemeralStateClear(ctx context.Context, mod api.Module,
|
|
topicPtr, topicLen, keyPtr, keyLen uint32) uint32 {
|
|
topic, ok := e.executor.ReadFromGuest(mod, topicPtr, topicLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
key, ok := e.executor.ReadFromGuest(mod, keyPtr, keyLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
if err := e.hostServices.EphemeralStateClear(ctx, string(topic), string(key)); err != nil {
|
|
e.logger.Warn("host function ephemeral_state_clear failed",
|
|
zap.String("topic", string(topic)),
|
|
zap.String("key", string(key)),
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// hPushSend is the WASM-callable wrapper for PushSend.
|
|
// Inputs:
|
|
// userIDPtr/userIDLen — UTF-8 user ID to push to (within the function's
|
|
// own namespace; the namespace is server-side trusted)
|
|
// msgPtr/msgLen — JSON payload matching hostfunctions.PushSendArgs
|
|
// Returns 1 on success, 0 on error.
|
|
func (e *Engine) hPushSend(ctx context.Context, mod api.Module,
|
|
userIDPtr, userIDLen, msgPtr, msgLen uint32) uint32 {
|
|
userID, ok := e.executor.ReadFromGuest(mod, userIDPtr, userIDLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
msgJSON, ok := e.executor.ReadFromGuest(mod, msgPtr, msgLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
if err := e.hostServices.PushSend(ctx, string(userID), msgJSON); err != nil {
|
|
e.logger.Error("host function push_send failed",
|
|
zap.String("user_id", string(userID)),
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// hPushSendV2 is the WASM-callable wrapper for PushSendV2 — the
|
|
// rich-result push host function. Returns a packed uint64
|
|
// (ptr<<32 | len) pointing to a JSON envelope in guest memory, or 0
|
|
// on setup/validation error.
|
|
//
|
|
// The JSON envelope is push.SendDetailedResult: top-level Ok bool,
|
|
// per-device Results with HTTP status / reason / unregistered flag.
|
|
// Callers MUST parse it — a non-zero return does NOT mean every
|
|
// device succeeded (read result.ok or iterate results[]).
|
|
//
|
|
// Bugboard #348: replaces the binary success/fail of PushSend with
|
|
// the full per-device truth so WASM callers can react granularly.
|
|
func (e *Engine) hPushSendV2(ctx context.Context, mod api.Module,
|
|
userIDPtr, userIDLen, msgPtr, msgLen uint32) uint64 {
|
|
userID, ok := e.executor.ReadFromGuest(mod, userIDPtr, userIDLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
msgJSON, ok := e.executor.ReadFromGuest(mod, msgPtr, msgLen)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
out, err := e.hostServices.PushSendV2(ctx, string(userID), msgJSON)
|
|
if err != nil {
|
|
e.logger.Warn("host function push_send_v2 failed",
|
|
zap.String("user_id", string(userID)),
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, out)
|
|
}
|
|
|
|
// hTurnCredentials is the WASM-callable wrapper for TurnCredentials —
|
|
// feat-9. Takes no args (namespace derived from invocation context),
|
|
// returns packed uint64 (ptr<<32 | len) pointing to a JSON envelope in
|
|
// guest memory, or 0 on setup error.
|
|
//
|
|
// The envelope shape is documented at turn.go:turnCredentialsEnvelope.
|
|
// Callers MUST parse it — a non-zero return doesn't imply TURN is
|
|
// configured (check envelope.configured before using credentials).
|
|
func (e *Engine) hTurnCredentials(ctx context.Context, mod api.Module) uint64 {
|
|
out, err := e.hostServices.TurnCredentials(ctx)
|
|
if err != nil {
|
|
e.logger.Warn("host function turn_credentials failed",
|
|
zap.Error(err))
|
|
return 0
|
|
}
|
|
return e.executor.WriteToGuest(ctx, mod, out)
|
|
}
|
|
|
|
func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32) {
|
|
msg, ok := e.executor.ReadFromGuest(mod, ptr, size)
|
|
if ok {
|
|
e.hostServices.LogInfo(ctx, string(msg))
|
|
}
|
|
}
|
|
|
|
func (e *Engine) hLogError(ctx context.Context, mod api.Module, ptr, size uint32) {
|
|
msg, ok := e.executor.ReadFromGuest(mod, ptr, size)
|
|
if ok {
|
|
e.hostServices.LogError(ctx, string(msg))
|
|
}
|
|
}
|