diff --git a/core/pkg/rqlite/batch.go b/core/pkg/rqlite/batch.go index 67f8d4f..60c6054 100644 --- a/core/pkg/rqlite/batch.go +++ b/core/pkg/rqlite/batch.go @@ -236,13 +236,28 @@ func coerceInt64(v interface{}) (int64, error) { // - Rows arrive as []map[string]interface{} just like c.Query — columns // are populated via the rqlite "associative" response shape. func (c *client) BatchQuery(ctx context.Context, ops []BatchOp) ([]OpResult, error) { + return c.BatchQueryConsistency(ctx, ops, ReadConsistencyWeak) +} + +// BatchQueryConsistency is BatchQuery with an explicit read-consistency level. +// +// ReadConsistencyWeak (what BatchQuery passes) routes the batch to the leader +// so every row reflects the latest committed write — at the cost of a leader +// round-trip. ReadConsistencyNone routes to the serving node's LOCAL SQLite +// (~1ms, no leader hop) and is ONLY safe for reads that don't need +// read-your-own-writes freshness — see ReadConsistency and bug #235. +// +// none-level reads run on connNone; if that connection isn't configured the +// batch transparently uses the weak connection (correct, just slower). +func (c *client) BatchQueryConsistency(ctx context.Context, ops []BatchOp, rc ReadConsistency) ([]OpResult, error) { if len(ops) == 0 { return []OpResult{}, nil } if len(ops) > MaxBatchOps { return nil, fmt.Errorf("rqlite.BatchQuery: too many ops (%d > max %d)", len(ops), MaxBatchOps) } - if c.conn == nil { + conn := c.queryConn(rc) + if conn == nil { return nil, fmt.Errorf("rqlite.BatchQuery: native gorqlite connection not configured (use NewClientWithDSN or NewClientWithConn)") } @@ -261,7 +276,7 @@ func (c *client) BatchQuery(ctx context.Context, ops []BatchOp) ([]OpResult, err } } - qrs, err := c.conn.QueryParameterizedContext(ctx, stmts) + qrs, err := conn.QueryParameterizedContext(ctx, stmts) if err != nil { // gorqlite returns a slice of QueryResult even on partial failure; // extract per-op errors if available, else surface the joined err. diff --git a/core/pkg/rqlite/client.go b/core/pkg/rqlite/client.go index 604617c..0cf0645 100644 --- a/core/pkg/rqlite/client.go +++ b/core/pkg/rqlite/client.go @@ -27,14 +27,29 @@ func NewClient(db *sql.DB) Client { // or "https://..."). Both connections share configuration but are independent // HTTP clients. // -// Returns an error if the gorqlite native dial fails. The *sql.DB is not +// It also opens a SECOND native connection pinned to level=none, used by the +// opt-in local-read path (BatchQueryConsistency). gorqlite's consistency level +// is per-connection, not per-query, so a dedicated connection is the only way +// to offer none-level reads without disturbing the default weak reads. +// +// Returns an error if either gorqlite native dial fails. The *sql.DB is not // validated here — callers should already have done that. func NewClientWithDSN(db *sql.DB, dsn string) (Client, error) { conn, err := gorqlite.Open(dsn) if err != nil { return nil, fmt.Errorf("rqlite.NewClientWithDSN: native dial failed: %w", err) } - return &client{db: db, conn: conn}, nil + connNone, err := gorqlite.Open(dsn) + if err != nil { + conn.Close() + return nil, fmt.Errorf("rqlite.NewClientWithDSN: native dial (none-level) failed: %w", err) + } + if err := connNone.SetConsistencyLevel(gorqlite.ConsistencyLevelNone); err != nil { + conn.Close() + connNone.Close() + return nil, fmt.Errorf("rqlite.NewClientWithDSN: pin none consistency: %w", err) + } + return &client{db: db, conn: conn, connNone: connNone}, nil } // NewClientWithConn wires the ORM client when the caller already has a @@ -55,6 +70,49 @@ func NewClientFromAdapter(adapter *RQLiteAdapter) Client { type client struct { db *sql.DB conn *gorqlite.Connection + // connNone is a second native connection pinned to level=none. Used only + // by BatchQueryConsistency(ReadConsistencyNone) for fast LOCAL reads that + // skip the leader hop. nil for clients built without a native connection + // (NewClient) or via NewClientWithConn — in which case none-reads degrade + // to the weak conn (always correct, just slower). + connNone *gorqlite.Connection +} + +// ReadConsistency selects the rqlite read-consistency level for a read path. +// rqlite consistency applies to READS only; writes always traverse Raft. +// +// - ReadConsistencyWeak (default): the serving node forwards the read to the +// leader, so it always observes the latest committed write. On a +// cross-region cluster this costs a full leader round-trip per read +// (feat-6: ~273ms on the Singapore↔leader hop). +// - ReadConsistencyNone: the serving node answers from its LOCAL SQLite +// without contacting the leader (~1ms). It may return a slightly stale +// snapshot when this node is a follower lagging in Raft replay, so it is +// ONLY safe for reads that do not need to observe a write made earlier in +// the same invocation (bug #235). Read-your-own-writes flows must stay on +// weak, or fold the read into a DBTransaction post-commit query. +type ReadConsistency string + +const ( + ReadConsistencyWeak ReadConsistency = "weak" + ReadConsistencyNone ReadConsistency = "none" +) + +// useNoneConn reports whether a read at consistency rc should use the +// dedicated none-level connection. Pure decision split out for unit testing +// without a live rqlite dial. +func useNoneConn(rc ReadConsistency, hasNoneConn bool) bool { + return rc == ReadConsistencyNone && hasNoneConn +} + +// queryConn picks the native connection matching the requested read +// consistency. Returns the weak (leader-routed) connection when none-level is +// not requested or not available; weak is always correct, only slower. +func (c *client) queryConn(rc ReadConsistency) *gorqlite.Connection { + if useNoneConn(rc, c.connNone != nil) { + return c.connNone + } + return c.conn } // Query runs an arbitrary SELECT and scans rows into dest. diff --git a/core/pkg/rqlite/read_consistency_test.go b/core/pkg/rqlite/read_consistency_test.go new file mode 100644 index 0000000..4b0e11a --- /dev/null +++ b/core/pkg/rqlite/read_consistency_test.go @@ -0,0 +1,62 @@ +package rqlite + +import ( + "testing" + + "github.com/rqlite/gorqlite" +) + +// feat-6: opt-in level=none reads remove the cross-region leader hop that weak +// reads pay on every query. These pin the connection-selection logic so a +// none-read can never accidentally route to the leader-bound connection (which +// would silently re-impose the 273ms hop the whole change exists to avoid). + +func TestUseNoneConn(t *testing.T) { + cases := []struct { + name string + rc ReadConsistency + hasNone bool + want bool + }{ + {"none requested + available", ReadConsistencyNone, true, true}, + {"none requested + unavailable", ReadConsistencyNone, false, false}, + {"weak requested + available", ReadConsistencyWeak, true, false}, + {"weak requested + unavailable", ReadConsistencyWeak, false, false}, + {"empty (default) + available", ReadConsistency(""), true, false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := useNoneConn(tc.rc, tc.hasNone); got != tc.want { + t.Errorf("useNoneConn(%q, %v) = %v; want %v", tc.rc, tc.hasNone, got, tc.want) + } + }) + } +} + +func TestQueryConn_selectsNoneConnWhenAvailable(t *testing.T) { + weak := &gorqlite.Connection{} + none := &gorqlite.Connection{} + c := &client{conn: weak, connNone: none} + + if got := c.queryConn(ReadConsistencyNone); got != none { + t.Error("ReadConsistencyNone must select the dedicated none-level connection") + } + if got := c.queryConn(ReadConsistencyWeak); got != weak { + t.Error("ReadConsistencyWeak must select the leader-routed connection") + } + if got := c.queryConn(ReadConsistency("")); got != weak { + t.Error("default (empty) consistency must select the leader-routed connection") + } +} + +func TestQueryConn_degradesToWeakWhenNoneConnAbsent(t *testing.T) { + // NewClientWithConn / NewClient build clients without a none connection. + // A none-read must fall back to the weak conn — always correct, just + // slower — never to a nil connection. + weak := &gorqlite.Connection{} + c := &client{conn: weak, connNone: nil} + + if got := c.queryConn(ReadConsistencyNone); got != weak { + t.Error("none-read must degrade to the weak connection when connNone is nil") + } +} diff --git a/core/pkg/serverless/engine.go b/core/pkg/serverless/engine.go index 4f7b11f..1504be4 100644 --- a/core/pkg/serverless/engine.go +++ b/core/pkg/serverless/engine.go @@ -746,6 +746,7 @@ func (e *Engine) registerHostModule(ctx context.Context) error { NewFunctionBuilder().WithFunc(e.hWSSend).Export("ws_send"). NewFunctionBuilder().WithFunc(e.hWSBroadcast).Export("ws_broadcast"). NewFunctionBuilder().WithFunc(e.hFunctionInvoke).Export("function_invoke"). + NewFunctionBuilder().WithFunc(e.hFunctionInvokeAsync).Export("function_invoke_async"). NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info"). NewFunctionBuilder().WithFunc(e.hLogError).Export("log_error"). Instantiate(ctx) @@ -1209,6 +1210,35 @@ func (e *Engine) hFunctionInvoke(ctx context.Context, mod api.Module, return e.executor.WriteToGuest(ctx, mod, out) } +// hFunctionInvokeAsync is the WASM-callable wrapper for FunctionInvokeAsync. +// Fire-and-forget: it dispatches the target function to run concurrently and +// returns immediately so the caller's frame loop isn't blocked on the target's +// I/O. The target inherits the caller's identity (incl. WS client ID) and is +// expected to deliver its own result to the client via ws_send. +// +// Inputs mirror hFunctionInvoke (name + payload pointers). Returns 1 when the +// invocation was ACCEPTED (queued), 0 on a read failure or backpressure +// rejection — the guest can fall back to a synchronous function_invoke or +// surface "busy" to the client. +func (e *Engine) hFunctionInvokeAsync(ctx context.Context, mod api.Module, + namePtr, nameLen, payloadPtr, payloadLen uint32) uint32 { + name, ok := e.executor.ReadFromGuest(mod, namePtr, nameLen) + if !ok { + return 0 + } + payload, ok := e.executor.ReadFromGuest(mod, payloadPtr, payloadLen) + if !ok { + return 0 + } + if err := e.hostServices.FunctionInvokeAsync(ctx, string(name), payload); err != nil { + e.logger.Warn("function_invoke_async rejected", + zap.String("name", string(name)), + zap.Error(err)) + return 0 + } + return 1 +} + // hWSSend is the WASM-callable wrapper for WSSend. // Inputs: clientID + raw frame bytes. clientID may be empty — in that case // the host falls back to the current invocation's WS client (if any). diff --git a/core/pkg/serverless/hostfuncs_test.go b/core/pkg/serverless/hostfuncs_test.go index 817832d..2565398 100644 --- a/core/pkg/serverless/hostfuncs_test.go +++ b/core/pkg/serverless/hostfuncs_test.go @@ -146,6 +146,10 @@ func (m *mockHostServices) FunctionInvoke(ctx context.Context, name string, payl return nil, nil } +func (m *mockHostServices) FunctionInvokeAsync(ctx context.Context, name string, payload []byte) error { + return nil +} + func (m *mockHostServices) HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error) { return nil, nil } diff --git a/core/pkg/serverless/hostfunctions/async_invoke_test.go b/core/pkg/serverless/hostfunctions/async_invoke_test.go new file mode 100644 index 0000000..8c3a3aa --- /dev/null +++ b/core/pkg/serverless/hostfunctions/async_invoke_test.go @@ -0,0 +1,176 @@ +package hostfunctions + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/DeBrosOfficial/network/pkg/serverless" + "go.uber.org/zap" +) + +// feat-6 / feat-12: function_invoke_async lets a single stateful dispatcher +// (rpc-router) fan out slow per-RPC handlers WITHOUT freezing its serial frame +// loop. These pin: the target runs with inherited identity, the call returns +// immediately, the payload is copied before return (guest memory is reused), +// missing wiring is rejected, and the in-flight cap applies backpressure. + +// recordingInvoker captures Invoke calls. When blockOn is non-nil the +// goroutine inside Invoke blocks on it — used to hold in-flight slots for the +// backpressure test. +type recordingInvoker struct { + mu sync.Mutex + reqs []*serverless.InvokeRequest + called chan *serverless.InvokeRequest + blockOn chan struct{} +} + +func (r *recordingInvoker) Invoke(ctx context.Context, req *serverless.InvokeRequest) (*serverless.InvokeResponse, error) { + r.mu.Lock() + r.reqs = append(r.reqs, req) + r.mu.Unlock() + if r.called != nil { + r.called <- req + } + if r.blockOn != nil { + <-r.blockOn + } + return &serverless.InvokeResponse{Status: serverless.InvocationStatusSuccess}, nil +} + +func newAsyncHF(inv serverless.FunctionInvoker, semSize int) *HostFunctions { + h := &HostFunctions{logger: zap.NewNop()} + if semSize > 0 { + h.asyncInvokeSem = make(chan struct{}, semSize) + } + if inv != nil { + h.SetInvoker(inv) + } + return h +} + +func asyncCtx() context.Context { + return serverless.WithInvocationContext(context.Background(), &serverless.InvocationContext{ + Namespace: "ns-test", + WSClientID: "client-1", + CallerWallet: "0xwallet", + }) +} + +func TestFunctionInvokeAsync_runsTargetWithInheritedIdentity(t *testing.T) { + inv := &recordingInvoker{called: make(chan *serverless.InvokeRequest, 1)} + h := newAsyncHF(inv, 4) + + if err := h.FunctionInvokeAsync(asyncCtx(), "sync-deltas", []byte(`{"x":1}`)); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + select { + case req := <-inv.called: + if req.FunctionName != "sync-deltas" { + t.Errorf("FunctionName = %q; want sync-deltas", req.FunctionName) + } + if req.Namespace != "ns-test" { + t.Errorf("Namespace = %q; want ns-test", req.Namespace) + } + if req.WSClientID != "client-1" { + t.Errorf("WSClientID = %q; want client-1 (inherited so target can ws_send its own reply)", req.WSClientID) + } + if req.CallerWallet != "0xwallet" { + t.Errorf("CallerWallet = %q; want 0xwallet", req.CallerWallet) + } + if string(req.Input) != `{"x":1}` { + t.Errorf("Input = %q; want {\"x\":1}", req.Input) + } + if req.TriggerType != serverless.TriggerTypeWebSocket { + t.Errorf("TriggerType = %v; want WebSocket", req.TriggerType) + } + case <-time.After(2 * time.Second): + t.Fatal("target was never invoked") + } +} + +func TestFunctionInvokeAsync_noInvokerReturnsError(t *testing.T) { + h := &HostFunctions{logger: zap.NewNop(), asyncInvokeSem: make(chan struct{}, 4)} + if err := h.FunctionInvokeAsync(asyncCtx(), "x", nil); err == nil { + t.Fatal("expected an error when no invoker is wired") + } +} + +func TestFunctionInvokeAsync_noInvocationContextReturnsError(t *testing.T) { + h := newAsyncHF(&recordingInvoker{}, 4) + if err := h.FunctionInvokeAsync(context.Background(), "x", nil); err == nil { + t.Fatal("expected an error when there is no invocation context") + } +} + +func TestFunctionInvokeAsync_backpressureWhenSaturated(t *testing.T) { + block := make(chan struct{}) + inv := &recordingInvoker{called: make(chan *serverless.InvokeRequest, 1), blockOn: block} + h := newAsyncHF(inv, 1) // single in-flight slot + + // First call acquires the only slot; its goroutine blocks inside Invoke. + if err := h.FunctionInvokeAsync(asyncCtx(), "slow", nil); err != nil { + t.Fatalf("first call should be accepted: %v", err) + } + <-inv.called // ensure the goroutine has entered Invoke and is holding the slot + + // Second call: the cap is reached → must be rejected (backpressure). + if err := h.FunctionInvokeAsync(asyncCtx(), "slow2", nil); err == nil { + t.Fatal("expected backpressure rejection when the in-flight cap is reached") + } + + // Release the first invocation so its slot frees and the goroutine exits. + close(block) +} + +func TestFunctionInvokeAsync_slotReclaimedAfterCompletion(t *testing.T) { + // Proves the defer-release returns the slot: with a single-slot cap, a + // second call must succeed once the first target has finished. + inv := &recordingInvoker{called: make(chan *serverless.InvokeRequest, 2)} + h := newAsyncHF(inv, 1) + + if err := h.FunctionInvokeAsync(asyncCtx(), "first", nil); err != nil { + t.Fatalf("first call should be accepted: %v", err) + } + <-inv.called // first target ran (non-blocking invoker) → its slot is freed on return + + // Retry until the deferred release has run (the goroutine releases the + // slot just after Invoke returns; poll briefly to avoid a timing flake). + deadline := time.Now().Add(2 * time.Second) + var err error + for time.Now().Before(deadline) { + if err = h.FunctionInvokeAsync(asyncCtx(), "second", nil); err == nil { + break + } + time.Sleep(5 * time.Millisecond) + } + if err != nil { + t.Fatalf("second call should succeed after the first slot is reclaimed; got %v", err) + } + <-inv.called +} + +func TestFunctionInvokeAsync_copiesPayloadBeforeReturn(t *testing.T) { + inv := &recordingInvoker{called: make(chan *serverless.InvokeRequest, 1)} + h := newAsyncHF(inv, 4) + + payload := []byte("original") + if err := h.FunctionInvokeAsync(asyncCtx(), "x", payload); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Simulate the guest reusing its memory the instant the host call returns. + for i := range payload { + payload[i] = 'X' + } + + select { + case req := <-inv.called: + if string(req.Input) != "original" { + t.Errorf("payload was not copied before return; target saw %q (guest-memory reuse corrupted it)", req.Input) + } + case <-time.After(2 * time.Second): + t.Fatal("target was never invoked") + } +} diff --git a/core/pkg/serverless/hostfunctions/context.go b/core/pkg/serverless/hostfunctions/context.go index c997661..18a9422 100644 --- a/core/pkg/serverless/hostfunctions/context.go +++ b/core/pkg/serverless/hostfunctions/context.go @@ -2,11 +2,28 @@ package hostfunctions import ( "context" + "fmt" + "time" "github.com/DeBrosOfficial/network/pkg/serverless" "github.com/DeBrosOfficial/network/pkg/serverless/triggers" + "go.uber.org/zap" ) +// asyncInvokeMaxInFlight bounds concurrently-running FunctionInvokeAsync +// goroutines across the whole gateway. Each async invocation still passes +// through the engine's own execution semaphore; this cap bounds the GOROUTINES +// so a client flooding WS frames can't spawn an unbounded number of pending +// invocations. When hit, FunctionInvokeAsync rejects so the guest applies +// backpressure (e.g. falls back to a synchronous invoke or returns "busy"). +const asyncInvokeMaxInFlight = 256 + +// asyncInvokeTimeout bounds a single async invocation. Detached from the frame +// ctx (which is cancelled when ws_frame returns), so it carries its own +// deadline — generous enough for cross-region work, tight enough that a stuck +// invocation eventually frees its in-flight slot. +const asyncInvokeTimeout = 30 * time.Second + // SetInvocationContext sets the current invocation context on the // singleton field. STATELESS execution path uses this (paired with // ClearContext) for per-call binding via the executor's setter/clearer @@ -115,6 +132,99 @@ func (h *HostFunctions) FunctionInvoke(ctx context.Context, name string, payload return resp.Output, nil } +// FunctionInvokeAsync runs another function in the same namespace CONCURRENTLY +// and returns immediately, WITHOUT blocking the caller or returning the +// target's output. It exists so a persistent dispatcher (rpc-router) — a +// single stateful instance that must process frames serially — can fan out +// slow per-RPC handlers without freezing its frame loop for the full +// (cross-region) duration of each one. The handlers run in the engine's +// execution pool and deliver their own results to the client via ws_send +// (they inherit the same WS client ID). +// +// The target inherits the caller's identity exactly like FunctionInvoke. +// Returns an error only when the call can't be ACCEPTED: no invoker wired, no +// invocation context, or the in-flight cap is reached (backpressure). Failures +// INSIDE the target are not reported here — they surface via the target's own +// logging / ws_send, because the caller has already moved on. +func (h *HostFunctions) FunctionInvokeAsync(ctx context.Context, name string, payload []byte) error { + h.invokerLock.RLock() + inv := h.invoker + h.invokerLock.RUnlock() + if inv == nil { + return &serverless.HostFunctionError{ + Function: "function_invoke_async", + Cause: serverless.ErrFunctionInvokeNotAvailable, + } + } + + cur := h.currentInvocationContext(ctx) + if cur == nil { + return &serverless.HostFunctionError{ + Function: "function_invoke_async", + Cause: serverless.ErrFunctionInvokeNotAvailable, + } + } + + // Bound in-flight goroutines. nil sem = bare test construction → unbounded + // (production always builds it in NewHostFunctions). A full channel means + // we're saturated; reject so the guest applies backpressure. + if h.asyncInvokeSem != nil { + select { + case h.asyncInvokeSem <- struct{}{}: + default: + return &serverless.HostFunctionError{ + Function: "function_invoke_async", + Cause: fmt.Errorf("too many in-flight async invocations (max %d)", asyncInvokeMaxInFlight), + } + } + } + + // Copy identity AND payload before returning: the invocation context can + // be swapped (auth.refresh) and `payload` is a VIEW into guest memory that + // the next frame may overwrite — the goroutine outlives this call, so it + // must own its inputs. + // + // The struct copy is shallow: snapshot.CallerClaims / EnvVars share the + // source maps. That is safe because an InvocationContext's maps are + // immutable after construction (auth.refresh swaps the whole pointer via + // UpdateInvocationContext rather than mutating in place); no code writes + // these maps on a live context. Keep that invariant if you touch the + // refresh path, or clone the maps here. + snapshot := *cur + payloadCopy := make([]byte, len(payload)) + copy(payloadCopy, payload) + logger := h.logger + + go func() { + if h.asyncInvokeSem != nil { + defer func() { <-h.asyncInvokeSem }() + } + bgCtx := serverless.WithInvocationContext(context.Background(), &snapshot) + bgCtx, cancel := context.WithTimeout(bgCtx, asyncInvokeTimeout) + defer cancel() + + req := &serverless.InvokeRequest{ + Namespace: snapshot.Namespace, + FunctionName: name, + Input: payloadCopy, + TriggerType: serverless.TriggerTypeWebSocket, + CallerWallet: snapshot.CallerWallet, + CallerIP: snapshot.CallerIP, + WSClientID: snapshot.WSClientID, + CallerClaims: snapshot.CallerClaims, + CallerJWTSubject: snapshot.CallerJWTSubject, + TriggerDepth: snapshot.TriggerDepth, + } + if _, err := inv.Invoke(bgCtx, req); err != nil && logger != nil { + logger.Warn("function_invoke_async target failed", + zap.String("name", name), + zap.String("namespace", snapshot.Namespace), + zap.Error(err)) + } + }() + return nil +} + // GetEnv retrieves an environment variable for the function. func (h *HostFunctions) GetEnv(ctx context.Context, key string) (string, error) { cur := h.currentInvocationContext(ctx) diff --git a/core/pkg/serverless/hostfunctions/database.go b/core/pkg/serverless/hostfunctions/database.go index 80e0fd1..4ec4f53 100644 --- a/core/pkg/serverless/hostfunctions/database.go +++ b/core/pkg/serverless/hostfunctions/database.go @@ -191,6 +191,39 @@ func (h *HostFunctions) DBTransaction(ctx context.Context, opsJSON []byte) ([]by // rqlite layer. type dbQueryBatchRequest struct { Ops []rqlite.BatchOp `json:"ops"` + // Consistency is the optional rqlite read level for this batch. + // "" / "weak" (default): leader-routed, always fresh. "none": fast LOCAL + // read (~1ms, no leader hop) — ONLY safe for reads that don't need + // read-your-own-writes freshness (see rqlite.ReadConsistency / bug #235). + // feat-6: lets read-heavy functions skip the cross-region weak-read hop. + Consistency string `json:"consistency,omitempty"` +} + +// batchQueryConsistencyClient is the optional capability a Client exposes when +// it can serve reads at an explicit consistency level. The production +// *rqlite.client implements it; bare test mocks don't. Kept OFF the +// rqlite.Client interface so the none-read path doesn't churn every mock. +type batchQueryConsistencyClient interface { + BatchQueryConsistency(ctx context.Context, ops []rqlite.BatchOp, rc rqlite.ReadConsistency) ([]rqlite.OpResult, error) +} + +// resolveBatchQuery runs the batched read at the requested consistency. +// Empty or "weak" → the default leader-routed read. "none" → a fast local read +// via the consistency-capable client (degrading to weak only when the client +// can't serve an explicit level — weak is always correct). Unknown values are +// rejected here at the boundary rather than silently downgraded. +func (h *HostFunctions) resolveBatchQuery(ctx context.Context, ops []rqlite.BatchOp, consistency string) ([]rqlite.OpResult, error) { + switch consistency { + case "", string(rqlite.ReadConsistencyWeak): + return h.db.BatchQuery(ctx, ops) + case string(rqlite.ReadConsistencyNone): + if ext, ok := h.db.(batchQueryConsistencyClient); ok { + return ext.BatchQueryConsistency(ctx, ops, rqlite.ReadConsistencyNone) + } + return h.db.BatchQuery(ctx, ops) + default: + return nil, fmt.Errorf("invalid consistency %q (allowed: \"none\", \"weak\")", consistency) + } } // dbQueryBatchResult is the JSON wire shape returned to WASM callers. @@ -258,7 +291,7 @@ func (h *HostFunctions) DBQueryBatch(ctx context.Context, opsJSON []byte) ([]byt batchCtx, cancel := context.WithTimeout(ctx, dbQueryBatchTimeout) defer cancel() - results, err := h.db.BatchQuery(batchCtx, req.Ops) + results, err := h.resolveBatchQuery(batchCtx, req.Ops, req.Consistency) if err != nil { return nil, &serverless.HostFunctionError{Function: "db_query_batch", Cause: err} } diff --git a/core/pkg/serverless/hostfunctions/db_consistency_test.go b/core/pkg/serverless/hostfunctions/db_consistency_test.go new file mode 100644 index 0000000..996fa7a --- /dev/null +++ b/core/pkg/serverless/hostfunctions/db_consistency_test.go @@ -0,0 +1,127 @@ +package hostfunctions + +import ( + "context" + "testing" + + "github.com/DeBrosOfficial/network/pkg/rqlite" +) + +// feat-6: DBQueryBatch gained an opt-in "consistency":"none" field so +// read-heavy functions can skip the cross-region leader hop. These pin the +// routing: "none" must reach the consistency-capable path, the default must +// stay on the always-fresh leader read, an incapable client must degrade +// safely, and an unknown value must be rejected at the boundary (never +// silently downgraded). + +// consistencyAwareClient implements BatchQuery AND the optional +// BatchQueryConsistency capability, recording which path was taken. +type consistencyAwareClient struct { + rqlite.Client + batchQueryCalls int + consistencyCalls int + lastConsistency rqlite.ReadConsistency +} + +func (c *consistencyAwareClient) BatchQuery(ctx context.Context, ops []rqlite.BatchOp) ([]rqlite.OpResult, error) { + c.batchQueryCalls++ + return []rqlite.OpResult{}, nil +} + +func (c *consistencyAwareClient) BatchQueryConsistency(ctx context.Context, ops []rqlite.BatchOp, rc rqlite.ReadConsistency) ([]rqlite.OpResult, error) { + c.consistencyCalls++ + c.lastConsistency = rc + return []rqlite.OpResult{}, nil +} + +// weakOnlyClient implements only BatchQuery (no consistency capability), so a +// none-read must degrade to the leader-routed BatchQuery rather than failing. +type weakOnlyClient struct { + rqlite.Client + batchQueryCalls int +} + +func (w *weakOnlyClient) BatchQuery(ctx context.Context, ops []rqlite.BatchOp) ([]rqlite.OpResult, error) { + w.batchQueryCalls++ + return []rqlite.OpResult{}, nil +} + +func TestResolveBatchQuery_noneRoutesToConsistencyPath(t *testing.T) { + fake := &consistencyAwareClient{} + h := newHFWithDB(fake) + + if _, err := h.resolveBatchQuery(context.Background(), []rqlite.BatchOp{{Kind: rqlite.BatchOpQuery, SQL: "SELECT 1"}}, "none"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if fake.consistencyCalls != 1 || fake.batchQueryCalls != 0 { + t.Fatalf("none must route to BatchQueryConsistency; got consistency=%d weak=%d", fake.consistencyCalls, fake.batchQueryCalls) + } + if fake.lastConsistency != rqlite.ReadConsistencyNone { + t.Errorf("expected ReadConsistencyNone, got %q", fake.lastConsistency) + } +} + +func TestResolveBatchQuery_defaultAndWeakUseLeaderRoutedRead(t *testing.T) { + for _, consistency := range []string{"", "weak"} { + fake := &consistencyAwareClient{} + h := newHFWithDB(fake) + if _, err := h.resolveBatchQuery(context.Background(), nil, consistency); err != nil { + t.Fatalf("consistency=%q unexpected error: %v", consistency, err) + } + if fake.batchQueryCalls != 1 || fake.consistencyCalls != 0 { + t.Errorf("consistency=%q must use weak BatchQuery; got weak=%d consistency=%d", + consistency, fake.batchQueryCalls, fake.consistencyCalls) + } + } +} + +func TestResolveBatchQuery_noneDegradesWhenClientLacksCapability(t *testing.T) { + fake := &weakOnlyClient{} + h := newHFWithDB(fake) + + if _, err := h.resolveBatchQuery(context.Background(), nil, "none"); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if fake.batchQueryCalls != 1 { + t.Errorf("none must degrade to BatchQuery when capability absent; got %d calls", fake.batchQueryCalls) + } +} + +func TestResolveBatchQuery_invalidConsistencyRejected(t *testing.T) { + fake := &consistencyAwareClient{} + h := newHFWithDB(fake) + + _, err := h.resolveBatchQuery(context.Background(), nil, "bogus") + if err == nil { + t.Fatal("invalid consistency must return an error, not silently downgrade") + } + if fake.batchQueryCalls != 0 || fake.consistencyCalls != 0 { + t.Error("invalid consistency must not run any query") + } +} + +func TestDBQueryBatch_consistencyNoneRoutesLocal(t *testing.T) { + fake := &consistencyAwareClient{} + h := newHFWithDB(fake) + + in := []byte(`{"consistency":"none","ops":[{"sql":"SELECT 1"}]}`) + if _, err := h.DBQueryBatch(context.Background(), in); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if fake.consistencyCalls != 1 { + t.Errorf("DBQueryBatch with consistency=none must route to the local read; got %d", fake.consistencyCalls) + } + if fake.lastConsistency != rqlite.ReadConsistencyNone { + t.Errorf("expected ReadConsistencyNone, got %q", fake.lastConsistency) + } +} + +func TestDBQueryBatch_invalidConsistencyErrors(t *testing.T) { + fake := &consistencyAwareClient{} + h := newHFWithDB(fake) + + in := []byte(`{"consistency":"bogus","ops":[{"sql":"SELECT 1"}]}`) + if _, err := h.DBQueryBatch(context.Background(), in); err == nil { + t.Fatal("DBQueryBatch must reject an unknown consistency value") + } +} diff --git a/core/pkg/serverless/hostfunctions/host_services.go b/core/pkg/serverless/hostfunctions/host_services.go index 281d116..2b25cd9 100644 --- a/core/pkg/serverless/hostfunctions/host_services.go +++ b/core/pkg/serverless/hostfunctions/host_services.go @@ -75,5 +75,6 @@ func NewHostFunctions( httpClient: tlsutil.NewHTTPClient(httpTimeout), logger: logger, logs: make([]serverless.LogEntry, 0), + asyncInvokeSem: make(chan struct{}, asyncInvokeMaxInFlight), } } diff --git a/core/pkg/serverless/hostfunctions/types.go b/core/pkg/serverless/hostfunctions/types.go index 347d112..b082ee3 100644 --- a/core/pkg/serverless/hostfunctions/types.go +++ b/core/pkg/serverless/hostfunctions/types.go @@ -69,6 +69,15 @@ type HostFunctions struct { invoker serverless.FunctionInvoker invokerLock sync.RWMutex + // asyncInvokeSem bounds the number of concurrently-running + // FunctionInvokeAsync goroutines across the gateway. A buffered channel + // used as a counting semaphore: a slot is taken before spawning and + // released when the goroutine finishes. When full, FunctionInvokeAsync + // rejects (backpressure to the guest) instead of spawning unbounded + // goroutines under a frame flood. Built in NewHostFunctions; nil only in + // bare test construction (treated as unbounded there). + asyncInvokeSem chan struct{} + // TURN config — feat-9. Cached at NewHostFunctions; immutable for // the gateway's lifetime so no lock needed. Empty TURNSecret means // `turn_credentials` host fn returns a configured=false envelope diff --git a/core/pkg/serverless/mocks_test.go b/core/pkg/serverless/mocks_test.go index a834667..60ad60f 100644 --- a/core/pkg/serverless/mocks_test.go +++ b/core/pkg/serverless/mocks_test.go @@ -259,6 +259,10 @@ func (m *MockHostServices) FunctionInvoke(ctx context.Context, name string, payl return nil, nil } +func (m *MockHostServices) FunctionInvokeAsync(ctx context.Context, name string, payload []byte) error { + return nil +} + func (m *MockHostServices) HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error) { return nil, nil } diff --git a/core/pkg/serverless/types.go b/core/pkg/serverless/types.go index b68c35e..ec384b9 100644 --- a/core/pkg/serverless/types.go +++ b/core/pkg/serverless/types.go @@ -572,6 +572,22 @@ type HostServices interface { // rpc_error to the client. FunctionInvoke(ctx context.Context, name string, payload []byte) ([]byte, error) + // FunctionInvokeAsync invokes another function in the same namespace + // CONCURRENTLY and returns immediately — it does NOT wait for or return + // the target's output. The target runs in the engine's execution pool + // inheriting the caller's identity (wallet, JWT claims, WS client ID), + // and is expected to deliver any result to the client itself via ws_send + // (it has the same WS client ID). + // + // This is the non-blocking counterpart to FunctionInvoke, for a + // persistent dispatcher (rpc-router) that must not freeze its single + // stateful instance for the full duration of a slow target invocation. + // Returns an error only when the invocation could not be ACCEPTED (no + // invoker wired, no invocation context, or in-flight cap reached) — not + // for failures inside the target, which surface via the target's own + // logging/ws_send. + FunctionInvokeAsync(ctx context.Context, name string, payload []byte) error + // HTTP operations HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error)