feat(serverless): isolate invocation logs and enforce cron poll interval

- Fix log cross-contamination by introducing per-invocation LogBuffers
  (bugboard #108)
- Enforce a 100ms minimum for CronPollInterval to prevent scheduler
  starvation (bugboard #109)
- Add comprehensive validation tests for cron interval constraints
This commit is contained in:
anonpenguin23 2026-05-21 15:52:46 +03:00
parent 3b8139802c
commit e2bc9577ff
10 changed files with 740 additions and 22 deletions

View File

@ -1,6 +1,7 @@
package serverless package serverless
import ( import (
"fmt"
"time" "time"
) )
@ -28,7 +29,22 @@ type Config struct {
JobMaxQueueSize int `yaml:"job_max_queue_size"` JobMaxQueueSize int `yaml:"job_max_queue_size"`
JobMaxPayloadSize int `yaml:"job_max_payload_size"` // bytes JobMaxPayloadSize int `yaml:"job_max_payload_size"` // bytes
// Scheduler configuration // Scheduler configuration.
//
// CronPollInterval is the cadence at which the cron scheduler scans
// `function_cron_triggers` for due rows. Lower = finer dispatch
// granularity (useful for sub-second cron expressions like
// `*/1 * * * * *` — the 6-field grammar accepted by ParseCron),
// higher = less rqlite/CPU spend.
//
// Hard floor: MinCronPollInterval (rejected at Validate). Below the
// floor the scheduler can't keep up — each tick costs ~1 rqlite
// ListDue + N MarkRun writes, ~340-450ms per call on a
// cross-region anchat-test-style cluster. Polling faster than the
// per-tick cost queues ticks indefinitely and starves the namespace.
//
// Default: 1 minute. Set to 1s for typing/presence-style ephemeral
// state prune workloads (bugboard #109).
CronPollInterval time.Duration `yaml:"cron_poll_interval"` CronPollInterval time.Duration `yaml:"cron_poll_interval"`
TimerPollInterval time.Duration `yaml:"timer_poll_interval"` TimerPollInterval time.Duration `yaml:"timer_poll_interval"`
DBPollInterval time.Duration `yaml:"db_poll_interval"` DBPollInterval time.Duration `yaml:"db_poll_interval"`
@ -48,6 +64,21 @@ type Config struct {
LogRetention int `yaml:"log_retention"` // Days to retain logs LogRetention int `yaml:"log_retention"` // Days to retain logs
} }
// MinCronPollInterval is the hard floor on CronPollInterval. Below
// this the cron scheduler can't keep up with itself — each tick costs
// at minimum one rqlite ListDue (a network round-trip + query), so
// polling much faster than the per-tick cost would queue ticks
// indefinitely and starve the namespace gateway. 100ms is generous
// (it allows ~10 ticks/sec) while still preventing the runaway
// configuration that would cripple the gateway.
//
// Operators wanting sub-second cron dispatch (e.g. typing/presence
// ephemeral state prune jobs per bugboard #109) should set 1s — this
// gives comfortable headroom over per-tick rqlite latency even on
// cross-region clusters and allows 6-field cron expressions like
// `*/1 * * * * *` to fire on every-second cadence.
const MinCronPollInterval = 100 * time.Millisecond
// DefaultConfig returns a configuration with sensible defaults. // DefaultConfig returns a configuration with sensible defaults.
func DefaultConfig() *Config { func DefaultConfig() *Config {
return &Config{ return &Config{
@ -116,6 +147,17 @@ func (c *Config) Validate() []error {
if c.ModuleCacheSize <= 0 { if c.ModuleCacheSize <= 0 {
errs = append(errs, &ConfigError{Field: "ModuleCacheSize", Message: "must be positive"}) errs = append(errs, &ConfigError{Field: "ModuleCacheSize", Message: "must be positive"})
} }
// CronPollInterval floor — see MinCronPollInterval doc. Zero means
// "use the default" (ApplyDefaults handles it); a non-zero value
// below the floor would silently let the operator paint themselves
// into a runaway-scheduler corner.
if c.CronPollInterval != 0 && c.CronPollInterval < MinCronPollInterval {
errs = append(errs, &ConfigError{
Field: "CronPollInterval",
Message: fmt.Sprintf("must be >= %s (current=%s); see bugboard #109 — below this the scheduler can't keep up with per-tick rqlite cost and queues ticks indefinitely",
MinCronPollInterval, c.CronPollInterval),
})
}
return errs return errs
} }

View File

@ -0,0 +1,109 @@
package serverless
import (
"strings"
"testing"
"time"
)
// TestConfig_Validate_CronPollIntervalFloor is the regression guard for
// the bugboard #109 floor. The original ask was sub-second cron polling
// for typing/presence prune workloads. We allow sub-second down to the
// MinCronPollInterval floor (100ms), and reject anything below it
// because the per-tick rqlite cost would queue ticks indefinitely and
// starve the namespace gateway.
func TestConfig_Validate_CronPollIntervalFloor(t *testing.T) {
cases := []struct {
name string
interval time.Duration
wantReject bool
}{
{"zero means use default (no error)", 0, false},
{"1 minute (legacy default) — fine", time.Minute, false},
{"1 second — sub-second OK", time.Second, false},
{"500ms — sub-second OK", 500 * time.Millisecond, false},
{"exactly the floor (100ms) — OK", MinCronPollInterval, false},
{"50ms — below floor, REJECT", 50 * time.Millisecond, true},
{"1ms — well below floor, REJECT", 1 * time.Millisecond, true},
{"-1s (operator typo) — REJECT", -time.Second, true},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
c := DefaultConfig()
c.CronPollInterval = tc.interval
errs := c.Validate()
gotReject := false
for _, err := range errs {
if ce, ok := err.(*ConfigError); ok && ce.Field == "CronPollInterval" {
gotReject = true
}
}
if gotReject != tc.wantReject {
t.Errorf("interval=%v: reject=%v; want reject=%v (errs=%v)",
tc.interval, gotReject, tc.wantReject, errs)
}
})
}
}
// TestConfig_Validate_CronPollIntervalErrorMessage verifies the
// rejection error carries the operator-facing detail (current value,
// min value, bugboard reference). Without this, an operator misconfiguring
// `cron_poll_interval: 10ms` gets an opaque "invalid config" error and
// has to grep code to figure out why.
func TestConfig_Validate_CronPollIntervalErrorMessage(t *testing.T) {
c := DefaultConfig()
c.CronPollInterval = 10 * time.Millisecond
errs := c.Validate()
if len(errs) == 0 {
t.Fatal("expected validation error for sub-floor CronPollInterval")
}
var found *ConfigError
for _, err := range errs {
if ce, ok := err.(*ConfigError); ok && ce.Field == "CronPollInterval" {
found = ce
break
}
}
if found == nil {
t.Fatalf("no CronPollInterval ConfigError in %v", errs)
}
for _, want := range []string{
MinCronPollInterval.String(), // floor
"10ms", // current value
"#109", // bugboard reference
} {
if !strings.Contains(found.Message, want) {
t.Errorf("error message missing %q: %s", want, found.Message)
}
}
}
// TestConfig_ApplyDefaults_FillsInCronPollInterval verifies the default
// is applied when the field is zero. Regression guard against a future
// refactor that accidentally drops the zero-check.
func TestConfig_ApplyDefaults_FillsInCronPollInterval(t *testing.T) {
c := &Config{}
c.ApplyDefaults()
if c.CronPollInterval != time.Minute {
t.Errorf("ApplyDefaults: CronPollInterval = %v; want %v",
c.CronPollInterval, time.Minute)
}
}
// TestMinCronPollInterval_Reasonable is a guard rail on the constant
// itself. If a future contributor sets it too high (blocks legit
// typing/presence workloads) or too low (lets DoS through), this
// catches it.
func TestMinCronPollInterval_Reasonable(t *testing.T) {
if MinCronPollInterval > time.Second {
t.Errorf("MinCronPollInterval=%v is too high — blocks legit sub-second prune workloads (bugboard #109)",
MinCronPollInterval)
}
if MinCronPollInterval < time.Millisecond {
t.Errorf("MinCronPollInterval=%v is too low — opens scheduler DoS surface",
MinCronPollInterval)
}
}

View File

@ -261,10 +261,20 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx
execCtx, cancel := CreateTimeoutContext(ctx, fn, e.config.MaxTimeoutSeconds) execCtx, cancel := CreateTimeoutContext(ctx, fn, e.config.MaxTimeoutSeconds)
defer cancel() defer cancel()
// Attach a fresh per-invocation LogBuffer to the ctx that wazero
// passes through to host-fn callbacks. host.LogInfo / host.LogError
// extract this buffer and append to it instead of writing to the
// HostFunctions singleton slice — which would cross-contaminate
// concurrent invocations (bugboard #108: push-fanout's invocation
// record was capturing rpc-router and message-push-handler log
// lines because every WASM call shared one h.logs slice).
logBuf := NewLogBuffer()
execCtx = WithLogBuffer(execCtx, logBuf)
// Get compiled module (from cache or compile) // Get compiled module (from cache or compile)
module, err := e.getOrCompileModule(execCtx, fn.WASMCID) module, err := e.getOrCompileModule(execCtx, fn.WASMCID)
if err != nil { if err != nil {
e.logInvocation(ctx, fn, invCtx, startTime, 0, InvocationStatusError, err) e.logInvocation(ctx, fn, invCtx, logBuf, startTime, 0, InvocationStatusError, err)
return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err} return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err}
} }
@ -281,11 +291,11 @@ func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx
status = InvocationStatusTimeout status = InvocationStatusTimeout
err = ErrTimeout err = ErrTimeout
} }
e.logInvocation(ctx, fn, invCtx, startTime, len(output), status, err) e.logInvocation(ctx, fn, invCtx, logBuf, startTime, len(output), status, err)
return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err} return nil, &ExecutionError{FunctionName: fn.Name, RequestID: invCtx.RequestID, Cause: err}
} }
e.logInvocation(ctx, fn, invCtx, startTime, len(output), InvocationStatusSuccess, nil) e.logInvocation(ctx, fn, invCtx, logBuf, startTime, len(output), InvocationStatusSuccess, nil)
return output, nil return output, nil
} }
@ -540,7 +550,14 @@ func (e *Engine) getOrCompileModule(ctx context.Context, wasmCID string) (wazero
} }
// logInvocation logs an invocation record. // logInvocation logs an invocation record.
func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *InvocationContext, startTime time.Time, outputSize int, status InvocationStatus, err error) { //
// `logBuf` is the per-invocation LogBuffer attached to ctx at Execute
// start (bugboard #108 fix). When non-nil, the record's Logs field is
// populated from the buffer's snapshot — invocation-local, no
// cross-contamination. When nil (legacy callers that haven't been
// updated), falls back to the HostFunctions singleton via the
// GetLogs() interface check — same behavior as pre-#108.
func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *InvocationContext, logBuf *LogBuffer, startTime time.Time, outputSize int, status InvocationStatus, err error) {
if e.invocationLogger == nil || !e.config.LogInvocations { if e.invocationLogger == nil || !e.config.LogInvocations {
return return
} }
@ -563,8 +580,15 @@ func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *Invoca
record.ErrorMessage = err.Error() record.ErrorMessage = err.Error()
} }
// Collect logs from host services if supported // Collect logs: prefer the per-invocation LogBuffer (bugboard #108),
if hf, ok := e.hostServices.(interface{ GetLogs() []LogEntry }); ok { // fall back to the legacy singleton for callers that haven't been
// migrated yet. The singleton path was the source of the
// cross-contamination bug; once every Execute path passes a real
// buffer here, the GetLogs() singleton read is dead code that
// can be removed in a future cleanup.
if logBuf != nil {
record.Logs = logBuf.Snapshot()
} else if hf, ok := e.hostServices.(interface{ GetLogs() []LogEntry }); ok {
record.Logs = hf.GetLogs() record.Logs = hf.GetLogs()
} }

View File

@ -9,16 +9,27 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// LogInfo logs an info message. // LogInfo logs an info message. Writes to the per-invocation LogBuffer
// attached to ctx (see log_buffer.go); falls back to the legacy
// HostFunctions singleton slice when no buffer is on ctx (test paths
// that haven't migrated).
//
// Bugboard #108 fix: previously this always wrote to the singleton
// `h.logs`, causing cross-contamination between concurrent invocations
// (push-fanout's invocation record captured rpc-router's log lines).
func (h *HostFunctions) LogInfo(ctx context.Context, message string) { func (h *HostFunctions) LogInfo(ctx context.Context, message string) {
h.logsLock.Lock() entry := serverless.LogEntry{
defer h.logsLock.Unlock()
h.logs = append(h.logs, serverless.LogEntry{
Level: "info", Level: "info",
Message: message, Message: message,
Timestamp: time.Now(), Timestamp: time.Now(),
}) }
if buf := serverless.LogBufferFromCtx(ctx); buf != nil {
buf.Append(entry)
} else {
h.logsLock.Lock()
h.logs = append(h.logs, entry)
h.logsLock.Unlock()
}
h.logger.Info(message, h.logger.Info(message,
zap.String("request_id", h.GetRequestID(ctx)), zap.String("request_id", h.GetRequestID(ctx)),
@ -26,16 +37,22 @@ func (h *HostFunctions) LogInfo(ctx context.Context, message string) {
) )
} }
// LogError logs an error message. // LogError logs an error message. See LogInfo for the per-invocation
// LogBuffer / singleton fallback semantics — same code path, same
// bugboard #108 rationale.
func (h *HostFunctions) LogError(ctx context.Context, message string) { func (h *HostFunctions) LogError(ctx context.Context, message string) {
h.logsLock.Lock() entry := serverless.LogEntry{
defer h.logsLock.Unlock()
h.logs = append(h.logs, serverless.LogEntry{
Level: "error", Level: "error",
Message: message, Message: message,
Timestamp: time.Now(), Timestamp: time.Now(),
}) }
if buf := serverless.LogBufferFromCtx(ctx); buf != nil {
buf.Append(entry)
} else {
h.logsLock.Lock()
h.logs = append(h.logs, entry)
h.logsLock.Unlock()
}
h.logger.Error(message, h.logger.Error(message,
zap.String("request_id", h.GetRequestID(ctx)), zap.String("request_id", h.GetRequestID(ctx)),

View File

@ -0,0 +1,140 @@
package hostfunctions
import (
"context"
"sync"
"sync/atomic"
"testing"
"github.com/DeBrosOfficial/network/pkg/serverless"
"go.uber.org/zap"
)
// TestLogInfo_writesToCtxBuffer is the regression guard for bugboard
// #108. When the caller attaches a per-invocation LogBuffer to ctx,
// LogInfo MUST write to that buffer (not to the singleton h.logs).
//
// Pre-fix, LogInfo always wrote to h.logs, causing cross-contamination
// between concurrent invocations.
func TestLogInfo_writesToCtxBuffer(t *testing.T) {
h := &HostFunctions{logger: zap.NewNop()}
buf := serverless.NewLogBuffer()
ctx := serverless.WithLogBuffer(context.Background(), buf)
h.LogInfo(ctx, "hello from invocation A")
h.LogError(ctx, "boom from invocation A")
snap := buf.Snapshot()
if len(snap) != 2 {
t.Fatalf("ctx buffer len = %d; want 2", len(snap))
}
if snap[0].Level != "info" || snap[0].Message != "hello from invocation A" {
t.Errorf("info entry wrong: %+v", snap[0])
}
if snap[1].Level != "error" || snap[1].Message != "boom from invocation A" {
t.Errorf("error entry wrong: %+v", snap[1])
}
// The singleton must NOT have been touched.
if len(h.logs) != 0 {
t.Errorf("singleton h.logs got %d entries; want 0 (ctx buffer should have absorbed them)",
len(h.logs))
}
}
// TestLogInfo_fallsBackToSingletonWhenNoBuffer preserves the legacy
// behavior for callers (tests, mostly) that haven't migrated to the
// ctx-attached buffer path yet. Without this fallback, every test that
// constructed a HostFunctions directly and called LogInfo without
// wrapping ctx would silently lose log entries.
func TestLogInfo_fallsBackToSingletonWhenNoBuffer(t *testing.T) {
h := &HostFunctions{logger: zap.NewNop()}
// No buffer attached to ctx.
h.LogInfo(context.Background(), "legacy call")
h.LogError(context.Background(), "legacy error")
if len(h.logs) != 2 {
t.Errorf("singleton h.logs got %d entries; want 2 (legacy fallback)", len(h.logs))
}
}
// TestLogInfo_concurrentInvocations_noCrossContamination is THE
// regression guard for bugboard #108's empirically-observed symptom:
// push-fanout's invocation record contained log lines from rpc-router
// because both shared the singleton h.logs slice.
//
// Sixteen goroutines simulating concurrent invocations each attach
// their own LogBuffer to ctx, then write distinguishable entries via
// HostFunctions.LogInfo. After all goroutines complete, each buffer
// must contain ONLY its own entries — zero cross-talk.
//
// Run with -race for stronger signal. Pre-fix (singleton h.logs), every
// goroutine wrote into the shared slice and a different goroutine's
// GetLogs() snapshot would scoop them up.
func TestLogInfo_concurrentInvocations_noCrossContamination(t *testing.T) {
h := &HostFunctions{logger: zap.NewNop()}
const (
goroutines = 16
opsPerG = 50
)
var (
wg sync.WaitGroup
failures int64
)
for g := 0; g < goroutines; g++ {
wg.Add(1)
go func(gid int) {
defer wg.Done()
buf := serverless.NewLogBuffer()
ctx := serverless.WithLogBuffer(context.Background(), buf)
myMarker := workloadMarker(gid)
for op := 0; op < opsPerG; op++ {
h.LogInfo(ctx, myMarker)
}
snap := buf.Snapshot()
if len(snap) != opsPerG {
atomic.AddInt64(&failures, 1)
t.Errorf("goroutine %d: snapshot len = %d; want %d", gid, len(snap), opsPerG)
return
}
for _, e := range snap {
if e.Message != myMarker {
atomic.AddInt64(&failures, 1)
t.Errorf("goroutine %d: foreign entry %q in own buffer", gid, e.Message)
return
}
}
}(g)
}
wg.Wait()
if atomic.LoadInt64(&failures) != 0 {
t.Fatalf("%d cross-contamination failures across %d concurrent invocations",
atomic.LoadInt64(&failures), goroutines)
}
// Singleton must NOT have grown — every write went to a ctx buffer.
if len(h.logs) != 0 {
t.Errorf("singleton h.logs got %d entries; want 0 (all should have gone to ctx buffers)",
len(h.logs))
}
}
func workloadMarker(g int) string {
return "workload-" + itoaHF(g)
}
func itoaHF(n int) string {
if n == 0 {
return "0"
}
digits := []byte{}
for n > 0 {
digits = append([]byte{byte('0' + n%10)}, digits...)
n /= 10
}
return string(digits)
}

View File

@ -0,0 +1,96 @@
package serverless
import (
"context"
"sync"
)
// logBufferKey is the unexported context-value key used to attach a
// per-invocation LogBuffer. Empty struct = standard Go pattern for ctx
// keys (avoids string-collision risk). Parallels invCtxKey used by
// WithInvocationContext — both fix the same class of singleton-state
// cross-contamination bug.
type logBufferKey struct{}
// LogBuffer collects WASM-emitted log entries (oh.LogInfo / oh.LogError)
// for ONE invocation. Each Engine.Execute creates a fresh LogBuffer and
// attaches it to the ctx passed to wazero; host functions extract it
// from ctx and append. Engine.logInvocation reads the buffer's snapshot
// when writing the invocation record.
//
// Why this exists: HostFunctions used to hold a singleton `logs` slice
// shared across every concurrent WASM invocation, with a per-call reset
// in SetInvocationContext. Two invocations executing concurrently would
// see each other's logs scooped up by whichever called GetLogs() first
// — empirically observed on bugboard #108 (push-fanout's invocation
// record contained rpc-router and message-push-handler log lines).
//
// The fix attaches a fresh LogBuffer to ctx per invocation. HostFunctions.
// LogInfo / LogError read the buffer from ctx and append to its
// invocation-local slice. The singleton h.logs field is kept as a
// back-compat fallback for tests that haven't been migrated, but no
// production code path relies on it once Engine.Execute is routing
// through the ctx buffer.
type LogBuffer struct {
mu sync.Mutex
entries []LogEntry
}
// NewLogBuffer returns an empty buffer ready to receive entries.
func NewLogBuffer() *LogBuffer {
return &LogBuffer{}
}
// Append adds one log entry. Thread-safe — wazero modules aren't
// goroutine-safe in practice, but the lock makes the invariant explicit
// rather than relying on call-site discipline.
func (b *LogBuffer) Append(entry LogEntry) {
b.mu.Lock()
defer b.mu.Unlock()
b.entries = append(b.entries, entry)
}
// Snapshot returns a defensive copy of the buffer's entries. Callers
// (e.g. Engine.logInvocation) iterate the snapshot without holding the
// buffer's lock.
func (b *LogBuffer) Snapshot() []LogEntry {
b.mu.Lock()
defer b.mu.Unlock()
out := make([]LogEntry, len(b.entries))
copy(out, b.entries)
return out
}
// Len returns the number of buffered entries — used in tests to assert
// per-invocation accounting without making a full copy.
func (b *LogBuffer) Len() int {
b.mu.Lock()
defer b.mu.Unlock()
return len(b.entries)
}
// WithLogBuffer returns a derived ctx that carries buf. HostFunctions.
// LogInfo / LogError check ctx FIRST and only fall back to the
// HostFunctions singleton slice if no buffer is attached.
//
// Callers MUST create a fresh LogBuffer per invocation (NewLogBuffer)
// rather than reusing one across calls — that's the whole point of the
// fix. Reusing a buffer would re-create the cross-contamination class.
func WithLogBuffer(ctx context.Context, buf *LogBuffer) context.Context {
if buf == nil {
return ctx
}
return context.WithValue(ctx, logBufferKey{}, buf)
}
// LogBufferFromCtx extracts the LogBuffer attached via WithLogBuffer, or
// nil if none is present (in which case callers fall back to the legacy
// singleton h.logs path). Exported so hostfunctions can retrieve the
// buffer without re-importing the key type.
func LogBufferFromCtx(ctx context.Context) *LogBuffer {
if ctx == nil {
return nil
}
v, _ := ctx.Value(logBufferKey{}).(*LogBuffer)
return v
}

View File

@ -0,0 +1,190 @@
package serverless
import (
"context"
"sync"
"sync/atomic"
"testing"
)
// TestLogBuffer_appendAndSnapshot verifies the basic Append → Snapshot
// roundtrip. The snapshot must be a defensive copy so mutating it
// doesn't corrupt the buffer's internal state.
func TestLogBuffer_appendAndSnapshot(t *testing.T) {
b := NewLogBuffer()
b.Append(LogEntry{Level: "info", Message: "hello"})
b.Append(LogEntry{Level: "error", Message: "boom"})
snap := b.Snapshot()
if len(snap) != 2 {
t.Fatalf("snapshot len = %d; want 2", len(snap))
}
if snap[0].Message != "hello" || snap[1].Message != "boom" {
t.Errorf("snapshot order wrong: %+v", snap)
}
// Mutate the snapshot — buffer must be unaffected.
snap[0].Message = "MUTATED"
freshSnap := b.Snapshot()
if freshSnap[0].Message != "hello" {
t.Errorf("snapshot must be defensive copy; buffer was mutated: %+v", freshSnap)
}
}
// TestWithLogBuffer_extractsAttachedBuffer is the basic ctx-attachment
// round-trip. Anything more sophisticated (cross-call propagation) is
// validated end-to-end in the host-functions tests.
func TestWithLogBuffer_extractsAttachedBuffer(t *testing.T) {
b := NewLogBuffer()
ctx := WithLogBuffer(context.Background(), b)
got := LogBufferFromCtx(ctx)
if got != b {
t.Errorf("LogBufferFromCtx returned %p; want %p", got, b)
}
}
// TestWithLogBuffer_nilIsNoop guards the contract that passing nil
// returns ctx unchanged. Important because the call site in Engine.Execute
// always passes a non-nil buffer, but tests and back-compat callers
// might pass nil and expect ctx untouched (and LogBufferFromCtx to
// return nil so logging falls back to the singleton).
func TestWithLogBuffer_nilIsNoop(t *testing.T) {
ctx := WithLogBuffer(context.Background(), nil)
if got := LogBufferFromCtx(ctx); got != nil {
t.Errorf("LogBufferFromCtx after WithLogBuffer(nil) = %p; want nil", got)
}
}
// TestLogBufferFromCtx_nilCtxIsSafe — defensive guard. ctx-key lookup
// on a nil ctx panics if not handled.
func TestLogBufferFromCtx_nilCtxIsSafe(t *testing.T) {
if got := LogBufferFromCtx(nil); got != nil {
t.Errorf("LogBufferFromCtx(nil) = %p; want nil", got)
}
}
// TestLogBuffer_concurrentAppendIsSafe stresses the lock contract. The
// bug we're fixing (bugboard #108) was about state being shared across
// goroutines without locking — this test asserts the FIX doesn't
// reintroduce a different race in its own internal state.
//
// Run with -race for stronger signal. Without the mutex inside Append,
// the race detector would flag this.
func TestLogBuffer_concurrentAppendIsSafe(t *testing.T) {
b := NewLogBuffer()
const (
writers = 16
writesPerW = 100
)
var wg sync.WaitGroup
for w := 0; w < writers; w++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for n := 0; n < writesPerW; n++ {
b.Append(LogEntry{Level: "info", Message: "x"})
}
}(w)
}
wg.Wait()
got := b.Len()
want := writers * writesPerW
if got != want {
t.Errorf("Len after concurrent writes = %d; want %d (lost writes — race)", got, want)
}
}
// TestLogBuffer_concurrentInvocationsDoNotCrossContaminate is the
// REGRESSION GUARD for bugboard #108. Two goroutines simulating
// concurrent invocations each create their OWN LogBuffer attached to
// their OWN ctx. They append distinguishable entries. The snapshots
// MUST be cleanly separated — no entry from goroutine A ever ends up
// in goroutine B's buffer.
//
// Pre-fix, this kind of cross-contamination was the empirically-observed
// symptom: push-fanout's invocation record contained log lines from
// rpc-router because both shared the singleton h.logs slice. This test
// codifies the invariant that with per-invocation buffers, that class
// of cross-talk is impossible.
func TestLogBuffer_concurrentInvocationsDoNotCrossContaminate(t *testing.T) {
const (
goroutines = 16
opsPerG = 50
)
var (
wg sync.WaitGroup
failures int64
)
for g := 0; g < goroutines; g++ {
wg.Add(1)
go func(gid int) {
defer wg.Done()
// Each goroutine simulates one invocation: fresh buffer +
// fresh ctx, writes its own ID into each entry.
buf := NewLogBuffer()
ctx := WithLogBuffer(context.Background(), buf)
myID := goroutineMarker(gid)
for op := 0; op < opsPerG; op++ {
// Pull buffer from ctx (mimics what host.LogInfo does)
// and append. If a different goroutine's buffer somehow
// got attached to this ctx, the entries land in the
// wrong buffer and we detect it post-hoc.
cur := LogBufferFromCtx(ctx)
if cur != buf {
atomic.AddInt64(&failures, 1)
t.Errorf("goroutine %d: LogBufferFromCtx returned a different buffer", gid)
return
}
cur.Append(LogEntry{Level: "info", Message: myID})
}
// Verify the snapshot is entirely this goroutine's entries —
// no cross-talk. (Length AND content check.)
snap := buf.Snapshot()
if len(snap) != opsPerG {
atomic.AddInt64(&failures, 1)
t.Errorf("goroutine %d: snapshot len = %d; want %d (cross-contamination)",
gid, len(snap), opsPerG)
return
}
for _, e := range snap {
if e.Message != myID {
atomic.AddInt64(&failures, 1)
t.Errorf("goroutine %d: snapshot contains foreign entry %q (want all %q)",
gid, e.Message, myID)
return
}
}
}(g)
}
wg.Wait()
if atomic.LoadInt64(&failures) != 0 {
t.Fatalf("%d cross-contamination failures across %d concurrent invocations",
atomic.LoadInt64(&failures), goroutines)
}
}
// goroutineMarker is a deterministic per-goroutine message that
// uniquely identifies which goroutine wrote a log entry. Used by the
// cross-contamination test to verify the entry came from the right
// invocation.
func goroutineMarker(g int) string {
return "goroutine-" + itoaLB(g)
}
// itoaLB avoids strconv to keep the test file's deps minimal.
func itoaLB(n int) string {
if n == 0 {
return "0"
}
digits := []byte{}
for n > 0 {
digits = append([]byte{byte('0' + n%10)}, digits...)
n /= 10
}
return string(digits)
}

View File

@ -180,10 +180,18 @@ func (i *Instance) withInvCtx(ctx context.Context) context.Context {
i.invCtxMu.RLock() i.invCtxMu.RLock()
cur := i.invCtx cur := i.invCtx
i.invCtxMu.RUnlock() i.invCtxMu.RUnlock()
if cur == nil { if cur != nil {
return ctx ctx = serverless.WithInvocationContext(ctx, cur)
} }
return serverless.WithInvocationContext(ctx, cur) // Attach a fresh per-call LogBuffer so oh.LogInfo / oh.LogError from
// inside this ws_open / ws_frame / ws_close call write to a
// scoped slice instead of the HostFunctions singleton (bugboard
// #108 fix). Persistent WS doesn't currently persist these logs to
// function_logs (no logInvocation for persistent frames), so the
// buffer is discarded when the call returns — the point is to
// avoid leaking entries into the singleton where a concurrent
// stateless Execute would otherwise see them.
return serverless.WithLogBuffer(ctx, serverless.NewLogBuffer())
} }
// UpdateInvocationContext atomically swaps the per-instance invocation // UpdateInvocationContext atomically swaps the per-instance invocation

View File

@ -38,6 +38,14 @@ type CronScheduler struct {
// NewCronScheduler builds a scheduler. Reasonable defaults: poll every // NewCronScheduler builds a scheduler. Reasonable defaults: poll every
// 30 seconds, dispatch up to 100 triggers per tick. // 30 seconds, dispatch up to 100 triggers per tick.
//
// Sub-second pollInterval is permitted (down to the engine config's
// MinCronPollInterval) for typing/presence-style ephemeral state prune
// workloads — see bugboard #109. Each tick costs ~1 rqlite ListDue
// + ~2 MarkRun writes per dispatched trigger (per-call ~340-450ms on
// a cross-region cluster), so picking faster than that on average
// queues ticks. Logged as a warning when the operator goes below 1s
// so the trade-off is visible.
func NewCronScheduler( func NewCronScheduler(
store *CronTriggerStore, store *CronTriggerStore,
invoker CronInvoker, invoker CronInvoker,
@ -47,6 +55,10 @@ func NewCronScheduler(
if pollInterval <= 0 { if pollInterval <= 0 {
pollInterval = 30 * time.Second pollInterval = 30 * time.Second
} }
if pollInterval < time.Second {
logger.Warn("cron scheduler: sub-second poll interval; ensure per-tick rqlite cost is bounded or scheduler will queue ticks indefinitely (bugboard #109)",
zap.Duration("poll_interval", pollInterval))
}
return &CronScheduler{ return &CronScheduler{
store: store, store: store,
invoker: invoker, invoker: invoker,

View File

@ -0,0 +1,80 @@
package triggers
import (
"testing"
"time"
)
// TestParseCron_everySecond is the regression guard for bugboard #109's
// canonical use case: `*/1 * * * * *` (6-field, "every second"). The
// parser already supports 6-field expressions with seconds — this test
// pins that behavior so a future refactor of the 6-field branch can't
// silently break the ephemeral-state prune workload.
func TestParseCron_everySecond(t *testing.T) {
c, err := ParseCron("*/1 * * * * *")
if err != nil {
t.Fatalf("ParseCron: %v", err)
}
if !c.hasSeconds {
t.Error("hasSeconds = false; want true for 6-field expression")
}
for s := 0; s < 60; s++ {
if !c.seconds.match(s) {
t.Errorf("seconds.match(%d) = false; want true for `*/1` (every second)", s)
}
}
}
// TestNext_everySecond verifies that `*/1 * * * * *` advances by
// exactly one second on each Next() call. If the cron scheduler is
// ticking every 1s and the expression matches every second, the
// dispatched next_run_at MUST land on the next whole second — not a
// minute later (which would defeat sub-second cron entirely).
func TestNext_everySecond(t *testing.T) {
c, err := ParseCron("*/1 * * * * *")
if err != nil {
t.Fatalf("ParseCron: %v", err)
}
start := time.Date(2026, 5, 21, 13, 14, 15, 0, time.UTC)
got, err := c.Next(start)
if err != nil {
t.Fatalf("Next: %v", err)
}
want := time.Date(2026, 5, 21, 13, 14, 16, 0, time.UTC)
if !got.Equal(want) {
t.Errorf("Next(%s) = %s; want %s (every-second cron should advance 1s)",
start.Format(time.RFC3339), got.Format(time.RFC3339), want.Format(time.RFC3339))
}
// And the next one is +1s from that.
got2, _ := c.Next(got)
want2 := want.Add(time.Second)
if !got2.Equal(want2) {
t.Errorf("Next(%s) = %s; want %s", got.Format(time.RFC3339),
got2.Format(time.RFC3339), want2.Format(time.RFC3339))
}
}
// TestParseCron_subSecondStep_validation covers a few practical
// sub-second-style expressions the operator might try, ensuring the
// parser rejects nothing legitimate. Negative coverage in the existing
// cron_parser_test.go for invalid expressions.
func TestParseCron_subSecondStep_validation(t *testing.T) {
cases := []struct {
expr string
want bool // true = should parse OK
}{
{"*/1 * * * * *", true}, // every second
{"*/5 * * * * *", true}, // every 5s
{"*/30 * * * * *", true}, // every 30s (already tested in cron_parser_test.go)
{"0 * * * * *", true}, // at second 0 of every minute (= once a minute, 6-field)
{"*/2 */1 * * * *", true},
{"*/1 * * * *", true}, // 5-field: every minute (NOT every second — different schedule!)
}
for _, tc := range cases {
_, err := ParseCron(tc.expr)
if (err == nil) != tc.want {
t.Errorf("ParseCron(%q): err=%v; want parseable=%v", tc.expr, err, tc.want)
}
}
}