SimpleJsonConverter cannot convert nested jason in SimpleJsonConverter.java on kafka-connect-redis-kafka-3-1-4.0.0
armin-git opened this issue · 1 comments
Issue Guidelines
issue
this is my json
{ "group_name": "bogFan", "vendor_id": 32, "username": "351861861", "time": "12:27:12.4455667", "users": [ { "my_id": 32, "your_id": 31, "requests": [ { "sand_code": 321, "age_code": 311 }, { "sand_code": 322, "age_code": 312 } ] } ], "create_date": "2023-01-29 12:27" }
when I want to convert it and send it to redis, I get this message:
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.DataException: Mismatching schema.
the error come from this file
line 172 and 217
SimpleJsonConverter cannot convert nested jason, for more details it cannot convert users
in my json.
for more details in that file in line 213
if (struct.schema() != schema) throw new DataException("Mismatching schema.");
cause throw exception while this code work for simple json but not my json.
What version of the Stream Reactor are you reporting this issue for?
kafka-connect-redis-kafka-3-1-4.0.0
I use kafka version 3.3-IV3
Are you running the correct version of Kafka/Confluent for the Stream reactor release?
yes
Do you have a supported version of the data source/sink .i.e Cassandra 3.0.9?
no
Have you read the docs?
yes
What is the expected behaviour?
it should convert my json noirmaly in save it on redis
What was observed?
connector task get fails
What is your connector properties configuration (my-connector.properties)?
this is connector config
{ "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector", "errors.log.include.messages": "true", "connect.redis.port": "6379", "tasks.max": "1", "topics": "users-data", "connect.redis.host": "redis-master", "key.converter.schemas.enable": "false", "connect.redis.kcql": "INSERT INTO Users- SELECT * from users-data PK vendor_id TTL=7200", "name": "redis-sink", "value.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "errors.log.enable": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter" }
Please provide full log files (redact and sensitive information)
I see this error
at com.datamountaineer.streamreactor.connect.json.SimpleJsonConverter.convertToJson(SimpleJsonConverter.java:217)
at com.datamountaineer.streamreactor.connect.json.SimpleJsonConverter.convertToJson(SimpleJsonConverter.java:172)
Caused by: org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.DataException: Mismatching schema.
Thanks for raising and the detailed description.
I have added a fix for this on #916, please feel free to grab the artifact when it has finished building and see if this resolves the problem for you.