dashbitco/flow

Proposal: scan operator

mfclarke opened this issue · 2 comments

Hi, I'm not sure if this is the right way to go about this, but I'd like to propose a scan operator similar to Stream.scan. Something like:

def scan(flow, initial, combine, opts \\ []) do
  scan_window = Flow.Window.global |> Flow.Window.trigger_every(1, :keep)

  flow
  |> Flow.partition(Keyword.put(opts, :window, scan_window))
  |> Flow.reduce(initial, combine)
  |> Flow.emit(:state)
end

Now things like realtime counts are viable:

Flow.from_stage(MyTextInputStage)
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.scan(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Flow.each(fn {word, count} -> Dashboards.update_word_count(word, count) end)
|> Flow.run

If you guys think it would be useful, I'd be more than happy to PR it with tests etc.

My concern with those functions is that in order to explain what it does and its runtime properties you pretty much need to describe its implementation. Given it can be implemented in very few lines of code, I would prefer to go with this route for now. :) Thanks for the proposal!

No problem, makes sense :)