diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e03532..d8e765a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,21 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant ### Deprecated ### Fixed +## [0.82.0] - 2026-01-03 + +### Added +- Added PubSub Presence feature, allowing clients to track members connected to a topic via WebSocket. +- Added a new tool, `rqlite-mcp`, which implements the Model Communication Protocol (MCP) for Rqlite, enabling AI models to interact with the database using tools. + +### Changed +- Updated the development environment to include and manage the new `rqlite-mcp` service. + +### Deprecated + +### Removed + +### Fixed +\n ## [0.81.0] - 2025-12-31 ### Added diff --git a/Makefile b/Makefile index 632125e..05ffe5c 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill -VERSION := 0.81.0 +VERSION := 0.82.0 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' @@ -31,6 +31,7 @@ build: deps go build -ldflags "$(LDFLAGS)" -o bin/identity ./cmd/identity go build -ldflags "$(LDFLAGS)" -o bin/orama-node ./cmd/node go build -ldflags "$(LDFLAGS)" -o bin/orama cmd/cli/main.go + go build -ldflags "$(LDFLAGS)" -o bin/rqlite-mcp ./cmd/rqlite-mcp # Inject gateway build metadata via pkg path variables go build -ldflags "$(LDFLAGS) -X 'github.com/DeBrosOfficial/network/pkg/gateway.BuildVersion=$(VERSION)' -X 'github.com/DeBrosOfficial/network/pkg/gateway.BuildCommit=$(COMMIT)' -X 'github.com/DeBrosOfficial/network/pkg/gateway.BuildTime=$(DATE)'" -o bin/gateway ./cmd/gateway @echo "Build complete! Run ./bin/orama version" diff --git a/cmd/rqlite-mcp/main.go b/cmd/rqlite-mcp/main.go new file mode 100644 index 0000000..514922f --- /dev/null +++ b/cmd/rqlite-mcp/main.go @@ -0,0 +1,318 @@ +package main + +import ( + "bufio" + "encoding/json" + "fmt" + "log" + "os" + "strings" + "time" + + "github.com/rqlite/gorqlite" +) + +// MCP JSON-RPC types +type JSONRPCRequest struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id,omitempty"` + Method string `json:"method"` + Params json.RawMessage `json:"params,omitempty"` +} + +type JSONRPCResponse struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id"` + Result any `json:"result,omitempty"` + Error *ResponseError `json:"error,omitempty"` +} + +type ResponseError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// Tool definition +type Tool struct { + Name string `json:"name"` + Description string `json:"description"` + InputSchema any `json:"inputSchema"` +} + +// Tool call types +type CallToolRequest struct { + Name string `json:"name"` + Arguments json.RawMessage `json:"arguments"` +} + +type TextContent struct { + Type string `json:"type"` + Text string `json:"text"` +} + +type CallToolResult struct { + Content []TextContent `json:"content"` + IsError bool `json:"isError,omitempty"` +} + +type MCPServer struct { + conn *gorqlite.Connection +} + +func NewMCPServer(rqliteURL string) (*MCPServer, error) { + conn, err := gorqlite.Open(rqliteURL) + if err != nil { + return nil, err + } + return &MCPServer{ + conn: conn, + }, nil +} + +func (s *MCPServer) handleRequest(req JSONRPCRequest) JSONRPCResponse { + var resp JSONRPCResponse + resp.JSONRPC = "2.0" + resp.ID = req.ID + + log.Printf("Received method: %s", req.Method) + + switch req.Method { + case "initialize": + resp.Result = map[string]any{ + "protocolVersion": "2024-11-05", + "capabilities": map[string]any{ + "tools": map[string]any{}, + }, + "serverInfo": map[string]any{ + "name": "rqlite-mcp", + "version": "0.1.0", + }, + } + + case "notifications/initialized": + // This is a notification, no response needed + return JSONRPCResponse{} + + case "tools/list": + log.Printf("Listing tools") + tools := []Tool{ + { + Name: "list_tables", + Description: "List all tables in the Rqlite database", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{}, + }, + }, + { + Name: "query", + Description: "Run a SELECT query on the Rqlite database", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "sql": map[string]any{ + "type": "string", + "description": "The SQL SELECT query to run", + }, + }, + "required": []string{"sql"}, + }, + }, + { + Name: "execute", + Description: "Run an INSERT, UPDATE, or DELETE statement on the Rqlite database", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "sql": map[string]any{ + "type": "string", + "description": "The SQL statement (INSERT, UPDATE, DELETE) to run", + }, + }, + "required": []string{"sql"}, + }, + }, + } + resp.Result = map[string]any{"tools": tools} + + case "tools/call": + var callReq CallToolRequest + if err := json.Unmarshal(req.Params, &callReq); err != nil { + resp.Error = &ResponseError{Code: -32700, Message: "Parse error"} + return resp + } + resp.Result = s.handleToolCall(callReq) + + default: + log.Printf("Unknown method: %s", req.Method) + resp.Error = &ResponseError{Code: -32601, Message: "Method not found"} + } + + return resp +} + +func (s *MCPServer) handleToolCall(req CallToolRequest) CallToolResult { + log.Printf("Tool call: %s", req.Name) + + switch req.Name { + case "list_tables": + rows, err := s.conn.QueryOne("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") + if err != nil { + return errorResult(fmt.Sprintf("Error listing tables: %v", err)) + } + var tables []string + for rows.Next() { + slice, err := rows.Slice() + if err == nil && len(slice) > 0 { + tables = append(tables, fmt.Sprint(slice[0])) + } + } + if len(tables) == 0 { + return textResult("No tables found") + } + return textResult(strings.Join(tables, "\n")) + + case "query": + var args struct { + SQL string `json:"sql"` + } + if err := json.Unmarshal(req.Arguments, &args); err != nil { + return errorResult(fmt.Sprintf("Invalid arguments: %v", err)) + } + log.Printf("Executing query: %s", args.SQL) + rows, err := s.conn.QueryOne(args.SQL) + if err != nil { + return errorResult(fmt.Sprintf("Query error: %v", err)) + } + + var result strings.Builder + cols := rows.Columns() + result.WriteString(strings.Join(cols, " | ") + "\n") + result.WriteString(strings.Repeat("-", len(cols)*10) + "\n") + + rowCount := 0 + for rows.Next() { + vals, err := rows.Slice() + if err != nil { + continue + } + rowCount++ + for i, v := range vals { + if i > 0 { + result.WriteString(" | ") + } + result.WriteString(fmt.Sprint(v)) + } + result.WriteString("\n") + } + result.WriteString(fmt.Sprintf("\n(%d rows)", rowCount)) + return textResult(result.String()) + + case "execute": + var args struct { + SQL string `json:"sql"` + } + if err := json.Unmarshal(req.Arguments, &args); err != nil { + return errorResult(fmt.Sprintf("Invalid arguments: %v", err)) + } + log.Printf("Executing statement: %s", args.SQL) + res, err := s.conn.WriteOne(args.SQL) + if err != nil { + return errorResult(fmt.Sprintf("Execution error: %v", err)) + } + return textResult(fmt.Sprintf("Rows affected: %d", res.RowsAffected)) + + default: + return errorResult(fmt.Sprintf("Unknown tool: %s", req.Name)) + } +} + +func textResult(text string) CallToolResult { + return CallToolResult{ + Content: []TextContent{ + { + Type: "text", + Text: text, + }, + }, + } +} + +func errorResult(text string) CallToolResult { + return CallToolResult{ + Content: []TextContent{ + { + Type: "text", + Text: text, + }, + }, + IsError: true, + } +} + +func main() { + // Log to stderr so stdout is clean for JSON-RPC + log.SetOutput(os.Stderr) + + rqliteURL := "http://localhost:5001" + if u := os.Getenv("RQLITE_URL"); u != "" { + rqliteURL = u + } + + var server *MCPServer + var err error + + // Retry connecting to rqlite + maxRetries := 30 + for i := 0; i < maxRetries; i++ { + server, err = NewMCPServer(rqliteURL) + if err == nil { + break + } + if i%5 == 0 { + log.Printf("Waiting for Rqlite at %s... (%d/%d)", rqliteURL, i+1, maxRetries) + } + time.Sleep(1 * time.Second) + } + + if err != nil { + log.Fatalf("Failed to connect to Rqlite after %d retries: %v", maxRetries, err) + } + + log.Printf("MCP Rqlite server started (stdio transport)") + log.Printf("Connected to Rqlite at %s", rqliteURL) + + // Read JSON-RPC requests from stdin, write responses to stdout + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + line := scanner.Text() + if line == "" { + continue + } + + var req JSONRPCRequest + if err := json.Unmarshal([]byte(line), &req); err != nil { + log.Printf("Failed to parse request: %v", err) + continue + } + + resp := server.handleRequest(req) + + // Don't send response for notifications (no ID) + if req.ID == nil && strings.HasPrefix(req.Method, "notifications/") { + continue + } + + respData, err := json.Marshal(resp) + if err != nil { + log.Printf("Failed to marshal response: %v", err) + continue + } + + fmt.Println(string(respData)) + } + + if err := scanner.Err(); err != nil { + log.Printf("Scanner error: %v", err) + } +} diff --git a/e2e/env.go b/e2e/env.go index ca991c8..aff3399 100644 --- a/e2e/env.go +++ b/e2e/env.go @@ -738,6 +738,65 @@ func NewWSPubSubClient(t *testing.T, topic string) (*WSPubSubClient, error) { return client, nil } +// NewWSPubSubPresenceClient creates a new WebSocket PubSub client with presence parameters +func NewWSPubSubPresenceClient(t *testing.T, topic, memberID string, meta map[string]interface{}) (*WSPubSubClient, error) { + t.Helper() + + // Build WebSocket URL + gatewayURL := GetGatewayURL() + wsURL := strings.Replace(gatewayURL, "http://", "ws://", 1) + wsURL = strings.Replace(wsURL, "https://", "wss://", 1) + + u, err := url.Parse(wsURL + "/v1/pubsub/ws") + if err != nil { + return nil, fmt.Errorf("failed to parse WebSocket URL: %w", err) + } + q := u.Query() + q.Set("topic", topic) + q.Set("presence", "true") + q.Set("member_id", memberID) + if meta != nil { + metaJSON, _ := json.Marshal(meta) + q.Set("member_meta", string(metaJSON)) + } + u.RawQuery = q.Encode() + + // Set up headers with authentication + headers := http.Header{} + if apiKey := GetAPIKey(); apiKey != "" { + headers.Set("Authorization", "Bearer "+apiKey) + } + + // Connect to WebSocket + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + + conn, resp, err := dialer.Dial(u.String(), headers) + if err != nil { + if resp != nil { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + return nil, fmt.Errorf("websocket dial failed (status %d): %w - body: %s", resp.StatusCode, err, string(body)) + } + return nil, fmt.Errorf("websocket dial failed: %w", err) + } + + client := &WSPubSubClient{ + t: t, + conn: conn, + topic: topic, + handlers: make([]func(topic string, data []byte) error, 0), + msgChan: make(chan []byte, 128), + doneChan: make(chan struct{}), + } + + // Start reader goroutine + go client.readLoop() + + return client, nil +} + // readLoop reads messages from the WebSocket and dispatches to handlers func (c *WSPubSubClient) readLoop() { defer close(c.doneChan) diff --git a/e2e/pubsub_presence_test.go b/e2e/pubsub_presence_test.go new file mode 100644 index 0000000..8c0ddc1 --- /dev/null +++ b/e2e/pubsub_presence_test.go @@ -0,0 +1,122 @@ +//go:build e2e + +package e2e + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "testing" + "time" +) + +func TestPubSub_Presence(t *testing.T) { + SkipIfMissingGateway(t) + + topic := GenerateTopic() + memberID := "user123" + memberMeta := map[string]interface{}{"name": "Alice"} + + // 1. Subscribe with presence + client1, err := NewWSPubSubPresenceClient(t, topic, memberID, memberMeta) + if err != nil { + t.Fatalf("failed to create presence client: %v", err) + } + defer client1.Close() + + // Wait for join event + msg, err := client1.ReceiveWithTimeout(5 * time.Second) + if err != nil { + t.Fatalf("did not receive join event: %v", err) + } + + var event map[string]interface{} + if err := json.Unmarshal(msg, &event); err != nil { + t.Fatalf("failed to unmarshal event: %v", err) + } + + if event["type"] != "presence.join" { + t.Fatalf("expected presence.join event, got %v", event["type"]) + } + + if event["member_id"] != memberID { + t.Fatalf("expected member_id %s, got %v", memberID, event["member_id"]) + } + + // 2. Query presence endpoint + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + req := &HTTPRequest{ + Method: http.MethodGet, + URL: fmt.Sprintf("%s/v1/pubsub/presence?topic=%s", GetGatewayURL(), topic), + } + + body, status, err := req.Do(ctx) + if err != nil { + t.Fatalf("presence query failed: %v", err) + } + + if status != http.StatusOK { + t.Fatalf("expected status 200, got %d", status) + } + + var resp map[string]interface{} + if err := DecodeJSON(body, &resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + + if resp["count"] != float64(1) { + t.Fatalf("expected count 1, got %v", resp["count"]) + } + + members := resp["members"].([]interface{}) + if len(members) != 1 { + t.Fatalf("expected 1 member, got %d", len(members)) + } + + member := members[0].(map[string]interface{}) + if member["member_id"] != memberID { + t.Fatalf("expected member_id %s, got %v", memberID, member["member_id"]) + } + + // 3. Subscribe second member + memberID2 := "user456" + client2, err := NewWSPubSubPresenceClient(t, topic, memberID2, nil) + if err != nil { + t.Fatalf("failed to create second presence client: %v", err) + } + // We'll close client2 later to test leave event + + // Client1 should receive join event for Client2 + msg2, err := client1.ReceiveWithTimeout(5 * time.Second) + if err != nil { + t.Fatalf("client1 did not receive join event for client2: %v", err) + } + + if err := json.Unmarshal(msg2, &event); err != nil { + t.Fatalf("failed to unmarshal event: %v", err) + } + + if event["type"] != "presence.join" || event["member_id"] != memberID2 { + t.Fatalf("expected presence.join for %s, got %v for %v", memberID2, event["type"], event["member_id"]) + } + + // 4. Disconnect client2 and verify leave event + client2.Close() + + msg3, err := client1.ReceiveWithTimeout(5 * time.Second) + if err != nil { + t.Fatalf("client1 did not receive leave event for client2: %v", err) + } + + if err := json.Unmarshal(msg3, &event); err != nil { + t.Fatalf("failed to unmarshal event: %v", err) + } + + if event["type"] != "presence.leave" || event["member_id"] != memberID2 { + t.Fatalf("expected presence.leave for %s, got %v for %v", memberID2, event["type"], event["member_id"]) + } +} + diff --git a/pkg/environments/development/process.go b/pkg/environments/development/process.go index 55d6ee1..02b8fdb 100644 --- a/pkg/environments/development/process.go +++ b/pkg/environments/development/process.go @@ -29,7 +29,8 @@ func (pm *ProcessManager) printStartupSummary(topology *Topology) { fmt.Fprintf(pm.logWriter, "📊 Other Services:\n") fmt.Fprintf(pm.logWriter, " Olric: http://localhost:%d\n", topology.OlricHTTPPort) - fmt.Fprintf(pm.logWriter, " Anon SOCKS: 127.0.0.1:%d\n\n", topology.AnonSOCKSPort) + fmt.Fprintf(pm.logWriter, " Anon SOCKS: 127.0.0.1:%d\n", topology.AnonSOCKSPort) + fmt.Fprintf(pm.logWriter, " Rqlite MCP: http://localhost:%d/sse\n\n", topology.MCPPort) fmt.Fprintf(pm.logWriter, "📝 Useful Commands:\n") fmt.Fprintf(pm.logWriter, " ./bin/orama dev status - Check service status\n") @@ -192,6 +193,31 @@ func (pm *ProcessManager) startAnon(ctx context.Context) error { return nil } +func (pm *ProcessManager) startMCP(ctx context.Context) error { + topology := DefaultTopology() + pidPath := filepath.Join(pm.pidsDir, "rqlite-mcp.pid") + logPath := filepath.Join(pm.oramaDir, "logs", "rqlite-mcp.log") + + cmd := exec.CommandContext(ctx, "./bin/rqlite-mcp") + cmd.Env = append(os.Environ(), + fmt.Sprintf("MCP_PORT=%d", topology.MCPPort), + fmt.Sprintf("RQLITE_URL=http://localhost:%d", topology.Nodes[0].RQLiteHTTPPort), + ) + logFile, _ := os.Create(logPath) + cmd.Stdout = logFile + cmd.Stderr = logFile + + if err := cmd.Start(); err != nil { + fmt.Fprintf(pm.logWriter, " ⚠️ Failed to start Rqlite MCP: %v\n", err) + return nil + } + + os.WriteFile(pidPath, []byte(fmt.Sprintf("%d", cmd.Process.Pid)), 0644) + fmt.Fprintf(pm.logWriter, "✓ Rqlite MCP started (PID: %d, port: %d)\n", cmd.Process.Pid, topology.MCPPort) + + return nil +} + func (pm *ProcessManager) startNodes(ctx context.Context) error { topology := DefaultTopology() for _, nodeSpec := range topology.Nodes { @@ -203,4 +229,3 @@ func (pm *ProcessManager) startNodes(ctx context.Context) error { } return nil } - diff --git a/pkg/environments/development/runner.go b/pkg/environments/development/runner.go index fc0c1e6..7b39c05 100644 --- a/pkg/environments/development/runner.go +++ b/pkg/environments/development/runner.go @@ -12,7 +12,7 @@ import ( // ProcessManager manages all dev environment processes type ProcessManager struct { - oramaDir string + oramaDir string pidsDir string processes map[string]*ManagedProcess mutex sync.Mutex @@ -33,7 +33,7 @@ func NewProcessManager(oramaDir string, logWriter io.Writer) *ProcessManager { os.MkdirAll(pidsDir, 0755) return &ProcessManager{ - oramaDir: oramaDir, + oramaDir: oramaDir, pidsDir: pidsDir, processes: make(map[string]*ManagedProcess), logWriter: logWriter, @@ -60,6 +60,7 @@ func (pm *ProcessManager) StartAll(ctx context.Context) error { {"Olric", pm.startOlric}, {"Anon", pm.startAnon}, {"Nodes (Network)", pm.startNodes}, + {"Rqlite MCP", pm.startMCP}, } for _, svc := range services { @@ -109,10 +110,10 @@ func (pm *ProcessManager) StopAll(ctx context.Context) error { node := topology.Nodes[i] services = append(services, fmt.Sprintf("ipfs-%s", node.Name)) } - services = append(services, "olric", "anon") + services = append(services, "olric", "anon", "rqlite-mcp") fmt.Fprintf(pm.logWriter, "Stopping %d services...\n\n", len(services)) - + stoppedCount := 0 for _, svc := range services { if err := pm.stopProcess(svc); err != nil { @@ -176,6 +177,10 @@ func (pm *ProcessManager) Status(ctx context.Context) { name string ports []int }{"Anon SOCKS", []int{topology.AnonSOCKSPort}}) + services = append(services, struct { + name string + ports []int + }{"Rqlite MCP", []int{topology.MCPPort}}) for _, svc := range services { pidPath := filepath.Join(pm.pidsDir, fmt.Sprintf("%s.pid", svc.name)) diff --git a/pkg/environments/development/topology.go b/pkg/environments/development/topology.go index 31c4de0..607bed7 100644 --- a/pkg/environments/development/topology.go +++ b/pkg/environments/development/topology.go @@ -4,20 +4,20 @@ import "fmt" // NodeSpec defines configuration for a single dev environment node type NodeSpec struct { - Name string // node-1, node-2, node-3, node-4, node-5 - ConfigFilename string // node-1.yaml, node-2.yaml, etc. - DataDir string // relative path from .orama root - P2PPort int // LibP2P listen port - IPFSAPIPort int // IPFS API port - IPFSSwarmPort int // IPFS Swarm port - IPFSGatewayPort int // IPFS HTTP Gateway port - RQLiteHTTPPort int // RQLite HTTP API port - RQLiteRaftPort int // RQLite Raft consensus port - ClusterAPIPort int // IPFS Cluster REST API port - ClusterPort int // IPFS Cluster P2P port - UnifiedGatewayPort int // Unified gateway port (proxies all services) - RQLiteJoinTarget string // which node's RQLite Raft port to join (empty for first node) - ClusterJoinTarget string // which node's cluster to join (empty for first node) + Name string // node-1, node-2, node-3, node-4, node-5 + ConfigFilename string // node-1.yaml, node-2.yaml, etc. + DataDir string // relative path from .orama root + P2PPort int // LibP2P listen port + IPFSAPIPort int // IPFS API port + IPFSSwarmPort int // IPFS Swarm port + IPFSGatewayPort int // IPFS HTTP Gateway port + RQLiteHTTPPort int // RQLite HTTP API port + RQLiteRaftPort int // RQLite Raft consensus port + ClusterAPIPort int // IPFS Cluster REST API port + ClusterPort int // IPFS Cluster P2P port + UnifiedGatewayPort int // Unified gateway port (proxies all services) + RQLiteJoinTarget string // which node's RQLite Raft port to join (empty for first node) + ClusterJoinTarget string // which node's cluster to join (empty for first node) } // Topology defines the complete development environment topology @@ -27,97 +27,99 @@ type Topology struct { OlricHTTPPort int OlricMemberPort int AnonSOCKSPort int + MCPPort int } // DefaultTopology returns the default five-node dev environment topology func DefaultTopology() *Topology { return &Topology{ Nodes: []NodeSpec{ - { - Name: "node-1", - ConfigFilename: "node-1.yaml", - DataDir: "node-1", - P2PPort: 4001, - IPFSAPIPort: 4501, - IPFSSwarmPort: 4101, - IPFSGatewayPort: 7501, - RQLiteHTTPPort: 5001, - RQLiteRaftPort: 7001, - ClusterAPIPort: 9094, - ClusterPort: 9096, - UnifiedGatewayPort: 6001, - RQLiteJoinTarget: "", // First node - creates cluster - ClusterJoinTarget: "", + { + Name: "node-1", + ConfigFilename: "node-1.yaml", + DataDir: "node-1", + P2PPort: 4001, + IPFSAPIPort: 4501, + IPFSSwarmPort: 4101, + IPFSGatewayPort: 7501, + RQLiteHTTPPort: 5001, + RQLiteRaftPort: 7001, + ClusterAPIPort: 9094, + ClusterPort: 9096, + UnifiedGatewayPort: 6001, + RQLiteJoinTarget: "", // First node - creates cluster + ClusterJoinTarget: "", + }, + { + Name: "node-2", + ConfigFilename: "node-2.yaml", + DataDir: "node-2", + P2PPort: 4011, + IPFSAPIPort: 4511, + IPFSSwarmPort: 4111, + IPFSGatewayPort: 7511, + RQLiteHTTPPort: 5011, + RQLiteRaftPort: 7011, + ClusterAPIPort: 9104, + ClusterPort: 9106, + UnifiedGatewayPort: 6002, + RQLiteJoinTarget: "localhost:7001", + ClusterJoinTarget: "localhost:9096", + }, + { + Name: "node-3", + ConfigFilename: "node-3.yaml", + DataDir: "node-3", + P2PPort: 4002, + IPFSAPIPort: 4502, + IPFSSwarmPort: 4102, + IPFSGatewayPort: 7502, + RQLiteHTTPPort: 5002, + RQLiteRaftPort: 7002, + ClusterAPIPort: 9114, + ClusterPort: 9116, + UnifiedGatewayPort: 6003, + RQLiteJoinTarget: "localhost:7001", + ClusterJoinTarget: "localhost:9096", + }, + { + Name: "node-4", + ConfigFilename: "node-4.yaml", + DataDir: "node-4", + P2PPort: 4003, + IPFSAPIPort: 4503, + IPFSSwarmPort: 4103, + IPFSGatewayPort: 7503, + RQLiteHTTPPort: 5003, + RQLiteRaftPort: 7003, + ClusterAPIPort: 9124, + ClusterPort: 9126, + UnifiedGatewayPort: 6004, + RQLiteJoinTarget: "localhost:7001", + ClusterJoinTarget: "localhost:9096", + }, + { + Name: "node-5", + ConfigFilename: "node-5.yaml", + DataDir: "node-5", + P2PPort: 4004, + IPFSAPIPort: 4504, + IPFSSwarmPort: 4104, + IPFSGatewayPort: 7504, + RQLiteHTTPPort: 5004, + RQLiteRaftPort: 7004, + ClusterAPIPort: 9134, + ClusterPort: 9136, + UnifiedGatewayPort: 6005, + RQLiteJoinTarget: "localhost:7001", + ClusterJoinTarget: "localhost:9096", + }, }, - { - Name: "node-2", - ConfigFilename: "node-2.yaml", - DataDir: "node-2", - P2PPort: 4011, - IPFSAPIPort: 4511, - IPFSSwarmPort: 4111, - IPFSGatewayPort: 7511, - RQLiteHTTPPort: 5011, - RQLiteRaftPort: 7011, - ClusterAPIPort: 9104, - ClusterPort: 9106, - UnifiedGatewayPort: 6002, - RQLiteJoinTarget: "localhost:7001", - ClusterJoinTarget: "localhost:9096", - }, - { - Name: "node-3", - ConfigFilename: "node-3.yaml", - DataDir: "node-3", - P2PPort: 4002, - IPFSAPIPort: 4502, - IPFSSwarmPort: 4102, - IPFSGatewayPort: 7502, - RQLiteHTTPPort: 5002, - RQLiteRaftPort: 7002, - ClusterAPIPort: 9114, - ClusterPort: 9116, - UnifiedGatewayPort: 6003, - RQLiteJoinTarget: "localhost:7001", - ClusterJoinTarget: "localhost:9096", - }, - { - Name: "node-4", - ConfigFilename: "node-4.yaml", - DataDir: "node-4", - P2PPort: 4003, - IPFSAPIPort: 4503, - IPFSSwarmPort: 4103, - IPFSGatewayPort: 7503, - RQLiteHTTPPort: 5003, - RQLiteRaftPort: 7003, - ClusterAPIPort: 9124, - ClusterPort: 9126, - UnifiedGatewayPort: 6004, - RQLiteJoinTarget: "localhost:7001", - ClusterJoinTarget: "localhost:9096", - }, - { - Name: "node-5", - ConfigFilename: "node-5.yaml", - DataDir: "node-5", - P2PPort: 4004, - IPFSAPIPort: 4504, - IPFSSwarmPort: 4104, - IPFSGatewayPort: 7504, - RQLiteHTTPPort: 5004, - RQLiteRaftPort: 7004, - ClusterAPIPort: 9134, - ClusterPort: 9136, - UnifiedGatewayPort: 6005, - RQLiteJoinTarget: "localhost:7001", - ClusterJoinTarget: "localhost:9096", - }, - }, - GatewayPort: 6000, // Main gateway on 6000 (nodes use 6001-6005) + GatewayPort: 6000, // Main gateway on 6000 (nodes use 6001-6005) OlricHTTPPort: 3320, OlricMemberPort: 3322, AnonSOCKSPort: 9050, + MCPPort: 5825, } } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 297d2fd..2730f94 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -85,7 +85,9 @@ type Gateway struct { // Local pub/sub bypass for same-gateway subscribers localSubscribers map[string][]*localSubscriber // topic+namespace -> subscribers + presenceMembers map[string][]PresenceMember // topicKey -> members mu sync.RWMutex + presenceMu sync.RWMutex // Serverless function engine serverlessEngine *serverless.Engine @@ -104,6 +106,14 @@ type localSubscriber struct { namespace string } +// PresenceMember represents a member in a topic's presence list +type PresenceMember struct { + MemberID string `json:"member_id"` + JoinedAt int64 `json:"joined_at"` // Unix timestamp + Meta map[string]interface{} `json:"meta,omitempty"` + ConnID string `json:"-"` // Internal: for tracking which connection +} + // New creates and initializes a new Gateway instance func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { logger.ComponentInfo(logging.ComponentGeneral, "Building client config...") @@ -140,6 +150,7 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { nodePeerID: cfg.NodePeerID, startedAt: time.Now(), localSubscribers: make(map[string][]*localSubscriber), + presenceMembers: make(map[string][]PresenceMember), } logger.ComponentInfo(logging.ComponentGeneral, "Initializing RQLite ORM HTTP gateway...") diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go index 8a951c2..4a027b9 100644 --- a/pkg/gateway/pubsub_handlers.go +++ b/pkg/gateway/pubsub_handlers.go @@ -10,6 +10,7 @@ import ( "github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/pubsub" + "github.com/google/uuid" "go.uber.org/zap" "github.com/gorilla/websocket" @@ -51,6 +52,22 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) writeError(w, http.StatusBadRequest, "missing 'topic'") return } + + // Presence handling + enablePresence := r.URL.Query().Get("presence") == "true" + memberID := r.URL.Query().Get("member_id") + memberMetaStr := r.URL.Query().Get("member_meta") + var memberMeta map[string]interface{} + if memberMetaStr != "" { + _ = json.Unmarshal([]byte(memberMetaStr), &memberMeta) + } + + if enablePresence && memberID == "" { + g.logger.ComponentWarn("gateway", "pubsub ws: presence enabled but missing member_id") + writeError(w, http.StatusBadRequest, "missing 'member_id' for presence") + return + } + conn, err := wsUpgrader.Upgrade(w, r, nil) if err != nil { g.logger.ComponentWarn("gateway", "pubsub ws: upgrade failed") @@ -73,6 +90,36 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) subscriberCount := len(g.localSubscribers[topicKey]) g.mu.Unlock() + connID := uuid.New().String() + if enablePresence { + member := PresenceMember{ + MemberID: memberID, + JoinedAt: time.Now().Unix(), + Meta: memberMeta, + ConnID: connID, + } + + g.presenceMu.Lock() + g.presenceMembers[topicKey] = append(g.presenceMembers[topicKey], member) + g.presenceMu.Unlock() + + // Broadcast join event + joinEvent := map[string]interface{}{ + "type": "presence.join", + "member_id": memberID, + "meta": memberMeta, + "timestamp": member.JoinedAt, + } + eventData, _ := json.Marshal(joinEvent) + // Use a background context for the broadcast to ensure it finishes even if the connection closes immediately + broadcastCtx := pubsub.WithNamespace(client.WithInternalAuth(context.Background()), ns) + _ = g.client.PubSub().Publish(broadcastCtx, topic, eventData) + + g.logger.ComponentInfo("gateway", "pubsub ws: member joined presence", + zap.String("topic", topic), + zap.String("member_id", memberID)) + } + g.logger.ComponentInfo("gateway", "pubsub ws: registered local subscriber", zap.String("topic", topic), zap.String("namespace", ns), @@ -93,6 +140,36 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) delete(g.localSubscribers, topicKey) } g.mu.Unlock() + + if enablePresence { + g.presenceMu.Lock() + members := g.presenceMembers[topicKey] + for i, m := range members { + if m.ConnID == connID { + g.presenceMembers[topicKey] = append(members[:i], members[i+1:]...) + break + } + } + if len(g.presenceMembers[topicKey]) == 0 { + delete(g.presenceMembers, topicKey) + } + g.presenceMu.Unlock() + + // Broadcast leave event + leaveEvent := map[string]interface{}{ + "type": "presence.leave", + "member_id": memberID, + "timestamp": time.Now().Unix(), + } + eventData, _ := json.Marshal(leaveEvent) + broadcastCtx := pubsub.WithNamespace(client.WithInternalAuth(context.Background()), ns) + _ = g.client.PubSub().Publish(broadcastCtx, topic, eventData) + + g.logger.ComponentInfo("gateway", "pubsub ws: member left presence", + zap.String("topic", topic), + zap.String("member_id", memberID)) + } + g.logger.ComponentInfo("gateway", "pubsub ws: unregistered local subscriber", zap.String("topic", topic), zap.Int("remaining_subscribers", remainingCount)) @@ -349,3 +426,44 @@ func namespacePrefix(ns string) string { func namespacedTopic(ns, topic string) string { return namespacePrefix(ns) + topic } + +// pubsubPresenceHandler handles GET /v1/pubsub/presence?topic=mytopic +func (g *Gateway) pubsubPresenceHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + ns := resolveNamespaceFromRequest(r) + if ns == "" { + writeError(w, http.StatusForbidden, "namespace not resolved") + return + } + + topic := r.URL.Query().Get("topic") + if topic == "" { + writeError(w, http.StatusBadRequest, "missing 'topic'") + return + } + + topicKey := fmt.Sprintf("%s.%s", ns, topic) + + g.presenceMu.RLock() + members, ok := g.presenceMembers[topicKey] + g.presenceMu.RUnlock() + + if !ok { + writeJSON(w, http.StatusOK, map[string]any{ + "topic": topic, + "members": []PresenceMember{}, + "count": 0, + }) + return + } + + writeJSON(w, http.StatusOK, map[string]any{ + "topic": topic, + "members": members, + "count": len(members), + }) +} diff --git a/pkg/gateway/routes.go b/pkg/gateway/routes.go index 6e2a22b..f574ea2 100644 --- a/pkg/gateway/routes.go +++ b/pkg/gateway/routes.go @@ -44,6 +44,7 @@ func (g *Gateway) Routes() http.Handler { mux.HandleFunc("/v1/pubsub/ws", g.pubsubWebsocketHandler) mux.HandleFunc("/v1/pubsub/publish", g.pubsubPublishHandler) mux.HandleFunc("/v1/pubsub/topics", g.pubsubTopicsHandler) + mux.HandleFunc("/v1/pubsub/presence", g.pubsubPresenceHandler) // anon proxy (authenticated users only) mux.HandleFunc("/v1/proxy/anon", g.anonProxyHandler)