One of the advanced features of PostgreSQL is the NOTIFY
and LISTEN
functionality. These features enable real-time communication between the database and applications. Modern web applications that require event-driven behavior and real-time updates benefit from this.
Combining these features with Ruby on Rails can create powerful real-time applications with minimal overhead.
The NOTIFY
and LISTEN
features allow a database session to send and receive asynchronous messages. With NOTIFY
, a session broadcasts a message on a specific channel. LISTEN
allows other sessions to subscribe to those messages. This mechanism enables real-time communication and reduces the need for continuous polling of the database.
For example, a NOTIFY
statement can be used to alert other sessions that a new record was created.
NOTIFY new_invoice_created, 'payload';
LISTEN new_invoice_created;
From here, the application can update notifications, graphs, charts, or dashboards, or trigger external integrations in sequence.
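To publish a notification from application code rather than from a SQL console, a parameterized call to pg_notify avoids quoting the channel and payload by hand. The sketch below is only an illustration: notify_command is a hypothetical helper (not part of Rails or the pg gem), and it builds the statement and its bind values without touching a database.

```ruby
require "json"

# Hypothetical helper: builds a parameterized pg_notify call.
# Returns [sql, params] so the caller can hand both to a database driver.
def notify_command(channel, record)
  # pg_notify(text, text) is the function form of NOTIFY; bind parameters
  # mean the channel name and payload never need manual escaping.
  sql = "SELECT pg_notify($1, $2)"
  [sql, [channel.to_s, JSON.generate(record)]]
end

sql, params = notify_command(:new_invoice_created, { "id" => 42, "invoice_status" => "draft" })
puts sql
puts params.inspect
```

With an open PG::Connection (assumed here to be called conn), the pair could be passed along as conn.exec_params(sql, params).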
Trigger functions in PostgreSQL are special functions executed automatically in response to certain events on a database table. These events are typically INSERT
, UPDATE
, or DELETE
. Triggers are often used to enforce business logic, maintain audit trails, and automate workflows directly at the database level.
CREATE OR REPLACE FUNCTION notify_on_insert() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('new_invoice_created', row_to_json(NEW)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Combined with NOTIFY
, this sample trigger function sends a JSON payload to the new_invoice_created
channel whenever a new record is created, once the function is attached to a table with CREATE TRIGGER.
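On the receiving side, the payload produced by row_to_json(NEW)::text is a JSON object keyed by column name. The following sketch shows one way to turn it into a small Ruby value object. The sample row and the InvoiceEvent struct are assumptions made for illustration; an actual invoices table will have its own columns.

```ruby
require "json"

# Sample payload as row_to_json(NEW)::text might produce it for a
# hypothetical invoices row; the column names here are assumptions.
payload = '{"id":7,"reference_id":"INV-0007","invoice_status":"draft","amount":125.5}'

# Hypothetical value object holding only the fields the app cares about.
InvoiceEvent = Struct.new(:id, :reference_id, :invoice_status, keyword_init: true)

def parse_invoice_event(payload)
  row = JSON.parse(payload) # keys are the table's column names, as strings
  InvoiceEvent.new(
    id: row["id"],
    reference_id: row["reference_id"],
    invoice_status: row["invoice_status"]
  )
end

event = parse_invoice_event(payload)
puts "#{event.reference_id} is #{event.invoice_status}"
```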
Start by creating a migration to define the trigger. Depending on your preference, you can either embed raw SQL in the migration or keep it in a separate SQL file and execute that file from the migration.
class AddNotifyTriggerToInvoices < ActiveRecord::Migration[7.0]
  def up
    execute <<~SQL
      CREATE OR REPLACE FUNCTION notify_on_invoice_insert() RETURNS trigger AS $$
      BEGIN
        PERFORM pg_notify('new_invoice_created', row_to_json(NEW)::text);
        RETURN NEW;
      END;
      $$ LANGUAGE plpgsql;

      CREATE TRIGGER notify_after_invoice_insert
      AFTER INSERT ON invoices
      FOR EACH ROW
      EXECUTE FUNCTION notify_on_invoice_insert();
    SQL
  end

  def down
    # Drop the trigger from the same table it was created on, then the function.
    execute <<~SQL
      DROP TRIGGER IF EXISTS notify_after_invoice_insert ON invoices;
      DROP FUNCTION IF EXISTS notify_on_invoice_insert();
    SQL
  end
end
This migration sets up the trigger and function that enable real-time notifications whenever a new record is created in the invoices
table.
A background job or a dedicated thread can handle the NOTIFY
messages in Rails. The pg
gem provides the necessary tools to subscribe via LISTEN
to channels.
# app/services/postgres_listener.rb
class PostgresListener
  def listen
    ActiveRecord::Base.connection_pool.with_connection do |connection|
      # Use the underlying PG::Connection, which provides wait_for_notify.
      conn = connection.raw_connection
      begin
        conn.async_exec "LISTEN new_invoice_created"
        conn.async_exec "LISTEN another_channel"
        conn.async_exec "LISTEN other_table_update"
        loop do
          # Blocks until a notification arrives on any subscribed channel.
          conn.wait_for_notify do |channel, pid, payload|
            if payload.present?
              record = JSON.parse(payload)
              case channel
              when 'new_invoice_created'
                handle_new_invoice(record)
              when 'another_channel'
                handle_another_channel(record)
              when 'other_table_update'
                handle_other_table_update(record)
              end
            end
          end
        end
      ensure
        # Unsubscribe before the connection goes back to the pool.
        conn.async_exec "UNLISTEN *"
      end
    end
  end

  private

  def handle_new_invoice(record)
    NewInvoiceCreatedJob.perform_now(record)
  end

  def handle_another_channel(record)
    HandleOtherChannelJob.perform_now(record)
  end

  def handle_other_table_update(record)
    HandleOtherTableUpdateJob.perform_now(record)
  end
end
For this example, postgres_listener.rb
is saved inside the /app/services
folder of the Rails project. We start by checking out a connection from the pool and subscribing to specific channels. As soon as a notification arrives, we handle it based on its channel and payload.
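As the number of channels grows, the routing step can be pulled out of the loop into a small lookup table. The sketch below is one possible shape for that, not the approach the listener above uses: NotificationDispatcher and its return symbols are hypothetical names, and it also shows a way to keep a malformed payload from raising inside the listen loop.

```ruby
require "json"

# Hypothetical dispatcher: maps channel names to handler blocks.
class NotificationDispatcher
  def initialize
    @handlers = {}
  end

  # Registers a block for a channel; returns self so calls can be chained.
  def on(channel, &handler)
    @handlers[channel.to_s] = handler
    self
  end

  # Returns :handled, :ignored (no handler or empty payload),
  # or :invalid (payload was not valid JSON).
  def dispatch(channel, payload)
    handler = @handlers[channel.to_s]
    return :ignored if handler.nil? || payload.nil? || payload.empty?

    record = JSON.parse(payload)
    handler.call(record)
    :handled
  rescue JSON::ParserError
    :invalid
  end
end

seen = []
dispatcher = NotificationDispatcher.new
dispatcher.on("new_invoice_created") { |record| seen << record }

puts dispatcher.dispatch("new_invoice_created", '{"id":1}')
puts dispatcher.dispatch("unknown_channel", '{"id":1}')
puts dispatcher.dispatch("new_invoice_created", "not json")
```

Inside the wait_for_notify block, a single dispatcher.dispatch(channel, payload) call could then stand in for the case statement.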
Just a heads-up on the payload: it is in JSON format. If you use alias_attribute
in the model for the table you are monitoring, the payload still uses the actual column names. For example, record['first_name__c']
will be used if you have alias_attribute :first_name, :first_name__c
in the Invoice
model.
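If the rest of the application expects the aliased names, the payload keys can be translated after parsing. This is a minimal sketch that assumes a hand-maintained mapping; COLUMN_ALIASES and apply_aliases are hypothetical names, with the single entry mirroring the alias_attribute :first_name, :first_name__c example above.

```ruby
require "json"

# Assumed mapping from database column name to the alias declared in the
# model. Only columns that have an alias need an entry.
COLUMN_ALIASES = { "first_name__c" => "first_name" }.freeze

# Parses the payload and renames aliased columns; other keys pass through.
def apply_aliases(payload, aliases = COLUMN_ALIASES)
  JSON.parse(payload).transform_keys { |column| aliases.fetch(column, column) }
end

record = apply_aliases('{"id":3,"first_name__c":"Ada"}')
puts record.inspect
```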
Another thing to note is the payload
must be shorter than 8000 bytes in the default configuration. If you are monitoring a table with many fields, consider limiting the fields you include in the NOTIFY
payload.
PERFORM pg_notify('invoice_updated', json_build_object('reference_id', NEW.reference_id, 'invoice_status', NEW.invoice_status)::text);
The code above includes only two fields in the payload. Also, notice the NEW.invoice_status
reference? That means you can also return the OLD.invoice_status
field value if your business logic requires it. Note that OLD is only populated in UPDATE and DELETE triggers.
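The same trimming idea applies when the application itself publishes notifications. The sketch below keeps only selected keys and checks the byte size before sending. slim_payload and MAX_PAYLOAD_BYTES are hypothetical names, and the limit assumes PostgreSQL's default configuration.

```ruby
require "json"

# Under PostgreSQL's default configuration the payload must be
# shorter than 8000 bytes, so 7999 is the largest size allowed.
MAX_PAYLOAD_BYTES = 7999

# Hypothetical helper: keeps only the listed keys and verifies the size.
def slim_payload(record, keys, limit: MAX_PAYLOAD_BYTES)
  json = JSON.generate(record.slice(*keys))
  if json.bytesize > limit
    raise ArgumentError, "payload is #{json.bytesize} bytes, limit is #{limit}"
  end
  json
end

invoice = {
  "reference_id" => "INV-0007",
  "invoice_status" => "paid",
  "notes" => "x" * 10_000 # a large field that would not fit in a notification
}
puts slim_payload(invoice, %w[reference_id invoice_status])
```

When even the trimmed payload is too large, a common fallback is to send only the record's identifier and let the listener load the full row.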
Create a rake task file, database_listener.rake
, that instantiates PostgresListener
and calls its listen
method.
# lib/tasks/database_listener.rake
namespace :database_listener do
desc 'listen to PG notifications'
task listen: :environment do
listener = PostgresListener.new
listener.listen
end
end
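On Heroku, create an entry in the Procfile and provision a dyno for it. You can scale this as the need arises, but keep in mind that every listening session receives every notification, so each additional listener dyno will handle the same notifications again. The listener might be handling different notifications from a number of tables and for different events.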
# Procfile
web: bundle exec puma -t 5:5 -p ${PORT:-3000} -e ${RACK_ENV:-production}
worker: bundle exec puma -t 5:5 -p ${PORT:-3000} -e ${RACK_ENV:-production}
sidekiq: bundle exec sidekiq -c 2 -v -q default -q mailers
listeners: bundle exec rake database_listener:listen
release: rake db:migrate
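Heroku stops a dyno by sending it SIGTERM, so a long-running listener benefits from a loop that can exit cleanly. The sketch below shows one way to do that by waiting with a timeout and checking a stop flag between waits. StoppableListener is a hypothetical class, and FakeConnection is a stand-in written only so the example runs without a database; it imitates the shape of the pg gem's wait_for_notify, which accepts an optional timeout and returns nil when nothing arrives.

```ruby
# Stand-in for a PG::Connection so the sketch runs without a database.
# It yields one queued notification per call, or returns nil when the
# queue is empty (as a timed-out wait would).
class FakeConnection
  def initialize(notifications)
    @queue = notifications.dup
  end

  def wait_for_notify(_timeout = nil)
    channel, pid, payload = @queue.shift
    return nil if channel.nil?

    yield channel, pid, payload
    channel
  end
end

# Hypothetical listener loop that can be asked to stop.
class StoppableListener
  def initialize(conn, timeout: 1)
    @conn = conn
    @timeout = timeout
    @running = true
  end

  def stop
    @running = false
  end

  # Waits with a timeout so the loop notices a stop request between
  # notifications instead of blocking indefinitely.
  def run
    while @running
      @conn.wait_for_notify(@timeout) { |channel, _pid, payload| yield channel, payload }
    end
  end
end

conn = FakeConnection.new([["new_invoice_created", 123, '{"id":1}']])
listener = StoppableListener.new(conn, timeout: 0)
received = []
listener.run do |channel, payload|
  received << [channel, payload]
  listener.stop # in a rake task this would be called from Signal.trap("TERM")
end
puts received.inspect
```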
For Rails applications on Heroku that are integrated with Salesforce via Heroku Connect, records created in Salesforce don't trigger ActiveRecord callbacks. If you need to react to such events, a practical way to do so is via the NOTIFY
and TRIGGER
features of PostgreSQL at the database level.
Another application is keeping distributed systems in sync. Notifications from one service’s database can trigger actions in another service, helping keep data consistent across the ecosystem.
NOTIFY
and LISTEN
are powerful but can affect database performance. Notifications carry some overhead, and excessive use can slow down database operations. Avoid sending a notification for every minor change.
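One way to avoid notifying on every minor change is to let the trigger fire only when a column of interest actually changes. The sketch below generates such a CREATE TRIGGER statement for use inside a migration's execute call. conditional_trigger_sql and the notify_on_invoice_status_change function name are hypothetical; the generated SQL relies on PostgreSQL's UPDATE OF column list and the trigger WHEN clause.

```ruby
# Hypothetical helper: returns SQL for an update trigger that fires only
# when the watched column's value actually changes.
def conditional_trigger_sql(table:, column:, function:)
  # Identifiers are interpolated into SQL, so accept only plain names.
  [table, column, function].each do |name|
    unless name.match?(/\A[a-z_][a-z0-9_]*\z/)
      raise ArgumentError, "unsafe identifier: #{name.inspect}"
    end
  end

  <<~SQL
    CREATE TRIGGER notify_after_#{table}_#{column}_change
    AFTER UPDATE OF #{column} ON #{table}
    FOR EACH ROW
    WHEN (OLD.#{column} IS DISTINCT FROM NEW.#{column})
    EXECUTE FUNCTION #{function}();
  SQL
end

puts conditional_trigger_sql(
  table: "invoices",
  column: "invoice_status",
  function: "notify_on_invoice_status_change"
)
```

IS DISTINCT FROM is used instead of <> so that changes to or from NULL also count as changes.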
Sanitize payloads to prevent injection attacks and validate incoming messages to ensure they’re from trusted sources.
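On the listener side, validation can be as simple as checking that a payload has the expected shape before any job runs. The following is a minimal sketch under assumed rules: valid_invoice_payload?, the reference format, and the list of statuses are all hypothetical and would need to match the real schema.

```ruby
require "json"

# Assumed set of statuses; replace with the values the application uses.
ALLOWED_STATUSES = %w[draft sent paid].freeze

# Returns true only for a JSON object with a well-formed reference_id
# and a known invoice_status. Anything else is rejected.
def valid_invoice_payload?(payload)
  record = JSON.parse(payload)
  return false unless record.is_a?(Hash)

  reference = record["reference_id"]
  reference.is_a?(String) &&
    reference.match?(/\A[A-Za-z0-9\-]{1,64}\z/) &&
    ALLOWED_STATUSES.include?(record["invoice_status"])
rescue JSON::ParserError, TypeError
  # Malformed JSON or a nil payload is treated as invalid.
  false
end

puts valid_invoice_payload?('{"reference_id":"INV-0007","invoice_status":"paid"}')
puts valid_invoice_payload?('{"reference_id":"1; DROP TABLE invoices","invoice_status":"paid"}')
```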