diff --git a/VERSION b/VERSION index 6e5f3bd..b9cc0b9 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.122.24 +0.122.25 diff --git a/core/pkg/gateway/gateway.go b/core/pkg/gateway/gateway.go index 16bd66e..29650e4 100644 --- a/core/pkg/gateway/gateway.go +++ b/core/pkg/gateway/gateway.go @@ -13,8 +13,6 @@ import ( "net/http" "path/filepath" "reflect" - "strconv" - "strings" "sync" "time" @@ -647,24 +645,19 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { // Get libp2p host from client host := deps.Client.Host() if host != nil { - // Parse listen port from ListenAddr (format: ":port" or "addr:port") - listenPort := 0 - if cfg.ListenAddr != "" { - parts := strings.Split(cfg.ListenAddr, ":") - if len(parts) > 0 { - portStr := parts[len(parts)-1] - if p, err := strconv.Atoi(portStr); err == nil { - listenPort = p - } - } - } + // NOTE: we deliberately do NOT pass cfg.ListenAddr's port here + // anymore — that's the gateway's HTTP API port, NOT the libp2p + // port. Passing it caused every cross-node libp2p dial to land + // on the HTTP server and fail the multistream handshake, + // leaving the namespace mesh with 0 connected peers. The libp2p + // port is OS-assigned and lives on host.Addrs() — peer + // discovery extracts it from there at register time. // Create peer discovery manager gw.peerDiscovery = NewPeerDiscovery( host, deps.SQLDB, cfg.NodePeerID, - listenPort, cfg.ClientNamespace, logger.Logger, ) diff --git a/core/pkg/gateway/peer_discovery.go b/core/pkg/gateway/peer_discovery.go index 43432f6..0e2be9b 100644 --- a/core/pkg/gateway/peer_discovery.go +++ b/core/pkg/gateway/peer_discovery.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "os/exec" + "strconv" "strings" "time" @@ -16,29 +17,33 @@ import ( "go.uber.org/zap" ) -// PeerDiscovery manages namespace gateway peer discovery via RQLite +// PeerDiscovery manages namespace gateway peer discovery via RQLite. 
+// +// The libp2p listen port is NOT stored here — it's derived live from +// pd.host.Addrs() at register time. Previously this struct held a +// `listenPort` field populated from the gateway's HTTP API port (which +// silently broke all cross-node libp2p connections — see comment on +// registerSelf). Don't add it back. type PeerDiscovery struct { - host host.Host - rqliteDB *sql.DB - nodeID string - listenPort int - namespace string - logger *zap.Logger + host host.Host + rqliteDB *sql.DB + nodeID string + namespace string + logger *zap.Logger // Stop channel for background goroutines stopCh chan struct{} } -// NewPeerDiscovery creates a new peer discovery manager -func NewPeerDiscovery(h host.Host, rqliteDB *sql.DB, nodeID string, listenPort int, namespace string, logger *zap.Logger) *PeerDiscovery { +// NewPeerDiscovery creates a new peer discovery manager. +func NewPeerDiscovery(h host.Host, rqliteDB *sql.DB, nodeID string, namespace string, logger *zap.Logger) *PeerDiscovery { return &PeerDiscovery{ - host: h, - rqliteDB: rqliteDB, - nodeID: nodeID, - listenPort: listenPort, - namespace: namespace, - logger: logger, - stopCh: make(chan struct{}), + host: h, + rqliteDB: rqliteDB, + nodeID: nodeID, + namespace: namespace, + logger: logger, + stopCh: make(chan struct{}), } } @@ -129,8 +134,26 @@ func (pd *PeerDiscovery) registerSelf(ctx context.Context) error { return fmt.Errorf("failed to get WireGuard IP: %w", err) } - // Build multiaddr: /ip4//tcp//p2p/ - multiaddr := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", wireguardIP, pd.listenPort, peerID) + // CRITICAL: we used to publish `pd.listenPort` here, which is the gateway's + // HTTP API port (e.g. 10004). Other gateways would read this multiaddr from + // rqlite, dial /ip4//tcp/10004, hit the HTTP server, receive + // `HTTP/1.1 400 Bad Request`, and fail the libp2p multistream handshake + // with "message did not have trailing newline". 
The result: cross-node
+	// libp2p mesh had 0 connected peers cluster-wide and cross-node pubsub
+	// silently dropped 100% of messages.
+	//
+	// The actual libp2p port is OS-assigned at startup (client.go listens on
+	// `/ip4/0.0.0.0/tcp/0`), so we must derive it from the live host instead
+	// of the gateway's HTTP config. The listener binds 0.0.0.0 so it accepts
+	// traffic on the WG interface even though libp2p only reports loopback +
+	// public-routable addresses in host.Addrs().
+	libp2pPort, err := extractLibp2pTCPPort(pd.host.Addrs())
+	if err != nil {
+		return fmt.Errorf("failed to extract libp2p TCP port from host addresses: %w", err)
+	}
+
+	// Build multiaddr: /ip4/<wireguard_ip>/tcp/<libp2p_port>/p2p/<peer_id>
+	multiaddr := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", wireguardIP, libp2pPort, peerID)
 
 	query := `
 		INSERT OR REPLACE INTO _namespace_libp2p_peers
@@ -138,11 +161,14 @@ func (pd *PeerDiscovery) registerSelf(ctx context.Context) error {
 		VALUES (?, ?, ?, ?, ?, ?)
 	`
 
+	// We persist libp2pPort in the listen_port column too — the column is
+	// informational metadata for operators (the multiaddr is authoritative),
+	// and keeping it consistent avoids future debugging confusion.
 	_, err = pd.rqliteDB.ExecContext(ctx, query,
 		peerID,
 		multiaddr,
 		pd.nodeID,
-		pd.listenPort,
+		libp2pPort,
 		pd.namespace,
 		time.Now().UTC())
 
@@ -153,11 +179,58 @@ func (pd *PeerDiscovery) registerSelf(ctx context.Context) error {
 	pd.logger.Info("Registered self in peer discovery",
 		zap.String("peer_id", peerID),
 		zap.String("multiaddr", multiaddr),
-		zap.String("node_id", pd.nodeID))
+		zap.String("node_id", pd.nodeID),
+		zap.Int("libp2p_port", libp2pPort))
 
 	return nil
 }
 
+// extractLibp2pTCPPort returns the TCP port the libp2p host is actually
+// listening on, by parsing the host's reported listen addresses.
+//
+// `host.Addrs()` returns multiaddrs like:
+//
+//	/ip4/127.0.0.1/tcp/43043
+//	/ip4/217.76.56.2/tcp/43043
+//
+// All entries share the same port (libp2p binds 0.0.0.0:RANDOM_PORT and
+// reports one entry per detected interface IP). We take the port of the
+// first address whose FINAL component is `/tcp/<port>`.
+//
+// Addresses that merely contain a `/tcp/` component are skipped: in
+// `/ip4/.../tcp/8080/ws` the port belongs to a WebSocket listener, and in
+// `/ip4/<relay>/tcp/4001/p2p/<relay-id>/p2p-circuit` it belongs to a relay
+// host. Publishing either as a raw-TCP multiaddr would send dials to the
+// wrong listener — the same class of bug as announcing the HTTP API port.
+//
+// Note: the WireGuard IP (10.0.0.x) does NOT appear in host.Addrs() because
+// libp2p filters its own address enumeration. The listener IS bound to all
+// interfaces including wg0, so the port is still reachable on the WG IP —
+// we just have to combine the port we extract here with the WG IP we get
+// separately (via getWireGuardIP).
+func extractLibp2pTCPPort(addrs []multiaddr.Multiaddr) (int, error) {
+	for _, a := range addrs {
+		// Require /tcp/<port> to be the LAST component; see doc comment.
+		protos := a.Protocols()
+		if len(protos) == 0 || protos[len(protos)-1].Code != multiaddr.P_TCP {
+			continue // QUIC, WebSocket, relay circuit, etc. — skip
+		}
+		port, err := a.ValueForProtocol(multiaddr.P_TCP)
+		if err != nil {
+			continue
+		}
+		n, parseErr := strconv.Atoi(port)
+		if parseErr != nil {
+			continue
+		}
+		if n <= 0 || n > 65535 {
+			continue
+		}
+		return n, nil
+	}
+	return 0, fmt.Errorf("no TCP port found in libp2p host addresses (got %d addrs)", len(addrs))
+}
+
 // unregisterSelf removes this gateway from the discovery table
 func (pd *PeerDiscovery) unregisterSelf(ctx context.Context) error {
 	peerID := pd.host.ID().String()
diff --git a/core/pkg/gateway/peer_discovery_test.go b/core/pkg/gateway/peer_discovery_test.go
new file mode 100644
index 0000000..ac5b3fd
--- /dev/null
+++ b/core/pkg/gateway/peer_discovery_test.go
@@ -0,0 +1,132 @@
+package gateway
+
+import (
+	"testing"
+
+	"github.com/multiformats/go-multiaddr"
+)
+
+// TestExtractLibp2pTCPPort_SkipsLayeredTCPAddrs verifies the helper ignores
+// addresses where /tcp/<port> is not the final component. A WebSocket
+// address such as /ip4/.../tcp/8080/ws contains a TCP port, but it belongs
+// to the WebSocket listener — announcing it as the raw libp2p TCP port would
+// be the same class of bug as announcing the HTTP API port.
+func TestExtractLibp2pTCPPort_SkipsLayeredTCPAddrs(t *testing.T) {
+	addrs := mustParseAddrs(t,
+		"/ip4/127.0.0.1/tcp/8080/ws",
+		"/ip4/127.0.0.1/tcp/43043",
+	)
+
+	port, err := extractLibp2pTCPPort(addrs)
+	if err != nil {
+		t.Fatalf("extractLibp2pTCPPort: %v", err)
+	}
+	if port != 43043 {
+		t.Errorf("port = %d, want 43043 (WebSocket port must not be picked)", port)
+	}
+}
+
+// TestExtractLibp2pTCPPort_FindsPort verifies the helper finds the TCP port
+// from a typical libp2p host.Addrs() result.
+//
+// This is the regression guard for the bug where peer_discovery was
+// announcing the gateway's HTTP API port (e.g. 10004) instead of the
+// libp2p host's actual TCP port (random per restart).
With the wrong +// port in the multiaddr, every cross-node libp2p dial landed on the HTTP +// server and failed the multistream handshake with "message did not have +// trailing newline" — leaving the cluster's namespace mesh with 0 +// connected peers and silently dropping all cross-node pubsub traffic. +func TestExtractLibp2pTCPPort_FindsPort(t *testing.T) { + addrs := mustParseAddrs(t, + "/ip4/127.0.0.1/tcp/43043", + "/ip4/217.76.56.2/tcp/43043", + ) + + port, err := extractLibp2pTCPPort(addrs) + if err != nil { + t.Fatalf("extractLibp2pTCPPort: %v", err) + } + if port != 43043 { + t.Errorf("port = %d, want 43043", port) + } +} + +// TestExtractLibp2pTCPPort_SkipsNonTCPAddrs verifies the helper does not +// fail when the host advertises non-TCP transports (e.g. QUIC, WebSocket). +// It must find the first TCP entry and return that. +func TestExtractLibp2pTCPPort_SkipsNonTCPAddrs(t *testing.T) { + addrs := mustParseAddrs(t, + "/ip4/127.0.0.1/udp/9999/quic-v1", + "/ip4/127.0.0.1/tcp/43043", + "/ip4/217.76.56.2/tcp/43043", + ) + + port, err := extractLibp2pTCPPort(addrs) + if err != nil { + t.Fatalf("extractLibp2pTCPPort: %v", err) + } + if port != 43043 { + t.Errorf("port = %d, want 43043 (TCP port should be picked, not QUIC)", port) + } +} + +// TestExtractLibp2pTCPPort_NoAddrsReturnsError verifies the helper returns +// an error rather than silently announcing port 0 when the host hasn't +// reported any addresses yet (e.g. called too early in lifecycle). +// +// A silent failure mode here is exactly what masked the original bug for +// so long — we'd rather get a loud error at register time than write +// `/ip4/.../tcp/0/...` to the discovery table. 
+func TestExtractLibp2pTCPPort_NoAddrsReturnsError(t *testing.T) { + _, err := extractLibp2pTCPPort(nil) + if err == nil { + t.Error("expected error for nil addrs, got nil") + } +} + +// TestExtractLibp2pTCPPort_AllUDPReturnsError verifies the helper returns +// an error when no TCP transports are present (UDP-only host). Persisting +// a TCP multiaddr that no listener serves would be the same class of bug. +func TestExtractLibp2pTCPPort_AllUDPReturnsError(t *testing.T) { + addrs := mustParseAddrs(t, + "/ip4/127.0.0.1/udp/9999/quic-v1", + "/ip4/217.76.56.2/udp/9999/quic-v1", + ) + + if _, err := extractLibp2pTCPPort(addrs); err == nil { + t.Error("expected error for TCP-less addrs, got nil") + } +} + +// TestExtractLibp2pTCPPort_AllAddrsShareSamePort verifies the realistic +// libp2p output shape: one entry per detected interface IP, all sharing +// the same OS-assigned port (because the listener binds 0.0.0.0:RANDOM). +// We take the first; we expect them all equal. +func TestExtractLibp2pTCPPort_AllAddrsShareSamePort(t *testing.T) { + addrs := mustParseAddrs(t, + "/ip4/127.0.0.1/tcp/55555", + "/ip4/10.0.0.6/tcp/55555", + "/ip4/51.38.128.56/tcp/55555", + ) + + port, err := extractLibp2pTCPPort(addrs) + if err != nil { + t.Fatalf("extractLibp2pTCPPort: %v", err) + } + if port != 55555 { + t.Errorf("port = %d, want 55555", port) + } +} + +func mustParseAddrs(t *testing.T, raws ...string) []multiaddr.Multiaddr { + t.Helper() + out := make([]multiaddr.Multiaddr, 0, len(raws)) + for _, r := range raws { + m, err := multiaddr.NewMultiaddr(r) + if err != nil { + t.Fatalf("parse multiaddr %q: %v", r, err) + } + out = append(out, m) + } + return out +} diff --git a/core/pkg/serverless/invoke.go b/core/pkg/serverless/invoke.go index 49ce65f..b05f495 100644 --- a/core/pkg/serverless/invoke.go +++ b/core/pkg/serverless/invoke.go @@ -97,15 +97,24 @@ func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeRespon }, err } - // Check authorization - authorized, 
err := i.CanInvoke(ctx, req.Namespace, req.FunctionName, req.CallerWallet)
-	if err != nil || !authorized {
-		return &InvokeResponse{
-			RequestID:  requestID,
-			Status:     InvocationStatusError,
-			Error:      "unauthorized",
-			DurationMS: time.Since(startTime).Milliseconds(),
-		}, ErrUnauthorized
+	// Check authorization — ONLY for user-driven trigger types. System
+	// triggers (cron, pubsub, database, timer, job) fire from rows the
+	// gateway itself persisted on behalf of an already-authenticated
+	// operator; there is no per-invocation caller identity to check, so
+	// requiring one rejects every fire of a private function (see bugboard
+	// #264). The auth boundary for system triggers is at REGISTRATION
+	// time (HTTP `POST /v1/functions/{name}/triggers`, or deploy-time
+	// auto-register from function.yaml), not at firing time.
+	if !isSystemTrigger(req.TriggerType) {
+		authorized, err := i.CanInvoke(ctx, req.Namespace, req.FunctionName, req.CallerWallet)
+		if err != nil || !authorized {
+			return &InvokeResponse{
+				RequestID:  requestID,
+				Status:     InvocationStatusError,
+				Error:      "unauthorized",
+				DurationMS: time.Since(startTime).Milliseconds(),
+			}, ErrUnauthorized
+		}
 	}
 
 	// Get environment variables
diff --git a/core/pkg/serverless/invoke_system_trigger.go b/core/pkg/serverless/invoke_system_trigger.go
new file mode 100644
--- /dev/null
+++ b/core/pkg/serverless/invoke_system_trigger.go
@@ -0,0 +1,27 @@
+package serverless
+
+// isSystemTrigger reports whether a trigger type fires from gateway-internal
+// state (a cron row, a pubsub dispatcher, a DB-change watcher, an in-process
+// scheduler) rather than from an external caller request.
+//
+// The distinction matters for authorization:
+//
+//   - User-driven triggers (HTTP, WebSocket) carry a real caller identity
+//     populated by auth middleware. CanInvoke gates them on that identity.
+//   - System triggers carry no caller identity by design — they were
+//     registered by an already-authenticated operator, stored in the
+//     namespace's own rqlite, and are now firing from the gateway process
+//     itself. Gating them on CallerWallet returns false unconditionally and
+//     silently blocks every fire (bugboard #264 — discovered via a cron
+//     trigger that fired every minute with "unauthorized" for 19+ hours).
+//
+// Unknown or empty trigger types are reported as NOT system, so they fail
+// closed through CanInvoke.
+func isSystemTrigger(t TriggerType) bool {
+	switch t {
+	case TriggerTypeCron, TriggerTypePubSub, TriggerTypeDatabase,
+		TriggerTypeTimer, TriggerTypeJob:
+		return true
+	}
+	return false
+}
diff --git a/core/pkg/serverless/invoke_system_trigger_test.go b/core/pkg/serverless/invoke_system_trigger_test.go
new file mode 100644
index 0000000..c7d714b
--- /dev/null
+++ b/core/pkg/serverless/invoke_system_trigger_test.go
@@ -0,0 +1,207 @@
+package serverless
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	"go.uber.org/zap"
+)
+
+// TestIsSystemTrigger pins the classification of the trigger types listed
+// below. User-driven triggers MUST go through CanInvoke (auth middleware
+// is the source of truth for caller identity); system triggers MUST bypass
+// it (they have no caller — the trigger row IS the authorization, set at
+// registration time).
+//
+// NOTE: nothing forces a newly added TriggerType into this table — a
+// contributor must classify it here by hand. An unclassified type gets the
+// default (false → goes through CanInvoke), which is the safer choice —
+// but if the new type is system-internal and isSystemTrigger is not
+// updated, every fire silently returns "unauthorized" (the bug fixed here).
+func TestIsSystemTrigger(t *testing.T) {
+	cases := []struct {
+		trigger TriggerType
+		system  bool
+	}{
+		// User-driven — must NOT be system.
+ {TriggerTypeHTTP, false}, + {TriggerTypeWebSocket, false}, + + // System-driven — fires from gateway-internal state. + {TriggerTypeCron, true}, + {TriggerTypePubSub, true}, + {TriggerTypeDatabase, true}, + {TriggerTypeTimer, true}, + {TriggerTypeJob, true}, + + // Unknown trigger types default to user-driven (safe default — go + // through CanInvoke and fail closed if there's no caller). + {TriggerType("future-unknown"), false}, + {TriggerType(""), false}, + } + for _, c := range cases { + got := isSystemTrigger(c.trigger) + if got != c.system { + t.Errorf("isSystemTrigger(%q) = %v, want %v", c.trigger, got, c.system) + } + } +} + +// invokeMockRegistry is a minimal FunctionRegistry that returns a single +// canned function. Anything else panics so accidental drift is loud. +type invokeMockRegistry struct { + FunctionRegistry // embedded — calling unimplemented methods panics + + fn *Function +} + +func (m *invokeMockRegistry) Get(_ context.Context, _, _ string, _ int) (*Function, error) { + return m.fn, nil +} + +// TestInvoke_systemTriggerBypassesAuth is the regression guard for +// bugboard #264: a private function registered with a cron trigger fired +// every minute with `"unauthorized"` because Invoke called CanInvoke with +// an empty CallerWallet, which is a 100% blocker for private functions. +// +// The fix gates CanInvoke on !isSystemTrigger(req.TriggerType). This test +// asserts the gate works for every system trigger type (cron, pubsub, +// database, timer, job) AND that user-driven triggers (http, websocket) +// still hit the auth check. +// +// Implementation note: we use a cancelled ctx so the call short-circuits +// inside executeWithRetry's ctx.Err() check at line 223 BEFORE touching +// engine (which is nil in this test). That lets us distinguish "blocked at +// auth" (err = ErrUnauthorized) from "passed auth, blocked later" (err = +// context.Canceled) without standing up a real WASM engine. 
+func TestInvoke_systemTriggerBypassesAuth(t *testing.T) { + privateFn := &Function{ + ID: "fn-id", + Namespace: "anchat-test", + Name: "push-fanout", + IsPublic: false, + } + inv := &Invoker{ + registry: &invokeMockRegistry{fn: privateFn}, + logger: zap.NewNop(), + // engine intentionally nil — cancelled-ctx short-circuit prevents reach. + } + + cases := []struct { + name string + trigger TriggerType + wantAuth bool // true → must hit ErrUnauthorized; false → must NOT + }{ + // System triggers — must bypass auth. The original bug was every + // one of these returning ErrUnauthorized. + {"cron bypasses auth", TriggerTypeCron, false}, + {"pubsub bypasses auth", TriggerTypePubSub, false}, + {"database bypasses auth", TriggerTypeDatabase, false}, + {"timer bypasses auth", TriggerTypeTimer, false}, + {"job bypasses auth", TriggerTypeJob, false}, + + // User-driven triggers — must STILL block anonymous callers on + // private functions. The fix narrows the gate; it does NOT + // remove it. + {"http blocks anonymous", TriggerTypeHTTP, true}, + {"websocket blocks anonymous", TriggerTypeWebSocket, true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // pre-cancelled so executeWithRetry short-circuits + + req := &InvokeRequest{ + Namespace: "anchat-test", + FunctionName: "push-fanout", + Input: []byte(`{"trigger":"test"}`), + TriggerType: tc.trigger, + CallerWallet: "", // anonymous — what cron/pubsub/etc. naturally have + } + resp, err := inv.Invoke(ctx, req) + + if tc.wantAuth { + // User-driven path: must hit the auth wall. + if !errors.Is(err, ErrUnauthorized) { + t.Errorf("trigger=%s wallet='': err=%v, want ErrUnauthorized", tc.trigger, err) + } + if resp == nil || resp.Error != "unauthorized" { + t.Errorf("trigger=%s: expected response.Error=\"unauthorized\", got %+v", tc.trigger, resp) + } + } else { + // System trigger: must NOT hit auth. 
Any other error is + // fine (we forced a cancelled ctx so we expect ctx.Err() + // or a wrapped version of it). The key invariant is + // "ErrUnauthorized must not appear". + if errors.Is(err, ErrUnauthorized) { + t.Errorf("trigger=%s: system trigger blocked at auth (regression of bugboard #264): %+v", tc.trigger, resp) + } + } + }) + } +} + +// TestInvoke_systemTriggerStillAllowsPublic is a sanity check: public +// functions invoked by a system trigger should work exactly the same as +// before (the auth gate was a no-op for them anyway). The bypass must +// not change semantics for public functions. +func TestInvoke_systemTriggerStillAllowsPublic(t *testing.T) { + publicFn := &Function{ + ID: "fn-id", + Namespace: "anchat-test", + Name: "ping", + IsPublic: true, + } + inv := &Invoker{ + registry: &invokeMockRegistry{fn: publicFn}, + logger: zap.NewNop(), + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + req := &InvokeRequest{ + Namespace: "anchat-test", + FunctionName: "ping", + Input: []byte(`{}`), + TriggerType: TriggerTypeCron, + CallerWallet: "", + } + _, err := inv.Invoke(ctx, req) + if errors.Is(err, ErrUnauthorized) { + t.Errorf("public function + system trigger should never be unauthorized: %v", err) + } +} + +// TestInvoke_userTriggerWithCallerStillWorks verifies the fix doesn't +// regress the happy path for user-driven triggers: an HTTP request with a +// real CallerWallet on a private function still succeeds at the auth gate. 
+func TestInvoke_userTriggerWithCallerStillWorks(t *testing.T) { + privateFn := &Function{ + ID: "fn-id", + Namespace: "anchat-test", + Name: "user-create", + IsPublic: false, + CreatedBy: "0xdeployer", + } + inv := &Invoker{ + registry: &invokeMockRegistry{fn: privateFn}, + logger: zap.NewNop(), + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + req := &InvokeRequest{ + Namespace: "anchat-test", + FunctionName: "user-create", + Input: []byte(`{}`), + TriggerType: TriggerTypeHTTP, + CallerWallet: "0xRealUser", + } + _, err := inv.Invoke(ctx, req) + if errors.Is(err, ErrUnauthorized) { + t.Errorf("authenticated HTTP caller on private function must pass auth: %v", err) + } +} diff --git a/sdk/package.json b/sdk/package.json index dced30c..d638ca2 100644 --- a/sdk/package.json +++ b/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@debros/orama", - "version": "0.122.24", + "version": "0.122.25", "description": "TypeScript SDK for Orama Network - Database, PubSub, Cache, Storage, Vault, and more", "type": "module", "main": "./dist/index.js",