Examples

Code Scratcher edited this page Oct 12, 2025 · 63 revisions

Basic Example

Below is the most basic example of using the library:

import {PgListener} from 'pg-listener';

const ls = new PgListener({pgp, db}); // properties are from pg-promise

await ls.listen(['channel_1', 'channel_2'], {
    onMessage(msg) {
        console.log(msg); // print notification message
    }
});

RxJs Observables

UPDATE: This page was created before the library was extended with createIterable. You can now use it to create an iterable and pass it into any consumer, such as RxJs, so the extension further down is no longer as necessary:

import {from} from 'rxjs';

const r = await ls.listen(['channel_1', 'channel_2']);

from(r.createIterable())
    .subscribe(msg => {
        console.log(msg);
    });

The extension below, based on RxJs v7.8.2, handles channel listening through observables.

See also: pg-rx-listen, a later alternative implemented as a pure RxJs solution. Note that at the time of writing, it is not as reliable as pg-listener, which has been battle-tested along with the RxJs extension shown here.

Below, we add class PgListenerExt, which extends the base PgListener with method listenAndObserve. The method returns an Observable that takes care of error handling, connectivity handling and unsubscribe logic, i.e. it automatically calls cancel when no subscribers are left.

import {defer, finalize, Observable, Subject} from 'rxjs';
import {IListenMessage, IListenResult, PgListener} from 'pg-listener';
import {IConnected, ILostContext} from 'pg-promise';

/**
 * Options for listening and observing events, as used by `PgListenerExt.listenAndObserve()`.
 */
interface IListenObserveOptions {
    /**
     * If true, establishing connection and starting to listen
     * will be deferred until the first subscription.
     */
    defer?: boolean;
    /**
     * If true, method `cancel` will execute `UNLISTEN` for all channels.
     */
    unlisten?: boolean;
    /**
     * Callback for the resolved/connected state, with the `result` object.
     */
    onResult?: (result: IListenResult) => void;
    /**
     * Callback for any error encountered by the `cancel` method.
     */
    onCancelError?: (err: any) => void;
    /**
     * Callback for connection events, same as for `PgListener.listen()`.
     */
    onConnected?: (con: IConnected<{}, any>, count: number) => void;
    /**
     * Callback for disconnection events, same as for `PgListener.listen()`.
     */
    onDisconnected?: (err: any, ctx: ILostContext) => void;
    /**
     * Callback for failed reconnect events, same as for `PgListener.listen()`.
     */
    onFailedReconnect?: (err: any) => void;
}

/**
 * Extends `PgListener` with a method for observing `LISTEN` notifications,
 * and an automatic `cancel` call when no subscribers are left.
 */
class PgListenerExt extends PgListener {

    /**
     * Subscribes to the specified channels and returns an observable to listen to incoming notification messages.
     *
     * It also extends event options and properties of method `listen` of the base class.
     *
     * The returned observable will automatically call `cancel` when no subscribers are left.
     *
     * @param {string[]} channels - An array of channel names to listen to.
     * @param {IListenObserveOptions} [options] - Configuration + event-handling options.
     *
     * @return {Observable<IListenMessage>} An observable that emits messages from the subscribed channels.
     */
    listenAndObserve(channels: string[], options?: IListenObserveOptions): Observable<IListenMessage> {
        const s = new Subject<IListenMessage>();
        let result: IListenResult | undefined;
        const start = () => {
            this.listen(channels, {
                onMessage(msg) {
                    s.next(msg);
                },
                onConnected(con, count) {
                    options?.onConnected?.(con, count);
                },
                onDisconnected(err, ctx) {
                    options?.onDisconnected?.(err, ctx);
                },
                onFailedReconnect(err) {
                    options?.onFailedReconnect?.(err);
                    s.error(err);
                }
            }).then(r => {
                result = r;
                const c = r.cancel;
                r.cancel = (unlisten?: boolean) => {
                    s.complete();
                    return c.call(r, unlisten);
                };
                options?.onResult?.(r);
            }).catch(err => s.error(err));
            return s.pipe(finalize(() => {
                if (!s.observed) {
                    // zero-timeout is necessary when "unsubscribe" is called from inside "subscribe",
                    // or else the connection may close while a notification is still resolving,
                    // which will result in error: Querying against a released or lost connection.
                    setTimeout(() => {
                        result?.cancel(!!options?.unlisten).catch(err => {
                            options?.onCancelError?.(err);
                        });
                    });
                }
            }));
        };
        let deferredObs: Observable<IListenMessage> | undefined;
        return options?.defer ? defer(() => deferredObs ??= start()) : start();
    }
}
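
The core idea behind listenAndObserve, releasing the underlying resource once the last subscriber leaves, can also be sketched without RxJs. The snippet below is only a minimal illustration and is not part of pg-listener or RxJs: the names RefCountedSource, Handler and Scheduler are hypothetical, and the injectable scheduler is an assumption added here so that the zero-timeout can be replaced when testing.

```typescript
// Hypothetical, dependency-free sketch; not part of pg-listener or RxJs.
type Handler<T> = (value: T) => void;
type Scheduler = (task: () => void) => void;

class RefCountedSource<T> {
    private handlers: Array<{ handler: Handler<T> }> = [];
    private onIdle: () => void;
    private schedule: Scheduler;

    /**
     * @param onIdle - called after the last subscriber has unsubscribed
     * (the place where `cancel` would be invoked in the extension above).
     * @param schedule - how to run `onIdle`; defaults to a zero-timeout,
     * so an unsubscribe from inside a handler can finish first.
     */
    constructor(onIdle: () => void, schedule?: Scheduler) {
        this.onIdle = onIdle;
        this.schedule = schedule ?? (task => {
            setTimeout(task, 0);
        });
    }

    /** Registers a handler and returns a function that unsubscribes it. */
    subscribe(handler: Handler<T>): () => void {
        const entry = {handler};
        this.handlers.push(entry);
        return () => {
            const i = this.handlers.indexOf(entry);
            if (i < 0) {
                return; // already unsubscribed, ignore repeated calls
            }
            this.handlers.splice(i, 1);
            if (this.handlers.length === 0) {
                this.schedule(() => this.onIdle());
            }
        };
    }

    /** Delivers a value to every current subscriber. */
    emit(value: T): void {
        // iterate over a copy, so handlers may unsubscribe while being notified
        for (const entry of this.handlers.slice()) {
            entry.handler(value);
        }
    }
}
```

With two subscribers, onIdle runs only after the second one unsubscribes. Like the extension above, this sketch does not re-check for new subscribers once the idle task has been scheduled.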

In the usage example for PgListenerExt below, we unsubscribe after just one message, to show that cancel(true) is invoked as soon as we unsubscribe, and that the process ends without delay because the connection also closes correctly.

import {filter} from 'rxjs'; // needed for the per-channel split further down

const ls = new PgListenerExt({pgp, db});

const channels = ['channel_1', 'channel_2']; // channels to listen to

const obs = ls.listenAndObserve(channels, {unlisten: true});

const sub = obs.subscribe(msg => {
    console.log(msg);
    sub.unsubscribe(); // unsubscribe after 1 message
});

// or, you can split it into one observable per channel instead:
const [obs1, obs2] = channels.map(c => obs.pipe(filter(a => a.channel === c)));

const sub1 = obs1.subscribe(msg => {/* channel_1 handler */});
const sub2 = obs2.subscribe(msg => {/* channel_2 handler */});
// just make sure to unsubscribe from them all, or "cancel" won't be called automatically.
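
If the per-channel split is needed in several places, the inline predicate can be pulled out into a small reusable factory. The sketch below is an illustration only: onChannel and HasChannel are hypothetical names, not part of pg-listener, and the only assumption about a message is that it has a string channel property, as used in the filter above.

```typescript
// Hypothetical helper; not part of pg-listener or RxJs.
interface HasChannel {
    channel: string;
}

/** Creates a predicate that accepts only messages from the given channel. */
function onChannel(channel: string): (msg: HasChannel) => boolean {
    return msg => msg.channel === channel;
}

// Made-up sample messages, for demonstration only:
const sample = [
    {channel: 'channel_1', payload: 'first'},
    {channel: 'channel_2', payload: 'second'},
    {channel: 'channel_1', payload: 'third'}
];

const payloads = sample.filter(onChannel('channel_1')).map(m => m.payload);
console.log(payloads.join(', ')); // prints "first, third"
```

The same predicate should fit the RxJs filter operator, e.g. obs.pipe(filter(onChannel('channel_1'))), provided the message type has a string channel property.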

The example below sends a notification as soon as it is connected, receives it, and unsubscribes at once:

const ls = new PgListenerExt({pgp, db});

const obs = ls.listenAndObserve(['channel_1'], {
    async onResult(r) {
        await r.notify(['channel_1'], 'Hello World!');
    }
});

const sub = obs.subscribe(msg => {
    console.log('RECEIVED:', msg);
    sub.unsubscribe();
});

Output:

RECEIVED: {
  channel: 'channel_1',
  length: 31,
  payload: 'Hello World!',
  processId: 13572
}
