dashbitco/flow

noproc errors - connecting finite GenStage to Flow.partition

sunaku opened this issue · 2 comments

Hello,

I'm using GenStage 0.14.2 and Flow master (at 1ffac6a) under Elixir 1.9.0 and Erlang/OTP 22. I'm encountering :noproc errors when I connect a short-lived GenStage producer to a Flow and then immediately partition that flow. Below is a minimal reproduction (see related issue #88).

In my actual use case, I'm connecting a large (but finite) GenStage producer to a Flow partition with 32 stages.

Thanks for your consideration.

Reproduction steps

  1. Sanity check: everything Just Works when it's Flow-only and GenStage isn't involved. ✔️
  2. GenStage has 3 items, from_stages() has 1 stage, and partition() has 1 stage. ✔️
  3. GenStage has 3 items, from_stages() has 1 stage, and partition() has 2 stages. ✔️
  4. GenStage has 3 items, from_stages() has 1 stage, and partition() has 3 stages. 💥

Erlang/OTP 22 [erts-10.4] [source] [64-bit] [smp:32:32] [ds:32:32:10] [async-threads:1] [hipe]

Interactive Elixir (1.9.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Flow.from_enumerable(1..3) |> Flow.partition(stages: 3) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms); ms end) |> Enum.to_list()
"#PID<0.213.0>: sleep 2"
"#PID<0.203.0>: sleep 1"
"#PID<0.196.0>: sleep 3"
[1000, 2000, 3000]
iex(2)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 1) |> Flow.partition(stages: 1) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms); ms end) |> Enum.to_list()
"#PID<0.231.0>: sleep 1"
"#PID<0.231.0>: sleep 2"
"#PID<0.231.0>: sleep 3"
[1000, 2000, 3000]
iex(3)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 1) |> Flow.partition(stages: 2) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms); ms end) |> Enum.to_list()
"#PID<0.238.0>: sleep 1"
"#PID<0.239.0>: sleep 3"
"#PID<0.238.0>: sleep 2"
[3000, 1000, 2000]
iex(4)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 1) |> Flow.partition(stages: 3) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms); ms end) |> Enum.to_list()
"#PID<0.246.0>: sleep 2"
"#PID<0.247.0>: sleep 1"

16:29:54.396 pid=<0.248.0> [info]  GenStage consumer #PID<0.248.0> is stopping after receiving cancel from producer #PID<0.245.0> with reason: :noproc


16:29:54.444 pid=<0.248.0> [error] GenServer #PID<0.248.0> terminating
** (stop) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
Last message: {:DOWN, #Reference<0.1394119603.2769551375.77567>, :process, #PID<0.245.0>, :noproc}
State: {%{}, %{done?: true, producers: %{}, trigger: #Function<2.127884580/3 in Flow.Window.Global.materialize/5>}, {2, 3}, [], #Function<32.81753312/4 in Flow.Materialize.mapper_ops/1>}
"#PID<0.247.0>: sleep 3"
** (exit) exited in: GenStage.close_stream(%{#Reference<0.1394119603.2769551372.77002> => {:subscribed, #PID<0.246.0>, :transient, 500, 1000, 1000}, #Reference<0.1394119603.2769551372.77003> => {:subscribed, #PID<0.247.0>, :transient, 500, 1000, 1000}})
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (gen_stage) lib/gen_stage/stream.ex:160: GenStage.Stream.close_stream/1
    (elixir) lib/stream.ex:1400: Stream.do_resource/5
    (elixir) lib/enum.ex:3023: Enum.reverse/1
    (elixir) lib/enum.ex:2668: Enum.to_list/1

Environment details

$ uname -a
Linux myhost 4.1.15.pnotify #18 SMP Thu May 18 15:50:05 PDT 2017 x86_64 GNU/Linux

$ elixir -v
Erlang/OTP 22 [erts-10.4] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:1] [hipe]

Elixir 1.9.0 (compiled with Erlang/OTP 22)

$ cat mix.lock
%{
  "file_system": {:hex, :file_system, "0.2.7", "e6f7f155970975789f26e77b8b8d8ab084c59844d8ecfaf58cbda31c494d14aa", [:mix], [], "hexpm"},
  "flow": {:git, "https://github.com/plataformatec/flow.git", "1ffac6a801602bf8b02192488e58ce5728b581aa", []},
  "gen_stage": {:hex, :gen_stage, "0.14.2", "6a2a578a510c5bfca8a45e6b27552f613b41cf584b58210f017088d3d17d0b14", [:mix], [], "hexpm"},
  "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
  "mix_test_watch": {:hex, :mix_test_watch, "0.9.0", "c72132a6071261893518fa08e121e911c9358713f62794a90c95db59042af375", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm"},
}

You should call GenStage.demand(producer, :accumulate), or start the producer with its demand already set to :accumulate, so that it doesn't emit events until the whole Flow pipeline has subscribed. I'd rather not silently ignore :noproc instead, because that could hide other failures. Thanks for the report!
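The second option above starts the producer with its demand already set to :accumulate. A minimal sketch of it follows. Several parts are assumptions. It uses Mix.install/1, which needs Elixir 1.12+, not the 1.9.0 from this report. The {:flow, "~> 1.0"} requirement is newer than the Flow master and GenStage 0.14.2 used here. It also relies on the :demand option of GenStage.from_enumerable/2, which you should verify against the GenStage docs for your version. Finally, it expects Flow to switch its producers to :forward once the pipeline is set up, which the later transcript depends on as well.

```elixir
# Sketch only: Mix.install/1 requires Elixir 1.12+. On the Elixir 1.9.0 setup
# from this report, add :flow to the mix.exs deps instead. The version
# requirement here is an assumption, not the one used in the report.
Mix.install([{:flow, "~> 1.0"}])

# Start the producer in :accumulate mode: it buffers incoming demand and
# emits nothing until its demand mode is switched to :forward.
{:ok, producer} = GenStage.from_enumerable(1..3, link: false, demand: :accumulate)

# Flow is expected to subscribe all of its stages and then switch the
# producer to :forward, so no partition stage finds the producer already gone.
result =
  Flow.from_stages([producer], stages: 1)
  |> Flow.partition(stages: 3)
  |> Flow.map(&:timer.seconds/1)
  |> Enum.to_list()

# Output order across partitions is not deterministic, so sort before printing.
IO.inspect(Enum.sort(result))
# prints [1000, 2000, 3000]
```

Calling GenStage.demand(producer, :accumulate) right after starting the producer, as in the transcript below, should have the same effect. Passing the option at start-up simply keeps the setup in one call.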

Wonderful! 🎉 Setting that :accumulate flag on GenStage.demand solved the problem. 👍 Thanks.

iex(4)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); GenStage.demand(producer, :accumulate); Flow.from_stages([producer], stages: 1) |> Flow.partition(stages: 3) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms); ms end) |> Enum.to_list()
"#PID<0.1107.0>: sleep 2"
"#PID<0.1108.0>: sleep 1"
"#PID<0.1108.0>: sleep 3"
[2000, 1000, 3000]