mirror of
https://github.com/DeBrosOfficial/network.git
synced 2026-01-30 03:43:04 +00:00
- Introduced a new Rqlite MCP server implementation in `cmd/rqlite-mcp`, enabling JSON-RPC communication for database operations. - Updated the Makefile to include the build command for the Rqlite MCP server. - Enhanced the WebSocket PubSub client with presence capabilities, allowing members to join and leave topics with notifications. - Implemented presence management in the gateway, including endpoints for querying current members in a topic. - Added end-to-end tests for presence functionality, ensuring correct behavior during member join and leave events.
470 lines
14 KiB
Go
470 lines
14 KiB
Go
package gateway
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/DeBrosOfficial/network/pkg/client"
|
|
"github.com/DeBrosOfficial/network/pkg/pubsub"
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
var wsUpgrader = websocket.Upgrader{
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
// For early development we accept any origin; tighten later.
|
|
CheckOrigin: func(r *http.Request) bool { return true },
|
|
}
|
|
|
|
// pubsubWebsocketHandler upgrades to WS, subscribes to a namespaced topic, and
|
|
// forwards received PubSub messages to the client. Messages sent by the client
|
|
// are published to the same namespaced topic.
|
|
func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) {
|
|
if g.client == nil {
|
|
g.logger.ComponentWarn("gateway", "pubsub ws: client not initialized")
|
|
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
|
return
|
|
}
|
|
if r.Method != http.MethodGet {
|
|
g.logger.ComponentWarn("gateway", "pubsub ws: method not allowed")
|
|
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
|
return
|
|
}
|
|
|
|
// Resolve namespace from auth context
|
|
ns := resolveNamespaceFromRequest(r)
|
|
if ns == "" {
|
|
g.logger.ComponentWarn("gateway", "pubsub ws: namespace not resolved")
|
|
writeError(w, http.StatusForbidden, "namespace not resolved")
|
|
return
|
|
}
|
|
|
|
topic := r.URL.Query().Get("topic")
|
|
if topic == "" {
|
|
g.logger.ComponentWarn("gateway", "pubsub ws: missing topic")
|
|
writeError(w, http.StatusBadRequest, "missing 'topic'")
|
|
return
|
|
}
|
|
|
|
// Presence handling
|
|
enablePresence := r.URL.Query().Get("presence") == "true"
|
|
memberID := r.URL.Query().Get("member_id")
|
|
memberMetaStr := r.URL.Query().Get("member_meta")
|
|
var memberMeta map[string]interface{}
|
|
if memberMetaStr != "" {
|
|
_ = json.Unmarshal([]byte(memberMetaStr), &memberMeta)
|
|
}
|
|
|
|
if enablePresence && memberID == "" {
|
|
g.logger.ComponentWarn("gateway", "pubsub ws: presence enabled but missing member_id")
|
|
writeError(w, http.StatusBadRequest, "missing 'member_id' for presence")
|
|
return
|
|
}
|
|
|
|
conn, err := wsUpgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
g.logger.ComponentWarn("gateway", "pubsub ws: upgrade failed")
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
|
|
// Channel to deliver PubSub messages to WS writer
|
|
msgs := make(chan []byte, 128)
|
|
|
|
// NEW: Register as local subscriber for direct message delivery
|
|
localSub := &localSubscriber{
|
|
msgChan: msgs,
|
|
namespace: ns,
|
|
}
|
|
topicKey := fmt.Sprintf("%s.%s", ns, topic)
|
|
|
|
g.mu.Lock()
|
|
g.localSubscribers[topicKey] = append(g.localSubscribers[topicKey], localSub)
|
|
subscriberCount := len(g.localSubscribers[topicKey])
|
|
g.mu.Unlock()
|
|
|
|
connID := uuid.New().String()
|
|
if enablePresence {
|
|
member := PresenceMember{
|
|
MemberID: memberID,
|
|
JoinedAt: time.Now().Unix(),
|
|
Meta: memberMeta,
|
|
ConnID: connID,
|
|
}
|
|
|
|
g.presenceMu.Lock()
|
|
g.presenceMembers[topicKey] = append(g.presenceMembers[topicKey], member)
|
|
g.presenceMu.Unlock()
|
|
|
|
// Broadcast join event
|
|
joinEvent := map[string]interface{}{
|
|
"type": "presence.join",
|
|
"member_id": memberID,
|
|
"meta": memberMeta,
|
|
"timestamp": member.JoinedAt,
|
|
}
|
|
eventData, _ := json.Marshal(joinEvent)
|
|
// Use a background context for the broadcast to ensure it finishes even if the connection closes immediately
|
|
broadcastCtx := pubsub.WithNamespace(client.WithInternalAuth(context.Background()), ns)
|
|
_ = g.client.PubSub().Publish(broadcastCtx, topic, eventData)
|
|
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: member joined presence",
|
|
zap.String("topic", topic),
|
|
zap.String("member_id", memberID))
|
|
}
|
|
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: registered local subscriber",
|
|
zap.String("topic", topic),
|
|
zap.String("namespace", ns),
|
|
zap.Int("total_subscribers", subscriberCount))
|
|
|
|
// Unregister on close
|
|
defer func() {
|
|
g.mu.Lock()
|
|
subs := g.localSubscribers[topicKey]
|
|
for i, sub := range subs {
|
|
if sub == localSub {
|
|
g.localSubscribers[topicKey] = append(subs[:i], subs[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
remainingCount := len(g.localSubscribers[topicKey])
|
|
if remainingCount == 0 {
|
|
delete(g.localSubscribers, topicKey)
|
|
}
|
|
g.mu.Unlock()
|
|
|
|
if enablePresence {
|
|
g.presenceMu.Lock()
|
|
members := g.presenceMembers[topicKey]
|
|
for i, m := range members {
|
|
if m.ConnID == connID {
|
|
g.presenceMembers[topicKey] = append(members[:i], members[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
if len(g.presenceMembers[topicKey]) == 0 {
|
|
delete(g.presenceMembers, topicKey)
|
|
}
|
|
g.presenceMu.Unlock()
|
|
|
|
// Broadcast leave event
|
|
leaveEvent := map[string]interface{}{
|
|
"type": "presence.leave",
|
|
"member_id": memberID,
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
eventData, _ := json.Marshal(leaveEvent)
|
|
broadcastCtx := pubsub.WithNamespace(client.WithInternalAuth(context.Background()), ns)
|
|
_ = g.client.PubSub().Publish(broadcastCtx, topic, eventData)
|
|
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: member left presence",
|
|
zap.String("topic", topic),
|
|
zap.String("member_id", memberID))
|
|
}
|
|
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: unregistered local subscriber",
|
|
zap.String("topic", topic),
|
|
zap.Int("remaining_subscribers", remainingCount))
|
|
}()
|
|
|
|
// Use internal auth context when interacting with client to avoid circular auth requirements
|
|
ctx := client.WithInternalAuth(r.Context())
|
|
// Apply namespace isolation
|
|
ctx = pubsub.WithNamespace(ctx, ns)
|
|
|
|
// Writer loop - START THIS FIRST before libp2p subscription
|
|
done := make(chan struct{})
|
|
go func() {
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: writer goroutine started",
|
|
zap.String("topic", topic))
|
|
defer g.logger.ComponentInfo("gateway", "pubsub ws: writer goroutine exiting",
|
|
zap.String("topic", topic))
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case b, ok := <-msgs:
|
|
if !ok {
|
|
g.logger.ComponentWarn("gateway", "pubsub ws: message channel closed",
|
|
zap.String("topic", topic))
|
|
_ = conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(5*time.Second))
|
|
close(done)
|
|
return
|
|
}
|
|
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: sending message to client",
|
|
zap.String("topic", topic),
|
|
zap.Int("data_len", len(b)))
|
|
|
|
// Format message as JSON envelope with data (base64 encoded), timestamp, and topic
|
|
// This matches the SDK's Message interface: {data: string, timestamp: number, topic: string}
|
|
envelope := map[string]interface{}{
|
|
"data": base64.StdEncoding.EncodeToString(b),
|
|
"timestamp": time.Now().UnixMilli(),
|
|
"topic": topic,
|
|
}
|
|
envelopeJSON, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
g.logger.ComponentWarn("gateway", "pubsub ws: failed to marshal envelope",
|
|
zap.String("topic", topic),
|
|
zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
g.logger.ComponentDebug("gateway", "pubsub ws: envelope created",
|
|
zap.String("topic", topic),
|
|
zap.Int("envelope_len", len(envelopeJSON)))
|
|
|
|
conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
|
|
if err := conn.WriteMessage(websocket.TextMessage, envelopeJSON); err != nil {
|
|
g.logger.ComponentWarn("gateway", "pubsub ws: failed to write to websocket",
|
|
zap.String("topic", topic),
|
|
zap.Error(err))
|
|
close(done)
|
|
return
|
|
}
|
|
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: message sent successfully",
|
|
zap.String("topic", topic))
|
|
case <-ticker.C:
|
|
// Ping keepalive
|
|
_ = conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(5*time.Second))
|
|
case <-ctx.Done():
|
|
close(done)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Subscribe to libp2p for cross-node messages (in background, non-blocking)
|
|
go func() {
|
|
h := func(_ string, data []byte) error {
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: received message from libp2p",
|
|
zap.String("topic", topic),
|
|
zap.Int("data_len", len(data)))
|
|
|
|
select {
|
|
case msgs <- data:
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client",
|
|
zap.String("topic", topic),
|
|
zap.String("source", "libp2p"))
|
|
return nil
|
|
default:
|
|
// Drop if client is slow to avoid blocking network
|
|
g.logger.ComponentWarn("gateway", "pubsub ws: client slow, dropping message",
|
|
zap.String("topic", topic))
|
|
return nil
|
|
}
|
|
}
|
|
if err := g.client.PubSub().Subscribe(ctx, topic, h); err != nil {
|
|
g.logger.ComponentWarn("gateway", "pubsub ws: libp2p subscribe failed (will use local-only)",
|
|
zap.String("topic", topic),
|
|
zap.Error(err))
|
|
return
|
|
}
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: libp2p subscription established",
|
|
zap.String("topic", topic))
|
|
|
|
// Keep subscription alive until done
|
|
<-done
|
|
_ = g.client.PubSub().Unsubscribe(ctx, topic)
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: libp2p subscription closed",
|
|
zap.String("topic", topic))
|
|
}()
|
|
|
|
// Reader loop: treat any client message as publish to the same topic
|
|
for {
|
|
mt, data, err := conn.ReadMessage()
|
|
if err != nil {
|
|
break
|
|
}
|
|
if mt != websocket.TextMessage && mt != websocket.BinaryMessage {
|
|
continue
|
|
}
|
|
|
|
// Filter out WebSocket heartbeat messages
|
|
// Don't publish them to the topic
|
|
var msg map[string]interface{}
|
|
if err := json.Unmarshal(data, &msg); err == nil {
|
|
if msgType, ok := msg["type"].(string); ok && msgType == "ping" {
|
|
g.logger.ComponentInfo("gateway", "pubsub ws: filtering out heartbeat ping")
|
|
continue
|
|
}
|
|
}
|
|
|
|
if err := g.client.PubSub().Publish(ctx, topic, data); err != nil {
|
|
// Best-effort notify client
|
|
_ = conn.WriteMessage(websocket.TextMessage, []byte("publish_error"))
|
|
}
|
|
}
|
|
<-done
|
|
}
|
|
|
|
// pubsubPublishHandler handles POST /v1/pubsub/publish {topic, data_base64}
|
|
func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) {
|
|
if g.client == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
|
return
|
|
}
|
|
if r.Method != http.MethodPost {
|
|
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
|
return
|
|
}
|
|
ns := resolveNamespaceFromRequest(r)
|
|
if ns == "" {
|
|
writeError(w, http.StatusForbidden, "namespace not resolved")
|
|
return
|
|
}
|
|
var body struct {
|
|
Topic string `json:"topic"`
|
|
DataB64 string `json:"data_base64"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Topic == "" || body.DataB64 == "" {
|
|
writeError(w, http.StatusBadRequest, "invalid body: expected {topic,data_base64}")
|
|
return
|
|
}
|
|
data, err := base64.StdEncoding.DecodeString(body.DataB64)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid base64 data")
|
|
return
|
|
}
|
|
|
|
// NEW: Check for local websocket subscribers FIRST and deliver directly
|
|
g.mu.RLock()
|
|
localSubs := g.getLocalSubscribers(body.Topic, ns)
|
|
g.mu.RUnlock()
|
|
|
|
localDeliveryCount := 0
|
|
if len(localSubs) > 0 {
|
|
for _, sub := range localSubs {
|
|
select {
|
|
case sub.msgChan <- data:
|
|
localDeliveryCount++
|
|
g.logger.ComponentDebug("gateway", "delivered to local subscriber",
|
|
zap.String("topic", body.Topic))
|
|
default:
|
|
// Drop if buffer full
|
|
g.logger.ComponentWarn("gateway", "local subscriber buffer full, dropping message",
|
|
zap.String("topic", body.Topic))
|
|
}
|
|
}
|
|
}
|
|
|
|
g.logger.ComponentInfo("gateway", "pubsub publish: processing message",
|
|
zap.String("topic", body.Topic),
|
|
zap.String("namespace", ns),
|
|
zap.Int("data_len", len(data)),
|
|
zap.Int("local_subscribers", len(localSubs)),
|
|
zap.Int("local_delivered", localDeliveryCount))
|
|
|
|
// Publish to libp2p asynchronously for cross-node delivery
|
|
// This prevents blocking the HTTP response if libp2p network is slow
|
|
go func() {
|
|
publishCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
ctx := pubsub.WithNamespace(client.WithInternalAuth(publishCtx), ns)
|
|
if err := g.client.PubSub().Publish(ctx, body.Topic, data); err != nil {
|
|
g.logger.ComponentWarn("gateway", "async libp2p publish failed",
|
|
zap.String("topic", body.Topic),
|
|
zap.Error(err))
|
|
} else {
|
|
g.logger.ComponentDebug("gateway", "async libp2p publish succeeded",
|
|
zap.String("topic", body.Topic))
|
|
}
|
|
}()
|
|
|
|
// Return immediately after local delivery
|
|
// Local WebSocket subscribers already received the message
|
|
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
|
|
}
|
|
|
|
// pubsubTopicsHandler lists topics within the caller's namespace
|
|
func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) {
|
|
if g.client == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
|
return
|
|
}
|
|
ns := resolveNamespaceFromRequest(r)
|
|
if ns == "" {
|
|
writeError(w, http.StatusForbidden, "namespace not resolved")
|
|
return
|
|
}
|
|
// Apply namespace isolation
|
|
ctx := pubsub.WithNamespace(client.WithInternalAuth(r.Context()), ns)
|
|
all, err := g.client.PubSub().ListTopics(ctx)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
// Client returns topics already trimmed to its namespace; return as-is
|
|
writeJSON(w, http.StatusOK, map[string]any{"topics": all})
|
|
}
|
|
|
|
// resolveNamespaceFromRequest gets namespace from context set by auth middleware
|
|
func resolveNamespaceFromRequest(r *http.Request) string {
|
|
if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil {
|
|
if s, ok := v.(string); ok {
|
|
return s
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func namespacePrefix(ns string) string {
|
|
return "ns::" + ns + "::"
|
|
}
|
|
|
|
func namespacedTopic(ns, topic string) string {
|
|
return namespacePrefix(ns) + topic
|
|
}
|
|
|
|
// pubsubPresenceHandler handles GET /v1/pubsub/presence?topic=mytopic
|
|
func (g *Gateway) pubsubPresenceHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
|
return
|
|
}
|
|
|
|
ns := resolveNamespaceFromRequest(r)
|
|
if ns == "" {
|
|
writeError(w, http.StatusForbidden, "namespace not resolved")
|
|
return
|
|
}
|
|
|
|
topic := r.URL.Query().Get("topic")
|
|
if topic == "" {
|
|
writeError(w, http.StatusBadRequest, "missing 'topic'")
|
|
return
|
|
}
|
|
|
|
topicKey := fmt.Sprintf("%s.%s", ns, topic)
|
|
|
|
g.presenceMu.RLock()
|
|
members, ok := g.presenceMembers[topicKey]
|
|
g.presenceMu.RUnlock()
|
|
|
|
if !ok {
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"topic": topic,
|
|
"members": []PresenceMember{},
|
|
"count": 0,
|
|
})
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"topic": topic,
|
|
"members": members,
|
|
"count": len(members),
|
|
})
|
|
}
|