Skip to content

Instantly share code, notes, and snippets.

@vitaly-t
Last active September 13, 2025 17:27
Show Gist options
  • Save vitaly-t/57cfddf3436167a98eb7fd164c4eff13 to your computer and use it in GitHub Desktop.
Save vitaly-t/57cfddf3436167a98eb7fd164c4eff13 to your computer and use it in GitHub Desktop.
import pgPromise, {IConnected, ILostContext} from 'pg-promise';
import {retryAsync, RetryOptions} from './retry-async';
const pgp = pgPromise();
const db = pgp({
user: 'postgres',
password: '########',
database: 'postgres',
port: 5432,
allowExitOnIdle: true,
keepAlive: true
});
interface IListenMessage {
channel: string;
payload: string;
processId: number;
}
interface IListenResult {
cancel: (unlisten?: boolean) => Promise<boolean>;
}
interface IListenOptions extends RetryOptions {
onMessage: (msg: IListenMessage) => void;
onConnected?: (con: IConnected<{}, any>, count: number) => void;
onDisconnected?: (err: any, ctx: ILostContext) => void;
onFailedReconnect?: (err: any) => void;
}
async function listen(channels: string[], opt?: IListenOptions): Promise<IListenResult> {
const handler = (m: IListenMessage) => opt?.onMessage({
channel: m.channel,
payload: m.payload,
processId: m.processId
});
let count = 0;
let con: IConnected<{}, any> | null = null;
const reconnect = async () => {
con = await db.connect({
direct: true,
onLost(err, ctx) {
con = null;
ctx.client.removeListener('notification', handler);
opt?.onDisconnected?.(err, ctx);
retryAsync(reconnect, opt).catch(err => opt?.onFailedReconnect?.(err));
}
});
con.client.on('notification', handler);
await con.multi(pgp.helpers.concat(channels.map(a => ({
query: 'LISTEN $1:name',
values: [a]
}))));
opt?.onConnected?.(con, ++count);
};
await retryAsync(reconnect, opt);
return {
async cancel(unlisten = false): Promise<boolean> {
if (con) {
con.client.removeListener('notification', handler);
if (unlisten) {
await con.multi(pgp.helpers.concat(channels.map(a => ({
query: 'UNLISTEN $1:name',
values: [a]
}))));
}
con.done();
con = null;
return true;
}
return false;
}
};
}
const listenOptions: IListenOptions = {
onMessage(msg: IListenMessage) {
console.log(msg); // log messages from 2 channels
},
onConnected(con: IConnected<{}, any>, count: number) {
console.log(`*** Connected: ${count} time(s) ***`);
},
onDisconnected(err: any, ctx: ILostContext) {
console.log('*** Disconnected:', err.message);
},
onFailedReconnect(err: any) {
// Listening Terminated: cannot reconnect
console.error('*** Reconnect Failed:', err.message);
},
// Retry-Async Options:
delay: s => (s.index + 1) * 1000, // +1s delay for each retry
retry: 5 // retry up to 5 times
};
listen(['channel_1', 'channel_2'], listenOptions)
.then((r: IListenResult) => {
console.log('*** Initial Connection ***');
})
.catch((e: any) => {
console. Error('Initial Connection Failed:', e.message);
});
@vitaly-t
Copy link
Author

vitaly-t commented Sep 13, 2025

Above, we have retryAsync fully integrated into the listen logic, so you can pass retry options into listen via its own options, and thus control retries for both initial connection and reconnections.

It is the most complete and reliable approach to handling connectivity-related issues.

See also: the original solution, without retryAsync.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment