A PostgreSQL-only event server in Go. It does one thing and one thing only: it takes events generated by the PostgreSQL NOTIFY command and passes the payload along to waiting WebSocket clients.
Builds of the latest code:
Download the source code and build:
make build
The executable reads user and connection information from the DATABASE_URL environment variable and connects to the database. Any client with HTTP access to the server can then connect and open a WebSocket that listens to any allowed channel on the server.
Linux/macOS:
export DATABASE_URL=postgresql://username:password@host/dbname
./pg_eventserv
Windows:
SET DATABASE_URL=postgresql://username:password@host/dbname
pg_eventserv.exe
Once the service is running, you need a client to attach a WebSocket to it. You can use the built-in viewer at http://localhost:7700/ for testing, but you will eventually need to build one into your client application.
Here is a very simple JavaScript client, for example.
<!DOCTYPE html>
<html lang="en">
<body>
<p><textarea id="display" rows="20" cols="60"></textarea></p>
<p id="status"></p>
<script>
window.onload = function() {
  // events on channel 'people'
  var url = "ws://localhost:7700/listen/people";
  var status = document.getElementById("status");
  var display = document.getElementById("display");
  // open socket and connect event handlers
  var ws = new WebSocket(url);
  ws.onopen = function() {
    status.textContent = "Socket open.";
  };
  ws.onerror = function(error) {
    status.textContent = "Socket error.";
  };
  ws.onmessage = function(e) {
    // First try to parse the message as JSON and pretty-print it.
    // If parsing fails, fall back to showing the raw text.
    try {
      var payload = JSON.parse(e.data);
      display.value += JSON.stringify(payload, null, 2) + "\n";
    }
    catch (err) {
      display.value += e.data + "\n";
    }
    // keep the newest message in view
    display.scrollTop = display.scrollHeight;
  };
  ws.onclose = function(event) {
    status.textContent = "Socket closed.";
  };
};
</script>
</body>
</html>
You can also test the service by pointing your browser at the service test page, http://localhost:7700/, and entering one or more channel names.
To send a message from the database to the WebSocket client, connect to your database and run the NOTIFY command:
NOTIFY channelname, 'message to send';
You can also raise a notification by running the pg_notify() function:
SELECT pg_notify('channelname', 'message to send');
To get more information about what is going on behind the scenes, run with the --debug command-line parameter, or turn on debugging in the configuration file:
./pg_eventserv --debug
Any parameter in the configuration file can be overridden at run time in the environment. Prepend ES_ to the upper-cased parameter name to set the value. For example, to change the HTTP port using the environment:
export ES_HTTPPORT=7777
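The naming rule can be sketched in JavaScript as follows. This is only an illustration: the helper name envVarFor is hypothetical, and the parameter spelling HttpPort is an assumption inferred from the ES_HTTPPORT example, so check names against your own configuration file.

```javascript
// Sketch of the naming rule: upper-case the configuration parameter name
// and prepend "ES_". envVarFor is a hypothetical helper, not part of
// pg_eventserv.
function envVarFor(paramName) {
  return "ES_" + paramName.toUpperCase();
}

// "HttpPort" is an assumed spelling; upper-casing makes the case irrelevant.
console.log(envVarFor("HttpPort")); // ES_HTTPPORT
```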
The purpose of pg_eventserv is to take events that are generated in the database and make them accessible to web clients. The general idea is to instrument the database model to capture the events that are of interest to your application, so you don't need to build that orchestration somewhere else. Databases have lots of logic and filtering smarts, and generating the events of interest inside the database can simplify development of event-driven applications.
You can listen to any channel allowed by the service (the default is to open all channels, but that can be limited with the Channels configuration option) by opening a WebSocket connection with the following URL pattern.
ws://{host}:7700/listen/{channel}
Once the channel is open, every NOTIFY command in the database that references the channel causes a WebSocket message to be sent to all clients listening to the channel.
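A client might assemble the listen URL with a small helper like the one below. This is a minimal sketch: listenUrl is a hypothetical name, and the default port 7700 is simply the one used in the examples in this document.

```javascript
// Build a WebSocket URL following the pattern ws://{host}:7700/listen/{channel}.
// listenUrl is a hypothetical helper; pass a different port if the service
// has been configured to use one.
function listenUrl(host, channel, port) {
  if (port === undefined) {
    port = 7700;
  }
  return "ws://" + host + ":" + port + "/listen/" + channel;
}

console.log(listenUrl("localhost", "people")); // ws://localhost:7700/listen/people
```

In a browser, the result can be passed straight to the WebSocket constructor, for example new WebSocket(listenUrl("localhost", "people")).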
The most basic form of event is a change of data, such as an insert or an update. Here's an example that generates a new event for every insert and update on the table.
CREATE TABLE people (
    pk serial primary key,
    ts timestamptz DEFAULT now(),
    name text,
    age integer,
    height real
);

CREATE OR REPLACE FUNCTION data_change() RETURNS trigger AS
$$
    DECLARE
        js jsonb;
    BEGIN
        -- Convert the new row to JSON and record which operation fired
        SELECT to_jsonb(NEW.*) INTO js;
        js := jsonb_set(js, '{dml_action}', to_jsonb(TG_OP));
        -- Send the JSON text as the payload on the 'people' channel
        PERFORM pg_notify('people', js::text);
        RETURN NEW;
    END;
$$ LANGUAGE 'plpgsql';

-- CREATE OR REPLACE TRIGGER requires PostgreSQL 14 or later
CREATE OR REPLACE TRIGGER data_change_trigger
    BEFORE INSERT OR UPDATE ON people
    FOR EACH ROW
        EXECUTE FUNCTION data_change();
Install the function and trigger, turn on a WebSocket for the people channel (use the service front page, for example), then run some data modifications.
INSERT INTO people (name, age, height) VALUES ('Paul', 51, 1.9);
INSERT INTO people (name, age, height) VALUES ('Colin', 65, 1.5);
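On the client side, each of these inserts arrives as a JSON string containing the row's columns plus the dml_action key added by the trigger. A handler might summarize it along these lines. This is a sketch only: describeEvent is a hypothetical helper, and the pk and ts values in the sample payload are made up for illustration.

```javascript
// Turn a message from the 'people' channel into a one-line summary.
// Assumes the payload shape produced by the data_change() trigger:
// the row's columns plus a dml_action key.
function describeEvent(data) {
  var row;
  try {
    row = JSON.parse(data);
  } catch (err) {
    // Not JSON (for example a plain NOTIFY string): pass it through unchanged.
    return data;
  }
  if (row === null || typeof row !== "object") {
    // Valid JSON but not a row object: also pass it through unchanged.
    return data;
  }
  return row.dml_action + ": " + row.name +
    " (age " + row.age + ", height " + row.height + ")";
}

// Illustrative payload; the pk and ts values are invented.
var sample = '{"pk": 1, "ts": "2024-01-01T00:00:00+00:00", "name": "Paul", ' +
  '"age": 51, "height": 1.9, "dml_action": "INSERT"}';
console.log(describeEvent(sample)); // INSERT: Paul (age 51, height 1.9)
```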
Sending data modification events is less interesting than sending events when some kind of condition exists. For example, we might only want to raise events when the new data indicates a height of 2.0 or greater.
CREATE OR REPLACE FUNCTION data_change() RETURNS trigger AS
$$
    DECLARE
        js jsonb;
    BEGIN
        -- Only notify when the new row passes the height filter
        IF NEW.height >= 2.0
        THEN
            SELECT to_jsonb(NEW.*) INTO js;
            PERFORM pg_notify('people', js::text);
        END IF;
        RETURN NEW;
    END;
$$ LANGUAGE 'plpgsql';
Then send in some updates that pass the filter and others that don't.
UPDATE people SET name = 'Shorty', height = 1.5 WHERE age = 51;
UPDATE people SET name = 'Bozo', height = 2.1 WHERE age = 51;
INSERT INTO people (name, age, height) VALUES ('Stretch', 33, 2.8);
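To make the expected outcome concrete, the sketch below mirrors the trigger condition NEW.height >= 2.0 in JavaScript and applies it to the new row values from the three statements above. The helper name raisesEvent is hypothetical, and the sketch assumes each UPDATE touches a single row, as it does with the sample data.

```javascript
// Mirror of the trigger condition NEW.height >= 2.0.
function raisesEvent(row) {
  return row.height >= 2.0;
}

// New row values produced by the two UPDATEs and the INSERT.
var newRows = [
  { name: "Shorty", height: 1.5 },
  { name: "Bozo", height: 2.1 },
  { name: "Stretch", height: 2.8 }
];

var notified = newRows.filter(raisesEvent).map(function (row) {
  return row.name;
});
console.log(notified.join(", ")); // Bozo, Stretch
```

Only the Bozo update and the Stretch insert should produce a message on the people channel; the Shorty update is filtered out.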