How to correctly listen for a postgresql notification from node.js

4.7k Views Asked by At

Objective:
When a new record is inserted into a specific PostgreSQL table, I would like PostgreSQL to notify my node.js web application so that it can initiate an API call to an external service.

I understand the basic steps are:

  1. Establish a PostgreSQL trigger function which will perform pg_notify() method.
  2. Establish a PostgreSQL trigger which will execute the trigger function after table insert.
  3. Establish a mechanism in node.js to listen to channel-specific PostgreSQL notification.

Here is my attempt at each step:

  1. Trigger function in notify_app_after_table_insert.pgsql

    CREATE OR REPLACE FUNCTION notify_app_after_table_insert()
    RETURNS TRIGGER AS
    $BODY$
        BEGIN
            PERFORM pg_notify('channel', row_to_json(NEW)::text);
            RETURN new;
        END;
    $BODY$
    LANGUAGE plpgsql
    
  2. Trigger in trigger_notify_app_after_table_insert.sql

    CREATE TRIGGER trigger_notify_app_after_table_insert
    AFTER INSERT
    ON table
    FOR EACH ROW
    EXECUTE PROCEDURE notify_app_after_table_insert();
    
  3. Listener mechanism in index.js (inside my web app's backend)

    //tools
    const express = require('express');
    const app = express();
    const cors = require('cors');
    const bodyParser = require('body-parser');
    const port = 3001;
    const pool = require('./db'); //stores my postgresql credentials
    
    // Middleware
    app.use(cors())
    app.use(bodyParser.json())
    app.use(bodyParser.urlencoded({extended: true}))
    
    // Apply app.listen notification to console.log
    app.listen(port, () => {
        console.log(`App running on port ${port}.`)
    })
    
    // Apply channel-specific listener mechanism
    pool.connect(function(err, client, done) {
        if(err) {
            console.log(err);
        }
        client.on('notification', function(msg) {
            console.log(msg);
        })
        client.query("LISTEN channel");
        done();
    });
    

Problem:
When the backend web-app server is running and a new record is inserted in the db table, I expect to see a notification message in my web-app's terminal, but nothing appears. I suspect the problem is in the last code block of index.js, but haven't been able to isolate it.

Any suggestions on how to correctly receive the notification in index.js? Thanks in advance.

2

There are 2 best solutions below

3
Goga Okradze On

I think this is because of order. Write like this:

client.query("LISTEN channel");
client.on('notification', function(msg) {
  console.log(msg);
})

For me querying for LISTEN first worked.

0
void symphony On

I had the same problem and I decided to use pg-listen (https://github.com/andywer/pg-listen). Here's my implementation:

PG:

CREATE TABLE active.events(
  uid UUID DEFAULT gen_random_uuid(),
  created_ts TIMESTAMP DEFAULT NOW(),
  consumed_ts TIMESTAMP NULL,
  origin VARCHAR(200) NOT NULL,
  channel VARCHAR(200) NOT NULL,
  type VARCHAR(50) NOT NULL,
  path VARCHAR(200) NOT NULL,
  payload JSONB NOT NULL,
  result JSONB,
  CONSTRAINT events_pkey PRIMARY KEY(uid),
  CONSTRAINT events_ukey UNIQUE(uid)
);
CREATE INDEX ON active.events(uid);
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE active.events TO _pg_mb_rl;
ALTER TABLE active.events OWNER TO _pg_mb_rl;

-- TRIGGER
CREATE OR REPLACE FUNCTION active.tg_notify_events()
 RETURNS TRIGGER
 LANGUAGE PLPGSQL
AS $tg_notify_events$
DECLARE
    --channel TEXT := TG_ARGV[0];
BEGIN
    PERFORM pg_notify(NEW.channel, row_to_json(NEW)::TEXT);
    UPDATE active.events SET consumed_ts = NOW() WHERE uid = NEW.uid;
  RETURN NULL;
END;
$tg_notify_events$;

CREATE OR REPLACE TRIGGER notify_events
    AFTER INSERT ON active.events
    FOR EACH ROW EXECUTE PROCEDURE active.tg_notify_events();

NODEJS:

const createSubscriber = require('pg-listen');

const channel = 'message_queue';
const subscriber = createSubscriber({ connectionString: process.env.DATABASE_URL });
subscriber.notifications.on(channel, (payload) => {
  console.log('Received notification in ' + channel, payload);
});

subscriber.events.on('error', (error) => {
  console.error('Fatal database connection error:', error)
  process.exit(1)
});

process.on('exit', () => {
  subscriber.close()
});

await subscriber.connect();
await subscriber.listenTo(channel);

Hope it helps!