delta-io/connectors

Add versions lag metric to DeltaSource

kwma3 opened this issue · 2 comments

kwma3 commented

We want to be able to monitor the DeltaSource versions lag in a Flink job. One way to achieve this is for the DeltaSource to publish a metric with the number of table versions it is behind the latest version. This will help with monitoring jobs that are experiencing back-pressure and cannot keep up with the Delta table publisher.

For reference, the FlinkKafkaConsumer publishes a records-lag-max metric for monitoring consumer lag.
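As a rough illustration of the proposed value, here is a minimal sketch assuming the lag is defined as "latest known table version minus the last version the source has processed", clamped at zero. `VersionsLagTracker`, its methods, and the metric name `versionsLag` are hypothetical, not part of the connector. The value is exposed as a plain `LongSupplier` to keep the example dependency-free; in a Flink job it could be wrapped in a Flink `Gauge` and registered through `MetricGroup.gauge(...)`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper: tracks how many Delta table versions the source is behind.
final class VersionsLagTracker {
    // Last table version whose changes the source has fully processed.
    private final AtomicLong currentVersion;
    // Latest table version observed on the Delta log.
    private final AtomicLong latestVersion;

    VersionsLagTracker(long startingVersion) {
        this.currentVersion = new AtomicLong(startingVersion);
        this.latestVersion = new AtomicLong(startingVersion);
    }

    void onVersionProcessed(long version) {
        // Versions only move forward; ignore out-of-order updates.
        currentVersion.accumulateAndGet(version, Math::max);
    }

    void onLatestVersionDiscovered(long version) {
        // Ignore stale observations of the table head.
        latestVersion.accumulateAndGet(version, Math::max);
    }

    // Number of versions behind the latest; 0 when caught up.
    long versionsLag() {
        return Math.max(0L, latestVersion.get() - currentVersion.get());
    }

    // The shape a metrics gauge would wrap.
    LongSupplier asGaugeValue() {
        return this::versionsLag;
    }
}

class VersionsLagDemo {
    public static void main(String[] args) {
        VersionsLagTracker tracker = new VersionsLagTracker(5L);
        tracker.onLatestVersionDiscovered(12L);
        System.out.println(tracker.versionsLag()); // 7
        tracker.onVersionProcessed(10L);
        System.out.println(tracker.versionsLag()); // 2
        tracker.onVersionProcessed(12L);
        System.out.println(tracker.asGaugeValue().getAsLong()); // 0
    }
}
```

Clamping at zero avoids reporting a negative lag if the processed version is briefly observed ahead of a not-yet-refreshed latest version.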

I really like this proposal.

A few notes for whoever implements this in the future:
This metric applies only to the Delta Source running in Continuous mode, where time traveling back to version X makes the source read all changes from version X up to the current head version and then keep monitoring the table for new changes.
In Bounded mode, the Delta Source reads only the snapshot for the given version when time traveling, so there is no need for such a metric there.

Given that, the implementation must not leak any details specific to this metric into the Bounded mode path of the Delta Source. In other words, please keep the implementation details abstracted away and expose them only in the Continuous mode code path.
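One way to keep the Bounded path unaware of the metric is sketched below, assuming the shared processor contract carries no lag-related methods and the gauge is registered only when a continuous processor is created. All class names and the metric name `versionsLag` are hypothetical; `SketchMetricGroup` merely stands in for a Flink `MetricGroup`, and the volatile fields assume a single thread updates the versions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a metrics group (a Flink MetricGroup in a real job).
final class SketchMetricGroup {
    private final Map<String, LongSupplier> gauges = new HashMap<>();

    void gauge(String name, LongSupplier value) {
        gauges.put(name, value);
    }

    boolean contains(String name) {
        return gauges.containsKey(name);
    }

    long read(String name) {
        return gauges.get(name).getAsLong();
    }
}

// Shared contract for both modes: deliberately has no lag-related methods.
interface SketchProcessor {
    void process();
}

// Bounded mode: reads a single snapshot and never receives the metrics group.
final class SketchBoundedProcessor implements SketchProcessor {
    @Override
    public void process() {
        // Read the snapshot for the requested version (omitted).
    }
}

// Continuous mode: the only class that knows how the versions lag is computed.
final class SketchContinuousProcessor implements SketchProcessor {
    private volatile long processedVersion;
    private volatile long latestVersion;

    private SketchContinuousProcessor(long startingVersion) {
        this.processedVersion = startingVersion;
        this.latestVersion = startingVersion;
    }

    // Factory keeps metric registration inside the continuous branch.
    static SketchContinuousProcessor create(long startingVersion, SketchMetricGroup metrics) {
        SketchContinuousProcessor processor = new SketchContinuousProcessor(startingVersion);
        metrics.gauge("versionsLag", processor::versionsLag);
        return processor;
    }

    void onLatestVersion(long version) {
        latestVersion = Math.max(latestVersion, version);
    }

    void onVersionProcessed(long version) {
        processedVersion = Math.max(processedVersion, version);
    }

    long versionsLag() {
        return Math.max(0L, latestVersion - processedVersion);
    }

    @Override
    public void process() {
        // Read changes from processedVersion up to latestVersion (omitted).
    }
}

class ModeSeparationDemo {
    public static void main(String[] args) {
        SketchMetricGroup metrics = new SketchMetricGroup();
        SketchProcessor bounded = new SketchBoundedProcessor();
        bounded.process();
        System.out.println(metrics.contains("versionsLag")); // false
        SketchContinuousProcessor continuous = SketchContinuousProcessor.create(10L, metrics);
        continuous.onLatestVersion(14L);
        System.out.println(metrics.read("versionsLag")); // 4
    }
}
```

Because only the continuous factory takes the metrics group, the Bounded code path cannot accidentally depend on anything lag-specific.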

Second, it is crucial to reuse the existing DeltaLog instance that is created in the ContinuousTableProcessor interface implementations instead of creating a new one. We have noticed that computing a DeltaLog for a large table can currently be a time- and CPU-intensive operation.

The raw metric value could be calculated by the ContinuousTableProcessor implementations, possibly delegating to TableMonitor.
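The two notes above (reuse the existing DeltaLog, delegate the calculation to a monitor) could fit together roughly as follows. This is a minimal sketch under stated assumptions: `SharedDeltaLog`, `LagTableMonitor`, and `LagAwareContinuousProcessor` are hypothetical stand-ins, not the connector's real DeltaLog, TableMonitor, or ContinuousTableProcessor, and the "latest version" is a plain field rather than something read from a real Delta log.

```java
// Hypothetical stand-in for an already-built (and expensive) DeltaLog.
final class SharedDeltaLog {
    private volatile long headVersion;

    SharedDeltaLog(long headVersion) {
        this.headVersion = headVersion;
    }

    long latestVersion() {
        return headVersion;
    }

    // Simulates a writer committing a new table version.
    void simulateCommit() {
        headVersion++;
    }
}

// Hypothetical monitor: polls the log it is given and caches the result,
// so reading the metric never touches the log itself.
final class LagTableMonitor {
    private final SharedDeltaLog deltaLog;
    private volatile long lastSeenVersion;

    LagTableMonitor(SharedDeltaLog deltaLog) {
        this.deltaLog = deltaLog;
        this.lastSeenVersion = deltaLog.latestVersion();
    }

    void poll() {
        lastSeenVersion = deltaLog.latestVersion();
    }

    long versionsBehind(long processedVersion) {
        return Math.max(0L, lastSeenVersion - processedVersion);
    }

    SharedDeltaLog deltaLog() {
        return deltaLog;
    }
}

// Hypothetical continuous processor: hands its existing log to the monitor
// and delegates the raw lag value to it.
final class LagAwareContinuousProcessor {
    private final LagTableMonitor monitor;
    private volatile long processedVersion;

    LagAwareContinuousProcessor(SharedDeltaLog existingLog, long startingVersion) {
        this.monitor = new LagTableMonitor(existingLog); // reuses the log; builds no new one
        this.processedVersion = startingVersion;
    }

    void pollTable() {
        monitor.poll();
    }

    void markProcessed(long version) {
        processedVersion = version;
    }

    long versionsLag() {
        return monitor.versionsBehind(processedVersion);
    }

    SharedDeltaLog deltaLog() {
        return monitor.deltaLog();
    }
}

class SharedLogDemo {
    public static void main(String[] args) {
        SharedDeltaLog log = new SharedDeltaLog(3L);
        LagAwareContinuousProcessor processor = new LagAwareContinuousProcessor(log, 3L);
        log.simulateCommit();
        log.simulateCommit();
        System.out.println(processor.versionsLag()); // 0 (commits not polled yet)
        processor.pollTable();
        System.out.println(processor.versionsLag()); // 2
        processor.markProcessed(5L);
        System.out.println(processor.versionsLag()); // 0
        System.out.println(processor.deltaLog() == log); // true
    }
}
```

Caching the last polled version in the monitor is a deliberate choice here: metric reporters may read the value frequently, and serving it from the cache keeps those reads from touching the DeltaLog at all.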

This repo has been deprecated and the code is moved under connectors module in https://github.com/delta-io/delta repository. Please create the issue in repository https://github.com/delta-io/delta. See #556 for details.