This commit is contained in:
JohnySigma 2026-05-05 11:35:35 +03:00
parent ba68291566
commit 0f42816a78
25 changed files with 933 additions and 74 deletions

BIN
bin/gateway Executable file

Binary file not shown.

BIN
bin/identity Executable file

Binary file not shown.

BIN
bin/orama Executable file

Binary file not shown.

BIN
bin/orama-node Executable file

Binary file not shown.

View File

@ -24,6 +24,14 @@ type FunctionConfig struct {
Timeout int `yaml:"timeout"`
Retry RetryConfig `yaml:"retry"`
Env map[string]string `yaml:"env"`
// Persistent WebSocket settings — when WSPersistent is true, the function
// must export ws_open / ws_frame / ws_close instead of running per-frame
// stateless. See core/plans/platform/06_PERSISTENT_WS_FUNCTIONS.md.
WSPersistent bool `yaml:"ws_persistent"`
WSIdleTimeoutSec int `yaml:"ws_idle_timeout_sec"`
WSMaxFrameBytes int `yaml:"ws_max_frame_bytes"`
WSMaxInflightPerConn int `yaml:"ws_max_inflight_per_conn"`
}
// RetryConfig holds retry settings.
@ -198,11 +206,28 @@ func uploadWASMFunction(wasmPath string, cfg *FunctionConfig) (map[string]interf
writer.WriteField("retry_count", strconv.Itoa(cfg.Retry.Count))
writer.WriteField("retry_delay_seconds", strconv.Itoa(cfg.Retry.Delay))
// Add env vars as metadata JSON
// Build metadata JSON. The deploy handler json.Unmarshal()s this into
// FunctionDefinition first, then overlays the explicit form fields below.
// Any field that has no explicit form-field equivalent (env vars, the
// ws_* persistent settings) MUST live in this blob.
metaObj := map[string]interface{}{}
if len(cfg.Env) > 0 {
metadata, _ := json.Marshal(map[string]interface{}{
"env_vars": cfg.Env,
})
metaObj["env_vars"] = cfg.Env
}
if cfg.WSPersistent {
metaObj["ws_persistent"] = true
}
if cfg.WSIdleTimeoutSec > 0 {
metaObj["ws_idle_timeout_sec"] = cfg.WSIdleTimeoutSec
}
if cfg.WSMaxFrameBytes > 0 {
metaObj["ws_max_frame_bytes"] = cfg.WSMaxFrameBytes
}
if cfg.WSMaxInflightPerConn > 0 {
metaObj["ws_max_inflight_per_conn"] = cfg.WSMaxInflightPerConn
}
if len(metaObj) > 0 {
metadata, _ := json.Marshal(metaObj)
writer.WriteField("metadata", string(metadata))
}

View File

@ -10,30 +10,42 @@ import (
"github.com/spf13/cobra"
)
var triggerTopic string
var (
triggerTopic string
triggerSchedule string
)
// TriggersCmd is the parent command for trigger management.
var TriggersCmd = &cobra.Command{
Use: "triggers",
Short: "Manage function PubSub triggers",
Long: `Add, list, and delete PubSub triggers for your serverless functions.
Short: "Manage function PubSub and cron triggers",
Long: `Add, list, and delete triggers for your serverless functions.
When a message is published to a topic, all functions with a trigger on
that topic are automatically invoked with the message as input.
PubSub: when a message is published to a topic, every function with a
matching trigger is invoked with the message as input.
Cron: a function is invoked on a schedule (5-field crontab, or 6-field
crontab with a leading seconds column).
Examples:
orama function triggers add my-function --topic calls:invite
orama function triggers add my-function --schedule "0 3 * * *"
orama function triggers add my-function --schedule "*/30 * * * * *"
orama function triggers list my-function
orama function triggers delete my-function <trigger-id>`,
}
// TriggersAddCmd adds a PubSub trigger to a function.
// TriggersAddCmd adds a PubSub or Cron trigger to a function.
var TriggersAddCmd = &cobra.Command{
Use: "add <function-name>",
Short: "Add a PubSub trigger",
Long: "Registers a PubSub trigger so the function is invoked when a message is published to the topic.",
Args: cobra.ExactArgs(1),
RunE: runTriggersAdd,
Short: "Add a PubSub or Cron trigger",
Long: `Registers a trigger that invokes the function automatically.
Pass exactly one of --topic (PubSub) or --schedule (cron). Schedules
accept either 5-field crontab (minute hour dom month dow) or 6-field
with seconds (sec minute hour dom month dow).`,
Args: cobra.ExactArgs(1),
RunE: runTriggersAdd,
}
// TriggersListCmd lists triggers for a function.
@ -57,15 +69,18 @@ func init() {
TriggersCmd.AddCommand(TriggersListCmd)
TriggersCmd.AddCommand(TriggersDeleteCmd)
TriggersAddCmd.Flags().StringVar(&triggerTopic, "topic", "", "PubSub topic to trigger on (required)")
TriggersAddCmd.MarkFlagRequired("topic")
TriggersAddCmd.Flags().StringVar(&triggerTopic, "topic", "", "PubSub topic to trigger on")
TriggersAddCmd.Flags().StringVar(&triggerSchedule, "schedule", "", "Cron expression to trigger on (e.g. \"0 3 * * *\")")
TriggersAddCmd.MarkFlagsMutuallyExclusive("topic", "schedule")
TriggersAddCmd.MarkFlagsOneRequired("topic", "schedule")
}
func runTriggersAdd(cmd *cobra.Command, args []string) error {
funcName := args[0]
body, _ := json.Marshal(map[string]string{
"topic": triggerTopic,
"topic": triggerTopic,
"cron_expression": triggerSchedule,
})
resp, err := apiRequest("POST", "/v1/functions/"+funcName+"/triggers", bytes.NewReader(body), "application/json")
@ -88,7 +103,11 @@ func runTriggersAdd(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to parse response: %w", err)
}
fmt.Printf("Trigger added: %s → %s (id: %s)\n", triggerTopic, funcName, result["trigger_id"])
if triggerSchedule != "" {
fmt.Printf("Trigger added: cron(%s) → %s (id: %s)\n", triggerSchedule, funcName, result["trigger_id"])
} else {
fmt.Printf("Trigger added: %s → %s (id: %s)\n", triggerTopic, funcName, result["trigger_id"])
}
return nil
}

View File

@ -68,6 +68,12 @@ type Dependencies struct {
// PubSub trigger dispatcher (used to wire into PubSubHandlers)
PubSubDispatcher *triggers.PubSubDispatcher
// Cron trigger store + scheduler. The scheduler is started by gateway
// lifecycle code after Dependencies is constructed; Stop is called
// during shutdown.
CronTriggerStore *triggers.CronTriggerStore
CronScheduler *triggers.CronScheduler
// PersistentWSManager tracks long-lived WS function instances.
// Used by the WS handler when fn.WSPersistent=true; nil = disabled.
PersistentWSManager *persistent.Manager
@ -496,6 +502,11 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe
// Create invoker
deps.ServerlessInvoker = serverless.NewInvoker(engine, registry, hostFuncs, logger.Logger)
// Wire the invoker back into hostFuncs so the function_invoke host
// function can dispatch sub-invocations from inside a WASM function
// (e.g. rpc-router routing client RPCs to per-op handlers).
hostFuncs.SetInvoker(deps.ServerlessInvoker)
// Create PubSub trigger store and dispatcher
triggerStore := triggers.NewPubSubTriggerStore(deps.ORMClient, logger.Logger)
@ -510,6 +521,19 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe
logger.Logger,
)
// Cron trigger store + scheduler. The scheduler polls
// function_cron_triggers and invokes due rows via the same
// ServerlessInvoker used for PubSub triggers; the ↓ Start call wires
// the goroutine up — Stop is invoked from gateway lifecycle shutdown.
cronStore := triggers.NewCronTriggerStore(deps.ORMClient, logger.Logger)
deps.CronTriggerStore = cronStore
deps.CronScheduler = triggers.NewCronScheduler(
cronStore,
deps.ServerlessInvoker,
logger.Logger,
30*time.Second,
)
// Persistent WS instance manager. Cap from gateway config (TODO: surface
// the knob); 5000 is a sensible default per plan 06.
deps.PersistentWSManager = persistent.NewManager(5000, logger.Logger)
@ -521,6 +545,7 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe
registry,
deps.ServerlessWSMgr,
triggerStore,
cronStore,
deps.PubSubDispatcher,
deps.PersistentWSManager,
deps.WSBridge,

View File

@ -96,6 +96,7 @@ type Gateway struct {
serverlessHandlers *serverlesshandlers.ServerlessHandlers
pubsubDispatcher *triggers.PubSubDispatcher
persistentWSManager *persistent.Manager
cronScheduler *triggers.CronScheduler
// Authentication service
authService *auth.Service
@ -356,6 +357,11 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
if deps.PersistentWSManager != nil {
gw.persistentWSManager = deps.PersistentWSManager
}
if deps.CronScheduler != nil {
gw.cronScheduler = deps.CronScheduler
// Background goroutine — Stop is called from gateway.Close.
gw.cronScheduler.Start(context.Background())
}
// Push notification handlers — disabled when no provider is configured.
// The handlers themselves return 503 if dispatcher/store is nil; we

View File

@ -94,6 +94,7 @@ func newTestHandlers(reg serverless.FunctionRegistry) *ServerlessHandlers {
reg,
wsManager,
nil, // triggerStore
nil, // cronStore
nil, // dispatcher
nil, // persistentMgr
nil, // wsBridge

View File

@ -96,6 +96,7 @@ func newSecretsTestHandlers(sm serverless.SecretsManager) *ServerlessHandlers {
newMockRegistry(),
wsManager,
nil, // triggerStore
nil, // cronStore
nil, // dispatcher
nil, // persistentMgr
nil, // wsBridge

View File

@ -10,19 +10,17 @@ import (
"go.uber.org/zap"
)
// addTriggerRequest is the request body for adding a PubSub trigger.
// addTriggerRequest is the request body for adding a PubSub or Cron trigger.
// Exactly one of `topic` or `cron_expression` must be set.
type addTriggerRequest struct {
Topic string `json:"topic"`
Topic string `json:"topic"`
CronExpression string `json:"cron_expression"`
}
// HandleAddTrigger handles POST /v1/functions/{name}/triggers
// Adds a PubSub trigger that invokes this function when a message is published to the topic.
// Branches between PubSub (topic) and Cron (cron_expression) based on the
// request body. Both stores must be wired for their respective branches.
func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Request, functionName string) {
if h.triggerStore == nil {
writeError(w, http.StatusNotImplemented, "PubSub triggers not available")
return
}
namespace := h.getNamespaceFromRequest(r)
if namespace == "" {
writeError(w, http.StatusBadRequest, "namespace required")
@ -35,15 +33,18 @@ func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Req
return
}
if req.Topic == "" {
writeError(w, http.StatusBadRequest, "topic required")
if req.Topic == "" && req.CronExpression == "" {
writeError(w, http.StatusBadRequest, "topic or cron_expression required")
return
}
if req.Topic != "" && req.CronExpression != "" {
writeError(w, http.StatusBadRequest, "topic and cron_expression are mutually exclusive")
return
}
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
// Look up function to get its ID
fn, err := h.registry.Get(ctx, namespace, functionName, 0)
if err != nil {
if serverless.IsNotFound(err) {
@ -54,6 +55,38 @@ func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Req
return
}
if req.CronExpression != "" {
if h.cronStore == nil {
writeError(w, http.StatusNotImplemented, "Cron triggers not available")
return
}
triggerID, err := h.cronStore.Add(ctx, fn.ID, req.CronExpression)
if err != nil {
h.logger.Error("Failed to add Cron trigger",
zap.String("function", functionName),
zap.String("cron_expression", req.CronExpression),
zap.Error(err),
)
writeError(w, http.StatusBadRequest, "Failed to add trigger: "+err.Error())
return
}
h.logger.Info("Cron trigger added via API",
zap.String("function", functionName),
zap.String("cron_expression", req.CronExpression),
zap.String("trigger_id", triggerID),
)
writeJSON(w, http.StatusCreated, map[string]interface{}{
"trigger_id": triggerID,
"function": functionName,
"cron_expression": req.CronExpression,
})
return
}
if h.triggerStore == nil {
writeError(w, http.StatusNotImplemented, "PubSub triggers not available")
return
}
triggerID, err := h.triggerStore.Add(ctx, fn.ID, req.Topic)
if err != nil {
h.logger.Error("Failed to add PubSub trigger",
@ -64,18 +97,14 @@ func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Req
writeError(w, http.StatusInternalServerError, "Failed to add trigger: "+err.Error())
return
}
// Invalidate cache for this topic
if h.dispatcher != nil {
h.dispatcher.InvalidateCache(ctx, namespace, req.Topic)
}
h.logger.Info("PubSub trigger added via API",
zap.String("function", functionName),
zap.String("topic", req.Topic),
zap.String("trigger_id", triggerID),
)
writeJSON(w, http.StatusCreated, map[string]interface{}{
"trigger_id": triggerID,
"function": functionName,
@ -84,10 +113,12 @@ func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Req
}
// HandleListTriggers handles GET /v1/functions/{name}/triggers
// Lists all PubSub triggers for a function.
// Returns the merged set of PubSub and Cron triggers for a function.
// Each row carries enough metadata for the CLI's `triggers list` to render
// it; the kind is implied by which fields are populated (Topic vs CronExpression).
func (h *ServerlessHandlers) HandleListTriggers(w http.ResponseWriter, r *http.Request, functionName string) {
if h.triggerStore == nil {
writeError(w, http.StatusNotImplemented, "PubSub triggers not available")
if h.triggerStore == nil && h.cronStore == nil {
writeError(w, http.StatusNotImplemented, "Triggers not available")
return
}
@ -100,7 +131,6 @@ func (h *ServerlessHandlers) HandleListTriggers(w http.ResponseWriter, r *http.R
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
// Look up function to get its ID
fn, err := h.registry.Get(ctx, namespace, functionName, 0)
if err != nil {
if serverless.IsNotFound(err) {
@ -111,23 +141,53 @@ func (h *ServerlessHandlers) HandleListTriggers(w http.ResponseWriter, r *http.R
return
}
triggers, err := h.triggerStore.ListByFunction(ctx, fn.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, "Failed to list triggers")
return
merged := []map[string]interface{}{}
if h.triggerStore != nil {
pubsubTriggers, err := h.triggerStore.ListByFunction(ctx, fn.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, "Failed to list pubsub triggers")
return
}
for _, t := range pubsubTriggers {
merged = append(merged, map[string]interface{}{
"id": t.ID,
"kind": "pubsub",
"topic": t.Topic,
"enabled": t.Enabled,
})
}
}
if h.cronStore != nil {
cronTriggers, err := h.cronStore.ListByFunction(ctx, fn.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, "Failed to list cron triggers")
return
}
for _, t := range cronTriggers {
merged = append(merged, map[string]interface{}{
"id": t.ID,
"kind": "cron",
"cron_expression": t.CronExpression,
"next_run_at": t.NextRunAt,
"last_run_at": t.LastRunAt,
"enabled": t.Enabled,
})
}
}
writeJSON(w, http.StatusOK, map[string]interface{}{
"triggers": triggers,
"count": len(triggers),
"triggers": merged,
"count": len(merged),
})
}
// HandleDeleteTrigger handles DELETE /v1/functions/{name}/triggers/{triggerID}
// Removes a PubSub trigger.
// Removes either a PubSub or Cron trigger. Tries PubSub first (the more
// common case) and falls back to Cron — trigger IDs are UUIDs and can't
// collide between stores, so order is just an optimisation.
func (h *ServerlessHandlers) HandleDeleteTrigger(w http.ResponseWriter, r *http.Request, functionName, triggerID string) {
if h.triggerStore == nil {
writeError(w, http.StatusNotImplemented, "PubSub triggers not available")
if h.triggerStore == nil && h.cronStore == nil {
writeError(w, http.StatusNotImplemented, "Triggers not available")
return
}
@ -140,7 +200,6 @@ func (h *ServerlessHandlers) HandleDeleteTrigger(w http.ResponseWriter, r *http.
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
// Look up the trigger's topic before deleting (for cache invalidation)
fn, err := h.registry.Get(ctx, namespace, functionName, 0)
if err != nil {
if serverless.IsNotFound(err) {
@ -151,38 +210,47 @@ func (h *ServerlessHandlers) HandleDeleteTrigger(w http.ResponseWriter, r *http.
return
}
// Get current triggers to find the topic for cache invalidation
triggers, err := h.triggerStore.ListByFunction(ctx, fn.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, "Failed to look up triggers")
return
}
// Find the topic for the trigger being deleted
// Walk the PubSub list first to capture the topic for cache invalidation.
var triggerTopic string
for _, t := range triggers {
if t.ID == triggerID {
triggerTopic = t.Topic
break
if h.triggerStore != nil {
triggers, listErr := h.triggerStore.ListByFunction(ctx, fn.ID)
if listErr == nil {
for _, t := range triggers {
if t.ID == triggerID {
triggerTopic = t.Topic
break
}
}
}
}
if err := h.triggerStore.Remove(ctx, triggerID); err != nil {
writeError(w, http.StatusInternalServerError, "Failed to remove trigger: "+err.Error())
if triggerTopic != "" {
if err := h.triggerStore.Remove(ctx, triggerID); err != nil {
writeError(w, http.StatusInternalServerError, "Failed to remove trigger: "+err.Error())
return
}
if h.dispatcher != nil {
h.dispatcher.InvalidateCache(ctx, namespace, triggerTopic)
}
h.logger.Info("PubSub trigger removed via API",
zap.String("function", functionName),
zap.String("trigger_id", triggerID),
)
writeJSON(w, http.StatusOK, map[string]interface{}{"message": "Trigger removed"})
return
}
// Invalidate cache for the topic
if h.dispatcher != nil && triggerTopic != "" {
h.dispatcher.InvalidateCache(ctx, namespace, triggerTopic)
// Not a PubSub trigger — try cron.
if h.cronStore != nil {
if err := h.cronStore.Remove(ctx, triggerID); err == nil {
h.logger.Info("Cron trigger removed via API",
zap.String("function", functionName),
zap.String("trigger_id", triggerID),
)
writeJSON(w, http.StatusOK, map[string]interface{}{"message": "Trigger removed"})
return
}
}
h.logger.Info("PubSub trigger removed via API",
zap.String("function", functionName),
zap.String("trigger_id", triggerID),
)
writeJSON(w, http.StatusOK, map[string]interface{}{
"message": "Trigger removed",
})
writeError(w, http.StatusNotFound, "Trigger not found")
}

View File

@ -20,6 +20,7 @@ type ServerlessHandlers struct {
registry serverless.FunctionRegistry
wsManager *serverless.WSManager
triggerStore *triggers.PubSubTriggerStore
cronStore *triggers.CronTriggerStore // optional; nil = cron triggers unavailable
dispatcher *triggers.PubSubDispatcher
persistentMgr *persistent.Manager // optional; when nil persistent WS rejects 503
wsBridge *wsbridge.Bridge // optional; nil = no client→ns registration
@ -39,6 +40,7 @@ func NewServerlessHandlers(
registry serverless.FunctionRegistry,
wsManager *serverless.WSManager,
triggerStore *triggers.PubSubTriggerStore,
cronStore *triggers.CronTriggerStore,
dispatcher *triggers.PubSubDispatcher,
persistentMgr *persistent.Manager,
wsBridge *wsbridge.Bridge,
@ -51,6 +53,7 @@ func NewServerlessHandlers(
registry: registry,
wsManager: wsManager,
triggerStore: triggerStore,
cronStore: cronStore,
dispatcher: dispatcher,
persistentMgr: persistentMgr,
wsBridge: wsBridge,

View File

@ -29,6 +29,13 @@ func (g *Gateway) Close() {
}
}
// Stop the cron scheduler before tearing down the engine — pending
// invocations call back into the invoker which still needs the engine
// to be alive.
if g.cronScheduler != nil {
g.cronScheduler.Stop()
}
// Drain persistent WebSocket instances. Each instance gets a slice of
// the 30s budget; ws_close on each is best-effort.
if g.persistentWSManager != nil {

View File

@ -50,7 +50,7 @@ func TestServerlessHandlers_ListFunctions(t *testing.T) {
},
}
h := serverlesshandlers.NewServerlessHandlers(nil, nil, registry, nil, nil, nil, nil, nil, nil, logger)
h := serverlesshandlers.NewServerlessHandlers(nil, nil, registry, nil, nil, nil, nil, nil, nil, nil, logger)
req, _ := http.NewRequest("GET", "/v1/functions?namespace=ns1", nil)
rr := httptest.NewRecorder()
@ -73,7 +73,7 @@ func TestServerlessHandlers_DeployFunction(t *testing.T) {
logger := zap.NewNop()
registry := &mockFunctionRegistry{}
h := serverlesshandlers.NewServerlessHandlers(nil, nil, registry, nil, nil, nil, nil, nil, nil, logger)
h := serverlesshandlers.NewServerlessHandlers(nil, nil, registry, nil, nil, nil, nil, nil, nil, nil, logger)
// Test JSON deploy (which is partially supported according to code)
// Should be 400 because WASM is missing or base64 not supported

View File

@ -423,6 +423,7 @@ func (e *Engine) registerHostModule(ctx context.Context) error {
NewFunctionBuilder().WithFunc(e.hWSPubSubUnbridge).Export("ws_pubsub_unbridge").
NewFunctionBuilder().WithFunc(e.hWSSend).Export("ws_send").
NewFunctionBuilder().WithFunc(e.hWSBroadcast).Export("ws_broadcast").
NewFunctionBuilder().WithFunc(e.hFunctionInvoke).Export("function_invoke").
NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info").
NewFunctionBuilder().WithFunc(e.hLogError).Export("log_error").
Instantiate(ctx)
@ -743,6 +744,39 @@ func (e *Engine) hWSPubSubUnbridge(ctx context.Context, mod api.Module,
return 1
}
// hFunctionInvoke is the WASM-callable wrapper for FunctionInvoke. Used by
// the rpc-router persistent function (and any future dispatcher) to run
// another function in the same namespace synchronously and forward its
// output back to its caller.
//
// Inputs:
//
// namePtr/nameLen — UTF-8 target function name
// payloadPtr/payloadLen — raw input bytes for the target function
//
// Returns a packed uint64 (ptr<<32 | len) pointing to the target's output
// bytes in guest memory, or 0 on error. The caller is expected to JSON-
// decode the output (target functions ack with JSON envelopes).
func (e *Engine) hFunctionInvoke(ctx context.Context, mod api.Module,
namePtr, nameLen, payloadPtr, payloadLen uint32) uint64 {
name, ok := e.executor.ReadFromGuest(mod, namePtr, nameLen)
if !ok {
return 0
}
payload, ok := e.executor.ReadFromGuest(mod, payloadPtr, payloadLen)
if !ok {
return 0
}
out, err := e.hostServices.FunctionInvoke(ctx, string(name), payload)
if err != nil {
e.logger.Warn("function_invoke failed",
zap.String("name", string(name)),
zap.Error(err))
return 0
}
return e.executor.WriteToGuest(ctx, mod, out)
}
// hWSSend is the WASM-callable wrapper for WSSend.
// Inputs: clientID + raw frame bytes. clientID may be empty — in that case
// the host falls back to the current invocation's WS client (if any).

View File

@ -55,6 +55,11 @@ var (
// ErrWSNotAvailable is returned when WebSocket operations are used outside WS context.
ErrWSNotAvailable = errors.New("websocket operations not available in this context")
// ErrFunctionInvokeNotAvailable is returned when FunctionInvoke is called
// but no invoker has been wired into the host-services bag (e.g. unit
// tests, or before the gateway finishes bootstrap).
ErrFunctionInvokeNotAvailable = errors.New("function_invoke not available in this context")
// ErrWSClientNotFound is returned when a WebSocket client is not connected.
ErrWSClientNotFound = errors.New("websocket client not found")

View File

@ -122,6 +122,10 @@ func (m *mockHostServices) WSBroadcast(ctx context.Context, topic string, data [
return nil
}
func (m *mockHostServices) FunctionInvoke(ctx context.Context, name string, payload []byte) ([]byte, error) {
return nil, nil
}
func (m *mockHostServices) HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error) {
return nil, nil
}

View File

@ -31,6 +31,59 @@ func (h *HostFunctions) ClearContext() {
h.invCtx = nil
}
// SetInvoker wires the function invoker used by FunctionInvoke. Must be
// called once after both HostFunctions and Invoker exist (Invoker depends
// on HostServices, so the cycle is broken via this setter rather than a
// constructor argument).
func (h *HostFunctions) SetInvoker(inv serverless.FunctionInvoker) {
h.invokerLock.Lock()
defer h.invokerLock.Unlock()
h.invoker = inv
}
// FunctionInvoke synchronously runs another function in the same namespace
// and returns its output bytes. Caller wallet, JWT claims, and WS client
// ID are inherited from the current invocation so the inner function sees
// the same authenticated identity. Returns ErrFunctionInvokeNotAvailable
// when no invoker has been wired (e.g. tests).
func (h *HostFunctions) FunctionInvoke(ctx context.Context, name string, payload []byte) ([]byte, error) {
h.invokerLock.RLock()
inv := h.invoker
h.invokerLock.RUnlock()
if inv == nil {
return nil, &serverless.HostFunctionError{
Function: "function_invoke",
Cause: serverless.ErrFunctionInvokeNotAvailable,
}
}
h.invCtxLock.RLock()
cur := h.invCtx
h.invCtxLock.RUnlock()
if cur == nil {
return nil, &serverless.HostFunctionError{
Function: "function_invoke",
Cause: serverless.ErrFunctionInvokeNotAvailable,
}
}
req := &serverless.InvokeRequest{
Namespace: cur.Namespace,
FunctionName: name,
Input: payload,
TriggerType: serverless.TriggerTypeWebSocket,
CallerWallet: cur.CallerWallet,
CallerIP: cur.CallerIP,
WSClientID: cur.WSClientID,
CallerClaims: cur.CallerClaims,
}
resp, err := inv.Invoke(ctx, req)
if err != nil {
return nil, &serverless.HostFunctionError{Function: "function_invoke", Cause: err}
}
return resp.Output, nil
}
// GetEnv retrieves an environment variable for the function.
func (h *HostFunctions) GetEnv(ctx context.Context, key string) (string, error) {
h.invCtxLock.RLock()

View File

@ -43,6 +43,12 @@ type HostFunctions struct {
// — bridging is a deliberate request whose absence should be visible.
wsBridge *wsbridge.Bridge
// invoker is set after construction (via SetInvoker) to break the
// engine ↔ host-functions circular dep. nil means FunctionInvoke
// returns ErrFunctionInvokeNotAvailable.
invoker serverless.FunctionInvoker
invokerLock sync.RWMutex
// Current invocation context (set per-execution)
invCtx *serverless.InvocationContext
invCtxLock sync.RWMutex

View File

@ -11,6 +11,16 @@ import (
"go.uber.org/zap"
)
// FunctionInvoker is the minimal interface needed to invoke a function by
// name. It exists so packages downstream of `serverless` (notably
// `serverless/hostfunctions`) can hold a reference to the concrete
// `*Invoker` without creating an import cycle.
//
// Implemented by `*Invoker`.
type FunctionInvoker interface {
Invoke(ctx context.Context, req *InvokeRequest) (*InvokeResponse, error)
}
// Invoker handles function invocation with retry logic and DLQ support.
// It wraps the Engine to provide higher-level invocation semantics.
type Invoker struct {

View File

@ -208,6 +208,10 @@ func (m *MockHostServices) WSBroadcast(ctx context.Context, topic string, data [
return nil
}
func (m *MockHostServices) FunctionInvoke(ctx context.Context, name string, payload []byte) ([]byte, error) {
return nil, nil
}
func (m *MockHostServices) HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error) {
return nil, nil
}

View File

@ -0,0 +1,211 @@
package triggers
import (
"fmt"
"strconv"
"strings"
"time"
)
// CronExpression is a parsed cron expression that can compute the next
// occurrence after an arbitrary instant.
//
// Two layouts are accepted:
//
// 5 fields: minute hour day-of-month month day-of-week
// 6 fields: second minute hour day-of-month month day-of-week
//
// Each field supports `*`, single integers, comma-separated lists,
// `a-b` ranges, and `*/n` step expressions. Day-of-week values are
// 0-6 with 0 = Sunday; 7 is normalised to 0. Month values are 1-12.
//
// Schedules are treated as wall-clock UTC; that matches the gateway's
// time.Now() — no per-namespace timezone configuration today.
type CronExpression struct {
hasSeconds bool
seconds fieldMatcher // 0-59 (only when hasSeconds)
minutes fieldMatcher // 0-59
hours fieldMatcher // 0-23
dom fieldMatcher // 1-31
month fieldMatcher // 1-12
dow fieldMatcher // 0-6 (Sunday = 0)
expr string
}
// fieldMatcher is a bitmask over the legal value range for a cron field.
// Up to 64 entries — the largest range we model is seconds/minutes
// (0..59), comfortably below 64.
type fieldMatcher uint64
func (f fieldMatcher) match(v int) bool {
if v < 0 || v >= 64 {
return false
}
return f&(1<<uint(v)) != 0
}
// ParseCron parses a 5- or 6-field cron expression. Returns a non-nil
// error on any malformed field; otherwise returns a CronExpression
// usable across goroutines (immutable after construction).
func ParseCron(expr string) (*CronExpression, error) {
expr = strings.TrimSpace(expr)
if expr == "" {
return nil, fmt.Errorf("cron: empty expression")
}
parts := strings.Fields(expr)
c := &CronExpression{expr: expr}
switch len(parts) {
case 5:
c.hasSeconds = false
case 6:
c.hasSeconds = true
default:
return nil, fmt.Errorf("cron: expected 5 or 6 fields, got %d in %q", len(parts), expr)
}
idx := 0
if c.hasSeconds {
s, err := parseField(parts[idx], 0, 59)
if err != nil {
return nil, fmt.Errorf("cron seconds: %w", err)
}
c.seconds = s
idx++
}
min, err := parseField(parts[idx], 0, 59)
if err != nil {
return nil, fmt.Errorf("cron minutes: %w", err)
}
c.minutes = min
idx++
hr, err := parseField(parts[idx], 0, 23)
if err != nil {
return nil, fmt.Errorf("cron hours: %w", err)
}
c.hours = hr
idx++
dom, err := parseField(parts[idx], 1, 31)
if err != nil {
return nil, fmt.Errorf("cron day-of-month: %w", err)
}
c.dom = dom
idx++
mo, err := parseField(parts[idx], 1, 12)
if err != nil {
return nil, fmt.Errorf("cron month: %w", err)
}
c.month = mo
idx++
dow, err := parseField(parts[idx], 0, 7)
if err != nil {
return nil, fmt.Errorf("cron day-of-week: %w", err)
}
// Normalise Sunday: cron permits 0 OR 7.
if dow.match(7) {
dow |= 1
}
dow &= 0x7F // clamp to 0..6 bits
c.dow = dow
return c, nil
}
// parseField builds a bitmask over [lo, hi] inclusive from a single cron
// field. Accepts: "*", "n", "a,b,c", "a-b", "a-b/n", "*/n".
func parseField(s string, lo, hi int) (fieldMatcher, error) {
if s == "" {
return 0, fmt.Errorf("empty field")
}
var mask fieldMatcher
for _, segment := range strings.Split(s, ",") {
seg := strings.TrimSpace(segment)
step := 1
if i := strings.Index(seg, "/"); i >= 0 {
n, err := strconv.Atoi(seg[i+1:])
if err != nil || n <= 0 {
return 0, fmt.Errorf("bad step in %q", seg)
}
step = n
seg = seg[:i]
}
var rangeLo, rangeHi int
switch {
case seg == "*":
rangeLo, rangeHi = lo, hi
case strings.Contains(seg, "-"):
parts := strings.SplitN(seg, "-", 2)
a, err1 := strconv.Atoi(parts[0])
b, err2 := strconv.Atoi(parts[1])
if err1 != nil || err2 != nil {
return 0, fmt.Errorf("bad range %q", seg)
}
rangeLo, rangeHi = a, b
default:
n, err := strconv.Atoi(seg)
if err != nil {
return 0, fmt.Errorf("bad value %q", seg)
}
rangeLo, rangeHi = n, n
}
if rangeLo < lo || rangeHi > hi || rangeLo > rangeHi {
return 0, fmt.Errorf("value %d-%d out of range [%d,%d]", rangeLo, rangeHi, lo, hi)
}
for v := rangeLo; v <= rangeHi; v += step {
if v >= 64 {
continue
}
mask |= 1 << uint(v)
}
}
return mask, nil
}
// Next returns the smallest time strictly after `after` that matches the
// expression. The search is bounded to a few years out — schedules that
// can never match (e.g. day-of-month=31 with month=Feb) return a non-nil
// error rather than looping forever.
func (c *CronExpression) Next(after time.Time) (time.Time, error) {
t := after.UTC()
if c.hasSeconds {
t = t.Add(time.Second).Truncate(time.Second)
} else {
t = t.Add(time.Minute).Truncate(time.Minute)
}
// Search horizon: 5 years. A valid expression matches well within
// this window; pathological ones (impossible date combos) are caught
// by this bound.
deadline := after.Add(5 * 365 * 24 * time.Hour)
for t.Before(deadline) {
if !c.month.match(int(t.Month())) {
// Jump to the first of the next month.
t = time.Date(t.Year(), t.Month()+1, 1, 0, 0, 0, 0, time.UTC)
continue
}
if !c.dom.match(t.Day()) || !c.dow.match(int(t.Weekday())) {
t = time.Date(t.Year(), t.Month(), t.Day()+1, 0, 0, 0, 0, time.UTC)
continue
}
if !c.hours.match(t.Hour()) {
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, 0, 0, 0, time.UTC)
continue
}
if !c.minutes.match(t.Minute()) {
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, 0, 0, time.UTC)
continue
}
if c.hasSeconds && !c.seconds.match(t.Second()) {
t = t.Add(time.Second)
continue
}
return t, nil
}
return time.Time{}, fmt.Errorf("cron: no match within 5 years for %q", c.expr)
}
// String returns the original expression as parsed.
func (c *CronExpression) String() string { return c.expr }

View File

@ -0,0 +1,168 @@
package triggers
import (
"context"
"sync"
"time"
"github.com/DeBrosOfficial/network/pkg/serverless"
"go.uber.org/zap"
)
// CronInvoker is the subset of the gateway's serverless.Invoker that the
// scheduler uses. Stated as an interface so tests can swap it out.
type CronInvoker interface {
Invoke(ctx context.Context, req *serverless.InvokeRequest) (*serverless.InvokeResponse, error)
}
// CronScheduler is a single goroutine that periodically scans the
// function_cron_triggers table for due rows and invokes the matching
// functions.
//
// One scheduler per gateway instance is sufficient: the work is bounded
// by the number of cron triggers (small) and per-tick polling is cheap.
// If multiple gateway instances run concurrently, MarkRun's "next_run_at
// = computed_next" UPDATE acts as a soft lease — duplicate invocations
// are reduced to a tight race window. For stricter exactly-once we'd
// need an explicit lease table; deferred until needed.
type CronScheduler struct {
store *CronTriggerStore
invoker CronInvoker
logger *zap.Logger
pollInterval time.Duration
batchLimit int
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewCronScheduler builds a scheduler. Reasonable defaults: poll every
// 30 seconds, dispatch up to 100 triggers per tick.
func NewCronScheduler(
store *CronTriggerStore,
invoker CronInvoker,
logger *zap.Logger,
pollInterval time.Duration,
) *CronScheduler {
if pollInterval <= 0 {
pollInterval = 30 * time.Second
}
return &CronScheduler{
store: store,
invoker: invoker,
logger: logger,
pollInterval: pollInterval,
batchLimit: 100,
}
}
// Start launches the polling goroutine. Idempotent: a second Start while
// already running is a no-op.
func (s *CronScheduler) Start(ctx context.Context) {
if s.cancel != nil {
return
}
runCtx, cancel := context.WithCancel(ctx)
s.cancel = cancel
s.wg.Add(1)
go s.loop(runCtx)
s.logger.Info("cron scheduler started", zap.Duration("poll_interval", s.pollInterval))
}
// Stop cancels the goroutine and waits for it to exit. Safe to call
// multiple times.
func (s *CronScheduler) Stop() {
if s.cancel == nil {
return
}
s.cancel()
s.cancel = nil
s.wg.Wait()
s.logger.Info("cron scheduler stopped")
}
func (s *CronScheduler) loop(ctx context.Context) {
defer s.wg.Done()
ticker := time.NewTicker(s.pollInterval)
defer ticker.Stop()
// Run an immediate tick so a freshly-booted gateway picks up triggers
// that fired during the downtime, instead of waiting `pollInterval`.
s.tick(ctx)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.tick(ctx)
}
}
}
func (s *CronScheduler) tick(ctx context.Context) {
now := time.Now().UTC()
due, err := s.store.ListDue(ctx, now, s.batchLimit)
if err != nil {
s.logger.Warn("cron scheduler: ListDue failed", zap.Error(err))
return
}
if len(due) == 0 {
return
}
for _, row := range due {
if ctx.Err() != nil {
return
}
s.dispatch(ctx, row, now)
}
}
func (s *CronScheduler) dispatch(ctx context.Context, row CronDueRow, now time.Time) {
// Compute the next run BEFORE invoking so we always advance the cursor
// even if the invoke errors. This prevents a busted handler from
// starving the scheduler.
parsed, err := ParseCron(row.CronExpression)
if err != nil {
s.logger.Warn("cron scheduler: bad expression — disabling trigger",
zap.String("trigger_id", row.TriggerID),
zap.String("expr", row.CronExpression),
zap.Error(err))
// Push next_run_at far out so we don't keep looking at this row.
_ = s.store.MarkRun(ctx, row.TriggerID, now, now.Add(365*24*time.Hour),
"error", "bad cron expression: "+err.Error())
return
}
next, err := parsed.Next(now)
if err != nil {
_ = s.store.MarkRun(ctx, row.TriggerID, now, now.Add(365*24*time.Hour),
"error", "no next match: "+err.Error())
return
}
req := &serverless.InvokeRequest{
Namespace: row.Namespace,
FunctionName: row.FunctionName,
Input: []byte(`{"trigger":"cron"}`),
TriggerType: serverless.TriggerTypeCron,
}
resp, invErr := s.invoker.Invoke(ctx, req)
status := "ok"
errMsg := ""
if invErr != nil {
status = "error"
errMsg = invErr.Error()
s.logger.Warn("cron scheduler: invoke failed",
zap.String("trigger_id", row.TriggerID),
zap.String("function", row.FunctionName),
zap.String("namespace", row.Namespace),
zap.Error(invErr))
} else if resp != nil && resp.Status != serverless.InvocationStatusSuccess {
status = "error"
errMsg = resp.Error
}
if err := s.store.MarkRun(ctx, row.TriggerID, now, next, status, errMsg); err != nil {
s.logger.Warn("cron scheduler: MarkRun failed",
zap.String("trigger_id", row.TriggerID),
zap.Error(err))
}
}

View File

@ -0,0 +1,196 @@
package triggers
import (
"context"
"fmt"
"time"
"github.com/DeBrosOfficial/network/pkg/rqlite"
"github.com/DeBrosOfficial/network/pkg/serverless"
"github.com/google/uuid"
"go.uber.org/zap"
)
// CronTriggerStore manages cron-trigger persistence in RQLite. Reads/writes
// the function_cron_triggers table created by migration 004.
//
// Each row carries the cron expression, last/next run timestamps, last
// status / error, and an enabled flag — that's all the scheduler needs to
// pick due triggers up. Function metadata (name + namespace) is JOINed at
// dispatch time.
type CronTriggerStore struct {
db rqlite.Client
logger *zap.Logger
}
// NewCronTriggerStore creates a new cron trigger store.
func NewCronTriggerStore(db rqlite.Client, logger *zap.Logger) *CronTriggerStore {
return &CronTriggerStore{db: db, logger: logger}
}
// cronRow maps to function_cron_triggers for query scanning.
type cronRow struct {
ID string
FunctionID string
CronExpression string
NextRunAt time.Time
LastRunAt *time.Time
LastStatus *string
LastError *string
Enabled bool
CreatedAt time.Time
}
// CronDueRow is the JOINed row returned by ListDue: trigger + the function
// metadata the scheduler needs to invoke it.
type CronDueRow struct {
TriggerID string
FunctionID string
FunctionName string
Namespace string
CronExpression string
NextRunAt time.Time
}
// Add registers a new cron trigger. Validates the expression up front so
// callers get an immediate error instead of discovering it at scheduler
// firing time.
func (s *CronTriggerStore) Add(ctx context.Context, functionID, expr string) (string, error) {
if functionID == "" {
return "", fmt.Errorf("function ID required")
}
parsed, err := ParseCron(expr)
if err != nil {
return "", err
}
now := time.Now().UTC()
next, err := parsed.Next(now)
if err != nil {
return "", err
}
id := uuid.New().String()
query := `
INSERT INTO function_cron_triggers
(id, function_id, cron_expression, next_run_at, enabled, created_at)
VALUES (?, ?, ?, ?, TRUE, ?)
`
if _, err := s.db.Exec(ctx, query, id, functionID, expr, next, now); err != nil {
return "", fmt.Errorf("failed to add cron trigger: %w", err)
}
s.logger.Info("Cron trigger added",
zap.String("trigger_id", id),
zap.String("function_id", functionID),
zap.String("expr", expr),
zap.Time("next_run_at", next),
)
return id, nil
}
// Remove deletes a trigger by ID.
func (s *CronTriggerStore) Remove(ctx context.Context, triggerID string) error {
if triggerID == "" {
return fmt.Errorf("trigger ID required")
}
query := `DELETE FROM function_cron_triggers WHERE id = ?`
res, err := s.db.Exec(ctx, query, triggerID)
if err != nil {
return fmt.Errorf("failed to remove cron trigger: %w", err)
}
if affected, _ := res.RowsAffected(); affected == 0 {
return fmt.Errorf("trigger not found: %s", triggerID)
}
return nil
}
// RemoveByFunction wipes every cron trigger for a function. Used during
// re-deploy so stale schedules don't survive a function rewrite.
func (s *CronTriggerStore) RemoveByFunction(ctx context.Context, functionID string) error {
if functionID == "" {
return fmt.Errorf("function ID required")
}
_, err := s.db.Exec(ctx, `DELETE FROM function_cron_triggers WHERE function_id = ?`, functionID)
return err
}
// ListByFunction returns every cron trigger for a function. Used by the
// CLI's `triggers list` and the gateway's HandleListTriggers.
func (s *CronTriggerStore) ListByFunction(ctx context.Context, functionID string) ([]serverless.CronTrigger, error) {
if functionID == "" {
return nil, fmt.Errorf("function ID required")
}
query := `
SELECT id, function_id, cron_expression, next_run_at, last_run_at,
last_status, last_error, enabled, created_at
FROM function_cron_triggers
WHERE function_id = ?
`
var rows []cronRow
if err := s.db.Query(ctx, &rows, query, functionID); err != nil {
return nil, fmt.Errorf("failed to list cron triggers: %w", err)
}
out := make([]serverless.CronTrigger, len(rows))
for i, r := range rows {
nextRunAt := r.NextRunAt // copy so &local doesn't capture the loop var
t := serverless.CronTrigger{
ID: r.ID,
FunctionID: r.FunctionID,
CronExpression: r.CronExpression,
NextRunAt: &nextRunAt,
Enabled: r.Enabled,
}
if r.LastRunAt != nil {
lastRunAt := *r.LastRunAt
t.LastRunAt = &lastRunAt
}
out[i] = t
}
return out, nil
}
// ListDue returns enabled triggers whose next_run_at has elapsed, joined
// with the owning function so the scheduler can invoke them without an
// extra registry round-trip per trigger. Bounded by `limit` to avoid
// unbounded scan of a backlog after a long outage.
func (s *CronTriggerStore) ListDue(ctx context.Context, now time.Time, limit int) ([]CronDueRow, error) {
if limit <= 0 {
limit = 100
}
query := `
SELECT t.id AS trigger_id, t.function_id AS function_id,
f.name AS function_name, f.namespace AS namespace,
t.cron_expression AS cron_expression, t.next_run_at AS next_run_at
FROM function_cron_triggers t
JOIN functions f ON t.function_id = f.id
WHERE t.enabled = TRUE
AND f.status = 'active'
AND t.next_run_at <= ?
ORDER BY t.next_run_at ASC
LIMIT ?
`
var rows []CronDueRow
if err := s.db.Query(ctx, &rows, query, now, limit); err != nil {
return nil, fmt.Errorf("failed to query due cron triggers: %w", err)
}
return rows, nil
}
// MarkRun updates next_run_at + last_run_at + last_status / last_error
// after the scheduler invokes a trigger. Idempotent: if the row was
// removed concurrently, the UPDATE is a no-op.
func (s *CronTriggerStore) MarkRun(
ctx context.Context,
triggerID string,
ranAt time.Time,
nextRunAt time.Time,
status string,
errMsg string,
) error {
query := `
UPDATE function_cron_triggers
SET last_run_at = ?, next_run_at = ?, last_status = ?, last_error = ?
WHERE id = ?
`
_, err := s.db.Exec(ctx, query, ranAt, nextRunAt, status, errMsg, triggerID)
return err
}

View File

@ -421,6 +421,19 @@ type HostServices interface {
WSSend(ctx context.Context, clientID string, data []byte) error
WSBroadcast(ctx context.Context, topic string, data []byte) error
// FunctionInvoke synchronously invokes another function in the same
// namespace from inside a function (e.g. a persistent rpc-router
// dispatching client RPCs to per-op handlers). The caller's wallet,
// JWT claims, and WS client ID are inherited so the invoked function
// sees the same authenticated identity as the outer call.
//
// `name` is the target function name; `payload` is the raw input bytes
// to feed the function (typically JSON). Returns the function's output
// bytes on success. Errors (not found, unauthorized, runtime) are
// returned as Go errors and the caller should surface them as
// rpc_error to the client.
FunctionInvoke(ctx context.Context, name string, payload []byte) ([]byte, error)
// HTTP operations
HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error)