dashbitco/flow

error logged, but works?

isaac-rstor opened this issue · 4 comments

The following code works (test passes) but an error is emitted to log.

defmodule FlowtestTest do
  use ExUnit.Case

  defmodule TestProducer do
    use GenStage

    # stage emits the string "one" continuously.

    def start_link(), do: GenStage.start_link(__MODULE__, :ok)
    def init(:ok), do: {:producer, "one"}
    def handle_demand(demand, state) do
      supply = fn -> state end
      |> Stream.repeatedly
      |> Enum.take(demand)

      {:noreply, supply, state}
    end
  end

  defmodule TestProsumer do
    use GenStage

    # stage takes strings and turns them into atoms.

    def start_link(), do: GenStage.start_link(__MODULE__, :ok)
    def init(:ok), do: {:producer_consumer, :ok}
    def handle_events(elist, _from, state) do
      {:noreply, Enum.map(elist, &String.to_atom/1), state}
    end
  end

  test "test" do
    {:ok, producer} = TestProducer.start_link()
    {:ok, prosumer} = TestProsumer.start_link()
    GenStage.sync_subscribe(prosumer, to: producer, partition: 1)

    assert [:one] == Flow.from_stages([prosumer])
    |> Enum.take(1)
  end

  test "test2" do
    {:ok, producer} = TestProducer.start_link()
    {:ok, prosumer} = TestProsumer.start_link()
    GenStage.sync_subscribe(prosumer, to: producer, partition: 1)

    assert [:one] == Flow.from_stage(prosumer)
    |> Enum.take(1)
  end
end

These errors are emitted (omitting the pretty green success dots):

13:49:19.513 [error] Demand mode can only be set for producers, GenStage #PID<0.190.0> is a producer_consumer
13:49:19.516 [error] Demand mode can only be set for producers, GenStage #PID<0.196.0> is a producer_consumer

The documentation says:

"producers are already running stages that have type :producer or :producer_consumer"

So the error shouldn't be emitted, correct? I can submit a PR for this if my supposition is true.

We will fix this once we allow the demand to be set to accumulate on producer_consumers too, which is the last feature planned in the v1.0 roadmap.

OK =D It turns out Flow is a great tool for making unit tests on GenStages easy to read, even if in prod your GenStages aren't connected by flows.

@isaac-rstor in this case, you can use GenStage.stream. It will provide the same feature set as Flow here (i.e. it allows you to easily consume the stage). For now I will add a note that the stage given to from_stage has to be a producer. Thanks!
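
For reference, here is a minimal sketch of how the prosumer from the test above might be consumed through GenStage.stream instead of Flow. It makes several assumptions that are not part of this thread: it runs as a standalone script via Mix.install (the version requirement is only illustrative), it uses List.duplicate in place of the Stream.repeatedly pipeline, and it passes the `:producers` option so that the demand mode is set on the real producer rather than on the producer_consumer. Whether that option is available, and whether it silences the log message, depends on your GenStage version.

```elixir
# Assumption: run as a script; the version requirement is illustrative only.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule TestProducer do
  use GenStage

  def start_link(), do: GenStage.start_link(__MODULE__, :ok)
  def init(:ok), do: {:producer, "one"}

  # Emits `demand` copies of the state ("one") on every request.
  def handle_demand(demand, state) do
    {:noreply, List.duplicate(state, demand), state}
  end
end

defmodule TestProsumer do
  use GenStage

  def start_link(), do: GenStage.start_link(__MODULE__, :ok)
  def init(:ok), do: {:producer_consumer, :ok}

  # Turns the incoming strings into atoms.
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &String.to_atom/1), state}
  end
end

{:ok, producer} = TestProducer.start_link()
{:ok, prosumer} = TestProsumer.start_link()
{:ok, _subscription} = GenStage.sync_subscribe(prosumer, to: producer)

# Assumption: `:producers` names the stages whose demand mode the stream
# sets on start, so the producer_consumer is not asked to change its mode.
result =
  [prosumer]
  |> GenStage.stream(producers: [producer])
  |> Enum.take(1)

IO.inspect(result)
```

In an ExUnit test the last expression could be wrapped in the same `assert [:one] == ...` form used in the tests above.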

Thanks for the tip! I'd missed GenStage.stream !!