[Question] : How to detect client disconnection
rogerwrld opened this issue · 18 comments
Do you have an idea about how to detect client disconnection for a streaming rpc ?
Yes, there is no nice integration with agrpc::repeatedly_request
yet but you can roll your own accept-loop as described here: #51
See https://tradias.github.io/asio-grpc/structagrpc_1_1detail_1_1_notfiy_when_done_fn.html:
template <class RequestHandler>
asio::awaitable<void> request_loop(agrpc::GrpcContext& grpc_context, example::v1::Example::AsyncService& service,
RequestHandler request_handler)
{
grpc::ServerContext server_context;
auto on_done = agrpc::notify_when_done(grpc_context, server_context, asio::experimental::use_promise);
example::v1::Request request;
grpc::ServerAsyncResponseWriter<example::v1::Response> writer{&server_context};
const bool ok = co_await agrpc::request(&example::v1::Example::AsyncService::RequestUnary, service, server_context,
request, writer, asio::use_awaitable);
if (!ok)
{
// At this point, `agrpc::notify_when_done` will never complete.
grpc_context.work_finished();
co_return;
}
asio::co_spawn(grpc_context, request_loop(grpc_context, service, request_handler), asio::detached);
co_await request_handler(server_context, request, writer, std::move(on_done));
}
Then register the request handler, similar to repeatedly_request
:
auto request_handler = [&](grpc::ServerContext& server_context, test::msg::Request& request,
grpc::ServerAsyncWriter<test::msg::Response>& writer,
asio::experimental::promise<void()> on_done) -> asio::awaitable<void>
{
// Already cancelled?
// const bool is_cancelled = on_done.complete();
on_done(
[]()
{
// cancelled or finished
});
};
// register the request handler
asio::co_spawn(grpc_context, request_loop(grpc_context, service, std::move(request_handler)), asio::detached);
Thank you for your quick reply, I'm using a bidirectional streaming like the streaming server example.
Do I need to use only this new code or I use it in conjunction with the old repeatedly_request code.
You would use only this code. This line
asio::co_spawn(grpc_context, request_loop(grpc_context, service, std::move(request_handler)), asio::detached);
replaces the repeatedly_request
line.
As mentioned in boost asio documentation, async_wait does not exist anymore. (boost 1.82)
Seems like it got replaced by operator()
.
I tried to remove the async_wait call and adapt the code to use a bidirectional streaming rpc but I got compilation errors
Is there a unit test that uses bidirectional streaming with the request_loop call ?
Thanks a lot
No, but here is a more complete example:
#include <agrpc/asio_grpc.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/promise.hpp>
#include <boost/asio/experimental/use_promise.hpp>
#include <test/v1/test.grpc.pb.h>
namespace asio = boost::asio;
template <class RequestHandler>
asio::awaitable<void> request_loop(agrpc::GrpcContext& grpc_context, test::v1::Test::AsyncService& service,
RequestHandler request_handler)
{
grpc::ServerContext server_context;
auto on_done = agrpc::notify_when_done(grpc_context, server_context, asio::experimental::use_promise);
grpc::ServerAsyncReaderWriter<test::msg::Response, test::msg::Request> writer{&server_context};
const bool ok = co_await agrpc::request(&test::v1::Test::AsyncService::RequestBidirectionalStreaming, service,
server_context, writer, asio::use_awaitable);
if (!ok)
{
// At this point, `agrpc::notify_when_done` will never complete.
grpc_context.work_finished();
co_return;
}
asio::co_spawn(grpc_context, request_loop(grpc_context, service, request_handler), asio::detached);
co_await request_handler(server_context, writer, std::move(on_done));
}
void mmain(agrpc::GrpcContext& grpc_context, test::v1::Test::AsyncService& service)
{
auto request_handler = [&](grpc::ServerContext&, auto& reader_writer, auto on_done) -> asio::awaitable<void>
{
// Already cancelled?
// const bool is_cancelled = on_done.complete();
test::msg::Request request;
co_await agrpc::read(reader_writer, request);
on_done(
[]()
{
// cancelled or finished
});
};
// register the request handler
asio::co_spawn(grpc_context, request_loop(grpc_context, service, std::move(request_handler)), asio::detached);
}
Thank you for the update. I will test it and inform you of the outcome.
It works as expected !
I just modified :
asio::experimental::promise<void()> on_done
To :
auto on_done
Thank you very much for your assistance!
When trying to use the same logic below with the notify_when_done, I got a core file signal 6
// snippet
`
using namespace asio::experimental::awaitable_operators;
const auto ok = co_await (reader(reader_writer, channel) && writer(reader_writer, channel, thread_pool));
if (!ok)
{
co_return;
}
co_await agrpc::finish(reader_writer, grpc::Status::OK);
`
The issue is about :
terminate called without an active exception
I don't get any errors, here is the complete example:
#include "example/v1/example.grpc.pb.h"
#include "example/v1/example_ext.grpc.pb.h"
#include "helper.hpp"
#include "server_shutdown_asio.hpp"
#include <agrpc/asio_grpc.hpp>
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/promise.hpp>
#include <boost/asio/experimental/use_promise.hpp>
#include <boost/asio/thread_pool.hpp>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <iostream>
#include <thread>
namespace asio = boost::asio;
// begin-snippet: server-side-bidirectional-streaming
// ---------------------------------------------------
// The following bidirectional-streaming example shows how to dispatch requests to a thread_pool and write responses
// back to the client.
// ---------------------------------------------------
// end-snippet
using Channel = asio::experimental::channel<void(boost::system::error_code, example::v1::Request)>;
// This function will read one requests from the client at a time. Note that gRPC only allows calling agrpc::read after
// a previous read has completed.
asio::awaitable<void> reader(grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request>& reader_writer,
Channel& channel)
{
while (true)
{
example::v1::Request request;
if (!co_await agrpc::read(reader_writer, request))
{
// Client is done writing.
break;
}
// Send request to writer. The `max_buffer_size` of the channel acts as backpressure.
(void)co_await channel.async_send(boost::system::error_code{}, std::move(request),
asio::as_tuple(asio::use_awaitable));
}
// Signal the writer to complete.
channel.close();
}
// The writer will pick up reads from the reader through the channel and switch to the thread_pool to compute their
// response.
asio::awaitable<bool> writer(grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request>& reader_writer,
Channel& channel, asio::thread_pool& thread_pool)
{
bool ok{true};
while (ok)
{
const auto [ec, request] = co_await channel.async_receive(asio::as_tuple(asio::use_awaitable));
if (ec)
{
// Channel got closed by the reader.
break;
}
// In this example we switch to the thread_pool to compute the response.
co_await asio::post(asio::bind_executor(thread_pool, asio::use_awaitable));
// Compute the response.
example::v1::Response response;
response.set_integer(request.integer() * 2);
// reader_writer is thread-safe so we can just interact with it from the thread_pool.
ok = co_await agrpc::write(reader_writer, response);
// Now we are back on the main thread.
}
co_return ok;
}
asio::awaitable<void> handle_bidirectional_streaming_request(example::v1::Example::AsyncService& service,
asio::thread_pool& thread_pool)
{
grpc::ServerContext server_context;
grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request> reader_writer{&server_context};
bool request_ok = co_await agrpc::request(&example::v1::Example::AsyncService::RequestBidirectionalStreaming,
service, server_context, reader_writer);
if (!request_ok)
{
// Server is shutting down.
co_return;
}
}
template <class RequestHandler>
asio::awaitable<void> request_loop(agrpc::GrpcContext& grpc_context, example::v1::Example::AsyncService& service,
RequestHandler request_handler)
{
grpc::ServerContext server_context;
auto on_done = agrpc::notify_when_done(grpc_context, server_context, asio::experimental::use_promise);
grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request> reader_writer{&server_context};
const bool ok = co_await agrpc::request(&example::v1::Example::AsyncService::RequestBidirectionalStreaming, service,
server_context, reader_writer, asio::use_awaitable);
if (!ok)
{
// At this point, `agrpc::notify_when_done` will never complete.
grpc_context.work_finished();
co_return;
}
asio::co_spawn(grpc_context, request_loop(grpc_context, service, request_handler), asio::detached);
co_await request_handler(server_context, reader_writer, std::move(on_done));
}
int main(int argc, const char** argv)
{
const auto port = argc >= 2 ? argv[1] : "50051";
const auto host = std::string("0.0.0.0:") + port;
std::unique_ptr<grpc::Server> server;
grpc::ServerBuilder builder;
agrpc::GrpcContext grpc_context{builder.AddCompletionQueue()};
builder.AddListeningPort(host, grpc::InsecureServerCredentials());
example::v1::Example::AsyncService service;
builder.RegisterService(&service);
example::v1::ExampleExt::AsyncService service_ext;
builder.RegisterService(&service_ext);
server = builder.BuildAndStart();
abort_if_not(bool{server});
example::ServerShutdown server_shutdown{*server, grpc_context};
asio::thread_pool thread_pool{1};
auto request_handler = [&](auto&, auto& reader_writer, auto on_done) -> asio::awaitable<void>
{
// Maximum number of requests that are buffered by the channel to enable
// backpressure.
on_done(
[]()
{
std::cout << "done" << std::endl;
});
static constexpr auto MAX_BUFFER_SIZE = 2;
Channel channel{co_await asio::this_coro::executor, MAX_BUFFER_SIZE};
using namespace asio::experimental::awaitable_operators;
const auto ok = co_await (reader(reader_writer, channel) && writer(reader_writer, channel, thread_pool));
if (!ok)
{
// Client has disconnected or server is shutting down.
co_return;
}
co_await agrpc::finish(reader_writer, grpc::Status::OK);
};
asio::co_spawn(grpc_context, request_loop(grpc_context, service, std::move(request_handler)), asio::detached);
grpc_context.run();
std::cout << "Shutdown completed\n";
}
Okay I will try that.
handle_bidirectional_streaming_request
Is not used here.
I'm having trouble with the agrpc::write function.
I haven't encountered any situations where asio-grpc reports a 'pure virtual method called' error.
It seems that the issue lies in how I'm utilizing boost asio.
The provided example works well, but as soon as I attempt to use it in a multithreading mode with multiple grpc contexts, it crashes. In my specific case, agrpc::write is invoked when data, produced by an external thread, is available. at this moment, I do not know which grpc context use..
Say I have a thread like this:
std::jthread th([]() { while(1) Response data; });
How to send it to the writer ?
I use co_spawn + boost asio concurrent channel ?
which grpc context do I need to use if I instanciated 10 threads (1 grpc context per thread)
I have also an off-topic question:
How did you learn boost asio to write a such great code ? 😀
Thanks
What if I use a concurrent queue between the producer and consumers ?
How to post data to the appropriate grpc context?
Thanks
How is your question different from #69, where produce_data
might as well be co_spawned onto a asio::thread_pool{1}
?
A single RPC should generally be handled by one GrpcContext. I might be possible to switch but I haven't tested it as it does not provide any benefits.
Only one read and one write can happen at the same time which means you always need some form of queueing of messages if you produce them in parallel to the reads/writes. That's where asio::experimental::channel
could come in.
When I tried to use thread_pool, I got this:
pure virtual method called
publish is called in a separated thread.
It's the produce_data
method in your example.
Thank you very much for your assistance !
There is a race condition on subs_
in Subject
, all non-static member functions are lacking some form of synchronization (e.g. a mutex).
What is the purpose of this line? co_await asio::post(asio::bind_executor(thread_pool_, asio::use_awaitable));
The following agrpc::write
is thread-safe anyways. My recommendation, remove the line.
I don't understand the use of thread_pool_
. In publish
you might as well use one of the grpc_contexts_
, for example in round-robin fashion:
std::atomic_size_t current{};
auto& next()
{
const auto cur = current.fetch_add(1);
const auto pos = cur % grpc_contexts_.size();
return *grpc_contexts_[pos];
}
void publish(NotificationPtr data) { co_spawn(next(), notify(std::move(data)), asio::detached); }
Your publish
function lacks backpressure awareness, meaning you can end up producing data much faster than your clients can consume it. Consider making the function asynchronous. Depending on your usage of publish
you might even be able to make it blocking:
void publish(NotificationPtr data) { co_spawn(next(), notify(std::move(data)), asio::use_future).get(); }
And then in your call to std::make_shared<Channel>()
specify a max_buffer_size.
When I started learning Asio I studied the reference documentation, in particular this page. But the author recently added an entire tutorial section that explains the concepts of the library in detail: https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/overview/model.html. I think this might be a good place to start reading.