From 9c213a166c04ef9bfb740d6e253a8d157603c9df Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Mon, 15 Jun 2026 08:05:38 +0300 Subject: [PATCH] feat(serverless,namespace): cut namespace gateway RPC latency (#708) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 5-10s RPCs that broke calling were not cold-start — they were per-RPC sequential rqlite reads, each forwarded to a raft leader that geography-blind election had placed on a 256ms-distant node. Lever A (serverless): cache function metadata + env vars in-process (5s TTL, invalidated on deploy/enable/disable/delete) and stop the hot invoke path re-fetching the function for the authorization check — removes ~820ms of leader-routed pre-flight reads from every op. Lever B (namespace): a locality-aware leadership reconciler hands raft leadership off a geographically-isolated namespace leader to the nearest co-located voter, via rqlite's transfer-leadership API. All nodes stay voters — membership, quorum and fault tolerance are unchanged. Cuts the per-hop cost from ~274ms to ~20ms when a distant node had become leader. 
--- core/pkg/namespace/cluster_manager.go | 5 + core/pkg/namespace/leader_locality.go | 213 +++++++++++++++++++++ core/pkg/namespace/leader_locality_test.go | 93 +++++++++ core/pkg/node/gateway.go | 7 + core/pkg/rqlite/leadership.go | 124 ++++++------ core/pkg/serverless/invoke.go | 41 ++-- core/pkg/serverless/registry.go | 121 +++++++++++- core/pkg/serverless/registry_cache_test.go | 118 ++++++++++++ 8 files changed, 640 insertions(+), 82 deletions(-) create mode 100644 core/pkg/namespace/leader_locality.go create mode 100644 core/pkg/namespace/leader_locality_test.go create mode 100644 core/pkg/serverless/registry_cache_test.go diff --git a/core/pkg/namespace/cluster_manager.go b/core/pkg/namespace/cluster_manager.go index de60428..d630bbf 100644 --- a/core/pkg/namespace/cluster_manager.go +++ b/core/pkg/namespace/cluster_manager.go @@ -86,6 +86,11 @@ type ClusterManager struct { // Track provisioning operations provisioningMu sync.RWMutex provisioning map[string]bool // namespace -> in progress + + // Leadership-locality reconciler cooldown (bugboard #708): per-namespace + // timestamp of the last leadership transfer, to bound churn. Lazy-init. + leaderLocalityMu sync.Mutex + leaderLocalityCooldown map[string]time.Time } // NewClusterManager creates a new cluster manager diff --git a/core/pkg/namespace/leader_locality.go b/core/pkg/namespace/leader_locality.go new file mode 100644 index 0000000..67fa8bf --- /dev/null +++ b/core/pkg/namespace/leader_locality.go @@ -0,0 +1,213 @@ +package namespace + +import ( + "context" + "net" + "path/filepath" + "time" + + "github.com/DeBrosOfficial/network/pkg/rqlite" + "go.uber.org/zap" +) + +// Bugboard #708 — namespace raft leadership is geography-blind: the initial +// leader is sortedNodeIDs[0] over random libp2p peer IDs, and raft re-elects +// freely on every restart. 
// When a geographically-distant node (high WireGuard
// RTT to its peers) becomes the leader, EVERY namespace write funnels through
// the distant node and waits on its cross-region replication for quorum — each
// rqlite hop jumps from ~20ms (co-located) to ~256ms, stacking into 5-10s RPCs
// that break calling.
//
// This reconciler keeps namespace leadership on a co-located voter. It NEVER
// removes a node or changes voter membership — all nodes stay voters (quorum
// and fault tolerance unchanged). It only hands leadership OFF a node that is
// isolated from the rest of the cluster, using rqlite's own
// transfer-leadership API.
const (
	// leaderLocalityInterval is how often each node checks whether the
	// namespace clusters it leads are well-placed.
	leaderLocalityInterval = 90 * time.Second
	// leaderLocalityRTTThreshold: if the leader's CLOSEST voter peer is farther
	// than this, the leader is treated as geographically isolated and hands off
	// leadership. Co-located nodes are ~20ms apart; a distant node is ~256ms —
	// 100ms cleanly separates the two without false positives.
	leaderLocalityRTTThreshold = 100 * time.Millisecond
	// leaderLocalityCooldown bounds how often a single namespace's leadership
	// is moved. In the common topology (a lone distant node among co-located
	// peers) ONE transfer settles leadership on a co-located voter, which then
	// stays (it has a nearby peer, so it never re-triggers). In a pathological
	// all-mutually-distant topology there is no good leader to move to and the
	// nearest-peer transfer would rotate; the cooldown caps that to roughly one
	// transfer per node per window (bounded, non-destructive — membership and
	// quorum are never touched), and node selection clustering most nodes
	// ~20ms apart makes that case rare.
	leaderLocalityCooldown = 10 * time.Minute
	// leaderLocalityDialTimeout bounds each per-peer RTT probe.
	leaderLocalityDialTimeout = 3 * time.Second
)

// decideLeadershipTransfer is the pure decision: should the local leader hand
// off leadership, and to which voter? peerRTTs maps each OTHER reachable voter
// to its measured RTT. It returns a target and true ONLY when this node is the
// leader, every voter is reachable (don't destabilize an already-degraded
// cluster), the cooldown has elapsed, and even the CLOSEST peer is strictly
// farther than threshold — i.e. the leader is isolated. If the leader has at
// least one voter at or below the threshold it is central enough and stays.
//
// The chosen target is the peer with the smallest RTT. When several peers
// share that RTT the lexicographically smallest key wins, so the result does
// not depend on Go's randomized map iteration order. In every "no transfer"
// case the returned string is empty.
func decideLeadershipTransfer(isLeader, allVotersReachable, cooldownElapsed bool, peerRTTs map[string]time.Duration, threshold time.Duration) (string, bool) {
	if !isLeader || !allVotersReachable || !cooldownElapsed || len(peerRTTs) == 0 {
		return "", false
	}

	var (
		bestAddr string
		bestRTT  time.Duration
		found    bool // explicit flag: an empty key must not look like "nothing chosen yet"
	)
	for addr, rtt := range peerRTTs {
		switch {
		case !found, rtt < bestRTT, rtt == bestRTT && addr < bestAddr:
			bestAddr, bestRTT, found = addr, rtt, true
		}
	}

	// A nearest peer at or under the threshold means the leader is not isolated.
	if bestRTT <= threshold {
		return "", false
	}
	return bestAddr, true
}

// measurePeerRTTs probes every OTHER voter's raft address and returns their
// RTTs plus whether ALL voters were reachable+measurable (so the caller can
// refuse to act on a degraded cluster). Non-voters and self are skipped.
+func measurePeerRTTs(nodes rqlite.RQLiteNodes, selfID string) (map[string]time.Duration, bool) { + peerRTTs := make(map[string]time.Duration) + allReachable := true + for _, n := range nodes { + if !n.Voter || n.ID == selfID { + continue + } + if !n.Reachable { + allReachable = false + continue + } + dialAddr := n.Address + if dialAddr == "" { + dialAddr = n.ID + } + rtt, derr := measureRaftRTT(dialAddr, leaderLocalityDialTimeout) + if derr != nil { + allReachable = false + continue + } + peerRTTs[n.ID] = rtt + } + return peerRTTs, allReachable +} + +// measureRaftRTT returns the TCP-connect time to a peer's raft address — a +// privilege-free proxy for WireGuard round-trip latency. +func measureRaftRTT(raftAddr string, timeout time.Duration) (time.Duration, error) { + start := time.Now() + conn, err := net.DialTimeout("tcp", raftAddr, timeout) + if err != nil { + return 0, err + } + _ = conn.Close() + return time.Since(start), nil +} + +func (cm *ClusterManager) leaderTransferCooldownElapsed(namespace string) bool { + cm.leaderLocalityMu.Lock() + defer cm.leaderLocalityMu.Unlock() + last, ok := cm.leaderLocalityCooldown[namespace] + return !ok || time.Since(last) >= leaderLocalityCooldown +} + +func (cm *ClusterManager) recordLeaderTransfer(namespace string) { + cm.leaderLocalityMu.Lock() + defer cm.leaderLocalityMu.Unlock() + if cm.leaderLocalityCooldown == nil { + cm.leaderLocalityCooldown = make(map[string]time.Time) + } + cm.leaderLocalityCooldown[namespace] = time.Now() +} + +// StartLeaderLocalityReconciler runs the periodic leadership-locality check +// until ctx is cancelled. Safe to call once at node boot. 
+func (cm *ClusterManager) StartLeaderLocalityReconciler(ctx context.Context) { + go func() { + ticker := time.NewTicker(leaderLocalityInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + cm.reconcileLeaderLocality(ctx) + } + } + }() +} + +// reconcileLeaderLocality checks every namespace cluster this node hosts and, +// for any it currently leads from an isolated position, transfers leadership to +// the nearest co-located voter. +func (cm *ClusterManager) reconcileLeaderLocality(ctx context.Context) { + pattern := filepath.Join(cm.baseDataDir, "*", "cluster-state.json") + matches, err := filepath.Glob(pattern) + if err != nil { + cm.logger.Debug("leader-locality: glob failed", zap.Error(err)) + return + } + for _, path := range matches { + if ctx.Err() != nil { + return + } + state, err := loadLocalState(path) + if err != nil { + continue + } + cm.reconcileNamespaceLeader(state.NamespaceName, state.LocalPorts.RQLiteHTTPPort) + } +} + +// reconcileNamespaceLeader handles a single namespace's leadership locality. +func (cm *ClusterManager) reconcileNamespaceLeader(namespace string, rqliteHTTPPort int) { + if rqliteHTTPPort == 0 { + return + } + status, err := rqlite.GetRaftStatus(rqliteHTTPPort) + if err != nil { + // rqlite not up / not reachable on this node — nothing to do. 
+ return + } + if status.Store.Raft.State != "Leader" { + return // only the leader can transfer leadership away + } + selfID := status.Store.Raft.LeaderID + + nodes, err := rqlite.GetRaftNodes(rqliteHTTPPort) + if err != nil { + return + } + + peerRTTs, allVotersReachable := measurePeerRTTs(nodes, selfID) + + target, transfer := decideLeadershipTransfer( + true, allVotersReachable, cm.leaderTransferCooldownElapsed(namespace), + peerRTTs, leaderLocalityRTTThreshold, + ) + if !transfer { + return + } + + cm.logger.Info("leader-locality: this node is an isolated namespace raft leader — transferring leadership to a co-located voter (bugboard #708)", + zap.String("namespace", namespace), + zap.String("from", selfID), + zap.String("to", target), + zap.Duration("target_rtt", peerRTTs[target]), + ) + // Record the cooldown BEFORE the transfer so a slow/looping transfer can't + // re-fire on the next tick regardless of outcome. + cm.recordLeaderTransfer(namespace) + if err := rqlite.TransferLeadershipTo(rqliteHTTPPort, target, cm.logger); err != nil { + cm.logger.Warn("leader-locality: leadership transfer failed", + zap.String("namespace", namespace), zap.Error(err)) + } +} diff --git a/core/pkg/namespace/leader_locality_test.go b/core/pkg/namespace/leader_locality_test.go new file mode 100644 index 0000000..858c9f0 --- /dev/null +++ b/core/pkg/namespace/leader_locality_test.go @@ -0,0 +1,93 @@ +package namespace + +import ( + "testing" + "time" +) + +// Bugboard #708 — the leadership-locality reconciler hands leadership off a +// geographically-isolated namespace raft leader to the nearest co-located +// voter, without changing membership. These pin the decision logic. + +const thr = 100 * time.Millisecond + +func TestDecideLeadershipTransfer_isolatedLeaderTransfersToNearest(t *testing.T) { + // Distant leader (109): both peers are far. Transfer to the NEAREST (57 @235ms). 
+ peers := map[string]time.Duration{ + "10.0.0.6:10001": 256 * time.Millisecond, // 51 + "10.0.0.1:10001": 235 * time.Millisecond, // 57 + } + target, transfer := decideLeadershipTransfer(true, true, true, peers, thr) + if !transfer { + t.Fatal("an isolated leader (closest peer 235ms > 100ms) must transfer") + } + if target != "10.0.0.1:10001" { + t.Errorf("must transfer to the NEAREST peer; got %q", target) + } +} + +func TestDecideLeadershipTransfer_centralLeaderStays(t *testing.T) { + // Co-located leader (51): has a nearby peer (57 @20ms) and a distant one (109). + // min RTT 20ms < 100ms → leader is central → NO transfer (the correct steady state). + peers := map[string]time.Duration{ + "10.0.0.1:10001": 20 * time.Millisecond, // 57 (close) + "10.0.0.11:10001": 256 * time.Millisecond, // 109 (far) + } + if _, transfer := decideLeadershipTransfer(true, true, true, peers, thr); transfer { + t.Error("a leader with a nearby voter is central enough; must NOT transfer") + } +} + +func TestDecideLeadershipTransfer_allDistantTransfersToNearest(t *testing.T) { + // Pathological all-mutually-distant topology: every peer is far, so there is + // no truly co-located target. The reconciler still moves to the NEAREST + // (best available); the per-namespace cooldown (TestLeaderTransferCooldown) + // is what bounds the resulting churn to ~one transfer per node per window. 
+ peers := map[string]time.Duration{ + "a": 250 * time.Millisecond, + "b": 210 * time.Millisecond, + } + target, transfer := decideLeadershipTransfer(true, true, true, peers, thr) + if !transfer || target != "b" { + t.Errorf("all-distant: expected transfer to nearest 'b'; got transfer=%v target=%q", transfer, target) + } +} + +func TestDecideLeadershipTransfer_guards(t *testing.T) { + farPeers := map[string]time.Duration{"p": 300 * time.Millisecond} + + if _, transfer := decideLeadershipTransfer(false, true, true, farPeers, thr); transfer { + t.Error("non-leader must never transfer") + } + if _, transfer := decideLeadershipTransfer(true, false, true, farPeers, thr); transfer { + t.Error("must not transfer when a voter is unreachable (degraded cluster)") + } + if _, transfer := decideLeadershipTransfer(true, true, false, farPeers, thr); transfer { + t.Error("must not transfer during cooldown") + } + if _, transfer := decideLeadershipTransfer(true, true, true, map[string]time.Duration{}, thr); transfer { + t.Error("must not transfer with no measurable peers (single-node / all-unreachable)") + } +} + +func TestDecideLeadershipTransfer_exactlyThresholdStays(t *testing.T) { + // Closest peer exactly at the threshold is NOT > threshold → stay (no churn at the boundary). 
+ peers := map[string]time.Duration{"p": thr} + if _, transfer := decideLeadershipTransfer(true, true, true, peers, thr); transfer { + t.Error("RTT exactly at the threshold must not trigger a transfer") + } +} + +func TestLeaderTransferCooldown(t *testing.T) { + cm := &ClusterManager{} + if !cm.leaderTransferCooldownElapsed("ns") { + t.Error("fresh namespace (no prior transfer) must be out of cooldown") + } + cm.recordLeaderTransfer("ns") + if cm.leaderTransferCooldownElapsed("ns") { + t.Error("immediately after a transfer the namespace must be in cooldown") + } + if !cm.leaderTransferCooldownElapsed("other-ns") { + t.Error("cooldown must be per-namespace") + } +} diff --git a/core/pkg/node/gateway.go b/core/pkg/node/gateway.go index 2230bf7..ab5689b 100644 --- a/core/pkg/node/gateway.go +++ b/core/pkg/node/gateway.go @@ -161,6 +161,13 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { zap.String("base_domain", clusterCfg.BaseDomain), zap.String("base_data_dir", baseDataDir)) + // Keep namespace raft leadership on co-located voters (bugboard #708): + // a geography-blind raft election can place leadership on a distant + // node, funneling every write across a ~256ms link into 5-10s RPCs. + // This reconciler hands leadership off an isolated leader to the nearest + // voter — never changing membership (all nodes stay voters). + clusterManager.StartLeaderLocalityReconciler(ctx) + // Restore previously-running namespace cluster processes in background. // First try local state files (no DB dependency), then fall back to DB query with retries. go func() { diff --git a/core/pkg/rqlite/leadership.go b/core/pkg/rqlite/leadership.go index b78a143..bdba1b4 100644 --- a/core/pkg/rqlite/leadership.go +++ b/core/pkg/rqlite/leadership.go @@ -10,53 +10,39 @@ import ( "go.uber.org/zap" ) -// TransferLeadership attempts to transfer Raft leadership to another voter. -// Used by both the RQLiteManager (on Stop) and the CLI (pre-upgrade). 
-// Returns nil if this node is not the leader or if transfer succeeds. -func TransferLeadership(port int, logger *zap.Logger) error { +// GetRaftStatus queries a local rqlite node's /status endpoint. +func GetRaftStatus(port int) (*RQLiteStatus, error) { client := &http.Client{Timeout: 5 * time.Second} - - // 1. Check if we're the leader - statusURL := fmt.Sprintf("http://localhost:%d/status", port) - resp, err := client.Get(statusURL) + resp, err := client.Get(fmt.Sprintf("http://localhost:%d/status", port)) if err != nil { - return fmt.Errorf("failed to query status: %w", err) + return nil, fmt.Errorf("failed to query status: %w", err) } defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("failed to read status: %w", err) + return nil, fmt.Errorf("failed to read status: %w", err) } - var status RQLiteStatus if err := json.Unmarshal(body, &status); err != nil { - return fmt.Errorf("failed to parse status: %w", err) + return nil, fmt.Errorf("failed to parse status: %w", err) } + return &status, nil +} - if status.Store.Raft.State != "Leader" { - logger.Debug("Not the leader, skipping transfer", zap.Int("port", port)) - return nil - } - - logger.Info("This node is the Raft leader, attempting leadership transfer", - zap.Int("port", port), - zap.String("leader_id", status.Store.Raft.LeaderID)) - - // 2. Find an eligible voter to transfer to - nodesURL := fmt.Sprintf("http://localhost:%d/nodes?nonvoters&ver=2&timeout=5s", port) - nodesResp, err := client.Get(nodesURL) +// GetRaftNodes queries a local rqlite node's /nodes endpoint (voters + +// non-voters, with reachability). 
+func GetRaftNodes(port int) (RQLiteNodes, error) { + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Get(fmt.Sprintf("http://localhost:%d/nodes?nonvoters&ver=2&timeout=5s", port)) if err != nil { - return fmt.Errorf("failed to query nodes: %w", err) + return nil, fmt.Errorf("failed to query nodes: %w", err) } - defer nodesResp.Body.Close() - - nodesBody, err := io.ReadAll(nodesResp.Body) + defer resp.Body.Close() + nodesBody, err := io.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("failed to read nodes: %w", err) + return nil, fmt.Errorf("failed to read nodes: %w", err) } - - // Try ver=2 wrapped format, fall back to plain array + // Try ver=2 wrapped format, fall back to plain array. var nodes RQLiteNodes var wrapped struct { Nodes RQLiteNodes `json:"nodes"` @@ -66,8 +52,28 @@ func TransferLeadership(port int, logger *zap.Logger) error { } else { _ = json.Unmarshal(nodesBody, &nodes) } + return nodes, nil +} - // Find a reachable voter that is NOT us +// TransferLeadership attempts to transfer Raft leadership to another voter. +// Used by both the RQLiteManager (on Stop) and the CLI (pre-upgrade). +// Returns nil if this node is not the leader or if transfer succeeds. +func TransferLeadership(port int, logger *zap.Logger) error { + status, err := GetRaftStatus(port) + if err != nil { + return err + } + if status.Store.Raft.State != "Leader" { + logger.Debug("Not the leader, skipping transfer", zap.Int("port", port)) + return nil + } + + nodes, err := GetRaftNodes(port) + if err != nil { + return err + } + + // Find any reachable voter that is NOT us. 
var targetID string for _, n := range nodes { if n.Voter && n.Reachable && n.ID != status.Store.Raft.LeaderID { @@ -75,57 +81,55 @@ func TransferLeadership(port int, logger *zap.Logger) error { break } } - if targetID == "" { logger.Warn("No eligible voter found for leadership transfer — will rely on SIGTERM graceful step-down", zap.Int("port", port)) return nil } + return TransferLeadershipTo(port, targetID, logger) +} + +// TransferLeadershipTo transfers Raft leadership to a SPECIFIC target node ID +// (its raft address). The caller is responsible for confirming this node is the +// leader and that targetID is an eligible voter. Tolerant of a missing API +// (404) and a non-OK status — it logs and returns nil so callers treat transfer +// as best-effort. +func TransferLeadershipTo(port int, targetID string, logger *zap.Logger) error { + client := &http.Client{Timeout: 5 * time.Second} + + logger.Info("Attempting Raft leadership transfer", + zap.Int("port", port), zap.String("target", targetID)) - // 3. Attempt transfer via rqlite v8+ API - // POST /nodes//transfer-leadership - // If the API doesn't exist (404), fall back to relying on SIGTERM. 
transferURL := fmt.Sprintf("http://localhost:%d/nodes/%s/transfer-leadership", port, targetID) transferResp, err := client.Post(transferURL, "application/json", nil) if err != nil { - logger.Warn("Leadership transfer request failed, relying on SIGTERM", - zap.Error(err)) + logger.Warn("Leadership transfer request failed", zap.Error(err)) return nil } transferResp.Body.Close() - if transferResp.StatusCode == http.StatusNotFound { - logger.Info("Leadership transfer API not available (rqlite version), relying on SIGTERM") + switch { + case transferResp.StatusCode == http.StatusNotFound: + logger.Info("Leadership transfer API not available (rqlite version)") return nil - } - - if transferResp.StatusCode != http.StatusOK { + case transferResp.StatusCode != http.StatusOK: logger.Warn("Leadership transfer returned unexpected status", zap.Int("status", transferResp.StatusCode)) return nil } - // 4. Verify transfer + // Verify. time.Sleep(2 * time.Second) - verifyResp, err := client.Get(statusURL) + newStatus, err := GetRaftStatus(port) if err != nil { logger.Info("Could not verify transfer (node may have already stepped down)") return nil } - defer verifyResp.Body.Close() - - verifyBody, _ := io.ReadAll(verifyResp.Body) - var newStatus RQLiteStatus - if err := json.Unmarshal(verifyBody, &newStatus); err == nil { - if newStatus.Store.Raft.State != "Leader" { - logger.Info("Leadership transferred successfully", - zap.String("new_leader", newStatus.Store.Raft.LeaderID), - zap.Int("port", port)) - } else { - logger.Warn("Still leader after transfer attempt — will rely on SIGTERM", - zap.Int("port", port)) - } + if newStatus.Store.Raft.State != "Leader" { + logger.Info("Leadership transferred successfully", + zap.String("new_leader", newStatus.Store.Raft.LeaderID), zap.Int("port", port)) + } else { + logger.Warn("Still leader after transfer attempt", zap.Int("port", port)) } - return nil } diff --git a/core/pkg/serverless/invoke.go b/core/pkg/serverless/invoke.go index 
447e932..147dbcd 100644 --- a/core/pkg/serverless/invoke.go +++ b/core/pkg/serverless/invoke.go @@ -118,16 +118,16 @@ func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeRespon // #264). The auth boundary for system triggers is at REGISTRATION // time (HTTP `POST /v1/functions/{name}/triggers`, or deploy-time // auto-register from function.yaml), not at firing time. - if !isSystemTrigger(req.TriggerType) { - authorized, err := i.CanInvoke(ctx, req.Namespace, req.FunctionName, req.CallerWallet) - if err != nil || !authorized { - return &InvokeResponse{ - RequestID: requestID, - Status: InvocationStatusError, - Error: "unauthorized", - DurationMS: time.Since(startTime).Milliseconds(), - }, ErrUnauthorized - } + if !isSystemTrigger(req.TriggerType) && !canInvokeFn(fn, req.CallerWallet) { + // Authorization uses the function we already fetched above — + // CanInvoke would re-`registry.Get` it, a redundant leader-routed + // read on every op (bugboard #708). + return &InvokeResponse{ + RequestID: requestID, + Status: InvocationStatusError, + Error: "unauthorized", + DurationMS: time.Since(startTime).Milliseconds(), + }, ErrUnauthorized } // Get environment variables @@ -504,20 +504,19 @@ func (i *Invoker) CanInvoke(ctx context.Context, namespace, functionName string, if err != nil { return false, err } + return canInvokeFn(fn, callerWallet), nil +} - // Public functions can be invoked by anyone (auth middleware allows - // the request through without credentials). +// canInvokeFn is the pure authorization decision for an already-fetched +// function, so the hot Invoke path doesn't re-read the registry (bugboard +// #708). Public functions are open; a private function only requires that the +// caller has SOME identity — the auth middleware already verified namespace +// membership before the function ran. 
+func canInvokeFn(fn *Function, callerWallet string) bool { if fn.IsPublic { - return true, nil + return true } - - // Private function: require an authenticated caller. The auth - // middleware has already verified the caller belongs to this - // namespace (either via JWT `namespace` claim or via API-key - // namespace lookup) before this function ever runs, so the only - // thing we need to confirm here is that the caller has SOME - // identity at all (i.e. the request wasn't anonymous). - return strings.TrimSpace(callerWallet) != "", nil + return strings.TrimSpace(callerWallet) != "" } // GetFunctionInfo returns basic info about a function for invocation. diff --git a/core/pkg/serverless/registry.go b/core/pkg/serverless/registry.go index e9a9d5b..860e9eb 100644 --- a/core/pkg/serverless/registry.go +++ b/core/pkg/serverless/registry.go @@ -6,7 +6,9 @@ import ( "database/sql" "fmt" "io" + "strconv" "strings" + "sync" "time" "github.com/DeBrosOfficial/network/pkg/ipfs" @@ -15,6 +17,27 @@ import ( "go.uber.org/zap" ) +// registryCacheTTL bounds how long function metadata + env vars are cached +// in-process before re-reading rqlite. Bugboard #708: every function_invoke +// previously did 3 uncached `weak` reads (Get, a redundant Get inside +// CanInvoke, and GetEnvVars), each forwarded to the raft leader — ~820ms of +// pure pre-flight tax per op when the leader is a distant node. With a short +// TTL + explicit invalidation on deploy/enable/disable/delete, a burst of RPCs +// (e.g. a call setup) reads metadata once instead of N times. The TTL is a +// backstop; correctness comes from the explicit invalidation, so cross-node +// propagation of a deploy/disable is bounded to this TTL. +const registryCacheTTL = 5 * time.Second + +type fnCacheEntry struct { + fn *Function + at time.Time +} + +type envCacheEntry struct { + env map[string]string + at time.Time +} + // Ensure Registry implements FunctionRegistry and InvocationLogger interfaces. 
var _ FunctionRegistry = (*Registry)(nil) var _ InvocationLogger = (*Registry)(nil) @@ -27,6 +50,12 @@ type Registry struct { ipfsAPIURL string logger *zap.Logger tableName string + + // Metadata cache (bugboard #708) — see registryCacheTTL. + cacheTTL time.Duration + cacheMu sync.RWMutex + fnCache map[string]fnCacheEntry // key: namespace\x00name\x00version + envCache map[string]envCacheEntry // key: functionID } // RegistryConfig holds configuration for the Registry. @@ -42,9 +71,78 @@ func NewRegistry(db rqlite.Client, ipfsClient ipfs.IPFSClient, cfg RegistryConfi ipfsAPIURL: cfg.IPFSAPIURL, logger: logger, tableName: "functions", + cacheTTL: registryCacheTTL, + fnCache: make(map[string]fnCacheEntry), + envCache: make(map[string]envCacheEntry), } } +// --- metadata cache (bugboard #708) --- +// +// The cached *Function and env map are SHARED with all callers and MUST be +// treated as read-only — no consumer in pkg/serverless mutates them today, and +// none may, or it would corrupt the cache for concurrent readers. 
+ +func fnCacheKey(namespace, name string, version int) string { + return namespace + "\x00" + name + "\x00" + strconv.Itoa(version) +} + +func (r *Registry) cachedFn(key string) (*Function, bool) { + r.cacheMu.RLock() + e, ok := r.fnCache[key] + r.cacheMu.RUnlock() + if !ok || time.Since(e.at) > r.cacheTTL { + return nil, false + } + return e.fn, true +} + +func (r *Registry) storeFn(key string, fn *Function) { + r.cacheMu.Lock() + r.fnCache[key] = fnCacheEntry{fn: fn, at: time.Now()} + r.cacheMu.Unlock() +} + +func (r *Registry) cachedEnv(functionID string) (map[string]string, bool) { + r.cacheMu.RLock() + e, ok := r.envCache[functionID] + r.cacheMu.RUnlock() + if !ok || time.Since(e.at) > r.cacheTTL { + return nil, false + } + return e.env, true +} + +func (r *Registry) storeEnv(functionID string, env map[string]string) { + r.cacheMu.Lock() + r.envCache[functionID] = envCacheEntry{env: env, at: time.Now()} + r.cacheMu.Unlock() +} + +// invalidateFn drops every cached version of (namespace, name). Called on +// deploy/enable/disable/delete so a metadata change is never masked by the +// cache beyond the write itself. +func (r *Registry) invalidateFn(namespace, name string) { + prefix := strings.TrimSpace(namespace) + "\x00" + strings.TrimSpace(name) + "\x00" + r.cacheMu.Lock() + for k := range r.fnCache { + if strings.HasPrefix(k, prefix) { + delete(r.fnCache, k) + } + } + r.cacheMu.Unlock() +} + +// invalidateEnv drops the cached env vars for a function ID. A redeploy REUSES +// the existing function ID (Register: id = oldFn.ID) and rewrites env vars +// under it, so without this an env-var change would be masked by the cache for +// up to the TTL. +func (r *Registry) invalidateEnv(functionID string) { + r.cacheMu.Lock() + delete(r.envCache, functionID) + r.cacheMu.Unlock() +} + // Register deploys a new function or updates an existing one. 
func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error) { if fn == nil { @@ -128,6 +226,9 @@ func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmByt return nil, &DeployError{FunctionName: fn.Name, Cause: err} } + r.invalidateFn(fn.Namespace, fn.Name) + r.invalidateEnv(id) + r.logger.Info("Function registered", zap.String("id", id), zap.String("name", fn.Name), @@ -146,6 +247,12 @@ func (r *Registry) Get(ctx context.Context, namespace, name string, version int) namespace = strings.TrimSpace(namespace) name = strings.TrimSpace(name) + // Cache hit (bugboard #708): skip the leader-routed weak read entirely. + cacheKey := fnCacheKey(namespace, name, version) + if fn, ok := r.cachedFn(cacheKey); ok { + return fn, nil + } + var query string var args []interface{} @@ -184,13 +291,17 @@ func (r *Registry) Get(ctx context.Context, namespace, name string, version int) } if len(functions) == 0 { + // Do NOT cache misses — a just-deployed function must be visible + // immediately on the next call, not after the TTL. if version == 0 { return nil, ErrFunctionNotFound } return nil, ErrVersionNotFound } - return r.rowToFunction(&functions[0]), nil + fn := r.rowToFunction(&functions[0]) + r.storeFn(cacheKey, fn) + return fn, nil } // List returns all functions for a namespace. 
@@ -252,6 +363,7 @@ func (r *Registry) SetEnabled(ctx context.Context, namespace, name string, enabl if rowsAffected == 0 { return ErrFunctionNotFound } + r.invalidateFn(namespace, name) r.logger.Info("Function enabled-state updated", zap.String("namespace", namespace), zap.String("name", name), @@ -290,6 +402,8 @@ func (r *Registry) Delete(ctx context.Context, namespace, name string, version i return ErrVersionNotFound } + r.invalidateFn(namespace, name) + r.logger.Info("Function deleted", zap.String("namespace", namespace), zap.String("name", name), @@ -321,6 +435,10 @@ func (r *Registry) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, er // GetEnvVars retrieves environment variables for a function. func (r *Registry) GetEnvVars(ctx context.Context, functionID string) (map[string]string, error) { + if env, ok := r.cachedEnv(functionID); ok { + return env, nil + } + query := `SELECT key, value FROM function_env_vars WHERE function_id = ?` var rows []envVarRow @@ -333,6 +451,7 @@ func (r *Registry) GetEnvVars(ctx context.Context, functionID string) (map[strin envVars[row.Key] = row.Value } + r.storeEnv(functionID, envVars) return envVars, nil } diff --git a/core/pkg/serverless/registry_cache_test.go b/core/pkg/serverless/registry_cache_test.go new file mode 100644 index 0000000..02b40d4 --- /dev/null +++ b/core/pkg/serverless/registry_cache_test.go @@ -0,0 +1,118 @@ +package serverless + +import ( + "testing" + "time" + + "go.uber.org/zap" +) + +// Bugboard #708 — function metadata + env vars are cached in-process so a burst +// of invokes doesn't pay a leader-routed weak read per op. These pin the cache +// hit/miss/TTL/invalidation behavior and the dedup'd authorization decision. 
+
+// newTestRegistry builds a Registry over the mock rqlite and IPFS clients,
+// with a zero-value RegistryConfig and a no-op logger.
+func newTestRegistry() *Registry {
+	logger := zap.NewNop()
+	return NewRegistry(NewMockRQLite(), NewMockIPFSClient(), RegistryConfig{}, logger)
+}
+
+// TestRegistryCache_hitAndInvalidate checks miss-on-empty, hit-after-store
+// (same pointer back), and that invalidateFn clears every cached version.
+func TestRegistryCache_hitAndInvalidate(t *testing.T) {
+	reg := newTestRegistry()
+	latestKey := fnCacheKey("ns", "fn", 0)
+	pinnedKey := fnCacheKey("ns", "fn", 3)
+	stored := &Function{ID: "id-1", Name: "fn", Namespace: "ns"}
+
+	_, hit := reg.cachedFn(latestKey)
+	if hit {
+		t.Fatal("empty cache must miss")
+	}
+
+	reg.storeFn(latestKey, stored)
+	got, ok := reg.cachedFn(latestKey)
+	if !(ok && got == stored) {
+		t.Fatalf("expected cache hit returning the stored fn; ok=%v got=%v", ok, got)
+	}
+
+	// Deploy/enable/disable/delete must drop every cached version.
+	reg.storeFn(pinnedKey, &Function{ID: "id-3", Name: "fn", Namespace: "ns"})
+	reg.invalidateFn("ns", "fn")
+
+	mustBeGone := []struct {
+		key string
+		msg string
+	}{
+		{latestKey, "invalidateFn must drop the version-0 entry"},
+		{pinnedKey, "invalidateFn must drop ALL versions of the function"},
+	}
+	for _, c := range mustBeGone {
+		if _, still := reg.cachedFn(c.key); still {
+			t.Error(c.msg)
+		}
+	}
+}
+
+// TestRegistryCache_invalidateScopedToFunction checks that invalidating one
+// function leaves a sibling in the same namespace cached.
+func TestRegistryCache_invalidateScopedToFunction(t *testing.T) {
+	reg := newTestRegistry()
+	keepKey := fnCacheKey("ns", "keep", 0)
+	dropKey := fnCacheKey("ns", "drop", 0)
+	reg.storeFn(keepKey, &Function{ID: "k", Name: "keep", Namespace: "ns"})
+	reg.storeFn(dropKey, &Function{ID: "d", Name: "drop", Namespace: "ns"})
+
+	reg.invalidateFn("ns", "drop")
+
+	_, dropStillCached := reg.cachedFn(dropKey)
+	if dropStillCached {
+		t.Error("target function must be invalidated")
+	}
+	_, keepStillCached := reg.cachedFn(keepKey)
+	if !keepStillCached {
+		t.Error("a DIFFERENT function must NOT be invalidated (prefix must include the null separator)")
+	}
+}
+
+// TestRegistryCache_ttlExpiry checks that an entry older than the TTL reads
+// as a miss.
+func TestRegistryCache_ttlExpiry(t *testing.T) {
+	reg := newTestRegistry()
+	key := fnCacheKey("ns", "fn", 0)
+
+	// Plant an entry stamped two TTLs in the past.
+	stale := time.Now().Add(-2 * reg.cacheTTL)
+	reg.fnCache[key] = fnCacheEntry{fn: &Function{ID: "x"}, at: stale}
+
+	_, hit := reg.cachedFn(key)
+	if hit {
+		t.Error("an entry older than the TTL must be treated as a miss")
+	}
+}
+
+// TestRegistryCache_envHitAndTTL covers the env cache: miss when empty, hit
+// after store, miss again once the entry is older than the TTL.
+func TestRegistryCache_envHitAndTTL(t *testing.T) {
+	reg := newTestRegistry()
+
+	_, hit := reg.cachedEnv("fid")
+	if hit {
+		t.Fatal("empty env cache must miss")
+	}
+
+	reg.storeEnv("fid", map[string]string{"K": "V"})
+	env, ok := reg.cachedEnv("fid")
+	if !(ok && env["K"] == "V") {
+		t.Fatalf("expected env cache hit; ok=%v env=%v", ok, env)
+	}
+
+	stale := time.Now().Add(-2 * reg.cacheTTL)
+	reg.envCache["fid"] = envCacheEntry{env: map[string]string{"K": "V"}, at: stale}
+	_, hit = reg.cachedEnv("fid")
+	if hit {
+		t.Error("env entry older than the TTL must miss")
+	}
+}
+
+// TestRegistryCache_envInvalidatedOnRedeploy pins the env-cache invalidation
+// that a redeploy relies on.
+func TestRegistryCache_envInvalidatedOnRedeploy(t *testing.T) {
+	// A redeploy REUSES the function ID (Register: id = oldFn.ID) and rewrites
+	// env vars under it, so Register must drop the env cache for that ID — else
+	// a changed env var (e.g. a rotated endpoint) is masked for up to the TTL.
+	reg := newTestRegistry()
+	reg.storeEnv("fid", map[string]string{"K": "old"})
+
+	env, ok := reg.cachedEnv("fid")
+	if !(ok && env["K"] == "old") {
+		t.Fatal("precondition: env should be cached")
+	}
+
+	reg.invalidateEnv("fid") // what Register now calls
+
+	_, stillCached := reg.cachedEnv("fid")
+	if stillCached {
+		t.Error("env cache must be invalidated on redeploy (reused ID); a changed env var must not be served stale")
+	}
+}
+
+// TestRegistryCache_keyDistinctNoCollision checks that the key separator
+// keeps namespace and name from running together.
+func TestRegistryCache_keyDistinctNoCollision(t *testing.T) {
+	// Guard the null-separated key: "a"+"bc" must not collide with "ab"+"c".
+	left := fnCacheKey("a", "bc", 0)
+	right := fnCacheKey("ab", "c", 0)
+	if left == right {
+		t.Error("cache keys must not collide across namespace/name boundaries")
+	}
+}
+
+// TestCanInvokeFn walks the public/private x anonymous/identified matrix of
+// the authorization decision.
+func TestCanInvokeFn(t *testing.T) {
+	cases := []struct {
+		fn     *Function
+		caller string
+		want   bool
+		msg    string
+	}{
+		{&Function{IsPublic: true}, "", true, "public function must be invokable by an anonymous caller"},
+		{&Function{IsPublic: false}, "", false, "private function must reject an empty (anonymous) caller"},
+		{&Function{IsPublic: false}, " ", false, "private function must reject a whitespace-only caller"},
+		{&Function{IsPublic: false}, "wallet-abc", true, "private function must accept an identified caller"},
+	}
+	for _, tc := range cases {
+		if canInvokeFn(tc.fn, tc.caller) != tc.want {
+			t.Error(tc.msg)
+		}
+	}
+}