joboccara/pipes

Allow pipelines to have a result (sink?) object

Closed this issue · 3 comments

I'm wondering if it's possible to give pipelines a T sink() && function that returns an arbitrary object upon completion. This would solve a number of issues, such as allowing a collection to be created on a single line and implementing sink queries like count/contains/find/accumulate. It would require plumbing the return value back through all the pipes at the end, and the final pipeline would essentially be an accumulator that holds some state.

At first glance, the easiest way to do this would be to have the >>= operator for ranges and pipelines return sink() instead of void, and to add a sink() overload to the pipelines.

// pipeline_base
auto sink() && { return void_t{}; } // void_t: some empty placeholder type; too bad we can't actually return void
  
// generic_pipeline
auto sink() && { return std::move(tailPipeline_).sink(); }

// fork
auto sink() && { return std::make_tuple(...); }
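To make the plumbing a bit more concrete, here is a minimal standalone sketch of the idea. It does not use the actual pipes headers: everything in namespace sketch (pipeline_base, void_t, count_sink, transform_pipe, and this operator>>=) is a hypothetical stand-in, and the real library may organize these pieces differently.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>
#include <utility>
#include <vector>

namespace sketch {

// Placeholder result for pipelines that produce nothing (hypothetical type).
struct void_t {};

// Toy CRTP base: by default a pipeline has no meaningful result.
template <typename Derived>
struct pipeline_base {
    auto sink() && { return void_t{}; }
};

// Terminal pipeline that counts the values it receives.
class count_sink : public pipeline_base<count_sink> {
public:
    template <typename T>
    void onReceive(T&&) { ++count_; }

    // Hides the base's default sink() and returns the accumulated state.
    std::size_t sink() && { return count_; }

private:
    std::size_t count_ = 0;
};

// Intermediate stage: applies f, forwards the result to the tail, and
// plumbs the tail's result back out through its own sink().
template <typename F, typename Tail>
class transform_pipe : public pipeline_base<transform_pipe<F, Tail>> {
public:
    transform_pipe(F f, Tail tail) : f_(std::move(f)), tail_(std::move(tail)) {}

    template <typename T>
    void onReceive(T&& value) { tail_.onReceive(f_(std::forward<T>(value))); }

    auto sink() && { return std::move(tail_).sink(); }

private:
    F f_;
    Tail tail_;
};

// range >>= pipeline: push every element, then return the pipeline's result.
// Only participates for types derived from sketch::pipeline_base.
template <typename Range, typename Pipeline,
          std::enable_if_t<std::is_base_of<pipeline_base<Pipeline>, Pipeline>::value, int> = 0>
auto operator>>=(Range const& range, Pipeline pipeline) {
    for (auto const& value : range) { pipeline.onReceive(value); }
    return std::move(pipeline).sink();
}

} // namespace sketch
```

In this toy version, a range piped into a transform_pipe whose tail is a count_sink evaluates to a std::size_t, because each stage simply forwards its tail's sink().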

Then creating a count or to_vector sink pipeline would be fairly straightforward. One could also make a more generic to, similar to what's being done with inserter.

template <typename Ele>
class vector_sink: public pipeline_base<vector_sink<Ele>>
{
public:
    template<typename T>
    void onReceive(T&& value) { vector_.push_back(FWD(value)); }

    auto sink() && { return std::move(vector_); }
    
private:
    std::vector<Ele> vector_;
};
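The more generic to mentioned above could look something like the following. This is only a sketch under assumptions: container_sink and to are hypothetical names, the pipeline_base inheritance is left out so the block stands alone, and it assumes the target container supports insert(iterator, value).

```cpp
#include <cassert>
#include <set>
#include <utility>
#include <vector>

namespace sketch {

// Hypothetical generic collecting sink: works for any container that
// supports insert(iterator, value), such as std::vector or std::set.
template <typename Container>
class container_sink {
public:
    template <typename T>
    void onReceive(T&& value) {
        container_.insert(container_.end(), std::forward<T>(value));
    }

    // Hands the collected container to the caller once the pipeline is done.
    Container sink() && { return std::move(container_); }

private:
    Container container_;
};

// Hypothetical factory, so a call site could read to<std::vector<int>>().
template <typename Container>
container_sink<Container> to() { return {}; }

} // namespace sketch
```

Using insert with an end() hint rather than push_back is what lets the same sink fill associative containers as well as sequences.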

I have no idea whether any of the above compiles or whether there is some fundamental flaw in this approach. If not, I can try to come up with a PR that fully implements it when I have some time.

Thanks for the interesting idea. What would the call site look like for the user of the pipeline? I'd like to understand how sink() would be called.

I'm imagining a user wouldn't ever need to call sink() directly; it's just an API function for terminal pipelines, like onReceive(). It simply takes advantage of the return slot of operator>>=(Range, Pipeline), which currently returns void.

So from your first example:

std::vector<int> destination = source >>= pipes::filter([](int i){ return i % 2 == 0; })
       >>= pipes::transform([](int i){ return i * 2; })
       >>= pipes::to_vector<int>();

A big downside of this compared to a pull model is that you lose the ability to short-circuit, so terminal pipelines like find() would never be optimal. But accumulate is possible (which is enough to implement everything).
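As a rough illustration of accumulate being enough, here is a standalone sketch in which count and contains are both written as folds. All names (accumulate_sink, accumulate, count, contains, and the run driver that stands in for range >>= pipeline) are hypothetical and not part of the library.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

namespace sketch {

// Hypothetical fold-style sink: state = f(state, value) for each value.
template <typename State, typename F>
class accumulate_sink {
public:
    accumulate_sink(State init, F f) : state_(std::move(init)), f_(std::move(f)) {}

    template <typename T>
    void onReceive(T&& value) {
        state_ = f_(std::move(state_), std::forward<T>(value));
    }

    State sink() && { return std::move(state_); }

private:
    State state_;
    F f_;
};

template <typename State, typename F>
accumulate_sink<State, F> accumulate(State init, F f) {
    return accumulate_sink<State, F>(std::move(init), std::move(f));
}

// count expressed as a fold.
inline auto count() {
    return accumulate(std::size_t{0},
                      [](std::size_t n, auto const&) { return n + 1; });
}

// contains expressed as a fold. It still receives every value, even after
// a match, which is the lost short-circuiting described above.
template <typename T>
auto contains(T needle) {
    return accumulate(false, [needle](bool found, auto const& value) {
        return found || value == needle;
    });
}

// Toy driver standing in for range >>= pipeline.
template <typename Range, typename Pipeline>
auto run(Range const& range, Pipeline pipeline) {
    for (auto const& value : range) { pipeline.onReceive(value); }
    return std::move(pipeline).sink();
}

} // namespace sketch
```

The fold version of contains gives the right answer but cannot stop early, so its cost is always proportional to the length of the source range.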

Edit: Thinking about it a bit more, you could add short-circuiting through some back-pressure mechanism, but it wouldn't be straightforward.
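One possible shape for such a mechanism, purely as a sketch: onReceive returns a bool meaning "I still want more values", every intermediate stage forwards that signal upstream, and the driving loop stops when it sees false. This protocol is an assumption made for illustration, not something the library does today, and contains_sink, filter_pipe, filter, and run are hypothetical names.

```cpp
#include <cassert>
#include <utility>
#include <vector>

namespace sketch {

// Terminal sink that asks the source to stop once the needle is found.
template <typename T>
class contains_sink {
public:
    explicit contains_sink(T needle) : needle_(std::move(needle)) {}

    // Returns true while more values are wanted, false once satisfied.
    template <typename U>
    bool onReceive(U const& value) {
        if (value == needle_) { found_ = true; }
        return !found_;
    }

    bool sink() && { return found_; }

private:
    T needle_;
    bool found_ = false;
};

// Intermediate stage that forwards the tail's "want more" signal upstream.
template <typename Pred, typename Tail>
class filter_pipe {
public:
    filter_pipe(Pred pred, Tail tail) : pred_(std::move(pred)), tail_(std::move(tail)) {}

    template <typename U>
    bool onReceive(U const& value) {
        if (!pred_(value)) { return true; }  // value dropped: keep going
        return tail_.onReceive(value);
    }

    auto sink() && { return std::move(tail_).sink(); }

private:
    Pred pred_;
    Tail tail_;
};

template <typename Pred, typename Tail>
filter_pipe<Pred, Tail> filter(Pred pred, Tail tail) {
    return filter_pipe<Pred, Tail>(std::move(pred), std::move(tail));
}

// Toy driver standing in for range >>= pipeline: stops pushing as soon as
// the pipeline reports that it needs nothing more.
template <typename Range, typename Pipeline>
auto run(Range const& range, Pipeline pipeline) {
    for (auto const& value : range) {
        if (!pipeline.onReceive(value)) { break; }
    }
    return std::move(pipeline).sink();
}

} // namespace sketch
```

The sketch also hints at why this is not straightforward: every stage has to cooperate by returning the signal, and a stage with several outputs such as fork would need a policy for combining the answers of its branches.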

Note: This is related to issue #33.