Dynamically invoking withSpark in dockerized Ktor web app throws UnsupportedFileSystemException
Opened this issue · 3 comments
When `withSpark` is invoked dynamically in a dockerized Ktor web app, an `UnsupportedFileSystemException` is thrown.
Expected behavior: No exception is thrown.
Broadcast.kt (from kotlin-spark-api example)
```kotlin
import org.jetbrains.kotlinx.spark.api.map
import org.jetbrains.kotlinx.spark.api.withSpark
import java.io.Serializable

object Broadcast {
    data class SomeClass(val a: IntArray, val b: Int) : Serializable

    fun broadcast(): MutableList<Int> {
        lateinit var result: MutableList<Int>
        withSpark(master = "local") {
            val broadcastVariable = spark.broadcast(SomeClass(a = intArrayOf(5, 6), b = 3))
            result = listOf(1, 2, 3, 4, 5)
                .toDS()
                .map {
                    val receivedBroadcast = broadcastVariable.value
                    it + receivedBroadcast.a.first()
                }
                .collectAsList()
            println(result)
        }
        return result
    }
}
```
Routing.kt
```kotlin
fun Application.configureRouting() {
    routing {
        get("/") {
            val list = Broadcast.broadcast()
            call.respondText(list.toString())
        }
    }
}
```
Dockerfile
```dockerfile
# syntax=docker/dockerfile:1
FROM eclipse-temurin:11-jre-jammy AS jre-jammy-spark
RUN curl https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3-scala2.13.tgz -o spark.tgz && \
    tar -xf spark.tgz && \
    mv spark-3.3.2-bin-hadoop3-scala2.13 /opt/spark && \
    rm spark.tgz
ENV SPARK_HOME="/opt/spark"
ENV PATH="${PATH}:/opt/spark/bin:/opt/spark/sbin"

FROM gradle:8.4-jdk11 AS gradle-build
COPY --chown=gradle:gradle . /home/gradle/src
WORKDIR /home/gradle/src
RUN gradle buildFatJar --no-daemon

FROM jre-jammy-spark AS app
RUN mkdir -p /app
COPY --from=gradle-build /home/gradle/src/build/libs/*-all.jar /app/app.jar
ENTRYPOINT ["java","-jar","/app/app.jar"]
```
compose.yaml
```yaml
services:
  app:
    build: .
    ports:
      - 8888:8888
```
In a shell, run:

```shell
docker compose up
```

Then open http://localhost:8888 in a browser.
An `org.apache.hadoop.fs.UnsupportedFileSystemException` will be thrown:
app-1 | 2024-04-09 10:26:26.484 [main] INFO ktor.application - Autoreload is disabled because the development mode is off.
app-1 | 2024-04-09 10:26:26.720 [main] INFO ktor.application - Application started in 0.261 seconds.
app-1 | 2024-04-09 10:26:26.816 [DefaultDispatcher-worker-1] INFO ktor.application - Responding at http://0.0.0.0:8888
app-1 | WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
app-1 | Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
app-1 | 2024-04-09 10:27:07.885 [eventLoopGroupProxy-4-1] WARN o.a.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
app-1 | WARNING: An illegal reflective access operation has occurred
app-1 | WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/app/app.jar) to constructor java.nio.DirectByteBuffer(long,int)
app-1 | WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
app-1 | WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
app-1 | WARNING: All illegal access operations will be denied in a future release
app-1 | 2024-04-09 10:27:10.066 [eventLoopGroupProxy-4-1] WARN o.a.spark.sql.internal.SharedState - URL.setURLStreamHandlerFactory failed to set FsUrlStreamHandlerFactory
app-1 | 2024-04-09 10:27:10.071 [eventLoopGroupProxy-4-1] WARN o.a.spark.sql.internal.SharedState - Cannot qualify the warehouse path, leaving it unqualified.
app-1 | org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
app-1 | at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3585)
app-1 | at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3608)
app-1 | at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
app-1 | at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3712)
app-1 | at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3663)
app-1 | at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
app-1 | at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
app-1 | at org.apache.spark.sql.internal.SharedState$.qualifyWarehousePath(SharedState.scala:282)
app-1 | at org.apache.spark.sql.internal.SharedState.liftedTree1$1(SharedState.scala:80)
app-1 | at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:79)
app-1 | at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)
app-1 | at scala.Option.getOrElse(Option.scala:201)
app-1 | at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)
app-1 | at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)
app-1 | at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)
app-1 | at scala.Option.getOrElse(Option.scala:201)
app-1 | at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)
app-1 | at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)
app-1 | at org.apache.spark.sql.SparkSession.$anonfun$new$3(SparkSession.scala:117)
app-1 | at scala.Option.map(Option.scala:242)
app-1 | at org.apache.spark.sql.SparkSession.$anonfun$new$1(SparkSession.scala:117)
app-1 | at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:230)
app-1 | at org.apache.spark.sql.catalyst.SerializerBuildHelper$.nullOnOverflow(SerializerBuildHelper.scala:29)
app-1 | at org.apache.spark.sql.catalyst.SerializerBuildHelper$.createSerializerForJavaBigDecimal(SerializerBuildHelper.scala:158)
app-1 | at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:549)
app-1 | at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
app-1 | at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
app-1 | at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
app-1 | at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
app-1 | at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:448)
app-1 | at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:437)
app-1 | at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
app-1 | at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
app-1 | at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
app-1 | at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
app-1 | at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:429)
app-1 | at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55)
app-1 | at org.apache.spark.sql.Encoders$.DECIMAL(Encoders.scala:100)
app-1 | at org.apache.spark.sql.Encoders.DECIMAL(Encoders.scala)
app-1 | at org.jetbrains.kotlinx.spark.api.EncodingKt.<clinit>(Encoding.kt:87)
app-1 | at com.example.plugins.Broadcast.broadcast(Broadcast.kt:62)
app-1 | at com.example.plugins.RoutingKt$configureRouting$1$1.invokeSuspend(Routing.kt:10)
app-1 | at com.example.plugins.RoutingKt$configureRouting$1$1.invoke(Routing.kt)
app-1 | at com.example.plugins.RoutingKt$configureRouting$1$1.invoke(Routing.kt)
app-1 | at io.ktor.server.routing.Route$buildPipeline$1$1.invokeSuspend(Route.kt:116)
app-1 | at io.ktor.server.routing.Route$buildPipeline$1$1.invoke(Route.kt)
app-1 | at io.ktor.server.routing.Route$buildPipeline$1$1.invoke(Route.kt)
app-1 | at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1 | at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1 | at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
app-1 | at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
app-1 | at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
app-1 | at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invoke(Pipeline.kt)
app-1 | at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invoke(Pipeline.kt)
app-1 | at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:17)
app-1 | at io.ktor.server.routing.Routing.executeResult(Routing.kt:190)
app-1 | at io.ktor.server.routing.Routing.interceptor(Routing.kt:64)
app-1 | at io.ktor.server.routing.Routing$Plugin$install$1.invokeSuspend(Routing.kt:140)
app-1 | at io.ktor.server.routing.Routing$Plugin$install$1.invoke(Routing.kt)
app-1 | at io.ktor.server.routing.Routing$Plugin$install$1.invoke(Routing.kt)
app-1 | at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1 | at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1 | at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invokeSuspend(BaseApplicationEngine.kt:124)
app-1 | at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invoke(BaseApplicationEngine.kt)
app-1 | at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invoke(BaseApplicationEngine.kt)
app-1 | at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1 | at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1 | at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
app-1 | at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
app-1 | at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
app-1 | at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1 | at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1 | at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:17)
app-1 | at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invokeSuspend(DefaultEnginePipeline.kt:123)
app-1 | at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invoke(DefaultEnginePipeline.kt)
app-1 | at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invoke(DefaultEnginePipeline.kt)
app-1 | at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
app-1 | at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
app-1 | at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
app-1 | at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
app-1 | at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
app-1 | at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1 | at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
app-1 | at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:17)
app-1 | at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invokeSuspend(NettyApplicationCallHandler.kt:140)
app-1 | at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invoke(NettyApplicationCallHandler.kt)
app-1 | at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invoke(NettyApplicationCallHandler.kt)
app-1 | at kotlinx.coroutines.intrinsics.UndispatchedKt.startCoroutineUndispatched(Undispatched.kt:44)
app-1 | at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:112)
app-1 | at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126)
app-1 | at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:56)
app-1 | at kotlinx.coroutines.BuildersKt.launch(Unknown Source)
app-1 | at io.ktor.server.netty.NettyApplicationCallHandler.handleRequest(NettyApplicationCallHandler.kt:41)
app-1 | at io.ktor.server.netty.NettyApplicationCallHandler.channelRead(NettyApplicationCallHandler.kt:33)
app-1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
app-1 | at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
app-1 | at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:425)
app-1 | at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
app-1 | at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
app-1 | at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
app-1 | at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
app-1 | at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
app-1 | at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
app-1 | at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda$1$lambda$0(NettyApplicationEngine.kt:296)
app-1 | at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
app-1 | at java.base/java.lang.Thread.run(Unknown Source)
app-1 | [6, 7, 8, 9, 10]
For comparison, I created an equivalent dockerized Spring Boot app here.
Notably, no exception is thrown.
This seems to suggest that the issue lies within `kotlin-spark`.
In the Spring Boot version, Spark creates a local temporary directory while preparing the Spark session, before the broadcast function is invoked:
app-1 | 2024-04-10 09:00:43.393 INFO 1 --- [nio-8888-exec-1] o.a.s.SparkEnv : Registering BlockManagerMasterHeartbeat
app-1 | 2024-04-10 09:00:43.410 INFO 1 --- [nio-8888-exec-1] o.a.s.s.DiskBlockManager : Created local directory at /tmp/blockmgr-c9cef486-62f2-431a-8408-1e48b933da34
app-1 | 2024-04-10 09:00:43.436 INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore : MemoryStore started with capacity 2.1 GiB
. . .
app-1 | 2024-04-10 09:00:43.829 INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore : Block broadcast_0 stored as values in memory (estimated size 72.0 B, free 2.1 GiB)
app-1 | 2024-04-10 09:00:43.856 INFO 1 --- [nio-8888-exec-1] o.a.s.s.m.MemoryStore : Block broadcast_0_piece0 stored as bytes in memory (estimated size 146.0 B, free 2.1 GiB)
app-1 | 2024-04-10 09:00:43.858 INFO 1 --- [ckManagerMaster] o.a.s.s.BlockManagerInfo : Added broadcast_0_piece0 in memory on 1d1b66d9e151:43605 (size: 146.0 B, free: 2.1 GiB)
app-1 | 2024-04-10 09:00:43.862 INFO 1 --- [nio-8888-exec-1] o.a.s.SparkContext : Created broadcast 0 from broadcast at SparkBroadcast.java:30
In the Ktor app, the exception appears to be thrown at roughly the point where that temporary directory would have been created:
app-1 | 2024-04-10 09:53:18.476 [eventLoopGroupProxy-4-1] WARN o.a.spark.sql.internal.SharedState - URL.setURLStreamHandlerFactory failed to set FsUrlStreamHandlerFactory
app-1 | 2024-04-10 09:53:18.477 [eventLoopGroupProxy-4-1] INFO o.a.spark.sql.internal.SharedState - Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
app-1 | 2024-04-10 09:53:18.482 [eventLoopGroupProxy-4-1] WARN o.a.spark.sql.internal.SharedState - Cannot qualify the warehouse path, leaving it unqualified.
app-1 | org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
. . .
app-1 | 2024-04-10 09:53:19.148 [eventLoopGroupProxy-4-1] INFO o.a.spark.storage.memory.MemoryStore - Block broadcast_0 stored as values in memory (estimated size 72.0 B, free 2.1 GiB)
app-1 | 2024-04-10 09:53:19.180 [eventLoopGroupProxy-4-1] INFO o.a.spark.storage.memory.MemoryStore - Block broadcast_0_piece0 stored as bytes in memory (estimated size 150.0 B, free 2.1 GiB)
app-1 | 2024-04-10 09:53:19.182 [dispatcher-BlockManagerMaster] INFO o.a.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in memory on dcb91ee36ad3:36665 (size: 150.0 B, free: 2.1 GiB)
app-1 | 2024-04-10 09:53:19.186 [eventLoopGroupProxy-4-1] INFO org.apache.spark.SparkContext - Created broadcast 0 from broadcast at Broadcast.kt:61
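As an aside, a common cause of `No FileSystem for scheme "file"` in fat-jar deployments (independent of Ktor) is that shading lets one jar's copy of `META-INF/services/org.apache.hadoop.fs.FileSystem` overwrite another's, so Hadoop's scheme registrations are lost. Whether that applies here is an assumption, not a confirmed diagnosis; if it does, the usual fix is to merge service files in the Shadow plugin (which, as far as I understand, Ktor's `buildFatJar` task uses under the hood):

```kotlin
// build.gradle.kts — hedged sketch; assumes the Shadow plugin backs buildFatJar
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

tasks.withType<ShadowJar> {
    // Concatenate META-INF/services/* entries instead of letting one jar's
    // copy clobber another's, so Hadoop's FileSystem registrations survive.
    mergeServiceFiles()
}
```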
As I responded on Slack:
The file-reading exception is printed when the DECIMAL encoder is pre-loaded by the Kotlin Spark API's Encoders file. Can you try to instantiate the same encoder in the non-Kotlin Spark project to see what happens?
In the Spark 3.4+ branch of the project, the encoding part is completely overhauled, so this issue won't be there anymore. But it's still a WIP.
Your program still executes fine; it's a caught exception that's merely logged to the output.
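For anyone who finds the logged exception noisy in the meantime, one possible workaround (a sketch only, assuming Hadoop's standard `fs.file.impl` configuration key and the `props` parameter of `withSpark`; untested in this exact setup) is to register the local filesystem implementation explicitly:

```kotlin
// Sketch: explicitly map the "file" scheme to Hadoop's local filesystem
// so SharedState can qualify the warehouse path. Untested assumption.
withSpark(
    props = mapOf("spark.hadoop.fs.file.impl" to "org.apache.hadoop.fs.LocalFileSystem"),
    master = "local",
) {
    // ... same body as in Broadcast.kt above ...
}
```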
Thank you so much for pinpointing the source of the exception. I'm glad to know it's not because of an error on my part.
Since this issue will go away with the new release, spending any more time on it seems purely academic, so I won't trouble you further and will let you get back to your more important work on the 3.4+ fix.
Thanks again and have a great weekend!