Enhance README with detailed Pub/Sub messaging features, including multi-subscriber support and message interface; improve HttpClient with per-request timeout handling; update PubSubClient to support raw message handling and validate message envelopes.

This commit is contained in:
anonpenguin23 2025-10-27 17:30:49 +02:00
parent eab542952e
commit 2de0cb1983
3 changed files with 177 additions and 29 deletions

View File

@ -122,10 +122,23 @@ const results = await client.db.transaction([
### Pub/Sub Messaging ### Pub/Sub Messaging
The SDK provides a robust pub/sub client with:
- **Multi-subscriber support**: Multiple connections can subscribe to the same topic
- **Namespace isolation**: Topics are scoped to your authenticated namespace
- **Server timestamps**: Messages preserve server-side timestamps
- **Binary-safe**: Supports both string and binary (`Uint8Array`) payloads
- **Strict envelope validation**: Type-safe message parsing with error handling
#### Publish a Message #### Publish a Message
```typescript ```typescript
// Publish a string message
await client.pubsub.publish("notifications", "Hello, Network!"); await client.pubsub.publish("notifications", "Hello, Network!");
// Publish binary data
const binaryData = new Uint8Array([1, 2, 3, 4]);
await client.pubsub.publish("binary-topic", binaryData);
``` ```
#### Subscribe to Topics #### Subscribe to Topics
@ -133,7 +146,9 @@ await client.pubsub.publish("notifications", "Hello, Network!");
```typescript ```typescript
const subscription = await client.pubsub.subscribe("notifications", { const subscription = await client.pubsub.subscribe("notifications", {
onMessage: (msg) => { onMessage: (msg) => {
console.log("Received:", msg.data); console.log("Topic:", msg.topic);
console.log("Data:", msg.data);
console.log("Server timestamp:", new Date(msg.timestamp));
}, },
onError: (err) => { onError: (err) => {
console.error("Subscription error:", err); console.error("Subscription error:", err);
@ -147,6 +162,52 @@ const subscription = await client.pubsub.subscribe("notifications", {
subscription.close(); subscription.close();
``` ```
**Message Interface:**
```typescript
interface Message {
data: string; // Decoded message payload (string)
topic: string; // Topic name
timestamp: number; // Server timestamp in milliseconds
}
```
#### Debug Raw Envelopes
For debugging, you can inspect raw message envelopes before decoding:
```typescript
const subscription = await client.pubsub.subscribe("notifications", {
onMessage: (msg) => {
console.log("Decoded message:", msg.data);
},
onRaw: (envelope) => {
console.log("Raw envelope:", envelope);
// { data: "base64...", timestamp: 1234567890, topic: "notifications" }
},
});
```
#### Multi-Subscriber Support
Multiple subscriptions to the same topic are supported. Each receives its own copy of messages:
```typescript
// First subscriber
const sub1 = await client.pubsub.subscribe("events", {
onMessage: (msg) => console.log("Sub1:", msg.data),
});
// Second subscriber (both receive messages)
const sub2 = await client.pubsub.subscribe("events", {
onMessage: (msg) => console.log("Sub2:", msg.data),
});
// Unsubscribe independently
sub1.close(); // sub2 still active
sub2.close(); // fully unsubscribed
```
#### List Topics #### List Topics
```typescript ```typescript

View File

@ -56,6 +56,7 @@ export class HttpClient {
body?: any; body?: any;
headers?: Record<string, string>; headers?: Record<string, string>;
query?: Record<string, string | number | boolean>; query?: Record<string, string | number | boolean>;
timeout?: number; // Per-request timeout override
} = {} } = {}
): Promise<T> { ): Promise<T> {
const url = new URL(this.baseURL + path); const url = new URL(this.baseURL + path);
@ -71,17 +72,25 @@ export class HttpClient {
...options.headers, ...options.headers,
}; };
const controller = new AbortController();
const requestTimeout = options.timeout ?? this.timeout; // Use override or default
const timeoutId = setTimeout(() => controller.abort(), requestTimeout);
const fetchOptions: RequestInit = { const fetchOptions: RequestInit = {
method, method,
headers, headers,
signal: AbortSignal.timeout(this.timeout), signal: controller.signal,
}; };
if (options.body !== undefined) { if (options.body !== undefined) {
fetchOptions.body = JSON.stringify(options.body); fetchOptions.body = JSON.stringify(options.body);
} }
return this.requestWithRetry(url.toString(), fetchOptions); try {
return await this.requestWithRetry(url.toString(), fetchOptions);
} finally {
clearTimeout(timeoutId);
}
} }
private async requestWithRetry( private async requestWithRetry(

View File

@ -4,7 +4,13 @@ import { WSClient, WSClientConfig } from "../core/ws";
export interface Message { export interface Message {
data: string; data: string;
topic: string; topic: string;
timestamp?: number; timestamp: number;
}
export interface RawEnvelope {
data: string; // base64-encoded
timestamp: number;
topic: string;
} }
// Cross-platform base64 encoding/decoding utilities // Cross-platform base64 encoding/decoding utilities
@ -23,6 +29,21 @@ function base64Encode(str: string): string {
throw new Error("No base64 encoding method available"); throw new Error("No base64 encoding method available");
} }
function base64EncodeBytes(bytes: Uint8Array): string {
if (typeof Buffer !== "undefined") {
// Node.js environment
return Buffer.from(bytes).toString("base64");
} else if (typeof btoa !== "undefined") {
// Browser/React Native environment
let binary = "";
for (let i = 0; i < bytes.length; i++) {
binary += String.fromCharCode(bytes[i]);
}
return btoa(binary);
}
throw new Error("No base64 encoding method available");
}
function base64Decode(b64: string): string { function base64Decode(b64: string): string {
if (typeof Buffer !== "undefined") { if (typeof Buffer !== "undefined") {
// Node.js environment // Node.js environment
@ -42,6 +63,7 @@ function base64Decode(b64: string): string {
export type MessageHandler = (message: Message) => void; export type MessageHandler = (message: Message) => void;
export type ErrorHandler = (error: Error) => void; export type ErrorHandler = (error: Error) => void;
export type CloseHandler = () => void; export type CloseHandler = () => void;
export type RawMessageHandler = (envelope: RawEnvelope) => void;
export class PubSubClient { export class PubSubClient {
private httpClient: HttpClient; private httpClient: HttpClient;
@ -60,16 +82,26 @@ export class PubSubClient {
if (typeof data === "string") { if (typeof data === "string") {
dataBase64 = base64Encode(data); dataBase64 = base64Encode(data);
} else { } else {
// Convert Uint8Array to string first // Encode bytes directly to preserve binary data
const decoder = new TextDecoder(); dataBase64 = base64EncodeBytes(data);
const str = decoder.decode(data);
dataBase64 = base64Encode(str);
} }
await this.httpClient.post("/v1/pubsub/publish", { console.log("[PubSubClient] Publishing message:", {
topic, topic,
data_base64: dataBase64, data: typeof data === "string" ? data : `<${data.length} bytes>`,
}); });
// Use longer timeout for pub/sub operations (60s instead of default 30s)
await this.httpClient.post(
"/v1/pubsub/publish",
{
topic,
data_base64: dataBase64,
},
{
timeout: 60000, // 60 seconds
}
);
} }
/** /**
@ -92,6 +124,7 @@ export class PubSubClient {
onMessage?: MessageHandler; onMessage?: MessageHandler;
onError?: ErrorHandler; onError?: ErrorHandler;
onClose?: CloseHandler; onClose?: CloseHandler;
onRaw?: RawMessageHandler;
} = {} } = {}
): Promise<Subscription> { ): Promise<Subscription> {
const wsUrl = new URL(this.wsConfig.wsURL || "ws://localhost:6001"); const wsUrl = new URL(this.wsConfig.wsURL || "ws://localhost:6001");
@ -115,6 +148,9 @@ export class PubSubClient {
if (handlers.onClose) { if (handlers.onClose) {
subscription.onClose(handlers.onClose); subscription.onClose(handlers.onClose);
} }
if (handlers.onRaw) {
subscription.onRaw(handlers.onRaw);
}
await wsClient.connect(); await wsClient.connect();
return subscription; return subscription;
@ -127,6 +163,7 @@ export class Subscription {
private messageHandlers: Set<MessageHandler> = new Set(); private messageHandlers: Set<MessageHandler> = new Set();
private errorHandlers: Set<ErrorHandler> = new Set(); private errorHandlers: Set<ErrorHandler> = new Set();
private closeHandlers: Set<CloseHandler> = new Set(); private closeHandlers: Set<CloseHandler> = new Set();
private rawHandlers: Set<RawMessageHandler> = new Set();
constructor(wsClient: WSClient, topic: string) { constructor(wsClient: WSClient, topic: string) {
this.wsClient = wsClient; this.wsClient = wsClient;
@ -134,34 +171,70 @@ export class Subscription {
this.wsClient.onMessage((data) => { this.wsClient.onMessage((data) => {
try { try {
let messageData = data;
// Parse gateway JSON envelope: {data: base64String, timestamp, topic} // Parse gateway JSON envelope: {data: base64String, timestamp, topic}
let envelope: RawEnvelope;
try { try {
const envelope = JSON.parse(data); envelope = JSON.parse(data);
if (envelope.data && typeof envelope.data === "string") {
// The gateway sends base64-encoded data in the 'data' field // Validate envelope structure
// Decode it back to the original string if (!envelope || typeof envelope !== "object") {
try { throw new Error("Invalid envelope: not an object");
messageData = base64Decode(envelope.data); }
} catch (decodeError) { if (!envelope.data || typeof envelope.data !== "string") {
console.error( throw new Error("Invalid envelope: missing or invalid data field");
"[Subscription] Base64 decode failed:", }
decodeError if (!envelope.topic || typeof envelope.topic !== "string") {
); throw new Error("Invalid envelope: missing or invalid topic field");
// If base64 decode fails, use the envelope data as-is }
messageData = envelope.data; if (typeof envelope.timestamp !== "number") {
} throw new Error(
"Invalid envelope: missing or invalid timestamp field"
);
}
// Validate topic matches subscription
if (envelope.topic !== this.topic) {
console.warn(
`[Subscription] Topic mismatch: expected ${this.topic}, got ${envelope.topic}`
);
} }
} catch (parseError) { } catch (parseError) {
// If not JSON, use the data as-is console.error("[Subscription] Failed to parse envelope:", parseError);
this.errorHandlers.forEach((handler) =>
handler(
parseError instanceof Error
? parseError
: new Error(String(parseError))
)
);
return;
}
// Call raw handlers for debugging
this.rawHandlers.forEach((handler) => handler(envelope));
// Decode base64 data
let messageData: string;
try {
messageData = base64Decode(envelope.data);
} catch (decodeError) {
console.error("[Subscription] Base64 decode failed:", decodeError);
this.errorHandlers.forEach((handler) =>
handler(
decodeError instanceof Error
? decodeError
: new Error(String(decodeError))
)
);
return;
} }
const message: Message = { const message: Message = {
topic: this.topic, topic: envelope.topic,
data: messageData, data: messageData,
timestamp: Date.now(), timestamp: envelope.timestamp,
}; };
console.log("[Subscription] Received message:", message);
this.messageHandlers.forEach((handler) => handler(message)); this.messageHandlers.forEach((handler) => handler(message));
} catch (error) { } catch (error) {
console.error("[Subscription] Error processing message:", error); console.error("[Subscription] Error processing message:", error);
@ -195,6 +268,11 @@ export class Subscription {
return () => this.closeHandlers.delete(handler); return () => this.closeHandlers.delete(handler);
} }
onRaw(handler: RawMessageHandler) {
this.rawHandlers.add(handler);
return () => this.rawHandlers.delete(handler);
}
close() { close() {
this.wsClient.close(); this.wsClient.close();
} }