FLIP-27 new Source API integration
Problem description
Flink 1.11 has completed FLIP-27, which introduces a new source API that is described in the official documentation.
This new API is currently marked Beta, but we can start this work now to keep up with the Flink community. The new API gives connectors more control over the reader side.
To preserve compatibility, we will keep the old `FlinkPravegaReader`.
Problem location
Source
Suggestions for an improvement
The design document for the FLIP-27 integration is here: https://github.com/pravega/flink-connectors/wiki/FLIP-27-integration-(work-in-progress). We can start the discussion here.
@becketqin @fpj We developed some sample code following the design doc above: https://github.com/crazyzhou/flink-connectors/tree/flip-27/src/main/java/io/pravega/connectors/flink/source. Reading without failures runs fine, but we found some big gaps in the recovery story. We need some advice from the Flink community, so here is an explanation.
- A Pravega reader group synchronizes the read progress of all its readers internally, in the Pravega controller. It relies on its own Pravega checkpoint to obtain a synchronized, coordinated global position for the reader group. The position of an individual reader may change, and thus be inconsistent, while segments are rebalanced within the group.
- The legacy source of the Pravega Flink connector uses `MasterTriggerRestoreHook` to align the Pravega checkpoint with the Flink checkpoint for recovery.
- Flink's `SourceReader` API relies on a physical view of all splits in the snapshot for recovery. If we follow the abstraction in the design doc, our split is not "physical" but a logical reader, so the list of `Position`s is not consistent for recovery. We tried to rewind the Pravega reader to the position stored in the latest Flink checkpoint by taking the reader offline and recreating it in the `handleSplitsChange` method of `SplitReader` (https://github.com/crazyzhou/flink-connectors/blob/3c4337afcb02b6c9c829656757777ad836bc497f/src/main/java/io/pravega/connectors/flink/source/PravegaSplitReader.java#L99), but our tests showed that the result is inconsistent.
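To illustrate why per-reader positions collected without coordination are not a safe recovery point, here is a toy model in plain Java. It does not use the Pravega client; `ToyReader`, the segment names, and the offsets are all hypothetical. It only shows that if a segment moves between readers while positions are being captured one reader at a time, the combined snapshot can miss that segment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy model: a reader's "position" is the map of the segments it owns to their offsets. */
class PositionSnapshotDemo {

    /** Hypothetical stand-in for one logical reader in a reader group. */
    static final class ToyReader {
        final Map<String, Long> owned = new HashMap<>();

        Map<String, Long> position() {
            return new HashMap<>(owned); // copy of what the reader owns right now
        }
    }

    /** Captures the readers one at a time while a segment moves from b to a in between. */
    static Map<String, Long> uncoordinatedSnapshot() {
        ToyReader a = new ToyReader();
        ToyReader b = new ToyReader();
        a.owned.put("segment-0", 100L);
        b.owned.put("segment-1", 250L);

        // Reader a is captured first.
        Map<String, Long> snapshot = new TreeMap<>(a.position());

        // Rebalance between the two captures: segment-1 moves from b to a.
        a.owned.put("segment-1", b.owned.remove("segment-1"));

        // Reader b is captured second and no longer owns anything.
        snapshot.putAll(b.position());
        return snapshot;
    }

    public static void main(String[] args) {
        // segment-1 is absent, so the group cannot be resumed from this snapshot.
        System.out.println(uncoordinatedSnapshot());
    }
}
```

A coordinated checkpoint avoids this by fixing one point in the stream that all readers agree on before any position is recorded.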
Considering this gap, here are the short-term and long-term plans for the next step.
- Short-term (ST) plan: stick to the current design. Is there a way in the new source API to initiate the checkpoint as we do in `MasterTriggerRestoreHook`, so that the recovery story is complete?
- Long-term (LT) plan: a new design that maps each segment to a split, which is closer to Flink's abstraction. We have a similar implementation in our batch source API, `FlinkPravegaInputFormat`, but extending it to the streaming case requires a new Pravega client that supports a segment-level reader with an ordering guarantee. Also, to support auto-scaling with the new abstraction, we noticed that `SplitsAddition` is currently the only kind of `SplitsChange`. Will more classes be defined for this?
Meeting minutes for the recent discussion with Becket:
For our legacy reader, we use the `ExternallyInducedSource` interface to hold back the Flink checkpoint barrier and notify Pravega to initiate a Pravega checkpoint through a `ReaderCheckpointHook`. This hook returns a `CompletableFuture<Checkpoint>` to the Flink job master, and Flink's `CheckpointCoordinator` handles both the barrier and the completion of the future, which completes our exactly-once reader story. Note that this is a special interface: most other connectors do not take this path because they do not drive the checkpoint proactively.
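The shape of that interaction can be sketched with plain `java.util.concurrent`, without Flink or Pravega on the classpath. `ToyMasterHook` and `ToyReaderGroup` below are hypothetical stand-ins, not the real interfaces. The only point is that the coordinator side does not treat the checkpoint as finished until the future returned by the hook completes.

```java
import java.util.concurrent.CompletableFuture;

class InducedCheckpointDemo {

    /** Hypothetical, simplified stand-in for a master hook; not Flink's real interface. */
    interface ToyMasterHook<T> {
        CompletableFuture<T> triggerCheckpoint(long checkpointId);
    }

    /** Hypothetical stand-in for the external system that owns the checkpoint. */
    static final class ToyReaderGroup {
        private CompletableFuture<String> pending;
        private String pendingName;

        CompletableFuture<String> initiateCheckpoint(String name) {
            pendingName = name;
            pending = new CompletableFuture<>();
            return pending;
        }

        /** Called once every reader has reached the checkpoint marker. */
        void allReadersReachedMarker() {
            pending.complete(pendingName);
        }
    }

    static String run() {
        ToyReaderGroup group = new ToyReaderGroup();
        ToyMasterHook<String> hook = id -> group.initiateCheckpoint("chk-" + id);

        StringBuilder log = new StringBuilder();
        CompletableFuture<String> external = hook.triggerCheckpoint(7L);

        // Coordinator side: finish the checkpoint only after the external one is done.
        CompletableFuture<Void> done =
                external.thenAccept(name -> log.append("completed:").append(name));

        log.append("pending=").append(!done.isDone()).append(';');
        group.allReadersReachedMarker();
        done.join();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```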
The new source API assumes that all reader progress can be expressed as a set of offsets, and it does not provide an `ExternallyInduced`-style interface for connectors like ours to integrate with. Although the enumerator offers a `snapshot()` function that lets the connector initiate the checkpoint, the API assumes this can be asynchronous and simply returns the checkpoint object. As a result, by the time the connector initiates the Pravega checkpoint, the Flink checkpoint barrier may already have passed the Pravega checkpoint location, so we receive duplicates on failure recovery, as seen in my test.
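The duplicate effect can be reproduced with a small arithmetic model. This is a sketch under simplifying assumptions, not the connector's logic: events are numbered 1 to `total`, `barrierPos` is the last event reflected in the Flink checkpoint, and `readerPos` is the last event covered by the position the reader is restored to. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class BarrierRaceDemo {

    /**
     * Returns the events that downstream operators see twice after a restore.
     *
     * @param total      number of events in the stream (events are 1..total)
     * @param barrierPos last event already reflected in the checkpointed state
     * @param readerPos  last event covered by the restored reader position
     */
    static List<Integer> duplicatesAfterRecovery(int total, int barrierPos, int readerPos) {
        int[] seen = new int[total + 1];

        // Before the failure: everything up to the barrier is in the checkpointed state.
        for (int e = 1; e <= barrierPos; e++) {
            seen[e]++;
        }
        // After the restore: the reader resumes right after its restored position.
        for (int e = readerPos + 1; e <= total; e++) {
            seen[e]++;
        }

        List<Integer> duplicates = new ArrayList<>();
        for (int e = 1; e <= total; e++) {
            if (seen[e] > 1) {
                duplicates.add(e);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Aligned: barrier and reader position agree, so nothing is replayed.
        System.out.println(duplicatesAfterRecovery(10, 6, 6));
        // Misaligned: the barrier is ahead of the reader position, so events 5 and 6 repeat.
        System.out.println(duplicatesAfterRecovery(10, 6, 4));
    }
}
```

In this model the legacy hook corresponds to forcing `barrierPos == readerPos`, which is what an externally induced checkpoint is meant to guarantee.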
Becket is now discussing our use case and requirements with the community. They will start working on a fix before the Flink 1.12 code freeze.
Becket helped create https://issues.apache.org/jira/browse/FLINK-20270 to fix this.
The new `ExternallyInducedSourceReader` requires keeping track of the checkpoint ID in the enumerator, but unlike the legacy source, it does not provide a way for the implementation to indicate which checkpoint ID the current snapshot belongs to.
https://issues.apache.org/jira/browse/FLINK-22133 was created to fix this.
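One way to see why the ID matters is the sketch below, written under assumptions. If the enumerator side is told the checkpoint ID when it snapshots, it can encode the ID in the name of the external checkpoint, and the reader side can recover the ID when it meets the corresponding marker in the stream. The `toy-chk-` naming scheme and both helper methods are hypothetical and are not the connector's or Flink's actual API.

```java
import java.util.Optional;

class CheckpointIdDemo {

    /** Hypothetical naming scheme used only for this illustration. */
    static final String PREFIX = "toy-chk-";

    /** Enumerator side: only possible if the snapshot call is told the checkpoint ID. */
    static String checkpointName(long checkpointId) {
        return PREFIX + checkpointId;
    }

    /** Reader side: recover the checkpoint ID from a marker name, if it is one of ours. */
    static Optional<Long> checkpointIdOf(String markerName) {
        if (markerName == null || !markerName.startsWith(PREFIX)) {
            return Optional.empty();
        }
        try {
            return Optional.of(Long.parseLong(markerName.substring(PREFIX.length())));
        } catch (NumberFormatException e) {
            return Optional.empty(); // prefix matched but the suffix is not a number
        }
    }

    public static void main(String[] args) {
        String name = checkpointName(42L);
        System.out.println(name + " -> " + checkpointIdOf(name));
        System.out.println("other-marker -> " + checkpointIdOf("other-marker"));
    }
}
```

Without the ID on the enumerator side, the first helper has nothing to encode, which is the gap described above.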