YotpoLtd/metorikku

Kafka Clients cannot provide Broker SSL credentials or Schema Registry Basic Auth

jheinnic opened this issue · 1 comment

We use Aiven as our service provider, so we need to present client SSL certificates to authenticate with our brokers and Basic HTTP Authentication credentials to talk to our schema registry. I've been working with the following sketch of a job config file:

metrics:
  - /opt/spark/work-dir/assets/metric-01.yml
inputs:
  cdc_events:
    kafka:
      servers:
        - our-deployment-name-here-1999.aivencloud.com:16493
      topic: crossed-wires-topic
      schemaRegistryUrl: https://${SCHEMA_REGISTRY_BASIC_AUTH}@our-deployment-name-here-1999.aivencloud.com:16496/
      consumerGroup: metor-group
      options:
        startingOffsets: earliest
        kafka.security.protocol: SSL
        kafka.ssl.protocol: TLS
        kafka.ssl.key.password: ${KAFKA_SSL_KEY_PASSWORD}
        kafka.ssl.keystore.location: /etc/secrets/kafka/client.keystore.p12
        kafka.ssl.keystore.password: ${KAFKA_SSL_KEYSTORE_PASSWORD}
        kafka.ssl.keystore.type: PKCS12
        kafka.ssl.truststore.location: /etc/secrets/kafka/client.truststore.jks
        kafka.ssl.truststore.password: ${KAFKA_SSL_TRUSTSTORE_PASSWORD}
        kafka.ssl.truststore.type: JKS
        basic.auth.credentials.source: USER_INFO
        basic.auth.user.info: ${SCHEMA_REGISTRY_BASIC_AUTH}

The logs show that these settings are reaching neither the ABRiS schema registry client nor Metorikku's Kafka Consumer:

INFO	2020-04-19 23:06:45,387	11461	io.confluent.kafka.serializers.KafkaAvroDeserializerConfig	[main]	KafkaAvroDeserializerConfig values:
  | bearer.auth.token = [hidden]
  | schema.registry.url = [https://<SECRET_INFO>@our-deployment-name-here-1999.aivencloud.com:16496/ ]
  | basic.auth.user.info = [hidden]
  | auto.register.schemas = true
  | max.schemas.per.subject = 1000
  | basic.auth.credentials.source = URL
  | schema.registry.basic.auth.user.info = [hidden]
  | bearer.auth.credentials.source = STATIC_TOKEN
  | specific.avro.reader = false
  | value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
  | key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
INFO	2020-04-19 23:06:44,203	10277	org.apache.kafka.clients.consumer.ConsumerConfig	[main]	ConsumerConfig values:
...
  | bootstrap.servers = [our-deployment-name-here-1999.aivencloud.com:16493]
...
  | security.protocol = PLAINTEXT
...
  | ssl.key.password = null
  | ssl.keymanager.algorithm = SunX509
  | ssl.keystore.location = null
  | ssl.keystore.password = null
  | ssl.keystore.type = JKS
  | ssl.protocol = TLS
...
  | ssl.trustmanager.algorithm = PKIX
  | ssl.truststore.location = null
  | ssl.truststore.password = null
  | ssl.truststore.type = JKS

Interestingly, although the schema registry client reports the default credential source of URL, it does not even honor the basic HTTP authentication credentials embedded in the URL as a fallback.
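
For reference, here is a minimal Scala sketch of the entries that would need to reach the Confluent client for USER_INFO authentication to take effect. The key names are the ones visible in the KafkaAvroDeserializerConfig log above; the hostname and environment variable are the placeholders used elsewhere in this issue:

    // Sketch only: the entries that ultimately have to reach the Confluent schema
    // registry client, assuming SCHEMA_REGISTRY_BASIC_AUTH holds "user:password".
    val schemaRegistryAuth: Map[String, String] = Map(
      "schema.registry.url"           -> "https://our-deployment-name-here-1999.aivencloud.com:16496",
      "basic.auth.credentials.source" -> "USER_INFO",
      "basic.auth.user.info"          -> sys.env("SCHEMA_REGISTRY_BASIC_AUTH")
    )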

I have found a temporary workaround, and share it here in hopes that it helps lead to a more durable solution...

I modified src/main/scala/com/yotpo/metorikku/input/readers/kafka/KafkaInput.scala, changing this block at the end of read(sparkSession: SparkSession): DataFrame

    val kafkaDataFrame = inputStream.load()
    schemaRegistryUrl match {
      case Some(url) => {
        var schemaRegistryMap = createSchemaRegistryConfig(url, schemaSubject.getOrElse(topic.get), schemaId)
        kafkaDataFrame.select(from_confluent_avro(col("value"), schemaRegistryMap) as "result").select("result.*")
      }
      case None => kafkaDataFrame
    }

... by inserting a little extra logic that merges the input options into schemaRegistryMap, so they are passed along to from_confluent_avro when the value column is decoded into the output DataFrame ...

    val kafkaDataFrame = inputStream.load()
    schemaRegistryUrl match {
      case Some(url) => {
        var schemaRegistryMap = createSchemaRegistryConfig(url, schemaSubject.getOrElse(topic.get), schemaId)
        // BEGIN INSERTION
        if (options.nonEmpty) {
          schemaRegistryMap ++= options.get
        }
        // END INSERTION
        kafkaDataFrame.select(from_confluent_avro(col("value"), schemaRegistryMap) as "result").select("result.*")
      }
      case None => kafkaDataFrame
    }

An earlier block of code just before this region looks as though it may have been trying to do much the same thing, but it is not effective, at least for this purpose, because the inputStream it populates is not what supplies the map of configuration properties to ABRiS. I have not confirmed that this options map is also used to configure the Kafka consumer, but since my SSL client credential settings started appearing once I had compiled this change in, I presume it most likely is.

    // Not sure what this accomplishes?
    if (options.nonEmpty) {
      inputStream.options(options.get)
    }
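
For what it's worth, that block is what hands the raw options map to Spark's Kafka source; as far as I understand, the source strips the kafka. prefix from those keys and passes the rest to the underlying consumer, which is how kafka.ssl.keystore.location would end up as ssl.keystore.location on the ConsumerConfig once the options actually make it through. A minimal sketch of that mechanism outside Metorikku, using the placeholder broker and topic from this issue:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.DataStreamReader

    // Sketch only: keys prefixed with "kafka." are forwarded to the KafkaConsumer with
    // the prefix removed; un-prefixed keys such as basic.auth.* stay with the Spark
    // source and are never handed to ABRiS through this path.
    def kafkaReader(spark: SparkSession): DataStreamReader =
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "our-deployment-name-here-1999.aivencloud.com:16493")
        .option("subscribe", "crossed-wires-topic")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.keystore.type", "PKCS12")
        .option("kafka.ssl.keystore.location", "/etc/secrets/kafka/client.keystore.p12")
        .option("kafka.ssl.keystore.password", sys.env("KAFKA_SSL_KEYSTORE_PASSWORD"))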

OK, so you need the parameters at the ABRiS level, since they are forwarded to the schema registry.
Would love a PR on this. I think you can add a new notion of schemaRegistryOptions that gets added to the schema registry config as part of the createSchemaRegistryConfig function, WDYT?
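
For the record, a rough sketch of that idea; the signature and names below are illustrative only, not the actual Metorikku code:

    // Illustrative sketch: createSchemaRegistryConfig keeps building whatever map it
    // builds today and folds the user-supplied schemaRegistryOptions on top, so auth
    // settings such as basic.auth.credentials.source / basic.auth.user.info reach ABRiS.
    def createSchemaRegistryConfig(
        url: String,
        subject: String,
        schemaRegistryOptions: Option[Map[String, String]]): Map[String, String] = {
      val base: Map[String, String] = Map(
        "schema.registry.url" -> url // ...plus whatever else the function already sets
      )
      base ++ schemaRegistryOptions.getOrElse(Map.empty) // user options win on conflict
    }

On the config side, a schemaRegistryOptions map would then presumably sit next to options under the kafka input, keeping broker-level kafka.* settings separate from schema registry auth settings.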