Error writing parquet file from JSON with object array
I'm trying to use JsonSource and write it as Parquet using ParquetSink. The issue I'm running into is that I get a ClassCastException whenever the Parquet writer encounters a Row created from JSON with an array of objects. From what I see, it's because the writer is trying to cast a Scala Map to a Seq[Any] in the StructWriter.
Here is where the exception is thrown: https://github.com/51zero/eel-sdk/blob/v1.2.4/eel-core/src/main/scala/io/eels/component/parquet/RecordConsumerWriter.scala#L105
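To make the suspected cause concrete, here is a minimal standalone sketch of the cast and one possible way to tolerate a Map. It does not use eel's real classes: StructSchema, unsafeStructValues, and structValues are hypothetical names chosen for illustration, and the idea that the values should be ordered by the schema's field names is my assumption, not something confirmed by the eel source.

```scala
// Standalone sketch: no eel-sdk types are used; every name here is hypothetical.
object MapToSeqSketch {

  // Stand-in for a struct schema: only the ordered field names matter here.
  final case class StructSchema(fieldNames: Vector[String])

  // Mimics the failing behaviour: assumes every struct value is already a Seq.
  // Passing a Map makes the cast fail with a ClassCastException.
  def unsafeStructValues(value: Any): Seq[Any] =
    value.asInstanceOf[Seq[Any]]

  // Possible approach (an assumption): accept a Map as well and
  // order its values by the schema's field names.
  def structValues(schema: StructSchema, value: Any): Seq[Any] = value match {
    case seq: Seq[_] => seq
    case map: Map[_, _] =>
      val byName = map.asInstanceOf[Map[String, Any]]
      // Fields missing from the map become null.
      schema.fieldNames.map(name => byName.getOrElse(name, null))
    case other =>
      throw new IllegalArgumentException(s"Unsupported struct value: $other")
  }

  def main(args: Array[String]): Unit = {
    val schema = StructSchema(Vector("key1"))
    // Shape of one element of `objectArray` as described in this issue.
    val element: Any = Map("key1" -> "value1")

    // The unchecked cast fails for a Map.
    println(scala.util.Try(unsafeStructValues(element)).isFailure)

    // The pattern match handles both Seq and Map inputs.
    println(structValues(schema, element))
  }
}
```

If something along these lines is the right direction, the real change would presumably belong wherever the struct values are read before being written, but I have not verified that against the eel code.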
Here is the stack trace:
Exception in thread "main" java.lang.ClassCastException: scala.collection.immutable.Map$Map1 cannot be cast to scala.collection.Seq
at io.eels.component.parquet.StructWriter.write(RecordConsumerWriter.scala:105)
at io.eels.component.parquet.ArrayParquetWriter.$anonfun$write$2(RecordConsumerWriter.scala:84)
at io.eels.component.parquet.ArrayParquetWriter.$anonfun$write$2$adapted(RecordConsumerWriter.scala:81)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at io.eels.component.parquet.ArrayParquetWriter.write(RecordConsumerWriter.scala:81)
at io.eels.component.parquet.StructWriter.$anonfun$write$3(RecordConsumerWriter.scala:113)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at io.eels.component.parquet.StructWriter.write(RecordConsumerWriter.scala:106)
at io.eels.component.parquet.RowWriter.write(RowWriteSupport.scala:42)
at io.eels.component.parquet.RowWriteSupport.write(RowWriteSupport.scala:33)
at io.eels.component.parquet.RowWriteSupport.write(RowWriteSupport.scala:15)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
at io.eels.component.parquet.ParquetSink$$anon$1.write(ParquetSink.scala:35)
at io.eels.datastream.SinkAction$$anon$2.$anonfun$run$3(SinkAction.scala:54)
at io.eels.datastream.SinkAction$$anon$2.$anonfun$run$3$adapted(SinkAction.scala:53)
at scala.collection.immutable.List.foreach(List.scala:392)
at io.eels.datastream.SinkAction$$anon$2.$anonfun$run$2(SinkAction.scala:53)
at io.eels.datastream.SinkAction$$anon$2.$anonfun$run$2$adapted(SinkAction.scala:52)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at io.eels.datastream.SinkAction$$anon$2.run(SinkAction.scala:52)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Here is a fully runnable example (will need hadoop deps if you don't already have them):
import java.io.{ByteArrayInputStream, File, FileInputStream}
import java.nio.charset.StandardCharsets

import io.eels.component.json.JsonSource
import io.eels.component.parquet.ParquetSink
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ArrayOfObjectsTest extends App {

  // Setup Filesystem
  val hadoopConf = new Configuration()
  hadoopConf.addResource(
    new FileInputStream(new File(getClass.getResource("/hadoop/core-site.xml").toURI))
  )
  hadoopConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
  implicit val fs: FileSystem = FileSystem.get(hadoopConf)

  // Setup Parquet Sink
  // Intentionally just overwrite the same file
  private val hadoopPath = s"/tmp/parquet-test"
  private val sink = ParquetSink(fs.makeQualified(new Path(hadoopPath)))
    .withOverwrite(true)

  // Setup JSON source with constant JSON value
  // Notice the `objectArray` field is an array of objects
  val source = JsonSource(() => {
    val jsonString = """
      {"objectArray":[{"key1":"value1"}]}
    """.stripMargin
    new ByteArrayInputStream(jsonString.getBytes(StandardCharsets.UTF_8))
  })

  // This throws: java.lang.ClassCastException: scala.collection.immutable.Map$Map1 cannot be cast to scala.collection.Seq
  source.toDataStream().to(sink)
}
core-site.xml file for local hadoop dev (I put this in my resources folder in a hadoop sub directory):
<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/tmp</value>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>file:///</value>
  </property>
</configuration>
I do realize that this is on version 1.2.4 of eel, but there is no later version for Scala 2.12 on Maven. Can you see any reason why I'm getting this writer error? If it's an actual bug, I'd be happy to take a stab at fixing it.
Hi haggy, thanks for raising this issue. This use case is exactly the type of functionality eel looks to provide. You would be very welcome to have a look at a fix, thanks.
Also, we'll look to get a new version out on 2.12 soon.
Thanks
@garyfrost I've started looking into this bug. Just an FYI, the build is currently failing due to a single test for the CsvSink: "support overwrite"
"[998e7733-fbba-4361-96fa-f92596031e5f].csv" was not equal to "[d7238986-3d82-463b-a716-0de0e115bbce].csv"
ScalaTestFailureLocation: io.eels.component.csv.CsvSinkTest$$anonfun$1$$anonfun$apply$mcV$sp$5 at (CsvSinkTest.scala:114)
Expected :"[d7238986-3d82-463b-a716-0de0e115bbce].csv"
Actual :"[998e7733-fbba-4361-96fa-f92596031e5f].csv"
<Click to see difference>
org.scalatest.exceptions.TestFailedException: "[998e7733-fbba-4361-96fa-f92596031e5f].csv" was not equal to "[d7238986-3d82-463b-a716-0de0e115bbce].csv"
at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
at io.eels.component.csv.CsvSinkTest$$anonfun$1$$anonfun$apply$mcV$sp$5.apply(CsvSinkTest.scala:114)
at io.eels.component.csv.CsvSinkTest$$anonfun$1$$anonfun$apply$mcV$sp$5.apply(CsvSinkTest.scala:79)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881)
at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:1075)
at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:1088)
at io.eels.component.csv.CsvSinkTest.org$scalatest$BeforeAndAfter$$super$runTest(CsvSinkTest.scala:15)
at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:203)
at io.eels.component.csv.CsvSinkTest.runTest(CsvSinkTest.scala:15)
It's more of a heads-up. I'm not going to dig into that, as it's a pre-existing issue and I'm not familiar with that part of the code.