dashbitco/flow

Error in flow source not stopping parent process in a timely manner

juanperi opened this issue · 8 comments

The following is a heavily simplified version of some code that deals with external APIs.
My problem is that, as you can see, the function inside the stream raises an error, but sometimes the inspects at the bottom are reached before the process is killed.

defmodule Test do
  def run do
    result =
      (fn (page) ->
        IO.inspect page, label: "page"
        raise "ups"
      end)
      |> create_stream
      |> Flow.from_enumerable()
      |> Flow.map(fn(_value) -> %{} end)
      |> Enum.to_list
    IO.inspect "It should not reach here, but sometimes does"
    IO.inspect result, label: "result"
  end

  def create_stream(api_func) do
    Stream.resource(fn -> 1 end, &api_func.(&1), fn _ -> :ok end)
  end
end

Test.run

When executing, sometimes I get:

$ mix run test.exs
page: 1
"It should not reach here, but sometimes does"
result: []
[error] GenServer #PID<0.502.0> terminating
** (RuntimeError) ups
    test.exs:6: anonymous fn/1 in Test.run/0
    (elixir) lib/stream.ex:1285: Stream.do_resource/5
    (gen_stage) lib/gen_stage/streamer.ex:18: GenStage.Streamer.handle_demand/2
    (gen_stage) lib/gen_stage.ex:2170: GenStage.noreply_callback/3
    (gen_stage) lib/gen_stage.ex:2209: GenStage."-producer_demand/2-lists^foldl/2-0-"/3
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:"$demand", :forward}}
State: #Function<0.55142349/1 in GenStage.Streamer.init/1>

If any of the work done by Flow raises, I would expect the whole thing to fail in the Enum.to_list call, not afterwards.
Is there anything I can do to stop the processing when running Enum.to_list? Shouldn't the process die when calling Enum.to_list?

I originally posted this on ElixirForum, but I'm increasingly convinced that it's a bug somewhere.

An even simpler version of the failing code:

defmodule Test do
  def run do
    :start
    |> Stream.iterate(fn _ -> raise RuntimeError end)
    |> Flow.from_enumerable(stages: 1, max_demand: 1)
    |> Flow.run
    |> IO.inspect(label: "IT SHOULD NOT REACH HERE. PIPE RESULT:")
  end
end

Test.run

It sometimes prints the "IT SHOULD NOT..." message, and sometimes it does not.

kagux commented

After a bit of digging around, it seems the unexpected behavior happens when the gen_stage stream receives :DOWN with a :shutdown reason. If we remove Flow.emit(:nothing) by replacing Flow.run with Enum.to_list, the :DOWN reason is the runtime error and the caller exits correctly.

Even with Enum.to_list, the problem is reproducible if we add a mapping step:

defmodule Test do
  def run do
    :start
    |> Stream.iterate(fn _ -> raise RuntimeError end)
    |> Flow.from_enumerable(stages: 1, max_demand: 1)
    |> Flow.map(fn _ -> :ok end)
    |> Enum.to_list
    |> IO.inspect(label: "IT SHOULD NOT REACH HERE. PIPE RESULT:")
  end
end

The error happens because each stage is in a supervisor. Then we have a race condition between:

  1. the supervisor reacting to a child that terminated abruptly
  2. the stages detecting that all of its parents terminated

There are a couple of ways to fix this, but the easiest is probably to go to every place in Flow where we expect a DOWN message and crash if its reason is not :normal, :shutdown, or {:shutdown, _}.
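
A minimal sketch of that check, outside of Flow. The `DownCheck` module and its function names are hypothetical and only illustrate treating :normal, :shutdown and {:shutdown, _} as clean reasons while re-exiting on anything else. This is not Flow's actual code.

```elixir
defmodule DownCheck do
  # Hypothetical helper, not part of Flow: is this exit reason a clean one?
  def clean?(:normal), do: true
  def clean?(:shutdown), do: true
  def clean?({:shutdown, _}), do: true
  def clean?(_other), do: false

  # Waits for the :DOWN message of a monitored process and exits the
  # caller with the same reason when that reason is not a clean one.
  def await_down(ref, timeout \\ 5_000) do
    receive do
      {:DOWN, ^ref, :process, _pid, reason} ->
        if clean?(reason), do: :ok, else: exit(reason)
    after
      timeout -> exit(:timeout)
    end
  end
end

# A clean shutdown is reported as :ok.
{_pid, ref} = spawn_monitor(fn -> exit(:shutdown) end)
IO.inspect(DownCheck.await_down(ref), label: "clean")

# An abnormal reason is propagated to the caller as an exit.
{_pid, ref} = spawn_monitor(fn -> exit(:boom) end)

result =
  try do
    DownCheck.await_down(ref)
  catch
    :exit, reason -> {:exited, reason}
  end

IO.inspect(result, label: "abnormal")
```

Note that this only helps when the DOWN reason carries the original failure. As the rest of the thread shows, in the failing case the reason is already :shutdown.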

To be honest, I couldn't fix it by changing the DOWN handling.

If I'm not mistaken (which I likely am :P), when setting up the supervisor inside Coordinator we monitor the pids of the consumers, not the pids of the producers (which in this case is the one raising). Could that be the reason?

Another thing that catches my attention is that the code that starts the supervisor has:

    children = [Supervisor.Spec.worker(GenStage, [], restart: :transient)]
    Supervisor.start_link(children, strategy: :simple_one_for_one, max_restarts: 0)

Why is the restart :transient with 0 max restarts? Why not restart: :temporary?

kagux commented

@josevalim, DOWN messages are expected only in the coordinator, and for our test case the reason in the message is :shutdown:

    @tag :capture_log
    test "re-raises errors" do
      assert catch_exit(
        :start
        |> Stream.iterate(fn _ -> raise "oops" end)
        |> Flow.from_enumerable(stages: 1, max_demand: 1)
        |> Flow.run
      )
    end

Changing the supervisor restart to :temporary, or :max_restarts to its default, makes all tests pass, but we have a hard time explaining why.
Also, I tried monitoring producers in addition to intermediaries and crashing on the DOWN message from them (which we do receive), but it would only show up in the logs and not actually cause an exit. Not sure why.

@epilgrim @kagux you would also have to change the DOWN handling inside flow/map_reducer.

Here is what is happening when the test fails:

  1. the producer dies
  2. the supervisor notices the producer died and kills the producer-consumer with reason shutdown
  3. the client (the one who called Flow.run) sees the producer-consumer finishing with shutdown and thinks everything went OK

When you add trap_exit, you make it so that producer-consumer no longer exits with reason shutdown. Instead, it will notice the producer died and exit with the same reason as the producer, leading it to cascade.
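
To illustrate the cascade, here is a small sketch in plain processes, with no GenStage or Flow involved. `TrapExitSketch` and its functions are hypothetical stand-ins: `consumer/0` plays the producer-consumer, the linked process plays the failing producer (it uses `exit/1` instead of `raise` only to keep the log quiet), and `run/0` plays the client that called Flow.run.

```elixir
defmodule TrapExitSketch do
  # Stand-in for the producer-consumer: traps exits, so the death of the
  # linked producer arrives as an {:EXIT, pid, reason} message instead of
  # killing this process outright. It then exits with the same reason.
  def consumer do
    Process.flag(:trap_exit, true)
    producer = spawn_link(fn -> exit(:producer_failed) end)

    receive do
      {:EXIT, ^producer, reason} -> exit(reason)
    end
  end

  # Stand-in for the client: monitors the consumer and returns the
  # reason it observes in the :DOWN message.
  def run do
    {pid, ref} = spawn_monitor(&consumer/0)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    after
      1_000 -> :timeout
    end
  end
end

IO.inspect(TrapExitSketch.run(), label: "reason seen by client")
```

Because the consumer forwards the producer's reason, the client sees :producer_failed rather than :shutdown and can tell that something went wrong.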

When you change max_restarts or restart, you are changing the supervisor to no longer send the shutdown signal, which ends up with something similar.
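
The effect of the restart option can be sketched with a standalone supervisor. This is an assumption-laden demo, not Flow's setup: it swaps in DynamicSupervisor for the Supervisor.Spec / :simple_one_for_one combination quoted earlier, and `RestartSketch` plus the Task-based children are hypothetical. It reports what happens to a healthy sibling after another child crashes.

```elixir
defmodule RestartSketch do
  # Returns what happens to a healthy child after a sibling crashes,
  # for the given restart option (:transient or :temporary).
  def run(restart) do
    {:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one, max_restarts: 0)
    # Unlink so a supervisor shutdown does not take the caller down too.
    Process.unlink(sup)

    {:ok, idle} =
      DynamicSupervisor.start_child(sup, spec(fn -> Process.sleep(:infinity) end, restart))

    ref = Process.monitor(idle)
    {:ok, _crasher} = DynamicSupervisor.start_child(sup, spec(fn -> exit(:boom) end, restart))

    receive do
      {:DOWN, ^ref, :process, ^idle, reason} -> {:sibling_down, reason}
    after
      500 ->
        # The supervisor survived the crash; clean up before returning.
        Process.demonitor(ref, [:flush])
        DynamicSupervisor.stop(sup)
        :sibling_still_alive
    end
  end

  # Child spec for a Task running the given function.
  defp spec(fun, restart) do
    %{id: :worker, start: {Task, :start_link, [fun]}, restart: restart}
  end
end

IO.inspect(RestartSketch.run(:transient), label: "transient, max_restarts: 0")
IO.inspect(RestartSketch.run(:temporary), label: "temporary")
```

With :transient and max_restarts: 0, the abnormal exit exceeds the restart limit, so the supervisor gives up and shuts down its remaining children with reason :shutdown. With :temporary, the crashed child is simply dropped and the sibling keeps running. The crashing Task also logs an error, which is expected.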

With all of that said, I think trapping exits is a good way to go about this. Let's give the stages some time to figure out the failures, and then we act accordingly.