Memory usage for side-effect-only flows is high when using Flow.each instead of Flow.map
methyl opened this issue · 1 comment
methyl commented
Consider the following stream processing:
File.stream!(path)
|> Enum.each(fn x -> rabbitmq_publish(x) end)
def rabbitmq_publish(x) do
  # publish and return nothing
  nil
end
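A self-contained sketch of this plain-stream version, runnable with the Elixir standard library only. It assumes a hypothetical stub `rabbitmq_publish/1` that counts publishes in the process dictionary instead of talking to RabbitMQ; the module name `PlainStreamSketch` and the temporary file name are also illustrative. `File.stream!/1` is lazy and `Enum.each/2` discards each callback result, so the pipeline itself does not accumulate processed lines.

```elixir
defmodule PlainStreamSketch do
  # Hypothetical stand-in for the real publisher: it only increments a
  # counter in the process dictionary, then returns nothing (nil).
  def rabbitmq_publish(_line) do
    Process.put(:published_count, Process.get(:published_count, 0) + 1)
    nil
  end

  # Reads the file lazily, one line at a time. Enum.each/2 throws away
  # every callback result and returns :ok.
  def publish_file(path) do
    File.stream!(path)
    |> Enum.each(fn x -> rabbitmq_publish(x) end)
  end
end

# Usage: write a small temporary file and publish its three lines.
path = Path.join(System.tmp_dir!(), "plain_stream_sketch.txt")
File.write!(path, "line 1\nline 2\nline 3\n")
:ok = PlainStreamSketch.publish_file(path)
IO.inspect(Process.get(:published_count), label: "published")
```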
The function does not return any data for further use, so each processed line can be discarded from memory immediately. That appears to be what happens with plain streams: memory usage stays constant.
However, if I bring Flow into the mix:
File.stream!(path)
|> Flow.from_enumerable()
|> Flow.each(fn x -> rabbitmq_publish(x) end)
|> Flow.run()
memory usage grows as if the whole file were kept in memory.
It seems the proper way to avoid this memory growth is to use Flow.map instead:
File.stream!(path)
|> Flow.from_enumerable()
|> Flow.map(fn x -> rabbitmq_publish(x) end)
|> Flow.run()
Is this expected behavior or something that could be improved? If it is expected, is it worth mentioning in the documentation?
josevalim commented
This behaviour is expected because Flow.each still sends messages downstream in the pipeline. I am thinking it is probably better to deprecate each. Good catch!
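The difference José describes can be illustrated without Flow, using the standard library's `Stream.each/2` and `Stream.map/2` as an analogy (this is not Flow's internal implementation). `Stream.each/2` runs the callback and passes the original element on unchanged, while `Stream.map/2` passes on whatever the callback returns, here `nil`. The module and function names below are hypothetical, and `Enum.to_list/1` is used only to make the downstream values visible.

```elixir
defmodule EachVsMapSketch do
  # Hypothetical side-effect-only publisher, as in the issue: it does its
  # work and returns nil.
  def rabbitmq_publish(_line), do: nil

  # Stream.each/2 calls the function but emits the ORIGINAL element, so the
  # full line is what reaches the next step of the pipeline.
  def downstream_with_each(lines) do
    lines
    |> Stream.each(fn x -> rabbitmq_publish(x) end)
    |> Enum.to_list()
  end

  # Stream.map/2 emits the callback's return value (nil here), so only nil
  # reaches the next step.
  def downstream_with_map(lines) do
    lines
    |> Stream.map(fn x -> rabbitmq_publish(x) end)
    |> Enum.to_list()
  end
end

lines = ["first line\n", "second line\n"]
IO.inspect(EachVsMapSketch.downstream_with_each(lines), label: "each")
IO.inspect(EachVsMapSketch.downstream_with_map(lines), label: "map")
```

By the same reasoning, the Flow.map workaround only keeps downstream messages small if the callback returns something small, such as the `nil` that `rabbitmq_publish/1` returns in the issue.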