DataException: Not a struct schema: Schema{ARRAY}
mdespriee opened this issue · 2 comments
mdespriee commented
I'm hitting a confusing error while processing JSON files (record-per-line format). See the stack trace below.
We usually process this kind of JSON with Jackson, without any specific DeserializationFeature. The only difference here is the record-per-line format, but that does not seem related to this error anyway.
Any clue?
(I'm using v1.0.0.)
org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY}
at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)
at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:197)
at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$mapValue$2(JsonFileReader.java:207)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:899)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:208)
at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$mapValue$1(JsonFileReader.java:201)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:200)
at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$mapValue$1(JsonFileReader.java:201)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:200)
at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$toStruct$0(JsonFileReader.java:161)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.toStruct(JsonFileReader.java:160)
at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.apply(JsonFileReader.java:153)
at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.apply(JsonFileReader.java:149)
at com.github.mmolimar.kafka.connect.fs.file.reader.AbstractFileReader.next(AbstractFileReader.java:63)
at com.github.mmolimar.kafka.connect.fs.FsSourceTask.lambda$poll$0(FsSourceTask.java:83)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at com.github.mmolimar.kafka.connect.fs.FsSourceTask.poll(FsSourceTask.java:92)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
mdespriee commented
I managed to reproduce it with this JSON: {"data": [{"name": "foo"},{"name": "bar"}]}
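For anyone debugging a similar case: the reproduction above has a field whose value is an array of objects, so its schema must be an ARRAY whose *items* are a struct, not a struct itself. Here's a minimal Python sketch of that kind of recursive schema inference; it is illustrative only (the names and schema descriptors are hypothetical, not the connector's actual code):

```python
import json

def infer_schema(value):
    """Infer a simplified Connect-style schema descriptor for a JSON value.

    Illustrative sketch: an array of objects maps to an ARRAY schema whose
    items are a STRUCT -- treating the array node itself as a struct is
    what produces "Not a struct schema: Schema{ARRAY}".
    """
    if isinstance(value, dict):
        return {"type": "STRUCT",
                "fields": {k: infer_schema(v) for k, v in value.items()}}
    if isinstance(value, list):
        # Items assumed homogeneous, as Connect array schemas require;
        # an empty array defaults to STRING items here for simplicity.
        item = infer_schema(value[0]) if value else {"type": "STRING"}
        return {"type": "ARRAY", "items": item}
    if isinstance(value, bool):
        return {"type": "BOOLEAN"}
    if isinstance(value, int):
        return {"type": "INT64"}
    if isinstance(value, float):
        return {"type": "FLOAT64"}
    return {"type": "STRING"}

record = json.loads('{"data": [{"name": "foo"},{"name": "bar"}]}')
schema = infer_schema(record)
# schema["fields"]["data"] is {"type": "ARRAY", "items": {"type": "STRUCT", ...}}
```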
mmolimar commented
Hi!
There was an issue when extracting the schema from the JSON files, and it's already fixed. A new version will be released within two weeks or so; in the meantime, you can use the code in the develop branch.