mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-06-17 03:54:12 +00:00
HostFunctions is a process-wide singleton (one per gateway engine). Its `invCtx` field is shared across all WASM instances.

For STATELESS execution the executor sets/clears it per-call but the lock is released before WASM runs — two concurrent invocations can race on the field and one's host call can read the other's identity. Window is microseconds.

For PERSISTENT WS the bug was much worse: invCtx used to be bound ONCE at instantiation and reused for the connection's lifetime. Two simultaneous persistent WS connections from different namespaces / wallets overwrote each other's invCtx, and EVERY subsequent function_invoke / GetCallerJWTSubject / GetCallerWallet / GetSecret call from inside the WASM read whatever was bound LAST. Result: silent identity leak across tenants for as long as the connections overlapped.

Fix: per-call invCtx propagation through Go's context.Context. wazero passes the ctx given to api.Function.Call through to host function callbacks, so every WASM-host hop carries its own invCtx.

- pkg/serverless/invocation_context.go (new): WithInvocationContext + InvocationContextFromCtx helpers using an unexported invCtxKey.
- pkg/serverless/hostfunctions/invocation_context.go (new): currentInvocationContext(ctx) — ctx-attached invCtx wins over the singleton field.
- All host accessors (FunctionInvoke, GetEnv, GetSecret, GetRequestID, GetCallerWallet, GetWSClientID, GetCallerClaim, GetCallerJWTSubject) now route through currentInvocationContext(ctx).
- pkg/serverless/persistent/instance.go: every export call's ctx is wrapped with the per-instance invCtx before being passed to wazero.
- pkg/gateway/handlers/serverless/ws_persistent_handler.go: invCtx is built per-frame and attached to ctx, not stored on a shared field.
- pkg/serverless/engine.go: removed the SetInvocationContext call at InstantiatePersistent (no longer needed; ctx carries it).

Stateless still uses the singleton field — its race is latent since the host-functions split and migrating it is a separate scoped change.

Tests:

- hostfunctions/invocation_context_test.go covers ctx-wins-over-singleton.
- gateway/handlers/serverless/ws_persistent_handler_test.go covers the per-frame ctx wiring.
- cli/functions/build_test.go is new coverage for the build path touched in this change.

VERSION bumped to 0.122.24.
123 lines
4.0 KiB
Go
123 lines
4.0 KiB
Go
package hostfunctions
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"github.com/DeBrosOfficial/network/pkg/pubsub"
|
|
"github.com/DeBrosOfficial/network/pkg/serverless"
|
|
)
|
|
|
|
// PubSubPublish publishes a message to a topic.
|
|
func (h *HostFunctions) PubSubPublish(ctx context.Context, topic string, data []byte) error {
|
|
if h.pubsub == nil {
|
|
return &serverless.HostFunctionError{Function: "pubsub_publish", Cause: fmt.Errorf("pubsub not available")}
|
|
}
|
|
|
|
// The pubsub adapter handles namespacing internally
|
|
if err := h.pubsub.Publish(ctx, topic, data); err != nil {
|
|
return &serverless.HostFunctionError{Function: "pubsub_publish", Cause: err}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// pubSubBatchEntry is one element of the JSON array that
// PubSubPublishBatch accepts as input.
type pubSubBatchEntry struct {
	// Topic is read from the "topic" JSON key.
	Topic string `json:"topic"`
	// DataB64 is read from the "data_base64" JSON key; it carries the
	// message payload as base64 text.
	DataB64 string `json:"data_base64"`
}
|
|
|
|
// PubSubPublishBatch publishes multiple messages in parallel.
|
|
//
|
|
// Input is JSON: [{"topic":"...","data_base64":"..."}, ...]
|
|
// Up to pubsub.MaxBatchSize entries per call.
|
|
//
|
|
// Default behavior is fail-fast (first publish error is returned). The
|
|
// host function does not currently expose a best-effort flag — WASM
|
|
// callers that need it should call this function multiple times in
|
|
// chunks they're willing to retry independently.
|
|
func (h *HostFunctions) PubSubPublishBatch(ctx context.Context, msgsJSON []byte) error {
|
|
if h.pubsub == nil {
|
|
return &serverless.HostFunctionError{Function: "pubsub_publish_batch", Cause: fmt.Errorf("pubsub not available")}
|
|
}
|
|
|
|
var entries []pubSubBatchEntry
|
|
if err := json.Unmarshal(msgsJSON, &entries); err != nil {
|
|
return &serverless.HostFunctionError{Function: "pubsub_publish_batch", Cause: fmt.Errorf("invalid json: %w", err)}
|
|
}
|
|
if len(entries) == 0 {
|
|
return nil
|
|
}
|
|
if len(entries) > pubsub.MaxBatchSize {
|
|
return &serverless.HostFunctionError{
|
|
Function: "pubsub_publish_batch",
|
|
Cause: fmt.Errorf("too many messages: max %d per batch", pubsub.MaxBatchSize),
|
|
}
|
|
}
|
|
|
|
msgs := make([]pubsub.TopicMessage, 0, len(entries))
|
|
for i, e := range entries {
|
|
if e.Topic == "" {
|
|
return &serverless.HostFunctionError{
|
|
Function: "pubsub_publish_batch",
|
|
Cause: fmt.Errorf("entry %d: empty topic", i),
|
|
}
|
|
}
|
|
data, err := base64.StdEncoding.DecodeString(e.DataB64)
|
|
if err != nil {
|
|
return &serverless.HostFunctionError{
|
|
Function: "pubsub_publish_batch",
|
|
Cause: fmt.Errorf("entry %d (topic %q): bad base64: %w", i, e.Topic, err),
|
|
}
|
|
}
|
|
msgs = append(msgs, pubsub.TopicMessage{Topic: e.Topic, Data: data})
|
|
}
|
|
|
|
if err := h.pubsub.PublishBatch(ctx, msgs, pubsub.PublishBatchOptions{}); err != nil {
|
|
return &serverless.HostFunctionError{Function: "pubsub_publish_batch", Cause: err}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// WSSend sends data to a specific WebSocket client.
|
|
func (h *HostFunctions) WSSend(ctx context.Context, clientID string, data []byte) error {
|
|
if h.wsManager == nil {
|
|
return &serverless.HostFunctionError{Function: "ws_send", Cause: serverless.ErrWSNotAvailable}
|
|
}
|
|
|
|
// If no clientID provided, use the current invocation's client.
|
|
// Reads ctx-attached invCtx first (per-call, race-free for persistent
|
|
// WS) then falls back to the singleton — see invocation_context.go.
|
|
if clientID == "" {
|
|
if cur := h.currentInvocationContext(ctx); cur != nil && cur.WSClientID != "" {
|
|
clientID = cur.WSClientID
|
|
}
|
|
}
|
|
|
|
if clientID == "" {
|
|
return &serverless.HostFunctionError{Function: "ws_send", Cause: serverless.ErrWSNotAvailable}
|
|
}
|
|
|
|
if err := h.wsManager.Send(clientID, data); err != nil {
|
|
return &serverless.HostFunctionError{Function: "ws_send", Cause: err}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// WSBroadcast sends data to all WebSocket clients subscribed to a topic.
|
|
func (h *HostFunctions) WSBroadcast(ctx context.Context, topic string, data []byte) error {
|
|
if h.wsManager == nil {
|
|
return &serverless.HostFunctionError{Function: "ws_broadcast", Cause: serverless.ErrWSNotAvailable}
|
|
}
|
|
|
|
if err := h.wsManager.Broadcast(topic, data); err != nil {
|
|
return &serverless.HostFunctionError{Function: "ws_broadcast", Cause: err}
|
|
}
|
|
|
|
return nil
|
|
}
|