Is async macro leaking?
Opened this issue · 9 comments
Hi,
when you run this code:
import scala.async.Async.{async, await}
import scala.concurrent.{ExecutionContext, Future}

object Main {
  implicit val syncExecutor: ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()
    override def reportFailure(cause: Throwable): Unit = throw cause
  }

  def stop: Unit = {
    Thread.sleep(60000)
    println("stop")
  }

  class Witness

  def main(args: Array[String]): Unit = {
    async {
      await(Future { new Witness })
      stop
    }
  }
}
and then, during the sleep, you take a heap dump with VisualVM, you see that there is still an object of type Witness
in the heap. Is this normal?
@hurlebouc While I realize that this is just a reproducer, that syncExecutor implementation is completely broken. I think you can just replace all of it with ExecutionContext.parasitic
and make the reproducer shorter in the process.
The syncExecutor runs futures in the current thread. It's by design.
@hurlebouc parasitic is a correct implementation of a current-thread ExecutionContext. The syncExecutor above has several problems: execute will lead to stack issues, and reportFailure is not supposed to throw exceptions.
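To make the contrast concrete, here is a small stdlib-only sketch (names are illustrative, not from the thread) showing why the hand-rolled same-thread executor is fragile while parasitic is not: parasitic queues nested submissions and drains them in a loop, whereas the naive executor recurses directly and can overflow the stack.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext

object ParasiticDemo {
  // A naive same-thread executor like syncExecutor above: execute
  // recurses directly, so deeply nested callbacks can overflow the
  // stack, and reportFailure is not supposed to throw.
  val naive: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = throw cause
  }

  // parasitic also runs tasks on the calling thread, but nested
  // submissions are queued and drained in a loop (trampolined)
  // instead of being executed by direct recursion.
  def nest(ec: ExecutionContext, depth: Int, ran: AtomicInteger): Unit =
    if (depth > 0) ec.execute { () =>
      ran.incrementAndGet()
      nest(ec, depth - 1, ran)
    }

  def main(args: Array[String]): Unit = {
    val ran = new AtomicInteger(0)
    nest(ExecutionContext.parasitic, 100000, ran)
    // all tasks completed without a StackOverflowError;
    // nest(naive, 100000, ran) would blow the stack instead
    println(s"parasitic ran ${ran.get} nested tasks")
  }
}
```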
Ok.
@viktorklang: do you think this is related to the leaking problem?
@hurlebouc No idea, but it's a bit of a two-for-one to reduce the size of the reproducer AND help people avoid problems with their sync ExecutionContexts. :)
@viktorklang Thank you for the advice.
But even replacing syncExecutor with parasitic leads to the same problem. My hypothesis is that the line await(Future{new Witness}) is converted to val tmp = await(Future{new Witness}) by the ANF transformation, which results in a memory leak because tmp is not freed during the execution of stop.
This is the translation:
final class stateMachine$async extends scala.async.FutureStateMachine {
  def <init>(): stateMachine$async = {
    stateMachine$async.super.<init>(ec);
    ()
  };
  override def apply(tr$async: scala.util.Try): Unit = while$(){
    try {
      stateMachine$async.this.state() match {
        case 0 => {
          val awaitable$async: scala.concurrent.Future = scala.concurrent.Future.apply({
            final <artifact> def $anonfun$apply(): scala.async.Leak.Witness = new scala.async.Leak.Witness();
            (() => $anonfun$apply())
          }, ec);
          tr$async = stateMachine$async.this.getCompleted(awaitable$async);
          stateMachine$async.this.state_=(1);
          if (null.!=(tr$async))
            while$()
          else {
            stateMachine$async.this.onComplete(awaitable$async);
            return ()
          }
        }
        case 1 => {
          <synthetic> val await$1: Object = {
            val tryGetResult$async: Object = stateMachine$async.this.tryGet(tr$async);
            if (stateMachine$async.this.==(tryGetResult$async))
              return ()
            else
              tryGetResult$async.$asInstanceOf[Object]()
          };
          await$1;
          Leak.this.stop();
          stateMachine$async.this.completeSuccess(scala.runtime.BoxedUnit.UNIT);
          return ()
        }
        case _ => throw new IllegalStateException(java.lang.String.valueOf(stateMachine$async.this.state()))
      }
    } catch {
      case (throwable$async @ (_: Throwable)) => {
        stateMachine$async.this.completeFailure(throwable$async);
        return ()
      }
    };
    while$()
  };
  override <bridge> <artifact> def apply(v1: Object): Object = {
    stateMachine$async.this.apply(v1.$asInstanceOf[scala.util.Try]());
    scala.runtime.BoxedUnit.UNIT
  }
};
Your analysis is correct: the ANF transform introduces <synthetic> val await$1: Object = { ... } to hold the result of the future, even though it is never used subsequently.
The workaround would be to manually discard the result of the future: await(Future{new Witness; ()}). It would be preferable to make the ANF transform smart enough to avoid the problem, but I don't see a straightforward way to implement it yet.
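The same discard idea can be shown with plain Futures, no macro involved; a stdlib-only sketch (the executor choice here is illustrative):

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DiscardDemo {
  implicit val ec: ExecutionContext = ExecutionContext.parasitic

  class Witness

  def main(args: Array[String]): Unit = {
    // Leak-prone shape: the completed future caches the Witness,
    // so anything retaining the future retains the Witness too.
    val leaky: Future[Witness] = Future(new Witness)
    // Workaround shape: discard inside the future body, so the
    // future only ever caches Unit and the Witness can be GC'd.
    val fixed: Future[Unit] = Future { new Witness; () }
    Await.ready(fixed, 5.seconds)
    assert(fixed.value.exists(_.isSuccess))
    println("fixed future caches Unit, not the Witness")
  }
}
```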
You would have the same issue using map or flatMap and an executor that doesn't bounce the stack frame:
import scala.concurrent.{ExecutionContext, Future}

object Main {
  // a same-thread executor, as discussed above
  implicit val ec: ExecutionContext = ExecutionContext.parasitic

  def stop: Unit = {
    Thread.sleep(60000)
    println("stop")
  }

  class Witness

  def main(args: Array[String]): Unit = {
    Future(new Witness).map(_ => stop)
    ()
  }
}
You'll see the same issue above. The problem isn't really the async macro; the problem is that Future memoizes its results. As long as a Future is on the heap, its results are too (once evaluated).
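The memoization itself is easy to observe with the standard library alone; a small sketch:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MemoDemo {
  class Witness

  def main(args: Array[String]): Unit = {
    val f: Future[Witness] = Future(new Witness)
    Await.ready(f, 5.seconds)
    // Once completed, a Future caches its result forever: value is
    // Some(Success(witness)), so any live reference to f also keeps
    // the Witness reachable on the heap.
    assert(f.value.exists(_.isSuccess))
    println("completed Future still holds its result")
  }
}
```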
You're basically asking Future to behave a bit like IO. For example, if we port your example to Cats Effect:
import cats.effect._
import cats.effect.cps._
import scala.concurrent.duration._
object Main extends IOApp.Simple {
  val stop: IO[Unit] =
    IO.sleep(1.minute) >> IO.println("stop")

  class Witness

  val run =
    async[IO] {
      IO(new Witness).await
      stop.await
    }
}
If you run the above and take a heap dump during stop, you'll find no Witness anywhere. And just to prove nothing strange is going on, we can still run the Witness creation through Future and observe the same effect:
import cats.effect._
import cats.effect.cps._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Main extends IOApp.Simple {
  val stop: IO[Unit] =
    IO.sleep(1.minute) >> IO.println("stop")

  class Witness

  val run =
    async[IO] {
      IO.fromFuture(IO(Future(new Witness))).await
      stop.await
    }
}
Same idea. Since Future retains its results, anything that retains the future also retains the results. IO doesn't retain its results, so it doesn't exhibit the same issue. The async macro itself has no real impact one way or another, aside from obscuring the val which is holding the Future (which would otherwise be more apparent in a direct construction).
The PR turns the capturing val into a var, and if we see await$1 all by its lonesome in "statement position", we know it wasn't "consumed" by an enclosing expression or definition (which possibly would have to be a val or a method arg); at that point, null out the var.
At stage or round N of the state machine, we receive the result of the F at N-1, so presumably there is no live reference to the F.
F itself should be off the hook.
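A hand-written sketch of that fix (illustrative only, not the actual scala-async internals): the awaited result lives in a var and is nulled out once we see it was not consumed, so the state machine no longer pins it while the continuation runs.

```scala
object NullOutDemo {
  final class StateMachine {
    // a capturing var in place of the ANF-introduced val
    private var await$1: AnyRef = null

    def resume(result: AnyRef): Unit = {
      await$1 = result
      // await$1 stands alone in "statement position": no enclosing
      // expression consumes it, so release the reference before the
      // long-running continuation begins.
      await$1 = null
      stop()
    }

    def stillRetained: Boolean = await$1 != null
    private def stop(): Unit = () // stands in for the 60s sleep
  }

  def main(args: Array[String]): Unit = {
    val sm = new StateMachine
    sm.resume(new Object)
    assert(!sm.stillRetained)
    println("awaited result released before the continuation ran")
  }
}
```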