Consult some questions
Closed this issue · 9 comments
Thank you very much for providing this library. It is very useful, but as a beginner in GRPC, I don't quite understand how to use it.
syntax = "proto3";
package lion.v1;
service Example {
rpc Notice(stream NoticeRequest) returns (stream NoticeResponse) {}
rpc Order(OrderRequest) returns (OrderResponse) {}
}
This is my example
int main() {
std::unique_ptr<grpc::Server> server;
std::vector<std::shared_ptr<agrpc::GrpcContext>> grpc_contexts;
grpc::ServerBuilder builder;
for (int i = 0; i < 3; i++)
grpc_contexts.emplace_back(std::make_shared<agrpc::GrpcContext>(builder.AddCompletionQueue()));
builder.AddListeningPort(host, grpc::InsecureServerCredentials());
ExampleService service;
builder.RegisterService(&service);
server = builder.BuildAndStart();
using OrderRPC = lion::AwaitableServerRPC<&ExampleService::RequestOrder>;
agrpc::register_awaitable_rpc_handler<OrderRPC>(
*(grpc_contexts[1]), service,
[&](OrderRPC& rpc, lion::v1::OrderRequest& request) -> asio::awaitable<void> {
log("OrderRequest: %s", request.order_seq_no().c_str());
lion::v1::OrderResponse response;
response.set_order_seq_no("abc-OrderRequest");
co_await rpc.finish(response, grpc::Status::OK);
},
lion::RethrowFirstArg{});
asio::thread_pool thread_pool{1};
// bidirectional streaming
agrpc::register_awaitable_rpc_handler<NoticeRPC>(
*(grpc_contexts[2]), service, notice_rpc_handler(thread_pool), lion::RethrowFirstArg{});
std::vector<std::thread> threads;
for (size_t i = 0; i < grpc_contexts.size(); ++i) {
threads.emplace_back([&, i] {
auto& grpc_context = *grpc_contexts[i];
grpc_context.run();
});
}
for (auto& thread : threads) {
thread.join();
}
}
- It registers two
APIs(NoticeRPC, OrderRPC)
in*(grpc_contexts[1]) and *(grpc_contexts[2])
, Is this safe and correct? Because I noticed that differentgrpc_contexts
use differentCompletionQueue
, But I don't know how it distributes different APIs to their correspondingCompletionQueue
,I found this, but unfortunately there is no answer https://stackoverflow.com/questions/77549329/grpc-distribution-of-messages-when-using-multiple-completion-queues
- Can I use bidirectional stream like this? Will the reader be canceled when sleep returns?
while (true) {
using namespace asio::experimental::awaitable_operators;
auto ok = co_await (reader(rpc, channel) || sleep(3s));
}
-
It is safe and desirable to register one rpc to multiple GrpcContexts. It is not safe to register one rpc to a GrpcContext multiple times. In your case, for optimal concurrency, you might want to register both rpcs to each GrpcContext. See also the example https://github.com/Tradias/asio-grpc/blob/master/example/multi-threaded-server.cpp#L96.
-
Asio propagates cancellation through
asio::awaitable
s which means the rpc will be cancelled ifreader
doesn't complete within 3s. Assuming thatreader
handles allerror_code
s. (Which the streaming-server example does not btw https://github.com/Tradias/asio-grpc/blob/master/example/streaming-server.cpp#L118). Note that cancellation ofagrpc::ServerRPC::read/write/finish/...
is final, see alsoPer-Operation cancellation
here
about 1, OrderRPC
will only run on *(grpc_contexts[1])
thread right ? if I register OrderRPC
to each GrpcContext, I understand that it will run alternately on two different threads (*(grpc_contexts[1]) and *(grpc_contexts[2]))
right ?
agrpc::register_awaitable_rpc_handler<OrderRPC>(
*(grpc_contexts[1]), service,
[&](OrderRPC& rpc, lion::v1::OrderRequest& request) -> asio::awaitable<void> {
},
lion::RethrowFirstArg{});
Sorry, I didn't understand what you said about 2 ,But I found this, This seems to have the same functionality as the example I provided,So I can assume that it can safely cancel the reader(BidiStreaming), right? https://github.com/Tradias/asio-grpc/blob/master/example/snippets/server_rpc.cpp#L30
about 1, Its best practice should be like this right ?
std::vector<std::thread> threads;
for (size_t i = 0; i < grpc_contexts.size(); ++i) {
threads.emplace_back([&, i] {
auto& grpc_context = *grpc_contexts[i];
using GetOrderSeqNoRPC = lion::AwaitableServerRPC<&ExampleService::RequestGetOrderSeqNo>;
agrpc::register_awaitable_rpc_handler<GetOrderSeqNoRPC>(
grpc_context, service,
[&](GetOrderSeqNoRPC& rpc, lion::v1::GetOrderSeqNoRequest& request) -> asio::awaitable<void> {
},
lion::RethrowFirstArg{});
using OrderRPC = lion::AwaitableServerRPC<&ExampleService::RequestOrder>;
agrpc::register_awaitable_rpc_handler<OrderRPC>(
grpc_context, service,
[&](OrderRPC& rpc, lion::v1::OrderRequest& request) -> asio::awaitable<void> {
},
lion::RethrowFirstArg{});
// bidirectional streaming
agrpc::register_awaitable_rpc_handler<NoticeRPC>(
grpc_context, service, notice_rpc_handler(thread_pool), lion::RethrowFirstArg{});
grpc_context.run();
});
}
- Correct
- The example you linked is slightly different, the read continues on in the background. I believe in your case you want to cancel the rpc after sleeping for 3s. Check out the following code:
asio::awaitable<bool> reader(BidiStreamingRPC& rpc, BidiStreamingRPC::Request& request)
{
co_return co_await rpc.read(request, asio::use_awaitable);
}
auto sleep(agrpc::GrpcContext& grpc_context, std::chrono::milliseconds duration)
{
return agrpc::Alarm(grpc_context).wait(std::chrono::system_clock::now() + duration, asio::use_awaitable);
}
auto bidirectional_streaming_rpc_handler(agrpc::GrpcContext& grpc_context)
{
return [&](BidiStreamingRPC& rpc) -> asio::awaitable<void>
{
while (true)
{
BidiStreamingRPC::Request request;
using namespace asio::experimental::awaitable_operators;
const auto result = co_await (reader(rpc, request) || sleep(grpc_context, std::chrono::seconds(3)));
if (const bool* ok = std::get_if<0>(&result))
{
if (*ok)
{
// client send a message, read next one
continue;
}
// rpc.read() ended in `false` which means the client is done sending data.
co_await rpc.finish(grpc::Status::OK, asio::use_awaitable);
co_return;
}
// sleep() completed. rpc.cancel() has been called already, no need to call `rpc.finish`.
co_return;
}
};
}
// ...
agrpc::register_awaitable_rpc_handler<BidiStreamingRPC>(
grpc_context, service, bidirectional_streaming_rpc_handler(grpc_context), test::RethrowFirstArg{});
Note that in my code reader(rpc, request)
could be replaced with rpc.read(request, asio::use_awaitable)
for better performance but I assume you want to perform more asynchronous actions in reader
.
Thank you very much, but about https://github.com/Tradias/asio-grpc/blob/master/example/snippets/server_rpc.cpp#L30 , if alarm expired,I understand that agrpc::read
will be canceled, Did I misunderstand?
but about https://github.com/Tradias/asio-grpc/blob/master/example/snippets/server_rpc.cpp#L30 , if alarm expired,I understand that agrpc::read will be canceled, Did I misunderstand?
The read
won't be cancelled, gRPC has no option for cancelling a single read without cancelling the entire rpc. The waiter.wait(asio::deferred)
is cancelled which means the code can perform some actions while the read continues on in the background. The next call to waiter.wait()
will then continue to wait for the read.
Note that the waiter API is experimental. In particular I would like to see agrpc::Waiter
being added to asio. Once that happened, agrpc::Waiter
will be deprecated.
Thank you very much for your answer