orama/core/pkg/serverless/invoke.go
anonpenguin23 17b06d38e4 fix(gateway,serverless): libp2p mesh peer-port + system-trigger auth bypass
Two serious bugs found via cross-node behavior observation:

1. libp2p peer-discovery published wrong port
   PeerDiscovery's multiaddr was using the gateway's HTTP API port (e.g.
   10004), not the actual libp2p TCP port. Remote gateways dialed that
   port, hit the HTTP server, received 400, and failed the libp2p
   multistream handshake ("message did not have trailing newline").
   Result: cluster-wide cross-node libp2p mesh had 0 connected peers
   and cross-node pubsub silently dropped 100% of messages.

   The libp2p port is OS-assigned at startup (client.go uses
   /ip4/0.0.0.0/tcp/0). It's not anywhere in cfg — it's only on
   host.Addrs(). Fix: drop the listenPort field from PeerDiscovery
   entirely and derive the port live from host.Addrs() via
   extractLibp2pTCPPort. WG IP still comes from getWireGuardIP
   (libp2p filters its own enumeration so WG IPs don't appear in
   host.Addrs(), but the listener is bound 0.0.0.0 so the port is
   reachable on the WG interface).

2. System triggers silently blocked by CanInvoke (#264)
   Cron, pubsub, database, timer, and job triggers all fire from
   gateway-internal state with no caller identity. Invoke() ran every
   request through CanInvoke(callerWallet) which returned false for
   the empty wallet — every fire returned ErrUnauthorized. Reported as
   a cron firing every minute with "unauthorized" for 19+ hours.

   Auth boundary for system triggers belongs at REGISTRATION time
   (POST /v1/functions/{name}/triggers, deploy-time auto-register
   from function.yaml). Skip the per-invocation check for system
   trigger types; user-driven triggers (HTTP, WebSocket) still gate
   on caller identity as before.

Tests:
- gateway/peer_discovery_test.go covers extractLibp2pTCPPort.
- serverless/invoke_system_trigger_test.go covers the bypass and the
  user-trigger gate.

VERSION bumped to 0.122.25.
2026-05-16 15:43:18 +03:00

522 lines
17 KiB
Go

package serverless
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
)
// FunctionInvoker is the minimal interface needed to invoke a function by
// name. It exists so packages downstream of `serverless` (notably
// `serverless/hostfunctions`) can hold a reference to the concrete
// `*Invoker` without creating an import cycle.
//
// Implemented by `*Invoker`.
type FunctionInvoker interface {
// Invoke runs the function identified by req and reports the outcome.
// See (*Invoker).Invoke for the validation, authorization and retry
// behavior of the concrete implementation.
Invoke(ctx context.Context, req *InvokeRequest) (*InvokeResponse, error)
}
// Invoker handles function invocation with retry logic and DLQ support.
// It wraps the Engine to provide higher-level invocation semantics.
//
// NewInvoker populates every field; none of the methods in this file
// reassign them afterwards.
type Invoker struct {
// engine executes functions (executeWithRetry) and holds the compiled
// module cache that InvalidateCache evicts from.
engine *Engine
// registry resolves functions by namespace/name/version. getEnvVars and
// getByID additionally need the concrete *Registry type and degrade when
// a different implementation is supplied.
registry FunctionRegistry
// hostServices is used to publish dead-letter messages (sendToDLQ).
hostServices HostServices
// logger receives warnings for failed attempts and DLQ outcomes.
logger *zap.Logger
}
// NewInvoker builds an Invoker around the supplied collaborators.
//
// The arguments are stored as given; nothing is validated or copied here, so
// the caller is responsible for passing usable (non-nil) dependencies.
func NewInvoker(engine *Engine, registry FunctionRegistry, hostServices HostServices, logger *zap.Logger) *Invoker {
	inv := new(Invoker)
	inv.engine, inv.registry = engine, registry
	inv.hostServices, inv.logger = hostServices, logger
	return inv
}
// InvokeRequest contains the parameters for invoking a function.
//
// Namespace and FunctionName are mandatory: Invoke rejects a request with
// either one empty. The remaining fields are copied into the
// InvocationContext handed to the engine.
type InvokeRequest struct {
// Namespace is the namespace that owns the function (required).
Namespace string `json:"namespace"`
// FunctionName is the registered name of the function (required).
FunctionName string `json:"function_name"`
// Version selects the function version passed to the registry lookup.
Version int `json:"version,omitempty"` // 0 = latest
// Input is the raw payload passed to the function unchanged.
Input []byte `json:"input"`
// TriggerType records what fired the invocation. Invoke skips the
// per-invocation authorization check for system trigger types.
TriggerType TriggerType `json:"trigger_type"`
// CallerWallet is the caller identity checked by CanInvoke for
// user-driven triggers; empty means an anonymous caller.
CallerWallet string `json:"caller_wallet,omitempty"`
// CallerIP is the source IP of the request, used by the multi-tier
// rate limiter as a fallback bucket for anonymous (no-wallet) callers.
CallerIP string `json:"caller_ip,omitempty"`
// WSClientID is forwarded to the InvocationContext as-is.
WSClientID string `json:"ws_client_id,omitempty"`
// CallerClaims holds custom JWT claims to expose via get_caller_claim.
CallerClaims map[string]string `json:"caller_claims,omitempty"`
// CallerJWTSubject carries the JWT `sub` claim explicitly so the
// engine can populate InvocationContext.CallerJWTSubject — fixes the
// bug-#215 case where API-key precedence buries the JWT identity.
CallerJWTSubject string `json:"caller_jwt_subject,omitempty"`
}
// InvokeResponse contains the result of a function invocation.
//
// A populated InvokeResponse can be returned together with a non-nil error,
// so callers can still read RequestID, Status and Error on failure.
type InvokeResponse struct {
// RequestID identifies this invocation. Invoke generates a fresh UUID;
// InvokeByID reuses the RequestID of the InvocationContext it ran with.
RequestID string `json:"request_id"`
// Output is the function's result; left unset when execution failed.
Output []byte `json:"output,omitempty"`
// Status is the outcome classification (success, error or timeout).
Status InvocationStatus `json:"status"`
// Error is the error's message text, or "unauthorized" when the caller
// failed the authorization check; empty on success.
Error string `json:"error,omitempty"`
// DurationMS is the elapsed wall-clock time in milliseconds.
DurationMS int64 `json:"duration_ms"`
// Retries is the count reported by executeWithRetry.
Retries int `json:"retries,omitempty"`
}
// Invoke resolves the function named by req, authorizes the caller when the
// trigger is user-driven, and executes the function with its retry policy.
//
// A nil req, or an empty FunctionName or Namespace, yields a *ValidationError
// and a nil response. Every later failure returns BOTH a populated response
// (RequestID, Status, Error, DurationMS) and a non-nil error:
//
//   - registry lookup failure: the registry's error, unchanged;
//   - authorization failure: ErrUnauthorized;
//   - execution failure: the last execution error, with Status set to
//     InvocationStatusTimeout when a deadline expired and to
//     InvocationStatusError otherwise.
//
// On success the response carries the function output and a nil error.
func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeResponse, error) {
	if req == nil {
		return nil, &ValidationError{Field: "request", Message: "cannot be nil"}
	}
	if req.FunctionName == "" {
		return nil, &ValidationError{Field: "function_name", Message: "cannot be empty"}
	}
	if req.Namespace == "" {
		return nil, &ValidationError{Field: "namespace", Message: "cannot be empty"}
	}

	requestID := uuid.New().String()
	startTime := time.Now()

	// Get function from registry
	fn, err := i.registry.Get(ctx, req.Namespace, req.FunctionName, req.Version)
	if err != nil {
		return &InvokeResponse{
			RequestID:  requestID,
			Status:     InvocationStatusError,
			Error:      err.Error(),
			DurationMS: time.Since(startTime).Milliseconds(),
		}, err
	}

	// Check authorization — ONLY for user-driven trigger types. System
	// triggers (cron, pubsub, database, timer, job) fire from rows the
	// gateway itself persisted on behalf of an already-authenticated
	// operator; there is no per-invocation caller identity to check, and
	// requiring one blocks every fire (see bugboard #264). The auth boundary
	// for system triggers is at REGISTRATION time (HTTP
	// `POST /v1/functions/{name}/triggers`, or deploy-time auto-register
	// from function.yaml), not at firing time.
	if !isSystemTrigger(req.TriggerType) {
		authorized, authErr := i.CanInvoke(ctx, req.Namespace, req.FunctionName, req.CallerWallet)
		if authErr != nil {
			// The caller only ever sees ErrUnauthorized, so this log line is
			// the sole trace that the check itself failed (as opposed to the
			// caller genuinely being denied).
			i.logger.Warn("Authorization check failed",
				zap.String("namespace", req.Namespace),
				zap.String("function", req.FunctionName),
				zap.String("request_id", requestID),
				zap.Error(authErr),
			)
		}
		if authErr != nil || !authorized {
			return &InvokeResponse{
				RequestID:  requestID,
				Status:     InvocationStatusError,
				Error:      "unauthorized",
				DurationMS: time.Since(startTime).Milliseconds(),
			}, ErrUnauthorized
		}
	}

	// Env vars are best-effort: a lookup failure is logged and the function
	// still runs, with an empty environment.
	envVars, err := i.getEnvVars(ctx, fn.ID)
	if err != nil {
		i.logger.Warn("Failed to get env vars", zap.Error(err))
		envVars = make(map[string]string)
	}

	// Build invocation context
	invCtx := &InvocationContext{
		RequestID:        requestID,
		FunctionID:       fn.ID,
		FunctionName:     fn.Name,
		Namespace:        fn.Namespace,
		CallerWallet:     req.CallerWallet,
		CallerIP:         req.CallerIP,
		TriggerType:      req.TriggerType,
		WSClientID:       req.WSClientID,
		EnvVars:          envVars,
		CallerClaims:     req.CallerClaims,
		CallerJWTSubject: req.CallerJWTSubject,
	}

	// Execute with retry logic
	output, retries, err := i.executeWithRetry(ctx, fn, req.Input, invCtx)

	response := &InvokeResponse{
		RequestID:  requestID,
		Output:     output,
		DurationMS: time.Since(startTime).Milliseconds(),
		Retries:    retries,
	}
	if err != nil {
		response.Status = InvocationStatusError
		response.Error = err.Error()
		// Classify as a timeout when either the caller's context or the
		// returned error chain reports an expired deadline. The second check
		// covers a deadline set on a derived context further down the stack
		// (NOTE(review): assumes the engine wraps such errors — confirm).
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(ctx.Err(), context.DeadlineExceeded) {
			response.Status = InvocationStatusTimeout
		}
		return response, err
	}

	response.Status = InvocationStatusSuccess
	return response, nil
}
// InvokeByID executes the function with the given ID using the function's
// retry policy.
//
// Unlike Invoke, no authorization check is performed here. When invCtx is
// nil, a context is synthesized with a fresh request ID, the function's
// identity and TriggerTypeHTTP; a caller-supplied invCtx is used unchanged.
//
// If the function cannot be resolved, the response is nil and the lookup
// error is returned. If execution fails, a populated response is returned
// together with the error; its Status is InvocationStatusTimeout when a
// deadline expired (matching Invoke) and InvocationStatusError otherwise.
func (i *Invoker) InvokeByID(ctx context.Context, functionID string, input []byte, invCtx *InvocationContext) (*InvokeResponse, error) {
	// Get function from registry by ID
	fn, err := i.getByID(ctx, functionID)
	if err != nil {
		return nil, err
	}

	if invCtx == nil {
		invCtx = &InvocationContext{
			RequestID:    uuid.New().String(),
			FunctionID:   fn.ID,
			FunctionName: fn.Name,
			Namespace:    fn.Namespace,
			TriggerType:  TriggerTypeHTTP,
		}
	}

	startTime := time.Now()
	output, retries, err := i.executeWithRetry(ctx, fn, input, invCtx)

	response := &InvokeResponse{
		RequestID:  invCtx.RequestID,
		Output:     output,
		DurationMS: time.Since(startTime).Milliseconds(),
		Retries:    retries,
	}
	if err != nil {
		response.Status = InvocationStatusError
		response.Error = err.Error()
		// Report expired deadlines as timeouts, the same way Invoke does,
		// so both entry points classify failures identically.
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(ctx.Err(), context.DeadlineExceeded) {
			response.Status = InvocationStatusTimeout
		}
		return response, err
	}

	response.Status = InvocationStatusSuccess
	return response, nil
}
// InvalidateCache removes a compiled module from the engine's cache.
//
// wasmCID identifies the module to evict; the call is forwarded to the
// engine's Invalidate method without further checks.
func (i *Invoker) InvalidateCache(wasmCID string) {
i.engine.Invalidate(wasmCID)
}
// executeWithRetry runs fn through the engine, retrying failed attempts
// according to fn.RetryCount and fn.RetryDelaySeconds, and publishes the
// final failure to fn.DLQTopic when one is configured.
//
// At most fn.RetryCount+1 attempts are made (always at least one). Retrying
// stops early when isRetryable rejects the error or when ctx is done.
//
// Returns the function output, a retry count and the error:
//   - success: the output and the number of retries that preceded it;
//   - failure after the attempts ended (exhausted or non-retryable): nil
//     output, the number of retries actually performed, and the last
//     execution error;
//   - ctx done before an attempt or during a backoff wait: nil output and
//     ctx.Err(); nothing is sent to the DLQ in this case.
func (i *Invoker) executeWithRetry(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, int, error) {
	maxAttempts := fn.RetryCount + 1 // Initial attempt + retries
	if maxAttempts < 1 {
		maxAttempts = 1
	}

	var (
		lastErr error
		retries int // retries performed so far == index of the latest attempt
	)
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// Check if context is cancelled
		if err := ctx.Err(); err != nil {
			return nil, attempt, err
		}

		retries = attempt
		output, err := i.engine.Execute(ctx, fn, input, invCtx)
		if err == nil {
			return output, attempt, nil
		}
		lastErr = err

		i.logger.Warn("Function execution failed",
			zap.String("function", fn.Name),
			zap.String("request_id", invCtx.RequestID),
			zap.Int("attempt", attempt+1),
			zap.Int("max_attempts", maxAttempts),
			zap.Error(lastErr),
		)

		// Stop on errors that will not improve with another attempt, and
		// don't wait after the last attempt.
		if !i.isRetryable(lastErr) || attempt == maxAttempts-1 {
			break
		}

		// An explicit timer (rather than time.After) lets us release it
		// immediately when the context ends the wait early.
		timer := time.NewTimer(i.calculateBackoff(fn.RetryDelaySeconds, attempt))
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, attempt + 1, ctx.Err()
		case <-timer.C:
			// Continue to next attempt
		}
	}

	// Attempts are over (exhausted or non-retryable) - send to DLQ if configured
	if fn.DLQTopic != "" {
		i.sendToDLQ(ctx, fn, input, invCtx, lastErr)
	}

	// Report the retries that really happened. Returning maxAttempts-1
	// unconditionally would overstate the count whenever a non-retryable
	// error ended the loop early.
	return nil, retries, lastErr
}
// isRetryable reports whether a failed execution is worth another attempt.
//
// Not retried:
//   - validation errors (*ValidationError anywhere in the error chain): the
//     same input fails the same way every time;
//   - not-found errors (IsNotFound);
//   - resource exhaustion such as rate limits or memory (IsResourceExhausted).
//
// Everything else is retried, including service-unavailable errors,
// execution errors (which may be transient) and errors of unknown kind.
func (i *Invoker) isRetryable(err error) bool {
	var validationErr *ValidationError
	if errors.As(err, &validationErr) {
		return false
	}
	if IsNotFound(err) || IsResourceExhausted(err) {
		return false
	}
	// Default to retryable for all remaining errors.
	return true
}
// calculateBackoff returns how long to wait before the retry that follows
// the given zero-based attempt.
//
// The delay is baseDelaySeconds * 2^attempt, never more than five minutes.
// A non-positive baseDelaySeconds falls back to 5 seconds. No jitter is
// applied: the result is deterministic for a given pair of arguments.
func (i *Invoker) calculateBackoff(baseDelaySeconds, attempt int) time.Duration {
	const maxBackoff = 5 * time.Minute

	if baseDelaySeconds <= 0 {
		baseDelaySeconds = 5
	}
	// Apply the ceiling before converting to a Duration. This keeps the cap
	// in force for the very first retry as well (so delays never shrink from
	// one attempt to the next) and rules out overflow in the multiplication
	// below for absurdly large configured values.
	if baseDelaySeconds >= int(maxBackoff/time.Second) {
		return maxBackoff
	}

	// Exponential backoff: delay * 2^attempt, stopping once the cap is hit.
	delay := time.Duration(baseDelaySeconds) * time.Second
	for j := 0; j < attempt && delay < maxBackoff; j++ {
		delay *= 2
	}
	if delay > maxBackoff {
		delay = maxBackoff
	}
	return delay
}
// sendToDLQ publishes a record of a failed invocation to the function's
// dead-letter topic (fn.DLQTopic).
//
// The record is a JSON-encoded DLQMessage describing the function, the
// request, the original input and the error text. This is best-effort:
// marshalling and publish failures are logged and otherwise ignored, and
// nothing is returned to the caller.
func (i *Invoker) sendToDLQ(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext, err error) {
	payload, marshalErr := json.Marshal(DLQMessage{
		FunctionID:   fn.ID,
		FunctionName: fn.Name,
		Namespace:    fn.Namespace,
		RequestID:    invCtx.RequestID,
		Input:        input,
		Error:        err.Error(),
		FailedAt:     time.Now(),
		TriggerType:  invCtx.TriggerType,
		CallerWallet: invCtx.CallerWallet,
	})
	if marshalErr != nil {
		i.logger.Error("Failed to marshal DLQ message",
			zap.Error(marshalErr),
			zap.String("function", fn.Name),
		)
		return
	}

	// Hand the encoded record to the pub/sub host service.
	publishErr := i.hostServices.PubSubPublish(ctx, fn.DLQTopic, payload)
	if publishErr != nil {
		i.logger.Error("Failed to send to DLQ",
			zap.Error(publishErr),
			zap.String("function", fn.Name),
			zap.String("dlq_topic", fn.DLQTopic),
		)
		return
	}

	i.logger.Info("Sent failed invocation to DLQ",
		zap.String("function", fn.Name),
		zap.String("dlq_topic", fn.DLQTopic),
		zap.String("request_id", invCtx.RequestID),
	)
}
// getEnvVars looks up the environment variables stored for functionID.
//
// The lookup is only available on the concrete *Registry. With any other
// FunctionRegistry implementation it returns a nil map and a nil error.
func (i *Invoker) getEnvVars(ctx context.Context, functionID string) (map[string]string, error) {
	concrete, isConcrete := i.registry.(*Registry)
	if !isConcrete {
		return nil, nil
	}
	return concrete.GetEnvVars(ctx, functionID)
}
// getByID resolves a function by its ID.
//
// ID lookup is only available on the concrete *Registry. With any other
// FunctionRegistry implementation it returns ErrFunctionNotFound.
func (i *Invoker) getByID(ctx context.Context, functionID string) (*Function, error) {
	concrete, isConcrete := i.registry.(*Registry)
	if !isConcrete {
		return nil, ErrFunctionNotFound
	}
	return concrete.GetByID(ctx, functionID)
}
// DLQMessage represents a message sent to the dead letter queue.
//
// sendToDLQ builds one per failed invocation and publishes it JSON-encoded
// to the function's DLQ topic.
type DLQMessage struct {
// FunctionID, FunctionName and Namespace identify the function that failed.
FunctionID string `json:"function_id"`
FunctionName string `json:"function_name"`
Namespace string `json:"namespace"`
// RequestID is the request ID of the failed invocation.
RequestID string `json:"request_id"`
// Input is the original payload; encoding/json renders a []byte as a
// base64 string.
Input []byte `json:"input"`
// Error is the message text of the error that ended the invocation.
Error string `json:"error"`
// FailedAt is the time at which the DLQ message was built.
FailedAt time.Time `json:"failed_at"`
// TriggerType and CallerWallet are copied from the invocation context.
TriggerType TriggerType `json:"trigger_type"`
CallerWallet string `json:"caller_wallet,omitempty"`
}
// -----------------------------------------------------------------------------
// Batch Invocation (for future use)
// -----------------------------------------------------------------------------
// BatchInvokeRequest contains parameters for batch invocation.
type BatchInvokeRequest struct {
// Requests are the individual invocations to run; BatchInvoke rejects an
// empty list.
Requests []*InvokeRequest `json:"requests"`
}
// BatchInvokeResponse contains results of batch invocation.
type BatchInvokeResponse struct {
// Responses holds one entry per request, at the same index as the
// request it answers.
Responses []*InvokeResponse `json:"responses"`
// Duration is the elapsed time for the whole batch. encoding/json renders
// a time.Duration as an integer number of nanoseconds.
Duration time.Duration `json:"duration"`
}
// BatchInvoke runs every request in req.Requests through Invoke, one after
// another in slice order, and collects the outcomes.
//
// A nil req or an empty request list yields a *ValidationError. Otherwise the
// returned error is always nil: per-request failures are reported through the
// matching entry in Responses. When Invoke fails without producing a response
// (for example on a validation error), a placeholder response carrying a
// fresh request ID, InvocationStatusError and the error text is recorded.
func (i *Invoker) BatchInvoke(ctx context.Context, req *BatchInvokeRequest) (*BatchInvokeResponse, error) {
	if req == nil || len(req.Requests) == 0 {
		return nil, &ValidationError{Field: "requests", Message: "cannot be empty"}
	}

	began := time.Now()
	results := make([]*InvokeResponse, len(req.Requests))

	// Sequential for now.
	// TODO: Implement parallel execution with goroutines and semaphore
	for pos := range req.Requests {
		result, invokeErr := i.Invoke(ctx, req.Requests[pos])
		if result == nil && invokeErr != nil {
			result = &InvokeResponse{
				RequestID: uuid.New().String(),
				Status:    InvocationStatusError,
				Error:     invokeErr.Error(),
			}
		}
		results[pos] = result
	}

	batch := &BatchInvokeResponse{
		Responses: results,
		Duration:  time.Since(began),
	}
	return batch, nil
}
// -----------------------------------------------------------------------------
// Public Invocation Helpers
// -----------------------------------------------------------------------------
// isSystemTrigger reports whether a trigger type fires from gateway-internal
// state (a cron row, a pubsub dispatcher, a DB-change watcher, an in-process
// scheduler) rather than from an external caller request.
//
// The distinction matters for authorization:
//
//   - User-driven triggers (HTTP, WebSocket) carry a real caller identity
//     populated by auth middleware. CanInvoke gates them on that identity.
//   - System triggers carry no caller identity by design — they were
//     registered by an already-authenticated operator, stored in the
//     namespace's own rqlite, and are now firing from the gateway process
//     itself. Gating them on CallerWallet returns false unconditionally and
//     silently blocks every fire (bugboard #264 — discovered via a cron
//     trigger that fired every minute with "unauthorized" for 19+ hours).
func isSystemTrigger(t TriggerType) bool {
	return t == TriggerTypeCron ||
		t == TriggerTypePubSub ||
		t == TriggerTypeDatabase ||
		t == TriggerTypeTimer ||
		t == TriggerTypeJob
}

// CanInvoke checks if a caller is authorized to invoke a function.
//
// It looks up the latest version of the function (version 0) and returns the
// registry's error, with false, if that lookup fails.
//
// Authorization model:
//   - Public functions (`is_public: true`): anyone can invoke, no auth needed.
//     The auth middleware lets unauthenticated requests through to public
//     paths.
//   - Private functions: any caller that the auth middleware has already
//     authenticated for THIS namespace can invoke. By the time we reach
//     this function, the request has passed authMiddleware which validates
//     EITHER a JWT-bearing token whose `namespace` claim matches the target
//     namespace OR an API key resolved to the target namespace. So a
//     non-empty (after trimming whitespace) `callerWallet` here is
//     sufficient evidence of a verified in-namespace caller.
//
// History (bug #215 follow-up): the previous logic was a stub —
//
//	return callerWallet == namespace || fn.CreatedBy == callerWallet, nil
//
// — which only allowed namespace-name-as-wallet (the API-key fallback) or
// the deploying wallet. Onboarding-style functions like `user-create`,
// where a brand-new wallet calls in to register, were rejected with 401
// because the new wallet was neither the namespace string nor the
// deployer. They worked only as a side-effect of JWT verification
// silently failing pre-#215, falling through to API-key auth, and
// callerWallet collapsing to the namespace string. Once JWT verify was
// fixed, the underlying flaw surfaced.
//
// Fine-grained per-function ACLs (group membership, roles) are deferred
// until there's a concrete tenant requirement. Today, "private" means
// "authenticated in-namespace caller required" and that's enforced
// here + at authMiddleware.
func (i *Invoker) CanInvoke(ctx context.Context, namespace, functionName string, callerWallet string) (bool, error) {
	target, lookupErr := i.registry.Get(ctx, namespace, functionName, 0)
	switch {
	case lookupErr != nil:
		return false, lookupErr
	case target.IsPublic:
		// Public: no caller identity required.
		return true, nil
	}

	// Private: namespace membership was already verified upstream, so all
	// that remains is to reject anonymous requests.
	hasIdentity := strings.TrimSpace(callerWallet) != ""
	return hasIdentity, nil
}
// GetFunctionInfo returns basic info about a function for invocation.
//
// It is a direct pass-through to the registry's Get with the given
// namespace, name and version, and returns whatever the registry returns.
// No authorization check is performed here.
func (i *Invoker) GetFunctionInfo(ctx context.Context, namespace, functionName string, version int) (*Function, error) {
return i.registry.Get(ctx, namespace, functionName, version)
}
// ValidateInput enforces an upper bound on the size of an invocation payload.
//
// A maxSize of zero or less disables the check. Otherwise an input longer
// than maxSize bytes is rejected with a *ValidationError on field "input";
// any input within the limit returns nil.
func (i *Invoker) ValidateInput(input []byte, maxSize int) error {
	if maxSize <= 0 {
		return nil
	}
	if size := len(input); size <= maxSize {
		return nil
	}
	tooLarge := &ValidationError{Field: "input"}
	tooLarge.Message = fmt.Sprintf("exceeds maximum size of %d bytes", maxSize)
	return tooLarge
}