error logged, but works?
isaac-rstor opened this issue · 4 comments
The following code works (test passes) but an error is emitted to log.
defmodule FlowtestTest do
use ExUnit.Case
defmodule TestProducer do
use GenStage
# stage emits the string "one" continuously.
def start_link(), do: GenStage.start_link(__MODULE__, :ok)
def init(:ok), do: {:producer, "one"}
def handle_demand(demand, state) do
supply = fn -> state end
|> Stream.repeatedly
|> Enum.take(demand)
{:noreply, supply, state}
end
end
defmodule TestProsumer do
use GenStage
#stage takes strings and turns them into atoms.
def start_link(), do: GenStage.start_link(__MODULE__, :ok)
def init(:ok), do: {:producer_consumer, :ok}
def handle_events(elist, _from, state) do
{:noreply, Enum.map(elist, &String.to_atom/1), state}
end
end
test "test" do
{:ok, producer} = TestProducer.start_link()
{:ok, prosumer} = TestProsumer.start_link()
GenStage.sync_subscribe(prosumer, to: producer, partition: 1)
assert [:one] == Flow.from_stages([prosumer])
|> Enum.take(1)
end
test "test2" do
{:ok, producer} = TestProducer.start_link()
{:ok, prosumer} = TestProsumer.start_link()
GenStage.sync_subscribe(prosumer, to: producer, partition: 1)
assert [:one] == Flow.from_stage(prosumer)
|> Enum.take(1)
end
end
these error are emitted (omitting pretty green success dots):
13:49:19.513 [error] Demand mode can only be set for producers, GenStage #PID<0.190.0> is a producer_consumer
13:49:19.516 [error] Demand mode can only be set for producers, GenStage #PID<0.196.0> is a producer_consumer
documentation says:
"producers are already running stages that have type :producer or :producer_consumer"
so the error shouldn't be emitted, correct? Can submit a pr on this if my supposition is true.
We will fix this once we allow the demand to be set to accumulate on producer_consumers too, which is the last feature planned in the v1.0 roadmap.
ok =D it turns out flow is a great tool to make unit tests on genstages easy to read, even if in prod your genstages aren't connected by flows.
@isaac-rstor in this case, you can use GenStage.stream
, it will provide the same feature set as flow (i.e. allow you to easily consume it). I will add a note for now that from_stage
has to be producer. Thanks!
Thanks for the tip! I'd missed GenStage.stream
!!