mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-06-16 21:54:14 +00:00
feat(gateway): enable local wildcard triggers for pubsub
- wire PubSubDispatcher to host functions to support local wildcard triggers for WASM-published topics - implement batch deduplication by topic to prevent redundant trigger invocations and bound fan-out - propagate trigger depth through function invocations to maintain recursion limits during local dispatch
This commit is contained in:
parent
0463f37c0d
commit
b2d35bbde1
@ -570,6 +570,13 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe
|
||||
logger.Logger,
|
||||
)
|
||||
|
||||
// Wire the dispatcher into hostFuncs so PubSubPublish /
|
||||
// PubSubPublishBatch fire local wildcard triggers immediately on
|
||||
// publish — closes the bugboard #93 gap where WASM publishes to e.g.
|
||||
// "presence:user-1" never reached wildcard handlers like "presence:*"
|
||||
// because libp2p has no wildcard subscribe.
|
||||
hostFuncs.SetTriggerDispatcher(deps.PubSubDispatcher)
|
||||
|
||||
// Cron trigger store + scheduler. The scheduler polls
|
||||
// function_cron_triggers and invokes due rows via the same
|
||||
// ServerlessInvoker used for PubSub triggers; the ↓ Start call wires
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless"
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless/triggers"
|
||||
)
|
||||
|
||||
// SetInvocationContext sets the current invocation context on the
|
||||
@ -46,6 +47,21 @@ func (h *HostFunctions) SetInvoker(inv serverless.FunctionInvoker) {
|
||||
h.invoker = inv
|
||||
}
|
||||
|
||||
// SetTriggerDispatcher wires the PubSubDispatcher used by PubSubPublish /
|
||||
// PubSubPublishBatch to synchronously fire wildcard triggers for
|
||||
// WASM-published topics on this gateway (bugboard #93). nil disables
|
||||
// local wildcard dispatch — only libp2p subscribe delivery applies, and
|
||||
// wildcard triggers will be silent for WASM publishes.
|
||||
//
|
||||
// Wired after both HostFunctions and PubSubDispatcher exist; the
|
||||
// dispatcher depends on the engine (which depends on HostFunctions), so
|
||||
// the cycle is broken via this setter — same pattern as SetInvoker.
|
||||
func (h *HostFunctions) SetTriggerDispatcher(d *triggers.PubSubDispatcher) {
|
||||
h.triggerDispatcherLock.Lock()
|
||||
defer h.triggerDispatcherLock.Unlock()
|
||||
h.triggerDispatcher = d
|
||||
}
|
||||
|
||||
// FunctionInvoke synchronously runs another function in the same namespace
|
||||
// and returns its output bytes. Caller wallet, JWT claims, and WS client
|
||||
// ID are inherited from the current invocation so the inner function sees
|
||||
@ -84,6 +100,13 @@ func (h *HostFunctions) FunctionInvoke(ctx context.Context, name string, payload
|
||||
WSClientID: cur.WSClientID,
|
||||
CallerClaims: cur.CallerClaims,
|
||||
CallerJWTSubject: cur.CallerJWTSubject,
|
||||
// Propagate trigger depth so a wildcard-triggered handler that
|
||||
// calls function_invoke(B) — and B then publishes a topic that
|
||||
// matches A's own wildcard — still hits the maxTriggerDepth
|
||||
// guard. Without this, depth resets to 0 on every
|
||||
// function_invoke hop and the recursion bound reopens.
|
||||
// Bugboard #93 follow-up (audit C7).
|
||||
TriggerDepth: cur.TriggerDepth,
|
||||
}
|
||||
resp, err := inv.Invoke(ctx, req)
|
||||
if err != nil {
|
||||
|
||||
@ -11,6 +11,16 @@ import (
|
||||
)
|
||||
|
||||
// PubSubPublish publishes a message to a topic.
|
||||
//
|
||||
// After a successful libp2p publish, also synchronously fires local
|
||||
// wildcard triggers via the dispatcher (bugboard #93). Concrete-topic
|
||||
// triggers are skipped here — they get delivered by the libp2p
|
||||
// subscribe-loopback path and would double-invoke if fired locally too.
|
||||
// See dispatcher.DispatchLocalPublish for the filter rationale.
|
||||
//
|
||||
// When no triggerDispatcher is wired (tests, or a future deployment
|
||||
// without serverless triggers), this is just the plain libp2p publish
|
||||
// — behavior unchanged from before #93.
|
||||
func (h *HostFunctions) PubSubPublish(ctx context.Context, topic string, data []byte) error {
|
||||
if h.pubsub == nil {
|
||||
return &serverless.HostFunctionError{Function: "pubsub_publish", Cause: fmt.Errorf("pubsub not available")}
|
||||
@ -21,9 +31,40 @@ func (h *HostFunctions) PubSubPublish(ctx context.Context, topic string, data []
|
||||
return &serverless.HostFunctionError{Function: "pubsub_publish", Cause: err}
|
||||
}
|
||||
|
||||
h.dispatchLocalWildcards(ctx, topic, data)
|
||||
return nil
|
||||
}
|
||||
|
||||
// dispatchLocalWildcards calls the trigger dispatcher's wildcard-only
|
||||
// local-dispatch path for the given topic. Safe no-op when the
|
||||
// dispatcher isn't wired or there's no namespace in the invocation
|
||||
// context (e.g. when called from a non-serverless caller).
|
||||
//
|
||||
// Same-gateway publishes cover ~100% of namespace-gateway architecture
|
||||
// (single gateway process per namespace). Cross-gateway wildcard
|
||||
// delivery is plan-6/plan-10 territory and out of scope.
|
||||
func (h *HostFunctions) dispatchLocalWildcards(ctx context.Context, topic string, data []byte) {
|
||||
h.triggerDispatcherLock.RLock()
|
||||
d := h.triggerDispatcher
|
||||
h.triggerDispatcherLock.RUnlock()
|
||||
if d == nil {
|
||||
return
|
||||
}
|
||||
cur := h.currentInvocationContext(ctx)
|
||||
if cur == nil || cur.Namespace == "" {
|
||||
// No namespace = nothing to look up; skip silently.
|
||||
return
|
||||
}
|
||||
// Pass the CURRENT invocation's depth so DispatchLocalPublish's
|
||||
// own check (`if depth >= maxTriggerDepth { return }`) eventually
|
||||
// trips after enough self-recursive WASM publishes (function on
|
||||
// "events:*" publishes "events:done" → loops). Without this thread,
|
||||
// every WASM publish reset depth to 0 and the local-recursion loop
|
||||
// was only bounded by dispatchTimeout + the rate limiter — much
|
||||
// weaker (security audit MEDIUM, bugboard #93 follow-up).
|
||||
d.DispatchLocalPublish(ctx, cur.Namespace, topic, data, cur.TriggerDepth)
|
||||
}
|
||||
|
||||
// pubSubBatchEntry mirrors the JSON shape accepted by PubSubPublishBatch.
|
||||
type pubSubBatchEntry struct {
|
||||
Topic string `json:"topic"`
|
||||
@ -79,9 +120,48 @@ func (h *HostFunctions) PubSubPublishBatch(ctx context.Context, msgsJSON []byte)
|
||||
if err := h.pubsub.PublishBatch(ctx, msgs, pubsub.PublishBatchOptions{}); err != nil {
|
||||
return &serverless.HostFunctionError{Function: "pubsub_publish_batch", Cause: err}
|
||||
}
|
||||
|
||||
// Fire local wildcard triggers per UNIQUE topic — same rationale as
|
||||
// PubSubPublish above. Done after the batch succeeds so we don't
|
||||
// fire phantom dispatches for messages that didn't actually publish.
|
||||
for _, e := range dedupBatchByTopic(msgs) {
|
||||
h.dispatchLocalWildcards(ctx, e.Topic, e.Data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// dedupBatchByTopic collapses a batch to one entry per unique topic,
|
||||
// keeping insertion order and most-recent-wins semantics on the data
|
||||
// payload.
|
||||
//
|
||||
// A batch with 100 entries on the same topic should only run ONE
|
||||
// trigger lookup + dispatch — otherwise the same wildcard-matching
|
||||
// handler gets invoked 100 times for what is semantically one logical
|
||||
// wakeup. Most-recent-wins matches what a downstream subscriber would
|
||||
// see after libp2p coalescing in practice. Bounds the fan-out from
|
||||
// len(batch) × N wildcard handlers to distinct-topics × N (security
|
||||
// audit MEDIUM, bug #93 follow-up).
|
||||
//
|
||||
// Pure function so the batch-dedup logic pins exactly.
|
||||
func dedupBatchByTopic(msgs []pubsub.TopicMessage) []pubsub.TopicMessage {
|
||||
if len(msgs) <= 1 {
|
||||
return msgs
|
||||
}
|
||||
lastByTopic := make(map[string][]byte, len(msgs))
|
||||
order := make([]string, 0, len(msgs))
|
||||
for _, m := range msgs {
|
||||
if _, seen := lastByTopic[m.Topic]; !seen {
|
||||
order = append(order, m.Topic)
|
||||
}
|
||||
lastByTopic[m.Topic] = m.Data
|
||||
}
|
||||
out := make([]pubsub.TopicMessage, 0, len(order))
|
||||
for _, topic := range order {
|
||||
out = append(out, pubsub.TopicMessage{Topic: topic, Data: lastByTopic[topic]})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// WSSend sends data to a specific WebSocket client.
|
||||
func (h *HostFunctions) WSSend(ctx context.Context, clientID string, data []byte) error {
|
||||
if h.wsManager == nil {
|
||||
|
||||
191
core/pkg/serverless/hostfunctions/pubsub_local_dispatch_test.go
Normal file
191
core/pkg/serverless/hostfunctions/pubsub_local_dispatch_test.go
Normal file
@ -0,0 +1,191 @@
|
||||
package hostfunctions
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/pubsub"
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless"
|
||||
)
|
||||
|
||||
// Bugboard #93 — PubSubPublish must fire local wildcard triggers, but
|
||||
// only when a triggerDispatcher is wired. Back-compat tests pin the
|
||||
// nil-dispatcher path.
|
||||
|
||||
func TestDispatchLocalWildcards_noDispatcherIsNoOp(t *testing.T) {
|
||||
// Back-compat: when no triggerDispatcher is wired (tests, future
|
||||
// deployments without serverless triggers, gateway constructed
|
||||
// before the setter fires), publishing must NOT crash. The wildcard
|
||||
// dispatch path silently no-ops.
|
||||
h := &HostFunctions{}
|
||||
h.SetInvocationContext(&serverless.InvocationContext{Namespace: "ns"})
|
||||
// Should not panic. No dispatcher, so we don't reach the dispatcher's
|
||||
// DispatchLocalPublish (which would itself panic on nil store).
|
||||
h.dispatchLocalWildcards(context.Background(), "presence:user-1", []byte("data"))
|
||||
}
|
||||
|
||||
func TestDispatchLocalWildcards_noNamespaceIsNoOp(t *testing.T) {
|
||||
// If we somehow have a dispatcher but no namespace in invCtx
|
||||
// (HTTP-handler-style callers, tests with bare HostFunctions), we
|
||||
// must skip silently rather than panic on cur == nil. Same shape as
|
||||
// the rest of the host-fn family that early-returns when invCtx is
|
||||
// missing.
|
||||
//
|
||||
// We don't actually wire a dispatcher here because the absence of
|
||||
// namespace short-circuits before the dispatcher is touched — that's
|
||||
// the assertion: no namespace, no dispatch attempt, no panic.
|
||||
h := &HostFunctions{}
|
||||
// no SetInvocationContext call — invCtx is nil
|
||||
h.dispatchLocalWildcards(context.Background(), "anything", []byte("x"))
|
||||
}
|
||||
|
||||
// dedupBatchByTopic — pin the batch fan-out amplification fix
|
||||
// (security audit MEDIUM, bug #93 follow-up).
|
||||
|
||||
func TestDedupBatchByTopic_collapsesRepeatedTopicsMostRecentWins(t *testing.T) {
|
||||
// A burst of 5 publishes on the same topic in one batch — without
|
||||
// dedup, each wildcard handler would be invoked 5 times for what is
|
||||
// semantically one wakeup. Must collapse to one entry, with the
|
||||
// LAST payload winning (matches downstream-subscriber semantics
|
||||
// after libp2p coalescing).
|
||||
in := []pubsub.TopicMessage{
|
||||
{Topic: "presence:user-1", Data: []byte("v1")},
|
||||
{Topic: "presence:user-1", Data: []byte("v2")},
|
||||
{Topic: "presence:user-1", Data: []byte("v3")},
|
||||
{Topic: "presence:user-1", Data: []byte("v4")},
|
||||
{Topic: "presence:user-1", Data: []byte("v5")},
|
||||
}
|
||||
out := dedupBatchByTopic(in)
|
||||
if len(out) != 1 {
|
||||
t.Fatalf("FAN-OUT REGRESSION: 5 same-topic msgs must collapse to 1; got %d", len(out))
|
||||
}
|
||||
if !bytes.Equal(out[0].Data, []byte("v5")) {
|
||||
t.Errorf("most-recent-wins violated: want v5, got %q", out[0].Data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDedupBatchByTopic_preservesInsertionOrder(t *testing.T) {
|
||||
// Distinct topics must dispatch in the order they were first seen
|
||||
// in the batch. Otherwise downstream observers (and trigger logs)
|
||||
// see reordered events vs the actual publish sequence.
|
||||
in := []pubsub.TopicMessage{
|
||||
{Topic: "b", Data: []byte("b1")},
|
||||
{Topic: "a", Data: []byte("a1")},
|
||||
{Topic: "c", Data: []byte("c1")},
|
||||
{Topic: "a", Data: []byte("a2")}, // late update to "a" — wins, but doesn't reorder
|
||||
}
|
||||
out := dedupBatchByTopic(in)
|
||||
if len(out) != 3 {
|
||||
t.Fatalf("want 3 distinct topics, got %d", len(out))
|
||||
}
|
||||
wantOrder := []string{"b", "a", "c"}
|
||||
for i, w := range wantOrder {
|
||||
if out[i].Topic != w {
|
||||
t.Errorf("position %d: want topic=%q, got %q", i, w, out[i].Topic)
|
||||
}
|
||||
}
|
||||
// "a" should still carry the latest payload
|
||||
if !bytes.Equal(out[1].Data, []byte("a2")) {
|
||||
t.Errorf("most-recent-wins for 'a': want a2, got %q", out[1].Data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDedupBatchByTopic_singleEntryShortCircuit(t *testing.T) {
|
||||
// Trivial path: len(msgs) <= 1 returns the input as-is (no map
|
||||
// allocation). Edge case: empty input must yield empty output.
|
||||
if got := dedupBatchByTopic(nil); len(got) != 0 {
|
||||
t.Errorf("nil input: want empty output, got %d", len(got))
|
||||
}
|
||||
one := []pubsub.TopicMessage{{Topic: "t", Data: []byte("d")}}
|
||||
got := dedupBatchByTopic(one)
|
||||
if len(got) != 1 || got[0].Topic != "t" || !bytes.Equal(got[0].Data, []byte("d")) {
|
||||
t.Errorf("single-entry passthrough broken: got %+v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDedupBatchByTopic_distinctTopicsPassthroughIntact(t *testing.T) {
|
||||
// When no duplicates exist, dedup must NOT lose any entries.
|
||||
// Caught by a buggy `seen` check or off-by-one in the order slice.
|
||||
in := []pubsub.TopicMessage{
|
||||
{Topic: "t1", Data: []byte("1")},
|
||||
{Topic: "t2", Data: []byte("2")},
|
||||
{Topic: "t3", Data: []byte("3")},
|
||||
}
|
||||
out := dedupBatchByTopic(in)
|
||||
if len(out) != 3 {
|
||||
t.Fatalf("want 3 distinct topics through; got %d", len(out))
|
||||
}
|
||||
}
|
||||
|
||||
// TriggerDepth threading — pin the security-audit MEDIUM fix (C6).
|
||||
|
||||
func TestFunctionInvoke_propagatesTriggerDepth(t *testing.T) {
|
||||
// Audit C7 fix: function_invoke MUST carry cur.TriggerDepth into
|
||||
// the inner InvokeRequest, otherwise depth resets to 0 on every
|
||||
// hop and a wildcard-triggered chain like:
|
||||
// A (depth=N) → function_invoke(B) → B publishes → re-triggers A
|
||||
// would never hit the depth bound. Pin this by spying on the
|
||||
// InvokeRequest the host fn would construct.
|
||||
h := &HostFunctions{}
|
||||
h.SetInvocationContext(&serverless.InvocationContext{
|
||||
Namespace: "ns",
|
||||
TriggerDepth: 4, // one hop from maxTriggerDepth
|
||||
})
|
||||
var captured *serverless.InvokeRequest
|
||||
h.SetInvoker(&capturingInvoker{onInvoke: func(req *serverless.InvokeRequest) {
|
||||
captured = req
|
||||
}})
|
||||
|
||||
_, _ = h.FunctionInvoke(context.Background(), "inner-fn", []byte("payload"))
|
||||
if captured == nil {
|
||||
t.Fatal("invoker was not called; can't verify TriggerDepth propagation")
|
||||
}
|
||||
if captured.TriggerDepth != 4 {
|
||||
t.Errorf("AUDIT C7 REGRESSION: function_invoke did not carry TriggerDepth "+
|
||||
"from invCtx; want 4 (one below maxTriggerDepth), got %d. "+
|
||||
"Without propagation, wildcard-triggered chains escape the depth bound "+
|
||||
"by hopping through function_invoke.", captured.TriggerDepth)
|
||||
}
|
||||
}
|
||||
|
||||
// capturingInvoker records the InvokeRequest it's called with so tests
|
||||
// can assert what HostFunctions passed to the invoker without needing a
|
||||
// real engine/registry.
|
||||
type capturingInvoker struct {
|
||||
onInvoke func(*serverless.InvokeRequest)
|
||||
}
|
||||
|
||||
func (c *capturingInvoker) Invoke(_ context.Context, req *serverless.InvokeRequest) (*serverless.InvokeResponse, error) {
|
||||
if c.onInvoke != nil {
|
||||
c.onInvoke(req)
|
||||
}
|
||||
return &serverless.InvokeResponse{Output: []byte{}}, nil
|
||||
}
|
||||
|
||||
func TestDispatchLocalWildcards_readsInvCtxTriggerDepth(t *testing.T) {
|
||||
// The fix for the recursion-amplification (audit C6): when a
|
||||
// wildcard-triggered handler publishes again, dispatchLocalWildcards
|
||||
// MUST pass the CURRENT invocation's TriggerDepth to the dispatcher
|
||||
// (not hardcoded 0). Otherwise depth resets on every WASM publish
|
||||
// and the local-recursion loop is unbounded except by dispatchTimeout.
|
||||
//
|
||||
// We can't easily wire a real dispatcher here (concrete type, no
|
||||
// interface), but we can pin the invocation-context shape so a
|
||||
// future refactor that drops the TriggerDepth field gets caught.
|
||||
h := &HostFunctions{}
|
||||
h.SetInvocationContext(&serverless.InvocationContext{
|
||||
Namespace: "ns",
|
||||
TriggerDepth: 3,
|
||||
})
|
||||
cur := h.currentInvocationContext(context.Background())
|
||||
if cur == nil {
|
||||
t.Fatal("invocation context unexpectedly nil")
|
||||
}
|
||||
if cur.TriggerDepth != 3 {
|
||||
t.Errorf("TriggerDepth was not propagated through invCtx: want 3, got %d "+
|
||||
"(if this fails, the audit C6 fix's data path is broken)", cur.TriggerDepth)
|
||||
}
|
||||
// And the no-dispatcher no-op stays nil-safe regardless of depth.
|
||||
h.dispatchLocalWildcards(context.Background(), "x:y", []byte("d"))
|
||||
}
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"github.com/DeBrosOfficial/network/pkg/push"
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless"
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless/triggers"
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless/wsbridge"
|
||||
olriclib "github.com/olric-data/olric"
|
||||
"go.uber.org/zap"
|
||||
@ -53,6 +54,16 @@ type HostFunctions struct {
|
||||
invoker serverless.FunctionInvoker
|
||||
invokerLock sync.RWMutex
|
||||
|
||||
// triggerDispatcher is set after construction (via SetTriggerDispatcher).
|
||||
// When non-nil, PubSubPublish / PubSubPublishBatch synchronously fire
|
||||
// wildcard triggers on the local gateway so functions like
|
||||
// presence-aggregator with trigger "presence:*" actually receive
|
||||
// WASM-published events (bugboard #93, plan-3 wildcard delivery gap).
|
||||
// nil leaves the existing behavior (libp2p-only delivery; wildcards
|
||||
// silently dropped on WASM publishes).
|
||||
triggerDispatcher *triggers.PubSubDispatcher
|
||||
triggerDispatcherLock sync.RWMutex
|
||||
|
||||
// Current invocation context (set per-execution)
|
||||
invCtx *serverless.InvocationContext
|
||||
invCtxLock sync.RWMutex
|
||||
|
||||
@ -59,6 +59,12 @@ type InvokeRequest struct {
|
||||
// engine can populate InvocationContext.CallerJWTSubject — fixes the
|
||||
// bug-#215 case where API-key precedence buries the JWT identity.
|
||||
CallerJWTSubject string `json:"caller_jwt_subject,omitempty"`
|
||||
// TriggerDepth is the recursion-depth bucket at which this invocation
|
||||
// runs. 0 means top-level (HTTP/WS/cron source); each trigger-driven
|
||||
// invocation increments it. The dispatcher's host-fn wildcard path
|
||||
// (bugboard #93) uses this to bound local recursion that otherwise
|
||||
// would not round-trip through libp2p network latency.
|
||||
TriggerDepth int `json:"trigger_depth,omitempty"`
|
||||
}
|
||||
|
||||
// InvokeResponse contains the result of a function invocation.
|
||||
@ -137,6 +143,7 @@ func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeRespon
|
||||
EnvVars: envVars,
|
||||
CallerClaims: req.CallerClaims,
|
||||
CallerJWTSubject: req.CallerJWTSubject,
|
||||
TriggerDepth: req.TriggerDepth,
|
||||
}
|
||||
|
||||
// Execute with retry logic
|
||||
|
||||
@ -372,10 +372,113 @@ func (d *PubSubDispatcher) Dispatch(ctx context.Context, namespace, topic string
|
||||
if marshalErr != nil {
|
||||
continue
|
||||
}
|
||||
go d.invokeFunction(match, eventJSON)
|
||||
go d.invokeFunction(match, eventJSON, depth+1)
|
||||
}
|
||||
}
|
||||
|
||||
// DispatchLocalPublish is the wildcard-trigger half-fix for the
|
||||
// "WASM publish never reaches wildcard handlers" gap documented at
|
||||
// PubSubDispatcher's type doc (bugboard #93, plan-3 follow-up).
|
||||
//
|
||||
// The libp2p Refresh path subscribes only to CONCRETE trigger patterns
|
||||
// (wildcards skipped — libp2p has no wildcard subscribe). For a function
|
||||
// that calls `oh.PubSubPublish("presence:user-1", ...)`:
|
||||
//
|
||||
// - Concrete trigger "presence:user-1" → fires via libp2p subscribe
|
||||
// loopback. Works today; we MUST NOT fire it locally too (would
|
||||
// double-invoke the function).
|
||||
// - Wildcard trigger "presence:*" → never subscribed via libp2p →
|
||||
// never fires today. This method closes that gap by dispatching the
|
||||
// wildcard-matching triggers synchronously on the publishing gateway.
|
||||
//
|
||||
// Concrete-match rows are filtered out (TopicPattern == resolved Topic)
|
||||
// so we never double-invoke. Wildcard-match rows are dispatched via the
|
||||
// same Dispatch path as the libp2p subscribe handler — same depth
|
||||
// tracking, same aggregator buffering, same goroutine spawn.
|
||||
//
|
||||
// Same-gateway publishes cover ~100% of namespace-gateway architecture
|
||||
// (one gateway per namespace per node, publishers and triggers run in
|
||||
// the same process). Cross-gateway wildcard delivery is a separate,
|
||||
// larger problem (plan 6 / plan 10) and out of scope here.
|
||||
func (d *PubSubDispatcher) DispatchLocalPublish(ctx context.Context, namespace, topic string, data []byte, depth int) {
|
||||
if depth >= maxTriggerDepth {
|
||||
d.logger.Warn("PubSub trigger depth limit reached, skipping local-publish dispatch",
|
||||
zap.String("namespace", namespace),
|
||||
zap.String("topic", topic),
|
||||
zap.Int("depth", depth),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
matches, err := d.getMatches(ctx, namespace, topic)
|
||||
if err != nil {
|
||||
d.logger.Error("DispatchLocalPublish: failed to look up triggers",
|
||||
zap.String("namespace", namespace),
|
||||
zap.String("topic", topic),
|
||||
zap.Error(err),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
wildcardMatches := filterWildcardMatches(matches, topic)
|
||||
if len(wildcardMatches) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
event := PubSubEvent{
|
||||
Topic: topic,
|
||||
Data: json.RawMessage(data),
|
||||
Namespace: namespace,
|
||||
TriggerDepth: depth + 1,
|
||||
Timestamp: time.Now().Unix(),
|
||||
}
|
||||
|
||||
d.logger.Debug("DispatchLocalPublish: firing wildcard-only triggers",
|
||||
zap.String("namespace", namespace),
|
||||
zap.String("topic", topic),
|
||||
zap.Int("wildcard_matches", len(wildcardMatches)),
|
||||
zap.Int("depth", depth),
|
||||
)
|
||||
|
||||
var (
|
||||
eventJSON []byte
|
||||
marshalErr error
|
||||
)
|
||||
for _, match := range wildcardMatches {
|
||||
if match.AggregationWindowMs > 0 {
|
||||
d.bufferEvent(match, event)
|
||||
continue
|
||||
}
|
||||
if eventJSON == nil && marshalErr == nil {
|
||||
eventJSON, marshalErr = json.Marshal(event)
|
||||
if marshalErr != nil {
|
||||
d.logger.Error("DispatchLocalPublish: failed to marshal PubSub event", zap.Error(marshalErr))
|
||||
continue
|
||||
}
|
||||
}
|
||||
if marshalErr != nil {
|
||||
continue
|
||||
}
|
||||
go d.invokeFunction(match, eventJSON, depth+1)
|
||||
}
|
||||
}
|
||||
|
||||
// filterWildcardMatches drops matches whose TopicPattern equals the
|
||||
// resolved Topic — those are concrete-pattern matches that already get
|
||||
// delivered via the libp2p subscribe-loopback path (see Refresh).
|
||||
// Returns matches whose pattern is a true glob (e.g. "presence:*"
|
||||
// matching "presence:user-1"). Pure function so the bug-93 fix-logic
|
||||
// pins exactly.
|
||||
func filterWildcardMatches(matches []TriggerMatch, resolvedTopic string) []TriggerMatch {
|
||||
out := matches[:0]
|
||||
for _, m := range matches {
|
||||
if m.TopicPattern != resolvedTopic {
|
||||
out = append(out, m)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// bufferEvent routes an event through the aggregator. The flush callback
|
||||
// invokes the function with the batched payload.
|
||||
func (d *PubSubDispatcher) bufferEvent(match TriggerMatch, event PubSubEvent) {
|
||||
@ -398,6 +501,7 @@ func (d *PubSubDispatcher) bufferEvent(match TriggerMatch, event PubSubEvent) {
|
||||
FunctionName: match.FunctionName,
|
||||
Input: payload,
|
||||
TriggerType: serverless.TriggerTypePubSub,
|
||||
TriggerDepth: event.TriggerDepth, // event was built with depth+1 by the caller
|
||||
}
|
||||
if _, err := d.invoker.Invoke(ctx, req); err != nil {
|
||||
d.logger.Warn("Aggregated PubSub invocation failed",
|
||||
@ -428,7 +532,12 @@ func (d *PubSubDispatcher) getMatches(ctx context.Context, namespace, topic stri
|
||||
|
||||
|
||||
// invokeFunction invokes a single function for a trigger match.
|
||||
func (d *PubSubDispatcher) invokeFunction(match TriggerMatch, eventJSON []byte) {
|
||||
//
|
||||
// `handlerDepth` is the depth at which the INVOKED handler runs (the
|
||||
// source depth + 1). Carried via InvokeRequest.TriggerDepth so the
|
||||
// handler's invocation context sees it; the wildcard-publish host-fn
|
||||
// path uses it to bound local recursion (bugboard #93 follow-up).
|
||||
func (d *PubSubDispatcher) invokeFunction(match TriggerMatch, eventJSON []byte, handlerDepth int) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), dispatchTimeout)
|
||||
defer cancel()
|
||||
|
||||
@ -437,6 +546,7 @@ func (d *PubSubDispatcher) invokeFunction(match TriggerMatch, eventJSON []byte)
|
||||
FunctionName: match.FunctionName,
|
||||
Input: eventJSON,
|
||||
TriggerType: serverless.TriggerTypePubSub,
|
||||
TriggerDepth: handlerDepth,
|
||||
}
|
||||
|
||||
resp, err := d.invoker.Invoke(ctx, req)
|
||||
|
||||
120
core/pkg/serverless/triggers/dispatcher_local_publish_test.go
Normal file
120
core/pkg/serverless/triggers/dispatcher_local_publish_test.go
Normal file
@ -0,0 +1,120 @@
|
||||
package triggers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Bugboard #93 — wildcard delivery on WASM publishes.
|
||||
//
|
||||
// Plan-3 shipped wildcard storage + lookup but skipped the libp2p
|
||||
// subscribe half (libp2p has no wildcard subscribe). For a function
|
||||
// publishing to "presence:user-1" via oh.PubSubPublish:
|
||||
// - concrete trigger "presence:user-1" works (libp2p subscribe-loopback)
|
||||
// - wildcard trigger "presence:*" silently never fires
|
||||
//
|
||||
// DispatchLocalPublish closes the gap by firing wildcard-only triggers
|
||||
// synchronously on the publishing gateway. Concrete triggers must NOT
|
||||
// fire from this path or they'd double-invoke (once locally, once via
|
||||
// libp2p loopback).
|
||||
//
|
||||
// These tests pin the filter logic exactly so a future refactor of
|
||||
// DispatchLocalPublish can't silently re-introduce the wildcard-silent
|
||||
// or the double-fire behavior.
|
||||
|
||||
func TestFilterWildcardMatches_dropsExactPatternMatches(t *testing.T) {
|
||||
// The exact-match concrete trigger MUST be dropped — otherwise we
|
||||
// double-invoke (once here, once via libp2p loopback).
|
||||
matches := []TriggerMatch{
|
||||
{TriggerID: "t1", FunctionName: "fn-exact", Topic: "presence:user-1", TopicPattern: "presence:user-1"},
|
||||
}
|
||||
out := filterWildcardMatches(matches, "presence:user-1")
|
||||
if len(out) != 0 {
|
||||
t.Errorf("BUG #93 REGRESSION: concrete-pattern match must be filtered out "+
|
||||
"(it gets delivered via libp2p loopback); got %d match(es) that would double-fire", len(out))
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterWildcardMatches_keepsWildcardMatch(t *testing.T) {
|
||||
// The actual #93 fix: wildcard pattern "presence:*" matching the
|
||||
// resolved topic "presence:user-1" MUST be kept — that's the
|
||||
// silent-handler bug we're closing.
|
||||
matches := []TriggerMatch{
|
||||
{TriggerID: "t1", FunctionName: "presence-aggregator", Topic: "presence:user-1", TopicPattern: "presence:*"},
|
||||
}
|
||||
out := filterWildcardMatches(matches, "presence:user-1")
|
||||
if len(out) != 1 {
|
||||
t.Fatalf("BUG #93 REGRESSION: wildcard match for 'presence:*' against "+
|
||||
"'presence:user-1' must be kept (the silent-handler bug); got %d", len(out))
|
||||
}
|
||||
if out[0].TopicPattern != "presence:*" {
|
||||
t.Errorf("wrong match kept: want pattern=presence:*, got %q", out[0].TopicPattern)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterWildcardMatches_mixedKeepsOnlyWildcards(t *testing.T) {
|
||||
// The realistic case: a topic has both a concrete subscriber AND a
|
||||
// wildcard subscriber. Concrete is filtered (libp2p handles it),
|
||||
// wildcard is kept (we handle it).
|
||||
matches := []TriggerMatch{
|
||||
{TriggerID: "t1", FunctionName: "fn-concrete", Topic: "messages:new", TopicPattern: "messages:new"},
|
||||
{TriggerID: "t2", FunctionName: "fn-wild", Topic: "messages:new", TopicPattern: "messages:*"},
|
||||
{TriggerID: "t3", FunctionName: "fn-deep", Topic: "messages:new", TopicPattern: "**"},
|
||||
}
|
||||
out := filterWildcardMatches(matches, "messages:new")
|
||||
if len(out) != 2 {
|
||||
t.Fatalf("want 2 wildcard matches (got %d): mixed test must keep wildcards, drop concrete", len(out))
|
||||
}
|
||||
for _, m := range out {
|
||||
if m.TopicPattern == "messages:new" {
|
||||
t.Errorf("filter let the concrete pattern through: %+v", m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterWildcardMatches_emptyInputEmptyOutput(t *testing.T) {
|
||||
// Trivial edge case — no triggers configured at all. Must not panic,
|
||||
// must return empty (caller short-circuits before doing more work).
|
||||
out := filterWildcardMatches(nil, "any:topic")
|
||||
if len(out) != 0 {
|
||||
t.Errorf("nil input must yield empty output; got %d matches", len(out))
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatchLocalPublish_depthLimitNoPanic(t *testing.T) {
|
||||
// Mirrors TestDispatcher_DepthLimit for the local-publish path.
|
||||
// At max depth, must return silently — no store call, no panic.
|
||||
// Without this guard, a function that publishes from a wildcard-
|
||||
// triggered handler could infinitely recurse via DispatchLocalPublish.
|
||||
logger, _ := zap.NewDevelopment()
|
||||
store := NewPubSubTriggerStore(nil, logger) // store would panic if called (nil db)
|
||||
d := NewPubSubDispatcher(store, nil, nil, nil, logger)
|
||||
|
||||
d.DispatchLocalPublish(context.Background(), "ns", "topic", []byte("data"), maxTriggerDepth)
|
||||
d.DispatchLocalPublish(context.Background(), "ns", "topic", []byte("data"), maxTriggerDepth+1)
|
||||
// If we reach here without panicking, the depth guard worked — the
|
||||
// store's nil-db Query would otherwise crash on the second line.
|
||||
}
|
||||
|
||||
func TestDispatchLocalPublish_belowMaxDepthAttemptsStoreLookup(t *testing.T) {
|
||||
// Symmetric guard test: at depth=maxTriggerDepth-1 the dispatcher
|
||||
// MUST attempt the store lookup (depth check passes). The nil
|
||||
// rqlite.Client makes the lookup itself fail/panic — we recover so
|
||||
// the test asserts ONLY the behavioral split at the boundary
|
||||
// (depth guard either trips early-return or doesn't). Without this
|
||||
// test, the depth guard could regress to `>` (off-by-one) and the
|
||||
// recursion bound would shift silently.
|
||||
logger, _ := zap.NewDevelopment()
|
||||
store := NewPubSubTriggerStore(nil, logger)
|
||||
d := NewPubSubDispatcher(store, nil, nil, nil, logger)
|
||||
|
||||
defer func() {
|
||||
// Whether the nil-db lookup panics or returns an error, the
|
||||
// dispatcher's logger.Error path swallows it. Either way we
|
||||
// reached PAST the depth guard, which is the point.
|
||||
_ = recover()
|
||||
}()
|
||||
d.DispatchLocalPublish(context.Background(), "ns", "topic", []byte("data"), maxTriggerDepth-1)
|
||||
}
|
||||
@ -29,6 +29,13 @@ type TriggerMatch struct {
|
||||
FunctionName string
|
||||
Namespace string
|
||||
Topic string
|
||||
// TopicPattern is the trigger's stored pattern (may be a glob).
|
||||
// Carried alongside the resolved Topic so callers like
|
||||
// PubSubDispatcher.DispatchLocalPublish can distinguish wildcard
|
||||
// matches from concrete-topic matches WITHOUT a second lookup
|
||||
// (used to avoid double-firing concrete triggers that already get
|
||||
// delivered via the libp2p subscribe-loopback path).
|
||||
TopicPattern string
|
||||
AggregationWindowMs int
|
||||
AggregationMaxBatchSize int
|
||||
}
|
||||
@ -281,6 +288,7 @@ func (s *PubSubTriggerStore) GetByTopicAndNamespace(ctx context.Context, topic,
|
||||
FunctionName: row.FunctionName,
|
||||
Namespace: row.Namespace,
|
||||
Topic: topic, // resolved topic, not the pattern
|
||||
TopicPattern: row.TopicPattern,
|
||||
AggregationWindowMs: row.AggregationWindowMs,
|
||||
AggregationMaxBatchSize: row.AggregationMaxBatchSize,
|
||||
})
|
||||
|
||||
@ -290,6 +290,16 @@ type InvocationContext struct {
|
||||
// caller also presents an API key. Empty string when the request was
|
||||
// not JWT-authenticated. Bug #215.
|
||||
CallerJWTSubject string `json:"caller_jwt_subject,omitempty"`
|
||||
|
||||
// TriggerDepth is the recursion-depth bucket for trigger-driven
|
||||
// invocations. 0 means a top-level (HTTP/WS/cron) invocation; each
|
||||
// PubSub-trigger-driven invocation increments it. The host-fn
|
||||
// wildcard-publish path (`oh.PubSubPublish` → DispatchLocalPublish)
|
||||
// reads this and refuses to fire wildcards once depth ≥
|
||||
// maxTriggerDepth, preventing local-only recursion loops a function
|
||||
// could create by publishing topics that match its own wildcard
|
||||
// trigger (bugboard #93 follow-up).
|
||||
TriggerDepth int `json:"trigger_depth,omitempty"`
|
||||
}
|
||||
|
||||
// InvocationResult represents the result of a function invocation.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user