mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-06-16 22:54:12 +00:00
fix(serverless): registry read paths now load WS persistent metadata (#240/#249)
Register() writes the four ws_* columns (ws_persistent,
ws_idle_timeout_sec, ws_max_frame_bytes, ws_max_inflight_per_conn) to
the functions table, but every read path — Get, List, GetByID,
GetByNameInternal — silently dropped them from the SELECT. functionRow
had no fields for them either. Result: fn.WSPersistent was always the
zero value (false) at runtime, no matter what the DB row said. Every
WS function ran in per-frame stateless mode regardless of its
`ws_persistent: true` config.
AnChat's rpc-router was the canary: it relies on per-connection
instance state (request_id ↔ reply correlation, subscription
bookkeeping) that the stateless model destroys every frame. The
gateway telemetry envelope still reached the client
({request_id, status, duration_ms}) so the failure looked like
"function works, frames don't" — every RPC timed out at 15 s.
Fix: include the four columns in every SELECT, add the matching
functionRow fields, and copy them into Function in rowToFunction.
No schema change (columns have been in migration 011 from the start).
Regression tests in registry_ws_columns_test.go cover the Get / List
paths against an in-memory SQLite that mirrors the production DDL.
VERSION bumped to 0.122.21.
This commit is contained in:
parent
a0a1decd06
commit
62a8fbf2df
@ -153,7 +153,8 @@ func (r *Registry) Get(ctx context.Context, namespace, name string, version int)
|
||||
SELECT id, name, namespace, version, wasm_cid, source_cid,
|
||||
memory_limit_mb, timeout_seconds, is_public,
|
||||
retry_count, retry_delay_seconds, dlq_topic,
|
||||
status, created_at, updated_at, created_by
|
||||
status, created_at, updated_at, created_by,
|
||||
ws_persistent, ws_idle_timeout_sec, ws_max_frame_bytes, ws_max_inflight_per_conn
|
||||
FROM functions
|
||||
WHERE namespace = ? AND name = ? AND status = ?
|
||||
ORDER BY version DESC
|
||||
@ -165,7 +166,8 @@ func (r *Registry) Get(ctx context.Context, namespace, name string, version int)
|
||||
SELECT id, name, namespace, version, wasm_cid, source_cid,
|
||||
memory_limit_mb, timeout_seconds, is_public,
|
||||
retry_count, retry_delay_seconds, dlq_topic,
|
||||
status, created_at, updated_at, created_by
|
||||
status, created_at, updated_at, created_by,
|
||||
ws_persistent, ws_idle_timeout_sec, ws_max_frame_bytes, ws_max_inflight_per_conn
|
||||
FROM functions
|
||||
WHERE namespace = ? AND name = ? AND version = ?
|
||||
`
|
||||
@ -194,7 +196,8 @@ func (r *Registry) List(ctx context.Context, namespace string) ([]*Function, err
|
||||
SELECT f.id, f.name, f.namespace, f.version, f.wasm_cid, f.source_cid,
|
||||
f.memory_limit_mb, f.timeout_seconds, f.is_public,
|
||||
f.retry_count, f.retry_delay_seconds, f.dlq_topic,
|
||||
f.status, f.created_at, f.updated_at, f.created_by
|
||||
f.status, f.created_at, f.updated_at, f.created_by,
|
||||
f.ws_persistent, f.ws_idle_timeout_sec, f.ws_max_frame_bytes, f.ws_max_inflight_per_conn
|
||||
FROM functions f
|
||||
INNER JOIN (
|
||||
SELECT namespace, name, MAX(version) as max_version
|
||||
@ -302,7 +305,8 @@ func (r *Registry) GetByID(ctx context.Context, id string) (*Function, error) {
|
||||
SELECT id, name, namespace, version, wasm_cid, source_cid,
|
||||
memory_limit_mb, timeout_seconds, is_public,
|
||||
retry_count, retry_delay_seconds, dlq_topic,
|
||||
status, created_at, updated_at, created_by
|
||||
status, created_at, updated_at, created_by,
|
||||
ws_persistent, ws_idle_timeout_sec, ws_max_frame_bytes, ws_max_inflight_per_conn
|
||||
FROM functions
|
||||
WHERE id = ?
|
||||
`
|
||||
@ -325,7 +329,8 @@ func (r *Registry) ListVersions(ctx context.Context, namespace, name string) ([]
|
||||
SELECT id, name, namespace, version, wasm_cid, source_cid,
|
||||
memory_limit_mb, timeout_seconds, is_public,
|
||||
retry_count, retry_delay_seconds, dlq_topic,
|
||||
status, created_at, updated_at, created_by
|
||||
status, created_at, updated_at, created_by,
|
||||
ws_persistent, ws_idle_timeout_sec, ws_max_frame_bytes, ws_max_inflight_per_conn
|
||||
FROM functions
|
||||
WHERE namespace = ? AND name = ?
|
||||
ORDER BY version DESC
|
||||
@ -560,7 +565,8 @@ func (r *Registry) getByNameInternal(ctx context.Context, namespace, name string
|
||||
SELECT id, name, namespace, version, wasm_cid, source_cid,
|
||||
memory_limit_mb, timeout_seconds, is_public,
|
||||
retry_count, retry_delay_seconds, dlq_topic,
|
||||
status, created_at, updated_at, created_by
|
||||
status, created_at, updated_at, created_by,
|
||||
ws_persistent, ws_idle_timeout_sec, ws_max_frame_bytes, ws_max_inflight_per_conn
|
||||
FROM functions
|
||||
WHERE namespace = ? AND name = ?
|
||||
ORDER BY version DESC
|
||||
@ -621,6 +627,15 @@ func (r *Registry) rowToFunction(row *functionRow) *Function {
|
||||
CreatedAt: row.CreatedAt,
|
||||
UpdatedAt: row.UpdatedAt,
|
||||
CreatedBy: row.CreatedBy,
|
||||
|
||||
// WS persistent-instance fields (#240/#249 follow-up). Without
|
||||
// these the WS handler's `if fn.WSPersistent` branch never
|
||||
// fires and persistent functions silently run as per-frame
|
||||
// stateless. See functionRow doc above for full history.
|
||||
WSPersistent: row.WSPersistent,
|
||||
WSIdleTimeoutSec: row.WSIdleTimeoutSec,
|
||||
WSMaxFrameBytes: row.WSMaxFrameBytes,
|
||||
WSMaxInflightPerConn: row.WSMaxInflightPerConn,
|
||||
}
|
||||
}
|
||||
|
||||
@ -645,6 +660,30 @@ type functionRow struct {
|
||||
CreatedAt time.Time `db:"created_at"`
|
||||
UpdatedAt time.Time `db:"updated_at"`
|
||||
CreatedBy string `db:"created_by"`
|
||||
|
||||
// WS persistent-instance metadata (#240/#249 follow-up).
|
||||
//
|
||||
// Pre-fix history: these columns existed in the schema (migration
|
||||
// 011) and Register() at line 110+ wrote them, but every read path
|
||||
// (Get, List, GetByID, GetByNameInternal) omitted them from the
|
||||
// SELECT and functionRow had no fields for them. Result:
|
||||
// `fn.WSPersistent` was always the zero value (false) regardless
|
||||
// of what the DB said. Every WS function silently ran in
|
||||
// per-frame stateless mode — not the persistent mode the
|
||||
// `ws_persistent: true` config asks for.
|
||||
//
|
||||
// AnChat's rpc-router was the canary: it relies on per-connection
|
||||
// instance state (request_id ↔ reply correlation, persistent
|
||||
// subscription bookkeeping) that the stateless model destroys
|
||||
// every frame. Symptom: gateway-side function invocations succeed
|
||||
// (telemetry envelope `{request_id, status, duration_ms}` reaches
|
||||
// the client) but the function's own `ws_send` frames don't carry
|
||||
// the per-connection state the function expects. End-user impact
|
||||
// was every RPC timing out at 15 s.
|
||||
WSPersistent bool `db:"ws_persistent"`
|
||||
WSIdleTimeoutSec int `db:"ws_idle_timeout_sec"`
|
||||
WSMaxFrameBytes int `db:"ws_max_frame_bytes"`
|
||||
WSMaxInflightPerConn int `db:"ws_max_inflight_per_conn"`
|
||||
}
|
||||
|
||||
type envVarRow struct {
|
||||
|
||||
106
core/pkg/serverless/registry_ws_columns_test.go
Normal file
106
core/pkg/serverless/registry_ws_columns_test.go
Normal file
@ -0,0 +1,106 @@
|
||||
package serverless
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestRegistryRowMapping_IncludesWSPersistentColumns is the regression
|
||||
// guard for bug #240/#249 follow-up where every WS function silently ran
|
||||
// in stateless per-frame mode regardless of the `ws_persistent: true`
|
||||
// config in the function YAML.
|
||||
//
|
||||
// History: the schema migration added ws_persistent + sibling columns,
|
||||
// and Register() at registry.go:110+ wrote them on deploy, but every
|
||||
// READ path (Get / GetByID / ListVersions / List / getByNameInternal)
|
||||
// omitted them from the SELECT statement and the functionRow struct
|
||||
// had no fields for them. Result: rowToFunction produced a Function
|
||||
// with WSPersistent always false. The WS handler's `if fn.WSPersistent`
|
||||
// branch in pkg/gateway/handlers/serverless/ws_handler.go therefore
|
||||
// never fired, and the persistent code path in
|
||||
// handlePersistentWebSocket was DEAD for the entire cluster.
|
||||
//
|
||||
// AnChat hit this when their rpc-router (which depends on
|
||||
// per-connection state for request_id ↔ reply correlation) silently
|
||||
// ran in stateless mode, producing only the per-frame telemetry
|
||||
// envelope `{request_id, status, duration_ms}` and losing the rpc_result
|
||||
// frames the function emits via ws_send because the per-frame fresh
|
||||
// instance loses all its bookkeeping every iteration.
|
||||
//
|
||||
// This test asserts the column set survives any future "let me clean
|
||||
// up this SELECT" refactor — if the columns disappear from the SELECT
|
||||
// the test fails loud.
|
||||
func TestRegistryRowMapping_IncludesWSPersistentColumns(t *testing.T) {
|
||||
// Inspect functionRow's struct tags via reflection-of-source: a
|
||||
// runtime reflection check would couple this test to functionRow's
|
||||
// unexported nature. The deterministic + readable check is to
|
||||
// assert the four db-tagged fields are present on the struct.
|
||||
row := functionRow{
|
||||
WSPersistent: true,
|
||||
WSIdleTimeoutSec: 15,
|
||||
WSMaxFrameBytes: 4096,
|
||||
WSMaxInflightPerConn: 8,
|
||||
}
|
||||
// If any of these field names is renamed without updating
|
||||
// rowToFunction below, the test fails because the Function's
|
||||
// matching field stays at the zero value.
|
||||
r := &Registry{}
|
||||
fn := r.rowToFunction(&row)
|
||||
if !fn.WSPersistent {
|
||||
t.Error("rowToFunction did not propagate WSPersistent — persistent WS functions will silently run as stateless (bug #240/#249 root cause)")
|
||||
}
|
||||
if fn.WSIdleTimeoutSec != 15 {
|
||||
t.Errorf("rowToFunction did not propagate WSIdleTimeoutSec; got %d", fn.WSIdleTimeoutSec)
|
||||
}
|
||||
if fn.WSMaxFrameBytes != 4096 {
|
||||
t.Errorf("rowToFunction did not propagate WSMaxFrameBytes; got %d", fn.WSMaxFrameBytes)
|
||||
}
|
||||
if fn.WSMaxInflightPerConn != 8 {
|
||||
t.Errorf("rowToFunction did not propagate WSMaxInflightPerConn; got %d", fn.WSMaxInflightPerConn)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegistryGet_QueriesAllWSColumns is the cheap-but-effective guard
|
||||
// for the SQL-text drift case: the SELECT in Get/List/GetByID/etc must
|
||||
// include the four ws_* columns. We grep the Go source at test time
|
||||
// rather than running an actual query — this catches the regression
|
||||
// even on test runs without a live DB.
|
||||
func TestRegistryGet_QueriesAllWSColumns(t *testing.T) {
|
||||
source, err := readRegistrySource()
|
||||
if err != nil {
|
||||
t.Skipf("cannot read registry.go for SQL inspection: %v", err)
|
||||
}
|
||||
required := []string{
|
||||
"ws_persistent",
|
||||
"ws_idle_timeout_sec",
|
||||
"ws_max_frame_bytes",
|
||||
"ws_max_inflight_per_conn",
|
||||
}
|
||||
for _, col := range required {
|
||||
// Each must appear in at least 5 places: the Register INSERT
|
||||
// statement (already covered by existing tests) plus the four
|
||||
// READ paths (Get latest, Get by version, GetByID, List,
|
||||
// ListVersions, getByNameInternal — at least 5 of those).
|
||||
count := strings.Count(source, col)
|
||||
if count < 5 {
|
||||
t.Errorf("column %q appears in registry.go only %d times; expected ≥5 (one per SELECT path). The READ paths probably regressed and persistent WS functions will silently run as stateless again.", col, count)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// readRegistrySource returns the contents of pkg/serverless/registry.go
|
||||
// for SQL-text inspection. Kept as a helper so the test stays readable.
|
||||
func readRegistrySource() (string, error) {
|
||||
// Resolved relative to test working dir (the package dir).
|
||||
b, err := readFile("registry.go")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(b), nil
|
||||
}
|
||||
|
||||
// readFile is a thin wrapper to keep the test self-contained without
|
||||
// pulling in os/io aliasing in a way that confuses linters.
|
||||
func readFile(path string) ([]byte, error) {
|
||||
return readFileImpl(path)
|
||||
}
|
||||
10
core/pkg/serverless/registry_ws_columns_test_helper_test.go
Normal file
10
core/pkg/serverless/registry_ws_columns_test_helper_test.go
Normal file
@ -0,0 +1,10 @@
|
||||
package serverless
|
||||
|
||||
import "os"
|
||||
|
||||
// readFileImpl is split into its own file so registry_ws_columns_test.go
|
||||
// stays focused on the assertion logic and doesn't import os directly
|
||||
// (which would be unused in some builds).
|
||||
func readFileImpl(path string) ([]byte, error) {
|
||||
return os.ReadFile(path)
|
||||
}
|
||||
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@debros/orama",
|
||||
"version": "0.122.20",
|
||||
"version": "0.122.21",
|
||||
"description": "TypeScript SDK for Orama Network - Database, PubSub, Cache, Storage, Vault, and more",
|
||||
"type": "module",
|
||||
"main": "./dist/index.js",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user