Passing a stream to Flow.from_enumerable is not behaving as expected
frekw opened this issue · 4 comments
Hi!
This might be me misunderstanding how this is supposed to work, but given the following quote from the docs:
The difference between max_demand and min_demand works as the batch size when the producer is full. If the producer has fewer events than requested by consumers, it usually sends the remaining events available.
My intuitive understanding was that the following example would start processing events as soon as they are available. What I'm seeing is that Flow.from_enumerable/2 seems to buffer until it has max_demand events available to send downstream, and only then does processing start. That would mean that if events come in slowly, it might take a very long time for the pipeline to start executing.
Setting max_demand: 1 starts the downstream phases immediately, but if I understand the docs correctly this change shouldn't be necessary, since both demand and data are available for processing. Is this a bug, or am I missing something important?
A full example can be found here: https://github.com/frekw/flow-example
defmodule Example.Queue do
  def start_link(limit) do
    BlockingQueue.start_link(limit, name: __MODULE__)
  end

  def push(x) do
    BlockingQueue.push(__MODULE__, x)
  end

  def to_stream() do
    BlockingQueue.pop_stream(__MODULE__)
  end
end
defmodule Example.Producer do
  require Logger
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, 0, opts)
  end

  def init(state) do
    loop()
    {:ok, state}
  end

  defp loop do
    Process.send_after(self(), :loop, 10)
  end

  def handle_info(:loop, state) do
    Logger.info("pushing: #{state}")
    Example.Queue.push("message-#{state}")
    loop()
    {:noreply, state + 1}
  end
end
defmodule Example.Pipeline do
  require Logger

  def start_link() do
    Logger.info("Starting pipeline")

    Example.Queue.to_stream()
    # This seems to work as I would expect.
    # |> Flow.from_enumerable(max_demand: 1)
    # This waits for an initial 1000 messages before
    # Flow.each starts running.
    |> Flow.from_enumerable(min_demand: 1)
    |> Flow.each(fn x -> Logger.info("handled: #{x}") end)
    |> Flow.start_link()
  end
end
defmodule Example do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      worker(Example.Queue, [:infinity]),
      worker(Example.Producer, []),
      worker(Example.Pipeline, [])
    ]

    Supervisor.start_link(children, strategy: :rest_for_one)
  end
end
I will clarify the docs. The issue is that with streams, it is impossible to know whether an item is taking a long time to compute or whether the next item is simply not available yet. That's precisely why we created GenStage. If you have a stateful enumerable, then it simply won't play well with GenStage. The advice is to write it using GenStage instead.
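The ambiguity described here can be reproduced without Flow. The following is a minimal sketch, not Flow's internal code: it assumes a stream whose elements become available 20 ms apart (the delay and the batch size of 5 are made-up values for illustration). A caller that asks the stream for five elements has no way to distinguish "slow to compute" from "not there yet", so it simply blocks until all five have arrived. This appears to be analogous to what happens when an enumerable-backed producer pulls from a blocking stream to satisfy demand.

```elixir
# Hypothetical slow source: each element takes about 20 ms to become available.
slow =
  Stream.repeatedly(fn ->
    Process.sleep(20)
    System.monotonic_time(:millisecond)
  end)

started = System.monotonic_time(:millisecond)

# Asking for a batch of 5 blocks until all 5 elements exist;
# nothing is handed over after the first element arrives.
batch = Enum.take(slow, 5)

elapsed = System.monotonic_time(:millisecond) - started
IO.puts("got #{length(batch)} items after #{elapsed} ms")
```

With a larger batch size, such as a max_demand of 1000, and a source that produces one item every 10 ms, the same waiting would last on the order of ten seconds before anything reaches the next stage.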
Ok, I'll try implementing it as a GenStage producer instead. Thank you for your help!
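For readers following along, a GenStage producer replacing the BlockingQueue stream might look roughly like the sketch below. It follows the common pattern of keeping a queue of events plus a counter of unmet demand, and dispatching whenever either changes. The names Example.QueueProducer and push/1 are hypothetical, the dependency version is an assumption, and the queue here is unbounded (push/1 never blocks), which differs from the original BlockingQueue. Flow.map is used instead of Flow.each on the assumption that a recent Flow release is installed.

```elixir
# Assumption: a recent Flow release, which pulls in gen_stage as a dependency.
Mix.install([{:flow, "~> 1.0"}])

defmodule Example.QueueProducer do
  use GenStage

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, Keyword.put_new(opts, :name, __MODULE__))
  end

  # Hypothetical replacement for Example.Queue.push/1; asynchronous and unbounded.
  def push(event), do: GenStage.cast(__MODULE__, {:push, event})

  @impl true
  def init(:ok) do
    # State: {buffered events, demand not yet satisfied}
    {:producer, {:queue.new(), 0}}
  end

  @impl true
  def handle_cast({:push, event}, {queue, pending}) do
    dispatch(:queue.in(event, queue), pending, [])
  end

  @impl true
  def handle_demand(demand, {queue, pending}) do
    dispatch(queue, pending + demand, [])
  end

  # Emit as many buffered events as there is pending demand, oldest first.
  defp dispatch(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch(queue, pending, events) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> dispatch(rest, pending - 1, [event | events])
      {:empty, queue} -> {:noreply, Enum.reverse(events), {queue, pending}}
    end
  end
end

defmodule Example.Pipeline do
  require Logger

  def start_link do
    # Subscribes to the already running producer by its registered name.
    [Example.QueueProducer]
    |> Flow.from_stages()
    |> Flow.map(fn x ->
      Logger.info("handled: #{x}")
      x
    end)
    |> Flow.start_link()
  end
end
```

Because the producer emits whatever it has as soon as demand exists, events should reach the downstream stages when they are pushed rather than after a full batch has accumulated. In a supervision tree, Example.QueueProducer would need to start before Example.Pipeline so that the subscription can be established.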
Does this go for [producer_stage1, producer_stage2] |> GenStage.stream |> Flow.from_enumerable, too, where producer_stage1 and producer_stage2 are both GenStage producers?