diff --git a/cmd/rqlite-mcp/main.go b/cmd/rqlite-mcp/main.go index 514922f..acf5348 100644 --- a/cmd/rqlite-mcp/main.go +++ b/cmd/rqlite-mcp/main.go @@ -74,7 +74,8 @@ func (s *MCPServer) handleRequest(req JSONRPCRequest) JSONRPCResponse { resp.JSONRPC = "2.0" resp.ID = req.ID - log.Printf("Received method: %s", req.Method) + // Debug logging disabled to prevent excessive disk writes + // log.Printf("Received method: %s", req.Method) switch req.Method { case "initialize": @@ -94,7 +95,7 @@ func (s *MCPServer) handleRequest(req JSONRPCRequest) JSONRPCResponse { return JSONRPCResponse{} case "tools/list": - log.Printf("Listing tools") + // Debug logging disabled to prevent excessive disk writes tools := []Tool{ { Name: "list_tables", @@ -144,7 +145,7 @@ func (s *MCPServer) handleRequest(req JSONRPCRequest) JSONRPCResponse { resp.Result = s.handleToolCall(callReq) default: - log.Printf("Unknown method: %s", req.Method) + // Debug logging disabled to prevent excessive disk writes resp.Error = &ResponseError{Code: -32601, Message: "Method not found"} } @@ -152,7 +153,8 @@ func (s *MCPServer) handleRequest(req JSONRPCRequest) JSONRPCResponse { } func (s *MCPServer) handleToolCall(req CallToolRequest) CallToolResult { - log.Printf("Tool call: %s", req.Name) + // Debug logging disabled to prevent excessive disk writes + // log.Printf("Tool call: %s", req.Name) switch req.Name { case "list_tables": @@ -179,7 +181,7 @@ func (s *MCPServer) handleToolCall(req CallToolRequest) CallToolResult { if err := json.Unmarshal(req.Arguments, &args); err != nil { return errorResult(fmt.Sprintf("Invalid arguments: %v", err)) } - log.Printf("Executing query: %s", args.SQL) + // Debug logging disabled to prevent excessive disk writes rows, err := s.conn.QueryOne(args.SQL) if err != nil { return errorResult(fmt.Sprintf("Query error: %v", err)) @@ -215,7 +217,7 @@ func (s *MCPServer) handleToolCall(req CallToolRequest) CallToolResult { if err := json.Unmarshal(req.Arguments, &args); err != nil { return errorResult(fmt.Sprintf("Invalid arguments: %v", err)) } - log.Printf("Executing statement: %s", args.SQL) + // Debug logging disabled to prevent excessive disk writes res, err := s.conn.WriteOne(args.SQL) if err != nil { return errorResult(fmt.Sprintf("Execution error: %v", err)) @@ -292,7 +294,7 @@ func main() { var req JSONRPCRequest if err := json.Unmarshal([]byte(line), &req); err != nil { - log.Printf("Failed to parse request: %v", err) + // Debug logging disabled to prevent excessive disk writes continue } @@ -305,7 +307,7 @@ func main() { respData, err := json.Marshal(resp) if err != nil { - log.Printf("Failed to marshal response: %v", err) + // Debug logging disabled to prevent excessive disk writes continue } @@ -313,6 +315,6 @@ func main() { } if err := scanner.Err(); err != nil { - log.Printf("Scanner error: %v", err) + // Debug logging disabled to prevent excessive disk writes } } diff --git a/pkg/client/client.go b/pkg/client/client.go index d5ca094..82e844e 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -329,6 +329,18 @@ func (c *Client) getAppNamespace() string { return c.config.AppName } +// PubSubAdapter returns the underlying pubsub.ClientAdapter for direct use by serverless functions. +// This bypasses the authentication checks used by PubSub() since serverless functions +// are already authenticated via the gateway. +func (c *Client) PubSubAdapter() *pubsub.ClientAdapter { + c.mu.RLock() + defer c.mu.RUnlock() + if c.pubsub == nil { + return nil + } + return c.pubsub.adapter +} + // requireAccess enforces that credentials are present and that any context-based namespace overrides match func (c *Client) requireAccess(ctx context.Context) error { // Allow internal system operations to bypass authentication diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 2730f94..826de82 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -21,6 +21,7 @@ import ( "github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/olric" + "github.com/DeBrosOfficial/network/pkg/pubsub" "github.com/DeBrosOfficial/network/pkg/rqlite" "github.com/DeBrosOfficial/network/pkg/serverless" "github.com/multiformats/go-multiaddr" @@ -331,7 +332,19 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { } // Create host functions provider (allows functions to call Orama services) - // Note: pubsub and secrets are nil for now - can be added later + // Get pubsub adapter from client for serverless functions + var pubsubAdapter *pubsub.ClientAdapter + if gw.client != nil { + if concreteClient, ok := gw.client.(*client.Client); ok { + pubsubAdapter = concreteClient.PubSubAdapter() + if pubsubAdapter != nil { + logger.ComponentInfo(logging.ComponentGeneral, "pubsub adapter available for serverless functions") + } else { + logger.ComponentWarn(logging.ComponentGeneral, "pubsub adapter is nil - serverless pubsub will be unavailable") + } + } + } + hostFuncsCfg := serverless.HostFunctionsConfig{ IPFSAPIURL: ipfsAPIURL, HTTPTimeout: 30 * time.Second, @@ -340,7 +353,7 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { gw.ormClient, olricClient, gw.ipfsClient, - nil, // pubsub adapter - TODO: integrate with gateway pubsub + pubsubAdapter, // pubsub adapter for serverless functions gw.serverlessWSMgr, nil, // secrets manager - TODO: implement hostFuncsCfg, diff --git a/pkg/serverless/engine.go b/pkg/serverless/engine.go index 4ff4249..95ec187 100644 --- a/pkg/serverless/engine.go +++ b/pkg/serverless/engine.go @@ -496,6 +496,7 @@ func (e *Engine) registerHostModule(ctx context.Context) error { NewFunctionBuilder().WithFunc(e.hCacheGet).Export("cache_get"). NewFunctionBuilder().WithFunc(e.hCacheSet).Export("cache_set"). NewFunctionBuilder().WithFunc(e.hHTTPFetch).Export("http_fetch"). + NewFunctionBuilder().WithFunc(e.hPubSubPublish).Export("pubsub_publish"). NewFunctionBuilder().WithFunc(e.hLogInfo).Export("log_info"). NewFunctionBuilder().WithFunc(e.hLogError).Export("log_error"). Instantiate(ctx) @@ -646,6 +647,25 @@ func (e *Engine) hHTTPFetch(ctx context.Context, mod api.Module, methodPtr, meth return e.writeToGuest(ctx, mod, resp) } +func (e *Engine) hPubSubPublish(ctx context.Context, mod api.Module, topicPtr, topicLen, dataPtr, dataLen uint32) uint32 { + topic, ok := mod.Memory().Read(topicPtr, topicLen) + if !ok { + return 0 + } + + data, ok := mod.Memory().Read(dataPtr, dataLen) + if !ok { + return 0 + } + + err := e.hostServices.PubSubPublish(ctx, string(topic), data) + if err != nil { + e.logger.Error("host function pubsub_publish failed", zap.Error(err), zap.String("topic", string(topic))) + return 0 + } + return 1 // Success +} + func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32) { msg, ok := mod.Memory().Read(ptr, size) if ok {