Tradias/asio-grpc

Compiler error trying to use asio::experimental::use_promise as completion token

muusbolla opened this issue · 11 comments

I'm trying to use a promise as the completion token to agrpc methods, and getting a compiler error using gcc 11. Here is a minimal example, based off your streaming-server.cpp example:

// additional includes required:
#include <asio/experimental/promise.hpp>
#include <asio/this_coro.hpp>

asio::awaitable<void> handle_bidirectional_streaming_request(example::v1::Example::AsyncService& service)
{
    grpc::ServerContext server_context;
    grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request> reader_writer{&server_context};
    bool request_ok = co_await agrpc::request(&example::v1::Example::AsyncService::RequestBidirectionalStreaming,
                                              service, server_context, reader_writer);
    if (!request_ok)
    {
        // Server is shutting down.
        co_return;
    }
    example::v1::Request request;

    // none of the below work to put as COMPLETIONTOKEN - the following line fails to compile:
    // asio::experimental::use_promise
    // asio::experimental::use_promise_t<agrpc::GrpcContext>{}
    // asio::experimental::use_promise_t<agrpc::GrpcContext::executor_type>{}
    // asio::experimental::use_promise_t<agrpc::s::BasicGrpcExecutor<>>{}
    // asio::experimental::use_promise_t<asio::this_coro::executor_t>{}
    auto&& read_promise = agrpc::read(reader_writer, COMPLETIONTOKEN);

    co_await read_promise.async_wait(asio::use_awaitable);
}

The use case is that later in the function I would simultaneously await any of 3 conditions:
New request from client, finished writing response to client, or new response ready from data processing thread pool:

auto&& write_promise = agrpc::write(rw, response, COMPLETIONTOKEN);
auto&& data_ready_promise = // asynchronously dispatch work to data processing thread pool
auto rwd_promise = asio::experimental::promise<>::all(
    std::forward<decltype(read_promise)>(read_promise),
    std::forward<decltype(write_promise)>(write_promise),
    std::forward<decltype(data_ready_promise)>(data_ready_promise)
);
std::tie(read_ok, write_ok, data_ready_ok) = co_await rwd_promise.async_wait(asio::use_awaitable);

I get a very similar error when I try to do it using the new make_parallel_group:

example::v1::Request request;
example::v1::Response response;
auto read = agrpc::read(reader_writer, request, asio::experimental::deferred);
auto write = agrpc::write(reader_writer, response, asio::experimental::deferred);
auto result = co_await asio::experimental::make_parallel_group(read, write).async_wait(
        asio::experimental::wait_for_one_success(),
        asio::use_awaitable
);

I get the below error which is exactly the same as the error I got with one of the attempts to use use_promise:

.../asio-grpc-1.4.0/src/agrpc/detail/queryGrpcContext.hpp:32:16: error: invalid ‘static_cast’ from type ‘asio::system_context’ to type ‘agrpc::s::GrpcContext&’
[build]    32 |         return static_cast<agrpc::GrpcContext&>(asio::query(executor, asio::execution::context));
[build]       |                ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If I change the latest example from asio::use_awaitable to
asio::use_awaitable_t<agrpc::GrpcContext::executor_type>{}
, I see:

note: mismatched types ‘asio::any_io_executor’ and ‘agrpc::s::BasicGrpcExecutor<>’

The completion handler created from the completion token that is provided to the RPC functions must have an associated executor that refers to a GrpcContext (see also documentation). You can achieve that through the use of asio::bind_executor:

#include <asio/bind_executor.hpp>

auto&& read_promise = agrpc::read(reader_writer, asio::bind_executor(grpc_context, asio::experimental::use_promise));

and for parallel_group:

auto read = agrpc::read(reader_writer, request, asio::bind_executor(grpc_context, asio::experimental::deferred));
auto write = agrpc::write(reader_writer, response, asio::bind_executor(grpc_context, asio::experimental::deferred));
auto result = co_await asio::experimental::make_parallel_group(std::move(read), std::move(write)).async_wait(
        asio::experimental::wait_for_one_success(),
        asio::use_awaitable
);

I am thinking of adding an IoObject-like wrapper for grpc::AsyncService that would behave similar to other IoObjects in Asio, which would allow code such as:

asio::awaitable<void> handle_bidirectional_streaming_request(agrpc::GrpcContext& grpc_context, example::v1::Example::AsyncService& service)
{
  agrpc::AsyncService async_service{grpc_context, service};
  auto&& promise = async_service.read(reader_writer, asio::experimental::use_promise);
}

Let me know if such a class would be helpful for you then I can add it to the next release.


If you want to customize the executor of the asio::awaitable then you must use a different awaitable type:

// agrpc::GrpcAwaitable<void> is a type alias for `asio::awaitable<T, agrpc::GrpcExecutor>`
agrpc::GrpcAwaitable<void> example(example::v1::Example::AsyncService& service)
{
  // agrpc::GRPC_USE_AWAITABLE is simply `asio::use_awaitable_t<agrpc::GrpcExecutor>{}`
  auto&& read_promise = agrpc::read(reader_writer, agrpc::GRPC_USE_AWAITABLE);
}

Note that this does not work for use_promise_t because its implementation is bugged. It tries to get the initiating function's associated executor which is just wrong in my opinion. Nowhere else in Asio does any completion token try to do that and no initiating function in Asio has an associated executor.

It makes sense to me that a CompletionToken would need to be bound to an executor in order to run. However, it seems that the default agrpc::DefaultCompletionToken which resolves to asio::use_awaitable_t<> doesn't require to be bound? I'm able to compile all 3 of the below lines:

auto read = co_await agrpc::read(reader_writer, request);
auto read = co_await agrpc::read(reader_writer, request, asio::use_awaitable);
auto read = co_await agrpc::read(reader_writer, request, asio::use_awaitable_t<>{});

I see in the highlighted red section of the docs you linked (https://tradias.github.io/asio-grpc/structagrpc_1_1detail_1_1_request_fn.html#details) that it says it needs to be bound, but all the following examples on that doc page, such as the example for operator()() shows use of asio::use_awaitable without bind. In the example code the default completion token is used (https://github.com/Tradias/asio-grpc/blob/master/example/streaming-server.cpp, line 88 for example)

Mmh, maybe the documentation does not make it clear enough, let me know how to adjust the wording:

The completion handler of asio::use_awaitable_t has an implicit associated executor by design - it is asio::this_coro::executor. In other words, the executor that was used to co_spawn the coroutine.

If we co_spawn onto the grpc_context directly then we can use agrpc functions without binding:

agrpc::GrpcContext grpc_context{std::make_unique<grpc::CompletionQueue>()};
asio::co_spawn(grpc_context, [&]() -> asio::awaitable<void>
{
  auto executor = co_await asio::this_coro::executor;
  // executor is an asio::any_io_executor created from grpc_context.get_executor()
  // it can be used for agrpc functions, e.g.:
  grpc::Alarm alarm;
  co_await agrpc::wait(alarm, std::chrono::system_clock::now(), asio::use_awaitable);
  // or equivalent, relying on the default completion token
  co_await agrpc::wait(alarm, std::chrono::system_clock::now());
}, asio::detached);

But if we co_spawn onto a different execution context then we need binding:

asio::io_context io_context;
asio::co_spawn(io_context, [&]() -> asio::awaitable<void>
{
  auto executor = co_await asio::this_coro::executor;
  // executor is an asio::any_io_executor created from io_context.get_executor()
  // it cannot be used for agrpc functions directly, it needs executor binding
  grpc::Alarm alarm;
  // the following line would crash:
  //co_await agrpc::wait(alarm, std::chrono::system_clock::now(), asio::use_awaitable);
  // correct would be:
  co_await agrpc::wait(alarm, std::chrono::system_clock::now(), asio::bind_executor(grpc_context, asio::use_awaitable));
}, asio::detached);

Brilliant. Thank you very much. Being able to spawn and await a result from another executor is definitely on my list of needs for this project.

Open-ended question: What I am trying to build at the moment is a bidirectional streaming server that works like so (after the stream has been initially opened):
(1) On receipt of a new request (result of calling agrpc::read), post work to another thread pool (non-grpc asio::thread_pool) to retrieve and process some data, by calling asio::co_spawn or asio::post to the other thread pool.
(2) On completion of that request, get the result back on the agrpc thread. If no write is pending, initiate a new write. Else, queue the data for handling in (3)
(3) On completion of write, check the queue and initiate a new write if data is waiting, by calling agrpc::write.

I can build this using the native GRPC CompletionQueue API. But I'm struggling to figure out how to build it purely with coroutines. It seems that even for a simple bidi server that doesn't have (2) above, I need the ability to co_await agrpc::read and agrpc::write simultaneously. You have this in your streaming server example by the use of operator&&, however this only returns when both are finished. There is an operator||, but it cancels the second awaitable when the first one returns.

What I'm thinking of is an operator that can tie together two or more awaitables and wait for only one of them to complete, and NOT cancel the others. The one that completed would be processed, then replaced by a new awaitable. The ones that didn't complete would be left alone to continue running async, and the whole group would be re-awaited on the next loop. Pseudocode below:

auto read_task = agrpc::read(reader_writer, request);
auto write_task = agrpc::write(reader_writer, response);
while(true) {
  auto [did_read, read_ok, did_write, write_ok] = co_await either(read_task, write_task);
  if(did_read) {
    if (!read_ok) { co_return; }
    asio::post(other_thread_pool, [request]() { get_write_data_async(); }); // result will be enqueued to waiting_write_data
    read_task = agrpc::read(reader_writer, request);
  }
  if(did_write) {
    if (!write_ok) { co_return; }
    if (!waiting_write_data.empty()) {
      response.data = waiting_write_data.front();
      waiting_write_data.pop();
      write_task = agrpc::write(reader_writer, response);
    }
  }
}

I looked at the implementation of asio::experimental::awaitable_operators::operator|| and operator&& for inspiration. It seems like something similar, but replacing the call to wait_for_one_success() with wait_for_one_success(cancellation_type::none) might prevent it from canceling the unfinished tasks. However, it still seems like make_parallel_group is designed only to spawn new tasks that were deferred. I don't know how this would behave if I passed it one or more tasks that were already executing and one task that is new.

Also, in the case where there's nothing to write, the ability to assign a "null" value to write_task, and then check for null and only await the read_task would be useful. This could also be accomplished using a boolean and if statement, but if there is a more expressive way to do it, then that would be good to know.

I also put that code into the bidirectional streaming example: https://github.com/Tradias/asio-grpc/blob/master/example/streaming-server.cpp

@muusbolla Did you find some time to check out the above example code? Does it answer your questions?

@muusbolla since I have not heard from you in a while I hope that your issue has been resolved. Do not hesitate to open another issue if you have any questions or are facing any problems, I am happy to help.

Sorry for not responding. Priorities shifted and I ended up having to table this project. Thanks again for your great library and willingness to help :)