delta-io/connectors

Scala incompatibility for Delta Flink connector when deploying connector jar to the Flink cluster.

kristoffSC opened this issue · 5 comments

One of the users from Flink's Slack channel [1] reached out to me [2] about a Scala incompatibility issue while deploying a Flink job that uses the Delta connector. The user was unable to submit the Flink job successfully. The user was on connector/standalone version 0.6.0 and Flink 1.16.x.

The user was submitting a "thin" job jar onto the cluster and was adding dependencies such as delta-standalone and delta-flink directly to the cluster. See the "Root cause analysis" section below for details.

The exception:

Caused by: java.lang.NoSuchMethodError: scala.Some.value()Ljava/lang/Object;
    at io.delta.standalone.internal.storage.DelegatingLogStore.schemeBasedLogStore(DelegatingLogStore.scala:52)
    at io.delta.standalone.internal.storage.DelegatingLogStore.getDelegate(DelegatingLogStore.scala:76)
    at io.delta.standalone.internal.storage.DelegatingLogStore.read(DelegatingLogStore.scala:83)
    at io.delta.standalone.internal.Checkpoints.loadMetadataFromFile(Checkpoints.scala:136)
    at io.delta.standalone.internal.Checkpoints.lastCheckpoint(Checkpoints.scala:110)
    at io.delta.standalone.internal.Checkpoints.lastCheckpoint$(Checkpoints.scala:109)
    at io.delta.standalone.internal.DeltaLogImpl.lastCheckpoint(DeltaLogImpl.scala:42)
    at io.delta.standalone.internal.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:218)
    at io.delta.standalone.internal.SnapshotManagement.$init$(SnapshotManagement.scala:37)
    at io.delta.standalone.internal.DeltaLogImpl.<init>(DeltaLogImpl.scala:47)
    at io.delta.standalone.internal.DeltaLogImpl$.apply(DeltaLogImpl.scala:263)
    at io.delta.standalone.internal.DeltaLogImpl$.forTable(DeltaLogImpl.scala:245)
    at io.delta.standalone.internal.DeltaLogImpl.forTable(DeltaLogImpl.scala)
    at io.delta.standalone.DeltaLog.forTable(DeltaLog.java:176)
    at io.delta.flink.sink.internal.committer.DeltaGlobalCommitter.commit(DeltaGlobalCommitter.java:225)
    at

Root cause
The Delta Standalone library expects Scala 2.12.18, whereas Flink, starting from version 1.15, supports only Scala 2.12.17.
There is a binary incompatibility between those two Scala versions, which was also confirmed by a Flink PMC member, who stated that this incompatibility is one of the reasons why Flink is still on Scala 2.12.17 [3].

Root cause analysis
Flink users have essentially two approaches for creating Flink job jars.

  1. Create an uber jar that contains the user code (for example, the job's pipeline code written with Flink's DataStream API) together with all dependencies that are not in scope "provided".
    In this case, the uber jar contains everything the job needs to run.

  2. Create a "thin" jar that contains only user code and submit this to the Flink cluster. In this case it is expected that all necessary libraries, such as connector libraries, are already available on the Flink cluster. This is achieved by adding those libraries to Flink's lib folder.
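For the second approach, the job's own pom.xml typically marks the connector dependencies as "provided" so they are excluded from the thin jar and resolved from the cluster's lib folder instead. A minimal sketch (the artifact coordinates match the versions mentioned above; treat them as illustrative):

```xml
<!-- "Thin" jar approach (sketch): connector dependencies are scope
     "provided", so they stay out of the job jar and are expected to be
     present in Flink's lib folder on the cluster instead. -->
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-flink</artifactId>
    <version>0.6.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-standalone_2.12</artifactId>
    <version>0.6.0</version>
    <scope>provided</scope>
</dependency>
```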

Both approaches are valid and used by Flink users. In this particular case, due to the project's use case, the user was limited to the second option.

The exception happens in the second case, while the first case works fine. The issue does not occur in the first case because the uber jar bundles Scala 2.12.18. Moreover, Flink uses a separate class loader for user code and loads classes from the submitted jar, most likely into a separate class path that is not shared with Flink's runtime, hence no conflict with the Flink runtime's Scala version.
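The class-loading behavior described above can be influenced through Flink's configuration. A sketch of the relevant flink-conf.yaml entry (the value shown is Flink's documented default; note that the exact interplay with Flink's parent-first package patterns is nuanced, so this is context, not a fix for this issue):

```yaml
# flink-conf.yaml (sketch): controls how user-code classes are resolved.
# "child-first" (the default) makes Flink prefer classes from the submitted
# job jar over those already on the cluster classpath, which is part of why
# the uber-jar approach can sidestep version conflicts.
classloader.resolve-order: child-first
```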

Proposed solution
Most likely Delta will not downgrade its Scala dependency to match Flink's Scala 2.12.17, and Flink will not upgrade its Scala version either. That is why it would be good to provide users with a working example of how they can submit their job using the "thin" jar approach.

The proposed solution would be to build an uber jar containing the Delta connector and standalone library with a shaded Scala dependency. This jar, let's call it DeltaAllDep.jar, would be deployed on the Flink cluster under Flink's lib folder. The user code does not require any changes other than changing the delta-standalone and delta-flink dependency scopes to "provided". User code does not need to add a dependency on DeltaAllDep.jar.

The pom.xml example for DeltaAllDep.jar is attached to this issue.
pom.zip
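For orientation, a minimal sketch of what the shading configuration in such a pom might look like, using the maven-shade-plugin to relocate the Scala runtime (the attached pom.xml is authoritative; the plugin version and relocation prefix here are illustrative assumptions):

```xml
<!-- Sketch: shade the Scala runtime classes into the connector uber jar
     under a relocated package, so they cannot clash with the Scala version
     on Flink's classpath. Version and shadedPattern are illustrative. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.4.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <relocation>
                        <pattern>scala</pattern>
                        <shadedPattern>shaded.delta.scala</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>
```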

Users who would like to submit a "thin" jar and keep the Delta connector libraries directly on the Flink cluster would have to ship DeltaAllDep.jar to their clusters.

The proposed solution worked for the Flink Slack user who originally reported this problem.

A suitable example and instructions should be added to examples/flink-example and the README.md.

[1] https://flink.apache.org/community/#slack
[2] https://apache-flink.slack.com/archives/C03G7LJTS2G/p1677578890965759
[3] https://apache-flink.slack.com/archives/C03G7LJTS2G/p1677597063907099?thread_ts=1677578890.965759&cid=C03G7LJTS2G

This issue might also be related to this comment https://github.com/delta-io/connectors/pull/389#issuecomment-1247076317 that I made some time ago.

The DeltaAllDep.jar's pom.xml is based on the one presented in this comment.

The proposed solution would be to build an uber jar containing delta connector and standalone library with shaded Scala dependency.

Is this something that we do? And add to maven?

Or is it something that users do themselves? And we only need to add instructions?

The latter, something that users do themselves, and we only need to add instructions.

That SGTM

This repo has been deprecated and the code is moved under connectors module in https://github.com/delta-io/delta repository. Please create the issue in repository https://github.com/delta-io/delta. See #556 for details.