Enhance WebSocket error handling and add base64 encoding/decoding utilities in PubSubClient; refactor message publishing logic to support base64 data

This commit is contained in:
anonpenguin23 2025-10-27 09:01:14 +02:00
parent acf9540daa
commit eab542952e
2 changed files with 74 additions and 17 deletions

View File

@ -56,7 +56,9 @@ export class WSClient {
const timeout = setTimeout(() => {
this.ws?.close();
reject(new SDKError("WebSocket connection timeout", 408, "WS_TIMEOUT"));
reject(
new SDKError("WebSocket connection timeout", 408, "WS_TIMEOUT")
);
}, this.timeout);
this.ws.addEventListener("open", () => {
@ -72,13 +74,9 @@ export class WSClient {
});
this.ws.addEventListener("error", (event: Event) => {
console.error("[WSClient] WebSocket error:", event);
clearTimeout(timeout);
const error = new SDKError(
"WebSocket error",
500,
"WS_ERROR",
event
);
const error = new SDKError("WebSocket error", 500, "WS_ERROR", event);
this.errorHandlers.forEach((handler) => handler(error));
});
@ -99,7 +97,7 @@ export class WSClient {
private buildWSUrl(): string {
let url = this.url;
// Always append auth token as query parameter for compatibility
// Works in both Node.js and browser environments
if (this.authToken) {
@ -107,7 +105,7 @@ export class WSClient {
const paramName = this.authToken.startsWith("ak_") ? "api_key" : "token";
url += `${separator}${paramName}=${encodeURIComponent(this.authToken)}`;
}
return url;
}
@ -157,11 +155,7 @@ export class WSClient {
send(data: string) {
if (this.ws?.readyState !== WebSocket.OPEN) {
throw new SDKError(
"WebSocket is not connected",
500,
"WS_NOT_CONNECTED"
);
throw new SDKError("WebSocket is not connected", 500, "WS_NOT_CONNECTED");
}
this.ws.send(data);
}

View File

@ -7,6 +7,38 @@ export interface Message {
timestamp?: number;
}
// Cross-platform base64 encoding/decoding utilities
function base64Encode(str: string): string {
if (typeof Buffer !== "undefined") {
// Node.js environment
return Buffer.from(str).toString("base64");
} else if (typeof btoa !== "undefined") {
// Browser/React Native environment
return btoa(
encodeURIComponent(str).replace(/%([0-9A-F]{2})/g, (match, p1) =>
String.fromCharCode(parseInt(p1, 16))
)
);
}
throw new Error("No base64 encoding method available");
}
function base64Decode(b64: string): string {
if (typeof Buffer !== "undefined") {
// Node.js environment
return Buffer.from(b64, "base64").toString("utf-8");
} else if (typeof atob !== "undefined") {
// Browser/React Native environment
const binary = atob(b64);
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < binary.length; i++) {
bytes[i] = binary.charCodeAt(i);
}
return new TextDecoder().decode(bytes);
}
throw new Error("No base64 decoding method available");
}
export type MessageHandler = (message: Message) => void;
export type ErrorHandler = (error: Error) => void;
export type CloseHandler = () => void;
@ -24,8 +56,15 @@ export class PubSubClient {
* Publish a message to a topic.
*/
async publish(topic: string, data: string | Uint8Array): Promise<void> {
const dataBase64 =
typeof data === "string" ? Buffer.from(data).toString("base64") : Buffer.from(data).toString("base64");
let dataBase64: string;
if (typeof data === "string") {
dataBase64 = base64Encode(data);
} else {
// Convert Uint8Array to string first
const decoder = new TextDecoder();
const str = decoder.decode(data);
dataBase64 = base64Encode(str);
}
await this.httpClient.post("/v1/pubsub/publish", {
topic,
@ -95,13 +134,37 @@ export class Subscription {
this.wsClient.onMessage((data) => {
try {
let messageData = data;
// Parse gateway JSON envelope: {data: base64String, timestamp, topic}
try {
const envelope = JSON.parse(data);
if (envelope.data && typeof envelope.data === "string") {
// The gateway sends base64-encoded data in the 'data' field
// Decode it back to the original string
try {
messageData = base64Decode(envelope.data);
} catch (decodeError) {
console.error(
"[Subscription] Base64 decode failed:",
decodeError
);
// If base64 decode fails, use the envelope data as-is
messageData = envelope.data;
}
}
} catch (parseError) {
// If not JSON, use the data as-is
}
const message: Message = {
topic: this.topic,
data: data,
data: messageData,
timestamp: Date.now(),
};
this.messageHandlers.forEach((handler) => handler(message));
} catch (error) {
console.error("[Subscription] Error processing message:", error);
this.errorHandlers.forEach((handler) =>
handler(error instanceof Error ? error : new Error(String(error)))
);