mguenther/kafka-junit

Embedded Kafka not starting when SSL is enabled

aminders485 opened this issue · 6 comments

Startup fails when it tries to start DefaultProducer, because the SecurityProtocol is hardcoded as PLAINTEXT in getBrokerList for EmbeddedKafka.

Yes, that is a known limitation. SSL is not supported right now. I suppose this is a rather rare use case when working with an embedded Kafka cluster in a throw-away fashion. What is it that you are actually trying to test?

Removed the labels. I don't see an immediate use case for such an enhancement. What is it that you are trying to achieve with such a test?

I am trying to test my stream with certificates, that is, with SSL.
I have written an application that streams data into Kafka topics through event streaming, and I want that data to be sent over a secured connection.
While writing integration tests, my test case without SSL works fine, but when I change the properties to use SSL, it fails.
I have kept my application configurable so that it works both with and without security, and I need your help with the security part.
These are the properties I am currently using for security:
EmbeddedKafkaConfig.brokers()
    .with(KafkaConfig$.MODULE$.ListenersProp(), sslListenerProtocol + listenerProperties)
    .with(KafkaConfig$.MODULE$.AdvertisedListenersProp(),
        sslListenerProtocol + advancedListenerProperties)
    .with(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "SSL:SSL")
    .with(KafkaConfig$.MODULE$.SslProtocolProp(), "SSL")
    .with(KafkaConfig$.MODULE$.SslEnabledProtocolsProp(), "TLSv1.2,TLSv1.1,TLSv1")
    .with(KafkaConfig$.MODULE$.SslKeystoreTypeProp(), "JKS")
    // Backslashes in Windows paths must be escaped inside Java string literals.
    .with(KafkaConfig$.MODULE$.SslKeystoreLocationProp(),
        "C:\\Environment\\Certificate\\keystore-kafka-server.jks")
    .with(KafkaConfig$.MODULE$.SslKeystorePasswordProp(), "12345678")
    .with(KafkaConfig$.MODULE$.SslKeyPasswordProp(), "12345678")
    .with(KafkaConfig$.MODULE$.SslTruststoreTypeProp(), "JKS")
    .with(KafkaConfig$.MODULE$.SslTruststoreLocationProp(),
        "C:\\Environment\\Certificate\\truststore-kafka-server.jks")
    .with(KafkaConfig$.MODULE$.SslTruststorePasswordProp(), "12345678")
    .with(KafkaConfig$.MODULE$.SslClientAuthProp(), "required")
    .with(KafkaConfig$.MODULE$.InterBrokerSecurityProtocolProp(), "SSL")
    // An empty value disables hostname verification.
    .with(KafkaConfig$.MODULE$.SslEndpointIdentificationAlgorithmProp(), "")
    .build();

I get what you're trying to do, but I'm not sure you're testing this at the right level of abstraction. It's probably easier to run such an integration test against a well-configured, hardened Kafka cluster in an environment that is close to your production setup. You can do this with Kafka for JUnit and its ExternalKafkaCluster abstraction, which allows you to bind to an external cluster. Then it's just a matter of configuring the write, read, and observe operations properly (which, after all, you can do without any restriction).
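To make "configuring the operations properly" a bit more concrete, here is a minimal sketch of the client-side SSL settings such a test would need. It uses only the standard Kafka client configuration keys and plain java.util.Properties. The class name SslClientSettings, the method sslClientProperties, the port and all paths and passwords are hypothetical placeholders, not part of Kafka for JUnit.

```java
import java.util.Properties;

public class SslClientSettings {

    // Builds client-side SSL settings from standard Kafka client config keys.
    // Every argument is a placeholder that the test has to supply.
    public static Properties sslClientProperties(String bootstrapServers,
                                                 String truststorePath,
                                                 String truststorePassword,
                                                 String keystorePath,
                                                 String keystorePassword) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SSL");
        // The truststore lets the client verify the broker certificate.
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", truststorePassword);
        // The keystore is only needed when the broker sets ssl.client.auth=required.
        props.put("ssl.keystore.location", keystorePath);
        props.put("ssl.keystore.password", keystorePassword);
        props.put("ssl.key.password", keystorePassword);
        // An empty value disables hostname verification, mirroring the broker config above.
        props.put("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values for illustration only.
        Properties props = sslClientProperties(
                "localhost:9093",
                "/path/to/truststore.jks", "changeit",
                "/path/to/keystore.jks", "changeit");
        System.out.println(props.getProperty("security.protocol"));
    }
}
```

These key/value pairs would then be handed to whichever read, write, or observe operation the test uses against the external cluster. How exactly they are passed depends on the Kafka for JUnit version, so check the builder methods in the release you are on.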

I'm a bit reluctant to add this to the EmbeddedKafkaCluster abstraction. It's not meant to assert infrastructure related policies, but rather to check that Kafka-based software components work correctly on a functional level.

I'd be open to discuss a PR if you're willing to contribute to the project - as long as the design philosophy of Kafka for JUnit is respected.

That actually sounds about right.
I appreciate your support on this.
Can you share an example of how I can set up ExternalKafkaCluster?
Thanks