Small golang server to push PgSQL listen/notify events into websockets

Primary language: Go. License: Apache License 2.0 (Apache-2.0).

pg_eventserv

A PostgreSQL-only event server in Go. It does one thing and one thing only: it takes events generated by the PostgreSQL NOTIFY command and passes the payload along to waiting WebSocket clients.

Setup and Installation

Download

Builds of the latest code:

Source

Download the source code and build:

make build

Basic Operation

The executable reads user and connection information from the DATABASE_URL environment variable and connects to the database. Any client with HTTP access to the server can then connect and set up a WebSocket that listens to any allowed channel on the server.

Linux/MacOS

export DATABASE_URL=postgresql://username:password@host/dbname
./pg_eventserv

Windows

SET DATABASE_URL=postgresql://username:password@host/dbname
pg_eventserv.exe
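A malformed DATABASE_URL is an easy mistake to make, so it can help to check the value before starting the server. The following is a minimal sketch, assuming a Node.js or browser environment with the standard URL class. The helper name describeDatabaseUrl is hypothetical and is not part of pg_eventserv.

```javascript
// Sketch: sanity-check a DATABASE_URL value before starting the server.
// describeDatabaseUrl is a hypothetical helper, not part of pg_eventserv.
function describeDatabaseUrl(value) {
  const url = new URL(value); // throws a TypeError on malformed input
  if (url.protocol !== "postgresql:" && url.protocol !== "postgres:") {
    throw new Error("unexpected scheme: " + url.protocol);
  }
  return {
    user: decodeURIComponent(url.username),
    host: url.hostname,
    port: url.port || "5432", // 5432 is the PostgreSQL default port
    database: url.pathname.replace(/^\//, ""),
    hasPassword: url.password !== "", // report presence only, never the value
  };
}
```

Calling describeDatabaseUrl("postgresql://username:password@host/dbname") returns the user, host, port and database name without echoing the password.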

Client Side

Once the service is running, you need a client to attach a web socket to it. You can use the built-in viewer at http://localhost:7700/ for testing, but you will eventually need to build one into your client application.

For example, here is a very simple JavaScript client:

<!DOCTYPE html>
<html lang="en">
  <body>
    <p><textarea id="display" rows="20" cols="60"></textarea></p>
    <p id="status"></p>
    <script>
      window.onload = function() {
        // events on channel 'people'
        var url = "ws://localhost:7700/listen/people";
        var status = document.getElementById("status");
        var display = document.getElementById("display");
        // open socket and connect event handlers
        var ws = new WebSocket(url);
        ws.onopen = function() {
          status.textContent = "Socket open.";
        };
        ws.onerror = function(error) {
          status.textContent = "Socket error.";
        };
        ws.onmessage = function (e) {
          // Try to parse the message as JSON and pretty-print it.
          // If parsing fails, show the raw payload instead.
          var text;
          try {
            text = JSON.stringify(JSON.parse(e.data), null, 2);
          }
          catch (err) {
            text = e.data;
          }
          // A textarea's content is set through .value, not .innerHTML.
          display.value += text + "\n";
          display.scrollTop = display.scrollHeight;
        };
        ws.onclose = function(event) {
          status.textContent = "Socket closed.";
        };
      }
    </script>
  </body>
</html>

You can also test the service by pointing your browser at the service test page, http://localhost:7700/, and entering one or more channel names.

Raising a Notification

To send a message from the database to the web socket client, connect to your database. Then run the NOTIFY command:

NOTIFY channelname, 'message to send';

You can also raise a notification by running the pg_notify() function.

SELECT pg_notify('channelname', 'message to send');
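PostgreSQL limits the size of a NOTIFY payload: in the default configuration it must be shorter than 8000 bytes. If your application builds payloads outside the database, a size check along these lines may be useful. This is a sketch that assumes the payload is encoded as UTF-8. The names NOTIFY_PAYLOAD_LIMIT, notifyPayloadBytes and fitsInNotify are hypothetical.

```javascript
// PostgreSQL's documented default: the payload must be shorter than 8000 bytes.
const NOTIFY_PAYLOAD_LIMIT = 8000;

// Hypothetical helper: count the UTF-8 bytes in a payload string.
function notifyPayloadBytes(payload) {
  return new TextEncoder().encode(payload).length;
}

// Hypothetical helper: true when the payload is under the default limit.
function fitsInNotify(payload) {
  return notifyPayloadBytes(payload) < NOTIFY_PAYLOAD_LIMIT;
}
```

Note that the limit is measured in bytes, not characters, so text with multi-byte characters reaches it sooner than its length suggests.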

Troubleshooting

To get more information about what is going on behind the scenes, run with the --debug command-line parameter, or turn on debugging in the configuration file:

./pg_eventserv --debug

Configuration Using Environment Variables

Any parameter in the configuration file can be overridden at run-time in the environment. Prepend the upper-cased parameter name with ES_ to set the value. For example, to change the HTTP port using the environment:

export ES_HTTPPORT=7777
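The naming rule can be written as a one-line function. This is only an illustration of the rule as stated above. The helper name envVarName is hypothetical, and the parameter spelling "HttpPort" is an assumption inferred from the ES_HTTPPORT example.

```javascript
// Hypothetical helper: derive the environment variable name for a
// configuration parameter by upper-casing it and prepending "ES_".
function envVarName(parameter) {
  return "ES_" + parameter.toUpperCase();
}

// "HttpPort" is an assumed spelling of the parameter behind ES_HTTPPORT.
console.log(envVarName("HttpPort"));
```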

Operation

The purpose of pg_eventserv is to take events that are generated in the database and make them accessible to web clients. The general idea is to instrument the database model to capture the events that are of interest to your application, so you don't need to build that orchestration somewhere else. Databases have lots of logic and filtering smarts, and generating the events of interest inside the database can simplify development of event-driven applications.

Listening to a Channel

You can listen to any channel allowed by the service (the default is to open all channels, but that can be limited with the Channels configuration option) by opening a WebSocket connection with the following URL pattern.

ws://{host}:7700/listen/{channel}

Once the channel is open, every NOTIFY command in the database that references the channel will cause a WebSocket message to be sent to all clients listening to the channel.
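A client can build the listen URL from the pattern above with a small helper. This is a sketch: the function name listenUrl is hypothetical, 7700 is the port used in the examples in this document, and percent-encoding the channel name is a precaution whose handling by the server is not confirmed here.

```javascript
// Hypothetical helper: build a listen URL following the
// ws://{host}:7700/listen/{channel} pattern.
function listenUrl(host, channel, port = 7700) {
  // encodeURIComponent leaves plain names such as "people" unchanged.
  return "ws://" + host + ":" + port + "/listen/" + encodeURIComponent(channel);
}

console.log(listenUrl("localhost", "people"));
```

In a browser, the result can be passed directly to new WebSocket(...).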

Simple Data Notification

The most basic form of event is a change of data, such as an insert or an update. Here's an example that generates a new event for every insert and update on the table.

CREATE TABLE people (
    pk serial primary key,
    ts timestamptz DEFAULT now(),
    name text,
    age integer,
    height real
);

CREATE OR REPLACE FUNCTION data_change() RETURNS trigger AS
$$
    DECLARE
        js jsonb;
    BEGIN
        -- Convert the new row to JSON and record which operation fired.
        js := to_jsonb(NEW);
        js := jsonb_set(js, '{dml_action}', to_jsonb(TG_OP));
        -- Send the JSON text on the 'people' channel.
        PERFORM pg_notify('people', js::text);
        RETURN NEW;
    END;
$$ LANGUAGE plpgsql;

-- CREATE OR REPLACE TRIGGER requires PostgreSQL 14 or later.
CREATE OR REPLACE TRIGGER data_change_trigger
    BEFORE INSERT OR UPDATE ON people
    FOR EACH ROW
        EXECUTE FUNCTION data_change();

Install the function and trigger, open a web socket on the people channel (using the service front page, for example), then run some data modifications.

INSERT INTO people (name, age, height) VALUES ('Paul', 51, 1.9);
INSERT INTO people (name, age, height) VALUES ('Colin', 65, 1.5);

Filtered Data Notification

Sending every data modification event is less interesting than sending events only when some condition holds. For example, we might only want to raise events when the new data indicates a height of 2.0 or greater.

CREATE OR REPLACE FUNCTION data_change() RETURNS trigger AS
$$
    DECLARE
        js jsonb;
    BEGIN
        -- Only notify when the new row meets the height threshold.
        IF NEW.height >= 2.0
        THEN
            js := to_jsonb(NEW);
            PERFORM pg_notify('people', js::text);
        END IF;
        RETURN NEW;
    END;
$$ LANGUAGE plpgsql;

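Then send in some data changes that pass the filter and others that don't. Here the first update (height 1.5) is filtered out, while the second update and the insert raise events.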

UPDATE people SET name = 'Shorty', height = 1.5 WHERE age = 51;
UPDATE people SET name = 'Bozo', height = 2.1 WHERE age = 51;
INSERT INTO people (name, age, height) VALUES ('Stretch', 33, 2.8);
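To check your expectations before watching the socket, the filter condition can be mirrored in a few lines of client-side code. This is a sketch only: the real filtering happens in the database trigger, and the names HEIGHT_THRESHOLD and wouldNotify are hypothetical.

```javascript
// Mirror of the trigger condition NEW.height >= 2.0 (illustration only).
const HEIGHT_THRESHOLD = 2.0;

// Hypothetical helper: true when a row would raise a notification.
// A null height yields false here, matching the SQL IF on a NULL value.
function wouldNotify(row) {
  return row.height >= HEIGHT_THRESHOLD;
}

// The new row values produced by the three statements above.
const changes = [
  { name: "Shorty", height: 1.5 },
  { name: "Bozo", height: 2.1 },
  { name: "Stretch", height: 2.8 },
];

const notified = changes.filter(wouldNotify).map((row) => row.name);
console.log(notified); // logs the names Bozo and Stretch
```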