Kafka Clients cannot provide Broker SSL credentials or Schema Registry Basic Auth
jheinnic opened this issue · 1 comment
We use Aiven as a service provider, so we need to present client SSL certificates to authenticate with our brokers and Basic HTTP Authentication credentials to reach our schema registry. I've been working with the following sketch of a job config file:
metrics:
  - /opt/spark/work-dir/assets/metric-01.yml
inputs:
  cdc_events:
    kafka:
      servers:
        - our-deployment-name-here-1999.aivencloud.com:16493
      topic: crossed-wires-topic
      schemaRegistryUrl: https://${SCHEMA_REGISTRY_BASIC_AUTH}@our-deployment-name-here-1999.aivencloud.com:16496/
      consumerGroup: metor-group
      options:
        startingOffsets: earliest
        kafka.security.protocol: SSL
        kafka.ssl.protocol: TLS
        kafka.ssl.key.password: ${KAFKA_SSL_KEY_PASSWORD}
        kafka.ssl.keystore.location: /etc/secrets/kafka/client.keystore.p12
        kafka.ssl.keystore.password: ${KAFKA_SSL_KEYSTORE_PASSWORD}
        kafka.ssl.keystore.type: PKCS12
        kafka.ssl.truststore.location: /etc/secrets/kafka/client.truststore.jks
        kafka.ssl.truststore.password: ${KAFKA_SSL_TRUSTSTORE_PASSWORD}
        kafka.ssl.truststore.type: JKS
        basic.auth.credentials.source: USER_INFO
        basic.auth.user.info: ${SCHEMA_REGISTRY_BASIC_AUTH}
The logs show that these settings are reaching neither the ABRiS schema registry client nor Metorikku's Kafka Consumer:
INFO 2020-04-19 23:06:45,387 11461 io.confluent.kafka.serializers.KafkaAvroDeserializerConfig [main] KafkaAvroDeserializerConfig values:
| bearer.auth.token = [hidden]
| schema.registry.url = [https://<SECRET_INFO>@our-deployment-name-here-1999.aivencloud.com:16496/ ]
| basic.auth.user.info = [hidden]
| auto.register.schemas = true
| max.schemas.per.subject = 1000
| basic.auth.credentials.source = URL
| schema.registry.basic.auth.user.info = [hidden]
| bearer.auth.credentials.source = STATIC_TOKEN
| specific.avro.reader = false
| value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
| key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
INFO 2020-04-19 23:06:44,203 10277 org.apache.kafka.clients.consumer.ConsumerConfig [main] ConsumerConfig values:
...
| bootstrap.servers = [our-deployment-name-here-1999.aivencloud.com:16493]
...
| security.protocol = PLAINTEXT
...
| ssl.key.password = null
| ssl.keymanager.algorithm = SunX509
| ssl.keystore.location = null
| ssl.keystore.password = null
| ssl.keystore.type = JKS
| ssl.protocol = TLS
...
| ssl.trustmanager.algorithm = PKIX
| ssl.truststore.location = null
| ssl.truststore.password = null
| ssl.truststore.type = JKS
Interestingly, although the deserializer config reports the default credential source of URL, the client will not even honor the Basic HTTP Authentication credentials embedded in the schemaRegistryUrl as a fallback.
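For reference, here is a minimal sketch (placeholder hostnames and credentials, not our real ones) of the two forms of Basic Auth the Confluent serde config normally accepts. The options block above tries to use the USER_INFO form, but those keys never reach the deserializer config:

// Sketch only: the two credential sources the Confluent schema registry client understands.
// Hostnames and credentials below are placeholders.

// 1. USER_INFO: credentials supplied as a separate property.
val userInfoAuth = Map(
  "schema.registry.url"           -> "https://registry.example.aivencloud.com:16496",
  "basic.auth.credentials.source" -> "USER_INFO",
  "basic.auth.user.info"          -> "avnadmin:secret-password" // user:password
)

// 2. URL: credentials embedded in the registry URL itself.
val urlAuth = Map(
  "schema.registry.url"           -> "https://avnadmin:secret-password@registry.example.aivencloud.com:16496",
  "basic.auth.credentials.source" -> "URL"
)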
I have found a temporary workaround and share it here in the hope that it leads to a more durable solution. I modified src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala, changing this block at the end of read(sparkSession: SparkSession): DataFrame ...
val kafkaDataFrame = inputStream.load()
schemaRegistryUrl match {
  case Some(url) => {
    var schemaRegistryMap = createSchemaRegistryConfig(url, schemaSubject.getOrElse(topic.get), schemaId)
    kafkaDataFrame.select(from_confluent_avro(col("value"), schemaRegistryMap) as "result").select("result.*")
  }
  case None => kafkaDataFrame
}
... by inserting a little extra logic that merges the input's options into the schemaRegistryMap, so those settings reach ABRiS and the decoded records are returned through the output DataFrame ...
val kafkaDataFrame = inputStream.load()
schemaRegistryUrl match {
  case Some(url) => {
    var schemaRegistryMap = createSchemaRegistryConfig(url, schemaSubject.getOrElse(topic.get), schemaId)
    // BEGIN INSERTION
    if (options.nonEmpty) {
      schemaRegistryMap ++= options.get
    }
    // END INSERTION
    kafkaDataFrame.select(from_confluent_avro(col("value"), schemaRegistryMap) as "result").select("result.*")
  }
  case None => kafkaDataFrame
}
An earlier block of code just before this region looks like it may have been trying to do much the same thing, but it is not effective, at least for this purpose, because the input stream it populates is not what supplies the map of configuration properties to ABRiS. I have not confirmed whether that options map is also used to configure the Kafka consumer, but since my SSL client credential settings started appearing once I had compiled this change in, I presume it most likely is.
// Not sure what this accomplishes?
if (options.nonEmpty) {
  inputStream.options(options.get)
}
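To illustrate the two separate configuration paths, here is a hedged sketch of how I read the surrounding code; the function name, variable names, and reader construction below are my assumptions, not the actual KafkaInput.scala:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch only, assuming a reader built roughly like this.
def loadKafkaFrame(
    sparkSession: SparkSession,
    servers: Seq[String],
    topic: String,
    options: Option[Map[String, String]]): DataFrame = {
  val inputStream = sparkSession.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", servers.mkString(","))
    .option("subscribe", topic)

  // Path 1: the job's options are applied to the Spark Kafka reader. Whatever they do
  // (or fail to do) for the broker connection, they are never handed to from_confluent_avro.
  if (options.nonEmpty) {
    inputStream.options(options.get)
  }

  // Path 2 happens after load(): schemaRegistryMap -> from_confluent_avro(...), so the
  // basic.auth.* keys have to be merged into that map instead.
  inputStream.load()
}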
OK, so you need the parameters at the Abris level so that they are forwarded to the schema registry. Would love a PR on this. I think you can add a new notion of schemaRegistryOptions that gets added to the schema registry config as part of the createSchemaRegistryConfig function, WDYT?
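A minimal sketch of what that could look like, assuming ABRiS 3.x SchemaManager config keys; the schemaRegistryOptions parameter and the exact shape of the existing function are assumptions, not current Metorikku code:

import za.co.absa.abris.avro.read.confluent.SchemaManager

// Hypothetical extension: schemaRegistryOptions is merged on top of the defaults
// and passed, untouched, to ABRiS.
def createSchemaRegistryConfig(
    schemaRegistryUrl: String,
    schemaRegistryTopic: String,
    schemaId: Option[String],
    schemaRegistryOptions: Option[Map[String, String]]): Map[String, String] = {
  val baseConfig = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL   -> schemaRegistryUrl,
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> schemaRegistryTopic,
    SchemaManager.PARAM_VALUE_SCHEMA_ID       -> schemaId.getOrElse("latest")
  )
  // e.g. basic.auth.credentials.source and basic.auth.user.info from the job config
  baseConfig ++ schemaRegistryOptions.getOrElse(Map.empty)
}

On the job config side this would presumably surface as a schemaRegistryOptions block alongside options in the kafka input definition.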