PostgreSQL contains a wealth of great features. Here is one such not so well known/appreciated feature.
Pub/Sub Notifications
Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures.
LISTEN and NOTIFY in PostgreSQL
There are two commands,
-
LISTEN - listen for a notification
LISTEN channel
-
NOTIFY - generate a notification
NOTIFY channel [ , payload ]
The LISTEN
command registers the current session as listener on the notification channel named channel
.
The NOTIFY
command sends a notification event (with an optional payload) to each client that has previously executed the LISTEN
cmd.
Database setup
For a quick demo, I have created the table job_status
with the trigger demo_notify
on it
which fires everytime the column job_status
is updated to 2.
The trigger calls the notify
function which in turn invokes the pg_notify
method which is much more flexible to use.
The function takes the channel name as the first argument and the payload as the second.
CREATE TABLE public.job_status (
id serial constraint firstkey primary key,
job_id text not null,
status int not null
);
create trigger demo_notify
after update on public.job_status
for each row
when (new.status = 2)
execute procedure public.notify();
create or replace function public.notify() returns trigger as
$BODY$
begin
perform pg_notify('event', new.job_id);
return new;
end;
$BODY$
language plpgsql;
-- insert some rows to play around
insert into job_status(job_id, status) values('job_23456', 1);
JDBC Aspect
Here are two java classes.
First is the listener which continuously polls the database to check for the notifications.
Standard LISTEN
, NOTIFY
cmds are issued via the standard Statement interface.
To retrieve and process retrieved notifications the Connection
must be cast to the PostgreSQL specific extension interface PGConnection
.
From there the getNotifications()
method can be used to retrieve any outstanding notifications.
The second class is the driver class which invokes the listener.
package com.vijayg.db;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Objects;
class Listener implements Runnable {
private final Connection conn;
private final org.postgresql.PGConnection pgconn;
Listener(Connection conn) throws SQLException {
this.conn = conn;
this.pgconn = (org.postgresql.PGConnection) conn;
try (Statement stmt = conn.createStatement()) {
stmt.execute("LISTEN event");
}
}
@Override
public void run() {
while (true) {
try {
// issue a dummy query to contact the backend
// and receive any pending notifications.
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 1");
rs.close();
stmt.close();
org.postgresql.PGNotification[] notifications = pgconn.getNotifications();
if (Objects.nonNull(notifications)) {
for (org.postgresql.PGNotification notification : notifications) {
System.out.println("Got job: " + notification.getParameter());
// interesting async processing
}
}
// wait a while before checking again for new notifications
Thread.sleep(500);
} catch (SQLException sqle) {
sqle.printStackTrace();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
ie.printStackTrace();
}
}
}
}
package com.vijayg.db;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class PubSubDemo {
public static void main(String[] args) throws SQLException {
String url = "jdbc:postgresql://localhost:5432/postgres";
Connection lConn = DriverManager.getConnection(url, "user", "pwd");
Listener listener = new Listener(lConn);
listener.run();
}
}
Now to invoke the listener, update the row in the table
update job_status set status = 2 where id = 1;
At last a key point worth noting,
JDBC driver cannot receive asynchronous notifications.
Beware that if your listener is down when the notifier issues a notification, it will be lost.