grpc/grpc-swift

Seeing better performance with gRPC callback approach than with gRPC AsyncService/async-await APIs

palfonsoilluscio opened this issue ยท 15 comments

What are you trying to achieve?

Questions at the bottom

To stream a file in chunks from C++ server (using gRPC Async APIs approach) to Swift clients.

This is our .cpp server code (code is really simple):

#include "Streamer_grpc.h"

namespace Illuscio {

Streamer_GRPC::~Streamer_GRPC() {
    _server->Shutdown();
    _queue->Shutdown();
};

void Streamer_GRPC::Run( uint16_t port ) {
    std::string server_address = "[0.0.0.0](http://0.0.0.0/):" + std::to_string( port );

    ServerBuilder builder;

    builder.AddListeningPort( server_address, grpc::InsecureServerCredentials() );
    // Register "service_" as the instance through which we'll communicate with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService( &_service );
    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    _queue = builder.AddCompletionQueue();
    // Finally assemble the server.
    _server = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRPCs();
}


Streamer_GRPC::TMPFileData::TMPFileData( PROTO_Streamer::AsyncService* service, ServerCompletionQueue* queue )
    : CallData { service, queue }, _responder { &_context } {
    _tag.id = MessageID::TMP_FILE;
    _tag.data = this;
    Proceed();
}

void Streamer_GRPC::TMPFileData::Proceed() {
    switch ( _status ) {
    case CallStatus::CREATE: {
        _status = CallStatus::OPEN_FILE;
        _service->RequestStreamFile( &_context, &_clientFileReq, &_responder, _cq, _cq, (void*) this );
        break;
    }

    case CallStatus::OPEN_FILE: {
        myfile.open(_clientFileReq.file_path(), std::ios::binary);
           
        if (myfile.is_open()) {
            _status = CallStatus::PROCESS;
            std::cout << "File was successfuly open..." << std::endl;
            static_cast<CallData*>(this)->Proceed();
        } else {
            // Handles file opening failure
            std::cout << "File open operation failed..." << std::endl;
            _responder.Finish(Status::CANCELLED, nullptr);
            return;
        }
   
        break;
    }

    case CallStatus::PROCESS: {
        new TMPFileData { _service, _cq };
   
        if (!myfile.eof()) {
            const int read_size = static_cast<int>(myfile.gcount());
            _fileChunk.set_chunk_data(buffer, read_size);
            _responder.Write(_fileChunk, this);
        } else {
            std::cout << "EOF reached..." << std::endl;
            _status = CallStatus::FINISH;
            _responder.Finish(Status::OK, this);
        }

        break;
    }  
   
    default: {
        delete this;
    }
    }
}

void Streamer_GRPC::HandleRPCs() {
    new TMPFileData { &_service, _queue.get() };
    void* tag;
    bool ok;

    while ( _queue->Next(&tag, &ok) ) {
       // GPR_ASSERT(_queue->Next(&tag, &ok));
       // GPR_ASSERT(ok);
        static_cast<CallData*>(tag)->Proceed();
    }
}
};

This is our proto file:

syntax = "proto3";
package Illuscio;

service PROTO_Streamer {
    rpc StreamFile(TMP_FileRequest) returns (stream TMP_FileChunk);
}

message TMP_FileRequest {
    string file_path = 1;
}

message TMP_FileChunk {
    bytes chunk_data = 1;
}

What have you tried so far?

This is the client code we have tried:
Below method calls the actual gRPC client method (StreamFile rpc). It actually just call next and wait for the server to write it and then the data is appended until there's no more data to append.

func getContentData(for sourceURL: URL) async throws -> Data {
        do {
            try connection.connect()
        } catch { throw MCError.serverConnectionError }

        // Make the gRPC call to fetch the file's data asynchronously
       
        Task {
            var receivedData = Data()
            var fileChunkIterator = (connection.client?.getAsset(with: sourceURL) as! GRPCAsyncResponseStream<Illuscio_TMP_FileChunk>).makeAsyncIterator()
            do {
                var fileChunk = try await fileChunkIterator.next()
                // Concurrently fetch and append chunks to receivedData
                while fileChunk != nil {
                    receivedData.append(fileChunk!.chunkData)
                    fileChunk = try await fileChunkIterator.next()
                }
               
                try? connection.disconnect()
            } catch (let error) {
                throw MCError.fileWriteToLocalFailed
            }
        }
            // Once the call completes, disconnect from the server
            return Data()
    }

Also we just realized that when using the async APIs, this is what is getting printed in the console when using logging level = .trace. The h2_data_bytes=5, which is weird because our write buffer size is 4096??:

2023-09-01T21:24:08-0500 trace RpcLogger : grpc_request_id=4116AC03-49BE-447D-BE37-8D78C6534FC4 h2_data_bytes=5 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:24:08-0500 trace RpcLogger : grpc_request_id=4116AC03-49BE-447D-BE37-8D78C6534FC4 h2_data_bytes=5 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:24:08-0500 trace RpcLogger : grpc_request_id=4116AC03-49BE-447D-BE37-8D78C6534FC4 h2_data_bytes=5 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:24:08-0500 trace RpcLogger : grpc_request_id=4116AC03-49BE-447D-BE37-8D78C6534FC4 h2_data_bytes=5 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:24:08-0500 trace RpcLogger : grpc_request_id=4116AC03-49BE-447D-BE37-8D78C6534FC4 h2_data_bytes=5 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame

These are the logs when using the callback API approach (h2_data_bytes is bigger 4104):

2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame

Questions:

1.- Unless we are missing something, we do not see the same performance as with the gRPC callbacks implementation. We know next is marked as await, so our understanding is it won't be asking for the file's next chunk until it is written on the callbackqueue server side, then we will be unblocked on client side and we can ask for the next chunk(iterator next method) and so on (since there's only 1 write task permitted at a time).....

2.- With callbacks, we were able to have a loop on the server iterating over a file and sending one chunk after the other until we reach the file's end. Because of this approach we were able to receive the chunks really really fast on the client side (which is not happening with the gRPC async approach).

3.- And we are obligated to use the Async approach since we will be serving several clients at the same time.

4.- The data size received is between 0-90 kb/s for async, for sync it is between 20-40 Mb/s. Why the difference?

5.- Are we missing something?

Thanks in advance.

Lukasa commented
  1. gRPC async/await will be slower than the callback-based approach for the forseeable future. This is because the callbacks can execute on the SwiftNIO event loops, avoiding context switches. Async/await cannot do this at this time. We judge that in general the easier to follow programming model makes this worthwhile.

    Note that grpc-swift uses a fairly complex buffering strategy for reads and writes, so you should not be limited to one chunk at a time on the network.

  2. Async grpc should still be able to send multiple chunks at once.

  3. Callback-based grpc-swift can serve multiple clients at once.

  4. Can you attach a pcap?

Hi @Lukasa:

Really appreciate the answers you provided above, since this project is really important for our company, thank you...

Let us summarize the issue again, just to confirm we all are on the same page:

On server side, we are using C++ (as the code provided previously) and the approach was to use the AsyncService (gRPC Async) just as it is used on the gRPC example on the link below (helloworld): https://github.com/grpc/grpc/blob/master/examples/cpp/helloworld/greeter_async_server.cc

With above approach, we are observing that the data rate received at the client (Swift client using async/await on the RPC method) is pretty low, between 0-90 kbps. Please have the pcaps for client and server below:

client_capture.pcap.zip
server_capture.pcap.zip

Now, the reason we say is low is because when we try with the C++ server code but using the Callback API approach as in this file example: https://github.com/grpc/grpc/blob/master/examples/cpp/helloworld/greeter_callback_server.cc, the data transmission rates we observe on the client side are between 28-40 Mbps (actually files get downloaded pretty fast), the difference is really noticeable.

Hope I was able to explain in detail what we are facing and we are still on the same page. Hopefully pcaps can help to understand the issue as well...

Kindly let us know if you need additional information.

**Btw, great that "Async grpc should still be able to send multiple chunks at once.", I think the order on the client side won't be altered, which is what we are looking for, am I right?

And also great that "Callback-based grpc-swift can serve multiple clients at once." You meant the Callback API approach as on "greeter_callback_server.cc", could serve multiple clients at once as well correct? (Well maybe some additional code will be needed, but is still possible)...**

Thanks in advance.

Lukasa commented

Can you confirm that you're benchmarking your Swift code compiled in release mode?

Yes, we are benchmarking using Swift logic in release mode.

Lukasa commented

Sorry, can I clarify something? In your two examples, is the only thing changing the C++ server code? Or are you changing the Swift client code in any way?

Sure, so the client Swift code is the same in both cases:

  • Basically getting the .makeIterator() from the RPC.
  • And calling await / next() in order to get the next file chunk.

C++ server code is the only one changing.

Lukasa commented

Oh interesting. This is almost certainly a problem in that C++ code then: the async version is sending tiny packets, while the callback one probably isn't.

That's exactly what is happening, when we see the network monitor on Xcode, the packets received are tiny if we compare them with the ones received when the callback-based logic on server is in place.

Tried to change the byte size of the chunk (bigger), but we get same results.

Are we missing any server side setting/configuration for Async case?

I mean, the code posted above looks pretty straight forward but there's a chance we are missing something...

Does below info throws a hint of some sort, or is just normal behavior?

"Also we just realized that when using the async APIs, this is what is getting printed in the console when using logging level = .trace. The h2_data_bytes=5, which is weird because our write buffer size is 4096??:

2023-09-01T21:24:08-0500 trace RpcLogger : grpc_request_id=4116AC03-49BE-447D-BE37-8D78C6534FC4 h2_data_bytes=5 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:24:08-0500 trace RpcLogger : grpc_request_id=4116AC03-49BE-447D-BE37-8D78C6534FC4 h2_data_bytes=5 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:24:08-0500 trace RpcLogger : grpc_request_id=4116AC03-49BE-447D-BE37-8D78C6534FC4 h2_data_bytes=5 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:24:08-0500 trace RpcLogger : grpc_request_id=4116AC03-49BE-447D-BE37-8D78C6534FC4 h2_data_bytes=5 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:24:08-0500 trace RpcLogger : grpc_request_id=4116AC03-49BE-447D-BE37-8D78C6534FC4 h2_data_bytes=5 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame

These are the logs when using the callback API approach (h2_data_bytes is bigger 4104):

2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame
2023-09-01T21:47:55-0500 trace RpcLogger : grpc_request_id=DF4C2541-9165-4C2D-A1C2-4EEB70204F28 h2_data_bytes=4104 h2_end_stream=false h2_payload=DATA [GRPC] received HTTP2 frame"

Lukasa commented

Can you show me your header file? There are type definitions with methods I can't see. In particular, I can't see anywhere you assign buffer and I can't see anywhere you do the I/O.

Absolutely.

#include <boost/type_index.hpp>
#include <boost/type_index/runtime_cast/register_runtime_class.hpp>

#include <grpcpp/grpcpp.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>

#include <grpc/support/log.h>
#include <IlluscioPointCore/Proto/DataTransit.grpc.pb.h>

#include <IlluscioPointCore/Publish/Helpers/logging.h>
#include <IlluscioPointCore/Publish/Helpers/Config.h>

class AsyncAssetStreamerV2 {
 private:
    // An instance of our service, compiled from code generated by protoc
    ::Illuscio::PROTO_Streamer::AsyncService service_;
    // This is the Queue. It's shared for all the requests.
    std::unique_ptr<grpc::ServerCompletionQueue> cq_;
    // A gRPC server object
    std::unique_ptr<grpc::Server> server_;
    // Config, so the user can override our default parameters
    const Config config_;

    void init();
    void stop();

public:
    AsyncAssetStreamerV2(const Config& config)
        : config_{config} {}
    
    void Run();
    
    // Create and start a new instance of a request-type
    template <typename T>
    static void createNew(AsyncAssetStreamerV2& parent,
                          ::Illuscio::PROTO_Streamer::AsyncService& service,
                          ::grpc::ServerCompletionQueue& cq);

    class RequestBase {
    public:
        RequestBase(AsyncAssetStreamerV2& parent,
                    ::Illuscio::PROTO_Streamer::AsyncService& service,
                    ::grpc::ServerCompletionQueue& cq);

        virtual ~RequestBase() = default;

        // The state-machine
        virtual void proceed(bool ok) = 0;

        void done();

        template <typename reqT>
        static std::string me(reqT& req);

    protected:
        size_t getNewReqestId() noexcept;

        // The state required for all requests
        AsyncAssetStreamerV2& parent_;
        Illuscio::PROTO_Streamer::AsyncService& service_;
        ::grpc::ServerCompletionQueue& cq_;
        ::grpc::ServerContext ctx_;
        const size_t rpc_id_;
    };

    /*! Implementation for the `GetFilesMetadata()` RPC call.
     */
    class GetFilesMetadataRequest : public RequestBase {
    public:
        enum class State {
            CREATED,
            REPLIED,
            DONE
        };

        GetFilesMetadataRequest(AsyncAssetStreamerV2& parent,
                          ::Illuscio::PROTO_Streamer::AsyncService& service,
                          ::grpc::ServerCompletionQueue& cq);
           

        // State-machine to deal with a single request
        // This works almost like a co-routine, where we work our way down for each
        // time we are called. The State_ could just as well have been an integer/counter;
        void proceed(bool ok);

    private:
        ::Illuscio::SourceURL req_;
        ::Illuscio::FileMetadata reply_;
        ::grpc::ServerAsyncResponseWriter<::Illuscio::FileMetadata> responder_{&ctx_};
        State state_ = State::CREATED;
    };
    
    /*! Implementation for the `StreamFile()` RPC call.
     */
    class StreamFileRequest : public RequestBase {
    public:
        enum class State {
            CREATED,
            OPEN_FILE,
            PROCESSING,
            FINISHING,
            DONE
        };

        StreamFileRequest(AsyncAssetStreamerV2& parent,
                            ::Illuscio::PROTO_Streamer::AsyncService& service,
                            ::grpc::ServerCompletionQueue& cq);

        // State-machine to deal with a single request
        // This works almost like a co-routine, where we work our way down for each
        // time we are called. The State_ could just as well have been an integer/counter;
        void proceed(bool ok) override;

    private:
        ::Illuscio::TMP_FileRequest fileReq_;
        ::Illuscio::TMP_FileChunk fileChunk_;
        ::grpc::ServerAsyncWriter<::Illuscio::TMP_FileChunk> responder_{&ctx_};
        State state_ = State::CREATED;
        const int chunk_size = 64 * 1024;
        char buffer[64 * 1024];
        size_t replies_ = 0;
        std::ifstream myfile;
    };
};

buffer assignment is in the cpp file above, if you are referring to that (code snippet below):

if (!myfile.eof()) {
            const int read_size = static_cast<int>(myfile.gcount());
            _fileChunk.set_chunk_data(buffer, read_size);
            _responder.Write(_fileChunk, this);
...
Lukasa commented

That doesn't appear to populate buffer though. Where is the file I/O done? When do you read from myfile into buffer?

This is really embarrassing, the actual I/O is missing on the code , I apologize @Lukasa. Will try again hopefully it should work as expected and we can close this question. Thanks a lot...

Ok so both approaches are working now as expected, now according to the docs gRPC Async is faster than Callbacks having said that, should we pick the Async approach over Callback-based server's logic, thoughts?

The truth is we want to stream as fast as possible.

Lukasa commented

You should pick whichever approach makes your development easier. Performance optimization is always possible.

Makes sense, thanks.