AbsaOSS/hyperdrive

Atum integration

Investigate how Dataset.observe could be used to integrate with Atum

@Zejnilovic @dk1844 @jozefbakus
Dataset.observe is a new feature in Spark 3 for collecting metrics, and it can be used for both batch and streaming queries. It adds a CollectMetrics node to the execution plan. Despite its name, CollectMetrics is a transformation (a UnaryExecNode) and not an action, so the aggregated metrics are accumulated as the rows pass through, without having to collect the whole dataset. The metrics can be consumed with a QueryExecutionListener for batch queries or a StreamingQueryListener for streaming queries. More details in the documentation: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Dataset.html#observe-java.lang.String-org.apache.spark.sql.Column-scala.collection.Seq-
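
A minimal batch sketch of this pattern, assuming Spark 3.0+ on the classpath. The names ObserveBatchSketch, ObservedMetricsListener and run are hypothetical (this is not the code from the PR), and the input -1, -2, ..., -100 is an assumption chosen to be consistent with the batch output in this issue. Only rowCount, sum and sumAbs are observed to keep it short. Because QueryExecutionListener events are delivered asynchronously, the listener uses a latch so the caller can wait for the metrics.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.functions.{abs, count, lit, sum}
import org.apache.spark.sql.util.QueryExecutionListener

object ObserveBatchSketch {

  // Captures the observed metrics of the first successful action that reports any.
  // Listener events arrive asynchronously, hence the latch.
  final class ObservedMetricsListener extends QueryExecutionListener {
    private val done = new CountDownLatch(1)
    @volatile private var metrics: Map[String, Row] = Map.empty

    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
      val observed = qe.observedMetrics // checkpoint name -> Row of aggregates
      if (observed.nonEmpty) {
        metrics = observed
        done.countDown()
      }
    }

    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()

    def await(timeoutSeconds: Long): Map[String, Row] = {
      require(done.await(timeoutSeconds, TimeUnit.SECONDS), "no observed metrics received")
      metrics
    }
  }

  def run(spark: SparkSession): Map[String, Row] = {
    import spark.implicits._

    val listener = new ObservedMetricsListener
    spark.listenerManager.register(listener)
    try {
      // Assumed input: -1, -2, ..., -100 (hypothetical, not taken from the PR).
      val observed = (1 to 100).map(-_).toDF("value")
        .observe("checkpoint1", count(lit(1)).as("rowCount"), sum($"value").as("sum"),
          sum(abs($"value")).as("sumAbs"))
        .filter($"value" % 2 === 0) // demo filter: keep even values
        .observe("checkpoint2", count(lit(1)).as("rowCount"), sum($"value").as("sum"),
          sum(abs($"value")).as("sumAbs"))

      // A single action; both checkpoints are aggregated while the rows flow through.
      observed.collect()
      listener.await(timeoutSeconds = 30)
    } finally {
      spark.listenerManager.unregister(listener)
    }
  }
}
```

With that input, the metrics for checkpoint1 and checkpoint2 would be expected to line up with the rowCount, sum and sumAbs values of the batch output in this issue.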

https://github.com/AbsaOSS/hyperdrive/pull/214/files contains code demonstrating the usage of observe for a streaming query (observe-streaming.scala) as well as for a batch query (observe-batch.scala).
The output of observe-streaming.scala looks like this (executed locally in spark-shell 3.0.1):

BatchId: 0
BatchId: 0
Checkpoint 1 rowCount: 100
Checkpoint 1 sum: -4950
Checkpoint 1 sumAbs: 4950
Checkpoint 1 crc32: 204643287715
Checkpoint 2 rowCount: 50
Checkpoint 2 sum: -2450
Checkpoint 2 sumAbs: 2450
Checkpoint 2 crc32: 104499939000
BatchId: 1
Checkpoint 1 rowCount: 200
Checkpoint 1 sum: -39900
Checkpoint 1 sumAbs: 39900
Checkpoint 1 crc32: 454972867100
Checkpoint 2 rowCount: 100
Checkpoint 2 sum: -19900
Checkpoint 2 sumAbs: 19900
Checkpoint 2 crc32: 230573440034
BatchId: 2
Checkpoint 1 rowCount: 300
Checkpoint 1 sum: -134850
Checkpoint 1 sumAbs: 134850
Checkpoint 1 crc32: 683055184570
Checkpoint 2 rowCount: 150
Checkpoint 2 sum: -67350
Checkpoint 2 sumAbs: 67350
Checkpoint 2 crc32: 346526941219

and the physical plan looks like this

CollectMetrics checkpoint2, [count(1) AS rowCount#15L, sum(cast(value#1 as bigint)) AS sum#17L, sum(cast(abs(value#1) as bigint)) AS sumAbs#19L, sum(crc32value#3L) AS crc32#21L]
+- *(2) Filter ((value#1 % 2) = 0)
   +- CollectMetrics checkpoint1, [count(1) AS rowCount#7L, sum(cast(value#1 as bigint)) AS sum#9L, sum(cast(abs(value#1) as bigint)) AS sumAbs#11L, sum(crc32value#3L) AS crc32#13L]
      +- *(1) Project [value#1, crc32(cast(cast(value#1 as string) as binary)) AS crc32value#3L]
         +- *(1) Project [value#1]
            +- MicroBatchScan[value#1] MemoryStreamDataSource
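
For comparison, a sketch of how the per-micro-batch metrics can be read from a StreamingQueryListener via `QueryProgressEvent.progress.observedMetrics`. This is not the code of observe-streaming.scala; ObserveStreamingSketch, ProgressMetricsListener and run are hypothetical names, Spark 3.0+ is assumed, and the first micro-batch (0, -1, ..., -99) is an assumption consistent with BatchId 0 above. Only rowCount and sum are observed.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{count, lit, sum}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap

object ObserveStreamingSketch {

  // Keeps the observed metrics reported with each micro-batch, keyed by batch id.
  // Progress events are delivered asynchronously on Spark's listener bus.
  final class ProgressMetricsListener extends StreamingQueryListener {
    val byBatch: TrieMap[Long, Map[String, Long]] = TrieMap.empty

    override def onQueryStarted(event: QueryStartedEvent): Unit = ()
    override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

    override def onQueryProgress(event: QueryProgressEvent): Unit = {
      val observed = event.progress.observedMetrics.asScala
      if (observed.nonEmpty) {
        // Flatten to keys such as "checkpoint1.rowCount".
        val flat = observed.toSeq.flatMap { case (checkpoint, row) =>
          Seq("rowCount", "sum").map(field => s"$checkpoint.$field" -> row.getAs[Long](field))
        }.toMap
        byBatch.put(event.progress.batchId, flat)
      }
    }
  }

  // Feeds one micro-batch through two checkpoints and returns its observed metrics.
  def run(spark: SparkSession, timeoutMs: Long = 30000L): Map[String, Long] = {
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    val listener = new ProgressMetricsListener
    spark.streams.addListener(listener)
    val input = MemoryStream[Int]
    val query = input.toDF()
      .observe("checkpoint1", count(lit(1)).as("rowCount"), sum($"value").as("sum"))
      .filter($"value" % 2 === 0) // demo filter: keep even values
      .observe("checkpoint2", count(lit(1)).as("rowCount"), sum($"value").as("sum"))
      .writeStream
      .format("memory")
      .queryName("observe_sketch")
      .start()

    try {
      // Assumed first micro-batch: 0, -1, ..., -99.
      input.addData((0 until 100).map(-_): _*)
      query.processAllAvailable()
      // Wait (bounded) for the asynchronous progress event of batch 0.
      val deadline = System.currentTimeMillis() + timeoutMs
      while (!listener.byBatch.contains(0L) && System.currentTimeMillis() < deadline) {
        Thread.sleep(50)
      }
      listener.byBatch.getOrElse(0L, sys.error("no observed metrics reported for batch 0"))
    } finally {
      query.stop()
      spark.streams.removeListener(listener)
    }
  }
}
```

Unlike the batch case, the metrics here are reported per micro-batch, so each progress event carries the aggregates of that batch only.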

The output of observe-batch.scala looks like this

checkpoint1 rowCount: 100
checkpoint1 sum: -5050
checkpoint1 sumAbs: 5050
checkpoint1 crc32: 203474011273
checkpoint2 rowCount: 50
checkpoint2 sum: -2550
checkpoint2 sumAbs: 2550
checkpoint2 crc32: 103330662558

and the physical plan looks like this

CollectMetrics checkpoint2, [count(1) AS rowCount#17L, sum(cast(value#1 as bigint)) AS sum#19L, sum(cast(abs(value#1) as bigint)) AS sumAbs#21L, sum(crc32value#5L) AS crc32#23L]
+- *(1) Filter ((value#1 % 2) = 0)
   +- CollectMetrics checkpoint1, [count(1) AS rowCount#9L, sum(cast(value#1 as bigint)) AS sum#11L, sum(cast(abs(value#1) as bigint)) AS sumAbs#13L, sum(crc32value#5L) AS crc32#15L]
      +- LocalTableScan [value#1, crc32value#5L]

Between checkpoint 1 and checkpoint 2 there is a filter that removes all odd values for demonstration purposes, hence the row counts at checkpoint 2 are halved.
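
To make the numbers easier to follow, here is a plain-Scala re-computation of the same aggregates, without Spark. The inputs are assumptions inferred from the printed counts and sums, not taken from the PR: -1 to -100 for the batch example, and for the streaming example micro-batch n holding (n + 1) * 100 consecutive negated values (0 to -99, then -100 to -299, then -300 to -599). The crc32 column is computed the same way as in the plan (CRC32 of the value's string form), but its totals depend on the exact input, so they are not claimed to match the output above.

```scala
import java.nio.charset.StandardCharsets
import java.util.zip.CRC32

// Plain-Scala counterpart of the four observed aggregates in the plans above:
// count(1), sum(value), sum(abs(value)) and sum(crc32(cast(value as string))).
final case class ControlMetrics(rowCount: Long, sum: Long, sumAbs: Long, crc32: Long)

object ControlMetrics {
  // CRC32 of the value's decimal string representation, e.g. "-42".
  private def crc32Of(value: Int): Long = {
    val crc = new CRC32()
    crc.update(value.toString.getBytes(StandardCharsets.UTF_8))
    crc.getValue
  }

  def of(values: Seq[Int]): ControlMetrics =
    ControlMetrics(
      rowCount = values.size.toLong,
      sum = values.map(_.toLong).sum,
      sumAbs = values.map(v => math.abs(v.toLong)).sum,
      crc32 = values.map(crc32Of).sum
    )
}

object DemoInputs {
  // The filter between the two checkpoints: ((value % 2) = 0) keeps even values.
  def isEven(v: Int): Boolean = v % 2 == 0

  // Assumed batch input: -1, -2, ..., -100.
  val batch: Seq[Int] = (1 to 100).map(-_)

  // Assumed streaming input: micro-batch n holds (n + 1) * 100 consecutive values,
  // negated and continuing where the previous micro-batch stopped.
  def microBatch(n: Int): Seq[Int] = {
    val start = (1 to n).map(_ * 100).sum
    (start until start + (n + 1) * 100).map(-_)
  }
}
```

For example, `ControlMetrics.of(DemoInputs.batch.filter(DemoInputs.isEven))` gives rowCount 50, sum -2550 and sumAbs 2550, the checkpoint2 values of the batch run: the filter keeps exactly half of the rows, and the even values 2, 4, ..., 100 sum to 2550.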

observe is incredibly useful for streaming queries and by far the best way to collect user-defined metrics. In Hyperdrive, checkpoints could be created, e.g., right after the reader and just before the writer to ensure that no transformation step added or removed rows, or after any transformation step in between.
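
A sketch of how such a reader/writer check could look once the metrics of two checkpoints are available. CheckpointMetrics and CheckpointValidation are hypothetical names, not part of Hyperdrive or Atum. The comparison reports every selected metric that differs; restricting it to rowCount is an option for pipelines whose transformers legitimately rewrite values.

```scala
// Hypothetical record of the control metrics observed at one checkpoint.
final case class CheckpointMetrics(name: String, rowCount: Long, sum: Long, sumAbs: Long, crc32: Long)

object CheckpointValidation {
  val AllMetrics: Set[String] = Set("rowCount", "sum", "sumAbs", "crc32")

  // Lists every selected metric whose value differs between the two checkpoints.
  // An empty result means the checkpoints agree. Pass a subset such as Set("rowCount")
  // when the transformations in between are allowed to change the values.
  def compare(before: CheckpointMetrics, after: CheckpointMetrics,
              metrics: Set[String] = AllMetrics): Seq[String] = {
    val values = Seq(
      ("rowCount", before.rowCount, after.rowCount),
      ("sum", before.sum, after.sum),
      ("sumAbs", before.sumAbs, after.sumAbs),
      ("crc32", before.crc32, after.crc32)
    )
    values.collect {
      case (metric, b, a) if metrics.contains(metric) && b != a =>
        s"$metric differs: ${before.name}=$b, ${after.name}=$a"
    }
  }
}
```

Fed with the batch output above, comparing checkpoint1 and checkpoint2 on rowCount alone reports `rowCount differs: checkpoint1=100, checkpoint2=50`, which is the expected result of the demo filter. In a real pipeline, a reader checkpoint and a writer checkpoint would be expected to produce an empty list.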

Batch queries can also benefit from observe, because it is probably a much more lightweight way to count a dataset: the metrics are aggregated while the query itself runs, so no separate plan execution is needed just for the count. Thanks to that, caching would probably not be required either. However, observe does not support distinct aggregates. This is the only reason why I wouldn't suggest attempting to rewrite Atum for Spark 3 on the basis of Dataset.observe.
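
The distinct-aggregate limitation can be checked directly. The sketch below (hypothetical names, Spark 3.0+ assumed) tries to observe a plain count and a distinct count. Since the Dataset returned by observe is analyzed eagerly, the distinct metric is expected to be rejected with an AnalysisException at the observe call itself, before any action runs.

```scala
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{count, countDistinct, lit}

object ObserveDistinctSketch {
  // Tries to attach an observed metric to `df`; returns the analysis error, if any.
  def tryObserve(df: DataFrame, metric: Column): Option[AnalysisException] =
    try {
      df.observe("checkpoint", metric)
      None
    } catch {
      case e: AnalysisException => Some(e)
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("observe-distinct").getOrCreate()
    import spark.implicits._
    val df = (1 to 100).toDF("value")

    // A plain count is accepted as an observed metric ...
    println(tryObserve(df, count(lit(1)).as("rowCount")))
    // ... whereas a distinct count is expected to fail analysis.
    println(tryObserve(df, countDistinct($"value").as("distinctCount")).map(_.getMessage))
    spark.stop()
  }
}
```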

Going forward, control metrics aggregation (without distinct counts) could certainly be added to Hyperdrive as a transformer. However, it might be worth considering whether such a framework should be independent of Hyperdrive, or integrated into Atum.