PostgreSQL contains a wealth of great features. Here is one that is less well known and underappreciated.

Pub/Sub Notifications

Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures.

LISTEN and NOTIFY in PostgreSQL

PostgreSQL provides two commands for this:

  1. LISTEN - listen for a notification

    LISTEN channel

  2. NOTIFY - generate a notification

    NOTIFY channel [ , payload ]

The LISTEN command registers the current session as a listener on the notification channel named channel. The NOTIFY command sends a notification event (with an optional payload) to each client that has previously executed LISTEN for that channel.
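
As a rough illustration of the two commands, the sketch below builds the SQL text for LISTEN and NOTIFY in Java. The class NotifySql and its methods are hypothetical helpers, not part of JDBC or PostgreSQL. The quoting rules assume standard_conforming_strings is on, which is the default in current PostgreSQL versions. NOTIFY only accepts a literal payload, so the payload has to be embedded in the statement text rather than bound as a parameter.

```java
// Sketch only: NotifySql is a hypothetical helper, not part of JDBC or PostgreSQL.
final class NotifySql {

    // Quote a channel name as an SQL identifier: wrap it in double quotes
    // and double any embedded double quote.
    static String quoteIdentifier(String name) {
        return "\"" + name.replace("\"", "\"\"") + "\"";
    }

    // Quote a payload as an SQL string literal: wrap it in single quotes
    // and double any embedded single quote.
    // Assumption: standard_conforming_strings is on, so backslashes are literal.
    static String quoteLiteral(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Text of a LISTEN command for the given channel.
    static String listenSql(String channel) {
        return "LISTEN " + quoteIdentifier(channel);
    }

    // Text of a NOTIFY command; the payload is optional and may be null.
    static String notifySql(String channel, String payload) {
        String sql = "NOTIFY " + quoteIdentifier(channel);
        return payload == null ? sql : sql + ", " + quoteLiteral(payload);
    }

    public static void main(String[] args) {
        System.out.println(listenSql("event"));              // LISTEN "event"
        System.out.println(notifySql("event", null));        // NOTIFY "event"
        System.out.println(notifySql("event", "job_23456")); // NOTIFY "event", 'job_23456'
    }
}
```

With JDBC, the resulting strings would be passed to Statement.execute(). Note that a quoted channel name is case-sensitive, so "event" and "Event" are different channels.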

Database setup

For a quick demo, I have created the table job_status with the trigger demo_notify on it, which fires every time a row is updated and its status column ends up as 2. The trigger calls the notify function, which in turn invokes the pg_notify function. pg_notify is more flexible than the NOTIFY command because it accepts non-constant channel names and payloads. It takes the channel name as the first argument and the payload as the second.

CREATE TABLE public.job_status (
	id serial  constraint firstkey primary key,
	job_id text not null,
	status int not null
);

-- the function must exist before the trigger that references it
create or replace function public.notify() returns trigger as
$BODY$
begin
   -- publish the job id on the 'event' channel
   perform pg_notify('event', new.job_id);
   return new;
end;
$BODY$
   language plpgsql;

create trigger demo_notify
   after update on public.job_status
   for each row
   when (new.status = 2)
execute procedure public.notify();

-- insert some rows to play around
insert into job_status(job_id, status) values('job_23456', 1);

JDBC Aspect

Here are two Java classes. The first is the listener, which polls the database at a fixed interval to check for notifications. The LISTEN and NOTIFY commands are issued via the standard Statement interface. To retrieve and process notifications, the Connection must be cast to the PostgreSQL-specific extension interface PGConnection. From there, the getNotifications() method returns any outstanding notifications.

The second class is the driver class which invokes the listener.


package com.vijayg.db;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Objects;

class Listener implements Runnable {

   private final Connection conn;
   private final org.postgresql.PGConnection pgconn;

   Listener(Connection conn) throws SQLException {
      this.conn = conn;
      this.pgconn = (org.postgresql.PGConnection) conn;
      try (Statement stmt = conn.createStatement()) {
         stmt.execute("LISTEN event");
      }
   }

   @Override
   public void run() {
      while (true) {
         try {
            // issue a dummy query to contact the backend
            // and receive any pending notifications.
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT 1")) {
               // nothing to read; only the round trip to the backend matters
            }

            org.postgresql.PGNotification[] notifications = pgconn.getNotifications();
            if (Objects.nonNull(notifications)) {
               for (org.postgresql.PGNotification notification : notifications) {
                  System.out.println("Got job: " + notification.getParameter());
                  // interesting async processing
               }
            }

            // wait a while before checking again for new notifications
            Thread.sleep(500);
         } catch (SQLException sqle) {
            sqle.printStackTrace();
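         } catch (InterruptedException ie) {
            // restore the interrupt flag and stop polling; otherwise every
            // later sleep() would throw immediately and the loop would spin
            Thread.currentThread().interrupt();
            return;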
         }
      }
   }

}

package com.vijayg.db;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class PubSubDemo {

   public static void main(String[] args) throws SQLException {
      String url = "jdbc:postgresql://localhost:5432/postgres";
      Connection lConn = DriverManager.getConnection(url, "user", "pwd");
      Listener listener = new Listener(lConn);
      listener.run();
   }
}
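
The polling pattern used by the listener can also be tried without a database. The following is a minimal sketch, not the driver's API: PollingLoop is a hypothetical class, and its Supplier stands in for the "run SELECT 1, then call getNotifications()" step. It shows one way to let the loop stop cleanly when the thread is interrupted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Sketch only: PollingLoop is a hypothetical helper. The Supplier replaces
// the JDBC round trip plus getNotifications() used by the Listener class.
final class PollingLoop implements Runnable {

    private final Supplier<List<String>> source;
    private final Consumer<String> handler;
    private final long intervalMillis;

    PollingLoop(Supplier<List<String>> source, Consumer<String> handler, long intervalMillis) {
        this.source = source;
        this.handler = handler;
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run() {
        // Poll until the thread is interrupted.
        while (!Thread.currentThread().isInterrupted()) {
            for (String payload : source.get()) {
                handler.accept(payload);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException ie) {
                // Restore the flag so the loop condition ends the loop.
                Thread.currentThread().interrupt();
            }
        }
    }

    // Runs the loop against an in-memory queue and returns the handled payloads.
    static List<String> runDemo() {
        Queue<String> pending = new ConcurrentLinkedQueue<>(List.of("job_1", "job_2"));
        List<String> handled = new CopyOnWriteArrayList<>();
        Supplier<List<String>> drain = () -> {
            List<String> batch = new ArrayList<>();
            for (String p = pending.poll(); p != null; p = pending.poll()) {
                batch.add(p);
            }
            return batch;
        };
        Thread worker = new Thread(new PollingLoop(drain, handled::add, 10));
        worker.start();
        try {
            // Wait (at most five seconds) until both payloads were handled.
            long deadline = System.currentTimeMillis() + 5000;
            while (handled.size() < 2 && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
            worker.interrupt();
            worker.join(5000);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("polling loop did not stop");
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [job_1, job_2]
    }
}
```

Keeping the source behind a Supplier is only a convenience for experimenting; in the real listener the same loop body would talk to the database connection.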

Now, to trigger a notification for the listener, update the row in the table:

update job_status set status = 2 where id = 1;

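Finally, a key point worth noting: the JDBC driver cannot receive asynchronous notifications. It must poll the backend to check whether any were issued, which is why the listener runs a dummy query at a fixed interval.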

Beware that notifications are not stored for absent listeners: if your listener is not connected when the notifier issues a notification, that notification is lost.
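
One possible way to soften this, sketched here under assumptions that are not part of the demo above: after a restart the listener issues LISTEN first, then reads the rows that are already finished (for example with SELECT job_id FROM job_status WHERE status = 2), and filters out duplicates so that a job seen by both the catch-up query and a notification is handled once. JobInbox is a hypothetical helper, and it assumes each job_id is unique and reaches status 2 only once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: JobInbox is a hypothetical helper. The caller is assumed to
// supply job ids from a catch-up query and from notification payloads.
final class JobInbox {

    private final Set<String> seen = new HashSet<>();

    // Returns true the first time a job id is offered, false for repeats.
    synchronized boolean offer(String jobId) {
        return seen.add(jobId);
    }

    // Reduces a batch of job ids to those that were not seen before,
    // keeping their original order.
    List<String> accept(List<String> jobIds) {
        List<String> fresh = new ArrayList<>();
        for (String id : jobIds) {
            if (offer(id)) {
                fresh.add(id);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        JobInbox inbox = new JobInbox();
        // Job ids returned by the catch-up query after a restart.
        System.out.println(inbox.accept(List.of("job_23456", "job_23457"))); // [job_23456, job_23457]
        // A notification for a job already picked up by the query is skipped.
        System.out.println(inbox.accept(List.of("job_23457", "job_23458"))); // [job_23458]
    }
}
```

The set of seen ids grows without bound in this sketch; a longer-running listener would need to trim it or record processed jobs in the database instead.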