pravega/flink-connectors

FLIP-27 new Source API integration

Problem description
Flink 1.11 has implemented FLIP-27, which introduces a new source API with some explanation in the official documentation.
The new API is still marked Beta, but we can start this work now in order to catch up with the Flink community. The new API gives the connector more control over the reader side.
To keep compatibility, we will keep the old FlinkPravegaReader.

Problem location
Source

Suggestions for an improvement

Here is the design document for the FLIP-27 integration: https://github.com/pravega/flink-connectors/wiki/FLIP-27-integration-(work-in-progress); we can start some discussions there.

@becketqin @fpj After developing some sample code based on the design doc above (https://github.com/crazyzhou/flink-connectors/tree/flip-27/src/main/java/io/pravega/connectors/flink/source), the normal (non-failure) read path runs fine, but we found some big gaps in the recovery story. We would like advice from the Flink community; here is an explanation.

  1. The Pravega reader group synchronizes the read progress of all readers internally in the Pravega controller. It relies on its own Pravega checkpoint for a synchronized, coordinated global position of the reader group. A reader's position may change, and thus become inconsistent, during rebalancing within the group.
  2. The legacy source of the Pravega Flink connector uses MasterTriggerRestoreHook to align Pravega checkpoints and Flink checkpoints for recovery.
  3. Flink's SourceReader API relies on a physical view of all splits in the snapshot for its recovery story, but if we follow the abstraction in our design, where a split is not "physical" but a logical reader, the list of Positions is not consistent for recovery. We plan to rewind the Pravega reader to the position stored in the latest Flink checkpoint by "offline and recreate" in the handleSplitsChanges method of the SplitReader (https://github.com/crazyzhou/flink-connectors/blob/3c4337afcb02b6c9c829656757777ad836bc497f/src/main/java/io/pravega/connectors/flink/source/PravegaSplitReader.java#L99); a rough sketch of this approach follows the list. However, our tests show that it is still inconsistent.
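
For reference, a rough sketch of the "offline and recreate" rewind described in point 3. This is not the actual PravegaSplitReader code linked above; the helper class, method names, and the way the recovered Pravega checkpoint reaches the reader are assumptions for illustration only:

```java
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Serializer;

/**
 * Hypothetical helper illustrating the "offline and recreate" rewind; the real
 * PravegaSplitReader differs in structure and error handling.
 */
public class RewindReaderSketch<T> {

    private final EventStreamClientFactory clientFactory;
    private final ReaderGroupManager readerGroupManager;
    private final Serializer<T> serializer;
    private EventStreamReader<T> pravegaReader;

    public RewindReaderSketch(EventStreamClientFactory clientFactory,
                              ReaderGroupManager readerGroupManager,
                              Serializer<T> serializer) {
        this.clientFactory = clientFactory;
        this.readerGroupManager = readerGroupManager;
        this.serializer = serializer;
    }

    /**
     * Called when the split (i.e. the logical Pravega reader) is re-assigned after a
     * failure, with the Pravega checkpoint recovered from the last Flink checkpoint.
     */
    public void rewindTo(String readerGroupName, String readerId, Checkpoint lastFlinkCheckpoint) {
        // 1. Take the current reader offline so the reader group can rebalance its segments.
        if (pravegaReader != null) {
            pravegaReader.close();
        }

        // 2. Reset the reader group to the Pravega checkpoint stored in the Flink checkpoint.
        ReaderGroup readerGroup = readerGroupManager.getReaderGroup(readerGroupName);
        readerGroup.resetReaderGroup(ReaderGroupConfig.builder()
                .startFromCheckpoint(lastFlinkCheckpoint)
                .build());

        // 3. Recreate the reader; it resumes from the reset position.
        pravegaReader = clientFactory.createReader(
                readerId, readerGroupName, serializer, ReaderConfig.builder().build());
    }
}
```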

Considering this gap, here are the short-term and long-term plans for the next steps.

  • ST plan: Stick to the current design. Can we then find a way in the new source API to initiate the checkpoint the way we do today in MasterTriggerRestoreHook, so as to fulfill the recovery story?
  • LT plan: A new design that maps a segment to a split, which is closer to Flink's abstraction (see the sketch below). We have a similar implementation in our batch source API, FlinkPravegaInputFormat, but to extend it to the streaming case we need a new Pravega client that supports a segment-level reader with an ordering guarantee. Also, to support auto-scaling with the new abstraction: we noticed that SplitsAddition is currently the only implementation of SplitsChange; will more classes be defined for this?
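
To make the LT idea more concrete, a hypothetical segment-level split might look roughly like the following. The class name and fields are assumptions, not existing connector code; the point is that each split carries a plain offset that Flink can snapshot and restore on its own:

```java
import org.apache.flink.api.connector.source.SourceSplit;

/**
 * Hypothetical split for the long-term design: one split per Pravega segment,
 * checkpointed by a plain byte offset so that Flink can snapshot and restore it
 * without a coordinated Pravega checkpoint.
 */
public class PravegaSegmentSplit implements SourceSplit {

    private final String scope;
    private final String stream;
    private final long segmentId;
    // Read offset inside the segment; advanced as records are emitted and stored in snapshots.
    private final long startingOffset;

    public PravegaSegmentSplit(String scope, String stream, long segmentId, long startingOffset) {
        this.scope = scope;
        this.stream = stream;
        this.segmentId = segmentId;
        this.startingOffset = startingOffset;
    }

    @Override
    public String splitId() {
        return scope + "/" + stream + "/" + segmentId;
    }

    public long getStartingOffset() {
        return startingOffset;
    }
}
```

With splits like this, Pravega auto-scaling (segment splits and merges) would surface in the enumerator as newly discovered segments assigned through SplitsAddition, which is why additional SplitsChange subclasses (for example, to retire finished segments) would be helpful.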

Meeting minutes for the recent discussion with Becket:

For our legacy reader, we use the ExternallyInducedSource interface to hold back the Flink checkpoint barrier and notify Pravega to initiate a Pravega checkpoint through a ReaderCheckpointHook. The hook returns a CompletableFuture<Checkpoint> to the Flink job master, and Flink's CheckpointCoordinator handles the barrier and the completion of that future, which completes our exactly-once reader story. Note that this is a special interface; most other connectors do not take this path for checkpointing because they do not handle checkpoints proactively.
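
For readers unfamiliar with that path, here is a simplified sketch of how the legacy hook works; the actual ReaderCheckpointHook in the connector also handles serialization, timeouts, and restore logic omitted here:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;

import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.ReaderGroup;

/**
 * Simplified sketch of the legacy hook: on each Flink checkpoint the job master asks
 * Pravega to initiate a reader-group checkpoint and hands the resulting future back to
 * the CheckpointCoordinator, which completes the checkpoint once the future completes.
 */
public class ReaderCheckpointHookSketch implements MasterTriggerRestoreHook<Checkpoint> {

    private final String hookUid;
    private final ReaderGroup readerGroup;
    private final ScheduledExecutorService scheduledExecutor =
            Executors.newSingleThreadScheduledExecutor();

    public ReaderCheckpointHookSketch(String hookUid, ReaderGroup readerGroup) {
        this.hookUid = hookUid;
        this.readerGroup = readerGroup;
    }

    @Override
    public String getIdentifier() {
        return hookUid;
    }

    @Override
    public CompletableFuture<Checkpoint> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
        // Ask Pravega to start a coordinated reader-group checkpoint; the returned future
        // completes once all readers have passed the checkpoint marker.
        String checkpointName = "flink-checkpoint-" + checkpointId;
        return readerGroup.initiateCheckpoint(checkpointName, scheduledExecutor);
    }

    @Override
    public void restoreCheckpoint(long checkpointId, Checkpoint checkpoint) {
        // On restore, the connector resets the reader group to this Pravega checkpoint
        // (omitted in this sketch).
    }

    @Override
    public SimpleVersionedSerializer<Checkpoint> createCheckpointDataSerializer() {
        // The real hook provides a serializer for the Pravega Checkpoint; omitted here.
        return null;
    }
}
```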

The new source API assumes that all reader progress can be expressed as a set of offsets, and it does not provide an ExternallyInduced-style interface for connectors like ours to integrate with. Although the enumerator offers a snapshotState() function that lets the connector initiate the checkpoint, it assumes this can be an asynchronous operation that simply returns the checkpoint object. As a result, when the connector tries to initiate the Pravega checkpoint, the Flink checkpoint barrier may have already passed the Pravega checkpoint location, so we receive duplicates during fault recovery, as seen in my test.
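
A sketch of the problematic shape under the current API (not implementing the full SplitEnumerator interface; the class and method names are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.ReaderGroup;

/**
 * Illustrative only: shows why initiating the Pravega checkpoint from the enumerator's
 * snapshot callback is not enough. The callback can only kick off the checkpoint and
 * return; it has no way to hold back the Flink barrier until the Pravega checkpoint
 * marker has actually reached the readers.
 */
public class EnumeratorCheckpointRaceSketch {

    private final ReaderGroup readerGroup;
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    public EnumeratorCheckpointRaceSketch(ReaderGroup readerGroup) {
        this.readerGroup = readerGroup;
    }

    /** Modeled loosely after the enumerator's snapshot callback. */
    public CompletableFuture<Checkpoint> snapshotState(long checkpointId) {
        // The Pravega checkpoint is initiated asynchronously here, but by the time its
        // marker travels through the reader group, the Flink checkpoint barrier may have
        // already passed the corresponding stream position. The snapshotted reader
        // positions and the Pravega checkpoint then disagree, which shows up as
        // duplicates after recovery.
        return readerGroup.initiateCheckpoint("flink-chk-" + checkpointId, executor);
    }
}
```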

Becket is now talking to the community to express our use case and requirements; they will start fixing it before the Flink 1.12 code freeze.

Becket helped create https://issues.apache.org/jira/browse/FLINK-20270 to fix this.

The new ExternallyInducedSourceReader requires keeping track of the checkpoint ID in the enumerator, but unlike the legacy source, it does not provide a method for the implementation to indicate which checkpoint ID the current snapshot corresponds to.
https://issues.apache.org/jira/browse/FLINK-22133 is created to fix this.
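
A minimal sketch of the resulting pattern, with simplified names rather than the full SourceReader/ExternallyInducedSourceReader signatures:

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
 * Fragment sketching the checkpoint-id bookkeeping gap (simplified names, not the
 * actual Flink interfaces).
 */
public class CheckpointIdTrackingSketch {

    // Checkpoint id most recently announced to this reader, not yet triggered.
    private volatile Long pendingCheckpointId;

    /** Called when the enumerator tells this reader to induce a checkpoint. */
    public void onCheckpointEvent(long checkpointId) {
        this.pendingCheckpointId = checkpointId;
    }

    /** Modeled after ExternallyInducedSourceReader#shouldTriggerCheckpoint. */
    public Optional<Long> shouldTriggerCheckpoint() {
        Long id = pendingCheckpointId;
        pendingCheckpointId = null;
        return Optional.ofNullable(id);
    }

    /** Modeled after the reader's snapshot callback: no checkpoint id is passed in. */
    public List<String> snapshotState() {
        // Without the checkpoint id here, the reader cannot tie this snapshot to the
        // Pravega checkpoint it induced for that id -- the gap FLINK-22133 addresses.
        return Collections.emptyList();
    }
}
```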