mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 21:46:57 +00:00
- Invalidate plaintext refresh tokens (migration 019) - Add `--sign` flag to `orama build` for rootwallet manifest signing - Add `--ca-fingerprint` TOFU verification for production joins/invites - Save cluster secrets from join (RQLite auth, Olric key, IPFS peers) - Add RQLite auth config fields
112 lines
3.1 KiB
Go
112 lines
3.1 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/DeBrosOfficial/network/pkg/logging"
|
|
"github.com/gorilla/websocket"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var wsUpgrader = websocket.Upgrader{
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
CheckOrigin: checkWSOrigin,
|
|
}
|
|
|
|
// checkWSOrigin validates WebSocket origins against the request's Host header.
|
|
// Non-browser clients (no Origin) are allowed. Browser clients must match the host.
|
|
func checkWSOrigin(r *http.Request) bool {
|
|
origin := r.Header.Get("Origin")
|
|
if origin == "" {
|
|
return true
|
|
}
|
|
host := r.Host
|
|
if host == "" {
|
|
return false
|
|
}
|
|
if idx := strings.LastIndex(host, ":"); idx != -1 {
|
|
host = host[:idx]
|
|
}
|
|
parsed, err := url.Parse(origin)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
originHost := parsed.Hostname()
|
|
return originHost == host || strings.HasSuffix(originHost, "."+host)
|
|
}
|
|
|
|
// wsClient wraps a WebSocket connection with message handling
|
|
type wsClient struct {
|
|
conn *websocket.Conn
|
|
topic string
|
|
logger *logging.ColoredLogger
|
|
}
|
|
|
|
// newWSClient creates a new WebSocket client wrapper
|
|
func newWSClient(conn *websocket.Conn, topic string, logger *logging.ColoredLogger) *wsClient {
|
|
return &wsClient{
|
|
conn: conn,
|
|
topic: topic,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// writeMessage sends a message to the WebSocket client with proper envelope formatting
|
|
func (c *wsClient) writeMessage(data []byte) error {
|
|
c.logger.ComponentInfo("gateway", "pubsub ws: sending message to client",
|
|
zap.String("topic", c.topic),
|
|
zap.Int("data_len", len(data)))
|
|
|
|
// Format message as JSON envelope with data (base64 encoded), timestamp, and topic
|
|
// This matches the SDK's Message interface: {data: string, timestamp: number, topic: string}
|
|
envelope := map[string]interface{}{
|
|
"data": base64.StdEncoding.EncodeToString(data),
|
|
"timestamp": time.Now().UnixMilli(),
|
|
"topic": c.topic,
|
|
}
|
|
envelopeJSON, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
c.logger.ComponentWarn("gateway", "pubsub ws: failed to marshal envelope",
|
|
zap.String("topic", c.topic),
|
|
zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
c.logger.ComponentDebug("gateway", "pubsub ws: envelope created",
|
|
zap.String("topic", c.topic),
|
|
zap.Int("envelope_len", len(envelopeJSON)))
|
|
|
|
c.conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
|
|
if err := c.conn.WriteMessage(websocket.TextMessage, envelopeJSON); err != nil {
|
|
c.logger.ComponentWarn("gateway", "pubsub ws: failed to write to websocket",
|
|
zap.String("topic", c.topic),
|
|
zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
c.logger.ComponentInfo("gateway", "pubsub ws: message sent successfully",
|
|
zap.String("topic", c.topic))
|
|
return nil
|
|
}
|
|
|
|
// writeControl sends a WebSocket control message
|
|
func (c *wsClient) writeControl(messageType int, data []byte, deadline time.Time) error {
|
|
return c.conn.WriteControl(messageType, data, deadline)
|
|
}
|
|
|
|
// readMessage reads a message from the WebSocket client
|
|
func (c *wsClient) readMessage() (messageType int, data []byte, err error) {
|
|
return c.conn.ReadMessage()
|
|
}
|
|
|
|
// close closes the WebSocket connection
|
|
func (c *wsClient) close() error {
|
|
return c.conn.Close()
|
|
}
|