Usage with other effect systems besides Future
jimmydivvy opened this issue · 27 comments
The README indicates that this library can be used with other implementations of the future pattern - however I couldn't find documentation on how to do this.
In particular I'm interested in using with scalaz Task. Is this possible?
I am also very interested in this. Up +1
See https://github.com/scala/async/blob/master/src/main/scala/scala/async/Async.scala#L45 for the spots you would need to replicate to add an adapter for Task. You might need to model the absence of an execution context on Task as the Unit type. You can see this in the example of the "not-actually-concurrent future system" in https://github.com/scala/async/blob/master/src/main/scala/scala/async/internal/AsyncId.scala
would someone like to turn this into a pull request with a small addition to the doc?
I think the major problem here would be that the FutureSystem assumes the presence of a Promise concept, and as far as I know the Task in scalaz, or IO in cats, don't have that.
However, there's https://github.com/pelotom/effectful, which seems to do the same thing for scalaz, or is it somehow different?
@adamw effectful is effectively abandoned: pelotom/effectful#15
@dsilvasc True, though there's also https://github.com/monadless/monadless (not sure if it's maintained, but it usually works well enough)
@adamw There's a question about monadless support for scala 2.13 from over 2 months ago with no response:
monadless/monadless#9
Don't know about maintenance plans for Stateless Future:
https://github.com/qifun/stateless-future
ThoughtWorks Each doesn't seem to have a scala 2.13 version published to Maven Central, but they might actively use it in production with their customers.
https://github.com/ThoughtWorksInc/each
ThoughtWorks seem to be moving on to a compiler plugin though -- same primary author from Each:
https://javadoc.io/page/com.thoughtworks.dsl/dsl_2.12/latest/com/thoughtworks/dsl/index.html
https://github.com/ThoughtWorksInc/Dsl.scala
https://github.com/ThoughtWorksInc/Dsl.scala/wiki/Benchmarks:-Dsl.scala-vs-Monix-vs-Cats-Effect-vs-Scalaz-Concurrent-vs-Scala-Async-vs-Scala-Continuation
Sorry for the tangent -- I think that's all of the related projects :)
I would be more concerned about unmerged PRs than issues without comment ;) Anyway, all of these seem rather abandoned. Would be nice of course to combine all these efforts into a single one, working in any Cats/Scalaz monad (like Kotlin's coroutines), but ... there's a finite amount of time ;)
I took a look at ScalaConcurrentAsync and AsyncId, then took a stab at implementing an adapter for a Task-like abstraction where Task[T] is a wrapper around a memoized function (ExecutionContext => Future[T]) and with an API similar to that of Future (but without an implicit ExecutionContext parameter everywhere). Here's what a naive translation would look like:
import scala.async.internal.{AsyncBase, FutureSystem}
import scala.concurrent.{Future, Promise}
import scala.reflect.macros.whitebox
import scala.language.experimental.macros
object TaskAsync extends AsyncBase {
lazy val futureSystem = TaskFutureSystem
type FS = TaskFutureSystem.type
def async[T](body: => T): Task[T] = macro asyncTaskImpl[T]
def asyncTaskImpl[T: c.WeakTypeTag](c: whitebox.Context)(body: c.Expr[T]): c.Expr[Task[T]] = {
val u: c.Expr[Unit] = c.Expr[Unit](c.parse("()"))
asyncImpl[T](c)(body)(u)
}
}
object TaskFutureSystem extends FutureSystem {
override type Prom[A] = Promise[A]
override type Fut[A] = Task[A]
override type ExecContext = Unit
override type Tryy[A] = scala.util.Try[A]
override def mkOps(c0: whitebox.Context): Ops { val c: c0.type } = new Ops {
val c: c0.type = c0
import c.universe._
def promType[A](implicit evidence: c.universe.WeakTypeTag[A]): c.universe.Type = weakTypeOf[Prom[A]]
def tryType[A](implicit evidence: c.universe.WeakTypeTag[A]): c.universe.Type = weakTypeOf[Tryy[A]]
def execContextType: Type = weakTypeOf[ExecContext]
def createProm[A](implicit evidence: c.universe.WeakTypeTag[A]): c.universe.Expr[Promise[A]] = reify {
Promise[A]()
}
def promiseToFuture[A: WeakTypeTag](prom: Expr[Prom[A]]): c.universe.Expr[Task[A]] = reify {
new Task(_ => prom.splice.future)
}
def future[A: WeakTypeTag](a: Expr[A])(execContext: Expr[ExecContext]): c.universe.Expr[Task[A]] = reify {
new Task(implicit ec => Future { a.splice })
}
def onComplete[A, B](
task: Expr[Fut[A]],
fun: Expr[scala.util.Try[A] => B],
execContext: Expr[ExecContext]): Expr[Unit] = reify {
task.splice.onComplete(fun.splice)
}
override def continueCompletedFutureOnSameThread: Boolean = true
override def getCompleted[A: WeakTypeTag](task: Expr[Fut[A]]): Expr[Tryy[A]] = reify {
if (task.splice.isCompleted) task.splice.value.get else null
}
def completeProm[A](prom: Expr[Prom[A]], value: Expr[scala.util.Try[A]]): Expr[Unit] = reify {
prom.splice.complete(value.splice)
c.Expr[Unit](Literal(Constant(()))).splice
}
def tryyIsFailure[A](tryy: Expr[scala.util.Try[A]]): Expr[Boolean] = reify {
tryy.splice.isFailure
}
def tryyGet[A](tryy: Expr[Tryy[A]]): Expr[A] = reify {
tryy.splice.get
}
def tryySuccess[A: WeakTypeTag](a: Expr[A]): Expr[Tryy[A]] = reify {
scala.util.Success[A](a.splice)
}
def tryyFailure[A: WeakTypeTag](a: Expr[Throwable]): Expr[Tryy[A]] = reify {
scala.util.Failure[A](a.splice)
}
}
}
This doesn't work because the async macro expects the implementation of the future method to generate an expression that kicks off execution, and tasks don't start running when they're created.
To clarify, here's an example:
val runCount = new AtomicInteger(0)
val t = async {
val task = async {
runCount.incrementAndGet()
5
}
assert(runCount.get == 0)
val ten = await(task) * 2
assert(ten == 10)
assert(runCount.get == 1)
ten
}
val future = t.run()(ExecutionContext.global)
val result = Await.result(future, Duration.Inf)
assert(runCount.get == 1)
assert(result == 10)
And here is the macro expansion printed by -Ymacro-debug-lite, with some manual cleanup for legibility:
val runCount = new AtomicInteger(0)
class StateMachine extends AnyRef with (snapchat.concurrent.TaskFutureSystem.Tryy[Any] => Unit) with (() => Unit) {
private[this] var await$macro$4$macro$6: Int = _
private[this] var state: Int = 0
/*private[this] */
val result: snapchat.concurrent.TaskFutureSystem.Prom[Int] = Promise.apply[Int]()
// def result: snapchat.concurrent.TaskFutureSystem.Prom[Int] = result
private[this] val execContext: Unit = ()
// def execContext: Unit = execContext
def apply(tr: snapchat.concurrent.TaskFutureSystem.Tryy[Int]): Unit = while$macro$8() {
try {
state match {
case 0 =>
val task = new Task(implicit ec => Future {
runCount.incrementAndGet()
5
})
assert(runCount.get().==(0))
val awaitable$macro$3
: snapchat.concurrent.Task[Int] /* @scala.reflect.internal.annotations.uncheckedBounds */ =
task
state = 1
val completed$macro$7: snapchat.concurrent.TaskFutureSystem.Tryy[Int] =
if (awaitable$macro$3.isCompleted)
awaitable$macro$3.value.get
else
null
if (null.ne(completed$macro$7)) {
if (completed$macro$7.isFailure) {
/*stateMachine$macro$2.this.*/
result.complete(completed$macro$7)
return ()
} else {
await$macro$4$macro$6 = completed$macro$7.get
state = 2
}
()
} else {
awaitable$macro$3.onComplete(this)
return ()
}
()
case 2 => {
/*stateMachine$macro$2.this.*/
result.complete(Success({
val x$macro$5: Int = 2
val ten: Int = await$macro$4$macro$6.*(x$macro$5)
assert(ten.==(10))
assert(runCount.get().==(0))
ten
}))
()
}
return ()
case 1 =>
if (tr.isFailure) {
/*stateMachine$macro$2.this.*/
result
.complete(tr)
return ()
} else {
await$macro$4$macro$6 = tr.get
state = 2
}
()
case _ => throw new IllegalStateException()
}
} catch {
case throwable @ (_: Throwable) =>
result.complete(Failure[Int](throwable))
return ()
}
while$macro$8()
}
def apply(): Unit = StateMachine.this.apply(null)
}
val stateMachine = new StateMachine
new Task(implicit ec => Future { stateMachine.apply() })
new Task(_ => stateMachine$macro$2.result.future)
Note the last two lines. The call to stateMachine.apply() is wrapped in a task that's never started, then discarded. The other task (on the last line) grabs the future from the promise that was created by new StateMachine, but that won't ever be completed because stateMachine.apply() never actually runs.
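The failure mode described above can be reproduced without any macros. The following is a minimal sketch, assuming a hypothetical LazyTask type that stands in for the Task wrapper around ExecutionContext => Future[T]; the names LazyTask, discardedDriver, and chainedDriver are illustrative and not part of scala-async. It also shows one possible repair, which is to start the driver inside the function that the returned task wraps.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the Task described above: a lazy wrapper
// around ExecutionContext => Future[T]. Not a real library type.
final class LazyTask[T](body: ExecutionContext => Future[T]) {
  def run()(implicit ec: ExecutionContext): Future[T] = body(ec)
}

object LazyTaskDemo {
  val runCount = new AtomicInteger(0)

  // Mirrors the last two lines of the expansion: the task that would drive
  // the computation is constructed and dropped, so the promise stays empty.
  def discardedDriver(): Promise[Int] = {
    val result = Promise[Int]()
    new LazyTask[Unit](ec => Future { runCount.incrementAndGet(); result.success(5); () }(ec)) // never run
    result
  }

  // One possible repair (an assumption, not what the macro generates):
  // kick off the driver only when the outer task itself is run.
  def chainedDriver(): LazyTask[Int] =
    new LazyTask[Int](ec => {
      val result = Promise[Int]()
      Future { runCount.incrementAndGet(); result.success(5); () }(ec)
      result.future
    })
}
```

With this shape, creating the task has no side effects, and each call to run() starts a fresh computation.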
@dsilva I ended up using https://github.com/monadless/monadless, which has the same functionality, but is general and works for any monad. Unfortunately, it's also unmaintained.
@adamw Thanks, that worked for me too. Looks like it rewrites code into calls to map and flatMap instead of state machines.
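To illustrate why a map/flatMap rewrite fits lazy task types, here is a hand-written approximation of such a translation. It is a sketch only: Lazy is a hypothetical minimal effect type, and the rewritten expressions are written by hand rather than taken from actual Monadless output.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical minimal lazy effect type, standing in for a Task-like monad.
final class Lazy[A](val run: () => A) {
  def map[B](f: A => B): Lazy[B] = new Lazy(() => f(run()))
  def flatMap[B](f: A => Lazy[B]): Lazy[B] = new Lazy(() => f(run()).run())
}

object FlatMapRewriteDemo {
  val runCount = new AtomicInteger(0)

  val five: Lazy[Int] = new Lazy(() => { runCount.incrementAndGet(); 5 })

  // Direct-style intent (pseudocode):   async { await(five) * 2 }
  // Hand-written map translation; nothing runs at construction time.
  val ten: Lazy[Int] = five.map(_ * 2)

  // Direct-style intent (pseudocode):   async { await(five) + await(ten) }
  // Two awaits become a flatMap followed by a map.
  val sum: Lazy[Int] = five.flatMap(a => ten.map(b => a + b))
}
```

Because the translation only composes functions, no promise or "start" step is needed, which is the part the FutureSystem-based adapter could not express.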
For a complete example of extending async/await, we have a version using Twitter Futures here: https://github.com/foursquare/twitter-util-async. I'm not sure how much it will help with the Task implementation since it also involves a Promise concept.
@SethTisue I could take a stab at the documentation update
I'm open to refactoring the internals of async to support abstractions like Task.
I would be more concerned about unmerged PRs than issues without comment ;) Anyway, all of these seem rather abandoned. Would be nice of course to combine all these efforts into a single one, working in any Cats/Scalaz monad (like Kotlin's coroutines), but ... there's a finite amount of time ;)
Dsl.scala is not abandoned.
Dsl.scala is not only more general than scala.async, but also more general than monadless or any monad-based direct style DSL, because:
- Dsl.scala's built-in compiler plugins are name-based instead of symbol-based, so they can be used together with any type classes or even type class-free delimited continuations.
- Dsl.scala's built-in type class Dsl is more general than Monad.
The integration story for other awaitable types should get easier with scala/scala#8816.
The test case in that PR includes this integration for java.util.CompletableFuture:
object CompletableFutureAwait {
def async[T](executor: Executor)(body: T): CompletableFuture[T] = macro impl
@compileTimeOnly("[async] `await` must be enclosed in `async`")
def await[T](completableFuture: CompletableFuture[T]): T = ???
def impl(c: blackbox.Context)(executor: c.Tree)(body: c.Tree): c.Tree = {
import c.universe._
val awaitSym = typeOf[CompletableFutureAwait.type].decl(TermName("await"))
def mark(t: DefDef): Tree = c.internal.markForAsyncTransform(c.internal.enclosingOwner, t, awaitSym, Map.empty)
val name = TypeName("stateMachine$$async_" + body.pos.line)
q"""
final class $name extends _root_.scala.tools.nsc.async.CompletableFutureStateMachine($executor) {
${mark(q"""override def apply(tr$$async: _root_.scala.util.Try[_root_.scala.AnyRef]) = ${body}""")}
}
new $name().start().asInstanceOf[${c.macroApplication.tpe}]
"""
}
}
abstract class CompletableFutureStateMachine(executor: Executor) extends AsyncStateMachine[CompletableFuture[AnyRef], Try[AnyRef]] with Runnable with BiConsumer[AnyRef, Throwable] {
Objects.requireNonNull(executor)
protected var result$async: CompletableFuture[AnyRef] = new CompletableFuture[AnyRef]();
// Adapters
def accept(value: AnyRef, throwable: Throwable): Unit = {
this(if (throwable != null) Failure(throwable) else Success(value))
}
def run(): Unit = {
apply(null)
}
// FSM translated method
def apply(tr$async: Try[AnyRef]): Unit
// Required methods
protected var state$async: Int = StateAssigner.Initial
protected def completeFailure(t: Throwable): Unit = result$async.completeExceptionally(t)
protected def completeSuccess(value: AnyRef): Unit = result$async.complete(value)
protected def onComplete(f: CompletableFuture[AnyRef]): Unit = f.whenCompleteAsync(this)
protected def getCompleted(f: CompletableFuture[AnyRef]): Try[AnyRef] = try {
val r = f.getNow(this)
if (r == this) null
else Success(r)
} catch {
case t: Throwable => Failure(t)
}
protected def tryGet(tr: Try[AnyRef]): AnyRef = tr match {
case Success(value) =>
value.asInstanceOf[AnyRef]
case Failure(throwable) =>
result$async.completeExceptionally(throwable)
this // sentinel value to indicate the dispatch loop should exit.
}
def start(): CompletableFuture[AnyRef] = {
executor.execute(this)
result$async
}
}
@retronym With that change, do you have in mind what an example integration without an executor would look like? It would be excellent if moving scala-async into the compiler can be done in a way that makes it possible to integrate with task types like monix Task, scalaz Task, ZIO, and cats IO.
Here's one way to integrate a cats-eval like type.
package scala.tools.nsc
package async
import scala.language.experimental.macros
import scala.reflect.macros.blackbox
import scala.annotation.compileTimeOnly
import scala.tools.partest.async.AsyncStateMachine
object EvalAwait {
def evaluating[T](body: T): Eval[T] = macro impl
@compileTimeOnly("[async] `value` must be enclosed in `evaluating`")
def value[T](output: Eval[T]): T = ???
def impl(c: blackbox.Context)(body: c.Tree): c.Tree = {
import c.universe._
val awaitSym = typeOf[EvalAwait.type].decl(TermName("value"))
def mark(t: DefDef): Tree = c.internal.markForAsyncTransform(c.internal.enclosingOwner, t, awaitSym, Map.empty)
val name = TypeName("stateMachine$$async_" + body.pos.line)
q"""
final class $name extends _root_.scala.tools.nsc.async.EvalStateMachine {
${mark(q"""override def apply(tr$$async: _root_.scala.AnyRef) = ${body}""")}
}
new $name().start().asInstanceOf[${c.macroApplication.tpe}]
"""
}
}
abstract class Eval[A] {
def value: A
}
object Eval {
def now[T](t: T): Eval[T] = Now(t)
def later[T](t: => T): Eval[T] = new Later(() => t)
def always[T](t: => T): Eval[T] = new Always(() => t)
}
final case class Now[A](value: A) extends Eval[A] {
def memoize: Eval[A] = this
}
final class Later[A](f: () => A) extends Eval[A] {
private[this] var thunk: () => A = f
lazy val value: A = {
try thunk() finally thunk = null
}
}
final class Always[A](f: () => A) extends Eval[A] {
def value: A = f()
def memoize: Eval[A] = new Later(f)
}
abstract class EvalStateMachine extends AsyncStateMachine[Eval[AnyRef], AnyRef] {
var result$async: AnyRef = _
// FSM translated method
def apply(tr$async: AnyRef): Unit
// Required methods
protected var state$async: Int = 0
protected def completeFailure(t: Throwable): Unit = throw t
protected def completeSuccess(value: AnyRef): Unit = result$async = value
protected def onComplete(f: Eval[AnyRef]): Unit = throw new UnsupportedOperationException()
protected def getCompleted(f: Eval[AnyRef]): AnyRef = f.value
protected def tryGet(tr: AnyRef): AnyRef = tr
def start(): Eval[AnyRef] = {
apply(null)
Eval.now(result$async)
}
}
The following runs in a stack-safe manner. It wouldn't integrate with the trampoline-style evaluation in cats-eval's flatMap etc., but all the composition of values in the evaluating block is done in a loop rather than by recursion in the state machine.
import scala.tools.nsc.async._
import EvalAwait._
object Test {
def v1 = Eval.now("v1")
def v2 = Eval.now("v2")
def v3 = Eval.later("later")
def test: String = test0.value
def test0 = evaluating[String] {
var r1 = ""
var i = 0
while (i < 10000) {
r1 = value(v1) + value(v2) + value(v3)
i += 1
}
r1
}
}
For completeness, I'll also do a prototype integration with monix.Task. A little extra indirection is needed to make the Task multi-shot (retry-able) -- the state machine class can't be a Task itself as we need a new instance of it instantiated for each call to Task.execute.
Here's a possible monix-task integration: retronym/monad-ui@3b146ec
A new instance of the state machine is created for each execution of the Task.
As an external library, without using the built-in run loop in Monix, there doesn't seem to be an API for exploiting the fact that the Now constructor of Task has an immediate value. The scala-async state machine can use this to avoid a context switch through the scheduler and instead immediately resume the state machine loop. That could be fixed by moving the DSL implementation into monix or by exposing additional API.
Review by @alexandru, perhaps?
Oh wow, looking good.
Loved scala-async in the past, would be great to see it work for IO types.
Has a cats integration been considered? It wouldn't require a lot of knowledge from the plugin about the constructors of the effect, as long as only the interface of cats.Monad is used. That, and probably cats.Traverse to handle for comprehensions.
I definitely think that doing this more generally for cats/cats-effect types would make a lot of sense. Most of the machinery, it looks like, would only require Monad, and the few pieces which require more than that would be fine with ConcurrentEffect. I think it would require splitting the state machine into two such that the pieces which require tighter constraints are materialized only as necessary.
The async/await DSL has two selling points:
- less syntactic overhead than for-comprehensions (and way less than explicit flatMapping), which is particularly beneficial when you want to express control flow.
- a more efficient translation based on knowledge of the effect. This is based on the special case that the continuation is single-shot (called zero or one times, and not called in parallel). Rather than generating a separate lambda for each continuation, a single instance of a single state machine class acts as the continuation in all places.
  - Prior to Scala 2.12, this used to be a win for generated code size. But now that lambda classes are materialized at runtime with LambdaMetafactory, that's no longer a point of difference.
  - Immediate continuations (e.g. on a future whose value is already computed) can be optimized to run on the current thread without consuming stack frames.
  - Captured state that is not referenced afterward is nulled out to reduce the retained memory size of the in-progress computation.
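The "single state machine instance as the continuation" idea from the list above can be sketched by hand. This is an illustrative approximation of what a translation of async { await(a) + await(b) } might look like, not the code the macro actually generates; the class name SumMachine is hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object StateMachineSketch {
  // Hand-written analogue of: async { await(a) + await(b) }
  // The same object is registered as the callback at every await point,
  // so no per-continuation lambda is allocated.
  final class SumMachine(a: Future[Int], b: Future[Int])(implicit ec: ExecutionContext)
      extends (Try[Int] => Unit) {
    private[this] var state = 0   // which await we are resuming from
    private[this] var first = 0   // value captured across the second await
    val result: Promise[Int] = Promise[Int]()

    def apply(tr: Try[Int]): Unit = tr match {
      case Failure(t) => result.failure(t)
      case Success(v) =>
        state match {
          case 0 =>
            first = v
            state = 1
            b.onComplete(this)
          case 1 =>
            result.success(first + v)
          case _ =>
            result.failure(new IllegalStateException())
        }
    }

    def start(): Future[Int] = { a.onComplete(this); result.future }
  }
}
```

The single-shot assumption is what makes the mutable state field safe: each continuation is invoked at most once and never concurrently with another.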
The difference between single-shot and multi-shot effects is a bit subtle. I thought we'd have trouble modelling monix.Task because it needs to support retries, but this was possible by having the Task instantiate a new state machine for each execution.
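A minimal sketch of that indirection, using only the standard library: the task holds a factory for state machines rather than a machine, so every execution gets fresh mutable state. Machine, RetryableTask, and runToFuture are hypothetical names chosen for illustration and are not the monix integration itself.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try
import java.util.concurrent.atomic.AtomicInteger

object MultiShotSketch {
  // Single-shot machine: it completes its promise exactly once.
  final class Machine(step: () => Int) extends Runnable {
    val result: Promise[Int] = Promise[Int]()
    def run(): Unit = result.complete(Try(step()))
  }

  // Hypothetical re-runnable task: it stores a factory, not a machine.
  final class RetryableTask(newMachine: () => Machine) {
    def runToFuture()(implicit ec: ExecutionContext): Future[Int] = {
      val m = newMachine() // fresh state and a fresh promise per execution
      ec.execute(m)
      m.result.future
    }
  }

  val attempts = new AtomicInteger(0)
  val task = new RetryableTask(() => new Machine(() => attempts.incrementAndGet()))
}
```

Running the same Machine twice would fail on the second completion of its promise, which is why the machine itself cannot serve as the task.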
Take a look at https://github.com/retronym/monad-ui/ for some experiments I did with different DSLs atop the writer Monad.
So, does an async-like DSL still make sense for effects that are really multi-shot (like List), or when the "shot-ity" is not known (like Monad)? @pelotom seems to think so with effectful:
monadically {
val v1 = value(List(1, 2 ,3))
val v2 = value(List(2, 3, 4))
v1 + v2
}
Scalac's -Xasync phase could run the ANF transform over this but then have an alternative second half to the transform that emits continuations based on plain lambdas and flatMap calls. This translation could still be somewhat customizable (e.g. to use a typeclass vs assuming flatMap is a member).
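For the List example, a lambda-based translation might come out roughly as follows. Both variants are written by hand as a sketch of the two customization options mentioned (member flatMap versus a typeclass); FlatMapLike is a hypothetical typeclass, not one from cats or scalaz.

```scala
object ListTranslationSketch {
  // Direct-style intent (from the effectful example):
  //   monadically { value(List(1, 2, 3)) + value(List(2, 3, 4)) }

  // Variant 1: assume flatMap and map are members of the effect type.
  val viaMembers: List[Int] =
    List(1, 2, 3).flatMap(v1 => List(2, 3, 4).map(v2 => v1 + v2))

  // Variant 2: route the same calls through a hypothetical typeclass.
  trait FlatMapLike[F[_]] {
    def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }

  implicit val listInstance: FlatMapLike[List] = new FlatMapLike[List] {
    def flatMap[A, B](fa: List[A])(f: A => List[B]): List[B] = fa.flatMap(f)
    def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
  }

  def sumBoth[F[_]](xs: F[Int], ys: F[Int])(implicit F: FlatMapLike[F]): F[Int] =
    F.flatMap(xs)(v1 => F.map(ys)(v2 => v1 + v2))
}
```

Here the continuation `v1 => ...` is invoked once per element of the first list, which is exactly the multi-shot behaviour a single mutable state machine instance cannot support.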
I don't think this DSL for Lists looks appealing to use. It's best to focus on ConcurrentEffect.
Here's a first cut at integration. Caveat: I haven't used cats-effect before! https://github.com/retronym/monad-ui/pull/1/files
I only needed Effect so far. One thing that is missing is a way to determine if a value is immediately available without needing to go through a callback. The Future implementation of async uses this to immediately move to the next iteration of the state machine loop (in a stack-safe manner) and avoid either a) the overhead of going through the execution context or b) burning stack when an execution context continues on the current thread because it can detect an immediate value.
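The immediate-value check can be sketched with scala.concurrent.Future, whose value method returns Some(...) when the result is already known. The class below is a hand-written illustration under that assumption, not the generated state machine; SumAll and its callbacks counter are hypothetical names.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import java.util.concurrent.atomic.AtomicInteger

object ImmediateValueSketch {
  // Sums the futures one at a time. While the next future is already
  // completed, the while loop continues on the current thread; only a
  // pending future causes a callback registration and an early return.
  final class SumAll(futures: Vector[Future[Int]])(implicit ec: ExecutionContext)
      extends (Try[Int] => Unit) {
    private[this] var index = 0
    private[this] var total = 0L
    val callbacks = new AtomicInteger(0) // how often we went through onComplete
    val result: Promise[Long] = Promise[Long]()

    def apply(tr: Try[Int]): Unit = {
      var current: Try[Int] = tr // null on the initial entry
      while (true) {
        if (current ne null) current match {
          case Failure(t) => result.failure(t); return
          case Success(v) => total += v; index += 1
        }
        if (index == futures.length) { result.success(total); return }
        val next = futures(index)
        next.value match {
          case Some(done) => current = done // immediate value: keep looping
          case None =>
            callbacks.incrementAndGet()
            next.onComplete(this)
            return
        }
      }
    }

    def start(): Future[Long] = { apply(null); result.future }
  }
}
```

An effect type without an equivalent of Future.value would have to take the callback branch every time, which is the gap described for Effect.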
As a comparison, here's all the code necessary to support cats with Monadless:
https://github.com/monadless/monadless/blob/master/monadless-cats/src/main/scala/io/monadless/cats/MonadlessApplicative.scala
https://github.com/monadless/monadless/blob/master/monadless-cats/src/main/scala/io/monadless/cats/MonadlessMonad.scala
It can depend only on Applicative (or use Monad if necessary) and supports any implementation of the typeclasses.
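As a rough illustration of when Applicative alone could suffice: if two awaited values do not depend on each other, they can be combined with a map2-style operation, and flatMap is only needed once a later effect depends on an earlier result. The typeclass below is a hypothetical minimal one written for this sketch, not the cats definition, and it makes no claim about how Monadless chooses between the two.

```scala
object ApplicativeSketch {
  // Hypothetical minimal Applicative, just enough for this example.
  trait Applicative[F[_]] {
    def pure[A](a: A): F[A]
    def map2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
  }

  implicit val optionApplicative: Applicative[Option] = new Applicative[Option] {
    def pure[A](a: A): Option[A] = Some(a)
    def map2[A, B, C](fa: Option[A], fb: Option[B])(f: (A, B) => C): Option[C] =
      for (a <- fa; b <- fb) yield f(a, b)
  }

  // Direct-style intent (pseudocode):   lift { unlift(x) + unlift(y) }
  // Neither effect depends on the other's result, so map2 is enough.
  def addBoth[F[_]](x: F[Int], y: F[Int])(implicit F: Applicative[F]): F[Int] =
    F.map2(x, y)(_ + _)
}
```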