Segv for message that cannot be sent on kafka
persona94 opened this issue · 1 comments
persona94 commented
I have a server where my producer is not authorized to send messages to Kafka. I use async send, and the program crashes after the first send.
My code to build the record:
{
    std::unique_lock<std::mutex> lk(recordMapMutex);
    auto iter = recordMap.emplace(std::piecewise_construct,
                                  std::forward_as_tuple(data->messageId),
                                  std::forward_as_tuple(data->topic,
                                                        kafka::Key{ data->key.c_str(), data->key.size() },
                                                        kafka::Value{ data->event.c_str(), data->event.size() }));
    rec = &iter.first->second;
    std::string prdIdStr = "producerId";
    rec->headers() = { { prdIdStr, kafka::Value(instanceId.c_str(), instanceId.size()) } };
}
Here, data is a class instance that is passed in as a shared_ptr. For now the shared pointer is stored in a map and never removed, so the memory remains valid even after the message is sent.
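To make the lifetime assumption above concrete, here is a minimal sketch that does not use the Kafka library at all. FakeRecord and RecordStore are hypothetical stand-ins, and I am assuming recordMap behaves like a std::map (the real container type is not shown in this report). It only illustrates that a pointer to a map element stays valid across later inserts as long as the entry is never erased.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <tuple>
#include <utility>

// Hypothetical stand-in for the producer record type; not the library's class.
struct FakeRecord {
    std::string topic;
    std::string key;
    std::string value;
    FakeRecord(std::string t, std::string k, std::string v)
        : topic(std::move(t)), key(std::move(k)), value(std::move(v)) {}
};

// Hypothetical store mirroring the recordMap + recordMapMutex pair.
class RecordStore {
public:
    // Constructs the record in place and returns a pointer to it.
    // If the id already exists, the existing record is returned unchanged.
    FakeRecord* add(int id, const std::string& topic,
                    const std::string& key, const std::string& value) {
        std::lock_guard<std::mutex> lk(mutex_);
        auto result = records_.emplace(std::piecewise_construct,
                                       std::forward_as_tuple(id),
                                       std::forward_as_tuple(topic, key, value));
        return &result.first->second;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lk(mutex_);
        return records_.size();
    }

private:
    mutable std::mutex mutex_;
    std::map<int, FakeRecord> records_;  // node-based: element addresses are stable
};

// Checks that the first pointer survives many later inserts.
inline void demoPointerStability() {
    RecordStore store;
    FakeRecord* first = store.add(1, "test-topic", "k1", "v1");
    for (int id = 2; id <= 1000; ++id) {
        store.add(id, "test-topic", "k", "v");
    }
    assert(first == store.add(1, "other", "x", "y"));  // same element, not replaced
    assert(first->key == "k1");
}
```

If the real container were a std::vector, or if entries were erased while a send is still in flight, this guarantee would not hold.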
Here is the async send
producer->send(
    *rec,
    [&, data, cb, rec](const RecordMetadata& metadata, const Error& error)
    {
        {
            // Disabled on purpose
            // std::unique_lock<std::mutex> lk(recordMapMutex);
            // recordMap.erase(data->messageId);
        }
        if (error)
        {
            std::cout << "send hit error, calling CB" << std::endl;
            cb(data, metadata.toString(), error.toString());
        }
        else
        {
            std::cout << "No error" << std::endl;
            cb(data, metadata.toString(), "");
        }
    });
cb is a callback function that takes a shared_ptr to the data that was passed in, and 2 strings.
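For reference, the callback shape described above can be sketched without the Kafka library. Message, UserCallback, FakeAsyncSender and sendWithCallback are all hypothetical names made up for this illustration. The sketch captures the shared_ptr and the callback by value, so both stay alive until the completion runs, even after the calling scope has ended.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical payload type standing in for the "data" class.
struct Message {
    std::string messageId;
    std::string topic;
};

// Callback taking a shared_ptr to the data plus two strings (metadata, error).
using UserCallback = std::function<void(std::shared_ptr<Message>,
                                        const std::string&,
                                        const std::string&)>;

// Hypothetical stand-in for an async producer: queues completions, runs them later.
class FakeAsyncSender {
public:
    using Completion = std::function<void(const std::string& metadata,
                                          const std::string& error)>;

    void send(Completion done) { pending_.push_back(std::move(done)); }

    // Simulates delivery reports arriving; an empty error string means success.
    void completeAll(const std::string& metadata, const std::string& error) {
        for (auto& done : pending_) {
            done(metadata, error);
        }
        pending_.clear();
    }

private:
    std::vector<Completion> pending_;
};

inline void sendWithCallback(FakeAsyncSender& sender,
                             std::shared_ptr<Message> data,
                             UserCallback cb) {
    assert(data && cb);  // precondition of this sketch
    // Capture by value only: the lambda outlives this function call.
    sender.send([data, cb](const std::string& metadata, const std::string& error) {
        cb(data, metadata, error);
    });
}
```

This does not reproduce the crash. It only shows the by-value capture, in contrast to the [&, ...] default capture in my snippet, which would dangle if anything captured by reference goes out of scope before the delivery callback fires.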
The error I get from Kafka is:
Error Broker: Topic authorization failed [29] sending message
Some logs from kafka
KafkaProducer[feae661e-a7d61a65] NOINFO | [thrd:main]: Topic test-topic metadata information unknown
KafkaProducer[feae661e-a7d61a65] NOINFO | [thrd:main]: Topic test-topic partition count is zero: should refresh metadata
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: Requesting metadata for 1/1 topics: refresh unavailable topics
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Request metadata for 1 topic(s): refresh unavailable topics
KafkaProducer[feae661e-a7d61a65] SEND | [thrd:ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Sent MetadataRequest (v4, 57 bytes @ 0, CorrId 3)
KafkaProducer[feae661e-a7d61a65] RECV | [thrd:ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Received MetadataResponse (v4, 284 bytes, CorrId 3, rtt 1.55ms)
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: ===== Received metadata (for 1 requested topics): refresh unavailable topics =====
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: ClusterId: MvZ7D3v5Qx2GezBHiz26Vg, ControllerId: 0
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: 3 brokers, 1 topics
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Broker #0/3: kafka-kafka-0.kafka-kafka-brokers.kafka.svc.k8s-test:9093 NodeId 0
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Broker #1/3: kafka-kafka-2.kafka-kafka-brokers.kafka.svc.k8s-test:9093 NodeId 2
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Broker #2/3: kafka-kafka-1.kafka-kafka-brokers.kafka.svc.k8s-test:9093 NodeId 1
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: Topic #0/1: test-topic with 0 partitions: Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: Error in metadata reply for topic test-topic (PartCnt 0): Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] TOPICERROR | [thrd:main]: Topic test-topic has permanent error: Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] STATE | [thrd:main]: Topic test-topic changed state unknown -> error
KafkaProducer[feae661e-a7d61a65] PARTCNT | [thrd:main]: Failing all 1 unassigned messages in topic test-topic due to permanent topic error: Broker: Topic authorization failed
KafkaProducer[feae661e-a7d61a65] UAS | [thrd:main]: 0/1 messages were partitioned in topic test-topic
KafkaProducer[feae661e-a7d61a65] UAS | [thrd:main]: 1/1 messages failed partitioning in topic test-topic
KafkaProducer[feae661e-a7d61a65] METADATA | [thrd:main]: ssl://kafka-kafka-bootstrap.kafka:9093/bootstrap: 1/1 requested topic(s) seen in metadata
Backtrace from the core file:
#0 0x0000000000f5a360 in rd_list_destroy ()
#1 0x0000000000f091b9 in rd_kafka_headers_destroy ()
#2 0x0000000000ea4fc4 in rd_kafka_produceva ()
#3 0x00000000008c0268 in kafka::clients::producer::KafkaProducer::send(kafka::clients::producer::ProducerRecord const&, std::function<void (kafka::clients::producer::RecordMetadata const&, kafka::Error const&)> const&, kafka::clients::producer::KafkaProducer::SendOption, kafka::clients::producer::KafkaProducer::ActionWhileQueueIsFull) (this=<optimized out>, record=..., deliveryCb=..., option=<optimized out>, action=<optimized out>) at /opt/rh/gcc-toolset-11/root/usr/include/c++/11/bits/stl_vector.h:918
persona94 commented
Hello, is anyone able to look into this?