tower-rs/tower-grpc

Independent bi-di streams

nevi-me opened this issue · 2 comments

Is it possible to create independent bidi streams, where for example the client sends a piece of data every 5 seconds, and the server responds every 10 seconds?

To provide context, I've got a use-case where a device sends its readings every 15 seconds, and once in a while, I need to send it a new interval to send readings in. So far, I send an interval back with each incoming reading, and this ends up flooding the device with responses.

The examples that I've seen so far show receiving a stream of requests and responding with something per request, RouteChat being one such.

I would like to be able to:

// log is a struct with id: u64, readings: {...}

fn stream_logs(&mut self, request: Request<Streaming<Log>>) -> Self::FrequencyFuture {
  request.into_inner().for_each(|log| {
    // do something with sample, not returning anything from here
  });
  
  // return a separate stream that notifies device to change log frequency within the same connection
  // - first problem is I don't know the log's device, because I can't get it from `request`
  // - I could pass it as a metadata field, then get it from `request` before consuming the stream
  future::ok(Response::new(my_device_monitor::get_required_freq(device_id))) // device_id: u64

  // my understanding is that the first stream that I consume would 'block' and I wouldn't be able to return the response, or is what I'm trying to do above possible?
}

Is what I'm trying to do above possible/achievable? Also, if the client disconnects, would the stream(s) get terminated automatically?

In this situation, I might suggest writing a custom stream combinator that wraps the incoming stream. On each poll it polls the incoming stream for the next item, but doesn't return control just because that stream is `NotReady`; it then polls a delay/interval and produces an outgoing stream item when that completes. This should allow you to process both concurrently.
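A rough sketch of that combinator idea, for illustration only. To stay self-contained it uses a simplified, std-only stand-in for the futures 0.1 poll model (no error type, no task wake-ups), so `Async`, `Stream`, `DrainAndTick`, `Script`, and `demo` here are all hypothetical names rather than tower-grpc or futures APIs. In real code the wrapper would presumably implement the futures `Stream` trait, `incoming` would be the `Streaming<Log>` request body, and `ticker` would be a timer-backed stream. Ending the outgoing stream once the client's stream ends is an assumption of this sketch, not something stated in the thread.

```rust
use std::collections::VecDeque;

// Simplified stand-in for the futures 0.1 `Async` type (assumption: errors omitted).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

// Simplified stand-in for a poll-based stream: Ready(Some(x)) is an item,
// Ready(None) is end of stream, NotReady means "nothing available yet".
trait Stream {
    type Item;
    fn poll(&mut self) -> Async<Option<Self::Item>>;
}

// Hypothetical combinator: consumes `incoming` as a side effect and yields
// only the items produced by `ticker`.
struct DrainAndTick<S, T, F> {
    incoming: S,
    ticker: T,
    on_item: F,
    incoming_done: bool,
}

impl<S, T, F> Stream for DrainAndTick<S, T, F>
where
    S: Stream,
    T: Stream,
    F: FnMut(S::Item),
{
    type Item = T::Item;

    fn poll(&mut self) -> Async<Option<T::Item>> {
        // Drain every incoming item that is ready right now.
        while !self.incoming_done {
            match self.incoming.poll() {
                Async::Ready(Some(item)) => (self.on_item)(item),
                Async::Ready(None) => self.incoming_done = true,
                // Key point: do not return here; fall through to the ticker.
                Async::NotReady => break,
            }
        }
        // Assumption: once the client stops sending, end the response stream too.
        if self.incoming_done {
            return Async::Ready(None);
        }
        // Emit an outgoing item only when the ticker fires.
        self.ticker.poll()
    }
}

// Test double: replays a fixed sequence of poll results.
struct Script<T>(VecDeque<Async<Option<T>>>);

impl<T> Stream for Script<T> {
    type Item = T;
    fn poll(&mut self) -> Async<Option<T>> {
        self.0.pop_front().unwrap_or(Async::Ready(None))
    }
}

// Drives the combinator by hand; returns (reading ids seen, intervals sent).
fn demo() -> (Vec<u64>, Vec<u32>) {
    use self::Async::{NotReady, Ready};
    let mut seen = Vec::new();
    let mut sent = Vec::new();
    {
        let incoming = Script(VecDeque::from(vec![
            Ready(Some(1)), Ready(Some(2)), NotReady, // first poll
            Ready(Some(3)), NotReady,                 // second poll
            Ready(None),                              // client disconnects
        ]));
        let ticker = Script(VecDeque::from(vec![NotReady, Ready(Some(30))]));
        let mut stream = DrainAndTick {
            incoming,
            ticker,
            on_item: |id: u64| seen.push(id),
            incoming_done: false,
        };
        loop {
            match stream.poll() {
                Ready(Some(secs)) => sent.push(secs),
                Ready(None) => break,
                NotReady => {} // a real executor would park the task here
            }
        }
    }
    (seen, sent)
}
```

In this scripted run, three readings are consumed while only a single new interval is sent back, which is the decoupling asked about. With the real futures machinery, polling both inner streams in the same `poll` call is also what registers the task to be woken by whichever one becomes ready first.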

Thanks @LucioFranco, I found a different way to achieve what I needed, but I'll try your suggestion when I get an opportunity to.