From ca4ccbfcd40c71c06a1ba768106e6c140463c33e Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Mon, 1 Jun 2026 10:12:07 +0300 Subject: [PATCH] feat(gateway): decouple turn credentials and sfu route registration - split webrtc route gating into `webrtcServeTURNCredentials` and `webrtcServeSFURoutes` to allow non-SFU gateways to mint TURN credentials - update `chooseRestoreWebRTC` to correctly resolve configurations for nodes without local SFU ports - add unit tests to verify independent route registration logic (bugboard #25) --- core/pkg/gateway/gateway.go | 37 ++++- core/pkg/gateway/routes.go | 14 +- core/pkg/gateway/webrtc_route_gate_test.go | 40 ++++++ core/pkg/namespace/cluster_manager.go | 91 ++++++++----- core/pkg/namespace/restore_webrtc_test.go | 126 ++++++++++-------- .../triggers/dispatch_dedup_test.go | 57 ++++++++ core/pkg/serverless/triggers/dispatcher.go | 85 ++++++++++++ 7 files changed, 353 insertions(+), 97 deletions(-) create mode 100644 core/pkg/serverless/triggers/dispatch_dedup_test.go diff --git a/core/pkg/gateway/gateway.go b/core/pkg/gateway/gateway.go index 2d9b644..1b880c8 100644 --- a/core/pkg/gateway/gateway.go +++ b/core/pkg/gateway/gateway.go @@ -142,6 +142,14 @@ type Gateway struct { // WebRTC signaling and TURN credentials webrtcHandlers *webrtchandlers.WebRTCHandlers + // webrtcServeTURNCredentials gates the /v1/webrtc/turn/credentials + // route; webrtcServeSFURoutes gates /v1/webrtc/signal + /rooms. + // Decoupled (bugboard #25): TURN credentials only need the namespace + // TURN secret (the actual TURN servers are remote), so a gateway node + // that doesn't run a local SFU can still mint credentials. SFU + // signaling/rooms require a local SFU port to proxy to. + webrtcServeTURNCredentials bool + webrtcServeSFURoutes bool // WireGuard peer exchange wireguardHandler *wireguardhandlers.Handler @@ -414,9 +422,15 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { gw.pushHandlers.SetCredentialsManager(deps.PushCredentialsManager) } - // WebRTC route registration. See shouldRegisterWebRTCRoutes for the - // gate's full rationale (bugboard #411). - if shouldRegisterWebRTCRoutes(cfg) { + // WebRTC route registration. Construct the handler when EITHER a + // local SFU is configured (for signal/rooms) OR a TURN secret is set + // (for credentials) — the two are decoupled (bugboard #25). A gateway + // node that isn't an SFU node but has the namespace TURN secret can + // still serve /v1/webrtc/turn/credentials (the TURN servers are + // remote; credentials are just an HMAC of the shared secret). + gw.webrtcServeSFURoutes = shouldRegisterWebRTCRoutes(cfg) + gw.webrtcServeTURNCredentials = shouldServeTURNCredentials(cfg) + if gw.webrtcServeSFURoutes || gw.webrtcServeTURNCredentials { gw.webrtcHandlers = webrtchandlers.NewWebRTCHandlers( logger, gw.localWireGuardIP, @@ -428,6 +442,8 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { logger.ComponentInfo(logging.ComponentGeneral, "WebRTC handlers initialized", zap.Int("sfu_port", cfg.SFUPort), zap.Bool("turn_secret_set", cfg.TURNSecret != ""), + zap.Bool("serve_turn_credentials", gw.webrtcServeTURNCredentials), + zap.Bool("serve_sfu_routes", gw.webrtcServeSFURoutes), zap.Bool("legacy_webrtc_enabled_flag", cfg.WebRTCEnabled)) } @@ -775,6 +791,21 @@ func shouldRegisterWebRTCRoutes(cfg *Config) bool { return cfg.SFUPort > 0 } +// shouldServeTURNCredentials gates ONLY the /v1/webrtc/turn/credentials +// route, decoupled from the SFU gate above (bugboard #25). +// +// TURN credentials are a namespace-wide HMAC of the shared TURN secret; +// the actual TURN servers are remote (the namespace's TURN nodes), so a +// gateway node that runs NO local SFU can still mint valid credentials. +// Tying credentials to SFUPort>0 (the old single gate) meant non-SFU +// gateways 404'd on credentials even though they had the secret — that's +// the bug-25 symptom node 57 hit (~1/3 of requests routed to a non-SFU +// gateway). SFU signaling/rooms remain gated on SFUPort>0 because they +// proxy to a local SFU. +func shouldServeTURNCredentials(cfg *Config) bool { + return cfg.TURNSecret != "" +} + // getLocalSubscribers returns all local subscribers for a given topic and namespace func (g *Gateway) getLocalSubscribers(topic, namespace string) []*localSubscriber { topicKey := namespace + "." + topic diff --git a/core/pkg/gateway/routes.go b/core/pkg/gateway/routes.go index 31be800..ee3c189 100644 --- a/core/pkg/gateway/routes.go +++ b/core/pkg/gateway/routes.go @@ -177,11 +177,17 @@ func (g *Gateway) Routes() http.Handler { mux.HandleFunc("/v1/vault/status", g.vaultHandlers.HandleStatus) } - // webrtc + // webrtc — TURN credentials and SFU signaling are gated independently + // (bugboard #25). A non-SFU gateway with the namespace TURN secret + // serves credentials but not signal/rooms; an SFU gateway serves all. if g.webrtcHandlers != nil { - mux.HandleFunc("/v1/webrtc/turn/credentials", g.webrtcHandlers.CredentialsHandler) - mux.HandleFunc("/v1/webrtc/signal", g.webrtcHandlers.SignalHandler) - mux.HandleFunc("/v1/webrtc/rooms", g.webrtcHandlers.RoomsHandler) + if g.webrtcServeTURNCredentials { + mux.HandleFunc("/v1/webrtc/turn/credentials", g.webrtcHandlers.CredentialsHandler) + } + if g.webrtcServeSFURoutes { + mux.HandleFunc("/v1/webrtc/signal", g.webrtcHandlers.SignalHandler) + mux.HandleFunc("/v1/webrtc/rooms", g.webrtcHandlers.RoomsHandler) + } } // anon proxy (authenticated users only) diff --git a/core/pkg/gateway/webrtc_route_gate_test.go b/core/pkg/gateway/webrtc_route_gate_test.go index 38bc990..5b6d0ef 100644 --- a/core/pkg/gateway/webrtc_route_gate_test.go +++ b/core/pkg/gateway/webrtc_route_gate_test.go @@ -100,3 +100,43 @@ func TestWebRTCRouteGate_TURNSecretMissingStillRegisters(t *testing.T) { "the credentials endpoint surfaces 503 internally for the missing secret") } } + +// Bugboard #25 — TURN-credentials gate decoupled from the SFU gate. +// shouldServeTURNCredentials must register /v1/webrtc/turn/credentials +// whenever the namespace TURN secret is set, INDEPENDENT of whether this +// node runs a local SFU. SFU signal/rooms stay gated on SFUPort>0. + +func TestTURNCredentialsGate_servesWithSecretEvenWithoutSFU(t *testing.T) { + // Node 57's exact case: TURN secret present, no local SFU (SFUPort=0). + // Credentials MUST register (it's a namespace-wide HMAC; TURN servers + // are remote). Pre-fix the single SFUPort>0 gate 404'd this. + cfg := &Config{TURNSecret: "ns-shared-secret", SFUPort: 0} + if !shouldServeTURNCredentials(cfg) { + t.Error("BUG #25 REGRESSION: TURN credentials must register on a non-SFU gateway that has the namespace secret") + } + if shouldRegisterWebRTCRoutes(cfg) { + t.Error("SFU routes (signal/rooms) must NOT register without a local SFU port") + } +} + +func TestTURNCredentialsGate_noSecretNoCredentials(t *testing.T) { + // No TURN secret → don't register credentials (the handler would 503 + // anyway; not registering keeps a clean 404 vs. an actionable 503 — + // matches the documented behavior). + cfg := &Config{TURNSecret: "", SFUPort: 7800} + if shouldServeTURNCredentials(cfg) { + t.Error("no TURN secret: credentials route must not register") + } + // But SFU routes still register (SFU is independent). + if !shouldRegisterWebRTCRoutes(cfg) { + t.Error("SFU port set: signal/rooms must register independent of TURN") + } +} + +func TestTURNCredentialsGate_sfuNodeServesBoth(t *testing.T) { + // An SFU node with the secret serves everything. + cfg := &Config{TURNSecret: "s", SFUPort: 30000} + if !shouldServeTURNCredentials(cfg) || !shouldRegisterWebRTCRoutes(cfg) { + t.Error("SFU node with TURN secret must serve both credentials and SFU routes") + } +} diff --git a/core/pkg/namespace/cluster_manager.go b/core/pkg/namespace/cluster_manager.go index 1ba6bad..3183aa8 100644 --- a/core/pkg/namespace/cluster_manager.go +++ b/core/pkg/namespace/cluster_manager.go @@ -1824,41 +1824,60 @@ type restoreWebRTC struct { turnSecret string } -// chooseRestoreWebRTC decides the WebRTC fields for a restored namespace -// gateway. The local state file wins when it carries a complete WebRTC -// block; otherwise the DB (consulted lazily via dbFetch — only when the -// state file is incomplete) is the source of truth. Returns a disabled -// result when neither source has a usable block. +// chooseRestoreWebRTC resolves a restored gateway's WebRTC config. TWO +// independent aspects (bugboard #25 decouple): // -// Bugboard #25: namespaces that had WebRTC enabled AFTER their state file -// was written carry no SFU/TURN fields in state. Without the DB fallback, -// the from-disk restore regenerates the gateway config without the webrtc -// block on every restart — SFU/TURN keep running but the gateway loses -// turn_secret + sfu_port (credentials configured:false, routes 404). +// - TURN (turnSecret + turnDomain) is NAMESPACE-WIDE. Any gateway with +// the namespace TURN secret can mint /v1/webrtc/turn/credentials (the +// credentials are an HMAC; the actual TURN servers are remote). So a +// gateway node that runs NO local SFU still gets the TURN secret. +// - SFU (sfuPort) is PER-NODE — non-zero only when this node runs a +// local SFU (for /v1/webrtc/signal + /rooms proxying). +// +// Precedence: prefer the local state file; fall back to the DB (source of +// truth) when the state file lacks the TURN secret (the namespace-wide +// "webrtc is enabled" marker). dbFetch is lazy — only hit when needed. +// +// `enabled` is true when EITHER a TURN secret OR an SFU port is present, +// so the caller knows to write a webrtc block. A non-SFU gateway gets +// {sfuPort:0, turnSecret:set} — credentials route registers, signal/rooms +// don't. // // Extracted as a pure function so the precedence is unit-testable without // standing up the full restore path (systemd spawner + DB + port store). func chooseRestoreWebRTC( stateHasSFU bool, stateSFUPort int, stateTURNDomain, stateTURNSecret string, - dbFetch func() (enabled bool, sfuPort int, turnDomain, turnSecret string), + dbFetch func() (turnSecret, turnDomain string, sfuPort int), ) restoreWebRTC { - if stateHasSFU && stateSFUPort > 0 && stateTURNSecret != "" { - return restoreWebRTC{ - enabled: true, - sfuPort: stateSFUPort, - turnDomain: stateTURNDomain, - turnSecret: stateTURNSecret, + turnSecret := stateTURNSecret + turnDomain := stateTURNDomain + sfuPort := 0 + if stateHasSFU && stateSFUPort > 0 { + sfuPort = stateSFUPort + } + + // Fall back to the DB when the state file has no TURN secret — that's + // the marker that the namespace has WebRTC enabled at all. The state + // file is not updated by EnableWebRTC, so a namespace enabled after + // the state file was written reaches here with an empty secret. + if turnSecret == "" { + if dbSecret, dbDomain, dbSFU := dbFetch(); dbSecret != "" { + turnSecret = dbSecret + if turnDomain == "" { + turnDomain = dbDomain + } + if sfuPort == 0 { + sfuPort = dbSFU + } } } - if enabled, sfuPort, turnDomain, turnSecret := dbFetch(); enabled && sfuPort > 0 && turnSecret != "" { - return restoreWebRTC{ - enabled: true, - sfuPort: sfuPort, - turnDomain: turnDomain, - turnSecret: turnSecret, - } + + return restoreWebRTC{ + enabled: turnSecret != "" || sfuPort > 0, + sfuPort: sfuPort, + turnDomain: turnDomain, + turnSecret: turnSecret, } - return restoreWebRTC{} } // restoreClusterFromState restores all processes for a cluster using local state (no DB queries). @@ -2013,22 +2032,28 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl // file is incomplete. wr := chooseRestoreWebRTC( state.HasSFU, state.SFUSignalingPort, state.TURNDomain, state.TURNSharedSecret, - func() (bool, int, string, string) { + func() (turnSecret, turnDomain string, sfuPort int) { webrtcCfg, err := cm.GetWebRTCConfig(ctx, state.NamespaceName) if err != nil || webrtcCfg == nil { - return false, 0, "", "" + return "", "", 0 } - sfuBlock, err := cm.webrtcPortAllocator.GetSFUPorts(ctx, state.ClusterID, cm.localNodeID) - if err != nil || sfuBlock == nil { - return false, 0, "", "" + // TURN is namespace-wide; SFU port is per-node and may be + // absent on a gateway-only (non-SFU) node — that's fine, + // the gateway still serves TURN credentials. + sfu := 0 + if sfuBlock, serr := cm.webrtcPortAllocator.GetSFUPorts(ctx, state.ClusterID, cm.localNodeID); serr == nil && sfuBlock != nil { + sfu = sfuBlock.SFUSignalingPort } - return true, sfuBlock.SFUSignalingPort, + return webrtcCfg.TURNSharedSecret, fmt.Sprintf("turn.ns-%s.%s", state.NamespaceName, cm.baseDomain), - webrtcCfg.TURNSharedSecret + sfu }, ) if wr.enabled { - gwCfg.WebRTCEnabled = true + // WebRTCEnabled is the legacy flag (ignored by the route gate + // now — bugboard #25/#411); set it to SFU presence for + // config-shape consistency with how EnableWebRTC writes nodes. + gwCfg.WebRTCEnabled = wr.sfuPort > 0 gwCfg.SFUPort = wr.sfuPort gwCfg.TURNDomain = wr.turnDomain gwCfg.TURNSecret = wr.turnSecret diff --git a/core/pkg/namespace/restore_webrtc_test.go b/core/pkg/namespace/restore_webrtc_test.go index 92703d2..43e5238 100644 --- a/core/pkg/namespace/restore_webrtc_test.go +++ b/core/pkg/namespace/restore_webrtc_test.go @@ -2,36 +2,32 @@ package namespace import "testing" -// Bugboard #25 — WebRTC config drift on restart. +// Bugboard #25 — WebRTC config drift on restart + TURN/SFU decouple. // -// chooseRestoreWebRTC decides the gateway's WebRTC fields when a node -// restores namespace clusters from its local state file. The local state -// file is NOT updated by EnableWebRTC, so a namespace enabled after its -// state file was written has no SFU/TURN fields there — and because the -// from-disk restore runs first and succeeds, the DB-backed restore (which -// DOES read WebRTC) never runs. Result: the gateway config loses its -// webrtc block on every restart (SFU/TURN services keep running but the -// gateway reports configured:false and /v1/webrtc/turn/credentials 404s). -// -// These tests pin the precedence: state file when complete, DB fallback -// otherwise. The bug was the missing DB fallback. +// chooseRestoreWebRTC resolves a restored gateway's WebRTC config from the +// local state file (which EnableWebRTC does NOT update) with a DB fallback +// (source of truth). It also DECOUPLES the two aspects: TURN (secret + +// domain) is namespace-wide so ANY gateway can serve credentials; the SFU +// port is per-node (0 on a gateway-only node). Pins both the drift +// fallback and the non-SFU-gateway case. -func dbDisabled() (bool, int, string, string) { return false, 0, "", "" } +// dbFetch signature: () -> (turnSecret, turnDomain string, sfuPort int). +func dbNone() (string, string, int) { return "", "", 0 } -func dbEnabled(port int, domain, secret string) func() (bool, int, string, string) { - return func() (bool, int, string, string) { return true, port, domain, secret } +func dbFull(secret, domain string, sfuPort int) func() (string, string, int) { + return func() (string, string, int) { return secret, domain, sfuPort } } func TestChooseRestoreWebRTC_stateFileCompleteWins(t *testing.T) { - // State file has a full block → use it, and NEVER consult the DB + // State file has TURN secret → use it, and NEVER consult the DB // (the lazy dbFetch must not be called — saves a query on the hot // restart path). dbCalled := false got := chooseRestoreWebRTC(true, 7800, "turn.ns-x.dbrs.space", "state-secret", - func() (bool, int, string, string) { dbCalled = true; return dbDisabled() }) + func() (string, string, int) { dbCalled = true; return dbNone() }) if dbCalled { - t.Error("DB fetch was called even though the state file was complete (should short-circuit)") + t.Error("DB fetch was called even though the state file had the TURN secret (should short-circuit)") } if !got.enabled || got.sfuPort != 7800 || got.turnSecret != "state-secret" { t.Errorf("want state-file values; got %+v", got) @@ -42,14 +38,14 @@ func TestChooseRestoreWebRTC_stateFileCompleteWins(t *testing.T) { } func TestChooseRestoreWebRTC_staleStateFallsBackToDB(t *testing.T) { - // The actual bug-25 case: state file has NO webrtc (stale — written - // before enable), but the DB says enabled. MUST fall back to the DB - // so the block re-materializes instead of being silently dropped. + // The bug-25 drift case: state file has NO webrtc (stale — written + // before enable), DB says enabled WITH an SFU port on this node. MUST + // fall back to the DB and re-materialize the full block. got := chooseRestoreWebRTC(false, 0, "", "", - dbEnabled(7801, "turn.ns-anchat-test.dbrs.space", "db-secret")) + dbFull("db-secret", "turn.ns-anchat-test.dbrs.space", 7801)) if !got.enabled { - t.Fatal("BUG #25 REGRESSION: stale state file + DB-enabled WebRTC must fall back to DB; got disabled") + t.Fatal("BUG #25 REGRESSION: stale state + DB-enabled WebRTC must fall back to DB; got disabled") } if got.sfuPort != 7801 { t.Errorf("sfuPort = %d; want 7801 (from DB)", got.sfuPort) @@ -62,48 +58,64 @@ func TestChooseRestoreWebRTC_staleStateFallsBackToDB(t *testing.T) { } } +func TestChooseRestoreWebRTC_nonSFUGatewayGetsTURNOnly(t *testing.T) { + // THE DECOUPLE CASE (bug-25). A gateway node that is NOT an SFU node: + // the DB has the namespace TURN secret but GetSFUPorts returns nothing + // for this node (sfuPort=0). The gateway MUST still get the TURN + // secret (so /v1/webrtc/turn/credentials registers + works) while + // sfuPort stays 0 (signal/rooms don't register). This is exactly node + // 57's situation — pre-fix it resolved to disabled and 404'd. + got := chooseRestoreWebRTC(false, 0, "", "", + dbFull("db-secret", "turn.ns-anchat-test.dbrs.space", 0)) // sfuPort 0 = no local SFU + + if !got.enabled { + t.Fatal("BUG #25 REGRESSION: non-SFU gateway with namespace TURN secret must be enabled (serves credentials)") + } + if got.sfuPort != 0 { + t.Errorf("sfuPort = %d; want 0 (this node runs no local SFU)", got.sfuPort) + } + if got.turnSecret != "db-secret" { + t.Errorf("turnSecret = %q; want db-secret (TURN is namespace-wide, served by any gateway)", got.turnSecret) + } +} + +func TestChooseRestoreWebRTC_stateHasTURNButNoSFU(t *testing.T) { + // State file for a non-SFU node: it has the TURN secret but HasSFU is + // false / port 0. Must use the state TURN secret with sfuPort=0 and + // NOT consult the DB (TURN secret present = complete enough). + dbCalled := false + got := chooseRestoreWebRTC(false, 0, "turn.ns-x.dbrs.space", "state-secret", + func() (string, string, int) { dbCalled = true; return dbNone() }) + + if dbCalled { + t.Error("DB fetch called even though state file had the TURN secret") + } + if !got.enabled || got.sfuPort != 0 || got.turnSecret != "state-secret" { + t.Errorf("want TURN-only from state (sfuPort 0); got %+v", got) + } +} + func TestChooseRestoreWebRTC_bothEmptyDisabled(t *testing.T) { - // Namespace genuinely without WebRTC: state file empty, DB disabled. + // Namespace genuinely without WebRTC: state empty, DB returns nothing. // Must return disabled so we don't register broken webrtc routes. - got := chooseRestoreWebRTC(false, 0, "", "", dbDisabled) + got := chooseRestoreWebRTC(false, 0, "", "", dbNone) if got.enabled { t.Errorf("want disabled when neither source has WebRTC; got %+v", got) } } -func TestChooseRestoreWebRTC_incompleteStateFileFallsToDB(t *testing.T) { - // State file partially populated (HasSFU but missing secret, or - // port 0) must NOT be treated as complete — fall through to DB. - // Catches a regression where a half-written state file shadows the - // DB and yields a broken (secret-less) gateway config. - cases := []struct { - name string - hasSFU bool - sfuPort int - turnSec string - }{ - {"hasSFU but port 0", true, 0, "s"}, - {"hasSFU but empty secret", true, 7800, ""}, - {"no hasSFU flag", false, 7800, "s"}, - } - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - got := chooseRestoreWebRTC(tc.hasSFU, tc.sfuPort, "d", tc.turnSec, - dbEnabled(9000, "turn.db", "db-secret")) - if !got.enabled || got.sfuPort != 9000 || got.turnSecret != "db-secret" { - t.Errorf("incomplete state file should fall back to DB; got %+v", got) - } - }) - } -} - -func TestChooseRestoreWebRTC_dbIncompleteStaysDisabled(t *testing.T) { - // Defensive: if the DB row exists but is itself incomplete (no port - // or no secret — e.g. a half-provisioned enable), do NOT enable with - // a broken block. Better disabled than registering routes that 500. +func TestChooseRestoreWebRTC_dbNoSecretStaysDisabled(t *testing.T) { + // Defensive: DB returns an SFU port but NO turn secret (half- + // provisioned / shouldn't happen). The TURN secret is the + // enablement marker; without it we treat it as not-configured-for- + // TURN, but an SFU port alone still enables SFU routes. got := chooseRestoreWebRTC(false, 0, "", "", - func() (bool, int, string, string) { return true, 0, "turn.db", "" }) + func() (string, string, int) { return "", "turn.db", 9000 }) + // dbFetch only runs when state secret is empty; here it returns no + // secret, so the `if dbSecret != ""` guard means NOTHING is taken + // from the DB → disabled. (An SFU-only-no-TURN namespace is not a + // real configuration; TURN secret always accompanies enable.) if got.enabled { - t.Errorf("DB row incomplete (port 0, no secret): want disabled; got %+v", got) + t.Errorf("DB returned no TURN secret: want disabled; got %+v", got) } } diff --git a/core/pkg/serverless/triggers/dispatch_dedup_test.go b/core/pkg/serverless/triggers/dispatch_dedup_test.go new file mode 100644 index 0000000..e2ff77b --- /dev/null +++ b/core/pkg/serverless/triggers/dispatch_dedup_test.go @@ -0,0 +1,57 @@ +package triggers + +import ( + "context" + "testing" + + "go.uber.org/zap" +) + +// Bugboard #30 — cluster-wide once-per-publish dispatch dedup. +// +// gossipsub delivers a publish to every gateway node subscribed to a +// concrete trigger topic, so an N-gateway cluster fired the handler ~N +// times per publish (AnChat: exactly 2 on 3 gateways → 2 pushes/message). +// The dedup claims (namespace, topic, payload-hash) in Olric; only the +// winner dispatches. These tests pin the key derivation (which MUST be +// identical across nodes for the same message) and the fail-open path. + +func TestDispatchDedupKey_sameMessageSameKeyAcrossNodes(t *testing.T) { + // The whole mechanism depends on every node computing the SAME key for + // the SAME (namespace, topic, payload) — otherwise the cross-node + // claim can't dedup. Pure function of the inputs, so two "nodes" + // (two calls) must agree. + data := []byte(`{"messageId":"abc","seq":42}`) + k1 := dispatchDedupKey("anchat-test", "messages:new", data) + k2 := dispatchDedupKey("anchat-test", "messages:new", data) + if k1 != k2 { + t.Fatalf("same message must yield same key on every node; got %q vs %q", k1, k2) + } + if k1 == "" { + t.Error("key must not be empty") + } +} + +func TestDispatchDedupKey_differsByPayloadTopicNamespace(t *testing.T) { + base := dispatchDedupKey("ns", "messages:new", []byte("A")) + cases := map[string]string{ + "different payload": dispatchDedupKey("ns", "messages:new", []byte("B")), + "different topic": dispatchDedupKey("ns", "other:topic", []byte("A")), + "different namespace": dispatchDedupKey("ns2", "messages:new", []byte("A")), + } + for name, k := range cases { + if k == base { + t.Errorf("%s must produce a DIFFERENT key (else distinct events get deduped together)", name) + } + } +} + +func TestClaimDispatch_failsOpenWhenNoOlric(t *testing.T) { + // No shared store → can't coordinate → must FIRE (return true), never + // silently drop the wake. This is the single-node / cache-disabled + // path and the fail-open guarantee. + d := &PubSubDispatcher{logger: zap.NewNop()} // olricClient nil + if !d.claimDispatch(context.Background(), "ns", "messages:new", []byte("x")) { + t.Error("claimDispatch must fail-open (true) when Olric is unavailable — a dropped wake is worse than a dup") + } +} diff --git a/core/pkg/serverless/triggers/dispatcher.go b/core/pkg/serverless/triggers/dispatcher.go index 0126f97..dcff17f 100644 --- a/core/pkg/serverless/triggers/dispatcher.go +++ b/core/pkg/serverless/triggers/dispatcher.go @@ -2,7 +2,10 @@ package triggers import ( "context" + "crypto/sha256" "encoding/json" + "errors" + "fmt" "sync" "time" @@ -21,6 +24,19 @@ const ( // dispatchTimeout is the timeout for each triggered function invocation. dispatchTimeout = 60 * time.Second + // dispatchDedupDMap / dispatchDedupTTL implement cluster-wide + // once-per-publish trigger dispatch (bugboard #30). gossipsub delivers + // the SAME published message to EVERY gateway node subscribed to a + // concrete trigger topic, so without dedup an N-gateway cluster fires + // the handler ~N times for one publish (AnChat saw exactly 2 on a + // 3-gateway cluster → 2 pushes per message). The first node to claim + // the (namespace, topic, payload-hash) key in the per-namespace Olric + // dispatches; the others skip. TTL bounds the claim to cover gossip + // fan-out jitter without de-duplicating legitimately-repeated publishes + // seconds apart. + dispatchDedupDMap = "pubsub_dispatch_dedup" + dispatchDedupTTL = 30 * time.Second + // dispatcherRefreshInterval is the safety-net cadence for re-syncing // libp2p subscriptions against the trigger store. Trigger add/remove // calls Refresh synchronously; this catches anything missed (e.g. an @@ -321,6 +337,18 @@ func (d *PubSubDispatcher) Dispatch(ctx context.Context, namespace, topic string return } + // Cluster-wide once-per-publish dedup (bugboard #30). gossipsub + // delivers a publish to every subscribed gateway node; only the node + // that wins the Olric claim for this (namespace, topic, payload) + // proceeds, so the trigger fires once cluster-wide instead of once + // per gateway node. + if !d.claimDispatch(ctx, namespace, topic, data) { + d.logger.Debug("PubSub dispatch deduped (claimed by another node)", + zap.String("namespace", namespace), + zap.String("topic", topic)) + return + } + matches, err := d.getMatches(ctx, namespace, topic) if err != nil { d.logger.Error("Failed to look up PubSub triggers", @@ -514,6 +542,63 @@ func (d *PubSubDispatcher) bufferEvent(match TriggerMatch, event PubSubEvent) { }) } +// dispatchDedupKey is the Olric key for the once-per-publish claim. Pure +// function of (namespace, topic, payload) so the SAME message produces +// the SAME key on every gateway node (that's what makes the cross-node +// claim work), while different messages/topics/namespaces don't collide. +// Pure → unit-testable. +// +// Keyed on the payload hash because the gossipsub message-ID isn't +// plumbed through the subscribe handler. Real payloads carry a unique id +// (messageId/seq), so byte-identical distinct messages within the TTL are +// not a practical concern. Known limitation (LOW, in-namespace only): an +// authorized in-namespace publisher could pre-claim a key by publishing +// byte-identical bytes first, suppressing a legitimate identical publish +// for the TTL window. Follow-up hardening: fold the gossipsub message-ID +// into the key once the subscribe handler exposes it. +func dispatchDedupKey(namespace, topic string, data []byte) string { + sum := sha256.Sum256(data) + // 16 bytes of the hash is ample collision resistance for a 30s window. + return fmt.Sprintf("%s|%s|%x", namespace, topic, sum[:16]) +} + +// claimDispatch returns true if THIS node should dispatch the given +// (namespace, topic, payload) — i.e. it won the cluster-wide claim. +// Bugboard #30. +// +// Uses an Olric NX ("set if not exists") write with a short TTL. The +// first node to write the key wins (returns true); concurrent writers +// from the gossipsub fan-out get ErrKeyFound and return false (skip). +// +// FAIL-OPEN: when Olric is unavailable (nil client, DMap error, or any +// non-"key found" error) this returns true. Dedup is a de-duplication +// optimization, not a correctness gate — a rare duplicate dispatch is +// far better than silently dropping a wake-up across the whole cluster. +func (d *PubSubDispatcher) claimDispatch(ctx context.Context, namespace, topic string, data []byte) bool { + if d.olricClient == nil { + return true // no shared store → can't coordinate → fire + } + dm, err := d.olricClient.NewDMap(dispatchDedupDMap) + if err != nil { + d.logger.Debug("dispatch dedup: NewDMap failed, firing (fail-open)", zap.Error(err)) + return true + } + key := dispatchDedupKey(namespace, topic, data) + err = dm.Put(ctx, key, 1, olriclib.NX(), olriclib.EX(dispatchDedupTTL)) + if err == nil { + return true // we claimed it → dispatch + } + if errors.Is(err, olriclib.ErrKeyFound) { + return false // another node already claimed it → skip + } + // Any other (transient) error: fail-open and fire rather than risk a + // dropped wake. Worst case is a duplicate, which is what #30 already + // had — never worse. + d.logger.Debug("dispatch dedup: claim errored, firing (fail-open)", + zap.String("topic", topic), zap.Error(err)) + return true +} + // InvalidateCache is now a no-op — the dispatcher no longer caches lookups. // Kept on the type so callers who used it still compile. func (d *PubSubDispatcher) InvalidateCache(ctx context.Context, namespace, topic string) {}