diff --git a/bin/gateway b/bin/gateway new file mode 100755 index 0000000..579e5c0 Binary files /dev/null and b/bin/gateway differ diff --git a/bin/identity b/bin/identity new file mode 100755 index 0000000..387e60a Binary files /dev/null and b/bin/identity differ diff --git a/bin/orama b/bin/orama new file mode 100755 index 0000000..d022fb3 Binary files /dev/null and b/bin/orama differ diff --git a/bin/orama-node b/bin/orama-node new file mode 100755 index 0000000..462f57e Binary files /dev/null and b/bin/orama-node differ diff --git a/core/pkg/cli/functions/helpers.go b/core/pkg/cli/functions/helpers.go index f0baf84..41a2b79 100644 --- a/core/pkg/cli/functions/helpers.go +++ b/core/pkg/cli/functions/helpers.go @@ -24,6 +24,14 @@ type FunctionConfig struct { Timeout int `yaml:"timeout"` Retry RetryConfig `yaml:"retry"` Env map[string]string `yaml:"env"` + + // Persistent WebSocket settings — when WSPersistent is true, the function + // must export ws_open / ws_frame / ws_close instead of running per-frame + // stateless. See core/plans/platform/06_PERSISTENT_WS_FUNCTIONS.md. + WSPersistent bool `yaml:"ws_persistent"` + WSIdleTimeoutSec int `yaml:"ws_idle_timeout_sec"` + WSMaxFrameBytes int `yaml:"ws_max_frame_bytes"` + WSMaxInflightPerConn int `yaml:"ws_max_inflight_per_conn"` } // RetryConfig holds retry settings. @@ -198,11 +206,28 @@ func uploadWASMFunction(wasmPath string, cfg *FunctionConfig) (map[string]interf writer.WriteField("retry_count", strconv.Itoa(cfg.Retry.Count)) writer.WriteField("retry_delay_seconds", strconv.Itoa(cfg.Retry.Delay)) - // Add env vars as metadata JSON + // Build metadata JSON. The deploy handler json.Unmarshal()s this into + // FunctionDefinition first, then overlays the explicit form fields below. + // Any field that has no explicit form-field equivalent (env vars, the + // ws_* persistent settings) MUST live in this blob. + metaObj := map[string]interface{}{} if len(cfg.Env) > 0 { - metadata, _ := json.Marshal(map[string]interface{}{ - "env_vars": cfg.Env, - }) + metaObj["env_vars"] = cfg.Env + } + if cfg.WSPersistent { + metaObj["ws_persistent"] = true + } + if cfg.WSIdleTimeoutSec > 0 { + metaObj["ws_idle_timeout_sec"] = cfg.WSIdleTimeoutSec + } + if cfg.WSMaxFrameBytes > 0 { + metaObj["ws_max_frame_bytes"] = cfg.WSMaxFrameBytes + } + if cfg.WSMaxInflightPerConn > 0 { + metaObj["ws_max_inflight_per_conn"] = cfg.WSMaxInflightPerConn + } + if len(metaObj) > 0 { + metadata, _ := json.Marshal(metaObj) writer.WriteField("metadata", string(metadata)) } diff --git a/core/pkg/cli/functions/triggers.go b/core/pkg/cli/functions/triggers.go index 3b56e7f..4c4f842 100644 --- a/core/pkg/cli/functions/triggers.go +++ b/core/pkg/cli/functions/triggers.go @@ -10,30 +10,42 @@ import ( "github.com/spf13/cobra" ) -var triggerTopic string +var ( + triggerTopic string + triggerSchedule string +) // TriggersCmd is the parent command for trigger management. var TriggersCmd = &cobra.Command{ Use: "triggers", - Short: "Manage function PubSub triggers", - Long: `Add, list, and delete PubSub triggers for your serverless functions. + Short: "Manage function PubSub and cron triggers", + Long: `Add, list, and delete triggers for your serverless functions. -When a message is published to a topic, all functions with a trigger on -that topic are automatically invoked with the message as input. +PubSub: when a message is published to a topic, every function with a +matching trigger is invoked with the message as input. + +Cron: a function is invoked on a schedule (5-field crontab, or 6-field +crontab with a leading seconds column). Examples: orama function triggers add my-function --topic calls:invite + orama function triggers add my-function --schedule "0 3 * * *" + orama function triggers add my-function --schedule "*/30 * * * * *" orama function triggers list my-function orama function triggers delete my-function `, } -// TriggersAddCmd adds a PubSub trigger to a function. +// TriggersAddCmd adds a PubSub or Cron trigger to a function. var TriggersAddCmd = &cobra.Command{ Use: "add ", - Short: "Add a PubSub trigger", - Long: "Registers a PubSub trigger so the function is invoked when a message is published to the topic.", - Args: cobra.ExactArgs(1), - RunE: runTriggersAdd, + Short: "Add a PubSub or Cron trigger", + Long: `Registers a trigger that invokes the function automatically. + +Pass exactly one of --topic (PubSub) or --schedule (cron). Schedules +accept either 5-field crontab (minute hour dom month dow) or 6-field +with seconds (sec minute hour dom month dow).`, + Args: cobra.ExactArgs(1), + RunE: runTriggersAdd, } // TriggersListCmd lists triggers for a function. @@ -57,15 +69,18 @@ func init() { TriggersCmd.AddCommand(TriggersListCmd) TriggersCmd.AddCommand(TriggersDeleteCmd) - TriggersAddCmd.Flags().StringVar(&triggerTopic, "topic", "", "PubSub topic to trigger on (required)") - TriggersAddCmd.MarkFlagRequired("topic") + TriggersAddCmd.Flags().StringVar(&triggerTopic, "topic", "", "PubSub topic to trigger on") + TriggersAddCmd.Flags().StringVar(&triggerSchedule, "schedule", "", "Cron expression to trigger on (e.g. \"0 3 * * *\")") + TriggersAddCmd.MarkFlagsMutuallyExclusive("topic", "schedule") + TriggersAddCmd.MarkFlagsOneRequired("topic", "schedule") } func runTriggersAdd(cmd *cobra.Command, args []string) error { funcName := args[0] body, _ := json.Marshal(map[string]string{ - "topic": triggerTopic, + "topic": triggerTopic, + "cron_expression": triggerSchedule, }) resp, err := apiRequest("POST", "/v1/functions/"+funcName+"/triggers", bytes.NewReader(body), "application/json") @@ -88,7 +103,11 @@ func runTriggersAdd(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to parse response: %w", err) } - fmt.Printf("Trigger added: %s → %s (id: %s)\n", triggerTopic, funcName, result["trigger_id"]) + if triggerSchedule != "" { + fmt.Printf("Trigger added: cron(%s) → %s (id: %s)\n", triggerSchedule, funcName, result["trigger_id"]) + } else { + fmt.Printf("Trigger added: %s → %s (id: %s)\n", triggerTopic, funcName, result["trigger_id"]) + } return nil } diff --git a/core/pkg/gateway/dependencies.go b/core/pkg/gateway/dependencies.go index e1e7717..e1e8221 100644 --- a/core/pkg/gateway/dependencies.go +++ b/core/pkg/gateway/dependencies.go @@ -68,6 +68,12 @@ type Dependencies struct { // PubSub trigger dispatcher (used to wire into PubSubHandlers) PubSubDispatcher *triggers.PubSubDispatcher + // Cron trigger store + scheduler. The scheduler is started by gateway + // lifecycle code after Dependencies is constructed; Stop is called + // during shutdown. + CronTriggerStore *triggers.CronTriggerStore + CronScheduler *triggers.CronScheduler + // PersistentWSManager tracks long-lived WS function instances. // Used by the WS handler when fn.WSPersistent=true; nil = disabled. PersistentWSManager *persistent.Manager @@ -496,6 +502,11 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe // Create invoker deps.ServerlessInvoker = serverless.NewInvoker(engine, registry, hostFuncs, logger.Logger) + // Wire the invoker back into hostFuncs so the function_invoke host + // function can dispatch sub-invocations from inside a WASM function + // (e.g. rpc-router routing client RPCs to per-op handlers). + hostFuncs.SetInvoker(deps.ServerlessInvoker) + // Create PubSub trigger store and dispatcher triggerStore := triggers.NewPubSubTriggerStore(deps.ORMClient, logger.Logger) @@ -510,6 +521,19 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe logger.Logger, ) + // Cron trigger store + scheduler. The scheduler polls + // function_cron_triggers and invokes due rows via the same + // ServerlessInvoker used for PubSub triggers; the ↓ Start call wires + // the goroutine up — Stop is invoked from gateway lifecycle shutdown. + cronStore := triggers.NewCronTriggerStore(deps.ORMClient, logger.Logger) + deps.CronTriggerStore = cronStore + deps.CronScheduler = triggers.NewCronScheduler( + cronStore, + deps.ServerlessInvoker, + logger.Logger, + 30*time.Second, + ) + // Persistent WS instance manager. Cap from gateway config (TODO: surface // the knob); 5000 is a sensible default per plan 06. deps.PersistentWSManager = persistent.NewManager(5000, logger.Logger) @@ -521,6 +545,7 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe registry, deps.ServerlessWSMgr, triggerStore, + cronStore, deps.PubSubDispatcher, deps.PersistentWSManager, deps.WSBridge, diff --git a/core/pkg/gateway/gateway.go b/core/pkg/gateway/gateway.go index 8e04a53..50995db 100644 --- a/core/pkg/gateway/gateway.go +++ b/core/pkg/gateway/gateway.go @@ -96,6 +96,7 @@ type Gateway struct { serverlessHandlers *serverlesshandlers.ServerlessHandlers pubsubDispatcher *triggers.PubSubDispatcher persistentWSManager *persistent.Manager + cronScheduler *triggers.CronScheduler // Authentication service authService *auth.Service @@ -356,6 +357,11 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { if deps.PersistentWSManager != nil { gw.persistentWSManager = deps.PersistentWSManager } + if deps.CronScheduler != nil { + gw.cronScheduler = deps.CronScheduler + // Background goroutine — Stop is called from gateway.Close. + gw.cronScheduler.Start(context.Background()) + } // Push notification handlers — disabled when no provider is configured. // The handlers themselves return 503 if dispatcher/store is nil; we diff --git a/core/pkg/gateway/handlers/serverless/handlers_test.go b/core/pkg/gateway/handlers/serverless/handlers_test.go index 907f2d4..ecc9077 100644 --- a/core/pkg/gateway/handlers/serverless/handlers_test.go +++ b/core/pkg/gateway/handlers/serverless/handlers_test.go @@ -94,6 +94,7 @@ func newTestHandlers(reg serverless.FunctionRegistry) *ServerlessHandlers { reg, wsManager, nil, // triggerStore + nil, // cronStore nil, // dispatcher nil, // persistentMgr nil, // wsBridge diff --git a/core/pkg/gateway/handlers/serverless/secrets_handler_test.go b/core/pkg/gateway/handlers/serverless/secrets_handler_test.go index b086181..e3c7f8e 100644 --- a/core/pkg/gateway/handlers/serverless/secrets_handler_test.go +++ b/core/pkg/gateway/handlers/serverless/secrets_handler_test.go @@ -96,6 +96,7 @@ func newSecretsTestHandlers(sm serverless.SecretsManager) *ServerlessHandlers { newMockRegistry(), wsManager, nil, // triggerStore + nil, // cronStore nil, // dispatcher nil, // persistentMgr nil, // wsBridge diff --git a/core/pkg/gateway/handlers/serverless/trigger_handler.go b/core/pkg/gateway/handlers/serverless/trigger_handler.go index 8832866..7e6094f 100644 --- a/core/pkg/gateway/handlers/serverless/trigger_handler.go +++ b/core/pkg/gateway/handlers/serverless/trigger_handler.go @@ -10,19 +10,17 @@ import ( "go.uber.org/zap" ) -// addTriggerRequest is the request body for adding a PubSub trigger. +// addTriggerRequest is the request body for adding a PubSub or Cron trigger. +// Exactly one of `topic` or `cron_expression` must be set. type addTriggerRequest struct { - Topic string `json:"topic"` + Topic string `json:"topic"` + CronExpression string `json:"cron_expression"` } // HandleAddTrigger handles POST /v1/functions/{name}/triggers -// Adds a PubSub trigger that invokes this function when a message is published to the topic. +// Branches between PubSub (topic) and Cron (cron_expression) based on the +// request body. Both stores must be wired for their respective branches. func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Request, functionName string) { - if h.triggerStore == nil { - writeError(w, http.StatusNotImplemented, "PubSub triggers not available") - return - } - namespace := h.getNamespaceFromRequest(r) if namespace == "" { writeError(w, http.StatusBadRequest, "namespace required") @@ -35,15 +33,18 @@ func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Req return } - if req.Topic == "" { - writeError(w, http.StatusBadRequest, "topic required") + if req.Topic == "" && req.CronExpression == "" { + writeError(w, http.StatusBadRequest, "topic or cron_expression required") + return + } + if req.Topic != "" && req.CronExpression != "" { + writeError(w, http.StatusBadRequest, "topic and cron_expression are mutually exclusive") return } ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) defer cancel() - // Look up function to get its ID fn, err := h.registry.Get(ctx, namespace, functionName, 0) if err != nil { if serverless.IsNotFound(err) { @@ -54,6 +55,38 @@ func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Req return } + if req.CronExpression != "" { + if h.cronStore == nil { + writeError(w, http.StatusNotImplemented, "Cron triggers not available") + return + } + triggerID, err := h.cronStore.Add(ctx, fn.ID, req.CronExpression) + if err != nil { + h.logger.Error("Failed to add Cron trigger", + zap.String("function", functionName), + zap.String("cron_expression", req.CronExpression), + zap.Error(err), + ) + writeError(w, http.StatusBadRequest, "Failed to add trigger: "+err.Error()) + return + } + h.logger.Info("Cron trigger added via API", + zap.String("function", functionName), + zap.String("cron_expression", req.CronExpression), + zap.String("trigger_id", triggerID), + ) + writeJSON(w, http.StatusCreated, map[string]interface{}{ + "trigger_id": triggerID, + "function": functionName, + "cron_expression": req.CronExpression, + }) + return + } + + if h.triggerStore == nil { + writeError(w, http.StatusNotImplemented, "PubSub triggers not available") + return + } triggerID, err := h.triggerStore.Add(ctx, fn.ID, req.Topic) if err != nil { h.logger.Error("Failed to add PubSub trigger", @@ -64,18 +97,14 @@ func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Req writeError(w, http.StatusInternalServerError, "Failed to add trigger: "+err.Error()) return } - - // Invalidate cache for this topic if h.dispatcher != nil { h.dispatcher.InvalidateCache(ctx, namespace, req.Topic) } - h.logger.Info("PubSub trigger added via API", zap.String("function", functionName), zap.String("topic", req.Topic), zap.String("trigger_id", triggerID), ) - writeJSON(w, http.StatusCreated, map[string]interface{}{ "trigger_id": triggerID, "function": functionName, @@ -84,10 +113,12 @@ func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Req } // HandleListTriggers handles GET /v1/functions/{name}/triggers -// Lists all PubSub triggers for a function. +// Returns the merged set of PubSub and Cron triggers for a function. +// Each row carries enough metadata for the CLI's `triggers list` to render +// it; the kind is implied by which fields are populated (Topic vs CronExpression). func (h *ServerlessHandlers) HandleListTriggers(w http.ResponseWriter, r *http.Request, functionName string) { - if h.triggerStore == nil { - writeError(w, http.StatusNotImplemented, "PubSub triggers not available") + if h.triggerStore == nil && h.cronStore == nil { + writeError(w, http.StatusNotImplemented, "Triggers not available") return } @@ -100,7 +131,6 @@ func (h *ServerlessHandlers) HandleListTriggers(w http.ResponseWriter, r *http.R ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) defer cancel() - // Look up function to get its ID fn, err := h.registry.Get(ctx, namespace, functionName, 0) if err != nil { if serverless.IsNotFound(err) { @@ -111,23 +141,53 @@ func (h *ServerlessHandlers) HandleListTriggers(w http.ResponseWriter, r *http.R return } - triggers, err := h.triggerStore.ListByFunction(ctx, fn.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, "Failed to list triggers") - return + merged := []map[string]interface{}{} + if h.triggerStore != nil { + pubsubTriggers, err := h.triggerStore.ListByFunction(ctx, fn.ID) + if err != nil { + writeError(w, http.StatusInternalServerError, "Failed to list pubsub triggers") + return + } + for _, t := range pubsubTriggers { + merged = append(merged, map[string]interface{}{ + "id": t.ID, + "kind": "pubsub", + "topic": t.Topic, + "enabled": t.Enabled, + }) + } + } + if h.cronStore != nil { + cronTriggers, err := h.cronStore.ListByFunction(ctx, fn.ID) + if err != nil { + writeError(w, http.StatusInternalServerError, "Failed to list cron triggers") + return + } + for _, t := range cronTriggers { + merged = append(merged, map[string]interface{}{ + "id": t.ID, + "kind": "cron", + "cron_expression": t.CronExpression, + "next_run_at": t.NextRunAt, + "last_run_at": t.LastRunAt, + "enabled": t.Enabled, + }) + } } writeJSON(w, http.StatusOK, map[string]interface{}{ - "triggers": triggers, - "count": len(triggers), + "triggers": merged, + "count": len(merged), }) } // HandleDeleteTrigger handles DELETE /v1/functions/{name}/triggers/{triggerID} -// Removes a PubSub trigger. +// Removes either a PubSub or Cron trigger. Tries PubSub first (the more +// common case) and falls back to Cron — trigger IDs are UUIDs and can't +// collide between stores, so order is just an optimisation. func (h *ServerlessHandlers) HandleDeleteTrigger(w http.ResponseWriter, r *http.Request, functionName, triggerID string) { - if h.triggerStore == nil { - writeError(w, http.StatusNotImplemented, "PubSub triggers not available") + if h.triggerStore == nil && h.cronStore == nil { + writeError(w, http.StatusNotImplemented, "Triggers not available") return } @@ -140,7 +200,6 @@ func (h *ServerlessHandlers) HandleDeleteTrigger(w http.ResponseWriter, r *http. ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) defer cancel() - // Look up the trigger's topic before deleting (for cache invalidation) fn, err := h.registry.Get(ctx, namespace, functionName, 0) if err != nil { if serverless.IsNotFound(err) { @@ -151,38 +210,47 @@ func (h *ServerlessHandlers) HandleDeleteTrigger(w http.ResponseWriter, r *http. return } - // Get current triggers to find the topic for cache invalidation - triggers, err := h.triggerStore.ListByFunction(ctx, fn.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, "Failed to look up triggers") - return - } - - // Find the topic for the trigger being deleted + // Walk the PubSub list first to capture the topic for cache invalidation. var triggerTopic string - for _, t := range triggers { - if t.ID == triggerID { - triggerTopic = t.Topic - break + if h.triggerStore != nil { + triggers, listErr := h.triggerStore.ListByFunction(ctx, fn.ID) + if listErr == nil { + for _, t := range triggers { + if t.ID == triggerID { + triggerTopic = t.Topic + break + } + } } } - if err := h.triggerStore.Remove(ctx, triggerID); err != nil { - writeError(w, http.StatusInternalServerError, "Failed to remove trigger: "+err.Error()) + if triggerTopic != "" { + if err := h.triggerStore.Remove(ctx, triggerID); err != nil { + writeError(w, http.StatusInternalServerError, "Failed to remove trigger: "+err.Error()) + return + } + if h.dispatcher != nil { + h.dispatcher.InvalidateCache(ctx, namespace, triggerTopic) + } + h.logger.Info("PubSub trigger removed via API", + zap.String("function", functionName), + zap.String("trigger_id", triggerID), + ) + writeJSON(w, http.StatusOK, map[string]interface{}{"message": "Trigger removed"}) return } - // Invalidate cache for the topic - if h.dispatcher != nil && triggerTopic != "" { - h.dispatcher.InvalidateCache(ctx, namespace, triggerTopic) + // Not a PubSub trigger — try cron. + if h.cronStore != nil { + if err := h.cronStore.Remove(ctx, triggerID); err == nil { + h.logger.Info("Cron trigger removed via API", + zap.String("function", functionName), + zap.String("trigger_id", triggerID), + ) + writeJSON(w, http.StatusOK, map[string]interface{}{"message": "Trigger removed"}) + return + } } - h.logger.Info("PubSub trigger removed via API", - zap.String("function", functionName), - zap.String("trigger_id", triggerID), - ) - - writeJSON(w, http.StatusOK, map[string]interface{}{ - "message": "Trigger removed", - }) + writeError(w, http.StatusNotFound, "Trigger not found") } diff --git a/core/pkg/gateway/handlers/serverless/types.go b/core/pkg/gateway/handlers/serverless/types.go index 8d79ab3..bc0a947 100644 --- a/core/pkg/gateway/handlers/serverless/types.go +++ b/core/pkg/gateway/handlers/serverless/types.go @@ -20,6 +20,7 @@ type ServerlessHandlers struct { registry serverless.FunctionRegistry wsManager *serverless.WSManager triggerStore *triggers.PubSubTriggerStore + cronStore *triggers.CronTriggerStore // optional; nil = cron triggers unavailable dispatcher *triggers.PubSubDispatcher persistentMgr *persistent.Manager // optional; when nil persistent WS rejects 503 wsBridge *wsbridge.Bridge // optional; nil = no client→ns registration @@ -39,6 +40,7 @@ func NewServerlessHandlers( registry serverless.FunctionRegistry, wsManager *serverless.WSManager, triggerStore *triggers.PubSubTriggerStore, + cronStore *triggers.CronTriggerStore, dispatcher *triggers.PubSubDispatcher, persistentMgr *persistent.Manager, wsBridge *wsbridge.Bridge, @@ -51,6 +53,7 @@ func NewServerlessHandlers( registry: registry, wsManager: wsManager, triggerStore: triggerStore, + cronStore: cronStore, dispatcher: dispatcher, persistentMgr: persistentMgr, wsBridge: wsBridge, diff --git a/core/pkg/gateway/lifecycle.go b/core/pkg/gateway/lifecycle.go index 000ba30..9380a16 100644 --- a/core/pkg/gateway/lifecycle.go +++ b/core/pkg/gateway/lifecycle.go @@ -29,6 +29,13 @@ func (g *Gateway) Close() { } } + // Stop the cron scheduler before tearing down the engine — pending + // invocations call back into the invoker which still needs the engine + // to be alive. + if g.cronScheduler != nil { + g.cronScheduler.Stop() + } + // Drain persistent WebSocket instances. Each instance gets a slice of // the 30s budget; ws_close on each is best-effort. if g.persistentWSManager != nil { diff --git a/core/pkg/gateway/serverless_handlers_test.go b/core/pkg/gateway/serverless_handlers_test.go index 9c0c523..a872c7b 100644 --- a/core/pkg/gateway/serverless_handlers_test.go +++ b/core/pkg/gateway/serverless_handlers_test.go @@ -50,7 +50,7 @@ func TestServerlessHandlers_ListFunctions(t *testing.T) { }, } - h := serverlesshandlers.NewServerlessHandlers(nil, nil, registry, nil, nil, nil, nil, nil, nil, logger) + h := serverlesshandlers.NewServerlessHandlers(nil, nil, registry, nil, nil, nil, nil, nil, nil, nil, logger) req, _ := http.NewRequest("GET", "/v1/functions?namespace=ns1", nil) rr := httptest.NewRecorder() @@ -73,7 +73,7 @@ func TestServerlessHandlers_DeployFunction(t *testing.T) { logger := zap.NewNop() registry := &mockFunctionRegistry{} - h := serverlesshandlers.NewServerlessHandlers(nil, nil, registry, nil, nil, nil, nil, nil, nil, logger) + h := serverlesshandlers.NewServerlessHandlers(nil, nil, registry, nil, nil, nil, nil, nil, nil, nil, logger) // Test JSON deploy (which is partially supported according to code) // Should be 400 because WASM is missing or base64 not supported diff --git a/core/pkg/serverless/engine.go b/core/pkg/serverless/engine.go index 1a35f12..69b2a85 100644 --- a/core/pkg/serverless/engine.go +++ b/core/pkg/serverless/engine.go @@ -423,6 +423,7 @@ func (e *Engine) registerHostModule(ctx context.Context) error { NewFunctionBuilder().WithFunc(e.hWSPubSubUnbridge).Export("ws_pubsub_unbridge"). NewFunctionBuilder().WithFunc(e.hWSSend).Export("ws_send"). NewFunctionBuilder().WithFunc(e.hWSBroadcast).Export("ws_broadcast"). + NewFunctionBuilder().WithFunc(e.hFunctionInvoke).Export("function_invoke"). NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info"). NewFunctionBuilder().WithFunc(e.hLogError).Export("log_error"). Instantiate(ctx) @@ -743,6 +744,39 @@ func (e *Engine) hWSPubSubUnbridge(ctx context.Context, mod api.Module, return 1 } +// hFunctionInvoke is the WASM-callable wrapper for FunctionInvoke. Used by +// the rpc-router persistent function (and any future dispatcher) to run +// another function in the same namespace synchronously and forward its +// output back to its caller. +// +// Inputs: +// +// namePtr/nameLen — UTF-8 target function name +// payloadPtr/payloadLen — raw input bytes for the target function +// +// Returns a packed uint64 (ptr<<32 | len) pointing to the target's output +// bytes in guest memory, or 0 on error. The caller is expected to JSON- +// decode the output (target functions ack with JSON envelopes). +func (e *Engine) hFunctionInvoke(ctx context.Context, mod api.Module, + namePtr, nameLen, payloadPtr, payloadLen uint32) uint64 { + name, ok := e.executor.ReadFromGuest(mod, namePtr, nameLen) + if !ok { + return 0 + } + payload, ok := e.executor.ReadFromGuest(mod, payloadPtr, payloadLen) + if !ok { + return 0 + } + out, err := e.hostServices.FunctionInvoke(ctx, string(name), payload) + if err != nil { + e.logger.Warn("function_invoke failed", + zap.String("name", string(name)), + zap.Error(err)) + return 0 + } + return e.executor.WriteToGuest(ctx, mod, out) +} + // hWSSend is the WASM-callable wrapper for WSSend. // Inputs: clientID + raw frame bytes. clientID may be empty — in that case // the host falls back to the current invocation's WS client (if any). diff --git a/core/pkg/serverless/errors.go b/core/pkg/serverless/errors.go index 135dd6a..c06dd5f 100644 --- a/core/pkg/serverless/errors.go +++ b/core/pkg/serverless/errors.go @@ -55,6 +55,11 @@ var ( // ErrWSNotAvailable is returned when WebSocket operations are used outside WS context. ErrWSNotAvailable = errors.New("websocket operations not available in this context") + // ErrFunctionInvokeNotAvailable is returned when FunctionInvoke is called + // but no invoker has been wired into the host-services bag (e.g. unit + // tests, or before the gateway finishes bootstrap). + ErrFunctionInvokeNotAvailable = errors.New("function_invoke not available in this context") + // ErrWSClientNotFound is returned when a WebSocket client is not connected. ErrWSClientNotFound = errors.New("websocket client not found") diff --git a/core/pkg/serverless/hostfuncs_test.go b/core/pkg/serverless/hostfuncs_test.go index d35166a..1fa4421 100644 --- a/core/pkg/serverless/hostfuncs_test.go +++ b/core/pkg/serverless/hostfuncs_test.go @@ -122,6 +122,10 @@ func (m *mockHostServices) WSBroadcast(ctx context.Context, topic string, data [ return nil } +func (m *mockHostServices) FunctionInvoke(ctx context.Context, name string, payload []byte) ([]byte, error) { + return nil, nil +} + func (m *mockHostServices) HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error) { return nil, nil } diff --git a/core/pkg/serverless/hostfunctions/context.go b/core/pkg/serverless/hostfunctions/context.go index af7fd29..85e336c 100644 --- a/core/pkg/serverless/hostfunctions/context.go +++ b/core/pkg/serverless/hostfunctions/context.go @@ -31,6 +31,59 @@ func (h *HostFunctions) ClearContext() { h.invCtx = nil } +// SetInvoker wires the function invoker used by FunctionInvoke. Must be +// called once after both HostFunctions and Invoker exist (Invoker depends +// on HostServices, so the cycle is broken via this setter rather than a +// constructor argument). +func (h *HostFunctions) SetInvoker(inv serverless.FunctionInvoker) { + h.invokerLock.Lock() + defer h.invokerLock.Unlock() + h.invoker = inv +} + +// FunctionInvoke synchronously runs another function in the same namespace +// and returns its output bytes. Caller wallet, JWT claims, and WS client +// ID are inherited from the current invocation so the inner function sees +// the same authenticated identity. Returns ErrFunctionInvokeNotAvailable +// when no invoker has been wired (e.g. tests). +func (h *HostFunctions) FunctionInvoke(ctx context.Context, name string, payload []byte) ([]byte, error) { + h.invokerLock.RLock() + inv := h.invoker + h.invokerLock.RUnlock() + if inv == nil { + return nil, &serverless.HostFunctionError{ + Function: "function_invoke", + Cause: serverless.ErrFunctionInvokeNotAvailable, + } + } + + h.invCtxLock.RLock() + cur := h.invCtx + h.invCtxLock.RUnlock() + if cur == nil { + return nil, &serverless.HostFunctionError{ + Function: "function_invoke", + Cause: serverless.ErrFunctionInvokeNotAvailable, + } + } + + req := &serverless.InvokeRequest{ + Namespace: cur.Namespace, + FunctionName: name, + Input: payload, + TriggerType: serverless.TriggerTypeWebSocket, + CallerWallet: cur.CallerWallet, + CallerIP: cur.CallerIP, + WSClientID: cur.WSClientID, + CallerClaims: cur.CallerClaims, + } + resp, err := inv.Invoke(ctx, req) + if err != nil { + return nil, &serverless.HostFunctionError{Function: "function_invoke", Cause: err} + } + return resp.Output, nil +} + // GetEnv retrieves an environment variable for the function. func (h *HostFunctions) GetEnv(ctx context.Context, key string) (string, error) { h.invCtxLock.RLock() diff --git a/core/pkg/serverless/hostfunctions/types.go b/core/pkg/serverless/hostfunctions/types.go index ecb0ba8..12ccaa4 100644 --- a/core/pkg/serverless/hostfunctions/types.go +++ b/core/pkg/serverless/hostfunctions/types.go @@ -43,6 +43,12 @@ type HostFunctions struct { // — bridging is a deliberate request whose absence should be visible. wsBridge *wsbridge.Bridge + // invoker is set after construction (via SetInvoker) to break the + // engine ↔ host-functions circular dep. nil means FunctionInvoke + // returns ErrFunctionInvokeNotAvailable. + invoker serverless.FunctionInvoker + invokerLock sync.RWMutex + // Current invocation context (set per-execution) invCtx *serverless.InvocationContext invCtxLock sync.RWMutex diff --git a/core/pkg/serverless/invoke.go b/core/pkg/serverless/invoke.go index 9d3c738..6d9a952 100644 --- a/core/pkg/serverless/invoke.go +++ b/core/pkg/serverless/invoke.go @@ -11,6 +11,16 @@ import ( "go.uber.org/zap" ) +// FunctionInvoker is the minimal interface needed to invoke a function by +// name. It exists so packages downstream of `serverless` (notably +// `serverless/hostfunctions`) can hold a reference to the concrete +// `*Invoker` without creating an import cycle. +// +// Implemented by `*Invoker`. +type FunctionInvoker interface { + Invoke(ctx context.Context, req *InvokeRequest) (*InvokeResponse, error) +} + // Invoker handles function invocation with retry logic and DLQ support. // It wraps the Engine to provide higher-level invocation semantics. type Invoker struct { diff --git a/core/pkg/serverless/mocks_test.go b/core/pkg/serverless/mocks_test.go index e4ffcf4..dbfdb53 100644 --- a/core/pkg/serverless/mocks_test.go +++ b/core/pkg/serverless/mocks_test.go @@ -208,6 +208,10 @@ func (m *MockHostServices) WSBroadcast(ctx context.Context, topic string, data [ return nil } +func (m *MockHostServices) FunctionInvoke(ctx context.Context, name string, payload []byte) ([]byte, error) { + return nil, nil +} + func (m *MockHostServices) HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error) { return nil, nil } diff --git a/core/pkg/serverless/triggers/cron_parser.go b/core/pkg/serverless/triggers/cron_parser.go new file mode 100644 index 0000000..a38efa0 --- /dev/null +++ b/core/pkg/serverless/triggers/cron_parser.go @@ -0,0 +1,211 @@ +package triggers + +import ( + "fmt" + "strconv" + "strings" + "time" +) + +// CronExpression is a parsed cron expression that can compute the next +// occurrence after an arbitrary instant. +// +// Two layouts are accepted: +// +// 5 fields: minute hour day-of-month month day-of-week +// 6 fields: second minute hour day-of-month month day-of-week +// +// Each field supports `*`, single integers, comma-separated lists, +// `a-b` ranges, and `*/n` step expressions. Day-of-week values are +// 0-6 with 0 = Sunday; 7 is normalised to 0. Month values are 1-12. +// +// Schedules are treated as wall-clock UTC; that matches the gateway's +// time.Now() — no per-namespace timezone configuration today. +type CronExpression struct { + hasSeconds bool + seconds fieldMatcher // 0-59 (only when hasSeconds) + minutes fieldMatcher // 0-59 + hours fieldMatcher // 0-23 + dom fieldMatcher // 1-31 + month fieldMatcher // 1-12 + dow fieldMatcher // 0-6 (Sunday = 0) + expr string +} + +// fieldMatcher is a bitmask over the legal value range for a cron field. +// Up to 64 entries — the largest range we model is seconds/minutes +// (0..59), comfortably below 64. +type fieldMatcher uint64 + +func (f fieldMatcher) match(v int) bool { + if v < 0 || v >= 64 { + return false + } + return f&(1<= 0 { + n, err := strconv.Atoi(seg[i+1:]) + if err != nil || n <= 0 { + return 0, fmt.Errorf("bad step in %q", seg) + } + step = n + seg = seg[:i] + } + var rangeLo, rangeHi int + switch { + case seg == "*": + rangeLo, rangeHi = lo, hi + case strings.Contains(seg, "-"): + parts := strings.SplitN(seg, "-", 2) + a, err1 := strconv.Atoi(parts[0]) + b, err2 := strconv.Atoi(parts[1]) + if err1 != nil || err2 != nil { + return 0, fmt.Errorf("bad range %q", seg) + } + rangeLo, rangeHi = a, b + default: + n, err := strconv.Atoi(seg) + if err != nil { + return 0, fmt.Errorf("bad value %q", seg) + } + rangeLo, rangeHi = n, n + } + if rangeLo < lo || rangeHi > hi || rangeLo > rangeHi { + return 0, fmt.Errorf("value %d-%d out of range [%d,%d]", rangeLo, rangeHi, lo, hi) + } + for v := rangeLo; v <= rangeHi; v += step { + if v >= 64 { + continue + } + mask |= 1 << uint(v) + } + } + return mask, nil +} + +// Next returns the smallest time strictly after `after` that matches the +// expression. The search is bounded to a few years out — schedules that +// can never match (e.g. day-of-month=31 with month=Feb) return a non-nil +// error rather than looping forever. +func (c *CronExpression) Next(after time.Time) (time.Time, error) { + t := after.UTC() + if c.hasSeconds { + t = t.Add(time.Second).Truncate(time.Second) + } else { + t = t.Add(time.Minute).Truncate(time.Minute) + } + + // Search horizon: 5 years. A valid expression matches well within + // this window; pathological ones (impossible date combos) are caught + // by this bound. + deadline := after.Add(5 * 365 * 24 * time.Hour) + for t.Before(deadline) { + if !c.month.match(int(t.Month())) { + // Jump to the first of the next month. + t = time.Date(t.Year(), t.Month()+1, 1, 0, 0, 0, 0, time.UTC) + continue + } + if !c.dom.match(t.Day()) || !c.dow.match(int(t.Weekday())) { + t = time.Date(t.Year(), t.Month(), t.Day()+1, 0, 0, 0, 0, time.UTC) + continue + } + if !c.hours.match(t.Hour()) { + t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, 0, 0, 0, time.UTC) + continue + } + if !c.minutes.match(t.Minute()) { + t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, 0, 0, time.UTC) + continue + } + if c.hasSeconds && !c.seconds.match(t.Second()) { + t = t.Add(time.Second) + continue + } + return t, nil + } + return time.Time{}, fmt.Errorf("cron: no match within 5 years for %q", c.expr) +} + +// String returns the original expression as parsed. +func (c *CronExpression) String() string { return c.expr } diff --git a/core/pkg/serverless/triggers/cron_scheduler.go b/core/pkg/serverless/triggers/cron_scheduler.go new file mode 100644 index 0000000..d0edbfa --- /dev/null +++ b/core/pkg/serverless/triggers/cron_scheduler.go @@ -0,0 +1,168 @@ +package triggers + +import ( + "context" + "sync" + "time" + + "github.com/DeBrosOfficial/network/pkg/serverless" + "go.uber.org/zap" +) + +// CronInvoker is the subset of the gateway's serverless.Invoker that the +// scheduler uses. Stated as an interface so tests can swap it out. +type CronInvoker interface { + Invoke(ctx context.Context, req *serverless.InvokeRequest) (*serverless.InvokeResponse, error) +} + +// CronScheduler is a single goroutine that periodically scans the +// function_cron_triggers table for due rows and invokes the matching +// functions. +// +// One scheduler per gateway instance is sufficient: the work is bounded +// by the number of cron triggers (small) and per-tick polling is cheap. +// If multiple gateway instances run concurrently, MarkRun's "next_run_at +// = computed_next" UPDATE acts as a soft lease — duplicate invocations +// are reduced to a tight race window. For stricter exactly-once we'd +// need an explicit lease table; deferred until needed. +type CronScheduler struct { + store *CronTriggerStore + invoker CronInvoker + logger *zap.Logger + pollInterval time.Duration + batchLimit int + + cancel context.CancelFunc + wg sync.WaitGroup +} + +// NewCronScheduler builds a scheduler. Reasonable defaults: poll every +// 30 seconds, dispatch up to 100 triggers per tick. +func NewCronScheduler( + store *CronTriggerStore, + invoker CronInvoker, + logger *zap.Logger, + pollInterval time.Duration, +) *CronScheduler { + if pollInterval <= 0 { + pollInterval = 30 * time.Second + } + return &CronScheduler{ + store: store, + invoker: invoker, + logger: logger, + pollInterval: pollInterval, + batchLimit: 100, + } +} + +// Start launches the polling goroutine. Idempotent: a second Start while +// already running is a no-op. +func (s *CronScheduler) Start(ctx context.Context) { + if s.cancel != nil { + return + } + runCtx, cancel := context.WithCancel(ctx) + s.cancel = cancel + s.wg.Add(1) + go s.loop(runCtx) + s.logger.Info("cron scheduler started", zap.Duration("poll_interval", s.pollInterval)) +} + +// Stop cancels the goroutine and waits for it to exit. Safe to call +// multiple times. +func (s *CronScheduler) Stop() { + if s.cancel == nil { + return + } + s.cancel() + s.cancel = nil + s.wg.Wait() + s.logger.Info("cron scheduler stopped") +} + +func (s *CronScheduler) loop(ctx context.Context) { + defer s.wg.Done() + ticker := time.NewTicker(s.pollInterval) + defer ticker.Stop() + + // Run an immediate tick so a freshly-booted gateway picks up triggers + // that fired during the downtime, instead of waiting `pollInterval`. + s.tick(ctx) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.tick(ctx) + } + } +} + +func (s *CronScheduler) tick(ctx context.Context) { + now := time.Now().UTC() + due, err := s.store.ListDue(ctx, now, s.batchLimit) + if err != nil { + s.logger.Warn("cron scheduler: ListDue failed", zap.Error(err)) + return + } + if len(due) == 0 { + return + } + for _, row := range due { + if ctx.Err() != nil { + return + } + s.dispatch(ctx, row, now) + } +} + +func (s *CronScheduler) dispatch(ctx context.Context, row CronDueRow, now time.Time) { + // Compute the next run BEFORE invoking so we always advance the cursor + // even if the invoke errors. This prevents a busted handler from + // starving the scheduler. + parsed, err := ParseCron(row.CronExpression) + if err != nil { + s.logger.Warn("cron scheduler: bad expression — disabling trigger", + zap.String("trigger_id", row.TriggerID), + zap.String("expr", row.CronExpression), + zap.Error(err)) + // Push next_run_at far out so we don't keep looking at this row. + _ = s.store.MarkRun(ctx, row.TriggerID, now, now.Add(365*24*time.Hour), + "error", "bad cron expression: "+err.Error()) + return + } + next, err := parsed.Next(now) + if err != nil { + _ = s.store.MarkRun(ctx, row.TriggerID, now, now.Add(365*24*time.Hour), + "error", "no next match: "+err.Error()) + return + } + + req := &serverless.InvokeRequest{ + Namespace: row.Namespace, + FunctionName: row.FunctionName, + Input: []byte(`{"trigger":"cron"}`), + TriggerType: serverless.TriggerTypeCron, + } + resp, invErr := s.invoker.Invoke(ctx, req) + status := "ok" + errMsg := "" + if invErr != nil { + status = "error" + errMsg = invErr.Error() + s.logger.Warn("cron scheduler: invoke failed", + zap.String("trigger_id", row.TriggerID), + zap.String("function", row.FunctionName), + zap.String("namespace", row.Namespace), + zap.Error(invErr)) + } else if resp != nil && resp.Status != serverless.InvocationStatusSuccess { + status = "error" + errMsg = resp.Error + } + if err := s.store.MarkRun(ctx, row.TriggerID, now, next, status, errMsg); err != nil { + s.logger.Warn("cron scheduler: MarkRun failed", + zap.String("trigger_id", row.TriggerID), + zap.Error(err)) + } +} diff --git a/core/pkg/serverless/triggers/cron_store.go b/core/pkg/serverless/triggers/cron_store.go new file mode 100644 index 0000000..f62e7a9 --- /dev/null +++ b/core/pkg/serverless/triggers/cron_store.go @@ -0,0 +1,196 @@ +package triggers + +import ( + "context" + "fmt" + "time" + + "github.com/DeBrosOfficial/network/pkg/rqlite" + "github.com/DeBrosOfficial/network/pkg/serverless" + "github.com/google/uuid" + "go.uber.org/zap" +) + +// CronTriggerStore manages cron-trigger persistence in RQLite. Reads/writes +// the function_cron_triggers table created by migration 004. +// +// Each row carries the cron expression, last/next run timestamps, last +// status / error, and an enabled flag — that's all the scheduler needs to +// pick due triggers up. Function metadata (name + namespace) is JOINed at +// dispatch time. +type CronTriggerStore struct { + db rqlite.Client + logger *zap.Logger +} + +// NewCronTriggerStore creates a new cron trigger store. +func NewCronTriggerStore(db rqlite.Client, logger *zap.Logger) *CronTriggerStore { + return &CronTriggerStore{db: db, logger: logger} +} + +// cronRow maps to function_cron_triggers for query scanning. +type cronRow struct { + ID string + FunctionID string + CronExpression string + NextRunAt time.Time + LastRunAt *time.Time + LastStatus *string + LastError *string + Enabled bool + CreatedAt time.Time +} + +// CronDueRow is the JOINed row returned by ListDue: trigger + the function +// metadata the scheduler needs to invoke it. +type CronDueRow struct { + TriggerID string + FunctionID string + FunctionName string + Namespace string + CronExpression string + NextRunAt time.Time +} + +// Add registers a new cron trigger. Validates the expression up front so +// callers get an immediate error instead of discovering it at scheduler +// firing time. +func (s *CronTriggerStore) Add(ctx context.Context, functionID, expr string) (string, error) { + if functionID == "" { + return "", fmt.Errorf("function ID required") + } + parsed, err := ParseCron(expr) + if err != nil { + return "", err + } + now := time.Now().UTC() + next, err := parsed.Next(now) + if err != nil { + return "", err + } + + id := uuid.New().String() + query := ` + INSERT INTO function_cron_triggers + (id, function_id, cron_expression, next_run_at, enabled, created_at) + VALUES (?, ?, ?, ?, TRUE, ?) + ` + if _, err := s.db.Exec(ctx, query, id, functionID, expr, next, now); err != nil { + return "", fmt.Errorf("failed to add cron trigger: %w", err) + } + s.logger.Info("Cron trigger added", + zap.String("trigger_id", id), + zap.String("function_id", functionID), + zap.String("expr", expr), + zap.Time("next_run_at", next), + ) + return id, nil +} + +// Remove deletes a trigger by ID. +func (s *CronTriggerStore) Remove(ctx context.Context, triggerID string) error { + if triggerID == "" { + return fmt.Errorf("trigger ID required") + } + query := `DELETE FROM function_cron_triggers WHERE id = ?` + res, err := s.db.Exec(ctx, query, triggerID) + if err != nil { + return fmt.Errorf("failed to remove cron trigger: %w", err) + } + if affected, _ := res.RowsAffected(); affected == 0 { + return fmt.Errorf("trigger not found: %s", triggerID) + } + return nil +} + +// RemoveByFunction wipes every cron trigger for a function. Used during +// re-deploy so stale schedules don't survive a function rewrite. +func (s *CronTriggerStore) RemoveByFunction(ctx context.Context, functionID string) error { + if functionID == "" { + return fmt.Errorf("function ID required") + } + _, err := s.db.Exec(ctx, `DELETE FROM function_cron_triggers WHERE function_id = ?`, functionID) + return err +} + +// ListByFunction returns every cron trigger for a function. Used by the +// CLI's `triggers list` and the gateway's HandleListTriggers. +func (s *CronTriggerStore) ListByFunction(ctx context.Context, functionID string) ([]serverless.CronTrigger, error) { + if functionID == "" { + return nil, fmt.Errorf("function ID required") + } + query := ` + SELECT id, function_id, cron_expression, next_run_at, last_run_at, + last_status, last_error, enabled, created_at + FROM function_cron_triggers + WHERE function_id = ? + ` + var rows []cronRow + if err := s.db.Query(ctx, &rows, query, functionID); err != nil { + return nil, fmt.Errorf("failed to list cron triggers: %w", err) + } + out := make([]serverless.CronTrigger, len(rows)) + for i, r := range rows { + nextRunAt := r.NextRunAt // copy so &local doesn't capture the loop var + t := serverless.CronTrigger{ + ID: r.ID, + FunctionID: r.FunctionID, + CronExpression: r.CronExpression, + NextRunAt: &nextRunAt, + Enabled: r.Enabled, + } + if r.LastRunAt != nil { + lastRunAt := *r.LastRunAt + t.LastRunAt = &lastRunAt + } + out[i] = t + } + return out, nil +} + +// ListDue returns enabled triggers whose next_run_at has elapsed, joined +// with the owning function so the scheduler can invoke them without an +// extra registry round-trip per trigger. Bounded by `limit` to avoid +// unbounded scan of a backlog after a long outage. +func (s *CronTriggerStore) ListDue(ctx context.Context, now time.Time, limit int) ([]CronDueRow, error) { + if limit <= 0 { + limit = 100 + } + query := ` + SELECT t.id AS trigger_id, t.function_id AS function_id, + f.name AS function_name, f.namespace AS namespace, + t.cron_expression AS cron_expression, t.next_run_at AS next_run_at + FROM function_cron_triggers t + JOIN functions f ON t.function_id = f.id + WHERE t.enabled = TRUE + AND f.status = 'active' + AND t.next_run_at <= ? + ORDER BY t.next_run_at ASC + LIMIT ? + ` + var rows []CronDueRow + if err := s.db.Query(ctx, &rows, query, now, limit); err != nil { + return nil, fmt.Errorf("failed to query due cron triggers: %w", err) + } + return rows, nil +} + +// MarkRun updates next_run_at + last_run_at + last_status / last_error +// after the scheduler invokes a trigger. Idempotent: if the row was +// removed concurrently, the UPDATE is a no-op. +func (s *CronTriggerStore) MarkRun( + ctx context.Context, + triggerID string, + ranAt time.Time, + nextRunAt time.Time, + status string, + errMsg string, +) error { + query := ` + UPDATE function_cron_triggers + SET last_run_at = ?, next_run_at = ?, last_status = ?, last_error = ? + WHERE id = ? + ` + _, err := s.db.Exec(ctx, query, ranAt, nextRunAt, status, errMsg, triggerID) + return err +} diff --git a/core/pkg/serverless/types.go b/core/pkg/serverless/types.go index 0962c52..12ad738 100644 --- a/core/pkg/serverless/types.go +++ b/core/pkg/serverless/types.go @@ -421,6 +421,19 @@ type HostServices interface { WSSend(ctx context.Context, clientID string, data []byte) error WSBroadcast(ctx context.Context, topic string, data []byte) error + // FunctionInvoke synchronously invokes another function in the same + // namespace from inside a function (e.g. a persistent rpc-router + // dispatching client RPCs to per-op handlers). The caller's wallet, + // JWT claims, and WS client ID are inherited so the invoked function + // sees the same authenticated identity as the outer call. + // + // `name` is the target function name; `payload` is the raw input bytes + // to feed the function (typically JSON). Returns the function's output + // bytes on success. Errors (not found, unauthorized, runtime) are + // returned as Go errors and the caller should surface them as + // rpc_error to the client. + FunctionInvoke(ctx context.Context, name string, payload []byte) ([]byte, error) + // HTTP operations HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error)