From e2bc9577ff50eb71b253a2dd1ead84f3a44daec8 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Thu, 21 May 2026 15:52:46 +0300 Subject: [PATCH] feat(serverless): isolate invocation logs and enforce cron poll interval - Fix log cross-contamination by introducing per-invocation LogBuffers (bugboard #108) - Enforce a 100ms minimum for CronPollInterval to prevent scheduler starvation (bugboard #109) - Add comprehensive validation tests for cron interval constraints --- core/pkg/serverless/config.go | 44 +++- .../serverless/config_cron_interval_test.go | 109 ++++++++++ core/pkg/serverless/engine.go | 36 +++- core/pkg/serverless/hostfunctions/logging.go | 41 ++-- .../hostfunctions/logging_buffer_test.go | 140 +++++++++++++ core/pkg/serverless/log_buffer.go | 96 +++++++++ core/pkg/serverless/log_buffer_test.go | 190 ++++++++++++++++++ core/pkg/serverless/persistent/instance.go | 14 +- .../pkg/serverless/triggers/cron_scheduler.go | 12 ++ .../triggers/cron_subsecond_test.go | 80 ++++++++ 10 files changed, 740 insertions(+), 22 deletions(-) create mode 100644 core/pkg/serverless/config_cron_interval_test.go create mode 100644 core/pkg/serverless/hostfunctions/logging_buffer_test.go create mode 100644 core/pkg/serverless/log_buffer.go create mode 100644 core/pkg/serverless/log_buffer_test.go create mode 100644 core/pkg/serverless/triggers/cron_subsecond_test.go diff --git a/core/pkg/serverless/config.go b/core/pkg/serverless/config.go index 3417eb6..2ac1c8f 100644 --- a/core/pkg/serverless/config.go +++ b/core/pkg/serverless/config.go @@ -1,6 +1,7 @@ package serverless import ( + "fmt" "time" ) @@ -28,7 +29,22 @@ type Config struct { JobMaxQueueSize int `yaml:"job_max_queue_size"` JobMaxPayloadSize int `yaml:"job_max_payload_size"` // bytes - // Scheduler configuration + // Scheduler configuration. + // + // CronPollInterval is the cadence at which the cron scheduler scans + // `function_cron_triggers` for due rows. Lower = finer dispatch + // granularity (useful for sub-second cron expressions like + // `*/1 * * * * *` — the 6-field grammar accepted by ParseCron), + // higher = less rqlite/CPU spend. + // + // Hard floor: MinCronPollInterval (rejected at Validate). Below the + // floor the scheduler can't keep up — each tick costs ~1 rqlite + // ListDue + N MarkRun writes, ~340-450ms per call on a + // cross-region anchat-test-style cluster. Polling faster than the + // per-tick cost queues ticks indefinitely and starves the namespace. + // + // Default: 1 minute. Set to 1s for typing/presence-style ephemeral + // state prune workloads (bugboard #109). CronPollInterval time.Duration `yaml:"cron_poll_interval"` TimerPollInterval time.Duration `yaml:"timer_poll_interval"` DBPollInterval time.Duration `yaml:"db_poll_interval"` @@ -48,6 +64,21 @@ type Config struct { LogRetention int `yaml:"log_retention"` // Days to retain logs } +// MinCronPollInterval is the hard floor on CronPollInterval. Below +// this the cron scheduler can't keep up with itself — each tick costs +// at minimum one rqlite ListDue (a network round-trip + query), so +// polling much faster than the per-tick cost would queue ticks +// indefinitely and starve the namespace gateway. 100ms is generous +// (it allows ~10 ticks/sec) while still preventing the runaway +// configuration that would cripple the gateway. +// +// Operators wanting sub-second cron dispatch (e.g. typing/presence +// ephemeral state prune jobs per bugboard #109) should set 1s — this +// gives comfortable headroom over per-tick rqlite latency even on +// cross-region clusters and allows 6-field cron expressions like +// `*/1 * * * * *` to fire on every-second cadence. +const MinCronPollInterval = 100 * time.Millisecond + // DefaultConfig returns a configuration with sensible defaults. func DefaultConfig() *Config { return &Config{ @@ -116,6 +147,17 @@ func (c *Config) Validate() []error { if c.ModuleCacheSize <= 0 { errs = append(errs, &ConfigError{Field: "ModuleCacheSize", Message: "must be positive"}) } + // CronPollInterval floor — see MinCronPollInterval doc. Zero means + // "use the default" (ApplyDefaults handles it); a non-zero value + // below the floor would silently let the operator paint themselves + // into a runaway-scheduler corner. + if c.CronPollInterval != 0 && c.CronPollInterval < MinCronPollInterval { + errs = append(errs, &ConfigError{ + Field: "CronPollInterval", + Message: fmt.Sprintf("must be >= %s (current=%s); see bugboard #109 — below this the scheduler can't keep up with per-tick rqlite cost and queues ticks indefinitely", + MinCronPollInterval, c.CronPollInterval), + }) + } return errs } diff --git a/core/pkg/serverless/config_cron_interval_test.go b/core/pkg/serverless/config_cron_interval_test.go new file mode 100644 index 0000000..d0cb7a0 --- /dev/null +++ b/core/pkg/serverless/config_cron_interval_test.go @@ -0,0 +1,109 @@ +package serverless + +import ( + "strings" + "testing" + "time" +) + +// TestConfig_Validate_CronPollIntervalFloor is the regression guard for +// the bugboard #109 floor. The original ask was sub-second cron polling +// for typing/presence prune workloads. We allow sub-second down to the +// MinCronPollInterval floor (100ms), and reject anything below it +// because the per-tick rqlite cost would queue ticks indefinitely and +// starve the namespace gateway. +func TestConfig_Validate_CronPollIntervalFloor(t *testing.T) { + cases := []struct { + name string + interval time.Duration + wantReject bool + }{ + {"zero means use default (no error)", 0, false}, + {"1 minute (legacy default) — fine", time.Minute, false}, + {"1 second — sub-second OK", time.Second, false}, + {"500ms — sub-second OK", 500 * time.Millisecond, false}, + {"exactly the floor (100ms) — OK", MinCronPollInterval, false}, + {"50ms — below floor, REJECT", 50 * time.Millisecond, true}, + {"1ms — well below floor, REJECT", 1 * time.Millisecond, true}, + {"-1s (operator typo) — REJECT", -time.Second, true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + c := DefaultConfig() + c.CronPollInterval = tc.interval + + errs := c.Validate() + gotReject := false + for _, err := range errs { + if ce, ok := err.(*ConfigError); ok && ce.Field == "CronPollInterval" { + gotReject = true + } + } + if gotReject != tc.wantReject { + t.Errorf("interval=%v: reject=%v; want reject=%v (errs=%v)", + tc.interval, gotReject, tc.wantReject, errs) + } + }) + } +} + +// TestConfig_Validate_CronPollIntervalErrorMessage verifies the +// rejection error carries the operator-facing detail (current value, +// min value, bugboard reference). Without this, an operator misconfiguring +// `cron_poll_interval: 10ms` gets an opaque "invalid config" error and +// has to grep code to figure out why. +func TestConfig_Validate_CronPollIntervalErrorMessage(t *testing.T) { + c := DefaultConfig() + c.CronPollInterval = 10 * time.Millisecond + + errs := c.Validate() + if len(errs) == 0 { + t.Fatal("expected validation error for sub-floor CronPollInterval") + } + var found *ConfigError + for _, err := range errs { + if ce, ok := err.(*ConfigError); ok && ce.Field == "CronPollInterval" { + found = ce + break + } + } + if found == nil { + t.Fatalf("no CronPollInterval ConfigError in %v", errs) + } + for _, want := range []string{ + MinCronPollInterval.String(), // floor + "10ms", // current value + "#109", // bugboard reference + } { + if !strings.Contains(found.Message, want) { + t.Errorf("error message missing %q: %s", want, found.Message) + } + } +} + +// TestConfig_ApplyDefaults_FillsInCronPollInterval verifies the default +// is applied when the field is zero. Regression guard against a future +// refactor that accidentally drops the zero-check. +func TestConfig_ApplyDefaults_FillsInCronPollInterval(t *testing.T) { + c := &Config{} + c.ApplyDefaults() + if c.CronPollInterval != time.Minute { + t.Errorf("ApplyDefaults: CronPollInterval = %v; want %v", + c.CronPollInterval, time.Minute) + } +} + +// TestMinCronPollInterval_Reasonable is a guard rail on the constant +// itself. If a future contributor sets it too high (blocks legit +// typing/presence workloads) or too low (lets DoS through), this +// catches it. +func TestMinCronPollInterval_Reasonable(t *testing.T) { + if MinCronPollInterval > time.Second { + t.Errorf("MinCronPollInterval=%v is too high — blocks legit sub-second prune workloads (bugboard #109)", + MinCronPollInterval) + } + if MinCronPollInterval < time.Millisecond { + t.Errorf("MinCronPollInterval=%v is too low — opens scheduler DoS surface", + MinCronPollInterval) + } +} diff --git a/core/pkg/serverless/engine.go b/core/pkg/serverless/engine.go index f81f742..12a179f 100644 --- a/core/pkg/serverless/engine.go +++ b/core/pkg/serverless/engine.go @@ -261,10 +261,20 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx execCtx, cancel := CreateTimeoutContext(ctx, fn, e.config.MaxTimeoutSeconds) defer cancel() + // Attach a fresh per-invocation LogBuffer to the ctx that wazero + // passes through to host-fn callbacks. host.LogInfo / host.LogError + // extract this buffer and append to it instead of writing to the + // HostFunctions singleton slice — which would cross-contaminate + // concurrent invocations (bugboard #108: push-fanout's invocation + // record was capturing rpc-router and message-push-handler log + // lines because every WASM call shared one h.logs slice). + logBuf := NewLogBuffer() + execCtx = WithLogBuffer(execCtx, logBuf) + // Get compiled module (from cache or compile) module, err := e.getOrCompileModule(execCtx, fn.WASMCID) if err != nil { - e.logInvocation(ctx, fn, invCtx, startTime, 0, InvocationStatusError, err) + e.logInvocation(ctx, fn, invCtx, logBuf, startTime, 0, InvocationStatusError, err) return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err} } @@ -281,11 +291,11 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx status = InvocationStatusTimeout err = ErrTimeout } - e.logInvocation(ctx, fn, invCtx, startTime, len(output), status, err) + e.logInvocation(ctx, fn, invCtx, logBuf, startTime, len(output), status, err) return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err} } - e.logInvocation(ctx, fn, invCtx, startTime, len(output), InvocationStatusSuccess, nil) + e.logInvocation(ctx, fn, invCtx, logBuf, startTime, len(output), InvocationStatusSuccess, nil) return output, nil } @@ -540,7 +550,14 @@ func (e *Engine) getOrCompileModule(ctx context.Context, wasmCID string) (wazero } // logInvocation logs an invocation record. -func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *InvocationContext, startTime time.Time, outputSize int, status InvocationStatus, err error) { +// +// `logBuf` is the per-invocation LogBuffer attached to ctx at Execute +// start (bugboard #108 fix). When non-nil, the record's Logs field is +// populated from the buffer's snapshot — invocation-local, no +// cross-contamination. When nil (legacy callers that haven't been +// updated), falls back to the HostFunctions singleton via the +// GetLogs() interface check — same behavior as pre-#108. +func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *InvocationContext, logBuf *LogBuffer, startTime time.Time, outputSize int, status InvocationStatus, err error) { if e.invocationLogger == nil || !e.config.LogInvocations { return } @@ -563,8 +580,15 @@ func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *Invoca record.ErrorMessage = err.Error() } - // Collect logs from host services if supported - if hf, ok := e.hostServices.(interface{ GetLogs() []LogEntry }); ok { + // Collect logs: prefer the per-invocation LogBuffer (bugboard #108), + // fall back to the legacy singleton for callers that haven't been + // migrated yet. The singleton path was the source of the + // cross-contamination bug; once every Execute path passes a real + // buffer here, the GetLogs() singleton read is dead code that + // can be removed in a future cleanup. + if logBuf != nil { + record.Logs = logBuf.Snapshot() + } else if hf, ok := e.hostServices.(interface{ GetLogs() []LogEntry }); ok { record.Logs = hf.GetLogs() } diff --git a/core/pkg/serverless/hostfunctions/logging.go b/core/pkg/serverless/hostfunctions/logging.go index b66f29e..f3cfe9d 100644 --- a/core/pkg/serverless/hostfunctions/logging.go +++ b/core/pkg/serverless/hostfunctions/logging.go @@ -9,16 +9,27 @@ import ( "go.uber.org/zap" ) -// LogInfo logs an info message. +// LogInfo logs an info message. Writes to the per-invocation LogBuffer +// attached to ctx (see log_buffer.go); falls back to the legacy +// HostFunctions singleton slice when no buffer is on ctx (test paths +// that haven't migrated). +// +// Bugboard #108 fix: previously this always wrote to the singleton +// `h.logs`, causing cross-contamination between concurrent invocations +// (push-fanout's invocation record captured rpc-router's log lines). func (h *HostFunctions) LogInfo(ctx context.Context, message string) { - h.logsLock.Lock() - defer h.logsLock.Unlock() - - h.logs = append(h.logs, serverless.LogEntry{ + entry := serverless.LogEntry{ Level: "info", Message: message, Timestamp: time.Now(), - }) + } + if buf := serverless.LogBufferFromCtx(ctx); buf != nil { + buf.Append(entry) + } else { + h.logsLock.Lock() + h.logs = append(h.logs, entry) + h.logsLock.Unlock() + } h.logger.Info(message, zap.String("request_id", h.GetRequestID(ctx)), @@ -26,16 +37,22 @@ func (h *HostFunctions) LogInfo(ctx context.Context, message string) { ) } -// LogError logs an error message. +// LogError logs an error message. See LogInfo for the per-invocation +// LogBuffer / singleton fallback semantics — same code path, same +// bugboard #108 rationale. func (h *HostFunctions) LogError(ctx context.Context, message string) { - h.logsLock.Lock() - defer h.logsLock.Unlock() - - h.logs = append(h.logs, serverless.LogEntry{ + entry := serverless.LogEntry{ Level: "error", Message: message, Timestamp: time.Now(), - }) + } + if buf := serverless.LogBufferFromCtx(ctx); buf != nil { + buf.Append(entry) + } else { + h.logsLock.Lock() + h.logs = append(h.logs, entry) + h.logsLock.Unlock() + } h.logger.Error(message, zap.String("request_id", h.GetRequestID(ctx)), diff --git a/core/pkg/serverless/hostfunctions/logging_buffer_test.go b/core/pkg/serverless/hostfunctions/logging_buffer_test.go new file mode 100644 index 0000000..fc5c052 --- /dev/null +++ b/core/pkg/serverless/hostfunctions/logging_buffer_test.go @@ -0,0 +1,140 @@ +package hostfunctions + +import ( + "context" + "sync" + "sync/atomic" + "testing" + + "github.com/DeBrosOfficial/network/pkg/serverless" + "go.uber.org/zap" +) + +// TestLogInfo_writesToCtxBuffer is the regression guard for bugboard +// #108. When the caller attaches a per-invocation LogBuffer to ctx, +// LogInfo MUST write to that buffer (not to the singleton h.logs). +// +// Pre-fix, LogInfo always wrote to h.logs, causing cross-contamination +// between concurrent invocations. +func TestLogInfo_writesToCtxBuffer(t *testing.T) { + h := &HostFunctions{logger: zap.NewNop()} + buf := serverless.NewLogBuffer() + ctx := serverless.WithLogBuffer(context.Background(), buf) + + h.LogInfo(ctx, "hello from invocation A") + h.LogError(ctx, "boom from invocation A") + + snap := buf.Snapshot() + if len(snap) != 2 { + t.Fatalf("ctx buffer len = %d; want 2", len(snap)) + } + if snap[0].Level != "info" || snap[0].Message != "hello from invocation A" { + t.Errorf("info entry wrong: %+v", snap[0]) + } + if snap[1].Level != "error" || snap[1].Message != "boom from invocation A" { + t.Errorf("error entry wrong: %+v", snap[1]) + } + + // The singleton must NOT have been touched. + if len(h.logs) != 0 { + t.Errorf("singleton h.logs got %d entries; want 0 (ctx buffer should have absorbed them)", + len(h.logs)) + } +} + +// TestLogInfo_fallsBackToSingletonWhenNoBuffer preserves the legacy +// behavior for callers (tests, mostly) that haven't migrated to the +// ctx-attached buffer path yet. Without this fallback, every test that +// constructed a HostFunctions directly and called LogInfo without +// wrapping ctx would silently lose log entries. +func TestLogInfo_fallsBackToSingletonWhenNoBuffer(t *testing.T) { + h := &HostFunctions{logger: zap.NewNop()} + // No buffer attached to ctx. + h.LogInfo(context.Background(), "legacy call") + h.LogError(context.Background(), "legacy error") + + if len(h.logs) != 2 { + t.Errorf("singleton h.logs got %d entries; want 2 (legacy fallback)", len(h.logs)) + } +} + +// TestLogInfo_concurrentInvocations_noCrossContamination is THE +// regression guard for bugboard #108's empirically-observed symptom: +// push-fanout's invocation record contained log lines from rpc-router +// because both shared the singleton h.logs slice. +// +// Sixteen goroutines simulating concurrent invocations each attach +// their own LogBuffer to ctx, then write distinguishable entries via +// HostFunctions.LogInfo. After all goroutines complete, each buffer +// must contain ONLY its own entries — zero cross-talk. +// +// Run with -race for stronger signal. Pre-fix (singleton h.logs), every +// goroutine wrote into the shared slice and a different goroutine's +// GetLogs() snapshot would scoop them up. +func TestLogInfo_concurrentInvocations_noCrossContamination(t *testing.T) { + h := &HostFunctions{logger: zap.NewNop()} + + const ( + goroutines = 16 + opsPerG = 50 + ) + var ( + wg sync.WaitGroup + failures int64 + ) + for g := 0; g < goroutines; g++ { + wg.Add(1) + go func(gid int) { + defer wg.Done() + buf := serverless.NewLogBuffer() + ctx := serverless.WithLogBuffer(context.Background(), buf) + myMarker := workloadMarker(gid) + + for op := 0; op < opsPerG; op++ { + h.LogInfo(ctx, myMarker) + } + + snap := buf.Snapshot() + if len(snap) != opsPerG { + atomic.AddInt64(&failures, 1) + t.Errorf("goroutine %d: snapshot len = %d; want %d", gid, len(snap), opsPerG) + return + } + for _, e := range snap { + if e.Message != myMarker { + atomic.AddInt64(&failures, 1) + t.Errorf("goroutine %d: foreign entry %q in own buffer", gid, e.Message) + return + } + } + }(g) + } + wg.Wait() + + if atomic.LoadInt64(&failures) != 0 { + t.Fatalf("%d cross-contamination failures across %d concurrent invocations", + atomic.LoadInt64(&failures), goroutines) + } + + // Singleton must NOT have grown — every write went to a ctx buffer. + if len(h.logs) != 0 { + t.Errorf("singleton h.logs got %d entries; want 0 (all should have gone to ctx buffers)", + len(h.logs)) + } +} + +func workloadMarker(g int) string { + return "workload-" + itoaHF(g) +} + +func itoaHF(n int) string { + if n == 0 { + return "0" + } + digits := []byte{} + for n > 0 { + digits = append([]byte{byte('0' + n%10)}, digits...) + n /= 10 + } + return string(digits) +} diff --git a/core/pkg/serverless/log_buffer.go b/core/pkg/serverless/log_buffer.go new file mode 100644 index 0000000..33eef9d --- /dev/null +++ b/core/pkg/serverless/log_buffer.go @@ -0,0 +1,96 @@ +package serverless + +import ( + "context" + "sync" +) + +// logBufferKey is the unexported context-value key used to attach a +// per-invocation LogBuffer. Empty struct = standard Go pattern for ctx +// keys (avoids string-collision risk). Parallels invCtxKey used by +// WithInvocationContext — both fix the same class of singleton-state +// cross-contamination bug. +type logBufferKey struct{} + +// LogBuffer collects WASM-emitted log entries (oh.LogInfo / oh.LogError) +// for ONE invocation. Each Engine.Execute creates a fresh LogBuffer and +// attaches it to the ctx passed to wazero; host functions extract it +// from ctx and append. Engine.logInvocation reads the buffer's snapshot +// when writing the invocation record. +// +// Why this exists: HostFunctions used to hold a singleton `logs` slice +// shared across every concurrent WASM invocation, with a per-call reset +// in SetInvocationContext. Two invocations executing concurrently would +// see each other's logs scooped up by whichever called GetLogs() first +// — empirically observed on bugboard #108 (push-fanout's invocation +// record contained rpc-router and message-push-handler log lines). +// +// The fix attaches a fresh LogBuffer to ctx per invocation. HostFunctions. +// LogInfo / LogError read the buffer from ctx and append to its +// invocation-local slice. The singleton h.logs field is kept as a +// back-compat fallback for tests that haven't been migrated, but no +// production code path relies on it once Engine.Execute is routing +// through the ctx buffer. +type LogBuffer struct { + mu sync.Mutex + entries []LogEntry +} + +// NewLogBuffer returns an empty buffer ready to receive entries. +func NewLogBuffer() *LogBuffer { + return &LogBuffer{} +} + +// Append adds one log entry. Thread-safe — wazero modules aren't +// goroutine-safe in practice, but the lock makes the invariant explicit +// rather than relying on call-site discipline. +func (b *LogBuffer) Append(entry LogEntry) { + b.mu.Lock() + defer b.mu.Unlock() + b.entries = append(b.entries, entry) +} + +// Snapshot returns a defensive copy of the buffer's entries. Callers +// (e.g. Engine.logInvocation) iterate the snapshot without holding the +// buffer's lock. +func (b *LogBuffer) Snapshot() []LogEntry { + b.mu.Lock() + defer b.mu.Unlock() + out := make([]LogEntry, len(b.entries)) + copy(out, b.entries) + return out +} + +// Len returns the number of buffered entries — used in tests to assert +// per-invocation accounting without making a full copy. +func (b *LogBuffer) Len() int { + b.mu.Lock() + defer b.mu.Unlock() + return len(b.entries) +} + +// WithLogBuffer returns a derived ctx that carries buf. HostFunctions. +// LogInfo / LogError check ctx FIRST and only fall back to the +// HostFunctions singleton slice if no buffer is attached. +// +// Callers MUST create a fresh LogBuffer per invocation (NewLogBuffer) +// rather than reusing one across calls — that's the whole point of the +// fix. Reusing a buffer would re-create the cross-contamination class. +func WithLogBuffer(ctx context.Context, buf *LogBuffer) context.Context { + if buf == nil { + return ctx + } + return context.WithValue(ctx, logBufferKey{}, buf) +} + +// LogBufferFromCtx extracts the LogBuffer attached via WithLogBuffer, or +// nil if none is present (in which case callers fall back to the legacy +// singleton h.logs path). Exported so hostfunctions can retrieve the +// buffer without re-importing the key type. +func LogBufferFromCtx(ctx context.Context) *LogBuffer { + if ctx == nil { + return nil + } + v, _ := ctx.Value(logBufferKey{}).(*LogBuffer) + return v +} diff --git a/core/pkg/serverless/log_buffer_test.go b/core/pkg/serverless/log_buffer_test.go new file mode 100644 index 0000000..b61f943 --- /dev/null +++ b/core/pkg/serverless/log_buffer_test.go @@ -0,0 +1,190 @@ +package serverless + +import ( + "context" + "sync" + "sync/atomic" + "testing" +) + +// TestLogBuffer_appendAndSnapshot verifies the basic Append → Snapshot +// roundtrip. The snapshot must be a defensive copy so mutating it +// doesn't corrupt the buffer's internal state. +func TestLogBuffer_appendAndSnapshot(t *testing.T) { + b := NewLogBuffer() + b.Append(LogEntry{Level: "info", Message: "hello"}) + b.Append(LogEntry{Level: "error", Message: "boom"}) + + snap := b.Snapshot() + if len(snap) != 2 { + t.Fatalf("snapshot len = %d; want 2", len(snap)) + } + if snap[0].Message != "hello" || snap[1].Message != "boom" { + t.Errorf("snapshot order wrong: %+v", snap) + } + + // Mutate the snapshot — buffer must be unaffected. + snap[0].Message = "MUTATED" + freshSnap := b.Snapshot() + if freshSnap[0].Message != "hello" { + t.Errorf("snapshot must be defensive copy; buffer was mutated: %+v", freshSnap) + } +} + +// TestWithLogBuffer_extractsAttachedBuffer is the basic ctx-attachment +// round-trip. Anything more sophisticated (cross-call propagation) is +// validated end-to-end in the host-functions tests. +func TestWithLogBuffer_extractsAttachedBuffer(t *testing.T) { + b := NewLogBuffer() + ctx := WithLogBuffer(context.Background(), b) + + got := LogBufferFromCtx(ctx) + if got != b { + t.Errorf("LogBufferFromCtx returned %p; want %p", got, b) + } +} + +// TestWithLogBuffer_nilIsNoop guards the contract that passing nil +// returns ctx unchanged. Important because the call site in Engine.Execute +// always passes a non-nil buffer, but tests and back-compat callers +// might pass nil and expect ctx untouched (and LogBufferFromCtx to +// return nil so logging falls back to the singleton). +func TestWithLogBuffer_nilIsNoop(t *testing.T) { + ctx := WithLogBuffer(context.Background(), nil) + if got := LogBufferFromCtx(ctx); got != nil { + t.Errorf("LogBufferFromCtx after WithLogBuffer(nil) = %p; want nil", got) + } +} + +// TestLogBufferFromCtx_nilCtxIsSafe — defensive guard. ctx-key lookup +// on a nil ctx panics if not handled. +func TestLogBufferFromCtx_nilCtxIsSafe(t *testing.T) { + if got := LogBufferFromCtx(nil); got != nil { + t.Errorf("LogBufferFromCtx(nil) = %p; want nil", got) + } +} + +// TestLogBuffer_concurrentAppendIsSafe stresses the lock contract. The +// bug we're fixing (bugboard #108) was about state being shared across +// goroutines without locking — this test asserts the FIX doesn't +// reintroduce a different race in its own internal state. +// +// Run with -race for stronger signal. Without the mutex inside Append, +// the race detector would flag this. +func TestLogBuffer_concurrentAppendIsSafe(t *testing.T) { + b := NewLogBuffer() + const ( + writers = 16 + writesPerW = 100 + ) + var wg sync.WaitGroup + for w := 0; w < writers; w++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for n := 0; n < writesPerW; n++ { + b.Append(LogEntry{Level: "info", Message: "x"}) + } + }(w) + } + wg.Wait() + + got := b.Len() + want := writers * writesPerW + if got != want { + t.Errorf("Len after concurrent writes = %d; want %d (lost writes — race)", got, want) + } +} + +// TestLogBuffer_concurrentInvocationsDoNotCrossContaminate is the +// REGRESSION GUARD for bugboard #108. Two goroutines simulating +// concurrent invocations each create their OWN LogBuffer attached to +// their OWN ctx. They append distinguishable entries. The snapshots +// MUST be cleanly separated — no entry from goroutine A ever ends up +// in goroutine B's buffer. +// +// Pre-fix, this kind of cross-contamination was the empirically-observed +// symptom: push-fanout's invocation record contained log lines from +// rpc-router because both shared the singleton h.logs slice. This test +// codifies the invariant that with per-invocation buffers, that class +// of cross-talk is impossible. +func TestLogBuffer_concurrentInvocationsDoNotCrossContaminate(t *testing.T) { + const ( + goroutines = 16 + opsPerG = 50 + ) + var ( + wg sync.WaitGroup + failures int64 + ) + for g := 0; g < goroutines; g++ { + wg.Add(1) + go func(gid int) { + defer wg.Done() + // Each goroutine simulates one invocation: fresh buffer + + // fresh ctx, writes its own ID into each entry. + buf := NewLogBuffer() + ctx := WithLogBuffer(context.Background(), buf) + myID := goroutineMarker(gid) + + for op := 0; op < opsPerG; op++ { + // Pull buffer from ctx (mimics what host.LogInfo does) + // and append. If a different goroutine's buffer somehow + // got attached to this ctx, the entries land in the + // wrong buffer and we detect it post-hoc. + cur := LogBufferFromCtx(ctx) + if cur != buf { + atomic.AddInt64(&failures, 1) + t.Errorf("goroutine %d: LogBufferFromCtx returned a different buffer", gid) + return + } + cur.Append(LogEntry{Level: "info", Message: myID}) + } + + // Verify the snapshot is entirely this goroutine's entries — + // no cross-talk. (Length AND content check.) + snap := buf.Snapshot() + if len(snap) != opsPerG { + atomic.AddInt64(&failures, 1) + t.Errorf("goroutine %d: snapshot len = %d; want %d (cross-contamination)", + gid, len(snap), opsPerG) + return + } + for _, e := range snap { + if e.Message != myID { + atomic.AddInt64(&failures, 1) + t.Errorf("goroutine %d: snapshot contains foreign entry %q (want all %q)", + gid, e.Message, myID) + return + } + } + }(g) + } + wg.Wait() + + if atomic.LoadInt64(&failures) != 0 { + t.Fatalf("%d cross-contamination failures across %d concurrent invocations", + atomic.LoadInt64(&failures), goroutines) + } +} + +// goroutineMarker is a deterministic per-goroutine message that +// uniquely identifies which goroutine wrote a log entry. Used by the +// cross-contamination test to verify the entry came from the right +// invocation. +func goroutineMarker(g int) string { + return "goroutine-" + itoaLB(g) +} + +// itoaLB avoids strconv to keep the test file's deps minimal. +func itoaLB(n int) string { + if n == 0 { + return "0" + } + digits := []byte{} + for n > 0 { + digits = append([]byte{byte('0' + n%10)}, digits...) + n /= 10 + } + return string(digits) +} diff --git a/core/pkg/serverless/persistent/instance.go b/core/pkg/serverless/persistent/instance.go index 94111fb..74f5005 100644 --- a/core/pkg/serverless/persistent/instance.go +++ b/core/pkg/serverless/persistent/instance.go @@ -180,10 +180,18 @@ func (i *Instance) withInvCtx(ctx context.Context) context.Context { i.invCtxMu.RLock() cur := i.invCtx i.invCtxMu.RUnlock() - if cur == nil { - return ctx + if cur != nil { + ctx = serverless.WithInvocationContext(ctx, cur) } - return serverless.WithInvocationContext(ctx, cur) + // Attach a fresh per-call LogBuffer so oh.LogInfo / oh.LogError from + // inside this ws_open / ws_frame / ws_close call write to a + // scoped slice instead of the HostFunctions singleton (bugboard + // #108 fix). Persistent WS doesn't currently persist these logs to + // function_logs (no logInvocation for persistent frames), so the + // buffer is discarded when the call returns — the point is to + // avoid leaking entries into the singleton where a concurrent + // stateless Execute would otherwise see them. + return serverless.WithLogBuffer(ctx, serverless.NewLogBuffer()) } // UpdateInvocationContext atomically swaps the per-instance invocation diff --git a/core/pkg/serverless/triggers/cron_scheduler.go b/core/pkg/serverless/triggers/cron_scheduler.go index 6ab4e78..a60eb77 100644 --- a/core/pkg/serverless/triggers/cron_scheduler.go +++ b/core/pkg/serverless/triggers/cron_scheduler.go @@ -38,6 +38,14 @@ type CronScheduler struct { // NewCronScheduler builds a scheduler. Reasonable defaults: poll every // 30 seconds, dispatch up to 100 triggers per tick. +// +// Sub-second pollInterval is permitted (down to the engine config's +// MinCronPollInterval) for typing/presence-style ephemeral state prune +// workloads — see bugboard #109. Each tick costs ~1 rqlite ListDue +// + ~2 MarkRun writes per dispatched trigger (per-call ~340-450ms on +// a cross-region cluster), so picking faster than that on average +// queues ticks. Logged as a warning when the operator goes below 1s +// so the trade-off is visible. func NewCronScheduler( store *CronTriggerStore, invoker CronInvoker, @@ -47,6 +55,10 @@ func NewCronScheduler( if pollInterval <= 0 { pollInterval = 30 * time.Second } + if pollInterval < time.Second { + logger.Warn("cron scheduler: sub-second poll interval; ensure per-tick rqlite cost is bounded or scheduler will queue ticks indefinitely (bugboard #109)", + zap.Duration("poll_interval", pollInterval)) + } return &CronScheduler{ store: store, invoker: invoker, diff --git a/core/pkg/serverless/triggers/cron_subsecond_test.go b/core/pkg/serverless/triggers/cron_subsecond_test.go new file mode 100644 index 0000000..02cd6e1 --- /dev/null +++ b/core/pkg/serverless/triggers/cron_subsecond_test.go @@ -0,0 +1,80 @@ +package triggers + +import ( + "testing" + "time" +) + +// TestParseCron_everySecond is the regression guard for bugboard #109's +// canonical use case: `*/1 * * * * *` (6-field, "every second"). The +// parser already supports 6-field expressions with seconds — this test +// pins that behavior so a future refactor of the 6-field branch can't +// silently break the ephemeral-state prune workload. +func TestParseCron_everySecond(t *testing.T) { + c, err := ParseCron("*/1 * * * * *") + if err != nil { + t.Fatalf("ParseCron: %v", err) + } + if !c.hasSeconds { + t.Error("hasSeconds = false; want true for 6-field expression") + } + for s := 0; s < 60; s++ { + if !c.seconds.match(s) { + t.Errorf("seconds.match(%d) = false; want true for `*/1` (every second)", s) + } + } +} + +// TestNext_everySecond verifies that `*/1 * * * * *` advances by +// exactly one second on each Next() call. If the cron scheduler is +// ticking every 1s and the expression matches every second, the +// dispatched next_run_at MUST land on the next whole second — not a +// minute later (which would defeat sub-second cron entirely). +func TestNext_everySecond(t *testing.T) { + c, err := ParseCron("*/1 * * * * *") + if err != nil { + t.Fatalf("ParseCron: %v", err) + } + start := time.Date(2026, 5, 21, 13, 14, 15, 0, time.UTC) + got, err := c.Next(start) + if err != nil { + t.Fatalf("Next: %v", err) + } + want := time.Date(2026, 5, 21, 13, 14, 16, 0, time.UTC) + if !got.Equal(want) { + t.Errorf("Next(%s) = %s; want %s (every-second cron should advance 1s)", + start.Format(time.RFC3339), got.Format(time.RFC3339), want.Format(time.RFC3339)) + } + + // And the next one is +1s from that. + got2, _ := c.Next(got) + want2 := want.Add(time.Second) + if !got2.Equal(want2) { + t.Errorf("Next(%s) = %s; want %s", got.Format(time.RFC3339), + got2.Format(time.RFC3339), want2.Format(time.RFC3339)) + } +} + +// TestParseCron_subSecondStep_validation covers a few practical +// sub-second-style expressions the operator might try, ensuring the +// parser rejects nothing legitimate. Negative coverage in the existing +// cron_parser_test.go for invalid expressions. +func TestParseCron_subSecondStep_validation(t *testing.T) { + cases := []struct { + expr string + want bool // true = should parse OK + }{ + {"*/1 * * * * *", true}, // every second + {"*/5 * * * * *", true}, // every 5s + {"*/30 * * * * *", true}, // every 30s (already tested in cron_parser_test.go) + {"0 * * * * *", true}, // at second 0 of every minute (= once a minute, 6-field) + {"*/2 */1 * * * *", true}, + {"*/1 * * * *", true}, // 5-field: every minute (NOT every second — different schedule!) + } + for _, tc := range cases { + _, err := ParseCron(tc.expr) + if (err == nil) != tc.want { + t.Errorf("ParseCron(%q): err=%v; want parseable=%v", tc.expr, err, tc.want) + } + } +}