delta-io/connectors

Create De/Serializers for DeltaLog instance for Flink connector

kristoffSC opened this issue · 6 comments

Currently, the DeltaLog instance for the Flink DeltaSource connector has to be created twice:

  1. When submitting the Flink job, where the DeltaLog is created by the SourceBuilder to discover the table schema.
  2. When the SplitEnumerator is created (for example, to read the Snapshot or fetch changes).

Creating a DeltaLog/Snapshot instance can be a CPU- and memory-intensive operation, and its cost heavily depends on the table size.
Ideally, a Flink job using the Delta Source would reuse the DeltaLog instance created by the source builder and pass it to the SplitEnumerator. Unfortunately, DeltaLog is not a serializable object, so adding a non-transient DeltaLog field to DeltaSourceInternal results in an exception (not a serializable object).

With a dedicated De/Serializer we could serialize the DeltaLog to an array of bytes, pass it as a field of DeltaSourceInternal, and deserialize it back into a DeltaLog instance in the DeltaSourceInternal::createEnumerator method. This would let us reuse the DeltaLog instance instead of computing it again.
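To make the proposal concrete, here is a minimal sketch of the shape such a De/Serializer could take. `FakeDeltaLog`, `DeltaLogSerde`, and the field names are all hypothetical stand-ins; a real DeltaLog carries far more (and unbounded) state than the two identifying fields serialized here, which is exactly where the debate below starts.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Toy stand-in for DeltaLog; the real io.delta.standalone.DeltaLog is NOT
// java.io.Serializable, which is the problem this issue describes.
class FakeDeltaLog {
    final String tablePath;
    final long snapshotVersion;

    FakeDeltaLog(String tablePath, long snapshotVersion) {
        this.tablePath = tablePath;
        this.snapshotVersion = snapshotVersion;
    }
}

// Hypothetical De/Serializer: turns the log into bytes at build time so it can
// ride along as a byte[] field of the source, and restores it in createEnumerator.
final class DeltaLogSerde {
    static byte[] serialize(FakeDeltaLog log) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(log.tablePath);
            out.writeLong(log.snapshotVersion);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static FakeDeltaLog deserialize(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new FakeDeltaLog(in.readUTF(), in.readLong());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

public class SerdeSketch {
    public static void main(String[] args) {
        FakeDeltaLog original = new FakeDeltaLog("s3://bucket/table", 42L);
        byte[] payload = DeltaLogSerde.serialize(original);         // at job submission
        FakeDeltaLog restored = DeltaLogSerde.deserialize(payload); // in createEnumerator
        System.out.println(restored.tablePath + "@" + restored.snapshotVersion);
        // prints "s3://bucket/table@42"
    }
}
```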

tdas commented

I don't think this is a good idea. It is generally bad architecture to try to serialize something with an unbounded amount and unbounded variety of data. Sometimes we have to do it when there is no other way, but in those cases the rest of the infrastructure is designed around a scalable, unbounded serialized stream (like serializing stream data for network transfer). I don't think the Flink job metadata and checkpointing infrastructure (where the serialized DeltaLog would be put) is designed to scale that way.

So yes, it's not great that we have to create the DeltaLog again and incur the cost of going through the snapshot multiple times. But serializing is not the right solution. The right solution is to understand why the cost of doing the two different operations in 1 and 2 is high. For example, in 1, all we need is the table schema, right? We can make that much faster than it is right now. Finding the schema should be close to an O(1) operation if done correctly, by reading only the schema column from the checkpoint parquet files and not going through all the AddFile data in the snapshot. This is more work and heavy optimization in Delta Standalone, but that is the right architectural solution.
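The intuition can be illustrated with a toy model. The `Action`, `Metadata`, and `AddFile` classes below are simplified stand-ins for the real Delta log actions (the actual optimization would project only the metaData column out of the checkpoint parquet files); the point is just that schema discovery can stop as soon as the single Metadata action is found, without touching the table-size-dependent AddFile entries.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a Delta checkpoint: one Metadata action among many AddFile actions.
interface Action {}

final class Metadata implements Action {
    final String schemaJson;
    Metadata(String schemaJson) { this.schemaJson = schemaJson; }
}

final class AddFile implements Action {
    final String path;
    AddFile(String path) { this.path = path; }
}

public class SchemaLookup {
    // Schema discovery only needs the Metadata action, so it can return early
    // instead of materializing or scanning every AddFile in the snapshot.
    static String findSchema(Iterable<Action> checkpoint) {
        for (Action a : checkpoint) {
            if (a instanceof Metadata) {
                return ((Metadata) a).schemaJson; // stop early
            }
        }
        throw new IllegalStateException("no Metadata action in checkpoint");
    }

    public static void main(String[] args) {
        List<Action> checkpoint = new ArrayList<>();
        checkpoint.add(new Metadata("{\"type\":\"struct\"}"));
        // A large table contributes many AddFile actions; none are needed here.
        for (int i = 0; i < 1_000_000; i++) {
            checkpoint.add(new AddFile("part-" + i + ".parquet"));
        }
        System.out.println(findSchema(checkpoint));
        // prints "{"type":"struct"}"
    }
}
```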

Hope that makes sense.

@tdas Thanks for the comment; to clarify:

I dont think the Flink job metadata and checkpointing infra (where the serialized deltaLog will be put) is designed to be scalable.

I'm not proposing to put the serialized DeltaLog in Flink's checkpoint, nothing like that.
What I'm proposing applies only to the job submission phase, where the DeltaSource object is serialized and deserialized inside the JobManager. This serialized object would not be persisted in any state.

This is more work and heavy optimization in Delta Standalone

Totally agree, but I'm not sure what kind of changes are planned for Standalone and what level of effort is acceptable.
My thought process was that creating a de/serializer for the Flink connector would take less effort than changing your Standalone library. If making the change in Standalone is in fact doable, then sure, I see that as the better architectural solution.

Just proposing what I can from where I stand, with the limited visibility I have ;)
Thanks.

tdas commented

Of course! It's always good to brainstorm various ideas and discuss their pros and cons.

I'm not proposing to put the serialized DeltaLog in Flink's checkpoint, nothing like that.
What I'm proposing applies only to the job submission phase, where the DeltaSource object is serialized and deserialized inside the JobManager. This serialized object would not be persisted in any state.

Ohh! What is the purpose of this serialization then? Is it ser/deser-ed within the same JVM? If yes, then a very simple trick is to store a reference to the DeltaLog object in a static field (or a static map, to handle multiple DeltaLog instances). After deserialization you can get the DeltaLog object back from the static field/map without having to initialize it again.
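The static-map trick described above can be sketched as follows. `HeavyLog` and `LogHandle` are hypothetical names; the idea is that only a small key crosses the ser/deser boundary, and the heavy object is looked back up from a static registry after deserialization, provided both sides run in the same JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Toy stand-in for the non-serializable DeltaLog.
class HeavyLog {
    final String tablePath;
    HeavyLog(String tablePath) { this.tablePath = tablePath; }
}

// Only the small String key is serialized; the static REGISTRY stays behind
// in the JVM, so resolve() returns the original instance after deserialization.
class LogHandle implements Serializable {
    private static final Map<String, HeavyLog> REGISTRY = new ConcurrentHashMap<>();
    private final String key;

    LogHandle(HeavyLog log) {
        this.key = UUID.randomUUID().toString();
        REGISTRY.put(key, log);
    }

    HeavyLog resolve() {
        return REGISTRY.get(key); // null if we crossed a JVM boundary
    }

    static LogHandle roundTrip(LogHandle handle) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(handle);
            oos.flush();
            return (LogHandle) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

public class RegistrySketch {
    public static void main(String[] args) {
        HeavyLog log = new HeavyLog("s3://bucket/table");
        LogHandle restored = LogHandle.roundTrip(new LogHandle(log));
        System.out.println(restored.resolve() == log); // same instance, same JVM
        // prints "true"
    }
}
```

Note that this only works within one JVM, and the shared static registry is exactly what the next comment flags as a problem on a multi-tenant JobManager.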

@tdas
This is not always within the scope of one JVM [1]; it depends on where the "Flink client" is running [1].
But even if it were within one JVM, I would consider a static map field holding DeltaLog instances a potential security risk. Since a static field is shared across every Java object of a particular class, this map would be shared by every job running on the JobManager and every DeltaSource instance, which means it would hold the DeltaLog created by every Delta Source.

If user_A somehow figured out the key of user_B's DeltaLog entry in this map, even by brute force, they could take that instance and do bad things to that table.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/#anatomy-of-a-flink-cluster

tdas commented

Aaah, so the JobManager is shared by jobs from multiple users. That makes sense; I understand the risk. Yeah, optimizing Standalone is the right approach here.

This repo has been deprecated and the code is moved under connectors module in https://github.com/delta-io/delta repository. Please create the issue in repository https://github.com/delta-io/delta. See #556 for details.