rssh/dotty-cps-async

FutureAsyncMonad does not always handle exceptions within called functions

Closed this issue · 7 comments

I define a replacement FutureAsyncMonad in my project as:

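import cps.*
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Success, Try}

given FutureAsyncMonadButGood(using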
  ExecutionContext
): CpsSchedulingMonad[Future] with
  type F[+T] = Future[T]
  override type WF[T] = F[T]

  override inline def pure[T](t: T): Future[T] = Future.successful(t)

  override inline def map[A, B](fa: F[A])(f: A => B): F[B] =
    fa.map(f)

  override inline def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B] =
    fa.flatMap(f)

  override inline def error[A](e: Throwable): F[A] =
    Future.failed(e)

  override def mapTry[A, B](fa: F[A])(f: Try[A] => B): F[B] =
    fa.transform { v => Success(f(v)) }

  override def flatMapTry[A, B](fa: F[A])(f: Try[A] => F[B]): F[B] =
    fa.transformWith { v => f(v) }

  override def restore[A](fa: F[A])(fx: Throwable => F[A]): F[A] =
    fa.recoverWith { case ex => fx(ex) }

  override def adoptCallbackStyle[A](source: (Try[A] => Unit) => Unit): F[A] =
    val p = Promise[A]
    source(p.complete(_))
    p.future

  override inline def spawn[A](op: => F[A]): F[A] = Future(op).flatten

  override def tryCancel[A](op: Future[A]): Future[Unit] =
    Future.failed(UnsupportedOperationException("FutureAsyncMonad.tryCancel is unsupported"))

You will notice some definitions differ from the library's (e.g. pure uses Future.successful, which avoids scheduling a task on the execution context for a value that is already evaluated), but most importantly the spawn function is different. The difference between the library's spawn and this one is in how errors are handled.
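As a small illustration of the point about pure, here is a sketch using only scala.concurrent. The name `pureFuture` is made up for this example and is not part of dotty-cps-async.

```scala
import scala.concurrent.Future
import scala.util.Success

// No ExecutionContext is in scope here: Future.successful does not need one.
def pureFuture[T](t: T): Future[T] = Future.successful(t)

@main def pureDemo(): Unit =
  val f = pureFuture(5)
  // The future is already completed when it is created, so no Await is required.
  println(f.value == Some(Success(5)))
```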

I wrote a quick test for my own project to make sure I'm not breaking anything, and indeed removing CpsEffectMonad from my own FutureAsyncMonad broke the test until I fixed the spawn method here.

import cps.*
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.*
import scala.util.{Failure, Success}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should

import scala.concurrent.ExecutionContext.Implicits.global
import scala.language.adhocExtensions // AnyFlatSpec

class AsyncTest extends AnyFlatSpec with should.Matchers:
  // using FutureAsyncMonadButGood from this package

  "async" `should` "report errors" in {
    def raise(x: Throwable): Unit = throw x
    val ex1 = IllegalArgumentException("blah")
    val ex2 = IllegalArgumentException("blah 2")
    val ex3 = IllegalArgumentException("blah 3")
    val a1 = async { raise(ex1) }
    val a2 = async { await(a1) }
    val a3 = async { raise(ex2); await(a1) }
    val a4 = async { 5 }
    val a5 = async { await(a4); raise(ex3) }
    val a6 = async { await(a4); await(a4); await(a5); await(a1) }
    val a7 = async { await(a4); await(a4); await(a1); await(a5) }

    Await.ready(a1, 1.second)
    Await.ready(a2, 1.second)
    Await.ready(a3, 1.second)
    Await.ready(a4, 1.second)
    Await.ready(a5, 1.second)
    Await.ready(a6, 1.second)
    Await.ready(a7, 1.second)
    a1.value `should` equal(Some(Failure(ex1)))
    a2.value `should` equal(Some(Failure(ex1)))
    a3.value `should` equal(Some(Failure(ex2)))
    a4.value `should` equal(Some(Success(5)))
    a5.value `should` equal(Some(Failure(ex3)))
    a6.value `should` equal(Some(Failure(ex3)))
    a7.value `should` equal(Some(Failure(ex1)))
  }

Note that the raise function prevents the macro from special-casing the throw statement (simulating an exception from within code from another library), which exposes the incorrect behaviour.

rssh commented
override def flatMapTry[A, B](fa: F[A])(f: Try[A] => F[B]): F[B] =
  fa.transformWith { v => f(v) }

Here you do not check for an exception thrown by f, so it looks like the behaviors in the test should not catch an exception.
All true. (hmm - not sure now, will recheck)

Can you reproduce a similar issue on top of the unchanged FutureAsyncMonad?

I tried this test with the FutureAsyncMonad provided by the library and it did not succeed. It called spawn on something which looked like raise(ex1); pure(()), and this created a Future which never completes. I am using version 0.9.1 of the library.

I believe Scala's transformWith also correctly captures exceptions thrown by f in its result.
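A quick way to check this claim outside the library is the sketch below. It uses only scala.concurrent; `throwingTransform` is a name invented for this example.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*
import scala.util.{Failure, Try}

// Hypothetical helper: the callback throws instead of returning a Future.
def throwingTransform(ex: Throwable)(using ExecutionContext): Future[String] =
  val f: Try[Int] => Future[String] = _ => throw ex
  Future.successful(1).transformWith(f)

@main def transformWithDemo(): Unit =
  given ExecutionContext = ExecutionContext.global
  val ex = IllegalStateException("thrown inside f")
  val result = Await.ready(throwingTransform(ex), 1.second)
  // The thrown exception becomes the failure of the resulting Future.
  println(result.value == Some(Failure(ex)))
```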

To elaborate, the original spawn is:

def spawn[A](op: => F[A]): F[A] =
  val p = Promise[A]
  summon[ExecutionContext].execute( () => p.completeWith(op) )
  p.future

(Not sure if formatting works, copy/paste into github app on mobile is a bit broken)

This will evaluate p.completeWith(raise(ex1); pure(())) in a task submitted directly to the execution context. Since that task is not managed by Future, the exception thrown while evaluating op never reaches the Promise, so the error goes unreported and the Promise never completes.
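The failure mode can be reproduced without the library. The following is a minimal sketch using only scala.concurrent: `brokenSpawn` mirrors the shape of the original spawn quoted above and `flattenSpawn` mirrors the replacement from the top of this issue. Both helper names are made up for the example.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration.*
import scala.util.{Failure, Try}

// Hypothetical helper names; only the bodies follow the thread.
def brokenSpawn[A](op: => Future[A])(using ec: ExecutionContext): Future[A] =
  val p = Promise[A]()
  // `op` is evaluated before completeWith is called; if it throws,
  // the exception escapes the Runnable and `p` is never completed.
  ec.execute(() => p.completeWith(op))
  p.future

def flattenSpawn[A](op: => Future[A])(using ExecutionContext): Future[A] =
  // Future.apply captures the exception thrown while evaluating `op`.
  Future(op).flatten

def raise(x: Throwable): Unit = throw x

@main def spawnDemo(): Unit =
  given ExecutionContext = ExecutionContext.global
  val ex = IllegalArgumentException("blah")
  val broken = brokenSpawn { raise(ex); Future.successful(()) }
  val fixed = flattenSpawn { raise(ex); Future.successful(()) }
  // Waiting on the broken variant times out because its promise stays empty.
  println(Try(Await.ready(broken, 1.second)).isFailure)
  // The flatten variant completes with the original exception.
  println(Await.ready(fixed, 1.second).value == Some(Failure(ex)))
```

Running `spawnDemo` should show the first future timing out and the second one failing with `ex`, which matches the behaviour described in this comment.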

rssh commented

thanks, I see.
Fixed in master.

Thank you for the fix. Is there a particular reason Future.flatten is avoided?

rssh commented

I think there is no difference between submitting to the executor 'by hand' and Future(op).flatten. So it's just a random choice.
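For comparison, here is one way a hand-written spawn could capture the exception. This is only a sketch under the assumption that the task wraps the evaluation of op in a try/catch. It is not taken from the actual fix in master, and `manualSpawn` and `flattenSpawn` are names invented for the example.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration.*
import scala.util.Failure
import scala.util.control.NonFatal

// Assumed shape of a "by hand" spawn: not the library's code.
def manualSpawn[A](op: => Future[A])(using ec: ExecutionContext): Future[A] =
  val p = Promise[A]()
  ec.execute { () =>
    // A non-fatal exception thrown while evaluating `op` fails the promise.
    try p.completeWith(op)
    catch case NonFatal(ex) => p.failure(ex)
  }
  p.future

// The variant proposed at the top of this issue.
def flattenSpawn[A](op: => Future[A])(using ExecutionContext): Future[A] =
  Future(op).flatten

@main def compareSpawn(): Unit =
  given ExecutionContext = ExecutionContext.global
  val ex = IllegalArgumentException("blah")
  val viaManual = manualSpawn[Unit] { throw ex }
  val viaFlatten = flattenSpawn[Unit] { throw ex }
  println(Await.ready(viaManual, 1.second).value == Some(Failure(ex)))
  println(Await.ready(viaFlatten, 1.second).value == Some(Failure(ex)))
```

For non-fatal exceptions both variants end up with the same failed Future in this sketch. The manual version simply has more places where a mistake can hide.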

This is true, but I would expect the pre-cooked solution to need less scrutiny when reviewing or debugging.