
Adding subscriptions


In #87 I laid out a timeline for PostGraphQL, but I forgot to add subscriptions to that timeline! Once we’re done writing lots of documentation for PostGraphQL (step 2 in #87), let’s implement subscriptions to make PostGraphQL a viable contender for realtime API needs. I want to open this issue now because I’m not entirely sure how this would work, and I want help designing the feature. I know GraphQL.js has support for subscriptions, and PostgreSQL has LISTEN/NOTIFY commands for building simple pub/sub systems. I’m just not sure how to wire them up.

This is especially challenging because (as I understand it) LISTEN/NOTIFY is not statically typed, so this means events are:

  1. Not discoverable.
  2. Not easily convertible into a statically typed GraphQL schema.

To make things even more tricky, in the default configuration NOTIFY payloads must be shorter than 8000 bytes.


Here are my preliminary thoughts on implementation, but let’s discuss them!

I’ve talked to people before who’ve implemented subscriptions in their GraphQL API and they said that whenever their server would get an update they would rerun the full GraphQL query. Lee Byron also said Facebook did this in the Reactiflux Q&A. The quote is:

the full subscription query is run every time the underlying pubsub system gets triggered

So we’ll do that. This means the sub-8000-byte payload only needs to carry a Relay ID or something similar.

PostGraphQL would LISTEN to the postgraphql channel, where we’d expect information about types and a table primary key from NOTIFYs.
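For illustration, a NOTIFY carrying just enough to identify the record might look like this (the payload shape here is just a strawman, not a settled format):

-- Hypothetical: announce that posts row 42 was updated, staying far below
-- the 8000-byte payload limit by sending only a type and primary key.
SELECT pg_notify('postgraphql', '{"type": "post", "operation": "update", "id": 42}');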

But we still need somewhere to define what kinds of subscriptions PostGraphQL knows about on startup time, so do we define these in a JSON file? In CLI arguments? Is there a way to add metadata to the PostgreSQL database itself? Do we create a postgraphql_subscriptions table and expect users to register subscriptions there? Or would a table like that be completely internal?

Thoughts?

Why do we need additional configs? Can't we discover the channels from the GraphQL subscriptions?

Let's say the client does something like

subscription newMessages($toId: Int!) {
  allMessages(condition: {toId: $toId, seen: false}, orderBy: CREATED_AT_ASC) {
    content
    seen
    fromId
    createdAt
  }
}

This would tell PostGraphQL that there is a channel called newMessages, and that NOTIFYs to this channel should come with a toId in the payload. So it will LISTEN newMessages, and when it gets a NOTIFY it will match the payload against the toId argument provided by the connected clients. If the argument matches, it will run the GraphQL query from the subscription and send the results to the client.
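The publishing side of that contract could be a trigger like this (a sketch; the messages table and to_id column are assumptions matching the query above):

-- Hypothetical trigger: publish to the newMessages channel with the toId
-- routing key that PostGraphQL would match against each client's $toId.
CREATE FUNCTION notify_new_message() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('newMessages', json_build_object('toId', NEW.to_id)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER messages_notify AFTER INSERT ON messages
  FOR EACH ROW EXECUTE PROCEDURE notify_new_message();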

There is a bit of discussion and some examples for inspiration here:
https://scaphold.io/blog/2016/11/09/build-realtime-apps-with-subs.html

Also it could perhaps be beneficial to utilize a pre-made websocket implementation such as https://github.com/apollostack/subscriptions-transport-ws

Slightly unrelated, but still relevant: there’s currently no way to send emails without manually setting up a microservice which subscribes to Postgres events.

An alternative could be having PostGraphQL call out to hooked files...

I plan to work on this feature as I'll be using this module in an upcoming project which will need subscriptions. Have you had any thoughts on the implementation? I'm new to this codebase and would love to contribute a solution.

@RpprRoger Very open to you submitting a spec which we can iterate on before attempting to implement. I keep meaning to do this myself but am barely staying afloat!

I think a good approach to both #92 (comment) and #92 (comment) would be a straightforward way (maybe some sort of sanctioned npm module) to hook into pg events.

I would recommend against using LISTEN and NOTIFY as they are not reliable (fire and forget) and have size limitations. If you need good delivery guarantees (in order, at least once) logical decoding is the way to go.

My first stab at a multi-vendor tool to stream events from a database can be found here:
https://github.com/JarvusInnovations/lapidus

How to emit events that are not "rows":
As of 9.6, pg_logical_emit_message (docs) is available for use in triggers much like NOTIFY is, but with the same delivery guarantees as logical decoding. (JSONCDC does not support these messages yet; however, it's just a few lines of code to add.)
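For reference, emitting such a message is a one-liner (the prefix and payload here are arbitrary examples):

-- Writes a message into the WAL; transactional (first argument true) means
-- it is only delivered to decoding plugins if the transaction commits.
SELECT pg_logical_emit_message(true, 'my_prefix', '{"event": "user_signed_up", "id": 42}');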

Logical decoding allows us to:

  • Handle transactions and rollbacks properly
  • Offer good delivery guarantees (in order, at least once)
  • Scale real-time subscribers independently of the database
  • Scale out using multiple instances of PostgreSQL

We should try to shed load away from the database at all costs, so nchan seems like a good fit here. To protect against unauthenticated DDoS attacks, you really don't want to be doing auth against the database, so putting NGINX/OpenResty in front and using Redis for auth/sessions and pub/sub makes good sense in production. (nchan does not require Redis, but it can use it to scale out and persist the configurable message queues.) This is a lot to cover in a summary; however, I assure you, it checks the boxes if you can deal with the dependencies. I feel like building all of this into PostGraphQL would be feature bloat and unwise.

FOOT CANNON WARNING:
If you create a logical decoding slot and forget about it PostgreSQL will keep all WAL files required by that slot. (You will eventually run out of disk space!) I would recommend adding a configurable option that decides what to do in that scenario.
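A quick way to audit for forgotten slots (standard catalog queries, nothing tool-specific):

-- Inactive slots pin WAL indefinitely; check for them periodically.
SELECT slot_name, plugin, active, restart_lsn FROM pg_replication_slots;

-- Drop a slot you no longer need so the retained WAL can be recycled.
SELECT pg_drop_replication_slot('forgotten_slot_name');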

PaaS and hosted environments:
Depending on your environment, you may or may not be able to install a logical decoding plugin.

Related resources:
https://github.com/slact/nchan
https://github.com/posix4e/jsoncdc
https://github.com/2ndQuadrant/pglogical (out of the box, logical decoding is an "all or nothing" affair, pglogical makes everything better, depending on your use case)

Due diligence?
Should we be suggesting/providing recipes for secure, scalable, production-ready deployments? I'd argue the barrier to entry has been lowered to an extent where it's on the people who know better to help beginners with securing and scaling out. Getting real-time right requires you to do ACL/RLS in a very mindful way that can also be applied to events.

Has anybody tried to build some sort of proof of concept? Do people prefer the solution based on NOTIFY or logical decoding? I’m a big fan of this general idea; it would make this project really versatile for all sorts of use cases.

Would love to see this happening. I'm working on a realtime POC from Postgres -> wherever, and getting subscriptions would really be the killer feature to make it happen.

Given the stated limitations of NOTIFY/LISTEN, especially the need to scale out to multiple databases, it might be wise to implement this as @jmealo described. That said, it would likely be best as a "Subscriptions" graphile-build plugin and this could be disabled if desired.

My current plan is actually to use both. The standard CRUD events would be logical decoding, but you'd also be able to use listen/notify to trigger custom events. These custom events cannot be auto-detected, so my plan is to just have a generic API for them. I've also considered letting the client hook into this generic API so it can trigger events too; I've not 100% figured out the security requirements around that yet (e.g. to prevent DOS) or what the payloads would be (at the moment I'm considering null payloads so you just go in via "query"). I have notes on paper but nothing solid at this point.

(Also I plan to require redis as the pub/sub mechanism and to effectively relay notifications through it from the DB listen/notify; would this cause anyone issues? I was thinking if a redis server was not specified we'd simply disable subscriptions. This also allows us to add caching and cache-eviction in the future.)

I would prefer a solution without a Redis server. Systems with fewer dependencies are always better.


I think it is a good idea to use Redis, as it is the standard for GraphQL subscriptions right now and works well for custom servers. Trying to run without Redis or something similar will potentially make it very hard to scale such a solution.

@MrLoh @ansarizafar @benjie @chadfurman @simplesteph @arnold-graf @JesperWe @RpprRoger: Would you be interested in forming a working group regarding real-time event streaming from PostgreSQL? (Admittedly, I'd like this to be "frontend" agnostic, meaning it will work with postgraphql, postgrest, or your own APIs.)

@ansarizafar: I went the Lapidus/Socket.io (tiny sidecar service) route, and a lot of the things I was doing to avoid Redis created a tremendous amount of WAL writes and "churn" (this is bad, especially on IO-constrained VMs). I applaud keeping dependencies and complexity down, but it's only superficially simpler. I had a Koa-based API that had all "lookup tables" cached in memory. Sure, everything was super fast, but... you'll find your caching code starts to approach the size of your application code and you'll say to yourself "wow, all of this to avoid Redis?!" ... YMMV

I have done a ton of R&D and experiments weighing the pros and cons of adding dependencies. I'd really like to get some feedback on what kind of messaging guarantees you guys want and how you handle auth/sessions/caching. I'm not as familiar with GraphQL subscriptions as I am with "all you can eat firehose" streaming from your database but I'd assume the same mechanics apply.

TL;DR: For production usage, don't put your sessions/auth/pub-sub in PostgreSQL; it will not allow you to scale features independently or protect your production data. You'll be stuck in "throw hardware at it" or refactor mode. PG10 lets you shard/replicate so you could manage a bunch of separate PostgreSQL servers... or you could just add Redis now and have a polyglot stack where you're not backed into a corner✝.

✝ If this is a small project and security is not an issue and you want to keep things simple, disregard this advice :). Do not over engineer. If you plan on using some Go, Node, Python, Rust and making a stack out of open source software I definitely think it's worth doing your sessions and auth outside of your application stack so you have flexibility moving forward.

  • Auth/Sessions outside of PostgreSQL = Protects your DB, is "required" for a truly secure RLS implementation that uses sessions
  • Pub/Sub outside of PostgreSQL = Allows you to scale pub/sub (real-time) separately from your database.

@MrLoh: @benjie:

If you do an internal subrequest in NGINX it doesn't have the overhead of a network connection. Managing pub/sub, auth and sessions from within NGINX really helps keep latency and resource usage down.

If you keep sessions and auth outside, you can use a security definer function and prevent any privilege escalation (even with arbitrary SQL queries!!) with a properly configured database and RLS rules. The nice thing is, if you don't have a monolithic stack, or it's polyglot, you can use a single real-time, sessions and auth mechanism separate from your app code (it's really great, it just feels right).

If you want to bind SSR to a "materialized view" (pre-rendered web content, not the DB kind; though if you want automatically refreshing materialized views, I've done that too, but a refresh requires the entire "table" to be rewritten, which will thrash your WAL, so it's better to write a routine that only rewrites the rows that have changed, which is actually really trivial the way I've done it), these patterns make it so you never touch your database except to do mutations. If you're in an event-only system and want to do CQRS, or you're into mostly append-only tables, this is as close as I could get us to developer nirvana.

That being said, if I had a bit more time, I believe you could do all of this in Tarantool and get really great performance too... HOWEVER, my goal prior to discovering Tarantool was to release a lapidus-type plugin for each language so you could keep your data in memory (whether that be node, php-fpm, or some kind of shared memory slab).

I'm definitely interested in using a shared solution rather than reinventing the wheel! At the moment I don't have time to join a working group on this though - my priority at the moment is to get v4 out.

What's the quickest path to have subscription support? Websockets in a plugin?

we'd also want to make sure the session token is carried over: https://stackoverflow.com/questions/4753957/socket-io-authentication

Which means we'd likely want to send our auth token as a cookie (header auth won't work with sessions). Might as well make it HttpOnly and SameSite while we're at it (if not by default, then with flags). There's a ticket for exactly this:

#501

I'm working on #501 next week, will send a PR with what I come up with. @benjie has been very helpful in guiding me there.

You can use socket.handshake.headers to extract the authorization header; websockets send standard HTTP headers as part of the negotiation before upgrading to the websocket protocol. Cookies come with their own set of CSRF and similar issues, so if we go that way we will need to ensure those issues are addressed.

(Not that JWTs aren't problematic! But I'm planning to deprecate them in v4 and replace with a largely compatible yet more secure alternative. Still in the research phase with this.)

Cookies do not add risk of CSRF if implemented correctly, using SameSite policies and putting your API on a subdomain, for example. Moreover, HttpOnly cookies protect the JWT from XSS attacks.

Also, double-submit tokens that the API generates and the client submits twice: once in the body of the request, and once in the HttpOnly, SameSite signed cookie that the API set. This means that an attacker triggering a CSRF could potentially include the double-submit in the body of their request, but getting it into a signed, SameSite cookie is going to be very, very difficult.

It's not uncommon to run APIs on a separate subdomain (or even domain) and allow multiple parties to interact with them; this is in part why we have the --cors option in PostGraphQL. Fortunately, with the new pgSettings we can let the user implement the authentication solution that works for them (e.g. SameSite cookies). So long as the subscription features hook into this (which they should be able to), we can reuse whatever logic is used for regular requests with subscription requests.

The goal here is to have postgraphile generate the "subscription" GraphQL schema as well as create and maintain the subscription channel. Moreover, we need a way for a trigger in the database to emit an event to the browser clients.

For super-simple custom subscriptions, I'd be fine with pushing a simple eventName down a pipe and letting the client-side fetch what it needs with a separate query after that. Certainly not where we want to end up, but might be a quick and easy stop-gap until we get there.

Yeah, that's where I plan to start, just generic subscription infra with a field name like "subscribe" that you can pass a string key to, and optionally a "publish" mutation that does the reverse for peer-to-peer pub/sub.

I'm not sure how to limit the subscription messages to only the people who are meant to be able to see them; I was thinking of running the subscription name through a postgres function at the time of subscription for a yes/no, and then periodically (at a user-defined period, which could be set to never) revalidating.

Another alternative I'm considering, and in fact leaning towards, is writing the subscriptions to a subscription table (id, topic_name, user_id, subscribed_since) such that the system (a trigger or whatever) could delete the row from the table, which effectively revokes access to that subscription. That way a logout/kick from channel/password reset/unfriend could take immediate effect.
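A rough sketch of that table (column types are just my first guess):

-- Deleting a row here would immediately revoke the matching subscription.
CREATE TABLE subscriptions (
  id serial PRIMARY KEY,
  topic_name text NOT NULL,
  user_id integer NOT NULL,
  subscribed_since timestamptz NOT NULL DEFAULT now()
);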

Does anyone want to weigh in on this?

I like the table idea, which might also be able to help through RLS

I also feel a postgres function to "direct" the message (on publish) would be useful. Something custom so I can say, "msg came from user 2 who is in room 3 so everyone in room 3 gets this message" or "msg came from admin, allow it." vs "non-admin said 'start' but is not authorized"

I guess I'm thinking more like a postgres event emitter that clients can subscribe to, rather than p2p.

Implicit versus Explicit Subscriptions
Do you want to allow clients to subscribe and then check if they're allowed to subscribe OR do you want to have the subscriptions implicit?

Implicit subscriptions meaning "this student in Math-07 gets all Math-07 course messages, messages directed to themselves, messages for the Class of 2018, messages for their campus".

Instead of writing code to check ACL against incoming subscriptions, write code that decides what subscriptions the user should have and automatically manage those subscriptions***.

*** For heavy UIs/views, I recommend binding the data to the view and then subscribing to the view. You can combine this with the packages I've mentioned above (Ledge in particular) to push down SSR HTML or populate an external cache prior to the websocket notification, shielding your DB from a bunch of cold render calls. You're free to push down JSON too, but you're not limited to it.

If you control the real-time notifications, even in a distributed system you can make sure not to send down links to a "cold" URL, pre-warming the cache before sending the notification allows for 100% cache hits and better overall responsiveness.

So, in many cases, rather than you having client code know to subscribe to various things and then server code to check whether they're allowed to subscribe... just handle all subscription logic on the server and have the client handle the messages it receives.

Routing messages to users in a stateless service using only a database row
If your goal is to make a stateless service you have to be able to "route" messages to the correct user. Sometimes that'll be available as a column. In advanced cases you may be doing writes to several tables at which point you need to consume all of the table changes in a transaction (in order to be stateless/not make DB queries to determine the destination users).

I've gone both routes, and as long as your schema is "user-addressable" (meaning any mutation that should notify users has one or more columns identifying who should receive that change), you're good.

Keep in mind, if privacy or subscriptions are part of your application logic, think about how you want to implement that so that it covers your "real-time/pubsub" and graphql/rest access (RLS).

Writing RLS rules and constraints with re-use in mind
If you can write your RLS rules in the same language as your subscription server (let's say you use PLV8 for your constraints and RLS rules, and you use Lapidus + Socket.IO) you can re-use some logic.

Using introspection (to get the constraints/rls rules that correspond to tables/columns, etc) you should be able to automatically do the plumbing with less code than you'd imagine.

Hopefully this is helpful, I'd love to hear more concrete use cases.

Session/Connection versus User (Person)
I would like to point out that addressing to the active sessions/sockets/people versus the user is very different. Multiple tabs are multiple connections unless you use a connection sharing solution (usually local storage for a lock, then window.postMessage for coordination). Consider the use case of one tab subscribing to a view (dashboard maybe?)... that's a connection/session level subscription versus a person one.

Avoid complexity at all costs -- keep it stateless if you can
I'd recommend making pub/sub so fast/reliable that you avoid trying to optimize for it in code; it's a slippery slope. It's not hard to do though: even restoring from pg_dump doesn't crash Lapidus, but V8 garbage collection can be rough sometimes, so I'm working on some LuaJIT/Rust/Go code to get better performance characteristics.

Generic WAL Messages
This will allow you to send durable messages from a trigger.

nchan
nchan will allow you to publish messages from your backend code using HTTP headers (X-Accel-Redirect style).

Keep in mind that PostGraphQL knows nothing of your user system/etc so any system we implement has to be generic enough that it fits almost all use cases whilst still being efficient/performant.

I'd like to see concrete examples, in terms of GraphQL subscription queries and the SQL calls they trigger/events they listen for, of the different things you outline; for example, how do you see implicit subscriptions working in terms of GraphQL subscriptions, and what calls indicate which connections to send them to?

BTW, I see any SSR renders being pushed down as outside the scope of the project.

For explicit subscriptions, @benjie, would it make sense for postgraphile to support a "subscriptions" schema (with a cli flag to rename it)? The tables in the schema could be exposed as subscriptions, and the RLS policies on those tables would determine which users would be notified when a new event is "inserted" into the table (as well as which users could insert events). The actual event would be the GraphQL / JSON of the record.

@jmealo implicit subscriptions sound super sexy cool. For implicit subscriptions, users could be subscribed to all tables in a "subscriptions" schema, with RLS sorting out who gets what?

@jmealo w.r.t. PLV8: many custom extensions are not available in Amazon RDS. PLV8 v1.4.4 is supported, so this is a moot point here; maybe just worth noting for others reading this thread and thinking of other extensions?

Not sure about nchan or generic WAL messages -- don't know enough to comment.

There was talk about Redis at one point -- If our server is behind a load balancer, using something like socketio-redis will be important.

I agree that SSR pushes and UI bindings are no more than a documentation task

Chad, how do you imagine us leveraging the RLS policies at event time? Concrete examples would be useful - are you suggesting writing to the table for the event and then for each subscriber reading from the table to determine if they can see the event or not; or do you have a more efficient implementation in mind?

@benjie Good catch. I was thinking write -> trigger read -> trigger delete. Wasn't thinking about efficiency.

I took a step back this time and re-read http://graphql.org/blog/subscriptions-in-graphql-and-relay/

They're using event-based subscriptions (i.e. explicit) because live queries (i.e. implicit?) proved substantially harder to implement.

They also state, in the above docs link, the following:

At Facebook, we send this query to the server at build time to generate a unique ID for it, then subscribe to a special MQTT topic with the subscription ID in it, but many different subscription mechanisms could be used here.

On the server, we then trigger this subscription every time someone likes a post. If all of our clients were using GraphQL, we could put this hook in the GraphQL mutation; since we have non-GraphQL clients as well, we put the hook in a layer below the GraphQL mutation to ensure it always fires.

So their implementation has users query the subscription and then the server establishes a pipe back through MQTT -- we could use MQTT too? If not MQTT, then socketio-redis?

Then we just need a way to trigger those subscriptions. Sending a notification via a mutation would be useful, I can imagine. Also, a client sending a notification directly without going through the database at all would also be quite useful. Both are possible.

In the situation where the client is sending the notification directly, the client calls a mutation which doesn't touch the DB but instead must be programmed as a postgraphile plugin. Perhaps, specifically, a subscription (as opposed to a plugin) which might have a cleaner interface for binding / emitting events. The subscription can, at that point, pass the event directly into MQTT, do "some form of auth", trigger database calls, and more..

In the situation where the stored procedure is triggering the notification, the stored procedure would have to either:

  1. write to a table and RAISE a notification with the table and record ID
    -- I don't like writing to a table, at this point. However, the structure provided by a table schema needs to come from somewhere. Not sure on the message limit for RAISE. Maybe we can make some ugly format string here that can provide schema information along with the event message, but I really don't like that idea at all.
  2. write to a table and NOTIFY
    -- this has the same drawbacks as 1, along with a known message-length limit. However, if #1 is not async and interrupts the flow of the function, then NOTIFY might be better...
  3. write to MQTT directly
    -- is this even possible?
  4. return an EVENT_TYPE like we have a JWT_TYPE. The EVENT_TYPE would contain all information necessary to write to the appropriate subscription.

Also, without writing to the tables I don't think RLS can help us at all. The auth logic will have to be handled in the subscription code, which means either in the database or in MQTT/Redis (where, technically, it should be anyways and so maybe this is part of a larger push)

EDIT:
Mixing authentication in with the subscriptions might be a pre-alpha v2 of this idea. For starts, we should focus on getting persistent messaging established between the client, the server, and other clients in the room. I like the idea of using MQTT as our messaging base -- MQTT supports usernames and passwords and so when we're ready to discuss implementation, the method of delivery is already solved.

This looks like the place to continue get some background on the auth discussion: #40

I'm not too worried right now about the specific technology for handling the pub/sub (listen/notify, redis, mqtt, rabbitmq, etc) I'll probably make this part pluggable, there'll be one "source of truth" and the others (if any) will syndicate into/out of it.

I'm not too worried now about how the events will be triggered (e.g. In custom PostgreSQL events probably via NOTIFY in a hook that syndicates out into the main (e.g. MQTT) pub/sub; in default PG CRUD events probably via logical decoding syndicated into the main pub/sub; in plugins via a direct write to the main pub/sub; in future we might allow users to directly trigger events also (in which case additional filtering may be necessary)).

Here is where I am not sure:


Say we have thousands of people connected and they've all subscribed* to the following subscription:

subscription {
  userUpdated(id: 3) {
    user {
      username
      location
      bio
      followerCount
    }
    query {
      currentUser {
        friendsCount
      }
    }
  }
}

Say also that you're only allowed to see a user if they're your "friend" (we'll skip the mechanic of how you become friends!).

What should happen when the userUpdated:id=3 event is triggered?

* or "attempted to subscribe" as in some of the scenarios the initial subscription will be rejected

Option 1: send it to everyone

We'll run the GraphQL query for every user, and those that aren't allowed to see the user will automatically get user: null as part of their subscription payloads, since the data is blocked by RLS policies.

PRO: simple

CON: information disclosure: it can inform you that user 3 exists and that they updated their bio, even if you were not able to discern these things via other means (the actual data will not be leaked though since it is protected via RLS)

Option 2: filter events through a PostgreSQL function

e.g.

create function filter_event_viewers(event_name text, viewers subscription[])
returns subscription[] as $$
begin
  -- custom logic here: return only the subscription objects
  -- that are allowed to see this event
  return viewers;
end;
$$ language plpgsql;

PRO: more efficient - one function call up front could prevent the need for many queries later

PRO: works for generic subscriptions (e.g. those not using RLS, or even those not sourced from PG!)

PRO: rather than asking "who can see this?" it asks "who out of these people can see this?" which should be a much smaller response (and require less work)

CON: must still be run once for every event, passing in every subscriber to that event (for the current node). Maybe a good use case for PLV8 due to JIT / hot loops though...

CON: complexity of implementation for user

CON: doesn't directly use RLS logic in filter

NOTE: it'd also be possible to implement this via a plugin (i.e. in native JS rather than SQL).

Option 3: check at subscribe time if you're allowed (and revalidate every X seconds)

Like option 2, but only applies at the time you start the subscription (and revalidates if duration since last check is > X seconds; e.g. 5 minutes)

PRO: more efficient than 2

PRO: can reject subscriptions outright up front, so don't even have to maintain a socket for bad actors

CON: information disclosure for X seconds if access is rejected (but the actual data will not be leaked because the query will still be re-executed and the data therein will be protected by RLS; it's just the fact that the event occurred that will be leaked)

Option 4: use a subscriptions table to track subscribers

When the user runs the subscribe query, insert a row into the table; if RLS rejects the write then reject the subscribe query.

Via logical decoding we can detect when the row is deleted (hoping the replication delay isn't too long!) and thus reject the subscription at a later time (e.g. as the result of a trigger elsewhere in the system).

On subscription end we'll delete the row to clean up after ourselves.

PRO: leverages RLS policies

PRO: rejects subscriptions in a timely fashion

PRO: no additional overhead per-event

CON: involves writing to the DB (though only twice per subscription - not per event)

Option 5: something else (suggestions welcome)

Hi,

I like option 2 the best because it seems to always do the right thing from a user perspective, where the other options either send unexpected events (1), send events that are no longer expected (3), or do not send events that were not authorized before but now are (4).

Note for option 2: maybe the implementation could also decide to have a filter function called at subscription time (to avoid registering subscriptions that can never be allowed) if performance becomes a problem.

implementation details:

Could the function look like this instead (or optionally):

create function filter_event_viewer(event_name text, viewer subscription)
returns boolean as $$
begin
  -- custom logic here: return true if the subscription
  -- is allowed to see this event
  return true;
end;
$$ language plpgsql;

to simplify the implementation a bit for the user.

And while it does not directly leverage RLS, a view (or maybe a table maintained with triggers) with the event permissions could be maintained in a private schema, and the filter function would only have to select from it:

select count(*) > 0
from user_updated_event_auth
where viewer = $viewer_id and payload__id = 3

Maybe some documentation could explain how to do that; and in the future we could automatically detect views respecting a naming convention and use them automatically.

Thanks for your feedback 👍

One issue with your proposed simpler function is that it's going to require a function call for each event for each subscriber to that event; so if you had 100 events per second and 100 subscribers per event that'd be 10,000 function calls (each one issuing an SQL query) per second, as opposed to just 100 in my proposed function. A more efficient SQL query might be:

select viewer
from user_updated_event_auth
where viewer = ANY($viewer_ids) and payload__id = 3

Yes, calling the function for every subscriber comes with a performance hit; I'm suggesting adding it only for convenience, while still having the function you proposed. Maybe the small gain is not worth the performance implication and the added maintenance burden.

I like option #1, solely because it's simplest and most intuitive. The other considerations seem a bit like premature optimization.

As you state in option 1, no specific information will be accessible without privs, so it's a working solution.

Do that and add a filter for edge cases and people that over engineer everything. 😉

Simple always wins! It's 80% the reason for the success of this project :)

I like 2 and 4. Rather than passing in a list of subscribers, you have a subscription table and a filter query (not RLS). The filter query selects from the subscriptions table the users who are permitted to be subscribed.

Then, we can have two subscription types -- RLS (slow) and non-RLS (performant, less secure). non-RLS subscriptions would be best run as a special "subscription" role that has "bypassrls" privileges. We can then run the query once and forward the results off to all subscribers that were originally returned by the filter query.

If the user doesn't expect to be relaying more than a few events a minute, it might make sense to use an RLS subscription (user joined the room that I'm subscribed to, for example). In that case, we'd run the query for every returned subscriber and forward the results. These events could then be throttled to one every few seconds.

Alternatively, pick either one for now and implement it then add the other in later if there's enough demand.

relay modern just updated their subscription docs (only somewhat related): https://facebook.github.io/relay/docs/subscriptions.html

More fuel for the fire: http://antirez.com/news/114 (redis now supports 'streams')

@chadfurman: great find. I was able to get generic WAL messages support into JSONCDC: https://github.com/posix4e/jsoncdc

You can send the PostgreSQL WAL messages into Redis while publishing them to NATS (and possibly HTTP), all using text-based protocols, which makes it possible to use shared memory or perform very few allocations.

I was doing this in LuaJIT because the FFI to RapidJSON* blows Node.js out of the water. The improved native support and TurboFan make me interested in giving Node a shot.

*RapidJSON can filter/enforce a JSON schema in C/C++ land without using managed memory, which ends up being the bottleneck here. LuaJIT was 4-12 MB where Node was 80-120 MB.

Near zero-cost real-time support and cache management is a bit of an obsession right now :)

@jmealo will jsoncdc work with Amazon RDS? If so, I'd love to use it. If not, I'll need to find something else. I'm thinking it probably won't work as it's not listed as a compatible extension: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_PostgreSQL.html

@chadfurman: It's not currently supported. I put in a request on 9/27 for support to be added to Core RDS. Aurora doesn't currently support outbound replication (so no logical decoding). I've been very mindful in the past to try to support managed services but I'm concerned this could become a problem over time as it's becoming somewhat limiting.

Hello,
what is the state of subscriptions now? I'm deciding whether or not to use PostGraphQL. Can I use subscriptions now? Thank you

We do not currently have subscriptions; as far as I know no-one has started writing the code for subscriptions. I plan to start working on them once v4 is released - it's top of my priorities post release - I've done a fair amount of research and have a plan in my head but nothing's concrete yet.

Hi Benjie!

Have you decided what mechanisms to use? Would it be postgres triggers?

Probably Logical Decoding, but first I need to find out how well supported it is on the hosting environments the users use. If you want to contribute your environment, please fill out this survey:

https://benjie10.typeform.com/to/ALL9df

(I also plan to support LISTEN/NOTIFY which you can fire through triggers; but that will be less customised)

Hi Benjie! Just started digging into logical decoding. I think LISTEN/NOTIFY would be a good place to start for basic functionality. I am writing a warehouse management app and I basically need to notify clients to refresh the stock when somebody inserts data into the db. So the algorithm is quite simple: data inserted -> trigger -> notify -> listener triggers refetch on the client side. As far as I know nodejs can already listen to pg notifications (http://bjorngylling.com/2011-04-13/postgres-listen-notify-with-node-js.html). I use react-apollo on my client, so a websocket server must be implemented on the server side?
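For the trigger part I'm imagining something like this (a sketch; stock is my table, and the channel name is arbitrary):

-- Insert happens -> trigger fires -> clients listening on 'stock_changed'
-- are told to refetch.
CREATE FUNCTION notify_stock_changed() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('stock_changed', TG_TABLE_NAME);
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Statement-level trigger: one notification per INSERT statement, not per row,
-- so bulk inserts only cause a single client refetch.
CREATE TRIGGER stock_notify AFTER INSERT ON stock
  FOR EACH STATEMENT EXECUTE PROCEDURE notify_stock_changed();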

Yep; listen/notify will be the first thing I implement but it'll effectively just have query on the payload:

subscription {
  listen(channel: "notify_channel") {
    query {
      allFoos { nodes { id title } }
    }
  }
}

(If anyone can think of a better name for listen/channel above, please let me know. Maybe something more like genericSubscribe?)

But we want the CRUD events to also trigger custom events, such as:

subscription {
  stockItemCreated {
    createdStockItem {
      id
      category
    }
  }
}

subscription {
  stockItemDeleted {
    deletedStockItemId
  }
}

These would probably also accept filters so you can be informed only when specific criteria are matched (e.g. you may only care if one of the items in your local store is deleted). We also need to ensure that the items are visible to you otherwise there's little point telling you that a stock item was created but you're not allowed to see it. This is where the complexity lies.

@benjie: @chadfurman: Sorry for dropping off this for so long. I finally dusted it off. Since my aspirations are pretty lofty I'd like to bow out on this ticket and offer you my vision for using PostgreSQL as an application server (zero coding, use the database, get instant pub/sub, work queue, rest and graphql with edge-based template binding with the option of static site or redis cache population on database changes as well as incoming and outgoing webhooks processed inside of nginx): JarvusInnovations/lapidus#4 (comment)

I'm going to try to break out the logical decoding slot management of Lapidus into a node module or possibly a shell wrapper, that might be useful, I'll ping you for feedback if I go that route (I may expose it as a rest api directly from OpenResty and then do a cli-wrapper around that).

Since all of that sounds pretty... ambitious, I'd like to point out I'm just gluing together fairly robust OSS and hopefully providing cli/rest/gui/documentation/examples as time allows.

Exciting! If you don't already have a PostgreSQL work queue, here's one I've been working on with a simple node worker (idea is you can implement the worker in any backend, or even multiple backends that each work different job types):

https://gist.github.com/benjie/839740697f5a1c46ee8da98a1efac218

Good luck with Topsy!

Not sure if this is of any interest to anyone as it's outside the scope of PostGraphQL, but there is a very interesting project at https://github.com/DxCx/graphql-rxjs whereby DxCx is getting RxJS and GraphQL to work together to try and achieve Live Queries. Pretty cool project.

If you, or your company, would like to see subscriptions sooner rather than later then please consider supporting my work via Patreon:

https://www.patreon.com/benjie

(If your company requires some kind of invoice/contract/concrete value exchange then please get in touch and we'll see what we can figure out!)

🙏 Please support my Patreon: https://www.patreon.com/benjie - huge thanks to those of you already supporting it, and a gigantically humongous thanks to one very generous supporter in particular! 🙏

💼 If any companies want to support this work, please get in touch! There's one company already interested - if anyone else wants to join in that will help share the financial load! 💼


Ignoring the CRUD (Logical Decoding) subscriptions for now, I've come up with this idea for how generic LISTEN/NOTIFY subscriptions might work:

GraphQL

The GraphQL interface will need to support generic notifications, so it can't have specific types. We can obviously make query available on subscriptions, but that's not always that useful. However! We can also make the Node interface available on a subscription, and that is useful. We also need a way of filtering the subscriptions so we're not dealing with the fire hose. I propose the following GraphQL schema (approximation):

input ListenFilter {
  key: String!
  value: String
  inValues: [String!]
}

type ListenPayload {
  query: Query!
  node: Node
}

type Subscription {
  listen(
    topic: String!
    filters: [ListenFilter!]
  ): ListenPayload
}

schema {
  query: Query # standard PostGraphile schema
  mutation: Mutation # standard PostGraphile schema
  subscription: Subscription
}

So you might subscribe to a post being created event with something like:

subscription {
  listen(topic: "postCreated", filters: [{key: "targetBlogId", value: "7"}]) {
    node {
      nodeId
      ... on Post {
        id
        title
        body
        authorByAuthorId {
          id
          name
        }
      }
    }
  }
}

PostgreSQL

So to make that work, we need to trigger the events. To do so we'll use PostgreSQL's NOTIFY command (or the pg_notify function).

Channel name

I propose that the "topic" above is the PostgreSQL NOTIFY channel.

To make sure we're only firing the events that we intend, I propose that we prefix the PostgreSQL NOTIFY channel with postgraphile:.

Filters

The filters would run against the PostgreSQL NOTIFY payload which should be a simple JSON string-string object. (I would have gone with a query string, but these are more awkward to construct in PostgreSQL and harder to digest consistently.)

The keys that you specify on the payload are up to you. When performing the comparisons, we'll stringify each value in Node.js-land so it should be safe to pass 'small' integers/floats, but passing arrays or objects as values would be bad. Officially we only support string-string.

If the payload cannot be parsed as JSON then it will be treated as if it were {}; i.e. any filters would fail.

Filters will only be run against keys that do not begin or end with a double underscore, so that we can reserve __*__ keys for internal usage, such as...

Node identification

We also need a way of specifying which node it is. In PostGraphile nodeIDs are quite simple: it's just the base64 encoding of an array comprising the table name and the primary key values; for example this NodeID: WyJ0ZWFtX21lbWJlcnMiLCIxYjE3ODY3Yy03ODM5LTQ4OWQtOWY2Ni1jYmJkMWYyYjQyYjUiLCI4ZGZlMjY3Ni0xNTQ4LTRiM2UtOTk1YS1iYjQ3NzUzNzNkZGMiXQ==

is just a base64 encoding of: ["team_members","1b17867c-7839-489d-9f66-cbbd1f2b42b5","8dfe2676-1548-4b3e-995a-bb4775373ddc"]

Which is the table team_members and the two primary key values team_uuid and member_uuid.

Now, in PostgreSQL json_build_array always puts spaces after the commas, so we can't do the Base64 encoding in PostgreSQL neatly (we could construct the JSON format ourselves using string concatenation, but that's a bit icky), so we'll just pass the JSON down the wire to Node, which can base64 encode it for us.
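You can see the spacing issue directly:

SELECT json_build_array('posts', 42)::text;
-- => ["posts", 42]
--            ^ note the space; a naive base64 of this string would not match
--              one produced from the compact form ["posts",42] in Node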

We should store this value into a key called __node__.

Example

Combining all the above you might have, for example:

CREATE FUNCTION app_private.posts__notify__create() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('postgraphile:postCreated',
    json_build_object(
      'authorId', NEW.author_id,
      'blogId', NEW.blog_id,
      'section', NEW.section,
      '__node__', json_build_array('posts', NEW.id)
    )::text
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql VOLATILE;

CREATE TRIGGER _900_notify_create AFTER INSERT ON posts FOR EACH ROW
  EXECUTE PROCEDURE app_private.posts__notify__create();

Security

Events with no node field

If no __node__ field is specified in the event payload then we will not perform any checks and will simply run the subscription payload using the subscribing user's privileges. Obviously the node field will be null because no __node__ was specified, but there is a small amount of information leakage here in that you know that the event itself fired; e.g. if you listened for postCreated authorId=7 and that event didn't include a __node__ on its payload then you'd be able to infer that author 7 created a post even though you can't necessarily see what that post is (you can also infer that an author with id 7 exists).

If this is a concern then you'd just have to make sure that your pg_notify payloads specify __node__ as in the example above.

Events with node specified

When __node__ is specified we can go one step further and check that the user doing the subscribing is allowed to view that node; e.g. we might run an SQL query with their privileges along the lines of select 1 from table_name where primary_key = $1.

By default this is an index-only scan, so it should be highly performant; once RLS comes into the picture things may get a little more complex, but I still think this is a fairly good solution.

If no rows are returned by the above query then we will not trigger the GraphQL subscription payload, and thus the user will have no idea anything happened and thus no information is disclosed.

If a row is returned then we can continue to execute the rest of the subscription payload using the user's privileges and send the data down to them.
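To make this concrete with the posts example, something like this (a sketch; the friendships table and the jwt.claims.user_id setting are just assumptions about how an app might model access):

-- Example policy: you may only see posts written by your friends.
ALTER TABLE posts ENABLE ROW LEVEL SECURITY;
CREATE POLICY posts_select ON posts FOR SELECT USING (
  author_id IN (
    SELECT friend_id FROM friendships
    WHERE user_id = current_setting('jwt.claims.user_id')::int
  )
);

-- The subscription-time check, run with the subscriber's privileges;
-- zero rows means the event is silently suppressed for this subscriber.
SELECT 1 FROM posts WHERE id = $1;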

Additional protection against bad actors

For good actors I think the above is fairly robust and performant. However bad actors could try and abuse the system to perform DOS-style attacks. For example if they subscribed to postCreated with no filters then for every single post that was created the attacker would be performing an SQL query that invokes RLS. They could potentially set up a number of these in parallel (maybe even thousands) which could cause the system to be overwhelmed as every write to the database would immediately trigger thousands of SQL queries.

If this is a concern then the user could set up validation for any subscription requests (e.g. by writing a graphile-build plugin) that might enforce that certain filters are in place, or that might discard the subscription request outright if the user is not allowed to perform it. This kind of plugin is fairly easy to construct and is very flexible.

Revoking sessions/subscriptions

The above assumes that all queries will be running with the credentials that the user used when they initiated the subscription. If the user logs out (or is forcefully logged out!) but does not terminate the websocket connection then the subscriptions will continue to arrive using their old auth. I've not solved this part yet.


Potentially actively kill the websocket connections every period of time (e.g. 15 minutes or so) to force re-authentication.

__latestOnly__

I'm thinking another reserved property we can add is __latestOnly__: uniqueStringHere - this would signal to the server that if it starts getting overwhelmed with notifications it can drop any notifications on the same channel that have the same value for __latestOnly__ and only deliver the latest one - a way of clearing through the backlog faster.

This might be useful for UPDATE clauses where you don't care about the intermediary changes - you just want to know what it is NOW - in this case the __latestOnly__ value would be something unique and persistent about the record e.g. its primary key.

This would only take effect if that key was specified and its value was non-null; e.g. it wouldn't necessarily make sense to use for INSERT actions because each of those needs to be notified to the client.

I'm not planning to build this right now, but we can add it later without breaking backwards compatibility because __*__ are reserved properties.

When __node__ is specified we can go one step further and check that the user doing the subscribing is allowed to view that node; e.g. we might run an SQL query with their privileges along the lines of select 1 from table_name where primary_key = $1.

Problem: what if the event is informing us that the node was deleted? I think in that case the event would need to come along with __nodeDeleted__: true so we skip the node check (and also skip the node fetch). This does leak information though (i.e. that the record existed, and what its primary keys were). I think this will just have to be a caveat that we accept.

Early on in one of my projects I was using RethinkDB changefeeds for realtime updates. I eventually went back to Postgres and decided to manually trigger subscription events via GraphQL mutations. I came across an interesting article where someone re-implemented changefeeds in Postgres and actually improved performance. There are links to the code in the article. I figured this might be helpful, as it seems like a proven and well-tested application of some of the ideas proposed here.

While there is a difference between @live and subscriptions, I think some of the ideas in the article are useful for building change detection dependencies, which may help the overall efficiency of the subscription system.

I'll admit, the implementation details are a little over my head, but I'm excited to see where this all ends up. Great work!

I haven't read this entire thread, so there might be duplicate information and/or ideas that have already been shot down.

  • I like nChan, but as of a couple of months ago, I ran into some issues when it was used in a Redis cluster. I'm going to revisit it again in a few months, but I'd make sure it's well-tested if going this route.
  • I'm currently using RabbitMQ with STOMP + Web STOMP plugins. It's working really well and I do like having standard AMQP configuration options like routing keys and topics.
  • At first I felt like RabbitMQ would be a bit overkill for this purpose, but if going the route of logical decoding instead of LISTEN/NOTIFY for reliability reasons, then I'd choose RabbitMQ over Redis to keep parity with the reliability theme.
  • One of the main issues is going to be how to bridge from Postgres into AMQP/RabbitMQ. I currently have a small application that subscribes in Postgres and dispatches to a RabbitMQ exchange. It's written in C++ and uses libpq + kqueue (FreeBSD) for efficient listen/notify. It's not my ideal solution, but it's working well currently for my own limited uses.

I'll add more comments as they come to me.

@jmealo Do you have some links that I can read up on as to the reliability guarantees of pg_listen/pg_notify? I'm having trouble finding relevant info. I'd like to look over what's already out there before going through the source lol. Thanks! :)

Probably the best place to read about them is in the docs:

if a NOTIFY is executed inside a transaction, the notify events are not delivered until and unless the transaction is committed

if a listening session receives a notification signal while it is within a transaction, the notification event will not be delivered to its connected client until just after the transaction is completed (either committed or aborted)

If the same channel name is signaled multiple times from the same transaction with identical payload strings, the database server can decide to deliver a single notification only. On the other hand, notifications with distinct payload strings will always be delivered as distinct notifications. Similarly, notifications from different transactions will never get folded into one notification. Except for dropping later instances of duplicate notifications, NOTIFY guarantees that notifications from the same transaction get delivered in the order they were sent. It is also guaranteed that messages from different transactions are delivered in the order in which the transactions committed.

Postgres only sends the notifications to connected clients; it does not cache them, so if a client loses its connection it will not receive the notification.
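You can see those rules for yourself with two psql sessions:

-- Session A:
LISTEN test_channel;

-- Session B:
BEGIN;
NOTIFY test_channel, 'never delivered';
ROLLBACK;                         -- session A receives nothing

BEGIN;
NOTIFY test_channel, 'hello';
NOTIFY test_channel, 'hello';     -- identical payload may be folded into one
COMMIT;                           -- session A receives 'hello' (likely once)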

Ah ok, so it's pertaining to Postgres behavior by design. I misinterpreted and thought the implication was that it sometimes failed to deliver notifications to actively connected clients.

My personal opinion is to bridge from Postgres into Redis or RabbitMQ to queue up the notifications.
Between the two, it comes down to durability/reliability. If notifications for subscriptions don't require delivery guarantees, then I'd probably go with Redis. Otherwise, I'd go with RabbitMQ.

If going with Redis, then nchan would be a great option (provided that delivery is reliable). One potential issue though is that nchan isn't really designed such that clients can directly PUB/SUB to messages from Redis alone (they want you to use the HTTP API). A positive to this approach though is that it's integrated into NGINX, which means that HA is easier because it's decoupled. If an upstream server, like PostGraphile, goes down, the client won't lose the WebSocket connection.

With respect to RabbitMQ, I'm actually working on a plugin that subscribes to notifications from Postgres and publishes them into a RabbitMQ exchange. I currently like this approach, but we'll see how it goes once I'm done with it and have had a chance to see the pros and cons.

I use PUB/SUB for both server-side task queues as well as client notifications. The former requires guarantees while the latter does not. I have both RabbitMQ and Redis clusters going, but the former has more flexibility and reliability for me (I don't want it to drop task events, but client events aren't mandatory), so I'm using that as the underlying service for message storage and delivery.

Anyway, food for thought.

I'm not aware of any issues with LISTEN/NOTIFY other than those pointed out in the manual text; I know in 2010 they were discussing issues that the notifications were not sent across the whole cluster (only to the DB node to which you were connected) but I've yet to determine if this is still the case in 2018 - hopefully they've solved it by now, and since the manual doesn't seem to mention it (or at least I've not found a mention yet) I'm inclined to believe this is the case until proven otherwise. (I'm planning to test this for myself at some point.)

Here's the work queue I use with PostgreSQL / PostGraphile, it uses LISTEN/NOTIFY to get near-instant notification of new jobs, SKIP LOCKED to be able to run the tasks in parallel, and seems to work very stably. I've been running it in various places for about 6 months and no issues yet. You could easily adapt this so that the worker's sole purpose is to move the tasks into a different queue system if you prefer.

https://gist.github.com/benjie/839740697f5a1c46ee8da98a1efac218
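The heart of that pattern is a single claim query; roughly something like this (a generic sketch, not the gist's exact schema):

-- Atomically claim one pending job; concurrent workers skip rows that
-- another worker has already locked instead of blocking on them.
UPDATE jobs SET locked_at = now()
WHERE id = (
  SELECT id FROM jobs
  WHERE locked_at IS NULL
  ORDER BY created_at
  FOR UPDATE SKIP LOCKED
  LIMIT 1
)
RETURNING *;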

Yup, that mailing list thread was the only thing that popped up when I was Googling. I'm also going to talk to some of the committers tomorrow for further feedback.

I've seen your gist before and it looks pretty good :) One issue though for me is that it requires extensions, which usually can't be installed onto hosted providers. The best article I've read thus far on using Postgres as a job queue is this one, which digs into the innards of the implementation and shows benchmarks.

I'm using Celery with RabbitMQ for my task queues and Redis for result storage and caching. I'm not sure what the best approach to PostGraphile subscriptions is going to end up being, but it should generally be independent of the storage system.

Both pgcrypto and uuid-ossp are pretty standard extensions available on all Postgres hosting providers I've looked at (RDS, Heroku, Google, Azure, etc). Though, come to look at it, I don't think it actually needs pgcrypto, and it only uses uuid in one place, to generate a unique name for the queue if one is not given; both extensions could be removed with a minimum of effort. Thanks for the feedback!

The post you reference is from 2015, before SKIP LOCKED was introduced in PostgreSQL 9.5, which improves the situation a bit. It's still not going to be as performant as a dedicated task queue, but it's a good starting point until you need more performance.

@benjie SKIP LOCKED is a godsend; it helps reduce the complexity :)

What's the general status on subscriptions thus far? I figure a quick & current status update would be good to have. The comments on this issue are getting pretty long and some are pretty dated, so it'll save some time.

I'm open to contributing if I can as it's something that would be useful for my own use-cases in the coming few months. I mentioned in #523 that I'm currently using STOMP for push events, but I'd love to be able to use that as a secondary option and have the primary option be GraphQL subscriptions. I'm currently looking at potentially implementing a custom PubSub provider for graphql-subscriptions.

I'm thinking about using gun instead of socket.io for sending messages from the db to the browser and react-native. Does anyone have any experience with this?

I believe that the latest versions of PostGraphile and PostgreSQL now have the primitives required to implement this sensibly. I'm swamped at work right now but I'm cobbling something together for a prototype, I'll share a simple solution should I find one.

Cool 👍 Chat with me on Discord about it and I’ll help you make it a server plugin so you can work on it without forking 🤘

@benjie Is there any way I can use subscriptions with apollo-server? I am using 'postgraphile-apollo-server' with the nestjs framework; it works great for queries and mutations, however I am not sure how to use subscriptions. I can write my own custom subscription but I would like to do it via a postgraphile plugin.

@JeetChaudhari I don't have enough experience with Apollo Server to know for sure; but have you tried creating a Subscription extension with makeExtendSchemaPlugin? Maybe it Just Works ™️?

https://www.graphile.org/postgraphile/make-extend-schema-plugin/

const { makeExtendSchemaPlugin, gql } = require('graphile-utils');

module.exports = makeExtendSchemaPlugin({
  typeDefs: gql`
    extend type Subscription {
      testSubscription: Int
    }
  `,

  resolvers: {
    Subscription: {
      testSubscription: {
        subscribe: () => {
          // return async iterator here
        },
        resolve: d => {
          console.log(d);
          return d;
        },
      },
    },
  },
});

@JeetChaudhari

My current approach is to utilize Apollo schema stitching with link composition.

I have a thin, root GraphQL server that stitches together other upstream APIs (both GraphQL & REST). One of those servers is an Apollo GraphQL server that's dedicated to subscriptions. The root server uses link composition (aka. splitting) to detect whether the incoming request is a subscription and, if so, routes it to the upstream subscription server. Regular queries & mutations go to PostGraphile (or other relevant servers).

@benjie I tried, but I am always getting the following error.

{
  "error": {
    "message": "Subscription field must return Async Iterable. Received: undefined"
  }
}

Maybe there is something wrong with my implementation. I will give it a try with the example you provided and let you know.

@xorander00 Thank you for showing the approach. I will try to get this done through the plugin, but if it takes too much time I will try the approach you provided.

@benjie Thank you, it worked like a charm. I was referring to https://github.com/nestjs/nest/blob/master/sample/12-graphql-apollo/src/cats/cats.resolvers.ts and as per that example I wasn't writing a resolve method, only subscribe. Here is my test subscription plugin.

import { makeExtendSchemaPlugin, gql } from 'graphile-utils';
import { PubSub } from 'graphql-subscriptions';

const pubSub = new PubSub();
let count = 0;
function emit() {
  count++;
  console.log('emit called');
  pubSub.publish('eventName', count);
  setTimeout(() => {
    emit();
  }, 1000);
}

emit();

module.exports = makeExtendSchemaPlugin({
  typeDefs: gql`
    extend type Subscription {
      testSubscription: Int
    }
  `,

  resolvers: {
    Subscription: {
      testSubscription: {
        subscribe: () => pubSub.asyncIterator('eventName'),
        resolve: d => {
          console.log(d);
          return d;
        },
      },
    },
  },
});

Super happy it worked for you! You may want to remove the console.log, I just added that to help you debug 👍

Super excited to announce that we just released 4.4.0-alpha.0 which includes OSS subscriptions and live queries support. To install, use postgraphile@next, and otherwise follow the instructions on the website: https://www.graphile.org/postgraphile/realtime/

I'd love to hear what you think! Come chat in our Discord: http://discord.gg/graphile

Thanks a lot for your hard work 🙂

Finally closing this, one of the oldest open issues in the repo, because 4.4.0 has been out for a while and seems to meet people's needs! 🎉