strimzi/kafka-kubernetes-config-provider

Will this work for MirrorMaker2?

annymsMthd opened this issue · 5 comments

I'm trying this with a MirrorMaker2 CR and it doesn't seem to be working. Is this wired up into the config generation for that?

I think it should work - but it depends on what you use it for. There aren't too many places where you can use it with Strimzi's MirrorMaker2 CR, I think.

Wow, quick response :) This is our usage:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: local-kafka-mirror-maker
spec:
  version: 2.7.0
  replicas: 1
  connectCluster: local-kafka
  image: custom-mirror-maker-worker:0.1.5
  clusters:
    - alias: "global"
      bootstrapServers: some-broker:9092
      tls:
        trustedCertificates: []
    - alias: local-kafka
      bootstrapServers: some-other-bootstrap:9092
      config:
        config.storage.replication.factor: 3
        offset.storage.replication.factor: 3
        status.storage.replication.factor: 3
  mirrors:
    - sourceCluster: "global"
      targetCluster: local-kafka
      topicsPattern: "some-topic"
      sourceConnector:
        config:
          config.providers: secrets,configmaps
          config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
          config.providers.configmaps.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider

          replication.factor: 3
          sync.topic.acls.enabled: "false"
          sync.group.offsets.enabled: "false"

          key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
          value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

          transforms: "avroSchemaTransfer"
          transforms.avroSchemaTransfer.type: cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer
          transforms.avroSchemaTransfer.dest.schema.registry.url: "http://registry"
          transforms.avroSchemaTransfer.src.schema.registry.url: "https://some-other-registry"
          transforms.avroSchemaTransfer.src.basic.auth.credentials.source: "USER_INFO"
          transforms.avroSchemaTransfer.src.basic.auth.user.info: ${secrets:kafka/cloud-kafka:schemaRegistryUserInfo}
      heartbeatConnector:
        config:
          heartbeats.topic.replication.factor: 3
      checkpointConnector:
        config:
          checkpoints.topic.replication.factor: 3
  logging:
    type: inline
    loggers:
      log4j.rootLogger: "WARN"
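For context, the `${secrets:kafka/cloud-kafka:schemaRegistryUserInfo}` placeholder above tells the KubernetesSecretConfigProvider to read the `schemaRegistryUserInfo` key from a Secret named `cloud-kafka` in the `kafka` namespace. A Secret matching that reference would look roughly like this (the credential value shown is a made-up example):

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: cloud-kafka
  namespace: kafka
type: Opaque
stringData:
  # Resolved by the placeholder ${secrets:kafka/cloud-kafka:schemaRegistryUserInfo}
  # USER_INFO credentials are expected in "username:password" form
  schemaRegistryUserInfo: registry-user:registry-password
```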

I'm going to poke around and try to find the config that is generated. Currently we are getting:

2021-11-13 23:17:02,900 ERROR Failed to start task global-> local-kafka.MirrorSourceConnector-0 (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-4]
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.config.ConfigException: UserInfo must be provided when basic.auth.credentials.source is set to USER_INFO

Hmm, I have never seen MirrorMaker used like this with transformations. Interesting.

The way I normally use it with Connect is that I initialise it in the Connect cluster configuration and not in the connectors. In the connectors, I just use it. Maybe you can try the same here and move the initialisation:

config.providers: secrets,configmaps
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
config.providers.configmaps.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider

Into the config section of the target cluster. I.e.:

    - alias: local-kafka
      bootstrapServers: some-other-bootstrap:9092
      config:
        config.storage.replication.factor: 3
        offset.storage.replication.factor: 3
        status.storage.replication.factor: 3
        config.providers: secrets,configmaps
        config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
        config.providers.configmaps.class: io.strimzi.kafka.KubernetesConfigMapConfigProvider

Not sure it helps, but worth trying I guess.
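One more thing worth checking: the config provider reads Secrets through the Kubernetes API, so the service account running the MirrorMaker2 pods needs RBAC permission to read the referenced Secret. A minimal sketch, assuming the pods run under a service account named `local-kafka-mirrormaker2` in the `kafka` namespace (the actual service account name depends on your Strimzi deployment - check with `kubectl get pods -o yaml`):

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: cloud-kafka-secret-reader
  namespace: kafka
rules:
  # Grant read access only to the one Secret the placeholder references
  - apiGroups: [""]
    resources: ["secrets"]
    resourceNames: ["cloud-kafka"]
    verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: cloud-kafka-secret-reader
  namespace: kafka
subjects:
  # Assumed service account name - verify against your actual pods
  - kind: ServiceAccount
    name: local-kafka-mirrormaker2
    namespace: kafka
roleRef:
  kind: Role
  name: cloud-kafka-secret-reader
  apiGroup: rbac.authorization.k8s.io
```

Without this, the provider cannot resolve the placeholder and the credentials come through empty.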

Awesome yeah I'll give that a try. Thanks for the pointers.

We are doing it like this because there is a global Kafka cluster with its own schema registry. We want to replicate topics to different clusters throughout the world and also replicate the schemas, so the clusters don't have to go all the way to the global registry.

I am so sorry. This is actually working. Our Flux deployment was messing with the ${secrets:kafka/cloud-kafka:schemaRegistryUserInfo} part. Thank you very much for your time, and thanks for the great project!