diff --git a/pkg/serverless/engine.go b/pkg/serverless/engine.go index 95ec187..bef0e8b 100644 --- a/pkg/serverless/engine.go +++ b/pkg/serverless/engine.go @@ -495,6 +495,8 @@ func (e *Engine) registerHostModule(ctx context.Context) error { NewFunctionBuilder().WithFunc(e.hDBExecute).Export("db_execute"). NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get"). NewFunctionBuilder().WithFunc(e.hCacheSet).Export("cache_set"). + NewFunctionBuilder().WithFunc(e.hCacheIncr).Export("cache_incr"). + NewFunctionBuilder().WithFunc(e.hCacheIncrBy).Export("cache_incr_by"). NewFunctionBuilder().WithFunc(e.hHTTPFetch).Export("http_fetch"). NewFunctionBuilder().WithFunc(e.hPubSubPublish).Export("pubsub_publish"). NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info"). @@ -614,6 +616,32 @@ func (e *Engine) hCacheSet(ctx context.Context, mod api.Module, keyPtr, keyLen, _ = e.hostServices.CacheSet(ctx, string(key), val, ttl) } +func (e *Engine) hCacheIncr(ctx context.Context, mod api.Module, keyPtr, keyLen uint32) int64 { + key, ok := mod.Memory().Read(keyPtr, keyLen) + if !ok { + return 0 + } + val, err := e.hostServices.CacheIncr(ctx, string(key)) + if err != nil { + e.logger.Error("host function cache_incr failed", zap.Error(err), zap.String("key", string(key))) + return 0 + } + return val +} + +func (e *Engine) hCacheIncrBy(ctx context.Context, mod api.Module, keyPtr, keyLen uint32, delta int64) int64 { + key, ok := mod.Memory().Read(keyPtr, keyLen) + if !ok { + return 0 + } + val, err := e.hostServices.CacheIncrBy(ctx, string(key), delta) + if err != nil { + e.logger.Error("host function cache_incr_by failed", zap.Error(err), zap.String("key", string(key)), zap.Int64("delta", delta)) + return 0 + } + return val +} + func (e *Engine) hHTTPFetch(ctx context.Context, mod api.Module, methodPtr, methodLen, urlPtr, urlLen, headersPtr, headersLen, bodyPtr, bodyLen uint32) uint64 { method, ok := mod.Memory().Read(methodPtr, methodLen) if !ok { diff --git a/pkg/serverless/hostfuncs.go b/pkg/serverless/hostfuncs.go index ead5e35..26f7838 100644 --- a/pkg/serverless/hostfuncs.go +++ b/pkg/serverless/hostfuncs.go @@ -216,6 +216,37 @@ func (h *HostFunctions) CacheDelete(ctx context.Context, key string) error { return nil } +// CacheIncr atomically increments a numeric value in cache by 1 and returns the new value. +// If the key doesn't exist, it is initialized to 0 before incrementing. +// Returns an error if the value exists but is not numeric. +func (h *HostFunctions) CacheIncr(ctx context.Context, key string) (int64, error) { + return h.CacheIncrBy(ctx, key, 1) +} + +// CacheIncrBy atomically increments a numeric value by delta and returns the new value. +// If the key doesn't exist, it is initialized to 0 before incrementing. +// Returns an error if the value exists but is not numeric. +func (h *HostFunctions) CacheIncrBy(ctx context.Context, key string, delta int64) (int64, error) { + if h.cacheClient == nil { + return 0, &HostFunctionError{Function: "cache_incr_by", Cause: ErrCacheUnavailable} + } + + dm, err := h.cacheClient.NewDMap(cacheDMapName) + if err != nil { + return 0, &HostFunctionError{Function: "cache_incr_by", Cause: fmt.Errorf("failed to get DMap: %w", err)} + } + + // Olric's Incr method atomically increments a numeric value + // It initializes the key to 0 if it doesn't exist, then increments by delta + // Note: Olric's Incr takes int (not int64) and returns int + newValue, err := dm.Incr(ctx, key, int(delta)) + if err != nil { + return 0, &HostFunctionError{Function: "cache_incr_by", Cause: fmt.Errorf("failed to increment: %w", err)} + } + + return int64(newValue), nil +} + // ----------------------------------------------------------------------------- // Storage Operations // ----------------------------------------------------------------------------- diff --git a/pkg/serverless/mocks_test.go b/pkg/serverless/mocks_test.go index 80be551..a0ce990 100644 --- a/pkg/serverless/mocks_test.go +++ b/pkg/serverless/mocks_test.go @@ -375,6 +375,35 @@ func (m *MockDMap) Delete(ctx context.Context, key string) (bool, error) { return ok, nil } +func (m *MockDMap) Incr(ctx context.Context, key string, delta int64) (int64, error) { + var currentValue int64 + + // Get current value if it exists + if val, ok := m.data[key]; ok { + // Try to parse as int64 + var err error + currentValue, err = parseInt64FromBytes(val) + if err != nil { + return 0, fmt.Errorf("value is not numeric") + } + } + + // Increment + newValue := currentValue + delta + + // Store the new value + m.data[key] = []byte(fmt.Sprintf("%d", newValue)) + + return newValue, nil +} + +// parseInt64FromBytes parses an int64 from byte slice +func parseInt64FromBytes(data []byte) (int64, error) { + var val int64 + _, err := fmt.Sscanf(string(data), "%d", &val) + return val, err +} + type MockGetResponse struct { val []byte } diff --git a/pkg/serverless/types.go b/pkg/serverless/types.go index 72e6f8a..66a13f7 100644 --- a/pkg/serverless/types.go +++ b/pkg/serverless/types.go @@ -328,6 +328,8 @@ type HostServices interface { CacheGet(ctx context.Context, key string) ([]byte, error) CacheSet(ctx context.Context, key string, value []byte, ttlSeconds int64) error CacheDelete(ctx context.Context, key string) error + CacheIncr(ctx context.Context, key string) (int64, error) + CacheIncrBy(ctx context.Context, key string, delta int64) (int64, error) // Storage operations StoragePut(ctx context.Context, data []byte) (string, error)