Azure/azure-event-hubs-spark

Checkpoint write to blob storage using hadoop configuration

ddevidence opened this issue · 3 comments

Bug Report:

Description: Spark Scala streaming application reads dataset from EventHub and writes processed dataset to ADLS Gen2, that part of the application (without hadoop configuration) works fine using the Client Credentials using the following lib

  • azure-eventhubs-spark_2.12 (ver 2.3.22)
  • DataLakeServiceClientBuilder/azure-storage-file-datalake (ver 12.15.0)

The issue is the usage of hadoop configuration using the client-credentials, doesn't work for writing checkpoints to the Blob storage for the streaming application described above

  • Actual behavior: As the application with the hadoop configuration is launched/initializes, the checkpoint IO to the blob storage fails with the following stacktrace

sorry about the long stacktrace..

23/05/30 20:46:21 ERROR AzureNativeFileSystemStore: Service returned StorageException when checking existence of container XXXXXXXXXXX in account XXXXXXXX.blob.core.windows.net com.microsoft.azure.storage.StorageException: An unknown failure occurred : Connection reset at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:67) at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:209) at com.microsoft.azure.storage.blob.CloudBlobContainer.exists(CloudBlobContainer.java:769) at com.microsoft.azure.storage.blob.CloudBlobContainer.exists(CloudBlobContainer.java:756) at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobContainerWrapperImpl.exists(StorageInterfaceImpl.java:233) at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.connectUsingAnonymousCredentials(AzureNativeFileSystemStore.java:892) at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1118) at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:566) at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1423) at org.apache.hadoop.fs.DelegateToFileSystem.<init>(DelegateToFileSystem.java:54) at org.apache.hadoop.fs.azure.Wasbs.<init>(Wasbs.java:40) at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Unknown Source) at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source) at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:143) at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:181) at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:266) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339) at java.base/java.security.AccessController.doPrivileged(Unknown Source) at java.base/javax.security.auth.Subject.doAs(Unknown Source) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878) at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339) at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465) at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:316) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:357) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209) at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.resolveCheckpointLocation(ResolveWriteToStream.scala:89) at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:42) at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:40) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:31) at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:40) at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:39) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211) at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224) at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208) at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74) at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:270) at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:346) at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:430) at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:365) at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249) at com.usbank.shieldplatform.evidence.streaming.StreamEvidenceProcessor$.executeStreamProcessor(StreamEvidenceProcessor.scala:290) at com.usbank.shieldplatform.evidence.streaming.StreamEvidenceProcessor$.main(StreamEvidenceProcessor.scala:304) at com.usbank.shieldplatform.evidence.streaming.StreamEvidenceProcessor.main(StreamEvidenceProcessor.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The hadoop configuration code

def createSparkSessionWithHadoopConfigBlobWrite(appName: String): SparkSession = { val spark = SparkSession.builder() .master("local[*]") .appName(appName) .config("spark.streaming.stopGracefullyOnShutdown", "true") .config("spark.sql.streaming.log.enableTimeStamps", "true") .config("spark.sql.streaming.log.malformedEventLogEnabled", "true") .config(s"spark.hadoop.fs.azure.account.auth.type.$blobStorageAccount.blob.core.windows.net", "OAuth") .config(s"spark.hadoop.fs.azure.account.oauth.provider.type.$blobStorageAccount.blob.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") .config(s"spark.hadoop.fs.azure.account.oauth2.client.id.$blobStorageAccount.blob.core.windows.net", clientId) .config(s"spark.hadoop.fs.azure.account.oauth2.client.secret.$blobStorageAccount.blob.core.windows.net", clientSecret) .config(s"spark.hadoop.fs.azure.account.oauth2.client.endpoint.$blobStorageAccount.blob.core.windows.net", "https://login.microsoftonline.com/XXXXXX-XXXX-XXXX-XXXX-XXXXXXXX/oauth2/token") .config("spark.sql.streaming.checkpointLocation", s"wasbs://$containerName@$blobStorageAccount.blob.core.windows.net/$checkPointWritePath") .getOrCreate() println(s"CHKPOINT Location: wasbs://$containerName@$blobStorageAccount.blob.core.windows.net/$checkPointWritePath") spark }

  • Expected behavior: The Spark Scala streaming application should be able to perform Checkpoint writes to the BLOB storage based on the hadoop configuration provided as part of the SparkSession.builder().config

  • Spark version: 3.4.0

  • spark-eventhubs artifactId and version: azure-eventhubs-spark_2.12 (ver 2.3.22)

Currently, we only support checkpoint location which is a path in an HDFS compatible file system. We don't have a firm deadline for this feature yet.

Ref: https://learn.microsoft.com/en-us/azure/hdinsight/overview-data-lake-storage-gen2#core-functionality-of-azure-data-lake-storage-gen2

• Access that is compatible with Hadoop: In Azure Data Lake Storage Gen2, you can manage and access data just as you would with a Hadoop Distributed File System (HDFS). The Azure Blob File System (ABFS) driver is available within all Apache Hadoop environments, including Azure HDInsight and Azure Databricks. Use ABFS to access data stored in Data Lake Storage Gen2.

– this ref seems to indicate ADLS Gen2 supports Hadoop operations which lead to the assumption the library would support checkpoint write as part of the spark hadoopConfiguration.

Can you provide insight ? if this would be a significant feature update or perhaps something on the lower-end.. just want to have this available on a priority for the use-case at hand.

This will require some works and I am not able to provide a deadline for it