diff --git a/src/core/http.ts b/src/core/http.ts index 5002a07..606d5b1 100644 --- a/src/core/http.ts +++ b/src/core/http.ts @@ -19,7 +19,7 @@ export class HttpClient { constructor(config: HttpClientConfig) { this.baseURL = config.baseURL.replace(/\/$/, ""); - this.timeout = config.timeout ?? 30000; + this.timeout = config.timeout ?? 60000; // Increased from 30s to 60s for pub/sub operations this.maxRetries = config.maxRetries ?? 3; this.retryDelayMs = config.retryDelayMs ?? 1000; this.fetch = config.fetch ?? globalThis.fetch; diff --git a/src/core/ws.ts b/src/core/ws.ts index d1734c7..8c714c4 100644 --- a/src/core/ws.ts +++ b/src/core/ws.ts @@ -4,10 +4,6 @@ import { SDKError } from "../errors"; export interface WSClientConfig { wsURL: string; timeout?: number; - maxReconnectAttempts?: number; - reconnectDelayMs?: number; - heartbeatIntervalMs?: number; - authMode?: "header" | "query"; authToken?: string; WebSocket?: typeof WebSocket; } @@ -15,44 +11,41 @@ export interface WSClientConfig { export type WSMessageHandler = (data: string) => void; export type WSErrorHandler = (error: Error) => void; export type WSCloseHandler = () => void; +export type WSOpenHandler = () => void; +/** + * Simple WebSocket client with minimal abstractions + * No complex reconnection, no heartbeats - keep it simple + */ export class WSClient { private url: string; private timeout: number; - private maxReconnectAttempts: number; - private reconnectDelayMs: number; - private heartbeatIntervalMs: number; - private authMode: "header" | "query"; private authToken?: string; private WebSocketClass: typeof WebSocket; private ws?: WebSocket; - private reconnectAttempts = 0; - private heartbeatInterval?: NodeJS.Timeout; private messageHandlers: Set = new Set(); private errorHandlers: Set = new Set(); private closeHandlers: Set = new Set(); - private isManuallyClosed = false; + private openHandlers: Set = new Set(); + private isClosed = false; constructor(config: WSClientConfig) { this.url = config.wsURL; this.timeout = config.timeout ?? 30000; - this.maxReconnectAttempts = config.maxReconnectAttempts ?? 5; - this.reconnectDelayMs = config.reconnectDelayMs ?? 1000; - this.heartbeatIntervalMs = config.heartbeatIntervalMs ?? 30000; - this.authMode = config.authMode ?? "header"; this.authToken = config.authToken; this.WebSocketClass = config.WebSocket ?? WebSocket; } + /** + * Connect to WebSocket server + */ connect(): Promise { return new Promise((resolve, reject) => { try { const wsUrl = this.buildWSUrl(); this.ws = new this.WebSocketClass(wsUrl); - - // Note: Custom headers via ws library in Node.js are not sent with WebSocket upgrade requests - // so we rely on query parameters for authentication + this.isClosed = false; const timeout = setTimeout(() => { this.ws?.close(); @@ -63,8 +56,8 @@ export class WSClient { this.ws.addEventListener("open", () => { clearTimeout(timeout); - this.reconnectAttempts = 0; - this.startHeartbeat(); + console.log("[WSClient] Connected to", this.url); + this.openHandlers.forEach((handler) => handler()); resolve(); }); @@ -82,12 +75,8 @@ export class WSClient { this.ws.addEventListener("close", () => { clearTimeout(timeout); - this.stopHeartbeat(); - if (!this.isManuallyClosed) { - this.attemptReconnect(); - } else { - this.closeHandlers.forEach((handler) => handler()); - } + console.log("[WSClient] Connection closed"); + this.closeHandlers.forEach((handler) => handler()); }); } catch (error) { reject(error); @@ -95,11 +84,12 @@ export class WSClient { }); } + /** + * Build WebSocket URL with auth token + */ private buildWSUrl(): string { let url = this.url; - // Always append auth token as query parameter for compatibility - // Works in both Node.js and browser environments if (this.authToken) { const separator = url.includes("?") ? "&" : "?"; const paramName = this.authToken.startsWith("ak_") ? "api_key" : "token"; @@ -109,68 +99,91 @@ export class WSClient { return url; } - private startHeartbeat() { - this.heartbeatInterval = setInterval(() => { - if (this.ws?.readyState === WebSocket.OPEN) { - this.ws.send(JSON.stringify({ type: "ping" })); - } - }, this.heartbeatIntervalMs); - } - - private stopHeartbeat() { - if (this.heartbeatInterval) { - clearInterval(this.heartbeatInterval); - this.heartbeatInterval = undefined; - } - } - - private attemptReconnect() { - if (this.reconnectAttempts < this.maxReconnectAttempts) { - this.reconnectAttempts++; - const delayMs = this.reconnectDelayMs * this.reconnectAttempts; - setTimeout(() => { - this.connect().catch((error) => { - this.errorHandlers.forEach((handler) => handler(error)); - }); - }, delayMs); - } else { - this.closeHandlers.forEach((handler) => handler()); - } - } - - onMessage(handler: WSMessageHandler) { + /** + * Register message handler + */ + onMessage(handler: WSMessageHandler): () => void { this.messageHandlers.add(handler); return () => this.messageHandlers.delete(handler); } - onError(handler: WSErrorHandler) { + /** + * Unregister message handler + */ + offMessage(handler: WSMessageHandler): void { + this.messageHandlers.delete(handler); + } + + /** + * Register error handler + */ + onError(handler: WSErrorHandler): () => void { this.errorHandlers.add(handler); return () => this.errorHandlers.delete(handler); } - onClose(handler: WSCloseHandler) { + /** + * Unregister error handler + */ + offError(handler: WSErrorHandler): void { + this.errorHandlers.delete(handler); + } + + /** + * Register close handler + */ + onClose(handler: WSCloseHandler): () => void { this.closeHandlers.add(handler); return () => this.closeHandlers.delete(handler); } - send(data: string) { + /** + * Unregister close handler + */ + offClose(handler: WSCloseHandler): void { + this.closeHandlers.delete(handler); + } + + /** + * Register open handler + */ + onOpen(handler: WSOpenHandler): () => void { + this.openHandlers.add(handler); + return () => this.openHandlers.delete(handler); + } + + /** + * Send data through WebSocket + */ + send(data: string): void { if (this.ws?.readyState !== WebSocket.OPEN) { throw new SDKError("WebSocket is not connected", 500, "WS_NOT_CONNECTED"); } this.ws.send(data); } - close() { - this.isManuallyClosed = true; - this.stopHeartbeat(); + /** + * Close WebSocket connection + */ + close(): void { + if (this.isClosed) { + return; + } + this.isClosed = true; this.ws?.close(); } + /** + * Check if WebSocket is connected + */ isConnected(): boolean { - return this.ws?.readyState === WebSocket.OPEN; + return !this.isClosed && this.ws?.readyState === WebSocket.OPEN; } - setAuthToken(token?: string) { + /** + * Update auth token + */ + setAuthToken(token?: string): void { this.authToken = token; } } diff --git a/src/pubsub/client.ts b/src/pubsub/client.ts index 554dd07..912cfc7 100644 --- a/src/pubsub/client.ts +++ b/src/pubsub/client.ts @@ -16,10 +16,8 @@ export interface RawEnvelope { // Cross-platform base64 encoding/decoding utilities function base64Encode(str: string): string { if (typeof Buffer !== "undefined") { - // Node.js environment return Buffer.from(str).toString("base64"); } else if (typeof btoa !== "undefined") { - // Browser/React Native environment return btoa( encodeURIComponent(str).replace(/%([0-9A-F]{2})/g, (match, p1) => String.fromCharCode(parseInt(p1, 16)) @@ -31,10 +29,8 @@ function base64Encode(str: string): string { function base64EncodeBytes(bytes: Uint8Array): string { if (typeof Buffer !== "undefined") { - // Node.js environment return Buffer.from(bytes).toString("base64"); } else if (typeof btoa !== "undefined") { - // Browser/React Native environment let binary = ""; for (let i = 0; i < bytes.length; i++) { binary += String.fromCharCode(bytes[i]); @@ -46,10 +42,8 @@ function base64EncodeBytes(bytes: Uint8Array): string { function base64Decode(b64: string): string { if (typeof Buffer !== "undefined") { - // Node.js environment return Buffer.from(b64, "base64").toString("utf-8"); } else if (typeof atob !== "undefined") { - // Browser/React Native environment const binary = atob(b64); const bytes = new Uint8Array(binary.length); for (let i = 0; i < binary.length; i++) { @@ -63,8 +57,11 @@ function base64Decode(b64: string): string { export type MessageHandler = (message: Message) => void; export type ErrorHandler = (error: Error) => void; export type CloseHandler = () => void; -export type RawMessageHandler = (envelope: RawEnvelope) => void; +/** + * Simple PubSub client - one WebSocket connection per topic + * No connection pooling, no reference counting - keep it simple + */ export class PubSubClient { private httpClient: HttpClient; private wsConfig: Partial; @@ -75,23 +72,18 @@ export class PubSubClient { } /** - * Publish a message to a topic. + * Publish a message to a topic via HTTP */ async publish(topic: string, data: string | Uint8Array): Promise { let dataBase64: string; if (typeof data === "string") { dataBase64 = base64Encode(data); } else { - // Encode bytes directly to preserve binary data dataBase64 = base64EncodeBytes(data); } - console.log("[PubSubClient] Publishing message:", { - topic, - data: typeof data === "string" ? data : `<${data.length} bytes>`, - }); + console.log("[PubSubClient] Publishing to topic:", topic); - // Use longer timeout for pub/sub operations (60s instead of default 30s) await this.httpClient.post( "/v1/pubsub/publish", { @@ -99,13 +91,13 @@ export class PubSubClient { data_base64: dataBase64, }, { - timeout: 60000, // 60 seconds + timeout: 30000, } ); } /** - * List active topics in the current namespace. + * List active topics in the current namespace */ async topics(): Promise { const response = await this.httpClient.get<{ topics: string[] }>( @@ -115,8 +107,8 @@ export class PubSubClient { } /** - * Subscribe to a topic via WebSocket. - * Returns a subscription object with event handlers. + * Subscribe to a topic via WebSocket + * Creates one WebSocket connection per topic */ async subscribe( topic: string, @@ -124,19 +116,24 @@ export class PubSubClient { onMessage?: MessageHandler; onError?: ErrorHandler; onClose?: CloseHandler; - onRaw?: RawMessageHandler; } = {} ): Promise { + // Build WebSocket URL for this topic const wsUrl = new URL(this.wsConfig.wsURL || "ws://localhost:6001"); wsUrl.pathname = "/v1/pubsub/ws"; wsUrl.searchParams.set("topic", topic); + // Create WebSocket client const wsClient = new WSClient({ ...this.wsConfig, wsURL: wsUrl.toString(), authToken: this.httpClient.getToken(), }); + console.log("[PubSubClient] Connecting to topic:", topic); + await wsClient.connect(); + + // Create subscription wrapper const subscription = new Subscription(wsClient, topic); if (handlers.onMessage) { @@ -148,93 +145,61 @@ export class PubSubClient { if (handlers.onClose) { subscription.onClose(handlers.onClose); } - if (handlers.onRaw) { - subscription.onRaw(handlers.onRaw); - } - await wsClient.connect(); return subscription; } } +/** + * Subscription represents an active WebSocket subscription to a topic + */ export class Subscription { private wsClient: WSClient; private topic: string; private messageHandlers: Set = new Set(); private errorHandlers: Set = new Set(); private closeHandlers: Set = new Set(); - private rawHandlers: Set = new Set(); + private isClosed = false; + private wsMessageHandler: ((data: string) => void) | null = null; + private wsErrorHandler: ((error: Error) => void) | null = null; + private wsCloseHandler: (() => void) | null = null; constructor(wsClient: WSClient, topic: string) { this.wsClient = wsClient; this.topic = topic; - this.wsClient.onMessage((data) => { + // Register message handler + this.wsMessageHandler = (data) => { try { // Parse gateway JSON envelope: {data: base64String, timestamp, topic} - let envelope: RawEnvelope; - try { - envelope = JSON.parse(data); + const envelope: RawEnvelope = JSON.parse(data); - // Validate envelope structure - if (!envelope || typeof envelope !== "object") { - throw new Error("Invalid envelope: not an object"); - } - if (!envelope.data || typeof envelope.data !== "string") { - throw new Error("Invalid envelope: missing or invalid data field"); - } - if (!envelope.topic || typeof envelope.topic !== "string") { - throw new Error("Invalid envelope: missing or invalid topic field"); - } - if (typeof envelope.timestamp !== "number") { - throw new Error( - "Invalid envelope: missing or invalid timestamp field" - ); - } - - // Validate topic matches subscription - if (envelope.topic !== this.topic) { - console.warn( - `[Subscription] Topic mismatch: expected ${this.topic}, got ${envelope.topic}` - ); - } - } catch (parseError) { - console.error("[Subscription] Failed to parse envelope:", parseError); - this.errorHandlers.forEach((handler) => - handler( - parseError instanceof Error - ? parseError - : new Error(String(parseError)) - ) - ); - return; + // Validate envelope structure + if (!envelope || typeof envelope !== "object") { + throw new Error("Invalid envelope: not an object"); + } + if (!envelope.data || typeof envelope.data !== "string") { + throw new Error("Invalid envelope: missing or invalid data field"); + } + if (!envelope.topic || typeof envelope.topic !== "string") { + throw new Error("Invalid envelope: missing or invalid topic field"); + } + if (typeof envelope.timestamp !== "number") { + throw new Error( + "Invalid envelope: missing or invalid timestamp field" + ); } - - // Call raw handlers for debugging - this.rawHandlers.forEach((handler) => handler(envelope)); // Decode base64 data - let messageData: string; - try { - messageData = base64Decode(envelope.data); - } catch (decodeError) { - console.error("[Subscription] Base64 decode failed:", decodeError); - this.errorHandlers.forEach((handler) => - handler( - decodeError instanceof Error - ? decodeError - : new Error(String(decodeError)) - ) - ); - return; - } + const messageData = base64Decode(envelope.data); const message: Message = { topic: envelope.topic, data: messageData, timestamp: envelope.timestamp, }; - console.log("[Subscription] Received message:", message); + + console.log("[Subscription] Received message on topic:", this.topic); this.messageHandlers.forEach((handler) => handler(message)); } catch (error) { console.error("[Subscription] Error processing message:", error); @@ -242,42 +207,83 @@ export class Subscription { handler(error instanceof Error ? error : new Error(String(error))) ); } - }); + }; - this.wsClient.onError((error) => { + this.wsClient.onMessage(this.wsMessageHandler); + + // Register error handler + this.wsErrorHandler = (error) => { this.errorHandlers.forEach((handler) => handler(error)); - }); + }; + this.wsClient.onError(this.wsErrorHandler); - this.wsClient.onClose(() => { + // Register close handler + this.wsCloseHandler = () => { this.closeHandlers.forEach((handler) => handler()); - }); + }; + this.wsClient.onClose(this.wsCloseHandler); } - onMessage(handler: MessageHandler) { + /** + * Register message handler + */ + onMessage(handler: MessageHandler): () => void { this.messageHandlers.add(handler); return () => this.messageHandlers.delete(handler); } - onError(handler: ErrorHandler) { + /** + * Register error handler + */ + onError(handler: ErrorHandler): () => void { this.errorHandlers.add(handler); return () => this.errorHandlers.delete(handler); } - onClose(handler: CloseHandler) { + /** + * Register close handler + */ + onClose(handler: CloseHandler): () => void { this.closeHandlers.add(handler); return () => this.closeHandlers.delete(handler); } - onRaw(handler: RawMessageHandler) { - this.rawHandlers.add(handler); - return () => this.rawHandlers.delete(handler); - } + /** + * Close subscription and underlying WebSocket + */ + close(): void { + if (this.isClosed) { + return; + } + this.isClosed = true; - close() { + // Remove handlers from WSClient + if (this.wsMessageHandler) { + this.wsClient.offMessage(this.wsMessageHandler); + this.wsMessageHandler = null; + } + if (this.wsErrorHandler) { + this.wsClient.offError(this.wsErrorHandler); + this.wsErrorHandler = null; + } + if (this.wsCloseHandler) { + this.wsClient.offClose(this.wsCloseHandler); + this.wsCloseHandler = null; + } + + // Clear all local handlers + this.messageHandlers.clear(); + this.errorHandlers.clear(); + this.closeHandlers.clear(); + + // Close WebSocket connection this.wsClient.close(); } + /** + * Check if subscription is active + */ isConnected(): boolean { - return this.wsClient.isConnected(); + return !this.isClosed && this.wsClient.isConnected(); } }