subscription backpressure (?)
Closed this issue · 2 comments
Currently Spear.subscribe/4
emits info messages to the subscriber process with Kernel.send/2
as fast as the messages are received. I don't have any test cases for it but a simple test in IEx shows messages piling up in the subscriber mailbox until they're flushed (i.e. with flush/0
from the IEx helpers) or otherwise handled. There is currently no backpressure a subscriber could exert to ensure that their mailbox doesn't get flooded.
A consumer could work around this by reading up until they are at the end of the stream and then subscribing, and an implementation involving a GenStage which uses Spear.read_stream/3
or Spear.stream!/3
along with t:Enumerable.continuation/0
s to catch up based on consumer demand and then subscribes with Spear.subscribe/4
could handle this backpressure pretty well. (See the GenStage.Streamer
implementation for inspiration.) This implementation would suffer from bursty appends to the EventStoreDB stream, though, and could end up with a large mailbox regardless with sustained large writes to the stream. I'm sure there's some clever work-around to that problem if one can cheaply determine the process mailbox size. (I'm not too familiar with how to interact with the process mailbox.)
I'm a bit concerned that trying to solve this in spear is incorrect: doing any sort of GenServer.call/3
from the Spear.Connection
would inherently block the connection. If the Spear.Connection
process were to GenServer.cast/2
or Kernel.send/2
the message to some intermediary subscription GenServer which would then perform the GenServer.call/3
, that intermediary GenServer would have the full mailbox instead of the subscription process. Maybe that's acceptable or some sort of improvement, but if a Spear user wants to implement subscriptions that way, the current emit-style of Spear.subscribe/4
supports building that yourself.
apparently checking the mailbox size is possible: https://stackoverflow.com/a/7868233/7232773
iex> Spear.subscribe(conn, self(), :all, ..
iex> :erlang.process_info(self(), :message_queue_len)
{:message_queue_len, 79}
it looks like this is handled in extreme in a similar fashion with the reading subscription https://github.com/exponentially/extreme/blob/850e0a619f05826a32e0981f57687c7204453c14/lib/extreme/reading_subscription.ex
it reads events in chunks and buffers them until it catches up and then subscribes