Azure/spark-cdm-connector

Unable to read from secondary endpoint

Opened this issue · 0 comments

We are trying to read from the secondary endpoint of our Azure Data Lake; however, this doesn't seem to work with the CDM connector.

# Read the CDM entity from the secondary endpoint of the storage account.
dataFrame = spark.read.format("com.microsoft.cdm") \
  .option("storage", "storageaccount-secondary.dfs.core.usgovcloudapi.net") \
  .option("manifestPath", "path/to/model.json") \
  .option("entity", entity) \
  .load()

# Append the result as parquet to a second storage account.
dataFrame.write.mode("append").parquet("abfss://container@storageaccount2.dfs.core.usgovcloudapi.net/path")

If we switch it to the primary endpoint (removing -secondary), then it works perfectly.
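For comparison, here is a minimal sketch of the read that does succeed; it is identical except that the -secondary suffix is dropped from the storage option (account and manifest paths are placeholders, as above):

# Same read against the primary endpoint -- this works.
dataFrame = spark.read.format("com.microsoft.cdm") \
  .option("storage", "storageaccount.dfs.core.usgovcloudapi.net") \
  .option("manifestPath", "path/to/model.json") \
  .option("entity", entity) \
  .load()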

Here is the error we're getting:

Py4JJavaError: An error occurred while calling o1016.parquet.
: java.lang.NullPointerException
	at com.microsoft.cdm.utils.CDMModelCommon$$anonfun$3.apply(CDMModelCommon.scala:97)
	at com.microsoft.cdm.utils.CDMModelCommon$$anonfun$3.apply(CDMModelCommon.scala:96)
	at com.microsoft.cdm.read.CDMDataSourceReader$$anonfun$planInputPartitions$1.apply(CDMDataSourceReader.scala:71)
	at com.microsoft.cdm.read.CDMDataSourceReader$$anonfun$planInputPartitions$1.apply(CDMDataSourceReader.scala:69)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at com.microsoft.cdm.read.CDMDataSourceReader.planInputPartitions(CDMDataSourceReader.scala:69)
	at com.microsoft.cdm.read.CDMDataSourceReader.planInputPartitions(CDMDataSourceReader.scala:17)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:77)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:76)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:66)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:149)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering$1.apply(EnsureRequirements.scala:148)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:148)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:312)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:304)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:37)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:119)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:119)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:119)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$executedPlan$1.apply(QueryExecution.scala:101)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$executedPlan$1.apply(QueryExecution.scala:101)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measureTime(QueryPlanningTracker.scala:97)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:100)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:243)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$3.apply(QueryExecution.scala:243)
	at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:135)
	at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:243)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:134)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:682)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:286)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:572)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)