diff --git a/README.md b/README.md index 914778b..2dcce81 100644 --- a/README.md +++ b/README.md @@ -122,10 +122,23 @@ const results = await client.db.transaction([ ### Pub/Sub Messaging +The SDK provides a robust pub/sub client with: + +- **Multi-subscriber support**: Multiple connections can subscribe to the same topic +- **Namespace isolation**: Topics are scoped to your authenticated namespace +- **Server timestamps**: Messages preserve server-side timestamps +- **Binary-safe**: Supports both string and binary (`Uint8Array`) payloads +- **Strict envelope validation**: Type-safe message parsing with error handling + #### Publish a Message ```typescript +// Publish a string message await client.pubsub.publish("notifications", "Hello, Network!"); + +// Publish binary data +const binaryData = new Uint8Array([1, 2, 3, 4]); +await client.pubsub.publish("binary-topic", binaryData); ``` #### Subscribe to Topics @@ -133,7 +146,9 @@ await client.pubsub.publish("notifications", "Hello, Network!"); ```typescript const subscription = await client.pubsub.subscribe("notifications", { onMessage: (msg) => { - console.log("Received:", msg.data); + console.log("Topic:", msg.topic); + console.log("Data:", msg.data); + console.log("Server timestamp:", new Date(msg.timestamp)); }, onError: (err) => { console.error("Subscription error:", err); @@ -147,6 +162,52 @@ const subscription = await client.pubsub.subscribe("notifications", { subscription.close(); ``` +**Message Interface:** + +```typescript +interface Message { + data: string; // Decoded message payload (string) + topic: string; // Topic name + timestamp: number; // Server timestamp in milliseconds +} +``` + +#### Debug Raw Envelopes + +For debugging, you can inspect raw message envelopes before decoding: + +```typescript +const subscription = await client.pubsub.subscribe("notifications", { + onMessage: (msg) => { + console.log("Decoded message:", msg.data); + }, + onRaw: (envelope) => { + console.log("Raw envelope:", envelope); + // { data: "base64...", timestamp: 1234567890, topic: "notifications" } + }, +}); +``` + +#### Multi-Subscriber Support + +Multiple subscriptions to the same topic are supported. Each receives its own copy of messages: + +```typescript +// First subscriber +const sub1 = await client.pubsub.subscribe("events", { + onMessage: (msg) => console.log("Sub1:", msg.data), +}); + +// Second subscriber (both receive messages) +const sub2 = await client.pubsub.subscribe("events", { + onMessage: (msg) => console.log("Sub2:", msg.data), +}); + +// Unsubscribe independently +sub1.close(); // sub2 still active +sub2.close(); // fully unsubscribed +``` + #### List Topics ```typescript diff --git a/src/core/http.ts b/src/core/http.ts index e912720..5002a07 100644 --- a/src/core/http.ts +++ b/src/core/http.ts @@ -56,6 +56,7 @@ export class HttpClient { body?: any; headers?: Record; query?: Record; + timeout?: number; // Per-request timeout override } = {} ): Promise { const url = new URL(this.baseURL + path); @@ -71,17 +72,25 @@ export class HttpClient { ...options.headers, }; + const controller = new AbortController(); + const requestTimeout = options.timeout ?? this.timeout; // Use override or default + const timeoutId = setTimeout(() => controller.abort(), requestTimeout); + const fetchOptions: RequestInit = { method, headers, - signal: AbortSignal.timeout(this.timeout), + signal: controller.signal, }; if (options.body !== undefined) { fetchOptions.body = JSON.stringify(options.body); } - return this.requestWithRetry(url.toString(), fetchOptions); + try { + return await this.requestWithRetry(url.toString(), fetchOptions); + } finally { + clearTimeout(timeoutId); + } } private async requestWithRetry( diff --git a/src/pubsub/client.ts b/src/pubsub/client.ts index c934ef3..554dd07 100644 --- a/src/pubsub/client.ts +++ b/src/pubsub/client.ts @@ -4,7 +4,13 @@ import { WSClient, WSClientConfig } from "../core/ws"; export interface Message { data: string; topic: string; - timestamp?: number; + timestamp: number; +} + +export interface RawEnvelope { + data: string; // base64-encoded + timestamp: number; + topic: string; } // Cross-platform base64 encoding/decoding utilities @@ -23,6 +29,21 @@ function base64Encode(str: string): string { throw new Error("No base64 encoding method available"); } +function base64EncodeBytes(bytes: Uint8Array): string { + if (typeof Buffer !== "undefined") { + // Node.js environment + return Buffer.from(bytes).toString("base64"); + } else if (typeof btoa !== "undefined") { + // Browser/React Native environment + let binary = ""; + for (let i = 0; i < bytes.length; i++) { + binary += String.fromCharCode(bytes[i]); + } + return btoa(binary); + } + throw new Error("No base64 encoding method available"); +} + function base64Decode(b64: string): string { if (typeof Buffer !== "undefined") { // Node.js environment @@ -42,6 +63,7 @@ function base64Decode(b64: string): string { export type MessageHandler = (message: Message) => void; export type ErrorHandler = (error: Error) => void; export type CloseHandler = () => void; +export type RawMessageHandler = (envelope: RawEnvelope) => void; export class PubSubClient { private httpClient: HttpClient; @@ -60,16 +82,26 @@ export class PubSubClient { if (typeof data === "string") { dataBase64 = base64Encode(data); } else { - // Convert Uint8Array to string first - const decoder = new TextDecoder(); - const str = decoder.decode(data); - dataBase64 = base64Encode(str); + // Encode bytes directly to preserve binary data + dataBase64 = base64EncodeBytes(data); } - await this.httpClient.post("/v1/pubsub/publish", { + console.log("[PubSubClient] Publishing message:", { topic, - data_base64: dataBase64, + data: typeof data === "string" ? data : `<${data.length} bytes>`, }); + + // Use longer timeout for pub/sub operations (60s instead of default 30s) + await this.httpClient.post( + "/v1/pubsub/publish", + { + topic, + data_base64: dataBase64, + }, + { + timeout: 60000, // 60 seconds + } + ); } /** @@ -92,6 +124,7 @@ export class PubSubClient { onMessage?: MessageHandler; onError?: ErrorHandler; onClose?: CloseHandler; + onRaw?: RawMessageHandler; } = {} ): Promise { const wsUrl = new URL(this.wsConfig.wsURL || "ws://localhost:6001"); @@ -115,6 +148,9 @@ export class PubSubClient { if (handlers.onClose) { subscription.onClose(handlers.onClose); } + if (handlers.onRaw) { + subscription.onRaw(handlers.onRaw); + } await wsClient.connect(); return subscription; @@ -127,6 +163,7 @@ export class Subscription { private messageHandlers: Set = new Set(); private errorHandlers: Set = new Set(); private closeHandlers: Set = new Set(); + private rawHandlers: Set = new Set(); constructor(wsClient: WSClient, topic: string) { this.wsClient = wsClient; @@ -134,34 +171,70 @@ export class Subscription { this.wsClient.onMessage((data) => { try { - let messageData = data; - // Parse gateway JSON envelope: {data: base64String, timestamp, topic} + let envelope: RawEnvelope; try { - const envelope = JSON.parse(data); - if (envelope.data && typeof envelope.data === "string") { - // The gateway sends base64-encoded data in the 'data' field - // Decode it back to the original string - try { - messageData = base64Decode(envelope.data); - } catch (decodeError) { - console.error( - "[Subscription] Base64 decode failed:", - decodeError - ); - // If base64 decode fails, use the envelope data as-is - messageData = envelope.data; - } + envelope = JSON.parse(data); + + // Validate envelope structure + if (!envelope || typeof envelope !== "object") { + throw new Error("Invalid envelope: not an object"); + } + if (!envelope.data || typeof envelope.data !== "string") { + throw new Error("Invalid envelope: missing or invalid data field"); + } + if (!envelope.topic || typeof envelope.topic !== "string") { + throw new Error("Invalid envelope: missing or invalid topic field"); + } + if (typeof envelope.timestamp !== "number") { + throw new Error( + "Invalid envelope: missing or invalid timestamp field" + ); + } + + // Validate topic matches subscription + if (envelope.topic !== this.topic) { + console.warn( + `[Subscription] Topic mismatch: expected ${this.topic}, got ${envelope.topic}` + ); } } catch (parseError) { - // If not JSON, use the data as-is + console.error("[Subscription] Failed to parse envelope:", parseError); + this.errorHandlers.forEach((handler) => + handler( + parseError instanceof Error + ? parseError + : new Error(String(parseError)) + ) + ); + return; + } + + // Call raw handlers for debugging + this.rawHandlers.forEach((handler) => handler(envelope)); + + // Decode base64 data + let messageData: string; + try { + messageData = base64Decode(envelope.data); + } catch (decodeError) { + console.error("[Subscription] Base64 decode failed:", decodeError); + this.errorHandlers.forEach((handler) => + handler( + decodeError instanceof Error + ? decodeError + : new Error(String(decodeError)) + ) + ); + return; } const message: Message = { - topic: this.topic, + topic: envelope.topic, data: messageData, - timestamp: Date.now(), + timestamp: envelope.timestamp, }; + console.log("[Subscription] Received message:", message); this.messageHandlers.forEach((handler) => handler(message)); } catch (error) { console.error("[Subscription] Error processing message:", error); @@ -195,6 +268,11 @@ export class Subscription { return () => this.closeHandlers.delete(handler); } + onRaw(handler: RawMessageHandler) { + this.rawHandlers.add(handler); + return () => this.rawHandlers.delete(handler); + } + close() { this.wsClient.close(); }