Integrate asynchronous producers with Boost.Asio
jm4R opened this issue · 4 comments
Is there any canonical way to integrate asynchronous callbacks with boost::asio, e.g. io_context? Of course we may forward callbacks by calling io_context::post with a deep copy of the parameters, but maybe there is a more efficient way?
Hi, @jm4R Do you mean how to avoid a deep copy within the message delivery callback? Here's an example: https://github.com/morganstanley/modern-cpp-kafka/blob/main/examples/kafka_async_producer_not_copy_payload.cc#L44
The trick is using a shared_ptr to maintain the life-cycle of the context, and then capturing the smart pointer with the delivery callback (lambda).
Or, could you provide some demo code showing what you'd like to do (with the boost::asio integration)?
With the example you provided, I would still have to make a deep copy of the metadata and error variables, since they come to me as const&:
// main thread
producer_.send(
    record,
    [&io_ctx](const kafka_api::clients::producer::RecordMetadata& metadata,
              const kafka_api::Error& error) {
        // callback thread
        boost::asio::post(io_ctx, [metadata, error /*copies here*/] {
            // main thread again (io_ctx::run() execution)
            do_stuff(metadata, error);
        });
    });
Deeper explanation: the callback is emitted on a different thread than the one send was invoked on. I want to handle it on the same thread, but asynchronously, and I use Boost.Asio, which allows me to do that. To integrate the Kafka producer with an Asio-based workflow, I just have to post the callback to the end of the processing queue via boost::asio::post.
Because Kafka producers have a clear IO/asynchronous nature, it would be good to use them just like the timers, sockets, serial ports, etc. which are available in Boost.Asio and are proposed for std (hopefully C++23). This issue was just a question whether such a scenario was considered, nothing else.
@jm4R Thank you for the suggestion! Sounds good to provide an interface for boost::asio! I'll do some scoping in the near future.
On the other hand, if you're willing to handle the delivery callback on the main thread, it would be much simpler to use a producer initialized with the EventsPollingOption::Manual option. https://github.com/morganstanley/modern-cpp-kafka/blob/main/doc/KafkaProducerQuickStart.md#kafkaproducer-with-kafkaclientskafkaclienteventspollingoption
@jm4R I revisited this issue and found there's actually nothing that needs to be done.
- As long as we initialize the producer with the option KafkaClient::EventsPollingOption::Manual, we can choose which thread handles the delivery callback -- just let that thread call producer.pollEvents() periodically.
- With boost::asio, we could post the delivery callback handling to the main thread's processing queue, but, just as you mentioned, we'd have to deep-copy the metadata and error variables -- there's no way to avoid the deep copy, since the underlying librdkafka would destroy the rkmsg (which is an internal reference of the initial metadata) just after triggering the delivery callback.
A possible solution for you:
- Initialize the KafkaProducer with the option KafkaClient::EventsPollingOption::Manual.
- Post producer.pollEvents() to the processing queue periodically, in order to handle the delivery callbacks (if any) -- you'll probably need to count the in-flight messages (those which have been sent but whose delivery callbacks have not yet been called).
E.g.
// initialize the producer with "manual poll" option
KafkaProducer producer_(props, KafkaClient::EventsPollingOption::Manual);
// main thread
producer_.send(
record,
[](const kafka_api::clients::producer::RecordMetadata& metadata,
const kafka_api::Error& error) {
// the thread which calls `pollEvents()` would handle the callback
do_stuff(metadata, error);
});
// at some place, we might want to post the callback to the end of processing queue (periodically)
boost::asio::post(io_ctx, [&producer_] {
  // note: it would be more straightforward to call `producer_.pollEvents()` directly, instead of going through `boost::asio::post`
producer_.pollEvents();
});