diff --git a/core/pkg/pubsub/publish.go b/core/pkg/pubsub/publish.go index 56c7163..e95f44f 100644 --- a/core/pkg/pubsub/publish.go +++ b/core/pkg/pubsub/publish.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" "sync" - "time" "golang.org/x/sync/errgroup" ) @@ -75,30 +74,20 @@ func (m *Manager) Publish(ctx context.Context, topic string, data []byte) error return fmt.Errorf("failed to get topic for publishing: %w", err) } - // Wait briefly for mesh formation if no peers are in the mesh yet - // GossipSub needs time to discover peers and form a mesh - // With FloodPublish enabled, messages will be flooded to all connected peers - // but we still want to give the mesh a chance to form for better delivery - waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Second) - defer waitCancel() - - // Check if we have peers in the mesh, wait up to 2 seconds for mesh formation - meshFormed := false - for i := 0; i < 20 && !meshFormed; i++ { - peers := libp2pTopic.ListPeers() - if len(peers) > 0 { - meshFormed = true - break // Mesh has formed, proceed with publish - } - select { - case <-waitCtx.Done(): - meshFormed = true // Timeout, proceed anyway (FloodPublish will handle it) - case <-time.After(100 * time.Millisecond): - // Continue waiting - } - } - - // Publish message + // Publish immediately — do NOT wait for gossipsub mesh formation. + // + // The router runs with FloodPublish enabled (pkg/node/libp2p.go and + // pkg/client/client.go), so the message is sent directly to every + // connected peer subscribed to the topic without needing a mesh, and a + // same-gateway subscriber receives it via the local loopback regardless. + // + // A previous version polled ListPeers() for up to 2s here "to give the + // mesh a chance to form." On the namespace-gateway topology most + // application topics (per-conversation/wakeup) have no REMOTE mesh peers + // — they're delivered to local WS clients — so the loop timed out the + // full 2s on EVERY publish, making a 3-publish message-create cost ~6s + // server-side (feat-6, the dominant realtime latency). FloodPublish makes + // the wait redundant; removed. if err := libp2pTopic.Publish(ctx, data); err != nil { return fmt.Errorf("failed to publish message: %w", err) } diff --git a/core/pkg/pubsub/publish_batch_test.go b/core/pkg/pubsub/publish_batch_test.go index 4d13349..9f033ec 100644 --- a/core/pkg/pubsub/publish_batch_test.go +++ b/core/pkg/pubsub/publish_batch_test.go @@ -84,10 +84,31 @@ func TestPublishBatch_context_cancel_returns_error(t *testing.T) { } } +// TestPublish_does_not_block_on_empty_mesh is a regression guard for feat-6. +// Publish must NOT wait for gossipsub mesh formation: it previously polled +// ListPeers() for up to 2s, so every publish to a topic with no remote +// subscribers (the common namespace-gateway case, where wakeup topics are +// delivered to LOCAL WS clients) cost the full 2s — a 3-publish message-create +// paid ~6s server-side. FloodPublish delivers without the mesh, so a publish +// against an empty mesh must return promptly. +func TestPublish_does_not_block_on_empty_mesh(t *testing.T) { + mgr, cleanup := createTestManager(t, "test-ns") + defer cleanup() + + start := time.Now() + if err := mgr.Publish(context.Background(), "no-subscribers", []byte("d")); err != nil { + t.Fatalf("Publish failed: %v", err) + } + // Old code: ~2000ms. New code: ~ms. 500ms is a generous ceiling that + // avoids CI flakiness while still catching a re-introduced multi-second + // mesh-wait. + if elapsed := time.Since(start); elapsed > 500*time.Millisecond { + t.Errorf("Publish blocked %v on an empty mesh — the mesh-wait must stay removed (feat-6)", elapsed) + } +} + func TestPublishBatch_concurrency_limit(t *testing.T) { // Verify PublishBatch with low MaxConcurrency completes without deadlocking. - // Each Publish in a no-peer test environment waits up to 2s for mesh formation, - // so we use a small batch size to keep wall time bounded. mgr, cleanup := createTestManager(t, "test-ns") defer cleanup() diff --git a/core/pkg/serverless/engine.go b/core/pkg/serverless/engine.go index 1504be4..58c4529 100644 --- a/core/pkg/serverless/engine.go +++ b/core/pkg/serverless/engine.go @@ -312,6 +312,12 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx // caller that hasn't been migrated yet. execCtx = WithInvocationContext(execCtx, invCtx) + // Fresh per-invocation pubsub publish counter so the pubsub host + // functions can cap how many messages one invocation floods onto the + // shared gossipsub router (no WASM fuel metering exists; the rate limiter + // gates invocation frequency, not per-invocation host-call volume). + execCtx = WithPublishCounter(execCtx) + // Get compiled module (from cache or compile) module, err := e.getOrCompileModule(execCtx, fn.WASMCID) if err != nil { diff --git a/core/pkg/serverless/hostfunctions/database.go b/core/pkg/serverless/hostfunctions/database.go index 4ec4f53..8922f6c 100644 --- a/core/pkg/serverless/hostfunctions/database.go +++ b/core/pkg/serverless/hostfunctions/database.go @@ -376,6 +376,17 @@ func (h *HostFunctions) ExecAndPublish( } } + // exec_and_publish reaches the same shared gossipsub publish path as + // pubsub_publish, so it must charge the same per-invocation publish budget + // (it publishes exactly one wake-up message on commit). Checked before the + // write so an over-budget call has no side effects. + if n := serverless.AddPublishCount(ctx, 1); n > maxPublishesPerInvocation { + return nil, &serverless.HostFunctionError{ + Function: "exec_and_publish", + Cause: fmt.Errorf("publish budget exceeded (max %d per invocation)", maxPublishesPerInvocation), + } + } + batchRes, seq, batchErr := h.db.BatchWithSeq(ctx, ns, req.Ops) out := execAndPublishResult{} if batchRes != nil { diff --git a/core/pkg/serverless/hostfunctions/pubsub.go b/core/pkg/serverless/hostfunctions/pubsub.go index b23e19a..5df0556 100644 --- a/core/pkg/serverless/hostfunctions/pubsub.go +++ b/core/pkg/serverless/hostfunctions/pubsub.go @@ -10,6 +10,16 @@ import ( "github.com/DeBrosOfficial/network/pkg/serverless" ) +// maxPublishesPerInvocation caps how many pubsub messages a single function +// invocation (or persistent WS frame) may publish. This is a safety bound, not +// a normal-path limit: legitimate functions publish a handful (message-create +// does 3). It exists because the WASM runtime has no fuel metering and the +// request rate limiter gates invocation FREQUENCY, not per-invocation host-call +// volume — so without it a buggy/hostile `for { publish() }` could flood the +// shared gossipsub router, amplified to every peer by FloodPublish. 1000 is far +// above any real workload while bounding the blast radius ~1000x. +const maxPublishesPerInvocation = 1000 + // PubSubPublish publishes a message to a topic. // // After a successful libp2p publish, also synchronously fires local @@ -26,6 +36,13 @@ func (h *HostFunctions) PubSubPublish(ctx context.Context, topic string, data [] return &serverless.HostFunctionError{Function: "pubsub_publish", Cause: fmt.Errorf("pubsub not available")} } + if n := serverless.AddPublishCount(ctx, 1); n > maxPublishesPerInvocation { + return &serverless.HostFunctionError{ + Function: "pubsub_publish", + Cause: fmt.Errorf("publish budget exceeded (max %d per invocation)", maxPublishesPerInvocation), + } + } + // The pubsub adapter handles namespacing internally if err := h.pubsub.Publish(ctx, topic, data); err != nil { return &serverless.HostFunctionError{Function: "pubsub_publish", Cause: err} @@ -117,6 +134,13 @@ func (h *HostFunctions) PubSubPublishBatch(ctx context.Context, msgsJSON []byte) msgs = append(msgs, pubsub.TopicMessage{Topic: e.Topic, Data: data}) } + if n := serverless.AddPublishCount(ctx, len(msgs)); n > maxPublishesPerInvocation { + return &serverless.HostFunctionError{ + Function: "pubsub_publish_batch", + Cause: fmt.Errorf("publish budget exceeded (max %d per invocation)", maxPublishesPerInvocation), + } + } + if err := h.pubsub.PublishBatch(ctx, msgs, pubsub.PublishBatchOptions{}); err != nil { return &serverless.HostFunctionError{Function: "pubsub_publish_batch", Cause: err} } diff --git a/core/pkg/serverless/hostfunctions/pubsub_budget_test.go b/core/pkg/serverless/hostfunctions/pubsub_budget_test.go new file mode 100644 index 0000000..f87a95f --- /dev/null +++ b/core/pkg/serverless/hostfunctions/pubsub_budget_test.go @@ -0,0 +1,59 @@ +package hostfunctions + +import ( + "context" + "testing" + + "github.com/DeBrosOfficial/network/pkg/pubsub" + "github.com/DeBrosOfficial/network/pkg/serverless" +) + +// feat-6 follow-up: the per-invocation publish budget bounds gossipsub flooding +// now that the implicit 2s/publish throttle is gone. These verify the cap is +// enforced before the message ever reaches the pubsub layer. A non-nil sentinel +// adapter is used because once the budget is exceeded the publish is rejected +// before h.pubsub.Publish is reached, so the adapter is never dereferenced. + +func TestPubSubPublish_budgetEnforced(t *testing.T) { + h := &HostFunctions{pubsub: &pubsub.ClientAdapter{}} + ctx := serverless.WithPublishCounter(context.Background()) + serverless.AddPublishCount(ctx, maxPublishesPerInvocation) // exhaust to the cap + + if err := h.PubSubPublish(ctx, "t", []byte("d")); err == nil { + t.Fatal("expected publish-budget-exceeded error once the per-invocation cap is reached") + } +} + +func TestPubSubPublishBatch_budgetEnforced(t *testing.T) { + h := &HostFunctions{pubsub: &pubsub.ClientAdapter{}} + ctx := serverless.WithPublishCounter(context.Background()) + serverless.AddPublishCount(ctx, maxPublishesPerInvocation) + + in := []byte(`[{"topic":"t","data_base64":""}]`) + if err := h.PubSubPublishBatch(ctx, in); err == nil { + t.Fatal("expected publish-budget-exceeded error for the batch once over the cap") + } +} + +func TestExecAndPublish_budgetEnforced(t *testing.T) { + // exec_and_publish reaches the same shared gossipsub path, so it must also + // be bounded. db is non-nil but BatchWithSeq is never reached once the + // budget check rejects (it runs before the write). + fake := &fakeBatchClient{} + h := &HostFunctions{pubsub: &pubsub.ClientAdapter{}, db: fake} + ctx := serverless.WithInvocationContext( + serverless.WithPublishCounter(context.Background()), + &serverless.InvocationContext{Namespace: "ns-test"}, + ) + serverless.AddPublishCount(ctx, maxPublishesPerInvocation) + + in := []byte(`{"ops":[{"kind":"exec","sql":"INSERT INTO t VALUES (1)"}]}`) + if _, err := h.ExecAndPublish(ctx, in, "topic", []byte("data")); err == nil { + t.Fatal("expected publish-budget-exceeded error from exec_and_publish once over the cap") + } + // The budget check runs before the write — an over-budget call must have + // no side effects (no BatchWithSeq, hence no commit + no publish). + if fake.seqCalls != 0 { + t.Errorf("over-budget exec_and_publish must not write; got %d BatchWithSeq call(s)", fake.seqCalls) + } +} diff --git a/core/pkg/serverless/invocation_context.go b/core/pkg/serverless/invocation_context.go index 7ad623b..1318b86 100644 --- a/core/pkg/serverless/invocation_context.go +++ b/core/pkg/serverless/invocation_context.go @@ -1,6 +1,9 @@ package serverless -import "context" +import ( + "context" + "sync/atomic" +) // invCtxKey is the unexported context-value key used to attach an // InvocationContext to a Go context. The empty struct is the standard @@ -63,3 +66,37 @@ func InvocationContextFromCtx(ctx context.Context) *InvocationContext { v, _ := ctx.Value(invCtxKey{}).(*InvocationContext) return v } + +// publishCounterKey is the unexported context-value key for the per-invocation +// pubsub publish counter. +type publishCounterKey struct{} + +// publishCounter tracks how many pubsub messages a single invocation has +// published, so the host layer can cap intra-invocation publish volume. It +// rides the invocation's context (same per-call propagation model as +// InvocationContext) so concurrent invocations each get their own counter. +type publishCounter struct{ n atomic.Int64 } + +// WithPublishCounter returns a derived ctx carrying a FRESH per-invocation +// publish counter. Engine.Execute (stateless) and the persistent WS frame +// handler attach this so the pubsub host functions can bound how many messages +// one invocation publishes — the WASM runtime has no fuel metering and the +// rate limiter only gates invocation FREQUENCY, not per-invocation host-call +// volume, so without this a single admitted invocation could flood the shared +// gossipsub router (amplified to every peer by FloodPublish). +func WithPublishCounter(ctx context.Context) context.Context { + return context.WithValue(ctx, publishCounterKey{}, &publishCounter{}) +} + +// AddPublishCount adds n to the invocation's publish counter and returns the +// new running total. Returns -1 when the ctx carries no counter (an untracked +// path) so callers can skip enforcement rather than reject. +func AddPublishCount(ctx context.Context, n int) int64 { + if ctx == nil || n <= 0 { + return -1 + } + if pc, ok := ctx.Value(publishCounterKey{}).(*publishCounter); ok { + return pc.n.Add(int64(n)) + } + return -1 +} diff --git a/core/pkg/serverless/persistent/instance.go b/core/pkg/serverless/persistent/instance.go index 74f5005..4fcb6fe 100644 --- a/core/pkg/serverless/persistent/instance.go +++ b/core/pkg/serverless/persistent/instance.go @@ -183,6 +183,10 @@ func (i *Instance) withInvCtx(ctx context.Context) context.Context { if cur != nil { ctx = serverless.WithInvocationContext(ctx, cur) } + // Fresh per-frame pubsub publish counter so the pubsub host functions can + // bound how many messages one frame floods onto the shared gossipsub + // router (scoped per export call, like the rest of withInvCtx). + ctx = serverless.WithPublishCounter(ctx) // Attach a fresh per-call LogBuffer so oh.LogInfo / oh.LogError from // inside this ws_open / ws_frame / ws_close call write to a // scoped slice instead of the HostFunctions singleton (bugboard diff --git a/core/pkg/serverless/publish_counter_test.go b/core/pkg/serverless/publish_counter_test.go new file mode 100644 index 0000000..1fedae4 --- /dev/null +++ b/core/pkg/serverless/publish_counter_test.go @@ -0,0 +1,53 @@ +package serverless + +import ( + "context" + "testing" +) + +// feat-6 follow-up: removing the 2s publish wait removed the only implicit +// throttle on intra-invocation publish volume, so a per-invocation publish +// counter bounds it. These pin the counter's tracked/untracked behavior and +// the per-scope freshness that keeps a nested function_invoke from inheriting +// its caller's count. + +func TestAddPublishCount_untrackedReturnsNegative(t *testing.T) { + if got := AddPublishCount(context.Background(), 1); got != -1 { + t.Errorf("untracked ctx must return -1 (no enforcement); got %d", got) + } + if got := AddPublishCount(nil, 1); got != -1 { + t.Errorf("nil ctx must return -1; got %d", got) + } +} + +func TestAddPublishCount_tracksAndAccumulates(t *testing.T) { + ctx := WithPublishCounter(context.Background()) + if got := AddPublishCount(ctx, 1); got != 1 { + t.Errorf("first publish: got %d, want 1", got) + } + if got := AddPublishCount(ctx, 4); got != 5 { + t.Errorf("after +4: got %d, want 5", got) + } + // n<=0 is a no-op (returns -1) and must not change the running total. + if got := AddPublishCount(ctx, 0); got != -1 { + t.Errorf("n=0 must return -1 (no-op); got %d", got) + } + if got := AddPublishCount(ctx, 1); got != 6 { + t.Errorf("total must be unaffected by the n=0 call; got %d, want 6", got) + } +} + +func TestWithPublishCounter_freshPerScope(t *testing.T) { + parent := WithPublishCounter(context.Background()) + AddPublishCount(parent, 10) + + // A nested invocation attaches its own fresh counter and must start at 0. + child := WithPublishCounter(parent) + if got := AddPublishCount(child, 1); got != 1 { + t.Errorf("nested counter must start fresh (independent of parent); got %d", got) + } + // Parent total is unaffected by the child. + if got := AddPublishCount(parent, 1); got != 11 { + t.Errorf("parent total must be independent of child; got %d, want 11", got) + } +}