feat: enhance serverless function management and logging

- Updated the serverless functions table schema to remove the version constraint for uniqueness, allowing for more flexible function definitions.
- Enhanced the serverless engine to support HTTP fetch functionality, enabling external API calls from serverless functions.
- Implemented logging capabilities for function invocations, capturing detailed logs for better debugging and monitoring.
- Improved the authentication middleware to handle public endpoints more effectively, ensuring seamless access to serverless functions.
- Added new configuration options for serverless functions, including memory limits, timeout settings, and retry parameters, to optimize performance and reliability.
This commit is contained in:
anonpenguin23 2026-01-02 08:40:28 +02:00
parent df5b11b175
commit 4f893e08d1
13 changed files with 403 additions and 110 deletions

View File

@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS functions (
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_by TEXT NOT NULL, created_by TEXT NOT NULL,
UNIQUE(namespace, name, version) UNIQUE(namespace, name)
); );
CREATE INDEX IF NOT EXISTS idx_functions_namespace ON functions(namespace); CREATE INDEX IF NOT EXISTS idx_functions_namespace ON functions(namespace);

View File

@ -17,12 +17,12 @@ import (
"github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/config" "github.com/DeBrosOfficial/network/pkg/config"
"github.com/DeBrosOfficial/network/pkg/gateway/auth"
"github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/ipfs"
"github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/logging"
"github.com/DeBrosOfficial/network/pkg/olric" "github.com/DeBrosOfficial/network/pkg/olric"
"github.com/DeBrosOfficial/network/pkg/rqlite" "github.com/DeBrosOfficial/network/pkg/rqlite"
"github.com/DeBrosOfficial/network/pkg/serverless" "github.com/DeBrosOfficial/network/pkg/serverless"
"github.com/DeBrosOfficial/network/pkg/gateway/auth"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
olriclib "github.com/olric-data/olric" olriclib "github.com/olric-data/olric"
"go.uber.org/zap" "go.uber.org/zap"
@ -345,7 +345,7 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
engineCfg.ModuleCacheSize = 100 engineCfg.ModuleCacheSize = 100
// Create WASM engine // Create WASM engine
engine, engineErr := serverless.NewEngine(engineCfg, registry, hostFuncs, logger.Logger) engine, engineErr := serverless.NewEngine(engineCfg, registry, hostFuncs, logger.Logger, serverless.WithInvocationLogger(registry))
if engineErr != nil { if engineErr != nil {
logger.ComponentWarn(logging.ComponentGeneral, "failed to initialize serverless engine; functions disabled", zap.Error(engineErr)) logger.ComponentWarn(logging.ComponentGeneral, "failed to initialize serverless engine; functions disabled", zap.Error(engineErr))
} else { } else {

View File

@ -63,11 +63,8 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
return return
} }
// Allow public endpoints without auth
if isPublicPath(r.URL.Path) { isPublic := isPublicPath(r.URL.Path)
next.ServeHTTP(w, r)
return
}
// 1) Try JWT Bearer first if Authorization looks like one // 1) Try JWT Bearer first if Authorization looks like one
if auth := r.Header.Get("Authorization"); auth != "" { if auth := r.Header.Get("Authorization"); auth != "" {
@ -92,6 +89,10 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
// 2) Fallback to API key (validate against DB) // 2) Fallback to API key (validate against DB)
key := extractAPIKey(r) key := extractAPIKey(r)
if key == "" { if key == "" {
if isPublic {
next.ServeHTTP(w, r)
return
}
w.Header().Set("WWW-Authenticate", "Bearer realm=\"gateway\", charset=\"UTF-8\"") w.Header().Set("WWW-Authenticate", "Bearer realm=\"gateway\", charset=\"UTF-8\"")
writeError(w, http.StatusUnauthorized, "missing API key") writeError(w, http.StatusUnauthorized, "missing API key")
return return
@ -105,6 +106,10 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
q := "SELECT namespaces.name FROM api_keys JOIN namespaces ON api_keys.namespace_id = namespaces.id WHERE api_keys.key = ? LIMIT 1" q := "SELECT namespaces.name FROM api_keys JOIN namespaces ON api_keys.namespace_id = namespaces.id WHERE api_keys.key = ? LIMIT 1"
res, err := db.Query(internalCtx, q, key) res, err := db.Query(internalCtx, q, key)
if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 {
if isPublic {
next.ServeHTTP(w, r)
return
}
w.Header().Set("WWW-Authenticate", "Bearer error=\"invalid_token\"") w.Header().Set("WWW-Authenticate", "Bearer error=\"invalid_token\"")
writeError(w, http.StatusUnauthorized, "invalid API key") writeError(w, http.StatusUnauthorized, "invalid API key")
return return
@ -119,6 +124,10 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
ns = strings.TrimSpace(ns) ns = strings.TrimSpace(ns)
} }
if ns == "" { if ns == "" {
if isPublic {
next.ServeHTTP(w, r)
return
}
w.Header().Set("WWW-Authenticate", "Bearer error=\"invalid_token\"") w.Header().Set("WWW-Authenticate", "Bearer error=\"invalid_token\"")
writeError(w, http.StatusUnauthorized, "invalid API key") writeError(w, http.StatusUnauthorized, "invalid API key")
return return
@ -184,6 +193,11 @@ func isPublicPath(p string) bool {
return true return true
} }
// Serverless invocation is public (authorization is handled within the invoker)
if strings.HasPrefix(p, "/v1/invoke/") || (strings.HasPrefix(p, "/v1/functions/") && strings.HasSuffix(p, "/invoke")) {
return true
}
switch p { switch p {
case "/health", "/v1/health", "/status", "/v1/status", "/v1/auth/jwks", "/.well-known/jwks.json", "/v1/version", "/v1/auth/login", "/v1/auth/challenge", "/v1/auth/verify", "/v1/auth/register", "/v1/auth/refresh", "/v1/auth/logout", "/v1/auth/api-key", "/v1/auth/simple-key", "/v1/network/status", "/v1/network/peers": case "/health", "/v1/health", "/status", "/v1/status", "/v1/auth/jwks", "/.well-known/jwks.json", "/v1/version", "/v1/auth/login", "/v1/auth/challenge", "/v1/auth/verify", "/v1/auth/register", "/v1/auth/refresh", "/v1/auth/logout", "/v1/auth/api-key", "/v1/auth/simple-key", "/v1/network/status", "/v1/network/peers":
return true return true
@ -325,6 +339,9 @@ func requiresNamespaceOwnership(p string) bool {
if strings.HasPrefix(p, "/v1/proxy/") { if strings.HasPrefix(p, "/v1/proxy/") {
return true return true
} }
if strings.HasPrefix(p, "/v1/functions") {
return true
}
return false return false
} }

View File

@ -214,6 +214,23 @@ func (h *ServerlessHandlers) deployFunction(w http.ResponseWriter, r *http.Reque
def.Namespace = r.FormValue("namespace") def.Namespace = r.FormValue("namespace")
} }
// Get other configuration fields from form
if v := r.FormValue("is_public"); v != "" {
def.IsPublic, _ = strconv.ParseBool(v)
}
if v := r.FormValue("memory_limit_mb"); v != "" {
def.MemoryLimitMB, _ = strconv.Atoi(v)
}
if v := r.FormValue("timeout_seconds"); v != "" {
def.TimeoutSeconds, _ = strconv.Atoi(v)
}
if v := r.FormValue("retry_count"); v != "" {
def.RetryCount, _ = strconv.Atoi(v)
}
if v := r.FormValue("retry_delay_seconds"); v != "" {
def.RetryDelaySeconds, _ = strconv.Atoi(v)
}
// Get WASM file // Get WASM file
file, _, err := r.FormFile("wasm") file, _, err := r.FormFile("wasm")
if err != nil { if err != nil {
@ -269,7 +286,8 @@ func (h *ServerlessHandlers) deployFunction(w http.ResponseWriter, r *http.Reque
ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second) ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
defer cancel() defer cancel()
if err := h.registry.Register(ctx, &def, wasmBytes); err != nil { oldFn, err := h.registry.Register(ctx, &def, wasmBytes)
if err != nil {
h.logger.Error("Failed to deploy function", h.logger.Error("Failed to deploy function",
zap.String("name", def.Name), zap.String("name", def.Name),
zap.Error(err), zap.Error(err),
@ -278,6 +296,15 @@ func (h *ServerlessHandlers) deployFunction(w http.ResponseWriter, r *http.Reque
return return
} }
// Invalidate cache for the old version to ensure the new one is loaded
if oldFn != nil {
h.invoker.InvalidateCache(oldFn.WASMCID)
h.logger.Debug("Invalidated function cache",
zap.String("name", def.Name),
zap.String("old_wasm_cid", oldFn.WASMCID),
)
}
h.logger.Info("Function deployed", h.logger.Info("Function deployed",
zap.String("name", def.Name), zap.String("name", def.Name),
zap.String("namespace", def.Namespace), zap.String("namespace", def.Namespace),
@ -410,6 +437,8 @@ func (h *ServerlessHandlers) invokeFunction(w http.ResponseWriter, r *http.Reque
statusCode = http.StatusNotFound statusCode = http.StatusNotFound
} else if serverless.IsResourceExhausted(err) { } else if serverless.IsResourceExhausted(err) {
statusCode = http.StatusTooManyRequests statusCode = http.StatusTooManyRequests
} else if serverless.IsUnauthorized(err) {
statusCode = http.StatusUnauthorized
} }
writeJSON(w, statusCode, map[string]interface{}{ writeJSON(w, statusCode, map[string]interface{}{
@ -565,27 +594,59 @@ func (h *ServerlessHandlers) listVersions(w http.ResponseWriter, r *http.Request
// getFunctionLogs handles GET /v1/functions/{name}/logs // getFunctionLogs handles GET /v1/functions/{name}/logs
func (h *ServerlessHandlers) getFunctionLogs(w http.ResponseWriter, r *http.Request, name string) { func (h *ServerlessHandlers) getFunctionLogs(w http.ResponseWriter, r *http.Request, name string) {
// TODO: Implement log retrieval from function_logs table namespace := r.URL.Query().Get("namespace")
if namespace == "" {
namespace = h.getNamespaceFromRequest(r)
}
if namespace == "" {
writeError(w, http.StatusBadRequest, "namespace required")
return
}
limit := 100
if lStr := r.URL.Query().Get("limit"); lStr != "" {
if l, err := strconv.Atoi(lStr); err == nil {
limit = l
}
}
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
logs, err := h.registry.GetLogs(ctx, namespace, name, limit)
if err != nil {
h.logger.Error("Failed to get function logs",
zap.String("name", name),
zap.String("namespace", namespace),
zap.Error(err),
)
writeError(w, http.StatusInternalServerError, "Failed to get logs")
return
}
writeJSON(w, http.StatusOK, map[string]interface{}{ writeJSON(w, http.StatusOK, map[string]interface{}{
"logs": []interface{}{}, "name": name,
"message": "Log retrieval not yet implemented", "namespace": namespace,
"logs": logs,
"count": len(logs),
}) })
} }
// getNamespaceFromRequest extracts namespace from JWT or query param // getNamespaceFromRequest extracts namespace from JWT or query param
func (h *ServerlessHandlers) getNamespaceFromRequest(r *http.Request) string { func (h *ServerlessHandlers) getNamespaceFromRequest(r *http.Request) string {
// Try query param first // Try context first (set by auth middleware) - most secure
if ns := r.URL.Query().Get("namespace"); ns != "" {
return ns
}
// Try context (set by auth middleware)
if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil { if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil {
if ns, ok := v.(string); ok && ns != "" { if ns, ok := v.(string); ok && ns != "" {
return ns return ns
} }
} }
// Try query param as fallback (e.g. for public access or admin)
if ns := r.URL.Query().Get("namespace"); ns != "" {
return ns
}
// Try header as fallback // Try header as fallback
if ns := r.Header.Get("X-Namespace"); ns != "" { if ns := r.Header.Get("X-Namespace"); ns != "" {
return ns return ns

View File

@ -65,6 +65,7 @@ type InvocationRecord struct {
Status InvocationStatus `json:"status"` Status InvocationStatus `json:"status"`
ErrorMessage string `json:"error_message,omitempty"` ErrorMessage string `json:"error_message,omitempty"`
MemoryUsedMB float64 `json:"memory_used_mb"` MemoryUsedMB float64 `json:"memory_used_mb"`
Logs []LogEntry `json:"logs,omitempty"`
} }
// RateLimiter checks if a request should be rate limited. // RateLimiter checks if a request should be rate limited.
@ -470,6 +471,11 @@ func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *Invoca
record.ErrorMessage = err.Error() record.ErrorMessage = err.Error()
} }
// Collect logs from host services if supported
if hf, ok := e.hostServices.(interface{ GetLogs() []LogEntry }); ok {
record.Logs = hf.GetLogs()
}
if logErr := e.invocationLogger.Log(ctx, record); logErr != nil { if logErr := e.invocationLogger.Log(ctx, record); logErr != nil {
e.logger.Warn("Failed to log invocation", zap.Error(logErr)) e.logger.Warn("Failed to log invocation", zap.Error(logErr))
} }
@ -489,6 +495,7 @@ func (e *Engine) registerHostModule(ctx context.Context) error {
NewFunctionBuilder().WithFunc(e.hDBExecute).Export("db_execute"). NewFunctionBuilder().WithFunc(e.hDBExecute).Export("db_execute").
NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get"). NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get").
NewFunctionBuilder().WithFunc(e.hCacheSet).Export("cache_set"). NewFunctionBuilder().WithFunc(e.hCacheSet).Export("cache_set").
NewFunctionBuilder().WithFunc(e.hHTTPFetch).Export("http_fetch").
NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info"). NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info").
NewFunctionBuilder().WithFunc(e.hLogError).Export("log_error"). NewFunctionBuilder().WithFunc(e.hLogError).Export("log_error").
Instantiate(ctx) Instantiate(ctx)
@ -606,6 +613,39 @@ func (e *Engine) hCacheSet(ctx context.Context, mod api.Module, keyPtr, keyLen,
_ = e.hostServices.CacheSet(ctx, string(key), val, ttl) _ = e.hostServices.CacheSet(ctx, string(key), val, ttl)
} }
func (e *Engine) hHTTPFetch(ctx context.Context, mod api.Module, methodPtr, methodLen, urlPtr, urlLen, headersPtr, headersLen, bodyPtr, bodyLen uint32) uint64 {
method, ok := mod.Memory().Read(methodPtr, methodLen)
if !ok {
return 0
}
u, ok := mod.Memory().Read(urlPtr, urlLen)
if !ok {
return 0
}
var headers map[string]string
if headersLen > 0 {
headersData, ok := mod.Memory().Read(headersPtr, headersLen)
if !ok {
return 0
}
if err := json.Unmarshal(headersData, &headers); err != nil {
e.logger.Error("failed to unmarshal http_fetch headers", zap.Error(err))
return 0
}
}
body, ok := mod.Memory().Read(bodyPtr, bodyLen)
if !ok {
return 0
}
resp, err := e.hostServices.HTTPFetch(ctx, string(method), string(u), headers, body)
if err != nil {
e.logger.Error("host function http_fetch failed", zap.Error(err), zap.String("url", string(u)))
return 0
}
return e.writeToGuest(ctx, mod, resp)
}
func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32) { func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32) {
msg, ok := mod.Memory().Read(ptr, size) msg, ok := mod.Memory().Read(ptr, size)
if ok { if ok {

View File

@ -39,7 +39,7 @@ func TestEngine_Execute(t *testing.T) {
TimeoutSeconds: 5, TimeoutSeconds: 5,
} }
err = registry.Register(context.Background(), fnDef, wasmBytes) _, err = registry.Register(context.Background(), fnDef, wasmBytes)
if err != nil { if err != nil {
t.Fatalf("failed to register function: %v", err) t.Fatalf("failed to register function: %v", err)
} }
@ -121,7 +121,7 @@ func TestEngine_Timeout(t *testing.T) {
fn, _ := registry.Get(context.Background(), "test", "timeout", 0) fn, _ := registry.Get(context.Background(), "test", "timeout", 0)
if fn == nil { if fn == nil {
_ = registry.Register(context.Background(), &FunctionDefinition{Name: "timeout", Namespace: "test"}, wasmBytes) _, _ = registry.Register(context.Background(), &FunctionDefinition{Name: "timeout", Namespace: "test"}, wasmBytes)
fn, _ = registry.Get(context.Background(), "test", "timeout", 0) fn, _ = registry.Get(context.Background(), "test", "timeout", 0)
} }
fn.TimeoutSeconds = 1 fn.TimeoutSeconds = 1
@ -151,7 +151,7 @@ func TestEngine_MemoryLimit(t *testing.T) {
0x0a, 0x04, 0x01, 0x02, 0x00, 0x0b, 0x0a, 0x04, 0x01, 0x02, 0x00, 0x0b,
} }
_ = registry.Register(context.Background(), &FunctionDefinition{Name: "memory", Namespace: "test", MemoryLimitMB: 1, TimeoutSeconds: 5}, wasmBytes) _, _ = registry.Register(context.Background(), &FunctionDefinition{Name: "memory", Namespace: "test", MemoryLimitMB: 1, TimeoutSeconds: 5}, wasmBytes)
fn, _ := registry.Get(context.Background(), "test", "memory", 0) fn, _ := registry.Get(context.Background(), "test", "memory", 0)
// This should pass because the minimal WASM doesn't use much memory // This should pass because the minimal WASM doesn't use much memory
@ -183,7 +183,7 @@ func TestEngine_RealWASM(t *testing.T) {
Namespace: "examples", Namespace: "examples",
TimeoutSeconds: 10, TimeoutSeconds: 10,
} }
_ = registry.Register(context.Background(), fnDef, wasmBytes) _, _ = registry.Register(context.Background(), fnDef, wasmBytes)
fn, _ := registry.Get(context.Background(), "examples", "hello", 0) fn, _ := registry.Get(context.Background(), "examples", "hello", 0)
output, err := engine.Execute(context.Background(), fn, []byte(`{"name": "Tester"}`), nil) output, err := engine.Execute(context.Background(), fn, []byte(`{"name": "Tester"}`), nil)

View File

@ -194,6 +194,11 @@ func IsNotFound(err error) bool {
errors.Is(err, ErrWSClientNotFound) errors.Is(err, ErrWSClientNotFound)
} }
// IsUnauthorized checks if an error indicates a lack of authorization.
func IsUnauthorized(err error) bool {
return errors.Is(err, ErrUnauthorized)
}
// IsResourceExhausted checks if an error indicates resource exhaustion. // IsResourceExhausted checks if an error indicates resource exhaustion.
func IsResourceExhausted(err error) bool { func IsResourceExhausted(err error) bool {
return errors.Is(err, ErrRateLimited) || return errors.Is(err, ErrRateLimited) ||
@ -209,4 +214,3 @@ func IsServiceUnavailable(err error) bool {
errors.Is(err, ErrDatabaseUnavailable) || errors.Is(err, ErrDatabaseUnavailable) ||
errors.Is(err, ErrCacheUnavailable) errors.Is(err, ErrCacheUnavailable)
} }

View File

@ -15,9 +15,10 @@ import (
"time" "time"
"github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/ipfs"
olriclib "github.com/olric-data/olric"
"github.com/DeBrosOfficial/network/pkg/pubsub" "github.com/DeBrosOfficial/network/pkg/pubsub"
"github.com/DeBrosOfficial/network/pkg/rqlite" "github.com/DeBrosOfficial/network/pkg/rqlite"
"github.com/DeBrosOfficial/network/pkg/tlsutil"
olriclib "github.com/olric-data/olric"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -76,7 +77,7 @@ func NewHostFunctions(
pubsub: pubsubAdapter, pubsub: pubsubAdapter,
wsManager: wsManager, wsManager: wsManager,
secrets: secrets, secrets: secrets,
httpClient: &http.Client{Timeout: httpTimeout}, httpClient: tlsutil.NewHTTPClient(httpTimeout),
logger: logger, logger: logger,
logs: make([]LogEntry, 0), logs: make([]LogEntry, 0),
} }
@ -328,7 +329,12 @@ func (h *HostFunctions) HTTPFetch(ctx context.Context, method, url string, heade
req, err := http.NewRequestWithContext(ctx, method, url, bodyReader) req, err := http.NewRequestWithContext(ctx, method, url, bodyReader)
if err != nil { if err != nil {
return nil, &HostFunctionError{Function: "http_fetch", Cause: fmt.Errorf("failed to create request: %w", err)} h.logger.Error("http_fetch request creation error", zap.Error(err), zap.String("url", url))
errorResp := map[string]interface{}{
"error": "failed to create request: " + err.Error(),
"status": 0,
}
return json.Marshal(errorResp)
} }
for key, value := range headers { for key, value := range headers {
@ -337,13 +343,23 @@ func (h *HostFunctions) HTTPFetch(ctx context.Context, method, url string, heade
resp, err := h.httpClient.Do(req) resp, err := h.httpClient.Do(req)
if err != nil { if err != nil {
return nil, &HostFunctionError{Function: "http_fetch", Cause: err} h.logger.Error("http_fetch transport error", zap.Error(err), zap.String("url", url))
errorResp := map[string]interface{}{
"error": err.Error(),
"status": 0, // Transport error
}
return json.Marshal(errorResp)
} }
defer resp.Body.Close() defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body) respBody, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return nil, &HostFunctionError{Function: "http_fetch", Cause: fmt.Errorf("failed to read response: %w", err)} h.logger.Error("http_fetch response read error", zap.Error(err), zap.String("url", url))
errorResp := map[string]interface{}{
"error": "failed to read response: " + err.Error(),
"status": resp.StatusCode,
}
return json.Marshal(errorResp)
} }
// Encode response with status code // Encode response with status code
@ -638,4 +654,3 @@ func (s *DBSecretsManager) decrypt(ciphertext []byte) ([]byte, error) {
nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:] nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:]
return gcm.Open(nil, nonce, ciphertext, nil) return gcm.Open(nil, nonce, ciphertext, nil)
} }

View File

@ -76,6 +76,17 @@ func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeRespon
}, err }, err
} }
// Check authorization
authorized, err := i.CanInvoke(ctx, req.Namespace, req.FunctionName, req.CallerWallet)
if err != nil || !authorized {
return &InvokeResponse{
RequestID: requestID,
Status: InvocationStatusError,
Error: "unauthorized",
DurationMS: time.Since(startTime).Milliseconds(),
}, ErrUnauthorized
}
// Get environment variables // Get environment variables
envVars, err := i.getEnvVars(ctx, fn.ID) envVars, err := i.getEnvVars(ctx, fn.ID)
if err != nil { if err != nil {
@ -159,6 +170,11 @@ func (i *Invoker) InvokeByID(ctx context.Context, functionID string, input []byt
return response, nil return response, nil
} }
// InvalidateCache removes a compiled module from the engine's cache.
func (i *Invoker) InvalidateCache(wasmCID string) {
i.engine.Invalidate(wasmCID)
}
// executeWithRetry executes a function with retry logic and DLQ. // executeWithRetry executes a function with retry logic and DLQ.
func (i *Invoker) executeWithRetry(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, int, error) { func (i *Invoker) executeWithRetry(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, int, error) {
var lastErr error var lastErr error
@ -434,4 +450,3 @@ func (i *Invoker) ValidateInput(input []byte, maxSize int) error {
} }
return nil return nil
} }

View File

@ -28,11 +28,12 @@ func NewMockRegistry() *MockRegistry {
} }
} }
func (m *MockRegistry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) error { func (m *MockRegistry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
id := fn.Namespace + "/" + fn.Name id := fn.Namespace + "/" + fn.Name
wasmCID := "cid-" + id wasmCID := "cid-" + id
oldFn := m.functions[id]
m.functions[id] = &Function{ m.functions[id] = &Function{
ID: id, ID: id,
Name: fn.Name, Name: fn.Name,
@ -40,10 +41,13 @@ func (m *MockRegistry) Register(ctx context.Context, fn *FunctionDefinition, was
WASMCID: wasmCID, WASMCID: wasmCID,
MemoryLimitMB: fn.MemoryLimitMB, MemoryLimitMB: fn.MemoryLimitMB,
TimeoutSeconds: fn.TimeoutSeconds, TimeoutSeconds: fn.TimeoutSeconds,
IsPublic: fn.IsPublic,
RetryCount: fn.RetryCount,
RetryDelaySeconds: fn.RetryDelaySeconds,
Status: FunctionStatusActive, Status: FunctionStatusActive,
} }
m.wasm[wasmCID] = wasmBytes m.wasm[wasmCID] = wasmBytes
return nil return oldFn, nil
} }
func (m *MockRegistry) Get(ctx context.Context, namespace, name string, version int) (*Function, error) { func (m *MockRegistry) Get(ctx context.Context, namespace, name string, version int) (*Function, error) {
@ -85,6 +89,10 @@ func (m *MockRegistry) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte
return data, nil return data, nil
} }
func (m *MockRegistry) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) {
return []LogEntry{}, nil
}
// MockHostServices is a mock implementation of HostServices // MockHostServices is a mock implementation of HostServices
type MockHostServices struct { type MockHostServices struct {
mu sync.RWMutex mu sync.RWMutex

View File

@ -6,6 +6,7 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"io" "io"
"strings"
"time" "time"
"github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/ipfs"
@ -14,8 +15,9 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// Ensure Registry implements FunctionRegistry interface. // Ensure Registry implements FunctionRegistry and InvocationLogger interfaces.
var _ FunctionRegistry = (*Registry)(nil) var _ FunctionRegistry = (*Registry)(nil)
var _ InvocationLogger = (*Registry)(nil)
// Registry manages function metadata in RQLite and bytecode in IPFS. // Registry manages function metadata in RQLite and bytecode in IPFS.
// It implements the FunctionRegistry interface. // It implements the FunctionRegistry interface.
@ -43,35 +45,34 @@ func NewRegistry(db rqlite.Client, ipfsClient ipfs.IPFSClient, cfg RegistryConfi
} }
} }
// Register deploys a new function or creates a new version. // Register deploys a new function or updates an existing one.
func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) error { func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error) {
if fn == nil { if fn == nil {
return &ValidationError{Field: "definition", Message: "cannot be nil"} return nil, &ValidationError{Field: "definition", Message: "cannot be nil"}
} }
fn.Name = strings.TrimSpace(fn.Name)
fn.Namespace = strings.TrimSpace(fn.Namespace)
if fn.Name == "" { if fn.Name == "" {
return &ValidationError{Field: "name", Message: "cannot be empty"} return nil, &ValidationError{Field: "name", Message: "cannot be empty"}
} }
if fn.Namespace == "" { if fn.Namespace == "" {
return &ValidationError{Field: "namespace", Message: "cannot be empty"} return nil, &ValidationError{Field: "namespace", Message: "cannot be empty"}
} }
if len(wasmBytes) == 0 { if len(wasmBytes) == 0 {
return &ValidationError{Field: "wasmBytes", Message: "cannot be empty"} return nil, &ValidationError{Field: "wasmBytes", Message: "cannot be empty"}
}
// Check if function already exists (regardless of status) to get old metadata for invalidation
oldFn, err := r.getByNameInternal(ctx, fn.Namespace, fn.Name)
if err != nil && err != ErrFunctionNotFound {
return nil, &DeployError{FunctionName: fn.Name, Cause: err}
} }
// Upload WASM to IPFS // Upload WASM to IPFS
wasmCID, err := r.uploadWASM(ctx, wasmBytes, fn.Name) wasmCID, err := r.uploadWASM(ctx, wasmBytes, fn.Name)
if err != nil { if err != nil {
return &DeployError{FunctionName: fn.Name, Cause: err} return nil, &DeployError{FunctionName: fn.Name, Cause: err}
}
// Determine version (auto-increment if not specified)
version := fn.Version
if version == 0 {
latestVersion, err := r.getLatestVersion(ctx, fn.Namespace, fn.Name)
if err != nil && err != ErrFunctionNotFound {
return &DeployError{FunctionName: fn.Name, Cause: err}
}
version = latestVersion + 1
} }
// Apply defaults // Apply defaults
@ -88,48 +89,59 @@ func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmByt
retryDelay = 5 retryDelay = 5
} }
// Generate ID now := time.Now()
id := uuid.New().String() id := uuid.New().String()
version := 1
// Insert function record if oldFn != nil {
// Use existing ID and increment version
id = oldFn.ID
version = oldFn.Version + 1
}
// Use INSERT OR REPLACE to ensure we never hit UNIQUE constraint failures on (namespace, name).
// This handles both new registrations and overwriting existing (even inactive) functions.
query := ` query := `
INSERT INTO functions ( INSERT OR REPLACE INTO functions (
id, name, namespace, version, wasm_cid, id, name, namespace, version, wasm_cid,
memory_limit_mb, timeout_seconds, is_public, memory_limit_mb, timeout_seconds, is_public,
retry_count, retry_delay_seconds, dlq_topic, retry_count, retry_delay_seconds, dlq_topic,
status, created_at, updated_at, created_by status, created_at, updated_at, created_by
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
` `
now := time.Now()
_, err = r.db.Exec(ctx, query, _, err = r.db.Exec(ctx, query,
id, fn.Name, fn.Namespace, version, wasmCID, id, fn.Name, fn.Namespace, version, wasmCID,
memoryLimit, timeout, fn.IsPublic, memoryLimit, timeout, fn.IsPublic,
fn.RetryCount, retryDelay, fn.DLQTopic, fn.RetryCount, retryDelay, fn.DLQTopic,
string(FunctionStatusActive), now, now, fn.Namespace, // created_by = namespace for now string(FunctionStatusActive), now, now, fn.Namespace,
) )
if err != nil { if err != nil {
return &DeployError{FunctionName: fn.Name, Cause: fmt.Errorf("failed to insert function: %w", err)} return nil, &DeployError{FunctionName: fn.Name, Cause: fmt.Errorf("failed to register function: %w", err)}
} }
// Insert environment variables // Save environment variables
if err := r.saveEnvVars(ctx, id, fn.EnvVars); err != nil { if err := r.saveEnvVars(ctx, id, fn.EnvVars); err != nil {
return &DeployError{FunctionName: fn.Name, Cause: err} return nil, &DeployError{FunctionName: fn.Name, Cause: err}
} }
r.logger.Info("Function registered", r.logger.Info("Function registered",
zap.String("id", id), zap.String("id", id),
zap.String("name", fn.Name), zap.String("name", fn.Name),
zap.String("namespace", fn.Namespace), zap.String("namespace", fn.Namespace),
zap.Int("version", version),
zap.String("wasm_cid", wasmCID), zap.String("wasm_cid", wasmCID),
zap.Int("version", version),
zap.Bool("updated", oldFn != nil),
) )
return nil return oldFn, nil
} }
// Get retrieves a function by name and optional version. // Get retrieves a function by name and optional version.
// If version is 0, returns the latest version. // If version is 0, returns the latest version.
func (r *Registry) Get(ctx context.Context, namespace, name string, version int) (*Function, error) { func (r *Registry) Get(ctx context.Context, namespace, name string, version int) (*Function, error) {
namespace = strings.TrimSpace(namespace)
name = strings.TrimSpace(name)
var query string var query string
var args []interface{} var args []interface{}
@ -208,6 +220,9 @@ func (r *Registry) List(ctx context.Context, namespace string) ([]*Function, err
// Delete removes a function. If version is 0, removes all versions. // Delete removes a function. If version is 0, removes all versions.
func (r *Registry) Delete(ctx context.Context, namespace, name string, version int) error { func (r *Registry) Delete(ctx context.Context, namespace, name string, version int) error {
namespace = strings.TrimSpace(namespace)
name = strings.TrimSpace(name)
var query string var query string
var args []interface{} var args []interface{}
@ -327,6 +342,88 @@ func (r *Registry) ListVersions(ctx context.Context, namespace, name string) ([]
return functions, nil return functions, nil
} }
// Log records a function invocation and its logs to the database.
func (r *Registry) Log(ctx context.Context, inv *InvocationRecord) error {
if inv == nil {
return nil
}
// Insert invocation record
invQuery := `
INSERT INTO function_invocations (
id, function_id, request_id, trigger_type, caller_wallet,
input_size, output_size, started_at, completed_at,
duration_ms, status, error_message, memory_used_mb
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
_, err := r.db.Exec(ctx, invQuery,
inv.ID, inv.FunctionID, inv.RequestID, string(inv.TriggerType), inv.CallerWallet,
inv.InputSize, inv.OutputSize, inv.StartedAt, inv.CompletedAt,
inv.DurationMS, string(inv.Status), inv.ErrorMessage, inv.MemoryUsedMB,
)
if err != nil {
return fmt.Errorf("failed to insert invocation record: %w", err)
}
// Insert logs if any
if len(inv.Logs) > 0 {
for _, entry := range inv.Logs {
logID := uuid.New().String()
logQuery := `
INSERT INTO function_logs (
id, function_id, invocation_id, level, message, timestamp
) VALUES (?, ?, ?, ?, ?, ?)
`
_, err := r.db.Exec(ctx, logQuery,
logID, inv.FunctionID, inv.ID, entry.Level, entry.Message, entry.Timestamp,
)
if err != nil {
r.logger.Warn("Failed to insert function log", zap.Error(err))
// Continue with other logs
}
}
}
return nil
}
// GetLogs retrieves logs for a function.
func (r *Registry) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) {
if limit <= 0 {
limit = 100
}
query := `
SELECT l.level, l.message, l.timestamp
FROM function_logs l
JOIN functions f ON l.function_id = f.id
WHERE f.namespace = ? AND f.name = ?
ORDER BY l.timestamp DESC
LIMIT ?
`
var results []struct {
Level string `db:"level"`
Message string `db:"message"`
Timestamp time.Time `db:"timestamp"`
}
if err := r.db.Query(ctx, &results, query, namespace, name, limit); err != nil {
return nil, fmt.Errorf("failed to query logs: %w", err)
}
logs := make([]LogEntry, len(results))
for i, res := range results {
logs[i] = LogEntry{
Level: res.Level,
Message: res.Message,
Timestamp: res.Timestamp,
}
}
return logs, nil
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Private helpers // Private helpers
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -362,8 +459,42 @@ func (r *Registry) getLatestVersion(ctx context.Context, namespace, name string)
return int(maxVersion.Int64), nil return int(maxVersion.Int64), nil
} }
// getByNameInternal retrieves a function by name regardless of status.
func (r *Registry) getByNameInternal(ctx context.Context, namespace, name string) (*Function, error) {
namespace = strings.TrimSpace(namespace)
name = strings.TrimSpace(name)
query := `
SELECT id, name, namespace, version, wasm_cid, source_cid,
memory_limit_mb, timeout_seconds, is_public,
retry_count, retry_delay_seconds, dlq_topic,
status, created_at, updated_at, created_by
FROM functions
WHERE namespace = ? AND name = ?
ORDER BY version DESC
LIMIT 1
`
var functions []functionRow
if err := r.db.Query(ctx, &functions, query, namespace, name); err != nil {
return nil, fmt.Errorf("failed to query function: %w", err)
}
if len(functions) == 0 {
return nil, ErrFunctionNotFound
}
return r.rowToFunction(&functions[0]), nil
}
// saveEnvVars saves environment variables for a function. // saveEnvVars saves environment variables for a function.
func (r *Registry) saveEnvVars(ctx context.Context, functionID string, envVars map[string]string) error { func (r *Registry) saveEnvVars(ctx context.Context, functionID string, envVars map[string]string) error {
// Clear existing env vars first
deleteQuery := `DELETE FROM function_env_vars WHERE function_id = ?`
if _, err := r.db.Exec(ctx, deleteQuery, functionID); err != nil {
return fmt.Errorf("failed to clear existing env vars: %w", err)
}
if len(envVars) == 0 { if len(envVars) == 0 {
return nil return nil
} }
@ -428,4 +559,3 @@ type envVarRow struct {
Key string `db:"key"` Key string `db:"key"`
Value string `db:"value"` Value string `db:"value"`
} }

View File

@ -22,7 +22,7 @@ func TestRegistry_RegisterAndGet(t *testing.T) {
} }
wasmBytes := []byte("mock wasm") wasmBytes := []byte("mock wasm")
err := registry.Register(ctx, fnDef, wasmBytes) _, err := registry.Register(ctx, fnDef, wasmBytes)
if err != nil { if err != nil {
t.Fatalf("Register failed: %v", err) t.Fatalf("Register failed: %v", err)
} }
@ -38,4 +38,3 @@ func TestRegistry_RegisterAndGet(t *testing.T) {
t.Errorf("expected 'mock wasm', got %q", string(wasm)) t.Errorf("expected 'mock wasm', got %q", string(wasm))
} }
} }

View File

@ -68,7 +68,8 @@ const (
// Responsible for CRUD operations on function definitions. // Responsible for CRUD operations on function definitions.
type FunctionRegistry interface { type FunctionRegistry interface {
// Register deploys a new function or updates an existing one. // Register deploys a new function or updates an existing one.
Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) error // Returns the old function definition if it was updated, or nil if it was a new registration.
Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error)
// Get retrieves a function by name and optional version. // Get retrieves a function by name and optional version.
// If version is 0, returns the latest version. // If version is 0, returns the latest version.
@ -82,6 +83,9 @@ type FunctionRegistry interface {
// GetWASMBytes retrieves the compiled WASM bytecode for a function. // GetWASMBytes retrieves the compiled WASM bytecode for a function.
GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error)
// GetLogs retrieves logs for a function.
GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error)
} }
// FunctionExecutor handles the actual execution of WASM functions. // FunctionExecutor handles the actual execution of WASM functions.