Azure/spark-cdm-connector

[Issue] Clarification on setting up Databricks managed identity

Closed this issue · 1 comment

Did you read the pinned issues and search the error message?

Yes, but I didn't find the answer.

Summary of issue

I'm trying to run one of the examples on a Databricks cluster, but I keep getting the following error:

Py4JJavaError: An error occurred while calling o842.save. : java.lang.NoSuchMethodError: com.databricks.backend.daemon.data.client.adl.AdlGen2CredentialContextTokenProvider.getToken()Lshaded/databricks/v20180920_b33d810/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken; at com.microsoft.cdm.utils.CDMTokenProvider.<init>(CDMTokenProvider.scala:15) at com.microsoft.cdm.HadoopTables.load(HadoopTables.scala:11) at com.microsoft.cdm.CDMCatalog.loadTable(CDMCatalog.scala:33)

I noticed in the overview documentation it says:

In Synapse, the Spark CDM Connector supports use of Managed identities for Azure resources to mediate access to the Azure datalake storage account containing the CDM folder. A managed identity is automatically created for every Synapse workspace. The connector uses the managed identity of the workspace that contains the notebook in which the connector is called to authenticate to the storage accounts being addressed.
You must ensure the identity used is granted access to the appropriate storage accounts. Grant Storage Blob Data Contributor to allow the library to write to CDM folders, or Storage Blob Data Reader to allow only read access. In both cases, no additional connector options are required.

If this also applies to Databricks, I don't really understand the last sentence of the first paragraph. What is the managed identity of the workspace, and how do I grant it access to my storage account? I've tried adding an access connector for Databricks as well as the dbmanagedidentity of my Databricks instance, but neither has worked. More clarification in the documentation would be extremely helpful.

Error stack trace

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-1874582228428420>:32
     28 df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
     30 # Creates the CDM manifest and adds the entity to it with gzip'd parquet partitions
     31 # with both physical and logical entity definitions 
---> 32 (df.write.format("com.microsoft.cdm")
     33   .option("storage", storageAccountName)
     34   .option("manifestPath", container + "/default.manifest.cdm.json")
     35   .option("entity", "TestEntity")
     36   .option("format", "parquet")
     37   .option("compression", "gzip")
     38   .save())
     40 # Append the same dataframe content to the entity in the default CSV format
     41 # (df.write.format("com.microsoft.cdm")
     42 #   .option("storage", storageAccountName)
   (...)
     51 #   .option("entity", "TestEntity")
     52 #   .load())
     54 readDf.select("*").show()

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:1193, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
   1191     self.format(format)
   1192 if path is None:
-> 1193     self._jwrite.save()
   1194 else:
   1195     self._jwrite.save(path)

File /databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /databricks/spark/python/pyspark/sql/utils.py:209, in capture_sql_exception.<locals>.deco(*a, **kw)
    207 def deco(*a: Any, **kw: Any) -> Any:
    208     try:
--> 209         return f(*a, **kw)
    210     except Py4JJavaError as e:
    211         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o891.save.
: java.lang.NoSuchMethodError: com.databricks.backend.daemon.data.client.adl.AdlGen2CredentialContextTokenProvider.getToken()Lshaded/databricks/v20180920_b33d810/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken;
	at com.microsoft.cdm.utils.CDMTokenProvider.<init>(CDMTokenProvider.scala:15)
	at com.microsoft.cdm.HadoopTables.load(HadoopTables.scala:11)
	at com.microsoft.cdm.CDMCatalog.loadTable(CDMCatalog.scala:33)
	at com.microsoft.cdm.CDMCatalog.loadTable(CDMCatalog.scala:15)
	at org.apache.spark.sql.connector.catalog.TableCatalog.tableExists(TableCatalog.java:183)
	at com.microsoft.cdm.CDMCatalog.tableExists(CDMCatalog.scala:15)
	at org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:84)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.$anonfun$result$1(V2CommandExec.scala:47)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:47)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:45)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:54)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:235)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:245)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:424)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:190)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1038)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:144)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:374)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:235)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:220)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:233)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:226)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:298)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:294)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:226)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:354)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:226)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:180)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:283)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:964)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:380)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
	at java.lang.Thread.run(Thread.java:750)

Platform name

Databricks

Spark version

3.3.1

CDM jar version

3.3-1.19.5

What is the format of the data you are trying to read/write?

.csv

That documentation applies only to Synapse. As the pinned issues note:

As referenced in #134, credential passthrough is a Synapse-specific feature. Use app registration or SAS token auth if you are not using Synapse.
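For reference, here is a minimal sketch of the same write using app registration (service principal) authentication instead of managed identity. The appId, appKey, and tenantId option names follow the connector's documented app registration options (verify against the jar version you are using); the storage account, container, and secret scope names are placeholders, and spark, dbutils, and df are assumed to already exist in the Databricks notebook, as in the example above.

# Placeholder storage account and container names
storageAccountName = "mystorageaccount.dfs.core.windows.net"
container = "cdm-container"

# Service principal credentials; on Databricks these would typically be read from a
# secret scope ("my-scope" and the key names below are hypothetical).
appId = dbutils.secrets.get(scope="my-scope", key="cdm-app-id")
appKey = dbutils.secrets.get(scope="my-scope", key="cdm-app-key")
tenantId = dbutils.secrets.get(scope="my-scope", key="cdm-tenant-id")

# Same write call as in the stack trace above, but passing explicit credentials so the
# connector does not fall back to the Synapse-only managed identity / credential
# passthrough path.
(df.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + "/default.manifest.cdm.json")
  .option("entity", "TestEntity")
  .option("format", "parquet")
  .option("compression", "gzip")
  .option("appId", appId)
  .option("appKey", appKey)
  .option("tenantId", tenantId)
  .save())

As with managed identity on Synapse, the service principal still needs Storage Blob Data Contributor on the storage account to write, or Storage Blob Data Reader for read-only access.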