Flow.partition does not route data to the right stage
veetase opened this issue · 8 comments
I have a data set like this: [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]. After partitioning with Flow.partition(key: fn e -> Enum.at(e, 1) end), I expected ["1", "2", "3"] and ["2", "2", "2"] to be routed to the same stage, but they are not. Here is the example:
a = [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]
list = Flow.from_enumerable(a)
  |> Flow.partition(key: fn e -> Enum.at(e, 1) end)
  |> Flow.reduce(fn -> %{} end, fn(arr, map) ->
    new_map = case map do
      %{} ->
        Map.put(map, "earlier", arr)
      earlier ->
        old_number = Map.get(earlier, "earlier")
                     |> Enum.at(1)
        new_number = Enum.at(arr, 1)
        IO.inspect(old_number == new_number)
        Map.put(earlier, "later", arr)
    end
    IO.inspect(new_map)
    new_map
  end)
  |> Enum.to_list()
In reality the data is routed to four stages, each element ends up in its own stage, and strangely the final list is [{"earlier", ["3", "3", "3"]}, {"earlier", ["1", "1", "1"]}].
The behaviour you describe is the behaviour I am getting. Try this out:
a = [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]
list = Flow.from_enumerable(a)
  |> Flow.partition(key: fn e -> Enum.at(e, 1) end)
  |> Flow.reduce(fn -> %{} end, fn arr, map ->
    IO.inspect {arr, self()}
    map
  end)
  |> Enum.to_list()
You can see that the lists you mentioned are being processed in the same process. It is also worth mentioning that the result returned by :key is still hashed. If you want to use those exact elements as the partition, then you need to implement :hash and not :key. I will make sure to clarify this in the docs.
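To illustrate what "still hashed" means, here is a minimal sketch that uses only the Erlang standard library. It assumes the partition index is derived roughly as :erlang.phash2(key, stages); that is an assumption about the internals for illustration, and the stage count of 4 is hypothetical.

```elixir
# Assumption: the partition index is computed roughly as
# :erlang.phash2(key, stages). The real Flow internals may differ.
stages = 4
data = [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]

# Same key function as in the example above: the second element.
key = fn e -> Enum.at(e, 1) end

# Group the events by the partition index their hashed key maps to.
by_partition = Enum.group_by(data, fn e -> :erlang.phash2(key.(e), stages) end)

IO.inspect(by_partition)
```

Events with the same key always land in the same group, but two different keys may also hash to the same index, so a partition is not guaranteed to hold exactly one key.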
Thanks, now I get it.
@josevalim, :hash is also a function that receives the result of :key as its parameter, right? I tried, but it does not seem to work that way.
They are mutually exclusive. You need to use either :hash or :key.
@josevalim, I want to group them exactly by the second element of the array, so I did this:
a = [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]
list = Flow.from_enumerable(a)
  |> Flow.partition(hash: fn e -> {e, Enum.at(e, 1)} end)
  |> Flow.reduce(fn -> %{} end, fn arr, map ->
    IO.inspect {arr, self()}
    map
  end)
  |> Enum.to_list()
It prints nothing and returns an empty list. Did I miss something?
@veetase The partitions are by default named after integers: 0, 1, 2, 3, ... and you are returning strings. We should investigate if we can raise on those cases.
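A minimal sketch of a :hash function that returns an integer partition instead of a string. The stage count of 4 and the parsing step are assumptions for illustration, and it only covers non-negative numeric strings like the ones in this data set.

```elixir
# Hypothetical stage count; partitions are then named 0, 1, 2, 3.
stages = 4

# The hash function returns {event, partition}. The second element is
# parsed to an integer and wrapped into the range 0..stages-1.
hash = fn e ->
  partition = e |> Enum.at(1) |> String.to_integer() |> rem(stages)
  {e, partition}
end

data = [["1", "2", "3"], ["2", "2", "2"], ["1", "1", "1"], ["3", "3", "3"]]

IO.inspect(Enum.map(data, hash))
# Prints:
# [{["1", "2", "3"], 2}, {["2", "2", "2"], 2}, {["1", "1", "1"], 1}, {["3", "3", "3"], 3}]
```

The function would then be passed as Flow.partition(hash: hash, stages: stages), so that ["1", "2", "3"] and ["2", "2", "2"] both go to partition 2.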
Thanks for answering. It seems there is a default limit on the partitions; mine are 0, 1, 2, 3.
Fixed here: elixir-lang/gen_stage@4e07298