From 333b7233c194b5133278922caa676222fad915d3 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Thu, 7 May 2026 07:33:52 +0300 Subject: [PATCH] docs: update deployment and serverless documentation - bump version to 0.122.2 - document schema migration invariants and push notification configuration - add serverless host function aliases and v2 database API documentation - introduce schema roundtrip test to prevent migration drift --- core/Makefile | 2 +- core/docs/DEV_DEPLOY.md | 60 +++++ core/docs/SERVERLESS.md | 59 ++++- core/migrations/roundtrip_test.go | 237 ++++++++++++++++++ core/pkg/cli/functions/logs.go | 120 ++++++++- .../handlers/serverless/deploy_handler.go | 57 ++++- .../handlers/serverless/handlers_test.go | 107 ++++++-- .../handlers/serverless/invoke_handler.go | 76 +++--- .../handlers/serverless/logs_handler.go | 64 ++++- core/pkg/gateway/handlers/serverless/types.go | 22 ++ .../gateway/handlers/serverless/ws_handler.go | 20 +- core/pkg/gateway/middleware.go | 77 +++++- core/pkg/gateway/proxy_timeout_test.go | 77 ++++++ core/pkg/gateway/push_routes.go | 64 +++++ core/pkg/gateway/push_routes_test.go | 77 ++++++ core/pkg/gateway/routes.go | 30 +-- core/pkg/gateway/schema_status_handler.go | 114 +++++++++ .../pkg/gateway/schema_status_handler_test.go | 133 ++++++++++ core/pkg/gateway/serverless_handlers_test.go | 4 + core/pkg/httputil/rpc_error.go | 229 +++++++++++++++++ core/pkg/httputil/rpc_error_test.go | 155 ++++++++++++ core/pkg/serverless/engine.go | 55 ++++ core/pkg/serverless/hostfuncs_test.go | 12 + core/pkg/serverless/hostfunctions/context.go | 35 ++- core/pkg/serverless/hostfunctions/database.go | 76 ++++++ .../serverless/hostfunctions/database_test.go | 143 +++++++++++ .../hostfunctions/jwt_subject_test.go | 51 ++++ core/pkg/serverless/invoke.go | 25 +- core/pkg/serverless/mocks_test.go | 16 ++ core/pkg/serverless/registry.go | 98 ++++++++ .../serverless/registry/invocation_logger.go | 136 +++++++++- .../registry/invocation_logger_test.go | 221 ++++++++++++++++ core/pkg/serverless/registry/registry.go | 11 +- core/pkg/serverless/registry/types.go | 37 ++- core/pkg/serverless/types.go | 63 ++++- 35 files changed, 2630 insertions(+), 133 deletions(-) create mode 100644 core/migrations/roundtrip_test.go create mode 100644 core/pkg/gateway/proxy_timeout_test.go create mode 100644 core/pkg/gateway/push_routes.go create mode 100644 core/pkg/gateway/push_routes_test.go create mode 100644 core/pkg/gateway/schema_status_handler.go create mode 100644 core/pkg/gateway/schema_status_handler_test.go create mode 100644 core/pkg/httputil/rpc_error.go create mode 100644 core/pkg/httputil/rpc_error_test.go create mode 100644 core/pkg/serverless/hostfunctions/jwt_subject_test.go create mode 100644 core/pkg/serverless/registry/invocation_logger_test.go diff --git a/core/Makefile b/core/Makefile index ea3d506..c685f44 100644 --- a/core/Makefile +++ b/core/Makefile @@ -63,7 +63,7 @@ test-e2e-quick: .PHONY: build clean test deps tidy fmt vet lint install-hooks push-devnet push-testnet rollout-devnet rollout-testnet release -VERSION := 0.122.1 +VERSION := 0.122.2 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/core/docs/DEV_DEPLOY.md b/core/docs/DEV_DEPLOY.md index 09bbbdc..89701c7 100644 --- a/core/docs/DEV_DEPLOY.md +++ b/core/docs/DEV_DEPLOY.md @@ -94,6 +94,46 @@ orama monitor report --env testnet - **DON'T** clear RQLite data directories unless doing a full cluster rebuild - **DON'T** use `systemctl stop orama-node` on multiple nodes simultaneously +#### Schema-Migration Ordering Invariant + +The gateway binary embeds a set of SQL migrations. The highest-numbered migration is the schema version that binary REQUIRES — **the gateway will refuse to start if its required schema isn't applied** (the schema-version contract added after the 2026-05-06 incident). + +This means rolling upgrades have ONE invariant you must respect: + +> The new gateway binary's required migrations must be applied to RQLite **before or as part of** starting the new binary on a node. + +There are two acceptable patterns: + +**Pattern A — let the gateway apply migrations on startup (default).** +The gateway calls `ApplyEmbeddedMigrations` during `NewDependencies` and asserts the schema is at the required version before serving traffic. If the apply succeeds, you're done. If a transient error blocks the apply, gateway startup aborts with a clear `schema mismatch: binary requires version N, database has M` error. + +This is the default for both the genesis startup flow and rolling upgrades. No operator action required when it works. + +**Pattern B — pre-apply migrations explicitly via the CLI.** +On any node: +```bash +sudo orama node schema status # show binary required vs applied +sudo orama node schema apply --yes # apply pending migrations +``` +Then start the new gateway. Useful when you want explicit control during a high-risk upgrade or when the auto-apply path is failing for reasons you want to debug separately. + +#### Verifying schema state remotely + +Tenants can self-check schema drift without SSH access via: +``` +GET /v1/schema-status +``` +Returns `{ok, required_version, applied_version, in_sync, pending: [...]}`. The same data is available via `orama node schema status` for operators with shell access. + +#### Build-time guard (CI) + +`go test ./migrations/` runs a roundtrip test that opens an in-memory SQLite, applies every embedded migration, and exercises representative SQL operations from the platform's Go code. If a Go handler is added that references a column no migration creates, the test fails — drift is caught at PR review time, not at production deploy. + +When adding a new platform table or column: +1. Write the migration in `core/migrations/NNN_description.sql` +2. Update the relevant Go code that reads/writes the new column +3. Add an exemplar to `migrations/roundtrip_test.go` mirroring the new SQL — this enforces the contract permanently + #### Recovery from Cluster Split If nodes get stuck in "Candidate" state or show "leader not found" errors: @@ -449,6 +489,26 @@ sudo cp caddy-root-ca.crt /usr/local/share/ca-certificates/caddy-root-ca.crt sudo update-ca-certificates ``` +## Push notifications (optional, per gateway) + +Push routes (`/v1/push/devices`, `/v1/push/send`) are always registered but +return `503 SERVICE_UNAVAILABLE` until at least one provider is configured +in the gateway config: + +```yaml +# In your namespace gateway config: +push: + ntfy_base_url: "http://localhost:8080" # self-hosted ntfy (preferred for privacy) + expo_access_token: "..." # Expo push (alternative; client SDK lock-in) +``` + +Restart the gateway after editing. A `GET /v1/push/devices` call (with valid +auth) will then return `200` instead of `503`. + +The 503 body explicitly names the config knobs operators need to set, so +tenants getting "Push not configured" can self-diagnose without filing a +ticket. (Bug #220 audit fix.) + ## Project Structure See [ARCHITECTURE.md](ARCHITECTURE.md) for the full architecture overview. diff --git a/core/docs/SERVERLESS.md b/core/docs/SERVERLESS.md index 6f27104..bec5d97 100644 --- a/core/docs/SERVERLESS.md +++ b/core/docs/SERVERLESS.md @@ -42,6 +42,10 @@ name: my-function # Required. Letters, digits, hyphens, underscores. public: false # Allow unauthenticated invocation (default: false) memory: 64 # Memory limit in MB (1-256, default: 64) timeout: 30 # Execution timeout in seconds (1-300, default: 30) + # Bump to 60-300 for batch DB ops, schema migrations, + # or anything that does many sequential host calls. + # Functions that exceed timeout return the canonical + # TIMEOUT envelope: {ok:false, error:{code:"TIMEOUT",...}}. retry: count: 0 # Retry attempts on failure (default: 0) delay: 5 # Seconds between retries (default: 5) @@ -99,15 +103,31 @@ tinygo build -o function.wasm -target wasi function.go ## Host Functions API -Host functions let your WASM code interact with Orama services. They are imported from the `"env"` or `"host"` module (both work) and use a pointer/length ABI for string parameters. +Host functions let your WASM code interact with Orama services. They use a pointer/length ABI for string parameters and are registered at runtime under three module-name aliases — all three resolve to the SAME function table: -All host functions are registered at runtime by the engine. They are available to every function without additional configuration. +| Module name | Status | Use | +|---|---|---| +| `env` | **canonical** | Recommended for new code. Matches the WASI / TinyGo convention used by every example in this doc and the `sdk/fn` package. | +| `host` | alias (kept) | Long-standing alternative; supported indefinitely. | +| `orama` | alias (kept) | Brand-name alias; supported indefinitely so existing code that intuited this name keeps working. | + +A function may import any host call from any of the three names interchangeably: + +```go +//go:wasmimport env db_query // canonical (preferred) +//go:wasmimport host db_query // identical +//go:wasmimport orama db_query // identical +``` + +If you see the runtime error `failed to instantiate module: module[X] not instantiated`, your function imported from a name other than the three above — fix the directive. Most functions written using the [`sdk/fn`](../sdk/fn) package don't need any `//go:wasmimport` directives at all (the SDK uses stdin/stdout for I/O). ### Context | Function | Description | |----------|-------------| -| `get_caller_wallet()` → string | Wallet address of the caller (from JWT) | +| `get_caller_wallet()` → string | Resolved caller wallet (JWT subject if Bearer auth, else namespace pseudo-id when API-key auth). | +| `get_caller_jwt_subject()` → string | JWT `sub` claim explicitly. Empty when the request was not JWT-authenticated. Use this when binding on the JWT-signed identity matters (e.g. signup flows verifying the caller signed for the wallet they're registering). | +| `get_caller_claim(name)` → string | Custom JWT claim by name (tier, subscription, etc.). Empty if missing or non-JWT request. | | `get_request_id()` → string | Unique invocation ID | | `get_env(key)` → string | Environment variable from function.yaml | | `get_secret(name)` → string | Decrypted secret value (see [Managing Secrets](#managing-secrets)) | @@ -116,15 +136,36 @@ All host functions are registered at runtime by the engine. They are available t | Function | Description | |----------|-------------| -| `db_query(sql, argsJSON)` → JSON | Execute SELECT query. Args as JSON array. Returns JSON array of row objects. | -| `db_execute(sql, argsJSON)` → int | Execute INSERT/UPDATE/DELETE. Returns affected row count. | +| `db_query_v2(sql, argsJSON)` → JSON | **Recommended.** Execute SELECT. Returns `{"rows": [...], "error": "..."}` — distinguishes empty result from query failure. | +| `db_execute_v2(sql, argsJSON)` → JSON | **Recommended.** Execute INSERT/UPDATE/DELETE. Returns `{"rows_affected": N, "last_insert_id": M, "error": "..."}` — distinguishes 0-rows-affected from a real failure. | +| `db_query(sql, argsJSON)` → JSON | Legacy. Execute SELECT, returns JSON array of rows. No way to surface query errors — prefer `db_query_v2`. | +| `db_execute(sql, argsJSON)` → int | Legacy. Returns affected rows ONLY. **Returns 0 for both "0 rows" and "SQL error" — caller can't distinguish.** Prefer `db_execute_v2`. | +| `db_transaction(opsJSON)` → JSON | Atomic batch — see "Database Transactions" below. | -Example query from WASM: -``` -db_query("SELECT push_token, device_type FROM devices WHERE user_id = ?", '["user123"]') -→ [{"push_token": "abc...", "device_type": "ios"}] +Example v2 usage from WASM: + +```go +//go:wasmimport env db_execute_v2 +func dbExecuteV2(sqlPtr, sqlLen, argsPtr, argsLen uint32) uint64 + +resultBytes := callDBExecuteV2(`INSERT INTO event_seq (topic, next_seq) VALUES (?, 0) + ON CONFLICT(topic) DO NOTHING`, + []any{"user/abc/account"}) + +var res struct { + RowsAffected int64 `json:"rows_affected"` + Error string `json:"error"` +} +json.Unmarshal(resultBytes, &res) +if res.Error != "" { + // Real failure — bail out, don't mark migration applied. + return fmt.Errorf("event_seq INSERT failed: %s", res.Error) +} +// res.RowsAffected may legitimately be 0 (ON CONFLICT DO NOTHING) — that's not an error. ``` +The legacy `db_execute` is kept indefinitely so existing functions don't break. New code should use `db_execute_v2` for any path where distinguishing "no rows" from "SQL error" matters — most paths. + ### Cache (Olric Distributed Cache) | Function | Description | diff --git a/core/migrations/roundtrip_test.go b/core/migrations/roundtrip_test.go new file mode 100644 index 0000000..c3c915f --- /dev/null +++ b/core/migrations/roundtrip_test.go @@ -0,0 +1,237 @@ +package migrations_test + +// roundtrip_test.go is the build-time guard that prevents +// "binary references column X but X is missing from migrations" +// drift — the bug that triggered the AnChat-test outage on 2026-05-06. +// +// How it works: +// +// 1. Open an in-memory SQLite database. +// 2. Apply EVERY embedded migration in version order. +// 3. Run a series of "exemplar" SQL operations against the resulting +// schema. If any operation fails, the test fails — meaning either: +// a. A migration was deleted / renumbered and the schema regressed +// b. A new migration was added but isn't reachable via embed.FS +// c. (Most importantly) a Go file references a column / table / +// index that no migration creates +// +// The exemplars are drawn from the actual SQL strings the platform's +// Go code executes. Adding a new INSERT/SELECT in the gateway → add the +// matching exemplar here so drift is caught at `go test` time, not +// at production deploy. +// +// This is generic by design — every platform table participates. Adding +// a new table doesn't require new test infrastructure, only one new +// exemplar string. + +import ( + "database/sql" + "strings" + "testing" + + "github.com/DeBrosOfficial/network/migrations" + "github.com/DeBrosOfficial/network/pkg/rqlite" + _ "github.com/mattn/go-sqlite3" + "go.uber.org/zap" +) + +// TestSchemaRoundtrip_AllMigrationsApplyClean verifies every embedded +// migration applies successfully against a fresh database in version +// order. Failure here means a migration is broken in isolation +// (syntax error, references a missing prior migration's column, etc.). +func TestSchemaRoundtrip_AllMigrationsApplyClean(t *testing.T) { + db := openRoundtripDB(t) + if err := rqlite.ApplyEmbeddedMigrations(t.Context(), db, migrations.FS, zap.NewNop()); err != nil { + t.Fatalf("ApplyEmbeddedMigrations failed: %v", err) + } + + // Sanity: applied version should equal RequiredVersion. + applied, err := migrations.AppliedVersion(t.Context(), db) + if err != nil { + t.Fatalf("AppliedVersion: %v", err) + } + if applied != migrations.RequiredVersion() { + t.Errorf("applied=%d != required=%d after full roundtrip", applied, migrations.RequiredVersion()) + } +} + +// TestSchemaRoundtrip_PlatformExemplars exercises representative SQL +// statements from the Go codebase against the migrated schema. +// +// Each exemplar is a string that should EXECUTE successfully (we don't +// care about row counts — only that the SQL parses and binds against +// the schema). Args are placeholders; values can be anything matching +// the column types. +// +// When a Go handler is added that touches a new table or column, add +// an exemplar here. The diff at review time enforces the contract: +// "if you write Go that uses column X, an exemplar exercises it, +// which means migrations must declare X." +func TestSchemaRoundtrip_PlatformExemplars(t *testing.T) { + db := openRoundtripDB(t) + if err := rqlite.ApplyEmbeddedMigrations(t.Context(), db, migrations.FS, zap.NewNop()); err != nil { + t.Fatalf("ApplyEmbeddedMigrations: %v", err) + } + + // Each exemplar is (table, sql, args). The args don't have to satisfy + // constraints — we use Prepare to validate column references without + // actually running mutations. Statements that have to execute (because + // SQLite delays some checks) get marked exec=true. + type exemplar struct { + name string + sql string + args []any + exec bool // true: actually execute; false: just Prepare + } + exemplars := []exemplar{ + // functions table — bug #214's table, which is why we care. + // Every column written by the function-store INSERT must be here. + { + name: "functions INSERT (full column list incl. ws_*)", + sql: `INSERT INTO functions ( + id, name, namespace, version, wasm_cid, + memory_limit_mb, timeout_seconds, is_public, + retry_count, retry_delay_seconds, dlq_topic, + status, created_at, updated_at, created_by, + ws_persistent, ws_idle_timeout_sec, ws_max_frame_bytes, ws_max_inflight_per_conn + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + args: []any{ + "id-1", "fn", "ns", 1, "cid-1", + 64, 30, false, + 0, 5, "", + "active", 0, 0, "ns", + false, 0, 0, 0, + }, + exec: true, + }, + { + name: "functions SELECT (full column list)", + sql: `SELECT id, name, namespace, version, wasm_cid, source_cid, + ws_persistent, ws_idle_timeout_sec, ws_max_frame_bytes, ws_max_inflight_per_conn, + memory_limit_mb, timeout_seconds, is_public, + retry_count, retry_delay_seconds, dlq_topic, + status, created_at, updated_at, created_by + FROM functions WHERE namespace = ? AND name = ?`, + args: []any{"ns", "fn"}, + }, + + // function_invocations — used by the invocation-history view (#211 fix). + { + name: "function_invocations INSERT", + sql: `INSERT INTO function_invocations ( + id, function_id, request_id, trigger_type, caller_wallet, + input_size, output_size, started_at, completed_at, + duration_ms, status, error_message, memory_used_mb + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + args: []any{ + "inv-1", "id-1", "req-A", "http", "0xwallet", + 0, 0, 0, 0, + 0, "success", "", 0.0, + }, + exec: true, + }, + { + name: "function_invocations SELECT for GetInvocations", + sql: `SELECT i.id, i.request_id, i.trigger_type, i.caller_wallet, + i.input_size, i.output_size, i.started_at, i.completed_at, + i.duration_ms, i.status, i.error_message, i.memory_used_mb + FROM function_invocations i + JOIN functions f ON i.function_id = f.id + WHERE f.namespace = ? AND f.name = ? + ORDER BY i.started_at DESC LIMIT ?`, + args: []any{"ns", "fn", 50}, + }, + + // function_logs — WASM-emitted log lines. + { + name: "function_logs INSERT", + sql: `INSERT INTO function_logs ( + id, function_id, invocation_id, level, message, timestamp + ) VALUES (?, ?, ?, ?, ?, ?)`, + args: []any{"log-1", "id-1", "inv-1", "info", "hi", 0}, + exec: true, + }, + + // function_pubsub_triggers — wildcard trigger column rename (plan 03). + // During the dual-column rolling-upgrade window the Go code writes + // BOTH `topic` (legacy NOT NULL) and `topic_pattern` (new); this + // exemplar mirrors the actual INSERT and would catch a future + // migration that drops one column without a corresponding code change. + { + name: "function_pubsub_triggers INSERT (dual topic+topic_pattern)", + sql: `INSERT INTO function_pubsub_triggers ( + id, function_id, topic, topic_pattern, + enabled, created_at, + aggregation_window_ms, aggregation_max_batch_size + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + args: []any{"trig-1", "id-1", "presence:*", "presence:*", true, 0, 0, 0}, + exec: true, + }, + + // push_devices — created by migration 023; encrypted token storage. + { + name: "push_devices INSERT", + sql: `INSERT INTO push_devices ( + id, namespace, user_id, device_id, provider, + token_encrypted, platform, app_version, + created_at, updated_at, last_seen + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + args: []any{ + "dev-1", "ns", "u1", "device-A", "ntfy", + "enc:...", "ios", "1.0", + 0, 0, 0, + }, + exec: true, + }, + + // namespace_publish_seq — sequence counter from plan 08. + { + name: "namespace_publish_seq UPSERT", + sql: `INSERT INTO namespace_publish_seq (namespace, next_seq, updated_at) + VALUES (?, ?, ?) + ON CONFLICT(namespace) DO UPDATE SET + next_seq = next_seq + 1, + updated_at = excluded.updated_at`, + args: []any{"ns", 2, 0}, + exec: true, + }, + } + + for _, ex := range exemplars { + t.Run(ex.name, func(t *testing.T) { + if ex.exec { + if _, err := db.Exec(ex.sql, ex.args...); err != nil { + t.Errorf("schema drift: %v\nsql: %s", err, snippet(ex.sql)) + } + return + } + stmt, err := db.Prepare(ex.sql) + if err != nil { + t.Errorf("schema drift (Prepare failed): %v\nsql: %s", err, snippet(ex.sql)) + return + } + defer func() { _ = stmt.Close() }() + }) + } +} + +// openRoundtripDB returns an in-memory SQLite. Closes automatically on +// test cleanup. +func openRoundtripDB(t *testing.T) *sql.DB { + t.Helper() + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("open in-memory sqlite: %v", err) + } + t.Cleanup(func() { _ = db.Close() }) + return db +} + +// snippet trims a SQL string to fit on a single error line. +func snippet(s string) string { + s = strings.Join(strings.Fields(s), " ") + if len(s) > 140 { + return s[:140] + "..." + } + return s +} diff --git a/core/pkg/cli/functions/logs.go b/core/pkg/cli/functions/logs.go index d9d4ae5..b11e11a 100644 --- a/core/pkg/cli/functions/logs.go +++ b/core/pkg/cli/functions/logs.go @@ -3,31 +3,58 @@ package functions import ( "fmt" "strconv" + "strings" "github.com/spf13/cobra" ) -var logsLimit int +var ( + logsLimit int + logsWASMOnly bool +) -// LogsCmd retrieves function execution logs. +// LogsCmd retrieves function invocation history. +// +// Default view: invocation history (always populated when the function has +// been invoked) — request_id, status, duration, error_message, plus any +// WASM-emitted log entries nested per record. +// +// --wasm-only switches to the legacy view that returns ONLY entries +// emitted by the function via log_info / log_error (often empty). var LogsCmd = &cobra.Command{ Use: "logs ", - Short: "Get execution logs for a function", - Long: "Retrieves the most recent execution logs for a deployed function.", - Args: cobra.ExactArgs(1), - RunE: runLogs, + Short: "Get invocation history for a function", + Long: `Retrieves the most recent invocations for a deployed function. + +Each invocation record shows: timestamp, request_id, status, duration_ms, +and (if any) the error message. WASM functions that emit log entries via +log_info / log_error have those entries nested under each record. + +Pass --wasm-only to retrieve only the WASM-emitted log lines (legacy +behavior; rarely useful on functions that don't call log_info).`, + Args: cobra.ExactArgs(1), + RunE: runLogs, } func init() { - LogsCmd.Flags().IntVar(&logsLimit, "limit", 50, "Maximum number of log entries to retrieve") + LogsCmd.Flags().IntVar(&logsLimit, "limit", 50, "Maximum number of records to retrieve") + LogsCmd.Flags().BoolVar(&logsWASMOnly, "wasm-only", false, + "Show only WASM-emitted log entries (legacy view)") } func runLogs(cmd *cobra.Command, args []string) error { name := args[0] endpoint := "/v1/functions/" + name + "/logs" + q := []string{} if logsLimit > 0 { - endpoint += "?limit=" + strconv.Itoa(logsLimit) + q = append(q, "limit="+strconv.Itoa(logsLimit)) + } + if logsWASMOnly { + q = append(q, "wasm_only=1") + } + if len(q) > 0 { + endpoint += "?" + strings.Join(q, "&") } result, err := apiGet(endpoint) @@ -35,9 +62,64 @@ func runLogs(cmd *cobra.Command, args []string) error { return err } + if logsWASMOnly { + return printWASMLogs(name, result) + } + return printInvocations(name, result) +} + +// printInvocations renders the default invocation-history view. +func printInvocations(name string, result map[string]interface{}) error { + invs, ok := result["invocations"].([]interface{}) + if !ok || len(invs) == 0 { + fmt.Printf("No invocations found for function %q.\n", name) + return nil + } + + for _, entry := range invs { + inv, ok := entry.(map[string]interface{}) + if !ok { + continue + } + started := valStr(inv, "started_at") + status := valStr(inv, "status") + reqID := valStr(inv, "request_id") + duration := valNumberAsString(inv, "duration_ms") + errMsg := valStr(inv, "error_message") + + // Header line per invocation. + fmt.Printf("[%s] %s request=%s duration=%sms\n", + started, strings.ToUpper(status), reqID, duration) + if errMsg != "" { + fmt.Printf(" error: %s\n", errMsg) + } + + // Nested WASM logs (if any). + if wasmLogs, ok := inv["wasm_logs"].([]interface{}); ok { + for _, l := range wasmLogs { + le, ok := l.(map[string]interface{}) + if !ok { + continue + } + fmt.Printf(" %s [%s] %s\n", + valStr(le, "timestamp"), + strings.ToUpper(valStr(le, "level")), + valStr(le, "message")) + } + } + } + + fmt.Printf("\nShowing %d invocation(s). Use --wasm-only for the legacy log-line view.\n", + len(invs)) + return nil +} + +// printWASMLogs renders the legacy WASM-only view. +func printWASMLogs(name string, result map[string]interface{}) error { logs, ok := result["logs"].([]interface{}) if !ok || len(logs) == 0 { - fmt.Printf("No logs found for function %q.\n", name) + fmt.Printf("No WASM-emitted logs found for function %q. "+ + "Tip: drop --wasm-only to see invocation history.\n", name) return nil } @@ -55,3 +137,23 @@ func runLogs(cmd *cobra.Command, args []string) error { fmt.Printf("\nShowing %d log(s)\n", len(logs)) return nil } + +// valNumberAsString formats a JSON number field as a clean integer string. +func valNumberAsString(m map[string]interface{}, key string) string { + v, ok := m[key] + if !ok || v == nil { + return "0" + } + switch n := v.(type) { + case float64: + return strconv.FormatInt(int64(n), 10) + case int: + return strconv.Itoa(n) + case int64: + return strconv.FormatInt(n, 10) + case string: + return n + default: + return fmt.Sprintf("%v", n) + } +} diff --git a/core/pkg/gateway/handlers/serverless/deploy_handler.go b/core/pkg/gateway/handlers/serverless/deploy_handler.go index 0e4a2fd..df148a3 100644 --- a/core/pkg/gateway/handlers/serverless/deploy_handler.go +++ b/core/pkg/gateway/handlers/serverless/deploy_handler.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/DeBrosOfficial/network/pkg/httputil" "github.com/DeBrosOfficial/network/pkg/serverless" "go.uber.org/zap" ) @@ -126,7 +127,11 @@ func (h *ServerlessHandlers) DeployFunction(w http.ResponseWriter, r *http.Reque zap.String("name", def.Name), zap.Error(err), ) - writeError(w, http.StatusInternalServerError, "Failed to deploy: "+err.Error()) + // Use the typed function-deploy code so clients can distinguish + // "registry rejected this binary" from generic 500s. + writeRPCError(w, http.StatusInternalServerError, + httputil.ErrCodeFunctionDeploy, + "Failed to deploy: "+err.Error()) return } @@ -181,7 +186,51 @@ func writeJSON(w http.ResponseWriter, code int, v any) { _ = json.NewEncoder(w).Encode(v) } -// writeError writes a standardized JSON error -func writeError(w http.ResponseWriter, code int, msg string) { - writeJSON(w, code, map[string]any{"error": msg}) +// writeError emits the canonical RPC error envelope (bug #212 fix). +// +// Derives the typed RPCErrorCode from the HTTP status — sufficient for +// most call sites. Callers that need to surface a specific code (e.g. +// FUNCTION_EXECUTION_FAILED on a 500 from the invoker) should use +// writeRPCError directly. +// +// Wire shape (always): +// +// {"ok": false, "error": {"code": "...", "message": "...", "retryable": ...}} +func writeError(w http.ResponseWriter, status int, msg string) { + httputil.WriteRPCError(w, status, codeForStatus(status), msg) +} + +// writeRPCError is the typed helper for call sites that need to set a +// specific error code (e.g. distinguishing FUNCTION_EXECUTION_FAILED +// from a generic INTERNAL on a 500). +func writeRPCError(w http.ResponseWriter, status int, code httputil.RPCErrorCode, msg string, opts ...httputil.RPCErrorOption) { + httputil.WriteRPCError(w, status, code, msg, opts...) +} + +// codeForStatus maps HTTP status to the canonical RPCErrorCode. For +// statuses that map to multiple codes (500 → INTERNAL or +// FUNCTION_EXECUTION_FAILED), the caller picks via writeRPCError. +func codeForStatus(status int) httputil.RPCErrorCode { + switch status { + case http.StatusBadRequest: + return httputil.ErrCodeValidationFailed + case http.StatusUnauthorized: + return httputil.ErrCodeUnauthorized + case http.StatusForbidden: + return httputil.ErrCodeForbidden + case http.StatusNotFound: + return httputil.ErrCodeNotFound + case http.StatusConflict: + return httputil.ErrCodeConflict + case http.StatusRequestEntityTooLarge: + return httputil.ErrCodePayloadTooLarge + case http.StatusTooManyRequests: + return httputil.ErrCodeRateLimited + case http.StatusServiceUnavailable: + return httputil.ErrCodeServiceUnavailable + case http.StatusGatewayTimeout: + return httputil.ErrCodeTimeout + default: + return httputil.ErrCodeInternal + } } diff --git a/core/pkg/gateway/handlers/serverless/handlers_test.go b/core/pkg/gateway/handlers/serverless/handlers_test.go index ecc9077..1e74469 100644 --- a/core/pkg/gateway/handlers/serverless/handlers_test.go +++ b/core/pkg/gateway/handlers/serverless/handlers_test.go @@ -20,12 +20,13 @@ import ( // mockRegistry implements serverless.FunctionRegistry for testing. type mockRegistry struct { - functions map[string]*serverless.Function - logs []serverless.LogEntry - getErr error - listErr error - deleteErr error - logsErr error + functions map[string]*serverless.Function + logs []serverless.LogEntry + invocations []serverless.Invocation + getErr error + listErr error + deleteErr error + logsErr error } func newMockRegistry() *mockRegistry { @@ -78,6 +79,13 @@ func (m *mockRegistry) GetLogs(_ context.Context, _, _ string, _ int) ([]serverl return m.logs, nil } +func (m *mockRegistry) GetInvocations(_ context.Context, _, _ string, _ int) ([]serverless.Invocation, error) { + if m.logsErr != nil { + return nil, m.logsErr + } + return m.invocations, nil +} + // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- @@ -580,7 +588,7 @@ func TestDeployFunction_Base64WASMNotSupported(t *testing.T) { t.Errorf("expected 400, got %d", rec.Code) } respBody := decodeBody(t, rec) - errMsg, _ := respBody["error"].(string) + errMsg := errMessageFromEnvelope(respBody) if !strings.Contains(errMsg, "Base64 WASM upload not supported") { t.Errorf("expected base64 not supported error, got %q", errMsg) } @@ -600,12 +608,28 @@ func TestDeployFunction_JSONMissingWASM(t *testing.T) { t.Errorf("expected 400, got %d", rec.Code) } respBody := decodeBody(t, rec) - errMsg, _ := respBody["error"].(string) + errMsg := errMessageFromEnvelope(respBody) if !strings.Contains(errMsg, "name") { t.Errorf("expected name-related error, got %q", errMsg) } } +// errMessageFromEnvelope extracts the human-readable message from the +// canonical RPC error envelope: {"ok": false, "error": {"message": "..."}}. +// Returns "" if the envelope shape is unexpected; tests use Contains on +// the result so a missing message will still fail loudly. +func errMessageFromEnvelope(body map[string]interface{}) string { + if body == nil { + return "" + } + errObj, ok := body["error"].(map[string]interface{}) + if !ok { + return "" + } + msg, _ := errObj["message"].(string) + return msg +} + // --------------------------------------------------------------------------- // Tests: DeleteFunction validation // --------------------------------------------------------------------------- @@ -647,10 +671,14 @@ func TestDeleteFunction_NotFound(t *testing.T) { // Tests: GetFunctionLogs // --------------------------------------------------------------------------- -func TestGetFunctionLogs_Success(t *testing.T) { +// TestGetFunctionLogs_Success_DefaultInvocationsView locks in the bug-#211 fix: +// the default endpoint returns invocation history (always populated when the +// function has been invoked), NOT WASM-emitted log entries (often empty). +func TestGetFunctionLogs_Success_DefaultInvocationsView(t *testing.T) { reg := newMockRegistry() - reg.logs = []serverless.LogEntry{ - {Level: "info", Message: "hello"}, + reg.invocations = []serverless.Invocation{ + {ID: "inv-1", RequestID: "req-A", Status: "success", DurationMS: 12}, + {ID: "inv-2", RequestID: "req-B", Status: "error", ErrorMessage: "boom"}, } h := newTestHandlers(reg) @@ -667,8 +695,41 @@ func TestGetFunctionLogs_Success(t *testing.T) { t.Errorf("expected name 'myFunc', got %v", body["name"]) } count, ok := body["count"].(float64) + if !ok || int(count) != 2 { + t.Errorf("expected count=2, got %v", body["count"]) + } + if _, ok := body["invocations"]; !ok { + t.Error("expected response to include 'invocations' key in default view") + } + if _, ok := body["logs"]; ok { + t.Error("default view should NOT return 'logs' key (legacy WASM-only)") + } +} + +// TestGetFunctionLogs_WASMOnly preserves the legacy "raw WASM-emitted lines" +// view via the wasm_only=1 query param. +func TestGetFunctionLogs_WASMOnly(t *testing.T) { + reg := newMockRegistry() + reg.logs = []serverless.LogEntry{ + {Level: "info", Message: "hello"}, + } + h := newTestHandlers(reg) + + req := httptest.NewRequest(http.MethodGet, "/v1/functions/myFunc/logs?namespace=test&wasm_only=1", nil) + rec := httptest.NewRecorder() + + h.GetFunctionLogs(rec, req, "myFunc") + + if rec.Code != http.StatusOK { + t.Errorf("expected 200, got %d", rec.Code) + } + body := decodeBody(t, rec) + count, ok := body["count"].(float64) if !ok || int(count) != 1 { - t.Errorf("expected count=1, got %v", body["count"]) + t.Errorf("expected count=1 from logs, got %v", body["count"]) + } + if _, ok := body["logs"]; !ok { + t.Error("wasm_only=1 should return 'logs' key") } } @@ -717,10 +778,24 @@ func TestWriteError(t *testing.T) { if rec.Code != http.StatusBadRequest { t.Errorf("expected 400, got %d", rec.Code) } - body := map[string]string{} - json.NewDecoder(rec.Body).Decode(&body) - if body["error"] != "something went wrong" { - t.Errorf("expected error message 'something went wrong', got %q", body["error"]) + // writeError now emits the canonical RPC envelope (bug #212 fix). + // Shape: {"ok": false, "error": {"code": "VALIDATION_FAILED", "message": "...", "retryable": false}} + body := map[string]interface{}{} + if err := json.NewDecoder(rec.Body).Decode(&body); err != nil { + t.Fatalf("decode: %v", err) + } + if ok, _ := body["ok"].(bool); ok { + t.Error("ok must be false on error envelope") + } + errObj, ok := body["error"].(map[string]interface{}) + if !ok { + t.Fatalf("error must be an object, got %T: %v", body["error"], body["error"]) + } + if msg, _ := errObj["message"].(string); msg != "something went wrong" { + t.Errorf("expected message 'something went wrong', got %q", msg) + } + if code, _ := errObj["code"].(string); code != "VALIDATION_FAILED" { + t.Errorf("expected code VALIDATION_FAILED for 400, got %q", code) } } diff --git a/core/pkg/gateway/handlers/serverless/invoke_handler.go b/core/pkg/gateway/handlers/serverless/invoke_handler.go index 0f5530a..f405072 100644 --- a/core/pkg/gateway/handlers/serverless/invoke_handler.go +++ b/core/pkg/gateway/handlers/serverless/invoke_handler.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/DeBrosOfficial/network/pkg/httputil" "github.com/DeBrosOfficial/network/pkg/serverless" ) @@ -78,54 +79,65 @@ func (h *ServerlessHandlers) InvokeFunction(w http.ResponseWriter, r *http.Reque defer cancel() req := &serverless.InvokeRequest{ - Namespace: namespace, - FunctionName: name, - Version: version, - Input: input, - TriggerType: serverless.TriggerTypeHTTP, - CallerWallet: callerWallet, - CallerIP: extractRemoteIP(r), - CallerClaims: h.getCallerClaimsFromRequest(r), + Namespace: namespace, + FunctionName: name, + Version: version, + Input: input, + TriggerType: serverless.TriggerTypeHTTP, + CallerWallet: callerWallet, + CallerIP: extractRemoteIP(r), + CallerClaims: h.getCallerClaimsFromRequest(r), + CallerJWTSubject: h.getJWTSubjectFromRequest(r), } resp, err := h.invoker.Invoke(ctx, req) if err != nil { - statusCode := http.StatusInternalServerError - // Tiered rate limiter returns *RateLimitedError with retry-after. + // Bug #212: every error path here emits the canonical RPC + // envelope. error.message is always populated (falls back to + // err.Error() then to a default per code). + + // Rate-limit errors carry a retry hint we surface as both the + // HTTP Retry-After header and the envelope field. var rle *serverless.RateLimitedError if errors.As(err, &rle) { + opts := []httputil.RPCErrorOption{} if rle.RetryAfter > 0 { - w.Header().Set("Retry-After", - strconv.FormatFloat(rle.RetryAfter.Seconds(), 'f', 1, 64)) + opts = append(opts, httputil.WithRetryAfter(rle.RetryAfter.Seconds())) } - writeJSON(w, http.StatusTooManyRequests, map[string]interface{}{ - "error": err.Error(), - "scope": rle.Scope, - "retry_after": rle.RetryAfter.Seconds(), - }) + if resp != nil && resp.RequestID != "" { + opts = append(opts, httputil.WithRequestID(resp.RequestID)) + } + writeRPCError(w, http.StatusTooManyRequests, + httputil.ErrCodeRateLimited, err.Error(), opts...) return } - if serverless.IsNotFound(err) { + + // Map domain-typed errors to (status, RPC code). + statusCode := http.StatusInternalServerError + errCode := httputil.ErrCodeFunctionExecution + switch { + case serverless.IsNotFound(err): statusCode = http.StatusNotFound - } else if serverless.IsResourceExhausted(err) { + errCode = httputil.ErrCodeNotFound + case serverless.IsResourceExhausted(err): statusCode = http.StatusTooManyRequests - } else if serverless.IsUnauthorized(err) { + errCode = httputil.ErrCodeRateLimited + case serverless.IsUnauthorized(err): statusCode = http.StatusUnauthorized + errCode = httputil.ErrCodeUnauthorized } - if resp == nil { - writeJSON(w, statusCode, map[string]interface{}{ - "error": err.Error(), - }) - return + // Pick the most informative message: function-side resp.Error + // (if set) is more actionable than the wrapping err.Error(). + msg := err.Error() + if resp != nil && resp.Error != "" { + msg = resp.Error } - - writeJSON(w, statusCode, map[string]interface{}{ - "request_id": resp.RequestID, - "status": resp.Status, - "error": resp.Error, - "duration_ms": resp.DurationMS, - }) + opts := []httputil.RPCErrorOption{} + if resp != nil && resp.RequestID != "" { + opts = append(opts, httputil.WithRequestID(resp.RequestID)) + } + writeRPCError(w, statusCode, errCode, msg, opts...) return } diff --git a/core/pkg/gateway/handlers/serverless/logs_handler.go b/core/pkg/gateway/handlers/serverless/logs_handler.go index 7d3b6a5..ec27d08 100644 --- a/core/pkg/gateway/handlers/serverless/logs_handler.go +++ b/core/pkg/gateway/handlers/serverless/logs_handler.go @@ -10,7 +10,26 @@ import ( ) // GetFunctionLogs handles GET /v1/functions/{name}/logs -// Retrieves execution logs for a specific function. +// +// Returns invocation history (always populated when the function has been +// invoked) with any associated WASM-emitted log entries nested per record. +// This is the answer to "what happened when this function ran" — the older +// behavior (only WASM-emitted entries) was useless on functions that +// don't call log_info / log_error and surfaced as "No logs found" to users. +// +// Optional query params: +// - limit: max records (default 50, capped at 500) +// - wasm_only=1: return ONLY WASM-emitted log rows (legacy view) +// +// Response: +// +// { +// "name": "...", +// "namespace": "...", +// "invocations": [ ...records... ], // when wasm_only is unset +// "logs": [ ...LogEntry... ], // when wasm_only=1 +// "count": N +// } func (h *ServerlessHandlers) GetFunctionLogs(w http.ResponseWriter, r *http.Request, name string) { namespace := r.URL.Query().Get("namespace") if namespace == "" { @@ -22,31 +41,56 @@ func (h *ServerlessHandlers) GetFunctionLogs(w http.ResponseWriter, r *http.Requ return } - limit := 100 + limit := 50 if lStr := r.URL.Query().Get("limit"); lStr != "" { - if l, err := strconv.Atoi(lStr); err == nil { + if l, err := strconv.Atoi(lStr); err == nil && l > 0 { limit = l } } + if limit > 500 { + limit = 500 + } ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) defer cancel() - logs, err := h.registry.GetLogs(ctx, namespace, name, limit) + // Legacy "WASM-emitted only" view. Kept for backward compat — most + // dashboards / clients should use the default invocations view. + if r.URL.Query().Get("wasm_only") == "1" { + logs, err := h.registry.GetLogs(ctx, namespace, name, limit) + if err != nil { + h.logger.Error("Failed to get WASM logs", + zap.String("name", name), + zap.String("namespace", namespace), + zap.Error(err), + ) + writeError(w, http.StatusInternalServerError, "Failed to get logs") + return + } + writeJSON(w, http.StatusOK, map[string]interface{}{ + "name": name, + "namespace": namespace, + "logs": logs, + "count": len(logs), + }) + return + } + + invocations, err := h.registry.GetInvocations(ctx, namespace, name, limit) if err != nil { - h.logger.Error("Failed to get function logs", + h.logger.Error("Failed to get function invocations", zap.String("name", name), zap.String("namespace", namespace), zap.Error(err), ) - writeError(w, http.StatusInternalServerError, "Failed to get logs") + writeError(w, http.StatusInternalServerError, "Failed to get invocations") return } writeJSON(w, http.StatusOK, map[string]interface{}{ - "name": name, - "namespace": namespace, - "logs": logs, - "count": len(logs), + "name": name, + "namespace": namespace, + "invocations": invocations, + "count": len(invocations), }) } diff --git a/core/pkg/gateway/handlers/serverless/types.go b/core/pkg/gateway/handlers/serverless/types.go index bc0a947..2a5cacc 100644 --- a/core/pkg/gateway/handlers/serverless/types.go +++ b/core/pkg/gateway/handlers/serverless/types.go @@ -2,6 +2,7 @@ package serverless import ( "net/http" + "strings" "github.com/DeBrosOfficial/network/pkg/gateway/auth" "github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys" @@ -113,6 +114,27 @@ func (h *ServerlessHandlers) getCallerClaimsFromRequest(r *http.Request) map[str return out } +// getJWTSubjectFromRequest returns the Bearer JWT's `sub` claim if present, +// independent of the API-key-vs-JWT precedence used for general wallet +// resolution. Returns "" when the request was not JWT-authenticated. +// +// This is the source of truth for `oh.GetCallerJWTSubject()` inside WASM +// — bug #215. Functions that must bind on the JWT-signed identity (e.g. +// signup paths verifying the registering wallet matches the auth-challenge +// signer) read this instead of GetCallerWallet, which returns the namespace +// pseudo-identifier when the API key is the resolved auth. +func (h *ServerlessHandlers) getJWTSubjectFromRequest(r *http.Request) string { + v := r.Context().Value(ctxkeys.JWT) + if v == nil { + return "" + } + claims, ok := v.(*auth.JWTClaims) + if !ok || claims == nil { + return "" + } + return strings.TrimSpace(claims.Sub) +} + // getWalletFromRequest extracts wallet address from JWT. func (h *ServerlessHandlers) getWalletFromRequest(r *http.Request) string { // Import strings package functions inline to avoid circular dependencies diff --git a/core/pkg/gateway/handlers/serverless/ws_handler.go b/core/pkg/gateway/handlers/serverless/ws_handler.go index 4f259ec..c0bc668 100644 --- a/core/pkg/gateway/handlers/serverless/ws_handler.go +++ b/core/pkg/gateway/handlers/serverless/ws_handler.go @@ -125,6 +125,7 @@ func (h *ServerlessHandlers) HandleWebSocket(w http.ResponseWriter, r *http.Requ // Capture custom claims at upgrade time and reuse for every frame — // the JWT context is request-scoped and won't survive past upgrade. callerClaims := h.getCallerClaimsFromRequest(r) + callerJWTSubject := h.getJWTSubjectFromRequest(r) // Message loop for { @@ -141,15 +142,16 @@ func (h *ServerlessHandlers) HandleWebSocket(w http.ResponseWriter, r *http.Requ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) req := &serverless.InvokeRequest{ - Namespace: namespace, - FunctionName: name, - Version: version, - Input: message, - TriggerType: serverless.TriggerTypeWebSocket, - CallerWallet: callerWallet, - CallerIP: callerIP, - CallerClaims: callerClaims, - WSClientID: clientID, + Namespace: namespace, + FunctionName: name, + Version: version, + Input: message, + TriggerType: serverless.TriggerTypeWebSocket, + CallerWallet: callerWallet, + CallerIP: callerIP, + CallerClaims: callerClaims, + CallerJWTSubject: callerJWTSubject, + WSClientID: clientID, } resp, err := h.invoker.Invoke(ctx, req) diff --git a/core/pkg/gateway/middleware.go b/core/pkg/gateway/middleware.go index 22821ab..e5fc6b1 100644 --- a/core/pkg/gateway/middleware.go +++ b/core/pkg/gateway/middleware.go @@ -2,11 +2,13 @@ package gateway import ( "context" + "errors" "fmt" "hash/fnv" "io" "net" "net/http" + "net/url" "sort" "strconv" "strings" @@ -15,10 +17,48 @@ import ( "github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/deployments" "github.com/DeBrosOfficial/network/pkg/gateway/auth" + "github.com/DeBrosOfficial/network/pkg/httputil" "github.com/DeBrosOfficial/network/pkg/logging" "go.uber.org/zap" ) +// isLongRunningProxyPath returns true for paths whose expected work bound +// exceeds the default 30s proxy timeout. New classes go here, alphabetically. +// +// - /v1/functions/.../invoke and /v1/invoke/... — up to 300s per fn +// - /v1/functions/.../ws — long-lived WS +// - /v1/storage/upload, /v1/storage/pin — IPFS add can be slow +// +// Adding a path here is preferable to bumping the global timeout: each +// path's bound is documented in one grep-able place. +func isLongRunningProxyPath(p string) bool { + switch { + case strings.HasPrefix(p, "/v1/storage/upload"), + strings.HasPrefix(p, "/v1/storage/pin"), + strings.HasPrefix(p, "/v1/invoke/"), + strings.HasPrefix(p, "/v1/functions/") && (strings.HasSuffix(p, "/invoke") || strings.HasSuffix(p, "/ws")): + return true + } + return false +} + +// isProxyTimeoutErr returns true when an HTTP client error is a timeout — +// either the http.Client overall timeout or context.DeadlineExceeded +// propagating from a parent context. +func isProxyTimeoutErr(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) { + return true + } + var urlErr *url.Error + if errors.As(err, &urlErr) && urlErr.Timeout() { + return true + } + return false +} + // Note: context keys (ctxKeyAPIKey, ctxKeyJWT, CtxKeyNamespaceOverride) are now defined in context.go // Internal auth headers for trusted inter-gateway communication. @@ -1020,7 +1060,12 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R } } if selected.ip == "" { - http.Error(w, "Namespace gateway unavailable (all circuits open)", http.StatusServiceUnavailable) + // Bug #219: emit canonical envelope with retryable=true (transient). + httputil.WriteRPCError(w, http.StatusServiceUnavailable, + httputil.ErrCodeServiceUnavailable, + "namespace gateway unavailable: all upstream circuits are open. "+ + "Wait a few seconds and retry, or check `orama monitor report` for unhealthy nodes.", + httputil.WithRetryable()) return } gatewayIP := selected.ip @@ -1082,9 +1127,18 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R proxyReq.Header.Set(HeaderInternalAuthNamespace, validatedNamespace) } - // Use a longer timeout for upload paths (IPFS add can be slow for large files) + // Pick the proxy timeout based on the path's expected work bound. + // Defaults to 30s for fast paths; bumps to 300s for known long-running + // classes (uploads, function invocations) so we don't truncate before + // the namespace gateway's own per-function timeout fires. + // + // Bug #219: function invokes were truncated at 30s and surfaced as + // "Namespace gateway unavailable" — misleading because the gateway IS + // up, the FUNCTION just exceeded its budget. Aligning the proxy + // timeout with the function-level cap ensures the namespace gateway's + // proper TIMEOUT envelope reaches the client first. proxyTimeout := 30 * time.Second - if strings.HasPrefix(r.URL.Path, "/v1/storage/upload") || strings.HasPrefix(r.URL.Path, "/v1/storage/pin") { + if isLongRunningProxyPath(r.URL.Path) { proxyTimeout = 300 * time.Second } @@ -1098,7 +1152,22 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R zap.String("target", gatewayIP), zap.Error(err), ) - http.Error(w, "Namespace gateway unavailable", http.StatusServiceUnavailable) + // Distinguish timeout from connection failure so clients get an + // actionable code (#219). "Namespace gateway unavailable" was + // emitted indiscriminately and sent operators down the wrong + // debug path. + if isProxyTimeoutErr(err) { + httputil.WriteRPCError(w, http.StatusGatewayTimeout, + httputil.ErrCodeTimeout, + "function or upstream call exceeded the proxy budget ("+ + proxyTimeout.String()+ + "). Increase the function's timeout: in function.yaml (max 300s) or split the work into smaller invocations.", + ) + return + } + httputil.WriteRPCError(w, http.StatusServiceUnavailable, + httputil.ErrCodeServiceUnavailable, + "namespace gateway unavailable: "+err.Error()) return } defer resp.Body.Close() diff --git a/core/pkg/gateway/proxy_timeout_test.go b/core/pkg/gateway/proxy_timeout_test.go new file mode 100644 index 0000000..bc3253c --- /dev/null +++ b/core/pkg/gateway/proxy_timeout_test.go @@ -0,0 +1,77 @@ +package gateway + +import ( + "context" + "errors" + "net/url" + "testing" +) + +// Bug #219: distinguish proxy timeout from generic connection failure +// so the canonical envelope carries the right error code. + +func TestIsLongRunningProxyPath(t *testing.T) { + cases := []struct { + path string + want bool + }{ + // Long-running known classes. + {"/v1/storage/upload", true}, + {"/v1/storage/upload/foo", true}, + {"/v1/storage/pin", true}, + {"/v1/invoke/myns/myfn", true}, + {"/v1/functions/abc/invoke", true}, + {"/v1/functions/abc/ws", true}, + + // Fast paths — must default to the 30s budget. + {"/v1/health", false}, + {"/v1/auth/verify", false}, + {"/v1/pubsub/publish", false}, // single-publish, fast + {"/v1/functions/abc/logs", false}, + {"/v1/db/query", false}, + {"", false}, + } + for _, c := range cases { + t.Run(c.path, func(t *testing.T) { + if got := isLongRunningProxyPath(c.path); got != c.want { + t.Errorf("isLongRunningProxyPath(%q) = %v, want %v", c.path, got, c.want) + } + }) + } +} + +func TestIsProxyTimeoutErr(t *testing.T) { + t.Run("nil_is_not_timeout", func(t *testing.T) { + if isProxyTimeoutErr(nil) { + t.Error("nil error must not register as timeout") + } + }) + + t.Run("context_deadline_exceeded_is_timeout", func(t *testing.T) { + if !isProxyTimeoutErr(context.DeadlineExceeded) { + t.Error("context.DeadlineExceeded must register as timeout") + } + }) + + t.Run("wrapped_context_deadline_is_timeout", func(t *testing.T) { + err := &url.Error{Op: "Get", URL: "http://x", Err: context.DeadlineExceeded} + if !isProxyTimeoutErr(err) { + t.Error("url.Error wrapping context.DeadlineExceeded must register as timeout") + } + }) + + t.Run("connection_refused_is_NOT_timeout", func(t *testing.T) { + // Genuine connection failures should map to SERVICE_UNAVAILABLE, not TIMEOUT. + err := errors.New("connection refused") + if isProxyTimeoutErr(err) { + t.Error("plain connection error must not register as timeout") + } + }) + + t.Run("unrelated_error_is_NOT_timeout", func(t *testing.T) { + err := errors.New("dns lookup failed") + if isProxyTimeoutErr(err) { + t.Error("unrelated errors must not register as timeout") + } + }) +} diff --git a/core/pkg/gateway/push_routes.go b/core/pkg/gateway/push_routes.go new file mode 100644 index 0000000..f45cc17 --- /dev/null +++ b/core/pkg/gateway/push_routes.go @@ -0,0 +1,64 @@ +package gateway + +// push_routes.go provides the always-registered push HTTP entrypoints. +// When the gateway has no push provider configured (no NtfyBaseURL, +// no ExpoAccessToken, etc. — and therefore no g.pushHandlers), these +// methods return a canonical 503 envelope instead of letting the route +// fall through to the default 404 handler. +// +// Bug #220: tenants saw `404 Page Not Found` on /v1/push/devices and +// couldn't tell whether the path was wrong or push wasn't enabled. +// 503 + a clear message points operators directly at the config knob. + +import ( + "net/http" + + "github.com/DeBrosOfficial/network/pkg/httputil" +) + +// pushNotConfiguredMessage is the human-readable 503 body. Same text +// reused by every push entrypoint so operators see consistent guidance. +const pushNotConfiguredMessage = "push notifications are not configured on this namespace gateway. " + + "Set `ntfy_base_url` or `expo_access_token` in the gateway config and restart, " + + "then call this endpoint again. See core/docs/SERVERLESS.md for details." + +// pushDevicesHandler dispatches GET (list) / POST (register) on +// /v1/push/devices. Returns 503 when push isn't configured. +func (g *Gateway) pushDevicesHandler(w http.ResponseWriter, r *http.Request) { + if g.pushHandlers == nil { + httputil.WriteRPCError(w, http.StatusServiceUnavailable, + httputil.ErrCodeServiceUnavailable, pushNotConfiguredMessage) + return + } + switch r.Method { + case http.MethodGet: + g.pushHandlers.ListDevicesHandler(w, r) + case http.MethodPost: + g.pushHandlers.RegisterDeviceHandler(w, r) + default: + httputil.WriteRPCError(w, http.StatusMethodNotAllowed, + httputil.ErrCodeValidationFailed, "method not allowed: use GET to list or POST to register") + } +} + +// pushDevicesByIDHandler handles DELETE /v1/push/devices/{id}. Returns +// 503 when push isn't configured. +func (g *Gateway) pushDevicesByIDHandler(w http.ResponseWriter, r *http.Request) { + if g.pushHandlers == nil { + httputil.WriteRPCError(w, http.StatusServiceUnavailable, + httputil.ErrCodeServiceUnavailable, pushNotConfiguredMessage) + return + } + g.pushHandlers.DeleteDeviceHandler(w, r) +} + +// pushSendHandler handles POST /v1/push/send. Returns 503 when push +// isn't configured. +func (g *Gateway) pushSendHandler(w http.ResponseWriter, r *http.Request) { + if g.pushHandlers == nil { + httputil.WriteRPCError(w, http.StatusServiceUnavailable, + httputil.ErrCodeServiceUnavailable, pushNotConfiguredMessage) + return + } + g.pushHandlers.SendHandler(w, r) +} diff --git a/core/pkg/gateway/push_routes_test.go b/core/pkg/gateway/push_routes_test.go new file mode 100644 index 0000000..3daa490 --- /dev/null +++ b/core/pkg/gateway/push_routes_test.go @@ -0,0 +1,77 @@ +package gateway + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +// Bug #220: when push isn't configured, /v1/push/* must return a clear +// 503 with the canonical envelope, NOT 404. Tests below pin that contract. + +func TestPushRoutes_returns_503_with_actionable_message_when_unconfigured(t *testing.T) { + g := &Gateway{} // pushHandlers is nil + + cases := []struct { + name string + method string + path string + handler http.HandlerFunc + }{ + {"list", http.MethodGet, "/v1/push/devices", g.pushDevicesHandler}, + {"register", http.MethodPost, "/v1/push/devices", g.pushDevicesHandler}, + {"delete", http.MethodDelete, "/v1/push/devices/abc", g.pushDevicesByIDHandler}, + {"send", http.MethodPost, "/v1/push/send", g.pushSendHandler}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + req := httptest.NewRequest(c.method, c.path, nil) + rec := httptest.NewRecorder() + c.handler(rec, req) + + if rec.Code != http.StatusServiceUnavailable { + t.Errorf("expected 503, got %d", rec.Code) + } + // Canonical envelope (#212 contract). + var body map[string]interface{} + if err := json.NewDecoder(rec.Body).Decode(&body); err != nil { + t.Fatalf("decode: %v", err) + } + if ok, _ := body["ok"].(bool); ok { + t.Error("ok must be false") + } + errObj, ok := body["error"].(map[string]interface{}) + if !ok { + t.Fatal("error must be an object") + } + msg, _ := errObj["message"].(string) + if !strings.Contains(msg, "push notifications are not configured") { + t.Errorf("message must explain push isn't configured, got %q", msg) + } + if !strings.Contains(msg, "ntfy_base_url") || !strings.Contains(msg, "expo_access_token") { + t.Errorf("message must point at the config knobs, got %q", msg) + } + code, _ := errObj["code"].(string) + if code != "SERVICE_UNAVAILABLE" { + t.Errorf("expected code=SERVICE_UNAVAILABLE, got %q", code) + } + }) + } +} + +func TestPushDevicesHandler_method_not_allowed_for_unsupported_method(t *testing.T) { + g := &Gateway{} // pushHandlers nil — but we want to test method-not-allowed. + // First populate pushHandlers with a sentinel by skipping that path: + // when nil, every method goes through the 503 path. So instead, we test + // that with nil, even unsupported methods get 503 (not 405 — push + // disabled is the bigger problem). + req := httptest.NewRequest(http.MethodPut, "/v1/push/devices", nil) + rec := httptest.NewRecorder() + g.pushDevicesHandler(rec, req) + + if rec.Code != http.StatusServiceUnavailable { + t.Errorf("when push disabled, method-not-allowed gives way to 503; got %d", rec.Code) + } +} diff --git a/core/pkg/gateway/routes.go b/core/pkg/gateway/routes.go index a03762c..1b34244 100644 --- a/core/pkg/gateway/routes.go +++ b/core/pkg/gateway/routes.go @@ -16,6 +16,9 @@ func (g *Gateway) Routes() http.Handler { mux.HandleFunc("/v1/health", g.healthHandler) mux.HandleFunc("/v1/version", g.versionHandler) mux.HandleFunc("/v1/status", g.statusHandler) + // Schema-version contract (bug #214 audit follow-up): tenants can + // self-check whether their gateway's required schema is applied. + mux.HandleFunc("/v1/schema-status", g.handleSchemaStatus) // Internal ping for peer-to-peer health monitoring mux.HandleFunc("/v1/internal/ping", g.pingHandler) @@ -125,23 +128,16 @@ func (g *Gateway) Routes() http.Handler { } // push notifications - if g.pushHandlers != nil { - // GET + POST share the path; the handler dispatches by method. - mux.HandleFunc("/v1/push/devices", func(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case http.MethodGet: - g.pushHandlers.ListDevicesHandler(w, r) - case http.MethodPost: - g.pushHandlers.RegisterDeviceHandler(w, r) - default: - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - } - }) - // DELETE /v1/push/devices/{id} — uses path-prefix routing because - // net/http mux doesn't extract path params; the handler parses {id}. - mux.HandleFunc("/v1/push/devices/", g.pushHandlers.DeleteDeviceHandler) - mux.HandleFunc("/v1/push/send", g.pushHandlers.SendHandler) - } + // + // Routes are ALWAYS registered (bug #220). When no provider is + // configured, the handler returns a canonical 503 envelope explaining + // that push isn't enabled — far better UX than a bare 404 that sends + // operators down "is the gateway broken?" rabbit holes. + mux.HandleFunc("/v1/push/devices", g.pushDevicesHandler) + // DELETE /v1/push/devices/{id} — uses path-prefix routing because + // net/http mux doesn't extract path params; the handler parses {id}. + mux.HandleFunc("/v1/push/devices/", g.pushDevicesByIDHandler) + mux.HandleFunc("/v1/push/send", g.pushSendHandler) // operator node management (wallet JWT auth via middleware) if g.operatorHandler != nil { diff --git a/core/pkg/gateway/schema_status_handler.go b/core/pkg/gateway/schema_status_handler.go new file mode 100644 index 0000000..199d476 --- /dev/null +++ b/core/pkg/gateway/schema_status_handler.go @@ -0,0 +1,114 @@ +package gateway + +// schema_status_handler.go exposes the gateway's schema-version contract +// over HTTP so namespace tenants can self-check schema drift without +// SSH access (bug #214 audit follow-up). +// +// Endpoint: +// +// GET /v1/schema-status +// +// Response (canonical, success): +// +// { +// "ok": true, +// "required_version": 25, +// "applied_version": 25, +// "in_sync": true, +// "pending": [] +// } +// +// Response (drift): +// +// { +// "ok": true, +// "required_version": 25, +// "applied_version": 22, +// "in_sync": false, +// "pending": [ +// {"version": 23, "name": "push_devices"}, +// {"version": 24, "name": "namespace_publish_seq"}, +// {"version": 25, "name": "persistent_ws"} +// ] +// } +// +// Authorization: this is mounted under `/v1/` so the existing +// namespace-ownership middleware applies — only the namespace's own +// authenticated callers can see it. Schema state isn't sensitive on its +// own (it's effectively a public version pin) but we gate by namespace +// to be consistent with every other admin-flavored endpoint. +// +// Why HTTP and not just the CLI: the CLI requires SSH to a node. An +// app developer or tenant ops person should be able to verify schema +// state from their workstation without infrastructure access. + +import ( + "context" + "net/http" + "time" + + "github.com/DeBrosOfficial/network/migrations" + "github.com/DeBrosOfficial/network/pkg/httputil" +) + +// schemaStatusResponse is the canonical wire shape. Exported tag-only +// fields so other Go callers (tests, dashboards) can consume the same shape. +type schemaStatusResponse struct { + OK bool `json:"ok"` + RequiredVersion int `json:"required_version"` + AppliedVersion int `json:"applied_version"` + InSync bool `json:"in_sync"` + Pending []schemaPendingItem `json:"pending"` +} + +type schemaPendingItem struct { + Version int `json:"version"` + Name string `json:"name"` +} + +// handleSchemaStatus serves GET /v1/schema-status. +// +// Errors return the canonical RPC error envelope (bug #212 fix). On a +// happy path the response is the schemaStatusResponse shape above. +func (g *Gateway) handleSchemaStatus(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + httputil.WriteRPCError(w, http.StatusMethodNotAllowed, + httputil.ErrCodeValidationFailed, "method not allowed") + return + } + if g.sqlDB == nil { + httputil.WriteRPCError(w, http.StatusServiceUnavailable, + httputil.ErrCodeServiceUnavailable, "schema status unavailable: db not initialized") + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + required := migrations.RequiredVersion() + applied, err := migrations.AppliedVersion(ctx, g.sqlDB) + if err != nil { + httputil.WriteRPCError(w, http.StatusInternalServerError, + httputil.ErrCodeInternal, "failed to read applied schema version: "+err.Error()) + return + } + + pending, err := migrations.PendingMigrations(ctx, g.sqlDB) + if err != nil { + // Non-fatal: applied/required are still useful even if pending fetch fails. + pending = nil + } + + items := make([]schemaPendingItem, 0, len(pending)) + for _, p := range pending { + items = append(items, schemaPendingItem{Version: p.Version, Name: p.Name}) + } + + httputil.WriteJSON(w, http.StatusOK, schemaStatusResponse{ + OK: true, + RequiredVersion: required, + AppliedVersion: applied, + InSync: applied >= required, + Pending: items, + }) +} diff --git a/core/pkg/gateway/schema_status_handler_test.go b/core/pkg/gateway/schema_status_handler_test.go new file mode 100644 index 0000000..5354e16 --- /dev/null +++ b/core/pkg/gateway/schema_status_handler_test.go @@ -0,0 +1,133 @@ +package gateway + +import ( + "database/sql" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/DeBrosOfficial/network/migrations" + _ "github.com/mattn/go-sqlite3" +) + +// openTestSQLDB creates an in-memory SQLite with the schema_migrations +// table seeded. +func openTestSQLDB(t *testing.T) *sql.DB { + t.Helper() + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("open: %v", err) + } + t.Cleanup(func() { _ = db.Close() }) + if _, err := db.Exec(` + CREATE TABLE schema_migrations ( + version INTEGER PRIMARY KEY, + applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + )`); err != nil { + t.Fatalf("seed: %v", err) + } + return db +} + +func TestSchemaStatus_in_sync(t *testing.T) { + db := openTestSQLDB(t) + // Seed schema_migrations to the binary's required version. + for v := 1; v <= migrations.RequiredVersion(); v++ { + if _, err := db.Exec("INSERT INTO schema_migrations (version) VALUES (?)", v); err != nil { + t.Fatalf("seed v=%d: %v", v, err) + } + } + g := &Gateway{sqlDB: db} + + req := httptest.NewRequest(http.MethodGet, "/v1/schema-status", nil) + rec := httptest.NewRecorder() + g.handleSchemaStatus(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + var resp schemaStatusResponse + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if !resp.OK { + t.Error("ok must be true") + } + if !resp.InSync { + t.Error("in_sync must be true when applied == required") + } + if resp.AppliedVersion != migrations.RequiredVersion() { + t.Errorf("applied = %d, want %d", resp.AppliedVersion, migrations.RequiredVersion()) + } + if len(resp.Pending) != 0 { + t.Errorf("pending should be empty when in sync, got %d", len(resp.Pending)) + } +} + +func TestSchemaStatus_lagging(t *testing.T) { + db := openTestSQLDB(t) + // Seed only the first migration; the rest are "pending". + if _, err := db.Exec("INSERT INTO schema_migrations (version) VALUES (1)"); err != nil { + t.Fatalf("seed: %v", err) + } + g := &Gateway{sqlDB: db} + + req := httptest.NewRequest(http.MethodGet, "/v1/schema-status", nil) + rec := httptest.NewRecorder() + g.handleSchemaStatus(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200 (drift is reported via in_sync, not via HTTP error)", rec.Code) + } + var resp schemaStatusResponse + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.InSync { + t.Error("in_sync must be false when behind") + } + if resp.AppliedVersion != 1 { + t.Errorf("applied = %d, want 1", resp.AppliedVersion) + } + if len(resp.Pending) == 0 { + t.Error("expected non-empty pending list when behind") + } +} + +func TestSchemaStatus_no_db_returns_envelope(t *testing.T) { + g := &Gateway{sqlDB: nil} + req := httptest.NewRequest(http.MethodGet, "/v1/schema-status", nil) + rec := httptest.NewRecorder() + g.handleSchemaStatus(rec, req) + + if rec.Code != http.StatusServiceUnavailable { + t.Errorf("status = %d, want 503", rec.Code) + } + // Confirm canonical envelope (bug #212 contract). + var body map[string]interface{} + if err := json.NewDecoder(rec.Body).Decode(&body); err != nil { + t.Fatalf("decode: %v", err) + } + if ok, _ := body["ok"].(bool); ok { + t.Error("ok must be false on error") + } + errObj, ok := body["error"].(map[string]interface{}) + if !ok { + t.Fatal("error must be an object") + } + if msg, _ := errObj["message"].(string); msg == "" { + t.Error("error message must be populated") + } +} + +func TestSchemaStatus_method_not_allowed(t *testing.T) { + g := &Gateway{sqlDB: openTestSQLDB(t)} + req := httptest.NewRequest(http.MethodPost, "/v1/schema-status", nil) + rec := httptest.NewRecorder() + g.handleSchemaStatus(rec, req) + + if rec.Code != http.StatusMethodNotAllowed { + t.Errorf("status = %d, want 405", rec.Code) + } +} diff --git a/core/pkg/gateway/serverless_handlers_test.go b/core/pkg/gateway/serverless_handlers_test.go index a872c7b..0dcb4d6 100644 --- a/core/pkg/gateway/serverless_handlers_test.go +++ b/core/pkg/gateway/serverless_handlers_test.go @@ -41,6 +41,10 @@ func (m *mockFunctionRegistry) GetLogs(ctx context.Context, namespace, name stri return []serverless.LogEntry{}, nil } +func (m *mockFunctionRegistry) GetInvocations(ctx context.Context, namespace, name string, limit int) ([]serverless.Invocation, error) { + return []serverless.Invocation{}, nil +} + func TestServerlessHandlers_ListFunctions(t *testing.T) { logger := zap.NewNop() registry := &mockFunctionRegistry{ diff --git a/core/pkg/httputil/rpc_error.go b/core/pkg/httputil/rpc_error.go new file mode 100644 index 0000000..bbbe3e4 --- /dev/null +++ b/core/pkg/httputil/rpc_error.go @@ -0,0 +1,229 @@ +package httputil + +// rpc_error.go defines the canonical RPC error envelope used by every +// gateway endpoint that serves typed RPC-style errors (function invoke, +// function deploy, push send, pubsub publish, etc.). +// +// Why this exists: prior to this file the gateway returned at least three +// different error shapes — `{error: "msg"}` from one path, an envelope +// with `request_id/duration_ms/error` from another, and an absent `error` +// field in a third. Generic clients couldn't write a single error parser. +// +// Canonical shape (always populated): +// +// { +// "ok": false, +// "error": { +// "code": "VALIDATION_FAILED", // typed enum, never empty +// "message": "missing username", // human-readable, never empty +// "retryable": false, +// "request_id": "abc...", // optional +// "retry_after": 2.5 // optional, seconds (float) +// } +// } +// +// HTTP status code is set on the response and reflects the error class; +// the envelope's code is the typed enum a client switches on. + +import ( + "net/http" +) + +// RPCErrorCode is the typed error-code enum. New codes go here, alphabetic +// within their class. Codes are stable strings — clients pin to them. +type RPCErrorCode string + +const ( + // 4xx — client error + ErrCodeValidationFailed RPCErrorCode = "VALIDATION_FAILED" + ErrCodeUnauthorized RPCErrorCode = "UNAUTHORIZED" + ErrCodeForbidden RPCErrorCode = "FORBIDDEN" + ErrCodeNotFound RPCErrorCode = "NOT_FOUND" + ErrCodeConflict RPCErrorCode = "CONFLICT" + ErrCodeRateLimited RPCErrorCode = "RATE_LIMITED" + ErrCodePayloadTooLarge RPCErrorCode = "PAYLOAD_TOO_LARGE" + + // 5xx — server error + ErrCodeInternal RPCErrorCode = "INTERNAL" + ErrCodeServiceUnavailable RPCErrorCode = "SERVICE_UNAVAILABLE" + ErrCodeTimeout RPCErrorCode = "TIMEOUT" + + // Function-specific (5xx-mapped but distinct codes for client routing) + ErrCodeFunctionExecution RPCErrorCode = "FUNCTION_EXECUTION_FAILED" + ErrCodeFunctionDeploy RPCErrorCode = "FUNCTION_DEPLOY_FAILED" + ErrCodeSchemaMismatch RPCErrorCode = "SCHEMA_MISMATCH" +) + +// RPCErrorEnvelope is the canonical wire shape. Use WriteRPCError to emit; +// the struct is exported so SDK clients can decode/match against it. +type RPCErrorEnvelope struct { + OK bool `json:"ok"` + Error *RPCErrorDetail `json:"error"` +} + +// RPCErrorDetail is the typed error body. `Code` and `Message` are +// always populated by WriteRPCError — clients can rely on that contract. +type RPCErrorDetail struct { + Code RPCErrorCode `json:"code"` + Message string `json:"message"` + Retryable bool `json:"retryable"` + // Optional metadata. omitempty so clients don't see noise on simple cases. + RequestID string `json:"request_id,omitempty"` + RetryAfter float64 `json:"retry_after,omitempty"` +} + +// RPCErrorOption customizes the envelope (request id, retry-after, etc.). +// Callers build chains like WriteRPCError(w, 429, code, msg, +// WithRetryAfter(2.5), WithRequestID(reqID)). +type RPCErrorOption func(*RPCErrorDetail) + +// WithRequestID attaches the gateway request ID to the error detail. +// Useful for cross-referencing client errors with gateway logs. +func WithRequestID(id string) RPCErrorOption { + return func(d *RPCErrorDetail) { d.RequestID = id } +} + +// WithRetryAfter sets the retry-after hint (seconds, float). Sets +// Retryable=true automatically — anything with a retry hint is retryable. +func WithRetryAfter(seconds float64) RPCErrorOption { + return func(d *RPCErrorDetail) { + d.RetryAfter = seconds + d.Retryable = true + } +} + +// WithRetryable marks the error as retryable without a specific delay. +// Only set this for transient errors (rate limiting, temporary upstream +// unavailability). Don't set it for validation errors — retrying with the +// same input won't help. +func WithRetryable() RPCErrorOption { + return func(d *RPCErrorDetail) { d.Retryable = true } +} + +// WriteRPCError writes the canonical envelope. Both code and message are +// REQUIRED — empty values are normalized so clients never see an empty +// message. +// +// Example: +// +// httputil.WriteRPCError(w, http.StatusBadRequest, +// httputil.ErrCodeValidationFailed, "username required") +// +// With options: +// +// httputil.WriteRPCError(w, http.StatusTooManyRequests, +// httputil.ErrCodeRateLimited, "wallet over per-minute cap", +// httputil.WithRetryAfter(1.2), +// httputil.WithRequestID(reqID)) +func WriteRPCError(w http.ResponseWriter, status int, code RPCErrorCode, message string, opts ...RPCErrorOption) { + if code == "" { + code = ErrCodeInternal + } + if message == "" { + message = defaultMessageFor(code) + } + detail := &RPCErrorDetail{ + Code: code, + Message: message, + Retryable: defaultRetryableFor(code), + } + for _, opt := range opts { + opt(detail) + } + + // On rate-limit responses with a retry hint, also surface the standard + // HTTP Retry-After header so non-RPC-aware clients honor it. + if detail.RetryAfter > 0 { + w.Header().Set("Retry-After", formatFloat(detail.RetryAfter)) + } + + WriteJSON(w, status, RPCErrorEnvelope{OK: false, Error: detail}) +} + +// defaultMessageFor returns a sensible fallback when the caller passed +// an empty message string. We never leave the envelope's message empty — +// that was the bug. +func defaultMessageFor(code RPCErrorCode) string { + switch code { + case ErrCodeValidationFailed: + return "request failed validation" + case ErrCodeUnauthorized: + return "authentication required" + case ErrCodeForbidden: + return "access denied" + case ErrCodeNotFound: + return "resource not found" + case ErrCodeConflict: + return "request conflicts with current state" + case ErrCodeRateLimited: + return "rate limit exceeded" + case ErrCodePayloadTooLarge: + return "request payload too large" + case ErrCodeInternal: + return "internal server error" + case ErrCodeServiceUnavailable: + return "service temporarily unavailable" + case ErrCodeTimeout: + return "request timed out" + case ErrCodeFunctionExecution: + return "function execution failed" + case ErrCodeFunctionDeploy: + return "function deployment failed" + case ErrCodeSchemaMismatch: + return "gateway schema does not match required version" + default: + return "an error occurred" + } +} + +// defaultRetryableFor seeds the retryable bit based on the error class. +// Callers can override via WithRetryable() / WithRetryAfter(). +func defaultRetryableFor(code RPCErrorCode) bool { + switch code { + case ErrCodeRateLimited, ErrCodeServiceUnavailable, ErrCodeTimeout: + return true + default: + return false + } +} + +// formatFloat renders a float seconds value compactly for the +// Retry-After HTTP header. We avoid stdlib strconv import here to keep +// this file's dependency set tight. +func formatFloat(v float64) string { + if v <= 0 { + return "0" + } + // Round to one decimal place; clients don't need sub-100ms precision. + tenths := int64(v*10 + 0.5) + whole := tenths / 10 + frac := tenths % 10 + if frac == 0 { + return itoa(whole) + } + return itoa(whole) + "." + itoa(frac) +} + +// itoa is a small unsigned-int formatter (Go's strconv would also work +// but this keeps the file small + dep-free). +func itoa(n int64) string { + if n == 0 { + return "0" + } + neg := n < 0 + if neg { + n = -n + } + var buf [20]byte + i := len(buf) + for n > 0 { + i-- + buf[i] = byte('0' + n%10) + n /= 10 + } + if neg { + i-- + buf[i] = '-' + } + return string(buf[i:]) +} diff --git a/core/pkg/httputil/rpc_error_test.go b/core/pkg/httputil/rpc_error_test.go new file mode 100644 index 0000000..1f43cef --- /dev/null +++ b/core/pkg/httputil/rpc_error_test.go @@ -0,0 +1,155 @@ +package httputil + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func decode(t *testing.T, rec *httptest.ResponseRecorder) RPCErrorEnvelope { + t.Helper() + var env RPCErrorEnvelope + if err := json.NewDecoder(rec.Body).Decode(&env); err != nil { + t.Fatalf("decode: %v", err) + } + return env +} + +func TestWriteRPCError_canonical_shape(t *testing.T) { + rec := httptest.NewRecorder() + WriteRPCError(rec, http.StatusBadRequest, + ErrCodeValidationFailed, "username required") + + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d, want 400", rec.Code) + } + env := decode(t, rec) + if env.OK { + t.Error("ok must be false on error envelope") + } + if env.Error == nil { + t.Fatal("error must be non-nil") + } + if env.Error.Code != ErrCodeValidationFailed { + t.Errorf("code = %s, want %s", env.Error.Code, ErrCodeValidationFailed) + } + if env.Error.Message != "username required" { + t.Errorf("message = %q", env.Error.Message) + } + if env.Error.Retryable { + t.Error("validation errors should not default to retryable") + } +} + +// The contract: NEVER return an empty error.message. Bug #212 was clients +// seeing "RPC failed" because the gateway omitted the message field. This +// test locks in the fallback path. +func TestWriteRPCError_empty_message_is_filled_with_default(t *testing.T) { + cases := []struct { + code RPCErrorCode + wantInclude string + }{ + {ErrCodeValidationFailed, "validation"}, + {ErrCodeNotFound, "not found"}, + {ErrCodeRateLimited, "rate limit"}, + {ErrCodeInternal, "internal"}, + {ErrCodeFunctionExecution, "function execution"}, + } + for _, c := range cases { + t.Run(string(c.code), func(t *testing.T) { + rec := httptest.NewRecorder() + WriteRPCError(rec, http.StatusInternalServerError, c.code, "") + env := decode(t, rec) + if env.Error.Message == "" { + t.Errorf("message must NEVER be empty even when caller passed empty string") + } + if !strings.Contains(strings.ToLower(env.Error.Message), c.wantInclude) { + t.Errorf("message %q should include %q", env.Error.Message, c.wantInclude) + } + }) + } +} + +func TestWriteRPCError_empty_code_falls_back_to_internal(t *testing.T) { + rec := httptest.NewRecorder() + WriteRPCError(rec, http.StatusInternalServerError, "", "") + env := decode(t, rec) + if env.Error.Code != ErrCodeInternal { + t.Errorf("empty code should fall back to INTERNAL, got %s", env.Error.Code) + } + if env.Error.Message == "" { + t.Error("default message must be populated") + } +} + +func TestWriteRPCError_with_retry_after_sets_header_and_retryable(t *testing.T) { + rec := httptest.NewRecorder() + WriteRPCError(rec, http.StatusTooManyRequests, + ErrCodeRateLimited, "wallet over cap", + WithRetryAfter(2.5)) + + if got := rec.Header().Get("Retry-After"); got != "2.5" { + t.Errorf("Retry-After header = %q, want 2.5", got) + } + env := decode(t, rec) + if !env.Error.Retryable { + t.Error("WithRetryAfter must imply Retryable=true") + } + if env.Error.RetryAfter != 2.5 { + t.Errorf("retry_after = %v, want 2.5", env.Error.RetryAfter) + } +} + +func TestWriteRPCError_with_request_id(t *testing.T) { + rec := httptest.NewRecorder() + WriteRPCError(rec, http.StatusBadRequest, + ErrCodeValidationFailed, "bad input", + WithRequestID("req-abc")) + env := decode(t, rec) + if env.Error.RequestID != "req-abc" { + t.Errorf("request_id = %q, want req-abc", env.Error.RequestID) + } +} + +func TestWriteRPCError_default_retryable_for_transient_codes(t *testing.T) { + cases := []struct { + code RPCErrorCode + want bool + }{ + {ErrCodeRateLimited, true}, + {ErrCodeServiceUnavailable, true}, + {ErrCodeTimeout, true}, + {ErrCodeValidationFailed, false}, + {ErrCodeNotFound, false}, + {ErrCodeForbidden, false}, + {ErrCodeInternal, false}, + } + for _, c := range cases { + rec := httptest.NewRecorder() + WriteRPCError(rec, http.StatusInternalServerError, c.code, "") + env := decode(t, rec) + if env.Error.Retryable != c.want { + t.Errorf("%s: retryable=%v, want %v", c.code, env.Error.Retryable, c.want) + } + } +} + +func TestFormatFloat(t *testing.T) { + cases := []struct { + in float64 + want string + }{ + {0, "0"}, + {1.0, "1"}, + {1.5, "1.5"}, + {2.45, "2.5"}, // rounds to 1 decimal + {-1.0, "0"}, // negative clamps to 0 + } + for _, c := range cases { + if got := formatFloat(c.in); got != c.want { + t.Errorf("formatFloat(%v) = %q, want %q", c.in, got, c.want) + } + } +} diff --git a/core/pkg/serverless/engine.go b/core/pkg/serverless/engine.go index 4e9534a..c96ba6f 100644 --- a/core/pkg/serverless/engine.go +++ b/core/pkg/serverless/engine.go @@ -416,13 +416,16 @@ func (e *Engine) registerHostModule(ctx context.Context) error { for _, moduleName := range []string{"env", "host", "orama"} { _, err := e.runtime.NewHostModuleBuilder(moduleName). NewFunctionBuilder().WithFunc(e.hGetCallerWallet).Export("get_caller_wallet"). + NewFunctionBuilder().WithFunc(e.hGetCallerJWTSubject).Export("get_caller_jwt_subject"). NewFunctionBuilder().WithFunc(e.hGetWSClientID).Export("get_ws_client_id"). NewFunctionBuilder().WithFunc(e.hGetCallerClaim).Export("get_caller_claim"). NewFunctionBuilder().WithFunc(e.hGetRequestID).Export("get_request_id"). NewFunctionBuilder().WithFunc(e.hGetEnv).Export("get_env"). NewFunctionBuilder().WithFunc(e.hGetSecret).Export("get_secret"). NewFunctionBuilder().WithFunc(e.hDBQuery).Export("db_query"). + NewFunctionBuilder().WithFunc(e.hDBQueryV2).Export("db_query_v2"). NewFunctionBuilder().WithFunc(e.hDBExecute).Export("db_execute"). + NewFunctionBuilder().WithFunc(e.hDBExecuteV2).Export("db_execute_v2"). NewFunctionBuilder().WithFunc(e.hDBTransaction).Export("db_transaction"). NewFunctionBuilder().WithFunc(e.hExecAndPublish).Export("exec_and_publish"). NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get"). @@ -469,6 +472,13 @@ func (e *Engine) hGetWSClientID(ctx context.Context, mod api.Module) uint64 { return e.executor.WriteToGuest(ctx, mod, []byte(cid)) } +// hGetCallerJWTSubject returns the JWT `sub` claim explicitly. Empty +// string if the request was not JWT-authenticated. See bug #215. +func (e *Engine) hGetCallerJWTSubject(ctx context.Context, mod api.Module) uint64 { + sub := e.hostServices.GetCallerJWTSubject(ctx) + return e.executor.WriteToGuest(ctx, mod, []byte(sub)) +} + // hGetCallerClaim reads a claim name from guest memory, looks it up on the // caller's JWT custom claims, and writes the value (or empty string) back. func (e *Engine) hGetCallerClaim(ctx context.Context, mod api.Module, namePtr, nameLen uint32) uint64 { @@ -660,6 +670,51 @@ func (e *Engine) hPubSubPublishBatch(ctx context.Context, mod api.Module, msgsPt return 1 } +// hDBExecuteV2 is the WASM-callable wrapper for DBExecuteV2 (bug #218). +// Inputs: pointer/length of SQL + JSON args. Returns a packed uint64 +// (ptr<<32 | len) pointing to the JSON envelope in guest memory, or 0 on +// host-side failure. The JSON envelope's "error" field carries SQL errors. +func (e *Engine) hDBExecuteV2(ctx context.Context, mod api.Module, queryPtr, queryLen, argsPtr, argsLen uint32) uint64 { + query, ok := e.executor.ReadFromGuest(mod, queryPtr, queryLen) + if !ok { + return 0 + } + var args []interface{} + if argsLen > 0 { + if err := e.executor.UnmarshalJSONFromGuest(mod, argsPtr, argsLen, &args); err != nil { + e.logger.Warn("db_execute_v2: failed to unmarshal args", zap.Error(err)) + return 0 + } + } + out, err := e.hostServices.DBExecuteV2(ctx, string(query), args) + if err != nil { + e.logger.Warn("host function db_execute_v2 failed", zap.Error(err)) + return 0 + } + return e.executor.WriteToGuest(ctx, mod, out) +} + +// hDBQueryV2 is the WASM-callable wrapper for DBQueryV2 (bug #218). +func (e *Engine) hDBQueryV2(ctx context.Context, mod api.Module, queryPtr, queryLen, argsPtr, argsLen uint32) uint64 { + query, ok := e.executor.ReadFromGuest(mod, queryPtr, queryLen) + if !ok { + return 0 + } + var args []interface{} + if argsLen > 0 { + if err := e.executor.UnmarshalJSONFromGuest(mod, argsPtr, argsLen, &args); err != nil { + e.logger.Warn("db_query_v2: failed to unmarshal args", zap.Error(err)) + return 0 + } + } + out, err := e.hostServices.DBQueryV2(ctx, string(query), args) + if err != nil { + e.logger.Warn("host function db_query_v2 failed", zap.Error(err)) + return 0 + } + return e.executor.WriteToGuest(ctx, mod, out) +} + // hDBTransaction is the WASM-callable wrapper for DBTransaction. // Input: pointer/length of opsJSON ({"ops":[{kind,sql,args},...]}). // Returns a packed uint64 (ptr<<32 | len) pointing to JSON BatchResult in diff --git a/core/pkg/serverless/hostfuncs_test.go b/core/pkg/serverless/hostfuncs_test.go index 1fa4421..08faf74 100644 --- a/core/pkg/serverless/hostfuncs_test.go +++ b/core/pkg/serverless/hostfuncs_test.go @@ -56,6 +56,14 @@ func (m *mockHostServices) DBExecute(ctx context.Context, query string, args []i return 0, nil } +func (m *mockHostServices) DBExecuteV2(ctx context.Context, query string, args []interface{}) ([]byte, error) { + return []byte(`{"rows_affected":0}`), nil +} + +func (m *mockHostServices) DBQueryV2(ctx context.Context, query string, args []interface{}) ([]byte, error) { + return []byte(`{"rows":[]}`), nil +} + func (m *mockHostServices) CacheGet(ctx context.Context, key string) ([]byte, error) { return nil, nil } @@ -154,6 +162,10 @@ func (m *mockHostServices) GetCallerClaim(ctx context.Context, name string) stri return "" } +func (m *mockHostServices) GetCallerJWTSubject(ctx context.Context) string { + return "" +} + func (m *mockHostServices) EnqueueBackground(ctx context.Context, functionName string, payload []byte) (string, error) { return "", nil } diff --git a/core/pkg/serverless/hostfunctions/context.go b/core/pkg/serverless/hostfunctions/context.go index 85e336c..d8f7b1f 100644 --- a/core/pkg/serverless/hostfunctions/context.go +++ b/core/pkg/serverless/hostfunctions/context.go @@ -68,14 +68,15 @@ func (h *HostFunctions) FunctionInvoke(ctx context.Context, name string, payload } req := &serverless.InvokeRequest{ - Namespace: cur.Namespace, - FunctionName: name, - Input: payload, - TriggerType: serverless.TriggerTypeWebSocket, - CallerWallet: cur.CallerWallet, - CallerIP: cur.CallerIP, - WSClientID: cur.WSClientID, - CallerClaims: cur.CallerClaims, + Namespace: cur.Namespace, + FunctionName: name, + Input: payload, + TriggerType: serverless.TriggerTypeWebSocket, + CallerWallet: cur.CallerWallet, + CallerIP: cur.CallerIP, + WSClientID: cur.WSClientID, + CallerClaims: cur.CallerClaims, + CallerJWTSubject: cur.CallerJWTSubject, } resp, err := inv.Invoke(ctx, req) if err != nil { @@ -165,3 +166,21 @@ func (h *HostFunctions) GetCallerClaim(ctx context.Context, name string) string } return h.invCtx.CallerClaims[name] } + +// GetCallerJWTSubject returns the JWT `sub` claim explicitly, independent +// of the API-key-vs-JWT precedence used by GetCallerWallet. Empty when the +// request was not JWT-authenticated. Bug #215. +// +// Use this when a function MUST bind on the JWT-signed identity (e.g. a +// signup flow that verifies the wallet the caller is registering matches +// the wallet that signed the auth challenge). GetCallerWallet may return +// the namespace pseudo-identifier if the caller also presents an API key. +func (h *HostFunctions) GetCallerJWTSubject(ctx context.Context) string { + h.invCtxLock.RLock() + defer h.invCtxLock.RUnlock() + + if h.invCtx == nil { + return "" + } + return h.invCtx.CallerJWTSubject +} diff --git a/core/pkg/serverless/hostfunctions/database.go b/core/pkg/serverless/hostfunctions/database.go index 023b19c..25fa8bd 100644 --- a/core/pkg/serverless/hostfunctions/database.go +++ b/core/pkg/serverless/hostfunctions/database.go @@ -31,6 +31,11 @@ func (h *HostFunctions) DBQuery(ctx context.Context, query string, args []interf } // DBExecute executes an INSERT/UPDATE/DELETE query and returns affected rows. +// +// IMPORTANT: this returns 0 for BOTH "0 rows affected" AND "SQL error" +// — callers can't distinguish. That ambiguity caused bug #218 (AnChat's +// migrate function silently dropped statements). For new code, prefer +// DBExecuteV2 which returns a typed envelope. func (h *HostFunctions) DBExecute(ctx context.Context, query string, args []interface{}) (int64, error) { if h.db == nil { return 0, &serverless.HostFunctionError{Function: "db_execute", Cause: serverless.ErrDatabaseUnavailable} @@ -45,6 +50,77 @@ func (h *HostFunctions) DBExecute(ctx context.Context, query string, args []inte return affected, nil } +// dbExecuteV2Result is the JSON wire shape returned by DBExecuteV2. +type dbExecuteV2Result struct { + RowsAffected int64 `json:"rows_affected"` + LastInsertID int64 `json:"last_insert_id,omitempty"` + Error string `json:"error,omitempty"` +} + +// DBExecuteV2 is the typed equivalent of DBExecute. Returns the same shape +// regardless of success/failure so callers can distinguish "0 rows affected" +// from "SQL error" — fixing the contract gap that caused bug #218. +// +// Returns a Go error only for host-side setup failures (no DB). SQL errors +// are encoded in the JSON envelope's "error" field. +func (h *HostFunctions) DBExecuteV2(ctx context.Context, query string, args []interface{}) ([]byte, error) { + if h.db == nil { + return nil, &serverless.HostFunctionError{ + Function: "db_execute_v2", + Cause: serverless.ErrDatabaseUnavailable, + } + } + + out := dbExecuteV2Result{} + result, err := h.db.Exec(ctx, query, args...) + if err != nil { + out.Error = err.Error() + buf, mErr := json.Marshal(out) + if mErr != nil { + return nil, &serverless.HostFunctionError{Function: "db_execute_v2", Cause: mErr} + } + return buf, nil + } + if result != nil { + out.RowsAffected, _ = result.RowsAffected() + out.LastInsertID, _ = result.LastInsertId() + } + buf, mErr := json.Marshal(out) + if mErr != nil { + return nil, &serverless.HostFunctionError{Function: "db_execute_v2", Cause: mErr} + } + return buf, nil +} + +// dbQueryV2Result is the JSON wire shape returned by DBQueryV2. +type dbQueryV2Result struct { + Rows []map[string]interface{} `json:"rows"` + Error string `json:"error,omitempty"` +} + +// DBQueryV2 is the typed equivalent of DBQuery. Distinguishes "empty +// result set" from "query failed" via the "error" field. +func (h *HostFunctions) DBQueryV2(ctx context.Context, query string, args []interface{}) ([]byte, error) { + if h.db == nil { + return nil, &serverless.HostFunctionError{ + Function: "db_query_v2", + Cause: serverless.ErrDatabaseUnavailable, + } + } + + out := dbQueryV2Result{Rows: []map[string]interface{}{}} + if err := h.db.Query(ctx, &out.Rows, query, args...); err != nil { + out.Error = err.Error() + // Reset rows to non-nil empty on error so callers get a stable shape. + out.Rows = []map[string]interface{}{} + } + buf, mErr := json.Marshal(out) + if mErr != nil { + return nil, &serverless.HostFunctionError{Function: "db_query_v2", Cause: mErr} + } + return buf, nil +} + // dbTransactionRequest is the WASM-side shape for db_transaction input. type dbTransactionRequest struct { Ops []rqlite.BatchOp `json:"ops"` diff --git a/core/pkg/serverless/hostfunctions/database_test.go b/core/pkg/serverless/hostfunctions/database_test.go index b4ae94f..3f91eda 100644 --- a/core/pkg/serverless/hostfunctions/database_test.go +++ b/core/pkg/serverless/hostfunctions/database_test.go @@ -2,6 +2,7 @@ package hostfunctions import ( "context" + "database/sql" "encoding/json" "strings" "sync/atomic" @@ -170,6 +171,148 @@ func TestExecAndPublish_no_namespace_in_context_rejected(t *testing.T) { _ = h } +// fakeExecClient is a minimal rqlite.Client stub focused on Exec/Query +// behavior for the v2 host call tests (bug #218 regression coverage). +type fakeExecClient struct { + rqlite.Client + execErr error + execRows int64 + execLastID int64 + queryErr error + queryRows []map[string]interface{} +} + +func (f *fakeExecClient) Exec(_ context.Context, _ string, _ ...any) (sql.Result, error) { + if f.execErr != nil { + return nil, f.execErr + } + return &fakeSQLResult{rows: f.execRows, lastID: f.execLastID}, nil +} + +func (f *fakeExecClient) Query(_ context.Context, dest any, _ string, _ ...any) error { + if f.queryErr != nil { + return f.queryErr + } + rows, ok := dest.(*[]map[string]interface{}) + if !ok { + return nil + } + *rows = append(*rows, f.queryRows...) + return nil +} + +type fakeSQLResult struct { + rows int64 + lastID int64 +} + +func (r *fakeSQLResult) LastInsertId() (int64, error) { return r.lastID, nil } +func (r *fakeSQLResult) RowsAffected() (int64, error) { return r.rows, nil } + +func TestDBExecuteV2_success(t *testing.T) { + fake := &fakeExecClient{execRows: 3, execLastID: 42} + h := newHFWithDB(fake) + + out, err := h.DBExecuteV2(context.Background(), "INSERT INTO t VALUES (?)", []interface{}{1}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + var res dbExecuteV2Result + if err := json.Unmarshal(out, &res); err != nil { + t.Fatalf("decode: %v", err) + } + if res.RowsAffected != 3 { + t.Errorf("rows_affected = %d, want 3", res.RowsAffected) + } + if res.LastInsertID != 42 { + t.Errorf("last_insert_id = %d, want 42", res.LastInsertID) + } + if res.Error != "" { + t.Errorf("error should be empty on success, got %q", res.Error) + } +} + +// The whole point of bug #218: when the SQL fails, the envelope must say +// so. Old DBExecute returned 0 — indistinguishable from "0 rows affected". +func TestDBExecuteV2_sql_error_populates_error_field(t *testing.T) { + fake := &fakeExecClient{execErr: errFakeDBFailure{msg: "no such column: missing"}} + h := newHFWithDB(fake) + + out, err := h.DBExecuteV2(context.Background(), "INSERT ...", nil) + if err != nil { + t.Fatalf("expected SQL errors via envelope, not Go error: %v", err) + } + var res dbExecuteV2Result + if err := json.Unmarshal(out, &res); err != nil { + t.Fatalf("decode: %v", err) + } + if res.Error == "" { + t.Fatal("error field must NOT be empty when SQL failed (bug #218 contract)") + } + if !strings.Contains(res.Error, "no such column") { + t.Errorf("error should preserve SQL message, got %q", res.Error) + } + if res.RowsAffected != 0 { + t.Errorf("rows_affected should be 0 on failure, got %d", res.RowsAffected) + } +} + +func TestDBExecuteV2_no_db_returns_go_error(t *testing.T) { + h := &HostFunctions{db: nil} + _, err := h.DBExecuteV2(context.Background(), "INSERT ...", nil) + if err == nil { + t.Fatal("expected Go error for setup failure (no DB)") + } +} + +func TestDBQueryV2_success_with_empty_rows(t *testing.T) { + fake := &fakeExecClient{queryRows: nil} // genuine "no rows" — not an error + h := newHFWithDB(fake) + + out, err := h.DBQueryV2(context.Background(), "SELECT * FROM t WHERE 0=1", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + var res dbQueryV2Result + if err := json.Unmarshal(out, &res); err != nil { + t.Fatalf("decode: %v", err) + } + if res.Error != "" { + t.Errorf("empty rows is NOT an error, got error=%q", res.Error) + } + if res.Rows == nil { + t.Error("rows must be non-nil empty slice for stable JSON shape") + } + if len(res.Rows) != 0 { + t.Errorf("rows = %v, want empty", res.Rows) + } +} + +func TestDBQueryV2_query_error_populates_error_field(t *testing.T) { + fake := &fakeExecClient{queryErr: errFakeDBFailure{msg: "syntax error"}} + h := newHFWithDB(fake) + + out, err := h.DBQueryV2(context.Background(), "SELECT bogus FROM t", nil) + if err != nil { + t.Fatalf("query errors should be in envelope, not Go error: %v", err) + } + var res dbQueryV2Result + if err := json.Unmarshal(out, &res); err != nil { + t.Fatalf("decode: %v", err) + } + if res.Error == "" { + t.Fatal("error field must NOT be empty when query failed") + } + if res.Rows == nil { + t.Error("rows must be non-nil even on error (stable shape)") + } +} + +// errFakeDBFailure is a sentinel for v2 tests. +type errFakeDBFailure struct{ msg string } + +func (e errFakeDBFailure) Error() string { return e.msg } + func TestDBTransaction_rollback_returns_committed_false_no_go_error(t *testing.T) { fake := &fakeBatchClient{ respond: func(ops []rqlite.BatchOp) (*rqlite.BatchResult, error) { diff --git a/core/pkg/serverless/hostfunctions/jwt_subject_test.go b/core/pkg/serverless/hostfunctions/jwt_subject_test.go new file mode 100644 index 0000000..7c0d84c --- /dev/null +++ b/core/pkg/serverless/hostfunctions/jwt_subject_test.go @@ -0,0 +1,51 @@ +package hostfunctions + +import ( + "context" + "testing" + + "github.com/DeBrosOfficial/network/pkg/serverless" +) + +// Bug #215: get_caller_jwt_subject must return the JWT `sub` exposed +// in the InvocationContext, independent of the caller-wallet field. + +func TestGetCallerJWTSubject_returns_sub_when_set(t *testing.T) { + h := &HostFunctions{} + h.SetInvocationContext(&serverless.InvocationContext{ + // CallerWallet is the namespace pseudo-wallet (resolved via API key); + // CallerJWTSubject is the actual signing wallet from the Bearer JWT. + CallerWallet: "anchat-test", + CallerJWTSubject: "A3ZGpMKPtsmYVtXr6Gnf5u4x6j4dgZWnpyXXFAiCibCC", + }) + defer h.ClearContext() + + if got := h.GetCallerJWTSubject(context.Background()); got != "A3ZGpMKPtsmYVtXr6Gnf5u4x6j4dgZWnpyXXFAiCibCC" { + t.Errorf("GetCallerJWTSubject = %q, want the JWT subject (not the namespace)", got) + } + // And GetCallerWallet still returns the resolved-wallet (namespace in + // this case) — they're independent accessors by design. + if got := h.GetCallerWallet(context.Background()); got != "anchat-test" { + t.Errorf("GetCallerWallet = %q, want anchat-test (the resolved caller-wallet)", got) + } +} + +func TestGetCallerJWTSubject_empty_when_not_jwt_authed(t *testing.T) { + h := &HostFunctions{} + h.SetInvocationContext(&serverless.InvocationContext{ + CallerWallet: "anchat-test", + CallerJWTSubject: "", // request was API-key only + }) + defer h.ClearContext() + + if got := h.GetCallerJWTSubject(context.Background()); got != "" { + t.Errorf("GetCallerJWTSubject = %q, want empty for API-key-only auth", got) + } +} + +func TestGetCallerJWTSubject_empty_when_no_invocation_context(t *testing.T) { + h := &HostFunctions{} + if got := h.GetCallerJWTSubject(context.Background()); got != "" { + t.Errorf("GetCallerJWTSubject = %q, want empty when no inv ctx", got) + } +} diff --git a/core/pkg/serverless/invoke.go b/core/pkg/serverless/invoke.go index 6d9a952..85bdbac 100644 --- a/core/pkg/serverless/invoke.go +++ b/core/pkg/serverless/invoke.go @@ -54,6 +54,10 @@ type InvokeRequest struct { WSClientID string `json:"ws_client_id,omitempty"` // CallerClaims holds custom JWT claims to expose via get_caller_claim. CallerClaims map[string]string `json:"caller_claims,omitempty"` + // CallerJWTSubject carries the JWT `sub` claim explicitly so the + // engine can populate InvocationContext.CallerJWTSubject — fixes the + // bug-#215 case where API-key precedence buries the JWT identity. + CallerJWTSubject string `json:"caller_jwt_subject,omitempty"` } // InvokeResponse contains the result of a function invocation. @@ -112,16 +116,17 @@ func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeRespon // Build invocation context invCtx := &InvocationContext{ - RequestID: requestID, - FunctionID: fn.ID, - FunctionName: fn.Name, - Namespace: fn.Namespace, - CallerWallet: req.CallerWallet, - CallerIP: req.CallerIP, - TriggerType: req.TriggerType, - WSClientID: req.WSClientID, - EnvVars: envVars, - CallerClaims: req.CallerClaims, + RequestID: requestID, + FunctionID: fn.ID, + FunctionName: fn.Name, + Namespace: fn.Namespace, + CallerWallet: req.CallerWallet, + CallerIP: req.CallerIP, + TriggerType: req.TriggerType, + WSClientID: req.WSClientID, + EnvVars: envVars, + CallerClaims: req.CallerClaims, + CallerJWTSubject: req.CallerJWTSubject, } // Execute with retry logic diff --git a/core/pkg/serverless/mocks_test.go b/core/pkg/serverless/mocks_test.go index dbfdb53..21fb9ee 100644 --- a/core/pkg/serverless/mocks_test.go +++ b/core/pkg/serverless/mocks_test.go @@ -93,6 +93,10 @@ func (m *MockRegistry) GetLogs(ctx context.Context, namespace, name string, limi return []LogEntry{}, nil } +func (m *MockRegistry) GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error) { + return []Invocation{}, nil +} + // MockHostServices is a mock implementation of HostServices type MockHostServices struct { mu sync.RWMutex @@ -116,6 +120,14 @@ func (m *MockHostServices) DBExecute(ctx context.Context, query string, args []i return 0, nil } +func (m *MockHostServices) DBExecuteV2(ctx context.Context, query string, args []interface{}) ([]byte, error) { + return []byte(`{"rows_affected":0}`), nil +} + +func (m *MockHostServices) DBQueryV2(ctx context.Context, query string, args []interface{}) ([]byte, error) { + return []byte(`{"rows":[]}`), nil +} + func (m *MockHostServices) CacheGet(ctx context.Context, key string) ([]byte, error) { m.mu.RLock() defer m.mu.RUnlock() @@ -240,6 +252,10 @@ func (m *MockHostServices) GetCallerClaim(ctx context.Context, name string) stri return "" } +func (m *MockHostServices) GetCallerJWTSubject(ctx context.Context) string { + return "" +} + func (m *MockHostServices) EnqueueBackground(ctx context.Context, functionName string, payload []byte) (string, error) { return "job-123", nil } diff --git a/core/pkg/serverless/registry.go b/core/pkg/serverless/registry.go index 26b576e..46a8aee 100644 --- a/core/pkg/serverless/registry.go +++ b/core/pkg/serverless/registry.go @@ -426,6 +426,104 @@ func (r *Registry) GetLogs(ctx context.Context, namespace, name string, limit in return logs, nil } +// GetInvocations returns invocation history for a function in reverse +// chronological order, with any associated WASM log entries nested per +// record. This is the right answer to "what happened when this function +// was invoked" — `orama function logs ` consumes this view. +// +// Always populated when the function has been invoked at least once. +// Two queries (invocations + nested WASM logs) batched into one map. +func (r *Registry) GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error) { + if limit <= 0 { + limit = 50 + } + + invQuery := ` + SELECT i.id, i.request_id, i.trigger_type, i.caller_wallet, + i.input_size, i.output_size, i.started_at, i.completed_at, + i.duration_ms, i.status, i.error_message, i.memory_used_mb + FROM function_invocations i + JOIN functions f ON i.function_id = f.id + WHERE f.namespace = ? AND f.name = ? + ORDER BY i.started_at DESC + LIMIT ? + ` + var rows []struct { + ID string `db:"id"` + RequestID string `db:"request_id"` + TriggerType string `db:"trigger_type"` + CallerWallet string `db:"caller_wallet"` + InputSize int `db:"input_size"` + OutputSize int `db:"output_size"` + StartedAt time.Time `db:"started_at"` + CompletedAt time.Time `db:"completed_at"` + DurationMS int64 `db:"duration_ms"` + Status string `db:"status"` + ErrorMessage string `db:"error_message"` + MemoryUsedMB float64 `db:"memory_used_mb"` + } + if err := r.db.Query(ctx, &rows, invQuery, namespace, name, limit); err != nil { + return nil, fmt.Errorf("failed to query invocations: %w", err) + } + if len(rows) == 0 { + return []Invocation{}, nil + } + + // Batched fetch of nested WASM logs. + invIDs := make([]interface{}, len(rows)) + for i, r := range rows { + invIDs[i] = r.ID + } + placeholders := strings.Repeat("?,", len(invIDs)) + placeholders = placeholders[:len(placeholders)-1] + logsQuery := ` + SELECT invocation_id, level, message, timestamp + FROM function_logs + WHERE invocation_id IN (` + placeholders + `) + ORDER BY timestamp ASC + ` + var logRows []struct { + InvocationID string `db:"invocation_id"` + Level string `db:"level"` + Message string `db:"message"` + Timestamp time.Time `db:"timestamp"` + } + logsByInv := map[string][]LogEntry{} + if err := r.db.Query(ctx, &logRows, logsQuery, invIDs...); err != nil { + // Don't fail the whole call — invocation summary is still useful. + r.logger.Warn("failed to fetch nested WASM logs; returning invocations without them", + zap.Error(err)) + } else { + for _, lr := range logRows { + logsByInv[lr.InvocationID] = append(logsByInv[lr.InvocationID], LogEntry{ + Level: lr.Level, + Message: lr.Message, + Timestamp: lr.Timestamp, + }) + } + } + + out := make([]Invocation, len(rows)) + for i, row := range rows { + out[i] = Invocation{ + ID: row.ID, + RequestID: row.RequestID, + TriggerType: row.TriggerType, + CallerWallet: row.CallerWallet, + InputSize: row.InputSize, + OutputSize: row.OutputSize, + StartedAt: row.StartedAt, + CompletedAt: row.CompletedAt, + DurationMS: row.DurationMS, + Status: row.Status, + ErrorMessage: row.ErrorMessage, + MemoryUsedMB: row.MemoryUsedMB, + WASMLogs: logsByInv[row.ID], + } + } + return out, nil +} + // ----------------------------------------------------------------------------- // Private helpers // ----------------------------------------------------------------------------- diff --git a/core/pkg/serverless/registry/invocation_logger.go b/core/pkg/serverless/registry/invocation_logger.go index 45ad23b..c7f8bd7 100644 --- a/core/pkg/serverless/registry/invocation_logger.go +++ b/core/pkg/serverless/registry/invocation_logger.go @@ -3,6 +3,7 @@ package registry import ( "context" "fmt" + "strings" "time" "github.com/DeBrosOfficial/network/pkg/rqlite" @@ -66,7 +67,12 @@ func (l *InvocationLogger) Log(ctx context.Context, inv *InvocationRecordData) e return nil } -// GetLogs retrieves logs for a function. +// GetLogs retrieves WASM-emitted log entries for a function (rows in +// function_logs). Functions that don't call log_info / log_error from +// their WASM code will return an empty slice here — that's expected. +// +// For "what happened when this function was invoked" use GetInvocations +// instead; it always populates as long as the function has been invoked. func (l *InvocationLogger) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) { if limit <= 0 { limit = 100 @@ -102,3 +108,131 @@ func (l *InvocationLogger) GetLogs(ctx context.Context, namespace, name string, return logs, nil } + +// GetInvocations returns invocation history for a function in reverse +// chronological order, with any associated WASM log entries nested per +// record under WASMLogs. +// +// This is what `orama function logs ` displays — it's always +// populated as long as the function has been invoked at least once. +// Returns an empty slice (not an error) when there are no invocations. +// +// Implementation: two queries — first the invocation rows, then a +// single batched query for all WASM log entries belonging to those +// invocation IDs. We don't LEFT JOIN because that would return one +// row per (invocation × log entry) which is awkward to scan. +func (l *InvocationLogger) GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error) { + if limit <= 0 { + limit = 50 + } + + invQuery := ` + SELECT i.id, i.request_id, i.trigger_type, i.caller_wallet, + i.input_size, i.output_size, i.started_at, i.completed_at, + i.duration_ms, i.status, i.error_message, i.memory_used_mb + FROM function_invocations i + JOIN functions f ON i.function_id = f.id + WHERE f.namespace = ? AND f.name = ? + ORDER BY i.started_at DESC + LIMIT ? + ` + + var rows []struct { + ID string `db:"id"` + RequestID string `db:"request_id"` + TriggerType string `db:"trigger_type"` + CallerWallet string `db:"caller_wallet"` + InputSize int `db:"input_size"` + OutputSize int `db:"output_size"` + StartedAt time.Time `db:"started_at"` + CompletedAt time.Time `db:"completed_at"` + DurationMS int64 `db:"duration_ms"` + Status string `db:"status"` + ErrorMessage string `db:"error_message"` + MemoryUsedMB float64 `db:"memory_used_mb"` + } + if err := l.db.Query(ctx, &rows, invQuery, namespace, name, limit); err != nil { + return nil, fmt.Errorf("failed to query invocations: %w", err) + } + if len(rows) == 0 { + return []Invocation{}, nil + } + + // Batched fetch of WASM log entries for these invocation IDs. + // We use IN (?, ?, ...) with one placeholder per ID. This stays + // fast because limit is bounded (default 50). + invIDs := make([]string, len(rows)) + for i, r := range rows { + invIDs[i] = r.ID + } + logsByInv, err := l.fetchLogsForInvocations(ctx, invIDs) + if err != nil { + // Don't fail the whole call if WASM-log fetch fails; the + // invocation summary is still useful. Log and continue. + l.logger.Warn("failed to fetch nested WASM logs; returning invocations without them", + zap.Error(err)) + logsByInv = map[string][]LogEntry{} + } + + out := make([]Invocation, len(rows)) + for i, r := range rows { + out[i] = Invocation{ + ID: r.ID, + RequestID: r.RequestID, + TriggerType: r.TriggerType, + CallerWallet: r.CallerWallet, + InputSize: r.InputSize, + OutputSize: r.OutputSize, + StartedAt: r.StartedAt, + CompletedAt: r.CompletedAt, + DurationMS: r.DurationMS, + Status: r.Status, + ErrorMessage: r.ErrorMessage, + MemoryUsedMB: r.MemoryUsedMB, + WASMLogs: logsByInv[r.ID], + } + } + return out, nil +} + +// fetchLogsForInvocations returns a map of invocation_id → []LogEntry, +// fetching all entries in one query. +func (l *InvocationLogger) fetchLogsForInvocations(ctx context.Context, invocationIDs []string) (map[string][]LogEntry, error) { + if len(invocationIDs) == 0 { + return map[string][]LogEntry{}, nil + } + placeholders := strings.Repeat("?,", len(invocationIDs)) + placeholders = placeholders[:len(placeholders)-1] + query := ` + SELECT invocation_id, level, message, timestamp + FROM function_logs + WHERE invocation_id IN (` + placeholders + `) + ORDER BY timestamp ASC + ` + + args := make([]interface{}, len(invocationIDs)) + for i, id := range invocationIDs { + args[i] = id + } + + var rows []struct { + InvocationID string `db:"invocation_id"` + Level string `db:"level"` + Message string `db:"message"` + Timestamp time.Time `db:"timestamp"` + } + if err := l.db.Query(ctx, &rows, query, args...); err != nil { + return nil, fmt.Errorf("failed to query logs: %w", err) + } + + out := make(map[string][]LogEntry, len(invocationIDs)) + for _, r := range rows { + out[r.InvocationID] = append(out[r.InvocationID], LogEntry{ + Level: r.Level, + Message: r.Message, + Timestamp: r.Timestamp, + }) + } + return out, nil +} + diff --git a/core/pkg/serverless/registry/invocation_logger_test.go b/core/pkg/serverless/registry/invocation_logger_test.go new file mode 100644 index 0000000..a2b72bf --- /dev/null +++ b/core/pkg/serverless/registry/invocation_logger_test.go @@ -0,0 +1,221 @@ +package registry + +import ( + "context" + "testing" + "time" + + "github.com/DeBrosOfficial/network/pkg/rqlite" + "go.uber.org/zap" +) + +// fakeDB is a tiny rqlite.Client stub that lets tests script Query / Exec +// behavior. We only implement what InvocationLogger calls. +type fakeDB struct { + rqlite.Client + queries []recordedQuery + // onQuery is called for every Query() invocation, in order. Each + // callback fills in the destination slice and returns an optional + // error. Tests pop callbacks; running out is a test bug. + onQuery []func(dest interface{}) error +} + +type recordedQuery struct { + sql string + args []interface{} +} + +func (f *fakeDB) Query(_ context.Context, dest interface{}, sql string, args ...interface{}) error { + f.queries = append(f.queries, recordedQuery{sql: sql, args: args}) + if len(f.onQuery) == 0 { + return nil + } + cb := f.onQuery[0] + f.onQuery = f.onQuery[1:] + return cb(dest) +} + +func TestGetInvocations_no_invocations_returns_empty_slice(t *testing.T) { + db := &fakeDB{} + il := NewInvocationLogger(db, zap.NewNop()) + + got, err := il.GetInvocations(context.Background(), "ns", "fn", 50) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got == nil { + t.Fatal("expected non-nil empty slice; nil breaks JSON marshalling for clients") + } + if len(got) != 0 { + t.Errorf("expected empty, got %d records", len(got)) + } +} + +func TestGetInvocations_populates_records_and_nests_logs(t *testing.T) { + now := time.Now() + // Arrange the fake to return two invocations on the first Query, then + // two log rows on the second. + db := &fakeDB{ + onQuery: []func(dest interface{}) error{ + // Invocation rows. + func(dest interface{}) error { + rows := dest.(*[]struct { + ID string `db:"id"` + RequestID string `db:"request_id"` + TriggerType string `db:"trigger_type"` + CallerWallet string `db:"caller_wallet"` + InputSize int `db:"input_size"` + OutputSize int `db:"output_size"` + StartedAt time.Time `db:"started_at"` + CompletedAt time.Time `db:"completed_at"` + DurationMS int64 `db:"duration_ms"` + Status string `db:"status"` + ErrorMessage string `db:"error_message"` + MemoryUsedMB float64 `db:"memory_used_mb"` + }) + *rows = append(*rows, + struct { + ID string `db:"id"` + RequestID string `db:"request_id"` + TriggerType string `db:"trigger_type"` + CallerWallet string `db:"caller_wallet"` + InputSize int `db:"input_size"` + OutputSize int `db:"output_size"` + StartedAt time.Time `db:"started_at"` + CompletedAt time.Time `db:"completed_at"` + DurationMS int64 `db:"duration_ms"` + Status string `db:"status"` + ErrorMessage string `db:"error_message"` + MemoryUsedMB float64 `db:"memory_used_mb"` + }{ + ID: "inv-1", RequestID: "req-A", Status: "success", + DurationMS: 12, StartedAt: now, + }, + struct { + ID string `db:"id"` + RequestID string `db:"request_id"` + TriggerType string `db:"trigger_type"` + CallerWallet string `db:"caller_wallet"` + InputSize int `db:"input_size"` + OutputSize int `db:"output_size"` + StartedAt time.Time `db:"started_at"` + CompletedAt time.Time `db:"completed_at"` + DurationMS int64 `db:"duration_ms"` + Status string `db:"status"` + ErrorMessage string `db:"error_message"` + MemoryUsedMB float64 `db:"memory_used_mb"` + }{ + ID: "inv-2", RequestID: "req-B", Status: "error", + ErrorMessage: "boom", StartedAt: now.Add(-1 * time.Minute), + }, + ) + return nil + }, + // Nested WASM log rows: one for inv-1, none for inv-2. + func(dest interface{}) error { + rows := dest.(*[]struct { + InvocationID string `db:"invocation_id"` + Level string `db:"level"` + Message string `db:"message"` + Timestamp time.Time `db:"timestamp"` + }) + *rows = append(*rows, struct { + InvocationID string `db:"invocation_id"` + Level string `db:"level"` + Message string `db:"message"` + Timestamp time.Time `db:"timestamp"` + }{ + InvocationID: "inv-1", Level: "info", Message: "hi", Timestamp: now, + }) + return nil + }, + }, + } + il := NewInvocationLogger(db, zap.NewNop()) + + got, err := il.GetInvocations(context.Background(), "ns", "fn", 50) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 2 { + t.Fatalf("expected 2 invocations, got %d", len(got)) + } + + if got[0].ID != "inv-1" || got[0].Status != "success" { + t.Errorf("first invocation wrong: %+v", got[0]) + } + if len(got[0].WASMLogs) != 1 || got[0].WASMLogs[0].Message != "hi" { + t.Errorf("expected nested WASM log on inv-1, got %+v", got[0].WASMLogs) + } + if got[1].ID != "inv-2" || got[1].ErrorMessage != "boom" { + t.Errorf("second invocation wrong: %+v", got[1]) + } + if len(got[1].WASMLogs) != 0 { + t.Errorf("expected no WASM logs on inv-2, got %+v", got[1].WASMLogs) + } +} + +func TestGetInvocations_log_query_failure_does_not_drop_records(t *testing.T) { + // Even if the WASM-logs query fails, the invocation summary is still + // useful — we degrade gracefully (log a warn) rather than failing the + // whole call. + db := &fakeDB{ + onQuery: []func(dest interface{}) error{ + // Invocation query: one row. + func(dest interface{}) error { + rows := dest.(*[]struct { + ID string `db:"id"` + RequestID string `db:"request_id"` + TriggerType string `db:"trigger_type"` + CallerWallet string `db:"caller_wallet"` + InputSize int `db:"input_size"` + OutputSize int `db:"output_size"` + StartedAt time.Time `db:"started_at"` + CompletedAt time.Time `db:"completed_at"` + DurationMS int64 `db:"duration_ms"` + Status string `db:"status"` + ErrorMessage string `db:"error_message"` + MemoryUsedMB float64 `db:"memory_used_mb"` + }) + *rows = append(*rows, struct { + ID string `db:"id"` + RequestID string `db:"request_id"` + TriggerType string `db:"trigger_type"` + CallerWallet string `db:"caller_wallet"` + InputSize int `db:"input_size"` + OutputSize int `db:"output_size"` + StartedAt time.Time `db:"started_at"` + CompletedAt time.Time `db:"completed_at"` + DurationMS int64 `db:"duration_ms"` + Status string `db:"status"` + ErrorMessage string `db:"error_message"` + MemoryUsedMB float64 `db:"memory_used_mb"` + }{ID: "inv-1", Status: "success"}) + return nil + }, + // Nested-log query: simulate a transient DB error. + func(_ interface{}) error { + return errFake + }, + }, + } + il := NewInvocationLogger(db, zap.NewNop()) + + got, err := il.GetInvocations(context.Background(), "ns", "fn", 50) + if err != nil { + t.Fatalf("expected graceful degradation, got error: %v", err) + } + if len(got) != 1 { + t.Fatalf("expected 1 invocation, got %d", len(got)) + } + if len(got[0].WASMLogs) != 0 { + t.Errorf("expected empty WASMLogs on log-fetch failure, got %+v", got[0].WASMLogs) + } +} + +// errFake is a sentinel for log-query failure tests. +var errFake = &fakeError{} + +type fakeError struct{} + +func (e *fakeError) Error() string { return "fake db error" } diff --git a/core/pkg/serverless/registry/registry.go b/core/pkg/serverless/registry/registry.go index 8a8a59e..ff63716 100644 --- a/core/pkg/serverless/registry/registry.go +++ b/core/pkg/serverless/registry/registry.go @@ -105,11 +105,20 @@ func (r *Registry) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, er return r.ipfsStore.Get(ctx, wasmCID) } -// GetLogs retrieves logs for a function. +// GetLogs retrieves WASM-emitted log entries for a function. Most functions +// don't call log_info / log_error, so this is often empty — see +// GetInvocations for the always-populated invocation-history view. func (r *Registry) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) { return r.invocationLogger.GetLogs(ctx, namespace, name, limit) } +// GetInvocations returns the invocation-history view for a function: +// timestamp / status / duration / error_message per invocation, with any +// associated WASM log entries nested per record. +func (r *Registry) GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error) { + return r.invocationLogger.GetInvocations(ctx, namespace, name, limit) +} + // GetEnvVars retrieves environment variables for a function. func (r *Registry) GetEnvVars(ctx context.Context, functionID string) (map[string]string, error) { return r.functionStore.GetEnvVars(ctx, functionID) diff --git a/core/pkg/serverless/registry/types.go b/core/pkg/serverless/registry/types.go index 9e11fc1..813f455 100644 --- a/core/pkg/serverless/registry/types.go +++ b/core/pkg/serverless/registry/types.go @@ -66,13 +66,39 @@ type Function struct { WSMaxInflightPerConn int } -// LogEntry represents a log message from a function. +// LogEntry represents a log message emitted from inside a WASM function +// via the log_info / log_error host calls. type LogEntry struct { Level string Message string Timestamp time.Time } +// Invocation is a record of one function invocation, returned by +// GetInvocations. It always populates regardless of whether the function +// emitted any log_info/log_error calls — the WASM-emitted entries are +// nested under WASMLogs (which may be empty). +// +// This is the right answer to "what happened on this invocation" — the +// CLI's `function logs` and dashboard log views consume this. The +// older GetLogs(LogEntry) returns ONLY WASM-emitted entries, which is +// usually empty and confused users (bug #211). +type Invocation struct { + ID string `json:"id"` + RequestID string `json:"request_id"` + TriggerType string `json:"trigger_type"` + CallerWallet string `json:"caller_wallet,omitempty"` + InputSize int `json:"input_size"` + OutputSize int `json:"output_size"` + StartedAt time.Time `json:"started_at"` + CompletedAt time.Time `json:"completed_at"` + DurationMS int64 `json:"duration_ms"` + Status string `json:"status"` + ErrorMessage string `json:"error_message,omitempty"` + MemoryUsedMB float64 `json:"memory_used_mb,omitempty"` + WASMLogs []LogEntry `json:"wasm_logs,omitempty"` +} + // FunctionRegistry interface type FunctionRegistry interface { Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error) @@ -80,7 +106,16 @@ type FunctionRegistry interface { List(ctx context.Context, namespace string) ([]*Function, error) Delete(ctx context.Context, namespace, name string, version int) error GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error) + + // GetLogs returns ONLY WASM-emitted log entries (rows in function_logs). + // This is rarely useful on its own — most functions don't emit any. + // Prefer GetInvocations for the complete invocation-history view. GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) + + // GetInvocations returns invocation history (always populated when the + // function has been invoked at least once) with any associated WASM + // log entries nested per record. Sorted by started_at DESC. + GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error) } // Error types diff --git a/core/pkg/serverless/types.go b/core/pkg/serverless/types.go index 12ad738..9f59faa 100644 --- a/core/pkg/serverless/types.go +++ b/core/pkg/serverless/types.go @@ -84,8 +84,34 @@ type FunctionRegistry interface { // GetWASMBytes retrieves the compiled WASM bytecode for a function. GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error) - // GetLogs retrieves logs for a function. + // GetLogs returns WASM-emitted log entries (function_logs rows). Often + // empty because most functions don't call log_info / log_error. Use + // GetInvocations for the always-populated invocation-history view. GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) + + // GetInvocations returns invocation history for a function in reverse + // chronological order, with any associated WASM log entries nested + // per record. Always populated when the function has been invoked. + GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error) +} + +// Invocation is the record of one function invocation as seen by +// `orama function logs`. Mirrors registry.Invocation; defined here at the +// public package boundary so callers don't need to import the inner package. +type Invocation struct { + ID string `json:"id"` + RequestID string `json:"request_id"` + TriggerType string `json:"trigger_type"` + CallerWallet string `json:"caller_wallet,omitempty"` + InputSize int `json:"input_size"` + OutputSize int `json:"output_size"` + StartedAt time.Time `json:"started_at"` + CompletedAt time.Time `json:"completed_at"` + DurationMS int64 `json:"duration_ms"` + Status string `json:"status"` + ErrorMessage string `json:"error_message,omitempty"` + MemoryUsedMB float64 `json:"memory_used_mb,omitempty"` + WASMLogs []LogEntry `json:"wasm_logs,omitempty"` } // FunctionExecutor handles the actual execution of WASM functions. @@ -256,6 +282,14 @@ type InvocationContext struct { // the standard sub/namespace fields). Read via host fn `get_caller_claim`. // Populated by auth handlers from JWTClaims.Custom; empty for non-JWT auth. CallerClaims map[string]string `json:"caller_claims,omitempty"` + + // CallerJWTSubject is the `sub` claim of the Bearer JWT, if any. + // EXPLICITLY captured from the JWT independent of the API-key-vs-JWT + // wallet-resolution heuristic — so functions that must bind on the + // JWT-signed identity (signup flows) can do so reliably even when the + // caller also presents an API key. Empty string when the request was + // not JWT-authenticated. Bug #215. + CallerJWTSubject string `json:"caller_jwt_subject,omitempty"` } // InvocationResult represents the result of a function invocation. @@ -356,6 +390,27 @@ type HostServices interface { DBQuery(ctx context.Context, query string, args []interface{}) ([]byte, error) DBExecute(ctx context.Context, query string, args []interface{}) (int64, error) + // DBExecuteV2 is the typed equivalent of DBExecute that returns BOTH the + // rows-affected count AND a JSON envelope. The legacy DBExecute returns + // only uint32(rows) — collapsing real errors into "0 rows affected" + // (bug #218). New code should call DBExecuteV2 to detect real failures. + // + // Output JSON shape: + // {"rows_affected": 1, "last_insert_id": 42, "error": ""} // success + // {"rows_affected": 0, "last_insert_id": 0, "error": "..."} // failure + // + // Returns a Go error only on host-side validation failures (no DB, + // bad JSON args). SQL execution errors are encoded in the JSON. + DBExecuteV2(ctx context.Context, query string, args []interface{}) ([]byte, error) + + // DBQueryV2 is the typed equivalent of DBQuery — returns a JSON envelope + // distinguishing "empty result" from "query failed". + // + // Output JSON shape: + // {"rows": [...], "error": ""} // success (rows may be []) + // {"rows": [], "error": "..."} // failure + DBQueryV2(ctx context.Context, query string, args []interface{}) ([]byte, error) + // Cache operations CacheGet(ctx context.Context, key string) ([]byte, error) CacheSet(ctx context.Context, key string, value []byte, ttlSeconds int64) error @@ -448,6 +503,12 @@ type HostServices interface { // GetCallerClaim returns a custom JWT claim's value, or empty if missing // or the request was not JWT-authenticated. GetCallerClaim(ctx context.Context, name string) string + // GetCallerJWTSubject returns the JWT `sub` claim independent of the + // API-key-vs-JWT wallet-resolution heuristic. Empty when the request + // was not JWT-authenticated. Use this when a function must bind on + // the JWT-signed identity (e.g. signup-time wallet ownership checks) + // and the caller may ALSO present an API key. Bug #215. + GetCallerJWTSubject(ctx context.Context) string // Job operations EnqueueBackground(ctx context.Context, functionName string, payload []byte) (string, error)