Increase default timeout in HttpClient to 60 seconds for pub/sub operations; simplify WSClient by removing unused configuration options and enhancing connection handling with open and close event handlers; refactor PubSubClient to improve WebSocket subscription management and message handling.

This commit is contained in:
anonpenguin23 2025-10-28 10:25:10 +02:00
parent 2de0cb1983
commit c6dfb0bfed
3 changed files with 178 additions and 159 deletions

View File

@ -19,7 +19,7 @@ export class HttpClient {
constructor(config: HttpClientConfig) {
this.baseURL = config.baseURL.replace(/\/$/, "");
this.timeout = config.timeout ?? 30000;
this.timeout = config.timeout ?? 60000; // Increased from 30s to 60s for pub/sub operations
this.maxRetries = config.maxRetries ?? 3;
this.retryDelayMs = config.retryDelayMs ?? 1000;
this.fetch = config.fetch ?? globalThis.fetch;

View File

@ -4,10 +4,6 @@ import { SDKError } from "../errors";
export interface WSClientConfig {
wsURL: string;
timeout?: number;
maxReconnectAttempts?: number;
reconnectDelayMs?: number;
heartbeatIntervalMs?: number;
authMode?: "header" | "query";
authToken?: string;
WebSocket?: typeof WebSocket;
}
@ -15,44 +11,41 @@ export interface WSClientConfig {
export type WSMessageHandler = (data: string) => void;
export type WSErrorHandler = (error: Error) => void;
export type WSCloseHandler = () => void;
export type WSOpenHandler = () => void;
/**
* Simple WebSocket client with minimal abstractions
* No complex reconnection, no heartbeats - keep it simple
*/
export class WSClient {
private url: string;
private timeout: number;
private maxReconnectAttempts: number;
private reconnectDelayMs: number;
private heartbeatIntervalMs: number;
private authMode: "header" | "query";
private authToken?: string;
private WebSocketClass: typeof WebSocket;
private ws?: WebSocket;
private reconnectAttempts = 0;
private heartbeatInterval?: NodeJS.Timeout;
private messageHandlers: Set<WSMessageHandler> = new Set();
private errorHandlers: Set<WSErrorHandler> = new Set();
private closeHandlers: Set<WSCloseHandler> = new Set();
private isManuallyClosed = false;
private openHandlers: Set<WSOpenHandler> = new Set();
private isClosed = false;
constructor(config: WSClientConfig) {
this.url = config.wsURL;
this.timeout = config.timeout ?? 30000;
this.maxReconnectAttempts = config.maxReconnectAttempts ?? 5;
this.reconnectDelayMs = config.reconnectDelayMs ?? 1000;
this.heartbeatIntervalMs = config.heartbeatIntervalMs ?? 30000;
this.authMode = config.authMode ?? "header";
this.authToken = config.authToken;
this.WebSocketClass = config.WebSocket ?? WebSocket;
}
/**
* Connect to WebSocket server
*/
connect(): Promise<void> {
return new Promise((resolve, reject) => {
try {
const wsUrl = this.buildWSUrl();
this.ws = new this.WebSocketClass(wsUrl);
// Note: Custom headers via ws library in Node.js are not sent with WebSocket upgrade requests
// so we rely on query parameters for authentication
this.isClosed = false;
const timeout = setTimeout(() => {
this.ws?.close();
@ -63,8 +56,8 @@ export class WSClient {
this.ws.addEventListener("open", () => {
clearTimeout(timeout);
this.reconnectAttempts = 0;
this.startHeartbeat();
console.log("[WSClient] Connected to", this.url);
this.openHandlers.forEach((handler) => handler());
resolve();
});
@ -82,12 +75,8 @@ export class WSClient {
this.ws.addEventListener("close", () => {
clearTimeout(timeout);
this.stopHeartbeat();
if (!this.isManuallyClosed) {
this.attemptReconnect();
} else {
this.closeHandlers.forEach((handler) => handler());
}
console.log("[WSClient] Connection closed");
this.closeHandlers.forEach((handler) => handler());
});
} catch (error) {
reject(error);
@ -95,11 +84,12 @@ export class WSClient {
});
}
/**
* Build WebSocket URL with auth token
*/
private buildWSUrl(): string {
let url = this.url;
// Always append auth token as query parameter for compatibility
// Works in both Node.js and browser environments
if (this.authToken) {
const separator = url.includes("?") ? "&" : "?";
const paramName = this.authToken.startsWith("ak_") ? "api_key" : "token";
@ -109,68 +99,91 @@ export class WSClient {
return url;
}
private startHeartbeat() {
this.heartbeatInterval = setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: "ping" }));
}
}, this.heartbeatIntervalMs);
}
private stopHeartbeat() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = undefined;
}
}
private attemptReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delayMs = this.reconnectDelayMs * this.reconnectAttempts;
setTimeout(() => {
this.connect().catch((error) => {
this.errorHandlers.forEach((handler) => handler(error));
});
}, delayMs);
} else {
this.closeHandlers.forEach((handler) => handler());
}
}
onMessage(handler: WSMessageHandler) {
/**
* Register message handler
*/
onMessage(handler: WSMessageHandler): () => void {
this.messageHandlers.add(handler);
return () => this.messageHandlers.delete(handler);
}
onError(handler: WSErrorHandler) {
/**
* Unregister message handler
*/
offMessage(handler: WSMessageHandler): void {
this.messageHandlers.delete(handler);
}
/**
* Register error handler
*/
onError(handler: WSErrorHandler): () => void {
this.errorHandlers.add(handler);
return () => this.errorHandlers.delete(handler);
}
onClose(handler: WSCloseHandler) {
/**
* Unregister error handler
*/
offError(handler: WSErrorHandler): void {
this.errorHandlers.delete(handler);
}
/**
* Register close handler
*/
onClose(handler: WSCloseHandler): () => void {
this.closeHandlers.add(handler);
return () => this.closeHandlers.delete(handler);
}
send(data: string) {
/**
* Unregister close handler
*/
offClose(handler: WSCloseHandler): void {
this.closeHandlers.delete(handler);
}
/**
* Register open handler
*/
onOpen(handler: WSOpenHandler): () => void {
this.openHandlers.add(handler);
return () => this.openHandlers.delete(handler);
}
/**
* Send data through WebSocket
*/
send(data: string): void {
if (this.ws?.readyState !== WebSocket.OPEN) {
throw new SDKError("WebSocket is not connected", 500, "WS_NOT_CONNECTED");
}
this.ws.send(data);
}
close() {
this.isManuallyClosed = true;
this.stopHeartbeat();
/**
* Close WebSocket connection
*/
close(): void {
if (this.isClosed) {
return;
}
this.isClosed = true;
this.ws?.close();
}
/**
* Check if WebSocket is connected
*/
isConnected(): boolean {
return this.ws?.readyState === WebSocket.OPEN;
return !this.isClosed && this.ws?.readyState === WebSocket.OPEN;
}
setAuthToken(token?: string) {
/**
* Update auth token
*/
setAuthToken(token?: string): void {
this.authToken = token;
}
}

View File

@ -16,10 +16,8 @@ export interface RawEnvelope {
// Cross-platform base64 encoding/decoding utilities
function base64Encode(str: string): string {
if (typeof Buffer !== "undefined") {
// Node.js environment
return Buffer.from(str).toString("base64");
} else if (typeof btoa !== "undefined") {
// Browser/React Native environment
return btoa(
encodeURIComponent(str).replace(/%([0-9A-F]{2})/g, (match, p1) =>
String.fromCharCode(parseInt(p1, 16))
@ -31,10 +29,8 @@ function base64Encode(str: string): string {
function base64EncodeBytes(bytes: Uint8Array): string {
if (typeof Buffer !== "undefined") {
// Node.js environment
return Buffer.from(bytes).toString("base64");
} else if (typeof btoa !== "undefined") {
// Browser/React Native environment
let binary = "";
for (let i = 0; i < bytes.length; i++) {
binary += String.fromCharCode(bytes[i]);
@ -46,10 +42,8 @@ function base64EncodeBytes(bytes: Uint8Array): string {
function base64Decode(b64: string): string {
if (typeof Buffer !== "undefined") {
// Node.js environment
return Buffer.from(b64, "base64").toString("utf-8");
} else if (typeof atob !== "undefined") {
// Browser/React Native environment
const binary = atob(b64);
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < binary.length; i++) {
@ -63,8 +57,11 @@ function base64Decode(b64: string): string {
export type MessageHandler = (message: Message) => void;
export type ErrorHandler = (error: Error) => void;
export type CloseHandler = () => void;
export type RawMessageHandler = (envelope: RawEnvelope) => void;
/**
* Simple PubSub client - one WebSocket connection per topic
* No connection pooling, no reference counting - keep it simple
*/
export class PubSubClient {
private httpClient: HttpClient;
private wsConfig: Partial<WSClientConfig>;
@ -75,23 +72,18 @@ export class PubSubClient {
}
/**
* Publish a message to a topic.
* Publish a message to a topic via HTTP
*/
async publish(topic: string, data: string | Uint8Array): Promise<void> {
let dataBase64: string;
if (typeof data === "string") {
dataBase64 = base64Encode(data);
} else {
// Encode bytes directly to preserve binary data
dataBase64 = base64EncodeBytes(data);
}
console.log("[PubSubClient] Publishing message:", {
topic,
data: typeof data === "string" ? data : `<${data.length} bytes>`,
});
console.log("[PubSubClient] Publishing to topic:", topic);
// Use longer timeout for pub/sub operations (60s instead of default 30s)
await this.httpClient.post(
"/v1/pubsub/publish",
{
@ -99,13 +91,13 @@ export class PubSubClient {
data_base64: dataBase64,
},
{
timeout: 60000, // 60 seconds
timeout: 30000,
}
);
}
/**
* List active topics in the current namespace.
* List active topics in the current namespace
*/
async topics(): Promise<string[]> {
const response = await this.httpClient.get<{ topics: string[] }>(
@ -115,8 +107,8 @@ export class PubSubClient {
}
/**
* Subscribe to a topic via WebSocket.
* Returns a subscription object with event handlers.
* Subscribe to a topic via WebSocket
* Creates one WebSocket connection per topic
*/
async subscribe(
topic: string,
@ -124,19 +116,24 @@ export class PubSubClient {
onMessage?: MessageHandler;
onError?: ErrorHandler;
onClose?: CloseHandler;
onRaw?: RawMessageHandler;
} = {}
): Promise<Subscription> {
// Build WebSocket URL for this topic
const wsUrl = new URL(this.wsConfig.wsURL || "ws://localhost:6001");
wsUrl.pathname = "/v1/pubsub/ws";
wsUrl.searchParams.set("topic", topic);
// Create WebSocket client
const wsClient = new WSClient({
...this.wsConfig,
wsURL: wsUrl.toString(),
authToken: this.httpClient.getToken(),
});
console.log("[PubSubClient] Connecting to topic:", topic);
await wsClient.connect();
// Create subscription wrapper
const subscription = new Subscription(wsClient, topic);
if (handlers.onMessage) {
@ -148,93 +145,61 @@ export class PubSubClient {
if (handlers.onClose) {
subscription.onClose(handlers.onClose);
}
if (handlers.onRaw) {
subscription.onRaw(handlers.onRaw);
}
await wsClient.connect();
return subscription;
}
}
/**
* Subscription represents an active WebSocket subscription to a topic
*/
export class Subscription {
private wsClient: WSClient;
private topic: string;
private messageHandlers: Set<MessageHandler> = new Set();
private errorHandlers: Set<ErrorHandler> = new Set();
private closeHandlers: Set<CloseHandler> = new Set();
private rawHandlers: Set<RawMessageHandler> = new Set();
private isClosed = false;
private wsMessageHandler: ((data: string) => void) | null = null;
private wsErrorHandler: ((error: Error) => void) | null = null;
private wsCloseHandler: (() => void) | null = null;
constructor(wsClient: WSClient, topic: string) {
this.wsClient = wsClient;
this.topic = topic;
this.wsClient.onMessage((data) => {
// Register message handler
this.wsMessageHandler = (data) => {
try {
// Parse gateway JSON envelope: {data: base64String, timestamp, topic}
let envelope: RawEnvelope;
try {
envelope = JSON.parse(data);
const envelope: RawEnvelope = JSON.parse(data);
// Validate envelope structure
if (!envelope || typeof envelope !== "object") {
throw new Error("Invalid envelope: not an object");
}
if (!envelope.data || typeof envelope.data !== "string") {
throw new Error("Invalid envelope: missing or invalid data field");
}
if (!envelope.topic || typeof envelope.topic !== "string") {
throw new Error("Invalid envelope: missing or invalid topic field");
}
if (typeof envelope.timestamp !== "number") {
throw new Error(
"Invalid envelope: missing or invalid timestamp field"
);
}
// Validate topic matches subscription
if (envelope.topic !== this.topic) {
console.warn(
`[Subscription] Topic mismatch: expected ${this.topic}, got ${envelope.topic}`
);
}
} catch (parseError) {
console.error("[Subscription] Failed to parse envelope:", parseError);
this.errorHandlers.forEach((handler) =>
handler(
parseError instanceof Error
? parseError
: new Error(String(parseError))
)
);
return;
// Validate envelope structure
if (!envelope || typeof envelope !== "object") {
throw new Error("Invalid envelope: not an object");
}
if (!envelope.data || typeof envelope.data !== "string") {
throw new Error("Invalid envelope: missing or invalid data field");
}
if (!envelope.topic || typeof envelope.topic !== "string") {
throw new Error("Invalid envelope: missing or invalid topic field");
}
if (typeof envelope.timestamp !== "number") {
throw new Error(
"Invalid envelope: missing or invalid timestamp field"
);
}
// Call raw handlers for debugging
this.rawHandlers.forEach((handler) => handler(envelope));
// Decode base64 data
let messageData: string;
try {
messageData = base64Decode(envelope.data);
} catch (decodeError) {
console.error("[Subscription] Base64 decode failed:", decodeError);
this.errorHandlers.forEach((handler) =>
handler(
decodeError instanceof Error
? decodeError
: new Error(String(decodeError))
)
);
return;
}
const messageData = base64Decode(envelope.data);
const message: Message = {
topic: envelope.topic,
data: messageData,
timestamp: envelope.timestamp,
};
console.log("[Subscription] Received message:", message);
console.log("[Subscription] Received message on topic:", this.topic);
this.messageHandlers.forEach((handler) => handler(message));
} catch (error) {
console.error("[Subscription] Error processing message:", error);
@ -242,42 +207,83 @@ export class Subscription {
handler(error instanceof Error ? error : new Error(String(error)))
);
}
});
};
this.wsClient.onError((error) => {
this.wsClient.onMessage(this.wsMessageHandler);
// Register error handler
this.wsErrorHandler = (error) => {
this.errorHandlers.forEach((handler) => handler(error));
});
};
this.wsClient.onError(this.wsErrorHandler);
this.wsClient.onClose(() => {
// Register close handler
this.wsCloseHandler = () => {
this.closeHandlers.forEach((handler) => handler());
});
};
this.wsClient.onClose(this.wsCloseHandler);
}
onMessage(handler: MessageHandler) {
/**
* Register message handler
*/
onMessage(handler: MessageHandler): () => void {
this.messageHandlers.add(handler);
return () => this.messageHandlers.delete(handler);
}
onError(handler: ErrorHandler) {
/**
* Register error handler
*/
onError(handler: ErrorHandler): () => void {
this.errorHandlers.add(handler);
return () => this.errorHandlers.delete(handler);
}
onClose(handler: CloseHandler) {
/**
* Register close handler
*/
onClose(handler: CloseHandler): () => void {
this.closeHandlers.add(handler);
return () => this.closeHandlers.delete(handler);
}
onRaw(handler: RawMessageHandler) {
this.rawHandlers.add(handler);
return () => this.rawHandlers.delete(handler);
}
/**
* Close subscription and underlying WebSocket
*/
close(): void {
if (this.isClosed) {
return;
}
this.isClosed = true;
close() {
// Remove handlers from WSClient
if (this.wsMessageHandler) {
this.wsClient.offMessage(this.wsMessageHandler);
this.wsMessageHandler = null;
}
if (this.wsErrorHandler) {
this.wsClient.offError(this.wsErrorHandler);
this.wsErrorHandler = null;
}
if (this.wsCloseHandler) {
this.wsClient.offClose(this.wsCloseHandler);
this.wsCloseHandler = null;
}
// Clear all local handlers
this.messageHandlers.clear();
this.errorHandlers.clear();
this.closeHandlers.clear();
// Close WebSocket connection
this.wsClient.close();
}
/**
* Check if subscription is active
*/
isConnected(): boolean {
return this.wsClient.isConnected();
return !this.isClosed && this.wsClient.isConnected();
}
}