diff --git a/core/pkg/gateway/dependencies.go b/core/pkg/gateway/dependencies.go index 0812622..6f30e7d 100644 --- a/core/pkg/gateway/dependencies.go +++ b/core/pkg/gateway/dependencies.go @@ -570,6 +570,13 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe logger.Logger, ) + // Wire the dispatcher into hostFuncs so PubSubPublish / + // PubSubPublishBatch fire local wildcard triggers immediately on + // publish — closes the bugboard #93 gap where WASM publishes to e.g. + // "presence:user-1" never reached wildcard handlers like "presence:*" + // because libp2p has no wildcard subscribe. + hostFuncs.SetTriggerDispatcher(deps.PubSubDispatcher) + // Cron trigger store + scheduler. The scheduler polls // function_cron_triggers and invokes due rows via the same // ServerlessInvoker used for PubSub triggers; the ↓ Start call wires diff --git a/core/pkg/serverless/hostfunctions/context.go b/core/pkg/serverless/hostfunctions/context.go index 12e09e7..c997661 100644 --- a/core/pkg/serverless/hostfunctions/context.go +++ b/core/pkg/serverless/hostfunctions/context.go @@ -4,6 +4,7 @@ import ( "context" "github.com/DeBrosOfficial/network/pkg/serverless" + "github.com/DeBrosOfficial/network/pkg/serverless/triggers" ) // SetInvocationContext sets the current invocation context on the @@ -46,6 +47,21 @@ func (h *HostFunctions) SetInvoker(inv serverless.FunctionInvoker) { h.invoker = inv } +// SetTriggerDispatcher wires the PubSubDispatcher used by PubSubPublish / +// PubSubPublishBatch to synchronously fire wildcard triggers for +// WASM-published topics on this gateway (bugboard #93). nil disables +// local wildcard dispatch — only libp2p subscribe delivery applies, and +// wildcard triggers will be silent for WASM publishes. +// +// Wired after both HostFunctions and PubSubDispatcher exist; the +// dispatcher depends on the engine (which depends on HostFunctions), so +// the cycle is broken via this setter — same pattern as SetInvoker. +func (h *HostFunctions) SetTriggerDispatcher(d *triggers.PubSubDispatcher) { + h.triggerDispatcherLock.Lock() + defer h.triggerDispatcherLock.Unlock() + h.triggerDispatcher = d +} + // FunctionInvoke synchronously runs another function in the same namespace // and returns its output bytes. Caller wallet, JWT claims, and WS client // ID are inherited from the current invocation so the inner function sees @@ -84,6 +100,13 @@ func (h *HostFunctions) FunctionInvoke(ctx context.Context, name string, payload WSClientID: cur.WSClientID, CallerClaims: cur.CallerClaims, CallerJWTSubject: cur.CallerJWTSubject, + // Propagate trigger depth so a wildcard-triggered handler that + // calls function_invoke(B) — and B then publishes a topic that + // matches A's own wildcard — still hits the maxTriggerDepth + // guard. Without this, depth resets to 0 on every + // function_invoke hop and the recursion bound reopens. + // Bugboard #93 follow-up (audit C7). + TriggerDepth: cur.TriggerDepth, } resp, err := inv.Invoke(ctx, req) if err != nil { diff --git a/core/pkg/serverless/hostfunctions/pubsub.go b/core/pkg/serverless/hostfunctions/pubsub.go index 4ab82cf..b23e19a 100644 --- a/core/pkg/serverless/hostfunctions/pubsub.go +++ b/core/pkg/serverless/hostfunctions/pubsub.go @@ -11,6 +11,16 @@ import ( ) // PubSubPublish publishes a message to a topic. +// +// After a successful libp2p publish, also synchronously fires local +// wildcard triggers via the dispatcher (bugboard #93). Concrete-topic +// triggers are skipped here — they get delivered by the libp2p +// subscribe-loopback path and would double-invoke if fired locally too. +// See dispatcher.DispatchLocalPublish for the filter rationale. +// +// When no triggerDispatcher is wired (tests, or a future deployment +// without serverless triggers), this is just the plain libp2p publish +// — behavior unchanged from before #93. func (h *HostFunctions) PubSubPublish(ctx context.Context, topic string, data []byte) error { if h.pubsub == nil { return &serverless.HostFunctionError{Function: "pubsub_publish", Cause: fmt.Errorf("pubsub not available")} @@ -21,9 +31,40 @@ func (h *HostFunctions) PubSubPublish(ctx context.Context, topic string, data [] return &serverless.HostFunctionError{Function: "pubsub_publish", Cause: err} } + h.dispatchLocalWildcards(ctx, topic, data) return nil } +// dispatchLocalWildcards calls the trigger dispatcher's wildcard-only +// local-dispatch path for the given topic. Safe no-op when the +// dispatcher isn't wired or there's no namespace in the invocation +// context (e.g. when called from a non-serverless caller). +// +// Same-gateway publishes cover ~100% of namespace-gateway architecture +// (single gateway process per namespace). Cross-gateway wildcard +// delivery is plan-6/plan-10 territory and out of scope. +func (h *HostFunctions) dispatchLocalWildcards(ctx context.Context, topic string, data []byte) { + h.triggerDispatcherLock.RLock() + d := h.triggerDispatcher + h.triggerDispatcherLock.RUnlock() + if d == nil { + return + } + cur := h.currentInvocationContext(ctx) + if cur == nil || cur.Namespace == "" { + // No namespace = nothing to look up; skip silently. + return + } + // Pass the CURRENT invocation's depth so DispatchLocalPublish's + // own check (`if depth >= maxTriggerDepth { return }`) eventually + // trips after enough self-recursive WASM publishes (function on + // "events:*" publishes "events:done" → loops). Without this thread, + // every WASM publish reset depth to 0 and the local-recursion loop + // was only bounded by dispatchTimeout + the rate limiter — much + // weaker (security audit MEDIUM, bugboard #93 follow-up). + d.DispatchLocalPublish(ctx, cur.Namespace, topic, data, cur.TriggerDepth) +} + // pubSubBatchEntry mirrors the JSON shape accepted by PubSubPublishBatch. type pubSubBatchEntry struct { Topic string `json:"topic"` @@ -79,9 +120,48 @@ func (h *HostFunctions) PubSubPublishBatch(ctx context.Context, msgsJSON []byte) if err := h.pubsub.PublishBatch(ctx, msgs, pubsub.PublishBatchOptions{}); err != nil { return &serverless.HostFunctionError{Function: "pubsub_publish_batch", Cause: err} } + + // Fire local wildcard triggers per UNIQUE topic — same rationale as + // PubSubPublish above. Done after the batch succeeds so we don't + // fire phantom dispatches for messages that didn't actually publish. + for _, e := range dedupBatchByTopic(msgs) { + h.dispatchLocalWildcards(ctx, e.Topic, e.Data) + } return nil } +// dedupBatchByTopic collapses a batch to one entry per unique topic, +// keeping insertion order and most-recent-wins semantics on the data +// payload. +// +// A batch with 100 entries on the same topic should only run ONE +// trigger lookup + dispatch — otherwise the same wildcard-matching +// handler gets invoked 100 times for what is semantically one logical +// wakeup. Most-recent-wins matches what a downstream subscriber would +// see after libp2p coalescing in practice. Bounds the fan-out from +// len(batch) × N wildcard handlers to distinct-topics × N (security +// audit MEDIUM, bug #93 follow-up). +// +// Pure function so the batch-dedup logic pins exactly. +func dedupBatchByTopic(msgs []pubsub.TopicMessage) []pubsub.TopicMessage { + if len(msgs) <= 1 { + return msgs + } + lastByTopic := make(map[string][]byte, len(msgs)) + order := make([]string, 0, len(msgs)) + for _, m := range msgs { + if _, seen := lastByTopic[m.Topic]; !seen { + order = append(order, m.Topic) + } + lastByTopic[m.Topic] = m.Data + } + out := make([]pubsub.TopicMessage, 0, len(order)) + for _, topic := range order { + out = append(out, pubsub.TopicMessage{Topic: topic, Data: lastByTopic[topic]}) + } + return out +} + // WSSend sends data to a specific WebSocket client. func (h *HostFunctions) WSSend(ctx context.Context, clientID string, data []byte) error { if h.wsManager == nil { diff --git a/core/pkg/serverless/hostfunctions/pubsub_local_dispatch_test.go b/core/pkg/serverless/hostfunctions/pubsub_local_dispatch_test.go new file mode 100644 index 0000000..35865ba --- /dev/null +++ b/core/pkg/serverless/hostfunctions/pubsub_local_dispatch_test.go @@ -0,0 +1,191 @@ +package hostfunctions + +import ( + "bytes" + "context" + "testing" + + "github.com/DeBrosOfficial/network/pkg/pubsub" + "github.com/DeBrosOfficial/network/pkg/serverless" +) + +// Bugboard #93 — PubSubPublish must fire local wildcard triggers, but +// only when a triggerDispatcher is wired. Back-compat tests pin the +// nil-dispatcher path. + +func TestDispatchLocalWildcards_noDispatcherIsNoOp(t *testing.T) { + // Back-compat: when no triggerDispatcher is wired (tests, future + // deployments without serverless triggers, gateway constructed + // before the setter fires), publishing must NOT crash. The wildcard + // dispatch path silently no-ops. + h := &HostFunctions{} + h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns"}) + // Should not panic. No dispatcher, so we don't reach the dispatcher's + // DispatchLocalPublish (which would itself panic on nil store). + h.dispatchLocalWildcards(context.Background(), "presence:user-1", []byte("data")) +} + +func TestDispatchLocalWildcards_noNamespaceIsNoOp(t *testing.T) { + // If we somehow have a dispatcher but no namespace in invCtx + // (HTTP-handler-style callers, tests with bare HostFunctions), we + // must skip silently rather than panic on cur == nil. Same shape as + // the rest of the host-fn family that early-returns when invCtx is + // missing. + // + // We don't actually wire a dispatcher here because the absence of + // namespace short-circuits before the dispatcher is touched — that's + // the assertion: no namespace, no dispatch attempt, no panic. + h := &HostFunctions{} + // no SetInvocationContext call — invCtx is nil + h.dispatchLocalWildcards(context.Background(), "anything", []byte("x")) +} + +// dedupBatchByTopic — pin the batch fan-out amplification fix +// (security audit MEDIUM, bug #93 follow-up). + +func TestDedupBatchByTopic_collapsesRepeatedTopicsMostRecentWins(t *testing.T) { + // A burst of 5 publishes on the same topic in one batch — without + // dedup, each wildcard handler would be invoked 5 times for what is + // semantically one wakeup. Must collapse to one entry, with the + // LAST payload winning (matches downstream-subscriber semantics + // after libp2p coalescing). + in := []pubsub.TopicMessage{ + {Topic: "presence:user-1", Data: []byte("v1")}, + {Topic: "presence:user-1", Data: []byte("v2")}, + {Topic: "presence:user-1", Data: []byte("v3")}, + {Topic: "presence:user-1", Data: []byte("v4")}, + {Topic: "presence:user-1", Data: []byte("v5")}, + } + out := dedupBatchByTopic(in) + if len(out) != 1 { + t.Fatalf("FAN-OUT REGRESSION: 5 same-topic msgs must collapse to 1; got %d", len(out)) + } + if !bytes.Equal(out[0].Data, []byte("v5")) { + t.Errorf("most-recent-wins violated: want v5, got %q", out[0].Data) + } +} + +func TestDedupBatchByTopic_preservesInsertionOrder(t *testing.T) { + // Distinct topics must dispatch in the order they were first seen + // in the batch. Otherwise downstream observers (and trigger logs) + // see reordered events vs the actual publish sequence. + in := []pubsub.TopicMessage{ + {Topic: "b", Data: []byte("b1")}, + {Topic: "a", Data: []byte("a1")}, + {Topic: "c", Data: []byte("c1")}, + {Topic: "a", Data: []byte("a2")}, // late update to "a" — wins, but doesn't reorder + } + out := dedupBatchByTopic(in) + if len(out) != 3 { + t.Fatalf("want 3 distinct topics, got %d", len(out)) + } + wantOrder := []string{"b", "a", "c"} + for i, w := range wantOrder { + if out[i].Topic != w { + t.Errorf("position %d: want topic=%q, got %q", i, w, out[i].Topic) + } + } + // "a" should still carry the latest payload + if !bytes.Equal(out[1].Data, []byte("a2")) { + t.Errorf("most-recent-wins for 'a': want a2, got %q", out[1].Data) + } +} + +func TestDedupBatchByTopic_singleEntryShortCircuit(t *testing.T) { + // Trivial path: len(msgs) <= 1 returns the input as-is (no map + // allocation). Edge case: empty input must yield empty output. + if got := dedupBatchByTopic(nil); len(got) != 0 { + t.Errorf("nil input: want empty output, got %d", len(got)) + } + one := []pubsub.TopicMessage{{Topic: "t", Data: []byte("d")}} + got := dedupBatchByTopic(one) + if len(got) != 1 || got[0].Topic != "t" || !bytes.Equal(got[0].Data, []byte("d")) { + t.Errorf("single-entry passthrough broken: got %+v", got) + } +} + +func TestDedupBatchByTopic_distinctTopicsPassthroughIntact(t *testing.T) { + // When no duplicates exist, dedup must NOT lose any entries. + // Caught by a buggy `seen` check or off-by-one in the order slice. + in := []pubsub.TopicMessage{ + {Topic: "t1", Data: []byte("1")}, + {Topic: "t2", Data: []byte("2")}, + {Topic: "t3", Data: []byte("3")}, + } + out := dedupBatchByTopic(in) + if len(out) != 3 { + t.Fatalf("want 3 distinct topics through; got %d", len(out)) + } +} + +// TriggerDepth threading — pin the security-audit MEDIUM fix (C6). + +func TestFunctionInvoke_propagatesTriggerDepth(t *testing.T) { + // Audit C7 fix: function_invoke MUST carry cur.TriggerDepth into + // the inner InvokeRequest, otherwise depth resets to 0 on every + // hop and a wildcard-triggered chain like: + // A (depth=N) → function_invoke(B) → B publishes → re-triggers A + // would never hit the depth bound. Pin this by spying on the + // InvokeRequest the host fn would construct. + h := &HostFunctions{} + h.SetInvocationContext(&serverless.InvocationContext{ + Namespace: "ns", + TriggerDepth: 4, // one hop from maxTriggerDepth + }) + var captured *serverless.InvokeRequest + h.SetInvoker(&capturingInvoker{onInvoke: func(req *serverless.InvokeRequest) { + captured = req + }}) + + _, _ = h.FunctionInvoke(context.Background(), "inner-fn", []byte("payload")) + if captured == nil { + t.Fatal("invoker was not called; can't verify TriggerDepth propagation") + } + if captured.TriggerDepth != 4 { + t.Errorf("AUDIT C7 REGRESSION: function_invoke did not carry TriggerDepth "+ + "from invCtx; want 4 (one below maxTriggerDepth), got %d. "+ + "Without propagation, wildcard-triggered chains escape the depth bound "+ + "by hopping through function_invoke.", captured.TriggerDepth) + } +} + +// capturingInvoker records the InvokeRequest it's called with so tests +// can assert what HostFunctions passed to the invoker without needing a +// real engine/registry. +type capturingInvoker struct { + onInvoke func(*serverless.InvokeRequest) +} + +func (c *capturingInvoker) Invoke(_ context.Context, req *serverless.InvokeRequest) (*serverless.InvokeResponse, error) { + if c.onInvoke != nil { + c.onInvoke(req) + } + return &serverless.InvokeResponse{Output: []byte{}}, nil +} + +func TestDispatchLocalWildcards_readsInvCtxTriggerDepth(t *testing.T) { + // The fix for the recursion-amplification (audit C6): when a + // wildcard-triggered handler publishes again, dispatchLocalWildcards + // MUST pass the CURRENT invocation's TriggerDepth to the dispatcher + // (not hardcoded 0). Otherwise depth resets on every WASM publish + // and the local-recursion loop is unbounded except by dispatchTimeout. + // + // We can't easily wire a real dispatcher here (concrete type, no + // interface), but we can pin the invocation-context shape so a + // future refactor that drops the TriggerDepth field gets caught. + h := &HostFunctions{} + h.SetInvocationContext(&serverless.InvocationContext{ + Namespace: "ns", + TriggerDepth: 3, + }) + cur := h.currentInvocationContext(context.Background()) + if cur == nil { + t.Fatal("invocation context unexpectedly nil") + } + if cur.TriggerDepth != 3 { + t.Errorf("TriggerDepth was not propagated through invCtx: want 3, got %d "+ + "(if this fails, the audit C6 fix's data path is broken)", cur.TriggerDepth) + } + // And the no-dispatcher no-op stays nil-safe regardless of depth. + h.dispatchLocalWildcards(context.Background(), "x:y", []byte("d")) +} diff --git a/core/pkg/serverless/hostfunctions/types.go b/core/pkg/serverless/hostfunctions/types.go index db51fab..975041d 100644 --- a/core/pkg/serverless/hostfunctions/types.go +++ b/core/pkg/serverless/hostfunctions/types.go @@ -10,6 +10,7 @@ import ( "github.com/DeBrosOfficial/network/pkg/push" "github.com/DeBrosOfficial/network/pkg/rqlite" "github.com/DeBrosOfficial/network/pkg/serverless" + "github.com/DeBrosOfficial/network/pkg/serverless/triggers" "github.com/DeBrosOfficial/network/pkg/serverless/wsbridge" olriclib "github.com/olric-data/olric" "go.uber.org/zap" @@ -53,6 +54,16 @@ type HostFunctions struct { invoker serverless.FunctionInvoker invokerLock sync.RWMutex + // triggerDispatcher is set after construction (via SetTriggerDispatcher). + // When non-nil, PubSubPublish / PubSubPublishBatch synchronously fire + // wildcard triggers on the local gateway so functions like + // presence-aggregator with trigger "presence:*" actually receive + // WASM-published events (bugboard #93, plan-3 wildcard delivery gap). + // nil leaves the existing behavior (libp2p-only delivery; wildcards + // silently dropped on WASM publishes). + triggerDispatcher *triggers.PubSubDispatcher + triggerDispatcherLock sync.RWMutex + // Current invocation context (set per-execution) invCtx *serverless.InvocationContext invCtxLock sync.RWMutex diff --git a/core/pkg/serverless/invoke.go b/core/pkg/serverless/invoke.go index b05f495..84d0b82 100644 --- a/core/pkg/serverless/invoke.go +++ b/core/pkg/serverless/invoke.go @@ -59,6 +59,12 @@ type InvokeRequest struct { // engine can populate InvocationContext.CallerJWTSubject — fixes the // bug-#215 case where API-key precedence buries the JWT identity. CallerJWTSubject string `json:"caller_jwt_subject,omitempty"` + // TriggerDepth is the recursion-depth bucket at which this invocation + // runs. 0 means top-level (HTTP/WS/cron source); each trigger-driven + // invocation increments it. The dispatcher's host-fn wildcard path + // (bugboard #93) uses this to bound local recursion that otherwise + // would not round-trip through libp2p network latency. + TriggerDepth int `json:"trigger_depth,omitempty"` } // InvokeResponse contains the result of a function invocation. @@ -137,6 +143,7 @@ func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeRespon EnvVars: envVars, CallerClaims: req.CallerClaims, CallerJWTSubject: req.CallerJWTSubject, + TriggerDepth: req.TriggerDepth, } // Execute with retry logic diff --git a/core/pkg/serverless/triggers/dispatcher.go b/core/pkg/serverless/triggers/dispatcher.go index 9d3d634..0126f97 100644 --- a/core/pkg/serverless/triggers/dispatcher.go +++ b/core/pkg/serverless/triggers/dispatcher.go @@ -372,10 +372,113 @@ func (d *PubSubDispatcher) Dispatch(ctx context.Context, namespace, topic string if marshalErr != nil { continue } - go d.invokeFunction(match, eventJSON) + go d.invokeFunction(match, eventJSON, depth+1) } } +// DispatchLocalPublish is the wildcard-trigger half-fix for the +// "WASM publish never reaches wildcard handlers" gap documented at +// PubSubDispatcher's type doc (bugboard #93, plan-3 follow-up). +// +// The libp2p Refresh path subscribes only to CONCRETE trigger patterns +// (wildcards skipped — libp2p has no wildcard subscribe). For a function +// that calls `oh.PubSubPublish("presence:user-1", ...)`: +// +// - Concrete trigger "presence:user-1" → fires via libp2p subscribe +// loopback. Works today; we MUST NOT fire it locally too (would +// double-invoke the function). +// - Wildcard trigger "presence:*" → never subscribed via libp2p → +// never fires today. This method closes that gap by dispatching the +// wildcard-matching triggers synchronously on the publishing gateway. +// +// Concrete-match rows are filtered out (TopicPattern == resolved Topic) +// so we never double-invoke. Wildcard-match rows are dispatched via the +// same Dispatch path as the libp2p subscribe handler — same depth +// tracking, same aggregator buffering, same goroutine spawn. +// +// Same-gateway publishes cover ~100% of namespace-gateway architecture +// (one gateway per namespace per node, publishers and triggers run in +// the same process). Cross-gateway wildcard delivery is a separate, +// larger problem (plan 6 / plan 10) and out of scope here. +func (d *PubSubDispatcher) DispatchLocalPublish(ctx context.Context, namespace, topic string, data []byte, depth int) { + if depth >= maxTriggerDepth { + d.logger.Warn("PubSub trigger depth limit reached, skipping local-publish dispatch", + zap.String("namespace", namespace), + zap.String("topic", topic), + zap.Int("depth", depth), + ) + return + } + + matches, err := d.getMatches(ctx, namespace, topic) + if err != nil { + d.logger.Error("DispatchLocalPublish: failed to look up triggers", + zap.String("namespace", namespace), + zap.String("topic", topic), + zap.Error(err), + ) + return + } + + wildcardMatches := filterWildcardMatches(matches, topic) + if len(wildcardMatches) == 0 { + return + } + + event := PubSubEvent{ + Topic: topic, + Data: json.RawMessage(data), + Namespace: namespace, + TriggerDepth: depth + 1, + Timestamp: time.Now().Unix(), + } + + d.logger.Debug("DispatchLocalPublish: firing wildcard-only triggers", + zap.String("namespace", namespace), + zap.String("topic", topic), + zap.Int("wildcard_matches", len(wildcardMatches)), + zap.Int("depth", depth), + ) + + var ( + eventJSON []byte + marshalErr error + ) + for _, match := range wildcardMatches { + if match.AggregationWindowMs > 0 { + d.bufferEvent(match, event) + continue + } + if eventJSON == nil && marshalErr == nil { + eventJSON, marshalErr = json.Marshal(event) + if marshalErr != nil { + d.logger.Error("DispatchLocalPublish: failed to marshal PubSub event", zap.Error(marshalErr)) + continue + } + } + if marshalErr != nil { + continue + } + go d.invokeFunction(match, eventJSON, depth+1) + } +} + +// filterWildcardMatches drops matches whose TopicPattern equals the +// resolved Topic — those are concrete-pattern matches that already get +// delivered via the libp2p subscribe-loopback path (see Refresh). +// Returns matches whose pattern is a true glob (e.g. "presence:*" +// matching "presence:user-1"). Pure function so the bug-93 fix-logic +// pins exactly. +func filterWildcardMatches(matches []TriggerMatch, resolvedTopic string) []TriggerMatch { + out := matches[:0] + for _, m := range matches { + if m.TopicPattern != resolvedTopic { + out = append(out, m) + } + } + return out +} + // bufferEvent routes an event through the aggregator. The flush callback // invokes the function with the batched payload. func (d *PubSubDispatcher) bufferEvent(match TriggerMatch, event PubSubEvent) { @@ -398,6 +501,7 @@ func (d *PubSubDispatcher) bufferEvent(match TriggerMatch, event PubSubEvent) { FunctionName: match.FunctionName, Input: payload, TriggerType: serverless.TriggerTypePubSub, + TriggerDepth: event.TriggerDepth, // event was built with depth+1 by the caller } if _, err := d.invoker.Invoke(ctx, req); err != nil { d.logger.Warn("Aggregated PubSub invocation failed", @@ -428,7 +532,12 @@ func (d *PubSubDispatcher) getMatches(ctx context.Context, namespace, topic stri // invokeFunction invokes a single function for a trigger match. -func (d *PubSubDispatcher) invokeFunction(match TriggerMatch, eventJSON []byte) { +// +// `handlerDepth` is the depth at which the INVOKED handler runs (the +// source depth + 1). Carried via InvokeRequest.TriggerDepth so the +// handler's invocation context sees it; the wildcard-publish host-fn +// path uses it to bound local recursion (bugboard #93 follow-up). +func (d *PubSubDispatcher) invokeFunction(match TriggerMatch, eventJSON []byte, handlerDepth int) { ctx, cancel := context.WithTimeout(context.Background(), dispatchTimeout) defer cancel() @@ -437,6 +546,7 @@ func (d *PubSubDispatcher) invokeFunction(match TriggerMatch, eventJSON []byte) FunctionName: match.FunctionName, Input: eventJSON, TriggerType: serverless.TriggerTypePubSub, + TriggerDepth: handlerDepth, } resp, err := d.invoker.Invoke(ctx, req) diff --git a/core/pkg/serverless/triggers/dispatcher_local_publish_test.go b/core/pkg/serverless/triggers/dispatcher_local_publish_test.go new file mode 100644 index 0000000..c515f5a --- /dev/null +++ b/core/pkg/serverless/triggers/dispatcher_local_publish_test.go @@ -0,0 +1,120 @@ +package triggers + +import ( + "context" + "testing" + + "go.uber.org/zap" +) + +// Bugboard #93 — wildcard delivery on WASM publishes. +// +// Plan-3 shipped wildcard storage + lookup but skipped the libp2p +// subscribe half (libp2p has no wildcard subscribe). For a function +// publishing to "presence:user-1" via oh.PubSubPublish: +// - concrete trigger "presence:user-1" works (libp2p subscribe-loopback) +// - wildcard trigger "presence:*" silently never fires +// +// DispatchLocalPublish closes the gap by firing wildcard-only triggers +// synchronously on the publishing gateway. Concrete triggers must NOT +// fire from this path or they'd double-invoke (once locally, once via +// libp2p loopback). +// +// These tests pin the filter logic exactly so a future refactor of +// DispatchLocalPublish can't silently re-introduce the wildcard-silent +// or the double-fire behavior. + +func TestFilterWildcardMatches_dropsExactPatternMatches(t *testing.T) { + // The exact-match concrete trigger MUST be dropped — otherwise we + // double-invoke (once here, once via libp2p loopback). + matches := []TriggerMatch{ + {TriggerID: "t1", FunctionName: "fn-exact", Topic: "presence:user-1", TopicPattern: "presence:user-1"}, + } + out := filterWildcardMatches(matches, "presence:user-1") + if len(out) != 0 { + t.Errorf("BUG #93 REGRESSION: concrete-pattern match must be filtered out "+ + "(it gets delivered via libp2p loopback); got %d match(es) that would double-fire", len(out)) + } +} + +func TestFilterWildcardMatches_keepsWildcardMatch(t *testing.T) { + // The actual #93 fix: wildcard pattern "presence:*" matching the + // resolved topic "presence:user-1" MUST be kept — that's the + // silent-handler bug we're closing. + matches := []TriggerMatch{ + {TriggerID: "t1", FunctionName: "presence-aggregator", Topic: "presence:user-1", TopicPattern: "presence:*"}, + } + out := filterWildcardMatches(matches, "presence:user-1") + if len(out) != 1 { + t.Fatalf("BUG #93 REGRESSION: wildcard match for 'presence:*' against "+ + "'presence:user-1' must be kept (the silent-handler bug); got %d", len(out)) + } + if out[0].TopicPattern != "presence:*" { + t.Errorf("wrong match kept: want pattern=presence:*, got %q", out[0].TopicPattern) + } +} + +func TestFilterWildcardMatches_mixedKeepsOnlyWildcards(t *testing.T) { + // The realistic case: a topic has both a concrete subscriber AND a + // wildcard subscriber. Concrete is filtered (libp2p handles it), + // wildcard is kept (we handle it). + matches := []TriggerMatch{ + {TriggerID: "t1", FunctionName: "fn-concrete", Topic: "messages:new", TopicPattern: "messages:new"}, + {TriggerID: "t2", FunctionName: "fn-wild", Topic: "messages:new", TopicPattern: "messages:*"}, + {TriggerID: "t3", FunctionName: "fn-deep", Topic: "messages:new", TopicPattern: "**"}, + } + out := filterWildcardMatches(matches, "messages:new") + if len(out) != 2 { + t.Fatalf("want 2 wildcard matches (got %d): mixed test must keep wildcards, drop concrete", len(out)) + } + for _, m := range out { + if m.TopicPattern == "messages:new" { + t.Errorf("filter let the concrete pattern through: %+v", m) + } + } +} + +func TestFilterWildcardMatches_emptyInputEmptyOutput(t *testing.T) { + // Trivial edge case — no triggers configured at all. Must not panic, + // must return empty (caller short-circuits before doing more work). + out := filterWildcardMatches(nil, "any:topic") + if len(out) != 0 { + t.Errorf("nil input must yield empty output; got %d matches", len(out)) + } +} + +func TestDispatchLocalPublish_depthLimitNoPanic(t *testing.T) { + // Mirrors TestDispatcher_DepthLimit for the local-publish path. + // At max depth, must return silently — no store call, no panic. + // Without this guard, a function that publishes from a wildcard- + // triggered handler could infinitely recurse via DispatchLocalPublish. + logger, _ := zap.NewDevelopment() + store := NewPubSubTriggerStore(nil, logger) // store would panic if called (nil db) + d := NewPubSubDispatcher(store, nil, nil, nil, logger) + + d.DispatchLocalPublish(context.Background(), "ns", "topic", []byte("data"), maxTriggerDepth) + d.DispatchLocalPublish(context.Background(), "ns", "topic", []byte("data"), maxTriggerDepth+1) + // If we reach here without panicking, the depth guard worked — the + // store's nil-db Query would otherwise crash on the second line. +} + +func TestDispatchLocalPublish_belowMaxDepthAttemptsStoreLookup(t *testing.T) { + // Symmetric guard test: at depth=maxTriggerDepth-1 the dispatcher + // MUST attempt the store lookup (depth check passes). The nil + // rqlite.Client makes the lookup itself fail/panic — we recover so + // the test asserts ONLY the behavioral split at the boundary + // (depth guard either trips early-return or doesn't). Without this + // test, the depth guard could regress to `>` (off-by-one) and the + // recursion bound would shift silently. + logger, _ := zap.NewDevelopment() + store := NewPubSubTriggerStore(nil, logger) + d := NewPubSubDispatcher(store, nil, nil, nil, logger) + + defer func() { + // Whether the nil-db lookup panics or returns an error, the + // dispatcher's logger.Error path swallows it. Either way we + // reached PAST the depth guard, which is the point. + _ = recover() + }() + d.DispatchLocalPublish(context.Background(), "ns", "topic", []byte("data"), maxTriggerDepth-1) +} diff --git a/core/pkg/serverless/triggers/pubsub_store.go b/core/pkg/serverless/triggers/pubsub_store.go index d11b41d..fd57b45 100644 --- a/core/pkg/serverless/triggers/pubsub_store.go +++ b/core/pkg/serverless/triggers/pubsub_store.go @@ -29,6 +29,13 @@ type TriggerMatch struct { FunctionName string Namespace string Topic string + // TopicPattern is the trigger's stored pattern (may be a glob). + // Carried alongside the resolved Topic so callers like + // PubSubDispatcher.DispatchLocalPublish can distinguish wildcard + // matches from concrete-topic matches WITHOUT a second lookup + // (used to avoid double-firing concrete triggers that already get + // delivered via the libp2p subscribe-loopback path). + TopicPattern string AggregationWindowMs int AggregationMaxBatchSize int } @@ -281,6 +288,7 @@ func (s *PubSubTriggerStore) GetByTopicAndNamespace(ctx context.Context, topic, FunctionName: row.FunctionName, Namespace: row.Namespace, Topic: topic, // resolved topic, not the pattern + TopicPattern: row.TopicPattern, AggregationWindowMs: row.AggregationWindowMs, AggregationMaxBatchSize: row.AggregationMaxBatchSize, }) diff --git a/core/pkg/serverless/types.go b/core/pkg/serverless/types.go index cff9088..fc9c71e 100644 --- a/core/pkg/serverless/types.go +++ b/core/pkg/serverless/types.go @@ -290,6 +290,16 @@ type InvocationContext struct { // caller also presents an API key. Empty string when the request was // not JWT-authenticated. Bug #215. CallerJWTSubject string `json:"caller_jwt_subject,omitempty"` + + // TriggerDepth is the recursion-depth bucket for trigger-driven + // invocations. 0 means a top-level (HTTP/WS/cron) invocation; each + // PubSub-trigger-driven invocation increments it. The host-fn + // wildcard-publish path (`oh.PubSubPublish` → DispatchLocalPublish) + // reads this and refuses to fire wildcards once depth ≥ + // maxTriggerDepth, preventing local-only recursion loops a function + // could create by publishing topics that match its own wildcard + // trigger (bugboard #93 follow-up). + TriggerDepth int `json:"trigger_depth,omitempty"` } // InvocationResult represents the result of a function invocation.