mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 16:46:58 +00:00
188 lines
5.2 KiB
Go
188 lines
5.2 KiB
Go
// Package triggers provides PubSub trigger management for the serverless engine.
// It handles registering, querying, and removing triggers that automatically invoke
// functions when messages are published to specific PubSub topics.
package triggers
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
|
"github.com/DeBrosOfficial/network/pkg/serverless"
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// TriggerMatch carries the data required to dispatch one trigger invocation.
// Each value mirrors a row produced by joining function_pubsub_triggers with
// functions.
type TriggerMatch struct {
	TriggerID, FunctionID   string // trigger row ID and the ID of the function it targets
	FunctionName, Namespace string // name and namespace taken from the joined functions row
	Topic                   string // PubSub topic the trigger is registered on
}
|
|
|
|
// triggerRow is the scan target for rows selected from the
// function_pubsub_triggers table.
type triggerRow struct {
	ID, FunctionID, Topic string    // id, function_id and topic columns
	Enabled               bool      // enabled column
	CreatedAt             time.Time // created_at column
}
|
|
|
|
// triggerMatchRow is the scan target for the trigger/function JOIN query.
// Its fields line up with the column aliases trigger_id, function_id,
// function_name, namespace and topic.
type triggerMatchRow struct {
	TriggerID, FunctionID   string
	FunctionName, Namespace string
	Topic                   string
}
|
|
|
|
// PubSubTriggerStore manages PubSub trigger persistence in RQLite.
|
|
type PubSubTriggerStore struct {
|
|
db rqlite.Client
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// NewPubSubTriggerStore creates a new PubSub trigger store.
|
|
func NewPubSubTriggerStore(db rqlite.Client, logger *zap.Logger) *PubSubTriggerStore {
|
|
return &PubSubTriggerStore{
|
|
db: db,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Add registers a new PubSub trigger for a function.
|
|
// Returns the trigger ID.
|
|
func (s *PubSubTriggerStore) Add(ctx context.Context, functionID, topic string) (string, error) {
|
|
if functionID == "" {
|
|
return "", fmt.Errorf("function ID required")
|
|
}
|
|
if topic == "" {
|
|
return "", fmt.Errorf("topic required")
|
|
}
|
|
|
|
id := uuid.New().String()
|
|
now := time.Now()
|
|
|
|
query := `
|
|
INSERT INTO function_pubsub_triggers (id, function_id, topic, enabled, created_at)
|
|
VALUES (?, ?, ?, TRUE, ?)
|
|
`
|
|
if _, err := s.db.Exec(ctx, query, id, functionID, topic, now); err != nil {
|
|
return "", fmt.Errorf("failed to add pubsub trigger: %w", err)
|
|
}
|
|
|
|
s.logger.Info("PubSub trigger added",
|
|
zap.String("trigger_id", id),
|
|
zap.String("function_id", functionID),
|
|
zap.String("topic", topic),
|
|
)
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// Remove deletes a trigger by ID.
|
|
func (s *PubSubTriggerStore) Remove(ctx context.Context, triggerID string) error {
|
|
if triggerID == "" {
|
|
return fmt.Errorf("trigger ID required")
|
|
}
|
|
|
|
query := `DELETE FROM function_pubsub_triggers WHERE id = ?`
|
|
result, err := s.db.Exec(ctx, query, triggerID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to remove trigger: %w", err)
|
|
}
|
|
|
|
affected, _ := result.RowsAffected()
|
|
if affected == 0 {
|
|
return fmt.Errorf("trigger not found: %s", triggerID)
|
|
}
|
|
|
|
s.logger.Info("PubSub trigger removed", zap.String("trigger_id", triggerID))
|
|
return nil
|
|
}
|
|
|
|
// RemoveByFunction deletes all triggers for a function.
|
|
// Used during function re-deploy to clear old triggers.
|
|
func (s *PubSubTriggerStore) RemoveByFunction(ctx context.Context, functionID string) error {
|
|
if functionID == "" {
|
|
return fmt.Errorf("function ID required")
|
|
}
|
|
|
|
query := `DELETE FROM function_pubsub_triggers WHERE function_id = ?`
|
|
if _, err := s.db.Exec(ctx, query, functionID); err != nil {
|
|
return fmt.Errorf("failed to remove triggers for function: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ListByFunction returns all PubSub triggers for a function.
|
|
func (s *PubSubTriggerStore) ListByFunction(ctx context.Context, functionID string) ([]serverless.PubSubTrigger, error) {
|
|
if functionID == "" {
|
|
return nil, fmt.Errorf("function ID required")
|
|
}
|
|
|
|
query := `
|
|
SELECT id, function_id, topic, enabled, created_at
|
|
FROM function_pubsub_triggers
|
|
WHERE function_id = ?
|
|
`
|
|
|
|
var rows []triggerRow
|
|
if err := s.db.Query(ctx, &rows, query, functionID); err != nil {
|
|
return nil, fmt.Errorf("failed to list triggers: %w", err)
|
|
}
|
|
|
|
triggers := make([]serverless.PubSubTrigger, len(rows))
|
|
for i, row := range rows {
|
|
triggers[i] = serverless.PubSubTrigger{
|
|
ID: row.ID,
|
|
FunctionID: row.FunctionID,
|
|
Topic: row.Topic,
|
|
Enabled: row.Enabled,
|
|
}
|
|
}
|
|
|
|
return triggers, nil
|
|
}
|
|
|
|
// GetByTopicAndNamespace returns all enabled triggers for a topic within a namespace.
|
|
// Only returns triggers for active functions.
|
|
func (s *PubSubTriggerStore) GetByTopicAndNamespace(ctx context.Context, topic, namespace string) ([]TriggerMatch, error) {
|
|
if topic == "" || namespace == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
query := `
|
|
SELECT t.id AS trigger_id, t.function_id AS function_id,
|
|
f.name AS function_name, f.namespace AS namespace, t.topic AS topic
|
|
FROM function_pubsub_triggers t
|
|
JOIN functions f ON t.function_id = f.id
|
|
WHERE t.topic = ? AND f.namespace = ? AND t.enabled = TRUE AND f.status = 'active'
|
|
`
|
|
|
|
var rows []triggerMatchRow
|
|
if err := s.db.Query(ctx, &rows, query, topic, namespace); err != nil {
|
|
return nil, fmt.Errorf("failed to query triggers for topic: %w", err)
|
|
}
|
|
|
|
matches := make([]TriggerMatch, len(rows))
|
|
for i, row := range rows {
|
|
matches[i] = TriggerMatch{
|
|
TriggerID: row.TriggerID,
|
|
FunctionID: row.FunctionID,
|
|
FunctionName: row.FunctionName,
|
|
Namespace: row.Namespace,
|
|
Topic: row.Topic,
|
|
}
|
|
}
|
|
|
|
return matches, nil
|
|
}
|