dashbitco/flow

Passing a stream to Flow.from_enumerable is not behaving as expected

frekw opened this issue · 4 comments

frekw commented

Hi!

This might be me misunderstanding how this is supposed to work, but given the following quote from the docs:

The difference between max_demand and min_demand works as the batch size when the producer is full. If the producer has fewer events than requested by consumers, it usually sends the remaining events available.

My intuitive understanding was that the following example would start to process events as soon as they're available. What I'm seeing instead is that Flow.from_enumerable/2 seems to buffer events until max_demand of them are available before sending anything downstream. This would mean that if events arrive slowly, it can take a very long time for the pipeline to start executing.

Setting max_demand: 1 starts the downstream phases immediately, but if I understand the docs correctly this change shouldn't be necessary, since both demand and data are available for processing. Is this a bug, or am I missing something important?

A full example can be found here: https://github.com/frekw/flow-example

defmodule Example.Queue do
  def start_link(limit) do
    BlockingQueue.start_link(limit, name: __MODULE__)
  end

  def push(x) do
    BlockingQueue.push(__MODULE__, x)
  end

  def to_stream() do
    BlockingQueue.pop_stream(__MODULE__)
  end
end

defmodule Example.Producer do
  require Logger
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, 0, opts)
  end

  def init(state) do
    loop()

    {:ok, state}
  end

  defp loop do
    Process.send_after(self(), :loop, 10)
  end

  def handle_info(:loop, state) do
    Logger.info("pushing: #{state}")
    Example.Queue.push("message-#{state}")

    loop()

    {:noreply, state + 1}
  end
end

defmodule Example.Pipeline do
  require Logger
  def start_link() do
    Logger.info("Starting pipeline")

    Example.Queue.to_stream()

    # This seems to work as I would expect.
    # |> Flow.from_enumerable(max_demand: 1)

    # This waits for an initial 1000 messages (the default
    # max_demand) before Flow.each starts running.
    |> Flow.from_enumerable(min_demand: 1)
    |> Flow.each(fn x -> Logger.info("handled: #{x}") end)
    |> Flow.start_link()
  end
end

defmodule Example do
  use Application
  def start(_type, _args) do
    import Supervisor.Spec
    children = [
      worker(Example.Queue, [:infinity]),
      worker(Example.Producer, []),
      worker(Example.Pipeline, []),
    ]

    Supervisor.start_link(children, strategy: :rest_for_one)
  end
end

I will clarify the docs. The issue is that with streams, it is impossible to know whether an item is taking long to compute or whether the next item is simply not available yet. That's precisely why we created GenStage. If you have a stateful enumerable, it simply won't play well with GenStage. The advice is to write it as a GenStage producer instead.
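A minimal sketch of that advice, replacing the BlockingQueue-backed queue from the example above with a GenStage producer that tracks pending demand and emits events as they are pushed (the module name Example.QueueProducer and its push/1 API are hypothetical, not part of the original example):

```elixir
# Hypothetical sketch: a GenStage producer in place of Example.Queue.
# Events pushed via push/1 are dispatched downstream as soon as there
# is pending demand, so nothing sits buffered waiting for max_demand
# events to accumulate.
defmodule Example.QueueProducer do
  use GenStage

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def push(event), do: GenStage.cast(__MODULE__, {:push, event})

  @impl true
  def init(:ok) do
    # State: {queued events, pending demand}.
    {:producer, {:queue.new(), 0}}
  end

  @impl true
  def handle_cast({:push, event}, {queue, demand}) do
    dispatch(:queue.in(event, queue), demand)
  end

  @impl true
  def handle_demand(incoming, {queue, demand}) do
    dispatch(queue, demand + incoming)
  end

  # Emit as many queued events as there is demand for, keeping
  # the rest (and any unmet demand) in state.
  defp dispatch(queue, demand) do
    {events, queue, demand} = take(queue, demand, [])
    {:noreply, events, {queue, demand}}
  end

  defp take(queue, 0, acc), do: {Enum.reverse(acc), queue, 0}

  defp take(queue, demand, acc) do
    case :queue.out(queue) do
      {{:value, event}, queue} -> take(queue, demand - 1, [event | acc])
      {:empty, queue} -> {Enum.reverse(acc), queue, demand}
    end
  end
end
```

The pipeline would then consume it with Flow.from_stages([Example.QueueProducer]) rather than Flow.from_enumerable, so events flow downstream as soon as they are pushed.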

frekw commented

Ok, I'll try implementing it as a GenStage producer instead. Thank you for your help!

Does this go for [producer_stage1, producer_stage2] |> GenStage.stream |> Flow.from_enumerable, too, where producer_stage1 and producer_stage2 are both GenStage producers?

@lmarlow use Flow.from_stages in that case. :) GenStage.stream also uses the inbox, so it has the same issue once inside Flow.from_enumerable.
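For the two-producer case above, that suggestion amounts to subscribing the Flow directly to the running stages instead of wrapping them in a stream. A sketch, assuming producer_stage1 and producer_stage2 are the pids or registered names of already-started GenStage producers:

```elixir
# Sketch: subscribe a Flow directly to existing GenStage producers,
# avoiding the inbox-based buffering of GenStage.stream/1.
[producer_stage1, producer_stage2]
|> Flow.from_stages(max_demand: 10)
|> Flow.each(fn x -> IO.inspect(x, label: "handled") end)
|> Flow.start_link()
```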