Skip to content

Instantly share code, notes, and snippets.

@erdesigns-eu
Last active January 3, 2025 09:06
Show Gist options
  • Save erdesigns-eu/62518bb66cd4fb4c99e2e11bb5c363a0 to your computer and use it in GitHub Desktop.
Save erdesigns-eu/62518bb66cd4fb4c99e2e11bb5c363a0 to your computer and use it in GitHub Desktop.
DDP Client in Typescript
// DDPClient.ts
import { EventEmitter } from 'events';
/**
* Enum representing the different types of DDP messages.
* @property CONNECT - Client initiates a connection.
* @property CONNECTED - Server acknowledges a successful connection.
* @property FAILED - Server indicates a failed connection attempt.
* @property SUBSCRIBE - Client subscribes to a publication.
* @property READY - Server indicates that a subscription is ready.
* @property UNSUBSCRIBE - Client unsubscribes from a publication.
* @property METHOD - Client invokes a Meteor method.
* @property RESULT - Server responds with the result of a method call.
* @property ADD - Server indicates a new document has been added to a collection.
* @property CHANGE - Server indicates a document in a collection has been changed.
* @property REMOVE - Server indicates a document has been removed from a collection.
* @property ERROR - Server indicates an error occurred.
*/
enum DDPMessageType {
CONNECT = 'connect',
CONNECTED = 'connected',
FAILED = 'failed',
SUBSCRIBE = 'sub',
READY = 'ready',
UNSUBSCRIBE = 'unsub',
METHOD = 'method',
RESULT = 'result',
ADD = 'added',
CHANGE = 'changed',
REMOVE = 'removed',
ERROR = 'error',
}
/**
* Message sent by the client to initiate a connection.
* @property msg - Message type (CONNECT).
* @property version - DDP protocol version.
* @property support - Supported DDP protocol versions.
*/
interface DDPConnectMessage {
msg: DDPMessageType.CONNECT;
version: string;
support: string[];
}
/**
* Message received from the server upon a successful connection.
* @property msg - Message type (CONNECTED).
* @property session - Session ID assigned by the server.
*/
interface DDPConnectedMessage {
msg: DDPMessageType.CONNECTED;
session: string;
}
/**
* Message received from the server when a connection attempt fails.
* @property msg - Message type (FAILED).
* @property version - DDP protocol version.
* @property support - Supported DDP protocol versions.
*/
interface DDPFailedMessage {
msg: DDPMessageType.FAILED;
version: string;
support: string[];
}
/**
* Message sent by the client to subscribe to a publication.
* @property msg - Message type (SUBSCRIBE).
* @property id - Subscription ID.
* @property name - Name of the publication to subscribe to.
* @property params - Parameters to pass to the publication.
*/
interface DDPSubscribeMessage {
msg: DDPMessageType.SUBSCRIBE;
id: string;
name: string;
params: any[];
}
/**
* Message received from the server indicating that a subscription is ready.
* @property msg - Message type (READY).
* @property subs - Subscription IDs that are ready.
* @property id - ID of the subscription that is ready.
*/
interface DDPReadyMessage {
msg: DDPMessageType.READY;
subs: string[];
id: string;
}
/**
* Message sent by the client to unsubscribe from a publication.
* @property msg - Message type (UNSUBSCRIBE).
* @property id - Subscription ID to unsubscribe from.
*/
interface DDPUnsubscribeMessage {
msg: DDPMessageType.UNSUBSCRIBE;
id: string;
}
/**
* Message sent by the client to invoke a Meteor method.
* @property msg - Message type (METHOD).
* @property method - Name of the method to call.
* @property params - Parameters to pass to the method.
* @property id - Method call ID.
*/
interface DDPMethodMessage {
msg: DDPMessageType.METHOD;
method: string;
params: any[];
id: string;
}
/**
* Message received from the server with the result of a method call.
* @property msg - Message type (RESULT).
* @property id - Method call ID.
* @property result - Result returned by the method call.
* @property error - Error returned by the method call.
*/
interface DDPResultMessage {
msg: DDPMessageType.RESULT;
id: string;
result?: any;
error?: DDPError;
}
/**
* Message received from the server indicating a new document has been added to a collection.
* @property msg - Message type (ADDED).
* @property collection - Name of the collection.
* @property id - ID of the document added.
* @property fields - Fields of the document added.
*/
interface DDPAddedMessage {
msg: DDPMessageType.ADD;
collection: string;
id: string;
fields: { [key: string]: any };
}
/**
* Message received from the server indicating a document in a collection has been changed.
* @property msg - Message type (CHANGED).
* @property collection - Name of the collection.
* @property id - ID of the document changed.
* @property fields - Fields of the document changed.
* @property cleared - Fields that have been removed from the document.
*/
interface DDPChangedMessage {
msg: DDPMessageType.CHANGE;
collection: string;
id: string;
fields?: { [key: string]: any };
cleared?: string[];
}
/**
* Message received from the server indicating a document has been removed from a collection.
* @property msg - Message type (REMOVED).
* @property collection - Name of the collection.
* @property id - ID of the document removed.
*/
interface DDPRemovedMessage {
msg: DDPMessageType.REMOVE;
collection: string;
id: string;
}
/**
* Interface representing a DDP-level error.
* @property error - Error type.
* @property reason - Human-readable error message.
* @property details - Additional error details.
*/
interface DDPError {
error: string;
reason?: string;
details?: string;
}
/**
* Union type encompassing all possible DDP messages.
*/
type DDPMessage =
| DDPConnectMessage
| DDPConnectedMessage
| DDPFailedMessage
| DDPSubscribeMessage
| DDPReadyMessage
| DDPUnsubscribeMessage
| DDPMethodMessage
| DDPResultMessage
| DDPAddedMessage
| DDPChangedMessage
| DDPRemovedMessage;
/**
* Configuration options for the DDPClient.
* @property endpoint - WebSocket endpoint for the Meteor server's DDP connection.
* @property autoReconnect - Automatically attempt to reconnect if the connection is lost.
* @property autoReconnectTimer - Time in milliseconds to wait before attempting to reconnect.
* @property maintainCollections - Maintain collections locally.
* @property ddpVersion - DDP protocol version to use.
* @property headers - Additional headers to send with the WebSocket connection.
* @property WebSocketFactory - Custom WebSocket constructor to use instead of the default WebSocket.
*/
interface DDPClientOptions {
endpoint: string;
autoReconnect?: boolean;
autoReconnectTimer?: number;
maintainCollections?: boolean;
ddpVersion?: string;
headers?: { [key: string]: string };
WebSocketFactory?: new (url: string, protocols?: string | string[]) => WebSocket;
}
/**
* Type definition for method call callbacks.
* @param error DDPError if the method call failed.
* @param result Result returned by the method call.
*/
type MethodCallback = (error: DDPError | null, result?: any) => void;
/**
* Type definition for subscription callbacks.
* @param ready Indicates whether the subscription is ready.
*/
type SubscriptionCallback = (ready: boolean) => void;
/**
* DDPClient is a TypeScript class that manages a DDP (Distributed Data Protocol) connection to a Meteor server.
* It handles connection management, subscriptions, method calls, and real-time data updates.
*/
class DDPClient extends EventEmitter {
/** WebSocket instance for DDP communication */
private socket: WebSocket | null = null;
/** Configuration options for the DDP client */
private options: DDPClientOptions;
/** Indicates whether the client is currently connected */
private connected: boolean = false;
/** Session ID assigned by the Meteor server upon successful connection */
private session: string = '';
/** Map of active subscriptions with their corresponding callbacks */
private subscriptions: Map<string, SubscriptionCallback> = new Map();
/** Map of active method calls with their corresponding callbacks */
private methods: Map<string, MethodCallback> = new Map();
/** Timer for scheduling reconnection attempts */
private reconnectTimer: NodeJS.Timeout | null = null;
/**
* Constructs a new DDPClient instance with the provided configuration options.
* @param options Configuration options for the DDP client.
*/
constructor(options: DDPClientOptions) {
// Call the parent class constructor
super();
// Merge user-provided options with default values
this.options = {
autoReconnect: true,
autoReconnectTimer: 5000,
maintainCollections: true,
ddpVersion: '1',
...options,
};
}
/**
* Establishes a WebSocket connection to the DDP server.
* Prevents multiple simultaneous connections.
* @returns void
*/
public connect(): void {
if (
this.socket &&
(this.socket.readyState === WebSocket.OPEN || this.socket.readyState === WebSocket.CONNECTING)
) {
return this.emit('warning', { message: 'DDPClient: Already connected or connecting.'});
}
// Initialize the WebSocket connection using the provided factory or default WebSocket
const protocol = 'meteor/' + (this.options.ddpVersion || '1');
const WebSocketConstructor = this.options.WebSocketFactory || WebSocket;
this.socket = new WebSocketConstructor(this.options.endpoint, protocol);
// Event handler for when the WebSocket connection is opened
this.socket.onopen = () => {
this.emit('socketOpen');
this.sendConnectMessage();
};
// Event handler for incoming messages from the server
this.socket.onmessage = (event) => {
this.emit('message', { data: event.data });
this.handleMessage(event.data);
};
// Event handler for WebSocket errors
this.socket.onerror = (error) => {
this.emit('error', { error });
};
// Event handler for when the WebSocket connection is closed
this.socket.onclose = (event) => {
this.connected = false;
this.emit('disconnected', { code: event.code, reason: event.reason });
// Attempt to reconnect if autoReconnect is enabled
if (this.options.autoReconnect) {
this.scheduleReconnect();
}
};
}
/**
* Closes the WebSocket connection and clears any pending reconnection attempts.
* @returns void
*/
public disconnect(): void {
if (this.socket) {
this.socket.close();
this.socket = null;
}
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
this.connected = false;
}
/**
* Subscribes to a Meteor publication.
* @param name The name of the publication to subscribe to.
* @param params Parameters to pass to the publication.
* @param callback Optional callback invoked when the subscription is ready.
* @returns The subscription ID.
*/
public subscribe(name: string, params: any[] = [], callback?: SubscriptionCallback): string {
const id = this.generateUUID();
const message: DDPSubscribeMessage = {
msg: DDPMessageType.SUBSCRIBE,
id,
name,
params,
};
// Store the callback associated with this subscription ID
if (callback) {
this.subscriptions.set(id, callback);
}
this.send(message);
return id;
}
/**
* Unsubscribes from a Meteor publication.
* @param id The subscription ID to unsubscribe from.
* @returns void
*/
public unsubscribe(id: string): void {
const message: DDPUnsubscribeMessage = {
msg: DDPMessageType.UNSUBSCRIBE,
id,
};
this.send(message);
this.subscriptions.delete(id);
}
/**
* Calls a Meteor method on the server.
* @param method The name of the method to call.
* @param params Parameters to pass to the method.
* @param callback Optional callback invoked with the result or error.
* @returns The method call ID.
*/
public call(method: string, params: any[] = [], callback?: MethodCallback): string {
const id = this.generateUUID();
const message: DDPMethodMessage = {
msg: DDPMessageType.METHOD,
method,
params,
id,
};
// Store the callback associated with this method call ID
if (callback) {
this.methods.set(id, callback);
}
this.send(message);
return id;
}
/**
* Handles incoming messages from the DDP server.
* Parses the message and delegates to the appropriate handler based on message type.
* @param data The raw message data received from the server.
* @returns void
*/
private handleMessage(data: any): void {
let message: DDPMessage;
try {
message = JSON.parse(data);
} catch (error) {
this.emit('error', { error, data});
return;
}
if (!message.msg) {
this.emit('warning', { message: `DDPClient: Received message without msg field`, data });
return;
}
// Delegate message handling based on message type
switch (message.msg) {
case DDPMessageType.CONNECTED:
this.handleConnected(message as DDPConnectedMessage);
break;
case DDPMessageType.FAILED:
this.handleFailed(message as DDPFailedMessage);
break;
case DDPMessageType.READY:
this.handleReady(message as DDPReadyMessage);
break;
case DDPMessageType.RESULT:
this.handleResult(message as DDPResultMessage);
break;
case DDPMessageType.ADD:
case DDPMessageType.CHANGE:
case DDPMessageType.REMOVE:
this.emit('collectionChange', { message });
break;
case DDPMessageType.ERROR:
this.handleDDPError(message as DDPError);
break;
default:
this.emit('warning', { message: `DDPClient: Received unknown message type: ${message.msg}`, data });
}
}
/**
* Handles the 'connected' message from the server.
* Updates the session ID and emits a 'connected' event.
* @param message The connected message received from the server.
* @returns void
*/
private handleConnected(message: DDPConnectedMessage): void {
this.connected = true;
this.session = message.session;
this.emit('connected', { session: this.session });
}
/**
* Handles the 'failed' message from the server.
* Emits a 'failed' event and disconnects the client.
* @param message The failed message received from the server.
* @returns void
*/
private handleFailed(message: DDPFailedMessage): void {
this.emit('failed', { message});
this.connected = false;
this.disconnect();
}
/**
* Handles the 'ready' message indicating that a subscription is ready.
* Invokes the associated subscription callback.
* @param message The ready message received from the server.
* @returns void
*/
private handleReady(message: DDPReadyMessage): void {
const { subs, id } = message;
// Invoke the subscription callback if it exists
const callback = this.subscriptions.get(id);
if (callback) {
callback(true);
this.subscriptions.delete(id);
}
this.emit('ready', { subs });
}
/**
* Handles the 'result' message containing the result of a method call.
* Invokes the associated method callback with the result or error.
* @param message The result message received from the server.
* @returns void
*/
private handleResult(message: DDPResultMessage): void {
const { id, result, error } = message;
const callback = this.methods.get(id);
if (callback) {
if (error) {
callback(error, undefined);
} else {
callback(null, result);
}
this.methods.delete(id);
}
}
/**
* Handles DDP-level errors by emitting an 'error' event.
* @param error The DDPError received from the server.
* @returns void
*/
private handleDDPError(error: DDPError): void {
this.emit('error', { error });
}
/**
* Sends a message to the DDP server over the WebSocket connection.
* Ensures that the connection is open before sending.
* @param message The message object to send.
* @returns void
*/
private send(message: any): void {
if (this.socket && this.connected) {
try {
this.socket.send(JSON.stringify(message));
} catch (error) {
this.emit('error', { error, message });
}
} else {
this.emit('warning', { message: `DDPClient: Cannot send message, not connected`, data: message });
}
}
/**
* Sends the initial 'connect' message to the server to initiate the DDP handshake.
* @returns void
*/
private sendConnectMessage(): void {
const message: DDPConnectMessage = {
msg: DDPMessageType.CONNECT,
version: this.options.ddpVersion || '1',
support: ['1', 'pre2', 'pre3'],
};
this.send(message);
}
/**
* Generates a UUID v4 string.
* This implementation does not rely on external packages.
* @returns A UUID v4 string.
*/
private generateUUID(): string {$
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
const r = (Math.random() * 16) | 0;
const v = c === 'x' ? r : (r & 0x3) | 0x8;
return v.toString(16);
});
}
/**
* Schedules a reconnection attempt after a specified delay.
* Prevents multiple simultaneous reconnection attempts.
* @returns void
*/
private scheduleReconnect(): void {
if (this.reconnectTimer) {
return; // Reconnection already scheduled
}
this.emit('reconnecting', { delay: this.options.autoReconnectTimer });
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.connect();
}, this.options.autoReconnectTimer);
}
}
export default DDPClient;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment