into_collectable/2 and through_collectable/2 to complement from_enumerable(s)/1
tsutsu opened this issue · 2 comments
Problem description
It is already possible to route a flow into a Collectable, e.g.
Flow.from_enumerable([1, 2, 3]) |> Enum.into([])
This works, but it forces the collecting to happen in the context of the process creating the Flow, rather than in a separate GenStage consumer process, and therefore "hogs" the Flow-spawning process's inbox, keeping it from being used for other purposes (as discussed here).
This can be sensible if the Flow-spawning process is then going to use the collected data: it won't attempt to do anything else until the Enum.into/2 completes anyway, and once it proceeds, it will need everything that was delivered to it to reside in its own process's heap. But if the Collectable exists solely to cause side effects upon insertion, rather than as a value object that carries its inserted values around, this blocking behavior can be suboptimal: the (potentially long-lived) parent process will end up full of garbage from the messages that were delivered from the GenStage.stream to the Collectable, and will block as it GC-sweeps that garbage.
For example, Ecto's Ecto.Adapters.SQL.Stream struct implements the Collectable protocol, allowing code like this:
db_stream = Ecto.Adapters.SQL.stream(MyRepo, "COPY foo FROM STDIN WITH (FORMAT csv, HEADER false)")

MyRepo.transaction fn ->
  Enum.into(csv_flow, db_stream)
end
Here, the process executing the Ecto transaction will receive (and linearize!) all the data produced from csv_flow, only to pass it off again to db_stream, where the data will turn around and travel back out to a DBConnection process.
Proposed solution
Add a function, Flow.into_collectable(flow, collectable), which would be a terminal, demand-driving call for the Flow (like Enum.into/2 is).
- into_collectable/2 would pass each GenStage process in the current partition a copy of the collectable. (For correct concurrency semantics, it may be advisable for collectable to actually be collectable_or_fn, where the user could supply a fun that is called by each GenStage process in the partition and that returns a concurrency-isolated instance of the collectable.)
- Each GenStage process, upon receiving the collectable from into_collectable/2, would immediately call Collectable.into/1 on it to get a reducer, and would then hold onto said reducer in its state.
- Each GenStage process would then, in its handle_events/3, apply the reducer to the received events (see the sketch after this list).
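To make the per-stage mechanics concrete, here is a minimal sketch expressed as a plain GenStage consumer; the module name CollectableConsumer and the collectable_fun argument are illustrative only, not part of any existing Flow or GenStage API:

defmodule CollectableConsumer do
  use GenStage

  def start_link(collectable_fun) do
    GenStage.start_link(__MODULE__, collectable_fun)
  end

  def init(collectable_fun) do
    # Each stage builds its own concurrency-isolated collectable and turns it
    # into an {accumulator, collector_fun} pair via Collectable.into/1.
    {acc, collector} = Collectable.into(collectable_fun.())
    {:consumer, {acc, collector}}
  end

  def handle_events(events, _from, {acc, collector}) do
    # Fold the received events into the collectable; any side effects happen
    # here, in the consumer process, rather than in the Flow-spawning process.
    acc = Enum.reduce(events, acc, fn event, acc -> collector.(acc, {:cont, event}) end)
    {:noreply, [], {acc, collector}}
  end

  def terminate(_reason, {acc, collector}) do
    # Finalize the collectable when the stage shuts down.
    collector.(acc, :done)
  end
end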
Optionally, one could also add a function Flow.through_collectable(flow, collectable), which would work similarly but would be non-terminal. The partition would simply be extended with a step that passes events into the reducer; having done so, and having acquired the modified reducer, it would then pass those same events unmodified to the next step in the partition (while storing the modified reducer in its state).
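As a purely hypothetical usage sketch (neither proposed function exists in Flow today), a partition could log events to disk as a side effect and still hand them on to later steps; the sink fun, format_line/1, and further_processing/1 are assumed names used only for illustration:

# Each stage receives its own concurrency-isolated sink, per the collectable_or_fn idea above.
sink = fn -> File.stream!("events-#{System.unique_integer([:positive])}.log") end

flow
|> Flow.map(&format_line/1)
|> Flow.through_collectable(sink)   # side effect only; events keep flowing downstream
|> Flow.map(&further_processing/1)
|> Flow.run()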
Flow.through_collectable/2 would be perfect for use-cases like that of Ecto.Adapters.SQL.stream/2, where the goal is simply to cause the side effect of storing the structs being processed into a database (i.e. "durable-izing" them), without necessarily wanting to end the processing of the structs there, and without any need to linearize the durabilization process.
As well, both Flow.into_collectable/2 and Flow.through_collectable/2 could encourage a lot more libraries to implement Collectable! The Collectable protocol is much simpler to implement than the GenStage consumer behavior; if implementing Collectable on a struct automatically gave a developer effectively all the advantages of a GenStage consumer, with only the time investment of writing the Collectable reducer, developers would likely be more interested in making their structs Collectable.
Thanks @tsutsu! I think we can implement into_collectable so that it behaves like through_collectable. If you do not care about the return value, then you can call Flow.emit(:nothing). We can probably implement it on top of Flow.reduce + Flow.map_state. Could you please send a PR? Thank you.
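For reference, a minimal sketch of that approximation, using Flow.reduce/3 together with Flow.on_trigger/2 (the successor to Flow.map_state in recent Flow versions); the module name MyFlowHelpers is arbitrary, and this is only an illustration of the suggested approach, not code from the Flow codebase:

defmodule MyFlowHelpers do
  # Rough approximation of the proposed into_collectable/2 on top of existing
  # Flow primitives; collectable_fun returns a fresh collectable per stage.
  def into_collectable(flow, collectable_fun) do
    flow
    |> Flow.reduce(
      fn -> Collectable.into(collectable_fun.()) end,
      fn event, {acc, collector} -> {collector.(acc, {:cont, event}), collector} end
    )
    |> Flow.on_trigger(fn {acc, collector} ->
      collector.(acc, :done)    # finalize; only the side effects matter
      {[], {acc, collector}}    # emit nothing downstream
    end)
    |> Flow.run()
  end
end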
If there is still interest in this feature, a PR will be welcome! For now we are closing this, thank you!