manub/scalatest-embedded-kafka

Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic

gyoho opened this issue · 3 comments

gyoho commented

First, thanks for developing such a nice tool. It is very useful for Kafka development.

I'm using the withRunningKafka method to test publishing to an embedded Kafka instance using ReactiveKafka.

  "KafkaEvnetSink" should {
    "streams event to kafka" in withRunningKafka {
      val kafkaConfig = EmbeddedKafkaConfig()
      val brokerList = s"localhost:${kafkaConfig.kafkaPort}"
      val topic = "abc"
      val sink = Sink.fromSubscriber(new ReactiveKafka().publish(ProducerProperties(brokerList, topic, new StringEncoder())))

      Source(1 to 10).map(_.toString).to(sink).run()
      consumeFirstStringMessageFrom(topic) shouldBe "1"
    }
  }

The above test throws this error:
15:08:12.857 ERROR k.producer.async.DefaultEventHandler - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: abc

According to this SO post, it seems to be a host and port issue.

Is this a bug or am I using it wrong?

manub commented

What happens if you swap localhost for 127.0.0.1?

gyoho commented

Unfortunately, same error.

manub commented

If you look at the post, the OP managed to fix it by adding the advertised.port property. You can add custom properties to brokers/producers/consumers by using the EmbeddedKafkaConfig class. Give it a go.
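
As a rough sketch of that suggestion (not a confirmed fix), the spec below passes advertised.host.name and advertised.port to the embedded broker through an implicit EmbeddedKafkaConfig. It assumes a library version whose EmbeddedKafkaConfig accepts a customBrokerProperties map, that the net.manub.embeddedkafka package and ScalaTest's WordSpec and Matchers are on the classpath, and that the broker's Kafka version still supports these two properties. The spec name and property values are illustrative only.

```scala
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{Matchers, WordSpec}

// Hypothetical spec name, used only for this sketch.
class AdvertisedPortSpec extends WordSpec with Matchers with EmbeddedKafka {

  // Assumption: this library version exposes customBrokerProperties.
  // The implicit value is picked up by withRunningKafka in place of the default config.
  implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
    kafkaPort = 6001,
    customBrokerProperties = Map(
      "advertised.host.name" -> "localhost", // host the broker reports to clients
      "advertised.port"      -> "6001"       // must match kafkaPort above
    )
  )

  "embedded Kafka" should {
    "round-trip a message with advertised host and port set" in withRunningKafka {
      val topic = "abc"
      // Publish and consume with the helpers provided by the EmbeddedKafka trait.
      publishStringMessageToKafka(topic, "1")
      consumeFirstStringMessageFrom(topic) shouldBe "1"
    }
  }
}
```

If this round trip passes, the same implicit config can be reused in the original test, building brokerList from kafkaConfig.kafkaPort so the producer and the embedded broker agree on the port.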