network/pkg/serverless/hostfuncs.go
anonpenguin23 fff665374f feat: disable debug logging in Rqlite MCP server to reduce disk writes
- Commented out debug logging statements in the Rqlite MCP server to prevent excessive disk writes during operation.
- Added a new PubSubAdapter method in the client for direct access to the pubsub.ClientAdapter, bypassing authentication checks for serverless functions.
- Integrated the pubsub adapter into the gateway for serverless function support.
- Implemented a new pubsub_publish host function in the serverless engine for publishing messages to topics.
2026-01-05 10:22:55 +02:00

688 lines
20 KiB
Go

package serverless
import (
"bytes"
"context"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/DeBrosOfficial/network/pkg/ipfs"
"github.com/DeBrosOfficial/network/pkg/pubsub"
"github.com/DeBrosOfficial/network/pkg/rqlite"
"github.com/DeBrosOfficial/network/pkg/tlsutil"
olriclib "github.com/olric-data/olric"
"go.uber.org/zap"
)
// Ensure HostFunctions implements HostServices interface.
var _ HostServices = (*HostFunctions)(nil)
// HostFunctions provides the bridge between WASM functions and Orama services.
// It implements the HostServices interface and is injected into the execution context.
type HostFunctions struct {
db rqlite.Client
cacheClient olriclib.Client
storage ipfs.IPFSClient
ipfsAPIURL string
pubsub *pubsub.ClientAdapter
wsManager WebSocketManager
secrets SecretsManager
httpClient *http.Client
logger *zap.Logger
// Current invocation context (set per-execution)
invCtx *InvocationContext
invCtxLock sync.RWMutex
// Captured logs for this invocation
logs []LogEntry
logsLock sync.Mutex
}
// HostFunctionsConfig holds configuration for HostFunctions.
type HostFunctionsConfig struct {
IPFSAPIURL string
HTTPTimeout time.Duration
}
// NewHostFunctions creates a new HostFunctions instance.
func NewHostFunctions(
db rqlite.Client,
cacheClient olriclib.Client,
storage ipfs.IPFSClient,
pubsubAdapter *pubsub.ClientAdapter,
wsManager WebSocketManager,
secrets SecretsManager,
cfg HostFunctionsConfig,
logger *zap.Logger,
) *HostFunctions {
httpTimeout := cfg.HTTPTimeout
if httpTimeout == 0 {
httpTimeout = 30 * time.Second
}
return &HostFunctions{
db: db,
cacheClient: cacheClient,
storage: storage,
ipfsAPIURL: cfg.IPFSAPIURL,
pubsub: pubsubAdapter,
wsManager: wsManager,
secrets: secrets,
httpClient: tlsutil.NewHTTPClient(httpTimeout),
logger: logger,
logs: make([]LogEntry, 0),
}
}
// SetInvocationContext sets the current invocation context.
// Must be called before executing a function.
func (h *HostFunctions) SetInvocationContext(invCtx *InvocationContext) {
h.invCtxLock.Lock()
defer h.invCtxLock.Unlock()
h.invCtx = invCtx
h.logs = make([]LogEntry, 0) // Reset logs for new invocation
}
// GetLogs returns the captured logs for the current invocation.
func (h *HostFunctions) GetLogs() []LogEntry {
h.logsLock.Lock()
defer h.logsLock.Unlock()
logsCopy := make([]LogEntry, len(h.logs))
copy(logsCopy, h.logs)
return logsCopy
}
// ClearContext clears the invocation context after execution.
func (h *HostFunctions) ClearContext() {
h.invCtxLock.Lock()
defer h.invCtxLock.Unlock()
h.invCtx = nil
}
// -----------------------------------------------------------------------------
// Database Operations
// -----------------------------------------------------------------------------
// DBQuery executes a SELECT query and returns JSON-encoded results.
func (h *HostFunctions) DBQuery(ctx context.Context, query string, args []interface{}) ([]byte, error) {
if h.db == nil {
return nil, &HostFunctionError{Function: "db_query", Cause: ErrDatabaseUnavailable}
}
var results []map[string]interface{}
if err := h.db.Query(ctx, &results, query, args...); err != nil {
return nil, &HostFunctionError{Function: "db_query", Cause: err}
}
data, err := json.Marshal(results)
if err != nil {
return nil, &HostFunctionError{Function: "db_query", Cause: fmt.Errorf("failed to marshal results: %w", err)}
}
return data, nil
}
// DBExecute executes an INSERT/UPDATE/DELETE query and returns affected rows.
func (h *HostFunctions) DBExecute(ctx context.Context, query string, args []interface{}) (int64, error) {
if h.db == nil {
return 0, &HostFunctionError{Function: "db_execute", Cause: ErrDatabaseUnavailable}
}
result, err := h.db.Exec(ctx, query, args...)
if err != nil {
return 0, &HostFunctionError{Function: "db_execute", Cause: err}
}
affected, _ := result.RowsAffected()
return affected, nil
}
// -----------------------------------------------------------------------------
// Cache Operations
// -----------------------------------------------------------------------------
const cacheDMapName = "serverless_cache"
// CacheGet retrieves a value from the cache.
func (h *HostFunctions) CacheGet(ctx context.Context, key string) ([]byte, error) {
if h.cacheClient == nil {
return nil, &HostFunctionError{Function: "cache_get", Cause: ErrCacheUnavailable}
}
dm, err := h.cacheClient.NewDMap(cacheDMapName)
if err != nil {
return nil, &HostFunctionError{Function: "cache_get", Cause: fmt.Errorf("failed to get DMap: %w", err)}
}
result, err := dm.Get(ctx, key)
if err != nil {
return nil, &HostFunctionError{Function: "cache_get", Cause: err}
}
value, err := result.Byte()
if err != nil {
return nil, &HostFunctionError{Function: "cache_get", Cause: fmt.Errorf("failed to decode value: %w", err)}
}
return value, nil
}
// CacheSet stores a value in the cache with optional TTL.
// Note: TTL is currently not supported by the underlying Olric DMap.Put method.
// Values are stored indefinitely until explicitly deleted.
func (h *HostFunctions) CacheSet(ctx context.Context, key string, value []byte, ttlSeconds int64) error {
if h.cacheClient == nil {
return &HostFunctionError{Function: "cache_set", Cause: ErrCacheUnavailable}
}
dm, err := h.cacheClient.NewDMap(cacheDMapName)
if err != nil {
return &HostFunctionError{Function: "cache_set", Cause: fmt.Errorf("failed to get DMap: %w", err)}
}
// Note: Olric DMap.Put doesn't support TTL in the basic API
// For TTL support, consider using Olric's Expire API separately
if err := dm.Put(ctx, key, value); err != nil {
return &HostFunctionError{Function: "cache_set", Cause: err}
}
return nil
}
// CacheDelete removes a value from the cache.
func (h *HostFunctions) CacheDelete(ctx context.Context, key string) error {
if h.cacheClient == nil {
return &HostFunctionError{Function: "cache_delete", Cause: ErrCacheUnavailable}
}
dm, err := h.cacheClient.NewDMap(cacheDMapName)
if err != nil {
return &HostFunctionError{Function: "cache_delete", Cause: fmt.Errorf("failed to get DMap: %w", err)}
}
if _, err := dm.Delete(ctx, key); err != nil {
return &HostFunctionError{Function: "cache_delete", Cause: err}
}
return nil
}
// CacheIncr atomically increments a numeric value in cache by 1 and returns the new value.
// If the key doesn't exist, it is initialized to 0 before incrementing.
// Returns an error if the value exists but is not numeric.
func (h *HostFunctions) CacheIncr(ctx context.Context, key string) (int64, error) {
return h.CacheIncrBy(ctx, key, 1)
}
// CacheIncrBy atomically increments a numeric value by delta and returns the new value.
// If the key doesn't exist, it is initialized to 0 before incrementing.
// Returns an error if the value exists but is not numeric.
func (h *HostFunctions) CacheIncrBy(ctx context.Context, key string, delta int64) (int64, error) {
if h.cacheClient == nil {
return 0, &HostFunctionError{Function: "cache_incr_by", Cause: ErrCacheUnavailable}
}
dm, err := h.cacheClient.NewDMap(cacheDMapName)
if err != nil {
return 0, &HostFunctionError{Function: "cache_incr_by", Cause: fmt.Errorf("failed to get DMap: %w", err)}
}
// Olric's Incr method atomically increments a numeric value
// It initializes the key to 0 if it doesn't exist, then increments by delta
// Note: Olric's Incr takes int (not int64) and returns int
newValue, err := dm.Incr(ctx, key, int(delta))
if err != nil {
return 0, &HostFunctionError{Function: "cache_incr_by", Cause: fmt.Errorf("failed to increment: %w", err)}
}
return int64(newValue), nil
}
// -----------------------------------------------------------------------------
// Storage Operations
// -----------------------------------------------------------------------------
// StoragePut uploads data to IPFS and returns the CID.
func (h *HostFunctions) StoragePut(ctx context.Context, data []byte) (string, error) {
if h.storage == nil {
return "", &HostFunctionError{Function: "storage_put", Cause: ErrStorageUnavailable}
}
reader := bytes.NewReader(data)
resp, err := h.storage.Add(ctx, reader, "function-data")
if err != nil {
return "", &HostFunctionError{Function: "storage_put", Cause: err}
}
return resp.Cid, nil
}
// StorageGet retrieves data from IPFS by CID.
func (h *HostFunctions) StorageGet(ctx context.Context, cid string) ([]byte, error) {
if h.storage == nil {
return nil, &HostFunctionError{Function: "storage_get", Cause: ErrStorageUnavailable}
}
reader, err := h.storage.Get(ctx, cid, h.ipfsAPIURL)
if err != nil {
return nil, &HostFunctionError{Function: "storage_get", Cause: err}
}
defer reader.Close()
data, err := io.ReadAll(reader)
if err != nil {
return nil, &HostFunctionError{Function: "storage_get", Cause: fmt.Errorf("failed to read data: %w", err)}
}
return data, nil
}
// -----------------------------------------------------------------------------
// PubSub Operations
// -----------------------------------------------------------------------------
// PubSubPublish publishes a message to a topic.
func (h *HostFunctions) PubSubPublish(ctx context.Context, topic string, data []byte) error {
if h.pubsub == nil {
return &HostFunctionError{Function: "pubsub_publish", Cause: fmt.Errorf("pubsub not available")}
}
// The pubsub adapter handles namespacing internally
if err := h.pubsub.Publish(ctx, topic, data); err != nil {
return &HostFunctionError{Function: "pubsub_publish", Cause: err}
}
return nil
}
// -----------------------------------------------------------------------------
// WebSocket Operations
// -----------------------------------------------------------------------------
// WSSend sends data to a specific WebSocket client.
func (h *HostFunctions) WSSend(ctx context.Context, clientID string, data []byte) error {
if h.wsManager == nil {
return &HostFunctionError{Function: "ws_send", Cause: ErrWSNotAvailable}
}
// If no clientID provided, use the current invocation's client
if clientID == "" {
h.invCtxLock.RLock()
if h.invCtx != nil && h.invCtx.WSClientID != "" {
clientID = h.invCtx.WSClientID
}
h.invCtxLock.RUnlock()
}
if clientID == "" {
return &HostFunctionError{Function: "ws_send", Cause: ErrWSNotAvailable}
}
if err := h.wsManager.Send(clientID, data); err != nil {
return &HostFunctionError{Function: "ws_send", Cause: err}
}
return nil
}
// WSBroadcast sends data to all WebSocket clients subscribed to a topic.
func (h *HostFunctions) WSBroadcast(ctx context.Context, topic string, data []byte) error {
if h.wsManager == nil {
return &HostFunctionError{Function: "ws_broadcast", Cause: ErrWSNotAvailable}
}
if err := h.wsManager.Broadcast(topic, data); err != nil {
return &HostFunctionError{Function: "ws_broadcast", Cause: err}
}
return nil
}
// -----------------------------------------------------------------------------
// HTTP Operations
// -----------------------------------------------------------------------------
// HTTPFetch makes an outbound HTTP request.
func (h *HostFunctions) HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error) {
var bodyReader io.Reader
if len(body) > 0 {
bodyReader = bytes.NewReader(body)
}
req, err := http.NewRequestWithContext(ctx, method, url, bodyReader)
if err != nil {
h.logger.Error("http_fetch request creation error", zap.Error(err), zap.String("url", url))
errorResp := map[string]interface{}{
"error": "failed to create request: " + err.Error(),
"status": 0,
}
return json.Marshal(errorResp)
}
for key, value := range headers {
req.Header.Set(key, value)
}
resp, err := h.httpClient.Do(req)
if err != nil {
h.logger.Error("http_fetch transport error", zap.Error(err), zap.String("url", url))
errorResp := map[string]interface{}{
"error": err.Error(),
"status": 0, // Transport error
}
return json.Marshal(errorResp)
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
h.logger.Error("http_fetch response read error", zap.Error(err), zap.String("url", url))
errorResp := map[string]interface{}{
"error": "failed to read response: " + err.Error(),
"status": resp.StatusCode,
}
return json.Marshal(errorResp)
}
// Encode response with status code
response := map[string]interface{}{
"status": resp.StatusCode,
"headers": resp.Header,
"body": string(respBody),
}
data, err := json.Marshal(response)
if err != nil {
return nil, &HostFunctionError{Function: "http_fetch", Cause: fmt.Errorf("failed to marshal response: %w", err)}
}
return data, nil
}
// -----------------------------------------------------------------------------
// Context Operations
// -----------------------------------------------------------------------------
// GetEnv retrieves an environment variable for the function.
func (h *HostFunctions) GetEnv(ctx context.Context, key string) (string, error) {
h.invCtxLock.RLock()
defer h.invCtxLock.RUnlock()
if h.invCtx == nil || h.invCtx.EnvVars == nil {
return "", nil
}
return h.invCtx.EnvVars[key], nil
}
// GetSecret retrieves a decrypted secret.
func (h *HostFunctions) GetSecret(ctx context.Context, name string) (string, error) {
if h.secrets == nil {
return "", &HostFunctionError{Function: "get_secret", Cause: fmt.Errorf("secrets manager not available")}
}
h.invCtxLock.RLock()
namespace := ""
if h.invCtx != nil {
namespace = h.invCtx.Namespace
}
h.invCtxLock.RUnlock()
value, err := h.secrets.Get(ctx, namespace, name)
if err != nil {
return "", &HostFunctionError{Function: "get_secret", Cause: err}
}
return value, nil
}
// GetRequestID returns the current request ID.
func (h *HostFunctions) GetRequestID(ctx context.Context) string {
h.invCtxLock.RLock()
defer h.invCtxLock.RUnlock()
if h.invCtx == nil {
return ""
}
return h.invCtx.RequestID
}
// GetCallerWallet returns the wallet address of the caller.
func (h *HostFunctions) GetCallerWallet(ctx context.Context) string {
h.invCtxLock.RLock()
defer h.invCtxLock.RUnlock()
if h.invCtx == nil {
return ""
}
return h.invCtx.CallerWallet
}
// -----------------------------------------------------------------------------
// Job Operations
// -----------------------------------------------------------------------------
// EnqueueBackground queues a function for background execution.
func (h *HostFunctions) EnqueueBackground(ctx context.Context, functionName string, payload []byte) (string, error) {
// This will be implemented when JobManager is integrated
// For now, return an error indicating it's not yet available
return "", &HostFunctionError{Function: "enqueue_background", Cause: fmt.Errorf("background jobs not yet implemented")}
}
// ScheduleOnce schedules a function to run once at a specific time.
func (h *HostFunctions) ScheduleOnce(ctx context.Context, functionName string, runAt time.Time, payload []byte) (string, error) {
// This will be implemented when Scheduler is integrated
return "", &HostFunctionError{Function: "schedule_once", Cause: fmt.Errorf("timers not yet implemented")}
}
// -----------------------------------------------------------------------------
// Logging Operations
// -----------------------------------------------------------------------------
// LogInfo logs an info message.
func (h *HostFunctions) LogInfo(ctx context.Context, message string) {
h.logsLock.Lock()
defer h.logsLock.Unlock()
h.logs = append(h.logs, LogEntry{
Level: "info",
Message: message,
Timestamp: time.Now(),
})
h.logger.Info(message,
zap.String("request_id", h.GetRequestID(ctx)),
zap.String("level", "function"),
)
}
// LogError logs an error message.
func (h *HostFunctions) LogError(ctx context.Context, message string) {
h.logsLock.Lock()
defer h.logsLock.Unlock()
h.logs = append(h.logs, LogEntry{
Level: "error",
Message: message,
Timestamp: time.Now(),
})
h.logger.Error(message,
zap.String("request_id", h.GetRequestID(ctx)),
zap.String("level", "function"),
)
}
// -----------------------------------------------------------------------------
// Secrets Manager Implementation (built-in)
// -----------------------------------------------------------------------------
// DBSecretsManager implements SecretsManager using the database.
type DBSecretsManager struct {
db rqlite.Client
encryptionKey []byte // 32-byte AES-256 key
logger *zap.Logger
}
// Ensure DBSecretsManager implements SecretsManager.
var _ SecretsManager = (*DBSecretsManager)(nil)
// NewDBSecretsManager creates a secrets manager backed by the database.
func NewDBSecretsManager(db rqlite.Client, encryptionKeyHex string, logger *zap.Logger) (*DBSecretsManager, error) {
var key []byte
if encryptionKeyHex != "" {
var err error
key, err = hex.DecodeString(encryptionKeyHex)
if err != nil || len(key) != 32 {
return nil, fmt.Errorf("invalid encryption key: must be 32 bytes hex-encoded")
}
} else {
// Generate a random key if none provided
key = make([]byte, 32)
if _, err := rand.Read(key); err != nil {
return nil, fmt.Errorf("failed to generate encryption key: %w", err)
}
logger.Warn("Generated random secrets encryption key - secrets will not persist across restarts")
}
return &DBSecretsManager{
db: db,
encryptionKey: key,
logger: logger,
}, nil
}
// Set stores an encrypted secret.
func (s *DBSecretsManager) Set(ctx context.Context, namespace, name, value string) error {
encrypted, err := s.encrypt([]byte(value))
if err != nil {
return fmt.Errorf("failed to encrypt secret: %w", err)
}
// Upsert the secret
query := `
INSERT INTO function_secrets (id, namespace, name, encrypted_value, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(namespace, name) DO UPDATE SET
encrypted_value = excluded.encrypted_value,
updated_at = excluded.updated_at
`
id := fmt.Sprintf("%s:%s", namespace, name)
now := time.Now()
if _, err := s.db.Exec(ctx, query, id, namespace, name, encrypted, now, now); err != nil {
return fmt.Errorf("failed to save secret: %w", err)
}
return nil
}
// Get retrieves a decrypted secret.
func (s *DBSecretsManager) Get(ctx context.Context, namespace, name string) (string, error) {
query := `SELECT encrypted_value FROM function_secrets WHERE namespace = ? AND name = ?`
var rows []struct {
EncryptedValue []byte `db:"encrypted_value"`
}
if err := s.db.Query(ctx, &rows, query, namespace, name); err != nil {
return "", fmt.Errorf("failed to query secret: %w", err)
}
if len(rows) == 0 {
return "", ErrSecretNotFound
}
decrypted, err := s.decrypt(rows[0].EncryptedValue)
if err != nil {
return "", fmt.Errorf("failed to decrypt secret: %w", err)
}
return string(decrypted), nil
}
// List returns all secret names for a namespace.
func (s *DBSecretsManager) List(ctx context.Context, namespace string) ([]string, error) {
query := `SELECT name FROM function_secrets WHERE namespace = ? ORDER BY name`
var rows []struct {
Name string `db:"name"`
}
if err := s.db.Query(ctx, &rows, query, namespace); err != nil {
return nil, fmt.Errorf("failed to list secrets: %w", err)
}
names := make([]string, len(rows))
for i, row := range rows {
names[i] = row.Name
}
return names, nil
}
// Delete removes a secret.
func (s *DBSecretsManager) Delete(ctx context.Context, namespace, name string) error {
query := `DELETE FROM function_secrets WHERE namespace = ? AND name = ?`
result, err := s.db.Exec(ctx, query, namespace, name)
if err != nil {
return fmt.Errorf("failed to delete secret: %w", err)
}
affected, _ := result.RowsAffected()
if affected == 0 {
return ErrSecretNotFound
}
return nil
}
// encrypt encrypts data using AES-256-GCM.
func (s *DBSecretsManager) encrypt(plaintext []byte) ([]byte, error) {
block, err := aes.NewCipher(s.encryptionKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonce := make([]byte, gcm.NonceSize())
if _, err := rand.Read(nonce); err != nil {
return nil, err
}
return gcm.Seal(nonce, nonce, plaintext, nil), nil
}
// decrypt decrypts data using AES-256-GCM.
func (s *DBSecretsManager) decrypt(ciphertext []byte) ([]byte, error) {
block, err := aes.NewCipher(s.encryptionKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonceSize := gcm.NonceSize()
if len(ciphertext) < nonceSize {
return nil, fmt.Errorf("ciphertext too short")
}
nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:]
return gcm.Open(nil, nonce, ciphertext, nil)
}