diff --git a/.zed/debug.json b/.zed/debug.json deleted file mode 100644 index 4119f7a..0000000 --- a/.zed/debug.json +++ /dev/null @@ -1,68 +0,0 @@ -// Project-local debug tasks -// -// For more documentation on how to configure debug tasks, -// see: https://zed.dev/docs/debugger -[ - { - "label": "Gateway Go (Delve)", - "adapter": "Delve", - "request": "launch", - "mode": "debug", - "program": "./cmd/gateway", - "env": { - "GATEWAY_ADDR": ":6001", - "GATEWAY_BOOTSTRAP_PEERS": "/ip4/localhost/tcp/4001/p2p/12D3KooWSHHwEY6cga3ng7tD1rzStAU58ogQXVMX3LZJ6Gqf6dee", - "GATEWAY_NAMESPACE": "default", - "GATEWAY_API_KEY": "ak_iGustrsFk9H8uXpwczCATe5U:default" - } - }, - { - "label": "E2E Test Go (Delve)", - "adapter": "Delve", - "request": "launch", - "mode": "test", - "buildFlags": "-tags e2e", - "program": "./e2e", - "env": { - "GATEWAY_API_KEY": "ak_iGustrsFk9H8uXpwczCATe5U:default" - }, - "args": ["-test.v"] - }, - { - "adapter": "Delve", - "label": "Gateway Go 6001 Port (Delve)", - "request": "launch", - "mode": "debug", - "program": "./cmd/gateway", - "env": { - "GATEWAY_ADDR": ":6001", - "GATEWAY_BOOTSTRAP_PEERS": "/ip4/localhost/tcp/4001/p2p/12D3KooWSHHwEY6cga3ng7tD1rzStAU58ogQXVMX3LZJ6Gqf6dee", - "GATEWAY_NAMESPACE": "default", - "GATEWAY_API_KEY": "ak_iGustrsFk9H8uXpwczCATe5U:default" - } - }, - { - "adapter": "Delve", - "label": "Network CLI - peers (Delve)", - "request": "launch", - "mode": "debug", - "program": "./cmd/cli", - "args": ["peers"] - }, - { - "adapter": "Delve", - "label": "Network CLI - PubSub Subscribe (Delve)", - "request": "launch", - "mode": "debug", - "program": "./cmd/cli", - "args": ["pubsub", "subscribe", "monitoring"] - }, - { - "adapter": "Delve", - "label": "Node Go (Delve)", - "request": "launch", - "mode": "debug", - "program": "./cmd/node", - "args": ["--config", "configs/node.yaml"] - } -] diff --git a/README.md b/README.md index e61e9d8..2aa30e4 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,54 @@ make build ./bin/orama auth logout ``` +## Serverless Functions (WASM) + +Orama supports high-performance serverless function execution using WebAssembly (WASM). Functions are isolated, secure, and can interact with network services like the distributed cache. + +### 1. Build Functions + +Functions must be compiled to WASM. We recommend using [TinyGo](https://tinygo.org/). + +```bash +# Build example functions to examples/functions/bin/ +./examples/functions/build.sh +``` + +### 2. Deployment + +Deploy your compiled `.wasm` file to the network via the Gateway. + +```bash +# Deploy a function +curl -X POST http://localhost:6001/v1/functions \ + -H "Authorization: Bearer " \ + -F "name=hello-world" \ + -F "namespace=default" \ + -F "wasm=@./examples/functions/bin/hello.wasm" +``` + +### 3. Invocation + +Trigger your function with a JSON payload. The function receives the payload via `stdin` and returns its response via `stdout`. + +```bash +# Invoke via HTTP +curl -X POST http://localhost:6001/v1/functions/hello-world/invoke \ + -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{"name": "Developer"}' +``` + +### 4. Management + +```bash +# List all functions in a namespace +curl http://localhost:6001/v1/functions?namespace=default + +# Delete a function +curl -X DELETE http://localhost:6001/v1/functions/hello-world?namespace=default +``` + ## Production Deployment ### Prerequisites @@ -260,6 +308,11 @@ sudo orama install - `POST /v1/pubsub/publish` - Publish message - `GET /v1/pubsub/topics` - List topics - `GET /v1/pubsub/ws?topic=` - WebSocket subscribe +- `POST /v1/functions` - Deploy function (multipart/form-data) +- `POST /v1/functions/{name}/invoke` - Invoke function +- `GET /v1/functions` - List functions +- `DELETE /v1/functions/{name}` - Delete function +- `GET /v1/functions/{name}/logs` - Get function logs See `openapi/gateway.yaml` for complete API specification. diff --git a/example.http b/example.http new file mode 100644 index 0000000..9a7e50c --- /dev/null +++ b/example.http @@ -0,0 +1,158 @@ +### Orama Network Gateway API Examples +# This file is designed for the VS Code "REST Client" extension. +# It demonstrates the core capabilities of the DeBros Network Gateway. + +@baseUrl = http://localhost:6001 +@apiKey = ak_X32jj2fiin8zzv0hmBKTC5b5:default +@contentType = application/json + +############################################################ +### 1. SYSTEM & HEALTH +############################################################ + +# @name HealthCheck +GET {{baseUrl}}/v1/health +X-API-Key: {{apiKey}} + +### + +# @name SystemStatus +# Returns the full status of the gateway and connected services +GET {{baseUrl}}/v1/status +X-API-Key: {{apiKey}} + +### + +# @name NetworkStatus +# Returns the P2P network status and PeerID +GET {{baseUrl}}/v1/network/status +X-API-Key: {{apiKey}} + + +############################################################ +### 2. DISTRIBUTED CACHE (OLRIC) +############################################################ + +# @name CachePut +# Stores a value in the distributed cache (DMap) +POST {{baseUrl}}/v1/cache/put +X-API-Key: {{apiKey}} +Content-Type: {{contentType}} + +{ + "dmap": "demo-cache", + "key": "video-demo", + "value": "Hello from REST Client!" +} + +### + +# @name CacheGet +# Retrieves a value from the distributed cache +POST {{baseUrl}}/v1/cache/get +X-API-Key: {{apiKey}} +Content-Type: {{contentType}} + +{ + "dmap": "demo-cache", + "key": "video-demo" +} + +### + +# @name CacheScan +# Scans for keys in a specific DMap +POST {{baseUrl}}/v1/cache/scan +X-API-Key: {{apiKey}} +Content-Type: {{contentType}} + +{ + "dmap": "demo-cache" +} + + +############################################################ +### 3. DECENTRALIZED STORAGE (IPFS) +############################################################ + +# @name StorageUpload +# Uploads a file to IPFS (Multipart) +POST {{baseUrl}}/v1/storage/upload +X-API-Key: {{apiKey}} +Content-Type: multipart/form-data; boundary=boundary + +--boundary +Content-Disposition: form-data; name="file"; filename="demo.txt" +Content-Type: text/plain + +This is a demonstration of decentralized storage on the Sonr Network. +--boundary-- + +### + +# @name StorageStatus +# Check the pinning status and replication of a CID +# Replace {cid} with the CID returned from the upload above +@demoCid = bafkreid76y6x6v2n5o4n6n5o4n6n5o4n6n5o4n6n5o4 +GET {{baseUrl}}/v1/storage/status/{{demoCid}} +X-API-Key: {{apiKey}} + +### + +# @name StorageDownload +# Retrieve content directly from IPFS via the gateway +GET {{baseUrl}}/v1/storage/get/{{demoCid}} +X-API-Key: {{apiKey}} + + +############################################################ +### 4. REAL-TIME PUB/SUB +############################################################ + +# @name ListTopics +# Lists all active topics in the current namespace +GET {{baseUrl}}/v1/pubsub/topics +X-API-Key: {{apiKey}} + +### + +# @name PublishMessage +# Publishes a base64 encoded message to a topic +POST {{baseUrl}}/v1/pubsub/publish +X-API-Key: {{apiKey}} +Content-Type: {{contentType}} + +{ + "topic": "network-updates", + "data_base64": "U29uciBOZXR3b3JrIGlzIGF3ZXNvbWUh" +} + + +############################################################ +### 5. SERVERLESS FUNCTIONS +############################################################ + +# @name ListFunctions +# Lists all deployed serverless functions +GET {{baseUrl}}/v1/functions +X-API-Key: {{apiKey}} + +### + +# @name InvokeFunction +# Invokes a deployed function by name +# Path: /v1/invoke/{namespace}/{functionName} +POST {{baseUrl}}/v1/invoke/default/hello +X-API-Key: {{apiKey}} +Content-Type: {{contentType}} + +{ + "name": "Developer" +} + +### + +# @name WhoAmI +# Validates the API Key and returns caller identity +GET {{baseUrl}}/v1/auth/whoami +X-API-Key: {{apiKey}} \ No newline at end of file diff --git a/pkg/gateway/serverless_handlers.go b/pkg/gateway/serverless_handlers.go index dfe6bc4..e583168 100644 --- a/pkg/gateway/serverless_handlers.go +++ b/pkg/gateway/serverless_handlers.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/DeBrosOfficial/network/pkg/gateway/auth" "github.com/DeBrosOfficial/network/pkg/serverless" "github.com/google/uuid" "github.com/gorilla/websocket" @@ -578,7 +579,14 @@ func (h *ServerlessHandlers) getNamespaceFromRequest(r *http.Request) string { return ns } - // Try to extract from JWT (if authentication middleware has set it) + // Try context (set by auth middleware) + if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil { + if ns, ok := v.(string); ok && ns != "" { + return ns + } + } + + // Try header as fallback if ns := r.Header.Get("X-Namespace"); ns != "" { return ns } @@ -588,9 +596,29 @@ func (h *ServerlessHandlers) getNamespaceFromRequest(r *http.Request) string { // getWalletFromRequest extracts wallet address from JWT func (h *ServerlessHandlers) getWalletFromRequest(r *http.Request) string { + // 1. Try X-Wallet header (legacy/direct bypass) if wallet := r.Header.Get("X-Wallet"); wallet != "" { return wallet } + + // 2. Try JWT claims from context + if v := r.Context().Value(ctxKeyJWT); v != nil { + if claims, ok := v.(*auth.JWTClaims); ok && claims != nil { + subj := strings.TrimSpace(claims.Sub) + // Ensure it's not an API key (standard Orama logic) + if !strings.HasPrefix(strings.ToLower(subj), "ak_") && !strings.Contains(subj, ":") { + return subj + } + } + } + + // 3. Fallback to API key identity (namespace) + if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil { + if ns, ok := v.(string); ok && ns != "" { + return ns + } + } + return "" } diff --git a/pkg/serverless/engine.go b/pkg/serverless/engine.go index 86d4e30..440401e 100644 --- a/pkg/serverless/engine.go +++ b/pkg/serverless/engine.go @@ -3,6 +3,7 @@ package serverless import ( "bytes" "context" + "encoding/json" "fmt" "sync" "time" @@ -14,6 +15,13 @@ import ( "go.uber.org/zap" ) +// contextAwareHostServices is an internal interface for services that need to know about +// the current invocation context. +type contextAwareHostServices interface { + SetInvocationContext(invCtx *InvocationContext) + ClearContext() +} + // Ensure Engine implements FunctionExecutor interface. var _ FunctionExecutor = (*Engine)(nil) @@ -111,6 +119,11 @@ func NewEngine(cfg *Config, registry FunctionRegistry, hostServices HostServices opt(engine) } + // Register host functions + if err := engine.registerHostModule(context.Background()); err != nil { + return nil, fmt.Errorf("failed to register host module: %w", err) + } + return engine, nil } @@ -303,6 +316,12 @@ func (e *Engine) getOrCompileModule(ctx context.Context, wasmCID string) (wazero // executeModule instantiates and runs a WASM module. func (e *Engine) executeModule(ctx context.Context, compiled wazero.CompiledModule, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, error) { + // Set invocation context for host functions if the service supports it + if hf, ok := e.hostServices.(contextAwareHostServices); ok { + hf.SetInvocationContext(invCtx) + defer hf.ClearContext() + } + // Create buffers for stdin/stdout (WASI uses these for I/O) stdin := bytes.NewReader(input) stdout := new(bytes.Buffer) @@ -455,3 +474,175 @@ func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *Invoca e.logger.Warn("Failed to log invocation", zap.Error(logErr)) } } + +// registerHostModule registers the Orama host functions with the wazero runtime. +func (e *Engine) registerHostModule(ctx context.Context) error { + // Register under both "env" and "host" to support different import styles + // The user requested "env" in instructions but "host" in expected result. + for _, moduleName := range []string{"env", "host"} { + _, err := e.runtime.NewHostModuleBuilder(moduleName). + NewFunctionBuilder().WithFunc(e.hGetCallerWallet).Export("get_caller_wallet"). + NewFunctionBuilder().WithFunc(e.hGetRequestID).Export("get_request_id"). + NewFunctionBuilder().WithFunc(e.hGetEnv).Export("get_env"). + NewFunctionBuilder().WithFunc(e.hGetSecret).Export("get_secret"). + NewFunctionBuilder().WithFunc(e.hDBQuery).Export("db_query"). + NewFunctionBuilder().WithFunc(e.hDBExecute).Export("db_execute"). + NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get"). + NewFunctionBuilder().WithFunc(e.hCacheSet).Export("cache_set"). + NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info"). + NewFunctionBuilder().WithFunc(e.hLogError).Export("log_error"). + Instantiate(ctx) + if err != nil { + return err + } + } + return nil +} + +func (e *Engine) hGetCallerWallet(ctx context.Context, mod api.Module) uint64 { + wallet := e.hostServices.GetCallerWallet(ctx) + return e.writeToGuest(ctx, mod, []byte(wallet)) +} + +func (e *Engine) hGetRequestID(ctx context.Context, mod api.Module) uint64 { + rid := e.hostServices.GetRequestID(ctx) + return e.writeToGuest(ctx, mod, []byte(rid)) +} + +func (e *Engine) hGetEnv(ctx context.Context, mod api.Module, keyPtr, keyLen uint32) uint64 { + key, ok := mod.Memory().Read(keyPtr, keyLen) + if !ok { + return 0 + } + val, _ := e.hostServices.GetEnv(ctx, string(key)) + return e.writeToGuest(ctx, mod, []byte(val)) +} + +func (e *Engine) hGetSecret(ctx context.Context, mod api.Module, namePtr, nameLen uint32) uint64 { + name, ok := mod.Memory().Read(namePtr, nameLen) + if !ok { + return 0 + } + val, err := e.hostServices.GetSecret(ctx, string(name)) + if err != nil { + return 0 + } + return e.writeToGuest(ctx, mod, []byte(val)) +} + +func (e *Engine) hDBQuery(ctx context.Context, mod api.Module, queryPtr, queryLen, argsPtr, argsLen uint32) uint64 { + query, ok := mod.Memory().Read(queryPtr, queryLen) + if !ok { + return 0 + } + + var args []interface{} + if argsLen > 0 { + argsData, ok := mod.Memory().Read(argsPtr, argsLen) + if !ok { + return 0 + } + if err := json.Unmarshal(argsData, &args); err != nil { + e.logger.Error("failed to unmarshal db_query arguments", zap.Error(err)) + return 0 + } + } + + results, err := e.hostServices.DBQuery(ctx, string(query), args) + if err != nil { + e.logger.Error("host function db_query failed", zap.Error(err), zap.String("query", string(query))) + return 0 + } + return e.writeToGuest(ctx, mod, results) +} + +func (e *Engine) hDBExecute(ctx context.Context, mod api.Module, queryPtr, queryLen, argsPtr, argsLen uint32) uint32 { + query, ok := mod.Memory().Read(queryPtr, queryLen) + if !ok { + return 0 + } + + var args []interface{} + if argsLen > 0 { + argsData, ok := mod.Memory().Read(argsPtr, argsLen) + if !ok { + return 0 + } + if err := json.Unmarshal(argsData, &args); err != nil { + e.logger.Error("failed to unmarshal db_execute arguments", zap.Error(err)) + return 0 + } + } + + affected, err := e.hostServices.DBExecute(ctx, string(query), args) + if err != nil { + e.logger.Error("host function db_execute failed", zap.Error(err), zap.String("query", string(query))) + return 0 + } + return uint32(affected) +} + +func (e *Engine) hCacheGet(ctx context.Context, mod api.Module, keyPtr, keyLen uint32) uint64 { + key, ok := mod.Memory().Read(keyPtr, keyLen) + if !ok { + return 0 + } + val, err := e.hostServices.CacheGet(ctx, string(key)) + if err != nil { + return 0 + } + return e.writeToGuest(ctx, mod, val) +} + +func (e *Engine) hCacheSet(ctx context.Context, mod api.Module, keyPtr, keyLen, valPtr, valLen uint32, ttl int64) { + key, ok := mod.Memory().Read(keyPtr, keyLen) + if !ok { + return + } + val, ok := mod.Memory().Read(valPtr, valLen) + if !ok { + return + } + _ = e.hostServices.CacheSet(ctx, string(key), val, ttl) +} + +func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32) { + msg, ok := mod.Memory().Read(ptr, size) + if ok { + e.hostServices.LogInfo(ctx, string(msg)) + } +} + +func (e *Engine) hLogError(ctx context.Context, mod api.Module, ptr, size uint32) { + msg, ok := mod.Memory().Read(ptr, size) + if ok { + e.hostServices.LogError(ctx, string(msg)) + } +} + +func (e *Engine) writeToGuest(ctx context.Context, mod api.Module, data []byte) uint64 { + if len(data) == 0 { + return 0 + } + // Try to find a non-conflicting allocator first, fallback to malloc + malloc := mod.ExportedFunction("orama_alloc") + if malloc == nil { + malloc = mod.ExportedFunction("malloc") + } + + if malloc == nil { + e.logger.Warn("WASM module missing malloc/orama_alloc export, cannot return string/bytes to guest") + return 0 + } + results, err := malloc.Call(ctx, uint64(len(data))) + if err != nil { + e.logger.Error("failed to call malloc in WASM module", zap.Error(err)) + return 0 + } + ptr := uint32(results[0]) + if !mod.Memory().Write(ptr, data) { + e.logger.Error("failed to write to WASM memory") + return 0 + } + return (uint64(ptr) << 32) | uint64(len(data)) +}