mirror of
https://github.com/DeBrosOfficial/network.git
synced 2026-01-30 09:53:03 +00:00
- Commented out debug logging statements in the Rqlite MCP server to prevent excessive disk writes during operation. - Added a new PubSubAdapter method in the client for direct access to the pubsub.ClientAdapter, bypassing authentication checks for serverless functions. - Integrated the pubsub adapter into the gateway for serverless function support. - Implemented a new pubsub_publish host function in the serverless engine for publishing messages to topics.
688 lines
20 KiB
Go
688 lines
20 KiB
Go
package serverless
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/aes"
|
|
"crypto/cipher"
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/DeBrosOfficial/network/pkg/ipfs"
|
|
"github.com/DeBrosOfficial/network/pkg/pubsub"
|
|
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
|
"github.com/DeBrosOfficial/network/pkg/tlsutil"
|
|
olriclib "github.com/olric-data/olric"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Ensure HostFunctions implements HostServices interface.
|
|
var _ HostServices = (*HostFunctions)(nil)
|
|
|
|
// HostFunctions provides the bridge between WASM functions and Orama services.
|
|
// It implements the HostServices interface and is injected into the execution context.
|
|
type HostFunctions struct {
|
|
db rqlite.Client
|
|
cacheClient olriclib.Client
|
|
storage ipfs.IPFSClient
|
|
ipfsAPIURL string
|
|
pubsub *pubsub.ClientAdapter
|
|
wsManager WebSocketManager
|
|
secrets SecretsManager
|
|
httpClient *http.Client
|
|
logger *zap.Logger
|
|
|
|
// Current invocation context (set per-execution)
|
|
invCtx *InvocationContext
|
|
invCtxLock sync.RWMutex
|
|
|
|
// Captured logs for this invocation
|
|
logs []LogEntry
|
|
logsLock sync.Mutex
|
|
}
|
|
|
|
// HostFunctionsConfig holds configuration for HostFunctions.
|
|
type HostFunctionsConfig struct {
|
|
IPFSAPIURL string
|
|
HTTPTimeout time.Duration
|
|
}
|
|
|
|
// NewHostFunctions creates a new HostFunctions instance.
|
|
func NewHostFunctions(
|
|
db rqlite.Client,
|
|
cacheClient olriclib.Client,
|
|
storage ipfs.IPFSClient,
|
|
pubsubAdapter *pubsub.ClientAdapter,
|
|
wsManager WebSocketManager,
|
|
secrets SecretsManager,
|
|
cfg HostFunctionsConfig,
|
|
logger *zap.Logger,
|
|
) *HostFunctions {
|
|
httpTimeout := cfg.HTTPTimeout
|
|
if httpTimeout == 0 {
|
|
httpTimeout = 30 * time.Second
|
|
}
|
|
|
|
return &HostFunctions{
|
|
db: db,
|
|
cacheClient: cacheClient,
|
|
storage: storage,
|
|
ipfsAPIURL: cfg.IPFSAPIURL,
|
|
pubsub: pubsubAdapter,
|
|
wsManager: wsManager,
|
|
secrets: secrets,
|
|
httpClient: tlsutil.NewHTTPClient(httpTimeout),
|
|
logger: logger,
|
|
logs: make([]LogEntry, 0),
|
|
}
|
|
}
|
|
|
|
// SetInvocationContext sets the current invocation context.
|
|
// Must be called before executing a function.
|
|
func (h *HostFunctions) SetInvocationContext(invCtx *InvocationContext) {
|
|
h.invCtxLock.Lock()
|
|
defer h.invCtxLock.Unlock()
|
|
h.invCtx = invCtx
|
|
h.logs = make([]LogEntry, 0) // Reset logs for new invocation
|
|
}
|
|
|
|
// GetLogs returns the captured logs for the current invocation.
|
|
func (h *HostFunctions) GetLogs() []LogEntry {
|
|
h.logsLock.Lock()
|
|
defer h.logsLock.Unlock()
|
|
logsCopy := make([]LogEntry, len(h.logs))
|
|
copy(logsCopy, h.logs)
|
|
return logsCopy
|
|
}
|
|
|
|
// ClearContext clears the invocation context after execution.
|
|
func (h *HostFunctions) ClearContext() {
|
|
h.invCtxLock.Lock()
|
|
defer h.invCtxLock.Unlock()
|
|
h.invCtx = nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Database Operations
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// DBQuery executes a SELECT query and returns JSON-encoded results.
|
|
func (h *HostFunctions) DBQuery(ctx context.Context, query string, args []interface{}) ([]byte, error) {
|
|
if h.db == nil {
|
|
return nil, &HostFunctionError{Function: "db_query", Cause: ErrDatabaseUnavailable}
|
|
}
|
|
|
|
var results []map[string]interface{}
|
|
if err := h.db.Query(ctx, &results, query, args...); err != nil {
|
|
return nil, &HostFunctionError{Function: "db_query", Cause: err}
|
|
}
|
|
|
|
data, err := json.Marshal(results)
|
|
if err != nil {
|
|
return nil, &HostFunctionError{Function: "db_query", Cause: fmt.Errorf("failed to marshal results: %w", err)}
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
// DBExecute executes an INSERT/UPDATE/DELETE query and returns affected rows.
|
|
func (h *HostFunctions) DBExecute(ctx context.Context, query string, args []interface{}) (int64, error) {
|
|
if h.db == nil {
|
|
return 0, &HostFunctionError{Function: "db_execute", Cause: ErrDatabaseUnavailable}
|
|
}
|
|
|
|
result, err := h.db.Exec(ctx, query, args...)
|
|
if err != nil {
|
|
return 0, &HostFunctionError{Function: "db_execute", Cause: err}
|
|
}
|
|
|
|
affected, _ := result.RowsAffected()
|
|
return affected, nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Cache Operations
|
|
// -----------------------------------------------------------------------------
|
|
|
|
const cacheDMapName = "serverless_cache"
|
|
|
|
// CacheGet retrieves a value from the cache.
|
|
func (h *HostFunctions) CacheGet(ctx context.Context, key string) ([]byte, error) {
|
|
if h.cacheClient == nil {
|
|
return nil, &HostFunctionError{Function: "cache_get", Cause: ErrCacheUnavailable}
|
|
}
|
|
|
|
dm, err := h.cacheClient.NewDMap(cacheDMapName)
|
|
if err != nil {
|
|
return nil, &HostFunctionError{Function: "cache_get", Cause: fmt.Errorf("failed to get DMap: %w", err)}
|
|
}
|
|
|
|
result, err := dm.Get(ctx, key)
|
|
if err != nil {
|
|
return nil, &HostFunctionError{Function: "cache_get", Cause: err}
|
|
}
|
|
|
|
value, err := result.Byte()
|
|
if err != nil {
|
|
return nil, &HostFunctionError{Function: "cache_get", Cause: fmt.Errorf("failed to decode value: %w", err)}
|
|
}
|
|
|
|
return value, nil
|
|
}
|
|
|
|
// CacheSet stores a value in the cache with optional TTL.
|
|
// Note: TTL is currently not supported by the underlying Olric DMap.Put method.
|
|
// Values are stored indefinitely until explicitly deleted.
|
|
func (h *HostFunctions) CacheSet(ctx context.Context, key string, value []byte, ttlSeconds int64) error {
|
|
if h.cacheClient == nil {
|
|
return &HostFunctionError{Function: "cache_set", Cause: ErrCacheUnavailable}
|
|
}
|
|
|
|
dm, err := h.cacheClient.NewDMap(cacheDMapName)
|
|
if err != nil {
|
|
return &HostFunctionError{Function: "cache_set", Cause: fmt.Errorf("failed to get DMap: %w", err)}
|
|
}
|
|
|
|
// Note: Olric DMap.Put doesn't support TTL in the basic API
|
|
// For TTL support, consider using Olric's Expire API separately
|
|
if err := dm.Put(ctx, key, value); err != nil {
|
|
return &HostFunctionError{Function: "cache_set", Cause: err}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CacheDelete removes a value from the cache.
|
|
func (h *HostFunctions) CacheDelete(ctx context.Context, key string) error {
|
|
if h.cacheClient == nil {
|
|
return &HostFunctionError{Function: "cache_delete", Cause: ErrCacheUnavailable}
|
|
}
|
|
|
|
dm, err := h.cacheClient.NewDMap(cacheDMapName)
|
|
if err != nil {
|
|
return &HostFunctionError{Function: "cache_delete", Cause: fmt.Errorf("failed to get DMap: %w", err)}
|
|
}
|
|
|
|
if _, err := dm.Delete(ctx, key); err != nil {
|
|
return &HostFunctionError{Function: "cache_delete", Cause: err}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CacheIncr atomically increments a numeric value in cache by 1 and returns the new value.
|
|
// If the key doesn't exist, it is initialized to 0 before incrementing.
|
|
// Returns an error if the value exists but is not numeric.
|
|
func (h *HostFunctions) CacheIncr(ctx context.Context, key string) (int64, error) {
|
|
return h.CacheIncrBy(ctx, key, 1)
|
|
}
|
|
|
|
// CacheIncrBy atomically increments a numeric value by delta and returns the new value.
|
|
// If the key doesn't exist, it is initialized to 0 before incrementing.
|
|
// Returns an error if the value exists but is not numeric.
|
|
func (h *HostFunctions) CacheIncrBy(ctx context.Context, key string, delta int64) (int64, error) {
|
|
if h.cacheClient == nil {
|
|
return 0, &HostFunctionError{Function: "cache_incr_by", Cause: ErrCacheUnavailable}
|
|
}
|
|
|
|
dm, err := h.cacheClient.NewDMap(cacheDMapName)
|
|
if err != nil {
|
|
return 0, &HostFunctionError{Function: "cache_incr_by", Cause: fmt.Errorf("failed to get DMap: %w", err)}
|
|
}
|
|
|
|
// Olric's Incr method atomically increments a numeric value
|
|
// It initializes the key to 0 if it doesn't exist, then increments by delta
|
|
// Note: Olric's Incr takes int (not int64) and returns int
|
|
newValue, err := dm.Incr(ctx, key, int(delta))
|
|
if err != nil {
|
|
return 0, &HostFunctionError{Function: "cache_incr_by", Cause: fmt.Errorf("failed to increment: %w", err)}
|
|
}
|
|
|
|
return int64(newValue), nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Storage Operations
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// StoragePut uploads data to IPFS and returns the CID.
|
|
func (h *HostFunctions) StoragePut(ctx context.Context, data []byte) (string, error) {
|
|
if h.storage == nil {
|
|
return "", &HostFunctionError{Function: "storage_put", Cause: ErrStorageUnavailable}
|
|
}
|
|
|
|
reader := bytes.NewReader(data)
|
|
resp, err := h.storage.Add(ctx, reader, "function-data")
|
|
if err != nil {
|
|
return "", &HostFunctionError{Function: "storage_put", Cause: err}
|
|
}
|
|
|
|
return resp.Cid, nil
|
|
}
|
|
|
|
// StorageGet retrieves data from IPFS by CID.
|
|
func (h *HostFunctions) StorageGet(ctx context.Context, cid string) ([]byte, error) {
|
|
if h.storage == nil {
|
|
return nil, &HostFunctionError{Function: "storage_get", Cause: ErrStorageUnavailable}
|
|
}
|
|
|
|
reader, err := h.storage.Get(ctx, cid, h.ipfsAPIURL)
|
|
if err != nil {
|
|
return nil, &HostFunctionError{Function: "storage_get", Cause: err}
|
|
}
|
|
defer reader.Close()
|
|
|
|
data, err := io.ReadAll(reader)
|
|
if err != nil {
|
|
return nil, &HostFunctionError{Function: "storage_get", Cause: fmt.Errorf("failed to read data: %w", err)}
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// PubSub Operations
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// PubSubPublish publishes a message to a topic.
|
|
func (h *HostFunctions) PubSubPublish(ctx context.Context, topic string, data []byte) error {
|
|
if h.pubsub == nil {
|
|
return &HostFunctionError{Function: "pubsub_publish", Cause: fmt.Errorf("pubsub not available")}
|
|
}
|
|
|
|
// The pubsub adapter handles namespacing internally
|
|
if err := h.pubsub.Publish(ctx, topic, data); err != nil {
|
|
return &HostFunctionError{Function: "pubsub_publish", Cause: err}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// WebSocket Operations
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// WSSend sends data to a specific WebSocket client.
|
|
func (h *HostFunctions) WSSend(ctx context.Context, clientID string, data []byte) error {
|
|
if h.wsManager == nil {
|
|
return &HostFunctionError{Function: "ws_send", Cause: ErrWSNotAvailable}
|
|
}
|
|
|
|
// If no clientID provided, use the current invocation's client
|
|
if clientID == "" {
|
|
h.invCtxLock.RLock()
|
|
if h.invCtx != nil && h.invCtx.WSClientID != "" {
|
|
clientID = h.invCtx.WSClientID
|
|
}
|
|
h.invCtxLock.RUnlock()
|
|
}
|
|
|
|
if clientID == "" {
|
|
return &HostFunctionError{Function: "ws_send", Cause: ErrWSNotAvailable}
|
|
}
|
|
|
|
if err := h.wsManager.Send(clientID, data); err != nil {
|
|
return &HostFunctionError{Function: "ws_send", Cause: err}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// WSBroadcast sends data to all WebSocket clients subscribed to a topic.
|
|
func (h *HostFunctions) WSBroadcast(ctx context.Context, topic string, data []byte) error {
|
|
if h.wsManager == nil {
|
|
return &HostFunctionError{Function: "ws_broadcast", Cause: ErrWSNotAvailable}
|
|
}
|
|
|
|
if err := h.wsManager.Broadcast(topic, data); err != nil {
|
|
return &HostFunctionError{Function: "ws_broadcast", Cause: err}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// HTTP Operations
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// HTTPFetch makes an outbound HTTP request.
|
|
func (h *HostFunctions) HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error) {
|
|
var bodyReader io.Reader
|
|
if len(body) > 0 {
|
|
bodyReader = bytes.NewReader(body)
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, method, url, bodyReader)
|
|
if err != nil {
|
|
h.logger.Error("http_fetch request creation error", zap.Error(err), zap.String("url", url))
|
|
errorResp := map[string]interface{}{
|
|
"error": "failed to create request: " + err.Error(),
|
|
"status": 0,
|
|
}
|
|
return json.Marshal(errorResp)
|
|
}
|
|
|
|
for key, value := range headers {
|
|
req.Header.Set(key, value)
|
|
}
|
|
|
|
resp, err := h.httpClient.Do(req)
|
|
if err != nil {
|
|
h.logger.Error("http_fetch transport error", zap.Error(err), zap.String("url", url))
|
|
errorResp := map[string]interface{}{
|
|
"error": err.Error(),
|
|
"status": 0, // Transport error
|
|
}
|
|
return json.Marshal(errorResp)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
respBody, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
h.logger.Error("http_fetch response read error", zap.Error(err), zap.String("url", url))
|
|
errorResp := map[string]interface{}{
|
|
"error": "failed to read response: " + err.Error(),
|
|
"status": resp.StatusCode,
|
|
}
|
|
return json.Marshal(errorResp)
|
|
}
|
|
|
|
// Encode response with status code
|
|
response := map[string]interface{}{
|
|
"status": resp.StatusCode,
|
|
"headers": resp.Header,
|
|
"body": string(respBody),
|
|
}
|
|
|
|
data, err := json.Marshal(response)
|
|
if err != nil {
|
|
return nil, &HostFunctionError{Function: "http_fetch", Cause: fmt.Errorf("failed to marshal response: %w", err)}
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Context Operations
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// GetEnv retrieves an environment variable for the function.
|
|
func (h *HostFunctions) GetEnv(ctx context.Context, key string) (string, error) {
|
|
h.invCtxLock.RLock()
|
|
defer h.invCtxLock.RUnlock()
|
|
|
|
if h.invCtx == nil || h.invCtx.EnvVars == nil {
|
|
return "", nil
|
|
}
|
|
|
|
return h.invCtx.EnvVars[key], nil
|
|
}
|
|
|
|
// GetSecret retrieves a decrypted secret.
|
|
func (h *HostFunctions) GetSecret(ctx context.Context, name string) (string, error) {
|
|
if h.secrets == nil {
|
|
return "", &HostFunctionError{Function: "get_secret", Cause: fmt.Errorf("secrets manager not available")}
|
|
}
|
|
|
|
h.invCtxLock.RLock()
|
|
namespace := ""
|
|
if h.invCtx != nil {
|
|
namespace = h.invCtx.Namespace
|
|
}
|
|
h.invCtxLock.RUnlock()
|
|
|
|
value, err := h.secrets.Get(ctx, namespace, name)
|
|
if err != nil {
|
|
return "", &HostFunctionError{Function: "get_secret", Cause: err}
|
|
}
|
|
|
|
return value, nil
|
|
}
|
|
|
|
// GetRequestID returns the current request ID.
|
|
func (h *HostFunctions) GetRequestID(ctx context.Context) string {
|
|
h.invCtxLock.RLock()
|
|
defer h.invCtxLock.RUnlock()
|
|
|
|
if h.invCtx == nil {
|
|
return ""
|
|
}
|
|
return h.invCtx.RequestID
|
|
}
|
|
|
|
// GetCallerWallet returns the wallet address of the caller.
|
|
func (h *HostFunctions) GetCallerWallet(ctx context.Context) string {
|
|
h.invCtxLock.RLock()
|
|
defer h.invCtxLock.RUnlock()
|
|
|
|
if h.invCtx == nil {
|
|
return ""
|
|
}
|
|
return h.invCtx.CallerWallet
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Job Operations
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// EnqueueBackground queues a function for background execution.
|
|
func (h *HostFunctions) EnqueueBackground(ctx context.Context, functionName string, payload []byte) (string, error) {
|
|
// This will be implemented when JobManager is integrated
|
|
// For now, return an error indicating it's not yet available
|
|
return "", &HostFunctionError{Function: "enqueue_background", Cause: fmt.Errorf("background jobs not yet implemented")}
|
|
}
|
|
|
|
// ScheduleOnce schedules a function to run once at a specific time.
|
|
func (h *HostFunctions) ScheduleOnce(ctx context.Context, functionName string, runAt time.Time, payload []byte) (string, error) {
|
|
// This will be implemented when Scheduler is integrated
|
|
return "", &HostFunctionError{Function: "schedule_once", Cause: fmt.Errorf("timers not yet implemented")}
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Logging Operations
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// LogInfo logs an info message.
|
|
func (h *HostFunctions) LogInfo(ctx context.Context, message string) {
|
|
h.logsLock.Lock()
|
|
defer h.logsLock.Unlock()
|
|
|
|
h.logs = append(h.logs, LogEntry{
|
|
Level: "info",
|
|
Message: message,
|
|
Timestamp: time.Now(),
|
|
})
|
|
|
|
h.logger.Info(message,
|
|
zap.String("request_id", h.GetRequestID(ctx)),
|
|
zap.String("level", "function"),
|
|
)
|
|
}
|
|
|
|
// LogError logs an error message.
|
|
func (h *HostFunctions) LogError(ctx context.Context, message string) {
|
|
h.logsLock.Lock()
|
|
defer h.logsLock.Unlock()
|
|
|
|
h.logs = append(h.logs, LogEntry{
|
|
Level: "error",
|
|
Message: message,
|
|
Timestamp: time.Now(),
|
|
})
|
|
|
|
h.logger.Error(message,
|
|
zap.String("request_id", h.GetRequestID(ctx)),
|
|
zap.String("level", "function"),
|
|
)
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// Secrets Manager Implementation (built-in)
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// DBSecretsManager implements SecretsManager using the database.
|
|
type DBSecretsManager struct {
|
|
db rqlite.Client
|
|
encryptionKey []byte // 32-byte AES-256 key
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// Ensure DBSecretsManager implements SecretsManager.
|
|
var _ SecretsManager = (*DBSecretsManager)(nil)
|
|
|
|
// NewDBSecretsManager creates a secrets manager backed by the database.
|
|
func NewDBSecretsManager(db rqlite.Client, encryptionKeyHex string, logger *zap.Logger) (*DBSecretsManager, error) {
|
|
var key []byte
|
|
if encryptionKeyHex != "" {
|
|
var err error
|
|
key, err = hex.DecodeString(encryptionKeyHex)
|
|
if err != nil || len(key) != 32 {
|
|
return nil, fmt.Errorf("invalid encryption key: must be 32 bytes hex-encoded")
|
|
}
|
|
} else {
|
|
// Generate a random key if none provided
|
|
key = make([]byte, 32)
|
|
if _, err := rand.Read(key); err != nil {
|
|
return nil, fmt.Errorf("failed to generate encryption key: %w", err)
|
|
}
|
|
logger.Warn("Generated random secrets encryption key - secrets will not persist across restarts")
|
|
}
|
|
|
|
return &DBSecretsManager{
|
|
db: db,
|
|
encryptionKey: key,
|
|
logger: logger,
|
|
}, nil
|
|
}
|
|
|
|
// Set stores an encrypted secret.
|
|
func (s *DBSecretsManager) Set(ctx context.Context, namespace, name, value string) error {
|
|
encrypted, err := s.encrypt([]byte(value))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to encrypt secret: %w", err)
|
|
}
|
|
|
|
// Upsert the secret
|
|
query := `
|
|
INSERT INTO function_secrets (id, namespace, name, encrypted_value, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(namespace, name) DO UPDATE SET
|
|
encrypted_value = excluded.encrypted_value,
|
|
updated_at = excluded.updated_at
|
|
`
|
|
|
|
id := fmt.Sprintf("%s:%s", namespace, name)
|
|
now := time.Now()
|
|
if _, err := s.db.Exec(ctx, query, id, namespace, name, encrypted, now, now); err != nil {
|
|
return fmt.Errorf("failed to save secret: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Get retrieves a decrypted secret.
|
|
func (s *DBSecretsManager) Get(ctx context.Context, namespace, name string) (string, error) {
|
|
query := `SELECT encrypted_value FROM function_secrets WHERE namespace = ? AND name = ?`
|
|
|
|
var rows []struct {
|
|
EncryptedValue []byte `db:"encrypted_value"`
|
|
}
|
|
if err := s.db.Query(ctx, &rows, query, namespace, name); err != nil {
|
|
return "", fmt.Errorf("failed to query secret: %w", err)
|
|
}
|
|
|
|
if len(rows) == 0 {
|
|
return "", ErrSecretNotFound
|
|
}
|
|
|
|
decrypted, err := s.decrypt(rows[0].EncryptedValue)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to decrypt secret: %w", err)
|
|
}
|
|
|
|
return string(decrypted), nil
|
|
}
|
|
|
|
// List returns all secret names for a namespace.
|
|
func (s *DBSecretsManager) List(ctx context.Context, namespace string) ([]string, error) {
|
|
query := `SELECT name FROM function_secrets WHERE namespace = ? ORDER BY name`
|
|
|
|
var rows []struct {
|
|
Name string `db:"name"`
|
|
}
|
|
if err := s.db.Query(ctx, &rows, query, namespace); err != nil {
|
|
return nil, fmt.Errorf("failed to list secrets: %w", err)
|
|
}
|
|
|
|
names := make([]string, len(rows))
|
|
for i, row := range rows {
|
|
names[i] = row.Name
|
|
}
|
|
|
|
return names, nil
|
|
}
|
|
|
|
// Delete removes a secret.
|
|
func (s *DBSecretsManager) Delete(ctx context.Context, namespace, name string) error {
|
|
query := `DELETE FROM function_secrets WHERE namespace = ? AND name = ?`
|
|
|
|
result, err := s.db.Exec(ctx, query, namespace, name)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete secret: %w", err)
|
|
}
|
|
|
|
affected, _ := result.RowsAffected()
|
|
if affected == 0 {
|
|
return ErrSecretNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// encrypt encrypts data using AES-256-GCM.
|
|
func (s *DBSecretsManager) encrypt(plaintext []byte) ([]byte, error) {
|
|
block, err := aes.NewCipher(s.encryptionKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
gcm, err := cipher.NewGCM(block)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
nonce := make([]byte, gcm.NonceSize())
|
|
if _, err := rand.Read(nonce); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return gcm.Seal(nonce, nonce, plaintext, nil), nil
|
|
}
|
|
|
|
// decrypt decrypts data using AES-256-GCM.
|
|
func (s *DBSecretsManager) decrypt(ciphertext []byte) ([]byte, error) {
|
|
block, err := aes.NewCipher(s.encryptionKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
gcm, err := cipher.NewGCM(block)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
nonceSize := gcm.NonceSize()
|
|
if len(ciphertext) < nonceSize {
|
|
return nil, fmt.Errorf("ciphertext too short")
|
|
}
|
|
|
|
nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:]
|
|
return gcm.Open(nil, nonce, ciphertext, nil)
|
|
}
|