Error in flow source not stopping parent process in a timely manner
juanperi opened this issue · 8 comments
The following is a super simplified version of some code that deals with external APIs.
My problem is that, as you can see, the function inside the stream raises an error, but sometimes the `IO.inspect` calls at the bottom are reached before the process is killed.
```elixir
defmodule Test do
  def run do
    result =
      (fn (page) ->
        IO.inspect page, label: "page"
        raise "ups"
      end)
      |> create_stream
      |> Flow.from_enumerable()
      |> Flow.map(fn(_value) -> %{} end)
      |> Enum.to_list
    IO.inspect "It should not reach here, but sometimes does"
    IO.inspect result, label: "result"
  end

  def create_stream(api_func) do
    Stream.resource(fn -> 1 end, &api_func.(&1), fn _ -> :ok end)
  end
end

Test.run
```
When executing, sometimes I get:
```
$ mix run test.exs
page: 1
"It should not reach here, but sometimes does"
result: []
[error] GenServer #PID<0.502.0> terminating
** (RuntimeError) ups
    test.exs:6: anonymous fn/1 in Test.run/0
    (elixir) lib/stream.ex:1285: Stream.do_resource/5
    (gen_stage) lib/gen_stage/streamer.ex:18: GenStage.Streamer.handle_demand/2
    (gen_stage) lib/gen_stage.ex:2170: GenStage.noreply_callback/3
    (gen_stage) lib/gen_stage.ex:2209: GenStage."-producer_demand/2-lists^foldl/2-0-"/3
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:"$demand", :forward}}
State: #Function<0.55142349/1 in GenStage.Streamer.init/1>
```
If any of the work done by Flow raises, I would expect the whole thing to fail in the `Enum.to_list` call, not afterwards.
Is there anything I can do to stop the processing when running `Enum.to_list`? Shouldn't the process die when calling `Enum.to_list`?
I originally posted this on ElixirForum here, but I'm more and more convinced that it's a bug somewhere.
An even simpler version of the failing code:
```elixir
defmodule Test do
  def run do
    :start
    |> Stream.iterate(fn _ -> raise RuntimeError end)
    |> Flow.from_enumerable(stages: 1, max_demand: 1)
    |> Flow.run
    |> IO.inspect(label: "IT SHOULD NOT REACH HERE. PIPE RESULT:")
  end
end

Test.run
```
It sometimes prints the "IT SHOULD NOT...." message, and sometimes it does not.
After a bit of digging around, it seems that the unexpected behavior happens when the GenStage stream receives `:DOWN` with a `:shutdown` reason. If we remove `Flow.emit(:nothing)` by substituting `Flow.run` with `Enum.to_list`, then the `:DOWN` reason is a runtime error and it correctly exits.
Even using `Enum.to_list`, the problem is reproducible if we add a mapping step:
```elixir
defmodule Test do
  def run do
    :start
    |> Stream.iterate(fn _ -> raise RuntimeError end)
    |> Flow.from_enumerable(stages: 1, max_demand: 1)
    |> Flow.map(fn _ -> :ok end)
    |> Enum.to_list
    |> IO.inspect(label: "IT SHOULD NOT REACH HERE. PIPE RESULT:")
  end
end

Test.run
```
The error happens because each stage runs under a supervisor. This creates a race condition between:
- the supervisor reacting to a child that terminated abruptly
- the stages detecting that all of their parents terminated
There are a couple of ways to fix this, but the easiest is probably to go to every place in Flow where we expect a DOWN message and crash if its reason is not `:normal`, `:shutdown`, or `{:shutdown, _}`.
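A minimal sketch of the check proposed above, assuming a plain monitored process rather than Flow's actual coordinator code. `DownReason`, `clean?/1` and `await/2` are hypothetical names used only for illustration:

```elixir
defmodule DownReason do
  # Hypothetical helper, not Flow's actual code: decides whether the reason
  # carried by a :DOWN message counts as a clean termination.
  def clean?(:normal), do: true
  def clean?(:shutdown), do: true
  def clean?({:shutdown, _}), do: true
  def clean?(_other), do: false

  # Waits for the monitored process to go down. Clean reasons return :ok;
  # any other reason is propagated by exiting with that same reason.
  def await(ref, pid) do
    receive do
      {:DOWN, ^ref, :process, ^pid, reason} ->
        if clean?(reason), do: :ok, else: exit(reason)
    end
  end
end

# Usage: a process that exits abnormally makes await/2 exit with the same reason.
{pid, ref} = spawn_monitor(fn -> exit(:boom) end)

try do
  DownReason.await(ref, pid)
catch
  :exit, reason -> IO.inspect(reason, label: "re-exited with")
end
```

Note that a `:shutdown` reason passes this check, so, as the later comments in this thread explain, it does not catch the case where the supervisor shuts down the producer-consumer with `:shutdown`.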
To be honest, I couldn't fix it by replacing the DOWN handling.
If I'm not mistaken (which I likely am :P), when setting up the supervisor inside Coordinator, we are monitoring the pids of the consumers, not the pids of the producers (and in this case the producer is the one raising). Could that be the reason?
Another thing that catches my attention is that the supervisor is started with:
```elixir
children = [Supervisor.Spec.worker(GenStage, [], restart: :transient)]
Supervisor.start_link(children, strategy: :simple_one_for_one, max_restarts: 0)
```
Why is the restart `:transient` with 0 max restarts? Why not `restart: :temporary`?
@josevalim, DOWN messages are expected only in the Coordinator, and for our test case the DOWN reason is `:shutdown`.
```elixir
@tag :capture_log
test "re-raises errors" do
  assert catch_exit(
           :start
           |> Stream.iterate(fn _ -> raise "oops" end)
           |> Flow.from_enumerable(stages: 1, max_demand: 1)
           |> Flow.run
         )
end
```
Changing the supervisor's restart to `:temporary`, or `:max_restarts` to its default, makes all tests pass. But we have a hard time explaining this.
Also, I tried monitoring the producers in addition to the intermediaries and crashing on the DOWN messages from them (which we do receive), but the crash would only show up in the logs and never actually become an exit. Not sure why.
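The effect of the restart settings can be reproduced with a plain OTP supervisor, without Flow. This is a minimal sketch, assuming `Task` children stand in for the GenStage stages and a `:one_for_one` strategy replaces the `:simple_one_for_one` from the original code; `RestartSketch`, `consumer_fate/1` and the `:crash` message are hypothetical. With `restart: :transient` and `max_restarts: 0`, the first abnormal exit exceeds the restart intensity, so the supervisor terminates its remaining children with reason `:shutdown` and then exits. With `restart: :temporary`, the crashed child is simply not restarted and its sibling keeps running:

```elixir
defmodule RestartSketch do
  # Hypothetical sketch: a crashing "producer" and an idle "consumer" under
  # one supervisor with max_restarts: 0, loosely mirroring the setup above.
  def consumer_fate(producer_restart) do
    # Trap exits so the supervisor exiting does not take the caller down too.
    Process.flag(:trap_exit, true)

    children = [
      Supervisor.child_spec({Task, fn -> wait_for_crash() end},
        id: :producer,
        restart: producer_restart
      ),
      Supervisor.child_spec({Task, fn -> Process.sleep(:infinity) end},
        id: :consumer,
        restart: :temporary
      )
    ]

    {:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one, max_restarts: 0)

    pids = Map.new(Supervisor.which_children(sup), fn {id, pid, _type, _mods} -> {id, pid} end)
    ref = Process.monitor(pids.consumer)

    # Make the producer exit abnormally.
    send(pids.producer, :crash)

    result =
      receive do
        {:DOWN, ^ref, :process, _pid, reason} -> {:consumer_down, reason}
      after
        500 -> :consumer_still_running
      end

    # Stop the supervisor if it is still running (no-op if it already exited).
    Process.exit(sup, :shutdown)
    result
  end

  defp wait_for_crash do
    receive do
      :crash -> exit(:producer_failed)
    end
  end
end
```

In this sketch, `consumer_fate(:transient)` should report the consumer going down with `:shutdown`, which is the same "clean-looking" reason the client sees in the Flow case, while `consumer_fate(:temporary)` should leave the consumer running.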
@epilgrim @kagux, you would also have to change the DOWN handling inside flow/map_reducer.
Here is what is happening when the test fails:
- the producer dies
- the supervisor notices the producer died and kills the producer-consumer with reason shutdown
- the client (the one who called Flow.run) sees the producer-consumer finishing with shutdown and thinks everything went OK
When you add trap_exit, the producer-consumer no longer exits with reason shutdown. Instead, it notices that the producer died and exits with the same reason as the producer, so the failure cascades.
When you change max_restarts or restart, the supervisor no longer sends the shutdown signal, which ends up with a similar result.
With all of that said, I think trapping exits is a good way to go about this. Let's give the stages some time to figure out the failures and then act accordingly.
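The difference trapping exits makes can be sketched with plain processes. This assumes spawned processes stand in for GenStage stages and does not model how GenStage itself handles exit signals; `TrapExitSketch` and its functions are hypothetical names. Without trapping, an external `:shutdown` signal (like the one the supervisor sends) kills the stage outright, so a monitor only ever sees `:shutdown`. With trapping, the producer's crash arrives as an `{:EXIT, pid, reason}` message and the stage can exit with that same reason:

```elixir
defmodule TrapExitSketch do
  # Without trap_exit: the :shutdown signal kills the stage directly,
  # so the monitoring process only learns the reason :shutdown.
  def without_trap do
    {stage, ref} = spawn_monitor(fn -> Process.sleep(:infinity) end)
    Process.exit(stage, :shutdown)
    await_down(stage, ref)
  end

  # With trap_exit: the linked producer's exit becomes a message, and the
  # stage deliberately exits with the producer's reason so the failure cascades.
  def with_trap do
    {stage, ref} =
      spawn_monitor(fn ->
        Process.flag(:trap_exit, true)
        producer = spawn_link(fn -> exit(:producer_failed) end)

        receive do
          {:EXIT, ^producer, reason} -> exit(reason)
        end
      end)

    await_down(stage, ref)
  end

  defp await_down(pid, ref) do
    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    end
  end
end
```

Here `without_trap/0` returns `:shutdown` and `with_trap/0` returns `:producer_failed`, so a caller checking the DOWN reason would only see the real failure in the trapping case.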