dashbitco/flow

Examples of supervised Flow?

youroff opened this issue · 13 comments

Are there any examples of supervised Flow?

I have a producer stage, and I want to set up a Flow with a Window that passes events on to consumers after processing. I realize I could probably create a child in the supervision tree, something like worker(Flow, [Producer], [function: Flow.from_stage]), but how (and where) do I add logic to that flow?

UPD: Hm... I just tried to launch an empty flow, and the "something like" above didn't work.

Something like this:

worker(YourApp.Flow, [])

And then something like:

defmodule YourApp.Flow do
  def start_link do
    Flow.from_stage(producer_stage)
    |> Flow.map(...)
    |> Flow.partition(...)
    |> Flow.reduce(...)
    |> Flow.start_link()
  end
end
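The worker/2 helper comes from Supervisor.Spec, which is deprecated in newer Elixir versions, so here is a minimal sketch of the same idea with a map-based child spec. It is only an illustration: to keep it runnable without the Flow dependency, the YourApp.Flow.start_link/0 below is a hypothetical stand-in that starts an Agent, whereas in a real application it would build the flow as above and end with Flow.start_link. YourApp.Supervisor is likewise a made-up name.

```elixir
# Hypothetical stand-in for YourApp.Flow. In a real application start_link/0
# would build the pipeline (Flow.from_stage |> ... |> Flow.start_link), which
# returns {:ok, pid}. An Agent plays that role here so the sketch runs without Flow.
defmodule YourApp.Flow do
  def start_link do
    Agent.start_link(fn -> [] end)
  end
end

defmodule YourApp.Supervisor do
  use Supervisor

  def start_link(_arg \\ []) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    children = [
      # Map-based child spec: the modern equivalent of worker(YourApp.Flow, [])
      %{id: YourApp.Flow, start: {YourApp.Flow, :start_link, []}}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

Calling YourApp.Supervisor.start_link() brings up one supervised child. If that process crashes, the supervisor restarts it by calling start_link/0 again, which is where the flow would be rebuilt.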

Sorry for bothering you, but it's still not quite clear how to consume it. Is there a way to make it behave exactly like a producer_consumer, but using windows and triggers? Also, windows require a reduce step to be present, but one of the things I'm trying to achieve is the following:

  1. Someone sends notifications to the queue, something like {user_id, message}
  2. When a notification is added to the queue, I append a timestamp: {user_id, message, timestamp}
  3. I want to receive (Window.session?) frames containing messages grouped by user within one-minute intervals: [{1, m1, 0}, {1, m2, 30}] and then, a minute later, [{1, m3, 65}, {1, m4, 90}], etc.
    Is this doable with Flow and windows?
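Setting Flow aside for a moment, the grouping in point 3 can be sketched in plain Elixir as fixed, non-overlapping one-minute windows keyed by user. This is an illustration under assumptions: timestamps are in seconds, as in the example frames, and MinuteFrames is a hypothetical module name.

```elixir
defmodule MinuteFrames do
  # Sketch only (no Flow involved). Timestamps are assumed to be in seconds.
  @window_seconds 60

  # Groups {user_id, message, timestamp} events into a map of
  # {user_id, window_index} => messages, preserving arrival order per group.
  def group(events) do
    Enum.group_by(
      events,
      fn {user_id, _msg, ts} -> {user_id, div(ts, @window_seconds)} end,
      fn {_user_id, msg, _ts} -> msg end
    )
  end
end
```

With the frames from point 3, m1 and m2 land in window 0 and m3 and m4 in window 1. In Flow terms this roughly corresponds to partitioning by user_id and using a fixed window on the timestamp, which is the direction the thread eventually takes.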

The producer and consumers need to be known when the flow is built. So, building on the example above, you could do:

defmodule YourApp.Flow do
  def start_link(producer, consumer) do
    Flow.from_stage(producer)
    |> Flow.map(...)
    |> Flow.partition(...)
    |> Flow.reduce(...)
    |> Flow.into_stages([consumer])
  end
end

Every time you need to build a new flow for a set of producers/consumers, you call the function above. Another option is to have a single flow but use the user_id for partitioning and sessioning. I think the session window may be able to do exactly what you want.

A counter-proposal: try to write a flow, without supervision, that solves your problem. You can create a sample.exs file in your project (which depends on Flow), put some flow examples in it, and run it with mix run sample.exs. Then you can paste the code here and I can help you make it supervised. :)

Oh, thank you very much. I tried to call start_link after into_stages and it didn't work. I should have read the documentation more carefully. :) Starting in some kind of sandbox is definitely a good idea; I'll do that.

Now it makes much more sense, but I still have problems. So this piece of code works:

    Flow.from_stage(producer, stages: 1, window: Flow.Window.periodic(1, :second))
    |> Flow.reduce(fn -> [] end, & [&1 | &2])
    |> Flow.each_state(&IO.inspect/1)
    |> Flow.start_link(name: name)

It prints the state once per second; if no events came in within that interval, it prints [].
But when I tried to use Window.session, it didn't work, or maybe it worked in a way I don't understand. Each event is a tuple {type, item, timestamp}, where timestamp is the result of System.system_time(:milliseconds). I'm trying to use it as follows:

    window = Flow.Window.session(1, :second, & elem(&1, 2))
    key = & elem(&1, 0)
    Flow.from_stage(producer, stages: 1, key: key, window: window)
    # the rest is the same

...and nothing happens. I guess I just misunderstood the idea behind the session window. How is it supposed to work?

A session is only emitted once you see a gap of 1 second between events. Because you are using System.system_time(:milliseconds) as the timestamp, I doubt you will ever see such a difference.

So yeah, it seems I misunderstood the purpose of sessions. So a session just collects events that happen at equal intervals and discards the others? For example (with a 1-minute gap):

{event, 0}
{event, 25_000} # discarded
{event, 60_000}
{event, 80_000} # discarded
{event, 120_000} 

Is that right? If I want to combine notifications into bursts, should I use a custom trigger instead? Assuming I have a flow of events {user_id, event}, I want to do the following:

  1. When an event for a given user_id arrives, open a buffer (or window).
  2. Close the buffer and pass it on when:
    a) the number of messages reaches some limit, or
    b) some timeout has passed since the last event for this user_id was added to the buffer.
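The two closing rules can be expressed as pure functions over a per-user buffer map, which makes them easy to test before deciding where they live in a pipeline. This is a hypothetical sketch, not a Flow or GenStage API: BurstBuffer, push/4 and expire/3 are made-up names, and times are plain integers in whatever unit the caller chooses.

```elixir
defmodule BurstBuffer do
  # Hypothetical sketch. State is a map of
  # user_id => {events_in_arrival_order, last_event_time}.

  # Adds an event and returns {flushed_batches, new_state}. A user's buffer is
  # flushed as soon as it holds max_count events (rule a).
  def push(state, {user_id, event}, now, max_count) do
    {events, _last} = Map.get(state, user_id, {[], now})
    # Appending is O(n), acceptable for small buffers.
    events = events ++ [event]

    if length(events) >= max_count do
      {[{user_id, events}], Map.delete(state, user_id)}
    else
      {[], Map.put(state, user_id, {events, now})}
    end
  end

  # Flushes every buffer whose last event is at least `timeout` old (rule b).
  # Meant to be called periodically, e.g. from a timer message.
  def expire(state, now, timeout) do
    {expired, kept} =
      Enum.split_with(state, fn {_user_id, {_events, last}} -> now - last >= timeout end)

    batches = Enum.map(expired, fn {user_id, {events, _last}} -> {user_id, events} end)
    {batches, Map.new(kept)}
  end
end
```

One possible (assumed) home for this state is a GenStage producer_consumer or a GenServer that calls expire/3 from a periodic timer message; some kind of timer is needed because rule (b) must fire even when no new events arrive.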

I don't have a working example yet; I'll try to create a gist that reflects the pipeline structure. But in general, what is the purpose of Window.session, and what is the gap? Will it automatically partition the flow by key?

Here's how I tried to do it:
https://github.com/youroff/flow_example

If I call Pipeline.enqueue(x), x goes into the queue; x is an integer that is supposed to serve as the key for Window.session (in this example it is also the event itself). What I wanted to achieve is this: if I repeatedly call, say, Pipeline.enqueue(1), then after 30 seconds it would print [{1, ...}, {1, ...}, ...]; and if I use different integers, it would print two (or as many as there are distinct keys) separate frames with their respective events: [{1, ...}, ...] and [{2, ...}, ...]

My guess is that it implicitly creates partitions, so I have to departition it somehow?

A session finishes only after a 30-second gap in the data. Here is an example:

iex(1)> Pipeline.enqueue 1
:ok
iex(2)> Pipeline.enqueue 1
:ok
iex(3)> Pipeline.enqueue 1
:ok
iex(4)> Pipeline.enqueue 1
:ok
iex(5)> Pipeline.enqueue 1
:ok
# WAIT FOR 30 SECONDS
iex(6)> Pipeline.enqueue 1
:ok
iex(7)> [{1, 1487026346}, {1, 1487026346}, {1, 1487026345}, {1, 1487026345},
 {1, 1487026344}]

Notice that it did not print automatically after 30 seconds of inactivity. That's because we only know a user's session has finished once more data arrives and we see that 30 seconds have passed.
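This behavior can be modeled with a simplified, single-key sketch of gap-based sessions. It is not Flow's actual implementation; SessionSketch is a hypothetical module, and the gap is in the same units as the timestamps. Events are fed in one at a time, and a session is emitted only when a new event arrives at least `gap` after the previous one.

```elixir
defmodule SessionSketch do
  # Simplified model, not Flow internals.
  # State is :empty or {current_session_events_in_order, last_timestamp}.
  def new, do: :empty

  # Each push returns {emitted_sessions, new_state}.
  def push(:empty, ts, _gap), do: {[], {[ts], ts}}

  def push({events, last}, ts, gap) when ts - last >= gap do
    # Gap observed: the previous session is complete, so emit it and start a new one.
    {[events], {[ts], ts}}
  end

  def push({events, _last}, ts, _gap), do: {[], {events ++ [ts], ts}}

  # Feeds a list of timestamps and returns {all_emitted_sessions, open_state}.
  def run(timestamps, gap) do
    Enum.reduce(timestamps, {[], new()}, fn ts, {emitted, state} ->
      {out, state} = push(state, ts, gap)
      {emitted ++ out, state}
    end)
  end
end
```

Feeding it [0, 1, 1, 2, 2] with a gap of 30 emits nothing and leaves one open session; appending 32 finally emits [0, 1, 1, 2, 2], mirroring the iex session above. A periodic trigger plays the role of peeking at that open session without waiting for the next event.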

One "fix" is to set a timeout via a trigger:

# The example timestamps are in seconds, so multiply by 1000 to get milliseconds
window =
  Flow.Window.session(30, :second, & elem(&1, 1) * 1000)
  |> Flow.Window.trigger_periodically(5, :second)

This will allow you to see the progress while the session has not yet been completed. But, from everything you have described, it looks like you are interested in fixed windows.

From what you describe, you want regular windows.

Ah, right, now I get it. I should probably stick with periodic windows plus partition. Thanks a lot! 👍