morganstanley/modern-cpp-kafka

Integrate asynchronous producers with Boost.Asio

jm4R opened this issue · 4 comments

jm4R commented

Is there a canonical way to integrate asynchronous callbacks with boost::asio, e.g. io_context? Of course we could forward callbacks by calling io_context::post with a deep copy of the parameters, but maybe there is a more efficient way?

Hi @jm4R, do you mean how to avoid a deep copy within the message delivery callback? Here's an example: https://github.com/morganstanley/modern-cpp-kafka/blob/main/examples/kafka_async_producer_not_copy_payload.cc#L44
The trick is to use a shared_ptr to manage the lifetime of the context, and then capture the smart pointer in the delivery callback (lambda).
Or could you provide some demo code showing what you would like to do (with the boost::asio integration)?

jm4R commented

With the example you provided, I would have to make a deep copy of the metadata and error variables, as they come to me as const&:

        // main thread
        producer_.send(
            record,
            [](const kafka_api::clients::producer::RecordMetadata& metadata,
               const kafka_api::Error& error) {
                // callback thread
                boost::asio::post(io_ctx, [metadata, error /*copies here*/] {
                    // main thread again (io_ctx::run() execution)
                    do_stuff(metadata, error);
                });
            });

Deeper explanation: the callback is emitted on a different thread than the one on which send was invoked. I want to handle it on the same thread, but asynchronously, and I use Boost.Asio, which allows me to do that. To integrate the Kafka producer with an Asio-based workflow, I just have to post the callback to the end of the processing queue with boost::asio::post.

Because Kafka producers have a clearly IO/asynchronous nature, it would be good to use them just like timers, sockets, serial_ports, etc., which are available in Boost.Asio and are proposed for std (hopefully C++23). This issue was just a question about whether such a scenario had been considered, nothing else.

@jm4R Thank you for the suggestion! Providing an interface for boost::asio sounds good! I'll do some scoping in the near future. 😃
On the other hand, if you want to handle the delivery callback on the main thread, it would be much simpler to use a producer initialized with the EventsPollingOption::Manual option. https://github.com/morganstanley/modern-cpp-kafka/blob/main/doc/KafkaProducerQuickStart.md#kafkaproducer-with-kafkaclientskafkaclienteventspollingoption

@jm4R I revisited this issue and found that nothing actually needs to be done.

  • As long as we initialize the producer with the option KafkaClient::EventsPollingOption::Manual, we can choose which thread handles the delivery callback: just have that thread call producer.pollEvents() periodically.
  • With boost::asio, we could post the delivery callback handling to the main thread's processing queue but, just as you mentioned, we would have to deep copy the metadata and error variables. There's no way to avoid the deep copy, since the underlying librdkafka destroys the rkmsg (which the metadata refers to internally) just after triggering the delivery callback.
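To illustrate the lifetime constraint in the second point, here is a minimal single-threaded sketch that uses only the standard library. `Metadata`, `TaskQueue`, `deliver` and `run_demo` are hypothetical stand-ins invented for this illustration (they are not part of modern-cpp-kafka, librdkafka or Boost.Asio). The point is only that a task which outlives the callback has to own its own copy of whatever it needs.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>

// Hypothetical stand-in for RecordMetadata: only valid while the callback runs.
struct Metadata {
    std::string topic;
    long offset;
};

// Hypothetical stand-in for an Asio-style processing queue.
using TaskQueue = std::queue<std::function<void()>>;

// Simulates the library side: invokes the callback with a const reference and
// destroys the referenced object as soon as the callback returns.
inline void deliver(const std::function<void(const Metadata&)>& callback) {
    auto metadata = std::make_unique<Metadata>(Metadata{"demo-topic", 42});
    callback(*metadata);
}  // metadata is destroyed here

inline std::string run_demo() {
    TaskQueue queue;
    std::string seen;

    deliver([&queue, &seen](const Metadata& metadata) {
        // Capture by value: the deferred task owns a copy, so it stays valid
        // after `deliver` has destroyed the original object.
        queue.push([metadata, &seen] {
            seen = metadata.topic + "@" + std::to_string(metadata.offset);
        });
    });

    assert(queue.size() == 1);  // the task is still pending, the original is gone

    // Drain the queue later, as an event loop would.
    while (!queue.empty()) {
        queue.front()();
        queue.pop();
    }
    return seen;
}
```

Capturing `metadata` by reference in the inner lambda instead would leave the deferred task with a dangling reference, which is the situation the copy avoids.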

A possible solution for you:

  1. Initialize the KafkaProducer with option KafkaClient::EventsPollingOption::Manual.
  2. Post producer.pollEvents() to the processing queue periodically in order to handle the delivery callbacks (if any). You probably need to count the in-flight messages (those that have been sent but whose delivery callbacks have not yet been called).

E.g.

        // initialize the producer with "manual poll" option
        KafkaProducer producer_(props, KafkaClient::EventsPollingOption::Manual);

        // main thread
        producer_.send(
            record,
            [](const kafka_api::clients::producer::RecordMetadata& metadata,
               const kafka_api::Error& error) {
                // the thread which calls `pollEvents()` would handle the callback
                do_stuff(metadata, error);
            });

        // at some point, we might want to post the callback handling to the end of the processing queue (periodically)
        boost::asio::post(io_ctx, [&producer_] {
            // it would be more straightforward to call `producer_.pollEvents()` directly instead of going through `boost::asio::post`
            producer_.pollEvents();
        });
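
The in-flight counting mentioned in step 2 could look roughly like the sketch below. It uses only the standard library: `FakeManualProducer` is a hypothetical stand-in for a producer created with the manual polling option, and a plain `std::queue` stands in for the io_context processing queue, so none of these names are the real library API. The poll task re-posts itself only while messages are still in flight.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical stand-in for a manually polled producer: send() only stores the
// delivery callback, and pollEvents() runs it on the calling thread.
class FakeManualProducer {
public:
    using Callback = std::function<void()>;

    void send(Callback on_delivery) { pending_.push(std::move(on_delivery)); }

    // Runs at most one pending callback per call, to mimic deliveries that
    // complete over time.
    void pollEvents() {
        if (pending_.empty()) return;
        Callback callback = std::move(pending_.front());
        pending_.pop();
        callback();
    }

private:
    std::queue<Callback> pending_;
};

// Sends `message_count` messages, then keeps re-posting a poll task until every
// delivery callback has run. Returns the number of polls that were needed.
inline std::size_t send_and_drain(std::size_t message_count) {
    FakeManualProducer producer;
    std::queue<std::function<void()>> tasks;  // stand-in for the processing queue
    std::size_t in_flight = 0;                // plain counter: one thread only
    std::size_t polls = 0;

    for (std::size_t i = 0; i < message_count; ++i) {
        ++in_flight;
        producer.send([&in_flight] { --in_flight; });
    }

    std::function<void()> poll_task = [&] {
        ++polls;
        producer.pollEvents();
        if (in_flight > 0) tasks.push(poll_task);  // poll again later
    };
    if (in_flight > 0) tasks.push(poll_task);

    // Minimal event loop: run tasks until the queue is empty.
    while (!tasks.empty()) {
        std::function<void()> task = std::move(tasks.front());
        tasks.pop();
        task();
    }

    assert(in_flight == 0);
    return polls;
}
```

Because both send and the poll run on the same thread in this arrangement, the counter does not need to be atomic. With a real event loop, a timer would probably be a better fit than re-posting immediately, to avoid spinning while nothing has been delivered yet.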