Tradias/asio-grpc

Some usage questions

Closed this issue · 9 comments

nqf commented

Thank you very much for providing this library. It is very useful, but as a beginner with gRPC, I don't quite understand how to use it.

syntax = "proto3";

package lion.v1;

service Example {
  rpc Notice(stream NoticeRequest) returns (stream NoticeResponse) {}
  rpc Order(OrderRequest) returns (OrderResponse) {}
}

This is my example

int main() {
	std::unique_ptr<grpc::Server> server;

	std::vector<std::shared_ptr<agrpc::GrpcContext>> grpc_contexts;
	grpc::ServerBuilder builder;
	for (int i = 0; i < 3; i++)
		grpc_contexts.emplace_back(std::make_shared<agrpc::GrpcContext>(builder.AddCompletionQueue()));

	builder.AddListeningPort(host, grpc::InsecureServerCredentials());
	ExampleService service;
	builder.RegisterService(&service);

	server = builder.BuildAndStart();

	using OrderRPC = lion::AwaitableServerRPC<&ExampleService::RequestOrder>;
	agrpc::register_awaitable_rpc_handler<OrderRPC>(
		*(grpc_contexts[1]), service,
		[&](OrderRPC& rpc, lion::v1::OrderRequest& request) -> asio::awaitable<void> {
			log("OrderRequest: %s", request.order_seq_no().c_str());
			lion::v1::OrderResponse response;
			response.set_order_seq_no("abc-OrderRequest");
			co_await rpc.finish(response, grpc::Status::OK);
		},
		lion::RethrowFirstArg{});

	asio::thread_pool thread_pool{1};
	// bidirectional streaming
	agrpc::register_awaitable_rpc_handler<NoticeRPC>(
		*(grpc_contexts[2]), service, notice_rpc_handler(thread_pool), lion::RethrowFirstArg{});

	std::vector<std::thread> threads;
	for (size_t i = 0; i < grpc_contexts.size(); ++i) {
		threads.emplace_back([&, i] {
			auto& grpc_context = *grpc_contexts[i];
			grpc_context.run();
		});
	}

	for (auto& thread : threads) {
		thread.join();
	}
}
  1. It registers two RPCs (NoticeRPC, OrderRPC) on *(grpc_contexts[1]) and *(grpc_contexts[2]) respectively. Is this safe and correct? I noticed that different grpc_contexts use different CompletionQueues, but I don't understand how the different RPCs get distributed to their corresponding CompletionQueues. I found this, but unfortunately there is no answer: https://stackoverflow.com/questions/77549329/grpc-distribution-of-messages-when-using-multiple-completion-queues
nqf commented
  2. Can I use a bidirectional stream like this? Will the reader be cancelled when sleep returns?
		while (true) {
			using namespace asio::experimental::awaitable_operators;
			auto ok = co_await (reader(rpc, channel) || sleep(3s));
		}

Tradias commented

  1. It is safe and desirable to register one rpc to multiple GrpcContexts. It is not safe to register one rpc to a GrpcContext multiple times. In your case, for optimal concurrency, you might want to register both rpcs to each GrpcContext (see the sketch below). See also the example https://github.com/Tradias/asio-grpc/blob/master/example/multi-threaded-server.cpp#L96.

  2. Asio propagates cancellation through asio::awaitables, which means the rpc will be cancelled if reader doesn't complete within 3s, assuming that reader handles all error_codes (which the streaming-server example does not, by the way: https://github.com/Tradias/asio-grpc/blob/master/example/streaming-server.cpp#L118). Note that cancellation of agrpc::ServerRPC::read/write/finish/... is final; see also the Per-Operation Cancellation notes in the documentation.
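
For 1, here is a minimal sketch of what I mean, reusing the handlers from your snippet (order_rpc_handler just stands in for your Order lambda):

for (const auto& grpc_context : grpc_contexts)
{
    agrpc::register_awaitable_rpc_handler<OrderRPC>(
        *grpc_context, service, order_rpc_handler, lion::RethrowFirstArg{});
    // bidirectional streaming
    agrpc::register_awaitable_rpc_handler<NoticeRPC>(
        *grpc_context, service, notice_rpc_handler(thread_pool), lion::RethrowFirstArg{});
}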

nqf commented

About 1: OrderRPC will only run on the *(grpc_contexts[1]) thread, right? If I register OrderRPC to each GrpcContext, I understand that it will run alternately on two different threads (*(grpc_contexts[1]) and *(grpc_contexts[2])), right?

agrpc::register_awaitable_rpc_handler<OrderRPC>(
	*(grpc_contexts[1]), service,
	[&](OrderRPC& rpc, lion::v1::OrderRequest& request) -> asio::awaitable<void> {
	},
	lion::RethrowFirstArg{});
nqf commented

Sorry, I didn't understand what you said about 2. But I found this, which seems to have the same functionality as the example I provided, so can I assume that it safely cancels the reader (BidiStreaming)? https://github.com/Tradias/asio-grpc/blob/master/example/snippets/server_rpc.cpp#L30

nqf commented

About 1: the best practice would be something like this, right?

	std::vector<std::thread> threads;
	for (size_t i = 0; i < grpc_contexts.size(); ++i) {
		threads.emplace_back([&, i] {
			auto& grpc_context = *grpc_contexts[i];

			using GetOrderSeqNoRPC = lion::AwaitableServerRPC<&ExampleService::RequestGetOrderSeqNo>;
			agrpc::register_awaitable_rpc_handler<GetOrderSeqNoRPC>(
				grpc_context, service,
				[&](GetOrderSeqNoRPC& rpc, lion::v1::GetOrderSeqNoRequest& request) -> asio::awaitable<void> {
				},
				lion::RethrowFirstArg{});

			using OrderRPC = lion::AwaitableServerRPC<&ExampleService::RequestOrder>;
			agrpc::register_awaitable_rpc_handler<OrderRPC>(
				grpc_context, service,
				[&](OrderRPC& rpc, lion::v1::OrderRequest& request) -> asio::awaitable<void> {
				},
				lion::RethrowFirstArg{});

			// bidirectional streaming
			agrpc::register_awaitable_rpc_handler<NoticeRPC>(
				grpc_context, service, notice_rpc_handler(thread_pool), lion::RethrowFirstArg{});

			grpc_context.run();
		});
	}

Tradias commented

  1. Correct.
  2. The example you linked is slightly different: there, the read continues in the background. I believe in your case you want to cancel the rpc after sleeping for 3s. Check out the following code:
asio::awaitable<bool> reader(BidiStreamingRPC& rpc, BidiStreamingRPC::Request& request)
{
    co_return co_await rpc.read(request, asio::use_awaitable);
}

auto sleep(agrpc::GrpcContext& grpc_context, std::chrono::milliseconds duration)
{
    return agrpc::Alarm(grpc_context).wait(std::chrono::system_clock::now() + duration, asio::use_awaitable);
}

auto bidirectional_streaming_rpc_handler(agrpc::GrpcContext& grpc_context)
{
    return [&](BidiStreamingRPC& rpc) -> asio::awaitable<void>
    {
        while (true)
        {
            BidiStreamingRPC::Request request;
            using namespace asio::experimental::awaitable_operators;
            const auto result = co_await (reader(rpc, request) || sleep(grpc_context, std::chrono::seconds(3)));
            if (const bool* ok = std::get_if<0>(&result))
            {
                if (*ok)
                {
                    // client sent a message, read the next one
                    continue;
                }
                // rpc.read() ended in `false` which means the client is done sending data.
                co_await rpc.finish(grpc::Status::OK, asio::use_awaitable);
                co_return;
            }
            // sleep() completed. rpc.cancel() has been called already, no need to call `rpc.finish`.
            co_return;
        }
    };
}


// ...
    agrpc::register_awaitable_rpc_handler<BidiStreamingRPC>(
        grpc_context, service, bidirectional_streaming_rpc_handler(grpc_context), test::RethrowFirstArg{});

Note that in my code, reader(rpc, request) could be replaced with rpc.read(request, asio::use_awaitable) for better performance, but I assume you want to perform more asynchronous actions in reader.
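
For illustration, the co_await line inside the handler above would then become (reusing request, sleep and grpc_context from that code):

const auto result =
    co_await (rpc.read(request, asio::use_awaitable) || sleep(grpc_context, std::chrono::seconds(3)));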

nqf commented

Thank you very much. But about https://github.com/Tradias/asio-grpc/blob/master/example/snippets/server_rpc.cpp#L30 , if the alarm expires, I understand that agrpc::read will be cancelled. Did I misunderstand?

Tradias commented

The read won't be cancelled; gRPC has no option for cancelling a single read without cancelling the entire rpc. The waiter.wait(asio::deferred) is cancelled, which means the code can perform some actions while the read continues in the background. The next call to waiter.wait() will then continue to wait for the read.
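
To make that concrete, here is a rough sketch of the pattern (read_with_deadline is just an illustrative name, reusing the BidiStreamingRPC alias from the code above). Check the linked snippet and the Waiter documentation for the exact signatures rather than relying on this sketch:

asio::awaitable<void> read_with_deadline(agrpc::GrpcContext& grpc_context, BidiStreamingRPC& rpc)
{
    BidiStreamingRPC::Request request;
    // NOTE: the Waiter/initiate usage below follows the linked snippet; this is only a sketch.
    agrpc::Waiter<void(bool)> waiter;
    // Start the read once. Cancelling the wait below does NOT cancel this read.
    waiter.initiate(agrpc::read, rpc, request);
    using namespace asio::experimental::awaitable_operators;
    // Wait for the read or for 3 seconds, whichever completes first.
    auto result = co_await (waiter.wait(asio::use_awaitable) ||
                            agrpc::Alarm(grpc_context)
                                .wait(std::chrono::system_clock::now() + std::chrono::seconds(3), asio::use_awaitable));
    if (result.index() == 1)
    {
        // The alarm fired first: only waiter.wait() was cancelled, the read itself is still
        // running in the background. Do other work here, then resume waiting for the same read.
        const bool ok = co_await waiter.wait(asio::use_awaitable);
        (void)ok;  // handle the read result as usual
    }
    // else: the read completed first and std::get<0>(result) holds its ok value.
}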

Note that the Waiter API is experimental. In particular, I would like to see agrpc::Waiter added to asio itself; once that happens, agrpc::Waiter will be deprecated.

nqf commented

Thank you very much for your answer