Applying transforms
renatomefi opened this issue · 8 comments
Hello,
First of all thanks for this tool, it looks super nice!
I've been trying a different setup where I apply a transform as part of the mirroring, but the problem is that my transform receives a null record in its apply method (as defined by the Kafka Connect Transformation interface). Here is the example:
```
kafka-connect_1 | Caused by: java.lang.NullPointerException
kafka-connect_1 |     at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:187)
kafka-connect_1 |     at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
kafka-connect_1 |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
kafka-connect_1 |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
```
Configuration:

```
{
  "connector.class": "com.salesforce.mirus.MirusSourceConnector",
  "tasks.max": "1",
  "topics.whitelist": "legacy_data_syncer.op.accounts",
  "destination.topic.name.suffix": ".mirror",
  "destination.bootstrap.servers": "kafka:9092",
  "consumer.bootstrap.servers": "kafka:9092",
  "connector.key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "connector.value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "consumer.client.id": "mirus-legacy_data_syncer.op.accounts1",
  "consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
  "consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
  "transforms.unwrap.array.encoding": "document",
  "transforms.unwrap.flatten.struct": "false"
}
```
Am I trying to do something unsupported, or is there more going on?
Thanks for any help!
All standard Kafka Connect functionality should work, so this should be fine (it's not something I've actually tested before, though). I think your deserializer configuration keys might be wrong. Deserializers are "connector"-level properties, but properties with the consumer. prefix are passed directly to the embedded Kafka consumer client. Try changing consumer.key.deserializer and consumer.value.deserializer to key.deserializer and value.deserializer respectively.
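For example, if I'm reading your config right, the deserializer lines would become something like:

```
"key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
```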
Hello @pdavidson100
Thanks for the answer; yes, you're correct. My deserializer still can't read the record correctly, though, but that doesn't look related to Mirus; it's either my data, the deserializer, or the converter. The fact is that I'm receiving my record value with both schema and payload inside it.
I'm digging some more, and although it has nothing to do with Mirus, if you have suggestions I'll be happy to hear them!
Thanks again; I'm closing this issue.
Hello @pdavidson100
So the problem seems to be that source connectors only apply converters when writing to Kafka, not when reading.
So I've got two questions:
Is it possible to manually call the converter in the task?
Or would it be better to make Mirus a sink connector?
Either way, I'd be willing to open PRs to make it possible to apply transforms that can manipulate the data. If you know of another approach, I'd love to hear it!
Thanks
Hi @renatomefi. I'm not convinced there's a problem. KIP-66 says "For source connectors, transformations are applied on the collection of SourceRecord retrieved from SourceTask.poll()." You can see the transforms being applied in the sendRecords() method of WorkerSourceTask. That default source connector behaviour should work for you: if everything is configured properly, your SourceRecord should be transformed before being sent to the destination cluster.
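Conceptually, the flow looks something like this (a simplified sketch of the WorkerSourceTask send loop, not the actual implementation; task, transformationChain, the converters, and producer are stand-ins for the real worker internals):

```java
// Simplified sketch of WorkerSourceTask.sendRecords() (illustrative only;
// see the Kafka Connect runtime source for the real implementation).
for (SourceRecord record : task.poll()) {
  // The configured transformation chain runs first, on the SourceRecord
  // the task produced -- this is where "transforms.unwrap" would apply.
  SourceRecord transformed = transformationChain.apply(record);
  if (transformed == null) {
    continue; // a transform may filter the record out entirely
  }
  // Only after the transforms have run are the key/value converters used
  // to serialize the record for the destination cluster.
  byte[] key = keyConverter.fromConnectData(
      transformed.topic(), transformed.keySchema(), transformed.key());
  byte[] value = valueConverter.fromConnectData(
      transformed.topic(), transformed.valueSchema(), transformed.value());
  producer.send(new ProducerRecord<>(transformed.topic(), key, value));
}
```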
Hey @pdavidson100
I've spent some time pinning down the exact scenario, so allow me to start over; let's disregard the previous conversation.
The TL;DR would be:
The Mirus SourceConnector isn't compatible with Kafka Connect-formatted records (the schema/payload envelope).
Let's walk through the scenario:
- Given a topic whose message bodies look like this:
```
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "field": "id"
      }
    ],
    "optional": false
  },
  "payload": {
    "id": "hiThereMirus"
  }
}
```
This is the standard Kafka Connect JSON envelope; it's in the topic because a source connector populated it earlier.
- In order to process this message in another connector, we at least have to configure the deserializer to be JSON, so that the SourceRecord gets proper content instead of raw bytes:
```
{
  ...
  "consumer.key.deserializer": "org.apache.kafka.connect.json.JsonDeserializer",
  "consumer.value.deserializer": "org.apache.kafka.connect.json.JsonDeserializer",
  ...
}
```
- We also want to use a JSON converter, since we'd like to keep the payload/schema structure:
```
{
  ...
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "true",
  "value.converter.schemas.enable": "true",
  ...
}
```
- What one would expect is that the SourceRecord is now populated with a schema and a payload, but what we actually see is the record still treated as raw bytes (I used an official transform here to rule out any influence from custom code).
- This happens because the converter is only applied in one direction: for a SourceConnector, it is only applied when writing to a Kafka topic, as the documentation describes.
- So, since Mirus is based on the SourceConnector, it is inherently incompatible with Kafka Connect-formatted records. If we look at the code that creates the record (since Kafka doesn't do it for us), we can see that the schemas are arbitrary (paraphrased just below):
mirus/src/main/java/com/salesforce/mirus/MirusSourceTask.java, lines 201 to 212 at 2224ae4
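Paraphrasing what those lines do (a sketch based on this discussion, not the exact Mirus code):

```java
// The consumed bytes are wrapped in a fixed bytes schema regardless of their
// content, so any transform downstream only ever sees a byte[] value.
SourceRecord sourceRecord = new SourceRecord(
    sourcePartition,               // source partition map for offset tracking
    sourceOffset,                  // source offset map
    destinationTopic,
    Schema.OPTIONAL_BYTES_SCHEMA,  // key schema: always raw bytes
    consumerRecord.key(),
    Schema.OPTIONAL_BYTES_SCHEMA,  // value schema: always raw bytes
    consumerRecord.value());
```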
- Thus my suggestion: let's make the Mirus SourceConnector compatible with Kafka Connect-formatted records. We can do that by utilizing the already-set connector configuration, or by adding a couple of extra configuration options to the connector, for instance:
"source.key.converter": ".org.apache.kafka.connect.json.JsonConverter",
"source.value.converter": ".org.apache.kafka.connect.json.JsonConverter",
or one that utilizes the already set key.converter
and value.converter
"source.key.converter.enable": "true",
"source.value.converter.enable": "true",
I hope this all makes more sense now. If you think that's a good solution, I'm willing to work on a PR; please let me know what you think!
Thanks @renatomefi, your explanation does make sense. As you point out, Mirus currently always creates SourceRecord objects with Schema.OPTIONAL_BYTES_SCHEMA, which limits your options for transformations. Your suggestion seems reasonable, and thank you for the PR. I will need to give it a careful review, which I hope to have time for later this week.
Hello @pdavidson100, thanks for merging #26 and #28
Would you consider tagging a new version?
Thanks again
Done - v0.1.0 is now available.