joboccara/pipes

non exhaustive pipes

Opened this issue · 3 comments

It would be really nice if pipes could communicate that they are done. Take for example take, where there is some expensive transformation downstream, it would be sweet if take(n) would only 'trigger' n calculations. So maybe something like a done() method could be used to communicate upstream that the pushing can stop. For example the code for onReceive in transform looks like this.

    template<typename... Values, typename TailPipeline>
    void onReceive(Values&&... values, TailPipeline&& tailPipeline)
    {
        send(detail::invoke(function_.get(), FWD(values)...), tailPipeline);
    }

It does not check whether the tailPipeline wants the result. So if pipes had a way to check, then we could rewrite this as

    template<typename... Values, typename TailPipeline>
    void onReceive(Values&&... values, TailPipeline&& tailPipeline)
    {
        if( not tailPipeline.done() )
        {
            send(detail::invoke(function_.get(), FWD(values)...), tailPipeline);
        } 
        else 
        {
           done_ = true;
        }
    }

Here a link to a simple working example based primarily on the existing pipes code. It no longer uses "smart output iterators" as they are too "dumb". I removed the proxies and the pipeline as they weren't need to get this running.

Here is a more extensive example that removes some of the constraints in the previous one. In the example, the top part in the namespace pipes is essentially code form this library with some modifications. The lower part after using namespace pipes is mainly mine.

The example contains a transform >>= filter >>= take pipe because that is a compelling example of where this is useful.

I'm not sure whether this is a bug or not, but something like

std::vector<int> a{ 1, 2, 3, 4 };
std::vector<int> b{ 11, 22, 33, 44 };

mux( a, b ) >>= filter( []( auto i, auto j ) { return i + j > 20.0; } )
    >>= transform( []( auto i, auto j ) {  return std::make_tuple( i * 10, j - 5 ); } )
    >>= to_out_stream( std::cout ); 

fails with

binary '<<': no operator found which takes a right-hand operand of type 'T' (or there is no acceptable conversion)
with [T=std::tuple<int,int>]

I added extra code in my transform to handle the case where the transformation returns a tuple, as I didn't know how better to let it return multiple values.

template<typename... Values, typename Pipe>
  void
  recv( Pipe&& tail, Values&&... values ) {
     auto recv = [&]( auto&&... values ) {  tail.recv( FWD( values )... ); };
     using transformed = decltype( transform_( FWD( values )... ) );

     if( not done() ) {
       if constexpr( is_tuple<transformed>::value ) {
         std::apply( recv, transform_( FWD( values )... ) );
       } else {
         std::invoke( recv, transform_( FWD( values )... ) );
       }

       tail_done_ = tail.done();
     }
  }

I am still not sure why there is a distinction between pipes and pipelines. The code seems to work fine without it.

It is probably a good idea.

A pipe needs to pass its output to another pipe or pipeline, whereas a pipeline doesn't (it ends with a closing point such as pipes::push_back). Therefore, an input can only be connected to a pipeline, not a pipe.