Not able to run map stages after reduce
lackac opened this issue · 5 comments
After upgrading to 0.14, a previously working setup of Flow map steps -> reduce, emit(:state) -> further map steps no longer seems to work.
Example failing flow
1..50
|> Flow.from_enumerable()
|> Flow.map(& &1 * 2)
|> Flow.partition(stages: 1, window: Flow.Window.count(5))
|> Flow.reduce(fn -> 0 end, &+/2)
|> Flow.emit(:state)
|> Flow.filter(& &1 > 200)
|> Enum.to_list
Expected result
[230, 280, 330, 380, 430, 480]
Actual outcome
** (exit) exited in: Enumerable.Flow.reduce(%Flow{operations: [{:mapper, :filter, [#Function<6.127694169/1 in :erl_eval.expr/5>]}, {:on_trigger, #Function<5.47868220/3 in Flow.emit/2>}, {:reduce, #Function<20.127694169/0 in :erl_eval.expr/5>, &:erlang.+/2}], options: [stages: 1], producers: {:flows, [%Flow{operations: [{:mapper, :map, [#Function<6.127694169/1 in :erl_eval.expr/5>]}], options: [stages: 8], producers: {:enumerables, [1..50]}, window: %Flow.Window.Global{periodically: [], trigger: nil}}]}, window: %Flow.Window.Count{count: 5, periodically: [], trigger: nil}}, {:cont, []}, #Function<131.83463370/2 in Enum.reverse/1>)
** (EXIT) an exception was raised:
** (CaseClauseError) no case clause matching: {[], [{:on_trigger, #Function<5.47868220/3 in Flow.emit/2>}, {:mapper, :filter, [#Function<6.127694169/1 in :erl_eval.expr/5>]}]}
(flow) lib/flow/materialize.ex:600: Flow.Materialize.build_trigger/1
(flow) lib/flow/materialize.ex:559: Flow.Materialize.reducer_ops/1
(flow) lib/flow/materialize.ex:52: Flow.Materialize.split_operations/3
(flow) lib/flow/materialize.ex:17: Flow.Materialize.materialize/4
(flow) lib/flow/coordinator.ex:25: Flow.Coordinator.init/1
(stdlib) gen_server.erl:374: :gen_server.init_it/2
(stdlib) gen_server.erl:342: :gen_server.init_it/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
(flow) lib/flow.ex:1653: Enumerable.Flow.reduce/3
(elixir) lib/enum.ex:1911: Enum.reverse/1
(elixir) lib/enum.ex:2588: Enum.to_list/1
iex(8)> 19:13:46.115 [error] GenServer #PID<0.585.0> terminating
** (CaseClauseError) no case clause matching: {[], [{:on_trigger, #Function<5.47868220/3 in Flow.emit/2>}, {:mapper, :filter, [#Function<6.127694169/1 in :erl_eval.expr/5>]}]}
(flow) lib/flow/materialize.ex:600: Flow.Materialize.build_trigger/1
(flow) lib/flow/materialize.ex:559: Flow.Materialize.reducer_ops/1
(flow) lib/flow/materialize.ex:52: Flow.Materialize.split_operations/3
(flow) lib/flow/materialize.ex:17: Flow.Materialize.materialize/4
(flow) lib/flow/coordinator.ex:25: Flow.Coordinator.init/1
(stdlib) gen_server.erl:374: :gen_server.init_it/2
(stdlib) gen_server.erl:342: :gen_server.init_it/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:EXIT, #PID<0.584.0>, {{:case_clause, {[], [{:on_trigger, #Function<5.47868220/3 in Flow.emit/2>}, {:mapper, :filter, [#Function<6.127694169/1 in :erl_eval.expr/5>]}]}}, [{Flow.Materialize, :build_trigger, 1, [file: 'lib/flow/materialize.ex', line: 600]}, {Flow.Materialize, :reducer_ops, 1, [file: 'lib/flow/materialize.ex', line: 559]}, {Flow.Materialize, :split_operations, 3, [file: 'lib/flow/materialize.ex', line: 52]}, {Flow.Materialize, :materialize, 4, [file: 'lib/flow/materialize.ex', line: 17]}, {Flow.Coordinator, :init, 1, [file: 'lib/flow/coordinator.ex', line: 25]}, {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 374]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 342]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}}
This example is limited for the purpose of demonstrating the issue. I understand that in this case a simple Enum.filter/2 would do the job. The idea is that after reduce it should be possible to carry on with the flow and run further map and reduce steps.
After some experimentation I've found that adding a Flow.partition step right after the emit makes it work again. Is that step required in this case, and was it somehow performed implicitly in earlier versions? If so, it might be useful to document this somewhere.
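For reference, the workaround described above can be sketched as follows. This is the failing flow from the report with one extra `Flow.partition()` call after the emit; the default partition options are an assumption, and since the extra partition may spread events over several stages, the order of the resulting list is not guaranteed.

```elixir
# Sketch of the workaround: start a new partition right after the emit,
# so that the filter runs in a separate layer of stages.
1..50
|> Flow.from_enumerable()
|> Flow.map(&(&1 * 2))
|> Flow.partition(stages: 1, window: Flow.Window.count(5))
|> Flow.reduce(fn -> 0 end, &+/2)
|> Flow.emit(:state)
# Extra step added as a workaround (default options assumed):
|> Flow.partition()
|> Flow.filter(&(&1 > 200))
|> Enum.to_list()
```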
In this case you can explicitly filter on trigger:
Flow.on_trigger(fn events ->
  {Enum.filter(events, ...), events}
end)
This is what the previous code would do. I will investigate this. We need to either support this or raise a decent error message.
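Applied to the example flow, the on_trigger suggestion might look like the sketch below. It assumes the one-argument callback receives the reduce accumulator (here the integer sum of the window) and returns a tuple of the events to emit and the new accumulator; the `> 200` check stands in for the filter from the original report, and `on_trigger` is kept as the last operation of the flow.

```elixir
# Sketch only: replaces emit(:state) + filter with a single on_trigger step.
1..50
|> Flow.from_enumerable()
|> Flow.map(&(&1 * 2))
|> Flow.partition(stages: 1, window: Flow.Window.count(5))
|> Flow.reduce(fn -> 0 end, &+/2)
|> Flow.on_trigger(fn sum ->
  # Emit the window sum only when it passes the filter; keep the state as is.
  events = if sum > 200, do: [sum], else: []
  {events, sum}
end)
|> Enum.to_list()
```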
Thanks for the feedback! I appreciate the control on_trigger provides over the way the reduced state is handled. However, what I like about Flow the most is the magic of it. ;)
If you're planning to add back this behaviour, I'll just add a Flow.partition() call in the meantime and revisit this after it's solved.