diff --git a/migrations/004_serverless_functions.sql b/migrations/004_serverless_functions.sql index 5c3cb0f..194e565 100644 --- a/migrations/004_serverless_functions.sql +++ b/migrations/004_serverless_functions.sql @@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS functions ( created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, created_by TEXT NOT NULL, - UNIQUE(namespace, name, version) + UNIQUE(namespace, name) ); CREATE INDEX IF NOT EXISTS idx_functions_namespace ON functions(namespace); diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index e644f85..297d2fd 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -17,12 +17,12 @@ import ( "github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/config" + "github.com/DeBrosOfficial/network/pkg/gateway/auth" "github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/olric" "github.com/DeBrosOfficial/network/pkg/rqlite" "github.com/DeBrosOfficial/network/pkg/serverless" - "github.com/DeBrosOfficial/network/pkg/gateway/auth" "github.com/multiformats/go-multiaddr" olriclib "github.com/olric-data/olric" "go.uber.org/zap" @@ -65,11 +65,11 @@ type Config struct { } type Gateway struct { - logger *logging.ColoredLogger - cfg *Config - client client.NetworkClient - nodePeerID string // The node's actual peer ID from its identity file (overrides client's peer ID) - startedAt time.Time + logger *logging.ColoredLogger + cfg *Config + client client.NetworkClient + nodePeerID string // The node's actual peer ID from its identity file (overrides client's peer ID) + startedAt time.Time // rqlite SQL connection and HTTP ORM gateway sqlDB *sql.DB @@ -345,7 +345,7 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { engineCfg.ModuleCacheSize = 100 // Create WASM engine - engine, engineErr := serverless.NewEngine(engineCfg, registry, hostFuncs, logger.Logger) + engine, engineErr := serverless.NewEngine(engineCfg, registry, hostFuncs, logger.Logger, serverless.WithInvocationLogger(registry)) if engineErr != nil { logger.ComponentWarn(logging.ComponentGeneral, "failed to initialize serverless engine; functions disabled", zap.Error(engineErr)) } else { @@ -355,28 +355,28 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { gw.serverlessInvoker = serverless.NewInvoker(engine, registry, hostFuncs, logger.Logger) // Create HTTP handlers - gw.serverlessHandlers = NewServerlessHandlers( - gw.serverlessInvoker, - registry, - gw.serverlessWSMgr, - logger.Logger, - ) + gw.serverlessHandlers = NewServerlessHandlers( + gw.serverlessInvoker, + registry, + gw.serverlessWSMgr, + logger.Logger, + ) - // Initialize auth service - // For now using ephemeral key, can be loaded from config later - key, _ := rsa.GenerateKey(rand.Reader, 2048) - keyPEM := pem.EncodeToMemory(&pem.Block{ - Type: "RSA PRIVATE KEY", - Bytes: x509.MarshalPKCS1PrivateKey(key), - }) - authService, err := auth.NewService(logger, c, string(keyPEM), cfg.ClientNamespace) - if err != nil { - logger.ComponentError(logging.ComponentGeneral, "failed to initialize auth service", zap.Error(err)) - } else { - gw.authService = authService - } + // Initialize auth service + // For now using ephemeral key, can be loaded from config later + key, _ := rsa.GenerateKey(rand.Reader, 2048) + keyPEM := pem.EncodeToMemory(&pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: x509.MarshalPKCS1PrivateKey(key), + }) + authService, err := auth.NewService(logger, c, string(keyPEM), cfg.ClientNamespace) + if err != nil { + logger.ComponentError(logging.ComponentGeneral, "failed to initialize auth service", zap.Error(err)) + } else { + gw.authService = authService + } - logger.ComponentInfo(logging.ComponentGeneral, "Serverless function engine ready", + logger.ComponentInfo(logging.ComponentGeneral, "Serverless function engine ready", zap.Int("default_memory_mb", engineCfg.DefaultMemoryLimitMB), zap.Int("default_timeout_sec", engineCfg.DefaultTimeoutSeconds), zap.Int("module_cache_size", engineCfg.ModuleCacheSize), diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index 1cd3075..2461716 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -63,11 +63,8 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler { next.ServeHTTP(w, r) return } - // Allow public endpoints without auth - if isPublicPath(r.URL.Path) { - next.ServeHTTP(w, r) - return - } + + isPublic := isPublicPath(r.URL.Path) // 1) Try JWT Bearer first if Authorization looks like one if auth := r.Header.Get("Authorization"); auth != "" { @@ -92,6 +89,10 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler { // 2) Fallback to API key (validate against DB) key := extractAPIKey(r) if key == "" { + if isPublic { + next.ServeHTTP(w, r) + return + } w.Header().Set("WWW-Authenticate", "Bearer realm=\"gateway\", charset=\"UTF-8\"") writeError(w, http.StatusUnauthorized, "missing API key") return @@ -105,6 +106,10 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler { q := "SELECT namespaces.name FROM api_keys JOIN namespaces ON api_keys.namespace_id = namespaces.id WHERE api_keys.key = ? LIMIT 1" res, err := db.Query(internalCtx, q, key) if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { + if isPublic { + next.ServeHTTP(w, r) + return + } w.Header().Set("WWW-Authenticate", "Bearer error=\"invalid_token\"") writeError(w, http.StatusUnauthorized, "invalid API key") return @@ -119,6 +124,10 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler { ns = strings.TrimSpace(ns) } if ns == "" { + if isPublic { + next.ServeHTTP(w, r) + return + } w.Header().Set("WWW-Authenticate", "Bearer error=\"invalid_token\"") writeError(w, http.StatusUnauthorized, "invalid API key") return @@ -184,6 +193,11 @@ func isPublicPath(p string) bool { return true } + // Serverless invocation is public (authorization is handled within the invoker) + if strings.HasPrefix(p, "/v1/invoke/") || (strings.HasPrefix(p, "/v1/functions/") && strings.HasSuffix(p, "/invoke")) { + return true + } + switch p { case "/health", "/v1/health", "/status", "/v1/status", "/v1/auth/jwks", "/.well-known/jwks.json", "/v1/version", "/v1/auth/login", "/v1/auth/challenge", "/v1/auth/verify", "/v1/auth/register", "/v1/auth/refresh", "/v1/auth/logout", "/v1/auth/api-key", "/v1/auth/simple-key", "/v1/network/status", "/v1/network/peers": return true @@ -325,6 +339,9 @@ func requiresNamespaceOwnership(p string) bool { if strings.HasPrefix(p, "/v1/proxy/") { return true } + if strings.HasPrefix(p, "/v1/functions") { + return true + } return false } diff --git a/pkg/gateway/serverless_handlers.go b/pkg/gateway/serverless_handlers.go index e583168..1f37244 100644 --- a/pkg/gateway/serverless_handlers.go +++ b/pkg/gateway/serverless_handlers.go @@ -214,6 +214,23 @@ func (h *ServerlessHandlers) deployFunction(w http.ResponseWriter, r *http.Reque def.Namespace = r.FormValue("namespace") } + // Get other configuration fields from form + if v := r.FormValue("is_public"); v != "" { + def.IsPublic, _ = strconv.ParseBool(v) + } + if v := r.FormValue("memory_limit_mb"); v != "" { + def.MemoryLimitMB, _ = strconv.Atoi(v) + } + if v := r.FormValue("timeout_seconds"); v != "" { + def.TimeoutSeconds, _ = strconv.Atoi(v) + } + if v := r.FormValue("retry_count"); v != "" { + def.RetryCount, _ = strconv.Atoi(v) + } + if v := r.FormValue("retry_delay_seconds"); v != "" { + def.RetryDelaySeconds, _ = strconv.Atoi(v) + } + // Get WASM file file, _, err := r.FormFile("wasm") if err != nil { @@ -269,7 +286,8 @@ func (h *ServerlessHandlers) deployFunction(w http.ResponseWriter, r *http.Reque ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second) defer cancel() - if err := h.registry.Register(ctx, &def, wasmBytes); err != nil { + oldFn, err := h.registry.Register(ctx, &def, wasmBytes) + if err != nil { h.logger.Error("Failed to deploy function", zap.String("name", def.Name), zap.Error(err), @@ -278,6 +296,15 @@ func (h *ServerlessHandlers) deployFunction(w http.ResponseWriter, r *http.Reque return } + // Invalidate cache for the old version to ensure the new one is loaded + if oldFn != nil { + h.invoker.InvalidateCache(oldFn.WASMCID) + h.logger.Debug("Invalidated function cache", + zap.String("name", def.Name), + zap.String("old_wasm_cid", oldFn.WASMCID), + ) + } + h.logger.Info("Function deployed", zap.String("name", def.Name), zap.String("namespace", def.Namespace), @@ -410,6 +437,8 @@ func (h *ServerlessHandlers) invokeFunction(w http.ResponseWriter, r *http.Reque statusCode = http.StatusNotFound } else if serverless.IsResourceExhausted(err) { statusCode = http.StatusTooManyRequests + } else if serverless.IsUnauthorized(err) { + statusCode = http.StatusUnauthorized } writeJSON(w, statusCode, map[string]interface{}{ @@ -565,27 +594,59 @@ func (h *ServerlessHandlers) listVersions(w http.ResponseWriter, r *http.Request // getFunctionLogs handles GET /v1/functions/{name}/logs func (h *ServerlessHandlers) getFunctionLogs(w http.ResponseWriter, r *http.Request, name string) { - // TODO: Implement log retrieval from function_logs table + namespace := r.URL.Query().Get("namespace") + if namespace == "" { + namespace = h.getNamespaceFromRequest(r) + } + + if namespace == "" { + writeError(w, http.StatusBadRequest, "namespace required") + return + } + + limit := 100 + if lStr := r.URL.Query().Get("limit"); lStr != "" { + if l, err := strconv.Atoi(lStr); err == nil { + limit = l + } + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + logs, err := h.registry.GetLogs(ctx, namespace, name, limit) + if err != nil { + h.logger.Error("Failed to get function logs", + zap.String("name", name), + zap.String("namespace", namespace), + zap.Error(err), + ) + writeError(w, http.StatusInternalServerError, "Failed to get logs") + return + } + writeJSON(w, http.StatusOK, map[string]interface{}{ - "logs": []interface{}{}, - "message": "Log retrieval not yet implemented", + "name": name, + "namespace": namespace, + "logs": logs, + "count": len(logs), }) } // getNamespaceFromRequest extracts namespace from JWT or query param func (h *ServerlessHandlers) getNamespaceFromRequest(r *http.Request) string { - // Try query param first - if ns := r.URL.Query().Get("namespace"); ns != "" { - return ns - } - - // Try context (set by auth middleware) + // Try context first (set by auth middleware) - most secure if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil { if ns, ok := v.(string); ok && ns != "" { return ns } } + // Try query param as fallback (e.g. for public access or admin) + if ns := r.URL.Query().Get("namespace"); ns != "" { + return ns + } + // Try header as fallback if ns := r.Header.Get("X-Namespace"); ns != "" { return ns diff --git a/pkg/serverless/engine.go b/pkg/serverless/engine.go index 440401e..4ff4249 100644 --- a/pkg/serverless/engine.go +++ b/pkg/serverless/engine.go @@ -65,6 +65,7 @@ type InvocationRecord struct { Status InvocationStatus `json:"status"` ErrorMessage string `json:"error_message,omitempty"` MemoryUsedMB float64 `json:"memory_used_mb"` + Logs []LogEntry `json:"logs,omitempty"` } // RateLimiter checks if a request should be rate limited. @@ -470,6 +471,11 @@ func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *Invoca record.ErrorMessage = err.Error() } + // Collect logs from host services if supported + if hf, ok := e.hostServices.(interface{ GetLogs() []LogEntry }); ok { + record.Logs = hf.GetLogs() + } + if logErr := e.invocationLogger.Log(ctx, record); logErr != nil { e.logger.Warn("Failed to log invocation", zap.Error(logErr)) } @@ -489,6 +495,7 @@ func (e *Engine) registerHostModule(ctx context.Context) error { NewFunctionBuilder().WithFunc(e.hDBExecute).Export("db_execute"). NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get"). NewFunctionBuilder().WithFunc(e.hCacheSet).Export("cache_set"). + NewFunctionBuilder().WithFunc(e.hHTTPFetch).Export("http_fetch"). NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info"). NewFunctionBuilder().WithFunc(e.hLogError).Export("log_error"). Instantiate(ctx) @@ -606,6 +613,39 @@ func (e *Engine) hCacheSet(ctx context.Context, mod api.Module, keyPtr, keyLen, _ = e.hostServices.CacheSet(ctx, string(key), val, ttl) } +func (e *Engine) hHTTPFetch(ctx context.Context, mod api.Module, methodPtr, methodLen, urlPtr, urlLen, headersPtr, headersLen, bodyPtr, bodyLen uint32) uint64 { + method, ok := mod.Memory().Read(methodPtr, methodLen) + if !ok { + return 0 + } + u, ok := mod.Memory().Read(urlPtr, urlLen) + if !ok { + return 0 + } + var headers map[string]string + if headersLen > 0 { + headersData, ok := mod.Memory().Read(headersPtr, headersLen) + if !ok { + return 0 + } + if err := json.Unmarshal(headersData, &headers); err != nil { + e.logger.Error("failed to unmarshal http_fetch headers", zap.Error(err)) + return 0 + } + } + body, ok := mod.Memory().Read(bodyPtr, bodyLen) + if !ok { + return 0 + } + + resp, err := e.hostServices.HTTPFetch(ctx, string(method), string(u), headers, body) + if err != nil { + e.logger.Error("host function http_fetch failed", zap.Error(err), zap.String("url", string(u))) + return 0 + } + return e.writeToGuest(ctx, mod, resp) +} + func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32) { msg, ok := mod.Memory().Read(ptr, size) if ok { diff --git a/pkg/serverless/engine_test.go b/pkg/serverless/engine_test.go index 7ce4195..ba79dcf 100644 --- a/pkg/serverless/engine_test.go +++ b/pkg/serverless/engine_test.go @@ -39,7 +39,7 @@ func TestEngine_Execute(t *testing.T) { TimeoutSeconds: 5, } - err = registry.Register(context.Background(), fnDef, wasmBytes) + _, err = registry.Register(context.Background(), fnDef, wasmBytes) if err != nil { t.Fatalf("failed to register function: %v", err) } @@ -121,7 +121,7 @@ func TestEngine_Timeout(t *testing.T) { fn, _ := registry.Get(context.Background(), "test", "timeout", 0) if fn == nil { - _ = registry.Register(context.Background(), &FunctionDefinition{Name: "timeout", Namespace: "test"}, wasmBytes) + _, _ = registry.Register(context.Background(), &FunctionDefinition{Name: "timeout", Namespace: "test"}, wasmBytes) fn, _ = registry.Get(context.Background(), "test", "timeout", 0) } fn.TimeoutSeconds = 1 @@ -151,7 +151,7 @@ func TestEngine_MemoryLimit(t *testing.T) { 0x0a, 0x04, 0x01, 0x02, 0x00, 0x0b, } - _ = registry.Register(context.Background(), &FunctionDefinition{Name: "memory", Namespace: "test", MemoryLimitMB: 1, TimeoutSeconds: 5}, wasmBytes) + _, _ = registry.Register(context.Background(), &FunctionDefinition{Name: "memory", Namespace: "test", MemoryLimitMB: 1, TimeoutSeconds: 5}, wasmBytes) fn, _ := registry.Get(context.Background(), "test", "memory", 0) // This should pass because the minimal WASM doesn't use much memory @@ -183,7 +183,7 @@ func TestEngine_RealWASM(t *testing.T) { Namespace: "examples", TimeoutSeconds: 10, } - _ = registry.Register(context.Background(), fnDef, wasmBytes) + _, _ = registry.Register(context.Background(), fnDef, wasmBytes) fn, _ := registry.Get(context.Background(), "examples", "hello", 0) output, err := engine.Execute(context.Background(), fn, []byte(`{"name": "Tester"}`), nil) diff --git a/pkg/serverless/errors.go b/pkg/serverless/errors.go index 38b07e1..135dd6a 100644 --- a/pkg/serverless/errors.go +++ b/pkg/serverless/errors.go @@ -163,10 +163,10 @@ func (e *ValidationError) Error() string { // RetryableError wraps an error that should be retried. type RetryableError struct { - Cause error - RetryAfter int // Suggested retry delay in seconds - MaxRetries int // Maximum number of retries remaining - CurrentTry int // Current attempt number + Cause error + RetryAfter int // Suggested retry delay in seconds + MaxRetries int // Maximum number of retries remaining + CurrentTry int // Current attempt number } func (e *RetryableError) Error() string { @@ -194,6 +194,11 @@ func IsNotFound(err error) bool { errors.Is(err, ErrWSClientNotFound) } +// IsUnauthorized checks if an error indicates a lack of authorization. +func IsUnauthorized(err error) bool { + return errors.Is(err, ErrUnauthorized) +} + // IsResourceExhausted checks if an error indicates resource exhaustion. func IsResourceExhausted(err error) bool { return errors.Is(err, ErrRateLimited) || @@ -209,4 +214,3 @@ func IsServiceUnavailable(err error) bool { errors.Is(err, ErrDatabaseUnavailable) || errors.Is(err, ErrCacheUnavailable) } - diff --git a/pkg/serverless/hostfuncs.go b/pkg/serverless/hostfuncs.go index 220ce62..ead5e35 100644 --- a/pkg/serverless/hostfuncs.go +++ b/pkg/serverless/hostfuncs.go @@ -15,9 +15,10 @@ import ( "time" "github.com/DeBrosOfficial/network/pkg/ipfs" - olriclib "github.com/olric-data/olric" "github.com/DeBrosOfficial/network/pkg/pubsub" "github.com/DeBrosOfficial/network/pkg/rqlite" + "github.com/DeBrosOfficial/network/pkg/tlsutil" + olriclib "github.com/olric-data/olric" "go.uber.org/zap" ) @@ -76,7 +77,7 @@ func NewHostFunctions( pubsub: pubsubAdapter, wsManager: wsManager, secrets: secrets, - httpClient: &http.Client{Timeout: httpTimeout}, + httpClient: tlsutil.NewHTTPClient(httpTimeout), logger: logger, logs: make([]LogEntry, 0), } @@ -328,7 +329,12 @@ func (h *HostFunctions) HTTPFetch(ctx context.Context, method, url string, heade req, err := http.NewRequestWithContext(ctx, method, url, bodyReader) if err != nil { - return nil, &HostFunctionError{Function: "http_fetch", Cause: fmt.Errorf("failed to create request: %w", err)} + h.logger.Error("http_fetch request creation error", zap.Error(err), zap.String("url", url)) + errorResp := map[string]interface{}{ + "error": "failed to create request: " + err.Error(), + "status": 0, + } + return json.Marshal(errorResp) } for key, value := range headers { @@ -337,13 +343,23 @@ func (h *HostFunctions) HTTPFetch(ctx context.Context, method, url string, heade resp, err := h.httpClient.Do(req) if err != nil { - return nil, &HostFunctionError{Function: "http_fetch", Cause: err} + h.logger.Error("http_fetch transport error", zap.Error(err), zap.String("url", url)) + errorResp := map[string]interface{}{ + "error": err.Error(), + "status": 0, // Transport error + } + return json.Marshal(errorResp) } defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) if err != nil { - return nil, &HostFunctionError{Function: "http_fetch", Cause: fmt.Errorf("failed to read response: %w", err)} + h.logger.Error("http_fetch response read error", zap.Error(err), zap.String("url", url)) + errorResp := map[string]interface{}{ + "error": "failed to read response: " + err.Error(), + "status": resp.StatusCode, + } + return json.Marshal(errorResp) } // Encode response with status code @@ -638,4 +654,3 @@ func (s *DBSecretsManager) decrypt(ciphertext []byte) ([]byte, error) { nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:] return gcm.Open(nil, nonce, ciphertext, nil) } - diff --git a/pkg/serverless/invoke.go b/pkg/serverless/invoke.go index f1accea..87ba126 100644 --- a/pkg/serverless/invoke.go +++ b/pkg/serverless/invoke.go @@ -76,6 +76,17 @@ func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeRespon }, err } + // Check authorization + authorized, err := i.CanInvoke(ctx, req.Namespace, req.FunctionName, req.CallerWallet) + if err != nil || !authorized { + return &InvokeResponse{ + RequestID: requestID, + Status: InvocationStatusError, + Error: "unauthorized", + DurationMS: time.Since(startTime).Milliseconds(), + }, ErrUnauthorized + } + // Get environment variables envVars, err := i.getEnvVars(ctx, fn.ID) if err != nil { @@ -159,6 +170,11 @@ func (i *Invoker) InvokeByID(ctx context.Context, functionID string, input []byt return response, nil } +// InvalidateCache removes a compiled module from the engine's cache. +func (i *Invoker) InvalidateCache(wasmCID string) { + i.engine.Invalidate(wasmCID) +} + // executeWithRetry executes a function with retry logic and DLQ. func (i *Invoker) executeWithRetry(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, int, error) { var lastErr error @@ -434,4 +450,3 @@ func (i *Invoker) ValidateInput(input []byte, maxSize int) error { } return nil } - diff --git a/pkg/serverless/mocks_test.go b/pkg/serverless/mocks_test.go index 8c7b411..80be551 100644 --- a/pkg/serverless/mocks_test.go +++ b/pkg/serverless/mocks_test.go @@ -28,22 +28,26 @@ func NewMockRegistry() *MockRegistry { } } -func (m *MockRegistry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) error { +func (m *MockRegistry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error) { m.mu.Lock() defer m.mu.Unlock() id := fn.Namespace + "/" + fn.Name wasmCID := "cid-" + id + oldFn := m.functions[id] m.functions[id] = &Function{ - ID: id, - Name: fn.Name, - Namespace: fn.Namespace, - WASMCID: wasmCID, - MemoryLimitMB: fn.MemoryLimitMB, - TimeoutSeconds: fn.TimeoutSeconds, - Status: FunctionStatusActive, + ID: id, + Name: fn.Name, + Namespace: fn.Namespace, + WASMCID: wasmCID, + MemoryLimitMB: fn.MemoryLimitMB, + TimeoutSeconds: fn.TimeoutSeconds, + IsPublic: fn.IsPublic, + RetryCount: fn.RetryCount, + RetryDelaySeconds: fn.RetryDelaySeconds, + Status: FunctionStatusActive, } m.wasm[wasmCID] = wasmBytes - return nil + return oldFn, nil } func (m *MockRegistry) Get(ctx context.Context, namespace, name string, version int) (*Function, error) { @@ -85,6 +89,10 @@ func (m *MockRegistry) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte return data, nil } +func (m *MockRegistry) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) { + return []LogEntry{}, nil +} + // MockHostServices is a mock implementation of HostServices type MockHostServices struct { mu sync.RWMutex diff --git a/pkg/serverless/registry.go b/pkg/serverless/registry.go index 821a810..0d2bf6f 100644 --- a/pkg/serverless/registry.go +++ b/pkg/serverless/registry.go @@ -6,6 +6,7 @@ import ( "database/sql" "fmt" "io" + "strings" "time" "github.com/DeBrosOfficial/network/pkg/ipfs" @@ -14,17 +15,18 @@ import ( "go.uber.org/zap" ) -// Ensure Registry implements FunctionRegistry interface. +// Ensure Registry implements FunctionRegistry and InvocationLogger interfaces. var _ FunctionRegistry = (*Registry)(nil) +var _ InvocationLogger = (*Registry)(nil) // Registry manages function metadata in RQLite and bytecode in IPFS. // It implements the FunctionRegistry interface. type Registry struct { - db rqlite.Client - ipfs ipfs.IPFSClient - ipfsAPIURL string - logger *zap.Logger - tableName string + db rqlite.Client + ipfs ipfs.IPFSClient + ipfsAPIURL string + logger *zap.Logger + tableName string } // RegistryConfig holds configuration for the Registry. @@ -43,35 +45,34 @@ func NewRegistry(db rqlite.Client, ipfsClient ipfs.IPFSClient, cfg RegistryConfi } } -// Register deploys a new function or creates a new version. -func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) error { +// Register deploys a new function or updates an existing one. +func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error) { if fn == nil { - return &ValidationError{Field: "definition", Message: "cannot be nil"} + return nil, &ValidationError{Field: "definition", Message: "cannot be nil"} } + fn.Name = strings.TrimSpace(fn.Name) + fn.Namespace = strings.TrimSpace(fn.Namespace) + if fn.Name == "" { - return &ValidationError{Field: "name", Message: "cannot be empty"} + return nil, &ValidationError{Field: "name", Message: "cannot be empty"} } if fn.Namespace == "" { - return &ValidationError{Field: "namespace", Message: "cannot be empty"} + return nil, &ValidationError{Field: "namespace", Message: "cannot be empty"} } if len(wasmBytes) == 0 { - return &ValidationError{Field: "wasmBytes", Message: "cannot be empty"} + return nil, &ValidationError{Field: "wasmBytes", Message: "cannot be empty"} + } + + // Check if function already exists (regardless of status) to get old metadata for invalidation + oldFn, err := r.getByNameInternal(ctx, fn.Namespace, fn.Name) + if err != nil && err != ErrFunctionNotFound { + return nil, &DeployError{FunctionName: fn.Name, Cause: err} } // Upload WASM to IPFS wasmCID, err := r.uploadWASM(ctx, wasmBytes, fn.Name) if err != nil { - return &DeployError{FunctionName: fn.Name, Cause: err} - } - - // Determine version (auto-increment if not specified) - version := fn.Version - if version == 0 { - latestVersion, err := r.getLatestVersion(ctx, fn.Namespace, fn.Name) - if err != nil && err != ErrFunctionNotFound { - return &DeployError{FunctionName: fn.Name, Cause: err} - } - version = latestVersion + 1 + return nil, &DeployError{FunctionName: fn.Name, Cause: err} } // Apply defaults @@ -88,48 +89,59 @@ func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmByt retryDelay = 5 } - // Generate ID + now := time.Now() id := uuid.New().String() + version := 1 - // Insert function record + if oldFn != nil { + // Use existing ID and increment version + id = oldFn.ID + version = oldFn.Version + 1 + } + + // Use INSERT OR REPLACE to ensure we never hit UNIQUE constraint failures on (namespace, name). + // This handles both new registrations and overwriting existing (even inactive) functions. query := ` - INSERT INTO functions ( + INSERT OR REPLACE INTO functions ( id, name, namespace, version, wasm_cid, memory_limit_mb, timeout_seconds, is_public, retry_count, retry_delay_seconds, dlq_topic, status, created_at, updated_at, created_by ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` - now := time.Now() _, err = r.db.Exec(ctx, query, id, fn.Name, fn.Namespace, version, wasmCID, memoryLimit, timeout, fn.IsPublic, fn.RetryCount, retryDelay, fn.DLQTopic, - string(FunctionStatusActive), now, now, fn.Namespace, // created_by = namespace for now + string(FunctionStatusActive), now, now, fn.Namespace, ) if err != nil { - return &DeployError{FunctionName: fn.Name, Cause: fmt.Errorf("failed to insert function: %w", err)} + return nil, &DeployError{FunctionName: fn.Name, Cause: fmt.Errorf("failed to register function: %w", err)} } - // Insert environment variables + // Save environment variables if err := r.saveEnvVars(ctx, id, fn.EnvVars); err != nil { - return &DeployError{FunctionName: fn.Name, Cause: err} + return nil, &DeployError{FunctionName: fn.Name, Cause: err} } r.logger.Info("Function registered", zap.String("id", id), zap.String("name", fn.Name), zap.String("namespace", fn.Namespace), - zap.Int("version", version), zap.String("wasm_cid", wasmCID), + zap.Int("version", version), + zap.Bool("updated", oldFn != nil), ) - return nil + return oldFn, nil } // Get retrieves a function by name and optional version. // If version is 0, returns the latest version. func (r *Registry) Get(ctx context.Context, namespace, name string, version int) (*Function, error) { + namespace = strings.TrimSpace(namespace) + name = strings.TrimSpace(name) + var query string var args []interface{} @@ -208,6 +220,9 @@ func (r *Registry) List(ctx context.Context, namespace string) ([]*Function, err // Delete removes a function. If version is 0, removes all versions. func (r *Registry) Delete(ctx context.Context, namespace, name string, version int) error { + namespace = strings.TrimSpace(namespace) + name = strings.TrimSpace(name) + var query string var args []interface{} @@ -327,6 +342,88 @@ func (r *Registry) ListVersions(ctx context.Context, namespace, name string) ([] return functions, nil } +// Log records a function invocation and its logs to the database. +func (r *Registry) Log(ctx context.Context, inv *InvocationRecord) error { + if inv == nil { + return nil + } + + // Insert invocation record + invQuery := ` + INSERT INTO function_invocations ( + id, function_id, request_id, trigger_type, caller_wallet, + input_size, output_size, started_at, completed_at, + duration_ms, status, error_message, memory_used_mb + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ` + _, err := r.db.Exec(ctx, invQuery, + inv.ID, inv.FunctionID, inv.RequestID, string(inv.TriggerType), inv.CallerWallet, + inv.InputSize, inv.OutputSize, inv.StartedAt, inv.CompletedAt, + inv.DurationMS, string(inv.Status), inv.ErrorMessage, inv.MemoryUsedMB, + ) + if err != nil { + return fmt.Errorf("failed to insert invocation record: %w", err) + } + + // Insert logs if any + if len(inv.Logs) > 0 { + for _, entry := range inv.Logs { + logID := uuid.New().String() + logQuery := ` + INSERT INTO function_logs ( + id, function_id, invocation_id, level, message, timestamp + ) VALUES (?, ?, ?, ?, ?, ?) + ` + _, err := r.db.Exec(ctx, logQuery, + logID, inv.FunctionID, inv.ID, entry.Level, entry.Message, entry.Timestamp, + ) + if err != nil { + r.logger.Warn("Failed to insert function log", zap.Error(err)) + // Continue with other logs + } + } + } + + return nil +} + +// GetLogs retrieves logs for a function. +func (r *Registry) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) { + if limit <= 0 { + limit = 100 + } + + query := ` + SELECT l.level, l.message, l.timestamp + FROM function_logs l + JOIN functions f ON l.function_id = f.id + WHERE f.namespace = ? AND f.name = ? + ORDER BY l.timestamp DESC + LIMIT ? + ` + + var results []struct { + Level string `db:"level"` + Message string `db:"message"` + Timestamp time.Time `db:"timestamp"` + } + + if err := r.db.Query(ctx, &results, query, namespace, name, limit); err != nil { + return nil, fmt.Errorf("failed to query logs: %w", err) + } + + logs := make([]LogEntry, len(results)) + for i, res := range results { + logs[i] = LogEntry{ + Level: res.Level, + Message: res.Message, + Timestamp: res.Timestamp, + } + } + + return logs, nil +} + // ----------------------------------------------------------------------------- // Private helpers // ----------------------------------------------------------------------------- @@ -362,8 +459,42 @@ func (r *Registry) getLatestVersion(ctx context.Context, namespace, name string) return int(maxVersion.Int64), nil } +// getByNameInternal retrieves a function by name regardless of status. +func (r *Registry) getByNameInternal(ctx context.Context, namespace, name string) (*Function, error) { + namespace = strings.TrimSpace(namespace) + name = strings.TrimSpace(name) + + query := ` + SELECT id, name, namespace, version, wasm_cid, source_cid, + memory_limit_mb, timeout_seconds, is_public, + retry_count, retry_delay_seconds, dlq_topic, + status, created_at, updated_at, created_by + FROM functions + WHERE namespace = ? AND name = ? + ORDER BY version DESC + LIMIT 1 + ` + + var functions []functionRow + if err := r.db.Query(ctx, &functions, query, namespace, name); err != nil { + return nil, fmt.Errorf("failed to query function: %w", err) + } + + if len(functions) == 0 { + return nil, ErrFunctionNotFound + } + + return r.rowToFunction(&functions[0]), nil +} + // saveEnvVars saves environment variables for a function. func (r *Registry) saveEnvVars(ctx context.Context, functionID string, envVars map[string]string) error { + // Clear existing env vars first + deleteQuery := `DELETE FROM function_env_vars WHERE function_id = ?` + if _, err := r.db.Exec(ctx, deleteQuery, functionID); err != nil { + return fmt.Errorf("failed to clear existing env vars: %w", err) + } + if len(envVars) == 0 { return nil } @@ -428,4 +559,3 @@ type envVarRow struct { Key string `db:"key"` Value string `db:"value"` } - diff --git a/pkg/serverless/registry_test.go b/pkg/serverless/registry_test.go index 32fe587..d2f0328 100644 --- a/pkg/serverless/registry_test.go +++ b/pkg/serverless/registry_test.go @@ -11,9 +11,9 @@ func TestRegistry_RegisterAndGet(t *testing.T) { db := NewMockRQLite() ipfs := NewMockIPFSClient() logger := zap.NewNop() - + registry := NewRegistry(db, ipfs, RegistryConfig{IPFSAPIURL: "http://localhost:5001"}, logger) - + ctx := context.Background() fnDef := &FunctionDefinition{ Name: "test-func", @@ -21,13 +21,13 @@ func TestRegistry_RegisterAndGet(t *testing.T) { IsPublic: true, } wasmBytes := []byte("mock wasm") - - err := registry.Register(ctx, fnDef, wasmBytes) + + _, err := registry.Register(ctx, fnDef, wasmBytes) if err != nil { t.Fatalf("Register failed: %v", err) } - - // Since MockRQLite doesn't fully implement Query scanning yet, + + // Since MockRQLite doesn't fully implement Query scanning yet, // we won't be able to test Get() effectively without more work. // But we can check if wasm was uploaded. wasm, err := registry.GetWASMBytes(ctx, "cid-test-func.wasm") @@ -38,4 +38,3 @@ func TestRegistry_RegisterAndGet(t *testing.T) { t.Errorf("expected 'mock wasm', got %q", string(wasm)) } } - diff --git a/pkg/serverless/types.go b/pkg/serverless/types.go index f3e0dac..72e6f8a 100644 --- a/pkg/serverless/types.go +++ b/pkg/serverless/types.go @@ -68,7 +68,8 @@ const ( // Responsible for CRUD operations on function definitions. type FunctionRegistry interface { // Register deploys a new function or updates an existing one. - Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) error + // Returns the old function definition if it was updated, or nil if it was a new registration. + Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error) // Get retrieves a function by name and optional version. // If version is 0, returns the latest version. @@ -82,6 +83,9 @@ type FunctionRegistry interface { // GetWASMBytes retrieves the compiled WASM bytecode for a function. GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error) + + // GetLogs retrieves logs for a function. + GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) } // FunctionExecutor handles the actual execution of WASM functions.