dashbitco/flow

Using a Flow with a GenStage producer causes errors

rschmukler opened this issue · 1 comment

When a Flow is used as the enumerable that feeds events to a GenStage producer, the producer process can receive consumer messages, which causes errors. Interestingly, most of the messages do get routed correctly to the consumer, but the producer also appears to receive some of them. It looks like a race condition: removing the Process.sleep call makes it work fine on my machine. Additionally, the errors only seem to happen at the beginning; after that, the events run fine.

Here is some example code that can recreate the issue:

defmodule FlowTest do
  @moduledoc """
  Documentation for FlowTest.
  """

  def run do
    {:ok, producer} = Producer.start_link()
    {:ok, consumer} = Consumer.start_link()

    GenStage.sync_subscribe(consumer, to: producer, max_demand: 200, min_demand: 100)
  end
end

defmodule Producer do
  use GenStage

  #########################
  # Public API
  #########################
  
  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  #########################
  # GenStage Callbacks
  #########################

  def init(_) do
    {:producer, %{cont: build_flow()}}
  end

  def handle_demand(demand, %{cont: cont} = state) do
    case cont.({:cont, {[], demand}}) do
      {:suspended, {list, 0}, cont} ->
        {:noreply, :lists.reverse(list), %{state | cont: cont}}

      {_finished, {list, _}} ->
        IO.puts "Done!"
        {:noreply, :lists.reverse(list), %{}}
    end
  end

  #########################
  # Private Helpers
  #########################
  defp build_flow() do
    flow =
      0..1_000_000
      |> Flow.from_enumerable(consumers: :permanent)
      |> Flow.map(fn val ->
        # Simulate a computation
        Process.sleep(10)
        val + 1
      end)

    # Largely borrowed from the GenStage.Streamer implementation
    &Enumerable.reduce(flow, &1, fn
      x, {acc, 1} ->
        {:suspend, {[x | acc], 0}}
      x, {acc, demand} ->
        {:cont, {[x | acc], demand - 1}}
    end)
  end

end

defmodule Consumer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(_) do
    {:consumer, :no_state}
  end

  def handle_events(events, _from, state) do
    IO.puts "Got #{length events} events"
    {:noreply, [], state}
  end
end
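The continuation pattern in build_flow/0 and handle_demand/2 (borrowed from GenStage.Streamer) does not itself need any processes. As a minimal sketch, assuming a plain range instead of a Flow (the `DemandCont` module and its `build/1` and `take/2` helpers are hypothetical names, not part of GenStage or Flow), the same reducer hands out exactly `demand` items per call. This hints that the trouble lies in what enumerating a Flow does inside the producer process rather than in the continuation itself.

```elixir
defmodule DemandCont do
  # Hypothetical helper: same reducer as build_flow/0, but over any
  # process-free enumerable (a range or list) instead of a Flow.
  def build(enum) do
    &Enumerable.reduce(enum, &1, fn
      # Last item needed for this demand: suspend so we can resume later.
      x, {acc, 1} -> {:suspend, {[x | acc], 0}}
      # Otherwise keep accumulating and count the demand down.
      x, {acc, demand} -> {:cont, {[x | acc], demand - 1}}
    end)
  end

  # Mirrors handle_demand/2: returns {items, next_continuation} or {items, :done}.
  def take(cont, demand) do
    case cont.({:cont, {[], demand}}) do
      {:suspended, {list, 0}, next} -> {:lists.reverse(list), next}
      {_done_or_halted, {list, _}} -> {:lists.reverse(list), :done}
    end
  end
end
```

For example, `DemandCont.take(DemandCont.build(1..5), 2)` returns `[1, 2]` plus a continuation; calling `take/2` on that continuation with a demand of 2 yields `[3, 4]`, and a third call yields `{[5], :done}`.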

Here are some of the error messages that come through:

13:16:59.215 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.165.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.93>}},
 [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013,
  1014, 1015, 1016, 1017, 1018, 1019, 1020, 1021, 1022, 1023, 1024, 1025, 1026,
  1027, 1028, 1029, 1030, 1031, 1032, 1033, 1034, 1035, 1036, 1037, 1038, 1039,
  1040, 1041, 1042, 1043, 1044, 1045, 1046, 1047, 1048, ...]}


13:16:59.215 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.166.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.94>}},
 [2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013,
  2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024, 2025, 2026,
  2027, 2028, 2029, 2030, 2031, 2032, 2033, 2034, 2035, 2036, 2037, 2038, 2039,
  2040, 2041, 2042, 2043, 2044, 2045, 2046, 2047, 2048, ...]}


13:16:59.215 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.167.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.95>}},
 [3001, 3002, 3003, 3004, 3005, 3006, 3007, 3008, 3009, 3010, 3011, 3012, 3013,
  3014, 3015, 3016, 3017, 3018, 3019, 3020, 3021, 3022, 3023, 3024, 3025, 3026,
  3027, 3028, 3029, 3030, 3031, 3032, 3033, 3034, 3035, 3036, 3037, 3038, 3039,
  3040, 3041, 3042, 3043, 3044, 3045, 3046, 3047, 3048, ...]}


13:16:59.216 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.168.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.96>}},
 [4001, 4002, 4003, 4004, 4005, 4006, 4007, 4008, 4009, 4010, 4011, 4012, 4013,
  4014, 4015, 4016, 4017, 4018, 4019, 4020, 4021, 4022, 4023, 4024, 4025, 4026,
  4027, 4028, 4029, 4030, 4031, 4032, 4033, 4034, 4035, 4036, 4037, 4038, 4039,
  4040, 4041, 4042, 4043, 4044, 4045, 4046, 4047, 4048, ...]}


13:16:59.216 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.169.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.97>}},
 [5001, 5002, 5003, 5004, 5005, 5006, 5007, 5008, 5009, 5010, 5011, 5012, 5013,
  5014, 5015, 5016, 5017, 5018, 5019, 5020, 5021, 5022, 5023, 5024, 5025, 5026,
  5027, 5028, 5029, 5030, 5031, 5032, 5033, 5034, 5035, 5036, 5037, 5038, 5039,
  5040, 5041, 5042, 5043, 5044, 5045, 5046, 5047, 5048, ...]}


13:16:59.216 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.170.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.98>}},
 [6001, 6002, 6003, 6004, 6005, 6006, 6007, 6008, 6009, 6010, 6011, 6012, 6013,
  6014, 6015, 6016, 6017, 6018, 6019, 6020, 6021, 6022, 6023, 6024, 6025, 6026,
  6027, 6028, 6029, 6030, 6031, 6032, 6033, 6034, 6035, 6036, 6037, 6038, 6039,
  6040, 6041, 6042, 6043, 6044, 6045, 6046, 6047, 6048, ...]}


13:16:59.216 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.171.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.99>}},
 [7001, 7002, 7003, 7004, 7005, 7006, 7007, 7008, 7009, 7010, 7011, 7012, 7013,
  7014, 7015, 7016, 7017, 7018, 7019, 7020, 7021, 7022, 7023, 7024, 7025, 7026,
  7027, 7028, 7029, 7030, 7031, 7032, 7033, 7034, 7035, 7036, 7037, 7038, 7039,
  7040, 7041, 7042, 7043, 7044, 7045, 7046, 7047, 7048, ...]}

If you look at GenStage.stream, which is what a Flow becomes when it is used as an Enumerable, you will see that it hijacks the inbox of the current process. So the GenStage producer and the flow are racing for the same messages in the process inbox. When you stop enumerating the flow (suspending or halting it), the messages it has not consumed as a consumer remain in the inbox, where they are then picked up by the GenStage producer, which is why it logs the errors above. The approach you are trying is not going to work. Why don't you connect the consumer directly to the Flow?
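The mailbox race can be illustrated without GenStage or Flow at all. The following is a minimal, hypothetical sketch (the `MailboxRace` module and the `{:event, ref, value}` message shape are made up for illustration and are not how GenStage.stream is implemented): a function does a selective `receive` for a fixed number of tagged messages inside the calling process and then stops, much like a suspended stream. Whatever it leaves behind stays in the same mailbox, where the process's next generic `receive` (in a real GenStage process, its own message loop) finds it.

```elixir
defmodule MailboxRace do
  # Illustrative only: mimics a "stream" that consumes tagged messages
  # from the calling process's own mailbox and then stops early.
  def run do
    ref = make_ref()

    # Simulate upstream stages sending five events to this process.
    for i <- 1..5, do: send(self(), {:event, ref, i})

    # The "stream" only takes two events, then stops pulling.
    taken = take(ref, 2, [])

    # The three unconsumed events are still in the same mailbox.
    {:message_queue_len, leftover_count} = Process.info(self(), :message_queue_len)

    # A generic receive (standing in for the process's main loop)
    # now sees messages it never expected.
    stray = drain([])

    {taken, leftover_count, stray}
  end

  # Selective receive: only matches events tagged with this ref.
  defp take(_ref, 0, acc), do: Enum.reverse(acc)

  defp take(ref, n, acc) do
    receive do
      {:event, ^ref, value} -> take(ref, n - 1, [value | acc])
    end
  end

  # Receive everything currently queued, without blocking.
  defp drain(acc) do
    receive do
      msg -> drain([msg | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end
```

Running `MailboxRace.run()` in a process with an otherwise empty mailbox takes `[1, 2]`, reports three leftover messages, and the generic receive then picks up the events for 3, 4 and 5, analogous to the producer receiving `$gen_consumer` messages in the logs above.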