docs: update deployment and serverless documentation

- bump version to 0.122.2
- document schema migration invariants and push notification configuration
- add serverless host function aliases and v2 database API documentation
- introduce schema roundtrip test to prevent migration drift
This commit is contained in:
anonpenguin23 2026-05-07 07:33:52 +03:00
parent c2556a14d7
commit 333b7233c1
35 changed files with 2630 additions and 133 deletions

View File

@ -63,7 +63,7 @@ test-e2e-quick:
.PHONY: build clean test deps tidy fmt vet lint install-hooks push-devnet push-testnet rollout-devnet rollout-testnet release
VERSION := 0.122.1
VERSION := 0.122.2
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

View File

@ -94,6 +94,46 @@ orama monitor report --env testnet
- **DON'T** clear RQLite data directories unless doing a full cluster rebuild
- **DON'T** use `systemctl stop orama-node` on multiple nodes simultaneously
#### Schema-Migration Ordering Invariant
The gateway binary embeds a set of SQL migrations. The highest-numbered migration is the schema version that binary REQUIRES — **the gateway will refuse to start if its required schema isn't applied** (the schema-version contract added after the 2026-05-06 incident).
This means rolling upgrades have ONE invariant you must respect:
> The new gateway binary's required migrations must be applied to RQLite **before or as part of** starting the new binary on a node.
There are two acceptable patterns:
**Pattern A — let the gateway apply migrations on startup (default).**
The gateway calls `ApplyEmbeddedMigrations` during `NewDependencies` and asserts the schema is at the required version before serving traffic. If the apply succeeds, you're done. If a transient error blocks the apply, gateway startup aborts with a clear `schema mismatch: binary requires version N, database has M` error.
This is the default for both the genesis startup flow and rolling upgrades. No operator action required when it works.
**Pattern B — pre-apply migrations explicitly via the CLI.**
On any node:
```bash
sudo orama node schema status # show binary required vs applied
sudo orama node schema apply --yes # apply pending migrations
```
Then start the new gateway. Useful when you want explicit control during a high-risk upgrade or when the auto-apply path is failing for reasons you want to debug separately.
#### Verifying schema state remotely
Tenants can self-check schema drift without SSH access via:
```
GET /v1/schema-status
```
Returns `{ok, required_version, applied_version, in_sync, pending: [...]}`. The same data is available via `orama node schema status` for operators with shell access.
#### Build-time guard (CI)
`go test ./migrations/` runs a roundtrip test that opens an in-memory SQLite, applies every embedded migration, and exercises representative SQL operations from the platform's Go code. If a Go handler is added that references a column no migration creates, the test fails — drift is caught at PR review time, not at production deploy.
When adding a new platform table or column:
1. Write the migration in `core/migrations/NNN_description.sql`
2. Update the relevant Go code that reads/writes the new column
3. Add an exemplar to `migrations/roundtrip_test.go` mirroring the new SQL — this enforces the contract permanently
#### Recovery from Cluster Split
If nodes get stuck in "Candidate" state or show "leader not found" errors:
@ -449,6 +489,26 @@ sudo cp caddy-root-ca.crt /usr/local/share/ca-certificates/caddy-root-ca.crt
sudo update-ca-certificates
```
## Push notifications (optional, per gateway)
Push routes (`/v1/push/devices`, `/v1/push/send`) are always registered but
return `503 SERVICE_UNAVAILABLE` until at least one provider is configured
in the gateway config:
```yaml
# In your namespace gateway config:
push:
ntfy_base_url: "http://localhost:8080" # self-hosted ntfy (preferred for privacy)
expo_access_token: "..." # Expo push (alternative; client SDK lock-in)
```
Restart the gateway after editing. A `GET /v1/push/devices` call (with valid
auth) will then return `200` instead of `503`.
The 503 body explicitly names the config knobs operators need to set, so
tenants getting "Push not configured" can self-diagnose without filing a
ticket. (Bug #220 audit fix.)
## Project Structure
See [ARCHITECTURE.md](ARCHITECTURE.md) for the full architecture overview.

View File

@ -42,6 +42,10 @@ name: my-function # Required. Letters, digits, hyphens, underscores.
public: false # Allow unauthenticated invocation (default: false)
memory: 64 # Memory limit in MB (1-256, default: 64)
timeout: 30 # Execution timeout in seconds (1-300, default: 30)
# Bump to 60-300 for batch DB ops, schema migrations,
# or anything that does many sequential host calls.
# Functions that exceed timeout return the canonical
# TIMEOUT envelope: {ok:false, error:{code:"TIMEOUT",...}}.
retry:
count: 0 # Retry attempts on failure (default: 0)
delay: 5 # Seconds between retries (default: 5)
@ -99,15 +103,31 @@ tinygo build -o function.wasm -target wasi function.go
## Host Functions API
Host functions let your WASM code interact with Orama services. They are imported from the `"env"` or `"host"` module (both work) and use a pointer/length ABI for string parameters.
Host functions let your WASM code interact with Orama services. They use a pointer/length ABI for string parameters and are registered at runtime under three module-name aliases — all three resolve to the SAME function table:
All host functions are registered at runtime by the engine. They are available to every function without additional configuration.
| Module name | Status | Use |
|---|---|---|
| `env` | **canonical** | Recommended for new code. Matches the WASI / TinyGo convention used by every example in this doc and the `sdk/fn` package. |
| `host` | alias (kept) | Long-standing alternative; supported indefinitely. |
| `orama` | alias (kept) | Brand-name alias; supported indefinitely so existing code that intuited this name keeps working. |
A function may import any host call from any of the three names interchangeably:
```go
//go:wasmimport env db_query // canonical (preferred)
//go:wasmimport host db_query // identical
//go:wasmimport orama db_query // identical
```
If you see the runtime error `failed to instantiate module: module[X] not instantiated`, your function imported from a name other than the three above — fix the directive. Most functions written using the [`sdk/fn`](../sdk/fn) package don't need any `//go:wasmimport` directives at all (the SDK uses stdin/stdout for I/O).
### Context
| Function | Description |
|----------|-------------|
| `get_caller_wallet()` → string | Wallet address of the caller (from JWT) |
| `get_caller_wallet()` → string | Resolved caller wallet (JWT subject if Bearer auth, else namespace pseudo-id when API-key auth). |
| `get_caller_jwt_subject()` → string | JWT `sub` claim explicitly. Empty when the request was not JWT-authenticated. Use this when binding on the JWT-signed identity matters (e.g. signup flows verifying the caller signed for the wallet they're registering). |
| `get_caller_claim(name)` → string | Custom JWT claim by name (tier, subscription, etc.). Empty if missing or non-JWT request. |
| `get_request_id()` → string | Unique invocation ID |
| `get_env(key)` → string | Environment variable from function.yaml |
| `get_secret(name)` → string | Decrypted secret value (see [Managing Secrets](#managing-secrets)) |
@ -116,15 +136,36 @@ All host functions are registered at runtime by the engine. They are available t
| Function | Description |
|----------|-------------|
| `db_query(sql, argsJSON)` → JSON | Execute SELECT query. Args as JSON array. Returns JSON array of row objects. |
| `db_execute(sql, argsJSON)` → int | Execute INSERT/UPDATE/DELETE. Returns affected row count. |
| `db_query_v2(sql, argsJSON)` → JSON | **Recommended.** Execute SELECT. Returns `{"rows": [...], "error": "..."}` — distinguishes empty result from query failure. |
| `db_execute_v2(sql, argsJSON)` → JSON | **Recommended.** Execute INSERT/UPDATE/DELETE. Returns `{"rows_affected": N, "last_insert_id": M, "error": "..."}` — distinguishes 0-rows-affected from a real failure. |
| `db_query(sql, argsJSON)` → JSON | Legacy. Execute SELECT, returns JSON array of rows. No way to surface query errors — prefer `db_query_v2`. |
| `db_execute(sql, argsJSON)` → int | Legacy. Returns affected rows ONLY. **Returns 0 for both "0 rows" and "SQL error" — caller can't distinguish.** Prefer `db_execute_v2`. |
| `db_transaction(opsJSON)` → JSON | Atomic batch — see "Database Transactions" below. |
Example query from WASM:
```
db_query("SELECT push_token, device_type FROM devices WHERE user_id = ?", '["user123"]')
→ [{"push_token": "abc...", "device_type": "ios"}]
Example v2 usage from WASM:
```go
//go:wasmimport env db_execute_v2
func dbExecuteV2(sqlPtr, sqlLen, argsPtr, argsLen uint32) uint64
resultBytes := callDBExecuteV2(`INSERT INTO event_seq (topic, next_seq) VALUES (?, 0)
ON CONFLICT(topic) DO NOTHING`,
[]any{"user/abc/account"})
var res struct {
RowsAffected int64 `json:"rows_affected"`
Error string `json:"error"`
}
json.Unmarshal(resultBytes, &res)
if res.Error != "" {
// Real failure — bail out, don't mark migration applied.
return fmt.Errorf("event_seq INSERT failed: %s", res.Error)
}
// res.RowsAffected may legitimately be 0 (ON CONFLICT DO NOTHING) — that's not an error.
```
The legacy `db_execute` is kept indefinitely so existing functions don't break. New code should use `db_execute_v2` for any path where distinguishing "no rows" from "SQL error" matters — most paths.
### Cache (Olric Distributed Cache)
| Function | Description |

View File

@ -0,0 +1,237 @@
package migrations_test
// roundtrip_test.go is the build-time guard that prevents
// "binary references column X but X is missing from migrations"
// drift — the bug that triggered the AnChat-test outage on 2026-05-06.
//
// How it works:
//
// 1. Open an in-memory SQLite database.
// 2. Apply EVERY embedded migration in version order.
// 3. Run a series of "exemplar" SQL operations against the resulting
// schema. If any operation fails, the test fails — meaning either:
// a. A migration was deleted / renumbered and the schema regressed
// b. A new migration was added but isn't reachable via embed.FS
// c. (Most importantly) a Go file references a column / table /
// index that no migration creates
//
// The exemplars are drawn from the actual SQL strings the platform's
// Go code executes. Adding a new INSERT/SELECT in the gateway → add the
// matching exemplar here so drift is caught at `go test` time, not
// at production deploy.
//
// This is generic by design — every platform table participates. Adding
// a new table doesn't require new test infrastructure, only one new
// exemplar string.
import (
"database/sql"
"strings"
"testing"
"github.com/DeBrosOfficial/network/migrations"
"github.com/DeBrosOfficial/network/pkg/rqlite"
_ "github.com/mattn/go-sqlite3"
"go.uber.org/zap"
)
// TestSchemaRoundtrip_AllMigrationsApplyClean verifies every embedded
// migration applies successfully against a fresh database in version
// order. Failure here means a migration is broken in isolation
// (syntax error, references a missing prior migration's column, etc.).
func TestSchemaRoundtrip_AllMigrationsApplyClean(t *testing.T) {
db := openRoundtripDB(t)
if err := rqlite.ApplyEmbeddedMigrations(t.Context(), db, migrations.FS, zap.NewNop()); err != nil {
t.Fatalf("ApplyEmbeddedMigrations failed: %v", err)
}
// Sanity: applied version should equal RequiredVersion.
applied, err := migrations.AppliedVersion(t.Context(), db)
if err != nil {
t.Fatalf("AppliedVersion: %v", err)
}
if applied != migrations.RequiredVersion() {
t.Errorf("applied=%d != required=%d after full roundtrip", applied, migrations.RequiredVersion())
}
}
// TestSchemaRoundtrip_PlatformExemplars exercises representative SQL
// statements from the Go codebase against the migrated schema.
//
// Each exemplar is a string that should EXECUTE successfully (we don't
// care about row counts — only that the SQL parses and binds against
// the schema). Args are placeholders; values can be anything matching
// the column types.
//
// When a Go handler is added that touches a new table or column, add
// an exemplar here. The diff at review time enforces the contract:
// "if you write Go that uses column X, an exemplar exercises it,
// which means migrations must declare X."
func TestSchemaRoundtrip_PlatformExemplars(t *testing.T) {
db := openRoundtripDB(t)
if err := rqlite.ApplyEmbeddedMigrations(t.Context(), db, migrations.FS, zap.NewNop()); err != nil {
t.Fatalf("ApplyEmbeddedMigrations: %v", err)
}
// Each exemplar is (table, sql, args). The args don't have to satisfy
// constraints — we use Prepare to validate column references without
// actually running mutations. Statements that have to execute (because
// SQLite delays some checks) get marked exec=true.
type exemplar struct {
name string
sql string
args []any
exec bool // true: actually execute; false: just Prepare
}
exemplars := []exemplar{
// functions table — bug #214's table, which is why we care.
// Every column written by the function-store INSERT must be here.
{
name: "functions INSERT (full column list incl. ws_*)",
sql: `INSERT INTO functions (
id, name, namespace, version, wasm_cid,
memory_limit_mb, timeout_seconds, is_public,
retry_count, retry_delay_seconds, dlq_topic,
status, created_at, updated_at, created_by,
ws_persistent, ws_idle_timeout_sec, ws_max_frame_bytes, ws_max_inflight_per_conn
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
args: []any{
"id-1", "fn", "ns", 1, "cid-1",
64, 30, false,
0, 5, "",
"active", 0, 0, "ns",
false, 0, 0, 0,
},
exec: true,
},
{
name: "functions SELECT (full column list)",
sql: `SELECT id, name, namespace, version, wasm_cid, source_cid,
ws_persistent, ws_idle_timeout_sec, ws_max_frame_bytes, ws_max_inflight_per_conn,
memory_limit_mb, timeout_seconds, is_public,
retry_count, retry_delay_seconds, dlq_topic,
status, created_at, updated_at, created_by
FROM functions WHERE namespace = ? AND name = ?`,
args: []any{"ns", "fn"},
},
// function_invocations — used by the invocation-history view (#211 fix).
{
name: "function_invocations INSERT",
sql: `INSERT INTO function_invocations (
id, function_id, request_id, trigger_type, caller_wallet,
input_size, output_size, started_at, completed_at,
duration_ms, status, error_message, memory_used_mb
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
args: []any{
"inv-1", "id-1", "req-A", "http", "0xwallet",
0, 0, 0, 0,
0, "success", "", 0.0,
},
exec: true,
},
{
name: "function_invocations SELECT for GetInvocations",
sql: `SELECT i.id, i.request_id, i.trigger_type, i.caller_wallet,
i.input_size, i.output_size, i.started_at, i.completed_at,
i.duration_ms, i.status, i.error_message, i.memory_used_mb
FROM function_invocations i
JOIN functions f ON i.function_id = f.id
WHERE f.namespace = ? AND f.name = ?
ORDER BY i.started_at DESC LIMIT ?`,
args: []any{"ns", "fn", 50},
},
// function_logs — WASM-emitted log lines.
{
name: "function_logs INSERT",
sql: `INSERT INTO function_logs (
id, function_id, invocation_id, level, message, timestamp
) VALUES (?, ?, ?, ?, ?, ?)`,
args: []any{"log-1", "id-1", "inv-1", "info", "hi", 0},
exec: true,
},
// function_pubsub_triggers — wildcard trigger column rename (plan 03).
// During the dual-column rolling-upgrade window the Go code writes
// BOTH `topic` (legacy NOT NULL) and `topic_pattern` (new); this
// exemplar mirrors the actual INSERT and would catch a future
// migration that drops one column without a corresponding code change.
{
name: "function_pubsub_triggers INSERT (dual topic+topic_pattern)",
sql: `INSERT INTO function_pubsub_triggers (
id, function_id, topic, topic_pattern,
enabled, created_at,
aggregation_window_ms, aggregation_max_batch_size
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
args: []any{"trig-1", "id-1", "presence:*", "presence:*", true, 0, 0, 0},
exec: true,
},
// push_devices — created by migration 023; encrypted token storage.
{
name: "push_devices INSERT",
sql: `INSERT INTO push_devices (
id, namespace, user_id, device_id, provider,
token_encrypted, platform, app_version,
created_at, updated_at, last_seen
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
args: []any{
"dev-1", "ns", "u1", "device-A", "ntfy",
"enc:...", "ios", "1.0",
0, 0, 0,
},
exec: true,
},
// namespace_publish_seq — sequence counter from plan 08.
{
name: "namespace_publish_seq UPSERT",
sql: `INSERT INTO namespace_publish_seq (namespace, next_seq, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(namespace) DO UPDATE SET
next_seq = next_seq + 1,
updated_at = excluded.updated_at`,
args: []any{"ns", 2, 0},
exec: true,
},
}
for _, ex := range exemplars {
t.Run(ex.name, func(t *testing.T) {
if ex.exec {
if _, err := db.Exec(ex.sql, ex.args...); err != nil {
t.Errorf("schema drift: %v\nsql: %s", err, snippet(ex.sql))
}
return
}
stmt, err := db.Prepare(ex.sql)
if err != nil {
t.Errorf("schema drift (Prepare failed): %v\nsql: %s", err, snippet(ex.sql))
return
}
defer func() { _ = stmt.Close() }()
})
}
}
// openRoundtripDB returns an in-memory SQLite. Closes automatically on
// test cleanup.
func openRoundtripDB(t *testing.T) *sql.DB {
t.Helper()
db, err := sql.Open("sqlite3", ":memory:")
if err != nil {
t.Fatalf("open in-memory sqlite: %v", err)
}
t.Cleanup(func() { _ = db.Close() })
return db
}
// snippet trims a SQL string to fit on a single error line.
func snippet(s string) string {
s = strings.Join(strings.Fields(s), " ")
if len(s) > 140 {
return s[:140] + "..."
}
return s
}

View File

@ -3,31 +3,58 @@ package functions
import (
"fmt"
"strconv"
"strings"
"github.com/spf13/cobra"
)
var logsLimit int
var (
logsLimit int
logsWASMOnly bool
)
// LogsCmd retrieves function execution logs.
// LogsCmd retrieves function invocation history.
//
// Default view: invocation history (always populated when the function has
// been invoked) — request_id, status, duration, error_message, plus any
// WASM-emitted log entries nested per record.
//
// --wasm-only switches to the legacy view that returns ONLY entries
// emitted by the function via log_info / log_error (often empty).
var LogsCmd = &cobra.Command{
Use: "logs <name>",
Short: "Get execution logs for a function",
Long: "Retrieves the most recent execution logs for a deployed function.",
Args: cobra.ExactArgs(1),
RunE: runLogs,
Short: "Get invocation history for a function",
Long: `Retrieves the most recent invocations for a deployed function.
Each invocation record shows: timestamp, request_id, status, duration_ms,
and (if any) the error message. WASM functions that emit log entries via
log_info / log_error have those entries nested under each record.
Pass --wasm-only to retrieve only the WASM-emitted log lines (legacy
behavior; rarely useful on functions that don't call log_info).`,
Args: cobra.ExactArgs(1),
RunE: runLogs,
}
func init() {
LogsCmd.Flags().IntVar(&logsLimit, "limit", 50, "Maximum number of log entries to retrieve")
LogsCmd.Flags().IntVar(&logsLimit, "limit", 50, "Maximum number of records to retrieve")
LogsCmd.Flags().BoolVar(&logsWASMOnly, "wasm-only", false,
"Show only WASM-emitted log entries (legacy view)")
}
func runLogs(cmd *cobra.Command, args []string) error {
name := args[0]
endpoint := "/v1/functions/" + name + "/logs"
q := []string{}
if logsLimit > 0 {
endpoint += "?limit=" + strconv.Itoa(logsLimit)
q = append(q, "limit="+strconv.Itoa(logsLimit))
}
if logsWASMOnly {
q = append(q, "wasm_only=1")
}
if len(q) > 0 {
endpoint += "?" + strings.Join(q, "&")
}
result, err := apiGet(endpoint)
@ -35,9 +62,64 @@ func runLogs(cmd *cobra.Command, args []string) error {
return err
}
if logsWASMOnly {
return printWASMLogs(name, result)
}
return printInvocations(name, result)
}
// printInvocations renders the default invocation-history view.
func printInvocations(name string, result map[string]interface{}) error {
invs, ok := result["invocations"].([]interface{})
if !ok || len(invs) == 0 {
fmt.Printf("No invocations found for function %q.\n", name)
return nil
}
for _, entry := range invs {
inv, ok := entry.(map[string]interface{})
if !ok {
continue
}
started := valStr(inv, "started_at")
status := valStr(inv, "status")
reqID := valStr(inv, "request_id")
duration := valNumberAsString(inv, "duration_ms")
errMsg := valStr(inv, "error_message")
// Header line per invocation.
fmt.Printf("[%s] %s request=%s duration=%sms\n",
started, strings.ToUpper(status), reqID, duration)
if errMsg != "" {
fmt.Printf(" error: %s\n", errMsg)
}
// Nested WASM logs (if any).
if wasmLogs, ok := inv["wasm_logs"].([]interface{}); ok {
for _, l := range wasmLogs {
le, ok := l.(map[string]interface{})
if !ok {
continue
}
fmt.Printf(" %s [%s] %s\n",
valStr(le, "timestamp"),
strings.ToUpper(valStr(le, "level")),
valStr(le, "message"))
}
}
}
fmt.Printf("\nShowing %d invocation(s). Use --wasm-only for the legacy log-line view.\n",
len(invs))
return nil
}
// printWASMLogs renders the legacy WASM-only view.
func printWASMLogs(name string, result map[string]interface{}) error {
logs, ok := result["logs"].([]interface{})
if !ok || len(logs) == 0 {
fmt.Printf("No logs found for function %q.\n", name)
fmt.Printf("No WASM-emitted logs found for function %q. "+
"Tip: drop --wasm-only to see invocation history.\n", name)
return nil
}
@ -55,3 +137,23 @@ func runLogs(cmd *cobra.Command, args []string) error {
fmt.Printf("\nShowing %d log(s)\n", len(logs))
return nil
}
// valNumberAsString formats a JSON number field as a clean integer string.
func valNumberAsString(m map[string]interface{}, key string) string {
v, ok := m[key]
if !ok || v == nil {
return "0"
}
switch n := v.(type) {
case float64:
return strconv.FormatInt(int64(n), 10)
case int:
return strconv.Itoa(n)
case int64:
return strconv.FormatInt(n, 10)
case string:
return n
default:
return fmt.Sprintf("%v", n)
}
}

View File

@ -9,6 +9,7 @@ import (
"strings"
"time"
"github.com/DeBrosOfficial/network/pkg/httputil"
"github.com/DeBrosOfficial/network/pkg/serverless"
"go.uber.org/zap"
)
@ -126,7 +127,11 @@ func (h *ServerlessHandlers) DeployFunction(w http.ResponseWriter, r *http.Reque
zap.String("name", def.Name),
zap.Error(err),
)
writeError(w, http.StatusInternalServerError, "Failed to deploy: "+err.Error())
// Use the typed function-deploy code so clients can distinguish
// "registry rejected this binary" from generic 500s.
writeRPCError(w, http.StatusInternalServerError,
httputil.ErrCodeFunctionDeploy,
"Failed to deploy: "+err.Error())
return
}
@ -181,7 +186,51 @@ func writeJSON(w http.ResponseWriter, code int, v any) {
_ = json.NewEncoder(w).Encode(v)
}
// writeError writes a standardized JSON error
func writeError(w http.ResponseWriter, code int, msg string) {
writeJSON(w, code, map[string]any{"error": msg})
// writeError emits the canonical RPC error envelope (bug #212 fix).
//
// Derives the typed RPCErrorCode from the HTTP status — sufficient for
// most call sites. Callers that need to surface a specific code (e.g.
// FUNCTION_EXECUTION_FAILED on a 500 from the invoker) should use
// writeRPCError directly.
//
// Wire shape (always):
//
// {"ok": false, "error": {"code": "...", "message": "...", "retryable": ...}}
func writeError(w http.ResponseWriter, status int, msg string) {
httputil.WriteRPCError(w, status, codeForStatus(status), msg)
}
// writeRPCError is the typed helper for call sites that need to set a
// specific error code (e.g. distinguishing FUNCTION_EXECUTION_FAILED
// from a generic INTERNAL on a 500).
func writeRPCError(w http.ResponseWriter, status int, code httputil.RPCErrorCode, msg string, opts ...httputil.RPCErrorOption) {
httputil.WriteRPCError(w, status, code, msg, opts...)
}
// codeForStatus maps HTTP status to the canonical RPCErrorCode. For
// statuses that map to multiple codes (500 → INTERNAL or
// FUNCTION_EXECUTION_FAILED), the caller picks via writeRPCError.
func codeForStatus(status int) httputil.RPCErrorCode {
switch status {
case http.StatusBadRequest:
return httputil.ErrCodeValidationFailed
case http.StatusUnauthorized:
return httputil.ErrCodeUnauthorized
case http.StatusForbidden:
return httputil.ErrCodeForbidden
case http.StatusNotFound:
return httputil.ErrCodeNotFound
case http.StatusConflict:
return httputil.ErrCodeConflict
case http.StatusRequestEntityTooLarge:
return httputil.ErrCodePayloadTooLarge
case http.StatusTooManyRequests:
return httputil.ErrCodeRateLimited
case http.StatusServiceUnavailable:
return httputil.ErrCodeServiceUnavailable
case http.StatusGatewayTimeout:
return httputil.ErrCodeTimeout
default:
return httputil.ErrCodeInternal
}
}

View File

@ -20,12 +20,13 @@ import (
// mockRegistry implements serverless.FunctionRegistry for testing.
type mockRegistry struct {
functions map[string]*serverless.Function
logs []serverless.LogEntry
getErr error
listErr error
deleteErr error
logsErr error
functions map[string]*serverless.Function
logs []serverless.LogEntry
invocations []serverless.Invocation
getErr error
listErr error
deleteErr error
logsErr error
}
func newMockRegistry() *mockRegistry {
@ -78,6 +79,13 @@ func (m *mockRegistry) GetLogs(_ context.Context, _, _ string, _ int) ([]serverl
return m.logs, nil
}
func (m *mockRegistry) GetInvocations(_ context.Context, _, _ string, _ int) ([]serverless.Invocation, error) {
if m.logsErr != nil {
return nil, m.logsErr
}
return m.invocations, nil
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
@ -580,7 +588,7 @@ func TestDeployFunction_Base64WASMNotSupported(t *testing.T) {
t.Errorf("expected 400, got %d", rec.Code)
}
respBody := decodeBody(t, rec)
errMsg, _ := respBody["error"].(string)
errMsg := errMessageFromEnvelope(respBody)
if !strings.Contains(errMsg, "Base64 WASM upload not supported") {
t.Errorf("expected base64 not supported error, got %q", errMsg)
}
@ -600,12 +608,28 @@ func TestDeployFunction_JSONMissingWASM(t *testing.T) {
t.Errorf("expected 400, got %d", rec.Code)
}
respBody := decodeBody(t, rec)
errMsg, _ := respBody["error"].(string)
errMsg := errMessageFromEnvelope(respBody)
if !strings.Contains(errMsg, "name") {
t.Errorf("expected name-related error, got %q", errMsg)
}
}
// errMessageFromEnvelope extracts the human-readable message from the
// canonical RPC error envelope: {"ok": false, "error": {"message": "..."}}.
// Returns "" if the envelope shape is unexpected; tests use Contains on
// the result so a missing message will still fail loudly.
func errMessageFromEnvelope(body map[string]interface{}) string {
if body == nil {
return ""
}
errObj, ok := body["error"].(map[string]interface{})
if !ok {
return ""
}
msg, _ := errObj["message"].(string)
return msg
}
// ---------------------------------------------------------------------------
// Tests: DeleteFunction validation
// ---------------------------------------------------------------------------
@ -647,10 +671,14 @@ func TestDeleteFunction_NotFound(t *testing.T) {
// Tests: GetFunctionLogs
// ---------------------------------------------------------------------------
func TestGetFunctionLogs_Success(t *testing.T) {
// TestGetFunctionLogs_Success_DefaultInvocationsView locks in the bug-#211 fix:
// the default endpoint returns invocation history (always populated when the
// function has been invoked), NOT WASM-emitted log entries (often empty).
func TestGetFunctionLogs_Success_DefaultInvocationsView(t *testing.T) {
reg := newMockRegistry()
reg.logs = []serverless.LogEntry{
{Level: "info", Message: "hello"},
reg.invocations = []serverless.Invocation{
{ID: "inv-1", RequestID: "req-A", Status: "success", DurationMS: 12},
{ID: "inv-2", RequestID: "req-B", Status: "error", ErrorMessage: "boom"},
}
h := newTestHandlers(reg)
@ -667,8 +695,41 @@ func TestGetFunctionLogs_Success(t *testing.T) {
t.Errorf("expected name 'myFunc', got %v", body["name"])
}
count, ok := body["count"].(float64)
if !ok || int(count) != 2 {
t.Errorf("expected count=2, got %v", body["count"])
}
if _, ok := body["invocations"]; !ok {
t.Error("expected response to include 'invocations' key in default view")
}
if _, ok := body["logs"]; ok {
t.Error("default view should NOT return 'logs' key (legacy WASM-only)")
}
}
// TestGetFunctionLogs_WASMOnly preserves the legacy "raw WASM-emitted lines"
// view via the wasm_only=1 query param.
func TestGetFunctionLogs_WASMOnly(t *testing.T) {
reg := newMockRegistry()
reg.logs = []serverless.LogEntry{
{Level: "info", Message: "hello"},
}
h := newTestHandlers(reg)
req := httptest.NewRequest(http.MethodGet, "/v1/functions/myFunc/logs?namespace=test&wasm_only=1", nil)
rec := httptest.NewRecorder()
h.GetFunctionLogs(rec, req, "myFunc")
if rec.Code != http.StatusOK {
t.Errorf("expected 200, got %d", rec.Code)
}
body := decodeBody(t, rec)
count, ok := body["count"].(float64)
if !ok || int(count) != 1 {
t.Errorf("expected count=1, got %v", body["count"])
t.Errorf("expected count=1 from logs, got %v", body["count"])
}
if _, ok := body["logs"]; !ok {
t.Error("wasm_only=1 should return 'logs' key")
}
}
@ -717,10 +778,24 @@ func TestWriteError(t *testing.T) {
if rec.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d", rec.Code)
}
body := map[string]string{}
json.NewDecoder(rec.Body).Decode(&body)
if body["error"] != "something went wrong" {
t.Errorf("expected error message 'something went wrong', got %q", body["error"])
// writeError now emits the canonical RPC envelope (bug #212 fix).
// Shape: {"ok": false, "error": {"code": "VALIDATION_FAILED", "message": "...", "retryable": false}}
body := map[string]interface{}{}
if err := json.NewDecoder(rec.Body).Decode(&body); err != nil {
t.Fatalf("decode: %v", err)
}
if ok, _ := body["ok"].(bool); ok {
t.Error("ok must be false on error envelope")
}
errObj, ok := body["error"].(map[string]interface{})
if !ok {
t.Fatalf("error must be an object, got %T: %v", body["error"], body["error"])
}
if msg, _ := errObj["message"].(string); msg != "something went wrong" {
t.Errorf("expected message 'something went wrong', got %q", msg)
}
if code, _ := errObj["code"].(string); code != "VALIDATION_FAILED" {
t.Errorf("expected code VALIDATION_FAILED for 400, got %q", code)
}
}

View File

@ -10,6 +10,7 @@ import (
"strings"
"time"
"github.com/DeBrosOfficial/network/pkg/httputil"
"github.com/DeBrosOfficial/network/pkg/serverless"
)
@ -78,54 +79,65 @@ func (h *ServerlessHandlers) InvokeFunction(w http.ResponseWriter, r *http.Reque
defer cancel()
req := &serverless.InvokeRequest{
Namespace: namespace,
FunctionName: name,
Version: version,
Input: input,
TriggerType: serverless.TriggerTypeHTTP,
CallerWallet: callerWallet,
CallerIP: extractRemoteIP(r),
CallerClaims: h.getCallerClaimsFromRequest(r),
Namespace: namespace,
FunctionName: name,
Version: version,
Input: input,
TriggerType: serverless.TriggerTypeHTTP,
CallerWallet: callerWallet,
CallerIP: extractRemoteIP(r),
CallerClaims: h.getCallerClaimsFromRequest(r),
CallerJWTSubject: h.getJWTSubjectFromRequest(r),
}
resp, err := h.invoker.Invoke(ctx, req)
if err != nil {
statusCode := http.StatusInternalServerError
// Tiered rate limiter returns *RateLimitedError with retry-after.
// Bug #212: every error path here emits the canonical RPC
// envelope. error.message is always populated (falls back to
// err.Error() then to a default per code).
// Rate-limit errors carry a retry hint we surface as both the
// HTTP Retry-After header and the envelope field.
var rle *serverless.RateLimitedError
if errors.As(err, &rle) {
opts := []httputil.RPCErrorOption{}
if rle.RetryAfter > 0 {
w.Header().Set("Retry-After",
strconv.FormatFloat(rle.RetryAfter.Seconds(), 'f', 1, 64))
opts = append(opts, httputil.WithRetryAfter(rle.RetryAfter.Seconds()))
}
writeJSON(w, http.StatusTooManyRequests, map[string]interface{}{
"error": err.Error(),
"scope": rle.Scope,
"retry_after": rle.RetryAfter.Seconds(),
})
if resp != nil && resp.RequestID != "" {
opts = append(opts, httputil.WithRequestID(resp.RequestID))
}
writeRPCError(w, http.StatusTooManyRequests,
httputil.ErrCodeRateLimited, err.Error(), opts...)
return
}
if serverless.IsNotFound(err) {
// Map domain-typed errors to (status, RPC code).
statusCode := http.StatusInternalServerError
errCode := httputil.ErrCodeFunctionExecution
switch {
case serverless.IsNotFound(err):
statusCode = http.StatusNotFound
} else if serverless.IsResourceExhausted(err) {
errCode = httputil.ErrCodeNotFound
case serverless.IsResourceExhausted(err):
statusCode = http.StatusTooManyRequests
} else if serverless.IsUnauthorized(err) {
errCode = httputil.ErrCodeRateLimited
case serverless.IsUnauthorized(err):
statusCode = http.StatusUnauthorized
errCode = httputil.ErrCodeUnauthorized
}
if resp == nil {
writeJSON(w, statusCode, map[string]interface{}{
"error": err.Error(),
})
return
// Pick the most informative message: function-side resp.Error
// (if set) is more actionable than the wrapping err.Error().
msg := err.Error()
if resp != nil && resp.Error != "" {
msg = resp.Error
}
writeJSON(w, statusCode, map[string]interface{}{
"request_id": resp.RequestID,
"status": resp.Status,
"error": resp.Error,
"duration_ms": resp.DurationMS,
})
opts := []httputil.RPCErrorOption{}
if resp != nil && resp.RequestID != "" {
opts = append(opts, httputil.WithRequestID(resp.RequestID))
}
writeRPCError(w, statusCode, errCode, msg, opts...)
return
}

View File

@ -10,7 +10,26 @@ import (
)
// GetFunctionLogs handles GET /v1/functions/{name}/logs
// Retrieves execution logs for a specific function.
//
// Returns invocation history (always populated when the function has been
// invoked) with any associated WASM-emitted log entries nested per record.
// This is the answer to "what happened when this function ran" — the older
// behavior (only WASM-emitted entries) was useless on functions that
// don't call log_info / log_error and surfaced as "No logs found" to users.
//
// Optional query params:
// - limit: max records (default 50, capped at 500)
// - wasm_only=1: return ONLY WASM-emitted log rows (legacy view)
//
// Response:
//
// {
// "name": "...",
// "namespace": "...",
// "invocations": [ ...records... ], // when wasm_only is unset
// "logs": [ ...LogEntry... ], // when wasm_only=1
// "count": N
// }
func (h *ServerlessHandlers) GetFunctionLogs(w http.ResponseWriter, r *http.Request, name string) {
namespace := r.URL.Query().Get("namespace")
if namespace == "" {
@ -22,31 +41,56 @@ func (h *ServerlessHandlers) GetFunctionLogs(w http.ResponseWriter, r *http.Requ
return
}
limit := 100
limit := 50
if lStr := r.URL.Query().Get("limit"); lStr != "" {
if l, err := strconv.Atoi(lStr); err == nil {
if l, err := strconv.Atoi(lStr); err == nil && l > 0 {
limit = l
}
}
if limit > 500 {
limit = 500
}
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
logs, err := h.registry.GetLogs(ctx, namespace, name, limit)
// Legacy "WASM-emitted only" view. Kept for backward compat — most
// dashboards / clients should use the default invocations view.
if r.URL.Query().Get("wasm_only") == "1" {
logs, err := h.registry.GetLogs(ctx, namespace, name, limit)
if err != nil {
h.logger.Error("Failed to get WASM logs",
zap.String("name", name),
zap.String("namespace", namespace),
zap.Error(err),
)
writeError(w, http.StatusInternalServerError, "Failed to get logs")
return
}
writeJSON(w, http.StatusOK, map[string]interface{}{
"name": name,
"namespace": namespace,
"logs": logs,
"count": len(logs),
})
return
}
invocations, err := h.registry.GetInvocations(ctx, namespace, name, limit)
if err != nil {
h.logger.Error("Failed to get function logs",
h.logger.Error("Failed to get function invocations",
zap.String("name", name),
zap.String("namespace", namespace),
zap.Error(err),
)
writeError(w, http.StatusInternalServerError, "Failed to get logs")
writeError(w, http.StatusInternalServerError, "Failed to get invocations")
return
}
writeJSON(w, http.StatusOK, map[string]interface{}{
"name": name,
"namespace": namespace,
"logs": logs,
"count": len(logs),
"name": name,
"namespace": namespace,
"invocations": invocations,
"count": len(invocations),
})
}

View File

@ -2,6 +2,7 @@ package serverless
import (
"net/http"
"strings"
"github.com/DeBrosOfficial/network/pkg/gateway/auth"
"github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys"
@ -113,6 +114,27 @@ func (h *ServerlessHandlers) getCallerClaimsFromRequest(r *http.Request) map[str
return out
}
// getJWTSubjectFromRequest returns the Bearer JWT's `sub` claim if present,
// independent of the API-key-vs-JWT precedence used for general wallet
// resolution. Returns "" when the request was not JWT-authenticated.
//
// This is the source of truth for `oh.GetCallerJWTSubject()` inside WASM
// — bug #215. Functions that must bind on the JWT-signed identity (e.g.
// signup paths verifying the registering wallet matches the auth-challenge
// signer) read this instead of GetCallerWallet, which returns the namespace
// pseudo-identifier when the API key is the resolved auth.
func (h *ServerlessHandlers) getJWTSubjectFromRequest(r *http.Request) string {
v := r.Context().Value(ctxkeys.JWT)
if v == nil {
return ""
}
claims, ok := v.(*auth.JWTClaims)
if !ok || claims == nil {
return ""
}
return strings.TrimSpace(claims.Sub)
}
// getWalletFromRequest extracts wallet address from JWT.
func (h *ServerlessHandlers) getWalletFromRequest(r *http.Request) string {
// Import strings package functions inline to avoid circular dependencies

View File

@ -125,6 +125,7 @@ func (h *ServerlessHandlers) HandleWebSocket(w http.ResponseWriter, r *http.Requ
// Capture custom claims at upgrade time and reuse for every frame —
// the JWT context is request-scoped and won't survive past upgrade.
callerClaims := h.getCallerClaimsFromRequest(r)
callerJWTSubject := h.getJWTSubjectFromRequest(r)
// Message loop
for {
@ -141,15 +142,16 @@ func (h *ServerlessHandlers) HandleWebSocket(w http.ResponseWriter, r *http.Requ
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
req := &serverless.InvokeRequest{
Namespace: namespace,
FunctionName: name,
Version: version,
Input: message,
TriggerType: serverless.TriggerTypeWebSocket,
CallerWallet: callerWallet,
CallerIP: callerIP,
CallerClaims: callerClaims,
WSClientID: clientID,
Namespace: namespace,
FunctionName: name,
Version: version,
Input: message,
TriggerType: serverless.TriggerTypeWebSocket,
CallerWallet: callerWallet,
CallerIP: callerIP,
CallerClaims: callerClaims,
CallerJWTSubject: callerJWTSubject,
WSClientID: clientID,
}
resp, err := h.invoker.Invoke(ctx, req)

View File

@ -2,11 +2,13 @@ package gateway
import (
"context"
"errors"
"fmt"
"hash/fnv"
"io"
"net"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
@ -15,10 +17,48 @@ import (
"github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/deployments"
"github.com/DeBrosOfficial/network/pkg/gateway/auth"
"github.com/DeBrosOfficial/network/pkg/httputil"
"github.com/DeBrosOfficial/network/pkg/logging"
"go.uber.org/zap"
)
// isLongRunningProxyPath returns true for paths whose expected work bound
// exceeds the default 30s proxy timeout. New classes go here, alphabetically.
//
// - /v1/functions/.../invoke and /v1/invoke/... — up to 300s per fn
// - /v1/functions/.../ws — long-lived WS
// - /v1/storage/upload, /v1/storage/pin — IPFS add can be slow
//
// Adding a path here is preferable to bumping the global timeout: each
// path's bound is documented in one grep-able place.
func isLongRunningProxyPath(p string) bool {
switch {
case strings.HasPrefix(p, "/v1/storage/upload"),
strings.HasPrefix(p, "/v1/storage/pin"),
strings.HasPrefix(p, "/v1/invoke/"),
strings.HasPrefix(p, "/v1/functions/") && (strings.HasSuffix(p, "/invoke") || strings.HasSuffix(p, "/ws")):
return true
}
return false
}
// isProxyTimeoutErr returns true when an HTTP client error is a timeout —
// either the http.Client overall timeout or context.DeadlineExceeded
// propagating from a parent context.
func isProxyTimeoutErr(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) {
return true
}
var urlErr *url.Error
if errors.As(err, &urlErr) && urlErr.Timeout() {
return true
}
return false
}
// Note: context keys (ctxKeyAPIKey, ctxKeyJWT, CtxKeyNamespaceOverride) are now defined in context.go
// Internal auth headers for trusted inter-gateway communication.
@ -1020,7 +1060,12 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R
}
}
if selected.ip == "" {
http.Error(w, "Namespace gateway unavailable (all circuits open)", http.StatusServiceUnavailable)
// Bug #219: emit canonical envelope with retryable=true (transient).
httputil.WriteRPCError(w, http.StatusServiceUnavailable,
httputil.ErrCodeServiceUnavailable,
"namespace gateway unavailable: all upstream circuits are open. "+
"Wait a few seconds and retry, or check `orama monitor report` for unhealthy nodes.",
httputil.WithRetryable())
return
}
gatewayIP := selected.ip
@ -1082,9 +1127,18 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R
proxyReq.Header.Set(HeaderInternalAuthNamespace, validatedNamespace)
}
// Use a longer timeout for upload paths (IPFS add can be slow for large files)
// Pick the proxy timeout based on the path's expected work bound.
// Defaults to 30s for fast paths; bumps to 300s for known long-running
// classes (uploads, function invocations) so we don't truncate before
// the namespace gateway's own per-function timeout fires.
//
// Bug #219: function invokes were truncated at 30s and surfaced as
// "Namespace gateway unavailable" — misleading because the gateway IS
// up, the FUNCTION just exceeded its budget. Aligning the proxy
// timeout with the function-level cap ensures the namespace gateway's
// proper TIMEOUT envelope reaches the client first.
proxyTimeout := 30 * time.Second
if strings.HasPrefix(r.URL.Path, "/v1/storage/upload") || strings.HasPrefix(r.URL.Path, "/v1/storage/pin") {
if isLongRunningProxyPath(r.URL.Path) {
proxyTimeout = 300 * time.Second
}
@ -1098,7 +1152,22 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R
zap.String("target", gatewayIP),
zap.Error(err),
)
http.Error(w, "Namespace gateway unavailable", http.StatusServiceUnavailable)
// Distinguish timeout from connection failure so clients get an
// actionable code (#219). "Namespace gateway unavailable" was
// emitted indiscriminately and sent operators down the wrong
// debug path.
if isProxyTimeoutErr(err) {
httputil.WriteRPCError(w, http.StatusGatewayTimeout,
httputil.ErrCodeTimeout,
"function or upstream call exceeded the proxy budget ("+
proxyTimeout.String()+
"). Increase the function's timeout: in function.yaml (max 300s) or split the work into smaller invocations.",
)
return
}
httputil.WriteRPCError(w, http.StatusServiceUnavailable,
httputil.ErrCodeServiceUnavailable,
"namespace gateway unavailable: "+err.Error())
return
}
defer resp.Body.Close()

View File

@ -0,0 +1,77 @@
package gateway
import (
"context"
"errors"
"net/url"
"testing"
)
// Bug #219: distinguish proxy timeout from generic connection failure
// so the canonical envelope carries the right error code.
func TestIsLongRunningProxyPath(t *testing.T) {
cases := []struct {
path string
want bool
}{
// Long-running known classes.
{"/v1/storage/upload", true},
{"/v1/storage/upload/foo", true},
{"/v1/storage/pin", true},
{"/v1/invoke/myns/myfn", true},
{"/v1/functions/abc/invoke", true},
{"/v1/functions/abc/ws", true},
// Fast paths — must default to the 30s budget.
{"/v1/health", false},
{"/v1/auth/verify", false},
{"/v1/pubsub/publish", false}, // single-publish, fast
{"/v1/functions/abc/logs", false},
{"/v1/db/query", false},
{"", false},
}
for _, c := range cases {
t.Run(c.path, func(t *testing.T) {
if got := isLongRunningProxyPath(c.path); got != c.want {
t.Errorf("isLongRunningProxyPath(%q) = %v, want %v", c.path, got, c.want)
}
})
}
}
func TestIsProxyTimeoutErr(t *testing.T) {
t.Run("nil_is_not_timeout", func(t *testing.T) {
if isProxyTimeoutErr(nil) {
t.Error("nil error must not register as timeout")
}
})
t.Run("context_deadline_exceeded_is_timeout", func(t *testing.T) {
if !isProxyTimeoutErr(context.DeadlineExceeded) {
t.Error("context.DeadlineExceeded must register as timeout")
}
})
t.Run("wrapped_context_deadline_is_timeout", func(t *testing.T) {
err := &url.Error{Op: "Get", URL: "http://x", Err: context.DeadlineExceeded}
if !isProxyTimeoutErr(err) {
t.Error("url.Error wrapping context.DeadlineExceeded must register as timeout")
}
})
t.Run("connection_refused_is_NOT_timeout", func(t *testing.T) {
// Genuine connection failures should map to SERVICE_UNAVAILABLE, not TIMEOUT.
err := errors.New("connection refused")
if isProxyTimeoutErr(err) {
t.Error("plain connection error must not register as timeout")
}
})
t.Run("unrelated_error_is_NOT_timeout", func(t *testing.T) {
err := errors.New("dns lookup failed")
if isProxyTimeoutErr(err) {
t.Error("unrelated errors must not register as timeout")
}
})
}

View File

@ -0,0 +1,64 @@
package gateway
// push_routes.go provides the always-registered push HTTP entrypoints.
// When the gateway has no push provider configured (no NtfyBaseURL,
// no ExpoAccessToken, etc. — and therefore no g.pushHandlers), these
// methods return a canonical 503 envelope instead of letting the route
// fall through to the default 404 handler.
//
// Bug #220: tenants saw `404 Page Not Found` on /v1/push/devices and
// couldn't tell whether the path was wrong or push wasn't enabled.
// 503 + a clear message points operators directly at the config knob.
import (
"net/http"
"github.com/DeBrosOfficial/network/pkg/httputil"
)
// pushNotConfiguredMessage is the human-readable 503 body. Same text
// reused by every push entrypoint so operators see consistent guidance.
const pushNotConfiguredMessage = "push notifications are not configured on this namespace gateway. " +
"Set `ntfy_base_url` or `expo_access_token` in the gateway config and restart, " +
"then call this endpoint again. See core/docs/SERVERLESS.md for details."
// pushDevicesHandler dispatches GET (list) / POST (register) on
// /v1/push/devices. Returns 503 when push isn't configured.
func (g *Gateway) pushDevicesHandler(w http.ResponseWriter, r *http.Request) {
if g.pushHandlers == nil {
httputil.WriteRPCError(w, http.StatusServiceUnavailable,
httputil.ErrCodeServiceUnavailable, pushNotConfiguredMessage)
return
}
switch r.Method {
case http.MethodGet:
g.pushHandlers.ListDevicesHandler(w, r)
case http.MethodPost:
g.pushHandlers.RegisterDeviceHandler(w, r)
default:
httputil.WriteRPCError(w, http.StatusMethodNotAllowed,
httputil.ErrCodeValidationFailed, "method not allowed: use GET to list or POST to register")
}
}
// pushDevicesByIDHandler handles DELETE /v1/push/devices/{id}. Returns
// 503 when push isn't configured.
func (g *Gateway) pushDevicesByIDHandler(w http.ResponseWriter, r *http.Request) {
if g.pushHandlers == nil {
httputil.WriteRPCError(w, http.StatusServiceUnavailable,
httputil.ErrCodeServiceUnavailable, pushNotConfiguredMessage)
return
}
g.pushHandlers.DeleteDeviceHandler(w, r)
}
// pushSendHandler handles POST /v1/push/send. Returns 503 when push
// isn't configured.
func (g *Gateway) pushSendHandler(w http.ResponseWriter, r *http.Request) {
if g.pushHandlers == nil {
httputil.WriteRPCError(w, http.StatusServiceUnavailable,
httputil.ErrCodeServiceUnavailable, pushNotConfiguredMessage)
return
}
g.pushHandlers.SendHandler(w, r)
}

View File

@ -0,0 +1,77 @@
package gateway
import (
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
)
// Bug #220: when push isn't configured, /v1/push/* must return a clear
// 503 with the canonical envelope, NOT 404. Tests below pin that contract.
func TestPushRoutes_returns_503_with_actionable_message_when_unconfigured(t *testing.T) {
g := &Gateway{} // pushHandlers is nil
cases := []struct {
name string
method string
path string
handler http.HandlerFunc
}{
{"list", http.MethodGet, "/v1/push/devices", g.pushDevicesHandler},
{"register", http.MethodPost, "/v1/push/devices", g.pushDevicesHandler},
{"delete", http.MethodDelete, "/v1/push/devices/abc", g.pushDevicesByIDHandler},
{"send", http.MethodPost, "/v1/push/send", g.pushSendHandler},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
req := httptest.NewRequest(c.method, c.path, nil)
rec := httptest.NewRecorder()
c.handler(rec, req)
if rec.Code != http.StatusServiceUnavailable {
t.Errorf("expected 503, got %d", rec.Code)
}
// Canonical envelope (#212 contract).
var body map[string]interface{}
if err := json.NewDecoder(rec.Body).Decode(&body); err != nil {
t.Fatalf("decode: %v", err)
}
if ok, _ := body["ok"].(bool); ok {
t.Error("ok must be false")
}
errObj, ok := body["error"].(map[string]interface{})
if !ok {
t.Fatal("error must be an object")
}
msg, _ := errObj["message"].(string)
if !strings.Contains(msg, "push notifications are not configured") {
t.Errorf("message must explain push isn't configured, got %q", msg)
}
if !strings.Contains(msg, "ntfy_base_url") || !strings.Contains(msg, "expo_access_token") {
t.Errorf("message must point at the config knobs, got %q", msg)
}
code, _ := errObj["code"].(string)
if code != "SERVICE_UNAVAILABLE" {
t.Errorf("expected code=SERVICE_UNAVAILABLE, got %q", code)
}
})
}
}
func TestPushDevicesHandler_method_not_allowed_for_unsupported_method(t *testing.T) {
g := &Gateway{} // pushHandlers nil — but we want to test method-not-allowed.
// First populate pushHandlers with a sentinel by skipping that path:
// when nil, every method goes through the 503 path. So instead, we test
// that with nil, even unsupported methods get 503 (not 405 — push
// disabled is the bigger problem).
req := httptest.NewRequest(http.MethodPut, "/v1/push/devices", nil)
rec := httptest.NewRecorder()
g.pushDevicesHandler(rec, req)
if rec.Code != http.StatusServiceUnavailable {
t.Errorf("when push disabled, method-not-allowed gives way to 503; got %d", rec.Code)
}
}

View File

@ -16,6 +16,9 @@ func (g *Gateway) Routes() http.Handler {
mux.HandleFunc("/v1/health", g.healthHandler)
mux.HandleFunc("/v1/version", g.versionHandler)
mux.HandleFunc("/v1/status", g.statusHandler)
// Schema-version contract (bug #214 audit follow-up): tenants can
// self-check whether their gateway's required schema is applied.
mux.HandleFunc("/v1/schema-status", g.handleSchemaStatus)
// Internal ping for peer-to-peer health monitoring
mux.HandleFunc("/v1/internal/ping", g.pingHandler)
@ -125,23 +128,16 @@ func (g *Gateway) Routes() http.Handler {
}
// push notifications
if g.pushHandlers != nil {
// GET + POST share the path; the handler dispatches by method.
mux.HandleFunc("/v1/push/devices", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
g.pushHandlers.ListDevicesHandler(w, r)
case http.MethodPost:
g.pushHandlers.RegisterDeviceHandler(w, r)
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
})
// DELETE /v1/push/devices/{id} — uses path-prefix routing because
// net/http mux doesn't extract path params; the handler parses {id}.
mux.HandleFunc("/v1/push/devices/", g.pushHandlers.DeleteDeviceHandler)
mux.HandleFunc("/v1/push/send", g.pushHandlers.SendHandler)
}
//
// Routes are ALWAYS registered (bug #220). When no provider is
// configured, the handler returns a canonical 503 envelope explaining
// that push isn't enabled — far better UX than a bare 404 that sends
// operators down "is the gateway broken?" rabbit holes.
mux.HandleFunc("/v1/push/devices", g.pushDevicesHandler)
// DELETE /v1/push/devices/{id} — uses path-prefix routing because
// net/http mux doesn't extract path params; the handler parses {id}.
mux.HandleFunc("/v1/push/devices/", g.pushDevicesByIDHandler)
mux.HandleFunc("/v1/push/send", g.pushSendHandler)
// operator node management (wallet JWT auth via middleware)
if g.operatorHandler != nil {

View File

@ -0,0 +1,114 @@
package gateway
// schema_status_handler.go exposes the gateway's schema-version contract
// over HTTP so namespace tenants can self-check schema drift without
// SSH access (bug #214 audit follow-up).
//
// Endpoint:
//
// GET /v1/schema-status
//
// Response (canonical, success):
//
// {
// "ok": true,
// "required_version": 25,
// "applied_version": 25,
// "in_sync": true,
// "pending": []
// }
//
// Response (drift):
//
// {
// "ok": true,
// "required_version": 25,
// "applied_version": 22,
// "in_sync": false,
// "pending": [
// {"version": 23, "name": "push_devices"},
// {"version": 24, "name": "namespace_publish_seq"},
// {"version": 25, "name": "persistent_ws"}
// ]
// }
//
// Authorization: this is mounted under `/v1/` so the existing
// namespace-ownership middleware applies — only the namespace's own
// authenticated callers can see it. Schema state isn't sensitive on its
// own (it's effectively a public version pin) but we gate by namespace
// to be consistent with every other admin-flavored endpoint.
//
// Why HTTP and not just the CLI: the CLI requires SSH to a node. An
// app developer or tenant ops person should be able to verify schema
// state from their workstation without infrastructure access.
import (
"context"
"net/http"
"time"
"github.com/DeBrosOfficial/network/migrations"
"github.com/DeBrosOfficial/network/pkg/httputil"
)
// schemaStatusResponse is the canonical wire shape. Exported tag-only
// fields so other Go callers (tests, dashboards) can consume the same shape.
type schemaStatusResponse struct {
OK bool `json:"ok"`
RequiredVersion int `json:"required_version"`
AppliedVersion int `json:"applied_version"`
InSync bool `json:"in_sync"`
Pending []schemaPendingItem `json:"pending"`
}
type schemaPendingItem struct {
Version int `json:"version"`
Name string `json:"name"`
}
// handleSchemaStatus serves GET /v1/schema-status.
//
// Errors return the canonical RPC error envelope (bug #212 fix). On a
// happy path the response is the schemaStatusResponse shape above.
func (g *Gateway) handleSchemaStatus(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
httputil.WriteRPCError(w, http.StatusMethodNotAllowed,
httputil.ErrCodeValidationFailed, "method not allowed")
return
}
if g.sqlDB == nil {
httputil.WriteRPCError(w, http.StatusServiceUnavailable,
httputil.ErrCodeServiceUnavailable, "schema status unavailable: db not initialized")
return
}
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
required := migrations.RequiredVersion()
applied, err := migrations.AppliedVersion(ctx, g.sqlDB)
if err != nil {
httputil.WriteRPCError(w, http.StatusInternalServerError,
httputil.ErrCodeInternal, "failed to read applied schema version: "+err.Error())
return
}
pending, err := migrations.PendingMigrations(ctx, g.sqlDB)
if err != nil {
// Non-fatal: applied/required are still useful even if pending fetch fails.
pending = nil
}
items := make([]schemaPendingItem, 0, len(pending))
for _, p := range pending {
items = append(items, schemaPendingItem{Version: p.Version, Name: p.Name})
}
httputil.WriteJSON(w, http.StatusOK, schemaStatusResponse{
OK: true,
RequiredVersion: required,
AppliedVersion: applied,
InSync: applied >= required,
Pending: items,
})
}

View File

@ -0,0 +1,133 @@
package gateway
import (
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/DeBrosOfficial/network/migrations"
_ "github.com/mattn/go-sqlite3"
)
// openTestSQLDB creates an in-memory SQLite with the schema_migrations
// table seeded.
func openTestSQLDB(t *testing.T) *sql.DB {
t.Helper()
db, err := sql.Open("sqlite3", ":memory:")
if err != nil {
t.Fatalf("open: %v", err)
}
t.Cleanup(func() { _ = db.Close() })
if _, err := db.Exec(`
CREATE TABLE schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)`); err != nil {
t.Fatalf("seed: %v", err)
}
return db
}
func TestSchemaStatus_in_sync(t *testing.T) {
db := openTestSQLDB(t)
// Seed schema_migrations to the binary's required version.
for v := 1; v <= migrations.RequiredVersion(); v++ {
if _, err := db.Exec("INSERT INTO schema_migrations (version) VALUES (?)", v); err != nil {
t.Fatalf("seed v=%d: %v", v, err)
}
}
g := &Gateway{sqlDB: db}
req := httptest.NewRequest(http.MethodGet, "/v1/schema-status", nil)
rec := httptest.NewRecorder()
g.handleSchemaStatus(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("status = %d, want 200", rec.Code)
}
var resp schemaStatusResponse
if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
t.Fatalf("decode: %v", err)
}
if !resp.OK {
t.Error("ok must be true")
}
if !resp.InSync {
t.Error("in_sync must be true when applied == required")
}
if resp.AppliedVersion != migrations.RequiredVersion() {
t.Errorf("applied = %d, want %d", resp.AppliedVersion, migrations.RequiredVersion())
}
if len(resp.Pending) != 0 {
t.Errorf("pending should be empty when in sync, got %d", len(resp.Pending))
}
}
func TestSchemaStatus_lagging(t *testing.T) {
db := openTestSQLDB(t)
// Seed only the first migration; the rest are "pending".
if _, err := db.Exec("INSERT INTO schema_migrations (version) VALUES (1)"); err != nil {
t.Fatalf("seed: %v", err)
}
g := &Gateway{sqlDB: db}
req := httptest.NewRequest(http.MethodGet, "/v1/schema-status", nil)
rec := httptest.NewRecorder()
g.handleSchemaStatus(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("status = %d, want 200 (drift is reported via in_sync, not via HTTP error)", rec.Code)
}
var resp schemaStatusResponse
if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
t.Fatalf("decode: %v", err)
}
if resp.InSync {
t.Error("in_sync must be false when behind")
}
if resp.AppliedVersion != 1 {
t.Errorf("applied = %d, want 1", resp.AppliedVersion)
}
if len(resp.Pending) == 0 {
t.Error("expected non-empty pending list when behind")
}
}
func TestSchemaStatus_no_db_returns_envelope(t *testing.T) {
g := &Gateway{sqlDB: nil}
req := httptest.NewRequest(http.MethodGet, "/v1/schema-status", nil)
rec := httptest.NewRecorder()
g.handleSchemaStatus(rec, req)
if rec.Code != http.StatusServiceUnavailable {
t.Errorf("status = %d, want 503", rec.Code)
}
// Confirm canonical envelope (bug #212 contract).
var body map[string]interface{}
if err := json.NewDecoder(rec.Body).Decode(&body); err != nil {
t.Fatalf("decode: %v", err)
}
if ok, _ := body["ok"].(bool); ok {
t.Error("ok must be false on error")
}
errObj, ok := body["error"].(map[string]interface{})
if !ok {
t.Fatal("error must be an object")
}
if msg, _ := errObj["message"].(string); msg == "" {
t.Error("error message must be populated")
}
}
func TestSchemaStatus_method_not_allowed(t *testing.T) {
g := &Gateway{sqlDB: openTestSQLDB(t)}
req := httptest.NewRequest(http.MethodPost, "/v1/schema-status", nil)
rec := httptest.NewRecorder()
g.handleSchemaStatus(rec, req)
if rec.Code != http.StatusMethodNotAllowed {
t.Errorf("status = %d, want 405", rec.Code)
}
}

View File

@ -41,6 +41,10 @@ func (m *mockFunctionRegistry) GetLogs(ctx context.Context, namespace, name stri
return []serverless.LogEntry{}, nil
}
func (m *mockFunctionRegistry) GetInvocations(ctx context.Context, namespace, name string, limit int) ([]serverless.Invocation, error) {
return []serverless.Invocation{}, nil
}
func TestServerlessHandlers_ListFunctions(t *testing.T) {
logger := zap.NewNop()
registry := &mockFunctionRegistry{

View File

@ -0,0 +1,229 @@
package httputil
// rpc_error.go defines the canonical RPC error envelope used by every
// gateway endpoint that serves typed RPC-style errors (function invoke,
// function deploy, push send, pubsub publish, etc.).
//
// Why this exists: prior to this file the gateway returned at least three
// different error shapes — `{error: "msg"}` from one path, an envelope
// with `request_id/duration_ms/error` from another, and an absent `error`
// field in a third. Generic clients couldn't write a single error parser.
//
// Canonical shape (always populated):
//
// {
// "ok": false,
// "error": {
// "code": "VALIDATION_FAILED", // typed enum, never empty
// "message": "missing username", // human-readable, never empty
// "retryable": false,
// "request_id": "abc...", // optional
// "retry_after": 2.5 // optional, seconds (float)
// }
// }
//
// HTTP status code is set on the response and reflects the error class;
// the envelope's code is the typed enum a client switches on.
import (
"net/http"
)
// RPCErrorCode is the typed error-code enum. New codes go here, alphabetic
// within their class. Codes are stable strings — clients pin to them.
type RPCErrorCode string
const (
// 4xx — client error
ErrCodeValidationFailed RPCErrorCode = "VALIDATION_FAILED"
ErrCodeUnauthorized RPCErrorCode = "UNAUTHORIZED"
ErrCodeForbidden RPCErrorCode = "FORBIDDEN"
ErrCodeNotFound RPCErrorCode = "NOT_FOUND"
ErrCodeConflict RPCErrorCode = "CONFLICT"
ErrCodeRateLimited RPCErrorCode = "RATE_LIMITED"
ErrCodePayloadTooLarge RPCErrorCode = "PAYLOAD_TOO_LARGE"
// 5xx — server error
ErrCodeInternal RPCErrorCode = "INTERNAL"
ErrCodeServiceUnavailable RPCErrorCode = "SERVICE_UNAVAILABLE"
ErrCodeTimeout RPCErrorCode = "TIMEOUT"
// Function-specific (5xx-mapped but distinct codes for client routing)
ErrCodeFunctionExecution RPCErrorCode = "FUNCTION_EXECUTION_FAILED"
ErrCodeFunctionDeploy RPCErrorCode = "FUNCTION_DEPLOY_FAILED"
ErrCodeSchemaMismatch RPCErrorCode = "SCHEMA_MISMATCH"
)
// RPCErrorEnvelope is the canonical wire shape. Use WriteRPCError to emit;
// the struct is exported so SDK clients can decode/match against it.
type RPCErrorEnvelope struct {
OK bool `json:"ok"`
Error *RPCErrorDetail `json:"error"`
}
// RPCErrorDetail is the typed error body. `Code` and `Message` are
// always populated by WriteRPCError — clients can rely on that contract.
type RPCErrorDetail struct {
Code RPCErrorCode `json:"code"`
Message string `json:"message"`
Retryable bool `json:"retryable"`
// Optional metadata. omitempty so clients don't see noise on simple cases.
RequestID string `json:"request_id,omitempty"`
RetryAfter float64 `json:"retry_after,omitempty"`
}
// RPCErrorOption customizes the envelope (request id, retry-after, etc.).
// Callers build chains like WriteRPCError(w, 429, code, msg,
// WithRetryAfter(2.5), WithRequestID(reqID)).
type RPCErrorOption func(*RPCErrorDetail)
// WithRequestID attaches the gateway request ID to the error detail.
// Useful for cross-referencing client errors with gateway logs.
func WithRequestID(id string) RPCErrorOption {
return func(d *RPCErrorDetail) { d.RequestID = id }
}
// WithRetryAfter sets the retry-after hint (seconds, float). Sets
// Retryable=true automatically — anything with a retry hint is retryable.
func WithRetryAfter(seconds float64) RPCErrorOption {
return func(d *RPCErrorDetail) {
d.RetryAfter = seconds
d.Retryable = true
}
}
// WithRetryable marks the error as retryable without a specific delay.
// Only set this for transient errors (rate limiting, temporary upstream
// unavailability). Don't set it for validation errors — retrying with the
// same input won't help.
func WithRetryable() RPCErrorOption {
return func(d *RPCErrorDetail) { d.Retryable = true }
}
// WriteRPCError writes the canonical envelope. Both code and message are
// REQUIRED — empty values are normalized so clients never see an empty
// message.
//
// Example:
//
// httputil.WriteRPCError(w, http.StatusBadRequest,
// httputil.ErrCodeValidationFailed, "username required")
//
// With options:
//
// httputil.WriteRPCError(w, http.StatusTooManyRequests,
// httputil.ErrCodeRateLimited, "wallet over per-minute cap",
// httputil.WithRetryAfter(1.2),
// httputil.WithRequestID(reqID))
func WriteRPCError(w http.ResponseWriter, status int, code RPCErrorCode, message string, opts ...RPCErrorOption) {
if code == "" {
code = ErrCodeInternal
}
if message == "" {
message = defaultMessageFor(code)
}
detail := &RPCErrorDetail{
Code: code,
Message: message,
Retryable: defaultRetryableFor(code),
}
for _, opt := range opts {
opt(detail)
}
// On rate-limit responses with a retry hint, also surface the standard
// HTTP Retry-After header so non-RPC-aware clients honor it.
if detail.RetryAfter > 0 {
w.Header().Set("Retry-After", formatFloat(detail.RetryAfter))
}
WriteJSON(w, status, RPCErrorEnvelope{OK: false, Error: detail})
}
// defaultMessageFor returns a sensible fallback when the caller passed
// an empty message string. We never leave the envelope's message empty —
// that was the bug.
func defaultMessageFor(code RPCErrorCode) string {
switch code {
case ErrCodeValidationFailed:
return "request failed validation"
case ErrCodeUnauthorized:
return "authentication required"
case ErrCodeForbidden:
return "access denied"
case ErrCodeNotFound:
return "resource not found"
case ErrCodeConflict:
return "request conflicts with current state"
case ErrCodeRateLimited:
return "rate limit exceeded"
case ErrCodePayloadTooLarge:
return "request payload too large"
case ErrCodeInternal:
return "internal server error"
case ErrCodeServiceUnavailable:
return "service temporarily unavailable"
case ErrCodeTimeout:
return "request timed out"
case ErrCodeFunctionExecution:
return "function execution failed"
case ErrCodeFunctionDeploy:
return "function deployment failed"
case ErrCodeSchemaMismatch:
return "gateway schema does not match required version"
default:
return "an error occurred"
}
}
// defaultRetryableFor seeds the retryable bit based on the error class.
// Callers can override via WithRetryable() / WithRetryAfter().
func defaultRetryableFor(code RPCErrorCode) bool {
switch code {
case ErrCodeRateLimited, ErrCodeServiceUnavailable, ErrCodeTimeout:
return true
default:
return false
}
}
// formatFloat renders a float seconds value compactly for the
// Retry-After HTTP header. We avoid stdlib strconv import here to keep
// this file's dependency set tight.
func formatFloat(v float64) string {
if v <= 0 {
return "0"
}
// Round to one decimal place; clients don't need sub-100ms precision.
tenths := int64(v*10 + 0.5)
whole := tenths / 10
frac := tenths % 10
if frac == 0 {
return itoa(whole)
}
return itoa(whole) + "." + itoa(frac)
}
// itoa is a small unsigned-int formatter (Go's strconv would also work
// but this keeps the file small + dep-free).
func itoa(n int64) string {
if n == 0 {
return "0"
}
neg := n < 0
if neg {
n = -n
}
var buf [20]byte
i := len(buf)
for n > 0 {
i--
buf[i] = byte('0' + n%10)
n /= 10
}
if neg {
i--
buf[i] = '-'
}
return string(buf[i:])
}

View File

@ -0,0 +1,155 @@
package httputil
import (
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
)
func decode(t *testing.T, rec *httptest.ResponseRecorder) RPCErrorEnvelope {
t.Helper()
var env RPCErrorEnvelope
if err := json.NewDecoder(rec.Body).Decode(&env); err != nil {
t.Fatalf("decode: %v", err)
}
return env
}
func TestWriteRPCError_canonical_shape(t *testing.T) {
rec := httptest.NewRecorder()
WriteRPCError(rec, http.StatusBadRequest,
ErrCodeValidationFailed, "username required")
if rec.Code != http.StatusBadRequest {
t.Errorf("status = %d, want 400", rec.Code)
}
env := decode(t, rec)
if env.OK {
t.Error("ok must be false on error envelope")
}
if env.Error == nil {
t.Fatal("error must be non-nil")
}
if env.Error.Code != ErrCodeValidationFailed {
t.Errorf("code = %s, want %s", env.Error.Code, ErrCodeValidationFailed)
}
if env.Error.Message != "username required" {
t.Errorf("message = %q", env.Error.Message)
}
if env.Error.Retryable {
t.Error("validation errors should not default to retryable")
}
}
// The contract: NEVER return an empty error.message. Bug #212 was clients
// seeing "RPC failed" because the gateway omitted the message field. This
// test locks in the fallback path.
func TestWriteRPCError_empty_message_is_filled_with_default(t *testing.T) {
cases := []struct {
code RPCErrorCode
wantInclude string
}{
{ErrCodeValidationFailed, "validation"},
{ErrCodeNotFound, "not found"},
{ErrCodeRateLimited, "rate limit"},
{ErrCodeInternal, "internal"},
{ErrCodeFunctionExecution, "function execution"},
}
for _, c := range cases {
t.Run(string(c.code), func(t *testing.T) {
rec := httptest.NewRecorder()
WriteRPCError(rec, http.StatusInternalServerError, c.code, "")
env := decode(t, rec)
if env.Error.Message == "" {
t.Errorf("message must NEVER be empty even when caller passed empty string")
}
if !strings.Contains(strings.ToLower(env.Error.Message), c.wantInclude) {
t.Errorf("message %q should include %q", env.Error.Message, c.wantInclude)
}
})
}
}
func TestWriteRPCError_empty_code_falls_back_to_internal(t *testing.T) {
rec := httptest.NewRecorder()
WriteRPCError(rec, http.StatusInternalServerError, "", "")
env := decode(t, rec)
if env.Error.Code != ErrCodeInternal {
t.Errorf("empty code should fall back to INTERNAL, got %s", env.Error.Code)
}
if env.Error.Message == "" {
t.Error("default message must be populated")
}
}
func TestWriteRPCError_with_retry_after_sets_header_and_retryable(t *testing.T) {
rec := httptest.NewRecorder()
WriteRPCError(rec, http.StatusTooManyRequests,
ErrCodeRateLimited, "wallet over cap",
WithRetryAfter(2.5))
if got := rec.Header().Get("Retry-After"); got != "2.5" {
t.Errorf("Retry-After header = %q, want 2.5", got)
}
env := decode(t, rec)
if !env.Error.Retryable {
t.Error("WithRetryAfter must imply Retryable=true")
}
if env.Error.RetryAfter != 2.5 {
t.Errorf("retry_after = %v, want 2.5", env.Error.RetryAfter)
}
}
func TestWriteRPCError_with_request_id(t *testing.T) {
rec := httptest.NewRecorder()
WriteRPCError(rec, http.StatusBadRequest,
ErrCodeValidationFailed, "bad input",
WithRequestID("req-abc"))
env := decode(t, rec)
if env.Error.RequestID != "req-abc" {
t.Errorf("request_id = %q, want req-abc", env.Error.RequestID)
}
}
func TestWriteRPCError_default_retryable_for_transient_codes(t *testing.T) {
cases := []struct {
code RPCErrorCode
want bool
}{
{ErrCodeRateLimited, true},
{ErrCodeServiceUnavailable, true},
{ErrCodeTimeout, true},
{ErrCodeValidationFailed, false},
{ErrCodeNotFound, false},
{ErrCodeForbidden, false},
{ErrCodeInternal, false},
}
for _, c := range cases {
rec := httptest.NewRecorder()
WriteRPCError(rec, http.StatusInternalServerError, c.code, "")
env := decode(t, rec)
if env.Error.Retryable != c.want {
t.Errorf("%s: retryable=%v, want %v", c.code, env.Error.Retryable, c.want)
}
}
}
func TestFormatFloat(t *testing.T) {
cases := []struct {
in float64
want string
}{
{0, "0"},
{1.0, "1"},
{1.5, "1.5"},
{2.45, "2.5"}, // rounds to 1 decimal
{-1.0, "0"}, // negative clamps to 0
}
for _, c := range cases {
if got := formatFloat(c.in); got != c.want {
t.Errorf("formatFloat(%v) = %q, want %q", c.in, got, c.want)
}
}
}

View File

@ -416,13 +416,16 @@ func (e *Engine) registerHostModule(ctx context.Context) error {
for _, moduleName := range []string{"env", "host", "orama"} {
_, err := e.runtime.NewHostModuleBuilder(moduleName).
NewFunctionBuilder().WithFunc(e.hGetCallerWallet).Export("get_caller_wallet").
NewFunctionBuilder().WithFunc(e.hGetCallerJWTSubject).Export("get_caller_jwt_subject").
NewFunctionBuilder().WithFunc(e.hGetWSClientID).Export("get_ws_client_id").
NewFunctionBuilder().WithFunc(e.hGetCallerClaim).Export("get_caller_claim").
NewFunctionBuilder().WithFunc(e.hGetRequestID).Export("get_request_id").
NewFunctionBuilder().WithFunc(e.hGetEnv).Export("get_env").
NewFunctionBuilder().WithFunc(e.hGetSecret).Export("get_secret").
NewFunctionBuilder().WithFunc(e.hDBQuery).Export("db_query").
NewFunctionBuilder().WithFunc(e.hDBQueryV2).Export("db_query_v2").
NewFunctionBuilder().WithFunc(e.hDBExecute).Export("db_execute").
NewFunctionBuilder().WithFunc(e.hDBExecuteV2).Export("db_execute_v2").
NewFunctionBuilder().WithFunc(e.hDBTransaction).Export("db_transaction").
NewFunctionBuilder().WithFunc(e.hExecAndPublish).Export("exec_and_publish").
NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get").
@ -469,6 +472,13 @@ func (e *Engine) hGetWSClientID(ctx context.Context, mod api.Module) uint64 {
return e.executor.WriteToGuest(ctx, mod, []byte(cid))
}
// hGetCallerJWTSubject returns the JWT `sub` claim explicitly. Empty
// string if the request was not JWT-authenticated. See bug #215.
func (e *Engine) hGetCallerJWTSubject(ctx context.Context, mod api.Module) uint64 {
sub := e.hostServices.GetCallerJWTSubject(ctx)
return e.executor.WriteToGuest(ctx, mod, []byte(sub))
}
// hGetCallerClaim reads a claim name from guest memory, looks it up on the
// caller's JWT custom claims, and writes the value (or empty string) back.
func (e *Engine) hGetCallerClaim(ctx context.Context, mod api.Module, namePtr, nameLen uint32) uint64 {
@ -660,6 +670,51 @@ func (e *Engine) hPubSubPublishBatch(ctx context.Context, mod api.Module, msgsPt
return 1
}
// hDBExecuteV2 is the WASM-callable wrapper for DBExecuteV2 (bug #218).
// Inputs: pointer/length of SQL + JSON args. Returns a packed uint64
// (ptr<<32 | len) pointing to the JSON envelope in guest memory, or 0 on
// host-side failure. The JSON envelope's "error" field carries SQL errors.
func (e *Engine) hDBExecuteV2(ctx context.Context, mod api.Module, queryPtr, queryLen, argsPtr, argsLen uint32) uint64 {
query, ok := e.executor.ReadFromGuest(mod, queryPtr, queryLen)
if !ok {
return 0
}
var args []interface{}
if argsLen > 0 {
if err := e.executor.UnmarshalJSONFromGuest(mod, argsPtr, argsLen, &args); err != nil {
e.logger.Warn("db_execute_v2: failed to unmarshal args", zap.Error(err))
return 0
}
}
out, err := e.hostServices.DBExecuteV2(ctx, string(query), args)
if err != nil {
e.logger.Warn("host function db_execute_v2 failed", zap.Error(err))
return 0
}
return e.executor.WriteToGuest(ctx, mod, out)
}
// hDBQueryV2 is the WASM-callable wrapper for DBQueryV2 (bug #218).
func (e *Engine) hDBQueryV2(ctx context.Context, mod api.Module, queryPtr, queryLen, argsPtr, argsLen uint32) uint64 {
query, ok := e.executor.ReadFromGuest(mod, queryPtr, queryLen)
if !ok {
return 0
}
var args []interface{}
if argsLen > 0 {
if err := e.executor.UnmarshalJSONFromGuest(mod, argsPtr, argsLen, &args); err != nil {
e.logger.Warn("db_query_v2: failed to unmarshal args", zap.Error(err))
return 0
}
}
out, err := e.hostServices.DBQueryV2(ctx, string(query), args)
if err != nil {
e.logger.Warn("host function db_query_v2 failed", zap.Error(err))
return 0
}
return e.executor.WriteToGuest(ctx, mod, out)
}
// hDBTransaction is the WASM-callable wrapper for DBTransaction.
// Input: pointer/length of opsJSON ({"ops":[{kind,sql,args},...]}).
// Returns a packed uint64 (ptr<<32 | len) pointing to JSON BatchResult in

View File

@ -56,6 +56,14 @@ func (m *mockHostServices) DBExecute(ctx context.Context, query string, args []i
return 0, nil
}
func (m *mockHostServices) DBExecuteV2(ctx context.Context, query string, args []interface{}) ([]byte, error) {
return []byte(`{"rows_affected":0}`), nil
}
func (m *mockHostServices) DBQueryV2(ctx context.Context, query string, args []interface{}) ([]byte, error) {
return []byte(`{"rows":[]}`), nil
}
func (m *mockHostServices) CacheGet(ctx context.Context, key string) ([]byte, error) {
return nil, nil
}
@ -154,6 +162,10 @@ func (m *mockHostServices) GetCallerClaim(ctx context.Context, name string) stri
return ""
}
func (m *mockHostServices) GetCallerJWTSubject(ctx context.Context) string {
return ""
}
func (m *mockHostServices) EnqueueBackground(ctx context.Context, functionName string, payload []byte) (string, error) {
return "", nil
}

View File

@ -68,14 +68,15 @@ func (h *HostFunctions) FunctionInvoke(ctx context.Context, name string, payload
}
req := &serverless.InvokeRequest{
Namespace: cur.Namespace,
FunctionName: name,
Input: payload,
TriggerType: serverless.TriggerTypeWebSocket,
CallerWallet: cur.CallerWallet,
CallerIP: cur.CallerIP,
WSClientID: cur.WSClientID,
CallerClaims: cur.CallerClaims,
Namespace: cur.Namespace,
FunctionName: name,
Input: payload,
TriggerType: serverless.TriggerTypeWebSocket,
CallerWallet: cur.CallerWallet,
CallerIP: cur.CallerIP,
WSClientID: cur.WSClientID,
CallerClaims: cur.CallerClaims,
CallerJWTSubject: cur.CallerJWTSubject,
}
resp, err := inv.Invoke(ctx, req)
if err != nil {
@ -165,3 +166,21 @@ func (h *HostFunctions) GetCallerClaim(ctx context.Context, name string) string
}
return h.invCtx.CallerClaims[name]
}
// GetCallerJWTSubject returns the JWT `sub` claim explicitly, independent
// of the API-key-vs-JWT precedence used by GetCallerWallet. Empty when the
// request was not JWT-authenticated. Bug #215.
//
// Use this when a function MUST bind on the JWT-signed identity (e.g. a
// signup flow that verifies the wallet the caller is registering matches
// the wallet that signed the auth challenge). GetCallerWallet may return
// the namespace pseudo-identifier if the caller also presents an API key.
func (h *HostFunctions) GetCallerJWTSubject(ctx context.Context) string {
h.invCtxLock.RLock()
defer h.invCtxLock.RUnlock()
if h.invCtx == nil {
return ""
}
return h.invCtx.CallerJWTSubject
}

View File

@ -31,6 +31,11 @@ func (h *HostFunctions) DBQuery(ctx context.Context, query string, args []interf
}
// DBExecute executes an INSERT/UPDATE/DELETE query and returns affected rows.
//
// IMPORTANT: this returns 0 for BOTH "0 rows affected" AND "SQL error"
// — callers can't distinguish. That ambiguity caused bug #218 (AnChat's
// migrate function silently dropped statements). For new code, prefer
// DBExecuteV2 which returns a typed envelope.
func (h *HostFunctions) DBExecute(ctx context.Context, query string, args []interface{}) (int64, error) {
if h.db == nil {
return 0, &serverless.HostFunctionError{Function: "db_execute", Cause: serverless.ErrDatabaseUnavailable}
@ -45,6 +50,77 @@ func (h *HostFunctions) DBExecute(ctx context.Context, query string, args []inte
return affected, nil
}
// dbExecuteV2Result is the JSON wire shape returned by DBExecuteV2.
type dbExecuteV2Result struct {
RowsAffected int64 `json:"rows_affected"`
LastInsertID int64 `json:"last_insert_id,omitempty"`
Error string `json:"error,omitempty"`
}
// DBExecuteV2 is the typed equivalent of DBExecute. Returns the same shape
// regardless of success/failure so callers can distinguish "0 rows affected"
// from "SQL error" — fixing the contract gap that caused bug #218.
//
// Returns a Go error only for host-side setup failures (no DB). SQL errors
// are encoded in the JSON envelope's "error" field.
func (h *HostFunctions) DBExecuteV2(ctx context.Context, query string, args []interface{}) ([]byte, error) {
if h.db == nil {
return nil, &serverless.HostFunctionError{
Function: "db_execute_v2",
Cause: serverless.ErrDatabaseUnavailable,
}
}
out := dbExecuteV2Result{}
result, err := h.db.Exec(ctx, query, args...)
if err != nil {
out.Error = err.Error()
buf, mErr := json.Marshal(out)
if mErr != nil {
return nil, &serverless.HostFunctionError{Function: "db_execute_v2", Cause: mErr}
}
return buf, nil
}
if result != nil {
out.RowsAffected, _ = result.RowsAffected()
out.LastInsertID, _ = result.LastInsertId()
}
buf, mErr := json.Marshal(out)
if mErr != nil {
return nil, &serverless.HostFunctionError{Function: "db_execute_v2", Cause: mErr}
}
return buf, nil
}
// dbQueryV2Result is the JSON wire shape returned by DBQueryV2.
type dbQueryV2Result struct {
Rows []map[string]interface{} `json:"rows"`
Error string `json:"error,omitempty"`
}
// DBQueryV2 is the typed equivalent of DBQuery. Distinguishes "empty
// result set" from "query failed" via the "error" field.
func (h *HostFunctions) DBQueryV2(ctx context.Context, query string, args []interface{}) ([]byte, error) {
if h.db == nil {
return nil, &serverless.HostFunctionError{
Function: "db_query_v2",
Cause: serverless.ErrDatabaseUnavailable,
}
}
out := dbQueryV2Result{Rows: []map[string]interface{}{}}
if err := h.db.Query(ctx, &out.Rows, query, args...); err != nil {
out.Error = err.Error()
// Reset rows to non-nil empty on error so callers get a stable shape.
out.Rows = []map[string]interface{}{}
}
buf, mErr := json.Marshal(out)
if mErr != nil {
return nil, &serverless.HostFunctionError{Function: "db_query_v2", Cause: mErr}
}
return buf, nil
}
// dbTransactionRequest is the WASM-side shape for db_transaction input.
type dbTransactionRequest struct {
Ops []rqlite.BatchOp `json:"ops"`

View File

@ -2,6 +2,7 @@ package hostfunctions
import (
"context"
"database/sql"
"encoding/json"
"strings"
"sync/atomic"
@ -170,6 +171,148 @@ func TestExecAndPublish_no_namespace_in_context_rejected(t *testing.T) {
_ = h
}
// fakeExecClient is a minimal rqlite.Client stub focused on Exec/Query
// behavior for the v2 host call tests (bug #218 regression coverage).
type fakeExecClient struct {
rqlite.Client
execErr error
execRows int64
execLastID int64
queryErr error
queryRows []map[string]interface{}
}
func (f *fakeExecClient) Exec(_ context.Context, _ string, _ ...any) (sql.Result, error) {
if f.execErr != nil {
return nil, f.execErr
}
return &fakeSQLResult{rows: f.execRows, lastID: f.execLastID}, nil
}
func (f *fakeExecClient) Query(_ context.Context, dest any, _ string, _ ...any) error {
if f.queryErr != nil {
return f.queryErr
}
rows, ok := dest.(*[]map[string]interface{})
if !ok {
return nil
}
*rows = append(*rows, f.queryRows...)
return nil
}
type fakeSQLResult struct {
rows int64
lastID int64
}
func (r *fakeSQLResult) LastInsertId() (int64, error) { return r.lastID, nil }
func (r *fakeSQLResult) RowsAffected() (int64, error) { return r.rows, nil }
func TestDBExecuteV2_success(t *testing.T) {
fake := &fakeExecClient{execRows: 3, execLastID: 42}
h := newHFWithDB(fake)
out, err := h.DBExecuteV2(context.Background(), "INSERT INTO t VALUES (?)", []interface{}{1})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var res dbExecuteV2Result
if err := json.Unmarshal(out, &res); err != nil {
t.Fatalf("decode: %v", err)
}
if res.RowsAffected != 3 {
t.Errorf("rows_affected = %d, want 3", res.RowsAffected)
}
if res.LastInsertID != 42 {
t.Errorf("last_insert_id = %d, want 42", res.LastInsertID)
}
if res.Error != "" {
t.Errorf("error should be empty on success, got %q", res.Error)
}
}
// The whole point of bug #218: when the SQL fails, the envelope must say
// so. Old DBExecute returned 0 — indistinguishable from "0 rows affected".
func TestDBExecuteV2_sql_error_populates_error_field(t *testing.T) {
fake := &fakeExecClient{execErr: errFakeDBFailure{msg: "no such column: missing"}}
h := newHFWithDB(fake)
out, err := h.DBExecuteV2(context.Background(), "INSERT ...", nil)
if err != nil {
t.Fatalf("expected SQL errors via envelope, not Go error: %v", err)
}
var res dbExecuteV2Result
if err := json.Unmarshal(out, &res); err != nil {
t.Fatalf("decode: %v", err)
}
if res.Error == "" {
t.Fatal("error field must NOT be empty when SQL failed (bug #218 contract)")
}
if !strings.Contains(res.Error, "no such column") {
t.Errorf("error should preserve SQL message, got %q", res.Error)
}
if res.RowsAffected != 0 {
t.Errorf("rows_affected should be 0 on failure, got %d", res.RowsAffected)
}
}
func TestDBExecuteV2_no_db_returns_go_error(t *testing.T) {
h := &HostFunctions{db: nil}
_, err := h.DBExecuteV2(context.Background(), "INSERT ...", nil)
if err == nil {
t.Fatal("expected Go error for setup failure (no DB)")
}
}
func TestDBQueryV2_success_with_empty_rows(t *testing.T) {
fake := &fakeExecClient{queryRows: nil} // genuine "no rows" — not an error
h := newHFWithDB(fake)
out, err := h.DBQueryV2(context.Background(), "SELECT * FROM t WHERE 0=1", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var res dbQueryV2Result
if err := json.Unmarshal(out, &res); err != nil {
t.Fatalf("decode: %v", err)
}
if res.Error != "" {
t.Errorf("empty rows is NOT an error, got error=%q", res.Error)
}
if res.Rows == nil {
t.Error("rows must be non-nil empty slice for stable JSON shape")
}
if len(res.Rows) != 0 {
t.Errorf("rows = %v, want empty", res.Rows)
}
}
func TestDBQueryV2_query_error_populates_error_field(t *testing.T) {
fake := &fakeExecClient{queryErr: errFakeDBFailure{msg: "syntax error"}}
h := newHFWithDB(fake)
out, err := h.DBQueryV2(context.Background(), "SELECT bogus FROM t", nil)
if err != nil {
t.Fatalf("query errors should be in envelope, not Go error: %v", err)
}
var res dbQueryV2Result
if err := json.Unmarshal(out, &res); err != nil {
t.Fatalf("decode: %v", err)
}
if res.Error == "" {
t.Fatal("error field must NOT be empty when query failed")
}
if res.Rows == nil {
t.Error("rows must be non-nil even on error (stable shape)")
}
}
// errFakeDBFailure is a sentinel for v2 tests.
type errFakeDBFailure struct{ msg string }
func (e errFakeDBFailure) Error() string { return e.msg }
func TestDBTransaction_rollback_returns_committed_false_no_go_error(t *testing.T) {
fake := &fakeBatchClient{
respond: func(ops []rqlite.BatchOp) (*rqlite.BatchResult, error) {

View File

@ -0,0 +1,51 @@
package hostfunctions
import (
"context"
"testing"
"github.com/DeBrosOfficial/network/pkg/serverless"
)
// Bug #215: get_caller_jwt_subject must return the JWT `sub` exposed
// in the InvocationContext, independent of the caller-wallet field.
func TestGetCallerJWTSubject_returns_sub_when_set(t *testing.T) {
h := &HostFunctions{}
h.SetInvocationContext(&serverless.InvocationContext{
// CallerWallet is the namespace pseudo-wallet (resolved via API key);
// CallerJWTSubject is the actual signing wallet from the Bearer JWT.
CallerWallet: "anchat-test",
CallerJWTSubject: "A3ZGpMKPtsmYVtXr6Gnf5u4x6j4dgZWnpyXXFAiCibCC",
})
defer h.ClearContext()
if got := h.GetCallerJWTSubject(context.Background()); got != "A3ZGpMKPtsmYVtXr6Gnf5u4x6j4dgZWnpyXXFAiCibCC" {
t.Errorf("GetCallerJWTSubject = %q, want the JWT subject (not the namespace)", got)
}
// And GetCallerWallet still returns the resolved-wallet (namespace in
// this case) — they're independent accessors by design.
if got := h.GetCallerWallet(context.Background()); got != "anchat-test" {
t.Errorf("GetCallerWallet = %q, want anchat-test (the resolved caller-wallet)", got)
}
}
func TestGetCallerJWTSubject_empty_when_not_jwt_authed(t *testing.T) {
h := &HostFunctions{}
h.SetInvocationContext(&serverless.InvocationContext{
CallerWallet: "anchat-test",
CallerJWTSubject: "", // request was API-key only
})
defer h.ClearContext()
if got := h.GetCallerJWTSubject(context.Background()); got != "" {
t.Errorf("GetCallerJWTSubject = %q, want empty for API-key-only auth", got)
}
}
func TestGetCallerJWTSubject_empty_when_no_invocation_context(t *testing.T) {
h := &HostFunctions{}
if got := h.GetCallerJWTSubject(context.Background()); got != "" {
t.Errorf("GetCallerJWTSubject = %q, want empty when no inv ctx", got)
}
}

View File

@ -54,6 +54,10 @@ type InvokeRequest struct {
WSClientID string `json:"ws_client_id,omitempty"`
// CallerClaims holds custom JWT claims to expose via get_caller_claim.
CallerClaims map[string]string `json:"caller_claims,omitempty"`
// CallerJWTSubject carries the JWT `sub` claim explicitly so the
// engine can populate InvocationContext.CallerJWTSubject — fixes the
// bug-#215 case where API-key precedence buries the JWT identity.
CallerJWTSubject string `json:"caller_jwt_subject,omitempty"`
}
// InvokeResponse contains the result of a function invocation.
@ -112,16 +116,17 @@ func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeRespon
// Build invocation context
invCtx := &InvocationContext{
RequestID: requestID,
FunctionID: fn.ID,
FunctionName: fn.Name,
Namespace: fn.Namespace,
CallerWallet: req.CallerWallet,
CallerIP: req.CallerIP,
TriggerType: req.TriggerType,
WSClientID: req.WSClientID,
EnvVars: envVars,
CallerClaims: req.CallerClaims,
RequestID: requestID,
FunctionID: fn.ID,
FunctionName: fn.Name,
Namespace: fn.Namespace,
CallerWallet: req.CallerWallet,
CallerIP: req.CallerIP,
TriggerType: req.TriggerType,
WSClientID: req.WSClientID,
EnvVars: envVars,
CallerClaims: req.CallerClaims,
CallerJWTSubject: req.CallerJWTSubject,
}
// Execute with retry logic

View File

@ -93,6 +93,10 @@ func (m *MockRegistry) GetLogs(ctx context.Context, namespace, name string, limi
return []LogEntry{}, nil
}
func (m *MockRegistry) GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error) {
return []Invocation{}, nil
}
// MockHostServices is a mock implementation of HostServices
type MockHostServices struct {
mu sync.RWMutex
@ -116,6 +120,14 @@ func (m *MockHostServices) DBExecute(ctx context.Context, query string, args []i
return 0, nil
}
func (m *MockHostServices) DBExecuteV2(ctx context.Context, query string, args []interface{}) ([]byte, error) {
return []byte(`{"rows_affected":0}`), nil
}
func (m *MockHostServices) DBQueryV2(ctx context.Context, query string, args []interface{}) ([]byte, error) {
return []byte(`{"rows":[]}`), nil
}
func (m *MockHostServices) CacheGet(ctx context.Context, key string) ([]byte, error) {
m.mu.RLock()
defer m.mu.RUnlock()
@ -240,6 +252,10 @@ func (m *MockHostServices) GetCallerClaim(ctx context.Context, name string) stri
return ""
}
func (m *MockHostServices) GetCallerJWTSubject(ctx context.Context) string {
return ""
}
func (m *MockHostServices) EnqueueBackground(ctx context.Context, functionName string, payload []byte) (string, error) {
return "job-123", nil
}

View File

@ -426,6 +426,104 @@ func (r *Registry) GetLogs(ctx context.Context, namespace, name string, limit in
return logs, nil
}
// GetInvocations returns invocation history for a function in reverse
// chronological order, with any associated WASM log entries nested per
// record. This is the right answer to "what happened when this function
// was invoked" — `orama function logs <name>` consumes this view.
//
// Always populated when the function has been invoked at least once.
// Two queries (invocations + nested WASM logs) batched into one map.
func (r *Registry) GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error) {
if limit <= 0 {
limit = 50
}
invQuery := `
SELECT i.id, i.request_id, i.trigger_type, i.caller_wallet,
i.input_size, i.output_size, i.started_at, i.completed_at,
i.duration_ms, i.status, i.error_message, i.memory_used_mb
FROM function_invocations i
JOIN functions f ON i.function_id = f.id
WHERE f.namespace = ? AND f.name = ?
ORDER BY i.started_at DESC
LIMIT ?
`
var rows []struct {
ID string `db:"id"`
RequestID string `db:"request_id"`
TriggerType string `db:"trigger_type"`
CallerWallet string `db:"caller_wallet"`
InputSize int `db:"input_size"`
OutputSize int `db:"output_size"`
StartedAt time.Time `db:"started_at"`
CompletedAt time.Time `db:"completed_at"`
DurationMS int64 `db:"duration_ms"`
Status string `db:"status"`
ErrorMessage string `db:"error_message"`
MemoryUsedMB float64 `db:"memory_used_mb"`
}
if err := r.db.Query(ctx, &rows, invQuery, namespace, name, limit); err != nil {
return nil, fmt.Errorf("failed to query invocations: %w", err)
}
if len(rows) == 0 {
return []Invocation{}, nil
}
// Batched fetch of nested WASM logs.
invIDs := make([]interface{}, len(rows))
for i, r := range rows {
invIDs[i] = r.ID
}
placeholders := strings.Repeat("?,", len(invIDs))
placeholders = placeholders[:len(placeholders)-1]
logsQuery := `
SELECT invocation_id, level, message, timestamp
FROM function_logs
WHERE invocation_id IN (` + placeholders + `)
ORDER BY timestamp ASC
`
var logRows []struct {
InvocationID string `db:"invocation_id"`
Level string `db:"level"`
Message string `db:"message"`
Timestamp time.Time `db:"timestamp"`
}
logsByInv := map[string][]LogEntry{}
if err := r.db.Query(ctx, &logRows, logsQuery, invIDs...); err != nil {
// Don't fail the whole call — invocation summary is still useful.
r.logger.Warn("failed to fetch nested WASM logs; returning invocations without them",
zap.Error(err))
} else {
for _, lr := range logRows {
logsByInv[lr.InvocationID] = append(logsByInv[lr.InvocationID], LogEntry{
Level: lr.Level,
Message: lr.Message,
Timestamp: lr.Timestamp,
})
}
}
out := make([]Invocation, len(rows))
for i, row := range rows {
out[i] = Invocation{
ID: row.ID,
RequestID: row.RequestID,
TriggerType: row.TriggerType,
CallerWallet: row.CallerWallet,
InputSize: row.InputSize,
OutputSize: row.OutputSize,
StartedAt: row.StartedAt,
CompletedAt: row.CompletedAt,
DurationMS: row.DurationMS,
Status: row.Status,
ErrorMessage: row.ErrorMessage,
MemoryUsedMB: row.MemoryUsedMB,
WASMLogs: logsByInv[row.ID],
}
}
return out, nil
}
// -----------------------------------------------------------------------------
// Private helpers
// -----------------------------------------------------------------------------

View File

@ -3,6 +3,7 @@ package registry
import (
"context"
"fmt"
"strings"
"time"
"github.com/DeBrosOfficial/network/pkg/rqlite"
@ -66,7 +67,12 @@ func (l *InvocationLogger) Log(ctx context.Context, inv *InvocationRecordData) e
return nil
}
// GetLogs retrieves logs for a function.
// GetLogs retrieves WASM-emitted log entries for a function (rows in
// function_logs). Functions that don't call log_info / log_error from
// their WASM code will return an empty slice here — that's expected.
//
// For "what happened when this function was invoked" use GetInvocations
// instead; it always populates as long as the function has been invoked.
func (l *InvocationLogger) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) {
if limit <= 0 {
limit = 100
@ -102,3 +108,131 @@ func (l *InvocationLogger) GetLogs(ctx context.Context, namespace, name string,
return logs, nil
}
// GetInvocations returns invocation history for a function in reverse
// chronological order, with any associated WASM log entries nested per
// record under WASMLogs.
//
// This is what `orama function logs <name>` displays — it's always
// populated as long as the function has been invoked at least once.
// Returns an empty slice (not an error) when there are no invocations.
//
// Implementation: two queries — first the invocation rows, then a
// single batched query for all WASM log entries belonging to those
// invocation IDs. We don't LEFT JOIN because that would return one
// row per (invocation × log entry) which is awkward to scan.
func (l *InvocationLogger) GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error) {
if limit <= 0 {
limit = 50
}
invQuery := `
SELECT i.id, i.request_id, i.trigger_type, i.caller_wallet,
i.input_size, i.output_size, i.started_at, i.completed_at,
i.duration_ms, i.status, i.error_message, i.memory_used_mb
FROM function_invocations i
JOIN functions f ON i.function_id = f.id
WHERE f.namespace = ? AND f.name = ?
ORDER BY i.started_at DESC
LIMIT ?
`
var rows []struct {
ID string `db:"id"`
RequestID string `db:"request_id"`
TriggerType string `db:"trigger_type"`
CallerWallet string `db:"caller_wallet"`
InputSize int `db:"input_size"`
OutputSize int `db:"output_size"`
StartedAt time.Time `db:"started_at"`
CompletedAt time.Time `db:"completed_at"`
DurationMS int64 `db:"duration_ms"`
Status string `db:"status"`
ErrorMessage string `db:"error_message"`
MemoryUsedMB float64 `db:"memory_used_mb"`
}
if err := l.db.Query(ctx, &rows, invQuery, namespace, name, limit); err != nil {
return nil, fmt.Errorf("failed to query invocations: %w", err)
}
if len(rows) == 0 {
return []Invocation{}, nil
}
// Batched fetch of WASM log entries for these invocation IDs.
// We use IN (?, ?, ...) with one placeholder per ID. This stays
// fast because limit is bounded (default 50).
invIDs := make([]string, len(rows))
for i, r := range rows {
invIDs[i] = r.ID
}
logsByInv, err := l.fetchLogsForInvocations(ctx, invIDs)
if err != nil {
// Don't fail the whole call if WASM-log fetch fails; the
// invocation summary is still useful. Log and continue.
l.logger.Warn("failed to fetch nested WASM logs; returning invocations without them",
zap.Error(err))
logsByInv = map[string][]LogEntry{}
}
out := make([]Invocation, len(rows))
for i, r := range rows {
out[i] = Invocation{
ID: r.ID,
RequestID: r.RequestID,
TriggerType: r.TriggerType,
CallerWallet: r.CallerWallet,
InputSize: r.InputSize,
OutputSize: r.OutputSize,
StartedAt: r.StartedAt,
CompletedAt: r.CompletedAt,
DurationMS: r.DurationMS,
Status: r.Status,
ErrorMessage: r.ErrorMessage,
MemoryUsedMB: r.MemoryUsedMB,
WASMLogs: logsByInv[r.ID],
}
}
return out, nil
}
// fetchLogsForInvocations returns a map of invocation_id → []LogEntry,
// fetching all entries in one query.
func (l *InvocationLogger) fetchLogsForInvocations(ctx context.Context, invocationIDs []string) (map[string][]LogEntry, error) {
if len(invocationIDs) == 0 {
return map[string][]LogEntry{}, nil
}
placeholders := strings.Repeat("?,", len(invocationIDs))
placeholders = placeholders[:len(placeholders)-1]
query := `
SELECT invocation_id, level, message, timestamp
FROM function_logs
WHERE invocation_id IN (` + placeholders + `)
ORDER BY timestamp ASC
`
args := make([]interface{}, len(invocationIDs))
for i, id := range invocationIDs {
args[i] = id
}
var rows []struct {
InvocationID string `db:"invocation_id"`
Level string `db:"level"`
Message string `db:"message"`
Timestamp time.Time `db:"timestamp"`
}
if err := l.db.Query(ctx, &rows, query, args...); err != nil {
return nil, fmt.Errorf("failed to query logs: %w", err)
}
out := make(map[string][]LogEntry, len(invocationIDs))
for _, r := range rows {
out[r.InvocationID] = append(out[r.InvocationID], LogEntry{
Level: r.Level,
Message: r.Message,
Timestamp: r.Timestamp,
})
}
return out, nil
}

View File

@ -0,0 +1,221 @@
package registry
import (
"context"
"testing"
"time"
"github.com/DeBrosOfficial/network/pkg/rqlite"
"go.uber.org/zap"
)
// fakeDB is a tiny rqlite.Client stub that lets tests script Query / Exec
// behavior. We only implement what InvocationLogger calls.
type fakeDB struct {
rqlite.Client
queries []recordedQuery
// onQuery is called for every Query() invocation, in order. Each
// callback fills in the destination slice and returns an optional
// error. Tests pop callbacks; running out is a test bug.
onQuery []func(dest interface{}) error
}
type recordedQuery struct {
sql string
args []interface{}
}
func (f *fakeDB) Query(_ context.Context, dest interface{}, sql string, args ...interface{}) error {
f.queries = append(f.queries, recordedQuery{sql: sql, args: args})
if len(f.onQuery) == 0 {
return nil
}
cb := f.onQuery[0]
f.onQuery = f.onQuery[1:]
return cb(dest)
}
func TestGetInvocations_no_invocations_returns_empty_slice(t *testing.T) {
db := &fakeDB{}
il := NewInvocationLogger(db, zap.NewNop())
got, err := il.GetInvocations(context.Background(), "ns", "fn", 50)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got == nil {
t.Fatal("expected non-nil empty slice; nil breaks JSON marshalling for clients")
}
if len(got) != 0 {
t.Errorf("expected empty, got %d records", len(got))
}
}
func TestGetInvocations_populates_records_and_nests_logs(t *testing.T) {
now := time.Now()
// Arrange the fake to return two invocations on the first Query, then
// two log rows on the second.
db := &fakeDB{
onQuery: []func(dest interface{}) error{
// Invocation rows.
func(dest interface{}) error {
rows := dest.(*[]struct {
ID string `db:"id"`
RequestID string `db:"request_id"`
TriggerType string `db:"trigger_type"`
CallerWallet string `db:"caller_wallet"`
InputSize int `db:"input_size"`
OutputSize int `db:"output_size"`
StartedAt time.Time `db:"started_at"`
CompletedAt time.Time `db:"completed_at"`
DurationMS int64 `db:"duration_ms"`
Status string `db:"status"`
ErrorMessage string `db:"error_message"`
MemoryUsedMB float64 `db:"memory_used_mb"`
})
*rows = append(*rows,
struct {
ID string `db:"id"`
RequestID string `db:"request_id"`
TriggerType string `db:"trigger_type"`
CallerWallet string `db:"caller_wallet"`
InputSize int `db:"input_size"`
OutputSize int `db:"output_size"`
StartedAt time.Time `db:"started_at"`
CompletedAt time.Time `db:"completed_at"`
DurationMS int64 `db:"duration_ms"`
Status string `db:"status"`
ErrorMessage string `db:"error_message"`
MemoryUsedMB float64 `db:"memory_used_mb"`
}{
ID: "inv-1", RequestID: "req-A", Status: "success",
DurationMS: 12, StartedAt: now,
},
struct {
ID string `db:"id"`
RequestID string `db:"request_id"`
TriggerType string `db:"trigger_type"`
CallerWallet string `db:"caller_wallet"`
InputSize int `db:"input_size"`
OutputSize int `db:"output_size"`
StartedAt time.Time `db:"started_at"`
CompletedAt time.Time `db:"completed_at"`
DurationMS int64 `db:"duration_ms"`
Status string `db:"status"`
ErrorMessage string `db:"error_message"`
MemoryUsedMB float64 `db:"memory_used_mb"`
}{
ID: "inv-2", RequestID: "req-B", Status: "error",
ErrorMessage: "boom", StartedAt: now.Add(-1 * time.Minute),
},
)
return nil
},
// Nested WASM log rows: one for inv-1, none for inv-2.
func(dest interface{}) error {
rows := dest.(*[]struct {
InvocationID string `db:"invocation_id"`
Level string `db:"level"`
Message string `db:"message"`
Timestamp time.Time `db:"timestamp"`
})
*rows = append(*rows, struct {
InvocationID string `db:"invocation_id"`
Level string `db:"level"`
Message string `db:"message"`
Timestamp time.Time `db:"timestamp"`
}{
InvocationID: "inv-1", Level: "info", Message: "hi", Timestamp: now,
})
return nil
},
},
}
il := NewInvocationLogger(db, zap.NewNop())
got, err := il.GetInvocations(context.Background(), "ns", "fn", 50)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(got) != 2 {
t.Fatalf("expected 2 invocations, got %d", len(got))
}
if got[0].ID != "inv-1" || got[0].Status != "success" {
t.Errorf("first invocation wrong: %+v", got[0])
}
if len(got[0].WASMLogs) != 1 || got[0].WASMLogs[0].Message != "hi" {
t.Errorf("expected nested WASM log on inv-1, got %+v", got[0].WASMLogs)
}
if got[1].ID != "inv-2" || got[1].ErrorMessage != "boom" {
t.Errorf("second invocation wrong: %+v", got[1])
}
if len(got[1].WASMLogs) != 0 {
t.Errorf("expected no WASM logs on inv-2, got %+v", got[1].WASMLogs)
}
}
func TestGetInvocations_log_query_failure_does_not_drop_records(t *testing.T) {
// Even if the WASM-logs query fails, the invocation summary is still
// useful — we degrade gracefully (log a warn) rather than failing the
// whole call.
db := &fakeDB{
onQuery: []func(dest interface{}) error{
// Invocation query: one row.
func(dest interface{}) error {
rows := dest.(*[]struct {
ID string `db:"id"`
RequestID string `db:"request_id"`
TriggerType string `db:"trigger_type"`
CallerWallet string `db:"caller_wallet"`
InputSize int `db:"input_size"`
OutputSize int `db:"output_size"`
StartedAt time.Time `db:"started_at"`
CompletedAt time.Time `db:"completed_at"`
DurationMS int64 `db:"duration_ms"`
Status string `db:"status"`
ErrorMessage string `db:"error_message"`
MemoryUsedMB float64 `db:"memory_used_mb"`
})
*rows = append(*rows, struct {
ID string `db:"id"`
RequestID string `db:"request_id"`
TriggerType string `db:"trigger_type"`
CallerWallet string `db:"caller_wallet"`
InputSize int `db:"input_size"`
OutputSize int `db:"output_size"`
StartedAt time.Time `db:"started_at"`
CompletedAt time.Time `db:"completed_at"`
DurationMS int64 `db:"duration_ms"`
Status string `db:"status"`
ErrorMessage string `db:"error_message"`
MemoryUsedMB float64 `db:"memory_used_mb"`
}{ID: "inv-1", Status: "success"})
return nil
},
// Nested-log query: simulate a transient DB error.
func(_ interface{}) error {
return errFake
},
},
}
il := NewInvocationLogger(db, zap.NewNop())
got, err := il.GetInvocations(context.Background(), "ns", "fn", 50)
if err != nil {
t.Fatalf("expected graceful degradation, got error: %v", err)
}
if len(got) != 1 {
t.Fatalf("expected 1 invocation, got %d", len(got))
}
if len(got[0].WASMLogs) != 0 {
t.Errorf("expected empty WASMLogs on log-fetch failure, got %+v", got[0].WASMLogs)
}
}
// errFake is a sentinel for log-query failure tests.
var errFake = &fakeError{}
type fakeError struct{}
func (e *fakeError) Error() string { return "fake db error" }

View File

@ -105,11 +105,20 @@ func (r *Registry) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, er
return r.ipfsStore.Get(ctx, wasmCID)
}
// GetLogs retrieves logs for a function.
// GetLogs retrieves WASM-emitted log entries for a function. Most functions
// don't call log_info / log_error, so this is often empty — see
// GetInvocations for the always-populated invocation-history view.
func (r *Registry) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) {
return r.invocationLogger.GetLogs(ctx, namespace, name, limit)
}
// GetInvocations returns the invocation-history view for a function:
// timestamp / status / duration / error_message per invocation, with any
// associated WASM log entries nested per record.
func (r *Registry) GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error) {
return r.invocationLogger.GetInvocations(ctx, namespace, name, limit)
}
// GetEnvVars retrieves environment variables for a function.
func (r *Registry) GetEnvVars(ctx context.Context, functionID string) (map[string]string, error) {
return r.functionStore.GetEnvVars(ctx, functionID)

View File

@ -66,13 +66,39 @@ type Function struct {
WSMaxInflightPerConn int
}
// LogEntry represents a log message from a function.
// LogEntry represents a log message emitted from inside a WASM function
// via the log_info / log_error host calls.
type LogEntry struct {
Level string
Message string
Timestamp time.Time
}
// Invocation is a record of one function invocation, returned by
// GetInvocations. It always populates regardless of whether the function
// emitted any log_info/log_error calls — the WASM-emitted entries are
// nested under WASMLogs (which may be empty).
//
// This is the right answer to "what happened on this invocation" — the
// CLI's `function logs` and dashboard log views consume this. The
// older GetLogs(LogEntry) returns ONLY WASM-emitted entries, which is
// usually empty and confused users (bug #211).
type Invocation struct {
ID string `json:"id"`
RequestID string `json:"request_id"`
TriggerType string `json:"trigger_type"`
CallerWallet string `json:"caller_wallet,omitempty"`
InputSize int `json:"input_size"`
OutputSize int `json:"output_size"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at"`
DurationMS int64 `json:"duration_ms"`
Status string `json:"status"`
ErrorMessage string `json:"error_message,omitempty"`
MemoryUsedMB float64 `json:"memory_used_mb,omitempty"`
WASMLogs []LogEntry `json:"wasm_logs,omitempty"`
}
// FunctionRegistry interface
type FunctionRegistry interface {
Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error)
@ -80,7 +106,16 @@ type FunctionRegistry interface {
List(ctx context.Context, namespace string) ([]*Function, error)
Delete(ctx context.Context, namespace, name string, version int) error
GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error)
// GetLogs returns ONLY WASM-emitted log entries (rows in function_logs).
// This is rarely useful on its own — most functions don't emit any.
// Prefer GetInvocations for the complete invocation-history view.
GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error)
// GetInvocations returns invocation history (always populated when the
// function has been invoked at least once) with any associated WASM
// log entries nested per record. Sorted by started_at DESC.
GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error)
}
// Error types

View File

@ -84,8 +84,34 @@ type FunctionRegistry interface {
// GetWASMBytes retrieves the compiled WASM bytecode for a function.
GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error)
// GetLogs retrieves logs for a function.
// GetLogs returns WASM-emitted log entries (function_logs rows). Often
// empty because most functions don't call log_info / log_error. Use
// GetInvocations for the always-populated invocation-history view.
GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error)
// GetInvocations returns invocation history for a function in reverse
// chronological order, with any associated WASM log entries nested
// per record. Always populated when the function has been invoked.
GetInvocations(ctx context.Context, namespace, name string, limit int) ([]Invocation, error)
}
// Invocation is the record of one function invocation as seen by
// `orama function logs`. Mirrors registry.Invocation; defined here at the
// public package boundary so callers don't need to import the inner package.
type Invocation struct {
ID string `json:"id"`
RequestID string `json:"request_id"`
TriggerType string `json:"trigger_type"`
CallerWallet string `json:"caller_wallet,omitempty"`
InputSize int `json:"input_size"`
OutputSize int `json:"output_size"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at"`
DurationMS int64 `json:"duration_ms"`
Status string `json:"status"`
ErrorMessage string `json:"error_message,omitempty"`
MemoryUsedMB float64 `json:"memory_used_mb,omitempty"`
WASMLogs []LogEntry `json:"wasm_logs,omitempty"`
}
// FunctionExecutor handles the actual execution of WASM functions.
@ -256,6 +282,14 @@ type InvocationContext struct {
// the standard sub/namespace fields). Read via host fn `get_caller_claim`.
// Populated by auth handlers from JWTClaims.Custom; empty for non-JWT auth.
CallerClaims map[string]string `json:"caller_claims,omitempty"`
// CallerJWTSubject is the `sub` claim of the Bearer JWT, if any.
// EXPLICITLY captured from the JWT independent of the API-key-vs-JWT
// wallet-resolution heuristic — so functions that must bind on the
// JWT-signed identity (signup flows) can do so reliably even when the
// caller also presents an API key. Empty string when the request was
// not JWT-authenticated. Bug #215.
CallerJWTSubject string `json:"caller_jwt_subject,omitempty"`
}
// InvocationResult represents the result of a function invocation.
@ -356,6 +390,27 @@ type HostServices interface {
DBQuery(ctx context.Context, query string, args []interface{}) ([]byte, error)
DBExecute(ctx context.Context, query string, args []interface{}) (int64, error)
// DBExecuteV2 is the typed equivalent of DBExecute that returns BOTH the
// rows-affected count AND a JSON envelope. The legacy DBExecute returns
// only uint32(rows) — collapsing real errors into "0 rows affected"
// (bug #218). New code should call DBExecuteV2 to detect real failures.
//
// Output JSON shape:
// {"rows_affected": 1, "last_insert_id": 42, "error": ""} // success
// {"rows_affected": 0, "last_insert_id": 0, "error": "..."} // failure
//
// Returns a Go error only on host-side validation failures (no DB,
// bad JSON args). SQL execution errors are encoded in the JSON.
DBExecuteV2(ctx context.Context, query string, args []interface{}) ([]byte, error)
// DBQueryV2 is the typed equivalent of DBQuery — returns a JSON envelope
// distinguishing "empty result" from "query failed".
//
// Output JSON shape:
// {"rows": [...], "error": ""} // success (rows may be [])
// {"rows": [], "error": "..."} // failure
DBQueryV2(ctx context.Context, query string, args []interface{}) ([]byte, error)
// Cache operations
CacheGet(ctx context.Context, key string) ([]byte, error)
CacheSet(ctx context.Context, key string, value []byte, ttlSeconds int64) error
@ -448,6 +503,12 @@ type HostServices interface {
// GetCallerClaim returns a custom JWT claim's value, or empty if missing
// or the request was not JWT-authenticated.
GetCallerClaim(ctx context.Context, name string) string
// GetCallerJWTSubject returns the JWT `sub` claim independent of the
// API-key-vs-JWT wallet-resolution heuristic. Empty when the request
// was not JWT-authenticated. Use this when a function must bind on
// the JWT-signed identity (e.g. signup-time wallet ownership checks)
// and the caller may ALSO present an API key. Bug #215.
GetCallerJWTSubject(ctx context.Context) string
// Job operations
EnqueueBackground(ctx context.Context, functionName string, payload []byte) (string, error)