dashbitco/flow

Naming Stages

Hey all,

On my team, we have been using Flow and GenStage for the past 9 months or more. We use them to process streams of data from a number of different sources, handling a large volume of data throughout the day. Ideally, the system never shuts down. Currently the system has OOM crashes and slows down because of backed-up message queues.

We consistently have an issue where a GenStage process will slow its processing and begin to accumulate memory. The GenStages that we have written ourselves are all named, but the GenStage processes generated by our use of Flow are nameless. We have to guess where the problems exist, and that guesswork is itself a problem.

I would like to be able to provide a name prefix, have each "stage" of the Flow (map, flat_map, filter, etc.) append another segment, and have each GenStage within that "stage" append a further suffix, just enough not to collide, like _#{i}, similar to the partitions in Registry.

An example:

input_stage
|> Flow.from_stage(window: initial_window)
|> Flow.flat_map(&processing_function/1, name: __MODULE__.Flow.ProcessingFunction)
|> Flow.partition(window: post_processing_window)
|> Flow.filter(&filtering_function/1, name: __MODULE__.Flow.FilteringFunction)
|> Flow.into_stages([output_stage], name: __MODULE__.Flow)

This will generate GenStage processes like:

__MODULE__.Flow.ProcessingFunction.FlatMap._0
__MODULE__.Flow.ProcessingFunction.FlatMap._1
__MODULE__.Flow.ProcessingFunction.FlatMap._2
__MODULE__.Flow.ProcessingFunction.FlatMap._3
__MODULE__.Flow.FilteringFunction.Filter._0
__MODULE__.Flow.FilteringFunction.Filter._1
__MODULE__.Flow.FilteringFunction.Filter._2
__MODULE__.Flow.FilteringFunction.Filter._3

A feature like this, one that lets us reliably determine the line(s) of code a process originated from, would take a lot of the guesswork out of debugging and performance tuning for my team, and would let us use tools like :observer and WombatOAM more effectively.

If this is of interest to the maintainers, I would be happy to implement this feature with your supervision and advice.

Currently the system has OOM crashes and slows down because of backed-up message queues.

Interesting. The whole point of GenStage is to control the amount of data we ingest into the system so it doesn't OOM. So it will be very interesting to find out who is the culprit.

input_stage
|> Flow.from_stage(window: initial_window)
|> Flow.flat_map(&processing_function/1, name: __MODULE__.Flow.ProcessingFunction)
|> Flow.partition(window: post_processing_window)
|> Flow.filter(&filtering_function/1, name: __MODULE__.Flow.FilteringFunction)
|> Flow.into_stages([output_stage], name: __MODULE__.Flow)

This example per se doesn't work because we don't create a process per operation. That would be slow. We only create processes on from_stage and on every partition. I believe this actually simplifies the problem but I thought I would clarify.
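
To make the fusion concrete, here is a minimal, self-contained sketch (the data and the anonymous functions are made up for illustration): only the from_enumerable layer and the partition layer run as GenStage processes, while the flat_map and filter execute inside those same stages.

# Illustrative only: stages are created by from_enumerable and by partition;
# flat_map and filter are fused into them and spawn no extra processes.
Flow.from_enumerable(1..1000)            # layer 1 of stage processes
|> Flow.flat_map(&[&1, &1 * 2])          # fused into layer 1
|> Flow.partition()                      # layer 2 of stage processes
|> Flow.filter(&(rem(&1, 2) == 0))       # fused into layer 2
|> Enum.to_list()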

The other idea is to simply give a name on start_link or on run and name them like a matrix. So if you pass name: Foo, we will generate names like Foo.P0_0, Foo.P0_1, ..., Foo.P0_7, Foo.P1_0, Foo.P1_1, ..., Foo.P1_7, etc. Especially because the same flow can be started multiple times and we don't want the inner partition names to conflict.
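
As a sketch of the kind of names that scheme would produce (this is not an existing Flow API, just one way such names could be derived):

# Hypothetical: partition layer index first, stage index within the layer second.
for layer <- 0..1, stage <- 0..7 do
  Module.concat(Foo, "P#{layer}_#{stage}")
end
#=> [Foo.P0_0, Foo.P0_1, ..., Foo.P0_7, Foo.P1_0, ..., Foo.P1_7]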

Thoughts?

Interesting. The whole point of GenStage is to control the amount of data we ingest into the system so it doesn't OOM. So it will be very interesting to find out who is the culprit.

We have implemented back-pressure in each of the sources feeding into GenStage/Flow, and I appreciate the design principle.

This example per se doesn't work because we don't create a process per operation. That would be slow. We only create processes on from_stage and on every partition. I believe this actually simplifies the problem but I thought I would clarify.

We could provide a name on each partition call, if that fits the concept better.

The other idea is to simply give a name on start_link or on run and name them like a matrix. So if you pass name: Foo, we will generate names like Foo.P0_0, Foo.P0_1, ..., Foo.P0_7, Foo.P1_0, Foo.P1_1, ..., Foo.P1_7, etc. Especially because the same flow can be started multiple times and we don't want the inner partition names to conflict.

It could also be that partitions/stages aren't named unless they are given one, and that would come with the understanding that a Flow would conflict if started multiple times.


I was given the impression at some point that regular partitioning helps keep the memory footprint lower. Is that the case, or do you have other thoughts on structuring flows for better performance?

It could also be that partitions/stages aren't named unless they are given one, and that would come with the understanding that a Flow would conflict if started multiple times.

We could do a mixed approach. Let's pass a name when we start/run/into_stages the flow for now and name the partitions as P0, P1 and so forth, with each partition stage adding a _0, _1 and similar suffix. Then, if that is not enough, we can also support naming the partitions so we have something other than P0.

I was given the impression at some point that regular partitioning helps keep the memory footprint lower. Is that the case, or do you have other thoughts on structuring flows for better performance?

Partitioning would actually increase the memory footprint, simply because you now have multiple entities, each with their own buffer, but you want that anyway because you want to leverage multicore.

However, notice that unnecessary partitioning will increase memory usage and reduce throughput with no benefit whatsoever. Flow takes care of using all cores regardless of the number of times you call partition. You should only partition when the problem you are trying to solve requires you to route the data around.

If you can solve a problem without using partition at all, that would be even better.
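
As a point of reference, the word-counting flow from the Flow documentation is a case where partition is genuinely needed: the reduce must see every occurrence of a given word in the same stage, so the data has to be routed by word. A pipeline of pure map/filter work, by contrast, needs no partition at all.

# Sketch along the lines of the word-count example in the Flow docs:
# partition routes equal words to the same stage so the reduce sees all of them.
["roses are red", "violets are blue"]
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
#=> word/count tuples, e.g. {"are", 2}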

The source of OOM is likely to be one of:

  1. A GenStage producer that is not respecting the GenStage contract and trying to push data downstream even when downstream does not have demand to handle it (see the sketch after this list)
  2. A combination of Flow operations that trigger the back-pressure late
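
On point 1, a demand-respecting producer only emits as many events as were asked for in handle_demand/2 and never pushes unrequested data. A minimal sketch, with an illustrative module name:

defmodule MyApp.CounterProducer do
  # Sketch of a producer that respects the GenStage contract: it emits at most
  # `demand` events per handle_demand/2 and buffers nothing on its own.
  use GenStage

  def start_link(initial) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  @impl true
  def init(counter), do: {:producer, counter}

  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end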

If you have access to the machine nodes, I would try running observer and ordering the processes by memory usage. Even if they are not named, you can get the stacktrace to have an idea of where they are coming from.
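
If attaching :observer is inconvenient, the same information can be pulled from a remote shell; a rough sketch:

# List the five heaviest processes with their message queue length and current
# stacktrace, which usually points at the code they originated from.
Process.list()
|> Enum.map(&{&1, Process.info(&1, [:memory, :message_queue_len, :current_stacktrace])})
|> Enum.reject(fn {_pid, info} -> is_nil(info) end)
|> Enum.sort_by(fn {_pid, info} -> -info[:memory] end)
|> Enum.take(5)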

If there is still interest in this feature, a PR will be welcome! For now we are closing this, thank you!