Fails to write record containing map of array of record
Opened this issue · 3 comments
Hi,
Spark-avro fails to write a record that contains a map of arrays of records, with the following error:
org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.UnresolvedUnionException: Not in union [{"type":"record","name":"properties","fields":[{"name":"string","type":["string","null"]}]},"null"]: {"string": "one"} at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:308) at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77) at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39) at com.databricks.spark.avro.AvroOutputWriter.write(AvroOutputWriter.scala:81) at com.databricks.spark.avro.AvroOutputWriter.write(AvroOutputWriter.scala:75) at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:327) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.avro.UnresolvedUnionException: Not in union [{"type":"record","name":"properties","fields":[{"name":"string","type":["string","null"]}]},"null"]: {"string": "one"} at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:740) at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:205) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:123) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75) at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159) at org.apache.avro.reflect.ReflectDatumWriter.writeObjectArray(ReflectDatumWriter.java:120) at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:111) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75) at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:125) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75) at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159) at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:234) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:121) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75) at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:125) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75) at 
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159) at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166) at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90) at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191) at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156) at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75) at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62) at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:302) ... 17 more
schema.json
{
"name": "Event",
"namespace": "com.laymain.sandbox.avro",
"type": "record",
"fields": [
{
"name": "properties",
"type": {
"type": "map",
"values": {
"type": "array",
"items": {
"type": "record",
"name": "ArrayObjectProperties",
"fields": [
{
"name": "string",
"type": "string",
"connect.parameters": {
"avro.java.string": "String"
},
"avro.java.string": "String"
}
]
}
}
}
}
]
}
event.json
{
"properties": {
"object": [
{ "string": "one" },
{ "string": "two" }
]
}
}
Avro file generated using avro-tools:
java -jar avro-tools-1.8.2.jar fromjson --schema-file schema.json event.json > event.avro
Spark code:
public static void main(String[] args) throws Exception {
    // Resolve the bundled event.avro test file; abort early when it is missing
    // so the failure is obvious instead of a NullPointerException later on.
    final URL eventAvro = Main.class.getClassLoader().getResource("event.avro");
    if (eventAvro == null) {
        throw new RuntimeException("Missing resource event.avro");
    }
    // Spin up a local Spark session using every available core.
    final SparkSession spark = SparkSession.builder()
            .appName("com.laymain.sandbox.avro")
            .master("local[*]")
            .getOrCreate();
    // Round-trip the file: read it through spark-avro and immediately write it
    // back out to "output", replacing any previous run. This single statement
    // reproduces the reported UnresolvedUnionException on write.
    spark.read()
            .format("com.databricks.spark.avro")
            .load(eventAvro.getPath())
            .write()
            .mode(SaveMode.Overwrite)
            .format("com.databricks.spark.avro")
            .save("output");
}
Due to #92 the generated schema for the Spark Row has all its fields nullable, which gives the following output schema:
{
"type": "record",
"name": "topLevelRecord",
"fields": [{
"name": "properties",
"type": [{
"type": "map",
"values": [{
"type": "array",
"items": [{
"type": "record",
"name": "properties",
"fields": [{
"name": "string",
"type": ["string", "null"]
}
]
}, "null"]
}, "null"]
}, "null"]
}
]
}
When I try to use this schema with the json object to generate a new avro file:
java -jar avro-tools-1.8.2.jar fromjson --schema-file schema-generated.json event.json > event-generated.avro
I get the following error:
Exception in thread "main" org.apache.avro.AvroTypeException: Unknown union branch object at org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:445) at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290) at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) at org.apache.avro.tool.DataFileWriteTool.run(DataFileWriteTool.java:99) at org.apache.avro.tool.Main.run(Main.java:87) at org.apache.avro.tool.Main.main(Main.java:76)
Is the generated schema invalid or is it a bug in Avro?
It is linked to the nullable issue (#92), the expected new input data for the generated schema is:
{
"properties": {
"map": {
"object": {
"array": [
{"properties": {"string": {"string": "one"}}},
{"properties": {"string": {"string": "two"}}}
]
}
}
}
}
instead of the initial input
{
"properties": {
"object": [
{ "string": "one" },
{ "string": "two" }
]
}
}
I found a workaround by using the schema to generate the right StructType with SchemaConverters and creating a new dataset with this StructType:
public static void main(String[] args) throws Exception {
    // Parse the original Avro schema and convert it to the equivalent Spark SQL
    // StructType via SchemaConverters; this preserves the intended nullability
    // instead of the all-nullable schema Spark infers on read (see #92).
    final Schema avroSchema = new Schema.Parser()
            .parse(Main.class.getClassLoader().getResourceAsStream("schema.json"));
    final StructType sparkSchema = (StructType) SchemaConverters.toSqlType(avroSchema).dataType();
    // Resolve the bundled event.avro test file; abort early when it is missing.
    final URL eventAvro = Main.class.getClassLoader().getResource("event.avro");
    if (eventAvro == null) {
        throw new RuntimeException("Missing resource event.avro");
    }
    final SparkSession spark = SparkSession.builder()
            .appName("com.laymain.sandbox.avro")
            .master("local[*]")
            .getOrCreate();
    // Load the data, then rebuild the DataFrame over the same RDD with the
    // explicit schema, so the writer emits the original (non-union) Avro types.
    final Dataset<Row> source = spark.read()
            .format("com.databricks.spark.avro")
            .load(eventAvro.getPath());
    source.sqlContext()
            .createDataFrame(source.rdd(), sparkSchema)
            .write()
            .mode(SaveMode.Overwrite)
            .format("com.databricks.spark.avro")
            .save("output");
}