AbsaOSS/atum

Atum expects `path` to be always available


When Atum's control measure tracking is enabled, the tracking fails whenever a dataframe is written directly to Kafka, e.g.

spark.enableControlMeasuresTracking(somePath).setControlMeasuresWorkflow(someName)
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

with the following error:

WARN util.ExecutionListenerManager: Error executing query execution listener
java.util.NoSuchElementException: key not found: path
	at scala.collection.MapLike$class.default(MapLike.scala:228)
	at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.default(CaseInsensitiveMap.scala:28)
	at scala.collection.MapLike$class.apply(MapLike.scala:141)
	at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.apply(CaseInsensitiveMap.scala:28)
	at za.co.absa.atum.utils.ExecutionPlanUtils$.inferOutputInfoFileName(ExecutionPlanUtils.scala:103)
	at za.co.absa.atum.core.SparkQueryExecutionListener.writeInfoFileForQuery(SparkQueryExecutionListener.scala:52)
	at za.co.absa.atum.core.SparkQueryExecutionListener.onSuccess(SparkQueryExecutionListener.scala:32)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:127)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:126)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:148)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:146)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
	at org.apache.spark.sql.util.ExecutionListenerManager.org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling(QueryExecutionListener.scala:146)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply$mcV$sp(QueryExecutionListener.scala:126)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:126)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:126)
	at org.apache.spark.sql.util.ExecutionListenerManager.readLock(QueryExecutionListener.scala:159)
	at org.apache.spark.sql.util.ExecutionListenerManager.onSuccess(QueryExecutionListener.scala:125)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:678)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:268)

The underlying reason is that Atum expects the `path` parameter to be present in the `SaveIntoDataSourceCommand.options` map, which holds only when conventional, path-based dataframe writing is used (e.g. to HDFS). For sinks such as Kafka there is no `path` option, so the lookup throws.
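One possible direction for a fix, sketched below, is to look the option up with `get` instead of `apply`, so that a missing `path` yields `None` rather than a `NoSuchElementException`. This is only an illustration, not Atum's actual code: the object name `OutputPathSketch`, the helper `inferInfoFilePath`, and the `_INFO` file name passed in the usage lines are assumptions made for the example, and a plain Scala `Map` stands in for Spark's `CaseInsensitiveMap`.

```scala
object OutputPathSketch {

  // Hypothetical helper (not part of Atum): builds the info file location
  // only when the writer options actually contain a "path" entry.
  // A plain Map is used here, so the key match is case-sensitive, unlike
  // Spark's CaseInsensitiveMap.
  def inferInfoFilePath(options: Map[String, String], infoFileName: String): Option[String] =
    options
      .get("path")                                   // None instead of an exception when absent
      .map(dir => s"${dir.stripSuffix("/")}/$infoFileName")
}
```

With this shape, a path-based write such as `OutputPathSketch.inferInfoFilePath(Map("path" -> "/data/out"), "_INFO")` gives `Some("/data/out/_INFO")`, while the Kafka options from the snippet above give `None`, which the listener could use to skip writing the info file (or log a message) instead of failing.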