Proposal: scan operator
mfclarke opened this issue · 2 comments
mfclarke commented
Hi, I'm not sure if this is the right way to go about this, but I'd like to propose a scan operator similar to Stream.scan. Something like:
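def scan(flow, initial, combine, opts \\ []) do
  # Global window that triggers after every event and keeps the accumulated state
  scan_window = Flow.Window.global() |> Flow.Window.trigger_every(1, :keep)

  flow
  |> Flow.partition(Keyword.put(opts, :window, scan_window))
  |> Flow.reduce(initial, combine)
  # Emit the reducer state each time the window triggers
  |> Flow.emit(:state)
end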
Now things like realtime counts are viable:
Flow.from_stage(MyTextInputStage)
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.scan(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, &(&1 + 1))
end)
|> Flow.each(fn {word, count} -> Dashboards.update_word_count(word, count) end)
|> Flow.run()
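For comparison, here is a minimal sketch of the Stream.scan behaviour the proposal is modelled on, using only the standard library. The input string and the variable names are made up for illustration. It shows that a scan emits every intermediate accumulator rather than only the final one:

```elixir
# Hypothetical input, used only for illustration
words = String.split("a b a c b a", " ")

# Stream.scan/3 calls the function with (element, accumulator) and emits
# the updated accumulator after every element.
running_counts =
  words
  |> Stream.scan(%{}, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end)
  |> Enum.to_list()

# One snapshot of the counts per input word; the last one holds the totals.
IO.inspect(List.last(running_counts))
```

The Flow version above would presumably differ in that each partition keeps its own accumulator, so the emitted state would cover only the words routed to that partition.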
If you guys think it would be useful, I'd be more than happy to PR it with tests etc.
josevalim commented
My concern with functions like this is that, in order to explain what they do and their runtime properties, you pretty much need to describe their implementation. Given that it can be implemented in very few lines of code, I would prefer to go with this route for now. :) Thanks for the proposal!
mfclarke commented
No problem, makes sense :)