mmolimar/kafka-connect-fs

DataException: Not a struct schema: Schema{ARRAY}

mdespriee opened this issue · 2 comments

I have a confusing error while processing JSON files (record-per-line format). See the stack trace below.
We're used to processing this kind of JSON with Jackson, without any specific DeserializationFeature. The only difference is the record-per-line format, but that doesn't seem related to this error anyway.

Any clue?
(I'm using v1.0.0)
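
For reference, a minimal configuration that exercises this path looks roughly like the one below. The URIs, topic, and policy are placeholders, and file_reader.json.record_per_line is the property name as I understand it from the docs, so adjust for your setup:

    name=fs-json-source
    connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
    tasks.max=1
    fs.uris=file:///data/json
    topic=my-topic
    policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
    policy.regexp=.*\\.json
    file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader
    file_reader.json.record_per_line=true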

 org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY}
 	at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:197)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$mapValue$2(JsonFileReader.java:207)
 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
 	at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:899)
 	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
 	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:208)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$mapValue$1(JsonFileReader.java:201)
 	at java.util.Iterator.forEachRemaining(Iterator.java:116)
 	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:200)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$mapValue$1(JsonFileReader.java:201)
 	at java.util.Iterator.forEachRemaining(Iterator.java:116)
 	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:200)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$toStruct$0(JsonFileReader.java:161)
 	at java.util.Iterator.forEachRemaining(Iterator.java:116)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.toStruct(JsonFileReader.java:160)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.apply(JsonFileReader.java:153)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.apply(JsonFileReader.java:149)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.AbstractFileReader.next(AbstractFileReader.java:63)
 	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.lambda$poll$0(FsSourceTask.java:83)
 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
 	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
 	at java.util.Iterator.forEachRemaining(Iterator.java:116)
 	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
 	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
 	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.poll(FsSourceTask.java:92)
 	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 	at java.lang.Thread.run(Thread.java:748)

I managed to reproduce it with this JSON: {"data": [{"name": "foo"},{"name": "bar"}]}
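
For what it's worth, the exception itself comes straight from Connect's data API: Struct refuses any schema whose type is not STRUCT, and the trace shows JsonFileReader$JsonToStruct.mapValue handing it the ARRAY schema built for the nested list of objects. A minimal sketch that reproduces the same exception using only the public Connect classes:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class NotAStructRepro {
        public static void main(String[] args) {
            // Schema for the elements of the "data" array: {"name": <string>}
            Schema elementSchema = SchemaBuilder.struct()
                    .field("name", Schema.STRING_SCHEMA)
                    .build();
            // The "data" field itself is an array of those structs.
            Schema arraySchema = SchemaBuilder.array(elementSchema).build();

            // Struct's constructor rejects any non-STRUCT schema, so this throws
            // org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY}
            new Struct(arraySchema);
        }
    }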

Hi!
There was an issue when extracting the schema from JSON files, and it's already fixed. A new version will be released within two weeks or so but, in the meantime, you can use the code in the develop branch.
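
If you need the fix before the release, building from the develop branch should be enough (assuming the project's standard Maven build):

    git clone https://github.com/mmolimar/kafka-connect-fs.git
    cd kafka-connect-fs
    git checkout develop
    mvn clean package

Then point the Connect worker's plugin.path at the packaged artifact and restart the worker.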