flink-extended/flink-scala-api

How to package with sbt assembly?

Closed this issue · 8 comments

Following the g8 template, I can run the wordcount example successfully using sbt run. However, when I try packaging the code into a fat JAR with sbt assembly and submitting it to a local Flink cluster, I run into the following error:

java.lang.NoSuchMethodError: 'scala.collection.immutable.ArraySeq scala.runtime.ScalaRunTime$.wrapRefArray(java.lang.Object[])'
        at WordCount$package$.main(WordCount.scala:11)
        at main.main(WordCount.scala:4)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)

The following are the contents of my build.sbt file:

val scala3Version = "3.3.3"

lazy val root = project
  .in(file("."))
  .settings(
    name := "flink-test",
    version := "0.1.0-SNAPSHOT",
    scalaVersion := scala3Version,
    libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.1.5",
    libraryDependencies += "org.apache.flink" % "flink-clients" % "1.18.1" % "provided"
  )

// make `sbt run` include the "provided" dependencies on the classpath
Compile / run := Defaults
  .runTask(
    Compile / fullClasspath,
    Compile / run / mainClass,
    Compile / run / runner
  )
  .evaluated

// stay in the sbt console when "ctrl-c" is pressed while a Flink program executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true

assembly / assemblyMergeStrategy := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x                             => MergeStrategy.first
}

@qcfu-bu thanks for the question.

This is what I usually do to run a Scala-based Flink job:

// I usually do not package the Scala standard library with sbt-assembly, so I set this to "false"
assemblyPackageScala / assembleArtifact := false
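
With that in place, you can verify that no Scala classes ended up in the fat JAR (the JAR path below is just sbt-assembly's default output for a build like yours, adjust as needed):

jar tf target/scala-3.3.3/flink-test-assembly-0.1.0-SNAPSHOT.jar | grep '^scala/'
# no output means the Scala standard library was excluded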

Then, when you run your Flink job in any mode other than local (i.e. a session cluster, application cluster, or job cluster):

  1. When running with Scala 3, add both the Scala 2.13 and the Scala 3 standard libraries to the dependency classpath:
  • scala3-library_3-3.x.y.jar
  • scala-library-2.13.x.jar
    When running with Scala 2.13, the Scala 2.13 library alone is enough.
  2. Change this Flink property by removing the scala package from it:

classloader.parent-first-patterns.default: java.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.rocksdb.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback
This is needed to load Scala from the child classloader, so that the old Scala 2.12 still shipped in the Apache Flink 1.x distribution is not loaded. Scala 2.12 won't be packaged in Flink 2.x, so this property will no longer need to be modified.
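
If you need the actual JAR files for step 1, any dependency tool can fetch them; for example with Coursier (assuming the cs launcher is installed; the version is illustrative):

cs fetch org.scala-lang:scala3-library_3:3.3.3
# prints the local paths of scala3-library_3-3.3.3.jar and its
# transitive dependency scala-library-2.13.x.jar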

Thanks for the reply! But I'm not quite sure what you mean by adding the Scala standard library to the dependency classpath. I downloaded the JARs of the Scala libraries and put them in the flink/lib directory as follows:

[Screenshot: flink/lib directory listing]

I've added the settings you described to my build.sbt and flink-conf.yaml, respectively.

But after submitting the rebuilt JAR to Flink, I still get the same error as before.

Can you try two tests to see which works better?

  1. Remove flink-scala_2.12-1.18.1.jar from flink/lib and start your job again.
  2. Do not put the Scala JARs into flink/lib, but add them to the user classpath instead; see the example below.
    If you submit your job to a Flink standalone cluster, both Scala JAR files can be passed via the flink CLI as
    flink run --classpath <url>
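
For example (paths and versions are illustrative, and each URL must be reachable from all cluster nodes):

flink run \
  --classpath file:///opt/flink-extra/scala3-library_3-3.3.3.jar \
  --classpath file:///opt/flink-extra/scala-library-2.13.12.jar \
  your-job-assembly.jar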

I've tried both methods.

  1. After removing the flink-scala JAR, Flink complains that it can't load the Scala API, so I put the flink-scala JAR back into the flink/lib directory.
  2. After moving the two Scala JARs out of flink/lib and passing them to the Flink CLI manually, I get a different error message.
[Screenshot: the new error message]

This example works fine for me: https://github.com/novakov-alexey/flink-sandbox/blob/main/modules/core/src/main/scala/org/example/connectedStreams.scala

flink --version
Version: 1.18.0, Commit ID: a5548cc

> flink run --class org.example.ConnectedStreams ../flink-sandbox/modules/core/target/scala-3.3.1/core-assembly-0.1.0-SNAPSHOT.jar
Job has been submitted with JobID 35ac392accbb978f07034bd6753ffffe
[Screenshot: job running in the Flink Web UI]

Both Scala libs are in the flink/lib folder.
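
You can check what is on the distribution classpath with something like (command is illustrative):

ls $FLINK_HOME/lib | grep -i scala
# should list scala3-library_3, scala-library 2.13, and flink-scala_2.12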

Can you also try it?

I've tried building JARs from your sandbox repo and submitting them, and it still does not work.
What's frustrating is that Flink clearly sees the two Scala libraries in the classpath.

[Screenshot: classpath listing showing both Scala libraries]

I'm giving up on getting this to work. Thanks for the help.

Perhaps try with Docker containers to get some reproducibility.
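
For example, the session-cluster commands from the official Flink Docker docs give a clean environment to test against (the image tag is what I would expect for 1.18.1; adjust as needed):

docker network create flink-network

docker run -d --name jobmanager --network flink-network -p 8081:8081 \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
  flink:1.18.1 jobmanager

docker run -d --name taskmanager --network flink-network \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
  flink:1.18.1 taskmanager

Then submit the fat JAR with flink run -m localhost:8081 <jar> and see whether the behaviour reproduces.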