dashbitco/flow

Flow.map_batch/2 fails with CaseClauseError

jeroenvisser101 opened this issue · 5 comments

# test.exs
Flow.from_enumerable(1..1000)
|> Flow.map_batch(fn batch -> [Enum.sum(batch)] end)
|> Flow.map(&IO.puts("Sum: #{&1}"))
|> Flow.run()
Click to see stacktrace
** (exit) exited in: Enumerable.Flow.reduce(%Flow{operations: [{:on_trigger, #Function<20.17204979/3 in Flow.inject_on_trigger/4>}, {:mapper, :map, [#Function<1.27327935 in file:test.exs>]}, {:batch, #Function<0.27327935 in file:test.exs>}], options: [stages: 32], producers: {:enumerables, [1..1000]}, window: %Flow.Window.Global{periodically: [], trigger: nil}}, {:cont, []}, #Function<146.29191728/2 in Enum.reverse/1>)
    ** (EXIT) an exception was raised:
        ** (CaseClauseError) no case clause matching: {[], [{:batch, #Function<0.27327935 in file:test.exs>}, {:mapper, :map, [#Function<1.27327935 in file:test.exs>]}, {:on_trigger, #Function<20.17204979/3 in Flow.inject_on_trigger/4>}]}
            (flow 1.0.0) lib/flow/materialize.ex:649: Flow.Materialize.build_trigger/1
            (flow 1.0.0) lib/flow/materialize.ex:611: Flow.Materialize.reducer_ops/1
            (flow 1.0.0) lib/flow/materialize.ex:45: Flow.Materialize.split_operations/1
            (flow 1.0.0) lib/flow/materialize.ex:17: Flow.Materialize.materialize/5
            (flow 1.0.0) lib/flow/coordinator.ex:34: Flow.Coordinator.init/1
            (stdlib 3.12.1) gen_server.erl:374: :gen_server.init_it/2
            (stdlib 3.12.1) gen_server.erl:342: :gen_server.init_it/6
            (stdlib 3.12.1) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
    (flow 1.0.0) lib/flow.ex:1995: Enumerable.Flow.reduce/3
    (elixir 1.10.3) lib/enum.ex:3383: Enum.reverse/1
    (elixir 1.10.3) lib/enum.ex:2982: Enum.to_list/1
    (flow 1.0.0) lib/flow.ex:992: Flow.run/1
    (elixir 1.10.3) lib/code.ex:926: Code.require_file/2
    (mix 1.10.3) lib/mix/tasks/run.ex:145: Mix.Tasks.Run.run/5
    (mix 1.10.3) lib/mix/tasks/run.ex:85: Mix.Tasks.Run.run/1
    (mix 1.10.3) lib/mix/task.ex:330: Mix.Task.run_task/3
    (mix 1.10.3) lib/mix/cli.ex:82: Mix.CLI.run_task/2
    (elixir 1.10.3) lib/code.ex:926: Code.require_file/2

{batchers, operations} = Enum.split_while(operations, &match?({:batch, _}, &1))

This line would never split right? Since the batch operation would be at the end?

Could this work maybe?

{operations, batchers} = Enum.split_while(operations, &(not match?({:batch, _}, &1)))

Both my test script and the test suite seem to agree. Should I also add a test case for this example?

Good catch! I have pushed some tests too. :)

Nice! Thanks :)