feat: disable debug logging in Rqlite MCP server to reduce disk writes

- Commented out debug logging statements in the Rqlite MCP server to prevent excessive disk writes during operation.
- Added a new PubSubAdapter method in the client for direct access to the pubsub.ClientAdapter, bypassing authentication checks for serverless functions.
- Integrated the pubsub adapter into the gateway for serverless function support.
- Implemented a new pubsub_publish host function in the serverless engine for publishing messages to topics.
This commit is contained in:
anonpenguin23 2026-01-05 10:22:55 +02:00
parent 2b3e6874c8
commit fff665374f
4 changed files with 90 additions and 0 deletions

View File

@ -495,6 +495,8 @@ func (e *Engine) registerHostModule(ctx context.Context) error {
NewFunctionBuilder().WithFunc(e.hDBExecute).Export("db_execute"). NewFunctionBuilder().WithFunc(e.hDBExecute).Export("db_execute").
NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get"). NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get").
NewFunctionBuilder().WithFunc(e.hCacheSet).Export("cache_set"). NewFunctionBuilder().WithFunc(e.hCacheSet).Export("cache_set").
NewFunctionBuilder().WithFunc(e.hCacheIncr).Export("cache_incr").
NewFunctionBuilder().WithFunc(e.hCacheIncrBy).Export("cache_incr_by").
NewFunctionBuilder().WithFunc(e.hHTTPFetch).Export("http_fetch"). NewFunctionBuilder().WithFunc(e.hHTTPFetch).Export("http_fetch").
NewFunctionBuilder().WithFunc(e.hPubSubPublish).Export("pubsub_publish"). NewFunctionBuilder().WithFunc(e.hPubSubPublish).Export("pubsub_publish").
NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info"). NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info").
@ -614,6 +616,32 @@ func (e *Engine) hCacheSet(ctx context.Context, mod api.Module, keyPtr, keyLen,
_ = e.hostServices.CacheSet(ctx, string(key), val, ttl) _ = e.hostServices.CacheSet(ctx, string(key), val, ttl)
} }
func (e *Engine) hCacheIncr(ctx context.Context, mod api.Module, keyPtr, keyLen uint32) int64 {
key, ok := mod.Memory().Read(keyPtr, keyLen)
if !ok {
return 0
}
val, err := e.hostServices.CacheIncr(ctx, string(key))
if err != nil {
e.logger.Error("host function cache_incr failed", zap.Error(err), zap.String("key", string(key)))
return 0
}
return val
}
func (e *Engine) hCacheIncrBy(ctx context.Context, mod api.Module, keyPtr, keyLen uint32, delta int64) int64 {
key, ok := mod.Memory().Read(keyPtr, keyLen)
if !ok {
return 0
}
val, err := e.hostServices.CacheIncrBy(ctx, string(key), delta)
if err != nil {
e.logger.Error("host function cache_incr_by failed", zap.Error(err), zap.String("key", string(key)), zap.Int64("delta", delta))
return 0
}
return val
}
func (e *Engine) hHTTPFetch(ctx context.Context, mod api.Module, methodPtr, methodLen, urlPtr, urlLen, headersPtr, headersLen, bodyPtr, bodyLen uint32) uint64 { func (e *Engine) hHTTPFetch(ctx context.Context, mod api.Module, methodPtr, methodLen, urlPtr, urlLen, headersPtr, headersLen, bodyPtr, bodyLen uint32) uint64 {
method, ok := mod.Memory().Read(methodPtr, methodLen) method, ok := mod.Memory().Read(methodPtr, methodLen)
if !ok { if !ok {

View File

@ -216,6 +216,37 @@ func (h *HostFunctions) CacheDelete(ctx context.Context, key string) error {
return nil return nil
} }
// CacheIncr atomically increments a numeric value in cache by 1 and returns the new value.
// If the key doesn't exist, it is initialized to 0 before incrementing.
// Returns an error if the value exists but is not numeric.
func (h *HostFunctions) CacheIncr(ctx context.Context, key string) (int64, error) {
return h.CacheIncrBy(ctx, key, 1)
}
// CacheIncrBy atomically increments a numeric value by delta and returns the new value.
// If the key doesn't exist, it is initialized to 0 before incrementing.
// Returns an error if the value exists but is not numeric.
func (h *HostFunctions) CacheIncrBy(ctx context.Context, key string, delta int64) (int64, error) {
if h.cacheClient == nil {
return 0, &HostFunctionError{Function: "cache_incr_by", Cause: ErrCacheUnavailable}
}
dm, err := h.cacheClient.NewDMap(cacheDMapName)
if err != nil {
return 0, &HostFunctionError{Function: "cache_incr_by", Cause: fmt.Errorf("failed to get DMap: %w", err)}
}
// Olric's Incr method atomically increments a numeric value
// It initializes the key to 0 if it doesn't exist, then increments by delta
// Note: Olric's Incr takes int (not int64) and returns int
newValue, err := dm.Incr(ctx, key, int(delta))
if err != nil {
return 0, &HostFunctionError{Function: "cache_incr_by", Cause: fmt.Errorf("failed to increment: %w", err)}
}
return int64(newValue), nil
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Storage Operations // Storage Operations
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------

View File

@ -375,6 +375,35 @@ func (m *MockDMap) Delete(ctx context.Context, key string) (bool, error) {
return ok, nil return ok, nil
} }
func (m *MockDMap) Incr(ctx context.Context, key string, delta int64) (int64, error) {
var currentValue int64
// Get current value if it exists
if val, ok := m.data[key]; ok {
// Try to parse as int64
var err error
currentValue, err = parseInt64FromBytes(val)
if err != nil {
return 0, fmt.Errorf("value is not numeric")
}
}
// Increment
newValue := currentValue + delta
// Store the new value
m.data[key] = []byte(fmt.Sprintf("%d", newValue))
return newValue, nil
}
// parseInt64FromBytes parses an int64 from byte slice
func parseInt64FromBytes(data []byte) (int64, error) {
var val int64
_, err := fmt.Sscanf(string(data), "%d", &val)
return val, err
}
type MockGetResponse struct { type MockGetResponse struct {
val []byte val []byte
} }

View File

@ -328,6 +328,8 @@ type HostServices interface {
CacheGet(ctx context.Context, key string) ([]byte, error) CacheGet(ctx context.Context, key string) ([]byte, error)
CacheSet(ctx context.Context, key string, value []byte, ttlSeconds int64) error CacheSet(ctx context.Context, key string, value []byte, ttlSeconds int64) error
CacheDelete(ctx context.Context, key string) error CacheDelete(ctx context.Context, key string) error
CacheIncr(ctx context.Context, key string) (int64, error)
CacheIncrBy(ctx context.Context, key string, delta int64) (int64, error)
// Storage operations // Storage operations
StoragePut(ctx context.Context, data []byte) (string, error) StoragePut(ctx context.Context, data []byte) (string, error)