dashbitco/flow

Memory usage for side-effect only flows is high using Flow.each instead of Flow.map

methyl opened this issue · 1 comments

Consider the following stream processing:

File.stream!(path) 
  |> Enum.each(fn x -> rabbitmq_publish(x) end)

def rabbitmq_publish(_x) do
  # publish and return nothing
  nil
end

The function does not need to return the data for further use, so processed data can safely be dropped from memory right away. That seems to be what happens with plain streams: memory usage stays constant.

However if I incorporate Flow into the mix:

File.stream!(path) 
  |> Flow.from_enumerable()
  |> Flow.each(fn x -> rabbitmq_publish(x) end)
  |> Flow.run()

memory usage grows as if the whole file were kept in memory.

It seems the proper way to avoid this memory growth is to use Flow.map:

File.stream!(path) 
  |> Flow.from_enumerable()
  |> Flow.map(fn x -> rabbitmq_publish(x) end)
  |> Flow.run()

Is this expected behavior or something that could be improved? If it is expected, is it worth mentioning in the documentation?

This behaviour is expected because each still sends messages downstream in the pipeline. I am thinking it is probably better to deprecate each. Good catch!
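
To illustrate the difference between forwarding the element and forwarding the return value, here is a minimal sketch using only the standard library's Stream.each and Stream.map. It is an analogy, not Flow's implementation: it assumes Flow.each passes elements through the way Stream.each does, which is consistent with the explanation above. The publish function is a hypothetical stand-in for rabbitmq_publish/1.

```elixir
lines = ["a\n", "b\n", "c\n"]

# Hypothetical stand-in for rabbitmq_publish/1: side effect only, returns nil.
publish = fn _line -> nil end

# Stream.each/2 calls the function for its side effect and emits the
# original element unchanged, so every line keeps travelling downstream.
forwarded_by_each = lines |> Stream.each(publish) |> Enum.to_list()

# Stream.map/2 emits the function's return value instead, here nil,
# so the original line is no longer referenced by anything downstream.
forwarded_by_map = lines |> Stream.map(publish) |> Enum.to_list()

IO.inspect(forwarded_by_each) # ["a\n", "b\n", "c\n"]
IO.inspect(forwarded_by_map)  # [nil, nil, nil]
```

If the same holds for Flow, the each variant carries every full line to the end of the pipeline, while the map variant carries only nil values.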