feat(serverless,namespace): cut namespace gateway RPC latency (#708)

The 5-10s RPCs that broke calling were not cold-start — they were
per-RPC sequential rqlite reads, each forwarded to a raft leader that
geography-blind election had placed on a 256ms-distant node.

Lever A (serverless): cache function metadata + env vars in-process
(5s TTL, invalidated on deploy/enable/disable/delete) and stop the hot
invoke path re-fetching the function for the authorization check —
removes ~820ms of leader-routed pre-flight reads from every op.

Lever B (namespace): a locality-aware leadership reconciler hands raft
leadership off a geographically-isolated namespace leader to the nearest
co-located voter, via rqlite's transfer-leadership API. All nodes stay
voters — membership, quorum and fault tolerance are unchanged. Cuts the
per-hop cost from ~274ms to ~20ms when a distant node had become leader.
This commit is contained in:
anonpenguin23 2026-06-15 08:05:38 +03:00
parent 61635c4ce7
commit 9c213a166c
8 changed files with 640 additions and 82 deletions

View File

@ -86,6 +86,11 @@ type ClusterManager struct {
// Track provisioning operations
provisioningMu sync.RWMutex
provisioning map[string]bool // namespace -> in progress
// Leadership-locality reconciler cooldown (bugboard #708): per-namespace
// timestamp of the last leadership transfer, to bound churn. Lazy-init.
leaderLocalityMu sync.Mutex
leaderLocalityCooldown map[string]time.Time
}
// NewClusterManager creates a new cluster manager

View File

@ -0,0 +1,213 @@
package namespace
import (
"context"
"net"
"path/filepath"
"time"
"github.com/DeBrosOfficial/network/pkg/rqlite"
"go.uber.org/zap"
)
// Bugboard #708 — namespace raft leadership is geography-blind: the initial
// leader is sortedNodeIDs[0] over random libp2p peer IDs, and raft re-elects
// freely on every restart. When a geographically-distant node (high WireGuard
// RTT to its peers) becomes the leader, EVERY namespace write funnels through
// the distant node and waits on its cross-region replication for quorum — each
// rqlite hop jumps from ~20ms (co-located) to ~256ms, stacking into 5-10s RPCs
// that break calling.
//
// This reconciler keeps namespace leadership on a co-located voter. It NEVER
// removes a node or changes voter membership — all nodes stay voters (quorum
// and fault tolerance unchanged). It only hands leadership OFF a node that is
// isolated from the rest of the cluster, using rqlite's own
// transfer-leadership API.
const (
// leaderLocalityInterval is how often each node checks whether the
// namespace clusters it leads are well-placed.
leaderLocalityInterval = 90 * time.Second
// leaderLocalityRTTThreshold: if the leader's CLOSEST voter peer is farther
// than this, the leader is treated as geographically isolated and hands off
// leadership. Co-located nodes are ~20ms apart; a distant node is ~256ms —
// 100ms cleanly separates the two without false positives.
leaderLocalityRTTThreshold = 100 * time.Millisecond
// leaderLocalityCooldown bounds how often a single namespace's leadership
// is moved. In the common topology (a lone distant node among co-located
// peers) ONE transfer settles leadership on a co-located voter, which then
// stays (it has a nearby peer, so it never re-triggers). In a pathological
// all-mutually-distant topology there is no good leader to move to and the
// nearest-peer transfer would rotate; the cooldown caps that to roughly one
// transfer per node per window (bounded, non-destructive — membership and
// quorum are never touched), and node selection clustering most nodes
// ~20ms apart makes that case rare.
leaderLocalityCooldown = 10 * time.Minute
// leaderLocalityDialTimeout bounds each per-peer RTT probe.
leaderLocalityDialTimeout = 3 * time.Second
)
// decideLeadershipTransfer is the pure decision: should the local leader hand
// off leadership, and to which voter? peerRTTs maps each OTHER reachable voter's
// raft address → measured RTT. Returns a target and true ONLY when this node is
// the leader, every voter is reachable (don't destabilize an already-degraded
// cluster), the cooldown has elapsed, and even the CLOSEST peer is farther than
// `threshold` — i.e. the leader is isolated. If the leader has at least one
// nearby voter it is central enough; leave it. The chosen target is the nearest
// reachable peer (which, in a 1-distant/N-close topology, is a co-located node
// that will then have a nearby peer of its own → stable).
func decideLeadershipTransfer(isLeader, allVotersReachable, cooldownElapsed bool, peerRTTs map[string]time.Duration, threshold time.Duration) (string, bool) {
if !isLeader || !allVotersReachable || !cooldownElapsed || len(peerRTTs) == 0 {
return "", false
}
var bestAddr string
var bestRTT time.Duration
for addr, rtt := range peerRTTs {
if bestAddr == "" || rtt < bestRTT {
bestAddr, bestRTT = addr, rtt
}
}
if bestRTT > threshold {
return bestAddr, true
}
return "", false
}
// measurePeerRTTs probes every OTHER voter's raft address and returns their
// RTTs plus whether ALL voters were reachable+measurable (so the caller can
// refuse to act on a degraded cluster). Non-voters and self are skipped.
func measurePeerRTTs(nodes rqlite.RQLiteNodes, selfID string) (map[string]time.Duration, bool) {
peerRTTs := make(map[string]time.Duration)
allReachable := true
for _, n := range nodes {
if !n.Voter || n.ID == selfID {
continue
}
if !n.Reachable {
allReachable = false
continue
}
dialAddr := n.Address
if dialAddr == "" {
dialAddr = n.ID
}
rtt, derr := measureRaftRTT(dialAddr, leaderLocalityDialTimeout)
if derr != nil {
allReachable = false
continue
}
peerRTTs[n.ID] = rtt
}
return peerRTTs, allReachable
}
// measureRaftRTT returns the TCP-connect time to a peer's raft address — a
// privilege-free proxy for WireGuard round-trip latency.
func measureRaftRTT(raftAddr string, timeout time.Duration) (time.Duration, error) {
start := time.Now()
conn, err := net.DialTimeout("tcp", raftAddr, timeout)
if err != nil {
return 0, err
}
_ = conn.Close()
return time.Since(start), nil
}
func (cm *ClusterManager) leaderTransferCooldownElapsed(namespace string) bool {
cm.leaderLocalityMu.Lock()
defer cm.leaderLocalityMu.Unlock()
last, ok := cm.leaderLocalityCooldown[namespace]
return !ok || time.Since(last) >= leaderLocalityCooldown
}
func (cm *ClusterManager) recordLeaderTransfer(namespace string) {
cm.leaderLocalityMu.Lock()
defer cm.leaderLocalityMu.Unlock()
if cm.leaderLocalityCooldown == nil {
cm.leaderLocalityCooldown = make(map[string]time.Time)
}
cm.leaderLocalityCooldown[namespace] = time.Now()
}
// StartLeaderLocalityReconciler runs the periodic leadership-locality check
// until ctx is cancelled. Safe to call once at node boot.
func (cm *ClusterManager) StartLeaderLocalityReconciler(ctx context.Context) {
go func() {
ticker := time.NewTicker(leaderLocalityInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cm.reconcileLeaderLocality(ctx)
}
}
}()
}
// reconcileLeaderLocality checks every namespace cluster this node hosts and,
// for any it currently leads from an isolated position, transfers leadership to
// the nearest co-located voter.
func (cm *ClusterManager) reconcileLeaderLocality(ctx context.Context) {
pattern := filepath.Join(cm.baseDataDir, "*", "cluster-state.json")
matches, err := filepath.Glob(pattern)
if err != nil {
cm.logger.Debug("leader-locality: glob failed", zap.Error(err))
return
}
for _, path := range matches {
if ctx.Err() != nil {
return
}
state, err := loadLocalState(path)
if err != nil {
continue
}
cm.reconcileNamespaceLeader(state.NamespaceName, state.LocalPorts.RQLiteHTTPPort)
}
}
// reconcileNamespaceLeader handles a single namespace's leadership locality.
func (cm *ClusterManager) reconcileNamespaceLeader(namespace string, rqliteHTTPPort int) {
if rqliteHTTPPort == 0 {
return
}
status, err := rqlite.GetRaftStatus(rqliteHTTPPort)
if err != nil {
// rqlite not up / not reachable on this node — nothing to do.
return
}
if status.Store.Raft.State != "Leader" {
return // only the leader can transfer leadership away
}
selfID := status.Store.Raft.LeaderID
nodes, err := rqlite.GetRaftNodes(rqliteHTTPPort)
if err != nil {
return
}
peerRTTs, allVotersReachable := measurePeerRTTs(nodes, selfID)
target, transfer := decideLeadershipTransfer(
true, allVotersReachable, cm.leaderTransferCooldownElapsed(namespace),
peerRTTs, leaderLocalityRTTThreshold,
)
if !transfer {
return
}
cm.logger.Info("leader-locality: this node is an isolated namespace raft leader — transferring leadership to a co-located voter (bugboard #708)",
zap.String("namespace", namespace),
zap.String("from", selfID),
zap.String("to", target),
zap.Duration("target_rtt", peerRTTs[target]),
)
// Record the cooldown BEFORE the transfer so a slow/looping transfer can't
// re-fire on the next tick regardless of outcome.
cm.recordLeaderTransfer(namespace)
if err := rqlite.TransferLeadershipTo(rqliteHTTPPort, target, cm.logger); err != nil {
cm.logger.Warn("leader-locality: leadership transfer failed",
zap.String("namespace", namespace), zap.Error(err))
}
}

View File

@ -0,0 +1,93 @@
package namespace
import (
"testing"
"time"
)
// Bugboard #708 — the leadership-locality reconciler hands leadership off a
// geographically-isolated namespace raft leader to the nearest co-located
// voter, without changing membership. These pin the decision logic.
const thr = 100 * time.Millisecond
func TestDecideLeadershipTransfer_isolatedLeaderTransfersToNearest(t *testing.T) {
// Distant leader (109): both peers are far. Transfer to the NEAREST (57 @235ms).
peers := map[string]time.Duration{
"10.0.0.6:10001": 256 * time.Millisecond, // 51
"10.0.0.1:10001": 235 * time.Millisecond, // 57
}
target, transfer := decideLeadershipTransfer(true, true, true, peers, thr)
if !transfer {
t.Fatal("an isolated leader (closest peer 235ms > 100ms) must transfer")
}
if target != "10.0.0.1:10001" {
t.Errorf("must transfer to the NEAREST peer; got %q", target)
}
}
func TestDecideLeadershipTransfer_centralLeaderStays(t *testing.T) {
// Co-located leader (51): has a nearby peer (57 @20ms) and a distant one (109).
// min RTT 20ms < 100ms → leader is central → NO transfer (the correct steady state).
peers := map[string]time.Duration{
"10.0.0.1:10001": 20 * time.Millisecond, // 57 (close)
"10.0.0.11:10001": 256 * time.Millisecond, // 109 (far)
}
if _, transfer := decideLeadershipTransfer(true, true, true, peers, thr); transfer {
t.Error("a leader with a nearby voter is central enough; must NOT transfer")
}
}
func TestDecideLeadershipTransfer_allDistantTransfersToNearest(t *testing.T) {
// Pathological all-mutually-distant topology: every peer is far, so there is
// no truly co-located target. The reconciler still moves to the NEAREST
// (best available); the per-namespace cooldown (TestLeaderTransferCooldown)
// is what bounds the resulting churn to ~one transfer per node per window.
peers := map[string]time.Duration{
"a": 250 * time.Millisecond,
"b": 210 * time.Millisecond,
}
target, transfer := decideLeadershipTransfer(true, true, true, peers, thr)
if !transfer || target != "b" {
t.Errorf("all-distant: expected transfer to nearest 'b'; got transfer=%v target=%q", transfer, target)
}
}
func TestDecideLeadershipTransfer_guards(t *testing.T) {
farPeers := map[string]time.Duration{"p": 300 * time.Millisecond}
if _, transfer := decideLeadershipTransfer(false, true, true, farPeers, thr); transfer {
t.Error("non-leader must never transfer")
}
if _, transfer := decideLeadershipTransfer(true, false, true, farPeers, thr); transfer {
t.Error("must not transfer when a voter is unreachable (degraded cluster)")
}
if _, transfer := decideLeadershipTransfer(true, true, false, farPeers, thr); transfer {
t.Error("must not transfer during cooldown")
}
if _, transfer := decideLeadershipTransfer(true, true, true, map[string]time.Duration{}, thr); transfer {
t.Error("must not transfer with no measurable peers (single-node / all-unreachable)")
}
}
func TestDecideLeadershipTransfer_exactlyThresholdStays(t *testing.T) {
// Closest peer exactly at the threshold is NOT > threshold → stay (no churn at the boundary).
peers := map[string]time.Duration{"p": thr}
if _, transfer := decideLeadershipTransfer(true, true, true, peers, thr); transfer {
t.Error("RTT exactly at the threshold must not trigger a transfer")
}
}
func TestLeaderTransferCooldown(t *testing.T) {
cm := &ClusterManager{}
if !cm.leaderTransferCooldownElapsed("ns") {
t.Error("fresh namespace (no prior transfer) must be out of cooldown")
}
cm.recordLeaderTransfer("ns")
if cm.leaderTransferCooldownElapsed("ns") {
t.Error("immediately after a transfer the namespace must be in cooldown")
}
if !cm.leaderTransferCooldownElapsed("other-ns") {
t.Error("cooldown must be per-namespace")
}
}

View File

@ -161,6 +161,13 @@ func (n *Node) startHTTPGateway(ctx context.Context) error {
zap.String("base_domain", clusterCfg.BaseDomain),
zap.String("base_data_dir", baseDataDir))
// Keep namespace raft leadership on co-located voters (bugboard #708):
// a geography-blind raft election can place leadership on a distant
// node, funneling every write across a ~256ms link into 5-10s RPCs.
// This reconciler hands leadership off an isolated leader to the nearest
// voter — never changing membership (all nodes stay voters).
clusterManager.StartLeaderLocalityReconciler(ctx)
// Restore previously-running namespace cluster processes in background.
// First try local state files (no DB dependency), then fall back to DB query with retries.
go func() {

View File

@ -10,53 +10,39 @@ import (
"go.uber.org/zap"
)
// TransferLeadership attempts to transfer Raft leadership to another voter.
// Used by both the RQLiteManager (on Stop) and the CLI (pre-upgrade).
// Returns nil if this node is not the leader or if transfer succeeds.
func TransferLeadership(port int, logger *zap.Logger) error {
// GetRaftStatus queries a local rqlite node's /status endpoint.
func GetRaftStatus(port int) (*RQLiteStatus, error) {
client := &http.Client{Timeout: 5 * time.Second}
// 1. Check if we're the leader
statusURL := fmt.Sprintf("http://localhost:%d/status", port)
resp, err := client.Get(statusURL)
resp, err := client.Get(fmt.Sprintf("http://localhost:%d/status", port))
if err != nil {
return fmt.Errorf("failed to query status: %w", err)
return nil, fmt.Errorf("failed to query status: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read status: %w", err)
return nil, fmt.Errorf("failed to read status: %w", err)
}
var status RQLiteStatus
if err := json.Unmarshal(body, &status); err != nil {
return fmt.Errorf("failed to parse status: %w", err)
return nil, fmt.Errorf("failed to parse status: %w", err)
}
return &status, nil
}
if status.Store.Raft.State != "Leader" {
logger.Debug("Not the leader, skipping transfer", zap.Int("port", port))
return nil
}
logger.Info("This node is the Raft leader, attempting leadership transfer",
zap.Int("port", port),
zap.String("leader_id", status.Store.Raft.LeaderID))
// 2. Find an eligible voter to transfer to
nodesURL := fmt.Sprintf("http://localhost:%d/nodes?nonvoters&ver=2&timeout=5s", port)
nodesResp, err := client.Get(nodesURL)
// GetRaftNodes queries a local rqlite node's /nodes endpoint (voters +
// non-voters, with reachability).
func GetRaftNodes(port int) (RQLiteNodes, error) {
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Get(fmt.Sprintf("http://localhost:%d/nodes?nonvoters&ver=2&timeout=5s", port))
if err != nil {
return fmt.Errorf("failed to query nodes: %w", err)
return nil, fmt.Errorf("failed to query nodes: %w", err)
}
defer nodesResp.Body.Close()
nodesBody, err := io.ReadAll(nodesResp.Body)
defer resp.Body.Close()
nodesBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read nodes: %w", err)
return nil, fmt.Errorf("failed to read nodes: %w", err)
}
// Try ver=2 wrapped format, fall back to plain array
// Try ver=2 wrapped format, fall back to plain array.
var nodes RQLiteNodes
var wrapped struct {
Nodes RQLiteNodes `json:"nodes"`
@ -66,8 +52,28 @@ func TransferLeadership(port int, logger *zap.Logger) error {
} else {
_ = json.Unmarshal(nodesBody, &nodes)
}
return nodes, nil
}
// Find a reachable voter that is NOT us
// TransferLeadership attempts to transfer Raft leadership to another voter.
// Used by both the RQLiteManager (on Stop) and the CLI (pre-upgrade).
// Returns nil if this node is not the leader or if transfer succeeds.
func TransferLeadership(port int, logger *zap.Logger) error {
status, err := GetRaftStatus(port)
if err != nil {
return err
}
if status.Store.Raft.State != "Leader" {
logger.Debug("Not the leader, skipping transfer", zap.Int("port", port))
return nil
}
nodes, err := GetRaftNodes(port)
if err != nil {
return err
}
// Find any reachable voter that is NOT us.
var targetID string
for _, n := range nodes {
if n.Voter && n.Reachable && n.ID != status.Store.Raft.LeaderID {
@ -75,57 +81,55 @@ func TransferLeadership(port int, logger *zap.Logger) error {
break
}
}
if targetID == "" {
logger.Warn("No eligible voter found for leadership transfer — will rely on SIGTERM graceful step-down",
zap.Int("port", port))
return nil
}
return TransferLeadershipTo(port, targetID, logger)
}
// TransferLeadershipTo transfers Raft leadership to a SPECIFIC target node ID
// (its raft address). The caller is responsible for confirming this node is the
// leader and that targetID is an eligible voter. Tolerant of a missing API
// (404) and a non-OK status — it logs and returns nil so callers treat transfer
// as best-effort.
func TransferLeadershipTo(port int, targetID string, logger *zap.Logger) error {
client := &http.Client{Timeout: 5 * time.Second}
logger.Info("Attempting Raft leadership transfer",
zap.Int("port", port), zap.String("target", targetID))
// 3. Attempt transfer via rqlite v8+ API
// POST /nodes/<target>/transfer-leadership
// If the API doesn't exist (404), fall back to relying on SIGTERM.
transferURL := fmt.Sprintf("http://localhost:%d/nodes/%s/transfer-leadership", port, targetID)
transferResp, err := client.Post(transferURL, "application/json", nil)
if err != nil {
logger.Warn("Leadership transfer request failed, relying on SIGTERM",
zap.Error(err))
logger.Warn("Leadership transfer request failed", zap.Error(err))
return nil
}
transferResp.Body.Close()
if transferResp.StatusCode == http.StatusNotFound {
logger.Info("Leadership transfer API not available (rqlite version), relying on SIGTERM")
switch {
case transferResp.StatusCode == http.StatusNotFound:
logger.Info("Leadership transfer API not available (rqlite version)")
return nil
}
if transferResp.StatusCode != http.StatusOK {
case transferResp.StatusCode != http.StatusOK:
logger.Warn("Leadership transfer returned unexpected status",
zap.Int("status", transferResp.StatusCode))
return nil
}
// 4. Verify transfer
// Verify.
time.Sleep(2 * time.Second)
verifyResp, err := client.Get(statusURL)
newStatus, err := GetRaftStatus(port)
if err != nil {
logger.Info("Could not verify transfer (node may have already stepped down)")
return nil
}
defer verifyResp.Body.Close()
verifyBody, _ := io.ReadAll(verifyResp.Body)
var newStatus RQLiteStatus
if err := json.Unmarshal(verifyBody, &newStatus); err == nil {
if newStatus.Store.Raft.State != "Leader" {
logger.Info("Leadership transferred successfully",
zap.String("new_leader", newStatus.Store.Raft.LeaderID),
zap.Int("port", port))
} else {
logger.Warn("Still leader after transfer attempt — will rely on SIGTERM",
zap.Int("port", port))
}
if newStatus.Store.Raft.State != "Leader" {
logger.Info("Leadership transferred successfully",
zap.String("new_leader", newStatus.Store.Raft.LeaderID), zap.Int("port", port))
} else {
logger.Warn("Still leader after transfer attempt", zap.Int("port", port))
}
return nil
}

View File

@ -118,16 +118,16 @@ func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeRespon
// #264). The auth boundary for system triggers is at REGISTRATION
// time (HTTP `POST /v1/functions/{name}/triggers`, or deploy-time
// auto-register from function.yaml), not at firing time.
if !isSystemTrigger(req.TriggerType) {
authorized, err := i.CanInvoke(ctx, req.Namespace, req.FunctionName, req.CallerWallet)
if err != nil || !authorized {
return &InvokeResponse{
RequestID: requestID,
Status: InvocationStatusError,
Error: "unauthorized",
DurationMS: time.Since(startTime).Milliseconds(),
}, ErrUnauthorized
}
if !isSystemTrigger(req.TriggerType) && !canInvokeFn(fn, req.CallerWallet) {
// Authorization uses the function we already fetched above —
// CanInvoke would re-`registry.Get` it, a redundant leader-routed
// read on every op (bugboard #708).
return &InvokeResponse{
RequestID: requestID,
Status: InvocationStatusError,
Error: "unauthorized",
DurationMS: time.Since(startTime).Milliseconds(),
}, ErrUnauthorized
}
// Get environment variables
@ -504,20 +504,19 @@ func (i *Invoker) CanInvoke(ctx context.Context, namespace, functionName string,
if err != nil {
return false, err
}
return canInvokeFn(fn, callerWallet), nil
}
// Public functions can be invoked by anyone (auth middleware allows
// the request through without credentials).
// canInvokeFn is the pure authorization decision for an already-fetched
// function, so the hot Invoke path doesn't re-read the registry (bugboard
// #708). Public functions are open; a private function only requires that the
// caller has SOME identity — the auth middleware already verified namespace
// membership before the function ran.
func canInvokeFn(fn *Function, callerWallet string) bool {
if fn.IsPublic {
return true, nil
return true
}
// Private function: require an authenticated caller. The auth
// middleware has already verified the caller belongs to this
// namespace (either via JWT `namespace` claim or via API-key
// namespace lookup) before this function ever runs, so the only
// thing we need to confirm here is that the caller has SOME
// identity at all (i.e. the request wasn't anonymous).
return strings.TrimSpace(callerWallet) != "", nil
return strings.TrimSpace(callerWallet) != ""
}
// GetFunctionInfo returns basic info about a function for invocation.

View File

@ -6,7 +6,9 @@ import (
"database/sql"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
"github.com/DeBrosOfficial/network/pkg/ipfs"
@ -15,6 +17,27 @@ import (
"go.uber.org/zap"
)
// registryCacheTTL bounds how long function metadata + env vars are cached
// in-process before re-reading rqlite. Bugboard #708: every function_invoke
// previously did 3 uncached `weak` reads (Get, a redundant Get inside
// CanInvoke, and GetEnvVars), each forwarded to the raft leader — ~820ms of
// pure pre-flight tax per op when the leader is a distant node. With a short
// TTL + explicit invalidation on deploy/enable/disable/delete, a burst of RPCs
// (e.g. a call setup) reads metadata once instead of N times. The TTL is a
// backstop; correctness comes from the explicit invalidation, so cross-node
// propagation of a deploy/disable is bounded to this TTL.
const registryCacheTTL = 5 * time.Second
type fnCacheEntry struct {
fn *Function
at time.Time
}
type envCacheEntry struct {
env map[string]string
at time.Time
}
// Ensure Registry implements FunctionRegistry and InvocationLogger interfaces.
var _ FunctionRegistry = (*Registry)(nil)
var _ InvocationLogger = (*Registry)(nil)
@ -27,6 +50,12 @@ type Registry struct {
ipfsAPIURL string
logger *zap.Logger
tableName string
// Metadata cache (bugboard #708) — see registryCacheTTL.
cacheTTL time.Duration
cacheMu sync.RWMutex
fnCache map[string]fnCacheEntry // key: namespace\x00name\x00version
envCache map[string]envCacheEntry // key: functionID
}
// RegistryConfig holds configuration for the Registry.
@ -42,9 +71,78 @@ func NewRegistry(db rqlite.Client, ipfsClient ipfs.IPFSClient, cfg RegistryConfi
ipfsAPIURL: cfg.IPFSAPIURL,
logger: logger,
tableName: "functions",
cacheTTL: registryCacheTTL,
fnCache: make(map[string]fnCacheEntry),
envCache: make(map[string]envCacheEntry),
}
}
// --- metadata cache (bugboard #708) ---
//
// The cached *Function and env map are SHARED with all callers and MUST be
// treated as read-only — no consumer in pkg/serverless mutates them today, and
// none may, or it would corrupt the cache for concurrent readers.
func fnCacheKey(namespace, name string, version int) string {
return namespace + "\x00" + name + "\x00" + strconv.Itoa(version)
}
func (r *Registry) cachedFn(key string) (*Function, bool) {
r.cacheMu.RLock()
e, ok := r.fnCache[key]
r.cacheMu.RUnlock()
if !ok || time.Since(e.at) > r.cacheTTL {
return nil, false
}
return e.fn, true
}
func (r *Registry) storeFn(key string, fn *Function) {
r.cacheMu.Lock()
r.fnCache[key] = fnCacheEntry{fn: fn, at: time.Now()}
r.cacheMu.Unlock()
}
func (r *Registry) cachedEnv(functionID string) (map[string]string, bool) {
r.cacheMu.RLock()
e, ok := r.envCache[functionID]
r.cacheMu.RUnlock()
if !ok || time.Since(e.at) > r.cacheTTL {
return nil, false
}
return e.env, true
}
func (r *Registry) storeEnv(functionID string, env map[string]string) {
r.cacheMu.Lock()
r.envCache[functionID] = envCacheEntry{env: env, at: time.Now()}
r.cacheMu.Unlock()
}
// invalidateFn drops every cached version of (namespace, name). Called on
// deploy/enable/disable/delete so a metadata change is never masked by the
// cache beyond the write itself.
func (r *Registry) invalidateFn(namespace, name string) {
prefix := strings.TrimSpace(namespace) + "\x00" + strings.TrimSpace(name) + "\x00"
r.cacheMu.Lock()
for k := range r.fnCache {
if strings.HasPrefix(k, prefix) {
delete(r.fnCache, k)
}
}
r.cacheMu.Unlock()
}
// invalidateEnv drops the cached env vars for a function ID. A redeploy REUSES
// the existing function ID (Register: id = oldFn.ID) and rewrites env vars
// under it, so without this an env-var change would be masked by the cache for
// up to the TTL.
func (r *Registry) invalidateEnv(functionID string) {
r.cacheMu.Lock()
delete(r.envCache, functionID)
r.cacheMu.Unlock()
}
// Register deploys a new function or updates an existing one.
func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error) {
if fn == nil {
@ -128,6 +226,9 @@ func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmByt
return nil, &DeployError{FunctionName: fn.Name, Cause: err}
}
r.invalidateFn(fn.Namespace, fn.Name)
r.invalidateEnv(id)
r.logger.Info("Function registered",
zap.String("id", id),
zap.String("name", fn.Name),
@ -146,6 +247,12 @@ func (r *Registry) Get(ctx context.Context, namespace, name string, version int)
namespace = strings.TrimSpace(namespace)
name = strings.TrimSpace(name)
// Cache hit (bugboard #708): skip the leader-routed weak read entirely.
cacheKey := fnCacheKey(namespace, name, version)
if fn, ok := r.cachedFn(cacheKey); ok {
return fn, nil
}
var query string
var args []interface{}
@ -184,13 +291,17 @@ func (r *Registry) Get(ctx context.Context, namespace, name string, version int)
}
if len(functions) == 0 {
// Do NOT cache misses — a just-deployed function must be visible
// immediately on the next call, not after the TTL.
if version == 0 {
return nil, ErrFunctionNotFound
}
return nil, ErrVersionNotFound
}
return r.rowToFunction(&functions[0]), nil
fn := r.rowToFunction(&functions[0])
r.storeFn(cacheKey, fn)
return fn, nil
}
// List returns all functions for a namespace.
@ -252,6 +363,7 @@ func (r *Registry) SetEnabled(ctx context.Context, namespace, name string, enabl
if rowsAffected == 0 {
return ErrFunctionNotFound
}
r.invalidateFn(namespace, name)
r.logger.Info("Function enabled-state updated",
zap.String("namespace", namespace),
zap.String("name", name),
@ -290,6 +402,8 @@ func (r *Registry) Delete(ctx context.Context, namespace, name string, version i
return ErrVersionNotFound
}
r.invalidateFn(namespace, name)
r.logger.Info("Function deleted",
zap.String("namespace", namespace),
zap.String("name", name),
@ -321,6 +435,10 @@ func (r *Registry) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, er
// GetEnvVars retrieves environment variables for a function.
func (r *Registry) GetEnvVars(ctx context.Context, functionID string) (map[string]string, error) {
if env, ok := r.cachedEnv(functionID); ok {
return env, nil
}
query := `SELECT key, value FROM function_env_vars WHERE function_id = ?`
var rows []envVarRow
@ -333,6 +451,7 @@ func (r *Registry) GetEnvVars(ctx context.Context, functionID string) (map[strin
envVars[row.Key] = row.Value
}
r.storeEnv(functionID, envVars)
return envVars, nil
}

View File

@ -0,0 +1,118 @@
package serverless
import (
"testing"
"time"
"go.uber.org/zap"
)
// Bugboard #708 — function metadata + env vars are cached in-process so a burst
// of invokes doesn't pay a leader-routed weak read per op. These pin the cache
// hit/miss/TTL/invalidation behavior and the dedup'd authorization decision.
func newTestRegistry() *Registry {
return NewRegistry(NewMockRQLite(), NewMockIPFSClient(), RegistryConfig{}, zap.NewNop())
}
func TestRegistryCache_hitAndInvalidate(t *testing.T) {
r := newTestRegistry()
key := fnCacheKey("ns", "fn", 0)
fn := &Function{ID: "id-1", Name: "fn", Namespace: "ns"}
if _, ok := r.cachedFn(key); ok {
t.Fatal("empty cache must miss")
}
r.storeFn(key, fn)
got, ok := r.cachedFn(key)
if !ok || got != fn {
t.Fatalf("expected cache hit returning the stored fn; ok=%v got=%v", ok, got)
}
// Deploy/enable/disable/delete must drop every cached version.
r.storeFn(fnCacheKey("ns", "fn", 3), &Function{ID: "id-3", Name: "fn", Namespace: "ns"})
r.invalidateFn("ns", "fn")
if _, ok := r.cachedFn(key); ok {
t.Error("invalidateFn must drop the version-0 entry")
}
if _, ok := r.cachedFn(fnCacheKey("ns", "fn", 3)); ok {
t.Error("invalidateFn must drop ALL versions of the function")
}
}
func TestRegistryCache_invalidateScopedToFunction(t *testing.T) {
r := newTestRegistry()
r.storeFn(fnCacheKey("ns", "keep", 0), &Function{ID: "k", Name: "keep", Namespace: "ns"})
r.storeFn(fnCacheKey("ns", "drop", 0), &Function{ID: "d", Name: "drop", Namespace: "ns"})
r.invalidateFn("ns", "drop")
if _, ok := r.cachedFn(fnCacheKey("ns", "drop", 0)); ok {
t.Error("target function must be invalidated")
}
if _, ok := r.cachedFn(fnCacheKey("ns", "keep", 0)); !ok {
t.Error("a DIFFERENT function must NOT be invalidated (prefix must include the null separator)")
}
}
func TestRegistryCache_ttlExpiry(t *testing.T) {
r := newTestRegistry()
key := fnCacheKey("ns", "fn", 0)
// Backdate the entry beyond the TTL.
r.fnCache[key] = fnCacheEntry{fn: &Function{ID: "x"}, at: time.Now().Add(-2 * r.cacheTTL)}
if _, ok := r.cachedFn(key); ok {
t.Error("an entry older than the TTL must be treated as a miss")
}
}
func TestRegistryCache_envHitAndTTL(t *testing.T) {
r := newTestRegistry()
if _, ok := r.cachedEnv("fid"); ok {
t.Fatal("empty env cache must miss")
}
r.storeEnv("fid", map[string]string{"K": "V"})
if env, ok := r.cachedEnv("fid"); !ok || env["K"] != "V" {
t.Fatalf("expected env cache hit; ok=%v env=%v", ok, env)
}
r.envCache["fid"] = envCacheEntry{env: map[string]string{"K": "V"}, at: time.Now().Add(-2 * r.cacheTTL)}
if _, ok := r.cachedEnv("fid"); ok {
t.Error("env entry older than the TTL must miss")
}
}
func TestRegistryCache_envInvalidatedOnRedeploy(t *testing.T) {
// A redeploy REUSES the function ID (Register: id = oldFn.ID) and rewrites
// env vars under it, so Register must drop the env cache for that ID — else
// a changed env var (e.g. a rotated endpoint) is masked for up to the TTL.
r := newTestRegistry()
r.storeEnv("fid", map[string]string{"K": "old"})
if env, ok := r.cachedEnv("fid"); !ok || env["K"] != "old" {
t.Fatal("precondition: env should be cached")
}
r.invalidateEnv("fid") // what Register now calls
if _, ok := r.cachedEnv("fid"); ok {
t.Error("env cache must be invalidated on redeploy (reused ID); a changed env var must not be served stale")
}
}
func TestRegistryCache_keyDistinctNoCollision(t *testing.T) {
// Guard the null-separated key: "a"+"bc" must not collide with "ab"+"c".
if fnCacheKey("a", "bc", 0) == fnCacheKey("ab", "c", 0) {
t.Error("cache keys must not collide across namespace/name boundaries")
}
}
func TestCanInvokeFn(t *testing.T) {
if !canInvokeFn(&Function{IsPublic: true}, "") {
t.Error("public function must be invokable by an anonymous caller")
}
if canInvokeFn(&Function{IsPublic: false}, "") {
t.Error("private function must reject an empty (anonymous) caller")
}
if canInvokeFn(&Function{IsPublic: false}, " ") {
t.Error("private function must reject a whitespace-only caller")
}
if !canInvokeFn(&Function{IsPublic: false}, "wallet-abc") {
t.Error("private function must accept an identified caller")
}
}