NFIBrokerage/spear

subscription backpressure (?)


Currently Spear.subscribe/4 emits info messages to the subscriber process with Kernel.send/2 as fast as the messages are received. I don't have any test cases for it, but a simple test in IEx shows messages piling up in the subscriber mailbox until they're flushed (e.g. with flush/0 from the IEx helpers) or otherwise handled. There is currently no backpressure a subscriber could exert to keep its mailbox from being flooded.

A consumer could work around this by reading until it reaches the end of the stream and then subscribing. An implementation involving a GenStage producer which uses Spear.read_stream/3 or Spear.stream!/3 along with t:Enumerable.continuation/0s to catch up based on consumer demand and then subscribes with Spear.subscribe/4 could handle this backpressure pretty well. (See the GenStage.Streamer implementation for inspiration.) That implementation would still suffer from bursty appends to the EventStoreDB stream, though, and could end up with a large mailbox regardless under sustained heavy writes. I'm sure there's some clever work-around to that problem if one can cheaply determine the process mailbox size. (I'm not too familiar with how to interact with the process mailbox.)
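For example, a rough sketch of such a producer, borrowing the suspended-reducer trick from GenStage.Streamer. The module name is hypothetical, and it assumes that passing the last-read Spear.Event as :from to Spear.subscribe/4 resumes after that event and that subscription events arrive as %Spear.Event{} messages; the live phase also skips the demand bookkeeping a real implementation would need:

```elixir
defmodule CatchUpProducer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl GenStage
  def init(opts) do
    conn = Keyword.fetch!(opts, :connection)
    stream_name = Keyword.fetch!(opts, :stream)

    # Suspendable reducer over the lazy read stream (the GenStage.Streamer
    # trick): each call pulls at most the demanded number of events.
    continuation =
      &Enumerable.reduce(Spear.stream!(conn, stream_name, from: :start), &1, fn
        event, {acc, 1} -> {:suspend, {[event | acc], 0}}
        event, {acc, remaining} -> {:cont, {[event | acc], remaining - 1}}
      end)

    {:producer,
     %{conn: conn, stream: stream_name, continuation: continuation, last: :start, live?: false}}
  end

  @impl GenStage
  def handle_demand(demand, %{live?: false} = state) do
    case state.continuation.({:cont, {[], demand}}) do
      {:suspended, {events, 0}, continuation} ->
        events = Enum.reverse(events)
        {:noreply, events, %{state | continuation: continuation, last: List.last(events)}}

      {status, {events, _remaining}} when status in [:done, :halted] ->
        # The read stream is exhausted: we're caught up, so switch to a live
        # subscription resuming from the last event we emitted (assumption: the
        # :from position is exclusive; if inclusive, drop the first duplicate).
        events = Enum.reverse(events)
        from = List.last(events) || state.last
        {:ok, _sub} = Spear.subscribe(state.conn, self(), state.stream, from: from)
        {:noreply, events, %{state | live?: true, last: from}}
    end
  end

  def handle_demand(_demand, %{live?: true} = state) do
    # Once live, events are pushed via handle_info/2. A real implementation
    # would buffer them here and only emit up to the outstanding demand.
    {:noreply, [], state}
  end

  @impl GenStage
  def handle_info(%Spear.Event{} = event, state), do: {:noreply, [event], state}

  # Ignore other subscription notifications for brevity.
  def handle_info(_other, state), do: {:noreply, [], state}
end
```

A consumer stage subscribing to this producer with its own max_demand would make the historical catch-up demand-driven; the bursty-append problem in the live phase remains, as noted above.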

I'm a bit concerned that trying to solve this inside Spear is the wrong approach: doing any sort of GenServer.call/3 from the Spear.Connection would inherently block the connection. If the Spear.Connection process were to GenServer.cast/2 or Kernel.send/2 the message to some intermediary subscription GenServer which then performed the GenServer.call/3, that intermediary GenServer would end up with the full mailbox instead of the subscriber process. Maybe that's acceptable or even some sort of improvement, but if a Spear user wants to implement subscriptions that way, the current emit-style Spear.subscribe/4 already supports building it yourself.
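For concreteness, a minimal sketch of that intermediary, assuming the consumer exposes a handle_call clause for an {:event, event} message; the module name and call shape are made up for illustration and are not part of Spear:

```elixir
defmodule SubscriptionRelay do
  # Hypothetical intermediary: Spear.subscribe/4 targets this process, and each
  # received message is forwarded to the real consumer with a blocking
  # GenServer.call/3. The consumer is throttled to one event at a time, but any
  # unprocessed events now accumulate in this process's mailbox instead.
  use GenServer

  def start_link(consumer), do: GenServer.start_link(__MODULE__, consumer)

  @impl GenServer
  def init(consumer), do: {:ok, consumer}

  @impl GenServer
  def handle_info(event, consumer) do
    # Blocks until the consumer replies, so events are handed over one at a time.
    :ok = GenServer.call(consumer, {:event, event}, :infinity)
    {:noreply, consumer}
  end
end
```

Pointing Spear.subscribe/4 at this relay slows the consumer down to one event per call, but any backlog simply piles up in the relay's mailbox, which is the trade-off described above.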

apparently checking the mailbox size is possible: https://stackoverflow.com/a/7868233/7232773

```elixir
iex> Spear.subscribe(conn, self(), :all, ..
iex> :erlang.process_info(self(), :message_queue_len)
{:message_queue_len, 79}
```
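Given that, one hypothetical way to use it: a catch-up reader could pause before pulling another chunk whenever the subscriber's mailbox grows past some threshold. MailboxCheck and the 500-message threshold below are made up for illustration:

```elixir
defmodule MailboxCheck do
  # :erlang.process_info/2 with :message_queue_len is a cheap, local-only check.
  # Returns true when the subscriber's mailbox has grown past `threshold`
  # (an arbitrary number for illustration), or when the process is dead.
  def under_pressure?(subscriber_pid, threshold \\ 500) do
    case :erlang.process_info(subscriber_pid, :message_queue_len) do
      {:message_queue_len, len} -> len >= threshold
      :undefined -> true
    end
  end
end
```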

It looks like extreme handles this in a similar fashion with its reading subscription: https://github.com/exponentially/extreme/blob/850e0a619f05826a32e0981f57687c7204453c14/lib/extreme/reading_subscription.ex

It reads events in chunks and buffers them until it catches up, then subscribes.