delta-io/connectors

Flink Source Connector - problem with Delta's shared forkJoinPool when computing Snapshot files

kristoffSC opened this issue · 7 comments

For Flink jobs using the Delta-Flink source connector, it may happen that Flink throws the following exception:

Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)

Flink Exception description
This is a check Flink performs to detect whether user code (Flink job code) uses a class loader that was already used by some other job executed on this cluster. It is a safety check for the case where, for whatever reason, not all classes from a previous, finished job were unloaded from Flink's memory.

Such an object reference leak between jobs is possible whenever we have a static reference to a class loader instance, or whenever a shared, often static, thread pool is used by a Flink job. In the case of delta-standalone we have the latter.

The purpose of this check in Flink is to prevent OOMs caused by class loader reference leaks.
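
To illustrate the mechanism, here is a minimal, generic sketch (made-up names, not Delta or Flink code): worker threads created by a static, JVM-wide pool inherit the context class loader of the thread that created them, and they keep referencing it after that job finishes and Flink closes its class loader.

import java.util.concurrent.Executors

object SharedPool {
  // Static, JVM-wide pool: its worker threads outlive any single Flink job.
  val pool = Executors.newFixedThreadPool(4)
}

object LeakExample {
  def runTaskForSomeJob(): Unit = {
    SharedPool.pool.submit(new Runnable {
      override def run(): Unit = {
        // A pooled worker keeps the context class loader of the thread that
        // created it - possibly a previous job's user-code class loader,
        // which Flink has already closed.
        val maybeStaleLoader = Thread.currentThread().getContextClassLoader
        maybeStaleLoader.getResources("META-INF/services/example")
      }
    })
  }
}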

Reproduce

  1. Comment out configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false); in the buildCluster method of connectors/flink DeltaTestUtils.java
  2. Run the DeltaSourceBoundedExecutionITCaseTest::shouldReadDeltaTableUsingUserSchema test, substituting the line
    shouldReadDeltaTable(deltaSource, failoverType) with shouldReadDeltaTable(deltaSource, FailoverType.NONE)

This will execute the shouldReadDeltaTableUsingUserSchema test 3 times. The first run will pass, but the second will get "stuck", and you can then observe the Flink exception in the logs.
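
For reference, a sketch of the configuration line from step 1 (assuming Flink's Configuration API as it is used in DeltaTestUtils); with the line commented out, the leak check stays active and the error can surface on the second run:

import org.apache.flink.configuration.{Configuration, CoreOptions}

val configuration = new Configuration()
// Commented out so that Flink's leak check remains active:
// configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false)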

Location of the Problem
The problem is located in SnapshotImpl.scala, in the loadInMemory method executed when computing the Snapshot's state.
loadInMemory uses Scala's parallel collection ParVector, which uses a ForkJoinPool that is shared by the entire JVM. It may happen that a Flink job using the Delta Source connector computes the Delta Snapshot files on a thread that was already used by a previous job on the same cluster, which triggers the exception above.
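
For context, a minimal standalone sketch (not the Delta code) showing that a ParVector, unless given its own TaskSupport, schedules its work on a pool shared by every parallel collection in the JVM:

import scala.collection.parallel.immutable.ParVector

object DefaultPoolDemo extends App {
  val pv = new ParVector(Vector("a.json", "b.json", "c.json", "d.json"))
  pv.foreach { path =>
    // By default, all parallel collections in this JVM share these worker threads.
    println(s"${Thread.currentThread().getName} -> $path")
  }
}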

Possible Solution
The solution would be to not use the shared fork join thread pool, but a custom pool whose scope is bounded to the Snapshot object instance.
In order to substitute the thread pool for a parallel collection instance you need to:
Define a new pool:

import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}

import scala.concurrent._

import com.google.common.util.concurrent.ThreadFactoryBuilder

  // Thread factory that pins each worker's context class loader to the loader
  // of this class, instead of whatever loader the creating thread happened to have.
  final private class ContextClassloaderFixedThreadFactory extends ThreadFactory {
    override def newThread(runnable: Runnable): Thread = {
      val thread = Executors.defaultThreadFactory.newThread(runnable)
      thread.setContextClassLoader(getClass.getClassLoader)
      thread
    }
  }

  implicit val myEc: ExecutionContext = new ExecutionContext {

    val factory: ThreadFactory = new ThreadFactoryBuilder()
        .setDaemon(true)
        .setNameFormat("delta-worker-pool-%d")
        .setThreadFactory(new ContextClassloaderFixedThreadFactory())
        .build()

    val threadPool: ExecutorService = Executors.newFixedThreadPool(10, factory)

    def execute(runnable: Runnable): Unit = {
      threadPool.submit(runnable)
    }

    def reportFailure(t: Throwable): Unit = {}
  }

Use it in the parallel collection:

import scala.collection.parallel.ExecutionContextTaskSupport

val pv = new ParVector(paths.map(_.toString).sortWith(_ < _).toVector)
// substituting the default pool and execution context for the collection
pv.tasksupport = new ExecutionContextTaskSupport(myEc)
pv.flatMap { path =>
  if (path.endsWith("json")) {
    // ...
  }
}

Final thoughts
One thing worth mentioning is that the described issue will occur only for Flink users. For other users, who are using "vanilla" delta-standalone, the fork join thread pool will most likely be just fine. Because of that, it might be good to define an API on the Snapshot object that allows injecting a custom thread pool; if no pool is injected, the ForkJoinPool would be used as today.
With this change, we could use the new API in the Flink connector and nothing would change for other users.
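
As a rough illustration of such an API (all names here are hypothetical, not existing delta-standalone APIs), the pool could be supplied by the caller and fall back to the current shared behaviour when nothing is injected:

import java.util.concurrent.ForkJoinPool

// Hypothetical injection point; delta-standalone does not expose this today.
trait SnapshotPoolProvider {
  def pool: ForkJoinPool
}

// Default: roughly the shared-pool behaviour "vanilla" users get today.
object SharedPoolProvider extends SnapshotPoolProvider {
  override def pool: ForkJoinPool = ForkJoinPool.commonPool()
}

// What the Flink connector would inject: a pool scoped to one DeltaLog/Snapshot.
final class InstanceScopedPoolProvider extends SnapshotPoolProvider {
  override val pool: ForkJoinPool = new ForkJoinPool()
}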

Could you point out which line would touch the shared fork join thread pool? I would expect the Flink connector should not touch SnapshotImpl.loadInMemory.

Hi @zsxwing
The Flink connector does not call SnapshotImpl.loadInMemory directly.

The Flink connector calls SnapshotImpl.getAllFiles(), which delegates to the Snapshot's state, and state calls loadInMemory.

@kristoffSC Can we move to Snapshot.scan instead? getAllFiles is the old inefficient API. cc @scottsand-db

@zsxwing using snapshot.scan() is something I can evaluate; however, the same problem happens in the Flink-Delta Sink, where we do not call snapshot.getAllFiles().

In the Sink, we call OptimisticTransaction.commit, which eventually delegates to SnapshotImpl::setTransactionsScala.
This initializes state, which calls loadInMemory, which uses a parallel collection with the shared thread pool.

Please see the screenshot with the stack trace from my debug session:
[screenshot: stack trace from the debug session]

I do believe that the problem lies in SnapshotImpl.loadInMemory, which uses parallel collections with the shared fork join pool.
And SnapshotImpl.loadInMemory can be called not only from getAllFiles().

I see. That part is not easy to fix. One quick fix in my mind would be creating a thread pool when creating SnapshotImpl.state and destroying it when the load is done. The overhead of the thread pool should be trivial as this would only happen in the write path.
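
A sketch of that "create, use, destroy" variant (simplified, not the actual SnapshotImpl code): the pool lives only for the duration of one state load and is shut down afterwards.

import java.util.concurrent.ForkJoinPool

import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector

object ScopedPoolLoad {
  def loadAll(paths: Seq[String]): Seq[String] = {
    val pool = new ForkJoinPool()
    try {
      val pv = new ParVector(paths.sortWith(_ < _).toVector)
      // Route this one load through the short-lived pool instead of the shared one.
      pv.tasksupport = new ForkJoinTaskSupport(pool)
      pv.map(path => path.toUpperCase /* stand-in for parsing the log file */).seq
    } finally {
      pool.shutdown() // nothing from this load outlives the call
    }
  }
}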

@zsxwing

Well, substituting the thread pool for Scala's parallel collection is something I did in my POC fix for this issue, posted in the issue's description. Please take a look at #424, the "Possible Solution" paragraph.

That seems to solve the problem. However, I'm not a Scala developer, so I did not want to submit an actual PR.

From the Flink connector's point of view, the thread pool can be scoped to the DeltaLog instance or the Snapshot instance.
As long as it is not static or shared for the entire JVM, it's OK.
Initializing the thread pool in DeltaLog would potentially make the overhead even smaller.

It would be best if the fix solved the problem globally, for every call of loadInMemory, not only for commit but also for getAllFiles (I know it's an old API).

@kristoffSC this approach seems simpler, no? All tests pass. We just need to avoid using ForkJoinPool.commonPool(); we can still use an instance of ForkJoinPool:

class SnapshotImpl(...) {
  // Instance-scoped pool instead of the JVM-wide shared one.
  private val forkJoinPool = new java.util.concurrent.ForkJoinPool()
  ...
  private def loadInMemory(paths: Seq[Path]): Seq[SingleAction] = {
    val pv = new ParVector(paths.map(_.toString).sortWith(_ < _).toVector)
    // Route the parallel collection's tasks through the instance-scoped pool.
    pv.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(forkJoinPool)
    pv.flatMap { path =>
      // ... unchanged body ...
    }
  }
}