salesforce/mirus

Applying transforms

renatomefi opened this issue · 8 comments

Hello,

First of all thanks for this tool, it looks super nice!

I've been trying a different setup where I apply a transform as part of the mirroring. The problem is that my transform receives a null in its record apply method, as the Kafka Connect interface allows. Here is the example:

kafka-connect_1            | Caused by: java.lang.NullPointerException
kafka-connect_1            | 	at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:187)
kafka-connect_1            | 	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
kafka-connect_1            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
kafka-connect_1            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

configuration

{
   "connector.class": "com.salesforce.mirus.MirusSourceConnector",
   "tasks.max": "1",
   "topics.whitelist": "legacy_data_syncer.op.accounts",
   "destination.topic.name.suffix": ".mirror",
   "destination.bootstrap.servers": "kafka:9092",
   "consumer.bootstrap.servers": "kafka:9092",
   "connector.key.converter": "org.apache.kafka.connect.json.JsonConverter",
   "connector.value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "consumer.client.id": "mirus-legacy_data_syncer.op.accounts1",
   "consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
   "consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
   "transforms":"unwrap",
   "transforms.unwrap.type":"io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
   "transforms.unwrap.array.encoding":"document",
   "transforms.unwrap.flatten.struct":"false"
}

Am I trying to do something that isn't supported, or is there more going on?

Thanks for any help!

All standard Kafka Connect functionality should work, so this should be fine (it's not something I've actually tested before, though). I think your deserializer configuration keys might be wrong. Deserializers are "connector"-level properties, but properties with the consumer. prefix are passed directly to the embedded Kafka consumer client. Try changing consumer.key.deserializer and consumer.value.deserializer to key.deserializer and value.deserializer respectively.
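For example (untested, just keeping the ByteArrayDeserializer values from your config above), the two deserializer lines would become something like:

{
...
   "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
   "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
...
}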

Hello @pdavidson100

Thanks for the answer, you're correct. My deserializer still can't read the record correctly, though, but that doesn't look related to Mirus; it's either my data, the deserializer, or the converter. The fact is that I'm receiving my record value with both schema and payload inside it.
I'm digging some more, and although it has nothing to do with Mirus, if you have any suggestions I'll be happy to hear them!

Thanks again and I'm closing this issue

Hello @pdavidson100

So the problem seems to be that source connectors only apply converters when writing to Kafka, not when reading.

So I have two questions:
Is it possible to manually call the converter in the task?
Or would it be better to make Mirus a sink connector?

Either way, I'd be willing to open PRs to make it possible to apply transforms that can manipulate the data. If you know of another approach I'd love to hear it!

Thanks

Hi @renatomefi. I'm not convinced there's a problem. KIP-66 says "For source connectors, transformations are applied on the collection of SourceRecord retrieved from SourceTask.poll()." You can see the transforms being applied in the sendRecords() method of WorkerSourceTask. That default source connector behaviour should work for you: if everything is configured properly, your SourceRecord should be transformed before being sent to the destination cluster.
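To illustrate the ordering, here is a stripped-down sketch of what the framework does for each batch (illustrative only, not the actual WorkerSourceTask code): transforms run on each SourceRecord returned by poll(), and only afterwards do the converters serialize it for the producer.

import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.transforms.Transformation;

// Sketch of the source-task send path described in KIP-66.
class SourceSendLoopSketch {

  void sendBatch(SourceTask task,
                 List<Transformation<SourceRecord>> transforms,
                 Converter keyConverter,
                 Converter valueConverter,
                 KafkaProducer<byte[], byte[]> producer) throws InterruptedException {
    for (SourceRecord record : task.poll()) {
      // 1. Single Message Transforms see the SourceRecord exactly as the task built it.
      SourceRecord transformed = applyTransforms(transforms, record);
      if (transformed == null) {
        continue; // a transform filtered the record out
      }
      // 2. Converters serialize key and value only after the transform chain has run.
      byte[] key = keyConverter.fromConnectData(
          transformed.topic(), transformed.keySchema(), transformed.key());
      byte[] value = valueConverter.fromConnectData(
          transformed.topic(), transformed.valueSchema(), transformed.value());
      producer.send(new ProducerRecord<>(transformed.topic(), key, value));
    }
  }

  private SourceRecord applyTransforms(
      List<Transformation<SourceRecord>> transforms, SourceRecord record) {
    SourceRecord current = record;
    for (Transformation<SourceRecord> transform : transforms) {
      current = transform.apply(current);
      if (current == null) {
        break;
      }
    }
    return current;
  }
}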

Hey @pdavidson100
I've spent some time trying to pin down the exact scenario, so allow me to start over and disregard the previous conversation.

The TL;DR would be:
The Mirus SourceConnector isn't compatible with Kafka Connect records (the schema/payload envelope).

Let's see the scenario:

  • Given a topic whose message bodies look like:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "field": "id"
      }
    ],
    "optional": false
  },
  "payload": {
    "id": "hiThereMirus"
  }
}

This is the standard Kafka Connect JSON envelope; the topic looks like this because a source connector populated it previously.

  • In order to process this message in another connector, we at least have to configure the deserializer to be JSON, so that the SourceRecord has proper content instead of raw bytes:
{
...
   "consumer.key.deserializer": "org.apache.kafka.connect.json.JsonDeserializer",
   "consumer.value.deserializer": "org.apache.kafka.connect.json.JsonDeserializer",
...
}
  • We also want to use a JSON converter, since we'd like to keep the payload/schema structure:
{
...
   "key.converter": "org.apache.kafka.connect.json.JsonConverter",
   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "key.converter.schemas.enable":"true",
   "value.converter.schemas.enable":"true",
...
}
  • What one would expect is that the SourceRecord is now populated with a schema and payload, but that is not what we actually see (using an official transform in order to rule out custom-code influence):
    [screenshot from 2018-12-06 18-39-38]

  • This happens because the converter is only applied in one direction; in the case of a SourceConnector it is only applied when writing to a Kafka topic, as per the documentation:
    [screenshot from 2018-12-03 10-09-13]

  • So, since Mirus is based on the SourceConnector, it is inherently incompatible with Kafka Connect records. If we look at the code which creates the record (since Kafka doesn't do it for us), we can see that the schemas are hard-coded to bytes:

    return new SourceRecord(
        sourcePartition,
        offsetMap(consumerRecord.offset() + 1),
        destinationTopicNamePrefix + consumerRecord.topic() + destinationTopicNameSuffix,
        enablePartitionMatching ? consumerRecord.partition() : null,
        Schema.OPTIONAL_BYTES_SCHEMA,
        consumerRecord.key(),
        Schema.OPTIONAL_BYTES_SCHEMA,
        consumerRecord.value(),
        consumerRecord.timestamp(),
        connectHeaders);
    }

  • Thus my suggestion is that we make the Mirus SourceConnector compatible with Kafka Connect records. We can do it by using the already-set connector configuration, or by adding a couple of extra configuration options to the connector, for instance (a rough task-side sketch follows the two config examples below):

  "source.key.converter": ".org.apache.kafka.connect.json.JsonConverter",
  "source.value.converter": ".org.apache.kafka.connect.json.JsonConverter",

or one that reuses the already-set key.converter and value.converter:

  "source.key.converter.enable": "true",
  "source.value.converter.enable": "true",

I hope this all makes more sense now. If you think that's a good solution I'm willing to work on a PR; please let me know what you think!

Thanks @renatomefi, your explanation does make sense. As you point out, Mirus currently always creates SourceRecord objects with Schema.OPTIONAL_BYTES_SCHEMA, which limits your options for transformations. Your suggestion seems reasonable, and thank you for the PR. I will need to give it a careful review, which I hope to have time for later this week.

Hello @pdavidson100, thanks for merging #26 and #28
Would you consider tagging a new version?

Thanks again

Done - v0.1.0 is now available.