dashbitco/flow

Flow.partition does not route data to the right stage

veetase opened this issue · 8 comments

I have a data set like this: [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]. After partitioning with Flow.partition(key: fn e -> Enum.at(e, 1) end), I would expect ["1", "2", "3"] and ["2", "2", "2"] to be routed to the same stage, but they are not. Here is the example:

    a = [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]
    list = Flow.from_enumerable(a)
    |> Flow.partition(key: fn e -> Enum.at(e, 1) end)
    |> Flow.reduce(fn -> %{} end, fn(arr, map) ->
      new_map = case map do
        %{} ->
          Map.put(map, "earlier", arr)
        earlier ->
          old_number = Map.get(earlier, "earlier")
          |> Enum.at(1)
          new_number = Enum.at(arr, 1)
          IO.inspect(old_number == new_number)
          Map.put(earlier, "later", arr)
      end
      IO.inspect(new_map)
      new_map
    end)
    |> Enum.to_list()

What actually happens is that the data is routed into four stages, with each element ending up in a distinct stage. The strange thing is that the final list is [{"earlier", ["3", "3", "3"]}, {"earlier", ["1", "1", "1"]}].
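A side note on the reduce function above, which may explain why only "earlier" keys ever show up: in Elixir, the pattern %{} matches any map, not just the empty one, so the first case clause is always taken. The sketch below illustrates this with plain Elixir and no Flow dependency. MapSketch is a hypothetical module name used only for illustration.

```elixir
# In a pattern, %{} matches every map, empty or not.
matches_non_empty = match?(%{}, %{"earlier" => ["1", "2", "3"]})
IO.inspect(matches_non_empty)

# Hypothetical helper: a guard on map_size/1 tells an empty map apart.
defmodule MapSketch do
  def classify(map) when map_size(map) == 0, do: :empty
  def classify(%{}), do: :non_empty
end

IO.inspect(MapSketch.classify(%{}))
IO.inspect(MapSketch.classify(%{"earlier" => ["1", "2", "3"]}))
```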

The behaviour you describe is the behaviour I am getting. Try this out:

    a = [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]
    list = Flow.from_enumerable(a)
    |> Flow.partition(key: fn e -> Enum.at(e, 1) end)
    |> Flow.reduce(fn -> %{} end, fn arr, map ->
      IO.inspect {arr, self()}
      map
    end)
    |> Enum.to_list()

You can see that the lists you mentioned are being executed in the same process. It is also worth mentioning that the result returned by :key is still hashed, so distinct keys may end up in the same partition. If you want exactly those elements to act as the partition, then you need to implement :hash and not :key. I will make sure to clarify this in the docs.
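To make the hashing point concrete, here is a minimal sketch that does not use Flow at all. It assumes the default :key routing behaves roughly like :erlang.phash2(key, stages), which is an assumption about the internals rather than something stated in this thread. PartitionSketch and partition_for/3 are hypothetical names.

```elixir
# Sketch only: assumes key-based routing is similar to
# :erlang.phash2(key, stages). PartitionSketch is a hypothetical helper.
defmodule PartitionSketch do
  # Returns a partition index in 0..stages-1 for the event's key.
  def partition_for(event, key_fun, stages) do
    :erlang.phash2(key_fun.(event), stages)
  end
end

data = [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]
key_fun = fn e -> Enum.at(e, 1) end

# Group the events by the partition index their key hashes to.
groups = Enum.group_by(data, &PartitionSketch.partition_for(&1, key_fun, 4))
IO.inspect(groups)
```

Events with the same key always land in the same group, but two different keys can also hash to the same index, so the groups are not guaranteed to line up one-to-one with the keys.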

Thanks, now I get it.

@josevalim, :hash is also a function that receives the result of :key as its parameter, right? I tried that, but it does not seem to work this way.

They are mutually exclusive. You need to use either :hash or :key.

@josevalim, I want to group them exactly by the second element of the array, so I did this:

    a = [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]
    list = Flow.from_enumerable(a)
    |> Flow.partition(hash: fn e -> {e, Enum.at(e, 1)} end)
    |> Flow.reduce(fn -> %{} end, fn arr, map ->
      IO.inspect {arr, self()}
      map
    end)
    |> Enum.to_list()

It prints nothing and returns an empty list. Did I miss something?

@veetase The partitions are by default named after integers: 0, 1, 2, 3, ... and you are returning strings. We should investigate if we can raise on those cases.
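As a rough sketch of what a :hash function returning integer partitions could look like, the helper below converts the second element to an integer and wraps it into the 0..stages-1 range. HashSketch and by_second_element/1 are hypothetical names, and the sketch assumes every second element is a numeric string. The Flow call in the trailing comment is how such a function would presumably be passed along, not something verified here.

```elixir
# Hypothetical helper: builds a :hash-style function that returns
# {event, partition}, where partition is an integer in 0..stages-1.
defmodule HashSketch do
  def by_second_element(stages) do
    fn event ->
      # Assumes the second element is a numeric string such as "2".
      partition = event |> Enum.at(1) |> String.to_integer() |> rem(stages)
      {event, partition}
    end
  end
end

data = [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]
hash = HashSketch.by_second_element(4)

# Shows the partition each event would be assigned to.
routed = Enum.map(data, hash)
IO.inspect(routed)

# Assumed usage with Flow (not run here):
#   Flow.partition(stages: 4, hash: hash)
```

With four stages, ["1", "2", "3"] and ["2", "2", "2"] both map to partition 2, while the other two lists map to partitions 1 and 3.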

Thanks for answering. It seems there is a limit by default; mine is 0, 1, 2, 3.