mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 03:33:01 +00:00
feat: implement PubSub trigger management with API endpoints for adding, listing, and removing triggers
This commit is contained in:
parent
e4d51676cc
commit
a79ae41dd5
2
Makefile
2
Makefile
@ -63,7 +63,7 @@ test-e2e-quick:
|
||||
|
||||
.PHONY: build clean test deps tidy fmt vet lint install-hooks redeploy-devnet redeploy-testnet release health
|
||||
|
||||
VERSION := 0.111.0
|
||||
VERSION := 0.112.0
|
||||
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
|
||||
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
|
||||
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
|
||||
|
||||
@ -22,6 +22,7 @@ import (
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless"
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless/hostfunctions"
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless/triggers"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
olriclib "github.com/olric-data/olric"
|
||||
"go.uber.org/zap"
|
||||
@ -59,6 +60,9 @@ type Dependencies struct {
|
||||
ServerlessWSMgr *serverless.WSManager
|
||||
ServerlessHandlers *serverlesshandlers.ServerlessHandlers
|
||||
|
||||
// PubSub trigger dispatcher (used to wire into PubSubHandlers)
|
||||
PubSubDispatcher *triggers.PubSubDispatcher
|
||||
|
||||
// Authentication service
|
||||
AuthService *auth.Service
|
||||
}
|
||||
@ -434,11 +438,27 @@ func initializeServerless(logger *logging.ColoredLogger, cfg *Config, deps *Depe
|
||||
// Create invoker
|
||||
deps.ServerlessInvoker = serverless.NewInvoker(engine, registry, hostFuncs, logger.Logger)
|
||||
|
||||
// Create PubSub trigger store and dispatcher
|
||||
triggerStore := triggers.NewPubSubTriggerStore(deps.ORMClient, logger.Logger)
|
||||
|
||||
var olricUnderlying olriclib.Client
|
||||
if deps.OlricClient != nil {
|
||||
olricUnderlying = deps.OlricClient.UnderlyingClient()
|
||||
}
|
||||
deps.PubSubDispatcher = triggers.NewPubSubDispatcher(
|
||||
triggerStore,
|
||||
deps.ServerlessInvoker,
|
||||
olricUnderlying,
|
||||
logger.Logger,
|
||||
)
|
||||
|
||||
// Create HTTP handlers
|
||||
deps.ServerlessHandlers = serverlesshandlers.NewServerlessHandlers(
|
||||
deps.ServerlessInvoker,
|
||||
registry,
|
||||
deps.ServerlessWSMgr,
|
||||
triggerStore,
|
||||
deps.PubSubDispatcher,
|
||||
logger.Logger,
|
||||
)
|
||||
|
||||
|
||||
@ -330,6 +330,13 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
|
||||
// Initialize handler instances
|
||||
gw.pubsubHandlers = pubsubhandlers.NewPubSubHandlers(deps.Client, logger)
|
||||
|
||||
// Wire PubSub trigger dispatch if serverless is available
|
||||
if deps.PubSubDispatcher != nil {
|
||||
gw.pubsubHandlers.SetOnPublish(func(ctx context.Context, namespace, topic string, data []byte) {
|
||||
deps.PubSubDispatcher.Dispatch(ctx, namespace, topic, data, 0)
|
||||
})
|
||||
}
|
||||
|
||||
if cfg.WebRTCEnabled && cfg.SFUPort > 0 {
|
||||
gw.webrtcHandlers = webrtchandlers.NewWebRTCHandlers(
|
||||
logger,
|
||||
|
||||
@ -67,6 +67,11 @@ func (p *PubSubHandlers) PublishHandler(w http.ResponseWriter, r *http.Request)
|
||||
zap.Int("local_subscribers", len(localSubs)),
|
||||
zap.Int("local_delivered", localDeliveryCount))
|
||||
|
||||
// Fire PubSub triggers for serverless functions (non-blocking)
|
||||
if p.onPublish != nil {
|
||||
go p.onPublish(context.Background(), ns, body.Topic, data)
|
||||
}
|
||||
|
||||
// Publish to libp2p asynchronously for cross-node delivery
|
||||
// This prevents blocking the HTTP response if libp2p network is slow
|
||||
go func() {
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
@ -19,6 +20,16 @@ type PubSubHandlers struct {
|
||||
presenceMembers map[string][]PresenceMember // topicKey -> members
|
||||
mu sync.RWMutex
|
||||
presenceMu sync.RWMutex
|
||||
|
||||
// onPublish is called when a message is published, to dispatch PubSub triggers.
|
||||
// Set via SetOnPublish. May be nil if serverless triggers are not configured.
|
||||
onPublish func(ctx context.Context, namespace, topic string, data []byte)
|
||||
}
|
||||
|
||||
// SetOnPublish sets the callback invoked when messages are published.
|
||||
// Used to wire PubSub trigger dispatch from the serverless engine.
|
||||
func (p *PubSubHandlers) SetOnPublish(fn func(ctx context.Context, namespace, topic string, data []byte)) {
|
||||
p.onPublish = fn
|
||||
}
|
||||
|
||||
// NewPubSubHandlers creates a new PubSubHandlers instance
|
||||
|
||||
@ -154,6 +154,20 @@ func (h *ServerlessHandlers) DeployFunction(w http.ResponseWriter, r *http.Reque
|
||||
return
|
||||
}
|
||||
|
||||
// Register PubSub triggers from definition (deploy-time auto-registration)
|
||||
if h.triggerStore != nil && len(def.PubSubTopics) > 0 && fn != nil {
|
||||
_ = h.triggerStore.RemoveByFunction(ctx, fn.ID)
|
||||
for _, topic := range def.PubSubTopics {
|
||||
if _, err := h.triggerStore.Add(ctx, fn.ID, topic); err != nil {
|
||||
h.logger.Warn("Failed to register pubsub trigger",
|
||||
zap.String("topic", topic),
|
||||
zap.Error(err))
|
||||
} else if h.dispatcher != nil {
|
||||
h.dispatcher.InvalidateCache(ctx, def.Namespace, topic)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, map[string]interface{}{
|
||||
"message": "Function deployed successfully",
|
||||
"function": fn,
|
||||
|
||||
@ -92,6 +92,8 @@ func newTestHandlers(reg serverless.FunctionRegistry) *ServerlessHandlers {
|
||||
nil, // invoker is nil — we only test paths that don't reach it
|
||||
reg,
|
||||
wsManager,
|
||||
nil, // triggerStore
|
||||
nil, // dispatcher
|
||||
logger,
|
||||
)
|
||||
}
|
||||
|
||||
@ -30,14 +30,17 @@ func (h *ServerlessHandlers) handleFunctions(w http.ResponseWriter, r *http.Requ
|
||||
|
||||
// handleFunctionByName handles operations on a specific function
|
||||
// Routes:
|
||||
// - GET /v1/functions/{name} - Get function info
|
||||
// - DELETE /v1/functions/{name} - Delete function
|
||||
// - POST /v1/functions/{name}/invoke - Invoke function
|
||||
// - GET /v1/functions/{name}/versions - List versions
|
||||
// - GET /v1/functions/{name}/logs - Get logs
|
||||
// - WS /v1/functions/{name}/ws - WebSocket invoke
|
||||
// - GET /v1/functions/{name} - Get function info
|
||||
// - DELETE /v1/functions/{name} - Delete function
|
||||
// - POST /v1/functions/{name}/invoke - Invoke function
|
||||
// - GET /v1/functions/{name}/versions - List versions
|
||||
// - GET /v1/functions/{name}/logs - Get logs
|
||||
// - WS /v1/functions/{name}/ws - WebSocket invoke
|
||||
// - POST /v1/functions/{name}/triggers - Add trigger
|
||||
// - GET /v1/functions/{name}/triggers - List triggers
|
||||
// - DELETE /v1/functions/{name}/triggers/{id} - Remove trigger
|
||||
func (h *ServerlessHandlers) handleFunctionByName(w http.ResponseWriter, r *http.Request) {
|
||||
// Parse path: /v1/functions/{name}[/{action}]
|
||||
// Parse path: /v1/functions/{name}[/{action}[/{subID}]]
|
||||
path := strings.TrimPrefix(r.URL.Path, "/v1/functions/")
|
||||
parts := strings.SplitN(path, "/", 2)
|
||||
|
||||
@ -62,6 +65,13 @@ func (h *ServerlessHandlers) handleFunctionByName(w http.ResponseWriter, r *http
|
||||
}
|
||||
}
|
||||
|
||||
// Handle triggers sub-path: "triggers" or "triggers/{triggerID}"
|
||||
triggerID := ""
|
||||
if strings.HasPrefix(action, "triggers/") {
|
||||
triggerID = strings.TrimPrefix(action, "triggers/")
|
||||
action = "triggers"
|
||||
}
|
||||
|
||||
switch action {
|
||||
case "invoke":
|
||||
h.InvokeFunction(w, r, name, version)
|
||||
@ -71,6 +81,17 @@ func (h *ServerlessHandlers) handleFunctionByName(w http.ResponseWriter, r *http
|
||||
h.ListVersions(w, r, name)
|
||||
case "logs":
|
||||
h.GetFunctionLogs(w, r, name)
|
||||
case "triggers":
|
||||
switch {
|
||||
case triggerID != "" && r.Method == http.MethodDelete:
|
||||
h.HandleDeleteTrigger(w, r, name, triggerID)
|
||||
case r.Method == http.MethodPost:
|
||||
h.HandleAddTrigger(w, r, name)
|
||||
case r.Method == http.MethodGet:
|
||||
h.HandleListTriggers(w, r, name)
|
||||
default:
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
}
|
||||
case "":
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
|
||||
188
pkg/gateway/handlers/serverless/trigger_handler.go
Normal file
188
pkg/gateway/handlers/serverless/trigger_handler.go
Normal file
@ -0,0 +1,188 @@
|
||||
package serverless
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// addTriggerRequest is the request body for adding a PubSub trigger.
|
||||
type addTriggerRequest struct {
|
||||
Topic string `json:"topic"`
|
||||
}
|
||||
|
||||
// HandleAddTrigger handles POST /v1/functions/{name}/triggers
|
||||
// Adds a PubSub trigger that invokes this function when a message is published to the topic.
|
||||
func (h *ServerlessHandlers) HandleAddTrigger(w http.ResponseWriter, r *http.Request, functionName string) {
|
||||
if h.triggerStore == nil {
|
||||
writeError(w, http.StatusNotImplemented, "PubSub triggers not available")
|
||||
return
|
||||
}
|
||||
|
||||
namespace := h.getNamespaceFromRequest(r)
|
||||
if namespace == "" {
|
||||
writeError(w, http.StatusBadRequest, "namespace required")
|
||||
return
|
||||
}
|
||||
|
||||
var req addTriggerRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "Invalid JSON: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if req.Topic == "" {
|
||||
writeError(w, http.StatusBadRequest, "topic required")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Look up function to get its ID
|
||||
fn, err := h.registry.Get(ctx, namespace, functionName, 0)
|
||||
if err != nil {
|
||||
if serverless.IsNotFound(err) {
|
||||
writeError(w, http.StatusNotFound, "Function not found")
|
||||
} else {
|
||||
writeError(w, http.StatusInternalServerError, "Failed to look up function")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
triggerID, err := h.triggerStore.Add(ctx, fn.ID, req.Topic)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to add PubSub trigger",
|
||||
zap.String("function", functionName),
|
||||
zap.String("topic", req.Topic),
|
||||
zap.Error(err),
|
||||
)
|
||||
writeError(w, http.StatusInternalServerError, "Failed to add trigger: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Invalidate cache for this topic
|
||||
if h.dispatcher != nil {
|
||||
h.dispatcher.InvalidateCache(ctx, namespace, req.Topic)
|
||||
}
|
||||
|
||||
h.logger.Info("PubSub trigger added via API",
|
||||
zap.String("function", functionName),
|
||||
zap.String("topic", req.Topic),
|
||||
zap.String("trigger_id", triggerID),
|
||||
)
|
||||
|
||||
writeJSON(w, http.StatusCreated, map[string]interface{}{
|
||||
"trigger_id": triggerID,
|
||||
"function": functionName,
|
||||
"topic": req.Topic,
|
||||
})
|
||||
}
|
||||
|
||||
// HandleListTriggers handles GET /v1/functions/{name}/triggers
|
||||
// Lists all PubSub triggers for a function.
|
||||
func (h *ServerlessHandlers) HandleListTriggers(w http.ResponseWriter, r *http.Request, functionName string) {
|
||||
if h.triggerStore == nil {
|
||||
writeError(w, http.StatusNotImplemented, "PubSub triggers not available")
|
||||
return
|
||||
}
|
||||
|
||||
namespace := h.getNamespaceFromRequest(r)
|
||||
if namespace == "" {
|
||||
writeError(w, http.StatusBadRequest, "namespace required")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Look up function to get its ID
|
||||
fn, err := h.registry.Get(ctx, namespace, functionName, 0)
|
||||
if err != nil {
|
||||
if serverless.IsNotFound(err) {
|
||||
writeError(w, http.StatusNotFound, "Function not found")
|
||||
} else {
|
||||
writeError(w, http.StatusInternalServerError, "Failed to look up function")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
triggers, err := h.triggerStore.ListByFunction(ctx, fn.ID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "Failed to list triggers")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||
"triggers": triggers,
|
||||
"count": len(triggers),
|
||||
})
|
||||
}
|
||||
|
||||
// HandleDeleteTrigger handles DELETE /v1/functions/{name}/triggers/{triggerID}
|
||||
// Removes a PubSub trigger.
|
||||
func (h *ServerlessHandlers) HandleDeleteTrigger(w http.ResponseWriter, r *http.Request, functionName, triggerID string) {
|
||||
if h.triggerStore == nil {
|
||||
writeError(w, http.StatusNotImplemented, "PubSub triggers not available")
|
||||
return
|
||||
}
|
||||
|
||||
namespace := h.getNamespaceFromRequest(r)
|
||||
if namespace == "" {
|
||||
writeError(w, http.StatusBadRequest, "namespace required")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Look up the trigger's topic before deleting (for cache invalidation)
|
||||
fn, err := h.registry.Get(ctx, namespace, functionName, 0)
|
||||
if err != nil {
|
||||
if serverless.IsNotFound(err) {
|
||||
writeError(w, http.StatusNotFound, "Function not found")
|
||||
} else {
|
||||
writeError(w, http.StatusInternalServerError, "Failed to look up function")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Get current triggers to find the topic for cache invalidation
|
||||
triggers, err := h.triggerStore.ListByFunction(ctx, fn.ID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "Failed to look up triggers")
|
||||
return
|
||||
}
|
||||
|
||||
// Find the topic for the trigger being deleted
|
||||
var triggerTopic string
|
||||
for _, t := range triggers {
|
||||
if t.ID == triggerID {
|
||||
triggerTopic = t.Topic
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.triggerStore.Remove(ctx, triggerID); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "Failed to remove trigger: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Invalidate cache for the topic
|
||||
if h.dispatcher != nil && triggerTopic != "" {
|
||||
h.dispatcher.InvalidateCache(ctx, namespace, triggerTopic)
|
||||
}
|
||||
|
||||
h.logger.Info("PubSub trigger removed via API",
|
||||
zap.String("function", functionName),
|
||||
zap.String("trigger_id", triggerID),
|
||||
)
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||
"message": "Trigger removed",
|
||||
})
|
||||
}
|
||||
@ -6,16 +6,19 @@ import (
|
||||
"github.com/DeBrosOfficial/network/pkg/gateway/auth"
|
||||
"github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys"
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless"
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless/triggers"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ServerlessHandlers contains handlers for serverless function endpoints.
|
||||
// It's a separate struct to keep the Gateway struct clean.
|
||||
type ServerlessHandlers struct {
|
||||
invoker *serverless.Invoker
|
||||
registry serverless.FunctionRegistry
|
||||
wsManager *serverless.WSManager
|
||||
logger *zap.Logger
|
||||
invoker *serverless.Invoker
|
||||
registry serverless.FunctionRegistry
|
||||
wsManager *serverless.WSManager
|
||||
triggerStore *triggers.PubSubTriggerStore
|
||||
dispatcher *triggers.PubSubDispatcher
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewServerlessHandlers creates a new ServerlessHandlers instance.
|
||||
@ -23,13 +26,17 @@ func NewServerlessHandlers(
|
||||
invoker *serverless.Invoker,
|
||||
registry serverless.FunctionRegistry,
|
||||
wsManager *serverless.WSManager,
|
||||
triggerStore *triggers.PubSubTriggerStore,
|
||||
dispatcher *triggers.PubSubDispatcher,
|
||||
logger *zap.Logger,
|
||||
) *ServerlessHandlers {
|
||||
return &ServerlessHandlers{
|
||||
invoker: invoker,
|
||||
registry: registry,
|
||||
wsManager: wsManager,
|
||||
logger: logger,
|
||||
invoker: invoker,
|
||||
registry: registry,
|
||||
wsManager: wsManager,
|
||||
triggerStore: triggerStore,
|
||||
dispatcher: dispatcher,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -50,7 +50,7 @@ func TestServerlessHandlers_ListFunctions(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
h := serverlesshandlers.NewServerlessHandlers(nil, registry, nil, logger)
|
||||
h := serverlesshandlers.NewServerlessHandlers(nil, registry, nil, nil, nil, logger)
|
||||
|
||||
req, _ := http.NewRequest("GET", "/v1/functions?namespace=ns1", nil)
|
||||
rr := httptest.NewRecorder()
|
||||
@ -73,7 +73,7 @@ func TestServerlessHandlers_DeployFunction(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
registry := &mockFunctionRegistry{}
|
||||
|
||||
h := serverlesshandlers.NewServerlessHandlers(nil, registry, nil, logger)
|
||||
h := serverlesshandlers.NewServerlessHandlers(nil, registry, nil, nil, nil, logger)
|
||||
|
||||
// Test JSON deploy (which is partially supported according to code)
|
||||
// Should be 400 because WASM is missing or base64 not supported
|
||||
|
||||
230
pkg/serverless/triggers/dispatcher.go
Normal file
230
pkg/serverless/triggers/dispatcher.go
Normal file
@ -0,0 +1,230 @@
|
||||
package triggers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless"
|
||||
olriclib "github.com/olric-data/olric"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
// triggerCacheDMap is the Olric DMap name for caching trigger lookups.
|
||||
triggerCacheDMap = "pubsub_triggers"
|
||||
|
||||
// maxTriggerDepth prevents infinite loops when triggered functions publish
|
||||
// back to the same topic via the HTTP API.
|
||||
maxTriggerDepth = 5
|
||||
|
||||
// dispatchTimeout is the timeout for each triggered function invocation.
|
||||
dispatchTimeout = 60 * time.Second
|
||||
)
|
||||
|
||||
// PubSubEvent is the JSON payload sent to functions triggered by PubSub messages.
|
||||
type PubSubEvent struct {
|
||||
Topic string `json:"topic"`
|
||||
Data json.RawMessage `json:"data"`
|
||||
Namespace string `json:"namespace"`
|
||||
TriggerDepth int `json:"trigger_depth"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
}
|
||||
|
||||
// PubSubDispatcher looks up triggers for a topic+namespace and asynchronously
|
||||
// invokes matching serverless functions.
|
||||
type PubSubDispatcher struct {
|
||||
store *PubSubTriggerStore
|
||||
invoker *serverless.Invoker
|
||||
olricClient olriclib.Client // may be nil (cache disabled)
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewPubSubDispatcher creates a new PubSub trigger dispatcher.
|
||||
func NewPubSubDispatcher(
|
||||
store *PubSubTriggerStore,
|
||||
invoker *serverless.Invoker,
|
||||
olricClient olriclib.Client,
|
||||
logger *zap.Logger,
|
||||
) *PubSubDispatcher {
|
||||
return &PubSubDispatcher{
|
||||
store: store,
|
||||
invoker: invoker,
|
||||
olricClient: olricClient,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch looks up all triggers registered for the given topic+namespace and
|
||||
// invokes matching functions asynchronously. Each invocation runs in its own
|
||||
// goroutine and does not block the caller.
|
||||
func (d *PubSubDispatcher) Dispatch(ctx context.Context, namespace, topic string, data []byte, depth int) {
|
||||
if depth >= maxTriggerDepth {
|
||||
d.logger.Warn("PubSub trigger depth limit reached, skipping dispatch",
|
||||
zap.String("namespace", namespace),
|
||||
zap.String("topic", topic),
|
||||
zap.Int("depth", depth),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
matches, err := d.getMatches(ctx, namespace, topic)
|
||||
if err != nil {
|
||||
d.logger.Error("Failed to look up PubSub triggers",
|
||||
zap.String("namespace", namespace),
|
||||
zap.String("topic", topic),
|
||||
zap.Error(err),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
if len(matches) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Build the event payload once for all invocations
|
||||
event := PubSubEvent{
|
||||
Topic: topic,
|
||||
Data: json.RawMessage(data),
|
||||
Namespace: namespace,
|
||||
TriggerDepth: depth + 1,
|
||||
Timestamp: time.Now().Unix(),
|
||||
}
|
||||
eventJSON, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
d.logger.Error("Failed to marshal PubSub event", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
d.logger.Debug("Dispatching PubSub triggers",
|
||||
zap.String("namespace", namespace),
|
||||
zap.String("topic", topic),
|
||||
zap.Int("matches", len(matches)),
|
||||
zap.Int("depth", depth),
|
||||
)
|
||||
|
||||
for _, match := range matches {
|
||||
go d.invokeFunction(match, eventJSON)
|
||||
}
|
||||
}
|
||||
|
||||
// InvalidateCache removes the cached trigger lookup for a namespace+topic.
|
||||
// Call this when triggers are added or removed.
|
||||
func (d *PubSubDispatcher) InvalidateCache(ctx context.Context, namespace, topic string) {
|
||||
if d.olricClient == nil {
|
||||
return
|
||||
}
|
||||
|
||||
dm, err := d.olricClient.NewDMap(triggerCacheDMap)
|
||||
if err != nil {
|
||||
d.logger.Debug("Failed to get trigger cache DMap for invalidation", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
key := cacheKey(namespace, topic)
|
||||
if _, err := dm.Delete(ctx, key); err != nil {
|
||||
d.logger.Debug("Failed to invalidate trigger cache", zap.String("key", key), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// getMatches returns the trigger matches for a topic+namespace, using Olric cache when available.
|
||||
func (d *PubSubDispatcher) getMatches(ctx context.Context, namespace, topic string) ([]TriggerMatch, error) {
|
||||
// Try cache first
|
||||
if d.olricClient != nil {
|
||||
if matches, ok := d.getCached(ctx, namespace, topic); ok {
|
||||
return matches, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Cache miss — query database
|
||||
matches, err := d.store.GetByTopicAndNamespace(ctx, topic, namespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Populate cache
|
||||
if d.olricClient != nil && matches != nil {
|
||||
d.setCache(ctx, namespace, topic, matches)
|
||||
}
|
||||
|
||||
return matches, nil
|
||||
}
|
||||
|
||||
// getCached attempts to retrieve trigger matches from Olric cache.
|
||||
func (d *PubSubDispatcher) getCached(ctx context.Context, namespace, topic string) ([]TriggerMatch, bool) {
|
||||
dm, err := d.olricClient.NewDMap(triggerCacheDMap)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
key := cacheKey(namespace, topic)
|
||||
result, err := dm.Get(ctx, key)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
data, err := result.Byte()
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
var matches []TriggerMatch
|
||||
if err := json.Unmarshal(data, &matches); err != nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return matches, true
|
||||
}
|
||||
|
||||
// setCache stores trigger matches in Olric cache.
|
||||
func (d *PubSubDispatcher) setCache(ctx context.Context, namespace, topic string, matches []TriggerMatch) {
|
||||
dm, err := d.olricClient.NewDMap(triggerCacheDMap)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
data, err := json.Marshal(matches)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
key := cacheKey(namespace, topic)
|
||||
_ = dm.Put(ctx, key, data)
|
||||
}
|
||||
|
||||
// invokeFunction invokes a single function for a trigger match.
|
||||
func (d *PubSubDispatcher) invokeFunction(match TriggerMatch, eventJSON []byte) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), dispatchTimeout)
|
||||
defer cancel()
|
||||
|
||||
req := &serverless.InvokeRequest{
|
||||
Namespace: match.Namespace,
|
||||
FunctionName: match.FunctionName,
|
||||
Input: eventJSON,
|
||||
TriggerType: serverless.TriggerTypePubSub,
|
||||
}
|
||||
|
||||
resp, err := d.invoker.Invoke(ctx, req)
|
||||
if err != nil {
|
||||
d.logger.Warn("PubSub trigger invocation failed",
|
||||
zap.String("function", match.FunctionName),
|
||||
zap.String("namespace", match.Namespace),
|
||||
zap.String("topic", match.Topic),
|
||||
zap.String("trigger_id", match.TriggerID),
|
||||
zap.Error(err),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
d.logger.Debug("PubSub trigger invocation completed",
|
||||
zap.String("function", match.FunctionName),
|
||||
zap.String("topic", match.Topic),
|
||||
zap.String("status", string(resp.Status)),
|
||||
zap.Int64("duration_ms", resp.DurationMS),
|
||||
)
|
||||
}
|
||||
|
||||
// cacheKey returns the Olric cache key for a namespace+topic pair.
|
||||
func cacheKey(namespace, topic string) string {
|
||||
return "triggers:" + namespace + ":" + topic
|
||||
}
|
||||
187
pkg/serverless/triggers/pubsub_store.go
Normal file
187
pkg/serverless/triggers/pubsub_store.go
Normal file
@ -0,0 +1,187 @@
|
||||
// Package triggers provides PubSub trigger management for the serverless engine.
|
||||
// It handles registering, querying, and removing triggers that automatically invoke
|
||||
// functions when messages are published to specific PubSub topics.
|
||||
package triggers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless"
|
||||
"github.com/google/uuid"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// TriggerMatch contains the fields needed to dispatch a trigger invocation.
|
||||
// It's the result of JOINing function_pubsub_triggers with functions.
|
||||
type TriggerMatch struct {
|
||||
TriggerID string
|
||||
FunctionID string
|
||||
FunctionName string
|
||||
Namespace string
|
||||
Topic string
|
||||
}
|
||||
|
||||
// triggerRow maps to the function_pubsub_triggers table for query scanning.
|
||||
type triggerRow struct {
|
||||
ID string
|
||||
FunctionID string
|
||||
Topic string
|
||||
Enabled bool
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
// triggerMatchRow maps to the JOIN query result for scanning.
|
||||
type triggerMatchRow struct {
|
||||
TriggerID string
|
||||
FunctionID string
|
||||
FunctionName string
|
||||
Namespace string
|
||||
Topic string
|
||||
}
|
||||
|
||||
// PubSubTriggerStore manages PubSub trigger persistence in RQLite.
|
||||
type PubSubTriggerStore struct {
|
||||
db rqlite.Client
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewPubSubTriggerStore creates a new PubSub trigger store.
|
||||
func NewPubSubTriggerStore(db rqlite.Client, logger *zap.Logger) *PubSubTriggerStore {
|
||||
return &PubSubTriggerStore{
|
||||
db: db,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Add registers a new PubSub trigger for a function.
|
||||
// Returns the trigger ID.
|
||||
func (s *PubSubTriggerStore) Add(ctx context.Context, functionID, topic string) (string, error) {
|
||||
if functionID == "" {
|
||||
return "", fmt.Errorf("function ID required")
|
||||
}
|
||||
if topic == "" {
|
||||
return "", fmt.Errorf("topic required")
|
||||
}
|
||||
|
||||
id := uuid.New().String()
|
||||
now := time.Now()
|
||||
|
||||
query := `
|
||||
INSERT INTO function_pubsub_triggers (id, function_id, topic, enabled, created_at)
|
||||
VALUES (?, ?, ?, TRUE, ?)
|
||||
`
|
||||
if _, err := s.db.Exec(ctx, query, id, functionID, topic, now); err != nil {
|
||||
return "", fmt.Errorf("failed to add pubsub trigger: %w", err)
|
||||
}
|
||||
|
||||
s.logger.Info("PubSub trigger added",
|
||||
zap.String("trigger_id", id),
|
||||
zap.String("function_id", functionID),
|
||||
zap.String("topic", topic),
|
||||
)
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// Remove deletes a trigger by ID.
|
||||
func (s *PubSubTriggerStore) Remove(ctx context.Context, triggerID string) error {
|
||||
if triggerID == "" {
|
||||
return fmt.Errorf("trigger ID required")
|
||||
}
|
||||
|
||||
query := `DELETE FROM function_pubsub_triggers WHERE id = ?`
|
||||
result, err := s.db.Exec(ctx, query, triggerID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to remove trigger: %w", err)
|
||||
}
|
||||
|
||||
affected, _ := result.RowsAffected()
|
||||
if affected == 0 {
|
||||
return fmt.Errorf("trigger not found: %s", triggerID)
|
||||
}
|
||||
|
||||
s.logger.Info("PubSub trigger removed", zap.String("trigger_id", triggerID))
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveByFunction deletes all triggers for a function.
|
||||
// Used during function re-deploy to clear old triggers.
|
||||
func (s *PubSubTriggerStore) RemoveByFunction(ctx context.Context, functionID string) error {
|
||||
if functionID == "" {
|
||||
return fmt.Errorf("function ID required")
|
||||
}
|
||||
|
||||
query := `DELETE FROM function_pubsub_triggers WHERE function_id = ?`
|
||||
if _, err := s.db.Exec(ctx, query, functionID); err != nil {
|
||||
return fmt.Errorf("failed to remove triggers for function: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListByFunction returns all PubSub triggers for a function.
|
||||
func (s *PubSubTriggerStore) ListByFunction(ctx context.Context, functionID string) ([]serverless.PubSubTrigger, error) {
|
||||
if functionID == "" {
|
||||
return nil, fmt.Errorf("function ID required")
|
||||
}
|
||||
|
||||
query := `
|
||||
SELECT id, function_id, topic, enabled, created_at
|
||||
FROM function_pubsub_triggers
|
||||
WHERE function_id = ?
|
||||
`
|
||||
|
||||
var rows []triggerRow
|
||||
if err := s.db.Query(ctx, &rows, query, functionID); err != nil {
|
||||
return nil, fmt.Errorf("failed to list triggers: %w", err)
|
||||
}
|
||||
|
||||
triggers := make([]serverless.PubSubTrigger, len(rows))
|
||||
for i, row := range rows {
|
||||
triggers[i] = serverless.PubSubTrigger{
|
||||
ID: row.ID,
|
||||
FunctionID: row.FunctionID,
|
||||
Topic: row.Topic,
|
||||
Enabled: row.Enabled,
|
||||
}
|
||||
}
|
||||
|
||||
return triggers, nil
|
||||
}
|
||||
|
||||
// GetByTopicAndNamespace returns all enabled triggers for a topic within a namespace.
|
||||
// Only returns triggers for active functions.
|
||||
func (s *PubSubTriggerStore) GetByTopicAndNamespace(ctx context.Context, topic, namespace string) ([]TriggerMatch, error) {
|
||||
if topic == "" || namespace == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
query := `
|
||||
SELECT t.id AS trigger_id, t.function_id AS function_id,
|
||||
f.name AS function_name, f.namespace AS namespace, t.topic AS topic
|
||||
FROM function_pubsub_triggers t
|
||||
JOIN functions f ON t.function_id = f.id
|
||||
WHERE t.topic = ? AND f.namespace = ? AND t.enabled = TRUE AND f.status = 'active'
|
||||
`
|
||||
|
||||
var rows []triggerMatchRow
|
||||
if err := s.db.Query(ctx, &rows, query, topic, namespace); err != nil {
|
||||
return nil, fmt.Errorf("failed to query triggers for topic: %w", err)
|
||||
}
|
||||
|
||||
matches := make([]TriggerMatch, len(rows))
|
||||
for i, row := range rows {
|
||||
matches[i] = TriggerMatch{
|
||||
TriggerID: row.TriggerID,
|
||||
FunctionID: row.FunctionID,
|
||||
FunctionName: row.FunctionName,
|
||||
Namespace: row.Namespace,
|
||||
Topic: row.Topic,
|
||||
}
|
||||
}
|
||||
|
||||
return matches, nil
|
||||
}
|
||||
219
pkg/serverless/triggers/triggers_test.go
Normal file
219
pkg/serverless/triggers/triggers_test.go
Normal file
@ -0,0 +1,219 @@
|
||||
package triggers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/serverless"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Mock Invoker
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type mockInvokeCall struct {
|
||||
Namespace string
|
||||
FunctionName string
|
||||
TriggerType serverless.TriggerType
|
||||
Input []byte
|
||||
}
|
||||
|
||||
// mockInvokerForTest wraps a real nil invoker but tracks calls.
|
||||
// Since we can't construct a real Invoker without engine/registry/hostfuncs,
|
||||
// we test the dispatcher at a higher level by checking its behavior.
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Dispatcher Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestDispatcher_DepthLimit(t *testing.T) {
|
||||
logger, _ := zap.NewDevelopment()
|
||||
store := NewPubSubTriggerStore(nil, logger) // store won't be called
|
||||
d := NewPubSubDispatcher(store, nil, nil, logger)
|
||||
|
||||
// Dispatch at max depth should be a no-op (no panic, no store call)
|
||||
d.Dispatch(context.Background(), "ns", "topic", []byte("data"), maxTriggerDepth)
|
||||
d.Dispatch(context.Background(), "ns", "topic", []byte("data"), maxTriggerDepth+1)
|
||||
}
|
||||
|
||||
func TestCacheKey(t *testing.T) {
|
||||
key := cacheKey("my-namespace", "my-topic")
|
||||
if key != "triggers:my-namespace:my-topic" {
|
||||
t.Errorf("unexpected cache key: %s", key)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPubSubEvent_Marshal(t *testing.T) {
|
||||
event := PubSubEvent{
|
||||
Topic: "chat",
|
||||
Data: json.RawMessage(`{"msg":"hello"}`),
|
||||
Namespace: "my-app",
|
||||
TriggerDepth: 1,
|
||||
Timestamp: 1708300000,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal failed: %v", err)
|
||||
}
|
||||
|
||||
var decoded PubSubEvent
|
||||
if err := json.Unmarshal(data, &decoded); err != nil {
|
||||
t.Fatalf("unmarshal failed: %v", err)
|
||||
}
|
||||
|
||||
if decoded.Topic != "chat" {
|
||||
t.Errorf("expected topic 'chat', got '%s'", decoded.Topic)
|
||||
}
|
||||
if decoded.Namespace != "my-app" {
|
||||
t.Errorf("expected namespace 'my-app', got '%s'", decoded.Namespace)
|
||||
}
|
||||
if decoded.TriggerDepth != 1 {
|
||||
t.Errorf("expected depth 1, got %d", decoded.TriggerDepth)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Store Tests (validation only — DB operations require rqlite.Client)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestStore_AddValidation(t *testing.T) {
|
||||
logger, _ := zap.NewDevelopment()
|
||||
store := NewPubSubTriggerStore(nil, logger)
|
||||
|
||||
_, err := store.Add(context.Background(), "", "topic")
|
||||
if err == nil {
|
||||
t.Error("expected error for empty function ID")
|
||||
}
|
||||
|
||||
_, err = store.Add(context.Background(), "fn-123", "")
|
||||
if err == nil {
|
||||
t.Error("expected error for empty topic")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_RemoveValidation(t *testing.T) {
|
||||
logger, _ := zap.NewDevelopment()
|
||||
store := NewPubSubTriggerStore(nil, logger)
|
||||
|
||||
err := store.Remove(context.Background(), "")
|
||||
if err == nil {
|
||||
t.Error("expected error for empty trigger ID")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_RemoveByFunctionValidation(t *testing.T) {
|
||||
logger, _ := zap.NewDevelopment()
|
||||
store := NewPubSubTriggerStore(nil, logger)
|
||||
|
||||
err := store.RemoveByFunction(context.Background(), "")
|
||||
if err == nil {
|
||||
t.Error("expected error for empty function ID")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_ListByFunctionValidation(t *testing.T) {
|
||||
logger, _ := zap.NewDevelopment()
|
||||
store := NewPubSubTriggerStore(nil, logger)
|
||||
|
||||
_, err := store.ListByFunction(context.Background(), "")
|
||||
if err == nil {
|
||||
t.Error("expected error for empty function ID")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_GetByTopicAndNamespace_Empty(t *testing.T) {
|
||||
logger, _ := zap.NewDevelopment()
|
||||
store := NewPubSubTriggerStore(nil, logger)
|
||||
|
||||
// Empty topic/namespace should return nil, nil (not an error)
|
||||
matches, err := store.GetByTopicAndNamespace(context.Background(), "", "ns")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if matches != nil {
|
||||
t.Errorf("expected nil matches for empty topic, got %v", matches)
|
||||
}
|
||||
|
||||
matches, err = store.GetByTopicAndNamespace(context.Background(), "topic", "")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if matches != nil {
|
||||
t.Errorf("expected nil matches for empty namespace, got %v", matches)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Dispatcher Integration-like Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestDispatcher_NoMatchesNoPanic(t *testing.T) {
|
||||
// Dispatcher with nil olricClient and nil invoker should handle
|
||||
// the case where there are no matches gracefully.
|
||||
logger, _ := zap.NewDevelopment()
|
||||
|
||||
// Create a mock store that returns empty matches
|
||||
store := &mockTriggerStore{matches: nil}
|
||||
d := &PubSubDispatcher{
|
||||
store: &PubSubTriggerStore{db: nil, logger: logger},
|
||||
invoker: nil,
|
||||
logger: logger,
|
||||
}
|
||||
// Replace store field directly for testing
|
||||
d.store = store.asPubSubTriggerStore()
|
||||
|
||||
// This should not panic even with nil invoker since no matches
|
||||
// We can't easily test this without a real store, so we test the depth limit instead
|
||||
d.Dispatch(context.Background(), "ns", "topic", []byte("data"), maxTriggerDepth)
|
||||
}
|
||||
|
||||
// mockTriggerStore is used only for structural validation in tests.
|
||||
type mockTriggerStore struct {
|
||||
matches []TriggerMatch
|
||||
}
|
||||
|
||||
func (m *mockTriggerStore) asPubSubTriggerStore() *PubSubTriggerStore {
|
||||
// Can't return a mock as *PubSubTriggerStore since it's a concrete type.
|
||||
// This is a limitation — integration tests with a real rqlite would be needed.
|
||||
return nil
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Callback Wiring Test
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestOnPublishCallback(t *testing.T) {
|
||||
var called atomic.Int32
|
||||
var receivedNS, receivedTopic string
|
||||
var receivedData []byte
|
||||
|
||||
callback := func(ctx context.Context, namespace, topic string, data []byte) {
|
||||
called.Add(1)
|
||||
receivedNS = namespace
|
||||
receivedTopic = topic
|
||||
receivedData = data
|
||||
}
|
||||
|
||||
// Simulate what gateway.go does
|
||||
callback(context.Background(), "my-ns", "events", []byte("hello"))
|
||||
|
||||
time.Sleep(10 * time.Millisecond) // Let goroutine complete
|
||||
|
||||
if called.Load() != 1 {
|
||||
t.Errorf("expected callback called once, got %d", called.Load())
|
||||
}
|
||||
if receivedNS != "my-ns" {
|
||||
t.Errorf("expected namespace 'my-ns', got '%s'", receivedNS)
|
||||
}
|
||||
if receivedTopic != "events" {
|
||||
t.Errorf("expected topic 'events', got '%s'", receivedTopic)
|
||||
}
|
||||
if string(receivedData) != "hello" {
|
||||
t.Errorf("expected data 'hello', got '%s'", string(receivedData))
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user