delta-io/connectors

[Flink-connector] - Use InitContext#getRestoredCheckpointId for checkpointId recovery

kristoffSC opened this issue · 0 comments

Currently, DeltaWriter uses the checkpoint ID stored in DeltaWriterBucketState.
This might not work well when scaling the cluster up, because a task could start with checkpointId = 0 even though it is recovering from a checkpoint. Scaling a Flink job up or down requires a job restart, which triggers recovery from the last checkpoint.

In this case, if the upscale event creates a new DeltaWriter and there is no state for that task to recover from, the current implementation assigns checkpointId = 0 to the new DeltaWriter, which may not be the correct value.

This has no impact on data correctness, but it might produce confusing application logs, because Committable objects created by the new writers will carry the wrong checkpointId value, and that value appears in log messages.

Instead of keeping checkpointId in the bucket state, the checkpointId should be recovered/acquired from Flink's InitContext#getRestoredCheckpointId.
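
A minimal sketch of the idea, not the connector's actual code. It assumes that getRestoredCheckpointId returns a java.util.OptionalLong that is empty when the job is not recovering from a checkpoint. The class CheckpointIdRecovery, the method resolveCheckpointId and the constant NO_CHECKPOINT_ID are hypothetical names used only for illustration; the fallback of 0 mirrors the value mentioned above and is also an assumption.

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration; not part of the connector. */
final class CheckpointIdRecovery {

    // Assumed fallback for a job that starts without any restored checkpoint.
    static final long NO_CHECKPOINT_ID = 0L;

    private CheckpointIdRecovery() {}

    /**
     * Picks the checkpoint ID a newly created writer should start from.
     * The value comes from the restore context rather than from per-bucket
     * state, so a writer that receives no bucket state after an upscale
     * still sees the ID of the checkpoint the job was restored from.
     */
    static long resolveCheckpointId(OptionalLong restoredCheckpointId) {
        return restoredCheckpointId.orElse(NO_CHECKPOINT_ID);
    }
}
```

When the sink creates its writer, the argument would be the result of calling getRestoredCheckpointId() on the InitContext, so every writer, with or without bucket state, resolves the same ID.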