Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic
gyoho opened this issue · 3 comments
Firstly, thanks for developing such a nice tool very useful for Kafka development.
I'm using the withRunningKafka
method to test publishing to an embedded Kafka instance using ReactiveKafka
.
"KafkaEvnetSink" should {
"streams event to kafka" in withRunningKafka {
val kafkaConfig = EmbeddedKafkaConfig()
val brokerList = s"localhost:${kafkaConfig.kafkaPort}"
val topic = "abc"
val sink = Sink.fromSubscriber(new ReactiveKafka().publish(ProducerProperties(brokerList, topic, new StringEncoder())))
Source(1 to 10).map(_.toString).to(sink).run()
consumeFirstStringMessageFrom(topic) shouldBe "1"
}
The above test throws this error:
15:08:12.857 ERROR k.producer.async.DefaultEventHandler - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: abc
It seems that this is an issue of port and host according this SO post.
Is this a bug or am I using it wrong?
If you swap localhost
with 127.0.0.1
what happens?
Unfortunately, same error.
If you look at the post, the OP managed to fix it by adding the advertised.port
property. You can add custom properties to brokers/producers/consumers by using the EmbeddedKafkaConfig
class. Give it a go.