AbsaOSS/ABRiS

Nested Avro schema from Schema Registry not supported

Closed this issue · 4 comments

Hey,

I tried a POC with this package using PySpark/Scala.
With a nested schema from the registry, I get this exception:
Caused by: org.apache.avro.AvroRuntimeException: Not a record: [{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}]

A regular schema, on the other hand, works fine, e.g.
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}.
I saw there's a PR that covers nested schemas: https://github.com/AbsaOSS/ABRiS/pull/61

ABRiS version - 6.3.0
Spark version - 3.1.2
Scala version - 2.12.10

Does the package not support nested schemas?
Let me know if more details are needed to understand my case, or if there is something I'm missing.

Hi @ShaniAlisarMH, ABRiS supports unions, although a top-level union is probably rather rare. Could you paste the stacktrace, please?

@kevinwallimann, of course!
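
(For context: from_avro and from_avro_abris_config below are presumably the py4j wrapper helpers described in the ABRiS Python documentation; they are not part of PySpark itself.)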

>>> from_avro_abris_settings = from_avro_abris_config({'schema.registry.url': 'http://localhost:8081'}, 'test', False)
>>> output = df.withColumn("parsed", from_avro("value", from_avro_abris_settings))
>>> output.show()
22/07/25 14:00:40 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.spark.sql.avro.AbrisAvroDeserializer.$anonfun$deserializer$1(AbrisAvroDeserializer.scala:36)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.avro.AbrisAvroDeserializer.<init>(AbrisAvroDeserializer.scala:36)
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.deserializer$lzycompute(AvroDataToCatalyst.scala:71)
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.deserializer(AvroDataToCatalyst.scala:71)
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:87)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:832)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:359)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.avro.AvroRuntimeException: Not a record: [{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}]
	at org.apache.avro.Schema.getField(Schema.java:212)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:326)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:72)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:53)
	... 27 more
22/07/25 14:00:40 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1) (ip.internal executor driver): java.lang.reflect.InvocationTargetException
	[same stacktrace as above]
Caused by: org.apache.avro.AvroRuntimeException: Not a record: [{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}]

22/07/25 14:00:40 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 485, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o103.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (ip.internal executor driver): java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.spark.sql.avro.AbrisAvroDeserializer.$anonfun$deserializer$1(AbrisAvroDeserializer.scala:36)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.avro.AbrisAvroDeserializer.<init>(AbrisAvroDeserializer.scala:36)
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.deserializer$lzycompute(AvroDataToCatalyst.scala:71)
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.deserializer(AvroDataToCatalyst.scala:71)
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:87)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:832)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:359)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.avro.AvroRuntimeException: Not a record: [{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}]
	at org.apache.avro.Schema.getField(Schema.java:212)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:326)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:72)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:53)
	... 27 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2470)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2419)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2418)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2418)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1125)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1125)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1125)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2684)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2626)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2615)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2241)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2262)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2281)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:486)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:439)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3760)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2763)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3751)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3749)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2763)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2970)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:303)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:340)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.spark.sql.avro.AbrisAvroDeserializer.$anonfun$deserializer$1(AbrisAvroDeserializer.scala:36)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.avro.AbrisAvroDeserializer.<init>(AbrisAvroDeserializer.scala:36)
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.deserializer$lzycompute(AvroDataToCatalyst.scala:71)
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.deserializer(AvroDataToCatalyst.scala:71)
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:87)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:832)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:359)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.avro.AvroRuntimeException: Not a record: [{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}]
	at org.apache.avro.Schema.getField(Schema.java:212)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:326)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:72)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:53)
	... 27 more

Hi @ShaniAlisarMH

Unfortunately, this specific use case is indeed impossible: bare union types are not supported. That's not a limitation of ABRiS, but of Spark, since ABRiS is just a wrapper around spark-avro with added support for the Confluent Schema Registry.
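
To illustrate, a minimal check with the plain org.apache.avro Java API (not ABRiS itself) shows that the schema behind your error is a bare, top-level union, while the working one is a record:

import org.apache.avro.Schema

// Two separate parsers, because a single Schema.Parser caches parsed names
// and would reject the second "myrecord" as a redefinition.
val union = new Schema.Parser().parse(
  """[{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}]""")
val record = new Schema.Parser().parse(
  """{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}""")

println(union.getType)  // UNION  -- a JSON array at the top level is an Avro union
println(record.getType) // RECORD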

According to the documentation (https://spark.apache.org/docs/3.1.2/sql-data-sources-avro.html#supported-types-for-avro---spark-sql-conversion), converting a union should be supported. However, I couldn't make it work, and I don't see how it could work with the current implementation of AvroDeserializer.

Basically, AvroDeserializer assumes that a Catalyst StructType always corresponds to an Avro record type. However, that clashes with SchemaConverters, which converts an Avro union type to a StructType as well. The assumption in AvroDeserializer therefore does not hold, and the exception occurs.
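
Here is a sketch of the mismatch, assuming spark-avro's public SchemaConverters (which ABRiS uses under the hood):

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

val avroSchema = new Schema.Parser().parse(
  """[{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}]""")

// SchemaConverters collapses the single-branch union into the record's struct,
// so the Catalyst side becomes STRUCT<f1: STRING> (as the Spark 3.2.1 message
// below also shows) ...
println(SchemaConverters.toSqlType(avroSchema).dataType)
// ... but the Avro side is still a union, not a record:
println(avroSchema.getType) // UNION

// AvroDeserializer then pairs that StructType with the union schema and calls
// getField on it (see Schema.getField in the stacktrace), which is what throws
// "Not a record: [...]".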

In Spark 3.2.1, the error message is a bit clearer:

org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro type [{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}] to SQL type STRUCT<`f1`: STRING>.

As a workaround, I suggest wrapping your union in a record to make the conversion work.
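
For example (a sketch — the wrapper and field names are placeholders, not something ABRiS prescribes):

{
  "type": "record",
  "name": "myrecordWrapper",
  "fields": [
    {"name": "payload", "type": [
      {"type": "record", "name": "myrecord", "fields": [{"name": "f1", "type": "string"}]}
    ]}
  ]
}

The top level is then a record again, which is the shape AvroDeserializer expects; the union only appears as a field type.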

Thank you @kevinwallimann for looking into this! I'll close this issue :)