lensesio/stream-reactor

SimpleJsonConverter cannot convert nested jason in SimpleJsonConverter.java on kafka-connect-redis-kafka-3-1-4.0.0

armin-git opened this issue · 1 comments

Issue Guidelines

issue

this is my json

{ "group_name": "bogFan", "vendor_id": 32, "username": "351861861", "time": "12:27:12.4455667", "users": [ { "my_id": 32, "your_id": 31, "requests": [ { "sand_code": 321, "age_code": 311 }, { "sand_code": 322, "age_code": 312 } ] } ], "create_date": "2023-01-29 12:27" }

when I want to convert it and send it to redis, I get this message:

org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.DataException: Mismatching schema.

the error come from this file

https://github.com/lensesio/kafka-connect-common/blob/master/src/main/java/com/datamountaineer/streamreactor/connect/json/SimpleJsonConverter.java

line 172 and 217

SimpleJsonConverter cannot convert nested jason, for more details it cannot convert users in my json.

for more details in that file in line 213
if (struct.schema() != schema) throw new DataException("Mismatching schema.");

cause throw exception while this code work for simple json but not my json.

What version of the Stream Reactor are you reporting this issue for?

kafka-connect-redis-kafka-3-1-4.0.0
I use kafka version 3.3-IV3

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

yes

Do you have a supported version of the data source/sink .i.e Cassandra 3.0.9?

no

Have you read the docs?

yes

What is the expected behaviour?

it should convert my json noirmaly in save it on redis

What was observed?

connector task get fails

What is your connector properties configuration (my-connector.properties)?

this is connector config
{ "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector", "errors.log.include.messages": "true", "connect.redis.port": "6379", "tasks.max": "1", "topics": "users-data", "connect.redis.host": "redis-master", "key.converter.schemas.enable": "false", "connect.redis.kcql": "INSERT INTO Users- SELECT * from users-data PK vendor_id TTL=7200", "name": "redis-sink", "value.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "errors.log.enable": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter" }

Please provide full log files (redact and sensitive information)

I see this error

at com.datamountaineer.streamreactor.connect.json.SimpleJsonConverter.convertToJson(SimpleJsonConverter.java:217)
at com.datamountaineer.streamreactor.connect.json.SimpleJsonConverter.convertToJson(SimpleJsonConverter.java:172)
Caused by: org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.DataException: Mismatching schema.

Thanks for raising and the detailed description.

I have added a fix for this on #916, please feel free to grab the artifact when it has finished building and see if this resolves the problem for you.