apache/sedona

Apache Sedona on Fabric does not read parquet from Lakehouse

Closed this issue · 7 comments

Expected behavior

Running df = sedona.read.format("geoparquet").load("/lakehouse/default/Files/samples/parquet/buildings.parquet")

should return a spark dataframe

Actual behavior

Returns an error

Py4JJavaError Traceback (most recent call last)
Cell In[16], line 1
----> 1 sedona.read.format("geoparquet").load("/lakehouse/default/Files/samples/parquet/buildings.parquet")

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py:300, in DataFrameReader.load(self, path, format, schema, **options)
298 self.options(**options)
299 if isinstance(path, str):
--> 300 return self._df(self._jreader.load(path))
301 elif path is not None:
302 if type(path) != list:

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
167 def deco(*a: Any, **kw: Any) -> Any:
168 try:
--> 169 return f(*a, **kw)
170 except Py4JJavaError as e:
171 converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o6230.load.
: Operation failed: "Bad Request", 400, HEAD, http://onelake.dfs.fabric.microsoft.com/§redacted§/lakehouse/default/Files/samples/parquet/buildings.parquet?upn=false&action=getStatus&timeout=90
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:231)
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:191)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:464)
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:189)
at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getPathStatus(AbfsClient.java:690)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:1053)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:650)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:640)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.exists(AzureBlobFileSystem.java:1236)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:757)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:755)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

Steps to reproduce the problem

Add a parquet file to the default Lakehouse for a workspace. Try to read it from the path.

Something I noticed: Fabric mounts in the Lakehouse can be reached using the Python os package but not with the notebookutils.mssparkutils Python package. mssparkutils returns the dfs location with the UUID rather than the alias for the mounted path. This might be relevant.

Settings

Sedona version = 1.5.1

Apache Spark version = 3.4

Apache Flink version = Not applicable

API type = Python

Scala version = 2.12

JRE version = Cluster default

Python version = Cluster default

Environment = Microsoft Fabric

@robertnagy1 I guess you need to use something like abfss://XXX. This path must be available to all workers in the cluster.

This should apply to all other Spark native data sources too.
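To illustrate the abfss:// suggestion above, here is a minimal sketch that builds a OneLake URI from a workspace name, a lakehouse name, and a path inside the lakehouse. The helper name onelake_abfss_path and the values MyWorkspace and MyLakehouse are hypothetical placeholders, not taken from this thread. The URI layout follows the commonly documented OneLake pattern; treat it as an assumption and check it against your own workspace.

```python
ONELAKE_HOST = "onelake.dfs.fabric.microsoft.com"


def onelake_abfss_path(workspace: str, lakehouse: str, relative_path: str) -> str:
    """Build an abfss:// URI for a file inside a Fabric lakehouse.

    Hypothetical helper: assumes the layout
    abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/<path>
    and that the names need no extra escaping.
    """
    # Drop a leading slash so the pieces join with exactly one separator.
    relative_path = relative_path.lstrip("/")
    return f"abfss://{workspace}@{ONELAKE_HOST}/{lakehouse}.Lakehouse/{relative_path}"


path = onelake_abfss_path(
    "MyWorkspace", "MyLakehouse", "Files/samples/parquet/buildings.parquet"
)
print(path)
# In a Fabric notebook with Sedona configured, the read would then look like:
# df = sedona.read.format("geoparquet").load(path)
```

A fully qualified URI like this resolves the same way on the driver and on every worker, which is the property the reply above asks for.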

That's the thing: Fabric mounts the dfs to the Lakehouse by default, so it has some internal logic that propagates this to the other cluster workers. The dfs is never exposed to the end user, only the alias. I don't know what Sedona expects when it reads parquet, or how it tries to reach the file. As mentioned, in Azure Synapse you would find the path you want to write to using mssparkutils.fs.ls(""), whilst in Fabric you would find the path using os.listdir("").

@robertnagy1 Just tried. This works for me:

sedona.read.format("geoparquet").load("Files/example-1.0.0-beta.1.parquet").show()

My file location (choose the relative path for Spark): [screenshot]
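The difference between the two paths in this thread can be sketched as follows: the failing call used the local mount path /lakehouse/default/Files/..., while the working call used the Spark-relative path Files/.... The helper below is hypothetical (the name to_spark_relative_path is not part of Sedona or Fabric) and only strips the mount prefix seen in the original report.

```python
# Local mount prefix of the default lakehouse, as it appears in the report above.
MOUNT_PREFIX = "/lakehouse/default/"


def to_spark_relative_path(path: str) -> str:
    """Turn a default-lakehouse mount path into a Spark-relative path.

    Paths that do not start with the mount prefix are returned unchanged.
    """
    if path.startswith(MOUNT_PREFIX):
        return path[len(MOUNT_PREFIX):]
    return path


print(to_spark_relative_path("/lakehouse/default/Files/samples/parquet/buildings.parquet"))
# In a Fabric notebook, the result would be passed to the reader, for example:
# sedona.read.format("geoparquet").load(to_spark_relative_path(local_path))
```

The mount path is still the right form for local Python file APIs such as os.listdir; the relative form is the one Spark readers accept here.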

I replaced the path. It seems it is enough to refer to "Tables"; you don't need to refer to the full mounted path /lakehouse/default/Tables. But I got a second error. I am using an osm_buildings file that I converted to parquet.

Py4JJavaError Traceback (most recent call last)
Cell In[64], line 1
----> 1 df = sedona.read.format("geoparquet").load("Files/samples/parquet/buildings_parquet.parquet").show()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:899, in DataFrame.show(self, n, truncate, vertical)
893 raise PySparkTypeError(
894 error_class="NOT_BOOL",
895 message_parameters={"arg_name": "vertical", "arg_type": type(vertical).__name__},
896 )
898 if isinstance(truncate, bool) and truncate:
--> 899 print(self._jdf.showString(n, 20, vertical))
900 else:
901 try:

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
167 def deco(*a: Any, **kw: Any) -> Any:
168 try:
--> 169 return f(*a, **kw)
170 except Py4JJavaError as e:
171 converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o6258.showString.
: java.lang.NoSuchMethodError: 'boolean org.apache.spark.sql.internal.SQLConf.parquetFilterPushDownStringStartWith()'
at org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat.buildReaderWithPartitionValues(GeoParquetFileFormat.scala:213)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:569)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:558)
at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:588)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:282)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:279)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:227)
at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:531)
at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:459)
at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:458)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:502)
at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:52)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:282)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:279)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:227)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:400)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:534)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:519)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4203)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3174)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4193)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:642)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4191)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:120)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:209)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:105)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:67)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4191)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3174)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3395)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:297)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:336)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)

@robertnagy1 This is likely caused by Spark + Sedona version mismatch.

Spark 3.0 - 3.3 will use sedona-spark-shaded-3.0_2.12
Spark 3.4 will use sedona-spark-shaded-3.4_2.12
Spark 3.5 will use sedona-spark-shaded-3.5_2.12
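The mapping above can be written as a small lookup, which helps when checking a cluster's Spark version against the installed jar. This is a sketch only: the function name sedona_shaded_artifact is hypothetical, and it encodes just the three rules listed in this thread, so other Sedona releases may use a different mapping.

```python
def sedona_shaded_artifact(spark_version: str, scala_version: str = "2.12") -> str:
    """Return the sedona-spark-shaded artifact name for a Spark version.

    Encodes only the mapping quoted in this thread:
    Spark 3.0 - 3.3 -> 3.0, Spark 3.4 -> 3.4, Spark 3.5 -> 3.5.
    """
    # Accept "3.4" as well as "3.4.1"; only major and minor matter here.
    major, minor = (int(part) for part in spark_version.split(".")[:2])
    if major != 3:
        raise ValueError(f"Spark {spark_version} is not covered by this mapping")
    if 0 <= minor <= 3:
        line = "3.0"
    elif minor in (4, 5):
        line = f"3.{minor}"
    else:
        raise ValueError(f"Spark {spark_version} is not covered by this mapping")
    return f"sedona-spark-shaded-{line}_{scala_version}"


print(sedona_shaded_artifact("3.4"))
```

For the environment in this report (Spark 3.4, Scala 2.12), the lookup returns sedona-spark-shaded-3.4_2.12. Loading the 3.0 artifact on Spark 3.4 is the kind of mismatch that would produce the NoSuchMethodError shown earlier.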

You are totally right. I apologize for not paying attention.

No problem. Glad that you solved the problem! I will close the ticket.