Azure/spark-cdm-connector

[Issue] Cannot read data when using Databricks Unity Catalog

Opened this issue · 13 comments

Did you read the pinned issues and search the error message?

Yes, but I didn't find the answer.

Summary of issue

When we migrated to a Unity Catalog-enabled Databricks workspace, we noticed the package stopped working. It started throwing the error:

spark_catalog requires a single-part namespace, but got [xyz.dfs.core.windows.net, /xyz, model.json].

This is probably because data reading works differently with Unity Catalog. It still works if we use a cluster without Unity.

Have others run into the same issue? And is there anything we can do to help resolve it?
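
For reference, the read that fails on the Unity Catalog cluster looks roughly like the sketch below (the storage account, manifest path, entity name, and credentials are placeholders; the option names mirror the example further down in this thread):

```python
# Minimal sketch of the failing read on a Unity Catalog cluster.
# All values below are placeholders, not the real account or entity.
df = (
    spark.read.format("com.microsoft.cdm")
    .option("storage", "<account>.dfs.core.windows.net")
    .option("manifestpath", "<container>/model.json")
    .option("entity", "<EntityName>")
    .option("tenantid", "<tenant-id>")
    .option("appid", "<client-id>")
    .option("appkey", "<client-secret>")
    .load()
)
```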

Error stack trace

Exception: Stream account failed with exception: Traceback (most recent call last):
  File "", line 7, in ingest_stream
    stream.ingest()
  File "", line 7, in wrapper
    df: DataFrame = func(*args, **kwargs)
  File "", line 286, in ingest
    df = self._load()
  File "", line 12, in _load
    return (spark
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/readwriter.py", line 309, in load
    return self._df(self._jreader.load())
  File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/errors/exceptions.py", line 234, in deco
    raise converted from None
pyspark.errors.exceptions.AnalysisException: spark_catalog requires a single-part namespace, but got [[REDACTED].dfs.core.windows.net, /[REDACTED], model.json].

Platform name

Databricks

Spark version

3.3.2

CDM jar version

spark3.1-1.19.3

What is the format of the data you are trying to read/write?

.csv

Spark version
3.3.2
CDM jar version
spark3.1-1.19.3

@JulesHuisman There is a mismatch between the Spark version on your platform and the one the connector was built for. You should fix that first.
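
As a quick sanity check (a generic PySpark snippet, not anything connector-specific), you can confirm the cluster's runtime version from a notebook before picking the matching connector build:

```python
# The connector jar must target the same Spark major/minor version
# as the cluster runtime (a Spark 3.1 build will not work on 3.3).
print(spark.version)               # e.g. "3.3.2"
print(spark.sparkContext.version)  # should match
```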

Thanks for the tip! I do get a different issue now, which might bring me closer.

It now throws:

java.lang.ClassCastException: com.databricks.sql.managedcatalog.UnityCatalogV2Proxy cannot be cast to com.microsoft.cdm.CDMCatalog

Any idea what this might indicate?

This error is specific to Spark 3.3 and required changes to the connector (which were made in the CDM Spark 3.3 connector) to make it work.

spark_catalog requires a single-part namespace, but got [xyz.dfs.core.windows.net, /xyz, model.json].

I missed a few lines in your post mentioning "It still works if we use a cluster without Unity". The connector was only developed for the standard (non-Unity) cluster scenario. Without seeing the full stack trace, I am guessing the error is due to the Databricks Unity Catalog platform you are using.

Are there any workarounds in the meantime? Specifically for the following error:

java.lang.ClassCastException: com.databricks.sql.managedcatalog.UnityCatalogV2Proxy cannot be cast to com.microsoft.cdm.CDMCatalog

@bradleyjamrozik Please give the full error stack trace for:

java.lang.ClassCastException: com.databricks.sql.managedcatalog.UnityCatalogV2Proxy cannot be cast to com.microsoft.cdm.CDMCatalog

We don't have any workarounds. As mentioned in #118, this connector was originally designed for Synapse Spark, which follows the open-source code path. I suspect the above error originates from the platform you are using.

Py4JJavaError                             Traceback (most recent call last)
File <command-227313506375295>:8
      1 (spark.read.format("com.microsoft.cdm")
      2     .option("storage", "[REDACTED].dfs.core.windows.net")
      3     .option("manifestpath", "dynamics365-financeandoperations/[REDACTED].operations.dynamics.com/Tables/Common/Customer/Main/Main.manifest.cdm.json")
      4     .option("entity", "CustTable")
      5     .option("tenantid", tenantid)
      6     .option("appid", clientid)
      7     .option("appkey", clientsecret)
----> 8     .load()
      9 )

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:314, in DataFrameReader.load(self, path, format, schema, **options)
    312     return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    313 else:
--> 314     return self._df(self._jreader.load())

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o532.load.
: java.lang.ClassCastException: com.databricks.sql.managedcatalog.UnityCatalogV2Proxy cannot be cast to com.microsoft.cdm.CDMCatalog
	at com.microsoft.cdm.DefaultSource.extractIdentifier(DefaultSource.scala:41)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:115)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:333)
	at scala.Option.flatMap(Option.scala:271)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:331)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:226)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
	at java.lang.Thread.run(Thread.java:750)

Makes sense, thank you

@bradleyjamrozik
In DefaultSource.scala, we explicitly define the "cdm" catalog and expect a CDMCatalog to be returned. Instead, a UnityCatalogV2Proxy is being returned.

It still works if we use a cluster without Unity.

This earlier mention that it works without Unity leads me to believe the issue is due to how your platform is configured, into which I do not have any insight. As mentioned in #118, this connector was originally built with Azure Synapse in mind.
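
For anyone diagnosing this on their own cluster, a rough way to see which catalog implementations are actually configured is the sketch below. Note that the `spark.sql.catalog.cdm` key is an assumption based on the "cdm" catalog mentioned above, not a documented setting:

```python
# Diagnostic sketch: print the configured catalog plugin classes.
# On a Unity Catalog cluster the session catalog resolves to
# UnityCatalogV2Proxy rather than the connector's CDMCatalog,
# which is what triggers the ClassCastException above.
for key in ("spark.sql.catalog.spark_catalog", "spark.sql.catalog.cdm"):
    print(key, "=", spark.conf.get(key, "<not set>"))
```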

Do you think this would be a difficult fix if we took the open-source route?

Hi all,

is there any chance of an update on this topic?

Same problem here. It would be awesome to get this running on Databricks.

We built this package to make the connector work better with Databricks. It is not finished yet, but it might help: https://github.com/quantile-development/pyspark-cdm

There's a PR with documentation changes that makes this issue, and the workaround, clear.