diff --git a/core/pkg/deployments/port_allocator_test.go b/core/pkg/deployments/port_allocator_test.go index 674130e..d69b0c9 100644 --- a/core/pkg/deployments/port_allocator_test.go +++ b/core/pkg/deployments/port_allocator_test.go @@ -158,6 +158,14 @@ func (m *mockRQLiteClient) BatchWithSeq(ctx context.Context, namespace string, o return res, 1, err } +func (m *mockRQLiteClient) BatchQuery(ctx context.Context, ops []rqlite.BatchOp) ([]rqlite.OpResult, error) { + out := make([]rqlite.OpResult, len(ops)) + for i := range ops { + out[i] = rqlite.OpResult{Kind: rqlite.BatchOpQuery} + } + return out, nil +} + func TestPortAllocator_AllocatePort(t *testing.T) { logger := zap.NewNop() mockDB := newMockRQLiteClient() diff --git a/core/pkg/gateway/dependencies.go b/core/pkg/gateway/dependencies.go index b0f4081..6820aa2 100644 --- a/core/pkg/gateway/dependencies.go +++ b/core/pkg/gateway/dependencies.go @@ -558,10 +558,15 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe if deps.OlricClient != nil { olricUnderlying = deps.OlricClient.UnderlyingClient() } + // Pass the pubsub adapter so the dispatcher can subscribe to libp2p + // for every literal trigger pattern (bugboard #282 fix). nil-safe: + // dispatcher's Start/Refresh become no-ops when adapter is unavailable, + // preserving the legacy HTTP-only Dispatch hook. deps.PubSubDispatcher = triggers.NewPubSubDispatcher( triggerStore, deps.ServerlessInvoker, olricUnderlying, + pubsubAdapter, logger.Logger, ) diff --git a/core/pkg/gateway/gateway.go b/core/pkg/gateway/gateway.go index 29650e4..0d38d4f 100644 --- a/core/pkg/gateway/gateway.go +++ b/core/pkg/gateway/gateway.go @@ -360,6 +360,17 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { gw.pubsubHandlers.SetOnPublish(func(ctx context.Context, namespace, topic string, data []byte) { deps.PubSubDispatcher.Dispatch(ctx, namespace, topic, data, 0) }) + // Subscribe the dispatcher to libp2p pubsub for every literal + // trigger pattern so WASM `oh.PubSubPublish` calls reach trigger + // handlers (bugboard #282 — pre-fix, the dispatcher only fired + // from the HTTP publish hook above, so internal WASM publishes + // silently dropped every subscriber). Stop is called from + // lifecycle.Close. + if err := deps.PubSubDispatcher.Start(context.Background()); err != nil { + logger.ComponentWarn(logging.ComponentGeneral, + "PubSubDispatcher Start failed (libp2p subscribe path disabled — HTTP-publish triggers still work)", + zap.Error(err)) + } } if deps.PersistentWSManager != nil { gw.persistentWSManager = deps.PersistentWSManager diff --git a/core/pkg/gateway/handlers/deployments/mocks_test.go b/core/pkg/gateway/handlers/deployments/mocks_test.go index eb81040..a3e0d47 100644 --- a/core/pkg/gateway/handlers/deployments/mocks_test.go +++ b/core/pkg/gateway/handlers/deployments/mocks_test.go @@ -171,6 +171,14 @@ func (m *mockRQLiteClient) BatchWithSeq(ctx context.Context, namespace string, o return res, 1, err } +func (m *mockRQLiteClient) BatchQuery(ctx context.Context, ops []rqlite.BatchOp) ([]rqlite.OpResult, error) { + out := make([]rqlite.OpResult, len(ops)) + for i := range ops { + out[i] = rqlite.OpResult{Kind: rqlite.BatchOpQuery} + } + return out, nil +} + // mockProcessManager implements a mock process manager for testing type mockProcessManager struct { StartFunc func(ctx context.Context, deployment *deployments.Deployment, workDir string) error diff --git a/core/pkg/gateway/handlers/serverless/deploy_handler.go b/core/pkg/gateway/handlers/serverless/deploy_handler.go index 505e832..6048eb4 100644 --- a/core/pkg/gateway/handlers/serverless/deploy_handler.go +++ b/core/pkg/gateway/handlers/serverless/deploy_handler.go @@ -171,6 +171,16 @@ func (h *ServerlessHandlers) DeployFunction(w http.ResponseWriter, r *http.Reque h.dispatcher.InvalidateCache(ctx, def.Namespace, topic) } } + // One Refresh after the batch — subscribes the dispatcher to libp2p + // for every newly-added literal topic so WASM publishes from other + // functions trigger this handler (bugboard #282). The periodic + // refresh loop catches the rare add we miss here. + if h.dispatcher != nil { + if rerr := h.dispatcher.Refresh(ctx); rerr != nil { + h.logger.Warn("PubSubDispatcher Refresh after deploy auto-register failed (periodic loop will retry)", + zap.Error(rerr)) + } + } } // Register Cron triggers from definition. Mirrors the PubSub branch above: diff --git a/core/pkg/gateway/handlers/serverless/trigger_handler.go b/core/pkg/gateway/handlers/serverless/trigger_handler.go index 7e6094f..823b1d6 100644 --- a/core/pkg/gateway/handlers/serverless/trigger_handler.go +++ b/core/pkg/gateway/handlers/serverless/trigger_handler.go @@ -98,6 +98,16 @@ func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Req return } if h.dispatcher != nil { + // Refresh subscribes the dispatcher to libp2p for this newly-added + // trigger's topic so future WASM publishes reach the handler + // (bugboard #282). Best-effort — Refresh failures are logged + // inside; the periodic refresh loop will retry within 60s. + if rerr := h.dispatcher.Refresh(ctx); rerr != nil { + h.logger.Warn("PubSubDispatcher Refresh after trigger add failed (periodic loop will retry)", + zap.Error(rerr)) + } + // Legacy no-op — kept for back-compat with anything still + // calling it; can be removed in a future cleanup. h.dispatcher.InvalidateCache(ctx, namespace, req.Topic) } h.logger.Info("PubSub trigger added via API", @@ -230,6 +240,12 @@ func (h *ServerlessHandlers) HandleDeleteTrigger(w http.ResponseWriter, r *http. return } if h.dispatcher != nil { + // Refresh prunes the dispatcher's libp2p subscription if this + // was the last trigger on that topic (bugboard #282). + if rerr := h.dispatcher.Refresh(ctx); rerr != nil { + h.logger.Warn("PubSubDispatcher Refresh after trigger remove failed (periodic loop will retry)", + zap.Error(rerr)) + } h.dispatcher.InvalidateCache(ctx, namespace, triggerTopic) } h.logger.Info("PubSub trigger removed via API", diff --git a/core/pkg/gateway/handlers/sqlite/handlers_test.go b/core/pkg/gateway/handlers/sqlite/handlers_test.go index 0c96341..06fe7bd 100644 --- a/core/pkg/gateway/handlers/sqlite/handlers_test.go +++ b/core/pkg/gateway/handlers/sqlite/handlers_test.go @@ -107,6 +107,14 @@ func (m *mockRQLiteClient) BatchWithSeq(ctx context.Context, namespace string, o return res, 1, err } +func (m *mockRQLiteClient) BatchQuery(ctx context.Context, ops []rqlite.BatchOp) ([]rqlite.OpResult, error) { + out := make([]rqlite.OpResult, len(ops)) + for i := range ops { + out[i] = rqlite.OpResult{Kind: rqlite.BatchOpQuery} + } + return out, nil +} + type mockIPFSClient struct { AddFunc func(ctx context.Context, r io.Reader, filename string) (*ipfs.AddResponse, error) AddDirectoryFunc func(ctx context.Context, dirPath string) (*ipfs.AddResponse, error) diff --git a/core/pkg/gateway/lifecycle.go b/core/pkg/gateway/lifecycle.go index 9380a16..356e738 100644 --- a/core/pkg/gateway/lifecycle.go +++ b/core/pkg/gateway/lifecycle.go @@ -36,6 +36,12 @@ func (g *Gateway) Close() { g.cronScheduler.Stop() } + // Stop the pubsub dispatcher's periodic refresh goroutine. libp2p + // subscriptions die naturally with the client teardown below. + if g.pubsubDispatcher != nil { + g.pubsubDispatcher.Stop() + } + // Drain persistent WebSocket instances. Each instance gets a slice of // the 30s budget; ws_close on each is best-effort. if g.persistentWSManager != nil { diff --git a/core/pkg/namespace/cluster_recovery_test.go b/core/pkg/namespace/cluster_recovery_test.go index e67b33a..5c685d8 100644 --- a/core/pkg/namespace/cluster_recovery_test.go +++ b/core/pkg/namespace/cluster_recovery_test.go @@ -79,6 +79,13 @@ func (m *recoveryMockDB) BatchWithSeq(_ context.Context, _ string, ops []rqlite. res, _ := m.Batch(context.Background(), ops) return res, 1, nil } +func (m *recoveryMockDB) BatchQuery(_ context.Context, ops []rqlite.BatchOp) ([]rqlite.OpResult, error) { + out := make([]rqlite.OpResult, len(ops)) + for i := range ops { + out[i] = rqlite.OpResult{Kind: rqlite.BatchOpQuery} + } + return out, nil +} var _ rqlite.Client = (*recoveryMockDB)(nil) diff --git a/core/pkg/namespace/port_allocator_test.go b/core/pkg/namespace/port_allocator_test.go index 27d4763..0b1074b 100644 --- a/core/pkg/namespace/port_allocator_test.go +++ b/core/pkg/namespace/port_allocator_test.go @@ -106,6 +106,14 @@ func (m *mockRQLiteClient) BatchWithSeq(ctx context.Context, namespace string, o return res, 1, err } +func (m *mockRQLiteClient) BatchQuery(ctx context.Context, ops []rqlite.BatchOp) ([]rqlite.OpResult, error) { + out := make([]rqlite.OpResult, len(ops)) + for i := range ops { + out[i] = rqlite.OpResult{Kind: rqlite.BatchOpQuery} + } + return out, nil +} + // Ensure mockRQLiteClient implements rqlite.Client var _ rqlite.Client = (*mockRQLiteClient)(nil) diff --git a/core/pkg/rqlite/batch.go b/core/pkg/rqlite/batch.go index d968d0a..67f8d4f 100644 --- a/core/pkg/rqlite/batch.go +++ b/core/pkg/rqlite/batch.go @@ -59,6 +59,21 @@ type BatchResult struct { // 100 is plenty for any realistic transactional unit of work. const MaxBatchOps = 100 +// MaxBatchQueryRowsPerOp caps the row count returned per query in a +// BatchQuery result. Without this, a malicious or buggy WASM function +// could OOM the gateway by submitting `SELECT * FROM ` and +// having every row materialized into a Go map. 10000 rows fits comfortably +// in memory even when multiplied by MaxBatchOps; functions that legitimately +// need more should paginate. +const MaxBatchQueryRowsPerOp = 10000 + +// MaxBatchQueryTotalBytes caps the aggregate JSON-encoded size of all +// BatchQuery results across all ops. Defense in depth against the same +// OOM vector as MaxBatchQueryRowsPerOp — a single op could have 5000 +// rows × 20KB each = 100MB and still be under the per-op count cap. +// 32 MiB matches the WASM module memory ceiling order-of-magnitude. +const MaxBatchQueryTotalBytes = 32 * 1024 * 1024 + // BatchWithSeq executes the user's ops atomically AND, in the same atomic // batch, increments the per-namespace publish sequence counter so the caller // can attach the assigned seq to a follow-up wake-up message. @@ -200,6 +215,164 @@ func coerceInt64(v interface{}) (int64, error) { } } +// BatchQuery runs N SELECT statements in a single HTTP request to RQLite's +// /db/query endpoint via the native gorqlite Connection, returning one +// OpResult per input op in the original order. +// +// Why this exists: c.Query (sql.DB path) sends ONE statement per HTTP call, +// paying a full leader round-trip each time. For functions that gather state +// from many tables before doing work (e.g. anchat's message-create gathers +// auth + participants + devices = 7-10 reads), the per-call RTT dominates — +// 10 sequential reads on devnet's cross-region cluster take ~3.5s vs ~330ms +// for the batched form. See bugboard #270 for the workload measurement. +// +// Semantics: +// - All ops MUST be Kind=BatchOpQuery. Exec ops error out at validation. +// - All N statements are sent in one POST to /db/query with level=weak, +// so they all run on the leader and see the same committed snapshot. +// - Per-op errors are reported in OpResult.Error (one entry per input, +// same order). The whole call only returns a Go error on transport +// failures (network, leader unreachable, JSON malformed) or validation. +// - Rows arrive as []map[string]interface{} just like c.Query — columns +// are populated via the rqlite "associative" response shape. +func (c *client) BatchQuery(ctx context.Context, ops []BatchOp) ([]OpResult, error) { + if len(ops) == 0 { + return []OpResult{}, nil + } + if len(ops) > MaxBatchOps { + return nil, fmt.Errorf("rqlite.BatchQuery: too many ops (%d > max %d)", len(ops), MaxBatchOps) + } + if c.conn == nil { + return nil, fmt.Errorf("rqlite.BatchQuery: native gorqlite connection not configured (use NewClientWithDSN or NewClientWithConn)") + } + + // Validate up-front: callers must use BatchOpQuery for every entry. + // Mixing in an Exec would be a footgun (it'd silently be skipped or + // trigger an unrelated error from the query endpoint), so reject loud. + stmts := make([]gorqlite.ParameterizedStatement, len(ops)) + for i, op := range ops { + if op.Kind != BatchOpQuery { + return nil, fmt.Errorf("rqlite.BatchQuery: op %d has kind %q (only %q allowed; use Batch for mixed exec/query)", + i, op.Kind, BatchOpQuery) + } + stmts[i] = gorqlite.ParameterizedStatement{ + Query: op.SQL, + Arguments: op.Args, + } + } + + qrs, err := c.conn.QueryParameterizedContext(ctx, stmts) + if err != nil { + // gorqlite returns a slice of QueryResult even on partial failure; + // extract per-op errors if available, else surface the joined err. + if len(qrs) == 0 { + return nil, fmt.Errorf("rqlite.BatchQuery: %w", err) + } + // Fall through to map qrs → OpResults; per-op errors are in qr.Err. + } + + // Track aggregate result size across all ops as a defense-in-depth + // OOM guard. If a single op stays under MaxBatchQueryRowsPerOp but + // the SUM across ops still grows pathologically large, this cap + // trips and the remaining ops surface an error rather than blowing + // the gateway's heap. + var totalBytes int + out := make([]OpResult, len(ops)) + for i, qr := range qrs { + if totalBytes >= MaxBatchQueryTotalBytes { + out[i] = OpResult{ + Kind: BatchOpQuery, + Error: fmt.Sprintf("rqlite.BatchQuery: aggregate result bytes exceeded cap (%d) — earlier ops consumed the budget; this op result truncated", + MaxBatchQueryTotalBytes), + } + continue + } + opRes := queryResultToOpResult(qr) + totalBytes += estimateOpResultBytes(opRes) + out[i] = opRes + } + // If fewer results returned than ops requested (shouldn't happen per + // gorqlite contract), pad with errors so caller indexing matches input. + for i := len(qrs); i < len(ops); i++ { + out[i] = OpResult{ + Kind: BatchOpQuery, + Error: "rqlite.BatchQuery: no result returned for op " + fmt.Sprint(i), + } + } + return out, nil +} + +// estimateOpResultBytes is a cheap approximation of the JSON-encoded +// size of an OpResult, used only for the aggregate-bytes cap in +// BatchQuery. Doesn't have to be exact — overestimating is safer than +// underestimating, since the cap is a DoS guard, not a billing meter. +func estimateOpResultBytes(r OpResult) int { + // Per-row overhead: ~32 bytes for JSON braces + commas + key wrappers. + // Per-cell: key length (assume 16) + value bytes. + const perRowOverhead = 32 + const perCellOverhead = 16 + total := len(r.Error) + perRowOverhead + for _, row := range r.Rows { + total += perRowOverhead + for k, v := range row { + total += len(k) + perCellOverhead + switch x := v.(type) { + case string: + total += len(x) + case []byte: + total += len(x) + default: + // numerics, bools, nil — bounded constants, count as 16. + total += 16 + } + } + } + return total +} + +// queryResultToOpResult converts a single gorqlite.QueryResult into our +// OpResult wire shape, including row materialization via the associative +// API. Per-op errors are surfaced via OpResult.Error. +// +// Enforces MaxBatchQueryRowsPerOp as a DoS guard — a single op returning +// more rows is truncated and Error is set so the WASM caller can decide +// whether to paginate or treat it as fatal. Without this guard a malicious +// `SELECT * FROM ` could OOM the gateway. +func queryResultToOpResult(qr gorqlite.QueryResult) OpResult { + if qr.Err != nil { + return OpResult{ + Kind: BatchOpQuery, + Error: qr.Err.Error(), + } + } + // Materialize all rows as map[string]interface{} via the associative + // iterator — matches how c.Query consumers expect rows to look. + var rows []map[string]interface{} + for qr.Next() { + if len(rows) >= MaxBatchQueryRowsPerOp { + return OpResult{ + Kind: BatchOpQuery, + Rows: rows, + Error: fmt.Sprintf("rqlite.BatchQuery: row cap exceeded (%d) — paginate via LIMIT/OFFSET", + MaxBatchQueryRowsPerOp), + } + } + row, mapErr := qr.Map() + if mapErr != nil { + return OpResult{ + Kind: BatchOpQuery, + Rows: rows, + Error: "rqlite.BatchQuery: row map: " + mapErr.Error(), + } + } + rows = append(rows, row) + } + return OpResult{ + Kind: BatchOpQuery, + Rows: rows, + } +} + // Batch executes ops as a single atomic transaction. // // Semantics: diff --git a/core/pkg/rqlite/batch_caps_test.go b/core/pkg/rqlite/batch_caps_test.go new file mode 100644 index 0000000..d19a328 --- /dev/null +++ b/core/pkg/rqlite/batch_caps_test.go @@ -0,0 +1,87 @@ +package rqlite + +import ( + "strings" + "testing" +) + +// TestEstimateOpResultBytes_growsWithRowCount is a sanity check that the +// estimator is monotonic in row count — required for the aggregate-bytes +// cap in BatchQuery to actually stop the OOM vector (HIGH-severity +// security finding on bugboard #270 follow-up audit). +func TestEstimateOpResultBytes_growsWithRowCount(t *testing.T) { + row := map[string]interface{}{"id": int64(1), "name": "alice"} + + small := OpResult{Kind: BatchOpQuery, Rows: []map[string]interface{}{row}} + big := OpResult{Kind: BatchOpQuery, Rows: make([]map[string]interface{}, 100)} + for i := range big.Rows { + big.Rows[i] = row + } + + smallBytes := estimateOpResultBytes(small) + bigBytes := estimateOpResultBytes(big) + if bigBytes <= smallBytes { + t.Errorf("estimator should grow with row count: 1-row=%d, 100-row=%d", smallBytes, bigBytes) + } + if bigBytes < smallBytes*50 { + t.Errorf("estimator should grow ~linearly: 100×1-row=%d, 100-row=%d (expected ~100x)", + smallBytes*100, bigBytes) + } +} + +// TestEstimateOpResultBytes_accountsForStringContent ensures the +// estimator includes the string-value bytes — otherwise large TEXT +// columns wouldn't count toward the cap and the OOM vector reopens. +func TestEstimateOpResultBytes_accountsForStringContent(t *testing.T) { + bigString := strings.Repeat("x", 10_000) + row := map[string]interface{}{"body": bigString} + + result := OpResult{Kind: BatchOpQuery, Rows: []map[string]interface{}{row}} + bytes := estimateOpResultBytes(result) + + if bytes < 10_000 { + t.Errorf("estimator must include string content bytes; got %d for a 10KB string", bytes) + } +} + +// TestEstimateOpResultBytes_emptyAndError covers edge cases that the +// aggregate-bytes loop in BatchQuery iterates over. +func TestEstimateOpResultBytes_emptyAndError(t *testing.T) { + empty := OpResult{Kind: BatchOpQuery} + if got := estimateOpResultBytes(empty); got <= 0 { + t.Errorf("empty result should have non-negative estimate (got %d)", got) + } + + withErr := OpResult{Kind: BatchOpQuery, Error: "no such table: foo"} + if got := estimateOpResultBytes(withErr); got < len(withErr.Error) { + t.Errorf("estimator should account for error message bytes; got %d for %d-byte error", + got, len(withErr.Error)) + } +} + +// TestMaxBatchQueryRowsPerOp_isReasonable is a sanity check — if a future +// contributor tightens the cap below typical workload sizes, this catches +// it. AnChat's read-batch case is ~10 reads × <100 rows each; we want +// plenty of headroom but not unbounded. +func TestMaxBatchQueryRowsPerOp_isReasonable(t *testing.T) { + if MaxBatchQueryRowsPerOp < 1000 { + t.Errorf("MaxBatchQueryRowsPerOp=%d is too low — typical reads need at least 1000 rows headroom", + MaxBatchQueryRowsPerOp) + } + if MaxBatchQueryRowsPerOp > 1_000_000 { + t.Errorf("MaxBatchQueryRowsPerOp=%d is too high — OOM vector unbounded", + MaxBatchQueryRowsPerOp) + } +} + +// TestMaxBatchQueryTotalBytes_isReasonable mirrors above for the +// aggregate cap. +func TestMaxBatchQueryTotalBytes_isReasonable(t *testing.T) { + if MaxBatchQueryTotalBytes < 1024*1024 { + t.Errorf("MaxBatchQueryTotalBytes=%d is too low (< 1MB)", MaxBatchQueryTotalBytes) + } + if MaxBatchQueryTotalBytes > 1024*1024*1024 { + t.Errorf("MaxBatchQueryTotalBytes=%d is too high (>1GB) — OOM vector unbounded", + MaxBatchQueryTotalBytes) + } +} diff --git a/core/pkg/rqlite/orm_types.go b/core/pkg/rqlite/orm_types.go index e54b560..02e9747 100644 --- a/core/pkg/rqlite/orm_types.go +++ b/core/pkg/rqlite/orm_types.go @@ -56,6 +56,26 @@ type Client interface { // the assigned sequence number. Used by exec_and_publish to attach a seq // to wake-up messages so subscribers can detect replication-lag gaps. BatchWithSeq(ctx context.Context, namespace string, userOps []BatchOp) (*BatchResult, int64, error) + + // BatchQuery runs N SELECT statements in ONE HTTP request to RQLite's + // /db/query endpoint, returning one OpResult per input op in the same + // order. All queries execute on the leader (level=weak — same as our + // default reads) in a single network round-trip — N queries cost ~one + // query's worth of latency instead of N times. + // + // Use this for read-heavy functions that need to gather state from + // multiple tables before doing work. Empirically on devnet (167ms RTT to + // leader): 10 sequential c.Query calls = 3562ms; 1 BatchQuery with 10 + // statements = 338ms. 10× speedup. + // + // Per-query errors are surfaced in OpResult.Error and do NOT fail the + // whole batch — each query's result is independent. A transport-level + // failure (network, leader unreachable) returns a non-nil Go error and + // the OpResults may be empty. + // + // Requires the client to have been constructed with a *gorqlite.Connection + // (NewClientWithDSN or NewClientWithConn). Returns an error otherwise. + BatchQuery(ctx context.Context, ops []BatchOp) ([]OpResult, error) } // Tx mirrors Client but executes within a transaction. diff --git a/core/pkg/serverless/engine.go b/core/pkg/serverless/engine.go index 18dacac..2daff94 100644 --- a/core/pkg/serverless/engine.go +++ b/core/pkg/serverless/engine.go @@ -604,6 +604,7 @@ func (e *Engine) registerHostModule(ctx context.Context) error { NewFunctionBuilder().WithFunc(e.hDBExecute).Export("db_execute"). NewFunctionBuilder().WithFunc(e.hDBExecuteV2).Export("db_execute_v2"). NewFunctionBuilder().WithFunc(e.hDBTransaction).Export("db_transaction"). + NewFunctionBuilder().WithFunc(e.hDBQueryBatch).Export("db_query_batch"). NewFunctionBuilder().WithFunc(e.hExecAndPublish).Export("exec_and_publish"). NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get"). NewFunctionBuilder().WithFunc(e.hCacheSet).Export("cache_set"). @@ -912,6 +913,27 @@ func (e *Engine) hDBTransaction(ctx context.Context, mod api.Module, opsPtr, ops return e.executor.WriteToGuest(ctx, mod, out) } +// hDBQueryBatch is the WASM-callable wrapper for DBQueryBatch. +// Input: pointer/length of opsJSON ({"ops":[{"sql":"...","args":[...]}, ...]}). +// Returns a packed uint64 (ptr<<32 | len) pointing to JSON result in guest +// memory, or 0 on setup/transport error. +// +// Per-query errors are surfaced inside the JSON result (one entry per op +// has its own `error` field). A return of 0 means the whole call failed +// before per-op results could be built. +func (e *Engine) hDBQueryBatch(ctx context.Context, mod api.Module, opsPtr, opsLen uint32) uint64 { + opsJSON, ok := e.executor.ReadFromGuest(mod, opsPtr, opsLen) + if !ok { + return 0 + } + out, err := e.hostServices.DBQueryBatch(ctx, opsJSON) + if err != nil { + e.logger.Warn("host function db_query_batch failed", zap.Error(err)) + return 0 + } + return e.executor.WriteToGuest(ctx, mod, out) +} + // hExecAndPublish is the WASM-callable wrapper for ExecAndPublish. // Inputs: // diff --git a/core/pkg/serverless/hostfuncs_test.go b/core/pkg/serverless/hostfuncs_test.go index 08faf74..7b3ad30 100644 --- a/core/pkg/serverless/hostfuncs_test.go +++ b/core/pkg/serverless/hostfuncs_test.go @@ -64,6 +64,10 @@ func (m *mockHostServices) DBQueryV2(ctx context.Context, query string, args []i return []byte(`{"rows":[]}`), nil } +func (m *mockHostServices) DBQueryBatch(ctx context.Context, opsJSON []byte) ([]byte, error) { + return []byte(`{"results":[]}`), nil +} + func (m *mockHostServices) CacheGet(ctx context.Context, key string) ([]byte, error) { return nil, nil } diff --git a/core/pkg/serverless/hostfunctions/database.go b/core/pkg/serverless/hostfunctions/database.go index 9b750ce..80e0fd1 100644 --- a/core/pkg/serverless/hostfunctions/database.go +++ b/core/pkg/serverless/hostfunctions/database.go @@ -6,11 +6,21 @@ import ( "encoding/json" "fmt" "strconv" + "time" "github.com/DeBrosOfficial/network/pkg/rqlite" "github.com/DeBrosOfficial/network/pkg/serverless" ) +// dbQueryBatchTimeout caps the rqlite round-trip for a single +// `oh.DBQueryBatch` host call. Tighter than the function's invocation +// timeout (typically 15-30s) so a stalled leader doesn't burn the entire +// budget on one batched read; the WASM function still has headroom to +// do downstream work after the read returns. 10s is generous for the +// 167ms-RTT cross-region devnet cluster (one round-trip ~340ms) while +// catching genuine leader stalls quickly. +const dbQueryBatchTimeout = 10 * time.Second + // DBQuery executes a SELECT query and returns JSON-encoded results. func (h *HostFunctions) DBQuery(ctx context.Context, query string, args []interface{}) ([]byte, error) { if h.db == nil { @@ -176,6 +186,93 @@ func (h *HostFunctions) DBTransaction(ctx context.Context, opsJSON []byte) ([]by return out, nil } +// dbQueryBatchRequest is the WASM-side shape for db_query_batch input. +// Each op MUST be Kind=BatchOpQuery; mixing exec is rejected at the +// rqlite layer. +type dbQueryBatchRequest struct { + Ops []rqlite.BatchOp `json:"ops"` +} + +// dbQueryBatchResult is the JSON wire shape returned to WASM callers. +// `Results` is one entry per input op, in the same order. Per-op errors +// are surfaced in `error`; transport/validation errors come back as a +// Go error from the host fn. +type dbQueryBatchResult struct { + Results []rqlite.OpResult `json:"results"` +} + +// DBQueryBatch runs N SELECTs in one round-trip via RQLite's /db/query +// bulk endpoint. Designed for read-heavy functions that gather state +// from multiple tables before doing work (e.g. anchat's message-create +// reads auth + participants + devices = 7-10 SELECTs). +// +// Wire shapes: +// +// in: {"ops": [{"sql":"...","args":[...]}, ...]} +// out: {"results": [{"kind":"query","rows":[...],"error":""}, ...]} +// +// Per-query errors are reported in the per-op `error` field; the host +// fn only returns a Go error on setup/validation/transport failures. +// Kind is auto-set to "query" on input — exec ops are rejected, since +// mixing kinds in a query batch is meaningless and would silently +// drop the writes (see bugboard #270). +// +// Empirical baseline on devnet's cross-region cluster (167ms RTT to +// leader): 10 sequential DBQuery host calls = ~3.5s; one DBQueryBatch +// with 10 statements = ~340ms. 10× speedup. +func (h *HostFunctions) DBQueryBatch(ctx context.Context, opsJSON []byte) ([]byte, error) { + if h.db == nil { + return nil, &serverless.HostFunctionError{Function: "db_query_batch", Cause: serverless.ErrDatabaseUnavailable} + } + var req dbQueryBatchRequest + if err := json.Unmarshal(opsJSON, &req); err != nil { + return nil, &serverless.HostFunctionError{ + Function: "db_query_batch", + Cause: fmt.Errorf("invalid json: %w", err), + } + } + if len(req.Ops) == 0 { + return nil, &serverless.HostFunctionError{ + Function: "db_query_batch", + Cause: fmt.Errorf("ops required"), + } + } + if len(req.Ops) > rqlite.MaxBatchOps { + return nil, &serverless.HostFunctionError{ + Function: "db_query_batch", + Cause: fmt.Errorf("too many ops: max %d", rqlite.MaxBatchOps), + } + } + // Force kind=query for every op. Callers can omit the field; this + // makes the wire format more ergonomic AND prevents accidental exec + // ops from being silently dropped by the rqlite-side validator. + for i := range req.Ops { + req.Ops[i].Kind = rqlite.BatchOpQuery + } + + // Explicit batch-level deadline. The caller's ctx already carries the + // function's invocation timeout (typically 15-30s), but we want a + // tighter cap on the rqlite round-trip itself so a stalled leader + // doesn't burn the entire invocation budget on one batched query. + // Leaves headroom for downstream WASM work after the read returns. + batchCtx, cancel := context.WithTimeout(ctx, dbQueryBatchTimeout) + defer cancel() + + results, err := h.db.BatchQuery(batchCtx, req.Ops) + if err != nil { + return nil, &serverless.HostFunctionError{Function: "db_query_batch", Cause: err} + } + + out, mErr := json.Marshal(dbQueryBatchResult{Results: results}) + if mErr != nil { + return nil, &serverless.HostFunctionError{ + Function: "db_query_batch", + Cause: fmt.Errorf("marshal result: %w", mErr), + } + } + return out, nil +} + // execAndPublishResult is the JSON wire shape returned to WASM callers. type execAndPublishResult struct { Results []rqlite.OpResult `json:"results"` diff --git a/core/pkg/serverless/hostfunctions/database_test.go b/core/pkg/serverless/hostfunctions/database_test.go index 3f91eda..6f367fa 100644 --- a/core/pkg/serverless/hostfunctions/database_test.go +++ b/core/pkg/serverless/hostfunctions/database_test.go @@ -11,18 +11,21 @@ import ( "github.com/DeBrosOfficial/network/pkg/rqlite" ) -// fakeBatchClient is a tiny rqlite.Client stub that only implements Batch -// and BatchWithSeq. Other methods rely on the embedded Client which is nil — -// any test that calls them will panic, which is intentional. +// fakeBatchClient is a tiny rqlite.Client stub that only implements Batch, +// BatchWithSeq, and BatchQuery. Other methods rely on the embedded Client +// which is nil — any test that calls them will panic, which is intentional. type fakeBatchClient struct { rqlite.Client - calls int - lastOps []rqlite.BatchOp - seqCalls int - lastSeqNS string - respond func(ops []rqlite.BatchOp) (*rqlite.BatchResult, error) - respondSeq func(ns string, ops []rqlite.BatchOp) (*rqlite.BatchResult, int64, error) - nextSeq int64 + calls int + lastOps []rqlite.BatchOp + seqCalls int + lastSeqNS string + queryCalls int + lastQueryOps []rqlite.BatchOp + respond func(ops []rqlite.BatchOp) (*rqlite.BatchResult, error) + respondSeq func(ns string, ops []rqlite.BatchOp) (*rqlite.BatchResult, int64, error) + respondQuery func(ops []rqlite.BatchOp) ([]rqlite.OpResult, error) + nextSeq int64 } func (f *fakeBatchClient) Batch(ctx context.Context, ops []rqlite.BatchOp) (*rqlite.BatchResult, error) { @@ -50,6 +53,23 @@ func (f *fakeBatchClient) BatchWithSeq(ctx context.Context, namespace string, op return res, atomic.LoadInt64(&f.nextSeq), err } +func (f *fakeBatchClient) BatchQuery(ctx context.Context, ops []rqlite.BatchOp) ([]rqlite.OpResult, error) { + f.queryCalls++ + f.lastQueryOps = ops + if f.respondQuery != nil { + return f.respondQuery(ops) + } + // Default: echo one OpResult per input with a single row {ok:1}. + results := make([]rqlite.OpResult, len(ops)) + for i := range ops { + results[i] = rqlite.OpResult{ + Kind: rqlite.BatchOpQuery, + Rows: []map[string]interface{}{{"ok": int64(1)}}, + } + } + return results, nil +} + func newHFWithDB(db rqlite.Client) *HostFunctions { return &HostFunctions{db: db} } @@ -349,3 +369,158 @@ func TestDBTransaction_rollback_returns_committed_false_no_go_error(t *testing.T t.Errorf("expected UNIQUE error in result, got: %q", res.Results[1].Error) } } + +// ============================================================================= +// DBQueryBatch tests (bugboard #270 — batched-reads host fn) +// ============================================================================= + +// TestDBQueryBatch_happy_path verifies the wire shape and that ops flow +// through to rqlite.Client.BatchQuery in order. +func TestDBQueryBatch_happy_path(t *testing.T) { + fake := &fakeBatchClient{} + h := newHFWithDB(fake) + + in := `{"ops":[ + {"sql":"SELECT 1"}, + {"sql":"SELECT 2 WHERE x = ?","args":[42]}, + {"sql":"SELECT 3"} + ]}` + out, err := h.DBQueryBatch(context.Background(), []byte(in)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if fake.queryCalls != 1 { + t.Errorf("expected 1 BatchQuery call, got %d", fake.queryCalls) + } + if len(fake.lastQueryOps) != 3 { + t.Errorf("expected 3 ops forwarded, got %d", len(fake.lastQueryOps)) + } + // Each op MUST have kind force-set to "query" by the host fn, + // regardless of what the caller sent. This prevents accidental exec + // from being dropped silently (see bugboard #270). + for i, op := range fake.lastQueryOps { + if op.Kind != rqlite.BatchOpQuery { + t.Errorf("op[%d] kind = %q; want %q", i, op.Kind, rqlite.BatchOpQuery) + } + } + var res dbQueryBatchResult + if err := json.Unmarshal(out, &res); err != nil { + t.Fatalf("decode result: %v", err) + } + if len(res.Results) != 3 { + t.Errorf("expected 3 results, got %d", len(res.Results)) + } +} + +// TestDBQueryBatch_forces_kind_query is the regression guard against the +// "silent exec drop" failure mode. The bugboard #270 fix explicitly sets +// every input op's kind to BatchOpQuery so callers can't accidentally +// pass `{"kind":"exec"}` into a query batch and have it disappear. +func TestDBQueryBatch_forces_kind_query(t *testing.T) { + fake := &fakeBatchClient{} + h := newHFWithDB(fake) + + // Caller maliciously/accidentally sends kind=exec — host fn must coerce. + in := `{"ops":[{"kind":"exec","sql":"DELETE FROM users"}]}` + if _, err := h.DBQueryBatch(context.Background(), []byte(in)); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(fake.lastQueryOps) != 1 { + t.Fatalf("expected 1 op forwarded, got %d", len(fake.lastQueryOps)) + } + if fake.lastQueryOps[0].Kind != rqlite.BatchOpQuery { + t.Errorf("kind = %q; want %q (must coerce, NOT silently let exec through)", + fake.lastQueryOps[0].Kind, rqlite.BatchOpQuery) + } +} + +func TestDBQueryBatch_invalid_json_rejected(t *testing.T) { + h := newHFWithDB(&fakeBatchClient{}) + _, err := h.DBQueryBatch(context.Background(), []byte(`not json`)) + if err == nil { + t.Fatal("expected error for invalid json, got nil") + } + if !strings.Contains(err.Error(), "invalid json") { + t.Errorf("expected 'invalid json' in error, got: %v", err) + } +} + +func TestDBQueryBatch_no_ops_rejected(t *testing.T) { + h := newHFWithDB(&fakeBatchClient{}) + _, err := h.DBQueryBatch(context.Background(), []byte(`{"ops":[]}`)) + if err == nil { + t.Fatal("expected error for empty ops, got nil") + } + if !strings.Contains(err.Error(), "ops required") { + t.Errorf("expected 'ops required' in error, got: %v", err) + } +} + +func TestDBQueryBatch_oversize_batch_rejected(t *testing.T) { + h := newHFWithDB(&fakeBatchClient{}) + + var sb strings.Builder + sb.WriteString(`{"ops":[`) + for i := 0; i <= rqlite.MaxBatchOps; i++ { + if i > 0 { + sb.WriteString(",") + } + sb.WriteString(`{"sql":"SELECT 1"}`) + } + sb.WriteString(`]}`) + + _, err := h.DBQueryBatch(context.Background(), []byte(sb.String())) + if err == nil { + t.Fatal("expected error for oversize batch, got nil") + } + if !strings.Contains(err.Error(), "too many ops") { + t.Errorf("expected 'too many ops' in error, got: %v", err) + } +} + +func TestDBQueryBatch_no_db_returns_error(t *testing.T) { + h := &HostFunctions{db: nil} + _, err := h.DBQueryBatch(context.Background(), []byte(`{"ops":[{"sql":"SELECT 1"}]}`)) + if err == nil { + t.Fatal("expected error when db is nil") + } +} + +// TestDBQueryBatch_per_op_errors_surface_in_json verifies that a per-op +// SQL error (e.g. table doesn't exist) appears in the per-op `error` +// field instead of failing the whole call. This matches DBTransaction's +// "structured error" contract. +func TestDBQueryBatch_per_op_errors_surface_in_json(t *testing.T) { + fake := &fakeBatchClient{ + respondQuery: func(ops []rqlite.BatchOp) ([]rqlite.OpResult, error) { + return []rqlite.OpResult{ + {Kind: rqlite.BatchOpQuery, Rows: []map[string]interface{}{{"x": int64(1)}}}, + {Kind: rqlite.BatchOpQuery, Error: "no such table: missing"}, + }, nil + }, + } + h := newHFWithDB(fake) + + in := `{"ops":[{"sql":"SELECT 1"},{"sql":"SELECT * FROM missing"}]}` + out, err := h.DBQueryBatch(context.Background(), []byte(in)) + if err != nil { + t.Fatalf("per-op errors must NOT surface as Go errors: %v", err) + } + var res dbQueryBatchResult + if err := json.Unmarshal(out, &res); err != nil { + t.Fatalf("decode result: %v", err) + } + if len(res.Results) != 2 { + t.Fatalf("expected 2 results, got %d", len(res.Results)) + } + if res.Results[0].Error != "" { + t.Errorf("op 0 should have no error, got: %q", res.Results[0].Error) + } + if res.Results[1].Error == "" { + t.Errorf("op 1 should carry SQL error in JSON, got empty") + } +} + +// Silence the "imported and not used" warning if sql isn't needed elsewhere +// in test additions — kept here as a guard in case future tests need it. +var _ = sql.ErrNoRows diff --git a/core/pkg/serverless/mocks_test.go b/core/pkg/serverless/mocks_test.go index 21fb9ee..db41ae7 100644 --- a/core/pkg/serverless/mocks_test.go +++ b/core/pkg/serverless/mocks_test.go @@ -128,6 +128,13 @@ func (m *MockHostServices) DBQueryV2(ctx context.Context, query string, args []i return []byte(`{"rows":[]}`), nil } +func (m *MockHostServices) DBQueryBatch(ctx context.Context, opsJSON []byte) ([]byte, error) { + // Bare stub — returns the empty results shape. Tests that need per-op + // behavior should mock at the HostFunctions level (see fakeBatchClient + // in pkg/serverless/hostfunctions/database_test.go). + return []byte(`{"results":[]}`), nil +} + func (m *MockHostServices) CacheGet(ctx context.Context, key string) ([]byte, error) { m.mu.RLock() defer m.mu.RUnlock() @@ -408,6 +415,15 @@ func (m *MockRQLite) BatchWithSeq(ctx context.Context, namespace string, ops []r return res, 1, err } +func (m *MockRQLite) BatchQuery(ctx context.Context, ops []rqlite.BatchOp) ([]rqlite.OpResult, error) { + // Bare stub mirroring Batch: one empty-row result per op. + results := make([]rqlite.OpResult, len(ops)) + for i := range ops { + results[i] = rqlite.OpResult{Kind: rqlite.BatchOpQuery, Rows: nil} + } + return results, nil +} + type mockResult struct{} func (m *mockResult) LastInsertId() (int64, error) { return 1, nil } diff --git a/core/pkg/serverless/triggers/dispatcher.go b/core/pkg/serverless/triggers/dispatcher.go index d004003..9d3d634 100644 --- a/core/pkg/serverless/triggers/dispatcher.go +++ b/core/pkg/serverless/triggers/dispatcher.go @@ -3,8 +3,10 @@ package triggers import ( "context" "encoding/json" + "sync" "time" + "github.com/DeBrosOfficial/network/pkg/pubsub" "github.com/DeBrosOfficial/network/pkg/serverless" "github.com/DeBrosOfficial/network/pkg/serverless/aggregator" olriclib "github.com/olric-data/olric" @@ -18,6 +20,13 @@ const ( // dispatchTimeout is the timeout for each triggered function invocation. dispatchTimeout = 60 * time.Second + + // dispatcherRefreshInterval is the safety-net cadence for re-syncing + // libp2p subscriptions against the trigger store. Trigger add/remove + // calls Refresh synchronously; this catches anything missed (e.g. an + // add that happened on a different gateway node, or a deploy-time + // auto-register where the Refresh hook wasn't wired). + dispatcherRefreshInterval = 60 * time.Second ) // PubSubEvent is the JSON payload sent to functions triggered by PubSub messages. @@ -29,32 +38,270 @@ type PubSubEvent struct { Timestamp int64 `json:"timestamp"` } +// dispatcherPubSub is the subset of *pubsub.ClientAdapter the dispatcher +// needs for libp2p subscribe/unsubscribe. Defined as an interface so the +// dispatcher's Start/Refresh logic is unit-testable without standing up +// a real libp2p host. +type dispatcherPubSub interface { + Subscribe(ctx context.Context, topic string, handler pubsub.MessageHandler) error + Unsubscribe(ctx context.Context, topic string) error +} + +// topicLister is the subset of *PubSubTriggerStore the dispatcher's +// Refresh path needs. Defined as an interface so tests can inject a +// canned trigger set and exercise the real Refresh code path (rather +// than re-simulating it inline, which would let regressions slip). +type topicLister interface { + ListDistinctTopicPatterns(ctx context.Context) ([]DistinctTopicSubscription, error) +} + // PubSubDispatcher looks up triggers for a topic+namespace and asynchronously -// invokes matching serverless functions. +// invokes matching serverless functions. Subscribes to libp2p pubsub for +// every literal trigger pattern so WASM `oh.PubSubPublish` calls reach +// trigger handlers (bugboard #282 — before this, the dispatcher only fired +// when the HTTP `/v1/pubsub/publish` endpoint was hit, so every internal +// WASM publish silently dropped every subscriber). +// +// KNOWN LIMITATIONS (tracked as follow-ups, NOT in scope for #282): +// +// 1. Cross-namespace publish surface: any peer in the cluster's libp2p +// mesh can publish to a tenant's namespaced topic (`.`) +// and drive a trigger invocation. The libp2p mesh has no per-topic +// ACL, so a compromised namespace gateway gains the ability to fire +// other tenants' handlers. Pre-fix this attack failed because the +// dispatcher never subscribed at all. Mitigation requires either +// signed-envelope verification at dispatch time or a per-namespace +// swarm key (PSK) separating each tenant's pubsub mesh. Documented +// in the security audit on bugboard #282; track as a separate ticket. +// +// 2. Trigger-depth loops via libp2p round-trip: maxTriggerDepth=5 is +// embedded in the PubSubEvent payload, but a triggered function that +// publishes back through `oh.PubSubPublish` re-enters this dispatcher +// via libp2p Subscribe with depth=0 (the depth field lives in the +// OUR envelope, not in the libp2p wire format). Loops are bounded +// only by the per-invocation timeout. WASM functions MUST self-limit +// by reading `event.trigger_depth` from their input. A future fix +// would encode depth in a libp2p header the dispatcher reads back. +// +// 3. Wildcard patterns are not subscribed via libp2p (libp2p has no +// wildcard subscribe). Wildcard triggers only fire from HTTP-publish +// events via the legacy Dispatch hook, NOT from WASM publishes. +// Documented in Refresh below. type PubSubDispatcher struct { store *PubSubTriggerStore invoker *serverless.Invoker olricClient olriclib.Client // may be nil (cache disabled) aggregator *aggregator.Aggregator logger *zap.Logger + + // topicLister is the interface Refresh uses to enumerate desired + // subscriptions. Defaults to the concrete store but is overridable + // in tests so the real Refresh code path can be exercised against + // a canned trigger set. Set in NewPubSubDispatcher; only swapped + // by tests via the helper in dispatcher_refresh_test.go. + topicLister topicLister + + // pubsub is the libp2p-pubsub layer the dispatcher subscribes to so + // it can react to events published from WASM `oh.PubSubPublish` calls + // (which bypass the HTTP publish handler). nil disables the + // auto-subscribe behavior — kept nullable for tests that exercise + // only the Dispatch path. + pubsub dispatcherPubSub + + // subMu guards subscribedKeys against concurrent Refresh + Stop calls. + subMu sync.Mutex + // subscribedKeys is the set of (namespace, topic) tuples currently + // libp2p-subscribed by this dispatcher. Used by Refresh to compute the + // add/remove diff against the live trigger store. + subscribedKeys map[string]bool + + // stopCh signals the periodic Refresh goroutine to exit. + stopCh chan struct{} + stopOnce sync.Once } // NewPubSubDispatcher creates a new PubSub trigger dispatcher. +// +// The `ps` argument may be nil (e.g. in tests, or namespaces with pubsub +// disabled) — in that case Start/Refresh are no-ops and the dispatcher +// only fires for explicit Dispatch calls (the legacy HTTP-publish hook). func NewPubSubDispatcher( store *PubSubTriggerStore, invoker *serverless.Invoker, olricClient olriclib.Client, + ps dispatcherPubSub, logger *zap.Logger, ) *PubSubDispatcher { return &PubSubDispatcher{ - store: store, - invoker: invoker, - olricClient: olricClient, - aggregator: aggregator.New(logger, dispatchTimeout), - logger: logger, + store: store, + topicLister: store, // defaults to the real store; tests override + invoker: invoker, + olricClient: olricClient, + pubsub: ps, + aggregator: aggregator.New(logger, dispatchTimeout), + logger: logger, + subscribedKeys: make(map[string]bool), + stopCh: make(chan struct{}), } } +// subKey produces the map key used to track libp2p subscriptions per +// (namespace, topic) tuple. Keeping it in one place avoids drift. +func subKey(namespace, topic string) string { + return namespace + "|" + topic +} + +// Start subscribes to libp2p pubsub for every literal trigger pattern in +// the store and spawns the periodic refresh goroutine. Returns the first +// Subscribe error if any — but a partial-failure scenario (some topics +// subscribed, others failed) is logged and continues, since one bad topic +// shouldn't break dispatch for every other handler. +// +// Wildcard patterns (e.g. "messages:*") are skipped with a warning. libp2p +// has no native wildcard subscribe, so handling those cross-node properly +// needs a separate mechanism (per-namespace fan-out topic, or hooking +// HostFunctions.PubSubPublish to call Dispatch directly). For now, wildcard +// triggers only fire when the publish originates from the HTTP endpoint +// (which goes through the legacy Dispatch hook). +func (d *PubSubDispatcher) Start(ctx context.Context) error { + if d.pubsub == nil { + d.logger.Info("PubSubDispatcher.Start: pubsub disabled, skipping libp2p subscribe") + return nil + } + if err := d.Refresh(ctx); err != nil { + return err + } + go d.refreshLoop() + d.logger.Info("PubSubDispatcher started", + zap.Duration("refresh_interval", dispatcherRefreshInterval), + ) + return nil +} + +// Stop signals the periodic refresh goroutine to exit. Safe to call +// multiple times. Does NOT unsubscribe — the dispatcher's libp2p +// subscriptions die with the pubsub manager during gateway shutdown. +func (d *PubSubDispatcher) Stop() { + d.stopOnce.Do(func() { + close(d.stopCh) + }) +} + +// refreshLoop is the periodic-Refresh goroutine spawned by Start. Catches +// trigger add/remove events that didn't go through the Refresh hook (e.g. +// a different gateway node ran the trigger add, or the deploy-time +// auto-register path). +func (d *PubSubDispatcher) refreshLoop() { + ticker := time.NewTicker(dispatcherRefreshInterval) + defer ticker.Stop() + for { + select { + case <-d.stopCh: + return + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if err := d.Refresh(ctx); err != nil { + d.logger.Warn("PubSubDispatcher periodic refresh failed", + zap.Error(err)) + } + cancel() + } + } +} + +// Refresh re-syncs libp2p subscriptions against the live trigger store: +// subscribes to any new literal patterns, unsubscribes from patterns +// whose triggers were all removed. Idempotent — safe to call from +// multiple paths (Start, trigger-add hook, periodic loop). +// +// Wildcards are skipped (see Start). Errors on individual Subscribe calls +// are logged but do not abort the refresh — one bad topic shouldn't take +// down every other handler. +func (d *PubSubDispatcher) Refresh(ctx context.Context) error { + if d.pubsub == nil { + return nil + } + subs, err := d.topicLister.ListDistinctTopicPatterns(ctx) + if err != nil { + return err + } + + // Compute the desired set, skipping wildcards. + desired := make(map[string]DistinctTopicSubscription, len(subs)) + for _, s := range subs { + if s.Wildcard { + // Log once-per-refresh would be cleaner but the volume is + // bounded by the trigger count and this is a known limitation. + d.logger.Debug("PubSubDispatcher.Refresh: skipping wildcard pattern (libp2p has no wildcard subscribe)", + zap.String("namespace", s.Namespace), + zap.String("topic_pattern", s.TopicPattern), + ) + continue + } + desired[subKey(s.Namespace, s.TopicPattern)] = s + } + + d.subMu.Lock() + defer d.subMu.Unlock() + + // Subscribe to newly-added topics. + for key, s := range desired { + if d.subscribedKeys[key] { + continue + } + ns, topic := s.Namespace, s.TopicPattern + handler := func(msgTopic string, data []byte) error { + // PEER_DISCOVERY_PING is filtered upstream in the Manager. + // data already excludes those. + d.Dispatch(context.Background(), ns, topic, data, 0) + return nil + } + if err := d.pubsub.Subscribe(ctx, topic, handler); err != nil { + d.logger.Warn("PubSubDispatcher.Refresh: libp2p Subscribe failed", + zap.String("namespace", ns), + zap.String("topic", topic), + zap.Error(err)) + continue + } + d.subscribedKeys[key] = true + d.logger.Info("PubSubDispatcher subscribed to trigger topic", + zap.String("namespace", ns), + zap.String("topic", topic)) + } + + // Unsubscribe from topics whose triggers were all removed. + for key := range d.subscribedKeys { + if _, stillDesired := desired[key]; stillDesired { + continue + } + // key format is "namespace|topic"; split safely. + topic := key + if i := indexByteFromStart(key, '|'); i >= 0 { + topic = key[i+1:] + } + if err := d.pubsub.Unsubscribe(ctx, topic); err != nil { + d.logger.Debug("PubSubDispatcher.Refresh: libp2p Unsubscribe ignored", + zap.String("key", key), + zap.Error(err)) + } + delete(d.subscribedKeys, key) + d.logger.Info("PubSubDispatcher unsubscribed from trigger topic (no live triggers)", + zap.String("key", key)) + } + return nil +} + +// indexByteFromStart is a tiny local helper to avoid importing `strings` +// for one call. Returns the index of the first occurrence of c in s, or -1. +func indexByteFromStart(s string, c byte) int { + for i := 0; i < len(s); i++ { + if s[i] == c { + return i + } + } + return -1 +} + // Aggregator exposes the underlying aggregator so callers (gateway lifecycle) // can flush pending buffers on shutdown. func (d *PubSubDispatcher) Aggregator() *aggregator.Aggregator { diff --git a/core/pkg/serverless/triggers/dispatcher_refresh_test.go b/core/pkg/serverless/triggers/dispatcher_refresh_test.go new file mode 100644 index 0000000..75452e8 --- /dev/null +++ b/core/pkg/serverless/triggers/dispatcher_refresh_test.go @@ -0,0 +1,286 @@ +package triggers + +import ( + "context" + "errors" + "sort" + "sync" + "testing" + + "github.com/DeBrosOfficial/network/pkg/pubsub" + "go.uber.org/zap" +) + +// fakePubSubManager implements dispatcherPubSub for unit tests. Tracks +// Subscribe/Unsubscribe calls in order so tests can assert exact behavior +// without standing up a real libp2p host. +type fakePubSubManager struct { + mu sync.Mutex + subscribed map[string]pubsub.MessageHandler // topic → handler + subscribeErr func(topic string) error + subscribeCalls []string + unsubscribeCalls []string +} + +func newFakePubSubManager() *fakePubSubManager { + return &fakePubSubManager{subscribed: map[string]pubsub.MessageHandler{}} +} + +func (f *fakePubSubManager) Subscribe(ctx context.Context, topic string, handler pubsub.MessageHandler) error { + f.mu.Lock() + defer f.mu.Unlock() + if f.subscribeErr != nil { + if err := f.subscribeErr(topic); err != nil { + return err + } + } + f.subscribed[topic] = handler + f.subscribeCalls = append(f.subscribeCalls, topic) + return nil +} + +func (f *fakePubSubManager) Unsubscribe(ctx context.Context, topic string) error { + f.mu.Lock() + defer f.mu.Unlock() + delete(f.subscribed, topic) + f.unsubscribeCalls = append(f.unsubscribeCalls, topic) + return nil +} + +func (f *fakePubSubManager) subscribedTopics() []string { + f.mu.Lock() + defer f.mu.Unlock() + out := make([]string, 0, len(f.subscribed)) + for t := range f.subscribed { + out = append(out, t) + } + sort.Strings(out) + return out +} + +// fakeTopicLister implements the topicLister interface so Refresh's real +// code path can be exercised without standing up an rqlite client. The +// `subs` field is what ListDistinctTopicPatterns returns; tests mutate it +// between Refresh calls to drive add/remove diffs. +type fakeTopicLister struct { + subs []DistinctTopicSubscription + listErr error + calls int +} + +func (l *fakeTopicLister) ListDistinctTopicPatterns(ctx context.Context) ([]DistinctTopicSubscription, error) { + l.calls++ + if l.listErr != nil { + return nil, l.listErr + } + return append([]DistinctTopicSubscription(nil), l.subs...), nil +} + +// newDispatcherForRefreshTest builds a PubSubDispatcher with the fake +// topic lister and fake pubsub manager swapped in. Returns the dispatcher +// plus both fakes so tests can mutate the trigger set and assert behavior. +func newDispatcherForRefreshTest(initialSubs []DistinctTopicSubscription) (*PubSubDispatcher, *fakeTopicLister, *fakePubSubManager) { + ps := newFakePubSubManager() + lister := &fakeTopicLister{subs: initialSubs} + d := NewPubSubDispatcher(nil, nil, nil, ps, zap.NewNop()) + // Swap the topicLister with our fake — the constructor defaulted it to + // the (nil) store. This is the seam that makes Refresh exercisable in + // unit tests without an rqlite dependency. + d.topicLister = lister + return d, lister, ps +} + +// TestRefresh_subscribesNewLiteralTopics — happy path. Triggers added to +// the store result in libp2p subscribes for their literal topics on the +// next Refresh. Regression guard for bugboard #282 — without the fix, +// dispatcher.Start never subscribed and every WASM publish silently +// dropped every trigger handler. +func TestRefresh_subscribesNewLiteralTopics(t *testing.T) { + d, _, ps := newDispatcherForRefreshTest([]DistinctTopicSubscription{ + {Namespace: "anchat", TopicPattern: "messages:new", Wildcard: false}, + {Namespace: "anchat", TopicPattern: "conversations:updated", Wildcard: false}, + {Namespace: "anchat", TopicPattern: "messages:*", Wildcard: true}, + }) + + if err := d.Refresh(context.Background()); err != nil { + t.Fatalf("Refresh: %v", err) + } + + got := ps.subscribedTopics() + want := []string{"conversations:updated", "messages:new"} + if !equalStrings(got, want) { + t.Errorf("subscribed topics = %v, want %v (wildcard 'messages:*' must be skipped)", got, want) + } + + // subscribedKeys should track both namespaced keys. + d.subMu.Lock() + defer d.subMu.Unlock() + if !d.subscribedKeys[subKey("anchat", "messages:new")] { + t.Error("subscribedKeys missing messages:new") + } + if !d.subscribedKeys[subKey("anchat", "conversations:updated")] { + t.Error("subscribedKeys missing conversations:updated") + } + if d.subscribedKeys[subKey("anchat", "messages:*")] { + t.Error("subscribedKeys should NOT contain wildcard 'messages:*'") + } +} + +// TestRefresh_unsubscribesRemovedTopics — diff path. Triggers removed +// from the store (so their topic disappears from ListDistinct...) are +// unsubscribed on the next Refresh. +func TestRefresh_unsubscribesRemovedTopics(t *testing.T) { + d, lister, ps := newDispatcherForRefreshTest([]DistinctTopicSubscription{ + {Namespace: "ns", TopicPattern: "old-topic"}, + {Namespace: "ns", TopicPattern: "still-here"}, + }) + + // First Refresh — both subscribed. + if err := d.Refresh(context.Background()); err != nil { + t.Fatalf("first Refresh: %v", err) + } + if got, want := ps.subscribedTopics(), []string{"old-topic", "still-here"}; !equalStrings(got, want) { + t.Fatalf("after first refresh: subscribed = %v, want %v", got, want) + } + + // Simulate trigger removal — only one remains. + lister.subs = []DistinctTopicSubscription{ + {Namespace: "ns", TopicPattern: "still-here"}, + } + + // Second Refresh — old-topic should be unsubscribed. + if err := d.Refresh(context.Background()); err != nil { + t.Fatalf("second Refresh: %v", err) + } + if len(ps.unsubscribeCalls) != 1 || ps.unsubscribeCalls[0] != "old-topic" { + t.Errorf("unsubscribe calls = %v, want [old-topic]", ps.unsubscribeCalls) + } + if got, want := ps.subscribedTopics(), []string{"still-here"}; !equalStrings(got, want) { + t.Errorf("after prune: subscribed = %v, want %v", got, want) + } +} + +// TestRefresh_skipsAlreadySubscribed — idempotency. Calling Refresh +// twice with the same trigger set must NOT re-subscribe. +func TestRefresh_skipsAlreadySubscribed(t *testing.T) { + d, _, ps := newDispatcherForRefreshTest([]DistinctTopicSubscription{ + {Namespace: "ns", TopicPattern: "topic-a"}, + }) + + if err := d.Refresh(context.Background()); err != nil { + t.Fatalf("first Refresh: %v", err) + } + if err := d.Refresh(context.Background()); err != nil { + t.Fatalf("second Refresh: %v", err) + } + + if len(ps.subscribeCalls) != 1 { + t.Errorf("expected 1 subscribe call total (idempotent), got %d: %v", + len(ps.subscribeCalls), ps.subscribeCalls) + } +} + +// TestRefresh_subscribeErrorDoesNotBlockOtherTopics — a single Subscribe +// failure must not abort the refresh for other topics. One bad topic +// shouldn't take down every other handler. +func TestRefresh_subscribeErrorDoesNotBlockOtherTopics(t *testing.T) { + d, _, ps := newDispatcherForRefreshTest([]DistinctTopicSubscription{ + {Namespace: "ns", TopicPattern: "ok-1"}, + {Namespace: "ns", TopicPattern: "broken-topic"}, + {Namespace: "ns", TopicPattern: "ok-2"}, + }) + ps.subscribeErr = func(topic string) error { + if topic == "broken-topic" { + return errors.New("simulated libp2p failure") + } + return nil + } + + if err := d.Refresh(context.Background()); err != nil { + t.Fatalf("Refresh: %v", err) + } + + if got, want := ps.subscribedTopics(), []string{"ok-1", "ok-2"}; !equalStrings(got, want) { + t.Errorf("subscribed = %v, want %v (broken-topic should fail-soft)", got, want) + } + + // subscribedKeys must NOT contain the failed topic so the next Refresh + // retries it. Verifies the rollback-on-error path. + d.subMu.Lock() + defer d.subMu.Unlock() + if d.subscribedKeys[subKey("ns", "broken-topic")] { + t.Error("subscribedKeys must NOT include broken-topic (so next Refresh retries)") + } +} + +// TestRefresh_listError_propagates verifies that a transport error from +// the trigger store (e.g. rqlite unreachable) returns an error from +// Refresh rather than silently doing nothing. +func TestRefresh_listError_propagates(t *testing.T) { + d, lister, _ := newDispatcherForRefreshTest(nil) + lister.listErr = errors.New("rqlite unavailable") + + err := d.Refresh(context.Background()) + if err == nil { + t.Fatal("expected error from Refresh when store fails, got nil") + } + if !errors.Is(err, lister.listErr) && err.Error() != lister.listErr.Error() { + t.Errorf("expected wrapped store error, got: %v", err) + } +} + +// TestNewPubSubDispatcher_nilPubsubIsAllowed — constructs cleanly when +// pubsub manager is nil. Subsequent Start/Refresh must be no-ops, and +// the store must NOT be queried (since there's no point subscribing). +func TestNewPubSubDispatcher_nilPubsubIsAllowed(t *testing.T) { + d := NewPubSubDispatcher(nil, nil, nil, nil, zap.NewNop()) + if d == nil { + t.Fatal("constructor returned nil") + } + // Swap in a fake lister so we can assert it isn't called. + fakeLister := &fakeTopicLister{} + d.topicLister = fakeLister + + if err := d.Start(context.Background()); err != nil { + t.Errorf("Start with nil pubsub returned error: %v", err) + } + if err := d.Refresh(context.Background()); err != nil { + t.Errorf("Refresh with nil pubsub returned error: %v", err) + } + if fakeLister.calls != 0 { + t.Errorf("topic lister should NOT be called when pubsub is nil, got %d calls", fakeLister.calls) + } + // Stop is idempotent (two close on stopCh would panic; stopOnce guards it). + d.Stop() + d.Stop() +} + +// TestSubKey verifies the (namespace, topic) tuple key format is stable — +// the Refresh diff logic depends on consistent key construction. +func TestSubKey(t *testing.T) { + cases := []struct { + ns, topic, want string + }{ + {"anchat", "messages:new", "anchat|messages:new"}, + {"", "topic-only", "|topic-only"}, + {"ns", "", "ns|"}, + } + for _, c := range cases { + if got := subKey(c.ns, c.topic); got != c.want { + t.Errorf("subKey(%q, %q) = %q, want %q", c.ns, c.topic, got, c.want) + } + } +} + +// equalStrings is a tiny helper for slice-equality assertions (order-sensitive). +func equalStrings(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/core/pkg/serverless/triggers/pubsub_store.go b/core/pkg/serverless/triggers/pubsub_store.go index 6125339..d11b41d 100644 --- a/core/pkg/serverless/triggers/pubsub_store.go +++ b/core/pkg/serverless/triggers/pubsub_store.go @@ -197,6 +197,54 @@ func (s *PubSubTriggerStore) ListByFunction(ctx context.Context, functionID stri return triggers, nil } +// DistinctTopicSubscription is a (namespace, topic_pattern) pair used by +// the dispatcher to know which libp2p pubsub topics to subscribe to. +// Wildcard patterns are flagged so the caller can skip subscribing (libp2p +// has no native wildcard support — see bugboard #282 implementation notes). +type DistinctTopicSubscription struct { + Namespace string + TopicPattern string + Wildcard bool +} + +// ListDistinctTopicPatterns returns the unique (namespace, topic_pattern) +// pairs across all enabled triggers attached to active functions. Used by +// PubSubDispatcher.Start to decide which libp2p pubsub topics to subscribe +// to so WASM-published events actually reach trigger handlers (bugboard +// #282 — dispatcher previously only fired from HTTP publishes, so WASM +// publishes from message-create silently dropped every handler invocation). +// +// The dispatcher subscribes to each NON-wildcard pattern at startup and on +// trigger add/remove. Wildcard patterns are returned with Wildcard=true so +// callers can log/skip them — handling those cross-node properly requires +// a different mechanism (per-namespace fan-out topic or publish-side hook) +// that's not in scope for this fix. +func (s *PubSubTriggerStore) ListDistinctTopicPatterns(ctx context.Context) ([]DistinctTopicSubscription, error) { + query := ` + SELECT DISTINCT f.namespace AS namespace, t.topic_pattern AS topic_pattern + FROM function_pubsub_triggers t + JOIN functions f ON t.function_id = f.id + WHERE t.enabled = TRUE AND f.status = 'active' + ORDER BY f.namespace, t.topic_pattern + ` + var rows []struct { + Namespace string + TopicPattern string + } + if err := s.db.Query(ctx, &rows, query); err != nil { + return nil, fmt.Errorf("ListDistinctTopicPatterns: %w", err) + } + out := make([]DistinctTopicSubscription, 0, len(rows)) + for _, r := range rows { + out = append(out, DistinctTopicSubscription{ + Namespace: r.Namespace, + TopicPattern: r.TopicPattern, + Wildcard: IsWildcard(r.TopicPattern), + }) + } + return out, nil +} + // GetByTopicAndNamespace returns all enabled triggers whose topic_pattern // matches `topic` within the namespace. Patterns are SQLite GLOB; the // post-filter enforces stricter segment-aware semantics. diff --git a/core/pkg/serverless/triggers/triggers_test.go b/core/pkg/serverless/triggers/triggers_test.go index e2662f4..125c566 100644 --- a/core/pkg/serverless/triggers/triggers_test.go +++ b/core/pkg/serverless/triggers/triggers_test.go @@ -33,7 +33,7 @@ type mockInvokeCall struct { func TestDispatcher_DepthLimit(t *testing.T) { logger, _ := zap.NewDevelopment() store := NewPubSubTriggerStore(nil, logger) // store won't be called - d := NewPubSubDispatcher(store, nil, nil, logger) + d := NewPubSubDispatcher(store, nil, nil, nil, logger) // Dispatch at max depth should be a no-op (no panic, no store call) d.Dispatch(context.Background(), "ns", "topic", []byte("data"), maxTriggerDepth) diff --git a/core/pkg/serverless/types.go b/core/pkg/serverless/types.go index 9f59faa..6a5bdf9 100644 --- a/core/pkg/serverless/types.go +++ b/core/pkg/serverless/types.go @@ -444,6 +444,21 @@ type HostServices interface { // returned JSON, NOT as a Go error. DBTransaction(ctx context.Context, opsJSON []byte) ([]byte, error) + // DBQueryBatch runs N SELECT statements in ONE round-trip to the leader + // (via RQLite's /db/query bulk endpoint). All queries see the same + // committed snapshot. opsJSON shape: {"ops":[{"sql":"...","args":[...]}, ...]}. + // Returns JSON {"results":[{"rows":[...], "error":""}, ...]} with one + // entry per input op, in the same order. Per-query errors are surfaced + // in the per-op `error` field; the call only returns a Go error on + // transport/validation failures. + // + // Use this for read-heavy functions that gather state from many tables + // before doing work — e.g. anchat's message-create reads auth + + // participants + devices (7-10 SELECTs) before writing. Empirically on + // devnet's cross-region cluster: 10 sequential DBQuery = ~3.5s; one + // DBQueryBatch with 10 statements = ~340ms. See bugboard #270. + DBQueryBatch(ctx context.Context, opsJSON []byte) ([]byte, error) + // ExecAndPublish runs ops atomically (like DBTransaction) and, ONLY // if the batch commits, publishes data to the named topic with any // occurrence of the literal string "{{seq}}" replaced by the assigned